Merge "feat: Storage sharding foundation"
This commit is contained in:
commit
776260836a
@ -14,6 +14,9 @@ log_file = /var/log/marconi/queues.log
|
||||
|
||||
;auth_strategy =
|
||||
|
||||
# Set to True to enable sharding across multiple storage backends
|
||||
;sharding = False
|
||||
|
||||
# ================= Syslog Options ============================
|
||||
|
||||
# Send logs to syslog (/dev/log) instead of to file specified
|
||||
|
@ -20,10 +20,17 @@ from marconi.common import decorators
|
||||
from marconi.common import exceptions
|
||||
from marconi.openstack.common import log
|
||||
from marconi.queues.storage import pipeline
|
||||
from marconi.queues.storage import sharding
|
||||
from marconi.queues.storage import utils as storage_utils
|
||||
from marconi.queues import transport # NOQA
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
_GENERAL_OPTIONS = [
|
||||
cfg.BoolOpt('sharding', default=False,
|
||||
help='Enable sharding across multiple storage backends'),
|
||||
]
|
||||
|
||||
_DRIVER_OPTIONS = [
|
||||
cfg.StrOpt('transport', default='wsgi',
|
||||
help='Transport driver to use'),
|
||||
@ -47,6 +54,7 @@ class Bootstrap(object):
|
||||
default_file = [config_file]
|
||||
|
||||
self.conf = cfg.ConfigOpts()
|
||||
self.conf.register_opts(_GENERAL_OPTIONS)
|
||||
self.conf.register_opts(_DRIVER_OPTIONS, group=_DRIVER_GROUP)
|
||||
self.driver_conf = self.conf[_DRIVER_GROUP]
|
||||
|
||||
@ -57,18 +65,16 @@ class Bootstrap(object):
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def storage(self):
|
||||
storage_name = self.driver_conf.storage
|
||||
LOG.debug(_(u'Loading storage driver: ') + storage_name)
|
||||
LOG.debug(_(u'Loading storage driver'))
|
||||
|
||||
try:
|
||||
mgr = driver.DriverManager('marconi.queues.storage',
|
||||
storage_name,
|
||||
invoke_on_load=True,
|
||||
invoke_args=[self.conf])
|
||||
return pipeline.Driver(self.conf, mgr.driver)
|
||||
except RuntimeError as exc:
|
||||
LOG.exception(exc)
|
||||
raise exceptions.InvalidDriver(exc)
|
||||
if self.conf.sharding:
|
||||
LOG.debug(_(u'Storage sharding enabled'))
|
||||
storage_driver = sharding.Driver(self.conf)
|
||||
else:
|
||||
storage_driver = storage_utils.load_storage_driver(self.conf)
|
||||
|
||||
LOG.debug(_(u'Loading storage pipeline'))
|
||||
return pipeline.Driver(self.conf, storage_driver)
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def transport(self):
|
||||
|
205
marconi/queues/storage/sharding.py
Normal file
205
marconi/queues/storage/sharding.py
Normal file
@ -0,0 +1,205 @@
|
||||
# Copyright (c) 2013 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
#
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from marconi.common import decorators
|
||||
from marconi.queues import storage
|
||||
from marconi.queues.storage import utils
|
||||
|
||||
_CATALOG_OPTIONS = [
|
||||
cfg.IntOpt('storage', default='sqlite',
|
||||
help='Catalog storage driver'),
|
||||
]
|
||||
|
||||
_CATALOG_GROUP = 'queues:sharding:catalog'
|
||||
|
||||
|
||||
class Driver(storage.DriverBase):
|
||||
"""Sharding meta-driver for routing requests to multiple backends.
|
||||
|
||||
:param storage_conf: Ignored, since this is a meta-driver
|
||||
:param catalog_conf: Options pertaining to the shard catalog
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
super(Driver, self).__init__(conf)
|
||||
self._shard_catalog = Catalog(conf)
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def queue_controller(self):
|
||||
return QueueController(self._shard_catalog)
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def message_controller(self):
|
||||
return MessageController(self._shard_catalog)
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def claim_controller(self):
|
||||
return ClaimController(self._shard_catalog)
|
||||
|
||||
|
||||
class RoutingController(storage.base.ControllerBase):
|
||||
"""Routes operations to the appropriate shard.
|
||||
|
||||
This controller stands in for a regular storage controller,
|
||||
routing operations to a driver instance that represents
|
||||
the shard to which the queue has been assigned.
|
||||
|
||||
Do not instantiate this class directly; use one of the
|
||||
more specific child classes instead.
|
||||
"""
|
||||
|
||||
_resource_name = None
|
||||
|
||||
def __init__(self, shard_catalog):
|
||||
super(RoutingController, self).__init__(None)
|
||||
self._ctrl_property_name = self._resource_name + '_controller'
|
||||
self._shard_catalog = shard_catalog
|
||||
|
||||
@decorators.cached_getattr
|
||||
def __getattr__(self, name):
|
||||
# NOTE(kgriffs): Use a closure trick to avoid
|
||||
# some attr lookups each time foward() is called.
|
||||
lookup = self._shard_catalog.lookup
|
||||
|
||||
# NOTE(kgriffs): Assume that every controller method
|
||||
# that is exposed to the transport declares queue name
|
||||
# as its first arg. The only exception to this
|
||||
# is QueueController.list
|
||||
def forward(queue, *args, **kwargs):
|
||||
# NOTE(kgriffs): Using .get since 'project' is an
|
||||
# optional argument.
|
||||
storage = lookup(queue, kwargs.get('project'))
|
||||
target_ctrl = getattr(storage, self._ctrl_property_name)
|
||||
return getattr(target_ctrl, name)(queue, *args, **kwargs)
|
||||
|
||||
return forward
|
||||
|
||||
|
||||
class QueueController(RoutingController):
|
||||
"""Controller to facilitate special processing for queue operations."""
|
||||
|
||||
_resource_name = 'queue'
|
||||
|
||||
def __init__(self, shard_catalog):
|
||||
super(QueueController, self).__init__(shard_catalog)
|
||||
|
||||
def list(self, project=None, marker=None,
|
||||
limit=None, detailed=False):
|
||||
# TODO(kgriffs): SHARDING - Query all shards and merge
|
||||
# the results, then return the resulting list.
|
||||
|
||||
# TODO(kgriffs): Remove this placeholder code - it is
|
||||
# only here to make tests pass in the short term!
|
||||
target = self._shard_catalog.lookup(None, project).queue_controller
|
||||
return target.list(project=project, marker=marker,
|
||||
limit=limit, detailed=detailed)
|
||||
|
||||
def create(self, name, project=None):
|
||||
self._shard_catalog.register(name, project)
|
||||
|
||||
target = self._shard_catalog.lookup(name, project).queue_controller
|
||||
return target.create(name, project)
|
||||
|
||||
def delete(self, name, project=None):
|
||||
self._shard_catalog.deregister(name, project)
|
||||
|
||||
target = self._shard_catalog.lookup(name, project).queue_controller
|
||||
return target.delete(name, project)
|
||||
|
||||
|
||||
class MessageController(RoutingController):
|
||||
_resource_name = 'message'
|
||||
|
||||
|
||||
class ClaimController(RoutingController):
|
||||
_resource_name = 'claim'
|
||||
|
||||
|
||||
class Catalog(object):
|
||||
"""Represents the mapping between queues and shard drivers."""
|
||||
|
||||
def __init__(self, conf):
|
||||
self._shards = {}
|
||||
self._conf = conf
|
||||
|
||||
self._conf.register_opts(_CATALOG_OPTIONS, group=_CATALOG_GROUP)
|
||||
self._catalog_conf = self._conf[_CATALOG_GROUP]
|
||||
|
||||
def _init_shard(self, shard_id):
|
||||
# TODO(kgriffs): SHARDING - Read options from catalog backend
|
||||
conf = cfg.ConfigOpts()
|
||||
|
||||
options = [
|
||||
cfg.StrOpt('storage', default='sqlite')
|
||||
]
|
||||
|
||||
conf.register_opts(options, group='queues:drivers')
|
||||
return utils.load_storage_driver(conf)
|
||||
|
||||
def register(self, queue, project=None):
|
||||
"""Register a new queue in the shard catalog.
|
||||
|
||||
This method should be called whenever a new queue is being
|
||||
created, and will create an entry in the shard catalog for
|
||||
the given queue.
|
||||
|
||||
After using this method to register the queue in the
|
||||
catalog, the caller should call `lookup()` to get a reference
|
||||
to a storage driver which will allow interacting with the
|
||||
queue's assigned backend shard.
|
||||
|
||||
:param queue: Name of the new queue to assign to a shard
|
||||
:param project: Project to which the queue belongs, or
|
||||
None for the "global" or "generic" project.
|
||||
"""
|
||||
|
||||
# TODO(kgriffs): SHARDING - Implement this!
|
||||
pass
|
||||
|
||||
def deregister(self, queue, project=None):
|
||||
"""Removes a queue from the shard catalog.
|
||||
|
||||
Call this method after successfully deleting it from a
|
||||
backend shard.
|
||||
"""
|
||||
|
||||
# TODO(kgriffs): SHARDING - Implement this!
|
||||
pass
|
||||
|
||||
def lookup(self, queue, project=None):
|
||||
"""Lookup a shard driver for the given queue and project.
|
||||
|
||||
:param queue: Name of the queue for which to find a shard
|
||||
:param project: Project to which the queue belongs, or
|
||||
None to specify the "global" or "generic" project.
|
||||
|
||||
:returns: A storage driver instance for the appropriate shard. If
|
||||
the driver does not exist yet, it is created and cached.
|
||||
"""
|
||||
|
||||
# TODO(kgriffs): SHARDING - Raise an exception if the queue
|
||||
# does not have a mapping (it does not exist).
|
||||
|
||||
# TODO(kgriffs): SHARDING - Get ID from the catalog backend
|
||||
shard_id = '[insert_id]'
|
||||
try:
|
||||
shard = self._shards[shard_id]
|
||||
except KeyError:
|
||||
self._shards[shard_id] = shard = self._init_shard(shard_id)
|
||||
|
||||
return shard
|
43
marconi/queues/storage/utils.py
Normal file
43
marconi/queues/storage/utils.py
Normal file
@ -0,0 +1,43 @@
|
||||
# Copyright (c) 2013 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
#
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from stevedore import driver
|
||||
|
||||
from marconi.common import exceptions
|
||||
from marconi.openstack.common import log
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def load_storage_driver(conf):
|
||||
"""Loads a storage driver and returns it.
|
||||
|
||||
The driver's initializer will be passed conf as its only arg.
|
||||
|
||||
:param conf: Configuration instance to use for loading the
|
||||
driver. Must include a 'queues:drivers' group.
|
||||
"""
|
||||
|
||||
try:
|
||||
mgr = driver.DriverManager('marconi.queues.storage',
|
||||
conf['queues:drivers'].storage,
|
||||
invoke_on_load=True,
|
||||
invoke_args=[conf])
|
||||
return mgr.driver
|
||||
|
||||
except RuntimeError as exc:
|
||||
LOG.exception(exc)
|
||||
raise exceptions.InvalidDriver(exc)
|
10
tests/etc/wsgi_mongodb_sharded.conf
Normal file
10
tests/etc/wsgi_mongodb_sharded.conf
Normal file
@ -0,0 +1,10 @@
|
||||
[DEFAULT]
|
||||
sharding = True
|
||||
|
||||
[queues:drivers]
|
||||
transport = wsgi
|
||||
storage = mongodb
|
||||
|
||||
[queues:drivers:storage:mongodb]
|
||||
uri = mongodb://127.0.0.1:27017
|
||||
database = marconi_test
|
6
tests/etc/wsgi_sqlite_sharded.conf
Normal file
6
tests/etc/wsgi_sqlite_sharded.conf
Normal file
@ -0,0 +1,6 @@
|
||||
[DEFAULT]
|
||||
sharding = True
|
||||
|
||||
[queues:drivers]
|
||||
transport = wsgi
|
||||
storage = sqlite
|
43
tests/unit/queues/storage/test_shard_catalog.py
Normal file
43
tests/unit/queues/storage/test_shard_catalog.py
Normal file
@ -0,0 +1,43 @@
|
||||
# Copyright (c) 2013 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
#
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from marconi.queues.storage import sharding
|
||||
from marconi.queues.storage import sqlite
|
||||
from marconi.tests import base
|
||||
|
||||
|
||||
class TestShardCatalog(base.TestBase):
|
||||
|
||||
def test_lookup(self):
|
||||
# TODO(kgriffs): SHARDING - configure sharding to use an in-memory
|
||||
# backend store, and register the queues we are going to look up.
|
||||
conf_file = 'etc/wsgi_sqlite_sharded.conf'
|
||||
|
||||
conf = cfg.ConfigOpts()
|
||||
conf(default_config_files=[conf_file])
|
||||
|
||||
lookup = sharding.Catalog(conf).lookup
|
||||
|
||||
storage = lookup('q1', '123456')
|
||||
self.assertIsInstance(storage, sqlite.Driver)
|
||||
|
||||
storage = lookup('q2', '123456')
|
||||
self.assertIsInstance(storage, sqlite.Driver)
|
||||
|
||||
storage = lookup('g1', None)
|
||||
self.assertIsInstance(storage, sqlite.Driver)
|
@ -419,6 +419,11 @@ class MessagesSQLiteTests(MessagesBaseTest):
|
||||
config_filename = 'wsgi_sqlite.conf'
|
||||
|
||||
|
||||
class MessagesSQLiteShardedTests(MessagesBaseTest):
|
||||
|
||||
config_filename = 'wsgi_sqlite_sharded.conf'
|
||||
|
||||
|
||||
@testing.requires_mongodb
|
||||
class MessagesMongoDBTests(MessagesBaseTest):
|
||||
|
||||
@ -428,6 +433,15 @@ class MessagesMongoDBTests(MessagesBaseTest):
|
||||
super(MessagesMongoDBTests, self).setUp()
|
||||
|
||||
|
||||
@testing.requires_mongodb
|
||||
class MessagesMongoDBShardedTests(MessagesBaseTest):
|
||||
|
||||
config_filename = 'wsgi_mongodb_sharded.conf'
|
||||
|
||||
def setUp(self):
|
||||
super(MessagesMongoDBShardedTests, self).setUp()
|
||||
|
||||
|
||||
class MessagesFaultyDriverTests(base.TestBaseFaulty):
|
||||
|
||||
config_filename = 'wsgi_faulty.conf'
|
||||
|
@ -18,6 +18,7 @@ from oslo.config import cfg
|
||||
from marconi.common import exceptions
|
||||
import marconi.queues
|
||||
from marconi.queues.storage import pipeline
|
||||
from marconi.queues.storage import sharding
|
||||
from marconi.queues.storage import sqlite
|
||||
from marconi.queues.transport import wsgi
|
||||
from marconi.tests import base
|
||||
@ -40,6 +41,13 @@ class TestBootstrap(base.TestBase):
|
||||
self.assertIsInstance(bootstrap.storage, pipeline.Driver)
|
||||
self.assertIsInstance(bootstrap.storage._storage, sqlite.Driver)
|
||||
|
||||
def test_storage_sqlite_sharded(self):
|
||||
"""Makes sure we can load the shard driver."""
|
||||
conf_file = 'etc/wsgi_sqlite_sharded.conf'
|
||||
bootstrap = marconi.Bootstrap(conf_file)
|
||||
|
||||
self.assertIsInstance(bootstrap.storage._storage, sharding.Driver)
|
||||
|
||||
def test_transport_invalid(self):
|
||||
conf_file = 'etc/drivers_transport_invalid.conf'
|
||||
bootstrap = marconi.Bootstrap(conf_file)
|
||||
|
Loading…
x
Reference in New Issue
Block a user