Add an UDP publisher and receiver

Blueprint: udp-publishing

Change-Id: Ia281ab1d2924c8126895d7475f48f9cc886f92d1
Signed-off-by: Julien Danjou <julien@danjou.info>
This commit is contained in:
Julien Danjou 2013-05-13 17:35:42 +02:00
parent 9d339a59c6
commit ae9ccbd227
7 changed files with 435 additions and 30 deletions

View File

@ -16,12 +16,19 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from oslo.config import cfg
import msgpack
import socket
import sys
from ceilometer.publisher import meter as publisher_meter
from ceilometer import extension_manager
from ceilometer.service import prepare_service
from ceilometer.openstack.common import context
from ceilometer.openstack.common import gettextutils
from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service
from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher
# Import rpc_notifier to register `notification_topics` flag so that
@ -40,24 +47,84 @@ OPTS = [
cfg.ListOpt('disabled_notification_listeners',
default=[],
help='list of listener plugins to disable',
),
deprecated_group="DEFAULT"),
cfg.StrOpt('udp_address',
default='0.0.0.0',
help='address to bind the UDP socket to'
'disabled if set to an empty string'),
cfg.IntOpt('udp_port',
default=4952,
help='port to bind the UDP socket to'),
]
cfg.CONF.register_opts(OPTS)
cfg.CONF.register_opts(OPTS, group="collector")
LOG = log.getLogger(__name__)
def get_storage_connection(conf):
storage.register_opts(conf)
storage_engine = storage.get_engine(conf)
return storage_engine.get_connection(conf)
class UDPCollectorService(os_service.Service):
"""UDP listener for the collector service."""
def __init__(self):
super(UDPCollectorService, self).__init__()
self.storage_conn = get_storage_connection(cfg.CONF)
def start(self):
"""Bind the UDP socket and handle incoming data."""
super(UDPCollectorService, self).start()
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
udp.bind((cfg.CONF.collector.udp_address,
cfg.CONF.collector.udp_port))
self.running = True
while self.running:
# NOTE(jd) Arbitrary limit of 64K because that ought to be
# enough for anybody.
data, source = udp.recvfrom(64 * 1024)
try:
counter = msgpack.loads(data)
except:
LOG.warn(_("UDP: Cannot decode data sent by %s"), str(source))
else:
try:
counter['counter_name'] = counter['name']
counter['counter_volume'] = counter['volume']
counter['counter_unit'] = counter['unit']
counter['counter_type'] = counter['type']
LOG.debug("UDP: Storing %s", str(counter))
self.storage_conn.record_metering_data(counter)
except Exception as err:
LOG.debug(_("UDP: Unable to store meter"))
LOG.exception(err)
def stop(self):
self.running = False
super(UDPCollectorService, self).stop()
def udp_collector():
# TODO(jd) move into prepare_service gettextutils and eventlet?
eventlet.monkey_patch()
gettextutils.install('ceilometer')
prepare_service(sys.argv)
os_service.launch(UDPCollectorService()).wait()
class CollectorService(service.PeriodicService):
COLLECTOR_NAMESPACE = 'ceilometer.collector'
def start(self):
super(CollectorService, self).start()
storage.register_opts(cfg.CONF)
self.storage_engine = storage.get_engine(cfg.CONF)
self.storage_conn = self.storage_engine.get_connection(cfg.CONF)
def __init__(self, host, topic, manager=None):
super(CollectorService, self).__init__(host, topic, manager)
self.storage_conn = get_storage_connection(cfg.CONF)
def initialize_service_hook(self, service):
'''Consumers must be declared before consume_thread start.'''
@ -76,7 +143,8 @@ class CollectorService(service.PeriodicService):
self.notification_manager = \
extension_manager.ActivatedExtensionManager(
namespace=self.COLLECTOR_NAMESPACE,
disabled_names=cfg.CONF.disabled_notification_listeners,
disabled_names=
cfg.CONF.collector.disabled_notification_listeners,
)
if not list(self.notification_manager):

View File

@ -0,0 +1,83 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 eNovance
#
# Author: Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Publish a counter using an UDP mechanism
"""
from ceilometer import publisher
from ceilometer.openstack.common import log
from ceilometer.openstack.common.gettextutils import _
from oslo.config import cfg
import msgpack
import socket
LOG = log.getLogger(__name__)
UDP_PUBLISH_GROUP = cfg.OptGroup(name='publisher_udp',
title='Options for UDP publisher')
UDP_PUBLISH_OPTS = [
cfg.StrOpt('host',
default="localhost",
help='The host target to publish metering records to.',
),
cfg.IntOpt('port',
default=4952,
help='The port to send UDP meters to.',
),
]
def register_opts(config):
"""Register the options for publishing UDP messages.
"""
config.register_group(UDP_PUBLISH_GROUP)
config.register_opts(UDP_PUBLISH_OPTS,
group=UDP_PUBLISH_GROUP)
register_opts(cfg.CONF)
class UDPPublisher(publisher.PublisherBase):
def __init__(self):
self.socket = socket.socket(socket.AF_INET,
socket.SOCK_DGRAM)
def publish_counters(self, context, counters, source):
"""Send a metering message for publishing
:param context: Execution context from the service or RPC call
:param counter: Counter from pipeline after transformation
:param source: counter source
"""
for counter in counters:
msg = counter._asdict()
msg['source'] = source
LOG.debug(_("Publishing counter %s over UDP to %s:%d"),
msg,
cfg.CONF.publisher_udp.host,
cfg.CONF.publisher_udp.port)
try:
self.socket.sendto(msgpack.dumps(msg),
(cfg.CONF.publisher_udp.host,
cfg.CONF.publisher_udp.port))
except Exception as e:
LOG.warn(_("Unable to send counter over UDP"))
LOG.exception(e)

View File

@ -76,14 +76,6 @@
#disabled_central_pollsters=
#
# Options defined in ceilometer.collector.service
#
# list of listener plugins to disable (list value)
#disabled_notification_listeners=
#
# Options defined in ceilometer.compute
#
@ -523,6 +515,34 @@
#cinder_control_exchange=cinder
[publisher_udp]
#
# Options defined in ceilometer.publisher.udp
#
# The host target to publish metering records to. (string
# value)
#host=localhost
# The port to send UDP meters to. (integer value)
#port=4952
[publisher_meter]
#
# Options defined in ceilometer.publisher.meter
#
# the topic ceilometer uses for metering messages (string
# value)
#metering_topic=metering
# Secret value for signing metering messages (string value)
#metering_secret=change this or be hacked
[rpc_notifier2]
#
@ -549,18 +569,21 @@
#password=<None>
[publisher_meter]
[collector]
#
# Options defined in ceilometer.publisher.meter
# Options defined in ceilometer.collector.service
#
# the topic ceilometer uses for metering messages (string
# value)
#metering_topic=metering
# list of listener plugins to disable (list value)
#disabled_notification_listeners=
# Secret value for signing metering messages (string value)
#metering_secret=change this or be hacked
# address to bind the UDP socket todisabled if set to an empty
# string (string value)
#udp_address=0.0.0.0
# port to bind the UDP socket to (integer value)
#udp_port=4952
# Total option count: 111
# Total option count: 115

View File

@ -84,10 +84,14 @@ ceilometer.transformer =
ceilometer.publisher =
meter_publisher = ceilometer.publisher.meter:MeterPublisher
meter = ceilometer.publisher.meter:MeterPublisher
udp = ceilometer.publisher.udp:UDPPublisher
paste.filter_factory =
swift = ceilometer.objectstore.swift_middleware:filter_factory
console_scripts =
ceilometer-collector-udp = ceilometer.collector.service:udp_collector
[build_sphinx]
all_files = 1
build-dir = doc/build

View File

@ -19,6 +19,8 @@
"""
from datetime import datetime
import msgpack
import socket
from mock import patch
from mock import MagicMock
@ -26,6 +28,7 @@ from oslo.config import cfg
from stevedore import extension
from stevedore.tests import manager as test_manager
from ceilometer import counter
from ceilometer.publisher import meter
from ceilometer.collector import service
from ceilometer.storage import base
@ -83,17 +86,93 @@ TEST_NOTICE = {
}
class TestCollectorService(tests_base.TestCase):
class TestCollector(tests_base.TestCase):
def setUp(self):
super(TestCollector, self).setUp()
cfg.CONF.set_override("database_connection", "log://")
class TestUDPCollectorService(TestCollector):
def _make_fake_socket(self, family, type):
udp_socket = self.mox.CreateMockAnything()
udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
udp_socket.bind((cfg.CONF.collector.udp_address,
cfg.CONF.collector.udp_port))
def stop_udp(anything):
# Make the loop stop
self.srv.stop()
udp_socket.recvfrom(64 * 1024).WithSideEffects(
stop_udp).AndReturn(
(msgpack.dumps(self.counter),
('127.0.0.1', 12345)))
self.mox.ReplayAll()
return udp_socket
def setUp(self):
super(TestUDPCollectorService, self).setUp()
self.srv = service.UDPCollectorService()
self.counter = dict(counter.Counter(
name='foobar',
type='bad',
unit='F',
volume=1,
user_id='jd',
project_id='ceilometer',
resource_id='cat',
timestamp='NOW!',
resource_metadata={},
)._asdict())
def test_udp_receive(self):
self.srv.storage_conn = self.mox.CreateMock(base.Connection)
self.counter['source'] = 'mysource'
self.counter['counter_name'] = self.counter['name']
self.counter['counter_volume'] = self.counter['volume']
self.counter['counter_type'] = self.counter['type']
self.counter['counter_unit'] = self.counter['unit']
self.srv.storage_conn.record_metering_data(self.counter)
self.mox.ReplayAll()
with patch('socket.socket', self._make_fake_socket):
self.srv.start()
@staticmethod
def _raise_error():
raise Exception
def test_udp_receive_bad_decoding(self):
with patch('socket.socket', self._make_fake_socket):
with patch('msgpack.loads', self._raise_error):
self.srv.start()
def test_udp_receive_storage_error(self):
self.srv.storage_conn = self.mox.CreateMock(base.Connection)
self.counter['source'] = 'mysource'
self.counter['counter_name'] = self.counter['name']
self.counter['counter_volume'] = self.counter['volume']
self.counter['counter_type'] = self.counter['type']
self.counter['counter_unit'] = self.counter['unit']
self.srv.storage_conn.record_metering_data(
self.counter).AndRaise(IOError)
self.mox.ReplayAll()
with patch('socket.socket', self._make_fake_socket):
self.srv.start()
class TestCollectorService(TestCollector):
def setUp(self):
super(TestCollectorService, self).setUp()
self.srv = service.CollectorService('the-host', 'the-topic')
self.ctx = None
#cfg.CONF.publisher_meter.metering_secret = 'not-so-secret'
@patch('ceilometer.pipeline.setup_pipeline', MagicMock())
def test_init_host(self):
cfg.CONF.database_connection = 'log://localhost'
# If we try to create a real RPC connection, init_host() never
# returns. Mock it out so we can establish the manager
# configuration.
@ -158,7 +237,6 @@ class TestCollectorService(tests_base.TestCase):
self.mox.ReplayAll()
self.srv.record_metering_data(self.ctx, msg)
self.mox.VerifyAll()
def test_timestamp_tzinfo_conversion(self):
msg = {'counter_name': 'test',
@ -180,7 +258,6 @@ class TestCollectorService(tests_base.TestCase):
self.mox.ReplayAll()
self.srv.record_metering_data(self.ctx, msg)
self.mox.VerifyAll()
@patch('ceilometer.pipeline.setup_pipeline', MagicMock())
def test_process_notification(self):

149
tests/publisher/test_udp.py Normal file
View File

@ -0,0 +1,149 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 eNovance
#
# Author: Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/publisher/udp.py
"""
import datetime
import functools
import mock
import msgpack
from oslo.config import cfg
import socket
from ceilometer import counter
from ceilometer.publisher import udp
from ceilometer.tests import base
class TestUDPPublisher(base.TestCase):
test_data = [
counter.Counter(
name='test',
type=counter.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
counter.Counter(
name='test',
type=counter.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
counter.Counter(
name='test2',
type=counter.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
counter.Counter(
name='test2',
type=counter.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
counter.Counter(
name='test3',
type=counter.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
]
def _make_fake_socket(self, published):
def _fake_socket_socket(family, type):
def record_data(msg, dest):
published.append((msg, dest))
udp_socket = self.mox.CreateMockAnything()
udp_socket.sendto = record_data
self.mox.ReplayAll()
return udp_socket
return _fake_socket_socket
COUNTER_SOURCE = 'testsource'
def test_published(self):
self.data_sent = []
with mock.patch('socket.socket',
self._make_fake_socket(self.data_sent)):
publisher = udp.UDPPublisher()
publisher.publish_counters(None,
self.test_data,
self.COUNTER_SOURCE)
self.assertEqual(len(self.data_sent), 5)
sent_counters = []
for data, dest in self.data_sent:
counter = msgpack.loads(data)
self.assertEqual(counter['source'], self.COUNTER_SOURCE)
# Remove source because our test Counters don't have it, so the
# comparison would fail later
del counter['source']
sent_counters.append(counter)
# Check destination
self.assertEqual(dest, (cfg.CONF.publisher_udp.host,
cfg.CONF.publisher_udp.port))
# Check that counters are equal
self.assertEqual(sorted(sent_counters),
sorted([dict(d._asdict()) for d in self.test_data]))
@staticmethod
def _raise_ioerror():
raise IOError
def _make_broken_socket(self, family, type):
udp_socket = self.mox.CreateMockAnything()
udp_socket.sendto = self._raise_ioerror
self.mox.ReplayAll()
def test_publish_error(self):
with mock.patch('socket.socket',
self._make_broken_socket):
publisher = udp.UDPPublisher()
publisher.publish_counters(None,
self.test_data,
self.COUNTER_SOURCE)

View File

@ -12,6 +12,7 @@ anyjson>=0.2.4
Flask==0.9
pecan>=0.2.0
stevedore>=0.7
msgpack-python
python-glanceclient
python-novaclient>=2.6.10
python-keystoneclient>=0.2.3