Merge "Cancel consumer if queue down"
This commit is contained in:
commit
8d78ab2339
@ -173,6 +173,11 @@ rabbit_opts = [
|
||||
'for direct send. The direct send is used as reply, '
|
||||
'so the MessageUndeliverable exception is raised '
|
||||
'in case the client queue does not exist.'),
|
||||
cfg.BoolOpt('enable_cancel_on_failover',
|
||||
default=False,
|
||||
help="Enable x-cancel-on-ha-failover flag so that "
|
||||
"rabbitmq server will cancel and notify consumers"
|
||||
"when queue is down")
|
||||
]
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -234,7 +239,8 @@ class Consumer(object):
|
||||
|
||||
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
|
||||
exchange_auto_delete, queue_auto_delete, callback,
|
||||
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0):
|
||||
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
|
||||
enable_cancel_on_failover=False):
|
||||
"""Init the Consumer class with the exchange_name, routing_key,
|
||||
type, durable auto_delete
|
||||
"""
|
||||
@ -256,10 +262,16 @@ class Consumer(object):
|
||||
type=type,
|
||||
durable=self.durable,
|
||||
auto_delete=self.exchange_auto_delete)
|
||||
self.enable_cancel_on_failover = enable_cancel_on_failover
|
||||
|
||||
def declare(self, conn):
|
||||
"""Re-declare the queue after a rabbit (re)connect."""
|
||||
|
||||
consumer_arguments = None
|
||||
if self.enable_cancel_on_failover:
|
||||
consumer_arguments = {
|
||||
"x-cancel-on-ha-failover": True}
|
||||
|
||||
self.queue = kombu.entity.Queue(
|
||||
name=self.queue_name,
|
||||
channel=conn.channel,
|
||||
@ -267,7 +279,9 @@ class Consumer(object):
|
||||
durable=self.durable,
|
||||
auto_delete=self.queue_auto_delete,
|
||||
routing_key=self.routing_key,
|
||||
queue_arguments=self.queue_arguments)
|
||||
queue_arguments=self.queue_arguments,
|
||||
consumer_arguments=consumer_arguments
|
||||
)
|
||||
|
||||
try:
|
||||
LOG.debug('[%s] Queue.declare: %s',
|
||||
@ -468,6 +482,7 @@ class Connection(object):
|
||||
self.kombu_failover_strategy = driver_conf.kombu_failover_strategy
|
||||
self.kombu_compression = driver_conf.kombu_compression
|
||||
self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread
|
||||
self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover
|
||||
|
||||
if self.heartbeat_in_pthread:
|
||||
# NOTE(hberaud): Experimental: threading module is in use to run
|
||||
@ -1116,31 +1131,35 @@ class Connection(object):
|
||||
responses for call/multicall
|
||||
"""
|
||||
|
||||
consumer = Consumer(exchange_name='', # using default exchange
|
||||
queue_name=topic,
|
||||
routing_key='',
|
||||
type='direct',
|
||||
durable=False,
|
||||
exchange_auto_delete=False,
|
||||
queue_auto_delete=False,
|
||||
callback=callback,
|
||||
rabbit_ha_queues=self.rabbit_ha_queues,
|
||||
rabbit_queue_ttl=self.rabbit_transient_queues_ttl)
|
||||
consumer = Consumer(
|
||||
exchange_name='', # using default exchange
|
||||
queue_name=topic,
|
||||
routing_key='',
|
||||
type='direct',
|
||||
durable=False,
|
||||
exchange_auto_delete=False,
|
||||
queue_auto_delete=False,
|
||||
callback=callback,
|
||||
rabbit_ha_queues=self.rabbit_ha_queues,
|
||||
rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
|
||||
enable_cancel_on_failover=self.enable_cancel_on_failover)
|
||||
|
||||
self.declare_consumer(consumer)
|
||||
|
||||
def declare_topic_consumer(self, exchange_name, topic, callback=None,
|
||||
queue_name=None):
|
||||
"""Create a 'topic' consumer."""
|
||||
consumer = Consumer(exchange_name=exchange_name,
|
||||
queue_name=queue_name or topic,
|
||||
routing_key=topic,
|
||||
type='topic',
|
||||
durable=self.amqp_durable_queues,
|
||||
exchange_auto_delete=self.amqp_auto_delete,
|
||||
queue_auto_delete=self.amqp_auto_delete,
|
||||
callback=callback,
|
||||
rabbit_ha_queues=self.rabbit_ha_queues)
|
||||
consumer = Consumer(
|
||||
exchange_name=exchange_name,
|
||||
queue_name=queue_name or topic,
|
||||
routing_key=topic,
|
||||
type='topic',
|
||||
durable=self.amqp_durable_queues,
|
||||
exchange_auto_delete=self.amqp_auto_delete,
|
||||
queue_auto_delete=self.amqp_auto_delete,
|
||||
callback=callback,
|
||||
rabbit_ha_queues=self.rabbit_ha_queues,
|
||||
enable_cancel_on_failover=self.enable_cancel_on_failover)
|
||||
|
||||
self.declare_consumer(consumer)
|
||||
|
||||
@ -1151,16 +1170,18 @@ class Connection(object):
|
||||
exchange_name = '%s_fanout' % topic
|
||||
queue_name = '%s_fanout_%s' % (topic, unique)
|
||||
|
||||
consumer = Consumer(exchange_name=exchange_name,
|
||||
queue_name=queue_name,
|
||||
routing_key=topic,
|
||||
type='fanout',
|
||||
durable=False,
|
||||
exchange_auto_delete=True,
|
||||
queue_auto_delete=False,
|
||||
callback=callback,
|
||||
rabbit_ha_queues=self.rabbit_ha_queues,
|
||||
rabbit_queue_ttl=self.rabbit_transient_queues_ttl)
|
||||
consumer = Consumer(
|
||||
exchange_name=exchange_name,
|
||||
queue_name=queue_name,
|
||||
routing_key=topic,
|
||||
type='fanout',
|
||||
durable=False,
|
||||
exchange_auto_delete=True,
|
||||
queue_auto_delete=False,
|
||||
callback=callback,
|
||||
rabbit_ha_queues=self.rabbit_ha_queues,
|
||||
rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
|
||||
enable_cancel_on_failover=self.enable_cancel_on_failover)
|
||||
|
||||
self.declare_consumer(consumer)
|
||||
|
||||
|
@ -39,6 +39,12 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase):
|
||||
]
|
||||
|
||||
def test_failover_scenario(self):
|
||||
self._test_failover_scenario()
|
||||
|
||||
def test_failover_scenario_enable_cancel_on_failover(self):
|
||||
self._test_failover_scenario(enable_cancel_on_failover=True)
|
||||
|
||||
def _test_failover_scenario(self, enable_cancel_on_failover=False):
|
||||
# NOTE(sileht): run this test only if functional suite run of a driver
|
||||
# that use rabbitmq as backend
|
||||
self.driver = os.environ.get('TRANSPORT_DRIVER')
|
||||
@ -53,6 +59,7 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase):
|
||||
kombu_reconnect_delay=0,
|
||||
rabbit_retry_interval=0,
|
||||
rabbit_retry_backoff=0,
|
||||
enable_cancel_on_failover=enable_cancel_on_failover,
|
||||
group='oslo_messaging_rabbit')
|
||||
|
||||
self.pifpaf = self.useFixture(rabbitmq.RabbitMQDriver(cluster=True,
|
||||
|
@ -0,0 +1,6 @@
|
||||
---
|
||||
fixes:
|
||||
- |
|
||||
Add a new option `enable_cancel_on_failover` for rabbitmq driver
|
||||
which when enabled, will cancel consumers when queue appears
|
||||
to be down.
|
Loading…
x
Reference in New Issue
Block a user