diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 32acc4c30..7eb822bfb 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -86,10 +86,9 @@ class PublishContext(object): self.pipelines.update(pipelines) def __enter__(self): - def p(samples): + def p(data): for p in self.pipelines: - p.publish_samples(self.context, - samples) + p.publish_data(self.context, data) return p def __exit__(self, exc_type, exc_value, traceback): @@ -98,6 +97,54 @@ class PublishContext(object): class Source(object): + """Represents a source of samples or events.""" + + def __init__(self, cfg): + self.cfg = cfg + + try: + self.name = cfg['name'] + except KeyError as err: + raise PipelineException( + "Required field %s not specified" % err.args[0], cfg) + + def __str__(self): + return self.name + + def check_sinks(self, sinks): + if not self.sinks: + raise PipelineException( + "No sink defined in source %s" % self, + self.cfg) + for sink in self.sinks: + if sink not in sinks: + raise PipelineException( + "Dangling sink %s from source %s" % (sink, self), + self.cfg) + + def check_source_filtering(self, data, d_type): + """Source data rules checking + + - At least one meaningful datapoint exist + - Included type and excluded type can't co-exist on the same pipeline + - Included type meter and wildcard can't co-exist at same pipeline + """ + if not data: + raise PipelineException('No %s specified' % d_type, self.cfg) + + if ([x for x in data if x[0] not in '!*'] and + [x for x in data if x[0] == '!']): + raise PipelineException( + 'Both included and excluded %s specified' % d_type, + cfg) + + if '*' in data and [x for x in data if x[0] not in '!*']: + raise PipelineException( + 'Included %s specified with wildcard' % d_type, + self.cfg) + + +class SampleSource(Source): """Represents a source of samples. In effect it is a set of pollsters and/or notification handlers emitting @@ -107,10 +154,8 @@ class Source(object): """ def __init__(self, cfg): - self.cfg = cfg - + super(SampleSource, self).__init__(cfg) try: - self.name = cfg['name'] try: self.interval = int(cfg['interval']) except ValueError: @@ -131,34 +176,7 @@ class Source(object): self.discovery = cfg.get('discovery') or [] if not isinstance(self.discovery, list): raise PipelineException("Discovery should be a list", cfg) - self._check_meters() - - def __str__(self): - return self.name - - def _check_meters(self): - """Meter rules checking - - At least one meaningful meter exist - Included type and excluded type meter can't co-exist at - the same pipeline - Included type meter and wildcard can't co-exist at same pipeline - - """ - meters = self.meters - if not meters: - raise PipelineException("No meter specified", self.cfg) - - if ([x for x in meters if x[0] not in '!*'] and - [x for x in meters if x[0] == '!']): - raise PipelineException( - "Both included and excluded meters specified", - cfg) - - if '*' in meters and [x for x in meters if x[0] not in '!*']: - raise PipelineException( - "Included meters specified with wildcard", - self.cfg) + self.check_source_filtering(self.meters, 'meters') # (yjiang5) To support meters like instance:m1.tiny, # which include variable part at the end starting with ':'. @@ -192,43 +210,30 @@ class Source(object): return default - def check_sinks(self, sinks): - if not self.sinks: - raise PipelineException( - "No sink defined in source %s" % self, - self.cfg) - for sink in self.sinks: - if sink not in sinks: - raise PipelineException( - "Dangling sink %s from source %s" % (sink, self), - self.cfg) - class Sink(object): - """Represents a sink for the transformation and publication of samples. - - Samples are emitted from a related source. + """Represents a sink for the transformation and publication of data. Each sink config is concerned *only* with the transformation rules - and publication conduits for samples. + and publication conduits for data. In effect, a sink describes a chain of handlers. The chain starts with zero or more transformers and ends with one or more publishers. - The first transformer in the chain is passed samples from the + The first transformer in the chain is passed data from the corresponding source, takes some action such as deriving rate of change, performing unit conversion, or aggregating, before passing - the modified sample to next step. + the modified data to next step. The subsequent transformers, if any, handle the data similarly. At the end of the chain, publishers publish the data. The exact publishing method depends on publisher type, for example, pushing into data storage via the message bus providing guaranteed delivery, - or for loss-tolerant samples UDP may be used. + or for loss-tolerant data UDP may be used. If no transformers are included in the chain, the publishers are - passed samples directly from the sink which are published unchanged. + passed data directly from the sink which are published unchanged. """ def __init__(self, cfg, transformer_manager): @@ -280,6 +285,9 @@ class Sink(object): return transformers + +class SampleSink(Sink): + def _transform_sample(self, start, ctxt, sample): try: for transformer in self.transformers[start:]: @@ -364,6 +372,17 @@ class Pipeline(object): return (self.source.name if self.source.name == self.sink.name else '%s:%s' % (self.source.name, self.sink.name)) + def flush(self, ctxt): + self.sink.flush(ctxt) + + @property + def publishers(self): + return self.sink.publishers + + +class SamplePipeline(Pipeline): + """Represents a pipeline for Samples.""" + def get_interval(self): return self.source.interval @@ -378,19 +397,16 @@ class Pipeline(object): def support_meter(self, meter_name): return self.source.support_meter(meter_name) - @property - def publishers(self): - return self.sink.publishers - - def publish_sample(self, ctxt, sample): - self.publish_samples(ctxt, [sample]) - - def publish_samples(self, ctxt, samples): + def publish_data(self, ctxt, samples): + if not isinstance(samples, list): + samples = [samples] supported = [s for s in samples if self.source.support_meter(s.name)] self.sink.publish_samples(ctxt, supported) - def flush(self, ctxt): - self.sink.flush(ctxt) + +SAMPLE_TYPE = {'pipeline': SamplePipeline, + 'source': SampleSource, + 'sink': SampleSink} class PipelineManager(object): @@ -402,7 +418,7 @@ class PipelineManager(object): """ - def __init__(self, cfg, transformer_manager): + def __init__(self, cfg, transformer_manager, p_type=SAMPLE_TYPE): """Setup the pipelines according to config. The configuration is supported in one of two forms: @@ -506,13 +522,13 @@ class PipelineManager(object): raise PipelineException("Both sources & sinks are required", cfg) LOG.info(_('detected decoupled pipeline config format')) - sources = [Source(s) for s in cfg.get('sources', [])] - sinks = dict((s['name'], Sink(s, transformer_manager)) + sources = [p_type['source'](s) for s in cfg.get('sources', [])] + sinks = dict((s['name'], p_type['sink'](s, transformer_manager)) for s in cfg.get('sinks', [])) for source in sources: source.check_sinks(sinks) for target in source.sinks: - pipe = Pipeline(source, sinks[target]) + pipe = p_type['pipeline'](source, sinks[target]) if pipe.name in [p.name for p in self.pipelines]: raise PipelineException( "Duplicate pipeline name: %s. Ensure pipeline" @@ -523,9 +539,9 @@ class PipelineManager(object): else: LOG.warning(_('detected deprecated pipeline config format')) for pipedef in cfg: - source = Source(pipedef) - sink = Sink(pipedef, transformer_manager) - pipe = Pipeline(source, sink) + source = p_type['source'](pipedef) + sink = p_type['sink'](pipedef, transformer_manager) + pipe = p_type['pipeline'](source, sink) if pipe.name in [p.name for p in self.pipelines]: raise PipelineException( "Duplicate pipeline name: %s. Ensure pipeline" diff --git a/ceilometer/tests/objectstore/test_swift_middleware.py b/ceilometer/tests/objectstore/test_swift_middleware.py index 82a550120..426f10c0e 100644 --- a/ceilometer/tests/objectstore/test_swift_middleware.py +++ b/ceilometer/tests/objectstore/test_swift_middleware.py @@ -72,7 +72,7 @@ class TestSwiftMiddleware(tests_base.BaseTestCase): self.pipeline_manager = pipeline_manager self.samples = [] - def publish_samples(self, ctxt, samples): + def publish_data(self, ctxt, samples): self.samples.extend(samples) def flush(self, context): diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py index 10dd7839b..864ac1b50 100644 --- a/ceilometer/tests/pipeline_base.py +++ b/ceilometer/tests/pipeline_base.py @@ -669,16 +669,16 @@ class BasePipelineTestCase(base.BaseTestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_sample(None, self.test_counter) + pipe.publish_data(None, self.test_counter) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(0, len(publisher.samples)) pipe.flush(None) self.assertEqual(0, len(publisher.samples)) - pipe.publish_sample(None, self.test_counter) + pipe.publish_data(None, self.test_counter) pipe.flush(None) self.assertEqual(0, len(publisher.samples)) for i in range(CACHE_SIZE - 2): - pipe.publish_sample(None, self.test_counter) + pipe.publish_data(None, self.test_counter) pipe.flush(None) self.assertEqual(CACHE_SIZE, len(publisher.samples)) self.assertEqual('a_update_new', getattr(publisher.samples[0], 'name')) @@ -741,7 +741,7 @@ class BasePipelineTestCase(base.BaseTestCase): pipe = pipeline_manager.pipelines[0] publisher = pipe.publishers[0] - pipe.publish_sample(None, self.test_counter) + pipe.publish_data(None, self.test_counter) self.assertEqual(0, len(publisher.samples)) pipe.flush(None) self.assertEqual(1, len(publisher.samples)) @@ -818,7 +818,7 @@ class BasePipelineTestCase(base.BaseTestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_samples(None, counters) + pipe.publish_data(None, counters) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(1, len(publisher.samples)) pipe.flush(None) @@ -872,7 +872,7 @@ class BasePipelineTestCase(base.BaseTestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_samples(None, counters) + pipe.publish_data(None, counters) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(2, len(publisher.samples)) core_temp = publisher.samples[0] @@ -962,7 +962,7 @@ class BasePipelineTestCase(base.BaseTestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_samples(None, counters) + pipe.publish_data(None, counters) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(2, len(publisher.samples)) pipe.flush(None) @@ -1047,7 +1047,7 @@ class BasePipelineTestCase(base.BaseTestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_samples(None, counters) + pipe.publish_data(None, counters) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(0, len(publisher.samples)) pipe.flush(None) @@ -1090,7 +1090,7 @@ class BasePipelineTestCase(base.BaseTestCase): ) counters.append(s) - pipe.publish_samples(None, counters) + pipe.publish_data(None, counters) publisher = pipe.publishers[0] self.assertEqual(2, len(publisher.samples)) pipe.flush(None) @@ -1219,7 +1219,7 @@ class BasePipelineTestCase(base.BaseTestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_samples(None, counters) + pipe.publish_data(None, counters) pipe.flush(None) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(expected_length, len(publisher.samples)) @@ -1255,7 +1255,7 @@ class BasePipelineTestCase(base.BaseTestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_samples(None, counters) + pipe.publish_data(None, counters) pipe.flush(None) publisher = pipeline_manager.pipelines[0].publishers[0] actual = sorted(s.volume for s in publisher.samples) @@ -1460,12 +1460,12 @@ class BasePipelineTestCase(base.BaseTestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_samples(None, [counters[0]]) + pipe.publish_data(None, [counters[0]]) pipe.flush(None) publisher = pipe.publishers[0] self.assertEqual(0, len(publisher.samples)) - pipe.publish_samples(None, [counters[1]]) + pipe.publish_data(None, [counters[1]]) pipe.flush(None) publisher = pipe.publishers[0] self.assertEqual(2, len(publisher.samples)) @@ -1498,7 +1498,7 @@ class BasePipelineTestCase(base.BaseTestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_samples(None, counters) + pipe.publish_data(None, counters) pipe.flush(None) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(0, len(publisher.samples)) @@ -1546,12 +1546,12 @@ class BasePipelineTestCase(base.BaseTestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_samples(None, [counters[0]]) + pipe.publish_data(None, [counters[0]]) pipe.flush(None) publisher = pipe.publishers[0] self.assertEqual(0, len(publisher.samples)) - pipe.publish_samples(None, [counters[1]]) + pipe.publish_data(None, [counters[1]]) pipe.flush(None) publisher = pipe.publishers[0] @@ -1635,7 +1635,7 @@ class BasePipelineTestCase(base.BaseTestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_samples(None, counters) + pipe.publish_data(None, counters) publisher = pipeline_manager.pipelines[0].publishers[0] expected_len = len(test_resources) * len(expected) self.assertEqual(0, len(publisher.samples)) @@ -1756,7 +1756,7 @@ class BasePipelineTestCase(base.BaseTestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_samples(None, [counter]) + pipe.publish_data(None, [counter]) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(0, len(publisher.samples)) pipe.flush(None) @@ -1767,7 +1767,7 @@ class BasePipelineTestCase(base.BaseTestCase): self.assertEqual(1, len(publisher.samples)) counter.volume = 2048.0 - pipe.publish_samples(None, [counter]) + pipe.publish_data(None, [counter]) pipe.flush(None) self.assertEqual(2, len(publisher.samples)) self.assertEqual(2050.0, publisher.samples[1].volume)