diff --git a/doc/manpages/proxy-server.conf.5 b/doc/manpages/proxy-server.conf.5
index 942bcac92a..1fea62bc7a 100644
--- a/doc/manpages/proxy-server.conf.5
+++ b/doc/manpages/proxy-server.conf.5
@@ -1061,8 +1061,6 @@ recheck_account_existence before the 403s kick in.
 This is a comma separated list of account hashes that ignore the max_containers_per_account cap.
 .IP \fBdeny_host_headers\fR
 Comma separated list of Host headers to which the proxy will deny requests. The default is empty.
-.IP \fBput_queue_depth\fR
-Depth of the proxy put queue. The default is 10.
 .IP \fBsorting_method\fR
 Storage nodes can be chosen at random (shuffle - default), by using timing
 measurements (timing), or by using an explicit match (affinity).
diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample
index 027182d83a..030d0d93d8 100644
--- a/etc/proxy-server.conf-sample
+++ b/etc/proxy-server.conf-sample
@@ -188,9 +188,6 @@ use = egg:swift#proxy
 # Prefix used when automatically creating accounts.
 # auto_create_account_prefix = .
 #
-# Depth of the proxy put queue.
-# put_queue_depth = 10
-#
 # During GET and HEAD requests, storage nodes can be chosen at random
 # (shuffle), by using timing measurements (timing), or by using an explicit
 # region/zone match (affinity). Using timing measurements may allow for lower
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index 8508e2a0b4..e272287ab4 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -876,6 +876,8 @@ class ReplicatedObjectController(BaseObjectController):
                 node, part, req.swift_entity_path, headers,
                 conn_timeout=self.app.conn_timeout,
                 node_timeout=self.app.node_timeout,
+                write_timeout=self.app.node_timeout,
+                send_exception_handler=self.app.exception_occurred,
                 logger=self.app.logger,
                 need_multiphase=False)
         else:
@@ -884,6 +886,8 @@ class ReplicatedObjectController(BaseObjectController):
                 node, part, req.swift_entity_path, headers,
                 conn_timeout=self.app.conn_timeout,
                 node_timeout=self.app.node_timeout,
+                write_timeout=self.app.node_timeout,
+                send_exception_handler=self.app.exception_occurred,
                 logger=self.app.logger,
                 chunked=te.endswith(',chunked'))
         return putter
@@ -910,42 +914,35 @@ class ReplicatedObjectController(BaseObjectController):
 
         min_conns = quorum_size(len(nodes))
         try:
-            with ContextPool(len(nodes)) as pool:
-                for putter in putters:
-                    putter.spawn_sender_greenthread(
-                        pool, self.app.put_queue_depth, self.app.node_timeout,
-                        self.app.exception_occurred)
-                while True:
-                    with ChunkReadTimeout(self.app.client_timeout):
-                        try:
-                            chunk = next(data_source)
-                        except StopIteration:
-                            break
-                    bytes_transferred += len(chunk)
-                    if bytes_transferred > constraints.MAX_FILE_SIZE:
-                        raise HTTPRequestEntityTooLarge(request=req)
+            while True:
+                with ChunkReadTimeout(self.app.client_timeout):
+                    try:
+                        chunk = next(data_source)
+                    except StopIteration:
+                        break
+                bytes_transferred += len(chunk)
+                if bytes_transferred > constraints.MAX_FILE_SIZE:
+                    raise HTTPRequestEntityTooLarge(request=req)
 
-                    send_chunk(chunk)
+                send_chunk(chunk)
 
-                ml = req.message_length()
-                if ml and bytes_transferred < ml:
-                    req.client_disconnect = True
-                    self.app.logger.warning(
-                        _('Client disconnected without sending enough data'))
-                    self.app.logger.increment('client_disconnects')
-                    raise HTTPClientDisconnect(request=req)
+            ml = req.message_length()
+            if ml and bytes_transferred < ml:
+                req.client_disconnect = True
+                self.app.logger.warning(
+                    _('Client disconnected without sending enough data'))
+                self.app.logger.increment('client_disconnects')
+                raise HTTPClientDisconnect(request=req)
 
-                trail_md = self._get_footers(req)
-                for putter in putters:
-                    # send any footers set by middleware
-                    putter.end_of_object_data(footer_metadata=trail_md)
+            trail_md = self._get_footers(req)
+            for putter in putters:
+                # send any footers set by middleware
+                putter.end_of_object_data(footer_metadata=trail_md)
 
-                for putter in putters:
-                    putter.wait()
-                self._check_min_conn(
-                    req, [p for p in putters if not p.failed], min_conns,
-                    msg=_('Object PUT exceptions after last send, '
-                          '%(conns)s/%(nodes)s required connections'))
+            self._check_min_conn(
+                req, [p for p in putters if not p.failed], min_conns,
+                msg=_('Object PUT exceptions after last send, '
+                      '%(conns)s/%(nodes)s required connections'))
         except ChunkReadTimeout as err:
             self.app.logger.warning(
                 _('ERROR Client read timeout (%ss)'), err.seconds)
@@ -1576,10 +1573,14 @@ class Putter(object):
     :param resp: an HTTPResponse instance if connect() received final response
     :param path: the object path to send to the storage node
     :param connect_duration: time taken to initiate the HTTPConnection
+    :param write_timeout: time limit to write a chunk to the connection socket
+    :param send_exception_handler: callback called when an exception occurred
+                                   writing to the connection socket
     :param logger: a Logger instance
     :param chunked: boolean indicating if the request encoding is chunked
     """
-    def __init__(self, conn, node, resp, path, connect_duration, logger,
+    def __init__(self, conn, node, resp, path, connect_duration,
+                 write_timeout, send_exception_handler, logger,
                  chunked=False):
         # Note: you probably want to call Putter.connect() instead of
         # instantiating one of these directly.
@@ -1588,11 +1589,12 @@ class Putter(object):
         self.resp = self.final_resp = resp
         self.path = path
         self.connect_duration = connect_duration
+        self.write_timeout = write_timeout
+        self.send_exception_handler = send_exception_handler
         # for handoff nodes node_index is None
         self.node_index = node.get('index')
 
         self.failed = False
-        self.queue = None
         self.state = NO_DATA_SENT
         self.chunked = chunked
         self.logger = logger
@@ -1624,16 +1626,6 @@ class Putter(object):
             self.resp = self.conn.getresponse()
         return self.resp
 
-    def spawn_sender_greenthread(self, pool, queue_depth, write_timeout,
-                                 exception_handler):
-        """Call before sending the first chunk of request body"""
-        self.queue = Queue(queue_depth)
-        pool.spawn(self._send_file, write_timeout, exception_handler)
-
-    def wait(self):
-        if self.queue.unfinished_tasks:
-            self.queue.join()
-
     def _start_object_data(self):
         # Called immediately before the first chunk of object data is sent.
         # Subclasses may implement custom behaviour
@@ -1653,7 +1645,7 @@ class Putter(object):
             self._start_object_data()
             self.state = SENDING_DATA
 
-        self.queue.put(chunk)
+        self._send_chunk(chunk)
 
     def end_of_object_data(self, **kwargs):
         """
@@ -1662,33 +1654,23 @@ class Putter(object):
         if self.state == DATA_SENT:
             raise ValueError("called end_of_object_data twice")
 
-        self.queue.put(b'')
+        self._send_chunk(b'')
         self.state = DATA_SENT
 
-    def _send_file(self, write_timeout, exception_handler):
-        """
-        Method for a file PUT coroutine. Takes chunks from a queue and sends
-        them down a socket.
-
-        If something goes wrong, the "failed" attribute will be set to true
-        and the exception handler will be called.
- """ - while True: - chunk = self.queue.get() - if not self.failed: - if self.chunked: - to_send = b"%x\r\n%s\r\n" % (len(chunk), chunk) - else: - to_send = chunk - try: - with ChunkWriteTimeout(write_timeout): - self.conn.send(to_send) - except (Exception, ChunkWriteTimeout): - self.failed = True - exception_handler(self.node, _('Object'), - _('Trying to write to %s') % self.path) - - self.queue.task_done() + def _send_chunk(self, chunk): + if not self.failed: + if self.chunked: + to_send = b"%x\r\n%s\r\n" % (len(chunk), chunk) + else: + to_send = chunk + try: + with ChunkWriteTimeout(self.write_timeout): + self.conn.send(to_send) + except (Exception, ChunkWriteTimeout): + self.failed = True + self.send_exception_handler(self.node, _('Object'), + _('Trying to write to %s') + % self.path) def close(self): # release reference to response to ensure connection really does close, @@ -1725,7 +1707,8 @@ class Putter(object): @classmethod def connect(cls, node, part, path, headers, conn_timeout, node_timeout, - logger=None, chunked=False, **kwargs): + write_timeout, send_exception_handler, logger=None, + chunked=False, **kwargs): """ Connect to a backend node and send the headers. @@ -1738,7 +1721,8 @@ class Putter(object): """ conn, expect_resp, final_resp, connect_duration = cls._make_connection( node, part, path, headers, conn_timeout, node_timeout) - return cls(conn, node, final_resp, path, connect_duration, logger, + return cls(conn, node, final_resp, path, connect_duration, + write_timeout, send_exception_handler, logger, chunked=chunked) @@ -1753,9 +1737,11 @@ class MIMEPutter(Putter): An HTTP PUT request that supports streaming. """ def __init__(self, conn, node, resp, req, connect_duration, - logger, mime_boundary, multiphase=False): + write_timeout, send_exception_handler, logger, mime_boundary, + multiphase=False): super(MIMEPutter, self).__init__(conn, node, resp, req, - connect_duration, logger) + connect_duration, write_timeout, + send_exception_handler, logger) # Note: you probably want to call MimePutter.connect() instead of # instantiating one of these directly. self.chunked = True # MIME requests always send chunked body @@ -1766,8 +1752,8 @@ class MIMEPutter(Putter): # We're sending the object plus other stuff in the same request # body, all wrapped up in multipart MIME, so we'd better start # off the MIME document before sending any object data. - self.queue.put(b"--%s\r\nX-Document: object body\r\n\r\n" % - (self.mime_boundary,)) + self._send_chunk(b"--%s\r\nX-Document: object body\r\n\r\n" % + (self.mime_boundary,)) def end_of_object_data(self, footer_metadata=None): """ @@ -1800,9 +1786,9 @@ class MIMEPutter(Putter): footer_body, b"\r\n", tail_boundary, b"\r\n", ] - self.queue.put(b"".join(message_parts)) + self._send_chunk(b"".join(message_parts)) - self.queue.put(b'') + self._send_chunk(b'') self.state = DATA_SENT def send_commit_confirmation(self): @@ -1827,14 +1813,15 @@ class MIMEPutter(Putter): body, b"\r\n", tail_boundary, ] - self.queue.put(b"".join(message_parts)) + self._send_chunk(b"".join(message_parts)) - self.queue.put(b'') + self._send_chunk(b'') self.state = COMMIT_SENT @classmethod def connect(cls, node, part, req, headers, conn_timeout, node_timeout, - logger=None, need_multiphase=True, **kwargs): + write_timeout, send_exception_handler, logger=None, + need_multiphase=True, **kwargs): """ Connect to a backend node and send the headers. 
@@ -1886,7 +1873,8 @@ class MIMEPutter(Putter):
         if need_multiphase and not can_handle_multiphase_put:
             raise MultiphasePUTNotSupported()
 
-        return cls(conn, node, final_resp, req, connect_duration, logger,
+        return cls(conn, node, final_resp, req, connect_duration,
+                   write_timeout, send_exception_handler, logger,
                    mime_boundary, multiphase=need_multiphase)
 
 
@@ -2499,6 +2487,8 @@ class ECObjectController(BaseObjectController):
             node, part, req.swift_entity_path, headers,
             conn_timeout=self.app.conn_timeout,
             node_timeout=self.app.node_timeout,
+            write_timeout=self.app.node_timeout,
+            send_exception_handler=self.app.exception_occurred,
             logger=self.app.logger,
             need_multiphase=True)
 
@@ -2615,106 +2605,95 @@ class ECObjectController(BaseObjectController):
                       '%(conns)s/%(nodes)s required connections'))
 
         try:
-            with ContextPool(len(putters)) as pool:
+            # build our putter_to_frag_index dict to place handoffs in the
+            # same part nodes index as the primaries they are covering
+            putter_to_frag_index = self._determine_chunk_destinations(
+                putters, policy)
 
-                # build our putter_to_frag_index dict to place handoffs in the
-                # same part nodes index as the primaries they are covering
-                putter_to_frag_index = self._determine_chunk_destinations(
-                    putters, policy)
+            while True:
+                with ChunkReadTimeout(self.app.client_timeout):
+                    try:
+                        chunk = next(data_source)
+                    except StopIteration:
+                        break
+                bytes_transferred += len(chunk)
+                if bytes_transferred > constraints.MAX_FILE_SIZE:
+                    raise HTTPRequestEntityTooLarge(request=req)
 
-                for putter in putters:
-                    putter.spawn_sender_greenthread(
-                        pool, self.app.put_queue_depth, self.app.node_timeout,
-                        self.app.exception_occurred)
-                while True:
-                    with ChunkReadTimeout(self.app.client_timeout):
-                        try:
-                            chunk = next(data_source)
-                        except StopIteration:
-                            break
-                    bytes_transferred += len(chunk)
-                    if bytes_transferred > constraints.MAX_FILE_SIZE:
-                        raise HTTPRequestEntityTooLarge(request=req)
+                send_chunk(chunk)
 
-                    send_chunk(chunk)
+            ml = req.message_length()
+            if ml and bytes_transferred < ml:
+                req.client_disconnect = True
+                self.app.logger.warning(
+                    _('Client disconnected without sending enough data'))
+                self.app.logger.increment('client_disconnects')
+                raise HTTPClientDisconnect(request=req)
 
-                ml = req.message_length()
-                if ml and bytes_transferred < ml:
-                    req.client_disconnect = True
-                    self.app.logger.warning(
-                        _('Client disconnected without sending enough data'))
-                    self.app.logger.increment('client_disconnects')
-                    raise HTTPClientDisconnect(request=req)
+            send_chunk(b'')  # flush out any buffered data
 
-                send_chunk(b'')  # flush out any buffered data
+            computed_etag = (etag_hasher.hexdigest()
+                             if etag_hasher else None)
+            footers = self._get_footers(req)
+            received_etag = footers.get('etag', req.headers.get(
+                'etag', '')).strip('"')
+            if (computed_etag and received_etag and
+                    computed_etag != received_etag):
+                raise HTTPUnprocessableEntity(request=req)
 
-                computed_etag = (etag_hasher.hexdigest()
-                                 if etag_hasher else None)
-                footers = self._get_footers(req)
-                received_etag = footers.get('etag', req.headers.get(
-                    'etag', '')).strip('"')
-                if (computed_etag and received_etag and
-                        computed_etag != received_etag):
-                    raise HTTPUnprocessableEntity(request=req)
+            # Remove any EC reserved metadata names from footers
+            footers = {(k, v) for k, v in footers.items()
+                       if not k.lower().startswith('x-object-sysmeta-ec-')}
+            for putter in putters:
+                frag_index = putter_to_frag_index[putter]
+                # Update any footers set by middleware with EC footers
+                trail_md = trailing_metadata(
+                    policy, etag_hasher,
+                    bytes_transferred, frag_index)
+                trail_md.update(footers)
+                # Etag footer must always be hash of what we sent
+                trail_md['Etag'] = frag_hashers[frag_index].hexdigest()
+                putter.end_of_object_data(footer_metadata=trail_md)
 
-                # Remove any EC reserved metadata names from footers
-                footers = {(k, v) for k, v in footers.items()
-                           if not k.lower().startswith('x-object-sysmeta-ec-')}
-                for putter in putters:
-                    frag_index = putter_to_frag_index[putter]
-                    # Update any footers set by middleware with EC footers
-                    trail_md = trailing_metadata(
-                        policy, etag_hasher,
-                        bytes_transferred, frag_index)
-                    trail_md.update(footers)
-                    # Etag footer must always be hash of what we sent
-                    trail_md['Etag'] = frag_hashers[frag_index].hexdigest()
-                    putter.end_of_object_data(footer_metadata=trail_md)
+            # for storage policies requiring 2-phase commit (e.g.
+            # erasure coding), enforce >= 'quorum' number of
+            # 100-continue responses - this indicates successful
+            # object data and metadata commit and is a necessary
+            # condition to be met before starting 2nd PUT phase
+            final_phase = False
+            statuses, reasons, bodies, _junk = \
+                self._get_put_responses(
+                    req, putters, len(nodes), final_phase=final_phase,
+                    min_responses=min_conns)
+            if not self.have_quorum(
+                    statuses, len(nodes), quorum=min_conns):
+                self.app.logger.error(
+                    _('Not enough object servers ack\'ed (got %d)'),
+                    statuses.count(HTTP_CONTINUE))
+                raise HTTPServiceUnavailable(request=req)
 
-                for putter in putters:
-                    putter.wait()
-
-                # for storage policies requiring 2-phase commit (e.g.
-                # erasure coding), enforce >= 'quorum' number of
-                # 100-continue responses - this indicates successful
-                # object data and metadata commit and is a necessary
-                # condition to be met before starting 2nd PUT phase
-                final_phase = False
-                statuses, reasons, bodies, _junk = \
-                    self._get_put_responses(
-                        req, putters, len(nodes), final_phase=final_phase,
-                        min_responses=min_conns)
-                if not self.have_quorum(
-                        statuses, len(nodes), quorum=min_conns):
-                    self.app.logger.error(
-                        _('Not enough object servers ack\'ed (got %d)'),
-                        statuses.count(HTTP_CONTINUE))
+            elif not self._have_adequate_informational(
+                    statuses, min_conns):
+                resp = self.best_response(req, statuses, reasons, bodies,
+                                          _('Object PUT'),
+                                          quorum_size=min_conns)
+                if is_client_error(resp.status_int):
+                    # if 4xx occurred in this state it is absolutely
+                    # a bad conversation between proxy-server and
+                    # object-server (even if it's
+                    # HTTP_UNPROCESSABLE_ENTITY) so we should regard this
+                    # as HTTPServiceUnavailable.
                     raise HTTPServiceUnavailable(request=req)
+                else:
+                    # Other errors should use raw best_response
+                    raise resp
 
-                elif not self._have_adequate_informational(
-                        statuses, min_conns):
-                    resp = self.best_response(req, statuses, reasons, bodies,
-                                              _('Object PUT'),
-                                              quorum_size=min_conns)
-                    if is_client_error(resp.status_int):
-                        # if 4xx occurred in this state it is absolutely
-                        # a bad conversation between proxy-server and
-                        # object-server (even if it's
-                        # HTTP_UNPROCESSABLE_ENTITY) so we should regard this
-                        # as HTTPServiceUnavailable.
-                        raise HTTPServiceUnavailable(request=req)
-                    else:
-                        # Other errors should use raw best_response
-                        raise resp
-
-                # quorum achieved, start 2nd phase - send commit
-                # confirmation to participating object servers
-                # so they write a .durable state file indicating
-                # a successful PUT
-                for putter in putters:
-                    putter.send_commit_confirmation()
-                for putter in putters:
-                    putter.wait()
+            # quorum achieved, start 2nd phase - send commit
+            # confirmation to participating object servers
+            # so they write a .durable state file indicating
+            # a successful PUT
+            for putter in putters:
+                putter.send_commit_confirmation()
         except ChunkReadTimeout as err:
             self.app.logger.warning(
                 _('ERROR Client read timeout (%ss)'), err.seconds)
diff --git a/swift/proxy/server.py b/swift/proxy/server.py
index 42227b954a..ae7ee8ee26 100644
--- a/swift/proxy/server.py
+++ b/swift/proxy/server.py
@@ -191,7 +191,6 @@ class Application(object):
             conf.get('recoverable_node_timeout', self.node_timeout))
         self.conn_timeout = float(conf.get('conn_timeout', 0.5))
         self.client_timeout = int(conf.get('client_timeout', 60))
-        self.put_queue_depth = int(conf.get('put_queue_depth', 10))
         self.object_chunk_size = int(conf.get('object_chunk_size', 65536))
         self.client_chunk_size = int(conf.get('client_chunk_size', 65536))
         self.trans_id_suffix = conf.get('trans_id_suffix', '')
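The behavioural core of this change is that each chunk is now written to the backend connection inline, under a per-write timeout, instead of being handed to a Queue drained by a sender greenthread. A minimal standalone sketch of that synchronous write path follows; the names in it (SyncChunkWriter, the plain socket timeout standing in for swift's ChunkWriteTimeout, and print as the error callback) are illustrative assumptions and are not part of the patch.

import socket


class SyncChunkWriter(object):
    """Minimal stand-in for the synchronous write path in Putter._send_chunk."""

    def __init__(self, conn, write_timeout, send_exception_handler,
                 chunked=False):
        self.conn = conn
        self.write_timeout = write_timeout
        self.send_exception_handler = send_exception_handler
        self.chunked = chunked
        self.failed = False

    def send_chunk(self, chunk):
        # After the first failed write, later chunks are dropped; the caller
        # inspects .failed, as the proxy controllers do with Putter.failed.
        if self.failed:
            return
        if self.chunked:
            to_send = b"%x\r\n%s\r\n" % (len(chunk), chunk)
        else:
            to_send = chunk
        try:
            # plain socket timeout standing in for ChunkWriteTimeout
            self.conn.settimeout(self.write_timeout)
            self.conn.sendall(to_send)
        except (socket.timeout, OSError):
            self.failed = True
            self.send_exception_handler('error writing chunk')


if __name__ == '__main__':
    # Usage example over a local socket pair.
    left, right = socket.socketpair()
    writer = SyncChunkWriter(left, write_timeout=1.0,
                             send_exception_handler=print, chunked=True)
    writer.send_chunk(b'hello')
    print(right.recv(64))  # b'5\r\nhello\r\n'
    left.close()
    right.close()

Failure handling in the sketch mirrors the new Putter._send_chunk above: the first failed write marks the putter failed and later chunks are ignored, leaving it to _check_min_conn and _get_put_responses to decide whether enough backend connections remain healthy.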