From 58c646fbfecc993d4b5ca3c53a247727c9769061 Mon Sep 17 00:00:00 2001 From: kgriffs Date: Thu, 14 Mar 2013 12:03:42 -0400 Subject: [PATCH] feat(transport.wsgi): Create or update queue Still needs some error handling, but the happy path works. Change-Id: I9da6cd0c7a54693389e996a58c8b2f7664ac7b5e Trello: 116 --- marconi/main.py | 43 +++++++ marconi/tests/etc/wsgi_reference.conf | 3 + marconi/tests/transport/__init__.py | 0 marconi/tests/transport/wsgi/__init__.py | 0 .../transport/wsgi/test_queue_lifecycle.py | 118 ++++++++++++++++++ marconi/transport/__init__.py | 4 + marconi/transport/wsgi/__init__.py | 1 + marconi/transport/wsgi/driver.py | 18 +-- marconi/transport/wsgi/queues.py | 50 ++++++++ tools/pip-requires | 1 + 10 files changed, 225 insertions(+), 13 deletions(-) create mode 100644 marconi/main.py create mode 100644 marconi/tests/transport/__init__.py create mode 100644 marconi/tests/transport/wsgi/__init__.py create mode 100644 marconi/tests/transport/wsgi/test_queue_lifecycle.py create mode 100644 marconi/transport/wsgi/queues.py diff --git a/marconi/main.py b/marconi/main.py new file mode 100644 index 000000000..1fd84b88b --- /dev/null +++ b/marconi/main.py @@ -0,0 +1,43 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from marconi.common import config +from marconi.storage import sqlite as storage +from marconi.transport.wsgi import driver as wsgi + + +cfg = config.project('marconi').from_options() + + +class Main(object): + """ + Defines the Marconi Kernel + + The Kernel loads up drivers per a given configuration, and manages their + lifetimes. + """ + + def __init__(self, config_file=None): + #TODO(kgriffs): Error handling + cfg.load(config_file) + + #TODO(kgriffs): Determine driver types from cfg + self.storage = storage.Driver() + self.transport = wsgi.Driver(self.storage.queue_controller, + self.storage.message_controller, + self.storage.claim_controller) + + def run(self): + self.transport.listen() diff --git a/marconi/tests/etc/wsgi_reference.conf b/marconi/tests/etc/wsgi_reference.conf index a2aaf64e6..c5c83ec02 100644 --- a/marconi/tests/etc/wsgi_reference.conf +++ b/marconi/tests/etc/wsgi_reference.conf @@ -4,3 +4,6 @@ storage = reference [drivers:transport:wsgi] port = 8888 + +[drivers:storage:reference] +database = :memory: diff --git a/marconi/tests/transport/__init__.py b/marconi/tests/transport/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/tests/transport/wsgi/__init__.py b/marconi/tests/transport/wsgi/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/marconi/tests/transport/wsgi/test_queue_lifecycle.py b/marconi/tests/transport/wsgi/test_queue_lifecycle.py new file mode 100644 index 000000000..1b8a85e08 --- /dev/null +++ b/marconi/tests/transport/wsgi/test_queue_lifecycle.py @@ -0,0 +1,118 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +import falcon +from falcon import testing +from testtools import matchers + +import marconi +from marconi.tests import util +from marconi import transport + + +class TestCreateQueue(util.TestBase): + + def setUp(self): + super(TestCreateQueue, self).setUp() + + conf_file = self.conf_path('wsgi_reference.conf') + boot = marconi.Bootstrap(conf_file) + + self.app = boot.transport.app + self.srmock = testing.StartResponseMock() + + def test_simple(self): + doc = '{"messages": {"ttl": 600}}' + env = testing.create_environ('/v1/480924/queues/gumshoe', + method="PUT", body=doc) + + self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_201) + + location = ('Location', '/v1/480924/queues/gumshoe') + self.assertThat(self.srmock.headers, matchers.Contains(location)) + + env = testing.create_environ('/v1/480924/queues/gumshoe') + result = self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_200) + self.assertEquals(result, [doc]) + + def test_no_metadata(self): + env = testing.create_environ('/v1/480924/queues/fizbat', method="PUT") + + self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_400) + + def test_too_much_metadata(self): + doc = '{"messages": {"ttl": 600}, "padding": "%s"}' + padding_len = transport.MAX_QUEUE_METADATA_SIZE - (len(doc) - 2) + 1 + doc = doc % ('x' * padding_len) + env = testing.create_environ('/v1/480924/queues/fizbat', + method="PUT", body=doc) + + self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_400) + + def test_way_too_much_metadata(self): + doc = '{"messages": {"ttl": 600}, "padding": "%s"}' + padding_len = transport.MAX_QUEUE_METADATA_SIZE * 100 + doc = doc % ('x' * padding_len) + env = testing.create_environ('/v1/480924/queues/gumshoe', + method="PUT", body=doc) + + self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_400) + + def test_custom_metadata(self): + # Set + doc = '{"messages": {"ttl": 600}, "padding": "%s"}' + padding_len = transport.MAX_QUEUE_METADATA_SIZE - (len(doc) - 2) + doc = doc % ('x' * padding_len) + env = testing.create_environ('/v1/480924/queues/gumshoe', + method="PUT", body=doc) + + self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_201) + + # Get + env = testing.create_environ('/v1/480924/queues/gumshoe') + result = self.app(env, self.srmock) + result_doc = json.loads(result[0]) + self.assertEquals(result_doc, json.loads(doc)) + + def test_update_metadata(self): + # Create + doc1 = '{"messages": {"ttl": 600}}' + env = testing.create_environ('/v1/480924/queues/xyz', + method="PUT", body=doc1) + + self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_201) + + # Update + doc2 = '{"messages": {"ttl": 100}}' + env = testing.create_environ('/v1/480924/queues/xyz', + method="PUT", body=doc2) + + self.app(env, self.srmock) + self.assertEquals(self.srmock.status, falcon.HTTP_204) + + # Get + env = testing.create_environ('/v1/480924/queues/xyz') + result = self.app(env, self.srmock) + result_doc = json.loads(result[0]) + self.assertEquals(result_doc, json.loads(doc2)) diff --git a/marconi/transport/__init__.py b/marconi/transport/__init__.py index ad6437ed4..9c92c1ccd 100644 --- a/marconi/transport/__init__.py +++ b/marconi/transport/__init__.py @@ -1,3 +1,7 @@ """Marconi Transport Drivers""" +MAX_QUEUE_METADATA_SIZE = 64 * 1024 +"""Maximum metadata size per queue when serialized as JSON""" + + from marconi.transport.base import DriverBase # NOQA diff --git a/marconi/transport/wsgi/__init__.py b/marconi/transport/wsgi/__init__.py index 56ea3fe0d..57e202bf6 100644 --- a/marconi/transport/wsgi/__init__.py +++ b/marconi/transport/wsgi/__init__.py @@ -1,3 +1,4 @@ """WSGI Transport Driver""" from marconi.transport.wsgi.driver import Driver # NOQA +from marconi.transport.wsgi.queues import QueuesResource # NOQA diff --git a/marconi/transport/wsgi/driver.py b/marconi/transport/wsgi/driver.py index d449fe533..0f590c75b 100644 --- a/marconi/transport/wsgi/driver.py +++ b/marconi/transport/wsgi/driver.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import falcon + from marconi.common import config from marconi import transport @@ -25,20 +27,10 @@ class Driver(transport.DriverBase): def __init__(self, queue_controller, message_controller, claim_controller): - # E.g.: - # - # self._queue_controller.create(tenant_id, queue_name) - # self._queue_controller.set_metadata(tenant_id, queue_name, metadata) - # - self._queue_controller = queue_controller - self._message_controller = message_controller - self._claim_controller = claim_controller + queues = transport.wsgi.QueuesResource(queue_controller) - # self.app = api = falcon.API() + self.app = api = falcon.API() + api.add_route('/v1/{tenant_id}/queues/{queue_name}', queues) def listen(self): pass - - def app(self, env, start_response, exc_info=None): - """This will be replace by falcon.API().""" - pass diff --git a/marconi/transport/wsgi/queues.py b/marconi/transport/wsgi/queues.py new file mode 100644 index 000000000..df9a58361 --- /dev/null +++ b/marconi/transport/wsgi/queues.py @@ -0,0 +1,50 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +import falcon + +from marconi import transport + + +class QueuesResource(object): + + __slots__ = ('queue_ctrl') + + def __init__(self, queue_controller): + self.queue_ctrl = queue_controller + + def on_put(self, req, resp, tenant_id, queue_name): + if req.content_length > transport.MAX_QUEUE_METADATA_SIZE: + raise falcon.HTTPBadRequest(_('Bad request'), + _('Queue metadata size is too large.')) + + if req.content_length is None or req.content_length == 0: + raise falcon.HTTPBadRequest(_('Bad request'), + _('Missing queue metadata.')) + + #TODO(kgriffs): check for malformed JSON, must be a hash at top level + meta = json.load(req.stream) + + #TODO(kgriffs): catch other kinds of exceptions + created = self.queue_ctrl.upsert(queue_name, meta, tenant=tenant_id) + + resp.status = falcon.HTTP_201 if created else falcon.HTTP_204 + resp.location = req.path + + def on_get(self, req, resp, tenant_id, queue_name): + doc = self.queue_ctrl.get(queue_name, tenant=tenant_id) + resp.body = json.dumps(doc) diff --git a/tools/pip-requires b/tools/pip-requires index c25044ab6..403f5b647 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -1,3 +1,4 @@ cliff +falcon oslo.config>=1.1.0 pymongo