diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py index ce5c21e0e..a91be1aad 100644 --- a/oslo_messaging/_drivers/amqp.py +++ b/oslo_messaging/_drivers/amqp.py @@ -49,6 +49,18 @@ amqp_opts = [ default=False, deprecated_group='DEFAULT', help='Auto-delete queues in AMQP.'), + cfg.BoolOpt('send_single_reply', + default=False, + help='Send a single AMQP reply to call message. The current ' + 'behaviour since oslo-incubator is to send two AMQP ' + 'replies - first one with the payload, a second one to ' + 'ensure the other have finish to send the payload. We ' + 'are going to remove it in the N release, but we must ' + 'keep backward compatible at the same time. This option ' + 'provides such compatibility - it defaults to False in ' + 'Liberty and can be turned on for early adopters with a ' + 'new installations or for testing. Please note, that ' + 'this option will be removed in M release.') ] UNIQUE_ID = '_unique_id' diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 6e19dd897..3a1d9bbed 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -71,8 +71,12 @@ class AMQPIncomingMessage(base.IncomingMessage): return with self.listener.driver._get_connection( rpc_amqp.PURPOSE_SEND) as conn: - self._send_reply(conn, reply, failure, log_failure=log_failure) - self._send_reply(conn, ending=True) + if self.listener.driver.send_single_reply: + self._send_reply(conn, reply, failure, log_failure=log_failure, + ending=True) + else: + self._send_reply(conn, reply, failure, log_failure=log_failure) + self._send_reply(conn, ending=True) def acknowledge(self): self.listener.msg_id_cache.add(self.unique_id) @@ -257,7 +261,8 @@ class ReplyWaiter(object): class AMQPDriverBase(base.BaseDriver): def __init__(self, conf, url, connection_pool, - default_exchange=None, allowed_remote_exmods=None): + default_exchange=None, allowed_remote_exmods=None, + send_single_reply=False): super(AMQPDriverBase, self).__init__(conf, url, default_exchange, allowed_remote_exmods) @@ -270,6 +275,8 @@ class AMQPDriverBase(base.BaseDriver): self._reply_q_conn = None self._waiter = None + self.send_single_reply = send_single_reply + def _get_exchange(self, target): return target.exchange or self._default_exchange diff --git a/oslo_messaging/_drivers/impl_qpid.py b/oslo_messaging/_drivers/impl_qpid.py index 8153abae6..c4dd11784 100644 --- a/oslo_messaging/_drivers/impl_qpid.py +++ b/oslo_messaging/_drivers/impl_qpid.py @@ -778,7 +778,10 @@ class QpidDriver(amqpdriver.AMQPDriverBase): conf, conf.oslo_messaging_qpid.rpc_conn_pool_size, url, Connection) - super(QpidDriver, self).__init__(conf, url, - connection_pool, - default_exchange, - allowed_remote_exmods) + super(QpidDriver, self).__init__( + conf, url, + connection_pool, + default_exchange, + allowed_remote_exmods, + conf.oslo_messaging_qpid.send_single_reply, + ) diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 0f57a9279..a51072f8f 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -1085,10 +1085,13 @@ class RabbitDriver(amqpdriver.AMQPDriverBase): conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size, url, Connection) - super(RabbitDriver, self).__init__(conf, url, - connection_pool, - default_exchange, - allowed_remote_exmods) + super(RabbitDriver, self).__init__( + conf, url, + connection_pool, + default_exchange, + allowed_remote_exmods, + conf.oslo_messaging_rabbit.send_single_reply, + ) def require_features(self, requeue=True): pass