Merge "Add concurrent reads option to proxy"

This commit is contained in:
Jenkins 2016-03-18 23:11:04 +00:00 committed by Gerrit Code Review
commit 4eb57e971d
12 changed files with 348 additions and 79 deletions

View File

@ -1367,6 +1367,36 @@ swift_owner_headers <see the sample These are the headers whose
headers> up to the auth system in use,
but usually indicates
administrative responsibilities.
sorting_method shuffle Storage nodes can be chosen at
random (shuffle), by using timing
measurements (timing), or by using
an explicit match (affinity).
Using timing measurements may allow
for lower overall latency, while
using affinity allows for finer
control. In both the timing and
affinity cases, equally-sorting nodes
are still randomly chosen to spread
load.
timing_expiry 300 If the "timing" sorting_method is
used, the timings will only be valid
for the number of seconds configured
by timing_expiry.
concurrent_gets off Use replica count number of
threads concurrently during a
GET/HEAD and return with the
first successful response. In
the EC case, this parameter only
effects an EC HEAD as an EC GET
behaves differently.
concurrency_timeout conn_timeout This parameter controls how long
to wait before firing off the
next concurrent_get thread. A
value of 0 would we fully concurrent
any other number will stagger the
firing of the threads. This number
should be between 0 and node_timeout.
The default is conn_timeout (0.5).
============================ =============== =============================
[tempauth]

View File

@ -164,13 +164,28 @@ use = egg:swift#proxy
# using affinity allows for finer control. In both the timing and
# affinity cases, equally-sorting nodes are still randomly chosen to
# spread load.
# The valid values for sorting_method are "affinity", "shuffle", and "timing".
# The valid values for sorting_method are "affinity", "shuffle", or "timing".
# sorting_method = shuffle
#
# If the "timing" sorting_method is used, the timings will only be valid for
# the number of seconds configured by timing_expiry.
# timing_expiry = 300
#
# By default on a GET/HEAD swift will connect to a storage node one at a time
# in a single thread. There is smarts in the order they are hit however. If you
# turn on concurrent_gets below, then replica count threads will be used.
# With addition of the concurrency_timeout option this will allow swift to send
# out GET/HEAD requests to the storage nodes concurrently and answer with the
# first to respond. With an EC policy the parameter only affects HEAD requests.
# concurrent_gets = off
#
# This parameter controls how long to wait before firing off the next
# concurrent_get thread. A value of 0 would be fully concurrent, any other
# number will stagger the firing of the threads. This number should be
# between 0 and node_timeout. The default is what ever you set for the
# conn_timeout parameter.
# concurrency_timeout = 0.5
#
# Set to the number of nodes to contact for a normal request. You can use
# '* replicas' at the end to have it use the number given times the number of
# replicas for the ring being used for the request.

View File

@ -2471,6 +2471,10 @@ class GreenAsyncPile(object):
finally:
self._inflight -= 1
@property
def inflight(self):
return self._inflight
def spawn(self, func, *args, **kwargs):
"""
Spawn a job in a green thread on the pile.
@ -2479,6 +2483,16 @@ class GreenAsyncPile(object):
self._inflight += 1
self._pool.spawn(self._run_func, func, args, kwargs)
def waitfirst(self, timeout):
"""
Wait up to timeout seconds for first result to come in.
:param timeout: seconds to wait for results
:returns: first item to come back, or None
"""
for result in self._wait(timeout, first_n=1):
return result
def waitall(self, timeout):
"""
Wait timeout seconds for any results to come in.
@ -2486,11 +2500,16 @@ class GreenAsyncPile(object):
:param timeout: seconds to wait for results
:returns: list of results accrued in that time
"""
return self._wait(timeout)
def _wait(self, timeout, first_n=None):
results = []
try:
with GreenAsyncPileWaitallTimeout(timeout):
while True:
results.append(next(self))
if first_n and len(results) >= first_n:
break
except (GreenAsyncPileWaitallTimeout, StopIteration):
pass
return results

View File

@ -60,10 +60,12 @@ class AccountController(Controller):
return resp
partition = self.app.account_ring.get_part(self.account_name)
concurrency = self.app.account_ring.replica_count \
if self.app.concurrent_gets else 1
node_iter = self.app.iter_nodes(self.app.account_ring, partition)
resp = self.GETorHEAD_base(
req, _('Account'), node_iter, partition,
req.swift_entity_path.rstrip('/'))
req.swift_entity_path.rstrip('/'), concurrency)
if resp.status_int == HTTP_NOT_FOUND:
if resp.headers.get('X-Account-Status', '').lower() == 'deleted':
resp.status = HTTP_GONE

View File

@ -623,7 +623,8 @@ def bytes_to_skip(record_size, range_start):
class ResumingGetter(object):
def __init__(self, app, req, server_type, node_iter, partition, path,
backend_headers, client_chunk_size=None, newest=None):
backend_headers, concurrency=1, client_chunk_size=None,
newest=None):
self.app = app
self.node_iter = node_iter
self.server_type = server_type
@ -634,6 +635,7 @@ class ResumingGetter(object):
self.skip_bytes = 0
self.used_nodes = []
self.used_source_etag = ''
self.concurrency = concurrency
# stuff from request
self.req_method = req.method
@ -649,6 +651,7 @@ class ResumingGetter(object):
self.reasons = []
self.bodies = []
self.source_headers = []
self.sources = []
# populated from response headers
self.start_byte = self.end_byte = self.length = None
@ -971,88 +974,106 @@ class ResumingGetter(object):
else:
return None
def _make_node_request(self, node, node_timeout, logger_thread_locals):
self.app.logger.thread_locals = logger_thread_locals
if node in self.used_nodes:
return False
start_node_timing = time.time()
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(
node['ip'], node['port'], node['device'],
self.partition, self.req_method, self.path,
headers=self.backend_headers,
query_string=self.req_query_string)
self.app.set_node_timing(node, time.time() - start_node_timing)
with Timeout(node_timeout):
possible_source = conn.getresponse()
# See NOTE: swift_conn at top of file about this.
possible_source.swift_conn = conn
except (Exception, Timeout):
self.app.exception_occurred(
node, self.server_type,
_('Trying to %(method)s %(path)s') %
{'method': self.req_method, 'path': self.req_path})
return False
if self.is_good_source(possible_source):
# 404 if we know we don't have a synced copy
if not float(possible_source.getheader('X-PUT-Timestamp', 1)):
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')
self.source_headers.append([])
close_swift_conn(possible_source)
else:
if self.used_source_etag:
src_headers = dict(
(k.lower(), v) for k, v in
possible_source.getheaders())
if self.used_source_etag != src_headers.get(
'x-object-sysmeta-ec-etag',
src_headers.get('etag', '')).strip('"'):
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')
self.source_headers.append([])
return False
self.statuses.append(possible_source.status)
self.reasons.append(possible_source.reason)
self.bodies.append(None)
self.source_headers.append(possible_source.getheaders())
self.sources.append((possible_source, node))
if not self.newest: # one good source is enough
return True
else:
self.statuses.append(possible_source.status)
self.reasons.append(possible_source.reason)
self.bodies.append(possible_source.read())
self.source_headers.append(possible_source.getheaders())
if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
self.app.error_limit(node, _('ERROR Insufficient Storage'))
elif is_server_error(possible_source.status):
self.app.error_occurred(
node, _('ERROR %(status)d %(body)s '
'From %(type)s Server') %
{'status': possible_source.status,
'body': self.bodies[-1][:1024],
'type': self.server_type})
return False
def _get_source_and_node(self):
self.statuses = []
self.reasons = []
self.bodies = []
self.source_headers = []
sources = []
self.sources = []
nodes = GreenthreadSafeIterator(self.node_iter)
node_timeout = self.app.node_timeout
if self.server_type == 'Object' and not self.newest:
node_timeout = self.app.recoverable_node_timeout
for node in self.node_iter:
if node in self.used_nodes:
continue
start_node_timing = time.time()
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(
node['ip'], node['port'], node['device'],
self.partition, self.req_method, self.path,
headers=self.backend_headers,
query_string=self.req_query_string)
self.app.set_node_timing(node, time.time() - start_node_timing)
with Timeout(node_timeout):
possible_source = conn.getresponse()
# See NOTE: swift_conn at top of file about this.
possible_source.swift_conn = conn
except (Exception, Timeout):
self.app.exception_occurred(
node, self.server_type,
_('Trying to %(method)s %(path)s') %
{'method': self.req_method, 'path': self.req_path})
continue
if self.is_good_source(possible_source):
# 404 if we know we don't have a synced copy
if not float(possible_source.getheader('X-PUT-Timestamp', 1)):
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')
self.source_headers.append([])
close_swift_conn(possible_source)
else:
if self.used_source_etag:
src_headers = dict(
(k.lower(), v) for k, v in
possible_source.getheaders())
pile = GreenAsyncPile(self.concurrency)
if self.used_source_etag != src_headers.get(
'x-object-sysmeta-ec-etag',
src_headers.get('etag', '')).strip('"'):
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')
self.source_headers.append([])
continue
for node in nodes:
pile.spawn(self._make_node_request, node, node_timeout,
self.app.logger.thread_locals)
_timeout = self.app.concurrency_timeout \
if pile.inflight < self.concurrency else None
if pile.waitfirst(_timeout):
break
else:
# ran out of nodes, see if any stragglers will finish
any(pile)
self.statuses.append(possible_source.status)
self.reasons.append(possible_source.reason)
self.bodies.append(None)
self.source_headers.append(possible_source.getheaders())
sources.append((possible_source, node))
if not self.newest: # one good source is enough
break
else:
self.statuses.append(possible_source.status)
self.reasons.append(possible_source.reason)
self.bodies.append(possible_source.read())
self.source_headers.append(possible_source.getheaders())
if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
self.app.error_limit(node, _('ERROR Insufficient Storage'))
elif is_server_error(possible_source.status):
self.app.error_occurred(
node, _('ERROR %(status)d %(body)s '
'From %(type)s Server') %
{'status': possible_source.status,
'body': self.bodies[-1][:1024],
'type': self.server_type})
if sources:
sources.sort(key=lambda s: source_key(s[0]))
source, node = sources.pop()
for src, _junk in sources:
if self.sources:
self.sources.sort(key=lambda s: source_key(s[0]))
source, node = self.sources.pop()
for src, _junk in self.sources:
close_swift_conn(src)
self.used_nodes.append(node)
src_headers = dict(
@ -1613,7 +1634,7 @@ class Controller(object):
self.app.logger.warning('Could not autocreate account %r' % path)
def GETorHEAD_base(self, req, server_type, node_iter, partition, path,
client_chunk_size=None):
concurrency=1, client_chunk_size=None):
"""
Base handler for HTTP GET or HEAD requests.
@ -1622,6 +1643,7 @@ class Controller(object):
:param node_iter: an iterator to obtain nodes from
:param partition: partition
:param path: path for the request
:param concurrency: number of requests to run concurrently
:param client_chunk_size: chunk size for response body iterator
:returns: swob.Response object
"""
@ -1630,6 +1652,7 @@ class Controller(object):
handler = GetOrHeadHandler(self.app, req, self.server_type, node_iter,
partition, path, backend_headers,
concurrency,
client_chunk_size=client_chunk_size)
res = handler.get_working_response(req)

View File

@ -93,10 +93,12 @@ class ContainerController(Controller):
return HTTPNotFound(request=req)
part = self.app.container_ring.get_part(
self.account_name, self.container_name)
concurrency = self.app.container_ring.replica_count \
if self.app.concurrent_gets else 1
node_iter = self.app.iter_nodes(self.app.container_ring, part)
resp = self.GETorHEAD_base(
req, _('Container'), node_iter, part,
req.swift_entity_path)
req.swift_entity_path, concurrency)
if 'swift.authorize' in req.environ:
req.acl = resp.headers.get('x-container-read')
aresp = req.environ['swift.authorize'](req)

View File

@ -879,9 +879,11 @@ class BaseObjectController(Controller):
class ReplicatedObjectController(BaseObjectController):
def _get_or_head_response(self, req, node_iter, partition, policy):
concurrency = self.app.get_object_ring(policy.idx).replica_count \
if self.app.concurrent_gets else 1
resp = self.GETorHEAD_base(
req, _('Object'), node_iter, partition,
req.swift_entity_path)
req.swift_entity_path, concurrency)
return resp
def _connect_put_node(self, nodes, part, path, headers,
@ -2000,9 +2002,10 @@ class ECObjectController(BaseObjectController):
# no fancy EC decoding here, just one plain old HEAD request to
# one object server because all fragments hold all metadata
# information about the object.
concurrency = policy.ec_ndata if self.app.concurrent_gets else 1
resp = self.GETorHEAD_base(
req, _('Object'), node_iter, partition,
req.swift_entity_path)
req.swift_entity_path, concurrency)
else: # GET request
orig_range = None
range_specs = []
@ -2011,6 +2014,12 @@ class ECObjectController(BaseObjectController):
range_specs = self._convert_range(req, policy)
safe_iter = GreenthreadSafeIterator(node_iter)
# Sending the request concurrently to all nodes, and responding
# with the first response isn't something useful for EC as all
# nodes contain different fragments. Also EC has implemented it's
# own specific implementation of concurrent gets to ec_ndata nodes.
# So we don't need to worry about plumbing and sending a
# concurrency value to ResumingGetter.
with ContextPool(policy.ec_ndata) as pool:
pile = GreenAsyncPile(pool)
for _junk in range(policy.ec_ndata):

View File

@ -147,6 +147,10 @@ class Application(object):
self.node_timings = {}
self.timing_expiry = int(conf.get('timing_expiry', 300))
self.sorting_method = conf.get('sorting_method', 'shuffle').lower()
self.concurrent_gets = \
config_true_value(conf.get('concurrent_gets'))
self.concurrency_timeout = float(conf.get('concurrency_timeout',
self.conn_timeout))
value = conf.get('request_node_count', '2 * replicas').lower().split()
if len(value) == 1:
rnc_value = int(value[0])

View File

@ -4988,6 +4988,37 @@ class TestGreenAsyncPile(unittest.TestCase):
self.assertEqual(pile.waitall(0.5), [0.1, 0.1])
self.assertEqual(completed[0], 2)
def test_waitfirst_only_returns_first(self):
def run_test(name):
eventlet.sleep(0)
completed.append(name)
return name
completed = []
pile = utils.GreenAsyncPile(3)
pile.spawn(run_test, 'first')
pile.spawn(run_test, 'second')
pile.spawn(run_test, 'third')
self.assertEqual(pile.waitfirst(0.5), completed[0])
# 3 still completed, but only the first was returned.
self.assertEqual(3, len(completed))
def test_wait_with_firstn(self):
def run_test(name):
eventlet.sleep(0)
completed.append(name)
return name
for first_n in [None] + list(range(6)):
completed = []
pile = utils.GreenAsyncPile(10)
for i in range(10):
pile.spawn(run_test, i)
actual = pile._wait(1, first_n)
expected_n = first_n if first_n else 10
self.assertEqual(completed[:expected_n], actual)
self.assertEqual(10, len(completed))
def test_pending(self):
pile = utils.GreenAsyncPile(3)
self.assertEqual(0, pile._pending)

View File

@ -28,7 +28,7 @@ from swift.common import exceptions
from swift.common.utils import split_path
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.http import is_success
from swift.common.storage_policy import StoragePolicy
from swift.common.storage_policy import StoragePolicy, POLICIES
from test.unit import fake_http_connect, FakeRing, FakeMemcache
from swift.proxy import server as proxy_server
from swift.common.request_helpers import get_sys_meta_prefix
@ -193,6 +193,52 @@ class TestFuncs(unittest.TestCase):
self.assertTrue('swift.account/a' in resp.environ)
self.assertEqual(resp.environ['swift.account/a']['status'], 200)
# Run the above tests again, but this time with concurrent_reads
# turned on
policy = next(iter(POLICIES))
concurrent_get_threads = policy.object_ring.replica_count
for concurrency_timeout in (0, 2):
self.app.concurrency_timeout = concurrency_timeout
req = Request.blank('/v1/a/c/o/with/slashes')
# NOTE: We are using slow_connect of fake_http_connect as using
# a concurrency of 0 when mocking the connection is a little too
# fast for eventlet. Network i/o will make this fine, but mocking
# it seems is too instantaneous.
with patch('swift.proxy.controllers.base.http_connect',
fake_http_connect(200, slow_connect=True)):
resp = base.GETorHEAD_base(
req, 'object', iter(nodes), 'part', '/a/c/o/with/slashes',
concurrency=concurrent_get_threads)
self.assertTrue('swift.object/a/c/o/with/slashes' in resp.environ)
self.assertEqual(
resp.environ['swift.object/a/c/o/with/slashes']['status'], 200)
req = Request.blank('/v1/a/c/o')
with patch('swift.proxy.controllers.base.http_connect',
fake_http_connect(200, slow_connect=True)):
resp = base.GETorHEAD_base(
req, 'object', iter(nodes), 'part', '/a/c/o',
concurrency=concurrent_get_threads)
self.assertTrue('swift.object/a/c/o' in resp.environ)
self.assertEqual(resp.environ['swift.object/a/c/o']['status'], 200)
req = Request.blank('/v1/a/c')
with patch('swift.proxy.controllers.base.http_connect',
fake_http_connect(200, slow_connect=True)):
resp = base.GETorHEAD_base(
req, 'container', iter(nodes), 'part', '/a/c',
concurrency=concurrent_get_threads)
self.assertTrue('swift.container/a/c' in resp.environ)
self.assertEqual(resp.environ['swift.container/a/c']['status'],
200)
req = Request.blank('/v1/a')
with patch('swift.proxy.controllers.base.http_connect',
fake_http_connect(200, slow_connect=True)):
resp = base.GETorHEAD_base(
req, 'account', iter(nodes), 'part', '/a',
concurrency=concurrent_get_threads)
self.assertTrue('swift.account/a' in resp.environ)
self.assertEqual(resp.environ['swift.account/a']['status'], 200)
def test_get_info(self):
app = FakeApp()
# Do a non cached call to account

View File

@ -722,9 +722,15 @@ class TestReplicatedObjController(BaseObjectControllerMixin,
def test_GET_error(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
with set_http_connect(503, 200):
self.app.logger.txn_id = req.environ['swift.trans_id'] = 'my-txn-id'
stdout = BytesIO()
with set_http_connect(503, 200), \
mock.patch('sys.stdout', stdout):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
for line in stdout.getvalue().splitlines():
self.assertIn('my-txn-id', line)
self.assertIn('From Object Server', stdout.getvalue())
def test_GET_handoff(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')

View File

@ -929,6 +929,88 @@ class TestProxyServer(unittest.TestCase):
{'region': 2, 'zone': 1, 'ip': '127.0.0.1'}]
self.assertEqual(exp_sorted, app_sorted)
def test_node_concurrency(self):
nodes = [{'region': 1, 'zone': 1, 'ip': '127.0.0.1', 'port': 6010,
'device': 'sda'},
{'region': 2, 'zone': 2, 'ip': '127.0.0.2', 'port': 6010,
'device': 'sda'},
{'region': 3, 'zone': 3, 'ip': '127.0.0.3', 'port': 6010,
'device': 'sda'}]
timings = {'127.0.0.1': 2, '127.0.0.2': 1, '127.0.0.3': 0}
statuses = {'127.0.0.1': 200, '127.0.0.2': 200, '127.0.0.3': 200}
req = Request.blank('/v1/account', environ={'REQUEST_METHOD': 'GET'})
def fake_iter_nodes(*arg, **karg):
return iter(nodes)
class FakeConn(object):
def __init__(self, ip, *args, **kargs):
self.ip = ip
self.args = args
self.kargs = kargs
def getresponse(self):
def mygetheader(header, *args, **kargs):
if header == "Content-Type":
return ""
else:
return 1
resp = mock.Mock()
resp.read.side_effect = ['Response from %s' % self.ip, '']
resp.getheader = mygetheader
resp.getheaders.return_value = {}
resp.reason = ''
resp.status = statuses[self.ip]
sleep(timings[self.ip])
return resp
def myfake_http_connect_raw(ip, *args, **kargs):
conn = FakeConn(ip, *args, **kargs)
return conn
with mock.patch('swift.proxy.server.Application.iter_nodes',
fake_iter_nodes):
with mock.patch('swift.common.bufferedhttp.http_connect_raw',
myfake_http_connect_raw):
app_conf = {'concurrent_gets': 'on',
'concurrency_timeout': 0}
baseapp = proxy_server.Application(app_conf,
FakeMemcache(),
container_ring=FakeRing(),
account_ring=FakeRing())
self.assertEqual(baseapp.concurrent_gets, True)
self.assertEqual(baseapp.concurrency_timeout, 0)
baseapp.update_request(req)
resp = baseapp.handle_request(req)
# Should get 127.0.0.3 as this has a wait of 0 seconds.
self.assertEqual(resp.body, 'Response from 127.0.0.3')
# lets try again, with 127.0.0.1 with 0 timing but returns an
# error.
timings['127.0.0.1'] = 0
statuses['127.0.0.1'] = 500
# Should still get 127.0.0.3 as this has a wait of 0 seconds
# and a success
baseapp.update_request(req)
resp = baseapp.handle_request(req)
self.assertEqual(resp.body, 'Response from 127.0.0.3')
# Now lets set the concurrency_timeout
app_conf['concurrency_timeout'] = 2
baseapp = proxy_server.Application(app_conf,
FakeMemcache(),
container_ring=FakeRing(),
account_ring=FakeRing())
self.assertEqual(baseapp.concurrency_timeout, 2)
baseapp.update_request(req)
resp = baseapp.handle_request(req)
# Should get 127.0.0.2 as this has a wait of 1 seconds.
self.assertEqual(resp.body, 'Response from 127.0.0.2')
def test_info_defaults(self):
app = proxy_server.Application({}, FakeMemcache(),
account_ring=FakeRing(),