diff --git a/aodh/messaging.py b/aodh/messaging.py index cab4527bd..b8e7022cb 100644 --- a/aodh/messaging.py +++ b/aodh/messaging.py @@ -53,6 +53,16 @@ def get_notification_listener(transport, targets, endpoints, allow_requeue=allow_requeue) +def get_batch_notification_listener(transport, targets, endpoints, + allow_requeue=False, + batch_size=1, batch_timeout=None): + """Return a configured oslo_messaging notification listener.""" + return oslo_messaging.get_batch_notification_listener( + transport, targets, endpoints, executor='threading', + allow_requeue=allow_requeue, + batch_size=batch_size, batch_timeout=batch_timeout) + + def get_notifier(transport, publisher_id): """Return a configured oslo_messaging notifier.""" notifier = oslo_messaging.Notifier(transport, serializer=_SERIALIZER) diff --git a/aodh/notifier/__init__.py b/aodh/notifier/__init__.py index 5b6c92c3b..bd38bf885 100644 --- a/aodh/notifier/__init__.py +++ b/aodh/notifier/__init__.py @@ -15,6 +15,7 @@ import abc +from oslo_config import cfg from oslo_log import log import oslo_messaging from oslo_service import service as os_service @@ -28,6 +29,18 @@ from aodh import messaging LOG = log.getLogger(__name__) +OPTS = [ + cfg.IntOpt('batch_size', + default=1, + help='Number of notification messages to wait before ' + 'dispatching them.'), + cfg.IntOpt('batch_timeout', + default=None, + help='Number of seconds to wait before dispatching samples ' + 'when batch_size is not reached (None means indefinitely).' + ), +] + @six.add_metaclass(abc.ABCMeta) class AlarmNotifier(object): @@ -69,9 +82,9 @@ class AlarmNotifierService(os_service.Service): invoke_args=(self.conf,)) target = oslo_messaging.Target(topic=self.conf.notifier_topic) - self.listener = messaging.get_notification_listener( - transport, [target], - [AlarmEndpoint(self.notifiers)]) + self.listener = messaging.get_batch_notification_listener( + transport, [target], [AlarmEndpoint(self.notifiers)], False, + self.conf.notifier.batch_size, self.conf.notifier.batch_timeout) self.listener.start() # Add a dummy thread to have wait() working self.tg.add_timer(604800, lambda: None) @@ -88,9 +101,11 @@ class AlarmEndpoint(object): def __init__(self, notifiers): self.notifiers = notifiers - def sample(self, ctxt, publisher_id, event_type, payload, metadata): + def sample(self, notifications): """Endpoint for alarm notifications""" - self._process_alarm(self.notifiers, payload) + LOG.debug('Received %s messages in batch.', len(notifications)) + for notification in notifications: + self._process_alarm(self.notifiers, notification['payload']) @staticmethod def _handle_action(notifiers, action, alarm_id, alarm_name, severity, @@ -133,7 +148,6 @@ class AlarmEndpoint(object): previous, current, reason, reason_data) except Exception: LOG.exception(_LE("Unable to notify alarm %s"), alarm_id) - return @staticmethod def _process_alarm(notifiers, data): diff --git a/aodh/opts.py b/aodh/opts.py index c2e36eab2..016b74529 100644 --- a/aodh/opts.py +++ b/aodh/opts.py @@ -68,6 +68,7 @@ def list_opts(): ('notifier', aodh.service.NOTIFIER_OPTS), ('service_credentials', aodh.keystone_client.OPTS), ('service_types', aodh.notifier.zaqar.SERVICE_OPTS), + ('notifier', aodh.notifier.OPTS) ] diff --git a/aodh/tests/unit/test_notifier.py b/aodh/tests/unit/test_notifier.py index bee763bf6..d750ca831 100644 --- a/aodh/tests/unit/test_notifier.py +++ b/aodh/tests/unit/test_notifier.py @@ -98,6 +98,60 @@ class TestAlarmNotifier(tests_base.BaseTestCase): data['reason_data']), notifications[0]) + @mock.patch('aodh.notifier.LOG.debug') + def test_notify_alarm_with_batch_listener(self, logger): + data1 = { + 'actions': ['test://'], + 'alarm_id': 'foobar', + 'alarm_name': 'testalarm', + 'severity': 'critical', + 'previous': 'OK', + 'current': 'ALARM', + 'reason': 'Everything is on fire', + 'reason_data': {'fire': 'everywhere'} + } + data2 = { + 'actions': ['test://'], + 'alarm_id': 'foobar2', + 'alarm_name': 'testalarm2', + 'severity': 'low', + 'previous': 'ALARM', + 'current': 'OK', + 'reason': 'Everything is fine', + 'reason_data': {'fine': 'fine'} + } + self.service.stop() + self.CONF.set_override("batch_size", 2, 'notifier') + # Init a new service with new configuration + self.svc = notifier.AlarmNotifierService(self.CONF) + self.svc.start() + self._msg_notifier.sample({}, 'alarm.update', data1) + self._msg_notifier.sample({}, 'alarm.update', data2) + time.sleep(1) + notifications = self.svc.notifiers['test'].obj.notifications + self.assertEqual(2, len(notifications)) + self.assertEqual((urlparse.urlsplit(data1['actions'][0]), + data1['alarm_id'], + data1['alarm_name'], + data1['severity'], + data1['previous'], + data1['current'], + data1['reason'], + data1['reason_data']), + notifications[0]) + self.assertEqual((urlparse.urlsplit(data2['actions'][0]), + data2['alarm_id'], + data2['alarm_name'], + data2['severity'], + data2['previous'], + data2['current'], + data2['reason'], + data2['reason_data']), + notifications[1]) + self.assertEqual(mock.call('Received %s messages in batch.', 2), + logger.call_args_list[0]) + self.svc.stop() + @staticmethod def _notification(action): notification = {} diff --git a/releasenotes/notes/notifier-batch-listener-01796e2cb06344dd.yaml b/releasenotes/notes/notifier-batch-listener-01796e2cb06344dd.yaml new file mode 100644 index 000000000..dd9a20e6a --- /dev/null +++ b/releasenotes/notes/notifier-batch-listener-01796e2cb06344dd.yaml @@ -0,0 +1,12 @@ +--- +features: + - > + Add support for batch processing of messages from queue. This will allow + the aodh-notifier to grab multiple messages per thread to enable more + efficient processing. +upgrade: + - > + batch_size and batch_timeout configuration options are added to [notifier] + section of configuration. The batch_size controls the number of messages to + grab before processing. Similarly, the batch_timeout defines the wait time + before processing. diff --git a/requirements.txt b/requirements.txt index d59b0b5ab..89e1b7370 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,7 +19,7 @@ oslo.service>=0.1.0 # Apache-2.0 PasteDeploy>=1.5.0 pbr<2.0,>=0.11 pecan>=0.8.0 -oslo.messaging>2.6.1,!=2.8.0 # Apache-2.0 +oslo.messaging>=4.5.0 # Apache-2.0 oslo.middleware>=3.0.0 # Apache-2.0 oslo.serialization>=1.4.0 # Apache-2.0 oslo.utils>=3.5.0 # Apache-2.0