From c290ed8f371d9aaeeef54abb3bf503f8f0526284 Mon Sep 17 00:00:00 2001 From: gordon chung Date: Thu, 4 Dec 2014 16:23:23 -0500 Subject: [PATCH] ensure unique pipeline names for notification agent coordination, we requeue items based on the pipeline name. to ensure we have correct queues for pipeline, we should make sure pipeline names are unique. Change-Id: I1b3e7cf1206fd11ab7fde15233cdd19416322acc Implements: blueprint notification-coordination --- ceilometer/pipeline.py | 18 +++++++++++++++--- ceilometer/tests/agent/agentbase.py | 10 +++++----- ceilometer/tests/pipeline_base.py | 8 ++++++++ ceilometer/tests/test_decoupled_pipeline.py | 9 +++++++++ ceilometer/tests/test_deprecated_pipeline.py | 9 +++++++++ 5 files changed, 46 insertions(+), 8 deletions(-) diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 8181726b3..32acc4c30 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -512,14 +512,26 @@ class PipelineManager(object): for source in sources: source.check_sinks(sinks) for target in source.sinks: - self.pipelines.append(Pipeline(source, - sinks[target])) + pipe = Pipeline(source, sinks[target]) + if pipe.name in [p.name for p in self.pipelines]: + raise PipelineException( + "Duplicate pipeline name: %s. Ensure pipeline" + " names are unique. (name is the source and sink" + " names combined)" % pipe.name, cfg) + else: + self.pipelines.append(pipe) else: LOG.warning(_('detected deprecated pipeline config format')) for pipedef in cfg: source = Source(pipedef) sink = Sink(pipedef, transformer_manager) - self.pipelines.append(Pipeline(source, sink)) + pipe = Pipeline(source, sink) + if pipe.name in [p.name for p in self.pipelines]: + raise PipelineException( + "Duplicate pipeline name: %s. Ensure pipeline" + " names are unique" % pipe.name, cfg) + else: + self.pipelines.append(pipe) def publisher(self, context): """Build a new Publisher for these manager pipelines. diff --git a/ceilometer/tests/agent/agentbase.py b/ceilometer/tests/agent/agentbase.py index f589c43bd..57e5eba12 100644 --- a/ceilometer/tests/agent/agentbase.py +++ b/ceilometer/tests/agent/agentbase.py @@ -317,7 +317,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): def test_setup_polling_tasks_multiple_interval(self): self.pipeline_cfg.append({ - 'name': "test_pipeline", + 'name': "test_pipeline_1", 'interval': 10, 'counters': ['test'], 'resources': ['test://'] if self.source_resources else [], @@ -346,7 +346,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): def test_setup_polling_task_same_interval(self): self.pipeline_cfg.append({ - 'name': "test_pipeline", + 'name': "test_pipeline_1", 'interval': 60, 'counters': ['testanother'], 'resources': ['testanother://'] if self.source_resources else [], @@ -356,14 +356,14 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(1, len(polling_tasks)) - pollsters = polling_tasks.get(60).pollster_matches['test_pipeline'] + pollsters = polling_tasks.get(60).pollster_matches self.assertEqual(2, len(pollsters)) per_task_resources = polling_tasks[60].resources self.assertEqual(2, len(per_task_resources)) key = 'test_pipeline-test' self.assertEqual(set(self.pipeline_cfg[0]['resources']), set(per_task_resources[key].get({}))) - key = 'test_pipeline-testanother' + key = 'test_pipeline_1-testanother' self.assertEqual(set(self.pipeline_cfg[1]['resources']), set(per_task_resources[key].get({}))) @@ -407,7 +407,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): def test_manager_exception_persistency(self): self.pipeline_cfg.append({ - 'name': "test_pipeline", + 'name': "test_pipeline_1", 'interval': 60, 'counters': ['testanother'], 'transformers': [], diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py index c0ada4c7d..9538bfc02 100644 --- a/ceilometer/tests/pipeline_base.py +++ b/ceilometer/tests/pipeline_base.py @@ -169,6 +169,10 @@ class BasePipelineTestCase(base.BaseTestCase): def _break_pipeline_cfg(self): """Break the pipeline config with a malformed element.""" + @abc.abstractmethod + def _dup_pipeline_name_cfg(self): + """Break the pipeline config with duplicate pipeline name.""" + @abc.abstractmethod def _set_pipeline_cfg(self, field, value): """Set a field to a value in the pipeline config.""" @@ -1783,3 +1787,7 @@ class BasePipelineTestCase(base.BaseTestCase): pipe.flush(None) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(0, len(publisher.samples)) + + def test_unique_pipeline_names(self): + self._dup_pipeline_name_cfg() + self._exception_create_pipelinemanager() diff --git a/ceilometer/tests/test_decoupled_pipeline.py b/ceilometer/tests/test_decoupled_pipeline.py index af16c6147..53e781e13 100644 --- a/ceilometer/tests/test_decoupled_pipeline.py +++ b/ceilometer/tests/test_decoupled_pipeline.py @@ -74,6 +74,15 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): 'publishers': ['except'], }) + def _dup_pipeline_name_cfg(self): + self.pipeline_cfg['sources'].append({ + 'name': 'test_source', + 'interval': 5, + 'counters': ['b'], + 'resources': [], + 'sinks': ['test_sink'] + }) + def _set_pipeline_cfg(self, field, value): if field in self.pipeline_cfg['sources'][0]: self.pipeline_cfg['sources'][0][field] = value diff --git a/ceilometer/tests/test_deprecated_pipeline.py b/ceilometer/tests/test_deprecated_pipeline.py index 3f4d093ee..595015d97 100644 --- a/ceilometer/tests/test_deprecated_pipeline.py +++ b/ceilometer/tests/test_deprecated_pipeline.py @@ -64,6 +64,15 @@ class TestDeprecatedPipeline(pipeline_base.BasePipelineTestCase): 'publishers': ['except'], }) + def _dup_pipeline_name_cfg(self): + self.pipeline_cfg.append({ + 'name': 'test_pipeline', + 'interval': 5, + 'counters': ['b'], + 'transformers': [], + 'publishers': ['except'], + }) + def _set_pipeline_cfg(self, field, value): self.pipeline_cfg[0][field] = value