vshield task manager: abort tasks in stop() on termination
This patch kills the manager thread, and aborts active tasks rather than sending an exception to the manager thread and have it do the abort on termination. Unit tests involving vshield task manager might take longer as a side effect of this patch. Related-bug: #1282452 Change-Id: I9e9e41ce7e8969a2ea51bfce96b1303125a24308
This commit is contained in:
parent
705da7e7f0
commit
62ec632c47
@ -20,7 +20,6 @@ import uuid
|
|||||||
|
|
||||||
from eventlet import event
|
from eventlet import event
|
||||||
from eventlet import greenthread
|
from eventlet import greenthread
|
||||||
from eventlet.support import greenlets as greenlet
|
|
||||||
|
|
||||||
from neutron.common import exceptions
|
from neutron.common import exceptions
|
||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
@ -295,12 +294,9 @@ class TaskManager():
|
|||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
if self._stopped:
|
if self._stopped:
|
||||||
# Somehow greenlet.GreenletExit exception is ignored
|
# Gracefully terminate this thread if the _stopped
|
||||||
# during unit-test when self._execute() is making db
|
# attribute was set to true
|
||||||
# access. This makes this thread not terminating and
|
LOG.info(_("Stopping TaskManager"))
|
||||||
# stop() caller wait indefinitely. So we added a check
|
|
||||||
# here before trying to do a block call on getting a
|
|
||||||
# task from queue
|
|
||||||
break
|
break
|
||||||
|
|
||||||
# get a task from queue, or timeout for periodic status check
|
# get a task from queue, or timeout for periodic status check
|
||||||
@ -324,17 +320,11 @@ class TaskManager():
|
|||||||
self._result(task)
|
self._result(task)
|
||||||
else:
|
else:
|
||||||
self._enqueue(task)
|
self._enqueue(task)
|
||||||
except greenlet.GreenletExit:
|
|
||||||
break
|
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception(_("TaskManager terminated"))
|
LOG.exception(_("TaskManager terminating because "
|
||||||
|
"of an exception"))
|
||||||
break
|
break
|
||||||
|
|
||||||
self._monitor.stop()
|
|
||||||
if self._monitor_busy:
|
|
||||||
self._monitor.wait()
|
|
||||||
self._abort()
|
|
||||||
|
|
||||||
def add(self, task):
|
def add(self, task):
|
||||||
task.id = uuid.uuid1()
|
task.id = uuid.uuid1()
|
||||||
self._tasks_queue.append(task)
|
self._tasks_queue.append(task)
|
||||||
@ -347,8 +337,13 @@ class TaskManager():
|
|||||||
return
|
return
|
||||||
self._stopped = True
|
self._stopped = True
|
||||||
self._thread.kill()
|
self._thread.kill()
|
||||||
self._thread.wait()
|
|
||||||
self._thread = None
|
self._thread = None
|
||||||
|
# Stop looping call and abort running tasks
|
||||||
|
self._monitor.stop()
|
||||||
|
if self._monitor_busy:
|
||||||
|
self._monitor.wait()
|
||||||
|
self._abort()
|
||||||
|
LOG.info(_("TaskManager terminated"))
|
||||||
|
|
||||||
def has_pending_task(self):
|
def has_pending_task(self):
|
||||||
if self._tasks_queue or self._tasks or self._main_thread_exec_task:
|
if self._tasks_queue or self._tasks or self._main_thread_exec_task:
|
||||||
|
@ -144,7 +144,8 @@ class ServiceRouterTest(test_nicira_plugin.NiciraL3NatTest,
|
|||||||
manager.show_pending_tasks()
|
manager.show_pending_tasks()
|
||||||
raise Exception(_("Tasks not completed"))
|
raise Exception(_("Tasks not completed"))
|
||||||
manager.stop()
|
manager.stop()
|
||||||
|
# Ensure the manager thread has been stopped
|
||||||
|
self.assertIsNone(manager._thread)
|
||||||
super(ServiceRouterTest, self).tearDown()
|
super(ServiceRouterTest, self).tearDown()
|
||||||
|
|
||||||
def _create_router(self, fmt, tenant_id, name=None,
|
def _create_router(self, fmt, tenant_id, name=None,
|
||||||
|
@ -45,6 +45,9 @@ class VcnsDriverTaskManagerTestCase(base.BaseTestCase):
|
|||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.manager.stop()
|
self.manager.stop()
|
||||||
|
# Task manager should not leave running threads around
|
||||||
|
# if _thread is None it means it was killed in stop()
|
||||||
|
self.assertIsNone(self.manager._thread)
|
||||||
super(VcnsDriverTaskManagerTestCase, self).tearDown()
|
super(VcnsDriverTaskManagerTestCase, self).tearDown()
|
||||||
|
|
||||||
def _test_task_manager_task_process_state(self, sync_exec=False):
|
def _test_task_manager_task_process_state(self, sync_exec=False):
|
||||||
@ -222,6 +225,9 @@ class VcnsDriverTaskManagerTestCase(base.BaseTestCase):
|
|||||||
|
|
||||||
manager = ts.TaskManager().start(100)
|
manager = ts.TaskManager().start(100)
|
||||||
manager.stop()
|
manager.stop()
|
||||||
|
# Task manager should not leave running threads around
|
||||||
|
# if _thread is None it means it was killed in stop()
|
||||||
|
self.assertIsNone(manager._thread)
|
||||||
manager.start(100)
|
manager.start(100)
|
||||||
|
|
||||||
alltasks = {}
|
alltasks = {}
|
||||||
@ -236,6 +242,9 @@ class VcnsDriverTaskManagerTestCase(base.BaseTestCase):
|
|||||||
|
|
||||||
greenthread.sleep(stop_wait)
|
greenthread.sleep(stop_wait)
|
||||||
manager.stop()
|
manager.stop()
|
||||||
|
# Task manager should not leave running threads around
|
||||||
|
# if _thread is None it means it was killed in stop()
|
||||||
|
self.assertIsNone(manager._thread)
|
||||||
|
|
||||||
for res, tasks in alltasks.iteritems():
|
for res, tasks in alltasks.iteritems():
|
||||||
for task in tasks:
|
for task in tasks:
|
||||||
@ -325,6 +334,9 @@ class VcnsDriverTestCase(base.BaseTestCase):
|
|||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.vcns_driver.task_manager.stop()
|
self.vcns_driver.task_manager.stop()
|
||||||
|
# Task manager should not leave running threads around
|
||||||
|
# if _thread is None it means it was killed in stop()
|
||||||
|
self.assertIsNone(self.vcns_driver.task_manager._thread)
|
||||||
super(VcnsDriverTestCase, self).tearDown()
|
super(VcnsDriverTestCase, self).tearDown()
|
||||||
|
|
||||||
def _deploy_edge(self):
|
def _deploy_edge(self):
|
||||||
|
Loading…
Reference in New Issue
Block a user