Merge "Implements new metadata endpoint"
This commit is contained in:
commit
839907831c
@ -100,8 +100,8 @@ class QueueBase(ControllerBase):
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def get(self, name, project=None):
|
||||
"""Base method for queue retrieval.
|
||||
def get_metadata(self, name, project=None):
|
||||
"""Base method for queue metadata retrieval.
|
||||
|
||||
:param name: The queue name
|
||||
:param project: Project id
|
||||
|
@ -108,7 +108,7 @@ class QueueController(storage.QueueBase):
|
||||
yield marker_name and marker_name['next']
|
||||
|
||||
@utils.raises_conn_error
|
||||
def get(self, name, project=None):
|
||||
def get_metadata(self, name, project=None):
|
||||
queue = self._get(name, project)
|
||||
return queue.get('m', {})
|
||||
|
||||
|
@ -71,7 +71,7 @@ class QueueController(base.QueueBase):
|
||||
yield it()
|
||||
yield marker_name['next']
|
||||
|
||||
def get(self, name, project):
|
||||
def get_metadata(self, name, project):
|
||||
if project is None:
|
||||
project = ''
|
||||
|
||||
|
@ -76,21 +76,21 @@ class QueueControllerTest(ControllerBaseTest):
|
||||
self.assertTrue(created)
|
||||
|
||||
# Test Queue retrieval
|
||||
metadata = self.controller.get('test', project=self.project)
|
||||
metadata = self.controller.get_metadata('test', project=self.project)
|
||||
self.assertEqual(metadata, {})
|
||||
|
||||
# Test Queue Update
|
||||
created = self.controller.set_metadata('test', project=self.project,
|
||||
metadata=dict(meta='test_meta'))
|
||||
|
||||
metadata = self.controller.get('test', project=self.project)
|
||||
metadata = self.controller.get_metadata('test', project=self.project)
|
||||
self.assertEqual(metadata['meta'], 'test_meta')
|
||||
|
||||
# Touching an existing queue does not affect metadata
|
||||
created = self.controller.create('test', project=self.project)
|
||||
self.assertFalse(created)
|
||||
|
||||
metadata = self.controller.get('test', project=self.project)
|
||||
metadata = self.controller.get_metadata('test', project=self.project)
|
||||
self.assertEqual(metadata['meta'], 'test_meta')
|
||||
|
||||
# Test Queue Statistic
|
||||
@ -105,7 +105,7 @@ class QueueControllerTest(ControllerBaseTest):
|
||||
|
||||
# Test DoesNotExist Exception
|
||||
with testing.expect(storage.exceptions.DoesNotExist):
|
||||
self.controller.get('test', project=self.project)
|
||||
self.controller.get_metadata('test', project=self.project)
|
||||
|
||||
with testing.expect(storage.exceptions.DoesNotExist):
|
||||
self.controller.set_metadata('test', '{}', project=self.project)
|
||||
|
@ -29,7 +29,7 @@ class TestWSGIMediaType(base.TestBase):
|
||||
|
||||
endpoints = [
|
||||
('GET', '/v1/queues'),
|
||||
('GET', '/v1/queues/nonexistent'),
|
||||
('GET', '/v1/queues/nonexistent/metadata'),
|
||||
('GET', '/v1/queues/nonexistent/stats'),
|
||||
('POST', '/v1/queues/nonexistent/messages'),
|
||||
('GET', '/v1/queues/nonexistent/messages/deadbeaf'),
|
||||
|
@ -29,23 +29,36 @@ class QueueLifecycleBaseTest(base.TestBase):
|
||||
|
||||
config_filename = None
|
||||
|
||||
def test_simple(self):
|
||||
def test_basics_thoroughly(self):
|
||||
path = '/v1/queues/gumshoe'
|
||||
|
||||
for project_id in ('480924', 'foo', '', None):
|
||||
# Stats
|
||||
# Stats not found - queue not created yet
|
||||
self.simulate_get(path + '/stats', project_id)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
# Metadata not found - queue not created yet
|
||||
self.simulate_get(path + '/metadata', project_id)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
# Create
|
||||
doc = '{"messages": {"ttl": 600}}'
|
||||
self.simulate_put(path, project_id, body=doc)
|
||||
self.simulate_put(path, project_id)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
location = ('Location', '/v1/queues/gumshoe')
|
||||
self.assertIn(location, self.srmock.headers)
|
||||
|
||||
result = self.simulate_get(path, project_id)
|
||||
# Get on queues shouldn't work any more
|
||||
self.simulate_get(path, project_id)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_405)
|
||||
|
||||
# Add metadata
|
||||
doc = '{"messages": {"ttl": 600}}'
|
||||
self.simulate_put(path + '/metadata', project_id, body=doc)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_204)
|
||||
|
||||
# Fetch metadata
|
||||
result = self.simulate_get(path + '/metadata', project_id)
|
||||
result_doc = json.loads(result[0])
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_200)
|
||||
self.assertEquals(result_doc, json.loads(doc))
|
||||
@ -54,70 +67,87 @@ class QueueLifecycleBaseTest(base.TestBase):
|
||||
self.simulate_delete(path, project_id)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_204)
|
||||
|
||||
# Get non-existing
|
||||
self.simulate_get(path, project_id)
|
||||
# Get non-existent stats
|
||||
self.simulate_get(path + '/stats', project_id)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
# Get non-existent metadata
|
||||
self.simulate_get(path + '/metadata', project_id)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
def test_no_metadata(self):
|
||||
self.simulate_put('/v1/queues/fizbat')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
def test_bad_metadata(self):
|
||||
self.simulate_put('/v1/queues/fizbat', '7e55e1a7e')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
for document in ('{', '[]', '.', ' ', ''):
|
||||
self.simulate_put('/v1/queues/fizbat', '7e55e1a7e',
|
||||
self.simulate_put('/v1/queues/fizbat/metadata', '7e55e1a7e',
|
||||
body=document)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def test_too_much_metadata(self):
|
||||
self.simulate_put('/v1/queues/fizbat', '7e55e1a7e')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
doc = '{"messages": {"ttl": 600}, "padding": "%s"}'
|
||||
padding_len = transport.MAX_QUEUE_METADATA_SIZE - (len(doc) - 2) + 1
|
||||
doc = doc % ('x' * padding_len)
|
||||
|
||||
self.simulate_put('/v1/queues/fizbat', 'deadbeef', body=doc)
|
||||
self.simulate_put('/v1/queues/fizbat/metadata', '7e55e1a7e', body=doc)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def test_way_too_much_metadata(self):
|
||||
self.simulate_put('/v1/queues/fizbat', '7e55e1a7e')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
doc = '{"messages": {"ttl": 600}, "padding": "%s"}'
|
||||
padding_len = transport.MAX_QUEUE_METADATA_SIZE * 100
|
||||
doc = doc % ('x' * padding_len)
|
||||
|
||||
self.simulate_put('/v1/queues/fizbat', 'deadbeef', body=doc)
|
||||
self.simulate_put('/v1/queues/fizbat/metadata', '7e55e1a7e', body=doc)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def test_custom_metadata(self):
|
||||
self.simulate_put('/v1/queues/fizbat', '480924')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
# Set
|
||||
doc = '{"messages": {"ttl": 600}, "padding": "%s"}'
|
||||
padding_len = transport.MAX_QUEUE_METADATA_SIZE - (len(doc) - 2)
|
||||
doc = doc % ('x' * padding_len)
|
||||
self.simulate_put('/v1/queues/fizbat', '480924', body=doc)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
# Get
|
||||
result = self.simulate_get('/v1/queues/fizbat', '480924')
|
||||
result_doc = json.loads(result[0])
|
||||
self.assertEquals(result_doc, json.loads(doc))
|
||||
|
||||
def test_update_metadata(self):
|
||||
path = '/v1/queues/xyz'
|
||||
project_id = '480924'
|
||||
|
||||
# Create
|
||||
doc1 = '{"messages": {"ttl": 600}}'
|
||||
self.simulate_put(path, project_id, body=doc1)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
# Update
|
||||
doc2 = '{"messages": {"ttl": 100}}'
|
||||
self.simulate_put(path, project_id, body=doc2)
|
||||
self.simulate_put('/v1/queues/fizbat/metadata', '480924', body=doc)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_204)
|
||||
|
||||
# Get
|
||||
result = self.simulate_get(path, project_id)
|
||||
result = self.simulate_get('/v1/queues/fizbat/metadata', '480924')
|
||||
result_doc = json.loads(result[0])
|
||||
self.assertEquals(result_doc, json.loads(doc))
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_200)
|
||||
|
||||
def test_update_metadata(self):
|
||||
# Create
|
||||
path = '/v1/queues/xyz'
|
||||
project_id = '480924'
|
||||
self.simulate_put(path, project_id)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
# Set meta
|
||||
doc1 = '{"messages": {"ttl": 600}}'
|
||||
self.simulate_put(path + '/metadata', project_id, body=doc1)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_204)
|
||||
|
||||
# Update
|
||||
doc2 = '{"messages": {"ttl": 100}}'
|
||||
self.simulate_put(path + '/metadata', project_id, body=doc2)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_204)
|
||||
|
||||
# Get
|
||||
result = self.simulate_get(path + '/metadata', project_id)
|
||||
result_doc = json.loads(result[0])
|
||||
|
||||
self.assertEquals(result_doc, json.loads(doc2))
|
||||
self.assertEquals(self.srmock.headers_dict['Content-Location'],
|
||||
path)
|
||||
path + '/metadata')
|
||||
|
||||
def test_list(self):
|
||||
project_id = '644079696574693'
|
||||
@ -144,10 +174,10 @@ class QueueLifecycleBaseTest(base.TestBase):
|
||||
'/v1/queues?limit=2')
|
||||
|
||||
for queue in result_doc['queues']:
|
||||
self.simulate_get(queue['href'], project_id)
|
||||
self.simulate_get(queue['href'] + '/metadata', project_id)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_200)
|
||||
|
||||
self.simulate_get(queue['href'], alt_project_id)
|
||||
self.simulate_get(queue['href'] + '/metadata', alt_project_id)
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
self.assertNotIn('metadata', queue)
|
||||
@ -161,7 +191,7 @@ class QueueLifecycleBaseTest(base.TestBase):
|
||||
[target, params] = result_doc['links'][0]['href'].split('?')
|
||||
|
||||
[queue] = result_doc['queues']
|
||||
result = self.simulate_get(queue['href'], project_id)
|
||||
result = self.simulate_get(queue['href'] + '/metadata', project_id)
|
||||
result_doc = json.loads(result[0])
|
||||
self.assertEquals(result_doc, queue['metadata'])
|
||||
|
||||
@ -205,7 +235,7 @@ class QueueFaultyDriverTests(base.TestBaseFaulty):
|
||||
location = ('Location', path)
|
||||
self.assertNotIn(location, self.srmock.headers)
|
||||
|
||||
result = self.simulate_get(path, '480924')
|
||||
result = self.simulate_get(path + '/metadata', '480924')
|
||||
result_doc = json.loads(result[0])
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_503)
|
||||
self.assertNotEquals(result_doc, json.loads(doc))
|
||||
|
@ -38,7 +38,7 @@ class QueueController(storage.QueueBase):
|
||||
def list(self, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def get(self, name, project=None):
|
||||
def get_metadata(self, name, project=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def create(self, name, project=None):
|
||||
|
@ -23,6 +23,7 @@ from marconi.transport import auth
|
||||
from marconi.transport.wsgi import claims
|
||||
from marconi.transport.wsgi import health
|
||||
from marconi.transport.wsgi import messages
|
||||
from marconi.transport.wsgi import metadata
|
||||
from marconi.transport.wsgi import queues
|
||||
from marconi.transport.wsgi import stats
|
||||
|
||||
@ -79,6 +80,11 @@ class Driver(transport.DriverBase):
|
||||
self.app.add_route('/v1/queues/{queue_name}'
|
||||
'/stats', stats_endpoint)
|
||||
|
||||
# Metadata Endpoints
|
||||
metadata_endpoint = metadata.Resource(queue_controller)
|
||||
self.app.add_route('/v1/queues/{queue_name}'
|
||||
'/metadata', metadata_endpoint)
|
||||
|
||||
# Messages Endpoints
|
||||
msg_collection = messages.CollectionResource(message_controller)
|
||||
self.app.add_route('/v1/queues/{queue_name}'
|
||||
|
95
marconi/transport/wsgi/metadata.py
Normal file
95
marconi/transport/wsgi/metadata.py
Normal file
@ -0,0 +1,95 @@
|
||||
# Copyright (c) 2013 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import falcon
|
||||
|
||||
import marconi.openstack.common.log as logging
|
||||
from marconi.storage import exceptions as storage_exceptions
|
||||
from marconi import transport
|
||||
from marconi.transport import helpers
|
||||
from marconi.transport.wsgi import exceptions as wsgi_exceptions
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Resource(object):
|
||||
__slots__ = ('queue_ctrl', )
|
||||
|
||||
def __init__(self, queue_controller):
|
||||
self.queue_ctrl = queue_controller
|
||||
|
||||
def on_get(self, req, resp, project_id, queue_name):
|
||||
LOG.debug(_("Queue metadata GET - queue: %(queue)s, "
|
||||
"project: %(project)s") %
|
||||
{"queue": queue_name, "project": project_id})
|
||||
|
||||
try:
|
||||
resp_dict = self.queue_ctrl.get_metadata(queue_name,
|
||||
project=project_id)
|
||||
|
||||
except storage_exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound()
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Queue metadata could not be retrieved.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
resp.content_location = req.path
|
||||
resp.body = helpers.to_json(resp_dict)
|
||||
resp.status = falcon.HTTP_200
|
||||
|
||||
def on_put(self, req, resp, project_id, queue_name):
|
||||
LOG.debug(_("Queue metadata PUT - queue: %(queue)s, "
|
||||
"project: %(project)s") %
|
||||
{"queue": queue_name, "project": project_id})
|
||||
|
||||
# TODO(kgriffs): Migrate this check to input validator middleware
|
||||
if req.content_length > transport.MAX_QUEUE_METADATA_SIZE:
|
||||
description = _('Queue metadata size is too large.')
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(description)
|
||||
|
||||
# Deserialize queue metadata
|
||||
try:
|
||||
metadata = helpers.read_json(req.stream, req.content_length)
|
||||
except helpers.MalformedJSON:
|
||||
description = _('Request body could not be parsed.')
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(description)
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Request body could not be read.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
# Metadata must be a JSON object
|
||||
if not isinstance(metadata, dict):
|
||||
description = _('Queue metadata must be an object.')
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(description)
|
||||
|
||||
try:
|
||||
self.queue_ctrl.set_metadata(queue_name,
|
||||
metadata=metadata,
|
||||
project=project_id)
|
||||
|
||||
except storage_exceptions.QueueDoesNotExist:
|
||||
raise falcon.HTTPNotFound()
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Metadata could not be updated.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
resp.status = falcon.HTTP_204
|
||||
resp.location = req.path
|
@ -16,8 +16,6 @@
|
||||
import falcon
|
||||
|
||||
import marconi.openstack.common.log as logging
|
||||
from marconi.storage import exceptions as storage_exceptions
|
||||
from marconi import transport
|
||||
from marconi.transport import helpers
|
||||
from marconi.transport.wsgi import exceptions as wsgi_exceptions
|
||||
|
||||
@ -37,38 +35,12 @@ class ItemResource(object):
|
||||
LOG.debug(_("Queue item PUT - queue: %(queue)s, "
|
||||
"project: %(project)s") %
|
||||
{"queue": queue_name, "project": project_id})
|
||||
# TODO(kgriffs): Migrate this check to input validator middleware
|
||||
if req.content_length > transport.MAX_QUEUE_METADATA_SIZE:
|
||||
description = _('Queue metadata size is too large.')
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(description)
|
||||
|
||||
# Deserialize queue metadata
|
||||
try:
|
||||
metadata = helpers.read_json(req.stream, req.content_length)
|
||||
except helpers.MalformedJSON:
|
||||
description = _('Request body could not be parsed.')
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(description)
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Request body could not be read.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
# Metadata must be a JSON object
|
||||
if not isinstance(metadata, dict):
|
||||
description = _('Queue metadata must be an object.')
|
||||
raise wsgi_exceptions.HTTPBadRequestBody(description)
|
||||
|
||||
# Create or update the queue
|
||||
try:
|
||||
created = self.queue_controller.create(
|
||||
queue_name,
|
||||
project=project_id)
|
||||
|
||||
self.queue_controller.set_metadata(
|
||||
queue_name,
|
||||
metadata=metadata,
|
||||
project=project_id)
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Queue could not be created.')
|
||||
@ -77,26 +49,6 @@ class ItemResource(object):
|
||||
resp.status = falcon.HTTP_201 if created else falcon.HTTP_204
|
||||
resp.location = req.path
|
||||
|
||||
def on_get(self, req, resp, project_id, queue_name):
|
||||
LOG.debug(_("Queue item GET - queue: %(queue)s, "
|
||||
"project: %(project)s") %
|
||||
{"queue": queue_name, "project": project_id})
|
||||
|
||||
try:
|
||||
doc = self.queue_controller.get(queue_name, project=project_id)
|
||||
|
||||
except storage_exceptions.DoesNotExist:
|
||||
raise falcon.HTTPNotFound()
|
||||
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
description = _('Queue metadata could not be retrieved.')
|
||||
raise wsgi_exceptions.HTTPServiceUnavailable(description)
|
||||
|
||||
else:
|
||||
resp.content_location = req.relative_uri
|
||||
resp.body = helpers.to_json(doc)
|
||||
|
||||
def on_delete(self, req, resp, project_id, queue_name):
|
||||
LOG.debug(_("Queue item DELETE - queue: %(queue)s, "
|
||||
"project: %(project)s") %
|
||||
|
Loading…
Reference in New Issue
Block a user