Merge "Replace retrying with tenacity"
This commit is contained in:
commit
cf5e3685f1
@ -16,7 +16,7 @@ from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import timeutils
|
||||
import pika_pool
|
||||
import retrying
|
||||
import tenacity
|
||||
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers.pika_driver import (pika_connection_factory as
|
||||
@ -201,14 +201,15 @@ class PikaDriver(base.BaseDriver):
|
||||
else:
|
||||
return False
|
||||
|
||||
retrier = (
|
||||
None if retry == 0 else
|
||||
retrying.retry(
|
||||
stop_max_attempt_number=(None if retry == -1 else retry),
|
||||
retry_on_exception=on_exception,
|
||||
wait_fixed=self._pika_engine.rpc_retry_delay * 1000,
|
||||
if retry:
|
||||
retrier = tenacity.retry(
|
||||
stop=(tenacity.stop_never if retry == -1 else
|
||||
tenacity.stop_after_attempt(retry)),
|
||||
retry=tenacity.retry_if_exception(on_exception),
|
||||
wait=tenacity.wait_fixed(self._pika_engine.rpc_retry_delay)
|
||||
)
|
||||
)
|
||||
else:
|
||||
retrier = None
|
||||
|
||||
if target.fanout:
|
||||
return self.cast_all_workers(
|
||||
@ -312,11 +313,17 @@ class PikaDriver(base.BaseDriver):
|
||||
else:
|
||||
return False
|
||||
|
||||
retrier = retrying.retry(
|
||||
stop_max_attempt_number=(None if retry == -1 else retry),
|
||||
retry_on_exception=on_exception,
|
||||
wait_fixed=self._pika_engine.notification_retry_delay * 1000,
|
||||
)
|
||||
if retry:
|
||||
retrier = tenacity.retry(
|
||||
stop=(tenacity.stop_never if retry == -1 else
|
||||
tenacity.stop_after_attempt(retry)),
|
||||
retry=tenacity.retry_if_exception(on_exception),
|
||||
wait=tenacity.wait_fixed(
|
||||
self._pika_engine.notification_retry_delay
|
||||
)
|
||||
)
|
||||
else:
|
||||
retrier = None
|
||||
|
||||
msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message,
|
||||
ctxt)
|
||||
|
@ -25,8 +25,8 @@ from oslo_utils import timeutils
|
||||
from pika import exceptions as pika_exceptions
|
||||
from pika import spec as pika_spec
|
||||
import pika_pool
|
||||
import retrying
|
||||
import six
|
||||
import tenacity
|
||||
|
||||
|
||||
import oslo_messaging
|
||||
@ -201,14 +201,22 @@ class RpcPikaIncomingMessage(PikaIncomingMessage, base.RpcIncomingMessage):
|
||||
else:
|
||||
return False
|
||||
|
||||
retrier = retrying.retry(
|
||||
stop_max_attempt_number=(
|
||||
None if self._pika_engine.rpc_reply_retry_attempts == -1
|
||||
else self._pika_engine.rpc_reply_retry_attempts
|
||||
),
|
||||
retry_on_exception=on_exception,
|
||||
wait_fixed=self._pika_engine.rpc_reply_retry_delay * 1000,
|
||||
) if self._pika_engine.rpc_reply_retry_attempts else None
|
||||
if self._pika_engine.rpc_reply_retry_attempts:
|
||||
retrier = tenacity.retry(
|
||||
stop=(
|
||||
tenacity.stop_never
|
||||
if self._pika_engine.rpc_reply_retry_attempts == -1 else
|
||||
tenacity.stop_after_attempt(
|
||||
self._pika_engine.rpc_reply_retry_attempts
|
||||
)
|
||||
),
|
||||
retry=tenacity.retry_if_exception(on_exception),
|
||||
wait=tenacity.wait_fixed(
|
||||
self._pika_engine.rpc_reply_retry_delay
|
||||
)
|
||||
)
|
||||
else:
|
||||
retrier = None
|
||||
|
||||
try:
|
||||
timeout = (None if self.expiration_time is None else
|
||||
@ -438,8 +446,8 @@ class PikaOutgoingMessage(object):
|
||||
for routing into durable queues
|
||||
:param stopwatch: StopWatch, stopwatch object for calculating
|
||||
allowed timeouts
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
:param retrier: tenacity.Retrying, configured retrier object for
|
||||
sending message, if None no retrying is performed
|
||||
"""
|
||||
msg_props.delivery_mode = 2 if persistent else 1
|
||||
|
||||
@ -475,8 +483,8 @@ class PikaOutgoingMessage(object):
|
||||
for routing into durable queues
|
||||
:param stopwatch: StopWatch, stopwatch object for calculating
|
||||
allowed timeouts
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
:param retrier: tenacity.Retrying, configured retrier object for
|
||||
sending message, if None no retrying is performed
|
||||
"""
|
||||
msg_dict, msg_props = self._prepare_message_to_send()
|
||||
|
||||
@ -506,8 +514,8 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
reply. If None - return immediately without reply waiting
|
||||
:param stopwatch: StopWatch, stopwatch object for calculating
|
||||
allowed timeouts
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
:param retrier: tenacity.Retrying, configured retrier object for
|
||||
sending message, if None no retrying is performed
|
||||
"""
|
||||
msg_dict, msg_props = self._prepare_message_to_send()
|
||||
|
||||
@ -595,8 +603,8 @@ class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
:param reply_q: String, queue name for sending reply
|
||||
:param stopwatch: StopWatch, stopwatch object for calculating
|
||||
allowed timeouts
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
:param retrier: tenacity.Retrying, configured retrier object for
|
||||
sending message, if None no retrying is performed
|
||||
"""
|
||||
|
||||
msg_dict, msg_props = self._prepare_message_to_send()
|
||||
|
@ -16,8 +16,8 @@ import abc
|
||||
import contextlib
|
||||
import logging
|
||||
|
||||
import retrying
|
||||
import six
|
||||
import tenacity
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
@ -30,9 +30,9 @@ zmq = zmq_async.import_zmq()
|
||||
|
||||
def _drop_message_warn(request):
|
||||
LOG.warning(_LW("Matchmaker contains no records for specified "
|
||||
"target %(target)s. Dropping message %(msg_id)s.")
|
||||
% {"target": request.target,
|
||||
"msg_id": request.message_id})
|
||||
"target %(target)s. Dropping message %(msg_id)s."),
|
||||
{"target": request.target,
|
||||
"msg_id": request.message_id})
|
||||
|
||||
|
||||
def target_not_found_warn(func):
|
||||
@ -40,7 +40,7 @@ def target_not_found_warn(func):
|
||||
try:
|
||||
return func(self, request, *args, **kwargs)
|
||||
except (zmq_matchmaker_base.MatchmakerUnavailable,
|
||||
retrying.RetryError):
|
||||
tenacity.RetryError):
|
||||
_drop_message_warn(request)
|
||||
return _target_not_found_warn
|
||||
|
||||
@ -50,7 +50,7 @@ def target_not_found_timeout(func):
|
||||
try:
|
||||
return func(self, request, *args, **kwargs)
|
||||
except (zmq_matchmaker_base.MatchmakerUnavailable,
|
||||
retrying.RetryError):
|
||||
tenacity.RetryError):
|
||||
_drop_message_warn(request)
|
||||
self.publisher._raise_timeout(request)
|
||||
return _target_not_found_timeout
|
||||
|
@ -20,8 +20,8 @@ import time
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import importutils
|
||||
from retrying import retry
|
||||
import six
|
||||
import tenacity
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
@ -30,6 +30,7 @@ from oslo_messaging._i18n import _LE, _LI, _LW
|
||||
|
||||
redis = importutils.try_import('redis')
|
||||
redis_sentinel = importutils.try_import('redis.sentinel')
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -54,8 +55,8 @@ matchmaker_redis_opts = [
|
||||
default=[],
|
||||
deprecated_for_removal=True,
|
||||
deprecated_reason="Replaced by [DEFAULT]/transport_url",
|
||||
help='List of Redis Sentinel hosts (fault tolerance mode) e.g.\
|
||||
[host:port, host1:port ... ]'),
|
||||
help='List of Redis Sentinel hosts (fault tolerance mode), '
|
||||
'e.g., [host:port, host1:port ... ]'),
|
||||
cfg.StrOpt('sentinel_group_name',
|
||||
default='oslo-messaging-zeromq',
|
||||
help='Redis replica set name.'),
|
||||
@ -67,7 +68,7 @@ matchmaker_redis_opts = [
|
||||
help='Time in ms to wait before the transaction is killed.'),
|
||||
cfg.IntOpt('socket_timeout',
|
||||
default=10000,
|
||||
help='Timeout in ms on blocking socket operations'),
|
||||
help='Timeout in ms on blocking socket operations.'),
|
||||
]
|
||||
|
||||
_PUBLISHERS_KEY = "PUBLISHERS"
|
||||
@ -132,11 +133,7 @@ def empty_list_on_error(func):
|
||||
return func_wrapper
|
||||
|
||||
|
||||
def retry_if_connection_error(ex):
|
||||
return isinstance(ex, zmq_matchmaker_base.MatchmakerUnavailable)
|
||||
|
||||
|
||||
def retry_if_empty(hosts):
|
||||
def is_empty(hosts):
|
||||
return not hosts
|
||||
|
||||
|
||||
@ -239,12 +236,15 @@ class MatchmakerRedisBase(zmq_matchmaker_base.MatchmakerBase):
|
||||
return self._retry_method(target, listener_type, self.get_hosts_fanout)
|
||||
|
||||
def _retry_method(self, target, listener_type, method):
|
||||
@retry(retry_on_result=retry_if_empty,
|
||||
wrap_exception=True,
|
||||
wait_fixed=self.conf.matchmaker_redis.wait_timeout,
|
||||
stop_max_delay=self.conf.matchmaker_redis.check_timeout)
|
||||
wait_timeout = self.conf.matchmaker_redis.wait_timeout / 1000.
|
||||
check_timeout = self.conf.matchmaker_redis.check_timeout / 1000.
|
||||
|
||||
@tenacity.retry(retry=tenacity.retry_if_result(is_empty),
|
||||
wait=tenacity.wait_fixed(wait_timeout),
|
||||
stop=tenacity.stop_after_delay(check_timeout))
|
||||
def _get_hosts_retry(target, listener_type):
|
||||
return method(target, listener_type)
|
||||
|
||||
return _get_hosts_retry(target, listener_type)
|
||||
|
||||
|
||||
@ -362,15 +362,15 @@ class MatchmakerSentinel(MatchmakerRedisBase):
|
||||
|
||||
def __init__(self, conf, *args, **kwargs):
|
||||
super(MatchmakerSentinel, self).__init__(conf, *args, **kwargs)
|
||||
socket_timeout = self.conf.matchmaker_redis.socket_timeout / 1000.
|
||||
|
||||
self._sentinel_hosts, password, master_group = \
|
||||
self._extract_sentinel_hosts()
|
||||
|
||||
self._sentinel = redis_sentinel.Sentinel(
|
||||
sentinels=self._sentinel_hosts,
|
||||
socket_timeout=socket_timeout,
|
||||
password=password)
|
||||
socket_timeout=self.conf.matchmaker_redis.socket_timeout / 1000.,
|
||||
password=password
|
||||
)
|
||||
|
||||
self._redis_master = self._sentinel.master_for(master_group)
|
||||
self._redis_slave = self._sentinel.slave_for(master_group)
|
||||
|
@ -11,6 +11,7 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import functools
|
||||
import unittest
|
||||
|
||||
@ -172,7 +173,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
|
||||
|
||||
@patch("oslo_messaging._drivers.pika_driver.pika_message."
|
||||
"RpcReplyPikaOutgoingMessage")
|
||||
@patch("retrying.retry")
|
||||
@patch("tenacity.retry")
|
||||
def test_positive_reply_for_call_message(self,
|
||||
retry_mock,
|
||||
outgoing_message_mock):
|
||||
@ -202,13 +203,12 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
|
||||
reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY
|
||||
)
|
||||
retry_mock.assert_called_once_with(
|
||||
retry_on_exception=mock.ANY, stop_max_attempt_number=3,
|
||||
wait_fixed=250.0
|
||||
stop=mock.ANY, retry=mock.ANY, wait=mock.ANY
|
||||
)
|
||||
|
||||
@patch("oslo_messaging._drivers.pika_driver.pika_message."
|
||||
"RpcReplyPikaOutgoingMessage")
|
||||
@patch("retrying.retry")
|
||||
@patch("tenacity.retry")
|
||||
def test_negative_reply_for_call_message(self,
|
||||
retry_mock,
|
||||
outgoing_message_mock):
|
||||
@ -241,8 +241,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
|
||||
reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY
|
||||
)
|
||||
retry_mock.assert_called_once_with(
|
||||
retry_on_exception=mock.ANY, stop_max_attempt_number=3,
|
||||
wait_fixed=250.0
|
||||
stop=mock.ANY, retry=mock.ANY, wait=mock.ANY
|
||||
)
|
||||
|
||||
|
||||
|
@ -20,9 +20,7 @@ monotonic>=0.6 # Apache-2.0
|
||||
six>=1.9.0 # MIT
|
||||
cachetools>=1.1.0 # MIT License
|
||||
|
||||
|
||||
# FIXME(markmc): remove this when the drivers no longer
|
||||
# import eventlet
|
||||
# FIXME(markmc): remove this when the drivers no longer import eventlet
|
||||
|
||||
eventlet!=0.18.3,>=0.18.2 # MIT
|
||||
greenlet>=0.3.2 # MIT
|
||||
@ -41,7 +39,7 @@ pika-pool>=0.1.3 # BSD
|
||||
|
||||
# used by pika and zmq drivers
|
||||
futures>=3.0;python_version=='2.7' or python_version=='2.6' # BSD
|
||||
retrying!=1.3.0,>=1.2.3 # Apache-2.0
|
||||
tenacity>=3.2.1 # Apache-2.0
|
||||
|
||||
# middleware
|
||||
oslo.middleware>=3.0.0 # Apache-2.0
|
||||
|
Loading…
x
Reference in New Issue
Block a user