Add concurrent reads option to proxy
This change adds two new parameters to enable and control concurrent GETs in Swift: 'concurrent_gets' and 'concurrency_timeout'.

'concurrent_gets' turns concurrent GETs on or off. When on, the GET/HEAD concurrency is set to the replica count (or, for EC HEADs, to ndata), and the proxy serves the first valid source to respond. This applies to all account, container, and object GETs, except for EC; with an EC policy, only HEAD requests are affected. It is achieved by switching the request-sending mechanism to GreenAsyncPile and green threads, with a timeout between each request.

'concurrency_timeout' is related to concurrent_gets: it is the amount of time to wait before firing the next thread. A value of 0 fires all requests at the same time (fully concurrent); any other value staggers the firing, giving each node a short chance to respond before the next request goes out. This value is a float and should be between 0 and node_timeout. The default is conn_timeout, meaning that by default the firing is staggered.

DocImpact
Implements: blueprint concurrent-reads
Change-Id: I789d39472ec48b22415ff9d9821b1eefab7da867
This commit is contained in:
parent e50ab92aaa
commit f595a7e704
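To make the mechanism concrete, here is a minimal standalone sketch of the staggered-firing pattern described above, using plain eventlet primitives rather than Swift's internal GreenAsyncPile; the node names, delays, and fetch() helper are invented for illustration:

from eventlet import GreenPool, sleep
from eventlet.queue import Queue, Empty

def fetch(node, delay):
    sleep(delay)                     # stand-in for a backend GET/HEAD
    return 'response from %s' % node

# the fastest node is deliberately not the first one tried
nodes = [('node1', 0.30), ('node2', 0.05), ('node3', 0.20)]
concurrency_timeout = 0.1            # stagger interval between threads

pool = GreenPool(len(nodes))
results = Queue()
for node, delay in nodes:
    pool.spawn_n(lambda n=node, d=delay: results.put(fetch(n, d)))
    try:
        # give the in-flight requests a short chance to answer
        # before firing the next one
        first = results.get(timeout=concurrency_timeout)
        break
    except Empty:
        continue
else:
    first = results.get()            # everything fired; take the fastest
print(first)                         # -> 'response from node2'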
doc/source/deployment_guide.rst
@@ -1367,6 +1367,36 @@ swift_owner_headers <see the sample These are the headers whose
                              headers>        up to the auth system in use,
                                              but usually indicates
                                              administrative responsibilities.
+sorting_method               shuffle         Storage nodes can be chosen at
+                                             random (shuffle), by using timing
+                                             measurements (timing), or by using
+                                             an explicit match (affinity).
+                                             Using timing measurements may allow
+                                             for lower overall latency, while
+                                             using affinity allows for finer
+                                             control. In both the timing and
+                                             affinity cases, equally-sorting nodes
+                                             are still randomly chosen to spread
+                                             load.
+timing_expiry                300             If the "timing" sorting_method is
+                                             used, the timings will only be valid
+                                             for the number of seconds configured
+                                             by timing_expiry.
+concurrent_gets              off             Use replica count number of
+                                             threads concurrently during a
+                                             GET/HEAD and return with the
+                                             first successful response. In
+                                             the EC case, this parameter only
+                                             affects an EC HEAD, as an EC GET
+                                             behaves differently.
+concurrency_timeout          conn_timeout    This parameter controls how long
+                                             to wait before firing off the
+                                             next concurrent_get thread. A
+                                             value of 0 would be fully concurrent;
+                                             any other number will stagger the
+                                             firing of the threads. This number
+                                             should be between 0 and node_timeout.
+                                             The default is conn_timeout (0.5).
 ============================ =============== =============================

 [tempauth]
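Putting the two options together, a minimal proxy-server.conf excerpt with concurrent reads enabled could look like the following (the 0.25 stagger is only an example value; anything from 0 up to node_timeout is reasonable):

[app:proxy-server]
use = egg:swift#proxy
concurrent_gets = on
concurrency_timeout = 0.25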
etc/proxy-server.conf-sample
@@ -164,13 +164,28 @@ use = egg:swift#proxy
 # using affinity allows for finer control. In both the timing and
 # affinity cases, equally-sorting nodes are still randomly chosen to
 # spread load.
-# The valid values for sorting_method are "affinity", "shuffle", and "timing".
+# The valid values for sorting_method are "affinity", "shuffle", or "timing".
 # sorting_method = shuffle
 #
 # If the "timing" sorting_method is used, the timings will only be valid for
 # the number of seconds configured by timing_expiry.
 # timing_expiry = 300
 #
+# By default on a GET/HEAD swift will connect to the storage nodes one at a
+# time in a single thread, though there are smarts in the order they are hit.
+# If you turn on concurrent_gets below, then replica count threads will be used.
+# With the addition of the concurrency_timeout option this will allow swift to
+# send out GET/HEAD requests to the storage nodes concurrently and answer with
+# the first to respond. With an EC policy the parameter only affects HEAD requests.
+# concurrent_gets = off
+#
+# This parameter controls how long to wait before firing off the next
+# concurrent_get thread. A value of 0 would be fully concurrent; any other
+# number will stagger the firing of the threads. This number should be
+# between 0 and node_timeout. The default is whatever you set for the
+# conn_timeout parameter.
+# concurrency_timeout = 0.5
+#
 # Set to the number of nodes to contact for a normal request. You can use
 # '* replicas' at the end to have it use the number given times the number of
 # replicas for the ring being used for the request.
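As a worked example of the staggering: with three replicas and the default concurrency_timeout of 0.5, the proxy fires the first backend request immediately, fires the second roughly 0.5 s later if nothing has come back, and the third roughly 0.5 s after that; once all replica-count requests are in flight, it waits for the first usable response with no further stagger timeout.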
swift/common/utils.py
@@ -2471,6 +2471,10 @@ class GreenAsyncPile(object):
         finally:
             self._inflight -= 1

+    @property
+    def inflight(self):
+        return self._inflight
+
     def spawn(self, func, *args, **kwargs):
         """
         Spawn a job in a green thread on the pile.
@@ -2479,6 +2483,16 @@ class GreenAsyncPile(object):
         self._inflight += 1
         self._pool.spawn(self._run_func, func, args, kwargs)

+    def waitfirst(self, timeout):
+        """
+        Wait up to timeout seconds for first result to come in.
+
+        :param timeout: seconds to wait for results
+        :returns: first item to come back, or None
+        """
+        for result in self._wait(timeout, first_n=1):
+            return result
+
     def waitall(self, timeout):
         """
         Wait timeout seconds for any results to come in.
@@ -2486,11 +2500,16 @@ class GreenAsyncPile(object):
         :param timeout: seconds to wait for results
         :returns: list of results accrued in that time
         """
+        return self._wait(timeout)
+
+    def _wait(self, timeout, first_n=None):
         results = []
         try:
             with GreenAsyncPileWaitallTimeout(timeout):
                 while True:
                     results.append(next(self))
+                    if first_n and len(results) >= first_n:
+                        break
         except (GreenAsyncPileWaitallTimeout, StopIteration):
             pass
         return results
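A small illustrative sketch of the new pile methods (this assumes a Swift source tree is importable; probe() and the delays are invented for the example):

from eventlet import sleep
from swift.common.utils import GreenAsyncPile

def probe(name, delay):
    sleep(delay)              # simulate a slow backend
    return name

pile = GreenAsyncPile(3)      # at most three concurrent green threads
for name, delay in [('slow', 0.2), ('fast', 0.01), ('medium', 0.1)]:
    pile.spawn(probe, name, delay)
print(pile.inflight)          # -> 3; nothing has finished yet
print(pile.waitfirst(0.5))    # -> 'fast', the first result to land
print(pile.waitall(0.5))      # -> ['medium', 'slow'], the stragglers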
swift/proxy/controllers/account.py
@@ -60,10 +60,12 @@ class AccountController(Controller):
             return resp

         partition = self.app.account_ring.get_part(self.account_name)
+        concurrency = self.app.account_ring.replica_count \
+            if self.app.concurrent_gets else 1
         node_iter = self.app.iter_nodes(self.app.account_ring, partition)
         resp = self.GETorHEAD_base(
             req, _('Account'), node_iter, partition,
-            req.swift_entity_path.rstrip('/'))
+            req.swift_entity_path.rstrip('/'), concurrency)
         if resp.status_int == HTTP_NOT_FOUND:
             if resp.headers.get('X-Account-Status', '').lower() == 'deleted':
                 resp.status = HTTP_GONE
swift/proxy/controllers/base.py
@@ -623,7 +623,8 @@ def bytes_to_skip(record_size, range_start):

 class ResumingGetter(object):
     def __init__(self, app, req, server_type, node_iter, partition, path,
-                 backend_headers, client_chunk_size=None, newest=None):
+                 backend_headers, concurrency=1, client_chunk_size=None,
+                 newest=None):
         self.app = app
         self.node_iter = node_iter
         self.server_type = server_type
@@ -634,6 +635,7 @@ class ResumingGetter(object):
         self.skip_bytes = 0
         self.used_nodes = []
         self.used_source_etag = ''
+        self.concurrency = concurrency

         # stuff from request
         self.req_method = req.method
@@ -649,6 +651,7 @@ class ResumingGetter(object):
         self.reasons = []
         self.bodies = []
         self.source_headers = []
+        self.sources = []

         # populated from response headers
         self.start_byte = self.end_byte = self.length = None
@@ -971,88 +974,106 @@ class ResumingGetter(object):
         else:
             return None

+    def _make_node_request(self, node, node_timeout, logger_thread_locals):
+        self.app.logger.thread_locals = logger_thread_locals
+        if node in self.used_nodes:
+            return False
+        start_node_timing = time.time()
+        try:
+            with ConnectionTimeout(self.app.conn_timeout):
+                conn = http_connect(
+                    node['ip'], node['port'], node['device'],
+                    self.partition, self.req_method, self.path,
+                    headers=self.backend_headers,
+                    query_string=self.req_query_string)
+            self.app.set_node_timing(node, time.time() - start_node_timing)
+
+            with Timeout(node_timeout):
+                possible_source = conn.getresponse()
+                # See NOTE: swift_conn at top of file about this.
+                possible_source.swift_conn = conn
+        except (Exception, Timeout):
+            self.app.exception_occurred(
+                node, self.server_type,
+                _('Trying to %(method)s %(path)s') %
+                {'method': self.req_method, 'path': self.req_path})
+            return False
+        if self.is_good_source(possible_source):
+            # 404 if we know we don't have a synced copy
+            if not float(possible_source.getheader('X-PUT-Timestamp', 1)):
+                self.statuses.append(HTTP_NOT_FOUND)
+                self.reasons.append('')
+                self.bodies.append('')
+                self.source_headers.append([])
+                close_swift_conn(possible_source)
+            else:
+                if self.used_source_etag:
+                    src_headers = dict(
+                        (k.lower(), v) for k, v in
+                        possible_source.getheaders())
+
+                    if self.used_source_etag != src_headers.get(
+                            'x-object-sysmeta-ec-etag',
+                            src_headers.get('etag', '')).strip('"'):
+                        self.statuses.append(HTTP_NOT_FOUND)
+                        self.reasons.append('')
+                        self.bodies.append('')
+                        self.source_headers.append([])
+                        return False
+
+                self.statuses.append(possible_source.status)
+                self.reasons.append(possible_source.reason)
+                self.bodies.append(None)
+                self.source_headers.append(possible_source.getheaders())
+                self.sources.append((possible_source, node))
+                if not self.newest:  # one good source is enough
+                    return True
+        else:
+            self.statuses.append(possible_source.status)
+            self.reasons.append(possible_source.reason)
+            self.bodies.append(possible_source.read())
+            self.source_headers.append(possible_source.getheaders())
+            if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
+                self.app.error_limit(node, _('ERROR Insufficient Storage'))
+            elif is_server_error(possible_source.status):
+                self.app.error_occurred(
+                    node, _('ERROR %(status)d %(body)s '
+                            'From %(type)s Server') %
+                    {'status': possible_source.status,
+                     'body': self.bodies[-1][:1024],
+                     'type': self.server_type})
+        return False
+
     def _get_source_and_node(self):
         self.statuses = []
         self.reasons = []
         self.bodies = []
         self.source_headers = []
-        sources = []
+        self.sources = []
+
+        nodes = GreenthreadSafeIterator(self.node_iter)

         node_timeout = self.app.node_timeout
         if self.server_type == 'Object' and not self.newest:
             node_timeout = self.app.recoverable_node_timeout
-        for node in self.node_iter:
-            if node in self.used_nodes:
-                continue
-            start_node_timing = time.time()
-            try:
-                with ConnectionTimeout(self.app.conn_timeout):
-                    conn = http_connect(
-                        node['ip'], node['port'], node['device'],
-                        self.partition, self.req_method, self.path,
-                        headers=self.backend_headers,
-                        query_string=self.req_query_string)
-                self.app.set_node_timing(node, time.time() - start_node_timing)
-
-                with Timeout(node_timeout):
-                    possible_source = conn.getresponse()
-                    # See NOTE: swift_conn at top of file about this.
-                    possible_source.swift_conn = conn
-            except (Exception, Timeout):
-                self.app.exception_occurred(
-                    node, self.server_type,
-                    _('Trying to %(method)s %(path)s') %
-                    {'method': self.req_method, 'path': self.req_path})
-                continue
-            if self.is_good_source(possible_source):
-                # 404 if we know we don't have a synced copy
-                if not float(possible_source.getheader('X-PUT-Timestamp', 1)):
-                    self.statuses.append(HTTP_NOT_FOUND)
-                    self.reasons.append('')
-                    self.bodies.append('')
-                    self.source_headers.append([])
-                    close_swift_conn(possible_source)
-                else:
-                    if self.used_source_etag:
-                        src_headers = dict(
-                            (k.lower(), v) for k, v in
-                            possible_source.getheaders())
-
-                        if self.used_source_etag != src_headers.get(
-                                'x-object-sysmeta-ec-etag',
-                                src_headers.get('etag', '')).strip('"'):
-                            self.statuses.append(HTTP_NOT_FOUND)
-                            self.reasons.append('')
-                            self.bodies.append('')
-                            self.source_headers.append([])
-                            continue
-                    self.statuses.append(possible_source.status)
-                    self.reasons.append(possible_source.reason)
-                    self.bodies.append(None)
-                    self.source_headers.append(possible_source.getheaders())
-                    sources.append((possible_source, node))
-                    if not self.newest:  # one good source is enough
-                        break
-            else:
-                self.statuses.append(possible_source.status)
-                self.reasons.append(possible_source.reason)
-                self.bodies.append(possible_source.read())
-                self.source_headers.append(possible_source.getheaders())
-                if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
-                    self.app.error_limit(node, _('ERROR Insufficient Storage'))
-                elif is_server_error(possible_source.status):
-                    self.app.error_occurred(
-                        node, _('ERROR %(status)d %(body)s '
-                                'From %(type)s Server') %
-                        {'status': possible_source.status,
-                         'body': self.bodies[-1][:1024],
-                         'type': self.server_type})
+
+        pile = GreenAsyncPile(self.concurrency)
+
+        for node in nodes:
+            pile.spawn(self._make_node_request, node, node_timeout,
+                       self.app.logger.thread_locals)
+            _timeout = self.app.concurrency_timeout \
+                if pile.inflight < self.concurrency else None
+            if pile.waitfirst(_timeout):
+                break
+        else:
+            # ran out of nodes, see if any stragglers will finish
+            any(pile)

-        if sources:
-            sources.sort(key=lambda s: source_key(s[0]))
-            source, node = sources.pop()
-            for src, _junk in sources:
+        if self.sources:
+            self.sources.sort(key=lambda s: source_key(s[0]))
+            source, node = self.sources.pop()
+            for src, _junk in self.sources:
                 close_swift_conn(src)
             self.used_nodes.append(node)
             src_headers = dict(
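One subtlety in the new `for node in nodes:` loop above is Python's for/else: the `else: any(pile)` branch runs only if the loop exhausts the node iterator without a break, i.e. no request returned a usable source while nodes were still being fired; `any(pile)` then consumes results as the stragglers finish, stopping at the first truthy one. A tiny standalone reminder of the idiom:

# The else clause of a for loop runs only when the loop ends
# without hitting `break`.
for item in [1, 2, 3]:
    if item == 99:
        break
else:
    print('exhausted the iterator without a break')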
@@ -1613,7 +1634,7 @@ class Controller(object):
             self.app.logger.warning('Could not autocreate account %r' % path)

     def GETorHEAD_base(self, req, server_type, node_iter, partition, path,
-                       client_chunk_size=None):
+                       concurrency=1, client_chunk_size=None):
         """
         Base handler for HTTP GET or HEAD requests.

@@ -1622,6 +1643,7 @@ class Controller(object):
         :param node_iter: an iterator to obtain nodes from
         :param partition: partition
         :param path: path for the request
+        :param concurrency: number of requests to run concurrently
         :param client_chunk_size: chunk size for response body iterator
         :returns: swob.Response object
         """
@@ -1630,6 +1652,7 @@ class Controller(object):

         handler = GetOrHeadHandler(self.app, req, self.server_type, node_iter,
                                    partition, path, backend_headers,
+                                   concurrency,
                                    client_chunk_size=client_chunk_size)
         res = handler.get_working_response(req)
|
@ -93,10 +93,12 @@ class ContainerController(Controller):
|
||||
return HTTPNotFound(request=req)
|
||||
part = self.app.container_ring.get_part(
|
||||
self.account_name, self.container_name)
|
||||
concurrency = self.app.container_ring.replica_count \
|
||||
if self.app.concurrent_gets else 1
|
||||
node_iter = self.app.iter_nodes(self.app.container_ring, part)
|
||||
resp = self.GETorHEAD_base(
|
||||
req, _('Container'), node_iter, part,
|
||||
req.swift_entity_path)
|
||||
req.swift_entity_path, concurrency)
|
||||
if 'swift.authorize' in req.environ:
|
||||
req.acl = resp.headers.get('x-container-read')
|
||||
aresp = req.environ['swift.authorize'](req)
|
||||
|
swift/proxy/controllers/obj.py
@@ -879,9 +879,11 @@ class BaseObjectController(Controller):
 class ReplicatedObjectController(BaseObjectController):

     def _get_or_head_response(self, req, node_iter, partition, policy):
+        concurrency = self.app.get_object_ring(policy.idx).replica_count \
+            if self.app.concurrent_gets else 1
         resp = self.GETorHEAD_base(
             req, _('Object'), node_iter, partition,
-            req.swift_entity_path)
+            req.swift_entity_path, concurrency)
         return resp

     def _connect_put_node(self, nodes, part, path, headers,
@@ -2000,9 +2002,10 @@ class ECObjectController(BaseObjectController):
             # no fancy EC decoding here, just one plain old HEAD request to
             # one object server because all fragments hold all metadata
             # information about the object.
+            concurrency = policy.ec_ndata if self.app.concurrent_gets else 1
             resp = self.GETorHEAD_base(
                 req, _('Object'), node_iter, partition,
-                req.swift_entity_path)
+                req.swift_entity_path, concurrency)
         else:  # GET request
             orig_range = None
             range_specs = []
@@ -2011,6 +2014,12 @@ class ECObjectController(BaseObjectController):
                 range_specs = self._convert_range(req, policy)

             safe_iter = GreenthreadSafeIterator(node_iter)
+            # Sending the request concurrently to all nodes, and responding
+            # with the first response, isn't useful for EC, as the nodes all
+            # hold different fragments. Also, EC already has its own specific
+            # implementation of concurrent GETs to ec_ndata nodes, so we
+            # don't need to worry about plumbing a concurrency value through
+            # to ResumingGetter.
             with ContextPool(policy.ec_ndata) as pool:
                 pile = GreenAsyncPile(pool)
                 for _junk in range(policy.ec_ndata):
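To put the EC numbers in context: with, say, a 10+4 erasure-coded policy, ec_ndata is 10, so an EC HEAD with concurrent_gets enabled fans out to ten object servers at once, while EC GETs keep using the ContextPool/GreenAsyncPile fan-out shown above whether or not the option is set.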
swift/proxy/server.py
@@ -147,6 +147,10 @@ class Application(object):
         self.node_timings = {}
         self.timing_expiry = int(conf.get('timing_expiry', 300))
         self.sorting_method = conf.get('sorting_method', 'shuffle').lower()
+        self.concurrent_gets = \
+            config_true_value(conf.get('concurrent_gets'))
+        self.concurrency_timeout = float(conf.get('concurrency_timeout',
+                                                  self.conn_timeout))
         value = conf.get('request_node_count', '2 * replicas').lower().split()
         if len(value) == 1:
             rnc_value = int(value[0])
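A quick sketch of how these two values parse (config_true_value is Swift's existing helper in swift.common.utils; the conf dict below is invented for the example):

from swift.common.utils import config_true_value

conf = {'concurrent_gets': 'on', 'concurrency_timeout': '0.25'}
conn_timeout = 0.5  # the fallback default

# "on"/"true"/"1"/"yes" and friends parse as True ...
concurrent_gets = config_true_value(conf.get('concurrent_gets'))
# ... while a missing key yields None, which parses as False
assert config_true_value({}.get('concurrent_gets')) is False

concurrency_timeout = float(conf.get('concurrency_timeout', conn_timeout))
print(concurrent_gets, concurrency_timeout)  # True 0.25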
test/unit/common/test_utils.py
@@ -4988,6 +4988,37 @@ class TestGreenAsyncPile(unittest.TestCase):
         self.assertEqual(pile.waitall(0.5), [0.1, 0.1])
         self.assertEqual(completed[0], 2)

+    def test_waitfirst_only_returns_first(self):
+        def run_test(name):
+            eventlet.sleep(0)
+            completed.append(name)
+            return name
+
+        completed = []
+        pile = utils.GreenAsyncPile(3)
+        pile.spawn(run_test, 'first')
+        pile.spawn(run_test, 'second')
+        pile.spawn(run_test, 'third')
+        self.assertEqual(pile.waitfirst(0.5), completed[0])
+        # 3 still completed, but only the first was returned.
+        self.assertEqual(3, len(completed))
+
+    def test_wait_with_firstn(self):
+        def run_test(name):
+            eventlet.sleep(0)
+            completed.append(name)
+            return name
+
+        for first_n in [None] + list(range(6)):
+            completed = []
+            pile = utils.GreenAsyncPile(10)
+            for i in range(10):
+                pile.spawn(run_test, i)
+            actual = pile._wait(1, first_n)
+            expected_n = first_n if first_n else 10
+            self.assertEqual(completed[:expected_n], actual)
+            self.assertEqual(10, len(completed))
+
     def test_pending(self):
         pile = utils.GreenAsyncPile(3)
         self.assertEqual(0, pile._pending)
test/unit/proxy/controllers/test_base.py
@@ -28,7 +28,7 @@ from swift.common import exceptions
 from swift.common.utils import split_path
 from swift.common.header_key_dict import HeaderKeyDict
 from swift.common.http import is_success
-from swift.common.storage_policy import StoragePolicy
+from swift.common.storage_policy import StoragePolicy, POLICIES
 from test.unit import fake_http_connect, FakeRing, FakeMemcache
 from swift.proxy import server as proxy_server
 from swift.common.request_helpers import get_sys_meta_prefix
@@ -193,6 +193,52 @@ class TestFuncs(unittest.TestCase):
         self.assertTrue('swift.account/a' in resp.environ)
         self.assertEqual(resp.environ['swift.account/a']['status'], 200)

+        # Run the above tests again, but this time with concurrent_reads
+        # turned on
+        policy = next(iter(POLICIES))
+        concurrent_get_threads = policy.object_ring.replica_count
+        for concurrency_timeout in (0, 2):
+            self.app.concurrency_timeout = concurrency_timeout
+            req = Request.blank('/v1/a/c/o/with/slashes')
+            # NOTE: We are using slow_connect of fake_http_connect because a
+            # concurrency_timeout of 0 when mocking the connection is a
+            # little too fast for eventlet. Network i/o will make this fine,
+            # but the mocked connection seems too instantaneous.
+            with patch('swift.proxy.controllers.base.http_connect',
+                       fake_http_connect(200, slow_connect=True)):
+                resp = base.GETorHEAD_base(
+                    req, 'object', iter(nodes), 'part', '/a/c/o/with/slashes',
+                    concurrency=concurrent_get_threads)
+            self.assertTrue('swift.object/a/c/o/with/slashes' in resp.environ)
+            self.assertEqual(
+                resp.environ['swift.object/a/c/o/with/slashes']['status'],
+                200)
+            req = Request.blank('/v1/a/c/o')
+            with patch('swift.proxy.controllers.base.http_connect',
+                       fake_http_connect(200, slow_connect=True)):
+                resp = base.GETorHEAD_base(
+                    req, 'object', iter(nodes), 'part', '/a/c/o',
+                    concurrency=concurrent_get_threads)
+            self.assertTrue('swift.object/a/c/o' in resp.environ)
+            self.assertEqual(resp.environ['swift.object/a/c/o']['status'],
+                             200)
+            req = Request.blank('/v1/a/c')
+            with patch('swift.proxy.controllers.base.http_connect',
+                       fake_http_connect(200, slow_connect=True)):
+                resp = base.GETorHEAD_base(
+                    req, 'container', iter(nodes), 'part', '/a/c',
+                    concurrency=concurrent_get_threads)
+            self.assertTrue('swift.container/a/c' in resp.environ)
+            self.assertEqual(resp.environ['swift.container/a/c']['status'],
+                             200)
+
+            req = Request.blank('/v1/a')
+            with patch('swift.proxy.controllers.base.http_connect',
+                       fake_http_connect(200, slow_connect=True)):
+                resp = base.GETorHEAD_base(
+                    req, 'account', iter(nodes), 'part', '/a',
+                    concurrency=concurrent_get_threads)
+            self.assertTrue('swift.account/a' in resp.environ)
+            self.assertEqual(resp.environ['swift.account/a']['status'], 200)
+
     def test_get_info(self):
         app = FakeApp()
         # Do a non cached call to account
test/unit/proxy/controllers/test_obj.py
@@ -722,9 +722,15 @@ class TestReplicatedObjController(BaseObjectControllerMixin,

     def test_GET_error(self):
         req = swift.common.swob.Request.blank('/v1/a/c/o')
-        with set_http_connect(503, 200):
+        self.app.logger.txn_id = req.environ['swift.trans_id'] = 'my-txn-id'
+        stdout = BytesIO()
+        with set_http_connect(503, 200), \
+                mock.patch('sys.stdout', stdout):
             resp = req.get_response(self.app)
         self.assertEqual(resp.status_int, 200)
+        for line in stdout.getvalue().splitlines():
+            self.assertIn('my-txn-id', line)
+        self.assertIn('From Object Server', stdout.getvalue())

     def test_GET_handoff(self):
         req = swift.common.swob.Request.blank('/v1/a/c/o')
test/unit/proxy/test_server.py
@@ -928,6 +928,88 @@ class TestProxyServer(unittest.TestCase):
                       {'region': 2, 'zone': 1, 'ip': '127.0.0.1'}]
         self.assertEqual(exp_sorted, app_sorted)

+    def test_node_concurrency(self):
+        nodes = [{'region': 1, 'zone': 1, 'ip': '127.0.0.1', 'port': 6010,
+                  'device': 'sda'},
+                 {'region': 2, 'zone': 2, 'ip': '127.0.0.2', 'port': 6010,
+                  'device': 'sda'},
+                 {'region': 3, 'zone': 3, 'ip': '127.0.0.3', 'port': 6010,
+                  'device': 'sda'}]
+        timings = {'127.0.0.1': 2, '127.0.0.2': 1, '127.0.0.3': 0}
+        statuses = {'127.0.0.1': 200, '127.0.0.2': 200, '127.0.0.3': 200}
+        req = Request.blank('/v1/account', environ={'REQUEST_METHOD': 'GET'})
+
+        def fake_iter_nodes(*arg, **karg):
+            return iter(nodes)
+
+        class FakeConn(object):
+            def __init__(self, ip, *args, **kargs):
+                self.ip = ip
+                self.args = args
+                self.kargs = kargs
+
+            def getresponse(self):
+                def mygetheader(header, *args, **kargs):
+                    if header == "Content-Type":
+                        return ""
+                    else:
+                        return 1
+
+                resp = mock.Mock()
+                resp.read.side_effect = ['Response from %s' % self.ip, '']
+                resp.getheader = mygetheader
+                resp.getheaders.return_value = {}
+                resp.reason = ''
+                resp.status = statuses[self.ip]
+                sleep(timings[self.ip])
+                return resp
+
+        def myfake_http_connect_raw(ip, *args, **kargs):
+            conn = FakeConn(ip, *args, **kargs)
+            return conn
+
+        with mock.patch('swift.proxy.server.Application.iter_nodes',
+                        fake_iter_nodes):
+            with mock.patch('swift.common.bufferedhttp.http_connect_raw',
+                            myfake_http_connect_raw):
+                app_conf = {'concurrent_gets': 'on',
+                            'concurrency_timeout': 0}
+                baseapp = proxy_server.Application(app_conf,
+                                                   FakeMemcache(),
+                                                   container_ring=FakeRing(),
+                                                   account_ring=FakeRing())
+                self.assertEqual(baseapp.concurrent_gets, True)
+                self.assertEqual(baseapp.concurrency_timeout, 0)
+                baseapp.update_request(req)
+                resp = baseapp.handle_request(req)
+
+                # Should get 127.0.0.3 as this has a wait of 0 seconds.
+                self.assertEqual(resp.body, 'Response from 127.0.0.3')
+
+                # Let's try again, with 127.0.0.1 now having a timing of 0
+                # but returning an error.
+                timings['127.0.0.1'] = 0
+                statuses['127.0.0.1'] = 500
+
+                # Should still get 127.0.0.3, as this has a wait of 0
+                # seconds and a success.
+                baseapp.update_request(req)
+                resp = baseapp.handle_request(req)
+                self.assertEqual(resp.body, 'Response from 127.0.0.3')
+
+                # Now let's set the concurrency_timeout.
+                app_conf['concurrency_timeout'] = 2
+                baseapp = proxy_server.Application(app_conf,
+                                                   FakeMemcache(),
+                                                   container_ring=FakeRing(),
+                                                   account_ring=FakeRing())
+                self.assertEqual(baseapp.concurrency_timeout, 2)
+                baseapp.update_request(req)
+                resp = baseapp.handle_request(req)
+
+                # Should get 127.0.0.2 as this has a wait of 1 second.
+                self.assertEqual(resp.body, 'Response from 127.0.0.2')
+
     def test_info_defaults(self):
         app = proxy_server.Application({}, FakeMemcache(),
                                        account_ring=FakeRing(),