From 33fabc9c34dcc79829ab674d96f1857b43989547 Mon Sep 17 00:00:00 2001 From: Fei Long Wang Date: Fri, 16 Jan 2015 01:33:23 +1300 Subject: [PATCH] Add REST API for notification/subscriptions New REST API(2.0 version) is added in this patch to support the CRUD of subscriptions for notification. DocImpact APIImpact Depends-On: I8420c66cf35f88503279bb6d0926040affd12471 Partially-Implements blueprint: notifications Change-Id: Ie059abdc52453389760e732b71433478761a8ce7 --- tests/unit/transport/wsgi/test_v2_0.py | 4 + zaqar/storage/mongodb/subscriptions.py | 3 +- zaqar/storage/pooling.py | 6 +- .../unit/transport/wsgi/v2_0/__init__.py | 2 + .../transport/wsgi/v2_0/test_subscriptions.py | 268 ++++++++++++++++++ zaqar/transport/validation.py | 72 ++++- zaqar/transport/wsgi/v2_0/__init__.py | 13 +- zaqar/transport/wsgi/v2_0/subscriptions.py | 194 +++++++++++++ 8 files changed, 555 insertions(+), 7 deletions(-) create mode 100644 zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py create mode 100644 zaqar/transport/wsgi/v2_0/subscriptions.py diff --git a/tests/unit/transport/wsgi/test_v2_0.py b/tests/unit/transport/wsgi/test_v2_0.py index 0ae928396..e2776945f 100644 --- a/tests/unit/transport/wsgi/test_v2_0.py +++ b/tests/unit/transport/wsgi/test_v2_0.py @@ -100,6 +100,10 @@ class TestValidation(v2_0.TestValidation): class TestFlavorsMongoDB(v2_0.TestFlavorsMongoDB): url_prefix = URL_PREFIX + +class TestSubscriptionsMongoDB(v2_0.TestSubscriptionsMongoDB): + url_prefix = URL_PREFIX + # -------------------------------------------------------------------------- # v1.1 & v2 only # -------------------------------------------------------------------------- diff --git a/zaqar/storage/mongodb/subscriptions.py b/zaqar/storage/mongodb/subscriptions.py index 6ed3eaa56..cb9e8feec 100644 --- a/zaqar/storage/mongodb/subscriptions.py +++ b/zaqar/storage/mongodb/subscriptions.py @@ -53,7 +53,7 @@ class SubscriptionController(base.Subscription): def list(self, queue, project=None, marker=None, limit=10): query = {'s': queue, 'p': project} if marker is not None: - query['_id'] = {'$gt': marker} + query['_id'] = {'$gt': utils.to_oid(marker)} fields = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1} @@ -98,7 +98,6 @@ class SubscriptionController(base.Subscription): '_id': 0}) if target_source is None: raise errors.QueueDoesNotExist(target_source, project) - try: subscription_id = self._collection.insert({'s': source, 'u': subscriber, diff --git a/zaqar/storage/pooling.py b/zaqar/storage/pooling.py index 9b9b59d14..d3f336878 100644 --- a/zaqar/storage/pooling.py +++ b/zaqar/storage/pooling.py @@ -374,9 +374,9 @@ class SubscriptionController(storage.Subscription): def create(self, queue, subscriber, ttl, options, project=None): control = self._get_controller(queue, project) if control: - return control.post(queue, subscriber, - ttl, options, - project=project) + return control.create(queue, subscriber, + ttl, options, + project=project) def update(self, queue, subscription_id, project=None, **kwargs): control = self._get_controller(queue, project) diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/__init__.py b/zaqar/tests/unit/transport/wsgi/v2_0/__init__.py index 0217093c0..26288e359 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/__init__.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/__init__.py @@ -22,6 +22,7 @@ from zaqar.tests.unit.transport.wsgi.v2_0 import test_media_type from zaqar.tests.unit.transport.wsgi.v2_0 import test_messages from zaqar.tests.unit.transport.wsgi.v2_0 import test_pools from zaqar.tests.unit.transport.wsgi.v2_0 import test_queue_lifecycle as l +from zaqar.tests.unit.transport.wsgi.v2_0 import test_subscriptions from zaqar.tests.unit.transport.wsgi.v2_0 import test_validation TestAuth = test_auth.TestAuth @@ -44,3 +45,4 @@ TestPoolsMongoDB = test_pools.TestPoolsMongoDB TestPoolsSqlalchemy = test_pools.TestPoolsSqlalchemy TestValidation = test_validation.TestValidation TestFlavorsMongoDB = test_flavors.TestFlavorsMongoDB +TestSubscriptionsMongoDB = test_subscriptions.TestSubscriptionsMongoDB diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py new file mode 100644 index 000000000..0ff74adad --- /dev/null +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py @@ -0,0 +1,268 @@ +# Copyright (c) 2015 Catalyst IT Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy +# of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +import uuid + +import ddt +import falcon +from oslo.serialization import jsonutils + +from zaqar import tests as testing +from zaqar.tests.unit.transport.wsgi import base + + +@ddt.ddt +class SubscriptionsBaseTest(base.V2Base): + + def setUp(self): + super(SubscriptionsBaseTest, self).setUp() + + if self.conf.pooling: + for i in range(1): + uri = self.conf['drivers:management_store:mongodb'].uri + doc = {'weight': 100, 'uri': uri} + self.simulate_put(self.url_prefix + '/pools/' + str(i), + body=jsonutils.dumps(doc)) + self.assertEqual(self.srmock.status, falcon.HTTP_201) + self.addCleanup(self.simulate_delete, + self.url_prefix + '/pools/' + str(i), + headers=self.headers) + + self.project_id = '7e55e1a7exyz' + self.headers = { + 'Client-ID': str(uuid.uuid4()), + 'X-Project-ID': self.project_id + } + self.queue = 'fake-topic' + self.queue_path = self.url_prefix + '/queues/' + self.queue + doc = '{"_ttl": 60}' + self.simulate_put(self.queue_path, body=doc, headers=self.headers) + + self.subscription_path = (self.url_prefix + '/queues/' + self.queue + + '/subscriptions') + + self.addCleanup(self._delete_subscription) + + def tearDown(self): + super(SubscriptionsBaseTest, self).tearDown() + + def _delete_subscription(self, sid=None): + if sid: + self.simulate_delete(self.subscription_path + '/' + sid, + headers=self.headers) + else: + resp = self.simulate_get(self.subscription_path, + headers=self.headers) + resp_doc = jsonutils.loads(resp[0]) + for s in resp_doc['subscriptions']: + self.simulate_delete(self.subscription_path + '/' + s['id'], + headers=self.headers) + + self.simulate_delete(self.queue_path) + + def _create_subscription(self, + subscriber='http://triger.me', + ttl=600, + options='{"a":1}'): + doc = ('{"subscriber": "%s", "ttl": %s, "options": %s}' % (subscriber, + ttl, + options)) + return self.simulate_post(self.subscription_path, body=doc, + headers=self.headers) + + def test_create_works(self): + self._create_subscription() + self.assertEqual(self.srmock.status, falcon.HTTP_201) + + def test_create_duplicate_409(self): + self._create_subscription(subscriber='http://CCC.com') + self.assertEqual(self.srmock.status, falcon.HTTP_201) + + self._create_subscription(subscriber='http://CCC.com') + self.assertEqual(self.srmock.status, falcon.HTTP_409) + + def test_create_invalid_body_400(self): + resp = self._create_subscription(options='xxx') + self.assertEqual(self.srmock.status, falcon.HTTP_400) + resp_doc = jsonutils.loads(resp[0]) + self.assertIn('body could not be parsed', resp_doc['description']) + + def test_create_invalid_subscriber_400(self): + resp = self._create_subscription(subscriber='fake') + self.assertEqual(self.srmock.status, falcon.HTTP_400) + resp_doc = jsonutils.loads(resp[0]) + self.assertIn('must be supported in the list', resp_doc['description']) + + def test_create_unsupported_subscriber_400(self): + resp = self._create_subscription(subscriber='email://fake') + self.assertEqual(self.srmock.status, falcon.HTTP_400) + resp_doc = jsonutils.loads(resp[0]) + self.assertIn('must be supported in the list', + resp_doc['description']) + + def test_create_invalid_options_400(self): + resp = self._create_subscription(options='1') + self.assertEqual(self.srmock.status, falcon.HTTP_400) + resp_doc = jsonutils.loads(resp[0]) + self.assertIn('must be a dict', resp_doc['description']) + + def test_create_invalid_ttl(self): + resp = self._create_subscription(ttl='"invalid"') + self.assertEqual(self.srmock.status, falcon.HTTP_400) + resp_doc = jsonutils.loads(resp[0]) + self.assertIn('must be an integer', resp_doc['description']) + + def _list_subscription(self, count=10, limit=10, marker=None): + for i in range(count): + self._create_subscription(subscriber='http://' + str(i)) + + query = '?limit={0}'.format(limit) + if marker: + query += '&marker={1}'.format(marker) + + resp = self.simulate_get(self.subscription_path, + query_string=query, + headers=self.headers) + self.assertEqual(self.srmock.status, falcon.HTTP_200) + + resp_doc = jsonutils.loads(resp[0]) + self.assertIsInstance(resp_doc, dict) + self.assertIn('subscriptions', resp_doc) + self.assertIn('links', resp_doc) + subscriptions_list = resp_doc['subscriptions'] + + link = resp_doc['links'][0] + self.assertEqual('next', link['rel']) + href = falcon.uri.parse_query_string(link['href']) + self.assertIn('marker', href) + self.assertEqual(href['limit'], str(limit)) + + next_query_string = ('?marker={marker}&limit={limit}').format(**href) + next_result = self.simulate_get(link['href'].split('?')[0], + query_string=next_query_string) + next_subscriptions = jsonutils.loads(next_result[0]) + next_subscriptions_list = next_subscriptions['subscriptions'] + + self.assertEqual(self.srmock.status, falcon.HTTP_200) + self.assertIn('links', next_subscriptions) + if limit < count: + self.assertEqual(len(next_subscriptions_list), + min(limit, count-limit)) + else: + self.assertTrue(len(next_subscriptions_list) == 0) + + self.assertEqual(len(subscriptions_list), min(limit, count)) + + def test_list_works(self): + self._list_subscription() + + @ddt.data(1, 5, 10, 15) + def test_listing_works_with_limit(self, limit): + self._list_subscription(count=15, limit=limit) + + def test_listing_marker_is_respected(self): + for i in range(15): + self._create_subscription(subscriber='http://' + str(i)) + + resp = self.simulate_get(self.subscription_path, + query_string='?limit=20', + headers=self.headers) + subscriptions_list = jsonutils.loads(resp[0])['subscriptions'] + id_list = sorted([s['id'] for s in subscriptions_list]) + + resp = self.simulate_get(self.subscription_path, + query_string='?marker={0}'.format(id_list[9]), + headers=self.headers) + self.assertEqual(self.srmock.status, falcon.HTTP_200) + next_subscriptions_list = jsonutils.loads(resp[0])['subscriptions'] + self.assertEqual(len(next_subscriptions_list), 5) + self.assertEqual(subscriptions_list[10], next_subscriptions_list[0]) + + def test_get_works(self): + self._create_subscription() + resp = self.simulate_get(self.subscription_path, + headers=self.headers) + resp_doc = jsonutils.loads(resp[0]) + sid = resp_doc['subscriptions'][0]['id'] + subscriber = resp_doc['subscriptions'][0]['subscriber'] + + resp = self.simulate_get(self.subscription_path + '/' + sid, + headers=self.headers) + self.assertEqual(self.srmock.status, falcon.HTTP_200) + resp_doc = jsonutils.loads(resp[0]) + self.assertEqual(sid, resp_doc['id']) + self.assertEqual(subscriber, resp_doc['subscriber']) + + def test_get_nonexisting_raise_404(self): + self.simulate_get(self.subscription_path + '/fake', + headers=self.headers) + self.assertEqual(self.srmock.status, falcon.HTTP_404) + + def test_patch_works(self): + self._create_subscription() + resp = self.simulate_get(self.subscription_path, + headers=self.headers) + resp_doc = jsonutils.loads(resp[0]) + sid = resp_doc['subscriptions'][0]['id'] + + resp = self.simulate_patch(self.subscription_path + '/' + sid, + body='{"ttl": 300}', + headers=self.headers) + self.assertEqual(self.srmock.status, falcon.HTTP_204) + + resp = self.simulate_get(self.subscription_path + '/' + sid, + headers=self.headers) + self.assertEqual(self.srmock.status, falcon.HTTP_200) + resp_doc = jsonutils.loads(resp[0]) + self.assertEqual(resp_doc['ttl'], 300) + + def test_patch_nonexisting_raise_404(self): + self.simulate_patch(self.subscription_path + '/x', + body='{"ttl": 300}', + headers=self.headers) + self.assertEqual(self.srmock.status, falcon.HTTP_404) + + def test_patch_invalid_ttl(self): + self.simulate_patch(self.subscription_path + '/x', + body='{"ttl": "invalid"}', + headers=self.headers) + self.assertEqual(self.srmock.status, falcon.HTTP_400) + + def test_delete_works(self): + self._create_subscription() + resp = self.simulate_get(self.subscription_path, + headers=self.headers) + resp_doc = jsonutils.loads(resp[0]) + sid = resp_doc['subscriptions'][0]['id'] + + resp = self.simulate_get(self.subscription_path + '/' + sid, + headers=self.headers) + self.assertEqual(self.srmock.status, falcon.HTTP_200) + + self.simulate_delete(self.subscription_path + '/' + sid, + headers=self.headers) + self.assertEqual(self.srmock.status, falcon.HTTP_204) + + resp = self.simulate_get(self.subscription_path + '/' + sid, + headers=self.headers) + self.assertEqual(self.srmock.status, falcon.HTTP_404) + + +class TestSubscriptionsMongoDB(SubscriptionsBaseTest): + + config_file = 'wsgi_mongodb_pooled.conf' + + @testing.requires_mongodb + def setUp(self): + super(TestSubscriptionsMongoDB, self).setUp() diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py index 5d1455626..1504f0f65 100644 --- a/zaqar/transport/validation.py +++ b/zaqar/transport/validation.py @@ -1,4 +1,5 @@ # Copyright (c) 2013 Rackspace, Inc. +# Copyright (c) 2015 Catalyst IT Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,7 +16,8 @@ import re -from oslo_config import cfg +from oslo.config import cfg +import six from zaqar.i18n import _ @@ -34,6 +36,11 @@ _TRANSPORT_LIMITS_OPTIONS = ( deprecated_group='limits:transport', help='Defines the maximum number of messages per page.'), + cfg.IntOpt('max_subscriptions_per_page', default=20, + deprecated_name='subscription_paging_uplimit', + deprecated_group='limits:transport', + help='Defines the maximum number of subscriptions per page.'), + cfg.IntOpt('max_messages_per_claim_or_pop', default=20, deprecated_name='max_messages_per_claim', help='The maximum number of messages that can be claimed (OR) ' @@ -64,6 +71,9 @@ _TRANSPORT_LIMITS_OPTIONS = ( deprecated_name='claim_grace_max', deprecated_group='limits:transport', help='Defines the maximum message grace period in seconds.'), + + cfg.ListOpt('subscriber_types', default=['http'], + help='Defines supported subscriber types.'), ) _TRANSPORT_LIMITS_GROUP = 'transport' @@ -278,3 +288,63 @@ class Validator(object): raise ValidationFailed( msg, self._limits_conf.max_message_ttl, MIN_CLAIM_TTL) + + def subscription_posting(self, subscription): + """Restrictions on a creation of subscription. + + :param subscription: dict of subscription + :raises: ValidationFailed if the subscription is invalid. + """ + for p in ('subscriber', 'ttl', 'options'): + if p not in subscription.keys(): + raise ValidationFailed(_(u'Missing parameter %s in body.') % p) + + self.subscription_patching(subscription) + + def subscription_patching(self, subscription): + """Restrictions on an update of subscription. + + :param subscription: dict of subscription + :raises: ValidationFailed if the subscription is invalid. + """ + + if not subscription: + raise ValidationFailed(_(u'No subscription to create.')) + + subscriber = subscription.get('subscriber', None) + subscriber_type = None + + if subscriber: + parsed_uri = six.moves.urllib_parse.urlparse(subscriber) + subscriber_type = parsed_uri.scheme + + if subscriber_type not in self._limits_conf.subscriber_types: + msg = _(u'The subscriber type of subscription must be ' + u'supported in the list {0}.') + raise ValidationFailed(msg, self._limits_conf.subscriber_types) + + options = subscription.get('options', None) + if options and not isinstance(options, dict): + msg = _(u'Options must be a dict.') + raise ValidationFailed(msg) + + ttl = subscription.get('ttl', None) + if ttl and not isinstance(ttl, int): + msg = _(u'TTL must be an integer.') + raise ValidationFailed(msg) + + def subscription_listing(self, limit=None, **kwargs): + """Restrictions involving a list of subscriptions. + + :param limit: The expected number of subscriptions in the list + :param kwargs: Ignored arguments passed to storage API + :raises: ValidationFailed if the limit is exceeded + """ + + uplimit = self._limits_conf.max_subscriptions_per_page + if limit is not None and not (0 < limit <= uplimit): + msg = _(u'Limit must be at least 1 and may not ' + 'be greater than {0}.') + + raise ValidationFailed( + msg, self._limits_conf.max_subscriptions_per_page) diff --git a/zaqar/transport/wsgi/v2_0/__init__.py b/zaqar/transport/wsgi/v2_0/__init__.py index 7e927deb6..0517fd28c 100644 --- a/zaqar/transport/wsgi/v2_0/__init__.py +++ b/zaqar/transport/wsgi/v2_0/__init__.py @@ -21,6 +21,7 @@ from zaqar.transport.wsgi.v2_0 import ping from zaqar.transport.wsgi.v2_0 import pools from zaqar.transport.wsgi.v2_0 import queues from zaqar.transport.wsgi.v2_0 import stats +from zaqar.transport.wsgi.v2_0 import subscriptions VERSION = { @@ -46,6 +47,7 @@ def public_endpoints(driver, conf): queue_controller = driver._storage.queue_controller message_controller = driver._storage.message_controller claim_controller = driver._storage.claim_controller + subscription_controller = driver._storage.subscription_controller defaults = driver._defaults @@ -91,7 +93,16 @@ def public_endpoints(driver, conf): # Ping ('/ping', - ping.Resource(driver._storage)) + ping.Resource(driver._storage)), + + # Subscription Endpoints + ('/queues/{queue_name}/subscriptions', + subscriptions.CollectionResource(driver._validate, + subscription_controller)), + + ('/queues/{queue_name}/subscriptions/{subscription_id}', + subscriptions.ItemResource(driver._validate, + subscription_controller)), ] diff --git a/zaqar/transport/wsgi/v2_0/subscriptions.py b/zaqar/transport/wsgi/v2_0/subscriptions.py new file mode 100644 index 000000000..7e0f6403a --- /dev/null +++ b/zaqar/transport/wsgi/v2_0/subscriptions.py @@ -0,0 +1,194 @@ +# Copyright (c) 2015 Catalyst IT Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import falcon +import six + +from zaqar.i18n import _ +import zaqar.openstack.common.log as logging +from zaqar.storage import errors as storage_errors +from zaqar.transport import utils +from zaqar.transport import validation +from zaqar.transport.wsgi import errors as wsgi_errors +from zaqar.transport.wsgi import utils as wsgi_utils + + +LOG = logging.getLogger(__name__) + + +class ItemResource(object): + + __slots__ = ('_validate', '_subscription_controller') + + def __init__(self, validate, subscription_controller): + self._validate = validate + self._subscription_controller = subscription_controller + + def on_get(self, req, resp, project_id, queue_name, subscription_id): + LOG.debug(u'Subscription GET - subscription id: %(subscription_id)s,' + u' project: %(project)s, queue: %(queue)s', + {'subscription_id': subscription_id, 'project': project_id, + 'queue': queue_name}) + try: + resp_dict = self._subscription_controller.get(queue_name, + subscription_id, + project=project_id) + + except storage_errors.DoesNotExist as ex: + LOG.debug(ex) + raise falcon.HTTPNotFound() + + except Exception as ex: + LOG.exception(ex) + description = _(u'Subscription could not be retrieved.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + resp.body = utils.to_json(resp_dict) + # status defaults to 200 + + def on_delete(self, req, resp, project_id, queue_name, subscription_id): + LOG.debug(u'Subscription DELETE - ' + u'subscription id: %(subscription_id)s,' + u' project: %(project)s, queue: %(queue)s', + {'subscription_id': subscription_id, 'project': project_id, + 'queue': queue_name}) + try: + self._subscription_controller.delete(queue_name, + subscription_id, + project=project_id) + + except Exception as ex: + LOG.exception(ex) + description = _(u'Subscription could not be deleted.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + resp.status = falcon.HTTP_204 + + def on_patch(self, req, resp, project_id, queue_name, subscription_id): + LOG.debug(u'Subscription PATCH - subscription id: %(subscription_id)s,' + u' project: %(project)s, queue: %(queue)s', + {'subscription_id': subscription_id, 'project': project_id, + 'queue': queue_name}) + + if req.content_length: + document = wsgi_utils.deserialize(req.stream, req.content_length) + + try: + self._validate.subscription_patching(document) + self._subscription_controller.update(queue_name, subscription_id, + project=project_id, + **document) + resp.status = falcon.HTTP_204 + resp.location = req.path + except storage_errors.SubscriptionDoesNotExist as ex: + LOG.debug(ex) + raise falcon.HTTPNotFound() + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + except Exception as ex: + LOG.exception(ex) + description = (_(u'Subscription {subscription_id} could not be' + ' updated.') % + dict(subscription_id=subscription_id)) + raise falcon.HTTPBadRequest(_('Unable to update subscription'), + description) + + +class CollectionResource(object): + + __slots__ = ('_subscription_controller', '_validate') + + def __init__(self, validate, subscription_controller): + self._subscription_controller = subscription_controller + self._validate = validate + + def on_get(self, req, resp, project_id, queue_name): + LOG.debug(u'Subscription collection GET - project: %(project)s, ' + u'queue: %(queue)s', + {'project': project_id, 'queue': queue_name}) + + kwargs = {} + + # NOTE(kgriffs): This syntax ensures that + # we don't clobber default values with None. + req.get_param('marker', store=kwargs) + req.get_param_as_int('limit', store=kwargs) + + try: + self._validate.subscription_listing(**kwargs) + results = self._subscription_controller.list(queue_name, + project=project_id, + **kwargs) + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + + except Exception as ex: + LOG.exception(ex) + description = _(u'Subscriptions could not be listed.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + # Buffer list of subscriptions + subscriptions = list(next(results)) + + # Got some. Prepare the response. + kwargs['marker'] = next(results) or kwargs.get('marker', '') + + response_body = { + 'subscriptions': subscriptions, + 'links': [ + { + 'rel': 'next', + 'href': req.path + falcon.to_query_str(kwargs) + } + ] + } + + resp.body = utils.to_json(response_body) + # status defaults to 200 + + def on_post(self, req, resp, project_id, queue_name): + LOG.debug(u'Subscription item POST - project: %(project)s, ' + u'queue: %(queue)s', + {'project': project_id, 'queue': queue_name}) + + if req.content_length: + document = wsgi_utils.deserialize(req.stream, req.content_length) + + try: + self._validate.subscription_posting(document) + subscriber = document['subscriber'] + ttl = int(document['ttl']) + options = document['options'] + created = self._subscription_controller.create(queue_name, + subscriber, + ttl, + options, + project=project_id) + + except storage_errors.QueueDoesNotExist as ex: + LOG.exception(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + except Exception as ex: + LOG.exception(ex) + description = _(u'Subscription could not be created.') + raise wsgi_errors.HTTPServiceUnavailable(description) + + resp.status = falcon.HTTP_201 if created else falcon.HTTP_409 + resp.location = req.path