Bootstrap Messages support

This patch implements some of the operations required to add Message
support to marconiclient. The higher level implementation of message
deletion is not present in this patch, since it requires the definition
of a new Message type, which will submitted in the upcoming patch.

Partially-Implements blueprint python-marconiclient-v1
Partially-Implements blueprint messages-management

Change-Id: I42efe24e0e6083c530d494e95b66c6c01535a849
This commit is contained in:
Flavio Percoco 2013-10-17 15:35:35 +02:00
parent 3c32c6894b
commit 664c366bd8
7 changed files with 543 additions and 2 deletions

View File

@ -85,4 +85,60 @@ class V1(api.Api):
'queue_name': {'type': 'string'}
}
},
'message_list': {
'ref': 'queues/{queue_name}/messages',
'method': 'GET',
'required': ['queue_name'],
'properties': {
'queue_name': {'type': 'string'},
'marker': {'type': 'string'},
'limit': {'type': 'integer'},
'echo': {'type': 'boolean'},
'include_claimed': {'type': 'boolean'},
}
},
'message_post': {
'ref': 'queues/{queue_name}/messages',
'method': 'POST',
'required': ['queue_name', 'message_id'],
'properties': {
'queue_name': {'type': 'string'},
'claim_id': {'type': 'string'},
}
},
'message_get': {
'ref': 'queues/{queue_name}/messages/{message_id}',
'method': 'GET',
'required': ['queue_name', 'message_id'],
'properties': {
'queue_name': {'type': 'string'},
'message_id': {'type': 'string'},
'claim_id': {'type': 'string'},
}
},
'message_get_many': {
'ref': 'queues/{queue_name}/messages',
'method': 'GET',
'required': ['queue_name', 'ids'],
'properties': {
'queue_name': {'type': 'string'},
'ids': {'type': 'string'},
'claim_id': {'type': 'string'},
}
},
'message_delete': {
'ref': 'queues/{queue_name}/messages/{message_id}',
'method': 'DELETE',
'required': ['queue_name', 'message_id'],
'properties': {
'queue_name': {'type': 'string'},
'message_id': {'type': 'string'},
'claim_id': {'type': 'string'},
}
},
}

View File

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from oslo.config import cfg
from marconiclient.queues.v1 import queues
@ -22,6 +24,10 @@ from marconiclient import transport
_CLIENT_OPTIONS = [
cfg.StrOpt('os_queues_url',
help='Queues remote URL'),
cfg.StrOpt('client_uuid',
default=uuid.uuid4().hex,
help='Client UUID'),
]
@ -38,6 +44,8 @@ class Client(object):
self.api_url = self.conf.os_queues_url or url
self.api_version = version
self.client_uuid = self.conf.client_uuid
def transport(self):
"""Gets a transport based on conf."""
return transport.get_transport_for_conf(self.conf)

View File

@ -91,3 +91,144 @@ def queue_delete(transport, request, name, callback=None):
"""Deletes queue."""
return _common_queue_ops('queue_delete', transport,
request, name, callback=callback)
def message_list(transport, request, queue_name, callback=None, **kwargs):
"""Gets a list of messages in queue `queue_name`
:param transport: Transport instance to use
:type transport: `transport.base.Transport`
:param request: Request instance ready to be sent.
:type request: `transport.request.Request`
:param queue_name: Queue reference name.
:type queue_name: `six.text_type`
:param callback: Optional callable to use as callback.
If specified, this request will be sent asynchronously.
(IGNORED UNTIL ASYNC SUPPORT IS COMPLETE)
:type callback: Callable object.
:param kwargs: Optional arguments for this operation.
- marker: Where to start getting messages from.
- limit: Maximum number of messages to get.
- echo: Whether to get our own messages.
- include_claimed: Whether to include claimed
messages.
"""
request.operation = 'message_list'
request.params['queue_name'] = queue_name
# NOTE(flaper87): Assume passed params
# are accepted by the API, if not, the
# API itself will raise an error.
request.params.update(kwargs)
resp = transport.send(request)
if not resp.content:
# NOTE(flaper87): We could also return None
# or an empty dict, however, we're giving
# more value to a consistent API here by
# returning a compliant dict with empty
# `links` and `messages`
return {'links': [], 'messages': []}
return json.loads(resp.content)
def message_post(transport, request, queue_name, messages, callback=None):
"""Post messages to `queue_name`
:param transport: Transport instance to use
:type transport: `transport.base.Transport`
:param request: Request instance ready to be sent.
:type request: `transport.request.Request`
:param queue_name: Queue reference name.
:type queue_name: `six.text_type`
:param messages: One or more messages to post.
:param messages: `list`
:param callback: Optional callable to use as callback.
If specified, this request will be sent asynchronously.
(IGNORED UNTIL ASYNC SUPPORT IS COMPLETE)
:type callback: Callable object.
"""
request.operation = 'message_post'
request.params['queue_name'] = queue_name
request.content = json.dumps(messages)
resp = transport.send(request)
return json.loads(resp.content)
def message_get(transport, request, queue_name, message_id, callback=None):
"""Gets one message from the queue by id
:param transport: Transport instance to use
:type transport: `transport.base.Transport`
:param request: Request instance ready to be sent.
:type request: `transport.request.Request`
:param queue_name: Queue reference name.
:type queue_name: `six.text_type`
:param message_id: Message reference.
:param message_id: `six.text_type`
:param callback: Optional callable to use as callback.
If specified, this request will be sent asynchronously.
(IGNORED UNTIL ASYNC SUPPORT IS COMPLETE)
:type callback: Callable object.
"""
request.operation = 'message_get'
request.params['queue_name'] = queue_name
request.params['message_id'] = message_id
resp = transport.send(request)
return json.loads(resp.content)
def message_get_many(transport, request, queue_name, messages, callback=None):
"""Gets many messages by id
:param transport: Transport instance to use
:type transport: `transport.base.Transport`
:param request: Request instance ready to be sent.
:type request: `transport.request.Request`
:param queue_name: Queue reference name.
:type queue_name: `six.text_type`
:param messages: Messages references.
:param messages: list of `six.text_type`
:param callback: Optional callable to use as callback.
If specified, this request will be sent asynchronously.
(IGNORED UNTIL ASYNC SUPPORT IS COMPLETE)
:type callback: Callable object.
"""
request.operation = 'message_get_many'
request.params['queue_name'] = queue_name
request.params['ids'] = messages
resp = transport.send(request)
return json.loads(resp.content)
def message_delete(transport, request, queue_name, message_id, callback=None):
"""Deletes messages from `queue_name`
:param transport: Transport instance to use
:type transport: `transport.base.Transport`
:param request: Request instance ready to be sent.
:type request: `transport.request.Request`
:param queue_name: Queue reference name.
:type queue_name: `six.text_type`
:param message_id: Message reference.
:param message_id: `six.text_type`
:param callback: Optional callable to use as callback.
If specified, this request will be sent asynchronously.
(IGNORED UNTIL ASYNC SUPPORT IS COMPLETE)
:type callback: Callable object.
"""
request.operation = 'message_delete'
request.params['queue_name'] = queue_name
request.params['message_id'] = message_id
return transport.send(request)

View File

@ -52,6 +52,8 @@ class Queue(object):
endpoint=self.client.api_url,
api=api)
req.headers['Client-ID'] = self.client.client_uuid
trans = self._get_transport(req)
return req, trans
@ -101,3 +103,73 @@ class Queue(object):
def delete(self):
req, trans = self._request_and_transport()
core.queue_delete(trans, req, self._id)
# Messages API
def post(self, messages):
"""Posts one or more messages to this queue
:param messages: One or more messages to post
:type messages: `list` or `dict`
:returns: A dict with the result of this operation.
:rtype: `dict`
"""
if not isinstance(messages, list):
messages = [messages]
req, trans = self._request_and_transport()
# TODO(flaper87): Return a list of messages
return core.message_post(trans, req,
self._id, messages)
def message(self, message_id):
"""Gets a message by id
:param message_id: Message's reference
:type message_id: `six.text_type`
:returns: A message
:rtype: `dict`
"""
req, trans = self._request_and_transport()
return core.message_get(trans, req, self._id,
message_id)
def messages(self, *messages, **params):
"""Gets a list of messages from the server
This method returns a list of messages, it can be
used to retrieve a set of messages by id or to
walk through the active messages by using the
collection endpoint.
The `messages` and `params` params are mutually exclusive
and the former has the priority.
:param messages: List of messages' ids to retrieve.
:type messages: *args of `six.string_type`
:param params: Filters to use for getting messages
:type params: **kwargs dict.
:returns: List of messages
:rtype: `list`
"""
req, trans = self._request_and_transport()
# TODO(flaper87): Return a MessageIterator.
# This iterator should handle limits, pagination
# and messages deserialization.
if messages:
return core.message_get_many(trans, req,
self._id, messages)
# NOTE(flaper87): It's safe to access messages
# directly. If something wrong happens, the core
# API will raise the right exceptions.
return core.message_list(trans, req,
self._id,
**params)['messages']

View File

@ -104,6 +104,100 @@ class QueuesV1QueueTestBase(base.TestBase):
# just checking our way down to the transport
# doesn't crash.
def test_message_post(self):
messages = [{'ttl': 30, 'body': 'Post It!'}]
result = {
"resources": [
"/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01"
],
"partial": False
}
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, json.dumps(result))
send_method.return_value = resp
posted = self.queue.post(messages)
self.assertEqual(result, posted)
def test_message_list(self):
returned = {
'links': [{
'rel': 'next',
'href': '/v1/queues/fizbit/messages?marker=6244-244224-783'
}],
'messages': [{
'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01',
'ttl': 800,
'age': 790,
'body': {'event': 'ActivateAccount',
'mode': 'active'}
}]
}
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, json.dumps(returned))
send_method.return_value = resp
self.queue.messages(limit=1)
# NOTE(flaper87): Nothing to assert here,
# just checking our way down to the transport
# doesn't crash.
def test_message_get(self):
returned = {
'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01',
'ttl': 800,
'age': 790,
'body': {'event': 'ActivateAccount', 'mode': 'active'}
}
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, json.dumps(returned))
send_method.return_value = resp
msg = self.queue.message('50b68a50d6f5b8c8a7c62b01')
self.assertTrue(isinstance(msg, dict))
# NOTE(flaper87): Nothing to assert here,
# just checking our way down to the transport
# doesn't crash.
def test_message_get_many(self):
returned = [{
'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01',
'ttl': 800,
'age': 790,
'body': {'event': 'ActivateAccount', 'mode': 'active'}
}, {
'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b02',
'ttl': 800,
'age': 790,
'body': {'event': 'ActivateAccount', 'mode': 'active'}
}]
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, json.dumps(returned))
send_method.return_value = resp
msg = self.queue.messages('50b68a50d6f5b8c8a7c62b01',
'50b68a50d6f5b8c8a7c62b02')
self.assertTrue(isinstance(msg, list))
# NOTE(flaper87): Nothing to assert here,
# just checking our way down to the transport
# doesn't crash.
class QueuesV1QueueFuncMixin(object):
@ -154,3 +248,86 @@ class QueuesV1QueueFuncMixin(object):
queue._metadata = 'test'
metadata = queue.metadata(force_reload=True)
self.assertEqual(metadata, test_metadata)
@testtools.skipUnless(_RUN_FUNCTIONAL,
'Functional tests disabled')
def test_message_post_functional(self):
messages = [
{'ttl': 60, 'body': 'Post It!'},
{'ttl': 60, 'body': 'Post It!'},
{'ttl': 60, 'body': 'Post It!'},
]
queue = self.client.queue("nonono")
queue._get_transport = mock.Mock(return_value=self.transport)
result = queue.post(messages)
self.assertIn('resources', result)
self.assertEqual(len(result['resources']), 3)
@testtools.skipUnless(_RUN_FUNCTIONAL,
'Functional tests disabled')
def test_message_list_functional(self):
queue = self.client.queue("test_queue")
queue._get_transport = mock.Mock(return_value=self.transport)
messages = [{'ttl': 60, 'body': 'Post It 1!'}]
queue.post(messages)
messages = queue.messages()
self.assertTrue(isinstance(messages, list))
self.assertGreaterEqual(len(messages), 0)
@testtools.skipUnless(_RUN_FUNCTIONAL,
'Functional tests disabled')
def test_message_list_echo_functional(self):
queue = self.client.queue("test_queue")
queue._get_transport = mock.Mock(return_value=self.transport)
messages = [
{'ttl': 60, 'body': 'Post It 1!'},
{'ttl': 60, 'body': 'Post It 2!'},
{'ttl': 60, 'body': 'Post It 3!'},
]
queue.post(messages)
messages = queue.messages(echo=True)
self.assertTrue(isinstance(messages, list))
self.assertGreaterEqual(len(messages), 3)
@testtools.skipUnless(_RUN_FUNCTIONAL,
'Functional tests disabled')
def test_message_get_functional(self):
queue = self.client.queue("test_queue")
queue._get_transport = mock.Mock(return_value=self.transport)
messages = [
{'ttl': 60, 'body': 'Post It 1!'},
{'ttl': 60, 'body': 'Post It 2!'},
{'ttl': 60, 'body': 'Post It 3!'},
]
res = queue.post(messages)['resources']
msg_id = res[0].split('/')[-1]
message = queue.message(msg_id)
self.assertTrue(isinstance(message, dict))
self.assertEqual(message['href'], res[0])
@testtools.skipUnless(_RUN_FUNCTIONAL,
'Functional tests disabled')
def test_message_get_many_functional(self):
queue = self.client.queue("test_queue")
queue._get_transport = mock.Mock(return_value=self.transport)
messages = [
{'ttl': 60, 'body': 'Post It 1!'},
# NOTE(falper87): Waiting for
# https://github.com/racker/falcon/issues/198
#{'ttl': 60, 'body': 'Post It 2!'},
#{'ttl': 60, 'body': 'Post It 3!'},
]
res = queue.post(messages)['resources']
msgs_id = [ref.split('/')[-1] for ref in res]
messages = queue.messages(*msgs_id)
self.assertTrue(isinstance(messages, list))
self.assertEqual(len(messages), 1)

View File

@ -51,7 +51,16 @@ class HttpTransport(base.Transport):
for param in list(request.params.keys()):
if '{{{0}}}'.format(param) in ref:
ref_params[param] = request.params.pop(param)
value = request.params.pop(param)
# NOTE(flaper87): Marconi API parses
# sequences encoded as '1,2,3,4'. Let's
# encode lists, tuples and sets before
# sending them to the server.
if isinstance(value, (list, tuple, set)):
value = ','.join(value)
ref_params[param] = value
url = '{0}/{1}'.format(request.endpoint.rstrip('/'),
ref.format(**ref_params))

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import mock
from marconiclient.queues.v1 import core
@ -87,4 +88,81 @@ class TestV1Core(base.TestBase):
core.queue_exists(self.transport, req, update_data, 'test')
self.assertIn('queue_name', req.params)
self.assertIn('queue_name', req.params)
def test_message_post(self):
messages = [{'ttl': 30, 'body': 'Post It!'}]
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, '{}')
send_method.return_value = resp
req = request.Request()
core.message_post(self.transport, req, 'test', messages)
self.assertIn('queue_name', req.params)
self.assertEqual(json.loads(req.content),
messages)
def test_message_list(self):
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, '{}')
send_method.return_value = resp
req = request.Request()
core.message_list(self.transport, req, 'test')
self.assertIn('queue_name', req.params)
def test_message_list_kwargs(self):
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, '{}')
send_method.return_value = resp
req = request.Request()
core.message_list(self.transport, req, 'test',
marker='supermarket',
echo=False, limit=10)
self.assertIn('queue_name', req.params)
self.assertIn('limit', req.params)
self.assertIn('echo', req.params)
self.assertIn('marker', req.params)
def test_message_get_many(self):
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, '{}')
send_method.return_value = resp
req = request.Request()
ids = ['a', 'b']
core.message_get_many(self.transport, req,
'test', ids)
self.assertIn('queue_name', req.params)
self.assertIn('ids', req.params)
self.assertEqual(ids, req.params['ids'])
def test_message_get(self):
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, '{}')
send_method.return_value = resp
req = request.Request()
core.message_get(self.transport, req,
'test', 'message_id')
def test_message_delete(self):
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, None)
send_method.return_value = resp
req = request.Request()
core.message_delete(self.transport, req,
'test', 'message_id')