Merge "Remove RPC notification from transaction in create/update port"
This commit is contained in:
commit
5c5dc82868
@ -156,7 +156,27 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
# TODO(rkukura): Implement filtering.
|
# TODO(rkukura): Implement filtering.
|
||||||
return nets
|
return nets
|
||||||
|
|
||||||
def _process_port_binding(self, mech_context, context, attrs):
|
def _notify_l3_agent_new_port(self, context, port):
|
||||||
|
if not port:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Whenever a DVR serviceable port comes up on a
|
||||||
|
# node, it has to be communicated to the L3 Plugin
|
||||||
|
# and agent for creating the respective namespaces.
|
||||||
|
if (utils.is_dvr_serviced(port['device_owner'])):
|
||||||
|
l3plugin = manager.NeutronManager.get_service_plugins().get(
|
||||||
|
service_constants.L3_ROUTER_NAT)
|
||||||
|
if (utils.is_extension_supported(
|
||||||
|
l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)):
|
||||||
|
l3plugin.dvr_update_router_addvm(context, port)
|
||||||
|
|
||||||
|
def _get_host_port_if_changed(self, mech_context, attrs):
|
||||||
|
binding = mech_context._binding
|
||||||
|
host = attrs and attrs.get(portbindings.HOST_ID)
|
||||||
|
if (attributes.is_attr_set(host) and binding.host != host):
|
||||||
|
return mech_context.current
|
||||||
|
|
||||||
|
def _process_port_binding(self, mech_context, attrs):
|
||||||
binding = mech_context._binding
|
binding = mech_context._binding
|
||||||
port = mech_context.current
|
port = mech_context.current
|
||||||
changes = False
|
changes = False
|
||||||
@ -166,15 +186,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
binding.host != host):
|
binding.host != host):
|
||||||
binding.host = host
|
binding.host = host
|
||||||
changes = True
|
changes = True
|
||||||
# Whenever a DVR serviceable port comes up on a
|
|
||||||
# node, it has to be communicated to the L3 Plugin
|
|
||||||
# and agent for creating the respective namespaces.
|
|
||||||
if (utils.is_dvr_serviced(port['device_owner'])):
|
|
||||||
l3plugin = manager.NeutronManager.get_service_plugins().get(
|
|
||||||
service_constants.L3_ROUTER_NAT)
|
|
||||||
if (utils.is_extension_supported(
|
|
||||||
l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)):
|
|
||||||
l3plugin.dvr_update_router_addvm(context, port)
|
|
||||||
|
|
||||||
vnic_type = attrs and attrs.get(portbindings.VNIC_TYPE)
|
vnic_type = attrs and attrs.get(portbindings.VNIC_TYPE)
|
||||||
if (attributes.is_attr_set(vnic_type) and
|
if (attributes.is_attr_set(vnic_type) and
|
||||||
@ -767,7 +778,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
binding = db.add_port_binding(session, result['id'])
|
binding = db.add_port_binding(session, result['id'])
|
||||||
mech_context = driver_context.PortContext(self, context, result,
|
mech_context = driver_context.PortContext(self, context, result,
|
||||||
network, binding)
|
network, binding)
|
||||||
self._process_port_binding(mech_context, context, attrs)
|
new_host_port = self._get_host_port_if_changed(mech_context, attrs)
|
||||||
|
self._process_port_binding(mech_context, attrs)
|
||||||
|
|
||||||
result[addr_pair.ADDRESS_PAIRS] = (
|
result[addr_pair.ADDRESS_PAIRS] = (
|
||||||
self._process_create_allowed_address_pairs(
|
self._process_create_allowed_address_pairs(
|
||||||
@ -777,6 +789,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
dhcp_opts)
|
dhcp_opts)
|
||||||
self.mechanism_manager.create_port_precommit(mech_context)
|
self.mechanism_manager.create_port_precommit(mech_context)
|
||||||
|
|
||||||
|
# Notification must be sent after the above transaction is complete
|
||||||
|
self._notify_l3_agent_new_port(context, new_host_port)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.mechanism_manager.create_port_postcommit(mech_context)
|
self.mechanism_manager.create_port_postcommit(mech_context)
|
||||||
except ml2_exc.MechanismDriverError:
|
except ml2_exc.MechanismDriverError:
|
||||||
@ -831,10 +846,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||||||
mech_context = driver_context.PortContext(
|
mech_context = driver_context.PortContext(
|
||||||
self, context, updated_port, network, binding,
|
self, context, updated_port, network, binding,
|
||||||
original_port=original_port)
|
original_port=original_port)
|
||||||
|
new_host_port = self._get_host_port_if_changed(mech_context, attrs)
|
||||||
need_port_update_notify |= self._process_port_binding(
|
need_port_update_notify |= self._process_port_binding(
|
||||||
mech_context, context, attrs)
|
mech_context, attrs)
|
||||||
self.mechanism_manager.update_port_precommit(mech_context)
|
self.mechanism_manager.update_port_precommit(mech_context)
|
||||||
|
|
||||||
|
# Notification must be sent after the above transaction is complete
|
||||||
|
self._notify_l3_agent_new_port(context, new_host_port)
|
||||||
|
|
||||||
# TODO(apech) - handle errors raised by update_port, potentially
|
# TODO(apech) - handle errors raised by update_port, potentially
|
||||||
# by re-calling update_port with the previous attributes. For
|
# by re-calling update_port with the previous attributes. For
|
||||||
# now the error is propogated to the caller, which is expected to
|
# now the error is propogated to the caller, which is expected to
|
||||||
|
@ -23,6 +23,7 @@ from neutron.common import constants
|
|||||||
from neutron.common import exceptions as exc
|
from neutron.common import exceptions as exc
|
||||||
from neutron.common import utils
|
from neutron.common import utils
|
||||||
from neutron import context
|
from neutron import context
|
||||||
|
from neutron.db import db_base_plugin_v2 as base_plugin
|
||||||
from neutron.extensions import multiprovidernet as mpnet
|
from neutron.extensions import multiprovidernet as mpnet
|
||||||
from neutron.extensions import portbindings
|
from neutron.extensions import portbindings
|
||||||
from neutron.extensions import providernet as pnet
|
from neutron.extensions import providernet as pnet
|
||||||
@ -36,6 +37,7 @@ from neutron.plugins.ml2 import driver_context
|
|||||||
from neutron.plugins.ml2.drivers import type_vlan
|
from neutron.plugins.ml2.drivers import type_vlan
|
||||||
from neutron.plugins.ml2 import models
|
from neutron.plugins.ml2 import models
|
||||||
from neutron.plugins.ml2 import plugin as ml2_plugin
|
from neutron.plugins.ml2 import plugin as ml2_plugin
|
||||||
|
from neutron.tests import base
|
||||||
from neutron.tests.unit import _test_extension_portbindings as test_bindings
|
from neutron.tests.unit import _test_extension_portbindings as test_bindings
|
||||||
from neutron.tests.unit.ml2.drivers import mechanism_logger as mech_logger
|
from neutron.tests.unit.ml2.drivers import mechanism_logger as mech_logger
|
||||||
from neutron.tests.unit.ml2.drivers import mechanism_test as mech_test
|
from neutron.tests.unit.ml2.drivers import mechanism_test as mech_test
|
||||||
@ -942,3 +944,60 @@ class TestFaultyMechansimDriver(Ml2PluginV2FaultyDriverTestCase):
|
|||||||
self.assertEqual(new_name, port['port']['name'])
|
self.assertEqual(new_name, port['port']['name'])
|
||||||
|
|
||||||
self._delete('ports', port['port']['id'])
|
self._delete('ports', port['port']['id'])
|
||||||
|
|
||||||
|
|
||||||
|
class TestMl2PluginCreateUpdatePort(base.BaseTestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super(TestMl2PluginCreateUpdatePort, self).setUp()
|
||||||
|
self.context = mock.MagicMock()
|
||||||
|
|
||||||
|
def _ensure_transaction_is_closed(self):
|
||||||
|
transaction = self.context.session.begin(subtransactions=True)
|
||||||
|
enter = transaction.__enter__.call_count
|
||||||
|
exit = transaction.__exit__.call_count
|
||||||
|
self.assertEqual(enter, exit)
|
||||||
|
|
||||||
|
def _create_plugin_for_create_update_port(self, new_host_port):
|
||||||
|
plugin = ml2_plugin.Ml2Plugin()
|
||||||
|
plugin.extension_manager = mock.Mock()
|
||||||
|
plugin.type_manager = mock.Mock()
|
||||||
|
plugin.mechanism_manager = mock.Mock()
|
||||||
|
plugin.notifier = mock.Mock()
|
||||||
|
plugin._get_host_port_if_changed = mock.Mock(
|
||||||
|
return_value=new_host_port)
|
||||||
|
|
||||||
|
plugin._notify_l3_agent_new_port = mock.Mock()
|
||||||
|
plugin._notify_l3_agent_new_port.side_effect = (
|
||||||
|
lambda c, p: self._ensure_transaction_is_closed())
|
||||||
|
|
||||||
|
return plugin
|
||||||
|
|
||||||
|
def test_create_port_rpc_outside_transaction(self):
|
||||||
|
with contextlib.nested(
|
||||||
|
mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'),
|
||||||
|
mock.patch.object(base_plugin.NeutronDbPluginV2, 'create_port'),
|
||||||
|
) as (init, super_create_port):
|
||||||
|
init.return_value = None
|
||||||
|
|
||||||
|
new_host_port = mock.Mock()
|
||||||
|
plugin = self._create_plugin_for_create_update_port(new_host_port)
|
||||||
|
|
||||||
|
plugin.create_port(self.context, mock.MagicMock())
|
||||||
|
|
||||||
|
plugin._notify_l3_agent_new_port.assert_called_once_with(
|
||||||
|
self.context, new_host_port)
|
||||||
|
|
||||||
|
def test_update_port_rpc_outside_transaction(self):
|
||||||
|
with contextlib.nested(
|
||||||
|
mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'),
|
||||||
|
mock.patch.object(base_plugin.NeutronDbPluginV2, 'update_port'),
|
||||||
|
) as (init, super_update_port):
|
||||||
|
init.return_value = None
|
||||||
|
|
||||||
|
new_host_port = mock.Mock()
|
||||||
|
plugin = self._create_plugin_for_create_update_port(new_host_port)
|
||||||
|
|
||||||
|
plugin.update_port(self.context, 'fake_id', mock.MagicMock())
|
||||||
|
|
||||||
|
plugin._notify_l3_agent_new_port.assert_called_once_with(
|
||||||
|
self.context, new_host_port)
|
||||||
|
Loading…
Reference in New Issue
Block a user