From e0892978cd3589c559df90409ed3ba8f26f66cff Mon Sep 17 00:00:00 2001 From: kgriffs Date: Mon, 14 Oct 2013 10:02:57 -0500 Subject: [PATCH] feat: Storage sharding foundation This patch provides the plumbing for implementing storage sharding across multiple backends. Sharding is agnostic to storage driver type and transport type. The new feature is optional, and disabled by default. The design eschews placing any kind of sharding reverse proxy in the network, allowing the storage drivers to continue communicating directly with their respective backends. Sharding can be enabled by setting the global "sharding" option to True. Future patches will add a sharding section to the config that can be used to tweak the way sharding works when it is enabled. Storage drivers are managed by a Catalog class. The Catalog is responsible for registering and deregistering queues in the catalog backend, and for looking up an appropriate driver, according to which shard a particular queue has been assigned. In the future, this design will make it straightforward to map individual queues to different storage backends, according to user preference. FWIW, I considered enabling sharding by inserting the routing driver as the last stage in the storage pipeline. However, it felt like a hack for the following reasons: * Doing so orphaned the regular, solitary driver that was still always loaded at the end of the pipeline. * Since the bootstrap was not aware of the sharding driver, it could not be used to provide setup, so the catalog object had to be turned into a singleton and options had to always be loaded from the global config. * The driver would have to be added to each controller pipeline, and would have to always be the last stage in the pipeline. Introducing a simple "sharding" boolean option seemed to be a more straightforward, less error-prone way for operators to enable sharding. 
Partially-Implements: blueprint storage-sharding Change-Id: I5190211e81fe4acd311b2cfdd0bae806cc3fec81 --- etc/marconi-queues.conf-sample | 3 + marconi/queues/bootstrap.py | 28 ++- marconi/queues/storage/sharding.py | 205 ++++++++++++++++++ marconi/queues/storage/utils.py | 43 ++++ tests/etc/wsgi_mongodb_sharded.conf | 10 + tests/etc/wsgi_sqlite_sharded.conf | 6 + .../unit/queues/storage/test_shard_catalog.py | 43 ++++ .../queues/transport/wsgi/test_messages.py | 14 ++ tests/unit/test_bootstrap.py | 8 + 9 files changed, 349 insertions(+), 11 deletions(-) create mode 100644 marconi/queues/storage/sharding.py create mode 100644 marconi/queues/storage/utils.py create mode 100644 tests/etc/wsgi_mongodb_sharded.conf create mode 100644 tests/etc/wsgi_sqlite_sharded.conf create mode 100644 tests/unit/queues/storage/test_shard_catalog.py diff --git a/etc/marconi-queues.conf-sample b/etc/marconi-queues.conf-sample index a29bf238a..16c535aa6 100644 --- a/etc/marconi-queues.conf-sample +++ b/etc/marconi-queues.conf-sample @@ -14,6 +14,9 @@ log_file = /var/log/marconi/queues.log ;auth_strategy = +# Set to True to enable sharding across multiple storage backends +;sharding = False + # ================= Syslog Options ============================ # Send logs to syslog (/dev/log) instead of to file specified diff --git a/marconi/queues/bootstrap.py b/marconi/queues/bootstrap.py index d6840fae4..e5be8e744 100644 --- a/marconi/queues/bootstrap.py +++ b/marconi/queues/bootstrap.py @@ -20,10 +20,17 @@ from marconi.common import decorators from marconi.common import exceptions from marconi.openstack.common import log from marconi.queues.storage import pipeline +from marconi.queues.storage import sharding +from marconi.queues.storage import utils as storage_utils from marconi.queues import transport # NOQA LOG = log.getLogger(__name__) +_GENERAL_OPTIONS = [ + cfg.BoolOpt('sharding', default=False, + help='Enable sharding across multiple storage backends'), +] + _DRIVER_OPTIONS = [ 
cfg.StrOpt('transport', default='wsgi', help='Transport driver to use'), @@ -47,6 +54,7 @@ class Bootstrap(object): default_file = [config_file] self.conf = cfg.ConfigOpts() + self.conf.register_opts(_GENERAL_OPTIONS) self.conf.register_opts(_DRIVER_OPTIONS, group=_DRIVER_GROUP) self.driver_conf = self.conf[_DRIVER_GROUP] @@ -57,18 +65,16 @@ class Bootstrap(object): @decorators.lazy_property(write=False) def storage(self): - storage_name = self.driver_conf.storage - LOG.debug(_(u'Loading storage driver: ') + storage_name) + LOG.debug(_(u'Loading storage driver')) - try: - mgr = driver.DriverManager('marconi.queues.storage', - storage_name, - invoke_on_load=True, - invoke_args=[self.conf]) - return pipeline.Driver(self.conf, mgr.driver) - except RuntimeError as exc: - LOG.exception(exc) - raise exceptions.InvalidDriver(exc) + if self.conf.sharding: + LOG.debug(_(u'Storage sharding enabled')) + storage_driver = sharding.Driver(self.conf) + else: + storage_driver = storage_utils.load_storage_driver(self.conf) + + LOG.debug(_(u'Loading storage pipeline')) + return pipeline.Driver(self.conf, storage_driver) @decorators.lazy_property(write=False) def transport(self): diff --git a/marconi/queues/storage/sharding.py b/marconi/queues/storage/sharding.py new file mode 100644 index 000000000..ea329e5cb --- /dev/null +++ b/marconi/queues/storage/sharding.py @@ -0,0 +1,205 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from oslo.config import cfg + +from marconi.common import decorators +from marconi.queues import storage +from marconi.queues.storage import utils + +_CATALOG_OPTIONS = [ + cfg.StrOpt('storage', default='sqlite', + help='Catalog storage driver'), +] + +_CATALOG_GROUP = 'queues:sharding:catalog' + + +class Driver(storage.DriverBase): + """Sharding meta-driver for routing requests to multiple backends. + + :param conf: Configuration instance, passed through to the + base driver and to the shard catalog + """ + + def __init__(self, conf): + super(Driver, self).__init__(conf) + self._shard_catalog = Catalog(conf) + + @decorators.lazy_property(write=False) + def queue_controller(self): + return QueueController(self._shard_catalog) + + @decorators.lazy_property(write=False) + def message_controller(self): + return MessageController(self._shard_catalog) + + @decorators.lazy_property(write=False) + def claim_controller(self): + return ClaimController(self._shard_catalog) + + +class RoutingController(storage.base.ControllerBase): + """Routes operations to the appropriate shard. + + This controller stands in for a regular storage controller, + routing operations to a driver instance that represents + the shard to which the queue has been assigned. + + Do not instantiate this class directly; use one of the + more specific child classes instead. + """ + + _resource_name = None + + def __init__(self, shard_catalog): + super(RoutingController, self).__init__(None) + self._ctrl_property_name = self._resource_name + '_controller' + self._shard_catalog = shard_catalog + + @decorators.cached_getattr + def __getattr__(self, name): + # NOTE(kgriffs): Use a closure trick to avoid + # some attr lookups each time forward() is called. + lookup = self._shard_catalog.lookup + + # NOTE(kgriffs): Assume that every controller method + # that is exposed to the transport declares queue name + # as its first arg. 
The only exception to this + # is QueueController.list + def forward(queue, *args, **kwargs): + # NOTE(kgriffs): Using .get since 'project' is an + # optional argument. + storage = lookup(queue, kwargs.get('project')) + target_ctrl = getattr(storage, self._ctrl_property_name) + return getattr(target_ctrl, name)(queue, *args, **kwargs) + + return forward + + +class QueueController(RoutingController): + """Controller to facilitate special processing for queue operations.""" + + _resource_name = 'queue' + + def __init__(self, shard_catalog): + super(QueueController, self).__init__(shard_catalog) + + def list(self, project=None, marker=None, + limit=None, detailed=False): + # TODO(kgriffs): SHARDING - Query all shards and merge + # the results, then return the resulting list. + + # TODO(kgriffs): Remove this placeholder code - it is + # only here to make tests pass in the short term! + target = self._shard_catalog.lookup(None, project).queue_controller + return target.list(project=project, marker=marker, + limit=limit, detailed=detailed) + + def create(self, name, project=None): + self._shard_catalog.register(name, project) + + target = self._shard_catalog.lookup(name, project).queue_controller + return target.create(name, project) + + def delete(self, name, project=None): + self._shard_catalog.deregister(name, project) + + target = self._shard_catalog.lookup(name, project).queue_controller + return target.delete(name, project) + + +class MessageController(RoutingController): + _resource_name = 'message' + + +class ClaimController(RoutingController): + _resource_name = 'claim' + + +class Catalog(object): + """Represents the mapping between queues and shard drivers.""" + + def __init__(self, conf): + self._shards = {} + self._conf = conf + + self._conf.register_opts(_CATALOG_OPTIONS, group=_CATALOG_GROUP) + self._catalog_conf = self._conf[_CATALOG_GROUP] + + def _init_shard(self, shard_id): + # TODO(kgriffs): SHARDING - Read options from catalog backend + conf = 
cfg.ConfigOpts() + + options = [ + cfg.StrOpt('storage', default='sqlite') + ] + + conf.register_opts(options, group='queues:drivers') + return utils.load_storage_driver(conf) + + def register(self, queue, project=None): + """Register a new queue in the shard catalog. + + This method should be called whenever a new queue is being + created, and will create an entry in the shard catalog for + the given queue. + + After using this method to register the queue in the + catalog, the caller should call `lookup()` to get a reference + to a storage driver which will allow interacting with the + queue's assigned backend shard. + + :param queue: Name of the new queue to assign to a shard + :param project: Project to which the queue belongs, or + None for the "global" or "generic" project. + """ + + # TODO(kgriffs): SHARDING - Implement this! + pass + + def deregister(self, queue, project=None): + """Removes a queue from the shard catalog. + + Call this method after successfully deleting it from a + backend shard. + """ + + # TODO(kgriffs): SHARDING - Implement this! + pass + + def lookup(self, queue, project=None): + """Lookup a shard driver for the given queue and project. + + :param queue: Name of the queue for which to find a shard + :param project: Project to which the queue belongs, or + None to specify the "global" or "generic" project. + + :returns: A storage driver instance for the appropriate shard. If + the driver does not exist yet, it is created and cached. + """ + + # TODO(kgriffs): SHARDING - Raise an exception if the queue + # does not have a mapping (it does not exist). 
+ + # TODO(kgriffs): SHARDING - Get ID from the catalog backend + shard_id = '[insert_id]' + try: + shard = self._shards[shard_id] + except KeyError: + self._shards[shard_id] = shard = self._init_shard(shard_id) + + return shard diff --git a/marconi/queues/storage/utils.py b/marconi/queues/storage/utils.py new file mode 100644 index 000000000..e94bf1292 --- /dev/null +++ b/marconi/queues/storage/utils.py @@ -0,0 +1,43 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +from stevedore import driver + +from marconi.common import exceptions +from marconi.openstack.common import log + +LOG = log.getLogger(__name__) + + +def load_storage_driver(conf): + """Loads a storage driver and returns it. + + The driver's initializer will be passed conf as its only arg. + + :param conf: Configuration instance to use for loading the + driver. Must include a 'queues:drivers' group. 
+ """ + + try: + mgr = driver.DriverManager('marconi.queues.storage', + conf['queues:drivers'].storage, + invoke_on_load=True, + invoke_args=[conf]) + return mgr.driver + + except RuntimeError as exc: + LOG.exception(exc) + raise exceptions.InvalidDriver(exc) diff --git a/tests/etc/wsgi_mongodb_sharded.conf b/tests/etc/wsgi_mongodb_sharded.conf new file mode 100644 index 000000000..bc6937d15 --- /dev/null +++ b/tests/etc/wsgi_mongodb_sharded.conf @@ -0,0 +1,10 @@ +[DEFAULT] +sharding = True + +[queues:drivers] +transport = wsgi +storage = mongodb + +[queues:drivers:storage:mongodb] +uri = mongodb://127.0.0.1:27017 +database = marconi_test diff --git a/tests/etc/wsgi_sqlite_sharded.conf b/tests/etc/wsgi_sqlite_sharded.conf new file mode 100644 index 000000000..7941d21a5 --- /dev/null +++ b/tests/etc/wsgi_sqlite_sharded.conf @@ -0,0 +1,6 @@ +[DEFAULT] +sharding = True + +[queues:drivers] +transport = wsgi +storage = sqlite diff --git a/tests/unit/queues/storage/test_shard_catalog.py b/tests/unit/queues/storage/test_shard_catalog.py new file mode 100644 index 000000000..43c3d0c15 --- /dev/null +++ b/tests/unit/queues/storage/test_shard_catalog.py @@ -0,0 +1,43 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from oslo.config import cfg + +from marconi.queues.storage import sharding +from marconi.queues.storage import sqlite +from marconi.tests import base + + +class TestShardCatalog(base.TestBase): + + def test_lookup(self): + # TODO(kgriffs): SHARDING - configure sharding to use an in-memory + # backend store, and register the queues we are going to look up. + conf_file = 'etc/wsgi_sqlite_sharded.conf' + + conf = cfg.ConfigOpts() + conf(default_config_files=[conf_file]) + + lookup = sharding.Catalog(conf).lookup + + storage = lookup('q1', '123456') + self.assertIsInstance(storage, sqlite.Driver) + + storage = lookup('q2', '123456') + self.assertIsInstance(storage, sqlite.Driver) + + storage = lookup('g1', None) + self.assertIsInstance(storage, sqlite.Driver) diff --git a/tests/unit/queues/transport/wsgi/test_messages.py b/tests/unit/queues/transport/wsgi/test_messages.py index 01e356589..2383cc843 100644 --- a/tests/unit/queues/transport/wsgi/test_messages.py +++ b/tests/unit/queues/transport/wsgi/test_messages.py @@ -419,6 +419,11 @@ class MessagesSQLiteTests(MessagesBaseTest): config_filename = 'wsgi_sqlite.conf' +class MessagesSQLiteShardedTests(MessagesBaseTest): + + config_filename = 'wsgi_sqlite_sharded.conf' + + @testing.requires_mongodb class MessagesMongoDBTests(MessagesBaseTest): @@ -428,6 +433,15 @@ class MessagesMongoDBTests(MessagesBaseTest): super(MessagesMongoDBTests, self).setUp() +@testing.requires_mongodb +class MessagesMongoDBShardedTests(MessagesBaseTest): + + config_filename = 'wsgi_mongodb_sharded.conf' + + def setUp(self): + super(MessagesMongoDBShardedTests, self).setUp() + + class MessagesFaultyDriverTests(base.TestBaseFaulty): config_filename = 'wsgi_faulty.conf' diff --git a/tests/unit/test_bootstrap.py b/tests/unit/test_bootstrap.py index b5c1d56c6..6d19e2dc8 100644 --- a/tests/unit/test_bootstrap.py +++ b/tests/unit/test_bootstrap.py @@ -18,6 +18,7 @@ from oslo.config import cfg from marconi.common import exceptions import 
marconi.queues from marconi.queues.storage import pipeline +from marconi.queues.storage import sharding from marconi.queues.storage import sqlite from marconi.queues.transport import wsgi from marconi.tests import base @@ -40,6 +41,13 @@ class TestBootstrap(base.TestBase): self.assertIsInstance(bootstrap.storage, pipeline.Driver) self.assertIsInstance(bootstrap.storage._storage, sqlite.Driver) + def test_storage_sqlite_sharded(self): + """Makes sure we can load the shard driver.""" + conf_file = 'etc/wsgi_sqlite_sharded.conf' + bootstrap = marconi.Bootstrap(conf_file) + + self.assertIsInstance(bootstrap.storage._storage, sharding.Driver) + def test_transport_invalid(self): conf_file = 'etc/drivers_transport_invalid.conf' bootstrap = marconi.Bootstrap(conf_file)