py3: port staticweb middleware
This patch is porting the staticweb middleware to py3. Change-Id: I5d9a13baecedd13d2b7a8ae3dd639eaff0894441
This commit is contained in:
parent
48a2e5c311
commit
b86bc51607
@ -125,6 +125,7 @@ Example usage of this middleware via ``swift``:
|
||||
|
||||
import cgi
|
||||
import json
|
||||
import six
|
||||
import time
|
||||
|
||||
from swift.common.utils import human_readable, split_path, config_true_value, \
|
||||
@ -132,7 +133,7 @@ from swift.common.utils import human_readable, split_path, config_true_value, \
|
||||
from swift.common.wsgi import make_env, WSGIContext
|
||||
from swift.common.http import is_success, is_redirection, HTTP_NOT_FOUND
|
||||
from swift.common.swob import Response, HTTPMovedPermanently, HTTPNotFound, \
|
||||
Request
|
||||
Request, wsgi_quote, wsgi_to_str
|
||||
from swift.proxy.controllers.base import get_container_info
|
||||
|
||||
|
||||
@ -145,6 +146,12 @@ class _StaticWebContext(WSGIContext):
|
||||
that might need to be handled to make keeping contextual
|
||||
information about the request a bit simpler than storing it in
|
||||
the WSGI env.
|
||||
|
||||
:param staticweb: The staticweb middleware object in use.
|
||||
:param version: A WSGI string representation of the swift api version.
|
||||
:param account: A WSGI string representation of the account name.
|
||||
:param container: A WSGI string representation of the container name.
|
||||
:param obj: A WSGI string representation of the object name.
|
||||
"""
|
||||
|
||||
def __init__(self, staticweb, version, account, container, obj):
|
||||
@ -223,9 +230,9 @@ class _StaticWebContext(WSGIContext):
|
||||
:param start_response: The original WSGI start_response hook.
|
||||
:param prefix: Any prefix desired for the container listing.
|
||||
"""
|
||||
label = env['PATH_INFO']
|
||||
label = wsgi_to_str(env['PATH_INFO'])
|
||||
if self._listings_label:
|
||||
groups = env['PATH_INFO'].split('/')
|
||||
groups = wsgi_to_str(env['PATH_INFO']).split('/')
|
||||
label = '{0}/{1}'.format(self._listings_label,
|
||||
'/'.join(groups[4:]))
|
||||
|
||||
@ -262,14 +269,14 @@ class _StaticWebContext(WSGIContext):
|
||||
self.agent, swift_source='SW')
|
||||
tmp_env['QUERY_STRING'] = 'delimiter=/'
|
||||
if prefix:
|
||||
tmp_env['QUERY_STRING'] += '&prefix=%s' % quote(prefix)
|
||||
tmp_env['QUERY_STRING'] += '&prefix=%s' % wsgi_quote(prefix)
|
||||
else:
|
||||
prefix = ''
|
||||
resp = self._app_call(tmp_env)
|
||||
if not is_success(self._get_status_int()):
|
||||
return self._error_response(resp, env, start_response)
|
||||
listing = None
|
||||
body = ''.join(resp)
|
||||
body = b''.join(resp)
|
||||
if body:
|
||||
listing = json.loads(body)
|
||||
if not listing:
|
||||
@ -280,7 +287,8 @@ class _StaticWebContext(WSGIContext):
|
||||
'Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">\n' \
|
||||
'<html>\n' \
|
||||
' <head>\n' \
|
||||
' <title>Listing of %s</title>\n' % cgi.escape(label)
|
||||
' <title>Listing of %s</title>\n' % \
|
||||
cgi.escape(label)
|
||||
if self._listings_css:
|
||||
body += ' <link rel="stylesheet" type="text/css" ' \
|
||||
'href="%s" />\n' % (self._build_css_path(prefix))
|
||||
@ -308,7 +316,8 @@ class _StaticWebContext(WSGIContext):
|
||||
' </tr>\n'
|
||||
for item in listing:
|
||||
if 'subdir' in item:
|
||||
subdir = item['subdir'].encode("utf-8")
|
||||
subdir = item['subdir'] if six.PY3 else \
|
||||
item['subdir'].encode('utf-8')
|
||||
if prefix:
|
||||
subdir = subdir[len(prefix):]
|
||||
body += ' <tr class="item subdir">\n' \
|
||||
@ -319,13 +328,16 @@ class _StaticWebContext(WSGIContext):
|
||||
(quote(subdir), cgi.escape(subdir))
|
||||
for item in listing:
|
||||
if 'name' in item:
|
||||
name = item['name'].encode("utf-8")
|
||||
name = item['name'] if six.PY3 else \
|
||||
item['name'].encode('utf-8')
|
||||
if prefix:
|
||||
name = name[len(prefix):]
|
||||
content_type = item['content_type'].encode("utf-8")
|
||||
content_type = item['content_type'] if six.PY3 else \
|
||||
item['content_type'].encode('utf-8')
|
||||
bytes = human_readable(item['bytes'])
|
||||
last_modified = (
|
||||
cgi.escape(item['last_modified'].encode("utf-8")).
|
||||
cgi.escape(item['last_modified'] if six.PY3 else
|
||||
item['last_modified'].encode('utf-8')).
|
||||
split('.')[0].replace('T', ' '))
|
||||
body += ' <tr class="item %s">\n' \
|
||||
' <td class="colname"><a href="%s">%s</a></td>\n' \
|
||||
@ -466,9 +478,9 @@ class _StaticWebContext(WSGIContext):
|
||||
self.version, self.account, self.container),
|
||||
self.agent, swift_source='SW')
|
||||
tmp_env['QUERY_STRING'] = 'limit=1&delimiter=/&prefix=%s' % (
|
||||
quote(self.obj + '/'), )
|
||||
quote(wsgi_to_str(self.obj) + '/'), )
|
||||
resp = self._app_call(tmp_env)
|
||||
body = ''.join(resp)
|
||||
body = b''.join(resp)
|
||||
if not is_success(self._get_status_int()) or not body or \
|
||||
not json.loads(body):
|
||||
resp = HTTPNotFound()(env, self._start_response)
|
||||
|
@ -528,7 +528,7 @@ class TestStaticWeb(unittest.TestCase):
|
||||
def test_container3indexhtml(self):
|
||||
resp = Request.blank('/v1/a/c3/').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertTrue('Test main index.html file.' in resp.body)
|
||||
self.assertIn(b'Test main index.html file.', resp.body)
|
||||
|
||||
def test_container3subsubdir(self):
|
||||
resp = Request.blank(
|
||||
@ -539,16 +539,16 @@ class TestStaticWeb(unittest.TestCase):
|
||||
resp = Request.blank(
|
||||
'/v1/a/c3/subdir3/subsubdir/').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.body, 'index file')
|
||||
self.assertEqual(resp.body, b'index file')
|
||||
|
||||
def test_container3subdir(self):
|
||||
resp = Request.blank(
|
||||
'/v1/a/c3/subdir/').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertIn('Listing of /v1/a/c3/subdir/', resp.body)
|
||||
self.assertIn('</style>', resp.body)
|
||||
self.assertNotIn('<link', resp.body)
|
||||
self.assertNotIn('listing.css', resp.body)
|
||||
self.assertIn(b'Listing of /v1/a/c3/subdir/', resp.body)
|
||||
self.assertIn(b'</style>', resp.body)
|
||||
self.assertNotIn(b'<link', resp.body)
|
||||
self.assertNotIn(b'listing.css', resp.body)
|
||||
|
||||
def test_container3subdirx(self):
|
||||
resp = Request.blank(
|
||||
@ -569,18 +569,18 @@ class TestStaticWeb(unittest.TestCase):
|
||||
resp = Request.blank(
|
||||
'/v1/a/c3/unknown').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
self.assertNotIn("Chrome's 404 fancy-page sucks.", resp.body)
|
||||
self.assertNotIn(b"Chrome's 404 fancy-page sucks.", resp.body)
|
||||
|
||||
def test_container3bindexhtml(self):
|
||||
resp = Request.blank('/v1/a/c3b/').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 204)
|
||||
self.assertEqual(resp.body, '')
|
||||
self.assertEqual(resp.body, b'')
|
||||
|
||||
def test_container4indexhtml(self):
|
||||
resp = Request.blank('/v1/a/c4/').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertIn('Listing of /v1/a/c4/', resp.body)
|
||||
self.assertIn('href="listing.css"', resp.body)
|
||||
self.assertIn(b'Listing of /v1/a/c4/', resp.body)
|
||||
self.assertIn(b'href="listing.css"', resp.body)
|
||||
|
||||
def test_container4indexhtmlauthed(self):
|
||||
resp = Request.blank('/v1/a/c4').get_response(self.test_staticweb)
|
||||
@ -600,16 +600,16 @@ class TestStaticWeb(unittest.TestCase):
|
||||
resp = Request.blank(
|
||||
'/v1/a/c4/unknown').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
self.assertIn("Chrome's 404 fancy-page sucks.", resp.body)
|
||||
self.assertIn(b"Chrome's 404 fancy-page sucks.", resp.body)
|
||||
|
||||
def test_container4subdir(self):
|
||||
resp = Request.blank(
|
||||
'/v1/a/c4/subdir/').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertIn('Listing of /v1/a/c4/subdir/', resp.body)
|
||||
self.assertNotIn('</style>', resp.body)
|
||||
self.assertIn('<link', resp.body)
|
||||
self.assertIn('href="../listing.css"', resp.body)
|
||||
self.assertIn(b'Listing of /v1/a/c4/subdir/', resp.body)
|
||||
self.assertNotIn(b'</style>', resp.body)
|
||||
self.assertIn(b'<link', resp.body)
|
||||
self.assertIn(b'href="../listing.css"', resp.body)
|
||||
self.assertEqual(resp.headers['content-type'],
|
||||
'text/html; charset=UTF-8')
|
||||
|
||||
@ -631,7 +631,7 @@ class TestStaticWeb(unittest.TestCase):
|
||||
resp = Request.blank(
|
||||
'/v1/a/c5/unknown').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
self.assertNotIn("Chrome's 404 fancy-page sucks.", resp.body)
|
||||
self.assertNotIn(b"Chrome's 404 fancy-page sucks.", resp.body)
|
||||
|
||||
def test_container6subdir(self):
|
||||
resp = Request.blank(
|
||||
@ -649,7 +649,7 @@ class TestStaticWeb(unittest.TestCase):
|
||||
staticweb.filter_factory({})(self.app), deny_listing=True)
|
||||
resp = Request.blank('/v1/a/c6/').get_response(test_staticweb)
|
||||
self.assertEqual(resp.status_int, 401)
|
||||
self.assertIn("Hey, you're not authorized to see this!", resp.body)
|
||||
self.assertIn(b"Hey, you're not authorized to see this!", resp.body)
|
||||
|
||||
# expect default 401 if request is not auth'd for listing or object GET
|
||||
test_staticweb = FakeAuthFilter(
|
||||
@ -657,20 +657,20 @@ class TestStaticWeb(unittest.TestCase):
|
||||
deny_objects=True)
|
||||
resp = Request.blank('/v1/a/c6/').get_response(test_staticweb)
|
||||
self.assertEqual(resp.status_int, 401)
|
||||
self.assertNotIn("Hey, you're not authorized to see this!", resp.body)
|
||||
self.assertNotIn(b"Hey, you're not authorized to see this!", resp.body)
|
||||
|
||||
def test_container6blisting(self):
|
||||
label = 'Listing of {0}/'.format(
|
||||
meta_map['c6b']['meta']['web-listings-label'])
|
||||
resp = Request.blank('/v1/a/c6b/').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertIn(label, resp.body)
|
||||
self.assertIn(label.encode('utf-8'), resp.body)
|
||||
|
||||
def test_container7listing(self):
|
||||
# container7 has web-listings = f, web-error=error.html
|
||||
resp = Request.blank('/v1/a/c7/').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
self.assertIn("Web Listing Disabled", resp.body)
|
||||
self.assertIn(b"Web Listing Disabled", resp.body)
|
||||
|
||||
# expect 301 if auth'd but no trailing '/'
|
||||
resp = Request.blank('/v1/a/c7').get_response(self.test_staticweb)
|
||||
@ -682,14 +682,14 @@ class TestStaticWeb(unittest.TestCase):
|
||||
deny_objects=True)
|
||||
resp = Request.blank('/v1/a/c7').get_response(test_staticweb)
|
||||
self.assertEqual(resp.status_int, 401)
|
||||
self.assertNotIn("Hey, you're not authorized to see this!", resp.body)
|
||||
self.assertNotIn(b"Hey, you're not authorized to see this!", resp.body)
|
||||
|
||||
# expect custom 401 if request is not auth'd for listing
|
||||
test_staticweb = FakeAuthFilter(
|
||||
staticweb.filter_factory({})(self.app), deny_listing=True)
|
||||
resp = Request.blank('/v1/a/c7/').get_response(test_staticweb)
|
||||
self.assertEqual(resp.status_int, 401)
|
||||
self.assertIn("Hey, you're not authorized to see this!", resp.body)
|
||||
self.assertIn(b"Hey, you're not authorized to see this!", resp.body)
|
||||
|
||||
# expect default 401 if request is not auth'd for listing or object GET
|
||||
test_staticweb = FakeAuthFilter(
|
||||
@ -697,69 +697,69 @@ class TestStaticWeb(unittest.TestCase):
|
||||
deny_objects=True)
|
||||
resp = Request.blank('/v1/a/c7/').get_response(test_staticweb)
|
||||
self.assertEqual(resp.status_int, 401)
|
||||
self.assertNotIn("Hey, you're not authorized to see this!", resp.body)
|
||||
self.assertNotIn(b"Hey, you're not authorized to see this!", resp.body)
|
||||
|
||||
def test_container8listingcss(self):
|
||||
resp = Request.blank(
|
||||
'/v1/a/c8/').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertTrue('Listing of /v1/a/c8/' in resp.body)
|
||||
self.assertTrue('<link' in resp.body)
|
||||
self.assertTrue(
|
||||
'href="http://localhost/stylesheets/listing.css"' in resp.body)
|
||||
self.assertIn(b'Listing of /v1/a/c8/', resp.body)
|
||||
self.assertIn(b'<link', resp.body)
|
||||
self.assertIn(b'href="http://localhost/stylesheets/listing.css"',
|
||||
resp.body)
|
||||
|
||||
def test_container8subdirlistingcss(self):
|
||||
resp = Request.blank(
|
||||
'/v1/a/c8/subdir/').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertTrue('Listing of /v1/a/c8/subdir/' in resp.body)
|
||||
self.assertTrue('<link' in resp.body)
|
||||
self.assertTrue(
|
||||
'href="http://localhost/stylesheets/listing.css"' in resp.body)
|
||||
self.assertIn(b'Listing of /v1/a/c8/subdir/', resp.body)
|
||||
self.assertIn(b'<link', resp.body)
|
||||
self.assertIn(b'href="http://localhost/stylesheets/listing.css"',
|
||||
resp.body)
|
||||
|
||||
def test_container9listingcss(self):
|
||||
resp = Request.blank(
|
||||
'/v1/a/c9/').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertTrue('Listing of /v1/a/c9/' in resp.body)
|
||||
self.assertTrue('<link' in resp.body)
|
||||
self.assertTrue('href="/absolute/listing.css"' in resp.body)
|
||||
self.assertIn(b'Listing of /v1/a/c9/', resp.body)
|
||||
self.assertIn(b'<link', resp.body)
|
||||
self.assertIn(b'href="/absolute/listing.css"', resp.body)
|
||||
|
||||
def test_container9subdirlistingcss(self):
|
||||
resp = Request.blank(
|
||||
'/v1/a/c9/subdir/').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertTrue('Listing of /v1/a/c9/subdir/' in resp.body)
|
||||
self.assertTrue('<link' in resp.body)
|
||||
self.assertTrue('href="/absolute/listing.css"' in resp.body)
|
||||
self.assertIn(b'Listing of /v1/a/c9/subdir/', resp.body)
|
||||
self.assertIn(b'<link', resp.body)
|
||||
self.assertIn(b'href="/absolute/listing.css"', resp.body)
|
||||
|
||||
def test_container10unicodesubdirlisting(self):
|
||||
resp = Request.blank(
|
||||
'/v1/a/c10/').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertTrue('Listing of /v1/a/c10/' in resp.body)
|
||||
self.assertIn(b'Listing of /v1/a/c10/', resp.body)
|
||||
resp = Request.blank(
|
||||
'/v1/a/c10/\xe2\x98\x83/').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertTrue('Listing of /v1/a/c10/\xe2\x98\x83/' in resp.body)
|
||||
self.assertIn(b'Listing of /v1/a/c10/\xe2\x98\x83/', resp.body)
|
||||
resp = Request.blank(
|
||||
'/v1/a/c10/\xe2\x98\x83/\xe2\x98\x83/'
|
||||
).get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertTrue(
|
||||
'Listing of /v1/a/c10/\xe2\x98\x83/\xe2\x98\x83/' in resp.body)
|
||||
self.assertIn(
|
||||
b'Listing of /v1/a/c10/\xe2\x98\x83/\xe2\x98\x83/', resp.body)
|
||||
|
||||
def test_container11subdirmarkerobjectindex(self):
|
||||
resp = Request.blank('/v1/a/c11/subdir/').get_response(
|
||||
self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertTrue('<h2>c11 subdir index</h2>' in resp.body)
|
||||
self.assertIn(b'<h2>c11 subdir index</h2>', resp.body)
|
||||
|
||||
def test_container11subdirmarkermatchdirtype(self):
|
||||
resp = Request.blank('/v1/a/c11a/subdir/').get_response(
|
||||
self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
self.assertIn('Index File Not Found', resp.body)
|
||||
self.assertIn(b'Index File Not Found', resp.body)
|
||||
|
||||
def test_container11subdirmarkeraltdirtype(self):
|
||||
resp = Request.blank('/v1/a/c11a/subdir2/').get_response(
|
||||
@ -775,27 +775,27 @@ class TestStaticWeb(unittest.TestCase):
|
||||
resp = Request.blank('/v1/a/c12/').get_response(
|
||||
self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertIn('index file', resp.body)
|
||||
self.assertIn(b'index file', resp.body)
|
||||
|
||||
def test_container_404_has_css(self):
|
||||
resp = Request.blank('/v1/a/c13/').get_response(
|
||||
self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
self.assertIn('listing.css', resp.body)
|
||||
self.assertIn(b'listing.css', resp.body)
|
||||
|
||||
def test_container_404_has_no_css(self):
|
||||
resp = Request.blank('/v1/a/c7/').get_response(
|
||||
self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
self.assertNotIn('listing.css', resp.body)
|
||||
self.assertIn('<style', resp.body)
|
||||
self.assertNotIn(b'listing.css', resp.body)
|
||||
self.assertIn(b'<style', resp.body)
|
||||
|
||||
def test_subrequest_once_if_possible(self):
|
||||
resp = Request.blank(
|
||||
'/v1/a/c4/one.txt').get_response(self.test_staticweb)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.headers['x-object-meta-test'], 'value')
|
||||
self.assertEqual(resp.body, '1')
|
||||
self.assertEqual(resp.body, b'1')
|
||||
self.assertEqual(self.app.calls, 1)
|
||||
|
||||
def test_no_auth_middleware(self):
|
||||
|
1
tox.ini
1
tox.ini
@ -63,6 +63,7 @@ commands =
|
||||
test/unit/common/middleware/test_read_only.py \
|
||||
test/unit/common/middleware/test_recon.py \
|
||||
test/unit/common/middleware/test_subrequest_logging.py \
|
||||
test/unit/common/middleware/test_staticweb.py \
|
||||
test/unit/common/middleware/test_tempauth.py \
|
||||
test/unit/common/middleware/test_versioned_writes.py \
|
||||
test/unit/common/middleware/test_xprofile.py \
|
||||
|
Loading…
Reference in New Issue
Block a user