ensure unique pipeline names

for notification agent coordination, we requeue items based on the
pipeline name. to ensure we have correct queues for pipeline, we
should make sure pipeline names are unique.

Change-Id: I1b3e7cf1206fd11ab7fde15233cdd19416322acc
Implements: blueprint notification-coordination
This commit is contained in:
gordon chung 2014-12-04 16:23:23 -05:00
parent 8d3d84053f
commit c290ed8f37
5 changed files with 46 additions and 8 deletions

View File

@ -512,14 +512,26 @@ class PipelineManager(object):
for source in sources: for source in sources:
source.check_sinks(sinks) source.check_sinks(sinks)
for target in source.sinks: for target in source.sinks:
self.pipelines.append(Pipeline(source, pipe = Pipeline(source, sinks[target])
sinks[target])) if pipe.name in [p.name for p in self.pipelines]:
raise PipelineException(
"Duplicate pipeline name: %s. Ensure pipeline"
" names are unique. (name is the source and sink"
" names combined)" % pipe.name, cfg)
else:
self.pipelines.append(pipe)
else: else:
LOG.warning(_('detected deprecated pipeline config format')) LOG.warning(_('detected deprecated pipeline config format'))
for pipedef in cfg: for pipedef in cfg:
source = Source(pipedef) source = Source(pipedef)
sink = Sink(pipedef, transformer_manager) sink = Sink(pipedef, transformer_manager)
self.pipelines.append(Pipeline(source, sink)) pipe = Pipeline(source, sink)
if pipe.name in [p.name for p in self.pipelines]:
raise PipelineException(
"Duplicate pipeline name: %s. Ensure pipeline"
" names are unique" % pipe.name, cfg)
else:
self.pipelines.append(pipe)
def publisher(self, context): def publisher(self, context):
"""Build a new Publisher for these manager pipelines. """Build a new Publisher for these manager pipelines.

View File

@ -317,7 +317,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
def test_setup_polling_tasks_multiple_interval(self): def test_setup_polling_tasks_multiple_interval(self):
self.pipeline_cfg.append({ self.pipeline_cfg.append({
'name': "test_pipeline", 'name': "test_pipeline_1",
'interval': 10, 'interval': 10,
'counters': ['test'], 'counters': ['test'],
'resources': ['test://'] if self.source_resources else [], 'resources': ['test://'] if self.source_resources else [],
@ -346,7 +346,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
def test_setup_polling_task_same_interval(self): def test_setup_polling_task_same_interval(self):
self.pipeline_cfg.append({ self.pipeline_cfg.append({
'name': "test_pipeline", 'name': "test_pipeline_1",
'interval': 60, 'interval': 60,
'counters': ['testanother'], 'counters': ['testanother'],
'resources': ['testanother://'] if self.source_resources else [], 'resources': ['testanother://'] if self.source_resources else [],
@ -356,14 +356,14 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.setup_pipeline() self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks() polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks)) self.assertEqual(1, len(polling_tasks))
pollsters = polling_tasks.get(60).pollster_matches['test_pipeline'] pollsters = polling_tasks.get(60).pollster_matches
self.assertEqual(2, len(pollsters)) self.assertEqual(2, len(pollsters))
per_task_resources = polling_tasks[60].resources per_task_resources = polling_tasks[60].resources
self.assertEqual(2, len(per_task_resources)) self.assertEqual(2, len(per_task_resources))
key = 'test_pipeline-test' key = 'test_pipeline-test'
self.assertEqual(set(self.pipeline_cfg[0]['resources']), self.assertEqual(set(self.pipeline_cfg[0]['resources']),
set(per_task_resources[key].get({}))) set(per_task_resources[key].get({})))
key = 'test_pipeline-testanother' key = 'test_pipeline_1-testanother'
self.assertEqual(set(self.pipeline_cfg[1]['resources']), self.assertEqual(set(self.pipeline_cfg[1]['resources']),
set(per_task_resources[key].get({}))) set(per_task_resources[key].get({})))
@ -407,7 +407,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
def test_manager_exception_persistency(self): def test_manager_exception_persistency(self):
self.pipeline_cfg.append({ self.pipeline_cfg.append({
'name': "test_pipeline", 'name': "test_pipeline_1",
'interval': 60, 'interval': 60,
'counters': ['testanother'], 'counters': ['testanother'],
'transformers': [], 'transformers': [],

View File

@ -169,6 +169,10 @@ class BasePipelineTestCase(base.BaseTestCase):
def _break_pipeline_cfg(self): def _break_pipeline_cfg(self):
"""Break the pipeline config with a malformed element.""" """Break the pipeline config with a malformed element."""
@abc.abstractmethod
def _dup_pipeline_name_cfg(self):
"""Break the pipeline config with duplicate pipeline name."""
@abc.abstractmethod @abc.abstractmethod
def _set_pipeline_cfg(self, field, value): def _set_pipeline_cfg(self, field, value):
"""Set a field to a value in the pipeline config.""" """Set a field to a value in the pipeline config."""
@ -1783,3 +1787,7 @@ class BasePipelineTestCase(base.BaseTestCase):
pipe.flush(None) pipe.flush(None)
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(0, len(publisher.samples)) self.assertEqual(0, len(publisher.samples))
def test_unique_pipeline_names(self):
self._dup_pipeline_name_cfg()
self._exception_create_pipelinemanager()

View File

@ -74,6 +74,15 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
'publishers': ['except'], 'publishers': ['except'],
}) })
def _dup_pipeline_name_cfg(self):
self.pipeline_cfg['sources'].append({
'name': 'test_source',
'interval': 5,
'counters': ['b'],
'resources': [],
'sinks': ['test_sink']
})
def _set_pipeline_cfg(self, field, value): def _set_pipeline_cfg(self, field, value):
if field in self.pipeline_cfg['sources'][0]: if field in self.pipeline_cfg['sources'][0]:
self.pipeline_cfg['sources'][0][field] = value self.pipeline_cfg['sources'][0][field] = value

View File

@ -64,6 +64,15 @@ class TestDeprecatedPipeline(pipeline_base.BasePipelineTestCase):
'publishers': ['except'], 'publishers': ['except'],
}) })
def _dup_pipeline_name_cfg(self):
self.pipeline_cfg.append({
'name': 'test_pipeline',
'interval': 5,
'counters': ['b'],
'transformers': [],
'publishers': ['except'],
})
def _set_pipeline_cfg(self, field, value): def _set_pipeline_cfg(self, field, value):
self.pipeline_cfg[0][field] = value self.pipeline_cfg[0][field] = value