Merge "x-newest cleanup code with test. Fixes bug 1037337"
This commit is contained in:
commit
78dacc4663
@ -504,20 +504,7 @@ class Controller(object):
|
||||
queue.put(success)
|
||||
# Close-out the connection as best as possible.
|
||||
if getattr(source, 'swift_conn', None):
|
||||
try:
|
||||
source.swift_conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
source.swift_conn = None
|
||||
try:
|
||||
while source.read(self.app.object_chunk_size):
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
source.close()
|
||||
except Exception:
|
||||
pass
|
||||
self.close_swift_conn(source)
|
||||
|
||||
def _make_app_iter(self, node, source, response):
|
||||
"""
|
||||
@ -559,6 +546,22 @@ class Controller(object):
|
||||
finally:
|
||||
response.app_iter = None
|
||||
|
||||
def close_swift_conn(self, src):
|
||||
try:
|
||||
src.swift_conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
src.swift_conn = None
|
||||
try:
|
||||
while src.read(self.app.object_chunk_size):
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
src.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def GETorHEAD_base(self, req, server_type, partition, nodes, path,
|
||||
attempts):
|
||||
"""
|
||||
@ -576,6 +579,7 @@ class Controller(object):
|
||||
reasons = []
|
||||
bodies = []
|
||||
source = None
|
||||
sources = []
|
||||
newest = req.headers.get('x-newest', 'f').lower() in TRUE_VALUES
|
||||
nodes = iter(nodes)
|
||||
while len(statuses) < attempts:
|
||||
@ -615,16 +619,19 @@ class Controller(object):
|
||||
possible_source.read()
|
||||
continue
|
||||
if newest:
|
||||
if source:
|
||||
if sources:
|
||||
ts = float(source.getheader('x-put-timestamp') or
|
||||
source.getheader('x-timestamp') or 0)
|
||||
pts = float(
|
||||
possible_source.getheader('x-put-timestamp') or
|
||||
possible_source.getheader('x-timestamp') or 0)
|
||||
if pts > ts:
|
||||
source = possible_source
|
||||
sources.insert(0, possible_source)
|
||||
else:
|
||||
sources.append(possible_source)
|
||||
else:
|
||||
source = possible_source
|
||||
sources.insert(0, possible_source)
|
||||
source = sources[0]
|
||||
statuses.append(source.status)
|
||||
reasons.append(source.reason)
|
||||
bodies.append('')
|
||||
@ -643,6 +650,12 @@ class Controller(object):
|
||||
if source:
|
||||
if req.method == 'GET' and \
|
||||
source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT):
|
||||
if newest:
|
||||
# we need to close all hanging swift_conns
|
||||
sources.pop(0)
|
||||
for src in sources:
|
||||
self.close_swift_conn(src)
|
||||
|
||||
res = Response(request=req, conditional_response=True)
|
||||
res.app_iter = self._make_app_iter(node, source, res)
|
||||
# See NOTE: swift_conn at top of file about this.
|
||||
|
@ -20,6 +20,7 @@ from logging.handlers import SysLogHandler
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
import signal
|
||||
from ConfigParser import ConfigParser
|
||||
from contextlib import contextmanager
|
||||
from cStringIO import StringIO
|
||||
@ -704,6 +705,42 @@ class TestObjectController(unittest.TestCase):
|
||||
res = method(req)
|
||||
self.assertEquals(res.status_int, expected)
|
||||
|
||||
def test_GET_newest_large_file(self):
|
||||
calls = [0]
|
||||
|
||||
def handler(_junk1, _junk2):
|
||||
calls[0] += 1
|
||||
|
||||
try:
|
||||
signal.signal(signal.SIGPIPE, handler)
|
||||
prolis = _test_sockets[0]
|
||||
prosrv = _test_servers[0]
|
||||
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||
fd = sock.makefile()
|
||||
obj = 'a' * (1024 * 1024)
|
||||
path = '/v1/a/c/o.large'
|
||||
fd.write('PUT %s HTTP/1.1\r\n'
|
||||
'Host: localhost\r\n'
|
||||
'Connection: close\r\n'
|
||||
'X-Storage-Token: t\r\n'
|
||||
'Content-Length: %s\r\n'
|
||||
'Content-Type: application/octet-stream\r\n'
|
||||
'\r\n%s' % (path, str(len(obj)), obj))
|
||||
fd.flush()
|
||||
headers = readuntil2crlfs(fd)
|
||||
exp = 'HTTP/1.1 201'
|
||||
self.assertEqual(headers[:len(exp)], exp)
|
||||
req = Request.blank(path,
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'Content-Type': 'application/octet-stream',
|
||||
'X-Newest':'true'})
|
||||
res = req.get_response(prosrv)
|
||||
self.assertEqual(res.status_int, 200)
|
||||
self.assertEqual(res.body, obj)
|
||||
self.assertEqual(calls[0], 0)
|
||||
finally:
|
||||
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
|
||||
|
||||
def test_PUT_auto_content_type(self):
|
||||
with save_globals():
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
|
Loading…
x
Reference in New Issue
Block a user