Merge "Fix IF checks on spawned green thread instance"

This commit is contained in:
Jenkins 2013-09-07 16:57:54 +00:00 committed by Gerrit Code Review
commit 57d7e7ff5a
2 changed files with 62 additions and 25 deletions

View File

@ -169,12 +169,11 @@ class TaskManager():
self._req = event.Event()
# TaskHandler stopped event
self._stopped = event.Event()
self._stopped = False
# Periodic function trigger
self._monitor = None
self._monitor_busy = False
self._monitor_stop = None
# Thread handling the task request
self._thread = None
@ -221,8 +220,8 @@ class TaskManager():
def _check_pending_tasks(self):
"""Check all pending tasks status."""
for resource_id in self._tasks.keys():
if self._monitor_stop:
# looping call is asked to stop, return now
if self._stopped:
# Task manager is stopped, return now
return
tasks = self._tasks[resource_id]
@ -270,6 +269,12 @@ class TaskManager():
def _abort(self):
"""Abort all tasks."""
# put all tasks haven't been received by main thread to queue
# so the following abort handling can cover them
for t in self._tasks_queue:
self._enqueue(t)
self._tasks_queue.clear()
for resource_id in self._tasks.keys():
tasks = list(self._tasks[resource_id])
for task in tasks:
@ -287,6 +292,15 @@ class TaskManager():
def run(self):
while True:
try:
if self._stopped:
# Somehow greenlet.GreenletExit exception is ignored
# during unit-test when self._execute() is making db
# access. This makes this thread not terminating and
# stop() caller wait indefinitely. So we added a check
# here before trying to do a block call on getting a
# task from queue
break
# get a task from queue, or timeout for periodic status check
task = self._get_task()
if task.resource_id in self._tasks:
@ -295,13 +309,17 @@ class TaskManager():
self._enqueue(task)
continue
status = self._execute(task)
if status != TaskStatus.PENDING:
self._result(task)
continue
self._enqueue(task)
try:
self._execute(task)
finally:
if task.status is None:
# The thread is killed during _execute(). To guarantee
# the task been aborted correctly, put it to the queue.
self._enqueue(task)
elif task.status != TaskStatus.PENDING:
self._result(task)
else:
self._enqueue(task)
except greenlet.GreenletExit:
break
except Exception:
@ -310,11 +328,8 @@ class TaskManager():
self._monitor.stop()
if self._monitor_busy:
self._monitor_stop = event.Event()
self._monitor_stop.wait()
self._monitor_stop = None
self._monitor.wait()
self._abort()
self._stopped.send()
def add(self, task):
task.id = uuid.uuid1()
@ -324,10 +339,11 @@ class TaskManager():
return task.id
def stop(self):
if not self._thread:
if self._thread is None:
return
self._stopped = True
self._thread.kill()
self._stopped.wait()
self._thread.wait()
self._thread = None
def has_pending_task(self):
@ -357,26 +373,27 @@ class TaskManager():
self.run()
def _loopingcall_callback():
self._monitor_busy = True
try:
self._monitor_busy = True
self._check_pending_tasks()
self._monitor_busy = False
if self._monitor_stop:
self._monitor_stop.send()
except Exception:
LOG.exception(_("Exception in _check_pending_tasks"))
self._monitor_busy = False
if self._thread:
if self._thread is not None:
return self
if interval is None or interval == 0:
interval = self._interval
self._stopped = False
self._thread = greenthread.spawn(_inner)
self._monitor = loopingcall.FixedIntervalLoopingCall(
_loopingcall_callback)
self._monitor.start(interval / 1000.0,
interval / 1000.0)
# To allow the created thread start running
greenthread.sleep(0)
return self

View File

@ -212,18 +212,25 @@ class VcnsDriverTaskManagerTestCase(base.BaseTestCase):
task.wait(TaskState.RESULT)
self.assertTrue(task.userdata['result'])
def test_task_manager_stop(self):
def _test_task_manager_stop(self, exec_wait=False, result_wait=False,
stop_wait=0):
def _exec(task):
if exec_wait:
greenthread.sleep(0.01)
return TaskStatus.PENDING
def _status(task):
greenthread.sleep(0.1)
greenthread.sleep(0.01)
return TaskStatus.PENDING
def _result(task):
if result_wait:
greenthread.sleep(0)
pass
manager = ts.TaskManager().start(100)
manager.stop()
manager.start(100)
alltasks = {}
for i in range(100):
@ -235,12 +242,25 @@ class VcnsDriverTaskManagerTestCase(base.BaseTestCase):
tasks.append(task)
alltasks[res] = tasks
greenthread.sleep(2)
greenthread.sleep(stop_wait)
manager.stop()
for res, tasks in alltasks.iteritems():
for task in tasks:
self.assertEqual(task.status, TaskStatus.ABORT)
def test_task_manager_stop_1(self):
self._test_task_manager_stop(True, True, 0)
def test_task_manager_stop_2(self):
self._test_task_manager_stop(True, True, 1)
def test_task_manager_stop_3(self):
self._test_task_manager_stop(False, False, 0)
def test_task_manager_stop_4(self):
self._test_task_manager_stop(False, False, 1)
class VcnsDriverTestCase(base.BaseTestCase):