diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 8181726b3..32acc4c30 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -512,14 +512,26 @@ class PipelineManager(object): for source in sources: source.check_sinks(sinks) for target in source.sinks: - self.pipelines.append(Pipeline(source, - sinks[target])) + pipe = Pipeline(source, sinks[target]) + if pipe.name in [p.name for p in self.pipelines]: + raise PipelineException( + "Duplicate pipeline name: %s. Ensure pipeline" + " names are unique. (name is the source and sink" + " names combined)" % pipe.name, cfg) + else: + self.pipelines.append(pipe) else: LOG.warning(_('detected deprecated pipeline config format')) for pipedef in cfg: source = Source(pipedef) sink = Sink(pipedef, transformer_manager) - self.pipelines.append(Pipeline(source, sink)) + pipe = Pipeline(source, sink) + if pipe.name in [p.name for p in self.pipelines]: + raise PipelineException( + "Duplicate pipeline name: %s. Ensure pipeline" + " names are unique" % pipe.name, cfg) + else: + self.pipelines.append(pipe) def publisher(self, context): """Build a new Publisher for these manager pipelines. diff --git a/ceilometer/tests/agent/agentbase.py b/ceilometer/tests/agent/agentbase.py index f589c43bd..57e5eba12 100644 --- a/ceilometer/tests/agent/agentbase.py +++ b/ceilometer/tests/agent/agentbase.py @@ -317,7 +317,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): def test_setup_polling_tasks_multiple_interval(self): self.pipeline_cfg.append({ - 'name': "test_pipeline", + 'name': "test_pipeline_1", 'interval': 10, 'counters': ['test'], 'resources': ['test://'] if self.source_resources else [], @@ -346,7 +346,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): def test_setup_polling_task_same_interval(self): self.pipeline_cfg.append({ - 'name': "test_pipeline", + 'name': "test_pipeline_1", 'interval': 60, 'counters': ['testanother'], 'resources': ['testanother://'] if self.source_resources else [], @@ -356,14 +356,14 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(1, len(polling_tasks)) - pollsters = polling_tasks.get(60).pollster_matches['test_pipeline'] + pollsters = polling_tasks.get(60).pollster_matches self.assertEqual(2, len(pollsters)) per_task_resources = polling_tasks[60].resources self.assertEqual(2, len(per_task_resources)) key = 'test_pipeline-test' self.assertEqual(set(self.pipeline_cfg[0]['resources']), set(per_task_resources[key].get({}))) - key = 'test_pipeline-testanother' + key = 'test_pipeline_1-testanother' self.assertEqual(set(self.pipeline_cfg[1]['resources']), set(per_task_resources[key].get({}))) @@ -407,7 +407,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): def test_manager_exception_persistency(self): self.pipeline_cfg.append({ - 'name': "test_pipeline", + 'name': "test_pipeline_1", 'interval': 60, 'counters': ['testanother'], 'transformers': [], diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py index c0ada4c7d..9538bfc02 100644 --- a/ceilometer/tests/pipeline_base.py +++ b/ceilometer/tests/pipeline_base.py @@ -169,6 +169,10 @@ class BasePipelineTestCase(base.BaseTestCase): def _break_pipeline_cfg(self): """Break the pipeline config with a malformed element.""" + @abc.abstractmethod + def _dup_pipeline_name_cfg(self): + """Break the pipeline config with duplicate pipeline name.""" + @abc.abstractmethod def _set_pipeline_cfg(self, field, value): """Set a field to a value in the pipeline config.""" @@ -1783,3 +1787,7 @@ class BasePipelineTestCase(base.BaseTestCase): pipe.flush(None) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(0, len(publisher.samples)) + + def test_unique_pipeline_names(self): + self._dup_pipeline_name_cfg() + self._exception_create_pipelinemanager() diff --git a/ceilometer/tests/test_decoupled_pipeline.py b/ceilometer/tests/test_decoupled_pipeline.py index af16c6147..53e781e13 100644 --- a/ceilometer/tests/test_decoupled_pipeline.py +++ b/ceilometer/tests/test_decoupled_pipeline.py @@ -74,6 +74,15 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): 'publishers': ['except'], }) + def _dup_pipeline_name_cfg(self): + self.pipeline_cfg['sources'].append({ + 'name': 'test_source', + 'interval': 5, + 'counters': ['b'], + 'resources': [], + 'sinks': ['test_sink'] + }) + def _set_pipeline_cfg(self, field, value): if field in self.pipeline_cfg['sources'][0]: self.pipeline_cfg['sources'][0][field] = value diff --git a/ceilometer/tests/test_deprecated_pipeline.py b/ceilometer/tests/test_deprecated_pipeline.py index 3f4d093ee..595015d97 100644 --- a/ceilometer/tests/test_deprecated_pipeline.py +++ b/ceilometer/tests/test_deprecated_pipeline.py @@ -64,6 +64,15 @@ class TestDeprecatedPipeline(pipeline_base.BasePipelineTestCase): 'publishers': ['except'], }) + def _dup_pipeline_name_cfg(self): + self.pipeline_cfg.append({ + 'name': 'test_pipeline', + 'interval': 5, + 'counters': ['b'], + 'transformers': [], + 'publishers': ['except'], + }) + def _set_pipeline_cfg(self, field, value): self.pipeline_cfg[0][field] = value