Remove source as a publisher argument

We used to have source as a publisher argument, but this anyway ends being
included in its Sample itself. So rather than complicating the internal
publishing API, simplify it and just include source as a Sample field.

Change-Id: Idb9d0c2b5969318dd16394a81d7d61c67f613892
This commit is contained in:
Julien Danjou 2013-07-29 15:01:59 +02:00
parent b999458a43
commit 4f87a14c7e
23 changed files with 261 additions and 200 deletions

View File

@ -87,8 +87,7 @@ pipeline_manager = pipeline.setup_pipeline(
), ),
) )
with pipeline_manager.publisher(context.get_admin_context(), with pipeline_manager.publisher(context.get_admin_context()) as p:
cfg.CONF.sample_source) as p:
p([sample.Sample( p([sample.Sample(
name=cfg.CONF.counter_name, name=cfg.CONF.counter_name,
type=cfg.CONF.counter_type, type=cfg.CONF.counter_type,

View File

@ -19,8 +19,6 @@
import abc import abc
import itertools import itertools
from oslo.config import cfg
from ceilometer.openstack.common import context from ceilometer.openstack.common import context
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer import pipeline from ceilometer import pipeline
@ -39,8 +37,7 @@ class PollingTask(object):
self.manager = agent_manager self.manager = agent_manager
self.pollsters = set() self.pollsters = set()
self.publish_context = pipeline.PublishContext( self.publish_context = pipeline.PublishContext(
agent_manager.context, agent_manager.context)
cfg.CONF.sample_source)
def add(self, pollster, pipelines): def add(self, pollster, pipelines):
self.publish_context.add_pipelines(pipelines) self.publish_context.add_pipelines(pipelines)

View File

@ -42,7 +42,6 @@ from ceilometer.openstack.common import context
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer.openstack.common import timeutils from ceilometer.openstack.common import timeutils
from ceilometer import sample from ceilometer import sample
from ceilometer import pipeline
from ceilometer import storage from ceilometer import storage
from ceilometer.api import acl from ceilometer.api import acl
@ -529,11 +528,8 @@ class MeterController(rest.RestController):
s.timestamp = now s.timestamp = now
s.source = '%s:%s' % (s.project_id, source) s.source = '%s:%s' % (s.project_id, source)
with pipeline.PublishContext( with pecan.request.pipeline_manager.publisher(
context.get_admin_context(), context.get_admin_context()) as publisher:
source,
pecan.request.pipeline_manager.pipelines,
) as publisher:
publisher([sample.Sample( publisher([sample.Sample(
name=s.counter_name, name=s.counter_name,
type=s.counter_type, type=s.counter_type,
@ -543,7 +539,8 @@ class MeterController(rest.RestController):
project_id=s.project_id, project_id=s.project_id,
resource_id=s.resource_id, resource_id=s.resource_id,
timestamp=s.timestamp.isoformat(), timestamp=s.timestamp.isoformat(),
resource_metadata=s.resource_metadata) for s in samples]) resource_metadata=s.resource_metadata,
source=source) for s in samples])
# TODO(asalkeld) this is not ideal, it would be nice if the publisher # TODO(asalkeld) this is not ideal, it would be nice if the publisher
# returned the created sample message with message id (or at least the # returned the created sample message with message id (or at least the

View File

@ -250,8 +250,7 @@ class CollectorService(rpc_service.Service):
handler = ext.obj handler = ext.obj
if notification['event_type'] in handler.get_event_types(): if notification['event_type'] in handler.get_event_types():
ctxt = context.get_admin_context() ctxt = context.get_admin_context()
with self.pipeline_manager.publisher(ctxt, with self.pipeline_manager.publisher(ctxt) as p:
cfg.CONF.sample_source) as p:
# FIXME(dhellmann): Spawn green thread? # FIXME(dhellmann): Spawn green thread?
p(list(handler.process_notification(notification))) p(list(handler.process_notification(notification)))

View File

@ -20,14 +20,11 @@ from ceilometer import pipeline
from ceilometer import transformer from ceilometer import transformer
from ceilometer.openstack.common import context as req_context from ceilometer.openstack.common import context as req_context
from ceilometer.openstack.common import log as logging from ceilometer.openstack.common import log as logging
from oslo.config import cfg
from stevedore import extension from stevedore import extension
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
cfg.CONF.import_opt('sample_source', 'ceilometer.sample')
_notification_manager = None _notification_manager = None
_pipeline_manager = None _pipeline_manager = None
@ -63,8 +60,7 @@ def _process_notification_for_ext(ext, context, notification):
handler = ext.obj handler = ext.obj
if notification['event_type'] in handler.get_event_types(): if notification['event_type'] in handler.get_event_types():
with _pipeline_manager.publisher(context, with _pipeline_manager.publisher(context) as p:
cfg.CONF.sample_source) as p:
# FIXME(dhellmann): Spawn green thread? # FIXME(dhellmann): Spawn green thread?
p(list(handler.process_notification(notification))) p(list(handler.process_notification(notification)))

View File

@ -37,7 +37,6 @@ metadata_headers = X-TEST
from __future__ import absolute_import from __future__ import absolute_import
from oslo.config import cfg
from swift.common.utils import split_path from swift.common.utils import split_path
import webob import webob
@ -130,11 +129,8 @@ class CeilometerMiddleware(object):
resource_metadata['http_header_%s' % header] = req.headers.get( resource_metadata['http_header_%s' % header] = req.headers.get(
header.upper()) header.upper())
with pipeline.PublishContext( with self.pipeline_manager.publisher(
context.get_admin_context(), context.get_admin_context()) as publisher:
cfg.CONF.sample_source,
self.pipeline_manager.pipelines,
) as publisher:
if bytes_received: if bytes_received:
publisher([sample.Sample( publisher([sample.Sample(
name='storage.objects.incoming.bytes', name='storage.objects.incoming.bytes',

View File

@ -49,10 +49,9 @@ class PipelineException(Exception):
class PublishContext(object): class PublishContext(object):
def __init__(self, context, source, pipelines=[]): def __init__(self, context, pipelines=[]):
self.pipelines = set(pipelines) self.pipelines = set(pipelines)
self.context = context self.context = context
self.source = source
def add_pipelines(self, pipelines): def add_pipelines(self, pipelines):
self.pipelines.update(pipelines) self.pipelines.update(pipelines)
@ -61,13 +60,12 @@ class PublishContext(object):
def p(counters): def p(counters):
for p in self.pipelines: for p in self.pipelines:
p.publish_counters(self.context, p.publish_counters(self.context,
counters, counters)
self.source)
return p return p
def __exit__(self, exc_type, exc_value, traceback): def __exit__(self, exc_type, exc_value, traceback):
for p in self.pipelines: for p in self.pipelines:
p.flush(self.context, self.source) p.flush(self.context)
class Pipeline(object): class Pipeline(object):
@ -175,10 +173,10 @@ class Pipeline(object):
return transformers return transformers
def _transform_counter(self, start, ctxt, counter, source): def _transform_counter(self, start, ctxt, counter):
try: try:
for transformer in self.transformers[start:]: for transformer in self.transformers[start:]:
counter = transformer.handle_sample(ctxt, counter, source) counter = transformer.handle_sample(ctxt, counter)
if not counter: if not counter:
LOG.debug("Pipeline %s: Counter dropped by transformer %s", LOG.debug("Pipeline %s: Counter dropped by transformer %s",
self, transformer) self, transformer)
@ -190,7 +188,7 @@ class Pipeline(object):
self, transformer, counter) self, transformer, counter)
LOG.exception(err) LOG.exception(err)
def _publish_counters(self, start, ctxt, counters, source): def _publish_counters(self, start, ctxt, counters):
"""Push counter into pipeline for publishing. """Push counter into pipeline for publishing.
param start: the first transformer that the counter will be injected. param start: the first transformer that the counter will be injected.
@ -198,7 +196,6 @@ class Pipeline(object):
may emit counters may emit counters
param ctxt: execution context from the manager or service param ctxt: execution context from the manager or service
param counters: counter list param counters: counter list
param source: counter source
""" """
@ -206,7 +203,7 @@ class Pipeline(object):
for counter in counters: for counter in counters:
LOG.debug("Pipeline %s: Transform counter %s from %s transformer", LOG.debug("Pipeline %s: Transform counter %s from %s transformer",
self, counter, start) self, counter, start)
counter = self._transform_counter(start, ctxt, counter, source) counter = self._transform_counter(start, ctxt, counter)
if counter: if counter:
transformed_counters.append(counter) transformed_counters.append(counter)
@ -214,22 +211,22 @@ class Pipeline(object):
for p in self.publishers: for p in self.publishers:
try: try:
p.publish_counters(ctxt, transformed_counters, source) p.publish_counters(ctxt, transformed_counters)
except Exception: except Exception:
LOG.exception("Pipeline %s: Continue after error " LOG.exception("Pipeline %s: Continue after error "
"from publisher %s", self, p) "from publisher %s", self, p)
LOG.audit("Pipeline %s: Published counters", self) LOG.audit("Pipeline %s: Published counters", self)
def publish_counter(self, ctxt, counter, source): def publish_counter(self, ctxt, counter):
self.publish_counters(ctxt, [counter], source) self.publish_counters(ctxt, [counter])
def publish_counters(self, ctxt, counters, source): def publish_counters(self, ctxt, counters):
for counter_name, counters in itertools.groupby( for counter_name, counters in itertools.groupby(
sorted(counters, key=lambda c: c.name), sorted(counters, key=lambda c: c.name),
lambda c: c.name): lambda c: c.name):
if self.support_counter(counter_name): if self.support_counter(counter_name):
self._publish_counters(0, ctxt, counters, source) self._publish_counters(0, ctxt, counters)
# (yjiang5) To support counters like instance:m1.tiny, # (yjiang5) To support counters like instance:m1.tiny,
# which include variable part at the end starting with ':'. # which include variable part at the end starting with ':'.
@ -252,15 +249,14 @@ class Pipeline(object):
else: else:
return counter_name in self.counters return counter_name in self.counters
def flush(self, ctxt, source): def flush(self, ctxt):
"""Flush data after all counter have been injected to pipeline.""" """Flush data after all counter have been injected to pipeline."""
LOG.audit("Flush pipeline %s", self) LOG.audit("Flush pipeline %s", self)
for (i, transformer) in enumerate(self.transformers): for (i, transformer) in enumerate(self.transformers):
try: try:
self._publish_counters(i + 1, ctxt, self._publish_counters(i + 1, ctxt,
list(transformer.flush(ctxt, source)), list(transformer.flush(ctxt)))
source)
except Exception as err: except Exception as err:
LOG.warning( LOG.warning(
"Pipeline %s: Error flushing " "Pipeline %s: Error flushing "
@ -327,13 +323,13 @@ class PipelineManager(object):
self.pipelines = [Pipeline(pipedef, transformer_manager) self.pipelines = [Pipeline(pipedef, transformer_manager)
for pipedef in cfg] for pipedef in cfg]
def publisher(self, context, source): def publisher(self, context):
"""Build a new Publisher for these manager pipelines. """Build a new Publisher for these manager pipelines.
:param context: The context. :param context: The context.
:param source: Counter source. :param source: Counter source.
""" """
return PublishContext(context, source, self.pipelines) return PublishContext(context, self.pipelines)
def setup_pipeline(transformer_manager): def setup_pipeline(transformer_manager):

View File

@ -43,5 +43,5 @@ class PublisherBase(object):
pass pass
@abc.abstractmethod @abc.abstractmethod
def publish_counters(self, context, counters, source): def publish_counters(self, context, counters):
"Publish counters into final conduit." "Publish counters into final conduit."

View File

@ -86,12 +86,11 @@ class FilePublisher(publisher.PublisherBase):
rfh.setLevel(logging.INFO) rfh.setLevel(logging.INFO)
self.publisher_logger.addHandler(rfh) self.publisher_logger.addHandler(rfh)
def publish_counters(self, context, counters, source): def publish_counters(self, context, counters):
"""Send a metering message for publishing """Send a metering message for publishing
:param context: Execution context from the service or RPC call :param context: Execution context from the service or RPC call
:param counter: Counter from pipeline after transformation :param counter: Counter from pipeline after transformation
:param source: counter source
""" """
if self.publisher_logger: if self.publisher_logger:
self.publisher_logger.info(counters) self.publisher_logger.info(counters)

View File

@ -137,12 +137,11 @@ class RPCPublisher(publisher.PublisherBase):
% self.policy) % self.policy)
self.policy = 'default' self.policy = 'default'
def publish_counters(self, context, counters, source): def publish_counters(self, context, counters):
"""Publish counters on RPC. """Publish counters on RPC.
:param context: Execution context from the service or RPC call. :param context: Execution context from the service or RPC call.
:param counters: Counters from pipeline after transformation. :param counters: Counters from pipeline after transformation.
:param source: Counter source.
""" """
@ -150,7 +149,7 @@ class RPCPublisher(publisher.PublisherBase):
meter_message_from_counter( meter_message_from_counter(
counter, counter,
cfg.CONF.publisher_rpc.metering_secret, cfg.CONF.publisher_rpc.metering_secret,
source) counter.source)
for counter in counters for counter in counters
] ]

View File

@ -27,11 +27,10 @@ class TestPublisher(publisher.PublisherBase):
def __init__(self, parsed_url): def __init__(self, parsed_url):
self.counters = [] self.counters = []
def publish_counters(self, context, counters, source): def publish_counters(self, context, counters):
"""Send a metering message for publishing """Send a metering message for publishing
:param context: Execution context from the service or RPC call :param context: Execution context from the service or RPC call
:param counter: Counter from pipeline after transformation :param counter: Counter from pipeline after transformation
:param source: counter source
""" """
self.counters.extend(counters) self.counters.extend(counters)

View File

@ -41,17 +41,15 @@ class UDPPublisher(publisher.PublisherBase):
self.socket = socket.socket(socket.AF_INET, self.socket = socket.socket(socket.AF_INET,
socket.SOCK_DGRAM) socket.SOCK_DGRAM)
def publish_counters(self, context, counters, source): def publish_counters(self, context, counters):
"""Send a metering message for publishing """Send a metering message for publishing
:param context: Execution context from the service or RPC call :param context: Execution context from the service or RPC call
:param counter: Counter from pipeline after transformation :param counter: Counter from pipeline after transformation
:param source: counter source
""" """
for counter in counters: for counter in counters:
msg = counter._asdict() msg = counter.as_dict()
msg['source'] = source
host = self.host host = self.host
port = self.port port = self.port
LOG.debug(_("Publishing counter %(msg)s over UDP to " LOG.debug(_("Publishing counter %(msg)s over UDP to "

View File

@ -1,8 +1,10 @@
# -*- encoding: utf-8 -*- # -*- encoding: utf-8 -*-
# #
# Copyright © 2012 New Dream Network, LLC (DreamHost) # Copyright © 2012 New Dream Network, LLC (DreamHost)
# Copyright © 2013 eNovance
# #
# Author: Doug Hellmann <doug.hellmann@dreamhost.com> # Authors: Doug Hellmann <doug.hellmann@dreamhost.com>
# Julien Danjou <julien@danjou.info>
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -22,7 +24,6 @@ ensure that all of the appropriate fields have been filled
in by the plugins that create them. in by the plugins that create them.
""" """
import collections
import copy import copy
from oslo.config import cfg from oslo.config import cfg
@ -40,6 +41,7 @@ cfg.CONF.register_opts(OPTS)
# Fields explanation: # Fields explanation:
# #
# Source: the source of this sample
# Name: the name of the meter, must be unique # Name: the name of the meter, must be unique
# Type: the type of the meter, must be either: # Type: the type of the meter, must be either:
# - cumulative: the value is incremented and never reset to 0 # - cumulative: the value is incremented and never reset to 0
@ -52,40 +54,41 @@ cfg.CONF.register_opts(OPTS)
# Resource ID: the resource ID # Resource ID: the resource ID
# Timestamp: when the sample has been read # Timestamp: when the sample has been read
# Resource metadata: various metadata # Resource metadata: various metadata
Sample = collections.namedtuple('Sample', class Sample(object):
' '.join([
'name',
'type',
'unit',
'volume',
'user_id',
'project_id',
'resource_id',
'timestamp',
'resource_metadata',
]))
def __init__(self, name, type, unit, volume, user_id, project_id,
resource_id, timestamp, resource_metadata, source=None):
self.name = name
self.type = type
self.unit = unit
self.volume = volume
self.user_id = user_id
self.project_id = project_id
self.resource_id = resource_id
self.timestamp = timestamp
self.resource_metadata = resource_metadata
self.source = source or cfg.CONF.sample_source
def as_dict(self):
return copy.copy(self.__dict__)
@classmethod
def from_notification(cls, name, type, volume, unit,
user_id, project_id, resource_id,
message):
metadata = copy.copy(message['payload'])
metadata['event_type'] = message['event_type']
metadata['host'] = message['publisher_id']
return cls(name=name,
type=type,
volume=volume,
unit=unit,
user_id=user_id,
project_id=project_id,
resource_id=resource_id,
timestamp=message['timestamp'],
resource_metadata=metadata)
TYPE_GAUGE = 'gauge' TYPE_GAUGE = 'gauge'
TYPE_DELTA = 'delta' TYPE_DELTA = 'delta'
TYPE_CUMULATIVE = 'cumulative' TYPE_CUMULATIVE = 'cumulative'
def from_notification(cls, name, type, volume, unit,
user_id, project_id, resource_id,
message):
metadata = copy.copy(message['payload'])
metadata['event_type'] = message['event_type']
metadata['host'] = message['publisher_id']
return cls(name=name,
type=type,
volume=volume,
unit=unit,
user_id=user_id,
project_id=project_id,
resource_id=resource_id,
timestamp=message['timestamp'],
resource_metadata=metadata)
Sample.from_notification = classmethod(from_notification)

View File

@ -53,18 +53,16 @@ class TransformerBase(object):
super(TransformerBase, self).__init__() super(TransformerBase, self).__init__()
@abc.abstractmethod @abc.abstractmethod
def handle_sample(self, context, counter, source): def handle_sample(self, context, counter):
"""Transform a counter. """Transform a counter.
:param context: Passed from the data collector. :param context: Passed from the data collector.
:param counter: A counter. :param counter: A counter.
:param source: Passed from data collector.
""" """
def flush(self, context, source): def flush(self, context):
"""Flush counters cached previously. """Flush counters cached previously.
:param context: Passed from the data collector. :param context: Passed from the data collector.
:param source: Source of counters that are being published.
""" """
return [] return []

View File

@ -31,13 +31,13 @@ class TransformerAccumulator(transformer.TransformerBase):
self.size = size self.size = size
super(TransformerAccumulator, self).__init__(**kwargs) super(TransformerAccumulator, self).__init__(**kwargs)
def handle_sample(self, context, counter, source): def handle_sample(self, context, counter):
if self.size >= 1: if self.size >= 1:
self.counters.append(counter) self.counters.append(counter)
else: else:
return counter return counter
def flush(self, context, source): def flush(self, context):
if len(self.counters) >= self.size: if len(self.counters) >= self.size:
x = self.counters x = self.counters
self.counters = [] self.counters = []

View File

@ -74,7 +74,7 @@ class ScalingTransformer(transformer.TransformerBase):
"""Apply the scaling factor (either a straight multiplicative """Apply the scaling factor (either a straight multiplicative
factor or else a string to be eval'd). factor or else a string to be eval'd).
""" """
ns = Namespace(counter._asdict()) ns = Namespace(counter.as_dict())
return ((eval(scale, {}, ns) if isinstance(scale, basestring) return ((eval(scale, {}, ns) if isinstance(scale, basestring)
else counter.volume * scale) if scale else counter.volume) else counter.volume * scale) if scale else counter.volume)
@ -95,7 +95,7 @@ class ScalingTransformer(transformer.TransformerBase):
resource_metadata=counter.resource_metadata resource_metadata=counter.resource_metadata
) )
def handle_sample(self, context, counter, source): def handle_sample(self, context, counter):
"""Handle a sample, converting if necessary.""" """Handle a sample, converting if necessary."""
LOG.debug('handling counter %s', (counter,)) LOG.debug('handling counter %s', (counter,))
if (self.source.get('unit', counter.unit) == counter.unit): if (self.source.get('unit', counter.unit) == counter.unit):
@ -117,7 +117,7 @@ class RateOfChangeTransformer(ScalingTransformer):
self.cache = {} self.cache = {}
super(RateOfChangeTransformer, self).__init__(**kwargs) super(RateOfChangeTransformer, self).__init__(**kwargs)
def handle_sample(self, context, counter, source): def handle_sample(self, context, counter):
"""Handle a sample, converting if necessary.""" """Handle a sample, converting if necessary."""
LOG.debug('handling counter %s', (counter,)) LOG.debug('handling counter %s', (counter,))
key = counter.name + counter.resource_id key = counter.name + counter.resource_id

View File

@ -71,15 +71,42 @@ class BaseAgentManagerTestCase(base.TestCase):
class PollsterAnother(TestPollster): class PollsterAnother(TestPollster):
counters = [] counters = []
test_data = default_test_data._replace(name='testanother') test_data = sample.Sample(
name='testanother',
type=default_test_data.type,
unit=default_test_data.unit,
volume=default_test_data.volume,
user_id=default_test_data.user_id,
project_id=default_test_data.project_id,
resource_id=default_test_data.resource_id,
timestamp=default_test_data.timestamp,
resource_metadata=default_test_data.resource_metadata)
class PollsterException(TestPollsterException): class PollsterException(TestPollsterException):
counters = [] counters = []
test_data = default_test_data._replace(name='testexception') test_data = sample.Sample(
name='testexception',
type=default_test_data.type,
unit=default_test_data.unit,
volume=default_test_data.volume,
user_id=default_test_data.user_id,
project_id=default_test_data.project_id,
resource_id=default_test_data.resource_id,
timestamp=default_test_data.timestamp,
resource_metadata=default_test_data.resource_metadata)
class PollsterExceptionAnother(TestPollsterException): class PollsterExceptionAnother(TestPollsterException):
counters = [] counters = []
test_data = default_test_data._replace(name='testexceptionanother') test_data = sample.Sample(
name='testexceptionanother',
type=default_test_data.type,
unit=default_test_data.unit,
volume=default_test_data.volume,
user_id=default_test_data.user_id,
project_id=default_test_data.project_id,
resource_id=default_test_data.resource_id,
timestamp=default_test_data.timestamp,
resource_metadata=default_test_data.resource_metadata)
def setup_pipeline(self): def setup_pipeline(self):
self.transformer_manager = transformer.TransformerExtensionManager( self.transformer_manager = transformer.TransformerExtensionManager(

View File

@ -115,7 +115,7 @@ class TestUDPCollectorService(TestCollector):
def setUp(self): def setUp(self):
super(TestUDPCollectorService, self).setUp() super(TestUDPCollectorService, self).setUp()
self.srv = service.UDPCollectorService() self.srv = service.UDPCollectorService()
self.counter = dict(sample.Sample( self.counter = sample.Sample(
name='foobar', name='foobar',
type='bad', type='bad',
unit='F', unit='F',
@ -125,7 +125,7 @@ class TestUDPCollectorService(TestCollector):
resource_id='cat', resource_id='cat',
timestamp='NOW!', timestamp='NOW!',
resource_metadata={}, resource_metadata={},
)._asdict()) ).as_dict()
def test_service_has_storage_conn(self): def test_service_has_storage_conn(self):
srv = service.UDPCollectorService() srv = service.UDPCollectorService()

View File

@ -43,24 +43,21 @@ class FakeApp(object):
class TestSwiftMiddleware(base.TestCase): class TestSwiftMiddleware(base.TestCase):
class _faux_pipeline_manager(object): class _faux_pipeline_manager(pipeline.PipelineManager):
class _faux_pipeline(object): class _faux_pipeline(object):
def __init__(self, pipeline_manager): def __init__(self, pipeline_manager):
self.pipeline_manager = pipeline_manager self.pipeline_manager = pipeline_manager
self.counters = [] self.counters = []
def publish_counters(self, ctxt, counters, source): def publish_counters(self, ctxt, counters):
self.counters.extend(counters) self.counters.extend(counters)
def flush(self, context, source): def flush(self, context):
pass pass
def __init__(self): def __init__(self):
self.pipelines = [self._faux_pipeline(self)] self.pipelines = [self._faux_pipeline(self)]
def flush(self, ctx, source):
pass
def _faux_setup_pipeline(self, transformer_manager): def _faux_setup_pipeline(self, transformer_manager):
return self.pipeline_manager return self.pipeline_manager

View File

@ -66,16 +66,13 @@ class TestFilePublisher(base.TestCase):
), ),
] ]
COUNTER_SOURCE = 'testsource'
def test_file_publisher(self): def test_file_publisher(self):
# Test valid configurations # Test valid configurations
parsed_url = urlsplit( parsed_url = urlsplit(
'file:///tmp/log_file?max_bytes=50&backup_count=3') 'file:///tmp/log_file?max_bytes=50&backup_count=3')
publisher = file.FilePublisher(parsed_url) publisher = file.FilePublisher(parsed_url)
publisher.publish_counters(None, publisher.publish_counters(None,
self.test_data, self.test_data)
self.COUNTER_SOURCE)
handler = publisher.publisher_logger.handlers[0] handler = publisher.publisher_logger.handlers[0]
self.assertTrue(isinstance(handler, self.assertTrue(isinstance(handler,
@ -91,8 +88,7 @@ class TestFilePublisher(base.TestCase):
'file:///tmp/log_file_plain') 'file:///tmp/log_file_plain')
publisher = file.FilePublisher(parsed_url) publisher = file.FilePublisher(parsed_url)
publisher.publish_counters(None, publisher.publish_counters(None,
self.test_data, self.test_data)
self.COUNTER_SOURCE)
handler = publisher.publisher_logger.handlers[0] handler = publisher.publisher_logger.handlers[0]
self.assertTrue(isinstance(handler, self.assertTrue(isinstance(handler,
@ -109,7 +105,6 @@ class TestFilePublisher(base.TestCase):
'file:///tmp/log_file_bad?max_bytes=yus&backup_count=5y') 'file:///tmp/log_file_bad?max_bytes=yus&backup_count=5y')
publisher = file.FilePublisher(parsed_url) publisher = file.FilePublisher(parsed_url)
publisher.publish_counters(None, publisher.publish_counters(None,
self.test_data, self.test_data)
self.COUNTER_SOURCE)
self.assertIsNone(publisher.publisher_logger) self.assertIsNone(publisher.publisher_logger)

View File

@ -222,8 +222,7 @@ class TestPublish(base.TestCase):
publisher = rpc.RPCPublisher( publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://')) network_utils.urlsplit('rpc://'))
publisher.publish_counters(None, publisher.publish_counters(None,
self.test_data, self.test_data)
'test')
self.assertEqual(len(self.published), 1) self.assertEqual(len(self.published), 1)
self.assertEqual(self.published[0][0], self.assertEqual(self.published[0][0],
cfg.CONF.publisher_rpc.metering_topic) cfg.CONF.publisher_rpc.metering_topic)
@ -235,8 +234,7 @@ class TestPublish(base.TestCase):
publisher = rpc.RPCPublisher( publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?target=custom_procedure_call')) network_utils.urlsplit('rpc://?target=custom_procedure_call'))
publisher.publish_counters(None, publisher.publish_counters(None,
self.test_data, self.test_data)
'test')
self.assertEqual(len(self.published), 1) self.assertEqual(len(self.published), 1)
self.assertEqual(self.published[0][0], self.assertEqual(self.published[0][0],
cfg.CONF.publisher_rpc.metering_topic) cfg.CONF.publisher_rpc.metering_topic)
@ -248,8 +246,7 @@ class TestPublish(base.TestCase):
publisher = rpc.RPCPublisher( publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?per_meter_topic=1')) network_utils.urlsplit('rpc://?per_meter_topic=1'))
publisher.publish_counters(None, publisher.publish_counters(None,
self.test_data, self.test_data)
'test')
self.assertEqual(len(self.published), 4) self.assertEqual(len(self.published), 4)
for topic, rpc_call in self.published: for topic, rpc_call in self.published:
meters = rpc_call['args']['data'] meters = rpc_call['args']['data']
@ -276,7 +273,7 @@ class TestPublish(base.TestCase):
self.assertRaises( self.assertRaises(
SystemExit, SystemExit,
publisher.publish_counters, publisher.publish_counters,
None, self.test_data, 'test') None, self.test_data)
self.assertEqual(publisher.policy, 'default') self.assertEqual(publisher.policy, 'default')
self.assertEqual(len(self.published), 0) self.assertEqual(len(self.published), 0)
self.assertEqual(len(publisher.local_queue), 0) self.assertEqual(len(publisher.local_queue), 0)
@ -288,7 +285,7 @@ class TestPublish(base.TestCase):
self.assertRaises( self.assertRaises(
SystemExit, SystemExit,
publisher.publish_counters, publisher.publish_counters,
None, self.test_data, 'test') None, self.test_data)
self.assertEqual(len(self.published), 0) self.assertEqual(len(self.published), 0)
self.assertEqual(len(publisher.local_queue), 0) self.assertEqual(len(publisher.local_queue), 0)
@ -299,7 +296,7 @@ class TestPublish(base.TestCase):
self.assertRaises( self.assertRaises(
SystemExit, SystemExit,
publisher.publish_counters, publisher.publish_counters,
None, self.test_data, 'test') None, self.test_data)
self.assertEqual(publisher.policy, 'default') self.assertEqual(publisher.policy, 'default')
self.assertEqual(len(self.published), 0) self.assertEqual(len(self.published), 0)
self.assertEqual(len(publisher.local_queue), 0) self.assertEqual(len(publisher.local_queue), 0)
@ -309,8 +306,7 @@ class TestPublish(base.TestCase):
publisher = rpc.RPCPublisher( publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?policy=drop')) network_utils.urlsplit('rpc://?policy=drop'))
publisher.publish_counters(None, publisher.publish_counters(None,
self.test_data, self.test_data)
'test')
self.assertEqual(len(self.published), 0) self.assertEqual(len(self.published), 0)
self.assertEqual(len(publisher.local_queue), 0) self.assertEqual(len(publisher.local_queue), 0)
@ -319,8 +315,7 @@ class TestPublish(base.TestCase):
publisher = rpc.RPCPublisher( publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?policy=queue')) network_utils.urlsplit('rpc://?policy=queue'))
publisher.publish_counters(None, publisher.publish_counters(None,
self.test_data, self.test_data)
'test')
self.assertEqual(len(self.published), 0) self.assertEqual(len(self.published), 0)
self.assertEqual(len(publisher.local_queue), 1) self.assertEqual(len(publisher.local_queue), 1)
@ -329,15 +324,13 @@ class TestPublish(base.TestCase):
publisher = rpc.RPCPublisher( publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?policy=queue')) network_utils.urlsplit('rpc://?policy=queue'))
publisher.publish_counters(None, publisher.publish_counters(None,
self.test_data, self.test_data)
'test')
self.assertEqual(len(self.published), 0) self.assertEqual(len(self.published), 0)
self.assertEqual(len(publisher.local_queue), 1) self.assertEqual(len(publisher.local_queue), 1)
self.rpc_unreachable = False self.rpc_unreachable = False
publisher.publish_counters(None, publisher.publish_counters(None,
self.test_data, self.test_data)
'test')
self.assertEqual(len(self.published), 2) self.assertEqual(len(self.published), 2)
self.assertEqual(len(publisher.local_queue), 0) self.assertEqual(len(publisher.local_queue), 0)
@ -347,9 +340,10 @@ class TestPublish(base.TestCase):
publisher = rpc.RPCPublisher( publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?policy=queue&max_queue_length=3')) network_utils.urlsplit('rpc://?policy=queue&max_queue_length=3'))
for i in range(0, 5): for i in range(0, 5):
for s in self.test_data:
s.source = 'test-%d' % i
publisher.publish_counters(None, publisher.publish_counters(None,
self.test_data, self.test_data)
'test-%d' % i)
self.assertEqual(len(self.published), 0) self.assertEqual(len(self.published), 0)
self.assertEqual(len(publisher.local_queue), 3) self.assertEqual(len(publisher.local_queue), 3)
self.assertEqual( self.assertEqual(
@ -370,9 +364,10 @@ class TestPublish(base.TestCase):
publisher = rpc.RPCPublisher( publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?policy=queue')) network_utils.urlsplit('rpc://?policy=queue'))
for i in range(0, 2000): for i in range(0, 2000):
for s in self.test_data:
s.source = 'test-%d' % i
publisher.publish_counters(None, publisher.publish_counters(None,
self.test_data, self.test_data)
'test-%d' % i)
self.assertEqual(len(self.published), 0) self.assertEqual(len(self.published), 0)
self.assertEqual(len(publisher.local_queue), 1024) self.assertEqual(len(publisher.local_queue), 1024)
self.assertEqual( self.assertEqual(

View File

@ -29,6 +29,9 @@ from ceilometer.tests import base
from ceilometer.openstack.common import network_utils from ceilometer.openstack.common import network_utils
COUNTER_SOURCE = 'testsource'
class TestUDPPublisher(base.TestCase): class TestUDPPublisher(base.TestCase):
test_data = [ test_data = [
@ -42,6 +45,7 @@ class TestUDPPublisher(base.TestCase):
resource_id='test_run_tasks', resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(), timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'}, resource_metadata={'name': 'TestPublish'},
source=COUNTER_SOURCE,
), ),
sample.Sample( sample.Sample(
name='test', name='test',
@ -53,6 +57,7 @@ class TestUDPPublisher(base.TestCase):
resource_id='test_run_tasks', resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(), timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'}, resource_metadata={'name': 'TestPublish'},
source=COUNTER_SOURCE,
), ),
sample.Sample( sample.Sample(
name='test2', name='test2',
@ -64,6 +69,7 @@ class TestUDPPublisher(base.TestCase):
resource_id='test_run_tasks', resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(), timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'}, resource_metadata={'name': 'TestPublish'},
source=COUNTER_SOURCE,
), ),
sample.Sample( sample.Sample(
name='test2', name='test2',
@ -75,6 +81,7 @@ class TestUDPPublisher(base.TestCase):
resource_id='test_run_tasks', resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(), timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'}, resource_metadata={'name': 'TestPublish'},
source=COUNTER_SOURCE,
), ),
sample.Sample( sample.Sample(
name='test3', name='test3',
@ -86,6 +93,7 @@ class TestUDPPublisher(base.TestCase):
resource_id='test_run_tasks', resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(), timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'}, resource_metadata={'name': 'TestPublish'},
source=COUNTER_SOURCE,
), ),
] ]
@ -99,8 +107,6 @@ class TestUDPPublisher(base.TestCase):
return udp_socket return udp_socket
return _fake_socket_socket return _fake_socket_socket
COUNTER_SOURCE = 'testsource'
def test_published(self): def test_published(self):
self.data_sent = [] self.data_sent = []
with mock.patch('socket.socket', with mock.patch('socket.socket',
@ -108,8 +114,7 @@ class TestUDPPublisher(base.TestCase):
publisher = udp.UDPPublisher( publisher = udp.UDPPublisher(
network_utils.urlsplit('udp://somehost')) network_utils.urlsplit('udp://somehost'))
publisher.publish_counters(None, publisher.publish_counters(None,
self.test_data, self.test_data)
self.COUNTER_SOURCE)
self.assertEqual(len(self.data_sent), 5) self.assertEqual(len(self.data_sent), 5)
@ -117,10 +122,6 @@ class TestUDPPublisher(base.TestCase):
for data, dest in self.data_sent: for data, dest in self.data_sent:
counter = msgpack.loads(data) counter = msgpack.loads(data)
self.assertEqual(counter['source'], self.COUNTER_SOURCE)
# Remove source because our test Counters don't have it, so the
# comparison would fail later
del counter['source']
sent_counters.append(counter) sent_counters.append(counter)
# Check destination # Check destination
@ -129,7 +130,7 @@ class TestUDPPublisher(base.TestCase):
# Check that counters are equal # Check that counters are equal
self.assertEqual(sorted(sent_counters), self.assertEqual(sorted(sent_counters),
sorted([dict(d._asdict()) for d in self.test_data])) sorted([dict(d.as_dict()) for d in self.test_data]))
@staticmethod @staticmethod
def _raise_ioerror(): def _raise_ioerror():
@ -146,5 +147,4 @@ class TestUDPPublisher(base.TestCase):
publisher = udp.UDPPublisher( publisher = udp.UDPPublisher(
network_utils.urlsplit('udp://localhost')) network_utils.urlsplit('udp://localhost'))
publisher.publish_counters(None, publisher.publish_counters(None,
self.test_data, self.test_data)
self.COUNTER_SOURCE)

View File

@ -67,7 +67,7 @@ class TestPipeline(base.TestCase):
return fake_drivers[url](url) return fake_drivers[url](url)
class PublisherClassException(publisher.PublisherBase): class PublisherClassException(publisher.PublisherBase):
def publish_counters(self, ctxt, counters, source): def publish_counters(self, ctxt, counters):
raise Exception() raise Exception()
class TransformerClass(transformer.TransformerBase): class TransformerClass(transformer.TransformerBase):
@ -77,13 +77,23 @@ class TestPipeline(base.TestCase):
self.__class__.samples = [] self.__class__.samples = []
self.append_name = append_name self.append_name = append_name
def flush(self, ctxt, source): def flush(self, ctxt):
return [] return []
def handle_sample(self, ctxt, counter, source): def handle_sample(self, ctxt, counter):
self.__class__.samples.append(counter) self.__class__.samples.append(counter)
newname = getattr(counter, 'name') + self.append_name newname = getattr(counter, 'name') + self.append_name
return counter._replace(name=newname) return sample.Sample(
name=newname,
type=counter.type,
volume=counter.volume,
unit=counter.unit,
user_id=counter.user_id,
project_id=counter.project_id,
resource_id=counter.resource_id,
timestamp=counter.timestamp,
resource_metadata=counter.resource_metadata,
)
class TransformerClassDrop(transformer.TransformerBase): class TransformerClassDrop(transformer.TransformerBase):
samples = [] samples = []
@ -91,11 +101,11 @@ class TestPipeline(base.TestCase):
def __init__(self): def __init__(self):
self.__class__.samples = [] self.__class__.samples = []
def handle_sample(self, ctxt, counter, source): def handle_sample(self, ctxt, counter):
self.__class__.samples.append(counter) self.__class__.samples.append(counter)
class TransformerClassException(object): class TransformerClassException(object):
def handle_sample(self, ctxt, counter, source): def handle_sample(self, ctxt, counter):
raise Exception() raise Exception()
def setUp(self): def setUp(self):
@ -204,7 +214,7 @@ class TestPipeline(base.TestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
@ -220,14 +230,25 @@ class TestPipeline(base.TestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1) self.assertEqual(len(publisher.counters), 1)
self.test_counter = self.test_counter._replace(name='b') self.test_counter = sample.Sample(
with pipeline_manager.publisher(None, None) as p: name='b',
type=self.test_counter.type,
volume=self.test_counter.volume,
unit=self.test_counter.unit,
user_id=self.test_counter.user_id,
project_id=self.test_counter.project_id,
resource_id=self.test_counter.resource_id,
timestamp=self.test_counter.timestamp,
resource_metadata=self.test_counter.resource_metadata,
)
with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
self.assertEqual(len(publisher.counters), 2) self.assertEqual(len(publisher.counters), 2)
@ -240,7 +261,7 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = counter_cfg self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
@ -260,7 +281,7 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = counter_cfg self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1) self.assertEqual(len(publisher.counters), 1)
@ -273,7 +294,7 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = counter_cfg self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
@ -309,12 +330,22 @@ class TestPipeline(base.TestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
self.test_counter = self.test_counter._replace(name='b') self.test_counter = sample.Sample(
name='b',
type=self.test_counter.type,
volume=self.test_counter.volume,
unit=self.test_counter.unit,
user_id=self.test_counter.user_id,
project_id=self.test_counter.project_id,
resource_id=self.test_counter.resource_id,
timestamp=self.test_counter.timestamp,
resource_metadata=self.test_counter.resource_metadata,
)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
@ -349,12 +380,22 @@ class TestPipeline(base.TestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
self.test_counter = self.test_counter._replace(name='b') self.test_counter = sample.Sample(
name='b',
type=self.test_counter.type,
volume=self.test_counter.volume,
unit=self.test_counter.unit,
user_id=self.test_counter.user_id,
project_id=self.test_counter.project_id,
resource_id=self.test_counter.resource_id,
timestamp=self.test_counter.timestamp,
resource_metadata=self.test_counter.resource_metadata,
)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
@ -370,7 +411,7 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['transformers'] = None self.pipeline_cfg[0]['transformers'] = None
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1) self.assertEqual(len(publisher.counters), 1)
@ -380,7 +421,7 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['transformers'] = [] self.pipeline_cfg[0]['transformers'] = []
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1) self.assertEqual(len(publisher.counters), 1)
@ -400,7 +441,7 @@ class TestPipeline(base.TestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
@ -432,7 +473,7 @@ class TestPipeline(base.TestCase):
] ]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
self.assertTrue(len(self.TransformerClass.samples) == 2) self.assertTrue(len(self.TransformerClass.samples) == 2)
@ -468,7 +509,7 @@ class TestPipeline(base.TestCase):
] ]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
@ -485,7 +526,7 @@ class TestPipeline(base.TestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
@ -501,7 +542,7 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['publishers'] = ['except://', 'new://'] self.pipeline_cfg[0]['publishers'] = ['except://', 'new://']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
new_publisher = pipeline_manager.pipelines[0].publishers[1] new_publisher = pipeline_manager.pipelines[0].publishers[1]
@ -513,9 +554,19 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = ['a', 'b'] self.pipeline_cfg[0]['counters'] = ['a', 'b']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter, p([self.test_counter,
self.test_counter._replace(name='b')]) sample.Sample(
name='b',
type=self.test_counter.type,
volume=self.test_counter.volume,
unit=self.test_counter.unit,
user_id=self.test_counter.user_id,
project_id=self.test_counter.project_id,
resource_id=self.test_counter.resource_id,
timestamp=self.test_counter.timestamp,
resource_metadata=self.test_counter.resource_metadata,
)])
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 2) self.assertEqual(len(publisher.counters), 2)
@ -543,17 +594,17 @@ class TestPipeline(base.TestCase):
self.transformer_manager) self.transformer_manager)
pipe = pipeline_manager.pipelines[0] pipe = pipeline_manager.pipelines[0]
pipe.publish_counter(None, self.test_counter, None) pipe.publish_counter(None, self.test_counter)
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 0) self.assertEqual(len(publisher.counters), 0)
pipe.flush(None, None) pipe.flush(None)
self.assertEqual(len(publisher.counters), 0) self.assertEqual(len(publisher.counters), 0)
pipe.publish_counter(None, self.test_counter, None) pipe.publish_counter(None, self.test_counter)
pipe.flush(None, None) pipe.flush(None)
self.assertEqual(len(publisher.counters), 0) self.assertEqual(len(publisher.counters), 0)
for i in range(CACHE_SIZE - 2): for i in range(CACHE_SIZE - 2):
pipe.publish_counter(None, self.test_counter, None) pipe.publish_counter(None, self.test_counter)
pipe.flush(None, None) pipe.flush(None)
self.assertEqual(len(publisher.counters), CACHE_SIZE) self.assertEqual(len(publisher.counters), CACHE_SIZE)
self.assertTrue(getattr(publisher.counters[0], 'name') self.assertTrue(getattr(publisher.counters[0], 'name')
== 'a_update_new') == 'a_update_new')
@ -578,14 +629,24 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = ['a', 'b'] self.pipeline_cfg[0]['counters'] = ['a', 'b']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter, p([self.test_counter,
self.test_counter._replace(name='b')]) sample.Sample(
name='b',
type=self.test_counter.type,
volume=self.test_counter.volume,
unit=self.test_counter.unit,
user_id=self.test_counter.user_id,
project_id=self.test_counter.project_id,
resource_id=self.test_counter.resource_id,
timestamp=self.test_counter.timestamp,
resource_metadata=self.test_counter.resource_metadata,
)])
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 0) self.assertEqual(len(publisher.counters), 0)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
self.assertEqual(len(publisher.counters), CACHE_SIZE) self.assertEqual(len(publisher.counters), CACHE_SIZE)
@ -604,9 +665,9 @@ class TestPipeline(base.TestCase):
pipe = pipeline_manager.pipelines[0] pipe = pipeline_manager.pipelines[0]
publisher = pipe.publishers[0] publisher = pipe.publishers[0]
pipe.publish_counter(None, self.test_counter, None) pipe.publish_counter(None, self.test_counter)
self.assertEqual(len(publisher.counters), 0) self.assertEqual(len(publisher.counters), 0)
pipe.flush(None, None) pipe.flush(None)
self.assertEqual(len(publisher.counters), 1) self.assertEqual(len(publisher.counters), 1)
self.assertEqual(getattr(publisher.counters[0], 'name'), self.assertEqual(getattr(publisher.counters[0], 'name'),
'a_update') 'a_update')
@ -625,9 +686,19 @@ class TestPipeline(base.TestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
self.test_counter = self.test_counter._replace(name='a:b') self.test_counter = sample.Sample(
name='a:b',
type=self.test_counter.type,
volume=self.test_counter.volume,
unit=self.test_counter.unit,
user_id=self.test_counter.user_id,
project_id=self.test_counter.project_id,
resource_id=self.test_counter.resource_id,
timestamp=self.test_counter.timestamp,
resource_metadata=self.test_counter.resource_metadata,
)
with pipeline_manager.publisher(None, None) as p: with pipeline_manager.publisher(None) as p:
p([self.test_counter]) p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
@ -670,10 +741,10 @@ class TestPipeline(base.TestCase):
self.transformer_manager) self.transformer_manager)
pipe = pipeline_manager.pipelines[0] pipe = pipeline_manager.pipelines[0]
pipe.publish_counters(None, counters, None) pipe.publish_counters(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1) self.assertEqual(len(publisher.counters), 1)
pipe.flush(None, None) pipe.flush(None)
self.assertEqual(len(publisher.counters), 1) self.assertEqual(len(publisher.counters), 1)
cpu_mins = publisher.counters[-1] cpu_mins = publisher.counters[-1]
self.assertEquals(getattr(cpu_mins, 'name'), 'cpu_mins') self.assertEquals(getattr(cpu_mins, 'name'), 'cpu_mins')
@ -723,7 +794,7 @@ class TestPipeline(base.TestCase):
self.transformer_manager) self.transformer_manager)
pipe = pipeline_manager.pipelines[0] pipe = pipeline_manager.pipelines[0]
pipe.publish_counters(None, counters, None) pipe.publish_counters(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 2) self.assertEqual(len(publisher.counters), 2)
core_temp = publisher.counters[1] core_temp = publisher.counters[1]
@ -812,10 +883,10 @@ class TestPipeline(base.TestCase):
self.transformer_manager) self.transformer_manager)
pipe = pipeline_manager.pipelines[0] pipe = pipeline_manager.pipelines[0]
pipe.publish_counters(None, counters, None) pipe.publish_counters(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 2) self.assertEqual(len(publisher.counters), 2)
pipe.flush(None, None) pipe.flush(None)
self.assertEqual(len(publisher.counters), 2) self.assertEqual(len(publisher.counters), 2)
cpu_util = publisher.counters[0] cpu_util = publisher.counters[0]
self.assertEquals(getattr(cpu_util, 'name'), 'cpu_util') self.assertEquals(getattr(cpu_util, 'name'), 'cpu_util')
@ -896,8 +967,8 @@ class TestPipeline(base.TestCase):
self.transformer_manager) self.transformer_manager)
pipe = pipeline_manager.pipelines[0] pipe = pipeline_manager.pipelines[0]
pipe.publish_counters(None, counters, None) pipe.publish_counters(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0] publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 0) self.assertEqual(len(publisher.counters), 0)
pipe.flush(None, None) pipe.flush(None)
self.assertEqual(len(publisher.counters), 0) self.assertEqual(len(publisher.counters), 0)