diff --git a/keystonemiddleware/tests/test_auth_token_middleware.py b/keystonemiddleware/tests/test_auth_token_middleware.py index 8b12a20a..5f080f5d 100644 --- a/keystonemiddleware/tests/test_auth_token_middleware.py +++ b/keystonemiddleware/tests/test_auth_token_middleware.py @@ -23,7 +23,6 @@ import time import uuid import fixtures -import httpretty from keystoneclient import access from keystoneclient.common import cms from keystoneclient import exceptions @@ -32,6 +31,7 @@ from keystoneclient import session import mock from oslo.serialization import jsonutils from oslo.utils import timeutils +from requests_mock.contrib import fixture as rm_fixture import six import testresources import testtools @@ -305,6 +305,7 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase): self.expected_env = expected_env or dict() self.fake_app = fake_app or FakeApp self.middleware = None + self.requests = self.useFixture(rm_fixture.Fixture()) self.conf = { 'identity_uri': 'https://keystone.example.com:1234/testadmin/', @@ -363,10 +364,9 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase): def assertLastPath(self, path): if path: - self.assertEqual(path, httpretty.last_request().path) + self.assertEqual(BASE_URI + path, self.requests.last_request.url) else: - self.assertIsInstance(httpretty.last_request(), - httpretty.core.HTTPrettyRequestEmpty) + self.assertIsNone(self.requests.last_request) class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, @@ -387,25 +387,20 @@ class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, super(DiabloAuthTokenMiddlewareTest, self).setUp( expected_env=expected_env) - httpretty.reset() - httpretty.enable() - self.addCleanup(httpretty.disable) + self.requests.register_uri('GET', + "%s/" % BASE_URI, + text=VERSION_LIST_v2, + status_code=300) - httpretty.register_uri(httpretty.GET, - "%s/" % BASE_URI, - body=VERSION_LIST_v2, - status=300) - - httpretty.register_uri(httpretty.POST, - "%s/v2.0/tokens" % BASE_URI, - body=FAKE_ADMIN_TOKEN) + self.requests.register_uri('POST', + "%s/v2.0/tokens" % BASE_URI, + text=FAKE_ADMIN_TOKEN) self.token_id = self.examples.VALID_DIABLO_TOKEN token_response = self.examples.JSON_TOKEN_RESPONSES[self.token_id] - httpretty.register_uri(httpretty.GET, - "%s/v2.0/tokens/%s" % (BASE_URI, self.token_id), - body=token_response) + url = "%s/v2.0/tokens/%s" % (BASE_URI, self.token_id) + self.requests.register_uri('GET', url, text=token_response) self.set_middleware() @@ -530,7 +525,6 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, @testtools.skipUnless(memcached_available(), 'memcached not available') def test_encrypt_cache_data(self): - httpretty.disable() conf = { 'memcached_servers': MEMCACHED_SERVERS, 'memcache_security_strategy': 'encrypt', @@ -548,7 +542,6 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, @testtools.skipUnless(memcached_available(), 'memcached not available') def test_sign_cache_data(self): - httpretty.disable() conf = { 'memcached_servers': MEMCACHED_SERVERS, 'memcache_security_strategy': 'mac', @@ -566,7 +559,6 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, @testtools.skipUnless(memcached_available(), 'memcached not available') def test_no_memcache_protection(self): - httpretty.disable() conf = { 'memcached_servers': MEMCACHED_SERVERS, 'memcache_secret_key': 'mysecret' @@ -1010,10 +1002,9 @@ class CommonAuthTokenMiddlewareTest(object): in_memory_list) def test_invalid_revocation_list_raises_service_error(self): - httpretty.register_uri(httpretty.GET, - "%s/v2.0/tokens/revoked" % BASE_URI, - body="{}", - status=200) + self.requests.register_uri('GET', + "%s/v2.0/tokens/revoked" % BASE_URI, + json={}) self.assertRaises(auth_token.ServiceError, self.middleware._fetch_revocation_list) @@ -1028,7 +1019,7 @@ class CommonAuthTokenMiddlewareTest(object): # remember because we are testing the middleware we stub the connection # to the keystone server, but this is not what gets returned invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI - httpretty.register_uri(httpretty.GET, invalid_uri, body="", status=404) + self.requests.register_uri('GET', invalid_uri, status_code=404) req = webob.Request.blank('/') req.headers['X-Auth-Token'] = 'invalid-token' @@ -1101,9 +1092,6 @@ class CommonAuthTokenMiddlewareTest(object): return self.middleware._token_cache._cache_get(token_id) def test_memcache(self): - # NOTE(jamielennox): it appears that httpretty can mess with the - # memcache socket. Just disable it as it's not required here anyway. - httpretty.disable() req = webob.Request.blank('/') token = self.token_dict['signed_token_scoped'] req.headers['X-Auth-Token'] = token @@ -1111,7 +1099,6 @@ class CommonAuthTokenMiddlewareTest(object): self.assertIsNotNone(self._get_cached_token(token)) def test_expired(self): - httpretty.disable() req = webob.Request.blank('/') token = self.token_dict['signed_token_scoped_expired'] req.headers['X-Auth-Token'] = token @@ -1120,7 +1107,7 @@ class CommonAuthTokenMiddlewareTest(object): def test_memcache_set_invalid_uuid(self): invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI - httpretty.register_uri(httpretty.GET, invalid_uri, body="", status=404) + self.requests.register_uri('GET', invalid_uri, status_code=404) req = webob.Request.blank('/') token = 'invalid-token' @@ -1155,7 +1142,6 @@ class CommonAuthTokenMiddlewareTest(object): exp_mode='sha256') def test_memcache_set_expired(self, extra_conf={}, extra_environ={}): - httpretty.disable() token_cache_time = 10 conf = { 'token_cache_time': token_cache_time, @@ -1415,10 +1401,10 @@ class CommonAuthTokenMiddlewareTest(object): for service_url in (self.examples.UNVERSIONED_SERVICE_URL, self.examples.SERVICE_URL): - httpretty.register_uri(httpretty.GET, - service_url, - body=VERSION_LIST_v3, - status_code=300) + self.requests.register_uri('GET', + service_url, + text=VERSION_LIST_v3, + status_code=300) req = webob.Request.blank('/') req.headers['X-Auth-Token'] = self.token_dict['uuid_token_default'] @@ -1460,22 +1446,18 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest, } self.set_middleware(conf=conf) - httpretty.reset() - httpretty.enable() - self.addCleanup(httpretty.disable) - # Usually we supply a signed_dir with pre-installed certificates, # so invocation of /usr/bin/openssl succeeds. This time we give it # an empty directory, so it fails. def test_request_no_token_dummy(self): cms._ensure_subprocess() - httpretty.register_uri(httpretty.GET, - "%s%s" % (BASE_URI, self.ca_path), - status=404) - httpretty.register_uri(httpretty.GET, - "%s%s" % (BASE_URI, self.signing_path), - status=404) + self.requests.register_uri('GET', + "%s%s" % (BASE_URI, self.ca_path), + status_code=404) + self.requests.register_uri('GET', + "%s%s" % (BASE_URI, self.signing_path), + status_code=404) self.assertRaises(exceptions.CertificateConfigError, self.middleware._verify_signed_token, self.examples.SIGNED_TOKEN_SCOPED, @@ -1483,29 +1465,25 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest, def test_fetch_signing_cert(self): data = 'FAKE CERT' - httpretty.register_uri(httpretty.GET, - "%s%s" % (BASE_URI, self.signing_path), - body=data) + url = "%s%s" % (BASE_URI, self.signing_path) + self.requests.register_uri('GET', url, text=data) self.middleware._fetch_signing_cert() with open(self.middleware._signing_cert_file_name, 'r') as f: self.assertEqual(f.read(), data) - self.assertEqual("/testadmin%s" % self.signing_path, - httpretty.last_request().path) + self.assertEqual(url, self.requests.last_request.url) def test_fetch_signing_ca(self): data = 'FAKE CA' - httpretty.register_uri(httpretty.GET, - "%s%s" % (BASE_URI, self.ca_path), - body=data) + url = "%s%s" % (BASE_URI, self.ca_path) + self.requests.register_uri('GET', url, text=data) self.middleware._fetch_ca_cert() with open(self.middleware._signing_ca_file_name, 'r') as f: self.assertEqual(f.read(), data) - self.assertEqual("/testadmin%s" % self.ca_path, - httpretty.last_request().path) + self.assertEqual(url, self.requests.last_request.url) def test_prefix_trailing_slash(self): del self.conf['identity_uri'] @@ -1514,24 +1492,19 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest, self.conf['auth_port'] = 1234 self.conf['auth_admin_prefix'] = '/newadmin/' - httpretty.register_uri(httpretty.GET, - "%s/newadmin%s" % (BASE_HOST, self.ca_path), - body='FAKECA') - httpretty.register_uri(httpretty.GET, - "%s/newadmin%s" % - (BASE_HOST, self.signing_path), body='FAKECERT') + ca_url = "%s/newadmin%s" % (BASE_HOST, self.ca_path) + signing_url = "%s/newadmin%s" % (BASE_HOST, self.signing_path) + + self.requests.register_uri('GET', ca_url, text='FAKECA') + self.requests.register_uri('GET', signing_url, text='FAKECERT') self.set_middleware(conf=self.conf) self.middleware._fetch_ca_cert() - - self.assertEqual('/newadmin%s' % self.ca_path, - httpretty.last_request().path) + self.assertEqual(ca_url, self.requests.last_request.url) self.middleware._fetch_signing_cert() - - self.assertEqual('/newadmin%s' % self.signing_path, - httpretty.last_request().path) + self.assertEqual(signing_url, self.requests.last_request.url) def test_without_prefix(self): del self.conf['identity_uri'] @@ -1540,24 +1513,18 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest, self.conf['auth_port'] = 1234 self.conf['auth_admin_prefix'] = '' - httpretty.register_uri(httpretty.GET, - "%s%s" % (BASE_HOST, self.ca_path), - body='FAKECA') - httpretty.register_uri(httpretty.GET, - "%s%s" % (BASE_HOST, self.signing_path), - body='FAKECERT') + ca_url = "%s%s" % (BASE_HOST, self.ca_path) + signing_url = "%s%s" % (BASE_HOST, self.signing_path) + self.requests.register_uri('GET', ca_url, text='FAKECA') + self.requests.register_uri('GET', signing_url, text='FAKECERT') self.set_middleware(conf=self.conf) self.middleware._fetch_ca_cert() - - self.assertEqual(self.ca_path, - httpretty.last_request().path) + self.assertEqual(ca_url, self.requests.last_request.url) self.middleware._fetch_signing_cert() - - self.assertEqual(self.signing_path, - httpretty.last_request().path) + self.assertEqual(signing_url, self.requests.last_request.url) class V3CertDownloadMiddlewareTest(V2CertDownloadMiddlewareTest): @@ -1570,7 +1537,7 @@ class V3CertDownloadMiddlewareTest(V2CertDownloadMiddlewareTest): self.signing_path = '/v3/OS-SIMPLE-CERT/certificates' -def network_error_response(method, uri, headers): +def network_error_response(request, context): raise auth_token.NetworkError("Network connection error.") @@ -1619,23 +1586,18 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.examples.REVOKED_TOKEN_HASH_SHA256, } - httpretty.reset() - httpretty.enable() - self.addCleanup(httpretty.disable) + self.requests.register_uri('GET', + "%s/" % BASE_URI, + text=VERSION_LIST_v2, + status_code=300) - httpretty.register_uri(httpretty.GET, - "%s/" % BASE_URI, - body=VERSION_LIST_v2, - status=300) + self.requests.register_uri('POST', + "%s/v2.0/tokens" % BASE_URI, + text=FAKE_ADMIN_TOKEN) - httpretty.register_uri(httpretty.POST, - "%s/v2.0/tokens" % BASE_URI, - body=FAKE_ADMIN_TOKEN) - - httpretty.register_uri(httpretty.GET, - "%s/v2.0/tokens/revoked" % BASE_URI, - body=self.examples.SIGNED_REVOCATION_LIST, - status=200) + self.requests.register_uri('GET', + "%s/v2.0/tokens/revoked" % BASE_URI, + text=self.examples.SIGNED_REVOCATION_LIST) for token in (self.examples.UUID_TOKEN_DEFAULT, self.examples.UUID_TOKEN_UNSCOPED, @@ -1644,14 +1606,12 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.examples.UUID_TOKEN_NO_SERVICE_CATALOG, self.examples.SIGNED_TOKEN_SCOPED_KEY, self.examples.SIGNED_TOKEN_SCOPED_PKIZ_KEY,): - httpretty.register_uri(httpretty.GET, - "%s/v2.0/tokens/%s" % (BASE_URI, token), - body= - self.examples.JSON_TOKEN_RESPONSES[token]) + url = "%s/v2.0/tokens/%s" % (BASE_URI, token) + text = self.examples.JSON_TOKEN_RESPONSES[token] + self.requests.register_uri('GET', url, text=text) - httpretty.register_uri(httpretty.GET, - '%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN), - body=network_error_response) + url = '%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN) + self.requests.register_uri('GET', url, text=network_error_response) self.set_middleware() @@ -1669,7 +1629,7 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.assertIn('keystone.token_info', req.environ) def assert_valid_last_url(self, token_id): - self.assertLastPath("/testadmin/v2.0/tokens/%s" % token_id) + self.assertLastPath("/v2.0/tokens/%s" % token_id) def test_default_tenant_uuid_token(self): self.assert_unscoped_default_tenant_auto_scopes( @@ -1712,7 +1672,6 @@ class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] - @httpretty.activate def test_valid_uuid_request_forced_to_2_0(self): """Test forcing auth_token to use lower api version. @@ -1728,20 +1687,19 @@ class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, 'auth_version': 'v2.0' } - httpretty.register_uri(httpretty.GET, - "%s/" % BASE_URI, - body=VERSION_LIST_v3, - status=300) + self.requests.register_uri('GET', + BASE_URI, + text=VERSION_LIST_v3, + status_code=300) - httpretty.register_uri(httpretty.POST, - "%s/v2.0/tokens" % BASE_URI, - body=FAKE_ADMIN_TOKEN) + self.requests.register_uri('POST', + "%s/v2.0/tokens" % BASE_URI, + text=FAKE_ADMIN_TOKEN) token = self.examples.UUID_TOKEN_DEFAULT - httpretty.register_uri(httpretty.GET, - "%s/v2.0/tokens/%s" % (BASE_URI, token), - body= - self.examples.JSON_TOKEN_RESPONSES[token]) + url = "%s/v2.0/tokens/%s" % (BASE_URI, token) + text = self.examples.JSON_TOKEN_RESPONSES[token] + self.requests.register_uri('GET', url, text=text) self.set_middleware(conf=conf) @@ -1751,9 +1709,7 @@ class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, req.headers['X-Auth-Token'] = self.examples.UUID_TOKEN_DEFAULT self.middleware(req.environ, self.start_fake_response) self.assertEqual(self.response_status, 200) - self.assertEqual("/testadmin/v2.0/tokens/%s" % - self.examples.UUID_TOKEN_DEFAULT, - httpretty.last_request().path) + self.assertEqual(url, self.requests.last_request.url) class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, @@ -1814,41 +1770,32 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.examples.REVOKED_v3_PKIZ_TOKEN_HASH, } - httpretty.reset() - httpretty.enable() - self.addCleanup(httpretty.disable) - - httpretty.register_uri(httpretty.GET, - "%s" % BASE_URI, - body=VERSION_LIST_v3, - status=300) + self.requests.register_uri('GET', + BASE_URI, + text=VERSION_LIST_v3, + status_code=300) # TODO(jamielennox): auth_token middleware uses a v2 admin token # regardless of the auth_version that is set. - httpretty.register_uri(httpretty.POST, - "%s/v2.0/tokens" % BASE_URI, - body=FAKE_ADMIN_TOKEN) + self.requests.register_uri('POST', + "%s/v2.0/tokens" % BASE_URI, + text=FAKE_ADMIN_TOKEN) # TODO(jamielennox): there is no v3 revocation url yet, it uses v2 - httpretty.register_uri(httpretty.GET, - "%s/v2.0/tokens/revoked" % BASE_URI, - body=self.examples.SIGNED_REVOCATION_LIST, - status=200) + self.requests.register_uri('GET', + "%s/v2.0/tokens/revoked" % BASE_URI, + text=self.examples.SIGNED_REVOCATION_LIST) - httpretty.register_uri(httpretty.GET, - "%s/v3/auth/tokens" % BASE_URI, - body=self.token_response) + self.requests.register_uri('GET', + "%s/v3/auth/tokens" % BASE_URI, + text=self.token_response) self.set_middleware() - def token_response(self, request, uri, headers): + def token_response(self, request, context): auth_id = request.headers.get('X-Auth-Token') token_id = request.headers.get('X-Subject-Token') self.assertEqual(auth_id, FAKE_ADMIN_TOKEN_ID) - headers.pop('status') - - status = 200 - response = "" if token_id == ERROR_TOKEN: raise auth_token.NetworkError("Network connection error.") @@ -1856,12 +1803,13 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, try: response = self.examples.JSON_TOKEN_RESPONSES[token_id] except KeyError: - status = 404 + response = "" + context.status_code = 404 - return status, headers, response + return response def assert_valid_last_url(self, token_id): - self.assertLastPath('/testadmin/v3/auth/tokens') + self.assertLastPath('/v3/auth/tokens') def test_valid_unscoped_uuid_request(self): # Remove items that won't be in an unscoped token @@ -1879,7 +1827,7 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.set_middleware(expected_env=delta_expected_env) self.assert_valid_request_200(self.examples.v3_UUID_TOKEN_UNSCOPED, with_catalog=False) - self.assertLastPath('/testadmin/v3/auth/tokens') + self.assertLastPath('/v3/auth/tokens') def test_domain_scoped_uuid_request(self): # Modify items compared to default token for a domain scope @@ -1897,7 +1845,7 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.set_middleware(expected_env=delta_expected_env) self.assert_valid_request_200( self.examples.v3_UUID_TOKEN_DOMAIN_SCOPED) - self.assertLastPath('/testadmin/v3/auth/tokens') + self.assertLastPath('/v3/auth/tokens') def test_gives_v2_catalog(self): self.set_middleware() @@ -2392,36 +2340,32 @@ class v2CompositeAuthTests(BaseAuthTokenMiddlewareTest, 'uuid_service_token_default': uuid_service_token_default, } - httpretty.reset() - httpretty.enable() - self.addCleanup(httpretty.disable) + self.requests.register_uri('GET', + "%s/" % BASE_URI, + text=VERSION_LIST_v2, + status_code=300) - httpretty.register_uri(httpretty.GET, - "%s/" % BASE_URI, - body=VERSION_LIST_v2, - status=300) + self.requests.register_uri('POST', + "%s/v2.0/tokens" % BASE_URI, + text=FAKE_ADMIN_TOKEN) - httpretty.register_uri(httpretty.POST, - "%s/v2.0/tokens" % BASE_URI, - body=FAKE_ADMIN_TOKEN) - - httpretty.register_uri(httpretty.GET, - "%s/v2.0/tokens/revoked" % BASE_URI, - body=self.examples.SIGNED_REVOCATION_LIST, - status=200) + self.requests.register_uri('GET', + "%s/v2.0/tokens/revoked" % BASE_URI, + text=self.examples.SIGNED_REVOCATION_LIST, + status_code=200) for token in (self.examples.UUID_TOKEN_DEFAULT, self.examples.UUID_SERVICE_TOKEN_DEFAULT,): - httpretty.register_uri(httpretty.GET, - "%s/v2.0/tokens/%s" % (BASE_URI, token), - body= - self.examples.JSON_TOKEN_RESPONSES[token]) + self.requests.register_uri( + 'GET', + "%s/v2.0/tokens/%s" % (BASE_URI, token), + text=self.examples.JSON_TOKEN_RESPONSES[token]) for invalid_uri in ("%s/v2.0/tokens/invalid-token" % BASE_URI, "%s/v2.0/tokens/invalid-service-token" % BASE_URI): - httpretty.register_uri(httpretty.GET, - invalid_uri, - body="", status=404) + self.requests.register_uri('GET', + invalid_uri, + text="", status_code=404) self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE) self.service_token_expected_env = dict( @@ -2453,30 +2397,25 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest, 'uuid_service_token_default': uuid_serv_token_default, } - httpretty.reset() - httpretty.enable() - self.addCleanup(httpretty.disable) - - httpretty.register_uri(httpretty.GET, - "%s" % BASE_URI, - body=VERSION_LIST_v3, - status=300) + self.requests.register_uri('GET', + "%s" % BASE_URI, + text=VERSION_LIST_v3, + status_code=300) # TODO(jamielennox): auth_token middleware uses a v2 admin token # regardless of the auth_version that is set. - httpretty.register_uri(httpretty.POST, - "%s/v2.0/tokens" % BASE_URI, - body=FAKE_ADMIN_TOKEN) + self.requests.register_uri('POST', + "%s/v2.0/tokens" % BASE_URI, + text=FAKE_ADMIN_TOKEN) # TODO(jamielennox): there is no v3 revocation url yet, it uses v2 - httpretty.register_uri(httpretty.GET, - "%s/v2.0/tokens/revoked" % BASE_URI, - body=self.examples.SIGNED_REVOCATION_LIST, - status=200) + self.requests.register_uri('GET', + "%s/v2.0/tokens/revoked" % BASE_URI, + text=self.examples.SIGNED_REVOCATION_LIST) - httpretty.register_uri(httpretty.GET, - "%s/v3/auth/tokens" % BASE_URI, - body=self.token_response) + self.requests.register_uri('GET', + "%s/v3/auth/tokens" % BASE_URI, + text=self.token_response) self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE) self.token_expected_env.update(EXPECTED_V3_DEFAULT_ENV_ADDITIONS) @@ -2486,11 +2425,10 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest, EXPECTED_V3_DEFAULT_SERVICE_ENV_ADDITIONS) self.set_middleware() - def token_response(self, request, uri, headers): + def token_response(self, request, context): auth_id = request.headers.get('X-Auth-Token') token_id = request.headers.get('X-Subject-Token') self.assertEqual(auth_id, FAKE_ADMIN_TOKEN_ID) - headers.pop('status') status = 200 response = "" @@ -2503,7 +2441,8 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest, except KeyError: status = 404 - return status, headers, response + context.status_code = status + return response def load_tests(loader, tests, pattern): diff --git a/keystonemiddleware/tests/test_s3_token_middleware.py b/keystonemiddleware/tests/test_s3_token_middleware.py index 073ff946..b5b2bf76 100644 --- a/keystonemiddleware/tests/test_s3_token_middleware.py +++ b/keystonemiddleware/tests/test_s3_token_middleware.py @@ -12,10 +12,10 @@ # License for the specific language governing permissions and limitations # under the License. -import httpretty import mock from oslo.serialization import jsonutils import requests +from requests_mock.contrib import fixture as rm_fixture import six import testtools import webob @@ -54,9 +54,7 @@ class S3TokenMiddlewareTestBase(utils.TestCase): 'auth_protocol': self.TEST_PROTOCOL, } - httpretty.reset() - httpretty.enable() - self.addCleanup(httpretty.disable) + self.requests = self.useFixture(rm_fixture.Fixture()) def start_fake_response(self, status, headers): self.response_status = int(status.split(' ', 1)[0]) @@ -69,8 +67,8 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): super(S3TokenMiddlewareTestGood, self).setUp() self.middleware = s3_token.S3Token(FakeApp(), self.conf) - httpretty.register_uri(httpretty.POST, self.TEST_URL, - status=201, body=jsonutils.dumps(GOOD_RESPONSE)) + self.requests.register_uri('POST', self.TEST_URL, + status_code=201, json=GOOD_RESPONSE) # Ignore the request and pass to the next middleware in the # pipeline if no path has been specified. @@ -101,6 +99,10 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID') def test_authorized_http(self): + self.requests.register_uri('POST', + self.TEST_URL.replace('https', 'http'), + status_code=201, json=GOOD_RESPONSE) + self.middleware = ( s3_token.filter_factory({'auth_protocol': 'http', 'auth_host': self.TEST_HOST, @@ -152,8 +154,8 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase): {"message": "EC2 access key not found.", "code": 401, "title": "Unauthorized"}} - httpretty.register_uri(httpretty.POST, self.TEST_URL, - status=403, body=jsonutils.dumps(ret)) + self.requests.register_uri('POST', self.TEST_URL, + status_code=403, json=ret) req = webob.Request.blank('/v1/AUTH_cfa/c/o') req.headers['Authorization'] = 'access:signature' req.headers['X-Storage-Token'] = 'token' @@ -185,8 +187,8 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase): self.assertEqual(resp.status_int, s3_invalid_req.status_int) def test_bad_reply(self): - httpretty.register_uri(httpretty.POST, self.TEST_URL, - status=201, body="") + self.requests.register_uri('POST', self.TEST_URL, + status_code=201, text="") req = webob.Request.blank('/v1/AUTH_cfa/c/o') req.headers['Authorization'] = 'access:signature' diff --git a/keystonemiddleware/tests/utils.py b/keystonemiddleware/tests/utils.py index a92ef435..da6f347a 100644 --- a/keystonemiddleware/tests/utils.py +++ b/keystonemiddleware/tests/utils.py @@ -15,12 +15,8 @@ import sys import time import fixtures -import httpretty import mock -from oslo.serialization import jsonutils import requests -import six -from six.moves.urllib import parse as urlparse import testtools import uuid @@ -49,58 +45,6 @@ class TestCase(testtools.TestCase): self.time_patcher.stop() super(TestCase, self).tearDown() - def stub_url(self, method, parts=None, base_url=None, json=None, **kwargs): - if not base_url: - base_url = self.TEST_URL - - if json: - kwargs['body'] = jsonutils.dumps(json) - kwargs['content_type'] = 'application/json' - - if parts: - url = '/'.join([p.strip('/') for p in [base_url] + parts]) - else: - url = base_url - - # For urls containing queries - url = url.replace("/?", "?") - httpretty.register_uri(method, url, **kwargs) - - def assertRequestBodyIs(self, body=None, json=None): - last_request_body = httpretty.last_request().body - if six.PY3: - last_request_body = last_request_body.decode('utf-8') - - if json: - val = jsonutils.loads(last_request_body) - self.assertEqual(json, val) - elif body: - self.assertEqual(body, last_request_body) - - def assertQueryStringIs(self, qs=''): - """Verify the QueryString matches what is expected. - - The qs parameter should be of the format \'foo=bar&abc=xyz\' - """ - expected = urlparse.parse_qs(qs) - self.assertEqual(expected, httpretty.last_request().querystring) - - def assertQueryStringContains(self, **kwargs): - qs = httpretty.last_request().querystring - - for k, v in six.iteritems(kwargs): - self.assertIn(k, qs) - self.assertIn(v, qs[k]) - - def assertRequestHeaderEqual(self, name, val): - """Verify that the last request made contains a header and its value - - The request must have already been made and httpretty must have been - activated for the request. - """ - headers = httpretty.last_request().headers - self.assertEqual(headers.get(name), val) - if tuple(sys.version_info)[0:2] < (2, 7): diff --git a/test-requirements.txt b/test-requirements.txt index da84a5f3..7875caca 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -6,11 +6,11 @@ coverage>=3.6 discover fixtures>=0.3.14 hacking>=0.8.0,<0.9 -httpretty>=0.8.0,!=0.8.1,!=0.8.2,!=0.8.3 mock>=1.0 pycrypto>=2.6 oslosphinx>=2.2.0 # Apache-2.0 oslotest>=1.1.0 # Apache-2.0 +requests-mock>=0.4.0 # Apache-2.0 sphinx>=1.1.2,!=1.2.0,<1.3 testrepository>=0.0.18 testresources>=0.2.4