Fix random unit-test failure for NVP advanced plugin

Two problems were observed:
1. Tasks are not scheduled as desired in greenthread. The fix is to
   increase the max status probing time from 1 second to 10 seconds.
2. An error message is show in unit-test log output. Turns out in
   addition to a bug that may cause task manager stop prematurely,
   another unit-test case test_vcns_driver can cause same error
   message being displayed in the log.

Change-Id: I697dd8fc509308108ff1f40400f36ac6271bf4bb
Close-Bug: 1245698
This commit is contained in:
Kaiwei Fan 2013-10-28 15:47:51 -07:00
parent 99da250353
commit a52ef6ecf1
3 changed files with 50 additions and 17 deletions

View File

@ -15,8 +15,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import collections
import uuid
@ -167,6 +165,9 @@ class TaskManager():
# A dict to store resource -> resource's tasks
self._tasks = {}
# Current task being executed in main thread
self._main_thread_exec_task = None
# New request event
self._req = event.Event()
@ -311,8 +312,10 @@ class TaskManager():
continue
try:
self._main_thread_exec_task = task
self._execute(task)
finally:
self._main_thread_exec_task = None
if task.status is None:
# The thread is killed during _execute(). To guarantee
# the task been aborted correctly, put it to the queue.
@ -348,20 +351,19 @@ class TaskManager():
self._thread = None
def has_pending_task(self):
if self._tasks_queue:
if self._tasks_queue or self._tasks or self._main_thread_exec_task:
return True
if self._tasks:
return True
return False
else:
return False
def show_pending_tasks(self):
for task in self._tasks_queue:
print(str(task))
LOG.info(str(task))
for resource, tasks in self._tasks.iteritems():
for task in tasks:
print(str(task))
LOG.info(str(task))
if self._main_thread_exec_task:
LOG.info(str(self._main_thread_exec_task))
def count(self):
count = 0

View File

@ -135,7 +135,8 @@ class ServiceRouterTest(test_nicira_plugin.NiciraL3NatTest,
def tearDown(self):
plugin = NeutronManager.get_plugin()
manager = plugin.vcns_driver.task_manager
for i in range(20):
# wait max ~10 seconds for all tasks to be finished
for i in range(100):
if not manager.has_pending_task():
break
greenthread.sleep(0.1)
@ -183,8 +184,8 @@ class ServiceRouterTestCase(ServiceRouterTest, NvpRouterTestCase):
for k, v in expected_value_1:
self.assertEqual(router['router'][k], v)
# wait ~1 seconds for router status update
for i in range(2):
# wait max ~10 seconds for router status update
for i in range(20):
greenthread.sleep(0.5)
res = self._show('routers', router['router']['id'])
if res['router']['status'] == 'ACTIVE':

View File

@ -253,6 +253,31 @@ class VcnsDriverTaskManagerTestCase(base.BaseTestCase):
def test_task_manager_stop_4(self):
self._test_task_manager_stop(False, False, 1)
def test_task_pending_task(self):
def _exec(task):
task.userdata['executing'] = True
while not task.userdata['tested']:
greenthread.sleep(0)
task.userdata['executing'] = False
return TaskStatus.COMPLETED
userdata = {
'executing': False,
'tested': False
}
manager = ts.TaskManager().start(100)
task = ts.Task('name', 'res', _exec, userdata=userdata)
manager.add(task)
while not userdata['executing']:
greenthread.sleep(0)
self.assertTrue(manager.has_pending_task())
userdata['tested'] = True
while userdata['executing']:
greenthread.sleep(0)
self.assertFalse(manager.has_pending_task())
class VcnsDriverTestCase(base.BaseTestCase):
@ -298,6 +323,10 @@ class VcnsDriverTestCase(base.BaseTestCase):
self.edge_id = None
self.result = None
def tearDown(self):
self.vcns_driver.task_manager.stop()
super(VcnsDriverTestCase, self).tearDown()
def _deploy_edge(self):
task = self.vcns_driver.deploy_edge(
'router-id', 'myedge', 'internal-network', {}, wait_for_exec=True)
@ -355,12 +384,13 @@ class VcnsDriverTestCase(base.BaseTestCase):
self.assertTrue(jobdata.get('edge_deploy_result'))
def test_deploy_edge_fail(self):
self.vcns_driver.deploy_edge(
task1 = self.vcns_driver.deploy_edge(
'router-1', 'myedge', 'internal-network', {}, wait_for_exec=True)
task = self.vcns_driver.deploy_edge(
task2 = self.vcns_driver.deploy_edge(
'router-2', 'myedge', 'internal-network', {}, wait_for_exec=True)
task.wait(TaskState.RESULT)
self.assertEqual(task.status, TaskStatus.ERROR)
task1.wait(TaskState.RESULT)
task2.wait(TaskState.RESULT)
self.assertEqual(task2.status, TaskStatus.ERROR)
def test_get_edge_status(self):
self._deploy_edge()