From f043aedec113816ca36c915ba0fcbe1fdc1a765e Mon Sep 17 00:00:00 2001 From: Clay Gerrard Date: Fri, 19 Jun 2020 17:02:28 -0500 Subject: [PATCH] Make all concurrent_get options per-policy Change-Id: Ib81f77cc343c3435d7e6258d4631563fa022d449 --- etc/proxy-server.conf-sample | 5 +- swift/proxy/controllers/account.py | 2 +- swift/proxy/controllers/base.py | 13 ++-- swift/proxy/controllers/container.py | 2 +- swift/proxy/controllers/obj.py | 18 ++--- swift/proxy/server.py | 49 ++++++++------ test/unit/proxy/controllers/test_obj.py | 34 ++++++---- test/unit/proxy/test_server.py | 87 +++++++++++++++++++++---- 8 files changed, 153 insertions(+), 57 deletions(-) diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index 487e443b22..13feee38fe 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -202,7 +202,7 @@ use = egg:swift#proxy # # By default on a GET/HEAD swift will connect to a minimum number storage nodes # in a minimum number of threads - for replicated data just a single request to -# a single node one at a time. When enabled concurrent_gets allows the proxy, +# a single node one at a time. When enabled concurrent_gets allows the proxy # to use up to replica count threads when waiting on a response. In # conjunction with the concurrency_timeout option this will allow swift to send # out GET/HEAD requests to the storage nodes concurrently and answer as soon as @@ -307,6 +307,9 @@ use = egg:swift#proxy # write_affinity = # write_affinity_node_count = # write_affinity_handoff_delete_count = +# concurrent_gets = off +# concurrency_timeout = 0.5 +# concurrent_ec_extra_requests = 0 [filter:tempauth] use = egg:swift#tempauth diff --git a/swift/proxy/controllers/account.py b/swift/proxy/controllers/account.py index 979e6bb1fa..7908d337d6 100644 --- a/swift/proxy/controllers/account.py +++ b/swift/proxy/controllers/account.py @@ -64,7 +64,7 @@ class AccountController(Controller): partition = self.app.account_ring.get_part(self.account_name) concurrency = self.app.account_ring.replica_count \ - if self.app.concurrent_gets else 1 + if self.app.get_policy_options(None).concurrent_gets else 1 node_iter = self.app.iter_nodes(self.app.account_ring, partition) params = req.params params['format'] = 'json' diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 60e5ff7bac..94f61434a4 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -856,8 +856,8 @@ class ByteCountEnforcer(object): class GetOrHeadHandler(object): def __init__(self, app, req, server_type, node_iter, partition, path, - backend_headers, concurrency=1, client_chunk_size=None, - newest=None): + backend_headers, concurrency=1, policy=None, + client_chunk_size=None, newest=None): self.app = app self.node_iter = node_iter self.server_type = server_type @@ -870,6 +870,7 @@ class GetOrHeadHandler(object): self.used_nodes = [] self.used_source_etag = '' self.concurrency = concurrency + self.policy = policy self.node = None self.latest_404_timestamp = Timestamp(0) @@ -1363,7 +1364,8 @@ class GetOrHeadHandler(object): for node in nodes: pile.spawn(self._make_node_request, node, node_timeout, self.app.logger.thread_locals) - _timeout = self.app.concurrency_timeout \ + _timeout = self.app.get_policy_options( + self.policy).concurrency_timeout \ if pile.inflight < self.concurrency else None if pile.waitfirst(_timeout): break @@ -1998,7 +2000,7 @@ class Controller(object): return False def GETorHEAD_base(self, req, server_type, node_iter, partition, path, - concurrency=1, client_chunk_size=None): + concurrency=1, policy=None, client_chunk_size=None): """ Base handler for HTTP GET or HEAD requests. @@ -2008,6 +2010,7 @@ class Controller(object): :param partition: partition :param path: path for the request :param concurrency: number of requests to run concurrently + :param policy: the policy instance, or None if Account or Container :param client_chunk_size: chunk size for response body iterator :returns: swob.Response object """ @@ -2016,7 +2019,7 @@ class Controller(object): handler = GetOrHeadHandler(self.app, req, self.server_type, node_iter, partition, path, backend_headers, - concurrency, + concurrency, policy=policy, client_chunk_size=client_chunk_size) res = handler.get_working_response(req) diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py index 2235db757c..feebcc3f74 100644 --- a/swift/proxy/controllers/container.py +++ b/swift/proxy/controllers/container.py @@ -105,7 +105,7 @@ class ContainerController(Controller): part = self.app.container_ring.get_part( self.account_name, self.container_name) concurrency = self.app.container_ring.replica_count \ - if self.app.concurrent_gets else 1 + if self.app.get_policy_options(None).concurrent_gets else 1 node_iter = self.app.iter_nodes(self.app.container_ring, part) params = req.params params['format'] = 'json' diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 6fa359d679..6d076ad87c 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -864,10 +864,10 @@ class ReplicatedObjectController(BaseObjectController): def _get_or_head_response(self, req, node_iter, partition, policy): concurrency = self.app.get_object_ring(policy.idx).replica_count \ - if self.app.concurrent_gets else 1 + if self.app.get_policy_options(policy).concurrent_gets else 1 resp = self.GETorHEAD_base( req, _('Object'), node_iter, partition, - req.swift_entity_path, concurrency) + req.swift_entity_path, concurrency, policy) return resp def _make_putter(self, node, part, req, headers): @@ -2847,9 +2847,10 @@ class ECObjectController(BaseObjectController): def feed_remaining_primaries(self, safe_iter, pile, req, partition, policy, buckets, feeder_q): + timeout = self.app.get_policy_options(policy).concurrency_timeout while True: try: - feeder_q.get(timeout=self.app.concurrency_timeout) + feeder_q.get(timeout=timeout) except Empty: if safe_iter.unsafe_iter.primaries_left: # this will run async, if it ends up taking the last @@ -2871,10 +2872,11 @@ class ECObjectController(BaseObjectController): # no fancy EC decoding here, just one plain old HEAD request to # one object server because all fragments hold all metadata # information about the object. - concurrency = policy.ec_ndata if self.app.concurrent_gets else 1 + concurrency = policy.ec_ndata \ + if self.app.get_policy_options(policy).concurrent_gets else 1 resp = self.GETorHEAD_base( req, _('Object'), node_iter, partition, - req.swift_entity_path, concurrency) + req.swift_entity_path, concurrency, policy) self._fix_response(req, resp) return resp @@ -2887,8 +2889,8 @@ class ECObjectController(BaseObjectController): safe_iter = GreenthreadSafeIterator(node_iter) - ec_request_count = policy.ec_ndata + \ - self.app.concurrent_ec_extra_requests + ec_request_count = policy.ec_ndata + self.app.get_policy_options( + policy).concurrent_ec_extra_requests with ContextPool(ec_request_count) as pool: pile = GreenAsyncPile(pool) buckets = ECGetResponseCollection(policy) @@ -2900,7 +2902,7 @@ class ECObjectController(BaseObjectController): policy, buckets.get_extra_headers) feeder_q = None - if self.app.concurrent_gets: + if self.app.get_policy_options(policy).concurrent_gets: feeder_q = Queue() pool.spawn(self.feed_remaining_primaries, safe_iter, pile, req, partition, policy, buckets, feeder_q) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 1a0928f692..ac30e684c0 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -101,7 +101,8 @@ class ProxyOverrideOptions(object): :param conf: the proxy-server config dict. :param override_conf: a dict of overriding configuration options. """ - def __init__(self, base_conf, override_conf): + def __init__(self, base_conf, override_conf, app): + def get(key, default): return override_conf.get(key, base_conf.get(key, default)) @@ -147,14 +148,25 @@ class ProxyOverrideOptions(object): get('write_affinity_handoff_delete_count', 'auto'), None ) + self.concurrent_gets = config_true_value(get('concurrent_gets', False)) + self.concurrency_timeout = float(get( + 'concurrency_timeout', app.conn_timeout)) + self.concurrent_ec_extra_requests = int(get( + 'concurrent_ec_extra_requests', 0)) + def __repr__(self): - return '%s({}, {%s})' % (self.__class__.__name__, ', '.join( - '%r: %r' % (k, getattr(self, k)) for k in ( - 'sorting_method', - 'read_affinity', - 'write_affinity', - 'write_affinity_node_count', - 'write_affinity_handoff_delete_count'))) + return '%s({}, {%s}, app)' % ( + self.__class__.__name__, ', '.join( + '%r: %r' % (k, getattr(self, k)) for k in ( + 'sorting_method', + 'read_affinity', + 'write_affinity', + 'write_affinity_node_count', + 'write_affinity_handoff_delete_count', + 'concurrent_gets', + 'concurrency_timeout', + 'concurrent_ec_extra_requests', + ))) def __eq__(self, other): if not isinstance(other, ProxyOverrideOptions): @@ -164,7 +176,11 @@ class ProxyOverrideOptions(object): 'read_affinity', 'write_affinity', 'write_affinity_node_count', - 'write_affinity_handoff_delete_count')) + 'write_affinity_handoff_delete_count', + 'concurrent_gets', + 'concurrency_timeout', + 'concurrent_ec_extra_requests', + )) class Application(object): @@ -178,10 +194,6 @@ class Application(object): self.logger = get_logger(conf, log_route='proxy-server') else: self.logger = logger - self._override_options = self._load_per_policy_config(conf) - self.sorts_by_timing = any(pc.sorting_method == 'timing' - for pc in self._override_options.values()) - self._error_limiting = {} swift_dir = conf.get('swift_dir', '/etc/swift') @@ -260,11 +272,6 @@ class Application(object): conf.get('strict_cors_mode', 't')) self.node_timings = {} self.timing_expiry = int(conf.get('timing_expiry', 300)) - self.concurrent_gets = config_true_value(conf.get('concurrent_gets')) - self.concurrency_timeout = float(conf.get('concurrency_timeout', - self.conn_timeout)) - self.concurrent_ec_extra_requests = int( - conf.get('concurrent_ec_extra_requests', 0)) value = conf.get('request_node_count', '2 * replicas').lower().split() if len(value) == 1: rnc_value = int(value[0]) @@ -311,6 +318,10 @@ class Application(object): 'swift.valid_api_versions', ]))) self.admin_key = conf.get('admin_key', None) + self._override_options = self._load_per_policy_config(conf) + self.sorts_by_timing = any(pc.sorting_method == 'timing' + for pc in self._override_options.values()) + register_swift_info( version=swift_version, strict_cors_mode=self.strict_cors_mode, @@ -324,7 +335,7 @@ class Application(object): def _make_policy_override(self, policy, conf, override_conf): label_for_policy = _label_for_policy(policy) try: - override = ProxyOverrideOptions(conf, override_conf) + override = ProxyOverrideOptions(conf, override_conf, self) self.logger.debug("Loaded override config for %s: %r" % (label_for_policy, override)) return override diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index fb84af7fd9..355eb56152 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -2438,8 +2438,11 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): raise Empty feeder_q.get.side_effect = feeder_timeout controller.feed_remaining_primaries( - safe_iter, pile, req, 0, self.policy, mock.MagicMock(), feeder_q) - expected_call = mock.call(timeout=self.app.concurrency_timeout) + safe_iter, pile, req, 0, self.policy, + mock.MagicMock(), feeder_q) + expected_timeout = self.app.get_policy_options( + self.policy).concurrency_timeout + expected_call = mock.call(timeout=expected_timeout) expected_num_calls = self.policy.ec_nparity + 1 self.assertEqual(feeder_q.get.call_args_list, [expected_call] * expected_num_calls) @@ -2475,8 +2478,10 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): req = swift.common.swob.Request.blank('/v1/a/c/o') - self.app.concurrent_gets = True - self.app.concurrency_timeout = 0.01 + policy_opts = self.app.get_policy_options(self.policy) + policy_opts.concurrent_gets = True + policy_opts.concurrency_timeout = 0.1 + status_codes = ([ FakeStatus(200, response_sleep=2.0), ] * self.policy.ec_nparity) + ([ @@ -2511,8 +2516,10 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): req = swift.common.swob.Request.blank('/v1/a/c/o') - self.app.concurrent_gets = True - self.app.concurrency_timeout = 0.01 + policy_opts = self.app.get_policy_options(self.policy) + policy_opts.concurrent_gets = True + policy_opts.concurrency_timeout = 0.1 + slow_count = self.policy.ec_nparity status_codes = ([ FakeStatus(200, response_sleep=2.0), @@ -2552,8 +2559,10 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): req = swift.common.swob.Request.blank('/v1/a/c/o') - self.app.concurrent_gets = True - self.app.concurrency_timeout = 0.01 + policy_opts = self.app.get_policy_options(self.policy) + policy_opts.concurrent_gets = True + policy_opts.concurrency_timeout = 0.1 + unused_resp = [ FakeStatus(200, response_sleep=2.0), FakeStatus(200, response_sleep=2.0), @@ -2604,8 +2613,10 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): req = swift.common.swob.Request.blank('/v1/a/c/o') - self.app.concurrent_gets = True - self.app.concurrency_timeout = 0.01 + policy_opts = self.app.get_policy_options(self.policy) + policy_opts.concurrent_gets = True + policy_opts.concurrency_timeout = 0.1 + status_codes = [ FakeStatus(200, response_sleep=2.0), ] + ([ @@ -2644,7 +2655,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): 'X-Backend-Durable-Timestamp': ts.internal, 'X-Backend-Data-Timestamp': ts.internal, }) - self.app.concurrent_ec_extra_requests = self.policy.ec_nparity - 1 + policy_opts = self.app.get_policy_options(self.policy) + policy_opts.concurrent_ec_extra_requests = self.policy.ec_nparity - 1 req = swift.common.swob.Request.blank('/v1/a/c/o') status_codes = [200] * (self.policy.object_ring.replicas - 1) with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies, diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 035d852671..4067cc636b 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -1061,8 +1061,10 @@ class TestProxyServer(unittest.TestCase): baseapp = proxy_server.Application(app_conf, container_ring=FakeRing(), account_ring=FakeRing()) - self.assertTrue(baseapp.concurrent_gets) - self.assertEqual(baseapp.concurrency_timeout, 0) + policy_opts = baseapp.get_policy_options(None) + self.assertTrue(policy_opts.concurrent_gets) + self.assertEqual(policy_opts.concurrency_timeout, 0) + baseapp.update_request(req) resp = baseapp.handle_request(req) @@ -1085,7 +1087,8 @@ class TestProxyServer(unittest.TestCase): baseapp = proxy_server.Application(app_conf, container_ring=FakeRing(), account_ring=FakeRing()) - self.assertEqual(baseapp.concurrency_timeout, 2) + policy_opts = baseapp.get_policy_options(None) + self.assertEqual(policy_opts.concurrency_timeout, 2) baseapp.update_request(req) resp = baseapp.handle_request(req) @@ -1335,7 +1338,8 @@ class TestProxyServerLoading(unittest.TestCase): self.assertEqual(app.conn_timeout, 0.7) self.assertEqual(app.client_timeout, 1.7) self.assertEqual(app.post_quorum_timeout, 0.3) - self.assertEqual(app.concurrency_timeout, 0.2) + self.assertEqual(app.get_policy_options( + None).concurrency_timeout, 0.2) def test_concurrent_ec_options(self): conf = { @@ -1347,9 +1351,11 @@ class TestProxyServerLoading(unittest.TestCase): policy.object_ring = FakeRing() app = proxy_server.Application(conf, debug_logger(), FakeRing(), FakeRing()) - self.assertEqual(app.concurrent_ec_extra_requests, 4) - self.assertEqual(app.concurrent_gets, True) - self.assertEqual(app.concurrency_timeout, 0.5) + for policy in POLICIES: + policy_opts = app.get_policy_options(policy) + self.assertEqual(policy_opts.concurrent_ec_extra_requests, 4) + self.assertEqual(policy_opts.concurrent_gets, True) + self.assertEqual(policy_opts.concurrency_timeout, 0.5) def test_load_policy_rings(self): for policy in POLICIES: @@ -1553,25 +1559,51 @@ class TestProxyServerConfigLoading(unittest.TestCase): "ProxyOverrideOptions({}, {'sorting_method': 'shuffle', " "'read_affinity': '', 'write_affinity': '', " "'write_affinity_node_count': '2 * replicas', " - "'write_affinity_handoff_delete_count': None})", + "'write_affinity_handoff_delete_count': None, " + "'concurrent_gets': False, 'concurrency_timeout': 0.5, " + "'concurrent_ec_extra_requests': 0" + "}, app)", repr(default_options)) self.assertEqual(default_options, eval(repr(default_options), { - 'ProxyOverrideOptions': default_options.__class__})) + 'ProxyOverrideOptions': default_options.__class__, 'app': app})) policy_0_options = app.get_policy_options(POLICIES[0]) self.assertEqual( "ProxyOverrideOptions({}, {'sorting_method': 'affinity', " "'read_affinity': 'r1=100', 'write_affinity': 'r1', " "'write_affinity_node_count': '1 * replicas', " - "'write_affinity_handoff_delete_count': 4})", + "'write_affinity_handoff_delete_count': 4, " + "'concurrent_gets': False, 'concurrency_timeout': 0.5, " + "'concurrent_ec_extra_requests': 0" + "}, app)", repr(policy_0_options)) self.assertEqual(policy_0_options, eval(repr(policy_0_options), { - 'ProxyOverrideOptions': policy_0_options.__class__})) + 'ProxyOverrideOptions': default_options.__class__, 'app': app})) self.assertNotEqual(default_options, policy_0_options) policy_1_options = app.get_policy_options(POLICIES[1]) self.assertIs(default_options, policy_1_options) + def test_per_policy_conf_equality(self): + conf_sections = """ + [app:proxy-server] + use = egg:swift#proxy + """ + app = self._write_conf_and_load_app(conf_sections) + self.assertIs(app.get_policy_options(None), + app.get_policy_options(POLICIES[0])) + + conf_sections = """ + [app:proxy-server] + use = egg:swift#proxy + + [proxy-server:policy:0] + concurrent_ec_extra_requests = 1 + """ + app = self._write_conf_and_load_app(conf_sections) + self.assertNotEqual(app.get_policy_options(None), + app.get_policy_options(POLICIES[0])) + def test_per_policy_conf_inherits_defaults(self): conf_sections = """ [app:proxy-server] @@ -1999,6 +2031,39 @@ class TestProxyServerConfigLoading(unittest.TestCase): do_test('uno') do_test('0.0') + def test_per_policy_conf_overrides_default_concurrency_settings(self): + conf_sections = """ + [app:proxy-server] + use = egg:swift#proxy + concurrent_gets = True + concurrency_timeout = 0.5 + + [proxy-server:policy:0] + concurrent_gets = off + concurrency_timeout = 0.6 + + [proxy-server:policy:1] + concurrent_gets = True + concurrency_timeout = 0.3 + concurrent_ec_extra_requests = 1 + """ + exp_options = { + None: { + "concurrent_gets": True, + "concurrency_timeout": 0.5, + "concurrent_ec_extra_requests": 0, + }, POLICIES[0]: { + "concurrent_gets": False, + "concurrency_timeout": 0.6, + "concurrent_ec_extra_requests": 0, + }, POLICIES[1]: { + "concurrent_gets": True, + "concurrency_timeout": 0.3, + "concurrent_ec_extra_requests": 1, + }} + app = self._write_conf_and_load_app(conf_sections) + self._check_policy_options(app, exp_options, {}) + class TestProxyServerConfigStringLoading(TestProxyServerConfigLoading): # The proxy may be loaded from a conf string rather than a conf file, for