diff --git a/swift/common/internal_proxy.py b/swift/common/internal_proxy.py index 40c5185cfa..dd62ba1b0d 100644 --- a/swift/common/internal_proxy.py +++ b/swift/common/internal_proxy.py @@ -23,25 +23,45 @@ from swift.proxy.server import BaseApplication class MemcacheStub(object): - def get(self, *a, **kw): + def get(self, *a, **kw): # pragma: no cover return None - def set(self, *a, **kw): + def set(self, *a, **kw): # pragma: no cover return None - def incr(self, *a, **kw): + def incr(self, *a, **kw): # pragma: no cover return 0 - def delete(self, *a, **kw): + def delete(self, *a, **kw): # pragma: no cover return None - def set_multi(self, *a, **kw): + def set_multi(self, *a, **kw): # pragma: no cover return None - def get_multi(self, *a, **kw): + def get_multi(self, *a, **kw): # pragma: no cover return [] +def make_request_body_file(source_file, compress=True): + if hasattr(source_file, 'seek'): + source_file.seek(0) + else: + source_file = open(source_file, 'rb') + if compress: + compressed_file = CompressingFileReader(source_file) + return compressed_file + return source_file + + +def webob_request_copy(orig_req, source_file=None, compress=True): + req_copy = orig_req.copy() + if source_file: + req_copy.body_file = make_request_body_file(source_file, + compress=compress) + req_copy.content_length = orig_req.content_length + return req_copy + + class InternalProxy(object): """ Set up a private instance of a proxy server that allows normal requests @@ -59,6 +79,20 @@ class InternalProxy(object): logger=logger) self.retries = retries + def _handle_request(self, req, source_file=None, compress=True): + req = self.upload_app.update_request(req) + req_copy = webob_request_copy(req, source_file=source_file, + compress=compress) + resp = self.upload_app.handle_request(req_copy) + tries = 1 + while (resp.status_int < 200 or resp.status_int > 299) \ + and tries < self.retries: + req_copy = webob_request_copy(req, source_file=source_file, + compress=compress) + resp = self.upload_app.handle_request(req_copy) + tries += 1 + return resp + def upload_file(self, source_file, account, container, object_name, compress=True, content_type='application/x-gzip', etag=None): @@ -81,33 +115,14 @@ class InternalProxy(object): return False # upload the file to the account - req = webob.Request.blank(target_name, + req = webob.Request.blank(target_name, content_type=content_type, environ={'REQUEST_METHOD': 'PUT'}, headers={'Transfer-Encoding': 'chunked'}) - if compress: - if hasattr(source_file, 'read'): - compressed_file = CompressingFileReader(source_file) - else: - compressed_file = CompressingFileReader( - open(source_file, 'rb')) - req.body_file = compressed_file - else: - if not hasattr(source_file, 'read'): - source_file = open(source_file, 'rb') - req.body_file = source_file - req.account = account - req.content_type = content_type req.content_length = None # to make sure we send chunked data if etag: - req.etag = etag - resp = self.upload_app.handle_request( - self.upload_app.update_request(req)) - tries = 1 - while (resp.status_int < 200 or resp.status_int > 299) \ - and tries <= self.retries: - resp = self.upload_app.handle_request( - self.upload_app.update_request(req)) - tries += 1 + req.headers['etag'] = etag + resp = self._handle_request(req, source_file=source_file, + compress=compress) if not (200 <= resp.status_int < 300): return False return True @@ -124,15 +139,7 @@ class InternalProxy(object): req = webob.Request.blank('/v1/%s/%s/%s' % (account, container, object_name), environ={'REQUEST_METHOD': 'GET'}) - req.account = account - resp = self.upload_app.handle_request( - self.upload_app.update_request(req)) - tries = 1 - while (resp.status_int < 200 or resp.status_int > 299) \ - and tries <= self.retries: - resp = self.upload_app.handle_request( - self.upload_app.update_request(req)) - tries += 1 + resp = self._handle_request(req) return resp.status_int, resp.app_iter def create_container(self, account, container): @@ -145,37 +152,31 @@ class InternalProxy(object): """ req = webob.Request.blank('/v1/%s/%s' % (account, container), environ={'REQUEST_METHOD': 'PUT'}) - req.account = account - resp = self.upload_app.handle_request( - self.upload_app.update_request(req)) - tries = 1 - while (resp.status_int < 200 or resp.status_int > 299) \ - and tries <= self.retries: - resp = self.upload_app.handle_request( - self.upload_app.update_request(req)) - tries += 1 + resp = self._handle_request(req) return 200 <= resp.status_int < 300 def get_container_list(self, account, container, marker=None, end_marker=None, limit=None, prefix=None, delimiter=None, full_listing=True): """ - Get container listing. + Get a listing of objects for the container. :param account: account name for the container - :param container: container name to get the listing of + :param container: container name to get a listing for :param marker: marker query :param end_marker: end marker query - :param limit: limit to query + :param limit: limit query :param prefix: prefix query - :param delimeter: delimeter for query - :param full_listing: if True, make enough requests to get all listings + :param delimeter: string to delimit the queries on + :param full_listing: if True, return a full listing, else returns a max + of 10000 listings :returns: list of objects """ if full_listing: rv = [] listing = self.get_container_list(account, container, marker, - end_marker, limit, prefix, delimiter, full_listing=False) + end_marker, limit, prefix, + delimiter, full_listing=False) while listing: rv.extend(listing) if not delimiter: @@ -183,9 +184,11 @@ class InternalProxy(object): else: marker = listing[-1].get('name', listing[-1].get('subdir')) listing = self.get_container_list(account, container, marker, - end_marker, limit, prefix, delimiter, full_listing=False) + end_marker, limit, prefix, + delimiter, + full_listing=False) return rv - path = '/v1/%s/%s' % (account, container) + path = '/v1/%s/%s' % (account, quote(container)) qs = 'format=json' if marker: qs += '&marker=%s' % quote(marker) @@ -199,16 +202,9 @@ class InternalProxy(object): qs += '&delimiter=%s' % quote(delimiter) path += '?%s' % qs req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'}) - req.account = account - resp = self.upload_app.handle_request( - self.upload_app.update_request(req)) - tries = 1 - while (resp.status_int < 200 or resp.status_int > 299) \ - and tries <= self.retries: - resp = self.upload_app.handle_request( - self.upload_app.update_request(req)) - tries += 1 + resp = self._handle_request(req) + if resp.status_int < 200 or resp.status_int >= 300: + return [] # TODO: distinguish between 404 and empty container if resp.status_int == 204: return [] - if 200 <= resp.status_int < 300: - return json_loads(resp.body) + return json_loads(resp.body) diff --git a/swift/stats/log_processor.py b/swift/stats/log_processor.py index 30ca062912..ce9694e4e6 100644 --- a/swift/stats/log_processor.py +++ b/swift/stats/log_processor.py @@ -147,13 +147,12 @@ class LogProcessor(object): marker=search_key, end_marker=end_key) results = [] - if container_listing is not None: - if listing_filter is None: - listing_filter = set() - for item in container_listing: - name = item['name'] - if name not in listing_filter: - results.append(name) + if listing_filter is None: + listing_filter = set() + for item in container_listing: + name = item['name'] + if name not in listing_filter: + results.append(name) return results def get_object_data(self, swift_account, container_name, object_name, diff --git a/test/unit/common/test_internal_proxy.py b/test/unit/common/test_internal_proxy.py index 719970118f..a2e82f8d31 100644 --- a/test/unit/common/test_internal_proxy.py +++ b/test/unit/common/test_internal_proxy.py @@ -16,13 +16,176 @@ # TODO: Tests import unittest +import webob +import tempfile +import json + from swift.common import internal_proxy +class DumbBaseApplicationFactory(object): + + def __init__(self, status_codes, body=''): + self.status_codes = status_codes[:] + self.body = body + + def __call__(self, *a, **kw): + app = DumbBaseApplication(*a, **kw) + app.status_codes = self.status_codes + try: + app.default_status_code = self.status_codes[-1] + except IndexError: + app.default_status_code = 200 + app.body = self.body + return app + +class DumbBaseApplication(object): + + def __init__(self, *a, **kw): + self.status_codes = [] + self.default_status_code = 200 + self.call_count = 0 + self.body = '' + + def handle_request(self, req): + self.call_count += 1 + req.path_info_pop() + if isinstance(self.body, list): + try: + body = self.body.pop(0) + except IndexError: + body = '' + else: + body = self.body + resp = webob.Response(request=req, body=body, + conditional_response=True) + try: + resp.status_int = self.status_codes.pop(0) + except IndexError: + resp.status_int = self.default_status_code + return resp + + def update_request(self, req): + return req + class TestInternalProxy(unittest.TestCase): - def test_placeholder(self): - pass + def test_webob_request_copy(self): + req = webob.Request.blank('/') + req2 = internal_proxy.webob_request_copy(req) + self.assertEquals(req.path, req2.path) + self.assertEquals(req.path_info, req2.path_info) + self.assertFalse(req is req2) + + def test_handle_request(self): + status_codes = [200] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes) + p = internal_proxy.InternalProxy() + req = webob.Request.blank('/') + orig_req = internal_proxy.webob_request_copy(req) + resp = p._handle_request(req) + self.assertEquals(req.path_info, orig_req.path_info) + + def test_handle_request_with_retries(self): + status_codes = [500, 200] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes) + p = internal_proxy.InternalProxy(retries=3) + req = webob.Request.blank('/') + orig_req = internal_proxy.webob_request_copy(req) + resp = p._handle_request(req) + self.assertEquals(req.path_info, orig_req.path_info) + self.assertEquals(p.upload_app.call_count, 2) + self.assertEquals(resp.status_int, 200) + + def test_get_object(self): + status_codes = [200] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes) + p = internal_proxy.InternalProxy() + code, body = p.get_object('a', 'c', 'o') + body = ''.join(body) + self.assertEquals(code, 200) + self.assertEquals(body, '') + + def test_create_container(self): + status_codes = [200] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes) + p = internal_proxy.InternalProxy() + resp = p.create_container('a', 'c') + self.assertTrue(resp) + + def test_handle_request_with_retries_all_error(self): + status_codes = [500, 500, 500, 500, 500] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes) + p = internal_proxy.InternalProxy(retries=3) + req = webob.Request.blank('/') + orig_req = internal_proxy.webob_request_copy(req) + resp = p._handle_request(req) + self.assertEquals(req.path_info, orig_req.path_info) + self.assertEquals(p.upload_app.call_count, 3) + self.assertEquals(resp.status_int, 500) + + def test_get_container_list_empty(self): + status_codes = [200] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes, body='[]') + p = internal_proxy.InternalProxy() + resp = p.get_container_list('a', 'c') + self.assertEquals(resp, []) + + def test_get_container_list_no_body(self): + status_codes = [204] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes, body='') + p = internal_proxy.InternalProxy() + resp = p.get_container_list('a', 'c') + self.assertEquals(resp, []) + + def test_get_container_list_full_listing(self): + status_codes = [200, 200] + obj_a = dict(name='foo', hash='foo', bytes=3, + content_type='text/plain', last_modified='2011/01/01') + obj_b = dict(name='bar', hash='bar', bytes=3, + content_type='text/plain', last_modified='2011/01/01') + body = [json.dumps([obj_a]), json.dumps([obj_b]), json.dumps([])] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes, body=body) + p = internal_proxy.InternalProxy() + resp = p.get_container_list('a', 'c') + expected = ['foo', 'bar'] + self.assertEquals([x['name'] for x in resp], expected) + + def test_get_container_list_full(self): + status_codes = [204] + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes, body='') + p = internal_proxy.InternalProxy() + resp = p.get_container_list('a', 'c', marker='a', end_marker='b', + limit=100, prefix='/', delimiter='.') + self.assertEquals(resp, []) + + def test_upload_file(self): + status_codes = [200, 200] # container PUT + object PUT + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes) + p = internal_proxy.InternalProxy() + with tempfile.NamedTemporaryFile() as file_obj: + resp = p.upload_file(file_obj.name, 'a', 'c', 'o') + self.assertTrue(resp) + + def test_upload_file_with_retries(self): + status_codes = [200, 500, 200] # container PUT + error + object PUT + internal_proxy.BaseApplication = DumbBaseApplicationFactory( + status_codes) + p = internal_proxy.InternalProxy(retries=3) + with tempfile.NamedTemporaryFile() as file_obj: + resp = p.upload_file(file_obj, 'a', 'c', 'o') + self.assertTrue(resp) + self.assertEquals(p.upload_app.call_count, 3) if __name__ == '__main__':