From 1ca46da302a358413d859681b1ad3bac8171538b Mon Sep 17 00:00:00 2001 From: Flaper Fesp Date: Wed, 10 Jul 2013 00:14:36 +0200 Subject: [PATCH] Handle AutoReconnect errors. AutoReconnect errors are not being handled and so, propagated to the client. This patch adds a safe_call decorator. The purpose of safe_call is to catch ConnectionFailure and raise a ConnectionError instead. Future patches will make the transport catch ConnectionError and handle it correctly instead of propagating it to the client. All storage back-end should support this. Fixes bug: #1169821 Change-Id: I523232a7cefbd00082447403ceb3abada9af6db3 --- marconi/storage/exceptions.py | 6 ++++++ marconi/storage/mongodb/claims.py | 4 ++++ marconi/storage/mongodb/messages.py | 3 +++ marconi/storage/mongodb/queues.py | 23 ++++++++++++-------- marconi/storage/mongodb/utils.py | 25 ++++++++++++++++++++++ marconi/tests/storage/test_impl_mongodb.py | 12 +++++++++++ test-requirements.txt | 4 ++-- tox.ini | 2 +- 8 files changed, 67 insertions(+), 12 deletions(-) diff --git a/marconi/storage/exceptions.py b/marconi/storage/exceptions.py index f787a8cc9..a49ff5b8c 100644 --- a/marconi/storage/exceptions.py +++ b/marconi/storage/exceptions.py @@ -14,6 +14,12 @@ # limitations under the License. +class ConnectionError(Exception): + """Raised when the connection with the back-end + was lost. + """ + + class DoesNotExist(Exception): pass diff --git a/marconi/storage/mongodb/claims.py b/marconi/storage/mongodb/claims.py index 423b665f4..0981b73d7 100644 --- a/marconi/storage/mongodb/claims.py +++ b/marconi/storage/mongodb/claims.py @@ -60,6 +60,7 @@ class ClaimController(storage.ClaimBase): queue_controller = self.driver.queue_controller return queue_controller._get_id(queue, project) + @utils.raises_conn_error def get(self, queue, claim_id, project=None): msg_ctrl = self.driver.message_controller @@ -102,6 +103,7 @@ class ClaimController(storage.ClaimBase): return (claim, messages) + @utils.raises_conn_error def create(self, queue, metadata, project=None, limit=10): """Creates a claim. @@ -180,6 +182,7 @@ class ClaimController(storage.ClaimBase): claim, messages = self.get(queue, oid, project=project) return (str(oid), messages) + @utils.raises_conn_error def update(self, queue, claim_id, metadata, project=None): try: cid = utils.to_oid(claim_id) @@ -225,6 +228,7 @@ class ClaimController(storage.ClaimBase): {'$set': {'e': expires, 't': ttl}}, upsert=False, multi=True) + @utils.raises_conn_error def delete(self, queue, claim_id, project=None): msg_ctrl = self.driver.message_controller msg_ctrl.unclaim(claim_id) diff --git a/marconi/storage/mongodb/messages.py b/marconi/storage/mongodb/messages.py index 15dec1e61..175becefd 100644 --- a/marconi/storage/mongodb/messages.py +++ b/marconi/storage/mongodb/messages.py @@ -365,6 +365,7 @@ class MessageController(storage.MessageBase): yield utils.HookedCursor(messages, denormalizer) yield str(marker_id['next']) + @utils.raises_conn_error def get(self, queue, message_ids, project=None): if not isinstance(message_ids, list): message_ids = [message_ids] @@ -394,6 +395,7 @@ class MessageController(storage.MessageBase): return utils.HookedCursor(messages, denormalizer) + @utils.raises_conn_error def post(self, queue, messages, client_uuid, project=None): now = timeutils.utcnow() queue_id = self._get_queue_id(queue, project) @@ -523,6 +525,7 @@ class MessageController(storage.MessageBase): succeeded_ids = map(str, aggregated_results) raise exceptions.MessageConflict(queue, project, succeeded_ids) + @utils.raises_conn_error def delete(self, queue, message_id, project=None, claim=None): try: mid = utils.to_oid(message_id) diff --git a/marconi/storage/mongodb/queues.py b/marconi/storage/mongodb/queues.py index 14f976437..c9d5ae888 100644 --- a/marconi/storage/mongodb/queues.py +++ b/marconi/storage/mongodb/queues.py @@ -24,6 +24,7 @@ Field Mappings: import marconi.openstack.common.log as logging from marconi import storage from marconi.storage import exceptions +from marconi.storage.mongodb import utils LOG = logging.getLogger(__name__) @@ -94,21 +95,22 @@ class QueueController(storage.QueueBase): cursor = cursor.limit(limit).sort('n') marker_name = {} - def normalizer(records): - for rec in records: - queue = {'name': rec['n']} - marker_name['next'] = queue['name'] - if detailed: - queue['metadata'] = rec['m'] - yield queue + def normalizer(record): + queue = {'name': record['n']} + marker_name['next'] = queue['name'] + if detailed: + queue['metadata'] = record['m'] + return queue - yield normalizer(cursor) - yield marker_name['next'] + yield utils.HookedCursor(cursor, normalizer) + yield marker_name and marker_name['next'] + @utils.raises_conn_error def get(self, name, project=None): queue = self._get(name, project) return queue.get('m', {}) + @utils.raises_conn_error def upsert(self, name, metadata, project=None): super(QueueController, self).upsert(name, metadata, project) @@ -120,10 +122,12 @@ class QueueController(storage.QueueBase): return not rst['updatedExisting'] + @utils.raises_conn_error def delete(self, name, project=None): self.driver.message_controller._purge_queue(name, project) self._col.remove({'p': project, 'n': name}) + @utils.raises_conn_error def stats(self, name, project=None): queue_id = self._get_id(name, project) controller = self.driver.message_controller @@ -138,5 +142,6 @@ class QueueController(storage.QueueBase): } } + @utils.raises_conn_error def actions(self, name, project=None, marker=None, limit=10): raise NotImplementedError diff --git a/marconi/storage/mongodb/utils.py b/marconi/storage/mongodb/utils.py index 35a05d66c..680778198 100644 --- a/marconi/storage/mongodb/utils.py +++ b/marconi/storage/mongodb/utils.py @@ -14,19 +14,24 @@ # limitations under the License. import collections +import functools import random import re from bson import errors as berrors from bson import objectid +from pymongo import errors from marconi.common import exceptions +import marconi.openstack.common.log as logging from marconi.openstack.common import timeutils from marconi.storage import exceptions as storage_exceptions DUP_MARKER_REGEX = re.compile(r'\$queue_marker\s+dup key: { : [^:]+: (\d)+') +LOG = logging.getLogger(__name__) + def dup_marker_from_error(error_message): """Extracts the duplicate marker from a MongoDB error string. @@ -135,6 +140,25 @@ def oid_utc(oid): raise TypeError(_('Expected ObjectId and got %s') % type(oid)) +def raises_conn_error(func): + """Handles mongodb ConnectionFailure error + + This decorator catches mongodb's ConnectionFailure + exceptions and raises Marconi's ConnectionError instead. + """ + + @functools.wraps(func) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except errors.ConnectionFailure: + # NOTE(flaper87): Raise the error + msg = "ConnectionFailure caught" + LOG.error(msg) + raise storage_exceptions.ConnectionError(msg) + return wrapper + + class HookedCursor(object): def __init__(self, cursor, denormalizer): @@ -150,6 +174,7 @@ class HookedCursor(object): def __len__(self): return self.cursor.count(True) + @raises_conn_error def next(self): item = self.cursor.next() return self.denormalizer(item) diff --git a/marconi/tests/storage/test_impl_mongodb.py b/marconi/tests/storage/test_impl_mongodb.py index f8778bc4b..77440f0cf 100644 --- a/marconi/tests/storage/test_impl_mongodb.py +++ b/marconi/tests/storage/test_impl_mongodb.py @@ -17,6 +17,9 @@ import os import random import time +import mock +from pymongo import cursor +import pymongo.errors from testtools import matchers from marconi.common import exceptions @@ -127,6 +130,15 @@ class MongodbQueueTests(base.QueueControllerTest): col = self.message_controller._col self.assertEqual(col.find({'q': qid}).count(), 0) + def test_raises_connection_error(self): + + with mock.patch.object(cursor.Cursor, 'next', autospec=True) as method: + error = pymongo.errors.ConnectionFailure() + method.side_effect = error + + queues = self.controller.list().next() + self.assertRaises(storage.exceptions.ConnectionError, queues.next) + class MongodbMessageTests(base.MessageControllerTest): diff --git a/test-requirements.txt b/test-requirements.txt index f11b0d57b..4177141e3 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,10 +1,10 @@ # Packaging +mock distribute>=0.6.24 # Unit testing discover fixtures -mox python-subunit testrepository testtools @@ -15,4 +15,4 @@ nose-exclude openstack.nose_plugin # Metrics and style -hacking \ No newline at end of file +hacking diff --git a/tox.ini b/tox.ini index f43996b38..31f4fc67a 100644 --- a/tox.ini +++ b/tox.ini @@ -28,4 +28,4 @@ commands = {posargs} [flake8] builtins = _,__MARCONI_SETUP__ -exclude = .venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*.egg,.update-venv +exclude = .venv*,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*.egg,.update-venv