From 6f9e46ba5c4ed9a04e4f12460be5edc0b9c0602b Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Wed, 7 May 2014 14:42:49 +0200 Subject: [PATCH] oslo.messaging context must be a dict oslo.messaging assumes the context is a dict not a RequestContext and it assumes the payload in json serializable. This patch ensures this. Also it removes oslo.messaging mock on some tests and use real oslo.messaging library with the fake driver. Change-Id: Ie3c6083bbc4ec83de28e42bb10e7c50c7e135070 Closes-bug: #1275771 Closes-bug: #1317290 --- ceilometer/api/controllers/v2.py | 4 +- ceilometer/messaging.py | 57 ++++++- ceilometer/publisher/rpc.py | 6 +- ceilometer/tests/alarm/test_rpc.py | 152 ++++++++++++------ ceilometer/tests/api/__init__.py | 5 +- .../tests/api/v2/test_alarm_scenarios.py | 47 ++++-- .../tests/publisher/test_rpc_publisher.py | 36 +++-- ceilometer/tests/test_collector.py | 28 +++- ceilometer/tests/test_notification.py | 58 +++++++ 9 files changed, 307 insertions(+), 86 deletions(-) diff --git a/ceilometer/api/controllers/v2.py b/ceilometer/api/controllers/v2.py index 330fa94ba..c7bb61413 100644 --- a/ceilometer/api/controllers/v2.py +++ b/ceilometer/api/controllers/v2.py @@ -595,7 +595,9 @@ def _send_notification(event, payload): notification = event.replace(" ", "_") notification = "alarm.%s" % notification notifier = messaging.get_notifier(publisher_id="ceilometer.api") - notifier.info(None, notification, payload) + # FIXME(sileht): perhaps we need to copy some infos from the + # pecan request headers like nova does + notifier.info(context.RequestContext(), notification, payload) class OldSample(_Base): diff --git a/ceilometer/messaging.py b/ceilometer/messaging.py index 9d739e7d4..24f94a7e2 100644 --- a/ceilometer/messaging.py +++ b/ceilometer/messaging.py @@ -15,9 +15,14 @@ # License for the specific language governing permissions and limitations # under the License. +import eventlet from oslo.config import cfg import oslo.messaging +from ceilometer.openstack.common import context +from ceilometer.openstack.common import jsonutils + + TRANSPORT = None NOTIFIER = None @@ -28,15 +33,51 @@ _ALIASES = { } +class RequestContextSerializer(oslo.messaging.Serializer): + def __init__(self, base): + self._base = base + + def serialize_entity(self, ctxt, entity): + if not self._base: + return entity + return self._base.serialize_entity(ctxt, entity) + + def deserialize_entity(self, ctxt, entity): + if not self._base: + return entity + return self._base.deserialize_entity(ctxt, entity) + + @staticmethod + def serialize_context(ctxt): + return ctxt.to_dict() + + @staticmethod + def deserialize_context(ctxt): + return context.RequestContext(ctxt) + + +class JsonPayloadSerializer(oslo.messaging.NoOpSerializer): + @classmethod + def serialize_entity(cls, context, entity): + return jsonutils.to_primitive(entity, convert_instances=True) + + def setup(url=None): """Initialise the oslo.messaging layer.""" global TRANSPORT, NOTIFIER + + if url and url.startswith("fake://"): + # NOTE(sileht): oslo.messaging fake driver uses time.sleep + # for task switch, so we need to monkey_patch it + eventlet.monkey_patch(time=True) + if not TRANSPORT: oslo.messaging.set_transport_defaults('ceilometer') TRANSPORT = oslo.messaging.get_transport(cfg.CONF, url, aliases=_ALIASES) if not NOTIFIER: - NOTIFIER = oslo.messaging.Notifier(TRANSPORT) + serializer = RequestContextSerializer(JsonPayloadSerializer()) + NOTIFIER = oslo.messaging.Notifier(TRANSPORT, serializer=serializer) def cleanup(): @@ -52,15 +93,19 @@ def get_rpc_server(topic, endpoint): """Return a configured oslo.messaging rpc server.""" global TRANSPORT target = oslo.messaging.Target(server=cfg.CONF.host, topic=topic) + serializer = RequestContextSerializer(JsonPayloadSerializer()) return oslo.messaging.get_rpc_server(TRANSPORT, target, [endpoint], - executor='eventlet') + executor='eventlet', + serializer=serializer) def get_rpc_client(**kwargs): """Return a configured oslo.messaging RPCClient.""" global TRANSPORT target = oslo.messaging.Target(**kwargs) - return oslo.messaging.RPCClient(TRANSPORT, target) + serializer = RequestContextSerializer(JsonPayloadSerializer()) + return oslo.messaging.RPCClient(TRANSPORT, target, + serializer=serializer) def get_notification_listener(targets, endpoints, url=None): @@ -83,9 +128,9 @@ def get_notifier(publisher_id): def convert_to_old_notification_format(priority, ctxt, publisher_id, event_type, payload, metadata): - #FIXME(sileht): temporary convert notification to old format - #to focus on oslo.messaging migration before refactoring the code to - #use the new oslo.messaging facilities + # FIXME(sileht): temporary convert notification to old format + # to focus on oslo.messaging migration before refactoring the code to + # use the new oslo.messaging facilities notification = {'priority': priority, 'payload': payload, 'event_type': event_type, diff --git a/ceilometer/publisher/rpc.py b/ceilometer/publisher/rpc.py index 9f733791a..922098af2 100644 --- a/ceilometer/publisher/rpc.py +++ b/ceilometer/publisher/rpc.py @@ -142,7 +142,7 @@ class RPCPublisher(publisher.PublisherBase): self.flush() def flush(self): - #note(sileht): + # NOTE(sileht): # IO of the rpc stuff in handled by eventlet, # this is why the self.local_queue, is emptied before processing the # queue and the remaining messages in the queue are added to @@ -164,7 +164,7 @@ class RPCPublisher(publisher.PublisherBase): "dropping %d oldest samples") % count) def _process_queue(self, queue, policy): - #note(sileht): + # NOTE(sileht): # the behavior of rpc.cast call depends of rabbit_max_retries # if rabbit_max_retries <= 0: # it returns only if the msg has been sent on the amqp queue @@ -178,7 +178,7 @@ class RPCPublisher(publisher.PublisherBase): context, topic, meters = queue[0] try: self.rpc_client.prepare(topic=topic).cast( - context.to_dict(), self.target, data=meters) + context, self.target, data=meters) except oslo.messaging._drivers.common.RPCException: samples = sum([len(m) for __, __, m in queue]) if policy == 'queue': diff --git a/ceilometer/tests/alarm/test_rpc.py b/ceilometer/tests/alarm/test_rpc.py index d798c4e20..35b5f33de 100644 --- a/ceilometer/tests/alarm/test_rpc.py +++ b/ceilometer/tests/alarm/test_rpc.py @@ -19,30 +19,38 @@ import uuid from ceilometerclient.v2 import alarms -import mock +import eventlet from ceilometer.alarm import rpc as rpc_alarm from ceilometer import messaging -from ceilometer.openstack.common.fixture import mockpatch from ceilometer.openstack.common import test from ceilometer.openstack.common import timeutils from ceilometer.storage import models -class TestRPCAlarmNotifier(test.BaseTestCase): - def fake_cast(self, context, method, **args): - self.notified.append((method, args)) +class FakeNotifier(object): + def __init__(self): + self.rpc = messaging.get_rpc_server("alarm_notifier", self) + self.notified = [] + def start(self, expected_length): + self.expected_length = expected_length + self.rpc.start() + + def notify_alarm(self, context, data): + self.notified.append(data) + if len(self.notified) == self.expected_length: + self.rpc.stop() + + +class TestRPCAlarmNotifier(test.BaseTestCase): def setUp(self): super(TestRPCAlarmNotifier, self).setUp() messaging.setup('fake://') self.addCleanup(messaging.cleanup) - self.notified = [] + self.notifier_server = FakeNotifier() self.notifier = rpc_alarm.RPCAlarmNotifier() - self.useFixture(mockpatch.PatchObject( - self.notifier.client, 'cast', - side_effect=self.fake_cast)) self.alarms = [ alarms.Alarm(None, info={ 'name': 'instance_running_hot', @@ -83,29 +91,36 @@ class TestRPCAlarmNotifier(test.BaseTestCase): self.assertEqual('alarm_notifier', topic) def test_notify_alarm(self): + self.notifier_server.start(2) + previous = ['alarm', 'ok'] for i, a in enumerate(self.alarms): self.notifier.notify(a, previous[i], "what? %d" % i, {'fire': '%d' % i}) - self.assertEqual(2, len(self.notified)) + + self.notifier_server.rpc.wait() + + self.assertEqual(2, len(self.notifier_server.notified)) for i, a in enumerate(self.alarms): actions = getattr(a, models.Alarm.ALARM_ACTIONS_MAP[a.state]) - self.assertEqual('notify_alarm', self.notified[i][0]) self.assertEqual(self.alarms[i].alarm_id, - self.notified[i][1]["data"]["alarm_id"]) - self.assertEqual(actions, self.notified[i][1]["data"]["actions"]) + self.notifier_server.notified[i]["alarm_id"]) + self.assertEqual(actions, + self.notifier_server.notified[i]["actions"]) self.assertEqual(previous[i], - self.notified[i][1]["data"]["previous"]) + self.notifier_server.notified[i]["previous"]) self.assertEqual(self.alarms[i].state, - self.notified[i][1]["data"]["current"]) + self.notifier_server.notified[i]["current"]) self.assertEqual("what? %d" % i, - self.notified[i][1]["data"]["reason"]) + self.notifier_server.notified[i]["reason"]) self.assertEqual({'fire': '%d' % i}, - self.notified[i][1]["data"]["reason_data"]) + self.notifier_server.notified[i]["reason_data"]) def test_notify_non_string_reason(self): + self.notifier_server.start(1) self.notifier.notify(self.alarms[0], 'ok', 42, {}) - reason = self.notified[0][1]['data']['reason'] + self.notifier_server.rpc.wait() + reason = self.notifier_server.notified[0]['reason'] self.assertIsInstance(reason, basestring) def test_notify_no_actions(self): @@ -126,52 +141,99 @@ class TestRPCAlarmNotifier(test.BaseTestCase): 'my_instance'} }) self.notifier.notify(alarm, 'alarm', "what?", {}) - self.assertEqual(0, len(self.notified)) + self.assertEqual(0, len(self.notifier_server.notified)) + + +class FakeCoordinator(object): + def __init__(self): + self.rpc = messaging.get_rpc_server( + "alarm_partition_coordination", self) + self.notified = [] + + def presence(self, context, data): + self._record('presence', data) + + def allocate(self, context, data): + self._record('allocate', data) + + def assign(self, context, data): + self._record('assign', data) + + def _record(self, method, data): + self.notified.append((method, data)) + self.rpc.stop() class TestRPCAlarmPartitionCoordination(test.BaseTestCase): - def fake_fanout_cast(self, context, method, **args): - self.notified.append((method, args)) - - def fake_prepare(self, fanout): - self.assertTrue(fanout) - cctxt = mock.Mock() - cctxt.cast.side_effect = self.fake_fanout_cast - return cctxt - def setUp(self): super(TestRPCAlarmPartitionCoordination, self).setUp() messaging.setup('fake://') self.addCleanup(messaging.cleanup) - self.notified = [] + self.coordinator_server = FakeCoordinator() + self.coordinator_server.rpc.start() + eventlet.sleep() # must be sure that fanout queue is created + self.ordination = rpc_alarm.RPCAlarmPartitionCoordination() - self.useFixture(mockpatch.PatchObject( - self.ordination.client, 'prepare', - side_effect=self.fake_prepare)) - self.alarms = [mock.MagicMock(), mock.MagicMock()] + self.alarms = [ + alarms.Alarm(None, info={ + 'name': 'instance_running_hot', + 'meter_name': 'cpu_util', + 'comparison_operator': 'gt', + 'threshold': 80.0, + 'evaluation_periods': 5, + 'statistic': 'avg', + 'state': 'ok', + 'ok_actions': ['http://host:8080/path'], + 'user_id': 'foobar', + 'project_id': 'snafu', + 'period': 60, + 'alarm_id': str(uuid.uuid4()), + 'matching_metadata':{'resource_id': + 'my_instance'} + }), + alarms.Alarm(None, info={ + 'name': 'group_running_idle', + 'meter_name': 'cpu_util', + 'comparison_operator': 'le', + 'threshold': 10.0, + 'statistic': 'max', + 'evaluation_periods': 4, + 'state': 'insufficient data', + 'insufficient_data_actions': ['http://other_host/path'], + 'user_id': 'foobar', + 'project_id': 'snafu', + 'period': 300, + 'alarm_id': str(uuid.uuid4()), + 'matching_metadata':{'metadata.user_metadata.AS': + 'my_group'} + }), + ] def test_ordination_presence(self): - id = uuid.uuid4() + id = str(uuid.uuid4()) priority = float(timeutils.utcnow().strftime('%s.%f')) self.ordination.presence(id, priority) - method, args = self.notified[0] - self.assertEqual(id, args['data']['uuid']) - self.assertEqual(priority, args['data']['priority']) + self.coordinator_server.rpc.wait() + method, args = self.coordinator_server.notified[0] + self.assertEqual(id, args['uuid']) + self.assertEqual(priority, args['priority']) self.assertEqual('presence', method) def test_ordination_assign(self): - id = uuid.uuid4() + id = str(uuid.uuid4()) self.ordination.assign(id, self.alarms) - method, args = self.notified[0] - self.assertEqual(id, args['data']['uuid']) - self.assertEqual(2, len(args['data']['alarms'])) + self.coordinator_server.rpc.wait() + method, args = self.coordinator_server.notified[0] + self.assertEqual(id, args['uuid']) + self.assertEqual(2, len(args['alarms'])) self.assertEqual('assign', method) def test_ordination_allocate(self): - id = uuid.uuid4() + id = str(uuid.uuid4()) self.ordination.allocate(id, self.alarms) - method, args = self.notified[0] - self.assertEqual(id, args['data']['uuid']) - self.assertEqual(2, len(args['data']['alarms'])) + self.coordinator_server.rpc.wait() + method, args = self.coordinator_server.notified[0] + self.assertEqual(id, args['uuid']) + self.assertEqual(2, len(args['alarms'])) self.assertEqual('allocate', method) diff --git a/ceilometer/tests/api/__init__.py b/ceilometer/tests/api/__init__.py index a6b53d0f9..69fbb2b99 100644 --- a/ceilometer/tests/api/__init__.py +++ b/ceilometer/tests/api/__init__.py @@ -19,6 +19,7 @@ """ from oslo.config import cfg +import oslo.messaging.conffixture import pecan import pecan.testing @@ -38,9 +39,11 @@ class FunctionalTest(db_test_base.TestBase): PATH_PREFIX = '' def setUp(self): + super(FunctionalTest, self).setUp() + self.useFixture(oslo.messaging.conffixture.ConfFixture(self.CONF)) + self.CONF.set_override("notification_driver", "messaging") messaging.setup("fake://") self.addCleanup(messaging.cleanup) - super(FunctionalTest, self).setUp() self.CONF.set_override("auth_version", "v2.0", group=OPT_GROUP_NAME) self.CONF.set_override("policy_file", diff --git a/ceilometer/tests/api/v2/test_alarm_scenarios.py b/ceilometer/tests/api/v2/test_alarm_scenarios.py index b2441e29c..aa3481edf 100644 --- a/ceilometer/tests/api/v2/test_alarm_scenarios.py +++ b/ceilometer/tests/api/v2/test_alarm_scenarios.py @@ -26,7 +26,7 @@ import logging import uuid import mock - +import oslo.messaging.conffixture from six import moves from ceilometer import messaging @@ -1844,21 +1844,40 @@ class TestAlarms(FunctionalTest, } } - with mock.patch.object(messaging, 'get_notifier') as get_notifier: - notifier = get_notifier.return_value + endpoint = mock.MagicMock() + target = oslo.messaging.Target(topic="notifications", + exchange="ceilometer") + listener = messaging.get_notification_listener([target], + [endpoint]) + listener.start() + endpoint.info.side_effect = lambda *args: listener.stop() + self.post_json('/alarms', params=json, headers=self.auth_headers) + listener.wait() - self.post_json('/alarms', params=json, headers=self.auth_headers) - get_notifier.assert_called_once_with(publisher_id='ceilometer.api') + class PayloadMatcher(object): + def __eq__(self, payload): + return payload['detail']['name'] == 'sent_notification' and \ + payload['type'] == 'creation' and \ + payload['detail']['rule']['meter_name'] == 'ameter' and \ + set(['alarm_id', 'detail', 'event_id', 'on_behalf_of', + 'project_id', 'timestamp', + 'user_id']).issubset(payload.keys()) - calls = notifier.info.call_args_list - self.assertEqual(1, len(calls)) - args, _ = calls[0] - context, event_type, payload = args - self.assertEqual('alarm.creation', event_type) - self.assertEqual('sent_notification', payload['detail']['name']) - self.assertTrue(set(['alarm_id', 'detail', 'event_id', 'on_behalf_of', - 'project_id', 'timestamp', 'type', - 'user_id']).issubset(payload.keys())) + endpoint.info.assert_called_once_with( + {'instance_uuid': None, + 'domain': None, + 'project_domain': None, + 'auth_token': None, + 'is_admin': False, + 'user': None, + 'tenant': None, + 'read_only': False, + 'show_deleted': False, + 'user_identity': '- - - - -', + 'request_id': mock.ANY, + 'user_domain': None}, + 'ceilometer.api', 'alarm.creation', + PayloadMatcher(), mock.ANY) def test_alarm_sends_notification(self): # Hit the AlarmController (with alarm_id supplied) ... diff --git a/ceilometer/tests/publisher/test_rpc_publisher.py b/ceilometer/tests/publisher/test_rpc_publisher.py index f45e9f141..6afc239f0 100644 --- a/ceilometer/tests/publisher/test_rpc_publisher.py +++ b/ceilometer/tests/publisher/test_rpc_publisher.py @@ -26,6 +26,7 @@ import oslo.messaging import oslo.messaging._drivers.common from ceilometer import messaging +from ceilometer.openstack.common import context from ceilometer.openstack.common.fixture import config from ceilometer.openstack.common import network_utils from ceilometer.openstack.common import test @@ -95,25 +96,36 @@ class TestPublish(test.BaseTestCase): def setUp(self): super(TestPublish, self).setUp() self.CONF = self.useFixture(config.Config()).conf - messaging.setup('fake://') self.addCleanup(messaging.cleanup) - self.published = [] - def test_published(self): + def test_published_no_mock(self): publisher = rpc.RPCPublisher( network_utils.urlsplit('rpc://')) - cast_context = mock.MagicMock() - with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: - prepare.return_value = cast_context - publisher.publish_samples(mock.MagicMock(), - self.test_data) - prepare.assert_called_once_with( - topic=self.CONF.publisher_rpc.metering_topic) - cast_context.cast.assert_called_once_with( - mock.ANY, 'record_metering_data', data=mock.ANY) + endpoint = mock.MagicMock(['record_metering_data']) + collector = messaging.get_rpc_server( + self.CONF.publisher_rpc.metering_topic, endpoint) + endpoint.record_metering_data.side_effect = \ + lambda *args, **kwds: collector.stop() + + collector.start() + eventlet.sleep() + publisher.publish_samples(context.RequestContext(), + self.test_data) + collector.wait() + + class Matcher(object): + @staticmethod + def __eq__(data): + for i, sample in enumerate(data): + if sample['counter_name'] != self.test_data[i].name: + return False + return True + + endpoint.record_metering_data.assert_called_once_with( + mock.ANY, data=Matcher()) def test_publish_target(self): publisher = rpc.RPCPublisher( diff --git a/ceilometer/tests/test_collector.py b/ceilometer/tests/test_collector.py index 34ee106f1..436c9b008 100644 --- a/ceilometer/tests/test_collector.py +++ b/ceilometer/tests/test_collector.py @@ -25,7 +25,9 @@ from stevedore import extension from ceilometer import collector from ceilometer import messaging +from ceilometer.openstack.common import context from ceilometer.openstack.common.fixture import config +from ceilometer.openstack.common import timeutils from ceilometer.publisher import utils from ceilometer import sample from ceilometer.tests import base as tests_base @@ -41,10 +43,11 @@ class TestCollector(tests_base.BaseTestCase): super(TestCollector, self).setUp() messaging.setup('fake://') self.addCleanup(messaging.cleanup) + self.CONF = self.useFixture(config.Config()).conf self.CONF.set_override("connection", "log://", group='database') - self.srv = collector.CollectorService() - self.CONF.publisher.metering_secret = 'not-so-secret' + self.CONF.set_override('metering_secret', 'not-so-secret', + group='publisher') self.counter = sample.Sample( name='foobar', type='bad', @@ -53,7 +56,7 @@ class TestCollector(tests_base.BaseTestCase): user_id='jd', project_id='ceilometer', resource_id='cat', - timestamp='NOW!', + timestamp=timeutils.utcnow().isoformat(), resource_metadata={}, ).as_dict() @@ -66,12 +69,14 @@ class TestCollector(tests_base.BaseTestCase): user_id=u'test', project_id=u'test', resource_id=u'test_run_tasks', - timestamp=u'NOW!', + timestamp=timeutils.utcnow().isoformat(), resource_metadata={u'name': [([u'TestPublish'])]}, source=u'testsource', ), 'not-so-secret') + self.srv = collector.CollectorService() + def _make_test_manager(self, plugin): return extension.ExtensionManager.make_test_instance([ extension.Extension( @@ -202,3 +207,18 @@ class TestCollector(tests_base.BaseTestCase): self.assertTrue(utils.verify_signature( mock_dispatcher.method_calls[0][1][0], "not-so-secret")) + + @mock.patch('ceilometer.storage.impl_log.LOG') + def test_collector_no_mock(self, mylog): + self.CONF.set_override('udp_address', '', group='collector') + self.srv.start() + mylog.info.side_effect = lambda *args: self.srv.stop() + + client = messaging.get_rpc_client(version='1.0') + cclient = client.prepare(topic='metering') + cclient.cast(context.RequestContext(), + 'record_metering_data', data=[self.utf8_msg]) + + self.srv.rpc_server.wait() + mylog.info.assert_called_once_with( + 'metering data test for test_run_tasks: 1') diff --git a/ceilometer/tests/test_notification.py b/ceilometer/tests/test_notification.py index b62437149..4d7d8a88a 100644 --- a/ceilometer/tests/test_notification.py +++ b/ceilometer/tests/test_notification.py @@ -20,11 +20,15 @@ import mock import oslo.messaging +import oslo.messaging.conffixture from stevedore import extension +import yaml from ceilometer.compute.notifications import instance from ceilometer import messaging from ceilometer import notification +from ceilometer.openstack.common import context +from ceilometer.openstack.common import fileutils from ceilometer.openstack.common.fixture import config from ceilometer.tests import base as tests_base @@ -158,3 +162,57 @@ class TestNotification(tests_base.BaseTestCase): self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints)) event_endpoint = self.srv.listeners[0].dispatcher.endpoints[0] self.assertEqual(1, len(list(event_endpoint.dispatcher_manager))) + + +class TestRealNotification(tests_base.BaseTestCase): + def setUp(self): + super(TestRealNotification, self).setUp() + self.CONF = self.useFixture(config.Config()).conf + self.useFixture(oslo.messaging.conffixture.ConfFixture(self.CONF)) + + pipeline = yaml.dump([{ + 'name': 'test_pipeline', + 'interval': 5, + 'counters': ['*'], + 'transformers': [], + 'publishers': ['test://'], + }]) + pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline, + prefix="pipeline", + suffix="yaml") + self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file) + self.CONF.set_override("notification_driver", "messaging") + self.CONF.set_override("control_exchange", "nova") + messaging.setup('fake://') + self.addCleanup(messaging.cleanup) + + self.srv = notification.NotificationService() + + @mock.patch('ceilometer.publisher.test.TestPublisher') + def test_notification_service(self, fake_publisher_cls): + self.srv.start() + + fake_publisher = fake_publisher_cls.return_value + fake_publisher.publish_samples.side_effect = \ + lambda *args: self.srv.stop() + + notifier = messaging.get_notifier("compute.vagrant-precise") + notifier.info(context.RequestContext(), 'compute.instance.create.end', + TEST_NOTICE_PAYLOAD) + + self.srv.listeners[0].wait() + + class SamplesMatcher(object): + def __eq__(self, samples): + for s in samples: + if s.resource_id != "9f9d01b9-4a58-4271-9e27-398b21ab20d1": + return False + return True + + fake_publisher.publish_samples.assert_has_calls([ + mock.call(mock.ANY, SamplesMatcher()), + mock.call(mock.ANY, SamplesMatcher()), + mock.call(mock.ANY, SamplesMatcher()), + mock.call(mock.ANY, SamplesMatcher()), + mock.call(mock.ANY, SamplesMatcher()), + ])