Merge pull request #3 from jistr/wip_apiclient2

API client skeleton variant 2 (taken from ceilometerclient)
This commit is contained in:
Jiří Stránský 2013-07-08 08:40:43 -07:00
commit 306f6aa52d
14 changed files with 1026 additions and 0 deletions

View File

@ -1,6 +1,7 @@
# The list of modules to copy from openstack-common
# The base module to hold the copy of openstack.common

View File

@ -1,7 +1,9 @@

tuskarclient/ Normal file
View File

@ -0,0 +1,96 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tuskarclient.common import utils
from keystoneclient.v2_0 import client as ksclient
def _get_ksclient(**kwargs):
"""Get an endpoint and auth token from Keystone.
:param kwargs: keyword args containing credentials:
* username: name of user
* password: user's password
* auth_url: endpoint to authenticate against
* insecure: allow insecure SSL (no cert verification)
* tenant_{name|id}: name or ID of tenant
return ksclient.Client(username=kwargs.get('username'),
def _get_endpoint(client, **kwargs):
"""Get an endpoint using the provided keystone client."""
return client.service_catalog.url_for(
service_type=kwargs.get('service_type') or 'metering',
endpoint_type=kwargs.get('endpoint_type') or 'publicURL')
def get_client(api_version, **kwargs):
"""Get an authtenticated client, based on the credentials
in the keyword args.
:param api_version: the API version to use ('1' or '2')
:param kwargs: keyword args containing credentials, either:
* os_auth_token: pre-existing token to re-use
* tuskar_url: tuskar API endpoint
* os_username: name of user
* os_password: user's password
* os_auth_url: endpoint to authenticate against
* insecure: allow insecure SSL (no cert verification)
* os_tenant_{name|id}: name or ID of tenant
if kwargs.get('os_auth_token') and kwargs.get('tuskar_url'):
token = kwargs.get('os_auth_token')
endpoint = kwargs.get('tuskar_url')
elif (kwargs.get('os_username') and
kwargs.get('os_password') and
kwargs.get('os_auth_url') and
(kwargs.get('os_tenant_id') or kwargs.get('os_tenant_name'))):
ks_kwargs = {
'username': kwargs.get('os_username'),
'password': kwargs.get('os_password'),
'tenant_id': kwargs.get('os_tenant_id'),
'tenant_name': kwargs.get('os_tenant_name'),
'auth_url': kwargs.get('os_auth_url'),
'service_type': kwargs.get('os_service_type'),
'endpoint_type': kwargs.get('os_endpoint_type'),
'insecure': kwargs.get('insecure'),
_ksclient = _get_ksclient(**ks_kwargs)
token = kwargs.get('os_auth_token') or _ksclient.auth_token
endpoint = kwargs.get('tuskar_url') or \
_get_endpoint(_ksclient, **ks_kwargs)
cli_kwargs = {
'token': token,
'insecure': kwargs.get('insecure'),
'timeout': kwargs.get('timeout'),
'ca_file': kwargs.get('ca_file'),
'cert_file': kwargs.get('cert_file'),
'key_file': kwargs.get('key_file'),
return Client(api_version, endpoint, **cli_kwargs)
def Client(version, *args, **kwargs):
module = utils.import_versioned_module(version, 'client')
client_class = getattr(module, 'Client')
return client_class(*args, **kwargs)

View File

tuskarclient/common/ Normal file
View File

@ -0,0 +1,142 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
Base utilities to build API operation managers and objects on top of.
import copy
# Python 2.4 compat
except NameError:
def all(iterable):
return True not in (not x for x in iterable)
def getid(obj):
"""Abstracts the common pattern of allowing both an object or an
object's ID (UUID) as a parameter when dealing with relationships.
except AttributeError:
return obj
class Manager(object):
"""Managers interact with a particular type of API
(samples, meters, alarms, etc.) and provide CRUD operations for them.
resource_class = None
def __init__(self, api):
self.api = api
def _create(self, url, body):
resp, body = self.api.json_request('POST', url, body=body)
if body:
return self.resource_class(self, body)
def _list(self, url, response_key=None, obj_class=None, body=None,
resp, body = self.api.json_request('GET', url)
if obj_class is None:
obj_class = self.resource_class
if response_key:
data = body[response_key]
except KeyError:
return []
data = body
if expect_single:
data = [data]
return [obj_class(self, res, loaded=True) for res in data if res]
def _update(self, url, body, response_key=None):
resp, body = self.api.json_request('PUT', url, body=body)
# PUT requests may not return a body
if body:
return self.resource_class(self, body)
def _delete(self, url):
self.api.raw_request('DELETE', url)
class Resource(object):
"""A resource represents a particular instance of an object (tenant, user,
etc). This is pretty much just a bag for attributes.
:param manager: Manager object
:param info: dictionary representing resource attributes
:param loaded: prevent lazy-loading if set to True
def __init__(self, manager, info, loaded=False):
self.manager = manager
self._info = info
self._loaded = loaded
def _add_details(self, info):
for (k, v) in info.iteritems():
setattr(self, k, v)
def __getattr__(self, k):
if k not in self.__dict__:
# NOTE(bcwaldon): disallow lazy-loading if already loaded once
if not self.is_loaded():
return self.__getattr__(k)
raise AttributeError(k)
return self.__dict__[k]
def __repr__(self):
reprkeys = sorted(k for k in self.__dict__.keys() if k[0] != '_' and
k != 'manager')
info = ", ".join("%s=%s" % (k, getattr(self, k)) for k in reprkeys)
return "<%s %s>" % (self.__class__.__name__, info)
def get(self):
# set_loaded() first ... so if we have to bail, we know we tried.
if not hasattr(self.manager, 'get'):
new = self.manager.get(
if new:
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
if hasattr(self, 'id') and hasattr(other, 'id'):
return ==
return self._info == other._info
def is_loaded(self):
return self._loaded
def set_loaded(self, val):
self._loaded = val
def to_dict(self):
return copy.deepcopy(self._info)

tuskarclient/common/ Normal file
View File

@ -0,0 +1,284 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import httplib
import logging
import os
import socket
import StringIO
import urlparse
import ssl
except ImportError:
#TODO(bcwaldon): Handle this failure more gracefully
import json
except ImportError:
import simplejson as json
# Python 2.5 compat fix
if not hasattr(urlparse, 'parse_qsl'):
import cgi
urlparse.parse_qsl = cgi.parse_qsl
from tuskarclient import exc
LOG = logging.getLogger(__name__)
USER_AGENT = 'python-tuskarclient'
CHUNKSIZE = 1024 * 64 # 64kB
class HTTPClient(object):
def __init__(self, endpoint, **kwargs):
self.endpoint = endpoint
self.auth_token = kwargs.get('token')
self.connection_params = self.get_connection_params(endpoint, **kwargs)
def get_connection_params(endpoint, **kwargs):
parts = urlparse.urlparse(endpoint)
_args = (parts.hostname, parts.port, parts.path)
_kwargs = {'timeout': (float(kwargs.get('timeout'))
if kwargs.get('timeout') else 600)}
if parts.scheme == 'https':
_class = VerifiedHTTPSConnection
_kwargs['ca_file'] = kwargs.get('ca_file', None)
_kwargs['cert_file'] = kwargs.get('cert_file', None)
_kwargs['key_file'] = kwargs.get('key_file', None)
_kwargs['insecure'] = kwargs.get('insecure', False)
elif parts.scheme == 'http':
_class = httplib.HTTPConnection
msg = 'Unsupported scheme: %s' % parts.scheme
raise exc.InvalidEndpoint(msg)
return (_class, _args, _kwargs)
def get_connection(self):
_class = self.connection_params[0]
return _class(*self.connection_params[1][0:2],
except httplib.InvalidURL:
raise exc.InvalidEndpoint()
def log_curl_request(self, method, url, kwargs):
curl = ['curl -i -X %s' % method]
for (key, value) in kwargs['headers'].items():
header = '-H \'%s: %s\'' % (key, value)
conn_params_fmt = [
('key_file', '--key %s'),
('cert_file', '--cert %s'),
('ca_file', '--cacert %s'),
for (key, fmt) in conn_params_fmt:
value = self.connection_params[2].get(key)
if value:
curl.append(fmt % value)
if self.connection_params[2].get('insecure'):
if 'body' in kwargs:
curl.append('-d \'%s\'' % kwargs['body'])
curl.append('%s%s' % (self.endpoint, url))
LOG.debug(' '.join(curl))
def log_http_response(resp, body=None):
status = (resp.version / 10.0, resp.status, resp.reason)
dump = ['\nHTTP/%.1f %s %s' % status]
dump.extend(['%s: %s' % (k, v) for k, v in resp.getheaders()])
if body:
dump.extend([body, ''])
def _make_connection_url(self, url):
(_class, _args, _kwargs) = self.connection_params
base_url = _args[2]
return '%s/%s' % (base_url.rstrip('/'), url.lstrip('/'))
def _http_request(self, url, method, **kwargs):
"""Send an http request with the specified characteristics.
Wrapper around httplib.HTTP(S)Connection.request to handle tasks such
as setting headers and error handling.
# Copy the kwargs so we can reuse the original in case of redirects
kwargs['headers'] = copy.deepcopy(kwargs.get('headers', {}))
kwargs['headers'].setdefault('User-Agent', USER_AGENT)
if self.auth_token:
kwargs['headers'].setdefault('X-Auth-Token', self.auth_token)
self.log_curl_request(method, url, kwargs)
conn = self.get_connection()
conn_url = self._make_connection_url(url)
conn.request(method, conn_url, **kwargs)
resp = conn.getresponse()
except socket.gaierror as e:
message = "Error finding address for %(url)s: %(e)s" % locals()
raise exc.InvalidEndpoint(message=message)
except (socket.error, socket.timeout) as e:
endpoint = self.endpoint
message = "Error communicating with %(endpoint)s %(e)s" % locals()
raise exc.CommunicationError(message=message)
body_iter = ResponseBodyIterator(resp)
# Read body into string if it isn't obviously image data
if resp.getheader('content-type', None) != 'application/octet-stream':
body_str = ''.join([chunk for chunk in body_iter])
self.log_http_response(resp, body_str)
body_iter = StringIO.StringIO(body_str)
if 400 <= resp.status < 600:
LOG.warn("Request returned failure status.")
raise exc.from_response(resp)
elif resp.status in (301, 302, 305):
# Redirected. Reissue the request to the new location.
return self._http_request(resp['location'], method, **kwargs)
elif resp.status == 300:
raise exc.from_response(resp)
return resp, body_iter
def json_request(self, method, url, **kwargs):
kwargs.setdefault('headers', {})
kwargs['headers'].setdefault('Content-Type', 'application/json')
kwargs['headers'].setdefault('Accept', 'application/json')
if 'body' in kwargs:
kwargs['body'] = json.dumps(kwargs['body'])
resp, body_iter = self._http_request(url, method, **kwargs)
content_type = resp.getheader('content-type', None)
if resp.status == 204 or resp.status == 205 or content_type is None:
return resp, list()
if 'application/json' in content_type:
body = ''.join([chunk for chunk in body_iter])
body = json.loads(body)
except ValueError:
LOG.error('Could not decode response body as JSON')
body = None
return resp, body
def raw_request(self, method, url, **kwargs):
kwargs.setdefault('headers', {})
return self._http_request(url, method, **kwargs)
class VerifiedHTTPSConnection(httplib.HTTPSConnection):
"""httplib-compatibile connection using client-side SSL authentication
def __init__(self, host, port, key_file=None, cert_file=None,
ca_file=None, timeout=None, insecure=False):
httplib.HTTPSConnection.__init__(self, host, port, key_file=key_file,
self.key_file = key_file
self.cert_file = cert_file
if ca_file is not None:
self.ca_file = ca_file
self.ca_file = self.get_system_ca_file()
self.timeout = timeout
self.insecure = insecure
def connect(self):
"""Connect to a host on a given (SSL) port.
If ca_file is pointing somewhere, use it to check Server Certificate.
Redefined/copied and extended from (Python 2.6.x).
This is needed to pass cert_reqs=ssl.CERT_REQUIRED as parameter to
ssl.wrap_socket(), which forces SSL to check server certificate against
our client certificate.
sock = socket.create_connection((, self.port), self.timeout)
if self._tunnel_host:
self.sock = sock
if self.insecure is True:
kwargs = {'cert_reqs': ssl.CERT_NONE}
kwargs = {'cert_reqs': ssl.CERT_REQUIRED, 'ca_certs': self.ca_file}
if self.cert_file:
kwargs['certfile'] = self.cert_file
if self.key_file:
kwargs['keyfile'] = self.key_file
self.sock = ssl.wrap_socket(sock, **kwargs)
def get_system_ca_file():
"""Return path to system default CA file."""
# Standard CA file locations for Debian/Ubuntu, RedHat/Fedora,
# Suse, FreeBSD/OpenBSD
ca_path = ['/etc/ssl/certs/ca-certificates.crt',
for ca in ca_path:
if os.path.exists(ca):
return ca
return None
class ResponseBodyIterator(object):
"""A class that acts as an iterator over an HTTP response."""
def __init__(self, resp):
self.resp = resp
def __iter__(self):
while True:
def next(self):
chunk =
if chunk:
return chunk
raise StopIteration()

View File

@ -0,0 +1,133 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import sys
import textwrap
import uuid
import prettytable
from tuskarclient import exc
from tuskarclient.openstack.common import importutils
# Decorator for cli-args
def arg(*args, **kwargs):
def _decorator(func):
# Because of the sematics of decorator composition if we just append
# to the options list positional options will appear to be backwards.
func.__dict__.setdefault('arguments', []).insert(0, (args, kwargs))
return func
return _decorator
def pretty_choice_list(l):
return ', '.join("'%s'" % i for i in l)
def print_list(objs, fields, field_labels, formatters={}, sortby=0):
pt = prettytable.PrettyTable([f for f in field_labels],
caching=False, print_empty=False)
pt.align = 'l'
for o in objs:
row = []
for field in fields:
if field in formatters:
data = getattr(o, field, '')
print pt.get_string(sortby=field_labels[sortby])
def print_dict(d, dict_property="Property", wrap=0):
pt = prettytable.PrettyTable([dict_property, 'Value'],
caching=False, print_empty=False)
pt.align = 'l'
for k, v in d.iteritems():
# convert dict to str to check length
if isinstance(v, dict):
v = str(v)
if wrap > 0:
v = textwrap.fill(str(v), wrap)
# if value has a newline, add in multiple rows
# e.g. fault with stacktrace
if v and isinstance(v, basestring) and r'\n' in v:
lines = v.strip().split(r'\n')
col1 = k
for line in lines:
pt.add_row([col1, line])
col1 = ''
pt.add_row([k, v])
print pt.get_string()
def find_resource(manager, name_or_id):
"""Helper for the _find_* methods."""
# first try to get entity as integer id
if isinstance(name_or_id, int) or name_or_id.isdigit():
return manager.get(int(name_or_id))
except exc.NotFound:
# now try to get entity as uuid
return manager.get(name_or_id)
except (ValueError, exc.NotFound):
# finally try to find entity by name
return manager.find(name=name_or_id)
except exc.NotFound:
msg = "No %s with a name or ID of '%s' exists." % \
(manager.resource_class.__name__.lower(), name_or_id)
raise exc.CommandError(msg)
def string_to_bool(arg):
return arg.strip().lower() in ('t', 'true', 'yes', '1')
def env(*vars, **kwargs):
"""Search for the first defined of possibly many env vars
Returns the first environment variable defined in vars, or
returns the default defined in kwargs.
for v in vars:
value = os.environ.get(v, None)
if value:
return value
return kwargs.get('default', '')
def import_versioned_module(version, submodule=None):
module = 'tuskarclient.v%s' % version
if submodule:
module = '.'.join((module, submodule))
return importutils.import_module(module)
def exit(msg=''):
if msg:
print >> sys.stderr, msg

tuskarclient/ Normal file
View File

@ -0,0 +1,163 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
class BaseException(Exception):
"""An error occurred."""
def __init__(self, message=None):
self.message = message
def __str__(self):
return self.message or self.__class__.__doc__
class CommandError(BaseException):
"""Invalid usage of CLI."""
class InvalidEndpoint(BaseException):
"""The provided endpoint is invalid."""
class CommunicationError(BaseException):
"""Unable to communicate with server."""
class ClientException(Exception):
class HTTPException(ClientException):
"""Base exception for all HTTP-derived exceptions."""
code = 'N/A'
def __init__(self, details=None):
self.details = details
def __str__(self):
return "%s (HTTP %s)" % (self.__class__.__name__, self.code)
class HTTPMultipleChoices(HTTPException):
code = 300
def __str__(self):
self.details = ("Requested version of OpenStack Images API is not"
return "%s (HTTP %s) %s" % (self.__class__.__name__, self.code,
class BadRequest(HTTPException):
code = 400
class HTTPBadRequest(BadRequest):
class Unauthorized(HTTPException):
code = 401
class HTTPUnauthorized(Unauthorized):
class Forbidden(HTTPException):
code = 403
class HTTPForbidden(Forbidden):
class NotFound(HTTPException):
code = 404
class HTTPNotFound(NotFound):
class HTTPMethodNotAllowed(HTTPException):
code = 405
class Conflict(HTTPException):
code = 409
class HTTPConflict(Conflict):
class OverLimit(HTTPException):
code = 413
class HTTPOverLimit(OverLimit):
class HTTPInternalServerError(HTTPException):
code = 500
class HTTPNotImplemented(HTTPException):
code = 501
class HTTPBadGateway(HTTPException):
code = 502
class ServiceUnavailable(HTTPException):
code = 503
class HTTPServiceUnavailable(ServiceUnavailable):
#NOTE(bcwaldon): Build a mapping of HTTP codes to corresponding exception
# classes
_code_map = {}
for obj_name in dir(sys.modules[__name__]):
if obj_name.startswith('HTTP'):
obj = getattr(sys.modules[__name__], obj_name)
_code_map[obj.code] = obj
def from_response(response):
"""Return an instance of an HTTPException based on httplib response."""
cls = _code_map.get(response.status, HTTPException)
return cls()
class NoTokenLookupException(Exception):
class EndpointNotFound(Exception):

View File

View File

@ -0,0 +1,68 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
Import related utilities and helper functions.
import sys
import traceback
def import_class(import_str):
"""Returns a class from a string including module and class."""
mod_str, _sep, class_str = import_str.rpartition('.')
return getattr(sys.modules[mod_str], class_str)
except (ValueError, AttributeError):
raise ImportError('Class %s cannot be found (%s)' %
def import_object(import_str, *args, **kwargs):
"""Import a class and return an instance of it."""
return import_class(import_str)(*args, **kwargs)
def import_object_ns(name_space, import_str, *args, **kwargs):
"""Tries to import object from default namespace.
Imports a class and return an instance of it, first by trying
to find the class in a default namespace, then failing back to
a full path if not found in the default namespace.
import_value = "%s.%s" % (name_space, import_str)
return import_class(import_value)(*args, **kwargs)
except ImportError:
return import_class(import_str)(*args, **kwargs)
def import_module(import_str):
"""Import a module."""
return sys.modules[import_str]
def try_import(import_str, default=None):
"""Try to import a module and if it fails return default."""
return import_module(import_str)
except ImportError:
return default

View File

@ -0,0 +1,48 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tuskarclient.tests import utils as tutils
from tuskarclient.common import http
fixtures = {}
class HttpClientTest(tutils.TestCase):
def test_url_generation_trailing_slash_in_base(self):
client = http.HTTPClient('http://localhost/')
url = client._make_connection_url('/v1/resources')
print client.connection_params
self.assertEqual(url, '/v1/resources')
def test_url_generation_without_trailing_slash_in_base(self):
client = http.HTTPClient('http://localhost')
url = client._make_connection_url('/v1/resources')
print client.connection_params
self.assertEqual(url, '/v1/resources')
def test_url_generation_prefix_slash_in_path(self):
client = http.HTTPClient('http://localhost/')
url = client._make_connection_url('/v1/resources')
print client.connection_params
self.assertEqual(url, '/v1/resources')
def test_url_generation_without_prefix_slash_in_path(self):
client = http.HTTPClient('http://localhost')
url = client._make_connection_url('v1/resources')
print client.connection_params
self.assertEqual(url, '/v1/resources')

View File

@ -0,0 +1,47 @@
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import cStringIO
import sys
from tuskarclient.common import utils
from tuskarclient.tests import utils as test_utils
class UtilsTest(test_utils.TestCase):
def test_prettytable(self):
class Struct:
def __init__(self, **entries):
# test that the prettytable output is wellformatted (left-aligned)
saved_stdout = sys.stdout
sys.stdout = output_dict = cStringIO.StringIO()
utils.print_dict({'K': 'k', 'Key': 'Value'})
sys.stdout = saved_stdout
self.assertEqual(output_dict.getvalue(), '''\
| Property | Value |
| K | k |
| Key | Value |

View File

@ -10,10 +10,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import fixtures
import os
import StringIO
import testtools
from tuskarclient.common import http
class TestCase(testtools.TestCase):
@ -27,3 +31,41 @@ class TestCase(testtools.TestCase):
os.environ.get('OS_STDERR_CAPTURE') == '1'):
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
class FakeAPI(object):
def __init__(self, fixtures):
self.fixtures = fixtures
self.calls = []
def _request(self, method, url, headers=None, body=None):
call = (method, url, headers or {}, body)
return self.fixtures[url][method]
def raw_request(self, *args, **kwargs):
fixture = self._request(*args, **kwargs)
body_iter = http.ResponseBodyIterator(StringIO.StringIO(fixture[1]))
return FakeResponse(fixture[0]), body_iter
def json_request(self, *args, **kwargs):
fixture = self._request(*args, **kwargs)
return FakeResponse(fixture[0]), fixture[1]
class FakeResponse(object):
def __init__(self, headers, body=None, version=None):
""":param headers: dict representing HTTP response headers
:param body: file-like object
self.headers = headers
self.body = body
def getheaders(self):
return copy.deepcopy(self.headers).items()
def getheader(self, key, default):
return self.headers.get(key, default)
def read(self, amt):