Improve ovs and linuxbridge agents rpc exception handling
Fixes bug 1053497 Change-Id: Id946542b204cf75586224f3749b0007cc2d6b3a7
This commit is contained in:
parent
cf9a0c7585
commit
6e0f9c1b9f
@ -41,6 +41,7 @@ from quantum.common import utils as q_utils
|
||||
from quantum import context
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common import loopingcall
|
||||
from quantum.openstack.common.rpc import common as rpc_common
|
||||
from quantum.openstack.common.rpc import dispatcher
|
||||
from quantum.plugins.linuxbridge.common import config # noqa
|
||||
from quantum.plugins.linuxbridge.common import constants as lconst
|
||||
@ -437,35 +438,38 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
|
||||
|
||||
if 'security_groups' in port:
|
||||
self.sg_agent.refresh_firewall()
|
||||
|
||||
if port['admin_state_up']:
|
||||
network_type = kwargs.get('network_type')
|
||||
if network_type:
|
||||
segmentation_id = kwargs.get('segmentation_id')
|
||||
try:
|
||||
if port['admin_state_up']:
|
||||
network_type = kwargs.get('network_type')
|
||||
if network_type:
|
||||
segmentation_id = kwargs.get('segmentation_id')
|
||||
else:
|
||||
# compatibility with pre-Havana RPC vlan_id encoding
|
||||
vlan_id = kwargs.get('vlan_id')
|
||||
(network_type,
|
||||
segmentation_id) = lconst.interpret_vlan_id(vlan_id)
|
||||
physical_network = kwargs.get('physical_network')
|
||||
# create the networking for the port
|
||||
self.agent.br_mgr.add_interface(port['network_id'],
|
||||
network_type,
|
||||
physical_network,
|
||||
segmentation_id,
|
||||
port['id'])
|
||||
# update plugin about port status
|
||||
self.agent.plugin_rpc.update_device_up(self.context,
|
||||
tap_device_name,
|
||||
self.agent.agent_id)
|
||||
else:
|
||||
# compatibility with pre-Havana RPC vlan_id encoding
|
||||
vlan_id = kwargs.get('vlan_id')
|
||||
(network_type,
|
||||
segmentation_id) = lconst.interpret_vlan_id(vlan_id)
|
||||
physical_network = kwargs.get('physical_network')
|
||||
# create the networking for the port
|
||||
self.agent.br_mgr.add_interface(port['network_id'],
|
||||
network_type,
|
||||
physical_network,
|
||||
segmentation_id,
|
||||
port['id'])
|
||||
# update plugin about port status
|
||||
self.agent.plugin_rpc.update_device_up(self.context,
|
||||
tap_device_name,
|
||||
self.agent.agent_id)
|
||||
else:
|
||||
bridge_name = self.agent.br_mgr.get_bridge_name(
|
||||
port['network_id'])
|
||||
self.agent.br_mgr.remove_interface(bridge_name, tap_device_name)
|
||||
# update plugin about port status
|
||||
self.agent.plugin_rpc.update_device_down(self.context,
|
||||
tap_device_name,
|
||||
self.agent.agent_id)
|
||||
bridge_name = self.agent.br_mgr.get_bridge_name(
|
||||
port['network_id'])
|
||||
self.agent.br_mgr.remove_interface(bridge_name,
|
||||
tap_device_name)
|
||||
# update plugin about port status
|
||||
self.agent.plugin_rpc.update_device_down(self.context,
|
||||
tap_device_name,
|
||||
self.agent.agent_id)
|
||||
except rpc_common.Timeout:
|
||||
LOG.error(_("RPC timeout while updating port %s"), port['id'])
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
'''Get the rpc dispatcher for this manager.
|
||||
|
@ -39,6 +39,7 @@ from quantum import context
|
||||
from quantum.extensions import securitygroup as ext_sg
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common import loopingcall
|
||||
from quantum.openstack.common.rpc import common as rpc_common
|
||||
from quantum.openstack.common.rpc import dispatcher
|
||||
from quantum.plugins.openvswitch.common import config # noqa
|
||||
from quantum.plugins.openvswitch.common import constants
|
||||
@ -261,14 +262,17 @@ class OVSQuantumAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
|
||||
self.treat_vif_port(vif_port, port['id'], port['network_id'],
|
||||
network_type, physical_network,
|
||||
segmentation_id, port['admin_state_up'])
|
||||
if port['admin_state_up']:
|
||||
# update plugin about port status
|
||||
self.plugin_rpc.update_device_up(self.context, port['id'],
|
||||
self.agent_id)
|
||||
else:
|
||||
# update plugin about port status
|
||||
self.plugin_rpc.update_device_down(self.context, port['id'],
|
||||
self.agent_id)
|
||||
try:
|
||||
if port['admin_state_up']:
|
||||
# update plugin about port status
|
||||
self.plugin_rpc.update_device_up(self.context, port['id'],
|
||||
self.agent_id)
|
||||
else:
|
||||
# update plugin about port status
|
||||
self.plugin_rpc.update_device_down(self.context, port['id'],
|
||||
self.agent_id)
|
||||
except rpc_common.Timeout:
|
||||
LOG.error(_("RPC timeout while updating port %s"), port['id'])
|
||||
|
||||
def tunnel_update(self, context, **kwargs):
|
||||
LOG.debug(_("tunnel_update received"))
|
||||
|
@ -23,6 +23,7 @@ import testtools
|
||||
|
||||
from quantum.agent.linux import ip_lib
|
||||
from quantum.agent.linux import utils
|
||||
from quantum.openstack.common.rpc import common as rpc_common
|
||||
from quantum.plugins.linuxbridge.agent import linuxbridge_quantum_agent
|
||||
from quantum.plugins.linuxbridge.common import constants as lconst
|
||||
from quantum.tests import base
|
||||
@ -574,3 +575,37 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
|
||||
"tap123",
|
||||
self.lb_rpc.agent.agent_id
|
||||
)
|
||||
|
||||
def test_port_update_plugin_rpc_failed(self):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.lb_rpc.agent.br_mgr,
|
||||
"get_tap_device_name"),
|
||||
mock.patch.object(self.lb_rpc.agent.br_mgr,
|
||||
"udev_get_tap_devices"),
|
||||
mock.patch.object(self.lb_rpc.agent.br_mgr,
|
||||
"get_bridge_name"),
|
||||
mock.patch.object(self.lb_rpc.agent.br_mgr,
|
||||
"remove_interface"),
|
||||
mock.patch.object(self.lb_rpc.agent.br_mgr, "add_interface"),
|
||||
mock.patch.object(self.lb_rpc.sg_agent,
|
||||
"refresh_firewall", create=True),
|
||||
mock.patch.object(self.lb_rpc.agent,
|
||||
"plugin_rpc", create=True),
|
||||
mock.patch.object(linuxbridge_quantum_agent.LOG, 'error'),
|
||||
) as (get_tap_fn, udev_fn, _, _, _, _, plugin_rpc, log):
|
||||
get_tap_fn.return_value = "tap123"
|
||||
udev_fn.return_value = ["tap123", "tap124"]
|
||||
port = {"admin_state_up": True,
|
||||
"id": "1234-5678",
|
||||
"network_id": "123-123"}
|
||||
plugin_rpc.update_device_up.side_effect = rpc_common.Timeout
|
||||
self.lb_rpc.port_update(mock.Mock(), port=port)
|
||||
self.assertTrue(plugin_rpc.update_device_up.called)
|
||||
self.assertEqual(log.call_count, 1)
|
||||
|
||||
log.reset_mock()
|
||||
port["admin_state_up"] = False
|
||||
plugin_rpc.update_device_down.side_effect = rpc_common.Timeout
|
||||
self.lb_rpc.port_update(mock.Mock(), port=port)
|
||||
self.assertTrue(plugin_rpc.update_device_down.called)
|
||||
self.assertEqual(log.call_count, 1)
|
||||
|
@ -23,6 +23,7 @@ import testtools
|
||||
|
||||
from quantum.agent.linux import ip_lib
|
||||
from quantum.agent.linux import ovs_lib
|
||||
from quantum.openstack.common.rpc import common as rpc_common
|
||||
from quantum.plugins.openvswitch.agent import ovs_quantum_agent
|
||||
from quantum.tests import base
|
||||
|
||||
@ -236,6 +237,30 @@ class TestOvsQuantumAgent(base.BaseTestCase):
|
||||
updup_fn.assert_called_with(self.agent.context,
|
||||
"123", self.agent.agent_id)
|
||||
|
||||
def test_port_update_plugin_rpc_failed(self):
|
||||
port = {'id': 1,
|
||||
'network_id': 1,
|
||||
'admin_state_up': True}
|
||||
with contextlib.nested(
|
||||
mock.patch.object(ovs_quantum_agent.LOG, 'error'),
|
||||
mock.patch.object(self.agent.int_br, "get_vif_port_by_id"),
|
||||
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
|
||||
mock.patch.object(self.agent, 'port_bound'),
|
||||
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
|
||||
mock.patch.object(self.agent, 'port_dead')
|
||||
) as (log, _, device_up, _, device_down, _):
|
||||
device_up.side_effect = rpc_common.Timeout
|
||||
self.agent.port_update(mock.Mock(), port=port)
|
||||
self.assertTrue(device_up.called)
|
||||
self.assertEqual(log.call_count, 1)
|
||||
|
||||
log.reset_mock()
|
||||
port['admin_state_up'] = False
|
||||
device_down.side_effect = rpc_common.Timeout
|
||||
self.agent.port_update(mock.Mock(), port=port)
|
||||
self.assertTrue(device_down.called)
|
||||
self.assertEqual(log.call_count, 1)
|
||||
|
||||
def test_setup_physical_bridges(self):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(ip_lib, "device_exists"),
|
||||
|
Loading…
x
Reference in New Issue
Block a user