RabbitMQ: advance thru the list of brokers on reconnect

In RabbitMQ implementation, when using multiple rabbit_hosts, we don't want to
immediately retry failed connection for the same failed broker. This was not
the case in existing implementation though, where we've always attempted to
reconnect starting from the first broker in the list of candidates. So if the
first broker failed, we initiated reconnect to the same failed broker.

This change makes reconnect() implementation to select the next broker in the
list. This also means that non-failure reconnect attempts will also switch the
current broker. All in all, users should not rely on any particular order to
use brokers from the list, so this should not constitute an issue.

Change-Id: I76965ea06c1fb6fd0cd384a5b3edc14703470b8f
Partial-Bug: 1261631
This commit is contained in:
Ihar Hrachyshka 2014-01-24 17:31:46 +01:00
parent 84298bd041
commit 71c6866471
2 changed files with 42 additions and 1 deletions

View File

@ -448,6 +448,9 @@ class Connection(object):
self.params_list = params_list
brokers_count = len(self.params_list)
self.next_broker_indices = itertools.cycle(range(brokers_count))
self.memory_transport = self.conf.fake_rabbit
self.connection = None
@ -520,7 +523,7 @@ class Connection(object):
attempt = 0
while True:
params = self.params_list[attempt % len(self.params_list)]
params = self.params_list[next(self.next_broker_indices)]
attempt += 1
try:
self._connect(params)

View File

@ -603,3 +603,41 @@ class TestReplyWireFormat(test_utils.BaseTestCase):
TestReplyWireFormat.generate_scenarios()
class RpcKombuHATestCase(test_utils.BaseTestCase):
def test_reconnect_order(self):
brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
brokers_count = len(brokers)
self.conf.rabbit_hosts = brokers
self.conf.rabbit_max_retries = 1
info = {'attempt': 0}
def _connect(myself, params):
# do as little work that is enough to pass connection attempt
myself.connection = kombu.connection.BrokerConnection(**params)
myself.connection_errors = myself.connection.connection_errors
expected_broker = brokers[info['attempt'] % brokers_count]
self.assertEqual(params['hostname'], expected_broker)
info['attempt'] += 1
# just make sure connection instantiation does not fail with an
# exception
self.stubs.Set(rabbit_driver.Connection, '_connect', _connect)
# starting from the first broker in the list
connection = rabbit_driver.Connection(self.conf)
# now that we have connection object, revert to the real 'connect'
# implementation
self.stubs.UnsetAll()
for i in range(len(brokers)):
self.assertRaises(driver_common.RPCException, connection.reconnect)
connection.close()