# Copyright (c) 2013 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. from marconi import storage from marconi.storage import exceptions from marconi.tests import util as testing class ControllerBaseTest(testing.TestBase): project = "project" driver_class = None controller_class = None controller_base_class = None def setUp(self): super(ControllerBaseTest, self).setUp() if not self.driver_class: self.skipTest("No driver class specified") if not issubclass(self.controller_class, self.controller_base_class): self.skipTest("%s is not an instance of %s. Tests not supported" % (self.controller_class, self.controller_base_class)) self.driver = self.driver_class() self.controller = self.controller_class(self.driver) class QueueControllerTest(ControllerBaseTest): """Queue Controller base tests.""" controller_base_class = storage.QueueBase def setUp(self): super(QueueControllerTest, self).setUp() self.message_controller = self.driver.message_controller self.claim_controller = self.driver.claim_controller def test_list(self): num = 15 for queue in xrange(num): self.controller.upsert(queue, {}, project=self.project) interaction = self.controller.list(project=self.project, detailed=True) queues = list(interaction.next()) self.assertEquals(all(map(lambda queue: 'name' in queue and 'metadata' in queue, queues)), True) self.assertEquals(len(queues), 10) interaction = self.controller.list(project=self.project, marker=interaction.next()) queues = list(interaction.next()) self.assertEquals(all(map(lambda queue: 'name' in queue and 'metadata' not in queue, queues)), True) self.assertEquals(len(queues), 5) def test_queue_lifecycle(self): # Test Queue Creation created = self.controller.upsert("test", project=self.project, metadata=dict(topic="test_queue")) self.assertTrue(created) # Test Queue retrieval queue = self.controller.get("test", project=self.project) self.assertIsNotNone(queue) # Test Queue Update created = self.controller.upsert("test", project=self.project, metadata=dict(meta="test_meta")) self.assertFalse(created) queue = self.controller.get("test", project=self.project) self.assertEqual(queue["meta"], "test_meta") # Test Queue Statistic _insert_fixtures(self.message_controller, "test", project=self.project, client_uuid="my_uuid", num=12) countof = self.controller.stats("test", project=self.project) self.assertEqual(countof['messages']['free'], 12) # Test Queue Deletion self.controller.delete("test", project=self.project) # Test DoesNotExist Exception self.assertRaises(storage.exceptions.DoesNotExist, self.controller.get, "test", project=self.project) class MessageControllerTest(ControllerBaseTest): """Message Controller base tests. NOTE(flaper87): Implementations of this class should override the tearDown method in order to clean up storage's state. """ queue_name = "test_queue" controller_base_class = storage.MessageBase def setUp(self): super(MessageControllerTest, self).setUp() # Lets create a queue self.queue_controller = self.driver.queue_controller self.claim_controller = self.driver.claim_controller self.queue_controller.upsert(self.queue_name, {}, project=self.project) def tearDown(self): self.queue_controller.delete(self.queue_name, project=self.project) super(MessageControllerTest, self).tearDown() def test_message_lifecycle(self): queue_name = self.queue_name messages = [ { "ttl": 60, "body": { "event": "BackupStarted", "backupId": "c378813c-3f0b-11e2-ad92-7823d2b0f3ce" } }, ] # Test Message Creation created = list(self.controller.post(queue_name, messages, project=self.project, client_uuid="unused")) self.assertEqual(len(created), 1) # Test Message Get self.controller.get(queue_name, created[0], project=self.project) # Test Message Deletion self.controller.delete(queue_name, created[0], project=self.project) # Test DoesNotExist self.assertRaises(storage.exceptions.DoesNotExist, self.controller.get, queue_name, message_id=created[0], project=self.project) def test_get_multi(self): _insert_fixtures(self.controller, self.queue_name, project=self.project, client_uuid="my_uuid", num=15) def load_messages(expected, *args, **kwargs): interaction = self.controller.list(*args, **kwargs) msgs = list(interaction.next()) self.assertEqual(len(msgs), expected) return interaction # Test all messages, echo False and uuid load_messages(0, self.queue_name, project=self.project, client_uuid="my_uuid") # Test all messages and limit load_messages(15, self.queue_name, project=self.project, limit=20, echo=True) # Test all messages, echo True, and uuid interaction = load_messages(10, self.queue_name, echo=True, project=self.project, client_uuid="my_uuid") # Test all messages, echo True, uuid and marker load_messages(5, self.queue_name, echo=True, project=self.project, marker=interaction.next(), client_uuid="my_uuid") def test_claim_effects(self): _insert_fixtures(self.controller, self.queue_name, project=self.project, client_uuid="my_uuid", num=12) meta = {"ttl": 70} another_cid, _ = self.claim_controller.create(self.queue_name, meta, project=self.project) cid, msgs = self.claim_controller.create(self.queue_name, meta, project=self.project) [msg1, msg2] = msgs # A wrong claim does not ensure the message deletion with testing.expect(storage.exceptions.NotPermitted): self.controller.delete(self.queue_name, msg1["id"], project=self.project, claim=another_cid) # Make sure a message can be deleted with a claim self.controller.delete(self.queue_name, msg1["id"], project=self.project, claim=cid) with testing.expect(storage.exceptions.DoesNotExist): self.controller.get(self.queue_name, msg1["id"], project=self.project) # Make sure such a deletion is idempotent self.controller.delete(self.queue_name, msg1["id"], project=self.project, claim=cid) # A non-existing claim does not ensure the message deletion self.claim_controller.delete(self.queue_name, cid, project=self.project) with testing.expect(storage.exceptions.NotPermitted): self.controller.delete(self.queue_name, msg2["id"], project=self.project, claim=cid) def test_expired_message(self): messages = [{'body': 3.14, 'ttl': 0}] [msgid] = self.controller.post(self.queue_name, messages, project=self.project, client_uuid='my_uuid') with testing.expect(storage.exceptions.DoesNotExist): self.controller.get(self.queue_name, msgid, project=self.project) countof = self.queue_controller.stats(self.queue_name, project=self.project) self.assertEquals(countof['messages']['free'], 0) def test_bad_id(self): # A malformed ID should result in an error. This # doesn't hurt anything, since an attacker could just # read the source code anyway to find out how IDs are # implemented. Plus, if someone is just trying to # get a message that they don't own, they would # more likely just list the messages, not try to # guess an ID of an arbitrary message. queue = 'foo' project = '480924' self.queue_controller.upsert(queue, {}, project) bad_message_id = 'xyz' with testing.expect(exceptions.MalformedID): self.controller.delete(queue, bad_message_id, project) with testing.expect(exceptions.MalformedID): self.controller.get(queue, bad_message_id, project) def test_bad_claim_id(self): self.queue_controller.upsert('unused', {}, '480924') [msgid] = self.controller.post('unused', [{'body': {}, 'ttl': 10}], project='480924', client_uuid='unused') bad_claim_id = '; DROP TABLE queues' with testing.expect(exceptions.MalformedID): self.controller.delete('unused', msgid, project='480924', claim=bad_claim_id) def test_bad_marker(self): queue = 'foo' project = '480924' self.queue_controller.upsert(queue, {}, project) bad_marker = 'xyz' func = self.controller.list results = func(queue, project, marker=bad_marker) self.assertRaises(exceptions.MalformedMarker, results.next) class ClaimControllerTest(ControllerBaseTest): """Claim Controller base tests. NOTE(flaper87): Implementations of this class should override the tearDown method in order to clean up storage's state. """ queue_name = "test_queue" controller_base_class = storage.ClaimBase def setUp(self): super(ClaimControllerTest, self).setUp() # Lets create a queue self.queue_controller = self.driver.queue_controller self.message_controller = self.driver.message_controller self.queue_controller.upsert(self.queue_name, {}, project=self.project) def tearDown(self): self.queue_controller.delete(self.queue_name, project=self.project) super(ClaimControllerTest, self).tearDown() def test_claim_lifecycle(self): _insert_fixtures(self.message_controller, self.queue_name, project=self.project, client_uuid="my_uuid", num=20) meta = {"ttl": 70} # Make sure create works claim_id, messages = self.controller.create(self.queue_name, meta, project=self.project, limit=15) messages = list(messages) self.assertEquals(len(messages), 15) # Ensure Queue stats countof = self.queue_controller.stats(self.queue_name, project=self.project) self.assertEqual(countof['messages']['claimed'], 15) self.assertEqual(countof['messages']['free'], 5) # Make sure get works claim, messages2 = self.controller.get(self.queue_name, claim_id, project=self.project) messages2 = list(messages2) self.assertEquals(len(messages2), 15) self.assertEquals(messages, messages2) self.assertEquals(claim["ttl"], 70) self.assertEquals(claim["id"], claim_id) new_meta = {"ttl": 100} self.controller.update(self.queue_name, claim_id, new_meta, project=self.project) # Make sure update works claim, messages2 = self.controller.get(self.queue_name, claim_id, project=self.project) messages2 = list(messages2) self.assertEquals(len(messages2), 15) #TODO(zyuan): Add some tests to ensure the ttl is extended/not-extended for msg1, msg2 in zip(messages, messages2): self.assertEquals(msg1['body'], msg2['body']) self.assertEquals(claim["ttl"], 100) self.assertEquals(claim["id"], claim_id) # Make sure delete works self.controller.delete(self.queue_name, claim_id, project=self.project) self.assertRaises(storage.exceptions.ClaimDoesNotExist, self.controller.get, self.queue_name, claim_id, project=self.project) def test_expired_claim(self): meta = {"ttl": 0} claim_id, messages = self.controller.create(self.queue_name, meta, project=self.project) with testing.expect(storage.exceptions.DoesNotExist): self.controller.get(self.queue_name, claim_id, project=self.project) with testing.expect(storage.exceptions.DoesNotExist): self.controller.update(self.queue_name, claim_id, meta, project=self.project) def test_illformed_id(self): # any ill-formed IDs should be regarded as non-existing ones. self.queue_controller.upsert('unused', {}, '480924') self.controller.delete('unused', 'illformed', '480924') with testing.expect(exceptions.DoesNotExist): self.controller.update('unused', 'illformed', {'ttl': 40}, '480924') def _insert_fixtures(controller, queue_name, project=None, client_uuid=None, num=4): def messages(): for n in xrange(num): yield { "ttl": 120, "body": { "event": "Event number %s" % n }} controller.post(queue_name, messages(), project=project, client_uuid=client_uuid)