Properly reconnect subscribing clients when QPID broker restarts

Fixes bug 1251757

This is a cherrypick of oslo-incubator:
e227c0ed7e0ed1f9b8d029336f8aeb60e38c23df

From the oslo-incubator bug:
When the QPID broker is restarted (or fails over), subscribed clients
will attempt to re-establish their connections.  In the case of fanout
subscriptions, this reconnection functionality is broken. For version
1 topologies, the clients attempt to reconnect twice to the same
exclusive address - which is illegal.  In the case of version 2
topologies, the address parsing is broken and an illegal address is
created on reconnect.  This fix avoids the problem by removing the
special-case reconnect code that manages UUID addresses; it is
unnecessary as the QPID broker will generate unique queue names
automatically when the clients reconnect.

Change-Id: If966bb25e49b5837293a1bb181bbb02086599d81
This commit is contained in:
Eoghan Glynn 2013-12-05 22:16:55 +00:00
parent 5591e6a980
commit ee9bd9b506

View File

@ -16,7 +16,6 @@
import functools
import itertools
import time
import uuid
import eventlet
import greenlet
@ -123,7 +122,6 @@ class ConsumerBase(object):
},
},
"link": {
"name": link_name,
"durable": True,
"x-declare": {
"durable": False,
@ -132,6 +130,8 @@ class ConsumerBase(object):
},
},
}
if link_name:
addr_opts["link"]["name"] = link_name
addr_opts["node"]["x-declare"].update(node_opts)
elif conf.qpid_topology_version == 2:
addr_opts = {
@ -278,30 +278,16 @@ class FanoutConsumer(ConsumerBase):
if conf.qpid_topology_version == 1:
node_name = "%s_fanout" % topic
node_opts = {"durable": False, "type": "fanout"}
link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/fanout/%s" % topic
node_opts = {}
link_name = ""
else:
raise_invalid_topology_version()
super(FanoutConsumer, self).__init__(conf, session, callback,
node_name, node_opts, link_name,
node_name, node_opts, None,
link_opts)
def reconnect(self, session):
topic = self.get_node_name().rpartition('_fanout')[0]
params = {
'session': session,
'topic': topic,
'callback': self.callback,
}
self.__init__(conf=self.conf, **params)
super(FanoutConsumer, self).reconnect(session)
class Publisher(object):
"""Base Publisher class."""