Merge "Replace retrying with tenacity"
This commit is contained in:
commit
6e137a0e7d
@ -19,8 +19,8 @@ import uuid
|
|||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
import retrying
|
|
||||||
import six
|
import six
|
||||||
|
import tenacity
|
||||||
import tooz.coordination
|
import tooz.coordination
|
||||||
|
|
||||||
from aodh.i18n import _LE, _LI, _LW
|
from aodh.i18n import _LE, _LI, _LW
|
||||||
@ -67,14 +67,6 @@ class MemberNotInGroupError(Exception):
|
|||||||
{'group_id': group_id, 'members': members, 'me': my_id})
|
{'group_id': group_id, 'members': members, 'me': my_id})
|
||||||
|
|
||||||
|
|
||||||
def retry_on_error_joining_partition(exception):
|
|
||||||
return isinstance(exception, ErrorJoiningPartitioningGroup)
|
|
||||||
|
|
||||||
|
|
||||||
def retry_on_member_not_in_group(exception):
|
|
||||||
return isinstance(exception, MemberNotInGroupError)
|
|
||||||
|
|
||||||
|
|
||||||
class HashRing(object):
|
class HashRing(object):
|
||||||
|
|
||||||
def __init__(self, nodes, replicas=100):
|
def __init__(self, nodes, replicas=100):
|
||||||
@ -169,14 +161,12 @@ class PartitionCoordinator(object):
|
|||||||
or not group_id):
|
or not group_id):
|
||||||
return
|
return
|
||||||
|
|
||||||
retry_backoff = self.conf.coordination.retry_backoff * 1000
|
@tenacity.retry(
|
||||||
max_retry_interval = self.conf.coordination.max_retry_interval * 1000
|
wait=tenacity.wait_exponential(
|
||||||
|
multiplier=self.conf.coordination.retry_backoff,
|
||||||
@retrying.retry(
|
max=self.conf.coordination.max_retry_interval),
|
||||||
wait_exponential_multiplier=retry_backoff,
|
retry=tenacity.retry_if_exception_type(
|
||||||
wait_exponential_max=max_retry_interval,
|
ErrorJoiningPartitioningGroup))
|
||||||
retry_on_exception=retry_on_error_joining_partition,
|
|
||||||
wrap_exception=True)
|
|
||||||
def _inner():
|
def _inner():
|
||||||
try:
|
try:
|
||||||
join_req = self._coordinator.join_group(group_id)
|
join_req = self._coordinator.join_group(group_id)
|
||||||
@ -218,8 +208,11 @@ class PartitionCoordinator(object):
|
|||||||
except tooz.coordination.GroupNotCreated:
|
except tooz.coordination.GroupNotCreated:
|
||||||
self.join_group(group_id)
|
self.join_group(group_id)
|
||||||
|
|
||||||
@retrying.retry(stop_max_attempt_number=5, wait_random_max=2000,
|
@tenacity.retry(
|
||||||
retry_on_exception=retry_on_member_not_in_group)
|
wait=tenacity.wait_random(max=2),
|
||||||
|
stop=tenacity.stop_after_attempt(5),
|
||||||
|
retry=tenacity.retry_if_exception_type(MemberNotInGroupError),
|
||||||
|
reraise=True)
|
||||||
def extract_my_subset(self, group_id, universal_set):
|
def extract_my_subset(self, group_id, universal_set):
|
||||||
"""Filters an iterable, returning only objects assigned to this agent.
|
"""Filters an iterable, returning only objects assigned to this agent.
|
||||||
|
|
||||||
|
@ -19,9 +19,9 @@ import datetime
|
|||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
import retrying
|
|
||||||
import six.moves.urllib.parse as urlparse
|
import six.moves.urllib.parse as urlparse
|
||||||
from stevedore import driver
|
from stevedore import driver
|
||||||
|
import tenacity
|
||||||
|
|
||||||
_NAMESPACE = 'aodh.storage'
|
_NAMESPACE = 'aodh.storage'
|
||||||
|
|
||||||
@ -61,9 +61,10 @@ def get_connection_from_config(conf):
|
|||||||
{'name': connection_scheme, 'namespace': _NAMESPACE})
|
{'name': connection_scheme, 'namespace': _NAMESPACE})
|
||||||
mgr = driver.DriverManager(_NAMESPACE, connection_scheme)
|
mgr = driver.DriverManager(_NAMESPACE, connection_scheme)
|
||||||
|
|
||||||
# Convert retry_interval secs to msecs for retry decorator
|
@tenacity.retry(
|
||||||
@retrying.retry(wait_fixed=conf.database.retry_interval * 1000,
|
wait=tenacity.wait_fixed(conf.database.retry_interval),
|
||||||
stop_max_attempt_number=retries if retries >= 0 else None)
|
stop=tenacity.stop_after_attempt(retries if retries >= 0 else 5),
|
||||||
|
reraise=True)
|
||||||
def _get_connection():
|
def _get_connection():
|
||||||
"""Return an open connection to the database."""
|
"""Return an open connection to the database."""
|
||||||
return mgr.driver(conf, url)
|
return mgr.driver(conf, url)
|
||||||
|
@ -18,7 +18,6 @@
|
|||||||
import mock
|
import mock
|
||||||
from oslo_config import fixture as fixture_config
|
from oslo_config import fixture as fixture_config
|
||||||
from oslotest import base
|
from oslotest import base
|
||||||
import retrying
|
|
||||||
|
|
||||||
from aodh import service
|
from aodh import service
|
||||||
from aodh import storage
|
from aodh import storage
|
||||||
@ -59,27 +58,25 @@ class ConnectionRetryTest(base.BaseTestCase):
|
|||||||
def test_retries(self):
|
def test_retries(self):
|
||||||
max_retries = 5
|
max_retries = 5
|
||||||
with mock.patch.object(
|
with mock.patch.object(
|
||||||
retrying.Retrying, 'should_reject') as retry_reject:
|
storage.impl_log.Connection, '__init__') as log_init:
|
||||||
with mock.patch.object(
|
|
||||||
storage.impl_log.Connection, '__init__') as log_init:
|
|
||||||
|
|
||||||
class ConnectionError(Exception):
|
class ConnectionError(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def x(a, b):
|
def x(a, b):
|
||||||
raise ConnectionError
|
raise ConnectionError
|
||||||
|
|
||||||
log_init.side_effect = x
|
log_init.side_effect = x
|
||||||
self.CONF.set_override("connection", "log://", "database",
|
self.CONF.set_override("connection", "log://", "database",
|
||||||
enforce_type=True)
|
enforce_type=True)
|
||||||
self.CONF.set_override("retry_interval", 0.00001, "database",
|
self.CONF.set_override("retry_interval", 0.00001, "database",
|
||||||
enforce_type=True)
|
enforce_type=True)
|
||||||
self.CONF.set_override("max_retries", max_retries, "database",
|
self.CONF.set_override("max_retries", max_retries, "database",
|
||||||
enforce_type=True)
|
enforce_type=True)
|
||||||
self.assertRaises(ConnectionError,
|
self.assertRaises(ConnectionError,
|
||||||
storage.get_connection_from_config,
|
storage.get_connection_from_config,
|
||||||
self.CONF)
|
self.CONF)
|
||||||
self.assertEqual(max_retries, retry_reject.call_count)
|
self.assertEqual(max_retries, log_init.call_count)
|
||||||
|
|
||||||
|
|
||||||
class ConnectionConfigTest(base.BaseTestCase):
|
class ConnectionConfigTest(base.BaseTestCase):
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
# of appearance. Changing the order has an impact on the overall integration
|
# of appearance. Changing the order has an impact on the overall integration
|
||||||
# process, which may cause wedges in the gate later.
|
# process, which may cause wedges in the gate later.
|
||||||
|
|
||||||
retrying!=1.3.0,>=1.2.3 # Apache-2.0
|
tenacity>=3.2.1 # Apache-2.0
|
||||||
croniter>=0.3.4 # MIT License
|
croniter>=0.3.4 # MIT License
|
||||||
futures>=3.0;python_version=='2.7' or python_version=='2.6' # BSD
|
futures>=3.0;python_version=='2.7' or python_version=='2.6' # BSD
|
||||||
futurist>=0.11.0 # Apache-2.0
|
futurist>=0.11.0 # Apache-2.0
|
||||||
|
Loading…
Reference in New Issue
Block a user