rabbit: Remove unused stuffs from publisher

The publisher code is over engineered, it allows to override
everything, but this is never used.

None of the child Class have the same signature, sometimes
the constructor use the parameter name as the parent class but for
a different purpose, that make the code hard to read.

It's was never clear which options is passed to the queue and the
exchange at this end to kombu.

This changes removes all of that stuffs, and only use the kombu
terminology for publisher parameters.

Change-Id: I3cebf3ed1647a3121dcf33e2160cf315486f5204
This commit is contained in:
Mehdi Abaakouk 2015-04-30 16:13:14 +02:00
parent 87137e7af0
commit cca84f66d4

View File

@ -365,111 +365,108 @@ class FanoutConsumer(ConsumerBase):
class Publisher(object):
"""Base Publisher class."""
"""Publisher that silently creates exchange but no queues."""
def __init__(self, channel, exchange_name, routing_key, **kwargs):
passive = False
def __init__(self, conf, exchange_name, routing_key, type, durable,
auto_delete):
"""Init the Publisher class with the exchange_name, routing_key,
and other options
type, durable auto_delete
"""
self.queue_arguments = _get_queue_arguments(conf)
self.exchange_name = exchange_name
self.routing_key = routing_key
self.kwargs = kwargs
self.reconnect(channel)
def reconnect(self, channel):
"""Re-establish the Producer after a rabbit reconnection."""
self.auto_delete = auto_delete
self.durable = durable
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
**self.kwargs)
self.producer = kombu.messaging.Producer(exchange=self.exchange,
channel=channel,
routing_key=self.routing_key)
type=type,
exclusive=False,
durable=durable,
auto_delete=auto_delete,
passive=self.passive)
def send(self, msg, timeout=None):
"""Send a message."""
def send(self, conn, msg, timeout=None):
"""Send a message on an channel."""
producer = kombu.messaging.Producer(exchange=self.exchange,
channel=conn.channel,
routing_key=self.routing_key)
headers = {}
if timeout:
#
# AMQP TTL is in milliseconds when set in the header.
#
self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
else:
self.producer.publish(msg)
# AMQP TTL is in milliseconds when set in the property.
# Details: http://www.rabbitmq.com/ttl.html#per-message-ttl
# NOTE(sileht): this amqp header doesn't exists ... LP#1444854
headers['ttl'] = timeout * 1000
producer.publish(msg, headers=headers)
class DirectPublisher(Publisher):
"""Publisher class for 'direct'."""
def __init__(self, conf, channel, topic, **kwargs):
"""Init a 'direct' publisher.
class DeclareQueuePublisher(Publisher):
"""Publisher that declares a default queue
Kombu options may be passed as keyword args to override defaults
"""
When the exchange is missing instead of silently creating an exchange
not binded to a queue, this publisher creates a default queue
named with the routing_key.
options = {'durable': False,
'auto_delete': True,
'exclusive': False,
'passive': True}
options.update(kwargs)
super(DirectPublisher, self).__init__(channel, topic, topic,
type='direct', **options)
This is mainly used to not miss notifications in case of nobody consumes
them yet. If the future consumer binds the default queue it can retrieve
missing messages.
"""
# FIXME(sileht): The side effect of this is that we declare again and
# again the same queue, and generate a lot of useless rabbit traffic.
# https://bugs.launchpad.net/oslo.messaging/+bug/1437902
class TopicPublisher(Publisher):
"""Publisher class for 'topic'."""
def __init__(self, conf, channel, exchange_name, topic, **kwargs):
"""Init a 'topic' publisher.
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': conf.amqp_durable_queues,
'auto_delete': conf.amqp_auto_delete,
'exclusive': False}
options.update(kwargs)
super(TopicPublisher, self).__init__(channel,
exchange_name,
topic,
type='topic',
**options)
class FanoutPublisher(Publisher):
"""Publisher class for 'fanout'."""
def __init__(self, conf, channel, topic, **kwargs):
"""Init a 'fanout' publisher.
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': False,
'auto_delete': True,
'exclusive': False}
options.update(kwargs)
super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
None, type='fanout', **options)
class NotifyPublisher(TopicPublisher):
"""Publisher class for 'notify'."""
def __init__(self, conf, channel, exchange_name, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
self.auto_delete = kwargs.pop('auto_delete', conf.amqp_auto_delete)
self.queue_arguments = _get_queue_arguments(conf)
super(NotifyPublisher, self).__init__(conf, channel, exchange_name,
topic, **kwargs)
def reconnect(self, channel):
super(NotifyPublisher, self).reconnect(channel)
# NOTE(jerdfelt): Normally the consumer would create the queue, but
# we do this to ensure that messages don't get dropped if the
# consumer is started after we do
queue = kombu.entity.Queue(channel=channel,
exchange=self.exchange,
durable=self.durable,
auto_delete=self.auto_delete,
name=self.routing_key,
routing_key=self.routing_key,
queue_arguments=self.queue_arguments)
def send(self, conn, msg, timeout=None):
queue = kombu.entity.Queue(
channel=conn.channel,
exchange=self.exchange,
durable=self.durable,
auto_delete=self.auto_delete,
name=self.routing_key,
routing_key=self.routing_key,
queue_arguments=self.queue_arguments)
queue.declare()
super(DeclareQueuePublisher, self).send(
conn, msg, timeout)
class RetryOnMissingExchangePublisher(Publisher):
"""Publisher that retry during 60 seconds if the exchange is missing."""
passive = True
def send(self, conn, msg, timeout=None):
# TODO(sileht):
# * use timeout parameter when available
# * use rpc_timeout if not instead of hardcoded 60
# * use @retrying
timer = rpc_common.DecayingTimer(duration=60)
timer.start()
while True:
try:
super(RetryOnMissingExchangePublisher, self).send(conn, msg,
timeout)
return
except conn.connection.channel_errors as exc:
# NOTE(noelbk/sileht):
# If rabbit dies, the consumer can be disconnected before the
# publisher sends, and if the consumer hasn't declared the
# queue, the publisher's will send a message to an exchange
# that's not bound to a queue, and the message wll be lost.
# So we set passive=True to the publisher exchange and catch
# the 404 kombu ChannelError and retry until the exchange
# appears
if exc.code == 404 and timer.check_return() > 0:
LOG.info(_LI("The exchange %(exchange)s to send to "
"%(routing_key)s doesn't exist yet, "
"retrying...") % {
'exchange': self.exchange,
'routing_key': self.routing_key})
time.sleep(1)
continue
raise
class DummyConnectionLock(object):
@ -1038,32 +1035,20 @@ class Connection(object):
recoverable_error_callback=_recoverable_error_callback,
error_callback=_error_callback)
@staticmethod
def _log_publisher_send_error(topic, exc):
log_info = {'topic': topic, 'err_str': exc}
LOG.error(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s"), log_info)
LOG.debug('Exception', exc_info=exc)
default_marker = object()
def publisher_send(self, cls, topic, msg, timeout=None, retry=None,
error_callback=default_marker, **kwargs):
def publisher_send(self, publisher, msg, timeout=None, retry=None):
"""Send to a publisher based on the publisher class."""
def _default_error_callback(exc):
self._log_publisher_send_error(topic, exc)
if error_callback is self.default_marker:
error_callback = _default_error_callback
def _error_callback(exc):
log_info = {'topic': publisher.exchange_name, 'err_str': exc}
LOG.error(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s"), log_info)
LOG.debug('Exception', exc_info=exc)
def _publish():
publisher = cls(self.driver_conf, self.channel, topic=topic,
**kwargs)
publisher.send(msg, timeout)
publisher.send(self, msg, timeout)
with self._connection_lock:
self.ensure(_publish, retry=retry, error_callback=error_callback)
self.ensure(_publish, retry=retry, error_callback=_error_callback)
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
@ -1088,49 +1073,48 @@ class Connection(object):
def direct_send(self, msg_id, msg):
"""Send a 'direct' message."""
timer = rpc_common.DecayingTimer(duration=60)
timer.start()
# NOTE(sileht): retry at least 60sec, after we have a good change
# that the caller is really dead too...
p = RetryOnMissingExchangePublisher(self.driver_conf,
exchange_name=msg_id,
routing_key=msg_id,
type='direct',
durable=False,
auto_delete=True)
while True:
try:
self.publisher_send(DirectPublisher, msg_id, msg,
error_callback=None)
return
except self.connection.channel_errors as exc:
# NOTE(noelbk/sileht):
# If rabbit dies, the consumer can be disconnected before the
# publisher sends, and if the consumer hasn't declared the
# queue, the publisher's will send a message to an exchange
# that's not bound to a queue, and the message wll be lost.
# So we set passive=True to the publisher exchange and catch
# the 404 kombu ChannelError and retry until the exchange
# appears
if exc.code == 404 and timer.check_return() > 0:
LOG.info(_LI("The exchange to reply to %s doesn't "
"exist yet, retrying...") % msg_id)
time.sleep(1)
continue
self._log_publisher_send_error(msg_id, exc)
raise
except Exception as exc:
self._log_publisher_send_error(msg_id, exc)
raise
self.publisher_send(p, msg)
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
"""Send a 'topic' message."""
self.publisher_send(TopicPublisher, topic, msg, timeout,
exchange_name=exchange_name, retry=retry)
p = Publisher(self.driver_conf,
exchange_name=exchange_name,
routing_key=topic,
type='topic',
durable=self.driver_conf.amqp_durable_queues,
auto_delete=self.driver_conf.amqp_auto_delete)
self.publisher_send(p, msg, timeout, retry=retry)
def fanout_send(self, topic, msg, retry=None):
"""Send a 'fanout' message."""
self.publisher_send(FanoutPublisher, topic, msg, retry=retry)
p = Publisher(self.driver_conf,
exchange_name='%s_fanout' % topic,
routing_key=None,
type='fanout',
durable=False,
auto_delete=True)
self.publisher_send(p, msg, retry=retry)
def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
"""Send a notify message on a topic."""
self.publisher_send(NotifyPublisher, topic, msg, timeout=None,
exchange_name=exchange_name, retry=retry, **kwargs)
p = DeclareQueuePublisher(
self.driver_conf,
exchange_name=exchange_name,
routing_key=topic,
type='topic',
durable=self.driver_conf.amqp_durable_queues,
auto_delete=self.driver_conf.amqp_auto_delete)
self.publisher_send(p, msg, timeout=None, retry=retry)
def stop_consuming(self):
self._consume_loop_stopped = True