From c13fb6f93eba49f88d88820d0c06b3d4be89d558 Mon Sep 17 00:00:00 2001 From: kgriffs Date: Thu, 30 Jan 2014 11:09:31 -0600 Subject: [PATCH] fix(MongoDB): Driver does not retry on AutoReconnect errors When a primary MongoDB node fails over to a secondary, pymongo raises an AutoReconnect error. Let's catch that and retry the operation so that we truly are Highly Available (in the sense that the user will never notice the few ms of "downtime" caused by a failover). This is particularly important when hosting backend with a DBaaS that routinely fails over the master as a way of compacting shards. NOTE: In order to get all MongoDB tests green, a tiny unrelated bug in test_shards was fixed as part of this patch. Closes-Bug: 1214973 Change-Id: Ibf172e30ec6e7fa0bbb8fdcebda9e985d1e49714 --- marconi/queues/storage/mongodb/claims.py | 10 ++++ marconi/queues/storage/mongodb/messages.py | 11 +++++ marconi/queues/storage/mongodb/options.py | 10 ++++ marconi/queues/storage/mongodb/queues.py | 15 ++++++ marconi/queues/storage/mongodb/utils.py | 46 ++++++++++++++++++- .../queues/transport/wsgi/test_shards.py | 2 +- tests/etc/wsgi_mongodb.conf | 2 + .../unit/queues/storage/test_impl_mongodb.py | 43 +++++++++++++++++ 8 files changed, 136 insertions(+), 3 deletions(-) diff --git a/marconi/queues/storage/mongodb/claims.py b/marconi/queues/storage/mongodb/claims.py index b30e48e16..cf217d4f9 100644 --- a/marconi/queues/storage/mongodb/claims.py +++ b/marconi/queues/storage/mongodb/claims.py @@ -57,6 +57,7 @@ class ClaimController(storage.Claim): """ @utils.raises_conn_error + @utils.retries_on_autoreconnect def get(self, queue, claim_id, project=None): msg_ctrl = self.driver.message_controller @@ -97,7 +98,14 @@ class ClaimController(storage.Claim): return (claim_meta, msgs) + # NOTE(kgriffs): If we get an autoreconnect or any other connection error, + # the worst that can happen is you get an orphaned claim, but it will + # expire eventually and free up those messages to be claimed again. We + # might consider setting a "claim valid" flag similar to how posting + # messages works, in order to avoid this situation if it turns out to + # be a real problem for users. @utils.raises_conn_error + @utils.retries_on_autoreconnect def create(self, queue, metadata, project=None, limit=storage.DEFAULT_MESSAGES_PER_CLAIM): """Creates a claim. @@ -190,6 +198,7 @@ class ClaimController(storage.Claim): return (str(oid), messages) @utils.raises_conn_error + @utils.retries_on_autoreconnect def update(self, queue, claim_id, metadata, project=None): cid = utils.to_oid(claim_id) if cid is None: @@ -233,6 +242,7 @@ class ClaimController(storage.Claim): upsert=False, multi=True) @utils.raises_conn_error + @utils.retries_on_autoreconnect def delete(self, queue, claim_id, project=None): msg_ctrl = self.driver.message_controller msg_ctrl._unclaim(queue, claim_id, project=project) diff --git a/marconi/queues/storage/mongodb/messages.py b/marconi/queues/storage/mongodb/messages.py index e60c17aa5..9ffb29c85 100644 --- a/marconi/queues/storage/mongodb/messages.py +++ b/marconi/queues/storage/mongodb/messages.py @@ -425,6 +425,7 @@ class MessageController(storage.Message): yield str(marker_id['next']) @utils.raises_conn_error + @utils.retries_on_autoreconnect def first(self, queue_name, project=None, sort=1): cursor = self._list(queue_name, project=project, include_claimed=True, sort=sort, @@ -437,6 +438,7 @@ class MessageController(storage.Message): return message @utils.raises_conn_error + @utils.retries_on_autoreconnect def get(self, queue_name, message_id, project=None): mid = utils.to_oid(message_id) if mid is None: @@ -460,6 +462,7 @@ class MessageController(storage.Message): return _basic_message(message[0], now) @utils.raises_conn_error + @utils.retries_on_autoreconnect def bulk_get(self, queue_name, message_ids, project=None): message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid] if not message_ids: @@ -485,7 +488,13 @@ class MessageController(storage.Message): return utils.HookedCursor(messages, denormalizer) @utils.raises_conn_error + @utils.retries_on_autoreconnect def post(self, queue_name, messages, client_uuid, project=None): + # NOTE(flaper87): This method should be safe to retry on + # autoreconnect, since we've a 2-step insert for messages. + # The worst-case scenario is that we'll increase the counter + # several times and we'd end up with some non-active messages. + if not self._queue_ctrl.exists(queue_name, project): raise errors.QueueDoesNotExist(queue_name, project) @@ -664,6 +673,7 @@ class MessageController(storage.Message): succeeded_ids) @utils.raises_conn_error + @utils.retries_on_autoreconnect def delete(self, queue_name, message_id, project=None, claim=None): # NOTE(cpp-cabrera): return early - this is an invalid message # id so we won't be able to find it any way @@ -714,6 +724,7 @@ class MessageController(storage.Message): collection.remove(query['_id'], w=0) @utils.raises_conn_error + @utils.retries_on_autoreconnect def bulk_delete(self, queue_name, message_ids, project=None): message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid] query = { diff --git a/marconi/queues/storage/mongodb/options.py b/marconi/queues/storage/mongodb/options.py index 007841829..1e8f53faf 100644 --- a/marconi/queues/storage/mongodb/options.py +++ b/marconi/queues/storage/mongodb/options.py @@ -53,6 +53,16 @@ MONGODB_OPTIONS = [ 'sleep interval, in order to decrease probability ' 'that parallel requests will retry at the ' 'same instant.')), + + cfg.IntOpt('max_reconnect_attempts', default=10, + help=('Maximum number of times to retry an operation that ' + 'failed due to a primary node failover.')), + + cfg.FloatOpt('reconnect_sleep', default=0.020, + help=('Base sleep interval between attempts to reconnect ' + 'after a primary node failover. ' + 'The actual sleep time increases exponentially (power ' + 'of 2) each time the operation is retried.')), ] MONGODB_GROUP = 'drivers:storage:mongodb' diff --git a/marconi/queues/storage/mongodb/queues.py b/marconi/queues/storage/mongodb/queues.py index 13e149595..03259b83b 100644 --- a/marconi/queues/storage/mongodb/queues.py +++ b/marconi/queues/storage/mongodb/queues.py @@ -192,12 +192,23 @@ class QueueController(storage.Queue): yield marker_name and marker_name['next'] @utils.raises_conn_error + @utils.retries_on_autoreconnect def get_metadata(self, name, project=None): queue = self._get(name, project) return queue.get('m', {}) @utils.raises_conn_error + # @utils.retries_on_autoreconnect def create(self, name, project=None): + # NOTE(flaper87): If the connection fails after it was called + # and we retry to insert the queue, we could end up returning + # `False` because of the `DuplicatedKeyError` although the + # queue was indeed created by this API call. + # + # TODO(kgriffs): Commented out `retries_on_autoreconnect` for + # now due to the above issue, since creating a queue is less + # important to make super HA. + try: # NOTE(kgriffs): Start counting at 1, and assume the first # message ever posted will succeed and set t to a UNIX @@ -214,11 +225,13 @@ class QueueController(storage.Queue): return True @utils.raises_conn_error + @utils.retries_on_autoreconnect def exists(self, name, project=None): query = _get_scoped_query(name, project) return self._collection.find_one(query) is not None @utils.raises_conn_error + @utils.retries_on_autoreconnect def set_metadata(self, name, metadata, project=None): rst = self._collection.update(_get_scoped_query(name, project), {'$set': {'m': metadata}}, @@ -229,11 +242,13 @@ class QueueController(storage.Queue): raise errors.QueueDoesNotExist(name, project) @utils.raises_conn_error + @utils.retries_on_autoreconnect def delete(self, name, project=None): self.driver.message_controller._purge_queue(name, project) self._collection.remove(_get_scoped_query(name, project)) @utils.raises_conn_error + @utils.retries_on_autoreconnect def stats(self, name, project=None): if not self.exists(name, project=project): raise errors.QueueDoesNotExist(name, project) diff --git a/marconi/queues/storage/mongodb/utils.py b/marconi/queues/storage/mongodb/utils.py index 858b22370..33e573977 100644 --- a/marconi/queues/storage/mongodb/utils.py +++ b/marconi/queues/storage/mongodb/utils.py @@ -18,12 +18,14 @@ import collections import datetime import functools import random +import time from bson import errors as berrors from bson import objectid from bson import tz_util from pymongo import errors +from marconi.openstack.common.gettextutils import _ import marconi.openstack.common.log as logging from marconi.openstack.common import timeutils from marconi.queues.storage import errors as storage_errors @@ -238,9 +240,9 @@ def get_partition(num_partitions, queue, project=None): def raises_conn_error(func): - """Handles mongodb ConnectionFailure error + """Handles the MongoDB ConnectionFailure error. - This decorator catches mongodb's ConnectionFailure + This decorator catches MongoDB's ConnectionFailure error and raises Marconi's ConnectionError instead. """ @@ -255,6 +257,46 @@ def raises_conn_error(func): return wrapper +def retries_on_autoreconnect(func): + """Causes the wrapped function to be re-called on AutoReconnect. + + This decorator catches MongoDB's AutoReconnect error and retries + the function call. + + .. Note:: + Assumes that the decorated function has defined self.driver.mongodb_conf + so that `max_reconnect_attempts` and `reconnect_sleep` can be taken + into account. + + .. Warning:: The decorated function must be idempotent. + """ + + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + # TODO(kgriffs): Figure out a way to not have to rely on the + # presence of `mongodb_conf` + max_attemps = self.driver.mongodb_conf.max_reconnect_attempts + sleep_sec = self.driver.mongodb_conf.reconnect_sleep + + for attempt in range(max_attemps): + try: + return func(self, *args, **kwargs) + break + + except errors.AutoReconnect as ex: + LOG.warn(_(u'Caught AutoReconnect, retrying the ' + 'call to {0}').format(func)) + + time.sleep(sleep_sec * (2 ** attempt)) + else: + LOG.error(_(u'Caught AutoReconnect, maximum attempts ' + 'to {0} exceeded.').format(func)) + + raise ex + + return wrapper + + class HookedCursor(object): def __init__(self, cursor, denormalizer): diff --git a/marconi/tests/queues/transport/wsgi/test_shards.py b/marconi/tests/queues/transport/wsgi/test_shards.py index 91c958caa..bcb0473e1 100644 --- a/marconi/tests/queues/transport/wsgi/test_shards.py +++ b/marconi/tests/queues/transport/wsgi/test_shards.py @@ -132,7 +132,7 @@ class ShardsBaseTest(base.TestBase): def test_put_existing_overwrites(self): # NOTE(cabrera): setUp creates default shard - expect = {'weight': 20, 'uri': 'sqlalchemy://other'} + expect = self.doc self.simulate_put(self.shard, body=json.dumps(expect)) self.assertEqual(self.srmock.status, falcon.HTTP_201) diff --git a/tests/etc/wsgi_mongodb.conf b/tests/etc/wsgi_mongodb.conf index b09aeb7eb..1cdc042a1 100644 --- a/tests/etc/wsgi_mongodb.conf +++ b/tests/etc/wsgi_mongodb.conf @@ -12,3 +12,5 @@ port = 8888 [drivers:storage:mongodb] uri = mongodb://127.0.0.1:27017 database = marconi_test +max_reconnect_attempts = 3 +reconnect_sleep = 0.001 \ No newline at end of file diff --git a/tests/unit/queues/storage/test_impl_mongodb.py b/tests/unit/queues/storage/test_impl_mongodb.py index e7fe82d7a..1f6b342e3 100644 --- a/tests/unit/queues/storage/test_impl_mongodb.py +++ b/tests/unit/queues/storage/test_impl_mongodb.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections import datetime import time import uuid @@ -44,6 +45,20 @@ def _cleanup_databases(controller): class MongodbUtilsTest(testing.TestBase): + config_file = 'wsgi_mongodb.conf' + + def setUp(self): + super(MongodbUtilsTest, self).setUp() + + self.conf.register_opts(options.MONGODB_OPTIONS, + group=options.MONGODB_GROUP) + + self.mongodb_conf = self.conf[options.MONGODB_GROUP] + + MockDriver = collections.namedtuple('MockDriver', 'mongodb_conf') + + self.driver = MockDriver(self.mongodb_conf) + def test_scope_queue_name(self): self.assertEqual(utils.scope_queue_name('my-q'), '/my-q') self.assertEqual(utils.scope_queue_name('my-q', None), '/my-q') @@ -84,6 +99,34 @@ class MongodbUtilsTest(testing.TestBase): self.assertRaises(ValueError, utils.calculate_backoff, 10, 10, 2, 0) self.assertRaises(ValueError, utils.calculate_backoff, 11, 10, 2, 0) + def test_retries_on_autoreconnect(self): + num_calls = [0] + + @utils.retries_on_autoreconnect + def _raises_autoreconnect(self): + num_calls[0] += 1 + raise pymongo.errors.AutoReconnect() + + self.assertRaises(pymongo.errors.AutoReconnect, + _raises_autoreconnect, self) + self.assertEqual(num_calls, [self.mongodb_conf.max_reconnect_attempts]) + + def test_retries_on_autoreconnect_neg(self): + num_calls = [0] + + @utils.retries_on_autoreconnect + def _raises_autoreconnect(self): + num_calls[0] += 1 + + # NOTE(kgriffs): Don't exceed until the last attempt + if num_calls[0] < self.mongodb_conf.max_reconnect_attempts: + raise pymongo.errors.AutoReconnect() + + # NOTE(kgriffs): Test that this does *not* raise AutoReconnect + _raises_autoreconnect(self) + + self.assertEqual(num_calls, [self.mongodb_conf.max_reconnect_attempts]) + @testing.requires_mongodb class MongodbDriverTest(testing.TestBase):