Refactored storage controllers into standalone modules

Since controllers are getting pretty long, I moved
each class into its own module. I also took the
opportunity to reconcile some stylistic differences
between the MongoDB and SQLite drivers.

Change-Id: If93a3e8db3c75c24aa993304c52f8d119842ce74
This commit is contained in:
kgriffs 2013-06-24 21:47:42 -04:00
parent 4208286d53
commit 4d388dfb87
11 changed files with 1534 additions and 1364 deletions

View File

@ -0,0 +1,230 @@
# Copyright (c) 2013 Red Hat, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implements the MongoDB storage controller for claims.
Field Mappings:
In order to reduce the disk / memory space used,
field names will be, most of the time, the first
letter of their long name.
import datetime
from bson import objectid
import marconi.openstack.common.log as logging
from marconi.openstack.common import timeutils
from marconi import storage
from import exceptions
from import utils
LOG = logging.getLogger(__name__)
class ClaimController(storage.ClaimBase):
"""Implements claim resource operations using MongoDB.
No dedicated collection is being used
for claims.
Claims are created in the messages
collection and live within messages, that is,
in the c field.
This implementation certainly uses more space
on disk but reduces the number of queries to
be executed and the time needed to retrieve
claims and claimed messages.
As for the memory usage, this implementation
requires less memory since a single index is
required. The index is a compound index between
the claim id and it's expiration timestamp.
def _get_queue_id(self, queue, project):
queue_controller = self.driver.queue_controller
return queue_controller._get_id(queue, project)
def get(self, queue, claim_id, project=None):
msg_ctrl = self.driver.message_controller
# Check whether the queue exists or not
qid = self._get_queue_id(queue, project)
# Base query, always check expire time
now = timeutils.utcnow()
cid = utils.to_oid(claim_id)
except ValueError:
raise exceptions.ClaimDoesNotExist()
age = now - utils.oid_utc(cid)
def messages(msg_iter):
msg =
yield msg.pop('claim')
yield msg
# Smoke it!
for msg in msg_iter:
del msg['claim']
yield msg
# Lets get claim's data
# from the first message
# in the iterator
messages = messages(msg_ctrl.claimed(qid, cid, now))
claim =
claim = {
'age': age.seconds,
'ttl': claim.pop('t'),
'id': str(claim['id']),
except StopIteration:
raise exceptions.ClaimDoesNotExist(cid, queue, project)
return (claim, messages)
def create(self, queue, metadata, project=None, limit=10):
"""Creates a claim.
This implementation was done in a best-effort fashion.
In order to create a claim we need to get a list
of messages that can be claimed. Once we have that
list we execute a query filtering by the ids returned
by the previous query.
Since there's a lot of space for race conditions here,
we'll check if the number of updated records is equal to
the max number of messages to claim. If the number of updated
messages is lower than limit we'll try to claim the remaining
number of messages.
This 2 queries are required because there's no way, as for the
time being, to executed an update on a limited number of records
msg_ctrl = self.driver.message_controller
# We don't need the qid here but
# we need to verify it exists.
qid = self._get_queue_id(queue, project)
ttl = int(metadata.get('ttl', 60))
oid = objectid.ObjectId()
now = timeutils.utcnow()
ttl_delta = datetime.timedelta(seconds=ttl)
expires = now + ttl_delta
meta = {
'id': oid,
't': ttl,
'e': expires,
# Get a list of active, not claimed nor expired
# messages that could be claimed.
msgs =, fields={'_id': 1})
msgs = msgs.limit(limit).sort('_id')
messages = iter([])
# Lets respect the limit
# during the count
if msgs.count(True) == 0:
return (str(oid), messages)
ids = [msg['_id'] for msg in msgs]
now = timeutils.utcnow()
# Set claim field for messages in ids
updated = msg_ctrl._col.update({'_id': {'$in': ids},
'$or': [
{'': None},
'': {'$ne': None},
'c.e': {'$lte': now}
{'$set': {'c': meta}}, upsert=False,
# NOTE(flaper87): Dirty hack!
# This sets the expiration time to
# `expires` on messages that would
# expire before claim.
msg_ctrl._col.update({'q': queue,
'e': {'$lt': expires},
'': oid},
{'$set': {'e': expires, 't': ttl}},
upsert=False, multi=True)
if updated != 0:
claim, messages = self.get(queue, oid, project=project)
return (str(oid), messages)
def update(self, queue, claim_id, metadata, project=None):
cid = utils.to_oid(claim_id)
except ValueError:
raise exceptions.ClaimDoesNotExist(claim_id, queue, project)
now = timeutils.utcnow()
ttl = int(metadata.get('ttl', 60))
ttl_delta = datetime.timedelta(seconds=ttl)
expires = now + ttl_delta
if now > expires:
msg = _('New ttl will make the claim expires')
raise ValueError(msg)
qid = self._get_queue_id(queue, project)
msg_ctrl = self.driver.message_controller
claimed = msg_ctrl.claimed(qid, cid, expires=now, limit=1)
except StopIteration:
raise exceptions.ClaimDoesNotExist(claim_id, queue, project)
meta = {
'id': cid,
't': ttl,
'e': expires,
msg_ctrl._col.update({'q': qid, '': cid},
{'$set': {'c': meta}},
upsert=False, multi=True)
# NOTE(flaper87): Dirty hack!
# This sets the expiration time to
# `expires` on messages that would
# expire before claim.
msg_ctrl._col.update({'q': qid,
'e': {'$lt': expires},
'': cid},
{'$set': {'e': expires, 't': ttl}},
upsert=False, multi=True)
def delete(self, queue, claim_id, project=None):
msg_ctrl = self.driver.message_controller

View File

@ -13,854 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implements Mongodb storage controllers.
"""Exports Mongodb storage controllers.
Field Mappings:
In order to reduce the disk / memory space used,
fields name will be, most of the time, the first
letter of their long name. Fields mapping will be
updated and documented in each class.
updated and documented in each controller class.
import collections
import datetime
import time
from import claims
from import messages
from import queues
from bson import objectid
import pymongo.errors
import marconi.openstack.common.log as logging
from marconi.openstack.common import timeutils
from marconi import storage
from import exceptions
from import options
from import utils
LOG = logging.getLogger(__name__)
class QueueController(storage.QueueBase):
"""Implements queue resource operations using MongoDB.
Name Field
name -> n
project -> p
counter -> c
metadata -> m
def __init__(self, *args, **kwargs):
super(QueueController, self).__init__(*args, **kwargs)
self._col = self.driver.db['queues']
# NOTE(flaper87): This creates a unique compound index for
# project and name. Using project as the first field of the
# index allows for querying by project and project+name.
# This is also useful for retrieving the queues list for
# as specific project, for example. Order Matters!
self._col.ensure_index([('p', 1), ('n', 1)], unique=True)
# Helpers
def _get(self, name, project=None, fields={'m': 1, '_id': 0}):
queue = self._col.find_one({'p': project, 'n': name}, fields=fields)
if queue is None:
raise exceptions.QueueDoesNotExist(name, project)
return queue
def _get_id(self, name, project=None):
"""Just like the `get` method, but only returns the queue's id
:returns: Queue's `ObjectId`
queue = self._get(name, project, fields=['_id'])
return queue.get('_id')
def _get_ids(self):
"""Returns a generator producing a list of all queue IDs."""
cursor = self._col.find({}, fields={'_id': 1})
return (doc['_id'] for doc in cursor)
# Interface
def list(self, project=None, marker=None,
limit=10, detailed=False):
query = {'p': project}
if marker:
query['n'] = {'$gt': marker}
fields = {'n': 1, '_id': 0}
if detailed:
fields['m'] = 1
cursor = self._col.find(query, fields=fields)
cursor = cursor.limit(limit).sort('n')
marker_name = {}
def normalizer(records):
for rec in records:
queue = {'name': rec['n']}
marker_name['next'] = queue['name']
if detailed:
queue['metadata'] = rec['m']
yield queue
yield normalizer(cursor)
yield marker_name['next']
def get(self, name, project=None):
queue = self._get(name, project)
return queue.get('m', {})
def upsert(self, name, metadata, project=None):
super(QueueController, self).upsert(name, metadata, project)
rst = self._col.update({'p': project, 'n': name},
{'$set': {'m': metadata, 'c': 1}},
return not rst['updatedExisting']
def delete(self, name, project=None):
self.driver.message_controller._purge_queue(name, project)
self._col.remove({'p': project, 'n': name})
def stats(self, name, project=None):
queue_id = self._get_id(name, project)
controller = self.driver.message_controller
active =
claimed = controller.claimed(queue_id)
return {
'actions': 0,
'messages': {
'claimed': claimed.count(),
'free': active.count(),
def actions(self, name, project=None, marker=None, limit=10):
raise NotImplementedError
class MessageController(storage.MessageBase):
"""Implements message resource operations using MongoDB.
Name Field
queue_id -> q
expires -> e
ttl -> t
uuid -> u
claim -> c
marker -> k
def __init__(self, *args, **kwargs):
super(MessageController, self).__init__(*args, **kwargs)
# Cache for convenience and performance (avoids extra lookups and
# recreating the range for every request.)
self._queue_controller = self.driver.queue_controller
self._db = self.driver.db
self._retry_range = range(options.CFG.max_attempts)
# Make sure indexes exist before,
# doing anything.
self._col = self._db['messages']
# NOTE(flaper87): This index is used mostly in the
# active method but some parts of it are used in
# other places.
# * q: Mostly everywhere. It must stay at the
# beginning of the index.
# * e: Together with q is used for getting a
# specific message. (see `get`)
active_fields = [
('q', 1),
('e', 1),
('c.e', 1),
('k', 1),
('_id', -1),
# Index used for claims
claimed_fields = [
('q', 1),
('', 1),
('c.e', 1),
('_id', -1),
# Index used for _next_marker() and also to ensure
# uniqueness.
# NOTE(kgriffs): This index must be unique so that
# inserting a message with the same marker to the
# same queue will fail; this is used to detect a
# race condition which can cause an observer client
# to miss a message when there is more than one
# producer posting messages to the same queue, in
# parallel.
self._col.ensure_index([('q', 1), ('k', -1)],
# Helpers
def _get_queue_id(self, queue, project=None):
return self._queue_controller._get_id(queue, project)
def _get_queue_ids(self):
return self._queue_controller._get_ids()
def _next_marker(self, queue_id):
"""Retrieves the next message marker for a given queue.
This helper is used to generate monotonic pagination
markers that are saved as part of the message
document. Simply taking the max of the current message
markers works, since Marconi always leaves the most recent
message in the queue (new queues always return 1).
Note 1: Markers are scoped per-queue and so are *not*
globally unique or globally ordered.
Note 2: If two or more requests to this method are made
in parallel, this method will return the same
marker. This is done intentionally so that the caller
can detect a parallel message post, allowing it to
mitigate race conditions between producer and
observer clients.
:param queue_id: queue ID
:returns: next message marker as an integer
document = self._col.find_one({'q': queue_id},
sort=[('k', -1)],
fields={'k': 1, '_id': 0})
# NOTE(kgriffs): this approach is faster than using 'or'
return 1 if document is None else (document['k'] + 1)
def _backoff_sleep(self, attempt):
"""Sleep between retries using a jitter algorithm.
Mitigates thrashing between multiple parallel requests, and
creates backpressure on clients to slow down the rate
at which they submit requests.
:param attempt: current attempt number, zero-based
seconds = utils.calculate_backoff(attempt, options.CFG.max_attempts,
def _count_expired(self, queue_id):
"""Counts the number of expired messages in a queue.
:param queue_id: id for the queue to stat
query = {
'q': queue_id,
'e': {'$lte': timeutils.utcnow()},
return self._col.find(query).count()
def _remove_expired(self, queue_id):
"""Removes all expired messages except for the most recent
in each queue.
This method is used in lieu of mongo's TTL index since we
must always leave at least one message in the queue for
calculating the next marker.
Note that expired messages are only removed if their count
exceeds options.CFG.gc_threshold.
:param queue_id: id for the queue from which to remove
expired messages
if options.CFG.gc_threshold <= self._count_expired(queue_id):
# Get the message with the highest marker, and leave
# it in the queue
head = self._col.find_one({'q': queue_id},
sort=[('k', -1)],
fields={'_id': 1})
if head is None:
# Assume queue was just deleted via a parallel request
LOG.warning(_('Queue %s is empty or missing.') % queue_id)
query = {
'q': queue_id,
'e': {'$lte': timeutils.utcnow()},
'_id': {'$ne': head['_id']}
def _purge_queue(self, queue, project=None):
"""Removes all messages from the queue.
Warning: Only use this when deleting the queue; otherwise
you can cause a side-effect of reseting the marker counter
which can cause clients to miss tons of messages.
If the queue does not exist, this method fails silently.
:param queue: name of the queue to purge
:param project: name of the project to which the queue belongs
qid = self._get_queue_id(queue, project)
self._col.remove({'q': qid}, w=0)
except exceptions.QueueDoesNotExist:
# Interface
def all(self):
return self._col.find()
def active(self, queue_id, marker=None, echo=False,
client_uuid=None, fields=None):
now = timeutils.utcnow()
query = {
# Messages must belong to this queue
'q': utils.to_oid(queue_id),
# The messages can not be expired
'e': {'$gt': now},
# Include messages that are part of expired claims
'c.e': {'$lte': now},
if fields and not isinstance(fields, (dict, list)):
raise TypeError(_('Fields must be an instance of list / dict'))
if not echo and client_uuid:
query['u'] = {'$ne': client_uuid}
if marker:
query['k'] = {'$gt': marker}
return self._col.find(query, fields=fields)
def claimed(self, queue_id, claim_id=None, expires=None, limit=None):
query = {
'': claim_id,
'c.e': {'$gt': expires or timeutils.utcnow()},
'q': utils.to_oid(queue_id),
if not claim_id:
# lookup over to use the index
query[''] = {'$ne': None}
msgs = self._col.find(query, sort=[('_id', 1)])
if limit:
msgs = msgs.limit(limit)
now = timeutils.utcnow()
def denormalizer(msg):
oid = msg['_id']
age = now - utils.oid_utc(oid)
return {
'id': str(oid),
'age': age.seconds,
'ttl': msg['t'],
'body': msg['b'],
'claim': msg['c']
return utils.HookedCursor(msgs, denormalizer)
def unclaim(self, claim_id):
cid = utils.to_oid(claim_id)
except ValueError:
self._col.update({'': cid},
{'$set': {'c': {'id': None, 'e': 0}}},
upsert=False, multi=True)
def remove_expired(self, project=None):
"""Removes all expired messages except for the most recent
in each queue.
This method is used in lieu of mongo's TTL index since we
must always leave at least one message in the queue for
calculating the next marker.
Warning: This method is expensive, since it must perform
separate queries for each queue, due to the requirement that
it must leave at least one message in each queue, and it
is impractical to send a huge list of _id's to filter out
in a single call. That being said, this is somewhat mitigated
by the gc_threshold configuration option, which reduces the
frequency at which the DB is locked for non-busy queues. Also,
since .remove is run on each queue seperately, this reduces
the duration that any given lock is held, avoiding blocking
regular writes.
# TODO(kgriffs): Optimize first by batching the .removes, second
# by setting a 'last inserted ID' in the queue collection for
# each message inserted (TBD, may cause problematic side-effect),
# and third, by changing the marker algorithm such that it no
# longer depends on retaining the last message in the queue!
for id in self._get_queue_ids():
def list(self, queue, project=None, marker=None,
limit=10, echo=False, client_uuid=None):
if marker is not None:
marker = int(marker)
except ValueError:
raise exceptions.MalformedMarker()
qid = self._get_queue_id(queue, project)
messages =, marker, echo, client_uuid)
messages = messages.limit(limit).sort('_id')
marker_id = {}
now = timeutils.utcnow()
def denormalizer(msg):
oid = msg['_id']
age = now - utils.oid_utc(oid)
marker_id['next'] = msg['k']
return {
'id': str(oid),
'age': age.seconds,
'ttl': msg['t'],
'body': msg['b'],
yield utils.HookedCursor(messages, denormalizer)
yield str(marker_id['next'])
def get(self, queue, message_ids, project=None):
if not isinstance(message_ids, list):
message_ids = [message_ids]
message_ids = [utils.to_oid(id) for id in message_ids]
now = timeutils.utcnow()
# Base query, always check expire time
query = {
'q': self._get_queue_id(queue, project),
'e': {'$gt': now},
'_id': {'$in': message_ids},
messages = self._col.find(query)
def denormalizer(msg):
oid = msg['_id']
age = now - utils.oid_utc(oid)
return {
'id': str(oid),
'age': age.seconds,
'ttl': msg['t'],
'body': msg['b'],
return utils.HookedCursor(messages, denormalizer)
def post(self, queue, messages, client_uuid, project=None):
now = timeutils.utcnow()
queue_id = self._get_queue_id(queue, project)
# Set the next basis marker for the first attempt.
next_marker = self._next_marker(queue_id)
# Results are aggregated across all attempts
# NOTE(kgriffs): lazy instantiation
aggregated_results = None
# NOTE(kgriffs): This avoids iterating over messages twice,
# since pymongo internally will iterate over them all to
# encode as bson before submitting to mongod. By using a
# generator, we can produce each message only once,
# as needed by pymongo. At the same time, each message is
# cached in case we need to retry any of them.
message_gen = (
't': message['ttl'],
'q': queue_id,
'e': now + datetime.timedelta(seconds=message['ttl']),
'u': client_uuid,
'c': {'id': None, 'e': now},
'b': message['body'] if 'body' in message else {},
'k': next_marker + index,
for index, message in enumerate(messages)
prepared_messages, cached_messages = utils.cached_gen(message_gen)
# Use a retry range for sanity, although we expect
# to rarely, if ever, reach the maximum number of
# retries.
for attempt in self._retry_range:
ids = self._col.insert(prepared_messages)
# NOTE(kgriffs): Only use aggregated results if we must,
# which saves some cycles on the happy path.
if aggregated_results:
ids = aggregated_results
# Log a message if we retried, for debugging perf issues
if attempt != 0:
message = _('%(attempts)d attempt(s) required to post '
'%(num_messages)d messages to queue '
message %= dict(queue_id=queue_id, attempts=attempt + 1,
return map(str, ids)
except pymongo.errors.DuplicateKeyError as ex:
# Try again with the remaining messages
# NOTE(kgriffs): This can be used in conjunction with the
# log line, above, that is emitted after all messages have
# been posted, to guage how long it is taking for messages
# to be posted to a given queue, or overall.
# TODO(kgriffs): Add transaction ID to help match up loglines
if attempt == 0:
message = _('First attempt failed while adding messages '
'to queue %s for current request') % queue_id
# TODO(kgriffs): Record stats of how often retries happen,
# and how many attempts, on average, are required to insert
# messages.
# NOTE(kgriffs): Slice prepared_messages. We have to interpret
# the error message to get the duplicate key, which gives
# us the marker that had a dupe, allowing us to extrapolate
# how many messages were consumed, since markers are monotonic
# counters.
duplicate_marker = utils.dup_marker_from_error(str(ex))
failed_index = duplicate_marker - next_marker
# First time here, convert the deque to a list
# to support slicing.
if isinstance(cached_messages, collections.deque):
cached_messages = list(cached_messages)
# Put the successful one's IDs into aggregated_results.
succeeded_messages = cached_messages[:failed_index]
succeeded_ids = [msg['_id'] for msg in succeeded_messages]
# Results are aggregated across all attempts
if aggregated_results is None:
aggregated_results = succeeded_ids
# Retry the remaining messages with a new sequence
# of markers.
prepared_messages = cached_messages[failed_index:]
next_marker = self._next_marker(queue_id)
for index, message in enumerate(prepared_messages):
message['k'] = next_marker + index
# Chill out to avoid thrashing/thundering
except Exception as ex:
# TODO(kgriffs): Query the DB to get the last marker that
# made it, and extrapolate from there to figure out what
# needs to be retried. Definitely retry on AutoReconnect;
# other types of errors TBD.
message = _('Hit maximum number of attempts (%(max)s) for queue '
'%(id)s in project %(project)s')
message %= dict(max=options.CFG.max_attempts, id=queue_id,
succeeded_ids = map(str, aggregated_results)
raise exceptions.MessageConflict(queue, project, succeeded_ids)
def delete(self, queue, message_id, project=None, claim=None):
mid = utils.to_oid(message_id)
query = {
'q': self._get_queue_id(queue, project),
'_id': mid
if claim:
now = timeutils.utcnow()
query['e'] = {'$gt': now}
message = self._col.find_one(query)
if message is None:
cid = utils.to_oid(claim)
if not ('c' in message and
message['c']['id'] == cid and
message['c']['e'] > now):
raise exceptions.ClaimNotPermitted(message_id, claim)
self._col.remove(query['_id'], w=0)
self._col.remove(query, w=0)
except exceptions.QueueDoesNotExist:
class ClaimController(storage.ClaimBase):
"""Implements claim resource operations using MongoDB.
No dedicated collection is being used
for claims.
Claims are created in the messages
collection and live within messages, that is,
in the c field.
This implementation certainly uses more space
on disk but reduces the number of queries to
be executed and the time needed to retrieve
claims and claimed messages.
As for the memory usage, this implementation
requires less memory since a single index is
required. The index is a compound index between
the claim id and it's expiration timestamp.
def _get_queue_id(self, queue, project):
queue_controller = self.driver.queue_controller
return queue_controller._get_id(queue, project)
def get(self, queue, claim_id, project=None):
msg_ctrl = self.driver.message_controller
# Check whether the queue exists or not
qid = self._get_queue_id(queue, project)
# Base query, always check expire time
now = timeutils.utcnow()
cid = utils.to_oid(claim_id)
except ValueError:
raise exceptions.ClaimDoesNotExist()
age = now - utils.oid_utc(cid)
def messages(msg_iter):
msg =
yield msg.pop('claim')
yield msg
# Smoke it!
for msg in msg_iter:
del msg['claim']
yield msg
# Lets get claim's data
# from the first message
# in the iterator
messages = messages(msg_ctrl.claimed(qid, cid, now))
claim =
claim = {
'age': age.seconds,
'ttl': claim.pop('t'),
'id': str(claim['id']),
except StopIteration:
raise exceptions.ClaimDoesNotExist(cid, queue, project)
return (claim, messages)
def create(self, queue, metadata, project=None, limit=10):
"""Creates a claim.
This implementation was done in a best-effort fashion.
In order to create a claim we need to get a list
of messages that can be claimed. Once we have that
list we execute a query filtering by the ids returned
by the previous query.
Since there's a lot of space for race conditions here,
we'll check if the number of updated records is equal to
the max number of messages to claim. If the number of updated
messages is lower than limit we'll try to claim the remaining
number of messages.
This 2 queries are required because there's no way, as for the
time being, to executed an update on a limited number of records
msg_ctrl = self.driver.message_controller
# We don't need the qid here but
# we need to verify it exists.
qid = self._get_queue_id(queue, project)
ttl = int(metadata.get('ttl', 60))
oid = objectid.ObjectId()
now = timeutils.utcnow()
ttl_delta = datetime.timedelta(seconds=ttl)
expires = now + ttl_delta
meta = {
'id': oid,
't': ttl,
'e': expires,
# Get a list of active, not claimed nor expired
# messages that could be claimed.
msgs =, fields={'_id': 1})
msgs = msgs.limit(limit).sort('_id')
messages = iter([])
# Lets respect the limit
# during the count
if msgs.count(True) == 0:
return (str(oid), messages)
ids = [msg['_id'] for msg in msgs]
now = timeutils.utcnow()
# Set claim field for messages in ids
updated = msg_ctrl._col.update({'_id': {'$in': ids},
'$or': [
{'': None},
'': {'$ne': None},
'c.e': {'$lte': now}
{'$set': {'c': meta}}, upsert=False,
# NOTE(flaper87): Dirty hack!
# This sets the expiration time to
# `expires` on messages that would
# expire before claim.
msg_ctrl._col.update({'q': queue,
'e': {'$lt': expires},
'': oid},
{'$set': {'e': expires, 't': ttl}},
upsert=False, multi=True)
if updated != 0:
claim, messages = self.get(queue, oid, project=project)
return (str(oid), messages)
def update(self, queue, claim_id, metadata, project=None):
cid = utils.to_oid(claim_id)
except ValueError:
raise exceptions.ClaimDoesNotExist(claim_id, queue, project)
now = timeutils.utcnow()
ttl = int(metadata.get('ttl', 60))
ttl_delta = datetime.timedelta(seconds=ttl)
expires = now + ttl_delta
if now > expires:
msg = _('New ttl will make the claim expires')
raise ValueError(msg)
qid = self._get_queue_id(queue, project)
msg_ctrl = self.driver.message_controller
claimed = msg_ctrl.claimed(qid, cid, expires=now, limit=1)
except StopIteration:
raise exceptions.ClaimDoesNotExist(claim_id, queue, project)
meta = {
'id': cid,
't': ttl,
'e': expires,
msg_ctrl._col.update({'q': qid, '': cid},
{'$set': {'c': meta}},
upsert=False, multi=True)
# NOTE(flaper87): Dirty hack!
# This sets the expiration time to
# `expires` on messages that would
# expire before claim.
msg_ctrl._col.update({'q': qid,
'e': {'$lt': expires},
'': cid},
{'$set': {'e': expires, 't': ttl}},
upsert=False, multi=True)
def delete(self, queue, claim_id, project=None):
msg_ctrl = self.driver.message_controller
ClaimController = claims.ClaimController
MessageController = messages.MessageController
QueueController = queues.QueueController

View File

@ -0,0 +1,554 @@
# Copyright (c) 2013 Red Hat, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implements MongoDB the storage controller for messages.
Field Mappings:
In order to reduce the disk / memory space used,
field names will be, most of the time, the first
letter of their long name.
import collections
import datetime
import time
import pymongo.errors
import marconi.openstack.common.log as logging
from marconi.openstack.common import timeutils
from marconi import storage
from import exceptions
from import options
from import utils
LOG = logging.getLogger(__name__)
class MessageController(storage.MessageBase):
"""Implements message resource operations using MongoDB.
Name Field
queue_id -> q
expires -> e
ttl -> t
uuid -> u
claim -> c
marker -> k
def __init__(self, *args, **kwargs):
super(MessageController, self).__init__(*args, **kwargs)
# Cache for convenience and performance (avoids extra lookups and
# recreating the range for every request.)
self._queue_controller = self.driver.queue_controller
self._db = self.driver.db
self._retry_range = range(options.CFG.max_attempts)
# Make sure indexes exist before,
# doing anything.
self._col = self._db['messages']
# NOTE(flaper87): This index is used mostly in the
# active method but some parts of it are used in
# other places.
# * q: Mostly everywhere. It must stay at the
# beginning of the index.
# * e: Together with q is used for getting a
# specific message. (see `get`)
active_fields = [
('q', 1),
('e', 1),
('c.e', 1),
('k', 1),
('_id', -1),
# Index used for claims
claimed_fields = [
('q', 1),
('', 1),
('c.e', 1),
('_id', -1),
# Index used for _next_marker() and also to ensure
# uniqueness.
# NOTE(kgriffs): This index must be unique so that
# inserting a message with the same marker to the
# same queue will fail; this is used to detect a
# race condition which can cause an observer client
# to miss a message when there is more than one
# producer posting messages to the same queue, in
# parallel.
self._col.ensure_index([('q', 1), ('k', -1)],
# Helpers
def _get_queue_id(self, queue, project=None):
return self._queue_controller._get_id(queue, project)
def _get_queue_ids(self):
return self._queue_controller._get_ids()
def _next_marker(self, queue_id):
"""Retrieves the next message marker for a given queue.
This helper is used to generate monotonic pagination
markers that are saved as part of the message
document. Simply taking the max of the current message
markers works, since Marconi always leaves the most recent
message in the queue (new queues always return 1).
Note 1: Markers are scoped per-queue and so are *not*
globally unique or globally ordered.
Note 2: If two or more requests to this method are made
in parallel, this method will return the same
marker. This is done intentionally so that the caller
can detect a parallel message post, allowing it to
mitigate race conditions between producer and
observer clients.
:param queue_id: queue ID
:returns: next message marker as an integer
document = self._col.find_one({'q': queue_id},
sort=[('k', -1)],
fields={'k': 1, '_id': 0})
# NOTE(kgriffs): this approach is faster than using 'or'
return 1 if document is None else (document['k'] + 1)
def _backoff_sleep(self, attempt):
"""Sleep between retries using a jitter algorithm.
Mitigates thrashing between multiple parallel requests, and
creates backpressure on clients to slow down the rate
at which they submit requests.
:param attempt: current attempt number, zero-based
seconds = utils.calculate_backoff(attempt, options.CFG.max_attempts,
def _count_expired(self, queue_id):
"""Counts the number of expired messages in a queue.
:param queue_id: id for the queue to stat
query = {
'q': queue_id,
'e': {'$lte': timeutils.utcnow()},
return self._col.find(query).count()
def _remove_expired(self, queue_id):
"""Removes all expired messages except for the most recent
in each queue.
This method is used in lieu of mongo's TTL index since we
must always leave at least one message in the queue for
calculating the next marker.
Note that expired messages are only removed if their count
exceeds options.CFG.gc_threshold.
:param queue_id: id for the queue from which to remove
expired messages
if options.CFG.gc_threshold <= self._count_expired(queue_id):
# Get the message with the highest marker, and leave
# it in the queue
head = self._col.find_one({'q': queue_id},
sort=[('k', -1)],
fields={'_id': 1})
if head is None:
# Assume queue was just deleted via a parallel request
LOG.warning(_('Queue %s is empty or missing.') % queue_id)
query = {
'q': queue_id,
'e': {'$lte': timeutils.utcnow()},
'_id': {'$ne': head['_id']}
def _purge_queue(self, queue, project=None):
"""Removes all messages from the queue.
Warning: Only use this when deleting the queue; otherwise
you can cause a side-effect of reseting the marker counter
which can cause clients to miss tons of messages.
If the queue does not exist, this method fails silently.
:param queue: name of the queue to purge
:param project: name of the project to which the queue belongs
qid = self._get_queue_id(queue, project)
self._col.remove({'q': qid}, w=0)
except exceptions.QueueDoesNotExist:
# Interface
def all(self):
return self._col.find()
def active(self, queue_id, marker=None, echo=False,
client_uuid=None, fields=None):
now = timeutils.utcnow()
query = {
# Messages must belong to this queue
'q': utils.to_oid(queue_id),
# The messages can not be expired
'e': {'$gt': now},
# Include messages that are part of expired claims
'c.e': {'$lte': now},
if fields and not isinstance(fields, (dict, list)):
raise TypeError(_('Fields must be an instance of list / dict'))
if not echo and client_uuid:
query['u'] = {'$ne': client_uuid}
if marker:
query['k'] = {'$gt': marker}
return self._col.find(query, fields=fields)
def claimed(self, queue_id, claim_id=None, expires=None, limit=None):
query = {
'': claim_id,
'c.e': {'$gt': expires or timeutils.utcnow()},
'q': utils.to_oid(queue_id),
if not claim_id:
# lookup over to use the index
query[''] = {'$ne': None}
msgs = self._col.find(query, sort=[('_id', 1)])
if limit:
msgs = msgs.limit(limit)
now = timeutils.utcnow()
def denormalizer(msg):
oid = msg['_id']
age = now - utils.oid_utc(oid)
return {
'id': str(oid),
'age': age.seconds,
'ttl': msg['t'],
'body': msg['b'],
'claim': msg['c']
return utils.HookedCursor(msgs, denormalizer)
def unclaim(self, claim_id):
cid = utils.to_oid(claim_id)
except ValueError:
self._col.update({'': cid},
{'$set': {'c': {'id': None, 'e': 0}}},
upsert=False, multi=True)
def remove_expired(self, project=None):
"""Removes all expired messages except for the most recent
in each queue.
This method is used in lieu of mongo's TTL index since we
must always leave at least one message in the queue for
calculating the next marker.
Warning: This method is expensive, since it must perform
separate queries for each queue, due to the requirement that
it must leave at least one message in each queue, and it
is impractical to send a huge list of _id's to filter out
in a single call. That being said, this is somewhat mitigated
by the gc_threshold configuration option, which reduces the
frequency at which the DB is locked for non-busy queues. Also,
since .remove is run on each queue seperately, this reduces
the duration that any given lock is held, avoiding blocking
regular writes.
# TODO(kgriffs): Optimize first by batching the .removes, second
# by setting a 'last inserted ID' in the queue collection for
# each message inserted (TBD, may cause problematic side-effect),
# and third, by changing the marker algorithm such that it no
# longer depends on retaining the last message in the queue!
for id in self._get_queue_ids():
def list(self, queue, project=None, marker=None,
limit=10, echo=False, client_uuid=None):
if marker is not None:
marker = int(marker)
except ValueError:
raise exceptions.MalformedMarker()
qid = self._get_queue_id(queue, project)
messages =, marker, echo, client_uuid)
messages = messages.limit(limit).sort('_id')
marker_id = {}
now = timeutils.utcnow()
def denormalizer(msg):
oid = msg['_id']
age = now - utils.oid_utc(oid)
marker_id['next'] = msg['k']
return {
'id': str(oid),
'age': age.seconds,
'ttl': msg['t'],
'body': msg['b'],
yield utils.HookedCursor(messages, denormalizer)
yield str(marker_id['next'])
def get(self, queue, message_ids, project=None):
if not isinstance(message_ids, list):
message_ids = [message_ids]
message_ids = [utils.to_oid(id) for id in message_ids]
now = timeutils.utcnow()
# Base query, always check expire time
query = {
'q': self._get_queue_id(queue, project),
'e': {'$gt': now},
'_id': {'$in': message_ids},
messages = self._col.find(query)
def denormalizer(msg):
oid = msg['_id']
age = now - utils.oid_utc(oid)
return {
'id': str(oid),
'age': age.seconds,
'ttl': msg['t'],
'body': msg['b'],
return utils.HookedCursor(messages, denormalizer)
def post(self, queue, messages, client_uuid, project=None):
now = timeutils.utcnow()
queue_id = self._get_queue_id(queue, project)
# Set the next basis marker for the first attempt.
next_marker = self._next_marker(queue_id)
# Results are aggregated across all attempts
# NOTE(kgriffs): lazy instantiation
aggregated_results = None
# NOTE(kgriffs): This avoids iterating over messages twice,
# since pymongo internally will iterate over them all to
# encode as bson before submitting to mongod. By using a
# generator, we can produce each message only once,
# as needed by pymongo. At the same time, each message is
# cached in case we need to retry any of them.
message_gen = (
't': message['ttl'],
'q': queue_id,
'e': now + datetime.timedelta(seconds=message['ttl']),
'u': client_uuid,
'c': {'id': None, 'e': now},
'b': message['body'] if 'body' in message else {},
'k': next_marker + index,
for index, message in enumerate(messages)
prepared_messages, cached_messages = utils.cached_gen(message_gen)
# Use a retry range for sanity, although we expect
# to rarely, if ever, reach the maximum number of
# retries.
for attempt in self._retry_range:
ids = self._col.insert(prepared_messages)
# NOTE(kgriffs): Only use aggregated results if we must,
# which saves some cycles on the happy path.
if aggregated_results:
ids = aggregated_results
# Log a message if we retried, for debugging perf issues
if attempt != 0:
message = _('%(attempts)d attempt(s) required to post '
'%(num_messages)d messages to queue '
message %= dict(queue_id=queue_id, attempts=attempt + 1,
return map(str, ids)
except pymongo.errors.DuplicateKeyError as ex:
# Try again with the remaining messages
# NOTE(kgriffs): This can be used in conjunction with the
# log line, above, that is emitted after all messages have
# been posted, to guage how long it is taking for messages
# to be posted to a given queue, or overall.
# TODO(kgriffs): Add transaction ID to help match up loglines
if attempt == 0:
message = _('First attempt failed while adding messages '
'to queue %s for current request') % queue_id
# TODO(kgriffs): Record stats of how often retries happen,
# and how many attempts, on average, are required to insert
# messages.
# NOTE(kgriffs): Slice prepared_messages. We have to interpret
# the error message to get the duplicate key, which gives
# us the marker that had a dupe, allowing us to extrapolate
# how many messages were consumed, since markers are monotonic
# counters.
duplicate_marker = utils.dup_marker_from_error(str(ex))
failed_index = duplicate_marker - next_marker
# First time here, convert the deque to a list
# to support slicing.
if isinstance(cached_messages, collections.deque):
cached_messages = list(cached_messages)
# Put the successful one's IDs into aggregated_results.
succeeded_messages = cached_messages[:failed_index]
succeeded_ids = [msg['_id'] for msg in succeeded_messages]
# Results are aggregated across all attempts
if aggregated_results is None:
aggregated_results = succeeded_ids
# Retry the remaining messages with a new sequence
# of markers.
prepared_messages = cached_messages[failed_index:]
next_marker = self._next_marker(queue_id)
for index, message in enumerate(prepared_messages):
message['k'] = next_marker + index
# Chill out to avoid thrashing/thundering
except Exception as ex:
# TODO(kgriffs): Query the DB to get the last marker that
# made it, and extrapolate from there to figure out what
# needs to be retried. Definitely retry on AutoReconnect;
# other types of errors TBD.
message = _('Hit maximum number of attempts (%(max)s) for queue '
'%(id)s in project %(project)s')
message %= dict(max=options.CFG.max_attempts, id=queue_id,
succeeded_ids = map(str, aggregated_results)
raise exceptions.MessageConflict(queue, project, succeeded_ids)
def delete(self, queue, message_id, project=None, claim=None):
mid = utils.to_oid(message_id)
query = {
'q': self._get_queue_id(queue, project),
'_id': mid
if claim:
now = timeutils.utcnow()
query['e'] = {'$gt': now}
message = self._col.find_one(query)
if message is None:
cid = utils.to_oid(claim)
if not ('c' in message and
message['c']['id'] == cid and
message['c']['e'] > now):
raise exceptions.ClaimNotPermitted(message_id, claim)
self._col.remove(query['_id'], w=0)
self._col.remove(query, w=0)
except exceptions.QueueDoesNotExist:

View File

@ -0,0 +1,142 @@
# Copyright (c) 2013 Red Hat, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implements the MongoDB storage controller for queues.
Field Mappings:
In order to reduce the disk / memory space used,
field names will be, most of the time, the first
letter of their long name.
import marconi.openstack.common.log as logging
from marconi import storage
from import exceptions
LOG = logging.getLogger(__name__)
class QueueController(storage.QueueBase):
"""Implements queue resource operations using MongoDB.
Name Field
name -> n
project -> p
counter -> c
metadata -> m
def __init__(self, *args, **kwargs):
super(QueueController, self).__init__(*args, **kwargs)
self._col = self.driver.db['queues']
# NOTE(flaper87): This creates a unique compound index for
# project and name. Using project as the first field of the
# index allows for querying by project and project+name.
# This is also useful for retrieving the queues list for
# as specific project, for example. Order Matters!
self._col.ensure_index([('p', 1), ('n', 1)], unique=True)
# Helpers
def _get(self, name, project=None, fields={'m': 1, '_id': 0}):
queue = self._col.find_one({'p': project, 'n': name}, fields=fields)
if queue is None:
raise exceptions.QueueDoesNotExist(name, project)
return queue
def _get_id(self, name, project=None):
"""Just like the `get` method, but only returns the queue's id
:returns: Queue's `ObjectId`
queue = self._get(name, project, fields=['_id'])
return queue.get('_id')
def _get_ids(self):
"""Returns a generator producing a list of all queue IDs."""
cursor = self._col.find({}, fields={'_id': 1})
return (doc['_id'] for doc in cursor)
# Interface
def list(self, project=None, marker=None,
limit=10, detailed=False):
query = {'p': project}
if marker:
query['n'] = {'$gt': marker}
fields = {'n': 1, '_id': 0}
if detailed:
fields['m'] = 1
cursor = self._col.find(query, fields=fields)
cursor = cursor.limit(limit).sort('n')
marker_name = {}
def normalizer(records):
for rec in records:
queue = {'name': rec['n']}
marker_name['next'] = queue['name']
if detailed:
queue['metadata'] = rec['m']
yield queue
yield normalizer(cursor)
yield marker_name['next']
def get(self, name, project=None):
queue = self._get(name, project)
return queue.get('m', {})
def upsert(self, name, metadata, project=None):
super(QueueController, self).upsert(name, metadata, project)
rst = self._col.update({'p': project, 'n': name},
{'$set': {'m': metadata, 'c': 1}},
return not rst['updatedExisting']
def delete(self, name, project=None):
self.driver.message_controller._purge_queue(name, project)
self._col.remove({'p': project, 'n': name})
def stats(self, name, project=None):
queue_id = self._get_id(name, project)
controller = self.driver.message_controller
active =
claimed = controller.claimed(queue_id)
return {
'actions': 0,
'messages': {
'claimed': claimed.count(),
'free': active.count(),
def actions(self, name, project=None, marker=None, limit=10):
raise NotImplementedError

View File

@ -0,0 +1,177 @@
# Copyright (c) 2013 Rackspace, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from import base
from import exceptions
from import utils
class ClaimController(base.ClaimBase):
def __init__(self, driver):
self.driver = driver'''
create table
if not exists
Claims (
created DATETIME, -- seconds since the Julian day
FOREIGN KEY(qid) references Queues(id) on delete cascade
create table
if not exists
Locked (
msgid INTEGER,
FOREIGN KEY(cid) references Claims(id) on delete cascade,
FOREIGN KEY(msgid) references Messages(id) on delete cascade
def get(self, queue, claim_id, project):
if project is None:
project = ''
with self.driver('deferred'):
id, ttl, age = self.driver.get('''
select, C.ttl, julianday() * 86400.0 - C.created
from Queues as Q join Claims as C
on = C.qid
where C.ttl > julianday() * 86400.0 - C.created
and = ? and project = ? and name = ?
''', utils.cid_decode(claim_id), project, queue)
return (
'id': claim_id,
'ttl': ttl,
'age': int(age),
except (utils.NoResult, exceptions.MalformedID()):
raise exceptions.ClaimDoesNotExist(claim_id, queue, project)
def create(self, queue, metadata, project, limit=10):
if project is None:
project = ''
with self.driver('immediate'):
qid = utils.get_qid(self.driver, queue, project)
# Clean up all expired claims in this queue'''
delete from Claims
where ttl <= julianday() * 86400.0 - created
and qid = ?''', qid)'''
insert into Claims
values (null, ?, ?, julianday() * 86400.0)
''', qid, metadata['ttl'])
id = self.driver.lastrowid'''
insert into Locked
select last_insert_rowid(), id
from Messages left join Locked
on id = msgid
where msgid is null
and ttl > julianday() * 86400.0 - created
and qid = ?
limit ?''', qid, limit)
self.__update_claimed(id, metadata['ttl'])
return (utils.cid_encode(id), self.__get(id))
def __get(self, cid):
records ='''
select id, content, ttl, julianday() * 86400.0 - created
from Messages join Locked
on msgid = id
where ttl > julianday() * 86400.0 - created
and cid = ?''', cid)
for id, content, ttl, age in records:
yield {
'id': utils.msgid_encode(id),
'ttl': ttl,
'age': int(age),
'body': content,
def update(self, queue, claim_id, metadata, project):
if project is None:
project = ''
id = utils.cid_decode(claim_id)
except exceptions.MalformedID:
raise exceptions.ClaimDoesNotExist(claim_id, queue, project)
with self.driver('deferred'):
# still delay the cleanup here'''
update Claims
set created = julianday() * 86400.0,
ttl = ?
where ttl > julianday() * 86400.0 - created
and id = ?
and qid = (select id from Queues
where project = ? and name = ?)
''', metadata['ttl'], id, project, queue)
if not self.driver.affected:
raise exceptions.ClaimDoesNotExist(claim_id,
self.__update_claimed(id, metadata['ttl'])
def __update_claimed(self, cid, ttl):
# Precondition: cid is not expired'''
update Messages
set created = julianday() * 86400.0,
ttl = ?
where ttl < ?
and id in (select msgid from Locked
where cid = ?)
''', ttl, ttl, cid)
def delete(self, queue, claim_id, project):
if project is None:
project = ''
cid = utils.cid_decode(claim_id)
except exceptions.MalformedID:
delete from Claims
where id = ?
and qid = (select id from Queues
where project = ? and name = ?)
''', cid, project, queue)

View File

@ -10,520 +10,16 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Exports SQLite driver controllers."""
from import base
from import exceptions
from import claims
from import messages
from import queues
class Queue(base.QueueBase):
def __init__(self, driver):
self.driver = driver'''
create table
if not exists
Queues (
project TEXT,
name TEXT,
metadata DOCUMENT,
UNIQUE(project, name)
def list(self, project, marker=None,
limit=10, detailed=False):
if project is None:
project = ''
sql = (('''
select name from Queues''' if not detailed
else '''
select name, metadata from Queues''') +
where project = ?''')
args = [project]
if marker:
sql += '''
and name > ?'''
args += [marker]
sql += '''
order by name
limit ?'''
args += [limit]
records =, *args)
marker_name = {}
def it():
for rec in records:
marker_name['next'] = rec[0]
yield ({'name': rec[0]} if not detailed
{'name': rec[0], 'metadata': rec[1]})
yield it()
yield marker_name['next']
def get(self, name, project):
if project is None:
project = ''
return self.driver.get('''
select metadata from Queues
where project = ? and name = ?''', project, name)[0]
except _NoResult:
raise exceptions.QueueDoesNotExist(name, project)
def upsert(self, name, metadata, project):
if project is None:
project = ''
with self.driver('immediate'):
previous_record ='''
select id from Queues
where project = ? and name = ?
''', project, name).fetchone()'''
replace into Queues
values (null, ?, ?, ?)
''', project, name, self.driver.pack(metadata))
return previous_record is None
def delete(self, name, project):
if project is None:
project = '''''
delete from Queues
where project = ? and name = ?''', project, name)
def stats(self, name, project):
if project is None:
project = ''
with self.driver('deferred'):
qid = _get_qid(self.driver, name, project)
claimed, free = self.driver.get('''
select * from
(select count(msgid)
from Claims join Locked
on id = cid
where ttl > julianday() * 86400.0 - created
and qid = ?),
(select count(id)
from Messages left join Locked
on id = msgid
where msgid is null
and ttl > julianday() * 86400.0 - created
and qid = ?)
''', qid, qid)
return {
'messages': {
'claimed': claimed,
'free': free,
'actions': 0,
def actions(self, name, project, marker=None, limit=10):
raise NotImplementedError
class Message(base.MessageBase):
def __init__(self, driver):
self.driver = driver'''
create table
if not exists
Messages (
content DOCUMENT,
client TEXT,
created DATETIME, -- seconds since the Julian day
FOREIGN KEY(qid) references Queues(id) on delete cascade
def get(self, queue, message_ids, project):
if project is None:
project = ''
if not isinstance(message_ids, list):
message_ids = [message_ids]
message_ids = ["'%s'" % _msgid_decode(id) for id in message_ids]
message_ids = ','.join(message_ids)
sql = '''
select, content, ttl, julianday() * 86400.0 - created
from Queues as Q join Messages as M
on qid =
where ttl > julianday() * 86400.0 - created
and in (%s) and project = ? and name = ?
''' % message_ids
records =, project, queue)
for id, content, ttl, age in records:
yield {
'id': _msgid_encode(id),
'ttl': ttl,
'age': int(age),
'body': content,
def list(self, queue, project, marker=None,
limit=10, echo=False, client_uuid=None):
if project is None:
project = ''
with self.driver('deferred'):
sql = '''
select id, content, ttl, julianday() * 86400.0 - created
from Messages
where ttl > julianday() * 86400.0 - created
and qid = ?'''
args = [_get_qid(self.driver, queue, project)]
if not echo:
sql += '''
and client != ?'''
args += [client_uuid]
if marker:
sql += '''
and id > ?'''
args += [_marker_decode(marker)]
sql += '''
limit ?'''
args += [limit]
records =, *args)
marker_id = {}
def it():
for id, content, ttl, age in records:
marker_id['next'] = id
yield {
'id': _msgid_encode(id),
'ttl': ttl,
'age': int(age),
'body': content,
yield it()
yield _marker_encode(marker_id['next'])
def post(self, queue, messages, client_uuid, project):
if project is None:
project = ''
with self.driver('immediate'):
qid = _get_qid(self.driver, queue, project)
# cleanup all expired messages in this queue'''
delete from Messages
where ttl <= julianday() * 86400.0 - created
and qid = ?''', qid)
# executemany() sets lastrowid to None, so no matter we manually
# generate the IDs or not, we still need to query for it.
unused = self.driver.get('''
select max(id) + 1 from Messages''')[0] or 1001
my = dict(newid=unused)
def it():
for m in messages:
yield (my['newid'], qid, m['ttl'],
self.driver.pack(m['body']), client_uuid)
my['newid'] += 1
insert into Messages
values (?, ?, ?, ?, ?, julianday() * 86400.0)''', it())
return map(_msgid_encode, range(unused, my['newid']))
def delete(self, queue, message_id, project, claim=None):
if project is None:
project = ''
id = _msgid_decode(message_id)
if not claim:'''
delete from Messages
where id = ?
and qid = (select id from Queues
where project = ? and name = ?)
''', id, project, queue)
with self.driver('immediate'):
message_exists, = self.driver.get('''
select count(
from Queues as Q join Messages as M
on qid =
where ttl > julianday() * 86400.0 - created
and = ? and project = ? and name = ?
''', id, project, queue)
if not message_exists:
self.__delete_claimed(id, claim)
def __delete_claimed(self, id, claim):
# Precondition: id exists in a specific queue'''
delete from Messages
where id = ?
and id in (select msgid
from Claims join Locked
on id = cid
where ttl > julianday() * 86400.0 - created
and id = ?)
''', id, _cid_decode(claim))
if not self.driver.affected:
raise exceptions.ClaimNotPermitted(_msgid_encode(id), claim)
class Claim(base.ClaimBase):
def __init__(self, driver):
self.driver = driver'''
create table
if not exists
Claims (
created DATETIME, -- seconds since the Julian day
FOREIGN KEY(qid) references Queues(id) on delete cascade
create table
if not exists
Locked (
msgid INTEGER,
FOREIGN KEY(cid) references Claims(id) on delete cascade,
FOREIGN KEY(msgid) references Messages(id) on delete cascade
def get(self, queue, claim_id, project):
if project is None:
project = ''
with self.driver('deferred'):
id, ttl, age = self.driver.get('''
select, C.ttl, julianday() * 86400.0 - C.created
from Queues as Q join Claims as C
on = C.qid
where C.ttl > julianday() * 86400.0 - C.created
and = ? and project = ? and name = ?
''', _cid_decode(claim_id), project, queue)
return (
'id': claim_id,
'ttl': ttl,
'age': int(age),
except (_NoResult, exceptions.MalformedID()):
raise exceptions.ClaimDoesNotExist(claim_id, queue, project)
def create(self, queue, metadata, project, limit=10):
if project is None:
project = ''
with self.driver('immediate'):
qid = _get_qid(self.driver, queue, project)
# Clean up all expired claims in this queue'''
delete from Claims
where ttl <= julianday() * 86400.0 - created
and qid = ?''', qid)'''
insert into Claims
values (null, ?, ?, julianday() * 86400.0)
''', qid, metadata['ttl'])
id = self.driver.lastrowid'''
insert into Locked
select last_insert_rowid(), id
from Messages left join Locked
on id = msgid
where msgid is null
and ttl > julianday() * 86400.0 - created
and qid = ?
limit ?''', qid, limit)
self.__update_claimed(id, metadata['ttl'])
return (_cid_encode(id), self.__get(id))
def __get(self, cid):
records ='''
select id, content, ttl, julianday() * 86400.0 - created
from Messages join Locked
on msgid = id
where ttl > julianday() * 86400.0 - created
and cid = ?''', cid)
for id, content, ttl, age in records:
yield {
'id': _msgid_encode(id),
'ttl': ttl,
'age': int(age),
'body': content,
def update(self, queue, claim_id, metadata, project):
if project is None:
project = ''
id = _cid_decode(claim_id)
except exceptions.MalformedID:
raise exceptions.ClaimDoesNotExist(claim_id, queue, project)
with self.driver('deferred'):
# still delay the cleanup here'''
update Claims
set created = julianday() * 86400.0,
ttl = ?
where ttl > julianday() * 86400.0 - created
and id = ?
and qid = (select id from Queues
where project = ? and name = ?)
''', metadata['ttl'], id, project, queue)
if not self.driver.affected:
raise exceptions.ClaimDoesNotExist(claim_id,
self.__update_claimed(id, metadata['ttl'])
def __update_claimed(self, cid, ttl):
# Precondition: cid is not expired'''
update Messages
set created = julianday() * 86400.0,
ttl = ?
where ttl < ?
and id in (select msgid from Locked
where cid = ?)
''', ttl, ttl, cid)
def delete(self, queue, claim_id, project):
if project is None:
project = ''
cid = _cid_decode(claim_id)
except exceptions.MalformedID:
delete from Claims
where id = ?
and qid = (select id from Queues
where project = ? and name = ?)
''', cid, project, queue)
class _NoResult(Exception):
def _get_qid(driver, queue, project):
return driver.get('''
select id from Queues
where project = ? and name = ?''', project, queue)[0]
except _NoResult:
raise exceptions.QueueDoesNotExist(queue, project)
# The utilities below make the database IDs opaque to the users
# of Marconi API. The only purpose is to advise the users NOT to
# make assumptions on the implementation of and/or relationship
# between the message IDs, the markers, and claim IDs.
# The magic numbers are arbitrarily picked; the numbers themselves
# come with no special functionalities.
def _msgid_encode(id):
return hex(id ^ 0x5c693a53)[2:]
except TypeError:
raise exceptions.MalformedID()
def _msgid_decode(id):
return int(id, 16) ^ 0x5c693a53
except ValueError:
raise exceptions.MalformedID()
def _marker_encode(id):
return oct(id ^ 0x3c96a355)[1:]
def _marker_decode(id):
return int(id, 8) ^ 0x3c96a355
except ValueError:
raise exceptions.MalformedMarker()
def _cid_encode(id):
return hex(id ^ 0x63c9a59c)[2:]
def _cid_decode(id):
return int(id, 16) ^ 0x63c9a59c
except ValueError:
raise exceptions.MalformedID()
ClaimController = claims.ClaimController
MessageController = messages.MessageController
QueueController = queues.QueueController

View File

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import sqlite3
@ -22,6 +21,7 @@ import msgpack
from marconi.common import config
from marconi import storage
from import controllers
from import utils
CFG = config.namespace('drivers:storage:sqlite').from_options(
@ -70,13 +70,13 @@ class Driver(storage.DriverBase):
:param sql: a query string with the '?' placeholders
:param args: the arguments to substitute the placeholders
:raises: _NoResult if the result set is empty
:raises: utils.NoResult if the result set is empty
return, *args).next()
except StopIteration:
raise controllers._NoResult
raise utils.NoResult
def affected(self):
@ -101,12 +101,12 @@ class Driver(storage.DriverBase):
def queue_controller(self):
return controllers.Queue(self)
return controllers.QueueController(self)
def message_controller(self):
return controllers.Message(self)
return controllers.MessageController(self)
def claim_controller(self):
return controllers.Claim(self)
return controllers.ClaimController(self)

View File

@ -0,0 +1,186 @@
# Copyright (c) 2013 Rackspace, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from import base
from import exceptions
from import utils
class MessageController(base.MessageBase):
def __init__(self, driver):
self.driver = driver'''
create table
if not exists
Messages (
content DOCUMENT,
client TEXT,
created DATETIME, -- seconds since the Julian day
FOREIGN KEY(qid) references Queues(id) on delete cascade
def get(self, queue, message_ids, project):
if project is None:
project = ''
if not isinstance(message_ids, list):
message_ids = [message_ids]
message_ids = ["'%s'" % utils.msgid_decode(id) for id in message_ids]
message_ids = ','.join(message_ids)
sql = '''
select, content, ttl, julianday() * 86400.0 - created
from Queues as Q join Messages as M
on qid =
where ttl > julianday() * 86400.0 - created
and in (%s) and project = ? and name = ?
''' % message_ids
records =, project, queue)
for id, content, ttl, age in records:
yield {
'id': utils.msgid_encode(id),
'ttl': ttl,
'age': int(age),
'body': content,
def list(self, queue, project, marker=None,
limit=10, echo=False, client_uuid=None):
if project is None:
project = ''
with self.driver('deferred'):
sql = '''
select id, content, ttl, julianday() * 86400.0 - created
from Messages
where ttl > julianday() * 86400.0 - created
and qid = ?'''
args = [utils.get_qid(self.driver, queue, project)]
if not echo:
sql += '''
and client != ?'''
args += [client_uuid]
if marker:
sql += '''
and id > ?'''
args += [utils.marker_decode(marker)]
sql += '''
limit ?'''
args += [limit]
records =, *args)
marker_id = {}
def it():
for id, content, ttl, age in records:
marker_id['next'] = id
yield {
'id': utils.msgid_encode(id),
'ttl': ttl,
'age': int(age),
'body': content,
yield it()
yield utils.marker_encode(marker_id['next'])
def post(self, queue, messages, client_uuid, project):
if project is None:
project = ''
with self.driver('immediate'):
qid = utils.get_qid(self.driver, queue, project)
# cleanup all expired messages in this queue'''
delete from Messages
where ttl <= julianday() * 86400.0 - created
and qid = ?''', qid)
# executemany() sets lastrowid to None, so no matter we manually
# generate the IDs or not, we still need to query for it.
unused = self.driver.get('''
select max(id) + 1 from Messages''')[0] or 1001
my = dict(newid=unused)
def it():
for m in messages:
yield (my['newid'], qid, m['ttl'],
self.driver.pack(m['body']), client_uuid)
my['newid'] += 1
insert into Messages
values (?, ?, ?, ?, ?, julianday() * 86400.0)''', it())
return map(utils.msgid_encode, range(unused, my['newid']))
def delete(self, queue, message_id, project, claim=None):
if project is None:
project = ''
id = utils.msgid_decode(message_id)
if not claim:'''
delete from Messages
where id = ?
and qid = (select id from Queues
where project = ? and name = ?)
''', id, project, queue)
with self.driver('immediate'):
message_exists, = self.driver.get('''
select count(
from Queues as Q join Messages as M
on qid =
where ttl > julianday() * 86400.0 - created
and = ? and project = ? and name = ?
''', id, project, queue)
if not message_exists:
self.__delete_claimed(id, claim)
def __delete_claimed(self, id, claim):
# Precondition: id exists in a specific queue'''
delete from Messages
where id = ?
and id in (select msgid
from Claims join Locked
on id = cid
where ttl > julianday() * 86400.0 - created
and id = ?)
''', id, utils.cid_decode(claim))
if not self.driver.affected:
raise exceptions.ClaimNotPermitted(utils.msgid_encode(id), claim)

View File

@ -0,0 +1,141 @@
# Copyright (c) 2013 Rackspace, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from import base
from import exceptions
from import utils
class QueueController(base.QueueBase):
def __init__(self, driver):
self.driver = driver'''
create table
if not exists
Queues (
project TEXT,
name TEXT,
metadata DOCUMENT,
UNIQUE(project, name)
def list(self, project, marker=None,
limit=10, detailed=False):
if project is None:
project = ''
sql = (('''
select name from Queues''' if not detailed
else '''
select name, metadata from Queues''') +
where project = ?''')
args = [project]
if marker:
sql += '''
and name > ?'''
args += [marker]
sql += '''
order by name
limit ?'''
args += [limit]
records =, *args)
marker_name = {}
def it():
for rec in records:
marker_name['next'] = rec[0]
yield ({'name': rec[0]} if not detailed
{'name': rec[0], 'metadata': rec[1]})
yield it()
yield marker_name['next']
def get(self, name, project):
if project is None:
project = ''
return self.driver.get('''
select metadata from Queues
where project = ? and name = ?''', project, name)[0]
except utils.NoResult:
raise exceptions.QueueDoesNotExist(name, project)
def upsert(self, name, metadata, project):
if project is None:
project = ''
with self.driver('immediate'):
previous_record ='''
select id from Queues
where project = ? and name = ?
''', project, name).fetchone()'''
replace into Queues
values (null, ?, ?, ?)
''', project, name, self.driver.pack(metadata))
return previous_record is None
def delete(self, name, project):
if project is None:
project = '''''
delete from Queues
where project = ? and name = ?''', project, name)
def stats(self, name, project):
if project is None:
project = ''
with self.driver('deferred'):
qid = utils.get_qid(self.driver, name, project)
claimed, free = self.driver.get('''
select * from
(select count(msgid)
from Claims join Locked
on id = cid
where ttl > julianday() * 86400.0 - created
and qid = ?),
(select count(id)
from Messages left join Locked
on id = msgid
where msgid is null
and ttl > julianday() * 86400.0 - created
and qid = ?)
''', qid, qid)
return {
'messages': {
'claimed': claimed,
'free': free,
'actions': 0,
def actions(self, name, project, marker=None, limit=10):
raise NotImplementedError

View File

@ -0,0 +1,79 @@
# Copyright (c) 2013 Rackspace, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from import exceptions
class NoResult(Exception):
def get_qid(driver, queue, project):
return driver.get('''
select id from Queues
where project = ? and name = ?''', project, queue)[0]
except NoResult:
raise exceptions.QueueDoesNotExist(queue, project)
# The utilities below make the database IDs opaque to the users
# of Marconi API. The only purpose is to advise the users NOT to
# make assumptions on the implementation of and/or relationship
# between the message IDs, the markers, and claim IDs.
# The magic numbers are arbitrarily picked; the numbers themselves
# come with no special functionalities.
def msgid_encode(id):
return hex(id ^ 0x5c693a53)[2:]
except TypeError:
raise exceptions.MalformedID()
def msgid_decode(id):
return int(id, 16) ^ 0x5c693a53
except ValueError:
raise exceptions.MalformedID()
def marker_encode(id):
return oct(id ^ 0x3c96a355)[1:]
def marker_decode(id):
return int(id, 8) ^ 0x3c96a355
except ValueError:
raise exceptions.MalformedMarker()
def cid_encode(id):
return hex(id ^ 0x63c9a59c)[2:]
def cid_decode(id):
return int(id, 16) ^ 0x63c9a59c
except ValueError:
raise exceptions.MalformedID()

View File

@ -20,14 +20,14 @@ from import base
class SQliteQueueTests(base.QueueControllerTest):
driver_class = sqlite.Driver
controller_class = controllers.Queue
controller_class = controllers.QueueController
class SQliteMessageTests(base.MessageControllerTest):
driver_class = sqlite.Driver
controller_class = controllers.Message
controller_class = controllers.MessageController
class SQliteClaimTests(base.ClaimControllerTest):
driver_class = sqlite.Driver
controller_class = controllers.Claim
controller_class = controllers.ClaimController