Merge "vshield task manager: abort tasks in stop() on termination"
This commit is contained in:
commit
6ed90e2466
@ -20,7 +20,6 @@ import uuid
|
|||||||
|
|
||||||
from eventlet import event
|
from eventlet import event
|
||||||
from eventlet import greenthread
|
from eventlet import greenthread
|
||||||
from eventlet.support import greenlets as greenlet
|
|
||||||
|
|
||||||
from neutron.common import exceptions
|
from neutron.common import exceptions
|
||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
@ -295,12 +294,9 @@ class TaskManager():
|
|||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
if self._stopped:
|
if self._stopped:
|
||||||
# Somehow greenlet.GreenletExit exception is ignored
|
# Gracefully terminate this thread if the _stopped
|
||||||
# during unit-test when self._execute() is making db
|
# attribute was set to true
|
||||||
# access. This makes this thread not terminating and
|
LOG.info(_("Stopping TaskManager"))
|
||||||
# stop() caller wait indefinitely. So we added a check
|
|
||||||
# here before trying to do a block call on getting a
|
|
||||||
# task from queue
|
|
||||||
break
|
break
|
||||||
|
|
||||||
# get a task from queue, or timeout for periodic status check
|
# get a task from queue, or timeout for periodic status check
|
||||||
@ -324,17 +320,11 @@ class TaskManager():
|
|||||||
self._result(task)
|
self._result(task)
|
||||||
else:
|
else:
|
||||||
self._enqueue(task)
|
self._enqueue(task)
|
||||||
except greenlet.GreenletExit:
|
|
||||||
break
|
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception(_("TaskManager terminated"))
|
LOG.exception(_("TaskManager terminating because "
|
||||||
|
"of an exception"))
|
||||||
break
|
break
|
||||||
|
|
||||||
self._monitor.stop()
|
|
||||||
if self._monitor_busy:
|
|
||||||
self._monitor.wait()
|
|
||||||
self._abort()
|
|
||||||
|
|
||||||
def add(self, task):
|
def add(self, task):
|
||||||
task.id = uuid.uuid1()
|
task.id = uuid.uuid1()
|
||||||
self._tasks_queue.append(task)
|
self._tasks_queue.append(task)
|
||||||
@ -347,8 +337,13 @@ class TaskManager():
|
|||||||
return
|
return
|
||||||
self._stopped = True
|
self._stopped = True
|
||||||
self._thread.kill()
|
self._thread.kill()
|
||||||
self._thread.wait()
|
|
||||||
self._thread = None
|
self._thread = None
|
||||||
|
# Stop looping call and abort running tasks
|
||||||
|
self._monitor.stop()
|
||||||
|
if self._monitor_busy:
|
||||||
|
self._monitor.wait()
|
||||||
|
self._abort()
|
||||||
|
LOG.info(_("TaskManager terminated"))
|
||||||
|
|
||||||
def has_pending_task(self):
|
def has_pending_task(self):
|
||||||
if self._tasks_queue or self._tasks or self._main_thread_exec_task:
|
if self._tasks_queue or self._tasks or self._main_thread_exec_task:
|
||||||
|
@ -144,7 +144,8 @@ class ServiceRouterTest(test_nicira_plugin.NiciraL3NatTest,
|
|||||||
manager.show_pending_tasks()
|
manager.show_pending_tasks()
|
||||||
raise Exception(_("Tasks not completed"))
|
raise Exception(_("Tasks not completed"))
|
||||||
manager.stop()
|
manager.stop()
|
||||||
|
# Ensure the manager thread has been stopped
|
||||||
|
self.assertIsNone(manager._thread)
|
||||||
super(ServiceRouterTest, self).tearDown()
|
super(ServiceRouterTest, self).tearDown()
|
||||||
|
|
||||||
def _create_router(self, fmt, tenant_id, name=None,
|
def _create_router(self, fmt, tenant_id, name=None,
|
||||||
|
@ -45,6 +45,9 @@ class VcnsDriverTaskManagerTestCase(base.BaseTestCase):
|
|||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.manager.stop()
|
self.manager.stop()
|
||||||
|
# Task manager should not leave running threads around
|
||||||
|
# if _thread is None it means it was killed in stop()
|
||||||
|
self.assertIsNone(self.manager._thread)
|
||||||
super(VcnsDriverTaskManagerTestCase, self).tearDown()
|
super(VcnsDriverTaskManagerTestCase, self).tearDown()
|
||||||
|
|
||||||
def _test_task_manager_task_process_state(self, sync_exec=False):
|
def _test_task_manager_task_process_state(self, sync_exec=False):
|
||||||
@ -222,6 +225,9 @@ class VcnsDriverTaskManagerTestCase(base.BaseTestCase):
|
|||||||
|
|
||||||
manager = ts.TaskManager().start(100)
|
manager = ts.TaskManager().start(100)
|
||||||
manager.stop()
|
manager.stop()
|
||||||
|
# Task manager should not leave running threads around
|
||||||
|
# if _thread is None it means it was killed in stop()
|
||||||
|
self.assertIsNone(manager._thread)
|
||||||
manager.start(100)
|
manager.start(100)
|
||||||
|
|
||||||
alltasks = {}
|
alltasks = {}
|
||||||
@ -236,6 +242,9 @@ class VcnsDriverTaskManagerTestCase(base.BaseTestCase):
|
|||||||
|
|
||||||
greenthread.sleep(stop_wait)
|
greenthread.sleep(stop_wait)
|
||||||
manager.stop()
|
manager.stop()
|
||||||
|
# Task manager should not leave running threads around
|
||||||
|
# if _thread is None it means it was killed in stop()
|
||||||
|
self.assertIsNone(manager._thread)
|
||||||
|
|
||||||
for res, tasks in alltasks.iteritems():
|
for res, tasks in alltasks.iteritems():
|
||||||
for task in tasks:
|
for task in tasks:
|
||||||
@ -325,6 +334,9 @@ class VcnsDriverTestCase(base.BaseTestCase):
|
|||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.vcns_driver.task_manager.stop()
|
self.vcns_driver.task_manager.stop()
|
||||||
|
# Task manager should not leave running threads around
|
||||||
|
# if _thread is None it means it was killed in stop()
|
||||||
|
self.assertIsNone(self.vcns_driver.task_manager._thread)
|
||||||
super(VcnsDriverTestCase, self).tearDown()
|
super(VcnsDriverTestCase, self).tearDown()
|
||||||
|
|
||||||
def _deploy_edge(self):
|
def _deploy_edge(self):
|
||||||
|
Loading…
Reference in New Issue
Block a user