diff --git a/ceilometer/api/controllers/v2.py b/ceilometer/api/controllers/v2.py index 330fa94ba..c7bb61413 100644 --- a/ceilometer/api/controllers/v2.py +++ b/ceilometer/api/controllers/v2.py @@ -595,7 +595,9 @@ def _send_notification(event, payload): notification = event.replace(" ", "_") notification = "alarm.%s" % notification notifier = messaging.get_notifier(publisher_id="ceilometer.api") - notifier.info(None, notification, payload) + # FIXME(sileht): perhaps we need to copy some infos from the + # pecan request headers like nova does + notifier.info(context.RequestContext(), notification, payload) class OldSample(_Base): diff --git a/ceilometer/messaging.py b/ceilometer/messaging.py index 9d739e7d4..24f94a7e2 100644 --- a/ceilometer/messaging.py +++ b/ceilometer/messaging.py @@ -15,9 +15,14 @@ # License for the specific language governing permissions and limitations # under the License. +import eventlet from oslo.config import cfg import oslo.messaging +from ceilometer.openstack.common import context +from ceilometer.openstack.common import jsonutils + + TRANSPORT = None NOTIFIER = None @@ -28,15 +33,51 @@ _ALIASES = { } +class RequestContextSerializer(oslo.messaging.Serializer): + def __init__(self, base): + self._base = base + + def serialize_entity(self, ctxt, entity): + if not self._base: + return entity + return self._base.serialize_entity(ctxt, entity) + + def deserialize_entity(self, ctxt, entity): + if not self._base: + return entity + return self._base.deserialize_entity(ctxt, entity) + + @staticmethod + def serialize_context(ctxt): + return ctxt.to_dict() + + @staticmethod + def deserialize_context(ctxt): + return context.RequestContext(ctxt) + + +class JsonPayloadSerializer(oslo.messaging.NoOpSerializer): + @classmethod + def serialize_entity(cls, context, entity): + return jsonutils.to_primitive(entity, convert_instances=True) + + def setup(url=None): """Initialise the oslo.messaging layer.""" global TRANSPORT, NOTIFIER + + if url and url.startswith("fake://"): + # NOTE(sileht): oslo.messaging fake driver uses time.sleep + # for task switch, so we need to monkey_patch it + eventlet.monkey_patch(time=True) + if not TRANSPORT: oslo.messaging.set_transport_defaults('ceilometer') TRANSPORT = oslo.messaging.get_transport(cfg.CONF, url, aliases=_ALIASES) if not NOTIFIER: - NOTIFIER = oslo.messaging.Notifier(TRANSPORT) + serializer = RequestContextSerializer(JsonPayloadSerializer()) + NOTIFIER = oslo.messaging.Notifier(TRANSPORT, serializer=serializer) def cleanup(): @@ -52,15 +93,19 @@ def get_rpc_server(topic, endpoint): """Return a configured oslo.messaging rpc server.""" global TRANSPORT target = oslo.messaging.Target(server=cfg.CONF.host, topic=topic) + serializer = RequestContextSerializer(JsonPayloadSerializer()) return oslo.messaging.get_rpc_server(TRANSPORT, target, [endpoint], - executor='eventlet') + executor='eventlet', + serializer=serializer) def get_rpc_client(**kwargs): """Return a configured oslo.messaging RPCClient.""" global TRANSPORT target = oslo.messaging.Target(**kwargs) - return oslo.messaging.RPCClient(TRANSPORT, target) + serializer = RequestContextSerializer(JsonPayloadSerializer()) + return oslo.messaging.RPCClient(TRANSPORT, target, + serializer=serializer) def get_notification_listener(targets, endpoints, url=None): @@ -83,9 +128,9 @@ def get_notifier(publisher_id): def convert_to_old_notification_format(priority, ctxt, publisher_id, event_type, payload, metadata): - #FIXME(sileht): temporary convert notification to old format - #to focus on oslo.messaging migration before refactoring the code to - #use the new oslo.messaging facilities + # FIXME(sileht): temporary convert notification to old format + # to focus on oslo.messaging migration before refactoring the code to + # use the new oslo.messaging facilities notification = {'priority': priority, 'payload': payload, 'event_type': event_type, diff --git a/ceilometer/publisher/rpc.py b/ceilometer/publisher/rpc.py index 9f733791a..922098af2 100644 --- a/ceilometer/publisher/rpc.py +++ b/ceilometer/publisher/rpc.py @@ -142,7 +142,7 @@ class RPCPublisher(publisher.PublisherBase): self.flush() def flush(self): - #note(sileht): + # NOTE(sileht): # IO of the rpc stuff in handled by eventlet, # this is why the self.local_queue, is emptied before processing the # queue and the remaining messages in the queue are added to @@ -164,7 +164,7 @@ class RPCPublisher(publisher.PublisherBase): "dropping %d oldest samples") % count) def _process_queue(self, queue, policy): - #note(sileht): + # NOTE(sileht): # the behavior of rpc.cast call depends of rabbit_max_retries # if rabbit_max_retries <= 0: # it returns only if the msg has been sent on the amqp queue @@ -178,7 +178,7 @@ class RPCPublisher(publisher.PublisherBase): context, topic, meters = queue[0] try: self.rpc_client.prepare(topic=topic).cast( - context.to_dict(), self.target, data=meters) + context, self.target, data=meters) except oslo.messaging._drivers.common.RPCException: samples = sum([len(m) for __, __, m in queue]) if policy == 'queue': diff --git a/ceilometer/tests/alarm/test_rpc.py b/ceilometer/tests/alarm/test_rpc.py index d798c4e20..35b5f33de 100644 --- a/ceilometer/tests/alarm/test_rpc.py +++ b/ceilometer/tests/alarm/test_rpc.py @@ -19,30 +19,38 @@ import uuid from ceilometerclient.v2 import alarms -import mock +import eventlet from ceilometer.alarm import rpc as rpc_alarm from ceilometer import messaging -from ceilometer.openstack.common.fixture import mockpatch from ceilometer.openstack.common import test from ceilometer.openstack.common import timeutils from ceilometer.storage import models -class TestRPCAlarmNotifier(test.BaseTestCase): - def fake_cast(self, context, method, **args): - self.notified.append((method, args)) +class FakeNotifier(object): + def __init__(self): + self.rpc = messaging.get_rpc_server("alarm_notifier", self) + self.notified = [] + def start(self, expected_length): + self.expected_length = expected_length + self.rpc.start() + + def notify_alarm(self, context, data): + self.notified.append(data) + if len(self.notified) == self.expected_length: + self.rpc.stop() + + +class TestRPCAlarmNotifier(test.BaseTestCase): def setUp(self): super(TestRPCAlarmNotifier, self).setUp() messaging.setup('fake://') self.addCleanup(messaging.cleanup) - self.notified = [] + self.notifier_server = FakeNotifier() self.notifier = rpc_alarm.RPCAlarmNotifier() - self.useFixture(mockpatch.PatchObject( - self.notifier.client, 'cast', - side_effect=self.fake_cast)) self.alarms = [ alarms.Alarm(None, info={ 'name': 'instance_running_hot', @@ -83,29 +91,36 @@ class TestRPCAlarmNotifier(test.BaseTestCase): self.assertEqual('alarm_notifier', topic) def test_notify_alarm(self): + self.notifier_server.start(2) + previous = ['alarm', 'ok'] for i, a in enumerate(self.alarms): self.notifier.notify(a, previous[i], "what? %d" % i, {'fire': '%d' % i}) - self.assertEqual(2, len(self.notified)) + + self.notifier_server.rpc.wait() + + self.assertEqual(2, len(self.notifier_server.notified)) for i, a in enumerate(self.alarms): actions = getattr(a, models.Alarm.ALARM_ACTIONS_MAP[a.state]) - self.assertEqual('notify_alarm', self.notified[i][0]) self.assertEqual(self.alarms[i].alarm_id, - self.notified[i][1]["data"]["alarm_id"]) - self.assertEqual(actions, self.notified[i][1]["data"]["actions"]) + self.notifier_server.notified[i]["alarm_id"]) + self.assertEqual(actions, + self.notifier_server.notified[i]["actions"]) self.assertEqual(previous[i], - self.notified[i][1]["data"]["previous"]) + self.notifier_server.notified[i]["previous"]) self.assertEqual(self.alarms[i].state, - self.notified[i][1]["data"]["current"]) + self.notifier_server.notified[i]["current"]) self.assertEqual("what? %d" % i, - self.notified[i][1]["data"]["reason"]) + self.notifier_server.notified[i]["reason"]) self.assertEqual({'fire': '%d' % i}, - self.notified[i][1]["data"]["reason_data"]) + self.notifier_server.notified[i]["reason_data"]) def test_notify_non_string_reason(self): + self.notifier_server.start(1) self.notifier.notify(self.alarms[0], 'ok', 42, {}) - reason = self.notified[0][1]['data']['reason'] + self.notifier_server.rpc.wait() + reason = self.notifier_server.notified[0]['reason'] self.assertIsInstance(reason, basestring) def test_notify_no_actions(self): @@ -126,52 +141,99 @@ class TestRPCAlarmNotifier(test.BaseTestCase): 'my_instance'} }) self.notifier.notify(alarm, 'alarm', "what?", {}) - self.assertEqual(0, len(self.notified)) + self.assertEqual(0, len(self.notifier_server.notified)) + + +class FakeCoordinator(object): + def __init__(self): + self.rpc = messaging.get_rpc_server( + "alarm_partition_coordination", self) + self.notified = [] + + def presence(self, context, data): + self._record('presence', data) + + def allocate(self, context, data): + self._record('allocate', data) + + def assign(self, context, data): + self._record('assign', data) + + def _record(self, method, data): + self.notified.append((method, data)) + self.rpc.stop() class TestRPCAlarmPartitionCoordination(test.BaseTestCase): - def fake_fanout_cast(self, context, method, **args): - self.notified.append((method, args)) - - def fake_prepare(self, fanout): - self.assertTrue(fanout) - cctxt = mock.Mock() - cctxt.cast.side_effect = self.fake_fanout_cast - return cctxt - def setUp(self): super(TestRPCAlarmPartitionCoordination, self).setUp() messaging.setup('fake://') self.addCleanup(messaging.cleanup) - self.notified = [] + self.coordinator_server = FakeCoordinator() + self.coordinator_server.rpc.start() + eventlet.sleep() # must be sure that fanout queue is created + self.ordination = rpc_alarm.RPCAlarmPartitionCoordination() - self.useFixture(mockpatch.PatchObject( - self.ordination.client, 'prepare', - side_effect=self.fake_prepare)) - self.alarms = [mock.MagicMock(), mock.MagicMock()] + self.alarms = [ + alarms.Alarm(None, info={ + 'name': 'instance_running_hot', + 'meter_name': 'cpu_util', + 'comparison_operator': 'gt', + 'threshold': 80.0, + 'evaluation_periods': 5, + 'statistic': 'avg', + 'state': 'ok', + 'ok_actions': ['http://host:8080/path'], + 'user_id': 'foobar', + 'project_id': 'snafu', + 'period': 60, + 'alarm_id': str(uuid.uuid4()), + 'matching_metadata':{'resource_id': + 'my_instance'} + }), + alarms.Alarm(None, info={ + 'name': 'group_running_idle', + 'meter_name': 'cpu_util', + 'comparison_operator': 'le', + 'threshold': 10.0, + 'statistic': 'max', + 'evaluation_periods': 4, + 'state': 'insufficient data', + 'insufficient_data_actions': ['http://other_host/path'], + 'user_id': 'foobar', + 'project_id': 'snafu', + 'period': 300, + 'alarm_id': str(uuid.uuid4()), + 'matching_metadata':{'metadata.user_metadata.AS': + 'my_group'} + }), + ] def test_ordination_presence(self): - id = uuid.uuid4() + id = str(uuid.uuid4()) priority = float(timeutils.utcnow().strftime('%s.%f')) self.ordination.presence(id, priority) - method, args = self.notified[0] - self.assertEqual(id, args['data']['uuid']) - self.assertEqual(priority, args['data']['priority']) + self.coordinator_server.rpc.wait() + method, args = self.coordinator_server.notified[0] + self.assertEqual(id, args['uuid']) + self.assertEqual(priority, args['priority']) self.assertEqual('presence', method) def test_ordination_assign(self): - id = uuid.uuid4() + id = str(uuid.uuid4()) self.ordination.assign(id, self.alarms) - method, args = self.notified[0] - self.assertEqual(id, args['data']['uuid']) - self.assertEqual(2, len(args['data']['alarms'])) + self.coordinator_server.rpc.wait() + method, args = self.coordinator_server.notified[0] + self.assertEqual(id, args['uuid']) + self.assertEqual(2, len(args['alarms'])) self.assertEqual('assign', method) def test_ordination_allocate(self): - id = uuid.uuid4() + id = str(uuid.uuid4()) self.ordination.allocate(id, self.alarms) - method, args = self.notified[0] - self.assertEqual(id, args['data']['uuid']) - self.assertEqual(2, len(args['data']['alarms'])) + self.coordinator_server.rpc.wait() + method, args = self.coordinator_server.notified[0] + self.assertEqual(id, args['uuid']) + self.assertEqual(2, len(args['alarms'])) self.assertEqual('allocate', method) diff --git a/ceilometer/tests/api/__init__.py b/ceilometer/tests/api/__init__.py index a6b53d0f9..69fbb2b99 100644 --- a/ceilometer/tests/api/__init__.py +++ b/ceilometer/tests/api/__init__.py @@ -19,6 +19,7 @@ """ from oslo.config import cfg +import oslo.messaging.conffixture import pecan import pecan.testing @@ -38,9 +39,11 @@ class FunctionalTest(db_test_base.TestBase): PATH_PREFIX = '' def setUp(self): + super(FunctionalTest, self).setUp() + self.useFixture(oslo.messaging.conffixture.ConfFixture(self.CONF)) + self.CONF.set_override("notification_driver", "messaging") messaging.setup("fake://") self.addCleanup(messaging.cleanup) - super(FunctionalTest, self).setUp() self.CONF.set_override("auth_version", "v2.0", group=OPT_GROUP_NAME) self.CONF.set_override("policy_file", diff --git a/ceilometer/tests/api/v2/test_alarm_scenarios.py b/ceilometer/tests/api/v2/test_alarm_scenarios.py index b2441e29c..aa3481edf 100644 --- a/ceilometer/tests/api/v2/test_alarm_scenarios.py +++ b/ceilometer/tests/api/v2/test_alarm_scenarios.py @@ -26,7 +26,7 @@ import logging import uuid import mock - +import oslo.messaging.conffixture from six import moves from ceilometer import messaging @@ -1844,21 +1844,40 @@ class TestAlarms(FunctionalTest, } } - with mock.patch.object(messaging, 'get_notifier') as get_notifier: - notifier = get_notifier.return_value + endpoint = mock.MagicMock() + target = oslo.messaging.Target(topic="notifications", + exchange="ceilometer") + listener = messaging.get_notification_listener([target], + [endpoint]) + listener.start() + endpoint.info.side_effect = lambda *args: listener.stop() + self.post_json('/alarms', params=json, headers=self.auth_headers) + listener.wait() - self.post_json('/alarms', params=json, headers=self.auth_headers) - get_notifier.assert_called_once_with(publisher_id='ceilometer.api') + class PayloadMatcher(object): + def __eq__(self, payload): + return payload['detail']['name'] == 'sent_notification' and \ + payload['type'] == 'creation' and \ + payload['detail']['rule']['meter_name'] == 'ameter' and \ + set(['alarm_id', 'detail', 'event_id', 'on_behalf_of', + 'project_id', 'timestamp', + 'user_id']).issubset(payload.keys()) - calls = notifier.info.call_args_list - self.assertEqual(1, len(calls)) - args, _ = calls[0] - context, event_type, payload = args - self.assertEqual('alarm.creation', event_type) - self.assertEqual('sent_notification', payload['detail']['name']) - self.assertTrue(set(['alarm_id', 'detail', 'event_id', 'on_behalf_of', - 'project_id', 'timestamp', 'type', - 'user_id']).issubset(payload.keys())) + endpoint.info.assert_called_once_with( + {'instance_uuid': None, + 'domain': None, + 'project_domain': None, + 'auth_token': None, + 'is_admin': False, + 'user': None, + 'tenant': None, + 'read_only': False, + 'show_deleted': False, + 'user_identity': '- - - - -', + 'request_id': mock.ANY, + 'user_domain': None}, + 'ceilometer.api', 'alarm.creation', + PayloadMatcher(), mock.ANY) def test_alarm_sends_notification(self): # Hit the AlarmController (with alarm_id supplied) ... diff --git a/ceilometer/tests/publisher/test_rpc_publisher.py b/ceilometer/tests/publisher/test_rpc_publisher.py index f45e9f141..6afc239f0 100644 --- a/ceilometer/tests/publisher/test_rpc_publisher.py +++ b/ceilometer/tests/publisher/test_rpc_publisher.py @@ -26,6 +26,7 @@ import oslo.messaging import oslo.messaging._drivers.common from ceilometer import messaging +from ceilometer.openstack.common import context from ceilometer.openstack.common.fixture import config from ceilometer.openstack.common import network_utils from ceilometer.openstack.common import test @@ -95,25 +96,36 @@ class TestPublish(test.BaseTestCase): def setUp(self): super(TestPublish, self).setUp() self.CONF = self.useFixture(config.Config()).conf - messaging.setup('fake://') self.addCleanup(messaging.cleanup) - self.published = [] - def test_published(self): + def test_published_no_mock(self): publisher = rpc.RPCPublisher( network_utils.urlsplit('rpc://')) - cast_context = mock.MagicMock() - with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: - prepare.return_value = cast_context - publisher.publish_samples(mock.MagicMock(), - self.test_data) - prepare.assert_called_once_with( - topic=self.CONF.publisher_rpc.metering_topic) - cast_context.cast.assert_called_once_with( - mock.ANY, 'record_metering_data', data=mock.ANY) + endpoint = mock.MagicMock(['record_metering_data']) + collector = messaging.get_rpc_server( + self.CONF.publisher_rpc.metering_topic, endpoint) + endpoint.record_metering_data.side_effect = \ + lambda *args, **kwds: collector.stop() + + collector.start() + eventlet.sleep() + publisher.publish_samples(context.RequestContext(), + self.test_data) + collector.wait() + + class Matcher(object): + @staticmethod + def __eq__(data): + for i, sample in enumerate(data): + if sample['counter_name'] != self.test_data[i].name: + return False + return True + + endpoint.record_metering_data.assert_called_once_with( + mock.ANY, data=Matcher()) def test_publish_target(self): publisher = rpc.RPCPublisher( diff --git a/ceilometer/tests/test_collector.py b/ceilometer/tests/test_collector.py index 34ee106f1..436c9b008 100644 --- a/ceilometer/tests/test_collector.py +++ b/ceilometer/tests/test_collector.py @@ -25,7 +25,9 @@ from stevedore import extension from ceilometer import collector from ceilometer import messaging +from ceilometer.openstack.common import context from ceilometer.openstack.common.fixture import config +from ceilometer.openstack.common import timeutils from ceilometer.publisher import utils from ceilometer import sample from ceilometer.tests import base as tests_base @@ -41,10 +43,11 @@ class TestCollector(tests_base.BaseTestCase): super(TestCollector, self).setUp() messaging.setup('fake://') self.addCleanup(messaging.cleanup) + self.CONF = self.useFixture(config.Config()).conf self.CONF.set_override("connection", "log://", group='database') - self.srv = collector.CollectorService() - self.CONF.publisher.metering_secret = 'not-so-secret' + self.CONF.set_override('metering_secret', 'not-so-secret', + group='publisher') self.counter = sample.Sample( name='foobar', type='bad', @@ -53,7 +56,7 @@ class TestCollector(tests_base.BaseTestCase): user_id='jd', project_id='ceilometer', resource_id='cat', - timestamp='NOW!', + timestamp=timeutils.utcnow().isoformat(), resource_metadata={}, ).as_dict() @@ -66,12 +69,14 @@ class TestCollector(tests_base.BaseTestCase): user_id=u'test', project_id=u'test', resource_id=u'test_run_tasks', - timestamp=u'NOW!', + timestamp=timeutils.utcnow().isoformat(), resource_metadata={u'name': [([u'TestPublish'])]}, source=u'testsource', ), 'not-so-secret') + self.srv = collector.CollectorService() + def _make_test_manager(self, plugin): return extension.ExtensionManager.make_test_instance([ extension.Extension( @@ -202,3 +207,18 @@ class TestCollector(tests_base.BaseTestCase): self.assertTrue(utils.verify_signature( mock_dispatcher.method_calls[0][1][0], "not-so-secret")) + + @mock.patch('ceilometer.storage.impl_log.LOG') + def test_collector_no_mock(self, mylog): + self.CONF.set_override('udp_address', '', group='collector') + self.srv.start() + mylog.info.side_effect = lambda *args: self.srv.stop() + + client = messaging.get_rpc_client(version='1.0') + cclient = client.prepare(topic='metering') + cclient.cast(context.RequestContext(), + 'record_metering_data', data=[self.utf8_msg]) + + self.srv.rpc_server.wait() + mylog.info.assert_called_once_with( + 'metering data test for test_run_tasks: 1') diff --git a/ceilometer/tests/test_notification.py b/ceilometer/tests/test_notification.py index b62437149..4d7d8a88a 100644 --- a/ceilometer/tests/test_notification.py +++ b/ceilometer/tests/test_notification.py @@ -20,11 +20,15 @@ import mock import oslo.messaging +import oslo.messaging.conffixture from stevedore import extension +import yaml from ceilometer.compute.notifications import instance from ceilometer import messaging from ceilometer import notification +from ceilometer.openstack.common import context +from ceilometer.openstack.common import fileutils from ceilometer.openstack.common.fixture import config from ceilometer.tests import base as tests_base @@ -158,3 +162,57 @@ class TestNotification(tests_base.BaseTestCase): self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints)) event_endpoint = self.srv.listeners[0].dispatcher.endpoints[0] self.assertEqual(1, len(list(event_endpoint.dispatcher_manager))) + + +class TestRealNotification(tests_base.BaseTestCase): + def setUp(self): + super(TestRealNotification, self).setUp() + self.CONF = self.useFixture(config.Config()).conf + self.useFixture(oslo.messaging.conffixture.ConfFixture(self.CONF)) + + pipeline = yaml.dump([{ + 'name': 'test_pipeline', + 'interval': 5, + 'counters': ['*'], + 'transformers': [], + 'publishers': ['test://'], + }]) + pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline, + prefix="pipeline", + suffix="yaml") + self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file) + self.CONF.set_override("notification_driver", "messaging") + self.CONF.set_override("control_exchange", "nova") + messaging.setup('fake://') + self.addCleanup(messaging.cleanup) + + self.srv = notification.NotificationService() + + @mock.patch('ceilometer.publisher.test.TestPublisher') + def test_notification_service(self, fake_publisher_cls): + self.srv.start() + + fake_publisher = fake_publisher_cls.return_value + fake_publisher.publish_samples.side_effect = \ + lambda *args: self.srv.stop() + + notifier = messaging.get_notifier("compute.vagrant-precise") + notifier.info(context.RequestContext(), 'compute.instance.create.end', + TEST_NOTICE_PAYLOAD) + + self.srv.listeners[0].wait() + + class SamplesMatcher(object): + def __eq__(self, samples): + for s in samples: + if s.resource_id != "9f9d01b9-4a58-4271-9e27-398b21ab20d1": + return False + return True + + fake_publisher.publish_samples.assert_has_calls([ + mock.call(mock.ANY, SamplesMatcher()), + mock.call(mock.ANY, SamplesMatcher()), + mock.call(mock.ANY, SamplesMatcher()), + mock.call(mock.ANY, SamplesMatcher()), + mock.call(mock.ANY, SamplesMatcher()), + ])