diff --git a/ceilometer/alarm/rpc.py b/ceilometer/alarm/rpc.py index 4e177d0ef..78271e56d 100644 --- a/ceilometer/alarm/rpc.py +++ b/ceilometer/alarm/rpc.py @@ -42,8 +42,9 @@ LOG = log.getLogger(__name__) class RPCAlarmNotifier(object): def __init__(self): + transport = messaging.get_transport() self.client = messaging.get_rpc_client( - topic=cfg.CONF.alarm.notifier_rpc_topic, + transport, topic=cfg.CONF.alarm.notifier_rpc_topic, version="1.0") def notify(self, alarm, previous, reason, reason_data): @@ -68,8 +69,9 @@ class RPCAlarmNotifier(object): class RPCAlarmPartitionCoordination(object): def __init__(self): + transport = messaging.get_transport() self.client = messaging.get_rpc_client( - topic=cfg.CONF.alarm.partition_rpc_topic, + transport, topic=cfg.CONF.alarm.partition_rpc_topic, version="1.0") def presence(self, uuid, priority): diff --git a/ceilometer/alarm/service.py b/ceilometer/alarm/service.py index bceef8034..11d4011d7 100644 --- a/ceilometer/alarm/service.py +++ b/ceilometer/alarm/service.py @@ -139,8 +139,9 @@ class PartitionedAlarmService(AlarmService, os_service.Service): def __init__(self): super(PartitionedAlarmService, self).__init__() + transport = messaging.get_transport() self.rpc_server = messaging.get_rpc_server( - cfg.CONF.alarm.partition_rpc_topic, self) + transport, cfg.CONF.alarm.partition_rpc_topic, self) self._load_evaluators() self.api_client = None @@ -193,8 +194,9 @@ class AlarmNotifierService(os_service.Service): def __init__(self): super(AlarmNotifierService, self).__init__() + transport = messaging.get_transport() self.rpc_server = messaging.get_rpc_server( - cfg.CONF.alarm.notifier_rpc_topic, self) + transport, cfg.CONF.alarm.notifier_rpc_topic, self) self.notifiers = extension.ExtensionManager(self.EXTENSIONS_NAMESPACE, invoke_on_load=True) diff --git a/ceilometer/api/controllers/v2.py b/ceilometer/api/controllers/v2.py index 4b566a7f0..962ff6e94 100644 --- a/ceilometer/api/controllers/v2.py +++ b/ceilometer/api/controllers/v2.py @@ -604,7 +604,8 @@ def _make_link(rel_name, url, type, type_arg, query=None): def _send_notification(event, payload): notification = event.replace(" ", "_") notification = "alarm.%s" % notification - notifier = messaging.get_notifier(publisher_id="ceilometer.api") + transport = messaging.get_transport() + notifier = messaging.get_notifier(transport, publisher_id="ceilometer.api") # FIXME(sileht): perhaps we need to copy some infos from the # pecan request headers like nova does notifier.info(context.RequestContext(), notification, payload) diff --git a/ceilometer/collector.py b/ceilometer/collector.py index f9c08ee75..18bae2451 100644 --- a/ceilometer/collector.py +++ b/ceilometer/collector.py @@ -57,9 +57,10 @@ class CollectorService(os_service.Service): if cfg.CONF.collector.udp_address: self.tg.add_thread(self.start_udp) - if messaging.TRANSPORT is not None: + transport = messaging.get_transport(optional=True) + if transport: self.rpc_server = messaging.get_rpc_server( - cfg.CONF.publisher_rpc.metering_topic, self) + transport, cfg.CONF.publisher_rpc.metering_topic, self) self.rpc_server.start() if not cfg.CONF.collector.udp_address: diff --git a/ceilometer/messaging.py b/ceilometer/messaging.py index 88934606c..fda4320cb 100644 --- a/ceilometer/messaging.py +++ b/ceilometer/messaging.py @@ -15,16 +15,14 @@ # License for the specific language governing permissions and limitations # under the License. -import eventlet from oslo.config import cfg import oslo.messaging from ceilometer.openstack.common import context from ceilometer.openstack.common import jsonutils - -TRANSPORT = None -NOTIFIER = None +DEFAULT_URL = "__default__" +TRANSPORTS = {} _ALIASES = { 'ceilometer.openstack.common.rpc.impl_kombu': 'rabbit', @@ -62,75 +60,69 @@ class JsonPayloadSerializer(oslo.messaging.NoOpSerializer): return jsonutils.to_primitive(entity, convert_instances=True) -def setup(url=None, optional=False): +def setup(): + oslo.messaging.set_transport_defaults('ceilometer') + + +def get_transport(url=None, optional=False, cache=True): """Initialise the oslo.messaging layer.""" - global TRANSPORT, NOTIFIER - - if url and url.startswith("fake://"): - # NOTE(sileht): oslo.messaging fake driver uses time.sleep - # for task switch, so we need to monkey_patch it - eventlet.monkey_patch(time=True) - - if not TRANSPORT: - oslo.messaging.set_transport_defaults('ceilometer') + global TRANSPORTS, DEFAULT_URL + transport = TRANSPORTS.get(url) + if not transport: try: - TRANSPORT = oslo.messaging.get_transport(cfg.CONF, url, + transport = oslo.messaging.get_transport(cfg.CONF, url, aliases=_ALIASES) except oslo.messaging.InvalidTransportURL as e: - TRANSPORT = None if not optional or e.url: # NOTE(sileht): oslo.messaging is configured but unloadable # so reraise the exception raise - - if not NOTIFIER and TRANSPORT: - serializer = RequestContextSerializer(JsonPayloadSerializer()) - NOTIFIER = oslo.messaging.Notifier(TRANSPORT, serializer=serializer) + return None + else: + if not url: + url = DEFAULT_URL + if cache: + TRANSPORTS[url] = transport + return transport def cleanup(): """Cleanup the oslo.messaging layer.""" - global TRANSPORT, NOTIFIER - if TRANSPORT: - TRANSPORT.cleanup() - TRANSPORT = NOTIFIER = None + global TRANSPORTS, NOTIFIERS + NOTIFIERS = {} + for url in TRANSPORTS: + TRANSPORTS[url].cleanup() + del TRANSPORTS[url] -def get_rpc_server(topic, endpoint): +def get_rpc_server(transport, topic, endpoint): """Return a configured oslo.messaging rpc server.""" - global TRANSPORT target = oslo.messaging.Target(server=cfg.CONF.host, topic=topic) serializer = RequestContextSerializer(JsonPayloadSerializer()) - return oslo.messaging.get_rpc_server(TRANSPORT, target, [endpoint], - executor='eventlet', + return oslo.messaging.get_rpc_server(transport, target, + [endpoint], executor='eventlet', serializer=serializer) -def get_rpc_client(**kwargs): +def get_rpc_client(transport, **kwargs): """Return a configured oslo.messaging RPCClient.""" - global TRANSPORT target = oslo.messaging.Target(**kwargs) serializer = RequestContextSerializer(JsonPayloadSerializer()) - return oslo.messaging.RPCClient(TRANSPORT, target, + return oslo.messaging.RPCClient(transport, target, serializer=serializer) -def get_notification_listener(targets, endpoints, url=None): +def get_notification_listener(transport, targets, endpoints): """Return a configured oslo.messaging notification listener.""" - global TRANSPORT - if url: - transport = oslo.messaging.get_transport(cfg.CONF, url, - _ALIASES) - else: - transport = TRANSPORT return oslo.messaging.get_notification_listener( transport, targets, endpoints, executor='eventlet') -def get_notifier(publisher_id): +def get_notifier(transport, publisher_id): """Return a configured oslo.messaging notifier.""" - global NOTIFIER - return NOTIFIER.prepare(publisher_id=publisher_id) + serializer = RequestContextSerializer(JsonPayloadSerializer()) + notifier = oslo.messaging.Notifier(transport, serializer=serializer) + return notifier.prepare(publisher_id=publisher_id) def convert_to_old_notification_format(priority, ctxt, publisher_id, diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 8ddc49db7..56ea03af8 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -63,6 +63,15 @@ class NotificationService(os_service.Service): def start(self): super(NotificationService, self).start() + # FIXME(sileht): endpoint use notification_topics option + # and it should not because this is oslo.messaging option + # not a ceilometer, until we have a something to get + # the notification_topics in an other way + # we must create a transport to ensure the option have + # beeen registered by oslo.messaging + transport = messaging.get_transport() + messaging.get_notifier(transport, '') + self.pipeline_manager = pipeline.setup_pipeline() self.notification_manager = self._get_notifications_manager( @@ -91,9 +100,9 @@ class NotificationService(os_service.Service): urls = cfg.CONF.notification.messaging_urls or [None] self.listeners = [] for url in urls: - listener = messaging.get_notification_listener(targets, - endpoints, - url) + transport = messaging.get_transport(url) + listener = messaging.get_notification_listener( + transport, targets, endpoints) listener.start() self.listeners.append(listener) diff --git a/ceilometer/publisher/rpc.py b/ceilometer/publisher/rpc.py index 8d54ec5b3..16d403750 100644 --- a/ceilometer/publisher/rpc.py +++ b/ceilometer/publisher/rpc.py @@ -106,7 +106,8 @@ class RPCPublisher(publisher.PublisherBase): % self.policy) self.policy = 'default' - self.rpc_client = messaging.get_rpc_client(version='1.0') + transport = messaging.get_transport() + self.rpc_client = messaging.get_rpc_client(transport, version='1.0') def publish_samples(self, context, samples): """Publish samples on RPC. diff --git a/ceilometer/tests/alarm/partition/test_coordination.py b/ceilometer/tests/alarm/partition/test_coordination.py index 0da005d57..b396c66e6 100644 --- a/ceilometer/tests/alarm/partition/test_coordination.py +++ b/ceilometer/tests/alarm/partition/test_coordination.py @@ -25,19 +25,17 @@ import mock from six import moves from ceilometer.alarm.partition import coordination -from ceilometer import messaging from ceilometer.openstack.common.fixture import config -from ceilometer.openstack.common import test from ceilometer.openstack.common import timeutils from ceilometer.storage import models +from ceilometer.tests import base as tests_base -class TestCoordinate(test.BaseTestCase): +class TestCoordinate(tests_base.BaseTestCase): def setUp(self): super(TestCoordinate, self).setUp() self.CONF = self.useFixture(config.Config()).conf - messaging.setup('fake://') - self.addCleanup(messaging.cleanup) + self.setup_messaging(self.CONF) self.test_interval = 120 self.CONF.set_override('evaluation_interval', @@ -425,7 +423,7 @@ class TestCoordinate(test.BaseTestCase): self.output.getvalue()) -class TestPartitionIdentity(test.BaseTestCase): +class TestPartitionIdentity(tests_base.BaseTestCase): def setUp(self): super(TestPartitionIdentity, self).setUp() self.id_1st = coordination.PartitionIdentity(str(uuid.uuid4()), 1) diff --git a/ceilometer/tests/alarm/test_notifier.py b/ceilometer/tests/alarm/test_notifier.py index 4a88b2712..6774214f8 100644 --- a/ceilometer/tests/alarm/test_notifier.py +++ b/ceilometer/tests/alarm/test_notifier.py @@ -20,11 +20,10 @@ import mock import requests from ceilometer.alarm import service -from ceilometer import messaging from ceilometer.openstack.common import context from ceilometer.openstack.common.fixture import config from ceilometer.openstack.common.fixture import mockpatch -from ceilometer.openstack.common import test +from ceilometer.tests import base as tests_base DATA_JSON = ('{"current": "ALARM", "alarm_id": "foobar",' @@ -38,14 +37,12 @@ NOTIFICATION = dict(alarm_id='foobar', current='ALARM') -class TestAlarmNotifier(test.BaseTestCase): +class TestAlarmNotifier(tests_base.BaseTestCase): def setUp(self): super(TestAlarmNotifier, self).setUp() - messaging.setup('fake://') - self.addCleanup(messaging.cleanup) - self.CONF = self.useFixture(config.Config()).conf + self.setup_messaging(self.CONF) self.service = service.AlarmNotifierService() @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) diff --git a/ceilometer/tests/alarm/test_partitioned_alarm_svc.py b/ceilometer/tests/alarm/test_partitioned_alarm_svc.py index 739d49973..38b850916 100644 --- a/ceilometer/tests/alarm/test_partitioned_alarm_svc.py +++ b/ceilometer/tests/alarm/test_partitioned_alarm_svc.py @@ -21,16 +21,13 @@ import mock from stevedore import extension from ceilometer.alarm import service -from ceilometer import messaging from ceilometer.openstack.common.fixture import config -from ceilometer.openstack.common import test +from ceilometer.tests import base as tests_base -class TestPartitionedAlarmService(test.BaseTestCase): +class TestPartitionedAlarmService(tests_base.BaseTestCase): def setUp(self): super(TestPartitionedAlarmService, self).setUp() - messaging.setup('fake://') - self.addCleanup(messaging.cleanup) self.threshold_eval = mock.Mock() self.api_client = mock.MagicMock() @@ -41,6 +38,8 @@ class TestPartitionedAlarmService(test.BaseTestCase): self.CONF.set_override('partition_rpc_topic', 'fake_topic', group='alarm') + self.setup_messaging(self.CONF) + self.partitioned = service.PartitionedAlarmService() self.partitioned.tg = mock.Mock() self.partitioned.partition_coordinator = mock.Mock() diff --git a/ceilometer/tests/alarm/test_rpc.py b/ceilometer/tests/alarm/test_rpc.py index 55d2def8c..3e6bd9d8a 100644 --- a/ceilometer/tests/alarm/test_rpc.py +++ b/ceilometer/tests/alarm/test_rpc.py @@ -22,14 +22,16 @@ import eventlet from ceilometer.alarm import rpc as rpc_alarm from ceilometer import messaging -from ceilometer.openstack.common import test +from ceilometer.openstack.common.fixture import config from ceilometer.openstack.common import timeutils from ceilometer.storage import models +from ceilometer.tests import base class FakeNotifier(object): - def __init__(self): - self.rpc = messaging.get_rpc_server("alarm_notifier", self) + def __init__(self, transport): + self.rpc = messaging.get_rpc_server( + transport, "alarm_notifier", self) self.notified = [] def start(self, expected_length): @@ -42,13 +44,13 @@ class FakeNotifier(object): self.rpc.stop() -class TestRPCAlarmNotifier(test.BaseTestCase): +class TestRPCAlarmNotifier(base.BaseTestCase): def setUp(self): super(TestRPCAlarmNotifier, self).setUp() - messaging.setup('fake://') - self.addCleanup(messaging.cleanup) + self.CONF = self.useFixture(config.Config()).conf + self.setup_messaging(self.CONF) - self.notifier_server = FakeNotifier() + self.notifier_server = FakeNotifier(self.transport) self.notifier = rpc_alarm.RPCAlarmNotifier() self.alarms = [ alarms.Alarm(None, info={ @@ -144,9 +146,9 @@ class TestRPCAlarmNotifier(test.BaseTestCase): class FakeCoordinator(object): - def __init__(self): + def __init__(self, transport): self.rpc = messaging.get_rpc_server( - "alarm_partition_coordination", self) + transport, "alarm_partition_coordination", self) self.notified = [] def presence(self, context, data): @@ -163,13 +165,13 @@ class FakeCoordinator(object): self.rpc.stop() -class TestRPCAlarmPartitionCoordination(test.BaseTestCase): +class TestRPCAlarmPartitionCoordination(base.BaseTestCase): def setUp(self): super(TestRPCAlarmPartitionCoordination, self).setUp() - messaging.setup('fake://') - self.addCleanup(messaging.cleanup) + self.CONF = self.useFixture(config.Config()).conf + self.setup_messaging(self.CONF) - self.coordinator_server = FakeCoordinator() + self.coordinator_server = FakeCoordinator(self.transport) self.coordinator_server.rpc.start() eventlet.sleep() # must be sure that fanout queue is created diff --git a/ceilometer/tests/alarm/test_singleton_alarm_svc.py b/ceilometer/tests/alarm/test_singleton_alarm_svc.py index c1353d291..5ac82875b 100644 --- a/ceilometer/tests/alarm/test_singleton_alarm_svc.py +++ b/ceilometer/tests/alarm/test_singleton_alarm_svc.py @@ -18,20 +18,19 @@ """ import mock -from oslo.config import cfg from stevedore import extension from ceilometer.alarm import service -from ceilometer import messaging -from ceilometer.openstack.common import test +from ceilometer.openstack.common.fixture import config +from ceilometer.tests import base as tests_base -class TestSingletonAlarmService(test.BaseTestCase): +class TestSingletonAlarmService(tests_base.BaseTestCase): def setUp(self): super(TestSingletonAlarmService, self).setUp() - messaging.setup('fake://') - self.addCleanup(messaging.cleanup) + self.CONF = self.useFixture(config.Config()).conf + self.setup_messaging(self.CONF) self.threshold_eval = mock.Mock() self.evaluators = extension.ExtensionManager.make_test_instance( @@ -51,9 +50,9 @@ class TestSingletonAlarmService(test.BaseTestCase): def test_start(self): test_interval = 120 - cfg.CONF.set_override('evaluation_interval', - test_interval, - group='alarm') + self.CONF.set_override('evaluation_interval', + test_interval, + group='alarm') with mock.patch('ceilometerclient.client.get_client', return_value=self.api_client): self.singleton.start() @@ -90,13 +89,13 @@ class TestSingletonAlarmService(test.BaseTestCase): def test_singleton_endpoint_types(self): endpoint_types = ["internalURL", "publicURL"] for endpoint_type in endpoint_types: - cfg.CONF.set_override('os_endpoint_type', - endpoint_type, - group='service_credentials') + self.CONF.set_override('os_endpoint_type', + endpoint_type, + group='service_credentials') with mock.patch('ceilometerclient.client.get_client') as client: self.singleton.api_client = None self.singleton._evaluate_assigned_alarms() - conf = cfg.CONF.service_credentials + conf = self.CONF.service_credentials expected = [mock.call(2, os_auth_url=conf.os_auth_url, os_region_name=conf.os_region_name, diff --git a/ceilometer/tests/api/__init__.py b/ceilometer/tests/api/__init__.py index 993b48b20..4fcc3a08a 100644 --- a/ceilometer/tests/api/__init__.py +++ b/ceilometer/tests/api/__init__.py @@ -18,11 +18,10 @@ """ from oslo.config import cfg -import oslo.messaging.conffixture import pecan import pecan.testing -from ceilometer import messaging +from ceilometer.openstack.common.fixture import config from ceilometer.tests import db as db_test_base OPT_GROUP_NAME = 'keystone_authtoken' @@ -39,10 +38,9 @@ class FunctionalTest(db_test_base.TestBase): def setUp(self): super(FunctionalTest, self).setUp() - self.useFixture(oslo.messaging.conffixture.ConfFixture(self.CONF)) - self.CONF.set_override("notification_driver", "messaging") - messaging.setup("fake://") - self.addCleanup(messaging.cleanup) + self.CONF = self.useFixture(config.Config()).conf + self.setup_messaging(self.CONF) + self.CONF.set_override("auth_version", "v2.0", group=OPT_GROUP_NAME) self.CONF.set_override("policy_file", diff --git a/ceilometer/tests/api/v2/test_alarm_scenarios.py b/ceilometer/tests/api/v2/test_alarm_scenarios.py index 16d17d716..35eadce87 100644 --- a/ceilometer/tests/api/v2/test_alarm_scenarios.py +++ b/ceilometer/tests/api/v2/test_alarm_scenarios.py @@ -1933,10 +1933,9 @@ class TestAlarms(FunctionalTest, } endpoint = mock.MagicMock() - target = oslo.messaging.Target(topic="notifications", - exchange="ceilometer") - listener = messaging.get_notification_listener([target], - [endpoint]) + target = oslo.messaging.Target(topic="notifications") + listener = messaging.get_notification_listener( + self.transport, [target], [endpoint]) listener.start() endpoint.info.side_effect = lambda *args: listener.stop() self.post_json('/alarms', params=json, headers=self.auth_headers) @@ -1975,7 +1974,8 @@ class TestAlarms(FunctionalTest, self.delete('/alarms/%s' % data[0]['alarm_id'], headers=self.auth_headers, status=204) - get_notifier.assert_called_once_with(publisher_id='ceilometer.api') + get_notifier.assert_called_once_with(mock.ANY, + publisher_id='ceilometer.api') calls = notifier.info.call_args_list self.assertEqual(1, len(calls)) diff --git a/ceilometer/tests/api/v2/test_post_samples_scenarios.py b/ceilometer/tests/api/v2/test_post_samples_scenarios.py index cc623a59a..4841c8dac 100644 --- a/ceilometer/tests/api/v2/test_post_samples_scenarios.py +++ b/ceilometer/tests/api/v2/test_post_samples_scenarios.py @@ -35,7 +35,7 @@ class TestPostSamples(FunctionalTest, del m['message_signature'] self.published.append(data) - def fake_get_rpc_client(self, **kwargs): + def fake_get_rpc_client(self, *args, **kwargs): cast_ctxt = mock.Mock() cast_ctxt.cast.side_effect = self.fake_cast client = mock.Mock() diff --git a/ceilometer/tests/base.py b/ceilometer/tests/base.py index 084a8de77..3914788f3 100644 --- a/ceilometer/tests/base.py +++ b/ceilometer/tests/base.py @@ -21,13 +21,36 @@ import functools import os.path import six +import eventlet +import oslo.messaging from testtools import testcase +from ceilometer import messaging +from ceilometer.openstack.common.fixture import mockpatch from ceilometer.openstack.common import test from ceilometer.openstack.common import timeutils class BaseTestCase(test.BaseTestCase): + def setup_messaging(self, conf, exchange=None): + self.useFixture(oslo.messaging.conffixture.ConfFixture(conf)) + conf.set_override("notification_driver", "messaging") + if not exchange: + exchange = 'ceilometer' + conf.set_override("control_exchange", exchange) + + # NOTE(sileht): oslo.messaging fake driver uses time.sleep + # for task switch, so we need to monkey_patch it + # and also ensure the correct exchange have been set + eventlet.monkey_patch(time=True) + + # NOTE(sileht): Ensure a new oslo.messaging driver is loaded + # between each tests + self.transport = messaging.get_transport("fake://", cache=False) + self.useFixture(mockpatch.Patch( + 'ceilometer.messaging.get_transport', + return_value=self.transport)) + def assertTimestampEqual(self, first, second, msg=None): """Checks that two timestamps are equals. diff --git a/ceilometer/tests/event/test_endpoint.py b/ceilometer/tests/event/test_endpoint.py index 03db8d829..83e3bdff6 100644 --- a/ceilometer/tests/event/test_endpoint.py +++ b/ceilometer/tests/event/test_endpoint.py @@ -23,7 +23,6 @@ import oslo.messaging from stevedore import extension from ceilometer.event import endpoint as event_endpoint -from ceilometer import messaging from ceilometer.openstack.common.fixture import config from ceilometer.storage import models from ceilometer.tests import base as tests_base @@ -90,10 +89,9 @@ class TestEventEndpoint(tests_base.BaseTestCase): super(TestEventEndpoint, self).setUp() self.CONF = self.useFixture(config.Config()).conf self.CONF([]) - messaging.setup('fake://') - self.addCleanup(messaging.cleanup) self.CONF.set_override("connection", "log://", group='database') self.CONF.set_override("store_events", True, group="notification") + self.setup_messaging(self.CONF) self.mock_dispatcher = mock.MagicMock() self.endpoint = event_endpoint.EventsNotificationEndpoint() diff --git a/ceilometer/tests/objectstore/test_swift_middleware.py b/ceilometer/tests/objectstore/test_swift_middleware.py index fd8f80f82..64b6d931e 100644 --- a/ceilometer/tests/objectstore/test_swift_middleware.py +++ b/ceilometer/tests/objectstore/test_swift_middleware.py @@ -27,12 +27,11 @@ except ImportError: import webob REQUEST = webob -from ceilometer import messaging from ceilometer.objectstore import swift_middleware from ceilometer.openstack.common.fixture import config -from ceilometer.openstack.common.fixture.mockpatch import PatchObject -from ceilometer.openstack.common import test +from ceilometer.openstack.common.fixture import mockpatch from ceilometer import pipeline +from ceilometer.tests import base as tests_base class FakeApp(object): @@ -49,7 +48,7 @@ class FakeApp(object): return self.body -class TestSwiftMiddleware(test.BaseTestCase): +class TestSwiftMiddleware(tests_base.BaseTestCase): class _faux_pipeline_manager(pipeline.PipelineManager): class _faux_pipeline(object): @@ -72,20 +71,16 @@ class TestSwiftMiddleware(test.BaseTestCase): def setUp(self): super(TestSwiftMiddleware, self).setUp() self.pipeline_manager = self._faux_pipeline_manager() - self.useFixture(PatchObject(pipeline, 'setup_pipeline', - side_effect=self._fake_setup_pipeline)) - messaging.setup('fake://') - self.addCleanup(messaging.cleanup) + self.useFixture(mockpatch.PatchObject( + pipeline, 'setup_pipeline', + side_effect=self._fake_setup_pipeline)) self.CONF = self.useFixture(config.Config()).conf + self.setup_messaging(self.CONF) @staticmethod def start_response(*args): pass - def test_rpc_setup(self): - swift_middleware.CeilometerMiddleware(FakeApp(), {}) - self.assertEqual('ceilometer', self.CONF.control_exchange) - def test_get(self): app = swift_middleware.CeilometerMiddleware(FakeApp(), {}) req = REQUEST.Request.blank('/1.0/account/container/obj', diff --git a/ceilometer/tests/publisher/test_rpc_publisher.py b/ceilometer/tests/publisher/test_rpc_publisher.py index 3f910a601..c8000a01d 100644 --- a/ceilometer/tests/publisher/test_rpc_publisher.py +++ b/ceilometer/tests/publisher/test_rpc_publisher.py @@ -28,12 +28,12 @@ from ceilometer import messaging from ceilometer.openstack.common import context from ceilometer.openstack.common.fixture import config from ceilometer.openstack.common import network_utils -from ceilometer.openstack.common import test from ceilometer.publisher import rpc from ceilometer import sample +from ceilometer.tests import base as tests_base -class TestPublish(test.BaseTestCase): +class TestPublish(tests_base.BaseTestCase): test_data = [ sample.Sample( name='test', @@ -95,8 +95,7 @@ class TestPublish(test.BaseTestCase): def setUp(self): super(TestPublish, self).setUp() self.CONF = self.useFixture(config.Config()).conf - messaging.setup('fake://') - self.addCleanup(messaging.cleanup) + self.setup_messaging(self.CONF) self.published = [] def test_published_no_mock(self): @@ -105,7 +104,7 @@ class TestPublish(test.BaseTestCase): endpoint = mock.MagicMock(['record_metering_data']) collector = messaging.get_rpc_server( - self.CONF.publisher_rpc.metering_topic, endpoint) + self.transport, self.CONF.publisher_rpc.metering_topic, endpoint) endpoint.record_metering_data.side_effect = \ lambda *args, **kwds: collector.stop() diff --git a/ceilometer/tests/test_collector.py b/ceilometer/tests/test_collector.py index 74b4e344c..78e05f130 100644 --- a/ceilometer/tests/test_collector.py +++ b/ceilometer/tests/test_collector.py @@ -46,9 +46,7 @@ class TestCollector(tests_base.BaseTestCase): self.CONF.set_override("connection", "log://", group='database') self.CONF.set_override('metering_secret', 'not-so-secret', group='publisher') - self.useFixture(oslo.messaging.conffixture.ConfFixture(self.CONF)) - self._setup_messaging('fake://') - self.addCleanup(messaging.cleanup) + self._setup_messaging() self.counter = sample.Sample( name='foobar', @@ -87,10 +85,13 @@ class TestCollector(tests_base.BaseTestCase): def _dummy_thread_group_add_thread(method): method() - def _setup_messaging(self, url): - messaging.cleanup() - self.CONF.set_override('rpc_backend', '') - messaging.setup(url, optional=True) + def _setup_messaging(self, enabled=True): + if enabled: + self.setup_messaging(self.CONF) + else: + self.useFixture(mockpatch.Patch( + 'ceilometer.messaging.get_transport', + return_value=None)) def _setup_fake_dispatcher(self): plugin = mock.MagicMock() @@ -127,7 +128,7 @@ class TestCollector(tests_base.BaseTestCase): data=self.counter) def test_udp_receive_base(self): - self._setup_messaging('') + self._setup_messaging(False) mock_dispatcher = self._setup_fake_dispatcher() self.counter['source'] = 'mysource' self.counter['counter_name'] = self.counter['name'] @@ -146,7 +147,7 @@ class TestCollector(tests_base.BaseTestCase): self.counter) def test_udp_receive_storage_error(self): - self._setup_messaging('') + self._setup_messaging(False) mock_dispatcher = self._setup_fake_dispatcher() mock_dispatcher.record_metering_data.side_effect = self._raise_error @@ -170,7 +171,7 @@ class TestCollector(tests_base.BaseTestCase): raise Exception def test_udp_receive_bad_decoding(self): - self._setup_messaging('') + self._setup_messaging(False) udp_socket = self._make_fake_socket(self.counter) with contextlib.nested( mock.patch('socket.socket', return_value=udp_socket), @@ -183,7 +184,7 @@ class TestCollector(tests_base.BaseTestCase): @mock.patch.object(collector.CollectorService, 'start_udp') def test_only_udp(self, udp_start, rpc_start): """Check that only UDP is started if messaging transport is unset.""" - self._setup_messaging('') + self._setup_messaging(False) udp_socket = self._make_fake_socket(self.counter) with mock.patch('socket.socket', return_value=udp_socket): self.srv.start() @@ -200,7 +201,7 @@ class TestCollector(tests_base.BaseTestCase): self.assertEqual(0, udp_start.call_count) def test_udp_receive_valid_encoding(self): - self._setup_messaging('') + self._setup_messaging(False) mock_dispatcher = self._setup_fake_dispatcher() self.data_sent = [] with mock.patch('socket.socket', @@ -216,7 +217,7 @@ class TestCollector(tests_base.BaseTestCase): self.srv.start() mylog.info.side_effect = lambda *args: self.srv.stop() - client = messaging.get_rpc_client(version='1.0') + client = messaging.get_rpc_client(self.transport, version='1.0') cclient = client.prepare(topic='metering') cclient.cast(context.RequestContext(), 'record_metering_data', data=[self.utf8_msg]) diff --git a/ceilometer/tests/test_notification.py b/ceilometer/tests/test_notification.py index f48986d85..e54e2d625 100644 --- a/ceilometer/tests/test_notification.py +++ b/ceilometer/tests/test_notification.py @@ -16,7 +16,7 @@ # under the License. """Tests for Ceilometer notify daemon.""" -import eventlet.semaphore +import eventlet import mock import oslo.messaging @@ -30,6 +30,8 @@ from ceilometer import notification from ceilometer.openstack.common import context from ceilometer.openstack.common import fileutils from ceilometer.openstack.common.fixture import config +from ceilometer.openstack.common import timeutils +from ceilometer.publisher import test as test_publisher from ceilometer.tests import base as tests_base TEST_NOTICE_CTXT = { @@ -89,10 +91,9 @@ class TestNotification(tests_base.BaseTestCase): def setUp(self): super(TestNotification, self).setUp() self.CONF = self.useFixture(config.Config()).conf - messaging.setup('fake://') - self.addCleanup(messaging.cleanup) self.CONF.set_override("connection", "log://", group='database') self.CONF.set_override("store_events", False, group="notification") + self.setup_messaging(self.CONF) self.srv = notification.NotificationService() def fake_get_notifications_manager(self, pm): @@ -168,7 +169,7 @@ class TestRealNotification(tests_base.BaseTestCase): def setUp(self): super(TestRealNotification, self).setUp() self.CONF = self.useFixture(config.Config()).conf - self.useFixture(oslo.messaging.conffixture.ConfFixture(self.CONF)) + self.setup_messaging(self.CONF, 'nova') pipeline = yaml.dump([{ 'name': 'test_pipeline', @@ -179,44 +180,31 @@ class TestRealNotification(tests_base.BaseTestCase): }]) self.expected_samples = 2 - self.sem = eventlet.semaphore.Semaphore(0) pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline, prefix="pipeline", suffix="yaml") self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file) - self.CONF.set_override("notification_driver", "messaging") - self.CONF.set_override("control_exchange", "nova") - messaging.setup('fake://') - self.addCleanup(messaging.cleanup) - self.srv = notification.NotificationService() + self.publisher = test_publisher.TestPublisher("") @mock.patch('ceilometer.publisher.test.TestPublisher') def test_notification_service(self, fake_publisher_cls): + fake_publisher_cls.return_value = self.publisher self.srv.start() - fake_publisher = fake_publisher_cls.return_value - fake_publisher.publish_samples.side_effect = \ - lambda *args: self.sem.release() - - notifier = messaging.get_notifier("compute.vagrant-precise") + notifier = messaging.get_notifier(self.transport, + "compute.vagrant-precise") notifier.info(context.RequestContext(), 'compute.instance.create.end', TEST_NOTICE_PAYLOAD) - # we should wait all the expected notification listeners finished - # processing the notification - for i in range(self.expected_samples): - self.sem.acquire(timeout=30) - # stop NotificationService + start = timeutils.utcnow() + while timeutils.delta_seconds(start, timeutils.utcnow()) < 600: + if len(self.publisher.samples) >= self.expected_samples: + break + eventlet.sleep(0) + self.srv.stop() - class SamplesMatcher(object): - def __eq__(self, samples): - for s in samples: - if s.resource_id != "9f9d01b9-4a58-4271-9e27-398b21ab20d1": - return False - return True - - fake_publisher.publish_samples.assert_has_calls( - [mock.call(mock.ANY, SamplesMatcher())] * self.expected_samples - ) + resources = list(set(s.resource_id for s in self.publisher.samples)) + self.assertEqual(self.expected_samples, len(self.publisher.samples)) + self.assertEqual(["9f9d01b9-4a58-4271-9e27-398b21ab20d1"], resources)