Fix l2 pop doesn't propagate ip address updates

Propagates ip address changes when an ip address is :
added, removed, or changed.
Add a new rpc call for the updates of forwarding informations.

Fixes: Bug #1234137
Change-Id: Ib5b971bd02f20a0ea73f88ce9685e944226bb5a2
This commit is contained in:
Sylvain Afchain 2013-09-26 16:57:58 +02:00
parent cc0ed67c01
commit 40779c5b0f
7 changed files with 233 additions and 45 deletions

View File

@ -37,6 +37,11 @@ class L2populationRpcCallBackMixin(object):
if not host or host == cfg.CONF.host:
self.fdb_remove(context, fdb_entries)
@log.log
def update_fdb_entries(self, context, fdb_entries, host=None):
if not host or host == cfg.CONF.host:
self.fdb_update(context, fdb_entries)
@abc.abstractmethod
def fdb_add(self, context, fdb_entries):
pass
@ -44,3 +49,7 @@ class L2populationRpcCallBackMixin(object):
@abc.abstractmethod
def fdb_remove(self, context, fdb_entries):
pass
@abc.abstractmethod
def fdb_update(self, context, fdb_entries):
pass

View File

@ -714,6 +714,40 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
ports,
interface)
def _fdb_chg_ip(self, context, fdb_entries):
LOG.debug(_("update chg_ip received"))
for network_id, agent_ports in fdb_entries.items():
segment = self.agent.br_mgr.network_map.get(network_id)
if not segment:
return
if segment.network_type != lconst.TYPE_VXLAN:
return
interface = self.agent.br_mgr.get_vxlan_device_name(
segment.segmentation_id)
for agent_ip, state in agent_ports.items():
if agent_ip == self.agent.br_mgr.local_ip:
continue
after = state.get('after')
for mac, ip in after:
self.agent.br_mgr.add_fdb_ip_entry(mac, ip, interface)
before = state.get('before')
for mac, ip in before:
self.agent.br_mgr.remove_fdb_ip_entry(mac, ip, interface)
def fdb_update(self, context, fdb_entries):
LOG.debug(_("fdb_update received"))
for action, values in fdb_entries.items():
method = '_fdb_' + action
if not hasattr(self, method):
raise NotImplementedError()
getattr(self, method)(context, values)
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.

View File

@ -36,6 +36,7 @@ class L2populationMechanismDriver(api.MechanismDriver,
def initialize(self):
LOG.debug(_("Experimental L2 population driver"))
self.rpc_ctx = n_context.get_admin_context_without_session()
def _get_port_fdb_entries(self, port):
return [[port['mac_address'],
@ -45,31 +46,64 @@ class L2populationMechanismDriver(api.MechanismDriver,
self.remove_fdb_entries = self._update_port_down(context)
def delete_port_postcommit(self, context):
self._notify_remove_fdb_entries(context,
self.remove_fdb_entries)
def _notify_remove_fdb_entries(self, context, fdb_entries):
rpc_ctx = n_context.get_admin_context_without_session()
l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
rpc_ctx, fdb_entries)
self.rpc_ctx, self.remove_fdb_entries)
def _get_diff_ips(self, orig, port):
orig_ips = set([ip['ip_address'] for ip in orig['fixed_ips']])
port_ips = set([ip['ip_address'] for ip in port['fixed_ips']])
# check if an ip has been added or removed
orig_chg_ips = orig_ips.difference(port_ips)
port_chg_ips = port_ips.difference(orig_ips)
if orig_chg_ips or port_chg_ips:
return orig_chg_ips, port_chg_ips
def _fixed_ips_changed(self, context, orig, port):
diff_ips = self._get_diff_ips(orig, port)
if not diff_ips:
return
orig_ips, port_ips = diff_ips
port_infos = self._get_port_infos(context, orig)
if not port_infos:
return
agent, agent_ip, segment, port_fdb_entries = port_infos
orig_mac_ip = [[port['mac_address'], ip] for ip in orig_ips]
port_mac_ip = [[port['mac_address'], ip] for ip in port_ips]
upd_fdb_entries = {port['network_id']: {agent_ip: {}}}
ports = upd_fdb_entries[port['network_id']][agent_ip]
if orig_mac_ip:
ports['before'] = orig_mac_ip
if port_mac_ip:
ports['after'] = port_mac_ip
l2pop_rpc.L2populationAgentNotify.update_fdb_entries(
self.rpc_ctx, {'chg_ip': upd_fdb_entries})
return True
def update_port_postcommit(self, context):
port = context.current
orig = context.original
if port['status'] == orig['status']:
return
self._fixed_ips_changed(context, orig, port)
if port['status'] == const.PORT_STATUS_ACTIVE:
self._update_port_up(context)
elif port['status'] == const.PORT_STATUS_DOWN:
fdb_entries = self._update_port_down(context)
self._notify_remove_fdb_entries(context, fdb_entries)
l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
self.rpc_ctx, fdb_entries)
def _update_port_up(self, context):
port_context = context.current
network_id = port_context['network_id']
agent_host = port_context['binding:host_id']
def _get_port_infos(self, context, port):
agent_host = port['binding:host_id']
if not agent_host:
return
@ -80,26 +114,39 @@ class L2populationMechanismDriver(api.MechanismDriver,
agent_ip = self.get_agent_ip(agent)
if not agent_ip:
LOG.warning(_("Unable to retrieve the tunelling ip of agent %s"),
agent_host)
LOG.warning(_("Unable to retrieve the agent ip, check the agent "
"configuration."))
return
segment = context.bound_segment
if not segment:
LOG.warning(_("Port %(port)s updated by agent %(agent)s "
"isn't bound to any segment"),
{'port': port_context['id'], 'agent': agent.host})
{'port': port['id'], 'agent': agent})
return
tunnel_types = self.get_agent_tunnel_types(agent)
if segment['network_type'] not in tunnel_types:
return
fdb_entries = self._get_port_fdb_entries(port)
return agent, agent_ip, segment, fdb_entries
def _update_port_up(self, context):
port_context = context.current
port_infos = self._get_port_infos(context, port_context)
if not port_infos:
return
agent, agent_ip, segment, port_fdb_entries = port_infos
agent_host = port_context['binding:host_id']
network_id = port_context['network_id']
session = db_api.get_session()
agent_ports = self.get_agent_network_port_count(session, agent_host,
network_id)
rpc_ctx = n_context.get_admin_context_without_session()
other_fdb_entries = {network_id:
{'segment_id': segment['segmentation_id'],
'network_type': segment['network_type'],
@ -138,45 +185,25 @@ class L2populationMechanismDriver(api.MechanismDriver,
if ports.keys():
l2pop_rpc.L2populationAgentNotify.add_fdb_entries(
rpc_ctx, agent_fdb_entries, agent_host)
self.rpc_ctx, agent_fdb_entries, agent_host)
# Notify other agents to add fdb rule for current port
fdb_entries = self._get_port_fdb_entries(port_context)
other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries
other_fdb_entries[network_id]['ports'][agent_ip] += port_fdb_entries
l2pop_rpc.L2populationAgentNotify.add_fdb_entries(rpc_ctx,
l2pop_rpc.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx,
other_fdb_entries)
def _update_port_down(self, context):
port_context = context.current
network_id = port_context['network_id']
port_infos = self._get_port_infos(context, port_context)
if not port_infos:
return
agent, agent_ip, segment, port_fdb_entries = port_infos
agent_host = port_context['binding:host_id']
if not agent_host:
return
network_id = port_context['network_id']
session = db_api.get_session()
agent = self.get_agent_by_host(session, agent_host)
if not agent:
return
agent_ip = self.get_agent_ip(agent)
if not agent_ip:
LOG.warning(_("Unable to retrieve the agent ip, check the agent "
"configuration."))
return
segment = context.bound_segment
if not segment:
LOG.warning(_("Port %(port)s updated by agent %(agent)s "
"isn't bound to any segment"),
{'port': port_context['id'], 'agent': agent})
return
tunnel_types = self.get_agent_tunnel_types(agent)
if segment['network_type'] not in tunnel_types:
return
agent_ports = self.get_agent_network_port_count(session, agent_host,
network_id)

View File

@ -76,4 +76,13 @@ class L2populationAgentNotifyAPI(proxy.RpcProxy):
self._notification_fanout(context, 'remove_fdb_entries',
fdb_entries)
def update_fdb_entries(self, context, fdb_entries, host=None):
if fdb_entries:
if host:
self._notification_host(context, 'update_fdb_entries',
fdb_entries, host)
else:
self._notification_fanout(context, 'update_fdb_entries',
fdb_entries)
L2populationAgentNotify = L2populationAgentNotifyAPI()

View File

@ -412,6 +412,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
dl_vlan=lvm.vlan,
dl_dst=port_info[0])
def fdb_update(self, context, fdb_entries):
LOG.debug(_("fdb_update received"))
for action, values in fdb_entries.items():
method = '_fdb_' + action
if not hasattr(self, method):
raise NotImplementedError()
getattr(self, method)(context, values)
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.

View File

@ -878,3 +878,26 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
check_exit_code=False),
]
execute_fn.assert_has_calls(expected)
def test_fdb_update_chg_ip(self):
fdb_entries = {'chg_ip':
{'net_id':
{'agent_ip':
{'before': [['port_mac', 'port_ip_1']],
'after': [['port_mac', 'port_ip_2']]}}}}
with mock.patch.object(utils, 'execute',
return_value='') as execute_fn:
self.lb_rpc.fdb_update(None, fdb_entries)
expected = [
mock.call(['ip', 'neigh', 'add', 'port_ip_2', 'lladdr',
'port_mac', 'dev', 'vxlan-1', 'nud', 'permanent'],
root_helper=self.root_helper,
check_exit_code=False),
mock.call(['ip', 'neigh', 'del', 'port_ip_1', 'lladdr',
'port_mac', 'dev', 'vxlan-1'],
root_helper=self.root_helper,
check_exit_code=False)
]
execute_fn.assert_has_calls(expected)

View File

@ -406,3 +406,80 @@ class TestL2PopulationRpcTestCase(test_plugin.NeutronDbPluginV2TestCase):
self.mock_fanout.assert_any_call(
mock.ANY, expected, topic=self.fanout_topic)
def test_fixed_ips_changed(self):
self._register_ml2_agents()
with self.subnet(network=self._network) as subnet:
host_arg = {portbindings.HOST_ID: HOST}
with self.port(subnet=subnet, cidr='10.0.0.0/24',
arg_list=(portbindings.HOST_ID,),
**host_arg) as port1:
p1 = port1['port']
self.mock_fanout.reset_mock()
data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.2'},
{'ip_address': '10.0.0.10'}]}}
req = self.new_update_request('ports', data, p1['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 2)
add_expected = {'args':
{'fdb_entries':
{'chg_ip':
{p1['network_id']:
{'20.0.0.1':
{'after': [[p1['mac_address'],
'10.0.0.10']]}}}}},
'namespace': None,
'method': 'update_fdb_entries'}
self.mock_fanout.assert_any_call(
mock.ANY, add_expected, topic=self.fanout_topic)
self.mock_fanout.reset_mock()
data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.2'},
{'ip_address': '10.0.0.16'}]}}
req = self.new_update_request('ports', data, p1['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 2)
upd_expected = {'args':
{'fdb_entries':
{'chg_ip':
{p1['network_id']:
{'20.0.0.1':
{'before': [[p1['mac_address'],
'10.0.0.10']],
'after': [[p1['mac_address'],
'10.0.0.16']]}}}}},
'namespace': None,
'method': 'update_fdb_entries'}
self.mock_fanout.assert_any_call(
mock.ANY, upd_expected, topic=self.fanout_topic)
self.mock_fanout.reset_mock()
data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.16'}]}}
req = self.new_update_request('ports', data, p1['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 1)
del_expected = {'args':
{'fdb_entries':
{'chg_ip':
{p1['network_id']:
{'20.0.0.1':
{'before': [[p1['mac_address'],
'10.0.0.2']]}}}}},
'namespace': None,
'method': 'update_fdb_entries'}
self.mock_fanout.assert_any_call(
mock.ANY, del_expected, topic=self.fanout_topic)