diff --git a/neutron/plugins/nec/agent/nec_neutron_agent.py b/neutron/plugins/nec/agent/nec_neutron_agent.py index 57d6257594..93766ab6bc 100755 --- a/neutron/plugins/nec/agent/nec_neutron_agent.py +++ b/neutron/plugins/nec/agent/nec_neutron_agent.py @@ -54,17 +54,13 @@ class NECPluginApi(agent_rpc.PluginApi): LOG.info(_("Update ports: added=%(added)s, " "removed=%(removed)s"), {'added': port_added, 'removed': port_removed}) - try: - self.call(context, - self.make_msg('update_ports', - topic=topics.AGENT, - agent_id=agent_id, - datapath_id=datapath_id, - port_added=port_added, - port_removed=port_removed)) - except Exception: - LOG.warn(_("update_ports() failed.")) - return + self.call(context, + self.make_msg('update_ports', + topic=topics.AGENT, + agent_id=agent_id, + datapath_id=datapath_id, + port_added=port_added, + port_removed=port_removed)) class NECAgentRpcCallback(object): @@ -126,6 +122,7 @@ class NECNeutronAgent(object): self.int_br = ovs_lib.OVSBridge(integ_br, root_helper) self.polling_interval = polling_interval self.cur_ports = [] + self.need_sync = True self.datapath_id = "0x%s" % self.int_br.get_datapath_id() @@ -194,21 +191,22 @@ class NECNeutronAgent(object): if port_removed: self.sg_agent.remove_devices_filter(port_removed) - def daemon_loop(self): - """Main processing loop for NEC Plugin Agent.""" - while True: + def loop_handler(self): + try: + # self.cur_ports will be kept until loop_handler succeeds. + cur_ports = [] if self.need_sync else self.cur_ports new_ports = [] port_added = [] for vif_port in self.int_br.get_vif_ports(): port_id = vif_port.vif_id new_ports.append(port_id) - if port_id not in self.cur_ports: + if port_id not in cur_ports: port_info = self._vif_port_to_port_info(vif_port) port_added.append(port_info) port_removed = [] - for port_id in self.cur_ports: + for port_id in cur_ports: if port_id not in new_ports: port_removed.append(port_id) @@ -221,6 +219,15 @@ class NECNeutronAgent(object): LOG.debug(_("No port changed.")) self.cur_ports = new_ports + self.need_sync = False + except Exception: + LOG.exception(_("Error in agent event loop")) + self.need_sync = True + + def daemon_loop(self): + """Main processing loop for NEC Plugin Agent.""" + while True: + self.loop_handler() time.sleep(self.polling_interval) diff --git a/neutron/tests/unit/nec/test_nec_agent.py b/neutron/tests/unit/nec/test_nec_agent.py index 2314370b69..4c9689ec62 100644 --- a/neutron/tests/unit/nec/test_nec_agent.py +++ b/neutron/tests/unit/nec/test_nec_agent.py @@ -61,6 +61,71 @@ class TestNecAgentBase(base.BaseTestCase): class TestNecAgent(TestNecAgentBase): + def _setup_mock(self): + vif_ports = [ovs_lib.VifPort('port1', '1', 'id-1', 'mac-1', + self.agent.int_br), + ovs_lib.VifPort('port2', '2', 'id-2', 'mac-2', + self.agent.int_br)] + self.get_vif_ports = mock.patch.object( + ovs_lib.OVSBridge, 'get_vif_ports', + return_value=vif_ports).start() + self.update_ports = mock.patch.object( + nec_neutron_agent.NECPluginApi, 'update_ports').start() + self.prepare_devices_filter = mock.patch.object( + self.agent.sg_agent, 'prepare_devices_filter').start() + self.remove_devices_filter = mock.patch.object( + self.agent.sg_agent, 'remove_devices_filter').start() + + def _test_single_loop(self, with_exc=False, need_sync=False): + self.agent.cur_ports = ['id-0', 'id-1'] + self.agent.need_sync = need_sync + + self.agent.loop_handler() + if with_exc: + self.assertEqual(self.agent.cur_ports, ['id-0', 'id-1']) + self.assertTrue(self.agent.need_sync) + else: + self.assertEqual(self.agent.cur_ports, ['id-1', 'id-2']) + self.assertFalse(self.agent.need_sync) + + def test_single_loop_normal(self): + self._setup_mock() + self._test_single_loop() + agent_id = 'nec-q-agent.dummy-host' + self.update_ports.assert_called_once_with( + mock.ANY, agent_id, OVS_DPID_0X, + [{'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}], + ['id-0']) + self.prepare_devices_filter.assert_called_once_with(['id-2']) + self.remove_devices_filter.assert_called_once_with(['id-0']) + + def test_single_loop_need_sync(self): + self._setup_mock() + self._test_single_loop(need_sync=True) + agent_id = 'nec-q-agent.dummy-host' + self.update_ports.assert_called_once_with( + mock.ANY, agent_id, OVS_DPID_0X, + [{'id': 'id-1', 'mac': 'mac-1', 'port_no': '1'}, + {'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}], + []) + self.prepare_devices_filter.assert_called_once_with(['id-1', 'id-2']) + self.assertFalse(self.remove_devices_filter.call_count) + + def test_single_loop_with_sg_exception_remove(self): + self._setup_mock() + self.update_ports.side_effect = Exception() + self._test_single_loop(with_exc=True) + + def test_single_loop_with_sg_exception_prepare(self): + self._setup_mock() + self.prepare_devices_filter.side_effect = Exception() + self._test_single_loop(with_exc=True) + + def test_single_loop_with_update_ports_exception(self): + self._setup_mock() + self.remove_devices_filter.side_effect = Exception() + self._test_single_loop(with_exc=True) + def test_daemon_loop(self): def state_check(index): @@ -278,9 +343,6 @@ class TestNecAgentPluginApi(TestNecAgentBase): def test_plugin_api(self): self._test_plugin_api() - def test_plugin_api_fail(self): - self._test_plugin_api(expected_failure=True) - class TestNecAgentMain(base.BaseTestCase): def test_main(self):