collector-udp: use dispatcher rather than storage

Change-Id: I33c2790441c951e9e86fd21407b699df0f87c01a
Fixes-Bug: #1204517
This commit is contained in:
Julien Danjou 2013-09-17 11:53:05 +02:00
parent 909dee171b
commit 94be547fc7
2 changed files with 65 additions and 51 deletions

View File

@ -32,7 +32,6 @@ from ceilometer.openstack.common.rpc import service as rpc_service
from ceilometer.openstack.common import timeutils from ceilometer.openstack.common import timeutils
from ceilometer import pipeline from ceilometer import pipeline
from ceilometer import storage
from ceilometer.storage import models from ceilometer.storage import models
from ceilometer import transformer from ceilometer import transformer
@ -60,12 +59,26 @@ cfg.CONF.register_opts(OPTS, group="collector")
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
class UDPCollectorService(os_service.Service): class CollectorBase(object):
"""UDP listener for the collector service."""
def __init__(self): DISPATCHER_NAMESPACE = 'ceilometer.dispatcher'
super(UDPCollectorService, self).__init__()
self.storage_conn = storage.get_connection(cfg.CONF) def __init__(self, *args, **kwargs):
super(CollectorBase, self).__init__(*args, **kwargs)
LOG.debug('loading dispatchers from %s',
self.DISPATCHER_NAMESPACE)
self.dispatcher_manager = named.NamedExtensionManager(
namespace=self.DISPATCHER_NAMESPACE,
names=cfg.CONF.collector.dispatcher,
invoke_on_load=True,
invoke_args=[cfg.CONF])
if not list(self.dispatcher_manager):
LOG.warning('Failed to load any dispatchers for %s',
self.DISPATCHER_NAMESPACE)
class UDPCollectorService(CollectorBase, os_service.Service):
"""UDP listener for the collector service."""
def start(self): def start(self):
"""Bind the UDP socket and handle incoming data.""" """Bind the UDP socket and handle incoming data."""
@ -82,20 +95,21 @@ class UDPCollectorService(os_service.Service):
# enough for anybody. # enough for anybody.
data, source = udp.recvfrom(64 * 1024) data, source = udp.recvfrom(64 * 1024)
try: try:
counter = msgpack.loads(data) sample = msgpack.loads(data)
except Exception: except Exception:
LOG.warn(_("UDP: Cannot decode data sent by %s"), str(source)) LOG.warn(_("UDP: Cannot decode data sent by %s"), str(source))
else: else:
try: try:
counter['counter_name'] = counter['name'] sample['counter_name'] = sample['name']
counter['counter_volume'] = counter['volume'] sample['counter_volume'] = sample['volume']
counter['counter_unit'] = counter['unit'] sample['counter_unit'] = sample['unit']
counter['counter_type'] = counter['type'] sample['counter_type'] = sample['type']
LOG.debug("UDP: Storing %s", str(counter)) LOG.debug("UDP: Storing %s", str(sample))
self.storage_conn.record_metering_data(counter) self.dispatcher_manager.map(
except Exception as err: lambda ext, data: ext.obj.record_metering_data(data),
LOG.debug(_("UDP: Unable to store meter")) sample)
LOG.exception(err) except Exception:
LOG.exception(_("UDP: Unable to store meter"))
def stop(self): def stop(self):
self.running = False self.running = False
@ -116,10 +130,9 @@ class UnableToSaveEventException(Exception):
pass pass
class CollectorService(rpc_service.Service): class CollectorService(CollectorBase, rpc_service.Service):
COLLECTOR_NAMESPACE = 'ceilometer.collector' COLLECTOR_NAMESPACE = 'ceilometer.collector'
DISPATCHER_NAMESPACE = 'ceilometer.dispatcher'
def start(self): def start(self):
super(CollectorService, self).start() super(CollectorService, self).start()
@ -148,17 +161,6 @@ class CollectorService(rpc_service.Service):
self.COLLECTOR_NAMESPACE) self.COLLECTOR_NAMESPACE)
self.notification_manager.map(self._setup_subscription) self.notification_manager.map(self._setup_subscription)
LOG.debug('loading dispatchers from %s',
self.DISPATCHER_NAMESPACE)
self.dispatcher_manager = named.NamedExtensionManager(
namespace=self.DISPATCHER_NAMESPACE,
names=cfg.CONF.collector.dispatcher,
invoke_on_load=True,
invoke_args=[cfg.CONF])
if not list(self.dispatcher_manager):
LOG.warning('Failed to load any dispatchers for %s',
self.DISPATCHER_NAMESPACE)
# Set ourselves up as a separate worker for the metering data, # Set ourselves up as a separate worker for the metering data,
# since the default for service is to use create_consumer(). # since the default for service is to use create_consumer().
self.conn.create_worker( self.conn.create_worker(

View File

@ -32,7 +32,6 @@ from ceilometer.collector import service
from ceilometer.compute import notifications from ceilometer.compute import notifications
from ceilometer.openstack.common import timeutils from ceilometer.openstack.common import timeutils
from ceilometer import sample from ceilometer import sample
from ceilometer.storage import base
from ceilometer.storage import models from ceilometer.storage import models
from ceilometer.tests import base as tests_base from ceilometer.tests import base as tests_base
@ -128,23 +127,50 @@ class TestUDPCollectorService(TestCollector):
resource_metadata={}, resource_metadata={},
).as_dict() ).as_dict()
def test_service_has_storage_conn(self):
srv = service.UDPCollectorService()
self.assertIsNotNone(srv.storage_conn)
def test_udp_receive(self): def test_udp_receive(self):
self.srv.storage_conn = self.mox.CreateMock(base.Connection) mock_dispatcher = MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
mock_dispatcher
),
])
self.counter['source'] = 'mysource' self.counter['source'] = 'mysource'
self.counter['counter_name'] = self.counter['name'] self.counter['counter_name'] = self.counter['name']
self.counter['counter_volume'] = self.counter['volume'] self.counter['counter_volume'] = self.counter['volume']
self.counter['counter_type'] = self.counter['type'] self.counter['counter_type'] = self.counter['type']
self.counter['counter_unit'] = self.counter['unit'] self.counter['counter_unit'] = self.counter['unit']
self.srv.storage_conn.record_metering_data(self.counter)
self.mox.ReplayAll()
with patch('socket.socket', self._make_fake_socket): with patch('socket.socket', self._make_fake_socket):
self.srv.start() self.srv.start()
mock_dispatcher.record_metering_data.assert_called_once_with(
self.counter)
def test_udp_receive_storage_error(self):
mock_dispatcher = MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
mock_dispatcher
),
])
mock_dispatcher.record_metering_data.side_effect = self._raise_error
self.counter['source'] = 'mysource'
self.counter['counter_name'] = self.counter['name']
self.counter['counter_volume'] = self.counter['volume']
self.counter['counter_type'] = self.counter['type']
self.counter['counter_unit'] = self.counter['unit']
with patch('socket.socket', self._make_fake_socket):
self.srv.start()
mock_dispatcher.record_metering_data.assert_called_once_with(
self.counter)
@staticmethod @staticmethod
def _raise_error(): def _raise_error():
raise Exception raise Exception
@ -154,20 +180,6 @@ class TestUDPCollectorService(TestCollector):
with patch('msgpack.loads', self._raise_error): with patch('msgpack.loads', self._raise_error):
self.srv.start() self.srv.start()
def test_udp_receive_storage_error(self):
self.srv.storage_conn = self.mox.CreateMock(base.Connection)
self.counter['source'] = 'mysource'
self.counter['counter_name'] = self.counter['name']
self.counter['counter_volume'] = self.counter['volume']
self.counter['counter_type'] = self.counter['type']
self.counter['counter_unit'] = self.counter['unit']
self.srv.storage_conn.record_metering_data(
self.counter).AndRaise(IOError)
self.mox.ReplayAll()
with patch('socket.socket', self._make_fake_socket):
self.srv.start()
class MyException(Exception): class MyException(Exception):
pass pass