Introduce bulk calls for get device details
Allow to get multiple devices details instead of just one This change introduces a new method in the rpc api. blueprint bulk-get-device-details Change-Id: I8497256d7f4f2fb48b5cb792e35aaedf63f129fc
This commit is contained in:
parent
befc4a0229
commit
88638e6994
@ -14,6 +14,7 @@
|
||||
# under the License.
|
||||
|
||||
import itertools
|
||||
from oslo import messaging
|
||||
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.common import topics
|
||||
@ -91,6 +92,24 @@ class PluginApi(n_rpc.RpcProxy):
|
||||
agent_id=agent_id),
|
||||
topic=self.topic)
|
||||
|
||||
def get_devices_details_list(self, context, devices, agent_id):
|
||||
res = []
|
||||
try:
|
||||
res = self.call(context,
|
||||
self.make_msg('get_devices_details_list',
|
||||
devices=devices,
|
||||
agent_id=agent_id),
|
||||
topic=self.topic, version='1.2')
|
||||
except messaging.UnsupportedVersion:
|
||||
res = [
|
||||
self.call(context,
|
||||
self.make_msg('get_device_details', device=device,
|
||||
agent_id=agent_id),
|
||||
topic=self.topic)
|
||||
for device in devices
|
||||
]
|
||||
return res
|
||||
|
||||
def update_device_down(self, context, device, agent_id, host=None):
|
||||
return self.call(context,
|
||||
self.make_msg('update_device_down', device=device,
|
||||
|
@ -83,10 +83,11 @@ class BridgeRpcCallbacks(n_rpc.RpcCallback,
|
||||
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
|
||||
"""Agent callback."""
|
||||
|
||||
RPC_API_VERSION = '1.1'
|
||||
RPC_API_VERSION = '1.2'
|
||||
# Device names start with "tap"
|
||||
# history
|
||||
# 1.1 Support Security Group RPC
|
||||
# 1.2 Support get_devices_details_list
|
||||
TAP_PREFIX_LEN = 3
|
||||
|
||||
@classmethod
|
||||
@ -136,6 +137,16 @@ class BridgeRpcCallbacks(n_rpc.RpcCallback,
|
||||
LOG.debug(_("%s can not be found in database"), device)
|
||||
return entry
|
||||
|
||||
def get_devices_details_list(self, rpc_context, **kwargs):
|
||||
return [
|
||||
self.get_device_details(
|
||||
rpc_context,
|
||||
device=device,
|
||||
**kwargs
|
||||
)
|
||||
for device in kwargs.pop('devices', [])
|
||||
]
|
||||
|
||||
def update_device_down(self, rpc_context, **kwargs):
|
||||
"""Device no longer exists on agent."""
|
||||
|
||||
|
@ -357,21 +357,21 @@ class HyperVNeutronAgent(n_rpc.RpcCallback):
|
||||
LOG.debug(_("No port %s defined on agent."), port_id)
|
||||
|
||||
def _treat_devices_added(self, devices):
|
||||
resync = False
|
||||
for device in devices:
|
||||
try:
|
||||
devices_details_list = self.plugin_rpc.get_devices_details_list(
|
||||
self.context,
|
||||
devices,
|
||||
self.agent_id)
|
||||
except Exception as e:
|
||||
LOG.debug("Unable to get ports details for "
|
||||
"devices %(devices)s: %(e)s",
|
||||
{'devices': devices, 'e': e})
|
||||
# resync is needed
|
||||
return True
|
||||
|
||||
for device_details in devices_details_list:
|
||||
device = device_details['device']
|
||||
LOG.info(_("Adding port %s"), device)
|
||||
try:
|
||||
device_details = self.plugin_rpc.get_device_details(
|
||||
self.context,
|
||||
device,
|
||||
self.agent_id)
|
||||
except Exception as e:
|
||||
LOG.debug(
|
||||
_("Unable to get port details for "
|
||||
"device %(device)s: %(e)s"),
|
||||
{'device': device, 'e': e})
|
||||
resync = True
|
||||
continue
|
||||
if 'port_id' in device_details:
|
||||
LOG.info(
|
||||
_("Port %(device)s updated. Details: %(device_details)s"),
|
||||
@ -395,7 +395,7 @@ class HyperVNeutronAgent(n_rpc.RpcCallback):
|
||||
device,
|
||||
self.agent_id,
|
||||
cfg.CONF.host)
|
||||
return resync
|
||||
return False
|
||||
|
||||
def _treat_devices_removed(self, devices):
|
||||
resync = False
|
||||
|
@ -30,8 +30,10 @@ class HyperVRpcCallbacks(
|
||||
dhcp_rpc_base.DhcpRpcCallbackMixin,
|
||||
l3_rpc_base.L3RpcCallbackMixin):
|
||||
|
||||
# Set RPC API version to 1.0 by default.
|
||||
RPC_API_VERSION = '1.1'
|
||||
# history
|
||||
# 1.1 Support Security Group RPC
|
||||
# 1.2 Support get_devices_details_list
|
||||
RPC_API_VERSION = '1.2'
|
||||
|
||||
def __init__(self, notifier):
|
||||
super(HyperVRpcCallbacks, self).__init__()
|
||||
@ -61,6 +63,16 @@ class HyperVRpcCallbacks(
|
||||
LOG.debug(_("%s can not be found in database"), device)
|
||||
return entry
|
||||
|
||||
def get_devices_details_list(self, rpc_context, **kwargs):
|
||||
return [
|
||||
self.get_device_details(
|
||||
rpc_context,
|
||||
device=device,
|
||||
**kwargs
|
||||
)
|
||||
for device in kwargs.pop('devices', [])
|
||||
]
|
||||
|
||||
def update_device_down(self, rpc_context, **kwargs):
|
||||
"""Device no longer exists on agent."""
|
||||
# TODO(garyk) - live migration and port status
|
||||
|
@ -861,38 +861,39 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
|
||||
return (resync_a | resync_b)
|
||||
|
||||
def treat_devices_added_updated(self, devices):
|
||||
resync = False
|
||||
try:
|
||||
devices_details_list = self.plugin_rpc.get_devices_details_list(
|
||||
self.context, devices, self.agent_id)
|
||||
except Exception as e:
|
||||
LOG.debug("Unable to get port details for "
|
||||
"%(devices)s: %(e)s",
|
||||
{'devices': devices, 'e': e})
|
||||
# resync is needed
|
||||
return True
|
||||
|
||||
for device in devices:
|
||||
LOG.debug(_("Treating added or updated device: %s"), device)
|
||||
try:
|
||||
details = self.plugin_rpc.get_device_details(self.context,
|
||||
device,
|
||||
self.agent_id)
|
||||
except Exception as e:
|
||||
LOG.debug(_("Unable to get port details for "
|
||||
"%(device)s: %(e)s"),
|
||||
{'device': device, 'e': e})
|
||||
resync = True
|
||||
continue
|
||||
if 'port_id' in details:
|
||||
for device_details in devices_details_list:
|
||||
device = device_details['device']
|
||||
LOG.debug("Port %s added", device)
|
||||
|
||||
if 'port_id' in device_details:
|
||||
LOG.info(_("Port %(device)s updated. Details: %(details)s"),
|
||||
{'device': device, 'details': details})
|
||||
if details['admin_state_up']:
|
||||
{'device': device, 'details': device_details})
|
||||
if device_details['admin_state_up']:
|
||||
# create the networking for the port
|
||||
network_type = details.get('network_type')
|
||||
network_type = device_details.get('network_type')
|
||||
if network_type:
|
||||
segmentation_id = details.get('segmentation_id')
|
||||
segmentation_id = device_details.get('segmentation_id')
|
||||
else:
|
||||
# compatibility with pre-Havana RPC vlan_id encoding
|
||||
vlan_id = details.get('vlan_id')
|
||||
vlan_id = device_details.get('vlan_id')
|
||||
(network_type,
|
||||
segmentation_id) = lconst.interpret_vlan_id(vlan_id)
|
||||
if self.br_mgr.add_interface(details['network_id'],
|
||||
network_type,
|
||||
details['physical_network'],
|
||||
segmentation_id,
|
||||
details['port_id']):
|
||||
if self.br_mgr.add_interface(
|
||||
device_details['network_id'],
|
||||
network_type,
|
||||
device_details['physical_network'],
|
||||
segmentation_id,
|
||||
device_details['port_id']):
|
||||
|
||||
# update plugin about port status
|
||||
self.plugin_rpc.update_device_up(self.context,
|
||||
@ -905,11 +906,11 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
|
||||
self.agent_id,
|
||||
cfg.CONF.host)
|
||||
else:
|
||||
self.remove_port_binding(details['network_id'],
|
||||
details['port_id'])
|
||||
self.remove_port_binding(device_details['network_id'],
|
||||
device_details['port_id'])
|
||||
else:
|
||||
LOG.info(_("Device %s not defined on plugin"), device)
|
||||
return resync
|
||||
return False
|
||||
|
||||
def treat_devices_removed(self, devices):
|
||||
resync = False
|
||||
|
@ -61,7 +61,8 @@ class LinuxBridgeRpcCallbacks(n_rpc.RpcCallback,
|
||||
|
||||
# history
|
||||
# 1.1 Support Security Group RPC
|
||||
RPC_API_VERSION = '1.1'
|
||||
# 1.2 Support get_devices_details_list
|
||||
RPC_API_VERSION = '1.2'
|
||||
# Device names start with "tap"
|
||||
TAP_PREFIX_LEN = 3
|
||||
|
||||
@ -102,6 +103,16 @@ class LinuxBridgeRpcCallbacks(n_rpc.RpcCallback,
|
||||
LOG.debug(_("%s can not be found in database"), device)
|
||||
return entry
|
||||
|
||||
def get_devices_details_list(self, rpc_context, **kwargs):
|
||||
return [
|
||||
self.get_device_details(
|
||||
rpc_context,
|
||||
device=device,
|
||||
**kwargs
|
||||
)
|
||||
for device in kwargs.pop('devices', [])
|
||||
]
|
||||
|
||||
def update_device_down(self, rpc_context, **kwargs):
|
||||
"""Device no longer exists on agent."""
|
||||
# TODO(garyk) - live migration and port status
|
||||
|
@ -41,10 +41,11 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
|
||||
sg_db_rpc.SecurityGroupServerRpcCallbackMixin,
|
||||
type_tunnel.TunnelRpcCallbackMixin):
|
||||
|
||||
RPC_API_VERSION = '1.1'
|
||||
RPC_API_VERSION = '1.2'
|
||||
# history
|
||||
# 1.0 Initial version (from openvswitch/linuxbridge)
|
||||
# 1.1 Support Security Group RPC
|
||||
# 1.2 Support get_devices_details_list
|
||||
|
||||
# FIXME(ihrachys): we can't use n_rpc.RpcCallback here due to
|
||||
# inheritance problems
|
||||
@ -149,6 +150,16 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
|
||||
LOG.debug(_("Returning: %s"), entry)
|
||||
return entry
|
||||
|
||||
def get_devices_details_list(self, rpc_context, **kwargs):
|
||||
return [
|
||||
self.get_device_details(
|
||||
rpc_context,
|
||||
device=device,
|
||||
**kwargs
|
||||
)
|
||||
for device in kwargs.pop('devices', [])
|
||||
]
|
||||
|
||||
def _find_segment(self, segments, segment_id):
|
||||
for segment in segments:
|
||||
if segment[api.ID] == segment_id:
|
||||
|
@ -312,20 +312,22 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
|
||||
LOG.debug(_("No port %s defined on agent."), port_id)
|
||||
|
||||
def treat_devices_added(self, devices):
|
||||
resync = False
|
||||
for device in devices:
|
||||
try:
|
||||
devs_details_list = self.plugin_rpc.get_devices_details_list(
|
||||
self.context,
|
||||
devices,
|
||||
self.agent_id)
|
||||
except Exception as e:
|
||||
LOG.debug("Unable to get device details for devices "
|
||||
"with MAC address %(devices)s: due to %(exc)s",
|
||||
{'devices': devices, 'exc': e})
|
||||
# resync is needed
|
||||
return True
|
||||
|
||||
for dev_details in devs_details_list:
|
||||
device = dev_details['device']
|
||||
LOG.info(_("Adding port with mac %s"), device)
|
||||
try:
|
||||
dev_details = self.plugin_rpc.get_device_details(
|
||||
self.context,
|
||||
device,
|
||||
self.agent_id)
|
||||
except Exception as e:
|
||||
LOG.debug(_("Unable to get device dev_details for device "
|
||||
"with mac_address %(device)s: due to %(exc)s"),
|
||||
{'device': device, 'exc': e})
|
||||
resync = True
|
||||
continue
|
||||
|
||||
if 'port_id' in dev_details:
|
||||
LOG.info(_("Port %s updated"), device)
|
||||
LOG.debug(_("Device details %s"), str(dev_details))
|
||||
@ -343,7 +345,7 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
|
||||
else:
|
||||
LOG.debug(_("Device with mac_address %s not defined "
|
||||
"on Neutron Plugin"), device)
|
||||
return resync
|
||||
return False
|
||||
|
||||
def treat_devices_removed(self, devices):
|
||||
resync = False
|
||||
|
@ -32,7 +32,8 @@ class MlnxRpcCallbacks(n_rpc.RpcCallback,
|
||||
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
|
||||
# History
|
||||
# 1.1 Support Security Group RPC
|
||||
RPC_API_VERSION = '1.1'
|
||||
# 1.2 Support get_devices_details_list
|
||||
RPC_API_VERSION = '1.2'
|
||||
|
||||
#to be compatible with Linux Bridge Agent on Network Node
|
||||
TAP_PREFIX_LEN = 3
|
||||
@ -83,6 +84,16 @@ class MlnxRpcCallbacks(n_rpc.RpcCallback,
|
||||
LOG.debug(_("%s can not be found in database"), device)
|
||||
return entry
|
||||
|
||||
def get_devices_details_list(self, rpc_context, **kwargs):
|
||||
return [
|
||||
self.get_device_details(
|
||||
rpc_context,
|
||||
device=device,
|
||||
**kwargs
|
||||
)
|
||||
for device in kwargs.pop('devices', [])
|
||||
]
|
||||
|
||||
def update_device_down(self, rpc_context, **kwargs):
|
||||
"""Device no longer exists on agent."""
|
||||
agent_id = kwargs.get('agent_id')
|
||||
|
@ -1078,9 +1078,19 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
||||
self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
|
||||
|
||||
def treat_devices_added_or_updated(self, devices, ovs_restarted):
|
||||
resync = False
|
||||
for device in devices:
|
||||
LOG.debug(_("Processing port %s"), device)
|
||||
try:
|
||||
devices_details_list = self.plugin_rpc.get_devices_details_list(
|
||||
self.context,
|
||||
devices,
|
||||
self.agent_id)
|
||||
except Exception as e:
|
||||
LOG.debug("Unable to get port details for %(devices)s: %(e)s",
|
||||
{'devices': devices, 'e': e})
|
||||
# resync is needed
|
||||
return True
|
||||
for details in devices_details_list:
|
||||
device = details['device']
|
||||
LOG.debug("Processing port: %s", device)
|
||||
port = self.int_br.get_vif_port_by_id(device)
|
||||
if not port:
|
||||
# The port has disappeared and should not be processed
|
||||
@ -1089,18 +1099,7 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
||||
LOG.info(_("Port %s was not found on the integration bridge "
|
||||
"and will therefore not be processed"), device)
|
||||
continue
|
||||
try:
|
||||
# TODO(salv-orlando): Provide bulk API for retrieving
|
||||
# details for all devices in one call
|
||||
details = self.plugin_rpc.get_device_details(self.context,
|
||||
device,
|
||||
self.agent_id)
|
||||
except Exception as e:
|
||||
LOG.debug(_("Unable to get port details for "
|
||||
"%(device)s: %(e)s"),
|
||||
{'device': device, 'e': e})
|
||||
resync = True
|
||||
continue
|
||||
|
||||
if 'port_id' in details:
|
||||
LOG.info(_("Port %(device)s updated. Details: %(details)s"),
|
||||
{'device': device, 'details': details})
|
||||
@ -1125,28 +1124,30 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
||||
LOG.warn(_("Device %s not defined on plugin"), device)
|
||||
if (port and port.ofport != -1):
|
||||
self.port_dead(port)
|
||||
return resync
|
||||
return False
|
||||
|
||||
def treat_ancillary_devices_added(self, devices):
|
||||
resync = False
|
||||
for device in devices:
|
||||
try:
|
||||
devices_details_list = self.plugin_rpc.get_devices_details_list(
|
||||
self.context,
|
||||
devices,
|
||||
self.agent_id)
|
||||
except Exception as e:
|
||||
LOG.debug("Unable to get port details for "
|
||||
"%(devices)s: %(e)s", {'devices': devices, 'e': e})
|
||||
# resync is needed
|
||||
return True
|
||||
|
||||
for details in devices_details_list:
|
||||
device = details['device']
|
||||
LOG.info(_("Ancillary Port %s added"), device)
|
||||
try:
|
||||
self.plugin_rpc.get_device_details(self.context, device,
|
||||
self.agent_id)
|
||||
except Exception as e:
|
||||
LOG.debug(_("Unable to get port details for "
|
||||
"%(device)s: %(e)s"),
|
||||
{'device': device, 'e': e})
|
||||
resync = True
|
||||
continue
|
||||
|
||||
# update plugin about port status
|
||||
self.plugin_rpc.update_device_up(self.context,
|
||||
device,
|
||||
self.agent_id,
|
||||
cfg.CONF.host)
|
||||
return resync
|
||||
return False
|
||||
|
||||
def treat_devices_removed(self, devices):
|
||||
resync = False
|
||||
|
@ -65,8 +65,9 @@ class OVSRpcCallbacks(n_rpc.RpcCallback,
|
||||
# history
|
||||
# 1.0 Initial version
|
||||
# 1.1 Support Security Group RPC
|
||||
# 1.2 Support get_devices_details_list
|
||||
|
||||
RPC_API_VERSION = '1.1'
|
||||
RPC_API_VERSION = '1.2'
|
||||
|
||||
def __init__(self, notifier, tunnel_type):
|
||||
super(OVSRpcCallbacks, self).__init__()
|
||||
@ -105,6 +106,16 @@ class OVSRpcCallbacks(n_rpc.RpcCallback,
|
||||
LOG.debug(_("%s can not be found in database"), device)
|
||||
return entry
|
||||
|
||||
def get_devices_details_list(self, rpc_context, **kwargs):
|
||||
return [
|
||||
self.get_device_details(
|
||||
rpc_context,
|
||||
device=device,
|
||||
**kwargs
|
||||
)
|
||||
for device in kwargs.pop('devices', [])
|
||||
]
|
||||
|
||||
def update_device_down(self, rpc_context, **kwargs):
|
||||
"""Device no longer exists on agent."""
|
||||
agent_id = kwargs.get('agent_id')
|
||||
|
@ -145,7 +145,7 @@ class TestHyperVNeutronAgent(base.BaseTestCase):
|
||||
self.assertNotIn(self._FAKE_PORT_ID, self.agent._port_metric_retries)
|
||||
|
||||
def test_treat_devices_added_returns_true_for_missing_device(self):
|
||||
attrs = {'get_device_details.side_effect': Exception()}
|
||||
attrs = {'get_devices_details_list.side_effect': Exception()}
|
||||
self.agent.plugin_rpc.configure_mock(**attrs)
|
||||
self.assertTrue(self.agent._treat_devices_added([{}]))
|
||||
|
||||
@ -156,7 +156,7 @@ class TestHyperVNeutronAgent(base.BaseTestCase):
|
||||
:param func_name: the function that should be called
|
||||
:returns: whether the named function was called
|
||||
"""
|
||||
attrs = {'get_device_details.return_value': details}
|
||||
attrs = {'get_devices_details_list.return_value': [details]}
|
||||
self.agent.plugin_rpc.configure_mock(**attrs)
|
||||
with mock.patch.object(self.agent, func_name) as func:
|
||||
self.assertFalse(self.agent._treat_devices_added([{}]))
|
||||
|
@ -35,6 +35,7 @@ class rpcHyperVApiTestCase(base.BaseTestCase):
|
||||
self, rpcapi, topic, method, rpc_method, **kwargs):
|
||||
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||
expected_retval = 'foo' if method == 'call' else None
|
||||
expected_version = kwargs.pop('version', None)
|
||||
expected_msg = rpcapi.make_msg(method, **kwargs)
|
||||
if rpc_method == 'cast' and method == 'run_instance':
|
||||
kwargs['call'] = False
|
||||
@ -45,9 +46,14 @@ class rpcHyperVApiTestCase(base.BaseTestCase):
|
||||
retval = getattr(rpcapi, method)(ctxt, **kwargs)
|
||||
|
||||
self.assertEqual(retval, expected_retval)
|
||||
expected = [
|
||||
mock.call(ctxt, expected_msg, topic=topic)
|
||||
]
|
||||
if expected_version:
|
||||
expected = [
|
||||
mock.call(ctxt, expected_msg, topic=topic,
|
||||
version=expected_version)]
|
||||
else:
|
||||
expected = [
|
||||
mock.call(ctxt, expected_msg, topic=topic)
|
||||
]
|
||||
rpc_method_mock.assert_has_calls(expected)
|
||||
|
||||
def test_delete_network(self):
|
||||
@ -105,6 +111,15 @@ class rpcHyperVApiTestCase(base.BaseTestCase):
|
||||
device='fake_device',
|
||||
agent_id='fake_agent_id')
|
||||
|
||||
def test_devices_details_list(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
self._test_hyperv_neutron_api(
|
||||
rpcapi, topics.PLUGIN,
|
||||
'get_devices_details_list', rpc_method='call',
|
||||
devices=['fake_device1', 'fake_device2'],
|
||||
agent_id='fake_agent_id',
|
||||
version='1.2')
|
||||
|
||||
def test_update_device_down(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
self._test_hyperv_neutron_api(
|
||||
|
@ -273,17 +273,17 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
|
||||
|
||||
def test_treat_devices_added_updated_admin_state_up_true(self):
|
||||
agent = self.agent
|
||||
mock_details = {'port_id': 'port123',
|
||||
mock_details = {'device': 'dev123',
|
||||
'port_id': 'port123',
|
||||
'network_id': 'net123',
|
||||
'admin_state_up': True,
|
||||
'network_type': 'vlan',
|
||||
'segmentation_id': 100,
|
||||
'physical_network': 'physnet1'}
|
||||
agent.plugin_rpc = mock.Mock()
|
||||
agent.plugin_rpc.get_device_details.return_value = mock_details
|
||||
agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
|
||||
agent.br_mgr = mock.Mock()
|
||||
agent.br_mgr.add_interface.return_value = True
|
||||
|
||||
resync_needed = agent.treat_devices_added_updated(set(['tap1']))
|
||||
|
||||
self.assertFalse(resync_needed)
|
||||
@ -293,21 +293,22 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
|
||||
self.assertTrue(agent.plugin_rpc.update_device_up.called)
|
||||
|
||||
def test_treat_devices_added_updated_admin_state_up_false(self):
|
||||
mock_details = {'port_id': 'port123',
|
||||
agent = self.agent
|
||||
mock_details = {'device': 'dev123',
|
||||
'port_id': 'port123',
|
||||
'network_id': 'net123',
|
||||
'admin_state_up': False,
|
||||
'network_type': 'vlan',
|
||||
'segmentation_id': 100,
|
||||
'physical_network': 'physnet1'}
|
||||
self.agent.plugin_rpc = mock.Mock()
|
||||
self.agent.plugin_rpc.get_device_details.return_value = mock_details
|
||||
self.agent.remove_port_binding = mock.Mock()
|
||||
|
||||
resync_needed = self.agent.treat_devices_added_updated(set(['tap1']))
|
||||
agent.plugin_rpc = mock.Mock()
|
||||
agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
|
||||
agent.remove_port_binding = mock.Mock()
|
||||
resync_needed = agent.treat_devices_added_updated(set(['tap1']))
|
||||
|
||||
self.assertFalse(resync_needed)
|
||||
self.agent.remove_port_binding.assert_called_with('net123', 'port123')
|
||||
self.assertFalse(self.agent.plugin_rpc.update_device_up.called)
|
||||
agent.remove_port_binding.assert_called_with('net123', 'port123')
|
||||
self.assertFalse(agent.plugin_rpc.update_device_up.called)
|
||||
|
||||
|
||||
class TestLinuxBridgeManager(base.BaseTestCase):
|
||||
|
@ -31,6 +31,9 @@ class rpcApiTestCase(base.BaseTestCase):
|
||||
expected_msg=None, **kwargs):
|
||||
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||
expected_retval = 'foo' if method == 'call' else None
|
||||
expected_kwargs = {'topic': topic}
|
||||
if 'version' in kwargs:
|
||||
expected_kwargs['version'] = kwargs.pop('version')
|
||||
if not expected_msg:
|
||||
expected_msg = rpcapi.make_msg(method, **kwargs)
|
||||
if rpc_method == 'cast' and method == 'run_instance':
|
||||
@ -53,7 +56,6 @@ class rpcApiTestCase(base.BaseTestCase):
|
||||
|
||||
self.assertEqual(expected_retval, retval)
|
||||
expected_args = [ctxt, expected_msg]
|
||||
expected_kwargs = {'topic': topic}
|
||||
|
||||
# skip the first argument which is 'self'
|
||||
for arg, expected_arg in zip(self.fake_args[1:], expected_args):
|
||||
@ -113,6 +115,14 @@ class rpcApiTestCase(base.BaseTestCase):
|
||||
device='fake_device',
|
||||
agent_id='fake_agent_id')
|
||||
|
||||
def test_devices_details_list(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
self._test_lb_api(rpcapi, topics.PLUGIN,
|
||||
'get_devices_details_list', rpc_method='call',
|
||||
devices=['fake_device1', 'fake_device2'],
|
||||
agent_id='fake_agent_id',
|
||||
version='1.2')
|
||||
|
||||
def test_update_device_down(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
self._test_lb_api(rpcapi, topics.PLUGIN,
|
||||
|
@ -33,6 +33,7 @@ class RpcApiTestCase(base.BaseTestCase):
|
||||
def _test_rpc_api(self, rpcapi, topic, method, rpc_method, **kwargs):
|
||||
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||
expected_retval = 'foo' if method == 'call' else None
|
||||
expected_version = kwargs.pop('version', None)
|
||||
expected_msg = rpcapi.make_msg(method, **kwargs)
|
||||
if rpc_method == 'cast' and method == 'run_instance':
|
||||
kwargs['call'] = False
|
||||
@ -43,9 +44,14 @@ class RpcApiTestCase(base.BaseTestCase):
|
||||
retval = getattr(rpcapi, method)(ctxt, **kwargs)
|
||||
|
||||
self.assertEqual(retval, expected_retval)
|
||||
expected = [
|
||||
mock.call(ctxt, expected_msg, topic=topic)
|
||||
]
|
||||
if expected_version:
|
||||
expected = [
|
||||
mock.call(ctxt, expected_msg, topic=topic,
|
||||
version=expected_version)]
|
||||
else:
|
||||
expected = [
|
||||
mock.call(ctxt, expected_msg, topic=topic)
|
||||
]
|
||||
rpc_method_mock.assert_has_calls(expected)
|
||||
|
||||
def test_delete_network(self):
|
||||
@ -85,6 +91,14 @@ class RpcApiTestCase(base.BaseTestCase):
|
||||
device='fake_device',
|
||||
agent_id='fake_agent_id')
|
||||
|
||||
def test_devices_details_list(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
self._test_rpc_api(rpcapi, topics.PLUGIN,
|
||||
'get_devices_details_list', rpc_method='call',
|
||||
devices=['fake_device1', 'fake_device2'],
|
||||
agent_id='fake_agent_id',
|
||||
version='1.2')
|
||||
|
||||
def test_update_device_down(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
self._test_rpc_api(rpcapi, topics.PLUGIN,
|
||||
|
@ -76,7 +76,7 @@ class TestEswitchAgent(base.BaseTestCase):
|
||||
self.agent.eswitch.get_vnics_mac.return_value = []
|
||||
|
||||
def test_treat_devices_added_returns_true_for_missing_device(self):
|
||||
attrs = {'get_device_details.side_effect': Exception()}
|
||||
attrs = {'get_devices_details_list.side_effect': Exception()}
|
||||
self.agent.plugin_rpc.configure_mock(**attrs)
|
||||
with contextlib.nested(
|
||||
mock.patch('neutron.plugins.mlnx.agent.eswitch_neutron_agent.'
|
||||
@ -95,8 +95,9 @@ class TestEswitchAgent(base.BaseTestCase):
|
||||
mock.patch('neutron.plugins.mlnx.agent.eswitch_neutron_agent.'
|
||||
'EswitchManager.get_vnics_mac',
|
||||
return_value=[]),
|
||||
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
|
||||
return_value=details),
|
||||
mock.patch.object(self.agent.plugin_rpc,
|
||||
'get_devices_details_list',
|
||||
return_value=[details]),
|
||||
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
|
||||
mock.patch.object(self.agent, func_name)
|
||||
) as (vnics_fn, get_dev_fn, upd_dev_up, func):
|
||||
|
@ -33,6 +33,9 @@ class rpcApiTestCase(base.BaseTestCase):
|
||||
expected_msg=None, **kwargs):
|
||||
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||
expected_retval = 'foo' if method == 'call' else None
|
||||
expected_kwargs = {'topic': topic}
|
||||
if 'version' in kwargs:
|
||||
expected_kwargs['version'] = kwargs.pop('version')
|
||||
if not expected_msg:
|
||||
expected_msg = rpcapi.make_msg(method, **kwargs)
|
||||
if rpc_method == 'cast' and method == 'run_instance':
|
||||
@ -55,7 +58,6 @@ class rpcApiTestCase(base.BaseTestCase):
|
||||
|
||||
self.assertEqual(expected_retval, retval)
|
||||
expected_args = [ctxt, expected_msg]
|
||||
expected_kwargs = {'topic': topic}
|
||||
|
||||
# skip the first argument which is 'self'
|
||||
for arg, expected_arg in zip(self.fake_args[1:], expected_args):
|
||||
@ -136,6 +138,14 @@ class rpcApiTestCase(base.BaseTestCase):
|
||||
device='fake_device',
|
||||
agent_id='fake_agent_id')
|
||||
|
||||
def test_devices_details_list(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
self._test_mlnx_api(rpcapi, topics.PLUGIN,
|
||||
'get_devices_details_list', rpc_method='call',
|
||||
devices=['fake_device1', 'fake_device1'],
|
||||
agent_id='fake_agent_id',
|
||||
version='1.2')
|
||||
|
||||
def test_update_device_down(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
self._test_mlnx_api(rpcapi, topics.PLUGIN,
|
||||
|
@ -281,7 +281,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
||||
|
||||
def test_treat_devices_added_returns_true_for_missing_device(self):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
|
||||
mock.patch.object(self.agent.plugin_rpc,
|
||||
'get_devices_details_list',
|
||||
side_effect=Exception()),
|
||||
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
|
||||
return_value=mock.Mock())):
|
||||
@ -297,8 +298,9 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
||||
:returns: whether the named function was called
|
||||
"""
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
|
||||
return_value=details),
|
||||
mock.patch.object(self.agent.plugin_rpc,
|
||||
'get_devices_details_list',
|
||||
return_value=[details]),
|
||||
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
|
||||
return_value=port),
|
||||
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
|
||||
@ -344,8 +346,9 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
||||
'segmentation_id': 'bar',
|
||||
'network_type': 'baz'}
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
|
||||
return_value=fake_details_dict),
|
||||
mock.patch.object(self.agent.plugin_rpc,
|
||||
'get_devices_details_list',
|
||||
return_value=[fake_details_dict]),
|
||||
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
|
||||
return_value=mock.MagicMock()),
|
||||
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
|
||||
|
@ -31,6 +31,9 @@ class rpcApiTestCase(base.BaseTestCase):
|
||||
def _test_ovs_api(self, rpcapi, topic, method, rpc_method, **kwargs):
|
||||
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||
expected_retval = 'foo' if method == 'call' else None
|
||||
expected_kwargs = {'topic': topic}
|
||||
if 'version' in kwargs:
|
||||
expected_kwargs['version'] = kwargs.pop('version')
|
||||
expected_msg = rpcapi.make_msg(method, **kwargs)
|
||||
if rpc_method == 'cast' and method == 'run_instance':
|
||||
kwargs['call'] = False
|
||||
@ -52,7 +55,6 @@ class rpcApiTestCase(base.BaseTestCase):
|
||||
|
||||
self.assertEqual(retval, expected_retval)
|
||||
expected_args = [ctxt, expected_msg]
|
||||
expected_kwargs = {'topic': topic}
|
||||
|
||||
# skip the first argument which is 'self'
|
||||
for arg, expected_arg in zip(self.fake_args[1:], expected_args):
|
||||
@ -97,6 +99,14 @@ class rpcApiTestCase(base.BaseTestCase):
|
||||
device='fake_device',
|
||||
agent_id='fake_agent_id')
|
||||
|
||||
def test_devices_details_list(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
self._test_ovs_api(rpcapi, topics.PLUGIN,
|
||||
'get_devices_details_list', rpc_method='call',
|
||||
devices=['fake_device1', 'fake_device2'],
|
||||
agent_id='fake_agent_id',
|
||||
version='1.2')
|
||||
|
||||
def test_update_device_down(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
self._test_ovs_api(rpcapi, topics.PLUGIN,
|
||||
|
@ -14,6 +14,7 @@
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
from oslo import messaging
|
||||
|
||||
from neutron.agent import rpc
|
||||
from neutron.openstack.common import context
|
||||
@ -37,6 +38,21 @@ class AgentRPCPluginApi(base.BaseTestCase):
|
||||
def test_get_device_details(self):
|
||||
self._test_rpc_call('get_device_details')
|
||||
|
||||
def test_get_devices_details_list(self):
|
||||
self._test_rpc_call('get_devices_details_list')
|
||||
|
||||
def test_devices_details_list_unsupported(self):
|
||||
agent = rpc.PluginApi('fake_topic')
|
||||
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||
expect_val_get_device_details = 'foo'
|
||||
expect_val = [expect_val_get_device_details]
|
||||
with mock.patch('neutron.common.rpc.RpcProxy.call') as rpc_call:
|
||||
rpc_call.side_effect = [messaging.UnsupportedVersion('1.2'),
|
||||
expect_val_get_device_details]
|
||||
func_obj = getattr(agent, 'get_devices_details_list')
|
||||
actual_val = func_obj(ctxt, ['fake_device'], 'fake_agent_id')
|
||||
self.assertEqual(actual_val, expect_val)
|
||||
|
||||
def test_update_device_down(self):
|
||||
self._test_rpc_call('update_device_down')
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user