diff --git a/etc/marconi-queues.conf-sample b/etc/marconi-queues.conf-sample index 0fb08936d..a29bf238a 100644 --- a/etc/marconi-queues.conf-sample +++ b/etc/marconi-queues.conf-sample @@ -23,6 +23,7 @@ log_file = /var/log/marconi/queues.log # Facility to use. If unset defaults to LOG_USER. ;syslog_log_facility = LOG_LOCAL0 +# ================= Driver Options ============================ [queues:drivers] # Transport driver module (e.g., wsgi, zmq) @@ -31,6 +32,15 @@ transport = wsgi # Storage driver module (e.g., mongodb, sqlite) storage = mongodb +# TODO(kgriffs): Add example stages +[storage] +# Pipeline for operations on queue resources +;queue_pipeline = +# Pipeline for operations on message resources +;message_pipeline = +# Pipeline for operations on claim resources +;claim_pipeline = + [queues:drivers:transport:wsgi] ;bind = 0.0.0.0 ;port = 8888 diff --git a/marconi/common/__init__.py b/marconi/common/__init__.py index 29ada70bd..15dfa0253 100644 --- a/marconi/common/__init__.py +++ b/marconi/common/__init__.py @@ -1 +1,5 @@ """Code common to Marconi""" + +from marconi.common import pipeline + +Pipeline = pipeline.Pipeline diff --git a/marconi/common/pipeline.py b/marconi/common/pipeline.py index 3f4c3130f..85a999f7c 100644 --- a/marconi/common/pipeline.py +++ b/marconi/common/pipeline.py @@ -18,101 +18,46 @@ The pipeline can be used to enhance the storage layer with filtering, routing, multiplexing and the like. For example: - >>> pipes = [MessageFiltering(), ShardManager(), QueueController()] - >>> pipeline = Pipeline(pipes) - >>> pipeline.create(...) + >>> stages = [MessageFilter(), EncryptionFilter(), QueueController()] + >>> pipeline = Pipeline(stages) -Every pipe has to implement the method it wants to hook into. This method -will be called when the pipeline consumption gets to that point - pipes +Every stage has to implement the method it wants to hook into. This method +will be called when the pipeline consumption gets to that point - stage ordering matters - and will continue unless the method call returns a value that is not None. -At least one of the pipes has to implement the calling method. If none of +At least one of the stages has to implement the calling method. If none of them do, an AttributeError exception will be raised. - -Other helper functions can also be found in this module. `get_storage_pipeline` -for example, creates a pipeline based on the `storage_pipeline` configuration -option. This config is a ListOpt and can be either set programmatically or in -the configuration file itself: - - [storage] - storage_pipeline = marconi.queues.storage.filters.ValidationFilter, - marconi.queues.storage.sharding.ShardManager - -Note that controllers *must* not be present in this configuration option. -They will be loaded - and automatically appended to the pipeline - using -the `drivers:storage` configuration parameter. """ import functools -from oslo.config import cfg import six -from marconi.openstack.common import importutils import marconi.openstack.common.log as logging LOG = logging.getLogger(__name__) -_PIPELINE_CONFIGS = [ - cfg.ListOpt('storage_pipeline', default=[], - help=_('Pipeline to use for the storage layer ' - 'This pipeline will be consumed before ' - 'calling the controller method, which will ' - 'always be appended to this pipeline')) -] - - -def get_storage_pipeline(conf): - """Returns a pipeline based on `storage_pipeline` - - This is a helper function for any service supporting - pipelines for the storage layer - Proxy and Queues, - for example. The function returns a pipeline based on - the `storage_pipeline` config option. - - :param conf: Configuration instance. - :param conf: `cfg.ConfigOpts` - - :returns: A pipeline to use. - :rtype: `Pipeline` - """ - conf.register_opts(_PIPELINE_CONFIGS, - group='storage') - - pipeline = [] - for ns in conf.storage.storage_pipeline: - cls = importutils.try_import(ns) - - if not cls: - msg = _('Pipe {0} could not be imported').format(ns) - LOG.warning(msg) - continue - - pipeline.append(cls()) - - return Pipeline(pipeline) - class Pipeline(object): def __init__(self, pipeline=None): self._pipeline = pipeline and list(pipeline) or [] - def append(self, item): - self._pipeline.append(item) + def append(self, stage): + self._pipeline.append(stage) def __getattr__(self, name): return functools.partial(self.consume_for, name) def consume_for(self, method, *args, **kwargs): - """Consumes the pipeline for `method` + """Consumes the pipeline for `method`. This method walks through the pipeline and calls `method` for each of the items in the pipeline. A warning will be logged for each pipe not implementing `method` and an Attribute error will be raised if - none of the pipes do. + none of the stages do. :params method: The method name to call on each pipe :type method: `six.text_type` @@ -120,22 +65,20 @@ class Pipeline(object): :param kwargs: Keyword arguments to pass to the call. :returns: Anything returned by the called methods. - :raises: AttributeError if none of the pipes implement `method` + :raises: AttributeError if none of the stages implement `method` """ # NOTE(flaper87): Used as a way to verify # the requested method exists in at least - # one of the pipes, otherwise AttributeError + # one of the stages, otherwise AttributeError # will be raised. target = None - for pipe in self._pipeline: + for stage in self._pipeline: try: - target = getattr(pipe, method) + target = getattr(stage, method) except AttributeError: - spipe = six.text_type(pipe) - msg = _(u"Pipe {0} does not implement {1}").format(spipe, - method) - LOG.warning(msg) + msg = _(u'Stage {0} does not implement {1}') + LOG.warning(msg.format(six.text_type(stage), method)) continue result = target(*args, **kwargs) @@ -148,6 +91,6 @@ class Pipeline(object): if target is None: msg = _(u'Method {0} not found in any of ' - 'the registered pipes').format(method) + 'the registered stages').format(method) LOG.error(msg) raise AttributeError(msg) diff --git a/marconi/queues/bootstrap.py b/marconi/queues/bootstrap.py index 7844e86ae..09080627e 100644 --- a/marconi/queues/bootstrap.py +++ b/marconi/queues/bootstrap.py @@ -19,6 +19,7 @@ from stevedore import driver from marconi.common import decorators from marconi.common import exceptions from marconi.openstack.common import log +from marconi.queues.storage import pipeline from marconi.queues import transport # NOQA @@ -53,22 +54,27 @@ class Bootstrap(object): @decorators.lazy_property(write=False) def storage(self): - LOG.debug(_(u'Loading Storage Driver')) + storage_name = CFG['queues:drivers'].storage + LOG.debug(_(u'Loading storage driver: ') + storage_name) + try: mgr = driver.DriverManager('marconi.queues.storage', - CFG['queues:drivers'].storage, + storage_name, invoke_on_load=True) - return mgr.driver + + return pipeline.Driver(CFG, mgr.driver) except RuntimeError as exc: LOG.exception(exc) raise exceptions.InvalidDriver(exc) @decorators.lazy_property(write=False) def transport(self): - LOG.debug(_(u'Loading Transport Driver')) + transport_name = CFG['queues:drivers'].transport + LOG.debug(_(u'Loading transport driver: ') + transport_name) + try: mgr = driver.DriverManager('marconi.queues.transport', - CFG['queues:drivers'].transport, + transport_name, invoke_on_load=True, invoke_args=[self.storage]) return mgr.driver diff --git a/marconi/queues/storage/base.py b/marconi/queues/storage/base.py index c96776b08..37cfdd8d1 100644 --- a/marconi/queues/storage/base.py +++ b/marconi/queues/storage/base.py @@ -17,49 +17,24 @@ import abc -from oslo.config import cfg -from marconi.common import pipeline - - -class DriverBase: +class DriverBase(object): __metaclass__ = abc.ABCMeta @abc.abstractproperty - def _queue_controller(self): - """Returns storage's queues controller.""" - raise NotImplementedError - - @property def queue_controller(self): - controller = self._queue_controller - pipes = pipeline.get_storage_pipeline(cfg.CONF) - pipes.append(controller) - return pipes - - @abc.abstractproperty - def _message_controller(self): - """Returns storage's messages controller.""" + """Returns the driver's queue controller.""" raise NotImplementedError - @property + @abc.abstractproperty def message_controller(self): - controller = self._message_controller - pipes = pipeline.get_storage_pipeline(cfg.CONF) - pipes.append(controller) - return pipes - - @abc.abstractproperty - def _claim_controller(self): - """Returns storage's claims controller.""" + """Returns the driver's message controller.""" raise NotImplementedError - @property + @abc.abstractproperty def claim_controller(self): - controller = self._claim_controller - pipes = pipeline.get_storage_pipeline(cfg.CONF) - pipes.append(controller) - return pipes + """Returns the driver's claim controller.""" + raise NotImplementedError class ControllerBase(object): diff --git a/marconi/queues/storage/mongodb/driver.py b/marconi/queues/storage/mongodb/driver.py index 91c1c23cb..155517e43 100644 --- a/marconi/queues/storage/mongodb/driver.py +++ b/marconi/queues/storage/mongodb/driver.py @@ -70,13 +70,13 @@ class Driver(storage.DriverBase): return MongoClient(options.CFG.uri) @property - def _queue_controller(self): + def queue_controller(self): return controllers.QueueController(self) @property - def _message_controller(self): + def message_controller(self): return controllers.MessageController(self) @property - def _claim_controller(self): + def claim_controller(self): return controllers.ClaimController(self) diff --git a/marconi/queues/storage/pipeline.py b/marconi/queues/storage/pipeline.py new file mode 100644 index 000000000..a63cc55e9 --- /dev/null +++ b/marconi/queues/storage/pipeline.py @@ -0,0 +1,110 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo.config import cfg + +from marconi import common +from marconi.common import decorators +from marconi.openstack.common import importutils +from marconi.openstack.common import log as logging +from marconi.queues.storage import base + +LOG = logging.getLogger(__name__) + +_PIPELINE_RESOURCES = ('queue', 'message', 'claim') + +_PIPELINE_CONFIGS = [ + cfg.ListOpt(resource + '_pipeline', default=[], + help=_('Pipeline to use for processing {0} operations. ' + 'This pipeline will be consumed before calling ' + 'the storage driver\'s controller methods, ' + 'which will always be appended to this ' + 'pipeline.').format(resource)) + for resource in _PIPELINE_RESOURCES +] + +_PIPELINE_GROUP = 'storage' + + +def _get_storage_pipeline(resource_name, conf): + """Constructs and returns a storage resource pipeline. + + This is a helper function for any service supporting + pipelines for the storage layer. The function returns + a pipeline based on the `{resource_name}_pipeline` + config option. + + Stages in the pipeline implement controller methods + that they want to hook. A stage can halt the + pipeline immediate by returning a value that is + not None; otherwise, processing will continue + to the next stage, ending with the actual storage + controller. + + :param conf: Configuration instance. + :type conf: `cfg.ConfigOpts` + + :returns: A pipeline to use. + :rtype: `Pipeline` + """ + conf.register_opts(_PIPELINE_CONFIGS, + group=_PIPELINE_GROUP) + + storage_conf = conf[_PIPELINE_GROUP] + + pipeline = [] + for ns in storage_conf[resource_name + '_pipeline']: + cls = importutils.try_import(ns) + + if not cls: + msg = _('Pipe {0} could not be imported').format(ns) + LOG.warning(msg) + continue + + pipeline.append(cls()) + + return common.Pipeline(pipeline) + + +class Driver(base.DriverBase): + """Meta-driver for injecting pipelines in front of controllers. + + :param conf: Configuration from which to load pipeline settings + :param storage: Storage driver that will service requests as the + last step in the pipeline + """ + + def __init__(self, conf, storage): + self._conf = conf + self._storage = storage + + @decorators.lazy_property(write=False) + def queue_controller(self): + pipes = _get_storage_pipeline('queue', self._conf) + pipes.append(self._storage.queue_controller) + return pipes + + @decorators.lazy_property(write=False) + def message_controller(self): + pipes = _get_storage_pipeline('message', self._conf) + pipes.append(self._storage.message_controller) + return pipes + + @decorators.lazy_property(write=False) + def claim_controller(self): + pipes = _get_storage_pipeline('claim', self._conf) + pipes.append(self._storage.claim_controller) + return pipes diff --git a/marconi/queues/storage/sqlite/driver.py b/marconi/queues/storage/sqlite/driver.py index 844cab27c..52b482075 100644 --- a/marconi/queues/storage/sqlite/driver.py +++ b/marconi/queues/storage/sqlite/driver.py @@ -118,13 +118,13 @@ class Driver(storage.DriverBase): raise @property - def _queue_controller(self): + def queue_controller(self): return controllers.QueueController(self) @property - def _message_controller(self): + def message_controller(self): return controllers.MessageController(self) @property - def _claim_controller(self): + def claim_controller(self): return controllers.ClaimController(self) diff --git a/marconi/tests/faulty_storage.py b/marconi/tests/faulty_storage.py index e02448224..80a08d9e0 100644 --- a/marconi/tests/faulty_storage.py +++ b/marconi/tests/faulty_storage.py @@ -18,15 +18,15 @@ from marconi.queues import storage class Driver(storage.DriverBase): @property - def _queue_controller(self): + def queue_controller(self): return QueueController(self) @property - def _message_controller(self): + def message_controller(self): return MessageController(self) @property - def _claim_controller(self): + def claim_controller(self): return None diff --git a/tests/unit/queues/transport/wsgi/test_claims.py b/tests/unit/queues/transport/wsgi/test_claims.py index dabcc2ca6..f0cfd064f 100644 --- a/tests/unit/queues/transport/wsgi/test_claims.py +++ b/tests/unit/queues/transport/wsgi/test_claims.py @@ -242,7 +242,7 @@ class ClaimsMongoDBTests(ClaimsBaseTest): self.cfg = cfg.CONF['queues:drivers:storage:mongodb'] def tearDown(self): - storage = self.boot.storage + storage = self.boot.storage._storage connection = storage.connection connection.drop_database(storage.queues_database) diff --git a/tests/unit/queues/transport/wsgi/test_queue_lifecycle.py b/tests/unit/queues/transport/wsgi/test_queue_lifecycle.py index b993bdf7e..71b11c1fe 100644 --- a/tests/unit/queues/transport/wsgi/test_queue_lifecycle.py +++ b/tests/unit/queues/transport/wsgi/test_queue_lifecycle.py @@ -319,7 +319,7 @@ class QueueLifecycleMongoDBTests(QueueLifecycleBaseTest): self.cfg = cfg.CONF['queues:drivers:storage:mongodb'] def tearDown(self): - storage = self.boot.storage + storage = self.boot.storage._storage connection = storage.connection connection.drop_database(storage.queues_database) diff --git a/tests/unit/test_bootstrap.py b/tests/unit/test_bootstrap.py index 348fdb7fe..b5c1d56c6 100644 --- a/tests/unit/test_bootstrap.py +++ b/tests/unit/test_bootstrap.py @@ -17,6 +17,7 @@ from oslo.config import cfg from marconi.common import exceptions import marconi.queues +from marconi.queues.storage import pipeline from marconi.queues.storage import sqlite from marconi.queues.transport import wsgi from marconi.tests import base @@ -36,7 +37,8 @@ class TestBootstrap(base.TestBase): def test_storage_sqlite(self): conf_file = 'etc/wsgi_sqlite.conf' bootstrap = marconi.Bootstrap(conf_file) - self.assertIsInstance(bootstrap.storage, sqlite.Driver) + self.assertIsInstance(bootstrap.storage, pipeline.Driver) + self.assertIsInstance(bootstrap.storage._storage, sqlite.Driver) def test_transport_invalid(self): conf_file = 'etc/drivers_transport_invalid.conf'