From ae9ccbd227783582c133c00660c6c52e4c8efade Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Mon, 13 May 2013 17:35:42 +0200 Subject: [PATCH] Add an UDP publisher and receiver Blueprint: udp-publishing Change-Id: Ia281ab1d2924c8126895d7475f48f9cc886f92d1 Signed-off-by: Julien Danjou --- ceilometer/collector/service.py | 86 +++++++++++++-- ceilometer/publisher/udp.py | 83 ++++++++++++++ etc/ceilometer/ceilometer.conf.sample | 55 +++++++--- setup.cfg | 4 + tests/collector/test_manager.py | 87 ++++++++++++++- tests/publisher/test_udp.py | 149 ++++++++++++++++++++++++++ tools/pip-requires | 1 + 7 files changed, 435 insertions(+), 30 deletions(-) create mode 100644 ceilometer/publisher/udp.py create mode 100644 tests/publisher/test_udp.py diff --git a/ceilometer/collector/service.py b/ceilometer/collector/service.py index 296e8a030..b2cd4c783 100644 --- a/ceilometer/collector/service.py +++ b/ceilometer/collector/service.py @@ -16,12 +16,19 @@ # License for the specific language governing permissions and limitations # under the License. +import eventlet from oslo.config import cfg +import msgpack +import socket +import sys from ceilometer.publisher import meter as publisher_meter from ceilometer import extension_manager +from ceilometer.service import prepare_service from ceilometer.openstack.common import context +from ceilometer.openstack.common import gettextutils from ceilometer.openstack.common import log +from ceilometer.openstack.common import service as os_service from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher # Import rpc_notifier to register `notification_topics` flag so that @@ -40,24 +47,84 @@ OPTS = [ cfg.ListOpt('disabled_notification_listeners', default=[], help='list of listener plugins to disable', - ), + deprecated_group="DEFAULT"), + cfg.StrOpt('udp_address', + default='0.0.0.0', + help='address to bind the UDP socket to' + 'disabled if set to an empty string'), + cfg.IntOpt('udp_port', + default=4952, + help='port to bind the UDP socket to'), ] -cfg.CONF.register_opts(OPTS) +cfg.CONF.register_opts(OPTS, group="collector") LOG = log.getLogger(__name__) +def get_storage_connection(conf): + storage.register_opts(conf) + storage_engine = storage.get_engine(conf) + return storage_engine.get_connection(conf) + + +class UDPCollectorService(os_service.Service): + """UDP listener for the collector service.""" + + def __init__(self): + super(UDPCollectorService, self).__init__() + self.storage_conn = get_storage_connection(cfg.CONF) + + def start(self): + """Bind the UDP socket and handle incoming data.""" + super(UDPCollectorService, self).start() + + udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + udp.bind((cfg.CONF.collector.udp_address, + cfg.CONF.collector.udp_port)) + + self.running = True + while self.running: + # NOTE(jd) Arbitrary limit of 64K because that ought to be + # enough for anybody. + data, source = udp.recvfrom(64 * 1024) + try: + counter = msgpack.loads(data) + except: + LOG.warn(_("UDP: Cannot decode data sent by %s"), str(source)) + else: + try: + counter['counter_name'] = counter['name'] + counter['counter_volume'] = counter['volume'] + counter['counter_unit'] = counter['unit'] + counter['counter_type'] = counter['type'] + LOG.debug("UDP: Storing %s", str(counter)) + self.storage_conn.record_metering_data(counter) + except Exception as err: + LOG.debug(_("UDP: Unable to store meter")) + LOG.exception(err) + + def stop(self): + self.running = False + super(UDPCollectorService, self).stop() + + +def udp_collector(): + # TODO(jd) move into prepare_service gettextutils and eventlet? + eventlet.monkey_patch() + gettextutils.install('ceilometer') + prepare_service(sys.argv) + os_service.launch(UDPCollectorService()).wait() + + class CollectorService(service.PeriodicService): COLLECTOR_NAMESPACE = 'ceilometer.collector' - def start(self): - super(CollectorService, self).start() - - storage.register_opts(cfg.CONF) - self.storage_engine = storage.get_engine(cfg.CONF) - self.storage_conn = self.storage_engine.get_connection(cfg.CONF) + def __init__(self, host, topic, manager=None): + super(CollectorService, self).__init__(host, topic, manager) + self.storage_conn = get_storage_connection(cfg.CONF) def initialize_service_hook(self, service): '''Consumers must be declared before consume_thread start.''' @@ -76,7 +143,8 @@ class CollectorService(service.PeriodicService): self.notification_manager = \ extension_manager.ActivatedExtensionManager( namespace=self.COLLECTOR_NAMESPACE, - disabled_names=cfg.CONF.disabled_notification_listeners, + disabled_names= + cfg.CONF.collector.disabled_notification_listeners, ) if not list(self.notification_manager): diff --git a/ceilometer/publisher/udp.py b/ceilometer/publisher/udp.py new file mode 100644 index 000000000..63fc81faf --- /dev/null +++ b/ceilometer/publisher/udp.py @@ -0,0 +1,83 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2013 eNovance +# +# Author: Julien Danjou +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Publish a counter using an UDP mechanism +""" + +from ceilometer import publisher +from ceilometer.openstack.common import log +from ceilometer.openstack.common.gettextutils import _ +from oslo.config import cfg +import msgpack +import socket + +LOG = log.getLogger(__name__) + +UDP_PUBLISH_GROUP = cfg.OptGroup(name='publisher_udp', + title='Options for UDP publisher') + +UDP_PUBLISH_OPTS = [ + cfg.StrOpt('host', + default="localhost", + help='The host target to publish metering records to.', + ), + cfg.IntOpt('port', + default=4952, + help='The port to send UDP meters to.', + ), +] + + +def register_opts(config): + """Register the options for publishing UDP messages. + """ + config.register_group(UDP_PUBLISH_GROUP) + config.register_opts(UDP_PUBLISH_OPTS, + group=UDP_PUBLISH_GROUP) + + +register_opts(cfg.CONF) + + +class UDPPublisher(publisher.PublisherBase): + + def __init__(self): + self.socket = socket.socket(socket.AF_INET, + socket.SOCK_DGRAM) + + def publish_counters(self, context, counters, source): + """Send a metering message for publishing + + :param context: Execution context from the service or RPC call + :param counter: Counter from pipeline after transformation + :param source: counter source + """ + + for counter in counters: + msg = counter._asdict() + msg['source'] = source + LOG.debug(_("Publishing counter %s over UDP to %s:%d"), + msg, + cfg.CONF.publisher_udp.host, + cfg.CONF.publisher_udp.port) + try: + self.socket.sendto(msgpack.dumps(msg), + (cfg.CONF.publisher_udp.host, + cfg.CONF.publisher_udp.port)) + except Exception as e: + LOG.warn(_("Unable to send counter over UDP")) + LOG.exception(e) diff --git a/etc/ceilometer/ceilometer.conf.sample b/etc/ceilometer/ceilometer.conf.sample index ed8cdd410..7d58d7431 100644 --- a/etc/ceilometer/ceilometer.conf.sample +++ b/etc/ceilometer/ceilometer.conf.sample @@ -76,14 +76,6 @@ #disabled_central_pollsters= -# -# Options defined in ceilometer.collector.service -# - -# list of listener plugins to disable (list value) -#disabled_notification_listeners= - - # # Options defined in ceilometer.compute # @@ -523,6 +515,34 @@ #cinder_control_exchange=cinder +[publisher_udp] + +# +# Options defined in ceilometer.publisher.udp +# + +# The host target to publish metering records to. (string +# value) +#host=localhost + +# The port to send UDP meters to. (integer value) +#port=4952 + + +[publisher_meter] + +# +# Options defined in ceilometer.publisher.meter +# + +# the topic ceilometer uses for metering messages (string +# value) +#metering_topic=metering + +# Secret value for signing metering messages (string value) +#metering_secret=change this or be hacked + + [rpc_notifier2] # @@ -549,18 +569,21 @@ #password= -[publisher_meter] +[collector] # -# Options defined in ceilometer.publisher.meter +# Options defined in ceilometer.collector.service # -# the topic ceilometer uses for metering messages (string -# value) -#metering_topic=metering +# list of listener plugins to disable (list value) +#disabled_notification_listeners= -# Secret value for signing metering messages (string value) -#metering_secret=change this or be hacked +# address to bind the UDP socket todisabled if set to an empty +# string (string value) +#udp_address=0.0.0.0 + +# port to bind the UDP socket to (integer value) +#udp_port=4952 -# Total option count: 111 +# Total option count: 115 diff --git a/setup.cfg b/setup.cfg index e74f0614c..173a0ff23 100644 --- a/setup.cfg +++ b/setup.cfg @@ -84,10 +84,14 @@ ceilometer.transformer = ceilometer.publisher = meter_publisher = ceilometer.publisher.meter:MeterPublisher meter = ceilometer.publisher.meter:MeterPublisher + udp = ceilometer.publisher.udp:UDPPublisher paste.filter_factory = swift = ceilometer.objectstore.swift_middleware:filter_factory +console_scripts = + ceilometer-collector-udp = ceilometer.collector.service:udp_collector + [build_sphinx] all_files = 1 build-dir = doc/build diff --git a/tests/collector/test_manager.py b/tests/collector/test_manager.py index 9e455f916..a4e36ca39 100644 --- a/tests/collector/test_manager.py +++ b/tests/collector/test_manager.py @@ -19,6 +19,8 @@ """ from datetime import datetime +import msgpack +import socket from mock import patch from mock import MagicMock @@ -26,6 +28,7 @@ from oslo.config import cfg from stevedore import extension from stevedore.tests import manager as test_manager +from ceilometer import counter from ceilometer.publisher import meter from ceilometer.collector import service from ceilometer.storage import base @@ -83,17 +86,93 @@ TEST_NOTICE = { } -class TestCollectorService(tests_base.TestCase): +class TestCollector(tests_base.TestCase): + def setUp(self): + super(TestCollector, self).setUp() + cfg.CONF.set_override("database_connection", "log://") + + +class TestUDPCollectorService(TestCollector): + def _make_fake_socket(self, family, type): + udp_socket = self.mox.CreateMockAnything() + udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + udp_socket.bind((cfg.CONF.collector.udp_address, + cfg.CONF.collector.udp_port)) + + def stop_udp(anything): + # Make the loop stop + self.srv.stop() + + udp_socket.recvfrom(64 * 1024).WithSideEffects( + stop_udp).AndReturn( + (msgpack.dumps(self.counter), + ('127.0.0.1', 12345))) + + self.mox.ReplayAll() + + return udp_socket + + def setUp(self): + super(TestUDPCollectorService, self).setUp() + self.srv = service.UDPCollectorService() + self.counter = dict(counter.Counter( + name='foobar', + type='bad', + unit='F', + volume=1, + user_id='jd', + project_id='ceilometer', + resource_id='cat', + timestamp='NOW!', + resource_metadata={}, + )._asdict()) + + def test_udp_receive(self): + self.srv.storage_conn = self.mox.CreateMock(base.Connection) + self.counter['source'] = 'mysource' + self.counter['counter_name'] = self.counter['name'] + self.counter['counter_volume'] = self.counter['volume'] + self.counter['counter_type'] = self.counter['type'] + self.counter['counter_unit'] = self.counter['unit'] + self.srv.storage_conn.record_metering_data(self.counter) + self.mox.ReplayAll() + + with patch('socket.socket', self._make_fake_socket): + self.srv.start() + + @staticmethod + def _raise_error(): + raise Exception + + def test_udp_receive_bad_decoding(self): + with patch('socket.socket', self._make_fake_socket): + with patch('msgpack.loads', self._raise_error): + self.srv.start() + + def test_udp_receive_storage_error(self): + self.srv.storage_conn = self.mox.CreateMock(base.Connection) + self.counter['source'] = 'mysource' + self.counter['counter_name'] = self.counter['name'] + self.counter['counter_volume'] = self.counter['volume'] + self.counter['counter_type'] = self.counter['type'] + self.counter['counter_unit'] = self.counter['unit'] + self.srv.storage_conn.record_metering_data( + self.counter).AndRaise(IOError) + self.mox.ReplayAll() + + with patch('socket.socket', self._make_fake_socket): + self.srv.start() + + +class TestCollectorService(TestCollector): def setUp(self): super(TestCollectorService, self).setUp() self.srv = service.CollectorService('the-host', 'the-topic') self.ctx = None - #cfg.CONF.publisher_meter.metering_secret = 'not-so-secret' @patch('ceilometer.pipeline.setup_pipeline', MagicMock()) def test_init_host(self): - cfg.CONF.database_connection = 'log://localhost' # If we try to create a real RPC connection, init_host() never # returns. Mock it out so we can establish the manager # configuration. @@ -158,7 +237,6 @@ class TestCollectorService(tests_base.TestCase): self.mox.ReplayAll() self.srv.record_metering_data(self.ctx, msg) - self.mox.VerifyAll() def test_timestamp_tzinfo_conversion(self): msg = {'counter_name': 'test', @@ -180,7 +258,6 @@ class TestCollectorService(tests_base.TestCase): self.mox.ReplayAll() self.srv.record_metering_data(self.ctx, msg) - self.mox.VerifyAll() @patch('ceilometer.pipeline.setup_pipeline', MagicMock()) def test_process_notification(self): diff --git a/tests/publisher/test_udp.py b/tests/publisher/test_udp.py new file mode 100644 index 000000000..f4403b1a3 --- /dev/null +++ b/tests/publisher/test_udp.py @@ -0,0 +1,149 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2013 eNovance +# +# Author: Julien Danjou +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Tests for ceilometer/publisher/udp.py +""" + +import datetime +import functools +import mock +import msgpack +from oslo.config import cfg +import socket + +from ceilometer import counter +from ceilometer.publisher import udp +from ceilometer.tests import base + + +class TestUDPPublisher(base.TestCase): + + test_data = [ + counter.Counter( + name='test', + type=counter.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + counter.Counter( + name='test', + type=counter.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + counter.Counter( + name='test2', + type=counter.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + counter.Counter( + name='test2', + type=counter.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + counter.Counter( + name='test3', + type=counter.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + ] + + def _make_fake_socket(self, published): + def _fake_socket_socket(family, type): + def record_data(msg, dest): + published.append((msg, dest)) + udp_socket = self.mox.CreateMockAnything() + udp_socket.sendto = record_data + self.mox.ReplayAll() + return udp_socket + return _fake_socket_socket + + COUNTER_SOURCE = 'testsource' + + def test_published(self): + self.data_sent = [] + with mock.patch('socket.socket', + self._make_fake_socket(self.data_sent)): + publisher = udp.UDPPublisher() + publisher.publish_counters(None, + self.test_data, + self.COUNTER_SOURCE) + + self.assertEqual(len(self.data_sent), 5) + + sent_counters = [] + + for data, dest in self.data_sent: + counter = msgpack.loads(data) + self.assertEqual(counter['source'], self.COUNTER_SOURCE) + # Remove source because our test Counters don't have it, so the + # comparison would fail later + del counter['source'] + sent_counters.append(counter) + + # Check destination + self.assertEqual(dest, (cfg.CONF.publisher_udp.host, + cfg.CONF.publisher_udp.port)) + + # Check that counters are equal + self.assertEqual(sorted(sent_counters), + sorted([dict(d._asdict()) for d in self.test_data])) + + @staticmethod + def _raise_ioerror(): + raise IOError + + def _make_broken_socket(self, family, type): + udp_socket = self.mox.CreateMockAnything() + udp_socket.sendto = self._raise_ioerror + self.mox.ReplayAll() + + def test_publish_error(self): + with mock.patch('socket.socket', + self._make_broken_socket): + publisher = udp.UDPPublisher() + publisher.publish_counters(None, + self.test_data, + self.COUNTER_SOURCE) diff --git a/tools/pip-requires b/tools/pip-requires index 93019b917..2691ba274 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -12,6 +12,7 @@ anyjson>=0.2.4 Flask==0.9 pecan>=0.2.0 stevedore>=0.7 +msgpack-python python-glanceclient python-novaclient>=2.6.10 python-keystoneclient>=0.2.3