Merge pull request #13 from rackspace/refix_durable
ugh, messed up the merge
This commit is contained in:
commit
a56e6834db
@ -39,9 +39,10 @@ LOG.addHandler(handler)
|
|||||||
|
|
||||||
|
|
||||||
class NovaConsumer(kombu.mixins.ConsumerMixin):
|
class NovaConsumer(kombu.mixins.ConsumerMixin):
|
||||||
def __init__(self, name, connection, deployment):
|
def __init__(self, name, connection, deployment, durable):
|
||||||
self.connection = connection
|
self.connection = connection
|
||||||
self.deployment = deployment
|
self.deployment = deployment
|
||||||
|
self.durable = durable
|
||||||
self.name = name
|
self.name = name
|
||||||
self.last_time = None
|
self.last_time = None
|
||||||
self.pmi = None
|
self.pmi = None
|
||||||
@ -49,15 +50,15 @@ class NovaConsumer(kombu.mixins.ConsumerMixin):
|
|||||||
self.total_processed = 0
|
self.total_processed = 0
|
||||||
|
|
||||||
def get_consumers(self, Consumer, channel):
|
def get_consumers(self, Consumer, channel):
|
||||||
durable = self.deployment_config.get('durable_queue', True)
|
|
||||||
nova_exchange = kombu.entity.Exchange("nova", type="topic",
|
nova_exchange = kombu.entity.Exchange("nova", type="topic",
|
||||||
exclusive=False, durable=durable, auto_delete=False)
|
exclusive=False, durable=self.durable,
|
||||||
|
auto_delete=False)
|
||||||
|
|
||||||
nova_queues = [
|
nova_queues = [
|
||||||
kombu.Queue("monitor.info", nova_exchange, durable=durable,
|
kombu.Queue("monitor.info", nova_exchange, durable=self.durable,
|
||||||
auto_delete=False,
|
auto_delete=False,
|
||||||
exclusive=False, routing_key='monitor.info'),
|
exclusive=False, routing_key='monitor.info'),
|
||||||
kombu.Queue("monitor.error", nova_exchange, durable=durable,
|
kombu.Queue("monitor.error", nova_exchange, durable=self.durable,
|
||||||
auto_delete=False,
|
auto_delete=False,
|
||||||
exclusive=False, routing_key='monitor.error'),
|
exclusive=False, routing_key='monitor.error'),
|
||||||
]
|
]
|
||||||
@ -122,6 +123,7 @@ def run(deployment_config):
|
|||||||
user_id = deployment_config.get('rabbit_userid', 'rabbit')
|
user_id = deployment_config.get('rabbit_userid', 'rabbit')
|
||||||
password = deployment_config.get('rabbit_password', 'rabbit')
|
password = deployment_config.get('rabbit_password', 'rabbit')
|
||||||
virtual_host = deployment_config.get('rabbit_virtual_host', '/')
|
virtual_host = deployment_config.get('rabbit_virtual_host', '/')
|
||||||
|
durable = deployment_config.get('durable_queue', True)
|
||||||
|
|
||||||
deployment, new = models.get_or_create_deployment(name)
|
deployment, new = models.get_or_create_deployment(name)
|
||||||
|
|
||||||
@ -139,7 +141,7 @@ def run(deployment_config):
|
|||||||
LOG.debug("Processing on '%s'" % name)
|
LOG.debug("Processing on '%s'" % name)
|
||||||
with kombu.connection.BrokerConnection(**params) as conn:
|
with kombu.connection.BrokerConnection(**params) as conn:
|
||||||
try:
|
try:
|
||||||
consumer = NovaConsumer(name, conn, deployment)
|
consumer = NovaConsumer(name, conn, deployment, durable)
|
||||||
consumer.run()
|
consumer.run()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.exception("name=%s, exception=%s. Reconnecting in 5s" %
|
LOG.exception("name=%s, exception=%s. Reconnecting in 5s" %
|
||||||
|
Loading…
Reference in New Issue
Block a user