From 28c0da29b03e61f1018f3a4da0ca167cd1c0f75f Mon Sep 17 00:00:00 2001 From: Samuel Merritt Date: Thu, 6 Mar 2014 13:11:03 -0800 Subject: [PATCH] Functional tests for tempurl Change-Id: I578be387fe6119a86a8abc544b3cbe210ddca3c1 --- test/functional/swift_test_client.py | 26 +++- test/functional/tests.py | 194 +++++++++++++++++++++++++-- 2 files changed, 204 insertions(+), 16 deletions(-) diff --git a/test/functional/swift_test_client.py b/test/functional/swift_test_client.py index 277973442c..b4dcb56cf9 100644 --- a/test/functional/swift_test_client.py +++ b/test/functional/swift_test_client.py @@ -178,6 +178,18 @@ class Connection(object): self.http_connect() return self.storage_url, self.storage_token + def cluster_info(self): + """ + Retrieve the data in /info, or {} on 404 + """ + status = self.make_request('GET', '/info', + cfg={'absolute_path': True}) + if status == 404: + return {} + if not 200 <= status <= 299: + raise ResponseError(self.response, 'GET', '/info') + return json.loads(self.response.read()) + def http_connect(self): self.connection = self.conn_class(self.storage_host, port=self.storage_port) @@ -208,8 +220,8 @@ class Connection(object): def make_request(self, method, path=[], data='', hdrs={}, parms={}, cfg={}): - if not cfg.get('verbatim_path'): - # Set verbatim_path=True to make a request to exactly the given + if not cfg.get('absolute_path'): + # Set absolute_path=True to make a request to exactly the given # path, not storage path + given path. Useful for # non-account/container/object requests. path = self.make_path(path, cfg=cfg) @@ -340,6 +352,16 @@ class Account(Base): self.conn = conn self.name = str(name) + def update_metadata(self, metadata={}, cfg={}): + headers = dict(("X-Account-Meta-%s" % k, v) + for k, v in metadata.items()) + + self.conn.make_request('POST', self.path, hdrs=headers, cfg=cfg) + if not 200 <= self.conn.response.status <= 299: + raise ResponseError(self.conn.response, 'POST', + self.conn.make_path(self.path)) + return True + def container(self, container_name): return Container(self.conn, self.name, container_name) diff --git a/test/functional/tests.py b/test/functional/tests.py index f222d4e9b8..556c5e48e4 100644 --- a/test/functional/tests.py +++ b/test/functional/tests.py @@ -16,14 +16,16 @@ from datetime import datetime import hashlib +import hmac import json import locale import random import StringIO import time import threading -import uuid import unittest +import urllib +import uuid from nose import SkipTest from ConfigParser import ConfigParser @@ -1837,19 +1839,8 @@ class TestSloEnv(object): cls.conn.authenticate() if cls.slo_enabled is None: - status = cls.conn.make_request('GET', '/info', - cfg={'verbatim_path': True}) - if not (200 <= status <= 299): - # Can't tell if SLO is enabled or not since we're running - # against an old cluster, so let's skip the tests instead of - # possibly having spurious failures. - cls.slo_enabled = False - else: - # Don't bother looking for ValueError here. If something is - # responding to a GET /info request with invalid JSON, then - # the cluster is broken and a test failure will let us know. - cluster_info = json.loads(cls.conn.response.read()) - cls.slo_enabled = 'slo' in cluster_info + cluster_info = cls.conn.cluster_info() + cls.slo_enabled = 'slo' in cluster_info if not cls.slo_enabled: return @@ -2187,5 +2178,180 @@ class TestObjectVersioningUTF8(Base2, TestObjectVersioning): set_up = False +class TestTempurlEnv(object): + tempurl_enabled = None # tri-state: None initially, then True/False + + @classmethod + def setUp(cls): + cls.conn = Connection(config) + cls.conn.authenticate() + + if cls.tempurl_enabled is None: + cluster_info = cls.conn.cluster_info() + cls.tempurl_enabled = 'tempurl' in cluster_info + if not cls.tempurl_enabled: + return + cls.tempurl_methods = cluster_info['tempurl']['methods'] + + cls.tempurl_key = Utils.create_name() + cls.tempurl_key2 = Utils.create_name() + + cls.account = Account( + cls.conn, config.get('account', config['username'])) + cls.account.delete_containers() + cls.account.update_metadata({ + 'temp-url-key': cls.tempurl_key, + 'temp-url-key-2': cls.tempurl_key2 + }) + + cls.container = cls.account.container(Utils.create_name()) + if not cls.container.create(): + raise ResponseError(cls.conn.response) + + cls.obj = cls.container.file(Utils.create_name()) + cls.obj.write("obj contents") + cls.other_obj = cls.container.file(Utils.create_name()) + cls.other_obj.write("other obj contents") + + +class TestTempurl(Base): + env = TestTempurlEnv + set_up = False + + def setUp(self): + super(TestTempurl, self).setUp() + if self.env.tempurl_enabled is False: + raise SkipTest("TempURL not enabled") + elif self.env.tempurl_enabled is not True: + # just some sanity checking + raise Exception( + "Expected tempurl_enabled to be True/False, got %r" % + (self.env.tempurl_enabled,)) + + expires = int(time.time()) + 86400 + sig = self.tempurl_sig( + 'GET', expires, self.env.conn.make_path(self.env.obj.path), + self.env.tempurl_key) + self.obj_tempurl_parms = {'temp_url_sig': sig, + 'temp_url_expires': str(expires)} + + def tempurl_sig(self, method, expires, path, key): + return hmac.new( + key, + '%s\n%s\n%s' % (method, expires, urllib.unquote(path)), + hashlib.sha1).hexdigest() + + def test_GET(self): + contents = self.env.obj.read( + parms=self.obj_tempurl_parms, + cfg={'no_auth_token': True}) + self.assertEqual(contents, "obj contents") + + # GET tempurls also allow HEAD requests + self.assert_(self.env.obj.info(parms=self.obj_tempurl_parms, + cfg={'no_auth_token': True})) + + def test_GET_with_key_2(self): + expires = int(time.time()) + 86400 + sig = self.tempurl_sig( + 'GET', expires, self.env.conn.make_path(self.env.obj.path), + self.env.tempurl_key2) + parms = {'temp_url_sig': sig, + 'temp_url_expires': str(expires)} + + contents = self.env.obj.read(parms=parms, cfg={'no_auth_token': True}) + self.assertEqual(contents, "obj contents") + + def test_PUT(self): + new_obj = self.env.container.file(Utils.create_name()) + + expires = int(time.time()) + 86400 + sig = self.tempurl_sig( + 'PUT', expires, self.env.conn.make_path(new_obj.path), + self.env.tempurl_key) + put_parms = {'temp_url_sig': sig, + 'temp_url_expires': str(expires)} + + new_obj.write('new obj contents', + parms=put_parms, cfg={'no_auth_token': True}) + self.assertEqual(new_obj.read(), "new obj contents") + + # PUT tempurls also allow HEAD requests + self.assert_(new_obj.info(parms=put_parms, + cfg={'no_auth_token': True})) + + def test_HEAD(self): + expires = int(time.time()) + 86400 + sig = self.tempurl_sig( + 'HEAD', expires, self.env.conn.make_path(self.env.obj.path), + self.env.tempurl_key) + head_parms = {'temp_url_sig': sig, + 'temp_url_expires': str(expires)} + + self.assert_(self.env.obj.info(parms=head_parms, + cfg={'no_auth_token': True})) + # HEAD tempurls don't allow PUT or GET requests, despite the fact that + # PUT and GET tempurls both allow HEAD requests + self.assertRaises(ResponseError, self.env.other_obj.read, + cfg={'no_auth_token': True}, + parms=self.obj_tempurl_parms) + self.assert_status([401]) + + self.assertRaises(ResponseError, self.env.other_obj.write, + 'new contents', + cfg={'no_auth_token': True}, + parms=self.obj_tempurl_parms) + self.assert_status([401]) + + def test_different_object(self): + contents = self.env.obj.read( + parms=self.obj_tempurl_parms, + cfg={'no_auth_token': True}) + self.assertEqual(contents, "obj contents") + + self.assertRaises(ResponseError, self.env.other_obj.read, + cfg={'no_auth_token': True}, + parms=self.obj_tempurl_parms) + self.assert_status([401]) + + def test_changing_sig(self): + contents = self.env.obj.read( + parms=self.obj_tempurl_parms, + cfg={'no_auth_token': True}) + self.assertEqual(contents, "obj contents") + + parms = self.obj_tempurl_parms.copy() + if parms['temp_url_sig'][0] == 'a': + parms['temp_url_sig'] = 'b' + parms['temp_url_sig'][1:] + else: + parms['temp_url_sig'] = 'a' + parms['temp_url_sig'][1:] + + self.assertRaises(ResponseError, self.env.obj.read, + cfg={'no_auth_token': True}, + parms=parms) + self.assert_status([401]) + + def test_changing_expires(self): + contents = self.env.obj.read( + parms=self.obj_tempurl_parms, + cfg={'no_auth_token': True}) + self.assertEqual(contents, "obj contents") + + parms = self.obj_tempurl_parms.copy() + if parms['temp_url_expires'][-1] == '0': + parms['temp_url_expires'] = parms['temp_url_expires'][:-1] + '1' + else: + parms['temp_url_expires'] = parms['temp_url_expires'][:-1] + '0' + + self.assertRaises(ResponseError, self.env.obj.read, + cfg={'no_auth_token': True}, + parms=parms) + self.assert_status([401]) + + +class TestTempurlUTF8(Base2, TestTempurl): + set_up = False + + if __name__ == '__main__': unittest.main()