Merge "Add functional tests for container TempURLs"

This commit is contained in:
Jenkins 2015-02-16 09:53:32 +00:00 committed by Gerrit Code Review
commit bf0e01c488
2 changed files with 175 additions and 1 deletions

View File

@ -582,7 +582,10 @@ class Container(Base):
if self.conn.response.status == 204:
required_fields = [['bytes_used', 'x-container-bytes-used'],
['object_count', 'x-container-object-count']]
optional_fields = [['versions', 'x-versions-location']]
optional_fields = [
['versions', 'x-versions-location'],
['tempurl_key', 'x-container-meta-temp-url-key'],
['tempurl_key2', 'x-container-meta-temp-url-key-2']]
return self.header_fields(required_fields, optional_fields)

View File

@ -2746,6 +2746,177 @@ class TestTempurlUTF8(Base2, TestTempurl):
set_up = False
class TestContainerTempurlEnv(object):
tempurl_enabled = None # tri-state: None initially, then True/False
@classmethod
def setUp(cls):
cls.conn = Connection(tf.config)
cls.conn.authenticate()
if cls.tempurl_enabled is None:
cls.tempurl_enabled = 'tempurl' in cluster_info
if not cls.tempurl_enabled:
return
cls.tempurl_key = Utils.create_name()
cls.tempurl_key2 = Utils.create_name()
cls.account = Account(
cls.conn, tf.config.get('account', tf.config['username']))
cls.account.delete_containers()
cls.container = cls.account.container(Utils.create_name())
if not cls.container.create({
'x-container-meta-temp-url-key': cls.tempurl_key,
'x-container-meta-temp-url-key-2': cls.tempurl_key2}):
raise ResponseError(cls.conn.response)
cls.obj = cls.container.file(Utils.create_name())
cls.obj.write("obj contents")
cls.other_obj = cls.container.file(Utils.create_name())
cls.other_obj.write("other obj contents")
class TestContainerTempurl(Base):
env = TestContainerTempurlEnv
set_up = False
def setUp(self):
super(TestContainerTempurl, self).setUp()
if self.env.tempurl_enabled is False:
raise SkipTest("TempURL not enabled")
elif self.env.tempurl_enabled is not True:
# just some sanity checking
raise Exception(
"Expected tempurl_enabled to be True/False, got %r" %
(self.env.tempurl_enabled,))
expires = int(time.time()) + 86400
sig = self.tempurl_sig(
'GET', expires, self.env.conn.make_path(self.env.obj.path),
self.env.tempurl_key)
self.obj_tempurl_parms = {'temp_url_sig': sig,
'temp_url_expires': str(expires)}
def tempurl_sig(self, method, expires, path, key):
return hmac.new(
key,
'%s\n%s\n%s' % (method, expires, urllib.unquote(path)),
hashlib.sha1).hexdigest()
def test_GET(self):
contents = self.env.obj.read(
parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
# GET tempurls also allow HEAD requests
self.assert_(self.env.obj.info(parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True}))
def test_GET_with_key_2(self):
expires = int(time.time()) + 86400
sig = self.tempurl_sig(
'GET', expires, self.env.conn.make_path(self.env.obj.path),
self.env.tempurl_key2)
parms = {'temp_url_sig': sig,
'temp_url_expires': str(expires)}
contents = self.env.obj.read(parms=parms, cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
def test_PUT(self):
new_obj = self.env.container.file(Utils.create_name())
expires = int(time.time()) + 86400
sig = self.tempurl_sig(
'PUT', expires, self.env.conn.make_path(new_obj.path),
self.env.tempurl_key)
put_parms = {'temp_url_sig': sig,
'temp_url_expires': str(expires)}
new_obj.write('new obj contents',
parms=put_parms, cfg={'no_auth_token': True})
self.assertEqual(new_obj.read(), "new obj contents")
# PUT tempurls also allow HEAD requests
self.assert_(new_obj.info(parms=put_parms,
cfg={'no_auth_token': True}))
def test_HEAD(self):
expires = int(time.time()) + 86400
sig = self.tempurl_sig(
'HEAD', expires, self.env.conn.make_path(self.env.obj.path),
self.env.tempurl_key)
head_parms = {'temp_url_sig': sig,
'temp_url_expires': str(expires)}
self.assert_(self.env.obj.info(parms=head_parms,
cfg={'no_auth_token': True}))
# HEAD tempurls don't allow PUT or GET requests, despite the fact that
# PUT and GET tempurls both allow HEAD requests
self.assertRaises(ResponseError, self.env.other_obj.read,
cfg={'no_auth_token': True},
parms=self.obj_tempurl_parms)
self.assert_status([401])
self.assertRaises(ResponseError, self.env.other_obj.write,
'new contents',
cfg={'no_auth_token': True},
parms=self.obj_tempurl_parms)
self.assert_status([401])
def test_different_object(self):
contents = self.env.obj.read(
parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
self.assertRaises(ResponseError, self.env.other_obj.read,
cfg={'no_auth_token': True},
parms=self.obj_tempurl_parms)
self.assert_status([401])
def test_changing_sig(self):
contents = self.env.obj.read(
parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
parms = self.obj_tempurl_parms.copy()
if parms['temp_url_sig'][0] == 'a':
parms['temp_url_sig'] = 'b' + parms['temp_url_sig'][1:]
else:
parms['temp_url_sig'] = 'a' + parms['temp_url_sig'][1:]
self.assertRaises(ResponseError, self.env.obj.read,
cfg={'no_auth_token': True},
parms=parms)
self.assert_status([401])
def test_changing_expires(self):
contents = self.env.obj.read(
parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
parms = self.obj_tempurl_parms.copy()
if parms['temp_url_expires'][-1] == '0':
parms['temp_url_expires'] = parms['temp_url_expires'][:-1] + '1'
else:
parms['temp_url_expires'] = parms['temp_url_expires'][:-1] + '0'
self.assertRaises(ResponseError, self.env.obj.read,
cfg={'no_auth_token': True},
parms=parms)
self.assert_status([401])
class TestContainerTempurlUTF8(Base2, TestContainerTempurl):
set_up = False
class TestSloTempurlEnv(object):
enabled = None # tri-state: None initially, then True/False