From 9290471b61a98a1882f0d9e5ce7d883428e2ff36 Mon Sep 17 00:00:00 2001 From: Constantine Peresypkin Date: Fri, 24 Aug 2012 20:20:14 +0300 Subject: [PATCH] x-newest cleanup code with test. Fixes bug 1037337 Change-Id: Ie99250250171246e8c13e8d8c8258101bd78cce4 --- swift/proxy/controllers/base.py | 47 +++++++++++++++++++++------------ test/unit/proxy/test_server.py | 37 ++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 17 deletions(-) diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index ac032676f8..426e6a1307 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -504,20 +504,7 @@ class Controller(object): queue.put(success) # Close-out the connection as best as possible. if getattr(source, 'swift_conn', None): - try: - source.swift_conn.close() - except Exception: - pass - source.swift_conn = None - try: - while source.read(self.app.object_chunk_size): - pass - except Exception: - pass - try: - source.close() - except Exception: - pass + self.close_swift_conn(source) def _make_app_iter(self, node, source, response): """ @@ -559,6 +546,22 @@ class Controller(object): finally: response.app_iter = None + def close_swift_conn(self, src): + try: + src.swift_conn.close() + except Exception: + pass + src.swift_conn = None + try: + while src.read(self.app.object_chunk_size): + pass + except Exception: + pass + try: + src.close() + except Exception: + pass + def GETorHEAD_base(self, req, server_type, partition, nodes, path, attempts): """ @@ -576,6 +579,7 @@ class Controller(object): reasons = [] bodies = [] source = None + sources = [] newest = req.headers.get('x-newest', 'f').lower() in TRUE_VALUES nodes = iter(nodes) while len(statuses) < attempts: @@ -615,16 +619,19 @@ class Controller(object): possible_source.read() continue if newest: - if source: + if sources: ts = float(source.getheader('x-put-timestamp') or source.getheader('x-timestamp') or 0) pts = float( possible_source.getheader('x-put-timestamp') or possible_source.getheader('x-timestamp') or 0) if pts > ts: - source = possible_source + sources.insert(0, possible_source) + else: + sources.append(possible_source) else: - source = possible_source + sources.insert(0, possible_source) + source = sources[0] statuses.append(source.status) reasons.append(source.reason) bodies.append('') @@ -643,6 +650,12 @@ class Controller(object): if source: if req.method == 'GET' and \ source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT): + if newest: + # we need to close all hanging swift_conns + sources.pop(0) + for src in sources: + self.close_swift_conn(src) + res = Response(request=req, conditional_response=True) res.app_iter = self._make_app_iter(node, source, res) # See NOTE: swift_conn at top of file about this. diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 62eefc137b..09cae90795 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -20,6 +20,7 @@ from logging.handlers import SysLogHandler import os import sys import unittest +import signal from ConfigParser import ConfigParser from contextlib import contextmanager from cStringIO import StringIO @@ -704,6 +705,42 @@ class TestObjectController(unittest.TestCase): res = method(req) self.assertEquals(res.status_int, expected) + def test_GET_newest_large_file(self): + calls = [0] + + def handler(_junk1, _junk2): + calls[0] += 1 + + try: + signal.signal(signal.SIGPIPE, handler) + prolis = _test_sockets[0] + prosrv = _test_servers[0] + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + obj = 'a' * (1024 * 1024) + path = '/v1/a/c/o.large' + fd.write('PUT %s HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'X-Storage-Token: t\r\n' + 'Content-Length: %s\r\n' + 'Content-Type: application/octet-stream\r\n' + '\r\n%s' % (path, str(len(obj)), obj)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEqual(headers[:len(exp)], exp) + req = Request.blank(path, + environ={'REQUEST_METHOD': 'GET'}, + headers={'Content-Type': 'application/octet-stream', + 'X-Newest':'true'}) + res = req.get_response(prosrv) + self.assertEqual(res.status_int, 200) + self.assertEqual(res.body, obj) + self.assertEqual(calls[0], 0) + finally: + signal.signal(signal.SIGPIPE, signal.SIG_DFL) + def test_PUT_auto_content_type(self): with save_globals(): controller = proxy_server.ObjectController(self.app, 'account',