diff --git a/neutron/notifiers/nova.py b/neutron/notifiers/nova.py index bcb253ccfa..1237a16b74 100644 --- a/neutron/notifiers/nova.py +++ b/neutron/notifiers/nova.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import eventlet from novaclient import exceptions as nova_exceptions import novaclient.v1_1.client as nclient from novaclient.v1_1.contrib import server_external_events @@ -23,7 +24,6 @@ from neutron.common import constants from neutron import context from neutron import manager from neutron.openstack.common import log as logging -from neutron.openstack.common import loopingcall from neutron.openstack.common import uuidutils @@ -55,8 +55,44 @@ class Notifier(object): region_name=cfg.CONF.nova_region_name, extensions=[server_external_events]) self.pending_events = [] - event_sender = loopingcall.FixedIntervalLoopingCall(self.send_events) - event_sender.start(interval=cfg.CONF.send_events_interval) + self._waiting_to_send = False + + def queue_event(self, event): + """Called to queue sending an event with the next batch of events. + + Sending events individually, as they occur, has been problematic as it + can result in a flood of sends. Previously, there was a loopingcall + thread that would send batched events on a periodic interval. However, + maintaining a persistent thread in the loopingcall was also + problematic. + + This replaces the loopingcall with a mechanism that creates a + short-lived thread on demand when the first event is queued. That + thread will sleep once for the same send_events_interval to allow other + events to queue up in pending_events and then will send them when it + wakes. + + If a thread is already alive and waiting, this call will simply queue + the event and return leaving it up to the thread to send it. + + :param event: the event that occured. + """ + if not event: + return + + self.pending_events.append(event) + + if self._waiting_to_send: + return + + self._waiting_to_send = True + + def last_out_sends(): + eventlet.sleep(cfg.CONF.send_events_interval) + self._waiting_to_send = False + self.send_events() + + eventlet.spawn_n(last_out_sends) def _is_compute_port(self, port): try: @@ -94,8 +130,7 @@ class Notifier(object): event = self.create_port_changed_event(action, original_obj, returned_obj) - if event: - self.pending_events.append(event) + self.queue_event(event) def create_port_changed_event(self, action, original_obj, returned_obj): port = None @@ -172,8 +207,7 @@ class Notifier(object): def send_port_status(self, mapper, connection, port): event = getattr(port, "_notify_event", None) - if event: - self.pending_events.append(event) + self.queue_event(event) port._notify_event = None def send_events(self): diff --git a/neutron/tests/unit/notifiers/test_notifiers_nova.py b/neutron/tests/unit/notifiers/test_notifiers_nova.py index eb32aef6df..7972ebf55a 100644 --- a/neutron/tests/unit/notifiers/test_notifiers_nova.py +++ b/neutron/tests/unit/notifiers/test_notifiers_nova.py @@ -274,3 +274,32 @@ class TestNovaNotify(base.BaseTestCase): self.nova_notifier.pending_events.append( {'name': 'network-changed', 'server_uuid': device_id}) self.nova_notifier.send_events() + + def test_queue_event_no_event(self): + with mock.patch('eventlet.spawn_n') as spawn_n: + self.nova_notifier.queue_event(None) + self.assertEqual(0, len(self.nova_notifier.pending_events)) + self.assertEqual(0, spawn_n.call_count) + + def test_queue_event_first_event(self): + with mock.patch('eventlet.spawn_n') as spawn_n: + self.nova_notifier.queue_event(mock.Mock()) + self.assertEqual(1, len(self.nova_notifier.pending_events)) + self.assertEqual(1, spawn_n.call_count) + + def test_queue_event_multiple_events(self): + with mock.patch('eventlet.spawn_n') as spawn_n: + events = 6 + for i in range(0, events): + self.nova_notifier.queue_event(mock.Mock()) + self.assertEqual(events, len(self.nova_notifier.pending_events)) + self.assertEqual(1, spawn_n.call_count) + + def test_queue_event_call_send_events(self): + with mock.patch.object(self.nova_notifier, + 'send_events') as send_events: + with mock.patch('eventlet.spawn_n') as spawn_n: + spawn_n.side_effect = lambda func: func() + self.nova_notifier.queue_event(mock.Mock()) + self.assertFalse(self.nova_notifier._waiting_to_send) + send_events.assert_called_once_with()