Select AMQP message broker at random

The rule of choosing AMQP message broker is that chose first
available one in order now. The order depends on what we set in
configuration file. That means all the connections will flock
to same message broker and that may lead out performance issue.
This patch randomizes the order of choosing message broker for
each connection to leverage broker cluster.

Change-Id: Ib5098e574d4ef81428065885e2295d0f87aba715
Partial-Bug: #1261631
This commit is contained in:
ChangBo Guo(gcb) 2014-03-21 09:49:32 +08:00
parent c90581937c
commit 8ae1880f7a
4 changed files with 11 additions and 14 deletions

View File

@ -16,6 +16,7 @@
import functools import functools
import itertools import itertools
import logging import logging
import random
import time import time
from oslo.config import cfg from oslo.config import cfg
@ -467,12 +468,13 @@ class Connection(object):
] ]
params = { params = {
'qpid_hosts': self.conf.qpid_hosts, 'qpid_hosts': self.conf.qpid_hosts[:],
'username': self.conf.qpid_username, 'username': self.conf.qpid_username,
'password': self.conf.qpid_password, 'password': self.conf.qpid_password,
} }
params.update(server_params or {}) params.update(server_params or {})
random.shuffle(params['qpid_hosts'])
self.brokers = itertools.cycle(params['qpid_hosts']) self.brokers = itertools.cycle(params['qpid_hosts'])
self.username = params['username'] self.username = params['username']

View File

@ -15,6 +15,7 @@
import functools import functools
import itertools import itertools
import logging import logging
import random
import socket import socket
import ssl import ssl
import time import time
@ -466,6 +467,7 @@ class Connection(object):
params_list.append(params) params_list.append(params)
random.shuffle(params_list)
self.params_list = itertools.cycle(params_list) self.params_list = itertools.cycle(params_list)
self.memory_transport = self.conf.fake_rabbit self.memory_transport = self.conf.fake_rabbit

View File

@ -410,8 +410,6 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase):
for _ in range(brokers_count): for _ in range(brokers_count):
connection.reconnect() connection.reconnect()
connection.close()
expected = [] expected = []
for broker in brokers: for broker in brokers:
expected.extend([mock.call(broker), expected.extend([mock.call(broker),
@ -421,12 +419,7 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase):
mock.call().opened().__nonzero__(), mock.call().opened().__nonzero__(),
mock.call().close()]) mock.call().close()])
# the last one was closed with close(), not reconnect() conn_mock.assert_has_calls(expected, any_order=True)
expected.extend([mock.call(brokers[0]),
mock.call().open(),
mock.call().session(),
mock.call().close()])
conn_mock.assert_has_calls(expected)
def synchronized(func): def synchronized(func):

View File

@ -622,17 +622,17 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
self.conf.rabbit_hosts = brokers self.conf.rabbit_hosts = brokers
self.conf.rabbit_max_retries = 1 self.conf.rabbit_max_retries = 1
info = {'attempt': 0} hostname_sets = set()
def _connect(myself, params): def _connect(myself, params):
# do as little work that is enough to pass connection attempt # do as little work that is enough to pass connection attempt
myself.connection = kombu.connection.BrokerConnection(**params) myself.connection = kombu.connection.BrokerConnection(**params)
myself.connection_errors = myself.connection.connection_errors myself.connection_errors = myself.connection.connection_errors
expected_broker = brokers[info['attempt'] % brokers_count] hostname = params['hostname']
self.assertEqual(expected_broker, params['hostname']) self.assertNotIn(hostname, hostname_sets)
info['attempt'] += 1 hostname_sets.add(hostname)
# just make sure connection instantiation does not fail with an # just make sure connection instantiation does not fail with an
# exception # exception
@ -645,7 +645,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
# implementation # implementation
self.stubs.UnsetAll() self.stubs.UnsetAll()
for i in range(len(brokers)): for i in range(brokers_count):
self.assertRaises(driver_common.RPCException, connection.reconnect) self.assertRaises(driver_common.RPCException, connection.reconnect)
connection.close() connection.close()