Fixed leak on 499s
This fixes an issue where Request objects (and related objects) were not getting garbage collected when a 499 (client disconnect) occurred for responses that still would have had more than the proxy server's client chunk size left to send. Fixed bug #1055834 Change-Id: Ib7b5fffe970dcdec60b90a9bdcd67c49b68fb914
This commit is contained in:
parent
57b10cd38c
commit
7e0c4eb152
@ -495,45 +495,40 @@ class Controller(object):
|
|||||||
if getattr(source, 'swift_conn', None):
|
if getattr(source, 'swift_conn', None):
|
||||||
self.close_swift_conn(source)
|
self.close_swift_conn(source)
|
||||||
|
|
||||||
def _make_app_iter(self, node, source, response):
|
def _make_app_iter(self, node, source):
|
||||||
"""
|
"""
|
||||||
Returns an iterator over the contents of the source (via its read
|
Returns an iterator over the contents of the source (via its read
|
||||||
func). There is also quite a bit of cleanup to ensure garbage
|
func). There is also quite a bit of cleanup to ensure garbage
|
||||||
collection works and the underlying socket of the source is closed.
|
collection works and the underlying socket of the source is closed.
|
||||||
|
|
||||||
:param response: The webob.Response object this iterator should be
|
|
||||||
assigned to via response.app_iter.
|
|
||||||
:param source: The httplib.Response object this iterator should read
|
:param source: The httplib.Response object this iterator should read
|
||||||
from.
|
from.
|
||||||
:param node: The node the source is reading from, for logging purposes.
|
:param node: The node the source is reading from, for logging purposes.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
try:
|
# Spawn reader to read from the source and place in the queue.
|
||||||
# Spawn reader to read from the source and place in the queue.
|
# We then drop any reference to the source or node, for garbage
|
||||||
# We then drop any reference to the source or node, for garbage
|
# collection purposes.
|
||||||
# collection purposes.
|
queue = Queue(1)
|
||||||
queue = Queue(1)
|
spawn_n(self._make_app_iter_reader, node, source, queue,
|
||||||
spawn_n(self._make_app_iter_reader, node, source, queue,
|
self.app.logger.thread_locals)
|
||||||
self.app.logger.thread_locals)
|
source = node = None
|
||||||
source = node = None
|
while True:
|
||||||
while True:
|
chunk = queue.get(timeout=self.app.node_timeout)
|
||||||
chunk = queue.get(timeout=self.app.node_timeout)
|
if isinstance(chunk, bool): # terminator
|
||||||
if isinstance(chunk, bool): # terminator
|
success = chunk
|
||||||
success = chunk
|
if not success:
|
||||||
if not success:
|
raise Exception(_('Failed to read all data'
|
||||||
raise Exception(_('Failed to read all data'
|
' from the source'))
|
||||||
' from the source'))
|
break
|
||||||
break
|
yield chunk
|
||||||
yield chunk
|
except Empty:
|
||||||
except Empty:
|
raise ChunkReadTimeout()
|
||||||
raise ChunkReadTimeout()
|
except (GeneratorExit, Timeout):
|
||||||
except (GeneratorExit, Timeout):
|
self.app.logger.warn(_('Client disconnected on read'))
|
||||||
self.app.logger.warn(_('Client disconnected on read'))
|
except Exception:
|
||||||
except Exception:
|
self.app.logger.exception(_('Trying to send to client'))
|
||||||
self.app.logger.exception(_('Trying to send to client'))
|
raise
|
||||||
raise
|
|
||||||
finally:
|
|
||||||
response.app_iter = None
|
|
||||||
|
|
||||||
def close_swift_conn(self, src):
|
def close_swift_conn(self, src):
|
||||||
try:
|
try:
|
||||||
@ -646,7 +641,7 @@ class Controller(object):
|
|||||||
self.close_swift_conn(src)
|
self.close_swift_conn(src)
|
||||||
|
|
||||||
res = Response(request=req, conditional_response=True)
|
res = Response(request=req, conditional_response=True)
|
||||||
res.app_iter = self._make_app_iter(node, source, res)
|
res.app_iter = self._make_app_iter(node, source)
|
||||||
# See NOTE: swift_conn at top of file about this.
|
# See NOTE: swift_conn at top of file about this.
|
||||||
res.swift_conn = source.swift_conn
|
res.swift_conn = source.swift_conn
|
||||||
update_headers(res, source.getheaders())
|
update_headers(res, source.getheaders())
|
||||||
|
@ -59,9 +59,29 @@ import swift.proxy.controllers
|
|||||||
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
|
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
|
||||||
|
|
||||||
|
|
||||||
|
_request_instances = 0
|
||||||
|
|
||||||
|
|
||||||
|
def request_init(self, *args, **kwargs):
|
||||||
|
global _request_instances
|
||||||
|
self._orig_init(*args, **kwargs)
|
||||||
|
_request_instances += 1
|
||||||
|
|
||||||
|
|
||||||
|
def request_del(self):
|
||||||
|
global _request_instances
|
||||||
|
if self._orig_del:
|
||||||
|
self._orig_del()
|
||||||
|
_request_instances -= 1
|
||||||
|
|
||||||
|
|
||||||
def setup():
|
def setup():
|
||||||
global _testdir, _test_servers, _test_sockets, \
|
global _testdir, _test_servers, _test_sockets, \
|
||||||
_orig_container_listing_limit, _test_coros
|
_orig_container_listing_limit, _test_coros
|
||||||
|
Request._orig_init = Request.__init__
|
||||||
|
Request.__init__ = request_init
|
||||||
|
Request._orig_del = getattr(Request, '__del__', None)
|
||||||
|
Request.__del__ = request_del
|
||||||
monkey_patch_mimetools()
|
monkey_patch_mimetools()
|
||||||
# Since we're starting up a lot here, we're going to test more than
|
# Since we're starting up a lot here, we're going to test more than
|
||||||
# just chunked puts; we're also going to test parts of
|
# just chunked puts; we're also going to test parts of
|
||||||
@ -153,6 +173,9 @@ def teardown():
|
|||||||
swift.proxy.controllers.obj.CONTAINER_LISTING_LIMIT = \
|
swift.proxy.controllers.obj.CONTAINER_LISTING_LIMIT = \
|
||||||
_orig_container_listing_limit
|
_orig_container_listing_limit
|
||||||
rmtree(os.path.dirname(_testdir))
|
rmtree(os.path.dirname(_testdir))
|
||||||
|
Request.__init__ = Request._orig_init
|
||||||
|
if Request._orig_del:
|
||||||
|
Request.__del__ = Request._orig_del
|
||||||
|
|
||||||
|
|
||||||
def fake_http_connect(*code_iter, **kwargs):
|
def fake_http_connect(*code_iter, **kwargs):
|
||||||
@ -3259,6 +3282,43 @@ class TestObjectController(unittest.TestCase):
|
|||||||
self.assertEquals(resp.status_int, 400)
|
self.assertEquals(resp.status_int, 400)
|
||||||
self.assertTrue('X-Delete-At in past' in resp.body)
|
self.assertTrue('X-Delete-At in past' in resp.body)
|
||||||
|
|
||||||
|
def test_leak_1(self):
|
||||||
|
global _request_instances
|
||||||
|
prolis = _test_sockets[0]
|
||||||
|
prosrv = _test_servers[0]
|
||||||
|
obj_len = prosrv.client_chunk_size * 2
|
||||||
|
# PUT test file
|
||||||
|
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||||
|
fd = sock.makefile()
|
||||||
|
fd.write('PUT /v1/a/c/test_leak_1 HTTP/1.1\r\n'
|
||||||
|
'Host: localhost\r\n'
|
||||||
|
'Connection: close\r\n'
|
||||||
|
'X-Auth-Token: t\r\n'
|
||||||
|
'Content-Length: %s\r\n'
|
||||||
|
'Content-Type: application/octet-stream\r\n'
|
||||||
|
'\r\n%s' % (obj_len, 'a' * obj_len))
|
||||||
|
fd.flush()
|
||||||
|
headers = readuntil2crlfs(fd)
|
||||||
|
exp = 'HTTP/1.1 201'
|
||||||
|
self.assertEqual(headers[:len(exp)], exp)
|
||||||
|
# Remember Request instance count
|
||||||
|
before_request_instances = _request_instances
|
||||||
|
# GET test file, but disconnect early
|
||||||
|
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||||
|
fd = sock.makefile()
|
||||||
|
fd.write('GET /v1/a/c/test_leak_1 HTTP/1.1\r\n'
|
||||||
|
'Host: localhost\r\n'
|
||||||
|
'Connection: close\r\n'
|
||||||
|
'X-Auth-Token: t\r\n'
|
||||||
|
'\r\n')
|
||||||
|
fd.flush()
|
||||||
|
headers = readuntil2crlfs(fd)
|
||||||
|
exp = 'HTTP/1.1 200'
|
||||||
|
self.assertEqual(headers[:len(exp)], exp)
|
||||||
|
fd.read(1)
|
||||||
|
fd.close()
|
||||||
|
sock.close()
|
||||||
|
self.assertEquals(before_request_instances, _request_instances)
|
||||||
|
|
||||||
class TestContainerController(unittest.TestCase):
|
class TestContainerController(unittest.TestCase):
|
||||||
"Test swift.proxy_server.ContainerController"
|
"Test swift.proxy_server.ContainerController"
|
||||||
|
Loading…
Reference in New Issue
Block a user