Merge "Make all concurrent_get options per-policy"
This commit is contained in:
commit
cca5e8b1de
@ -202,7 +202,7 @@ use = egg:swift#proxy
|
||||
#
|
||||
# By default on a GET/HEAD swift will connect to a minimum number storage nodes
|
||||
# in a minimum number of threads - for replicated data just a single request to
|
||||
# a single node one at a time. When enabled concurrent_gets allows the proxy,
|
||||
# a single node one at a time. When enabled concurrent_gets allows the proxy
|
||||
# to use up to replica count threads when waiting on a response. In
|
||||
# conjunction with the concurrency_timeout option this will allow swift to send
|
||||
# out GET/HEAD requests to the storage nodes concurrently and answer as soon as
|
||||
@ -307,6 +307,9 @@ use = egg:swift#proxy
|
||||
# write_affinity =
|
||||
# write_affinity_node_count =
|
||||
# write_affinity_handoff_delete_count =
|
||||
# concurrent_gets = off
|
||||
# concurrency_timeout = 0.5
|
||||
# concurrent_ec_extra_requests = 0
|
||||
|
||||
[filter:tempauth]
|
||||
use = egg:swift#tempauth
|
||||
|
@ -64,7 +64,7 @@ class AccountController(Controller):
|
||||
|
||||
partition = self.app.account_ring.get_part(self.account_name)
|
||||
concurrency = self.app.account_ring.replica_count \
|
||||
if self.app.concurrent_gets else 1
|
||||
if self.app.get_policy_options(None).concurrent_gets else 1
|
||||
node_iter = self.app.iter_nodes(self.app.account_ring, partition)
|
||||
params = req.params
|
||||
params['format'] = 'json'
|
||||
|
@ -856,8 +856,8 @@ class ByteCountEnforcer(object):
|
||||
|
||||
class GetOrHeadHandler(object):
|
||||
def __init__(self, app, req, server_type, node_iter, partition, path,
|
||||
backend_headers, concurrency=1, client_chunk_size=None,
|
||||
newest=None):
|
||||
backend_headers, concurrency=1, policy=None,
|
||||
client_chunk_size=None, newest=None):
|
||||
self.app = app
|
||||
self.node_iter = node_iter
|
||||
self.server_type = server_type
|
||||
@ -870,6 +870,7 @@ class GetOrHeadHandler(object):
|
||||
self.used_nodes = []
|
||||
self.used_source_etag = ''
|
||||
self.concurrency = concurrency
|
||||
self.policy = policy
|
||||
self.node = None
|
||||
self.latest_404_timestamp = Timestamp(0)
|
||||
|
||||
@ -1363,7 +1364,8 @@ class GetOrHeadHandler(object):
|
||||
for node in nodes:
|
||||
pile.spawn(self._make_node_request, node, node_timeout,
|
||||
self.app.logger.thread_locals)
|
||||
_timeout = self.app.concurrency_timeout \
|
||||
_timeout = self.app.get_policy_options(
|
||||
self.policy).concurrency_timeout \
|
||||
if pile.inflight < self.concurrency else None
|
||||
if pile.waitfirst(_timeout):
|
||||
break
|
||||
@ -1998,7 +2000,7 @@ class Controller(object):
|
||||
return False
|
||||
|
||||
def GETorHEAD_base(self, req, server_type, node_iter, partition, path,
|
||||
concurrency=1, client_chunk_size=None):
|
||||
concurrency=1, policy=None, client_chunk_size=None):
|
||||
"""
|
||||
Base handler for HTTP GET or HEAD requests.
|
||||
|
||||
@ -2008,6 +2010,7 @@ class Controller(object):
|
||||
:param partition: partition
|
||||
:param path: path for the request
|
||||
:param concurrency: number of requests to run concurrently
|
||||
:param policy: the policy instance, or None if Account or Container
|
||||
:param client_chunk_size: chunk size for response body iterator
|
||||
:returns: swob.Response object
|
||||
"""
|
||||
@ -2016,7 +2019,7 @@ class Controller(object):
|
||||
|
||||
handler = GetOrHeadHandler(self.app, req, self.server_type, node_iter,
|
||||
partition, path, backend_headers,
|
||||
concurrency,
|
||||
concurrency, policy=policy,
|
||||
client_chunk_size=client_chunk_size)
|
||||
res = handler.get_working_response(req)
|
||||
|
||||
|
@ -105,7 +105,7 @@ class ContainerController(Controller):
|
||||
part = self.app.container_ring.get_part(
|
||||
self.account_name, self.container_name)
|
||||
concurrency = self.app.container_ring.replica_count \
|
||||
if self.app.concurrent_gets else 1
|
||||
if self.app.get_policy_options(None).concurrent_gets else 1
|
||||
node_iter = self.app.iter_nodes(self.app.container_ring, part)
|
||||
params = req.params
|
||||
params['format'] = 'json'
|
||||
|
@ -864,10 +864,10 @@ class ReplicatedObjectController(BaseObjectController):
|
||||
|
||||
def _get_or_head_response(self, req, node_iter, partition, policy):
|
||||
concurrency = self.app.get_object_ring(policy.idx).replica_count \
|
||||
if self.app.concurrent_gets else 1
|
||||
if self.app.get_policy_options(policy).concurrent_gets else 1
|
||||
resp = self.GETorHEAD_base(
|
||||
req, _('Object'), node_iter, partition,
|
||||
req.swift_entity_path, concurrency)
|
||||
req.swift_entity_path, concurrency, policy)
|
||||
return resp
|
||||
|
||||
def _make_putter(self, node, part, req, headers):
|
||||
@ -2847,9 +2847,10 @@ class ECObjectController(BaseObjectController):
|
||||
|
||||
def feed_remaining_primaries(self, safe_iter, pile, req, partition, policy,
|
||||
buckets, feeder_q):
|
||||
timeout = self.app.get_policy_options(policy).concurrency_timeout
|
||||
while True:
|
||||
try:
|
||||
feeder_q.get(timeout=self.app.concurrency_timeout)
|
||||
feeder_q.get(timeout=timeout)
|
||||
except Empty:
|
||||
if safe_iter.unsafe_iter.primaries_left:
|
||||
# this will run async, if it ends up taking the last
|
||||
@ -2871,10 +2872,11 @@ class ECObjectController(BaseObjectController):
|
||||
# no fancy EC decoding here, just one plain old HEAD request to
|
||||
# one object server because all fragments hold all metadata
|
||||
# information about the object.
|
||||
concurrency = policy.ec_ndata if self.app.concurrent_gets else 1
|
||||
concurrency = policy.ec_ndata \
|
||||
if self.app.get_policy_options(policy).concurrent_gets else 1
|
||||
resp = self.GETorHEAD_base(
|
||||
req, _('Object'), node_iter, partition,
|
||||
req.swift_entity_path, concurrency)
|
||||
req.swift_entity_path, concurrency, policy)
|
||||
self._fix_response(req, resp)
|
||||
return resp
|
||||
|
||||
@ -2887,8 +2889,8 @@ class ECObjectController(BaseObjectController):
|
||||
|
||||
safe_iter = GreenthreadSafeIterator(node_iter)
|
||||
|
||||
ec_request_count = policy.ec_ndata + \
|
||||
self.app.concurrent_ec_extra_requests
|
||||
ec_request_count = policy.ec_ndata + self.app.get_policy_options(
|
||||
policy).concurrent_ec_extra_requests
|
||||
with ContextPool(ec_request_count) as pool:
|
||||
pile = GreenAsyncPile(pool)
|
||||
buckets = ECGetResponseCollection(policy)
|
||||
@ -2900,7 +2902,7 @@ class ECObjectController(BaseObjectController):
|
||||
policy, buckets.get_extra_headers)
|
||||
|
||||
feeder_q = None
|
||||
if self.app.concurrent_gets:
|
||||
if self.app.get_policy_options(policy).concurrent_gets:
|
||||
feeder_q = Queue()
|
||||
pool.spawn(self.feed_remaining_primaries, safe_iter, pile, req,
|
||||
partition, policy, buckets, feeder_q)
|
||||
|
@ -101,7 +101,8 @@ class ProxyOverrideOptions(object):
|
||||
:param conf: the proxy-server config dict.
|
||||
:param override_conf: a dict of overriding configuration options.
|
||||
"""
|
||||
def __init__(self, base_conf, override_conf):
|
||||
def __init__(self, base_conf, override_conf, app):
|
||||
|
||||
def get(key, default):
|
||||
return override_conf.get(key, base_conf.get(key, default))
|
||||
|
||||
@ -147,14 +148,25 @@ class ProxyOverrideOptions(object):
|
||||
get('write_affinity_handoff_delete_count', 'auto'), None
|
||||
)
|
||||
|
||||
self.concurrent_gets = config_true_value(get('concurrent_gets', False))
|
||||
self.concurrency_timeout = float(get(
|
||||
'concurrency_timeout', app.conn_timeout))
|
||||
self.concurrent_ec_extra_requests = int(get(
|
||||
'concurrent_ec_extra_requests', 0))
|
||||
|
||||
def __repr__(self):
|
||||
return '%s({}, {%s})' % (self.__class__.__name__, ', '.join(
|
||||
'%r: %r' % (k, getattr(self, k)) for k in (
|
||||
'sorting_method',
|
||||
'read_affinity',
|
||||
'write_affinity',
|
||||
'write_affinity_node_count',
|
||||
'write_affinity_handoff_delete_count')))
|
||||
return '%s({}, {%s}, app)' % (
|
||||
self.__class__.__name__, ', '.join(
|
||||
'%r: %r' % (k, getattr(self, k)) for k in (
|
||||
'sorting_method',
|
||||
'read_affinity',
|
||||
'write_affinity',
|
||||
'write_affinity_node_count',
|
||||
'write_affinity_handoff_delete_count',
|
||||
'concurrent_gets',
|
||||
'concurrency_timeout',
|
||||
'concurrent_ec_extra_requests',
|
||||
)))
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, ProxyOverrideOptions):
|
||||
@ -164,7 +176,11 @@ class ProxyOverrideOptions(object):
|
||||
'read_affinity',
|
||||
'write_affinity',
|
||||
'write_affinity_node_count',
|
||||
'write_affinity_handoff_delete_count'))
|
||||
'write_affinity_handoff_delete_count',
|
||||
'concurrent_gets',
|
||||
'concurrency_timeout',
|
||||
'concurrent_ec_extra_requests',
|
||||
))
|
||||
|
||||
|
||||
class Application(object):
|
||||
@ -178,10 +194,6 @@ class Application(object):
|
||||
self.logger = get_logger(conf, log_route='proxy-server')
|
||||
else:
|
||||
self.logger = logger
|
||||
self._override_options = self._load_per_policy_config(conf)
|
||||
self.sorts_by_timing = any(pc.sorting_method == 'timing'
|
||||
for pc in self._override_options.values())
|
||||
|
||||
self._error_limiting = {}
|
||||
|
||||
swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
@ -260,11 +272,6 @@ class Application(object):
|
||||
conf.get('strict_cors_mode', 't'))
|
||||
self.node_timings = {}
|
||||
self.timing_expiry = int(conf.get('timing_expiry', 300))
|
||||
self.concurrent_gets = config_true_value(conf.get('concurrent_gets'))
|
||||
self.concurrency_timeout = float(conf.get('concurrency_timeout',
|
||||
self.conn_timeout))
|
||||
self.concurrent_ec_extra_requests = int(
|
||||
conf.get('concurrent_ec_extra_requests', 0))
|
||||
value = conf.get('request_node_count', '2 * replicas').lower().split()
|
||||
if len(value) == 1:
|
||||
rnc_value = int(value[0])
|
||||
@ -311,6 +318,10 @@ class Application(object):
|
||||
'swift.valid_api_versions',
|
||||
])))
|
||||
self.admin_key = conf.get('admin_key', None)
|
||||
self._override_options = self._load_per_policy_config(conf)
|
||||
self.sorts_by_timing = any(pc.sorting_method == 'timing'
|
||||
for pc in self._override_options.values())
|
||||
|
||||
register_swift_info(
|
||||
version=swift_version,
|
||||
strict_cors_mode=self.strict_cors_mode,
|
||||
@ -324,7 +335,7 @@ class Application(object):
|
||||
def _make_policy_override(self, policy, conf, override_conf):
|
||||
label_for_policy = _label_for_policy(policy)
|
||||
try:
|
||||
override = ProxyOverrideOptions(conf, override_conf)
|
||||
override = ProxyOverrideOptions(conf, override_conf, self)
|
||||
self.logger.debug("Loaded override config for %s: %r" %
|
||||
(label_for_policy, override))
|
||||
return override
|
||||
|
@ -2438,8 +2438,11 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
raise Empty
|
||||
feeder_q.get.side_effect = feeder_timeout
|
||||
controller.feed_remaining_primaries(
|
||||
safe_iter, pile, req, 0, self.policy, mock.MagicMock(), feeder_q)
|
||||
expected_call = mock.call(timeout=self.app.concurrency_timeout)
|
||||
safe_iter, pile, req, 0, self.policy,
|
||||
mock.MagicMock(), feeder_q)
|
||||
expected_timeout = self.app.get_policy_options(
|
||||
self.policy).concurrency_timeout
|
||||
expected_call = mock.call(timeout=expected_timeout)
|
||||
expected_num_calls = self.policy.ec_nparity + 1
|
||||
self.assertEqual(feeder_q.get.call_args_list,
|
||||
[expected_call] * expected_num_calls)
|
||||
@ -2475,8 +2478,10 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||
|
||||
self.app.concurrent_gets = True
|
||||
self.app.concurrency_timeout = 0.01
|
||||
policy_opts = self.app.get_policy_options(self.policy)
|
||||
policy_opts.concurrent_gets = True
|
||||
policy_opts.concurrency_timeout = 0.1
|
||||
|
||||
status_codes = ([
|
||||
FakeStatus(200, response_sleep=2.0),
|
||||
] * self.policy.ec_nparity) + ([
|
||||
@ -2511,8 +2516,10 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||
|
||||
self.app.concurrent_gets = True
|
||||
self.app.concurrency_timeout = 0.01
|
||||
policy_opts = self.app.get_policy_options(self.policy)
|
||||
policy_opts.concurrent_gets = True
|
||||
policy_opts.concurrency_timeout = 0.1
|
||||
|
||||
slow_count = self.policy.ec_nparity
|
||||
status_codes = ([
|
||||
FakeStatus(200, response_sleep=2.0),
|
||||
@ -2552,8 +2559,10 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||
|
||||
self.app.concurrent_gets = True
|
||||
self.app.concurrency_timeout = 0.01
|
||||
policy_opts = self.app.get_policy_options(self.policy)
|
||||
policy_opts.concurrent_gets = True
|
||||
policy_opts.concurrency_timeout = 0.1
|
||||
|
||||
unused_resp = [
|
||||
FakeStatus(200, response_sleep=2.0),
|
||||
FakeStatus(200, response_sleep=2.0),
|
||||
@ -2604,8 +2613,10 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||
|
||||
self.app.concurrent_gets = True
|
||||
self.app.concurrency_timeout = 0.01
|
||||
policy_opts = self.app.get_policy_options(self.policy)
|
||||
policy_opts.concurrent_gets = True
|
||||
policy_opts.concurrency_timeout = 0.1
|
||||
|
||||
status_codes = [
|
||||
FakeStatus(200, response_sleep=2.0),
|
||||
] + ([
|
||||
@ -2644,7 +2655,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
'X-Backend-Durable-Timestamp': ts.internal,
|
||||
'X-Backend-Data-Timestamp': ts.internal,
|
||||
})
|
||||
self.app.concurrent_ec_extra_requests = self.policy.ec_nparity - 1
|
||||
policy_opts = self.app.get_policy_options(self.policy)
|
||||
policy_opts.concurrent_ec_extra_requests = self.policy.ec_nparity - 1
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||
status_codes = [200] * (self.policy.object_ring.replicas - 1)
|
||||
with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
|
||||
|
@ -1061,8 +1061,10 @@ class TestProxyServer(unittest.TestCase):
|
||||
baseapp = proxy_server.Application(app_conf,
|
||||
container_ring=FakeRing(),
|
||||
account_ring=FakeRing())
|
||||
self.assertTrue(baseapp.concurrent_gets)
|
||||
self.assertEqual(baseapp.concurrency_timeout, 0)
|
||||
policy_opts = baseapp.get_policy_options(None)
|
||||
self.assertTrue(policy_opts.concurrent_gets)
|
||||
self.assertEqual(policy_opts.concurrency_timeout, 0)
|
||||
|
||||
baseapp.update_request(req)
|
||||
resp = baseapp.handle_request(req)
|
||||
|
||||
@ -1085,7 +1087,8 @@ class TestProxyServer(unittest.TestCase):
|
||||
baseapp = proxy_server.Application(app_conf,
|
||||
container_ring=FakeRing(),
|
||||
account_ring=FakeRing())
|
||||
self.assertEqual(baseapp.concurrency_timeout, 2)
|
||||
policy_opts = baseapp.get_policy_options(None)
|
||||
self.assertEqual(policy_opts.concurrency_timeout, 2)
|
||||
baseapp.update_request(req)
|
||||
resp = baseapp.handle_request(req)
|
||||
|
||||
@ -1335,7 +1338,8 @@ class TestProxyServerLoading(unittest.TestCase):
|
||||
self.assertEqual(app.conn_timeout, 0.7)
|
||||
self.assertEqual(app.client_timeout, 1.7)
|
||||
self.assertEqual(app.post_quorum_timeout, 0.3)
|
||||
self.assertEqual(app.concurrency_timeout, 0.2)
|
||||
self.assertEqual(app.get_policy_options(
|
||||
None).concurrency_timeout, 0.2)
|
||||
|
||||
def test_concurrent_ec_options(self):
|
||||
conf = {
|
||||
@ -1347,9 +1351,11 @@ class TestProxyServerLoading(unittest.TestCase):
|
||||
policy.object_ring = FakeRing()
|
||||
app = proxy_server.Application(conf, debug_logger(),
|
||||
FakeRing(), FakeRing())
|
||||
self.assertEqual(app.concurrent_ec_extra_requests, 4)
|
||||
self.assertEqual(app.concurrent_gets, True)
|
||||
self.assertEqual(app.concurrency_timeout, 0.5)
|
||||
for policy in POLICIES:
|
||||
policy_opts = app.get_policy_options(policy)
|
||||
self.assertEqual(policy_opts.concurrent_ec_extra_requests, 4)
|
||||
self.assertEqual(policy_opts.concurrent_gets, True)
|
||||
self.assertEqual(policy_opts.concurrency_timeout, 0.5)
|
||||
|
||||
def test_load_policy_rings(self):
|
||||
for policy in POLICIES:
|
||||
@ -1553,25 +1559,51 @@ class TestProxyServerConfigLoading(unittest.TestCase):
|
||||
"ProxyOverrideOptions({}, {'sorting_method': 'shuffle', "
|
||||
"'read_affinity': '', 'write_affinity': '', "
|
||||
"'write_affinity_node_count': '2 * replicas', "
|
||||
"'write_affinity_handoff_delete_count': None})",
|
||||
"'write_affinity_handoff_delete_count': None, "
|
||||
"'concurrent_gets': False, 'concurrency_timeout': 0.5, "
|
||||
"'concurrent_ec_extra_requests': 0"
|
||||
"}, app)",
|
||||
repr(default_options))
|
||||
self.assertEqual(default_options, eval(repr(default_options), {
|
||||
'ProxyOverrideOptions': default_options.__class__}))
|
||||
'ProxyOverrideOptions': default_options.__class__, 'app': app}))
|
||||
|
||||
policy_0_options = app.get_policy_options(POLICIES[0])
|
||||
self.assertEqual(
|
||||
"ProxyOverrideOptions({}, {'sorting_method': 'affinity', "
|
||||
"'read_affinity': 'r1=100', 'write_affinity': 'r1', "
|
||||
"'write_affinity_node_count': '1 * replicas', "
|
||||
"'write_affinity_handoff_delete_count': 4})",
|
||||
"'write_affinity_handoff_delete_count': 4, "
|
||||
"'concurrent_gets': False, 'concurrency_timeout': 0.5, "
|
||||
"'concurrent_ec_extra_requests': 0"
|
||||
"}, app)",
|
||||
repr(policy_0_options))
|
||||
self.assertEqual(policy_0_options, eval(repr(policy_0_options), {
|
||||
'ProxyOverrideOptions': policy_0_options.__class__}))
|
||||
'ProxyOverrideOptions': default_options.__class__, 'app': app}))
|
||||
self.assertNotEqual(default_options, policy_0_options)
|
||||
|
||||
policy_1_options = app.get_policy_options(POLICIES[1])
|
||||
self.assertIs(default_options, policy_1_options)
|
||||
|
||||
def test_per_policy_conf_equality(self):
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
"""
|
||||
app = self._write_conf_and_load_app(conf_sections)
|
||||
self.assertIs(app.get_policy_options(None),
|
||||
app.get_policy_options(POLICIES[0]))
|
||||
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
|
||||
[proxy-server:policy:0]
|
||||
concurrent_ec_extra_requests = 1
|
||||
"""
|
||||
app = self._write_conf_and_load_app(conf_sections)
|
||||
self.assertNotEqual(app.get_policy_options(None),
|
||||
app.get_policy_options(POLICIES[0]))
|
||||
|
||||
def test_per_policy_conf_inherits_defaults(self):
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
@ -1999,6 +2031,39 @@ class TestProxyServerConfigLoading(unittest.TestCase):
|
||||
do_test('uno')
|
||||
do_test('0.0')
|
||||
|
||||
def test_per_policy_conf_overrides_default_concurrency_settings(self):
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
concurrent_gets = True
|
||||
concurrency_timeout = 0.5
|
||||
|
||||
[proxy-server:policy:0]
|
||||
concurrent_gets = off
|
||||
concurrency_timeout = 0.6
|
||||
|
||||
[proxy-server:policy:1]
|
||||
concurrent_gets = True
|
||||
concurrency_timeout = 0.3
|
||||
concurrent_ec_extra_requests = 1
|
||||
"""
|
||||
exp_options = {
|
||||
None: {
|
||||
"concurrent_gets": True,
|
||||
"concurrency_timeout": 0.5,
|
||||
"concurrent_ec_extra_requests": 0,
|
||||
}, POLICIES[0]: {
|
||||
"concurrent_gets": False,
|
||||
"concurrency_timeout": 0.6,
|
||||
"concurrent_ec_extra_requests": 0,
|
||||
}, POLICIES[1]: {
|
||||
"concurrent_gets": True,
|
||||
"concurrency_timeout": 0.3,
|
||||
"concurrent_ec_extra_requests": 1,
|
||||
}}
|
||||
app = self._write_conf_and_load_app(conf_sections)
|
||||
self._check_policy_options(app, exp_options, {})
|
||||
|
||||
|
||||
class TestProxyServerConfigStringLoading(TestProxyServerConfigLoading):
|
||||
# The proxy may be loaded from a conf string rather than a conf file, for
|
||||
|
Loading…
x
Reference in New Issue
Block a user