change internal proxy to make calls to handle_request with a copy of the request rather than the request itself
This commit is contained in:
commit
7e8a8500b7
@ -23,25 +23,45 @@ from swift.proxy.server import BaseApplication
|
||||
|
||||
class MemcacheStub(object):
|
||||
|
||||
def get(self, *a, **kw):
|
||||
def get(self, *a, **kw): # pragma: no cover
|
||||
return None
|
||||
|
||||
def set(self, *a, **kw):
|
||||
def set(self, *a, **kw): # pragma: no cover
|
||||
return None
|
||||
|
||||
def incr(self, *a, **kw):
|
||||
def incr(self, *a, **kw): # pragma: no cover
|
||||
return 0
|
||||
|
||||
def delete(self, *a, **kw):
|
||||
def delete(self, *a, **kw): # pragma: no cover
|
||||
return None
|
||||
|
||||
def set_multi(self, *a, **kw):
|
||||
def set_multi(self, *a, **kw): # pragma: no cover
|
||||
return None
|
||||
|
||||
def get_multi(self, *a, **kw):
|
||||
def get_multi(self, *a, **kw): # pragma: no cover
|
||||
return []
|
||||
|
||||
|
||||
def make_request_body_file(source_file, compress=True):
|
||||
if hasattr(source_file, 'seek'):
|
||||
source_file.seek(0)
|
||||
else:
|
||||
source_file = open(source_file, 'rb')
|
||||
if compress:
|
||||
compressed_file = CompressingFileReader(source_file)
|
||||
return compressed_file
|
||||
return source_file
|
||||
|
||||
|
||||
def webob_request_copy(orig_req, source_file=None, compress=True):
|
||||
req_copy = orig_req.copy()
|
||||
if source_file:
|
||||
req_copy.body_file = make_request_body_file(source_file,
|
||||
compress=compress)
|
||||
req_copy.content_length = orig_req.content_length
|
||||
return req_copy
|
||||
|
||||
|
||||
class InternalProxy(object):
|
||||
"""
|
||||
Set up a private instance of a proxy server that allows normal requests
|
||||
@ -59,6 +79,20 @@ class InternalProxy(object):
|
||||
logger=logger)
|
||||
self.retries = retries
|
||||
|
||||
def _handle_request(self, req, source_file=None, compress=True):
|
||||
req = self.upload_app.update_request(req)
|
||||
req_copy = webob_request_copy(req, source_file=source_file,
|
||||
compress=compress)
|
||||
resp = self.upload_app.handle_request(req_copy)
|
||||
tries = 1
|
||||
while (resp.status_int < 200 or resp.status_int > 299) \
|
||||
and tries < self.retries:
|
||||
req_copy = webob_request_copy(req, source_file=source_file,
|
||||
compress=compress)
|
||||
resp = self.upload_app.handle_request(req_copy)
|
||||
tries += 1
|
||||
return resp
|
||||
|
||||
def upload_file(self, source_file, account, container, object_name,
|
||||
compress=True, content_type='application/x-gzip',
|
||||
etag=None):
|
||||
@ -81,33 +115,14 @@ class InternalProxy(object):
|
||||
return False
|
||||
|
||||
# upload the file to the account
|
||||
req = webob.Request.blank(target_name,
|
||||
req = webob.Request.blank(target_name, content_type=content_type,
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Transfer-Encoding': 'chunked'})
|
||||
if compress:
|
||||
if hasattr(source_file, 'read'):
|
||||
compressed_file = CompressingFileReader(source_file)
|
||||
else:
|
||||
compressed_file = CompressingFileReader(
|
||||
open(source_file, 'rb'))
|
||||
req.body_file = compressed_file
|
||||
else:
|
||||
if not hasattr(source_file, 'read'):
|
||||
source_file = open(source_file, 'rb')
|
||||
req.body_file = source_file
|
||||
req.account = account
|
||||
req.content_type = content_type
|
||||
req.content_length = None # to make sure we send chunked data
|
||||
if etag:
|
||||
req.etag = etag
|
||||
resp = self.upload_app.handle_request(
|
||||
self.upload_app.update_request(req))
|
||||
tries = 1
|
||||
while (resp.status_int < 200 or resp.status_int > 299) \
|
||||
and tries <= self.retries:
|
||||
resp = self.upload_app.handle_request(
|
||||
self.upload_app.update_request(req))
|
||||
tries += 1
|
||||
req.headers['etag'] = etag
|
||||
resp = self._handle_request(req, source_file=source_file,
|
||||
compress=compress)
|
||||
if not (200 <= resp.status_int < 300):
|
||||
return False
|
||||
return True
|
||||
@ -124,15 +139,7 @@ class InternalProxy(object):
|
||||
req = webob.Request.blank('/v1/%s/%s/%s' %
|
||||
(account, container, object_name),
|
||||
environ={'REQUEST_METHOD': 'GET'})
|
||||
req.account = account
|
||||
resp = self.upload_app.handle_request(
|
||||
self.upload_app.update_request(req))
|
||||
tries = 1
|
||||
while (resp.status_int < 200 or resp.status_int > 299) \
|
||||
and tries <= self.retries:
|
||||
resp = self.upload_app.handle_request(
|
||||
self.upload_app.update_request(req))
|
||||
tries += 1
|
||||
resp = self._handle_request(req)
|
||||
return resp.status_int, resp.app_iter
|
||||
|
||||
def create_container(self, account, container):
|
||||
@ -145,37 +152,31 @@ class InternalProxy(object):
|
||||
"""
|
||||
req = webob.Request.blank('/v1/%s/%s' % (account, container),
|
||||
environ={'REQUEST_METHOD': 'PUT'})
|
||||
req.account = account
|
||||
resp = self.upload_app.handle_request(
|
||||
self.upload_app.update_request(req))
|
||||
tries = 1
|
||||
while (resp.status_int < 200 or resp.status_int > 299) \
|
||||
and tries <= self.retries:
|
||||
resp = self.upload_app.handle_request(
|
||||
self.upload_app.update_request(req))
|
||||
tries += 1
|
||||
resp = self._handle_request(req)
|
||||
return 200 <= resp.status_int < 300
|
||||
|
||||
def get_container_list(self, account, container, marker=None,
|
||||
end_marker=None, limit=None, prefix=None,
|
||||
delimiter=None, full_listing=True):
|
||||
"""
|
||||
Get container listing.
|
||||
Get a listing of objects for the container.
|
||||
|
||||
:param account: account name for the container
|
||||
:param container: container name to get the listing of
|
||||
:param container: container name to get a listing for
|
||||
:param marker: marker query
|
||||
:param end_marker: end marker query
|
||||
:param limit: limit to query
|
||||
:param limit: limit query
|
||||
:param prefix: prefix query
|
||||
:param delimeter: delimeter for query
|
||||
:param full_listing: if True, make enough requests to get all listings
|
||||
:param delimeter: string to delimit the queries on
|
||||
:param full_listing: if True, return a full listing, else returns a max
|
||||
of 10000 listings
|
||||
:returns: list of objects
|
||||
"""
|
||||
if full_listing:
|
||||
rv = []
|
||||
listing = self.get_container_list(account, container, marker,
|
||||
end_marker, limit, prefix, delimiter, full_listing=False)
|
||||
end_marker, limit, prefix,
|
||||
delimiter, full_listing=False)
|
||||
while listing:
|
||||
rv.extend(listing)
|
||||
if not delimiter:
|
||||
@ -183,9 +184,11 @@ class InternalProxy(object):
|
||||
else:
|
||||
marker = listing[-1].get('name', listing[-1].get('subdir'))
|
||||
listing = self.get_container_list(account, container, marker,
|
||||
end_marker, limit, prefix, delimiter, full_listing=False)
|
||||
end_marker, limit, prefix,
|
||||
delimiter,
|
||||
full_listing=False)
|
||||
return rv
|
||||
path = '/v1/%s/%s' % (account, container)
|
||||
path = '/v1/%s/%s' % (account, quote(container))
|
||||
qs = 'format=json'
|
||||
if marker:
|
||||
qs += '&marker=%s' % quote(marker)
|
||||
@ -199,16 +202,9 @@ class InternalProxy(object):
|
||||
qs += '&delimiter=%s' % quote(delimiter)
|
||||
path += '?%s' % qs
|
||||
req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'})
|
||||
req.account = account
|
||||
resp = self.upload_app.handle_request(
|
||||
self.upload_app.update_request(req))
|
||||
tries = 1
|
||||
while (resp.status_int < 200 or resp.status_int > 299) \
|
||||
and tries <= self.retries:
|
||||
resp = self.upload_app.handle_request(
|
||||
self.upload_app.update_request(req))
|
||||
tries += 1
|
||||
resp = self._handle_request(req)
|
||||
if resp.status_int < 200 or resp.status_int >= 300:
|
||||
return [] # TODO: distinguish between 404 and empty container
|
||||
if resp.status_int == 204:
|
||||
return []
|
||||
if 200 <= resp.status_int < 300:
|
||||
return json_loads(resp.body)
|
||||
return json_loads(resp.body)
|
||||
|
@ -147,13 +147,12 @@ class LogProcessor(object):
|
||||
marker=search_key,
|
||||
end_marker=end_key)
|
||||
results = []
|
||||
if container_listing is not None:
|
||||
if listing_filter is None:
|
||||
listing_filter = set()
|
||||
for item in container_listing:
|
||||
name = item['name']
|
||||
if name not in listing_filter:
|
||||
results.append(name)
|
||||
if listing_filter is None:
|
||||
listing_filter = set()
|
||||
for item in container_listing:
|
||||
name = item['name']
|
||||
if name not in listing_filter:
|
||||
results.append(name)
|
||||
return results
|
||||
|
||||
def get_object_data(self, swift_account, container_name, object_name,
|
||||
|
@ -16,13 +16,176 @@
|
||||
# TODO: Tests
|
||||
|
||||
import unittest
|
||||
import webob
|
||||
import tempfile
|
||||
import json
|
||||
|
||||
from swift.common import internal_proxy
|
||||
|
||||
class DumbBaseApplicationFactory(object):
|
||||
|
||||
def __init__(self, status_codes, body=''):
|
||||
self.status_codes = status_codes[:]
|
||||
self.body = body
|
||||
|
||||
def __call__(self, *a, **kw):
|
||||
app = DumbBaseApplication(*a, **kw)
|
||||
app.status_codes = self.status_codes
|
||||
try:
|
||||
app.default_status_code = self.status_codes[-1]
|
||||
except IndexError:
|
||||
app.default_status_code = 200
|
||||
app.body = self.body
|
||||
return app
|
||||
|
||||
class DumbBaseApplication(object):
|
||||
|
||||
def __init__(self, *a, **kw):
|
||||
self.status_codes = []
|
||||
self.default_status_code = 200
|
||||
self.call_count = 0
|
||||
self.body = ''
|
||||
|
||||
def handle_request(self, req):
|
||||
self.call_count += 1
|
||||
req.path_info_pop()
|
||||
if isinstance(self.body, list):
|
||||
try:
|
||||
body = self.body.pop(0)
|
||||
except IndexError:
|
||||
body = ''
|
||||
else:
|
||||
body = self.body
|
||||
resp = webob.Response(request=req, body=body,
|
||||
conditional_response=True)
|
||||
try:
|
||||
resp.status_int = self.status_codes.pop(0)
|
||||
except IndexError:
|
||||
resp.status_int = self.default_status_code
|
||||
return resp
|
||||
|
||||
def update_request(self, req):
|
||||
return req
|
||||
|
||||
|
||||
class TestInternalProxy(unittest.TestCase):
|
||||
|
||||
def test_placeholder(self):
|
||||
pass
|
||||
def test_webob_request_copy(self):
|
||||
req = webob.Request.blank('/')
|
||||
req2 = internal_proxy.webob_request_copy(req)
|
||||
self.assertEquals(req.path, req2.path)
|
||||
self.assertEquals(req.path_info, req2.path_info)
|
||||
self.assertFalse(req is req2)
|
||||
|
||||
def test_handle_request(self):
|
||||
status_codes = [200]
|
||||
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
|
||||
status_codes)
|
||||
p = internal_proxy.InternalProxy()
|
||||
req = webob.Request.blank('/')
|
||||
orig_req = internal_proxy.webob_request_copy(req)
|
||||
resp = p._handle_request(req)
|
||||
self.assertEquals(req.path_info, orig_req.path_info)
|
||||
|
||||
def test_handle_request_with_retries(self):
|
||||
status_codes = [500, 200]
|
||||
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
|
||||
status_codes)
|
||||
p = internal_proxy.InternalProxy(retries=3)
|
||||
req = webob.Request.blank('/')
|
||||
orig_req = internal_proxy.webob_request_copy(req)
|
||||
resp = p._handle_request(req)
|
||||
self.assertEquals(req.path_info, orig_req.path_info)
|
||||
self.assertEquals(p.upload_app.call_count, 2)
|
||||
self.assertEquals(resp.status_int, 200)
|
||||
|
||||
def test_get_object(self):
|
||||
status_codes = [200]
|
||||
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
|
||||
status_codes)
|
||||
p = internal_proxy.InternalProxy()
|
||||
code, body = p.get_object('a', 'c', 'o')
|
||||
body = ''.join(body)
|
||||
self.assertEquals(code, 200)
|
||||
self.assertEquals(body, '')
|
||||
|
||||
def test_create_container(self):
|
||||
status_codes = [200]
|
||||
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
|
||||
status_codes)
|
||||
p = internal_proxy.InternalProxy()
|
||||
resp = p.create_container('a', 'c')
|
||||
self.assertTrue(resp)
|
||||
|
||||
def test_handle_request_with_retries_all_error(self):
|
||||
status_codes = [500, 500, 500, 500, 500]
|
||||
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
|
||||
status_codes)
|
||||
p = internal_proxy.InternalProxy(retries=3)
|
||||
req = webob.Request.blank('/')
|
||||
orig_req = internal_proxy.webob_request_copy(req)
|
||||
resp = p._handle_request(req)
|
||||
self.assertEquals(req.path_info, orig_req.path_info)
|
||||
self.assertEquals(p.upload_app.call_count, 3)
|
||||
self.assertEquals(resp.status_int, 500)
|
||||
|
||||
def test_get_container_list_empty(self):
|
||||
status_codes = [200]
|
||||
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
|
||||
status_codes, body='[]')
|
||||
p = internal_proxy.InternalProxy()
|
||||
resp = p.get_container_list('a', 'c')
|
||||
self.assertEquals(resp, [])
|
||||
|
||||
def test_get_container_list_no_body(self):
|
||||
status_codes = [204]
|
||||
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
|
||||
status_codes, body='')
|
||||
p = internal_proxy.InternalProxy()
|
||||
resp = p.get_container_list('a', 'c')
|
||||
self.assertEquals(resp, [])
|
||||
|
||||
def test_get_container_list_full_listing(self):
|
||||
status_codes = [200, 200]
|
||||
obj_a = dict(name='foo', hash='foo', bytes=3,
|
||||
content_type='text/plain', last_modified='2011/01/01')
|
||||
obj_b = dict(name='bar', hash='bar', bytes=3,
|
||||
content_type='text/plain', last_modified='2011/01/01')
|
||||
body = [json.dumps([obj_a]), json.dumps([obj_b]), json.dumps([])]
|
||||
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
|
||||
status_codes, body=body)
|
||||
p = internal_proxy.InternalProxy()
|
||||
resp = p.get_container_list('a', 'c')
|
||||
expected = ['foo', 'bar']
|
||||
self.assertEquals([x['name'] for x in resp], expected)
|
||||
|
||||
def test_get_container_list_full(self):
|
||||
status_codes = [204]
|
||||
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
|
||||
status_codes, body='')
|
||||
p = internal_proxy.InternalProxy()
|
||||
resp = p.get_container_list('a', 'c', marker='a', end_marker='b',
|
||||
limit=100, prefix='/', delimiter='.')
|
||||
self.assertEquals(resp, [])
|
||||
|
||||
def test_upload_file(self):
|
||||
status_codes = [200, 200] # container PUT + object PUT
|
||||
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
|
||||
status_codes)
|
||||
p = internal_proxy.InternalProxy()
|
||||
with tempfile.NamedTemporaryFile() as file_obj:
|
||||
resp = p.upload_file(file_obj.name, 'a', 'c', 'o')
|
||||
self.assertTrue(resp)
|
||||
|
||||
def test_upload_file_with_retries(self):
|
||||
status_codes = [200, 500, 200] # container PUT + error + object PUT
|
||||
internal_proxy.BaseApplication = DumbBaseApplicationFactory(
|
||||
status_codes)
|
||||
p = internal_proxy.InternalProxy(retries=3)
|
||||
with tempfile.NamedTemporaryFile() as file_obj:
|
||||
resp = p.upload_file(file_obj, 'a', 'c', 'o')
|
||||
self.assertTrue(resp)
|
||||
self.assertEquals(p.upload_app.call_count, 3)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
Loading…
x
Reference in New Issue
Block a user