Support purge queue

Change-Id: Ic20c9ad65d2a77c84afc38d7fcb4535cbaa1d01d
This commit is contained in:
Fei Long Wang 2017-02-16 13:43:42 +13:00
parent b60f5f54af
commit b3ff95eb4a
6 changed files with 85 additions and 0 deletions

View File

@ -79,6 +79,7 @@ openstack.messaging.v2 =
queue_stats = zaqarclient.queues.v2.cli:GetQueueStats
queue_set_metadata = zaqarclient.queues.v2.cli:SetQueueMetadata
queue_get_metadata = zaqarclient.queues.v2.cli:GetQueueMetadata
queue_purge = zaqarclient.queues.v2.cli:PurgeQueue
pool_create = zaqarclient.queues.v2.cli:CreatePool
pool_show = zaqarclient.queues.v2.cli:ShowPool
pool_update = zaqarclient.queues.v2.cli:UpdatePool

View File

@ -22,6 +22,14 @@ class V2(api.V1_1):
V2.schema.update({
'queue_purge': {
'ref': 'queues/{queue_name}/purge',
'method': 'POST',
'required': ['queue_name'],
'properties': {
'queue_name': {'type': 'string'}
}
},
'signed_url_create': {
'ref': 'queues/{queue_name}/share',
'method': 'POST',

View File

@ -57,6 +57,32 @@ class GetQueueMetadata(cli.GetQueueMetadata):
pass
class PurgeQueue(command.Command):
"""Purge a queue"""
_description = _("Purge a queue")
log = logging.getLogger(__name__ + ".PurgeQueue")
def get_parser(self, prog_name):
parser = super(PurgeQueue, self).get_parser(prog_name)
parser.add_argument(
"queue_name",
metavar="<queue_name>",
help="Name of the queue")
parser.add_argument(
"--resource_types",
metavar="<resource_types>",
choices=['messages', 'subscriptions'],
help="Resource types want to be purged.")
return parser
def take_action(self, parsed_args):
client = _get_client(self, parsed_args)
queue_name = parsed_args.queue_name
client.queue(queue_name).purge(
resource_types=parsed_args.resource_types)
class CreatePool(cli.CreatePool):
"""Create a pool"""
pass

View File

@ -88,6 +88,27 @@ def queue_update(transport, request, name, metadata, callback=None):
return resp.deserialized_content
def queue_purge(transport, request, name, resource_types=None):
"""Purge resources under a queue
:param transport: Transport instance to use
:type transport: `transport.base.Transport`
:param request: Request instance ready to be sent.
:type request: `transport.request.Request`
:param name: Queue reference name.
:type name: `six.text_type`
:param resource_types: Resource types will be purged
:type resource_types: `list`
"""
request.operation = 'queue_purge'
request.params['queue_name'] = name
if resource_types:
request.content = json.dumps({'resource_types': [resource_types]})
resp = transport.send(request)
return resp.deserialized_content
def signed_url_create(transport, request, queue_name, paths=None,
ttl_seconds=None, project_id=None, methods=None):
"""Creates a signed URL given a queue name

View File

@ -83,6 +83,11 @@ class Queue(queues.Queue):
return self._metadata
def purge(self, resource_types=None):
req, trans = self.client._request_and_transport()
core.queue_purge(trans, req, self._name,
resource_types=resource_types)
def create_object(parent):
return lambda args: Queue(parent, args["name"], auto_create=False)

View File

@ -558,6 +558,30 @@ class QueuesV2QueueUnitTest(QueuesV1_1QueueUnitTest):
expect_metadata = {"name": 'test1'}
self.assertEqual(expect_metadata, metadata)
def test_queue_purge(self):
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, None)
send_method.return_value = resp
self.queue.purge()
# NOTE(flwang): Nothing to assert here,
# just checking our way down to the transport
# doesn't crash.
def test_queue_purge_messages(self):
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, None)
send_method.return_value = resp
self.queue.purge(resource_types=['messages'])
self.assertEqual({"resource_types": [["messages"]]},
json.loads(send_method.call_args[0][0].content))
class QueuesV2QueueFunctionalTest(QueuesV1_1QueueFunctionalTest):