Sync rpc fix from oslo-incubator

Sync the following fixes from oslo-incubator:

e227c0e Properly reconnect subscribing clients when QPID broker restarts
ef406a2 Create a shared queue for QPID topic consumers

Change-Id: I286edf6bc4a677aa61f60da785802c19878c79c7
Closes-bug: #1251757
Closes-bug: #1257293
This commit is contained in:
Flavio Percoco 2013-12-06 15:35:14 +01:00
parent 5d5073b328
commit e7d193da91

View File

@ -18,7 +18,6 @@
import functools
import itertools
import time
import uuid
import eventlet
import greenlet
@ -123,7 +122,6 @@ class ConsumerBase(object):
},
},
"link": {
"name": link_name,
"durable": True,
"x-declare": {
"durable": False,
@ -138,6 +136,7 @@ class ConsumerBase(object):
"link": {
"x-declare": {
"auto-delete": True,
"exclusive": False,
},
},
}
@ -145,6 +144,8 @@ class ConsumerBase(object):
raise_invalid_topology_version()
addr_opts["link"]["x-declare"].update(link_opts)
if link_name:
addr_opts["link"]["name"] = link_name
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
@ -208,14 +209,16 @@ class DirectConsumer(ConsumerBase):
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (msg_id, msg_id)
node_opts = {"type": "direct"}
link_name = msg_id
elif conf.qpid_topology_version == 2:
node_name = "amq.direct/%s" % msg_id
node_opts = {}
link_name = None
else:
raise_invalid_topology_version()
super(DirectConsumer, self).__init__(conf, session, callback,
node_name, node_opts, msg_id,
node_name, node_opts, link_name,
link_opts)
@ -266,16 +269,14 @@ class FanoutConsumer(ConsumerBase):
if conf.qpid_topology_version == 1:
node_name = "%s_fanout" % topic
node_opts = {"durable": False, "type": "fanout"}
link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/fanout/%s" % topic
node_opts = {}
link_name = ""
else:
raise_invalid_topology_version()
super(FanoutConsumer, self).__init__(conf, session, callback,
node_name, node_opts, link_name,
node_name, node_opts, None,
link_opts)