feat: shards storage controller interface

Adds the storage interface for the shards storage controller. This
controller is meant to be used with the shard management queues admin
API interface.

The idea mirrors that used in writing the proxy's partition manager.

This patch also introduces the distinction between data and control
storage drivers. Data storage drivers are those that control core
functionality: messages, queues, claims. Control storage drivers are
used to manage system functionality from the point of view of an admin
- in this case, the registry of shards.

The next patch will provide an implementation for mongodb + unit tests.

Change-Id: I4f24e84c99689968e60360383b190ce168055e74
Partially-implements: blueprint storage-sharding
Partially-Closes: 1241686
This commit is contained in:
Alejandro Cabrera 2013-10-09 15:15:55 -04:00
parent a24ce88789
commit 142c7ae0d6
20 changed files with 284 additions and 40 deletions

View File

@ -67,12 +67,12 @@ class Bootstrap(object):
if self.conf.sharding:
LOG.debug(_(u'Storage sharding enabled'))
storage_driver = sharding.Driver(self.conf)
storage_driver = sharding.DataDriver(self.conf)
else:
storage_driver = storage_utils.load_storage_driver(self.conf)
LOG.debug(_(u'Loading storage pipeline'))
return pipeline.Driver(self.conf, storage_driver)
return pipeline.DataDriver(self.conf, storage_driver)
@decorators.lazy_property(write=False)
def cache(self):

View File

@ -4,7 +4,8 @@ from marconi.queues.storage import base
from marconi.queues.storage import exceptions # NOQA
# Hoist classes into package namespace
ControlDriverBase = base.ControlDriverBase
DataDriverBase = base.DataDriverBase
ClaimBase = base.ClaimBase
DriverBase = base.DriverBase
MessageBase = base.MessageBase
QueueBase = base.QueueBase

View File

@ -16,6 +16,7 @@
"""Implements the DriverBase abstract class for Marconi storage drivers."""
import abc
import six
from oslo.config import cfg
@ -30,9 +31,13 @@ _LIMITS_OPTIONS = [
_LIMITS_GROUP = 'queues:limits:storage'
class DriverBase(object):
@six.add_metaclass(abc.ABCMeta)
class DataDriverBase(object):
"""Interface definition for storage drivers.
Data plane storage drivers are responsible for implementing the
core functionality of the system.
Connection information and driver-specific options are
loaded from the config file or the shard catalog.
@ -41,8 +46,8 @@ class DriverBase(object):
options. Must at least include 'uri' which
provides connection options such as host and
port.
"""
__metaclass__ = abc.ABCMeta
def __init__(self, conf):
self.conf = conf
@ -66,6 +71,28 @@ class DriverBase(object):
raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class ControlDriverBase(object):
"""Interface definition for control plane storage drivers.
Storage drivers that work at the control plane layer allow one to
modify aspects of the functionality of the system. This is ideal
for administrative purposes.
Allows access to the shard registry through a catalogue and a
shard controller.
"""
def __init__(self, conf):
self.conf = conf
@abc.abstractproperty
def shards_controller(self):
"""Returns storage's shard management controller."""
raise NotImplementedError
class ControllerBase(object):
"""Top-level class for controllers.
@ -77,6 +104,7 @@ class ControllerBase(object):
self.driver = driver
@six.add_metaclass(abc.ABCMeta)
class QueueBase(ControllerBase):
"""This class is responsible for managing queues.
@ -87,8 +115,6 @@ class QueueBase(ControllerBase):
numbers of queues.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def list(self, project=None, marker=None, limit=10,
detailed=False, include_claimed=True):
@ -172,11 +198,10 @@ class QueueBase(ControllerBase):
raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class MessageBase(ControllerBase):
"""This class is responsible for managing message CRUD."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def list(self, queue, project=None, marker=None,
limit=10, echo=False, client_uuid=None):
@ -285,10 +310,9 @@ class MessageBase(ControllerBase):
raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class ClaimBase(ControllerBase):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def get(self, queue, claim_id, project=None):
"""Base method for getting a claim.
@ -342,3 +366,89 @@ class ClaimBase(ControllerBase):
:param project: Project id
"""
raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class ShardsController(ControllerBase):
"""A controller for managing shards."""
@abc.abstractmethod
def list(self, marker=None, limit=10, detailed=False):
"""Lists all registered shards.
:param marker: used to determine which shard to start with
:type marker: six.text_type
:param limit: how many results to return
:type limit: int
:param detailed: whether to include options
:type detailed: bool
:returns: A list of shards - name, weight, uri
:rtype: [{}]
"""
raise NotImplementedError
@abc.abstractmethod
def create(self, name, weight, uri, options=None):
"""Registers a shard entry.
:param name: The name of this shard
:type name: six.text_type
:param weight: the likelihood that this shard will be used
:type weight: int
:param uri: A URI that can be used by a storage client
(e.g., pymongo) to access this shard.
:type uri: six.text_type
:param options: Options used to configure this shard
:type options: dict
"""
raise NotImplementedError
@abc.abstractmethod
def get(self, name):
"""Returns a single shard entry.
:param name: The name of this shard
:type name: six.text_type
:returns: weight, uri, and options for this shard
:rtype: {}
:raises: ShardDoesNotExist if not found
"""
raise NotImplementedError
@abc.abstractmethod
def exists(self, name):
"""Returns a single shard entry.
:param name: The name of this shard
:type name: six.text_type
:returns: True if the shard exists
:rtype: bool
"""
raise NotImplementedError
@abc.abstractmethod
def delete(self, name):
"""Removes a shard entry.
:param name: The name of this shard
:type name: six.text_type
:rtype: None
"""
raise NotImplementedError
@abc.abstractmethod
def update(self, name, **kwargs):
"""Updates the weight, uris, and/or options of this shard
:param name: Name of the shard
:type name: text
:param kwargs: one of: `uri`, `weight`, `options`
:type kwargs: dict
:raises: ShardDoesNotExist
"""
raise NotImplementedError
@abc.abstractmethod
def drop_all(self):
"""Deletes all shards from storage."""
raise NotImplementedError

View File

@ -3,4 +3,5 @@
from marconi.queues.storage.mongodb import driver
# Hoist classes into package namespace
Driver = driver.Driver
ControlDriver = driver.ControlDriver
DataDriver = driver.DataDriver

View File

@ -25,8 +25,10 @@ Field Mappings:
from marconi.queues.storage.mongodb import claims
from marconi.queues.storage.mongodb import messages
from marconi.queues.storage.mongodb import queues
from marconi.queues.storage.mongodb import shards
ClaimController = claims.ClaimController
MessageController = messages.MessageController
QueueController = queues.QueueController
ShardsController = shards.ShardsController

View File

@ -28,10 +28,10 @@ from marconi.queues.storage.mongodb import options
LOG = logging.getLogger(__name__)
class Driver(storage.DriverBase):
class DataDriver(storage.DataDriverBase):
def __init__(self, conf):
super(Driver, self).__init__(conf)
super(DataDriver, self).__init__(conf)
self.conf.register_opts(options.MONGODB_OPTIONS,
group=options.MONGODB_GROUP)
@ -88,3 +88,18 @@ class Driver(storage.DriverBase):
@decorators.lazy_property(write=False)
def claim_controller(self):
return controllers.ClaimController(self)
class ControlDriver(storage.ControlDriverBase):
def __init__(self, conf):
super(ControlDriver, self).__init__(conf)
self.conf.register_opts(options.MONGODB_OPTIONS,
group=options.MONGODB_GROUP)
self.mongodb_conf = self.conf[options.MONGODB_GROUP]
@property
def shards_controller(self):
return controllers.ShardsController(self)

View File

@ -0,0 +1,41 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi.queues.storage import base
class ShardsController(base.ShardsController):
def list(self, marker=None, limit=10, detailed=False):
pass
def get(self, name):
pass
def create(self, name, weight, uri, options=None):
pass
def exists(self, name):
pass
def update(self, name, **kwargs):
pass
def delete(self, name):
pass
def drop_all(self):
pass

View File

@ -80,7 +80,7 @@ def _get_storage_pipeline(resource_name, conf):
return common.Pipeline(pipeline)
class Driver(base.DriverBase):
class DataDriver(base.DataDriverBase):
"""Meta-driver for injecting pipelines in front of controllers.
:param storage_conf: For real drivers, this would be used to
@ -91,7 +91,7 @@ class Driver(base.DriverBase):
"""
def __init__(self, conf, storage):
super(Driver, self).__init__(conf)
super(DataDriver, self).__init__(conf)
self._storage = storage
@decorators.lazy_property(write=False)

View File

@ -28,7 +28,7 @@ _CATALOG_OPTIONS = [
_CATALOG_GROUP = 'queues:sharding:catalog'
class Driver(storage.DriverBase):
class DataDriver(storage.DataDriverBase):
"""Sharding meta-driver for routing requests to multiple backends.
:param storage_conf: Ignored, since this is a meta-driver
@ -36,7 +36,7 @@ class Driver(storage.DriverBase):
"""
def __init__(self, conf):
super(Driver, self).__init__(conf)
super(DataDriver, self).__init__(conf)
self._shard_catalog = Catalog(conf)
@decorators.lazy_property(write=False)

View File

@ -7,4 +7,5 @@ Useful for automated testing and for prototyping storage driver concepts.
from marconi.queues.storage.sqlite import driver
# Hoist classes into package namespace
Driver = driver.Driver
ControlDriver = driver.ControlDriver
DataDriver = driver.DataDriver

View File

@ -19,8 +19,10 @@
from marconi.queues.storage.sqlite import claims
from marconi.queues.storage.sqlite import messages
from marconi.queues.storage.sqlite import queues
from marconi.queues.storage.sqlite import shards
ClaimController = claims.ClaimController
MessageController = messages.MessageController
QueueController = queues.QueueController
ShardsController = shards.ShardsController

View File

@ -34,10 +34,10 @@ _SQLITE_OPTIONS = [
_SQLITE_GROUP = 'queues:drivers:storage:sqlite'
class Driver(storage.DriverBase):
class DataDriver(storage.DataDriverBase):
def __init__(self, conf):
super(Driver, self).__init__(conf)
super(DataDriver, self).__init__(conf)
self.conf.register_opts(_SQLITE_OPTIONS, group=_SQLITE_GROUP)
self.sqlite_conf = self.conf[_SQLITE_GROUP]
@ -196,3 +196,24 @@ class Driver(storage.DriverBase):
@decorators.lazy_property(write=False)
def claim_controller(self):
return controllers.ClaimController(self)
class ControlDriver(storage.ControlDriverBase):
def __init__(self, conf):
super(ControlDriver, self).__init__(conf)
self.conf.register_opts(_SQLITE_OPTIONS, group=_SQLITE_GROUP)
self.sqlite_conf = self.conf[_SQLITE_GROUP]
self.__path = self.sqlite_conf.database
# TODO(cpp-cabrera): implement this thing
self.__conn = sqlite3.connect(self.__path,
detect_types=sqlite3.PARSE_DECLTYPES)
self.__db = self.__conn.cursor()
self.run('''PRAGMA foreign_keys = ON''')
@property
def shards_controller(self):
return controllers.ShardsController(self)

View File

@ -0,0 +1,41 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
from marconi.queues.storage import base
class ShardsController(base.ShardsController):
def list(self, marker=None, limit=10, detailed=False):
pass
def get(self, name):
pass
def create(self, name, weight, uri, options=None):
pass
def exists(self, name):
pass
def update(self, name, **kwargs):
pass
def delete(self, name):
pass
def drop_all(self):
pass

View File

@ -16,7 +16,7 @@
from marconi.queues import storage
class Driver(storage.DriverBase):
class DataDriver(storage.DataDriverBase):
@property
def default_options(self):
return {}
@ -34,6 +34,16 @@ class Driver(storage.DriverBase):
return None
class ControlDriver(storage.ControlDriverBase):
@property
def default_options(self):
return {}
@property
def shards_controller(self):
return None
class QueueController(storage.QueueBase):
def __init__(self, driver):
pass

View File

@ -27,8 +27,8 @@ console_scripts =
marconi-server = marconi.cmd.server:run
marconi.queues.storage =
sqlite = marconi.queues.storage.sqlite.driver:Driver
mongodb = marconi.queues.storage.mongodb.driver:Driver
sqlite = marconi.queues.storage.sqlite.driver:DataDriver
mongodb = marconi.queues.storage.mongodb.driver:DataDriver
marconi.queues.public.transport =
wsgi = marconi.queues.transport.wsgi.public.driver:Driver

View File

@ -81,7 +81,7 @@ class MongodbDriverTest(testing.TestBase):
self._conf = self.load_conf('wsgi_mongodb.conf')
def test_db_instance(self):
driver = mongodb.Driver(self._conf)
driver = mongodb.DataDriver(self._conf)
databases = driver.message_databases + [driver.queues_database]
for db in databases:
@ -92,7 +92,7 @@ class MongodbDriverTest(testing.TestBase):
@testing.requires_mongodb
class MongodbQueueTests(base.QueueControllerTest):
driver_class = mongodb.Driver
driver_class = mongodb.DataDriver
controller_class = controllers.QueueController
def setUp(self):
@ -135,7 +135,7 @@ class MongodbQueueTests(base.QueueControllerTest):
@testing.requires_mongodb
class MongodbMessageTests(base.MessageControllerTest):
driver_class = mongodb.Driver
driver_class = mongodb.DataDriver
controller_class = controllers.MessageController
# NOTE(kgriffs): MongoDB's TTL scavenger only runs once a minute
@ -294,7 +294,7 @@ class MongodbMessageTests(base.MessageControllerTest):
@testing.requires_mongodb
class MongodbClaimTests(base.ClaimControllerTest):
driver_class = mongodb.Driver
driver_class = mongodb.DataDriver
controller_class = controllers.ClaimController
def setUp(self):

View File

@ -20,12 +20,12 @@ from marconi.tests.queues.storage import base
class SQliteQueueTests(base.QueueControllerTest):
driver_class = sqlite.Driver
driver_class = sqlite.DataDriver
controller_class = controllers.QueueController
class SQliteMessageTests(base.MessageControllerTest):
driver_class = sqlite.Driver
driver_class = sqlite.DataDriver
controller_class = controllers.MessageController
def test_empty_queue_exception(self):
@ -43,5 +43,5 @@ class SQliteMessageTests(base.MessageControllerTest):
class SQliteClaimTests(base.ClaimControllerTest):
driver_class = sqlite.Driver
driver_class = sqlite.DataDriver
controller_class = controllers.ClaimController

View File

@ -34,10 +34,10 @@ class TestShardCatalog(base.TestBase):
lookup = sharding.Catalog(conf).lookup
storage = lookup('q1', '123456')
self.assertIsInstance(storage, sqlite.Driver)
self.assertIsInstance(storage, sqlite.DataDriver)
storage = lookup('q2', '123456')
self.assertIsInstance(storage, sqlite.Driver)
self.assertIsInstance(storage, sqlite.DataDriver)
storage = lookup('g1', None)
self.assertIsInstance(storage, sqlite.Driver)
self.assertIsInstance(storage, sqlite.DataDriver)

View File

@ -33,8 +33,7 @@ class TestBase(testing.TestBase):
super(TestBase, self).setUp()
conf = cfg.ConfigOpts()
conf(default_config_files=[self.conf_path(self.config_filename)])
conf = self.load_conf(self.conf_path(self.config_filename))
conf.register_opts(driver._WSGI_OPTIONS,
group=driver._WSGI_GROUP)
self.wsgi_cfg = conf[driver._WSGI_GROUP]
@ -101,7 +100,7 @@ class TestBaseFaulty(TestBase):
def setUp(self):
self._storage_backup = bootstrap.Bootstrap.storage
faulty = faulty_storage.Driver(cfg.ConfigOpts())
faulty = faulty_storage.DataDriver(cfg.ConfigOpts())
setattr(bootstrap.Bootstrap, 'storage', faulty)
super(TestBaseFaulty, self).setUp()

View File

@ -35,13 +35,13 @@ class TestBootstrap(base.TestBase):
def test_storage_sqlite(self):
bootstrap = self._bootstrap('etc/wsgi_sqlite.conf')
self.assertIsInstance(bootstrap.storage, pipeline.Driver)
self.assertIsInstance(bootstrap.storage._storage, sqlite.Driver)
self.assertIsInstance(bootstrap.storage, pipeline.DataDriver)
self.assertIsInstance(bootstrap.storage._storage, sqlite.DataDriver)
def test_storage_sqlite_sharded(self):
"""Makes sure we can load the shard driver."""
bootstrap = self._bootstrap('etc/wsgi_sqlite_sharded.conf')
self.assertIsInstance(bootstrap.storage._storage, sharding.Driver)
self.assertIsInstance(bootstrap.storage._storage, sharding.DataDriver)
def test_transport_invalid(self):
boot = self._bootstrap('etc/drivers_transport_invalid.conf')