Replace loopingcall in notifier with a delayed send

The loopingcall thread here was started before processes fork and so
the thread stops working after the fork call.  This is a problem that
will probably need to be worked out in the long run.

To ensure that this notifier works correctly in all processes, this
change replaces the persistent loopingcall thread with a thread
created on demand to delay and batch up notifications.  The first
notification will trigger spawning the thread to wait to send it.  Any
notifications that come in the meantime will notice that there is
already a thread waiting to send and will return without spawning.

Change-Id: I519d4e89b8cee341c0e1cfffbce3e77151e8202a
Closes-Bug: #1301035
This commit is contained in:
Carl Baldwin 2014-04-02 16:53:33 +00:00
parent 0df5e1f0f8
commit 855e5df1de
2 changed files with 70 additions and 7 deletions

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from novaclient import exceptions as nova_exceptions
import novaclient.v1_1.client as nclient
from novaclient.v1_1.contrib import server_external_events
@ -23,7 +24,6 @@ from neutron.common import constants
from neutron import context
from neutron import manager
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common import uuidutils
@ -55,8 +55,44 @@ class Notifier(object):
region_name=cfg.CONF.nova_region_name,
extensions=[server_external_events])
self.pending_events = []
event_sender = loopingcall.FixedIntervalLoopingCall(self.send_events)
event_sender.start(interval=cfg.CONF.send_events_interval)
self._waiting_to_send = False
def queue_event(self, event):
"""Called to queue sending an event with the next batch of events.
Sending events individually, as they occur, has been problematic as it
can result in a flood of sends. Previously, there was a loopingcall
thread that would send batched events on a periodic interval. However,
maintaining a persistent thread in the loopingcall was also
problematic.
This replaces the loopingcall with a mechanism that creates a
short-lived thread on demand when the first event is queued. That
thread will sleep once for the same send_events_interval to allow other
events to queue up in pending_events and then will send them when it
wakes.
If a thread is already alive and waiting, this call will simply queue
the event and return leaving it up to the thread to send it.
:param event: the event that occured.
"""
if not event:
return
self.pending_events.append(event)
if self._waiting_to_send:
return
self._waiting_to_send = True
def last_out_sends():
eventlet.sleep(cfg.CONF.send_events_interval)
self._waiting_to_send = False
self.send_events()
eventlet.spawn_n(last_out_sends)
def _is_compute_port(self, port):
try:
@ -94,8 +130,7 @@ class Notifier(object):
event = self.create_port_changed_event(action, original_obj,
returned_obj)
if event:
self.pending_events.append(event)
self.queue_event(event)
def create_port_changed_event(self, action, original_obj, returned_obj):
port = None
@ -172,8 +207,7 @@ class Notifier(object):
def send_port_status(self, mapper, connection, port):
event = getattr(port, "_notify_event", None)
if event:
self.pending_events.append(event)
self.queue_event(event)
port._notify_event = None
def send_events(self):

View File

@ -274,3 +274,32 @@ class TestNovaNotify(base.BaseTestCase):
self.nova_notifier.pending_events.append(
{'name': 'network-changed', 'server_uuid': device_id})
self.nova_notifier.send_events()
def test_queue_event_no_event(self):
with mock.patch('eventlet.spawn_n') as spawn_n:
self.nova_notifier.queue_event(None)
self.assertEqual(0, len(self.nova_notifier.pending_events))
self.assertEqual(0, spawn_n.call_count)
def test_queue_event_first_event(self):
with mock.patch('eventlet.spawn_n') as spawn_n:
self.nova_notifier.queue_event(mock.Mock())
self.assertEqual(1, len(self.nova_notifier.pending_events))
self.assertEqual(1, spawn_n.call_count)
def test_queue_event_multiple_events(self):
with mock.patch('eventlet.spawn_n') as spawn_n:
events = 6
for i in range(0, events):
self.nova_notifier.queue_event(mock.Mock())
self.assertEqual(events, len(self.nova_notifier.pending_events))
self.assertEqual(1, spawn_n.call_count)
def test_queue_event_call_send_events(self):
with mock.patch.object(self.nova_notifier,
'send_events') as send_events:
with mock.patch('eventlet.spawn_n') as spawn_n:
spawn_n.side_effect = lambda func: func()
self.nova_notifier.queue_event(mock.Mock())
self.assertFalse(self.nova_notifier._waiting_to_send)
send_events.assert_called_once_with()