Merge "Add REST API for notification/subscriptions"

This commit is contained in:
Jenkins 2015-03-31 18:31:18 +00:00 committed by Gerrit Code Review
commit f340719b72
8 changed files with 555 additions and 7 deletions

View File

@ -93,6 +93,10 @@ class TestValidation(v2_0.TestValidation):
class TestFlavorsMongoDB(v2_0.TestFlavorsMongoDB):
url_prefix = URL_PREFIX
class TestSubscriptionsMongoDB(v2_0.TestSubscriptionsMongoDB):
url_prefix = URL_PREFIX
# --------------------------------------------------------------------------
# v1.1 & v2 only
# --------------------------------------------------------------------------

View File

@ -53,7 +53,7 @@ class SubscriptionController(base.Subscription):
def list(self, queue, project=None, marker=None, limit=10):
query = {'s': queue, 'p': project}
if marker is not None:
query['_id'] = {'$gt': marker}
query['_id'] = {'$gt': utils.to_oid(marker)}
fields = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1}
@ -98,7 +98,6 @@ class SubscriptionController(base.Subscription):
'_id': 0})
if target_source is None:
raise errors.QueueDoesNotExist(target_source, project)
try:
subscription_id = self._collection.insert({'s': source,
'u': subscriber,

View File

@ -374,9 +374,9 @@ class SubscriptionController(storage.Subscription):
def create(self, queue, subscriber, ttl, options, project=None):
control = self._get_controller(queue, project)
if control:
return control.post(queue, subscriber,
ttl, options,
project=project)
return control.create(queue, subscriber,
ttl, options,
project=project)
def update(self, queue, subscription_id, project=None, **kwargs):
control = self._get_controller(queue, project)

View File

@ -22,6 +22,7 @@ from zaqar.tests.unit.transport.wsgi.v2_0 import test_media_type
from zaqar.tests.unit.transport.wsgi.v2_0 import test_messages
from zaqar.tests.unit.transport.wsgi.v2_0 import test_pools
from zaqar.tests.unit.transport.wsgi.v2_0 import test_queue_lifecycle as l
from zaqar.tests.unit.transport.wsgi.v2_0 import test_subscriptions
from zaqar.tests.unit.transport.wsgi.v2_0 import test_validation
TestAuth = test_auth.TestAuth
@ -42,3 +43,4 @@ TestPoolsMongoDB = test_pools.TestPoolsMongoDB
TestPoolsSqlalchemy = test_pools.TestPoolsSqlalchemy
TestValidation = test_validation.TestValidation
TestFlavorsMongoDB = test_flavors.TestFlavorsMongoDB
TestSubscriptionsMongoDB = test_subscriptions.TestSubscriptionsMongoDB

View File

@ -0,0 +1,268 @@
# Copyright (c) 2015 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import uuid
import ddt
import falcon
from oslo.serialization import jsonutils
from zaqar import tests as testing
from zaqar.tests.unit.transport.wsgi import base
@ddt.ddt
class SubscriptionsBaseTest(base.V2Base):
def setUp(self):
super(SubscriptionsBaseTest, self).setUp()
if self.conf.pooling:
for i in range(1):
uri = self.conf['drivers:management_store:mongodb'].uri
doc = {'weight': 100, 'uri': uri}
self.simulate_put(self.url_prefix + '/pools/' + str(i),
body=jsonutils.dumps(doc))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
self.addCleanup(self.simulate_delete,
self.url_prefix + '/pools/' + str(i),
headers=self.headers)
self.project_id = '7e55e1a7exyz'
self.headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': self.project_id
}
self.queue = 'fake-topic'
self.queue_path = self.url_prefix + '/queues/' + self.queue
doc = '{"_ttl": 60}'
self.simulate_put(self.queue_path, body=doc, headers=self.headers)
self.subscription_path = (self.url_prefix + '/queues/' + self.queue +
'/subscriptions')
self.addCleanup(self._delete_subscription)
def tearDown(self):
super(SubscriptionsBaseTest, self).tearDown()
def _delete_subscription(self, sid=None):
if sid:
self.simulate_delete(self.subscription_path + '/' + sid,
headers=self.headers)
else:
resp = self.simulate_get(self.subscription_path,
headers=self.headers)
resp_doc = jsonutils.loads(resp[0])
for s in resp_doc['subscriptions']:
self.simulate_delete(self.subscription_path + '/' + s['id'],
headers=self.headers)
self.simulate_delete(self.queue_path)
def _create_subscription(self,
subscriber='http://triger.me',
ttl=600,
options='{"a":1}'):
doc = ('{"subscriber": "%s", "ttl": %s, "options": %s}' % (subscriber,
ttl,
options))
return self.simulate_post(self.subscription_path, body=doc,
headers=self.headers)
def test_create_works(self):
self._create_subscription()
self.assertEqual(self.srmock.status, falcon.HTTP_201)
def test_create_duplicate_409(self):
self._create_subscription(subscriber='http://CCC.com')
self.assertEqual(self.srmock.status, falcon.HTTP_201)
self._create_subscription(subscriber='http://CCC.com')
self.assertEqual(self.srmock.status, falcon.HTTP_409)
def test_create_invalid_body_400(self):
resp = self._create_subscription(options='xxx')
self.assertEqual(self.srmock.status, falcon.HTTP_400)
resp_doc = jsonutils.loads(resp[0])
self.assertIn('body could not be parsed', resp_doc['description'])
def test_create_invalid_subscriber_400(self):
resp = self._create_subscription(subscriber='fake')
self.assertEqual(self.srmock.status, falcon.HTTP_400)
resp_doc = jsonutils.loads(resp[0])
self.assertIn('must be supported in the list', resp_doc['description'])
def test_create_unsupported_subscriber_400(self):
resp = self._create_subscription(subscriber='email://fake')
self.assertEqual(self.srmock.status, falcon.HTTP_400)
resp_doc = jsonutils.loads(resp[0])
self.assertIn('must be supported in the list',
resp_doc['description'])
def test_create_invalid_options_400(self):
resp = self._create_subscription(options='1')
self.assertEqual(self.srmock.status, falcon.HTTP_400)
resp_doc = jsonutils.loads(resp[0])
self.assertIn('must be a dict', resp_doc['description'])
def test_create_invalid_ttl(self):
resp = self._create_subscription(ttl='"invalid"')
self.assertEqual(self.srmock.status, falcon.HTTP_400)
resp_doc = jsonutils.loads(resp[0])
self.assertIn('must be an integer', resp_doc['description'])
def _list_subscription(self, count=10, limit=10, marker=None):
for i in range(count):
self._create_subscription(subscriber='http://' + str(i))
query = '?limit={0}'.format(limit)
if marker:
query += '&marker={1}'.format(marker)
resp = self.simulate_get(self.subscription_path,
query_string=query,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
resp_doc = jsonutils.loads(resp[0])
self.assertIsInstance(resp_doc, dict)
self.assertIn('subscriptions', resp_doc)
self.assertIn('links', resp_doc)
subscriptions_list = resp_doc['subscriptions']
link = resp_doc['links'][0]
self.assertEqual('next', link['rel'])
href = falcon.uri.parse_query_string(link['href'])
self.assertIn('marker', href)
self.assertEqual(href['limit'], str(limit))
next_query_string = ('?marker={marker}&limit={limit}').format(**href)
next_result = self.simulate_get(link['href'].split('?')[0],
query_string=next_query_string)
next_subscriptions = jsonutils.loads(next_result[0])
next_subscriptions_list = next_subscriptions['subscriptions']
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self.assertIn('links', next_subscriptions)
if limit < count:
self.assertEqual(len(next_subscriptions_list),
min(limit, count-limit))
else:
self.assertTrue(len(next_subscriptions_list) == 0)
self.assertEqual(len(subscriptions_list), min(limit, count))
def test_list_works(self):
self._list_subscription()
@ddt.data(1, 5, 10, 15)
def test_listing_works_with_limit(self, limit):
self._list_subscription(count=15, limit=limit)
def test_listing_marker_is_respected(self):
for i in range(15):
self._create_subscription(subscriber='http://' + str(i))
resp = self.simulate_get(self.subscription_path,
query_string='?limit=20',
headers=self.headers)
subscriptions_list = jsonutils.loads(resp[0])['subscriptions']
id_list = sorted([s['id'] for s in subscriptions_list])
resp = self.simulate_get(self.subscription_path,
query_string='?marker={0}'.format(id_list[9]),
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
next_subscriptions_list = jsonutils.loads(resp[0])['subscriptions']
self.assertEqual(len(next_subscriptions_list), 5)
self.assertEqual(subscriptions_list[10], next_subscriptions_list[0])
def test_get_works(self):
self._create_subscription()
resp = self.simulate_get(self.subscription_path,
headers=self.headers)
resp_doc = jsonutils.loads(resp[0])
sid = resp_doc['subscriptions'][0]['id']
subscriber = resp_doc['subscriptions'][0]['subscriber']
resp = self.simulate_get(self.subscription_path + '/' + sid,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
resp_doc = jsonutils.loads(resp[0])
self.assertEqual(sid, resp_doc['id'])
self.assertEqual(subscriber, resp_doc['subscriber'])
def test_get_nonexisting_raise_404(self):
self.simulate_get(self.subscription_path + '/fake',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def test_patch_works(self):
self._create_subscription()
resp = self.simulate_get(self.subscription_path,
headers=self.headers)
resp_doc = jsonutils.loads(resp[0])
sid = resp_doc['subscriptions'][0]['id']
resp = self.simulate_patch(self.subscription_path + '/' + sid,
body='{"ttl": 300}',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
resp = self.simulate_get(self.subscription_path + '/' + sid,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
resp_doc = jsonutils.loads(resp[0])
self.assertEqual(resp_doc['ttl'], 300)
def test_patch_nonexisting_raise_404(self):
self.simulate_patch(self.subscription_path + '/x',
body='{"ttl": 300}',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def test_patch_invalid_ttl(self):
self.simulate_patch(self.subscription_path + '/x',
body='{"ttl": "invalid"}',
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_400)
def test_delete_works(self):
self._create_subscription()
resp = self.simulate_get(self.subscription_path,
headers=self.headers)
resp_doc = jsonutils.loads(resp[0])
sid = resp_doc['subscriptions'][0]['id']
resp = self.simulate_get(self.subscription_path + '/' + sid,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self.simulate_delete(self.subscription_path + '/' + sid,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_204)
resp = self.simulate_get(self.subscription_path + '/' + sid,
headers=self.headers)
self.assertEqual(self.srmock.status, falcon.HTTP_404)
class TestSubscriptionsMongoDB(SubscriptionsBaseTest):
config_file = 'wsgi_mongodb_pooled.conf'
@testing.requires_mongodb
def setUp(self):
super(TestSubscriptionsMongoDB, self).setUp()

View File

@ -1,4 +1,5 @@
# Copyright (c) 2013 Rackspace, Inc.
# Copyright (c) 2015 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -15,7 +16,8 @@
import re
from oslo_config import cfg
from oslo.config import cfg
import six
from zaqar.i18n import _
@ -34,6 +36,11 @@ _TRANSPORT_LIMITS_OPTIONS = (
deprecated_group='limits:transport',
help='Defines the maximum number of messages per page.'),
cfg.IntOpt('max_subscriptions_per_page', default=20,
deprecated_name='subscription_paging_uplimit',
deprecated_group='limits:transport',
help='Defines the maximum number of subscriptions per page.'),
cfg.IntOpt('max_messages_per_claim_or_pop', default=20,
deprecated_name='max_messages_per_claim',
help='The maximum number of messages that can be claimed (OR) '
@ -64,6 +71,9 @@ _TRANSPORT_LIMITS_OPTIONS = (
deprecated_name='claim_grace_max',
deprecated_group='limits:transport',
help='Defines the maximum message grace period in seconds.'),
cfg.ListOpt('subscriber_types', default=['http'],
help='Defines supported subscriber types.'),
)
_TRANSPORT_LIMITS_GROUP = 'transport'
@ -278,3 +288,63 @@ class Validator(object):
raise ValidationFailed(
msg, self._limits_conf.max_message_ttl, MIN_CLAIM_TTL)
def subscription_posting(self, subscription):
"""Restrictions on a creation of subscription.
:param subscription: dict of subscription
:raises: ValidationFailed if the subscription is invalid.
"""
for p in ('subscriber', 'ttl', 'options'):
if p not in subscription.keys():
raise ValidationFailed(_(u'Missing parameter %s in body.') % p)
self.subscription_patching(subscription)
def subscription_patching(self, subscription):
"""Restrictions on an update of subscription.
:param subscription: dict of subscription
:raises: ValidationFailed if the subscription is invalid.
"""
if not subscription:
raise ValidationFailed(_(u'No subscription to create.'))
subscriber = subscription.get('subscriber', None)
subscriber_type = None
if subscriber:
parsed_uri = six.moves.urllib_parse.urlparse(subscriber)
subscriber_type = parsed_uri.scheme
if subscriber_type not in self._limits_conf.subscriber_types:
msg = _(u'The subscriber type of subscription must be '
u'supported in the list {0}.')
raise ValidationFailed(msg, self._limits_conf.subscriber_types)
options = subscription.get('options', None)
if options and not isinstance(options, dict):
msg = _(u'Options must be a dict.')
raise ValidationFailed(msg)
ttl = subscription.get('ttl', None)
if ttl and not isinstance(ttl, int):
msg = _(u'TTL must be an integer.')
raise ValidationFailed(msg)
def subscription_listing(self, limit=None, **kwargs):
"""Restrictions involving a list of subscriptions.
:param limit: The expected number of subscriptions in the list
:param kwargs: Ignored arguments passed to storage API
:raises: ValidationFailed if the limit is exceeded
"""
uplimit = self._limits_conf.max_subscriptions_per_page
if limit is not None and not (0 < limit <= uplimit):
msg = _(u'Limit must be at least 1 and may not '
'be greater than {0}.')
raise ValidationFailed(
msg, self._limits_conf.max_subscriptions_per_page)

View File

@ -21,6 +21,7 @@ from zaqar.transport.wsgi.v2_0 import ping
from zaqar.transport.wsgi.v2_0 import pools
from zaqar.transport.wsgi.v2_0 import queues
from zaqar.transport.wsgi.v2_0 import stats
from zaqar.transport.wsgi.v2_0 import subscriptions
VERSION = {
@ -46,6 +47,7 @@ def public_endpoints(driver, conf):
queue_controller = driver._storage.queue_controller
message_controller = driver._storage.message_controller
claim_controller = driver._storage.claim_controller
subscription_controller = driver._storage.subscription_controller
defaults = driver._defaults
@ -91,7 +93,16 @@ def public_endpoints(driver, conf):
# Ping
('/ping',
ping.Resource(driver._storage))
ping.Resource(driver._storage)),
# Subscription Endpoints
('/queues/{queue_name}/subscriptions',
subscriptions.CollectionResource(driver._validate,
subscription_controller)),
('/queues/{queue_name}/subscriptions/{subscription_id}',
subscriptions.ItemResource(driver._validate,
subscription_controller)),
]

View File

@ -0,0 +1,194 @@
# Copyright (c) 2015 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import six
from zaqar.i18n import _
import zaqar.openstack.common.log as logging
from zaqar.storage import errors as storage_errors
from zaqar.transport import utils
from zaqar.transport import validation
from zaqar.transport.wsgi import errors as wsgi_errors
from zaqar.transport.wsgi import utils as wsgi_utils
LOG = logging.getLogger(__name__)
class ItemResource(object):
__slots__ = ('_validate', '_subscription_controller')
def __init__(self, validate, subscription_controller):
self._validate = validate
self._subscription_controller = subscription_controller
def on_get(self, req, resp, project_id, queue_name, subscription_id):
LOG.debug(u'Subscription GET - subscription id: %(subscription_id)s,'
u' project: %(project)s, queue: %(queue)s',
{'subscription_id': subscription_id, 'project': project_id,
'queue': queue_name})
try:
resp_dict = self._subscription_controller.get(queue_name,
subscription_id,
project=project_id)
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
except Exception as ex:
LOG.exception(ex)
description = _(u'Subscription could not be retrieved.')
raise wsgi_errors.HTTPServiceUnavailable(description)
resp.body = utils.to_json(resp_dict)
# status defaults to 200
def on_delete(self, req, resp, project_id, queue_name, subscription_id):
LOG.debug(u'Subscription DELETE - '
u'subscription id: %(subscription_id)s,'
u' project: %(project)s, queue: %(queue)s',
{'subscription_id': subscription_id, 'project': project_id,
'queue': queue_name})
try:
self._subscription_controller.delete(queue_name,
subscription_id,
project=project_id)
except Exception as ex:
LOG.exception(ex)
description = _(u'Subscription could not be deleted.')
raise wsgi_errors.HTTPServiceUnavailable(description)
resp.status = falcon.HTTP_204
def on_patch(self, req, resp, project_id, queue_name, subscription_id):
LOG.debug(u'Subscription PATCH - subscription id: %(subscription_id)s,'
u' project: %(project)s, queue: %(queue)s',
{'subscription_id': subscription_id, 'project': project_id,
'queue': queue_name})
if req.content_length:
document = wsgi_utils.deserialize(req.stream, req.content_length)
try:
self._validate.subscription_patching(document)
self._subscription_controller.update(queue_name, subscription_id,
project=project_id,
**document)
resp.status = falcon.HTTP_204
resp.location = req.path
except storage_errors.SubscriptionDoesNotExist as ex:
LOG.debug(ex)
raise falcon.HTTPNotFound()
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except Exception as ex:
LOG.exception(ex)
description = (_(u'Subscription {subscription_id} could not be'
' updated.') %
dict(subscription_id=subscription_id))
raise falcon.HTTPBadRequest(_('Unable to update subscription'),
description)
class CollectionResource(object):
__slots__ = ('_subscription_controller', '_validate')
def __init__(self, validate, subscription_controller):
self._subscription_controller = subscription_controller
self._validate = validate
def on_get(self, req, resp, project_id, queue_name):
LOG.debug(u'Subscription collection GET - project: %(project)s, '
u'queue: %(queue)s',
{'project': project_id, 'queue': queue_name})
kwargs = {}
# NOTE(kgriffs): This syntax ensures that
# we don't clobber default values with None.
req.get_param('marker', store=kwargs)
req.get_param_as_int('limit', store=kwargs)
try:
self._validate.subscription_listing(**kwargs)
results = self._subscription_controller.list(queue_name,
project=project_id,
**kwargs)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except Exception as ex:
LOG.exception(ex)
description = _(u'Subscriptions could not be listed.')
raise wsgi_errors.HTTPServiceUnavailable(description)
# Buffer list of subscriptions
subscriptions = list(next(results))
# Got some. Prepare the response.
kwargs['marker'] = next(results) or kwargs.get('marker', '')
response_body = {
'subscriptions': subscriptions,
'links': [
{
'rel': 'next',
'href': req.path + falcon.to_query_str(kwargs)
}
]
}
resp.body = utils.to_json(response_body)
# status defaults to 200
def on_post(self, req, resp, project_id, queue_name):
LOG.debug(u'Subscription item POST - project: %(project)s, '
u'queue: %(queue)s',
{'project': project_id, 'queue': queue_name})
if req.content_length:
document = wsgi_utils.deserialize(req.stream, req.content_length)
try:
self._validate.subscription_posting(document)
subscriber = document['subscriber']
ttl = int(document['ttl'])
options = document['options']
created = self._subscription_controller.create(queue_name,
subscriber,
ttl,
options,
project=project_id)
except storage_errors.QueueDoesNotExist as ex:
LOG.exception(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except Exception as ex:
LOG.exception(ex)
description = _(u'Subscription could not be created.')
raise wsgi_errors.HTTPServiceUnavailable(description)
resp.status = falcon.HTTP_201 if created else falcon.HTTP_409
resp.location = req.path