diff --git a/ceilometer/agent.py b/ceilometer/agent.py index d1d690cdb..7638b7300 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -28,7 +28,7 @@ LOG = log.getLogger(__name__) class PollingTask(object): - """Polling task for polling counters and inject into pipeline. + """Polling task for polling samples and inject into pipeline. A polling task can be invoked periodically or only once. """ @@ -45,7 +45,7 @@ class PollingTask(object): @abc.abstractmethod def poll_and_publish(self): - """Polling counter and publish into pipeline.""" + """Polling sample and publish into pipeline.""" class AgentManager(object): @@ -70,7 +70,7 @@ class AgentManager(object): for pipeline, pollster in itertools.product( self.pipeline_manager.pipelines, self.pollster_manager.extensions): - if pipeline.support_counter(pollster.name): + if pipeline.support_meter(pollster.name): polling_task = polling_tasks.get(pipeline.interval, None) if not polling_task: polling_task = self.create_polling_task() diff --git a/ceilometer/objectstore/swift_middleware.py b/ceilometer/objectstore/swift_middleware.py index 3919ab42a..35ae13146 100644 --- a/ceilometer/objectstore/swift_middleware.py +++ b/ceilometer/objectstore/swift_middleware.py @@ -100,19 +100,19 @@ class CeilometerMiddleware(object): bytes_sent += len(chunk) yield chunk finally: - self.publish_counter(env, - input_proxy.bytes_received, - bytes_sent) + self.publish_sample(env, + input_proxy.bytes_received, + bytes_sent) try: iterable = self.app(env, my_start_response) except Exception: - self.publish_counter(env, input_proxy.bytes_received, 0) + self.publish_sample(env, input_proxy.bytes_received, 0) raise else: return iter_response(iterable) - def publish_counter(self, env, bytes_received, bytes_sent): + def publish_sample(self, env, bytes_received, bytes_sent): req = REQUEST.Request(env) version, account, container, obj = split_path(req.path, 1, 4, True) now = timeutils.utcnow().isoformat() diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index f8f49c092..48d9a1388 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -18,6 +18,7 @@ import itertools import os +import operator from oslo.config import cfg import yaml @@ -57,10 +58,10 @@ class PublishContext(object): self.pipelines.update(pipelines) def __enter__(self): - def p(counters): + def p(samples): for p in self.pipelines: - p.publish_counters(self.context, - counters) + p.publish_samples(self.context, + samples) return p def __exit__(self, exc_type, exc_value, traceback): @@ -74,9 +75,9 @@ class Pipeline(object): Pipeline describes a chain of handlers. The chain starts with tranformer and ends with one or more publishers. - The first transformer in the chain gets counter from data collector, i.e. + The first transformer in the chain gets sample from data collector, i.e. pollster or notification handler, takes some action like dropping, - aggregation, changing field etc, then passes the updated counter + aggregation, changing field etc, then passes the updated sample to next step. The subsequent transformers, if any, handle the data similarly. @@ -85,7 +86,7 @@ class Pipeline(object): method depends on publisher type, for example, pushing into data storage through message bus, sending to external CW software through CW API call. - If no transformer is included in the chain, the publishers get counters + If no transformer is included in the chain, the publishers get samples from data collector and publish them directly. """ @@ -99,7 +100,8 @@ class Pipeline(object): self.interval = int(cfg['interval']) except ValueError: raise PipelineException("Invalid interval value", cfg) - self.counters = cfg['counters'] + # Support 'counters' for backward compatibility + self.meters = cfg.get('meters', cfg.get('counters')) # It's legal to have no transformer specified self.transformer_cfg = cfg['transformers'] or [] except KeyError as err: @@ -109,7 +111,7 @@ class Pipeline(object): if self.interval <= 0: raise PipelineException("Interval value should > 0", cfg) - self._check_counters() + self._check_meters() if not cfg.get('publishers'): raise PipelineException("No publisher specified", cfg) @@ -129,28 +131,28 @@ class Pipeline(object): def __str__(self): return self.name - def _check_counters(self): - """Counter rules checking + def _check_meters(self): + """Meter rules checking - At least one meaningful counter exist - Included type and excluded type counter can't co-exist at + At least one meaningful meter exist + Included type and excluded type meter can't co-exist at the same pipeline - Included type counter and wildcard can't co-exist at same pipeline + Included type meter and wildcard can't co-exist at same pipeline """ - counters = self.counters - if not counters: - raise PipelineException("No counter specified", self.cfg) + meters = self.meters + if not meters: + raise PipelineException("No meter specified", self.cfg) - if [x for x in counters if x[0] not in '!*'] and \ - [x for x in counters if x[0] == '!']: + if [x for x in meters if x[0] not in '!*'] and \ + [x for x in meters if x[0] == '!']: raise PipelineException( - "Both included and excluded counters specified", + "Both included and excluded meters specified", cfg) - if '*' in counters and [x for x in counters if x[0] not in '!*']: + if '*' in meters and [x for x in meters if x[0] not in '!*']: raise PipelineException( - "Included counters specified with wildcard", + "Included meters specified with wildcard", self.cfg) def _setup_transformers(self, cfg, transformer_manager): @@ -173,90 +175,90 @@ class Pipeline(object): return transformers - def _transform_counter(self, start, ctxt, counter): + def _transform_sample(self, start, ctxt, sample): try: for transformer in self.transformers[start:]: - counter = transformer.handle_sample(ctxt, counter) - if not counter: - LOG.debug("Pipeline %s: Counter dropped by transformer %s", + sample = transformer.handle_sample(ctxt, sample) + if not sample: + LOG.debug("Pipeline %s: Sample dropped by transformer %s", self, transformer) return - return counter + return sample except Exception as err: LOG.warning("Pipeline %s: Exit after error from transformer" "%s for %s", - self, transformer, counter) + self, transformer, sample) LOG.exception(err) - def _publish_counters(self, start, ctxt, counters): - """Push counter into pipeline for publishing. + def _publish_samples(self, start, ctxt, samples): + """Push samples into pipeline for publishing. - param start: the first transformer that the counter will be injected. - This is mainly for flush() invocation that transformer - may emit counters - param ctxt: execution context from the manager or service - param counters: counter list + :param start: The first transformer that the sample will be injected. + This is mainly for flush() invocation that transformer + may emit samples. + :param ctxt: Execution context from the manager or service. + :param samples: Sample list. """ - transformed_counters = [] - for counter in counters: - LOG.debug("Pipeline %s: Transform counter %s from %s transformer", - self, counter, start) - counter = self._transform_counter(start, ctxt, counter) - if counter: - transformed_counters.append(counter) + transformed_samples = [] + for sample in samples: + LOG.debug("Pipeline %s: Transform sample %s from %s transformer", + self, sample, start) + sample = self._transform_sample(start, ctxt, sample) + if sample: + transformed_samples.append(sample) - LOG.audit("Pipeline %s: Publishing counters", self) + LOG.audit("Pipeline %s: Publishing samples", self) for p in self.publishers: try: - p.publish_counters(ctxt, transformed_counters) + p.publish_samples(ctxt, transformed_samples) except Exception: LOG.exception("Pipeline %s: Continue after error " "from publisher %s", self, p) - LOG.audit("Pipeline %s: Published counters", self) + LOG.audit("Pipeline %s: Published samples", self) - def publish_counter(self, ctxt, counter): - self.publish_counters(ctxt, [counter]) + def publish_sample(self, ctxt, sample): + self.publish_samples(ctxt, [sample]) - def publish_counters(self, ctxt, counters): - for counter_name, counters in itertools.groupby( - sorted(counters, key=lambda c: c.name), - lambda c: c.name): - if self.support_counter(counter_name): - self._publish_counters(0, ctxt, counters) + def publish_samples(self, ctxt, samples): + for meter_name, samples in itertools.groupby( + sorted(samples, key=operator.attrgetter('name')), + operator.attrgetter('name')): + if self.support_meter(meter_name): + self._publish_samples(0, ctxt, samples) - # (yjiang5) To support counters like instance:m1.tiny, + # (yjiang5) To support meters like instance:m1.tiny, # which include variable part at the end starting with ':'. - # Hope we will not add such counters in future. - def _variable_counter_name(self, name): + # Hope we will not add such meters in future. + def _variable_meter_name(self, name): m = name.partition(':') if m[1] == ':': return m[1].join((m[0], '*')) else: return name - def support_counter(self, counter_name): - counter_name = self._variable_counter_name(counter_name) - if ('!' + counter_name) in self.counters: + def support_meter(self, meter_name): + meter_name = self._variable_meter_name(meter_name) + if ('!' + meter_name) in self.meters: return False - if '*' in self.counters: + if '*' in self.meters: return True - elif self.counters[0][0] == '!': - return not ('!' + counter_name) in self.counters + elif self.meters[0][0] == '!': + return not ('!' + meter_name) in self.meters else: - return counter_name in self.counters + return meter_name in self.meters def flush(self, ctxt): - """Flush data after all counter have been injected to pipeline.""" + """Flush data after all samples have been injected to pipeline.""" LOG.audit("Flush pipeline %s", self) for (i, transformer) in enumerate(self.transformers): try: - self._publish_counters(i + 1, ctxt, - list(transformer.flush(ctxt))) + self._publish_samples(i + 1, ctxt, + list(transformer.flush(ctxt))) except Exception as err: LOG.warning( "Pipeline %s: Error flushing " @@ -283,12 +285,12 @@ class PipelineManager(object): The top of the cfg is a list of pipeline definitions. - Pipeline definition is an dictionary specifying the target counters, + Pipeline definition is an dictionary specifying the target samples, the tranformers involved, and the target publishers: { "name": pipeline_name "interval": interval_time - "counters" : ["counter_1", "counter_2"], + "meters" : ["meter_1", "meter_2"], "tranformers":[ {"name": "Transformer_1", "parameters": {"p1": "value"}}, @@ -299,20 +301,19 @@ class PipelineManager(object): "publishers": ["publisher_1", "publisher_2"] } - Interval is how many seconds should the counters be injected to + Interval is how many seconds should the samples be injected to the pipeline. - Valid counter format is '*', '!counter_name', or 'counter_name'. - '*' is wildcard symbol means any counters; '!counter_name' means - "counter_name" will be excluded; 'counter_name' means 'counter_name' + Valid meter format is '*', '!meter_name', or 'meter_name'. + '*' is wildcard symbol means any meters; '!meter_name' means + "meter_name" will be excluded; 'meter_name' means 'meter_name' will be included. - The 'counter_name" is Counter namedtuple's name field. For counter - names with variable like "instance:m1.tiny", it's "instance:*", as - returned by get_counter_list(). + The 'meter_name" is Sample name field. For meter names with + variable like "instance:m1.tiny", it's "instance:*". - Valid counters definition is all "included counter names", all - "excluded counter names", wildcard and "excluded counter names", or + Valid meters definition is all "included meter names", all + "excluded meter names", wildcard and "excluded meter names", or only wildcard. Transformer's name is plugin name in setup.py. @@ -327,7 +328,6 @@ class PipelineManager(object): """Build a new Publisher for these manager pipelines. :param context: The context. - :param source: Counter source. """ return PublishContext(context, self.pipelines) diff --git a/ceilometer/publisher/__init__.py b/ceilometer/publisher/__init__.py index 9ffbae2b0..33995778d 100644 --- a/ceilometer/publisher/__init__.py +++ b/ceilometer/publisher/__init__.py @@ -43,5 +43,5 @@ class PublisherBase(object): pass @abc.abstractmethod - def publish_counters(self, context, counters): + def publish_samples(self, context, samples): "Publish counters into final conduit." diff --git a/ceilometer/publisher/file.py b/ceilometer/publisher/file.py index 919db5395..d59267253 100644 --- a/ceilometer/publisher/file.py +++ b/ceilometer/publisher/file.py @@ -86,7 +86,7 @@ class FilePublisher(publisher.PublisherBase): rfh.setLevel(logging.INFO) self.publisher_logger.addHandler(rfh) - def publish_counters(self, context, counters): + def publish_samples(self, context, counters): """Send a metering message for publishing :param context: Execution context from the service or RPC call diff --git a/ceilometer/publisher/rpc.py b/ceilometer/publisher/rpc.py index a5734ccaf..fcaeb4e93 100644 --- a/ceilometer/publisher/rpc.py +++ b/ceilometer/publisher/rpc.py @@ -136,7 +136,7 @@ class RPCPublisher(publisher.PublisherBase): % self.policy) self.policy = 'default' - def publish_counters(self, context, counters): + def publish_samples(self, context, counters): """Publish counters on RPC. :param context: Execution context from the service or RPC call. diff --git a/ceilometer/publisher/test.py b/ceilometer/publisher/test.py index 8dd0bf30b..ec725f3ab 100644 --- a/ceilometer/publisher/test.py +++ b/ceilometer/publisher/test.py @@ -27,7 +27,7 @@ class TestPublisher(publisher.PublisherBase): def __init__(self, parsed_url): self.counters = [] - def publish_counters(self, context, counters): + def publish_samples(self, context, counters): """Send a metering message for publishing :param context: Execution context from the service or RPC call diff --git a/ceilometer/publisher/udp.py b/ceilometer/publisher/udp.py index 6cf9feda0..0ee651407 100644 --- a/ceilometer/publisher/udp.py +++ b/ceilometer/publisher/udp.py @@ -41,7 +41,7 @@ class UDPPublisher(publisher.PublisherBase): self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - def publish_counters(self, context, counters): + def publish_samples(self, context, counters): """Send a metering message for publishing :param context: Execution context from the service or RPC call diff --git a/etc/ceilometer/pipeline.yaml b/etc/ceilometer/pipeline.yaml index 3e58f7b93..731e755a5 100644 --- a/etc/ceilometer/pipeline.yaml +++ b/etc/ceilometer/pipeline.yaml @@ -2,7 +2,7 @@ - name: meter_pipeline interval: 600 - counters: + meters: - "*" transformers: publishers: @@ -10,7 +10,7 @@ - name: cpu_pipeline interval: 600 - counters: + meters: - "cpu" transformers: - name: "rate_of_change" diff --git a/tests/objectstore/test_swift_middleware.py b/tests/objectstore/test_swift_middleware.py index e3ccbeb74..b27d06b21 100644 --- a/tests/objectstore/test_swift_middleware.py +++ b/tests/objectstore/test_swift_middleware.py @@ -49,7 +49,7 @@ class TestSwiftMiddleware(base.TestCase): self.pipeline_manager = pipeline_manager self.counters = [] - def publish_counters(self, ctxt, counters): + def publish_samples(self, ctxt, counters): self.counters.extend(counters) def flush(self, context): diff --git a/tests/publisher/test_file.py b/tests/publisher/test_file.py index 62249f214..f9b7ee4df 100644 --- a/tests/publisher/test_file.py +++ b/tests/publisher/test_file.py @@ -71,8 +71,8 @@ class TestFilePublisher(base.TestCase): parsed_url = urlsplit( 'file:///tmp/log_file?max_bytes=50&backup_count=3') publisher = file.FilePublisher(parsed_url) - publisher.publish_counters(None, - self.test_data) + publisher.publish_samples(None, + self.test_data) handler = publisher.publisher_logger.handlers[0] self.assertTrue(isinstance(handler, @@ -87,8 +87,8 @@ class TestFilePublisher(base.TestCase): parsed_url = urlsplit( 'file:///tmp/log_file_plain') publisher = file.FilePublisher(parsed_url) - publisher.publish_counters(None, - self.test_data) + publisher.publish_samples(None, + self.test_data) handler = publisher.publisher_logger.handlers[0] self.assertTrue(isinstance(handler, @@ -104,7 +104,7 @@ class TestFilePublisher(base.TestCase): parsed_url = urlsplit( 'file:///tmp/log_file_bad?max_bytes=yus&backup_count=5y') publisher = file.FilePublisher(parsed_url) - publisher.publish_counters(None, - self.test_data) + publisher.publish_samples(None, + self.test_data) self.assertIsNone(publisher.publisher_logger) diff --git a/tests/publisher/test_rpc_publisher.py b/tests/publisher/test_rpc_publisher.py index 0eff7540a..a072d8f8b 100644 --- a/tests/publisher/test_rpc_publisher.py +++ b/tests/publisher/test_rpc_publisher.py @@ -220,8 +220,8 @@ class TestPublish(base.TestCase): def test_published(self): publisher = rpc.RPCPublisher( network_utils.urlsplit('rpc://')) - publisher.publish_counters(None, - self.test_data) + publisher.publish_samples(None, + self.test_data) self.assertEqual(len(self.published), 1) self.assertEqual(self.published[0][0], cfg.CONF.publisher_rpc.metering_topic) @@ -232,8 +232,8 @@ class TestPublish(base.TestCase): def test_publish_target(self): publisher = rpc.RPCPublisher( network_utils.urlsplit('rpc://?target=custom_procedure_call')) - publisher.publish_counters(None, - self.test_data) + publisher.publish_samples(None, + self.test_data) self.assertEqual(len(self.published), 1) self.assertEqual(self.published[0][0], cfg.CONF.publisher_rpc.metering_topic) @@ -244,8 +244,8 @@ class TestPublish(base.TestCase): def test_published_with_per_meter_topic(self): publisher = rpc.RPCPublisher( network_utils.urlsplit('rpc://?per_meter_topic=1')) - publisher.publish_counters(None, - self.test_data) + publisher.publish_samples(None, + self.test_data) self.assertEqual(len(self.published), 4) for topic, rpc_call in self.published: meters = rpc_call['args']['data'] @@ -271,7 +271,7 @@ class TestPublish(base.TestCase): network_utils.urlsplit('rpc://')) self.assertRaises( SystemExit, - publisher.publish_counters, + publisher.publish_samples, None, self.test_data) self.assertEqual(publisher.policy, 'default') self.assertEqual(len(self.published), 0) @@ -283,7 +283,7 @@ class TestPublish(base.TestCase): network_utils.urlsplit('rpc://?policy=default')) self.assertRaises( SystemExit, - publisher.publish_counters, + publisher.publish_samples, None, self.test_data) self.assertEqual(len(self.published), 0) self.assertEqual(len(publisher.local_queue), 0) @@ -294,7 +294,7 @@ class TestPublish(base.TestCase): network_utils.urlsplit('rpc://?policy=notexist')) self.assertRaises( SystemExit, - publisher.publish_counters, + publisher.publish_samples, None, self.test_data) self.assertEqual(publisher.policy, 'default') self.assertEqual(len(self.published), 0) @@ -304,8 +304,8 @@ class TestPublish(base.TestCase): self.rpc_unreachable = True publisher = rpc.RPCPublisher( network_utils.urlsplit('rpc://?policy=drop')) - publisher.publish_counters(None, - self.test_data) + publisher.publish_samples(None, + self.test_data) self.assertEqual(len(self.published), 0) self.assertEqual(len(publisher.local_queue), 0) @@ -313,8 +313,8 @@ class TestPublish(base.TestCase): self.rpc_unreachable = True publisher = rpc.RPCPublisher( network_utils.urlsplit('rpc://?policy=queue')) - publisher.publish_counters(None, - self.test_data) + publisher.publish_samples(None, + self.test_data) self.assertEqual(len(self.published), 0) self.assertEqual(len(publisher.local_queue), 1) @@ -322,14 +322,14 @@ class TestPublish(base.TestCase): self.rpc_unreachable = True publisher = rpc.RPCPublisher( network_utils.urlsplit('rpc://?policy=queue')) - publisher.publish_counters(None, - self.test_data) + publisher.publish_samples(None, + self.test_data) self.assertEqual(len(self.published), 0) self.assertEqual(len(publisher.local_queue), 1) self.rpc_unreachable = False - publisher.publish_counters(None, - self.test_data) + publisher.publish_samples(None, + self.test_data) self.assertEqual(len(self.published), 2) self.assertEqual(len(publisher.local_queue), 0) @@ -341,8 +341,8 @@ class TestPublish(base.TestCase): for i in range(0, 5): for s in self.test_data: s.source = 'test-%d' % i - publisher.publish_counters(None, - self.test_data) + publisher.publish_samples(None, + self.test_data) self.assertEqual(len(self.published), 0) self.assertEqual(len(publisher.local_queue), 3) self.assertEqual( @@ -365,8 +365,8 @@ class TestPublish(base.TestCase): for i in range(0, 2000): for s in self.test_data: s.source = 'test-%d' % i - publisher.publish_counters(None, - self.test_data) + publisher.publish_samples(None, + self.test_data) self.assertEqual(len(self.published), 0) self.assertEqual(len(publisher.local_queue), 1024) self.assertEqual( diff --git a/tests/publisher/test_udp.py b/tests/publisher/test_udp.py index db9722f69..7ee83958d 100644 --- a/tests/publisher/test_udp.py +++ b/tests/publisher/test_udp.py @@ -113,8 +113,8 @@ class TestUDPPublisher(base.TestCase): self._make_fake_socket(self.data_sent)): publisher = udp.UDPPublisher( network_utils.urlsplit('udp://somehost')) - publisher.publish_counters(None, - self.test_data) + publisher.publish_samples(None, + self.test_data) self.assertEqual(len(self.data_sent), 5) @@ -146,5 +146,5 @@ class TestUDPPublisher(base.TestCase): self._make_broken_socket): publisher = udp.UDPPublisher( network_utils.urlsplit('udp://localhost')) - publisher.publish_counters(None, - self.test_data) + publisher.publish_samples(None, + self.test_data) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index c779a6e21..1b38b2408 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -67,7 +67,7 @@ class TestPipeline(base.TestCase): return fake_drivers[url](url) class PublisherClassException(publisher.PublisherBase): - def publish_counters(self, ctxt, counters): + def publish_samples(self, ctxt, counters): raise Exception() class TransformerClass(transformer.TransformerBase): @@ -274,7 +274,7 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - self.assertFalse(pipeline_manager.pipelines[0].support_counter('a')) + self.assertFalse(pipeline_manager.pipelines[0].support_meter('a')) def test_wildcard_excluded_counters_not_excluded(self): counter_cfg = ['*', '!b'] @@ -309,9 +309,9 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - self.assertFalse(pipeline_manager.pipelines[0].support_counter('a')) - self.assertTrue(pipeline_manager.pipelines[0].support_counter('b')) - self.assertFalse(pipeline_manager.pipelines[0].support_counter('c')) + self.assertFalse(pipeline_manager.pipelines[0].support_meter('a')) + self.assertTrue(pipeline_manager.pipelines[0].support_meter('b')) + self.assertFalse(pipeline_manager.pipelines[0].support_meter('c')) def test_multiple_pipeline(self): self.pipeline_cfg.append({ @@ -594,16 +594,16 @@ class TestPipeline(base.TestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_counter(None, self.test_counter) + pipe.publish_sample(None, self.test_counter) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.counters), 0) pipe.flush(None) self.assertEqual(len(publisher.counters), 0) - pipe.publish_counter(None, self.test_counter) + pipe.publish_sample(None, self.test_counter) pipe.flush(None) self.assertEqual(len(publisher.counters), 0) for i in range(CACHE_SIZE - 2): - pipe.publish_counter(None, self.test_counter) + pipe.publish_sample(None, self.test_counter) pipe.flush(None) self.assertEqual(len(publisher.counters), CACHE_SIZE) self.assertTrue(getattr(publisher.counters[0], 'name') @@ -665,7 +665,7 @@ class TestPipeline(base.TestCase): pipe = pipeline_manager.pipelines[0] publisher = pipe.publishers[0] - pipe.publish_counter(None, self.test_counter) + pipe.publish_sample(None, self.test_counter) self.assertEqual(len(publisher.counters), 0) pipe.flush(None) self.assertEqual(len(publisher.counters), 1) @@ -741,7 +741,7 @@ class TestPipeline(base.TestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_counters(None, counters) + pipe.publish_samples(None, counters) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.counters), 1) pipe.flush(None) @@ -794,7 +794,7 @@ class TestPipeline(base.TestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_counters(None, counters) + pipe.publish_samples(None, counters) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.counters), 2) core_temp = publisher.counters[1] @@ -883,7 +883,7 @@ class TestPipeline(base.TestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_counters(None, counters) + pipe.publish_samples(None, counters) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.counters), 2) pipe.flush(None) @@ -967,7 +967,7 @@ class TestPipeline(base.TestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_counters(None, counters) + pipe.publish_samples(None, counters) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.counters), 0) pipe.flush(None)