Merge "API handler and API v1.1 Queue endpoints"

This commit is contained in:
Jenkins 2015-02-18 01:17:23 +00:00 committed by Gerrit Code Review
commit 77025e1ae1
4 changed files with 307 additions and 6 deletions

29
zaqar/api/handler.py Normal file
View File

@ -0,0 +1,29 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
from zaqar.api.v1_1 import endpoints
class Handler(object):
"""Defines API handler
The handler validates and process the requests
"""
def __init__(self, storage, control, validate):
self.v1_1_endpoints = endpoints.Endpoints(storage, control, validate)
def process_request(self, req):
# FIXME(vkmc): Control API version
return getattr(self.v1_1_endpoints, req._action)(req)

211
zaqar/api/v1_1/endpoints.py Normal file
View File

@ -0,0 +1,211 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
from zaqar.common.api import response
from zaqar.common.api import utils as api_utils
from zaqar.i18n import _
import zaqar.openstack.common.log as logging
from zaqar.storage import errors as storage_errors
from zaqar.transport import utils
from zaqar.transport import validation
LOG = logging.getLogger(__name__)
class Endpoints(object):
"""v1.1 API Endpoints."""
def __init__(self, storage, control, validate):
self._queue_controller = storage.queue_controller
self._message_controller = storage.message_controller
self._claim_controller = storage.claim_controller
self._pools_controller = control.pools_controller
self._flavors_controller = control.flavors_controller
self._validate = validate
@api_utils.raises_conn_error
def queue_list(self, req):
"""Gets a list of queues
:param req: Request instance ready to be sent.
:type req: `api.common.Request`
:return: resp: Response instance
:type: resp: `api.common.Response`
"""
project_id = req._headers.get('X-Project-ID')
LOG.debug(u'Queue list - project: %(project)s',
{'project': project_id})
kwargs = {}
if req._body.get('marker') is not None:
kwargs['marker'] = req._body.get('marker')
if req._body.get('limit') is not None:
kwargs['limit'] = req._body.get('limit')
if req._body.get('detailed') is not None:
kwargs['detailed'] = req._body.get('detailed')
try:
self._validate.queue_listing(**kwargs)
results = self._queue_controller.list(
project=project_id, **kwargs)
except validation.ValidationFailed as ex:
LOG.debug(ex)
return api_utils.error_response(req, ex)
except storage_errors.BaseException as ex:
LOG.exception(ex)
error = 'Queues could not be listed.'
return api_utils.error_response(req, ex, error)
# Buffer list of queues
queues = list(next(results))
# Got some. Prepare the response.
body = utils.to_json({'queues': queues})
resp = response.Response(req, body)
return resp
@api_utils.raises_conn_error
def queue_create(self, req):
"""Creates a queue
:param req: Request instance ready to be sent.
:type req: `api.common.Request`
:return: resp: Response instance
:type: resp: `api.common.Response`
"""
project_id = req._headers.get('X-Project-ID')
queue_name = req._body.get('queue_name')
metadata = req._body.get('metadata')
LOG.debug(u'Queue create - queue: %(queue)s, project: %(project)s',
{'queue': queue_name,
'project': project_id})
try:
self._validate.queue_identification(queue_name, project_id)
self._validate.queue_metadata_length(len(metadata))
self._queue_controller.create(queue_name, metadata=metadata,
project=project_id)
except validation.ValidationFailed as ex:
LOG.debug(ex)
return api_utils.error_response(req, ex)
except storage_errors.BaseException as ex:
LOG.exception(ex)
error = _('Queue "%s" could not be created.') % queue_name
return api_utils.error_response(req, ex, error)
else:
body = _('Queue "%s" created.') % queue_name
resp = response.Response(req, body)
return resp
@api_utils.raises_conn_error
def queue_delete(self, req):
"""Deletes a queue
:param req: Request instance ready to be sent.
:type req: `api.common.Request`
:return: resp: Response instance
:type: resp: `api.common.Response`
"""
project_id = req._headers.get('X-Project-ID')
queue_name = req._body.get('queue_name')
LOG.debug(u'Queue delete - queue: %(queue)s, project: %(project)s',
{'queue': queue_name, 'project': project_id})
try:
self._queue_controller.delete(queue_name, project=project_id)
except storage_errors.BaseException as ex:
LOG.exception(ex)
error = _('Queue "%s" could not be deleted.') % queue_name
return api_utils.error_response(req, ex, error)
else:
body = _('Queue "%s" removed.') % queue_name
resp = response.Response(req, body)
return resp
@api_utils.raises_conn_error
def queue_get(self, req):
"""Gets a queue
:param req: Request instance ready to be sent.
:type req: `api.common.Request`
:return: resp: Response instance
:type: resp: `api.common.Response`
"""
project_id = req._headers.get('X-Project-ID')
queue_name = req._body.get('queue_name')
LOG.debug(u'Queue get - queue: %(queue)s, '
u'project: %(project)s',
{'queue': queue_name, 'project': project_id})
try:
resp_dict = self._queue_controller.get(queue_name,
project=project_id)
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
error = _('Queue "%s" does not exist.') % queue_name
return api_utils.error_response(req, ex, error)
except storage_errors.BaseException as ex:
LOG.exception(ex)
error = _('Cannot retrieve queue "%s".') % queue_name
return api_utils.error_response(req, ex, error)
else:
body = utils.to_json(resp_dict)
resp = response.Response(req, body)
return resp
@api_utils.raises_conn_error
def queue_get_stats(self, req):
"""Gets queue stats
:param req: Request instance ready to be sent.
:type req: `api.common.Request`
:return: resp: Response instance
:type: resp: `api.common.Response`
"""
project_id = req._headers.get('X-Project-ID')
queue_name = req._body.get('queue_name')
try:
resp_dict = self._queue_controller.stats(queue_name,
project=project_id)
body = utils.to_json(resp_dict)
except storage_errors.QueueDoesNotExist as ex:
LOG.exception(ex)
resp_dict = {
'messages': {
'claimed': 0,
'free': 0,
'total': 0
}
}
body = utils.to_json(resp_dict)
resp = response.Response(req, body)
return resp
except storage_errors.BaseException as ex:
LOG.exception(ex)
error = _('Cannot retrieve queue "%s" stats.') % queue_name
return api_utils.error_response(req, ex, error)
else:
resp = response.Response(req, body)
return resp

View File

@ -16,6 +16,7 @@
from oslo_config import cfg
from stevedore import driver
from zaqar.api import handler
from zaqar.common import decorators
from zaqar.common import errors
from zaqar.i18n import _
@ -24,6 +25,7 @@ from zaqar.openstack.common import log
from zaqar.storage import pipeline
from zaqar.storage import pooling
from zaqar.storage import utils as storage_utils
from zaqar.transport import validation
LOG = log.getLogger(__name__)
@ -92,6 +94,12 @@ class Bootstrap(object):
LOG.warn(msg)
self.conf.unreliable = True
@decorators.lazy_property(write=False)
def api(self):
LOG.debug(u'Loading API handler')
validate = validation.Validator(self.conf)
return handler.Handler(self.storage, self.control, validate)
@decorators.lazy_property(write=False)
def storage(self):
LOG.debug(u'Loading storage driver')
@ -129,12 +137,16 @@ class Bootstrap(object):
transport_name = self.driver_conf.transport
LOG.debug(u'Loading transport driver: %s', transport_name)
args = [
self.conf,
self.storage,
self.cache,
self.control,
]
# FIXME(vkmc): Find a better way to init args
if transport_name == 'websocket':
args = [self.conf, self.api, self.cache]
else:
args = [
self.conf,
self.storage,
self.cache,
self.control,
]
try:
mgr = driver.DriverManager('zaqar.transport',

49
zaqar/common/api/utils.py Normal file
View File

@ -0,0 +1,49 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import functools
import zaqar.common.api.response as response
from zaqar.i18n import _
import zaqar.openstack.common.log as logging
from zaqar.transport import utils
LOG = logging.getLogger(__name__)
def raises_conn_error(func):
"""Handles generic Exceptions
This decorator catches generic Exceptions and returns a generic
Response.
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as ex:
LOG.exception(ex)
error = _("Unexpected error.")
req = kwargs.get('req')
return error_response(req, ex, error)
return wrapper
def error_response(req, exception, error=None):
body = utils.to_json({'exception': exception,
'error': error})
resp = response.Response(req, body)
return resp