Return a consumer function instead of consuming

Former implementation was called consume_for and used to consume the
whole pipeline. This implementation returns a callable function instead
which makes it possible for wrapped methods to have a `method` keyword
and to control / reuse the consumer.

Implements blueprint storage-pipeline

Change-Id: I8d7ad43028b5615b24f06ca1e116e35e5ab6a145
This commit is contained in:
Flavio Percoco 2013-10-14 01:04:45 +02:00
parent 1733a05d8e
commit 572a6296e0
3 changed files with 63 additions and 52 deletions

View File

@ -30,8 +30,6 @@ At least one of the stages has to implement the calling method. If none of
them do, an AttributeError exception will be raised. them do, an AttributeError exception will be raised.
""" """
import functools
import six import six
from marconi.common import decorators from marconi.common import decorators
@ -50,23 +48,32 @@ class Pipeline(object):
@decorators.cached_getattr @decorators.cached_getattr
def __getattr__(self, name): def __getattr__(self, name):
return functools.partial(self.consume_for, name) return self.consumer_for(name)
def consume_for(self, method, *args, **kwargs): def consumer_for(self, method):
"""Consumes the pipeline for `method`. """Creates a closure for `method`
This method walks through the pipeline and calls This method creates a closure to consume the pipeline
for `method`.
:params method: The method name to call on each stage
:type method: `six.text_type`
:returns: A callable to consume the pipeline.
"""
def consumer(*args, **kwargs):
"""Consumes the pipeline for `method`
This function walks through the pipeline and calls
`method` for each of the items in the pipeline. A `method` for each of the items in the pipeline. A
warning will be logged for each pipe not implementing warning will be logged for each stage not implementing
`method` and an Attribute error will be raised if `method` and an Attribute error will be raised if
none of the stages do. none of the stages do.
:params method: The method name to call on each pipe
:type method: `six.text_type`
:param args: Positional arguments to pass to the call. :param args: Positional arguments to pass to the call.
:param kwargs: Keyword arguments to pass to the call. :param kwargs: Keyword arguments to pass to the call.
:returns: Anything returned by the called methods.
:raises: AttributeError if none of the stages implement `method` :raises: AttributeError if none of the stages implement `method`
""" """
# NOTE(flaper87): Used as a way to verify # NOTE(flaper87): Used as a way to verify
@ -79,14 +86,16 @@ class Pipeline(object):
try: try:
target = getattr(stage, method) target = getattr(stage, method)
except AttributeError: except AttributeError:
msg = _(u'Stage {0} does not implement {1}') sstage = six.text_type(stage)
LOG.warning(msg.format(six.text_type(stage), method)) msg = _(u"Stage {0} does not implement {1}").format(sstage,
method)
LOG.warning(msg)
continue continue
result = target(*args, **kwargs) result = target(*args, **kwargs)
# NOTE(flaper87): Will keep going forward # NOTE(flaper87): Will keep going forward
# through the pipeline unless the call returns # through the stageline unless the call returns
# something. # something.
if result is not None: if result is not None:
return result return result
@ -96,3 +105,5 @@ class Pipeline(object):
'the registered stages').format(method) 'the registered stages').format(method)
LOG.error(msg) LOG.error(msg)
raise AttributeError(msg) raise AttributeError(msg)
return consumer

View File

@ -68,11 +68,12 @@ def _get_storage_pipeline(resource_name, conf):
pipeline = [] pipeline = []
for ns in storage_conf[resource_name + '_pipeline']: for ns in storage_conf[resource_name + '_pipeline']:
try: try:
mgr = driver.DriverManager('marconi.queues.storage.pipes', mgr = driver.DriverManager('marconi.queues.storage.stages',
ns, invoke_on_load=True) ns, invoke_on_load=True)
pipeline.append(mgr.driver) pipeline.append(mgr.driver)
except RuntimeError as exc: except RuntimeError as exc:
msg = _('Pipe {0} could not be imported: {1}').format(ns, str(exc)) msg = _('Stage {0} could not be imported: {1}').format(ns,
str(exc))
LOG.warning(msg) LOG.warning(msg)
continue continue
@ -95,18 +96,18 @@ class Driver(base.DriverBase):
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)
def queue_controller(self): def queue_controller(self):
pipes = _get_storage_pipeline('queue', self.conf) stages = _get_storage_pipeline('queue', self.conf)
pipes.append(self._storage.queue_controller) stages.append(self._storage.queue_controller)
return pipes return stages
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)
def message_controller(self): def message_controller(self):
pipes = _get_storage_pipeline('message', self.conf) stages = _get_storage_pipeline('message', self.conf)
pipes.append(self._storage.message_controller) stages.append(self._storage.message_controller)
return pipes return stages
@decorators.lazy_property(write=False) @decorators.lazy_property(write=False)
def claim_controller(self): def claim_controller(self):
pipes = _get_storage_pipeline('claim', self.conf) stages = _get_storage_pipeline('claim', self.conf)
pipes.append(self._storage.claim_controller) stages.append(self._storage.claim_controller)
return pipes return stages

View File

@ -63,9 +63,8 @@ class TestPipeLine(base.TestBase):
SecondClass()]) SecondClass()])
def test_attribute_error(self): def test_attribute_error(self):
self.assertRaises(AttributeError, consumer = self.pipeline.consumer_for('does_not_exist')
self.pipeline.consume_for, self.assertRaises(AttributeError, consumer)
'does_not_exist')
def test_with_args(self): def test_with_args(self):
name = 'James' name = 'James'