feat: add catalogue storage driver for queues

The upcoming sharding feature requires a catalogue storage driver very
similar to the one used by the proxy. This patch ports the proxy
catalogue storage driver to the queues storage package and updates the
semantics accordingly. Specifically, the queues catalogue maintains a
mapping from project/queue to shard identifier.

Unit tests and helpers have been updated to match. The sqlite
implementation is still to be done and is reserved for a future
patch.

The faulty storage driver has been given an appropriate
catalogue_controller implementation.

Furthermore, minor py3k updates were made: metaclasses are now
declared with six.add_metaclass.

Change-Id: I380d028520e9a712064810a15e2c6576de67e485
Partially-implements: blueprint storage-sharding
Closes-Bug: #1241686
Author: Alejandro Cabrera
Date:   2013-10-16 12:54:55 -04:00
Commit: 4f5de4b23f (parent: ea79e2bd16)
16 changed files with 517 additions and 29 deletions

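As a rough sketch of the project/queue-to-shard mapping described in the
commit message (illustrative only; `catalogue` stands in for whichever
driver's catalogue_controller is in use, and the values are made up), the
new interface might be exercised roughly like so:

# 'catalogue' is any driver's catalogue_controller; values are illustrative.
catalogue.insert(u'project-id', u'my-queue', u'shard-1')

entry = catalogue.get(u'project-id', u'my-queue')
# entry -> {'project': u'project-id', 'queue': u'my-queue', 'shard': u'shard-1'}

assert catalogue.exists(u'project-id', u'my-queue')
shards = [e['shard'] for e in catalogue.list(u'project-id')]

catalogue.delete(u'project-id', u'my-queue')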

@ -6,6 +6,7 @@ from marconi.queues.storage import exceptions # NOQA
# Hoist classes into package namespace
ControlDriverBase = base.ControlDriverBase
DataDriverBase = base.DataDriverBase
CatalogueBase = base.CatalogueBase
ClaimBase = base.ClaimBase
MessageBase = base.MessageBase
QueueBase = base.QueueBase


@ -87,6 +87,11 @@ class ControlDriverBase(object):
def __init__(self, conf):
self.conf = conf
@abc.abstractproperty
def catalogue_controller(self):
"""Returns the driver's catalogue controller."""
raise NotImplementedError
@abc.abstractproperty
def shards_controller(self):
"""Returns storage's shard management controller."""
@ -465,3 +470,94 @@ class ShardsBase(AdminControllerBase):
def drop_all(self):
"""Deletes all shards from storage."""
raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class CatalogueBase(ControllerBase):
"""A controller for managing the catalogue. The catalogue is
responsible for maintaining a mapping between project.queue
entries to their shard.
"""
@abc.abstractmethod
def list(self, project):
"""Returns a list of queue entries from the catalogue associated with
this project.
:param project: The project to use when filtering through queue
entries.
:type project: six.text_type
:returns: [{'project': ..., 'queue': ..., 'shard': ...},]
:rtype: [dict]
"""
raise NotImplementedError
@abc.abstractmethod
def get(self, project, queue):
"""Returns the shard identifier for the queue registered under this
project.
:param project: Namespace to search for the given queue
:type project: six.text_type
:param queue: The name of the queue to search for
:type queue: six.text_type
:returns: {'project': ..., 'queue': ..., 'shard': ...}
:rtype: dict
:raises: QueueNotMapped
"""
raise NotImplementedError
@abc.abstractmethod
def exists(self, project, queue):
"""Determines whether the given queue exists under project.
:param project: Namespace to check.
:type project: six.text_type
:param queue: The name of the queue to check for
:type queue: six.text_type
:returns: True if the queue exists under this project
:rtype: bool
"""
@abc.abstractmethod
def insert(self, project, queue, shard):
"""Creates a new catalogue entry, or updates it if it already existed.
:param project: str - Namespace to insert the given queue into
:type project: six.text_type
:param queue: str - The name of the queue to insert
:type queue: six.text_type
:param shard: shard identifier to associate this queue with
:type shard: six.text_type
"""
raise NotImplementedError
@abc.abstractmethod
def delete(self, project, queue):
"""Removes this entry from the catalogue.
:param project: The namespace to search for this queue
:type project: six.text_type
:param queue: The queue name to remove
:type queue: six.text_type
"""
raise NotImplementedError
@abc.abstractmethod
def update(self, project, queue, shard=None):
"""Updates the shard identifier for this queue.
:param project: Namespace to search
:type project: six.text_type
:param queue: The name of the queue
:type queue: six.text_type
:param shard: The name of the shard where this project/queue lives.
:type shard: six.text_type
:raises: QueueNotMapped
"""
raise NotImplementedError
@abc.abstractmethod
def drop_all(self):
"""Drops all catalogue entries from storage."""
raise NotImplementedError

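For illustration only (not part of this patch), a minimal dict-backed
controller satisfying the contract above might look like the following
sketch. The class name is hypothetical, and it assumes, as the driver code
elsewhere in the patch suggests, that the controller is constructed with
the driver as its argument.

from marconi.queues.storage import base, exceptions


class InMemoryCatalogueController(base.CatalogueBase):
    """Illustrative only: a dict-backed catalogue keyed by (project, queue)."""

    def __init__(self, *args, **kwargs):
        super(InMemoryCatalogueController, self).__init__(*args, **kwargs)
        self._map = {}

    def list(self, project):
        return iter([{'project': p, 'queue': q, 'shard': s}
                     for (p, q), s in self._map.items()
                     if p == project])

    def get(self, project, queue):
        if (project, queue) not in self._map:
            raise exceptions.QueueNotMapped(queue, project)
        return {'project': project, 'queue': queue,
                'shard': self._map[(project, queue)]}

    def exists(self, project, queue):
        return (project, queue) in self._map

    def insert(self, project, queue, shard):
        # Upsert semantics: create or overwrite the mapping.
        self._map[(project, queue)] = shard

    def delete(self, project, queue):
        # Deleting a non-existent entry is a no-op.
        self._map.pop((project, queue), None)

    def update(self, project, queue, shard=None):
        if (project, queue) not in self._map:
            raise exceptions.QueueNotMapped(queue, project)
        self._map[(project, queue)] = shard

    def drop_all(self):
        self._map.clear()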

@ -100,6 +100,15 @@ class ClaimDoesNotExist(DoesNotExist):
super(ClaimDoesNotExist, self).__init__(msg)
class QueueNotMapped(DoesNotExist):
def __init__(self, queue, project):
msg = (u'No shard found for '
u'queue %(queue)s in project %(project)s' %
dict(queue=queue, project=project))
super(QueueNotMapped, self).__init__(msg)
class MessageIsClaimedBy(NotPermitted):
def __init__(self, mid, cid):


@ -0,0 +1,107 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""MongoDB storage controller for the queues catalogue.
Serves to construct an association between a project + queue -> shard
{
'p_q': project_queue :: six.text_type,
's': shard_identifier :: six.text_type
}
"""
import marconi.openstack.common.log as logging
from marconi.queues.storage import base, exceptions
from marconi.queues.storage.mongodb import utils
LOG = logging.getLogger(__name__)
PRIMARY_KEY = utils.PROJ_QUEUE_KEY
CATALOGUE_INDEX = [
(PRIMARY_KEY, 1)
]
class CatalogueController(base.CatalogueBase):
def __init__(self, *args, **kwargs):
super(CatalogueController, self).__init__(*args, **kwargs)
self._col = self.driver.catalogue_database.catalogue
self._col.ensure_index(CATALOGUE_INDEX, unique=True)
@utils.raises_conn_error
def _insert(self, project, queue, shard, upsert):
key = utils.scope_queue_name(queue, project)
return self._col.update({PRIMARY_KEY: key},
{'$set': {'s': shard}}, upsert=upsert)
@utils.raises_conn_error
def list(self, project):
fields = {'_id': 0}
query = utils.scoped_query(None, project)
return utils.HookedCursor(self._col.find(query, fields),
_normalize)
@utils.raises_conn_error
def get(self, project, queue):
fields = {'_id': 0}
key = utils.scope_queue_name(queue, project)
entry = self._col.find_one({PRIMARY_KEY: key},
fields=fields)
if entry is None:
raise exceptions.QueueNotMapped(queue, project)
return _normalize(entry)
@utils.raises_conn_error
def exists(self, project, queue):
key = utils.scope_queue_name(queue, project)
return self._col.find_one({PRIMARY_KEY: key}) is not None
def insert(self, project, queue, shard):
# NOTE(cpp-cabrera): _insert handles conn_error
self._insert(project, queue, shard, upsert=True)
@utils.raises_conn_error
def delete(self, project, queue):
self._col.remove({PRIMARY_KEY: utils.scope_queue_name(queue, project)},
w=0)
def update(self, project, queue, shard=None):
# NOTE(cpp-cabrera): _insert handles conn_error
res = self._insert(project, queue, shard, upsert=False)
if not res['updatedExisting']:
raise exceptions.QueueNotMapped(queue, project)
@utils.raises_conn_error
def drop_all(self):
self._col.drop()
self._col.ensure_index(CATALOGUE_INDEX, unique=True)
def _normalize(entry):
project, queue = utils.parse_scoped_project_queue(entry[PRIMARY_KEY])
return {
'queue': queue,
'project': project,
'shard': entry['s']
}

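For reference (values are illustrative), a stored catalogue document and
its normalized form would look roughly like the following, assuming
scope_queue_name joins project and queue as 'project/queue', as
parse_scoped_project_queue implies:

# One document per project/queue pair, per the module docstring above.
doc = {'p_q': u'project-id/my-queue', 's': u'shard-1'}

# _normalize(doc) splits the scoped key back into its parts:
# {'project': u'project-id', 'queue': u'my-queue', 'shard': u'shard-1'}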

@ -22,12 +22,14 @@ Field Mappings:
updated and documented in each controller class.
"""
from marconi.queues.storage.mongodb import catalogue
from marconi.queues.storage.mongodb import claims
from marconi.queues.storage.mongodb import messages
from marconi.queues.storage.mongodb import queues
from marconi.queues.storage.mongodb import shards
CatalogueController = catalogue.CatalogueController
ClaimController = claims.ClaimController
MessageController = messages.MessageController
QueueController = queues.QueueController


@ -29,7 +29,6 @@ LOG = logging.getLogger(__name__)
def _connection(conf):
"""MongoDB client connection instance."""
if conf.uri and 'replicaSet' in conf.uri:
MongoClient = pymongo.MongoReplicaSetClient
else:
@ -52,7 +51,7 @@ class DataDriver(storage.DataDriverBase):
def queues_database(self):
"""Database dedicated to the "queues" collection.
The queues collection is separated out into it's own database
The queues collection is separated out into its own database
to avoid writer lock contention with the messages collections.
"""
@ -78,6 +77,7 @@ class DataDriver(storage.DataDriverBase):
@decorators.lazy_property(write=False)
def connection(self):
"""MongoDB client connection instance."""
return _connection(self.mongodb_conf)
@decorators.lazy_property(write=False)
@ -116,3 +116,18 @@ class ControlDriver(storage.ControlDriverBase):
@property
def shards_controller(self):
return controllers.ShardsController(self)
@decorators.lazy_property(write=False)
def catalogue_database(self):
"""Database dedicated to the "queues" collection.
The queues collection is separated out into its own database
to avoid writer lock contention with the messages collections.
"""
name = self.mongodb_conf.database + '_catalogue'
return self.connection[name]
@property
def catalogue_controller(self):
return controllers.CatalogueController(self)


@ -55,23 +55,27 @@ TTL_INDEX_FIELDS = [
('e', 1),
]
# NOTE(cpp-cabrera): to unify use of project/queue across mongodb
# storage impls.
PROJ_QUEUE = utils.PROJ_QUEUE_KEY
# NOTE(kgriffs): This index is for listing messages, usually
# filtering out claimed ones.
ACTIVE_INDEX_FIELDS = [
('p_q', 1), # Project will to be unique, so put first
(PROJ_QUEUE, 1), # Project will be unique, so put first
('k', 1), # Used for sorting and paging, must come before range queries
('c.e', 1), # Used for filtering out claimed messages
]
# For counting
COUNTING_INDEX_FIELDS = [
('p_q', 1), # Project will to be unique, so put first
(PROJ_QUEUE, 1), # Project will be unique, so put first
('c.e', 1), # Used for filtering out claimed messages
]
# Index used for claims
CLAIMED_INDEX_FIELDS = [
('p_q', 1),
(PROJ_QUEUE, 1),
('c.id', 1),
('k', 1),
('c.e', 1),
@ -79,7 +83,7 @@ CLAIMED_INDEX_FIELDS = [
# Index used to ensure uniqueness.
MARKER_INDEX_FIELDS = [
('p_q', 1),
(PROJ_QUEUE, 1),
('k', 1),
]
@ -193,7 +197,7 @@ class MessageController(storage.MessageBase):
"""
scope = utils.scope_queue_name(queue_name, project)
collection = self._collection(queue_name, project)
collection.remove({'p_q': scope}, w=0)
collection.remove({PROJ_QUEUE: scope}, w=0)
def _list(self, queue_name, project=None, marker=None,
echo=False, client_uuid=None, fields=None,
@ -234,7 +238,7 @@ class MessageController(storage.MessageBase):
query = {
# Messages must belong to this
# queue and project
'p_q': utils.scope_queue_name(queue_name, project),
PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
}
if not echo:
@ -275,7 +279,7 @@ class MessageController(storage.MessageBase):
"""
query = {
# Messages must belong to this queue
'p_q': utils.scope_queue_name(queue_name, project),
PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
}
if not include_claimed:
@ -301,7 +305,7 @@ class MessageController(storage.MessageBase):
claim_id = {'$ne': None}
query = {
'p_q': utils.scope_queue_name(queue_name, project),
PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
'c.id': claim_id,
'c.e': {'$gt': expires or timeutils.utcnow_ts()},
}
@ -341,7 +345,7 @@ class MessageController(storage.MessageBase):
scope = utils.scope_queue_name(queue_name, project)
collection = self._collection(queue_name, project)
collection.update({'p_q': scope, 'c.id': cid},
collection.update({PROJ_QUEUE: scope, 'c.id': cid},
{'$set': {'c': {'id': None, 'e': now}}},
upsert=False, multi=True)
@ -402,7 +406,7 @@ class MessageController(storage.MessageBase):
query = {
'_id': mid,
'p_q': utils.scope_queue_name(queue_name, project),
PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
}
collection = self._collection(queue_name, project)
@ -425,7 +429,7 @@ class MessageController(storage.MessageBase):
# Base query, always check expire time
query = {
'_id': {'$in': message_ids},
'p_q': utils.scope_queue_name(queue_name, project),
PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
}
collection = self._collection(queue_name, project)
@ -454,7 +458,7 @@ class MessageController(storage.MessageBase):
prepared_messages = [
{
't': message['ttl'],
'p_q': utils.scope_queue_name(queue_name, project),
PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
'e': now_dt + datetime.timedelta(seconds=message['ttl']),
'u': client_uuid,
'c': {'id': None, 'e': now},
@ -616,7 +620,7 @@ class MessageController(storage.MessageBase):
query = {
'_id': mid,
'p_q': utils.scope_queue_name(queue_name, project),
PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
}
# NOTE(cpp-cabrera): return early - the user gave us an
@ -650,7 +654,7 @@ class MessageController(storage.MessageBase):
message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
query = {
'_id': {'$in': message_ids},
'p_q': utils.scope_queue_name(queue_name, project),
PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
}
collection = self._collection(queue_name, project)


@ -172,19 +172,7 @@ class QueueController(storage.QueueBase):
if limit is None:
limit = self.driver.limits_conf.default_queue_paging
query = {}
scoped_name = utils.scope_queue_name(marker, project)
if not scoped_name.startswith('/'):
# NOTE(kgriffs): scoped queue, e.g., 'project-id/queue-name'
project_prefix = '^' + project + '/'
query['p_q'] = {'$regex': project_prefix, '$gt': scoped_name}
elif scoped_name == '/':
# NOTE(kgriffs): list global queues, but exclude scoped ones
query['p_q'] = {'$regex': '^/'}
else:
# NOTE(kgriffs): unscoped queue, e.g., '/my-global-queue'
query['p_q'] = {'$regex': '^/', '$gt': scoped_name}
query = utils.scoped_query(marker, project)
fields = {'p_q': 1, '_id': 0}
if detailed:


@ -33,6 +33,9 @@ from marconi.queues.storage import exceptions as storage_exceptions
# TZ-aware UNIX epoch for convenience.
EPOCH = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=tz_util.utc)
# NOTE(cpp-cabrera): the authoritative form of project/queue keys.
PROJ_QUEUE_KEY = 'p_q'
LOG = logging.getLogger(__name__)
@ -172,6 +175,47 @@ def descope_queue_name(scoped_name):
return scoped_name.partition('/')[2] or None
def parse_scoped_project_queue(scoped_name):
"""Returns the project and queue name for a scoped catalogue entry.
:param scoped_name: a project/queue as given by :scope_queue_name:
:type scoped_name: six.text_type
:returns: (project, queue)
:rtype: (six.text_type, six.text_type)
"""
return scoped_name.split('/')
def scoped_query(queue, project):
"""Returns a dict usable for querying for scoped project/queues.
:param queue: name of queue to seek
:type queue: six.text_type
:param project: namespace
:type project: six.text_type
:returns: query to issue
:rtype: dict
"""
key = PROJ_QUEUE_KEY
query = {}
scoped_name = scope_queue_name(queue, project)
if not scoped_name.startswith('/'):
# NOTE(kgriffs): scoped queue, e.g., 'project-id/queue-name'
project_prefix = '^' + project + '/'
query[key] = {'$regex': project_prefix, '$gt': scoped_name}
elif scoped_name == '/':
# NOTE(kgriffs): list global queues, but exclude scoped ones
query[key] = {'$regex': '^/'}
else:
# NOTE(kgriffs): unscoped queue, e.g., '/my-global-queue'
query[key] = {'$regex': '^/', '$gt': scoped_name}
return query
def get_partition(num_partitions, queue, project=None):
"""Get the partition number for a given queue and project.

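For reference (values are illustrative), the three branches of scoped_query
above produce queries of roughly the following shape, assuming
scope_queue_name joins project and queue as 'project/queue' and yields a
leading '/' when no project is given:

# Marker scoped to a project: scoped_query(u'my-queue', u'project-id')
scoped = {'p_q': {'$regex': u'^project-id/', '$gt': u'project-id/my-queue'}}

# List global queues only, excluding scoped ones: scoped_query(None, None)
global_listing = {'p_q': {'$regex': u'^/'}}

# Unscoped (global) marker: scoped_query(u'my-global-queue', None)
global_marker = {'p_q': {'$regex': u'^/', '$gt': u'/my-global-queue'}}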

@ -0,0 +1,48 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""sqlite storage controller for the queues catalogue.
Serves to construct an association between a project + queue -> shard
"""
from marconi.queues.storage import base
class CatalogueController(base.CatalogueBase):
def __init__(self, *args, **kwargs):
super(CatalogueController, self).__init__(*args, **kwargs)
def list(self, project):
pass
def get(self, project, queue):
pass
def exists(self, project, queue):
pass
def insert(self, project, queue, shard):
pass
def delete(self, project, queue):
pass
def update(self, project, queue, shard=None):
pass
def drop_all(self):
pass


@ -16,12 +16,14 @@
"""Exports SQLite driver controllers."""
from marconi.queues.storage.sqlite import catalogue
from marconi.queues.storage.sqlite import claims
from marconi.queues.storage.sqlite import messages
from marconi.queues.storage.sqlite import queues
from marconi.queues.storage.sqlite import shards
CatalogueController = catalogue.CatalogueController
ClaimController = claims.ClaimController
MessageController = messages.MessageController
QueueController = queues.QueueController


@ -214,6 +214,10 @@ class ControlDriver(storage.ControlDriverBase):
self.__db = self.__conn.cursor()
self.run('''PRAGMA foreign_keys = ON''')
@property
def catalogue_controller(self):
return controllers.CatalogueController(self)
@property
def shards_controller(self):
return controllers.ShardsController(self)


@ -42,6 +42,10 @@ class ControlDriver(storage.ControlDriverBase):
def __init__(self, conf):
super(ControlDriver, self).__init__(conf)
@property
def catalogue_controller(self):
return None
@property
def shards_controller(self):
return None


@ -132,6 +132,51 @@ def entries(controller, count):
controller.delete(p, q)
@contextlib.contextmanager
def shard_entry(controller, project, queue, shard):
"""Creates a catalogue entry with the given details, and deletes
it once the context manager goes out of scope.
:param controller: storage handler
:type controller: queues.storage.base:CatalogueBase
:param project: namespace for queue
:type project: six.text_type
:param queue: name of queue
:type queue: six.text_type
:param shard: an identifier for the shard
:type shard: six.text_type
:returns: (project, queue, shard)
:rtype: (six.text_type, six.text_type, six.text_type)
"""
controller.insert(project, queue, shard)
yield (project, queue, shard)
controller.delete(project, queue)
@contextlib.contextmanager
def shard_entries(controller, count):
"""Creates `count` catalogue entries with the given details, and
deletes them once the context manager goes out of scope.
:param controller: storage handler
:type controller: queues.storage.base:CatalogueBase
:param count: number of entries to create
:type count: int
:returns: [(project, queue, shard)]
:rtype: [(six.text_type, six.text_type, six.text_type)]
"""
spec = [(u'_', six.text_type(uuid.uuid1()), six.text_type(i))
for i in range(count)]
for p, q, s in spec:
controller.insert(p, q, s)
yield spec
for p, q, _ in spec:
controller.delete(p, q)
def requires_mongodb(test_case):
"""Decorator to flag a test case as being dependent on MongoDB.


@ -19,12 +19,14 @@ import uuid
import ddt
from oslo.config import cfg
import six
from testtools import matchers
from marconi.openstack.common import timeutils
from marconi.queues import storage
from marconi.queues.storage import exceptions
from marconi import tests as testing
from marconi.tests import helpers
class ControllerBaseTest(testing.TestBase):
@ -712,6 +714,109 @@ class ShardsControllerTest(ControllerBaseTest):
self.assertEqual(entry['o'], {})
class CatalogueControllerTest(ControllerBaseTest):
controller_base_class = storage.CatalogueBase
def setUp(self):
super(CatalogueControllerTest, self).setUp()
self.controller = self.driver.catalogue_controller
self.queue = six.text_type(uuid.uuid1())
self.project = six.text_type(uuid.uuid1())
def tearDown(self):
self.controller.drop_all()
super(CatalogueControllerTest, self).tearDown()
def _check_structure(self, entry):
self.assertIn('queue', entry)
self.assertIn('project', entry)
self.assertIn('shard', entry)
self.assertIsInstance(entry['queue'], six.text_type)
self.assertIsInstance(entry['project'], six.text_type)
self.assertIsInstance(entry['shard'], six.text_type)
def _check_value(self, entry, xqueue, xproject, xshard):
self.assertEqual(entry['queue'], xqueue)
self.assertEqual(entry['project'], xproject)
self.assertEqual(entry['shard'], xshard)
def test_catalogue_entry_life_cycle(self):
queue = self.queue
project = self.project
# check listing is initially empty
for p in self.controller.list(project):
self.fail('There should be no entries at this time')
# create a listing, check its length
with helpers.shard_entries(self.controller, 10) as expect:
project = expect[0][0]
xs = list(self.controller.list(project))
self.assertEqual(len(xs), 10)
# create, check existence, delete
with helpers.shard_entry(self.controller, project, queue, u'a'):
self.assertTrue(self.controller.exists(project, queue))
# verify it no longer exists
self.assertFalse(self.controller.exists(project, queue))
# verify it isn't listable
self.assertEqual(len(list(self.controller.list(project))), 0)
def test_list(self):
with helpers.shard_entries(self.controller, 10) as expect:
values = zip(self.controller.list(u'_'), expect)
for e, x in values:
p, q, s = x
self._check_structure(e)
self._check_value(e, xqueue=q, xproject=p, xshard=s)
def test_update(self):
with helpers.shard_entry(self.controller, self.project,
self.queue, u'a') as expect:
p, q, s = expect
self.controller.update(p, q, shard=u'b')
entry = self.controller.get(p, q)
self._check_value(entry, xqueue=q, xproject=p, xshard=u'b')
def test_update_raises_when_entry_does_not_exist(self):
self.assertRaises(exceptions.QueueNotMapped,
self.controller.update,
'not', 'not', 'a')
def test_get(self):
with helpers.shard_entry(self.controller,
self.project,
self.queue, u'a') as expect:
p, q, s = expect
e = self.controller.get(p, q)
self._check_value(e, xqueue=q, xproject=p, xshard=s)
def test_get_raises_if_does_not_exist(self):
with helpers.shard_entry(self.controller,
self.project,
self.queue, u'a') as expect:
p, q, _ = expect
self.assertRaises(exceptions.QueueNotMapped,
self.controller.get,
p, 'non_existing')
self.assertRaises(exceptions.QueueNotMapped,
self.controller.get,
'non_existing', q)
self.assertRaises(exceptions.QueueNotMapped,
self.controller.get,
'non_existing', 'non_existing')
def test_exists(self):
with helpers.shard_entry(self.controller,
self.project,
self.queue, u'a') as expect:
p, q, _ = expect
self.assertTrue(self.controller.exists(p, q))
self.assertFalse(self.controller.exists('nada', 'not_here'))
def _insert_fixtures(controller, queue_name, project=None,
client_uuid=None, num=4, ttl=120):


@ -345,3 +345,17 @@ class MongodbShardsTests(base.ShardsControllerTest):
def tearDown(self):
super(MongodbShardsTests, self).tearDown()
@testing.requires_mongodb
class MongodbCatalogueTests(base.CatalogueControllerTest):
driver_class = mongodb.ControlDriver
controller_class = controllers.CatalogueController
def setUp(self):
super(MongodbCatalogueTests, self).setUp()
self.load_conf('wsgi_mongodb.conf')
def tearDown(self):
self.controller.drop_all()
super(MongodbCatalogueTests, self).tearDown()