diff --git a/ceilometer/publisher/__init__.py b/ceilometer/publisher/__init__.py index 303b11a71..9783ac138 100644 --- a/ceilometer/publisher/__init__.py +++ b/ceilometer/publisher/__init__.py @@ -20,7 +20,7 @@ import abc from stevedore import driver -import urlparse +from ceilometer.openstack.common import network_utils def get_publisher(url, namespace='ceilometer.publisher'): @@ -29,7 +29,7 @@ def get_publisher(url, namespace='ceilometer.publisher'): :param URL: URL for the publisher :param namespace: Namespace to use to look for drivers. """ - parse_result = urlparse.urlparse(url) + parse_result = network_utils.urlsplit(url) loaded_driver = driver.DriverManager(namespace, parse_result.scheme) return loaded_driver.driver(parse_result) diff --git a/ceilometer/publisher/rpc.py b/ceilometer/publisher/rpc.py index 8ec8e3cb1..25001b027 100644 --- a/ceilometer/publisher/rpc.py +++ b/ceilometer/publisher/rpc.py @@ -21,7 +21,9 @@ import hashlib import hmac import itertools +import operator import uuid +import urlparse from oslo.config import cfg @@ -103,12 +105,19 @@ def meter_message_from_counter(counter, secret, source): class RPCPublisher(publisher.PublisherBase): - def publish_counters(self, context, counters, source): - """Send a metering message for publishing - :param context: Execution context from the service or RPC call - :param counter: Counter from pipeline after transformation - :param source: counter source + def __init__(self, parsed_url): + options = urlparse.parse_qs(parsed_url.query) + self.per_meter_topic = bool(int( + options.get('per_meter_topic', [0])[-1])) + + def publish_counters(self, context, counters, source): + """Publish counters on RPC. + + :param context: Execution context from the service or RPC call. + :param counters: Counters from pipeline after transformation. + :param source: Counter source. + """ meters = [ @@ -125,15 +134,20 @@ class RPCPublisher(publisher.PublisherBase): 'version': '1.0', 'args': {'data': meters}, } - LOG.debug('PUBLISH: %s', str(msg)) + LOG.audit('Publishing %d counters on %s', + len(msg['args']['data']), topic) rpc.cast(context, topic, msg) - for meter_name, meter_list in itertools.groupby( - sorted(meters, key=lambda m: m['counter_name']), - lambda m: m['counter_name']): - msg = { - 'method': 'record_metering_data', - 'version': '1.0', - 'args': {'data': list(meter_list)}, - } - rpc.cast(context, topic + '.' + meter_name, msg) + if self.per_meter_topic: + for meter_name, meter_list in itertools.groupby( + sorted(meters, key=operator.itemgetter('counter_name')), + operator.itemgetter('counter_name')): + msg = { + 'method': 'record_metering_data', + 'version': '1.0', + 'args': {'data': list(meter_list)}, + } + topic_name = topic + '.' + meter_name + LOG.audit('Publishing %d counters on %s', + len(msg['args']['data']), topic_name) + rpc.cast(context, topic_name, msg) diff --git a/tests/publisher/test_rpc_publisher.py b/tests/publisher/test_rpc_publisher.py index 7ddd32067..97daee0fc 100644 --- a/tests/publisher/test_rpc_publisher.py +++ b/tests/publisher/test_rpc_publisher.py @@ -3,6 +3,7 @@ # Copyright © 2012 New Dream Network, LLC (DreamHost) # # Author: Doug Hellmann +# Julien Danjou # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -24,6 +25,7 @@ from oslo.config import cfg from ceilometer import counter from ceilometer.openstack.common import jsonutils +from ceilometer.openstack.common import network_utils from ceilometer.openstack.common import rpc as oslo_rpc from ceilometer.publisher import rpc from ceilometer.tests import base @@ -255,12 +257,24 @@ class TestPublish(base.TestCase): super(TestPublish, self).setUp() self.published = [] self.stubs.Set(oslo_rpc, 'cast', self.faux_cast) - publisher = rpc.RPCPublisher(None) + + def test_published(self): + publisher = rpc.RPCPublisher( + network_utils.urlsplit('rpc://')) publisher.publish_counters(None, self.test_data, 'test') + self.assertEqual(len(self.published), 1) + self.assertEqual(self.published[0][0], + cfg.CONF.publisher_rpc.metering_topic) + self.assertIsInstance(self.published[0][1]['args']['data'], list) - def test_published(self): + def test_published_with_per_meter_topic(self): + publisher = rpc.RPCPublisher( + network_utils.urlsplit('rpc://?per_meter_topic=1')) + publisher.publish_counters(None, + self.test_data, + 'test') self.assertEqual(len(self.published), 4) for topic, rpc_call in self.published: meters = rpc_call['args']['data'] @@ -271,7 +285,6 @@ class TestPublish(base.TestCase): 1, "Meter are published grouped by name") - def test_published_topics(self): topics = [topic for topic, meter in self.published] self.assertIn(cfg.CONF.publisher_rpc.metering_topic, topics) self.assertIn( diff --git a/tests/publisher/test_udp.py b/tests/publisher/test_udp.py index 6e81270bc..e77fb9cc0 100644 --- a/tests/publisher/test_udp.py +++ b/tests/publisher/test_udp.py @@ -22,11 +22,11 @@ import datetime import mock import msgpack from oslo.config import cfg -import urlparse from ceilometer import counter from ceilometer.publisher import udp from ceilometer.tests import base +from ceilometer.openstack.common import network_utils class TestUDPPublisher(base.TestCase): @@ -105,7 +105,8 @@ class TestUDPPublisher(base.TestCase): self.data_sent = [] with mock.patch('socket.socket', self._make_fake_socket(self.data_sent)): - publisher = udp.UDPPublisher(urlparse.urlparse('udp://somehost')) + publisher = udp.UDPPublisher( + network_utils.urlsplit('udp://somehost')) publisher.publish_counters(None, self.test_data, self.COUNTER_SOURCE) @@ -142,7 +143,8 @@ class TestUDPPublisher(base.TestCase): def test_publish_error(self): with mock.patch('socket.socket', self._make_broken_socket): - publisher = udp.UDPPublisher(urlparse.urlparse('udp://localhost')) + publisher = udp.UDPPublisher( + network_utils.urlsplit('udp://localhost')) publisher.publish_counters(None, self.test_data, self.COUNTER_SOURCE)