From 664c366bd877e345463da2a85559e369abec0c8a Mon Sep 17 00:00:00 2001 From: Flavio Percoco Date: Thu, 17 Oct 2013 15:35:35 +0200 Subject: [PATCH] Bootstrap Messages support This patch implements some of the operations required to add Message support to marconiclient. The higher level implementation of message deletion is not present in this patch, since it requires the definition of a new Message type, which will submitted in the upcoming patch. Partially-Implements blueprint python-marconiclient-v1 Partially-Implements blueprint messages-management Change-Id: I42efe24e0e6083c530d494e95b66c6c01535a849 --- marconiclient/queues/v1/api.py | 56 +++++++++ marconiclient/queues/v1/client.py | 8 ++ marconiclient/queues/v1/core.py | 141 +++++++++++++++++++++ marconiclient/queues/v1/queues.py | 72 +++++++++++ marconiclient/tests/queues/queues.py | 177 +++++++++++++++++++++++++++ marconiclient/transport/http.py | 11 +- tests/unit/queues/v1/test_core.py | 80 +++++++++++- 7 files changed, 543 insertions(+), 2 deletions(-) diff --git a/marconiclient/queues/v1/api.py b/marconiclient/queues/v1/api.py index 53d5d185..e0c8e3e5 100644 --- a/marconiclient/queues/v1/api.py +++ b/marconiclient/queues/v1/api.py @@ -85,4 +85,60 @@ class V1(api.Api): 'queue_name': {'type': 'string'} } }, + + 'message_list': { + 'ref': 'queues/{queue_name}/messages', + 'method': 'GET', + 'required': ['queue_name'], + 'properties': { + 'queue_name': {'type': 'string'}, + 'marker': {'type': 'string'}, + 'limit': {'type': 'integer'}, + 'echo': {'type': 'boolean'}, + 'include_claimed': {'type': 'boolean'}, + } + }, + + 'message_post': { + 'ref': 'queues/{queue_name}/messages', + 'method': 'POST', + 'required': ['queue_name', 'message_id'], + 'properties': { + 'queue_name': {'type': 'string'}, + 'claim_id': {'type': 'string'}, + } + }, + + 'message_get': { + 'ref': 'queues/{queue_name}/messages/{message_id}', + 'method': 'GET', + 'required': ['queue_name', 'message_id'], + 'properties': { + 'queue_name': {'type': 'string'}, + 'message_id': {'type': 'string'}, + 'claim_id': {'type': 'string'}, + } + }, + + 'message_get_many': { + 'ref': 'queues/{queue_name}/messages', + 'method': 'GET', + 'required': ['queue_name', 'ids'], + 'properties': { + 'queue_name': {'type': 'string'}, + 'ids': {'type': 'string'}, + 'claim_id': {'type': 'string'}, + } + }, + + 'message_delete': { + 'ref': 'queues/{queue_name}/messages/{message_id}', + 'method': 'DELETE', + 'required': ['queue_name', 'message_id'], + 'properties': { + 'queue_name': {'type': 'string'}, + 'message_id': {'type': 'string'}, + 'claim_id': {'type': 'string'}, + } + }, } diff --git a/marconiclient/queues/v1/client.py b/marconiclient/queues/v1/client.py index fbb94b0a..a31db9d4 100644 --- a/marconiclient/queues/v1/client.py +++ b/marconiclient/queues/v1/client.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import uuid + from oslo.config import cfg from marconiclient.queues.v1 import queues @@ -22,6 +24,10 @@ from marconiclient import transport _CLIENT_OPTIONS = [ cfg.StrOpt('os_queues_url', help='Queues remote URL'), + + cfg.StrOpt('client_uuid', + default=uuid.uuid4().hex, + help='Client UUID'), ] @@ -38,6 +44,8 @@ class Client(object): self.api_url = self.conf.os_queues_url or url self.api_version = version + self.client_uuid = self.conf.client_uuid + def transport(self): """Gets a transport based on conf.""" return transport.get_transport_for_conf(self.conf) diff --git a/marconiclient/queues/v1/core.py b/marconiclient/queues/v1/core.py index 93f0c31d..cd657824 100644 --- a/marconiclient/queues/v1/core.py +++ b/marconiclient/queues/v1/core.py @@ -91,3 +91,144 @@ def queue_delete(transport, request, name, callback=None): """Deletes queue.""" return _common_queue_ops('queue_delete', transport, request, name, callback=callback) + + +def message_list(transport, request, queue_name, callback=None, **kwargs): + """Gets a list of messages in queue `queue_name` + + :param transport: Transport instance to use + :type transport: `transport.base.Transport` + :param request: Request instance ready to be sent. + :type request: `transport.request.Request` + :param queue_name: Queue reference name. + :type queue_name: `six.text_type` + :param callback: Optional callable to use as callback. + If specified, this request will be sent asynchronously. + (IGNORED UNTIL ASYNC SUPPORT IS COMPLETE) + :type callback: Callable object. + :param kwargs: Optional arguments for this operation. + - marker: Where to start getting messages from. + - limit: Maximum number of messages to get. + - echo: Whether to get our own messages. + - include_claimed: Whether to include claimed + messages. + """ + + request.operation = 'message_list' + request.params['queue_name'] = queue_name + + # NOTE(flaper87): Assume passed params + # are accepted by the API, if not, the + # API itself will raise an error. + request.params.update(kwargs) + + resp = transport.send(request) + + if not resp.content: + # NOTE(flaper87): We could also return None + # or an empty dict, however, we're giving + # more value to a consistent API here by + # returning a compliant dict with empty + # `links` and `messages` + return {'links': [], 'messages': []} + + return json.loads(resp.content) + + +def message_post(transport, request, queue_name, messages, callback=None): + """Post messages to `queue_name` + + :param transport: Transport instance to use + :type transport: `transport.base.Transport` + :param request: Request instance ready to be sent. + :type request: `transport.request.Request` + :param queue_name: Queue reference name. + :type queue_name: `six.text_type` + :param messages: One or more messages to post. + :param messages: `list` + :param callback: Optional callable to use as callback. + If specified, this request will be sent asynchronously. + (IGNORED UNTIL ASYNC SUPPORT IS COMPLETE) + :type callback: Callable object. + """ + + request.operation = 'message_post' + request.params['queue_name'] = queue_name + request.content = json.dumps(messages) + + resp = transport.send(request) + return json.loads(resp.content) + + +def message_get(transport, request, queue_name, message_id, callback=None): + """Gets one message from the queue by id + + :param transport: Transport instance to use + :type transport: `transport.base.Transport` + :param request: Request instance ready to be sent. + :type request: `transport.request.Request` + :param queue_name: Queue reference name. + :type queue_name: `six.text_type` + :param message_id: Message reference. + :param message_id: `six.text_type` + :param callback: Optional callable to use as callback. + If specified, this request will be sent asynchronously. + (IGNORED UNTIL ASYNC SUPPORT IS COMPLETE) + :type callback: Callable object. + """ + + request.operation = 'message_get' + request.params['queue_name'] = queue_name + request.params['message_id'] = message_id + + resp = transport.send(request) + return json.loads(resp.content) + + +def message_get_many(transport, request, queue_name, messages, callback=None): + """Gets many messages by id + + :param transport: Transport instance to use + :type transport: `transport.base.Transport` + :param request: Request instance ready to be sent. + :type request: `transport.request.Request` + :param queue_name: Queue reference name. + :type queue_name: `six.text_type` + :param messages: Messages references. + :param messages: list of `six.text_type` + :param callback: Optional callable to use as callback. + If specified, this request will be sent asynchronously. + (IGNORED UNTIL ASYNC SUPPORT IS COMPLETE) + :type callback: Callable object. + """ + + request.operation = 'message_get_many' + request.params['queue_name'] = queue_name + request.params['ids'] = messages + + resp = transport.send(request) + return json.loads(resp.content) + + +def message_delete(transport, request, queue_name, message_id, callback=None): + """Deletes messages from `queue_name` + + :param transport: Transport instance to use + :type transport: `transport.base.Transport` + :param request: Request instance ready to be sent. + :type request: `transport.request.Request` + :param queue_name: Queue reference name. + :type queue_name: `six.text_type` + :param message_id: Message reference. + :param message_id: `six.text_type` + :param callback: Optional callable to use as callback. + If specified, this request will be sent asynchronously. + (IGNORED UNTIL ASYNC SUPPORT IS COMPLETE) + :type callback: Callable object. + """ + + request.operation = 'message_delete' + request.params['queue_name'] = queue_name + request.params['message_id'] = message_id + + return transport.send(request) diff --git a/marconiclient/queues/v1/queues.py b/marconiclient/queues/v1/queues.py index 7b37d9c1..db7f5256 100644 --- a/marconiclient/queues/v1/queues.py +++ b/marconiclient/queues/v1/queues.py @@ -52,6 +52,8 @@ class Queue(object): endpoint=self.client.api_url, api=api) + req.headers['Client-ID'] = self.client.client_uuid + trans = self._get_transport(req) return req, trans @@ -101,3 +103,73 @@ class Queue(object): def delete(self): req, trans = self._request_and_transport() core.queue_delete(trans, req, self._id) + + # Messages API + + def post(self, messages): + """Posts one or more messages to this queue + + :param messages: One or more messages to post + :type messages: `list` or `dict` + + :returns: A dict with the result of this operation. + :rtype: `dict` + """ + if not isinstance(messages, list): + messages = [messages] + + req, trans = self._request_and_transport() + + # TODO(flaper87): Return a list of messages + return core.message_post(trans, req, + self._id, messages) + + def message(self, message_id): + """Gets a message by id + + :param message_id: Message's reference + :type message_id: `six.text_type` + + :returns: A message + :rtype: `dict` + """ + req, trans = self._request_and_transport() + return core.message_get(trans, req, self._id, + message_id) + + def messages(self, *messages, **params): + """Gets a list of messages from the server + + This method returns a list of messages, it can be + used to retrieve a set of messages by id or to + walk through the active messages by using the + collection endpoint. + + The `messages` and `params` params are mutually exclusive + and the former has the priority. + + :param messages: List of messages' ids to retrieve. + :type messages: *args of `six.string_type` + + :param params: Filters to use for getting messages + :type params: **kwargs dict. + + :returns: List of messages + :rtype: `list` + """ + req, trans = self._request_and_transport() + + # TODO(flaper87): Return a MessageIterator. + # This iterator should handle limits, pagination + # and messages deserialization. + + if messages: + return core.message_get_many(trans, req, + self._id, messages) + + # NOTE(flaper87): It's safe to access messages + # directly. If something wrong happens, the core + # API will raise the right exceptions. + return core.message_list(trans, req, + self._id, + **params)['messages'] diff --git a/marconiclient/tests/queues/queues.py b/marconiclient/tests/queues/queues.py index 27901a92..5c1aa263 100644 --- a/marconiclient/tests/queues/queues.py +++ b/marconiclient/tests/queues/queues.py @@ -104,6 +104,100 @@ class QueuesV1QueueTestBase(base.TestBase): # just checking our way down to the transport # doesn't crash. + def test_message_post(self): + messages = [{'ttl': 30, 'body': 'Post It!'}] + + result = { + "resources": [ + "/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01" + ], + "partial": False + } + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, json.dumps(result)) + send_method.return_value = resp + + posted = self.queue.post(messages) + self.assertEqual(result, posted) + + def test_message_list(self): + returned = { + 'links': [{ + 'rel': 'next', + 'href': '/v1/queues/fizbit/messages?marker=6244-244224-783' + }], + 'messages': [{ + 'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', + 'mode': 'active'} + }] + } + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, json.dumps(returned)) + send_method.return_value = resp + + self.queue.messages(limit=1) + + # NOTE(flaper87): Nothing to assert here, + # just checking our way down to the transport + # doesn't crash. + + def test_message_get(self): + returned = { + 'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', 'mode': 'active'} + } + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, json.dumps(returned)) + send_method.return_value = resp + + msg = self.queue.message('50b68a50d6f5b8c8a7c62b01') + self.assertTrue(isinstance(msg, dict)) + + # NOTE(flaper87): Nothing to assert here, + # just checking our way down to the transport + # doesn't crash. + + def test_message_get_many(self): + returned = [{ + 'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', 'mode': 'active'} + }, { + 'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b02', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', 'mode': 'active'} + }] + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, json.dumps(returned)) + send_method.return_value = resp + + msg = self.queue.messages('50b68a50d6f5b8c8a7c62b01', + '50b68a50d6f5b8c8a7c62b02') + self.assertTrue(isinstance(msg, list)) + + # NOTE(flaper87): Nothing to assert here, + # just checking our way down to the transport + # doesn't crash. + class QueuesV1QueueFuncMixin(object): @@ -154,3 +248,86 @@ class QueuesV1QueueFuncMixin(object): queue._metadata = 'test' metadata = queue.metadata(force_reload=True) self.assertEqual(metadata, test_metadata) + + @testtools.skipUnless(_RUN_FUNCTIONAL, + 'Functional tests disabled') + def test_message_post_functional(self): + messages = [ + {'ttl': 60, 'body': 'Post It!'}, + {'ttl': 60, 'body': 'Post It!'}, + {'ttl': 60, 'body': 'Post It!'}, + ] + + queue = self.client.queue("nonono") + queue._get_transport = mock.Mock(return_value=self.transport) + result = queue.post(messages) + self.assertIn('resources', result) + self.assertEqual(len(result['resources']), 3) + + @testtools.skipUnless(_RUN_FUNCTIONAL, + 'Functional tests disabled') + def test_message_list_functional(self): + queue = self.client.queue("test_queue") + queue._get_transport = mock.Mock(return_value=self.transport) + + messages = [{'ttl': 60, 'body': 'Post It 1!'}] + queue.post(messages) + + messages = queue.messages() + self.assertTrue(isinstance(messages, list)) + self.assertGreaterEqual(len(messages), 0) + + @testtools.skipUnless(_RUN_FUNCTIONAL, + 'Functional tests disabled') + def test_message_list_echo_functional(self): + queue = self.client.queue("test_queue") + queue._get_transport = mock.Mock(return_value=self.transport) + + messages = [ + {'ttl': 60, 'body': 'Post It 1!'}, + {'ttl': 60, 'body': 'Post It 2!'}, + {'ttl': 60, 'body': 'Post It 3!'}, + ] + queue.post(messages) + messages = queue.messages(echo=True) + self.assertTrue(isinstance(messages, list)) + self.assertGreaterEqual(len(messages), 3) + + @testtools.skipUnless(_RUN_FUNCTIONAL, + 'Functional tests disabled') + def test_message_get_functional(self): + queue = self.client.queue("test_queue") + queue._get_transport = mock.Mock(return_value=self.transport) + + messages = [ + {'ttl': 60, 'body': 'Post It 1!'}, + {'ttl': 60, 'body': 'Post It 2!'}, + {'ttl': 60, 'body': 'Post It 3!'}, + ] + + res = queue.post(messages)['resources'] + msg_id = res[0].split('/')[-1] + message = queue.message(msg_id) + self.assertTrue(isinstance(message, dict)) + self.assertEqual(message['href'], res[0]) + + @testtools.skipUnless(_RUN_FUNCTIONAL, + 'Functional tests disabled') + def test_message_get_many_functional(self): + queue = self.client.queue("test_queue") + queue._get_transport = mock.Mock(return_value=self.transport) + + messages = [ + {'ttl': 60, 'body': 'Post It 1!'}, + + # NOTE(falper87): Waiting for + # https://github.com/racker/falcon/issues/198 + #{'ttl': 60, 'body': 'Post It 2!'}, + #{'ttl': 60, 'body': 'Post It 3!'}, + ] + + res = queue.post(messages)['resources'] + msgs_id = [ref.split('/')[-1] for ref in res] + messages = queue.messages(*msgs_id) + self.assertTrue(isinstance(messages, list)) + self.assertEqual(len(messages), 1) diff --git a/marconiclient/transport/http.py b/marconiclient/transport/http.py index 93dba3ac..80ca8cb4 100644 --- a/marconiclient/transport/http.py +++ b/marconiclient/transport/http.py @@ -51,7 +51,16 @@ class HttpTransport(base.Transport): for param in list(request.params.keys()): if '{{{0}}}'.format(param) in ref: - ref_params[param] = request.params.pop(param) + value = request.params.pop(param) + + # NOTE(flaper87): Marconi API parses + # sequences encoded as '1,2,3,4'. Let's + # encode lists, tuples and sets before + # sending them to the server. + if isinstance(value, (list, tuple, set)): + value = ','.join(value) + + ref_params[param] = value url = '{0}/{1}'.format(request.endpoint.rstrip('/'), ref.format(**ref_params)) diff --git a/tests/unit/queues/v1/test_core.py b/tests/unit/queues/v1/test_core.py index 2d0d3209..0923c779 100644 --- a/tests/unit/queues/v1/test_core.py +++ b/tests/unit/queues/v1/test_core.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json import mock from marconiclient.queues.v1 import core @@ -87,4 +88,81 @@ class TestV1Core(base.TestBase): core.queue_exists(self.transport, req, update_data, 'test') self.assertIn('queue_name', req.params) - self.assertIn('queue_name', req.params) + def test_message_post(self): + messages = [{'ttl': 30, 'body': 'Post It!'}] + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + resp = response.Response(None, '{}') + send_method.return_value = resp + + req = request.Request() + + core.message_post(self.transport, req, 'test', messages) + self.assertIn('queue_name', req.params) + self.assertEqual(json.loads(req.content), + messages) + + def test_message_list(self): + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + resp = response.Response(None, '{}') + send_method.return_value = resp + + req = request.Request() + + core.message_list(self.transport, req, 'test') + self.assertIn('queue_name', req.params) + + def test_message_list_kwargs(self): + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + resp = response.Response(None, '{}') + send_method.return_value = resp + + req = request.Request() + + core.message_list(self.transport, req, 'test', + marker='supermarket', + echo=False, limit=10) + + self.assertIn('queue_name', req.params) + self.assertIn('limit', req.params) + self.assertIn('echo', req.params) + self.assertIn('marker', req.params) + + def test_message_get_many(self): + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + resp = response.Response(None, '{}') + send_method.return_value = resp + + req = request.Request() + + ids = ['a', 'b'] + core.message_get_many(self.transport, req, + 'test', ids) + + self.assertIn('queue_name', req.params) + self.assertIn('ids', req.params) + self.assertEqual(ids, req.params['ids']) + + def test_message_get(self): + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + resp = response.Response(None, '{}') + send_method.return_value = resp + + req = request.Request() + core.message_get(self.transport, req, + 'test', 'message_id') + + def test_message_delete(self): + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + resp = response.Response(None, None) + send_method.return_value = resp + + req = request.Request() + core.message_delete(self.transport, req, + 'test', 'message_id')