Merge "chore: Pass cache into drivers"

This commit is contained in:
Jenkins 2013-12-04 21:17:28 +00:00 committed by Gerrit Code Review
commit 5208aa73b0
12 changed files with 90 additions and 49 deletions

View File

@@ -70,9 +70,11 @@ class Bootstrap(object):
if self.conf.sharding:
LOG.debug(_(u'Storage sharding enabled'))
storage_driver = sharding.DataDriver(self.conf, self.control)
storage_driver = sharding.DataDriver(self.conf, self.cache,
self.control)
else:
storage_driver = storage_utils.load_storage_driver(self.conf)
storage_driver = storage_utils.load_storage_driver(
self.conf, self.cache)
LOG.debug(_(u'Loading storage pipeline'))
return pipeline.DataDriver(self.conf, storage_driver)
@@ -80,12 +82,12 @@ class Bootstrap(object):
@decorators.lazy_property(write=False)
def control(self):
LOG.debug(_(u'Loading storage control driver'))
return storage_utils.load_storage_driver(self.conf,
return storage_utils.load_storage_driver(self.conf, self.cache,
control_mode=True)
@decorators.lazy_property(write=False)
def cache(self):
LOG.debug(_(u'Loading Proxy Cache Driver'))
LOG.debug(_(u'Loading proxy cache driver'))
try:
mgr = oslo_cache.get_cache(self.conf)
return mgr

View File

@@ -32,7 +32,22 @@ _LIMITS_GROUP = 'limits:storage'
@six.add_metaclass(abc.ABCMeta)
class DataDriverBase(object):
class DriverBase(object):
"""Base class for both data and control plane drivers
:param conf: Configuration containing options for this driver.
:type conf: `oslo.config.ConfigOpts`
:param cache: Cache instance to use for reducing latency
for certain lookups.
:type cache: `marconi.common.cache.backends.BaseCache`
"""
def __init__(self, conf, cache):
self.conf = conf
self.cache = cache
@six.add_metaclass(abc.ABCMeta)
class DataDriverBase(DriverBase):
"""Interface definition for storage drivers.
Data plane storage drivers are responsible for implementing the
@@ -41,16 +56,15 @@ class DataDriverBase(object):
Connection information and driver-specific options are
loaded from the config file or the shard catalog.
:param conf: Driver configuration. Can be any
dict-like object containing the expected
options. Must at least include 'uri' which
provides connection options such as host and
port.
:param conf: Configuration containing options for this driver.
:type conf: `oslo.config.ConfigOpts`
:param cache: Cache instance to use for reducing latency
for certain lookups.
:type cache: `marconi.common.cache.backends.BaseCache`
"""
def __init__(self, conf):
self.conf = conf
def __init__(self, conf, cache):
super(DataDriverBase, self).__init__(conf, cache)
self.conf.register_opts(_LIMITS_OPTIONS, group=_LIMITS_GROUP)
self.limits_conf = self.conf[_LIMITS_GROUP]
@@ -77,7 +91,7 @@ class DataDriverBase(object):
@six.add_metaclass(abc.ABCMeta)
class ControlDriverBase(object):
class ControlDriverBase(DriverBase):
"""Interface definition for control plane storage drivers.
Storage drivers that work at the control plane layer allow one to
@@ -86,10 +100,13 @@ class ControlDriverBase(object):
Allows access to the shard registry through a catalogue and a
shard controller.
"""
def __init__(self, conf):
self.conf = conf
:param conf: Configuration containing options for this driver.
:type conf: `oslo.config.ConfigOpts`
:param cache: Cache instance to use for reducing latency
for certain lookups.
:type cache: `marconi.common.cache.backends.BaseCache`
"""
@abc.abstractproperty
def catalogue_controller(self):

View File

@@ -39,8 +39,8 @@ def _connection(conf):
class DataDriver(storage.DataDriverBase):
def __init__(self, conf):
super(DataDriver, self).__init__(conf)
def __init__(self, conf, cache):
super(DataDriver, self).__init__(conf, cache)
opts = options.MONGODB_OPTIONS
@@ -112,8 +112,8 @@ class DataDriver(storage.DataDriverBase):
class ControlDriver(storage.ControlDriverBase):
def __init__(self, conf):
super(ControlDriver, self).__init__(conf)
def __init__(self, conf, cache):
super(ControlDriver, self).__init__(conf, cache)
self.conf.register_opts(options.MONGODB_OPTIONS,
group=options.MONGODB_GROUP)

View File

@@ -82,15 +82,15 @@ def _get_storage_pipeline(resource_name, conf):
class DataDriver(base.DataDriverBase):
"""Meta-driver for injecting pipelines in front of controllers.
:param storage_conf: For real drivers, this would be used to
configure the storage, but in this case it is simply ignored.
:param conf: Configuration from which to load pipeline settings
:param storage: Storage driver that will service requests as the
last step in the pipeline
"""
def __init__(self, conf, storage):
super(DataDriver, self).__init__(conf)
# NOTE(kgriffs): Pass None for cache since it won't ever
# be referenced.
super(DataDriver, self).__init__(conf, None)
self._storage = storage
def is_alive(self):

View File

@@ -38,13 +38,16 @@ _CATALOG_GROUP = 'sharding:catalog'
class DataDriver(storage.DataDriverBase):
"""Sharding meta-driver for routing requests to multiple backends.
:param storage_conf: Ignored, since this is a meta-driver
:param catalog_conf: Options pertaining to the shard catalog
:param conf: Configuration from which to read sharding options
:param cache: Cache instance that will be passed to individual
storage driver instances that correspond to each shard. It will
also be used by the shard controller to reduce latency for
some operations.
"""
def __init__(self, conf, control):
super(DataDriver, self).__init__(conf)
self._shard_catalog = Catalog(conf, control)
def __init__(self, conf, cache, control):
super(DataDriver, self).__init__(conf, cache)
self._shard_catalog = Catalog(conf, cache, control)
def is_alive(self):
return all(self._shard_catalog.get_driver(shard['name']).is_alive()
@@ -307,9 +310,10 @@ class ClaimController(RoutingController):
class Catalog(object):
"""Represents the mapping between queues and shard drivers."""
def __init__(self, conf, control):
def __init__(self, conf, cache, control):
self._drivers = {}
self._conf = conf
self._cache = cache
self._conf.register_opts(_CATALOG_OPTIONS, group=_CATALOG_GROUP)
self._catalog_conf = self._conf[_CATALOG_GROUP]
@@ -355,7 +359,7 @@ class Catalog(object):
conf.register_opts(general_opts)
conf.register_opts(driver_opts, group=u'drivers')
conf.register_opts(storage_opts, group=storage_group)
return utils.load_storage_driver(conf)
return utils.load_storage_driver(conf, self._cache)
def register(self, queue, project=None):
"""Register a new queue in the shard catalog.

View File

@@ -36,8 +36,8 @@ _SQLITE_GROUP = 'drivers:storage:sqlite'
class DataDriver(storage.DataDriverBase):
def __init__(self, conf):
super(DataDriver, self).__init__(conf)
def __init__(self, conf, cache):
super(DataDriver, self).__init__(conf, cache)
self.conf.register_opts(_SQLITE_OPTIONS, group=_SQLITE_GROUP)
self.sqlite_conf = self.conf[_SQLITE_GROUP]
@@ -211,8 +211,8 @@ class DataDriver(storage.DataDriverBase):
class ControlDriver(storage.ControlDriverBase):
def __init__(self, conf):
super(ControlDriver, self).__init__(conf)
def __init__(self, conf, cache):
super(ControlDriver, self).__init__(conf, cache)
self.conf.register_opts(_SQLITE_OPTIONS, group=_SQLITE_GROUP)
self.sqlite_conf = self.conf[_SQLITE_GROUP]

View File

@@ -22,22 +22,30 @@ from marconi.openstack.common import log
LOG = log.getLogger(__name__)
def load_storage_driver(conf, control_mode=False):
def load_storage_driver(conf, cache, control_mode=False):
"""Loads a storage driver and returns it.
The driver's initializer will be passed conf as its only arg.
The driver's initializer will be passed conf and cache as
its positional args.
:param conf: Configuration instance to use for loading the
driver. Must include a 'drivers' group.
:param cache: Cache instance that the driver can (optionally)
use to reduce latency for some operations.
:param control_mode: (Default False). Determines which
driver type to load; if False, the data driver is
loaded. If True, the control driver is loaded.
"""
mode = 'control' if control_mode else 'data'
driver_type = 'marconi.queues.{0}.storage'.format(mode)
try:
mgr = driver.DriverManager(driver_type,
conf['drivers'].storage,
invoke_on_load=True,
invoke_args=[conf])
invoke_args=[conf, cache])
return mgr.driver
except RuntimeError as exc:

View File

@@ -17,8 +17,8 @@ from marconi.queues import storage
class DataDriver(storage.DataDriverBase):
def __init__(self, conf):
super(DataDriver, self).__init__(conf)
def __init__(self, conf, cache):
super(DataDriver, self).__init__(conf, cache)
@property
def default_options(self):
@@ -42,8 +42,8 @@ class DataDriver(storage.DataDriverBase):
class ControlDriver(storage.ControlDriverBase):
def __init__(self, conf):
super(ControlDriver, self).__init__(conf)
def __init__(self, conf, cache):
super(ControlDriver, self).__init__(conf, cache)
@property
def catalogue_controller(self):

View File

@@ -21,6 +21,7 @@ import ddt
import six
from testtools import matchers
from marconi.common.cache import cache as oslo_cache
from marconi.openstack.common import timeutils
from marconi.queues import storage
from marconi.queues.storage import errors
@@ -46,7 +47,8 @@ class ControllerBaseTest(testing.TestBase):
self.controller_class,
self.controller_base_class))
self.driver = self.driver_class(self.conf)
cache = oslo_cache.get_cache(self.conf)
self.driver = self.driver_class(self.conf, cache)
self._prepare_conf()
self.addCleanup(self._purge_databases)

View File

@@ -21,6 +21,7 @@ from pymongo import cursor
import pymongo.errors
from testtools import matchers
from marconi.common.cache import cache as oslo_cache
from marconi.openstack.common import timeutils
from marconi.queues import storage
from marconi.queues.storage import errors
@@ -92,7 +93,8 @@ class MongodbDriverTest(testing.TestBase, MongodbTestMixin):
config_file = 'wsgi_mongodb.conf'
def test_db_instance(self):
driver = mongodb.DataDriver(self.conf)
cache = oslo_cache.get_cache(self.conf)
driver = mongodb.DataDriver(self.conf, cache)
databases = (driver.message_databases +
[driver.queues_database])

View File

@@ -18,6 +18,7 @@ import uuid
from oslo.config import cfg
from marconi.common.cache import cache as oslo_cache
from marconi.queues.storage import sharding
from marconi.queues.storage import sqlite
from marconi.queues.storage import utils
@@ -37,7 +38,10 @@ class TestShardCatalog(testing.TestBase):
self.conf.register_opts([cfg.StrOpt('storage')],
group='drivers')
control = utils.load_storage_driver(self.conf, control_mode=True)
cache = oslo_cache.get_cache(self.conf)
control = utils.load_storage_driver(self.conf, cache,
control_mode=True)
self.catalogue_ctrl = control.catalogue_controller
self.shards_ctrl = control.shards_controller
@@ -47,7 +51,7 @@ class TestShardCatalog(testing.TestBase):
self.project = str(uuid.uuid1())
self.shards_ctrl.create(self.shard, 100, 'sqlite://memory')
self.catalogue_ctrl.insert(self.project, self.queue, self.shard)
self.catalog = sharding.Catalog(self.conf, control)
self.catalog = sharding.Catalog(self.conf, cache, control)
def tearDown(self):
self.catalogue_ctrl.drop_all()

View File

@@ -19,6 +19,7 @@ import uuid
from oslo.config import cfg
from marconi.common.cache import cache as oslo_cache
from marconi.queues.storage import sharding
from marconi.queues.storage import utils
from marconi import tests as testing
@@ -33,11 +34,12 @@ class TestShardQueues(base.TestBase):
conf = self.load_conf('wsgi_mongodb_sharded.conf')
conf.register_opts([cfg.StrOpt('storage')],
group='queues:drivers')
group='drivers')
control = utils.load_storage_driver(conf, control_mode=True)
cache = oslo_cache.get_cache(self.conf)
control = utils.load_storage_driver(conf, cache, control_mode=True)
self.shards_ctrl = control.shards_controller
self.driver = sharding.DataDriver(conf, control)
self.driver = sharding.DataDriver(conf, cache, control)
self.controller = self.driver.queue_controller
# fake two shards