Set up storage pipeline in the bootstrap instead of driver base

This patch moves pipeline setup into the bootstrap and out of
the storage driver base class, so that the base class can be
inherited by meta-drivers, such as the planned sharding manager,
without introducing a loop in the bootstrapping logic.

The transport object is now handed a meta-driver that wires
up the pipeline for each resource controller behind the
scenes.

As part of this work, the pipeline config was extended to
allow configuring different stages per resource. Since three
instances of Pipeline are created anyway, it made sense to
let the operator configure each pipeline independently.
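
For illustration, per-resource pipelines live in the [storage]
config group; the stage class paths below are hypothetical:

    [storage]
    queue_pipeline = mystages.QueueAuditStage
    message_pipeline = mystages.MessageFilter, mystages.MessageEncrypter
    ;claim_pipeline =

Each option is a list of importable stage classes; the storage
driver's own controller is always appended as the final stage.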

Partially-Implements: blueprint storage-pipeline
Change-Id: Ibdb7d0e9537b1eec38a13f4881df7462039bbf98
kgriffs 2013-10-10 15:13:11 -04:00
parent f42ed75e6c
commit e8cedadbfd
12 changed files with 172 additions and 122 deletions

View File

@@ -23,6 +23,7 @@ log_file = /var/log/marconi/queues.log
# Facility to use. If unset defaults to LOG_USER.
;syslog_log_facility = LOG_LOCAL0
# ================= Driver Options ============================
[queues:drivers]
# Transport driver module (e.g., wsgi, zmq)
@@ -31,6 +32,15 @@ transport = wsgi
# Storage driver module (e.g., mongodb, sqlite)
storage = mongodb
# TODO(kgriffs): Add example stages
[storage]
# Pipeline for operations on queue resources
;queue_pipeline =
# Pipeline for operations on message resources
;message_pipeline =
# Pipeline for operations on claim resources
;claim_pipeline =
[queues:drivers:transport:wsgi]
;bind = 0.0.0.0
;port = 8888

View File

@@ -1 +1,5 @@
"""Code common to Marconi"""
from marconi.common import pipeline
Pipeline = pipeline.Pipeline

View File

@@ -18,101 +18,46 @@
The pipeline can be used to enhance the storage layer with filtering, routing,
multiplexing and the like. For example:
>>> pipes = [MessageFiltering(), ShardManager(), QueueController()]
>>> pipeline = Pipeline(pipes)
>>> pipeline.create(...)
>>> stages = [MessageFilter(), EncryptionFilter(), QueueController()]
>>> pipeline = Pipeline(stages)
Every pipe has to implement the method it wants to hook into. This method
will be called when the pipeline consumption gets to that point - pipes
Every stage has to implement the method it wants to hook into. This method
will be called when the pipeline consumption gets to that point - stage
ordering matters - and will continue unless the method call returns a value
that is not None.
At least one of the pipes has to implement the calling method. If none of
At least one of the stages has to implement the calling method. If none of
them do, an AttributeError exception will be raised.
Other helper functions can also be found in this module. `get_storage_pipeline`
for example, creates a pipeline based on the `storage_pipeline` configuration
option. This config is a ListOpt and can be either set programmatically or in
the configuration file itself:
[storage]
storage_pipeline = marconi.queues.storage.filters.ValidationFilter,
marconi.queues.storage.sharding.ShardManager
Note that controllers *must* not be present in this configuration option.
They will be loaded - and automatically appended to the pipeline - using
the `drivers:storage` configuration parameter.
"""
import functools
from oslo.config import cfg
import six
from marconi.openstack.common import importutils
import marconi.openstack.common.log as logging
LOG = logging.getLogger(__name__)
_PIPELINE_CONFIGS = [
cfg.ListOpt('storage_pipeline', default=[],
help=_('Pipeline to use for the storage layer '
'This pipeline will be consumed before '
'calling the controller method, which will '
'always be appended to this pipeline'))
]
def get_storage_pipeline(conf):
"""Returns a pipeline based on `storage_pipeline`
This is a helper function for any service supporting
pipelines for the storage layer - Proxy and Queues,
for example. The function returns a pipeline based on
the `storage_pipeline` config option.
:param conf: Configuration instance.
:param conf: `cfg.ConfigOpts`
:returns: A pipeline to use.
:rtype: `Pipeline`
"""
conf.register_opts(_PIPELINE_CONFIGS,
group='storage')
pipeline = []
for ns in conf.storage.storage_pipeline:
cls = importutils.try_import(ns)
if not cls:
msg = _('Pipe {0} could not be imported').format(ns)
LOG.warning(msg)
continue
pipeline.append(cls())
return Pipeline(pipeline)
class Pipeline(object):
def __init__(self, pipeline=None):
self._pipeline = pipeline and list(pipeline) or []
def append(self, item):
self._pipeline.append(item)
def append(self, stage):
self._pipeline.append(stage)
def __getattr__(self, name):
return functools.partial(self.consume_for, name)
def consume_for(self, method, *args, **kwargs):
"""Consumes the pipeline for `method`
"""Consumes the pipeline for `method`.
This method walks through the pipeline and calls
`method` for each of the items in the pipeline. A
warning will be logged for each pipe not implementing
`method` and an Attribute error will be raised if
none of the pipes do.
none of the stages do.
:params method: The method name to call on each pipe
:type method: `six.text_type`
@@ -120,22 +65,20 @@ class Pipeline(object):
:param kwargs: Keyword arguments to pass to the call.
:returns: Anything returned by the called methods.
:raises: AttributeError if none of the pipes implement `method`
:raises: AttributeError if none of the stages implement `method`
"""
# NOTE(flaper87): Used as a way to verify
# the requested method exists in at least
# one of the pipes, otherwise AttributeError
# one of the stages, otherwise AttributeError
# will be raised.
target = None
for pipe in self._pipeline:
for stage in self._pipeline:
try:
target = getattr(pipe, method)
target = getattr(stage, method)
except AttributeError:
spipe = six.text_type(pipe)
msg = _(u"Pipe {0} does not implement {1}").format(spipe,
method)
LOG.warning(msg)
msg = _(u'Stage {0} does not implement {1}')
LOG.warning(msg.format(six.text_type(stage), method))
continue
result = target(*args, **kwargs)
@@ -148,6 +91,6 @@ class Pipeline(object):
if target is None:
msg = _(u'Method {0} not found in any of '
'the registered pipes').format(method)
'the registered stages').format(method)
LOG.error(msg)
raise AttributeError(msg)
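
To make the dispatch described above concrete, here is a minimal
usage sketch; the stage classes are hypothetical, while Pipeline is
the class shown above, re-exported as marconi.common.Pipeline:

    from marconi import common


    class FilterStage(object):
        """Hypothetical stage; returning None passes control on."""

        def create(self, name):
            return None


    class FakeController(object):
        """Hypothetical final stage standing in for a controller."""

        def create(self, name):
            # A non-None result stops pipeline consumption here.
            return 'created ' + name


    queue_pipeline = common.Pipeline([FilterStage(), FakeController()])

    # __getattr__ maps unknown attributes to consume_for(), so this
    # call walks the stages in order and returns 'created demo'.
    assert queue_pipeline.create('demo') == 'created demo'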

View File

@@ -19,6 +19,7 @@ from stevedore import driver
from marconi.common import decorators
from marconi.common import exceptions
from marconi.openstack.common import log
from marconi.queues.storage import pipeline
from marconi.queues import transport # NOQA
@@ -53,22 +54,27 @@ class Bootstrap(object):
@decorators.lazy_property(write=False)
def storage(self):
LOG.debug(_(u'Loading Storage Driver'))
storage_name = CFG['queues:drivers'].storage
LOG.debug(_(u'Loading storage driver: ') + storage_name)
try:
mgr = driver.DriverManager('marconi.queues.storage',
CFG['queues:drivers'].storage,
storage_name,
invoke_on_load=True)
return mgr.driver
return pipeline.Driver(CFG, mgr.driver)
except RuntimeError as exc:
LOG.exception(exc)
raise exceptions.InvalidDriver(exc)
@decorators.lazy_property(write=False)
def transport(self):
LOG.debug(_(u'Loading Transport Driver'))
transport_name = CFG['queues:drivers'].transport
LOG.debug(_(u'Loading transport driver: ') + transport_name)
try:
mgr = driver.DriverManager('marconi.queues.transport',
CFG['queues:drivers'].transport,
transport_name,
invoke_on_load=True,
invoke_args=[self.storage])
return mgr.driver

View File

@@ -17,49 +17,24 @@
import abc
from oslo.config import cfg
from marconi.common import pipeline
class DriverBase:
class DriverBase(object):
__metaclass__ = abc.ABCMeta
@abc.abstractproperty
def _queue_controller(self):
"""Returns storage's queues controller."""
raise NotImplementedError
@property
def queue_controller(self):
controller = self._queue_controller
pipes = pipeline.get_storage_pipeline(cfg.CONF)
pipes.append(controller)
return pipes
@abc.abstractproperty
def _message_controller(self):
"""Returns storage's messages controller."""
"""Returns the driver's queue controller."""
raise NotImplementedError
@property
@abc.abstractproperty
def message_controller(self):
controller = self._message_controller
pipes = pipeline.get_storage_pipeline(cfg.CONF)
pipes.append(controller)
return pipes
@abc.abstractproperty
def _claim_controller(self):
"""Returns storage's claims controller."""
"""Returns the driver's message controller."""
raise NotImplementedError
@property
@abc.abstractproperty
def claim_controller(self):
controller = self._claim_controller
pipes = pipeline.get_storage_pipeline(cfg.CONF)
pipes.append(controller)
return pipes
"""Returns the driver's claim controller."""
raise NotImplementedError
class ControllerBase(object):

View File

@@ -70,13 +70,13 @@ class Driver(storage.DriverBase):
return MongoClient(options.CFG.uri)
@property
def _queue_controller(self):
def queue_controller(self):
return controllers.QueueController(self)
@property
def _message_controller(self):
def message_controller(self):
return controllers.MessageController(self)
@property
def _claim_controller(self):
def claim_controller(self):
return controllers.ClaimController(self)

View File

@@ -0,0 +1,110 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from marconi import common
from marconi.common import decorators
from marconi.openstack.common import importutils
from marconi.openstack.common import log as logging
from marconi.queues.storage import base
LOG = logging.getLogger(__name__)
_PIPELINE_RESOURCES = ('queue', 'message', 'claim')
_PIPELINE_CONFIGS = [
cfg.ListOpt(resource + '_pipeline', default=[],
help=_('Pipeline to use for processing {0} operations. '
'This pipeline will be consumed before calling '
'the storage driver\'s controller methods, '
'which will always be appended to this '
'pipeline.').format(resource))
for resource in _PIPELINE_RESOURCES
]
_PIPELINE_GROUP = 'storage'
def _get_storage_pipeline(resource_name, conf):
"""Constructs and returns a storage resource pipeline.
This is a helper function for any service supporting
pipelines for the storage layer. The function returns
a pipeline based on the `{resource_name}_pipeline`
config option.
Stages in the pipeline implement controller methods
that they want to hook. A stage can halt the
pipeline immediately by returning a value that is
not None; otherwise, processing will continue
to the next stage, ending with the actual storage
controller.
:param conf: Configuration instance.
:type conf: `cfg.ConfigOpts`
:returns: A pipeline to use.
:rtype: `Pipeline`
"""
conf.register_opts(_PIPELINE_CONFIGS,
group=_PIPELINE_GROUP)
storage_conf = conf[_PIPELINE_GROUP]
pipeline = []
for ns in storage_conf[resource_name + '_pipeline']:
cls = importutils.try_import(ns)
if not cls:
msg = _('Pipe {0} could not be imported').format(ns)
LOG.warning(msg)
continue
pipeline.append(cls())
return common.Pipeline(pipeline)
class Driver(base.DriverBase):
"""Meta-driver for injecting pipelines in front of controllers.
:param conf: Configuration from which to load pipeline settings
:param storage: Storage driver that will service requests as the
last step in the pipeline
"""
def __init__(self, conf, storage):
self._conf = conf
self._storage = storage
@decorators.lazy_property(write=False)
def queue_controller(self):
pipes = _get_storage_pipeline('queue', self._conf)
pipes.append(self._storage.queue_controller)
return pipes
@decorators.lazy_property(write=False)
def message_controller(self):
pipes = _get_storage_pipeline('message', self._conf)
pipes.append(self._storage.message_controller)
return pipes
@decorators.lazy_property(write=False)
def claim_controller(self):
pipes = _get_storage_pipeline('claim', self._conf)
pipes.append(self._storage.claim_controller)
return pipes
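
As a usage sketch, the meta-driver wraps a concrete storage driver
the same way the bootstrap now does; this assumes, as the bootstrap
does via invoke_on_load, that the driver takes no constructor args:

    from oslo.config import cfg

    from marconi.queues.storage import pipeline
    from marconi.queues.storage import sqlite

    # Stages are read from the [storage] group of the given config;
    # with no stages configured, each pipeline holds only the
    # wrapped driver's controller.
    storage = pipeline.Driver(cfg.CONF, sqlite.Driver())

    # A common.Pipeline whose final stage is the sqlite queue
    # controller.
    queues = storage.queue_controller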

View File

@@ -118,13 +118,13 @@ class Driver(storage.DriverBase):
raise
@property
def _queue_controller(self):
def queue_controller(self):
return controllers.QueueController(self)
@property
def _message_controller(self):
def message_controller(self):
return controllers.MessageController(self)
@property
def _claim_controller(self):
def claim_controller(self):
return controllers.ClaimController(self)

View File

@@ -18,15 +18,15 @@ from marconi.queues import storage
class Driver(storage.DriverBase):
@property
def _queue_controller(self):
def queue_controller(self):
return QueueController(self)
@property
def _message_controller(self):
def message_controller(self):
return MessageController(self)
@property
def _claim_controller(self):
def claim_controller(self):
return None

View File

@@ -242,7 +242,7 @@ class ClaimsMongoDBTests(ClaimsBaseTest):
self.cfg = cfg.CONF['queues:drivers:storage:mongodb']
def tearDown(self):
storage = self.boot.storage
storage = self.boot.storage._storage
connection = storage.connection
connection.drop_database(storage.queues_database)

View File

@@ -319,7 +319,7 @@ class QueueLifecycleMongoDBTests(QueueLifecycleBaseTest):
self.cfg = cfg.CONF['queues:drivers:storage:mongodb']
def tearDown(self):
storage = self.boot.storage
storage = self.boot.storage._storage
connection = storage.connection
connection.drop_database(storage.queues_database)

View File

@@ -17,6 +17,7 @@ from oslo.config import cfg
from marconi.common import exceptions
import marconi.queues
from marconi.queues.storage import pipeline
from marconi.queues.storage import sqlite
from marconi.queues.transport import wsgi
from marconi.tests import base
@@ -36,7 +37,8 @@ class TestBootstrap(base.TestBase):
def test_storage_sqlite(self):
conf_file = 'etc/wsgi_sqlite.conf'
bootstrap = marconi.Bootstrap(conf_file)
self.assertIsInstance(bootstrap.storage, sqlite.Driver)
self.assertIsInstance(bootstrap.storage, pipeline.Driver)
self.assertIsInstance(bootstrap.storage._storage, sqlite.Driver)
def test_transport_invalid(self):
conf_file = 'etc/drivers_transport_invalid.conf'