Merge "Rename requests mock object in testing"

This commit is contained in:
Jenkins 2015-03-20 03:14:20 +00:00 committed by Gerrit Code Review
commit 319e02d08b
3 changed files with 120 additions and 119 deletions

View File

@ -50,7 +50,7 @@ class DefaultAuthPluginTests(testtools.TestCase):
self.stream = six.StringIO()
self.logger = logging.getLogger(__name__)
self.session = session.Session()
self.requests = self.useFixture(rm_fixture.Fixture())
self.requests_mock = self.useFixture(rm_fixture.Fixture())
def test_auth_uri_from_fragments(self):
auth_protocol = 'http'
@ -91,8 +91,8 @@ class DefaultAuthPluginTests(testtools.TestCase):
token = fixture.V2Token()
admin_tenant_name = uuid.uuid4().hex
self.requests.post(base_uri + '/v2.0/tokens',
json=token)
self.requests_mock.post(base_uri + '/v2.0/tokens',
json=token)
plugin = self.new_plugin(identity_uri=base_uri,
admin_user=uuid.uuid4().hex,

View File

@ -309,7 +309,7 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase):
self.expected_env = expected_env or dict()
self.fake_app = fake_app or FakeApp
self.middleware = None
self.requests = self.useFixture(rm_fixture.Fixture())
self.requests_mock = self.useFixture(rm_fixture.Fixture())
signing_dir = self._setup_signing_directory()
@ -372,9 +372,10 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase):
def assertLastPath(self, path):
if path:
self.assertEqual(BASE_URI + path, self.requests.last_request.url)
self.assertEqual(BASE_URI + path,
self.requests_mock.last_request.url)
else:
self.assertIsNone(self.requests.last_request)
self.assertIsNone(self.requests_mock.last_request)
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
@ -395,18 +396,18 @@ class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
super(DiabloAuthTokenMiddlewareTest, self).setUp(
expected_env=expected_env)
self.requests.get(BASE_URI,
json=VERSION_LIST_v2,
status_code=300)
self.requests_mock.get(BASE_URI,
json=VERSION_LIST_v2,
status_code=300)
self.requests.post("%s/v2.0/tokens" % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.requests_mock.post("%s/v2.0/tokens" % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.token_id = self.examples.VALID_DIABLO_TOKEN
token_response = self.examples.JSON_TOKEN_RESPONSES[self.token_id]
url = "%s/v2.0/tokens/%s" % (BASE_URI, self.token_id)
self.requests.get(url, text=token_response)
self.requests_mock.get(url, text=token_response)
self.set_middleware()
@ -973,7 +974,7 @@ class CommonAuthTokenMiddlewareTest(object):
in_memory_list)
def test_invalid_revocation_list_raises_error(self):
self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, json={})
self.requests_mock.get('%s/v2.0/tokens/revoked' % BASE_URI, json={})
self.assertRaises(exc.RevocationListError,
self.middleware._revocations._fetch)
@ -988,7 +989,7 @@ class CommonAuthTokenMiddlewareTest(object):
# remember because we are testing the middleware we stub the connection
# to the keystone server, but this is not what gets returned
invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
self.requests.get(invalid_uri, status_code=404)
self.requests_mock.get(invalid_uri, status_code=404)
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = 'invalid-token'
@ -1076,7 +1077,7 @@ class CommonAuthTokenMiddlewareTest(object):
def test_memcache_set_invalid_uuid(self):
invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
self.requests.get(invalid_uri, status_code=404)
self.requests_mock.get(invalid_uri, status_code=404)
req = webob.Request.blank('/')
token = 'invalid-token'
@ -1367,9 +1368,9 @@ class CommonAuthTokenMiddlewareTest(object):
for service_url in (self.examples.UNVERSIONED_SERVICE_URL,
self.examples.SERVICE_URL):
self.requests.get(service_url,
json=VERSION_LIST_v3,
status_code=300)
self.requests_mock.get(service_url,
json=VERSION_LIST_v3,
status_code=300)
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = self.token_dict['uuid_token_default']
@ -1414,10 +1415,9 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
'auth_version': self.auth_version,
}
self.requests.register_uri('GET',
BASE_URI,
json=VERSION_LIST_v3,
status_code=300)
self.requests_mock.get(BASE_URI,
json=VERSION_LIST_v3,
status_code=300)
self.set_middleware(conf=conf)
@ -1427,10 +1427,10 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_request_no_token_dummy(self):
cms._ensure_subprocess()
self.requests.get('%s%s' % (BASE_URI, self.ca_path),
status_code=404)
self.requests.get('%s%s' % (BASE_URI, self.signing_path),
status_code=404)
self.requests_mock.get('%s%s' % (BASE_URI, self.ca_path),
status_code=404)
self.requests_mock.get('%s%s' % (BASE_URI, self.signing_path),
status_code=404)
self.assertRaises(exceptions.CertificateConfigError,
self.middleware._verify_signed_token,
self.examples.SIGNED_TOKEN_SCOPED,
@ -1439,7 +1439,7 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_fetch_signing_cert(self):
data = 'FAKE CERT'
url = "%s%s" % (BASE_URI, self.signing_path)
self.requests.get(url, text=data)
self.requests_mock.get(url, text=data)
self.middleware._fetch_signing_cert()
signing_cert_path = self.middleware._signing_directory.calc_path(
@ -1447,12 +1447,12 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
with open(signing_cert_path, 'r') as f:
self.assertEqual(f.read(), data)
self.assertEqual(url, self.requests.last_request.url)
self.assertEqual(url, self.requests_mock.last_request.url)
def test_fetch_signing_ca(self):
data = 'FAKE CA'
url = "%s%s" % (BASE_URI, self.ca_path)
self.requests.get(url, text=data)
self.requests_mock.get(url, text=data)
self.middleware._fetch_ca_cert()
ca_file_path = self.middleware._signing_directory.calc_path(
@ -1460,7 +1460,7 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
with open(ca_file_path, 'r') as f:
self.assertEqual(f.read(), data)
self.assertEqual(url, self.requests.last_request.url)
self.assertEqual(url, self.requests_mock.last_request.url)
def test_prefix_trailing_slash(self):
del self.conf['identity_uri']
@ -1473,19 +1473,19 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
ca_url = "%s%s" % (base_url, self.ca_path)
signing_url = "%s%s" % (base_url, self.signing_path)
self.requests.get(base_url,
json=VERSION_LIST_v3,
status_code=300)
self.requests.get(ca_url, text='FAKECA')
self.requests.get(signing_url, text='FAKECERT')
self.requests_mock.get(base_url,
json=VERSION_LIST_v3,
status_code=300)
self.requests_mock.get(ca_url, text='FAKECA')
self.requests_mock.get(signing_url, text='FAKECERT')
self.set_middleware(conf=self.conf)
self.middleware._fetch_ca_cert()
self.assertEqual(ca_url, self.requests.last_request.url)
self.assertEqual(ca_url, self.requests_mock.last_request.url)
self.middleware._fetch_signing_cert()
self.assertEqual(signing_url, self.requests.last_request.url)
self.assertEqual(signing_url, self.requests_mock.last_request.url)
def test_without_prefix(self):
del self.conf['identity_uri']
@ -1497,19 +1497,19 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
ca_url = "%s%s" % (BASE_HOST, self.ca_path)
signing_url = "%s%s" % (BASE_HOST, self.signing_path)
self.requests.get(BASE_HOST,
json=VERSION_LIST_v3,
status_code=300)
self.requests.get(ca_url, text='FAKECA')
self.requests.get(signing_url, text='FAKECERT')
self.requests_mock.get(BASE_HOST,
json=VERSION_LIST_v3,
status_code=300)
self.requests_mock.get(ca_url, text='FAKECA')
self.requests_mock.get(signing_url, text='FAKECERT')
self.set_middleware(conf=self.conf)
self.middleware._fetch_ca_cert()
self.assertEqual(ca_url, self.requests.last_request.url)
self.assertEqual(ca_url, self.requests_mock.last_request.url)
self.middleware._fetch_signing_cert()
self.assertEqual(signing_url, self.requests.last_request.url)
self.assertEqual(signing_url, self.requests_mock.last_request.url)
class V3CertDownloadMiddlewareTest(V2CertDownloadMiddlewareTest):
@ -1571,15 +1571,15 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.REVOKED_TOKEN_HASH_SHA256,
}
self.requests.get(BASE_URI,
json=VERSION_LIST_v2,
status_code=300)
self.requests_mock.get(BASE_URI,
json=VERSION_LIST_v2,
status_code=300)
self.requests.post('%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI,
text=self.examples.SIGNED_REVOCATION_LIST)
self.requests_mock.get('%s/v2.0/tokens/revoked' % BASE_URI,
text=self.examples.SIGNED_REVOCATION_LIST)
for token in (self.examples.UUID_TOKEN_DEFAULT,
self.examples.UUID_TOKEN_UNSCOPED,
@ -1590,10 +1590,10 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.SIGNED_TOKEN_SCOPED_PKIZ_KEY,):
url = "%s/v2.0/tokens/%s" % (BASE_URI, token)
text = self.examples.JSON_TOKEN_RESPONSES[token]
self.requests.get(url, text=text)
self.requests_mock.get(url, text=text)
url = '%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN)
self.requests.get(url, text=network_error_response)
self.requests_mock.get(url, text=network_error_response)
self.set_middleware()
@ -1697,17 +1697,17 @@ class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
'auth_version': 'v2.0'
}
self.requests.get(BASE_URI,
json=VERSION_LIST_v3,
status_code=300)
self.requests_mock.get(BASE_URI,
json=VERSION_LIST_v3,
status_code=300)
self.requests.post('%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
token = self.examples.UUID_TOKEN_DEFAULT
url = "%s/v2.0/tokens/%s" % (BASE_URI, token)
text = self.examples.JSON_TOKEN_RESPONSES[token]
self.requests.get(url, text=text)
self.requests_mock.get(url, text=text)
self.set_middleware(conf=conf)
@ -1717,7 +1717,7 @@ class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
req.headers['X-Auth-Token'] = self.examples.UUID_TOKEN_DEFAULT
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
self.assertEqual(url, self.requests.last_request.url)
self.assertEqual(url, self.requests_mock.last_request.url)
class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
@ -1778,21 +1778,21 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.REVOKED_v3_PKIZ_TOKEN_HASH,
}
self.requests.get(BASE_URI,
json=VERSION_LIST_v3,
status_code=300)
self.requests_mock.get(BASE_URI,
json=VERSION_LIST_v3,
status_code=300)
# TODO(jamielennox): auth_token middleware uses a v2 admin token
# regardless of the auth_version that is set.
self.requests.post('%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
# TODO(jamielennox): there is no v3 revocation url yet, it uses v2
self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI,
text=self.examples.SIGNED_REVOCATION_LIST)
self.requests_mock.get('%s/v2.0/tokens/revoked' % BASE_URI,
text=self.examples.SIGNED_REVOCATION_LIST)
self.requests.get('%s/v3/auth/tokens' % BASE_URI,
text=self.token_response)
self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI,
text=self.token_response)
self.set_middleware()
@ -1866,26 +1866,22 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.assertIn('adminURL', endpoint)
def test_fallback_to_online_validation_with_signing_error(self):
self.requests.register_uri(
'GET',
'%s/v3/OS-SIMPLE-CERT/certificates' % BASE_URI,
status_code=404)
self.requests_mock.get('%s/v3/OS-SIMPLE-CERT/certificates' % BASE_URI,
status_code=404)
self.assert_valid_request_200(self.token_dict['signed_token_scoped'])
self.assert_valid_request_200(
self.token_dict['signed_token_scoped_pkiz'])
def test_fallback_to_online_validation_with_ca_error(self):
self.requests.register_uri('GET',
'%s/v3/OS-SIMPLE-CERT/ca' % BASE_URI,
status_code=404)
self.requests_mock.get('%s/v3/OS-SIMPLE-CERT/ca' % BASE_URI,
status_code=404)
self.assert_valid_request_200(self.token_dict['signed_token_scoped'])
self.assert_valid_request_200(
self.token_dict['signed_token_scoped_pkiz'])
def test_fallback_to_online_validation_with_revocation_list_error(self):
self.requests.register_uri('GET',
'%s/v2.0/tokens/revoked' % BASE_URI,
status_code=404)
self.requests_mock.get('%s/v2.0/tokens/revoked' % BASE_URI,
status_code=404)
self.assert_valid_request_200(self.token_dict['signed_token_scoped'])
self.assert_valid_request_200(
self.token_dict['signed_token_scoped_pkiz'])
@ -2458,25 +2454,26 @@ class v2CompositeAuthTests(BaseAuthTokenMiddlewareTest,
'uuid_service_token_default': uuid_service_token_default,
}
self.requests.get(BASE_URI,
json=VERSION_LIST_v2,
status_code=300)
self.requests_mock.get(BASE_URI,
json=VERSION_LIST_v2,
status_code=300)
self.requests.post('%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI,
text=self.examples.SIGNED_REVOCATION_LIST,
status_code=200)
self.requests_mock.get('%s/v2.0/tokens/revoked' % BASE_URI,
text=self.examples.SIGNED_REVOCATION_LIST,
status_code=200)
for token in (self.examples.UUID_TOKEN_DEFAULT,
self.examples.UUID_SERVICE_TOKEN_DEFAULT,):
self.requests.get('%s/v2.0/tokens/%s' % (BASE_URI, token),
text=self.examples.JSON_TOKEN_RESPONSES[token])
text = self.examples.JSON_TOKEN_RESPONSES[token]
self.requests_mock.get('%s/v2.0/tokens/%s' % (BASE_URI, token),
text=text)
for invalid_uri in ("%s/v2.0/tokens/invalid-token" % BASE_URI,
"%s/v2.0/tokens/invalid-service-token" % BASE_URI):
self.requests.get(invalid_uri, text='', status_code=404)
self.requests_mock.get(invalid_uri, text='', status_code=404)
self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
self.service_token_expected_env = dict(
@ -2508,19 +2505,19 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest,
'uuid_service_token_default': uuid_serv_token_default,
}
self.requests.get(BASE_URI, json=VERSION_LIST_v3, status_code=300)
self.requests_mock.get(BASE_URI, json=VERSION_LIST_v3, status_code=300)
# TODO(jamielennox): auth_token middleware uses a v2 admin token
# regardless of the auth_version that is set.
self.requests.post('%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
# TODO(jamielennox): there is no v3 revocation url yet, it uses v2
self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI,
text=self.examples.SIGNED_REVOCATION_LIST)
self.requests_mock.get('%s/v2.0/tokens/revoked' % BASE_URI,
text=self.examples.SIGNED_REVOCATION_LIST)
self.requests.get('%s/v3/auth/tokens' % BASE_URI,
text=self.token_response)
self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI,
text=self.token_response)
self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
self.token_expected_env.update(EXPECTED_V3_DEFAULT_ENV_ADDITIONS)
@ -2561,7 +2558,7 @@ class OtherTests(BaseAuthTokenMiddlewareTest):
versions = fixture.DiscoveryList(v2=False, v3_id='v4', href=BASE_URI)
self.set_middleware()
self.requests.get(BASE_URI, json=versions, status_code=300)
self.requests_mock.get(BASE_URI, json=versions, status_code=300)
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = uuid.uuid4().hex
@ -2589,11 +2586,11 @@ class OtherTests(BaseAuthTokenMiddlewareTest):
def test_default_auth_version(self):
# VERSION_LIST_v3 contains both v2 and v3 version elements
self.requests.get(BASE_URI, json=VERSION_LIST_v3, status_code=300)
self.requests_mock.get(BASE_URI, json=VERSION_LIST_v3, status_code=300)
self._assert_auth_version(None, (3, 0))
# VERSION_LIST_v2 contains only v2 version elements
self.requests.get(BASE_URI, json=VERSION_LIST_v2, status_code=300)
self.requests_mock.get(BASE_URI, json=VERSION_LIST_v2, status_code=300)
self._assert_auth_version(None, (2, 0))
def test_unsupported_auth_version(self):
@ -2620,15 +2617,15 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
self.project_id = uuid.uuid4().hex
# first touch is to discover the available versions at the auth_url
self.requests.get(self.AUTH_URL,
json=fixture.DiscoveryList(href=self.DISC_URL),
status_code=300)
self.requests_mock.get(self.AUTH_URL,
json=fixture.DiscoveryList(href=self.DISC_URL),
status_code=300)
# then we do discovery on the URL from the service catalog. In practice
# this is mostly the same URL as before but test the full range.
self.requests.get(self.KEYSTONE_BASE_URL + '/',
json=fixture.DiscoveryList(href=self.CRUD_URL),
status_code=300)
self.requests_mock.get(self.KEYSTONE_BASE_URL + '/',
json=fixture.DiscoveryList(href=self.CRUD_URL),
status_code=300)
def good_request(self, app):
# admin_token is the token that the service will get back from auth
@ -2637,9 +2634,9 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
s = admin_token.add_service('identity', name='keystone')
s.add_standard_endpoints(admin=self.KEYSTONE_URL)
self.requests.post(self.DISC_URL + '/v3/auth/tokens',
json=admin_token,
headers={'X-Subject-Token': admin_token_id})
self.requests_mock.post(self.DISC_URL + '/v3/auth/tokens',
json=admin_token,
headers={'X-Subject-Token': admin_token_id})
# user_token is the data from the user's inputted token
user_token_id = uuid.uuid4().hex
@ -2649,9 +2646,9 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
request_headers = {'X-Subject-Token': user_token_id,
'X-Auth-Token': admin_token_id}
self.requests.get(self.CRUD_URL + '/v3/auth/tokens',
request_headers=request_headers,
json=user_token)
self.requests_mock.get(self.CRUD_URL + '/v3/auth/tokens',
request_headers=request_headers,
json=user_token)
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = user_token_id

View File

@ -54,7 +54,7 @@ class S3TokenMiddlewareTestBase(utils.TestCase):
'auth_protocol': self.TEST_PROTOCOL,
}
self.requests = self.useFixture(rm_fixture.Fixture())
self.requests_mock = self.useFixture(rm_fixture.Fixture())
def start_fake_response(self, status, headers):
self.response_status = int(status.split(' ', 1)[0])
@ -67,7 +67,9 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
super(S3TokenMiddlewareTestGood, self).setUp()
self.middleware = s3_token.S3Token(FakeApp(), self.conf)
self.requests.post(self.TEST_URL, status_code=201, json=GOOD_RESPONSE)
self.requests_mock.post(self.TEST_URL,
status_code=201,
json=GOOD_RESPONSE)
# Ignore the request and pass to the next middleware in the
# pipeline if no path has been specified.
@ -98,9 +100,9 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
def test_authorized_http(self):
self.requests.post(self.TEST_URL.replace('https', 'http'),
status_code=201,
json=GOOD_RESPONSE)
self.requests_mock.post(self.TEST_URL.replace('https', 'http'),
status_code=201,
json=GOOD_RESPONSE)
self.middleware = (
s3_token.filter_factory({'auth_protocol': 'http',
@ -153,7 +155,7 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
{"message": "EC2 access key not found.",
"code": 401,
"title": "Unauthorized"}}
self.requests.post(self.TEST_URL, status_code=403, json=ret)
self.requests_mock.post(self.TEST_URL, status_code=403, json=ret)
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'access:signature'
req.headers['X-Storage-Token'] = 'token'
@ -185,7 +187,9 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
self.assertEqual(resp.status_int, s3_invalid_req.status_int)
def test_bad_reply(self):
self.requests.post(self.TEST_URL, status_code=201, text="<badreply>")
self.requests_mock.post(self.TEST_URL,
status_code=201,
text="<badreply>")
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'access:signature'