Merge "Implements notification-dispatcher-filter"
This commit is contained in:
commit
fb9f01d26a
@ -17,9 +17,11 @@ __all__ = ['Notifier',
|
||||
'LoggingNotificationHandler',
|
||||
'get_notification_listener',
|
||||
'NotificationResult',
|
||||
'NotificationFilter',
|
||||
'PublishErrorsHandler',
|
||||
'LoggingErrorNotificationHandler']
|
||||
|
||||
from .filter import NotificationFilter
|
||||
from .notifier import *
|
||||
from .listener import *
|
||||
from .log_handler import *
|
||||
|
@ -56,7 +56,9 @@ class NotificationDispatcher(object):
|
||||
for endpoint, prio in itertools.product(endpoints, PRIORITIES):
|
||||
if hasattr(endpoint, prio):
|
||||
method = getattr(endpoint, prio)
|
||||
self._callbacks_by_priority.setdefault(prio, []).append(method)
|
||||
screen = getattr(endpoint, 'filter_rule', None)
|
||||
self._callbacks_by_priority.setdefault(prio, []).append(
|
||||
(screen, method))
|
||||
|
||||
priorities = self._callbacks_by_priority.keys()
|
||||
self._targets_priorities = set(itertools.product(self.targets,
|
||||
@ -118,7 +120,10 @@ class NotificationDispatcher(object):
|
||||
payload = self.serializer.deserialize_entity(ctxt,
|
||||
message.get('payload'))
|
||||
|
||||
for callback in self._callbacks_by_priority.get(priority, []):
|
||||
for screen, callback in self._callbacks_by_priority.get(priority, []):
|
||||
if screen and not screen.match(ctxt, publisher_id, event_type,
|
||||
metadata, payload):
|
||||
continue
|
||||
localcontext.set_local_context(ctxt)
|
||||
try:
|
||||
if executor_callback:
|
||||
|
77
oslo_messaging/notify/filter.py
Normal file
77
oslo_messaging/notify/filter.py
Normal file
@ -0,0 +1,77 @@
|
||||
#
|
||||
# Copyright 2013 eNovance
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import re
|
||||
|
||||
|
||||
class NotificationFilter(object):
|
||||
|
||||
"""Filter notification messages
|
||||
|
||||
The NotificationFilter class is used to filter notifications that an
|
||||
endpoint will received.
|
||||
|
||||
The notification can be filter on different fields: context,
|
||||
publisher_id, event_type, metadata and payload.
|
||||
|
||||
The filter is done via a regular expression
|
||||
|
||||
filter_rule = NotificationFilter(
|
||||
publisher_id='^compute.*',
|
||||
context={'tenant_id': '^5f643cfc-664b-4c69-8000-ce2ed7b08216$',
|
||||
'roles='private'},
|
||||
event_type='^compute\.instance\..*',
|
||||
metadata={'timestamp': 'Aug'},
|
||||
payload={'state': '^active$')
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, context=None, publisher_id=None, event_type=None,
|
||||
metadata=None, payload=None):
|
||||
self._regex_publisher_id = None
|
||||
self._regex_event_type = None
|
||||
|
||||
if publisher_id is not None:
|
||||
self._regex_publisher_id = re.compile(publisher_id)
|
||||
if event_type is not None:
|
||||
self._regex_event_type = re.compile(event_type)
|
||||
self._regexs_context = self._build_regex_dict(context)
|
||||
self._regexs_metadata = self._build_regex_dict(metadata)
|
||||
self._regexs_payload = self._build_regex_dict(payload)
|
||||
|
||||
@staticmethod
|
||||
def _build_regex_dict(regex_list):
|
||||
if regex_list is None:
|
||||
return {}
|
||||
return dict((k, re.compile(regex_list[k])) for k in regex_list)
|
||||
|
||||
@staticmethod
|
||||
def _check_for_mismatch(data, regex):
|
||||
if isinstance(regex, dict):
|
||||
for k in regex:
|
||||
if (k not in data or not regex[k].match(data[k])):
|
||||
return True
|
||||
elif regex is not None and not regex.match(data):
|
||||
return True
|
||||
return False
|
||||
|
||||
def match(self, context, publisher_id, event_type, metadata, payload):
|
||||
if (self._check_for_mismatch(publisher_id, self._regex_publisher_id) or
|
||||
self._check_for_mismatch(event_type, self._regex_event_type) or
|
||||
self._check_for_mismatch(context, self._regexs_context) or
|
||||
self._check_for_mismatch(metadata, self._regexs_metadata) or
|
||||
self._check_for_mismatch(payload, self._regexs_payload)):
|
||||
return False
|
||||
return True
|
@ -44,10 +44,15 @@ A simple example of a notification listener with multiple endpoints might be::
|
||||
import oslo_messaging
|
||||
|
||||
class NotificationEndpoint(object):
|
||||
filter_rule = NotificationFilter(publisher_id='^compute.*')
|
||||
|
||||
def warn(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||
do_something(payload)
|
||||
|
||||
class ErrorEndpoint(object):
|
||||
filter_rule = NotificationFilter(event_type='^instance\..*\.start$',
|
||||
context={'ctxt_key': 'regexp'})
|
||||
|
||||
def error(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||
do_something(payload)
|
||||
|
||||
@ -69,7 +74,8 @@ A simple example of a notification listener with multiple endpoints might be::
|
||||
A notifier sends a notification on a topic with a priority, the notification
|
||||
listener will receive this notification if the topic of this one have been set
|
||||
in one of the targets and if an endpoint implements the method named like the
|
||||
priority
|
||||
priority and if the notification match the NotificationFilter rule set into
|
||||
the filter_rule attribute of the endpoint.
|
||||
|
||||
Parameters to endpoint methods are the request context supplied by the client,
|
||||
the publisher_id of the notification message, the event_type, the payload and
|
||||
|
@ -147,3 +147,107 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
callback()
|
||||
mylog.warning.assert_called_once_with('Unknown priority "%s"',
|
||||
'what???')
|
||||
|
||||
|
||||
class TestDispatcherFilter(test_utils.BaseTestCase):
|
||||
scenarios = [
|
||||
('publisher_id_match',
|
||||
dict(filter_rule=dict(publisher_id='^compute.*'),
|
||||
publisher_id='compute01.manager',
|
||||
event_type='instance.create.start',
|
||||
context={},
|
||||
match=True)),
|
||||
('publisher_id_nomatch',
|
||||
dict(filter_rule=dict(publisher_id='^compute.*'),
|
||||
publisher_id='network01.manager',
|
||||
event_type='instance.create.start',
|
||||
context={},
|
||||
match=False)),
|
||||
('event_type_match',
|
||||
dict(filter_rule=dict(event_type='^instance\.create'),
|
||||
publisher_id='compute01.manager',
|
||||
event_type='instance.create.start',
|
||||
context={},
|
||||
match=True)),
|
||||
('event_type_nomatch',
|
||||
dict(filter_rule=dict(event_type='^instance\.delete'),
|
||||
publisher_id='compute01.manager',
|
||||
event_type='instance.create.start',
|
||||
context={},
|
||||
match=False)),
|
||||
('context_match',
|
||||
dict(filter_rule=dict(context={'user': '^adm'}),
|
||||
publisher_id='compute01.manager',
|
||||
event_type='instance.create.start',
|
||||
context={'user': 'admin'},
|
||||
match=True)),
|
||||
('context_key_missing',
|
||||
dict(filter_rule=dict(context={'user': '^adm'}),
|
||||
publisher_id='compute01.manager',
|
||||
event_type='instance.create.start',
|
||||
context={'project': 'admin'},
|
||||
metadata={},
|
||||
match=False)),
|
||||
('metadata_match',
|
||||
dict(filter_rule=dict(metadata={'message_id': '^99'}),
|
||||
publisher_id='compute01.manager',
|
||||
event_type='instance.create.start',
|
||||
context={},
|
||||
match=True)),
|
||||
('metadata_key_missing',
|
||||
dict(filter_rule=dict(metadata={'user': '^adm'}),
|
||||
publisher_id='compute01.manager',
|
||||
event_type='instance.create.start',
|
||||
context={},
|
||||
match=False)),
|
||||
('payload_match',
|
||||
dict(filter_rule=dict(payload={'state': '^active$'}),
|
||||
publisher_id='compute01.manager',
|
||||
event_type='instance.create.start',
|
||||
context={},
|
||||
match=True)),
|
||||
('payload_no_match',
|
||||
dict(filter_rule=dict(payload={'state': '^deleted$'}),
|
||||
publisher_id='compute01.manager',
|
||||
event_type='instance.create.start',
|
||||
context={},
|
||||
match=False)),
|
||||
('payload_key_missing',
|
||||
dict(filter_rule=dict(payload={'user': '^adm'}),
|
||||
publisher_id='compute01.manager',
|
||||
event_type='instance.create.start',
|
||||
context={},
|
||||
match=False)),
|
||||
('mix_match',
|
||||
dict(filter_rule=dict(event_type='^instance\.create',
|
||||
publisher_id='^compute',
|
||||
context={'user': '^adm'}),
|
||||
publisher_id='compute01.manager',
|
||||
event_type='instance.create.start',
|
||||
context={'user': 'admin'},
|
||||
match=True)),
|
||||
|
||||
]
|
||||
|
||||
def test_filters(self):
|
||||
notification_filter = oslo_messaging.NotificationFilter(
|
||||
**self.filter_rule)
|
||||
endpoint = mock.Mock(spec=['info'], filter_rule=notification_filter)
|
||||
|
||||
targets = [oslo_messaging.Target(topic='notifications')]
|
||||
dispatcher = notify_dispatcher.NotificationDispatcher(
|
||||
targets, [endpoint], serializer=None, allow_requeue=True)
|
||||
message = {'payload': {'state': 'active'},
|
||||
'priority': 'info',
|
||||
'publisher_id': self.publisher_id,
|
||||
'event_type': self.event_type,
|
||||
'timestamp': '2014-03-03 18:21:04.369234',
|
||||
'message_id': '99863dda-97f0-443a-a0c1-6ed317b7fd45'}
|
||||
incoming = mock.Mock(ctxt=self.context, message=message)
|
||||
with dispatcher(incoming) as callback:
|
||||
callback()
|
||||
|
||||
if self.match:
|
||||
self.assertEqual(1, endpoint.info.call_count)
|
||||
else:
|
||||
self.assertEqual(0, endpoint.info.call_count)
|
||||
|
Loading…
Reference in New Issue
Block a user