diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py index cbfa110d62..17aa55f499 100644 --- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py @@ -42,10 +42,8 @@ from neutron.common import legacy from neutron.common import topics from neutron.common import utils as q_utils from neutron import context -from neutron.extensions import securitygroup as ext_sg from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import common as rpc_common from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const from neutron.plugins.openvswitch.common import config # noqa @@ -229,6 +227,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.sg_agent = OVSSecurityGroupAgent(self.context, self.plugin_rpc, root_helper) + # Stores port update notifications for processing in main rpc loop + self.updated_ports = set() # Initialize iteration counter self.iter_num = 0 @@ -293,34 +293,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, LOG.debug(_("Network %s not used on agent."), network_id) def port_update(self, context, **kwargs): - LOG.debug(_("port_update received")) port = kwargs.get('port') - # Validate that port is on OVS - vif_port = self.int_br.get_vif_port_by_id(port['id']) - if not vif_port: - return - - if ext_sg.SECURITYGROUPS in port: - self.sg_agent.refresh_firewall() - network_type = kwargs.get('network_type') - segmentation_id = kwargs.get('segmentation_id') - physical_network = kwargs.get('physical_network') - self.treat_vif_port(vif_port, port['id'], port['network_id'], - network_type, physical_network, - segmentation_id, port['admin_state_up']) - try: - if port['admin_state_up']: - # update plugin about port status - self.plugin_rpc.update_device_up(self.context, port['id'], - self.agent_id, - cfg.CONF.host) - else: - # update plugin about port status - self.plugin_rpc.update_device_down(self.context, port['id'], - self.agent_id, - cfg.CONF.host) - except rpc_common.Timeout: - LOG.error(_("RPC timeout while updating port %s"), port['id']) + # Put the port identifier in the updated_ports set. + # Even if full port details might be provided to this call, + # they are not used since there is no guarantee the notifications + # are processed in the same order as the relevant API requests + self.updated_ports.add(port['id']) + LOG.debug(_("port_update message processed for port %s"), port['id']) def tunnel_update(self, context, **kwargs): LOG.debug(_("tunnel_update received")) @@ -830,16 +809,29 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, int_veth.link.set_mtu(self.veth_mtu) phys_veth.link.set_mtu(self.veth_mtu) - def update_ports(self, registered_ports): - ports = self.int_br.get_vif_port_set() - if ports == registered_ports: - return - self.int_br_device_count = len(ports) - added = ports - registered_ports - removed = registered_ports - ports - return {'current': ports, - 'added': added, - 'removed': removed} + def scan_ports(self, registered_ports, updated_ports=None): + cur_ports = self.int_br.get_vif_port_set() + self.int_br_device_count = len(cur_ports) + port_info = {'current': cur_ports} + if updated_ports: + # Some updated ports might have been removed in the + # meanwhile, and therefore should not be processed. + # In this case the updated port won't be found among + # current ports. + updated_ports &= cur_ports + if updated_ports: + port_info['updated'] = updated_ports + + # FIXME(salv-orlando): It's not really necessary to return early + # if nothing has changed. + if cur_ports == registered_ports: + # No added or removed ports to set, just return here + return port_info + + port_info['added'] = cur_ports - registered_ports + # Remove all the known ports not found on the integration bridge + port_info['removed'] = registered_ports - cur_ports + return port_info def update_ancillary_ports(self, registered_ports): ports = set() @@ -917,12 +909,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.tun_br.delete_port(port_name) self.tun_br_ofports[tunnel_type].pop(remote_ip, None) - def treat_devices_added(self, devices): + def treat_devices_added_or_updated(self, devices): resync = False - self.sg_agent.prepare_devices_filter(devices) for device in devices: - LOG.info(_("Port %s added"), device) + LOG.debug(_("Processing port:%s"), device) try: + # TODO(salv-orlando): Provide bulk API for retrieving + # details for all devices in one call details = self.plugin_rpc.get_device_details(self.context, device, self.agent_id) @@ -942,12 +935,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, details['physical_network'], details['segmentation_id'], details['admin_state_up']) - # update plugin about port status - self.plugin_rpc.update_device_up(self.context, - device, - self.agent_id, - cfg.CONF.host) + if details.get('admin_state_up'): + LOG.debug(_("Setting status for %s to UP"), device) + self.plugin_rpc.update_device_up( + self.context, device, self.agent_id, cfg.CONF.host) + else: + LOG.debug(_("Setting status for %s to DOWN"), device) + self.plugin_rpc.update_device_down( + self.context, device, self.agent_id, cfg.CONF.host) else: LOG.debug(_("Device %s not defined on plugin"), device) if (port and int(port.ofport) != -1): @@ -1017,11 +1013,30 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, def process_network_ports(self, port_info): resync_a = False resync_b = False - if 'added' in port_info: + # If there is an exception while processing security groups ports + # will not be wired anyway, and a resync will be triggered + self.sg_agent.prepare_devices_filter(port_info.get('added', set())) + # TODO(salv-orlando): Optimize by avoiding unnecessary applying + # filters twice to the same ports, and unnecessary calls to the + # plugin (eg: when there are no IP address changes) + if port_info.get('updated'): + self.sg_agent.refresh_firewall() + # VIF wiring needs to be performed always for 'new' devices. + # For updated ports, re-wiring is not needed in most cases, but needs + # to be performed anyway when the admin state of a device is changed. + # TODO(salv-orlando): Optimize for avoiding unnecessary VIF + # processing for updated ports whose admin state is left unchanged + # A device might be both in the 'added' and 'updated' + # list at the same time; avoid processing it twice. + devices_added_updated = (port_info.get('added', set()) | + port_info.get('updated', set())) + if devices_added_updated: start = time.time() - resync_a = self.treat_devices_added(port_info['added']) + resync_a = self.treat_devices_added_or_updated( + devices_added_updated) LOG.debug(_("process_network_ports - iteration:%(iter_num)d -" - "treat_devices_added completed in %(elapsed).3f"), + "treat_devices_added_or_updated completed " + "in %(elapsed).3f"), {'iter_num': self.iter_num, 'elapsed': time.time() - start}) if 'removed' in port_info: @@ -1080,55 +1095,80 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, resync = True return resync + def _agent_has_updates(self, polling_manager): + return (polling_manager.is_polling_required or + self.updated_ports) + + def _port_info_has_changes(self, port_info): + return (port_info.get('added') or + port_info.get('removed') or + port_info.get('updated')) + def rpc_loop(self, polling_manager=None): if not polling_manager: polling_manager = polling.AlwaysPoll() sync = True ports = set() + updated_ports_copy = set() ancillary_ports = set() tunnel_sync = True while True: - try: - start = time.time() - port_stats = {'regular': {'added': 0, 'removed': 0}, - 'ancillary': {'added': 0, 'removed': 0}} - LOG.debug(_("Agent rpc_loop - iteration:%d started"), - self.iter_num) - if sync: - LOG.info(_("Agent out of sync with plugin!")) - ports.clear() - ancillary_ports.clear() - sync = False - polling_manager.force_polling() - - # Notify the plugin of tunnel IP - if self.enable_tunneling and tunnel_sync: - LOG.info(_("Agent tunnel out of sync with plugin!")) + start = time.time() + port_stats = {'regular': {'added': 0, + 'updated': 0, + 'removed': 0}, + 'ancillary': {'added': 0, + 'removed': 0}} + LOG.debug(_("Agent rpc_loop - iteration:%d started"), + self.iter_num) + if sync: + LOG.info(_("Agent out of sync with plugin!")) + ports.clear() + ancillary_ports.clear() + sync = False + polling_manager.force_polling() + # Notify the plugin of tunnel IP + if self.enable_tunneling and tunnel_sync: + LOG.info(_("Agent tunnel out of sync with plugin!")) + try: tunnel_sync = self.tunnel_sync() - if polling_manager.is_polling_required: + except Exception: + LOG.exception(_("Error while synchronizing tunnels")) + tunnel_sync = True + if self._agent_has_updates(polling_manager): + try: LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - " "starting polling. Elapsed:%(elapsed).3f"), {'iter_num': self.iter_num, 'elapsed': time.time() - start}) - port_info = self.update_ports(ports) + # Save updated ports dict to perform rollback in + # case resync would be needed, and then clear + # self.updated_ports. As the greenthread should not yield + # between these two statements, this will be thread-safe + updated_ports_copy = self.updated_ports + self.updated_ports = set() + port_info = self.scan_ports(ports, updated_ports_copy) + ports = port_info['current'] LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - " "port information retrieved. " "Elapsed:%(elapsed).3f"), {'iter_num': self.iter_num, 'elapsed': time.time() - start}) # notify plugin about port deltas - if port_info: - LOG.debug(_("Agent loop has new devices!")) + if self._port_info_has_changes(port_info): + LOG.debug(_("Starting to process devices in:%s"), + port_info) # If treat devices fails - must resync with plugin sync = self.process_network_ports(port_info) LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d -" "ports processed. Elapsed:%(elapsed).3f"), {'iter_num': self.iter_num, 'elapsed': time.time() - start}) - ports = port_info['current'] port_stats['regular']['added'] = ( len(port_info.get('added', []))) + port_stats['regular']['updated'] = ( + len(port_info.get('updated', []))) port_stats['regular']['removed'] = ( len(port_info.get('removed', []))) # Treat ancillary devices if they exist @@ -1157,11 +1197,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, sync = sync | rc polling_manager.polling_completed() - - except Exception: - LOG.exception(_("Error in agent event loop")) - sync = True - tunnel_sync = True + except Exception: + LOG.exception(_("Error while processing VIF ports")) + # Put the ports back in self.updated_port + self.updated_ports |= updated_ports_copy + sync = True # sleep till end of polling interval elapsed = (time.time() - start) diff --git a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py index 1d1b24447c..76eb976beb 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py @@ -25,7 +25,6 @@ from neutron.agent.linux import ip_lib from neutron.agent.linux import ovs_lib from neutron.agent.linux import utils from neutron.common import constants as n_const -from neutron.openstack.common.rpc import common as rpc_common from neutron.plugins.common import constants as p_const from neutron.plugins.openvswitch.agent import ovs_neutron_agent from neutron.plugins.openvswitch.common import constants @@ -154,28 +153,69 @@ class TestOvsNeutronAgent(base.BaseTestCase): self.agent.port_dead(mock.Mock()) self.assertTrue(add_flow_func.called) - def mock_update_ports(self, vif_port_set=None, registered_ports=None): + def mock_scan_ports(self, vif_port_set=None, registered_ports=None, + updated_ports=None): with mock.patch.object(self.agent.int_br, 'get_vif_port_set', return_value=vif_port_set): - return self.agent.update_ports(registered_ports) + return self.agent.scan_ports(registered_ports, updated_ports) - def test_update_ports_returns_none_for_unchanged_ports(self): - self.assertIsNone(self.mock_update_ports()) + def test_scan_ports_returns_current_only_for_unchanged_ports(self): + vif_port_set = set([1, 3]) + registered_ports = set([1, 3]) + expected = {'current': vif_port_set} + actual = self.mock_scan_ports(vif_port_set, registered_ports) + self.assertEqual(expected, actual) - def test_update_ports_returns_port_changes(self): + def test_scan_ports_returns_port_changes(self): vif_port_set = set([1, 3]) registered_ports = set([1, 2]) expected = dict(current=vif_port_set, added=set([3]), removed=set([2])) - actual = self.mock_update_ports(vif_port_set, registered_ports) + actual = self.mock_scan_ports(vif_port_set, registered_ports) + self.assertEqual(expected, actual) + + def _test_scan_ports_with_updated_ports(self, updated_ports): + vif_port_set = set([1, 3, 4]) + registered_ports = set([1, 2, 4]) + expected = dict(current=vif_port_set, added=set([3]), + removed=set([2]), updated=set([4])) + actual = self.mock_scan_ports(vif_port_set, registered_ports, + updated_ports) + self.assertEqual(expected, actual) + + def test_scan_ports_finds_known_updated_ports(self): + self._test_scan_ports_with_updated_ports(set([4])) + + def test_scan_ports_ignores_unknown_updated_ports(self): + # the port '5' was not seen on current ports. Hence it has either + # never been wired or already removed and should be ignored + self._test_scan_ports_with_updated_ports(set([4, 5])) + + def test_scan_ports_ignores_updated_port_if_removed(self): + vif_port_set = set([1, 3]) + registered_ports = set([1, 2]) + updated_ports = set([1, 2]) + expected = dict(current=vif_port_set, added=set([3]), + removed=set([2]), updated=set([1])) + actual = self.mock_scan_ports(vif_port_set, registered_ports, + updated_ports) + self.assertEqual(expected, actual) + + def test_scan_ports_no_vif_changes_returns_updated_port_only(self): + vif_port_set = set([1, 2, 3]) + registered_ports = set([1, 2, 3]) + updated_ports = set([2]) + expected = dict(current=vif_port_set, updated=set([2])) + actual = self.mock_scan_ports(vif_port_set, registered_ports, + updated_ports) self.assertEqual(expected, actual) def test_treat_devices_added_returns_true_for_missing_device(self): with mock.patch.object(self.agent.plugin_rpc, 'get_device_details', side_effect=Exception()): - self.assertTrue(self.agent.treat_devices_added([{}])) + self.assertTrue(self.agent.treat_devices_added_or_updated([{}])) - def _mock_treat_devices_added(self, details, port, func_name): - """Mock treat devices added. + def _mock_treat_devices_added_updated(self, details, port, func_name): + """Mock treat devices added or updated. :param details: the details to return for the device :param port: the port that get_vif_port_by_id should return @@ -188,29 +228,51 @@ class TestOvsNeutronAgent(base.BaseTestCase): mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', return_value=port), mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), + mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), mock.patch.object(self.agent, func_name) - ) as (get_dev_fn, get_vif_func, upd_dev_up, func): - self.assertFalse(self.agent.treat_devices_added([{}])) + ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func): + self.assertFalse(self.agent.treat_devices_added_or_updated([{}])) return func.called - def test_treat_devices_added_ignores_invalid_ofport(self): + def test_treat_devices_added_updated_ignores_invalid_ofport(self): port = mock.Mock() port.ofport = -1 - self.assertFalse(self._mock_treat_devices_added(mock.MagicMock(), port, - 'port_dead')) + self.assertFalse(self._mock_treat_devices_added_updated( + mock.MagicMock(), port, 'port_dead')) - def test_treat_devices_added_marks_unknown_port_as_dead(self): + def test_treat_devices_added_updated_marks_unknown_port_as_dead(self): port = mock.Mock() port.ofport = 1 - self.assertTrue(self._mock_treat_devices_added(mock.MagicMock(), port, - 'port_dead')) + self.assertTrue(self._mock_treat_devices_added_updated( + mock.MagicMock(), port, 'port_dead')) - def test_treat_devices_added_updates_known_port(self): + def test_treat_devices_added_updated_updates_known_port(self): details = mock.MagicMock() details.__contains__.side_effect = lambda x: True - self.assertTrue(self._mock_treat_devices_added(details, - mock.Mock(), - 'treat_vif_port')) + self.assertTrue(self._mock_treat_devices_added_updated( + details, mock.Mock(), 'treat_vif_port')) + + def test_treat_devices_added_updated_put_port_down(self): + fake_details_dict = {'admin_state_up': False, + 'port_id': 'xxx', + 'device': 'xxx', + 'network_id': 'yyy', + 'physical_network': 'foo', + 'segmentation_id': 'bar', + 'network_type': 'baz'} + with contextlib.nested( + mock.patch.object(self.agent.plugin_rpc, 'get_device_details', + return_value=fake_details_dict), + mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', + return_value=mock.MagicMock()), + mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), + mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), + mock.patch.object(self.agent, 'treat_vif_port') + ) as (get_dev_fn, get_vif_func, upd_dev_up, + upd_dev_down, treat_vif_port): + self.assertFalse(self.agent.treat_devices_added_or_updated([{}])) + self.assertTrue(treat_vif_port.called) + self.assertTrue(upd_dev_down.called) def test_treat_devices_removed_returns_true_for_missing_device(self): with mock.patch.object(self.agent.plugin_rpc, 'update_device_down', @@ -231,17 +293,36 @@ class TestOvsNeutronAgent(base.BaseTestCase): def test_treat_devices_removed_ignores_missing_port(self): self._mock_treat_devices_removed(False) + def _test_process_network_ports(self, port_info): + with contextlib.nested( + mock.patch.object(self.agent.sg_agent, "prepare_devices_filter"), + mock.patch.object(self.agent.sg_agent, "refresh_firewall"), + mock.patch.object(self.agent, "treat_devices_added_or_updated", + return_value=False), + mock.patch.object(self.agent, "treat_devices_removed", + return_value=False) + ) as (prep_dev_filter, refresh_fw, + device_added_updated, device_removed): + self.assertFalse(self.agent.process_network_ports(port_info)) + prep_dev_filter.assert_called_once_with(port_info['added']) + if port_info.get('updated'): + self.assertEqual(1, refresh_fw.call_count) + device_added_updated.assert_called_once_with( + port_info['added'] | port_info.get('updated', set())) + device_removed.assert_called_once_with(port_info['removed']) + def test_process_network_ports(self): - reply = {'current': set(['tap0']), - 'removed': set(['eth0']), - 'added': set(['eth1'])} - with mock.patch.object(self.agent, 'treat_devices_added', - return_value=False) as device_added: - with mock.patch.object(self.agent, 'treat_devices_removed', - return_value=False) as device_removed: - self.assertFalse(self.agent.process_network_ports(reply)) - self.assertTrue(device_added.called) - self.assertTrue(device_removed.called) + self._test_process_network_ports( + {'current': set(['tap0']), + 'removed': set(['eth0']), + 'added': set(['eth1'])}) + + def test_process_network_port_with_updated_ports(self): + self._test_process_network_ports( + {'current': set(['tap0', 'tap1']), + 'updated': set(['tap1', 'eth1']), + 'removed': set(['eth0']), + 'added': set(['eth1'])}) def test_report_state(self): with mock.patch.object(self.agent.state_rpc, @@ -271,61 +352,15 @@ class TestOvsNeutronAgent(base.BaseTestCase): recl_fn.assert_called_with("123") def test_port_update(self): - with contextlib.nested( - mock.patch.object(self.agent.int_br, "get_vif_port_by_id"), - mock.patch.object(self.agent, "treat_vif_port"), - mock.patch.object(self.agent.plugin_rpc, "update_device_up"), - mock.patch.object(self.agent.plugin_rpc, "update_device_down") - ) as (getvif_fn, treatvif_fn, updup_fn, upddown_fn): - port = {"id": "123", - "network_id": "124", - "admin_state_up": False} - getvif_fn.return_value = "vif_port_obj" - self.agent.port_update("unused_context", - port=port, - network_type="vlan", - segmentation_id="1", - physical_network="physnet") - treatvif_fn.assert_called_with("vif_port_obj", "123", - "124", "vlan", "physnet", - "1", False) - upddown_fn.assert_called_with(self.agent.context, - "123", self.agent.agent_id, - cfg.CONF.host) - - port["admin_state_up"] = True - self.agent.port_update("unused_context", - port=port, - network_type="vlan", - segmentation_id="1", - physical_network="physnet") - updup_fn.assert_called_with(self.agent.context, - "123", self.agent.agent_id, - cfg.CONF.host) - - def test_port_update_plugin_rpc_failed(self): - port = {'id': 1, - 'network_id': 1, - 'admin_state_up': True} - with contextlib.nested( - mock.patch.object(ovs_neutron_agent.LOG, 'error'), - mock.patch.object(self.agent.int_br, "get_vif_port_by_id"), - mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), - mock.patch.object(self.agent, 'port_bound'), - mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), - mock.patch.object(self.agent, 'port_dead') - ) as (log, _, device_up, _, device_down, _): - device_up.side_effect = rpc_common.Timeout - self.agent.port_update(mock.Mock(), port=port) - self.assertTrue(device_up.called) - self.assertEqual(log.call_count, 1) - - log.reset_mock() - port['admin_state_up'] = False - device_down.side_effect = rpc_common.Timeout - self.agent.port_update(mock.Mock(), port=port) - self.assertTrue(device_down.called) - self.assertEqual(log.call_count, 1) + port = {"id": "123", + "network_id": "124", + "admin_state_up": False} + self.agent.port_update("unused_context", + port=port, + network_type="vlan", + segmentation_id="1", + physical_network="physnet") + self.assertEqual(set(['123']), self.agent.updated_ports) def test_setup_physical_bridges(self): with contextlib.nested( diff --git a/neutron/tests/unit/openvswitch/test_ovs_tunnel.py b/neutron/tests/unit/openvswitch/test_ovs_tunnel.py index 8b3ba059cf..335a33f0cf 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_tunnel.py +++ b/neutron/tests/unit/openvswitch/test_ovs_tunnel.py @@ -496,23 +496,23 @@ class TunnelTest(base.BaseTestCase): def test_daemon_loop(self): reply2 = {'current': set(['tap0']), - 'added': set([]), + 'added': set(['tap2']), 'removed': set([])} reply3 = {'current': set(['tap2']), 'added': set([]), - 'removed': set([])} + 'removed': set(['tap0'])} with contextlib.nested( mock.patch.object(log.ContextAdapter, 'exception'), mock.patch.object(ovs_neutron_agent.OVSNeutronAgent, - 'update_ports'), + 'scan_ports'), mock.patch.object(ovs_neutron_agent.OVSNeutronAgent, 'process_network_ports') - ) as (log_exception, update_ports, process_network_ports): + ) as (log_exception, scan_ports, process_network_ports): log_exception.side_effect = Exception( 'Fake exception to get out of the loop') - update_ports.side_effect = [reply2, reply3] + scan_ports.side_effect = [reply2, reply3] process_network_ports.side_effect = [ False, Exception('Fake exception to get out of the loop')] @@ -531,17 +531,19 @@ class TunnelTest(base.BaseTestCase): except Exception: pass - log_exception.assert_called_once_with("Error in agent event loop") - update_ports.assert_has_calls([ - mock.call(set()), - mock.call(set(['tap0'])) + # FIXME(salv-orlando): There should not be assertions on log messages + log_exception.assert_called_once_with( + "Error while processing VIF ports") + scan_ports.assert_has_calls([ + mock.call(set(), set()), + mock.call(set(['tap0']), set()) ]) process_network_ports.assert_has_calls([ mock.call({'current': set(['tap0']), 'removed': set([]), - 'added': set([])}), + 'added': set(['tap2'])}), mock.call({'current': set(['tap2']), - 'removed': set([]), + 'removed': set(['tap0']), 'added': set([])}) ]) self._verify_mock_calls()