diff --git a/neutron/services/loadbalancer/drivers/radware/driver.py b/neutron/services/loadbalancer/drivers/radware/driver.py index dae5d14454..cb159f7c84 100644 --- a/neutron/services/loadbalancer/drivers/radware/driver.py +++ b/neutron/services/loadbalancer/drivers/radware/driver.py @@ -177,7 +177,7 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver): plugin) self.workflow_templates_exists = False self.completion_handler.setDaemon(True) - self.completion_handler.start() + self.completion_handler_started = False def create_vip(self, context, vip): LOG.debug(_('create_vip. vip: %s'), str(vip)) @@ -340,6 +340,12 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver): context, extended_vip['subnet_id']) return subnet['network_id'] + def _start_completion_handling_thread(self): + if not self.completion_handler_started: + LOG.info(_('Starting operation completion handling thread')) + self.completion_handler.start() + self.completion_handler_started = True + @call_log.log def _update_workflow(self, wf_name, action, wf_params, context, @@ -371,6 +377,8 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver): entity_id, delete=delete) LOG.debug(_('Pushing operation %s to the queue'), oper) + + self._start_completion_handling_thread() self.queue.put_nowait(oper) def _remove_workflow(self, ids, context): @@ -391,6 +399,8 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver): ids['vip'], delete=True) LOG.debug(_('Pushing operation %s to the queue'), oper) + + self._start_completion_handling_thread() self.queue.put_nowait(oper) def _remove_service(self, service_name): @@ -619,24 +629,52 @@ class OperationCompletionHandler(threading.Thread): self.stoprequest = threading.Event() self.opers_to_handle_before_rest = 0 - def _get_db_status(self, operation, success, messages=None): - """Get the db_status based on the status of the vdirect operation.""" - if not success: - # we have a failure - log it and set the return ERROR as DB state - msg = ', '.join(messages) if messages else "unknown" - error_params = {"operation": operation, "msg": msg} - LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'), - error_params) - return constants.ERROR - if operation.delete: - return None - else: - return constants.ACTIVE - def join(self, timeout=None): self.stoprequest.set() super(OperationCompletionHandler, self).join(timeout) + def handle_operation_completion(self, oper): + result = self.rest_client.call('GET', + oper.operation_url, + None, + None) + completed = result[RESP_DATA]['complete'] + reason = result[RESP_REASON], + description = result[RESP_STR] + if completed: + # operation is done - update the DB with the status + # or delete the entire graph from DB + success = result[RESP_DATA]['success'] + sec_to_completion = time.time() - oper.creation_time + debug_data = {'oper': oper, + 'sec_to_completion': sec_to_completion, + 'success': success} + LOG.debug(_('Operation %(oper)s is completed after ' + '%(sec_to_completion)d sec ' + 'with success status: %(success)s :'), + debug_data) + db_status = None + if not success: + # failure - log it and set the return ERROR as DB state + if reason or description: + msg = 'Reason:%s. Description:%s' % (reason, description) + else: + msg = "unknown" + error_params = {"operation": oper, "msg": msg} + LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'), + error_params) + db_status = constants.ERROR + else: + if oper.delete: + _remove_object_from_db(self.plugin, oper) + else: + db_status = constants.ACTIVE + + if db_status: + _update_vip_graph_status(self.plugin, oper, db_status) + + return completed + def run(self): oper = None while not self.stoprequest.isSet(): @@ -653,31 +691,7 @@ class OperationCompletionHandler(threading.Thread): str(oper)) # check the status - if oper is done: update the db , # else push the oper again to the queue - result = self.rest_client.call('GET', - oper.operation_url, - None, - None) - completed = result[RESP_DATA]['complete'] - if completed: - # operation is done - update the DB with the status - # or delete the entire graph from DB - success = result[RESP_DATA]['success'] - sec_to_completion = time.time() - oper.creation_time - debug_data = {'oper': oper, - 'sec_to_completion': sec_to_completion, - 'success': success} - LOG.debug(_('Operation %(oper)s is completed after ' - '%(sec_to_completion)d sec ' - 'with success status: %(success)s :'), - debug_data) - db_status = self._get_db_status(oper, success) - if db_status: - _update_vip_graph_status( - self.plugin, oper, db_status) - else: - _remove_object_from_db( - self.plugin, oper) - else: + if not self.handle_operation_completion(oper): LOG.debug(_('Operation %s is not completed yet..') % oper) # Not completed - push to the queue again self.queue.put_nowait(oper) diff --git a/neutron/tests/unit/services/loadbalancer/drivers/radware/test_plugin_driver.py b/neutron/tests/unit/services/loadbalancer/drivers/radware/test_plugin_driver.py index 4d24440626..ffbb9866b4 100644 --- a/neutron/tests/unit/services/loadbalancer/drivers/radware/test_plugin_driver.py +++ b/neutron/tests/unit/services/loadbalancer/drivers/radware/test_plugin_driver.py @@ -16,10 +16,10 @@ # # @author: Avishay Balderman, Radware +import Queue import re import contextlib -import eventlet import mock from neutron import context @@ -34,8 +34,16 @@ from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer GET_200 = ('/api/workflow/', '/api/service/', '/api/workflowTemplate') -def rest_call_function_mock(action, resource, data, headers, binary=False): +class QueueMock(Queue.Queue): + def __init__(self, completion_handler): + self.completion_handler = completion_handler + super(QueueMock, self).__init__() + def put_nowait(self, oper): + self.completion_handler(oper) + + +def rest_call_function_mock(action, resource, data, headers, binary=False): if rest_call_function_mock.RESPOND_WITH_ERROR: return 400, 'error_status', 'error_description', None @@ -107,13 +115,24 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): rest_call_function_mock.__dict__.update( {'TEMPLATES_MISSING': False}) - self.rest_call_mock = mock.Mock(name='rest_call_mock', - side_effect=rest_call_function_mock, - spec=self.plugin_instance. - drivers['radware']. - rest_client.call) + self.operation_completer_start_mock = mock.Mock( + return_value=None) + self.operation_completer_join_mock = mock.Mock( + return_value=None) + self.driver_rest_call_mock = mock.Mock( + side_effect=rest_call_function_mock) + radware_driver = self.plugin_instance.drivers['radware'] - radware_driver.rest_client.call = self.rest_call_mock + radware_driver.completion_handler.start = ( + self.operation_completer_start_mock) + radware_driver.completion_handler.join = ( + self.operation_completer_join_mock) + radware_driver.rest_client.call = self.driver_rest_call_mock + radware_driver.completion_handler.rest_client.call = ( + self.driver_rest_call_mock) + + radware_driver.queue = QueueMock( + radware_driver.completion_handler.handle_operation_completion) self.addCleanup(radware_driver.completion_handler.join) @@ -128,7 +147,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): def test_create_vip_failure(self): """Test the rest call failure handling by Exception raising.""" - self.rest_call_mock.reset_mock() with self.network(do_delete=False) as network: with self.subnet(network=network, do_delete=False) as subnet: with self.pool(no_delete=True, provider='radware') as pool: @@ -155,9 +173,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): {'vip': vip_data}) def test_create_vip(self): - self.skipTest("Skipping test till bug 1288312 is fixed") - - self.rest_call_mock.reset_mock() with self.subnet() as subnet: with self.pool(provider='radware') as pool: vip_data = { @@ -211,10 +226,8 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): mock.call('GET', '/api/workflow/' + pool['pool']['id'], None, None) ] - self.rest_call_mock.assert_has_calls(calls, any_order=True) - - # sleep to wait for the operation completion - eventlet.greenthread.sleep(0) + self.driver_rest_call_mock.assert_has_calls(calls, + any_order=True) #Test DB new_vip = self.plugin_instance.get_vip( @@ -232,12 +245,10 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): mock.call('DELETE', u'/api/workflow/' + pool['pool']['id'], None, None) ] - self.rest_call_mock.assert_has_calls(calls, any_order=True) - # need to wait some time to allow driver to delete vip - eventlet.greenthread.sleep(1) + self.driver_rest_call_mock.assert_has_calls( + calls, any_order=True) def test_update_vip(self): - self.rest_call_mock.reset_mock() with self.subnet() as subnet: with self.pool(provider='radware', no_delete=True) as pool: vip_data = { @@ -268,15 +279,9 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): '/action/BaseCreate', mock.ANY, driver.TEMPLATE_HEADER), ] - self.rest_call_mock.assert_has_calls(calls, any_order=True) + self.driver_rest_call_mock.assert_has_calls( + calls, any_order=True) - updated_vip = self.plugin_instance.get_vip( - context.get_admin_context(), vip['id']) - self.assertEqual(updated_vip['status'], - constants.PENDING_UPDATE) - - # sleep to wait for the operation completion - eventlet.greenthread.sleep(1) updated_vip = self.plugin_instance.get_vip( context.get_admin_context(), vip['id']) self.assertEqual(updated_vip['status'], constants.ACTIVE) @@ -286,7 +291,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): context.get_admin_context(), vip['id']) def test_delete_vip_failure(self): - self.rest_call_mock.reset_mock() plugin = self.plugin_instance with self.network(do_delete=False) as network: @@ -306,8 +310,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): context.get_admin_context(), hm, pool['pool']['id'] ) - eventlet.greenthread.sleep(1) - rest_call_function_mock.__dict__.update( {'RESPOND_WITH_ERROR': True}) @@ -333,7 +335,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): self.assertEqual(u_phm['status'], constants.ACTIVE) def test_delete_vip(self): - self.rest_call_mock.reset_mock() with self.subnet() as subnet: with self.pool(provider='radware', no_delete=True) as pool: vip_data = { @@ -360,14 +361,14 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): mock.call('DELETE', '/api/workflow/' + pool['pool']['id'], None, None) ] - self.rest_call_mock.assert_has_calls(calls, any_order=True) + self.driver_rest_call_mock.assert_has_calls( + calls, any_order=True) self.assertRaises(loadbalancer.VipNotFound, self.plugin_instance.get_vip, context.get_admin_context(), vip['id']) def test_update_pool(self): - self.rest_call_mock.reset_mock() with self.subnet(): with self.pool() as pool: del pool['pool']['provider'] @@ -380,7 +381,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): self.assertEqual(pool_db['status'], constants.PENDING_UPDATE) def test_delete_pool_with_vip(self): - self.rest_call_mock.reset_mock() with self.subnet() as subnet: with self.pool(provider='radware', no_delete=True) as pool: with self.vip(pool=pool, subnet=subnet): @@ -390,7 +390,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): pool['pool']['id']) def test_create_member_with_vip(self): - self.rest_call_mock.reset_mock() with self.subnet() as subnet: with self.pool(provider='radware') as p: with self.vip(pool=p, subnet=subnet): @@ -407,11 +406,10 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): mock.ANY, driver.TEMPLATE_HEADER ) ] - self.rest_call_mock.assert_has_calls(calls, - any_order=True) + self.driver_rest_call_mock.assert_has_calls( + calls, any_order=True) def test_update_member_with_vip(self): - self.rest_call_mock.reset_mock() with self.subnet() as subnet: with self.pool(provider='radware') as p: with self.member(pool_id=p['pool']['id']) as member: @@ -432,16 +430,14 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): mock.ANY, driver.TEMPLATE_HEADER ) ] - self.rest_call_mock.assert_has_calls(calls, - any_order=True) + self.driver_rest_call_mock.assert_has_calls( + calls, any_order=True) updated_member = self.plugin_instance.get_member( context.get_admin_context(), member['member']['id'] ) - # sleep to wait for the operation completion - eventlet.greenthread.sleep(0) updated_member = self.plugin_instance.get_member( context.get_admin_context(), member['member']['id'] @@ -450,7 +446,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): constants.ACTIVE) def test_update_member_without_vip(self): - self.rest_call_mock.reset_mock() with self.subnet(): with self.pool(provider='radware') as pool: with self.member(pool_id=pool['pool']['id']) as member: @@ -463,7 +458,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): constants.PENDING_UPDATE) def test_delete_member_with_vip(self): - self.rest_call_mock.reset_mock() with self.subnet() as subnet: with self.pool(provider='radware') as p: with self.member(pool_id=p['pool']['id'], @@ -474,21 +468,22 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): # wait for being sure the member # Changed status from PENDING-CREATE # to ACTIVE - self.rest_call_mock.reset_mock() - eventlet.greenthread.sleep(1) self.plugin_instance.delete_member( context.get_admin_context(), m['member']['id'] ) - args, kwargs = self.rest_call_mock.call_args + name, args, kwargs = ( + self.driver_rest_call_mock.mock_calls[-2] + ) deletion_post_graph = str(args[2]) self.assertTrue(re.search( r'.*\'member_address_array\': \[\].*', deletion_post_graph )) + calls = [ mock.call( 'POST', '/api/workflow/' + p['pool']['id'] + @@ -496,17 +491,15 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): mock.ANY, driver.TEMPLATE_HEADER ) ] - self.rest_call_mock.assert_has_calls( + self.driver_rest_call_mock.assert_has_calls( calls, any_order=True) - eventlet.greenthread.sleep(1) self.assertRaises(loadbalancer.MemberNotFound, self.plugin_instance.get_member, context.get_admin_context(), m['member']['id']) def test_delete_member_without_vip(self): - self.rest_call_mock.reset_mock() with self.subnet(): with self.pool(provider='radware') as p: with self.member(pool_id=p['pool']['id'], no_delete=True) as m: @@ -519,7 +512,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): m['member']['id']) def test_create_hm_with_vip(self): - self.rest_call_mock.reset_mock() with self.subnet() as subnet: with self.health_monitor() as hm: with self.pool(provider='radware') as pool: @@ -543,11 +535,9 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): mock.ANY, driver.TEMPLATE_HEADER ) ] - self.rest_call_mock.assert_has_calls( + self.driver_rest_call_mock.assert_has_calls( calls, any_order=True) - eventlet.greenthread.sleep(1) - phm = self.plugin_instance.get_pool_health_monitor( context.get_admin_context(), hm['health_monitor']['id'], pool['pool']['id'] @@ -555,7 +545,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): self.assertEqual(phm['status'], constants.ACTIVE) def test_delete_pool_hm_with_vip(self): - self.rest_call_mock.reset_mock() with self.subnet() as subnet: with self.health_monitor(no_delete=True) as hm: with self.pool(provider='radware') as pool: @@ -565,21 +554,15 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): hm, pool['pool']['id'] ) - # Reset mock and - # wait for being sure that status - # changed from PENDING-CREATE - # to ACTIVE - self.rest_call_mock.reset_mock() - eventlet.greenthread.sleep(1) - self.plugin_instance.delete_pool_health_monitor( context.get_admin_context(), hm['health_monitor']['id'], pool['pool']['id'] ) - eventlet.greenthread.sleep(1) - name, args, kwargs = self.rest_call_mock.mock_calls[-2] + name, args, kwargs = ( + self.driver_rest_call_mock.mock_calls[-2] + ) deletion_post_graph = str(args[2]) self.assertTrue(re.search( @@ -594,7 +577,7 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase): mock.ANY, driver.TEMPLATE_HEADER ) ] - self.rest_call_mock.assert_has_calls( + self.driver_rest_call_mock.assert_has_calls( calls, any_order=True) self.assertRaises(