Merge "feat: shards mongodb driver + tests"

This commit is contained in:
Jenkins 2013-10-29 15:42:42 +00:00 committed by Gerrit Code Review
commit ddaf071288
17 changed files with 296 additions and 31 deletions

38
marconi/common/utils.py Normal file
View File

@ -0,0 +1,38 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""utils: general-purpose utilities."""
import six
def fields(d, names, pred=lambda x: True,
key_transform=lambda x: x, value_transform=lambda x: x):
"""Returns the entries in this dictionary with keys appearing in names.
:type d: dict
:type names: [a]
:param pred: a filter that is applied to the values of the dictionary.
:type pred: (a -> bool)
:param key_transform: a transform to apply to the key before returning it
:type key_transform: a -> a
:param value_transform: a transform to apply to the value before
returning it
:type value_transform: a -> a
:rtype: dict
"""
return dict((key_transform(k), value_transform(v))
for k, v in six.iteritems(d)
if k in names and pred(v))

View File

@ -58,7 +58,7 @@ class Bootstrap(object):
self.driver_conf = self.conf[_DRIVER_GROUP]
log.setup('marconi')
mode = 'admin' if self.conf.admin_mode else 'public'
mode = 'admin' if conf.admin_mode else 'public'
self._transport_type = 'marconi.queues.{0}.transport'.format(mode)
@decorators.lazy_property(write=False)

View File

@ -9,3 +9,4 @@ DataDriverBase = base.DataDriverBase
ClaimBase = base.ClaimBase
MessageBase = base.MessageBase
QueueBase = base.QueueBase
ShardsBase = base.ShardsBase

View File

@ -368,8 +368,19 @@ class ClaimBase(ControllerBase):
raise NotImplementedError
class AdminControllerBase(object):
"""Top-level class for controllers.
:param driver: Instance of the driver
instantiating this controller.
"""
def __init__(self, driver):
self.driver = driver
@six.add_metaclass(abc.ABCMeta)
class ShardsController(ControllerBase):
class ShardsBase(AdminControllerBase):
"""A controller for managing shards."""
@abc.abstractmethod
@ -404,11 +415,13 @@ class ShardsController(ControllerBase):
raise NotImplementedError
@abc.abstractmethod
def get(self, name):
def get(self, name, detailed=False):
"""Returns a single shard entry.
:param name: The name of this shard
:type name: six.text_type
:param detailed: Should the options data be included?
:type detailed: bool
:returns: weight, uri, and options for this shard
:rtype: {}
:raises: ShardDoesNotExist if not found

View File

@ -28,6 +28,16 @@ from marconi.queues.storage.mongodb import options
LOG = logging.getLogger(__name__)
def _connection(conf):
"""MongoDB client connection instance."""
if conf.uri and 'replicaSet' in conf.uri:
MongoClient = pymongo.MongoReplicaSetClient
else:
MongoClient = pymongo.MongoClient
return MongoClient(conf.uri)
class DataDriver(storage.DataDriverBase):
def __init__(self, conf):
@ -68,14 +78,7 @@ class DataDriver(storage.DataDriverBase):
@decorators.lazy_property(write=False)
def connection(self):
"""MongoDB client connection instance."""
if self.mongodb_conf.uri and 'replicaSet' in self.mongodb_conf.uri:
MongoClient = pymongo.MongoReplicaSetClient
else:
MongoClient = pymongo.MongoClient
return MongoClient(self.mongodb_conf.uri)
return _connection(self.mongodb_conf)
@decorators.lazy_property(write=False)
def queue_controller(self):
@ -100,6 +103,16 @@ class ControlDriver(storage.ControlDriverBase):
self.mongodb_conf = self.conf[options.MONGODB_GROUP]
@decorators.lazy_property(write=False)
def connection(self):
"""MongoDB client connection instance."""
return _connection(self.mongodb_conf)
@decorators.lazy_property(write=False)
def shards_database(self):
name = self.mongodb_conf.database + '_shards'
return self.connection[name]
@property
def shards_controller(self):
return controllers.ShardsController(self)

View File

@ -14,28 +14,91 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi.queues.storage import base
"""shards: an implementation of the shard management storage
controller for mongodb.
Schema:
'n': name :: six.text_type
'u': uri :: six.text_type
'w': weight :: int
'o': options :: dict
"""
from marconi.common import utils as common_utils
from marconi.queues.storage import base, exceptions
from marconi.queues.storage.mongodb import utils
SHARDS_INDEX = [
('n', 1)
]
# NOTE(cpp-cabrera): used for get/list operations. There's no need to
# show the marker or the _id - they're implementation details.
OMIT_FIELDS = (('_id', 0),)
class ShardsController(base.ShardsController):
def _field_spec(detailed=False):
return dict(OMIT_FIELDS + (() if detailed else (('o', 0),)))
class ShardsController(base.ShardsBase):
def __init__(self, *args, **kwargs):
super(ShardsController, self).__init__(*args, **kwargs)
self._col = self.driver.shards_database.shards
self._col.ensure_index(SHARDS_INDEX,
background=True,
name='shards_name',
unique=True)
@utils.raises_conn_error
def list(self, marker=None, limit=10, detailed=False):
pass
query = {}
if marker is not None:
query['n'] = {'$gt': marker}
def get(self, name):
pass
return self._col.find(query, fields=_field_spec(detailed),
limit=limit)
@utils.raises_conn_error
def get(self, name, detailed=False):
res = self._col.find_one({'n': name},
_field_spec(detailed))
if not res:
raise exceptions.ShardDoesNotExist(name)
return res
@utils.raises_conn_error
def create(self, name, weight, uri, options=None):
pass
options = {} if options is None else options
self._col.update({'n': name},
{'$set': {'n': name, 'w': weight, 'u': uri,
'o': options}},
upsert=True)
@utils.raises_conn_error
def exists(self, name):
pass
return self._col.find_one({'n': name}) is not None
@utils.raises_conn_error
def update(self, name, **kwargs):
pass
names = ('uri', 'weight', 'options')
fields = common_utils.fields(kwargs, names,
pred=lambda x: x is not None,
key_transform=lambda x: x[0])
assert fields, '`weight`, `uri`, or `options` not found in kwargs'
res = self._col.update({'n': name},
{'$set': fields},
upsert=False)
if not res['updatedExisting']:
raise exceptions.ShardDoesNotExist(name)
@utils.raises_conn_error
def delete(self, name):
pass
self._col.remove({'n': name}, w=0)
@utils.raises_conn_error
def drop_all(self):
pass
self._col.drop()
self._col.ensure_index(SHARDS_INDEX, unique=True)

View File

@ -144,10 +144,14 @@ class Catalog(object):
# TODO(kgriffs): SHARDING - Read options from catalog backend
conf = cfg.ConfigOpts()
general_opts = [
cfg.BoolOpt('admin_mode', default=False)
]
options = [
cfg.StrOpt('storage', default='sqlite')
cfg.StrOpt('storage', default='sqlite'),
]
conf.register_opts(general_opts)
conf.register_opts(options, group='queues:drivers')
return utils.load_storage_driver(conf)

View File

@ -17,7 +17,7 @@
from marconi.queues.storage import base
class ShardsController(base.ShardsController):
class ShardsController(base.ShardsBase):
def list(self, marker=None, limit=10, detailed=False):
pass

View File

@ -32,7 +32,7 @@ def load_storage_driver(conf):
"""
try:
mgr = driver.DriverManager('marconi.queues.storage',
mgr = driver.DriverManager('marconi.queues.data.storage',
conf['queues:drivers'].storage,
invoke_on_load=True,
invoke_args=[conf])

View File

@ -17,6 +17,9 @@ from marconi.queues import storage
class DataDriver(storage.DataDriverBase):
def __init__(self, conf):
super(DataDriver, self).__init__(conf)
@property
def default_options(self):
return {}
@ -35,9 +38,9 @@ class DataDriver(storage.DataDriverBase):
class ControlDriver(storage.ControlDriverBase):
@property
def default_options(self):
return {}
def __init__(self, conf):
super(ControlDriver, self).__init__(conf)
@property
def shards_controller(self):

View File

@ -601,6 +601,117 @@ class ClaimControllerTest(ControllerBaseTest):
project=self.project)
class ShardsControllerTest(ControllerBaseTest):
"""Shards Controller base tests.
NOTE(flaper87): Implementations of this class should
override the tearDown method in order
to clean up storage's state.
"""
controller_base_class = storage.ShardsBase
def setUp(self):
super(ShardsControllerTest, self).setUp()
self.shards_controller = self.driver.shards_controller
# Let's create one shard
self.shard = str(uuid.uuid1())
self.shards_controller.create(self.shard, 100, 'localhost', {})
def tearDown(self):
self.shards_controller.drop_all()
super(ShardsControllerTest, self).tearDown()
def test_create_succeeds(self):
self.shards_controller.create(str(uuid.uuid1()),
100, 'localhost', {})
def test_create_replaces_on_duplicate_insert(self):
name = str(uuid.uuid1())
self.shards_controller.create(name,
100, 'localhost', {})
self.shards_controller.create(name,
111, 'localhost2', {})
entry = self.shards_controller.get(name)
self._shard_expects(entry, xname=name, xweight=111,
xlocation='localhost2')
def _shard_expects(self, shard, xname, xweight, xlocation):
self.assertIn('n', shard)
self.assertEqual(shard['n'], xname)
self.assertIn('w', shard)
self.assertEqual(shard['w'], xweight)
self.assertIn('u', shard)
self.assertEqual(shard['u'], xlocation)
def test_get_returns_expected_content(self):
res = self.shards_controller.get(self.shard)
self._shard_expects(res, self.shard, 100, 'localhost')
self.assertNotIn('o', res)
def test_detailed_get_returns_expected_content(self):
res = self.shards_controller.get(self.shard, detailed=True)
self.assertIn('o', res)
self.assertEqual(res['o'], {})
def test_get_raises_if_not_found(self):
self.assertRaises(storage.exceptions.ShardDoesNotExist,
self.shards_controller.get, 'notexists')
def test_exists(self):
self.assertTrue(self.shards_controller.exists(self.shard))
self.assertFalse(self.shards_controller.exists('notexists'))
def test_update_raises_assertion_error_on_bad_fields(self):
self.assertRaises(AssertionError, self.shards_controller.update,
self.shard)
def test_update_works(self):
self.shards_controller.update(self.shard, weight=101,
uri='redis://localhost',
options={'a': 1})
res = self.shards_controller.get(self.shard, detailed=True)
self._shard_expects(res, self.shard, 101, 'redis://localhost')
self.assertEqual(res['o'], {'a': 1})
def test_delete_works(self):
self.shards_controller.delete(self.shard)
self.assertFalse(self.shards_controller.exists(self.shard))
def test_delete_nonexistent_is_silent(self):
self.shards_controller.delete('nonexisting')
def test_drop_all_leads_to_empty_listing(self):
self.shards_controller.drop_all()
cursor = self.shards_controller.list()
self.assertRaises(StopIteration, next, cursor)
def test_listing_simple(self):
# NOTE(cpp-cabrera): base entry interferes with listing results
self.shards_controller.delete(self.shard)
for i in range(15):
self.shards_controller.create(str(i), i, str(i), {})
res = list(self.shards_controller.list())
self.assertEqual(len(res), 10)
for i, entry in enumerate(res):
self._shard_expects(entry, str(i), i, str(i))
res = list(self.shards_controller.list(limit=5))
self.assertEqual(len(res), 5)
res = next(self.shards_controller.list(marker='3'))
self._shard_expects(res, '4', 4, '4')
res = list(self.shards_controller.list(detailed=True))
self.assertEqual(len(res), 10)
for i, entry in enumerate(res):
self._shard_expects(entry, str(i), i, str(i))
self.assertIn('o', entry)
self.assertEqual(entry['o'], {})
def _insert_fixtures(controller, queue_name, project=None,
client_uuid=None, num=4, ttl=120):

View File

@ -26,10 +26,14 @@ packages =
console_scripts =
marconi-server = marconi.cmd.server:run
marconi.queues.storage =
marconi.queues.data.storage =
sqlite = marconi.queues.storage.sqlite.driver:DataDriver
mongodb = marconi.queues.storage.mongodb.driver:DataDriver
marconi.queues.control.storage =
sqlite = marconi.queues.storage.sqlite.driver:ControlDriver
mongodb = marconi.queues.storage.mongodb.driver:ControlDriver
marconi.queues.public.transport =
wsgi = marconi.queues.transport.wsgi.public.driver:Driver

View File

@ -1,6 +1,7 @@
[DEFAULT]
debug = False
verbose = False
admin_mode = False
[queues:drivers]
transport = wsgi

View File

@ -1,6 +1,7 @@
[DEFAULT]
debug = False
verbose = False
admin_mode = False
[queues:drivers]
transport = wsgi

View File

@ -332,3 +332,16 @@ class MongodbClaimTests(base.ClaimControllerTest):
self.assertRaises(storage.exceptions.ClaimDoesNotExist,
self.controller.update, self.queue_name,
claim_id, {}, project=self.project)
@testing.requires_mongodb
class MongodbShardsTests(base.ShardsControllerTest):
driver_class = mongodb.ControlDriver
controller_class = controllers.ShardsController
def setUp(self):
super(MongodbShardsTests, self).setUp()
self.load_conf('wsgi_mongodb.conf')
def tearDown(self):
super(MongodbShardsTests, self).tearDown()

View File

@ -29,7 +29,7 @@ class TestShardCatalog(base.TestBase):
conf_file = 'etc/wsgi_sqlite_sharded.conf'
conf = cfg.ConfigOpts()
conf(default_config_files=[conf_file])
conf(args=[], default_config_files=[conf_file])
lookup = sharding.Catalog(conf).lookup

View File

@ -25,8 +25,8 @@ from marconi.tests import base
class TestBootstrap(base.TestBase):
def _bootstrap(self, conf_file):
conf = self.load_conf(conf_file)
return bootstrap.Bootstrap(conf)
self.conf = self.load_conf(conf_file)
return bootstrap.Bootstrap(self.conf)
def test_storage_invalid(self):
boot = self._bootstrap('etc/drivers_storage_invalid.conf')