Send fdb remove message when a port is migrated

the fdb_remove rpc message is sent when the status
of the port goes to BUILD, that is when the new host
send a get_device_details which means that it owns
the migrated port. The fdb_add message will be sent
as soon as the new host send update_device_up

Closes bug: #1237841

Change-Id: Ibdc7768d8db922b7e6eb9dc505382168cbb8e55d
This commit is contained in:
mathieu-rohon 2014-03-14 10:17:55 +01:00
parent 314951e1b4
commit 33a1c7235b
4 changed files with 164 additions and 24 deletions

View File

@ -37,6 +37,8 @@ class L2populationMechanismDriver(api.MechanismDriver,
def initialize(self): def initialize(self):
LOG.debug(_("Experimental L2 population driver")) LOG.debug(_("Experimental L2 population driver"))
self.rpc_ctx = n_context.get_admin_context_without_session() self.rpc_ctx = n_context.get_admin_context_without_session()
self.migrated_ports = {}
self.deleted_ports = {}
def _get_port_fdb_entries(self, port): def _get_port_fdb_entries(self, port):
return [[port['mac_address'], return [[port['mac_address'],
@ -47,11 +49,15 @@ class L2populationMechanismDriver(api.MechanismDriver,
# available in delete_port_postcommit. in delete_port_postcommit # available in delete_port_postcommit. in delete_port_postcommit
# agent_active_ports will be equal to 0, and the _update_port_down # agent_active_ports will be equal to 0, and the _update_port_down
# won't need agent_active_ports_count_for_flooding anymore # won't need agent_active_ports_count_for_flooding anymore
self.remove_fdb_entries = self._update_port_down(context, 1) port_context = context.current
fdb_entries = self._update_port_down(context, port_context, 1)
self.deleted_ports[context.current['id']] = fdb_entries
def delete_port_postcommit(self, context): def delete_port_postcommit(self, context):
l2pop_rpc.L2populationAgentNotify.remove_fdb_entries( fanout_msg = self.deleted_ports.pop(context.current['id'], None)
self.rpc_ctx, self.remove_fdb_entries) if fanout_msg:
l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
self.rpc_ctx, fanout_msg)
def _get_diff_ips(self, orig, port): def _get_diff_ips(self, orig, port):
orig_ips = set([ip['ip_address'] for ip in orig['fixed_ips']]) orig_ips = set([ip['ip_address'] for ip in orig['fixed_ips']])
@ -64,10 +70,7 @@ class L2populationMechanismDriver(api.MechanismDriver,
if orig_chg_ips or port_chg_ips: if orig_chg_ips or port_chg_ips:
return orig_chg_ips, port_chg_ips return orig_chg_ips, port_chg_ips
def _fixed_ips_changed(self, context, orig, port): def _fixed_ips_changed(self, context, orig, port, diff_ips):
diff_ips = self._get_diff_ips(orig, port)
if not diff_ips:
return
orig_ips, port_ips = diff_ips orig_ips, port_ips = diff_ips
port_infos = self._get_port_infos(context, orig) port_infos = self._get_port_infos(context, orig)
@ -96,14 +99,30 @@ class L2populationMechanismDriver(api.MechanismDriver,
port = context.current port = context.current
orig = context.original orig = context.original
if port['status'] == orig['status']: diff_ips = self._get_diff_ips(orig, port)
self._fixed_ips_changed(context, orig, port) if diff_ips:
elif port['status'] == const.PORT_STATUS_ACTIVE: self._fixed_ips_changed(context, orig, port, diff_ips)
self._update_port_up(context) if (port['binding:host_id'] != orig['binding:host_id']
elif port['status'] == const.PORT_STATUS_DOWN: and port['status'] == const.PORT_STATUS_ACTIVE
fdb_entries = self._update_port_down(context) and not self.migrated_ports.get(orig['id'])):
l2pop_rpc.L2populationAgentNotify.remove_fdb_entries( # The port has been migrated. We have to store the original
self.rpc_ctx, fdb_entries) # binding to send appropriate fdb once the port will be set
# on the destination host
self.migrated_ports[orig['id']] = orig
elif port['status'] != orig['status']:
if port['status'] == const.PORT_STATUS_ACTIVE:
self._update_port_up(context)
elif port['status'] == const.PORT_STATUS_DOWN:
fdb_entries = self._update_port_down(context, port)
l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
self.rpc_ctx, fdb_entries)
elif port['status'] == const.PORT_STATUS_BUILD:
orig = self.migrated_ports.pop(port['id'], None)
if orig:
# this port has been migrated : remove its entries from fdb
fdb_entries = self._update_port_down(context, orig)
l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
self.rpc_ctx, fdb_entries)
def _get_port_infos(self, context, port): def _get_port_infos(self, context, port):
agent_host = port['binding:host_id'] agent_host = port['binding:host_id']
@ -196,9 +215,8 @@ class L2populationMechanismDriver(api.MechanismDriver,
l2pop_rpc.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx, l2pop_rpc.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx,
other_fdb_entries) other_fdb_entries)
def _update_port_down(self, context, def _update_port_down(self, context, port_context,
agent_active_ports_count_for_flooding=0): agent_active_ports_count_for_flooding=0):
port_context = context.current
port_infos = self._get_port_infos(context, port_context) port_infos = self._get_port_infos(context, port_context)
if not port_infos: if not port_infos:
return return
@ -215,15 +233,12 @@ class L2populationMechanismDriver(api.MechanismDriver,
{'segment_id': segment['segmentation_id'], {'segment_id': segment['segmentation_id'],
'network_type': segment['network_type'], 'network_type': segment['network_type'],
'ports': {agent_ip: []}}} 'ports': {agent_ip: []}}}
if agent_active_ports == agent_active_ports_count_for_flooding: if agent_active_ports == agent_active_ports_count_for_flooding:
# Agent is removing its last activated port in this network, # Agent is removing its last activated port in this network,
# other agents needs to be notified to delete their flooding entry. # other agents needs to be notified to delete their flooding entry.
other_fdb_entries[network_id]['ports'][agent_ip].append( other_fdb_entries[network_id]['ports'][agent_ip].append(
const.FLOODING_ENTRY) const.FLOODING_ENTRY)
# Notify other agents to remove fdb rules for current port
# Notify other agents to remove fdb rule for current port other_fdb_entries[network_id]['ports'][agent_ip] += port_fdb_entries
fdb_entries = self._get_port_fdb_entries(port_context)
other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries
return other_fdb_entries return other_fdb_entries

View File

@ -139,6 +139,10 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
new_status = (q_const.PORT_STATUS_BUILD if port.admin_state_up new_status = (q_const.PORT_STATUS_BUILD if port.admin_state_up
else q_const.PORT_STATUS_DOWN) else q_const.PORT_STATUS_DOWN)
if port.status != new_status: if port.status != new_status:
plugin = manager.NeutronManager.get_plugin()
plugin.update_port_status(rpc_context,
port_id,
new_status)
port.status = new_status port.status = new_status
entry = {'device': device, entry = {'device': device,
'network_id': port.network_id, 'network_id': port.network_id,

View File

@ -61,13 +61,24 @@ L2_AGENT_3 = {
'binary': 'neutron-openvswitch-agent', 'binary': 'neutron-openvswitch-agent',
'host': HOST + '_3', 'host': HOST + '_3',
'topic': constants.L2_AGENT_TOPIC, 'topic': constants.L2_AGENT_TOPIC,
'configurations': {'tunneling_ip': '20.0.0.2', 'configurations': {'tunneling_ip': '20.0.0.3',
'tunnel_types': []}, 'tunnel_types': []},
'agent_type': constants.AGENT_TYPE_OVS, 'agent_type': constants.AGENT_TYPE_OVS,
'tunnel_type': [], 'tunnel_type': [],
'start_flag': True 'start_flag': True
} }
L2_AGENT_4 = {
'binary': 'neutron-openvswitch-agent',
'host': HOST + '_4',
'topic': constants.L2_AGENT_TOPIC,
'configurations': {'tunneling_ip': '20.0.0.4',
'tunnel_types': ['vxlan']},
'agent_type': constants.AGENT_TYPE_OVS,
'tunnel_type': [],
'start_flag': True
}
PLUGIN_NAME = 'neutron.plugins.ml2.plugin.Ml2Plugin' PLUGIN_NAME = 'neutron.plugins.ml2.plugin.Ml2Plugin'
NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi' NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi'
@ -137,6 +148,9 @@ class TestL2PopulationRpcTestCase(test_plugin.NeutronDbPluginV2TestCase):
callback.report_state(self.adminContext, callback.report_state(self.adminContext,
agent_state={'agent_state': L2_AGENT_3}, agent_state={'agent_state': L2_AGENT_3},
time=timeutils.strtime()) time=timeutils.strtime())
callback.report_state(self.adminContext,
agent_state={'agent_state': L2_AGENT_4},
time=timeutils.strtime())
def test_fdb_add_called(self): def test_fdb_add_called(self):
self._register_ml2_agents() self._register_ml2_agents()
@ -603,3 +617,109 @@ class TestL2PopulationRpcTestCase(test_plugin.NeutronDbPluginV2TestCase):
self.assertFalse(mock_fanout.called) self.assertFalse(mock_fanout.called)
fanout_patch.stop() fanout_patch.stop()
def test_host_changed(self):
self._register_ml2_agents()
with self.subnet(network=self._network) as subnet:
host_arg = {portbindings.HOST_ID: L2_AGENT['host']}
host2_arg = {portbindings.HOST_ID: L2_AGENT_2['host']}
with self.port(subnet=subnet, cidr='10.0.0.0/24',
arg_list=(portbindings.HOST_ID,),
**host_arg) as port1:
with self.port(subnet=subnet, cidr='10.0.0.0/24',
arg_list=(portbindings.HOST_ID,),
**host2_arg) as port2:
p1 = port1['port']
device1 = 'tap' + p1['id']
self.callbacks.update_device_up(
self.adminContext,
agent_id=L2_AGENT['host'],
device=device1)
p2 = port2['port']
device2 = 'tap' + p2['id']
self.callbacks.update_device_up(
self.adminContext,
agent_id=L2_AGENT_2['host'],
device=device2)
data2 = {'port': {'binding:host_id': L2_AGENT_2['host']}}
req = self.new_update_request('ports', data2, p1['id'])
res = self.deserialize(self.fmt,
req.get_response(self.api))
self.assertEqual(res['port']['binding:host_id'],
L2_AGENT_2['host'])
self.mock_fanout.reset_mock()
self.callbacks.get_device_details(
self.adminContext,
device=device1,
agent_id=L2_AGENT_2['host'])
p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
expected = {'args':
{'fdb_entries':
{p1['network_id']:
{'ports':
{'20.0.0.1': [constants.FLOODING_ENTRY,
[p1['mac_address'],
p1_ips[0]]]},
'network_type': 'vxlan',
'segment_id': 1}}},
'namespace': None,
'method': 'remove_fdb_entries'}
self.mock_fanout.assert_called_with(
mock.ANY, expected, topic=self.fanout_topic)
def test_host_changed_twice(self):
self._register_ml2_agents()
with self.subnet(network=self._network) as subnet:
host_arg = {portbindings.HOST_ID: L2_AGENT['host']}
host2_arg = {portbindings.HOST_ID: L2_AGENT_2['host']}
with self.port(subnet=subnet, cidr='10.0.0.0/24',
arg_list=(portbindings.HOST_ID,),
**host_arg) as port1:
with self.port(subnet=subnet, cidr='10.0.0.0/24',
arg_list=(portbindings.HOST_ID,),
**host2_arg) as port2:
p1 = port1['port']
device1 = 'tap' + p1['id']
self.callbacks.update_device_up(
self.adminContext,
agent_id=L2_AGENT['host'],
device=device1)
p2 = port2['port']
device2 = 'tap' + p2['id']
self.callbacks.update_device_up(
self.adminContext,
agent_id=L2_AGENT_2['host'],
device=device2)
data2 = {'port': {'binding:host_id': L2_AGENT_2['host']}}
req = self.new_update_request('ports', data2, p1['id'])
res = self.deserialize(self.fmt,
req.get_response(self.api))
self.assertEqual(res['port']['binding:host_id'],
L2_AGENT_2['host'])
data4 = {'port': {'binding:host_id': L2_AGENT_4['host']}}
req = self.new_update_request('ports', data4, p1['id'])
res = self.deserialize(self.fmt,
req.get_response(self.api))
self.assertEqual(res['port']['binding:host_id'],
L2_AGENT_4['host'])
self.mock_fanout.reset_mock()
self.callbacks.get_device_details(
self.adminContext,
device=device1,
agent_id=L2_AGENT_4['host'])
p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
expected = {'args':
{'fdb_entries':
{p1['network_id']:
{'ports':
{'20.0.0.1': [constants.FLOODING_ENTRY,
[p1['mac_address'],
p1_ips[0]]]},
'network_type': 'vxlan',
'segment_id': 1}}},
'namespace': None,
'method': 'remove_fdb_entries'}
self.mock_fanout.assert_called_with(
mock.ANY, expected, topic=self.fanout_topic)

View File

@ -57,8 +57,9 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
self._check_response(port['port'], vif_type, has_port_filter, self._check_response(port['port'], vif_type, has_port_filter,
bound) bound)
port_id = port['port']['id'] port_id = port['port']['id']
neutron_context = context.get_admin_context()
details = self.plugin.callbacks.get_device_details( details = self.plugin.callbacks.get_device_details(
None, agent_id="theAgentId", device=port_id) neutron_context, agent_id="theAgentId", device=port_id)
if bound: if bound:
self.assertEqual(details['network_type'], 'local') self.assertEqual(details['network_type'], 'local')
else: else: