f6df32d943
Review I4e7b19dc730342091fd70a717065741d56da4555 gives a lot of the background here, but the idea is that some exceptions raised by an RPC endpoint method do not indicate any sort of failure and should not be logged by the server as an error. The classic example of this is conductor's instance_get() method raising InstanceNotFound. This is perfectly normal and should not be considered an error. The new API is a decorator which you can use with RPC endpoint methods to indicate which exceptions are expected: @messaging.expected_exceptions(InstanceNotFound) def instance_get(self, context, instance_id): ... but we also need to expose the ExpectedException type itself so that direct "local" users of the endpoint class know what type will be used to wrap expected exceptions. For example, Nova has an ExceptionHelper class which unwraps the original exception from an ExpectedException and re-raises it. I've changed from client_exceptions() and ClientException to make its intent clearer. I felt that the "client" naming gave the impression it was intended for use on the client side. Change-Id: Ieec4600bd6b70cf31ac7925a98a517b84acada4d
439 lines
15 KiB
Python
439 lines
15 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2013 Red Hat, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import datetime
|
|
import sys
|
|
import threading
|
|
import uuid
|
|
|
|
import fixtures
|
|
import kombu
|
|
import testscenarios
|
|
|
|
from oslo import messaging
|
|
from oslo.messaging._drivers import impl_rabbit as rabbit_driver
|
|
from oslo.messaging.openstack.common import jsonutils
|
|
from oslo.messaging import transport as msg_transport
|
|
from tests import utils as test_utils
|
|
|
|
# Module-level ``load_tests`` hook supplied by testscenarios; it is what
# turns the ``scenarios`` attribute of each TestCase below into separate
# test instances.
load_tests = testscenarios.load_tests_apply_scenarios
|
|
|
|
|
|
class TestRabbitDriverLoad(test_utils.BaseTestCase):
    """Check that rpc_backend='rabbit' yields a RabbitDriver transport."""

    def setUp(self):
        """Register the transport and rabbit options and select rabbit.

        Sets ``rpc_backend='rabbit'`` and ``fake_rabbit=True`` on the
        test configuration.
        """
        super(TestRabbitDriverLoad, self).setUp()
        self.conf.register_opts(msg_transport._transport_opts)
        self.conf.register_opts(rabbit_driver.rabbit_opts)
        self.config(rpc_backend='rabbit')
        self.config(fake_rabbit=True)

    def test_driver_load(self):
        """The transport's driver must be a RabbitDriver instance."""
        transport = messaging.get_transport(self.conf)
        # Clean the transport up when the test finishes, exactly as the
        # other tests in this module do, so nothing it holds outlives
        # the test.
        self.addCleanup(transport.cleanup)
        self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver)
|
|
|
|
|
|
class TestSendReceive(test_utils.BaseTestCase):
    """Round-trip test: send with wait_for_reply, poll, reply, check.

    The scenario lists below are multiplied together by
    generate_scenarios(), so the single test method runs once per
    combination of sender count, context, failure mode and timeout.
    """

    # Number of concurrent sender threads.
    _n_senders = [
        ('single_sender', dict(n_senders=1)),
        ('multiple_senders', dict(n_senders=10)),
    ]

    # Request context handed to driver.send().
    _context = [
        ('empty_context', dict(ctxt={})),
        ('with_context', dict(ctxt={'user': 'mark'})),
    ]

    # Whether the "server" side replies with a failure and, if it does,
    # whether that failure is an expected one (``expected`` is only read
    # when ``failure`` is true).
    _failure = [
        ('success', dict(failure=False)),
        ('failure', dict(failure=True, expected=False)),
        ('expected_failure', dict(failure=True, expected=True)),
    ]

    # Timeout passed to driver.send(); when one is set no reply is sent.
    _timeout = [
        ('no_timeout', dict(timeout=None)),
        ('timeout', dict(timeout=0.01)),  # FIXME(markmc): timeout=0 is broken?
    ]

    @classmethod
    def generate_scenarios(cls):
        """Set ``cls.scenarios`` to the product of all scenario lists."""
        cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
                                                         cls._context,
                                                         cls._failure,
                                                         cls._timeout)

    def setUp(self):
        """Register the transport and rabbit options and select rabbit."""
        super(TestSendReceive, self).setUp()
        self.conf.register_opts(msg_transport._transport_opts)
        self.conf.register_opts(rabbit_driver.rabbit_opts)
        self.config(rpc_backend='rabbit')
        self.config(fake_rabbit=True)

    def test_send_receive(self):
        """Send from n_senders threads, reply out of order, check replies."""
        transport = messaging.get_transport(self.conf)
        self.addCleanup(transport.cleanup)

        driver = transport._driver

        target = messaging.Target(topic='testtopic')

        listener = driver.listen(target)

        senders = []
        replies = []
        msgs = []

        def send_and_wait_for_reply(i):
            # Runs in a sender thread: records either the reply or the
            # exception raised by driver.send().
            # NOTE(review): assertions failing inside a threading.Thread
            # are not reported to the test runner - confirm whether these
            # checks are meant to be enforced.
            try:
                replies.append(driver.send(target,
                                           self.ctxt,
                                           {'foo': i},
                                           wait_for_reply=True,
                                           timeout=self.timeout))
                self.assertFalse(self.failure)
                self.assertIsNone(self.timeout)
            except (ZeroDivisionError, messaging.MessagingTimeout) as e:
                replies.append(e)
                self.assertTrue(self.failure or self.timeout is not None)

        while len(senders) < self.n_senders:
            senders.append(threading.Thread(target=send_and_wait_for_reply,
                                            args=(len(senders), )))

        # Start the senders one at a time and pick up each request before
        # starting the next, so msgs[i] is the request from sender i.
        for i, sender in enumerate(senders):
            sender.start()

            received = listener.poll()
            self.assertIsNotNone(received)
            self.assertEqual(received.ctxt, self.ctxt)
            self.assertEqual(received.message, {'foo': i})
            msgs.append(received)

        # reply in reverse, except reply to the first guy second from last
        # (materialised as a list: a Python 3 range object does not allow
        # the item assignment used for the swap below)
        order = list(range(len(senders) - 1, -1, -1))
        if len(order) > 1:
            order[-1], order[-2] = order[-2], order[-1]

        for i in order:
            if self.timeout is None:
                if self.failure:
                    # Raise and catch so that sys.exc_info() yields a
                    # genuine (type, value, traceback) triple.
                    try:
                        raise ZeroDivisionError
                    except Exception:
                        failure = sys.exc_info()
                    msgs[i].reply(failure=failure,
                                  log_failure=not self.expected)
                else:
                    msgs[i].reply({'bar': msgs[i].message['foo']})
            senders[i].join()

        self.assertEqual(len(replies), len(senders))
        for i, reply in enumerate(replies):
            if self.timeout is not None:
                self.assertIsInstance(reply, messaging.MessagingTimeout)
            elif self.failure:
                self.assertIsInstance(reply, ZeroDivisionError)
            else:
                self.assertEqual(reply, {'bar': order[i]})
|
|
|
|
|
|
# Build TestSendReceive.scenarios at import time from the class's
# _n_senders, _context, _failure and _timeout lists.
TestSendReceive.generate_scenarios()
|
|
|
|
|
|
def _declare_queue(target):
    """Declare the queue for *target* on an in-memory kombu connection.

    Exactly one of three layouts is chosen, mirroring _create_producer():

    * ``target.fanout`` - a ``<topic>_fanout`` fanout exchange with a
      ``<topic>_fanout_12345`` queue;
    * ``target.server`` - the ``openstack`` topic exchange with a queue
      and routing key of ``<topic>.<server>``;
    * otherwise - the ``openstack`` topic exchange with a queue and
      routing key of ``<topic>``.

    :param target: object providing ``topic``, ``server`` and ``fanout``
    :returns: a ``(connection, channel, queue)`` tuple; the caller is
              responsible for releasing the connection
    """
    connection = kombu.connection.BrokerConnection(transport='memory')

    # Kludge to speed up tests.
    connection.transport.polling_interval = 0.0

    connection.connect()
    channel = connection.channel()

    # work around 'memory' transport bug in 1.1.3
    channel._new_queue('ae.undeliver')

    if target.fanout:
        exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
                                         type='fanout',
                                         durable=False,
                                         auto_delete=True)
        queue = kombu.entity.Queue(name=target.topic + '_fanout_12345',
                                   channel=channel,
                                   exchange=exchange,
                                   routing_key=target.topic)
    # This must be ``elif``: with a plain ``if`` a fanout target without
    # a server fell through to the ``else`` branch below, which replaced
    # the fanout exchange and queue with the plain topic ones.
    elif target.server:
        exchange = kombu.entity.Exchange(name='openstack',
                                         type='topic',
                                         durable=False,
                                         auto_delete=False)
        topic = '%s.%s' % (target.topic, target.server)
        queue = kombu.entity.Queue(name=topic,
                                   channel=channel,
                                   exchange=exchange,
                                   routing_key=topic)
    else:
        exchange = kombu.entity.Exchange(name='openstack',
                                         type='topic',
                                         durable=False,
                                         auto_delete=False)
        queue = kombu.entity.Queue(name=target.topic,
                                   channel=channel,
                                   exchange=exchange,
                                   routing_key=target.topic)

    queue.declare()

    return connection, channel, queue
|
|
|
|
|
|
class TestRequestWireFormat(test_utils.BaseTestCase):
    """Check the envelope driver.send() puts on the queue.

    A message is sent through the driver, consumed directly with kombu
    and compared with the expected ``oslo.version`` 2.0 envelope.
    """

    # Where the request is addressed.
    _target = [
        ('topic_target',
         {'topic': 'testtopic', 'server': None, 'fanout': False}),
        ('server_target',
         {'topic': 'testtopic', 'server': 'testserver', 'fanout': False}),
        # NOTE(markmc): https://github.com/celery/kombu/issues/195
        ('fanout_target',
         {'topic': 'testtopic', 'server': None, 'fanout': True,
          'skip_msg': 'Requires kombu>2.5.12 to fix kombu issue #195'}),
    ]

    # Message handed to the driver and the form expected on the wire.
    _msg = [
        ('empty_msg',
         {'msg': {}, 'expected': {}}),
        ('primitive_msg',
         {'msg': {'foo': 'bar'}, 'expected': {'foo': 'bar'}}),
        ('complex_msg',
         {'msg': {'a': {'b': datetime.datetime(1920, 2, 3, 4, 5, 6, 7)}},
          'expected': {'a': {'b': '1920-02-03T04:05:06.000007'}}}),
    ]

    # Request context and the keys it is expected to produce on the wire.
    _context = [
        ('empty_ctxt', {'ctxt': {}, 'expected_ctxt': {}}),
        ('user_project_ctxt',
         {'ctxt': {'user': 'mark', 'project': 'snarkybunch'},
          'expected_ctxt': {'_context_user': 'mark',
                            '_context_project': 'snarkybunch'}}),
    ]

    @classmethod
    def generate_scenarios(cls):
        """Set ``cls.scenarios`` to the message/context/target product."""
        axes = (cls._msg, cls._context, cls._target)
        cls.scenarios = testscenarios.multiply_scenarios(*axes)

    def setUp(self):
        """Configure the rabbit backend and record every uuid4() result."""
        super(TestRequestWireFormat, self).setUp()
        for opts in (msg_transport._transport_opts,
                     rabbit_driver.rabbit_opts):
            self.conf.register_opts(opts)
        self.config(rpc_backend='rabbit')
        self.config(fake_rabbit=True)

        # uuid.uuid4 is patched so the test can later look up the ids
        # that were generated while sending.
        self.uuids = []
        self.orig_uuid4 = uuid.uuid4
        self.useFixture(fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4))

    def mock_uuid4(self):
        """Return a real uuid4 after appending it to ``self.uuids``."""
        generated = self.orig_uuid4()
        self.uuids.append(generated)
        return generated

    def test_request_wire_format(self):
        """Send one message and compare the consumed payload."""
        if hasattr(self, 'skip_msg'):
            self.skipTest(self.skip_msg)

        transport = messaging.get_transport(self.conf)
        self.addCleanup(transport.cleanup)

        target = messaging.Target(topic=self.topic,
                                  server=self.server,
                                  fanout=self.fanout)

        connection, channel, queue = _declare_queue(target)
        self.addCleanup(connection.release)

        transport._driver.send(target, self.ctxt, self.msg)

        payloads = []

        def callback(msg):
            # Decode the raw message, acknowledge it and keep its payload.
            decoded = channel.message_to_python(msg)
            decoded.ack()
            payloads.append(decoded.payload)

        queue.consume(callback=callback, consumer_tag='1', nowait=False)
        connection.drain_events()

        self.assertEqual(1, len(payloads))
        self.assertIn('oslo.message', payloads[0])

        received = payloads[0]
        # The inner message is a JSON string; decode it for comparison.
        received['oslo.message'] = jsonutils.loads(received['oslo.message'])

        # The first three recorded uuids are compared, in order, with the
        # message id, the unique id and the reply queue suffix.
        inner = {}
        inner['_msg_id'] = self.uuids[0].hex
        inner['_unique_id'] = self.uuids[1].hex
        inner['_reply_q'] = 'reply_' + self.uuids[2].hex
        inner.update(self.expected)
        inner.update(self.expected_ctxt)

        envelope = {'oslo.version': '2.0', 'oslo.message': inner}
        self.assertEqual(envelope, received)
|
|
|
|
|
|
# Build TestRequestWireFormat.scenarios at import time from the class's
# _msg, _context and _target lists.
TestRequestWireFormat.generate_scenarios()
|
|
|
|
|
|
def _create_producer(target):
    """Build a kombu producer that publishes to *target* in memory.

    A fanout target gets a ``<topic>_fanout`` fanout exchange with the
    topic as routing key. Any other target uses the ``openstack`` topic
    exchange, routed on ``<topic>.<server>`` when a server is set and on
    ``<topic>`` otherwise.

    :param target: object providing ``topic``, ``server`` and ``fanout``
    :returns: a ``(connection, producer)`` tuple
    """
    conn = kombu.connection.BrokerConnection(transport='memory')

    # Kludge to speed up tests.
    conn.transport.polling_interval = 0.0

    conn.connect()
    chan = conn.channel()

    # work around 'memory' transport bug in 1.1.3
    chan._new_queue('ae.undeliver')

    # Choose the exchange and routing key first; the producer itself is
    # constructed identically in every case.
    if target.fanout:
        exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
                                         type='fanout',
                                         durable=False,
                                         auto_delete=True)
        routing_key = target.topic
    else:
        exchange = kombu.entity.Exchange(name='openstack',
                                         type='topic',
                                         durable=False,
                                         auto_delete=False)
        if target.server:
            routing_key = '%s.%s' % (target.topic, target.server)
        else:
            routing_key = target.topic

    producer = kombu.messaging.Producer(exchange=exchange,
                                        channel=chan,
                                        routing_key=routing_key)
    return conn, producer
|
|
|
|
|
|
class TestReplyWireFormat(test_utils.BaseTestCase):
    """Check how the driver's listener decodes a hand-built envelope.

    An ``oslo.version`` 2.0 envelope is published directly with kombu
    and the context and message returned by listener.poll() are compared
    with the expected values.
    """

    # Where the envelope is published and where the driver listens.
    _target = [
        ('topic_target',
         {'topic': 'testtopic', 'server': None, 'fanout': False}),
        ('server_target',
         {'topic': 'testtopic', 'server': 'testserver', 'fanout': False}),
        # NOTE(markmc): https://github.com/celery/kombu/issues/195
        ('fanout_target',
         {'topic': 'testtopic', 'server': None, 'fanout': True,
          'skip_msg': 'Requires kombu>2.5.12 to fix kombu issue #195'}),
    ]

    # Message body placed in the envelope and the expected decoded form.
    _msg = [
        ('empty_msg',
         {'msg': {}, 'expected': {}}),
        ('primitive_msg',
         {'msg': {'foo': 'bar'}, 'expected': {'foo': 'bar'}}),
        ('complex_msg',
         {'msg': {'a': {'b': '1920-02-03T04:05:06.000007'}},
          'expected': {'a': {'b': '1920-02-03T04:05:06.000007'}}}),
    ]

    # Context keys as written on the wire and the expected decoded ctxt.
    _context = [
        ('empty_ctxt', {'ctxt': {}, 'expected_ctxt': {}}),
        ('user_project_ctxt',
         {'ctxt': {'_context_user': 'mark',
                   '_context_project': 'snarkybunch'},
          'expected_ctxt': {'user': 'mark', 'project': 'snarkybunch'}}),
    ]

    @classmethod
    def generate_scenarios(cls):
        """Set ``cls.scenarios`` to the message/context/target product."""
        axes = (cls._msg, cls._context, cls._target)
        cls.scenarios = testscenarios.multiply_scenarios(*axes)

    def setUp(self):
        """Register the transport and rabbit options and select rabbit."""
        super(TestReplyWireFormat, self).setUp()
        for opts in (msg_transport._transport_opts,
                     rabbit_driver.rabbit_opts):
            self.conf.register_opts(opts)
        self.config(rpc_backend='rabbit')
        self.config(fake_rabbit=True)

    def test_reply_wire_format(self):
        """Publish one envelope and check what the listener returns."""
        if hasattr(self, 'skip_msg'):
            self.skipTest(self.skip_msg)

        transport = messaging.get_transport(self.conf)
        self.addCleanup(transport.cleanup)

        target = messaging.Target(topic=self.topic,
                                  server=self.server,
                                  fanout=self.fanout)

        # Start listening before anything is published.
        listener = transport._driver.listen(target)

        connection, producer = _create_producer(target)
        self.addCleanup(connection.release)

        # Inner message: scenario body, then wire-format context keys,
        # then the three id fields.
        body = {}
        body.update(self.msg)
        body.update(self.ctxt)
        body['_msg_id'] = uuid.uuid4().hex
        body['_unique_id'] = uuid.uuid4().hex
        body['_reply_q'] = 'reply_' + uuid.uuid4().hex

        # The inner message is carried as a JSON string in the envelope.
        envelope = {
            'oslo.version': '2.0',
            'oslo.message': jsonutils.dumps(body),
        }
        producer.publish(envelope)

        received = listener.poll()
        self.assertIsNotNone(received)
        self.assertEqual(self.expected_ctxt, received.ctxt)
        self.assertEqual(self.expected, received.message)
|
|
|
|
|
|
# Build TestReplyWireFormat.scenarios at import time from the class's
# _msg, _context and _target lists.
TestReplyWireFormat.generate_scenarios()
|