Add batch listener support for event evaluator
This change add batch messages listener for aodh-listener service, that allow users configure aodh-listener to receive messages in batch. Change-Id: Ifd287b4903bec92db10be759aa239ea354da8a06 Signed-off-by: liusheng <liusheng@huawei.com>
This commit is contained in:
parent
b5ebdaf58d
commit
56f24bdad1
@ -14,6 +14,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
from oslo_log import log
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
from oslo_service import service
|
from oslo_service import service
|
||||||
|
|
||||||
@ -21,11 +22,21 @@ from aodh.evaluator import event
|
|||||||
from aodh import messaging
|
from aodh import messaging
|
||||||
from aodh import storage
|
from aodh import storage
|
||||||
|
|
||||||
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
OPTS = [
|
OPTS = [
|
||||||
cfg.StrOpt('event_alarm_topic',
|
cfg.StrOpt('event_alarm_topic',
|
||||||
default='alarm.all',
|
default='alarm.all',
|
||||||
|
deprecated_group='DEFAULT',
|
||||||
help='The topic that aodh uses for event alarm evaluation.'),
|
help='The topic that aodh uses for event alarm evaluation.'),
|
||||||
|
cfg.IntOpt('batch_size',
|
||||||
|
default=1,
|
||||||
|
help='Number of notification messages to wait before '
|
||||||
|
'dispatching them.'),
|
||||||
|
cfg.IntOpt('batch_timeout',
|
||||||
|
default=None,
|
||||||
|
help='Number of seconds to wait before dispatching samples '
|
||||||
|
'when batch_size is not reached (None means indefinitely).'),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
@ -34,9 +45,10 @@ class EventAlarmEndpoint(object):
|
|||||||
def __init__(self, evaluator):
|
def __init__(self, evaluator):
|
||||||
self.evaluator = evaluator
|
self.evaluator = evaluator
|
||||||
|
|
||||||
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
|
def sample(self, notifications):
|
||||||
# TODO(r-mibu): requeue on error
|
LOG.debug('Received %s messages in batch.', len(notifications))
|
||||||
self.evaluator.evaluate_events(payload)
|
for notification in notifications:
|
||||||
|
self.evaluator.evaluate_events(notification['payload'])
|
||||||
|
|
||||||
|
|
||||||
class EventAlarmEvaluationService(service.Service):
|
class EventAlarmEvaluationService(service.Service):
|
||||||
@ -49,10 +61,13 @@ class EventAlarmEvaluationService(service.Service):
|
|||||||
super(EventAlarmEvaluationService, self).start()
|
super(EventAlarmEvaluationService, self).start()
|
||||||
self.storage_conn = storage.get_connection_from_config(self.conf)
|
self.storage_conn = storage.get_connection_from_config(self.conf)
|
||||||
self.evaluator = event.EventAlarmEvaluator(self.conf)
|
self.evaluator = event.EventAlarmEvaluator(self.conf)
|
||||||
self.listener = messaging.get_notification_listener(
|
self.listener = messaging.get_batch_notification_listener(
|
||||||
messaging.get_transport(self.conf),
|
messaging.get_transport(self.conf),
|
||||||
[oslo_messaging.Target(topic=self.conf.event_alarm_topic)],
|
[oslo_messaging.Target(
|
||||||
[EventAlarmEndpoint(self.evaluator)])
|
topic=self.conf.listener.event_alarm_topic)],
|
||||||
|
[EventAlarmEndpoint(self.evaluator)], False,
|
||||||
|
self.conf.listener.batch_size,
|
||||||
|
self.conf.listener.batch_timeout)
|
||||||
self.listener.start()
|
self.listener.start()
|
||||||
# Add a dummy thread to have wait() working
|
# Add a dummy thread to have wait() working
|
||||||
self.tg.add_timer(604800, lambda: None)
|
self.tg.add_timer(604800, lambda: None)
|
||||||
|
@ -45,14 +45,6 @@ def get_transport(conf, url=None, optional=False, cache=True):
|
|||||||
return transport
|
return transport
|
||||||
|
|
||||||
|
|
||||||
def get_notification_listener(transport, targets, endpoints,
|
|
||||||
allow_requeue=False):
|
|
||||||
"""Return a configured oslo_messaging notification listener."""
|
|
||||||
return oslo_messaging.get_notification_listener(
|
|
||||||
transport, targets, endpoints, executor='threading',
|
|
||||||
allow_requeue=allow_requeue)
|
|
||||||
|
|
||||||
|
|
||||||
def get_batch_notification_listener(transport, targets, endpoints,
|
def get_batch_notification_listener(transport, targets, endpoints,
|
||||||
allow_requeue=False,
|
allow_requeue=False,
|
||||||
batch_size=1, batch_timeout=None):
|
batch_size=1, batch_timeout=None):
|
||||||
|
@ -36,7 +36,6 @@ def list_opts():
|
|||||||
itertools.chain(
|
itertools.chain(
|
||||||
aodh.evaluator.OPTS,
|
aodh.evaluator.OPTS,
|
||||||
aodh.evaluator.event.OPTS,
|
aodh.evaluator.event.OPTS,
|
||||||
aodh.event.OPTS,
|
|
||||||
aodh.notifier.rest.OPTS,
|
aodh.notifier.rest.OPTS,
|
||||||
aodh.queue.OPTS,
|
aodh.queue.OPTS,
|
||||||
aodh.service.OPTS,
|
aodh.service.OPTS,
|
||||||
@ -64,11 +63,12 @@ def list_opts():
|
|||||||
('coordination', aodh.coordination.OPTS),
|
('coordination', aodh.coordination.OPTS),
|
||||||
('database', aodh.storage.OPTS),
|
('database', aodh.storage.OPTS),
|
||||||
('evaluator', aodh.service.EVALUATOR_OPTS),
|
('evaluator', aodh.service.EVALUATOR_OPTS),
|
||||||
('listener', aodh.service.LISTENER_OPTS),
|
('listener', itertools.chain(aodh.service.LISTENER_OPTS,
|
||||||
|
aodh.event.OPTS)),
|
||||||
('notifier', aodh.service.NOTIFIER_OPTS),
|
('notifier', aodh.service.NOTIFIER_OPTS),
|
||||||
('service_credentials', aodh.keystone_client.OPTS),
|
('service_credentials', aodh.keystone_client.OPTS),
|
||||||
('service_types', aodh.notifier.zaqar.SERVICE_OPTS),
|
('service_types', aodh.notifier.zaqar.SERVICE_OPTS),
|
||||||
('notifier', aodh.notifier.OPTS)
|
('notifier', aodh.notifier.OPTS),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@ -1657,15 +1657,16 @@ class TestAlarms(TestAlarmsBase):
|
|||||||
}
|
}
|
||||||
endpoint = mock.MagicMock()
|
endpoint = mock.MagicMock()
|
||||||
target = oslo_messaging.Target(topic="notifications")
|
target = oslo_messaging.Target(topic="notifications")
|
||||||
listener = messaging.get_notification_listener(
|
listener = messaging.get_batch_notification_listener(
|
||||||
self.transport, [target], [endpoint])
|
self.transport, [target], [endpoint])
|
||||||
listener.start()
|
listener.start()
|
||||||
endpoint.info.side_effect = lambda *args: listener.stop()
|
endpoint.info.side_effect = lambda *args: listener.stop()
|
||||||
self.post_json('/alarms', params=json, headers=self.auth_headers)
|
self.post_json('/alarms', params=json, headers=self.auth_headers)
|
||||||
listener.wait()
|
listener.wait()
|
||||||
|
|
||||||
class PayloadMatcher(object):
|
class NotificationsMatcher(object):
|
||||||
def __eq__(self, payload):
|
def __eq__(self, notifications):
|
||||||
|
payload = notifications[0]['payload']
|
||||||
return (payload['detail']['name'] == 'sent_notification' and
|
return (payload['detail']['name'] == 'sent_notification' and
|
||||||
payload['type'] == 'creation' and
|
payload['type'] == 'creation' and
|
||||||
payload['detail']['rule']['meter_name'] == 'ameter' and
|
payload['detail']['rule']['meter_name'] == 'ameter' and
|
||||||
@ -1673,10 +1674,7 @@ class TestAlarms(TestAlarmsBase):
|
|||||||
'project_id', 'timestamp',
|
'project_id', 'timestamp',
|
||||||
'user_id']).issubset(payload.keys()))
|
'user_id']).issubset(payload.keys()))
|
||||||
|
|
||||||
endpoint.info.assert_called_once_with(
|
endpoint.info.assert_called_once_with(NotificationsMatcher())
|
||||||
{},
|
|
||||||
'aodh.api', 'alarm.creation',
|
|
||||||
PayloadMatcher(), mock.ANY)
|
|
||||||
|
|
||||||
def test_alarm_sends_notification(self):
|
def test_alarm_sends_notification(self):
|
||||||
with mock.patch.object(messaging, 'get_notifier') as get_notifier:
|
with mock.patch.object(messaging, 'get_notifier') as get_notifier:
|
||||||
|
@ -14,8 +14,10 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
import time
|
||||||
|
|
||||||
from oslo_config import fixture as fixture_config
|
from oslo_config import fixture as fixture_config
|
||||||
|
import oslo_messaging
|
||||||
from oslo_messaging import server
|
from oslo_messaging import server
|
||||||
|
|
||||||
from aodh import event
|
from aodh import event
|
||||||
@ -32,6 +34,9 @@ class TestEventAlarmEvaluationService(tests_base.BaseTestCase):
|
|||||||
self.CONF = self.useFixture(fixture_config.Config(conf)).conf
|
self.CONF = self.useFixture(fixture_config.Config(conf)).conf
|
||||||
self.setup_messaging(self.CONF)
|
self.setup_messaging(self.CONF)
|
||||||
self.service = event.EventAlarmEvaluationService(self.CONF)
|
self.service = event.EventAlarmEvaluationService(self.CONF)
|
||||||
|
self._msg_notifier = oslo_messaging.Notifier(
|
||||||
|
self.transport, topics=['alarm.all'], driver='messaging',
|
||||||
|
publisher_id='test-publisher')
|
||||||
|
|
||||||
@mock.patch('aodh.storage.get_connection_from_config',
|
@mock.patch('aodh.storage.get_connection_from_config',
|
||||||
mock.MagicMock())
|
mock.MagicMock())
|
||||||
@ -45,8 +50,31 @@ class TestEventAlarmEvaluationService(tests_base.BaseTestCase):
|
|||||||
mock.MagicMock())
|
mock.MagicMock())
|
||||||
def test_listener_start_called(self):
|
def test_listener_start_called(self):
|
||||||
listener = mock.Mock()
|
listener = mock.Mock()
|
||||||
with mock.patch('aodh.messaging.get_notification_listener',
|
with mock.patch('aodh.messaging.get_batch_notification_listener',
|
||||||
return_value=listener):
|
return_value=listener):
|
||||||
self.addCleanup(self.service.stop)
|
self.addCleanup(self.service.stop)
|
||||||
self.service.start()
|
self.service.start()
|
||||||
self.assertTrue(listener.start.called)
|
self.assertTrue(listener.start.called)
|
||||||
|
|
||||||
|
@mock.patch('aodh.event.EventAlarmEndpoint.sample')
|
||||||
|
def test_batch_event_listener(self, mocked):
|
||||||
|
received_events = []
|
||||||
|
mocked.side_effect = lambda msg: received_events.append(msg)
|
||||||
|
self.CONF.set_override("batch_size", 2, 'listener')
|
||||||
|
with mock.patch('aodh.storage.get_connection_from_config'):
|
||||||
|
self.svc = event.EventAlarmEvaluationService(self.CONF)
|
||||||
|
self.svc.start()
|
||||||
|
event1 = {'event_type': 'compute.instance.update',
|
||||||
|
'traits': ['foo', 'bar'],
|
||||||
|
'message_id': '20d03d17-4aba-4900-a179-dba1281a3451',
|
||||||
|
'generated': '2016-04-23T06:50:21.622739'}
|
||||||
|
event2 = {'event_type': 'compute.instance.update',
|
||||||
|
'traits': ['foo', 'bar'],
|
||||||
|
'message_id': '20d03d17-4aba-4900-a179-dba1281a3452',
|
||||||
|
'generated': '2016-04-23T06:50:23.622739'}
|
||||||
|
self._msg_notifier.sample({}, 'event', event1)
|
||||||
|
self._msg_notifier.sample({}, 'event', event2)
|
||||||
|
time.sleep(1)
|
||||||
|
self.assertEqual(1, len(received_events))
|
||||||
|
self.assertEqual(2, len(received_events[0]))
|
||||||
|
self.svc.stop()
|
||||||
|
@ -0,0 +1,12 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- >
|
||||||
|
Add support for batch processing of messages from queue. This will allow
|
||||||
|
the aodh-listener to grab multiple event messages per thread to enable more
|
||||||
|
efficient processing.
|
||||||
|
upgrade:
|
||||||
|
- >
|
||||||
|
batch_size and batch_timeout configuration options are added to [listener]
|
||||||
|
section of configuration. The batch_size controls the number of messages to
|
||||||
|
grab before processing. Similarly, the batch_timeout defines the wait time
|
||||||
|
before processing.
|
Loading…
x
Reference in New Issue
Block a user