Proxy: use just one greenthread per client
Instead of using two greenthreads and an eventlet.queue.Queue of size 1 to stream response bodies from backends to clients, just do it with a single greenthread that reads and writes in a loop. This should lower the amount of CPU used by the proxy in its response streaming. Client fairness used to be provided implicitly; since the queue only held 1 item, the read-from-backend greenthread would block after each chunk until the write-to-client greenthread got a chance to run. Now fairness is provided by an explicit eventlet.sleep() call, just as it is in the object server. Change-Id: Iae27109f5a3d109ad21ec9a972e39f22150f6dbb
This commit is contained in:
parent
2b639f5ecc
commit
a4e25d6da8
@ -32,8 +32,7 @@ import itertools
|
|||||||
from swift import gettext_ as _
|
from swift import gettext_ as _
|
||||||
from urllib import quote
|
from urllib import quote
|
||||||
|
|
||||||
from eventlet import spawn_n, GreenPile
|
from eventlet import sleep, GreenPile
|
||||||
from eventlet.queue import Queue, Empty, Full
|
|
||||||
from eventlet.timeout import Timeout
|
from eventlet.timeout import Timeout
|
||||||
|
|
||||||
from swift.common.wsgi import make_pre_authed_env
|
from swift.common.wsgi import make_pre_authed_env
|
||||||
@ -41,7 +40,8 @@ from swift.common.utils import normalize_timestamp, config_true_value, \
|
|||||||
public, split_path, list_from_csv, GreenthreadSafeIterator, \
|
public, split_path, list_from_csv, GreenthreadSafeIterator, \
|
||||||
quorum_size
|
quorum_size
|
||||||
from swift.common.bufferedhttp import http_connect
|
from swift.common.bufferedhttp import http_connect
|
||||||
from swift.common.exceptions import ChunkReadTimeout, ConnectionTimeout
|
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
|
||||||
|
ConnectionTimeout
|
||||||
from swift.common.http import is_informational, is_success, is_redirection, \
|
from swift.common.http import is_informational, is_success, is_redirection, \
|
||||||
is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \
|
is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \
|
||||||
HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \
|
HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \
|
||||||
@ -903,48 +903,6 @@ class Controller(object):
|
|||||||
"""
|
"""
|
||||||
return self.GETorHEAD(req)
|
return self.GETorHEAD(req)
|
||||||
|
|
||||||
def _make_app_iter_reader(self, node, source, queue, logger_thread_locals):
|
|
||||||
"""
|
|
||||||
Reads from the source and places data in the queue. It expects
|
|
||||||
something else be reading from the queue and, if nothing does within
|
|
||||||
self.app.client_timeout seconds, the process will be aborted.
|
|
||||||
|
|
||||||
:param node: The node dict that the source is connected to, for
|
|
||||||
logging/error-limiting purposes.
|
|
||||||
:param source: The httplib.Response object to read from.
|
|
||||||
:param queue: The eventlet.queue.Queue to place read source data into.
|
|
||||||
:param logger_thread_locals: The thread local values to be set on the
|
|
||||||
self.app.logger to retain transaction
|
|
||||||
logging information.
|
|
||||||
"""
|
|
||||||
self.app.logger.thread_locals = logger_thread_locals
|
|
||||||
success = True
|
|
||||||
try:
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
with ChunkReadTimeout(self.app.node_timeout):
|
|
||||||
chunk = source.read(self.app.object_chunk_size)
|
|
||||||
if not chunk:
|
|
||||||
break
|
|
||||||
queue.put(chunk, timeout=self.app.client_timeout)
|
|
||||||
except Full:
|
|
||||||
self.app.logger.warn(
|
|
||||||
_('Client did not read from queue within %ss') %
|
|
||||||
self.app.client_timeout)
|
|
||||||
self.app.logger.increment('client_timeouts')
|
|
||||||
success = False
|
|
||||||
except (Exception, Timeout):
|
|
||||||
self.exception_occurred(node, _('Object'),
|
|
||||||
_('Trying to read during GET'))
|
|
||||||
success = False
|
|
||||||
finally:
|
|
||||||
# Ensure the queue getter gets a terminator.
|
|
||||||
queue.resize(2)
|
|
||||||
queue.put(success)
|
|
||||||
# Close-out the connection as best as possible.
|
|
||||||
if getattr(source, 'swift_conn', None):
|
|
||||||
self.close_swift_conn(source)
|
|
||||||
|
|
||||||
def _make_app_iter(self, node, source):
|
def _make_app_iter(self, node, source):
|
||||||
"""
|
"""
|
||||||
Returns an iterator over the contents of the source (via its read
|
Returns an iterator over the contents of the source (via its read
|
||||||
@ -956,29 +914,49 @@ class Controller(object):
|
|||||||
:param node: The node the source is reading from, for logging purposes.
|
:param node: The node the source is reading from, for logging purposes.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# Spawn reader to read from the source and place in the queue.
|
nchunks = 0
|
||||||
# We then drop any reference to the source or node, for garbage
|
|
||||||
# collection purposes.
|
|
||||||
queue = Queue(1)
|
|
||||||
spawn_n(self._make_app_iter_reader, node, source, queue,
|
|
||||||
self.app.logger.thread_locals)
|
|
||||||
source = node = None
|
|
||||||
while True:
|
while True:
|
||||||
chunk = queue.get(timeout=self.app.node_timeout)
|
with ChunkReadTimeout(self.app.node_timeout):
|
||||||
if isinstance(chunk, bool): # terminator
|
chunk = source.read(self.app.object_chunk_size)
|
||||||
success = chunk
|
nchunks += 1
|
||||||
if not success:
|
if not chunk:
|
||||||
raise Exception(_('Failed to read all data'
|
|
||||||
' from the source'))
|
|
||||||
break
|
break
|
||||||
yield chunk
|
with ChunkWriteTimeout(self.app.client_timeout):
|
||||||
except Empty:
|
yield chunk
|
||||||
raise ChunkReadTimeout()
|
# This is for fairness; if the network is outpacing the CPU,
|
||||||
except (GeneratorExit, Timeout):
|
# we'll always be able to read and write data without
|
||||||
|
# encountering an EWOULDBLOCK, and so eventlet will not switch
|
||||||
|
# greenthreads on its own. We do it manually so that clients
|
||||||
|
# don't starve.
|
||||||
|
#
|
||||||
|
# The number 5 here was chosen by making stuff up. It's not
|
||||||
|
# every single chunk, but it's not too big either, so it seemed
|
||||||
|
# like it would probably be an okay choice.
|
||||||
|
#
|
||||||
|
# Note that we may trampoline to other greenthreads more often
|
||||||
|
# than once every 5 chunks, depending on how blocking our
|
||||||
|
# network IO is; the explicit sleep here simply provides a
|
||||||
|
# lower bound on the rate of trampolining.
|
||||||
|
if nchunks % 5 == 0:
|
||||||
|
sleep()
|
||||||
|
except ChunkReadTimeout:
|
||||||
|
self.exception_occurred(node, _('Object'),
|
||||||
|
_('Trying to read during GET'))
|
||||||
|
raise
|
||||||
|
except ChunkWriteTimeout:
|
||||||
|
self.app.logger.warn(
|
||||||
|
_('Client did not read from proxy within %ss') %
|
||||||
|
self.app.client_timeout)
|
||||||
|
self.app.logger.increment('client_timeouts')
|
||||||
|
except GeneratorExit:
|
||||||
self.app.logger.warn(_('Client disconnected on read'))
|
self.app.logger.warn(_('Client disconnected on read'))
|
||||||
except Exception:
|
except Exception:
|
||||||
self.app.logger.exception(_('Trying to send to client'))
|
self.app.logger.exception(_('Trying to send to client'))
|
||||||
raise
|
raise
|
||||||
|
finally:
|
||||||
|
# Close-out the connection as best as possible.
|
||||||
|
if getattr(source, 'swift_conn', None):
|
||||||
|
self.close_swift_conn(source)
|
||||||
|
|
||||||
def close_swift_conn(self, src):
|
def close_swift_conn(self, src):
|
||||||
"""
|
"""
|
||||||
|
Loading…
x
Reference in New Issue
Block a user