diff --git a/neutron/plugins/nicira/vshield/tasks/tasks.py b/neutron/plugins/nicira/vshield/tasks/tasks.py index 5a76986961..2128b3cbd6 100755 --- a/neutron/plugins/nicira/vshield/tasks/tasks.py +++ b/neutron/plugins/nicira/vshield/tasks/tasks.py @@ -169,12 +169,11 @@ class TaskManager(): self._req = event.Event() # TaskHandler stopped event - self._stopped = event.Event() + self._stopped = False # Periodic function trigger self._monitor = None self._monitor_busy = False - self._monitor_stop = None # Thread handling the task request self._thread = None @@ -221,8 +220,8 @@ class TaskManager(): def _check_pending_tasks(self): """Check all pending tasks status.""" for resource_id in self._tasks.keys(): - if self._monitor_stop: - # looping call is asked to stop, return now + if self._stopped: + # Task manager is stopped, return now return tasks = self._tasks[resource_id] @@ -270,6 +269,12 @@ class TaskManager(): def _abort(self): """Abort all tasks.""" + # put all tasks haven't been received by main thread to queue + # so the following abort handling can cover them + for t in self._tasks_queue: + self._enqueue(t) + self._tasks_queue.clear() + for resource_id in self._tasks.keys(): tasks = list(self._tasks[resource_id]) for task in tasks: @@ -287,6 +292,15 @@ class TaskManager(): def run(self): while True: try: + if self._stopped: + # Somehow greenlet.GreenletExit exception is ignored + # during unit-test when self._execute() is making db + # access. This makes this thread not terminating and + # stop() caller wait indefinitely. So we added a check + # here before trying to do a block call on getting a + # task from queue + break + # get a task from queue, or timeout for periodic status check task = self._get_task() if task.resource_id in self._tasks: @@ -295,13 +309,17 @@ class TaskManager(): self._enqueue(task) continue - status = self._execute(task) - - if status != TaskStatus.PENDING: - self._result(task) - continue - - self._enqueue(task) + try: + self._execute(task) + finally: + if task.status is None: + # The thread is killed during _execute(). To guarantee + # the task been aborted correctly, put it to the queue. + self._enqueue(task) + elif task.status != TaskStatus.PENDING: + self._result(task) + else: + self._enqueue(task) except greenlet.GreenletExit: break except Exception: @@ -310,11 +328,8 @@ class TaskManager(): self._monitor.stop() if self._monitor_busy: - self._monitor_stop = event.Event() - self._monitor_stop.wait() - self._monitor_stop = None + self._monitor.wait() self._abort() - self._stopped.send() def add(self, task): task.id = uuid.uuid1() @@ -324,10 +339,11 @@ class TaskManager(): return task.id def stop(self): - if not self._thread: + if self._thread is None: return + self._stopped = True self._thread.kill() - self._stopped.wait() + self._thread.wait() self._thread = None def has_pending_task(self): @@ -357,26 +373,27 @@ class TaskManager(): self.run() def _loopingcall_callback(): + self._monitor_busy = True try: - self._monitor_busy = True self._check_pending_tasks() - self._monitor_busy = False - if self._monitor_stop: - self._monitor_stop.send() except Exception: LOG.exception(_("Exception in _check_pending_tasks")) + self._monitor_busy = False - if self._thread: + if self._thread is not None: return self if interval is None or interval == 0: interval = self._interval + self._stopped = False self._thread = greenthread.spawn(_inner) self._monitor = loopingcall.FixedIntervalLoopingCall( _loopingcall_callback) self._monitor.start(interval / 1000.0, interval / 1000.0) + # To allow the created thread start running + greenthread.sleep(0) return self diff --git a/neutron/tests/unit/nicira/test_vcns_driver.py b/neutron/tests/unit/nicira/test_vcns_driver.py index b5867f4b73..e26e2a1318 100644 --- a/neutron/tests/unit/nicira/test_vcns_driver.py +++ b/neutron/tests/unit/nicira/test_vcns_driver.py @@ -212,18 +212,25 @@ class VcnsDriverTaskManagerTestCase(base.BaseTestCase): task.wait(TaskState.RESULT) self.assertTrue(task.userdata['result']) - def test_task_manager_stop(self): + def _test_task_manager_stop(self, exec_wait=False, result_wait=False, + stop_wait=0): def _exec(task): + if exec_wait: + greenthread.sleep(0.01) return TaskStatus.PENDING def _status(task): - greenthread.sleep(0.1) + greenthread.sleep(0.01) return TaskStatus.PENDING def _result(task): + if result_wait: + greenthread.sleep(0) pass manager = ts.TaskManager().start(100) + manager.stop() + manager.start(100) alltasks = {} for i in range(100): @@ -235,12 +242,25 @@ class VcnsDriverTaskManagerTestCase(base.BaseTestCase): tasks.append(task) alltasks[res] = tasks - greenthread.sleep(2) + greenthread.sleep(stop_wait) manager.stop() + for res, tasks in alltasks.iteritems(): for task in tasks: self.assertEqual(task.status, TaskStatus.ABORT) + def test_task_manager_stop_1(self): + self._test_task_manager_stop(True, True, 0) + + def test_task_manager_stop_2(self): + self._test_task_manager_stop(True, True, 1) + + def test_task_manager_stop_3(self): + self._test_task_manager_stop(False, False, 0) + + def test_task_manager_stop_4(self): + self._test_task_manager_stop(False, False, 1) + class VcnsDriverTestCase(base.BaseTestCase):