Merge "MLNX Agent: Process port_update notifications in the main agent loop"

This commit is contained in:
Jenkins 2014-08-12 04:04:30 +00:00 committed by Gerrit Code Review
commit 0cb08f1674
2 changed files with 152 additions and 92 deletions

View File

@ -168,45 +168,10 @@ class MlnxEswitchRpcCallbacks(n_rpc.RpcCallback,
self.eswitch.remove_network(network_id)
def port_update(self, context, **kwargs):
LOG.debug(_("port_update received"))
port = kwargs.get('port')
net_type = kwargs.get('network_type')
segmentation_id = kwargs.get('segmentation_id')
if not segmentation_id:
# compatibility with pre-Havana RPC vlan_id encoding
segmentation_id = kwargs.get('vlan_id')
physical_network = kwargs.get('physical_network')
net_id = port['network_id']
if self.eswitch.vnic_port_exists(port['mac_address']):
if 'security_groups' in port:
self.sg_agent.refresh_firewall()
try:
if port['admin_state_up']:
self.eswitch.port_up(net_id,
net_type,
physical_network,
segmentation_id,
port['id'],
port['mac_address'])
# update plugin about port status
self.agent.plugin_rpc.update_device_up(self.context,
port['mac_address'],
self.agent.agent_id,
cfg.CONF.host)
else:
self.eswitch.port_down(net_id,
physical_network,
port['mac_address'])
# update plugin about port status
self.agent.plugin_rpc.update_device_down(
self.context,
port['mac_address'],
self.agent.agent_id,
cfg.CONF.host)
except n_rpc.MessagingTimeout:
LOG.error(_("RPC timeout while updating port %s"), port['id'])
else:
LOG.debug(_("No port %s defined on agent."), port['id'])
self.agent.add_port_update(port['mac_address'])
LOG.debug("port_update message processed for port with mac %s",
port['mac_address'])
class MlnxEswitchPluginApi(agent_rpc.PluginApi,
@ -229,6 +194,8 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
'configurations': configurations,
'agent_type': q_constants.AGENT_TYPE_MLNX,
'start_flag': True}
# Stores port update notifications for processing in main rpc loop
self.updated_ports = set()
self._setup_rpc()
self.init_firewall()
@ -272,24 +239,27 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
self._report_state)
heartbeat.start(interval=report_interval)
def update_ports(self, registered_ports):
ports = self.eswitch.get_vnics_mac()
if ports == registered_ports:
return
added = ports - registered_ports
removed = registered_ports - ports
return {'current': ports,
'added': added,
'removed': removed}
def add_port_update(self, port):
self.updated_ports.add(port)
def scan_ports(self, registered_ports, updated_ports_copy=None):
cur_ports = self.eswitch.get_vnics_mac()
port_info = {'current': cur_ports}
# Shouldn't process updates for not existing ports
port_info['updated'] = updated_ports_copy & cur_ports
port_info['added'] = cur_ports - registered_ports
port_info['removed'] = registered_ports - cur_ports
return port_info
def process_network_ports(self, port_info):
resync_a = False
resync_b = False
if port_info.get('added'):
LOG.debug(_("Ports added!"))
resync_a = self.treat_devices_added(port_info['added'])
if port_info.get('removed'):
LOG.debug(_("Ports removed!"))
device_added_updated = port_info['added'] | port_info['updated']
if device_added_updated:
resync_a = self.treat_devices_added_or_updated(
device_added_updated)
if port_info['removed']:
resync_b = self.treat_devices_removed(port_info['removed'])
# If one of the above opertaions fails => resync with plugin
return (resync_a | resync_b)
@ -311,7 +281,7 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
else:
LOG.debug(_("No port %s defined on agent."), port_id)
def treat_devices_added(self, devices):
def treat_devices_added_or_updated(self, devices):
try:
devs_details_list = self.plugin_rpc.get_devices_details_list(
self.context,
@ -326,11 +296,11 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
for dev_details in devs_details_list:
device = dev_details['device']
LOG.info(_("Adding port with mac %s"), device)
LOG.info(_("Adding or updating port with mac %s"), device)
if 'port_id' in dev_details:
LOG.info(_("Port %s updated"), device)
LOG.debug(_("Device details %s"), str(dev_details))
LOG.debug("Device details %s", str(dev_details))
self.treat_vif_port(dev_details['port_id'],
dev_details['device'],
dev_details['network_id'],
@ -339,12 +309,16 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
dev_details['segmentation_id'],
dev_details['admin_state_up'])
if dev_details.get('admin_state_up'):
self.plugin_rpc.update_device_up(self.context,
device,
self.agent_id)
LOG.debug("Setting status for %s to UP", device)
self.plugin_rpc.update_device_up(
self.context, device, self.agent_id)
else:
LOG.debug("Setting status for %s to DOWN", device)
self.plugin_rpc.update_device_down(
self.context, device, self.agent_id)
else:
LOG.debug(_("Device with mac_address %s not defined "
"on Neutron Plugin"), device)
LOG.debug("Device with mac_address %s not defined "
"on Neutron Plugin", device)
return False
def treat_devices_removed(self, devices):
@ -369,27 +343,37 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
self.eswitch.port_release(device)
return resync
def _port_info_has_changes(self, port_info):
return (port_info['added'] or
port_info['removed'] or
port_info['updated'])
def daemon_loop(self):
sync = True
ports = set()
updated_ports_copy = set()
LOG.info(_("eSwitch Agent Started!"))
while True:
try:
start = time.time()
if sync:
LOG.info(_("Agent out of sync with plugin!"))
ports.clear()
sync = False
start = time.time()
if sync:
LOG.info(_("Agent out of sync with plugin!"))
ports.clear()
sync = False
port_info = self.update_ports(ports)
# notify plugin about port deltas
if port_info:
LOG.debug(_("Agent loop process devices!"))
# If treat devices fails - must resync with plugin
try:
updated_ports_copy = self.updated_ports
self.updated_ports = set()
port_info = self.scan_ports(ports, updated_ports_copy)
LOG.debug("Agent loop process devices!")
# If treat devices fails - must resync with plugin
ports = port_info['current']
if self._port_info_has_changes(port_info):
LOG.debug("Starting to process devices in:%s", port_info)
# sync with upper/lower layers about port deltas
sync = self.process_network_ports(port_info)
ports = port_info['current']
except exceptions.RequestTimeout:
LOG.exception(_("Request timeout in agent event loop "
"eSwitchD is not responding - exiting..."))
@ -397,6 +381,7 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
except Exception:
LOG.exception(_("Error in agent event loop"))
sync = True
self.updated_ports |= updated_ports_copy
# sleep till end of polling interval
elapsed = (time.time() - start)
if (elapsed < self._polling_interval):

View File

@ -45,6 +45,23 @@ class TestEswichManager(base.BaseTestCase):
self.manager.get_port_id_by_mac('no-such-mac')
class TestMlnxEswitchRpcCallbacks(base.BaseTestCase):
def setUp(self):
super(TestMlnxEswitchRpcCallbacks, self).setUp()
agent = mock.Mock()
self.rpc_callbacks = eswitch_neutron_agent.MlnxEswitchRpcCallbacks(
'context',
agent
)
def test_port_update(self):
port = {'mac_address': '10:20:30:40:50:60'}
add_port_update = self.rpc_callbacks.agent.add_port_update
self.rpc_callbacks.port_update('context', port=port)
add_port_update.assert_called_once_with(port['mac_address'])
class TestEswitchAgent(base.BaseTestCase):
def setUp(self):
@ -82,9 +99,9 @@ class TestEswitchAgent(base.BaseTestCase):
mock.patch('neutron.plugins.mlnx.agent.eswitch_neutron_agent.'
'EswitchManager.get_vnics_mac',
return_value=[])):
self.assertTrue(self.agent.treat_devices_added([{}]))
self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
def _mock_treat_devices_added(self, details, func_name):
def _mock_treat_devices_added_updated(self, details, func_name):
"""Mock treat devices added.
:param details: the details to return for the device
@ -101,14 +118,14 @@ class TestEswitchAgent(base.BaseTestCase):
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
mock.patch.object(self.agent, func_name)
) as (vnics_fn, get_dev_fn, upd_dev_up, func):
self.assertFalse(self.agent.treat_devices_added([{}]))
self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
return (func.called, upd_dev_up.called)
def test_treat_devices_added_updates_known_port(self):
details = mock.MagicMock()
details.__contains__.side_effect = lambda x: True
func, dev_up = self._mock_treat_devices_added(details,
'treat_vif_port')
func, dev_up = self._mock_treat_devices_added_updated(details,
'treat_vif_port')
self.assertTrue(func)
self.assertTrue(dev_up)
@ -120,8 +137,8 @@ class TestEswitchAgent(base.BaseTestCase):
'physical_network': 'default',
'segmentation_id': 2,
'admin_state_up': False}
func, dev_up = self._mock_treat_devices_added(details,
'treat_vif_port')
func, dev_up = self._mock_treat_devices_added_updated(details,
'treat_vif_port')
self.assertTrue(func)
self.assertFalse(dev_up)
@ -139,17 +156,75 @@ class TestEswitchAgent(base.BaseTestCase):
self.assertFalse(self.agent.treat_devices_removed([{}]))
self.assertTrue(port_release.called)
def _test_process_network_ports(self, port_info):
with contextlib.nested(
mock.patch.object(self.agent, 'treat_devices_added_or_updated',
return_value=False),
mock.patch.object(self.agent, 'treat_devices_removed',
return_value=False)
) as (device_added_updated, device_removed):
self.assertFalse(self.agent.process_network_ports(port_info))
device_added_updated.assert_called_once_with(
port_info['added'] | port_info['updated'])
device_removed.assert_called_once_with(port_info['removed'])
def test_process_network_ports(self):
current_ports = set(['01:02:03:04:05:06'])
added_ports = set(['10:20:30:40:50:60'])
removed_ports = set(['11:22:33:44:55:66'])
reply = {'current': current_ports,
'removed': removed_ports,
'added': added_ports}
with mock.patch.object(self.agent, 'treat_devices_added',
return_value=False) as device_added:
with mock.patch.object(self.agent, 'treat_devices_removed',
return_value=False) as device_removed:
self.assertFalse(self.agent.process_network_ports(reply))
device_added.assert_called_once_with(added_ports)
device_removed.assert_called_once_with(removed_ports)
self._test_process_network_ports(
{'current': set(['10:20:30:40:50:60']),
'updated': set(),
'added': set(['11:21:31:41:51:61']),
'removed': set(['13:23:33:43:53:63'])})
def test_process_network_ports_with_updated_ports(self):
self._test_process_network_ports(
{'current': set(['10:20:30:40:50:60']),
'updated': set(['12:22:32:42:52:62']),
'added': set(['11:21:31:41:51:61']),
'removed': set(['13:23:33:43:53:63'])})
def test_add_port_update(self):
mac_addr = '10:20:30:40:50:60'
self.agent.add_port_update(mac_addr)
self.assertEqual(set([mac_addr]), self.agent.updated_ports)
def _mock_scan_ports(self, vif_port_set, registered_ports, updated_ports):
with mock.patch.object(self.agent.eswitch, 'get_vnics_mac',
return_value=vif_port_set):
return self.agent.scan_ports(registered_ports, updated_ports)
def test_scan_ports_return_current_for_unchanged_ports(self):
vif_port_set = set([1, 2])
registered_ports = set([1, 2])
actual = self._mock_scan_ports(vif_port_set,
registered_ports, set())
expected = dict(current=vif_port_set, added=set(),
removed=set(), updated=set())
self.assertEqual(expected, actual)
def test_scan_ports_return_port_changes(self):
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
actual = self._mock_scan_ports(vif_port_set,
registered_ports, set())
expected = dict(current=vif_port_set, added=set([3]),
removed=set([2]), updated=set())
self.assertEqual(expected, actual)
def test_scan_ports_with_updated_ports(self):
vif_port_set = set([1, 3, 4])
registered_ports = set([1, 2, 4])
actual = self._mock_scan_ports(vif_port_set,
registered_ports, set([4]))
expected = dict(current=vif_port_set, added=set([3]),
removed=set([2]), updated=set([4]))
self.assertEqual(expected, actual)
def test_scan_ports_with_unknown_updated_ports(self):
vif_port_set = set([1, 3, 4])
registered_ports = set([1, 2, 4])
actual = self._mock_scan_ports(vif_port_set,
registered_ports,
updated_ports=set([4, 5]))
expected = dict(current=vif_port_set, added=set([3]),
removed=set([2]), updated=set([4]))
self.assertEqual(expected, actual)