Merge "Race for l2pop when ports go up/down on same host"

This commit is contained in:
Jenkins 2014-10-15 03:39:59 +00:00 committed by Gerrit Code Review
commit 5d6b619f08
3 changed files with 29 additions and 28 deletions

View File

@ -38,32 +38,18 @@ class L2populationMechanismDriver(api.MechanismDriver,
LOG.debug(_("Experimental L2 population driver")) LOG.debug(_("Experimental L2 population driver"))
self.rpc_ctx = n_context.get_admin_context_without_session() self.rpc_ctx = n_context.get_admin_context_without_session()
self.migrated_ports = {} self.migrated_ports = {}
self.remove_fdb_entries = {}
def _get_port_fdb_entries(self, port): def _get_port_fdb_entries(self, port):
return [[port['mac_address'], return [[port['mac_address'],
ip['ip_address']] for ip in port['fixed_ips']] ip['ip_address']] for ip in port['fixed_ips']]
def delete_port_precommit(self, context):
port = context.current
agent_host = context.host
if port['id'] not in self.remove_fdb_entries:
self.remove_fdb_entries[port['id']] = {}
self.remove_fdb_entries[port['id']][agent_host] = (
self._update_port_down(context, port, agent_host, 1))
def delete_port_postcommit(self, context): def delete_port_postcommit(self, context):
port = context.current port = context.current
agent_host = context.host agent_host = context.host
if port['id'] in self.remove_fdb_entries:
for agent_host in list(self.remove_fdb_entries[port['id']]): fdb_entries = self._update_port_down(context, port, agent_host)
self.L2populationAgentNotify.remove_fdb_entries( self.L2populationAgentNotify.remove_fdb_entries(self.rpc_ctx,
self.rpc_ctx, fdb_entries)
self.remove_fdb_entries[port['id']][agent_host])
self.remove_fdb_entries[port['id']].pop(agent_host, 0)
self.remove_fdb_entries.pop(port['id'], 0)
def _get_diff_ips(self, orig, port): def _get_diff_ips(self, orig, port):
orig_ips = set([ip['ip_address'] for ip in orig['fixed_ips']]) orig_ips = set([ip['ip_address'] for ip in orig['fixed_ips']])
@ -260,8 +246,7 @@ class L2populationMechanismDriver(api.MechanismDriver,
self.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx, self.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx,
other_fdb_entries) other_fdb_entries)
def _update_port_down(self, context, port, agent_host, def _update_port_down(self, context, port, agent_host):
agent_active_ports_count_for_flooding=0):
port_infos = self._get_port_infos(context, port, agent_host) port_infos = self._get_port_infos(context, port, agent_host)
if not port_infos: if not port_infos:
return return
@ -277,7 +262,7 @@ class L2populationMechanismDriver(api.MechanismDriver,
{'segment_id': segment['segmentation_id'], {'segment_id': segment['segmentation_id'],
'network_type': segment['network_type'], 'network_type': segment['network_type'],
'ports': {agent_ip: []}}} 'ports': {agent_ip: []}}}
if agent_active_ports == agent_active_ports_count_for_flooding: if agent_active_ports == 0:
# Agent is removing its last activated port in this network, # Agent is removing its last activated port in this network,
# other agents needs to be notified to delete their flooding entry. # other agents needs to be notified to delete their flooding entry.
other_fdb_entries[network_id]['ports'][agent_ip].append( other_fdb_entries[network_id]['ports'][agent_ip].append(

View File

@ -990,7 +990,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
port = self._make_port_dict(port_db) port = self._make_port_dict(port_db)
network = self.get_network(context, port['network_id']) network = self.get_network(context, port['network_id'])
mech_context = None bound_mech_contexts = []
device_owner = port['device_owner'] device_owner = port['device_owner']
if device_owner == const.DEVICE_OWNER_DVR_INTERFACE: if device_owner == const.DEVICE_OWNER_DVR_INTERFACE:
bindings = db.get_dvr_port_bindings(context.session, id) bindings = db.get_dvr_port_bindings(context.session, id)
@ -998,6 +998,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
mech_context = driver_context.DvrPortContext( mech_context = driver_context.DvrPortContext(
self, context, port, network, bind) self, context, port, network, bind)
self.mechanism_manager.delete_port_precommit(mech_context) self.mechanism_manager.delete_port_precommit(mech_context)
bound_mech_contexts.append(mech_context)
else: else:
mech_context = driver_context.PortContext(self, context, port, mech_context = driver_context.PortContext(self, context, port,
network, binding) network, binding)
@ -1005,6 +1006,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
router_info = l3plugin.dvr_deletens_if_no_port(context, id) router_info = l3plugin.dvr_deletens_if_no_port(context, id)
removed_routers += router_info removed_routers += router_info
self.mechanism_manager.delete_port_precommit(mech_context) self.mechanism_manager.delete_port_precommit(mech_context)
bound_mech_contexts.append(mech_context)
self._delete_port_security_group_bindings(context, id) self._delete_port_security_group_bindings(context, id)
if l3plugin: if l3plugin:
router_ids = l3plugin.disassociate_floatingips( router_ids = l3plugin.disassociate_floatingips(
@ -1029,12 +1031,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
{'id': router['router_id'], {'id': router['router_id'],
'agent': router['agent_id']}) 'agent': router['agent_id']})
try: try:
# for both normal and DVR Interface ports, only one invocation of # Note that DVR Interface ports will have bindings on
# delete_port_postcommit. We use gather/scatter technique for DVR # multiple hosts, and so will have multiple mech_contexts,
# interface ports, where the bindings are gathered in # while other ports typically have just one.
# delete_port_precommit() call earlier and scattered as l2pop for mech_context in bound_mech_contexts:
# rules to cloud nodes in delete_port_postcommit() here
if mech_context:
self.mechanism_manager.delete_port_postcommit(mech_context) self.mechanism_manager.delete_port_postcommit(mech_context)
except ml2_exc.MechanismDriverError: except ml2_exc.MechanismDriverError:
# TODO(apech) - One or more mechanism driver failed to # TODO(apech) - One or more mechanism driver failed to

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import contextlib
import mock import mock
from neutron.common import constants from neutron.common import constants
@ -24,6 +25,7 @@ from neutron.extensions import providernet as pnet
from neutron import manager from neutron import manager
from neutron.openstack.common import timeutils from neutron.openstack.common import timeutils
from neutron.plugins.ml2 import config as config from neutron.plugins.ml2 import config as config
from neutron.plugins.ml2.drivers.l2pop import mech_driver as l2pop_mech_driver
from neutron.plugins.ml2 import managers from neutron.plugins.ml2 import managers
from neutron.plugins.ml2 import rpc from neutron.plugins.ml2 import rpc
from neutron.tests.unit import test_db_plugin as test_plugin from neutron.tests.unit import test_db_plugin as test_plugin
@ -794,3 +796,17 @@ class TestL2PopulationRpcTestCase(test_plugin.NeutronDbPluginV2TestCase):
self.mock_fanout.assert_called_with( self.mock_fanout.assert_called_with(
mock.ANY, expected, topic=self.fanout_topic) mock.ANY, expected, topic=self.fanout_topic)
def test_delete_port_invokes_update_device_down(self):
l2pop_mech = l2pop_mech_driver.L2populationMechanismDriver()
l2pop_mech.L2PopulationAgentNotify = mock.Mock()
l2pop_mech.rpc_ctx = mock.Mock()
with contextlib.nested(
mock.patch.object(l2pop_mech,
'_update_port_down',
return_value=None),
mock.patch.object(l2pop_mech.L2PopulationAgentNotify,
'remove_fdb_entries')) as (upd_port_down,
rem_fdb_entries):
l2pop_mech.delete_port_postcommit(mock.Mock())
self.assertTrue(upd_port_down.called)