Merge "Use newer requests-mock syntax"
This commit is contained in:
commit
8d3230fe32
@ -363,20 +363,18 @@ class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
|
||||
super(DiabloAuthTokenMiddlewareTest, self).setUp(
|
||||
expected_env=expected_env)
|
||||
|
||||
self.requests.register_uri('GET',
|
||||
BASE_URI,
|
||||
json=VERSION_LIST_v2,
|
||||
status_code=300)
|
||||
self.requests.get(BASE_URI,
|
||||
json=VERSION_LIST_v2,
|
||||
status_code=300)
|
||||
|
||||
self.requests.register_uri('POST',
|
||||
"%s/v2.0/tokens" % BASE_URI,
|
||||
text=FAKE_ADMIN_TOKEN)
|
||||
self.requests.post("%s/v2.0/tokens" % BASE_URI,
|
||||
text=FAKE_ADMIN_TOKEN)
|
||||
|
||||
self.token_id = self.examples.VALID_DIABLO_TOKEN
|
||||
token_response = self.examples.JSON_TOKEN_RESPONSES[self.token_id]
|
||||
|
||||
url = "%s/v2.0/tokens/%s" % (BASE_URI, self.token_id)
|
||||
self.requests.register_uri('GET', url, text=token_response)
|
||||
self.requests.get(url, text=token_response)
|
||||
|
||||
self.set_middleware()
|
||||
|
||||
@ -948,9 +946,7 @@ class CommonAuthTokenMiddlewareTest(object):
|
||||
in_memory_list)
|
||||
|
||||
def test_invalid_revocation_list_raises_service_error(self):
|
||||
self.requests.register_uri('GET',
|
||||
"%s/v2.0/tokens/revoked" % BASE_URI,
|
||||
json={})
|
||||
self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, json={})
|
||||
|
||||
self.assertRaises(auth_token.ServiceError,
|
||||
self.middleware._fetch_revocation_list)
|
||||
@ -965,7 +961,7 @@ class CommonAuthTokenMiddlewareTest(object):
|
||||
# remember because we are testing the middleware we stub the connection
|
||||
# to the keystone server, but this is not what gets returned
|
||||
invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
|
||||
self.requests.register_uri('GET', invalid_uri, status_code=404)
|
||||
self.requests.get(invalid_uri, status_code=404)
|
||||
|
||||
req = webob.Request.blank('/')
|
||||
req.headers['X-Auth-Token'] = 'invalid-token'
|
||||
@ -1053,7 +1049,7 @@ class CommonAuthTokenMiddlewareTest(object):
|
||||
|
||||
def test_memcache_set_invalid_uuid(self):
|
||||
invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
|
||||
self.requests.register_uri('GET', invalid_uri, status_code=404)
|
||||
self.requests.get(invalid_uri, status_code=404)
|
||||
|
||||
req = webob.Request.blank('/')
|
||||
token = 'invalid-token'
|
||||
@ -1346,10 +1342,9 @@ class CommonAuthTokenMiddlewareTest(object):
|
||||
|
||||
for service_url in (self.examples.UNVERSIONED_SERVICE_URL,
|
||||
self.examples.SERVICE_URL):
|
||||
self.requests.register_uri('GET',
|
||||
service_url,
|
||||
json=VERSION_LIST_v3,
|
||||
status_code=300)
|
||||
self.requests.get(service_url,
|
||||
json=VERSION_LIST_v3,
|
||||
status_code=300)
|
||||
|
||||
req = webob.Request.blank('/')
|
||||
req.headers['X-Auth-Token'] = self.token_dict['uuid_token_default']
|
||||
@ -1403,12 +1398,10 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
|
||||
def test_request_no_token_dummy(self):
|
||||
cms._ensure_subprocess()
|
||||
|
||||
self.requests.register_uri('GET',
|
||||
"%s%s" % (BASE_URI, self.ca_path),
|
||||
status_code=404)
|
||||
self.requests.register_uri('GET',
|
||||
"%s%s" % (BASE_URI, self.signing_path),
|
||||
status_code=404)
|
||||
self.requests.get('%s%s' % (BASE_URI, self.ca_path),
|
||||
status_code=404)
|
||||
self.requests.get('%s%s' % (BASE_URI, self.signing_path),
|
||||
status_code=404)
|
||||
self.assertRaises(exceptions.CertificateConfigError,
|
||||
self.middleware._verify_signed_token,
|
||||
self.examples.SIGNED_TOKEN_SCOPED,
|
||||
@ -1417,7 +1410,7 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
|
||||
def test_fetch_signing_cert(self):
|
||||
data = 'FAKE CERT'
|
||||
url = "%s%s" % (BASE_URI, self.signing_path)
|
||||
self.requests.register_uri('GET', url, text=data)
|
||||
self.requests.get(url, text=data)
|
||||
self.middleware._fetch_signing_cert()
|
||||
|
||||
with open(self.middleware._signing_cert_file_name, 'r') as f:
|
||||
@ -1428,7 +1421,7 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
|
||||
def test_fetch_signing_ca(self):
|
||||
data = 'FAKE CA'
|
||||
url = "%s%s" % (BASE_URI, self.ca_path)
|
||||
self.requests.register_uri('GET', url, text=data)
|
||||
self.requests.get(url, text=data)
|
||||
self.middleware._fetch_ca_cert()
|
||||
|
||||
with open(self.middleware._signing_ca_file_name, 'r') as f:
|
||||
@ -1447,12 +1440,11 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
|
||||
ca_url = "%s%s" % (base_url, self.ca_path)
|
||||
signing_url = "%s%s" % (base_url, self.signing_path)
|
||||
|
||||
self.requests.register_uri('GET',
|
||||
base_url,
|
||||
json=VERSION_LIST_v3,
|
||||
status_code=300)
|
||||
self.requests.register_uri('GET', ca_url, text='FAKECA')
|
||||
self.requests.register_uri('GET', signing_url, text='FAKECERT')
|
||||
self.requests.get(base_url,
|
||||
json=VERSION_LIST_v3,
|
||||
status_code=300)
|
||||
self.requests.get(ca_url, text='FAKECA')
|
||||
self.requests.get(signing_url, text='FAKECERT')
|
||||
|
||||
self.set_middleware(conf=self.conf)
|
||||
|
||||
@ -1472,12 +1464,11 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
|
||||
ca_url = "%s%s" % (BASE_HOST, self.ca_path)
|
||||
signing_url = "%s%s" % (BASE_HOST, self.signing_path)
|
||||
|
||||
self.requests.register_uri('GET',
|
||||
BASE_HOST,
|
||||
json=VERSION_LIST_v3,
|
||||
status_code=300)
|
||||
self.requests.register_uri('GET', ca_url, text='FAKECA')
|
||||
self.requests.register_uri('GET', signing_url, text='FAKECERT')
|
||||
self.requests.get(BASE_HOST,
|
||||
json=VERSION_LIST_v3,
|
||||
status_code=300)
|
||||
self.requests.get(ca_url, text='FAKECA')
|
||||
self.requests.get(signing_url, text='FAKECERT')
|
||||
|
||||
self.set_middleware(conf=self.conf)
|
||||
|
||||
@ -1547,18 +1538,15 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
|
||||
self.examples.REVOKED_TOKEN_HASH_SHA256,
|
||||
}
|
||||
|
||||
self.requests.register_uri('GET',
|
||||
BASE_URI,
|
||||
json=VERSION_LIST_v2,
|
||||
status_code=300)
|
||||
self.requests.get(BASE_URI,
|
||||
json=VERSION_LIST_v2,
|
||||
status_code=300)
|
||||
|
||||
self.requests.register_uri('POST',
|
||||
"%s/v2.0/tokens" % BASE_URI,
|
||||
text=FAKE_ADMIN_TOKEN)
|
||||
self.requests.post('%s/v2.0/tokens' % BASE_URI,
|
||||
text=FAKE_ADMIN_TOKEN)
|
||||
|
||||
self.requests.register_uri('GET',
|
||||
"%s/v2.0/tokens/revoked" % BASE_URI,
|
||||
text=self.examples.SIGNED_REVOCATION_LIST)
|
||||
self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI,
|
||||
text=self.examples.SIGNED_REVOCATION_LIST)
|
||||
|
||||
for token in (self.examples.UUID_TOKEN_DEFAULT,
|
||||
self.examples.UUID_TOKEN_UNSCOPED,
|
||||
@ -1569,10 +1557,10 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
|
||||
self.examples.SIGNED_TOKEN_SCOPED_PKIZ_KEY,):
|
||||
url = "%s/v2.0/tokens/%s" % (BASE_URI, token)
|
||||
text = self.examples.JSON_TOKEN_RESPONSES[token]
|
||||
self.requests.register_uri('GET', url, text=text)
|
||||
self.requests.get(url, text=text)
|
||||
|
||||
url = '%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN)
|
||||
self.requests.register_uri('GET', url, text=network_error_response)
|
||||
self.requests.get(url, text=network_error_response)
|
||||
|
||||
self.set_middleware()
|
||||
|
||||
@ -1647,19 +1635,17 @@ class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
|
||||
'auth_version': 'v2.0'
|
||||
}
|
||||
|
||||
self.requests.register_uri('GET',
|
||||
BASE_URI,
|
||||
json=VERSION_LIST_v3,
|
||||
status_code=300)
|
||||
self.requests.get(BASE_URI,
|
||||
json=VERSION_LIST_v3,
|
||||
status_code=300)
|
||||
|
||||
self.requests.register_uri('POST',
|
||||
"%s/v2.0/tokens" % BASE_URI,
|
||||
text=FAKE_ADMIN_TOKEN)
|
||||
self.requests.post('%s/v2.0/tokens' % BASE_URI,
|
||||
text=FAKE_ADMIN_TOKEN)
|
||||
|
||||
token = self.examples.UUID_TOKEN_DEFAULT
|
||||
url = "%s/v2.0/tokens/%s" % (BASE_URI, token)
|
||||
text = self.examples.JSON_TOKEN_RESPONSES[token]
|
||||
self.requests.register_uri('GET', url, text=text)
|
||||
self.requests.get(url, text=text)
|
||||
|
||||
self.set_middleware(conf=conf)
|
||||
|
||||
@ -1730,25 +1716,21 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
|
||||
self.examples.REVOKED_v3_PKIZ_TOKEN_HASH,
|
||||
}
|
||||
|
||||
self.requests.register_uri('GET',
|
||||
BASE_URI,
|
||||
json=VERSION_LIST_v3,
|
||||
status_code=300)
|
||||
self.requests.get(BASE_URI,
|
||||
json=VERSION_LIST_v3,
|
||||
status_code=300)
|
||||
|
||||
# TODO(jamielennox): auth_token middleware uses a v2 admin token
|
||||
# regardless of the auth_version that is set.
|
||||
self.requests.register_uri('POST',
|
||||
"%s/v2.0/tokens" % BASE_URI,
|
||||
text=FAKE_ADMIN_TOKEN)
|
||||
self.requests.post('%s/v2.0/tokens' % BASE_URI,
|
||||
text=FAKE_ADMIN_TOKEN)
|
||||
|
||||
# TODO(jamielennox): there is no v3 revocation url yet, it uses v2
|
||||
self.requests.register_uri('GET',
|
||||
"%s/v2.0/tokens/revoked" % BASE_URI,
|
||||
text=self.examples.SIGNED_REVOCATION_LIST)
|
||||
self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI,
|
||||
text=self.examples.SIGNED_REVOCATION_LIST)
|
||||
|
||||
self.requests.register_uri('GET',
|
||||
"%s/v3/auth/tokens" % BASE_URI,
|
||||
text=self.token_response)
|
||||
self.requests.get('%s/v3/auth/tokens' % BASE_URI,
|
||||
text=self.token_response)
|
||||
|
||||
self.set_middleware()
|
||||
|
||||
@ -2302,32 +2284,25 @@ class v2CompositeAuthTests(BaseAuthTokenMiddlewareTest,
|
||||
'uuid_service_token_default': uuid_service_token_default,
|
||||
}
|
||||
|
||||
self.requests.register_uri('GET',
|
||||
BASE_URI,
|
||||
json=VERSION_LIST_v2,
|
||||
status_code=300)
|
||||
self.requests.get(BASE_URI,
|
||||
json=VERSION_LIST_v2,
|
||||
status_code=300)
|
||||
|
||||
self.requests.register_uri('POST',
|
||||
"%s/v2.0/tokens" % BASE_URI,
|
||||
text=FAKE_ADMIN_TOKEN)
|
||||
self.requests.post('%s/v2.0/tokens' % BASE_URI,
|
||||
text=FAKE_ADMIN_TOKEN)
|
||||
|
||||
self.requests.register_uri('GET',
|
||||
"%s/v2.0/tokens/revoked" % BASE_URI,
|
||||
text=self.examples.SIGNED_REVOCATION_LIST,
|
||||
status_code=200)
|
||||
self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI,
|
||||
text=self.examples.SIGNED_REVOCATION_LIST,
|
||||
status_code=200)
|
||||
|
||||
for token in (self.examples.UUID_TOKEN_DEFAULT,
|
||||
self.examples.UUID_SERVICE_TOKEN_DEFAULT,):
|
||||
self.requests.register_uri(
|
||||
'GET',
|
||||
"%s/v2.0/tokens/%s" % (BASE_URI, token),
|
||||
text=self.examples.JSON_TOKEN_RESPONSES[token])
|
||||
self.requests.get('%s/v2.0/tokens/%s' % (BASE_URI, token),
|
||||
text=self.examples.JSON_TOKEN_RESPONSES[token])
|
||||
|
||||
for invalid_uri in ("%s/v2.0/tokens/invalid-token" % BASE_URI,
|
||||
"%s/v2.0/tokens/invalid-service-token" % BASE_URI):
|
||||
self.requests.register_uri('GET',
|
||||
invalid_uri,
|
||||
text="", status_code=404)
|
||||
self.requests.get(invalid_uri, text='', status_code=404)
|
||||
|
||||
self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
|
||||
self.service_token_expected_env = dict(
|
||||
@ -2359,25 +2334,19 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest,
|
||||
'uuid_service_token_default': uuid_serv_token_default,
|
||||
}
|
||||
|
||||
self.requests.register_uri('GET',
|
||||
BASE_URI,
|
||||
json=VERSION_LIST_v3,
|
||||
status_code=300)
|
||||
self.requests.get(BASE_URI, json=VERSION_LIST_v3, status_code=300)
|
||||
|
||||
# TODO(jamielennox): auth_token middleware uses a v2 admin token
|
||||
# regardless of the auth_version that is set.
|
||||
self.requests.register_uri('POST',
|
||||
"%s/v2.0/tokens" % BASE_URI,
|
||||
text=FAKE_ADMIN_TOKEN)
|
||||
self.requests.post('%s/v2.0/tokens' % BASE_URI,
|
||||
text=FAKE_ADMIN_TOKEN)
|
||||
|
||||
# TODO(jamielennox): there is no v3 revocation url yet, it uses v2
|
||||
self.requests.register_uri('GET',
|
||||
"%s/v2.0/tokens/revoked" % BASE_URI,
|
||||
text=self.examples.SIGNED_REVOCATION_LIST)
|
||||
self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI,
|
||||
text=self.examples.SIGNED_REVOCATION_LIST)
|
||||
|
||||
self.requests.register_uri('GET',
|
||||
"%s/v3/auth/tokens" % BASE_URI,
|
||||
text=self.token_response)
|
||||
self.requests.get('%s/v3/auth/tokens' % BASE_URI,
|
||||
text=self.token_response)
|
||||
|
||||
self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
|
||||
self.token_expected_env.update(EXPECTED_V3_DEFAULT_ENV_ADDITIONS)
|
||||
@ -2495,9 +2464,8 @@ class DefaultAuthPluginTests(testtools.TestCase):
|
||||
token = fixture.V2Token()
|
||||
admin_tenant_name = uuid.uuid4().hex
|
||||
|
||||
self.requests.register_uri('POST',
|
||||
base_uri + '/v2.0/tokens',
|
||||
json=token)
|
||||
self.requests.post(base_uri + '/v2.0/tokens',
|
||||
json=token)
|
||||
|
||||
plugin = self.new_plugin(identity_uri=base_uri,
|
||||
admin_user=uuid.uuid4().hex,
|
||||
|
@ -67,8 +67,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
|
||||
super(S3TokenMiddlewareTestGood, self).setUp()
|
||||
self.middleware = s3_token.S3Token(FakeApp(), self.conf)
|
||||
|
||||
self.requests.register_uri('POST', self.TEST_URL,
|
||||
status_code=201, json=GOOD_RESPONSE)
|
||||
self.requests.post(self.TEST_URL, status_code=201, json=GOOD_RESPONSE)
|
||||
|
||||
# Ignore the request and pass to the next middleware in the
|
||||
# pipeline if no path has been specified.
|
||||
@ -99,9 +98,9 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
|
||||
self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
|
||||
|
||||
def test_authorized_http(self):
|
||||
self.requests.register_uri('POST',
|
||||
self.TEST_URL.replace('https', 'http'),
|
||||
status_code=201, json=GOOD_RESPONSE)
|
||||
self.requests.post(self.TEST_URL.replace('https', 'http'),
|
||||
status_code=201,
|
||||
json=GOOD_RESPONSE)
|
||||
|
||||
self.middleware = (
|
||||
s3_token.filter_factory({'auth_protocol': 'http',
|
||||
@ -154,8 +153,7 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
|
||||
{"message": "EC2 access key not found.",
|
||||
"code": 401,
|
||||
"title": "Unauthorized"}}
|
||||
self.requests.register_uri('POST', self.TEST_URL,
|
||||
status_code=403, json=ret)
|
||||
self.requests.post(self.TEST_URL, status_code=403, json=ret)
|
||||
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'access:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
@ -187,8 +185,7 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
|
||||
self.assertEqual(resp.status_int, s3_invalid_req.status_int)
|
||||
|
||||
def test_bad_reply(self):
|
||||
self.requests.register_uri('POST', self.TEST_URL,
|
||||
status_code=201, text="<badreply>")
|
||||
self.requests.post(self.TEST_URL, status_code=201, text="<badreply>")
|
||||
|
||||
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'access:signature'
|
||||
|
Loading…
x
Reference in New Issue
Block a user