Routing notifier

Takes a yaml-based config file
(see etc/oslo/routing_notifier.yaml.sample) via
routing_notifier_config option.

Events may be routed by priority or event_type.

Implements: blueprint configurable-notification
Change-Id: I437dfac348f387044e6da3d6a0bbb208323c1741
This commit is contained in:
Sandy Walsh 2013-12-02 21:08:07 +00:00
parent d3f3fd7759
commit 30c9334306
5 changed files with 355 additions and 1 deletions

View File

@ -0,0 +1,29 @@
# Setting a priority AND an event means both have to be satisfied.
#
# However, defining different sets for the same driver allows you
# to do OR operations.
#
# See how this logic is modeled below:
#
# if (priority in info, warn or error) or
# (event == compute.scheduler.run_instance)
# send to messaging driver ...
#
# if priority == 'poll' and
# event == 'bandwidth.*'
# send to poll driver
group_1:
messaging:
accepted_priorities: ['info', 'warn', 'error']
poll:
accepted_priorities: ['poll']
accepted_events: ['bandwidth.*']
log:
accepted_events: ['compute.instance.exists']
group_2:
messaging:⋅
accepted_events: ['compute.scheduler.run_instance.*']

View File

@ -0,0 +1,136 @@
# Copyright 2014 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fnmatch
import logging
from oslo.config import cfg
import six
from stevedore import dispatch
import yaml
from oslo.messaging.notify import notifier
from oslo.messaging.openstack.common.gettextutils import _ # noqa
LOG = logging.getLogger(__name__)
router_config = cfg.StrOpt('routing_notifier_config', default='',
help='RoutingNotifier configuration file location')
CONF = cfg.CONF
CONF.register_opt(router_config)
class RoutingDriver(notifier._Driver):
NOTIFIER_PLUGIN_NAMESPACE = 'oslo.messaging.notify.drivers'
plugin_manager = None
routing_groups = None # The routing groups from the config file.
used_drivers = None # Used driver names, extracted from config file.
def _should_load_plugin(self, ext, *args, **kwargs):
# Hack to keep stevedore from circular importing since these
# endpoints are used for different purposes.
if ext.name == 'routing':
return False
return ext.name in self.used_drivers
def _get_notifier_config_file(self, filename):
"""Broken out for testing."""
return file(filename, 'r')
def _load_notifiers(self):
"""One-time load of notifier config file."""
self.routing_groups = {}
self.used_drivers = set()
filename = CONF.routing_notifier_config
if not filename:
return
# Infer which drivers are used from the config file.
self.routing_groups = yaml.load(
self._get_notifier_config_file(filename))
if not self.routing_groups:
self.routing_groups = {} # In case we got None from load()
return
for group in self.routing_groups.values():
self.used_drivers.update(group.keys())
LOG.debug(_('loading notifiers from %(namespace)s') %
{'namespace': self.NOTIFIER_PLUGIN_NAMESPACE})
self.plugin_manager = dispatch.DispatchExtensionManager(
namespace=self.NOTIFIER_PLUGIN_NAMESPACE,
check_func=self._should_load_plugin,
invoke_on_load=True,
invoke_args=None)
if not list(self.plugin_manager):
LOG.warning(_("Failed to load any notifiers "
"for %(namespace)s") %
{'namespace': self.NOTIFIER_PLUGIN_NAMESPACE})
def _get_drivers_for_message(self, group, event_type, priority):
"""Which drivers should be called for this event_type
or priority.
"""
accepted_drivers = set()
for driver, rules in six.iteritems(group):
checks = []
for key, patterns in six.iteritems(rules):
if key == 'accepted_events':
c = [fnmatch.fnmatch(event_type, p)
for p in patterns]
checks.append(any(c))
if key == 'accepted_priorities':
c = [fnmatch.fnmatch(priority, p.lower())
for p in patterns]
checks.append(any(c))
if all(checks):
accepted_drivers.add(driver)
return list(accepted_drivers)
def _filter_func(self, ext, context, message, accepted_drivers):
"""True/False if the driver should be called for this message.
"""
# context is unused here, but passed in by map()
return ext.name in accepted_drivers
def _call_notify(self, ext, context, message, accepted_drivers):
"""Emit the notification.
"""
# accepted_drivers is passed in as a result of the map() function
LOG.info(_("Routing '%(event)s' notification to '%(driver)s' driver") %
{'event': message.get('event_type'), 'driver': ext.name})
ext.obj.notify(context, message)
def notify(self, context, message):
if not self.plugin_manager:
self._load_notifiers()
# Fail if these aren't present ...
event_type = message['event_type']
priority = message['priority'].lower()
accepted_drivers = set()
for group in self.routing_groups.values():
accepted_drivers.update(self._get_drivers_for_message(group,
event_type,
priority))
self.plugin_manager.map(self._filter_func, self._call_notify, context,
message, list(accepted_drivers))

View File

@ -9,7 +9,11 @@ six>=1.4.1
# FIXME(markmc): remove this when the drivers no longer # FIXME(markmc): remove this when the drivers no longer
# import eventlet # import eventlet
eventlet>=0.13.0 eventlet>=0.13.0
# used by openstack/common/gettextutils.py # used by openstack/common/gettextutils.py
Babel>=1.3 Babel>=1.3
# for the routing notifier
PyYAML>=3.1.0

View File

@ -48,6 +48,7 @@ oslo.messaging.notify.drivers =
log = oslo.messaging.notify._impl_log:LogDriver log = oslo.messaging.notify._impl_log:LogDriver
test = oslo.messaging.notify._impl_test:TestDriver test = oslo.messaging.notify._impl_test:TestDriver
noop = oslo.messaging.notify._impl_noop:NoOpDriver noop = oslo.messaging.notify._impl_noop:NoOpDriver
routing = oslo.messaging.notify._impl_routing:RoutingDriver
[build_sphinx] [build_sphinx]
source-dir = doc/source source-dir = doc/source

View File

@ -20,11 +20,15 @@ import uuid
import fixtures import fixtures
import mock import mock
from stevedore import extension
from stevedore.tests import manager as test_manager
import testscenarios import testscenarios
import yaml
from oslo import messaging from oslo import messaging
from oslo.messaging.notify import _impl_log from oslo.messaging.notify import _impl_log
from oslo.messaging.notify import _impl_messaging from oslo.messaging.notify import _impl_messaging
from oslo.messaging.notify import _impl_routing as routing
from oslo.messaging.notify import _impl_test from oslo.messaging.notify import _impl_test
from oslo.messaging.notify import notifier as msg_notifier from oslo.messaging.notify import notifier as msg_notifier
from oslo.messaging.openstack.common import jsonutils from oslo.messaging.openstack.common import jsonutils
@ -188,7 +192,7 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
target = messaging.Target(topic='%s.%s' % (topic, target = messaging.Target(topic='%s.%s' % (topic,
self.priority)) self.priority))
transport._send_notification(target, self.ctxt, message, transport._send_notification(target, self.ctxt, message,
**send_kwargs) **send_kwargs).InAnyOrder()
self.mox.ReplayAll() self.mox.ReplayAll()
@ -302,3 +306,183 @@ class TestLogNotifier(test_utils.BaseTestCase):
msg = {'event_type': 'foo'} msg = {'event_type': 'foo'}
driver.notify(None, msg, "sample") driver.notify(None, msg, "sample")
class TestRoutingNotifier(test_utils.BaseTestCase):
def setUp(self):
super(TestRoutingNotifier, self).setUp()
self.router = routing.RoutingDriver(None, None, None)
def _fake_extension_manager(self, ext):
return test_manager.TestExtensionManager(
[extension.Extension('test', None, None, ext), ])
def _empty_extension_manager(self):
return test_manager.TestExtensionManager([])
def test_should_load_plugin(self):
self.router.used_drivers = set(["zoo", "blah"])
ext = mock.MagicMock()
ext.name = "foo"
self.assertFalse(self.router._should_load_plugin(ext))
ext.name = "zoo"
self.assertTrue(self.router._should_load_plugin(ext))
def test_load_notifiers_no_config(self):
# default routing_notifier_config=""
self.router._load_notifiers()
self.assertEqual(self.router.routing_groups, {})
self.assertEqual(0, len(self.router.used_drivers))
def test_load_notifiers_no_extensions(self):
self.config(routing_notifier_config="routing_notifier.yaml")
routing_config = r""
config_file = mock.MagicMock()
config_file.return_value = routing_config
with mock.patch.object(self.router, '_get_notifier_config_file',
config_file):
with mock.patch('stevedore.dispatch.DispatchExtensionManager',
return_value=self._empty_extension_manager()):
with mock.patch('oslo.messaging.notify.'
'_impl_routing.LOG') as mylog:
self.router._load_notifiers()
self.assertFalse(mylog.debug.called)
self.assertEqual(self.router.routing_groups, {})
def test_load_notifiers_config(self):
self.config(routing_notifier_config="routing_notifier.yaml")
routing_config = r"""
group_1:
rpc : foo
group_2:
rpc : blah
"""
config_file = mock.MagicMock()
config_file.return_value = routing_config
with mock.patch.object(self.router, '_get_notifier_config_file',
config_file):
with mock.patch('stevedore.dispatch.DispatchExtensionManager',
return_value=self._fake_extension_manager(
mock.MagicMock())):
self.router._load_notifiers()
groups = self.router.routing_groups.keys()
groups.sort()
self.assertEqual(['group_1', 'group_2'], groups)
def test_get_drivers_for_message_accepted_events(self):
config = r"""
group_1:
rpc:
accepted_events:
- foo.*
- blah.zoo.*
- zip
"""
groups = yaml.load(config)
group = groups['group_1']
# No matching event ...
self.assertEqual([],
self.router._get_drivers_for_message(
group, "unknown", None))
# Child of foo ...
self.assertEqual(['rpc'],
self.router._get_drivers_for_message(
group, "foo.1", None))
# Foo itself ...
self.assertEqual([],
self.router._get_drivers_for_message(
group, "foo", None))
# Child of blah.zoo
self.assertEqual(['rpc'],
self.router._get_drivers_for_message(
group, "blah.zoo.zing", None))
def test_get_drivers_for_message_accepted_priorities(self):
config = r"""
group_1:
rpc:
accepted_priorities:
- info
- error
"""
groups = yaml.load(config)
group = groups['group_1']
# No matching priority
self.assertEqual([],
self.router._get_drivers_for_message(
group, None, "unknown"))
# Info ...
self.assertEqual(['rpc'],
self.router._get_drivers_for_message(
group, None, "info"))
# Error (to make sure the list is getting processed) ...
self.assertEqual(['rpc'],
self.router._get_drivers_for_message(
group, None, "error"))
def test_get_drivers_for_message_both(self):
config = r"""
group_1:
rpc:
accepted_priorities:
- info
accepted_events:
- foo.*
driver_1:
accepted_priorities:
- info
driver_2:
accepted_events:
- foo.*
"""
groups = yaml.load(config)
group = groups['group_1']
# Valid event, but no matching priority
self.assertEqual(['driver_2'],
self.router._get_drivers_for_message(
group, 'foo.blah', "unknown"))
# Valid priority, but no matching event
self.assertEqual(['driver_1'],
self.router._get_drivers_for_message(
group, 'unknown', "info"))
# Happy day ...
x = self.router._get_drivers_for_message(group, 'foo.blah', "info")
x.sort()
self.assertEqual(['driver_1', 'driver_2', 'rpc'], x)
def test_filter_func(self):
ext = mock.MagicMock()
ext.name = "rpc"
# Good ...
self.assertTrue(self.router._filter_func(ext, {}, {},
['foo', 'rpc']))
# Bad
self.assertFalse(self.router._filter_func(ext, {}, {}, ['foo']))
def test_notify(self):
self.router.routing_groups = {'group_1': None, 'group_2': None}
message = {'event_type': 'my_event', 'priority': 'my_priority'}
drivers_mock = mock.MagicMock()
drivers_mock.side_effect = [['rpc'], ['foo']]
with mock.patch.object(self.router, 'plugin_manager') as pm:
with mock.patch.object(self.router, '_get_drivers_for_message',
drivers_mock):
self.router.notify({}, message)
self.assertEqual(pm.map.call_args[0][4], ['rpc', 'foo'])