Implements notification-dispatcher-filter
The NotificationFilter class is used to filter notifications that a notification listerner endpoint will received. The notification can be filtered on different fields: context, publisher_id, event_type, metadata, and payload DocImpact Implements: blueprint notification-dispatcher-filter Change-Id: Ic1239422fac9879b113c9d3d6c3f8dccef20d044
This commit is contained in:
parent
e55a83e832
commit
3d232a0f38
@ -17,9 +17,11 @@ __all__ = ['Notifier',
|
|||||||
'LoggingNotificationHandler',
|
'LoggingNotificationHandler',
|
||||||
'get_notification_listener',
|
'get_notification_listener',
|
||||||
'NotificationResult',
|
'NotificationResult',
|
||||||
|
'NotificationFilter',
|
||||||
'PublishErrorsHandler',
|
'PublishErrorsHandler',
|
||||||
'LoggingErrorNotificationHandler']
|
'LoggingErrorNotificationHandler']
|
||||||
|
|
||||||
|
from .filter import NotificationFilter
|
||||||
from .notifier import *
|
from .notifier import *
|
||||||
from .listener import *
|
from .listener import *
|
||||||
from .log_handler import *
|
from .log_handler import *
|
||||||
|
@ -56,7 +56,9 @@ class NotificationDispatcher(object):
|
|||||||
for endpoint, prio in itertools.product(endpoints, PRIORITIES):
|
for endpoint, prio in itertools.product(endpoints, PRIORITIES):
|
||||||
if hasattr(endpoint, prio):
|
if hasattr(endpoint, prio):
|
||||||
method = getattr(endpoint, prio)
|
method = getattr(endpoint, prio)
|
||||||
self._callbacks_by_priority.setdefault(prio, []).append(method)
|
screen = getattr(endpoint, 'filter_rule', None)
|
||||||
|
self._callbacks_by_priority.setdefault(prio, []).append(
|
||||||
|
(screen, method))
|
||||||
|
|
||||||
priorities = self._callbacks_by_priority.keys()
|
priorities = self._callbacks_by_priority.keys()
|
||||||
self._targets_priorities = set(itertools.product(self.targets,
|
self._targets_priorities = set(itertools.product(self.targets,
|
||||||
@ -118,7 +120,10 @@ class NotificationDispatcher(object):
|
|||||||
payload = self.serializer.deserialize_entity(ctxt,
|
payload = self.serializer.deserialize_entity(ctxt,
|
||||||
message.get('payload'))
|
message.get('payload'))
|
||||||
|
|
||||||
for callback in self._callbacks_by_priority.get(priority, []):
|
for screen, callback in self._callbacks_by_priority.get(priority, []):
|
||||||
|
if screen and not screen.match(ctxt, publisher_id, event_type,
|
||||||
|
metadata, payload):
|
||||||
|
continue
|
||||||
localcontext.set_local_context(ctxt)
|
localcontext.set_local_context(ctxt)
|
||||||
try:
|
try:
|
||||||
if executor_callback:
|
if executor_callback:
|
||||||
|
77
oslo_messaging/notify/filter.py
Normal file
77
oslo_messaging/notify/filter.py
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
#
|
||||||
|
# Copyright 2013 eNovance
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import re
|
||||||
|
|
||||||
|
|
||||||
|
class NotificationFilter(object):
|
||||||
|
|
||||||
|
"""Filter notification messages
|
||||||
|
|
||||||
|
The NotificationFilter class is used to filter notifications that an
|
||||||
|
endpoint will received.
|
||||||
|
|
||||||
|
The notification can be filter on different fields: context,
|
||||||
|
publisher_id, event_type, metadata and payload.
|
||||||
|
|
||||||
|
The filter is done via a regular expression
|
||||||
|
|
||||||
|
filter_rule = NotificationFilter(
|
||||||
|
publisher_id='^compute.*',
|
||||||
|
context={'tenant_id': '^5f643cfc-664b-4c69-8000-ce2ed7b08216$',
|
||||||
|
'roles='private'},
|
||||||
|
event_type='^compute\.instance\..*',
|
||||||
|
metadata={'timestamp': 'Aug'},
|
||||||
|
payload={'state': '^active$')
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, context=None, publisher_id=None, event_type=None,
|
||||||
|
metadata=None, payload=None):
|
||||||
|
self._regex_publisher_id = None
|
||||||
|
self._regex_event_type = None
|
||||||
|
|
||||||
|
if publisher_id is not None:
|
||||||
|
self._regex_publisher_id = re.compile(publisher_id)
|
||||||
|
if event_type is not None:
|
||||||
|
self._regex_event_type = re.compile(event_type)
|
||||||
|
self._regexs_context = self._build_regex_dict(context)
|
||||||
|
self._regexs_metadata = self._build_regex_dict(metadata)
|
||||||
|
self._regexs_payload = self._build_regex_dict(payload)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _build_regex_dict(regex_list):
|
||||||
|
if regex_list is None:
|
||||||
|
return {}
|
||||||
|
return dict((k, re.compile(regex_list[k])) for k in regex_list)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _check_for_mismatch(data, regex):
|
||||||
|
if isinstance(regex, dict):
|
||||||
|
for k in regex:
|
||||||
|
if (k not in data or not regex[k].match(data[k])):
|
||||||
|
return True
|
||||||
|
elif regex is not None and not regex.match(data):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def match(self, context, publisher_id, event_type, metadata, payload):
|
||||||
|
if (self._check_for_mismatch(publisher_id, self._regex_publisher_id) or
|
||||||
|
self._check_for_mismatch(event_type, self._regex_event_type) or
|
||||||
|
self._check_for_mismatch(context, self._regexs_context) or
|
||||||
|
self._check_for_mismatch(metadata, self._regexs_metadata) or
|
||||||
|
self._check_for_mismatch(payload, self._regexs_payload)):
|
||||||
|
return False
|
||||||
|
return True
|
@ -44,10 +44,15 @@ A simple example of a notification listener with multiple endpoints might be::
|
|||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
|
|
||||||
class NotificationEndpoint(object):
|
class NotificationEndpoint(object):
|
||||||
|
filter_rule = NotificationFilter(publisher_id='^compute.*')
|
||||||
|
|
||||||
def warn(self, ctxt, publisher_id, event_type, payload, metadata):
|
def warn(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||||
do_something(payload)
|
do_something(payload)
|
||||||
|
|
||||||
class ErrorEndpoint(object):
|
class ErrorEndpoint(object):
|
||||||
|
filter_rule = NotificationFilter(event_type='^instance\..*\.start$',
|
||||||
|
context={'ctxt_key': 'regexp'})
|
||||||
|
|
||||||
def error(self, ctxt, publisher_id, event_type, payload, metadata):
|
def error(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||||
do_something(payload)
|
do_something(payload)
|
||||||
|
|
||||||
@ -69,7 +74,8 @@ A simple example of a notification listener with multiple endpoints might be::
|
|||||||
A notifier sends a notification on a topic with a priority, the notification
|
A notifier sends a notification on a topic with a priority, the notification
|
||||||
listener will receive this notification if the topic of this one have been set
|
listener will receive this notification if the topic of this one have been set
|
||||||
in one of the targets and if an endpoint implements the method named like the
|
in one of the targets and if an endpoint implements the method named like the
|
||||||
priority
|
priority and if the notification match the NotificationFilter rule set into
|
||||||
|
the filter_rule attribute of the endpoint.
|
||||||
|
|
||||||
Parameters to endpoint methods are the request context supplied by the client,
|
Parameters to endpoint methods are the request context supplied by the client,
|
||||||
the publisher_id of the notification message, the event_type, the payload and
|
the publisher_id of the notification message, the event_type, the payload and
|
||||||
|
@ -147,3 +147,107 @@ class TestDispatcher(test_utils.BaseTestCase):
|
|||||||
callback()
|
callback()
|
||||||
mylog.warning.assert_called_once_with('Unknown priority "%s"',
|
mylog.warning.assert_called_once_with('Unknown priority "%s"',
|
||||||
'what???')
|
'what???')
|
||||||
|
|
||||||
|
|
||||||
|
class TestDispatcherFilter(test_utils.BaseTestCase):
|
||||||
|
scenarios = [
|
||||||
|
('publisher_id_match',
|
||||||
|
dict(filter_rule=dict(publisher_id='^compute.*'),
|
||||||
|
publisher_id='compute01.manager',
|
||||||
|
event_type='instance.create.start',
|
||||||
|
context={},
|
||||||
|
match=True)),
|
||||||
|
('publisher_id_nomatch',
|
||||||
|
dict(filter_rule=dict(publisher_id='^compute.*'),
|
||||||
|
publisher_id='network01.manager',
|
||||||
|
event_type='instance.create.start',
|
||||||
|
context={},
|
||||||
|
match=False)),
|
||||||
|
('event_type_match',
|
||||||
|
dict(filter_rule=dict(event_type='^instance\.create'),
|
||||||
|
publisher_id='compute01.manager',
|
||||||
|
event_type='instance.create.start',
|
||||||
|
context={},
|
||||||
|
match=True)),
|
||||||
|
('event_type_nomatch',
|
||||||
|
dict(filter_rule=dict(event_type='^instance\.delete'),
|
||||||
|
publisher_id='compute01.manager',
|
||||||
|
event_type='instance.create.start',
|
||||||
|
context={},
|
||||||
|
match=False)),
|
||||||
|
('context_match',
|
||||||
|
dict(filter_rule=dict(context={'user': '^adm'}),
|
||||||
|
publisher_id='compute01.manager',
|
||||||
|
event_type='instance.create.start',
|
||||||
|
context={'user': 'admin'},
|
||||||
|
match=True)),
|
||||||
|
('context_key_missing',
|
||||||
|
dict(filter_rule=dict(context={'user': '^adm'}),
|
||||||
|
publisher_id='compute01.manager',
|
||||||
|
event_type='instance.create.start',
|
||||||
|
context={'project': 'admin'},
|
||||||
|
metadata={},
|
||||||
|
match=False)),
|
||||||
|
('metadata_match',
|
||||||
|
dict(filter_rule=dict(metadata={'message_id': '^99'}),
|
||||||
|
publisher_id='compute01.manager',
|
||||||
|
event_type='instance.create.start',
|
||||||
|
context={},
|
||||||
|
match=True)),
|
||||||
|
('metadata_key_missing',
|
||||||
|
dict(filter_rule=dict(metadata={'user': '^adm'}),
|
||||||
|
publisher_id='compute01.manager',
|
||||||
|
event_type='instance.create.start',
|
||||||
|
context={},
|
||||||
|
match=False)),
|
||||||
|
('payload_match',
|
||||||
|
dict(filter_rule=dict(payload={'state': '^active$'}),
|
||||||
|
publisher_id='compute01.manager',
|
||||||
|
event_type='instance.create.start',
|
||||||
|
context={},
|
||||||
|
match=True)),
|
||||||
|
('payload_no_match',
|
||||||
|
dict(filter_rule=dict(payload={'state': '^deleted$'}),
|
||||||
|
publisher_id='compute01.manager',
|
||||||
|
event_type='instance.create.start',
|
||||||
|
context={},
|
||||||
|
match=False)),
|
||||||
|
('payload_key_missing',
|
||||||
|
dict(filter_rule=dict(payload={'user': '^adm'}),
|
||||||
|
publisher_id='compute01.manager',
|
||||||
|
event_type='instance.create.start',
|
||||||
|
context={},
|
||||||
|
match=False)),
|
||||||
|
('mix_match',
|
||||||
|
dict(filter_rule=dict(event_type='^instance\.create',
|
||||||
|
publisher_id='^compute',
|
||||||
|
context={'user': '^adm'}),
|
||||||
|
publisher_id='compute01.manager',
|
||||||
|
event_type='instance.create.start',
|
||||||
|
context={'user': 'admin'},
|
||||||
|
match=True)),
|
||||||
|
|
||||||
|
]
|
||||||
|
|
||||||
|
def test_filters(self):
|
||||||
|
notification_filter = oslo_messaging.NotificationFilter(
|
||||||
|
**self.filter_rule)
|
||||||
|
endpoint = mock.Mock(spec=['info'], filter_rule=notification_filter)
|
||||||
|
|
||||||
|
targets = [oslo_messaging.Target(topic='notifications')]
|
||||||
|
dispatcher = notify_dispatcher.NotificationDispatcher(
|
||||||
|
targets, [endpoint], serializer=None, allow_requeue=True)
|
||||||
|
message = {'payload': {'state': 'active'},
|
||||||
|
'priority': 'info',
|
||||||
|
'publisher_id': self.publisher_id,
|
||||||
|
'event_type': self.event_type,
|
||||||
|
'timestamp': '2014-03-03 18:21:04.369234',
|
||||||
|
'message_id': '99863dda-97f0-443a-a0c1-6ed317b7fd45'}
|
||||||
|
incoming = mock.Mock(ctxt=self.context, message=message)
|
||||||
|
with dispatcher(incoming) as callback:
|
||||||
|
callback()
|
||||||
|
|
||||||
|
if self.match:
|
||||||
|
self.assertEqual(1, endpoint.info.call_count)
|
||||||
|
else:
|
||||||
|
self.assertEqual(0, endpoint.info.call_count)
|
||||||
|
Loading…
Reference in New Issue
Block a user