Use the pipeline for the storage layer

This patch integrates the pipeline between the transport and the storage
layer. This will allow have filters before getting to the final storage
and even implement the ShardManager as a Pipe itself.

Implements blueprint storage-pipeline

Change-Id: I428421d9f3d6c2b8f05597073f341689e186960c
This commit is contained in:
Flavio Percoco 2013-10-10 12:19:43 +02:00
parent 0e21349600
commit 6b25f3227a
5 changed files with 109 additions and 12 deletions

View File

@ -13,14 +13,86 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module implements a common Pipeline object.
The pipeline can be used to enhance the storage layer with filtering, routing,
multiplexing and the like. For example:
>>> pipes = [MessageFiltering(), ShardManager(), QueueController()]
>>> pipeline = Pipeline(pipes)
>>> pipeline.create(...)
Every pipe has to implement the method it wants to hook into. This method
will be called when the pipeline consumption gets to that point - pipes
ordering matters - and will continue unless the method call returns a value
that is not None.
At least one of the pipes has to implement the calling method. If none of
them do, an AttributeError exception will be raised.
Other helper functions can also be found in this module. `get_storage_pipeline`
for example, creates a pipeline based on the `storage_pipeline` configuration
option. This config is a ListOpt and can be either set programmatically or in
the configuration file itself:
[storage]
storage_pipeline = marconi.queues.storage.filters.ValidationFilter,
marconi.queues.storage.sharding.ShardManager
Note that controllers *must* not be present in this configuration option.
They will be loaded - and automatically appended to the pipeline - using
the `drivers:storage` configuration parameter.
"""
import functools
from oslo.config import cfg
import six
from marconi.openstack.common import importutils
import marconi.openstack.common.log as logging
LOG = logging.getLogger(__name__)
_PIPELINE_CONFIGS = [
cfg.ListOpt('storage_pipeline', default=[],
help=_('Pipeline to use for the storage layer '
'This pipeline will be consumed before '
'calling the controller method, which will '
'always be appended to this pipeline'))
]
def get_storage_pipeline(conf):
"""Returns a pipeline based on `storage_pipeline`
This is a helper function for any service supporting
pipelines for the storage layer - Proxy and Queues,
for example. The function returns a pipeline based on
the `storage_pipeline` config option.
:param conf: Configuration instance.
:param conf: `cfg.ConfigOpts`
:returns: A pipeline to use.
:rtype: `Pipeline`
"""
conf.register_opts(_PIPELINE_CONFIGS,
group='storage')
pipeline = []
for ns in conf.storage.storage_pipeline:
cls = importutils.try_import(ns)
if not cls:
msg = _('Pipe {0} could not be imported').format(ns)
LOG.warning(msg)
continue
pipeline.append(cls())
return Pipeline(pipeline)
class Pipeline(object):

View File

@ -17,6 +17,10 @@
import abc
from oslo.config import cfg
from marconi.common import pipeline
class DriverBase:
__metaclass__ = abc.ABCMeta
@ -45,20 +49,41 @@ class DriverBase:
raise NotImplementedError
@abc.abstractproperty
def queue_controller(self):
def _queue_controller(self):
"""Returns storage's queues controller."""
raise NotImplementedError
@property
def queue_controller(self):
controller = self._queue_controller
pipes = pipeline.get_storage_pipeline(cfg.CONF)
pipes.append(controller)
return pipes
@abc.abstractproperty
def message_controller(self):
def _message_controller(self):
"""Returns storage's messages controller."""
raise NotImplementedError
@property
def message_controller(self):
controller = self._message_controller
pipes = pipeline.get_storage_pipeline(cfg.CONF)
pipes.append(controller)
return pipes
@abc.abstractproperty
def claim_controller(self):
def _claim_controller(self):
"""Returns storage's claims controller."""
raise NotImplementedError
@property
def claim_controller(self):
controller = self._claim_controller
pipes = pipeline.get_storage_pipeline(cfg.CONF)
pipes.append(controller)
return pipes
class ControllerBase(object):
"""Top-level class for controllers.

View File

@ -73,13 +73,13 @@ class Driver(storage.DriverBase):
return options.CFG.gc_interval
@property
def queue_controller(self):
def _queue_controller(self):
return controllers.QueueController(self)
@property
def message_controller(self):
def _message_controller(self):
return controllers.MessageController(self)
@property
def claim_controller(self):
def _claim_controller(self):
return controllers.ClaimController(self)

View File

@ -117,13 +117,13 @@ class Driver(storage.DriverBase):
raise
@property
def queue_controller(self):
def _queue_controller(self):
return controllers.QueueController(self)
@property
def message_controller(self):
def _message_controller(self):
return controllers.MessageController(self)
@property
def claim_controller(self):
def _claim_controller(self):
return controllers.ClaimController(self)

View File

@ -19,15 +19,15 @@ from marconi.queues import storage
class Driver(storage.DriverBase):
@property
def queue_controller(self):
def _queue_controller(self):
return QueueController(self)
@property
def message_controller(self):
def _message_controller(self):
return MessageController(self)
@property
def claim_controller(self):
def _claim_controller(self):
return None