Merge "collector: Allows to requeue a sample"
This commit is contained in:
commit
9ce927716a
@ -25,6 +25,7 @@ from oslo.utils import units
|
||||
from ceilometer import dispatcher
|
||||
from ceilometer import messaging
|
||||
from ceilometer.openstack.common.gettextutils import _
|
||||
from ceilometer.openstack.common.gettextutils import _LE
|
||||
from ceilometer.openstack.common import log
|
||||
from ceilometer.openstack.common import service as os_service
|
||||
|
||||
@ -36,6 +37,11 @@ OPTS = [
|
||||
cfg.IntOpt('udp_port',
|
||||
default=4952,
|
||||
help='Port to which the UDP socket is bound.'),
|
||||
cfg.BoolOpt('requeue_sample_on_dispatcher_error',
|
||||
default=False,
|
||||
help='Requeue the sample on the collector sample queue '
|
||||
'when the collector fails to dispatch it. This is only valid '
|
||||
'if the sample come from the notifier publisher'),
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(OPTS, group="collector")
|
||||
@ -61,6 +67,7 @@ class CollectorService(os_service.Service):
|
||||
if cfg.CONF.collector.udp_address:
|
||||
self.tg.add_thread(self.start_udp)
|
||||
|
||||
allow_requeue = cfg.CONF.collector.requeue_sample_on_dispatcher_error
|
||||
transport = messaging.get_transport(optional=True)
|
||||
if transport:
|
||||
self.rpc_server = messaging.get_rpc_server(
|
||||
@ -69,7 +76,8 @@ class CollectorService(os_service.Service):
|
||||
target = oslo.messaging.Target(
|
||||
topic=cfg.CONF.publisher_notifier.metering_topic)
|
||||
self.notification_server = messaging.get_notification_listener(
|
||||
transport, [target], [self])
|
||||
transport, [target], [self],
|
||||
allow_requeue=allow_requeue)
|
||||
|
||||
self.rpc_server.start()
|
||||
self.notification_server.start()
|
||||
@ -116,8 +124,15 @@ class CollectorService(os_service.Service):
|
||||
bus, this method receives it.
|
||||
|
||||
"""
|
||||
self.dispatcher_manager.map_method('record_metering_data',
|
||||
data=payload)
|
||||
try:
|
||||
self.dispatcher_manager.map_method('record_metering_data',
|
||||
data=payload)
|
||||
except Exception:
|
||||
if cfg.CONF.collector.requeue_sample_on_dispatcher_error:
|
||||
LOG.exception(_LE("Dispatcher failed to handle the sample, "
|
||||
"requeue it."))
|
||||
return oslo.messaging.NotificationResult.REQUEUE
|
||||
raise
|
||||
|
||||
def record_metering_data(self, context, data):
|
||||
"""RPC endpoint for messages we send to ourselves.
|
||||
|
@ -112,10 +112,12 @@ def get_rpc_client(transport, **kwargs):
|
||||
serializer=serializer)
|
||||
|
||||
|
||||
def get_notification_listener(transport, targets, endpoints):
|
||||
def get_notification_listener(transport, targets, endpoints,
|
||||
allow_requeue=False):
|
||||
"""Return a configured oslo.messaging notification listener."""
|
||||
return oslo.messaging.get_notification_listener(
|
||||
transport, targets, endpoints, executor='eventlet')
|
||||
transport, targets, endpoints, executor='eventlet',
|
||||
allow_requeue=allow_requeue)
|
||||
|
||||
|
||||
def get_notifier(transport, publisher_id):
|
||||
|
@ -34,6 +34,10 @@ from ceilometer import sample
|
||||
from ceilometer.tests import base as tests_base
|
||||
|
||||
|
||||
class FakeException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class FakeConnection():
|
||||
def create_worker(self, topic, proxy, pool_name):
|
||||
pass
|
||||
@ -227,3 +231,26 @@ class TestCollector(tests_base.BaseTestCase):
|
||||
self.srv.rpc_server.wait()
|
||||
mylog.info.assert_called_once_with(
|
||||
'metering data test for test_run_tasks: 1')
|
||||
|
||||
@mock.patch.object(oslo.messaging.MessageHandlingServer, 'start')
|
||||
@mock.patch.object(collector.CollectorService, 'start_udp')
|
||||
def test_collector_requeue(self, udp_start, rpc_start):
|
||||
self.CONF.set_override('requeue_sample_on_dispatcher_error', True,
|
||||
group='collector')
|
||||
self.srv.start()
|
||||
with mock.patch.object(self.srv.dispatcher_manager, 'map_method',
|
||||
side_effect=Exception('boom')):
|
||||
ret = self.srv.sample({}, 'pub_id', 'event', {}, {})
|
||||
self.assertEqual(oslo.messaging.NotificationResult.REQUEUE,
|
||||
ret)
|
||||
|
||||
@mock.patch.object(oslo.messaging.MessageHandlingServer, 'start')
|
||||
@mock.patch.object(collector.CollectorService, 'start_udp')
|
||||
def test_collector_no_requeue(self, udp_start, rpc_start):
|
||||
self.CONF.set_override('requeue_sample_on_dispatcher_error', False,
|
||||
group='collector')
|
||||
self.srv.start()
|
||||
with mock.patch.object(self.srv.dispatcher_manager, 'map_method',
|
||||
side_effect=FakeException('boom')):
|
||||
self.assertRaises(FakeException, self.srv.sample, {}, 'pub_id',
|
||||
'event', {}, {})
|
||||
|
@ -208,6 +208,20 @@ evaluation_service singleton Driver to use for alarm evaluation servi
|
||||
====================== ============== ====================================================================================
|
||||
|
||||
|
||||
Collector
|
||||
=========
|
||||
|
||||
The following options in the [collector] configuration section affect the collector service
|
||||
|
||||
===================================== ====================================== ==============================================================
|
||||
Parameter Default Note
|
||||
===================================== ====================================== ==============================================================
|
||||
requeue_sample_on_dispatcher_error False Requeue the sample on the collector sample queue when the
|
||||
collector fails to dispatch it. This option is only valid if
|
||||
the sample comes from the notifier publisher
|
||||
===================================== ====================================== ==============================================================
|
||||
|
||||
|
||||
|
||||
General options
|
||||
===============
|
||||
|
Loading…
x
Reference in New Issue
Block a user