Force creating non durable control exchange when a precondition failed
Precondition failed exception related to durable exchange config may be triggered when a control exchange is shared between services and when services try to create it with configs that differ from each others. RabbitMQ will reject the services that try to create it with a configuration that differ from the one used first. This kind of exception is not managed for now and services can fails without handling this kind of issue. These changes catch this kind exception to analyze if they related to durable config. In this case we try to re-declare the failing exchange/queue as non durable. This problem can be easily reproduced by running a local RabbitMQ server. By setting the config below (sample.conf): ``` [DEFAULT] transport_url = rabbit://localhost/ [OSLO_MESSAGING_RABBIT] amqp_durable_queues = true ``` And by running our simulator twice: ``` $ tox -e venv -- python tools/simulator.py -d rpc-server -w 40 $ tox -e venv -- python tools/simulator.py --config-file ./sample.conf -d rpc-server -w 40 ``` The first one will create a default non durable control exchange. The second one will create the same default control exchange but as durable. Closes-Bug: #1953351 Change-Id: I27625b468c428cde6609730c8ab429c2c112d010
This commit is contained in:
parent
feb72de7b8
commit
1fd461647f
@ -254,11 +254,44 @@ class Consumer(object):
|
||||
self._declared_on = None
|
||||
self.exchange = kombu.entity.Exchange(
|
||||
name=exchange_name,
|
||||
type=type,
|
||||
type=self.type,
|
||||
durable=self.durable,
|
||||
auto_delete=self.exchange_auto_delete)
|
||||
self.enable_cancel_on_failover = enable_cancel_on_failover
|
||||
|
||||
def _declare_fallback(self, err, conn, consumer_arguments):
|
||||
"""Fallback by declaring a non durable queue.
|
||||
|
||||
When a control exchange is shared between services it is possible
|
||||
that some service created first a non durable control exchange and
|
||||
then after that an other service can try to create the same control
|
||||
exchange but as a durable control exchange. In this case RabbitMQ
|
||||
will raise an exception (PreconditionFailed), and then it will stop
|
||||
our execution and our service will fail entirly. In this case we want
|
||||
to fallback by creating a non durable queue to match the default
|
||||
config.
|
||||
"""
|
||||
if "PRECONDITION_FAILED - inequivalent arg 'durable'" in str(err):
|
||||
LOG.info(
|
||||
"[%s] Retrying to declare the exchange (%s) as "
|
||||
"non durable", conn.connection_id, self.exchange_name)
|
||||
self.exchange = kombu.entity.Exchange(
|
||||
name=self.exchange_name,
|
||||
type=self.type,
|
||||
durable=False,
|
||||
auto_delete=self.queue_auto_delete)
|
||||
self.queue = kombu.entity.Queue(
|
||||
name=self.queue_name,
|
||||
channel=conn.channel,
|
||||
exchange=self.exchange,
|
||||
durable=False,
|
||||
auto_delete=self.queue_auto_delete,
|
||||
routing_key=self.routing_key,
|
||||
queue_arguments=self.queue_arguments,
|
||||
consumer_arguments=consumer_arguments
|
||||
)
|
||||
self.queue.declare()
|
||||
|
||||
def declare(self, conn):
|
||||
"""Re-declare the queue after a rabbit (re)connect."""
|
||||
|
||||
@ -281,7 +314,18 @@ class Consumer(object):
|
||||
try:
|
||||
LOG.debug('[%s] Queue.declare: %s',
|
||||
conn.connection_id, self.queue_name)
|
||||
try:
|
||||
self.queue.declare()
|
||||
except amqp_exec.PreconditionFailed as err:
|
||||
# NOTE(hberaud): This kind of exception may be triggered
|
||||
# when a control exchange is shared between services and
|
||||
# when services try to create it with configs that differ
|
||||
# from each others. RabbitMQ will reject the services
|
||||
# that try to create it with a configuration that differ
|
||||
# from the one used first.
|
||||
LOG.warning(err)
|
||||
self._declare_fallback(err, conn, consumer_arguments)
|
||||
|
||||
except conn.connection.channel_errors as exc:
|
||||
# NOTE(jrosenboom): This exception may be triggered by a race
|
||||
# condition. Simply retrying will solve the error most of the time
|
||||
@ -1219,6 +1263,18 @@ class Connection(object):
|
||||
"""Publish a message."""
|
||||
|
||||
if not (exchange.passive or exchange.name in self._declared_exchanges):
|
||||
try:
|
||||
exchange(self.channel).declare()
|
||||
except amqp_exec.PreconditionFailed as err:
|
||||
# NOTE(hberaud): This kind of exception may be triggered
|
||||
# when a control exchange is shared between services and
|
||||
# when services try to create it with configs that differ
|
||||
# from each others. RabbitMQ will reject the services
|
||||
# that try to create it with a configuration that differ
|
||||
# from the one used first.
|
||||
if "PRECONDITION_FAILED - inequivalent arg 'durable'" \
|
||||
in str(err):
|
||||
exchange.durable = False
|
||||
exchange(self.channel).declare()
|
||||
self._declared_exchanges.add(exchange.name)
|
||||
|
||||
|
@ -0,0 +1,5 @@
|
||||
---
|
||||
fixes:
|
||||
- |
|
||||
Force creating non durable control exchange when a precondition failed
|
||||
related to config that differ occuring.
|
Loading…
Reference in New Issue
Block a user