py3: Port common/swob

Change-Id: I31408f525ba9836f634a35581d4aee6fa2c9428f
This commit is contained in:
Tim Burke 2018-06-20 16:50:51 -07:00
parent 0694e51eb7
commit 2b19c26498
3 changed files with 171 additions and 149 deletions

View File

@ -128,6 +128,8 @@ class _UTC(tzinfo):
def tzname(self, dt):
return 'UTC'
UTC = _UTC()
@ -324,7 +326,7 @@ def _resp_body_property():
def getter(self):
if not self._body:
if not self._app_iter:
return ''
return b''
with closing_if_possible(self._app_iter):
self._body = b''.join(self._app_iter)
self._app_iter = None
@ -333,7 +335,7 @@ def _resp_body_property():
def setter(self, value):
if isinstance(value, six.text_type):
value = value.encode('utf-8')
if isinstance(value, str):
if isinstance(value, six.binary_type):
self.content_length = len(value)
self._app_iter = None
self._body = value
@ -739,8 +741,10 @@ def _req_environ_property(environ_field):
return self.environ.get(environ_field, None)
def setter(self, value):
if isinstance(value, six.text_type):
if six.PY2 and isinstance(value, six.text_type):
self.environ[environ_field] = value.encode('utf-8')
elif not six.PY2 and isinstance(value, six.binary_type):
self.environ[environ_field] = value.decode('latin1')
else:
self.environ[environ_field] = value
@ -760,6 +764,8 @@ def _req_body_property():
return body
def setter(self, value):
if not isinstance(value, six.binary_type):
value = value.encode('utf8')
self.environ['wsgi.input'] = WsgiBytesIO(value)
self.environ['CONTENT_LENGTH'] = str(len(value))
@ -854,8 +860,11 @@ class Request(object):
"""
headers = headers or {}
environ = environ or {}
if isinstance(path, six.text_type):
if six.PY2 and isinstance(path, six.text_type):
path = path.encode('utf-8')
elif not six.PY2 and isinstance(path, six.binary_type):
path = path.decode('latin1')
parsed_path = urllib.parse.urlparse(path)
server_name = 'localhost'
if parsed_path.netloc:
@ -867,8 +876,7 @@ class Request(object):
'https': 443}.get(parsed_path.scheme, 80)
if parsed_path.scheme and parsed_path.scheme not in ['http', 'https']:
raise TypeError('Invalid scheme: %s' % parsed_path.scheme)
path_info = urllib.parse.unquote(
parsed_path.path.decode('utf8') if six.PY3 else parsed_path.path)
path_info = urllib.parse.unquote(parsed_path.path)
env = {
'REQUEST_METHOD': 'GET',
'SCRIPT_NAME': '',
@ -886,6 +894,8 @@ class Request(object):
}
env.update(environ)
if body is not None:
if not isinstance(body, six.binary_type):
body = body.encode('utf8')
env['wsgi.input'] = WsgiBytesIO(body)
env['CONTENT_LENGTH'] = str(len(body))
elif 'wsgi.input' not in env:
@ -1093,19 +1103,20 @@ def content_range_header_value(start, stop, size):
def content_range_header(start, stop, size):
return "Content-Range: " + content_range_header_value(start, stop, size)
value = content_range_header_value(start, stop, size)
return b"Content-Range: " + value.encode('ascii')
def multi_range_iterator(ranges, content_type, boundary, size, sub_iter_gen):
for start, stop in ranges:
yield ''.join(['--', boundary, '\r\n',
'Content-Type: ', content_type, '\r\n'])
yield content_range_header(start, stop, size) + '\r\n\r\n'
yield b''.join([b'--', boundary, b'\r\n',
b'Content-Type: ', content_type, b'\r\n'])
yield content_range_header(start, stop, size) + b'\r\n\r\n'
sub_iter = sub_iter_gen(start, stop)
for chunk in sub_iter:
yield chunk
yield '\r\n'
yield '--' + boundary + '--'
yield b'\r\n'
yield b'--' + boundary + b'--'
class Response(object):
@ -1138,7 +1149,7 @@ class Response(object):
self.app_iter = app_iter
self.response_iter = None
self.status = status
self.boundary = "%.32x" % random.randint(0, 256 ** 16)
self.boundary = b"%.32x" % random.randint(0, 256 ** 16)
if request:
self.environ = request.environ
else:
@ -1185,20 +1196,20 @@ class Response(object):
"""
content_size = self.content_length
content_type = self.headers.get('content-type')
self.content_type = ''.join(['multipart/byteranges;',
'boundary=', self.boundary])
content_type = self.headers['content-type'].encode('utf8')
self.content_type = b''.join([b'multipart/byteranges;',
b'boundary=', self.boundary])
# This section calculates the total size of the response.
section_header_fixed_len = (
section_header_fixed_len = sum([
# --boundary\r\n
len(self.boundary) + 4
2, len(self.boundary), 2,
# Content-Type: <type>\r\n
+ len('Content-Type: ') + len(content_type) + 2
len('Content-Type: '), len(content_type), 2,
# Content-Range: <value>\r\n; <value> accounted for later
+ len('Content-Range: ') + 2
len('Content-Range: '), 2,
# \r\n at end of headers
+ 2)
2])
body_size = 0
for start, end in ranges:
@ -1263,11 +1274,11 @@ class Response(object):
self.status = empty_resp
self.content_length = 0
close_if_possible(app_iter)
return ['']
return [b'']
if self.request and self.request.method == 'HEAD':
# We explicitly do NOT want to set self.content_length to 0 here
return ['']
return [b'']
if self.conditional_response and self.request and \
self.request.range and self.request.range.ranges and \
@ -1345,9 +1356,10 @@ class Response(object):
body = '<html><h1>%s</h1><p>%s</p></html>' % (
title,
exp % defaultdict(lambda: 'unknown', self.__dict__))
body = body.encode('utf8')
self.content_length = len(body)
return [body]
return ['']
return [b'']
def fix_conditional_response(self):
"""
@ -1460,6 +1472,8 @@ class StatusMap(object):
"""
def __getitem__(self, key):
return partial(HTTPException, status=key)
status_map = StatusMap()

View File

@ -20,6 +20,7 @@ import unittest
import re
import time
import six
from six import BytesIO
from six.moves.urllib.parse import quote
@ -391,7 +392,7 @@ class TestRequest(unittest.TestCase):
'/', environ={'REQUEST_METHOD': 'POST'},
headers={'Content-Type': 'text/plain'}, body='hi')
self.assertEqual(req.path_info, '/')
self.assertEqual(req.body, 'hi')
self.assertEqual(req.body, b'hi')
self.assertEqual(req.headers['Content-Type'], 'text/plain')
self.assertEqual(req.method, 'POST')
@ -500,7 +501,7 @@ class TestRequest(unittest.TestCase):
'wsgi.input': BytesIO(b'')},
headers={'Content-Type': 'text/plain'}, body='hi')
self.assertEqual(req.path_info, '/')
self.assertEqual(req.body, 'hi')
self.assertEqual(req.body, b'hi')
self.assertEqual(req.headers['Content-Type'], 'text/plain')
self.assertEqual(req.method, 'POST')
body_file = BytesIO(b'asdf')
@ -614,12 +615,12 @@ class TestRequest(unittest.TestCase):
def test_get_response(self):
def test_app(environ, start_response):
start_response('200 OK', [])
return ['hi']
return [b'hi']
req = swift.common.swob.Request.blank('/')
resp = req.get_response(test_app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, 'hi')
self.assertEqual(resp.body, b'hi')
def test_401_unauthorized(self):
# No request environment
@ -741,14 +742,14 @@ class TestRequest(unittest.TestCase):
def test_properties(self):
req = swift.common.swob.Request.blank('/hi/there', body='hi')
self.assertEqual(req.body, 'hi')
self.assertEqual(req.body, b'hi')
self.assertEqual(req.content_length, 2)
req.remote_addr = 'something'
self.assertEqual(req.environ['REMOTE_ADDR'], 'something')
req.body = 'whatever'
self.assertEqual(req.content_length, 8)
self.assertEqual(req.body, 'whatever')
self.assertEqual(req.body, b'whatever')
self.assertEqual(req.method, 'GET')
req.range = 'bytes=1-7'
@ -897,7 +898,12 @@ class TestRequest(unittest.TestCase):
def test_unicode_query(self):
req = swift.common.swob.Request.blank(u'/')
req.query_string = u'x=\u2661'
self.assertEqual(req.params['x'], u'\u2661'.encode('utf-8'))
encoded = u'\u2661'.encode('utf-8')
if six.PY2:
self.assertEqual(req.params['x'], encoded)
else:
# XXX should this be latin1?
self.assertEqual(req.params['x'], encoded.decode('utf8'))
def test_url2(self):
pi = '/hi/there'
@ -1020,8 +1026,8 @@ class TestStatusMap(unittest.TestCase):
resp = resp_cls()
self.assertEqual(resp.status_int, 404)
self.assertEqual(resp.title, 'Not Found')
body = ''.join(resp({}, start_response))
self.assertTrue('The resource could not be found.' in body)
body = b''.join(resp({}, start_response))
self.assertIn(b'The resource could not be found.', body)
self.assertEqual(response_args[0], '404 Not Found')
headers = dict(response_args[1])
self.assertEqual(headers['Content-Type'], 'text/html; charset=UTF-8')
@ -1032,7 +1038,7 @@ class TestResponse(unittest.TestCase):
def _get_response(self):
def test_app(environ, start_response):
start_response('200 OK', [])
return ['hi']
return [b'hi']
req = swift.common.swob.Request.blank('/')
return req.get_response(test_app)
@ -1054,7 +1060,7 @@ class TestResponse(unittest.TestCase):
def test_empty_body(self):
resp = self._get_response()
resp.body = ''
self.assertEqual(resp.body, '')
self.assertEqual(resp.body, b'')
def test_unicode_body(self):
resp = self._get_response()
@ -1070,14 +1076,14 @@ class TestResponse(unittest.TestCase):
"""
def test_app(environ, start_response):
start_response('200 OK', [])
return ['hi']
return [b'hi']
req = swift.common.swob.Request.blank('/')
req.method = 'HEAD'
status, headers, app_iter = req.call_application(test_app)
resp = swift.common.swob.Response(status=status, headers=dict(headers),
app_iter=app_iter)
output_iter = resp(req.environ, lambda *_: None)
self.assertEqual(list(output_iter), [''])
self.assertEqual(list(output_iter), [b''])
def test_call_preserves_closeability(self):
def test_app(environ, start_response):
@ -1093,7 +1099,8 @@ class TestResponse(unittest.TestCase):
self.assertEqual('igloo', next(iterator))
self.assertEqual('shindig', next(iterator))
app_iter.close()
self.assertRaises(StopIteration, iterator.next)
with self.assertRaises(StopIteration):
next(iterator)
def test_call_finds_nonempty_chunk(self):
def test_app(environ, start_response):
@ -1127,7 +1134,7 @@ class TestResponse(unittest.TestCase):
resp = self._get_response()
resp.location = '/something'
# read response
''.join(resp(req.environ, start_response))
b''.join(resp(req.environ, start_response))
self.assertEqual(resp.location, 'http://somehost/something')
req = swift.common.swob.Request.blank(
@ -1135,7 +1142,7 @@ class TestResponse(unittest.TestCase):
resp = self._get_response()
resp.location = '/something'
# read response
''.join(resp(req.environ, start_response))
b''.join(resp(req.environ, start_response))
self.assertEqual(resp.location, 'http://somehost/something')
req = swift.common.swob.Request.blank(
@ -1144,7 +1151,7 @@ class TestResponse(unittest.TestCase):
resp = self._get_response()
resp.location = '/something'
# read response
''.join(resp(req.environ, start_response))
b''.join(resp(req.environ, start_response))
self.assertEqual(resp.location, 'http://somehost:443/something')
req = swift.common.swob.Request.blank(
@ -1153,7 +1160,7 @@ class TestResponse(unittest.TestCase):
resp = self._get_response()
resp.location = '/something'
# read response
''.join(resp(req.environ, start_response))
b''.join(resp(req.environ, start_response))
self.assertEqual(resp.location, 'https://somehost/something')
def test_location_rewrite_no_host(self):
@ -1165,7 +1172,7 @@ class TestResponse(unittest.TestCase):
resp = self._get_response()
resp.location = '/something'
# read response
''.join(resp(req.environ, start_response))
b''.join(resp(req.environ, start_response))
self.assertEqual(resp.location, 'http://local/something')
req = swift.common.swob.Request.blank(
@ -1174,7 +1181,7 @@ class TestResponse(unittest.TestCase):
resp = self._get_response()
resp.location = '/something'
# read response
''.join(resp(req.environ, start_response))
b''.join(resp(req.environ, start_response))
self.assertEqual(resp.location, 'http://local:81/something')
def test_location_no_rewrite(self):
@ -1185,7 +1192,7 @@ class TestResponse(unittest.TestCase):
resp = self._get_response()
resp.location = 'http://www.google.com/'
# read response
''.join(resp(req.environ, start_response))
b''.join(resp(req.environ, start_response))
self.assertEqual(resp.location, 'http://www.google.com/')
def test_location_no_rewrite_when_told_not_to(self):
@ -1198,21 +1205,21 @@ class TestResponse(unittest.TestCase):
resp = self._get_response()
resp.location = '/something'
# read response
''.join(resp(req.environ, start_response))
b''.join(resp(req.environ, start_response))
self.assertEqual(resp.location, '/something')
def test_app_iter(self):
def start_response(env, headers):
pass
resp = self._get_response()
resp.app_iter = ['a', 'b', 'c']
body = ''.join(resp({}, start_response))
self.assertEqual(body, 'abc')
resp.app_iter = [b'a', b'b', b'c']
body = b''.join(resp({}, start_response))
self.assertEqual(body, b'abc')
def test_multi_ranges_wo_iter_ranges(self):
def test_app(environ, start_response):
start_response('200 OK', [('Content-Length', '10')])
return ['1234567890']
return [b'1234567890']
req = swift.common.swob.Request.blank(
'/', headers={'Range': 'bytes=0-9,10-19,20-29'})
@ -1222,7 +1229,7 @@ class TestResponse(unittest.TestCase):
resp.content_length = 10
# read response
''.join(resp._response_iter(resp.app_iter, ''))
b''.join(resp._response_iter(resp.app_iter, b''))
self.assertEqual(resp.status, '200 OK')
self.assertEqual(10, resp.content_length)
@ -1230,7 +1237,7 @@ class TestResponse(unittest.TestCase):
def test_single_range_wo_iter_range(self):
def test_app(environ, start_response):
start_response('200 OK', [('Content-Length', '10')])
return ['1234567890']
return [b'1234567890']
req = swift.common.swob.Request.blank(
'/', headers={'Range': 'bytes=0-9'})
@ -1240,7 +1247,7 @@ class TestResponse(unittest.TestCase):
resp.content_length = 10
# read response
''.join(resp._response_iter(resp.app_iter, ''))
b''.join(resp._response_iter(resp.app_iter, b''))
self.assertEqual(resp.status, '200 OK')
self.assertEqual(10, resp.content_length)
@ -1248,7 +1255,7 @@ class TestResponse(unittest.TestCase):
def test_multi_range_body(self):
def test_app(environ, start_response):
start_response('200 OK', [('Content-Length', '4')])
return ['abcd']
return [b'abcd']
req = swift.common.swob.Request.blank(
'/', headers={'Range': 'bytes=0-9,10-19,20-29'})
@ -1258,29 +1265,29 @@ class TestResponse(unittest.TestCase):
resp.content_length = 100
resp.content_type = 'text/plain; charset=utf8'
content = ''.join(resp._response_iter(None,
('0123456789112345678'
'92123456789')))
content = b''.join(resp._response_iter(None,
(b'0123456789112345678'
b'92123456789')))
self.assertTrue(re.match(('--[a-f0-9]{32}\r\n'
'Content-Type: text/plain; charset=utf8\r\n'
'Content-Range: bytes '
'0-9/100\r\n\r\n0123456789\r\n'
'--[a-f0-9]{32}\r\n'
'Content-Type: text/plain; charset=utf8\r\n'
'Content-Range: bytes '
'10-19/100\r\n\r\n1123456789\r\n'
'--[a-f0-9]{32}\r\n'
'Content-Type: text/plain; charset=utf8\r\n'
'Content-Range: bytes '
'20-29/100\r\n\r\n2123456789\r\n'
'--[a-f0-9]{32}--'), content))
self.assertTrue(re.match(br'--([a-f0-9]{32})\r\n'
br'Content-Type: text/plain; charset=utf8\r\n'
br'Content-Range: bytes '
br'0-9/100\r\n\r\n0123456789\r\n'
br'--\1\r\n'
br'Content-Type: text/plain; charset=utf8\r\n'
br'Content-Range: bytes '
br'10-19/100\r\n\r\n1123456789\r\n'
br'--\1\r\n'
br'Content-Type: text/plain; charset=utf8\r\n'
br'Content-Range: bytes '
br'20-29/100\r\n\r\n2123456789\r\n'
br'--\1--', content))
def test_multi_response_iter(self):
def test_app(environ, start_response):
start_response('200 OK', [('Content-Length', '10'),
('Content-Type', 'application/xml')])
return ['0123456789']
return [b'0123456789']
app_iter_ranges_args = []
@ -1289,12 +1296,12 @@ class TestResponse(unittest.TestCase):
app_iter_ranges_args.append((ranges, content_type, boundary,
size))
for i in range(3):
yield str(i) + 'fun'
yield (str(i) + 'fun').encode('ascii')
yield boundary
def __iter__(self):
for i in range(3):
yield str(i) + 'fun'
yield (str(i) + 'fun').encode('ascii')
req = swift.common.swob.Request.blank(
'/', headers={'Range': 'bytes=1-5,8-11'})
@ -1303,18 +1310,18 @@ class TestResponse(unittest.TestCase):
resp.conditional_response = True
resp.content_length = 12
content = ''.join(resp._response_iter(App_iter(), ''))
content = b''.join(resp._response_iter(App_iter(), b''))
boundary = content[-32:]
self.assertEqual(content[:-32], '0fun1fun2fun')
self.assertEqual(content[:-32], b'0fun1fun2fun')
self.assertEqual(app_iter_ranges_args,
[([(1, 6), (8, 12)], 'application/xml',
[([(1, 6), (8, 12)], b'application/xml',
boundary, 12)])
def test_range_body(self):
def test_app(environ, start_response):
start_response('200 OK', [('Content-Length', '10')])
return ['1234567890']
return [b'1234567890']
def start_response(env, headers):
pass
@ -1325,8 +1332,8 @@ class TestResponse(unittest.TestCase):
resp = swift.common.swob.Response(
body='1234567890', request=req,
conditional_response=True)
body = ''.join(resp([], start_response))
self.assertEqual(body, '234')
body = b''.join(resp({}, start_response))
self.assertEqual(body, b'234')
self.assertEqual(resp.content_range, 'bytes 1-3/10')
self.assertEqual(resp.status, '206 Partial Content')
@ -1336,8 +1343,8 @@ class TestResponse(unittest.TestCase):
'/', headers={'Range': 'bytes=-0'})
resp = req.get_response(test_app)
resp.conditional_response = True
body = ''.join(resp([], start_response))
self.assertIn('The Range requested is not available', body)
body = b''.join(resp({}, start_response))
self.assertIn(b'The Range requested is not available', body)
self.assertEqual(resp.content_length, len(body))
self.assertEqual(resp.status, '416 Requested Range Not Satisfiable')
self.assertEqual(resp.content_range, 'bytes */10')
@ -1345,8 +1352,8 @@ class TestResponse(unittest.TestCase):
resp = swift.common.swob.Response(
body='1234567890', request=req,
conditional_response=True)
body = ''.join(resp([], start_response))
self.assertIn('The Range requested is not available', body)
body = b''.join(resp({}, start_response))
self.assertIn(b'The Range requested is not available', body)
self.assertEqual(resp.content_length, len(body))
self.assertEqual(resp.status, '416 Requested Range Not Satisfiable')
@ -1355,16 +1362,16 @@ class TestResponse(unittest.TestCase):
'/', headers={'Range': 'bytes=3-2'})
resp = req.get_response(test_app)
resp.conditional_response = True
body = ''.join(resp([], start_response))
self.assertEqual(body, '1234567890')
body = b''.join(resp({}, start_response))
self.assertEqual(body, b'1234567890')
self.assertEqual(resp.status, '200 OK')
self.assertNotIn('Content-Range', resp.headers)
resp = swift.common.swob.Response(
body='1234567890', request=req,
conditional_response=True)
body = ''.join(resp([], start_response))
self.assertEqual(body, '1234567890')
body = b''.join(resp({}, start_response))
self.assertEqual(body, b'1234567890')
self.assertEqual(resp.status, '200 OK')
def test_content_type(self):
@ -1462,48 +1469,48 @@ class TestResponse(unittest.TestCase):
def test_507(self):
resp = swift.common.swob.HTTPInsufficientStorage()
content = ''.join(resp._response_iter(resp.app_iter, resp._body))
content = b''.join(resp._response_iter(resp.app_iter, resp._body))
self.assertEqual(
content,
'<html><h1>Insufficient Storage</h1><p>There was not enough space '
'to save the resource. Drive: unknown</p></html>')
b'<html><h1>Insufficient Storage</h1><p>There was not enough '
b'space to save the resource. Drive: unknown</p></html>')
resp = swift.common.swob.HTTPInsufficientStorage(drive='sda1')
content = ''.join(resp._response_iter(resp.app_iter, resp._body))
content = b''.join(resp._response_iter(resp.app_iter, resp._body))
self.assertEqual(
content,
'<html><h1>Insufficient Storage</h1><p>There was not enough space '
'to save the resource. Drive: sda1</p></html>')
b'<html><h1>Insufficient Storage</h1><p>There was not enough '
b'space to save the resource. Drive: sda1</p></html>')
def test_200_with_body_and_headers(self):
headers = {'Content-Length': '0'}
content = 'foo'
content = b'foo'
resp = swift.common.swob.HTTPOk(body=content, headers=headers)
self.assertEqual(resp.body, content)
self.assertEqual(resp.content_length, len(content))
def test_init_with_body_headers_app_iter(self):
# body exists but no headers and no app_iter
body = 'ok'
body = b'ok'
resp = swift.common.swob.Response(body=body)
self.assertEqual(resp.body, body)
self.assertEqual(resp.content_length, len(body))
# body and headers with 0 content_length exist but no app_iter
body = 'ok'
body = b'ok'
resp = swift.common.swob.Response(
body=body, headers={'Content-Length': '0'})
self.assertEqual(resp.body, body)
self.assertEqual(resp.content_length, len(body))
# body and headers with content_length exist but no app_iter
body = 'ok'
body = b'ok'
resp = swift.common.swob.Response(
body=body, headers={'Content-Length': '5'})
self.assertEqual(resp.body, body)
self.assertEqual(resp.content_length, len(body))
# body and headers with no content_length exist but no app_iter
body = 'ok'
body = b'ok'
resp = swift.common.swob.Response(body=body, headers={})
self.assertEqual(resp.body, body)
self.assertEqual(resp.content_length, len(body))
@ -1512,18 +1519,18 @@ class TestResponse(unittest.TestCase):
resp = swift.common.swob.Response(
body='ok', headers={'Content-Length': '5'}, app_iter=iter([]))
self.assertEqual(resp.content_length, 5)
self.assertEqual(resp.body, '')
self.assertEqual(resp.body, b'')
# headers with content_length and app_iter exist but no body
resp = swift.common.swob.Response(
headers={'Content-Length': '5'}, app_iter=iter([]))
self.assertEqual(resp.content_length, 5)
self.assertEqual(resp.body, '')
self.assertEqual(resp.body, b'')
# app_iter exists but no body and headers
resp = swift.common.swob.Response(app_iter=iter([]))
self.assertIsNone(resp.content_length)
self.assertEqual(resp.body, '')
self.assertEqual(resp.body, b'')
class TestUTC(unittest.TestCase):
@ -1534,7 +1541,7 @@ class TestUTC(unittest.TestCase):
class TestConditionalIfNoneMatch(unittest.TestCase):
def fake_app(self, environ, start_response):
start_response('200 OK', [('Etag', 'the-etag')])
return ['hi']
return [b'hi']
def fake_start_response(*a, **kw):
pass
@ -1545,9 +1552,9 @@ class TestConditionalIfNoneMatch(unittest.TestCase):
'/', headers={'If-None-Match': 'the-etag'})
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 304)
self.assertEqual(body, '')
self.assertEqual(body, b'')
def test_quoted_simple_match(self):
# double quotes don't matter
@ -1555,9 +1562,9 @@ class TestConditionalIfNoneMatch(unittest.TestCase):
'/', headers={'If-None-Match': '"the-etag"'})
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 304)
self.assertEqual(body, '')
self.assertEqual(body, b'')
def test_list_match(self):
# it works with lists of etags to match
@ -1565,9 +1572,9 @@ class TestConditionalIfNoneMatch(unittest.TestCase):
'/', headers={'If-None-Match': '"bert", "the-etag", "ernie"'})
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 304)
self.assertEqual(body, '')
self.assertEqual(body, b'')
def test_list_no_match(self):
# no matches --> whatever the original status was
@ -1575,9 +1582,9 @@ class TestConditionalIfNoneMatch(unittest.TestCase):
'/', headers={'If-None-Match': '"bert", "ernie"'})
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 200)
self.assertEqual(body, 'hi')
self.assertEqual(body, b'hi')
def test_match_star(self):
# "*" means match anything; see RFC 2616 section 14.24
@ -1585,15 +1592,15 @@ class TestConditionalIfNoneMatch(unittest.TestCase):
'/', headers={'If-None-Match': '*'})
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 304)
self.assertEqual(body, '')
self.assertEqual(body, b'')
class TestConditionalIfMatch(unittest.TestCase):
def fake_app(self, environ, start_response):
start_response('200 OK', [('Etag', 'the-etag')])
return ['hi']
return [b'hi']
def fake_start_response(*a, **kw):
pass
@ -1604,9 +1611,9 @@ class TestConditionalIfMatch(unittest.TestCase):
'/', headers={'If-Match': 'the-etag'})
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 200)
self.assertEqual(body, 'hi')
self.assertEqual(body, b'hi')
def test_simple_conditional_etag_match(self):
# if etag matches, proceed as normal
@ -1615,9 +1622,9 @@ class TestConditionalIfMatch(unittest.TestCase):
resp = req.get_response(self.fake_app)
resp.conditional_response = True
resp._conditional_etag = 'not-the-etag'
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 200)
self.assertEqual(body, 'hi')
self.assertEqual(body, b'hi')
def test_quoted_simple_match(self):
# double quotes or not, doesn't matter
@ -1625,9 +1632,9 @@ class TestConditionalIfMatch(unittest.TestCase):
'/', headers={'If-Match': '"the-etag"'})
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 200)
self.assertEqual(body, 'hi')
self.assertEqual(body, b'hi')
def test_no_match(self):
# no match --> 412
@ -1635,9 +1642,9 @@ class TestConditionalIfMatch(unittest.TestCase):
'/', headers={'If-Match': 'not-the-etag'})
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 412)
self.assertEqual(body, '')
self.assertEqual(body, b'')
def test_simple_conditional_etag_no_match(self):
req = swift.common.swob.Request.blank(
@ -1645,9 +1652,9 @@ class TestConditionalIfMatch(unittest.TestCase):
resp = req.get_response(self.fake_app)
resp.conditional_response = True
resp._conditional_etag = 'not-the-etag'
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 412)
self.assertEqual(body, '')
self.assertEqual(body, b'')
def test_match_star(self):
# "*" means match anything; see RFC 2616 section 14.24
@ -1655,9 +1662,9 @@ class TestConditionalIfMatch(unittest.TestCase):
'/', headers={'If-Match': '*'})
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 200)
self.assertEqual(body, 'hi')
self.assertEqual(body, b'hi')
def test_match_star_on_404(self):
@ -1669,16 +1676,16 @@ class TestConditionalIfMatch(unittest.TestCase):
'/', headers={'If-Match': '*'})
resp = req.get_response(fake_app_404)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 412)
self.assertEqual(body, '')
self.assertEqual(body, b'')
class TestConditionalIfModifiedSince(unittest.TestCase):
def fake_app(self, environ, start_response):
start_response(
'200 OK', [('Last-Modified', 'Thu, 27 Feb 2014 03:29:37 GMT')])
return ['hi']
return [b'hi']
def fake_start_response(*a, **kw):
pass
@ -1687,9 +1694,9 @@ class TestConditionalIfModifiedSince(unittest.TestCase):
req = swift.common.swob.Request.blank('/')
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 200)
self.assertEqual(body, 'hi')
self.assertEqual(body, b'hi')
def test_before(self):
req = swift.common.swob.Request.blank(
@ -1697,9 +1704,9 @@ class TestConditionalIfModifiedSince(unittest.TestCase):
headers={'If-Modified-Since': 'Thu, 27 Feb 2014 03:29:36 GMT'})
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 200)
self.assertEqual(body, 'hi')
self.assertEqual(body, b'hi')
def test_same(self):
req = swift.common.swob.Request.blank(
@ -1707,9 +1714,9 @@ class TestConditionalIfModifiedSince(unittest.TestCase):
headers={'If-Modified-Since': 'Thu, 27 Feb 2014 03:29:37 GMT'})
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 304)
self.assertEqual(body, '')
self.assertEqual(body, b'')
def test_greater(self):
req = swift.common.swob.Request.blank(
@ -1717,9 +1724,9 @@ class TestConditionalIfModifiedSince(unittest.TestCase):
headers={'If-Modified-Since': 'Thu, 27 Feb 2014 03:29:38 GMT'})
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 304)
self.assertEqual(body, '')
self.assertEqual(body, b'')
def test_out_of_range_is_ignored(self):
# All that datetime gives us is a ValueError or OverflowError when
@ -1737,16 +1744,16 @@ class TestConditionalIfModifiedSince(unittest.TestCase):
headers={'If-Modified-Since': too_big_date_header})
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 200)
self.assertEqual(body, 'hi')
self.assertEqual(body, b'hi')
class TestConditionalIfUnmodifiedSince(unittest.TestCase):
def fake_app(self, environ, start_response):
start_response(
'200 OK', [('Last-Modified', 'Thu, 20 Feb 2014 03:29:37 GMT')])
return ['hi']
return [b'hi']
def fake_start_response(*a, **kw):
pass
@ -1755,9 +1762,9 @@ class TestConditionalIfUnmodifiedSince(unittest.TestCase):
req = swift.common.swob.Request.blank('/')
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 200)
self.assertEqual(body, 'hi')
self.assertEqual(body, b'hi')
def test_before(self):
req = swift.common.swob.Request.blank(
@ -1765,9 +1772,9 @@ class TestConditionalIfUnmodifiedSince(unittest.TestCase):
headers={'If-Unmodified-Since': 'Thu, 20 Feb 2014 03:29:36 GMT'})
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 412)
self.assertEqual(body, '')
self.assertEqual(body, b'')
def test_same(self):
req = swift.common.swob.Request.blank(
@ -1775,9 +1782,9 @@ class TestConditionalIfUnmodifiedSince(unittest.TestCase):
headers={'If-Unmodified-Since': 'Thu, 20 Feb 2014 03:29:37 GMT'})
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 200)
self.assertEqual(body, 'hi')
self.assertEqual(body, b'hi')
def test_greater(self):
req = swift.common.swob.Request.blank(
@ -1785,9 +1792,9 @@ class TestConditionalIfUnmodifiedSince(unittest.TestCase):
headers={'If-Unmodified-Since': 'Thu, 20 Feb 2014 03:29:38 GMT'})
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 200)
self.assertEqual(body, 'hi')
self.assertEqual(body, b'hi')
def test_out_of_range_is_ignored(self):
# All that datetime gives us is a ValueError or OverflowError when
@ -1805,9 +1812,9 @@ class TestConditionalIfUnmodifiedSince(unittest.TestCase):
headers={'If-Unmodified-Since': too_big_date_header})
resp = req.get_response(self.fake_app)
resp.conditional_response = True
body = ''.join(resp(req.environ, self.fake_start_response))
body = b''.join(resp(req.environ, self.fake_start_response))
self.assertEqual(resp.status_int, 200)
self.assertEqual(body, 'hi')
self.assertEqual(body, b'hi')
if __name__ == '__main__':

View File

@ -47,6 +47,7 @@ commands =
test/unit/common/test_manager.py \
test/unit/common/test_splice.py \
test/unit/common/test_storage_policy.py \
test/unit/common/test_swob.py \
test/unit/common/test_utils.py \
test/unit/common/test_wsgi.py}