Utilize the new RequestContext redacted_copy method
We now expect context objects to support returning a redacted copy of themselves. As a related cleanup, removed the practice entirely of using dictionaries to represent contexts in unit tests and the logging driver. As part of developing this change, I discovered code in Glance (and potentially other services) which explicitly pass {} in lieu of a context when notifying; so we now properly handle dictionaries as contexts. To ensure we have the method required; require oslo.context 5.3.0 or newer. Change-Id: I894f38cc83c98d3e8d48b59864c0c7c2d27e7dcd
This commit is contained in:
parent
6ad1ccf89c
commit
800c58826e
@ -64,7 +64,7 @@ class LoggingNotificationHandler(logging.Handler):
|
|||||||
return
|
return
|
||||||
|
|
||||||
method(
|
method(
|
||||||
{},
|
None,
|
||||||
'logrecord',
|
'logrecord',
|
||||||
{
|
{
|
||||||
'name': record.name,
|
'name': record.name,
|
||||||
|
@ -172,48 +172,21 @@ def get_notification_transport(conf, url=None, allowed_remote_exmods=None):
|
|||||||
|
|
||||||
|
|
||||||
def _sanitize_context(ctxt):
|
def _sanitize_context(ctxt):
|
||||||
# NOTE(JayF): The below values are in the same order they are in
|
if ctxt is None or type(ctxt) is dict:
|
||||||
# oslo_context.context.RequestContext.__init__()
|
# NOTE(JayF): Logging drivers, unit tests, and some code calls
|
||||||
safe_keys = (
|
# notifier with an emptydict or None instead of an
|
||||||
'user_id',
|
# actual context. In these cases, discard the passed
|
||||||
'project_id',
|
# value.
|
||||||
'domain_id',
|
return {}
|
||||||
'user_domain_id',
|
|
||||||
'project_domain_id',
|
try:
|
||||||
# NOTE(JayF): Without is_admin; heat will make a roundtrip to policy
|
return ctxt.redacted_copy()
|
||||||
# to try to set it to a sane value when instantiating the
|
except AttributeError:
|
||||||
# replacement context. Instead, just pass it on.
|
# NOTE(JayF): We'd rather send a notification without any context
|
||||||
'is_admin',
|
# than missing sending the notification altogether.
|
||||||
'request_id',
|
_LOG.warning("Unable to properly redact context for "
|
||||||
'roles',
|
"notification, omitting context from notification.")
|
||||||
'user_name',
|
return {}
|
||||||
'project_name',
|
|
||||||
'domain_name',
|
|
||||||
'user_domain_name',
|
|
||||||
'project_domain_name',
|
|
||||||
'service_user_id',
|
|
||||||
'service_user_domain_id',
|
|
||||||
'service_user_domain_name',
|
|
||||||
'service_project_id',
|
|
||||||
'service_project_name',
|
|
||||||
'service_project_domain_id',
|
|
||||||
'service_project_domain_name',
|
|
||||||
'service_roles',
|
|
||||||
'global_request_id',
|
|
||||||
'system_scope',
|
|
||||||
# NOTE(JayF) These have been renamed but may show up in notifications
|
|
||||||
'user',
|
|
||||||
'domain',
|
|
||||||
'user_domain',
|
|
||||||
'project_domain',
|
|
||||||
)
|
|
||||||
ctxt_dict = ctxt if isinstance(ctxt, dict) else ctxt.to_dict()
|
|
||||||
safe_dict = {k: v for k, v in ctxt_dict.items()
|
|
||||||
if k in safe_keys}
|
|
||||||
if ctxt_dict is ctxt:
|
|
||||||
return safe_dict
|
|
||||||
else:
|
|
||||||
return ctxt.__class__.from_dict(safe_dict)
|
|
||||||
|
|
||||||
|
|
||||||
class Notifier(object):
|
class Notifier(object):
|
||||||
|
@ -189,13 +189,14 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
batch=(5, 1))
|
batch=(5, 1))
|
||||||
|
|
||||||
notifier = self._setup_notifier(transport)
|
notifier = self._setup_notifier(transport)
|
||||||
|
cxt = test_utils.TestContext()
|
||||||
for _ in range(12):
|
for _ in range(12):
|
||||||
notifier.info({}, 'an_event.start', 'test message')
|
notifier.info(cxt, 'an_event.start', 'test message')
|
||||||
|
|
||||||
self.wait_for_messages(3)
|
self.wait_for_messages(3)
|
||||||
self.assertFalse(listener_thread.stop())
|
self.assertFalse(listener_thread.stop())
|
||||||
|
|
||||||
messages = [dict(ctxt={},
|
messages = [dict(ctxt=cxt,
|
||||||
publisher_id='testpublisher',
|
publisher_id='testpublisher',
|
||||||
event_type='an_event.start',
|
event_type='an_event.start',
|
||||||
payload='test message',
|
payload='test message',
|
||||||
@ -216,13 +217,14 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
batch=(5, None))
|
batch=(5, None))
|
||||||
|
|
||||||
notifier = self._setup_notifier(transport)
|
notifier = self._setup_notifier(transport)
|
||||||
|
ctxt = test_utils.TestContext()
|
||||||
for _ in range(10):
|
for _ in range(10):
|
||||||
notifier.info({}, 'an_event.start', 'test message')
|
notifier.info(ctxt, 'an_event.start', 'test message')
|
||||||
|
|
||||||
self.wait_for_messages(2)
|
self.wait_for_messages(2)
|
||||||
self.assertFalse(listener_thread.stop())
|
self.assertFalse(listener_thread.stop())
|
||||||
|
|
||||||
messages = [dict(ctxt={},
|
messages = [dict(ctxt=ctxt,
|
||||||
publisher_id='testpublisher',
|
publisher_id='testpublisher',
|
||||||
event_type='an_event.start',
|
event_type='an_event.start',
|
||||||
payload='test message',
|
payload='test message',
|
||||||
@ -242,13 +244,14 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
batch=(5, None))
|
batch=(5, None))
|
||||||
|
|
||||||
notifier = self._setup_notifier(transport)
|
notifier = self._setup_notifier(transport)
|
||||||
|
ctxt = test_utils.TestContext()
|
||||||
for _ in range(10):
|
for _ in range(10):
|
||||||
notifier.info({}, 'an_event.start', 'test message')
|
notifier.info(ctxt, 'an_event.start', 'test message')
|
||||||
|
|
||||||
self.wait_for_messages(2)
|
self.wait_for_messages(2)
|
||||||
self.assertFalse(listener_thread.stop())
|
self.assertFalse(listener_thread.stop())
|
||||||
|
|
||||||
messages = [dict(ctxt={},
|
messages = [dict(ctxt=ctxt,
|
||||||
publisher_id='testpublisher',
|
publisher_id='testpublisher',
|
||||||
event_type='an_event.start',
|
event_type='an_event.start',
|
||||||
payload='test message',
|
payload='test message',
|
||||||
@ -266,13 +269,14 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
listener_thread = self._setup_listener(transport, [endpoint])
|
listener_thread = self._setup_listener(transport, [endpoint])
|
||||||
|
|
||||||
notifier = self._setup_notifier(transport)
|
notifier = self._setup_notifier(transport)
|
||||||
notifier.info({}, 'an_event.start', 'test message')
|
cxt = test_utils.TestContext()
|
||||||
|
notifier.info(cxt, 'an_event.start', 'test message')
|
||||||
|
|
||||||
self.wait_for_messages(1)
|
self.wait_for_messages(1)
|
||||||
self.assertFalse(listener_thread.stop())
|
self.assertFalse(listener_thread.stop())
|
||||||
|
|
||||||
endpoint.info.assert_called_once_with(
|
endpoint.info.assert_called_once_with(
|
||||||
{}, 'testpublisher', 'an_event.start', 'test message',
|
cxt, 'testpublisher', 'an_event.start', 'test message',
|
||||||
{'message_id': mock.ANY, 'timestamp': mock.ANY})
|
{'message_id': mock.ANY, 'timestamp': mock.ANY})
|
||||||
|
|
||||||
def test_two_topics(self):
|
def test_two_topics(self):
|
||||||
@ -286,18 +290,20 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
listener_thread = self._setup_listener(transport, [endpoint],
|
listener_thread = self._setup_listener(transport, [endpoint],
|
||||||
targets=targets)
|
targets=targets)
|
||||||
notifier = self._setup_notifier(transport, topics=['topic1'])
|
notifier = self._setup_notifier(transport, topics=['topic1'])
|
||||||
notifier.info({'user_name': 'bob'}, 'an_event.start1', 'test')
|
cxt1 = test_utils.TestContext(user_name='bob')
|
||||||
|
notifier.info(cxt1, 'an_event.start1', 'test')
|
||||||
notifier = self._setup_notifier(transport, topics=['topic2'])
|
notifier = self._setup_notifier(transport, topics=['topic2'])
|
||||||
notifier.info({'user_name': 'bob2'}, 'an_event.start2', 'test')
|
cxt2 = test_utils.TestContext(user_name='bob2')
|
||||||
|
notifier.info(cxt2, 'an_event.start2', 'test')
|
||||||
|
|
||||||
self.wait_for_messages(2)
|
self.wait_for_messages(2)
|
||||||
self.assertFalse(listener_thread.stop())
|
self.assertFalse(listener_thread.stop())
|
||||||
|
|
||||||
endpoint.info.assert_has_calls([
|
endpoint.info.assert_has_calls([
|
||||||
mock.call({'user_name': 'bob'}, 'testpublisher',
|
mock.call(cxt1, 'testpublisher',
|
||||||
'an_event.start1', 'test',
|
'an_event.start1', 'test',
|
||||||
{'timestamp': mock.ANY, 'message_id': mock.ANY}),
|
{'timestamp': mock.ANY, 'message_id': mock.ANY}),
|
||||||
mock.call({'user_name': 'bob2'}, 'testpublisher',
|
mock.call(cxt2, 'testpublisher',
|
||||||
'an_event.start2', 'test',
|
'an_event.start2', 'test',
|
||||||
{'timestamp': mock.ANY, 'message_id': mock.ANY})],
|
{'timestamp': mock.ANY, 'message_id': mock.ANY})],
|
||||||
any_order=True)
|
any_order=True)
|
||||||
@ -326,23 +332,23 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
transport._send_notification = mock.MagicMock(
|
transport._send_notification = mock.MagicMock(
|
||||||
side_effect=side_effect)
|
side_effect=side_effect)
|
||||||
|
|
||||||
notifier.info({'user_name': 'bob0'},
|
notifier.info(test_utils.TestContext(user_name='bob0'),
|
||||||
'an_event.start', 'test message default exchange')
|
'an_event.start', 'test message default exchange')
|
||||||
mock_notifier_exchange('exchange1')
|
mock_notifier_exchange('exchange1')
|
||||||
notifier.info({'user_name': 'bob1'},
|
ctxt1 = test_utils.TestContext(user_name='bob1')
|
||||||
'an_event.start', 'test message exchange1')
|
notifier.info(ctxt1, 'an_event.start', 'test message exchange1')
|
||||||
mock_notifier_exchange('exchange2')
|
mock_notifier_exchange('exchange2')
|
||||||
notifier.info({'user_name': 'bob2'},
|
ctxt2 = test_utils.TestContext(user_name='bob2')
|
||||||
'an_event.start', 'test message exchange2')
|
notifier.info(ctxt2, 'an_event.start', 'test message exchange2')
|
||||||
|
|
||||||
self.wait_for_messages(2)
|
self.wait_for_messages(2)
|
||||||
self.assertFalse(listener_thread.stop())
|
self.assertFalse(listener_thread.stop())
|
||||||
|
|
||||||
endpoint.info.assert_has_calls([
|
endpoint.info.assert_has_calls([
|
||||||
mock.call({'user_name': 'bob1'}, 'testpublisher', 'an_event.start',
|
mock.call(ctxt1, 'testpublisher', 'an_event.start',
|
||||||
'test message exchange1',
|
'test message exchange1',
|
||||||
{'timestamp': mock.ANY, 'message_id': mock.ANY}),
|
{'timestamp': mock.ANY, 'message_id': mock.ANY}),
|
||||||
mock.call({'user_name': 'bob2'}, 'testpublisher', 'an_event.start',
|
mock.call(ctxt2, 'testpublisher', 'an_event.start',
|
||||||
'test message exchange2',
|
'test message exchange2',
|
||||||
{'timestamp': mock.ANY, 'message_id': mock.ANY})],
|
{'timestamp': mock.ANY, 'message_id': mock.ANY})],
|
||||||
any_order=True)
|
any_order=True)
|
||||||
@ -358,18 +364,19 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
listener_thread = self._setup_listener(transport,
|
listener_thread = self._setup_listener(transport,
|
||||||
[endpoint1, endpoint2])
|
[endpoint1, endpoint2])
|
||||||
notifier = self._setup_notifier(transport)
|
notifier = self._setup_notifier(transport)
|
||||||
notifier.info({}, 'an_event.start', 'test')
|
cxt = test_utils.TestContext()
|
||||||
|
notifier.info(cxt, 'an_event.start', 'test')
|
||||||
|
|
||||||
self.wait_for_messages(1)
|
self.wait_for_messages(1)
|
||||||
self.assertFalse(listener_thread.stop())
|
self.assertFalse(listener_thread.stop())
|
||||||
|
|
||||||
endpoint1.info.assert_called_once_with(
|
endpoint1.info.assert_called_once_with(
|
||||||
{}, 'testpublisher', 'an_event.start', 'test', {
|
cxt, 'testpublisher', 'an_event.start', 'test', {
|
||||||
'timestamp': mock.ANY,
|
'timestamp': mock.ANY,
|
||||||
'message_id': mock.ANY})
|
'message_id': mock.ANY})
|
||||||
|
|
||||||
endpoint2.info.assert_called_once_with(
|
endpoint2.info.assert_called_once_with(
|
||||||
{}, 'testpublisher', 'an_event.start', 'test', {
|
cxt, 'testpublisher', 'an_event.start', 'test', {
|
||||||
'timestamp': mock.ANY,
|
'timestamp': mock.ANY,
|
||||||
'message_id': mock.ANY})
|
'message_id': mock.ANY})
|
||||||
|
|
||||||
@ -387,15 +394,16 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
endpoint.info.side_effect = side_effect_requeue
|
endpoint.info.side_effect = side_effect_requeue
|
||||||
listener_thread = self._setup_listener(transport, [endpoint])
|
listener_thread = self._setup_listener(transport, [endpoint])
|
||||||
notifier = self._setup_notifier(transport)
|
notifier = self._setup_notifier(transport)
|
||||||
notifier.info({}, 'an_event.start', 'test')
|
cxt = test_utils.TestContext()
|
||||||
|
notifier.info(cxt, 'an_event.start', 'test')
|
||||||
|
|
||||||
self.wait_for_messages(2)
|
self.wait_for_messages(2)
|
||||||
self.assertFalse(listener_thread.stop())
|
self.assertFalse(listener_thread.stop())
|
||||||
|
|
||||||
endpoint.info.assert_has_calls([
|
endpoint.info.assert_has_calls([
|
||||||
mock.call({}, 'testpublisher', 'an_event.start', 'test',
|
mock.call(cxt, 'testpublisher', 'an_event.start', 'test',
|
||||||
{'timestamp': mock.ANY, 'message_id': mock.ANY}),
|
{'timestamp': mock.ANY, 'message_id': mock.ANY}),
|
||||||
mock.call({}, 'testpublisher', 'an_event.start', 'test',
|
mock.call(cxt, 'testpublisher', 'an_event.start', 'test',
|
||||||
{'timestamp': mock.ANY, 'message_id': mock.ANY})])
|
{'timestamp': mock.ANY, 'message_id': mock.ANY})])
|
||||||
|
|
||||||
def test_two_pools(self):
|
def test_two_pools(self):
|
||||||
@ -414,23 +422,27 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
targets=targets, pool="pool2")
|
targets=targets, pool="pool2")
|
||||||
|
|
||||||
notifier = self._setup_notifier(transport, topics=["topic"])
|
notifier = self._setup_notifier(transport, topics=["topic"])
|
||||||
notifier.info({'user_name': 'bob0'}, 'an_event.start', 'test message0')
|
ctxts = [
|
||||||
notifier.info({'user_name': 'bob1'}, 'an_event.start', 'test message1')
|
test_utils.TestContext(user_name='bob0'),
|
||||||
|
test_utils.TestContext(user_name='bob1')
|
||||||
|
]
|
||||||
|
notifier.info(ctxts[0], 'an_event.start', 'test message0')
|
||||||
|
notifier.info(ctxts[1], 'an_event.start', 'test message1')
|
||||||
|
|
||||||
self.wait_for_messages(2, "pool1")
|
self.wait_for_messages(2, "pool1")
|
||||||
self.wait_for_messages(2, "pool2")
|
self.wait_for_messages(2, "pool2")
|
||||||
self.assertFalse(listener2_thread.stop())
|
self.assertFalse(listener2_thread.stop())
|
||||||
self.assertFalse(listener1_thread.stop())
|
self.assertFalse(listener1_thread.stop())
|
||||||
|
|
||||||
def mocked_endpoint_call(i):
|
def mocked_endpoint_call(i, ctxts):
|
||||||
return mock.call({'user_name': 'bob%d' % i}, 'testpublisher',
|
return mock.call(ctxts[i], 'testpublisher',
|
||||||
'an_event.start', 'test message%d' % i,
|
'an_event.start', 'test message%d' % i,
|
||||||
{'timestamp': mock.ANY, 'message_id': mock.ANY})
|
{'timestamp': mock.ANY, 'message_id': mock.ANY})
|
||||||
|
|
||||||
endpoint1.info.assert_has_calls([mocked_endpoint_call(0),
|
endpoint1.info.assert_has_calls([mocked_endpoint_call(0, ctxts),
|
||||||
mocked_endpoint_call(1)])
|
mocked_endpoint_call(1, ctxts)])
|
||||||
endpoint2.info.assert_has_calls([mocked_endpoint_call(0),
|
endpoint2.info.assert_has_calls([mocked_endpoint_call(0, ctxts),
|
||||||
mocked_endpoint_call(1)])
|
mocked_endpoint_call(1, ctxts)])
|
||||||
|
|
||||||
def test_two_pools_three_listener(self):
|
def test_two_pools_three_listener(self):
|
||||||
transport = msg_notifier.get_notification_transport(
|
transport = msg_notifier.get_notification_transport(
|
||||||
@ -451,42 +463,42 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
listener3_thread = self._setup_listener(transport, [endpoint3],
|
listener3_thread = self._setup_listener(transport, [endpoint3],
|
||||||
targets=targets, pool="pool2")
|
targets=targets, pool="pool2")
|
||||||
|
|
||||||
def mocked_endpoint_call(i):
|
def mocked_endpoint_call(i, ctxt):
|
||||||
return mock.call({'user_name': 'bob%d' % i}, 'testpublisher',
|
return mock.call(ctxt, 'testpublisher',
|
||||||
'an_event.start', 'test message%d' % i,
|
'an_event.start', 'test message%d' % i,
|
||||||
{'timestamp': mock.ANY, 'message_id': mock.ANY})
|
{'timestamp': mock.ANY, 'message_id': mock.ANY})
|
||||||
|
|
||||||
notifier = self._setup_notifier(transport, topics=["topic"])
|
notifier = self._setup_notifier(transport, topics=["topic"])
|
||||||
mocked_endpoint1_calls = []
|
mocked_endpoint1_calls = []
|
||||||
for i in range(0, 25):
|
for i in range(0, 25):
|
||||||
notifier.info({'user_name': 'bob%d' % i}, 'an_event.start',
|
ctxt = test_utils.TestContext(user_name='bob%d' % i)
|
||||||
'test message%d' % i)
|
notifier.info(ctxt, 'an_event.start', 'test message%d' % i)
|
||||||
mocked_endpoint1_calls.append(mocked_endpoint_call(i))
|
mocked_endpoint1_calls.append(mocked_endpoint_call(i, ctxt))
|
||||||
|
|
||||||
self.wait_for_messages(25, 'pool2')
|
self.wait_for_messages(25, 'pool2')
|
||||||
listener2_thread.stop()
|
listener2_thread.stop()
|
||||||
|
|
||||||
for i in range(0, 25):
|
for i in range(0, 25):
|
||||||
notifier.info({'user_name': 'bob%d' % i}, 'an_event.start',
|
cxt = test_utils.TestContext(user_name='bob%d' % i)
|
||||||
'test message%d' % i)
|
notifier.info(cxt, 'an_event.start', 'test message%d' % i)
|
||||||
mocked_endpoint1_calls.append(mocked_endpoint_call(i))
|
mocked_endpoint1_calls.append(mocked_endpoint_call(i, cxt))
|
||||||
|
|
||||||
self.wait_for_messages(50, 'pool2')
|
self.wait_for_messages(50, 'pool2')
|
||||||
listener2_thread.start()
|
listener2_thread.start()
|
||||||
listener3_thread.stop()
|
listener3_thread.stop()
|
||||||
|
|
||||||
for i in range(0, 25):
|
for i in range(0, 25):
|
||||||
notifier.info({'user_name': 'bob%d' % i}, 'an_event.start',
|
ctxt = test_utils.TestContext(user_name='bob%d' % i)
|
||||||
'test message%d' % i)
|
notifier.info(ctxt, 'an_event.start', 'test message%d' % i)
|
||||||
mocked_endpoint1_calls.append(mocked_endpoint_call(i))
|
mocked_endpoint1_calls.append(mocked_endpoint_call(i, ctxt))
|
||||||
|
|
||||||
self.wait_for_messages(75, 'pool2')
|
self.wait_for_messages(75, 'pool2')
|
||||||
listener3_thread.start()
|
listener3_thread.start()
|
||||||
|
|
||||||
for i in range(0, 25):
|
for i in range(0, 25):
|
||||||
notifier.info({'user_name': 'bob%d' % i}, 'an_event.start',
|
ctxt = test_utils.TestContext(user_name='bob%d' % i)
|
||||||
'test message%d' % i)
|
notifier.info(ctxt, 'an_event.start', 'test message%d' % i)
|
||||||
mocked_endpoint1_calls.append(mocked_endpoint_call(i))
|
mocked_endpoint1_calls.append(mocked_endpoint_call(i, ctxt))
|
||||||
|
|
||||||
self.wait_for_messages(100, 'pool1')
|
self.wait_for_messages(100, 'pool1')
|
||||||
self.wait_for_messages(100, 'pool2')
|
self.wait_for_messages(100, 'pool2')
|
||||||
|
@ -36,6 +36,7 @@ from oslo_messaging import serializer as msg_serializer
|
|||||||
from oslo_messaging.tests import utils as test_utils
|
from oslo_messaging.tests import utils as test_utils
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
|
|
||||||
load_tests = testscenarios.load_tests_apply_scenarios
|
load_tests = testscenarios.load_tests_apply_scenarios
|
||||||
|
|
||||||
|
|
||||||
@ -122,7 +123,7 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
|
|||||||
]
|
]
|
||||||
|
|
||||||
_context = [
|
_context = [
|
||||||
('ctxt', dict(ctxt={'user_name': 'bob'})),
|
('ctxt', dict(ctxt=test_utils.TestContext(user_name='bob'))),
|
||||||
]
|
]
|
||||||
|
|
||||||
_retry = [
|
_retry = [
|
||||||
@ -229,157 +230,6 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
|
|||||||
TestMessagingNotifier.generate_scenarios()
|
TestMessagingNotifier.generate_scenarios()
|
||||||
|
|
||||||
|
|
||||||
class TestMessagingNotifierContextFiltering(test_utils.BaseTestCase):
|
|
||||||
|
|
||||||
_v1 = [
|
|
||||||
('v1', dict(v1=True)),
|
|
||||||
('not_v1', dict(v1=False)),
|
|
||||||
]
|
|
||||||
|
|
||||||
_v2 = [
|
|
||||||
('v2', dict(v2=True)),
|
|
||||||
('not_v2', dict(v2=False)),
|
|
||||||
]
|
|
||||||
|
|
||||||
_publisher_id = [
|
|
||||||
('ctor_pub_id', dict(ctor_pub_id='test',
|
|
||||||
expected_pub_id='test')),
|
|
||||||
('prep_pub_id', dict(prep_pub_id='test.localhost',
|
|
||||||
expected_pub_id='test.localhost')),
|
|
||||||
('override', dict(ctor_pub_id='test',
|
|
||||||
prep_pub_id='test.localhost',
|
|
||||||
expected_pub_id='test.localhost')),
|
|
||||||
]
|
|
||||||
|
|
||||||
_topics = [
|
|
||||||
('no_topics', dict(topics=[])),
|
|
||||||
('single_topic', dict(topics=['notifications'])),
|
|
||||||
('multiple_topic2', dict(topics=['foo', 'bar'])),
|
|
||||||
]
|
|
||||||
|
|
||||||
_priority = [
|
|
||||||
('audit', dict(priority='audit')),
|
|
||||||
('debug', dict(priority='debug')),
|
|
||||||
('info', dict(priority='info')),
|
|
||||||
('warn', dict(priority='warn')),
|
|
||||||
('error', dict(priority='error')),
|
|
||||||
('sample', dict(priority='sample')),
|
|
||||||
('critical', dict(priority='critical')),
|
|
||||||
]
|
|
||||||
|
|
||||||
_payload = [
|
|
||||||
('payload', dict(payload={'foo': 'bar'})),
|
|
||||||
]
|
|
||||||
|
|
||||||
_context = [
|
|
||||||
('ctxt', dict(ctxt={'user_name': 'bob'})),
|
|
||||||
]
|
|
||||||
|
|
||||||
_retry = [
|
|
||||||
('unconfigured', dict()),
|
|
||||||
('None', dict(retry=None)),
|
|
||||||
('0', dict(retry=0)),
|
|
||||||
('5', dict(retry=5)),
|
|
||||||
]
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def generate_scenarios(cls):
|
|
||||||
cls.scenarios = testscenarios.multiply_scenarios(cls._v1,
|
|
||||||
cls._v2,
|
|
||||||
cls._publisher_id,
|
|
||||||
cls._topics,
|
|
||||||
cls._priority,
|
|
||||||
cls._payload,
|
|
||||||
cls._retry)
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestMessagingNotifierContextFiltering, self).setUp()
|
|
||||||
|
|
||||||
self.logger = self.useFixture(_ReRaiseLoggedExceptionsFixture()).logger
|
|
||||||
self.useFixture(fixtures.MockPatchObject(
|
|
||||||
messaging, 'LOG', self.logger))
|
|
||||||
self.useFixture(fixtures.MockPatchObject(
|
|
||||||
msg_notifier, '_LOG', self.logger))
|
|
||||||
|
|
||||||
@mock.patch('oslo_utils.timeutils.utcnow')
|
|
||||||
def test_notifier(self, mock_utcnow):
|
|
||||||
ctxt = {'user_name': 'bob', 'secret_data': 'redact_me'}
|
|
||||||
safe_ctxt = {'user_name': 'bob'}
|
|
||||||
drivers = []
|
|
||||||
if self.v1:
|
|
||||||
drivers.append('messaging')
|
|
||||||
if self.v2:
|
|
||||||
drivers.append('messagingv2')
|
|
||||||
|
|
||||||
self.config(driver=drivers,
|
|
||||||
topics=self.topics,
|
|
||||||
group='oslo_messaging_notifications')
|
|
||||||
|
|
||||||
transport = oslo_messaging.get_notification_transport(self.conf,
|
|
||||||
url='fake:')
|
|
||||||
|
|
||||||
if hasattr(self, 'ctor_pub_id'):
|
|
||||||
notifier = oslo_messaging.Notifier(transport,
|
|
||||||
publisher_id=self.ctor_pub_id)
|
|
||||||
else:
|
|
||||||
notifier = oslo_messaging.Notifier(transport)
|
|
||||||
|
|
||||||
prepare_kwds = {}
|
|
||||||
if hasattr(self, 'retry'):
|
|
||||||
prepare_kwds['retry'] = self.retry
|
|
||||||
if hasattr(self, 'prep_pub_id'):
|
|
||||||
prepare_kwds['publisher_id'] = self.prep_pub_id
|
|
||||||
if prepare_kwds:
|
|
||||||
notifier = notifier.prepare(**prepare_kwds)
|
|
||||||
|
|
||||||
transport._send_notification = mock.Mock()
|
|
||||||
|
|
||||||
message_id = uuid.uuid4()
|
|
||||||
uuid.uuid4 = mock.Mock(return_value=message_id)
|
|
||||||
|
|
||||||
mock_utcnow.return_value = datetime.datetime.utcnow()
|
|
||||||
|
|
||||||
message = {
|
|
||||||
'message_id': str(message_id),
|
|
||||||
'publisher_id': self.expected_pub_id,
|
|
||||||
'event_type': 'test.notify',
|
|
||||||
'priority': self.priority.upper(),
|
|
||||||
'payload': self.payload,
|
|
||||||
'timestamp': str(timeutils.utcnow()),
|
|
||||||
}
|
|
||||||
|
|
||||||
sends = []
|
|
||||||
if self.v1:
|
|
||||||
sends.append(dict(version=1.0))
|
|
||||||
if self.v2:
|
|
||||||
sends.append(dict(version=2.0))
|
|
||||||
|
|
||||||
calls = []
|
|
||||||
for send_kwargs in sends:
|
|
||||||
for topic in self.topics:
|
|
||||||
if hasattr(self, 'retry'):
|
|
||||||
send_kwargs['retry'] = self.retry
|
|
||||||
else:
|
|
||||||
send_kwargs['retry'] = -1
|
|
||||||
target = oslo_messaging.Target(topic='%s.%s' % (topic,
|
|
||||||
self.priority))
|
|
||||||
calls.append(mock.call(target,
|
|
||||||
safe_ctxt,
|
|
||||||
message,
|
|
||||||
**send_kwargs))
|
|
||||||
|
|
||||||
method = getattr(notifier, self.priority)
|
|
||||||
method(ctxt, 'test.notify', self.payload)
|
|
||||||
|
|
||||||
uuid.uuid4.assert_called_once_with()
|
|
||||||
transport._send_notification.assert_has_calls(calls, any_order=True)
|
|
||||||
|
|
||||||
self.assertTrue(notifier.is_enabled())
|
|
||||||
|
|
||||||
|
|
||||||
TestMessagingNotifierContextFiltering.generate_scenarios()
|
|
||||||
|
|
||||||
|
|
||||||
class TestMessagingNotifierRetry(test_utils.BaseTestCase):
|
class TestMessagingNotifierRetry(test_utils.BaseTestCase):
|
||||||
|
|
||||||
class TestingException(BaseException):
|
class TestingException(BaseException):
|
||||||
@ -422,7 +272,7 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase):
|
|||||||
with mock.patch(
|
with mock.patch(
|
||||||
'oslo_messaging.notify.messaging.LOG.exception'
|
'oslo_messaging.notify.messaging.LOG.exception'
|
||||||
) as mock_log:
|
) as mock_log:
|
||||||
notifier.info({}, "test", {})
|
notifier.info(test_utils.TestContext(), "test", {})
|
||||||
|
|
||||||
# one normal call plus two retries
|
# one normal call plus two retries
|
||||||
self.assertEqual(3, len(calls))
|
self.assertEqual(3, len(calls))
|
||||||
@ -451,7 +301,7 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase):
|
|||||||
# call simply returns without i) failing to deliver the message to
|
# call simply returns without i) failing to deliver the message to
|
||||||
# the non existent kafka bus ii) retrying the message delivery twice
|
# the non existent kafka bus ii) retrying the message delivery twice
|
||||||
# as the configuration requested it.
|
# as the configuration requested it.
|
||||||
notifier.info({}, "test", {})
|
notifier.info(test_utils.TestContext(), "test", {})
|
||||||
|
|
||||||
|
|
||||||
class TestSerializer(test_utils.BaseTestCase):
|
class TestSerializer(test_utils.BaseTestCase):
|
||||||
@ -484,7 +334,8 @@ class TestSerializer(test_utils.BaseTestCase):
|
|||||||
serializer.serialize_entity = mock.Mock()
|
serializer.serialize_entity = mock.Mock()
|
||||||
serializer.serialize_entity.return_value = 'sbar'
|
serializer.serialize_entity.return_value = 'sbar'
|
||||||
|
|
||||||
notifier.info(dict(user_name='bob'), 'test.notify', 'bar')
|
ctxt = test_utils.TestContext(user_name='bob')
|
||||||
|
notifier.info(ctxt, 'test.notify', 'bar')
|
||||||
|
|
||||||
message = {
|
message = {
|
||||||
'message_id': str(message_id),
|
'message_id': str(message_id),
|
||||||
@ -498,11 +349,10 @@ class TestSerializer(test_utils.BaseTestCase):
|
|||||||
self.assertEqual([(dict(user_name='alice'), message, 'INFO', -1)],
|
self.assertEqual([(dict(user_name='alice'), message, 'INFO', -1)],
|
||||||
_impl_test.NOTIFICATIONS)
|
_impl_test.NOTIFICATIONS)
|
||||||
|
|
||||||
uuid.uuid4.assert_called_once_with()
|
# NOTE(JayF): This is also called when we create a TestContext
|
||||||
serializer.serialize_context.assert_called_once_with(
|
uuid.uuid4.assert_has_calls([mock.call(), mock.call()])
|
||||||
dict(user_name='bob'))
|
serializer.serialize_context.assert_called_once_with(ctxt)
|
||||||
serializer.serialize_entity.assert_called_once_with(
|
serializer.serialize_entity.assert_called_once_with(ctxt, 'bar')
|
||||||
dict(user_name='bob'), 'bar')
|
|
||||||
|
|
||||||
|
|
||||||
class TestNotifierTopics(test_utils.BaseTestCase):
|
class TestNotifierTopics(test_utils.BaseTestCase):
|
||||||
@ -561,9 +411,10 @@ class TestLogNotifier(test_utils.BaseTestCase):
|
|||||||
with mock.patch.object(logging, 'getLogger') as gl:
|
with mock.patch.object(logging, 'getLogger') as gl:
|
||||||
gl.return_value = logger
|
gl.return_value = logger
|
||||||
|
|
||||||
notifier.info({}, 'test.notify', 'bar')
|
notifier.info(test_utils.TestContext(), 'test.notify', 'bar')
|
||||||
|
|
||||||
uuid.uuid4.assert_called_once_with()
|
# NOTE(JayF): TestContext calls this, too
|
||||||
|
uuid.uuid4.assert_has_calls([mock.call(), mock.call()])
|
||||||
logging.getLogger.assert_called_once_with(
|
logging.getLogger.assert_called_once_with(
|
||||||
'oslo.messaging.notification.test.notify')
|
'oslo.messaging.notification.test.notify')
|
||||||
|
|
||||||
@ -818,7 +669,7 @@ group_1:
|
|||||||
with mock.patch.object(self.router, 'plugin_manager') as pm:
|
with mock.patch.object(self.router, 'plugin_manager') as pm:
|
||||||
with mock.patch.object(self.router, '_get_drivers_for_message',
|
with mock.patch.object(self.router, '_get_drivers_for_message',
|
||||||
drivers_mock):
|
drivers_mock):
|
||||||
self.notifier.info({}, 'my_event', {})
|
self.notifier.info(test_utils.TestContext(), 'my_event', {})
|
||||||
self.assertEqual(sorted(['rpc', 'foo']),
|
self.assertEqual(sorted(['rpc', 'foo']),
|
||||||
sorted(pm.map.call_args[0][6]))
|
sorted(pm.map.call_args[0][6]))
|
||||||
|
|
||||||
@ -856,13 +707,13 @@ group_1:
|
|||||||
return_value=pm):
|
return_value=pm):
|
||||||
with mock.patch('oslo_messaging.notify.'
|
with mock.patch('oslo_messaging.notify.'
|
||||||
'_impl_routing.LOG'):
|
'_impl_routing.LOG'):
|
||||||
|
cxt = test_utils.TestContext()
|
||||||
self.notifier.info({}, 'my_event', {})
|
self.notifier.info(cxt, 'my_event', {})
|
||||||
self.assertFalse(bar_driver.info.called)
|
self.assertFalse(bar_driver.info.called)
|
||||||
rpc_driver.notify.assert_called_once_with(
|
rpc_driver.notify.assert_called_once_with(
|
||||||
{}, mock.ANY, 'INFO', -1)
|
cxt, mock.ANY, 'INFO', -1)
|
||||||
rpc2_driver.notify.assert_called_once_with(
|
rpc2_driver.notify.assert_called_once_with(
|
||||||
{}, mock.ANY, 'INFO', -1)
|
cxt, mock.ANY, 'INFO', -1)
|
||||||
|
|
||||||
|
|
||||||
class TestNoOpNotifier(test_utils.BaseTestCase):
|
class TestNoOpNotifier(test_utils.BaseTestCase):
|
||||||
|
@ -12,7 +12,6 @@
|
|||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from oslo_messaging._drivers import common
|
from oslo_messaging._drivers import common
|
||||||
from oslo_messaging import _utils as utils
|
from oslo_messaging import _utils as utils
|
||||||
from oslo_messaging.tests import utils as test_utils
|
from oslo_messaging.tests import utils as test_utils
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
import threading
|
import threading
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
from oslo_context.context import RequestContext
|
||||||
from oslo_utils import eventletutils
|
from oslo_utils import eventletutils
|
||||||
from oslotest import base
|
from oslotest import base
|
||||||
|
|
||||||
@ -82,3 +83,10 @@ class ServerThreadHelper(threading.Thread):
|
|||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._stop_event.set()
|
self._stop_event.set()
|
||||||
|
|
||||||
|
|
||||||
|
class TestContext(RequestContext):
|
||||||
|
def redacted_copy(self):
|
||||||
|
# NOTE(JayF): By returning our self here instead of redacting, we can
|
||||||
|
# continue using equality comparisons in unit tests.
|
||||||
|
return self
|
||||||
|
@ -6,6 +6,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0
|
|||||||
|
|
||||||
futurist>=1.2.0 # Apache-2.0
|
futurist>=1.2.0 # Apache-2.0
|
||||||
oslo.config>=5.2.0 # Apache-2.0
|
oslo.config>=5.2.0 # Apache-2.0
|
||||||
|
oslo.context>=5.3.0 # Apache-2.0
|
||||||
oslo.log>=3.36.0 # Apache-2.0
|
oslo.log>=3.36.0 # Apache-2.0
|
||||||
oslo.utils>=3.37.0 # Apache-2.0
|
oslo.utils>=3.37.0 # Apache-2.0
|
||||||
oslo.serialization!=2.19.1,>=2.18.0 # Apache-2.0
|
oslo.serialization!=2.19.1,>=2.18.0 # Apache-2.0
|
||||||
|
Loading…
Reference in New Issue
Block a user