From 5464229e633a5b8268b1b142fd7584544001fe5c Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Mon, 3 Mar 2014 06:58:35 -0800 Subject: [PATCH] Do not leak _unique_id out of amqp drivers In commit d8d2ad9 we delayed when a message's unique ID gets added to the duplicate message cache in order to allow for message requeueing. However, as part of this, we exposed the private _unique_id field outside of the driver. This commit reverses that change by storing the ID in the AMQPIncomingMessage object. Change-Id: Ibeb7896de7ad9abf3c6a43495c1a87aabb762c0d --- oslo/messaging/_drivers/amqp.py | 19 ++++++++++--------- oslo/messaging/_drivers/amqpdriver.py | 8 +++++--- tests/test_rabbit.py | 4 ---- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/oslo/messaging/_drivers/amqp.py b/oslo/messaging/_drivers/amqp.py index 6b9c88ec9..4f3056770 100644 --- a/oslo/messaging/_drivers/amqp.py +++ b/oslo/messaging/_drivers/amqp.py @@ -318,16 +318,17 @@ class _MsgIdCache(object): """AMQP consumers may read same message twice when exceptions occur before ack is returned. This method prevents doing it. """ - if UNIQUE_ID in message_data: - msg_id = message_data.get(UNIQUE_ID) - if msg_id in self.prev_msgids: - raise rpc_common.DuplicateMessageError(msg_id=msg_id) - - def add(self, message_data): - if UNIQUE_ID in message_data: + try: msg_id = message_data.pop(UNIQUE_ID) - if msg_id not in self.prev_msgids: - self.prev_msgids.append(msg_id) + except KeyError: + return + if msg_id in self.prev_msgids: + raise rpc_common.DuplicateMessageError(msg_id=msg_id) + return msg_id + + def add(self, msg_id): + if msg_id and msg_id not in self.prev_msgids: + self.prev_msgids.append(msg_id) def _add_unique_id(msg): diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index 8ccba92ac..b8b9fcefe 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -31,10 +31,11 @@ LOG = logging.getLogger(__name__) class AMQPIncomingMessage(base.IncomingMessage): - def __init__(self, listener, ctxt, message, msg_id, reply_q): + def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q): super(AMQPIncomingMessage, self).__init__(listener, ctxt, dict(message)) + self.unique_id = unique_id self.msg_id = msg_id self.reply_q = reply_q self.acknowledge_callback = message.acknowledge @@ -67,7 +68,7 @@ class AMQPIncomingMessage(base.IncomingMessage): self._send_reply(conn, ending=True) def acknowledge(self): - self.listener.msg_id_cache.add(self.message) + self.listener.msg_id_cache.add(self.unique_id) self.acknowledge_callback() def requeue(self): @@ -92,12 +93,13 @@ class AMQPListener(base.Listener): # FIXME(markmc): logging isn't driver specific rpc_common._safe_log(LOG.debug, 'received %s', dict(message)) - self.msg_id_cache.check_duplicate_message(message) + unique_id = self.msg_id_cache.check_duplicate_message(message) ctxt = rpc_amqp.unpack_context(self.conf, message) self.incoming.append(AMQPIncomingMessage(self, ctxt.to_dict(), message, + unique_id, ctxt.msg_id, ctxt.reply_q)) diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py index 77afd41bb..8c0874191 100644 --- a/tests/test_rabbit.py +++ b/tests/test_rabbit.py @@ -206,7 +206,6 @@ class TestSendReceive(test_utils.BaseTestCase): senders[i].start() received = listener.poll() - received.message.pop('_unique_id') self.assertIsNotNone(received) self.assertEqual(received.ctxt, self.ctxt) self.assertEqual(received.message, {'tx_id': i}) @@ -303,14 +302,12 @@ class TestRacyWaitForReply(test_utils.BaseTestCase): senders[0].start() msgs.append(listener.poll()) - msgs[-1].message.pop('_unique_id') self.assertEqual(msgs[-1].message, {'tx_id': 0}) # Start the second guy, receive his message senders[1].start() msgs.append(listener.poll()) - msgs[-1].message.pop('_unique_id') self.assertEqual(msgs[-1].message, {'tx_id': 1}) # Reply to both in order, making the second thread queue @@ -605,7 +602,6 @@ class TestReplyWireFormat(test_utils.BaseTestCase): producer.publish(msg) received = listener.poll() - received.message.pop('_unique_id') self.assertIsNotNone(received) self.assertEqual(self.expected_ctxt, received.ctxt) self.assertEqual(self.expected, received.message)