diff --git a/etc/policy.json.sample b/etc/policy.json.sample index df6c63832..83a6bd5d4 100644 --- a/etc/policy.json.sample +++ b/etc/policy.json.sample @@ -10,6 +10,7 @@ "queues:update": "", "queues:stats": "", "queues:share": "", + "queues:purge": "", "messages:get_all": "", "messages:create": "", diff --git a/releasenotes/notes/purge-queue-6788a249ee59d55a.yaml b/releasenotes/notes/purge-queue-6788a249ee59d55a.yaml new file mode 100644 index 000000000..f98d89ca2 --- /dev/null +++ b/releasenotes/notes/purge-queue-6788a249ee59d55a.yaml @@ -0,0 +1,4 @@ +features: + - A new queue action is added so that users can purge a queue + quickly. That means all the messages and subscriptions will be deleted + automatically but the metadata of the queue will be kept. diff --git a/zaqar/tests/etc/policy.json b/zaqar/tests/etc/policy.json index df6c63832..83a6bd5d4 100644 --- a/zaqar/tests/etc/policy.json +++ b/zaqar/tests/etc/policy.json @@ -10,6 +10,7 @@ "queues:update": "", "queues:stats": "", "queues:share": "", + "queues:purge": "", "messages:get_all": "", "messages:create": "", diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_purge.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_purge.py new file mode 100644 index 000000000..42a0b06c2 --- /dev/null +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_purge.py @@ -0,0 +1,119 @@ +# Copyright 2016 Catalyst IT Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import falcon +import uuid + +from oslo_serialization import jsonutils + +from zaqar.tests.unit.transport.wsgi import base + + +class TestPurge(base.V2Base): + + config_file = 'wsgi_mongodb.conf' + + def setUp(self): + super(TestPurge, self).setUp() + + self.headers = { + 'Client-ID': str(uuid.uuid4()) + } + self.queue_path = self.url_prefix + '/queues/myqueue' + self.messages_path = self.queue_path + '/messages' + self.subscription_path = (self.queue_path + '/subscriptions') + + self.messages = {'messages': [{'body': 'A', 'ttl': 300}, + {'body': 'B', 'ttl': 400}, + {'body': 'C', 'ttl': 500}]} + self.subscriptions = {"subscriber": "http://ping.me", "ttl": 3600, + "options": {"key": "value"}} + + def tearDown(self): + self.simulate_delete(self.queue_path, headers=self.headers) + super(TestPurge, self).tearDown() + + def _get_msg_id(self, headers): + return self._get_msg_ids(headers)[0] + + def _get_msg_ids(self, headers): + return headers['location'].rsplit('=', 1)[-1].split(',') + + def test_purge_particular_resource(self): + # Post messages + messages_body = jsonutils.dumps(self.messages) + self.simulate_post(self.messages_path, body=messages_body, + headers=self.headers) + + msg_ids = self._get_msg_ids(self.srmock.headers_dict) + for msg_id in msg_ids: + target = self.messages_path + '/' + msg_id + self.simulate_get(target, headers=self.headers) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + # Post subscriptions + sub_resp = self.simulate_post(self.subscription_path, + body=jsonutils.dumps(self.subscriptions), + headers=self.headers) + + # Purge queue + purge_body = jsonutils.dumps({'resource_types': ['messages']}) + self.simulate_post(self.queue_path+"/purge", body=purge_body) + + for msg_id in msg_ids: + target = self.messages_path + '/' + msg_id + self.simulate_get(target, headers=self.headers) + self.assertEqual(falcon.HTTP_404, self.srmock.status) + + # Check subscriptions are still there + resp_list = self.simulate_get(self.subscription_path, + headers=self.headers) + resp_list_doc = jsonutils.loads(resp_list[0]) + sid = resp_list_doc['subscriptions'][0]['id'] + sub_resp_doc = jsonutils.loads(sub_resp[0]) + self.assertEqual(sub_resp_doc['subscription_id'], sid) + + def test_purge_by_default(self): + # Post messages + messages_body = jsonutils.dumps(self.messages) + self.simulate_post(self.messages_path, body=messages_body, + headers=self.headers) + + msg_ids = self._get_msg_ids(self.srmock.headers_dict) + for msg_id in msg_ids: + target = self.messages_path + '/' + msg_id + self.simulate_get(target, headers=self.headers) + self.assertEqual(falcon.HTTP_200, self.srmock.status) + + # Post subscriptions + sub_resp = self.simulate_post(self.subscription_path, + body=jsonutils.dumps(self.subscriptions), + headers=self.headers) + + # Purge queue + purge_body = jsonutils.dumps({'resource_types': ['messages', + 'subscriptions']}) + self.simulate_post(self.queue_path+"/purge", body=purge_body) + + for msg_id in msg_ids: + target = self.messages_path + '/' + msg_id + self.simulate_get(target, headers=self.headers) + self.assertEqual(falcon.HTTP_404, self.srmock.status) + + # Check subscriptions are still there + sub_id = jsonutils.loads(sub_resp[0])['subscription_id'] + self.simulate_get(self.subscription_path + "/" + sub_id, + headers=self.headers) + self.assertEqual(falcon.HTTP_404, self.srmock.status) diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_validation.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_validation.py index 9aa04a54a..5d5df6a47 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/test_validation.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_validation.py @@ -179,3 +179,25 @@ class TestValidation(base.V2Base): body='[{"op":"add","path":"/metadata/a",' '"value":2}]') self.assertEqual(falcon.HTTP_200, self.srmock.status) + + def test_queue_purge(self): + # Wrong key + queue_1 = self.url_prefix + '/queues/queue1/purge' + self.simulate_post(queue_1, + self.project_id, + body='{"wrong_key": ["messages"]}') + self.addCleanup(self.simulate_delete, queue_1, self.project_id, + headers=self.headers) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + # Wrong value + self.simulate_post(queue_1, + self.project_id, + body='{"resource_types": ["wrong_value"]}') + self.assertEqual(falcon.HTTP_400, self.srmock.status) + + # Correct input + self.simulate_post(queue_1, + self.project_id, + body='{"resource_types": ["messages"]}') + self.assertEqual(falcon.HTTP_204, self.srmock.status) diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py index 5ef1f190d..db7d01697 100644 --- a/zaqar/transport/validation.py +++ b/zaqar/transport/validation.py @@ -27,6 +27,7 @@ MIN_MESSAGE_TTL = 60 MIN_CLAIM_TTL = 60 MIN_CLAIM_GRACE = 60 MIN_SUBSCRIPTION_TTL = 60 +_PURGBLE_RESOURCE_TYPES = {'messages', 'subscriptions'} _TRANSPORT_LIMITS_OPTIONS = ( cfg.IntOpt('max_queues_per_page', default=20, @@ -320,6 +321,22 @@ class Validator(object): ' and must be at least greater than 0.'), self._limits_conf.max_messages_post_size) + def queue_purging(self, document): + """Restrictions the resource types to be purged for a queue. + + :param resource_types: Type list of all resource under a queue + :raises: ValidationFailed if the resource types are invalid + """ + + if 'resource_types' not in document: + msg = _(u'Post body must contain key "resource_types".') + raise ValidationFailed(msg) + + if (not set(document['resource_types']).issubset( + _PURGBLE_RESOURCE_TYPES)): + msg = _(u'Resource types must be a sub set of {0}.') + raise ValidationFailed(msg, _PURGBLE_RESOURCE_TYPES) + def message_posting(self, messages): """Restrictions on a list of messages. diff --git a/zaqar/transport/wsgi/v2_0/__init__.py b/zaqar/transport/wsgi/v2_0/__init__.py index 46078bd6a..c621ff453 100644 --- a/zaqar/transport/wsgi/v2_0/__init__.py +++ b/zaqar/transport/wsgi/v2_0/__init__.py @@ -20,6 +20,7 @@ from zaqar.transport.wsgi.v2_0 import homedoc from zaqar.transport.wsgi.v2_0 import messages from zaqar.transport.wsgi.v2_0 import ping from zaqar.transport.wsgi.v2_0 import pools +from zaqar.transport.wsgi.v2_0 import purge from zaqar.transport.wsgi.v2_0 import queues from zaqar.transport.wsgi.v2_0 import stats from zaqar.transport.wsgi.v2_0 import subscriptions @@ -69,7 +70,8 @@ def public_endpoints(driver, conf): message_controller)), ('/queues/{queue_name}/stats', stats.Resource(queue_controller)), - + ('/queues/{queue_name}/purge', + purge.Resource(driver)), # Messages Endpoints ('/queues/{queue_name}/messages', messages.CollectionResource(driver._wsgi_conf, diff --git a/zaqar/transport/wsgi/v2_0/homedoc.py b/zaqar/transport/wsgi/v2_0/homedoc.py index 2c2ad578b..f019a2fa4 100644 --- a/zaqar/transport/wsgi/v2_0/homedoc.py +++ b/zaqar/transport/wsgi/v2_0/homedoc.py @@ -72,6 +72,19 @@ JSON_HOME = { 'accept-post': ['application/json'], }, }, + 'rel/queue_purge': { + 'href-template': '/v2/queues/{queue_name}/purge', + 'href-vars': { + 'queue_name': 'param/queue_name', + }, + 'hints': { + 'allow': ['POST'], + 'formats': { + 'application/json': {}, + }, + 'accept-post': ['application/json'], + }, + }, # ----------------------------------------------------------------- # Messages diff --git a/zaqar/transport/wsgi/v2_0/purge.py b/zaqar/transport/wsgi/v2_0/purge.py new file mode 100644 index 000000000..ce5876d42 --- /dev/null +++ b/zaqar/transport/wsgi/v2_0/purge.py @@ -0,0 +1,81 @@ +# Copyright 2016 Catalyst IT Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy +# of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +import falcon + +from oslo_log import log as logging +import six + +from zaqar.common import decorators +from zaqar.transport import acl +from zaqar.transport import validation +from zaqar.transport.wsgi import errors as wsgi_errors +from zaqar.transport.wsgi import utils as wsgi_utils + +LOG = logging.getLogger(__name__) + + +class Resource(object): + + __slots__ = ('_driver', '_conf', '_queue_ctrl', + '_message_ctrl', '_subscription_ctrl', '_validate') + + def __init__(self, driver): + self._driver = driver + self._conf = driver._conf + self._queue_ctrl = driver._storage.queue_controller + self._message_ctrl = driver._storage.message_controller + self._subscription_ctrl = driver._storage.subscription_controller + self._validate = driver._validate + + @decorators.TransportLog("Queue item") + @acl.enforce("queues:purge") + def on_post(self, req, resp, project_id, queue_name): + try: + if req.content_length: + document = wsgi_utils.deserialize(req.stream, + req.content_length) + self._validate.queue_purging(document) + else: + document = {'resource_types': ['messages', 'subscriptions']} + except ValueError as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + + try: + if "messages" in document['resource_types']: + LOG.debug("Purge all messages under queue %s" % queue_name) + messages = self._message_ctrl.pop(queue_name, 10, + project=project_id) + while messages: + messages = self._message_ctrl.pop(queue_name, 10, + project=project_id) + + if "subscriptions" in document['resource_types']: + LOG.debug("Purge all subscriptions under queue %s" % + queue_name) + results = self._subscription_ctrl.list(queue_name, + project=project_id) + subscriptions = list(next(results)) + for sub in subscriptions: + self._subscription_ctrl.delete(queue_name, + sub['id'], + project=project_id) + except ValueError as err: + raise wsgi_errors.HTTPBadRequestAPI(str(err)) + + resp.status = falcon.HTTP_204