# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import copy
import json

import six
import six.moves.urllib.parse as urlparse
import testtools

from glanceclient.v2.schemas import Schema


class FakeAPI(object):
    """In-memory stand-in for an HTTP client, driven by canned fixtures.

    ``fixtures`` is indexed first by URL (with its query keys sorted, see
    :func:`sort_url_by_query_keys`) and then by HTTP method. Each entry is
    a sequence whose first item is the response headers and whose second
    item is the response body. Every request is appended to ``self.calls``
    so tests can assert on what was sent.
    """

    def __init__(self, fixtures):
        self.fixtures = fixtures
        self.calls = []

    def _request(self, method, url, headers=None, data=None,
                 content_length=None):
        """Record the call and return the matching canned response.

        :param method: HTTP method name used to select the fixture
        :param url: request URL; its query keys are sorted before lookup
        :param headers: request headers, recorded as ``{}`` when omitted
        :param data: request body, normalised by :func:`build_call_record`
        :param content_length: appended to the call record when not None
        :returns: ``(FakeResponse, body)`` where ``body`` is the decoded
                  JSON if the fixture body is a JSON string, a ``StringIO``
                  if it is a non-JSON string, or the fixture body itself
        :raises KeyError: if no fixture exists for the URL/method pair
        """
        # Normalise once; the same key is used for recording and lookup.
        sorted_url = sort_url_by_query_keys(url)

        call = build_call_record(method, sorted_url, headers or {}, data)
        if content_length is not None:
            call += (content_length,)
        self.calls.append(call)

        fixture = self.fixtures[sorted_url][method]
        response_headers, response_body = fixture[0], fixture[1]

        data = response_body
        if isinstance(response_body, six.string_types):
            try:
                data = json.loads(response_body)
            except ValueError:
                # Not JSON: expose the raw text as a file-like object.
                data = six.StringIO(response_body)

        return FakeResponse(response_headers, response_body), data

    def get(self, *args, **kwargs):
        return self._request('GET', *args, **kwargs)

    def post(self, *args, **kwargs):
        return self._request('POST', *args, **kwargs)

    def put(self, *args, **kwargs):
        return self._request('PUT', *args, **kwargs)

    def patch(self, *args, **kwargs):
        return self._request('PATCH', *args, **kwargs)

    def delete(self, *args, **kwargs):
        return self._request('DELETE', *args, **kwargs)

    def head(self, *args, **kwargs):
        return self._request('HEAD', *args, **kwargs)


class FakeSchemaAPI(FakeAPI):
    """A :class:`FakeAPI` whose ``get`` wraps the body in a ``Schema``."""

    def __init__(self, *args):
        super(FakeSchemaAPI, self).__init__(*args)

    def get(self, *args, **kwargs):
        """Return ``Schema(body)`` for the fixture matched by the request."""
        _, raw_schema = self._request('GET', *args, **kwargs)
        return Schema(raw_schema)


class RawRequest(object):
    """Minimal fake of the low-level response object behind a response."""

    def __init__(self, headers, body=None,
                 version=1.0, status=200, reason="Ok"):
        """
        :param headers: dict representing HTTP response headers
        :param body: file-like object
        :param version: HTTP Version
        :param status: Response status code
        :param reason: Status code related message.
        """
        self.body = body
        self.status = status
        self.reason = reason
        self.version = version
        self.headers = headers

    def getheaders(self):
        """Return the items of a deep copy of the headers dict."""
        return copy.deepcopy(self.headers).items()

    def getheader(self, key, default=None):
        """Return the header stored under ``key``, or ``default``."""
        return self.headers.get(key, default)

    def read(self, amt):
        """Read up to ``amt`` from the body (must be file-like)."""
        return self.body.read(amt)


class FakeResponse(object):
    """Fake HTTP response exposing the attributes the tests rely on."""

    def __init__(self, headers=None, body=None,
                 version=1.0, status_code=200, reason="Ok"):
        """
        :param headers: dict representing HTTP response headers
        :param body: file-like object, or the body content itself
        :param version: HTTP Version
        :param status_code: Response status code
        :param reason: Status code related message.
        """
        self.body = body
        self.reason = reason
        self.version = version
        self.headers = headers
        self.status_code = status_code
        self.raw = RawRequest(headers, body=body, reason=reason,
                              version=version, status=status_code)

    @property
    def ok(self):
        """True unless the status code lies in the 400-599 range."""
        return (self.status_code < 400 or
                self.status_code >= 600)

    def read(self, amt):
        """Read up to ``amt`` from the body (must be file-like)."""
        return self.body.read(amt)

    def close(self):
        """No-op; nothing needs releasing."""
        pass

    @property
    def content(self):
        """Return the body, reading it first when it is file-like.

        Reading consumes the underlying stream, so every access to this
        property on a file-like body performs another ``read()``.
        """
        if hasattr(self.body, "read"):
            return self.body.read()
        return self.body

    @property
    def text(self):
        """Return the content, decoded as UTF-8 when it is a byte string."""
        # Fetch the content exactly once: with a file-like body a second
        # access would read from an already drained stream and come back
        # empty.
        content = self.content
        if isinstance(content, six.binary_type):
            return content.decode('utf-8')
        return content

    def json(self, **kwargs):
        """Return the JSON-decoded body.

        An empty body, or a body that decodes to a falsy value, yields
        ``""``. ``kwargs`` are accepted and ignored.
        """
        if not self.body:
            return ""
        return json.loads(self.text) or ""

    def iter_content(self, chunk_size=1, decode_unicode=False):
        """Yield chunks of ``chunk_size`` read from ``raw`` until empty.

        ``decode_unicode`` is accepted and ignored.
        """
        while True:
            chunk = self.raw.read(chunk_size)
            if not chunk:
                break
            yield chunk


class TestCase(testtools.TestCase):
    """Base test case exposing ``TEST_REQUEST_BASE``."""

    TEST_REQUEST_BASE = {
        'config': {'danger_mode': False},
        'verify': True}


class FakeTTYStdout(six.StringIO):
    """A fake stdout that tries to emulate a TTY device as much as possible."""

    def isatty(self):
        return True

    def write(self, data):
        # When a CR (carriage return) is found reset file.
        if data.startswith('\r'):
            self.seek(0)
            data = data[1:]
        return six.StringIO.write(self, data)


class FakeNoTTYStdout(FakeTTYStdout):
    """A Fake stdout that is not a TTY device."""

    def isatty(self):
        return False


def sort_url_by_query_keys(url):
    """A helper function which sorts the keys of the query string of a url.

       For example, an input of '/v2/tasks?sort_key=id&sort_dir=asc&limit=10'
       returns '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'. This is to
       prevent non-deterministic ordering of the query string causing
       problems with unit tests.
    :param url: url which will be ordered by query keys
    :returns url: url with ordered query keys
    """
    parsed = urlparse.urlparse(url)
    # keep_blank_values=True so that keys with empty values are preserved.
    queries = urlparse.parse_qsl(parsed.query, True)
    sorted_query = sorted(queries, key=lambda x: x[0])

    encoded_sorted_query = urlparse.urlencode(sorted_query, True)

    url_parts = (parsed.scheme, parsed.netloc, parsed.path,
                 parsed.params, encoded_sorted_query,
                 parsed.fragment)

    return urlparse.urlunparse(url_parts)


def build_call_record(method, url, headers, data):
    """Build a call record whose request body has a deterministic order.

    A dict body is replaced by its sorted ``(key, value)`` items. A string
    body that holds JSON is decoded and normalised: an object becomes its
    sorted items, and each dict inside a list becomes its sorted items.
    Any other body (non-JSON text, a JSON scalar, ``None``, ...) is
    recorded unchanged.

    :param method: HTTP method name
    :param url: request URL
    :param headers: request headers; a falsy value is recorded as ``{}``
    :param data: request body
    :returns: tuple ``(method, url, headers, normalised_data)``
    """
    if isinstance(data, dict):
        data = sorted(data.items())
    elif isinstance(data, six.string_types):
        # NOTE(flwang): For image update, the data will be a 'list' which
        # contains operation dict, such as: [{"op": "remove", "path": "/a"}]
        try:
            decoded = json.loads(data)
        except ValueError:
            # Not JSON: record the body verbatim.
            pass
        else:
            if isinstance(decoded, dict):
                data = sorted(decoded.items())
            elif isinstance(decoded, list):
                data = [sorted(d.items()) if isinstance(d, dict) else d
                        for d in decoded]
    return (method, url, headers or {}, data)