diff --git a/tests/unit/test_worker.py b/tests/unit/test_worker.py index b6590ef..52da770 100644 --- a/tests/unit/test_worker.py +++ b/tests/unit/test_worker.py @@ -50,14 +50,14 @@ class ConsumerTestCase(unittest.TestCase): self.mox.StubOutWithMock(worker.Consumer, '_create_queue') consumer = worker.Consumer('test', None, None, True, {}, "nova", ["monitor.info", "monitor.error"], - "stacktach") + "stacktach_") exchange = self.mox.CreateMockAnything() consumer._create_exchange('nova', 'topic').AndReturn(exchange) info_queue = self.mox.CreateMockAnything() error_queue = self.mox.CreateMockAnything() - consumer._create_queue('stacktach', exchange, 'monitor.info')\ + consumer._create_queue('stacktach_nova', exchange, 'monitor.info')\ .AndReturn(info_queue) - consumer._create_queue('stacktach', exchange, 'monitor.error')\ + consumer._create_queue('stacktach_nova', exchange, 'monitor.error')\ .AndReturn(error_queue) self.mox.ReplayAll() consumers = consumer.get_consumers(Consumer, None) @@ -74,7 +74,7 @@ class ConsumerTestCase(unittest.TestCase): args = {'key': 'value'} consumer = worker.Consumer('test', None, None, True, args, 'nova', ["monitor.info", "monitor.error"], - "stacktach") + "stacktach_") self.mox.StubOutClassWithMocks(kombu.entity, 'Exchange') exchange = kombu.entity.Exchange('nova', type='topic', exclusive=False, @@ -92,7 +92,7 @@ class ConsumerTestCase(unittest.TestCase): queue_arguments={}) consumer = worker.Consumer('test', None, None, True, {}, 'nova', ["monitor.info", "monitor.error"], - "stacktach") + "stacktach_") self.mox.ReplayAll() actual_queue = consumer._create_queue('name', exchange, 'routing.key', exclusive=False, @@ -110,7 +110,7 @@ class ConsumerTestCase(unittest.TestCase): queue_arguments=queue_args) consumer = worker.Consumer('test', None, None, True, queue_args, 'nova', ["monitor.info", "monitor.error"], - "stacktach") + "stacktach_") self.mox.ReplayAll() actual_queue = consumer._create_queue('name', exchange, 'routing.key', exclusive=False, @@ -127,7 +127,7 @@ class ConsumerTestCase(unittest.TestCase): exchange = 'nova' consumer = worker.Consumer('test', None, deployment, True, {}, exchange, ["monitor.info", "monitor.error"], - "stacktach") + "stacktach_") routing_key = 'monitor.info' message.delivery_info = {'routing_key': routing_key} body_dict = {u'key': u'value'} @@ -189,7 +189,7 @@ class ConsumerTestCase(unittest.TestCase): consumer = worker.Consumer(config['name'], conn, deployment, config['durable_queue'], {}, exchange, ["monitor.info", "monitor.error"], - "stacktach") + "stacktach_") consumer.run() worker.continue_running().AndReturn(False) self.mox.ReplayAll() @@ -206,7 +206,7 @@ class ConsumerTestCase(unittest.TestCase): 'rabbit_password': 'rabbit', 'rabbit_virtual_host': '/', 'queue_arguments': {'x-ha-policy': 'all'}, - 'queue_name': "test_name", + 'queue_name_prefix': "test_name_", "services": ["nova"], "topics": {"nova": ["monitor.info", "monitor.error"]} } @@ -233,7 +233,7 @@ class ConsumerTestCase(unittest.TestCase): config['durable_queue'], config['queue_arguments'], exchange, ["monitor.info", "monitor.error"], - "test_name") + "test_name_") consumer.run() worker.continue_running().AndReturn(False) self.mox.ReplayAll() diff --git a/worker/worker.py b/worker/worker.py index 9f25f96..75a93b7 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -44,7 +44,7 @@ LOG = stacklog.get_logger() class Consumer(kombu.mixins.ConsumerMixin): def __init__(self, name, connection, deployment, durable, queue_arguments, - exchange, topics, queue_name): + exchange, topics, queue_name_prefix): self.connection = connection self.deployment = deployment self.durable = durable @@ -56,7 +56,7 @@ class Consumer(kombu.mixins.ConsumerMixin): self.total_processed = 0 self.topics = topics self.exchange = exchange - self.queue_name = queue_name + self.queue_name_prefix = queue_name_prefix def _create_exchange(self, name, type, exclusive=False, auto_delete=False): return kombu.entity.Exchange(name, type=type, exclusive=exclusive, @@ -73,7 +73,8 @@ class Consumer(kombu.mixins.ConsumerMixin): def get_consumers(self, Consumer, channel): exchange = self._create_exchange(self.exchange, "topic") - queues = [self._create_queue(self.queue_name, exchange, topic) + queue_name = "%s%s" % (self.queue_name_prefix, self.exchange) + queues = [self._create_queue(queue_name, exchange, topic) for topic in self.topics] return [Consumer(queues=queues, callbacks=[self.on_nova])] @@ -153,7 +154,7 @@ def run(deployment_config, exchange): queue_arguments = deployment_config.get('queue_arguments', {}) exit_on_exception = deployment_config.get('exit_on_exception', False) topics = deployment_config.get('topics', {}) - queue_name = deployment_config.get('queue_name', 'stacktach') + queue_name_prefix = deployment_config.get('queue_name_prefix', 'stacktach_') deployment, new = db.get_or_create_deployment(name) @@ -177,7 +178,7 @@ def run(deployment_config, exchange): consumer = Consumer(name, conn, deployment, durable, queue_arguments, exchange, topics[exchange], - queue_name) + queue_name_prefix) consumer.run() except Exception as e: LOG.error("!!!!Exception!!!!")