Stop reading from object server when client disconnects.
If a client were in the middle of an object GET request and then disconnected, the proxy would wait a while (default 60s) and then time out the connection. As part of the teardown for this, the proxy would attempt to close the connection to the object server, then drain any associated buffers. However, this didn't work particularly well, resulting in the proxy reading the entire remainder of the object for no gain. Now, the proxy closes the connection hard, by calling .close() on the underlying socket._socket object. This is different from calling .close() on a socket._socketobject object, which is what you get back from socket.socket() and similar methods. Calling .close() on a socket._socketobject simply decrements a reference counter on the socket._socket, which has been observed in the past to result in socket leaks when something holds onto a reference. However, calling .close() on a socket._socket actually closes the socket regardless of who else has a reference to it. I had to delete a test assertion that said the object server never got SIGPIPE after a GET w/X-Newest. Well, you get a SIGPIPE when you write to a closed socket, and now the proxy is actually closing the sockets early, so now you *do* get a SIGPIPE. closes-bug: 1174660 Note that this will cause a regression on bug 1037337; unfortunately, the cure is worse than the disease, so out it goes. Change-Id: I9c7a2e7fdb8b4232e53ea96f86b50e8d34c27221
This commit is contained in:
parent
34340ddf49
commit
def37fb56a
@ -41,6 +41,10 @@ class BufferedHTTPResponse(HTTPResponse):
|
||||
def __init__(self, sock, debuglevel=0, strict=0,
|
||||
method=None): # pragma: no cover
|
||||
self.sock = sock
|
||||
# sock is an eventlet.greenio.GreenSocket
|
||||
# sock.fd is a socket._socketobject
|
||||
# sock.fd._sock is a socket._socket object, which is what we want.
|
||||
self._real_socket = sock.fd._sock
|
||||
self.fp = sock.makefile('rb')
|
||||
self.debuglevel = debuglevel
|
||||
self.strict = strict
|
||||
@ -74,9 +78,25 @@ class BufferedHTTPResponse(HTTPResponse):
|
||||
self.msg = HTTPMessage(self.fp, 0)
|
||||
self.msg.fp = None
|
||||
|
||||
def nuke_from_orbit(self):
|
||||
"""
|
||||
Terminate the socket with extreme prejudice.
|
||||
|
||||
Closes the underlying socket regardless of whether or not anyone else
|
||||
has references to it. Use this when you are certain that nobody else
|
||||
you care about has a reference to this socket.
|
||||
"""
|
||||
if self._real_socket:
|
||||
# this is idempotent; see sock_close in Modules/socketmodule.c in
|
||||
# the Python source for details.
|
||||
self._real_socket.close()
|
||||
self._real_socket = None
|
||||
self.close()
|
||||
|
||||
def close(self):
|
||||
HTTPResponse.close(self)
|
||||
self.sock = None
|
||||
self._real_socket = None
|
||||
|
||||
|
||||
class BufferedHTTPConnection(HTTPConnection):
|
||||
|
@ -987,17 +987,17 @@ class Controller(object):
|
||||
:param src: the response from the backend
|
||||
"""
|
||||
try:
|
||||
src.swift_conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
src.swift_conn = None
|
||||
try:
|
||||
while src.read(self.app.object_chunk_size):
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
src.close()
|
||||
# Since the backends set "Connection: close" in their response
|
||||
# headers, the response object (src) is solely responsible for the
|
||||
# socket. The connection object (src.swift_conn) has no references
|
||||
# to the socket, so calling its close() method does nothing, and
|
||||
# therefore we don't do it.
|
||||
#
|
||||
# Also, since calling the response's close() method might not
|
||||
# close the underlying socket but only decrement some
|
||||
# reference-counter, we have a special method here that really,
|
||||
# really kills the underlying socket with a close() syscall.
|
||||
src.nuke_from_orbit() # it's the only way to be sure
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
@ -20,7 +20,6 @@ import os
|
||||
import sys
|
||||
import unittest
|
||||
import urlparse
|
||||
import signal
|
||||
from contextlib import contextmanager, nested, closing
|
||||
from gzip import GzipFile
|
||||
from shutil import rmtree
|
||||
@ -681,41 +680,31 @@ class TestObjectController(unittest.TestCase):
|
||||
self.assertEquals(res.status_int, expected)
|
||||
|
||||
def test_GET_newest_large_file(self):
|
||||
calls = [0]
|
||||
|
||||
def handler(_junk1, _junk2):
|
||||
calls[0] += 1
|
||||
|
||||
old_handler = signal.signal(signal.SIGPIPE, handler)
|
||||
try:
|
||||
prolis = _test_sockets[0]
|
||||
prosrv = _test_servers[0]
|
||||
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||
fd = sock.makefile()
|
||||
obj = 'a' * (1024 * 1024)
|
||||
path = '/v1/a/c/o.large'
|
||||
fd.write('PUT %s HTTP/1.1\r\n'
|
||||
'Host: localhost\r\n'
|
||||
'Connection: close\r\n'
|
||||
'X-Storage-Token: t\r\n'
|
||||
'Content-Length: %s\r\n'
|
||||
'Content-Type: application/octet-stream\r\n'
|
||||
'\r\n%s' % (path, str(len(obj)), obj))
|
||||
fd.flush()
|
||||
headers = readuntil2crlfs(fd)
|
||||
exp = 'HTTP/1.1 201'
|
||||
self.assertEqual(headers[:len(exp)], exp)
|
||||
req = Request.blank(path,
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'Content-Type':
|
||||
'application/octet-stream',
|
||||
'X-Newest': 'true'})
|
||||
res = req.get_response(prosrv)
|
||||
self.assertEqual(res.status_int, 200)
|
||||
self.assertEqual(res.body, obj)
|
||||
self.assertEqual(calls[0], 0)
|
||||
finally:
|
||||
signal.signal(signal.SIGPIPE, old_handler)
|
||||
prolis = _test_sockets[0]
|
||||
prosrv = _test_servers[0]
|
||||
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||
fd = sock.makefile()
|
||||
obj = 'a' * (1024 * 1024)
|
||||
path = '/v1/a/c/o.large'
|
||||
fd.write('PUT %s HTTP/1.1\r\n'
|
||||
'Host: localhost\r\n'
|
||||
'Connection: close\r\n'
|
||||
'X-Storage-Token: t\r\n'
|
||||
'Content-Length: %s\r\n'
|
||||
'Content-Type: application/octet-stream\r\n'
|
||||
'\r\n%s' % (path, str(len(obj)), obj))
|
||||
fd.flush()
|
||||
headers = readuntil2crlfs(fd)
|
||||
exp = 'HTTP/1.1 201'
|
||||
self.assertEqual(headers[:len(exp)], exp)
|
||||
req = Request.blank(path,
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'Content-Type':
|
||||
'application/octet-stream',
|
||||
'X-Newest': 'true'})
|
||||
res = req.get_response(prosrv)
|
||||
self.assertEqual(res.status_int, 200)
|
||||
self.assertEqual(res.body, obj)
|
||||
|
||||
def test_PUT_expect_header_zero_content_length(self):
|
||||
test_errors = []
|
||||
|
Loading…
x
Reference in New Issue
Block a user