diff --git a/zaqar/api/v2/endpoints.py b/zaqar/api/v2/endpoints.py index 874cdb6eb..bd4c08b4f 100644 --- a/zaqar/api/v2/endpoints.py +++ b/zaqar/api/v2/endpoints.py @@ -222,6 +222,57 @@ class Endpoints(object): headers = {'status': 200} return response.Response(req, body, headers) + @api_utils.on_exception_sends_500 + def queue_purge(self, req): + """Purge queue + + :param req: Request instance ready to be sent. + :type req: `api.common.Request` + :return: resp: Response instance + :type: resp: `api.common.Response` + """ + project_id = req._headers.get('X-Project-ID') + queue_name = req._body.get('queue_name') + resource_types = req._body.get('resource_types', ["messages", + "subscriptions"]) + + LOG.debug(u'Purge queue - queue: %(queue)s, ' + u'project: %(project)s', + {'queue': queue_name, 'project': project_id}) + + try: + pop_limit = 100 + if "messages" in resource_types: + LOG.debug("Purge all messages under queue %s" % queue_name) + resp = self._pop_messages(req, queue_name, + project_id, pop_limit) + while resp.get_response()['body']['messages']: + resp = self._pop_messages(req, queue_name, + project_id, pop_limit) + + if "subscriptions" in resource_types: + LOG.debug("Purge all subscriptions under queue %s" % + queue_name) + resp = self._subscription_controller.list(queue_name, + project=project_id) + subscriptions = list(next(resp)) + for sub in subscriptions: + self._subscription_controller.delete(queue_name, + sub['id'], + project=project_id) + + except storage_errors.QueueDoesNotExist as ex: + LOG.exception(ex) + headers = {'status': 404} + return api_utils.error_response(req, ex, headers) + except storage_errors.ExceptionBase as ex: + LOG.exception(ex) + headers = {'status': 503} + return api_utils.error_response(req, ex, headers) + else: + headers = {'status': 204} + return response.Response(req, {}, headers) + # Messages @api_utils.on_exception_sends_500 def message_list(self, req): diff --git a/zaqar/api/v2/request.py b/zaqar/api/v2/request.py index d87ed8db3..d7525cf2f 100644 --- a/zaqar/api/v2/request.py +++ b/zaqar/api/v2/request.py @@ -104,4 +104,23 @@ class RequestSchema(v1_1.RequestSchema): }, 'required': ['action', 'headers', 'body'] }, + + consts.QUEUE_PURGE: { + 'properties': { + 'action': {'enum': [consts.QUEUE_PURGE]}, + 'headers': { + 'type': 'object', + 'properties': headers, + 'required': ['Client-ID', 'X-Project-ID']}, + 'body': { + 'type': 'object', + 'properties': { + 'queue_name': {'type': 'string'}, + 'resource_types': {'type': 'array'}, + }, + 'required': ['queue_name'], + } + }, + 'required': ['action', 'headers', 'body'] + }, }) diff --git a/zaqar/common/consts.py b/zaqar/common/consts.py index eaedc502a..96cb1adb4 100644 --- a/zaqar/common/consts.py +++ b/zaqar/common/consts.py @@ -64,12 +64,14 @@ QUEUE_OPS = ( QUEUE_GET, QUEUE_DELETE, QUEUE_GET_STATS, + QUEUE_PURGE ) = ( 'queue_create', 'queue_list', 'queue_get', 'queue_delete', 'queue_get_stats', + 'queue_purge' ) CLAIM_OPS = ( diff --git a/zaqar/tests/unit/transport/websocket/v2/test_queue_lifecycle.py b/zaqar/tests/unit/transport/websocket/v2/test_queue_lifecycle.py index 3b27c207a..d33e0b06c 100644 --- a/zaqar/tests/unit/transport/websocket/v2/test_queue_lifecycle.py +++ b/zaqar/tests/unit/transport/websocket/v2/test_queue_lifecycle.py @@ -602,6 +602,59 @@ class QueueLifecycleBaseTest(base.V2Base): mock_queue_list.return_value = fake_generator() self.protocol.onMessage(req, False) + def _post_messages(self, queue_name, headers, repeat=1): + messages = [{'body': 239, 'ttl': 300}] * repeat + + action = consts.MESSAGE_POST + body = {"queue_name": queue_name, + "messages": messages} + + send_mock = mock.Mock() + self.protocol.sendMessage = send_mock + + req = test_utils.create_request(action, body, headers) + + self.protocol.onMessage(req, False) + + return json.loads(send_mock.call_args[0][0]) + + def test_purge(self): + arbitrary_number = 644079696574693 + project_id = str(arbitrary_number) + client_id = str(uuid.uuid4()) + headers = { + 'X-Project-ID': project_id, + 'Client-ID': client_id + } + queue_name = 'myqueue' + resp = self._post_messages(queue_name, headers, repeat=5) + msg_ids = resp['body']['message_ids'] + + send_mock = mock.Mock() + self.protocol.sendMessage = send_mock + for msg_id in msg_ids: + action = consts.MESSAGE_GET + body = {"queue_name": queue_name, "message_id": msg_id} + req = test_utils.create_request(action, body, headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(200, resp['headers']['status']) + + action = consts.QUEUE_PURGE + body = {"queue_name": queue_name, "resource_types": ["messages"]} + req = test_utils.create_request(action, body, headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(204, resp['headers']['status']) + + for msg_id in msg_ids: + action = consts.MESSAGE_GET + body = {"queue_name": queue_name, "message_id": msg_id} + req = test_utils.create_request(action, body, headers) + self.protocol.onMessage(req, False) + resp = json.loads(send_mock.call_args[0][0]) + self.assertEqual(404, resp['headers']['status']) + class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest): diff --git a/zaqar/transport/wsgi/v2_0/purge.py b/zaqar/transport/wsgi/v2_0/purge.py index ce5876d42..56cfadcf8 100644 --- a/zaqar/transport/wsgi/v2_0/purge.py +++ b/zaqar/transport/wsgi/v2_0/purge.py @@ -58,11 +58,12 @@ class Resource(object): try: if "messages" in document['resource_types']: + pop_limit = 100 LOG.debug("Purge all messages under queue %s" % queue_name) - messages = self._message_ctrl.pop(queue_name, 10, + messages = self._message_ctrl.pop(queue_name, pop_limit, project=project_id) while messages: - messages = self._message_ctrl.pop(queue_name, 10, + messages = self._message_ctrl.pop(queue_name, pop_limit, project=project_id) if "subscriptions" in document['resource_types']: