Handle AutoReconnect errors.

AutoReconnect errors are not being handled and so, propagated to the client.

This patch adds a safe_call decorator. The purpose of safe_call is to catch
ConnectionFailure and raise a ConnectionError instead. Future patches will make
the transport catch ConnectionError and handle it correctly instead of
propagating it to the client.

All storage back-end should support this.

Fixes bug: #1169821

Change-Id: I523232a7cefbd00082447403ceb3abada9af6db3
This commit is contained in:
Flaper Fesp 2013-07-10 00:14:36 +02:00
parent 45dd041fa7
commit 1ca46da302
8 changed files with 67 additions and 12 deletions

View File

@ -14,6 +14,12 @@
# limitations under the License. # limitations under the License.
class ConnectionError(Exception):
"""Raised when the connection with the back-end
was lost.
"""
class DoesNotExist(Exception): class DoesNotExist(Exception):
pass pass

View File

@ -60,6 +60,7 @@ class ClaimController(storage.ClaimBase):
queue_controller = self.driver.queue_controller queue_controller = self.driver.queue_controller
return queue_controller._get_id(queue, project) return queue_controller._get_id(queue, project)
@utils.raises_conn_error
def get(self, queue, claim_id, project=None): def get(self, queue, claim_id, project=None):
msg_ctrl = self.driver.message_controller msg_ctrl = self.driver.message_controller
@ -102,6 +103,7 @@ class ClaimController(storage.ClaimBase):
return (claim, messages) return (claim, messages)
@utils.raises_conn_error
def create(self, queue, metadata, project=None, limit=10): def create(self, queue, metadata, project=None, limit=10):
"""Creates a claim. """Creates a claim.
@ -180,6 +182,7 @@ class ClaimController(storage.ClaimBase):
claim, messages = self.get(queue, oid, project=project) claim, messages = self.get(queue, oid, project=project)
return (str(oid), messages) return (str(oid), messages)
@utils.raises_conn_error
def update(self, queue, claim_id, metadata, project=None): def update(self, queue, claim_id, metadata, project=None):
try: try:
cid = utils.to_oid(claim_id) cid = utils.to_oid(claim_id)
@ -225,6 +228,7 @@ class ClaimController(storage.ClaimBase):
{'$set': {'e': expires, 't': ttl}}, {'$set': {'e': expires, 't': ttl}},
upsert=False, multi=True) upsert=False, multi=True)
@utils.raises_conn_error
def delete(self, queue, claim_id, project=None): def delete(self, queue, claim_id, project=None):
msg_ctrl = self.driver.message_controller msg_ctrl = self.driver.message_controller
msg_ctrl.unclaim(claim_id) msg_ctrl.unclaim(claim_id)

View File

@ -365,6 +365,7 @@ class MessageController(storage.MessageBase):
yield utils.HookedCursor(messages, denormalizer) yield utils.HookedCursor(messages, denormalizer)
yield str(marker_id['next']) yield str(marker_id['next'])
@utils.raises_conn_error
def get(self, queue, message_ids, project=None): def get(self, queue, message_ids, project=None):
if not isinstance(message_ids, list): if not isinstance(message_ids, list):
message_ids = [message_ids] message_ids = [message_ids]
@ -394,6 +395,7 @@ class MessageController(storage.MessageBase):
return utils.HookedCursor(messages, denormalizer) return utils.HookedCursor(messages, denormalizer)
@utils.raises_conn_error
def post(self, queue, messages, client_uuid, project=None): def post(self, queue, messages, client_uuid, project=None):
now = timeutils.utcnow() now = timeutils.utcnow()
queue_id = self._get_queue_id(queue, project) queue_id = self._get_queue_id(queue, project)
@ -523,6 +525,7 @@ class MessageController(storage.MessageBase):
succeeded_ids = map(str, aggregated_results) succeeded_ids = map(str, aggregated_results)
raise exceptions.MessageConflict(queue, project, succeeded_ids) raise exceptions.MessageConflict(queue, project, succeeded_ids)
@utils.raises_conn_error
def delete(self, queue, message_id, project=None, claim=None): def delete(self, queue, message_id, project=None, claim=None):
try: try:
mid = utils.to_oid(message_id) mid = utils.to_oid(message_id)

View File

@ -24,6 +24,7 @@ Field Mappings:
import marconi.openstack.common.log as logging import marconi.openstack.common.log as logging
from marconi import storage from marconi import storage
from marconi.storage import exceptions from marconi.storage import exceptions
from marconi.storage.mongodb import utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -94,21 +95,22 @@ class QueueController(storage.QueueBase):
cursor = cursor.limit(limit).sort('n') cursor = cursor.limit(limit).sort('n')
marker_name = {} marker_name = {}
def normalizer(records): def normalizer(record):
for rec in records: queue = {'name': record['n']}
queue = {'name': rec['n']} marker_name['next'] = queue['name']
marker_name['next'] = queue['name'] if detailed:
if detailed: queue['metadata'] = record['m']
queue['metadata'] = rec['m'] return queue
yield queue
yield normalizer(cursor) yield utils.HookedCursor(cursor, normalizer)
yield marker_name['next'] yield marker_name and marker_name['next']
@utils.raises_conn_error
def get(self, name, project=None): def get(self, name, project=None):
queue = self._get(name, project) queue = self._get(name, project)
return queue.get('m', {}) return queue.get('m', {})
@utils.raises_conn_error
def upsert(self, name, metadata, project=None): def upsert(self, name, metadata, project=None):
super(QueueController, self).upsert(name, metadata, project) super(QueueController, self).upsert(name, metadata, project)
@ -120,10 +122,12 @@ class QueueController(storage.QueueBase):
return not rst['updatedExisting'] return not rst['updatedExisting']
@utils.raises_conn_error
def delete(self, name, project=None): def delete(self, name, project=None):
self.driver.message_controller._purge_queue(name, project) self.driver.message_controller._purge_queue(name, project)
self._col.remove({'p': project, 'n': name}) self._col.remove({'p': project, 'n': name})
@utils.raises_conn_error
def stats(self, name, project=None): def stats(self, name, project=None):
queue_id = self._get_id(name, project) queue_id = self._get_id(name, project)
controller = self.driver.message_controller controller = self.driver.message_controller
@ -138,5 +142,6 @@ class QueueController(storage.QueueBase):
} }
} }
@utils.raises_conn_error
def actions(self, name, project=None, marker=None, limit=10): def actions(self, name, project=None, marker=None, limit=10):
raise NotImplementedError raise NotImplementedError

View File

@ -14,19 +14,24 @@
# limitations under the License. # limitations under the License.
import collections import collections
import functools
import random import random
import re import re
from bson import errors as berrors from bson import errors as berrors
from bson import objectid from bson import objectid
from pymongo import errors
from marconi.common import exceptions from marconi.common import exceptions
import marconi.openstack.common.log as logging
from marconi.openstack.common import timeutils from marconi.openstack.common import timeutils
from marconi.storage import exceptions as storage_exceptions from marconi.storage import exceptions as storage_exceptions
DUP_MARKER_REGEX = re.compile(r'\$queue_marker\s+dup key: { : [^:]+: (\d)+') DUP_MARKER_REGEX = re.compile(r'\$queue_marker\s+dup key: { : [^:]+: (\d)+')
LOG = logging.getLogger(__name__)
def dup_marker_from_error(error_message): def dup_marker_from_error(error_message):
"""Extracts the duplicate marker from a MongoDB error string. """Extracts the duplicate marker from a MongoDB error string.
@ -135,6 +140,25 @@ def oid_utc(oid):
raise TypeError(_('Expected ObjectId and got %s') % type(oid)) raise TypeError(_('Expected ObjectId and got %s') % type(oid))
def raises_conn_error(func):
"""Handles mongodb ConnectionFailure error
This decorator catches mongodb's ConnectionFailure
exceptions and raises Marconi's ConnectionError instead.
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except errors.ConnectionFailure:
# NOTE(flaper87): Raise the error
msg = "ConnectionFailure caught"
LOG.error(msg)
raise storage_exceptions.ConnectionError(msg)
return wrapper
class HookedCursor(object): class HookedCursor(object):
def __init__(self, cursor, denormalizer): def __init__(self, cursor, denormalizer):
@ -150,6 +174,7 @@ class HookedCursor(object):
def __len__(self): def __len__(self):
return self.cursor.count(True) return self.cursor.count(True)
@raises_conn_error
def next(self): def next(self):
item = self.cursor.next() item = self.cursor.next()
return self.denormalizer(item) return self.denormalizer(item)

View File

@ -17,6 +17,9 @@ import os
import random import random
import time import time
import mock
from pymongo import cursor
import pymongo.errors
from testtools import matchers from testtools import matchers
from marconi.common import exceptions from marconi.common import exceptions
@ -127,6 +130,15 @@ class MongodbQueueTests(base.QueueControllerTest):
col = self.message_controller._col col = self.message_controller._col
self.assertEqual(col.find({'q': qid}).count(), 0) self.assertEqual(col.find({'q': qid}).count(), 0)
def test_raises_connection_error(self):
with mock.patch.object(cursor.Cursor, 'next', autospec=True) as method:
error = pymongo.errors.ConnectionFailure()
method.side_effect = error
queues = self.controller.list().next()
self.assertRaises(storage.exceptions.ConnectionError, queues.next)
class MongodbMessageTests(base.MessageControllerTest): class MongodbMessageTests(base.MessageControllerTest):

View File

@ -1,10 +1,10 @@
# Packaging # Packaging
mock
distribute>=0.6.24 distribute>=0.6.24
# Unit testing # Unit testing
discover discover
fixtures fixtures
mox
python-subunit python-subunit
testrepository testrepository
testtools testtools
@ -15,4 +15,4 @@ nose-exclude
openstack.nose_plugin openstack.nose_plugin
# Metrics and style # Metrics and style
hacking hacking

View File

@ -28,4 +28,4 @@ commands = {posargs}
[flake8] [flake8]
builtins = _,__MARCONI_SETUP__ builtins = _,__MARCONI_SETUP__
exclude = .venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*.egg,.update-venv exclude = .venv*,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*.egg,.update-venv