diff --git a/marconi/common/pipeline.py b/marconi/common/pipeline.py index 73dc318f3..3f4c3130f 100644 --- a/marconi/common/pipeline.py +++ b/marconi/common/pipeline.py @@ -13,14 +13,86 @@ # See the License for the specific language governing permissions and # limitations under the License. +"""This module implements a common Pipeline object. + +The pipeline can be used to enhance the storage layer with filtering, routing, +multiplexing and the like. For example: + + >>> pipes = [MessageFiltering(), ShardManager(), QueueController()] + >>> pipeline = Pipeline(pipes) + >>> pipeline.create(...) + +Every pipe has to implement the method it wants to hook into. This method +will be called when the pipeline consumption gets to that point - pipes +ordering matters - and will continue unless the method call returns a value +that is not None. + +At least one of the pipes has to implement the calling method. If none of +them do, an AttributeError exception will be raised. + +Other helper functions can also be found in this module. `get_storage_pipeline` +for example, creates a pipeline based on the `storage_pipeline` configuration +option. This config is a ListOpt and can be either set programmatically or in +the configuration file itself: + + [storage] + storage_pipeline = marconi.queues.storage.filters.ValidationFilter, + marconi.queues.storage.sharding.ShardManager + +Note that controllers *must* not be present in this configuration option. +They will be loaded - and automatically appended to the pipeline - using +the `drivers:storage` configuration parameter. +""" + import functools +from oslo.config import cfg import six +from marconi.openstack.common import importutils import marconi.openstack.common.log as logging LOG = logging.getLogger(__name__) +_PIPELINE_CONFIGS = [ + cfg.ListOpt('storage_pipeline', default=[], + help=_('Pipeline to use for the storage layer ' + 'This pipeline will be consumed before ' + 'calling the controller method, which will ' + 'always be appended to this pipeline')) +] + + +def get_storage_pipeline(conf): + """Returns a pipeline based on `storage_pipeline` + + This is a helper function for any service supporting + pipelines for the storage layer - Proxy and Queues, + for example. The function returns a pipeline based on + the `storage_pipeline` config option. + + :param conf: Configuration instance. + :param conf: `cfg.ConfigOpts` + + :returns: A pipeline to use. + :rtype: `Pipeline` + """ + conf.register_opts(_PIPELINE_CONFIGS, + group='storage') + + pipeline = [] + for ns in conf.storage.storage_pipeline: + cls = importutils.try_import(ns) + + if not cls: + msg = _('Pipe {0} could not be imported').format(ns) + LOG.warning(msg) + continue + + pipeline.append(cls()) + + return Pipeline(pipeline) + class Pipeline(object): diff --git a/marconi/queues/storage/base.py b/marconi/queues/storage/base.py index 7ba2464d4..ae2c7d40e 100644 --- a/marconi/queues/storage/base.py +++ b/marconi/queues/storage/base.py @@ -17,6 +17,10 @@ import abc +from oslo.config import cfg + +from marconi.common import pipeline + class DriverBase: __metaclass__ = abc.ABCMeta @@ -45,20 +49,41 @@ class DriverBase: raise NotImplementedError @abc.abstractproperty - def queue_controller(self): + def _queue_controller(self): """Returns storage's queues controller.""" raise NotImplementedError + @property + def queue_controller(self): + controller = self._queue_controller + pipes = pipeline.get_storage_pipeline(cfg.CONF) + pipes.append(controller) + return pipes + @abc.abstractproperty - def message_controller(self): + def _message_controller(self): """Returns storage's messages controller.""" raise NotImplementedError + @property + def message_controller(self): + controller = self._message_controller + pipes = pipeline.get_storage_pipeline(cfg.CONF) + pipes.append(controller) + return pipes + @abc.abstractproperty - def claim_controller(self): + def _claim_controller(self): """Returns storage's claims controller.""" raise NotImplementedError + @property + def claim_controller(self): + controller = self._claim_controller + pipes = pipeline.get_storage_pipeline(cfg.CONF) + pipes.append(controller) + return pipes + class ControllerBase(object): """Top-level class for controllers. diff --git a/marconi/queues/storage/mongodb/driver.py b/marconi/queues/storage/mongodb/driver.py index ac1f3a630..f04578c55 100644 --- a/marconi/queues/storage/mongodb/driver.py +++ b/marconi/queues/storage/mongodb/driver.py @@ -73,13 +73,13 @@ class Driver(storage.DriverBase): return options.CFG.gc_interval @property - def queue_controller(self): + def _queue_controller(self): return controllers.QueueController(self) @property - def message_controller(self): + def _message_controller(self): return controllers.MessageController(self) @property - def claim_controller(self): + def _claim_controller(self): return controllers.ClaimController(self) diff --git a/marconi/queues/storage/sqlite/driver.py b/marconi/queues/storage/sqlite/driver.py index 345988617..7ad19ba2b 100644 --- a/marconi/queues/storage/sqlite/driver.py +++ b/marconi/queues/storage/sqlite/driver.py @@ -117,13 +117,13 @@ class Driver(storage.DriverBase): raise @property - def queue_controller(self): + def _queue_controller(self): return controllers.QueueController(self) @property - def message_controller(self): + def _message_controller(self): return controllers.MessageController(self) @property - def claim_controller(self): + def _claim_controller(self): return controllers.ClaimController(self) diff --git a/marconi/tests/faulty_storage.py b/marconi/tests/faulty_storage.py index 6e2981161..3005a6adc 100644 --- a/marconi/tests/faulty_storage.py +++ b/marconi/tests/faulty_storage.py @@ -19,15 +19,15 @@ from marconi.queues import storage class Driver(storage.DriverBase): @property - def queue_controller(self): + def _queue_controller(self): return QueueController(self) @property - def message_controller(self): + def _message_controller(self): return MessageController(self) @property - def claim_controller(self): + def _claim_controller(self): return None