diff --git a/tests/etc/websocket_mongodb.conf b/tests/etc/websocket_mongodb.conf new file mode 100644 index 000000000..f9df70b51 --- /dev/null +++ b/tests/etc/websocket_mongodb.conf @@ -0,0 +1,19 @@ +[DEFAULT] + +[drivers] + +# Transport driver to use (string value) +transport = websocket + +# Storage driver to use (string value) +storage = mongodb + +[drivers:management_store:mongodb] + +# Mongodb Connection URI +uri = mongodb://127.0.0.1:27017 + +[drivers:message_store:mongodb] + +# Mongodb Connection URI +uri = mongodb://127.0.0.1:27017 \ No newline at end of file diff --git a/tests/unit/test_bootstrap.py b/tests/unit/test_bootstrap.py index 6594bf3e4..7b3d14694 100644 --- a/tests/unit/test_bootstrap.py +++ b/tests/unit/test_bootstrap.py @@ -19,6 +19,7 @@ from zaqar.storage import pipeline from zaqar.storage import pooling from zaqar.storage import sqlalchemy from zaqar.tests import base +from zaqar.transport import websocket from zaqar.transport import wsgi @@ -52,3 +53,7 @@ class TestBootstrap(base.TestBase): def test_transport_wsgi(self): bootstrap = self._bootstrap('wsgi_sqlalchemy.conf') self.assertIsInstance(bootstrap.transport, wsgi.Driver) + + def test_transport_websocket(self): + bootstrap = self._bootstrap('websocket_mongodb.conf') + self.assertIsInstance(bootstrap.transport, websocket.Driver) diff --git a/zaqar/api/v1_1/endpoints.py b/zaqar/api/v1_1/endpoints.py index 831ff9fc7..d9d531bdf 100644 --- a/zaqar/api/v1_1/endpoints.py +++ b/zaqar/api/v1_1/endpoints.py @@ -17,7 +17,6 @@ from zaqar.common.api import utils as api_utils from zaqar.i18n import _ import zaqar.openstack.common.log as logging from zaqar.storage import errors as storage_errors -from zaqar.transport import utils from zaqar.transport import validation LOG = logging.getLogger(__name__) @@ -69,7 +68,7 @@ class Endpoints(object): LOG.debug(ex) headers = {'status': 400} return api_utils.error_response(req, ex, headers) - except storage_errors.BaseException as ex: + except storage_errors.ExceptionBase as ex: LOG.exception(ex) error = 'Queues could not be listed.' headers = {'status': 503} @@ -79,7 +78,7 @@ class Endpoints(object): queues = list(next(results)) # Got some. Prepare the response. - body = utils.to_json({'queues': queues}) + body = {'queues': queues} headers = {'status': 200} resp = response.Response(req, body, headers) @@ -97,7 +96,7 @@ class Endpoints(object): """ project_id = req._headers.get('X-Project-ID') queue_name = req._body.get('queue_name') - metadata = req._body.get('metadata') + metadata = req._body.get('metadata', {}) LOG.debug(u'Queue create - queue: %(queue)s, project: %(project)s', {'queue': queue_name, @@ -105,7 +104,7 @@ class Endpoints(object): try: self._validate.queue_identification(queue_name, project_id) - self._validate.queue_metadata_length(len(metadata)) + self._validate.queue_metadata_length(len(str(metadata))) created = self._queue_controller.create(queue_name, metadata=metadata, project=project_id) @@ -113,13 +112,13 @@ class Endpoints(object): LOG.debug(ex) headers = {'status': 400} return api_utils.error_response(req, ex, headers) - except storage_errors.BaseException as ex: + except storage_errors.ExceptionBase as ex: LOG.exception(ex) - error = _('Queue "%s" could not be created.') % queue_name + error = _('Queue %s could not be created.') % queue_name headers = {'status': 503} return api_utils.error_response(req, ex, headers, error) else: - body = _('Queue "%s" created.') % queue_name + body = _('Queue %s created.') % queue_name headers = {'status': 201} if created else {'status': 204} resp = response.Response(req, body, headers) return resp @@ -140,13 +139,13 @@ class Endpoints(object): {'queue': queue_name, 'project': project_id}) try: self._queue_controller.delete(queue_name, project=project_id) - except storage_errors.BaseException as ex: + except storage_errors.ExceptionBase as ex: LOG.exception(ex) - error = _('Queue "%s" could not be deleted.') % queue_name + error = _('Queue %s could not be deleted.') % queue_name headers = {'status': 503} return api_utils.error_response(req, ex, headers, error) else: - body = _('Queue "%s" removed.') % queue_name + body = _('Queue %s removed.') % queue_name headers = {'status': 204} resp = response.Response(req, body, headers) return resp @@ -172,16 +171,16 @@ class Endpoints(object): project=project_id) except storage_errors.DoesNotExist as ex: LOG.debug(ex) - error = _('Queue "%s" does not exist.') % queue_name + error = _('Queue %s does not exist.') % queue_name headers = {'status': 404} return api_utils.error_response(req, ex, headers, error) - except storage_errors.BaseException as ex: + except storage_errors.ExceptionBase as ex: LOG.exception(ex) headers = {'status': 503} - error = _('Cannot retrieve queue "%s".') % queue_name + error = _('Cannot retrieve queue %s.') % queue_name return api_utils.error_response(req, ex, headers, error) else: - body = utils.to_json(resp_dict) + body = resp_dict headers = {'status': 200} resp = response.Response(req, body, headers) return resp @@ -198,10 +197,14 @@ class Endpoints(object): project_id = req._headers.get('X-Project-ID') queue_name = req._body.get('queue_name') + LOG.debug(u'Queue get queue stats - queue: %(queue)s, ' + u'project: %(project)s', + {'queue': queue_name, 'project': project_id}) + try: resp_dict = self._queue_controller.stats(queue_name, project=project_id) - body = utils.to_json(resp_dict) + body = resp_dict except storage_errors.QueueDoesNotExist as ex: LOG.exception(ex) resp_dict = { @@ -211,13 +214,13 @@ class Endpoints(object): 'total': 0 } } - body = utils.to_json(resp_dict) + body = resp_dict headers = {'status': 404} resp = response.Response(req, body, headers) return resp - except storage_errors.BaseException as ex: + except storage_errors.ExceptionBase as ex: LOG.exception(ex) - error = _('Cannot retrieve queue "%s" stats.') % queue_name + error = _('Cannot retrieve queue %s stats.') % queue_name headers = {'status': 503} return api_utils.error_response(req, ex, headers, error) else: diff --git a/zaqar/api/v1_1/request.py b/zaqar/api/v1_1/request.py index 0d6a0d0ae..875472e36 100644 --- a/zaqar/api/v1_1/request.py +++ b/zaqar/api/v1_1/request.py @@ -126,7 +126,7 @@ class RequestSchema(api.Api): 'queue_get': { 'properties': { - 'action': {'enum': ['queue_delete']}, + 'action': {'enum': ['queue_get']}, 'headers': { 'type': 'object', 'properties': headers, diff --git a/zaqar/common/api/request.py b/zaqar/common/api/request.py index 5b722bfea..3d4d8c117 100644 --- a/zaqar/common/api/request.py +++ b/zaqar/common/api/request.py @@ -48,6 +48,8 @@ class Request(object): return json.loads(self._body) return None - def __repr__(self): - return "{'api': %s, 'action': %s, 'headers': %s, 'body': %s}" % ( - self._api, self._action, self._headers, self._body) \ No newline at end of file + def get_request(self): + return {'action': self._action, + 'body': self._body, + 'headers': self._headers, + 'api': self._api} diff --git a/zaqar/common/api/response.py b/zaqar/common/api/response.py index e0a2cb37b..a843a0133 100644 --- a/zaqar/common/api/response.py +++ b/zaqar/common/api/response.py @@ -36,6 +36,7 @@ class Response(object): self._body = body self._headers = headers or {} - def __repr__(self): - return "{'req': %s, 'headers': %s, 'body': %s}" % ( - self._request, self._headers, self._body) + def get_response(self): + return {'request': self._request.get_request(), + 'body': self._body, + 'headers': self._headers} diff --git a/zaqar/common/api/utils.py b/zaqar/common/api/utils.py index 77e207f20..8578df486 100644 --- a/zaqar/common/api/utils.py +++ b/zaqar/common/api/utils.py @@ -17,7 +17,6 @@ import functools import zaqar.common.api.response as response from zaqar.i18n import _ import zaqar.openstack.common.log as logging -from zaqar.transport import utils LOG = logging.getLogger(__name__) @@ -43,7 +42,6 @@ def raises_conn_error(func): def error_response(req, exception, headers=None, error=None): - body = utils.to_json({'exception': exception, - 'error': error}) + body = {'exception': str(exception), 'error': error} resp = response.Response(req, body, headers) return resp \ No newline at end of file diff --git a/zaqar/tests/unit/transport/websocket/__init__.py b/zaqar/tests/unit/transport/websocket/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/zaqar/tests/unit/transport/websocket/base.py b/zaqar/tests/unit/transport/websocket/base.py new file mode 100644 index 000000000..e77e32646 --- /dev/null +++ b/zaqar/tests/unit/transport/websocket/base.py @@ -0,0 +1,104 @@ +# Copyright (c) 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy +# of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +from oslo_serialization import jsonutils + +from zaqar import bootstrap +from zaqar import tests as testing +from zaqar.transport import validation +from zaqar.transport.websocket import driver + + +class TestBase(testing.TestBase): + + config_file = None + + def setUp(self): + super(TestBase, self).setUp() + + if not self.config_file: + self.skipTest("No config specified") + + self.conf.register_opts(bootstrap._GENERAL_OPTIONS) + self.conf.register_opts(validation._TRANSPORT_LIMITS_OPTIONS, + group=validation._TRANSPORT_LIMITS_GROUP) + self.transport_cfg = self.conf[validation._TRANSPORT_LIMITS_GROUP] + + self.conf.register_opts(driver._WS_OPTIONS, + group=driver._WS_GROUP) + self.wsgi_cfg = self.conf[driver._WS_GROUP] + + self.conf.unreliable = True + self.conf.admin_mode = True + self.boot = bootstrap.Bootstrap(self.conf) + + self.transport = self.boot.transport + + def tearDown(self): + if self.conf.pooling: + self.boot.control.pools_controller.drop_all() + self.boot.control.catalogue_controller.drop_all() + super(TestBase, self).tearDown() + + +class TestBaseFaulty(TestBase): + """This test ensures we aren't letting any exceptions go unhandled.""" + + +class V1Base(TestBase): + """Base class for V1 API Tests. + + Should contain methods specific to V1 of the API + """ + pass + + +class V1BaseFaulty(TestBaseFaulty): + """Base class for V1 API Faulty Tests. + + Should contain methods specific to V1 exception testing + """ + pass + + +class V1_1Base(TestBase): + """Base class for V1.1 API Tests. + + Should contain methods specific to V1.1 of the API + """ + + def _empty_message_list(self, body): + self.assertEqual(jsonutils.loads(body[0])['messages'], []) + + +class V1_1BaseFaulty(TestBaseFaulty): + """Base class for V1.1 API Faulty Tests. + + Should contain methods specific to V1.1 exception testing + """ + pass + + +class V2Base(V1_1Base): + """Base class for V2 API Tests. + + Should contain methods specific to V2 of the API + """ + + +class V2BaseFaulty(V1_1BaseFaulty): + """Base class for V2 API Faulty Tests. + + Should contain methods specific to V2 exception testing + """ diff --git a/zaqar/tests/unit/transport/websocket/utils.py b/zaqar/tests/unit/transport/websocket/utils.py new file mode 100644 index 000000000..9ecf06282 --- /dev/null +++ b/zaqar/tests/unit/transport/websocket/utils.py @@ -0,0 +1,19 @@ +# Copyright (c) 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy +# of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +import json + + +def create_request(action, body, headers): + return json.dumps({"action": action, "body": body, "headers": headers}) \ No newline at end of file diff --git a/zaqar/tests/unit/transport/websocket/v1_1/__init__.py b/zaqar/tests/unit/transport/websocket/v1_1/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/zaqar/tests/unit/transport/websocket/v1_1/test_queue_lifecycle.py b/zaqar/tests/unit/transport/websocket/v1_1/test_queue_lifecycle.py new file mode 100644 index 000000000..a55bb59d8 --- /dev/null +++ b/zaqar/tests/unit/transport/websocket/v1_1/test_queue_lifecycle.py @@ -0,0 +1,584 @@ +# Copyright (c) 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy +# of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +import json +import uuid + +import ddt +import mock + +from zaqar import tests as testing +from zaqar.tests.unit.transport.websocket import base +from zaqar.tests.unit.transport.websocket import utils as test_utils + + +@ddt.ddt +class QueueLifecycleBaseTest(base.V1_1Base): + + config_file = "websocket_mongodb.conf" + + def setUp(self): + super(QueueLifecycleBaseTest, self).setUp() + self.protocol = self.transport.factory() + + def test_empty_project_id(self): + action = "queue_create" + body = {"queue_name": "kitkat", + "metadata": { + "key": { + "key2": "value", + "key3": [1, 2, 3, 4, 5]} + } + } + headers = {'Client-ID': str(uuid.uuid4())} + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 400) + + with mock.patch.object(self.protocol, 'sendMessage') as msg_mock: + msg_mock.side_effect = validator + self.protocol.onMessage(req, False) + + @ddt.data('480924', 'foo') + def test_basics_thoroughly(self, project_id): + # Stats are empty - queue not created yet + action = "queue_get_stats" + body = {"queue_name": "gummybears"} + headers = { + 'Client-ID': str(uuid.uuid4()), + 'X-Project-ID': project_id + } + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock.start() + + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 404) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # Create + action = "queue_create" + body = {"queue_name": "gummybears", + "metadata": { + "key": { + "key2": "value", + "key3": [1, 2, 3, 4, 5]}, + "messages": {"ttl": 600}, + } + } + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 201) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # Fetch metadata + action = "queue_get" + body = {"queue_name": "gummybears"} + meta = {"messages": {"ttl": 600}, + "key": { + "key2": "value", + "key3": [1, 2, 3, 4, 5]} + } + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 200) + self.assertEqual(json.dumps(resp['body']), json.dumps(meta)) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # Stats empty queue + action = "queue_get_stats" + body = {"queue_name": "gummybears"} + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 200) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # Delete + action = "queue_delete" + body = {"queue_name": "gummybears"} + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 204) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # Get non-existent stats + action = "queue_get_stats" + body = {"queue_name": "gummybears"} + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 404) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + def test_name_restrictions(self): + headers = { + 'Client-ID': str(uuid.uuid4()), + 'X-Project-ID': 'test-project' + } + action = "queue_create" + body = {"queue_name": 'marsbar', + "metadata": { + "key": { + "key2": "value", + "key3": [1, 2, 3, 4, 5]}, + "messages": {"ttl": 600}, + } + } + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock.start() + + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 201) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + body["queue_name"] = "m@rsb@r" + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 400) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + body["queue_name"] = "marsbar" * 10 + req = test_utils.create_request(action, body, headers) + self.protocol.onMessage(req, False) + + def test_project_id_restriction(self): + headers = { + 'Client-ID': str(uuid.uuid4()), + 'X-Project-ID': 'test-project' * 30 + } + action = "queue_create" + body = {"queue_name": 'poptart'} + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock.start() + + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 400) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + headers['X-Project-ID'] = 'test-project' + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 201) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + def test_non_ascii_name(self): + test_params = ((u'/queues/non-ascii-n\u0153me', 'utf-8'), + (u'/queues/non-ascii-n\xc4me', 'iso8859-1')) + + headers = { + 'Client-ID': str(uuid.uuid4()), + 'X-Project-ID': 'test-project' * 30 + } + action = "queue_create" + body = {"queue_name": test_params[0]} + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock.start() + + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 400) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + body = {"queue_name": test_params[1]} + req = test_utils.create_request(action, body, headers) + + self.protocol.onMessage(req, False) + + def test_no_metadata(self): + headers = { + 'Client-ID': str(uuid.uuid4()), + 'X-Project-ID': 'test-project' + } + action = "queue_create" + body = {"queue_name": "fizbat"} + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock.start() + + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 201) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 204) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + @ddt.data('{', '[]', '.', ' ') + def test_bad_metadata(self, meta): + headers = { + 'Client-ID': str(uuid.uuid4()), + 'X-Project-ID': 'test-project' * 30 + } + action = "queue_create" + body = {"queue_name": "fizbat", + "metadata": meta} + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock.start() + + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 400) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + def test_too_much_metadata(self): + headers = { + 'Client-ID': str(uuid.uuid4()), + 'X-Project-ID': 'test-project' + } + action = "queue_create" + body = {"queue_name": "buttertoffee", + "metadata": {"messages": {"ttl": 600}, + "padding": "x"} + } + + max_size = self.transport_cfg.max_queue_metadata + body["metadata"]["padding"] = "x" * max_size + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock.start() + + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 400) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + def test_way_too_much_metadata(self): + headers = { + 'Client-ID': str(uuid.uuid4()), + 'X-Project-ID': 'test-project' + } + action = "queue_create" + body = {"queue_name": "peppermint", + "metadata": {"messages": {"ttl": 600}, + "padding": "x"} + } + + max_size = self.transport_cfg.max_queue_metadata + body["metadata"]["padding"] = "x" * max_size * 5 + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock.start() + + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 400) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + def test_update_metadata(self): + self.skip("Implement patch method") + headers = { + 'Client-ID': str(uuid.uuid4()), + 'X-Project-ID': 'test-project' + } + action = "queue_create" + body = {"queue_name": "bonobon"} + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock.start() + + # Create + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 201) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # Set meta + meta1 = {"messages": {"ttl": 600}, "padding": "x"} + body["metadata"] = meta1 + + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 204) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # Get + action = "queue_get" + body = {"queue_name": "bonobon"} + + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 204) + self.assertEqual(json.dumps(resp['body']), json.dumps(meta1)) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # Update + action = "queue_create" + meta2 = {"messages": {"ttl": 100}, "padding": "y"} + body["metadata"] = meta2 + + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 204) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # Get again + action = "queue_get" + body = {"queue_name": "bonobon"} + + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 200) + self.assertEqual(json.dumps(resp['body']), json.dumps(meta2)) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + def test_list(self): + arbitrary_number = 644079696574693 + project_id = str(arbitrary_number) + client_id = str(uuid.uuid4()) + headers = { + 'X-Project-ID': project_id, + 'Client-ID': client_id + } + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock.start() + + # NOTE(kgriffs): It's important that this one sort after the one + # above. This is in order to prove that bug/1236605 is fixed, and + # stays fixed! + # NOTE(vkmc): In websockets as well! + alt_project_id = str(arbitrary_number + 1) + + # List empty + action = "queue_list" + body = {} + + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 200) + self.assertEqual(resp['body']['queues'], []) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # Payload exceeded + body = {'limit': 21} + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 400) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # Create some + def create_queue(project_id, queue_name, metadata): + altheaders = {'Client-ID': client_id} + if project_id is not None: + altheaders['X-Project-ID'] = project_id + action = 'queue_create' + body['queue_name'] = queue_name + body['metadata'] = metadata + + req = test_utils.create_request(action, body, altheaders) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 201) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + create_queue(project_id, 'q1', {"node": 31}) + create_queue(project_id, 'q2', {"node": 32}) + create_queue(project_id, 'q3', {"node": 33}) + + create_queue(alt_project_id, 'q3', {"alt": 1}) + + # List (limit) + body = {'limit': 2} + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(len(resp['body']['queues']), 2) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # List (no metadata, get all) + body = {'limit': 5} + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 200) + # Ensure we didn't pick up the queue from the alt project. + self.assertEqual(len(resp['body']['queues']), 3) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # List with metadata + body = {'detailed': True} + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 200) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + action = "queue_get" + body = {"queue_name": "q1"} + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 200) + self.assertEqual(json.dumps(resp['body']), + json.dumps({"node": 31})) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # List tail + action = "queue_list" + body = {} + req = test_utils.create_request(action, body, headers) + + def validator(resp, isBinary): + resp = json.loads(resp) + self.assertEqual(resp['headers']['status'], 200) + + send_mock.side_effect = validator + self.protocol.onMessage(req, False) + + # List manually-constructed tail + body = {'marker': "zzz"} + req = test_utils.create_request(action, body, headers) + self.protocol.onMessage(req, False) + + +@testing.requires_mongodb +class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest): + + config_file = 'websocket_mongodb.conf' + + def tearDown(self): + storage = self.boot.storage._storage + connection = storage.connection + + connection.drop_database(storage.queues_database) + + for db in storage.message_databases: + connection.drop_database(db) + + super(TestQueueLifecycleMongoDB, self).tearDown() diff --git a/zaqar/transport/websocket/driver.py b/zaqar/transport/websocket/driver.py index 666a509fb..401667a8d 100644 --- a/zaqar/transport/websocket/driver.py +++ b/zaqar/transport/websocket/driver.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from autobahn.asyncio import websocket from oslo.config import cfg try: @@ -21,9 +20,11 @@ try: except ImportError: import trollius as asyncio +from zaqar.common import decorators from zaqar.i18n import _ import zaqar.openstack.common.log as logging -from zaqar.transport.websocket import protocol +from zaqar.transport.websocket import factory + _WS_OPTIONS = ( cfg.StrOpt('bind', default='127.0.0.1', @@ -54,6 +55,12 @@ class Driver(object): self._conf.register_opts(_WS_OPTIONS, group=_WS_GROUP) self._ws_conf = self._conf[_WS_GROUP] + @decorators.lazy_property(write=False) + def factory(self): + uri = 'ws://' + self._ws_conf.bind + ':' + str(self._ws_conf.port) + return factory.ProtocolFactory(uri, debug=self._ws_conf.debug, + handler=self._api) + def listen(self): """Self-host using 'bind' and 'port' from the WS config group.""" @@ -61,13 +68,9 @@ class Driver(object): LOG.info(msgtmpl, {'bind': self._ws_conf.bind, 'port': self._ws_conf.port}) - uri = 'ws://' + self._ws_conf.bind + ':' + str(self._ws_conf.port) - factory = websocket.WebSocketServerFactory(uri, - debug=self._ws_conf.debug) - factory.protocol = protocol.MessagingProtocol - loop = asyncio.get_event_loop() - coro = loop.create_server(factory, self._ws_conf.bind, + coro = loop.create_server(self.factory, + self._ws_conf.bind, self._ws_conf.port) server = loop.run_until_complete(coro) diff --git a/zaqar/transport/websocket/factory.py b/zaqar/transport/websocket/factory.py new file mode 100644 index 000000000..b43e07418 --- /dev/null +++ b/zaqar/transport/websocket/factory.py @@ -0,0 +1,32 @@ +# Copyright (c) 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from autobahn.asyncio import websocket + +from zaqar.transport.websocket import protocol + + +class ProtocolFactory(websocket.WebSocketServerFactory): + + protocol = protocol.MessagingProtocol + + def __init__(self, uri, debug, handler): + websocket.WebSocketServerFactory.__init__(self, uri, debug) + self._handler = handler + + def __call__(self): + proto = self.protocol(self._handler) + proto.factory = self + return proto diff --git a/zaqar/transport/websocket/protocol.py b/zaqar/transport/websocket/protocol.py index 4ccda433b..5f3e11780 100644 --- a/zaqar/transport/websocket/protocol.py +++ b/zaqar/transport/websocket/protocol.py @@ -15,9 +15,23 @@ from autobahn.asyncio import websocket +import json + +from zaqar.api.v1_1 import request as schema_validator +from zaqar.common.api import request +from zaqar.common.api import response +from zaqar.common import errors +import zaqar.openstack.common.log as logging + +LOG = logging.getLogger(__name__) + class MessagingProtocol(websocket.WebSocketServerProtocol): + def __init__(self, handler): + websocket.WebSocketServerProtocol.__init__(self) + self._handler = handler + def onConnect(self, request): print("Client connecting: {0}".format(request.peer)) @@ -26,10 +40,66 @@ class MessagingProtocol(websocket.WebSocketServerProtocol): def onMessage(self, payload, isBinary): if isBinary: + # TODO(vkmc): Binary support will be added in the next cycle + # For now, we are returning an invalid request response print("Binary message received: {0} bytes".format(len(payload))) + req = self._dummy_request() + body = {'error': 'Schema validation failed.'} + headers = {'status': 400} + resp = response.Response(req, body, headers) + return resp else: - print("Text message received: {0}".format(payload.decode('utf8'))) - self.sendMessage(payload, isBinary) + try: + print("Text message received: {0}". + format(payload.decode('utf8'))) + pl = json.loads(payload) + req = self._create_request(pl) + resp = (self._validate_request(pl, req) or + self._handler.process_request(req)) + except ValueError as ex: + LOG.exception(ex) + req = self._dummy_request() + body = {'error': str(ex)} + headers = {'status': 400} + resp = response.Response(req, body, headers) + return resp + + resp_json = json.dumps(resp.get_response()) + self.sendMessage(resp_json, isBinary) def onClose(self, wasClean, code, reason): print("WebSocket connection closed: {0}".format(reason)) + + @staticmethod + def _create_request(pl): + action = pl.get('action') + body = pl.get('body', {}) + headers = pl.get('headers') + + return request.Request(action=action, body=body, + headers=headers, api="v1.1") + + @staticmethod + def _validate_request(pl, req): + try: + action = pl.get('action') + validator = schema_validator.RequestSchema() + is_valid = validator.validate(action=action, body=pl) + except errors.InvalidAction as ex: + body = {'error': str(ex)} + headers = {'status': 400} + resp = response.Response(req, body, headers) + return resp + else: + if not is_valid: + body = {'error': 'Schema validation failed.'} + headers = {'status': 400} + resp = response.Response(req, body, headers) + return resp + + return None + + @staticmethod + def _dummy_request(): + action = 'None' + return request.Request(action) \ No newline at end of file