diff --git a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py index fa60b19dd0..f5a4507f6e 100755 --- a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py +++ b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py @@ -934,15 +934,45 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin): self.br_mgr.remove_empty_bridges() return resync - def scan_devices(self, registered_devices, updated_devices): - curr_devices = self.br_mgr.get_tap_devices() + def scan_devices(self, previous, sync): device_info = {} - device_info['current'] = curr_devices - device_info['added'] = curr_devices - registered_devices - # we don't want to process updates for devices that don't exist - device_info['updated'] = updated_devices & curr_devices - # we need to clean up after devices are removed - device_info['removed'] = registered_devices - curr_devices + + # Save and reinitialise the set variable that the port_update RPC uses. + # This should be thread-safe as the greenthread should not yield + # between these two statements. + updated_devices = self.updated_devices + self.updated_devices = set() + + current_devices = self.br_mgr.get_tap_devices() + device_info['current'] = current_devices + + if previous is None: + # This is the first iteration of daemon_loop(). + previous = {'added': set(), + 'current': set(), + 'updated': set(), + 'removed': set()} + + if sync: + # This is the first iteration, or the previous one had a problem. + # Re-add all existing devices. + device_info['added'] = current_devices + + # Retry cleaning devices that may not have been cleaned properly. + # And clean any that disappeared since the previous iteration. + device_info['removed'] = (previous['removed'] | previous['current'] + - current_devices) + + # Retry updating devices that may not have been updated properly. + # And any that were updated since the previous iteration. + # Only update devices that currently exist. + device_info['updated'] = (previous['updated'] | updated_devices + & current_devices) + else: + device_info['added'] = current_devices - previous['current'] + device_info['removed'] = previous['current'] - current_devices + device_info['updated'] = updated_devices & current_devices + return device_info def _device_info_has_changes(self, device_info): @@ -951,39 +981,27 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin): or device_info.get('removed')) def daemon_loop(self): - sync = True - devices = set() - LOG.info(_("LinuxBridge Agent RPC Daemon Started!")) + device_info = None + sync = True while True: start = time.time() + + device_info = self.scan_devices(previous=device_info, sync=sync) + if sync: LOG.info(_("Agent out of sync with plugin!")) - devices.clear() sync = False - device_info = {} - # Save updated devices dict to perform rollback in case - # resync would be needed, and then clear self.updated_devices. - # As the greenthread should not yield between these - # two statements, this will should be thread-safe. - updated_devices_copy = self.updated_devices - self.updated_devices = set() - try: - device_info = self.scan_devices(devices, updated_devices_copy) - if self._device_info_has_changes(device_info): - LOG.debug(_("Agent loop found changes! %s"), device_info) - # If treat devices fails - indicates must resync with - # plugin + + if self._device_info_has_changes(device_info): + LOG.debug(_("Agent loop found changes! %s"), device_info) + try: sync = self.process_network_devices(device_info) - devices = device_info['current'] - except Exception: - LOG.exception(_("Error in agent loop. Devices info: %s"), - device_info) - sync = True - # Restore devices that were removed from this set earlier - # without overwriting ones that may have arrived since. - self.updated_devices |= updated_devices_copy + except Exception: + LOG.exception(_("Error in agent loop. Devices info: %s"), + device_info) + sync = True # sleep till end of polling interval elapsed = (time.time() - start) diff --git a/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py b/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py index a2a8b1785b..0c4661267d 100644 --- a/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py +++ b/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py @@ -17,7 +17,6 @@ import os import mock from oslo.config import cfg -import testtools from neutron.agent.linux import ip_lib from neutron.agent.linux import utils @@ -168,88 +167,121 @@ class TestLinuxBridgeAgent(base.BaseTestCase): self.assertTrue(fn_udd.called) self.assertTrue(fn_rdf.called) - def test_loop_restores_updated_devices_on_exception(self): - agent = self.agent - agent.updated_devices = set(['tap1', 'tap2']) - - with contextlib.nested( - mock.patch.object(agent, 'scan_devices'), - mock.patch.object(linuxbridge_neutron_agent.LOG, 'info'), - mock.patch.object(agent, 'process_network_devices') - ) as (scan_devices, log, process_network_devices): - # Simulate effect of 2 port_update()s when loop is running. - # And break out of loop at start of 2nd iteration. - log.side_effect = [agent.updated_devices.add('tap3'), - agent.updated_devices.add('tap4'), - ValueError] - scan_devices.side_effect = RuntimeError - - with testtools.ExpectedException(ValueError): - agent.daemon_loop() - - # Check that the originals {tap1,tap2} have been restored - # and the new updates {tap3, tap4} have not been overwritten. - self.assertEqual(set(['tap1', 'tap2', 'tap3', 'tap4']), - agent.updated_devices) - self.assertEqual(3, log.call_count) - - def mock_scan_devices(self, expected, mock_current, - registered_devices, updated_devices): + def _test_scan_devices(self, previous, updated, + fake_current, expected, sync): self.agent.br_mgr = mock.Mock() - self.agent.br_mgr.get_tap_devices.return_value = mock_current + self.agent.br_mgr.get_tap_devices.return_value = fake_current - results = self.agent.scan_devices(registered_devices, updated_devices) + self.agent.updated_devices = updated + results = self.agent.scan_devices(previous, sync) self.assertEqual(expected, results) - def test_scan_devices_returns_empty_sets(self): - registered = set() - updated = set() - mock_current = set() - expected = {'current': set(), - 'updated': set(), - 'added': set(), - 'removed': set()} - self.mock_scan_devices(expected, mock_current, registered, updated) - def test_scan_devices_no_changes(self): - registered = set(['tap1', 'tap2']) - updated = set() - mock_current = set(['tap1', 'tap2']) - expected = {'current': set(['tap1', 'tap2']), + previous = {'current': set([1, 2]), 'updated': set(), 'added': set(), 'removed': set()} - self.mock_scan_devices(expected, mock_current, registered, updated) - - def test_scan_devices_new_and_removed(self): - registered = set(['tap1', 'tap2']) + fake_current = set([1, 2]) updated = set() - mock_current = set(['tap2', 'tap3']) - expected = {'current': set(['tap2', 'tap3']), - 'updated': set(), - 'added': set(['tap3']), - 'removed': set(['tap1'])} - self.mock_scan_devices(expected, mock_current, registered, updated) - - def test_scan_devices_new_updates(self): - registered = set(['tap1']) - updated = set(['tap2']) - mock_current = set(['tap1', 'tap2']) - expected = {'current': set(['tap1', 'tap2']), - 'updated': set(['tap2']), - 'added': set(['tap2']), - 'removed': set()} - self.mock_scan_devices(expected, mock_current, registered, updated) - - def test_scan_devices_updated_missing(self): - registered = set(['tap1']) - updated = set(['tap2']) - mock_current = set(['tap1']) - expected = {'current': set(['tap1']), + expected = {'current': set([1, 2]), 'updated': set(), 'added': set(), 'removed': set()} - self.mock_scan_devices(expected, mock_current, registered, updated) + + self._test_scan_devices(previous, updated, fake_current, expected, + sync=False) + + def test_scan_devices_added_removed(self): + previous = {'current': set([1, 2]), + 'updated': set(), + 'added': set(), + 'removed': set()} + fake_current = set([2, 3]) + updated = set() + expected = {'current': set([2, 3]), + 'updated': set(), + 'added': set([3]), + 'removed': set([1])} + + self._test_scan_devices(previous, updated, fake_current, expected, + sync=False) + + def test_scan_devices_removed_retried_on_sync(self): + previous = {'current': set([2, 3]), + 'updated': set(), + 'added': set(), + 'removed': set([1])} + fake_current = set([2, 3]) + updated = set() + expected = {'current': set([2, 3]), + 'updated': set(), + 'added': set([2, 3]), + 'removed': set([1])} + + self._test_scan_devices(previous, updated, fake_current, expected, + sync=True) + + def test_scan_devices_vanished_removed_on_sync(self): + previous = {'current': set([2, 3]), + 'updated': set(), + 'added': set(), + 'removed': set([1])} + # Device 2 disappeared. + fake_current = set([3]) + updated = set() + # Device 1 should be retried. + expected = {'current': set([3]), + 'updated': set(), + 'added': set([3]), + 'removed': set([1, 2])} + + self._test_scan_devices(previous, updated, fake_current, expected, + sync=True) + + def test_scan_devices_updated(self): + previous = {'current': set([1, 2]), + 'updated': set(), + 'added': set(), + 'removed': set()} + fake_current = set([1, 2]) + updated = set([1]) + expected = {'current': set([1, 2]), + 'updated': set([1]), + 'added': set(), + 'removed': set()} + + self._test_scan_devices(previous, updated, fake_current, expected, + sync=False) + + def test_scan_devices_updated_non_existing(self): + previous = {'current': set([1, 2]), + 'updated': set(), + 'added': set(), + 'removed': set()} + fake_current = set([1, 2]) + updated = set([3]) + expected = {'current': set([1, 2]), + 'updated': set(), + 'added': set(), + 'removed': set()} + + self._test_scan_devices(previous, updated, fake_current, expected, + sync=False) + + def test_scan_devices_updated_on_sync(self): + previous = {'current': set([1, 2]), + 'updated': set([1]), + 'added': set(), + 'removed': set()} + fake_current = set([1, 2]) + updated = set([2]) + expected = {'current': set([1, 2]), + 'updated': set([1, 2]), + 'added': set([1, 2]), + 'removed': set()} + + self._test_scan_devices(previous, updated, fake_current, expected, + sync=True) def test_process_network_devices(self): agent = self.agent