Do not leak _unique_id out of amqp drivers
In commit d8d2ad9 we delayed when a message's unique ID gets added to the duplicate message cache in order to allow for message requeueing. However, as part of this, we exposed the private _unique_id field outside of the driver. This commit reverses that change by storing the ID in the AMQPIncomingMessage object. Change-Id: Ibeb7896de7ad9abf3c6a43495c1a87aabb762c0d
This commit is contained in:
parent
d8d2ad95d7
commit
5464229e63
@ -318,16 +318,17 @@ class _MsgIdCache(object):
|
||||
"""AMQP consumers may read same message twice when exceptions occur
|
||||
before ack is returned. This method prevents doing it.
|
||||
"""
|
||||
if UNIQUE_ID in message_data:
|
||||
msg_id = message_data.get(UNIQUE_ID)
|
||||
if msg_id in self.prev_msgids:
|
||||
raise rpc_common.DuplicateMessageError(msg_id=msg_id)
|
||||
|
||||
def add(self, message_data):
|
||||
if UNIQUE_ID in message_data:
|
||||
try:
|
||||
msg_id = message_data.pop(UNIQUE_ID)
|
||||
if msg_id not in self.prev_msgids:
|
||||
self.prev_msgids.append(msg_id)
|
||||
except KeyError:
|
||||
return
|
||||
if msg_id in self.prev_msgids:
|
||||
raise rpc_common.DuplicateMessageError(msg_id=msg_id)
|
||||
return msg_id
|
||||
|
||||
def add(self, msg_id):
|
||||
if msg_id and msg_id not in self.prev_msgids:
|
||||
self.prev_msgids.append(msg_id)
|
||||
|
||||
|
||||
def _add_unique_id(msg):
|
||||
|
@ -31,10 +31,11 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
class AMQPIncomingMessage(base.IncomingMessage):
|
||||
|
||||
def __init__(self, listener, ctxt, message, msg_id, reply_q):
|
||||
def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q):
|
||||
super(AMQPIncomingMessage, self).__init__(listener, ctxt,
|
||||
dict(message))
|
||||
|
||||
self.unique_id = unique_id
|
||||
self.msg_id = msg_id
|
||||
self.reply_q = reply_q
|
||||
self.acknowledge_callback = message.acknowledge
|
||||
@ -67,7 +68,7 @@ class AMQPIncomingMessage(base.IncomingMessage):
|
||||
self._send_reply(conn, ending=True)
|
||||
|
||||
def acknowledge(self):
|
||||
self.listener.msg_id_cache.add(self.message)
|
||||
self.listener.msg_id_cache.add(self.unique_id)
|
||||
self.acknowledge_callback()
|
||||
|
||||
def requeue(self):
|
||||
@ -92,12 +93,13 @@ class AMQPListener(base.Listener):
|
||||
# FIXME(markmc): logging isn't driver specific
|
||||
rpc_common._safe_log(LOG.debug, 'received %s', dict(message))
|
||||
|
||||
self.msg_id_cache.check_duplicate_message(message)
|
||||
unique_id = self.msg_id_cache.check_duplicate_message(message)
|
||||
ctxt = rpc_amqp.unpack_context(self.conf, message)
|
||||
|
||||
self.incoming.append(AMQPIncomingMessage(self,
|
||||
ctxt.to_dict(),
|
||||
message,
|
||||
unique_id,
|
||||
ctxt.msg_id,
|
||||
ctxt.reply_q))
|
||||
|
||||
|
@ -206,7 +206,6 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
senders[i].start()
|
||||
|
||||
received = listener.poll()
|
||||
received.message.pop('_unique_id')
|
||||
self.assertIsNotNone(received)
|
||||
self.assertEqual(received.ctxt, self.ctxt)
|
||||
self.assertEqual(received.message, {'tx_id': i})
|
||||
@ -303,14 +302,12 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
|
||||
senders[0].start()
|
||||
|
||||
msgs.append(listener.poll())
|
||||
msgs[-1].message.pop('_unique_id')
|
||||
self.assertEqual(msgs[-1].message, {'tx_id': 0})
|
||||
|
||||
# Start the second guy, receive his message
|
||||
senders[1].start()
|
||||
|
||||
msgs.append(listener.poll())
|
||||
msgs[-1].message.pop('_unique_id')
|
||||
self.assertEqual(msgs[-1].message, {'tx_id': 1})
|
||||
|
||||
# Reply to both in order, making the second thread queue
|
||||
@ -605,7 +602,6 @@ class TestReplyWireFormat(test_utils.BaseTestCase):
|
||||
producer.publish(msg)
|
||||
|
||||
received = listener.poll()
|
||||
received.message.pop('_unique_id')
|
||||
self.assertIsNotNone(received)
|
||||
self.assertEqual(self.expected_ctxt, received.ctxt)
|
||||
self.assertEqual(self.expected, received.message)
|
||||
|
Loading…
x
Reference in New Issue
Block a user