From 51246e1a8c91e74453289a0f0710bc9ea2297e5f Mon Sep 17 00:00:00 2001 From: Dan Prince Date: Wed, 13 Jul 2016 15:18:32 -0400 Subject: [PATCH] Create a websocket transport This patch adds a new WebsocketTransport class that will help consolidate common websocket client code that is useful for connecting to and subscribing to Zaqar queues via websockets. We are already using a similar client within TripleO and it makes sense to have it live in zaqarclient where it can be shared and used by multiple projects. Change-Id: Ia2a8deb599252d8308e44d595eb2bf443999aaad --- requirements.txt | 1 + setup.cfg | 4 + tests/unit/transport/test_ws.py | 80 +++++++++++++++++++ zaqarclient/transport/base.py | 13 ++++ zaqarclient/transport/http.py | 15 ---- zaqarclient/transport/ws.py | 134 ++++++++++++++++++++++++++++++++ 6 files changed, 232 insertions(+), 15 deletions(-) create mode 100644 tests/unit/transport/test_ws.py create mode 100644 zaqarclient/transport/ws.py diff --git a/requirements.txt b/requirements.txt index 5478f53c..ee985072 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,6 +9,7 @@ jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT # Oslo Packages oslo.i18n>=2.1.0 # Apache-2.0 +oslo.utils>=3.15.0 # Apache-2.0 python-keystoneclient!=1.8.0,!=2.1.0,>=1.7.0 # Apache-2.0 osc-lib>=0.1.0 # Apache-2.0 diff --git a/setup.cfg b/setup.cfg index d84c1369..e26d7173 100644 --- a/setup.cfg +++ b/setup.cfg @@ -40,6 +40,10 @@ zaqarclient.transport = http.v2 = zaqarclient.transport.http:HttpTransport https.v2 = zaqarclient.transport.http:HttpTransport + ws.v1 = zaqarclient.transport.ws:WebsocketTransport + ws.v1.1 = zaqarclient.transport.ws:WebsocketTransport + ws.v2 = zaqarclient.transport.ws:WebsocketTransport + zaqarclient.api = queues.v1 = zaqarclient.queues.v1.api:V1 queues.v1.1 = zaqarclient.queues.v1.api:V1_1 diff --git a/tests/unit/transport/test_ws.py b/tests/unit/transport/test_ws.py new file mode 100644 index 00000000..cc7bef97 --- /dev/null +++ b/tests/unit/transport/test_ws.py @@ -0,0 +1,80 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import mock + +from zaqarclient.tests import base +from zaqarclient.transport import request +from zaqarclient.transport import ws + + +class TestWsTransport(base.TestBase): + + def setUp(self): + super(TestWsTransport, self).setUp() + os_opts = { + 'os_auth_token': 'FAKE_TOKEN', + 'os_auth_url': 'http://127.0.0.0:5000/v3', + 'os_project_id': 'admin', + 'os_service_type': 'messaging-websocket', + } + auth_opts = {'backend': 'keystone', + 'options': os_opts} + self.options = {'auth_opts': auth_opts} + self.endpoint = 'ws://127.0.0.1:9000' + + @mock.patch.object(ws.WebsocketTransport, "_create_connection") + def test_make_client(self, ws_create_connection): + ws_create_connection.return_value.recv.return_value = json.dumps({ + "headers": { + "status": 200 + } + }) + + transport = ws.WebsocketTransport(self.options) + req = request.Request(self.endpoint) + transport.send(req) + ws_create_connection.assert_called_with("ws://127.0.0.1:9000") + + @mock.patch.object(ws.WebsocketTransport, "recv") + @mock.patch.object(ws.WebsocketTransport, "_create_connection") + def test_recv(self, ws_create_connection, recv_mock): + + send_ack = { + "headers": { + "status": 200 + } + } + + recv_mock.side_effect = [send_ack, send_ack, send_ack, { + "body": { + "payload": "foo" + } + }, send_ack] + + transport = ws.WebsocketTransport(self.options) + req = request.Request(self.endpoint) + transport.send(req) + + count = 0 + + while True: + count += 1 + data = transport.recv() + if 'body' in data: + self.assertEqual(data['body']['payload'], 'foo') + break + if count >= 4: + self.fail('Failed to recieve expected message.') diff --git a/zaqarclient/transport/base.py b/zaqarclient/transport/base.py index c2b11d03..f2870293 100644 --- a/zaqarclient/transport/base.py +++ b/zaqarclient/transport/base.py @@ -17,10 +17,23 @@ import abc import six +from zaqarclient.transport import errors + @six.add_metaclass(abc.ABCMeta) class Transport(object): + # common HTTP codes used by multiple transports + http_to_zaqar = { + 400: errors.MalformedRequest, + 401: errors.UnauthorizedError, + 403: errors.ForbiddenError, + 404: errors.ResourceNotFound, + 409: errors.ConflictError, + 500: errors.InternalServerError, + 503: errors.ServiceUnavailableError + } + def __init__(self, options): self.options = options diff --git a/zaqarclient/transport/http.py b/zaqarclient/transport/http.py index 0f675a3a..ec61751b 100644 --- a/zaqarclient/transport/http.py +++ b/zaqarclient/transport/http.py @@ -18,26 +18,11 @@ import json from zaqarclient.common import http from zaqarclient.transport import base -# NOTE(flaper87): Something is completely borked -# with some imports. Using `from ... import errors` -# will end up importing `zaqarclient.errors` instead -# of transports -import zaqarclient.transport.errors as errors from zaqarclient.transport import response class HttpTransport(base.Transport): - http_to_zaqar = { - 400: errors.MalformedRequest, - 401: errors.UnauthorizedError, - 403: errors.ForbiddenError, - 404: errors.ResourceNotFound, - 409: errors.ConflictError, - 500: errors.InternalServerError, - 503: errors.ServiceUnavailableError - } - def __init__(self, options): super(HttpTransport, self).__init__(options) self.client = http.Client() diff --git a/zaqarclient/transport/ws.py b/zaqarclient/transport/ws.py new file mode 100644 index 00000000..553697b2 --- /dev/null +++ b/zaqarclient/transport/ws.py @@ -0,0 +1,134 @@ +# Copyright 2016 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +import json +import logging +import uuid + +from oslo_utils import importutils + +from zaqarclient.transport import base +from zaqarclient.transport import request +from zaqarclient.transport import response + +websocket = importutils.try_import('websocket') + +LOG = logging.getLogger(__name__) + + +class WebsocketTransport(base.Transport): + + """Zaqar websocket transport. + + *NOTE:* Zaqar's websocket interface does not yet appear to work + well with parameters. Until it does the websocket transport may not + integrate with all of zaqarclients higherlevel request. Even so... + websockets today is still quite usable and use of the transport + via lower level API's in zaqarclient work quite nicely. Example: + + conf = { + 'auth_opts': { + 'backend': 'keystone', + 'options': { + 'os_auth_token': ks.auth_token, + 'os_project_id': CONF.zaqar.project_id + } + } + } + + endpoint = 'ws://172.19.0.3:9000' + + with transport.get_transport_for(endpoint, options=conf) as ws: + req = request.Request(endpoint, 'queue_create', + content=json.dumps({'queue_name': 'foo'})) + resp = ws.send(req) + + """ + def __init__(self, options): + super(WebsocketTransport, self).__init__(options) + self._project_id = options['auth_opts']['options']['os_project_id'] + self._token = options['auth_opts']['options']['os_auth_token'] + self._websocket_client_id = None + self._ws = None + + def _init_client(self, endpoint): + """Initialize a websocket transport client. + + :param endpoint: The websocket endpoint. Example: ws://127.0.0.1:9000/. + Required. + :type endpoint: string + """ + self._websocket_client_id = str(uuid.uuid4()) + + LOG.debug('Instantiating messaging websocket client: %s', endpoint) + self._ws = self._create_connection(endpoint) + + auth_req = request.Request(endpoint, 'authenticate', + headers={'X-Auth-Token': self._token}) + self.send(auth_req) + + def _create_connection(self, endpoint): + return websocket.create_connection(endpoint) + + def send(self, request): + if not self._ws: + self._init_client(request.endpoint) + + headers = request.headers.copy() + headers.update({ + 'Client-ID': self._websocket_client_id, + 'X-Project-ID': self._project_id + }) + + msg = {'action': request.operation, 'headers': headers} + if request.content: + msg['body'] = json.loads(request.content) + # NOTE(dprince): Zaqar websockets do not yet seem to support params?! + # Users of this protocol will need to send everything in the body. + if request.params: + LOG.warning('Websocket transport does not yet support params.') + self._ws.send(json.dumps(msg)) + ret = self.recv() + + resp = response.Response(request, json.dumps(ret.get('body', '')), + headers=ret['headers'], + status_code=int(ret['headers']['status'])) + + if resp.status_code in self.http_to_zaqar: + kwargs = {} + try: + error_body = json.loads(resp.content) + kwargs['title'] = 'Websocket Transport Error' + kwargs['description'] = error_body['error'] + except Exception: + kwargs['text'] = resp.content + raise self.http_to_zaqar[resp.status_code](**kwargs) + + return resp + + def recv(self): + return json.loads(self._ws.recv()) + + def cleanup(self): + if self._ws: + self._ws.close() + self._ws = None + + def __enter__(self): + """Return self to allow usage as a context manager""" + return self + + def __exit__(self, *exc): + """Call cleanup when exiting the context manager""" + self.cleanup()