From 59b5a5a488946b0d6e1f8d239a9cada21201c320 Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Tue, 5 Feb 2013 11:32:58 +0100 Subject: [PATCH] Allow to publish several counters in a row Support publishing of multiple counters at once. This change the pipeline and RPC calls to send a list of counter by default instead of only once counter. This allows to make only one RPC calls when sending multiple counters to be more efficient. This implements blueprint publish-counters-list-rpc Change-Id: Ie4155b35585f261e6ff9816e5a845a479151eefd Signed-off-by: Julien Danjou --- ceilometer/collector/service.py | 49 +++++----- ceilometer/objectstore/swift_middleware.py | 12 +-- ceilometer/pipeline.py | 49 ++++++---- ceilometer/plugin.py | 4 +- ceilometer/publisher/meter_publish.py | 31 ++++-- ceilometer/tests/base.py | 4 +- tests/objectstore/test_swift_middleware.py | 4 +- tests/publisher/test_meter_publisher.py | 105 ++++++++++++++++----- tests/test_pipeline.py | 6 +- tools/test-requires | 1 + tools/test-requires-folsom | 1 + 11 files changed, 177 insertions(+), 89 deletions(-) diff --git a/ceilometer/collector/service.py b/ceilometer/collector/service.py index 3abb1c276..09fa6b31f 100644 --- a/ceilometer/collector/service.py +++ b/ceilometer/collector/service.py @@ -18,7 +18,7 @@ from stevedore import dispatch -from ceilometer.collector import meter +from ceilometer.collector import meter as meter_api from ceilometer import extension_manager from ceilometer import pipeline from ceilometer import service @@ -124,27 +124,32 @@ class CollectorService(service.PeriodicService): """This method is triggered when metering data is cast from an agent. """ - #LOG.info('metering data: %r', data) - LOG.info('metering data %s for %s @ %s: %s', - data['counter_name'], - data['resource_id'], - data.get('timestamp', 'NO TIMESTAMP'), - data['counter_volume']) - if not meter.verify_signature(data, cfg.CONF.metering_secret): - LOG.warning('message signature invalid, discarding message: %r', - data) - else: - try: - # Convert the timestamp to a datetime instance. - # Storage engines are responsible for converting - # that value to something they can store. - if data.get('timestamp'): - ts = timeutils.parse_isotime(data['timestamp']) - data['timestamp'] = timeutils.normalize_time(ts) - self.storage_conn.record_metering_data(data) - except Exception as err: - LOG.error('Failed to record metering data: %s', err) - LOG.exception(err) + # We may have receive only one counter on the wire + if not isinstance(data, list): + data = [data] + + for meter in data: + LOG.info('metering data %s for %s @ %s: %s', + meter['counter_name'], + meter['resource_id'], + meter.get('timestamp', 'NO TIMESTAMP'), + meter['counter_volume']) + if meter_api.verify_signature(meter, cfg.CONF.metering_secret): + try: + # Convert the timestamp to a datetime instance. + # Storage engines are responsible for converting + # that value to something they can store. + if meter.get('timestamp'): + ts = timeutils.parse_isotime(meter['timestamp']) + meter['timestamp'] = timeutils.normalize_time(ts) + self.storage_conn.record_metering_data(meter) + except Exception as err: + LOG.error('Failed to record metering data: %s', err) + LOG.exception(err) + else: + LOG.warning( + 'message signature invalid, discarding message: %r', + meter) def periodic_tasks(self, context): pass diff --git a/ceilometer/objectstore/swift_middleware.py b/ceilometer/objectstore/swift_middleware.py index c0eea3c5d..6ad6dc253 100644 --- a/ceilometer/objectstore/swift_middleware.py +++ b/ceilometer/objectstore/swift_middleware.py @@ -96,9 +96,9 @@ class CeilometerMiddleware(object): now = timeutils.utcnow().isoformat() if bytes_received: - self.pipeline_manager.publish_counter( + self.pipeline_manager.publish_counters( context.get_admin_context(), - counter.Counter( + [counter.Counter( name='storage.objects.incoming.bytes', type='delta', unit='B', @@ -112,13 +112,13 @@ class CeilometerMiddleware(object): "version": version, "container": container, "object": obj, - }, ), + }, )], cfg.CONF.counter_source) if bytes_sent: - self.pipeline_manager.publish_counter( + self.pipeline_manager.publish_counters( context.get_admin_context(), - counter.Counter( + [counter.Counter( name='storage.objects.outgoing.bytes', type='delta', unit='B', @@ -132,7 +132,7 @@ class CeilometerMiddleware(object): "version": version, "container": container, "object": obj, - }), + })], cfg.CONF.counter_source) diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index cfce6680e..8d5e7e7f2 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -16,12 +16,12 @@ # License for the specific language governing permissions and limitations # under the License. +import itertools import os from stevedore import extension import yaml -from ceilometer import extension_manager from ceilometer.openstack.common import cfg from ceilometer.openstack.common import log @@ -170,13 +170,12 @@ class Pipeline(object): return transformers - def _publish_counter_to_one_publisher(self, ext, ctxt, counter, source): + def _publish_counters_to_one_publisher(self, ext, ctxt, counters, source): try: - ext.obj.publish_counter(ctxt, counter, source) + ext.obj.publish_counters(ctxt, counters, source) except Exception as err: LOG.warning("Pipeline %s: Continue after error " - "from publisher %s for %s", - self, ext.name, counter) + "from publisher %s", self, ext.name) LOG.exception(err) def _transform_counter(self, start, ctxt, counter, source): @@ -194,36 +193,45 @@ class Pipeline(object): self, transformer, counter) LOG.exception(err) - def _publish_counter(self, start, ctxt, counter, source): + def _publish_counters(self, start, ctxt, counters, source): """Push counter into pipeline for publishing. param start: the first transformer that the counter will be injected. This is mainly for flush() invocation that transformer may emit counters param ctxt: execution context from the manager or service - param counter: counter + param counters: counter list param source: counter source """ - LOG.audit("Pipeline %s: Transform counter %s from %s transformer", - self, counter, start) - counter = self._transform_counter(start, ctxt, counter, source) - if not counter: - return - LOG.audit("Pipeline %s: Publish counter %s", self, counter) + transformed_counters = [] + for counter in counters: + LOG.audit("Pipeline %s: Transform counter %s from %s transformer", + self, counter, start) + counter = self._transform_counter(start, ctxt, counter, source) + if counter: + transformed_counters.append(counter) + + LOG.audit("Pipeline %s: Publishing counters", self) self.publisher_manager.map(self.publishers, - self._publish_counter_to_one_publisher, + self._publish_counters_to_one_publisher, ctxt=ctxt, - counter=counter, + counters=transformed_counters, source=source, ) - LOG.audit("Pipeline %s: Published counter %s", self, counter) + LOG.audit("Pipeline %s: Published counters", self) def publish_counter(self, ctxt, counter, source): - if self.support_counter(counter.name): - self._publish_counter(0, ctxt, counter, source) + self.publish_counters(ctxt, [counter], source) + + def publish_counters(self, ctxt, counters, source): + for counter_name, counters in itertools.groupby( + sorted(counters, key=lambda c: c.name), + lambda c: c.name): + if self.support_counter(counter_name): + self._publish_counters(0, ctxt, counters, source) # (yjiang5) To support counters like instance:m1.tiny, # which include variable part at the end starting with ':'. @@ -251,8 +259,9 @@ class Pipeline(object): LOG.audit("Flush pipeline %s", self) for (i, transformer) in enumerate(self.transformers): - for c in transformer.flush(ctxt, source): - self._publish_counter(i + 1, ctxt, c, source) + self._publish_counters(i + 1, ctxt, + list(transformer.flush(ctxt, source)), + source) def get_interval(self): return self.interval diff --git a/ceilometer/plugin.py b/ceilometer/plugin.py index 3ee34b699..e5725844f 100644 --- a/ceilometer/plugin.py +++ b/ceilometer/plugin.py @@ -94,8 +94,8 @@ class PublisherBase(PluginBase): __metaclass__ = abc.ABCMeta @abc.abstractmethod - def publish_counter(self, context, counter, source): - "publish counters into final conduit" + def publish_counters(self, context, counters, source): + "Publish counters into final conduit" class TransformerBase(PluginBase): diff --git a/ceilometer/publisher/meter_publish.py b/ceilometer/publisher/meter_publish.py index 182c47303..232951159 100644 --- a/ceilometer/publisher/meter_publish.py +++ b/ceilometer/publisher/meter_publish.py @@ -18,7 +18,9 @@ """Publish a counter using the preferred RPC mechanism. """ -from ceilometer.collector import meter +import itertools + +from ceilometer.collector import meter as meter_api from ceilometer.openstack.common import cfg from ceilometer.openstack.common import log from ceilometer.openstack.common import rpc @@ -44,23 +46,36 @@ register_opts(cfg.CONF) class MeterPublisher(plugin.PublisherBase): - def publish_counter(self, context, counter, source): + def publish_counters(self, context, counters, source): """Send a metering message for publishing :param context: Execution context from the service or RPC call :param counter: Counter from pipeline after transformation :param source: counter source """ + + meters = [ + meter_api.meter_message_from_counter(counter, + cfg.CONF.metering_secret, + source) + for counter in counters + ] + topic = cfg.CONF.metering_topic msg = { 'method': 'record_metering_data', 'version': '1.0', - 'args': {'data': meter.meter_message_from_counter( - counter, - cfg.CONF.metering_secret, - source), - }, + 'args': {'data': meters}, } LOG.debug('PUBLISH: %s', str(msg)) rpc.cast(context, topic, msg) - rpc.cast(context, topic + '.' + counter.name, msg) + + for meter_name, meter_list in itertools.groupby( + sorted(meters, key=lambda m: m['counter_name']), + lambda m: m['counter_name']): + msg = { + 'method': 'record_metering_data', + 'version': '1.0', + 'args': {'data': list(meter_list)}, + } + rpc.cast(context, topic + '.' + meter_name, msg) diff --git a/ceilometer/tests/base.py b/ceilometer/tests/base.py index 8613d459c..be92f5a0b 100644 --- a/ceilometer/tests/base.py +++ b/ceilometer/tests/base.py @@ -19,13 +19,13 @@ """Test base classes. """ -import unittest +import unittest2 import mox import stubout -class TestCase(unittest.TestCase): +class TestCase(unittest2.TestCase): def setUp(self): super(TestCase, self).setUp() diff --git a/tests/objectstore/test_swift_middleware.py b/tests/objectstore/test_swift_middleware.py index a018e2e28..8e9410863 100644 --- a/tests/objectstore/test_swift_middleware.py +++ b/tests/objectstore/test_swift_middleware.py @@ -46,8 +46,8 @@ class TestSwiftMiddleware(base.TestCase): def __init__(self): self.counters = [] - def publish_counter(self, context, counter, source): - self.counters.append(counter) + def publish_counters(self, context, counters, source): + self.counters.extend(counters) def _faux_setup_pipeline(self, publisher_manager): return self.pipeline_manager diff --git a/tests/publisher/test_meter_publisher.py b/tests/publisher/test_meter_publisher.py index fe6aa302a..049f8981c 100644 --- a/tests/publisher/test_meter_publisher.py +++ b/tests/publisher/test_meter_publisher.py @@ -20,6 +20,7 @@ import datetime +from ceilometer.openstack.common import cfg from ceilometer.openstack.common import rpc from ceilometer.tests import base @@ -29,34 +30,90 @@ from ceilometer.publisher import meter_publish class TestPublish(base.TestCase): - test_data = counter.Counter( - name='test', - type=counter.TYPE_CUMULATIVE, - unit='', - volume=1, - user_id='test', - project_id='test', - resource_id='test_run_tasks', - timestamp=datetime.datetime.utcnow().isoformat(), - resource_metadata={'name': 'TestPublish'}, - ) + test_data = [ + counter.Counter( + name='test', + type=counter.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + counter.Counter( + name='test', + type=counter.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + counter.Counter( + name='test2', + type=counter.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + counter.Counter( + name='test2', + type=counter.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + counter.Counter( + name='test3', + type=counter.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + ] - def faux_notify(self, context, topic, msg): - self.notifications.append((topic, msg)) + def faux_cast(self, context, topic, msg): + self.published.append((topic, msg)) def setUp(self): super(TestPublish, self).setUp() - self.notifications = [] - self.stubs.Set(rpc, 'cast', self.faux_notify) + self.published = [] + self.stubs.Set(rpc, 'cast', self.faux_cast) publisher = meter_publish.MeterPublisher() - publisher.publish_counter(None, - self.test_data, - 'test', - ) + publisher.publish_counters(None, + self.test_data, + 'test') - def test_notify(self): - assert len(self.notifications) == 2 + def test_published(self): + self.assertEqual(len(self.published), 4) + for topic, rpc_call in self.published: + meters = rpc_call['args']['data'] + self.assertIsInstance(meters, list) + if topic != cfg.CONF.metering_topic: + self.assertEqual(len(set(meter['counter_name'] + for meter in meters)), + 1, + "Meter are published grouped by name") - def test_notify_topics(self): - topics = [n[0] for n in self.notifications] - assert topics == ['metering', 'metering.test'] + def test_published_topics(self): + topics = [topic for topic, meter in self.published] + self.assertIn(cfg.CONF.metering_topic, topics) + self.assertIn(cfg.CONF.metering_topic + '.' + 'test', topics) + self.assertIn(cfg.CONF.metering_topic + '.' + 'test2', topics) + self.assertIn(cfg.CONF.metering_topic + '.' + 'test3', topics) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 10bb1eefe..29fc77757 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -57,11 +57,11 @@ class TestPipeline(base.TestCase): def __init__(self): self.counters = [] - def publish_counter(self, ctxt, counter, source): - self.counters.append(counter) + def publish_counters(self, ctxt, counters, source): + self.counters.extend(counters) class PublisherClassException(): - def publish_counter(self, ctxt, counter, source): + def publish_counters(self, ctxt, counters, source): raise Exception() class TransformerClass(object): diff --git a/tools/test-requires b/tools/test-requires index 66b0be3e4..2208c23f3 100644 --- a/tools/test-requires +++ b/tools/test-requires @@ -1,4 +1,5 @@ nose +unittest2 coverage mock mox diff --git a/tools/test-requires-folsom b/tools/test-requires-folsom index 9ac3453cc..cd2b40237 100644 --- a/tools/test-requires-folsom +++ b/tools/test-requires-folsom @@ -1,5 +1,6 @@ http://tarballs.openstack.org/nova/nova-stable-folsom.tar.gz nose +unittest2 coverage mock mox