Merge "fix(MongoDB): Driver does not retry on AutoReconnect errors"
commit edae7f388a
@@ -57,6 +57,7 @@ class ClaimController(storage.Claim):
     """

     @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
     def get(self, queue, claim_id, project=None):
         msg_ctrl = self.driver.message_controller

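Note the ordering used throughout this patch: `@utils.raises_conn_error` stays outermost and the new `@utils.retries_on_autoreconnect` sits directly on the method, so the retry loop runs inside the error-translation wrapper and each AutoReconnect is retried before anything is converted to Marconi's connection error. A tiny, self-contained illustration of that decorator ordering (stand-in decorators for the demonstration, not the real ones):

    # Stand-ins that only record the order in which they run.
    def raises_conn_error(func):
        def wrapper(*args, **kwargs):
            print('raises_conn_error (outer)')
            return func(*args, **kwargs)
        return wrapper


    def retries_on_autoreconnect(func):
        def wrapper(*args, **kwargs):
            print('retries_on_autoreconnect (inner)')
            return func(*args, **kwargs)
        return wrapper


    @raises_conn_error
    @retries_on_autoreconnect
    def get():
        print('get body')


    get()
    # raises_conn_error (outer)
    # retries_on_autoreconnect (inner)
    # get body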
@@ -97,7 +98,14 @@ class ClaimController(storage.Claim):

         return (claim_meta, msgs)

+    # NOTE(kgriffs): If we get an autoreconnect or any other connection error,
+    # the worst that can happen is you get an orphaned claim, but it will
+    # expire eventually and free up those messages to be claimed again. We
+    # might consider setting a "claim valid" flag similar to how posting
+    # messages works, in order to avoid this situation if it turns out to
+    # be a real problem for users.
     @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
     def create(self, queue, metadata, project=None,
                limit=storage.DEFAULT_MESSAGES_PER_CLAIM):
         """Creates a claim.
@@ -190,6 +198,7 @@ class ClaimController(storage.Claim):
         return (str(oid), messages)

     @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
     def update(self, queue, claim_id, metadata, project=None):
         cid = utils.to_oid(claim_id)
         if cid is None:
@@ -233,6 +242,7 @@ class ClaimController(storage.Claim):
                                   upsert=False, multi=True)

     @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
     def delete(self, queue, claim_id, project=None):
         msg_ctrl = self.driver.message_controller
         msg_ctrl._unclaim(queue, claim_id, project=project)
@@ -425,6 +425,7 @@ class MessageController(storage.Message):
         yield str(marker_id['next'])

     @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
     def first(self, queue_name, project=None, sort=1):
         cursor = self._list(queue_name, project=project,
                             include_claimed=True, sort=sort,
@@ -437,6 +438,7 @@ class MessageController(storage.Message):
         return message

     @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
     def get(self, queue_name, message_id, project=None):
         mid = utils.to_oid(message_id)
         if mid is None:
@@ -460,6 +462,7 @@ class MessageController(storage.Message):
         return _basic_message(message[0], now)

     @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
     def bulk_get(self, queue_name, message_ids, project=None):
         message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
         if not message_ids:
@@ -485,7 +488,13 @@ class MessageController(storage.Message):
         return utils.HookedCursor(messages, denormalizer)

     @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
     def post(self, queue_name, messages, client_uuid, project=None):
+        # NOTE(flaper87): This method should be safe to retry on
+        # autoreconnect, since we've a 2-step insert for messages.
+        # The worst-case scenario is that we'll increase the counter
+        # several times and we'd end up with some non-active messages.
+
         if not self._queue_ctrl.exists(queue_name, project):
             raise errors.QueueDoesNotExist(queue_name, project)

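The NOTE above hinges on posting being a two-step operation: bump a per-queue marker counter first, then insert the message documents. If the connection drops between the steps, a retry only consumes extra markers; it does not duplicate or lose messages. A rough, self-contained sketch of that property (simplified stand-ins, not the real MessageController):

    class FakeAutoReconnect(Exception):
        pass


    queue_counter = 0   # step 1: monotonically increasing marker counter
    messages = {}       # step 2: marker -> message body


    def post(body, fail_between_steps=False):
        global queue_counter
        queue_counter += 1                       # step 1: reserve a marker
        if fail_between_steps:
            raise FakeAutoReconnect()            # connection lost mid-post
        messages[queue_counter] = body           # step 2: insert the message


    try:
        post('hello', fail_between_steps=True)   # first attempt dies mid-way
    except FakeAutoReconnect:
        post('hello')                            # retry succeeds

    print(queue_counter)  # 2 -> one marker wasted by the failed attempt
    print(messages)       # {2: 'hello'} -> no duplicate, nothing lost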
@@ -664,6 +673,7 @@ class MessageController(storage.Message):
                                  succeeded_ids)

     @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
     def delete(self, queue_name, message_id, project=None, claim=None):
         # NOTE(cpp-cabrera): return early - this is an invalid message
         # id so we won't be able to find it any way
@@ -714,6 +724,7 @@ class MessageController(storage.Message):
         collection.remove(query['_id'], w=0)

     @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
     def bulk_delete(self, queue_name, message_ids, project=None):
         message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
         query = {
@@ -53,6 +53,16 @@ MONGODB_OPTIONS = [
                      'sleep interval, in order to decrease probability '
                      'that parallel requests will retry at the '
                      'same instant.')),
+
+    cfg.IntOpt('max_reconnect_attempts', default=10,
+               help=('Maximum number of times to retry an operation that '
+                     'failed due to a primary node failover.')),
+
+    cfg.FloatOpt('reconnect_sleep', default=0.020,
+                 help=('Base sleep interval between attempts to reconnect '
+                       'after a primary node failover. '
+                       'The actual sleep time increases exponentially (power '
+                       'of 2) each time the operation is retried.')),
 ]

 MONGODB_GROUP = 'drivers:storage:mongodb'
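For reference, the back-off these two options describe is `reconnect_sleep * (2 ** attempt)`, the formula used by the decorator added further down in this patch. With the defaults, the schedule and the worst-case total wait work out as follows:

    reconnect_sleep = 0.020
    max_reconnect_attempts = 10

    schedule = [reconnect_sleep * (2 ** attempt)
                for attempt in range(max_reconnect_attempts)]

    print(schedule)       # [0.02, 0.04, 0.08, ..., 10.24]
    print(sum(schedule))  # ~20.46 s of back-off before the operation finally fails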
@@ -192,12 +192,23 @@ class QueueController(storage.Queue):
         yield marker_name and marker_name['next']

     @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
     def get_metadata(self, name, project=None):
         queue = self._get(name, project)
         return queue.get('m', {})

     @utils.raises_conn_error
+    # @utils.retries_on_autoreconnect
     def create(self, name, project=None):
+        # NOTE(flaper87): If the connection fails after it was called
+        # and we retry to insert the queue, we could end up returning
+        # `False` because of the `DuplicatedKeyError` although the
+        # queue was indeed created by this API call.
+        #
+        # TODO(kgriffs): Commented out `retries_on_autoreconnect` for
+        # now due to the above issue, since creating a queue is less
+        # important to make super HA.
+
         try:
             # NOTE(kgriffs): Start counting at 1, and assume the first
             # message ever posted will succeed and set t to a UNIX
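The NOTE/TODO above describes why `create` cannot simply be retried: the insert is not idempotent, so if it succeeds but the reply is lost to a failover, the retried insert trips the duplicate check and the method would report `False` for a queue this very call created. A self-contained sketch of that race (illustrative stand-ins, not the real controller):

    class FakeAutoReconnect(Exception):
        pass


    queues = set()


    def create(name, fail_after_insert=False):
        if name in queues:
            return False                # duplicate key -> reported as "not created"
        queues.add(name)                # the insert itself succeeds...
        if fail_after_insert:
            raise FakeAutoReconnect()   # ...but the acknowledgement never arrives
        return True


    try:
        created = create('orders', fail_after_insert=True)
    except FakeAutoReconnect:
        created = create('orders')      # naive retry

    print(created)              # False, although this call's work created the queue
    print('orders' in queues)   # True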
@@ -214,11 +225,13 @@ class QueueController(storage.Queue):
         return True

     @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
     def exists(self, name, project=None):
         query = _get_scoped_query(name, project)
         return self._collection.find_one(query) is not None

     @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
     def set_metadata(self, name, metadata, project=None):
         rst = self._collection.update(_get_scoped_query(name, project),
                                       {'$set': {'m': metadata}},
@@ -229,11 +242,13 @@ class QueueController(storage.Queue):
             raise errors.QueueDoesNotExist(name, project)

     @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
     def delete(self, name, project=None):
         self.driver.message_controller._purge_queue(name, project)
         self._collection.remove(_get_scoped_query(name, project))

     @utils.raises_conn_error
+    @utils.retries_on_autoreconnect
     def stats(self, name, project=None):
         if not self.exists(name, project=project):
             raise errors.QueueDoesNotExist(name, project)
@@ -18,12 +18,14 @@ import collections
 import datetime
 import functools
 import random
+import time

 from bson import errors as berrors
 from bson import objectid
 from bson import tz_util
 from pymongo import errors

+from marconi.openstack.common.gettextutils import _
 import marconi.openstack.common.log as logging
 from marconi.openstack.common import timeutils
 from marconi.queues.storage import errors as storage_errors
@@ -238,9 +240,9 @@ def get_partition(num_partitions, queue, project=None):


 def raises_conn_error(func):
-    """Handles mongodb ConnectionFailure error
+    """Handles the MongoDB ConnectionFailure error.

-    This decorator catches mongodb's ConnectionFailure
+    This decorator catches MongoDB's ConnectionFailure
     error and raises Marconi's ConnectionError instead.
     """

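Only the docstring of `raises_conn_error` changes in this hunk; its body lies outside the diff context. For orientation, the wrapper the docstring describes, catching pymongo's ConnectionFailure and re-raising Marconi's ConnectionError, typically looks roughly like this (a sketch inferred from the docstring and the imports above, not the file's actual body):

    import functools

    from pymongo import errors

    from marconi.queues.storage import errors as storage_errors


    def raises_conn_error(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except errors.ConnectionFailure:
                # Translate the driver-specific failure into the
                # storage-agnostic error the upper layers expect.
                raise storage_errors.ConnectionError()

        return wrapper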
@@ -255,6 +257,46 @@ def raises_conn_error(func):
     return wrapper


+def retries_on_autoreconnect(func):
+    """Causes the wrapped function to be re-called on AutoReconnect.
+
+    This decorator catches MongoDB's AutoReconnect error and retries
+    the function call.
+
+    .. Note::
+        Assumes that the decorated function has defined self.driver.mongodb_conf
+        so that `max_reconnect_attempts` and `reconnect_sleep` can be taken
+        into account.
+
+    .. Warning:: The decorated function must be idempotent.
+    """
+
+    @functools.wraps(func)
+    def wrapper(self, *args, **kwargs):
+        # TODO(kgriffs): Figure out a way to not have to rely on the
+        # presence of `mongodb_conf`
+        max_attemps = self.driver.mongodb_conf.max_reconnect_attempts
+        sleep_sec = self.driver.mongodb_conf.reconnect_sleep
+
+        for attempt in range(max_attemps):
+            try:
+                return func(self, *args, **kwargs)
+                break
+
+            except errors.AutoReconnect as ex:
+                LOG.warn(_(u'Caught AutoReconnect, retrying the '
+                           'call to {0}').format(func))
+
+                time.sleep(sleep_sec * (2 ** attempt))
+        else:
+            LOG.error(_(u'Caught AutoReconnect, maximum attempts '
+                        'to {0} exceeded.').format(func))
+
+            raise ex
+
+    return wrapper
+
+
 class HookedCursor(object):

     def __init__(self, cursor, denormalizer):
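The decorator above reaches into `self.driver.mongodb_conf` for its tuning knobs and relies on the decorated method being idempotent. Stripped of the oslo config and logging plumbing, the underlying retry-with-exponential-backoff pattern amounts to the following (a generic, runnable restatement, not the project's code; the exception class is a stand-in for pymongo.errors.AutoReconnect):

    import functools
    import time


    class AutoReconnect(Exception):
        """Stand-in for pymongo.errors.AutoReconnect."""


    def retry_on_autoreconnect(max_attempts=10, base_sleep=0.020):
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                for attempt in range(max_attempts):
                    try:
                        return func(*args, **kwargs)
                    except AutoReconnect:
                        if attempt == max_attempts - 1:
                            raise
                        # Exponential back-off: base, 2x, 4x, ...
                        time.sleep(base_sleep * (2 ** attempt))
            return wrapper
        return decorator


    calls = {'n': 0}


    @retry_on_autoreconnect(max_attempts=3, base_sleep=0.001)
    def flaky():
        calls['n'] += 1
        if calls['n'] < 3:
            raise AutoReconnect()
        return calls['n']


    print(flaky())  # 3 -> failed twice, succeeded on the final attempt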
@@ -132,7 +132,7 @@ class ShardsBaseTest(base.TestBase):

     def test_put_existing_overwrites(self):
         # NOTE(cabrera): setUp creates default shard
-        expect = {'weight': 20, 'uri': 'sqlalchemy://other'}
+        expect = self.doc
         self.simulate_put(self.shard,
                           body=json.dumps(expect))
         self.assertEqual(self.srmock.status, falcon.HTTP_201)
@@ -12,3 +12,5 @@ port = 8888
 [drivers:storage:mongodb]
 uri = mongodb://127.0.0.1:27017
 database = marconi_test
+max_reconnect_attempts = 3
+reconnect_sleep = 0.001
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import collections
 import datetime
 import time
 import uuid
@@ -44,6 +45,20 @@ def _cleanup_databases(controller):

 class MongodbUtilsTest(testing.TestBase):

+    config_file = 'wsgi_mongodb.conf'
+
+    def setUp(self):
+        super(MongodbUtilsTest, self).setUp()
+
+        self.conf.register_opts(options.MONGODB_OPTIONS,
+                                group=options.MONGODB_GROUP)
+
+        self.mongodb_conf = self.conf[options.MONGODB_GROUP]
+
+        MockDriver = collections.namedtuple('MockDriver', 'mongodb_conf')
+
+        self.driver = MockDriver(self.mongodb_conf)
+
     def test_scope_queue_name(self):
         self.assertEqual(utils.scope_queue_name('my-q'), '/my-q')
         self.assertEqual(utils.scope_queue_name('my-q', None), '/my-q')
@@ -84,6 +99,34 @@ class MongodbUtilsTest(testing.TestBase):
         self.assertRaises(ValueError, utils.calculate_backoff, 10, 10, 2, 0)
         self.assertRaises(ValueError, utils.calculate_backoff, 11, 10, 2, 0)

+    def test_retries_on_autoreconnect(self):
+        num_calls = [0]
+
+        @utils.retries_on_autoreconnect
+        def _raises_autoreconnect(self):
+            num_calls[0] += 1
+            raise pymongo.errors.AutoReconnect()
+
+        self.assertRaises(pymongo.errors.AutoReconnect,
+                          _raises_autoreconnect, self)
+        self.assertEqual(num_calls, [self.mongodb_conf.max_reconnect_attempts])
+
+    def test_retries_on_autoreconnect_neg(self):
+        num_calls = [0]
+
+        @utils.retries_on_autoreconnect
+        def _raises_autoreconnect(self):
+            num_calls[0] += 1
+
+            # NOTE(kgriffs): Don't exceed until the last attempt
+            if num_calls[0] < self.mongodb_conf.max_reconnect_attempts:
+                raise pymongo.errors.AutoReconnect()
+
+        # NOTE(kgriffs): Test that this does *not* raise AutoReconnect
+        _raises_autoreconnect(self)
+
+        self.assertEqual(num_calls, [self.mongodb_conf.max_reconnect_attempts])
+

 @testing.requires_mongodb
 class MongodbDriverTest(testing.TestBase):