Merge "Don't rely on oslomsg configuration options"

This commit is contained in:
Jenkins 2014-06-07 14:21:05 +00:00 committed by Gerrit Code Review
commit 4352a93f4f
3 changed files with 70 additions and 64 deletions

View File

@ -48,21 +48,17 @@ LOG = log.getLogger(__name__)
class CollectorService(os_service.Service): class CollectorService(os_service.Service):
"""Listener for the collector service.""" """Listener for the collector service."""
@staticmethod
def rpc_enabled():
# cfg.CONF opt from oslo.messaging.transport
return cfg.CONF.rpc_backend or cfg.CONF.transport_url
def start(self): def start(self):
"""Bind the UDP socket and handle incoming data.""" """Bind the UDP socket and handle incoming data."""
# ensure dispatcher is configured before starting other services # ensure dispatcher is configured before starting other services
self.dispatcher_manager = dispatcher.load_dispatcher_manager() self.dispatcher_manager = dispatcher.load_dispatcher_manager()
self.rpc_server = None
super(CollectorService, self).start() super(CollectorService, self).start()
if cfg.CONF.collector.udp_address: if cfg.CONF.collector.udp_address:
self.tg.add_thread(self.start_udp) self.tg.add_thread(self.start_udp)
if self.rpc_enabled(): if messaging.TRANSPORT is not None:
self.rpc_server = messaging.get_rpc_server( self.rpc_server = messaging.get_rpc_server(
cfg.CONF.publisher_rpc.metering_topic, self) cfg.CONF.publisher_rpc.metering_topic, self)
self.rpc_server.start() self.rpc_server.start()
@ -96,7 +92,7 @@ class CollectorService(os_service.Service):
def stop(self): def stop(self):
self.udp_run = False self.udp_run = False
if self.rpc_enabled(): if self.rpc_server:
self.rpc_server.stop() self.rpc_server.stop()
super(CollectorService, self).stop() super(CollectorService, self).stop()

View File

@ -62,7 +62,7 @@ class JsonPayloadSerializer(oslo.messaging.NoOpSerializer):
return jsonutils.to_primitive(entity, convert_instances=True) return jsonutils.to_primitive(entity, convert_instances=True)
def setup(url=None): def setup(url=None, optional=False):
"""Initialise the oslo.messaging layer.""" """Initialise the oslo.messaging layer."""
global TRANSPORT, NOTIFIER global TRANSPORT, NOTIFIER
@ -73,9 +73,17 @@ def setup(url=None):
if not TRANSPORT: if not TRANSPORT:
oslo.messaging.set_transport_defaults('ceilometer') oslo.messaging.set_transport_defaults('ceilometer')
TRANSPORT = oslo.messaging.get_transport(cfg.CONF, url, try:
aliases=_ALIASES) TRANSPORT = oslo.messaging.get_transport(cfg.CONF, url,
if not NOTIFIER: aliases=_ALIASES)
except oslo.messaging.InvalidTransportURL as e:
TRANSPORT = None
if not optional or e.url:
# NOTE(sileht): oslo.messaging is configured but unloadable
# so reraise the exception
raise
if not NOTIFIER and TRANSPORT:
serializer = RequestContextSerializer(JsonPayloadSerializer()) serializer = RequestContextSerializer(JsonPayloadSerializer())
NOTIFIER = oslo.messaging.Notifier(TRANSPORT, serializer=serializer) NOTIFIER = oslo.messaging.Notifier(TRANSPORT, serializer=serializer)
@ -83,10 +91,9 @@ def setup(url=None):
def cleanup(): def cleanup():
"""Cleanup the oslo.messaging layer.""" """Cleanup the oslo.messaging layer."""
global TRANSPORT, NOTIFIER global TRANSPORT, NOTIFIER
assert TRANSPORT is not None if TRANSPORT:
assert NOTIFIER is not None TRANSPORT.cleanup()
TRANSPORT.cleanup() TRANSPORT = NOTIFIER = None
TRANSPORT = NOTIFIER = None
def get_rpc_server(topic, endpoint): def get_rpc_server(topic, endpoint):

View File

@ -24,9 +24,11 @@ import oslo.messaging
from stevedore import extension from stevedore import extension
from ceilometer import collector from ceilometer import collector
from ceilometer import dispatcher
from ceilometer import messaging from ceilometer import messaging
from ceilometer.openstack.common import context from ceilometer.openstack.common import context
from ceilometer.openstack.common.fixture import config from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common.fixture import mockpatch
from ceilometer.openstack.common import timeutils from ceilometer.openstack.common import timeutils
from ceilometer.publisher import utils from ceilometer.publisher import utils
from ceilometer import sample from ceilometer import sample
@ -41,13 +43,14 @@ class FakeConnection():
class TestCollector(tests_base.BaseTestCase): class TestCollector(tests_base.BaseTestCase):
def setUp(self): def setUp(self):
super(TestCollector, self).setUp() super(TestCollector, self).setUp()
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.CONF = self.useFixture(config.Config()).conf self.CONF = self.useFixture(config.Config()).conf
self.CONF.set_override("connection", "log://", group='database') self.CONF.set_override("connection", "log://", group='database')
self.CONF.set_override('metering_secret', 'not-so-secret', self.CONF.set_override('metering_secret', 'not-so-secret',
group='publisher') group='publisher')
self.useFixture(oslo.messaging.conffixture.ConfFixture(self.CONF))
self._setup_messaging('fake://')
self.addCleanup(messaging.cleanup)
self.counter = sample.Sample( self.counter = sample.Sample(
name='foobar', name='foobar',
type='bad', type='bad',
@ -77,15 +80,28 @@ class TestCollector(tests_base.BaseTestCase):
self.srv = collector.CollectorService() self.srv = collector.CollectorService()
def _make_test_manager(self, plugin): self.useFixture(mockpatch.PatchObject(
return extension.ExtensionManager.make_test_instance([ self.srv.tg, 'add_thread',
extension.Extension( side_effect=self._dummy_thread_group_add_thread))
'test',
None, @staticmethod
None, def _dummy_thread_group_add_thread(method):
plugin, method()
),
def _setup_messaging(self, url):
messaging.cleanup()
self.CONF.set_override('rpc_backend', '')
messaging.setup(url, optional=True)
def _setup_fake_dispatcher(self):
plugin = mock.MagicMock()
fake_dispatcher = extension.ExtensionManager.make_test_instance([
extension.Extension('test', None, None, plugin,),
]) ])
self.useFixture(mockpatch.Patch(
'ceilometer.dispatcher.load_dispatcher_manager',
return_value=fake_dispatcher))
return plugin
def _make_fake_socket(self, sample): def _make_fake_socket(self, sample):
def recvfrom(size): def recvfrom(size):
@ -105,16 +121,15 @@ class TestCollector(tests_base.BaseTestCase):
conf.udp_port)) conf.udp_port))
def test_record_metering_data(self): def test_record_metering_data(self):
mock_dispatcher = mock.MagicMock() mock_dispatcher = self._setup_fake_dispatcher()
self.srv.dispatcher_manager = self._make_test_manager(mock_dispatcher) self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager()
self.srv.record_metering_data(None, self.counter) self.srv.record_metering_data(None, self.counter)
mock_dispatcher.record_metering_data.assert_called_once_with( mock_dispatcher.record_metering_data.assert_called_once_with(
data=self.counter) data=self.counter)
def test_udp_receive(self): def test_udp_receive_base(self):
self.CONF.set_override('rpc_backend', '') self._setup_messaging('')
mock_dispatcher = mock.MagicMock() mock_dispatcher = self._setup_fake_dispatcher()
self.srv.dispatcher_manager = self._make_test_manager(mock_dispatcher)
self.counter['source'] = 'mysource' self.counter['source'] = 'mysource'
self.counter['counter_name'] = self.counter['name'] self.counter['counter_name'] = self.counter['name']
self.counter['counter_volume'] = self.counter['volume'] self.counter['counter_volume'] = self.counter['volume']
@ -122,8 +137,9 @@ class TestCollector(tests_base.BaseTestCase):
self.counter['counter_unit'] = self.counter['unit'] self.counter['counter_unit'] = self.counter['unit']
udp_socket = self._make_fake_socket(self.counter) udp_socket = self._make_fake_socket(self.counter)
with mock.patch('socket.socket', return_value=udp_socket): with mock.patch('socket.socket', return_value=udp_socket):
self.srv.start_udp() self.srv.start()
self._verify_udp_socket(udp_socket) self._verify_udp_socket(udp_socket)
@ -131,9 +147,8 @@ class TestCollector(tests_base.BaseTestCase):
self.counter) self.counter)
def test_udp_receive_storage_error(self): def test_udp_receive_storage_error(self):
self.CONF.set_override('rpc_backend', '') self._setup_messaging('')
mock_dispatcher = mock.MagicMock() mock_dispatcher = self._setup_fake_dispatcher()
self.srv.dispatcher_manager = self._make_test_manager(mock_dispatcher)
mock_dispatcher.record_metering_data.side_effect = self._raise_error mock_dispatcher.record_metering_data.side_effect = self._raise_error
self.counter['source'] = 'mysource' self.counter['source'] = 'mysource'
@ -144,7 +159,7 @@ class TestCollector(tests_base.BaseTestCase):
udp_socket = self._make_fake_socket(self.counter) udp_socket = self._make_fake_socket(self.counter)
with mock.patch('socket.socket', return_value=udp_socket): with mock.patch('socket.socket', return_value=udp_socket):
self.srv.start_udp() self.srv.start()
self._verify_udp_socket(udp_socket) self._verify_udp_socket(udp_socket)
@ -156,29 +171,22 @@ class TestCollector(tests_base.BaseTestCase):
raise Exception raise Exception
def test_udp_receive_bad_decoding(self): def test_udp_receive_bad_decoding(self):
self.CONF.set_override('rpc_backend', '') self._setup_messaging('')
udp_socket = self._make_fake_socket(self.counter) udp_socket = self._make_fake_socket(self.counter)
with mock.patch('socket.socket', return_value=udp_socket): with contextlib.nested(
with mock.patch('msgpack.loads', self._raise_error): mock.patch('socket.socket', return_value=udp_socket),
self.srv.start_udp() mock.patch('msgpack.loads', self._raise_error)):
self.srv.start()
self._verify_udp_socket(udp_socket) self._verify_udp_socket(udp_socket)
@staticmethod
def _dummy_thread_group_add_thread(method):
method()
@mock.patch.object(oslo.messaging.MessageHandlingServer, 'start') @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start')
@mock.patch.object(collector.CollectorService, 'start_udp') @mock.patch.object(collector.CollectorService, 'start_udp')
def test_only_udp(self, udp_start, rpc_start): def test_only_udp(self, udp_start, rpc_start):
"""Check that only UDP is started if rpc_backend is empty.""" """Check that only UDP is started if messaging transport is unset."""
self.CONF.set_override('rpc_backend', '') self._setup_messaging('')
udp_socket = self._make_fake_socket(self.counter) udp_socket = self._make_fake_socket(self.counter)
with contextlib.nested( with mock.patch('socket.socket', return_value=udp_socket):
mock.patch.object(
self.srv.tg, 'add_thread',
side_effect=self._dummy_thread_group_add_thread),
mock.patch('socket.socket', return_value=udp_socket)):
self.srv.start() self.srv.start()
self.assertEqual(0, rpc_start.call_count) self.assertEqual(0, rpc_start.call_count)
self.assertEqual(1, udp_start.call_count) self.assertEqual(1, udp_start.call_count)
@ -188,22 +196,17 @@ class TestCollector(tests_base.BaseTestCase):
def test_only_rpc(self, udp_start, rpc_start): def test_only_rpc(self, udp_start, rpc_start):
"""Check that only RPC is started if udp_address is empty.""" """Check that only RPC is started if udp_address is empty."""
self.CONF.set_override('udp_address', '', group='collector') self.CONF.set_override('udp_address', '', group='collector')
with mock.patch.object( self.srv.start()
self.srv.tg, 'add_thread', self.assertEqual(1, rpc_start.call_count)
side_effect=self._dummy_thread_group_add_thread): self.assertEqual(0, udp_start.call_count)
self.srv.start()
self.assertEqual(1, rpc_start.call_count)
self.assertEqual(0, udp_start.call_count)
def test_udp_receive_valid_encoding(self): def test_udp_receive_valid_encoding(self):
self._setup_messaging('')
mock_dispatcher = self._setup_fake_dispatcher()
self.data_sent = [] self.data_sent = []
with mock.patch('socket.socket', with mock.patch('socket.socket',
return_value=self._make_fake_socket(self.utf8_msg)): return_value=self._make_fake_socket(self.utf8_msg)):
self.srv.rpc_server = mock.MagicMock() self.srv.start()
mock_dispatcher = mock.MagicMock()
self.srv.dispatcher_manager = \
self._make_test_manager(mock_dispatcher)
self.srv.start_udp()
self.assertTrue(utils.verify_signature( self.assertTrue(utils.verify_signature(
mock_dispatcher.method_calls[0][1][0], mock_dispatcher.method_calls[0][1][0],
"not-so-secret")) "not-so-secret"))