From 9c7036ff4eaa093e72a9dfa5b0c2301f576d1d38 Mon Sep 17 00:00:00 2001 From: Alejandro Cabrera Date: Mon, 28 Oct 2013 11:17:28 -0400 Subject: [PATCH] feat: shards mongodb driver + tests This patch adds shard management capability to the queues mongodb driver. The storage API is also correctly split into control and data interfaces. The data drivers are loaded by default. The control drivers are loaded as needed by the transport layer. Unit tests are also provided to verify that the driver (and future drivers) work as expected. Change-Id: Iad034a429a763c9a2ce161f05c928b090ab58944 Partially-implements: blueprint storage-sharding Partially-Closes: 1241686 --- marconi/common/utils.py | 38 ++++++ marconi/queues/bootstrap.py | 2 +- marconi/queues/storage/__init__.py | 1 + marconi/queues/storage/base.py | 17 ++- marconi/queues/storage/mongodb/driver.py | 29 +++-- marconi/queues/storage/mongodb/shards.py | 83 +++++++++++-- marconi/queues/storage/sharding.py | 6 +- marconi/queues/storage/sqlite/shards.py | 2 +- marconi/queues/storage/utils.py | 2 +- marconi/tests/faulty_storage.py | 9 +- marconi/tests/queues/storage/base.py | 111 ++++++++++++++++++ setup.cfg | 6 +- tests/etc/drivers_storage_invalid.conf | 1 + tests/etc/wsgi_sqlite.conf | 1 + .../unit/queues/storage/test_impl_mongodb.py | 13 ++ .../unit/queues/storage/test_shard_catalog.py | 2 +- tests/unit/test_bootstrap.py | 4 +- 17 files changed, 296 insertions(+), 31 deletions(-) create mode 100644 marconi/common/utils.py diff --git a/marconi/common/utils.py b/marconi/common/utils.py new file mode 100644 index 000000000..d87010779 --- /dev/null +++ b/marconi/common/utils.py @@ -0,0 +1,38 @@ +# Copyright (c) 2013 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""utils: general-purpose utilities.""" + +import six + + +def fields(d, names, pred=lambda x: True, + key_transform=lambda x: x, value_transform=lambda x: x): + """Returns the entries in this dictionary with keys appearing in names. + :type d: dict + :type names: [a] + :param pred: a filter that is applied to the values of the dictionary. + :type pred: (a -> bool) + :param key_transform: a transform to apply to the key before returning it + :type key_transform: a -> a + :param value_transform: a transform to apply to the value before + returning it + :type value_transform: a -> a + :rtype: dict + + """ + return dict((key_transform(k), value_transform(v)) + for k, v in six.iteritems(d) + if k in names and pred(v)) diff --git a/marconi/queues/bootstrap.py b/marconi/queues/bootstrap.py index b257b5f9f..25d15a20a 100644 --- a/marconi/queues/bootstrap.py +++ b/marconi/queues/bootstrap.py @@ -58,7 +58,7 @@ class Bootstrap(object): self.driver_conf = self.conf[_DRIVER_GROUP] log.setup('marconi') - mode = 'admin' if self.conf.admin_mode else 'public' + mode = 'admin' if conf.admin_mode else 'public' self._transport_type = 'marconi.queues.{0}.transport'.format(mode) @decorators.lazy_property(write=False) diff --git a/marconi/queues/storage/__init__.py b/marconi/queues/storage/__init__.py index 7c3f9ddf4..288906768 100644 --- a/marconi/queues/storage/__init__.py +++ b/marconi/queues/storage/__init__.py @@ -9,3 +9,4 @@ DataDriverBase = base.DataDriverBase ClaimBase = base.ClaimBase MessageBase = base.MessageBase QueueBase = base.QueueBase +ShardsBase = 
base.ShardsBase diff --git a/marconi/queues/storage/base.py b/marconi/queues/storage/base.py index 7e09df333..ce6ee5ec5 100644 --- a/marconi/queues/storage/base.py +++ b/marconi/queues/storage/base.py @@ -368,8 +368,19 @@ class ClaimBase(ControllerBase): raise NotImplementedError +class AdminControllerBase(object): + """Top-level class for controllers. + + :param driver: Instance of the driver + instantiating this controller. + """ + + def __init__(self, driver): + self.driver = driver + + @six.add_metaclass(abc.ABCMeta) -class ShardsController(ControllerBase): +class ShardsBase(AdminControllerBase): """A controller for managing shards.""" @abc.abstractmethod @@ -404,11 +415,13 @@ class ShardsController(ControllerBase): raise NotImplementedError @abc.abstractmethod - def get(self, name): + def get(self, name, detailed=False): """Returns a single shard entry. :param name: The name of this shard :type name: six.text_type + :param detailed: Should the options data be included? + :type detailed: bool :returns: weight, uri, and options for this shard :rtype: {} :raises: ShardDoesNotExist if not found diff --git a/marconi/queues/storage/mongodb/driver.py b/marconi/queues/storage/mongodb/driver.py index 1eb1317e4..0f4b02617 100644 --- a/marconi/queues/storage/mongodb/driver.py +++ b/marconi/queues/storage/mongodb/driver.py @@ -28,6 +28,16 @@ from marconi.queues.storage.mongodb import options LOG = logging.getLogger(__name__) +def _connection(conf): + """MongoDB client connection instance.""" + if conf.uri and 'replicaSet' in conf.uri: + MongoClient = pymongo.MongoReplicaSetClient + else: + MongoClient = pymongo.MongoClient + + return MongoClient(conf.uri) + + class DataDriver(storage.DataDriverBase): def __init__(self, conf): @@ -68,14 +78,7 @@ class DataDriver(storage.DataDriverBase): @decorators.lazy_property(write=False) def connection(self): - """MongoDB client connection instance.""" - - if self.mongodb_conf.uri and 'replicaSet' in self.mongodb_conf.uri: - 
MongoClient = pymongo.MongoReplicaSetClient - else: - MongoClient = pymongo.MongoClient - - return MongoClient(self.mongodb_conf.uri) + return _connection(self.mongodb_conf) @decorators.lazy_property(write=False) def queue_controller(self): @@ -100,6 +103,16 @@ class ControlDriver(storage.ControlDriverBase): self.mongodb_conf = self.conf[options.MONGODB_GROUP] + @decorators.lazy_property(write=False) + def connection(self): + """MongoDB client connection instance.""" + return _connection(self.mongodb_conf) + + @decorators.lazy_property(write=False) + def shards_database(self): + name = self.mongodb_conf.database + '_shards' + return self.connection[name] + @property def shards_controller(self): return controllers.ShardsController(self) diff --git a/marconi/queues/storage/mongodb/shards.py b/marconi/queues/storage/mongodb/shards.py index a7af06463..7fbe54207 100644 --- a/marconi/queues/storage/mongodb/shards.py +++ b/marconi/queues/storage/mongodb/shards.py @@ -14,28 +14,91 @@ # See the License for the specific language governing permissions and # limitations under the License. -from marconi.queues.storage import base +"""shards: an implementation of the shard management storage +controller for mongodb. + +Schema: + 'n': name :: six.text_type + 'u': uri :: six.text_type + 'w': weight :: int + 'o': options :: dict +""" + +from marconi.common import utils as common_utils +from marconi.queues.storage import base, exceptions +from marconi.queues.storage.mongodb import utils + +SHARDS_INDEX = [ + ('n', 1) +] + +# NOTE(cpp-cabrera): used for get/list operations. There's no need to +# show the marker or the _id - they're implementation details. 
+OMIT_FIELDS = (('_id', 0),) -class ShardsController(base.ShardsController): +def _field_spec(detailed=False): + return dict(OMIT_FIELDS + (() if detailed else (('o', 0),))) + +class ShardsController(base.ShardsBase): + + def __init__(self, *args, **kwargs): + super(ShardsController, self).__init__(*args, **kwargs) + + self._col = self.driver.shards_database.shards + self._col.ensure_index(SHARDS_INDEX, + background=True, + name='shards_name', + unique=True) + + @utils.raises_conn_error def list(self, marker=None, limit=10, detailed=False): - pass + query = {} + if marker is not None: + query['n'] = {'$gt': marker} - def get(self, name): - pass + return self._col.find(query, fields=_field_spec(detailed), + limit=limit) + @utils.raises_conn_error + def get(self, name, detailed=False): + res = self._col.find_one({'n': name}, + _field_spec(detailed)) + if not res: + raise exceptions.ShardDoesNotExist(name) + return res + + @utils.raises_conn_error def create(self, name, weight, uri, options=None): - pass + options = {} if options is None else options + self._col.update({'n': name}, + {'$set': {'n': name, 'w': weight, 'u': uri, + 'o': options}}, + upsert=True) + @utils.raises_conn_error def exists(self, name): - pass + return self._col.find_one({'n': name}) is not None + @utils.raises_conn_error def update(self, name, **kwargs): - pass + names = ('uri', 'weight', 'options') + fields = common_utils.fields(kwargs, names, + pred=lambda x: x is not None, + key_transform=lambda x: x[0]) + assert fields, '`weight`, `uri`, or `options` not found in kwargs' + res = self._col.update({'n': name}, + {'$set': fields}, + upsert=False) + if not res['updatedExisting']: + raise exceptions.ShardDoesNotExist(name) + @utils.raises_conn_error def delete(self, name): - pass + self._col.remove({'n': name}, w=0) + @utils.raises_conn_error def drop_all(self): - pass + self._col.drop() + self._col.ensure_index(SHARDS_INDEX, unique=True) diff --git a/marconi/queues/storage/sharding.py 
b/marconi/queues/storage/sharding.py index 291ddc676..67caeb71b 100644 --- a/marconi/queues/storage/sharding.py +++ b/marconi/queues/storage/sharding.py @@ -144,10 +144,14 @@ class Catalog(object): # TODO(kgriffs): SHARDING - Read options from catalog backend conf = cfg.ConfigOpts() + general_opts = [ + cfg.BoolOpt('admin_mode', default=False) + ] options = [ - cfg.StrOpt('storage', default='sqlite') + cfg.StrOpt('storage', default='sqlite'), ] + conf.register_opts(general_opts) conf.register_opts(options, group='queues:drivers') return utils.load_storage_driver(conf) diff --git a/marconi/queues/storage/sqlite/shards.py b/marconi/queues/storage/sqlite/shards.py index a7af06463..9aecd3bc6 100644 --- a/marconi/queues/storage/sqlite/shards.py +++ b/marconi/queues/storage/sqlite/shards.py @@ -17,7 +17,7 @@ from marconi.queues.storage import base -class ShardsController(base.ShardsController): +class ShardsController(base.ShardsBase): def list(self, marker=None, limit=10, detailed=False): pass diff --git a/marconi/queues/storage/utils.py b/marconi/queues/storage/utils.py index e94bf1292..4f55ab53a 100644 --- a/marconi/queues/storage/utils.py +++ b/marconi/queues/storage/utils.py @@ -32,7 +32,7 @@ def load_storage_driver(conf): """ try: - mgr = driver.DriverManager('marconi.queues.storage', + mgr = driver.DriverManager('marconi.queues.data.storage', conf['queues:drivers'].storage, invoke_on_load=True, invoke_args=[conf]) diff --git a/marconi/tests/faulty_storage.py b/marconi/tests/faulty_storage.py index e9464883f..20e1ff859 100644 --- a/marconi/tests/faulty_storage.py +++ b/marconi/tests/faulty_storage.py @@ -17,6 +17,9 @@ from marconi.queues import storage class DataDriver(storage.DataDriverBase): + def __init__(self, conf): + super(DataDriver, self).__init__(conf) + @property def default_options(self): return {} @@ -35,9 +38,9 @@ class DataDriver(storage.DataDriverBase): class ControlDriver(storage.ControlDriverBase): - @property - def default_options(self): - return 
{} + + def __init__(self, conf): + super(ControlDriver, self).__init__(conf) @property def shards_controller(self): diff --git a/marconi/tests/queues/storage/base.py b/marconi/tests/queues/storage/base.py index 22743ea3a..158aab78d 100644 --- a/marconi/tests/queues/storage/base.py +++ b/marconi/tests/queues/storage/base.py @@ -601,6 +601,117 @@ class ClaimControllerTest(ControllerBaseTest): project=self.project) +class ShardsControllerTest(ControllerBaseTest): + """Shards Controller base tests. + + NOTE(flaper87): Implementations of this class should + override the tearDown method in order + to clean up storage's state. + """ + controller_base_class = storage.ShardsBase + + def setUp(self): + super(ShardsControllerTest, self).setUp() + self.shards_controller = self.driver.shards_controller + + # Let's create one shard + self.shard = str(uuid.uuid1()) + self.shards_controller.create(self.shard, 100, 'localhost', {}) + + def tearDown(self): + self.shards_controller.drop_all() + super(ShardsControllerTest, self).tearDown() + + def test_create_succeeds(self): + self.shards_controller.create(str(uuid.uuid1()), + 100, 'localhost', {}) + + def test_create_replaces_on_duplicate_insert(self): + name = str(uuid.uuid1()) + self.shards_controller.create(name, + 100, 'localhost', {}) + self.shards_controller.create(name, + 111, 'localhost2', {}) + entry = self.shards_controller.get(name) + self._shard_expects(entry, xname=name, xweight=111, + xlocation='localhost2') + + def _shard_expects(self, shard, xname, xweight, xlocation): + self.assertIn('n', shard) + self.assertEqual(shard['n'], xname) + self.assertIn('w', shard) + self.assertEqual(shard['w'], xweight) + self.assertIn('u', shard) + self.assertEqual(shard['u'], xlocation) + + def test_get_returns_expected_content(self): + res = self.shards_controller.get(self.shard) + self._shard_expects(res, self.shard, 100, 'localhost') + self.assertNotIn('o', res) + + def test_detailed_get_returns_expected_content(self): + res = 
self.shards_controller.get(self.shard, detailed=True) + self.assertIn('o', res) + self.assertEqual(res['o'], {}) + + def test_get_raises_if_not_found(self): + self.assertRaises(storage.exceptions.ShardDoesNotExist, + self.shards_controller.get, 'notexists') + + def test_exists(self): + self.assertTrue(self.shards_controller.exists(self.shard)) + self.assertFalse(self.shards_controller.exists('notexists')) + + def test_update_raises_assertion_error_on_bad_fields(self): + self.assertRaises(AssertionError, self.shards_controller.update, + self.shard) + + def test_update_works(self): + self.shards_controller.update(self.shard, weight=101, + uri='redis://localhost', + options={'a': 1}) + res = self.shards_controller.get(self.shard, detailed=True) + self._shard_expects(res, self.shard, 101, 'redis://localhost') + self.assertEqual(res['o'], {'a': 1}) + + def test_delete_works(self): + self.shards_controller.delete(self.shard) + self.assertFalse(self.shards_controller.exists(self.shard)) + + def test_delete_nonexistent_is_silent(self): + self.shards_controller.delete('nonexisting') + + def test_drop_all_leads_to_empty_listing(self): + self.shards_controller.drop_all() + cursor = self.shards_controller.list() + self.assertRaises(StopIteration, next, cursor) + + def test_listing_simple(self): + # NOTE(cpp-cabrera): base entry interferes with listing results + self.shards_controller.delete(self.shard) + + for i in range(15): + self.shards_controller.create(str(i), i, str(i), {}) + + res = list(self.shards_controller.list()) + self.assertEqual(len(res), 10) + for i, entry in enumerate(res): + self._shard_expects(entry, str(i), i, str(i)) + + res = list(self.shards_controller.list(limit=5)) + self.assertEqual(len(res), 5) + + res = next(self.shards_controller.list(marker='3')) + self._shard_expects(res, '4', 4, '4') + + res = list(self.shards_controller.list(detailed=True)) + self.assertEqual(len(res), 10) + for i, entry in enumerate(res): + self._shard_expects(entry, str(i), 
i, str(i)) + self.assertIn('o', entry) + self.assertEqual(entry['o'], {}) + + def _insert_fixtures(controller, queue_name, project=None, client_uuid=None, num=4, ttl=120): diff --git a/setup.cfg b/setup.cfg index 8b300ff06..72f73b2a5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -26,10 +26,14 @@ packages = console_scripts = marconi-server = marconi.cmd.server:run -marconi.queues.storage = +marconi.queues.data.storage = sqlite = marconi.queues.storage.sqlite.driver:DataDriver mongodb = marconi.queues.storage.mongodb.driver:DataDriver +marconi.queues.control.storage = + sqlite = marconi.queues.storage.sqlite.driver:ControlDriver + mongodb = marconi.queues.storage.mongodb.driver:ControlDriver + marconi.queues.public.transport = wsgi = marconi.queues.transport.wsgi.public.driver:Driver diff --git a/tests/etc/drivers_storage_invalid.conf b/tests/etc/drivers_storage_invalid.conf index e779a56f8..9a0ecd11e 100644 --- a/tests/etc/drivers_storage_invalid.conf +++ b/tests/etc/drivers_storage_invalid.conf @@ -1,6 +1,7 @@ [DEFAULT] debug = False verbose = False +admin_mode = False [queues:drivers] transport = wsgi diff --git a/tests/etc/wsgi_sqlite.conf b/tests/etc/wsgi_sqlite.conf index 82ffd6366..e5dfe37b5 100644 --- a/tests/etc/wsgi_sqlite.conf +++ b/tests/etc/wsgi_sqlite.conf @@ -1,6 +1,7 @@ [DEFAULT] debug = False verbose = False +admin_mode = False [queues:drivers] transport = wsgi diff --git a/tests/unit/queues/storage/test_impl_mongodb.py b/tests/unit/queues/storage/test_impl_mongodb.py index 9eb1f7090..a852e0ce6 100644 --- a/tests/unit/queues/storage/test_impl_mongodb.py +++ b/tests/unit/queues/storage/test_impl_mongodb.py @@ -332,3 +332,16 @@ class MongodbClaimTests(base.ClaimControllerTest): self.assertRaises(storage.exceptions.ClaimDoesNotExist, self.controller.update, self.queue_name, claim_id, {}, project=self.project) + + +@testing.requires_mongodb +class MongodbShardsTests(base.ShardsControllerTest): + driver_class = mongodb.ControlDriver + controller_class = 
controllers.ShardsController + + def setUp(self): + super(MongodbShardsTests, self).setUp() + self.load_conf('wsgi_mongodb.conf') + + def tearDown(self): + super(MongodbShardsTests, self).tearDown() diff --git a/tests/unit/queues/storage/test_shard_catalog.py b/tests/unit/queues/storage/test_shard_catalog.py index 0732bb83d..bde665180 100644 --- a/tests/unit/queues/storage/test_shard_catalog.py +++ b/tests/unit/queues/storage/test_shard_catalog.py @@ -29,7 +29,7 @@ class TestShardCatalog(base.TestBase): conf_file = 'etc/wsgi_sqlite_sharded.conf' conf = cfg.ConfigOpts() - conf(default_config_files=[conf_file]) + conf(args=[], default_config_files=[conf_file]) lookup = sharding.Catalog(conf).lookup diff --git a/tests/unit/test_bootstrap.py b/tests/unit/test_bootstrap.py index 29f84278c..b4140d311 100644 --- a/tests/unit/test_bootstrap.py +++ b/tests/unit/test_bootstrap.py @@ -25,8 +25,8 @@ from marconi.tests import base class TestBootstrap(base.TestBase): def _bootstrap(self, conf_file): - conf = self.load_conf(conf_file) - return bootstrap.Bootstrap(conf) + self.conf = self.load_conf(conf_file) + return bootstrap.Bootstrap(self.conf) def test_storage_invalid(self): boot = self._bootstrap('etc/drivers_storage_invalid.conf')