From aa842c26ac5c966ea68a1bac5eabba4cc0d34c3f Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Tue, 11 Jun 2013 15:31:50 +0200 Subject: [PATCH] Replace publisher name by URL This allows to specify any amount of configuration option for each publisher in each pipeline. UDP is the first to use this to possibly publish to differents (host, port). Blueprint: pipeline-publisher-url Change-Id: I6a6f53ff438395c7bbbafad30efe097e1e01e1c6 Signed-off-by: Julien Danjou --- ceilometer/pipeline.py | 3 ++ ceilometer/publisher/__init__.py | 13 ++++-- ceilometer/publisher/test.py | 2 +- ceilometer/publisher/udp.py | 43 +++++------------ etc/ceilometer/ceilometer.conf.sample | 62 ++++++++++--------------- etc/ceilometer/pipeline.yaml | 2 +- tests/publisher/test_meter_publisher.py | 2 +- tests/publisher/test_udp.py | 9 ++-- tests/test_pipeline.py | 20 ++++---- 9 files changed, 66 insertions(+), 90 deletions(-) diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 664d14700..3b2d69996 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -118,6 +118,9 @@ class Pipeline(object): self.publishers = [] for p in cfg['publishers']: + if '://' not in p: + # Support old format without URL + p = p + "://" try: self.publishers.append(publisher.get_publisher(p)) except Exception: diff --git a/ceilometer/publisher/__init__.py b/ceilometer/publisher/__init__.py index 33fe6a6e8..303b11a71 100644 --- a/ceilometer/publisher/__init__.py +++ b/ceilometer/publisher/__init__.py @@ -20,16 +20,18 @@ import abc from stevedore import driver +import urlparse -def get_publisher(name, namespace='ceilometer.publisher'): +def get_publisher(url, namespace='ceilometer.publisher'): """Get publisher driver and load it. - :param name: Name of the publisher driver. + :param URL: URL for the publisher :param namespace: Namespace to use to look for drivers. """ - loaded_driver = driver.DriverManager(namespace, name) - return loaded_driver.driver() + parse_result = urlparse.urlparse(url) + loaded_driver = driver.DriverManager(namespace, parse_result.scheme) + return loaded_driver.driver(parse_result) class PublisherBase(object): @@ -37,6 +39,9 @@ class PublisherBase(object): __metaclass__ = abc.ABCMeta + def __init__(self, parsed_url): + pass + @abc.abstractmethod def publish_counters(self, context, counters, source): "Publish counters into final conduit." diff --git a/ceilometer/publisher/test.py b/ceilometer/publisher/test.py index 8d12d8d56..2b2ffe3a9 100644 --- a/ceilometer/publisher/test.py +++ b/ceilometer/publisher/test.py @@ -24,7 +24,7 @@ from ceilometer import publisher class TestPublisher(publisher.PublisherBase): """Publisher used in unit testing.""" - def __init__(self): + def __init__(self, parsed_url): self.counters = [] def publish_counters(self, context, counters, source): diff --git a/ceilometer/publisher/udp.py b/ceilometer/publisher/udp.py index 12e33ce2e..0677c8c2b 100644 --- a/ceilometer/publisher/udp.py +++ b/ceilometer/publisher/udp.py @@ -20,42 +20,24 @@ from ceilometer import publisher from ceilometer.openstack.common import log +from ceilometer.openstack.common import network_utils from ceilometer.openstack.common.gettextutils import _ -from oslo.config import cfg import msgpack import socket +from oslo.config import cfg + +cfg.CONF.import_opt('udp_port', 'ceilometer.collector.service', + group='collector') LOG = log.getLogger(__name__) -UDP_PUBLISH_GROUP = cfg.OptGroup(name='publisher_udp', - title='Options for UDP publisher') - -UDP_PUBLISH_OPTS = [ - cfg.StrOpt('host', - default="localhost", - help='The host target to publish metering records to.', - ), - cfg.IntOpt('port', - default=4952, - help='The port to send UDP meters to.', - ), -] - - -def register_opts(config): - """Register the options for publishing UDP messages. - """ - config.register_group(UDP_PUBLISH_GROUP) - config.register_opts(UDP_PUBLISH_OPTS, - group=UDP_PUBLISH_GROUP) - - -register_opts(cfg.CONF) - class UDPPublisher(publisher.PublisherBase): - def __init__(self): + def __init__(self, parsed_url): + self.host, self.port = network_utils.parse_host_port( + parsed_url.netloc, + default_port=cfg.CONF.collector.udp_port) self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -70,14 +52,13 @@ class UDPPublisher(publisher.PublisherBase): for counter in counters: msg = counter._asdict() msg['source'] = source - host = cfg.CONF.publisher_udp.host - port = cfg.CONF.publisher_udp.port + host = self.host + port = self.port LOG.debug(_("Publishing counter %(msg)s over " "UDP to %(host)s:%(port)d") % locals()) try: self.socket.sendto(msgpack.dumps(msg), - (cfg.CONF.publisher_udp.host, - cfg.CONF.publisher_udp.port)) + (self.host, self.port)) except Exception as e: LOG.warn(_("Unable to send counter over UDP")) LOG.exception(e) diff --git a/etc/ceilometer/ceilometer.conf.sample b/etc/ceilometer/ceilometer.conf.sample index 87a396add..b402c93d7 100644 --- a/etc/ceilometer/ceilometer.conf.sample +++ b/etc/ceilometer/ceilometer.conf.sample @@ -16,17 +16,6 @@ #pipeline_cfg_file=pipeline.yaml -# -# Options defined in ceilometer.policy -# - -# JSON file representing policy (string value) -#policy_file=policy.json - -# Rule checked when requested rule is not found (string value) -#policy_default_rule=default - - # # Options defined in ceilometer.api.app # @@ -239,6 +228,18 @@ #notification_topics=notifications +# +# Options defined in ceilometer.openstack.common.policy +# + +# JSON file containing policy (string value) +#policy_file=policy.json + +# Rule enforced when requested rule is not found (string +# value) +#policy_default_rule=default + + # # Options defined in ceilometer.openstack.common.rpc # @@ -273,15 +274,6 @@ #control_exchange=openstack -# -# Options defined in ceilometer.openstack.common.rpc.amqp -# - -# Enable a fast single reply queue if using AMQP based RPC -# like RabbitMQ or Qpid. (boolean value) -#amqp_rpc_single_reply_queue=false - - # # Options defined in ceilometer.openstack.common.rpc.impl_kombu # @@ -407,7 +399,7 @@ # Name of this node. Must be a valid hostname, FQDN, or IP # address. Must match "host" option, if running Nova. (string # value) -#rpc_zmq_host=dex +#rpc_zmq_host=ceilometer # @@ -445,20 +437,6 @@ #cinder_control_exchange=cinder -[publisher_udp] - -# -# Options defined in ceilometer.publisher.udp -# - -# The host target to publish metering records to. (string -# value) -#host=localhost - -# The port to send UDP meters to. (integer value) -#port=4952 - - [database] # @@ -479,7 +457,11 @@ # The SQLAlchemy connection string used to connect to the # database (string value) -#connection=sqlite:////common/db/$sqlite_db +#connection=sqlite:////ceilometer/openstack/common/db/$sqlite_db + +# The SQLAlchemy connection string used to connect to the +# slave database (string value) +#slave_connection= # timeout before idle sql connections are reaped (integer # value) @@ -491,7 +473,7 @@ # Maximum number of SQL connections to keep open in a pool # (integer value) -#max_pool_size=5 +#max_pool_size= # maximum db connection retries during startup. (setting -1 # implies an infinite retry count) (integer value) @@ -513,6 +495,10 @@ # value) #connection_trace=false +# If set, use this value for pool_timeout with sqlalchemy +# (integer value) +#pool_timeout= + [publisher_meter] @@ -614,4 +600,4 @@ #password= -# Total option count: 117 +# Total option count: 116 diff --git a/etc/ceilometer/pipeline.yaml b/etc/ceilometer/pipeline.yaml index ff1ebc730..cad1160ab 100644 --- a/etc/ceilometer/pipeline.yaml +++ b/etc/ceilometer/pipeline.yaml @@ -6,4 +6,4 @@ - "*" transformers: publishers: - - meter + - meter:// diff --git a/tests/publisher/test_meter_publisher.py b/tests/publisher/test_meter_publisher.py index d2ac32e56..e60a866da 100644 --- a/tests/publisher/test_meter_publisher.py +++ b/tests/publisher/test_meter_publisher.py @@ -255,7 +255,7 @@ class TestPublish(base.TestCase): super(TestPublish, self).setUp() self.published = [] self.stubs.Set(rpc, 'cast', self.faux_cast) - publisher = meter.MeterPublisher() + publisher = meter.MeterPublisher(None) publisher.publish_counters(None, self.test_data, 'test') diff --git a/tests/publisher/test_udp.py b/tests/publisher/test_udp.py index e0b571ea6..6e81270bc 100644 --- a/tests/publisher/test_udp.py +++ b/tests/publisher/test_udp.py @@ -22,6 +22,7 @@ import datetime import mock import msgpack from oslo.config import cfg +import urlparse from ceilometer import counter from ceilometer.publisher import udp @@ -104,7 +105,7 @@ class TestUDPPublisher(base.TestCase): self.data_sent = [] with mock.patch('socket.socket', self._make_fake_socket(self.data_sent)): - publisher = udp.UDPPublisher() + publisher = udp.UDPPublisher(urlparse.urlparse('udp://somehost')) publisher.publish_counters(None, self.test_data, self.COUNTER_SOURCE) @@ -122,8 +123,8 @@ class TestUDPPublisher(base.TestCase): sent_counters.append(counter) # Check destination - self.assertEqual(dest, (cfg.CONF.publisher_udp.host, - cfg.CONF.publisher_udp.port)) + self.assertEqual(dest, ('somehost', + cfg.CONF.collector.udp_port)) # Check that counters are equal self.assertEqual(sorted(sent_counters), @@ -141,7 +142,7 @@ class TestUDPPublisher(base.TestCase): def test_publish_error(self): with mock.patch('socket.socket', self._make_broken_socket): - publisher = udp.UDPPublisher() + publisher = udp.UDPPublisher(urlparse.urlparse('udp://localhost')) publisher.publish_counters(None, self.test_data, self.COUNTER_SOURCE) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 8a2785c74..de20344c9 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -54,13 +54,13 @@ class TestPipeline(base.TestCase): raise KeyError(name) - def get_publisher(self, name, namespace=''): - fake_drivers = {'test': test_publisher.TestPublisher, - 'new': test_publisher.TestPublisher, - 'except': self.PublisherClassException} - return fake_drivers[name]() + def get_publisher(self, url, namespace=''): + fake_drivers = {'test://': test_publisher.TestPublisher, + 'new://': test_publisher.TestPublisher, + 'except://': self.PublisherClassException} + return fake_drivers[url](url) - class PublisherClassException(): + class PublisherClassException(publisher.PublisherBase): def publish_counters(self, ctxt, counters, source): raise Exception() @@ -127,7 +127,7 @@ class TestPipeline(base.TestCase): {'name': "update", 'parameters': {}} ], - 'publishers': ["test"], + 'publishers': ["test://"], }, ] def _exception_create_pipelinemanager(self): @@ -475,7 +475,7 @@ class TestPipeline(base.TestCase): == 'a_update') def test_multiple_publisher(self): - self.pipeline_cfg[0]['publishers'] = ['test', 'new'] + self.pipeline_cfg[0]['publishers'] = ['test://', 'new://'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) @@ -492,7 +492,7 @@ class TestPipeline(base.TestCase): 'a_update') def test_multiple_publisher_isolation(self): - self.pipeline_cfg[0]['publishers'] = ['except', 'new'] + self.pipeline_cfg[0]['publishers'] = ['except://', 'new://'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) with pipeline_manager.publisher(None, None) as p: @@ -614,7 +614,7 @@ class TestPipeline(base.TestCase): {'name': "update", 'parameters': {}} ], - 'publishers': ["test"], + 'publishers': ["test://"], }, ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager)