diff --git a/marconi/queues/storage/mongodb/claims.py b/marconi/queues/storage/mongodb/claims.py index b30e48e16..cf217d4f9 100644 --- a/marconi/queues/storage/mongodb/claims.py +++ b/marconi/queues/storage/mongodb/claims.py @@ -57,6 +57,7 @@ class ClaimController(storage.Claim): """ @utils.raises_conn_error + @utils.retries_on_autoreconnect def get(self, queue, claim_id, project=None): msg_ctrl = self.driver.message_controller @@ -97,7 +98,14 @@ class ClaimController(storage.Claim): return (claim_meta, msgs) + # NOTE(kgriffs): If we get an autoreconnect or any other connection error, + # the worst that can happen is you get an orphaned claim, but it will + # expire eventually and free up those messages to be claimed again. We + # might consider setting a "claim valid" flag similar to how posting + # messages works, in order to avoid this situation if it turns out to + # be a real problem for users. @utils.raises_conn_error + @utils.retries_on_autoreconnect def create(self, queue, metadata, project=None, limit=storage.DEFAULT_MESSAGES_PER_CLAIM): """Creates a claim. 
@@ -190,6 +198,7 @@ class ClaimController(storage.Claim): return (str(oid), messages) @utils.raises_conn_error + @utils.retries_on_autoreconnect def update(self, queue, claim_id, metadata, project=None): cid = utils.to_oid(claim_id) if cid is None: @@ -233,6 +242,7 @@ class ClaimController(storage.Claim): upsert=False, multi=True) @utils.raises_conn_error + @utils.retries_on_autoreconnect def delete(self, queue, claim_id, project=None): msg_ctrl = self.driver.message_controller msg_ctrl._unclaim(queue, claim_id, project=project) diff --git a/marconi/queues/storage/mongodb/messages.py b/marconi/queues/storage/mongodb/messages.py index e60c17aa5..9ffb29c85 100644 --- a/marconi/queues/storage/mongodb/messages.py +++ b/marconi/queues/storage/mongodb/messages.py @@ -425,6 +425,7 @@ class MessageController(storage.Message): yield str(marker_id['next']) @utils.raises_conn_error + @utils.retries_on_autoreconnect def first(self, queue_name, project=None, sort=1): cursor = self._list(queue_name, project=project, include_claimed=True, sort=sort, @@ -437,6 +438,7 @@ class MessageController(storage.Message): return message @utils.raises_conn_error + @utils.retries_on_autoreconnect def get(self, queue_name, message_id, project=None): mid = utils.to_oid(message_id) if mid is None: @@ -460,6 +462,7 @@ class MessageController(storage.Message): return _basic_message(message[0], now) @utils.raises_conn_error + @utils.retries_on_autoreconnect def bulk_get(self, queue_name, message_ids, project=None): message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid] if not message_ids: @@ -485,7 +488,13 @@ class MessageController(storage.Message): return utils.HookedCursor(messages, denormalizer) @utils.raises_conn_error + @utils.retries_on_autoreconnect def post(self, queue_name, messages, client_uuid, project=None): + # NOTE(flaper87): This method should be safe to retry on + # autoreconnect, since we've a 2-step insert for messages. 
+ # The worst-case scenario is that we'll increase the counter + # several times and we'd end up with some non-active messages. + if not self._queue_ctrl.exists(queue_name, project): raise errors.QueueDoesNotExist(queue_name, project) @@ -664,6 +673,7 @@ class MessageController(storage.Message): succeeded_ids) @utils.raises_conn_error + @utils.retries_on_autoreconnect def delete(self, queue_name, message_id, project=None, claim=None): # NOTE(cpp-cabrera): return early - this is an invalid message # id so we won't be able to find it any way @@ -714,6 +724,7 @@ class MessageController(storage.Message): collection.remove(query['_id'], w=0) @utils.raises_conn_error + @utils.retries_on_autoreconnect def bulk_delete(self, queue_name, message_ids, project=None): message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid] query = { diff --git a/marconi/queues/storage/mongodb/options.py b/marconi/queues/storage/mongodb/options.py index 007841829..1e8f53faf 100644 --- a/marconi/queues/storage/mongodb/options.py +++ b/marconi/queues/storage/mongodb/options.py @@ -53,6 +53,16 @@ MONGODB_OPTIONS = [ 'sleep interval, in order to decrease probability ' 'that parallel requests will retry at the ' 'same instant.')), + + cfg.IntOpt('max_reconnect_attempts', default=10, + help=('Maximum number of times to retry an operation that ' + 'failed due to a primary node failover.')), + + cfg.FloatOpt('reconnect_sleep', default=0.020, + help=('Base sleep interval between attempts to reconnect ' + 'after a primary node failover. 
' + 'The actual sleep time increases exponentially (power ' + 'of 2) each time the operation is retried.')), ] MONGODB_GROUP = 'drivers:storage:mongodb' diff --git a/marconi/queues/storage/mongodb/queues.py b/marconi/queues/storage/mongodb/queues.py index 13e149595..03259b83b 100644 --- a/marconi/queues/storage/mongodb/queues.py +++ b/marconi/queues/storage/mongodb/queues.py @@ -192,12 +192,23 @@ class QueueController(storage.Queue): yield marker_name and marker_name['next'] @utils.raises_conn_error + @utils.retries_on_autoreconnect def get_metadata(self, name, project=None): queue = self._get(name, project) return queue.get('m', {}) @utils.raises_conn_error + # @utils.retries_on_autoreconnect def create(self, name, project=None): + # NOTE(flaper87): If the connection fails after it was called + # and we retry to insert the queue, we could end up returning + # `False` because of the `DuplicatedKeyError` although the + # queue was indeed created by this API call. + # + # TODO(kgriffs): Commented out `retries_on_autoreconnect` for + # now due to the above issue, since creating a queue is less + # important to make super HA. 
+ try: # NOTE(kgriffs): Start counting at 1, and assume the first # message ever posted will succeed and set t to a UNIX @@ -214,11 +225,13 @@ class QueueController(storage.Queue): return True @utils.raises_conn_error + @utils.retries_on_autoreconnect def exists(self, name, project=None): query = _get_scoped_query(name, project) return self._collection.find_one(query) is not None @utils.raises_conn_error + @utils.retries_on_autoreconnect def set_metadata(self, name, metadata, project=None): rst = self._collection.update(_get_scoped_query(name, project), {'$set': {'m': metadata}}, @@ -229,11 +242,13 @@ class QueueController(storage.Queue): raise errors.QueueDoesNotExist(name, project) @utils.raises_conn_error + @utils.retries_on_autoreconnect def delete(self, name, project=None): self.driver.message_controller._purge_queue(name, project) self._collection.remove(_get_scoped_query(name, project)) @utils.raises_conn_error + @utils.retries_on_autoreconnect def stats(self, name, project=None): if not self.exists(name, project=project): raise errors.QueueDoesNotExist(name, project) diff --git a/marconi/queues/storage/mongodb/utils.py b/marconi/queues/storage/mongodb/utils.py index 858b22370..33e573977 100644 --- a/marconi/queues/storage/mongodb/utils.py +++ b/marconi/queues/storage/mongodb/utils.py @@ -18,12 +18,14 @@ import collections import datetime import functools import random +import time from bson import errors as berrors from bson import objectid from bson import tz_util from pymongo import errors +from marconi.openstack.common.gettextutils import _ import marconi.openstack.common.log as logging from marconi.openstack.common import timeutils from marconi.queues.storage import errors as storage_errors @@ -238,9 +240,9 @@ def get_partition(num_partitions, queue, project=None): def raises_conn_error(func): - """Handles mongodb ConnectionFailure error + """Handles the MongoDB ConnectionFailure error. 
- This decorator catches mongodb's ConnectionFailure + This decorator catches MongoDB's ConnectionFailure error and raises Marconi's ConnectionError instead. """ @@ -255,6 +257,46 @@ def raises_conn_error(func): return wrapper +def retries_on_autoreconnect(func): + """Causes the wrapped function to be re-called on AutoReconnect. + + This decorator catches MongoDB's AutoReconnect error and retries + the function call. + + .. Note:: + Assumes that the decorated function has defined self.driver.mongodb_conf + so that `max_reconnect_attempts` and `reconnect_sleep` can be taken + into account. + + .. Warning:: The decorated function must be idempotent. + """ + + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + # TODO(kgriffs): Figure out a way to not have to rely on the + # presence of `mongodb_conf` + max_attempts = self.driver.mongodb_conf.max_reconnect_attempts + sleep_sec = self.driver.mongodb_conf.reconnect_sleep + + for attempt in range(max_attempts): + try: + return func(self, *args, **kwargs) + break + + except errors.AutoReconnect as ex: + LOG.warn(_(u'Caught AutoReconnect, retrying the ' + 'call to {0}').format(func)) + + time.sleep(sleep_sec * (2 ** attempt)) + else: + LOG.error(_(u'Caught AutoReconnect, maximum attempts ' + 'to {0} exceeded.').format(func)) + + raise ex + + return wrapper + + class HookedCursor(object): def __init__(self, cursor, denormalizer): diff --git a/marconi/tests/queues/transport/wsgi/test_shards.py b/marconi/tests/queues/transport/wsgi/test_shards.py index 91c958caa..bcb0473e1 100644 --- a/marconi/tests/queues/transport/wsgi/test_shards.py +++ b/marconi/tests/queues/transport/wsgi/test_shards.py @@ -132,7 +132,7 @@ class ShardsBaseTest(base.TestBase): def test_put_existing_overwrites(self): # NOTE(cabrera): setUp creates default shard - expect = {'weight': 20, 'uri': 'sqlalchemy://other'} + expect = self.doc self.simulate_put(self.shard, body=json.dumps(expect)) self.assertEqual(self.srmock.status, falcon.HTTP_201) diff 
--git a/tests/etc/wsgi_mongodb.conf b/tests/etc/wsgi_mongodb.conf index b09aeb7eb..1cdc042a1 100644 --- a/tests/etc/wsgi_mongodb.conf +++ b/tests/etc/wsgi_mongodb.conf @@ -12,3 +12,5 @@ port = 8888 [drivers:storage:mongodb] uri = mongodb://127.0.0.1:27017 database = marconi_test +max_reconnect_attempts = 3 +reconnect_sleep = 0.001 \ No newline at end of file diff --git a/tests/unit/queues/storage/test_impl_mongodb.py b/tests/unit/queues/storage/test_impl_mongodb.py index e7fe82d7a..1f6b342e3 100644 --- a/tests/unit/queues/storage/test_impl_mongodb.py +++ b/tests/unit/queues/storage/test_impl_mongodb.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections import datetime import time import uuid @@ -44,6 +45,20 @@ def _cleanup_databases(controller): class MongodbUtilsTest(testing.TestBase): + config_file = 'wsgi_mongodb.conf' + + def setUp(self): + super(MongodbUtilsTest, self).setUp() + + self.conf.register_opts(options.MONGODB_OPTIONS, + group=options.MONGODB_GROUP) + + self.mongodb_conf = self.conf[options.MONGODB_GROUP] + + MockDriver = collections.namedtuple('MockDriver', 'mongodb_conf') + + self.driver = MockDriver(self.mongodb_conf) + def test_scope_queue_name(self): self.assertEqual(utils.scope_queue_name('my-q'), '/my-q') self.assertEqual(utils.scope_queue_name('my-q', None), '/my-q') @@ -84,6 +99,34 @@ class MongodbUtilsTest(testing.TestBase): self.assertRaises(ValueError, utils.calculate_backoff, 10, 10, 2, 0) self.assertRaises(ValueError, utils.calculate_backoff, 11, 10, 2, 0) + def test_retries_on_autoreconnect(self): + num_calls = [0] + + @utils.retries_on_autoreconnect + def _raises_autoreconnect(self): + num_calls[0] += 1 + raise pymongo.errors.AutoReconnect() + + self.assertRaises(pymongo.errors.AutoReconnect, + _raises_autoreconnect, self) + self.assertEqual(num_calls, [self.mongodb_conf.max_reconnect_attempts]) + + def test_retries_on_autoreconnect_neg(self): + 
num_calls = [0] + + @utils.retries_on_autoreconnect + def _raises_autoreconnect(self): + num_calls[0] += 1 + + # NOTE(kgriffs): Don't exceed until the last attempt + if num_calls[0] < self.mongodb_conf.max_reconnect_attempts: + raise pymongo.errors.AutoReconnect() + + # NOTE(kgriffs): Test that this does *not* raise AutoReconnect + _raises_autoreconnect(self) + + self.assertEqual(num_calls, [self.mongodb_conf.max_reconnect_attempts]) + @testing.requires_mongodb class MongodbDriverTest(testing.TestBase):