diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py index 25047e107..a9434818b 100644 --- a/oslo_messaging/_drivers/impl_pika.py +++ b/oslo_messaging/_drivers/impl_pika.py @@ -195,7 +195,7 @@ class PikaDriver(base.BaseDriver): self._declare_rpc_exchange(exchange, expiration_time - time.time()) except pika_drv_exc.ConnectionException as e: - LOG.warning("Problem during declaring exchange. %", e) + LOG.warning("Problem during declaring exchange. %s", e) return True elif isinstance(ex, (pika_drv_exc.ConnectionException, exceptions.MessageDeliveryFailure)): @@ -240,7 +240,7 @@ class PikaDriver(base.BaseDriver): self._declare_rpc_exchange(exchange, expiration_time - time.time()) except pika_drv_exc.ConnectionException as e: - LOG.warning("Problem during declaring exchange. %", e) + LOG.warning("Problem during declaring exchange. %s", e) raise ex if reply is not None: @@ -269,7 +269,7 @@ class PikaDriver(base.BaseDriver): exchange, expiration_time - time.time() ) except pika_drv_exc.ConnectionException as e: - LOG.warning("Problem during declaring exchange. %", e) + LOG.warning("Problem during declaring exchange. %s", e) def _declare_notification_queue_binding(self, target, timeout=None): if timeout is not None and timeout < 0: @@ -303,16 +303,16 @@ class PikaDriver(base.BaseDriver): def on_exception(ex): if isinstance(ex, (pika_drv_exc.ExchangeNotFoundException, pika_drv_exc.RoutingException)): - LOG.warning("Problem during sending notification. %", ex) + LOG.warning("Problem during sending notification. %s", ex) try: self._declare_notification_queue_binding(target) except pika_drv_exc.ConnectionException as e: LOG.warning("Problem during declaring notification queue " - "binding. %", e) + "binding. %s", e) return True elif isinstance(ex, (pika_drv_exc.ConnectionException, pika_drv_exc.MessageRejectedException)): - LOG.warning("Problem during sending notification. %", ex) + LOG.warning("Problem during sending notification. %s", ex) return True else: return False diff --git a/oslo_messaging/_drivers/pika_driver/pika_commons.py b/oslo_messaging/_drivers/pika_driver/pika_commons.py new file mode 100644 index 000000000..a076f40ad --- /dev/null +++ b/oslo_messaging/_drivers/pika_driver/pika_commons.py @@ -0,0 +1,44 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import select +import socket +import sys + +from pika import exceptions as pika_exceptions +import six + + +PIKA_CONNECTIVITY_ERRORS = ( + pika_exceptions.AMQPConnectionError, + pika_exceptions.ConnectionClosed, + pika_exceptions.ChannelClosed, + socket.timeout, + select.error +) + +EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins' + + +def is_eventlet_monkey_patched(module): + """Determines safely is eventlet patching for module enabled or not + + :param module: String, module name + :return Bool, True if module is pathed, False otherwise + """ + + if 'eventlet.patcher' not in sys.modules: + return False + import eventlet.patcher + return eventlet.patcher.is_monkey_patched(module) diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py index c09936576..3b762c327 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_engine.py +++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py @@ -14,7 +14,6 @@ import random import socket -import sys import threading import time @@ -22,30 +21,16 @@ from oslo_log import log as logging import pika from pika.adapters import select_connection from pika import credentials as pika_credentials + import pika_pool -import six import uuid +from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc LOG = logging.getLogger(__name__) -_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins' - - -def _is_eventlet_monkey_patched(module): - """Determines safely is eventlet patching for module enabled or not - - :param module: String, module name - :return Bool, True if module is pathed, False otherwise - """ - - if 'eventlet.patcher' not in sys.modules: - return False - import eventlet.patcher - return eventlet.patcher.is_monkey_patched(module) - def _create_select_poller_connection_impl( parameters, on_open_callback, on_open_error_callback, @@ -99,7 +84,9 @@ class PikaEngine(object): allowed_remote_exmods=None): self.conf = conf - self._force_select_poller_use = _is_eventlet_monkey_patched('select') + self._force_select_poller_use = ( + pika_drv_cmns.is_eventlet_monkey_patched('select') + ) # processing rpc options self.default_rpc_exchange = ( @@ -109,7 +96,7 @@ class PikaEngine(object): conf.oslo_messaging_pika.rpc_reply_exchange ) - self.allowed_remote_exmods = [_EXCEPTIONS_MODULE] + self.allowed_remote_exmods = [pika_drv_cmns.EXCEPTIONS_MODULE] if allowed_remote_exmods: self.allowed_remote_exmods.extend(allowed_remote_exmods) @@ -359,8 +346,8 @@ class PikaEngine(object): self.HOST_CONNECTION_LAST_TRY_TIME ] = cur_time - @staticmethod - def declare_exchange_by_channel(channel, exchange, exchange_type, durable): + def declare_exchange_by_channel(self, channel, exchange, exchange_type, + durable): """Declare exchange using already created channel, if they don't exist :param channel: Channel for communication with RabbitMQ @@ -373,7 +360,7 @@ class PikaEngine(object): channel.exchange_declare( exchange, exchange_type, auto_delete=True, durable=durable ) - except pika_pool.Connection.connectivity_errors as e: + except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e: raise pika_drv_exc.ConnectionException( "Connectivity problem detected during declaring exchange: " "exchange:{}, exchange_type: {}, durable: {}. {}".format( @@ -381,10 +368,9 @@ class PikaEngine(object): ) ) - @staticmethod - def declare_queue_binding_by_channel(channel, exchange, queue, routing_key, - exchange_type, queue_expiration, - durable): + def declare_queue_binding_by_channel(self, channel, exchange, queue, + routing_key, exchange_type, + queue_expiration, durable): """Declare exchange, queue and bind them using already created channel, if they don't exist @@ -410,7 +396,7 @@ class PikaEngine(object): channel.queue_declare(queue, durable=durable, arguments=arguments) channel.queue_bind(queue, exchange, routing_key) - except pika_pool.Connection.connectivity_errors as e: + except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e: raise pika_drv_exc.ConnectionException( "Connectivity problem detected during declaring queue " "binding: exchange:{}, queue: {}, routing_key: {}, " diff --git a/oslo_messaging/_drivers/pika_driver/pika_listener.py b/oslo_messaging/_drivers/pika_driver/pika_listener.py index 54ede1230..1e52969b7 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_listener.py +++ b/oslo_messaging/_drivers/pika_driver/pika_listener.py @@ -13,13 +13,11 @@ # under the License. import threading -import time import uuid from concurrent import futures from oslo_log import log as logging -from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller LOG = logging.getLogger(__name__) @@ -97,15 +95,7 @@ class RpcReplyPikaListener(object): """ while self._reply_poller: try: - try: - messages = self._reply_poller.poll() - except pika_drv_exc.EstablishConnectionException: - LOG.exception("Problem during establishing connection for " - "reply polling") - time.sleep( - self._pika_engine.host_connection_reconnect_delay - ) - continue + messages = self._reply_poller.poll() for message in messages: try: diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py index a5279fa8e..dc3d27912 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_poller.py +++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py @@ -13,13 +13,14 @@ # under the License. import threading +import time from oslo_log import log as logging from oslo_utils import timeutils -import pika_pool import six from oslo_messaging._drivers import base +from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg @@ -143,21 +144,17 @@ class PikaPoller(base.Listener): """Cleanup allocated resources (channel, connection, etc). It is unsafe method for internal use only """ - if self._channel: - try: - self._channel.close() - except Exception as ex: - if not pika_pool.Connection.is_connection_invalidated(ex): - LOG.exception("Unexpected error during closing channel") - self._channel = None - if self._connection: try: self._connection.close() - except Exception as ex: - if not pika_pool.Connection.is_connection_invalidated(ex): - LOG.exception("Unexpected error during closing connection") - self._connection = None + except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS: + # expected errors + pass + except Exception: + LOG.exception("Unexpected error during closing connection") + finally: + self._channel = None + self._connection = None for i in six.moves.range(len(self._message_queue) - 1, -1, -1): message = self._message_queue[i] @@ -201,15 +198,25 @@ class PikaPoller(base.Listener): else: # consumer is stopped so we don't expect new # messages, just process already sent events - self._connection.process_data_events( - time_limit=0 - ) + if self._channel is not None: + self._connection.process_data_events( + time_limit=0 + ) # and return result if we don't see new messages if last_queue_size == len(self._message_queue): result = self._message_queue[:prefetch_size] del self._message_queue[:prefetch_size] return result - except pika_pool.Connection.connectivity_errors: + except pika_drv_exc.EstablishConnectionException as e: + LOG.warn("Problem during establishing connection for" + "pika poller %s", e, exc_info=True) + time.sleep( + self._pika_engine.host_connection_reconnect_delay + ) + except pika_drv_exc.ConnectionException: + self._cleanup() + raise + except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS: self._cleanup() raise @@ -220,19 +227,24 @@ class PikaPoller(base.Listener): with self._lock: if self._started: return - self._started = True - self._cleanup() try: self._reconnect() - except Exception as exc: + except pika_drv_exc.EstablishConnectionException as exc: + LOG.warn("Can not establishing connection during pika " + "Conecting required during first poll() call. %s", + exc, exc_info=True) + except pika_drv_exc.ConnectionException as exc: self._cleanup() - if isinstance(exc, pika_pool.Connection.connectivity_errors): - raise pika_drv_exc.ConnectionException( - "Connectivity problem detected during establishing " - "poller's connection. " + str(exc)) - else: - raise exc + LOG.warn("Connectivity problem during pika poller's start(). " + "Reconnecting required during first poll() call. %s", + exc, exc_info=True) + except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc: + self._cleanup() + LOG.warn("Connectivity problem during pika poller's start(). " + "Reconnecting required during first poll() call. %s", + exc, exc_info=True) + self._started = True def stop(self): """Stops poller. Should be called when polling is not needed anymore to @@ -246,15 +258,10 @@ class PikaPoller(base.Listener): if self._queues_to_consume and self._channel: try: self._stop_consuming() - except Exception as exc: + except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc: self._cleanup() - if isinstance(exc, - pika_pool.Connection.connectivity_errors): - raise pika_drv_exc.ConnectionException( - "Connectivity problem detected during " - "consumer canceling. " + str(exc)) - else: - raise exc + LOG.warn("Connectivity problem detected during consumer " + "cancellation. %s", exc, exc_info=True) self._started = False def cleanup(self): diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py index 0cc1b869d..7cddb082c 100644 --- a/oslo_messaging/tests/drivers/pika/test_message.py +++ b/oslo_messaging/tests/drivers/pika/test_message.py @@ -22,7 +22,7 @@ from oslo_serialization import jsonutils import pika import oslo_messaging -from oslo_messaging._drivers.pika_driver import pika_engine +from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg @@ -252,7 +252,7 @@ class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase): def setUp(self): self._pika_engine = mock.Mock() self._pika_engine.allowed_remote_exmods = [ - pika_engine._EXCEPTIONS_MODULE, "oslo_messaging.exceptions" + pika_drv_cmns.EXCEPTIONS_MODULE, "oslo_messaging.exceptions" ] self._channel = mock.Mock() diff --git a/oslo_messaging/tests/drivers/pika/test_poller.py b/oslo_messaging/tests/drivers/pika/test_poller.py index 1209d131a..23667ab7e 100644 --- a/oslo_messaging/tests/drivers/pika/test_poller.py +++ b/oslo_messaging/tests/drivers/pika/test_poller.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import socket import time import unittest @@ -33,6 +34,18 @@ class PikaPollerTestCase(unittest.TestCase): ) self._prefetch_count = 123 + def test_start_when_connection_unavailable(self): + incoming_message_class_mock = mock.Mock() + poller = pika_poller.PikaPoller( + self._pika_engine, self._prefetch_count, + incoming_message_class=incoming_message_class_mock + ) + + self._pika_engine.create_connection.side_effect = socket.timeout() + + # start() should not raise socket.timeout exception + poller.start() + @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller." "_declare_queue_binding") def test_poll(self, declare_queue_binding_mock):