Update license info

Change-Id: Ib1b399235853f240f59b142b14cfdcebda2451f6
Signed-off-by: Zhijiang Hu <hu.zhijiang@zte.com.cn>
This commit is contained in:
Zhijiang Hu 2017-08-17 06:05:06 -04:00
parent 49716fc081
commit c9ff2e7ec4
36 changed files with 49 additions and 7475 deletions

View File

@ -1,3 +1,6 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at

View File

@ -1,5 +1,4 @@
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Spanish National Research Council.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,7 +1,4 @@
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 OpenStack Foundation
# Copyright 2012 Grid Dynamics
# Copyright 2013 OpenStack Foundation
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,9 +1,4 @@
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 OpenStack Foundation
# Copyright 2011 Piston Cloud Computing, Inc.
# Copyright 2013 Alessio Ababilov
# Copyright 2013 Grid Dynamics
# Copyright 2013 OpenStack Foundation
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,7 +1,4 @@
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 Nebula, Inc.
# Copyright 2013 Alessio Ababilov
# Copyright 2013 OpenStack Foundation
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,3 +1,5 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain

View File

@ -1,3 +1,18 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# -*- coding:utf-8 -*-
import os

View File

@ -1,62 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from daisyclient.common import http
from daisyclient.common import utils
from daisyclient.v2 import image_members
from daisyclient.v2 import image_tags
from daisyclient.v2 import images
from daisyclient.v2 import metadefs
from daisyclient.v2 import schemas
from daisyclient.v2 import tasks
class Client(object):
"""Client for the OpenStack Images v2 API.
:param string endpoint: A user-supplied endpoint URL for the glance
service.
:param string token: Token for authentication.
:param integer timeout: Allows customization of the timeout for client
http requests. (optional)
"""
def __init__(self, endpoint, *args, **kwargs):
endpoint, version = utils.strip_version(endpoint)
self.version = version or 2.0
self.http_client = http.HTTPClient(endpoint, *args, **kwargs)
self.schemas = schemas.Controller(self.http_client)
self.images = images.Controller(self.http_client, self.schemas)
self.image_tags = image_tags.Controller(self.http_client,
self.schemas)
self.image_members = image_members.Controller(self.http_client,
self.schemas)
self.tasks = tasks.Controller(self.http_client, self.schemas)
self.metadefs_resource_type = (
metadefs.ResourceTypeController(self.http_client, self.schemas))
self.metadefs_property = (
metadefs.PropertyController(self.http_client, self.schemas))
self.metadefs_object = (
metadefs.ObjectController(self.http_client, self.schemas))
self.metadefs_namespace = (
metadefs.NamespaceController(self.http_client, self.schemas))

View File

@ -1,55 +0,0 @@
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import warlock
from daisyclient.common import utils
from daisyclient.v2 import schemas
MEMBER_STATUS_VALUES = ('accepted', 'rejected', 'pending')
class Controller(object):
def __init__(self, http_client, schema_client):
self.http_client = http_client
self.schema_client = schema_client
@utils.memoized_property
def model(self):
schema = self.schema_client.get('member')
return warlock.model_factory(schema.raw(), schemas.SchemaBasedModel)
def list(self, image_id):
url = '/v2/images/%s/members' % image_id
resp, body = self.http_client.get(url)
for member in body['members']:
yield self.model(member)
def delete(self, image_id, member_id):
self.http_client.delete('/v2/images/%s/members/%s' %
(image_id, member_id))
def update(self, image_id, member_id, member_status):
url = '/v2/images/%s/members/%s' % (image_id, member_id)
body = {'status': member_status}
resp, updated_member = self.http_client.put(url, data=body)
return self.model(updated_member)
def create(self, image_id, member_id):
url = '/v2/images/%s/members' % image_id
body = {'member': member_id}
resp, created_member = self.http_client.post(url, data=body)
return self.model(created_member)

View File

@ -1,50 +0,0 @@
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import warlock
from daisyclient.common import utils
from daisyclient.v2 import schemas
class Controller(object):
def __init__(self, http_client, schema_client):
self.http_client = http_client
self.schema_client = schema_client
@utils.memoized_property
def model(self):
schema = self.schema_client.get('image')
return warlock.model_factory(schema.raw(), schemas.SchemaBasedModel)
def update(self, image_id, tag_value):
"""
Update an image with the given tag.
:param image_id: image to be updated with the given tag.
:param tag_value: value of the tag.
"""
url = '/v2/images/%s/tags/%s' % (image_id, tag_value)
self.http_client.put(url)
def delete(self, image_id, tag_value):
"""
Delete the tag associated with the given image.
:param image_id: Image whose tag to be deleted.
:param tag_value: tag value to be deleted.
"""
url = '/v2/images/%s/tags/%s' % (image_id, tag_value)
self.http_client.delete(url)

View File

@ -1,358 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from oslo_utils import encodeutils
import six
from six.moves.urllib import parse
import warlock
from daisyclient.common import utils
from daisyclient import exc
from daisyclient.v2 import schemas
DEFAULT_PAGE_SIZE = 20
SORT_DIR_VALUES = ('asc', 'desc')
SORT_KEY_VALUES = ('name', 'status', 'container_format', 'disk_format',
'size', 'id', 'created_at', 'updated_at')
class Controller(object):
def __init__(self, http_client, schema_client):
self.http_client = http_client
self.schema_client = schema_client
@utils.memoized_property
def model(self):
schema = self.schema_client.get('image')
return warlock.model_factory(schema.raw(), schemas.SchemaBasedModel)
@staticmethod
def _wrap(value):
if isinstance(value, six.string_types):
return [value]
return value
@staticmethod
def _validate_sort_param(sort):
"""Validates sorting argument for invalid keys and directions values.
:param sort: comma-separated list of sort keys with optional <:dir>
after each key
"""
for sort_param in sort.strip().split(','):
key, _sep, dir = sort_param.partition(':')
if dir and dir not in SORT_DIR_VALUES:
msg = ('Invalid sort direction: %(sort_dir)s.'
' It must be one of the following: %(available)s.'
) % {'sort_dir': dir,
'available': ', '.join(SORT_DIR_VALUES)}
raise exc.HTTPBadRequest(msg)
if key not in SORT_KEY_VALUES:
msg = ('Invalid sort key: %(sort_key)s.'
' It must be one of the following: %(available)s.'
) % {'sort_key': key,
'available': ', '.join(SORT_KEY_VALUES)}
raise exc.HTTPBadRequest(msg)
return sort
def list(self, **kwargs):
"""Retrieve a listing of Image objects
:param page_size: Number of images to request in each paginated request
:returns generator over list of Images
"""
ori_validate_fun = self.model.validate
empty_fun = lambda *args, **kwargs: None
limit = kwargs.get('limit')
# NOTE(flaper87): Don't use `get('page_size', DEFAULT_SIZE)` otherwise,
# it could be possible to send invalid data to the server by passing
# page_size=None.
page_size = kwargs.get('page_size') or DEFAULT_PAGE_SIZE
def paginate(url, page_size, limit=None):
next_url = url
while True:
if limit and page_size > limit:
# NOTE(flaper87): Avoid requesting 2000 images when limit
# is 1
next_url = next_url.replace("limit=%s" % page_size,
"limit=%s" % limit)
resp, body = self.http_client.get(next_url)
for image in body['images']:
# NOTE(bcwaldon): remove 'self' for now until we have
# an elegant way to pass it into the model constructor
# without conflict.
image.pop('self', None)
yield self.model(**image)
# NOTE(zhiyan): In order to resolve the performance issue
# of JSON schema validation for image listing case, we
# don't validate each image entry but do it only on first
# image entry for each page.
self.model.validate = empty_fun
if limit:
limit -= 1
if limit <= 0:
raise StopIteration
# NOTE(zhiyan); Reset validation function.
self.model.validate = ori_validate_fun
try:
next_url = body['next']
except KeyError:
return
filters = kwargs.get('filters', {})
# NOTE(flaper87): We paginate in the client, hence we use
# the page_size as Glance's limit.
filters['limit'] = page_size
tags = filters.pop('tag', [])
tags_url_params = []
for tag in tags:
if isinstance(tag, six.string_types):
tags_url_params.append({'tag': encodeutils.safe_encode(tag)})
for param, value in six.iteritems(filters):
if isinstance(value, six.string_types):
filters[param] = encodeutils.safe_encode(value)
url = '/v2/images?%s' % parse.urlencode(filters)
for param in tags_url_params:
url = '%s&%s' % (url, parse.urlencode(param))
if 'sort' in kwargs:
if 'sort_key' in kwargs or 'sort_dir' in kwargs:
raise exc.HTTPBadRequest("The 'sort' argument is not supported"
" with 'sort_key' or 'sort_dir'.")
url = '%s&sort=%s' % (url,
self._validate_sort_param(
kwargs['sort']))
else:
sort_dir = self._wrap(kwargs.get('sort_dir', []))
sort_key = self._wrap(kwargs.get('sort_key', []))
if len(sort_key) != len(sort_dir) and len(sort_dir) > 1:
raise exc.HTTPBadRequest(
"Unexpected number of sort directions: "
"either provide a single sort direction or an equal "
"number of sort keys and sort directions.")
for key in sort_key:
url = '%s&sort_key=%s' % (url, key)
for dir in sort_dir:
url = '%s&sort_dir=%s' % (url, dir)
for image in paginate(url, page_size, limit):
yield image
def get(self, image_id):
url = '/v2/images/%s' % image_id
resp, body = self.http_client.get(url)
# NOTE(bcwaldon): remove 'self' for now until we have an elegant
# way to pass it into the model constructor without conflict
body.pop('self', None)
return self.model(**body)
def data(self, image_id, do_checksum=True):
"""
Retrieve data of an image.
:param image_id: ID of the image to download.
:param do_checksum: Enable/disable checksum validation.
"""
url = '/v2/images/%s/file' % image_id
resp, body = self.http_client.get(url)
checksum = resp.headers.get('content-md5', None)
content_length = int(resp.headers.get('content-length', 0))
if do_checksum and checksum is not None:
body = utils.integrity_iter(body, checksum)
return utils.IterableWithLength(body, content_length)
def upload(self, image_id, image_data, image_size=None):
"""
Upload the data for an image.
:param image_id: ID of the image to upload data for.
:param image_data: File-like object supplying the data to upload.
:param image_size: Total size in bytes of image to be uploaded.
"""
url = '/v2/images/%s/file' % image_id
hdrs = {'Content-Type': 'application/octet-stream'}
if image_size:
body = {'image_data': image_data,
'image_size': image_size}
else:
body = image_data
self.http_client.put(url, headers=hdrs, data=body)
def delete(self, image_id):
"""Delete an image."""
url = '/v2/images/%s' % image_id
self.http_client.delete(url)
def create(self, **kwargs):
"""Create an image."""
url = '/v2/images'
image = self.model()
for (key, value) in kwargs.items():
try:
setattr(image, key, value)
except warlock.InvalidOperation as e:
raise TypeError(utils.exception_to_str(e))
resp, body = self.http_client.post(url, data=image)
# NOTE(esheffield): remove 'self' for now until we have an elegant
# way to pass it into the model constructor without conflict
body.pop('self', None)
return self.model(**body)
def update(self, image_id, remove_props=None, **kwargs):
"""
Update attributes of an image.
:param image_id: ID of the image to modify.
:param remove_props: List of property names to remove
:param **kwargs: Image attribute names and their new values.
"""
image = self.get(image_id)
for (key, value) in kwargs.items():
try:
setattr(image, key, value)
except warlock.InvalidOperation as e:
raise TypeError(utils.exception_to_str(e))
if remove_props is not None:
cur_props = image.keys()
new_props = kwargs.keys()
# NOTE(esheffield): Only remove props that currently exist on the
# image and are NOT in the properties being updated / added
props_to_remove = set(cur_props).intersection(
set(remove_props).difference(new_props))
for key in props_to_remove:
delattr(image, key)
url = '/v2/images/%s' % image_id
hdrs = {'Content-Type': 'application/openstack-images-v2.1-json-patch'}
self.http_client.patch(url, headers=hdrs, data=image.patch)
# NOTE(bcwaldon): calling image.patch doesn't clear the changes, so
# we need to fetch the image again to get a clean history. This is
# an obvious optimization for warlock
return self.get(image_id)
def _get_image_with_locations_or_fail(self, image_id):
image = self.get(image_id)
if getattr(image, 'locations', None) is None:
raise exc.HTTPBadRequest('The administrator has disabled '
'API access to image locations')
return image
def _send_image_update_request(self, image_id, patch_body):
url = '/v2/images/%s' % image_id
hdrs = {'Content-Type': 'application/openstack-images-v2.1-json-patch'}
self.http_client.patch(url, headers=hdrs, data=json.dumps(patch_body))
def add_location(self, image_id, url, metadata):
"""Add a new location entry to an image's list of locations.
It is an error to add a URL that is already present in the list of
locations.
:param image_id: ID of image to which the location is to be added.
:param url: URL of the location to add.
:param metadata: Metadata associated with the location.
:returns: The updated image
"""
image = self._get_image_with_locations_or_fail(image_id)
url_list = [l['url'] for l in image.locations]
if url in url_list:
err_str = 'A location entry at %s already exists' % url
raise exc.HTTPConflict(err_str)
add_patch = [{'op': 'add', 'path': '/locations/-',
'value': {'url': url, 'metadata': metadata}}]
self._send_image_update_request(image_id, add_patch)
return self.get(image_id)
def delete_locations(self, image_id, url_set):
"""Remove one or more location entries of an image.
:param image_id: ID of image from which locations are to be removed.
:param url_set: set of URLs of location entries to remove.
:returns: None
"""
image = self._get_image_with_locations_or_fail(image_id)
current_urls = [l['url'] for l in image.locations]
missing_locs = url_set.difference(set(current_urls))
if missing_locs:
raise exc.HTTPNotFound('Unknown URL(s): %s' % list(missing_locs))
# NOTE: warlock doesn't generate the most efficient patch for remove
# operations (it shifts everything up and deletes the tail elements) so
# we do it ourselves.
url_indices = [current_urls.index(url) for url in url_set]
url_indices.sort(reverse=True)
patches = [{'op': 'remove', 'path': '/locations/%s' % url_idx}
for url_idx in url_indices]
self._send_image_update_request(image_id, patches)
def update_location(self, image_id, url, metadata):
"""Update an existing location entry in an image's list of locations.
The URL specified must be already present in the image's list of
locations.
:param image_id: ID of image whose location is to be updated.
:param url: URL of the location to update.
:param metadata: Metadata associated with the location.
:returns: The updated image
"""
image = self._get_image_with_locations_or_fail(image_id)
url_map = dict([(l['url'], l) for l in image.locations])
if url not in url_map:
raise exc.HTTPNotFound('Unknown URL: %s' % url)
if url_map[url]['metadata'] == metadata:
return image
# NOTE: The server (as of now) doesn't support modifying individual
# location entries. So we must:
# 1. Empty existing list of locations.
# 2. Send another request to set 'locations' to the new list
# of locations.
url_map[url]['metadata'] = metadata
patches = [{'op': 'replace',
'path': '/locations',
'value': p} for p in ([], list(url_map.values()))]
self._send_image_update_request(image_id, patches)
return self.get(image_id)

View File

@ -1,387 +0,0 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import encodeutils
import six
from six.moves.urllib import parse
import warlock
from daisyclient.common import utils
from daisyclient.v2 import schemas
DEFAULT_PAGE_SIZE = 20
SORT_DIR_VALUES = ('asc', 'desc')
SORT_KEY_VALUES = ('created_at', 'namespace')
class NamespaceController(object):
def __init__(self, http_client, schema_client):
self.http_client = http_client
self.schema_client = schema_client
@utils.memoized_property
def model(self):
schema = self.schema_client.get('metadefs/namespace')
return warlock.model_factory(schema.raw(), schemas.SchemaBasedModel)
def create(self, **kwargs):
"""Create a namespace.
:param kwargs: Unpacked namespace object.
"""
url = '/v2/metadefs/namespaces'
try:
namespace = self.model(kwargs)
except (warlock.InvalidOperation, ValueError) as e:
raise TypeError(utils.exception_to_str(e))
resp, body = self.http_client.post(url, data=namespace)
body.pop('self', None)
return self.model(**body)
def update(self, namespace_name, **kwargs):
"""Update a namespace.
:param namespace_name: Name of a namespace (old one).
:param kwargs: Unpacked namespace object.
"""
namespace = self.get(namespace_name)
for (key, value) in six.iteritems(kwargs):
try:
setattr(namespace, key, value)
except warlock.InvalidOperation as e:
raise TypeError(utils.exception_to_str(e))
# Remove read-only parameters.
read_only = ['schema', 'updated_at', 'created_at']
for elem in read_only:
if elem in namespace:
del namespace[elem]
url = '/v2/metadefs/namespaces/{0}'.format(namespace_name)
self.http_client.put(url, data=namespace)
return self.get(namespace.namespace)
def get(self, namespace, **kwargs):
"""Get one namespace."""
query_params = parse.urlencode(kwargs)
if kwargs:
query_params = '?%s' % query_params
url = '/v2/metadefs/namespaces/{0}{1}'.format(namespace, query_params)
resp, body = self.http_client.get(url)
# NOTE(bcwaldon): remove 'self' for now until we have an elegant
# way to pass it into the model constructor without conflict
body.pop('self', None)
return self.model(**body)
def list(self, **kwargs):
"""Retrieve a listing of Namespace objects
:param page_size: Number of items to request in each paginated request
:param limit: Use to request a specific page size. Expect a response
to a limited request to return between zero and limit
items.
:param marker: Specifies the namespace of the last-seen namespace.
The typical pattern of limit and marker is to make an
initial limited request and then to use the last
namespace from the response as the marker parameter
in a subsequent limited request.
:param sort_key: The field to sort on (for example, 'created_at')
:param sort_dir: The direction to sort ('asc' or 'desc')
:returns generator over list of Namespaces
"""
ori_validate_fun = self.model.validate
empty_fun = lambda *args, **kwargs: None
def paginate(url):
resp, body = self.http_client.get(url)
for namespace in body['namespaces']:
# NOTE(bcwaldon): remove 'self' for now until we have
# an elegant way to pass it into the model constructor
# without conflict.
namespace.pop('self', None)
yield self.model(**namespace)
# NOTE(zhiyan): In order to resolve the performance issue
# of JSON schema validation for image listing case, we
# don't validate each image entry but do it only on first
# image entry for each page.
self.model.validate = empty_fun
# NOTE(zhiyan); Reset validation function.
self.model.validate = ori_validate_fun
try:
next_url = body['next']
except KeyError:
return
else:
for namespace in paginate(next_url):
yield namespace
filters = kwargs.get('filters', {})
filters = {} if filters is None else filters
if not kwargs.get('page_size'):
filters['limit'] = DEFAULT_PAGE_SIZE
else:
filters['limit'] = kwargs['page_size']
if 'marker' in kwargs:
filters['marker'] = kwargs['marker']
sort_key = kwargs.get('sort_key')
if sort_key is not None:
if sort_key in SORT_KEY_VALUES:
filters['sort_key'] = sort_key
else:
raise ValueError('sort_key must be one of the following: %s.'
% ', '.join(SORT_KEY_VALUES))
sort_dir = kwargs.get('sort_dir')
if sort_dir is not None:
if sort_dir in SORT_DIR_VALUES:
filters['sort_dir'] = sort_dir
else:
raise ValueError('sort_dir must be one of the following: %s.'
% ', '.join(SORT_DIR_VALUES))
for param, value in six.iteritems(filters):
if isinstance(value, list):
filters[param] = encodeutils.safe_encode(','.join(value))
elif isinstance(value, six.string_types):
filters[param] = encodeutils.safe_encode(value)
url = '/v2/metadefs/namespaces?%s' % parse.urlencode(filters)
for namespace in paginate(url):
yield namespace
def delete(self, namespace):
"""Delete a namespace."""
url = '/v2/metadefs/namespaces/{0}'.format(namespace)
self.http_client.delete(url)
class ResourceTypeController(object):
def __init__(self, http_client, schema_client):
self.http_client = http_client
self.schema_client = schema_client
@utils.memoized_property
def model(self):
schema = self.schema_client.get('metadefs/resource_type')
return warlock.model_factory(schema.raw(), schemas.SchemaBasedModel)
def associate(self, namespace, **kwargs):
"""Associate a resource type with a namespace."""
try:
res_type = self.model(kwargs)
except (warlock.InvalidOperation, ValueError) as e:
raise TypeError(utils.exception_to_str(e))
url = '/v2/metadefs/namespaces/{0}/resource_types'.format(namespace,
res_type)
resp, body = self.http_client.post(url, data=res_type)
body.pop('self', None)
return self.model(**body)
def deassociate(self, namespace, resource):
"""Deasociate a resource type with a namespace."""
url = '/v2/metadefs/namespaces/{0}/resource_types/{1}'. \
format(namespace, resource)
self.http_client.delete(url)
def list(self):
"""Retrieve a listing of available resource types
:returns generator over list of resource_types
"""
url = '/v2/metadefs/resource_types'
resp, body = self.http_client.get(url)
for resource_type in body['resource_types']:
yield self.model(**resource_type)
def get(self, namespace):
url = '/v2/metadefs/namespaces/{0}/resource_types'.format(namespace)
resp, body = self.http_client.get(url)
body.pop('self', None)
for resource_type in body['resource_type_associations']:
yield self.model(**resource_type)
class PropertyController(object):
def __init__(self, http_client, schema_client):
self.http_client = http_client
self.schema_client = schema_client
@utils.memoized_property
def model(self):
schema = self.schema_client.get('metadefs/property')
return warlock.model_factory(schema.raw(), schemas.SchemaBasedModel)
def create(self, namespace, **kwargs):
"""Create a property.
:param namespace: Name of a namespace the property will belong.
:param kwargs: Unpacked property object.
"""
try:
prop = self.model(kwargs)
except (warlock.InvalidOperation, ValueError) as e:
raise TypeError(utils.exception_to_str(e))
url = '/v2/metadefs/namespaces/{0}/properties'.format(namespace)
resp, body = self.http_client.post(url, data=prop)
body.pop('self', None)
return self.model(**body)
def update(self, namespace, prop_name, **kwargs):
"""Update a property.
:param namespace: Name of a namespace the property belongs.
:param prop_name: Name of a property (old one).
:param kwargs: Unpacked property object.
"""
prop = self.get(namespace, prop_name)
for (key, value) in kwargs.items():
try:
setattr(prop, key, value)
except warlock.InvalidOperation as e:
raise TypeError(utils.exception_to_str(e))
url = '/v2/metadefs/namespaces/{0}/properties/{1}'.format(namespace,
prop_name)
self.http_client.put(url, data=prop)
return self.get(namespace, prop.name)
def get(self, namespace, prop_name):
url = '/v2/metadefs/namespaces/{0}/properties/{1}'.format(namespace,
prop_name)
resp, body = self.http_client.get(url)
body.pop('self', None)
body['name'] = prop_name
return self.model(**body)
def list(self, namespace, **kwargs):
"""Retrieve a listing of metadata properties
:returns generator over list of objects
"""
url = '/v2/metadefs/namespaces/{0}/properties'.format(namespace)
resp, body = self.http_client.get(url)
for key, value in body['properties'].items():
value['name'] = key
yield self.model(value)
def delete(self, namespace, prop_name):
"""Delete a property."""
url = '/v2/metadefs/namespaces/{0}/properties/{1}'.format(namespace,
prop_name)
self.http_client.delete(url)
def delete_all(self, namespace):
"""Delete all properties in a namespace."""
url = '/v2/metadefs/namespaces/{0}/properties'.format(namespace)
self.http_client.delete(url)
class ObjectController(object):
def __init__(self, http_client, schema_client):
self.http_client = http_client
self.schema_client = schema_client
@utils.memoized_property
def model(self):
schema = self.schema_client.get('metadefs/object')
return warlock.model_factory(schema.raw(), schemas.SchemaBasedModel)
def create(self, namespace, **kwargs):
"""Create an object.
:param namespace: Name of a namespace the object belongs.
:param kwargs: Unpacked object.
"""
try:
obj = self.model(kwargs)
except (warlock.InvalidOperation, ValueError) as e:
raise TypeError(utils.exception_to_str(e))
url = '/v2/metadefs/namespaces/{0}/objects'.format(namespace)
resp, body = self.http_client.post(url, data=obj)
body.pop('self', None)
return self.model(**body)
def update(self, namespace, object_name, **kwargs):
"""Update an object.
:param namespace: Name of a namespace the object belongs.
:param prop_name: Name of an object (old one).
:param kwargs: Unpacked object.
"""
obj = self.get(namespace, object_name)
for (key, value) in kwargs.items():
try:
setattr(obj, key, value)
except warlock.InvalidOperation as e:
raise TypeError(utils.exception_to_str(e))
# Remove read-only parameters.
read_only = ['schema', 'updated_at', 'created_at']
for elem in read_only:
if elem in namespace:
del namespace[elem]
url = '/v2/metadefs/namespaces/{0}/objects/{1}'.format(namespace,
object_name)
self.http_client.put(url, data=obj)
return self.get(namespace, obj.name)
def get(self, namespace, object_name):
url = '/v2/metadefs/namespaces/{0}/objects/{1}'.format(namespace,
object_name)
resp, body = self.http_client.get(url)
body.pop('self', None)
return self.model(**body)
def list(self, namespace, **kwargs):
"""Retrieve a listing of metadata objects
:returns generator over list of objects
"""
url = '/v2/metadefs/namespaces/{0}/objects'.format(namespace,)
resp, body = self.http_client.get(url)
for obj in body['objects']:
yield self.model(obj)
def delete(self, namespace, object_name):
"""Delete an object."""
url = '/v2/metadefs/namespaces/{0}/objects/{1}'.format(namespace,
object_name)
self.http_client.delete(url)
def delete_all(self, namespace):
"""Delete all objects in a namespace."""
url = '/v2/metadefs/namespaces/{0}/objects'.format(namespace)
self.http_client.delete(url)

View File

@ -1,105 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import json
import jsonpatch
import six
import warlock.model as warlock
class SchemaBasedModel(warlock.Model):
"""Glance specific subclass of the warlock Model
This implementation alters the function of the patch property
to take into account the schema's core properties. With this version
undefined properties which are core will generated 'replace'
operations rather than 'add' since this is what the Glance API
expects.
"""
def _make_custom_patch(self, new, original):
if not self.get('tags'):
tags_patch = []
else:
tags_patch = [{"path": "/tags",
"value": self.get('tags'),
"op": "replace"}]
patch_string = jsonpatch.make_patch(original, new).to_string()
patch = json.loads(patch_string)
if not patch:
return json.dumps(tags_patch)
else:
return json.dumps(patch + tags_patch)
@warlock.Model.patch.getter
def patch(self):
"""Return a jsonpatch object representing the delta."""
original = copy.deepcopy(self.__dict__['__original__'])
new = dict(self)
if self.schema:
for (name, prop) in six.iteritems(self.schema['properties']):
if (name not in original and name in new and
prop.get('is_base', True)):
original[name] = None
original['tags'] = None
new['tags'] = None
return self._make_custom_patch(new, original)
class SchemaProperty(object):
def __init__(self, name, **kwargs):
self.name = name
self.description = kwargs.get('description')
def translate_schema_properties(schema_properties):
"""Parse the properties dictionary of a schema document
:returns list of SchemaProperty objects
"""
properties = []
for (name, prop) in schema_properties.items():
properties.append(SchemaProperty(name, **prop))
return properties
class Schema(object):
def __init__(self, raw_schema):
self._raw_schema = raw_schema
self.name = raw_schema['name']
raw_properties = raw_schema['properties']
self.properties = translate_schema_properties(raw_properties)
def is_core_property(self, property_name):
for prop in self.properties:
if property_name == prop.name:
return True
return False
def raw(self):
return copy.deepcopy(self._raw_schema)
class Controller(object):
def __init__(self, http_client):
self.http_client = http_client
def get(self, schema_name):
uri = '/v2/schemas/%s' % schema_name
_, raw_schema = self.http_client.get(uri)
return Schema(raw_schema)

View File

@ -1,822 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from daisyclient.common import progressbar
from daisyclient.common import utils
from daisyclient import exc
from daisyclient.v2.image_members import MEMBER_STATUS_VALUES
from daisyclient.v2 import images
from daisyclient.v2 import tasks
import json
import os
from os.path import expanduser
IMAGE_SCHEMA = None
def get_image_schema():
global IMAGE_SCHEMA
if IMAGE_SCHEMA is None:
schema_path = expanduser("~/.glanceclient/image_schema.json")
if os.path.isfile(schema_path):
with open(schema_path, "r") as f:
schema_raw = f.read()
IMAGE_SCHEMA = json.loads(schema_raw)
return IMAGE_SCHEMA
@utils.schema_args(get_image_schema, omit=['created_at', 'updated_at', 'file',
'checksum', 'virtual_size', 'size',
'status', 'schema', 'direct_url'])
@utils.arg('--property', metavar="<key=value>", action='append',
default=[], help=('Arbitrary property to associate with image.'
' May be used multiple times.'))
@utils.arg('--file', metavar='<FILE>',
help='Local file that contains disk image to be uploaded '
'during creation. Alternatively, images can be passed '
'to the client via stdin.')
@utils.arg('--progress', action='store_true', default=False,
help='Show upload progress bar.')
def do_image_create(gc, args):
"""Create a new image."""
schema = gc.schemas.get("image")
_args = [(x[0].replace('-', '_'), x[1]) for x in vars(args).items()]
fields = dict(filter(lambda x: x[1] is not None and
(x[0] == 'property' or
schema.is_core_property(x[0])),
_args))
raw_properties = fields.pop('property', [])
for datum in raw_properties:
key, value = datum.split('=', 1)
fields[key] = value
file_name = fields.pop('file', None)
if file_name is not None and os.access(file_name, os.R_OK) is False:
utils.exit("File %s does not exist or user does not have read "
"privileges to it" % file_name)
image = gc.images.create(**fields)
try:
if utils.get_data_file(args) is not None:
args.id = image['id']
args.size = None
do_image_upload(gc, args)
image = gc.images.get(args.id)
finally:
utils.print_image(image)
@utils.arg('id', metavar='<IMAGE_ID>', help='ID of image to update.')
@utils.schema_args(get_image_schema, omit=['id', 'locations', 'created_at',
'updated_at', 'file', 'checksum',
'virtual_size', 'size', 'status',
'schema', 'direct_url', 'tags'])
@utils.arg('--property', metavar="<key=value>", action='append',
default=[], help=('Arbitrary property to associate with image.'
' May be used multiple times.'))
@utils.arg('--remove-property', metavar="key", action='append', default=[],
help="Name of arbitrary property to remove from the image.")
def do_image_update(gc, args):
"""Update an existing image."""
schema = gc.schemas.get("image")
_args = [(x[0].replace('-', '_'), x[1]) for x in vars(args).items()]
fields = dict(filter(lambda x: x[1] is not None and
(x[0] in ['property', 'remove_property'] or
schema.is_core_property(x[0])),
_args))
raw_properties = fields.pop('property', [])
for datum in raw_properties:
key, value = datum.split('=', 1)
fields[key] = value
remove_properties = fields.pop('remove_property', None)
image_id = fields.pop('id')
image = gc.images.update(image_id, remove_properties, **fields)
utils.print_image(image)
@utils.arg('--limit', metavar='<LIMIT>', default=None, type=int,
help='Maximum number of images to get.')
@utils.arg('--page-size', metavar='<SIZE>', default=None, type=int,
help='Number of images to request in each paginated request.')
@utils.arg('--visibility', metavar='<VISIBILITY>',
help='The visibility of the images to display.')
@utils.arg('--member-status', metavar='<MEMBER_STATUS>',
help='The status of images to display.')
@utils.arg('--owner', metavar='<OWNER>',
help='Display images owned by <OWNER>.')
@utils.arg('--property-filter', metavar='<KEY=VALUE>',
help="Filter images by a user-defined image property.",
action='append', dest='properties', default=[])
@utils.arg('--checksum', metavar='<CHECKSUM>',
help='Displays images that match the checksum.')
@utils.arg('--tag', metavar='<TAG>', action='append',
help="Filter images by a user-defined tag.")
@utils.arg('--sort-key', default=[], action='append',
choices=images.SORT_KEY_VALUES,
help='Sort image list by specified fields.')
@utils.arg('--sort-dir', default=[], action='append',
choices=images.SORT_DIR_VALUES,
help='Sort image list in specified directions.')
@utils.arg('--sort', metavar='<key>[:<direction>]', default=None,
help=(("Comma-separated list of sort keys and directions in the "
"form of <key>[:<asc|desc>]. Valid keys: %s. OPTIONAL: "
"Default='name:asc'.") % ', '.join(images.SORT_KEY_VALUES)))
def do_image_list(gc, args):
"""List images you can access."""
filter_keys = ['visibility', 'member_status', 'owner', 'checksum', 'tag']
filter_items = [(key, getattr(args, key)) for key in filter_keys]
if args.properties:
filter_properties = [prop.split('=', 1) for prop in args.properties]
if False in (len(pair) == 2 for pair in filter_properties):
utils.exit('Argument --property-filter expected properties in the'
' format KEY=VALUE')
filter_items += filter_properties
filters = dict([item for item in filter_items if item[1] is not None])
kwargs = {'filters': filters}
if args.limit is not None:
kwargs['limit'] = args.limit
if args.page_size is not None:
kwargs['page_size'] = args.page_size
if args.sort_key:
kwargs['sort_key'] = args.sort_key
if args.sort_dir:
kwargs['sort_dir'] = args.sort_dir
if args.sort is not None:
kwargs['sort'] = args.sort
elif not args.sort_dir and not args.sort_key:
kwargs['sort'] = 'name:asc'
images = gc.images.list(**kwargs)
columns = ['ID', 'Name']
utils.print_list(images, columns)
@utils.arg('id', metavar='<IMAGE_ID>', help='ID of image to describe.')
@utils.arg('--max-column-width', metavar='<integer>', default=80,
help='The max column width of the printed table.')
def do_image_show(gc, args):
"""Describe a specific image."""
image = gc.images.get(args.id)
utils.print_image(image, int(args.max_column_width))
@utils.arg('--image-id', metavar='<IMAGE_ID>', required=True,
help='Image to display members of.')
def do_member_list(gc, args):
"""Describe sharing permissions by image."""
members = gc.image_members.list(args.image_id)
columns = ['Image ID', 'Member ID', 'Status']
utils.print_list(members, columns)
@utils.arg('image_id', metavar='<IMAGE_ID>',
help='Image from which to remove member.')
@utils.arg('member_id', metavar='<MEMBER_ID>',
help='Tenant to remove as member.')
def do_member_delete(gc, args):
"""Delete image member."""
if not (args.image_id and args.member_id):
utils.exit('Unable to delete member. Specify image_id and member_id')
else:
gc.image_members.delete(args.image_id, args.member_id)
@utils.arg('image_id', metavar='<IMAGE_ID>',
help='Image from which to update member.')
@utils.arg('member_id', metavar='<MEMBER_ID>',
help='Tenant to update.')
@utils.arg('member_status', metavar='<MEMBER_STATUS>',
choices=MEMBER_STATUS_VALUES,
help='Updated status of member.'
' Valid Values: %s' %
', '.join(str(val) for val in MEMBER_STATUS_VALUES))
def do_member_update(gc, args):
"""Update the status of a member for a given image."""
if not (args.image_id and args.member_id and args.member_status):
utils.exit('Unable to update member. Specify image_id, member_id and'
' member_status')
else:
member = gc.image_members.update(args.image_id, args.member_id,
args.member_status)
member = [member]
columns = ['Image ID', 'Member ID', 'Status']
utils.print_list(member, columns)
@utils.arg('image_id', metavar='<IMAGE_ID>',
help='Image with which to create member.')
@utils.arg('member_id', metavar='<MEMBER_ID>',
help='Tenant to add as member.')
def do_member_create(gc, args):
"""Create member for a given image."""
if not (args.image_id and args.member_id):
utils.exit('Unable to create member. Specify image_id and member_id')
else:
member = gc.image_members.create(args.image_id, args.member_id)
member = [member]
columns = ['Image ID', 'Member ID', 'Status']
utils.print_list(member, columns)
@utils.arg('model', metavar='<MODEL>', help='Name of model to describe.')
def do_explain(gc, args):
"""Describe a specific model."""
try:
schema = gc.schemas.get(args.model)
except exc.HTTPNotFound:
utils.exit('Unable to find requested model \'%s\'' % args.model)
else:
formatters = {'Attribute': lambda m: m.name}
columns = ['Attribute', 'Description']
utils.print_list(schema.properties, columns, formatters)
@utils.arg('--file', metavar='<FILE>',
help='Local file to save downloaded image data to. '
'If this is not specified the image data will be '
'written to stdout.')
@utils.arg('id', metavar='<IMAGE_ID>', help='ID of image to download.')
@utils.arg('--progress', action='store_true', default=False,
help='Show download progress bar.')
def do_image_download(gc, args):
"""Download a specific image."""
body = gc.images.data(args.id)
if args.progress:
body = progressbar.VerboseIteratorWrapper(body, len(body))
utils.save_image(body, args.file)
@utils.arg('--file', metavar='<FILE>',
help=('Local file that contains disk image to be uploaded.'
' Alternatively, images can be passed'
' to the client via stdin.'))
@utils.arg('--size', metavar='<IMAGE_SIZE>', type=int,
help='Size in bytes of image to be uploaded. Default is to get '
'size from provided data object but this is supported in case '
'where size cannot be inferred.',
default=None)
@utils.arg('--progress', action='store_true', default=False,
help='Show upload progress bar.')
@utils.arg('id', metavar='<IMAGE_ID>',
help='ID of image to upload data to.')
def do_image_upload(gc, args):
"""Upload data for a specific image."""
image_data = utils.get_data_file(args)
if args.progress:
filesize = utils.get_file_size(image_data)
if filesize is not None:
# NOTE(kragniz): do not show a progress bar if the size of the
# input is unknown (most likely a piped input)
image_data = progressbar.VerboseFileWrapper(image_data, filesize)
gc.images.upload(args.id, image_data, args.size)
@utils.arg('id', metavar='<IMAGE_ID>', help='ID of image to delete.')
def do_image_delete(gc, args):
"""Delete specified image."""
image = gc.images.get(args.id)
if image and image.status == "deleted":
msg = "No image with an ID of '%s' exists." % image.id
utils.exit(msg)
gc.images.delete(args.id)
@utils.arg('image_id', metavar='<IMAGE_ID>',
help='Image to be updated with the given tag.')
@utils.arg('tag_value', metavar='<TAG_VALUE>',
help='Value of the tag.')
def do_image_tag_update(gc, args):
"""Update an image with the given tag."""
if not (args.image_id and args.tag_value):
utils.exit('Unable to update tag. Specify image_id and tag_value')
else:
gc.image_tags.update(args.image_id, args.tag_value)
image = gc.images.get(args.image_id)
image = [image]
columns = ['ID', 'Tags']
utils.print_list(image, columns)
@utils.arg('image_id', metavar='<IMAGE_ID>',
help='ID of the image from which to delete tag.')
@utils.arg('tag_value', metavar='<TAG_VALUE>',
help='Value of the tag.')
def do_image_tag_delete(gc, args):
"""Delete the tag associated with the given image."""
if not (args.image_id and args.tag_value):
utils.exit('Unable to delete tag. Specify image_id and tag_value')
else:
gc.image_tags.delete(args.image_id, args.tag_value)
@utils.arg('--url', metavar='<URL>', required=True,
help='URL of location to add.')
@utils.arg('--metadata', metavar='<STRING>', default='{}',
help=('Metadata associated with the location. '
'Must be a valid JSON object (default: %(default)s)'))
@utils.arg('id', metavar='<ID>',
help='ID of image to which the location is to be added.')
def do_location_add(gc, args):
"""Add a location (and related metadata) to an image."""
try:
metadata = json.loads(args.metadata)
except ValueError:
utils.exit('Metadata is not a valid JSON object.')
else:
image = gc.images.add_location(args.id, args.url, metadata)
utils.print_dict(image)
@utils.arg('--url', metavar='<URL>', action='append', required=True,
help='URL of location to remove. May be used multiple times.')
@utils.arg('id', metavar='<ID>',
help='ID of image whose locations are to be removed.')
def do_location_delete(gc, args):
"""Remove locations (and related metadata) from an image."""
gc.images.delete_locations(args.id, set(args.url))
@utils.arg('--url', metavar='<URL>', required=True,
help='URL of location to update.')
@utils.arg('--metadata', metavar='<STRING>', default='{}',
help=('Metadata associated with the location. '
'Must be a valid JSON object (default: %(default)s)'))
@utils.arg('id', metavar='<ID>',
help='ID of image whose location is to be updated.')
def do_location_update(gc, args):
"""Update metadata of an image's location."""
try:
metadata = json.loads(args.metadata)
except ValueError:
utils.exit('Metadata is not a valid JSON object.')
else:
image = gc.images.update_location(args.id, args.url, metadata)
utils.print_dict(image)
# Metadata - catalog
NAMESPACE_SCHEMA = None
def get_namespace_schema():
global NAMESPACE_SCHEMA
if NAMESPACE_SCHEMA is None:
schema_path = expanduser("~/.glanceclient/namespace_schema.json")
if os.path.isfile(schema_path):
with open(schema_path, "r") as f:
schema_raw = f.read()
NAMESPACE_SCHEMA = json.loads(schema_raw)
return NAMESPACE_SCHEMA
def _namespace_show(namespace, max_column_width=None):
namespace = dict(namespace) # Warlock objects are compatible with dicts
# Flatten dicts for display
if 'properties' in namespace:
props = [k for k in namespace['properties']]
namespace['properties'] = props
if 'resource_type_associations' in namespace:
assocs = [assoc['name']
for assoc in namespace['resource_type_associations']]
namespace['resource_type_associations'] = assocs
if 'objects' in namespace:
objects = [obj['name'] for obj in namespace['objects']]
namespace['objects'] = objects
if max_column_width:
utils.print_dict(namespace, max_column_width)
else:
utils.print_dict(namespace)
@utils.arg('namespace', metavar='<NAMESPACE>', help='Name of the namespace.')
@utils.schema_args(get_namespace_schema, omit=['namespace', 'property_count',
'properties', 'tag_count',
'tags', 'object_count',
'objects', 'resource_types'])
def do_md_namespace_create(gc, args):
"""Create a new metadata definitions namespace."""
schema = gc.schemas.get('metadefs/namespace')
_args = [(x[0].replace('-', '_'), x[1]) for x in vars(args).items()]
fields = dict(filter(lambda x: x[1] is not None and
(schema.is_core_property(x[0])),
_args))
namespace = gc.metadefs_namespace.create(**fields)
_namespace_show(namespace)
@utils.arg('--file', metavar='<FILEPATH>',
help='Path to file with namespace schema to import. Alternatively, '
'namespaces schema can be passed to the client via stdin.')
def do_md_namespace_import(gc, args):
"""Import a metadata definitions namespace from file or standard input."""
namespace_data = utils.get_data_file(args)
if not namespace_data:
utils.exit('No metadata definition namespace passed via stdin or '
'--file argument.')
try:
namespace_json = json.load(namespace_data)
except ValueError:
utils.exit('Schema is not a valid JSON object.')
else:
namespace = gc.metadefs_namespace.create(**namespace_json)
_namespace_show(namespace)
@utils.arg('id', metavar='<NAMESPACE>', help='Name of namespace to update.')
@utils.schema_args(get_namespace_schema, omit=['property_count', 'properties',
'tag_count', 'tags',
'object_count', 'objects',
'resource_type_associations',
'schema'])
def do_md_namespace_update(gc, args):
"""Update an existing metadata definitions namespace."""
schema = gc.schemas.get('metadefs/namespace')
_args = [(x[0].replace('-', '_'), x[1]) for x in vars(args).items()]
fields = dict(filter(lambda x: x[1] is not None and
(schema.is_core_property(x[0])),
_args))
namespace = gc.metadefs_namespace.update(args.id, **fields)
_namespace_show(namespace)
@utils.arg('namespace', metavar='<NAMESPACE>',
help='Name of namespace to describe.')
@utils.arg('--resource-type', metavar='<RESOURCE_TYPE>',
help='Applies prefix of given resource type associated to a '
'namespace to all properties of a namespace.', default=None)
@utils.arg('--max-column-width', metavar='<integer>', default=80,
help='The max column width of the printed table.')
def do_md_namespace_show(gc, args):
"""Describe a specific metadata definitions namespace.
Lists also the namespace properties, objects and resource type
associations.
"""
kwargs = {}
if args.resource_type:
kwargs['resource_type'] = args.resource_type
namespace = gc.metadefs_namespace.get(args.namespace, **kwargs)
_namespace_show(namespace, int(args.max_column_width))
@utils.arg('--resource-types', metavar='<RESOURCE_TYPES>', action='append',
help='Resource type to filter namespaces.')
@utils.arg('--visibility', metavar='<VISIBILITY>',
help='Visibility parameter to filter namespaces.')
@utils.arg('--page-size', metavar='<SIZE>', default=None, type=int,
help='Number of namespaces to request in each paginated request.')
def do_md_namespace_list(gc, args):
"""List metadata definitions namespaces."""
filter_keys = ['resource_types', 'visibility']
filter_items = [(key, getattr(args, key, None)) for key in filter_keys]
filters = dict([item for item in filter_items if item[1] is not None])
kwargs = {'filters': filters}
if args.page_size is not None:
kwargs['page_size'] = args.page_size
namespaces = gc.metadefs_namespace.list(**kwargs)
columns = ['namespace']
utils.print_list(namespaces, columns)
@utils.arg('namespace', metavar='<NAMESPACE>',
help='Name of namespace to delete.')
def do_md_namespace_delete(gc, args):
"""Delete specified metadata definitions namespace with its contents."""
gc.metadefs_namespace.delete(args.namespace)
# Metadata - catalog
RESOURCE_TYPE_SCHEMA = None
def get_resource_type_schema():
global RESOURCE_TYPE_SCHEMA
if RESOURCE_TYPE_SCHEMA is None:
schema_path = expanduser("~/.glanceclient/resource_type_schema.json")
if os.path.isfile(schema_path):
with open(schema_path, "r") as f:
schema_raw = f.read()
RESOURCE_TYPE_SCHEMA = json.loads(schema_raw)
return RESOURCE_TYPE_SCHEMA
@utils.arg('namespace', metavar='<NAMESPACE>', help='Name of namespace.')
@utils.schema_args(get_resource_type_schema)
def do_md_resource_type_associate(gc, args):
"""Associate resource type with a metadata definitions namespace."""
schema = gc.schemas.get('metadefs/resource_type')
_args = [(x[0].replace('-', '_'), x[1]) for x in vars(args).items()]
fields = dict(filter(lambda x: x[1] is not None and
(schema.is_core_property(x[0])),
_args))
resource_type = gc.metadefs_resource_type.associate(args.namespace,
**fields)
utils.print_dict(resource_type)
@utils.arg('namespace', metavar='<NAMESPACE>', help='Name of namespace.')
@utils.arg('resource_type', metavar='<RESOURCE_TYPE>',
help='Name of resource type.')
def do_md_resource_type_deassociate(gc, args):
"""Deassociate resource type with a metadata definitions namespace."""
gc.metadefs_resource_type.deassociate(args.namespace, args.resource_type)
def do_md_resource_type_list(gc, args):
"""List available resource type names."""
resource_types = gc.metadefs_resource_type.list()
utils.print_list(resource_types, ['name'])
@utils.arg('namespace', metavar='<NAMESPACE>', help='Name of namespace.')
def do_md_namespace_resource_type_list(gc, args):
"""List resource types associated to specific namespace."""
resource_types = gc.metadefs_resource_type.get(args.namespace)
utils.print_list(resource_types, ['name', 'prefix', 'properties_target'])
@utils.arg('namespace', metavar='<NAMESPACE>',
help='Name of namespace the property will belong.')
@utils.arg('--name', metavar='<NAME>', required=True,
help='Internal name of a property.')
@utils.arg('--title', metavar='<TITLE>', required=True,
help='Property name displayed to the user.')
@utils.arg('--schema', metavar='<SCHEMA>', required=True,
help='Valid JSON schema of a property.')
def do_md_property_create(gc, args):
"""Create a new metadata definitions property inside a namespace."""
try:
schema = json.loads(args.schema)
except ValueError:
utils.exit('Schema is not a valid JSON object.')
else:
fields = {'name': args.name, 'title': args.title}
fields.update(schema)
new_property = gc.metadefs_property.create(args.namespace, **fields)
utils.print_dict(new_property)
@utils.arg('namespace', metavar='<NAMESPACE>',
help='Name of namespace the property belongs.')
@utils.arg('property', metavar='<PROPERTY>', help='Name of a property.')
@utils.arg('--name', metavar='<NAME>', default=None,
help='New name of a property.')
@utils.arg('--title', metavar='<TITLE>', default=None,
help='Property name displayed to the user.')
@utils.arg('--schema', metavar='<SCHEMA>', default=None,
help='Valid JSON schema of a property.')
def do_md_property_update(gc, args):
"""Update metadata definitions property inside a namespace."""
fields = {}
if args.name:
fields['name'] = args.name
if args.title:
fields['title'] = args.title
if args.schema:
try:
schema = json.loads(args.schema)
except ValueError:
utils.exit('Schema is not a valid JSON object.')
else:
fields.update(schema)
new_property = gc.metadefs_property.update(args.namespace, args.property,
**fields)
utils.print_dict(new_property)
@utils.arg('namespace', metavar='<NAMESPACE>',
help='Name of namespace the property belongs.')
@utils.arg('property', metavar='<PROPERTY>', help='Name of a property.')
@utils.arg('--max-column-width', metavar='<integer>', default=80,
help='The max column width of the printed table.')
def do_md_property_show(gc, args):
"""Describe a specific metadata definitions property inside a namespace."""
prop = gc.metadefs_property.get(args.namespace, args.property)
utils.print_dict(prop, int(args.max_column_width))
@utils.arg('namespace', metavar='<NAMESPACE>',
help='Name of namespace the property belongs.')
@utils.arg('property', metavar='<PROPERTY>', help='Name of a property.')
def do_md_property_delete(gc, args):
"""Delete a specific metadata definitions property inside a namespace."""
gc.metadefs_property.delete(args.namespace, args.property)
@utils.arg('namespace', metavar='<NAMESPACE>', help='Name of namespace.')
def do_md_namespace_properties_delete(gc, args):
"""Delete all metadata definitions property inside a specific namespace."""
gc.metadefs_property.delete_all(args.namespace)
@utils.arg('namespace', metavar='<NAMESPACE>', help='Name of namespace.')
def do_md_property_list(gc, args):
"""List metadata definitions properties inside a specific namespace."""
properties = gc.metadefs_property.list(args.namespace)
columns = ['name', 'title', 'type']
utils.print_list(properties, columns)
def _object_show(obj, max_column_width=None):
obj = dict(obj) # Warlock objects are compatible with dicts
# Flatten dicts for display
if 'properties' in obj:
objects = [k for k in obj['properties']]
obj['properties'] = objects
if max_column_width:
utils.print_dict(obj, max_column_width)
else:
utils.print_dict(obj)
@utils.arg('namespace', metavar='<NAMESPACE>',
help='Name of namespace the object will belong.')
@utils.arg('--name', metavar='<NAME>', required=True,
help='Internal name of an object.')
@utils.arg('--schema', metavar='<SCHEMA>', required=True,
help='Valid JSON schema of an object.')
def do_md_object_create(gc, args):
"""Create a new metadata definitions object inside a namespace."""
try:
schema = json.loads(args.schema)
except ValueError:
utils.exit('Schema is not a valid JSON object.')
else:
fields = {'name': args.name}
fields.update(schema)
new_object = gc.metadefs_object.create(args.namespace, **fields)
_object_show(new_object)
@utils.arg('namespace', metavar='<NAMESPACE>',
help='Name of namespace the object belongs.')
@utils.arg('object', metavar='<OBJECT>', help='Name of an object.')
@utils.arg('--name', metavar='<NAME>', default=None,
help='New name of an object.')
@utils.arg('--schema', metavar='<SCHEMA>', default=None,
help='Valid JSON schema of an object.')
def do_md_object_update(gc, args):
"""Update metadata definitions object inside a namespace."""
fields = {}
if args.name:
fields['name'] = args.name
if args.schema:
try:
schema = json.loads(args.schema)
except ValueError:
utils.exit('Schema is not a valid JSON object.')
else:
fields.update(schema)
new_object = gc.metadefs_object.update(args.namespace, args.object,
**fields)
_object_show(new_object)
@utils.arg('namespace', metavar='<NAMESPACE>',
help='Name of namespace the object belongs.')
@utils.arg('object', metavar='<OBJECT>', help='Name of an object.')
@utils.arg('--max-column-width', metavar='<integer>', default=80,
help='The max column width of the printed table.')
def do_md_object_show(gc, args):
"""Describe a specific metadata definitions object inside a namespace."""
obj = gc.metadefs_object.get(args.namespace, args.object)
_object_show(obj, int(args.max_column_width))
@utils.arg('namespace', metavar='<NAMESPACE>',
help='Name of namespace the object belongs.')
@utils.arg('object', metavar='<OBJECT>', help='Name of an object.')
@utils.arg('property', metavar='<PROPERTY>', help='Name of a property.')
@utils.arg('--max-column-width', metavar='<integer>', default=80,
help='The max column width of the printed table.')
def do_md_object_property_show(gc, args):
"""Describe a specific metadata definitions property inside an object."""
obj = gc.metadefs_object.get(args.namespace, args.object)
try:
prop = obj['properties'][args.property]
prop['name'] = args.property
except KeyError:
utils.exit('Property %s not found in object %s.' % (args.property,
args.object))
utils.print_dict(prop, int(args.max_column_width))
@utils.arg('namespace', metavar='<NAMESPACE>',
help='Name of namespace the object belongs.')
@utils.arg('object', metavar='<OBJECT>', help='Name of an object.')
def do_md_object_delete(gc, args):
"""Delete a specific metadata definitions object inside a namespace."""
gc.metadefs_object.delete(args.namespace, args.object)
@utils.arg('namespace', metavar='<NAMESPACE>', help='Name of namespace.')
def do_md_namespace_objects_delete(gc, args):
"""Delete all metadata definitions objects inside a specific namespace."""
gc.metadefs_object.delete_all(args.namespace)
@utils.arg('namespace', metavar='<NAMESPACE>', help='Name of namespace.')
def do_md_object_list(gc, args):
"""List metadata definitions objects inside a specific namespace."""
objects = gc.metadefs_object.list(args.namespace)
columns = ['name', 'description']
column_settings = {
"description": {
"max_width": 50,
"align": "l"
}
}
utils.print_list(objects, columns, field_settings=column_settings)
@utils.arg('--sort-key', default='status',
choices=tasks.SORT_KEY_VALUES,
help='Sort task list by specified field.')
@utils.arg('--sort-dir', default='desc',
choices=tasks.SORT_DIR_VALUES,
help='Sort task list in specified direction.')
@utils.arg('--page-size', metavar='<SIZE>', default=None, type=int,
help='Number of tasks to request in each paginated request.')
@utils.arg('--type', metavar='<TYPE>',
help='Filter tasks to those that have this type.')
@utils.arg('--status', metavar='<STATUS>',
help='Filter tasks to those that have this status.')
def do_task_list(gc, args):
"""List tasks you can access."""
filter_keys = ['type', 'status']
filter_items = [(key, getattr(args, key)) for key in filter_keys]
filters = dict([item for item in filter_items if item[1] is not None])
kwargs = {'filters': filters}
if args.page_size is not None:
kwargs['page_size'] = args.page_size
kwargs['sort_key'] = args.sort_key
kwargs['sort_dir'] = args.sort_dir
tasks = gc.tasks.list(**kwargs)
columns = ['ID', 'Type', 'Status', 'Owner']
utils.print_list(tasks, columns)
@utils.arg('id', metavar='<TASK_ID>', help='ID of task to describe.')
def do_task_show(gc, args):
"""Describe a specific task."""
task = gc.tasks.get(args.id)
ignore = ['self', 'schema']
task = dict([item for item in task.iteritems() if item[0] not in ignore])
utils.print_dict(task)
@utils.arg('--type', metavar='<TYPE>',
help='Type of Task. Please refer to Glance schema or documentation'
' to see which tasks are supported.')
@utils.arg('--input', metavar='<STRING>', default='{}',
help='Parameters of the task to be launched')
def do_task_create(gc, args):
"""Create a new task."""
if not (args.type and args.input):
utils.exit('Unable to create task. Specify task type and input.')
else:
try:
input = json.loads(args.input)
except ValueError:
utils.exit('Failed to parse the "input" parameter. Must be a '
'valid JSON object.')
task_values = {'type': args.type, 'input': input}
task = gc.tasks.create(**task_values)
ignore = ['self', 'schema']
task = dict([item for item in task.iteritems()
if item[0] not in ignore])
utils.print_dict(task)

View File

@ -1,120 +0,0 @@
# Copyright 2013 OpenStack LLC.
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import encodeutils
import six
import warlock
from daisyclient.common import utils
from daisyclient.v2 import schemas
DEFAULT_PAGE_SIZE = 20
SORT_DIR_VALUES = ('asc', 'desc')
SORT_KEY_VALUES = ('id', 'type', 'status')
class Controller(object):
def __init__(self, http_client, schema_client):
self.http_client = http_client
self.schema_client = schema_client
@utils.memoized_property
def model(self):
schema = self.schema_client.get('task')
return warlock.model_factory(schema.raw(), schemas.SchemaBasedModel)
def list(self, **kwargs):
"""Retrieve a listing of Task objects
:param page_size: Number of tasks to request in each paginated request
:returns generator over list of Tasks
"""
def paginate(url):
resp, body = self.http_client.get(url)
for task in body['tasks']:
yield task
try:
next_url = body['next']
except KeyError:
return
else:
for task in paginate(next_url):
yield task
filters = kwargs.get('filters', {})
if not kwargs.get('page_size'):
filters['limit'] = DEFAULT_PAGE_SIZE
else:
filters['limit'] = kwargs['page_size']
if 'marker' in kwargs:
filters['marker'] = kwargs['marker']
sort_key = kwargs.get('sort_key')
if sort_key is not None:
if sort_key in SORT_KEY_VALUES:
filters['sort_key'] = sort_key
else:
raise ValueError('sort_key must be one of the following: %s.'
% ', '.join(SORT_KEY_VALUES))
sort_dir = kwargs.get('sort_dir')
if sort_dir is not None:
if sort_dir in SORT_DIR_VALUES:
filters['sort_dir'] = sort_dir
else:
raise ValueError('sort_dir must be one of the following: %s.'
% ', '.join(SORT_DIR_VALUES))
for param, value in filters.items():
if isinstance(value, six.string_types):
filters[param] = encodeutils.safe_encode(value)
url = '/v2/tasks?%s' % six.moves.urllib.parse.urlencode(filters)
for task in paginate(url):
# NOTE(flwang): remove 'self' for now until we have an elegant
# way to pass it into the model constructor without conflict
task.pop('self', None)
yield self.model(**task)
def get(self, task_id):
"""Get a task based on given task id."""
url = '/v2/tasks/%s' % task_id
resp, body = self.http_client.get(url)
# NOTE(flwang): remove 'self' for now until we have an elegant
# way to pass it into the model constructor without conflict
body.pop('self', None)
return self.model(**body)
def create(self, **kwargs):
"""Create a new task."""
url = '/v2/tasks'
task = self.model()
for (key, value) in kwargs.items():
try:
setattr(task, key, value)
except warlock.InvalidOperation as e:
raise TypeError(unicode(e))
resp, body = self.http_client.post(url, data=task)
# NOTE(flwang): remove 'self' for now until we have an elegant
# way to pass it into the model constructor without conflict
body.pop('self', None)
return self.model(**body)

View File

@ -1,18 +1,18 @@
#!/usr/bin/env python
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# THIS FILE IS MANAGED BY THE GLOBAL REQUIREMENTS REPO - DO NOT EDIT
import setuptools

View File

@ -1,14 +1,17 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# http://www.apache.org/licenses/LICENSE-2.0
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import json

View File

@ -1,5 +1,4 @@
# Copyright 2013 OpenStack Foundation
# Copyright (C) 2013 Yahoo! Inc.
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2014 Red Hat, Inc.
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,5 +1,4 @@
# Copyright 2013 OpenStack Foundation
# Copyright (C) 2013 Yahoo! Inc.
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,125 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
import glanceclient.v1.image_members
import glanceclient.v1.images
from tests import utils
fixtures = {
'/v1/images/1/members': {
'GET': (
{},
{'members': [
{'member_id': '1', 'can_share': False},
]},
),
'PUT': ({}, None),
},
'/v1/images/1/members/1': {
'GET': (
{},
{'member': {
'member_id': '1',
'can_share': False,
}},
),
'PUT': ({}, None),
'DELETE': ({}, None),
},
'/v1/shared-images/1': {
'GET': (
{},
{'shared_images': [
{'image_id': '1', 'can_share': False},
]},
),
},
}
class ImageMemberManagerTest(testtools.TestCase):
def setUp(self):
super(ImageMemberManagerTest, self).setUp()
self.api = utils.FakeAPI(fixtures)
self.mgr = glanceclient.v1.image_members.ImageMemberManager(self.api)
self.image = glanceclient.v1.images.Image(self.api, {'id': '1'}, True)
def test_list_by_image(self):
members = self.mgr.list(image=self.image)
expect = [('GET', '/v1/images/1/members', {}, None)]
self.assertEqual(expect, self.api.calls)
self.assertEqual(1, len(members))
self.assertEqual('1', members[0].member_id)
self.assertEqual('1', members[0].image_id)
self.assertFalse(members[0].can_share)
def test_list_by_member(self):
resource_class = glanceclient.v1.image_members.ImageMember
member = resource_class(self.api, {'member_id': '1'}, True)
self.mgr.list(member=member)
expect = [('GET', '/v1/shared-images/1', {}, None)]
self.assertEqual(expect, self.api.calls)
def test_get(self):
member = self.mgr.get(self.image, '1')
expect = [('GET', '/v1/images/1/members/1', {}, None)]
self.assertEqual(expect, self.api.calls)
self.assertEqual('1', member.member_id)
self.assertEqual('1', member.image_id)
self.assertFalse(member.can_share)
def test_delete(self):
self.mgr.delete('1', '1')
expect = [('DELETE', '/v1/images/1/members/1', {}, None)]
self.assertEqual(expect, self.api.calls)
def test_create(self):
self.mgr.create(self.image, '1', can_share=True)
expect_body = {'member': {'can_share': True}}
expect = [('PUT', '/v1/images/1/members/1', {},
sorted(expect_body.items()))]
self.assertEqual(expect, self.api.calls)
def test_replace(self):
body = [
{'member_id': '2', 'can_share': False},
{'member_id': '3'},
]
self.mgr.replace(self.image, body)
expect = [('PUT', '/v1/images/1/members', {},
sorted({'memberships': body}.items()))]
self.assertEqual(expect, self.api.calls)
def test_replace_objects(self):
body = [
glanceclient.v1.image_members.ImageMember(
self.mgr, {'member_id': '2', 'can_share': False}, True),
glanceclient.v1.image_members.ImageMember(
self.mgr, {'member_id': '3', 'can_share': True}, True),
]
self.mgr.replace(self.image, body)
expect_body = {
'memberships': [
{'member_id': '2', 'can_share': False},
{'member_id': '3', 'can_share': True},
],
}
expect = [('PUT', '/v1/images/1/members', {},
sorted(expect_body.items()))]
self.assertEqual(expect, self.api.calls)

View File

@ -1,963 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
import json
import testtools
import six
from six.moves.urllib import parse
from glanceclient.v1 import client
from glanceclient.v1 import images
from glanceclient.v1 import shell
from tests import utils
fixtures = {
'/v1/images': {
'POST': (
{
'location': '/v1/images/1',
'x-openstack-request-id': 'req-1234',
},
json.dumps(
{'image': {
'id': '1',
'name': 'image-1',
'container_format': 'ovf',
'disk_format': 'vhd',
'owner': 'asdf',
'size': '1024',
'min_ram': '512',
'min_disk': '10',
'properties': {'a': 'b', 'c': 'd'},
'is_public': False,
'protected': False,
'deleted': False,
}},
),
),
},
'/v1/images/detail?limit=20': {
'GET': (
{},
{'images': [
{
'id': 'a',
'name': 'image-1',
'properties': {'arch': 'x86_64'},
},
{
'id': 'b',
'name': 'image-2',
'properties': {'arch': 'x86_64'},
},
]},
),
},
'/v1/images/detail?is_public=None&limit=20': {
'GET': (
{'x-openstack-request-id': 'req-1234'},
{'images': [
{
'id': 'a',
'owner': 'A',
'is_public': 'True',
'name': 'image-1',
'properties': {'arch': 'x86_64'},
},
{
'id': 'b',
'owner': 'B',
'is_public': 'False',
'name': 'image-2',
'properties': {'arch': 'x86_64'},
},
{
'id': 'c',
'is_public': 'False',
'name': 'image-3',
'properties': {'arch': 'x86_64'},
},
]},
),
},
'/v1/images/detail?is_public=None&limit=5': {
'GET': (
{},
{'images': [
{
'id': 'a',
'owner': 'A',
'name': 'image-1',
'properties': {'arch': 'x86_64'},
},
{
'id': 'b',
'owner': 'B',
'name': 'image-2',
'properties': {'arch': 'x86_64'},
},
{
'id': 'b2',
'owner': 'B',
'name': 'image-3',
'properties': {'arch': 'x86_64'},
},
{
'id': 'c',
'name': 'image-3',
'properties': {'arch': 'x86_64'},
},
]},
),
},
'/v1/images/detail?limit=5': {
'GET': (
{},
{'images': [
{
'id': 'a',
'owner': 'A',
'is_public': 'False',
'name': 'image-1',
'properties': {'arch': 'x86_64'},
},
{
'id': 'b',
'owner': 'A',
'is_public': 'False',
'name': 'image-2',
'properties': {'arch': 'x86_64'},
},
{
'id': 'b2',
'owner': 'B',
'name': 'image-3',
'properties': {'arch': 'x86_64'},
},
{
'id': 'c',
'is_public': 'True',
'name': 'image-3',
'properties': {'arch': 'x86_64'},
},
]},
),
},
'/v1/images/detail?limit=20&marker=a': {
'GET': (
{},
{'images': [
{
'id': 'b',
'name': 'image-1',
'properties': {'arch': 'x86_64'},
},
{
'id': 'c',
'name': 'image-2',
'properties': {'arch': 'x86_64'},
},
]},
),
},
'/v1/images/detail?limit=1': {
'GET': (
{},
{'images': [
{
'id': 'a',
'name': 'image-0',
'properties': {'arch': 'x86_64'},
},
]},
),
},
'/v1/images/detail?limit=1&marker=a': {
'GET': (
{},
{'images': [
{
'id': 'b',
'name': 'image-1',
'properties': {'arch': 'x86_64'},
},
]},
),
},
'/v1/images/detail?limit=2': {
'GET': (
{},
{'images': [
{
'id': 'a',
'name': 'image-1',
'properties': {'arch': 'x86_64'},
},
{
'id': 'b',
'name': 'image-2',
'properties': {'arch': 'x86_64'},
},
]},
),
},
'/v1/images/detail?limit=2&marker=b': {
'GET': (
{},
{'images': [
{
'id': 'c',
'name': 'image-3',
'properties': {'arch': 'x86_64'},
},
]},
),
},
'/v1/images/detail?limit=20&name=foo': {
'GET': (
{},
{'images': [
{
'id': 'a',
'name': 'image-1',
'properties': {'arch': 'x86_64'},
},
{
'id': 'b',
'name': 'image-2',
'properties': {'arch': 'x86_64'},
},
]},
),
},
'/v1/images/detail?limit=20&property-ping=pong':
{
'GET': (
{},
{'images': [
{
'id': '1',
'name': 'image-1',
'properties': {'arch': 'x86_64'},
},
]},
),
},
'/v1/images/detail?limit=20&sort_dir=desc': {
'GET': (
{},
{'images': [
{
'id': 'a',
'name': 'image-1',
'properties': {'arch': 'x86_64'},
},
{
'id': 'b',
'name': 'image-2',
'properties': {'arch': 'x86_64'},
},
]},
),
},
'/v1/images/detail?limit=20&sort_key=name': {
'GET': (
{},
{'images': [
{
'id': 'a',
'name': 'image-1',
'properties': {'arch': 'x86_64'},
},
{
'id': 'b',
'name': 'image-2',
'properties': {'arch': 'x86_64'},
},
]},
),
},
'/v1/images/1': {
'HEAD': (
{
'x-image-meta-id': '1',
'x-image-meta-name': 'image-1',
'x-image-meta-property-arch': 'x86_64',
'x-image-meta-is_public': 'false',
'x-image-meta-protected': 'false',
'x-image-meta-deleted': 'false',
},
None),
'GET': (
{},
'XXX',
),
'PUT': (
{},
json.dumps(
{'image': {
'id': '1',
'name': 'image-2',
'container_format': 'ovf',
'disk_format': 'vhd',
'owner': 'asdf',
'size': '1024',
'min_ram': '512',
'min_disk': '10',
'properties': {'a': 'b', 'c': 'd'},
'is_public': False,
'protected': False,
}},
),
),
'DELETE': ({}, None),
},
'/v1/images/2': {
'HEAD': (
{
'x-image-meta-id': '2'
},
None,
),
'GET': (
{
'x-image-meta-checksum': 'wrong'
},
'YYY',
),
},
'/v1/images/3': {
'HEAD': (
{
'x-image-meta-id': '3',
'x-image-meta-name': u"ni\xf1o"
},
None,
),
'GET': (
{
'x-image-meta-checksum': '0745064918b49693cca64d6b6a13d28a'
},
'ZZZ',
),
},
'/v1/images/4': {
'HEAD': (
{
'x-image-meta-id': '4',
'x-image-meta-name': 'image-4',
'x-image-meta-property-arch': 'x86_64',
'x-image-meta-is_public': 'false',
'x-image-meta-protected': 'false',
'x-image-meta-deleted': 'false',
'x-openstack-request-id': 'req-1234',
},
None),
'GET': (
{
'x-openstack-request-id': 'req-1234',
},
'XXX',
),
'PUT': (
{
'x-openstack-request-id': 'req-1234',
},
json.dumps(
{'image': {
'id': '4',
'name': 'image-4',
'container_format': 'ovf',
'disk_format': 'vhd',
'owner': 'asdf',
'size': '1024',
'min_ram': '512',
'min_disk': '10',
'properties': {'a': 'b', 'c': 'd'},
'is_public': False,
'protected': False,
}},
),
),
'DELETE': (
{
'x-openstack-request-id': 'req-1234',
},
None),
},
'/v1/images/v2_created_img': {
'PUT': (
{},
json.dumps({
"image": {
"status": "queued",
"deleted": False,
"container_format": "bare",
"min_ram": 0,
"updated_at": "2013-12-20T01:51:45",
"owner": "foo",
"min_disk": 0,
"is_public": False,
"deleted_at": None,
"id": "v2_created_img",
"size": None,
"name": "bar",
"checksum": None,
"created_at": "2013-12-20T01:50:38",
"disk_format": "qcow2",
"properties": {},
"protected": False
}
})
),
},
}
class ImageManagerTest(testtools.TestCase):
def setUp(self):
super(ImageManagerTest, self).setUp()
self.api = utils.FakeAPI(fixtures)
self.mgr = images.ImageManager(self.api)
def test_paginated_list(self):
images = list(self.mgr.list(page_size=2))
expect = [
('GET', '/v1/images/detail?limit=2', {}, None),
('GET', '/v1/images/detail?limit=2&marker=b', {}, None),
]
self.assertEqual(expect, self.api.calls)
self.assertEqual(3, len(images))
self.assertEqual('a', images[0].id)
self.assertEqual('b', images[1].id)
self.assertEqual('c', images[2].id)
def test_list_with_limit_less_than_page_size(self):
results = list(self.mgr.list(page_size=2, limit=1))
expect = [('GET', '/v1/images/detail?limit=2', {}, None)]
self.assertEqual(1, len(results))
self.assertEqual(expect, self.api.calls)
def test_list_with_limit_greater_than_page_size(self):
images = list(self.mgr.list(page_size=1, limit=2))
expect = [
('GET', '/v1/images/detail?limit=1', {}, None),
('GET', '/v1/images/detail?limit=1&marker=a', {}, None),
]
self.assertEqual(2, len(images))
self.assertEqual('a', images[0].id)
self.assertEqual('b', images[1].id)
self.assertEqual(expect, self.api.calls)
def test_list_with_marker(self):
list(self.mgr.list(marker='a'))
url = '/v1/images/detail?limit=20&marker=a'
expect = [('GET', url, {}, None)]
self.assertEqual(expect, self.api.calls)
def test_list_with_filter(self):
list(self.mgr.list(filters={'name': "foo"}))
url = '/v1/images/detail?limit=20&name=foo'
expect = [('GET', url, {}, None)]
self.assertEqual(expect, self.api.calls)
def test_list_with_property_filters(self):
list(self.mgr.list(filters={'properties': {'ping': 'pong'}}))
url = '/v1/images/detail?limit=20&property-ping=pong'
expect = [('GET', url, {}, None)]
self.assertEqual(expect, self.api.calls)
def test_list_with_sort_dir(self):
list(self.mgr.list(sort_dir='desc'))
url = '/v1/images/detail?limit=20&sort_dir=desc'
expect = [('GET', url, {}, None)]
self.assertEqual(expect, self.api.calls)
def test_list_with_sort_key(self):
list(self.mgr.list(sort_key='name'))
url = '/v1/images/detail?limit=20&sort_key=name'
expect = [('GET', url, {}, None)]
self.assertEqual(expect, self.api.calls)
def test_get(self):
image = self.mgr.get('1')
expect = [('HEAD', '/v1/images/1', {}, None)]
self.assertEqual(expect, self.api.calls)
self.assertEqual('1', image.id)
self.assertEqual('image-1', image.name)
self.assertFalse(image.is_public)
self.assertFalse(image.protected)
self.assertFalse(image.deleted)
self.assertEqual({u'arch': u'x86_64'}, image.properties)
def test_get_int(self):
image = self.mgr.get(1)
expect = [('HEAD', '/v1/images/1', {}, None)]
self.assertEqual(expect, self.api.calls)
self.assertEqual('1', image.id)
self.assertEqual('image-1', image.name)
self.assertFalse(image.is_public)
self.assertFalse(image.protected)
self.assertFalse(image.deleted)
self.assertEqual({u'arch': u'x86_64'}, image.properties)
def test_get_encoding(self):
image = self.mgr.get('3')
self.assertEqual(u"ni\xf1o", image.name)
def test_get_req_id(self):
params = {'return_req_id': []}
self.mgr.get('4', **params)
expect_req_id = ['req-1234']
self.assertEqual(expect_req_id, params['return_req_id'])
def test_data(self):
data = ''.join([b for b in self.mgr.data('1', do_checksum=False)])
expect = [('GET', '/v1/images/1', {}, None)]
self.assertEqual(expect, self.api.calls)
self.assertEqual('XXX', data)
expect += [('GET', '/v1/images/1', {}, None)]
data = ''.join([b for b in self.mgr.data('1')])
self.assertEqual(expect, self.api.calls)
self.assertEqual('XXX', data)
def test_data_with_wrong_checksum(self):
data = ''.join([b for b in self.mgr.data('2', do_checksum=False)])
expect = [('GET', '/v1/images/2', {}, None)]
self.assertEqual(expect, self.api.calls)
self.assertEqual('YYY', data)
expect += [('GET', '/v1/images/2', {}, None)]
data = self.mgr.data('2')
self.assertEqual(expect, self.api.calls)
try:
data = ''.join([b for b in data])
self.fail('data did not raise an error.')
except IOError as e:
self.assertEqual(errno.EPIPE, e.errno)
msg = 'was fd7c5c4fdaa97163ee4ba8842baa537a expected wrong'
self.assertIn(msg, str(e))
def test_data_req_id(self):
params = {
'do_checksum': False,
'return_req_id': [],
}
''.join([b for b in self.mgr.data('4', **params)])
expect_req_id = ['req-1234']
self.assertEqual(expect_req_id, params['return_req_id'])
def test_data_with_checksum(self):
data = ''.join([b for b in self.mgr.data('3', do_checksum=False)])
expect = [('GET', '/v1/images/3', {}, None)]
self.assertEqual(expect, self.api.calls)
self.assertEqual('ZZZ', data)
expect += [('GET', '/v1/images/3', {}, None)]
data = ''.join([b for b in self.mgr.data('3')])
self.assertEqual(expect, self.api.calls)
self.assertEqual('ZZZ', data)
def test_delete(self):
self.mgr.delete('1')
expect = [('DELETE', '/v1/images/1', {}, None)]
self.assertEqual(expect, self.api.calls)
def test_delete_req_id(self):
params = {
'return_req_id': []
}
self.mgr.delete('4', **params)
expect = [('DELETE', '/v1/images/4', {}, None)]
self.assertEqual(self.api.calls, expect)
expect_req_id = ['req-1234']
self.assertEqual(expect_req_id, params['return_req_id'])
def test_create_without_data(self):
params = {
'id': '1',
'name': 'image-1',
'container_format': 'ovf',
'disk_format': 'vhd',
'owner': 'asdf',
'size': 1024,
'min_ram': 512,
'min_disk': 10,
'copy_from': 'http://example.com',
'properties': {'a': 'b', 'c': 'd'},
}
image = self.mgr.create(**params)
expect_headers = {
'x-image-meta-id': '1',
'x-image-meta-name': 'image-1',
'x-image-meta-container_format': 'ovf',
'x-image-meta-disk_format': 'vhd',
'x-image-meta-owner': 'asdf',
'x-image-meta-size': '1024',
'x-image-meta-min_ram': '512',
'x-image-meta-min_disk': '10',
'x-glance-api-copy-from': 'http://example.com',
'x-image-meta-property-a': 'b',
'x-image-meta-property-c': 'd',
}
expect = [('POST', '/v1/images', expect_headers, None)]
self.assertEqual(expect, self.api.calls)
self.assertEqual('1', image.id)
self.assertEqual('image-1', image.name)
self.assertEqual('ovf', image.container_format)
self.assertEqual('vhd', image.disk_format)
self.assertEqual('asdf', image.owner)
self.assertEqual(1024, image.size)
self.assertEqual(512, image.min_ram)
self.assertEqual(10, image.min_disk)
self.assertFalse(image.is_public)
self.assertFalse(image.protected)
self.assertFalse(image.deleted)
self.assertEqual({'a': 'b', 'c': 'd'}, image.properties)
def test_create_with_data(self):
image_data = six.StringIO('XXX')
self.mgr.create(data=image_data)
expect_headers = {'x-image-meta-size': '3'}
expect = [('POST', '/v1/images', expect_headers, image_data)]
self.assertEqual(expect, self.api.calls)
def test_create_req_id(self):
params = {
'id': '4',
'name': 'image-4',
'container_format': 'ovf',
'disk_format': 'vhd',
'owner': 'asdf',
'size': 1024,
'min_ram': 512,
'min_disk': 10,
'copy_from': 'http://example.com',
'properties': {'a': 'b', 'c': 'd'},
'return_req_id': [],
}
image = self.mgr.create(**params)
expect_headers = {
'x-image-meta-id': '4',
'x-image-meta-name': 'image-4',
'x-image-meta-container_format': 'ovf',
'x-image-meta-disk_format': 'vhd',
'x-image-meta-owner': 'asdf',
'x-image-meta-size': '1024',
'x-image-meta-min_ram': '512',
'x-image-meta-min_disk': '10',
'x-glance-api-copy-from': 'http://example.com',
'x-image-meta-property-a': 'b',
'x-image-meta-property-c': 'd',
}
expect = [('POST', '/v1/images', expect_headers, None)]
self.assertEqual(self.api.calls, expect)
self.assertEqual(image.id, '1')
expect_req_id = ['req-1234']
self.assertEqual(expect_req_id, params['return_req_id'])
def test_update(self):
fields = {
'name': 'image-2',
'container_format': 'ovf',
'disk_format': 'vhd',
'owner': 'asdf',
'size': 1024,
'min_ram': 512,
'min_disk': 10,
'copy_from': 'http://example.com',
'properties': {'a': 'b', 'c': 'd'},
'deleted': False,
}
image = self.mgr.update('1', **fields)
expect_hdrs = {
'x-image-meta-name': 'image-2',
'x-image-meta-container_format': 'ovf',
'x-image-meta-disk_format': 'vhd',
'x-image-meta-owner': 'asdf',
'x-image-meta-size': '1024',
'x-image-meta-min_ram': '512',
'x-image-meta-min_disk': '10',
'x-glance-api-copy-from': 'http://example.com',
'x-image-meta-property-a': 'b',
'x-image-meta-property-c': 'd',
'x-image-meta-deleted': 'False',
'x-glance-registry-purge-props': 'false',
}
expect = [('PUT', '/v1/images/1', expect_hdrs, None)]
self.assertEqual(expect, self.api.calls)
self.assertEqual('1', image.id)
self.assertEqual('image-2', image.name)
self.assertEqual(1024, image.size)
self.assertEqual(512, image.min_ram)
self.assertEqual(10, image.min_disk)
def test_update_with_data(self):
image_data = six.StringIO('XXX')
self.mgr.update('1', data=image_data)
expect_headers = {'x-image-meta-size': '3',
'x-glance-registry-purge-props': 'false'}
expect = [('PUT', '/v1/images/1', expect_headers, image_data)]
self.assertEqual(expect, self.api.calls)
def test_update_with_purge_props(self):
self.mgr.update('1', purge_props=True)
expect_headers = {'x-glance-registry-purge-props': 'true'}
expect = [('PUT', '/v1/images/1', expect_headers, None)]
self.assertEqual(expect, self.api.calls)
def test_update_with_purge_props_false(self):
self.mgr.update('1', purge_props=False)
expect_headers = {'x-glance-registry-purge-props': 'false'}
expect = [('PUT', '/v1/images/1', expect_headers, None)]
self.assertEqual(expect, self.api.calls)
def test_update_req_id(self):
fields = {
'purge_props': True,
'return_req_id': [],
}
self.mgr.update('4', **fields)
expect_headers = {'x-glance-registry-purge-props': 'true'}
expect = [('PUT', '/v1/images/4', expect_headers, None)]
self.assertEqual(self.api.calls, expect)
expect_req_id = ['req-1234']
self.assertEqual(expect_req_id, fields['return_req_id'])
def test_image_meta_from_headers_encoding(self):
value = u"ni\xf1o"
if six.PY2:
fields = {"x-image-meta-name": "ni\xc3\xb1o"}
else:
fields = {"x-image-meta-name": value}
headers = self.mgr._image_meta_from_headers(fields)
self.assertEqual(value, headers["name"])
def test_image_list_with_owner(self):
images = self.mgr.list(owner='A', page_size=20)
image_list = list(images)
self.assertEqual('A', image_list[0].owner)
self.assertEqual('a', image_list[0].id)
self.assertEqual(1, len(image_list))
def test_image_list_with_owner_req_id(self):
fields = {
'owner': 'A',
'return_req_id': [],
}
images = self.mgr.list(**fields)
next(images)
self.assertEqual(fields['return_req_id'], ['req-1234'])
def test_image_list_with_notfound_owner(self):
images = self.mgr.list(owner='X', page_size=20)
self.assertEqual(0, len(list(images)))
def test_image_list_with_empty_string_owner(self):
images = self.mgr.list(owner='', page_size=20)
image_list = list(images)
self.assertRaises(AttributeError, lambda: image_list[0].owner)
self.assertEqual('c', image_list[0].id)
self.assertEqual(1, len(image_list))
def test_image_list_with_unspecified_owner(self):
images = self.mgr.list(owner=None, page_size=5)
image_list = list(images)
self.assertEqual('A', image_list[0].owner)
self.assertEqual('a', image_list[0].id)
self.assertEqual('A', image_list[1].owner)
self.assertEqual('b', image_list[1].id)
self.assertEqual('B', image_list[2].owner)
self.assertEqual('b2', image_list[2].id)
self.assertRaises(AttributeError, lambda: image_list[3].owner)
self.assertEqual('c', image_list[3].id)
self.assertEqual(4, len(image_list))
def test_image_list_with_owner_and_limit(self):
images = self.mgr.list(owner='B', page_size=5, limit=1)
image_list = list(images)
self.assertEqual('B', image_list[0].owner)
self.assertEqual('b', image_list[0].id)
self.assertEqual(1, len(image_list))
def test_image_list_all_tenants(self):
images = self.mgr.list(is_public=None, page_size=5)
image_list = list(images)
self.assertEqual('A', image_list[0].owner)
self.assertEqual('a', image_list[0].id)
self.assertEqual('B', image_list[1].owner)
self.assertEqual('b', image_list[1].id)
self.assertEqual('B', image_list[2].owner)
self.assertEqual('b2', image_list[2].id)
self.assertRaises(AttributeError, lambda: image_list[3].owner)
self.assertEqual('c', image_list[3].id)
self.assertEqual(4, len(image_list))
def test_update_v2_created_image_using_v1(self):
fields_to_update = {
'name': 'bar',
'container_format': 'bare',
'disk_format': 'qcow2',
}
image = self.mgr.update('v2_created_img', **fields_to_update)
expect_hdrs = {
'x-image-meta-name': 'bar',
'x-image-meta-container_format': 'bare',
'x-image-meta-disk_format': 'qcow2',
'x-glance-registry-purge-props': 'false',
}
expect = [('PUT', '/v1/images/v2_created_img', expect_hdrs, None)]
self.assertEqual(expect, self.api.calls)
self.assertEqual('v2_created_img', image.id)
self.assertEqual('bar', image.name)
self.assertEqual(0, image.size)
self.assertEqual('bare', image.container_format)
self.assertEqual('qcow2', image.disk_format)
class ImageTest(testtools.TestCase):
def setUp(self):
super(ImageTest, self).setUp()
self.api = utils.FakeAPI(fixtures)
self.mgr = images.ImageManager(self.api)
def test_delete(self):
image = self.mgr.get('1')
image.delete()
expect = [
('HEAD', '/v1/images/1', {}, None),
('HEAD', '/v1/images/1', {}, None),
('DELETE', '/v1/images/1', {}, None),
]
self.assertEqual(expect, self.api.calls)
def test_update(self):
image = self.mgr.get('1')
image.update(name='image-5')
expect = [
('HEAD', '/v1/images/1', {}, None),
('HEAD', '/v1/images/1', {}, None),
('PUT', '/v1/images/1',
{'x-image-meta-name': 'image-5',
'x-glance-registry-purge-props': 'false'}, None),
]
self.assertEqual(expect, self.api.calls)
def test_data(self):
image = self.mgr.get('1')
data = ''.join([b for b in image.data()])
expect = [
('HEAD', '/v1/images/1', {}, None),
('HEAD', '/v1/images/1', {}, None),
('GET', '/v1/images/1', {}, None),
]
self.assertEqual(expect, self.api.calls)
self.assertEqual('XXX', data)
data = ''.join([b for b in image.data(do_checksum=False)])
expect += [('GET', '/v1/images/1', {}, None)]
self.assertEqual(expect, self.api.calls)
self.assertEqual('XXX', data)
def test_data_with_wrong_checksum(self):
image = self.mgr.get('2')
data = ''.join([b for b in image.data(do_checksum=False)])
expect = [
('HEAD', '/v1/images/2', {}, None),
('HEAD', '/v1/images/2', {}, None),
('GET', '/v1/images/2', {}, None),
]
self.assertEqual(expect, self.api.calls)
self.assertEqual('YYY', data)
data = image.data()
expect += [('GET', '/v1/images/2', {}, None)]
self.assertEqual(expect, self.api.calls)
try:
data = ''.join([b for b in image.data()])
self.fail('data did not raise an error.')
except IOError as e:
self.assertEqual(errno.EPIPE, e.errno)
msg = 'was fd7c5c4fdaa97163ee4ba8842baa537a expected wrong'
self.assertIn(msg, str(e))
def test_data_with_checksum(self):
image = self.mgr.get('3')
data = ''.join([b for b in image.data(do_checksum=False)])
expect = [
('HEAD', '/v1/images/3', {}, None),
('HEAD', '/v1/images/3', {}, None),
('GET', '/v1/images/3', {}, None),
]
self.assertEqual(expect, self.api.calls)
self.assertEqual('ZZZ', data)
data = ''.join([b for b in image.data()])
expect += [('GET', '/v1/images/3', {}, None)]
self.assertEqual(expect, self.api.calls)
self.assertEqual('ZZZ', data)
class ParameterFakeAPI(utils.FakeAPI):
image_list = {'images': [
{
'id': 'a',
'name': 'image-1',
'properties': {'arch': 'x86_64'},
},
{
'id': 'b',
'name': 'image-2',
'properties': {'arch': 'x86_64'},
},
]}
def get(self, url, **kwargs):
self.url = url
return utils.FakeResponse({}), ParameterFakeAPI.image_list
class FakeArg(object):
def __init__(self, arg_dict):
self.arg_dict = arg_dict
self.fields = arg_dict.keys()
def __getattr__(self, name):
if name in self.arg_dict:
return self.arg_dict[name]
else:
return None
class UrlParameterTest(testtools.TestCase):
def setUp(self):
super(UrlParameterTest, self).setUp()
self.api = ParameterFakeAPI({})
self.gc = client.Client("http://fakeaddress.com")
self.gc.images = images.ImageManager(self.api)
def test_is_public_list(self):
shell.do_image_list(self.gc, FakeArg({"is_public": "True"}))
parts = parse.urlparse(self.api.url)
qs_dict = parse.parse_qs(parts.query)
self.assertIn('is_public', qs_dict)
self.assertTrue(qs_dict['is_public'][0].lower() == "true")

File diff suppressed because it is too large Load Diff

View File

@ -1,121 +0,0 @@
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from glanceclient.v2 import image_members
from tests import utils
IMAGE = '3a4560a1-e585-443e-9b39-553b46ec92d1'
MEMBER = '11223344-5566-7788-9911-223344556677'
data_fixtures = {
'/v2/images/{image}/members'.format(image=IMAGE): {
'GET': (
{},
{'members': [
{
'image_id': IMAGE,
'member_id': MEMBER,
},
]},
),
'POST': (
{},
{
'image_id': IMAGE,
'member_id': MEMBER,
'status': 'pending'
}
)
},
'/v2/images/{image}/members/{mem}'.format(image=IMAGE, mem=MEMBER): {
'DELETE': (
{},
None,
),
'PUT': (
{},
{
'image_id': IMAGE,
'member_id': MEMBER,
'status': 'accepted'
}
),
}
}
schema_fixtures = {
'member': {
'GET': (
{},
{
'name': 'member',
'properties': {
'image_id': {},
'member_id': {}
}
},
)
}
}
class TestController(testtools.TestCase):
def setUp(self):
super(TestController, self).setUp()
self.api = utils.FakeAPI(data_fixtures)
self.schema_api = utils.FakeSchemaAPI(schema_fixtures)
self.controller = image_members.Controller(self.api, self.schema_api)
def test_list_image_members(self):
image_id = IMAGE
# NOTE(iccha): cast to list since the controller returns a generator
image_members = list(self.controller.list(image_id))
self.assertEqual(IMAGE, image_members[0].image_id)
self.assertEqual(MEMBER, image_members[0].member_id)
def test_delete_image_member(self):
image_id = IMAGE
member_id = MEMBER
self.controller.delete(image_id, member_id)
expect = [
('DELETE',
'/v2/images/{image}/members/{mem}'.format(image=IMAGE,
mem=MEMBER),
{},
None)]
self.assertEqual(expect, self.api.calls)
def test_update_image_members(self):
image_id = IMAGE
member_id = MEMBER
status = 'accepted'
image_member = self.controller.update(image_id, member_id, status)
self.assertEqual(IMAGE, image_member.image_id)
self.assertEqual(MEMBER, image_member.member_id)
self.assertEqual(status, image_member.status)
def test_create_image_members(self):
image_id = IMAGE
member_id = MEMBER
status = 'pending'
image_member = self.controller.create(image_id, member_id)
self.assertEqual(IMAGE, image_member.image_id)
self.assertEqual(MEMBER, image_member.member_id)
self.assertEqual(status, image_member.status)

View File

@ -1,675 +0,0 @@
# Copyright 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from glanceclient.v2 import metadefs
from tests import utils
NAMESPACE1 = 'Namespace1'
NAMESPACE2 = 'Namespace2'
NAMESPACE3 = 'Namespace3'
NAMESPACE4 = 'Namespace4'
NAMESPACE5 = 'Namespace5'
NAMESPACE6 = 'Namespace6'
NAMESPACE7 = 'Namespace7'
NAMESPACE8 = 'Namespace8'
NAMESPACENEW = 'NamespaceNew'
RESOURCE_TYPE1 = 'ResourceType1'
RESOURCE_TYPE2 = 'ResourceType2'
OBJECT1 = 'Object1'
PROPERTY1 = 'Property1'
PROPERTY2 = 'Property2'
def _get_namespace_fixture(ns_name, rt_name=RESOURCE_TYPE1, **kwargs):
ns = {
"display_name": "Flavor Quota",
"description": "DESCRIPTION1",
"self": "/v2/metadefs/namespaces/%s" % ns_name,
"namespace": ns_name,
"visibility": "public",
"protected": True,
"owner": "admin",
"resource_types": [
{
"name": rt_name
}
],
"schema": "/v2/schemas/metadefs/namespace",
"created_at": "2014-08-14T09:07:06Z",
"updated_at": "2014-08-14T09:07:06Z",
}
ns.update(kwargs)
return ns
data_fixtures = {
"/v2/metadefs/namespaces?limit=20": {
"GET": (
{},
{
"first": "/v2/metadefs/namespaces?limit=20",
"namespaces": [
_get_namespace_fixture(NAMESPACE1),
_get_namespace_fixture(NAMESPACE2),
],
"schema": "/v2/schemas/metadefs/namespaces"
}
)
},
"/v2/metadefs/namespaces?limit=1": {
"GET": (
{},
{
"first": "/v2/metadefs/namespaces?limit=1",
"namespaces": [
_get_namespace_fixture(NAMESPACE7),
],
"schema": "/v2/schemas/metadefs/namespaces",
"next": "/v2/metadefs/namespaces?marker=%s&limit=1"
% NAMESPACE7,
}
)
},
"/v2/metadefs/namespaces?limit=1&marker=%s" % NAMESPACE7: {
"GET": (
{},
{
"first": "/v2/metadefs/namespaces?limit=2",
"namespaces": [
_get_namespace_fixture(NAMESPACE8),
],
"schema": "/v2/schemas/metadefs/namespaces"
}
)
},
"/v2/metadefs/namespaces?limit=2&marker=%s" % NAMESPACE6: {
"GET": (
{},
{
"first": "/v2/metadefs/namespaces?limit=2",
"namespaces": [
_get_namespace_fixture(NAMESPACE7),
_get_namespace_fixture(NAMESPACE8),
],
"schema": "/v2/schemas/metadefs/namespaces"
}
)
},
"/v2/metadefs/namespaces?limit=20&sort_dir=asc": {
"GET": (
{},
{
"first": "/v2/metadefs/namespaces?limit=1",
"namespaces": [
_get_namespace_fixture(NAMESPACE1),
],
"schema": "/v2/schemas/metadefs/namespaces"
}
)
},
"/v2/metadefs/namespaces?limit=20&sort_key=created_at": {
"GET": (
{},
{
"first": "/v2/metadefs/namespaces?limit=1",
"namespaces": [
_get_namespace_fixture(NAMESPACE1),
],
"schema": "/v2/schemas/metadefs/namespaces"
}
)
},
"/v2/metadefs/namespaces?limit=20&resource_types=%s" % RESOURCE_TYPE1: {
"GET": (
{},
{
"first": "/v2/metadefs/namespaces?limit=20",
"namespaces": [
_get_namespace_fixture(NAMESPACE3),
],
"schema": "/v2/schemas/metadefs/namespaces"
}
)
},
"/v2/metadefs/namespaces?limit=20&resource_types="
"%s%%2C%s" % (RESOURCE_TYPE1, RESOURCE_TYPE2): {
"GET": (
{},
{
"first": "/v2/metadefs/namespaces?limit=20",
"namespaces": [
_get_namespace_fixture(NAMESPACE4),
],
"schema": "/v2/schemas/metadefs/namespaces"
}
)
},
"/v2/metadefs/namespaces?limit=20&visibility=private": {
"GET": (
{},
{
"first": "/v2/metadefs/namespaces?limit=20",
"namespaces": [
_get_namespace_fixture(NAMESPACE5),
],
"schema": "/v2/schemas/metadefs/namespaces"
}
)
},
"/v2/metadefs/namespaces": {
"POST": (
{},
{
"display_name": "Flavor Quota",
"description": "DESCRIPTION1",
"self": "/v2/metadefs/namespaces/%s" % 'NamespaceNew',
"namespace": 'NamespaceNew',
"visibility": "public",
"protected": True,
"owner": "admin",
"schema": "/v2/schemas/metadefs/namespace",
"created_at": "2014-08-14T09:07:06Z",
"updated_at": "2014-08-14T09:07:06Z",
}
)
},
"/v2/metadefs/namespaces/%s" % NAMESPACE1: {
"GET": (
{},
{
"display_name": "Flavor Quota",
"description": "DESCRIPTION1",
"objects": [
{
"description": "DESCRIPTION2",
"name": "OBJECT1",
"self": "/v2/metadefs/namespaces/%s/objects/" %
OBJECT1,
"required": [],
"properties": {
PROPERTY1: {
"type": "integer",
"description": "DESCRIPTION3",
"title": "Quota: CPU Shares"
},
PROPERTY2: {
"minimum": 1000,
"type": "integer",
"description": "DESCRIPTION4",
"maximum": 1000000,
"title": "Quota: CPU Period"
},
},
"schema": "/v2/schemas/metadefs/object"
}
],
"self": "/v2/metadefs/namespaces/%s" % NAMESPACE1,
"namespace": NAMESPACE1,
"visibility": "public",
"protected": True,
"owner": "admin",
"resource_types": [
{
"name": RESOURCE_TYPE1
}
],
"schema": "/v2/schemas/metadefs/namespace",
"created_at": "2014-08-14T09:07:06Z",
"updated_at": "2014-08-14T09:07:06Z",
}
),
"PUT": (
{},
{
"display_name": "Flavor Quota",
"description": "DESCRIPTION1",
"objects": [
{
"description": "DESCRIPTION2",
"name": "OBJECT1",
"self": "/v2/metadefs/namespaces/%s/objects/" %
OBJECT1,
"required": [],
"properties": {
PROPERTY1: {
"type": "integer",
"description": "DESCRIPTION3",
"title": "Quota: CPU Shares"
},
PROPERTY2: {
"minimum": 1000,
"type": "integer",
"description": "DESCRIPTION4",
"maximum": 1000000,
"title": "Quota: CPU Period"
},
},
"schema": "/v2/schemas/metadefs/object"
}
],
"self": "/v2/metadefs/namespaces/%s" % NAMESPACENEW,
"namespace": NAMESPACENEW,
"visibility": "public",
"protected": True,
"owner": "admin",
"resource_types": [
{
"name": RESOURCE_TYPE1
}
],
"schema": "/v2/schemas/metadefs/namespace",
"created_at": "2014-08-14T09:07:06Z",
"updated_at": "2014-08-14T09:07:06Z",
}
),
"DELETE": (
{},
{}
)
},
"/v2/metadefs/namespaces/%s?resource_type=%s" % (NAMESPACE6,
RESOURCE_TYPE1):
{
"GET": (
{},
{
"display_name": "Flavor Quota",
"description": "DESCRIPTION1",
"objects": [],
"self": "/v2/metadefs/namespaces/%s" % NAMESPACE1,
"namespace": NAMESPACE6,
"visibility": "public",
"protected": True,
"owner": "admin",
"resource_types": [
{
"name": RESOURCE_TYPE1
}
],
"schema": "/v2/schemas/metadefs/namespace",
"created_at": "2014-08-14T09:07:06Z",
"updated_at": "2014-08-14T09:07:06Z",
}
),
},
}
schema_fixtures = {
"metadefs/namespace":
{
"GET": (
{},
{
"additionalProperties": False,
"definitions": {
"property": {
"additionalProperties": {
"required": [
"title",
"type"
],
"type": "object",
"properties": {
"additionalItems": {
"type": "boolean"
},
"enum": {
"type": "array"
},
"description": {
"type": "string"
},
"title": {
"type": "string"
},
"default": {},
"minLength": {
"$ref": "#/definitions/"
"positiveIntegerDefault0"
},
"required": {
"$ref": "#/definitions/stringArray"
},
"maximum": {
"type": "number"
},
"minItems": {
"$ref": "#/definitions/"
"positiveIntegerDefault0"
},
"readonly": {
"type": "boolean"
},
"minimum": {
"type": "number"
},
"maxItems": {
"$ref": "#/definitions/"
"positiveInteger"
},
"maxLength": {
"$ref": "#/definitions/positiveInteger"
},
"uniqueItems": {
"default": False,
"type": "boolean"
},
"pattern": {
"type": "string",
"format": "regex"
},
"items": {
"type": "object",
"properties": {
"enum": {
"type": "array"
},
"type": {
"enum": [
"array",
"boolean",
"integer",
"number",
"object",
"string",
"null"
],
"type": "string"
}
}
},
"type": {
"enum": [
"array",
"boolean",
"integer",
"number",
"object",
"string",
"null"
],
"type": "string"
}
}
},
"type": "object"
},
"positiveIntegerDefault0": {
"allOf": [
{
"$ref": "#/definitions/positiveInteger"
},
{
"default": 0
}
]
},
"stringArray": {
"uniqueItems": True,
"items": {
"type": "string"
},
"type": "array"
},
"positiveInteger": {
"minimum": 0,
"type": "integer"
}
},
"required": [
"namespace"
],
"name": "namespace",
"properties": {
"description": {
"type": "string",
"description": "Provides a user friendly description "
"of the namespace.",
"maxLength": 500
},
"updated_at": {
"type": "string",
"description": "Date and time of the last namespace "
"modification (READ-ONLY)",
"format": "date-time"
},
"visibility": {
"enum": [
"public",
"private"
],
"type": "string",
"description": "Scope of namespace accessibility."
},
"self": {
"type": "string"
},
"objects": {
"items": {
"type": "object",
"properties": {
"properties": {
"$ref": "#/definitions/property"
},
"required": {
"$ref": "#/definitions/stringArray"
},
"name": {
"type": "string"
},
"description": {
"type": "string"
}
}
},
"type": "array"
},
"owner": {
"type": "string",
"description": "Owner of the namespace.",
"maxLength": 255
},
"resource_types": {
"items": {
"type": "object",
"properties": {
"prefix": {
"type": "string"
},
"name": {
"type": "string"
},
"metadata_type": {
"type": "string"
}
}
},
"type": "array"
},
"properties": {
"$ref": "#/definitions/property"
},
"display_name": {
"type": "string",
"description": "The user friendly name for the "
"namespace. Used by UI if available.",
"maxLength": 80
},
"created_at": {
"type": "string",
"description": "Date and time of namespace creation "
"(READ-ONLY)",
"format": "date-time"
},
"namespace": {
"type": "string",
"description": "The unique namespace text.",
"maxLength": 80
},
"protected": {
"type": "boolean",
"description": "If true, namespace will not be "
"deletable."
},
"schema": {
"type": "string"
}
}
}
),
}
}
class TestNamespaceController(testtools.TestCase):
def setUp(self):
super(TestNamespaceController, self).setUp()
self.api = utils.FakeAPI(data_fixtures)
self.schema_api = utils.FakeSchemaAPI(schema_fixtures)
self.controller = metadefs.NamespaceController(self.api,
self.schema_api)
def test_list_namespaces(self):
namespaces = list(self.controller.list())
self.assertEqual(2, len(namespaces))
self.assertEqual(NAMESPACE1, namespaces[0]['namespace'])
self.assertEqual(NAMESPACE2, namespaces[1]['namespace'])
def test_list_namespaces_paginate(self):
namespaces = list(self.controller.list(page_size=1))
self.assertEqual(2, len(namespaces))
self.assertEqual(NAMESPACE7, namespaces[0]['namespace'])
self.assertEqual(NAMESPACE8, namespaces[1]['namespace'])
def test_list_with_limit_greater_than_page_size(self):
namespaces = list(self.controller.list(page_size=1, limit=2))
self.assertEqual(2, len(namespaces))
self.assertEqual(NAMESPACE7, namespaces[0]['namespace'])
self.assertEqual(NAMESPACE8, namespaces[1]['namespace'])
def test_list_with_marker(self):
namespaces = list(self.controller.list(marker=NAMESPACE6, page_size=2))
self.assertEqual(2, len(namespaces))
self.assertEqual(NAMESPACE7, namespaces[0]['namespace'])
self.assertEqual(NAMESPACE8, namespaces[1]['namespace'])
def test_list_with_sort_dir(self):
namespaces = list(self.controller.list(sort_dir='asc', limit=1))
self.assertEqual(1, len(namespaces))
self.assertEqual(NAMESPACE1, namespaces[0]['namespace'])
def test_list_with_sort_dir_invalid(self):
# NOTE(TravT): The clients work by returning an iterator.
# Invoking the iterator is what actually executes the logic.
ns_iterator = self.controller.list(sort_dir='foo')
self.assertRaises(ValueError, next, ns_iterator)
def test_list_with_sort_key(self):
namespaces = list(self.controller.list(sort_key='created_at', limit=1))
self.assertEqual(1, len(namespaces))
self.assertEqual(NAMESPACE1, namespaces[0]['namespace'])
def test_list_with_sort_key_invalid(self):
# NOTE(TravT): The clients work by returning an iterator.
# Invoking the iterator is what actually executes the logic.
ns_iterator = self.controller.list(sort_key='foo')
self.assertRaises(ValueError, next, ns_iterator)
def test_list_namespaces_with_one_resource_type_filter(self):
namespaces = list(self.controller.list(
filters={
'resource_types': [RESOURCE_TYPE1]
}
))
self.assertEqual(1, len(namespaces))
self.assertEqual(NAMESPACE3, namespaces[0]['namespace'])
def test_list_namespaces_with_multiple_resource_types_filter(self):
namespaces = list(self.controller.list(
filters={
'resource_types': [RESOURCE_TYPE1, RESOURCE_TYPE2]
}
))
self.assertEqual(1, len(namespaces))
self.assertEqual(NAMESPACE4, namespaces[0]['namespace'])
def test_list_namespaces_with_visibility_filter(self):
namespaces = list(self.controller.list(
filters={
'visibility': 'private'
}
))
self.assertEqual(1, len(namespaces))
self.assertEqual(NAMESPACE5, namespaces[0]['namespace'])
def test_get_namespace(self):
namespace = self.controller.get(NAMESPACE1)
self.assertEqual(NAMESPACE1, namespace.namespace)
self.assertTrue(namespace.protected)
def test_get_namespace_with_resource_type(self):
namespace = self.controller.get(NAMESPACE6,
resource_type=RESOURCE_TYPE1)
self.assertEqual(NAMESPACE6, namespace.namespace)
self.assertTrue(namespace.protected)
def test_create_namespace(self):
properties = {
'namespace': NAMESPACENEW
}
namespace = self.controller.create(**properties)
self.assertEqual(NAMESPACENEW, namespace.namespace)
self.assertTrue(namespace.protected)
def test_create_namespace_invalid_data(self):
properties = {}
self.assertRaises(TypeError, self.controller.create, **properties)
def test_create_namespace_invalid_property(self):
properties = {'namespace': 'NewNamespace', 'protected': '123'}
self.assertRaises(TypeError, self.controller.create, **properties)
def test_update_namespace(self):
properties = {'display_name': 'My Updated Name'}
namespace = self.controller.update(NAMESPACE1, **properties)
self.assertEqual(NAMESPACE1, namespace.namespace)
def test_update_namespace_invalid_property(self):
properties = {'protected': '123'}
self.assertRaises(TypeError, self.controller.update, NAMESPACE1,
**properties)
def test_delete_namespace(self):
self.controller.delete(NAMESPACE1)
expect = [
('DELETE',
'/v2/metadefs/namespaces/%s' % NAMESPACE1,
{},
None)]
self.assertEqual(expect, self.api.calls)

View File

@ -1,324 +0,0 @@
# Copyright 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
import testtools
from glanceclient.v2 import metadefs
from tests import utils
NAMESPACE1 = 'Namespace1'
OBJECT1 = 'Object1'
OBJECT2 = 'Object2'
OBJECTNEW = 'ObjectNew'
PROPERTY1 = 'Property1'
PROPERTY2 = 'Property2'
PROPERTY3 = 'Property3'
PROPERTY4 = 'Property4'
def _get_object_fixture(ns_name, obj_name, **kwargs):
obj = {
"description": "DESCRIPTION",
"name": obj_name,
"self": "/v2/metadefs/namespaces/%s/objects/%s" %
(ns_name, obj_name),
"required": [],
"properties": {
PROPERTY1: {
"type": "integer",
"description": "DESCRIPTION",
"title": "Quota: CPU Shares"
},
PROPERTY2: {
"minimum": 1000,
"type": "integer",
"description": "DESCRIPTION",
"maximum": 1000000,
"title": "Quota: CPU Period"
}},
"schema": "/v2/schemas/metadefs/object",
"created_at": "2014-08-14T09:07:06Z",
"updated_at": "2014-08-14T09:07:06Z",
}
obj.update(kwargs)
return obj
data_fixtures = {
"/v2/metadefs/namespaces/%s/objects" % NAMESPACE1: {
"GET": (
{},
{
"objects": [
_get_object_fixture(NAMESPACE1, OBJECT1),
_get_object_fixture(NAMESPACE1, OBJECT2)
],
"schema": "v2/schemas/metadefs/objects"
}
),
"POST": (
{},
_get_object_fixture(NAMESPACE1, OBJECTNEW)
),
"DELETE": (
{},
{}
)
},
"/v2/metadefs/namespaces/%s/objects/%s" % (NAMESPACE1, OBJECT1): {
"GET": (
{},
_get_object_fixture(NAMESPACE1, OBJECT1)
),
"PUT": (
{},
_get_object_fixture(NAMESPACE1, OBJECT1)
),
"DELETE": (
{},
{}
)
}
}
schema_fixtures = {
"metadefs/object": {
"GET": (
{},
{
"additionalProperties": False,
"definitions": {
"property": {
"additionalProperties": {
"required": [
"title",
"type"
],
"type": "object",
"properties": {
"additionalItems": {
"type": "boolean"
},
"enum": {
"type": "array"
},
"description": {
"type": "string"
},
"title": {
"type": "string"
},
"default": {},
"minLength": {
"$ref": "#/definitions/positiveInteger"
"Default0"
},
"required": {
"$ref": "#/definitions/stringArray"
},
"maximum": {
"type": "number"
},
"minItems": {
"$ref": "#/definitions/positiveInteger"
"Default0"
},
"readonly": {
"type": "boolean"
},
"minimum": {
"type": "number"
},
"maxItems": {
"$ref": "#/definitions/positiveInteger"
},
"maxLength": {
"$ref": "#/definitions/positiveInteger"
},
"uniqueItems": {
"default": False,
"type": "boolean"
},
"pattern": {
"type": "string",
"format": "regex"
},
"items": {
"type": "object",
"properties": {
"enum": {
"type": "array"
},
"type": {
"enum": [
"array",
"boolean",
"integer",
"number",
"object",
"string",
"null"
],
"type": "string"
}
}
},
"type": {
"enum": [
"array",
"boolean",
"integer",
"number",
"object",
"string",
"null"
],
"type": "string"
}
}
},
"type": "object"
},
"positiveIntegerDefault0": {
"allOf": [
{
"$ref": "#/definitions/positiveInteger"
},
{
"default": 0
}
]
},
"stringArray": {
"uniqueItems": True,
"items": {
"type": "string"
},
"type": "array"
},
"positiveInteger": {
"minimum": 0,
"type": "integer"
}
},
"required": [
"name"
],
"name": "object",
"properties": {
"created_at": {
"type": "string",
"description": "Date and time of object creation "
"(READ-ONLY)",
"format": "date-time"
},
"description": {
"type": "string"
},
"name": {
"type": "string"
},
"self": {
"type": "string"
},
"required": {
"$ref": "#/definitions/stringArray"
},
"properties": {
"$ref": "#/definitions/property"
},
"schema": {
"type": "string"
},
"updated_at": {
"type": "string",
"description": "Date and time of the last object "
"modification (READ-ONLY)",
"format": "date-time"
},
}
}
)
}
}
class TestObjectController(testtools.TestCase):
def setUp(self):
super(TestObjectController, self).setUp()
self.api = utils.FakeAPI(data_fixtures)
self.schema_api = utils.FakeSchemaAPI(schema_fixtures)
self.controller = metadefs.ObjectController(self.api, self.schema_api)
def test_list_object(self):
objects = list(self.controller.list(NAMESPACE1))
actual = [obj.name for obj in objects]
self.assertEqual([OBJECT1, OBJECT2], actual)
def test_get_object(self):
obj = self.controller.get(NAMESPACE1, OBJECT1)
self.assertEqual(OBJECT1, obj.name)
self.assertEqual(sorted([PROPERTY1, PROPERTY2]),
sorted(list(six.iterkeys(obj.properties))))
def test_create_object(self):
properties = {
'name': OBJECTNEW,
'description': 'DESCRIPTION'
}
obj = self.controller.create(NAMESPACE1, **properties)
self.assertEqual(OBJECTNEW, obj.name)
def test_create_object_invalid_property(self):
properties = {
'namespace': NAMESPACE1
}
self.assertRaises(TypeError, self.controller.create, **properties)
def test_update_object(self):
properties = {
'description': 'UPDATED_DESCRIPTION'
}
obj = self.controller.update(NAMESPACE1, OBJECT1, **properties)
self.assertEqual(OBJECT1, obj.name)
def test_update_object_invalid_property(self):
properties = {
'required': 'INVALID'
}
self.assertRaises(TypeError, self.controller.update, NAMESPACE1,
OBJECT1, **properties)
def test_delete_object(self):
self.controller.delete(NAMESPACE1, OBJECT1)
expect = [
('DELETE',
'/v2/metadefs/namespaces/%s/objects/%s' % (NAMESPACE1, OBJECT1),
{},
None)]
self.assertEqual(expect, self.api.calls)
def test_delete_all_objects(self):
self.controller.delete_all(NAMESPACE1)
expect = [
('DELETE',
'/v2/metadefs/namespaces/%s/objects' % NAMESPACE1,
{},
None)]
self.assertEqual(expect, self.api.calls)

View File

@ -1,301 +0,0 @@
# Copyright 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from glanceclient.v2 import metadefs
from tests import utils
NAMESPACE1 = 'Namespace1'
PROPERTY1 = 'Property1'
PROPERTY2 = 'Property2'
PROPERTYNEW = 'PropertyNew'
data_fixtures = {
"/v2/metadefs/namespaces/%s/properties" % NAMESPACE1: {
"GET": (
{},
{
"properties": {
PROPERTY1: {
"default": "1",
"type": "integer",
"description": "Number of cores.",
"title": "cores"
},
PROPERTY2: {
"items": {
"enum": [
"Intel",
"AMD"
],
"type": "string"
},
"type": "array",
"description": "Specifies the CPU manufacturer.",
"title": "Vendor"
},
}
}
),
"POST": (
{},
{
"items": {
"enum": [
"Intel",
"AMD"
],
"type": "string"
},
"type": "array",
"description": "UPDATED_DESCRIPTION",
"title": "Vendor",
"name": PROPERTYNEW
}
),
"DELETE": (
{},
{}
)
},
"/v2/metadefs/namespaces/%s/properties/%s" % (NAMESPACE1, PROPERTY1): {
"GET": (
{},
{
"items": {
"enum": [
"Intel",
"AMD"
],
"type": "string"
},
"type": "array",
"description": "Specifies the CPU manufacturer.",
"title": "Vendor"
}
),
"PUT": (
{},
{
"items": {
"enum": [
"Intel",
"AMD"
],
"type": "string"
},
"type": "array",
"description": "UPDATED_DESCRIPTION",
"title": "Vendor"
}
),
"DELETE": (
{},
{}
)
}
}
schema_fixtures = {
"metadefs/property": {
"GET": (
{},
{
"additionalProperties": False,
"definitions": {
"positiveIntegerDefault0": {
"allOf": [
{
"$ref": "#/definitions/positiveInteger"
},
{
"default": 0
}
]
},
"stringArray": {
"minItems": 1,
"items": {
"type": "string"
},
"uniqueItems": True,
"type": "array"
},
"positiveInteger": {
"minimum": 0,
"type": "integer"
}
},
"required": [
"name",
"title",
"type"
],
"name": "property",
"properties": {
"description": {
"type": "string"
},
"minLength": {
"$ref": "#/definitions/positiveIntegerDefault0"
},
"enum": {
"type": "array"
},
"minimum": {
"type": "number"
},
"maxItems": {
"$ref": "#/definitions/positiveInteger"
},
"maxLength": {
"$ref": "#/definitions/positiveInteger"
},
"uniqueItems": {
"default": False,
"type": "boolean"
},
"additionalItems": {
"type": "boolean"
},
"name": {
"type": "string"
},
"title": {
"type": "string"
},
"default": {},
"pattern": {
"type": "string",
"format": "regex"
},
"required": {
"$ref": "#/definitions/stringArray"
},
"maximum": {
"type": "number"
},
"minItems": {
"$ref": "#/definitions/positiveIntegerDefault0"
},
"readonly": {
"type": "boolean"
},
"items": {
"type": "object",
"properties": {
"enum": {
"type": "array"
},
"type": {
"enum": [
"array",
"boolean",
"integer",
"number",
"object",
"string",
"null"
],
"type": "string"
}
}
},
"type": {
"enum": [
"array",
"boolean",
"integer",
"number",
"object",
"string",
"null"
],
"type": "string"
}
}
}
)
}
}
class TestPropertyController(testtools.TestCase):
def setUp(self):
super(TestPropertyController, self).setUp()
self.api = utils.FakeAPI(data_fixtures)
self.schema_api = utils.FakeSchemaAPI(schema_fixtures)
self.controller = metadefs.PropertyController(self.api,
self.schema_api)
def test_list_property(self):
properties = list(self.controller.list(NAMESPACE1))
actual = [prop.name for prop in properties]
self.assertEqual(sorted([PROPERTY1, PROPERTY2]), sorted(actual))
def test_get_property(self):
prop = self.controller.get(NAMESPACE1, PROPERTY1)
self.assertEqual(PROPERTY1, prop.name)
def test_create_property(self):
properties = {
'name': PROPERTYNEW,
'title': 'TITLE',
'type': 'string'
}
obj = self.controller.create(NAMESPACE1, **properties)
self.assertEqual(PROPERTYNEW, obj.name)
def test_create_property_invalid_property(self):
properties = {
'namespace': NAMESPACE1
}
self.assertRaises(TypeError, self.controller.create, **properties)
def test_update_property(self):
properties = {
'description': 'UPDATED_DESCRIPTION'
}
prop = self.controller.update(NAMESPACE1, PROPERTY1, **properties)
self.assertEqual(PROPERTY1, prop.name)
def test_update_property_invalid_property(self):
properties = {
'type': 'INVALID'
}
self.assertRaises(TypeError, self.controller.update, NAMESPACE1,
PROPERTY1, **properties)
def test_delete_property(self):
self.controller.delete(NAMESPACE1, PROPERTY1)
expect = [
('DELETE',
'/v2/metadefs/namespaces/%s/properties/%s' % (NAMESPACE1,
PROPERTY1),
{},
None)]
self.assertEqual(expect, self.api.calls)
def test_delete_all_properties(self):
self.controller.delete_all(NAMESPACE1)
expect = [
('DELETE',
'/v2/metadefs/namespaces/%s/properties' % NAMESPACE1,
{},
None)]
self.assertEqual(expect, self.api.calls)

View File

@ -1,187 +0,0 @@
# Copyright 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from glanceclient.v2 import metadefs
from tests import utils
NAMESPACE1 = 'Namespace1'
RESOURCE_TYPE1 = 'ResourceType1'
RESOURCE_TYPE2 = 'ResourceType2'
RESOURCE_TYPE3 = 'ResourceType3'
RESOURCE_TYPE4 = 'ResourceType4'
RESOURCE_TYPENEW = 'ResourceTypeNew'
data_fixtures = {
"/v2/metadefs/namespaces/%s/resource_types" % NAMESPACE1: {
"GET": (
{},
{
"resource_type_associations": [
{
"name": RESOURCE_TYPE3,
"created_at": "2014-08-14T09:07:06Z",
"updated_at": "2014-08-14T09:07:06Z",
},
{
"name": RESOURCE_TYPE4,
"prefix": "PREFIX:",
"created_at": "2014-08-14T09:07:06Z",
"updated_at": "2014-08-14T09:07:06Z",
}
]
}
),
"POST": (
{},
{
"name": RESOURCE_TYPENEW,
"prefix": "PREFIX:",
"created_at": "2014-08-14T09:07:06Z",
"updated_at": "2014-08-14T09:07:06Z",
}
),
},
"/v2/metadefs/namespaces/%s/resource_types/%s" % (NAMESPACE1,
RESOURCE_TYPE1):
{
"DELETE": (
{},
{}
),
},
"/v2/metadefs/resource_types": {
"GET": (
{},
{
"resource_types": [
{
"name": RESOURCE_TYPE1,
"created_at": "2014-08-14T09:07:06Z",
"updated_at": "2014-08-14T09:07:06Z",
},
{
"name": RESOURCE_TYPE2,
"created_at": "2014-08-14T09:07:06Z",
"updated_at": "2014-08-14T09:07:06Z",
}
]
}
)
}
}
schema_fixtures = {
"metadefs/resource_type": {
"GET": (
{},
{
"name": "resource_type",
"properties": {
"prefix": {
"type": "string",
"description": "Specifies the prefix to use for the "
"given resource type. Any properties "
"in the namespace should be prefixed "
"with this prefix when being applied "
"to the specified resource type. Must "
"include prefix separator (e.g. a "
"colon :).",
"maxLength": 80
},
"properties_target": {
"type": "string",
"description": "Some resource types allow more than "
"one key / value pair per instance. "
"For example, Cinder allows user and "
"image metadata on volumes. Only the "
"image properties metadata is "
"evaluated by Nova (scheduling or "
"drivers). This property allows a "
"namespace target to remove the "
"ambiguity.",
"maxLength": 80
},
"name": {
"type": "string",
"description": "Resource type names should be "
"aligned with Heat resource types "
"whenever possible: http://docs."
"openstack.org/developer/heat/"
"template_guide/openstack.html",
"maxLength": 80
},
"created_at": {
"type": "string",
"description": "Date and time of resource type "
"association (READ-ONLY)",
"format": "date-time"
},
"updated_at": {
"type": "string",
"description": "Date and time of the last resource "
"type association modification "
"(READ-ONLY)",
"format": "date-time"
},
}
}
)
}
}
class TestResoureTypeController(testtools.TestCase):
def setUp(self):
super(TestResoureTypeController, self).setUp()
self.api = utils.FakeAPI(data_fixtures)
self.schema_api = utils.FakeSchemaAPI(schema_fixtures)
self.controller = metadefs.ResourceTypeController(self.api,
self.schema_api)
def test_list_resource_types(self):
resource_types = list(self.controller.list())
names = [rt.name for rt in resource_types]
self.assertEqual([RESOURCE_TYPE1, RESOURCE_TYPE2], names)
def test_get_resource_types(self):
resource_types = list(self.controller.get(NAMESPACE1))
names = [rt.name for rt in resource_types]
self.assertEqual([RESOURCE_TYPE3, RESOURCE_TYPE4], names)
def test_associate_resource_types(self):
resource_types = self.controller.associate(NAMESPACE1,
name=RESOURCE_TYPENEW)
self.assertEqual(RESOURCE_TYPENEW, resource_types['name'])
def test_associate_resource_types_invalid_property(self):
longer = '1234' * 50
properties = {'name': RESOURCE_TYPENEW, 'prefix': longer}
self.assertRaises(TypeError, self.controller.associate, NAMESPACE1,
**properties)
def test_deassociate_resource_types(self):
self.controller.deassociate(NAMESPACE1, RESOURCE_TYPE1)
expect = [
('DELETE',
'/v2/metadefs/namespaces/%s/resource_types/%s' % (NAMESPACE1,
RESOURCE_TYPE1),
{},
None)]
self.assertEqual(expect, self.api.calls)

View File

@ -1,217 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from jsonpatch import JsonPatch
import testtools
import warlock
from glanceclient.v2 import schemas
from tests import utils
fixtures = {
'/v2/schemas': {
'GET': (
{},
{
'image': '/v2/schemas/image',
'access': '/v2/schemas/image/access',
},
),
},
'/v2/schemas/image': {
'GET': (
{},
{
'name': 'image',
'properties': {
'name': {'type': 'string',
'description': 'Name of image'},
'tags': {'type': 'array'}
},
},
),
},
}
_SCHEMA = schemas.Schema({
'name': 'image',
'properties': {
'name': {'type': 'string'},
'color': {'type': 'string'},
'shape': {'type': 'string', 'is_base': False},
'tags': {'type': 'array'}
},
})
def compare_json_patches(a, b):
"""Return 0 if a and b describe the same JSON patch."""
return JsonPatch.from_string(a) == JsonPatch.from_string(b)
class TestSchemaProperty(testtools.TestCase):
def test_property_minimum(self):
prop = schemas.SchemaProperty('size')
self.assertEqual('size', prop.name)
def test_property_description(self):
prop = schemas.SchemaProperty('size', description='some quantity')
self.assertEqual('size', prop.name)
self.assertEqual('some quantity', prop.description)
class TestSchema(testtools.TestCase):
def test_schema_minimum(self):
raw_schema = {'name': 'Country', 'properties': {}}
schema = schemas.Schema(raw_schema)
self.assertEqual('Country', schema.name)
self.assertEqual([], schema.properties)
def test_schema_with_property(self):
raw_schema = {'name': 'Country', 'properties': {'size': {}}}
schema = schemas.Schema(raw_schema)
self.assertEqual('Country', schema.name)
self.assertEqual(['size'], [p.name for p in schema.properties])
def test_raw(self):
raw_schema = {'name': 'Country', 'properties': {}}
schema = schemas.Schema(raw_schema)
self.assertEqual(raw_schema, schema.raw())
class TestController(testtools.TestCase):
def setUp(self):
super(TestController, self).setUp()
self.api = utils.FakeAPI(fixtures)
self.controller = schemas.Controller(self.api)
def test_get_schema(self):
schema = self.controller.get('image')
self.assertEqual('image', schema.name)
self.assertEqual(['name', 'tags'],
[p.name for p in schema.properties])
class TestSchemaBasedModel(testtools.TestCase):
def setUp(self):
super(TestSchemaBasedModel, self).setUp()
self.model = warlock.model_factory(_SCHEMA.raw(),
schemas.SchemaBasedModel)
def test_patch_should_replace_missing_core_properties(self):
obj = {
'name': 'fred'
}
original = self.model(obj)
original['color'] = 'red'
patch = original.patch
expected = '[{"path": "/color", "value": "red", "op": "replace"}]'
self.assertTrue(compare_json_patches(patch, expected))
def test_patch_should_add_extra_properties(self):
obj = {
'name': 'fred',
}
original = self.model(obj)
original['weight'] = '10'
patch = original.patch
expected = '[{"path": "/weight", "value": "10", "op": "add"}]'
self.assertTrue(compare_json_patches(patch, expected))
def test_patch_should_replace_extra_properties(self):
obj = {
'name': 'fred',
'weight': '10'
}
original = self.model(obj)
original['weight'] = '22'
patch = original.patch
expected = '[{"path": "/weight", "value": "22", "op": "replace"}]'
self.assertTrue(compare_json_patches(patch, expected))
def test_patch_should_remove_extra_properties(self):
obj = {
'name': 'fred',
'weight': '10'
}
original = self.model(obj)
del original['weight']
patch = original.patch
expected = '[{"path": "/weight", "op": "remove"}]'
self.assertTrue(compare_json_patches(patch, expected))
def test_patch_should_remove_core_properties(self):
obj = {
'name': 'fred',
'color': 'red'
}
original = self.model(obj)
del original['color']
patch = original.patch
expected = '[{"path": "/color", "op": "remove"}]'
self.assertTrue(compare_json_patches(patch, expected))
def test_patch_should_add_missing_custom_properties(self):
obj = {
'name': 'fred'
}
original = self.model(obj)
original['shape'] = 'circle'
patch = original.patch
expected = '[{"path": "/shape", "value": "circle", "op": "add"}]'
self.assertTrue(compare_json_patches(patch, expected))
def test_patch_should_replace_custom_properties(self):
obj = {
'name': 'fred',
'shape': 'circle'
}
original = self.model(obj)
original['shape'] = 'square'
patch = original.patch
expected = '[{"path": "/shape", "value": "square", "op": "replace"}]'
self.assertTrue(compare_json_patches(patch, expected))
def test_patch_should_replace_tags(self):
obj = {'name': 'fred', }
original = self.model(obj)
original['tags'] = ['tag1', 'tag2']
patch = original.patch
expected = '[{"path": "/tags", "value": ["tag1", "tag2"], ' \
'"op": "replace"}]'
self.assertTrue(compare_json_patches(patch, expected))

File diff suppressed because it is too large Load Diff

View File

@ -1,82 +0,0 @@
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from glanceclient.v2 import image_tags
from tests import utils
IMAGE = '3a4560a1-e585-443e-9b39-553b46ec92d1'
TAG = 'tag01'
data_fixtures = {
'/v2/images/{image}/tags/{tag_value}'.format(image=IMAGE, tag_value=TAG): {
'DELETE': (
{},
None,
),
'PUT': (
{},
{
'image_id': IMAGE,
'tag_value': TAG
}
),
}
}
schema_fixtures = {
'tag': {
'GET': (
{},
{'name': 'image', 'properties': {'image_id': {}, 'tags': {}}}
)
}
}
class TestController(testtools.TestCase):
def setUp(self):
super(TestController, self).setUp()
self.api = utils.FakeAPI(data_fixtures)
self.schema_api = utils.FakeSchemaAPI(schema_fixtures)
self.controller = image_tags.Controller(self.api, self.schema_api)
def test_update_image_tag(self):
image_id = IMAGE
tag_value = TAG
self.controller.update(image_id, tag_value)
expect = [
('PUT',
'/v2/images/{image}/tags/{tag_value}'.format(image=IMAGE,
tag_value=TAG),
{},
None)]
self.assertEqual(expect, self.api.calls)
def test_delete_image_tag(self):
image_id = IMAGE
tag_value = TAG
self.controller.delete(image_id, tag_value)
expect = [
('DELETE',
'/v2/images/{image}/tags/{tag_value}'.format(image=IMAGE,
tag_value=TAG),
{},
None)]
self.assertEqual(expect, self.api.calls)

View File

@ -1,278 +0,0 @@
# Copyright 2013 OpenStack Foundation.
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from glanceclient.v2 import tasks
from tests import utils
_OWNED_TASK_ID = 'a4963502-acc7-42ba-ad60-5aa0962b7faf'
_OWNER_ID = '6bd473f0-79ae-40ad-a927-e07ec37b642f'
_FAKE_OWNER_ID = '63e7f218-29de-4477-abdc-8db7c9533188'
fixtures = {
'/v2/tasks?limit=%d' % tasks.DEFAULT_PAGE_SIZE: {
'GET': (
{},
{'tasks': [
{
'id': '3a4560a1-e585-443e-9b39-553b46ec92d1',
'type': 'import',
'status': 'pending',
},
{
'id': '6f99bf80-2ee6-47cf-acfe-1f1fabb7e810',
'type': 'import',
'status': 'processing',
},
]},
),
},
'/v2/tasks?limit=1': {
'GET': (
{},
{
'tasks': [
{
'id': '3a4560a1-e585-443e-9b39-553b46ec92d1',
'type': 'import',
'status': 'pending',
},
],
'next': ('/v2/tasks?limit=1&'
'marker=3a4560a1-e585-443e-9b39-553b46ec92d1'),
},
),
},
('/v2/tasks?limit=1&marker=3a4560a1-e585-443e-9b39-553b46ec92d1'): {
'GET': (
{},
{'tasks': [
{
'id': '6f99bf80-2ee6-47cf-acfe-1f1fabb7e810',
'type': 'import',
'status': 'pending',
},
]},
),
},
'/v2/tasks/3a4560a1-e585-443e-9b39-553b46ec92d1': {
'GET': (
{},
{
'id': '3a4560a1-e585-443e-9b39-553b46ec92d1',
'type': 'import',
'status': 'pending',
},
),
'PATCH': (
{},
'',
),
},
'/v2/tasks/e7e59ff6-fa2e-4075-87d3-1a1398a07dc3': {
'GET': (
{},
{
'id': 'e7e59ff6-fa2e-4075-87d3-1a1398a07dc3',
'type': 'import',
'status': 'pending',
},
),
'PATCH': (
{},
'',
),
},
'/v2/tasks': {
'POST': (
{},
{
'id': '3a4560a1-e585-443e-9b39-553b46ec92d1',
'type': 'import',
'status': 'pending',
'input': '{"import_from": "file:///", '
'"import_from_format": "qcow2"}'
},
),
},
'/v2/tasks?limit=%d&owner=%s' % (tasks.DEFAULT_PAGE_SIZE, _OWNER_ID): {
'GET': (
{},
{'tasks': [
{
'id': _OWNED_TASK_ID,
},
]},
),
},
'/v2/tasks?limit=%d&status=processing' % (tasks.DEFAULT_PAGE_SIZE): {
'GET': (
{},
{'tasks': [
{
'id': _OWNED_TASK_ID,
},
]},
),
},
'/v2/tasks?limit=%d&type=import' % (tasks.DEFAULT_PAGE_SIZE): {
'GET': (
{},
{'tasks': [
{
'id': _OWNED_TASK_ID,
},
]},
),
},
'/v2/tasks?limit=%d&type=fake' % (tasks.DEFAULT_PAGE_SIZE): {
'GET': (
{},
{'tasks': [
]},
),
},
'/v2/tasks?limit=%d&status=fake' % (tasks.DEFAULT_PAGE_SIZE): {
'GET': (
{},
{'tasks': [
]},
),
},
'/v2/tasks?limit=%d&type=import' % (tasks.DEFAULT_PAGE_SIZE): {
'GET': (
{},
{'tasks': [
{
'id': _OWNED_TASK_ID,
},
]},
),
},
'/v2/tasks?limit=%d&owner=%s' % (tasks.DEFAULT_PAGE_SIZE, _FAKE_OWNER_ID):
{
'GET': ({},
{'tasks': []},
),
}
}
schema_fixtures = {
'task': {
'GET': (
{},
{
'name': 'task',
'properties': {
'id': {},
'type': {},
'status': {},
'input': {},
'result': {},
'message': {},
},
}
)
}
}
class TestController(testtools.TestCase):
def setUp(self):
super(TestController, self).setUp()
self.api = utils.FakeAPI(fixtures)
self.schema_api = utils.FakeSchemaAPI(schema_fixtures)
self.controller = tasks.Controller(self.api, self.schema_api)
def test_list_tasks(self):
# NOTE(flwang): cast to list since the controller returns a generator
tasks = list(self.controller.list())
self.assertEqual(tasks[0].id, '3a4560a1-e585-443e-9b39-553b46ec92d1')
self.assertEqual(tasks[0].type, 'import')
self.assertEqual(tasks[0].status, 'pending')
self.assertEqual(tasks[1].id, '6f99bf80-2ee6-47cf-acfe-1f1fabb7e810')
self.assertEqual(tasks[1].type, 'import')
self.assertEqual(tasks[1].status, 'processing')
def test_list_tasks_paginated(self):
# NOTE(flwang): cast to list since the controller returns a generator
tasks = list(self.controller.list(page_size=1))
self.assertEqual(tasks[0].id, '3a4560a1-e585-443e-9b39-553b46ec92d1')
self.assertEqual(tasks[0].type, 'import')
self.assertEqual(tasks[1].id, '6f99bf80-2ee6-47cf-acfe-1f1fabb7e810')
self.assertEqual(tasks[1].type, 'import')
def test_list_tasks_with_status(self):
filters = {'filters': {'status': 'processing'}}
tasks = list(self.controller.list(**filters))
self.assertEqual(tasks[0].id, _OWNED_TASK_ID)
def test_list_tasks_with_wrong_status(self):
filters = {'filters': {'status': 'fake'}}
tasks = list(self.controller.list(**filters))
self.assertEqual(len(tasks), 0)
def test_list_tasks_with_type(self):
filters = {'filters': {'type': 'import'}}
tasks = list(self.controller.list(**filters))
self.assertEqual(tasks[0].id, _OWNED_TASK_ID)
def test_list_tasks_with_wrong_type(self):
filters = {'filters': {'type': 'fake'}}
tasks = list(self.controller.list(**filters))
self.assertEqual(len(tasks), 0)
def test_list_tasks_for_owner(self):
filters = {'filters': {'owner': _OWNER_ID}}
tasks = list(self.controller.list(**filters))
self.assertEqual(tasks[0].id, _OWNED_TASK_ID)
def test_list_tasks_for_fake_owner(self):
filters = {'filters': {'owner': _FAKE_OWNER_ID}}
tasks = list(self.controller.list(**filters))
self.assertEqual(tasks, [])
def test_list_tasks_filters_encoding(self):
filters = {"owner": u"ni\xf1o"}
try:
list(self.controller.list(filters=filters))
except KeyError:
# NOTE(flaper87): It raises KeyError because there's
# no fixture supporting this query:
# /v2/tasks?owner=ni%C3%B1o&limit=20
# We just want to make sure filters are correctly encoded.
pass
self.assertEqual(b"ni\xc3\xb1o", filters["owner"])
def test_get_task(self):
task = self.controller.get('3a4560a1-e585-443e-9b39-553b46ec92d1')
self.assertEqual(task.id, '3a4560a1-e585-443e-9b39-553b46ec92d1')
self.assertEqual(task.type, 'import')
def test_create_task(self):
properties = {
'type': 'import',
'input': {'import_from_format': 'ovf', 'import_from':
'swift://cloud.foo/myaccount/mycontainer/path'},
}
task = self.controller.create(**properties)
self.assertEqual(task.id, '3a4560a1-e585-443e-9b39-553b46ec92d1')
self.assertEqual(task.type, 'import')

View File

@ -1,25 +0,0 @@
_glance_opts="" # lazy init
_glance_flags="" # lazy init
_glance_opts_exp="" # lazy init
_glance()
{
local cur prev nbc cflags
COMPREPLY=()
cur="${COMP_WORDS[COMP_CWORD]}"
prev="${COMP_WORDS[COMP_CWORD-1]}"
if [ "x$_glance_opts" == "x" ] ; then
nbc="`glance bash-completion | sed -e "s/ *-h */ /" -e "s/ *-i */ /"`"
_glance_opts="`echo "$nbc" | sed -e "s/--[a-z0-9_-]*//g" -e "s/ */ /g"`"
_glance_flags="`echo " $nbc" | sed -e "s/ [^-][^-][a-z0-9_-]*//g" -e "s/ */ /g"`"
_glance_opts_exp="`echo "$_glance_opts" | sed 's/^ *//' | tr ' ' '|'`"
fi
if [[ " ${COMP_WORDS[@]} " =~ " "($_glance_opts_exp)" " && "$prev" != "help" ]] ; then
COMPREPLY=($(compgen -W "${_glance_flags}" -- ${cur}))
else
COMPREPLY=($(compgen -W "${_glance_opts}" -- ${cur}))
fi
return 0
}
complete -F _glance glance

View File

@ -1,10 +0,0 @@
#!/bin/bash
command -v tox > /dev/null 2>&1
if [ $? -ne 0 ]; then
echo 'This script requires "tox" to run.'
echo 'You can install it with "pip install tox".'
exit 1;
fi
tox -evenv -- $@