Merge "Merge ceilometer-collector and ceilometer-collector-udp"

This commit is contained in:
Jenkins 2013-11-27 12:57:07 +00:00 committed by Gerrit Code Review
commit 751fd75aa9
3 changed files with 44 additions and 48 deletions

View File

@ -39,24 +39,33 @@ OPTS = [
]
cfg.CONF.register_opts(OPTS, group="collector")
cfg.CONF.import_opt('rpc_backend', 'ceilometer.openstack.common.rpc')
LOG = log.getLogger(__name__)
class UDPCollectorService(service.DispatchedService, os_service.Service):
"""UDP listener for the collector service."""
class CollectorService(service.DispatchedService, rpc_service.Service):
"""Listener for the collector service."""
def start(self):
"""Bind the UDP socket and handle incoming data."""
super(UDPCollectorService, self).start()
if cfg.CONF.collector.udp_address:
self.tg.add_thread(self.start_udp)
if cfg.CONF.rpc_backend:
super(CollectorService, self).start()
if not cfg.CONF.collector.udp_address:
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
def start_udp(self):
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
udp.bind((cfg.CONF.collector.udp_address,
cfg.CONF.collector.udp_port))
self.running = True
while self.running:
self.udp_run = True
while self.udp_run:
# NOTE(jd) Arbitrary limit of 64K because that ought to be
# enough for anybody.
data, source = udp.recvfrom(64 * 1024)
@ -78,21 +87,8 @@ class UDPCollectorService(service.DispatchedService, os_service.Service):
LOG.exception(_("UDP: Unable to store meter"))
def stop(self):
self.running = False
super(UDPCollectorService, self).stop()
def udp_collector():
service.prepare_service()
os_service.launch(UDPCollectorService()).wait()
class CollectorService(service.DispatchedService, rpc_service.Service):
def start(self):
super(CollectorService, self).start()
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
self.udp_run = False
super(CollectorService, self).stop()
def initialize_service_hook(self, service):
'''Consumers must be declared before consume_thread start.'''

View File

@ -37,9 +37,19 @@ class TestCollector(tests_base.BaseTestCase):
super(TestCollector, self).setUp()
self.CONF = self.useFixture(config.Config()).conf
self.CONF.set_override("connection", "log://", group='database')
self.srv = service.CollectorService('the-host', 'the-topic')
self.counter = sample.Sample(
name='foobar',
type='bad',
unit='F',
volume=1,
user_id='jd',
project_id='ceilometer',
resource_id='cat',
timestamp='NOW!',
resource_metadata={},
).as_dict()
class TestUDPCollectorService(TestCollector):
def _make_fake_socket(self):
def recvfrom(size):
# Make the loop stop
@ -57,21 +67,6 @@ class TestUDPCollectorService(TestCollector):
udp_socket.bind.assert_called_once_with((conf.udp_address,
conf.udp_port))
def setUp(self):
super(TestUDPCollectorService, self).setUp()
self.srv = service.UDPCollectorService()
self.counter = sample.Sample(
name='foobar',
type='bad',
unit='F',
volume=1,
user_id='jd',
project_id='ceilometer',
resource_id='cat',
timestamp='NOW!',
resource_metadata={},
).as_dict()
def test_udp_receive(self):
mock_dispatcher = mock.MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
@ -89,7 +84,7 @@ class TestUDPCollectorService(TestCollector):
udp_socket = self._make_fake_socket()
with patch('socket.socket', return_value=udp_socket):
self.srv.start()
self.srv.start_udp()
self._verify_udp_socket(udp_socket)
@ -115,7 +110,7 @@ class TestUDPCollectorService(TestCollector):
udp_socket = self._make_fake_socket()
with patch('socket.socket', return_value=udp_socket):
self.srv.start()
self.srv.start_udp()
self._verify_udp_socket(udp_socket)
@ -130,17 +125,10 @@ class TestUDPCollectorService(TestCollector):
udp_socket = self._make_fake_socket()
with patch('socket.socket', return_value=udp_socket):
with patch('msgpack.loads', self._raise_error):
self.srv.start()
self.srv.start_udp()
self._verify_udp_socket(udp_socket)
class TestCollectorService(TestCollector):
def setUp(self):
super(TestCollectorService, self).setUp()
self.srv = service.CollectorService('the-host', 'the-topic')
@patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_init_host(self):
# If we try to create a real RPC connection, init_host() never
@ -148,3 +136,16 @@ class TestCollectorService(TestCollector):
# configuration.
with patch('ceilometer.openstack.common.rpc.create_connection'):
self.srv.start()
def test_only_udp(self):
"""Check that only UDP is started if rpc_backend is empty."""
self.CONF.set_override('rpc_backend', '')
udp_socket = self._make_fake_socket()
with patch('socket.socket', return_value=udp_socket):
self.srv.start()
def test_only_rpc(self):
"""Check that only RPC is started if udp_address is empty."""
self.CONF.set_override('udp_address', '', group='collector')
with patch('ceilometer.openstack.common.rpc.create_connection'):
self.srv.start()

View File

@ -123,7 +123,6 @@ console_scripts =
ceilometer-dbsync = ceilometer.storage:dbsync
ceilometer-expirer = ceilometer.storage:expirer
ceilometer-collector = ceilometer.collector.service:collector
ceilometer-collector-udp = ceilometer.collector.service:udp_collector
ceilometer-alarm-evaluator = ceilometer.alarm.service:alarm_evaluator
ceilometer-alarm-notifier = ceilometer.alarm.service:alarm_notifier