Cancelling thread start while unit tests running

This change modifies the Radware driver and its unit testing code
to not start operations completion thread while unit tests are running.

The driver initialization changed not to start the operations completion thread,
the thread is started only when operation completion item is inserted into the queue
for the first time.
The operation completion functionality was moved to a new function which
is called by the operations completion thread run() function.
The run() function still have the functionality of popping operation completion
items out of the queue and push failed items back.

Unit testing code mocks the operation completion items queue
by calling the operations completion hanler new function when item
is added.

Start() and join() functions of the thread were mocked to do nothing.

All sleep() entrances were removed from the unit testing code.
All unnecessary mock_reset() calls were removed.

Change-Id: I72380bf223be690831aba1fc29c3dca910245516
Closes-Bug: #1245208
This commit is contained in:
Evgeny Fedoruk 2014-03-18 10:55:11 -07:00
parent 1e81fdb1ea
commit 4c7b59c7bb
2 changed files with 103 additions and 106 deletions

View File

@ -177,7 +177,7 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
plugin)
self.workflow_templates_exists = False
self.completion_handler.setDaemon(True)
self.completion_handler.start()
self.completion_handler_started = False
def create_vip(self, context, vip):
LOG.debug(_('create_vip. vip: %s'), str(vip))
@ -340,6 +340,12 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
context, extended_vip['subnet_id'])
return subnet['network_id']
def _start_completion_handling_thread(self):
if not self.completion_handler_started:
LOG.info(_('Starting operation completion handling thread'))
self.completion_handler.start()
self.completion_handler_started = True
@call_log.log
def _update_workflow(self, wf_name, action,
wf_params, context,
@ -371,6 +377,8 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
entity_id,
delete=delete)
LOG.debug(_('Pushing operation %s to the queue'), oper)
self._start_completion_handling_thread()
self.queue.put_nowait(oper)
def _remove_workflow(self, ids, context):
@ -391,6 +399,8 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
ids['vip'],
delete=True)
LOG.debug(_('Pushing operation %s to the queue'), oper)
self._start_completion_handling_thread()
self.queue.put_nowait(oper)
def _remove_service(self, service_name):
@ -619,24 +629,52 @@ class OperationCompletionHandler(threading.Thread):
self.stoprequest = threading.Event()
self.opers_to_handle_before_rest = 0
def _get_db_status(self, operation, success, messages=None):
"""Get the db_status based on the status of the vdirect operation."""
if not success:
# we have a failure - log it and set the return ERROR as DB state
msg = ', '.join(messages) if messages else "unknown"
error_params = {"operation": operation, "msg": msg}
LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'),
error_params)
return constants.ERROR
if operation.delete:
return None
else:
return constants.ACTIVE
def join(self, timeout=None):
self.stoprequest.set()
super(OperationCompletionHandler, self).join(timeout)
def handle_operation_completion(self, oper):
result = self.rest_client.call('GET',
oper.operation_url,
None,
None)
completed = result[RESP_DATA]['complete']
reason = result[RESP_REASON],
description = result[RESP_STR]
if completed:
# operation is done - update the DB with the status
# or delete the entire graph from DB
success = result[RESP_DATA]['success']
sec_to_completion = time.time() - oper.creation_time
debug_data = {'oper': oper,
'sec_to_completion': sec_to_completion,
'success': success}
LOG.debug(_('Operation %(oper)s is completed after '
'%(sec_to_completion)d sec '
'with success status: %(success)s :'),
debug_data)
db_status = None
if not success:
# failure - log it and set the return ERROR as DB state
if reason or description:
msg = 'Reason:%s. Description:%s' % (reason, description)
else:
msg = "unknown"
error_params = {"operation": oper, "msg": msg}
LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'),
error_params)
db_status = constants.ERROR
else:
if oper.delete:
_remove_object_from_db(self.plugin, oper)
else:
db_status = constants.ACTIVE
if db_status:
_update_vip_graph_status(self.plugin, oper, db_status)
return completed
def run(self):
oper = None
while not self.stoprequest.isSet():
@ -653,31 +691,7 @@ class OperationCompletionHandler(threading.Thread):
str(oper))
# check the status - if oper is done: update the db ,
# else push the oper again to the queue
result = self.rest_client.call('GET',
oper.operation_url,
None,
None)
completed = result[RESP_DATA]['complete']
if completed:
# operation is done - update the DB with the status
# or delete the entire graph from DB
success = result[RESP_DATA]['success']
sec_to_completion = time.time() - oper.creation_time
debug_data = {'oper': oper,
'sec_to_completion': sec_to_completion,
'success': success}
LOG.debug(_('Operation %(oper)s is completed after '
'%(sec_to_completion)d sec '
'with success status: %(success)s :'),
debug_data)
db_status = self._get_db_status(oper, success)
if db_status:
_update_vip_graph_status(
self.plugin, oper, db_status)
else:
_remove_object_from_db(
self.plugin, oper)
else:
if not self.handle_operation_completion(oper):
LOG.debug(_('Operation %s is not completed yet..') % oper)
# Not completed - push to the queue again
self.queue.put_nowait(oper)

View File

@ -16,10 +16,10 @@
#
# @author: Avishay Balderman, Radware
import Queue
import re
import contextlib
import eventlet
import mock
from neutron import context
@ -34,8 +34,16 @@ from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
GET_200 = ('/api/workflow/', '/api/service/', '/api/workflowTemplate')
def rest_call_function_mock(action, resource, data, headers, binary=False):
class QueueMock(Queue.Queue):
def __init__(self, completion_handler):
self.completion_handler = completion_handler
super(QueueMock, self).__init__()
def put_nowait(self, oper):
self.completion_handler(oper)
def rest_call_function_mock(action, resource, data, headers, binary=False):
if rest_call_function_mock.RESPOND_WITH_ERROR:
return 400, 'error_status', 'error_description', None
@ -107,13 +115,24 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
rest_call_function_mock.__dict__.update(
{'TEMPLATES_MISSING': False})
self.rest_call_mock = mock.Mock(name='rest_call_mock',
side_effect=rest_call_function_mock,
spec=self.plugin_instance.
drivers['radware'].
rest_client.call)
self.operation_completer_start_mock = mock.Mock(
return_value=None)
self.operation_completer_join_mock = mock.Mock(
return_value=None)
self.driver_rest_call_mock = mock.Mock(
side_effect=rest_call_function_mock)
radware_driver = self.plugin_instance.drivers['radware']
radware_driver.rest_client.call = self.rest_call_mock
radware_driver.completion_handler.start = (
self.operation_completer_start_mock)
radware_driver.completion_handler.join = (
self.operation_completer_join_mock)
radware_driver.rest_client.call = self.driver_rest_call_mock
radware_driver.completion_handler.rest_client.call = (
self.driver_rest_call_mock)
radware_driver.queue = QueueMock(
radware_driver.completion_handler.handle_operation_completion)
self.addCleanup(radware_driver.completion_handler.join)
@ -128,7 +147,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
def test_create_vip_failure(self):
"""Test the rest call failure handling by Exception raising."""
self.rest_call_mock.reset_mock()
with self.network(do_delete=False) as network:
with self.subnet(network=network, do_delete=False) as subnet:
with self.pool(no_delete=True, provider='radware') as pool:
@ -155,9 +173,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
{'vip': vip_data})
def test_create_vip(self):
self.skipTest("Skipping test till bug 1288312 is fixed")
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware') as pool:
vip_data = {
@ -211,10 +226,8 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
mock.call('GET', '/api/workflow/' +
pool['pool']['id'], None, None)
]
self.rest_call_mock.assert_has_calls(calls, any_order=True)
# sleep to wait for the operation completion
eventlet.greenthread.sleep(0)
self.driver_rest_call_mock.assert_has_calls(calls,
any_order=True)
#Test DB
new_vip = self.plugin_instance.get_vip(
@ -232,12 +245,10 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
mock.call('DELETE', u'/api/workflow/' + pool['pool']['id'],
None, None)
]
self.rest_call_mock.assert_has_calls(calls, any_order=True)
# need to wait some time to allow driver to delete vip
eventlet.greenthread.sleep(1)
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
def test_update_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware', no_delete=True) as pool:
vip_data = {
@ -268,15 +279,9 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
'/action/BaseCreate',
mock.ANY, driver.TEMPLATE_HEADER),
]
self.rest_call_mock.assert_has_calls(calls, any_order=True)
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
updated_vip = self.plugin_instance.get_vip(
context.get_admin_context(), vip['id'])
self.assertEqual(updated_vip['status'],
constants.PENDING_UPDATE)
# sleep to wait for the operation completion
eventlet.greenthread.sleep(1)
updated_vip = self.plugin_instance.get_vip(
context.get_admin_context(), vip['id'])
self.assertEqual(updated_vip['status'], constants.ACTIVE)
@ -286,7 +291,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
context.get_admin_context(), vip['id'])
def test_delete_vip_failure(self):
self.rest_call_mock.reset_mock()
plugin = self.plugin_instance
with self.network(do_delete=False) as network:
@ -306,8 +310,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
context.get_admin_context(), hm, pool['pool']['id']
)
eventlet.greenthread.sleep(1)
rest_call_function_mock.__dict__.update(
{'RESPOND_WITH_ERROR': True})
@ -333,7 +335,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
self.assertEqual(u_phm['status'], constants.ACTIVE)
def test_delete_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware', no_delete=True) as pool:
vip_data = {
@ -360,14 +361,14 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
mock.call('DELETE', '/api/workflow/' + pool['pool']['id'],
None, None)
]
self.rest_call_mock.assert_has_calls(calls, any_order=True)
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
self.assertRaises(loadbalancer.VipNotFound,
self.plugin_instance.get_vip,
context.get_admin_context(), vip['id'])
def test_update_pool(self):
self.rest_call_mock.reset_mock()
with self.subnet():
with self.pool() as pool:
del pool['pool']['provider']
@ -380,7 +381,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
self.assertEqual(pool_db['status'], constants.PENDING_UPDATE)
def test_delete_pool_with_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware', no_delete=True) as pool:
with self.vip(pool=pool, subnet=subnet):
@ -390,7 +390,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
pool['pool']['id'])
def test_create_member_with_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware') as p:
with self.vip(pool=p, subnet=subnet):
@ -407,11 +406,10 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
mock.ANY, driver.TEMPLATE_HEADER
)
]
self.rest_call_mock.assert_has_calls(calls,
any_order=True)
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
def test_update_member_with_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware') as p:
with self.member(pool_id=p['pool']['id']) as member:
@ -432,16 +430,14 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
mock.ANY, driver.TEMPLATE_HEADER
)
]
self.rest_call_mock.assert_has_calls(calls,
any_order=True)
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
updated_member = self.plugin_instance.get_member(
context.get_admin_context(),
member['member']['id']
)
# sleep to wait for the operation completion
eventlet.greenthread.sleep(0)
updated_member = self.plugin_instance.get_member(
context.get_admin_context(),
member['member']['id']
@ -450,7 +446,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
constants.ACTIVE)
def test_update_member_without_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet():
with self.pool(provider='radware') as pool:
with self.member(pool_id=pool['pool']['id']) as member:
@ -463,7 +458,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
constants.PENDING_UPDATE)
def test_delete_member_with_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware') as p:
with self.member(pool_id=p['pool']['id'],
@ -474,21 +468,22 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
# wait for being sure the member
# Changed status from PENDING-CREATE
# to ACTIVE
self.rest_call_mock.reset_mock()
eventlet.greenthread.sleep(1)
self.plugin_instance.delete_member(
context.get_admin_context(),
m['member']['id']
)
args, kwargs = self.rest_call_mock.call_args
name, args, kwargs = (
self.driver_rest_call_mock.mock_calls[-2]
)
deletion_post_graph = str(args[2])
self.assertTrue(re.search(
r'.*\'member_address_array\': \[\].*',
deletion_post_graph
))
calls = [
mock.call(
'POST', '/api/workflow/' + p['pool']['id'] +
@ -496,17 +491,15 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
mock.ANY, driver.TEMPLATE_HEADER
)
]
self.rest_call_mock.assert_has_calls(
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
eventlet.greenthread.sleep(1)
self.assertRaises(loadbalancer.MemberNotFound,
self.plugin_instance.get_member,
context.get_admin_context(),
m['member']['id'])
def test_delete_member_without_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet():
with self.pool(provider='radware') as p:
with self.member(pool_id=p['pool']['id'], no_delete=True) as m:
@ -519,7 +512,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
m['member']['id'])
def test_create_hm_with_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.health_monitor() as hm:
with self.pool(provider='radware') as pool:
@ -543,11 +535,9 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
mock.ANY, driver.TEMPLATE_HEADER
)
]
self.rest_call_mock.assert_has_calls(
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
eventlet.greenthread.sleep(1)
phm = self.plugin_instance.get_pool_health_monitor(
context.get_admin_context(),
hm['health_monitor']['id'], pool['pool']['id']
@ -555,7 +545,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
self.assertEqual(phm['status'], constants.ACTIVE)
def test_delete_pool_hm_with_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.health_monitor(no_delete=True) as hm:
with self.pool(provider='radware') as pool:
@ -565,21 +554,15 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
hm, pool['pool']['id']
)
# Reset mock and
# wait for being sure that status
# changed from PENDING-CREATE
# to ACTIVE
self.rest_call_mock.reset_mock()
eventlet.greenthread.sleep(1)
self.plugin_instance.delete_pool_health_monitor(
context.get_admin_context(),
hm['health_monitor']['id'],
pool['pool']['id']
)
eventlet.greenthread.sleep(1)
name, args, kwargs = self.rest_call_mock.mock_calls[-2]
name, args, kwargs = (
self.driver_rest_call_mock.mock_calls[-2]
)
deletion_post_graph = str(args[2])
self.assertTrue(re.search(
@ -594,7 +577,7 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
mock.ANY, driver.TEMPLATE_HEADER
)
]
self.rest_call_mock.assert_has_calls(
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
self.assertRaises(