From 9df30aad188f36309ff817810d509f37e951426e Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Mon, 26 May 2014 15:37:36 +0200 Subject: [PATCH] Don't rely on oslomsg configuration options We shouldn't use internal oslo.messaging configuration options. This patch removes them when an oslo.messaging public API equivalent is possible. Closes-bug: #1323324 Change-Id: I0d901bfaf7dd67f0adb0ad9c1b4371994573a6c8 --- ceilometer/collector.py | 12 ++-- ceilometer/messaging.py | 23 ++++--- ceilometer/tests/test_collector.py | 99 +++++++++++++++--------------- 3 files changed, 70 insertions(+), 64 deletions(-) diff --git a/ceilometer/collector.py b/ceilometer/collector.py index be56fdf6c..3a15eeadb 100644 --- a/ceilometer/collector.py +++ b/ceilometer/collector.py @@ -48,21 +48,17 @@ LOG = log.getLogger(__name__) class CollectorService(os_service.Service): """Listener for the collector service.""" - - @staticmethod - def rpc_enabled(): - # cfg.CONF opt from oslo.messaging.transport - return cfg.CONF.rpc_backend or cfg.CONF.transport_url - def start(self): """Bind the UDP socket and handle incoming data.""" # ensure dispatcher is configured before starting other services self.dispatcher_manager = dispatcher.load_dispatcher_manager() + self.rpc_server = None super(CollectorService, self).start() + if cfg.CONF.collector.udp_address: self.tg.add_thread(self.start_udp) - if self.rpc_enabled(): + if messaging.TRANSPORT is not None: self.rpc_server = messaging.get_rpc_server( cfg.CONF.publisher_rpc.metering_topic, self) self.rpc_server.start() @@ -96,7 +92,7 @@ class CollectorService(os_service.Service): def stop(self): self.udp_run = False - if self.rpc_enabled(): + if self.rpc_server: self.rpc_server.stop() super(CollectorService, self).stop() diff --git a/ceilometer/messaging.py b/ceilometer/messaging.py index 24f94a7e2..7d92e03d3 100644 --- a/ceilometer/messaging.py +++ b/ceilometer/messaging.py @@ -62,7 +62,7 @@ class JsonPayloadSerializer(oslo.messaging.NoOpSerializer): return jsonutils.to_primitive(entity, convert_instances=True) -def setup(url=None): +def setup(url=None, optional=False): """Initialise the oslo.messaging layer.""" global TRANSPORT, NOTIFIER @@ -73,9 +73,17 @@ def setup(url=None): if not TRANSPORT: oslo.messaging.set_transport_defaults('ceilometer') - TRANSPORT = oslo.messaging.get_transport(cfg.CONF, url, - aliases=_ALIASES) - if not NOTIFIER: + try: + TRANSPORT = oslo.messaging.get_transport(cfg.CONF, url, + aliases=_ALIASES) + except oslo.messaging.InvalidTransportURL as e: + TRANSPORT = None + if not optional or e.url: + # NOTE(sileht): oslo.messaging is configured but unloadable + # so reraise the exception + raise + + if not NOTIFIER and TRANSPORT: serializer = RequestContextSerializer(JsonPayloadSerializer()) NOTIFIER = oslo.messaging.Notifier(TRANSPORT, serializer=serializer) @@ -83,10 +91,9 @@ def setup(url=None): def cleanup(): """Cleanup the oslo.messaging layer.""" global TRANSPORT, NOTIFIER - assert TRANSPORT is not None - assert NOTIFIER is not None - TRANSPORT.cleanup() - TRANSPORT = NOTIFIER = None + if TRANSPORT: + TRANSPORT.cleanup() + TRANSPORT = NOTIFIER = None def get_rpc_server(topic, endpoint): diff --git a/ceilometer/tests/test_collector.py b/ceilometer/tests/test_collector.py index 436c9b008..773d40335 100644 --- a/ceilometer/tests/test_collector.py +++ b/ceilometer/tests/test_collector.py @@ -24,9 +24,11 @@ import oslo.messaging from stevedore import extension from ceilometer import collector +from ceilometer import dispatcher from ceilometer import messaging from ceilometer.openstack.common import context from ceilometer.openstack.common.fixture import config +from ceilometer.openstack.common.fixture import mockpatch from ceilometer.openstack.common import timeutils from ceilometer.publisher import utils from ceilometer import sample @@ -41,13 +43,14 @@ class FakeConnection(): class TestCollector(tests_base.BaseTestCase): def setUp(self): super(TestCollector, self).setUp() - messaging.setup('fake://') - self.addCleanup(messaging.cleanup) - self.CONF = self.useFixture(config.Config()).conf self.CONF.set_override("connection", "log://", group='database') self.CONF.set_override('metering_secret', 'not-so-secret', group='publisher') + self.useFixture(oslo.messaging.conffixture.ConfFixture(self.CONF)) + self._setup_messaging('fake://') + self.addCleanup(messaging.cleanup) + self.counter = sample.Sample( name='foobar', type='bad', @@ -77,15 +80,28 @@ class TestCollector(tests_base.BaseTestCase): self.srv = collector.CollectorService() - def _make_test_manager(self, plugin): - return extension.ExtensionManager.make_test_instance([ - extension.Extension( - 'test', - None, - None, - plugin, - ), + self.useFixture(mockpatch.PatchObject( + self.srv.tg, 'add_thread', + side_effect=self._dummy_thread_group_add_thread)) + + @staticmethod + def _dummy_thread_group_add_thread(method): + method() + + def _setup_messaging(self, url): + messaging.cleanup() + self.CONF.set_override('rpc_backend', '') + messaging.setup(url, optional=True) + + def _setup_fake_dispatcher(self): + plugin = mock.MagicMock() + fake_dispatcher = extension.ExtensionManager.make_test_instance([ + extension.Extension('test', None, None, plugin,), ]) + self.useFixture(mockpatch.Patch( + 'ceilometer.dispatcher.load_dispatcher_manager', + return_value=fake_dispatcher)) + return plugin def _make_fake_socket(self, sample): def recvfrom(size): @@ -105,16 +121,15 @@ class TestCollector(tests_base.BaseTestCase): conf.udp_port)) def test_record_metering_data(self): - mock_dispatcher = mock.MagicMock() - self.srv.dispatcher_manager = self._make_test_manager(mock_dispatcher) + mock_dispatcher = self._setup_fake_dispatcher() + self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager() self.srv.record_metering_data(None, self.counter) mock_dispatcher.record_metering_data.assert_called_once_with( data=self.counter) - def test_udp_receive(self): - self.CONF.set_override('rpc_backend', '') - mock_dispatcher = mock.MagicMock() - self.srv.dispatcher_manager = self._make_test_manager(mock_dispatcher) + def test_udp_receive_base(self): + self._setup_messaging('') + mock_dispatcher = self._setup_fake_dispatcher() self.counter['source'] = 'mysource' self.counter['counter_name'] = self.counter['name'] self.counter['counter_volume'] = self.counter['volume'] @@ -122,8 +137,9 @@ class TestCollector(tests_base.BaseTestCase): self.counter['counter_unit'] = self.counter['unit'] udp_socket = self._make_fake_socket(self.counter) + with mock.patch('socket.socket', return_value=udp_socket): - self.srv.start_udp() + self.srv.start() self._verify_udp_socket(udp_socket) @@ -131,9 +147,8 @@ class TestCollector(tests_base.BaseTestCase): self.counter) def test_udp_receive_storage_error(self): - self.CONF.set_override('rpc_backend', '') - mock_dispatcher = mock.MagicMock() - self.srv.dispatcher_manager = self._make_test_manager(mock_dispatcher) + self._setup_messaging('') + mock_dispatcher = self._setup_fake_dispatcher() mock_dispatcher.record_metering_data.side_effect = self._raise_error self.counter['source'] = 'mysource' @@ -144,7 +159,7 @@ class TestCollector(tests_base.BaseTestCase): udp_socket = self._make_fake_socket(self.counter) with mock.patch('socket.socket', return_value=udp_socket): - self.srv.start_udp() + self.srv.start() self._verify_udp_socket(udp_socket) @@ -156,29 +171,22 @@ class TestCollector(tests_base.BaseTestCase): raise Exception def test_udp_receive_bad_decoding(self): - self.CONF.set_override('rpc_backend', '') + self._setup_messaging('') udp_socket = self._make_fake_socket(self.counter) - with mock.patch('socket.socket', return_value=udp_socket): - with mock.patch('msgpack.loads', self._raise_error): - self.srv.start_udp() + with contextlib.nested( + mock.patch('socket.socket', return_value=udp_socket), + mock.patch('msgpack.loads', self._raise_error)): + self.srv.start() self._verify_udp_socket(udp_socket) - @staticmethod - def _dummy_thread_group_add_thread(method): - method() - @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start') @mock.patch.object(collector.CollectorService, 'start_udp') def test_only_udp(self, udp_start, rpc_start): - """Check that only UDP is started if rpc_backend is empty.""" - self.CONF.set_override('rpc_backend', '') + """Check that only UDP is started if messaging transport is unset.""" + self._setup_messaging('') udp_socket = self._make_fake_socket(self.counter) - with contextlib.nested( - mock.patch.object( - self.srv.tg, 'add_thread', - side_effect=self._dummy_thread_group_add_thread), - mock.patch('socket.socket', return_value=udp_socket)): + with mock.patch('socket.socket', return_value=udp_socket): self.srv.start() self.assertEqual(0, rpc_start.call_count) self.assertEqual(1, udp_start.call_count) @@ -188,22 +196,17 @@ class TestCollector(tests_base.BaseTestCase): def test_only_rpc(self, udp_start, rpc_start): """Check that only RPC is started if udp_address is empty.""" self.CONF.set_override('udp_address', '', group='collector') - with mock.patch.object( - self.srv.tg, 'add_thread', - side_effect=self._dummy_thread_group_add_thread): - self.srv.start() - self.assertEqual(1, rpc_start.call_count) - self.assertEqual(0, udp_start.call_count) + self.srv.start() + self.assertEqual(1, rpc_start.call_count) + self.assertEqual(0, udp_start.call_count) def test_udp_receive_valid_encoding(self): + self._setup_messaging('') + mock_dispatcher = self._setup_fake_dispatcher() self.data_sent = [] with mock.patch('socket.socket', return_value=self._make_fake_socket(self.utf8_msg)): - self.srv.rpc_server = mock.MagicMock() - mock_dispatcher = mock.MagicMock() - self.srv.dispatcher_manager = \ - self._make_test_manager(mock_dispatcher) - self.srv.start_udp() + self.srv.start() self.assertTrue(utils.verify_signature( mock_dispatcher.method_calls[0][1][0], "not-so-secret"))