[tempauth] update token if its value changes or expires

In current codes, tempauth token will be updated only when it expires. In
token's lifetime, any changes in tempauth config file doesn't update token,
which means if you update your config file and restart proxy server, the
role in runtime might not change. So a good fix is updating token when
it changes or expires.

fixes bug #1201678

Change-Id: Ieb080e87268e16d0a2e5896431aa41fcedc7a900
This commit is contained in:
Kun Huang 2013-07-16 16:28:32 +08:00
parent 8a8499805b
commit 908e5fcb70
2 changed files with 88 additions and 15 deletions

View File

@ -188,6 +188,19 @@ class TempAuth(object):
env['swift.clean_acl'] = clean_acl env['swift.clean_acl'] = clean_acl
return self.app(env, start_response) return self.app(env, start_response)
def _get_user_groups(self, account, account_user, account_id):
"""
:param account: example: test
:param account_user: example: test:tester
"""
groups = [account, account_user]
groups.extend(self.users[account_user]['groups'])
if '.admin' in groups:
groups.remove('.admin')
groups.append(account_id)
groups = ','.join(groups)
return groups
def get_groups(self, env, token): def get_groups(self, env, token):
""" """
Get groups for the given token. Get groups for the given token.
@ -225,12 +238,7 @@ class TempAuth(object):
s = base64.encodestring(hmac.new(key, msg, sha1).digest()).strip() s = base64.encodestring(hmac.new(key, msg, sha1).digest()).strip()
if s != sign: if s != sign:
return None return None
groups = [account, account_user] groups = self._get_user_groups(account, account_user, account_id)
groups.extend(self.users[account_user]['groups'])
if '.admin' in groups:
groups.remove('.admin')
groups.append(account_id)
groups = ','.join(groups)
return groups return groups
@ -432,6 +440,7 @@ class TempAuth(object):
if self.users[account_user]['key'] != key: if self.users[account_user]['key'] != key:
self.logger.increment('token_denied') self.logger.increment('token_denied')
return HTTPUnauthorized(request=req) return HTTPUnauthorized(request=req)
account_id = self.users[account_user]['url'].rsplit('/', 1)[-1]
# Get memcache client # Get memcache client
memcache_client = cache_from_env(req.environ) memcache_client = cache_from_env(req.environ)
if not memcache_client: if not memcache_client:
@ -445,21 +454,20 @@ class TempAuth(object):
'%s/token/%s' % (self.reseller_prefix, candidate_token) '%s/token/%s' % (self.reseller_prefix, candidate_token)
cached_auth_data = memcache_client.get(memcache_token_key) cached_auth_data = memcache_client.get(memcache_token_key)
if cached_auth_data: if cached_auth_data:
expires, groups = cached_auth_data expires, old_groups = cached_auth_data
if expires > time(): old_groups = old_groups.split(',')
new_groups = self._get_user_groups(account, account_user,
account_id)
if expires > time() and \
set(old_groups) == set(new_groups.split(',')):
token = candidate_token token = candidate_token
# Create a new token if one didn't exist # Create a new token if one didn't exist
if not token: if not token:
# Generate new token # Generate new token
token = '%stk%s' % (self.reseller_prefix, uuid4().hex) token = '%stk%s' % (self.reseller_prefix, uuid4().hex)
expires = time() + self.token_life expires = time() + self.token_life
groups = [account, account_user] groups = self._get_user_groups(account, account_user, account_id)
groups.extend(self.users[account_user]['groups'])
if '.admin' in groups:
groups.remove('.admin')
account_id = self.users[account_user]['url'].rsplit('/', 1)[-1]
groups.append(account_id)
groups = ','.join(groups)
# Save token # Save token
memcache_token_key = '%s/token/%s' % (self.reseller_prefix, token) memcache_token_key = '%s/token/%s' % (self.reseller_prefix, token)
memcache_client.set(memcache_token_key, (expires, groups), memcache_client.set(memcache_token_key, (expires, groups),

View File

@ -490,6 +490,59 @@ class TestAuth(unittest.TestCase):
self.assertEquals(resp.headers['x-storage-url'], self.assertEquals(resp.headers['x-storage-url'],
'fake://somehost:5678/v1/AUTH_test') 'fake://somehost:5678/v1/AUTH_test')
def test_use_old_token_from_memcached(self):
self.test_auth = \
auth.filter_factory({'user_test_tester': 'testing',
'storage_url_scheme': 'fake'})(FakeApp())
req = self._make_request(
'/auth/v1.0',
headers={'X-Auth-User': 'test:tester', 'X-Auth-Key': 'testing'})
req.environ['HTTP_HOST'] = 'somehost:5678'
req.environ['SERVER_NAME'] = 'bob'
req.environ['SERVER_PORT'] = '1234'
req.environ['swift.cache'].set('AUTH_/user/test:tester', 'uuid_token')
req.environ['swift.cache'].set('AUTH_/token/uuid_token',
(time() + 180, 'test,test:tester'))
resp = req.get_response(self.test_auth)
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.headers['x-auth-token'], 'uuid_token')
def test_old_token_overdate(self):
self.test_auth = \
auth.filter_factory({'user_test_tester': 'testing',
'storage_url_scheme': 'fake'})(FakeApp())
req = self._make_request(
'/auth/v1.0',
headers={'X-Auth-User': 'test:tester', 'X-Auth-Key': 'testing'})
req.environ['HTTP_HOST'] = 'somehost:5678'
req.environ['SERVER_NAME'] = 'bob'
req.environ['SERVER_PORT'] = '1234'
req.environ['swift.cache'].set('AUTH_/user/test:tester', 'uuid_token')
req.environ['swift.cache'].set('AUTH_/token/uuid_token',
(0, 'test,test:tester'))
resp = req.get_response(self.test_auth)
self.assertEquals(resp.status_int, 200)
self.assertNotEquals(resp.headers['x-auth-token'], 'uuid_token')
self.assertEquals(resp.headers['x-auth-token'][:7], 'AUTH_tk')
def test_old_token_with_old_data(self):
self.test_auth = \
auth.filter_factory({'user_test_tester': 'testing',
'storage_url_scheme': 'fake'})(FakeApp())
req = self._make_request(
'/auth/v1.0',
headers={'X-Auth-User': 'test:tester', 'X-Auth-Key': 'testing'})
req.environ['HTTP_HOST'] = 'somehost:5678'
req.environ['SERVER_NAME'] = 'bob'
req.environ['SERVER_PORT'] = '1234'
req.environ['swift.cache'].set('AUTH_/user/test:tester', 'uuid_token')
req.environ['swift.cache'].set('AUTH_/token/uuid_token',
(time() + 99, 'test,test:tester,.role'))
resp = req.get_response(self.test_auth)
self.assertEquals(resp.status_int, 200)
self.assertNotEquals(resp.headers['x-auth-token'], 'uuid_token')
self.assertEquals(resp.headers['x-auth-token'][:7], 'AUTH_tk')
def test_reseller_admin_is_owner(self): def test_reseller_admin_is_owner(self):
orig_authorize = self.test_auth.authorize orig_authorize = self.test_auth.authorize
owner_values = [] owner_values = []
@ -631,6 +684,18 @@ class TestAuth(unittest.TestCase):
resp = self.test_auth.authorize(req) resp = self.test_auth.authorize(req)
self.assertEquals(resp, None) self.assertEquals(resp, None)
def test_get_user_group(self):
app = FakeApp()
ath = auth.filter_factory({})(app)
ath.users = {'test:tester': {'groups': ['.admin']}}
groups = ath._get_user_groups('test', 'test:tester', 'AUTH_test')
self.assertEquals(groups, 'test,test:tester,AUTH_test')
ath.users = {'test:tester': {'groups': []}}
groups = ath._get_user_groups('test', 'test:tester', 'AUTH_test')
self.assertEquals(groups, 'test,test:tester')
class TestParseUserCreation(unittest.TestCase): class TestParseUserCreation(unittest.TestCase):
def test_parse_user_creation(self): def test_parse_user_creation(self):