Unique Queue Names Per Exchange.

Each service's queue needs to be uniquely named so that other
services notifications don't end up in each other's queue.
This commit is contained in:
Andrew Melton 2013-07-30 14:59:12 -04:00
parent a709d957fd
commit 691c2833cc
2 changed files with 16 additions and 15 deletions

View File

@ -50,14 +50,14 @@ class ConsumerTestCase(unittest.TestCase):
self.mox.StubOutWithMock(worker.Consumer, '_create_queue') self.mox.StubOutWithMock(worker.Consumer, '_create_queue')
consumer = worker.Consumer('test', None, None, True, {}, "nova", consumer = worker.Consumer('test', None, None, True, {}, "nova",
["monitor.info", "monitor.error"], ["monitor.info", "monitor.error"],
"stacktach") "stacktach_")
exchange = self.mox.CreateMockAnything() exchange = self.mox.CreateMockAnything()
consumer._create_exchange('nova', 'topic').AndReturn(exchange) consumer._create_exchange('nova', 'topic').AndReturn(exchange)
info_queue = self.mox.CreateMockAnything() info_queue = self.mox.CreateMockAnything()
error_queue = self.mox.CreateMockAnything() error_queue = self.mox.CreateMockAnything()
consumer._create_queue('stacktach', exchange, 'monitor.info')\ consumer._create_queue('stacktach_nova', exchange, 'monitor.info')\
.AndReturn(info_queue) .AndReturn(info_queue)
consumer._create_queue('stacktach', exchange, 'monitor.error')\ consumer._create_queue('stacktach_nova', exchange, 'monitor.error')\
.AndReturn(error_queue) .AndReturn(error_queue)
self.mox.ReplayAll() self.mox.ReplayAll()
consumers = consumer.get_consumers(Consumer, None) consumers = consumer.get_consumers(Consumer, None)
@ -74,7 +74,7 @@ class ConsumerTestCase(unittest.TestCase):
args = {'key': 'value'} args = {'key': 'value'}
consumer = worker.Consumer('test', None, None, True, args, 'nova', consumer = worker.Consumer('test', None, None, True, args, 'nova',
["monitor.info", "monitor.error"], ["monitor.info", "monitor.error"],
"stacktach") "stacktach_")
self.mox.StubOutClassWithMocks(kombu.entity, 'Exchange') self.mox.StubOutClassWithMocks(kombu.entity, 'Exchange')
exchange = kombu.entity.Exchange('nova', type='topic', exclusive=False, exchange = kombu.entity.Exchange('nova', type='topic', exclusive=False,
@ -92,7 +92,7 @@ class ConsumerTestCase(unittest.TestCase):
queue_arguments={}) queue_arguments={})
consumer = worker.Consumer('test', None, None, True, {}, 'nova', consumer = worker.Consumer('test', None, None, True, {}, 'nova',
["monitor.info", "monitor.error"], ["monitor.info", "monitor.error"],
"stacktach") "stacktach_")
self.mox.ReplayAll() self.mox.ReplayAll()
actual_queue = consumer._create_queue('name', exchange, 'routing.key', actual_queue = consumer._create_queue('name', exchange, 'routing.key',
exclusive=False, exclusive=False,
@ -110,7 +110,7 @@ class ConsumerTestCase(unittest.TestCase):
queue_arguments=queue_args) queue_arguments=queue_args)
consumer = worker.Consumer('test', None, None, True, queue_args, consumer = worker.Consumer('test', None, None, True, queue_args,
'nova', ["monitor.info", "monitor.error"], 'nova', ["monitor.info", "monitor.error"],
"stacktach") "stacktach_")
self.mox.ReplayAll() self.mox.ReplayAll()
actual_queue = consumer._create_queue('name', exchange, 'routing.key', actual_queue = consumer._create_queue('name', exchange, 'routing.key',
exclusive=False, exclusive=False,
@ -127,7 +127,7 @@ class ConsumerTestCase(unittest.TestCase):
exchange = 'nova' exchange = 'nova'
consumer = worker.Consumer('test', None, deployment, True, {}, consumer = worker.Consumer('test', None, deployment, True, {},
exchange, ["monitor.info", "monitor.error"], exchange, ["monitor.info", "monitor.error"],
"stacktach") "stacktach_")
routing_key = 'monitor.info' routing_key = 'monitor.info'
message.delivery_info = {'routing_key': routing_key} message.delivery_info = {'routing_key': routing_key}
body_dict = {u'key': u'value'} body_dict = {u'key': u'value'}
@ -189,7 +189,7 @@ class ConsumerTestCase(unittest.TestCase):
consumer = worker.Consumer(config['name'], conn, deployment, consumer = worker.Consumer(config['name'], conn, deployment,
config['durable_queue'], {}, exchange, config['durable_queue'], {}, exchange,
["monitor.info", "monitor.error"], ["monitor.info", "monitor.error"],
"stacktach") "stacktach_")
consumer.run() consumer.run()
worker.continue_running().AndReturn(False) worker.continue_running().AndReturn(False)
self.mox.ReplayAll() self.mox.ReplayAll()
@ -206,7 +206,7 @@ class ConsumerTestCase(unittest.TestCase):
'rabbit_password': 'rabbit', 'rabbit_password': 'rabbit',
'rabbit_virtual_host': '/', 'rabbit_virtual_host': '/',
'queue_arguments': {'x-ha-policy': 'all'}, 'queue_arguments': {'x-ha-policy': 'all'},
'queue_name': "test_name", 'queue_name_prefix': "test_name_",
"services": ["nova"], "services": ["nova"],
"topics": {"nova": ["monitor.info", "monitor.error"]} "topics": {"nova": ["monitor.info", "monitor.error"]}
} }
@ -233,7 +233,7 @@ class ConsumerTestCase(unittest.TestCase):
config['durable_queue'], config['durable_queue'],
config['queue_arguments'], exchange, config['queue_arguments'], exchange,
["monitor.info", "monitor.error"], ["monitor.info", "monitor.error"],
"test_name") "test_name_")
consumer.run() consumer.run()
worker.continue_running().AndReturn(False) worker.continue_running().AndReturn(False)
self.mox.ReplayAll() self.mox.ReplayAll()

View File

@ -44,7 +44,7 @@ LOG = stacklog.get_logger()
class Consumer(kombu.mixins.ConsumerMixin): class Consumer(kombu.mixins.ConsumerMixin):
def __init__(self, name, connection, deployment, durable, queue_arguments, def __init__(self, name, connection, deployment, durable, queue_arguments,
exchange, topics, queue_name): exchange, topics, queue_name_prefix):
self.connection = connection self.connection = connection
self.deployment = deployment self.deployment = deployment
self.durable = durable self.durable = durable
@ -56,7 +56,7 @@ class Consumer(kombu.mixins.ConsumerMixin):
self.total_processed = 0 self.total_processed = 0
self.topics = topics self.topics = topics
self.exchange = exchange self.exchange = exchange
self.queue_name = queue_name self.queue_name_prefix = queue_name_prefix
def _create_exchange(self, name, type, exclusive=False, auto_delete=False): def _create_exchange(self, name, type, exclusive=False, auto_delete=False):
return kombu.entity.Exchange(name, type=type, exclusive=exclusive, return kombu.entity.Exchange(name, type=type, exclusive=exclusive,
@ -73,7 +73,8 @@ class Consumer(kombu.mixins.ConsumerMixin):
def get_consumers(self, Consumer, channel): def get_consumers(self, Consumer, channel):
exchange = self._create_exchange(self.exchange, "topic") exchange = self._create_exchange(self.exchange, "topic")
queues = [self._create_queue(self.queue_name, exchange, topic) queue_name = "%s%s" % (self.queue_name_prefix, self.exchange)
queues = [self._create_queue(queue_name, exchange, topic)
for topic in self.topics] for topic in self.topics]
return [Consumer(queues=queues, callbacks=[self.on_nova])] return [Consumer(queues=queues, callbacks=[self.on_nova])]
@ -153,7 +154,7 @@ def run(deployment_config, exchange):
queue_arguments = deployment_config.get('queue_arguments', {}) queue_arguments = deployment_config.get('queue_arguments', {})
exit_on_exception = deployment_config.get('exit_on_exception', False) exit_on_exception = deployment_config.get('exit_on_exception', False)
topics = deployment_config.get('topics', {}) topics = deployment_config.get('topics', {})
queue_name = deployment_config.get('queue_name', 'stacktach') queue_name_prefix = deployment_config.get('queue_name_prefix', 'stacktach_')
deployment, new = db.get_or_create_deployment(name) deployment, new = db.get_or_create_deployment(name)
@ -177,7 +178,7 @@ def run(deployment_config, exchange):
consumer = Consumer(name, conn, deployment, durable, consumer = Consumer(name, conn, deployment, durable,
queue_arguments, exchange, queue_arguments, exchange,
topics[exchange], topics[exchange],
queue_name) queue_name_prefix)
consumer.run() consumer.run()
except Exception as e: except Exception as e:
LOG.error("!!!!Exception!!!!") LOG.error("!!!!Exception!!!!")