From 5149461fd2523a0c2afc187bd023784438f7b81a Mon Sep 17 00:00:00 2001 From: Dmitriy Ukhlov Date: Mon, 14 Dec 2015 11:36:28 +0200 Subject: [PATCH] Adds tests for pika_message.py Also small mistakes were fixed, msg_id, unique_id and reply_q fields were moved to corresponding AMQP properties Change-Id: I5147c35c1a2ce0205e08ca81db164a3cc879fb0a --- oslo_messaging/_drivers/impl_pika.py | 4 +- .../_drivers/pika_driver/pika_engine.py | 12 +- .../_drivers/pika_driver/pika_message.py | 261 ++++---- .../_drivers/pika_driver/pika_poller.py | 3 +- oslo_messaging/tests/drivers/pika/__init__.py | 0 .../tests/drivers/pika/test_message.py | 622 ++++++++++++++++++ 6 files changed, 781 insertions(+), 121 deletions(-) create mode 100644 oslo_messaging/tests/drivers/pika/__init__.py create mode 100644 oslo_messaging/tests/drivers/pika/test_message.py diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py index eaa4a7685..3d633a5b1 100644 --- a/oslo_messaging/_drivers/impl_pika.py +++ b/oslo_messaging/_drivers/impl_pika.py @@ -198,8 +198,8 @@ class PikaDriver(object): "Timeout for current operation was expired." ) try: - with self._pika_engine.connection_pool.acquire( - timeout=timeout) as conn: + with (self._pika_engine.connection_without_confirmation_pool + .acquire)(timeout=timeout) as conn: self._pika_engine.declare_queue_binding_by_channel( conn.channel, exchange=( diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py index 06dfcdbf1..6e877bb2e 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_engine.py +++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import random import socket import sys import threading @@ -44,7 +45,7 @@ def _is_eventlet_monkey_patched(module): return eventlet.patcher.is_monkey_patched(module) -def _create__select_poller_connection_impl( +def _create_select_poller_connection_impl( parameters, on_open_callback, on_open_error_callback, on_close_callback, stop_ioloop_on_close): """Used for disabling autochoise of poller ('select', 'poll', 'epool', etc) @@ -198,7 +199,6 @@ class PikaEngine(object): self._connection_host_param_list = [] self._connection_host_status_list = [] - self._next_connection_host_num = 0 for transport_host in url.hosts: pika_params = common_pika_params.copy() @@ -215,9 +215,13 @@ class PikaEngine(object): self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME: 0 }) + self._next_connection_host_num = random.randint( + 0, len(self._connection_host_param_list) - 1 + ) + # initializing 2 connection pools: 1st for connections without # confirmations, 2nd - with confirmations - self.connection_pool = pika_pool.QueuedPool( + self.connection_without_confirmation_pool = pika_pool.QueuedPool( create=self.create_connection, max_size=self.conf.oslo_messaging_pika.pool_max_size, max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow, @@ -336,7 +340,7 @@ class PikaEngine(object): ), **base_host_params ), - _impl_class=(_create__select_poller_connection_impl + _impl_class=(_create_select_poller_connection_impl if self._force_select_poller_use else None) ) diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py index 9bf9febdb..edd5c7328 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_message.py +++ b/oslo_messaging/_drivers/pika_driver/pika_message.py @@ -95,40 +95,36 @@ class PikaIncomingMessage(object): self._pika_engine = pika_engine self._no_ack = no_ack self._channel = channel - self.delivery_tag = method.delivery_tag + self._delivery_tag = method.delivery_tag - self.version = version + self._version = version - self.content_type = getattr(properties, "content_type", - "application/json") - self.content_encoding = getattr(properties, "content_encoding", - "utf-8") + self._content_type = properties.content_type + self._content_encoding = properties.content_encoding + self.unique_id = properties.message_id self.expiration_time = ( None if properties.expiration is None else time.time() + float(properties.expiration) / 1000 ) - if self.content_type != "application/json": + if self._content_type != "application/json": raise NotImplementedError( "Content-type['{}'] is not valid, " "'application/json' only is supported.".format( - self.content_type + self._content_type ) ) - message_dict = jsonutils.loads(body, encoding=self.content_encoding) + message_dict = jsonutils.loads(body, encoding=self._content_encoding) context_dict = {} for key in list(message_dict.keys()): key = six.text_type(key) - if key.startswith('_context_'): + if key.startswith('_$_'): value = message_dict.pop(key) - context_dict[key[9:]] = value - elif key.startswith('_'): - value = message_dict.pop(key) - setattr(self, key[1:], value) + context_dict[key[3:]] = value self.message = message_dict self.ctxt = context_dict @@ -138,7 +134,7 @@ class PikaIncomingMessage(object): message anymore) """ if not self._no_ack: - self._channel.basic_ack(delivery_tag=self.delivery_tag) + self._channel.basic_ack(delivery_tag=self._delivery_tag) def requeue(self): """Rollback the message. Should be called by message processing logic @@ -146,7 +142,7 @@ class PikaIncomingMessage(object): later if it is possible """ if not self._no_ack: - return self._channel.basic_nack(delivery_tag=self.delivery_tag, + return self._channel.basic_nack(delivery_tag=self._delivery_tag, requeue=True) @@ -170,58 +166,30 @@ class RpcPikaIncomingMessage(PikaIncomingMessage): :param no_ack: Boolean, defines should this message be acked by consumer or not """ - self.msg_id = None - self.reply_q = None - super(RpcPikaIncomingMessage, self).__init__( pika_engine, channel, method, properties, body, no_ack ) + self.reply_q = properties.reply_to + self.msg_id = properties.correlation_id def reply(self, reply=None, failure=None, log_failure=True): """Send back reply to the RPC client - :param reply - Dictionary, reply. In case of exception should be None - :param failure - Exception, exception, raised during processing RPC - request. Should be None if RPC request was successfully processed - :param log_failure, Boolean, not used in this implementation. + :param reply: Dictionary, reply. In case of exception should be None + :param failure: Tuple, should be a sys.exc_info() tuple. + Should be None if RPC request was successfully processed. + :param log_failure: Boolean, not used in this implementation. It present here to be compatible with driver API + + :return RpcReplyPikaIncomingMessage, message with reply """ - if not (self.msg_id and self.reply_q): + + if self.reply_q is None: return - msg = { - '_msg_id': self.msg_id, - } - - if failure is not None: - if isinstance(failure, RemoteExceptionMixin): - failure_data = { - 'class': failure.clazz, - 'module': failure.module, - 'message': failure.message, - 'tb': failure.trace - } - else: - tb = traceback.format_exception(*failure) - failure = failure[1] - - cls_name = six.text_type(failure.__class__.__name__) - mod_name = six.text_type(failure.__class__.__module__) - - failure_data = { - 'class': cls_name, - 'module': mod_name, - 'message': six.text_type(failure), - 'tb': tb - } - - msg['_failure'] = failure_data - - if reply is not None: - msg['_result'] = reply - - reply_outgoing_message = PikaOutgoingMessage( - self._pika_engine, msg, self.ctxt, content_type=self.content_type, - content_encoding=self.content_encoding + reply_outgoing_message = RpcReplyPikaOutgoingMessage( + self._pika_engine, self.msg_id, reply=reply, failure_info=failure, + content_type=self._content_type, + content_encoding=self._content_encoding ) def on_exception(ex): @@ -242,11 +210,7 @@ class RpcPikaIncomingMessage(PikaIncomingMessage): try: reply_outgoing_message.send( - exchange=self._pika_engine.rpc_reply_exchange, - routing_key=self.reply_q, - confirm=True, - mandatory=False, - persistent=False, + reply_q=self.reply_q, expiration_time=self.expiration_time, retrier=retrier ) @@ -282,18 +246,20 @@ class RpcReplyPikaIncomingMessage(PikaIncomingMessage): :param no_ack: Boolean, defines should this message be acked by consumer or not """ - self.result = None - self.failure = None - super(RpcReplyPikaIncomingMessage, self).__init__( pika_engine, channel, method, properties, body, no_ack ) + self.msg_id = properties.correlation_id + + self.result = self.message.get("s", None) + self.failure = self.message.get("e", None) + if self.failure is not None: - trace = self.failure.get('tb', []) - message = self.failure.get('message', "") - class_name = self.failure.get('class') - module_name = self.failure.get('module') + trace = self.failure.get('t', []) + message = self.failure.get('s', "") + class_name = self.failure.get('c') + module_name = self.failure.get('m') res_exc = None @@ -343,14 +309,14 @@ class PikaOutgoingMessage(object): self._pika_engine = pika_engine - self.content_type = content_type - self.content_encoding = content_encoding + self._content_type = content_type + self._content_encoding = content_encoding - if self.content_type != "application/json": + if self._content_type != "application/json": raise NotImplementedError( "Content-type['{}'] is not valid, " "'application/json' only is supported.".format( - self.content_type + self._content_type ) ) @@ -362,23 +328,21 @@ class PikaOutgoingMessage(object): def _prepare_message_to_send(self): """Combine user's message fields an system fields (_unique_id, context's data etc) - - :param pika_engine: PikaEngine, shared object with configuration and - shared driver functionality - :param message: Dictionary, user's message fields - :param context: Dictionary, request context's fields - :param content_type: String, content-type header, defines serialization - mechanism - :param content_encoding: String, defines encoding for text data """ msg = self.message.copy() - msg['_unique_id'] = self.unique_id + if self.context: + for key, value in six.iteritems(self.context): + key = six.text_type(key) + msg['_$_' + key] = value - for key, value in self.context.iteritems(): - key = six.text_type(key) - msg['_context_' + key] = value - return msg + props = pika_spec.BasicProperties( + content_encoding=self._content_encoding, + content_type=self._content_type, + headers={_VERSION_HEADER: _VERSION}, + message_id=self.unique_id, + ) + return msg, props @staticmethod def _publish(pool, exchange, routing_key, body, properties, mandatory, @@ -456,14 +420,15 @@ class PikaOutgoingMessage(object): "Socket timeout exceeded." ) - def _do_send(self, exchange, routing_key, msg_dict, confirm=True, - mandatory=True, persistent=False, expiration_time=None, - retrier=None): + def _do_send(self, exchange, routing_key, msg_dict, msg_props, + confirm=True, mandatory=True, persistent=False, + expiration_time=None, retrier=None): """Send prepared message with configured retrying :param exchange: String, RabbitMQ exchange name for message sending :param routing_key: String, RabbitMQ routing key for message routing :param msg_dict: Dictionary, message payload + :param msg_props: Properties, message properties :param confirm: Boolean, enable publisher confirmation if True :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise exception if it is not possible to deliver message to any queue) @@ -474,29 +439,26 @@ class PikaOutgoingMessage(object): :param retrier: retrying.Retrier, configured retrier object for sending message, if None no retrying is performed """ - properties = pika_spec.BasicProperties( - content_encoding=self.content_encoding, - content_type=self.content_type, - headers={_VERSION_HEADER: _VERSION}, - delivery_mode=2 if persistent else 1 - ) + msg_props.delivery_mode = 2 if persistent else 1 pool = (self._pika_engine.connection_with_confirmation_pool - if confirm else self._pika_engine.connection_pool) + if confirm else + self._pika_engine.connection_without_confirmation_pool) - body = jsonutils.dumps(msg_dict, encoding=self.content_encoding) + body = jsonutils.dump_as_bytes(msg_dict, + encoding=self._content_encoding) LOG.debug( "Sending message:[body:{}; properties: {}] to target: " "[exchange:{}; routing_key:{}]".format( - body, properties, exchange, routing_key + body, msg_props, exchange, routing_key ) ) publish = (self._publish if retrier is None else retrier(self._publish)) - return publish(pool, exchange, routing_key, body, properties, + return publish(pool, exchange, routing_key, body, msg_props, mandatory, expiration_time) def send(self, exchange, routing_key='', confirm=True, mandatory=True, @@ -515,10 +477,11 @@ class PikaOutgoingMessage(object): :param retrier: retrying.Retrier, configured retrier object for sending message, if None no retrying is performed """ - msg_dict = self._prepare_message_to_send() + msg_dict, msg_props = self._prepare_message_to_send() - return self._do_send(exchange, routing_key, msg_dict, confirm, - mandatory, persistent, expiration_time, retrier) + return self._do_send(exchange, routing_key, msg_dict, msg_props, + confirm, mandatory, persistent, expiration_time, + retrier) class RpcPikaOutgoingMessage(PikaOutgoingMessage): @@ -554,23 +517,25 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): target.topic, target.server, retrier is None ) - msg_dict = self._prepare_message_to_send() + msg_dict, msg_props = self._prepare_message_to_send() if reply_listener: - msg_id = uuid.uuid4().hex - msg_dict["_msg_id"] = msg_id - LOG.debug('MSG_ID is %s', msg_id) + self.msg_id = uuid.uuid4().hex + msg_props.correlation_id = self.msg_id + LOG.debug('MSG_ID is %s', self.msg_id) - msg_dict["_reply_q"] = reply_listener.get_reply_qname( + self.reply_q = reply_listener.get_reply_qname( expiration_time - time.time() ) + msg_props.reply_to = self.reply_q - future = reply_listener.register_reply_waiter(msg_id=msg_id) + future = reply_listener.register_reply_waiter(msg_id=self.msg_id) self._do_send( exchange=exchange, routing_key=queue, msg_dict=msg_dict, - confirm=True, mandatory=True, persistent=False, - expiration_time=expiration_time, retrier=retrier + msg_props=msg_props, confirm=True, mandatory=True, + persistent=False, expiration_time=expiration_time, + retrier=retrier ) try: @@ -580,10 +545,78 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): if isinstance(e, futures.TimeoutError): e = exceptions.MessagingTimeout() raise e - else: self._do_send( exchange=exchange, routing_key=queue, msg_dict=msg_dict, - confirm=True, mandatory=True, persistent=False, - expiration_time=expiration_time, retrier=retrier + msg_props=msg_props, confirm=True, mandatory=True, + persistent=False, expiration_time=expiration_time, + retrier=retrier ) + + +class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage): + """PikaOutgoingMessage implementation for RPC reply messages. It sets + correlation_id AMQP property to link this reply with response + """ + def __init__(self, pika_engine, msg_id, reply=None, failure_info=None, + content_type="application/json", content_encoding="utf-8"): + """Initialize with reply information for sending + + :param pika_engine: PikaEngine, shared object with configuration and + shared driver functionality + :param msg_id: String, msg_id of RPC request, which waits for reply + :param reply: Dictionary, reply. In case of exception should be None + :param failure_info: Tuple, should be a sys.exc_info() tuple. + Should be None if RPC request was successfully processed. + :param content_type: String, content-type header, defines serialization + mechanism + :param content_encoding: String, defines encoding for text data + """ + self.msg_id = msg_id + + if failure_info is not None: + ex_class = failure_info[0] + ex = failure_info[1] + tb = traceback.format_exception(*failure_info) + if issubclass(ex_class, RemoteExceptionMixin): + failure_data = { + 'c': ex.clazz, + 'm': ex.module, + 's': ex.message, + 't': tb + } + else: + failure_data = { + 'c': six.text_type(ex_class.__name__), + 'm': six.text_type(ex_class.__module__), + 's': six.text_type(ex), + 't': tb + } + + msg = {'e': failure_data} + else: + msg = {'s': reply} + + super(RpcReplyPikaOutgoingMessage, self).__init__( + pika_engine, msg, None, content_type, content_encoding + ) + + def send(self, reply_q, expiration_time=None, retrier=None): + """Send RPC message with configured retrying + + :param reply_q: String, queue name for sending reply + :param expiration_time: Float, expiration time in seconds + (like time.time()) + :param retrier: retrying.Retrier, configured retrier object for sending + message, if None no retrying is performed + """ + + msg_dict, msg_props = self._prepare_message_to_send() + msg_props.correlation_id = self.msg_id + + self._do_send( + exchange=self._pika_engine.rpc_reply_exchange, routing_key=reply_q, + msg_dict=msg_dict, msg_props=msg_props, confirm=True, + mandatory=True, persistent=False, expiration_time=expiration_time, + retrier=retrier + ) diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py index 1390ced75..5aa948a2e 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_poller.py +++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py @@ -18,6 +18,7 @@ import time from oslo_log import log as logging import pika_pool import retrying +import six from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg @@ -68,7 +69,7 @@ class PikaPoller(object): if self._queues_to_consume is None: self._queues_to_consume = self._declare_queue_binding() - for queue, no_ack in self._queues_to_consume.iteritems(): + for queue, no_ack in six.iteritems(self._queues_to_consume): self._start_consuming(queue, no_ack) def _declare_queue_binding(self): diff --git a/oslo_messaging/tests/drivers/pika/__init__.py b/oslo_messaging/tests/drivers/pika/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py new file mode 100644 index 000000000..5008ce36e --- /dev/null +++ b/oslo_messaging/tests/drivers/pika/test_message.py @@ -0,0 +1,622 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import functools +import time +import unittest + +from concurrent import futures +from mock import mock, patch +from oslo_serialization import jsonutils +import pika +from pika import spec + +import oslo_messaging +from oslo_messaging._drivers.pika_driver import pika_engine +from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg + + +class PikaIncomingMessageTestCase(unittest.TestCase): + def setUp(self): + self._pika_engine = mock.Mock() + self._channel = mock.Mock() + + self._delivery_tag = 12345 + + self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag) + self._properties = pika.BasicProperties( + content_type="application/json", + headers={"version": "1.0"}, + ) + self._body = ( + b'{"_$_key_context":"context_value",' + b'"payload_key": "payload_value"}' + ) + + def test_message_body_parsing(self): + message = pika_drv_msg.PikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body, True + ) + + self.assertEqual(message.ctxt.get("key_context", None), + "context_value") + self.assertEqual(message.message.get("payload_key", None), + "payload_value") + + def test_message_acknowledge(self): + message = pika_drv_msg.PikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body, False + ) + + message.acknowledge() + + self.assertEqual(1, self._channel.basic_ack.call_count) + self.assertEqual({"delivery_tag": self._delivery_tag}, + self._channel.basic_ack.call_args[1]) + + def test_message_acknowledge_no_ack(self): + message = pika_drv_msg.PikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body, True + ) + + message.acknowledge() + + self.assertEqual(0, self._channel.basic_ack.call_count) + + def test_message_requeue(self): + message = pika_drv_msg.PikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body, False + ) + + message.requeue() + + self.assertEqual(1, self._channel.basic_nack.call_count) + self.assertEqual({"delivery_tag": self._delivery_tag, 'requeue': True}, + self._channel.basic_nack.call_args[1]) + + def test_message_requeue_no_ack(self): + message = pika_drv_msg.PikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body, True + ) + + message.requeue() + + self.assertEqual(0, self._channel.basic_nack.call_count) + + +class RpcPikaIncomingMessageTestCase(unittest.TestCase): + def setUp(self): + self._pika_engine = mock.Mock() + self._pika_engine.rpc_reply_retry_attempts = 3 + self._pika_engine.rpc_reply_retry_delay = 0.25 + + self._channel = mock.Mock() + + self._delivery_tag = 12345 + + self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag) + self._body = ( + b'{"_$_key_context":"context_value",' + b'"payload_key":"payload_value"}' + ) + self._properties = pika.BasicProperties( + content_type="application/json", + content_encoding="utf-8", + headers={"version": "1.0"}, + ) + + def test_call_message_body_parsing(self): + self._properties.correlation_id = 123456789 + self._properties.reply_to = "reply_queue" + + message = pika_drv_msg.RpcPikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body, True + ) + + self.assertEqual(message.ctxt.get("key_context", None), + "context_value") + self.assertEqual(message.msg_id, 123456789) + self.assertEqual(message.reply_q, "reply_queue") + + self.assertEqual(message.message.get("payload_key", None), + "payload_value") + + def test_cast_message_body_parsing(self): + message = pika_drv_msg.RpcPikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body, True + ) + + self.assertEqual(message.ctxt.get("key_context", None), + "context_value") + self.assertEqual(message.msg_id, None) + self.assertEqual(message.reply_q, None) + + self.assertEqual(message.message.get("payload_key", None), + "payload_value") + + @patch(("oslo_messaging._drivers.pika_driver.pika_message." + "PikaOutgoingMessage.send")) + def test_reply_for_cast_message(self, send_reply_mock): + message = pika_drv_msg.RpcPikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body, True + ) + + self.assertEqual(message.ctxt.get("key_context", None), + "context_value") + self.assertEqual(message.msg_id, None) + self.assertEqual(message.reply_q, None) + + self.assertEqual(message.message.get("payload_key", None), + "payload_value") + + message.reply(reply=object()) + + self.assertEqual(send_reply_mock.call_count, 0) + + @patch("oslo_messaging._drivers.pika_driver.pika_message." + "RpcReplyPikaOutgoingMessage") + @patch("retrying.retry") + def test_positive_reply_for_call_message(self, + retry_mock, + outgoing_message_mock): + self._properties.correlation_id = 123456789 + self._properties.reply_to = "reply_queue" + + message = pika_drv_msg.RpcPikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body, True + ) + + self.assertEqual(message.ctxt.get("key_context", None), + "context_value") + self.assertEqual(message.msg_id, 123456789) + self.assertEqual(message.reply_q, "reply_queue") + + self.assertEqual(message.message.get("payload_key", None), + "payload_value") + reply = "all_fine" + message.reply(reply=reply) + + outgoing_message_mock.assert_called_once_with( + self._pika_engine, 123456789, failure_info=None, reply='all_fine', + content_encoding='utf-8', content_type='application/json' + ) + outgoing_message_mock().send.assert_called_once_with( + expiration_time=None, reply_q='reply_queue', retrier=mock.ANY + ) + retry_mock.assert_called_once_with( + retry_on_exception=mock.ANY, stop_max_attempt_number=3, + wait_fixed=250.0 + ) + + @patch("oslo_messaging._drivers.pika_driver.pika_message." + "RpcReplyPikaOutgoingMessage") + @patch("retrying.retry") + def test_negative_reply_for_call_message(self, + retry_mock, + outgoing_message_mock): + self._properties.correlation_id = 123456789 + self._properties.reply_to = "reply_queue" + + message = pika_drv_msg.RpcPikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body, True + ) + + self.assertEqual(message.ctxt.get("key_context", None), + "context_value") + self.assertEqual(message.msg_id, 123456789) + self.assertEqual(message.reply_q, "reply_queue") + + self.assertEqual(message.message.get("payload_key", None), + "payload_value") + + failure_info = object() + message.reply(failure=failure_info) + + outgoing_message_mock.assert_called_once_with( + self._pika_engine, 123456789, + failure_info=failure_info, + reply=None, + content_encoding='utf-8', + content_type='application/json' + ) + outgoing_message_mock().send.assert_called_once_with( + expiration_time=None, reply_q='reply_queue', retrier=mock.ANY + ) + retry_mock.assert_called_once_with( + retry_on_exception=mock.ANY, stop_max_attempt_number=3, + wait_fixed=250.0 + ) + + +class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase): + def setUp(self): + self._pika_engine = mock.Mock() + self._pika_engine.allowed_remote_exmods = [ + pika_engine._EXCEPTIONS_MODULE, "oslo_messaging.exceptions" + ] + + self._channel = mock.Mock() + + self._delivery_tag = 12345 + + self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag) + + self._properties = pika.BasicProperties( + content_type="application/json", + content_encoding="utf-8", + headers={"version": "1.0"}, + correlation_id=123456789 + ) + + def test_positive_reply_message_body_parsing(self): + + body = b'{"s": "all fine"}' + + message = pika_drv_msg.RpcReplyPikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + body, True + ) + + self.assertEqual(message.msg_id, 123456789) + self.assertIsNone(message.failure) + self.assertEquals(message.result, "all fine") + + def test_negative_reply_message_body_parsing(self): + + body = (b'{' + b' "e": {' + b' "s": "Error message",' + b' "t": ["TRACE HERE"],' + b' "c": "MessagingException",' + b' "m": "oslo_messaging.exceptions"' + b' }' + b'}') + + message = pika_drv_msg.RpcReplyPikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + body, True + ) + + self.assertEqual(message.msg_id, 123456789) + self.assertIsNone(message.result) + self.assertEquals( + str(message.failure), + 'Error message\n' + 'TRACE HERE' + ) + self.assertIsInstance(message.failure, + oslo_messaging.MessagingException) + + +class PikaOutgoingMessageTestCase(unittest.TestCase): + def setUp(self): + self._pika_engine = mock.MagicMock() + self._exchange = "it is exchange" + self._routing_key = "it is routing key" + self._expiration = 1 + self._expiration_time = time.time() + self._expiration + self._mandatory = object() + + self._message = {"msg_type": 1, "msg_str": "hello"} + self._context = {"request_id": 555, "token": "it is a token"} + + @patch("oslo_serialization.jsonutils.dumps", + new=functools.partial(jsonutils.dumps, sort_keys=True)) + def test_send_with_confirmation(self): + message = pika_drv_msg.PikaOutgoingMessage( + self._pika_engine, self._message, self._context + ) + + message.send( + exchange=self._exchange, + routing_key=self._routing_key, + confirm=True, + mandatory=self._mandatory, + persistent=True, + expiration_time=self._expiration_time, + retrier=None + ) + + self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.assert_called_once_with( + body=mock.ANY, + exchange=self._exchange, mandatory=self._mandatory, + properties=mock.ANY, + routing_key=self._routing_key + ) + + body = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["body"] + + self.assertEqual( + b'{"_$_request_id": 555, "_$_token": "it is a token", ' + b'"msg_str": "hello", "msg_type": 1}', + body + ) + + props = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["properties"] + + self.assertEqual(props.content_encoding, 'utf-8') + self.assertEqual(props.content_type, 'application/json') + self.assertEqual(props.delivery_mode, 2) + self.assertTrue(self._expiration * 1000 - float(props.expiration) < + 100) + self.assertEqual(props.headers, {'version': '1.0'}) + self.assertTrue(props.message_id) + + @patch("oslo_serialization.jsonutils.dumps", + new=functools.partial(jsonutils.dumps, sort_keys=True)) + def test_send_without_confirmation(self): + message = pika_drv_msg.PikaOutgoingMessage( + self._pika_engine, self._message, self._context + ) + + message.send( + exchange=self._exchange, + routing_key=self._routing_key, + confirm=False, + mandatory=self._mandatory, + persistent=False, + expiration_time=self._expiration_time, + retrier=None + ) + + self._pika_engine.connection_without_confirmation_pool.acquire( + ).__enter__().channel.publish.assert_called_once_with( + body=mock.ANY, + exchange=self._exchange, mandatory=self._mandatory, + properties=mock.ANY, + routing_key=self._routing_key + ) + + body = self._pika_engine.connection_without_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["body"] + + self.assertEqual( + b'{"_$_request_id": 555, "_$_token": "it is a token", ' + b'"msg_str": "hello", "msg_type": 1}', + body + ) + + props = self._pika_engine.connection_without_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["properties"] + + self.assertEqual(props.content_encoding, 'utf-8') + self.assertEqual(props.content_type, 'application/json') + self.assertEqual(props.delivery_mode, 1) + self.assertTrue(self._expiration * 1000 - float(props.expiration) + < 100) + self.assertEqual(props.headers, {'version': '1.0'}) + self.assertTrue(props.message_id) + + +class RpcPikaOutgoingMessageTestCase(unittest.TestCase): + def setUp(self): + self._exchange = "it is exchange" + self._routing_key = "it is routing key" + + self._pika_engine = mock.MagicMock() + self._pika_engine.get_rpc_exchange_name.return_value = self._exchange + self._pika_engine.get_rpc_queue_name.return_value = self._routing_key + + self._message = {"msg_type": 1, "msg_str": "hello"} + self._context = {"request_id": 555, "token": "it is a token"} + + @patch("oslo_serialization.jsonutils.dumps", + new=functools.partial(jsonutils.dumps, sort_keys=True)) + def test_send_cast_message(self): + message = pika_drv_msg.RpcPikaOutgoingMessage( + self._pika_engine, self._message, self._context + ) + + expiration = 1 + expiration_time = time.time() + expiration + + message.send( + target=oslo_messaging.Target(exchange=self._exchange, + topic=self._routing_key), + reply_listener=None, + expiration_time=expiration_time, + retrier=None + ) + + self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.assert_called_once_with( + body=mock.ANY, + exchange=self._exchange, mandatory=True, + properties=mock.ANY, + routing_key=self._routing_key + ) + + body = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["body"] + + self.assertEqual( + b'{"_$_request_id": 555, "_$_token": "it is a token", ' + b'"msg_str": "hello", "msg_type": 1}', + body + ) + + props = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["properties"] + + self.assertEqual(props.content_encoding, 'utf-8') + self.assertEqual(props.content_type, 'application/json') + self.assertEqual(props.delivery_mode, 1) + self.assertTrue(expiration * 1000 - float(props.expiration) < 100) + self.assertEqual(props.headers, {'version': '1.0'}) + self.assertIsNone(props.correlation_id) + self.assertIsNone(props.reply_to) + self.assertTrue(props.message_id) + + @patch("oslo_serialization.jsonutils.dumps", + new=functools.partial(jsonutils.dumps, sort_keys=True)) + def test_send_call_message(self): + message = pika_drv_msg.RpcPikaOutgoingMessage( + self._pika_engine, self._message, self._context + ) + + expiration = 1 + expiration_time = time.time() + expiration + + result = "it is a result" + reply_queue_name = "reply_queue_name" + + future = futures.Future() + future.set_result(result) + reply_listener = mock.Mock() + reply_listener.register_reply_waiter.return_value = future + reply_listener.get_reply_qname.return_value = reply_queue_name + + res = message.send( + target=oslo_messaging.Target(exchange=self._exchange, + topic=self._routing_key), + reply_listener=reply_listener, + expiration_time=expiration_time, + retrier=None + ) + + self.assertEqual(result, res) + + self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.assert_called_once_with( + body=mock.ANY, + exchange=self._exchange, mandatory=True, + properties=mock.ANY, + routing_key=self._routing_key + ) + + body = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["body"] + + self.assertEqual( + b'{"_$_request_id": 555, "_$_token": "it is a token", ' + b'"msg_str": "hello", "msg_type": 1}', + body + ) + + props = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["properties"] + + self.assertEqual(props.content_encoding, 'utf-8') + self.assertEqual(props.content_type, 'application/json') + self.assertEqual(props.delivery_mode, 1) + self.assertTrue(expiration * 1000 - float(props.expiration) < 100) + self.assertEqual(props.headers, {'version': '1.0'}) + self.assertEqual(props.correlation_id, message.msg_id) + self.assertEquals(props.reply_to, reply_queue_name) + self.assertTrue(props.message_id) + + +class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase): + def setUp(self): + self._reply_q = "reply_queue_name" + + self._expiration = 1 + self._expiration_time = time.time() + self._expiration + + self._pika_engine = mock.MagicMock() + + self._rpc_reply_exchange = "rpc_reply_exchange" + self._pika_engine.rpc_reply_exchange = self._rpc_reply_exchange + + self._msg_id = 12345567 + + @patch("oslo_serialization.jsonutils.dumps", + new=functools.partial(jsonutils.dumps, sort_keys=True)) + def test_success_message_send(self): + message = pika_drv_msg.RpcReplyPikaOutgoingMessage( + self._pika_engine, self._msg_id, reply="all_fine" + ) + + message.send(self._reply_q, expiration_time=self._expiration_time, + retrier=None) + + self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.assert_called_once_with( + body=b'{"s": "all_fine"}', + exchange=self._rpc_reply_exchange, mandatory=True, + properties=mock.ANY, + routing_key=self._reply_q + ) + + props = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["properties"] + + self.assertEqual(props.content_encoding, 'utf-8') + self.assertEqual(props.content_type, 'application/json') + self.assertEqual(props.delivery_mode, 1) + self.assertTrue(self._expiration * 1000 - float(props.expiration) < + 100) + self.assertEqual(props.headers, {'version': '1.0'}) + self.assertEqual(props.correlation_id, message.msg_id) + self.assertIsNone(props.reply_to) + self.assertTrue(props.message_id) + + @patch("traceback.format_exception", new=lambda x,y,z:z) + @patch("oslo_serialization.jsonutils.dumps", + new=functools.partial(jsonutils.dumps, sort_keys=True)) + def test_failure_message_send(self): + failure_info = (oslo_messaging.MessagingException, + oslo_messaging.MessagingException("Error message"), + ['It is a trace']) + + + message = pika_drv_msg.RpcReplyPikaOutgoingMessage( + self._pika_engine, self._msg_id, failure_info=failure_info + ) + + message.send(self._reply_q, expiration_time=self._expiration_time, + retrier=None) + + self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.assert_called_once_with( + body=mock.ANY, + exchange=self._rpc_reply_exchange, + mandatory=True, + properties=mock.ANY, + routing_key=self._reply_q + ) + + body = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["body"] + self.assertEqual( + b'{"e": {"c": "MessagingException", ' + b'"m": "oslo_messaging.exceptions", "s": "Error message", ' + b'"t": ["It is a trace"]}}', + body + ) + + props = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["properties"] + + self.assertEqual(props.content_encoding, 'utf-8') + self.assertEqual(props.content_type, 'application/json') + self.assertEqual(props.delivery_mode, 1) + self.assertTrue(self._expiration * 1000 - float(props.expiration) < + 100) + self.assertEqual(props.headers, {'version': '1.0'}) + self.assertEqual(props.correlation_id, message.msg_id) + self.assertIsNone(props.reply_to) + self.assertTrue(props.message_id)