diff --git a/ceilometer/collector/service.py b/ceilometer/collector/service.py index ebf67fedf..2522a556d 100644 --- a/ceilometer/collector/service.py +++ b/ceilometer/collector/service.py @@ -39,24 +39,33 @@ OPTS = [ ] cfg.CONF.register_opts(OPTS, group="collector") +cfg.CONF.import_opt('rpc_backend', 'ceilometer.openstack.common.rpc') + LOG = log.getLogger(__name__) -class UDPCollectorService(service.DispatchedService, os_service.Service): - """UDP listener for the collector service.""" +class CollectorService(service.DispatchedService, rpc_service.Service): + """Listener for the collector service.""" def start(self): """Bind the UDP socket and handle incoming data.""" - super(UDPCollectorService, self).start() + if cfg.CONF.collector.udp_address: + self.tg.add_thread(self.start_udp) + if cfg.CONF.rpc_backend: + super(CollectorService, self).start() + if not cfg.CONF.collector.udp_address: + # Add a dummy thread to have wait() working + self.tg.add_timer(604800, lambda: None) + def start_udp(self): udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) udp.bind((cfg.CONF.collector.udp_address, cfg.CONF.collector.udp_port)) - self.running = True - while self.running: + self.udp_run = True + while self.udp_run: # NOTE(jd) Arbitrary limit of 64K because that ought to be # enough for anybody. data, source = udp.recvfrom(64 * 1024) @@ -78,21 +87,8 @@ class UDPCollectorService(service.DispatchedService, os_service.Service): LOG.exception(_("UDP: Unable to store meter")) def stop(self): - self.running = False - super(UDPCollectorService, self).stop() - - -def udp_collector(): - service.prepare_service() - os_service.launch(UDPCollectorService()).wait() - - -class CollectorService(service.DispatchedService, rpc_service.Service): - - def start(self): - super(CollectorService, self).start() - # Add a dummy thread to have wait() working - self.tg.add_timer(604800, lambda: None) + self.udp_run = False + super(CollectorService, self).stop() def initialize_service_hook(self, service): '''Consumers must be declared before consume_thread start.''' diff --git a/ceilometer/tests/collector/test_service.py b/ceilometer/tests/collector/test_service.py index 191e0f37c..9f5bc49fa 100644 --- a/ceilometer/tests/collector/test_service.py +++ b/ceilometer/tests/collector/test_service.py @@ -37,9 +37,19 @@ class TestCollector(tests_base.BaseTestCase): super(TestCollector, self).setUp() self.CONF = self.useFixture(config.Config()).conf self.CONF.set_override("connection", "log://", group='database') + self.srv = service.CollectorService('the-host', 'the-topic') + self.counter = sample.Sample( + name='foobar', + type='bad', + unit='F', + volume=1, + user_id='jd', + project_id='ceilometer', + resource_id='cat', + timestamp='NOW!', + resource_metadata={}, + ).as_dict() - -class TestUDPCollectorService(TestCollector): def _make_fake_socket(self): def recvfrom(size): # Make the loop stop @@ -57,21 +67,6 @@ class TestUDPCollectorService(TestCollector): udp_socket.bind.assert_called_once_with((conf.udp_address, conf.udp_port)) - def setUp(self): - super(TestUDPCollectorService, self).setUp() - self.srv = service.UDPCollectorService() - self.counter = sample.Sample( - name='foobar', - type='bad', - unit='F', - volume=1, - user_id='jd', - project_id='ceilometer', - resource_id='cat', - timestamp='NOW!', - resource_metadata={}, - ).as_dict() - def test_udp_receive(self): mock_dispatcher = mock.MagicMock() self.srv.dispatcher_manager = test_manager.TestExtensionManager( @@ -89,7 +84,7 @@ class TestUDPCollectorService(TestCollector): udp_socket = self._make_fake_socket() with patch('socket.socket', return_value=udp_socket): - self.srv.start() + self.srv.start_udp() self._verify_udp_socket(udp_socket) @@ -115,7 +110,7 @@ class TestUDPCollectorService(TestCollector): udp_socket = self._make_fake_socket() with patch('socket.socket', return_value=udp_socket): - self.srv.start() + self.srv.start_udp() self._verify_udp_socket(udp_socket) @@ -130,17 +125,10 @@ class TestUDPCollectorService(TestCollector): udp_socket = self._make_fake_socket() with patch('socket.socket', return_value=udp_socket): with patch('msgpack.loads', self._raise_error): - self.srv.start() + self.srv.start_udp() self._verify_udp_socket(udp_socket) - -class TestCollectorService(TestCollector): - - def setUp(self): - super(TestCollectorService, self).setUp() - self.srv = service.CollectorService('the-host', 'the-topic') - @patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def test_init_host(self): # If we try to create a real RPC connection, init_host() never @@ -148,3 +136,16 @@ class TestCollectorService(TestCollector): # configuration. with patch('ceilometer.openstack.common.rpc.create_connection'): self.srv.start() + + def test_only_udp(self): + """Check that only UDP is started if rpc_backend is empty.""" + self.CONF.set_override('rpc_backend', '') + udp_socket = self._make_fake_socket() + with patch('socket.socket', return_value=udp_socket): + self.srv.start() + + def test_only_rpc(self): + """Check that only RPC is started if udp_address is empty.""" + self.CONF.set_override('udp_address', '', group='collector') + with patch('ceilometer.openstack.common.rpc.create_connection'): + self.srv.start() diff --git a/setup.cfg b/setup.cfg index 9d4d18a8a..8e0dc0e4e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -123,7 +123,6 @@ console_scripts = ceilometer-dbsync = ceilometer.storage:dbsync ceilometer-expirer = ceilometer.storage:expirer ceilometer-collector = ceilometer.collector.service:collector - ceilometer-collector-udp = ceilometer.collector.service:udp_collector ceilometer-alarm-evaluator = ceilometer.alarm.service:alarm_evaluator ceilometer-alarm-notifier = ceilometer.alarm.service:alarm_notifier