Merge "untie pipeline manager from samples"

This commit is contained in:
Jenkins 2015-01-14 15:14:11 +00:00 committed by Gerrit Code Review
commit 3053108288
3 changed files with 106 additions and 90 deletions

View File

@ -86,10 +86,9 @@ class PublishContext(object):
self.pipelines.update(pipelines)
def __enter__(self):
def p(samples):
def p(data):
for p in self.pipelines:
p.publish_samples(self.context,
samples)
p.publish_data(self.context, data)
return p
def __exit__(self, exc_type, exc_value, traceback):
@ -98,6 +97,54 @@ class PublishContext(object):
class Source(object):
"""Represents a source of samples or events."""
def __init__(self, cfg):
self.cfg = cfg
try:
self.name = cfg['name']
except KeyError as err:
raise PipelineException(
"Required field %s not specified" % err.args[0], cfg)
def __str__(self):
return self.name
def check_sinks(self, sinks):
if not self.sinks:
raise PipelineException(
"No sink defined in source %s" % self,
self.cfg)
for sink in self.sinks:
if sink not in sinks:
raise PipelineException(
"Dangling sink %s from source %s" % (sink, self),
self.cfg)
def check_source_filtering(self, data, d_type):
"""Source data rules checking
- At least one meaningful datapoint exist
- Included type and excluded type can't co-exist on the same pipeline
- Included type meter and wildcard can't co-exist at same pipeline
"""
if not data:
raise PipelineException('No %s specified' % d_type, self.cfg)
if ([x for x in data if x[0] not in '!*'] and
[x for x in data if x[0] == '!']):
raise PipelineException(
'Both included and excluded %s specified' % d_type,
cfg)
if '*' in data and [x for x in data if x[0] not in '!*']:
raise PipelineException(
'Included %s specified with wildcard' % d_type,
self.cfg)
class SampleSource(Source):
"""Represents a source of samples.
In effect it is a set of pollsters and/or notification handlers emitting
@ -107,10 +154,8 @@ class Source(object):
"""
def __init__(self, cfg):
self.cfg = cfg
super(SampleSource, self).__init__(cfg)
try:
self.name = cfg['name']
try:
self.interval = int(cfg['interval'])
except ValueError:
@ -131,34 +176,7 @@ class Source(object):
self.discovery = cfg.get('discovery') or []
if not isinstance(self.discovery, list):
raise PipelineException("Discovery should be a list", cfg)
self._check_meters()
def __str__(self):
return self.name
def _check_meters(self):
"""Meter rules checking
At least one meaningful meter exist
Included type and excluded type meter can't co-exist at
the same pipeline
Included type meter and wildcard can't co-exist at same pipeline
"""
meters = self.meters
if not meters:
raise PipelineException("No meter specified", self.cfg)
if ([x for x in meters if x[0] not in '!*'] and
[x for x in meters if x[0] == '!']):
raise PipelineException(
"Both included and excluded meters specified",
cfg)
if '*' in meters and [x for x in meters if x[0] not in '!*']:
raise PipelineException(
"Included meters specified with wildcard",
self.cfg)
self.check_source_filtering(self.meters, 'meters')
# (yjiang5) To support meters like instance:m1.tiny,
# which include variable part at the end starting with ':'.
@ -192,43 +210,30 @@ class Source(object):
return default
def check_sinks(self, sinks):
if not self.sinks:
raise PipelineException(
"No sink defined in source %s" % self,
self.cfg)
for sink in self.sinks:
if sink not in sinks:
raise PipelineException(
"Dangling sink %s from source %s" % (sink, self),
self.cfg)
class Sink(object):
"""Represents a sink for the transformation and publication of samples.
Samples are emitted from a related source.
"""Represents a sink for the transformation and publication of data.
Each sink config is concerned *only* with the transformation rules
and publication conduits for samples.
and publication conduits for data.
In effect, a sink describes a chain of handlers. The chain starts
with zero or more transformers and ends with one or more publishers.
The first transformer in the chain is passed samples from the
The first transformer in the chain is passed data from the
corresponding source, takes some action such as deriving rate of
change, performing unit conversion, or aggregating, before passing
the modified sample to next step.
the modified data to next step.
The subsequent transformers, if any, handle the data similarly.
At the end of the chain, publishers publish the data. The exact
publishing method depends on publisher type, for example, pushing
into data storage via the message bus providing guaranteed delivery,
or for loss-tolerant samples UDP may be used.
or for loss-tolerant data UDP may be used.
If no transformers are included in the chain, the publishers are
passed samples directly from the sink which are published unchanged.
passed data directly from the sink which are published unchanged.
"""
def __init__(self, cfg, transformer_manager):
@ -280,6 +285,9 @@ class Sink(object):
return transformers
class SampleSink(Sink):
def _transform_sample(self, start, ctxt, sample):
try:
for transformer in self.transformers[start:]:
@ -364,6 +372,17 @@ class Pipeline(object):
return (self.source.name if self.source.name == self.sink.name
else '%s:%s' % (self.source.name, self.sink.name))
def flush(self, ctxt):
self.sink.flush(ctxt)
@property
def publishers(self):
return self.sink.publishers
class SamplePipeline(Pipeline):
"""Represents a pipeline for Samples."""
def get_interval(self):
return self.source.interval
@ -378,19 +397,16 @@ class Pipeline(object):
def support_meter(self, meter_name):
return self.source.support_meter(meter_name)
@property
def publishers(self):
return self.sink.publishers
def publish_sample(self, ctxt, sample):
self.publish_samples(ctxt, [sample])
def publish_samples(self, ctxt, samples):
def publish_data(self, ctxt, samples):
if not isinstance(samples, list):
samples = [samples]
supported = [s for s in samples if self.source.support_meter(s.name)]
self.sink.publish_samples(ctxt, supported)
def flush(self, ctxt):
self.sink.flush(ctxt)
SAMPLE_TYPE = {'pipeline': SamplePipeline,
'source': SampleSource,
'sink': SampleSink}
class PipelineManager(object):
@ -402,7 +418,7 @@ class PipelineManager(object):
"""
def __init__(self, cfg, transformer_manager):
def __init__(self, cfg, transformer_manager, p_type=SAMPLE_TYPE):
"""Setup the pipelines according to config.
The configuration is supported in one of two forms:
@ -506,13 +522,13 @@ class PipelineManager(object):
raise PipelineException("Both sources & sinks are required",
cfg)
LOG.info(_('detected decoupled pipeline config format'))
sources = [Source(s) for s in cfg.get('sources', [])]
sinks = dict((s['name'], Sink(s, transformer_manager))
sources = [p_type['source'](s) for s in cfg.get('sources', [])]
sinks = dict((s['name'], p_type['sink'](s, transformer_manager))
for s in cfg.get('sinks', []))
for source in sources:
source.check_sinks(sinks)
for target in source.sinks:
pipe = Pipeline(source, sinks[target])
pipe = p_type['pipeline'](source, sinks[target])
if pipe.name in [p.name for p in self.pipelines]:
raise PipelineException(
"Duplicate pipeline name: %s. Ensure pipeline"
@ -523,9 +539,9 @@ class PipelineManager(object):
else:
LOG.warning(_('detected deprecated pipeline config format'))
for pipedef in cfg:
source = Source(pipedef)
sink = Sink(pipedef, transformer_manager)
pipe = Pipeline(source, sink)
source = p_type['source'](pipedef)
sink = p_type['sink'](pipedef, transformer_manager)
pipe = p_type['pipeline'](source, sink)
if pipe.name in [p.name for p in self.pipelines]:
raise PipelineException(
"Duplicate pipeline name: %s. Ensure pipeline"

View File

@ -72,7 +72,7 @@ class TestSwiftMiddleware(tests_base.BaseTestCase):
self.pipeline_manager = pipeline_manager
self.samples = []
def publish_samples(self, ctxt, samples):
def publish_data(self, ctxt, samples):
self.samples.extend(samples)
def flush(self, context):

View File

@ -669,16 +669,16 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_sample(None, self.test_counter)
pipe.publish_data(None, self.test_counter)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(0, len(publisher.samples))
pipe.flush(None)
self.assertEqual(0, len(publisher.samples))
pipe.publish_sample(None, self.test_counter)
pipe.publish_data(None, self.test_counter)
pipe.flush(None)
self.assertEqual(0, len(publisher.samples))
for i in range(CACHE_SIZE - 2):
pipe.publish_sample(None, self.test_counter)
pipe.publish_data(None, self.test_counter)
pipe.flush(None)
self.assertEqual(CACHE_SIZE, len(publisher.samples))
self.assertEqual('a_update_new', getattr(publisher.samples[0], 'name'))
@ -741,7 +741,7 @@ class BasePipelineTestCase(base.BaseTestCase):
pipe = pipeline_manager.pipelines[0]
publisher = pipe.publishers[0]
pipe.publish_sample(None, self.test_counter)
pipe.publish_data(None, self.test_counter)
self.assertEqual(0, len(publisher.samples))
pipe.flush(None)
self.assertEqual(1, len(publisher.samples))
@ -818,7 +818,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_samples(None, counters)
pipe.publish_data(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, len(publisher.samples))
pipe.flush(None)
@ -872,7 +872,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_samples(None, counters)
pipe.publish_data(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(2, len(publisher.samples))
core_temp = publisher.samples[0]
@ -962,7 +962,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_samples(None, counters)
pipe.publish_data(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(2, len(publisher.samples))
pipe.flush(None)
@ -1047,7 +1047,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_samples(None, counters)
pipe.publish_data(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(0, len(publisher.samples))
pipe.flush(None)
@ -1090,7 +1090,7 @@ class BasePipelineTestCase(base.BaseTestCase):
)
counters.append(s)
pipe.publish_samples(None, counters)
pipe.publish_data(None, counters)
publisher = pipe.publishers[0]
self.assertEqual(2, len(publisher.samples))
pipe.flush(None)
@ -1219,7 +1219,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_samples(None, counters)
pipe.publish_data(None, counters)
pipe.flush(None)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(expected_length, len(publisher.samples))
@ -1255,7 +1255,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_samples(None, counters)
pipe.publish_data(None, counters)
pipe.flush(None)
publisher = pipeline_manager.pipelines[0].publishers[0]
actual = sorted(s.volume for s in publisher.samples)
@ -1460,12 +1460,12 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_samples(None, [counters[0]])
pipe.publish_data(None, [counters[0]])
pipe.flush(None)
publisher = pipe.publishers[0]
self.assertEqual(0, len(publisher.samples))
pipe.publish_samples(None, [counters[1]])
pipe.publish_data(None, [counters[1]])
pipe.flush(None)
publisher = pipe.publishers[0]
self.assertEqual(2, len(publisher.samples))
@ -1498,7 +1498,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_samples(None, counters)
pipe.publish_data(None, counters)
pipe.flush(None)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(0, len(publisher.samples))
@ -1546,12 +1546,12 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_samples(None, [counters[0]])
pipe.publish_data(None, [counters[0]])
pipe.flush(None)
publisher = pipe.publishers[0]
self.assertEqual(0, len(publisher.samples))
pipe.publish_samples(None, [counters[1]])
pipe.publish_data(None, [counters[1]])
pipe.flush(None)
publisher = pipe.publishers[0]
@ -1635,7 +1635,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_samples(None, counters)
pipe.publish_data(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
expected_len = len(test_resources) * len(expected)
self.assertEqual(0, len(publisher.samples))
@ -1756,7 +1756,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_samples(None, [counter])
pipe.publish_data(None, [counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(0, len(publisher.samples))
pipe.flush(None)
@ -1767,7 +1767,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self.assertEqual(1, len(publisher.samples))
counter.volume = 2048.0
pipe.publish_samples(None, [counter])
pipe.publish_data(None, [counter])
pipe.flush(None)
self.assertEqual(2, len(publisher.samples))
self.assertEqual(2050.0, publisher.samples[1].volume)