Add support for subscription create v2

This patch adds the support for subscription create, which includes
the CLI support and the lib support. Besides, to make the functional
test work for subscription create, this patch also adds the lib
support for subscription get, delete, update and list to make things
easier.

Change-Id: Ie3ac27b731f41ea6023976aab3c1afbc283d659d
This commit is contained in:
Fei Long Wang 2015-11-25 08:41:22 +13:00
parent e8c303956f
commit 8ca747746c
12 changed files with 477 additions and 5 deletions

View File

@ -82,6 +82,7 @@ openstack.messaging.v2 =
messaging_flavor_update = zaqarclient.queues.v2.cli:UpdateFlavor
messaging_flavor_show = zaqarclient.queues.v2.cli:ShowFlavor
messaging_flavor_create = zaqarclient.queues.v2.cli:CreateFlavor
subscription_create = zaqarclient.queues.v2.cli:CreateSubscription
openstack.cli.extension =
messaging = zaqarclient.queues.cli

View File

@ -0,0 +1,27 @@
# Copyright (c) 2015 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from zaqarclient.tests.queues import subscriptions
from zaqarclient.transport import http
class QueuesV2SubscriptionHttpFunctionalTest(
subscriptions.QueuesV2SubscriptionFunctionalTest):
is_functional = True
transport_cls = http.HttpTransport
url = 'http://127.0.0.1:8888'
version = 2

View File

@ -0,0 +1,24 @@
# Copyright (c) 2015 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from zaqarclient.tests.queues import subscriptions as sub
from zaqarclient.transport import http
class QueuesV2SubscriptionHttpUnitTest(sub.QueuesV2SubscriptionUnitTest):
transport_cls = http.HttpTransport
url = 'http://127.0.0.1:8888/v2'
version = 2

View File

@ -29,5 +29,49 @@ V2.schema.update({
'properties': {
'queue_name': {'type': 'string'}
},
}
},
'subscription_create': {
'ref': 'queues/{queue_name}/subscriptions',
'method': 'POST',
'required': ['queue_name'],
'properties': {
'queue_name': {'type': 'string'}
},
},
'subscription_get': {
'ref': 'queues/{queue_name}/subscriptions/{subscription_id}',
'method': 'GET',
'required': ['queue_name', 'subscription_id'],
'properties': {
'queue_name': {'type': 'string'},
'subscription_id': {'type': 'string'}
},
},
'subscription_update': {
'ref': 'queues/{queue_name}/subscriptions/{subscription_id}',
'method': 'PATCH',
'required': ['queue_name', 'subscription_id'],
'properties': {
'queue_name': {'type': 'string'},
'subscription_id': {'type': 'string'}
}
},
'subscription_delete': {
'ref': 'queues/{queue_name}/subscriptions/{subscription_id}',
'method': 'DELETE',
'required': ['queue_name', 'subscription_id'],
'properties': {
'queue_name': {'type': 'string'},
'subscription_id': {'type': 'string'}
}
},
'subscription_list': {
'ref': 'queues/{queue_name}/subscriptions',
'method': 'GET',
'properties': {
'marker': {'type': 'string'},
'limit': {'type': 'integer'},
'detailed': {'type': 'boolean'}
}
},
})

View File

@ -12,9 +12,18 @@
# License for the specific language governing permissions and limitations
# under the License.
from zaqarclient.queues.v1 import cli
import json
import logging
from cliff import show
from openstackclient.common import utils
from zaqarclient.queues.v1 import cli
def _get_client(obj, parsed_args):
obj.log.debug("take_action(%s)" % parsed_args)
return obj.app.client_manager.messaging
class CreateQueue(cli.CreateQueue):
@ -104,3 +113,48 @@ class CreateFlavor(cli.CreateFlavor):
class ListFlavors(cli.ListFlavors):
"""List available flavors"""
pass
class CreateSubscription(show.ShowOne):
"""Create a subscription for queue"""
log = logging.getLogger(__name__ + ".CreateSubscription")
def get_parser(self, prog_name):
parser = super(CreateSubscription, self).get_parser(prog_name)
parser.add_argument(
"queue_name",
metavar="<queue_name>",
help="Name of the queue to subscribe to")
parser.add_argument(
"subscriber",
metavar="<subscriber>",
help="Subscriber which will be notified")
parser.add_argument(
"ttl",
metavar="<ttl>",
type=int,
help="Time to live of the subscription in seconds")
parser.add_argument(
"--options",
type=json.loads,
default={},
metavar="<options>",
help="Metadata of the subscription in JSON format")
return parser
def take_action(self, parsed_args):
client = _get_client(self, parsed_args)
kwargs = {'subscriber': parsed_args.subscriber,
'ttl': parsed_args.ttl,
'options': parsed_args.options,
}
data = client.subscription(parsed_args.queue_name, **kwargs)
if not data:
raise RuntimeError('Failed to create subscription for (%s).' %
parsed_args.subscriber)
columns = ('ID', 'Subscriber', 'TTL', 'Options')
return columns, utils.get_item_properties(data, columns)

View File

@ -15,7 +15,11 @@
import uuid
from zaqarclient.common import decorators
from zaqarclient.queues.v1 import client
from zaqarclient.queues.v1 import iterator
from zaqarclient.queues.v2 import core
from zaqarclient.queues.v2 import subscription
class Client(client.Client):
@ -42,3 +46,34 @@ class Client(client.Client):
self.auth_opts = self.conf.get('auth_opts', {})
self.client_uuid = self.conf.get('client_uuid',
uuid.uuid4().hex)
@decorators.version(min_version=2)
def subscription(self, queue_name, **kwargs):
"""Returns a subscription instance
:param queue_name: Name of the queue to subscribe to.
:type queue_name: `six.text_type`
:returns: A subscription instance
:rtype: `subscription.Subscription`
"""
return subscription.Subscription(self, queue_name, **kwargs)
@decorators.version(min_version=2)
def subscriptions(self, **params):
"""Gets a list of subscriptions from the server
:param params: Filters to use for getting subscriptions
:type params: **kwargs dict.
:returns: A list of subscriptions
:rtype: `list`
"""
req, trans = self._request_and_transport()
subscription_list = core.subscription_list(trans, req, **params)
return iterator._Iterator(self,
subscription_list,
'subscriptions',
subscription.create_object(self))

View File

@ -33,6 +33,7 @@ import json
from oslo_utils import timeutils
from zaqarclient.queues.v1 import core
from zaqarclient.transport import errors
queue_create = core.queue_create
queue_exists = core.queue_exists
@ -103,3 +104,119 @@ def signed_url_create(transport, request, queue_name, paths=None,
resp = transport.send(request)
return resp.deserialized_content
def subscription_create(transport, request, queue_name, subscription_data):
"""Creates a new subscription against the `queue_name`
:param transport: Transport instance to use
:type transport: `transport.base.Transport`
:param request: Request instance ready to be sent.
:type request: `transport.request.Request`
:param queue_name: Queue reference name.
:type queue_name: `six.text_type`
:param subscription_data: Subscription's properties, i.e: subscriber,
ttl, options.
:type subscription_data: `dict`
"""
request.operation = 'subscription_create'
request.params['queue_name'] = queue_name
request.content = json.dumps(subscription_data)
resp = transport.send(request)
if resp.status_code == 409:
raise errors.ConflictError()
return resp.deserialized_content
def subscription_get(transport, request, queue_name, subscription_id):
"""Gets a particular subscription data
:param transport: Transport instance to use
:type transport: `transport.base.Transport`
:param request: Request instance ready to be sent.
:type request: `transport.request.Request`
:param queue_name: Queue reference name.
:type queue_name: `six.text_type`
:param subscription_id: ID of subscription.
:type subscription_id: `six.text_type`
"""
request.operation = 'subscription_get'
request.params['queue_name'] = queue_name
request.params['subscription_id'] = subscription_id
resp = transport.send(request)
return resp.deserialized_content
def subscription_update(transport, request, queue_name, subscription_id,
subscription_data):
"""Updates the subscription
:param transport: Transport instance to use
:type transport: `transport.base.Transport`
:param request: Request instance ready to be sent.
:type request: `transport.request.Request`
:param queue_name: Queue reference name.
:type queue_name: `six.text_type`
:param subscription_id: ID of subscription.
:type subscription_id: `six.text_type`
:param subscription_data: Subscription's properties, i.e: subscriber,
ttl, options.
:type subscription_data: `dict`
"""
request.operation = 'subscription_update'
request.params['queue_name'] = queue_name
request.params['subscription_id'] = subscription_id
request.content = json.dumps(subscription_data)
resp = transport.send(request)
return resp.deserialized_content
def subscription_delete(transport, request, queue_name, subscription_id):
"""Deletes the subscription
:param transport: Transport instance to use
:type transport: `transport.base.Transport`
:param request: Request instance ready to be sent.
:type request: `transport.request.Request`
:param queue_name: Queue reference name.
:type queue_name: `six.text_type`
:param subscription_id: ID of subscription.
:type subscription_id: `six.text_type`
"""
request.operation = 'subscription_delete'
request.params['queue_name'] = queue_name
request.params['subscription_id'] = subscription_id
transport.send(request)
def subscription_list(transport, request, **kwargs):
"""Gets a list of subscriptions
:param transport: Transport instance to use
:type transport: `transport.base.Transport`
:param request: Request instance ready to be sent.
:type request: `transport.request.Request`
:param kwargs: Optional arguments for this operation.
- marker: Where to start getting subscriptions from.
- limit: Maximum number of subscriptions to get.
"""
request.operation = 'subscription_list'
request.params.update(kwargs)
resp = transport.send(request)
if not resp.content:
return {'links': [], 'pools': []}
return resp.deserialized_content

View File

@ -0,0 +1,84 @@
# Copyright (c) 2015 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from zaqarclient.queues.v2 import core
from zaqarclient.transport import errors
class Subscription(object):
def __init__(self, client, queue_name, subscriber=None, ttl=60, id=None,
auto_create=True, **kwargs):
self.client = client
self.id = id
self.queue_name = queue_name,
self.subscriber = subscriber
self.ttl = ttl
self.options = kwargs.get('options', {})
if auto_create:
self.ensure_exists()
def ensure_exists(self):
"""Ensures subscription exists
This method is not race safe, the subscription could've been deleted
right after it was called.
"""
req, trans = self.client._request_and_transport()
if not self.id and self.subscriber:
subscription_data = {'subscriber': self.subscriber,
'ttl': self.ttl,
'options': self.options
}
try:
subscription = core.subscription_create(trans, req,
self.queue_name,
subscription_data)
if subscription and 'subscription_id' in subscription:
self.id = subscription['subscription_id']
except errors.ConflictError:
# ConflictError means the subscription already exists.
print('The subscriber has been existed already.')
if self.id:
sub = core.subscription_get(trans, req, self.queue_name, self.id)
self.subscriber = sub.get('subscriber')
self.ttl = sub.get('ttl')
self.options = sub.get('options')
def update(self, subscription_data):
req, trans = self.client._request_and_transport()
core.subscription_update(trans, req, self.queue_name,
self.id, subscription_data)
for key, value in subscription_data.items():
setattr(self, key, value)
def delete(self):
req, trans = self.client._request_and_transport()
core.subscription_delete(trans, req, self.queue_name, self.id)
def create_object(parent):
return lambda kwargs: Subscription(parent, kwargs["queue_name"],
subscriber=kwargs['subscriber'],
ttl=kwargs['ttl'],
id=kwargs['id'],
auto_create=False,
**kwargs)

View File

@ -0,0 +1,74 @@
# Copyright (c) 2015 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from zaqarclient.tests.queues import base
from zaqarclient.transport import response
class QueuesV2SubscriptionUnitTest(base.QueuesTestBase):
def test_subscription_create(self):
subscription_data = {'subscriber': 'http://trigger.me',
'ttl': 3600}
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
create_resp = response.Response(None,
'{"subscription_id": "fake_id"}')
get_content = ('{"subscriber": "http://trigger.me","ttl": 3600, '
'"id": "fake_id"}')
get_resp = response.Response(None, get_content)
send_method.side_effect = iter([create_resp, get_resp])
# NOTE(flwang): This will call
# ensure exists in the client instance
# since auto_create's default is True
subscription = self.client.subscription('beijing',
**subscription_data)
self.assertEqual('http://trigger.me', subscription.subscriber)
self.assertEqual(3600, subscription.ttl)
self.assertEqual('fake_id', subscription.id)
class QueuesV2SubscriptionFunctionalTest(base.QueuesTestBase):
def setUp(self):
super(QueuesV2SubscriptionFunctionalTest, self).setUp()
# TODO(flwang): Now there is a bug(#1529168) for the subscription TTL,
# so we will add a test case for TTL after the bug fixed.
self.queue_name = 'beijing'
queue = self.client.queue(self.queue_name, force_create=True)
self.addCleanup(queue.delete)
subscription_data_1 = {'subscriber': 'http://trigger.me', 'ttl': 3600}
subscription_data_2 = {'subscriber': 'http://trigger.he', 'ttl': 7200}
self.subscription_1 = self.client.subscription(self.queue_name,
**subscription_data_1)
self.addCleanup(self.subscription_1.delete)
self.subscription_2 = self.client.subscription(self.queue_name,
**subscription_data_2)
self.addCleanup(self.subscription_2.delete)
def test_subscription_create(self):
self.assertEqual('http://trigger.me', self.subscription_1.subscriber)
self.assertEqual(3600, self.subscription_1.ttl)
self.assertEqual('http://trigger.he', self.subscription_2.subscriber)
self.assertEqual(7200, self.subscription_2.ttl)

View File

@ -71,3 +71,10 @@ class ServiceUnavailableError(TransportError):
This error maps to HTTP's 503
"""
class ConflictError(TransportError):
"""Indicates that the server was unable to service the request
This error maps to HTTP's 409
"""

View File

@ -108,4 +108,5 @@ class HttpTransport(base.Transport):
# NOTE(flaper87): This reads the whole content
# and will consume any attempt of streaming.
return response.Response(request, resp.text,
headers=resp.headers)
headers=resp.headers,
status_code=resp.status_code)

View File

@ -29,14 +29,18 @@ class Response(object):
:type: `six.string_types`
:param headers: Optional headers returned in the response.
:type: dict
:param status_code: Optional status_code returned in the response.
:type: `int`
"""
__slots__ = ('request', 'content', 'headers', '_deserialized')
__slots__ = ('request', 'content', 'headers', 'status_code',
'_deserialized')
def __init__(self, request, content, headers=None):
def __init__(self, request, content, headers=None, status_code=None):
self.request = request
self.content = content
self.headers = headers or {}
self.status_code = status_code
self._deserialized = None