Reject names with NULL characters

Unfortunately, SQLite truncates strings with null characters.
Additionally, XML pretty much hates them too.

Change-Id: Id9a8eaa27b841db6350d6959c202d3e3d6462b35
This commit is contained in:
gholt 2013-01-12 06:54:17 +00:00
parent b8626f9667
commit 592d895e31
9 changed files with 43 additions and 12 deletions

View File

@ -334,7 +334,7 @@ class AccountController(object):
req = Request(env)
self.logger.txn_id = req.headers.get('x-trans-id', None)
if not check_utf8(req.path_info):
res = HTTPPreconditionFailed(body='Invalid UTF8')
res = HTTPPreconditionFailed(body='Invalid UTF8 or contains NULL')
else:
try:
# disallow methods which are not publicly accessible

View File

@ -182,10 +182,12 @@ def check_float(string):
def check_utf8(string):
"""
Validate if a string is valid UTF-8 str or unicode
Validate if a string is valid UTF-8 str or unicode and that it
does not contain any null character.
:param string: string to be validated
:returns: True if the string is valid utf-8 str or unicode, False otherwise
:returns: True if the string is valid utf-8 str or unicode and
contains no null characters, False otherwise
"""
if not string:
return False
@ -194,7 +196,7 @@ def check_utf8(string):
string.encode('utf-8')
else:
string.decode('UTF-8')
return True
return '\x00' not in string
# If string is unicode, decode() will raise UnicodeEncodeError
# So, we should catch both UnicodeDecodeError & UnicodeEncodeError
except UnicodeError:

View File

@ -461,7 +461,7 @@ class ContainerController(object):
req = Request(env)
self.logger.txn_id = req.headers.get('x-trans-id', None)
if not check_utf8(req.path_info):
res = HTTPPreconditionFailed(body='Invalid UTF8')
res = HTTPPreconditionFailed(body='Invalid UTF8 or contains NULL')
else:
try:
# disallow methods which have not been marked 'public'

View File

@ -880,7 +880,7 @@ class ObjectController(object):
self.logger.txn_id = req.headers.get('x-trans-id', None)
if not check_utf8(req.path_info):
res = HTTPPreconditionFailed(body='Invalid UTF8')
res = HTTPPreconditionFailed(body='Invalid UTF8 or contains NULL')
else:
try:
# disallow methods which have not been marked 'public'

View File

@ -147,7 +147,8 @@ class Application(object):
req = self.update_request(Request(env))
return self.handle_request(req)(env, start_response)
except UnicodeError:
err = HTTPPreconditionFailed(request=req, body='Invalid UTF8')
err = HTTPPreconditionFailed(
request=req, body='Invalid UTF8 or contains NULL')
return err(env, start_response)
except (Exception, Timeout):
start_response('500 Server Error',
@ -177,11 +178,12 @@ class Application(object):
try:
if not check_utf8(req.path_info):
self.logger.increment('errors')
return HTTPPreconditionFailed(request=req,
body='Invalid UTF8')
return HTTPPreconditionFailed(
request=req, body='Invalid UTF8 or contains NULL')
except UnicodeError:
self.logger.increment('errors')
return HTTPPreconditionFailed(request=req, body='Invalid UTF8')
return HTTPPreconditionFailed(
request=req, body='Invalid UTF8 or contains NULL')
try:
controller, path_parts = self.get_controller(req.path)

View File

@ -210,7 +210,7 @@ class TestAccount(Base):
container = self.env.account.container(invalid_utf8)
self.assert_(not container.create(cfg={'no_path_quote': True}))
self.assert_status(412)
self.assert_body('Invalid UTF8')
self.assert_body('Invalid UTF8 or contains NULL')
def testVersionOnlyPath(self):
self.env.account.conn.make_request('PUT',

View File

@ -552,6 +552,18 @@ class TestContainer(unittest.TestCase):
self.assertEquals(resp.getheader('Content-Type'),
'text/html; charset=UTF-8')
def test_null_name(self):
if skip:
raise SkipTest
def put(url, token, parsed, conn):
conn.request('PUT', '%s/abc%%00def' % parsed.path, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(put)
self.assertEquals(resp.read(), 'Invalid UTF8 or contains NULL')
self.assertEquals(resp.status, 412)
if __name__ == '__main__':
unittest.main()

View File

@ -578,6 +578,18 @@ class TestObject(unittest.TestCase):
self.assertEquals(resp.getheader('Content-Type'),
'text/html; charset=UTF-8')
def test_null_name(self):
if skip:
raise SkipTest
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/abc%%00def' % (parsed.path,
self.container), 'test', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(put)
self.assertEquals(resp.read(), 'Invalid UTF8 or contains NULL')
self.assertEquals(resp.status, 412)
if __name__ == '__main__':
unittest.main()

View File

@ -188,11 +188,14 @@ class TestConstraints(unittest.TestCase):
unicode_sample = u'\uc77c\uc601'
valid_utf8_str = unicode_sample.encode('utf-8')
invalid_utf8_str = unicode_sample.encode('utf-8')[::-1]
unicode_with_null = u'abc\u0000def'
utf8_with_null = unicode_with_null.encode('utf-8')
for false_argument in [None,
'',
invalid_utf8_str,
]:
unicode_with_null,
utf8_with_null]:
self.assertFalse(constraints.check_utf8(false_argument))
for true_argument in ['this is ascii and utf-8, too',