Implement volume type basic operation
The purpose of this patch set is to provide the end users with the following functionalities: 1. Create volume type resources, only admin is able to create volume type 2. Show details for a volume type by id 3. List volume types 4. Update a volume type, only admin is able to update volume type 5. Delete a volume type, only admin is able to delete volume type Change-Id: I6e3188018eff6db155ec02dbd2af6aefc0363df9
This commit is contained in:
parent
828f55287e
commit
558e3d5ad7
@ -18,6 +18,7 @@ import pecan
|
||||
import oslo_log.log as logging
|
||||
|
||||
from tricircle.cinder_apigw.controllers import volume
|
||||
from tricircle.cinder_apigw.controllers import volume_type
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -65,6 +66,7 @@ class V2Controller(object):
|
||||
|
||||
self.resource_controller = {
|
||||
'volumes': volume.VolumeController,
|
||||
'types': volume_type.VolumeTypeController
|
||||
}
|
||||
|
||||
@pecan.expose()
|
||||
|
286
tricircle/cinder_apigw/controllers/volume_type.py
Normal file
286
tricircle/cinder_apigw/controllers/volume_type.py
Normal file
@ -0,0 +1,286 @@
|
||||
# Copyright 2016 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import pecan
|
||||
from pecan import expose
|
||||
from pecan import rest
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
import tricircle.common.context as t_context
|
||||
from tricircle.common import exceptions
|
||||
from tricircle.common.i18n import _
|
||||
from tricircle.common.i18n import _LE
|
||||
from tricircle.common import utils
|
||||
import tricircle.db.api as db_api
|
||||
from tricircle.db import core
|
||||
from tricircle.db import models
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VolumeTypeController(rest.RestController):
|
||||
|
||||
def __init__(self, tenant_id):
|
||||
self.tenant_id = tenant_id
|
||||
|
||||
def _metadata_refs(self, metadata_dict, meta_class):
|
||||
metadata_refs = []
|
||||
|
||||
if metadata_dict:
|
||||
for k, v in metadata_dict.items():
|
||||
metadata_ref = meta_class()
|
||||
metadata_ref['key'] = k
|
||||
metadata_ref['value'] = v
|
||||
metadata_refs.append(metadata_ref)
|
||||
return metadata_refs
|
||||
|
||||
@expose(generic=True, template='json')
|
||||
def post(self, **kw):
|
||||
"""Creates volume types."""
|
||||
context = t_context.extract_context_from_environ()
|
||||
|
||||
if not context.is_admin:
|
||||
return utils.format_cinder_error(
|
||||
403, _("Policy doesn't allow volume_extension:types_manage "
|
||||
"to be performed."))
|
||||
|
||||
if 'volume_type' not in kw:
|
||||
return utils.format_cinder_error(
|
||||
400, _("Missing required element 'volume_type' in "
|
||||
"request body."))
|
||||
|
||||
projects = []
|
||||
|
||||
if self.tenant_id is not None:
|
||||
projects = [self.tenant_id]
|
||||
|
||||
vol_type = kw['volume_type']
|
||||
name = vol_type.get('name', None)
|
||||
description = vol_type.get('description')
|
||||
specs = vol_type.get('extra_specs', {})
|
||||
is_public = vol_type.pop('os-volume-type-access:is_public', True)
|
||||
|
||||
if name is None or len(name.strip()) == 0:
|
||||
return utils.format_cinder_error(
|
||||
400, _("Volume type name can not be empty."))
|
||||
|
||||
try:
|
||||
utils.check_string_length(name, 'Type name',
|
||||
min_len=1, max_len=255)
|
||||
except exceptions.InvalidInput as e:
|
||||
return utils.format_cinder_error(
|
||||
400, e.message)
|
||||
|
||||
if description is not None:
|
||||
try:
|
||||
utils.check_string_length(description, 'Type description',
|
||||
min_len=0, max_len=255)
|
||||
except exceptions.InvalidInput as e:
|
||||
return utils.format_cinder_error(400, e.message)
|
||||
|
||||
if not utils.is_valid_boolstr(is_public):
|
||||
msg = _("Invalid value '%(is_public)s' for is_public. "
|
||||
"Accepted values: True or False.") % {
|
||||
'is_public': is_public}
|
||||
return utils.format_cinder_error(400, msg)
|
||||
|
||||
vol_type['extra_specs'] = specs
|
||||
vol_type['is_public'] = is_public
|
||||
vol_type['id'] = uuidutils.generate_uuid()
|
||||
|
||||
session = core.get_session()
|
||||
with session.begin():
|
||||
try:
|
||||
db_api.volume_type_get_by_name(context, vol_type['name'],
|
||||
session)
|
||||
return utils.format_cinder_error(
|
||||
409, _("Volume Type %(id)s already exists.") % {
|
||||
'id': vol_type['id']})
|
||||
except exceptions.VolumeTypeNotFoundByName:
|
||||
pass
|
||||
try:
|
||||
extra_specs = vol_type['extra_specs']
|
||||
vol_type['extra_specs'] = \
|
||||
self._metadata_refs(vol_type.get('extra_specs'),
|
||||
models.VolumeTypeExtraSpecs)
|
||||
volume_type_ref = models.VolumeTypes()
|
||||
volume_type_ref.update(vol_type)
|
||||
session.add(volume_type_ref)
|
||||
for project in set(projects):
|
||||
access_ref = models.VolumeTypeProjects()
|
||||
access_ref.update({"volume_type_id": volume_type_ref.id,
|
||||
"project_id": project})
|
||||
access_ref.save(session=session)
|
||||
except Exception as e:
|
||||
LOG.exception(_LE('Fail to create volume type: %(name)s,'
|
||||
'%(exception)s'),
|
||||
{'name': vol_type['name'],
|
||||
'exception': e})
|
||||
return utils.format_cinder_error(
|
||||
500, _('Fail to create volume type'))
|
||||
|
||||
vol_type['extra_specs'] = extra_specs
|
||||
return {'volume_type': vol_type}
|
||||
|
||||
@expose(generic=True, template='json')
|
||||
def get_one(self, _id):
|
||||
"""Retrieves single volume type by id.
|
||||
|
||||
:param _id: id of volume type to be retrieved
|
||||
:returns: retrieved volume type
|
||||
"""
|
||||
context = t_context.extract_context_from_environ()
|
||||
try:
|
||||
result = db_api.volume_type_get(context, _id)
|
||||
except exceptions.VolumeTypeNotFound as e:
|
||||
return utils.format_cinder_error(404, e.message)
|
||||
except Exception as e:
|
||||
LOG.exception(_LE('Volume type not found: %(id)s,'
|
||||
'%(exception)s'),
|
||||
{'id': _id,
|
||||
'exception': e})
|
||||
return utils.format_cinder_error(
|
||||
404, _("Volume type %(id)s could not be found.") % {
|
||||
'id': _id})
|
||||
return {'volume_type': result}
|
||||
|
||||
@expose(generic=True, template='json')
|
||||
def get_all(self):
|
||||
"""Get all non-deleted volume_types."""
|
||||
filters = {}
|
||||
context = t_context.extract_context_from_environ()
|
||||
if not context.is_admin:
|
||||
# Only admin has query access to all volume types
|
||||
filters['is_public'] = True
|
||||
try:
|
||||
list_result = db_api.volume_type_get_all(context,
|
||||
list_result=True,
|
||||
filters=filters)
|
||||
except Exception as e:
|
||||
LOG.exception(_LE('Fail to retrieve volume types: %(exception)s'),
|
||||
{'exception': e})
|
||||
return utils.format_cinder_error(500, e)
|
||||
|
||||
return {'volume_types': list_result}
|
||||
|
||||
@expose(generic=True, template='json')
|
||||
def put(self, _id, **kw):
|
||||
"""Update volume type by id.
|
||||
|
||||
:param _id: id of volume type to be updated
|
||||
:param kw: dictionary of values to be updated
|
||||
:returns: updated volume type
|
||||
"""
|
||||
context = t_context.extract_context_from_environ()
|
||||
|
||||
if not context.is_admin:
|
||||
return utils.format_cinder_error(
|
||||
403, _("Policy doesn't allow volume_extension:types_manage "
|
||||
"to be performed."))
|
||||
|
||||
if 'volume_type' not in kw:
|
||||
return utils.format_cinder_error(
|
||||
400, _("Missing required element 'volume_type' in "
|
||||
"request body."))
|
||||
|
||||
values = kw['volume_type']
|
||||
name = values.get('name')
|
||||
description = values.get('description')
|
||||
is_public = values.get('os-volume-type-access:is_public')
|
||||
|
||||
# Name and description can not be both None.
|
||||
# If name specified, name can not be empty.
|
||||
if name and len(name.strip()) == 0:
|
||||
return utils.format_cinder_error(
|
||||
400, _("Volume type name can not be empty."))
|
||||
|
||||
if name is None and description is None and is_public is None:
|
||||
msg = _("Specify volume type name, description, is_public or "
|
||||
"a combination thereof.")
|
||||
return utils.format_cinder_error(400, msg)
|
||||
|
||||
if is_public is not None and not utils.is_valid_boolstr(is_public):
|
||||
msg = _("Invalid value '%(is_public)s' for is_public. Accepted "
|
||||
"values: True or False.") % {'is_public': is_public}
|
||||
return utils.format_cinder_error(400, msg)
|
||||
|
||||
if name:
|
||||
try:
|
||||
utils.check_string_length(name, 'Type name',
|
||||
min_len=1, max_len=255)
|
||||
except exceptions.InvalidInput as e:
|
||||
return utils.format_cinder_error(400, e.message)
|
||||
|
||||
if description is not None:
|
||||
try:
|
||||
utils.check_string_length(description, 'Type description',
|
||||
min_len=0, max_len=255)
|
||||
except exceptions.InvalidInput as e:
|
||||
return utils.format_cinder_error(400, e.message)
|
||||
|
||||
try:
|
||||
type_updated = \
|
||||
db_api.volume_type_update(context, _id,
|
||||
dict(name=name,
|
||||
description=description,
|
||||
is_public=is_public))
|
||||
except exceptions.VolumeTypeNotFound as e:
|
||||
return utils.format_cinder_error(404, e.message)
|
||||
except exceptions.VolumeTypeExists as e:
|
||||
return utils.format_cinder_error(409, e.message)
|
||||
except exceptions.VolumeTypeUpdateFailed as e:
|
||||
return utils.format_cinder_error(500, e.message)
|
||||
except Exception as e:
|
||||
LOG.exception(_LE('Fail to update volume type: %(name)s,'
|
||||
'%(exception)s'),
|
||||
{'name': values['name'],
|
||||
'exception': e})
|
||||
return utils.format_cinder_error(
|
||||
500, _("Fail to update volume type."))
|
||||
return {'volume_type': type_updated}
|
||||
|
||||
@expose(generic=True, template='json')
|
||||
def delete(self, _id):
|
||||
"""Marks volume types as deleted.
|
||||
|
||||
:param _id: id of volume type to be deleted
|
||||
"""
|
||||
context = t_context.extract_context_from_environ()
|
||||
|
||||
if not context.is_admin:
|
||||
return utils.format_cinder_error(
|
||||
403, _("Policy doesn't allow volume_extension:types_manage "
|
||||
"to be performed."))
|
||||
|
||||
session = core.get_session()
|
||||
with session.begin():
|
||||
try:
|
||||
db_api.volume_type_get(context, _id, session)
|
||||
except exceptions.VolumeTypeNotFound as e:
|
||||
return utils.format_cinder_error(404, e.message)
|
||||
try:
|
||||
db_api.volume_type_delete(context, _id, session)
|
||||
except Exception as e:
|
||||
LOG.exception(_LE('Fail to update volume type: %(id)s,'
|
||||
'%(exception)s'),
|
||||
{'id': _id,
|
||||
'exception': e})
|
||||
return utils.format_cinder_error(
|
||||
500, _('Fail to delete volume type.'))
|
||||
|
||||
pecan.response.status = 202
|
||||
return pecan.response
|
@ -19,6 +19,7 @@ from pecan import request
|
||||
|
||||
import oslo_context.context as oslo_ctx
|
||||
|
||||
from tricircle.common.i18n import _
|
||||
from tricircle.db import core
|
||||
|
||||
|
||||
@ -70,9 +71,18 @@ def get_context_from_neutron_context(context):
|
||||
|
||||
class ContextBase(oslo_ctx.RequestContext):
|
||||
def __init__(self, auth_token=None, user_id=None, tenant_id=None,
|
||||
is_admin=False, request_id=None, overwrite=True,
|
||||
user_name=None, tenant_name=None, quota_class=None,
|
||||
**kwargs):
|
||||
is_admin=False, read_deleted="no", request_id=None,
|
||||
overwrite=True, user_name=None, tenant_name=None,
|
||||
quota_class=None, **kwargs):
|
||||
"""Initialize RequestContext.
|
||||
|
||||
:param read_deleted: 'no' indicates deleted records are hidden, 'yes'
|
||||
indicates deleted records are visible, 'only' indicates that
|
||||
*only* deleted records are visible.
|
||||
|
||||
:param overwrite: Set to False to ensure that the greenthread local
|
||||
copy of the index is not overwritten.
|
||||
"""
|
||||
super(ContextBase, self).__init__(
|
||||
auth_token=auth_token,
|
||||
user=user_id or kwargs.get('user', None),
|
||||
@ -89,6 +99,22 @@ class ContextBase(oslo_ctx.RequestContext):
|
||||
self.user_name = user_name
|
||||
self.tenant_name = tenant_name
|
||||
self.quota_class = quota_class
|
||||
self.read_deleted = read_deleted
|
||||
|
||||
def _get_read_deleted(self):
|
||||
return self._read_deleted
|
||||
|
||||
def _set_read_deleted(self, read_deleted):
|
||||
if read_deleted not in ('no', 'yes', 'only'):
|
||||
raise ValueError(_("read_deleted can only be one of 'no', "
|
||||
"'yes' or 'only', not %r") % read_deleted)
|
||||
self._read_deleted = read_deleted
|
||||
|
||||
def _del_read_deleted(self):
|
||||
del self._read_deleted
|
||||
|
||||
read_deleted = property(_get_read_deleted, _set_read_deleted,
|
||||
_del_read_deleted)
|
||||
|
||||
def to_dict(self):
|
||||
ctx_dict = super(ContextBase, self).to_dict()
|
||||
|
@ -273,3 +273,29 @@ class ValidationError(TricircleException):
|
||||
class HTTPForbiddenError(TricircleException):
|
||||
message = _("%(msg)s")
|
||||
code = 403
|
||||
|
||||
|
||||
class VolumeTypeNotFound(NotFound):
|
||||
message = _("Volume type %(volume_type_id)s could not be found.")
|
||||
|
||||
|
||||
class VolumeTypeNotFoundByName(VolumeTypeNotFound):
|
||||
message = _("Volume type with name %(volume_type_name)s "
|
||||
"could not be found.")
|
||||
|
||||
|
||||
class VolumeTypeExtraSpecsNotFound(NotFound):
|
||||
message = _("Volume Type %(volume_type_id)s has no extra specs with "
|
||||
"key %(extra_specs_key)s.")
|
||||
|
||||
|
||||
class Duplicate(TricircleException):
|
||||
pass
|
||||
|
||||
|
||||
class VolumeTypeExists(Duplicate):
|
||||
message = _("Volume Type %(id)s already exists.")
|
||||
|
||||
|
||||
class VolumeTypeUpdateFailed(TricircleException):
|
||||
message = _("Cannot update volume_type %(id)s")
|
||||
|
@ -15,6 +15,8 @@
|
||||
|
||||
import six
|
||||
|
||||
import pecan
|
||||
|
||||
import tricircle.common.exceptions as t_exceptions
|
||||
from tricircle.common.i18n import _
|
||||
|
||||
@ -116,3 +118,28 @@ def check_string_length(value, name=None, min_len=0, max_len=None):
|
||||
|
||||
def get_bottom_network_name(network):
|
||||
return '%s#%s' % (network['id'], network['name'])
|
||||
|
||||
|
||||
def format_error(code, message, error_type=None):
|
||||
error_type_map = {400: 'badRequest',
|
||||
403: 'forbidden',
|
||||
404: 'itemNotFound',
|
||||
409: 'conflictingRequest',
|
||||
500: 'internalServerError'}
|
||||
pecan.response.status = code
|
||||
if not error_type:
|
||||
if code in error_type_map:
|
||||
error_type = error_type_map[code]
|
||||
else:
|
||||
error_type = 'Error'
|
||||
# format error message in this form so nova client can
|
||||
# correctly parse it
|
||||
return {error_type: {'message': message, 'code': code}}
|
||||
|
||||
|
||||
def format_nova_error(code, message, error_type=None):
|
||||
return format_error(code, message, error_type)
|
||||
|
||||
|
||||
def format_cinder_error(code, message, error_type=None):
|
||||
return format_error(code, message, error_type)
|
||||
|
@ -23,6 +23,9 @@ from oslo_db import exception as db_exc
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import timeutils
|
||||
from oslo_utils import uuidutils
|
||||
from sqlalchemy import or_, and_
|
||||
from sqlalchemy.orm import joinedload
|
||||
from sqlalchemy.sql.expression import literal_column
|
||||
|
||||
from tricircle.common import constants
|
||||
from tricircle.common.context import is_admin_context as _is_admin_context
|
||||
@ -380,6 +383,21 @@ def _retry_on_deadlock(f):
|
||||
return wrapped
|
||||
|
||||
|
||||
def handle_db_data_error(f):
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
return f(*args, **kwargs)
|
||||
except db_exc.DBDataError:
|
||||
msg = _('Error writing field to database')
|
||||
LOG.exception(msg)
|
||||
raise exceptions.Invalid(msg)
|
||||
except Exception as e:
|
||||
LOG.exception(str(e))
|
||||
raise
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def model_query(context, *args, **kwargs):
|
||||
"""Query helper that accounts for context's `read_deleted` field.
|
||||
|
||||
@ -904,3 +922,316 @@ def reservation_expire(context):
|
||||
reservation.usage.save(session=context.session)
|
||||
|
||||
reservation.delete(session=context.session)
|
||||
|
||||
|
||||
def _dict_with_extra_specs_if_authorized(context, inst_type_query):
|
||||
"""Convert type query result to dict with extra_spec and rate_limit.
|
||||
|
||||
Takes a volume type query returned by sqlalchemy and returns it
|
||||
as a dictionary, converting the extra_specs entry from a list
|
||||
of dicts.
|
||||
|
||||
NOTE:
|
||||
the contents of extra-specs are admin readable only.
|
||||
If the context passed in for this request is not about admin,
|
||||
we will return an empty extra-specs dict rather than
|
||||
providing extra-specs details.
|
||||
|
||||
:param context: The request context, for access checks.
|
||||
:param inst_type_query: list of extra-specs.
|
||||
:returns dictionary of extra-specs.
|
||||
|
||||
Example of response of admin context:
|
||||
|
||||
'extra_specs' : [{'key': 'k1', 'value': 'v1', ...}, ...]
|
||||
to a single dict:
|
||||
'extra_specs' : {'k1': 'v1'}
|
||||
|
||||
"""
|
||||
|
||||
inst_type_dict = dict(inst_type_query)
|
||||
if not context.is_admin:
|
||||
del (inst_type_dict['extra_specs'])
|
||||
else:
|
||||
extra_specs = {x['key']: x['value']
|
||||
for x in inst_type_query['extra_specs']}
|
||||
inst_type_dict['extra_specs'] = extra_specs
|
||||
return inst_type_dict
|
||||
|
||||
|
||||
@require_context
|
||||
def _volume_type_get_by_name(context, name, session=None):
|
||||
result = model_query(context, models.VolumeTypes, session=session). \
|
||||
options(joinedload('extra_specs')). \
|
||||
filter_by(name=name). \
|
||||
first()
|
||||
|
||||
if not result:
|
||||
raise exceptions.VolumeTypeNotFoundByName(volume_type_name=name)
|
||||
|
||||
return _dict_with_extra_specs_if_authorized(context, result)
|
||||
|
||||
|
||||
@require_context
|
||||
def volume_type_get_by_name(context, name, session=None):
|
||||
"""Return a dict describing specific volume_type.
|
||||
|
||||
:param context: The request context, for access checks.
|
||||
:param name: The name of volume type to be found.
|
||||
:returns Volume type.
|
||||
"""
|
||||
return _volume_type_get_by_name(context, name, session)
|
||||
|
||||
|
||||
def _volume_type_get_query(context, session=None, read_deleted='no'):
|
||||
query = model_query(context, models.VolumeTypes,
|
||||
session=session,
|
||||
read_deleted=read_deleted). \
|
||||
options(joinedload('extra_specs'))
|
||||
|
||||
if not context.is_admin:
|
||||
is_public = True
|
||||
the_filter = [models.VolumeTypes.is_public == is_public]
|
||||
query.filter(or_(*the_filter))
|
||||
|
||||
return query
|
||||
|
||||
|
||||
def _volume_type_get_db_object(context, id, session=None, inactive=False):
|
||||
read_deleted = "yes" if inactive else "no"
|
||||
result = _volume_type_get_query(
|
||||
context, session, read_deleted). \
|
||||
filter_by(id=id). \
|
||||
first()
|
||||
return result
|
||||
|
||||
|
||||
@require_context
|
||||
def _volume_type_get(context, id, session=None, inactive=False):
|
||||
result = _volume_type_get_db_object(context, id, session, inactive)
|
||||
|
||||
if not result:
|
||||
raise exceptions.VolumeTypeNotFound(volume_type_id=id)
|
||||
|
||||
vtype = _dict_with_extra_specs_if_authorized(context, result)
|
||||
|
||||
return vtype
|
||||
|
||||
|
||||
@require_context
|
||||
def volume_type_get(context, id, inactive=False):
|
||||
"""Return a dict describing specific volume_type.
|
||||
|
||||
:param context: The request context, for access checks.
|
||||
:param id: The id of volume type to be found.
|
||||
:returns Volume type.
|
||||
"""
|
||||
|
||||
return _volume_type_get(context, id,
|
||||
session=None,
|
||||
inactive=inactive)
|
||||
|
||||
|
||||
@require_context
|
||||
def volume_type_delete(context, id, session):
|
||||
"""delete a volume_type by id.
|
||||
|
||||
:param context: The request context, for access checks.
|
||||
:param id: The id of volume type to be deleted.
|
||||
"""
|
||||
model_query(context, models.VolumeTypes, session=session, read_deleted="no").\
|
||||
filter_by(id=id). \
|
||||
update({'deleted': True,
|
||||
'deleted_at': timeutils.utcnow(),
|
||||
'updated_at': literal_column('updated_at')})
|
||||
|
||||
model_query(context, models.VolumeTypeExtraSpecs, session=session, read_deleted="no"). \
|
||||
filter_by(volume_type_id=id). \
|
||||
update({'deleted': True,
|
||||
'deleted_at': timeutils.utcnow(),
|
||||
'updated_at': literal_column('updated_at')})
|
||||
|
||||
|
||||
def is_valid_model_filters(model, filters):
|
||||
"""Return True if filter values exist on the model
|
||||
|
||||
:param model: a Cinder model
|
||||
:param filters: dictionary of filters
|
||||
"""
|
||||
for key in filters.keys():
|
||||
if not hasattr(model, key):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _process_volume_types_filters(query, filters):
|
||||
context = filters.pop('context', None)
|
||||
|
||||
if filters.get('is_public'):
|
||||
the_filter = [models.VolumeTypes.is_public == filters['is_public']]
|
||||
|
||||
if filters['is_public'] and context.project_id is not None:
|
||||
projects_attr = getattr(models.VolumeTypes, 'projects')
|
||||
the_filter.append(
|
||||
[projects_attr.any(project_id=context.project_id,
|
||||
deleted=0)])
|
||||
|
||||
if len(the_filter) > 1:
|
||||
query = query.filter(or_(*the_filter))
|
||||
else:
|
||||
query = query.filter(the_filter[0])
|
||||
|
||||
if 'is_public' in filters:
|
||||
del filters['is_public']
|
||||
|
||||
if filters:
|
||||
# Ensure that filters' keys exist on the model
|
||||
if not is_valid_model_filters(models.VolumeTypes, filters):
|
||||
return
|
||||
|
||||
if filters.get('extra_specs') is not None:
|
||||
the_filter = []
|
||||
searchdict = filters.get('extra_specs')
|
||||
extra_specs = getattr(models.VolumeTypes, 'extra_specs')
|
||||
for k, v in searchdict.items():
|
||||
the_filter.append([extra_specs.any(key=k, value=v,
|
||||
deleted=False)])
|
||||
|
||||
if len(the_filter) > 1:
|
||||
query = query.filter(and_(*the_filter))
|
||||
else:
|
||||
query = query.filter(the_filter[0])
|
||||
del filters['extra_specs']
|
||||
query = query.filter_by(**filters)
|
||||
return query
|
||||
|
||||
|
||||
@require_context
|
||||
def volume_type_get_all(context, inactive=False, filters=None,
|
||||
list_result=False):
|
||||
"""Returns a dict describing all volume_types with name as key.
|
||||
|
||||
:param context: context to query under
|
||||
:param inactive: Pass true as argument if you want deleted volume types
|
||||
returned also.
|
||||
:param filters: dictionary of filters; values that are in lists, tuples,
|
||||
or sets cause an 'IN' operation, while exact matching
|
||||
is used for other values, see _process_volume_type_filters
|
||||
function for more information
|
||||
:param list_result: For compatibility, if list_result = True, return
|
||||
a list instead of dict.
|
||||
:returns: list/dict of matching volume types
|
||||
"""
|
||||
read_deleted = 'yes' if inactive else 'no'
|
||||
session = core.get_session()
|
||||
with session.begin():
|
||||
filters = filters or {}
|
||||
filters['context'] = context
|
||||
# Generate the query
|
||||
query = _volume_type_get_query(context, session=session,
|
||||
read_deleted=read_deleted)
|
||||
query = _process_volume_types_filters(query, filters)
|
||||
|
||||
# No volume types would match, return empty dict or list
|
||||
if query is None:
|
||||
if list_result:
|
||||
return []
|
||||
return {}
|
||||
|
||||
rows = query.all()
|
||||
|
||||
if list_result:
|
||||
result = [_dict_with_extra_specs_if_authorized(context, row)
|
||||
for row in rows]
|
||||
return result
|
||||
result = {row['name']: _dict_with_extra_specs_if_authorized(context,
|
||||
row)
|
||||
for row in rows}
|
||||
return result
|
||||
|
||||
|
||||
@require_context
|
||||
def _volume_type_ref_get(context, id, session=None, inactive=False):
|
||||
read_deleted = "yes" if inactive else "no"
|
||||
result = model_query(context,
|
||||
models.VolumeTypes,
|
||||
session=session,
|
||||
read_deleted=read_deleted).\
|
||||
options(joinedload('extra_specs')).\
|
||||
filter_by(id=id).\
|
||||
first()
|
||||
|
||||
if not result:
|
||||
raise exceptions.VolumeTypeNotFound(volume_type_id=id)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@handle_db_data_error
|
||||
@require_admin_context
|
||||
def volume_type_update(context, volume_type_id, values):
|
||||
"""Update volume type by volume_type_id.
|
||||
|
||||
:param volume_type_id: id of volume type to be updated
|
||||
:param values: dictionary of values to be updated
|
||||
:returns: updated volume type
|
||||
"""
|
||||
session = core.get_session()
|
||||
with session.begin():
|
||||
try:
|
||||
# Check it exists
|
||||
volume_type_ref = _volume_type_ref_get(context,
|
||||
volume_type_id,
|
||||
session)
|
||||
if not volume_type_ref:
|
||||
raise exceptions.VolumeTypeNotFound(type_id=volume_type_id)
|
||||
|
||||
# No description change
|
||||
if values['description'] is None:
|
||||
del values['description']
|
||||
|
||||
# No is_public change
|
||||
if values['is_public'] is None:
|
||||
del values['is_public']
|
||||
|
||||
# No name change
|
||||
if values['name'] is None:
|
||||
del values['name']
|
||||
else:
|
||||
# Volume type name is unique. If change to a name that
|
||||
# belongs to a different volume_type , it should be
|
||||
# prevented.
|
||||
check_vol_type = None
|
||||
try:
|
||||
check_vol_type = \
|
||||
volume_type_get_by_name(context,
|
||||
values['name'],
|
||||
session=session)
|
||||
except exceptions.VolumeTypeNotFoundByName:
|
||||
pass
|
||||
else:
|
||||
if check_vol_type.get('id') != volume_type_id:
|
||||
raise exceptions.VolumeTypeExists(id=values['name'])
|
||||
|
||||
volume_type_ref.update(values)
|
||||
volume_type_ref.save(session=session)
|
||||
except Exception:
|
||||
raise exceptions.VolumeTypeUpdateFailed(id=volume_type_id)
|
||||
|
||||
return _dict_with_extra_specs_if_authorized(context, volume_type_ref)
|
||||
|
||||
|
||||
@require_context
|
||||
def volume_type_project_query(context, session=None, inactive=False,
|
||||
filters=None):
|
||||
"""Get a query of volume type project.
|
||||
|
||||
:param context: context to query under
|
||||
:param inactive: Pass true as argument if you want deleted
|
||||
volume type projects returned also.
|
||||
:param filters: dictionary of filters.
|
||||
"""
|
||||
read_deleted = "yes" if inactive else "no"
|
||||
filters = filters or {}
|
||||
return model_query(context, models.VolumeTypeProjects, session=session,
|
||||
read_deleted=read_deleted).filter_by(**filters)
|
||||
|
@ -190,6 +190,41 @@ def upgrade(migrate_engine):
|
||||
sql.Column('is_public', sql.Boolean, default=True),
|
||||
sql.Column('created_at', sql.DateTime),
|
||||
sql.Column('updated_at', sql.DateTime),
|
||||
sql.Column('deleted_at', sql.DateTime),
|
||||
sql.Column('deleted', sql.Boolean),
|
||||
mysql_engine='InnoDB',
|
||||
mysql_charset='utf8')
|
||||
|
||||
volume_type_extra_specs = sql.Table(
|
||||
'volume_type_extra_specs', meta,
|
||||
sql.Column('created_at', sql.DateTime),
|
||||
sql.Column('updated_at', sql.DateTime),
|
||||
sql.Column('deleted_at', sql.DateTime),
|
||||
sql.Column('deleted', sql.Boolean),
|
||||
sql.Column('id', sql.Integer, primary_key=True, nullable=False),
|
||||
sql.Column('volume_type_id', sql.String(36),
|
||||
sql.ForeignKey('volume_types.id'),
|
||||
nullable=False),
|
||||
sql.Column('key', sql.String(length=255)),
|
||||
sql.Column('value', sql.String(length=255)),
|
||||
mysql_engine='InnoDB',
|
||||
mysql_charset='utf8')
|
||||
|
||||
volume_type_projects = sql.Table(
|
||||
'volume_type_projects', meta,
|
||||
sql.Column('id', sql.Integer, primary_key=True, nullable=False),
|
||||
sql.Column('created_at', sql.DateTime),
|
||||
sql.Column('updated_at', sql.DateTime),
|
||||
sql.Column('deleted_at', sql.DateTime),
|
||||
sql.Column('volume_type_id', sql.String(36),
|
||||
sql.ForeignKey('volume_types.id'),
|
||||
nullable=False),
|
||||
sql.Column('project_id', sql.String(length=255)),
|
||||
sql.Column('deleted', sql.Boolean(create_constraint=True, name=None)),
|
||||
migrate.UniqueConstraint(
|
||||
'volume_type_id', 'project_id', 'deleted',
|
||||
name='uniq_volume_type_projects0volume_type_id0project_id0deleted'
|
||||
),
|
||||
mysql_engine='InnoDB',
|
||||
mysql_charset='utf8')
|
||||
|
||||
@ -235,8 +270,8 @@ def upgrade(migrate_engine):
|
||||
tables = [aggregates, aggregate_metadata, instance_types,
|
||||
instance_type_projects, instance_type_extra_specs, key_pairs,
|
||||
quotas, quota_classes, quota_usages, reservations,
|
||||
volume_types, job,
|
||||
quality_of_service_specs, cascaded_pods_resource_routing]
|
||||
volume_types, volume_type_extra_specs, volume_type_projects,
|
||||
job, quality_of_service_specs, cascaded_pods_resource_routing]
|
||||
for table in tables:
|
||||
table.create()
|
||||
|
||||
@ -250,6 +285,10 @@ def upgrade(migrate_engine):
|
||||
'references': [quota_usages.c.id]},
|
||||
{'columns': [volume_types.c.qos_specs_id],
|
||||
'references': [quality_of_service_specs.c.id]},
|
||||
{'columns': [volume_type_extra_specs.c.volume_type_id],
|
||||
'references': [volume_types.c.id]},
|
||||
{'columns': [volume_type_projects.c.volume_type_id],
|
||||
'references': [volume_types.c.id]},
|
||||
{'columns': [quality_of_service_specs.c.specs_id],
|
||||
'references': [quality_of_service_specs.c.id]},
|
||||
{'columns': [aggregate_metadata.c.aggregate_id],
|
||||
|
@ -260,11 +260,29 @@ class Reservation(core.ModelBase, QuotasBase):
|
||||
'QuotaUsages.deleted == 0)')
|
||||
|
||||
|
||||
class VolumeTypes(core.ModelBase, core.DictBase, models.TimestampMixin):
|
||||
class VolumeTypeBase(models.ModelBase, core.DictBase,
|
||||
models.TimestampMixin, models.SoftDeleteMixin):
|
||||
"""VolumeTypeBase.
|
||||
|
||||
provide base class for volume type series tables. For it inherits from
|
||||
models.ModelBase, this is different from other tables
|
||||
"""
|
||||
__table_args__ = {'mysql_engine': 'InnoDB'}
|
||||
|
||||
metadata = None
|
||||
|
||||
def delete(self, session):
|
||||
"""Delete this object."""
|
||||
self.deleted = True
|
||||
self.deleted_at = timeutils.utcnow()
|
||||
self.save(session=session)
|
||||
|
||||
|
||||
class VolumeTypes(core.ModelBase, VolumeTypeBase):
|
||||
"""Represent possible volume_types of volumes offered."""
|
||||
__tablename__ = "volume_types"
|
||||
__tablename__ = 'volume_types'
|
||||
attributes = ['id', 'name', 'description', 'qos_specs_id', 'is_public',
|
||||
'created_at', 'updated_at']
|
||||
'created_at', 'updated_at', 'deleted_at', 'deleted']
|
||||
|
||||
id = sql.Column(sql.String(36), primary_key=True)
|
||||
name = sql.Column(sql.String(255), unique=True)
|
||||
@ -275,6 +293,53 @@ class VolumeTypes(core.ModelBase, core.DictBase, models.TimestampMixin):
|
||||
is_public = sql.Column(sql.Boolean, default=True)
|
||||
|
||||
|
||||
class VolumeTypeProjects(core.ModelBase, VolumeTypeBase):
|
||||
"""Represent projects associated volume_types."""
|
||||
__tablename__ = 'volume_type_projects'
|
||||
__table_args__ = (schema.UniqueConstraint(
|
||||
'volume_type_id', 'project_id', 'deleted',
|
||||
name="uniq_volume_type_projects0volume_type_id0project_id0deleted"),
|
||||
)
|
||||
attributes = ['id', 'volume_type_id', 'project_id', 'created_at',
|
||||
'updated_at', 'deleted_at', 'deleted']
|
||||
id = sql.Column(sql.Integer, primary_key=True)
|
||||
volume_type_id = sql.Column(sql.String(36),
|
||||
sql.ForeignKey('volume_types.id'),
|
||||
nullable=False)
|
||||
project_id = sql.Column(sql.String(255))
|
||||
deleted = sql.Column(sql.Boolean, default=False)
|
||||
|
||||
volume_type = relationship(
|
||||
VolumeTypes,
|
||||
backref="projects",
|
||||
foreign_keys=volume_type_id,
|
||||
primaryjoin='and_('
|
||||
'VolumeTypeProjects.volume_type_id == VolumeTypes.id,'
|
||||
'VolumeTypeProjects.deleted == False)')
|
||||
|
||||
|
||||
class VolumeTypeExtraSpecs(core.ModelBase, VolumeTypeBase):
|
||||
"""Represents additional specs as key/value pairs for a volume_type."""
|
||||
__tablename__ = 'volume_type_extra_specs'
|
||||
attributes = ['id', 'key', 'value', 'volume_type_id', 'created_at',
|
||||
'updated_at', 'deleted', 'deleted_at']
|
||||
|
||||
id = sql.Column(sql.Integer, primary_key=True)
|
||||
key = sql.Column(sql.String(255))
|
||||
value = sql.Column(sql.String(255))
|
||||
volume_type_id = sql.Column(sql.String(36),
|
||||
sql.ForeignKey('volume_types.id'),
|
||||
nullable=False)
|
||||
volume_type = relationship(
|
||||
VolumeTypes,
|
||||
backref="extra_specs",
|
||||
foreign_keys=volume_type_id,
|
||||
primaryjoin='and_('
|
||||
'VolumeTypeExtraSpecs.volume_type_id == VolumeTypes.id,'
|
||||
'VolumeTypeExtraSpecs.deleted == False)'
|
||||
)
|
||||
|
||||
|
||||
class QualityOfServiceSpecs(core.ModelBase, core.DictBase,
|
||||
models.TimestampMixin):
|
||||
"""Represents QoS specs as key/value pairs.
|
||||
|
0
tricircle/tests/unit/cinder_apigw/__init__.py
Normal file
0
tricircle/tests/unit/cinder_apigw/__init__.py
Normal file
271
tricircle/tests/unit/cinder_apigw/controllers/test_volume.py
Normal file
271
tricircle/tests/unit/cinder_apigw/controllers/test_volume.py
Normal file
@ -0,0 +1,271 @@
|
||||
# Copyright 2016 OpenStack Foundation.
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from mock import patch
|
||||
import pecan
|
||||
import unittest
|
||||
|
||||
from tricircle.cinder_apigw.controllers import volume_type
|
||||
from tricircle.common import context
|
||||
from tricircle.db import api as db_api
|
||||
from tricircle.db import core
|
||||
|
||||
|
||||
class FakeResponse(object):
|
||||
def __new__(cls, code=500):
|
||||
cls.status = code
|
||||
cls.status_code = code
|
||||
return super(FakeResponse, cls).__new__(cls)
|
||||
|
||||
|
||||
class VolumeTypeTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
core.initialize()
|
||||
core.ModelBase.metadata.create_all(core.get_engine())
|
||||
self.context = context.get_admin_context()
|
||||
self.project_id = 'test_project'
|
||||
self.controller = volume_type.VolumeTypeController(self.project_id)
|
||||
|
||||
def _validate_error_code(self, res, code):
|
||||
self.assertEqual(code, res[res.keys()[0]]['code'])
|
||||
|
||||
@patch.object(pecan, 'response', new=FakeResponse)
|
||||
@patch.object(context, 'extract_context_from_environ')
|
||||
def test_post(self, mock_context):
|
||||
mock_context.return_value = self.context
|
||||
|
||||
body = {'volume_type': {'name': 'vol-type-001',
|
||||
'description': 'volume type 001',
|
||||
'os-volume-type-access:is_public': True,
|
||||
'extra_specs': {
|
||||
'capabilities': 'gpu',
|
||||
}}}
|
||||
self.controller.post(**body)
|
||||
res = db_api.volume_type_get_by_name(self.context, 'vol-type-001')
|
||||
|
||||
self.assertEqual('vol-type-001', res['name'])
|
||||
self.assertEqual('volume type 001', res['description'])
|
||||
capabilities = res['extra_specs']['capabilities']
|
||||
self.assertEqual('gpu', capabilities)
|
||||
|
||||
# failure case, only admin can create volume type
|
||||
self.context.is_admin = False
|
||||
res = self.controller.post(**body)
|
||||
self._validate_error_code(res, 403)
|
||||
|
||||
self.context.is_admin = True
|
||||
|
||||
# failure case, volume_type body is required
|
||||
body = {'name': 'vol-type-002'}
|
||||
res = self.controller.post(**body)
|
||||
self._validate_error_code(res, 400)
|
||||
|
||||
# failure case, volume type name is empty
|
||||
body = {'volume_type': {'name': '',
|
||||
'description': 'volume type 001',
|
||||
'os-volume-type-access:is_public': True,
|
||||
'extra_specs': {
|
||||
'capabilities': 'gpu',
|
||||
}}}
|
||||
res = self.controller.post(**body)
|
||||
self._validate_error_code(res, 400)
|
||||
|
||||
# failure case, volume type name has more than 255 characters
|
||||
body = {'volume_type': {'name': ('a' * 500),
|
||||
'description': 'volume type 001',
|
||||
'os-volume-type-access:is_public': True,
|
||||
'extra_specs': {
|
||||
'capabilities': 'gpu', }
|
||||
}
|
||||
}
|
||||
res = self.controller.post(**body)
|
||||
self._validate_error_code(res, 400)
|
||||
|
||||
# failure case, volume type description has more than 255 characters
|
||||
body = {'volume_type': {'name': 'vol-type-001',
|
||||
'description': ('a' * 500),
|
||||
'os-volume-type-access:is_public': True,
|
||||
'extra_specs': {
|
||||
'capabilities': 'gpu',
|
||||
}}}
|
||||
self.controller.post(**body)
|
||||
self._validate_error_code(res, 400)
|
||||
|
||||
# failure case, is_public is invalid input
|
||||
body = {'volume_type': {'name': 'vol-type-001',
|
||||
'description': 'volume type 001',
|
||||
'os-volume-type-access:is_public': 'a',
|
||||
'extra_specs': {
|
||||
'capabilities': 'gpu',
|
||||
}}}
|
||||
res = self.controller.post(**body)
|
||||
self._validate_error_code(res, 400)
|
||||
|
||||
# failure case, volume type name is unique
|
||||
body = {'volume_type': {'name': 'vol-type-001',
|
||||
'description': 'volume type 001',
|
||||
'os-volume-type-access:is_public': True,
|
||||
'extra_specs': {
|
||||
'capabilities': 'gpu',
|
||||
}}}
|
||||
res = self.controller.post(**body)
|
||||
self._validate_error_code(res, 409)
|
||||
|
||||
@patch.object(pecan, 'response', new=FakeResponse)
|
||||
@patch.object(context, 'extract_context_from_environ')
|
||||
def test_get_one(self, mock_context):
|
||||
mock_context.return_value = self.context
|
||||
|
||||
body = {'volume_type': {'name': 'vol-type-001',
|
||||
'description': 'volume type 001',
|
||||
'os-volume-type-access:is_public': True,
|
||||
'extra_specs': {
|
||||
'capabilities': 'gpu',
|
||||
}}}
|
||||
self.controller.post(**body)
|
||||
vtype = db_api.volume_type_get_by_name(self.context, 'vol-type-001')
|
||||
res = self.controller.get_one(vtype['id'])['volume_type']
|
||||
|
||||
self.assertEqual('vol-type-001', res['name'])
|
||||
self.assertEqual('volume type 001', res['description'])
|
||||
capabilities = res['extra_specs']['capabilities']
|
||||
self.assertEqual('gpu', capabilities)
|
||||
|
||||
# failure case, volume type is not exist.
|
||||
fake_id = "Fake_ID"
|
||||
res = self.controller.get_one(fake_id)
|
||||
self._validate_error_code(res, 404)
|
||||
|
||||
# failure case, the volume type is private.
|
||||
body = {'volume_type': {'name': 'vol-type-002',
|
||||
'description': 'volume type 002',
|
||||
'os-volume-type-access:is_public': False,
|
||||
'extra_specs': {
|
||||
'capabilities': 'gpu',
|
||||
}}}
|
||||
self.controller.post(**body)
|
||||
vtype = db_api.volume_type_get_by_name(self.context, 'vol-type-002')
|
||||
self.context.is_admin = False
|
||||
res = self.controller.get_one(vtype['id'])
|
||||
self._validate_error_code(res, 404)
|
||||
|
||||
@patch.object(context, 'extract_context_from_environ')
|
||||
def test_get_all(self, mock_context):
|
||||
mock_context.return_value = self.context
|
||||
|
||||
volume_type_001 = {'volume_type': {'name': 'vol-type-001',
|
||||
'description': 'volume type 001',
|
||||
'os-volume-'
|
||||
'type-access:is_public': True,
|
||||
'extra_specs': {
|
||||
'capabilities': 'gpu',
|
||||
}}}
|
||||
volume_type_002 = {'volume_type': {'name': 'vol-type-002',
|
||||
'description': 'volume type 002',
|
||||
'os-volume-'
|
||||
'type-access:is_public': True,
|
||||
'extra_specs': {
|
||||
'capabilities': 'gpu',
|
||||
}}}
|
||||
self.controller.post(**volume_type_001)
|
||||
self.controller.post(**volume_type_002)
|
||||
volume_types = self.controller.get_all()['volume_types']
|
||||
|
||||
self.assertEqual('vol-type-001', volume_types[0]['name'])
|
||||
self.assertEqual('volume type 001', volume_types[0]['description'])
|
||||
capabilities_001 = volume_types[0]['extra_specs']['capabilities']
|
||||
self.assertEqual('gpu', capabilities_001)
|
||||
|
||||
self.assertEqual('vol-type-002', volume_types[1]['name'])
|
||||
self.assertEqual('volume type 002', volume_types[1]['description'])
|
||||
capabilities_002 = volume_types[1]['extra_specs']['capabilities']
|
||||
self.assertEqual('gpu', capabilities_002)
|
||||
|
||||
@patch.object(pecan, 'response', new=FakeResponse)
|
||||
@patch.object(context, 'extract_context_from_environ')
|
||||
def test_put(self, mock_context):
|
||||
mock_context.return_value = self.context
|
||||
|
||||
body = {'volume_type': {'name': 'vol-type-001',
|
||||
'description': 'volume type 001',
|
||||
'os-volume-type-access:is_public': True,
|
||||
'extra_specs': {
|
||||
'capabilities': 'gpu',
|
||||
}}}
|
||||
body_update = {'volume_type': {'name': 'vol-type-002',
|
||||
'description': 'volume type 002',
|
||||
'os-volume-'
|
||||
'type-access:is_public': True,
|
||||
'extra_specs': {
|
||||
'capabilities': 'gpu',
|
||||
}}}
|
||||
self.controller.post(**body)
|
||||
vtype = db_api.volume_type_get_by_name(self.context, 'vol-type-001')
|
||||
res = self.controller.put(vtype['id'], **body_update)['volume_type']
|
||||
|
||||
self.assertEqual('vol-type-002', res['name'])
|
||||
self.assertEqual('volume type 002', res['description'])
|
||||
capabilities = res['extra_specs']['capabilities']
|
||||
self.assertEqual('gpu', capabilities)
|
||||
|
||||
# failure case, volume type name, description, is_public
|
||||
# not None at the same time
|
||||
body = {'volume_type': {'extra_specs': {
|
||||
'capabilities': 'gpu',
|
||||
}}}
|
||||
res = self.controller.put(vtype['id'], **body)
|
||||
self._validate_error_code(res, 400)
|
||||
# failure case, name exists in db
|
||||
body = {'volume_type': {'name': 'vol-type-003',
|
||||
'description': 'volume type 003',
|
||||
'os-volume-type-access:is_public': True,
|
||||
'extra_specs': {
|
||||
'capabilities': 'gpu',
|
||||
}}}
|
||||
self.controller.post(**body)
|
||||
res = self.controller.put(vtype['id'], **body)
|
||||
self._validate_error_code(res, 500)
|
||||
|
||||
@patch.object(pecan, 'response', new=FakeResponse)
|
||||
@patch.object(db_api, 'volume_type_delete')
|
||||
@patch.object(context, 'extract_context_from_environ')
|
||||
def test_delete(self, mock_context, mock_delete):
|
||||
mock_context.return_value = self.context
|
||||
mock_delete.return_value = Exception()
|
||||
|
||||
body = {'volume_type': {'name': 'vol-type-001',
|
||||
'description': 'volume type 001',
|
||||
'os-volume-type-access:is_public': True,
|
||||
'extra_specs': {
|
||||
'capabilities': 'gpu',
|
||||
}}}
|
||||
self.controller.post(**body)
|
||||
vtype = db_api.volume_type_get_by_name(self.context, 'vol-type-001')
|
||||
|
||||
# failure case, only admin delete create volume type
|
||||
self.context.is_admin = False
|
||||
res = self.controller.delete(vtype['id'])
|
||||
self._validate_error_code(res, 403)
|
||||
|
||||
# failure case, bad request
|
||||
self.context.is_admin = True
|
||||
res = self.controller.delete(_id=None)
|
||||
self._validate_error_code(res, 404)
|
||||
|
||||
res = self.controller.delete(vtype['id'])
|
||||
self.assertEqual(res.status, 202)
|
||||
|
||||
def tearDown(self):
|
||||
core.ModelBase.metadata.drop_all(core.get_engine())
|
Loading…
Reference in New Issue
Block a user