From 33a1c7235bee3ba5da04a3593b0416de79bd2273 Mon Sep 17 00:00:00 2001 From: mathieu-rohon Date: Fri, 14 Mar 2014 10:17:55 +0100 Subject: [PATCH] Send fdb remove message when a port is migrated the fdb_remove rpc message is sent when the status of the port goes to BUILD, that is when the new host send a get_device_details which means that it owns the migrated port. The fdb_add message will be sent as soon as the new host send update_device_up Closes bug: #1237841 Change-Id: Ibdc7768d8db922b7e6eb9dc505382168cbb8e55d --- .../plugins/ml2/drivers/l2pop/mech_driver.py | 59 +++++---- neutron/plugins/ml2/rpc.py | 4 + .../unit/ml2/drivers/test_l2population.py | 122 +++++++++++++++++- neutron/tests/unit/ml2/test_port_binding.py | 3 +- 4 files changed, 164 insertions(+), 24 deletions(-) diff --git a/neutron/plugins/ml2/drivers/l2pop/mech_driver.py b/neutron/plugins/ml2/drivers/l2pop/mech_driver.py index df9bcbb57a..7c2719ddd6 100644 --- a/neutron/plugins/ml2/drivers/l2pop/mech_driver.py +++ b/neutron/plugins/ml2/drivers/l2pop/mech_driver.py @@ -37,6 +37,8 @@ class L2populationMechanismDriver(api.MechanismDriver, def initialize(self): LOG.debug(_("Experimental L2 population driver")) self.rpc_ctx = n_context.get_admin_context_without_session() + self.migrated_ports = {} + self.deleted_ports = {} def _get_port_fdb_entries(self, port): return [[port['mac_address'], @@ -47,11 +49,15 @@ class L2populationMechanismDriver(api.MechanismDriver, # available in delete_port_postcommit. in delete_port_postcommit # agent_active_ports will be equal to 0, and the _update_port_down # won't need agent_active_ports_count_for_flooding anymore - self.remove_fdb_entries = self._update_port_down(context, 1) + port_context = context.current + fdb_entries = self._update_port_down(context, port_context, 1) + self.deleted_ports[context.current['id']] = fdb_entries def delete_port_postcommit(self, context): - l2pop_rpc.L2populationAgentNotify.remove_fdb_entries( - self.rpc_ctx, self.remove_fdb_entries) + fanout_msg = self.deleted_ports.pop(context.current['id'], None) + if fanout_msg: + l2pop_rpc.L2populationAgentNotify.remove_fdb_entries( + self.rpc_ctx, fanout_msg) def _get_diff_ips(self, orig, port): orig_ips = set([ip['ip_address'] for ip in orig['fixed_ips']]) @@ -64,10 +70,7 @@ class L2populationMechanismDriver(api.MechanismDriver, if orig_chg_ips or port_chg_ips: return orig_chg_ips, port_chg_ips - def _fixed_ips_changed(self, context, orig, port): - diff_ips = self._get_diff_ips(orig, port) - if not diff_ips: - return + def _fixed_ips_changed(self, context, orig, port, diff_ips): orig_ips, port_ips = diff_ips port_infos = self._get_port_infos(context, orig) @@ -96,14 +99,30 @@ class L2populationMechanismDriver(api.MechanismDriver, port = context.current orig = context.original - if port['status'] == orig['status']: - self._fixed_ips_changed(context, orig, port) - elif port['status'] == const.PORT_STATUS_ACTIVE: - self._update_port_up(context) - elif port['status'] == const.PORT_STATUS_DOWN: - fdb_entries = self._update_port_down(context) - l2pop_rpc.L2populationAgentNotify.remove_fdb_entries( - self.rpc_ctx, fdb_entries) + diff_ips = self._get_diff_ips(orig, port) + if diff_ips: + self._fixed_ips_changed(context, orig, port, diff_ips) + if (port['binding:host_id'] != orig['binding:host_id'] + and port['status'] == const.PORT_STATUS_ACTIVE + and not self.migrated_ports.get(orig['id'])): + # The port has been migrated. We have to store the original + # binding to send appropriate fdb once the port will be set + # on the destination host + self.migrated_ports[orig['id']] = orig + elif port['status'] != orig['status']: + if port['status'] == const.PORT_STATUS_ACTIVE: + self._update_port_up(context) + elif port['status'] == const.PORT_STATUS_DOWN: + fdb_entries = self._update_port_down(context, port) + l2pop_rpc.L2populationAgentNotify.remove_fdb_entries( + self.rpc_ctx, fdb_entries) + elif port['status'] == const.PORT_STATUS_BUILD: + orig = self.migrated_ports.pop(port['id'], None) + if orig: + # this port has been migrated : remove its entries from fdb + fdb_entries = self._update_port_down(context, orig) + l2pop_rpc.L2populationAgentNotify.remove_fdb_entries( + self.rpc_ctx, fdb_entries) def _get_port_infos(self, context, port): agent_host = port['binding:host_id'] @@ -196,9 +215,8 @@ class L2populationMechanismDriver(api.MechanismDriver, l2pop_rpc.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx, other_fdb_entries) - def _update_port_down(self, context, + def _update_port_down(self, context, port_context, agent_active_ports_count_for_flooding=0): - port_context = context.current port_infos = self._get_port_infos(context, port_context) if not port_infos: return @@ -215,15 +233,12 @@ class L2populationMechanismDriver(api.MechanismDriver, {'segment_id': segment['segmentation_id'], 'network_type': segment['network_type'], 'ports': {agent_ip: []}}} - if agent_active_ports == agent_active_ports_count_for_flooding: # Agent is removing its last activated port in this network, # other agents needs to be notified to delete their flooding entry. other_fdb_entries[network_id]['ports'][agent_ip].append( const.FLOODING_ENTRY) - - # Notify other agents to remove fdb rule for current port - fdb_entries = self._get_port_fdb_entries(port_context) - other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries + # Notify other agents to remove fdb rules for current port + other_fdb_entries[network_id]['ports'][agent_ip] += port_fdb_entries return other_fdb_entries diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index b77ddd1372..73f7b265d3 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -139,6 +139,10 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, new_status = (q_const.PORT_STATUS_BUILD if port.admin_state_up else q_const.PORT_STATUS_DOWN) if port.status != new_status: + plugin = manager.NeutronManager.get_plugin() + plugin.update_port_status(rpc_context, + port_id, + new_status) port.status = new_status entry = {'device': device, 'network_id': port.network_id, diff --git a/neutron/tests/unit/ml2/drivers/test_l2population.py b/neutron/tests/unit/ml2/drivers/test_l2population.py index 2698b3db37..7e4efb11c9 100644 --- a/neutron/tests/unit/ml2/drivers/test_l2population.py +++ b/neutron/tests/unit/ml2/drivers/test_l2population.py @@ -61,13 +61,24 @@ L2_AGENT_3 = { 'binary': 'neutron-openvswitch-agent', 'host': HOST + '_3', 'topic': constants.L2_AGENT_TOPIC, - 'configurations': {'tunneling_ip': '20.0.0.2', + 'configurations': {'tunneling_ip': '20.0.0.3', 'tunnel_types': []}, 'agent_type': constants.AGENT_TYPE_OVS, 'tunnel_type': [], 'start_flag': True } +L2_AGENT_4 = { + 'binary': 'neutron-openvswitch-agent', + 'host': HOST + '_4', + 'topic': constants.L2_AGENT_TOPIC, + 'configurations': {'tunneling_ip': '20.0.0.4', + 'tunnel_types': ['vxlan']}, + 'agent_type': constants.AGENT_TYPE_OVS, + 'tunnel_type': [], + 'start_flag': True +} + PLUGIN_NAME = 'neutron.plugins.ml2.plugin.Ml2Plugin' NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi' @@ -137,6 +148,9 @@ class TestL2PopulationRpcTestCase(test_plugin.NeutronDbPluginV2TestCase): callback.report_state(self.adminContext, agent_state={'agent_state': L2_AGENT_3}, time=timeutils.strtime()) + callback.report_state(self.adminContext, + agent_state={'agent_state': L2_AGENT_4}, + time=timeutils.strtime()) def test_fdb_add_called(self): self._register_ml2_agents() @@ -603,3 +617,109 @@ class TestL2PopulationRpcTestCase(test_plugin.NeutronDbPluginV2TestCase): self.assertFalse(mock_fanout.called) fanout_patch.stop() + + def test_host_changed(self): + self._register_ml2_agents() + with self.subnet(network=self._network) as subnet: + host_arg = {portbindings.HOST_ID: L2_AGENT['host']} + host2_arg = {portbindings.HOST_ID: L2_AGENT_2['host']} + with self.port(subnet=subnet, cidr='10.0.0.0/24', + arg_list=(portbindings.HOST_ID,), + **host_arg) as port1: + with self.port(subnet=subnet, cidr='10.0.0.0/24', + arg_list=(portbindings.HOST_ID,), + **host2_arg) as port2: + p1 = port1['port'] + device1 = 'tap' + p1['id'] + self.callbacks.update_device_up( + self.adminContext, + agent_id=L2_AGENT['host'], + device=device1) + p2 = port2['port'] + device2 = 'tap' + p2['id'] + self.callbacks.update_device_up( + self.adminContext, + agent_id=L2_AGENT_2['host'], + device=device2) + data2 = {'port': {'binding:host_id': L2_AGENT_2['host']}} + req = self.new_update_request('ports', data2, p1['id']) + res = self.deserialize(self.fmt, + req.get_response(self.api)) + self.assertEqual(res['port']['binding:host_id'], + L2_AGENT_2['host']) + self.mock_fanout.reset_mock() + self.callbacks.get_device_details( + self.adminContext, + device=device1, + agent_id=L2_AGENT_2['host']) + p1_ips = [p['ip_address'] for p in p1['fixed_ips']] + expected = {'args': + {'fdb_entries': + {p1['network_id']: + {'ports': + {'20.0.0.1': [constants.FLOODING_ENTRY, + [p1['mac_address'], + p1_ips[0]]]}, + 'network_type': 'vxlan', + 'segment_id': 1}}}, + 'namespace': None, + 'method': 'remove_fdb_entries'} + + self.mock_fanout.assert_called_with( + mock.ANY, expected, topic=self.fanout_topic) + + def test_host_changed_twice(self): + self._register_ml2_agents() + with self.subnet(network=self._network) as subnet: + host_arg = {portbindings.HOST_ID: L2_AGENT['host']} + host2_arg = {portbindings.HOST_ID: L2_AGENT_2['host']} + with self.port(subnet=subnet, cidr='10.0.0.0/24', + arg_list=(portbindings.HOST_ID,), + **host_arg) as port1: + with self.port(subnet=subnet, cidr='10.0.0.0/24', + arg_list=(portbindings.HOST_ID,), + **host2_arg) as port2: + p1 = port1['port'] + device1 = 'tap' + p1['id'] + self.callbacks.update_device_up( + self.adminContext, + agent_id=L2_AGENT['host'], + device=device1) + p2 = port2['port'] + device2 = 'tap' + p2['id'] + self.callbacks.update_device_up( + self.adminContext, + agent_id=L2_AGENT_2['host'], + device=device2) + data2 = {'port': {'binding:host_id': L2_AGENT_2['host']}} + req = self.new_update_request('ports', data2, p1['id']) + res = self.deserialize(self.fmt, + req.get_response(self.api)) + self.assertEqual(res['port']['binding:host_id'], + L2_AGENT_2['host']) + data4 = {'port': {'binding:host_id': L2_AGENT_4['host']}} + req = self.new_update_request('ports', data4, p1['id']) + res = self.deserialize(self.fmt, + req.get_response(self.api)) + self.assertEqual(res['port']['binding:host_id'], + L2_AGENT_4['host']) + self.mock_fanout.reset_mock() + self.callbacks.get_device_details( + self.adminContext, + device=device1, + agent_id=L2_AGENT_4['host']) + p1_ips = [p['ip_address'] for p in p1['fixed_ips']] + expected = {'args': + {'fdb_entries': + {p1['network_id']: + {'ports': + {'20.0.0.1': [constants.FLOODING_ENTRY, + [p1['mac_address'], + p1_ips[0]]]}, + 'network_type': 'vxlan', + 'segment_id': 1}}}, + 'namespace': None, + 'method': 'remove_fdb_entries'} + + self.mock_fanout.assert_called_with( + mock.ANY, expected, topic=self.fanout_topic) diff --git a/neutron/tests/unit/ml2/test_port_binding.py b/neutron/tests/unit/ml2/test_port_binding.py index f8946f76a9..a1d002f24b 100644 --- a/neutron/tests/unit/ml2/test_port_binding.py +++ b/neutron/tests/unit/ml2/test_port_binding.py @@ -57,8 +57,9 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase): self._check_response(port['port'], vif_type, has_port_filter, bound) port_id = port['port']['id'] + neutron_context = context.get_admin_context() details = self.plugin.callbacks.get_device_details( - None, agent_id="theAgentId", device=port_id) + neutron_context, agent_id="theAgentId", device=port_id) if bound: self.assertEqual(details['network_type'], 'local') else: