Added resources definition in the pipeline

The 'resources' in the pipeline definition is a list of endpoints for
the pollsters to get the data from. It's optional and it's up to the
specific pollsters to decide how to use it.

The new pipeline definition could be something like the following:

    meters:
        - "*"
    resources:
        - file:///foo/bar/endpoints

Implements: blueprint support-resources-pipeline-item
Change-Id: Ibd7426f8f453c176c99d3786106deb35bbac8d4f
This commit is contained in:
Lianhao Lu 2013-11-26 14:15:01 +08:00
parent ff20b3986f
commit 87a914f6b7
3 changed files with 29 additions and 0 deletions

View File

@ -129,6 +129,10 @@ class Pipeline(object):
self.transformers = self._setup_transformers(cfg, transformer_manager)
self.resources = cfg.get('resources') or []
if not isinstance(self.resources, list):
raise PipelineException("Resources should be a list", cfg)
def __str__(self):
return self.name
@ -304,6 +308,7 @@ class PipelineManager(object):
"name": pipeline_name
"interval": interval_time
"meters" : ["meter_1", "meter_2"],
"resources": ["resource_uri1", "resource_uri2"],
"tranformers":[
{"name": "Transformer_1",
"parameters": {"p1": "value"}},
@ -329,6 +334,10 @@ class PipelineManager(object):
"excluded meter names", wildcard and "excluded meter names", or
only wildcard.
The resources is list of URI indicating the resources from where
the meters should be polled. It's optional and it's up to the
specific pollster to decide how to use it.
Transformer's name is plugin name in setup.py.
Publisher's name is plugin name in setup.py

View File

@ -199,6 +199,11 @@ class TestPipeline(test.BaseTestCase):
del self.pipeline_cfg[0]['publishers']
self._exception_create_pipelinemanager()
def test_invalid_resources(self):
invalid_resource = {'invalid': 1}
self.pipeline_cfg[0]['resources'] = invalid_resource
self._exception_create_pipelinemanager()
def test_check_counters_include_exclude_same(self):
counter_cfg = ['a', '!a']
self.pipeline_cfg[0]['counters'] = counter_cfg
@ -1048,3 +1053,17 @@ class TestPipeline(test.BaseTestCase):
self.assertEqual(len(publisher.samples), 0)
pipe.flush(None)
self.assertEqual(len(publisher.samples), 0)
def test_resources(self):
resources = ['test1://', 'test2://']
self.pipeline_cfg[0]['resources'] = resources
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
self.assertEqual(pipeline_manager.pipelines[0].resources,
resources)
def test_no_resources(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
self.assertEqual(len(pipeline_manager.pipelines[0].resources),
0)

View File

@ -4,6 +4,7 @@
interval: 600
meters:
- "*"
resources:
transformers:
publishers:
- rpc://