From e290d47c435627082b3825b67e27ea3a3c99f01f Mon Sep 17 00:00:00 2001 From: indianwhocodes Date: Wed, 21 Jun 2023 08:36:54 -0700 Subject: [PATCH] unify duplicate code in replication, EC GET paths The GetOrHeadHandler class in base.py and the ECFragGetter class in obj.py have some methods that are very similar. The two existing classes do also have unique code so they will remain, but as subclasses of the common base class called GetterBase. Change-Id: I893c5fcb6b4f8a7dda351169f5f6b37375a34817 --- swift/proxy/controllers/base.py | 92 ++++++++++++++++++--------------- swift/proxy/controllers/obj.py | 92 +++------------------------------ 2 files changed, 56 insertions(+), 128 deletions(-) diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 3dbc5c0326..2c076888db 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -1014,55 +1014,21 @@ class ByteCountEnforcer(object): return chunk -class GetOrHeadHandler(object): - def __init__(self, app, req, server_type, node_iter, partition, path, - backend_headers, concurrency=1, policy=None, - client_chunk_size=None, newest=None, logger=None): +class GetterBase(object): + def __init__(self, app, req, node_iter, partition, policy, + path, backend_headers, logger=None): self.app = app + self.req = req self.node_iter = node_iter - self.server_type = server_type self.partition = partition + self.policy = policy self.path = path self.backend_headers = backend_headers - self.client_chunk_size = client_chunk_size self.logger = logger or app.logger - self.skip_bytes = 0 self.bytes_used_from_backend = 0 - self.used_nodes = [] - self.used_source_etag = '' - self.concurrency = concurrency - self.policy = policy self.node = None self.source = None self.source_parts_iter = None - self.latest_404_timestamp = Timestamp(0) - if self.server_type == 'Object': - self.node_timeout = self.app.recoverable_node_timeout - else: - self.node_timeout = self.app.node_timeout - policy_options = self.app.get_policy_options(self.policy) - self.rebalance_missing_suppression_count = min( - policy_options.rebalance_missing_suppression_count, - node_iter.num_primary_nodes - 1) - - # stuff from request - self.req_method = req.method - self.req_path = req.path - self.req_query_string = req.query_string - if newest is None: - self.newest = config_true_value(req.headers.get('x-newest', 'f')) - else: - self.newest = newest - - # populated when finding source - self.statuses = [] - self.reasons = [] - self.bodies = [] - self.source_headers = [] - self.sources = [] - - # populated from response headers - self.start_byte = self.end_byte = self.length = None def fast_forward(self, num_bytes): """ @@ -1136,6 +1102,46 @@ class GetOrHeadHandler(object): else: self.backend_headers.pop('Range') + +class GetOrHeadHandler(GetterBase): + def __init__(self, app, req, server_type, node_iter, partition, path, + backend_headers, concurrency=1, policy=None, + client_chunk_size=None, newest=None, logger=None): + super(GetOrHeadHandler, self).__init__( + app=app, req=req, node_iter=node_iter, + partition=partition, policy=policy, path=path, + backend_headers=backend_headers, logger=logger) + self.server_type = server_type + self.client_chunk_size = client_chunk_size + self.skip_bytes = 0 + self.used_nodes = [] + self.used_source_etag = '' + self.concurrency = concurrency + self.latest_404_timestamp = Timestamp(0) + if self.server_type == 'Object': + self.node_timeout = self.app.recoverable_node_timeout + else: + self.node_timeout = 
self.app.node_timeout + policy_options = self.app.get_policy_options(self.policy) + self.rebalance_missing_suppression_count = min( + policy_options.rebalance_missing_suppression_count, + node_iter.num_primary_nodes - 1) + + if newest is None: + self.newest = config_true_value(req.headers.get('x-newest', 'f')) + else: + self.newest = newest + + # populated when finding source + self.statuses = [] + self.reasons = [] + self.bodies = [] + self.source_headers = [] + self.sources = [] + + # populated from response headers + self.start_byte = self.end_byte = self.length = None + def learn_size_from_content_range(self, start, end, length): """ If client_chunk_size is set, makes sure we yield things starting on @@ -1403,9 +1409,9 @@ class GetOrHeadHandler(object): with ConnectionTimeout(self.app.conn_timeout): conn = http_connect( ip, port, node['device'], - self.partition, self.req_method, self.path, + self.partition, self.req.method, self.path, headers=req_headers, - query_string=self.req_query_string) + query_string=self.req.query_string) self.app.set_node_timing(node, time.time() - start_node_timing) with Timeout(node_timeout): @@ -1416,7 +1422,7 @@ class GetOrHeadHandler(object): self.app.exception_occurred( node, self.server_type, 'Trying to %(method)s %(path)s' % - {'method': self.req_method, 'path': self.req_path}) + {'method': self.req.method, 'path': self.req.path}) return False src_headers = dict( @@ -1486,7 +1492,7 @@ class GetOrHeadHandler(object): if ts > self.latest_404_timestamp: self.latest_404_timestamp = ts self.app.check_response(node, self.server_type, possible_source, - self.req_method, self.path, + self.req.method, self.path, self.bodies[-1]) return False diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index cdc354f701..d1221c7667 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -69,7 +69,7 @@ from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY, ECDriverError, PolicyError) from swift.proxy.controllers.base import Controller, delay_denial, \ cors_validation, update_headers, bytes_to_skip, close_swift_conn, \ - ByteCountEnforcer, record_cache_op_metrics, get_cache_key + ByteCountEnforcer, record_cache_op_metrics, get_cache_key, GetterBase from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \ HTTPServerError, HTTPServiceUnavailable, HTTPClientDisconnect, \ @@ -2490,97 +2490,19 @@ def is_good_source(status): return is_success(status) or is_redirection(status) -class ECFragGetter(object): +class ECFragGetter(GetterBase): def __init__(self, app, req, node_iter, partition, policy, path, backend_headers, header_provider, logger_thread_locals, logger): - self.app = app - self.req = req - self.node_iter = node_iter - self.partition = partition - self.path = path - self.backend_headers = backend_headers + super(ECFragGetter, self).__init__( + app=app, req=req, node_iter=node_iter, + partition=partition, policy=policy, path=path, + backend_headers=backend_headers, logger=logger) self.header_provider = header_provider - self.req_query_string = req.query_string self.fragment_size = policy.fragment_size self.skip_bytes = 0 - self.bytes_used_from_backend = 0 - self.source = self.node = None self.logger_thread_locals = logger_thread_locals - self.logger = logger - - def fast_forward(self, num_bytes): - """ - Will skip num_bytes into the current ranges. 
- - :params num_bytes: the number of bytes that have already been read on - this request. This will change the Range header - so that the next req will start where it left off. - - :raises HTTPRequestedRangeNotSatisfiable: if begin + num_bytes - > end of range + 1 - :raises RangeAlreadyComplete: if begin + num_bytes == end of range + 1 - """ - try: - req_range = Range(self.backend_headers.get('Range')) - except ValueError: - req_range = None - - if req_range: - begin, end = req_range.ranges[0] - if begin is None: - # this is a -50 range req (last 50 bytes of file) - end -= num_bytes - if end == 0: - # we sent out exactly the first range's worth of bytes, so - # we're done with it - raise RangeAlreadyComplete() - - if end < 0: - raise HTTPRequestedRangeNotSatisfiable() - - else: - begin += num_bytes - if end is not None and begin == end + 1: - # we sent out exactly the first range's worth of bytes, so - # we're done with it - raise RangeAlreadyComplete() - - if end is not None and begin > end: - raise HTTPRequestedRangeNotSatisfiable() - - req_range.ranges = [(begin, end)] + req_range.ranges[1:] - self.backend_headers['Range'] = str(req_range) - else: - self.backend_headers['Range'] = 'bytes=%d-' % num_bytes - - # Reset so if we need to do this more than once, we don't double-up - self.bytes_used_from_backend = 0 - - def pop_range(self): - """ - Remove the first byterange from our Range header. - - This is used after a byterange has been completely sent to the - client; this way, should we need to resume the download from another - object server, we do not re-fetch byteranges that the client already - has. - - If we have no Range header, this is a no-op. - """ - if 'Range' in self.backend_headers: - try: - req_range = Range(self.backend_headers['Range']) - except ValueError: - # there's a Range header, but it's garbage, so get rid of it - self.backend_headers.pop('Range') - return - begin, end = req_range.ranges.pop(0) - if len(req_range.ranges) > 0: - self.backend_headers['Range'] = str(req_range) - else: - self.backend_headers.pop('Range') def learn_size_from_content_range(self, start, end, length): """ @@ -2833,7 +2755,7 @@ class ECFragGetter(object): ip, port, node['device'], self.partition, 'GET', self.path, headers=req_headers, - query_string=self.req_query_string) + query_string=self.req.query_string) self.app.set_node_timing(node, time.time() - start_node_timing) with Timeout(node_timeout):
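
Aside (illustrative, not part of the patch): the main logic this change de-duplicates is the Range-header "fast forward" used when a GET has to resume on another node after some bytes were already sent to the client. Below is a minimal, standalone sketch of that arithmetic. The helper name fast_forward_range is hypothetical and the sketch deliberately avoids Swift imports so it runs on its own; the real GetterBase.fast_forward() parses the header with swift.common.swob.Range and signals the edge cases with RangeAlreadyComplete / HTTPRequestedRangeNotSatisfiable rather than a plain ValueError.

def fast_forward_range(range_header, num_bytes):
    """Return the backend Range header to use after num_bytes were read."""
    if not range_header:
        # No Range header yet: resume from the first unread byte.
        return 'bytes=%d-' % num_bytes
    spec = range_header.split('=', 1)[1]
    first, rest = spec.split(',', 1) if ',' in spec else (spec, '')
    begin_s, end_s = first.split('-', 1)
    if begin_s == '':
        # Suffix range, e.g. "bytes=-50" (last 50 bytes): shrink the suffix.
        end = int(end_s) - num_bytes
        if end <= 0:
            # The real method distinguishes "already complete" from
            # "unsatisfiable"; collapsed here for brevity.
            raise ValueError('range already complete or unsatisfiable')
        first = '-%d' % end
    else:
        # Normal range: advance the start past the bytes already sent.
        begin = int(begin_s) + num_bytes
        end = int(end_s) if end_s else None
        if end is not None and begin > end:
            raise ValueError('range already complete or unsatisfiable')
        first = '%d-%s' % (begin, end_s)
    return 'bytes=' + ','.join([first] + ([rest] if rest else []))

if __name__ == '__main__':
    # After 100 bytes of "bytes=0-499", the next node is asked for 100-499.
    assert fast_forward_range('bytes=0-499', 100) == 'bytes=100-499'
    # With no Range header, the retry starts at the first unread byte.
    assert fast_forward_range(None, 100) == 'bytes=100-'
    # A suffix range shrinks: 100 of the last 500 bytes read -> last 400.
    assert fast_forward_range('bytes=-500', 100) == 'bytes=-400'

pop_range() is the companion: once a whole byterange has been sent to the client it is dropped from the front of the backend Range header, so a resumed download does not re-fetch ranges the client already has. After this patch both helpers live once in GetterBase instead of being duplicated in GetOrHeadHandler and ECFragGetter.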