Merge "Adds message processing to WebSockets driver"

This commit is contained in:
Jenkins 2015-03-31 13:06:00 +00:00 committed by Gerrit Code Review
commit 208dddaf5f
15 changed files with 879 additions and 39 deletions

View File

@ -0,0 +1,19 @@
[DEFAULT]
[drivers]
# Transport driver to use (string value)
transport = websocket
# Storage driver to use (string value)
storage = mongodb
[drivers:management_store:mongodb]
# Mongodb Connection URI
uri = mongodb://127.0.0.1:27017
[drivers:message_store:mongodb]
# Mongodb Connection URI
uri = mongodb://127.0.0.1:27017

View File

@ -17,6 +17,7 @@ from zaqar import bootstrap
from zaqar.common import errors
from zaqar.storage import pooling
from zaqar.tests import base
from zaqar.transport import websocket
from zaqar.transport import wsgi
@ -44,3 +45,7 @@ class TestBootstrap(base.TestBase):
def test_transport_wsgi(self):
bootstrap = self._bootstrap('wsgi_mongodb.conf')
self.assertIsInstance(bootstrap.transport, wsgi.Driver)
def test_transport_websocket(self):
bootstrap = self._bootstrap('websocket_mongodb.conf')
self.assertIsInstance(bootstrap.transport, websocket.Driver)

View File

@ -17,7 +17,6 @@ from zaqar.common.api import utils as api_utils
from zaqar.i18n import _
import zaqar.openstack.common.log as logging
from zaqar.storage import errors as storage_errors
from zaqar.transport import utils
from zaqar.transport import validation
LOG = logging.getLogger(__name__)
@ -69,7 +68,7 @@ class Endpoints(object):
LOG.debug(ex)
headers = {'status': 400}
return api_utils.error_response(req, ex, headers)
except storage_errors.BaseException as ex:
except storage_errors.ExceptionBase as ex:
LOG.exception(ex)
error = 'Queues could not be listed.'
headers = {'status': 503}
@ -79,7 +78,7 @@ class Endpoints(object):
queues = list(next(results))
# Got some. Prepare the response.
body = utils.to_json({'queues': queues})
body = {'queues': queues}
headers = {'status': 200}
resp = response.Response(req, body, headers)
@ -97,7 +96,7 @@ class Endpoints(object):
"""
project_id = req._headers.get('X-Project-ID')
queue_name = req._body.get('queue_name')
metadata = req._body.get('metadata')
metadata = req._body.get('metadata', {})
LOG.debug(u'Queue create - queue: %(queue)s, project: %(project)s',
{'queue': queue_name,
@ -105,7 +104,7 @@ class Endpoints(object):
try:
self._validate.queue_identification(queue_name, project_id)
self._validate.queue_metadata_length(len(metadata))
self._validate.queue_metadata_length(len(str(metadata)))
created = self._queue_controller.create(queue_name,
metadata=metadata,
project=project_id)
@ -113,13 +112,13 @@ class Endpoints(object):
LOG.debug(ex)
headers = {'status': 400}
return api_utils.error_response(req, ex, headers)
except storage_errors.BaseException as ex:
except storage_errors.ExceptionBase as ex:
LOG.exception(ex)
error = _('Queue "%s" could not be created.') % queue_name
error = _('Queue %s could not be created.') % queue_name
headers = {'status': 503}
return api_utils.error_response(req, ex, headers, error)
else:
body = _('Queue "%s" created.') % queue_name
body = _('Queue %s created.') % queue_name
headers = {'status': 201} if created else {'status': 204}
resp = response.Response(req, body, headers)
return resp
@ -140,13 +139,13 @@ class Endpoints(object):
{'queue': queue_name, 'project': project_id})
try:
self._queue_controller.delete(queue_name, project=project_id)
except storage_errors.BaseException as ex:
except storage_errors.ExceptionBase as ex:
LOG.exception(ex)
error = _('Queue "%s" could not be deleted.') % queue_name
error = _('Queue %s could not be deleted.') % queue_name
headers = {'status': 503}
return api_utils.error_response(req, ex, headers, error)
else:
body = _('Queue "%s" removed.') % queue_name
body = _('Queue %s removed.') % queue_name
headers = {'status': 204}
resp = response.Response(req, body, headers)
return resp
@ -172,16 +171,16 @@ class Endpoints(object):
project=project_id)
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
error = _('Queue "%s" does not exist.') % queue_name
error = _('Queue %s does not exist.') % queue_name
headers = {'status': 404}
return api_utils.error_response(req, ex, headers, error)
except storage_errors.BaseException as ex:
except storage_errors.ExceptionBase as ex:
LOG.exception(ex)
headers = {'status': 503}
error = _('Cannot retrieve queue "%s".') % queue_name
error = _('Cannot retrieve queue %s.') % queue_name
return api_utils.error_response(req, ex, headers, error)
else:
body = utils.to_json(resp_dict)
body = resp_dict
headers = {'status': 200}
resp = response.Response(req, body, headers)
return resp
@ -198,10 +197,14 @@ class Endpoints(object):
project_id = req._headers.get('X-Project-ID')
queue_name = req._body.get('queue_name')
LOG.debug(u'Queue get queue stats - queue: %(queue)s, '
u'project: %(project)s',
{'queue': queue_name, 'project': project_id})
try:
resp_dict = self._queue_controller.stats(queue_name,
project=project_id)
body = utils.to_json(resp_dict)
body = resp_dict
except storage_errors.QueueDoesNotExist as ex:
LOG.exception(ex)
resp_dict = {
@ -211,13 +214,13 @@ class Endpoints(object):
'total': 0
}
}
body = utils.to_json(resp_dict)
body = resp_dict
headers = {'status': 404}
resp = response.Response(req, body, headers)
return resp
except storage_errors.BaseException as ex:
except storage_errors.ExceptionBase as ex:
LOG.exception(ex)
error = _('Cannot retrieve queue "%s" stats.') % queue_name
error = _('Cannot retrieve queue %s stats.') % queue_name
headers = {'status': 503}
return api_utils.error_response(req, ex, headers, error)
else:

View File

@ -126,7 +126,7 @@ class RequestSchema(api.Api):
'queue_get': {
'properties': {
'action': {'enum': ['queue_delete']},
'action': {'enum': ['queue_get']},
'headers': {
'type': 'object',
'properties': headers,

View File

@ -48,6 +48,8 @@ class Request(object):
return json.loads(self._body)
return None
def __repr__(self):
return "{'api': %s, 'action': %s, 'headers': %s, 'body': %s}" % (
self._api, self._action, self._headers, self._body)
def get_request(self):
return {'action': self._action,
'body': self._body,
'headers': self._headers,
'api': self._api}

View File

@ -36,6 +36,7 @@ class Response(object):
self._body = body
self._headers = headers or {}
def __repr__(self):
return "{'req': %s, 'headers': %s, 'body': %s}" % (
self._request, self._headers, self._body)
def get_response(self):
return {'request': self._request.get_request(),
'body': self._body,
'headers': self._headers}

View File

@ -17,7 +17,6 @@ import functools
import zaqar.common.api.response as response
from zaqar.i18n import _
import zaqar.openstack.common.log as logging
from zaqar.transport import utils
LOG = logging.getLogger(__name__)
@ -43,7 +42,6 @@ def raises_conn_error(func):
def error_response(req, exception, headers=None, error=None):
body = utils.to_json({'exception': exception,
'error': error})
body = {'exception': str(exception), 'error': error}
resp = response.Response(req, body, headers)
return resp

View File

@ -0,0 +1,104 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
from oslo_serialization import jsonutils
from zaqar import bootstrap
from zaqar import tests as testing
from zaqar.transport import validation
from zaqar.transport.websocket import driver
class TestBase(testing.TestBase):
config_file = None
def setUp(self):
super(TestBase, self).setUp()
if not self.config_file:
self.skipTest("No config specified")
self.conf.register_opts(bootstrap._GENERAL_OPTIONS)
self.conf.register_opts(validation._TRANSPORT_LIMITS_OPTIONS,
group=validation._TRANSPORT_LIMITS_GROUP)
self.transport_cfg = self.conf[validation._TRANSPORT_LIMITS_GROUP]
self.conf.register_opts(driver._WS_OPTIONS,
group=driver._WS_GROUP)
self.wsgi_cfg = self.conf[driver._WS_GROUP]
self.conf.unreliable = True
self.conf.admin_mode = True
self.boot = bootstrap.Bootstrap(self.conf)
self.transport = self.boot.transport
def tearDown(self):
if self.conf.pooling:
self.boot.control.pools_controller.drop_all()
self.boot.control.catalogue_controller.drop_all()
super(TestBase, self).tearDown()
class TestBaseFaulty(TestBase):
"""This test ensures we aren't letting any exceptions go unhandled."""
class V1Base(TestBase):
"""Base class for V1 API Tests.
Should contain methods specific to V1 of the API
"""
pass
class V1BaseFaulty(TestBaseFaulty):
"""Base class for V1 API Faulty Tests.
Should contain methods specific to V1 exception testing
"""
pass
class V1_1Base(TestBase):
"""Base class for V1.1 API Tests.
Should contain methods specific to V1.1 of the API
"""
def _empty_message_list(self, body):
self.assertEqual(jsonutils.loads(body[0])['messages'], [])
class V1_1BaseFaulty(TestBaseFaulty):
"""Base class for V1.1 API Faulty Tests.
Should contain methods specific to V1.1 exception testing
"""
pass
class V2Base(V1_1Base):
"""Base class for V2 API Tests.
Should contain methods specific to V2 of the API
"""
class V2BaseFaulty(V1_1BaseFaulty):
"""Base class for V2 API Faulty Tests.
Should contain methods specific to V2 exception testing
"""

View File

@ -0,0 +1,19 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import json
def create_request(action, body, headers):
return json.dumps({"action": action, "body": body, "headers": headers})

View File

@ -0,0 +1,584 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import json
import uuid
import ddt
import mock
from zaqar import tests as testing
from zaqar.tests.unit.transport.websocket import base
from zaqar.tests.unit.transport.websocket import utils as test_utils
@ddt.ddt
class QueueLifecycleBaseTest(base.V1_1Base):
config_file = "websocket_mongodb.conf"
def setUp(self):
super(QueueLifecycleBaseTest, self).setUp()
self.protocol = self.transport.factory()
def test_empty_project_id(self):
action = "queue_create"
body = {"queue_name": "kitkat",
"metadata": {
"key": {
"key2": "value",
"key3": [1, 2, 3, 4, 5]}
}
}
headers = {'Client-ID': str(uuid.uuid4())}
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 400)
with mock.patch.object(self.protocol, 'sendMessage') as msg_mock:
msg_mock.side_effect = validator
self.protocol.onMessage(req, False)
@ddt.data('480924', 'foo')
def test_basics_thoroughly(self, project_id):
# Stats are empty - queue not created yet
action = "queue_get_stats"
body = {"queue_name": "gummybears"}
headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': project_id
}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock.start()
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 404)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# Create
action = "queue_create"
body = {"queue_name": "gummybears",
"metadata": {
"key": {
"key2": "value",
"key3": [1, 2, 3, 4, 5]},
"messages": {"ttl": 600},
}
}
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 201)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# Fetch metadata
action = "queue_get"
body = {"queue_name": "gummybears"}
meta = {"messages": {"ttl": 600},
"key": {
"key2": "value",
"key3": [1, 2, 3, 4, 5]}
}
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 200)
self.assertEqual(json.dumps(resp['body']), json.dumps(meta))
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# Stats empty queue
action = "queue_get_stats"
body = {"queue_name": "gummybears"}
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 200)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# Delete
action = "queue_delete"
body = {"queue_name": "gummybears"}
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 204)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# Get non-existent stats
action = "queue_get_stats"
body = {"queue_name": "gummybears"}
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 404)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
def test_name_restrictions(self):
headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': 'test-project'
}
action = "queue_create"
body = {"queue_name": 'marsbar',
"metadata": {
"key": {
"key2": "value",
"key3": [1, 2, 3, 4, 5]},
"messages": {"ttl": 600},
}
}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock.start()
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 201)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
body["queue_name"] = "m@rsb@r"
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 400)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
body["queue_name"] = "marsbar" * 10
req = test_utils.create_request(action, body, headers)
self.protocol.onMessage(req, False)
def test_project_id_restriction(self):
headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': 'test-project' * 30
}
action = "queue_create"
body = {"queue_name": 'poptart'}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock.start()
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 400)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
headers['X-Project-ID'] = 'test-project'
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 201)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
def test_non_ascii_name(self):
test_params = ((u'/queues/non-ascii-n\u0153me', 'utf-8'),
(u'/queues/non-ascii-n\xc4me', 'iso8859-1'))
headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': 'test-project' * 30
}
action = "queue_create"
body = {"queue_name": test_params[0]}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock.start()
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 400)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
body = {"queue_name": test_params[1]}
req = test_utils.create_request(action, body, headers)
self.protocol.onMessage(req, False)
def test_no_metadata(self):
headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': 'test-project'
}
action = "queue_create"
body = {"queue_name": "fizbat"}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock.start()
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 201)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 204)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
@ddt.data('{', '[]', '.', ' ')
def test_bad_metadata(self, meta):
headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': 'test-project' * 30
}
action = "queue_create"
body = {"queue_name": "fizbat",
"metadata": meta}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock.start()
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 400)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
def test_too_much_metadata(self):
headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': 'test-project'
}
action = "queue_create"
body = {"queue_name": "buttertoffee",
"metadata": {"messages": {"ttl": 600},
"padding": "x"}
}
max_size = self.transport_cfg.max_queue_metadata
body["metadata"]["padding"] = "x" * max_size
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock.start()
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 400)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
def test_way_too_much_metadata(self):
headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': 'test-project'
}
action = "queue_create"
body = {"queue_name": "peppermint",
"metadata": {"messages": {"ttl": 600},
"padding": "x"}
}
max_size = self.transport_cfg.max_queue_metadata
body["metadata"]["padding"] = "x" * max_size * 5
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock.start()
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 400)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
def test_update_metadata(self):
self.skip("Implement patch method")
headers = {
'Client-ID': str(uuid.uuid4()),
'X-Project-ID': 'test-project'
}
action = "queue_create"
body = {"queue_name": "bonobon"}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock.start()
# Create
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 201)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# Set meta
meta1 = {"messages": {"ttl": 600}, "padding": "x"}
body["metadata"] = meta1
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 204)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# Get
action = "queue_get"
body = {"queue_name": "bonobon"}
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 204)
self.assertEqual(json.dumps(resp['body']), json.dumps(meta1))
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# Update
action = "queue_create"
meta2 = {"messages": {"ttl": 100}, "padding": "y"}
body["metadata"] = meta2
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 204)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# Get again
action = "queue_get"
body = {"queue_name": "bonobon"}
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 200)
self.assertEqual(json.dumps(resp['body']), json.dumps(meta2))
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
def test_list(self):
arbitrary_number = 644079696574693
project_id = str(arbitrary_number)
client_id = str(uuid.uuid4())
headers = {
'X-Project-ID': project_id,
'Client-ID': client_id
}
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock.start()
# NOTE(kgriffs): It's important that this one sort after the one
# above. This is in order to prove that bug/1236605 is fixed, and
# stays fixed!
# NOTE(vkmc): In websockets as well!
alt_project_id = str(arbitrary_number + 1)
# List empty
action = "queue_list"
body = {}
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 200)
self.assertEqual(resp['body']['queues'], [])
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# Payload exceeded
body = {'limit': 21}
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 400)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# Create some
def create_queue(project_id, queue_name, metadata):
altheaders = {'Client-ID': client_id}
if project_id is not None:
altheaders['X-Project-ID'] = project_id
action = 'queue_create'
body['queue_name'] = queue_name
body['metadata'] = metadata
req = test_utils.create_request(action, body, altheaders)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 201)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
create_queue(project_id, 'q1', {"node": 31})
create_queue(project_id, 'q2', {"node": 32})
create_queue(project_id, 'q3', {"node": 33})
create_queue(alt_project_id, 'q3', {"alt": 1})
# List (limit)
body = {'limit': 2}
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(len(resp['body']['queues']), 2)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# List (no metadata, get all)
body = {'limit': 5}
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 200)
# Ensure we didn't pick up the queue from the alt project.
self.assertEqual(len(resp['body']['queues']), 3)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# List with metadata
body = {'detailed': True}
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 200)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
action = "queue_get"
body = {"queue_name": "q1"}
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 200)
self.assertEqual(json.dumps(resp['body']),
json.dumps({"node": 31}))
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# List tail
action = "queue_list"
body = {}
req = test_utils.create_request(action, body, headers)
def validator(resp, isBinary):
resp = json.loads(resp)
self.assertEqual(resp['headers']['status'], 200)
send_mock.side_effect = validator
self.protocol.onMessage(req, False)
# List manually-constructed tail
body = {'marker': "zzz"}
req = test_utils.create_request(action, body, headers)
self.protocol.onMessage(req, False)
@testing.requires_mongodb
class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest):
config_file = 'websocket_mongodb.conf'
def tearDown(self):
storage = self.boot.storage._storage
connection = storage.connection
connection.drop_database(storage.queues_database)
for db in storage.message_databases:
connection.drop_database(db)
super(TestQueueLifecycleMongoDB, self).tearDown()

View File

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from autobahn.asyncio import websocket
from oslo.config import cfg
try:
@ -21,9 +20,11 @@ try:
except ImportError:
import trollius as asyncio
from zaqar.common import decorators
from zaqar.i18n import _
import zaqar.openstack.common.log as logging
from zaqar.transport.websocket import protocol
from zaqar.transport.websocket import factory
_WS_OPTIONS = (
cfg.StrOpt('bind', default='127.0.0.1',
@ -54,6 +55,12 @@ class Driver(object):
self._conf.register_opts(_WS_OPTIONS, group=_WS_GROUP)
self._ws_conf = self._conf[_WS_GROUP]
@decorators.lazy_property(write=False)
def factory(self):
uri = 'ws://' + self._ws_conf.bind + ':' + str(self._ws_conf.port)
return factory.ProtocolFactory(uri, debug=self._ws_conf.debug,
handler=self._api)
def listen(self):
"""Self-host using 'bind' and 'port' from the WS config group."""
@ -61,13 +68,9 @@ class Driver(object):
LOG.info(msgtmpl,
{'bind': self._ws_conf.bind, 'port': self._ws_conf.port})
uri = 'ws://' + self._ws_conf.bind + ':' + str(self._ws_conf.port)
factory = websocket.WebSocketServerFactory(uri,
debug=self._ws_conf.debug)
factory.protocol = protocol.MessagingProtocol
loop = asyncio.get_event_loop()
coro = loop.create_server(factory, self._ws_conf.bind,
coro = loop.create_server(self.factory,
self._ws_conf.bind,
self._ws_conf.port)
server = loop.run_until_complete(coro)

View File

@ -0,0 +1,32 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from autobahn.asyncio import websocket
from zaqar.transport.websocket import protocol
class ProtocolFactory(websocket.WebSocketServerFactory):
protocol = protocol.MessagingProtocol
def __init__(self, uri, debug, handler):
websocket.WebSocketServerFactory.__init__(self, uri, debug)
self._handler = handler
def __call__(self):
proto = self.protocol(self._handler)
proto.factory = self
return proto

View File

@ -15,9 +15,23 @@
from autobahn.asyncio import websocket
import json
from zaqar.api.v1_1 import request as schema_validator
from zaqar.common.api import request
from zaqar.common.api import response
from zaqar.common import errors
import zaqar.openstack.common.log as logging
LOG = logging.getLogger(__name__)
class MessagingProtocol(websocket.WebSocketServerProtocol):
def __init__(self, handler):
websocket.WebSocketServerProtocol.__init__(self)
self._handler = handler
def onConnect(self, request):
print("Client connecting: {0}".format(request.peer))
@ -26,10 +40,66 @@ class MessagingProtocol(websocket.WebSocketServerProtocol):
def onMessage(self, payload, isBinary):
if isBinary:
# TODO(vkmc): Binary support will be added in the next cycle
# For now, we are returning an invalid request response
print("Binary message received: {0} bytes".format(len(payload)))
req = self._dummy_request()
body = {'error': 'Schema validation failed.'}
headers = {'status': 400}
resp = response.Response(req, body, headers)
return resp
else:
print("Text message received: {0}".format(payload.decode('utf8')))
self.sendMessage(payload, isBinary)
try:
print("Text message received: {0}".
format(payload.decode('utf8')))
pl = json.loads(payload)
req = self._create_request(pl)
resp = (self._validate_request(pl, req) or
self._handler.process_request(req))
except ValueError as ex:
LOG.exception(ex)
req = self._dummy_request()
body = {'error': str(ex)}
headers = {'status': 400}
resp = response.Response(req, body, headers)
return resp
resp_json = json.dumps(resp.get_response())
self.sendMessage(resp_json, isBinary)
def onClose(self, wasClean, code, reason):
print("WebSocket connection closed: {0}".format(reason))
@staticmethod
def _create_request(pl):
action = pl.get('action')
body = pl.get('body', {})
headers = pl.get('headers')
return request.Request(action=action, body=body,
headers=headers, api="v1.1")
@staticmethod
def _validate_request(pl, req):
try:
action = pl.get('action')
validator = schema_validator.RequestSchema()
is_valid = validator.validate(action=action, body=pl)
except errors.InvalidAction as ex:
body = {'error': str(ex)}
headers = {'status': 400}
resp = response.Response(req, body, headers)
return resp
else:
if not is_valid:
body = {'error': 'Schema validation failed.'}
headers = {'status': 400}
resp = response.Response(req, body, headers)
return resp
return None
@staticmethod
def _dummy_request():
action = 'None'
return request.Request(action)