From 3d232a0f389eb46b857bf209644f5ac817b88801 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Mon, 3 Mar 2014 19:35:16 +0100 Subject: [PATCH] Implements notification-dispatcher-filter The NotificationFilter class is used to filter notifications that a notification listerner endpoint will received. The notification can be filtered on different fields: context, publisher_id, event_type, metadata, and payload DocImpact Implements: blueprint notification-dispatcher-filter Change-Id: Ic1239422fac9879b113c9d3d6c3f8dccef20d044 --- oslo_messaging/notify/__init__.py | 2 + oslo_messaging/notify/dispatcher.py | 9 +- oslo_messaging/notify/filter.py | 77 +++++++++++++ oslo_messaging/notify/listener.py | 8 +- .../tests/notify/test_dispatcher.py | 104 ++++++++++++++++++ 5 files changed, 197 insertions(+), 3 deletions(-) create mode 100644 oslo_messaging/notify/filter.py diff --git a/oslo_messaging/notify/__init__.py b/oslo_messaging/notify/__init__.py index c5032db83..dd5304d46 100644 --- a/oslo_messaging/notify/__init__.py +++ b/oslo_messaging/notify/__init__.py @@ -17,9 +17,11 @@ __all__ = ['Notifier', 'LoggingNotificationHandler', 'get_notification_listener', 'NotificationResult', + 'NotificationFilter', 'PublishErrorsHandler', 'LoggingErrorNotificationHandler'] +from .filter import NotificationFilter from .notifier import * from .listener import * from .log_handler import * diff --git a/oslo_messaging/notify/dispatcher.py b/oslo_messaging/notify/dispatcher.py index a9e8cc618..48c4e2097 100644 --- a/oslo_messaging/notify/dispatcher.py +++ b/oslo_messaging/notify/dispatcher.py @@ -56,7 +56,9 @@ class NotificationDispatcher(object): for endpoint, prio in itertools.product(endpoints, PRIORITIES): if hasattr(endpoint, prio): method = getattr(endpoint, prio) - self._callbacks_by_priority.setdefault(prio, []).append(method) + screen = getattr(endpoint, 'filter_rule', None) + self._callbacks_by_priority.setdefault(prio, []).append( + (screen, method)) priorities = self._callbacks_by_priority.keys() self._targets_priorities = set(itertools.product(self.targets, @@ -118,7 +120,10 @@ class NotificationDispatcher(object): payload = self.serializer.deserialize_entity(ctxt, message.get('payload')) - for callback in self._callbacks_by_priority.get(priority, []): + for screen, callback in self._callbacks_by_priority.get(priority, []): + if screen and not screen.match(ctxt, publisher_id, event_type, + metadata, payload): + continue localcontext.set_local_context(ctxt) try: if executor_callback: diff --git a/oslo_messaging/notify/filter.py b/oslo_messaging/notify/filter.py new file mode 100644 index 000000000..b23fac40d --- /dev/null +++ b/oslo_messaging/notify/filter.py @@ -0,0 +1,77 @@ +# +# Copyright 2013 eNovance +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import re + + +class NotificationFilter(object): + + """Filter notification messages + + The NotificationFilter class is used to filter notifications that an + endpoint will received. + + The notification can be filter on different fields: context, + publisher_id, event_type, metadata and payload. + + The filter is done via a regular expression + + filter_rule = NotificationFilter( + publisher_id='^compute.*', + context={'tenant_id': '^5f643cfc-664b-4c69-8000-ce2ed7b08216$', + 'roles='private'}, + event_type='^compute\.instance\..*', + metadata={'timestamp': 'Aug'}, + payload={'state': '^active$') + + """ + + def __init__(self, context=None, publisher_id=None, event_type=None, + metadata=None, payload=None): + self._regex_publisher_id = None + self._regex_event_type = None + + if publisher_id is not None: + self._regex_publisher_id = re.compile(publisher_id) + if event_type is not None: + self._regex_event_type = re.compile(event_type) + self._regexs_context = self._build_regex_dict(context) + self._regexs_metadata = self._build_regex_dict(metadata) + self._regexs_payload = self._build_regex_dict(payload) + + @staticmethod + def _build_regex_dict(regex_list): + if regex_list is None: + return {} + return dict((k, re.compile(regex_list[k])) for k in regex_list) + + @staticmethod + def _check_for_mismatch(data, regex): + if isinstance(regex, dict): + for k in regex: + if (k not in data or not regex[k].match(data[k])): + return True + elif regex is not None and not regex.match(data): + return True + return False + + def match(self, context, publisher_id, event_type, metadata, payload): + if (self._check_for_mismatch(publisher_id, self._regex_publisher_id) or + self._check_for_mismatch(event_type, self._regex_event_type) or + self._check_for_mismatch(context, self._regexs_context) or + self._check_for_mismatch(metadata, self._regexs_metadata) or + self._check_for_mismatch(payload, self._regexs_payload)): + return False + return True diff --git a/oslo_messaging/notify/listener.py b/oslo_messaging/notify/listener.py index a1586ddfb..6a7540a2a 100644 --- a/oslo_messaging/notify/listener.py +++ b/oslo_messaging/notify/listener.py @@ -44,10 +44,15 @@ A simple example of a notification listener with multiple endpoints might be:: import oslo_messaging class NotificationEndpoint(object): + filter_rule = NotificationFilter(publisher_id='^compute.*') + def warn(self, ctxt, publisher_id, event_type, payload, metadata): do_something(payload) class ErrorEndpoint(object): + filter_rule = NotificationFilter(event_type='^instance\..*\.start$', + context={'ctxt_key': 'regexp'}) + def error(self, ctxt, publisher_id, event_type, payload, metadata): do_something(payload) @@ -69,7 +74,8 @@ A simple example of a notification listener with multiple endpoints might be:: A notifier sends a notification on a topic with a priority, the notification listener will receive this notification if the topic of this one have been set in one of the targets and if an endpoint implements the method named like the -priority +priority and if the notification match the NotificationFilter rule set into +the filter_rule attribute of the endpoint. Parameters to endpoint methods are the request context supplied by the client, the publisher_id of the notification message, the event_type, the payload and diff --git a/oslo_messaging/tests/notify/test_dispatcher.py b/oslo_messaging/tests/notify/test_dispatcher.py index 029f737ec..990ada2dd 100644 --- a/oslo_messaging/tests/notify/test_dispatcher.py +++ b/oslo_messaging/tests/notify/test_dispatcher.py @@ -147,3 +147,107 @@ class TestDispatcher(test_utils.BaseTestCase): callback() mylog.warning.assert_called_once_with('Unknown priority "%s"', 'what???') + + +class TestDispatcherFilter(test_utils.BaseTestCase): + scenarios = [ + ('publisher_id_match', + dict(filter_rule=dict(publisher_id='^compute.*'), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={}, + match=True)), + ('publisher_id_nomatch', + dict(filter_rule=dict(publisher_id='^compute.*'), + publisher_id='network01.manager', + event_type='instance.create.start', + context={}, + match=False)), + ('event_type_match', + dict(filter_rule=dict(event_type='^instance\.create'), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={}, + match=True)), + ('event_type_nomatch', + dict(filter_rule=dict(event_type='^instance\.delete'), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={}, + match=False)), + ('context_match', + dict(filter_rule=dict(context={'user': '^adm'}), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={'user': 'admin'}, + match=True)), + ('context_key_missing', + dict(filter_rule=dict(context={'user': '^adm'}), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={'project': 'admin'}, + metadata={}, + match=False)), + ('metadata_match', + dict(filter_rule=dict(metadata={'message_id': '^99'}), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={}, + match=True)), + ('metadata_key_missing', + dict(filter_rule=dict(metadata={'user': '^adm'}), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={}, + match=False)), + ('payload_match', + dict(filter_rule=dict(payload={'state': '^active$'}), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={}, + match=True)), + ('payload_no_match', + dict(filter_rule=dict(payload={'state': '^deleted$'}), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={}, + match=False)), + ('payload_key_missing', + dict(filter_rule=dict(payload={'user': '^adm'}), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={}, + match=False)), + ('mix_match', + dict(filter_rule=dict(event_type='^instance\.create', + publisher_id='^compute', + context={'user': '^adm'}), + publisher_id='compute01.manager', + event_type='instance.create.start', + context={'user': 'admin'}, + match=True)), + + ] + + def test_filters(self): + notification_filter = oslo_messaging.NotificationFilter( + **self.filter_rule) + endpoint = mock.Mock(spec=['info'], filter_rule=notification_filter) + + targets = [oslo_messaging.Target(topic='notifications')] + dispatcher = notify_dispatcher.NotificationDispatcher( + targets, [endpoint], serializer=None, allow_requeue=True) + message = {'payload': {'state': 'active'}, + 'priority': 'info', + 'publisher_id': self.publisher_id, + 'event_type': self.event_type, + 'timestamp': '2014-03-03 18:21:04.369234', + 'message_id': '99863dda-97f0-443a-a0c1-6ed317b7fd45'} + incoming = mock.Mock(ctxt=self.context, message=message) + with dispatcher(incoming) as callback: + callback() + + if self.match: + self.assertEqual(1, endpoint.info.call_count) + else: + self.assertEqual(0, endpoint.info.call_count)