From 968d3e6741ac95a0988ee5cbd2fee2ad53697d58 Mon Sep 17 00:00:00 2001 From: Dmitriy Ukhlov Date: Mon, 2 Nov 2015 16:05:59 +0200 Subject: [PATCH] Fixes and improvements after testing on RabbitMQ cluster: 1) adds tcp_user_timeout parameter - timeout for unacknowledged TCP packets 2) adds host_connection_reconnect_delay parameter - delay before reconnecting to a host if an error occurs during connection. It allows other hosts to be used while some host is disconnected 3) adds rpc_listener_ack and rpc_listener_prefetch_count properties - enable consumer acknowledgements and set maximum number of unacknowledged messages 4) fixes time units (in oslo.messaging it is seconds but in RabbitMQ - milliseconds) Change-Id: Ifd549a1eebeef27a3d36ceb6d3e8b1c76ea00b65 --- oslo_messaging/_drivers/impl_pika.py | 344 +++++++++++++++++++-------- 1 file changed, 250 insertions(+), 94 deletions(-) diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py index 8dfe3bda8..fc6de2c82 100644 --- a/oslo_messaging/_drivers/impl_pika.py +++ b/oslo_messaging/_drivers/impl_pika.py @@ -24,6 +24,7 @@ import pika_pool import retrying import six +import socket import sys import threading import time @@ -46,30 +47,36 @@ pika_opts = [ help='Maximum number of channels to allow'), cfg.IntOpt('frame_max', default=None, help='The maximum byte size for an AMQP frame'), - cfg.IntOpt('heartbeat_interval', default=None, - help='How often to send heartbeats'), + cfg.IntOpt('heartbeat_interval', default=1, + help="How often to send heartbeats for consumer's connections"), cfg.BoolOpt('ssl', default=None, help='Enable SSL'), cfg.DictOpt('ssl_options', default=None, help='Arguments passed to ssl.wrap_socket'), - cfg.FloatOpt('socket_timeout', default=None, - help='Use for high latency networks'), + cfg.FloatOpt('socket_timeout', default=0.25, + help="Set socket timeout in seconds for connection's socket"), + cfg.FloatOpt('tcp_user_timeout', default=0.25, + help="Set TCP_USER_TIMEOUT in seconds for 
connection's " + "socket"), + cfg.FloatOpt('host_connection_reconnect_delay', default=5, + help="Set delay for reconnection to some host which has " + "connection error") ] pika_pool_opts = [ cfg.IntOpt('pool_max_size', default=10, help="Maximum number of connections to keep queued."), - cfg.IntOpt('pool_max_overflow', default=10, + cfg.IntOpt('pool_max_overflow', default=0, help="Maximum number of connections to create above " "`pool_max_size`."), cfg.IntOpt('pool_timeout', default=30, help="Default number of seconds to wait for a connections to " "available"), - cfg.IntOpt('pool_recycle', default=None, + cfg.IntOpt('pool_recycle', default=600, help="Lifetime of a connection (since creation) in seconds " "or None for no recycling. Expired connections are " "closed on acquire."), - cfg.IntOpt('pool_stale', default=None, + cfg.IntOpt('pool_stale', default=60, help="Threshold at which inactive (since release) connections " "are considered stale in seconds or None for no " "staleness. Stale connections are closed on acquire.") @@ -87,7 +94,7 @@ notification_opts = [ "sending notification, -1 means infinite retry." ), cfg.FloatOpt( - 'notification_retry_delay', default=0.1, + 'notification_retry_delay', default=0.25, help="Reconnecting retry delay in case of connectivity problem during " "sending notification message" ) @@ -101,23 +108,45 @@ rpc_opts = [ help="Exchange name for for sending RPC messages"), cfg.StrOpt('rpc_reply_exchange', default="${control_exchange}_rpc_reply", help="Exchange name for for receiving RPC replies"), + + cfg.BoolOpt('rpc_listener_ack', default=True, + help="Disable to increase performance. If disabled - some " + "messages may be lost in case of connectivity problem. " + "If enabled - may cause not needed message redelivery " + "and rpc request could be processed more then one time"), + cfg.BoolOpt('rpc_reply_listener_ack', default=True, + help="Disable to increase performance. 
If disabled - some " + "replies may be lost in case of connectivity problem."), cfg.IntOpt( - 'rpc_reply_retry_attempts', default=3, + 'rpc_listener_prefetch_count', default=10, + help="Max number of not acknowledged message which RabbitMQ can send " + "to rpc listener. Works only if rpc_listener_ack == True" + ), + cfg.IntOpt( + 'rpc_reply_listener_prefetch_count', default=10, + help="Max number of not acknowledged message which RabbitMQ can send " + "to rpc reply listener. Works only if rpc_reply_listener_ack == " + "True" + ), + cfg.IntOpt( + 'rpc_reply_retry_attempts', default=-1, help="Reconnecting retry count in case of connectivity problem during " - "sending reply. -1 means infinite retry." + "sending reply. -1 means infinite retry during rpc_timeout" ), cfg.FloatOpt( - 'rpc_reply_retry_delay', default=0.1, + 'rpc_reply_retry_delay', default=0.25, help="Reconnecting retry delay in case of connectivity problem during " "sending reply." ), cfg.IntOpt( - 'default_rpc_retry_attempts', default=0, + 'default_rpc_retry_attempts', default=-1, help="Reconnecting retry count in case of connectivity problem during " - "sending RPC message, -1 means infinite retry." + "sending RPC message, -1 means infinite retry. 
If actual " + "retry attempts in not 0 the rpc request could be processed more " + "then one time" ), cfg.FloatOpt( - 'rpc_retry_delay', default=0.1, + 'rpc_retry_delay', default=0.25, help="Reconnecting retry delay in case of connectivity problem during " "sending RPC message" ) @@ -147,10 +176,18 @@ class ConnectionException(exceptions.MessagingException): pass +class HostConnectionNotAllowedException(ConnectionException): + pass + + class EstablishConnectionException(ConnectionException): pass +class TimeoutConnectionException(ConnectionException): + pass + + class PooledConnectionWithConfirmations(pika_pool.Connection): @property def channel(self): @@ -161,9 +198,16 @@ class PooledConnectionWithConfirmations(pika_pool.Connection): class PikaEngine(object): + HOST_CONNECTION_LAST_TRY_TIME = "last_try_time" + HOST_CONNECTION_LAST_SUCCESS_TRY_TIME = "last_success_try_time" + + TCP_USER_TIMEOUT = 18 + def __init__(self, conf, url, default_exchange=None): self.conf = conf + # processing rpc options + self.default_rpc_exchange = ( conf.oslo_messaging_pika.default_rpc_exchange if conf.oslo_messaging_pika.default_rpc_exchange else @@ -175,14 +219,18 @@ class PikaEngine(object): default_exchange ) - self.default_notification_exchange = ( - conf.oslo_messaging_pika.default_notification_exchange if - conf.oslo_messaging_pika.default_notification_exchange else - default_exchange + self.rpc_listener_ack = conf.oslo_messaging_pika.rpc_listener_ack + + self.rpc_reply_listener_ack = ( + conf.oslo_messaging_pika.rpc_reply_listener_ack ) - self.notification_persistence = ( - conf.oslo_messaging_pika.notification_persistence + self.rpc_listener_prefetch_count = ( + conf.oslo_messaging_pika.rpc_listener_prefetch_count + ) + + self.rpc_reply_listener_prefetch_count = ( + conf.oslo_messaging_pika.rpc_listener_prefetch_count ) self.rpc_reply_retry_attempts = ( @@ -198,6 +246,17 @@ class PikaEngine(object): raise ValueError("rpc_reply_retry_delay should be non-negative " "integer") + 
# processing notification options + self.default_notification_exchange = ( + conf.oslo_messaging_pika.default_notification_exchange if + conf.oslo_messaging_pika.default_notification_exchange else + default_exchange + ) + + self.notification_persistence = ( + conf.oslo_messaging_pika.notification_persistence + ) + self.default_rpc_retry_attempts = ( conf.oslo_messaging_pika.default_rpc_retry_attempts ) @@ -235,32 +294,41 @@ class PikaEngine(object): self._reply_consumer_lock = threading.Lock() self._puller_thread = None + self._tcp_user_timeout = self.conf.oslo_messaging_pika.tcp_user_timeout + self._host_connection_reconnect_delay = ( + self.conf.oslo_messaging_pika.host_connection_reconnect_delay + ) + # initializing connection parameters for configured RabbitMQ hosts - self._pika_next_connection_num = 0 common_pika_params = { 'virtual_host': url.virtual_host, 'channel_max': self.conf.oslo_messaging_pika.channel_max, 'frame_max': self.conf.oslo_messaging_pika.frame_max, - 'heartbeat_interval': - self.conf.oslo_messaging_pika.heartbeat_interval, 'ssl': self.conf.oslo_messaging_pika.ssl, 'ssl_options': self.conf.oslo_messaging_pika.ssl_options, 'socket_timeout': self.conf.oslo_messaging_pika.socket_timeout, } - self._pika_params_list = [] - self._create_connection_lock = threading.Lock() + self._connection_lock = threading.Lock() + + self._connection_host_param_list = [] + self._connection_host_status_list = [] + self._next_connection_host_num = 0 for transport_host in url.hosts: - pika_params = pika.ConnectionParameters( + pika_params = common_pika_params.copy() + pika_params.update( host=transport_host.hostname, port=transport_host.port, credentials=pika_credentials.PlainCredentials( transport_host.username, transport_host.password ), - **common_pika_params ) - self._pika_params_list.append(pika_params) + self._connection_host_param_list.append(pika_params) + self._connection_host_status_list.append({ + self.HOST_CONNECTION_LAST_TRY_TIME: 0, + 
self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME: 0 + }) # initializing 2 connection pools: 1st for connections without # confirmations, 2nd - with confirmations @@ -286,40 +354,124 @@ class PikaEngine(object): PooledConnectionWithConfirmations ) - def create_connection(self): + def _next_connection_num(self): + with self._connection_lock: + cur_num = self._next_connection_host_num + self._next_connection_host_num += 1 + self._next_connection_host_num %= len( + self._connection_host_param_list + ) + return cur_num + + def create_connection(self, for_listening=False): """Create and return connection to any available host. :return: cerated connection :raise: ConnectionException if all hosts are not reachable """ - host_num = len(self._pika_params_list) - connection_attempts = host_num + host_count = len(self._connection_host_param_list) + connection_attempts = host_count + + pika_next_connection_num = self._next_connection_num() + while connection_attempts > 0: - with self._create_connection_lock: - try: - return self.create_host_connection( - self._pika_next_connection_num - ) - except pika_pool.Connection.connectivity_errors as e: - LOG.warn(str(e)) - connection_attempts -= 1 - continue - finally: - self._pika_next_connection_num += 1 - self._pika_next_connection_num %= host_num + try: + return self.create_host_connection( + pika_next_connection_num, for_listening + ) + except pika_pool.Connection.connectivity_errors as e: + LOG.warn(str(e)) + except HostConnectionNotAllowedException as e: + LOG.warn(str(e)) + + connection_attempts -= 1 + pika_next_connection_num += 1 + pika_next_connection_num %= host_count + raise EstablishConnectionException( "Can not establish connection to any configured RabbitMQ host: " + - str(self._pika_params_list) + str(self._connection_host_param_list) ) - def create_host_connection(self, host_index): + def _set_tcp_user_timeout(self, s): + if not self._tcp_user_timeout: + return + try: + s.setsockopt( + socket.IPPROTO_TCP, 
self.TCP_USER_TIMEOUT, + int(self._tcp_user_timeout * 1000) + ) + except socket.error: + LOG.warn( + "Whoops, this kernel doesn't seem to support TCP_USER_TIMEOUT." + ) + + def create_host_connection(self, host_index, for_listening=False): """Create new connection to host #host_index :return: New connection """ - return pika_adapters.BlockingConnection( - self._pika_params_list[host_index] + + with self._connection_lock: + cur_time = time.time() + + last_success_time = self._connection_host_status_list[host_index][ + self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME + ] + last_time = self._connection_host_status_list[host_index][ + self.HOST_CONNECTION_LAST_TRY_TIME + ] + if (last_time != last_success_time and + cur_time - last_time < + self._host_connection_reconnect_delay): + raise HostConnectionNotAllowedException( + "Connection to host #{} is not allowed now because of " + "previous failure".format(host_index) + ) + + try: + base_host_params = self._connection_host_param_list[host_index] + + connection = pika_adapters.BlockingConnection( + pika.ConnectionParameters( + heartbeat_interval=( + self.conf.oslo_messaging_pika.heartbeat_interval + if for_listening else None + ), + **base_host_params + ) + ) + + self._set_tcp_user_timeout(connection._impl.socket) + + self._connection_host_status_list[host_index][ + self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME + ] = cur_time + + return connection + finally: + self._connection_host_status_list[host_index][ + self.HOST_CONNECTION_LAST_TRY_TIME + ] = cur_time + + @staticmethod + def declare_queue_binding_by_channel(channel, exchange, queue, routing_key, + exchange_type, queue_expiration, + queue_auto_delete, durable): + channel.exchange_declare( + exchange, exchange_type, auto_delete=True, durable=durable ) + arguments = {} + + if queue_expiration > 0: + arguments['x-expires'] = queue_expiration * 1000 + + channel.queue_declare( + queue, auto_delete=queue_auto_delete, durable=durable, + arguments=arguments + ) + + 
channel.queue_bind(queue, exchange, routing_key) def declare_queue_binding(self, exchange, queue, routing_key, exchange_type, queue_expiration, @@ -331,20 +483,10 @@ class PikaEngine(object): ) try: with self.connection_pool.acquire(timeout=timeout) as conn: - conn.channel.exchange_declare( - exchange, exchange_type, auto_delete=True, durable=durable + self.declare_queue_binding_by_channel( + conn.channel, exchange, queue, routing_key, exchange_type, + queue_expiration, queue_auto_delete, durable ) - arguments = {} - - if queue_expiration > 0: - arguments['x-expires'] = queue_expiration * 1000 - - conn.channel.queue_declare( - queue, auto_delete=queue_auto_delete, durable=durable, - arguments=arguments - ) - - conn.channel.queue_bind(queue, exchange, routing_key) except pika_pool.Timeout as e: raise exceptions.MessagingTimeout( "Timeout for current operation was expired. {}.".format(str(e)) @@ -369,11 +511,10 @@ class PikaEngine(object): raise exceptions.MessagingTimeout( "Timeout for current operation was expired." ) - try: with pool.acquire(timeout=timeout) as conn: if timeout is not None: - properties.expiration = str(int(timeout)) + properties.expiration = str(int(timeout * 1000)) conn.channel.publish( exchange=exchange, routing_key=routing_key, @@ -410,6 +551,7 @@ class PikaEngine(object): body, properties, exchange, routing_key, str(e) ) ) + raise ConnectionException( "Connectivity problem detected during sending the message: " "[body:{}, properties: {}] to target: [exchange:{}, " @@ -417,6 +559,10 @@ class PikaEngine(object): body, properties, exchange, routing_key, str(e) ) ) + except socket.timeout: + raise TimeoutConnectionException( + "Socket timeout exceeded." 
+ ) def publish(self, exchange, routing_key, body, properties, confirm, mandatory, expiration_time, retrier): @@ -454,6 +600,8 @@ class PikaEngine(object): pika_engine=self, exchange=self.rpc_reply_exchange, queue=self._reply_queue, + no_ack=not self.rpc_reply_listener_ack, + prefetch_count=self.rpc_reply_listener_prefetch_count ) self._reply_listener.start(timeout=timeout) @@ -473,6 +621,7 @@ class PikaEngine(object): while self._reply_consumer_thread_run_flag: try: message = self._reply_listener.poll(timeout=1) + message.acknowledge() if message is None: continue i = 0 @@ -528,9 +677,9 @@ class PikaIncomingMessage(object): self.content_encoding = getattr(properties, "content_encoding", "utf-8") - self.expiration = ( + self.expiration_time = ( None if properties.expiration is None else - int(properties.expiration) + time.time() + int(properties.expiration) / 1000 ) if self.content_type != "application/json": @@ -584,7 +733,7 @@ class PikaIncomingMessage(object): else self._pika_engine.rpc_reply_retry_attempts ), retry_on_exception=on_exception, - wait_fixed=self._pika_engine.rpc_reply_retry_delay, + wait_fixed=self._pika_engine.rpc_reply_retry_delay * 1000, ) try: @@ -601,7 +750,7 @@ class PikaIncomingMessage(object): ), confirm=True, mandatory=False, - expiration_time=time.time() + self.expiration, + expiration_time=self.expiration_time, retrier=retrier ) LOG.debug( @@ -618,18 +767,12 @@ class PikaIncomingMessage(object): def acknowledge(self): if not self._no_ack: - try: - self._channel.basic_ack(delivery_tag=self.delivery_tag) - except Exception: - LOG.exception("Unable to acknowledge the message") + self._channel.basic_ack(delivery_tag=self.delivery_tag) def requeue(self): if not self._no_ack: - try: - return self._channel.basic_nack(delivery_tag=self.delivery_tag, - requeue=True) - except Exception: - LOG.exception("Unable to requeue the message") + return self._channel.basic_nack(delivery_tag=self.delivery_tag, + requeue=True) class 
PikaOutgoingMessage(object): @@ -669,7 +812,7 @@ class PikaOutgoingMessage(object): ) expiration_time = ( - None if timeout is None else timeout + time.time() + None if timeout is None else (timeout + time.time()) ) if wait_for_reply: @@ -722,7 +865,9 @@ class PikaListener(object): self._message_queue = collections.deque() def _reconnect(self): - self._connection = self._pika_engine.create_connection() + self._connection = self._pika_engine.create_connection( + for_listening=True + ) self._channel = self._connection.channel() self._channel.basic_qos(prefetch_count=self._prefetch_count) @@ -763,12 +908,14 @@ class PikaListener(object): with self._lock: if not self._started: return None - if self._channel is None: - self._reconnect() + try: + if self._channel is None: + self._reconnect() self._connection.process_data_events() - except pika_pool.Connection.connectivity_errors: + except Exception: self._cleanup() + raise if timeout and time.time() - start > timeout: return None @@ -800,7 +947,7 @@ class PikaListener(object): class RpcServicePikaListener(PikaListener): - def __init__(self, pika_engine, target, no_ack=True, prefetch_count=1): + def __init__(self, pika_engine, target, no_ack, prefetch_count): self._target = target super(RpcServicePikaListener, self).__init__( @@ -820,17 +967,20 @@ class RpcServicePikaListener(PikaListener): self._pika_engine.conf.oslo_messaging_pika.rpc_queue_expiration ) - self._pika_engine.declare_queue_binding( + self._pika_engine.declare_queue_binding_by_channel( + channel=self._channel, exchange=exchange, queue=queue, routing_key=queue, exchange_type='direct', queue_expiration=queue_expiration, queue_auto_delete=False, durable=False ) - self._pika_engine.declare_queue_binding( + self._pika_engine.declare_queue_binding_by_channel( + channel=self._channel, exchange=exchange, queue=server_queue, routing_key=server_queue, exchange_type='direct', queue_expiration=queue_expiration, queue_auto_delete=False, durable=False ) - 
self._pika_engine.declare_queue_binding( + self._pika_engine.declare_queue_binding_by_channel( + channel=self._channel, exchange=fanout_exchange, queue=server_queue, routing_key="", exchange_type='fanout', queue_expiration=queue_expiration, queue_auto_delete=False, durable=False @@ -849,8 +999,7 @@ class RpcServicePikaListener(PikaListener): class RpcReplyPikaListener(PikaListener): - def __init__(self, pika_engine, exchange, queue, no_ack=True, - prefetch_count=1): + def __init__(self, pika_engine, exchange, queue, no_ack, prefetch_count): self._exchange = exchange self._queue = queue @@ -863,7 +1012,8 @@ class RpcReplyPikaListener(PikaListener): self._pika_engine.conf.oslo_messaging_pika.rpc_queue_expiration ) - self._pika_engine.declare_queue_binding( + self._pika_engine.declare_queue_binding_by_channel( + channel=self._channel, exchange=self._exchange, queue=self._queue, routing_key=self._queue, exchange_type='direct', queue_expiration=queue_expiration, queue_auto_delete=False, @@ -881,8 +1031,8 @@ class RpcReplyPikaListener(PikaListener): retrier = retrying.retry( stop_max_attempt_number=self._pika_engine.rpc_reply_retry_attempts, - stop_max_delay=timeout, - wait_fixed=self._pika_engine.rpc_reply_retry_delay, + stop_max_delay=timeout * 1000, + wait_fixed=self._pika_engine.rpc_reply_retry_delay * 1000, retry_on_exception=on_exception, ) @@ -912,7 +1062,8 @@ class NotificationPikaListener(PikaListener): for target, priority in self._targets_and_priorities: routing_key = '%s.%s' % (target.topic, priority) queue = self._queue_name or routing_key - self._pika_engine.declare_queue_binding( + self._pika_engine.declare_queue_binding_by_channel( + channel=self._channel, exchange=( target.exchange or self._pika_engine.default_notification_exchange @@ -991,7 +1142,7 @@ class PikaDriver(object): retrying.retry( stop_max_attempt_number=(None if retry == -1 else retry), retry_on_exception=on_exception, - wait_fixed=self._pika_engine.rpc_retry_delay, + 
wait_fixed=self._pika_engine.rpc_retry_delay * 1000, ) ) @@ -1045,7 +1196,7 @@ class PikaDriver(object): queue=target.topic, routing_key=target.topic, exchange_type='direct', - queue_expiration=False, + queue_expiration=None, queue_auto_delete=False, durable=self._pika_engine.notification_persistence, ) @@ -1054,6 +1205,7 @@ class PikaDriver(object): return True elif isinstance(ex, (ConnectionException, MessageRejectedException)): + LOG.warn(str(ex)) return True else: return False @@ -1061,7 +1213,7 @@ class PikaDriver(object): retrier = retrying.retry( stop_max_attempt_number=(None if retry == -1 else retry), retry_on_exception=on_exception, - wait_fixed=self._pika_engine.notification_retry_delay, + wait_fixed=self._pika_engine.notification_retry_delay * 1000, ) msg = PikaOutgoingMessage(self._pika_engine, message, ctxt) @@ -1080,7 +1232,11 @@ class PikaDriver(object): ) def listen(self, target): - listener = RpcServicePikaListener(self._pika_engine, target) + listener = RpcServicePikaListener( + self._pika_engine, target, + no_ack=not self._pika_engine.rpc_listener_ack, + prefetch_count=self._pika_engine.rpc_listener_prefetch_count + ) listener.start() return listener