From a709d957fdb085ed6f1e356e6114d5f7ee4e0e15 Mon Sep 17 00:00:00 2001 From: Andrew Melton Date: Tue, 30 Jul 2013 11:51:55 -0400 Subject: [PATCH] Adding queue_name to worker config. The idea behind this is to allow consumption of notifications from a service that can only emit notifications on a single routing_key. --- etc/sample_stacktach_worker_config.json | 2 ++ tests/unit/test_worker.py | 26 ++++++++++++++++--------- worker/worker.py | 9 ++++++--- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/etc/sample_stacktach_worker_config.json b/etc/sample_stacktach_worker_config.json index f15892e..e0238f3 100644 --- a/etc/sample_stacktach_worker_config.json +++ b/etc/sample_stacktach_worker_config.json @@ -8,6 +8,7 @@ "rabbit_password": "rabbit", "rabbit_virtual_host": "/", "exit_on_exception": true, + "queue_name": "stacktach", "topics": { "nova": ["monitor.info", "monitor.error"], "glance": ["monitor_glance.info", "monitor_glance.error"] @@ -22,6 +23,7 @@ "rabbit_password": "rabbit", "rabbit_virtual_host": "/", "exit_on_exception": false, + "queue_name": "stacktach", "topics": { "nova": ["monitor.info", "monitor.error"], "glance": ["monitor_glance.info", "monitor_glance.error"] diff --git a/tests/unit/test_worker.py b/tests/unit/test_worker.py index a7db64b..b6590ef 100644 --- a/tests/unit/test_worker.py +++ b/tests/unit/test_worker.py @@ -49,14 +49,15 @@ class ConsumerTestCase(unittest.TestCase): self.mox.StubOutWithMock(worker.Consumer, '_create_exchange') self.mox.StubOutWithMock(worker.Consumer, '_create_queue') consumer = worker.Consumer('test', None, None, True, {}, "nova", - ["monitor.info", "monitor.error"]) + ["monitor.info", "monitor.error"], + "stacktach") exchange = self.mox.CreateMockAnything() consumer._create_exchange('nova', 'topic').AndReturn(exchange) info_queue = self.mox.CreateMockAnything() error_queue = self.mox.CreateMockAnything() - consumer._create_queue('monitor.info', exchange, 'monitor.info')\ + consumer._create_queue('stacktach', exchange, 'monitor.info')\ .AndReturn(info_queue) - consumer._create_queue('monitor.error', exchange, 'monitor.error')\ + consumer._create_queue('stacktach', exchange, 'monitor.error')\ .AndReturn(error_queue) self.mox.ReplayAll() consumers = consumer.get_consumers(Consumer, None) @@ -72,7 +73,8 @@ class ConsumerTestCase(unittest.TestCase): def test_create_exchange(self): args = {'key': 'value'} consumer = worker.Consumer('test', None, None, True, args, 'nova', - ["monitor.info", "monitor.error"]) + ["monitor.info", "monitor.error"], + "stacktach") self.mox.StubOutClassWithMocks(kombu.entity, 'Exchange') exchange = kombu.entity.Exchange('nova', type='topic', exclusive=False, @@ -89,7 +91,8 @@ class ConsumerTestCase(unittest.TestCase): exclusive=False, routing_key='routing.key', queue_arguments={}) consumer = worker.Consumer('test', None, None, True, {}, 'nova', - ["monitor.info", "monitor.error"]) + ["monitor.info", "monitor.error"], + "stacktach") self.mox.ReplayAll() actual_queue = consumer._create_queue('name', exchange, 'routing.key', exclusive=False, @@ -106,7 +109,8 @@ class ConsumerTestCase(unittest.TestCase): exclusive=False, routing_key='routing.key', queue_arguments=queue_args) consumer = worker.Consumer('test', None, None, True, queue_args, - 'nova', ["monitor.info", "monitor.error"]) + 'nova', ["monitor.info", "monitor.error"], + "stacktach") self.mox.ReplayAll() actual_queue = consumer._create_queue('name', exchange, 'routing.key', exclusive=False, @@ -122,7 +126,8 @@ class ConsumerTestCase(unittest.TestCase): exchange = 'nova' consumer = worker.Consumer('test', None, deployment, True, {}, - exchange, ["monitor.info", "monitor.error"]) + exchange, ["monitor.info", "monitor.error"], + "stacktach") routing_key = 'monitor.info' message.delivery_info = {'routing_key': routing_key} body_dict = {u'key': u'value'} @@ -183,7 +188,8 @@ class ConsumerTestCase(unittest.TestCase): exchange = 'nova' consumer = worker.Consumer(config['name'], conn, deployment, config['durable_queue'], {}, exchange, - ["monitor.info", "monitor.error"]) + ["monitor.info", "monitor.error"], + "stacktach") consumer.run() worker.continue_running().AndReturn(False) self.mox.ReplayAll() @@ -200,6 +206,7 @@ class ConsumerTestCase(unittest.TestCase): 'rabbit_password': 'rabbit', 'rabbit_virtual_host': '/', 'queue_arguments': {'x-ha-policy': 'all'}, + 'queue_name': "test_name", "services": ["nova"], "topics": {"nova": ["monitor.info", "monitor.error"]} } @@ -225,7 +232,8 @@ class ConsumerTestCase(unittest.TestCase): consumer = worker.Consumer(config['name'], conn, deployment, config['durable_queue'], config['queue_arguments'], exchange, - ["monitor.info", "monitor.error"]) + ["monitor.info", "monitor.error"], + "test_name") consumer.run() worker.continue_running().AndReturn(False) self.mox.ReplayAll() diff --git a/worker/worker.py b/worker/worker.py index 615b693..9f25f96 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -44,7 +44,7 @@ LOG = stacklog.get_logger() class Consumer(kombu.mixins.ConsumerMixin): def __init__(self, name, connection, deployment, durable, queue_arguments, - exchange, topics): + exchange, topics, queue_name): self.connection = connection self.deployment = deployment self.durable = durable @@ -56,6 +56,7 @@ class Consumer(kombu.mixins.ConsumerMixin): self.total_processed = 0 self.topics = topics self.exchange = exchange + self.queue_name = queue_name def _create_exchange(self, name, type, exclusive=False, auto_delete=False): return kombu.entity.Exchange(name, type=type, exclusive=exclusive, @@ -72,7 +73,7 @@ class Consumer(kombu.mixins.ConsumerMixin): def get_consumers(self, Consumer, channel): exchange = self._create_exchange(self.exchange, "topic") - queues = [self._create_queue(topic, exchange, topic) + queues = [self._create_queue(self.queue_name, exchange, topic) for topic in self.topics] return [Consumer(queues=queues, callbacks=[self.on_nova])] @@ -152,6 +153,7 @@ def run(deployment_config, exchange): queue_arguments = deployment_config.get('queue_arguments', {}) exit_on_exception = deployment_config.get('exit_on_exception', False) topics = deployment_config.get('topics', {}) + queue_name = deployment_config.get('queue_name', 'stacktach') deployment, new = db.get_or_create_deployment(name) @@ -174,7 +176,8 @@ def run(deployment_config, exchange): try: consumer = Consumer(name, conn, deployment, durable, queue_arguments, exchange, - topics[exchange]) + topics[exchange], + queue_name) consumer.run() except Exception as e: LOG.error("!!!!Exception!!!!")