From cb55e89bf1892c62a981df6b205d41ebfdee65ef Mon Sep 17 00:00:00 2001 From: Clay Gerrard Date: Mon, 8 Sep 2014 12:25:54 -0700 Subject: [PATCH] test tempurl header sanitization priority Change-Id: I0bb3004a717da2f65196bc56b0f7baef49e649e8 --- test/unit/common/middleware/test_tempurl.py | 82 +++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/test/unit/common/middleware/test_tempurl.py b/test/unit/common/middleware/test_tempurl.py index e9d166b5ff..0581077094 100644 --- a/test/unit/common/middleware/test_tempurl.py +++ b/test/unit/common/middleware/test_tempurl.py @@ -663,6 +663,44 @@ class TestTempURL(unittest.TestCase): self.assertEquals( self.app.request.headers['x-remove-this-except-this'], 'value2') + def test_allow_trumps_incoming_header_conflict(self): + self.tempurl = tempurl.filter_factory({ + 'incoming_remove_headers': 'x-conflict-header', + 'incoming_allow_headers': 'x-conflict-header'})(self.auth) + method = 'GET' + expires = int(time() + 86400) + path = '/v1/a/c/o' + key = 'abc' + hmac_body = '%s\n%s\n%s' % (method, expires, path) + sig = hmac.new(key, hmac_body, sha1).hexdigest() + req = self._make_request( + path, keys=[key], + headers={'x-conflict-header': 'value'}, + environ={'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s' % ( + sig, expires)}) + resp = req.get_response(self.tempurl) + self.assertEquals(resp.status_int, 404) + self.assertTrue('x-conflict-header' in self.app.request.headers) + + def test_allow_trumps_incoming_header_startswith_conflict(self): + self.tempurl = tempurl.filter_factory({ + 'incoming_remove_headers': 'x-conflict-header-*', + 'incoming_allow_headers': 'x-conflict-header-*'})(self.auth) + method = 'GET' + expires = int(time() + 86400) + path = '/v1/a/c/o' + key = 'abc' + hmac_body = '%s\n%s\n%s' % (method, expires, path) + sig = hmac.new(key, hmac_body, sha1).hexdigest() + req = self._make_request( + path, keys=[key], + headers={'x-conflict-header-test': 'value'}, + environ={'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s' % ( + sig, expires)}) + resp = req.get_response(self.tempurl) + self.assertEquals(resp.status_int, 404) + self.assertTrue('x-conflict-header-test' in self.app.request.headers) + def test_removed_outgoing_header(self): self.tempurl = tempurl.filter_factory({ 'outgoing_remove_headers': 'x-test-header-one-a'})(self.auth) @@ -701,6 +739,50 @@ class TestTempURL(unittest.TestCase): self.assertTrue('x-test-header-two-a' not in resp.headers) self.assertEquals(resp.headers['x-test-header-two-b'], 'value3') + def test_allow_trumps_outgoing_header_conflict(self): + self.tempurl = tempurl.filter_factory({ + 'outgoing_remove_headers': 'x-conflict-header', + 'outgoing_allow_headers': 'x-conflict-header'})(self.auth) + method = 'GET' + expires = int(time() + 86400) + path = '/v1/a/c/o' + key = 'abc' + hmac_body = '%s\n%s\n%s' % (method, expires, path) + sig = hmac.new(key, hmac_body, sha1).hexdigest() + req = self._make_request( + path, keys=[key], + headers={}, + environ={'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s' % ( + sig, expires)}) + self.tempurl.app = FakeApp(iter([('200 Ok', { + 'X-Conflict-Header': 'value'}, '123')])) + resp = req.get_response(self.tempurl) + self.assertEquals(resp.status_int, 200) + self.assertTrue('x-conflict-header' in resp.headers) + self.assertEqual(resp.headers['x-conflict-header'], 'value') + + def test_allow_trumps_outgoing_header_startswith_conflict(self): + self.tempurl = tempurl.filter_factory({ + 'outgoing_remove_headers': 'x-conflict-header-*', + 'outgoing_allow_headers': 'x-conflict-header-*'})(self.auth) + method = 'GET' + expires = int(time() + 86400) + path = '/v1/a/c/o' + key = 'abc' + hmac_body = '%s\n%s\n%s' % (method, expires, path) + sig = hmac.new(key, hmac_body, sha1).hexdigest() + req = self._make_request( + path, keys=[key], + headers={}, + environ={'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s' % ( + sig, expires)}) + self.tempurl.app = FakeApp(iter([('200 Ok', { + 'X-Conflict-Header-Test': 'value'}, '123')])) + resp = req.get_response(self.tempurl) + self.assertEquals(resp.status_int, 200) + self.assertTrue('x-conflict-header-test' in resp.headers) + self.assertEqual(resp.headers['x-conflict-header-test'], 'value') + def test_get_account(self): self.assertEquals(self.tempurl._get_account({ 'REQUEST_METHOD': 'HEAD', 'PATH_INFO': '/v1/a/c/o'}), 'a')