diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index aef9ffac9..57fa89eb7 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -448,6 +448,9 @@ class Connection(object): self.params_list = params_list + brokers_count = len(self.params_list) + self.next_broker_indices = itertools.cycle(range(brokers_count)) + self.memory_transport = self.conf.fake_rabbit self.connection = None @@ -520,7 +523,7 @@ class Connection(object): attempt = 0 while True: - params = self.params_list[attempt % len(self.params_list)] + params = self.params_list[next(self.next_broker_indices)] attempt += 1 try: self._connect(params) diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py index 81b670d63..4e1273cb6 100644 --- a/tests/test_rabbit.py +++ b/tests/test_rabbit.py @@ -603,3 +603,41 @@ class TestReplyWireFormat(test_utils.BaseTestCase): TestReplyWireFormat.generate_scenarios() + + +class RpcKombuHATestCase(test_utils.BaseTestCase): + + def test_reconnect_order(self): + brokers = ['host1', 'host2', 'host3', 'host4', 'host5'] + brokers_count = len(brokers) + + self.conf.rabbit_hosts = brokers + self.conf.rabbit_max_retries = 1 + + info = {'attempt': 0} + + def _connect(myself, params): + # do as little work that is enough to pass connection attempt + myself.connection = kombu.connection.BrokerConnection(**params) + myself.connection_errors = myself.connection.connection_errors + + expected_broker = brokers[info['attempt'] % brokers_count] + self.assertEqual(params['hostname'], expected_broker) + + info['attempt'] += 1 + + # just make sure connection instantiation does not fail with an + # exception + self.stubs.Set(rabbit_driver.Connection, '_connect', _connect) + + # starting from the first broker in the list + connection = rabbit_driver.Connection(self.conf) + + # now that we have connection object, revert to the real 'connect' + # implementation + self.stubs.UnsetAll() + + for i in range(len(brokers)): + self.assertRaises(driver_common.RPCException, connection.reconnect) + + connection.close()