Support purge queue -- wsgi

A new endpoint /v2/queues/myqueue/purge is added to support purge
a queue, which accepts a POST body like:
{"resource_types": ["messages", "subscriptions"]} to allow user
purge particular resource of the queue. Test cases are added as
well.

APIImpact
DocImpact

Partially Implements: blueprint purge-queue

Change-Id: Ie82713fce7cb0db6612693cee81be8c3170d292a
This commit is contained in:
Fei Long Wang 2016-11-22 14:12:18 +13:00
parent cd1ccb0c66
commit 460c345298
9 changed files with 261 additions and 1 deletions

View File

@ -10,6 +10,7 @@
"queues:update": "",
"queues:stats": "",
"queues:share": "",
"queues:purge": "",
"messages:get_all": "",
"messages:create": "",

View File

@ -0,0 +1,4 @@
features:
- A new queue action is added so that users can purge a queue
quickly. That means all the messages and subscriptions will be deleted
automatically but the metadata of the queue will be kept.

View File

@ -10,6 +10,7 @@
"queues:update": "",
"queues:stats": "",
"queues:share": "",
"queues:purge": "",
"messages:get_all": "",
"messages:create": "",

View File

@ -0,0 +1,119 @@
# Copyright 2016 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import uuid
from oslo_serialization import jsonutils
from zaqar.tests.unit.transport.wsgi import base
class TestPurge(base.V2Base):
config_file = 'wsgi_mongodb.conf'
def setUp(self):
super(TestPurge, self).setUp()
self.headers = {
'Client-ID': str(uuid.uuid4())
}
self.queue_path = self.url_prefix + '/queues/myqueue'
self.messages_path = self.queue_path + '/messages'
self.subscription_path = (self.queue_path + '/subscriptions')
self.messages = {'messages': [{'body': 'A', 'ttl': 300},
{'body': 'B', 'ttl': 400},
{'body': 'C', 'ttl': 500}]}
self.subscriptions = {"subscriber": "http://ping.me", "ttl": 3600,
"options": {"key": "value"}}
def tearDown(self):
self.simulate_delete(self.queue_path, headers=self.headers)
super(TestPurge, self).tearDown()
def _get_msg_id(self, headers):
return self._get_msg_ids(headers)[0]
def _get_msg_ids(self, headers):
return headers['location'].rsplit('=', 1)[-1].split(',')
def test_purge_particular_resource(self):
# Post messages
messages_body = jsonutils.dumps(self.messages)
self.simulate_post(self.messages_path, body=messages_body,
headers=self.headers)
msg_ids = self._get_msg_ids(self.srmock.headers_dict)
for msg_id in msg_ids:
target = self.messages_path + '/' + msg_id
self.simulate_get(target, headers=self.headers)
self.assertEqual(falcon.HTTP_200, self.srmock.status)
# Post subscriptions
sub_resp = self.simulate_post(self.subscription_path,
body=jsonutils.dumps(self.subscriptions),
headers=self.headers)
# Purge queue
purge_body = jsonutils.dumps({'resource_types': ['messages']})
self.simulate_post(self.queue_path+"/purge", body=purge_body)
for msg_id in msg_ids:
target = self.messages_path + '/' + msg_id
self.simulate_get(target, headers=self.headers)
self.assertEqual(falcon.HTTP_404, self.srmock.status)
# Check subscriptions are still there
resp_list = self.simulate_get(self.subscription_path,
headers=self.headers)
resp_list_doc = jsonutils.loads(resp_list[0])
sid = resp_list_doc['subscriptions'][0]['id']
sub_resp_doc = jsonutils.loads(sub_resp[0])
self.assertEqual(sub_resp_doc['subscription_id'], sid)
def test_purge_by_default(self):
# Post messages
messages_body = jsonutils.dumps(self.messages)
self.simulate_post(self.messages_path, body=messages_body,
headers=self.headers)
msg_ids = self._get_msg_ids(self.srmock.headers_dict)
for msg_id in msg_ids:
target = self.messages_path + '/' + msg_id
self.simulate_get(target, headers=self.headers)
self.assertEqual(falcon.HTTP_200, self.srmock.status)
# Post subscriptions
sub_resp = self.simulate_post(self.subscription_path,
body=jsonutils.dumps(self.subscriptions),
headers=self.headers)
# Purge queue
purge_body = jsonutils.dumps({'resource_types': ['messages',
'subscriptions']})
self.simulate_post(self.queue_path+"/purge", body=purge_body)
for msg_id in msg_ids:
target = self.messages_path + '/' + msg_id
self.simulate_get(target, headers=self.headers)
self.assertEqual(falcon.HTTP_404, self.srmock.status)
# Check subscriptions are still there
sub_id = jsonutils.loads(sub_resp[0])['subscription_id']
self.simulate_get(self.subscription_path + "/" + sub_id,
headers=self.headers)
self.assertEqual(falcon.HTTP_404, self.srmock.status)

View File

@ -179,3 +179,25 @@ class TestValidation(base.V2Base):
body='[{"op":"add","path":"/metadata/a",'
'"value":2}]')
self.assertEqual(falcon.HTTP_200, self.srmock.status)
def test_queue_purge(self):
# Wrong key
queue_1 = self.url_prefix + '/queues/queue1/purge'
self.simulate_post(queue_1,
self.project_id,
body='{"wrong_key": ["messages"]}')
self.addCleanup(self.simulate_delete, queue_1, self.project_id,
headers=self.headers)
self.assertEqual(falcon.HTTP_400, self.srmock.status)
# Wrong value
self.simulate_post(queue_1,
self.project_id,
body='{"resource_types": ["wrong_value"]}')
self.assertEqual(falcon.HTTP_400, self.srmock.status)
# Correct input
self.simulate_post(queue_1,
self.project_id,
body='{"resource_types": ["messages"]}')
self.assertEqual(falcon.HTTP_204, self.srmock.status)

View File

@ -27,6 +27,7 @@ MIN_MESSAGE_TTL = 60
MIN_CLAIM_TTL = 60
MIN_CLAIM_GRACE = 60
MIN_SUBSCRIPTION_TTL = 60
_PURGBLE_RESOURCE_TYPES = {'messages', 'subscriptions'}
_TRANSPORT_LIMITS_OPTIONS = (
cfg.IntOpt('max_queues_per_page', default=20,
@ -320,6 +321,22 @@ class Validator(object):
' and must be at least greater than 0.'),
self._limits_conf.max_messages_post_size)
def queue_purging(self, document):
"""Restrictions the resource types to be purged for a queue.
:param resource_types: Type list of all resource under a queue
:raises: ValidationFailed if the resource types are invalid
"""
if 'resource_types' not in document:
msg = _(u'Post body must contain key "resource_types".')
raise ValidationFailed(msg)
if (not set(document['resource_types']).issubset(
_PURGBLE_RESOURCE_TYPES)):
msg = _(u'Resource types must be a sub set of {0}.')
raise ValidationFailed(msg, _PURGBLE_RESOURCE_TYPES)
def message_posting(self, messages):
"""Restrictions on a list of messages.

View File

@ -20,6 +20,7 @@ from zaqar.transport.wsgi.v2_0 import homedoc
from zaqar.transport.wsgi.v2_0 import messages
from zaqar.transport.wsgi.v2_0 import ping
from zaqar.transport.wsgi.v2_0 import pools
from zaqar.transport.wsgi.v2_0 import purge
from zaqar.transport.wsgi.v2_0 import queues
from zaqar.transport.wsgi.v2_0 import stats
from zaqar.transport.wsgi.v2_0 import subscriptions
@ -69,7 +70,8 @@ def public_endpoints(driver, conf):
message_controller)),
('/queues/{queue_name}/stats',
stats.Resource(queue_controller)),
('/queues/{queue_name}/purge',
purge.Resource(driver)),
# Messages Endpoints
('/queues/{queue_name}/messages',
messages.CollectionResource(driver._wsgi_conf,

View File

@ -72,6 +72,19 @@ JSON_HOME = {
'accept-post': ['application/json'],
},
},
'rel/queue_purge': {
'href-template': '/v2/queues/{queue_name}/purge',
'href-vars': {
'queue_name': 'param/queue_name',
},
'hints': {
'allow': ['POST'],
'formats': {
'application/json': {},
},
'accept-post': ['application/json'],
},
},
# -----------------------------------------------------------------
# Messages

View File

@ -0,0 +1,81 @@
# Copyright 2016 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import falcon
from oslo_log import log as logging
import six
from zaqar.common import decorators
from zaqar.transport import acl
from zaqar.transport import validation
from zaqar.transport.wsgi import errors as wsgi_errors
from zaqar.transport.wsgi import utils as wsgi_utils
LOG = logging.getLogger(__name__)
class Resource(object):
__slots__ = ('_driver', '_conf', '_queue_ctrl',
'_message_ctrl', '_subscription_ctrl', '_validate')
def __init__(self, driver):
self._driver = driver
self._conf = driver._conf
self._queue_ctrl = driver._storage.queue_controller
self._message_ctrl = driver._storage.message_controller
self._subscription_ctrl = driver._storage.subscription_controller
self._validate = driver._validate
@decorators.TransportLog("Queue item")
@acl.enforce("queues:purge")
def on_post(self, req, resp, project_id, queue_name):
try:
if req.content_length:
document = wsgi_utils.deserialize(req.stream,
req.content_length)
self._validate.queue_purging(document)
else:
document = {'resource_types': ['messages', 'subscriptions']}
except ValueError as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
try:
if "messages" in document['resource_types']:
LOG.debug("Purge all messages under queue %s" % queue_name)
messages = self._message_ctrl.pop(queue_name, 10,
project=project_id)
while messages:
messages = self._message_ctrl.pop(queue_name, 10,
project=project_id)
if "subscriptions" in document['resource_types']:
LOG.debug("Purge all subscriptions under queue %s" %
queue_name)
results = self._subscription_ctrl.list(queue_name,
project=project_id)
subscriptions = list(next(results))
for sub in subscriptions:
self._subscription_ctrl.delete(queue_name,
sub['id'],
project=project_id)
except ValueError as err:
raise wsgi_errors.HTTPBadRequestAPI(str(err))
resp.status = falcon.HTTP_204