Replace httpretty with requests-mock

We've had some issue in the past with httpretty breaking compatability
between releases and being a problem to package upstream. requests-mock
is on stackforge and can be tailored to our needs.

Due to issues with py34 and httpretty, this is the preferred fix to
allow middleware to test against py34 (instead of waiting for a fix to
httpretty).

Change-Id: I9fa0f12c732eb187fcb5b0926593cb3624ddc51a
This commit is contained in:
Jamie Lennox 2014-08-08 17:17:29 +10:00 committed by Morgan Fainberg
parent 6266ed437d
commit 89eddf54ee
4 changed files with 142 additions and 257 deletions

View File

@ -23,7 +23,6 @@ import time
import uuid
import fixtures
import httpretty
from keystoneclient import access
from keystoneclient.common import cms
from keystoneclient import exceptions
@ -32,6 +31,7 @@ from keystoneclient import session
import mock
from oslo.serialization import jsonutils
from oslo.utils import timeutils
from requests_mock.contrib import fixture as rm_fixture
import six
import testresources
import testtools
@ -305,6 +305,7 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase):
self.expected_env = expected_env or dict()
self.fake_app = fake_app or FakeApp
self.middleware = None
self.requests = self.useFixture(rm_fixture.Fixture())
self.conf = {
'identity_uri': 'https://keystone.example.com:1234/testadmin/',
@ -363,10 +364,9 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase):
def assertLastPath(self, path):
if path:
self.assertEqual(path, httpretty.last_request().path)
self.assertEqual(BASE_URI + path, self.requests.last_request.url)
else:
self.assertIsInstance(httpretty.last_request(),
httpretty.core.HTTPrettyRequestEmpty)
self.assertIsNone(self.requests.last_request)
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
@ -387,25 +387,20 @@ class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
super(DiabloAuthTokenMiddlewareTest, self).setUp(
expected_env=expected_env)
httpretty.reset()
httpretty.enable()
self.addCleanup(httpretty.disable)
httpretty.register_uri(httpretty.GET,
self.requests.register_uri('GET',
"%s/" % BASE_URI,
body=VERSION_LIST_v2,
status=300)
text=VERSION_LIST_v2,
status_code=300)
httpretty.register_uri(httpretty.POST,
self.requests.register_uri('POST',
"%s/v2.0/tokens" % BASE_URI,
body=FAKE_ADMIN_TOKEN)
text=FAKE_ADMIN_TOKEN)
self.token_id = self.examples.VALID_DIABLO_TOKEN
token_response = self.examples.JSON_TOKEN_RESPONSES[self.token_id]
httpretty.register_uri(httpretty.GET,
"%s/v2.0/tokens/%s" % (BASE_URI, self.token_id),
body=token_response)
url = "%s/v2.0/tokens/%s" % (BASE_URI, self.token_id)
self.requests.register_uri('GET', url, text=token_response)
self.set_middleware()
@ -530,7 +525,6 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
@testtools.skipUnless(memcached_available(), 'memcached not available')
def test_encrypt_cache_data(self):
httpretty.disable()
conf = {
'memcached_servers': MEMCACHED_SERVERS,
'memcache_security_strategy': 'encrypt',
@ -548,7 +542,6 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
@testtools.skipUnless(memcached_available(), 'memcached not available')
def test_sign_cache_data(self):
httpretty.disable()
conf = {
'memcached_servers': MEMCACHED_SERVERS,
'memcache_security_strategy': 'mac',
@ -566,7 +559,6 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
@testtools.skipUnless(memcached_available(), 'memcached not available')
def test_no_memcache_protection(self):
httpretty.disable()
conf = {
'memcached_servers': MEMCACHED_SERVERS,
'memcache_secret_key': 'mysecret'
@ -1010,10 +1002,9 @@ class CommonAuthTokenMiddlewareTest(object):
in_memory_list)
def test_invalid_revocation_list_raises_service_error(self):
httpretty.register_uri(httpretty.GET,
self.requests.register_uri('GET',
"%s/v2.0/tokens/revoked" % BASE_URI,
body="{}",
status=200)
json={})
self.assertRaises(auth_token.ServiceError,
self.middleware._fetch_revocation_list)
@ -1028,7 +1019,7 @@ class CommonAuthTokenMiddlewareTest(object):
# remember because we are testing the middleware we stub the connection
# to the keystone server, but this is not what gets returned
invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
httpretty.register_uri(httpretty.GET, invalid_uri, body="", status=404)
self.requests.register_uri('GET', invalid_uri, status_code=404)
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = 'invalid-token'
@ -1101,9 +1092,6 @@ class CommonAuthTokenMiddlewareTest(object):
return self.middleware._token_cache._cache_get(token_id)
def test_memcache(self):
# NOTE(jamielennox): it appears that httpretty can mess with the
# memcache socket. Just disable it as it's not required here anyway.
httpretty.disable()
req = webob.Request.blank('/')
token = self.token_dict['signed_token_scoped']
req.headers['X-Auth-Token'] = token
@ -1111,7 +1099,6 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertIsNotNone(self._get_cached_token(token))
def test_expired(self):
httpretty.disable()
req = webob.Request.blank('/')
token = self.token_dict['signed_token_scoped_expired']
req.headers['X-Auth-Token'] = token
@ -1120,7 +1107,7 @@ class CommonAuthTokenMiddlewareTest(object):
def test_memcache_set_invalid_uuid(self):
invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
httpretty.register_uri(httpretty.GET, invalid_uri, body="", status=404)
self.requests.register_uri('GET', invalid_uri, status_code=404)
req = webob.Request.blank('/')
token = 'invalid-token'
@ -1155,7 +1142,6 @@ class CommonAuthTokenMiddlewareTest(object):
exp_mode='sha256')
def test_memcache_set_expired(self, extra_conf={}, extra_environ={}):
httpretty.disable()
token_cache_time = 10
conf = {
'token_cache_time': token_cache_time,
@ -1415,9 +1401,9 @@ class CommonAuthTokenMiddlewareTest(object):
for service_url in (self.examples.UNVERSIONED_SERVICE_URL,
self.examples.SERVICE_URL):
httpretty.register_uri(httpretty.GET,
self.requests.register_uri('GET',
service_url,
body=VERSION_LIST_v3,
text=VERSION_LIST_v3,
status_code=300)
req = webob.Request.blank('/')
@ -1460,22 +1446,18 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
}
self.set_middleware(conf=conf)
httpretty.reset()
httpretty.enable()
self.addCleanup(httpretty.disable)
# Usually we supply a signed_dir with pre-installed certificates,
# so invocation of /usr/bin/openssl succeeds. This time we give it
# an empty directory, so it fails.
def test_request_no_token_dummy(self):
cms._ensure_subprocess()
httpretty.register_uri(httpretty.GET,
self.requests.register_uri('GET',
"%s%s" % (BASE_URI, self.ca_path),
status=404)
httpretty.register_uri(httpretty.GET,
status_code=404)
self.requests.register_uri('GET',
"%s%s" % (BASE_URI, self.signing_path),
status=404)
status_code=404)
self.assertRaises(exceptions.CertificateConfigError,
self.middleware._verify_signed_token,
self.examples.SIGNED_TOKEN_SCOPED,
@ -1483,29 +1465,25 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_fetch_signing_cert(self):
data = 'FAKE CERT'
httpretty.register_uri(httpretty.GET,
"%s%s" % (BASE_URI, self.signing_path),
body=data)
url = "%s%s" % (BASE_URI, self.signing_path)
self.requests.register_uri('GET', url, text=data)
self.middleware._fetch_signing_cert()
with open(self.middleware._signing_cert_file_name, 'r') as f:
self.assertEqual(f.read(), data)
self.assertEqual("/testadmin%s" % self.signing_path,
httpretty.last_request().path)
self.assertEqual(url, self.requests.last_request.url)
def test_fetch_signing_ca(self):
data = 'FAKE CA'
httpretty.register_uri(httpretty.GET,
"%s%s" % (BASE_URI, self.ca_path),
body=data)
url = "%s%s" % (BASE_URI, self.ca_path)
self.requests.register_uri('GET', url, text=data)
self.middleware._fetch_ca_cert()
with open(self.middleware._signing_ca_file_name, 'r') as f:
self.assertEqual(f.read(), data)
self.assertEqual("/testadmin%s" % self.ca_path,
httpretty.last_request().path)
self.assertEqual(url, self.requests.last_request.url)
def test_prefix_trailing_slash(self):
del self.conf['identity_uri']
@ -1514,24 +1492,19 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.conf['auth_port'] = 1234
self.conf['auth_admin_prefix'] = '/newadmin/'
httpretty.register_uri(httpretty.GET,
"%s/newadmin%s" % (BASE_HOST, self.ca_path),
body='FAKECA')
httpretty.register_uri(httpretty.GET,
"%s/newadmin%s" %
(BASE_HOST, self.signing_path), body='FAKECERT')
ca_url = "%s/newadmin%s" % (BASE_HOST, self.ca_path)
signing_url = "%s/newadmin%s" % (BASE_HOST, self.signing_path)
self.requests.register_uri('GET', ca_url, text='FAKECA')
self.requests.register_uri('GET', signing_url, text='FAKECERT')
self.set_middleware(conf=self.conf)
self.middleware._fetch_ca_cert()
self.assertEqual('/newadmin%s' % self.ca_path,
httpretty.last_request().path)
self.assertEqual(ca_url, self.requests.last_request.url)
self.middleware._fetch_signing_cert()
self.assertEqual('/newadmin%s' % self.signing_path,
httpretty.last_request().path)
self.assertEqual(signing_url, self.requests.last_request.url)
def test_without_prefix(self):
del self.conf['identity_uri']
@ -1540,24 +1513,18 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.conf['auth_port'] = 1234
self.conf['auth_admin_prefix'] = ''
httpretty.register_uri(httpretty.GET,
"%s%s" % (BASE_HOST, self.ca_path),
body='FAKECA')
httpretty.register_uri(httpretty.GET,
"%s%s" % (BASE_HOST, self.signing_path),
body='FAKECERT')
ca_url = "%s%s" % (BASE_HOST, self.ca_path)
signing_url = "%s%s" % (BASE_HOST, self.signing_path)
self.requests.register_uri('GET', ca_url, text='FAKECA')
self.requests.register_uri('GET', signing_url, text='FAKECERT')
self.set_middleware(conf=self.conf)
self.middleware._fetch_ca_cert()
self.assertEqual(self.ca_path,
httpretty.last_request().path)
self.assertEqual(ca_url, self.requests.last_request.url)
self.middleware._fetch_signing_cert()
self.assertEqual(self.signing_path,
httpretty.last_request().path)
self.assertEqual(signing_url, self.requests.last_request.url)
class V3CertDownloadMiddlewareTest(V2CertDownloadMiddlewareTest):
@ -1570,7 +1537,7 @@ class V3CertDownloadMiddlewareTest(V2CertDownloadMiddlewareTest):
self.signing_path = '/v3/OS-SIMPLE-CERT/certificates'
def network_error_response(method, uri, headers):
def network_error_response(request, context):
raise auth_token.NetworkError("Network connection error.")
@ -1619,23 +1586,18 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.REVOKED_TOKEN_HASH_SHA256,
}
httpretty.reset()
httpretty.enable()
self.addCleanup(httpretty.disable)
httpretty.register_uri(httpretty.GET,
self.requests.register_uri('GET',
"%s/" % BASE_URI,
body=VERSION_LIST_v2,
status=300)
text=VERSION_LIST_v2,
status_code=300)
httpretty.register_uri(httpretty.POST,
self.requests.register_uri('POST',
"%s/v2.0/tokens" % BASE_URI,
body=FAKE_ADMIN_TOKEN)
text=FAKE_ADMIN_TOKEN)
httpretty.register_uri(httpretty.GET,
self.requests.register_uri('GET',
"%s/v2.0/tokens/revoked" % BASE_URI,
body=self.examples.SIGNED_REVOCATION_LIST,
status=200)
text=self.examples.SIGNED_REVOCATION_LIST)
for token in (self.examples.UUID_TOKEN_DEFAULT,
self.examples.UUID_TOKEN_UNSCOPED,
@ -1644,14 +1606,12 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.UUID_TOKEN_NO_SERVICE_CATALOG,
self.examples.SIGNED_TOKEN_SCOPED_KEY,
self.examples.SIGNED_TOKEN_SCOPED_PKIZ_KEY,):
httpretty.register_uri(httpretty.GET,
"%s/v2.0/tokens/%s" % (BASE_URI, token),
body=
self.examples.JSON_TOKEN_RESPONSES[token])
url = "%s/v2.0/tokens/%s" % (BASE_URI, token)
text = self.examples.JSON_TOKEN_RESPONSES[token]
self.requests.register_uri('GET', url, text=text)
httpretty.register_uri(httpretty.GET,
'%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN),
body=network_error_response)
url = '%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN)
self.requests.register_uri('GET', url, text=network_error_response)
self.set_middleware()
@ -1669,7 +1629,7 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.assertIn('keystone.token_info', req.environ)
def assert_valid_last_url(self, token_id):
self.assertLastPath("/testadmin/v2.0/tokens/%s" % token_id)
self.assertLastPath("/v2.0/tokens/%s" % token_id)
def test_default_tenant_uuid_token(self):
self.assert_unscoped_default_tenant_auto_scopes(
@ -1712,7 +1672,6 @@ class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
@httpretty.activate
def test_valid_uuid_request_forced_to_2_0(self):
"""Test forcing auth_token to use lower api version.
@ -1728,20 +1687,19 @@ class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
'auth_version': 'v2.0'
}
httpretty.register_uri(httpretty.GET,
"%s/" % BASE_URI,
body=VERSION_LIST_v3,
status=300)
self.requests.register_uri('GET',
BASE_URI,
text=VERSION_LIST_v3,
status_code=300)
httpretty.register_uri(httpretty.POST,
self.requests.register_uri('POST',
"%s/v2.0/tokens" % BASE_URI,
body=FAKE_ADMIN_TOKEN)
text=FAKE_ADMIN_TOKEN)
token = self.examples.UUID_TOKEN_DEFAULT
httpretty.register_uri(httpretty.GET,
"%s/v2.0/tokens/%s" % (BASE_URI, token),
body=
self.examples.JSON_TOKEN_RESPONSES[token])
url = "%s/v2.0/tokens/%s" % (BASE_URI, token)
text = self.examples.JSON_TOKEN_RESPONSES[token]
self.requests.register_uri('GET', url, text=text)
self.set_middleware(conf=conf)
@ -1751,9 +1709,7 @@ class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
req.headers['X-Auth-Token'] = self.examples.UUID_TOKEN_DEFAULT
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
self.assertEqual("/testadmin/v2.0/tokens/%s" %
self.examples.UUID_TOKEN_DEFAULT,
httpretty.last_request().path)
self.assertEqual(url, self.requests.last_request.url)
class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
@ -1814,41 +1770,32 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.REVOKED_v3_PKIZ_TOKEN_HASH,
}
httpretty.reset()
httpretty.enable()
self.addCleanup(httpretty.disable)
httpretty.register_uri(httpretty.GET,
"%s" % BASE_URI,
body=VERSION_LIST_v3,
status=300)
self.requests.register_uri('GET',
BASE_URI,
text=VERSION_LIST_v3,
status_code=300)
# TODO(jamielennox): auth_token middleware uses a v2 admin token
# regardless of the auth_version that is set.
httpretty.register_uri(httpretty.POST,
self.requests.register_uri('POST',
"%s/v2.0/tokens" % BASE_URI,
body=FAKE_ADMIN_TOKEN)
text=FAKE_ADMIN_TOKEN)
# TODO(jamielennox): there is no v3 revocation url yet, it uses v2
httpretty.register_uri(httpretty.GET,
self.requests.register_uri('GET',
"%s/v2.0/tokens/revoked" % BASE_URI,
body=self.examples.SIGNED_REVOCATION_LIST,
status=200)
text=self.examples.SIGNED_REVOCATION_LIST)
httpretty.register_uri(httpretty.GET,
self.requests.register_uri('GET',
"%s/v3/auth/tokens" % BASE_URI,
body=self.token_response)
text=self.token_response)
self.set_middleware()
def token_response(self, request, uri, headers):
def token_response(self, request, context):
auth_id = request.headers.get('X-Auth-Token')
token_id = request.headers.get('X-Subject-Token')
self.assertEqual(auth_id, FAKE_ADMIN_TOKEN_ID)
headers.pop('status')
status = 200
response = ""
if token_id == ERROR_TOKEN:
raise auth_token.NetworkError("Network connection error.")
@ -1856,12 +1803,13 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
try:
response = self.examples.JSON_TOKEN_RESPONSES[token_id]
except KeyError:
status = 404
response = ""
context.status_code = 404
return status, headers, response
return response
def assert_valid_last_url(self, token_id):
self.assertLastPath('/testadmin/v3/auth/tokens')
self.assertLastPath('/v3/auth/tokens')
def test_valid_unscoped_uuid_request(self):
# Remove items that won't be in an unscoped token
@ -1879,7 +1827,7 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.set_middleware(expected_env=delta_expected_env)
self.assert_valid_request_200(self.examples.v3_UUID_TOKEN_UNSCOPED,
with_catalog=False)
self.assertLastPath('/testadmin/v3/auth/tokens')
self.assertLastPath('/v3/auth/tokens')
def test_domain_scoped_uuid_request(self):
# Modify items compared to default token for a domain scope
@ -1897,7 +1845,7 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.set_middleware(expected_env=delta_expected_env)
self.assert_valid_request_200(
self.examples.v3_UUID_TOKEN_DOMAIN_SCOPED)
self.assertLastPath('/testadmin/v3/auth/tokens')
self.assertLastPath('/v3/auth/tokens')
def test_gives_v2_catalog(self):
self.set_middleware()
@ -2392,36 +2340,32 @@ class v2CompositeAuthTests(BaseAuthTokenMiddlewareTest,
'uuid_service_token_default': uuid_service_token_default,
}
httpretty.reset()
httpretty.enable()
self.addCleanup(httpretty.disable)
httpretty.register_uri(httpretty.GET,
self.requests.register_uri('GET',
"%s/" % BASE_URI,
body=VERSION_LIST_v2,
status=300)
text=VERSION_LIST_v2,
status_code=300)
httpretty.register_uri(httpretty.POST,
self.requests.register_uri('POST',
"%s/v2.0/tokens" % BASE_URI,
body=FAKE_ADMIN_TOKEN)
text=FAKE_ADMIN_TOKEN)
httpretty.register_uri(httpretty.GET,
self.requests.register_uri('GET',
"%s/v2.0/tokens/revoked" % BASE_URI,
body=self.examples.SIGNED_REVOCATION_LIST,
status=200)
text=self.examples.SIGNED_REVOCATION_LIST,
status_code=200)
for token in (self.examples.UUID_TOKEN_DEFAULT,
self.examples.UUID_SERVICE_TOKEN_DEFAULT,):
httpretty.register_uri(httpretty.GET,
self.requests.register_uri(
'GET',
"%s/v2.0/tokens/%s" % (BASE_URI, token),
body=
self.examples.JSON_TOKEN_RESPONSES[token])
text=self.examples.JSON_TOKEN_RESPONSES[token])
for invalid_uri in ("%s/v2.0/tokens/invalid-token" % BASE_URI,
"%s/v2.0/tokens/invalid-service-token" % BASE_URI):
httpretty.register_uri(httpretty.GET,
self.requests.register_uri('GET',
invalid_uri,
body="", status=404)
text="", status_code=404)
self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
self.service_token_expected_env = dict(
@ -2453,30 +2397,25 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest,
'uuid_service_token_default': uuid_serv_token_default,
}
httpretty.reset()
httpretty.enable()
self.addCleanup(httpretty.disable)
httpretty.register_uri(httpretty.GET,
self.requests.register_uri('GET',
"%s" % BASE_URI,
body=VERSION_LIST_v3,
status=300)
text=VERSION_LIST_v3,
status_code=300)
# TODO(jamielennox): auth_token middleware uses a v2 admin token
# regardless of the auth_version that is set.
httpretty.register_uri(httpretty.POST,
self.requests.register_uri('POST',
"%s/v2.0/tokens" % BASE_URI,
body=FAKE_ADMIN_TOKEN)
text=FAKE_ADMIN_TOKEN)
# TODO(jamielennox): there is no v3 revocation url yet, it uses v2
httpretty.register_uri(httpretty.GET,
self.requests.register_uri('GET',
"%s/v2.0/tokens/revoked" % BASE_URI,
body=self.examples.SIGNED_REVOCATION_LIST,
status=200)
text=self.examples.SIGNED_REVOCATION_LIST)
httpretty.register_uri(httpretty.GET,
self.requests.register_uri('GET',
"%s/v3/auth/tokens" % BASE_URI,
body=self.token_response)
text=self.token_response)
self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
self.token_expected_env.update(EXPECTED_V3_DEFAULT_ENV_ADDITIONS)
@ -2486,11 +2425,10 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest,
EXPECTED_V3_DEFAULT_SERVICE_ENV_ADDITIONS)
self.set_middleware()
def token_response(self, request, uri, headers):
def token_response(self, request, context):
auth_id = request.headers.get('X-Auth-Token')
token_id = request.headers.get('X-Subject-Token')
self.assertEqual(auth_id, FAKE_ADMIN_TOKEN_ID)
headers.pop('status')
status = 200
response = ""
@ -2503,7 +2441,8 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest,
except KeyError:
status = 404
return status, headers, response
context.status_code = status
return response
def load_tests(loader, tests, pattern):

View File

@ -12,10 +12,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import httpretty
import mock
from oslo.serialization import jsonutils
import requests
from requests_mock.contrib import fixture as rm_fixture
import six
import testtools
import webob
@ -54,9 +54,7 @@ class S3TokenMiddlewareTestBase(utils.TestCase):
'auth_protocol': self.TEST_PROTOCOL,
}
httpretty.reset()
httpretty.enable()
self.addCleanup(httpretty.disable)
self.requests = self.useFixture(rm_fixture.Fixture())
def start_fake_response(self, status, headers):
self.response_status = int(status.split(' ', 1)[0])
@ -69,8 +67,8 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
super(S3TokenMiddlewareTestGood, self).setUp()
self.middleware = s3_token.S3Token(FakeApp(), self.conf)
httpretty.register_uri(httpretty.POST, self.TEST_URL,
status=201, body=jsonutils.dumps(GOOD_RESPONSE))
self.requests.register_uri('POST', self.TEST_URL,
status_code=201, json=GOOD_RESPONSE)
# Ignore the request and pass to the next middleware in the
# pipeline if no path has been specified.
@ -101,6 +99,10 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
def test_authorized_http(self):
self.requests.register_uri('POST',
self.TEST_URL.replace('https', 'http'),
status_code=201, json=GOOD_RESPONSE)
self.middleware = (
s3_token.filter_factory({'auth_protocol': 'http',
'auth_host': self.TEST_HOST,
@ -152,8 +154,8 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
{"message": "EC2 access key not found.",
"code": 401,
"title": "Unauthorized"}}
httpretty.register_uri(httpretty.POST, self.TEST_URL,
status=403, body=jsonutils.dumps(ret))
self.requests.register_uri('POST', self.TEST_URL,
status_code=403, json=ret)
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'access:signature'
req.headers['X-Storage-Token'] = 'token'
@ -185,8 +187,8 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
self.assertEqual(resp.status_int, s3_invalid_req.status_int)
def test_bad_reply(self):
httpretty.register_uri(httpretty.POST, self.TEST_URL,
status=201, body="<badreply>")
self.requests.register_uri('POST', self.TEST_URL,
status_code=201, text="<badreply>")
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'access:signature'

View File

@ -15,12 +15,8 @@ import sys
import time
import fixtures
import httpretty
import mock
from oslo.serialization import jsonutils
import requests
import six
from six.moves.urllib import parse as urlparse
import testtools
import uuid
@ -49,58 +45,6 @@ class TestCase(testtools.TestCase):
self.time_patcher.stop()
super(TestCase, self).tearDown()
def stub_url(self, method, parts=None, base_url=None, json=None, **kwargs):
if not base_url:
base_url = self.TEST_URL
if json:
kwargs['body'] = jsonutils.dumps(json)
kwargs['content_type'] = 'application/json'
if parts:
url = '/'.join([p.strip('/') for p in [base_url] + parts])
else:
url = base_url
# For urls containing queries
url = url.replace("/?", "?")
httpretty.register_uri(method, url, **kwargs)
def assertRequestBodyIs(self, body=None, json=None):
last_request_body = httpretty.last_request().body
if six.PY3:
last_request_body = last_request_body.decode('utf-8')
if json:
val = jsonutils.loads(last_request_body)
self.assertEqual(json, val)
elif body:
self.assertEqual(body, last_request_body)
def assertQueryStringIs(self, qs=''):
"""Verify the QueryString matches what is expected.
The qs parameter should be of the format \'foo=bar&abc=xyz\'
"""
expected = urlparse.parse_qs(qs)
self.assertEqual(expected, httpretty.last_request().querystring)
def assertQueryStringContains(self, **kwargs):
qs = httpretty.last_request().querystring
for k, v in six.iteritems(kwargs):
self.assertIn(k, qs)
self.assertIn(v, qs[k])
def assertRequestHeaderEqual(self, name, val):
"""Verify that the last request made contains a header and its value
The request must have already been made and httpretty must have been
activated for the request.
"""
headers = httpretty.last_request().headers
self.assertEqual(headers.get(name), val)
if tuple(sys.version_info)[0:2] < (2, 7):

View File

@ -6,11 +6,11 @@ coverage>=3.6
discover
fixtures>=0.3.14
hacking>=0.8.0,<0.9
httpretty>=0.8.0,!=0.8.1,!=0.8.2,!=0.8.3
mock>=1.0
pycrypto>=2.6
oslosphinx>=2.2.0 # Apache-2.0
oslotest>=1.1.0 # Apache-2.0
requests-mock>=0.4.0 # Apache-2.0
sphinx>=1.1.2,!=1.2.0,<1.3
testrepository>=0.0.18
testresources>=0.2.4