diff --git a/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py b/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py index e3e0e4feea..f0e79915ef 100644 --- a/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py +++ b/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py @@ -168,45 +168,10 @@ class MlnxEswitchRpcCallbacks(n_rpc.RpcCallback, self.eswitch.remove_network(network_id) def port_update(self, context, **kwargs): - LOG.debug(_("port_update received")) port = kwargs.get('port') - net_type = kwargs.get('network_type') - segmentation_id = kwargs.get('segmentation_id') - if not segmentation_id: - # compatibility with pre-Havana RPC vlan_id encoding - segmentation_id = kwargs.get('vlan_id') - physical_network = kwargs.get('physical_network') - net_id = port['network_id'] - if self.eswitch.vnic_port_exists(port['mac_address']): - if 'security_groups' in port: - self.sg_agent.refresh_firewall() - try: - if port['admin_state_up']: - self.eswitch.port_up(net_id, - net_type, - physical_network, - segmentation_id, - port['id'], - port['mac_address']) - # update plugin about port status - self.agent.plugin_rpc.update_device_up(self.context, - port['mac_address'], - self.agent.agent_id, - cfg.CONF.host) - else: - self.eswitch.port_down(net_id, - physical_network, - port['mac_address']) - # update plugin about port status - self.agent.plugin_rpc.update_device_down( - self.context, - port['mac_address'], - self.agent.agent_id, - cfg.CONF.host) - except n_rpc.MessagingTimeout: - LOG.error(_("RPC timeout while updating port %s"), port['id']) - else: - LOG.debug(_("No port %s defined on agent."), port['id']) + self.agent.add_port_update(port['mac_address']) + LOG.debug("port_update message processed for port with mac %s", + port['mac_address']) class MlnxEswitchPluginApi(agent_rpc.PluginApi, @@ -229,6 +194,8 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin): 'configurations': configurations, 'agent_type': q_constants.AGENT_TYPE_MLNX, 'start_flag': True} + # Stores port update notifications for processing in main rpc loop + self.updated_ports = set() self._setup_rpc() self.init_firewall() @@ -272,24 +239,27 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin): self._report_state) heartbeat.start(interval=report_interval) - def update_ports(self, registered_ports): - ports = self.eswitch.get_vnics_mac() - if ports == registered_ports: - return - added = ports - registered_ports - removed = registered_ports - ports - return {'current': ports, - 'added': added, - 'removed': removed} + def add_port_update(self, port): + self.updated_ports.add(port) + + def scan_ports(self, registered_ports, updated_ports_copy=None): + cur_ports = self.eswitch.get_vnics_mac() + port_info = {'current': cur_ports} + # Shouldn't process updates for not existing ports + port_info['updated'] = updated_ports_copy & cur_ports + port_info['added'] = cur_ports - registered_ports + port_info['removed'] = registered_ports - cur_ports + return port_info def process_network_ports(self, port_info): resync_a = False resync_b = False - if port_info.get('added'): - LOG.debug(_("Ports added!")) - resync_a = self.treat_devices_added(port_info['added']) - if port_info.get('removed'): - LOG.debug(_("Ports removed!")) + device_added_updated = port_info['added'] | port_info['updated'] + + if device_added_updated: + resync_a = self.treat_devices_added_or_updated( + device_added_updated) + if port_info['removed']: resync_b = self.treat_devices_removed(port_info['removed']) # If one of the above opertaions fails => resync with plugin return (resync_a | resync_b) @@ -311,7 +281,7 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin): else: LOG.debug(_("No port %s defined on agent."), port_id) - def treat_devices_added(self, devices): + def treat_devices_added_or_updated(self, devices): try: devs_details_list = self.plugin_rpc.get_devices_details_list( self.context, @@ -326,11 +296,11 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin): for dev_details in devs_details_list: device = dev_details['device'] - LOG.info(_("Adding port with mac %s"), device) + LOG.info(_("Adding or updating port with mac %s"), device) if 'port_id' in dev_details: LOG.info(_("Port %s updated"), device) - LOG.debug(_("Device details %s"), str(dev_details)) + LOG.debug("Device details %s", str(dev_details)) self.treat_vif_port(dev_details['port_id'], dev_details['device'], dev_details['network_id'], @@ -339,12 +309,16 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin): dev_details['segmentation_id'], dev_details['admin_state_up']) if dev_details.get('admin_state_up'): - self.plugin_rpc.update_device_up(self.context, - device, - self.agent_id) + LOG.debug("Setting status for %s to UP", device) + self.plugin_rpc.update_device_up( + self.context, device, self.agent_id) + else: + LOG.debug("Setting status for %s to DOWN", device) + self.plugin_rpc.update_device_down( + self.context, device, self.agent_id) else: - LOG.debug(_("Device with mac_address %s not defined " - "on Neutron Plugin"), device) + LOG.debug("Device with mac_address %s not defined " + "on Neutron Plugin", device) return False def treat_devices_removed(self, devices): @@ -369,27 +343,37 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin): self.eswitch.port_release(device) return resync + def _port_info_has_changes(self, port_info): + return (port_info['added'] or + port_info['removed'] or + port_info['updated']) + def daemon_loop(self): sync = True ports = set() + updated_ports_copy = set() LOG.info(_("eSwitch Agent Started!")) while True: - try: - start = time.time() - if sync: - LOG.info(_("Agent out of sync with plugin!")) - ports.clear() - sync = False + start = time.time() + if sync: + LOG.info(_("Agent out of sync with plugin!")) + ports.clear() + sync = False - port_info = self.update_ports(ports) - # notify plugin about port deltas - if port_info: - LOG.debug(_("Agent loop process devices!")) - # If treat devices fails - must resync with plugin + try: + updated_ports_copy = self.updated_ports + self.updated_ports = set() + port_info = self.scan_ports(ports, updated_ports_copy) + LOG.debug("Agent loop process devices!") + # If treat devices fails - must resync with plugin + ports = port_info['current'] + if self._port_info_has_changes(port_info): + LOG.debug("Starting to process devices in:%s", port_info) + # sync with upper/lower layers about port deltas sync = self.process_network_ports(port_info) - ports = port_info['current'] + except exceptions.RequestTimeout: LOG.exception(_("Request timeout in agent event loop " "eSwitchD is not responding - exiting...")) @@ -397,6 +381,7 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin): except Exception: LOG.exception(_("Error in agent event loop")) sync = True + self.updated_ports |= updated_ports_copy # sleep till end of polling interval elapsed = (time.time() - start) if (elapsed < self._polling_interval): diff --git a/neutron/tests/unit/mlnx/test_mlnx_neutron_agent.py b/neutron/tests/unit/mlnx/test_mlnx_neutron_agent.py index f45bece67e..8d67e04365 100644 --- a/neutron/tests/unit/mlnx/test_mlnx_neutron_agent.py +++ b/neutron/tests/unit/mlnx/test_mlnx_neutron_agent.py @@ -45,6 +45,23 @@ class TestEswichManager(base.BaseTestCase): self.manager.get_port_id_by_mac('no-such-mac') +class TestMlnxEswitchRpcCallbacks(base.BaseTestCase): + + def setUp(self): + super(TestMlnxEswitchRpcCallbacks, self).setUp() + agent = mock.Mock() + self.rpc_callbacks = eswitch_neutron_agent.MlnxEswitchRpcCallbacks( + 'context', + agent + ) + + def test_port_update(self): + port = {'mac_address': '10:20:30:40:50:60'} + add_port_update = self.rpc_callbacks.agent.add_port_update + self.rpc_callbacks.port_update('context', port=port) + add_port_update.assert_called_once_with(port['mac_address']) + + class TestEswitchAgent(base.BaseTestCase): def setUp(self): @@ -82,9 +99,9 @@ class TestEswitchAgent(base.BaseTestCase): mock.patch('neutron.plugins.mlnx.agent.eswitch_neutron_agent.' 'EswitchManager.get_vnics_mac', return_value=[])): - self.assertTrue(self.agent.treat_devices_added([{}])) + self.assertTrue(self.agent.treat_devices_added_or_updated([{}])) - def _mock_treat_devices_added(self, details, func_name): + def _mock_treat_devices_added_updated(self, details, func_name): """Mock treat devices added. :param details: the details to return for the device @@ -101,14 +118,14 @@ class TestEswitchAgent(base.BaseTestCase): mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), mock.patch.object(self.agent, func_name) ) as (vnics_fn, get_dev_fn, upd_dev_up, func): - self.assertFalse(self.agent.treat_devices_added([{}])) + self.assertFalse(self.agent.treat_devices_added_or_updated([{}])) return (func.called, upd_dev_up.called) def test_treat_devices_added_updates_known_port(self): details = mock.MagicMock() details.__contains__.side_effect = lambda x: True - func, dev_up = self._mock_treat_devices_added(details, - 'treat_vif_port') + func, dev_up = self._mock_treat_devices_added_updated(details, + 'treat_vif_port') self.assertTrue(func) self.assertTrue(dev_up) @@ -120,8 +137,8 @@ class TestEswitchAgent(base.BaseTestCase): 'physical_network': 'default', 'segmentation_id': 2, 'admin_state_up': False} - func, dev_up = self._mock_treat_devices_added(details, - 'treat_vif_port') + func, dev_up = self._mock_treat_devices_added_updated(details, + 'treat_vif_port') self.assertTrue(func) self.assertFalse(dev_up) @@ -139,17 +156,75 @@ class TestEswitchAgent(base.BaseTestCase): self.assertFalse(self.agent.treat_devices_removed([{}])) self.assertTrue(port_release.called) + def _test_process_network_ports(self, port_info): + with contextlib.nested( + mock.patch.object(self.agent, 'treat_devices_added_or_updated', + return_value=False), + mock.patch.object(self.agent, 'treat_devices_removed', + return_value=False) + ) as (device_added_updated, device_removed): + self.assertFalse(self.agent.process_network_ports(port_info)) + device_added_updated.assert_called_once_with( + port_info['added'] | port_info['updated']) + device_removed.assert_called_once_with(port_info['removed']) + def test_process_network_ports(self): - current_ports = set(['01:02:03:04:05:06']) - added_ports = set(['10:20:30:40:50:60']) - removed_ports = set(['11:22:33:44:55:66']) - reply = {'current': current_ports, - 'removed': removed_ports, - 'added': added_ports} - with mock.patch.object(self.agent, 'treat_devices_added', - return_value=False) as device_added: - with mock.patch.object(self.agent, 'treat_devices_removed', - return_value=False) as device_removed: - self.assertFalse(self.agent.process_network_ports(reply)) - device_added.assert_called_once_with(added_ports) - device_removed.assert_called_once_with(removed_ports) + self._test_process_network_ports( + {'current': set(['10:20:30:40:50:60']), + 'updated': set(), + 'added': set(['11:21:31:41:51:61']), + 'removed': set(['13:23:33:43:53:63'])}) + + def test_process_network_ports_with_updated_ports(self): + self._test_process_network_ports( + {'current': set(['10:20:30:40:50:60']), + 'updated': set(['12:22:32:42:52:62']), + 'added': set(['11:21:31:41:51:61']), + 'removed': set(['13:23:33:43:53:63'])}) + + def test_add_port_update(self): + mac_addr = '10:20:30:40:50:60' + self.agent.add_port_update(mac_addr) + self.assertEqual(set([mac_addr]), self.agent.updated_ports) + + def _mock_scan_ports(self, vif_port_set, registered_ports, updated_ports): + with mock.patch.object(self.agent.eswitch, 'get_vnics_mac', + return_value=vif_port_set): + return self.agent.scan_ports(registered_ports, updated_ports) + + def test_scan_ports_return_current_for_unchanged_ports(self): + vif_port_set = set([1, 2]) + registered_ports = set([1, 2]) + actual = self._mock_scan_ports(vif_port_set, + registered_ports, set()) + expected = dict(current=vif_port_set, added=set(), + removed=set(), updated=set()) + self.assertEqual(expected, actual) + + def test_scan_ports_return_port_changes(self): + vif_port_set = set([1, 3]) + registered_ports = set([1, 2]) + actual = self._mock_scan_ports(vif_port_set, + registered_ports, set()) + expected = dict(current=vif_port_set, added=set([3]), + removed=set([2]), updated=set()) + self.assertEqual(expected, actual) + + def test_scan_ports_with_updated_ports(self): + vif_port_set = set([1, 3, 4]) + registered_ports = set([1, 2, 4]) + actual = self._mock_scan_ports(vif_port_set, + registered_ports, set([4])) + expected = dict(current=vif_port_set, added=set([3]), + removed=set([2]), updated=set([4])) + self.assertEqual(expected, actual) + + def test_scan_ports_with_unknown_updated_ports(self): + vif_port_set = set([1, 3, 4]) + registered_ports = set([1, 2, 4]) + actual = self._mock_scan_ports(vif_port_set, + registered_ports, + updated_ports=set([4, 5])) + expected = dict(current=vif_port_set, added=set([3]), + removed=set([2]), updated=set([4])) + self.assertEqual(expected, actual)