Create a shared queue for QPID topic consumers

Fixes bug 1257293

This is a cherrypick of oslo-incubator:
ef406a21782134aeefb944f74b3f1a47d6169318

From the oslo-incubator commit:
When multiple RPC servers (consumers) are subscribed to the same RPC
topic, a single RPC request to that topic should be received by only
one of the consumers.  A bug in the QPID driver caused every consumer
to receive a copy of the RPC request.  This bug affects only Topology
version 2.  This patch will cause a single queue to be created for
each topic, and shared among all consumers of that topic.  This
results in each RPC request being received by only one consumer,
in turn across all the competing consumers.

Change-Id: I97c6eb5fe63322ab70716c63074818ff1b6ea82b
This commit is contained in:
Eoghan Glynn 2013-12-06 10:52:38 +00:00
parent ee9bd9b506
commit 3357cc982f

View File

@ -130,14 +130,13 @@ class ConsumerBase(object):
},
},
}
if link_name:
addr_opts["link"]["name"] = link_name
addr_opts["node"]["x-declare"].update(node_opts)
elif conf.qpid_topology_version == 2:
addr_opts = {
"link": {
"x-declare": {
"auto-delete": True,
"exclusive": False,
},
},
}
@ -145,6 +144,8 @@ class ConsumerBase(object):
raise_invalid_topology_version()
addr_opts["link"]["x-declare"].update(link_opts)
if link_name:
addr_opts["link"]["name"] = link_name
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
@ -219,14 +220,16 @@ class DirectConsumer(ConsumerBase):
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (msg_id, msg_id)
node_opts = {"type": "direct"}
link_name = msg_id
elif conf.qpid_topology_version == 2:
node_name = "amq.direct/%s" % msg_id
node_opts = {}
link_name = None
else:
raise_invalid_topology_version()
super(DirectConsumer, self).__init__(conf, session, callback,
node_name, node_opts, msg_id,
node_name, node_opts, link_name,
link_opts)