From 11e8b943620d5244b8a20b300d3df6e66aafeab3 Mon Sep 17 00:00:00 2001 From: Alejandro Cabrera Date: Wed, 24 Jul 2013 17:18:12 -0400 Subject: [PATCH] Implements new metadata endpoint This patchset removes the ability to specify metadata when creating a queue as per the Marconi Weekly Meeting held on July 18th, 2013. It also adds a new endpoint: /v1/queues/{name}/metadata with operations - GET - PUT Removes: - GET /v1/queues/{queue_name} # replaced by metadata ^^ - the request body from PUT /v1/queues/{queue_name} Rationale: The addition of the metadata endpoint increases the extensibility of the API, and decouples metadata updates from queue creation. This makes it easier for us in the future to add other endpoints, say /v1/queues/{name}/config for modifying special values that change the behavior of the queue. With that addition, creating a queue with metadata became slightly more dangerous. In the case where a user accidentally tries to create a queue that already exists, the metadata for the existing queue would be overwritten by the PUT request body. By removing the ability to modify metadata at queue creation time, it also prevents these types of accidents. Tests: New unit tests added to capture expected behavior. Old tests were also updated to reflect new endpoint and behavior. Storage API changes: - storage(queue): get -> get_metadata Change-Id: Ie3a79a63a865035a789609dac770adabe4dc6ed7 Implements: blueprint metadata-resource --- marconi/storage/base.py | 4 +- marconi/storage/mongodb/queues.py | 2 +- marconi/storage/sqlite/queues.py | 2 +- marconi/tests/storage/base.py | 8 +- .../tests/transport/wsgi/test_media_type.py | 2 +- .../transport/wsgi/test_queue_lifecycle.py | 104 +++++++++++------- marconi/tests/util/faulty_storage.py | 2 +- marconi/transport/wsgi/driver.py | 6 + marconi/transport/wsgi/metadata.py | 95 ++++++++++++++++ marconi/transport/wsgi/queues.py | 48 -------- 10 files changed, 178 insertions(+), 95 deletions(-) create mode 100644 marconi/transport/wsgi/metadata.py diff --git a/marconi/storage/base.py b/marconi/storage/base.py index 836cb02e2..76b26a2ba 100644 --- a/marconi/storage/base.py +++ b/marconi/storage/base.py @@ -100,8 +100,8 @@ class QueueBase(ControllerBase): raise NotImplementedError @abc.abstractmethod - def get(self, name, project=None): - """Base method for queue retrieval. + def get_metadata(self, name, project=None): + """Base method for queue metadata retrieval. :param name: The queue name :param project: Project id diff --git a/marconi/storage/mongodb/queues.py b/marconi/storage/mongodb/queues.py index 69622f709..b4bf9f226 100644 --- a/marconi/storage/mongodb/queues.py +++ b/marconi/storage/mongodb/queues.py @@ -108,7 +108,7 @@ class QueueController(storage.QueueBase): yield marker_name and marker_name['next'] @utils.raises_conn_error - def get(self, name, project=None): + def get_metadata(self, name, project=None): queue = self._get(name, project) return queue.get('m', {}) diff --git a/marconi/storage/sqlite/queues.py b/marconi/storage/sqlite/queues.py index 8ae2bc7a6..01b5d7e47 100644 --- a/marconi/storage/sqlite/queues.py +++ b/marconi/storage/sqlite/queues.py @@ -71,7 +71,7 @@ class QueueController(base.QueueBase): yield it() yield marker_name['next'] - def get(self, name, project): + def get_metadata(self, name, project): if project is None: project = '' diff --git a/marconi/tests/storage/base.py b/marconi/tests/storage/base.py index 20bec25ef..5253b7de7 100644 --- a/marconi/tests/storage/base.py +++ b/marconi/tests/storage/base.py @@ -76,21 +76,21 @@ class QueueControllerTest(ControllerBaseTest): self.assertTrue(created) # Test Queue retrieval - metadata = self.controller.get('test', project=self.project) + metadata = self.controller.get_metadata('test', project=self.project) self.assertEqual(metadata, {}) # Test Queue Update created = self.controller.set_metadata('test', project=self.project, metadata=dict(meta='test_meta')) - metadata = self.controller.get('test', project=self.project) + metadata = self.controller.get_metadata('test', project=self.project) self.assertEqual(metadata['meta'], 'test_meta') # Touching an existing queue does not affect metadata created = self.controller.create('test', project=self.project) self.assertFalse(created) - metadata = self.controller.get('test', project=self.project) + metadata = self.controller.get_metadata('test', project=self.project) self.assertEqual(metadata['meta'], 'test_meta') # Test Queue Statistic @@ -105,7 +105,7 @@ class QueueControllerTest(ControllerBaseTest): # Test DoesNotExist Exception with testing.expect(storage.exceptions.DoesNotExist): - self.controller.get('test', project=self.project) + self.controller.get_metadata('test', project=self.project) with testing.expect(storage.exceptions.DoesNotExist): self.controller.set_metadata('test', '{}', project=self.project) diff --git a/marconi/tests/transport/wsgi/test_media_type.py b/marconi/tests/transport/wsgi/test_media_type.py index ddb714c55..418e44b15 100644 --- a/marconi/tests/transport/wsgi/test_media_type.py +++ b/marconi/tests/transport/wsgi/test_media_type.py @@ -29,7 +29,7 @@ class TestWSGIMediaType(base.TestBase): endpoints = [ ('GET', '/v1/queues'), - ('GET', '/v1/queues/nonexistent'), + ('GET', '/v1/queues/nonexistent/metadata'), ('GET', '/v1/queues/nonexistent/stats'), ('POST', '/v1/queues/nonexistent/messages'), ('GET', '/v1/queues/nonexistent/messages/deadbeaf'), diff --git a/marconi/tests/transport/wsgi/test_queue_lifecycle.py b/marconi/tests/transport/wsgi/test_queue_lifecycle.py index e932ea8bd..cda17ee66 100644 --- a/marconi/tests/transport/wsgi/test_queue_lifecycle.py +++ b/marconi/tests/transport/wsgi/test_queue_lifecycle.py @@ -29,23 +29,36 @@ class QueueLifecycleBaseTest(base.TestBase): config_filename = None - def test_simple(self): + def test_basics_thoroughly(self): path = '/v1/queues/gumshoe' for project_id in ('480924', 'foo', '', None): - # Stats + # Stats not found - queue not created yet self.simulate_get(path + '/stats', project_id) self.assertEquals(self.srmock.status, falcon.HTTP_404) + # Metadata not found - queue not created yet + self.simulate_get(path + '/metadata', project_id) + self.assertEquals(self.srmock.status, falcon.HTTP_404) + # Create - doc = '{"messages": {"ttl": 600}}' - self.simulate_put(path, project_id, body=doc) + self.simulate_put(path, project_id) self.assertEquals(self.srmock.status, falcon.HTTP_201) location = ('Location', '/v1/queues/gumshoe') self.assertIn(location, self.srmock.headers) - result = self.simulate_get(path, project_id) + # Get on queues shouldn't work any more + self.simulate_get(path, project_id) + self.assertEquals(self.srmock.status, falcon.HTTP_405) + + # Add metadata + doc = '{"messages": {"ttl": 600}}' + self.simulate_put(path + '/metadata', project_id, body=doc) + self.assertEquals(self.srmock.status, falcon.HTTP_204) + + # Fetch metadata + result = self.simulate_get(path + '/metadata', project_id) result_doc = json.loads(result[0]) self.assertEquals(self.srmock.status, falcon.HTTP_200) self.assertEquals(result_doc, json.loads(doc)) @@ -54,70 +67,87 @@ class QueueLifecycleBaseTest(base.TestBase): self.simulate_delete(path, project_id) self.assertEquals(self.srmock.status, falcon.HTTP_204) - # Get non-existing - self.simulate_get(path, project_id) + # Get non-existent stats + self.simulate_get(path + '/stats', project_id) + self.assertEquals(self.srmock.status, falcon.HTTP_404) + + # Get non-existent metadata + self.simulate_get(path + '/metadata', project_id) self.assertEquals(self.srmock.status, falcon.HTTP_404) def test_no_metadata(self): self.simulate_put('/v1/queues/fizbat') - self.assertEquals(self.srmock.status, falcon.HTTP_400) + self.assertEquals(self.srmock.status, falcon.HTTP_201) def test_bad_metadata(self): + self.simulate_put('/v1/queues/fizbat', '7e55e1a7e') + self.assertEquals(self.srmock.status, falcon.HTTP_201) for document in ('{', '[]', '.', ' ', ''): - self.simulate_put('/v1/queues/fizbat', '7e55e1a7e', + self.simulate_put('/v1/queues/fizbat/metadata', '7e55e1a7e', body=document) self.assertEquals(self.srmock.status, falcon.HTTP_400) def test_too_much_metadata(self): + self.simulate_put('/v1/queues/fizbat', '7e55e1a7e') + self.assertEquals(self.srmock.status, falcon.HTTP_201) doc = '{"messages": {"ttl": 600}, "padding": "%s"}' padding_len = transport.MAX_QUEUE_METADATA_SIZE - (len(doc) - 2) + 1 doc = doc % ('x' * padding_len) - self.simulate_put('/v1/queues/fizbat', 'deadbeef', body=doc) + self.simulate_put('/v1/queues/fizbat/metadata', '7e55e1a7e', body=doc) self.assertEquals(self.srmock.status, falcon.HTTP_400) def test_way_too_much_metadata(self): + self.simulate_put('/v1/queues/fizbat', '7e55e1a7e') + self.assertEquals(self.srmock.status, falcon.HTTP_201) doc = '{"messages": {"ttl": 600}, "padding": "%s"}' padding_len = transport.MAX_QUEUE_METADATA_SIZE * 100 doc = doc % ('x' * padding_len) - self.simulate_put('/v1/queues/fizbat', 'deadbeef', body=doc) + self.simulate_put('/v1/queues/fizbat/metadata', '7e55e1a7e', body=doc) self.assertEquals(self.srmock.status, falcon.HTTP_400) def test_custom_metadata(self): + self.simulate_put('/v1/queues/fizbat', '480924') + self.assertEquals(self.srmock.status, falcon.HTTP_201) + # Set doc = '{"messages": {"ttl": 600}, "padding": "%s"}' padding_len = transport.MAX_QUEUE_METADATA_SIZE - (len(doc) - 2) doc = doc % ('x' * padding_len) - self.simulate_put('/v1/queues/fizbat', '480924', body=doc) - self.assertEquals(self.srmock.status, falcon.HTTP_201) - - # Get - result = self.simulate_get('/v1/queues/fizbat', '480924') - result_doc = json.loads(result[0]) - self.assertEquals(result_doc, json.loads(doc)) - - def test_update_metadata(self): - path = '/v1/queues/xyz' - project_id = '480924' - - # Create - doc1 = '{"messages": {"ttl": 600}}' - self.simulate_put(path, project_id, body=doc1) - self.assertEquals(self.srmock.status, falcon.HTTP_201) - - # Update - doc2 = '{"messages": {"ttl": 100}}' - self.simulate_put(path, project_id, body=doc2) + self.simulate_put('/v1/queues/fizbat/metadata', '480924', body=doc) self.assertEquals(self.srmock.status, falcon.HTTP_204) # Get - result = self.simulate_get(path, project_id) + result = self.simulate_get('/v1/queues/fizbat/metadata', '480924') + result_doc = json.loads(result[0]) + self.assertEquals(result_doc, json.loads(doc)) + self.assertEquals(self.srmock.status, falcon.HTTP_200) + + def test_update_metadata(self): + # Create + path = '/v1/queues/xyz' + project_id = '480924' + self.simulate_put(path, project_id) + self.assertEquals(self.srmock.status, falcon.HTTP_201) + + # Set meta + doc1 = '{"messages": {"ttl": 600}}' + self.simulate_put(path + '/metadata', project_id, body=doc1) + self.assertEquals(self.srmock.status, falcon.HTTP_204) + + # Update + doc2 = '{"messages": {"ttl": 100}}' + self.simulate_put(path + '/metadata', project_id, body=doc2) + self.assertEquals(self.srmock.status, falcon.HTTP_204) + + # Get + result = self.simulate_get(path + '/metadata', project_id) result_doc = json.loads(result[0]) self.assertEquals(result_doc, json.loads(doc2)) self.assertEquals(self.srmock.headers_dict['Content-Location'], - path) + path + '/metadata') def test_list(self): project_id = '644079696574693' @@ -144,10 +174,10 @@ class QueueLifecycleBaseTest(base.TestBase): '/v1/queues?limit=2') for queue in result_doc['queues']: - self.simulate_get(queue['href'], project_id) + self.simulate_get(queue['href'] + '/metadata', project_id) self.assertEquals(self.srmock.status, falcon.HTTP_200) - self.simulate_get(queue['href'], alt_project_id) + self.simulate_get(queue['href'] + '/metadata', alt_project_id) self.assertEquals(self.srmock.status, falcon.HTTP_404) self.assertNotIn('metadata', queue) @@ -161,7 +191,7 @@ class QueueLifecycleBaseTest(base.TestBase): [target, params] = result_doc['links'][0]['href'].split('?') [queue] = result_doc['queues'] - result = self.simulate_get(queue['href'], project_id) + result = self.simulate_get(queue['href'] + '/metadata', project_id) result_doc = json.loads(result[0]) self.assertEquals(result_doc, queue['metadata']) @@ -205,7 +235,7 @@ class QueueFaultyDriverTests(base.TestBaseFaulty): location = ('Location', path) self.assertNotIn(location, self.srmock.headers) - result = self.simulate_get(path, '480924') + result = self.simulate_get(path + '/metadata', '480924') result_doc = json.loads(result[0]) self.assertEquals(self.srmock.status, falcon.HTTP_503) self.assertNotEquals(result_doc, json.loads(doc)) diff --git a/marconi/tests/util/faulty_storage.py b/marconi/tests/util/faulty_storage.py index ea7043fba..f5a96d0ac 100644 --- a/marconi/tests/util/faulty_storage.py +++ b/marconi/tests/util/faulty_storage.py @@ -38,7 +38,7 @@ class QueueController(storage.QueueBase): def list(self, project=None): raise NotImplementedError() - def get(self, name, project=None): + def get_metadata(self, name, project=None): raise NotImplementedError() def create(self, name, project=None): diff --git a/marconi/transport/wsgi/driver.py b/marconi/transport/wsgi/driver.py index 4f9a2e635..b0f439530 100644 --- a/marconi/transport/wsgi/driver.py +++ b/marconi/transport/wsgi/driver.py @@ -23,6 +23,7 @@ from marconi.transport import auth from marconi.transport.wsgi import claims from marconi.transport.wsgi import health from marconi.transport.wsgi import messages +from marconi.transport.wsgi import metadata from marconi.transport.wsgi import queues from marconi.transport.wsgi import stats @@ -79,6 +80,11 @@ class Driver(transport.DriverBase): self.app.add_route('/v1/queues/{queue_name}' '/stats', stats_endpoint) + # Metadata Endpoints + metadata_endpoint = metadata.Resource(queue_controller) + self.app.add_route('/v1/queues/{queue_name}' + '/metadata', metadata_endpoint) + # Messages Endpoints msg_collection = messages.CollectionResource(message_controller) self.app.add_route('/v1/queues/{queue_name}' diff --git a/marconi/transport/wsgi/metadata.py b/marconi/transport/wsgi/metadata.py new file mode 100644 index 000000000..cce285389 --- /dev/null +++ b/marconi/transport/wsgi/metadata.py @@ -0,0 +1,95 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import falcon + +import marconi.openstack.common.log as logging +from marconi.storage import exceptions as storage_exceptions +from marconi import transport +from marconi.transport import helpers +from marconi.transport.wsgi import exceptions as wsgi_exceptions + + +LOG = logging.getLogger(__name__) + + +class Resource(object): + __slots__ = ('queue_ctrl', ) + + def __init__(self, queue_controller): + self.queue_ctrl = queue_controller + + def on_get(self, req, resp, project_id, queue_name): + LOG.debug(_("Queue metadata GET - queue: %(queue)s, " + "project: %(project)s") % + {"queue": queue_name, "project": project_id}) + + try: + resp_dict = self.queue_ctrl.get_metadata(queue_name, + project=project_id) + + except storage_exceptions.DoesNotExist: + raise falcon.HTTPNotFound() + + except Exception as ex: + LOG.exception(ex) + description = _('Queue metadata could not be retrieved.') + raise wsgi_exceptions.HTTPServiceUnavailable(description) + + resp.content_location = req.path + resp.body = helpers.to_json(resp_dict) + resp.status = falcon.HTTP_200 + + def on_put(self, req, resp, project_id, queue_name): + LOG.debug(_("Queue metadata PUT - queue: %(queue)s, " + "project: %(project)s") % + {"queue": queue_name, "project": project_id}) + + # TODO(kgriffs): Migrate this check to input validator middleware + if req.content_length > transport.MAX_QUEUE_METADATA_SIZE: + description = _('Queue metadata size is too large.') + raise wsgi_exceptions.HTTPBadRequestBody(description) + + # Deserialize queue metadata + try: + metadata = helpers.read_json(req.stream, req.content_length) + except helpers.MalformedJSON: + description = _('Request body could not be parsed.') + raise wsgi_exceptions.HTTPBadRequestBody(description) + except Exception as ex: + LOG.exception(ex) + description = _('Request body could not be read.') + raise wsgi_exceptions.HTTPServiceUnavailable(description) + + # Metadata must be a JSON object + if not isinstance(metadata, dict): + description = _('Queue metadata must be an object.') + raise wsgi_exceptions.HTTPBadRequestBody(description) + + try: + self.queue_ctrl.set_metadata(queue_name, + metadata=metadata, + project=project_id) + + except storage_exceptions.QueueDoesNotExist: + raise falcon.HTTPNotFound() + + except Exception as ex: + LOG.exception(ex) + description = _('Metadata could not be updated.') + raise wsgi_exceptions.HTTPServiceUnavailable(description) + + resp.status = falcon.HTTP_204 + resp.location = req.path diff --git a/marconi/transport/wsgi/queues.py b/marconi/transport/wsgi/queues.py index d5d3cbf4d..bfee70de3 100644 --- a/marconi/transport/wsgi/queues.py +++ b/marconi/transport/wsgi/queues.py @@ -16,8 +16,6 @@ import falcon import marconi.openstack.common.log as logging -from marconi.storage import exceptions as storage_exceptions -from marconi import transport from marconi.transport import helpers from marconi.transport.wsgi import exceptions as wsgi_exceptions @@ -37,38 +35,12 @@ class ItemResource(object): LOG.debug(_("Queue item PUT - queue: %(queue)s, " "project: %(project)s") % {"queue": queue_name, "project": project_id}) - # TODO(kgriffs): Migrate this check to input validator middleware - if req.content_length > transport.MAX_QUEUE_METADATA_SIZE: - description = _('Queue metadata size is too large.') - raise wsgi_exceptions.HTTPBadRequestBody(description) - # Deserialize queue metadata - try: - metadata = helpers.read_json(req.stream, req.content_length) - except helpers.MalformedJSON: - description = _('Request body could not be parsed.') - raise wsgi_exceptions.HTTPBadRequestBody(description) - except Exception as ex: - LOG.exception(ex) - description = _('Request body could not be read.') - raise wsgi_exceptions.HTTPServiceUnavailable(description) - - # Metadata must be a JSON object - if not isinstance(metadata, dict): - description = _('Queue metadata must be an object.') - raise wsgi_exceptions.HTTPBadRequestBody(description) - - # Create or update the queue try: created = self.queue_controller.create( queue_name, project=project_id) - self.queue_controller.set_metadata( - queue_name, - metadata=metadata, - project=project_id) - except Exception as ex: LOG.exception(ex) description = _('Queue could not be created.') @@ -77,26 +49,6 @@ class ItemResource(object): resp.status = falcon.HTTP_201 if created else falcon.HTTP_204 resp.location = req.path - def on_get(self, req, resp, project_id, queue_name): - LOG.debug(_("Queue item GET - queue: %(queue)s, " - "project: %(project)s") % - {"queue": queue_name, "project": project_id}) - - try: - doc = self.queue_controller.get(queue_name, project=project_id) - - except storage_exceptions.DoesNotExist: - raise falcon.HTTPNotFound() - - except Exception as ex: - LOG.exception(ex) - description = _('Queue metadata could not be retrieved.') - raise wsgi_exceptions.HTTPServiceUnavailable(description) - - else: - resp.content_location = req.relative_uri - resp.body = helpers.to_json(doc) - def on_delete(self, req, resp, project_id, queue_name): LOG.debug(_("Queue item DELETE - queue: %(queue)s, " "project: %(project)s") %