From 56f24bdad16c21fe7daa4502844fa9e8a976a232 Mon Sep 17 00:00:00 2001 From: liusheng Date: Sat, 23 Apr 2016 09:21:46 +0800 Subject: [PATCH] Add batch listener support for event evaluator This change add batch messages listener for aodh-listener service, that allow users configure aodh-listener to receive messages in batch. Change-Id: Ifd287b4903bec92db10be759aa239ea354da8a06 Signed-off-by: liusheng --- aodh/event.py | 27 +++++++++++++---- aodh/messaging.py | 8 ----- aodh/opts.py | 6 ++-- .../functional/api/v2/test_alarm_scenarios.py | 12 ++++---- aodh/tests/unit/test_event.py | 30 ++++++++++++++++++- ...stener-batch-support-04e6ff159ef34d8c.yaml | 12 ++++++++ 6 files changed, 70 insertions(+), 25 deletions(-) create mode 100644 releasenotes/notes/event-listener-batch-support-04e6ff159ef34d8c.yaml diff --git a/aodh/event.py b/aodh/event.py index 2840aa88d..f7117d2b4 100644 --- a/aodh/event.py +++ b/aodh/event.py @@ -14,6 +14,7 @@ # under the License. from oslo_config import cfg +from oslo_log import log import oslo_messaging from oslo_service import service @@ -21,11 +22,21 @@ from aodh.evaluator import event from aodh import messaging from aodh import storage +LOG = log.getLogger(__name__) OPTS = [ cfg.StrOpt('event_alarm_topic', default='alarm.all', + deprecated_group='DEFAULT', help='The topic that aodh uses for event alarm evaluation.'), + cfg.IntOpt('batch_size', + default=1, + help='Number of notification messages to wait before ' + 'dispatching them.'), + cfg.IntOpt('batch_timeout', + default=None, + help='Number of seconds to wait before dispatching samples ' + 'when batch_size is not reached (None means indefinitely).'), ] @@ -34,9 +45,10 @@ class EventAlarmEndpoint(object): def __init__(self, evaluator): self.evaluator = evaluator - def sample(self, ctxt, publisher_id, event_type, payload, metadata): - # TODO(r-mibu): requeue on error - self.evaluator.evaluate_events(payload) + def sample(self, notifications): + LOG.debug('Received %s messages in batch.', len(notifications)) + for notification in notifications: + self.evaluator.evaluate_events(notification['payload']) class EventAlarmEvaluationService(service.Service): @@ -49,10 +61,13 @@ class EventAlarmEvaluationService(service.Service): super(EventAlarmEvaluationService, self).start() self.storage_conn = storage.get_connection_from_config(self.conf) self.evaluator = event.EventAlarmEvaluator(self.conf) - self.listener = messaging.get_notification_listener( + self.listener = messaging.get_batch_notification_listener( messaging.get_transport(self.conf), - [oslo_messaging.Target(topic=self.conf.event_alarm_topic)], - [EventAlarmEndpoint(self.evaluator)]) + [oslo_messaging.Target( + topic=self.conf.listener.event_alarm_topic)], + [EventAlarmEndpoint(self.evaluator)], False, + self.conf.listener.batch_size, + self.conf.listener.batch_timeout) self.listener.start() # Add a dummy thread to have wait() working self.tg.add_timer(604800, lambda: None) diff --git a/aodh/messaging.py b/aodh/messaging.py index b8e7022cb..53dfc108b 100644 --- a/aodh/messaging.py +++ b/aodh/messaging.py @@ -45,14 +45,6 @@ def get_transport(conf, url=None, optional=False, cache=True): return transport -def get_notification_listener(transport, targets, endpoints, - allow_requeue=False): - """Return a configured oslo_messaging notification listener.""" - return oslo_messaging.get_notification_listener( - transport, targets, endpoints, executor='threading', - allow_requeue=allow_requeue) - - def get_batch_notification_listener(transport, targets, endpoints, allow_requeue=False, batch_size=1, batch_timeout=None): diff --git a/aodh/opts.py b/aodh/opts.py index 016b74529..26a15801a 100644 --- a/aodh/opts.py +++ b/aodh/opts.py @@ -36,7 +36,6 @@ def list_opts(): itertools.chain( aodh.evaluator.OPTS, aodh.evaluator.event.OPTS, - aodh.event.OPTS, aodh.notifier.rest.OPTS, aodh.queue.OPTS, aodh.service.OPTS, @@ -64,11 +63,12 @@ def list_opts(): ('coordination', aodh.coordination.OPTS), ('database', aodh.storage.OPTS), ('evaluator', aodh.service.EVALUATOR_OPTS), - ('listener', aodh.service.LISTENER_OPTS), + ('listener', itertools.chain(aodh.service.LISTENER_OPTS, + aodh.event.OPTS)), ('notifier', aodh.service.NOTIFIER_OPTS), ('service_credentials', aodh.keystone_client.OPTS), ('service_types', aodh.notifier.zaqar.SERVICE_OPTS), - ('notifier', aodh.notifier.OPTS) + ('notifier', aodh.notifier.OPTS), ] diff --git a/aodh/tests/functional/api/v2/test_alarm_scenarios.py b/aodh/tests/functional/api/v2/test_alarm_scenarios.py index ea81256a3..03b4dce11 100644 --- a/aodh/tests/functional/api/v2/test_alarm_scenarios.py +++ b/aodh/tests/functional/api/v2/test_alarm_scenarios.py @@ -1657,15 +1657,16 @@ class TestAlarms(TestAlarmsBase): } endpoint = mock.MagicMock() target = oslo_messaging.Target(topic="notifications") - listener = messaging.get_notification_listener( + listener = messaging.get_batch_notification_listener( self.transport, [target], [endpoint]) listener.start() endpoint.info.side_effect = lambda *args: listener.stop() self.post_json('/alarms', params=json, headers=self.auth_headers) listener.wait() - class PayloadMatcher(object): - def __eq__(self, payload): + class NotificationsMatcher(object): + def __eq__(self, notifications): + payload = notifications[0]['payload'] return (payload['detail']['name'] == 'sent_notification' and payload['type'] == 'creation' and payload['detail']['rule']['meter_name'] == 'ameter' and @@ -1673,10 +1674,7 @@ class TestAlarms(TestAlarmsBase): 'project_id', 'timestamp', 'user_id']).issubset(payload.keys())) - endpoint.info.assert_called_once_with( - {}, - 'aodh.api', 'alarm.creation', - PayloadMatcher(), mock.ANY) + endpoint.info.assert_called_once_with(NotificationsMatcher()) def test_alarm_sends_notification(self): with mock.patch.object(messaging, 'get_notifier') as get_notifier: diff --git a/aodh/tests/unit/test_event.py b/aodh/tests/unit/test_event.py index 19de42175..bea1062a0 100644 --- a/aodh/tests/unit/test_event.py +++ b/aodh/tests/unit/test_event.py @@ -14,8 +14,10 @@ # under the License. import mock +import time from oslo_config import fixture as fixture_config +import oslo_messaging from oslo_messaging import server from aodh import event @@ -32,6 +34,9 @@ class TestEventAlarmEvaluationService(tests_base.BaseTestCase): self.CONF = self.useFixture(fixture_config.Config(conf)).conf self.setup_messaging(self.CONF) self.service = event.EventAlarmEvaluationService(self.CONF) + self._msg_notifier = oslo_messaging.Notifier( + self.transport, topics=['alarm.all'], driver='messaging', + publisher_id='test-publisher') @mock.patch('aodh.storage.get_connection_from_config', mock.MagicMock()) @@ -45,8 +50,31 @@ class TestEventAlarmEvaluationService(tests_base.BaseTestCase): mock.MagicMock()) def test_listener_start_called(self): listener = mock.Mock() - with mock.patch('aodh.messaging.get_notification_listener', + with mock.patch('aodh.messaging.get_batch_notification_listener', return_value=listener): self.addCleanup(self.service.stop) self.service.start() self.assertTrue(listener.start.called) + + @mock.patch('aodh.event.EventAlarmEndpoint.sample') + def test_batch_event_listener(self, mocked): + received_events = [] + mocked.side_effect = lambda msg: received_events.append(msg) + self.CONF.set_override("batch_size", 2, 'listener') + with mock.patch('aodh.storage.get_connection_from_config'): + self.svc = event.EventAlarmEvaluationService(self.CONF) + self.svc.start() + event1 = {'event_type': 'compute.instance.update', + 'traits': ['foo', 'bar'], + 'message_id': '20d03d17-4aba-4900-a179-dba1281a3451', + 'generated': '2016-04-23T06:50:21.622739'} + event2 = {'event_type': 'compute.instance.update', + 'traits': ['foo', 'bar'], + 'message_id': '20d03d17-4aba-4900-a179-dba1281a3452', + 'generated': '2016-04-23T06:50:23.622739'} + self._msg_notifier.sample({}, 'event', event1) + self._msg_notifier.sample({}, 'event', event2) + time.sleep(1) + self.assertEqual(1, len(received_events)) + self.assertEqual(2, len(received_events[0])) + self.svc.stop() diff --git a/releasenotes/notes/event-listener-batch-support-04e6ff159ef34d8c.yaml b/releasenotes/notes/event-listener-batch-support-04e6ff159ef34d8c.yaml new file mode 100644 index 000000000..1acaf4be3 --- /dev/null +++ b/releasenotes/notes/event-listener-batch-support-04e6ff159ef34d8c.yaml @@ -0,0 +1,12 @@ +--- +features: + - > + Add support for batch processing of messages from queue. This will allow + the aodh-listener to grab multiple event messages per thread to enable more + efficient processing. +upgrade: + - > + batch_size and batch_timeout configuration options are added to [listener] + section of configuration. The batch_size controls the number of messages to + grab before processing. Similarly, the batch_timeout defines the wait time + before processing.