From 71c6866471d628b207d7c5b84bfd37cc9fed0898 Mon Sep 17 00:00:00 2001 From: Ihar Hrachyshka Date: Fri, 24 Jan 2014 17:31:46 +0100 Subject: [PATCH] RabbitMQ: advance thru the list of brokers on reconnect In RabbitMQ implementation, when using multiple rabbit_hosts, we don't want to immediately retry failed connection for the same failed broker. This was not the case in existing implementation though, where we've always attempted to reconnect starting from the first broker in the list of candidates. So if the first broker failed, we initiated reconnect to the same failed broker. This change makes reconnect() implementation to select the next broker in the list. This also means that non-failure reconnect attempts will also switch the current broker. All in all, users should not rely on any particular order to use brokers from the list, so this should not constitute an issue. Change-Id: I76965ea06c1fb6fd0cd384a5b3edc14703470b8f Partial-Bug: 1261631 --- oslo/messaging/_drivers/impl_rabbit.py | 5 +++- tests/test_rabbit.py | 38 ++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index aef9ffac9..57fa89eb7 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -448,6 +448,9 @@ class Connection(object): self.params_list = params_list + brokers_count = len(self.params_list) + self.next_broker_indices = itertools.cycle(range(brokers_count)) + self.memory_transport = self.conf.fake_rabbit self.connection = None @@ -520,7 +523,7 @@ class Connection(object): attempt = 0 while True: - params = self.params_list[attempt % len(self.params_list)] + params = self.params_list[next(self.next_broker_indices)] attempt += 1 try: self._connect(params) diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py index 81b670d63..4e1273cb6 100644 --- a/tests/test_rabbit.py +++ b/tests/test_rabbit.py @@ -603,3 +603,41 @@ class TestReplyWireFormat(test_utils.BaseTestCase): TestReplyWireFormat.generate_scenarios() + + +class RpcKombuHATestCase(test_utils.BaseTestCase): + + def test_reconnect_order(self): + brokers = ['host1', 'host2', 'host3', 'host4', 'host5'] + brokers_count = len(brokers) + + self.conf.rabbit_hosts = brokers + self.conf.rabbit_max_retries = 1 + + info = {'attempt': 0} + + def _connect(myself, params): + # do as little work that is enough to pass connection attempt + myself.connection = kombu.connection.BrokerConnection(**params) + myself.connection_errors = myself.connection.connection_errors + + expected_broker = brokers[info['attempt'] % brokers_count] + self.assertEqual(params['hostname'], expected_broker) + + info['attempt'] += 1 + + # just make sure connection instantiation does not fail with an + # exception + self.stubs.Set(rabbit_driver.Connection, '_connect', _connect) + + # starting from the first broker in the list + connection = rabbit_driver.Connection(self.conf) + + # now that we have connection object, revert to the real 'connect' + # implementation + self.stubs.UnsetAll() + + for i in range(len(brokers)): + self.assertRaises(driver_common.RPCException, connection.reconnect) + + connection.close()