From 3357cc982f2c1389cd7f53e8451b061dcd604979 Mon Sep 17 00:00:00 2001 From: Eoghan Glynn Date: Fri, 6 Dec 2013 10:52:38 +0000 Subject: [PATCH] Create a shared queue for QPID topic consumers Fixes bug 1257293 This is a cherrypick of oslo-incubator: ef406a21782134aeefb944f74b3f1a47d6169318 From the oslo-incubator commit: When multiple RPC servers (consumers) are subscribed to the same RPC topic, a single RPC request to that topic should be received by only one of the consumers. A bug in the QPID driver caused every consumer to receive a copy of the RPC request. This bug affects only Topology version 2. This patch will cause a single queue to be created for each topic, and shared among all consumers of that topic. This results in each RPC request being received by only one consumer, in turn across all the competing consumers. Change-Id: I97c6eb5fe63322ab70716c63074818ff1b6ea82b --- ceilometer/openstack/common/rpc/impl_qpid.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/ceilometer/openstack/common/rpc/impl_qpid.py b/ceilometer/openstack/common/rpc/impl_qpid.py index 0aac045ac..ba6f88a3a 100644 --- a/ceilometer/openstack/common/rpc/impl_qpid.py +++ b/ceilometer/openstack/common/rpc/impl_qpid.py @@ -130,14 +130,13 @@ class ConsumerBase(object): }, }, } - if link_name: - addr_opts["link"]["name"] = link_name addr_opts["node"]["x-declare"].update(node_opts) elif conf.qpid_topology_version == 2: addr_opts = { "link": { "x-declare": { "auto-delete": True, + "exclusive": False, }, }, } @@ -145,6 +144,8 @@ class ConsumerBase(object): raise_invalid_topology_version() addr_opts["link"]["x-declare"].update(link_opts) + if link_name: + addr_opts["link"]["name"] = link_name self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) @@ -219,14 +220,16 @@ class DirectConsumer(ConsumerBase): if conf.qpid_topology_version == 1: node_name = "%s/%s" % (msg_id, msg_id) node_opts = {"type": "direct"} + link_name = msg_id elif conf.qpid_topology_version == 2: node_name = "amq.direct/%s" % msg_id node_opts = {} + link_name = None else: raise_invalid_topology_version() super(DirectConsumer, self).__init__(conf, session, callback, - node_name, node_opts, msg_id, + node_name, node_opts, link_name, link_opts)