Merge "Remove source as a publisher argument"
This commit is contained in:
commit
2c8bad71ff
@ -87,8 +87,7 @@ pipeline_manager = pipeline.setup_pipeline(
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
with pipeline_manager.publisher(context.get_admin_context(),
|
with pipeline_manager.publisher(context.get_admin_context()) as p:
|
||||||
cfg.CONF.sample_source) as p:
|
|
||||||
p([sample.Sample(
|
p([sample.Sample(
|
||||||
name=cfg.CONF.counter_name,
|
name=cfg.CONF.counter_name,
|
||||||
type=cfg.CONF.counter_type,
|
type=cfg.CONF.counter_type,
|
||||||
|
@ -19,8 +19,6 @@
|
|||||||
import abc
|
import abc
|
||||||
import itertools
|
import itertools
|
||||||
|
|
||||||
from oslo.config import cfg
|
|
||||||
|
|
||||||
from ceilometer.openstack.common import context
|
from ceilometer.openstack.common import context
|
||||||
from ceilometer.openstack.common import log
|
from ceilometer.openstack.common import log
|
||||||
from ceilometer import pipeline
|
from ceilometer import pipeline
|
||||||
@ -39,8 +37,7 @@ class PollingTask(object):
|
|||||||
self.manager = agent_manager
|
self.manager = agent_manager
|
||||||
self.pollsters = set()
|
self.pollsters = set()
|
||||||
self.publish_context = pipeline.PublishContext(
|
self.publish_context = pipeline.PublishContext(
|
||||||
agent_manager.context,
|
agent_manager.context)
|
||||||
cfg.CONF.sample_source)
|
|
||||||
|
|
||||||
def add(self, pollster, pipelines):
|
def add(self, pollster, pipelines):
|
||||||
self.publish_context.add_pipelines(pipelines)
|
self.publish_context.add_pipelines(pipelines)
|
||||||
|
@ -42,7 +42,6 @@ from ceilometer.openstack.common import context
|
|||||||
from ceilometer.openstack.common import log
|
from ceilometer.openstack.common import log
|
||||||
from ceilometer.openstack.common import timeutils
|
from ceilometer.openstack.common import timeutils
|
||||||
from ceilometer import sample
|
from ceilometer import sample
|
||||||
from ceilometer import pipeline
|
|
||||||
from ceilometer import storage
|
from ceilometer import storage
|
||||||
from ceilometer.api import acl
|
from ceilometer.api import acl
|
||||||
|
|
||||||
@ -529,11 +528,8 @@ class MeterController(rest.RestController):
|
|||||||
s.timestamp = now
|
s.timestamp = now
|
||||||
s.source = '%s:%s' % (s.project_id, source)
|
s.source = '%s:%s' % (s.project_id, source)
|
||||||
|
|
||||||
with pipeline.PublishContext(
|
with pecan.request.pipeline_manager.publisher(
|
||||||
context.get_admin_context(),
|
context.get_admin_context()) as publisher:
|
||||||
source,
|
|
||||||
pecan.request.pipeline_manager.pipelines,
|
|
||||||
) as publisher:
|
|
||||||
publisher([sample.Sample(
|
publisher([sample.Sample(
|
||||||
name=s.counter_name,
|
name=s.counter_name,
|
||||||
type=s.counter_type,
|
type=s.counter_type,
|
||||||
@ -543,7 +539,8 @@ class MeterController(rest.RestController):
|
|||||||
project_id=s.project_id,
|
project_id=s.project_id,
|
||||||
resource_id=s.resource_id,
|
resource_id=s.resource_id,
|
||||||
timestamp=s.timestamp.isoformat(),
|
timestamp=s.timestamp.isoformat(),
|
||||||
resource_metadata=s.resource_metadata) for s in samples])
|
resource_metadata=s.resource_metadata,
|
||||||
|
source=source) for s in samples])
|
||||||
|
|
||||||
# TODO(asalkeld) this is not ideal, it would be nice if the publisher
|
# TODO(asalkeld) this is not ideal, it would be nice if the publisher
|
||||||
# returned the created sample message with message id (or at least the
|
# returned the created sample message with message id (or at least the
|
||||||
|
@ -250,8 +250,7 @@ class CollectorService(rpc_service.Service):
|
|||||||
handler = ext.obj
|
handler = ext.obj
|
||||||
if notification['event_type'] in handler.get_event_types():
|
if notification['event_type'] in handler.get_event_types():
|
||||||
ctxt = context.get_admin_context()
|
ctxt = context.get_admin_context()
|
||||||
with self.pipeline_manager.publisher(ctxt,
|
with self.pipeline_manager.publisher(ctxt) as p:
|
||||||
cfg.CONF.sample_source) as p:
|
|
||||||
# FIXME(dhellmann): Spawn green thread?
|
# FIXME(dhellmann): Spawn green thread?
|
||||||
p(list(handler.process_notification(notification)))
|
p(list(handler.process_notification(notification)))
|
||||||
|
|
||||||
|
@ -20,14 +20,11 @@ from ceilometer import pipeline
|
|||||||
from ceilometer import transformer
|
from ceilometer import transformer
|
||||||
from ceilometer.openstack.common import context as req_context
|
from ceilometer.openstack.common import context as req_context
|
||||||
from ceilometer.openstack.common import log as logging
|
from ceilometer.openstack.common import log as logging
|
||||||
from oslo.config import cfg
|
|
||||||
from stevedore import extension
|
from stevedore import extension
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
cfg.CONF.import_opt('sample_source', 'ceilometer.sample')
|
|
||||||
|
|
||||||
|
|
||||||
_notification_manager = None
|
_notification_manager = None
|
||||||
_pipeline_manager = None
|
_pipeline_manager = None
|
||||||
@ -63,8 +60,7 @@ def _process_notification_for_ext(ext, context, notification):
|
|||||||
handler = ext.obj
|
handler = ext.obj
|
||||||
if notification['event_type'] in handler.get_event_types():
|
if notification['event_type'] in handler.get_event_types():
|
||||||
|
|
||||||
with _pipeline_manager.publisher(context,
|
with _pipeline_manager.publisher(context) as p:
|
||||||
cfg.CONF.sample_source) as p:
|
|
||||||
# FIXME(dhellmann): Spawn green thread?
|
# FIXME(dhellmann): Spawn green thread?
|
||||||
p(list(handler.process_notification(notification)))
|
p(list(handler.process_notification(notification)))
|
||||||
|
|
||||||
|
@ -37,7 +37,6 @@ metadata_headers = X-TEST
|
|||||||
|
|
||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
|
|
||||||
from oslo.config import cfg
|
|
||||||
from swift.common.utils import split_path
|
from swift.common.utils import split_path
|
||||||
import webob
|
import webob
|
||||||
|
|
||||||
@ -130,11 +129,8 @@ class CeilometerMiddleware(object):
|
|||||||
resource_metadata['http_header_%s' % header] = req.headers.get(
|
resource_metadata['http_header_%s' % header] = req.headers.get(
|
||||||
header.upper())
|
header.upper())
|
||||||
|
|
||||||
with pipeline.PublishContext(
|
with self.pipeline_manager.publisher(
|
||||||
context.get_admin_context(),
|
context.get_admin_context()) as publisher:
|
||||||
cfg.CONF.sample_source,
|
|
||||||
self.pipeline_manager.pipelines,
|
|
||||||
) as publisher:
|
|
||||||
if bytes_received:
|
if bytes_received:
|
||||||
publisher([sample.Sample(
|
publisher([sample.Sample(
|
||||||
name='storage.objects.incoming.bytes',
|
name='storage.objects.incoming.bytes',
|
||||||
|
@ -49,10 +49,9 @@ class PipelineException(Exception):
|
|||||||
|
|
||||||
class PublishContext(object):
|
class PublishContext(object):
|
||||||
|
|
||||||
def __init__(self, context, source, pipelines=[]):
|
def __init__(self, context, pipelines=[]):
|
||||||
self.pipelines = set(pipelines)
|
self.pipelines = set(pipelines)
|
||||||
self.context = context
|
self.context = context
|
||||||
self.source = source
|
|
||||||
|
|
||||||
def add_pipelines(self, pipelines):
|
def add_pipelines(self, pipelines):
|
||||||
self.pipelines.update(pipelines)
|
self.pipelines.update(pipelines)
|
||||||
@ -61,13 +60,12 @@ class PublishContext(object):
|
|||||||
def p(counters):
|
def p(counters):
|
||||||
for p in self.pipelines:
|
for p in self.pipelines:
|
||||||
p.publish_counters(self.context,
|
p.publish_counters(self.context,
|
||||||
counters,
|
counters)
|
||||||
self.source)
|
|
||||||
return p
|
return p
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_value, traceback):
|
def __exit__(self, exc_type, exc_value, traceback):
|
||||||
for p in self.pipelines:
|
for p in self.pipelines:
|
||||||
p.flush(self.context, self.source)
|
p.flush(self.context)
|
||||||
|
|
||||||
|
|
||||||
class Pipeline(object):
|
class Pipeline(object):
|
||||||
@ -175,10 +173,10 @@ class Pipeline(object):
|
|||||||
|
|
||||||
return transformers
|
return transformers
|
||||||
|
|
||||||
def _transform_counter(self, start, ctxt, counter, source):
|
def _transform_counter(self, start, ctxt, counter):
|
||||||
try:
|
try:
|
||||||
for transformer in self.transformers[start:]:
|
for transformer in self.transformers[start:]:
|
||||||
counter = transformer.handle_sample(ctxt, counter, source)
|
counter = transformer.handle_sample(ctxt, counter)
|
||||||
if not counter:
|
if not counter:
|
||||||
LOG.debug("Pipeline %s: Counter dropped by transformer %s",
|
LOG.debug("Pipeline %s: Counter dropped by transformer %s",
|
||||||
self, transformer)
|
self, transformer)
|
||||||
@ -190,7 +188,7 @@ class Pipeline(object):
|
|||||||
self, transformer, counter)
|
self, transformer, counter)
|
||||||
LOG.exception(err)
|
LOG.exception(err)
|
||||||
|
|
||||||
def _publish_counters(self, start, ctxt, counters, source):
|
def _publish_counters(self, start, ctxt, counters):
|
||||||
"""Push counter into pipeline for publishing.
|
"""Push counter into pipeline for publishing.
|
||||||
|
|
||||||
param start: the first transformer that the counter will be injected.
|
param start: the first transformer that the counter will be injected.
|
||||||
@ -198,7 +196,6 @@ class Pipeline(object):
|
|||||||
may emit counters
|
may emit counters
|
||||||
param ctxt: execution context from the manager or service
|
param ctxt: execution context from the manager or service
|
||||||
param counters: counter list
|
param counters: counter list
|
||||||
param source: counter source
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@ -206,7 +203,7 @@ class Pipeline(object):
|
|||||||
for counter in counters:
|
for counter in counters:
|
||||||
LOG.debug("Pipeline %s: Transform counter %s from %s transformer",
|
LOG.debug("Pipeline %s: Transform counter %s from %s transformer",
|
||||||
self, counter, start)
|
self, counter, start)
|
||||||
counter = self._transform_counter(start, ctxt, counter, source)
|
counter = self._transform_counter(start, ctxt, counter)
|
||||||
if counter:
|
if counter:
|
||||||
transformed_counters.append(counter)
|
transformed_counters.append(counter)
|
||||||
|
|
||||||
@ -214,22 +211,22 @@ class Pipeline(object):
|
|||||||
|
|
||||||
for p in self.publishers:
|
for p in self.publishers:
|
||||||
try:
|
try:
|
||||||
p.publish_counters(ctxt, transformed_counters, source)
|
p.publish_counters(ctxt, transformed_counters)
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception("Pipeline %s: Continue after error "
|
LOG.exception("Pipeline %s: Continue after error "
|
||||||
"from publisher %s", self, p)
|
"from publisher %s", self, p)
|
||||||
|
|
||||||
LOG.audit("Pipeline %s: Published counters", self)
|
LOG.audit("Pipeline %s: Published counters", self)
|
||||||
|
|
||||||
def publish_counter(self, ctxt, counter, source):
|
def publish_counter(self, ctxt, counter):
|
||||||
self.publish_counters(ctxt, [counter], source)
|
self.publish_counters(ctxt, [counter])
|
||||||
|
|
||||||
def publish_counters(self, ctxt, counters, source):
|
def publish_counters(self, ctxt, counters):
|
||||||
for counter_name, counters in itertools.groupby(
|
for counter_name, counters in itertools.groupby(
|
||||||
sorted(counters, key=lambda c: c.name),
|
sorted(counters, key=lambda c: c.name),
|
||||||
lambda c: c.name):
|
lambda c: c.name):
|
||||||
if self.support_counter(counter_name):
|
if self.support_counter(counter_name):
|
||||||
self._publish_counters(0, ctxt, counters, source)
|
self._publish_counters(0, ctxt, counters)
|
||||||
|
|
||||||
# (yjiang5) To support counters like instance:m1.tiny,
|
# (yjiang5) To support counters like instance:m1.tiny,
|
||||||
# which include variable part at the end starting with ':'.
|
# which include variable part at the end starting with ':'.
|
||||||
@ -252,15 +249,14 @@ class Pipeline(object):
|
|||||||
else:
|
else:
|
||||||
return counter_name in self.counters
|
return counter_name in self.counters
|
||||||
|
|
||||||
def flush(self, ctxt, source):
|
def flush(self, ctxt):
|
||||||
"""Flush data after all counter have been injected to pipeline."""
|
"""Flush data after all counter have been injected to pipeline."""
|
||||||
|
|
||||||
LOG.audit("Flush pipeline %s", self)
|
LOG.audit("Flush pipeline %s", self)
|
||||||
for (i, transformer) in enumerate(self.transformers):
|
for (i, transformer) in enumerate(self.transformers):
|
||||||
try:
|
try:
|
||||||
self._publish_counters(i + 1, ctxt,
|
self._publish_counters(i + 1, ctxt,
|
||||||
list(transformer.flush(ctxt, source)),
|
list(transformer.flush(ctxt)))
|
||||||
source)
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
LOG.warning(
|
LOG.warning(
|
||||||
"Pipeline %s: Error flushing "
|
"Pipeline %s: Error flushing "
|
||||||
@ -327,13 +323,13 @@ class PipelineManager(object):
|
|||||||
self.pipelines = [Pipeline(pipedef, transformer_manager)
|
self.pipelines = [Pipeline(pipedef, transformer_manager)
|
||||||
for pipedef in cfg]
|
for pipedef in cfg]
|
||||||
|
|
||||||
def publisher(self, context, source):
|
def publisher(self, context):
|
||||||
"""Build a new Publisher for these manager pipelines.
|
"""Build a new Publisher for these manager pipelines.
|
||||||
|
|
||||||
:param context: The context.
|
:param context: The context.
|
||||||
:param source: Counter source.
|
:param source: Counter source.
|
||||||
"""
|
"""
|
||||||
return PublishContext(context, source, self.pipelines)
|
return PublishContext(context, self.pipelines)
|
||||||
|
|
||||||
|
|
||||||
def setup_pipeline(transformer_manager):
|
def setup_pipeline(transformer_manager):
|
||||||
|
@ -43,5 +43,5 @@ class PublisherBase(object):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def publish_counters(self, context, counters, source):
|
def publish_counters(self, context, counters):
|
||||||
"Publish counters into final conduit."
|
"Publish counters into final conduit."
|
||||||
|
@ -86,12 +86,11 @@ class FilePublisher(publisher.PublisherBase):
|
|||||||
rfh.setLevel(logging.INFO)
|
rfh.setLevel(logging.INFO)
|
||||||
self.publisher_logger.addHandler(rfh)
|
self.publisher_logger.addHandler(rfh)
|
||||||
|
|
||||||
def publish_counters(self, context, counters, source):
|
def publish_counters(self, context, counters):
|
||||||
"""Send a metering message for publishing
|
"""Send a metering message for publishing
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call
|
:param context: Execution context from the service or RPC call
|
||||||
:param counter: Counter from pipeline after transformation
|
:param counter: Counter from pipeline after transformation
|
||||||
:param source: counter source
|
|
||||||
"""
|
"""
|
||||||
if self.publisher_logger:
|
if self.publisher_logger:
|
||||||
self.publisher_logger.info(counters)
|
self.publisher_logger.info(counters)
|
||||||
|
@ -137,12 +137,11 @@ class RPCPublisher(publisher.PublisherBase):
|
|||||||
% self.policy)
|
% self.policy)
|
||||||
self.policy = 'default'
|
self.policy = 'default'
|
||||||
|
|
||||||
def publish_counters(self, context, counters, source):
|
def publish_counters(self, context, counters):
|
||||||
"""Publish counters on RPC.
|
"""Publish counters on RPC.
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call.
|
:param context: Execution context from the service or RPC call.
|
||||||
:param counters: Counters from pipeline after transformation.
|
:param counters: Counters from pipeline after transformation.
|
||||||
:param source: Counter source.
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@ -150,7 +149,7 @@ class RPCPublisher(publisher.PublisherBase):
|
|||||||
meter_message_from_counter(
|
meter_message_from_counter(
|
||||||
counter,
|
counter,
|
||||||
cfg.CONF.publisher_rpc.metering_secret,
|
cfg.CONF.publisher_rpc.metering_secret,
|
||||||
source)
|
counter.source)
|
||||||
for counter in counters
|
for counter in counters
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -27,11 +27,10 @@ class TestPublisher(publisher.PublisherBase):
|
|||||||
def __init__(self, parsed_url):
|
def __init__(self, parsed_url):
|
||||||
self.counters = []
|
self.counters = []
|
||||||
|
|
||||||
def publish_counters(self, context, counters, source):
|
def publish_counters(self, context, counters):
|
||||||
"""Send a metering message for publishing
|
"""Send a metering message for publishing
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call
|
:param context: Execution context from the service or RPC call
|
||||||
:param counter: Counter from pipeline after transformation
|
:param counter: Counter from pipeline after transformation
|
||||||
:param source: counter source
|
|
||||||
"""
|
"""
|
||||||
self.counters.extend(counters)
|
self.counters.extend(counters)
|
||||||
|
@ -41,17 +41,15 @@ class UDPPublisher(publisher.PublisherBase):
|
|||||||
self.socket = socket.socket(socket.AF_INET,
|
self.socket = socket.socket(socket.AF_INET,
|
||||||
socket.SOCK_DGRAM)
|
socket.SOCK_DGRAM)
|
||||||
|
|
||||||
def publish_counters(self, context, counters, source):
|
def publish_counters(self, context, counters):
|
||||||
"""Send a metering message for publishing
|
"""Send a metering message for publishing
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call
|
:param context: Execution context from the service or RPC call
|
||||||
:param counter: Counter from pipeline after transformation
|
:param counter: Counter from pipeline after transformation
|
||||||
:param source: counter source
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
for counter in counters:
|
for counter in counters:
|
||||||
msg = counter._asdict()
|
msg = counter.as_dict()
|
||||||
msg['source'] = source
|
|
||||||
host = self.host
|
host = self.host
|
||||||
port = self.port
|
port = self.port
|
||||||
LOG.debug(_("Publishing counter %(msg)s over UDP to "
|
LOG.debug(_("Publishing counter %(msg)s over UDP to "
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
# -*- encoding: utf-8 -*-
|
# -*- encoding: utf-8 -*-
|
||||||
#
|
#
|
||||||
# Copyright © 2012 New Dream Network, LLC (DreamHost)
|
# Copyright © 2012 New Dream Network, LLC (DreamHost)
|
||||||
|
# Copyright © 2013 eNovance
|
||||||
#
|
#
|
||||||
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
|
# Authors: Doug Hellmann <doug.hellmann@dreamhost.com>
|
||||||
|
# Julien Danjou <julien@danjou.info>
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
@ -22,7 +24,6 @@ ensure that all of the appropriate fields have been filled
|
|||||||
in by the plugins that create them.
|
in by the plugins that create them.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import collections
|
|
||||||
import copy
|
import copy
|
||||||
|
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
@ -40,6 +41,7 @@ cfg.CONF.register_opts(OPTS)
|
|||||||
|
|
||||||
# Fields explanation:
|
# Fields explanation:
|
||||||
#
|
#
|
||||||
|
# Source: the source of this sample
|
||||||
# Name: the name of the meter, must be unique
|
# Name: the name of the meter, must be unique
|
||||||
# Type: the type of the meter, must be either:
|
# Type: the type of the meter, must be either:
|
||||||
# - cumulative: the value is incremented and never reset to 0
|
# - cumulative: the value is incremented and never reset to 0
|
||||||
@ -52,40 +54,41 @@ cfg.CONF.register_opts(OPTS)
|
|||||||
# Resource ID: the resource ID
|
# Resource ID: the resource ID
|
||||||
# Timestamp: when the sample has been read
|
# Timestamp: when the sample has been read
|
||||||
# Resource metadata: various metadata
|
# Resource metadata: various metadata
|
||||||
Sample = collections.namedtuple('Sample',
|
class Sample(object):
|
||||||
' '.join([
|
|
||||||
'name',
|
|
||||||
'type',
|
|
||||||
'unit',
|
|
||||||
'volume',
|
|
||||||
'user_id',
|
|
||||||
'project_id',
|
|
||||||
'resource_id',
|
|
||||||
'timestamp',
|
|
||||||
'resource_metadata',
|
|
||||||
]))
|
|
||||||
|
|
||||||
|
def __init__(self, name, type, unit, volume, user_id, project_id,
|
||||||
|
resource_id, timestamp, resource_metadata, source=None):
|
||||||
|
self.name = name
|
||||||
|
self.type = type
|
||||||
|
self.unit = unit
|
||||||
|
self.volume = volume
|
||||||
|
self.user_id = user_id
|
||||||
|
self.project_id = project_id
|
||||||
|
self.resource_id = resource_id
|
||||||
|
self.timestamp = timestamp
|
||||||
|
self.resource_metadata = resource_metadata
|
||||||
|
self.source = source or cfg.CONF.sample_source
|
||||||
|
|
||||||
|
def as_dict(self):
|
||||||
|
return copy.copy(self.__dict__)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_notification(cls, name, type, volume, unit,
|
||||||
|
user_id, project_id, resource_id,
|
||||||
|
message):
|
||||||
|
metadata = copy.copy(message['payload'])
|
||||||
|
metadata['event_type'] = message['event_type']
|
||||||
|
metadata['host'] = message['publisher_id']
|
||||||
|
return cls(name=name,
|
||||||
|
type=type,
|
||||||
|
volume=volume,
|
||||||
|
unit=unit,
|
||||||
|
user_id=user_id,
|
||||||
|
project_id=project_id,
|
||||||
|
resource_id=resource_id,
|
||||||
|
timestamp=message['timestamp'],
|
||||||
|
resource_metadata=metadata)
|
||||||
|
|
||||||
TYPE_GAUGE = 'gauge'
|
TYPE_GAUGE = 'gauge'
|
||||||
TYPE_DELTA = 'delta'
|
TYPE_DELTA = 'delta'
|
||||||
TYPE_CUMULATIVE = 'cumulative'
|
TYPE_CUMULATIVE = 'cumulative'
|
||||||
|
|
||||||
|
|
||||||
def from_notification(cls, name, type, volume, unit,
|
|
||||||
user_id, project_id, resource_id,
|
|
||||||
message):
|
|
||||||
metadata = copy.copy(message['payload'])
|
|
||||||
metadata['event_type'] = message['event_type']
|
|
||||||
metadata['host'] = message['publisher_id']
|
|
||||||
return cls(name=name,
|
|
||||||
type=type,
|
|
||||||
volume=volume,
|
|
||||||
unit=unit,
|
|
||||||
user_id=user_id,
|
|
||||||
project_id=project_id,
|
|
||||||
resource_id=resource_id,
|
|
||||||
timestamp=message['timestamp'],
|
|
||||||
resource_metadata=metadata)
|
|
||||||
|
|
||||||
|
|
||||||
Sample.from_notification = classmethod(from_notification)
|
|
||||||
|
@ -53,18 +53,16 @@ class TransformerBase(object):
|
|||||||
super(TransformerBase, self).__init__()
|
super(TransformerBase, self).__init__()
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def handle_sample(self, context, counter, source):
|
def handle_sample(self, context, counter):
|
||||||
"""Transform a counter.
|
"""Transform a counter.
|
||||||
|
|
||||||
:param context: Passed from the data collector.
|
:param context: Passed from the data collector.
|
||||||
:param counter: A counter.
|
:param counter: A counter.
|
||||||
:param source: Passed from data collector.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def flush(self, context, source):
|
def flush(self, context):
|
||||||
"""Flush counters cached previously.
|
"""Flush counters cached previously.
|
||||||
|
|
||||||
:param context: Passed from the data collector.
|
:param context: Passed from the data collector.
|
||||||
:param source: Source of counters that are being published.
|
|
||||||
"""
|
"""
|
||||||
return []
|
return []
|
||||||
|
@ -31,13 +31,13 @@ class TransformerAccumulator(transformer.TransformerBase):
|
|||||||
self.size = size
|
self.size = size
|
||||||
super(TransformerAccumulator, self).__init__(**kwargs)
|
super(TransformerAccumulator, self).__init__(**kwargs)
|
||||||
|
|
||||||
def handle_sample(self, context, counter, source):
|
def handle_sample(self, context, counter):
|
||||||
if self.size >= 1:
|
if self.size >= 1:
|
||||||
self.counters.append(counter)
|
self.counters.append(counter)
|
||||||
else:
|
else:
|
||||||
return counter
|
return counter
|
||||||
|
|
||||||
def flush(self, context, source):
|
def flush(self, context):
|
||||||
if len(self.counters) >= self.size:
|
if len(self.counters) >= self.size:
|
||||||
x = self.counters
|
x = self.counters
|
||||||
self.counters = []
|
self.counters = []
|
||||||
|
@ -74,7 +74,7 @@ class ScalingTransformer(transformer.TransformerBase):
|
|||||||
"""Apply the scaling factor (either a straight multiplicative
|
"""Apply the scaling factor (either a straight multiplicative
|
||||||
factor or else a string to be eval'd).
|
factor or else a string to be eval'd).
|
||||||
"""
|
"""
|
||||||
ns = Namespace(counter._asdict())
|
ns = Namespace(counter.as_dict())
|
||||||
|
|
||||||
return ((eval(scale, {}, ns) if isinstance(scale, basestring)
|
return ((eval(scale, {}, ns) if isinstance(scale, basestring)
|
||||||
else counter.volume * scale) if scale else counter.volume)
|
else counter.volume * scale) if scale else counter.volume)
|
||||||
@ -95,7 +95,7 @@ class ScalingTransformer(transformer.TransformerBase):
|
|||||||
resource_metadata=counter.resource_metadata
|
resource_metadata=counter.resource_metadata
|
||||||
)
|
)
|
||||||
|
|
||||||
def handle_sample(self, context, counter, source):
|
def handle_sample(self, context, counter):
|
||||||
"""Handle a sample, converting if necessary."""
|
"""Handle a sample, converting if necessary."""
|
||||||
LOG.debug('handling counter %s', (counter,))
|
LOG.debug('handling counter %s', (counter,))
|
||||||
if (self.source.get('unit', counter.unit) == counter.unit):
|
if (self.source.get('unit', counter.unit) == counter.unit):
|
||||||
@ -117,7 +117,7 @@ class RateOfChangeTransformer(ScalingTransformer):
|
|||||||
self.cache = {}
|
self.cache = {}
|
||||||
super(RateOfChangeTransformer, self).__init__(**kwargs)
|
super(RateOfChangeTransformer, self).__init__(**kwargs)
|
||||||
|
|
||||||
def handle_sample(self, context, counter, source):
|
def handle_sample(self, context, counter):
|
||||||
"""Handle a sample, converting if necessary."""
|
"""Handle a sample, converting if necessary."""
|
||||||
LOG.debug('handling counter %s', (counter,))
|
LOG.debug('handling counter %s', (counter,))
|
||||||
key = counter.name + counter.resource_id
|
key = counter.name + counter.resource_id
|
||||||
|
@ -71,15 +71,42 @@ class BaseAgentManagerTestCase(base.TestCase):
|
|||||||
|
|
||||||
class PollsterAnother(TestPollster):
|
class PollsterAnother(TestPollster):
|
||||||
counters = []
|
counters = []
|
||||||
test_data = default_test_data._replace(name='testanother')
|
test_data = sample.Sample(
|
||||||
|
name='testanother',
|
||||||
|
type=default_test_data.type,
|
||||||
|
unit=default_test_data.unit,
|
||||||
|
volume=default_test_data.volume,
|
||||||
|
user_id=default_test_data.user_id,
|
||||||
|
project_id=default_test_data.project_id,
|
||||||
|
resource_id=default_test_data.resource_id,
|
||||||
|
timestamp=default_test_data.timestamp,
|
||||||
|
resource_metadata=default_test_data.resource_metadata)
|
||||||
|
|
||||||
class PollsterException(TestPollsterException):
|
class PollsterException(TestPollsterException):
|
||||||
counters = []
|
counters = []
|
||||||
test_data = default_test_data._replace(name='testexception')
|
test_data = sample.Sample(
|
||||||
|
name='testexception',
|
||||||
|
type=default_test_data.type,
|
||||||
|
unit=default_test_data.unit,
|
||||||
|
volume=default_test_data.volume,
|
||||||
|
user_id=default_test_data.user_id,
|
||||||
|
project_id=default_test_data.project_id,
|
||||||
|
resource_id=default_test_data.resource_id,
|
||||||
|
timestamp=default_test_data.timestamp,
|
||||||
|
resource_metadata=default_test_data.resource_metadata)
|
||||||
|
|
||||||
class PollsterExceptionAnother(TestPollsterException):
|
class PollsterExceptionAnother(TestPollsterException):
|
||||||
counters = []
|
counters = []
|
||||||
test_data = default_test_data._replace(name='testexceptionanother')
|
test_data = sample.Sample(
|
||||||
|
name='testexceptionanother',
|
||||||
|
type=default_test_data.type,
|
||||||
|
unit=default_test_data.unit,
|
||||||
|
volume=default_test_data.volume,
|
||||||
|
user_id=default_test_data.user_id,
|
||||||
|
project_id=default_test_data.project_id,
|
||||||
|
resource_id=default_test_data.resource_id,
|
||||||
|
timestamp=default_test_data.timestamp,
|
||||||
|
resource_metadata=default_test_data.resource_metadata)
|
||||||
|
|
||||||
def setup_pipeline(self):
|
def setup_pipeline(self):
|
||||||
self.transformer_manager = transformer.TransformerExtensionManager(
|
self.transformer_manager = transformer.TransformerExtensionManager(
|
||||||
|
@ -115,7 +115,7 @@ class TestUDPCollectorService(TestCollector):
|
|||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestUDPCollectorService, self).setUp()
|
super(TestUDPCollectorService, self).setUp()
|
||||||
self.srv = service.UDPCollectorService()
|
self.srv = service.UDPCollectorService()
|
||||||
self.counter = dict(sample.Sample(
|
self.counter = sample.Sample(
|
||||||
name='foobar',
|
name='foobar',
|
||||||
type='bad',
|
type='bad',
|
||||||
unit='F',
|
unit='F',
|
||||||
@ -125,7 +125,7 @@ class TestUDPCollectorService(TestCollector):
|
|||||||
resource_id='cat',
|
resource_id='cat',
|
||||||
timestamp='NOW!',
|
timestamp='NOW!',
|
||||||
resource_metadata={},
|
resource_metadata={},
|
||||||
)._asdict())
|
).as_dict()
|
||||||
|
|
||||||
def test_service_has_storage_conn(self):
|
def test_service_has_storage_conn(self):
|
||||||
srv = service.UDPCollectorService()
|
srv = service.UDPCollectorService()
|
||||||
|
@ -43,24 +43,21 @@ class FakeApp(object):
|
|||||||
|
|
||||||
class TestSwiftMiddleware(base.TestCase):
|
class TestSwiftMiddleware(base.TestCase):
|
||||||
|
|
||||||
class _faux_pipeline_manager(object):
|
class _faux_pipeline_manager(pipeline.PipelineManager):
|
||||||
class _faux_pipeline(object):
|
class _faux_pipeline(object):
|
||||||
def __init__(self, pipeline_manager):
|
def __init__(self, pipeline_manager):
|
||||||
self.pipeline_manager = pipeline_manager
|
self.pipeline_manager = pipeline_manager
|
||||||
self.counters = []
|
self.counters = []
|
||||||
|
|
||||||
def publish_counters(self, ctxt, counters, source):
|
def publish_counters(self, ctxt, counters):
|
||||||
self.counters.extend(counters)
|
self.counters.extend(counters)
|
||||||
|
|
||||||
def flush(self, context, source):
|
def flush(self, context):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.pipelines = [self._faux_pipeline(self)]
|
self.pipelines = [self._faux_pipeline(self)]
|
||||||
|
|
||||||
def flush(self, ctx, source):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def _faux_setup_pipeline(self, transformer_manager):
|
def _faux_setup_pipeline(self, transformer_manager):
|
||||||
return self.pipeline_manager
|
return self.pipeline_manager
|
||||||
|
|
||||||
|
@ -66,16 +66,13 @@ class TestFilePublisher(base.TestCase):
|
|||||||
),
|
),
|
||||||
]
|
]
|
||||||
|
|
||||||
COUNTER_SOURCE = 'testsource'
|
|
||||||
|
|
||||||
def test_file_publisher(self):
|
def test_file_publisher(self):
|
||||||
# Test valid configurations
|
# Test valid configurations
|
||||||
parsed_url = urlsplit(
|
parsed_url = urlsplit(
|
||||||
'file:///tmp/log_file?max_bytes=50&backup_count=3')
|
'file:///tmp/log_file?max_bytes=50&backup_count=3')
|
||||||
publisher = file.FilePublisher(parsed_url)
|
publisher = file.FilePublisher(parsed_url)
|
||||||
publisher.publish_counters(None,
|
publisher.publish_counters(None,
|
||||||
self.test_data,
|
self.test_data)
|
||||||
self.COUNTER_SOURCE)
|
|
||||||
|
|
||||||
handler = publisher.publisher_logger.handlers[0]
|
handler = publisher.publisher_logger.handlers[0]
|
||||||
self.assertTrue(isinstance(handler,
|
self.assertTrue(isinstance(handler,
|
||||||
@ -91,8 +88,7 @@ class TestFilePublisher(base.TestCase):
|
|||||||
'file:///tmp/log_file_plain')
|
'file:///tmp/log_file_plain')
|
||||||
publisher = file.FilePublisher(parsed_url)
|
publisher = file.FilePublisher(parsed_url)
|
||||||
publisher.publish_counters(None,
|
publisher.publish_counters(None,
|
||||||
self.test_data,
|
self.test_data)
|
||||||
self.COUNTER_SOURCE)
|
|
||||||
|
|
||||||
handler = publisher.publisher_logger.handlers[0]
|
handler = publisher.publisher_logger.handlers[0]
|
||||||
self.assertTrue(isinstance(handler,
|
self.assertTrue(isinstance(handler,
|
||||||
@ -109,7 +105,6 @@ class TestFilePublisher(base.TestCase):
|
|||||||
'file:///tmp/log_file_bad?max_bytes=yus&backup_count=5y')
|
'file:///tmp/log_file_bad?max_bytes=yus&backup_count=5y')
|
||||||
publisher = file.FilePublisher(parsed_url)
|
publisher = file.FilePublisher(parsed_url)
|
||||||
publisher.publish_counters(None,
|
publisher.publish_counters(None,
|
||||||
self.test_data,
|
self.test_data)
|
||||||
self.COUNTER_SOURCE)
|
|
||||||
|
|
||||||
self.assertIsNone(publisher.publisher_logger)
|
self.assertIsNone(publisher.publisher_logger)
|
||||||
|
@ -222,8 +222,7 @@ class TestPublish(base.TestCase):
|
|||||||
publisher = rpc.RPCPublisher(
|
publisher = rpc.RPCPublisher(
|
||||||
network_utils.urlsplit('rpc://'))
|
network_utils.urlsplit('rpc://'))
|
||||||
publisher.publish_counters(None,
|
publisher.publish_counters(None,
|
||||||
self.test_data,
|
self.test_data)
|
||||||
'test')
|
|
||||||
self.assertEqual(len(self.published), 1)
|
self.assertEqual(len(self.published), 1)
|
||||||
self.assertEqual(self.published[0][0],
|
self.assertEqual(self.published[0][0],
|
||||||
cfg.CONF.publisher_rpc.metering_topic)
|
cfg.CONF.publisher_rpc.metering_topic)
|
||||||
@ -235,8 +234,7 @@ class TestPublish(base.TestCase):
|
|||||||
publisher = rpc.RPCPublisher(
|
publisher = rpc.RPCPublisher(
|
||||||
network_utils.urlsplit('rpc://?target=custom_procedure_call'))
|
network_utils.urlsplit('rpc://?target=custom_procedure_call'))
|
||||||
publisher.publish_counters(None,
|
publisher.publish_counters(None,
|
||||||
self.test_data,
|
self.test_data)
|
||||||
'test')
|
|
||||||
self.assertEqual(len(self.published), 1)
|
self.assertEqual(len(self.published), 1)
|
||||||
self.assertEqual(self.published[0][0],
|
self.assertEqual(self.published[0][0],
|
||||||
cfg.CONF.publisher_rpc.metering_topic)
|
cfg.CONF.publisher_rpc.metering_topic)
|
||||||
@ -248,8 +246,7 @@ class TestPublish(base.TestCase):
|
|||||||
publisher = rpc.RPCPublisher(
|
publisher = rpc.RPCPublisher(
|
||||||
network_utils.urlsplit('rpc://?per_meter_topic=1'))
|
network_utils.urlsplit('rpc://?per_meter_topic=1'))
|
||||||
publisher.publish_counters(None,
|
publisher.publish_counters(None,
|
||||||
self.test_data,
|
self.test_data)
|
||||||
'test')
|
|
||||||
self.assertEqual(len(self.published), 4)
|
self.assertEqual(len(self.published), 4)
|
||||||
for topic, rpc_call in self.published:
|
for topic, rpc_call in self.published:
|
||||||
meters = rpc_call['args']['data']
|
meters = rpc_call['args']['data']
|
||||||
@ -276,7 +273,7 @@ class TestPublish(base.TestCase):
|
|||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
SystemExit,
|
SystemExit,
|
||||||
publisher.publish_counters,
|
publisher.publish_counters,
|
||||||
None, self.test_data, 'test')
|
None, self.test_data)
|
||||||
self.assertEqual(publisher.policy, 'default')
|
self.assertEqual(publisher.policy, 'default')
|
||||||
self.assertEqual(len(self.published), 0)
|
self.assertEqual(len(self.published), 0)
|
||||||
self.assertEqual(len(publisher.local_queue), 0)
|
self.assertEqual(len(publisher.local_queue), 0)
|
||||||
@ -288,7 +285,7 @@ class TestPublish(base.TestCase):
|
|||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
SystemExit,
|
SystemExit,
|
||||||
publisher.publish_counters,
|
publisher.publish_counters,
|
||||||
None, self.test_data, 'test')
|
None, self.test_data)
|
||||||
self.assertEqual(len(self.published), 0)
|
self.assertEqual(len(self.published), 0)
|
||||||
self.assertEqual(len(publisher.local_queue), 0)
|
self.assertEqual(len(publisher.local_queue), 0)
|
||||||
|
|
||||||
@ -299,7 +296,7 @@ class TestPublish(base.TestCase):
|
|||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
SystemExit,
|
SystemExit,
|
||||||
publisher.publish_counters,
|
publisher.publish_counters,
|
||||||
None, self.test_data, 'test')
|
None, self.test_data)
|
||||||
self.assertEqual(publisher.policy, 'default')
|
self.assertEqual(publisher.policy, 'default')
|
||||||
self.assertEqual(len(self.published), 0)
|
self.assertEqual(len(self.published), 0)
|
||||||
self.assertEqual(len(publisher.local_queue), 0)
|
self.assertEqual(len(publisher.local_queue), 0)
|
||||||
@ -309,8 +306,7 @@ class TestPublish(base.TestCase):
|
|||||||
publisher = rpc.RPCPublisher(
|
publisher = rpc.RPCPublisher(
|
||||||
network_utils.urlsplit('rpc://?policy=drop'))
|
network_utils.urlsplit('rpc://?policy=drop'))
|
||||||
publisher.publish_counters(None,
|
publisher.publish_counters(None,
|
||||||
self.test_data,
|
self.test_data)
|
||||||
'test')
|
|
||||||
self.assertEqual(len(self.published), 0)
|
self.assertEqual(len(self.published), 0)
|
||||||
self.assertEqual(len(publisher.local_queue), 0)
|
self.assertEqual(len(publisher.local_queue), 0)
|
||||||
|
|
||||||
@ -319,8 +315,7 @@ class TestPublish(base.TestCase):
|
|||||||
publisher = rpc.RPCPublisher(
|
publisher = rpc.RPCPublisher(
|
||||||
network_utils.urlsplit('rpc://?policy=queue'))
|
network_utils.urlsplit('rpc://?policy=queue'))
|
||||||
publisher.publish_counters(None,
|
publisher.publish_counters(None,
|
||||||
self.test_data,
|
self.test_data)
|
||||||
'test')
|
|
||||||
self.assertEqual(len(self.published), 0)
|
self.assertEqual(len(self.published), 0)
|
||||||
self.assertEqual(len(publisher.local_queue), 1)
|
self.assertEqual(len(publisher.local_queue), 1)
|
||||||
|
|
||||||
@ -329,15 +324,13 @@ class TestPublish(base.TestCase):
|
|||||||
publisher = rpc.RPCPublisher(
|
publisher = rpc.RPCPublisher(
|
||||||
network_utils.urlsplit('rpc://?policy=queue'))
|
network_utils.urlsplit('rpc://?policy=queue'))
|
||||||
publisher.publish_counters(None,
|
publisher.publish_counters(None,
|
||||||
self.test_data,
|
self.test_data)
|
||||||
'test')
|
|
||||||
self.assertEqual(len(self.published), 0)
|
self.assertEqual(len(self.published), 0)
|
||||||
self.assertEqual(len(publisher.local_queue), 1)
|
self.assertEqual(len(publisher.local_queue), 1)
|
||||||
|
|
||||||
self.rpc_unreachable = False
|
self.rpc_unreachable = False
|
||||||
publisher.publish_counters(None,
|
publisher.publish_counters(None,
|
||||||
self.test_data,
|
self.test_data)
|
||||||
'test')
|
|
||||||
|
|
||||||
self.assertEqual(len(self.published), 2)
|
self.assertEqual(len(self.published), 2)
|
||||||
self.assertEqual(len(publisher.local_queue), 0)
|
self.assertEqual(len(publisher.local_queue), 0)
|
||||||
@ -347,9 +340,10 @@ class TestPublish(base.TestCase):
|
|||||||
publisher = rpc.RPCPublisher(
|
publisher = rpc.RPCPublisher(
|
||||||
network_utils.urlsplit('rpc://?policy=queue&max_queue_length=3'))
|
network_utils.urlsplit('rpc://?policy=queue&max_queue_length=3'))
|
||||||
for i in range(0, 5):
|
for i in range(0, 5):
|
||||||
|
for s in self.test_data:
|
||||||
|
s.source = 'test-%d' % i
|
||||||
publisher.publish_counters(None,
|
publisher.publish_counters(None,
|
||||||
self.test_data,
|
self.test_data)
|
||||||
'test-%d' % i)
|
|
||||||
self.assertEqual(len(self.published), 0)
|
self.assertEqual(len(self.published), 0)
|
||||||
self.assertEqual(len(publisher.local_queue), 3)
|
self.assertEqual(len(publisher.local_queue), 3)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
@ -370,9 +364,10 @@ class TestPublish(base.TestCase):
|
|||||||
publisher = rpc.RPCPublisher(
|
publisher = rpc.RPCPublisher(
|
||||||
network_utils.urlsplit('rpc://?policy=queue'))
|
network_utils.urlsplit('rpc://?policy=queue'))
|
||||||
for i in range(0, 2000):
|
for i in range(0, 2000):
|
||||||
|
for s in self.test_data:
|
||||||
|
s.source = 'test-%d' % i
|
||||||
publisher.publish_counters(None,
|
publisher.publish_counters(None,
|
||||||
self.test_data,
|
self.test_data)
|
||||||
'test-%d' % i)
|
|
||||||
self.assertEqual(len(self.published), 0)
|
self.assertEqual(len(self.published), 0)
|
||||||
self.assertEqual(len(publisher.local_queue), 1024)
|
self.assertEqual(len(publisher.local_queue), 1024)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
|
@ -29,6 +29,9 @@ from ceilometer.tests import base
|
|||||||
from ceilometer.openstack.common import network_utils
|
from ceilometer.openstack.common import network_utils
|
||||||
|
|
||||||
|
|
||||||
|
COUNTER_SOURCE = 'testsource'
|
||||||
|
|
||||||
|
|
||||||
class TestUDPPublisher(base.TestCase):
|
class TestUDPPublisher(base.TestCase):
|
||||||
|
|
||||||
test_data = [
|
test_data = [
|
||||||
@ -42,6 +45,7 @@ class TestUDPPublisher(base.TestCase):
|
|||||||
resource_id='test_run_tasks',
|
resource_id='test_run_tasks',
|
||||||
timestamp=datetime.datetime.utcnow().isoformat(),
|
timestamp=datetime.datetime.utcnow().isoformat(),
|
||||||
resource_metadata={'name': 'TestPublish'},
|
resource_metadata={'name': 'TestPublish'},
|
||||||
|
source=COUNTER_SOURCE,
|
||||||
),
|
),
|
||||||
sample.Sample(
|
sample.Sample(
|
||||||
name='test',
|
name='test',
|
||||||
@ -53,6 +57,7 @@ class TestUDPPublisher(base.TestCase):
|
|||||||
resource_id='test_run_tasks',
|
resource_id='test_run_tasks',
|
||||||
timestamp=datetime.datetime.utcnow().isoformat(),
|
timestamp=datetime.datetime.utcnow().isoformat(),
|
||||||
resource_metadata={'name': 'TestPublish'},
|
resource_metadata={'name': 'TestPublish'},
|
||||||
|
source=COUNTER_SOURCE,
|
||||||
),
|
),
|
||||||
sample.Sample(
|
sample.Sample(
|
||||||
name='test2',
|
name='test2',
|
||||||
@ -64,6 +69,7 @@ class TestUDPPublisher(base.TestCase):
|
|||||||
resource_id='test_run_tasks',
|
resource_id='test_run_tasks',
|
||||||
timestamp=datetime.datetime.utcnow().isoformat(),
|
timestamp=datetime.datetime.utcnow().isoformat(),
|
||||||
resource_metadata={'name': 'TestPublish'},
|
resource_metadata={'name': 'TestPublish'},
|
||||||
|
source=COUNTER_SOURCE,
|
||||||
),
|
),
|
||||||
sample.Sample(
|
sample.Sample(
|
||||||
name='test2',
|
name='test2',
|
||||||
@ -75,6 +81,7 @@ class TestUDPPublisher(base.TestCase):
|
|||||||
resource_id='test_run_tasks',
|
resource_id='test_run_tasks',
|
||||||
timestamp=datetime.datetime.utcnow().isoformat(),
|
timestamp=datetime.datetime.utcnow().isoformat(),
|
||||||
resource_metadata={'name': 'TestPublish'},
|
resource_metadata={'name': 'TestPublish'},
|
||||||
|
source=COUNTER_SOURCE,
|
||||||
),
|
),
|
||||||
sample.Sample(
|
sample.Sample(
|
||||||
name='test3',
|
name='test3',
|
||||||
@ -86,6 +93,7 @@ class TestUDPPublisher(base.TestCase):
|
|||||||
resource_id='test_run_tasks',
|
resource_id='test_run_tasks',
|
||||||
timestamp=datetime.datetime.utcnow().isoformat(),
|
timestamp=datetime.datetime.utcnow().isoformat(),
|
||||||
resource_metadata={'name': 'TestPublish'},
|
resource_metadata={'name': 'TestPublish'},
|
||||||
|
source=COUNTER_SOURCE,
|
||||||
),
|
),
|
||||||
]
|
]
|
||||||
|
|
||||||
@ -99,8 +107,6 @@ class TestUDPPublisher(base.TestCase):
|
|||||||
return udp_socket
|
return udp_socket
|
||||||
return _fake_socket_socket
|
return _fake_socket_socket
|
||||||
|
|
||||||
COUNTER_SOURCE = 'testsource'
|
|
||||||
|
|
||||||
def test_published(self):
|
def test_published(self):
|
||||||
self.data_sent = []
|
self.data_sent = []
|
||||||
with mock.patch('socket.socket',
|
with mock.patch('socket.socket',
|
||||||
@ -108,8 +114,7 @@ class TestUDPPublisher(base.TestCase):
|
|||||||
publisher = udp.UDPPublisher(
|
publisher = udp.UDPPublisher(
|
||||||
network_utils.urlsplit('udp://somehost'))
|
network_utils.urlsplit('udp://somehost'))
|
||||||
publisher.publish_counters(None,
|
publisher.publish_counters(None,
|
||||||
self.test_data,
|
self.test_data)
|
||||||
self.COUNTER_SOURCE)
|
|
||||||
|
|
||||||
self.assertEqual(len(self.data_sent), 5)
|
self.assertEqual(len(self.data_sent), 5)
|
||||||
|
|
||||||
@ -117,10 +122,6 @@ class TestUDPPublisher(base.TestCase):
|
|||||||
|
|
||||||
for data, dest in self.data_sent:
|
for data, dest in self.data_sent:
|
||||||
counter = msgpack.loads(data)
|
counter = msgpack.loads(data)
|
||||||
self.assertEqual(counter['source'], self.COUNTER_SOURCE)
|
|
||||||
# Remove source because our test Counters don't have it, so the
|
|
||||||
# comparison would fail later
|
|
||||||
del counter['source']
|
|
||||||
sent_counters.append(counter)
|
sent_counters.append(counter)
|
||||||
|
|
||||||
# Check destination
|
# Check destination
|
||||||
@ -129,7 +130,7 @@ class TestUDPPublisher(base.TestCase):
|
|||||||
|
|
||||||
# Check that counters are equal
|
# Check that counters are equal
|
||||||
self.assertEqual(sorted(sent_counters),
|
self.assertEqual(sorted(sent_counters),
|
||||||
sorted([dict(d._asdict()) for d in self.test_data]))
|
sorted([dict(d.as_dict()) for d in self.test_data]))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _raise_ioerror():
|
def _raise_ioerror():
|
||||||
@ -146,5 +147,4 @@ class TestUDPPublisher(base.TestCase):
|
|||||||
publisher = udp.UDPPublisher(
|
publisher = udp.UDPPublisher(
|
||||||
network_utils.urlsplit('udp://localhost'))
|
network_utils.urlsplit('udp://localhost'))
|
||||||
publisher.publish_counters(None,
|
publisher.publish_counters(None,
|
||||||
self.test_data,
|
self.test_data)
|
||||||
self.COUNTER_SOURCE)
|
|
||||||
|
@ -67,7 +67,7 @@ class TestPipeline(base.TestCase):
|
|||||||
return fake_drivers[url](url)
|
return fake_drivers[url](url)
|
||||||
|
|
||||||
class PublisherClassException(publisher.PublisherBase):
|
class PublisherClassException(publisher.PublisherBase):
|
||||||
def publish_counters(self, ctxt, counters, source):
|
def publish_counters(self, ctxt, counters):
|
||||||
raise Exception()
|
raise Exception()
|
||||||
|
|
||||||
class TransformerClass(transformer.TransformerBase):
|
class TransformerClass(transformer.TransformerBase):
|
||||||
@ -77,13 +77,23 @@ class TestPipeline(base.TestCase):
|
|||||||
self.__class__.samples = []
|
self.__class__.samples = []
|
||||||
self.append_name = append_name
|
self.append_name = append_name
|
||||||
|
|
||||||
def flush(self, ctxt, source):
|
def flush(self, ctxt):
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def handle_sample(self, ctxt, counter, source):
|
def handle_sample(self, ctxt, counter):
|
||||||
self.__class__.samples.append(counter)
|
self.__class__.samples.append(counter)
|
||||||
newname = getattr(counter, 'name') + self.append_name
|
newname = getattr(counter, 'name') + self.append_name
|
||||||
return counter._replace(name=newname)
|
return sample.Sample(
|
||||||
|
name=newname,
|
||||||
|
type=counter.type,
|
||||||
|
volume=counter.volume,
|
||||||
|
unit=counter.unit,
|
||||||
|
user_id=counter.user_id,
|
||||||
|
project_id=counter.project_id,
|
||||||
|
resource_id=counter.resource_id,
|
||||||
|
timestamp=counter.timestamp,
|
||||||
|
resource_metadata=counter.resource_metadata,
|
||||||
|
)
|
||||||
|
|
||||||
class TransformerClassDrop(transformer.TransformerBase):
|
class TransformerClassDrop(transformer.TransformerBase):
|
||||||
samples = []
|
samples = []
|
||||||
@ -91,11 +101,11 @@ class TestPipeline(base.TestCase):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.__class__.samples = []
|
self.__class__.samples = []
|
||||||
|
|
||||||
def handle_sample(self, ctxt, counter, source):
|
def handle_sample(self, ctxt, counter):
|
||||||
self.__class__.samples.append(counter)
|
self.__class__.samples.append(counter)
|
||||||
|
|
||||||
class TransformerClassException(object):
|
class TransformerClassException(object):
|
||||||
def handle_sample(self, ctxt, counter, source):
|
def handle_sample(self, ctxt, counter):
|
||||||
raise Exception()
|
raise Exception()
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@ -204,7 +214,7 @@ class TestPipeline(base.TestCase):
|
|||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -220,14 +230,25 @@ class TestPipeline(base.TestCase):
|
|||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(len(publisher.counters), 1)
|
self.assertEqual(len(publisher.counters), 1)
|
||||||
|
|
||||||
self.test_counter = self.test_counter._replace(name='b')
|
self.test_counter = sample.Sample(
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
name='b',
|
||||||
|
type=self.test_counter.type,
|
||||||
|
volume=self.test_counter.volume,
|
||||||
|
unit=self.test_counter.unit,
|
||||||
|
user_id=self.test_counter.user_id,
|
||||||
|
project_id=self.test_counter.project_id,
|
||||||
|
resource_id=self.test_counter.resource_id,
|
||||||
|
timestamp=self.test_counter.timestamp,
|
||||||
|
resource_metadata=self.test_counter.resource_metadata,
|
||||||
|
)
|
||||||
|
|
||||||
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
self.assertEqual(len(publisher.counters), 2)
|
self.assertEqual(len(publisher.counters), 2)
|
||||||
@ -240,7 +261,7 @@ class TestPipeline(base.TestCase):
|
|||||||
self.pipeline_cfg[0]['counters'] = counter_cfg
|
self.pipeline_cfg[0]['counters'] = counter_cfg
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -260,7 +281,7 @@ class TestPipeline(base.TestCase):
|
|||||||
self.pipeline_cfg[0]['counters'] = counter_cfg
|
self.pipeline_cfg[0]['counters'] = counter_cfg
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(len(publisher.counters), 1)
|
self.assertEqual(len(publisher.counters), 1)
|
||||||
@ -273,7 +294,7 @@ class TestPipeline(base.TestCase):
|
|||||||
self.pipeline_cfg[0]['counters'] = counter_cfg
|
self.pipeline_cfg[0]['counters'] = counter_cfg
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -309,12 +330,22 @@ class TestPipeline(base.TestCase):
|
|||||||
|
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
self.test_counter = self.test_counter._replace(name='b')
|
self.test_counter = sample.Sample(
|
||||||
|
name='b',
|
||||||
|
type=self.test_counter.type,
|
||||||
|
volume=self.test_counter.volume,
|
||||||
|
unit=self.test_counter.unit,
|
||||||
|
user_id=self.test_counter.user_id,
|
||||||
|
project_id=self.test_counter.project_id,
|
||||||
|
resource_id=self.test_counter.resource_id,
|
||||||
|
timestamp=self.test_counter.timestamp,
|
||||||
|
resource_metadata=self.test_counter.resource_metadata,
|
||||||
|
)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -349,12 +380,22 @@ class TestPipeline(base.TestCase):
|
|||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
self.test_counter = self.test_counter._replace(name='b')
|
self.test_counter = sample.Sample(
|
||||||
|
name='b',
|
||||||
|
type=self.test_counter.type,
|
||||||
|
volume=self.test_counter.volume,
|
||||||
|
unit=self.test_counter.unit,
|
||||||
|
user_id=self.test_counter.user_id,
|
||||||
|
project_id=self.test_counter.project_id,
|
||||||
|
resource_id=self.test_counter.resource_id,
|
||||||
|
timestamp=self.test_counter.timestamp,
|
||||||
|
resource_metadata=self.test_counter.resource_metadata,
|
||||||
|
)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -370,7 +411,7 @@ class TestPipeline(base.TestCase):
|
|||||||
self.pipeline_cfg[0]['transformers'] = None
|
self.pipeline_cfg[0]['transformers'] = None
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(len(publisher.counters), 1)
|
self.assertEqual(len(publisher.counters), 1)
|
||||||
@ -380,7 +421,7 @@ class TestPipeline(base.TestCase):
|
|||||||
self.pipeline_cfg[0]['transformers'] = []
|
self.pipeline_cfg[0]['transformers'] = []
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(len(publisher.counters), 1)
|
self.assertEqual(len(publisher.counters), 1)
|
||||||
@ -400,7 +441,7 @@ class TestPipeline(base.TestCase):
|
|||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -432,7 +473,7 @@ class TestPipeline(base.TestCase):
|
|||||||
]
|
]
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
self.assertTrue(len(self.TransformerClass.samples) == 2)
|
self.assertTrue(len(self.TransformerClass.samples) == 2)
|
||||||
@ -468,7 +509,7 @@ class TestPipeline(base.TestCase):
|
|||||||
]
|
]
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -485,7 +526,7 @@ class TestPipeline(base.TestCase):
|
|||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -501,7 +542,7 @@ class TestPipeline(base.TestCase):
|
|||||||
self.pipeline_cfg[0]['publishers'] = ['except://', 'new://']
|
self.pipeline_cfg[0]['publishers'] = ['except://', 'new://']
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
new_publisher = pipeline_manager.pipelines[0].publishers[1]
|
new_publisher = pipeline_manager.pipelines[0].publishers[1]
|
||||||
@ -513,9 +554,19 @@ class TestPipeline(base.TestCase):
|
|||||||
self.pipeline_cfg[0]['counters'] = ['a', 'b']
|
self.pipeline_cfg[0]['counters'] = ['a', 'b']
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter,
|
p([self.test_counter,
|
||||||
self.test_counter._replace(name='b')])
|
sample.Sample(
|
||||||
|
name='b',
|
||||||
|
type=self.test_counter.type,
|
||||||
|
volume=self.test_counter.volume,
|
||||||
|
unit=self.test_counter.unit,
|
||||||
|
user_id=self.test_counter.user_id,
|
||||||
|
project_id=self.test_counter.project_id,
|
||||||
|
resource_id=self.test_counter.resource_id,
|
||||||
|
timestamp=self.test_counter.timestamp,
|
||||||
|
resource_metadata=self.test_counter.resource_metadata,
|
||||||
|
)])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(len(publisher.counters), 2)
|
self.assertEqual(len(publisher.counters), 2)
|
||||||
@ -543,17 +594,17 @@ class TestPipeline(base.TestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
pipe.publish_counter(None, self.test_counter, None)
|
pipe.publish_counter(None, self.test_counter)
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(len(publisher.counters), 0)
|
self.assertEqual(len(publisher.counters), 0)
|
||||||
pipe.flush(None, None)
|
pipe.flush(None)
|
||||||
self.assertEqual(len(publisher.counters), 0)
|
self.assertEqual(len(publisher.counters), 0)
|
||||||
pipe.publish_counter(None, self.test_counter, None)
|
pipe.publish_counter(None, self.test_counter)
|
||||||
pipe.flush(None, None)
|
pipe.flush(None)
|
||||||
self.assertEqual(len(publisher.counters), 0)
|
self.assertEqual(len(publisher.counters), 0)
|
||||||
for i in range(CACHE_SIZE - 2):
|
for i in range(CACHE_SIZE - 2):
|
||||||
pipe.publish_counter(None, self.test_counter, None)
|
pipe.publish_counter(None, self.test_counter)
|
||||||
pipe.flush(None, None)
|
pipe.flush(None)
|
||||||
self.assertEqual(len(publisher.counters), CACHE_SIZE)
|
self.assertEqual(len(publisher.counters), CACHE_SIZE)
|
||||||
self.assertTrue(getattr(publisher.counters[0], 'name')
|
self.assertTrue(getattr(publisher.counters[0], 'name')
|
||||||
== 'a_update_new')
|
== 'a_update_new')
|
||||||
@ -578,14 +629,24 @@ class TestPipeline(base.TestCase):
|
|||||||
self.pipeline_cfg[0]['counters'] = ['a', 'b']
|
self.pipeline_cfg[0]['counters'] = ['a', 'b']
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter,
|
p([self.test_counter,
|
||||||
self.test_counter._replace(name='b')])
|
sample.Sample(
|
||||||
|
name='b',
|
||||||
|
type=self.test_counter.type,
|
||||||
|
volume=self.test_counter.volume,
|
||||||
|
unit=self.test_counter.unit,
|
||||||
|
user_id=self.test_counter.user_id,
|
||||||
|
project_id=self.test_counter.project_id,
|
||||||
|
resource_id=self.test_counter.resource_id,
|
||||||
|
timestamp=self.test_counter.timestamp,
|
||||||
|
resource_metadata=self.test_counter.resource_metadata,
|
||||||
|
)])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(len(publisher.counters), 0)
|
self.assertEqual(len(publisher.counters), 0)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
self.assertEqual(len(publisher.counters), CACHE_SIZE)
|
self.assertEqual(len(publisher.counters), CACHE_SIZE)
|
||||||
@ -604,9 +665,9 @@ class TestPipeline(base.TestCase):
|
|||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
publisher = pipe.publishers[0]
|
publisher = pipe.publishers[0]
|
||||||
pipe.publish_counter(None, self.test_counter, None)
|
pipe.publish_counter(None, self.test_counter)
|
||||||
self.assertEqual(len(publisher.counters), 0)
|
self.assertEqual(len(publisher.counters), 0)
|
||||||
pipe.flush(None, None)
|
pipe.flush(None)
|
||||||
self.assertEqual(len(publisher.counters), 1)
|
self.assertEqual(len(publisher.counters), 1)
|
||||||
self.assertEqual(getattr(publisher.counters[0], 'name'),
|
self.assertEqual(getattr(publisher.counters[0], 'name'),
|
||||||
'a_update')
|
'a_update')
|
||||||
@ -625,9 +686,19 @@ class TestPipeline(base.TestCase):
|
|||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
|
|
||||||
self.test_counter = self.test_counter._replace(name='a:b')
|
self.test_counter = sample.Sample(
|
||||||
|
name='a:b',
|
||||||
|
type=self.test_counter.type,
|
||||||
|
volume=self.test_counter.volume,
|
||||||
|
unit=self.test_counter.unit,
|
||||||
|
user_id=self.test_counter.user_id,
|
||||||
|
project_id=self.test_counter.project_id,
|
||||||
|
resource_id=self.test_counter.resource_id,
|
||||||
|
timestamp=self.test_counter.timestamp,
|
||||||
|
resource_metadata=self.test_counter.resource_metadata,
|
||||||
|
)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None, None) as p:
|
with pipeline_manager.publisher(None) as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -670,10 +741,10 @@ class TestPipeline(base.TestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
pipe.publish_counters(None, counters, None)
|
pipe.publish_counters(None, counters)
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(len(publisher.counters), 1)
|
self.assertEqual(len(publisher.counters), 1)
|
||||||
pipe.flush(None, None)
|
pipe.flush(None)
|
||||||
self.assertEqual(len(publisher.counters), 1)
|
self.assertEqual(len(publisher.counters), 1)
|
||||||
cpu_mins = publisher.counters[-1]
|
cpu_mins = publisher.counters[-1]
|
||||||
self.assertEquals(getattr(cpu_mins, 'name'), 'cpu_mins')
|
self.assertEquals(getattr(cpu_mins, 'name'), 'cpu_mins')
|
||||||
@ -723,7 +794,7 @@ class TestPipeline(base.TestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
pipe.publish_counters(None, counters, None)
|
pipe.publish_counters(None, counters)
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(len(publisher.counters), 2)
|
self.assertEqual(len(publisher.counters), 2)
|
||||||
core_temp = publisher.counters[1]
|
core_temp = publisher.counters[1]
|
||||||
@ -812,10 +883,10 @@ class TestPipeline(base.TestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
pipe.publish_counters(None, counters, None)
|
pipe.publish_counters(None, counters)
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(len(publisher.counters), 2)
|
self.assertEqual(len(publisher.counters), 2)
|
||||||
pipe.flush(None, None)
|
pipe.flush(None)
|
||||||
self.assertEqual(len(publisher.counters), 2)
|
self.assertEqual(len(publisher.counters), 2)
|
||||||
cpu_util = publisher.counters[0]
|
cpu_util = publisher.counters[0]
|
||||||
self.assertEquals(getattr(cpu_util, 'name'), 'cpu_util')
|
self.assertEquals(getattr(cpu_util, 'name'), 'cpu_util')
|
||||||
@ -896,8 +967,8 @@ class TestPipeline(base.TestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
pipe.publish_counters(None, counters, None)
|
pipe.publish_counters(None, counters)
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(len(publisher.counters), 0)
|
self.assertEqual(len(publisher.counters), 0)
|
||||||
pipe.flush(None, None)
|
pipe.flush(None)
|
||||||
self.assertEqual(len(publisher.counters), 0)
|
self.assertEqual(len(publisher.counters), 0)
|
||||||
|
Loading…
Reference in New Issue
Block a user