From 03b762e80a9b3d33ce13b8222f4cd2b549171c51 Mon Sep 17 00:00:00 2001 From: Janie Richling Date: Mon, 6 Jun 2016 17:19:48 +0100 Subject: [PATCH] Support for http footers - Replication and EC Before this patch, the proxy ObjectController supported sending metadata from the proxy server to object servers in "footers" that trail the body of HTTP PUT requests, but this support was for EC policies only. The encryption feature requires that footers are sent with both EC and replicated policy requests in order to persist encryption specific sysmeta, and to override container update headers with an encrypted Etag value. This patch: - Moves most of the functionality of ECPutter into a generic Putter class that is used for replicated object PUTs without footers. - Creates a MIMEPutter subclass to support multipart and multiphase behaviour required for any replicated object PUT with footers and all EC PUTs. - Modifies ReplicatedObjectController to use Putter objects in place of raw connection objects. - Refactors the _get_put_connections method and _put_connect_node methods so that more code is in the BaseObjectController class and therefore shared by [EC|Replicated]ObjectController classes. - Adds support to call a callback that middleware may have placed in the environ, so the callback can set footers. The x-object-sysmeta-ec- namespace is reserved and any footer values set by middleware in that namespace will not be forwarded to object servers. In addition this patch enables more than one value to be added to the X-Backend-Etag-Is-At header. This header is used to point to an (optional) alternative sysmeta header whose value should be used when evaluating conditional requests with If-[None-]Match headers. This is already used with EC policies when the ECObjectController has calculated the actual body Etag and sent it using a footer (X-Object-Sysmeta-EC-Etag). 
X-Backend-Etag-Is-At is in that case set to X-Object-Sysmeta-Ec-Etag so as to point to the actual body Etag value rather than the EC fragment Etag. Encryption will also need to add a pointer to an encrypted Etag value. However, the referenced sysmeta may not exist, for example if the object was created before encryption was enabled. The X-Backend-Etag-Is-At value is therefore changed to support a list of possible locations for alternate Etag values. Encryption will place its expected alternative Etag location on this list, as will the ECObjectController, and the object server will look for the first object metadata to match an entry on the list when matching conditional requests. That way, if the object was not encrypted then the object server will fall through to using the EC Etag value, or in the case of a replicated policy will fall through to using the normal Etag metadata. If your proxy has a third-party middleware that uses X-Backend-Etag-Is-At and it upgrades before an object server it's talking to then conditional requests may be broken. 
UpgradeImpact Co-Authored-By: Alistair Coles Co-Authored-By: Thiago da Silva Co-Authored-By: Samuel Merritt Co-Authored-By: Kota Tsuyuzaki Closes-Bug: #1594739 Change-Id: I12a6e41150f90de746ce03623032b83ed1987ee1 --- swift/common/request_helpers.py | 66 +- swift/common/swob.py | 4 +- swift/obj/server.py | 12 +- swift/proxy/controllers/obj.py | 945 ++++++++++++----------- test/unit/__init__.py | 15 +- test/unit/common/middleware/helpers.py | 25 +- test/unit/common/test_request_helpers.py | 74 +- test/unit/obj/test_server.py | 34 + test/unit/proxy/controllers/test_obj.py | 480 +++++++++++- test/unit/proxy/test_server.py | 4 +- 10 files changed, 1165 insertions(+), 494 deletions(-) diff --git a/swift/common/request_helpers.py b/swift/common/request_helpers.py index 07e34d8b46..71a32106af 100644 --- a/swift/common/request_helpers.py +++ b/swift/common/request_helpers.py @@ -27,6 +27,7 @@ import time import six from six.moves.urllib.parse import unquote +from swift.common.header_key_dict import HeaderKeyDict from swift import gettext_ as _ from swift.common.storage_policy import POLICIES @@ -38,7 +39,7 @@ from swift.common.swob import HTTPBadRequest, HTTPNotAcceptable, \ from swift.common.utils import split_path, validate_device_partition, \ close_if_possible, maybe_multipart_byteranges_to_document_iters, \ multipart_byteranges_to_document_iters, parse_content_type, \ - parse_content_range + parse_content_range, csv_append, list_from_csv from swift.common.wsgi import make_subrequest @@ -544,3 +545,66 @@ def http_response_to_document_iters(response, read_chunk_size=4096): params = dict(params_list) return multipart_byteranges_to_document_iters( response, params['boundary'], read_chunk_size) + + +def update_etag_is_at_header(req, name): + """ + Helper function to update an X-Backend-Etag-Is-At header whose value is a + list of alternative header names at which the actual object etag may be + found. 
This informs the object server where to look for the actual object + etag when processing conditional requests. + + Since the proxy server and/or middleware may set alternative etag header + names, the value of X-Backend-Etag-Is-At is a comma separated list which + the object server inspects in order until it finds an etag value. + + :param req: a swob Request + :param name: name of a sysmeta where alternative etag may be found + """ + if ',' in name: + # HTTP header names should not have commas but we'll check anyway + raise ValueError('Header name must not contain commas') + existing = req.headers.get("X-Backend-Etag-Is-At") + req.headers["X-Backend-Etag-Is-At"] = csv_append( + existing, name) + + +def resolve_etag_is_at_header(req, metadata): + """ + Helper function to resolve an alternative etag value that may be stored in + metadata under an alternate name. + + The value of the request's X-Backend-Etag-Is-At header (if it exists) is a + comma separated list of alternate names in the metadata at which an + alternate etag value may be found. This list is processed in order until an + alternate etag is found. + + The left most value in X-Backend-Etag-Is-At will have been set by the left + most middleware, or if no middleware, by ECObjectController, if an EC + policy is in use. The left most middleware is assumed to be the authority + on what the etag value of the object content is. + + The resolver will work from left to right in the list until it finds a + value that is a name in the given metadata. So the left most wins, IF it + exists in the metadata. + + By way of example, assume the encrypter middleware is installed. If an + object is *not* encrypted then the resolver will not find the encrypter + middleware's alternate etag sysmeta (X-Object-Sysmeta-Crypto-Etag) but will + then find the EC alternate etag (if EC policy). 
But if the object *is* + encrypted then X-Object-Sysmeta-Crypto-Etag is found and used, which is + correct because it should be preferred over X-Object-Sysmeta-Ec-Etag. + + :param req: a swob Request + :param metadata: a dict containing object metadata + :return: an alternate etag value if any is found, otherwise None + """ + alternate_etag = None + metadata = HeaderKeyDict(metadata) + if "X-Backend-Etag-Is-At" in req.headers: + names = list_from_csv(req.headers["X-Backend-Etag-Is-At"]) + for name in names: + if name in metadata: + alternate_etag = metadata[name] + break + return alternate_etag diff --git a/swift/common/swob.py b/swift/common/swob.py index 2ba5d5e6a4..aa11ec01f2 100644 --- a/swift/common/swob.py +++ b/swift/common/swob.py @@ -1140,8 +1140,8 @@ class Response(object): conditional requests. It's most effectively used with X-Backend-Etag-Is-At which would - define the additional Metadata key where the original ETag of the - clear-form client request data. + define the additional Metadata key(s) where the original ETag of the + clear-form client request data may be found. 
""" if self._conditional_etag is not None: return self._conditional_etag diff --git a/swift/obj/server.py b/swift/obj/server.py index c3fde72525..99083800eb 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -46,7 +46,7 @@ from swift.common.http import is_success from swift.common.base_storage_server import BaseStorageServer from swift.common.header_key_dict import HeaderKeyDict from swift.common.request_helpers import get_name_and_placement, \ - is_user_meta, is_sys_or_user_meta + is_user_meta, is_sys_or_user_meta, resolve_etag_is_at_header from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \ HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \ HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \ @@ -832,10 +832,7 @@ class ObjectController(BaseStorageServer): keep_cache = (self.keep_cache_private or ('X-Auth-Token' not in request.headers and 'X-Storage-Token' not in request.headers)) - conditional_etag = None - if 'X-Backend-Etag-Is-At' in request.headers: - conditional_etag = metadata.get( - request.headers['X-Backend-Etag-Is-At']) + conditional_etag = resolve_etag_is_at_header(request, metadata) response = Response( app_iter=disk_file.reader(keep_cache=keep_cache), request=request, conditional_response=True, @@ -889,10 +886,7 @@ class ObjectController(BaseStorageServer): headers['X-Backend-Timestamp'] = e.timestamp.internal return HTTPNotFound(request=request, headers=headers, conditional_response=True) - conditional_etag = None - if 'X-Backend-Etag-Is-At' in request.headers: - conditional_etag = metadata.get( - request.headers['X-Backend-Etag-Is-At']) + conditional_etag = resolve_etag_is_at_header(request, metadata) response = Response(request=request, conditional_response=True, conditional_etag=conditional_etag) response.headers['Content-Type'] = metadata.get( diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 6f8559063a..af6b9368d7 100644 --- a/swift/proxy/controllers/obj.py 
+++ b/swift/proxy/controllers/obj.py @@ -71,6 +71,8 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPServerError, HTTPServiceUnavailable, HTTPClientDisconnect, \ HTTPUnprocessableEntity, Response, HTTPException, \ HTTPRequestedRangeNotSatisfiable, Range, HTTPInternalServerError +from swift.common.request_helpers import update_etag_is_at_header, \ + resolve_etag_is_at_header def check_content_type(req): @@ -289,71 +291,111 @@ class BaseObjectController(Controller): return headers - def _await_response(self, conn, **kwargs): - with Timeout(self.app.node_timeout): - if conn.resp: - return conn.resp - else: - return conn.getresponse() - - def _get_conn_response(self, conn, req, logger_thread_locals, **kwargs): + def _get_conn_response(self, putter, path, logger_thread_locals, + final_phase, **kwargs): self.app.logger.thread_locals = logger_thread_locals try: - resp = self._await_response(conn, **kwargs) - return (conn, resp) + resp = putter.await_response( + self.app.node_timeout, not final_phase) except (Exception, Timeout): + resp = None + if final_phase: + status_type = 'final' + else: + status_type = 'commit' self.app.exception_occurred( - conn.node, _('Object'), - _('Trying to get final status of PUT to %s') % req.path) - return (None, None) + putter.node, _('Object'), + _('Trying to get %(status_type)s status of PUT to %(path)s') % + {'status_type': status_type, 'path': path}) + return (putter, resp) - def _get_put_responses(self, req, conns, nodes, **kwargs): + def _have_adequate_put_responses(self, statuses, num_nodes, min_responses): """ - Collect replicated object responses. + Test for sufficient PUT responses from backend nodes to proceed with + PUT handling. + + :param statuses: a list of response statuses. + :param num_nodes: number of backend nodes to which PUT requests may be + issued. + :param min_responses: (optional) minimum number of nodes required to + have responded with satisfactory status code. 
+ :return: True if sufficient backend responses have returned a + satisfactory status code. + """ + raise NotImplementedError + + def _get_put_responses(self, req, putters, num_nodes, final_phase=True, + min_responses=None): + """ + Collect object responses to a PUT request and determine if a + satisfactory number of nodes have returned success. Returns + lists of accumulated status codes, reasons, bodies and etags. + + :param req: the request + :param putters: list of putters for the request + :param num_nodes: number of nodes involved + :param final_phase: boolean indicating if this is the last phase + :param min_responses: minimum needed when not requiring quorum + :return: a tuple of lists of status codes, reasons, bodies and etags. + The list of bodies and etags is only populated for the final + phase of a PUT transaction. """ statuses = [] reasons = [] bodies = [] etags = set() - pile = GreenAsyncPile(len(conns)) - for conn in conns: - pile.spawn(self._get_conn_response, conn, - req, self.app.logger.thread_locals) + pile = GreenAsyncPile(len(putters)) + for putter in putters: + if putter.failed: + continue + pile.spawn(self._get_conn_response, putter, req, + self.app.logger.thread_locals, final_phase=final_phase) - def _handle_response(conn, response): + def _handle_response(putter, response): statuses.append(response.status) reasons.append(response.reason) - bodies.append(response.read()) + if final_phase: + body = response.read() + else: + body = '' + bodies.append(body) if response.status == HTTP_INSUFFICIENT_STORAGE: - self.app.error_limit(conn.node, + putter.failed = True + self.app.error_limit(putter.node, _('ERROR Insufficient Storage')) elif response.status >= HTTP_INTERNAL_SERVER_ERROR: + putter.failed = True self.app.error_occurred( - conn.node, + putter.node, _('ERROR %(status)d %(body)s From Object Server ' 're: %(path)s') % {'status': response.status, - 'body': bodies[-1][:1024], 'path': req.path}) + 'body': body[:1024], 'path': req.path}) elif 
is_success(response.status): etags.add(response.getheader('etag').strip('"')) - for (conn, response) in pile: + for (putter, response) in pile: if response: - _handle_response(conn, response) - if self.have_quorum(statuses, len(nodes)): + _handle_response(putter, response) + if self._have_adequate_put_responses( + statuses, num_nodes, min_responses): break + else: + putter.failed = True # give any pending requests *some* chance to finish finished_quickly = pile.waitall(self.app.post_quorum_timeout) - for (conn, response) in finished_quickly: + for (putter, response) in finished_quickly: if response: - _handle_response(conn, response) + _handle_response(putter, response) + + if final_phase: + while len(statuses) < num_nodes: + statuses.append(HTTP_SERVICE_UNAVAILABLE) + reasons.append('') + bodies.append('') - while len(statuses) < len(nodes): - statuses.append(HTTP_SERVICE_UNAVAILABLE) - reasons.append('') - bodies.append('') return statuses, reasons, bodies, etags def _config_obj_expiration(self, req): @@ -406,12 +448,17 @@ class BaseObjectController(Controller): req.headers['X-Timestamp'] = Timestamp(time.time()).internal return None - def _check_failure_put_connections(self, conns, req, nodes, min_conns): + def _check_failure_put_connections(self, putters, req, min_conns): """ Identify any failed connections and check minimum connection count. 
+ + :param putters: a list of Putter instances + :param req: request + :param min_conns: minimum number of putter connections required """ if req.if_none_match is not None and '*' in req.if_none_match: - statuses = [conn.resp.status for conn in conns if conn.resp] + statuses = [ + putter.resp.status for putter in putters if putter.resp] if HTTP_PRECONDITION_FAILED in statuses: # If we find any copy of the file, it shouldn't be uploaded self.app.logger.debug( @@ -419,14 +466,14 @@ class BaseObjectController(Controller): {'statuses': statuses}) raise HTTPPreconditionFailed(request=req) - if any(conn for conn in conns if conn.resp and - conn.resp.status == HTTP_CONFLICT): + if any(putter for putter in putters if putter.resp and + putter.resp.status == HTTP_CONFLICT): status_times = ['%(status)s (%(timestamp)s)' % { - 'status': conn.resp.status, + 'status': putter.resp.status, 'timestamp': HeaderKeyDict( - conn.resp.getheaders()).get( + putter.resp.getheaders()).get( 'X-Backend-Timestamp', 'unknown') - } for conn in conns if conn.resp] + } for putter in putters if putter.resp] self.app.logger.debug( _('Object PUT returning 202 for 409: ' '%(req_timestamp)s <= %(timestamps)r'), @@ -434,32 +481,61 @@ class BaseObjectController(Controller): 'timestamps': ', '.join(status_times)}) raise HTTPAccepted(request=req) - self._check_min_conn(req, conns, min_conns) + self._check_min_conn(req, putters, min_conns) - def _connect_put_node(self, nodes, part, path, headers, + def _make_putter(self, node, part, req, headers): + """ + Returns a putter object for handling streaming of object to object + servers. + + Subclasses must implement this method. 
+ + :param node: a storage node + :param part: ring partition number + :param req: a swob Request + :param headers: request headers + :return: an instance of a Putter + """ + raise NotImplementedError + + def _connect_put_node(self, nodes, part, req, headers, logger_thread_locals): """ Make connection to storage nodes - Connects to the first working node that it finds in nodes iter - and sends over the request headers. Returns an HTTPConnection - object to handle the rest of the streaming. - - This method must be implemented by each policy ObjectController. + Connects to the first working node that it finds in nodes iter and + sends over the request headers. Returns a Putter to handle the rest of + the streaming, or None if no working nodes were found. :param nodes: an iterator of the target storage nodes - :param partition: ring partition number - :param path: the object path to send to the storage node + :param part: ring partition number + :param req: a swob Request :param headers: request headers :param logger_thread_locals: The thread local values to be set on the self.app.logger to retain transaction logging information. 
- :return: HTTPConnection object + :return: an instance of a Putter """ - raise NotImplementedError() + self.app.logger.thread_locals = logger_thread_locals + for node in nodes: + try: + putter = self._make_putter(node, part, req, headers) + self.app.set_node_timing(node, putter.connect_duration) + return putter + except InsufficientStorage: + self.app.error_limit(node, _('ERROR Insufficient Storage')) + except PutterConnectError as e: + self.app.error_occurred( + node, _('ERROR %(status)d Expect: 100-continue ' + 'From Object Server') % { + 'status': e.status}) + except (Exception, Timeout): + self.app.exception_occurred( + node, _('Object'), + _('Expect: 100-continue on %s') % req.swift_entity_path) def _get_put_connections(self, req, nodes, partition, outgoing_headers, - policy, expect): + policy): """ Establish connections to storage nodes for PUT request """ @@ -469,25 +545,32 @@ class BaseObjectController(Controller): pile = GreenPile(len(nodes)) for nheaders in outgoing_headers: - if expect: + # RFC2616:8.2.3 disallows 100-continue without a body + if (req.content_length > 0) or req.is_chunked: nheaders['Expect'] = '100-continue' pile.spawn(self._connect_put_node, node_iter, partition, - req.swift_entity_path, nheaders, - self.app.logger.thread_locals) + req, nheaders, self.app.logger.thread_locals) - conns = [conn for conn in pile if conn] + putters = [putter for putter in pile if putter] - return conns + return putters - def _check_min_conn(self, req, conns, min_conns, msg=None): - msg = msg or 'Object PUT returning 503, %(conns)s/%(nodes)s ' \ - 'required connections' + def _check_min_conn(self, req, putters, min_conns, msg=None): + msg = msg or _('Object PUT returning 503, %(conns)s/%(nodes)s ' + 'required connections') - if len(conns) < min_conns: + if len(putters) < min_conns: self.app.logger.error((msg), - {'conns': len(conns), 'nodes': min_conns}) + {'conns': len(putters), 'nodes': min_conns}) raise HTTPServiceUnavailable(request=req) + def 
_get_footers(self, req): + footers = HeaderKeyDict() + footer_callback = req.environ.get( + 'swift.callback.update_footers', lambda _footer: None) + footer_callback(footers) + return footers + def _store_object(self, req, data_source, nodes, partition, outgoing_headers): """ @@ -659,115 +742,81 @@ class ReplicatedObjectController(BaseObjectController): req.swift_entity_path, concurrency) return resp - def _connect_put_node(self, nodes, part, path, headers, - logger_thread_locals): - """ - Make a connection for a replicated object. + def _make_putter(self, node, part, req, headers): + if req.environ.get('swift.callback.update_footers'): + putter = MIMEPutter.connect( + node, part, req.swift_entity_path, headers, + conn_timeout=self.app.conn_timeout, + node_timeout=self.app.node_timeout, + logger=self.app.logger, + need_multiphase=False) + else: + putter = Putter.connect( + node, part, req.swift_entity_path, headers, + conn_timeout=self.app.conn_timeout, + node_timeout=self.app.node_timeout, + logger=self.app.logger, + chunked=req.is_chunked) + return putter - Connects to the first working node that it finds in node_iter - and sends over the request headers. Returns an HTTPConnection - object to handle the rest of the streaming. 
- """ - self.app.logger.thread_locals = logger_thread_locals - for node in nodes: - try: - start_time = time.time() - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect( - node['ip'], node['port'], node['device'], part, 'PUT', - path, headers) - self.app.set_node_timing(node, time.time() - start_time) - with Timeout(self.app.node_timeout): - resp = conn.getexpect() - if resp.status == HTTP_CONTINUE: - conn.resp = None - conn.node = node - return conn - elif (is_success(resp.status) - or resp.status in (HTTP_CONFLICT, - HTTP_UNPROCESSABLE_ENTITY)): - conn.resp = resp - conn.node = node - return conn - elif headers['If-None-Match'] is not None and \ - resp.status == HTTP_PRECONDITION_FAILED: - conn.resp = resp - conn.node = node - return conn - elif resp.status == HTTP_INSUFFICIENT_STORAGE: - self.app.error_limit(node, _('ERROR Insufficient Storage')) - elif is_server_error(resp.status): - self.app.error_occurred( - node, - _('ERROR %(status)d Expect: 100-continue ' - 'From Object Server') % { - 'status': resp.status}) - except (Exception, Timeout): - self.app.exception_occurred( - node, _('Object'), - _('Expect: 100-continue on %s') % path) - - def _send_file(self, conn, path): - """Method for a file PUT coro""" - while True: - chunk = conn.queue.get() - if not conn.failed: - try: - with ChunkWriteTimeout(self.app.node_timeout): - conn.send(chunk) - except (Exception, ChunkWriteTimeout): - conn.failed = True - self.app.exception_occurred( - conn.node, _('Object'), - _('Trying to write to %s') % path) - conn.queue.task_done() - - def _transfer_data(self, req, data_source, conns, nodes): + def _transfer_data(self, req, data_source, putters, nodes): """ Transfer data for a replicated object. 
This method was added in the PUT method extraction change """ - min_conns = quorum_size(len(nodes)) bytes_transferred = 0 + + def send_chunk(chunk): + for putter in list(putters): + if not putter.failed: + putter.send_chunk(chunk) + else: + putter.close() + putters.remove(putter) + self._check_min_conn( + req, putters, min_conns, + msg=_('Object PUT exceptions during send, ' + '%(conns)s/%(nodes)s required connections')) + + min_conns = quorum_size(len(nodes)) try: with ContextPool(len(nodes)) as pool: - for conn in conns: - conn.failed = False - conn.queue = Queue(self.app.put_queue_depth) - pool.spawn(self._send_file, conn, req.path) + for putter in putters: + putter.spawn_sender_greenthread( + pool, self.app.put_queue_depth, self.app.node_timeout, + self.app.exception_occurred) while True: with ChunkReadTimeout(self.app.client_timeout): try: chunk = next(data_source) except StopIteration: - if req.is_chunked: - for conn in conns: - conn.queue.put('0\r\n\r\n') break bytes_transferred += len(chunk) if bytes_transferred > constraints.MAX_FILE_SIZE: raise HTTPRequestEntityTooLarge(request=req) - for conn in list(conns): - if not conn.failed: - conn.queue.put( - '%x\r\n%s\r\n' % (len(chunk), chunk) - if req.is_chunked else chunk) - else: - conn.close() - conns.remove(conn) - self._check_min_conn( - req, conns, min_conns, - msg='Object PUT exceptions during' - ' send, %(conns)s/%(nodes)s required connections') - for conn in conns: - if conn.queue.unfinished_tasks: - conn.queue.join() - conns = [conn for conn in conns if not conn.failed] - self._check_min_conn( - req, conns, min_conns, - msg='Object PUT exceptions after last send, ' - '%(conns)s/%(nodes)s required connections') + + send_chunk(chunk) + + if req.content_length and ( + bytes_transferred < req.content_length): + req.client_disconnect = True + self.app.logger.warning( + _('Client disconnected without sending enough data')) + self.app.logger.increment('client_disconnects') + raise 
HTTPClientDisconnect(request=req) + + trail_md = self._get_footers(req) + for putter in putters: + # send any footers set by middleware + putter.end_of_object_data(footer_metadata=trail_md) + + for putter in putters: + putter.wait() + self._check_min_conn( + req, [p for p in putters if not p.failed], min_conns, + msg=_('Object PUT exceptions after last send, ' + '%(conns)s/%(nodes)s required connections')) except ChunkReadTimeout as err: self.app.logger.warning( _('ERROR Client read timeout (%ss)'), err.seconds) @@ -790,12 +839,9 @@ class ReplicatedObjectController(BaseObjectController): _('ERROR Exception transferring data to object servers %s'), {'path': req.path}) raise HTTPInternalServerError(request=req) - if req.content_length and bytes_transferred < req.content_length: - req.client_disconnect = True - self.app.logger.warning( - _('Client disconnected without sending enough data')) - self.app.logger.increment('client_disconnects') - raise HTTPClientDisconnect(request=req) + + def _have_adequate_put_responses(self, statuses, num_nodes, min_responses): + return self.have_quorum(statuses, num_nodes) def _store_object(self, req, data_source, nodes, partition, outgoing_headers): @@ -812,30 +858,25 @@ class ReplicatedObjectController(BaseObjectController): if not nodes: return HTTPNotFound() - # RFC2616:8.2.3 disallows 100-continue without a body - if (req.content_length > 0) or req.is_chunked: - expect = True - else: - expect = False - conns = self._get_put_connections(req, nodes, partition, - outgoing_headers, policy, expect) + putters = self._get_put_connections( + req, nodes, partition, outgoing_headers, policy) min_conns = quorum_size(len(nodes)) try: # check that a minimum number of connections were established and # meet all the correct conditions set in the request - self._check_failure_put_connections(conns, req, nodes, min_conns) + self._check_failure_put_connections(putters, req, min_conns) # transfer data - self._transfer_data(req, data_source, conns, 
nodes) + self._transfer_data(req, data_source, putters, nodes) # get responses - statuses, reasons, bodies, etags = self._get_put_responses( - req, conns, nodes) + statuses, reasons, bodies, etags = \ + self._get_put_responses(req, putters, len(nodes)) except HTTPException as resp: return resp finally: - for conn in conns: - conn.close() + for putter in putters: + putter.close() if len(etags) > 1: self.app.logger.error( @@ -1380,33 +1421,38 @@ DATA_ACKED = 4 COMMIT_SENT = 5 -class ECPutter(object): +class Putter(object): """ - This is here mostly to wrap up the fact that all EC PUTs are - chunked because of the mime boundary footer trick and the first - half of the two-phase PUT conversation handling. + Putter for backend PUT requests. - An HTTP PUT request that supports streaming. + Encapsulates all the actions required to establish a connection with a + storage node and stream data to that node. - Probably deserves more docs than this, but meh. + :param conn: an HTTPConnection instance + :param node: dict describing storage node + :param resp: an HTTPResponse instance if connect() received final response + :param path: the object path to send to the storage node + :param connect_duration: time taken to initiate the HTTPConnection + :param logger: a Logger instance + :param chunked: boolean indicating if the request encoding is chunked """ - def __init__(self, conn, node, resp, path, connect_duration, - mime_boundary): + def __init__(self, conn, node, resp, path, connect_duration, logger, + chunked=False): # Note: you probably want to call Putter.connect() instead of # instantiating one of these directly. 
self.conn = conn self.node = node - self.resp = resp + self.resp = self.final_resp = resp self.path = path self.connect_duration = connect_duration # for handoff nodes node_index is None self.node_index = node.get('index') - self.mime_boundary = mime_boundary - self.chunk_hasher = md5() self.failed = False self.queue = None self.state = NO_DATA_SENT + self.chunked = chunked + self.logger = logger def await_response(self, timeout, informational=False): """ @@ -1419,16 +1465,20 @@ a 100 Continue response and sent up the PUT request's body, then we'll actually read the 2xx-5xx response off the network here. + :param timeout: time to wait for a response + :param informational: if True then try to get a 100-continue response, + otherwise try to get a final response. :returns: HTTPResponse :raises: Timeout if the response took too long """ - conn = self.conn with Timeout(timeout): - if not conn.resp: + # don't do this update of self.resp if the Expect response during + # connect() was actually a final response + if not self.final_resp: if informational: - self.resp = conn.getexpect() + self.resp = self.conn.getexpect() else: - self.resp = conn.getresponse() + self.resp = self.conn.getresponse() return self.resp def spawn_sender_greenthread(self, pool, queue_depth, write_timeout, @@ -1441,9 +1491,10 @@ if self.queue.unfinished_tasks: self.queue.join() - def _start_mime_doc_object_body(self): - self.queue.put("--%s\r\nX-Document: object body\r\n\r\n" % - (self.mime_boundary,)) + def _start_object_data(self): + # Called immediately before the first chunk of object data is sent. 
+ # Subclasses may implement custom behaviour + pass def send_chunk(self, chunk): if not chunk: @@ -1455,30 +1506,148 @@ class ECPutter(object): elif self.state == DATA_SENT: raise ValueError("called send_chunk after end_of_object_data") - if self.state == NO_DATA_SENT and self.mime_boundary: - # We're sending the object plus other stuff in the same request - # body, all wrapped up in multipart MIME, so we'd better start - # off the MIME document before sending any object data. - self._start_mime_doc_object_body() + if self.state == NO_DATA_SENT: + self._start_object_data() self.state = SENDING_DATA self.queue.put(chunk) - def end_of_object_data(self, footer_metadata): + def end_of_object_data(self, **kwargs): + """ + Call when there is no more data to send. + """ + if self.state == DATA_SENT: + raise ValueError("called end_of_object_data twice") + + self.queue.put('') + self.state = DATA_SENT + + def _send_file(self, write_timeout, exception_handler): + """ + Method for a file PUT coroutine. Takes chunks from a queue and sends + them down a socket. + + If something goes wrong, the "failed" attribute will be set to true + and the exception handler will be called. 
+ """ + while True: + chunk = self.queue.get() + if not self.failed: + if self.chunked: + to_send = "%x\r\n%s\r\n" % (len(chunk), chunk) + else: + to_send = chunk + try: + with ChunkWriteTimeout(write_timeout): + self.conn.send(to_send) + except (Exception, ChunkWriteTimeout): + self.failed = True + exception_handler(self.node, _('Object'), + _('Trying to write to %s') % self.path) + + self.queue.task_done() + + def close(self): + # release reference to response to ensure connection really does close, + # see bug https://bugs.launchpad.net/swift/+bug/1594739 + self.resp = self.final_resp = None + self.conn.close() + + @classmethod + def _make_connection(cls, node, part, path, headers, conn_timeout, + node_timeout): + start_time = time.time() + with ConnectionTimeout(conn_timeout): + conn = http_connect(node['ip'], node['port'], node['device'], + part, 'PUT', path, headers) + connect_duration = time.time() - start_time + + with ResponseTimeout(node_timeout): + resp = conn.getexpect() + + if resp.status == HTTP_INSUFFICIENT_STORAGE: + raise InsufficientStorage + + if is_server_error(resp.status): + raise PutterConnectError(resp.status) + + final_resp = None + if (is_success(resp.status) or + resp.status in (HTTP_CONFLICT, HTTP_UNPROCESSABLE_ENTITY) or + (headers.get('If-None-Match', None) is not None and + resp.status == HTTP_PRECONDITION_FAILED)): + final_resp = resp + + return conn, resp, final_resp, connect_duration + + @classmethod + def connect(cls, node, part, path, headers, conn_timeout, node_timeout, + logger=None, chunked=False, **kwargs): + """ + Connect to a backend node and send the headers. 
+ + :returns: Putter instance + + :raises: ConnectionTimeout if initial connection timed out + :raises: ResponseTimeout if header retrieval timed out + :raises: InsufficientStorage on 507 response from node + :raises: PutterConnectError on non-507 server error response from node + """ + conn, expect_resp, final_resp, connect_duration = cls._make_connection( + node, part, path, headers, conn_timeout, node_timeout) + return cls(conn, node, final_resp, path, connect_duration, logger, + chunked=chunked) + + +class MIMEPutter(Putter): + """ + Putter for backend PUT requests that use MIME. + + This is here mostly to wrap up the fact that all multipart PUTs are + chunked because of the mime boundary footer trick and the first + half of the two-phase PUT conversation handling. + + An HTTP PUT request that supports streaming. + """ + def __init__(self, conn, node, resp, req, connect_duration, + logger, mime_boundary, multiphase=False): + super(MIMEPutter, self).__init__(conn, node, resp, req, + connect_duration, logger) + # Note: you probably want to call MIMEPutter.connect() instead of + # instantiating one of these directly. + self.chunked = True # MIME requests always send chunked body + self.mime_boundary = mime_boundary + self.multiphase = multiphase + + def _start_object_data(self): + # We're sending the object plus other stuff in the same request + # body, all wrapped up in multipart MIME, so we'd better start + # off the MIME document before sending any object data. + self.queue.put("--%s\r\nX-Document: object body\r\n\r\n" % + (self.mime_boundary,)) + + def end_of_object_data(self, footer_metadata=None): """ Call when there is no more data to send. + Overrides superclass implementation to send any footer metadata + after object data. + :param footer_metadata: dictionary of metadata items + to be sent as footers. 
""" if self.state == DATA_SENT: raise ValueError("called end_of_object_data twice") elif self.state == NO_DATA_SENT and self.mime_boundary: - self._start_mime_doc_object_body() + self._start_object_data() footer_body = json.dumps(footer_metadata) footer_md5 = md5(footer_body).hexdigest() tail_boundary = ("--%s" % (self.mime_boundary,)) + if not self.multiphase: + # this will be the last part sent + tail_boundary = tail_boundary + "--" message_parts = [ ("\r\n--%s\r\n" % self.mime_boundary), @@ -1498,6 +1667,9 @@ class ECPutter(object): Call when there are > quorum 2XX responses received. Send commit confirmations to all object nodes to finalize the PUT. """ + if not self.multiphase: + raise ValueError( + "called send_commit_confirmation but multiphase is False") if self.state == COMMIT_SENT: raise ValueError("called send_commit_confirmation twice") @@ -1517,79 +1689,49 @@ class ECPutter(object): self.queue.put('') self.state = COMMIT_SENT - def _send_file(self, write_timeout, exception_handler): - """ - Method for a file PUT coro. Takes chunks from a queue and sends them - down a socket. - - If something goes wrong, the "failed" attribute will be set to true - and the exception handler will be called. - """ - while True: - chunk = self.queue.get() - if not self.failed: - to_send = "%x\r\n%s\r\n" % (len(chunk), chunk) - try: - with ChunkWriteTimeout(write_timeout): - self.conn.send(to_send) - except (Exception, ChunkWriteTimeout): - self.failed = True - exception_handler(self.conn.node, _('Object'), - _('Trying to write to %s') % self.path) - self.queue.task_done() - @classmethod - def connect(cls, node, part, path, headers, conn_timeout, node_timeout, - chunked=False, expected_frag_archive_size=None): + def connect(cls, node, part, req, headers, conn_timeout, node_timeout, + logger=None, need_multiphase=True, **kwargs): """ Connect to a backend node and send the headers. 
- :returns: Putter instance + Override superclass method to notify object of need for support for + multipart body with footers and optionally multiphase commit, and + verify object server's capabilities. - :raises: ConnectionTimeout if initial connection timed out - :raises: ResponseTimeout if header retrieval timed out - :raises: InsufficientStorage on 507 response from node - :raises: PutterConnectError on non-507 server error response from node + :param need_multiphase: if True then multiphase support is required of + the object server :raises: FooterNotSupported if need_metadata_footer is set but backend node can't process footers - :raises: MultiphasePUTNotSupported if need_multiphase_support is - set but backend node can't handle multiphase PUT + :raises: MultiphasePUTNotSupported if need_multiphase is set but + backend node can't handle multiphase PUT """ mime_boundary = "%.64x" % random.randint(0, 16 ** 64) headers = HeaderKeyDict(headers) + # when using a multipart mime request to backend the actual + # content-length is not equal to the object content size, so move the + # object content size to X-Backend-Obj-Content-Length if that has not + # already been set by the EC PUT path. + headers.setdefault('X-Backend-Obj-Content-Length', + headers.pop('Content-Length', None)) # We're going to be adding some unknown amount of data to the # request, so we can't use an explicit content length, and thus # we must use chunked encoding. 
headers['Transfer-Encoding'] = 'chunked' headers['Expect'] = '100-continue' - # make sure this isn't there - headers.pop('Content-Length') - headers['X-Backend-Obj-Content-Length'] = expected_frag_archive_size - headers['X-Backend-Obj-Multipart-Mime-Boundary'] = mime_boundary headers['X-Backend-Obj-Metadata-Footer'] = 'yes' - headers['X-Backend-Obj-Multiphase-Commit'] = 'yes' + if need_multiphase: + headers['X-Backend-Obj-Multiphase-Commit'] = 'yes' - start_time = time.time() - with ConnectionTimeout(conn_timeout): - conn = http_connect(node['ip'], node['port'], node['device'], - part, 'PUT', path, headers) - connect_duration = time.time() - start_time + conn, expect_resp, final_resp, connect_duration = cls._make_connection( + node, part, req, headers, conn_timeout, node_timeout) - with ResponseTimeout(node_timeout): - resp = conn.getexpect() - - if resp.status == HTTP_INSUFFICIENT_STORAGE: - raise InsufficientStorage - - if is_server_error(resp.status): - raise PutterConnectError(resp.status) - - if is_informational(resp.status): - continue_headers = HeaderKeyDict(resp.getheaders()) + if is_informational(expect_resp.status): + continue_headers = HeaderKeyDict(expect_resp.getheaders()) can_send_metadata_footer = config_true_value( continue_headers.get('X-Obj-Metadata-Footer', 'no')) can_handle_multiphase_put = config_true_value( @@ -1598,18 +1740,11 @@ class ECPutter(object): if not can_send_metadata_footer: raise FooterNotSupported() - if not can_handle_multiphase_put: + if need_multiphase and not can_handle_multiphase_put: raise MultiphasePUTNotSupported() - conn.node = node - conn.resp = None - if is_success(resp.status) or resp.status == HTTP_CONFLICT: - conn.resp = resp - elif (headers.get('If-None-Match', None) is not None and - resp.status == HTTP_PRECONDITION_FAILED): - conn.resp = resp - - return cls(conn, node, resp, path, connect_duration, mime_boundary) + return cls(conn, node, final_resp, req, connect_duration, logger, + mime_boundary, 
multiphase=need_multiphase) def chunk_transformer(policy, nstreams): @@ -1674,7 +1809,7 @@ def chunk_transformer(policy, nstreams): def trailing_metadata(policy, client_obj_hasher, bytes_transferred_from_client, fragment_archive_index): - return { + return HeaderKeyDict({ # etag and size values are being added twice here. # The container override header is used to update the container db # with these values as they represent the correct etag and size for @@ -1692,7 +1827,7 @@ def trailing_metadata(policy, client_obj_hasher, # AKA "what is this thing?" 'X-Object-Sysmeta-EC-Scheme': policy.ec_scheme_description, 'X-Object-Sysmeta-EC-Segment-Size': str(policy.ec_segment_size), - } + }) @ObjectControllerRouter.register(EC_POLICY) @@ -1764,8 +1899,7 @@ class ECObjectController(BaseObjectController): return range_specs def _get_or_head_response(self, req, node_iter, partition, policy): - req.headers.setdefault("X-Backend-Etag-Is-At", - "X-Object-Sysmeta-Ec-Etag") + update_etag_is_at_header(req, "X-Object-Sysmeta-Ec-Etag") if req.method == 'HEAD': # no fancy EC decoding here, just one plain old HEAD request to @@ -1862,14 +1996,18 @@ class ECObjectController(BaseObjectController): resp = self.best_response( req, statuses, reasons, bodies, 'Object', headers=headers) - self._fix_response(resp) + self._fix_response(req, resp) return resp - def _fix_response(self, resp): + def _fix_response(self, req, resp): # EC fragment archives each have different bytes, hence different # etags. However, they all have the original object's etag stored in # sysmeta, so we copy that here (if it exists) so the client gets it. resp.headers['Etag'] = resp.headers.get('X-Object-Sysmeta-Ec-Etag') + # We're about to invoke conditional response checking so set the + # correct conditional etag from wherever X-Backend-Etag-Is-At points, + # if it exists at all. 
+ resp._conditional_etag = resolve_etag_is_at_header(req, resp.headers) if (is_success(resp.status_int) or is_redirection(resp.status_int) or resp.status_int == HTTP_REQUESTED_RANGE_NOT_SATISFIABLE): resp.accept_ranges = 'bytes' @@ -1878,66 +2016,13 @@ class ECObjectController(BaseObjectController): 'X-Object-Sysmeta-Ec-Content-Length') resp.fix_conditional_response() - def _connect_put_node(self, node_iter, part, path, headers, - logger_thread_locals): - """ - Make a connection for a erasure encoded object. - - Connects to the first working node that it finds in node_iter and sends - over the request headers. Returns a Putter to handle the rest of the - streaming, or None if no working nodes were found. - """ - # the object server will get different bytes, so these - # values do not apply (Content-Length might, in general, but - # in the specific case of replication vs. EC, it doesn't). - client_cl = headers.pop('Content-Length', None) - headers.pop('Etag', None) - - expected_frag_size = None - if client_cl: - policy_index = int(headers.get('X-Backend-Storage-Policy-Index')) - policy = POLICIES.get_by_index(policy_index) - # TODO: PyECLib <= 1.2.0 looks to return the segment info - # different from the input for aligned data efficiency but - # Swift never does. So calculate the fragment length Swift - # will actually send to object sever by making two different - # get_segment_info calls (until PyECLib fixed). 
- # policy.fragment_size makes the call using segment size, - # and the next call is to get info for the last segment - - # get number of fragments except the tail - use truncation // - num_fragments = int(client_cl) // policy.ec_segment_size - expected_frag_size = policy.fragment_size * num_fragments - - # calculate the tail fragment_size by hand and add it to - # expected_frag_size - last_segment_size = int(client_cl) % policy.ec_segment_size - if last_segment_size: - last_info = policy.pyeclib_driver.get_segment_info( - last_segment_size, policy.ec_segment_size) - expected_frag_size += last_info['fragment_size'] - - self.app.logger.thread_locals = logger_thread_locals - for node in node_iter: - try: - putter = ECPutter.connect( - node, part, path, headers, - conn_timeout=self.app.conn_timeout, - node_timeout=self.app.node_timeout, - expected_frag_archive_size=expected_frag_size) - self.app.set_node_timing(node, putter.connect_duration) - return putter - except InsufficientStorage: - self.app.error_limit(node, _('ERROR Insufficient Storage')) - except PutterConnectError as e: - self.app.error_occurred( - node, _('ERROR %(status)d Expect: 100-continue ' - 'From Object Server') % { - 'status': e.status}) - except (Exception, Timeout): - self.app.exception_occurred( - node, _('Object'), - _('Expect: 100-continue on %s') % path) + def _make_putter(self, node, part, req, headers): + return MIMEPutter.connect( + node, part, req.swift_entity_path, headers, + conn_timeout=self.app.conn_timeout, + node_timeout=self.app.node_timeout, + logger=self.app.logger, + need_multiphase=True) def _determine_chunk_destinations(self, putters): """ @@ -1985,8 +2070,16 @@ class ECObjectController(BaseObjectController): bytes_transferred = 0 chunk_transform = chunk_transformer(policy, len(nodes)) chunk_transform.send(None) + chunk_hashers = collections.defaultdict(md5) def send_chunk(chunk): + # Note: there's two different hashers in here. 
etag_hasher is + # hashing the original object so that we can validate the ETag + # that the client sent (and etag_hasher is None if the client + # didn't send one). The hasher in chunk_hashers is hashing the + # fragment archive being sent to the object server; this lets us + # guard against data corruption on the network between proxy and + # object server. + if etag_hasher: + etag_hasher.update(chunk) + backend_chunks = chunk_transform.send(chunk) @@ -1996,15 +2089,18 @@ return for putter in list(putters): - backend_chunk = backend_chunks[chunk_index[putter]] + ci = chunk_index[putter] + backend_chunk = backend_chunks[ci] if not putter.failed: - putter.chunk_hasher.update(backend_chunk) + chunk_hashers[ci].update(backend_chunk) putter.send_chunk(backend_chunk) else: + putter.close() putters.remove(putter) self._check_min_conn( - req, putters, min_conns, msg='Object PUT exceptions during' - ' send, %(conns)s/%(nodes)s required connections') + req, putters, min_conns, + msg=_('Object PUT exceptions during send, ' + '%(conns)s/%(nodes)s required connections')) try: with ContextPool(len(putters)) as pool: @@ -2047,14 +2143,26 @@ send_chunk('') # flush out any buffered data + footers = self._get_footers(req) + received_etag = footers.get( + 'etag', '').strip('"') + if (computed_etag and received_etag and + computed_etag != received_etag): + raise HTTPUnprocessableEntity(request=req) + + # Remove any EC reserved metadata names from footers + footers = {(k, v) for k, v in footers.items() + if not k.lower().startswith('x-object-sysmeta-ec-')} for putter in putters: + ci = chunk_index[putter] + # Update any footers set by middleware with EC footers trail_md = trailing_metadata( policy, etag_hasher, - bytes_transferred, - chunk_index[putter]) - trail_md['Etag'] = \ - putter.chunk_hasher.hexdigest() - putter.end_of_object_data(trail_md) + trail_md.update(footers) + # 
Etag footer must always be hash of what we sent + trail_md['Etag'] = chunk_hashers[ci].hexdigest() + putter.end_of_object_data(footer_metadata=trail_md) for putter in putters: putter.wait() @@ -2065,12 +2173,12 @@ class ECObjectController(BaseObjectController): # object data and metadata commit and is a necessary # condition to be met before starting 2nd PUT phase final_phase = False - need_quorum = True - statuses, reasons, bodies, _junk, quorum = \ + statuses, reasons, bodies, _junk = \ self._get_put_responses( - req, putters, len(nodes), final_phase, - min_conns, need_quorum=need_quorum) - if not quorum: + req, putters, len(nodes), final_phase=final_phase, + min_responses=min_conns) + if not self.have_quorum( + statuses, len(nodes), quorum=min_conns): self.app.logger.error( _('Not enough object servers ack\'ed (got %d)'), statuses.count(HTTP_CONTINUE)) @@ -2153,109 +2261,15 @@ class ECObjectController(BaseObjectController): return self._have_adequate_responses( statuses, min_responses, is_informational) - def _await_response(self, conn, final_phase): - return conn.await_response( - self.app.node_timeout, not final_phase) - - def _get_conn_response(self, conn, req, logger_thread_locals, - final_phase, **kwargs): - self.app.logger.thread_locals = logger_thread_locals - try: - resp = self._await_response(conn, final_phase=final_phase, - **kwargs) - except (Exception, Timeout): - resp = None - if final_phase: - status_type = 'final' - else: - status_type = 'commit' - self.app.exception_occurred( - conn.node, _('Object'), - _('Trying to get %(status_type)s status of PUT to %(path)s') % - {'status_type': status_type, 'path': req.path}) - return (conn, resp) - - def _get_put_responses(self, req, putters, num_nodes, final_phase, - min_responses, need_quorum=True): - """ - Collect erasure coded object responses. - - Collect object responses to a PUT request and determine if - satisfactory number of nodes have returned success. 
Return - statuses, quorum result if indicated by 'need_quorum' and - etags if this is a final phase or a multiphase PUT transaction. - - :param req: the request - :param putters: list of putters for the request - :param num_nodes: number of nodes involved - :param final_phase: boolean indicating if this is the last phase - :param min_responses: minimum needed when not requiring quorum - :param need_quorum: boolean indicating if quorum is required - """ - statuses = [] - reasons = [] - bodies = [] - etags = set() - - pile = GreenAsyncPile(len(putters)) - for putter in putters: - if putter.failed: - continue - pile.spawn(self._get_conn_response, putter, req, - self.app.logger.thread_locals, final_phase=final_phase) - - def _handle_response(putter, response): - statuses.append(response.status) - reasons.append(response.reason) - if final_phase: - body = response.read() - else: - body = '' - bodies.append(body) - if response.status == HTTP_INSUFFICIENT_STORAGE: - putter.failed = True - self.app.error_limit(putter.node, - _('ERROR Insufficient Storage')) - elif response.status >= HTTP_INTERNAL_SERVER_ERROR: - putter.failed = True - self.app.error_occurred( - putter.node, - _('ERROR %(status)d %(body)s From Object Server ' - 're: %(path)s') % - {'status': response.status, - 'body': body[:1024], 'path': req.path}) - elif is_success(response.status): - etags.add(response.getheader('etag').strip('"')) - - quorum = False - for (putter, response) in pile: - if response: - _handle_response(putter, response) - if self._have_adequate_successes(statuses, min_responses): - break - else: - putter.failed = True - - # give any pending requests *some* chance to finish - finished_quickly = pile.waitall(self.app.post_quorum_timeout) - for (putter, response) in finished_quickly: - if response: - _handle_response(putter, response) - - if need_quorum: - if final_phase: - while len(statuses) < num_nodes: - statuses.append(HTTP_SERVICE_UNAVAILABLE) - reasons.append('') - bodies.append('') - 
else: - # intermediate response phase - set return value to true only - # if there are responses having same value of *any* status - # except 5xx - if self.have_quorum(statuses, num_nodes, quorum=min_responses): - quorum = True - - return statuses, reasons, bodies, etags, quorum + def _have_adequate_put_responses(self, statuses, num_nodes, min_responses): + # For an EC PUT we require a quorum of responses with success statuses + # in order to move on to next phase of PUT request handling without + # having to wait for *all* responses. + # TODO: this implies that in the first phase of the backend PUTs when + # we are actually expecting 1xx responses that we will end up waiting + # for *all* responses. That seems inefficient since we only need a + # quorum of 1xx responses to proceed. + return self._have_adequate_successes(statuses, min_responses) def _store_object(self, req, data_source, nodes, partition, outgoing_headers): @@ -2264,6 +2278,35 @@ class ECObjectController(BaseObjectController): """ policy_index = int(req.headers.get('X-Backend-Storage-Policy-Index')) policy = POLICIES.get_by_index(policy_index) + + expected_frag_size = None + if req.content_length: + # TODO: PyECLib <= 1.2.0 looks to return the segment info + # different from the input for aligned data efficiency but + # Swift never does. So calculate the fragment length Swift + # will actually send to object sever by making two different + # get_segment_info calls (until PyECLib fixed). 
+ # policy.fragment_size makes the call using segment size, + # and the next call is to get info for the last segment + + # get number of fragments except the tail - use truncation // + num_fragments = req.content_length // policy.ec_segment_size + expected_frag_size = policy.fragment_size * num_fragments + + # calculate the tail fragment_size by hand and add it to + # expected_frag_size + last_segment_size = req.content_length % policy.ec_segment_size + if last_segment_size: + last_info = policy.pyeclib_driver.get_segment_info( + last_segment_size, policy.ec_segment_size) + expected_frag_size += last_info['fragment_size'] + for headers in outgoing_headers: + headers['X-Backend-Obj-Content-Length'] = expected_frag_size + # the object server will get different bytes, so these + # values do not apply. + headers.pop('Content-Length', None) + headers.pop('Etag', None) + # Since the request body sent from client -> proxy is not # the same as the request body sent proxy -> object, we # can't rely on the object-server to do the etag checking - @@ -2272,18 +2315,15 @@ class ECObjectController(BaseObjectController): min_conns = policy.quorum putters = self._get_put_connections( - req, nodes, partition, outgoing_headers, - policy, expect=True) + req, nodes, partition, outgoing_headers, policy) try: # check that a minimum number of connections were established and # meet all the correct conditions set in the request - self._check_failure_put_connections(putters, req, nodes, min_conns) + self._check_failure_put_connections(putters, req, min_conns) self._transfer_data(req, policy, data_source, putters, nodes, min_conns, etag_hasher) - final_phase = True - need_quorum = False # The .durable file will propagate in a replicated fashion; if # one exists, the reconstructor will spread it around. # In order to avoid successfully writing an object, but refusing @@ -2292,15 +2332,16 @@ class ECObjectController(BaseObjectController): # writes as quorum fragment writes. 
If object servers are in the # future able to serve their non-durable fragment archives we may # be able to reduce this quorum count if needed. - min_conns = policy.quorum - putters = [p for p in putters if not p.failed] - # ignore response etags, and quorum boolean - statuses, reasons, bodies, _etags, _quorum = \ + # ignore response etags + statuses, reasons, bodies, _etags = \ self._get_put_responses(req, putters, len(nodes), - final_phase, min_conns, - need_quorum=need_quorum) + final_phase=True, + min_responses=min_conns) except HTTPException as resp: return resp + finally: + for putter in putters: + putter.close() etag = etag_hasher.hexdigest() resp = self.best_response(req, statuses, reasons, bodies, diff --git a/test/unit/__init__.py b/test/unit/__init__.py index c4c833a79c..acc3c8612f 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -32,6 +32,8 @@ import eventlet from eventlet.green import socket from tempfile import mkdtemp from shutil import rmtree + + from swift.common.utils import Timestamp, NOTICE from test import get_config from swift.common import utils @@ -848,7 +850,7 @@ def fake_http_connect(*code_iter, **kwargs): def __init__(self, status, etag=None, body='', timestamp='1', headers=None, expect_headers=None, connection_id=None, - give_send=None): + give_send=None, give_expect=None): if not isinstance(status, FakeStatus): status = FakeStatus(status) self._status = status @@ -864,6 +866,8 @@ def fake_http_connect(*code_iter, **kwargs): self.timestamp = timestamp self.connection_id = connection_id self.give_send = give_send + self.give_expect = give_expect + self.closed = False if 'slow' in kwargs and isinstance(kwargs['slow'], list): try: self._next_sleep = kwargs['slow'].pop(0) @@ -884,6 +888,8 @@ def fake_http_connect(*code_iter, **kwargs): return self def getexpect(self): + if self.give_expect: + self.give_expect(self) expect_status = self._status.get_expect_status() headers = dict(self.expect_headers) if expect_status == 409: 
@@ -953,7 +959,7 @@ def fake_http_connect(*code_iter, **kwargs): def send(self, amt=None): if self.give_send: - self.give_send(self.connection_id, amt) + self.give_send(self, amt) am_slow, value = self.get_slow() if am_slow: if self.received < 4: @@ -964,7 +970,7 @@ def fake_http_connect(*code_iter, **kwargs): return HeaderKeyDict(self.getheaders()).get(name, default) def close(self): - pass + self.closed = True timestamps_iter = iter(kwargs.get('timestamps') or ['1'] * len(code_iter)) etag_iter = iter(kwargs.get('etags') or [None] * len(code_iter)) @@ -1017,7 +1023,8 @@ def fake_http_connect(*code_iter, **kwargs): body = next(body_iter) return FakeConn(status, etag, body=body, timestamp=timestamp, headers=headers, expect_headers=expect_headers, - connection_id=i, give_send=kwargs.get('give_send')) + connection_id=i, give_send=kwargs.get('give_send'), + give_expect=kwargs.get('give_expect')) connect.code_iter = code_iter diff --git a/test/unit/common/middleware/helpers.py b/test/unit/common/middleware/helpers.py index e542818967..8b8fff3b3d 100644 --- a/test/unit/common/middleware/helpers.py +++ b/test/unit/common/middleware/helpers.py @@ -16,7 +16,6 @@ # This stuff can't live in test/unit/__init__.py due to its swob dependency. 
from collections import defaultdict -from copy import deepcopy from hashlib import md5 from swift.common import swob from swift.common.header_key_dict import HeaderKeyDict @@ -113,24 +112,34 @@ class FakeSwift(object): raise KeyError("Didn't find %r in allowed responses" % ( (method, path),)) - self._calls.append((method, path, req_headers)) - # simulate object PUT if method == 'PUT' and obj: - input = env['wsgi.input'].read() + input = ''.join(iter(env['wsgi.input'].read, '')) + if 'swift.callback.update_footers' in env: + footers = HeaderKeyDict() + env['swift.callback.update_footers'](footers) + req_headers.update(footers) etag = md5(input).hexdigest() headers.setdefault('Etag', etag) headers.setdefault('Content-Length', len(input)) # keep it for subsequent GET requests later - self.uploaded[path] = (deepcopy(headers), input) + self.uploaded[path] = (dict(req_headers), input) if "CONTENT_TYPE" in env: self.uploaded[path][0]['Content-Type'] = env["CONTENT_TYPE"] - # range requests ought to work, which require conditional_response=True + self._calls.append((method, path, HeaderKeyDict(req_headers))) + + # range requests ought to work, hence conditional_response=True req = swob.Request(env) - resp = resp_class(req=req, headers=headers, body=body, - conditional_response=req.method in ('GET', 'HEAD')) + if isinstance(body, list): + resp = resp_class( + req=req, headers=headers, app_iter=body, + conditional_response=req.method in ('GET', 'HEAD')) + else: + resp = resp_class( + req=req, headers=headers, body=body, + conditional_response=req.method in ('GET', 'HEAD')) wsgi_iter = resp(env, start_response) self.mark_opened(path) return LeakTrackingIter(wsgi_iter, self, path) diff --git a/test/unit/common/test_request_helpers.py b/test/unit/common/test_request_helpers.py index c13bc03ca9..1c39e9f0af 100644 --- a/test/unit/common/test_request_helpers.py +++ b/test/unit/common/test_request_helpers.py @@ -21,7 +21,8 @@ from swift.common.storage_policy import POLICIES, 
EC_POLICY, REPL_POLICY from swift.common.request_helpers import is_sys_meta, is_user_meta, \ is_sys_or_user_meta, strip_sys_meta_prefix, strip_user_meta_prefix, \ remove_items, copy_header_subset, get_name_and_placement, \ - http_response_to_document_iters + http_response_to_document_iters, update_etag_is_at_header, \ + resolve_etag_is_at_header from test.unit import patch_policies from test.unit.common.test_utils import FakeResponse @@ -273,3 +274,74 @@ class TestHTTPResponseToDocumentIters(unittest.TestCase): self.assertEqual(body.read(), 'ches') self.assertRaises(StopIteration, next, doc_iters) + + def test_update_etag_is_at_header(self): + # start with no existing X-Backend-Etag-Is-At + req = Request.blank('/v/a/c/o') + update_etag_is_at_header(req, 'X-Object-Sysmeta-My-Etag') + self.assertEqual('X-Object-Sysmeta-My-Etag', + req.headers['X-Backend-Etag-Is-At']) + # add another alternate + update_etag_is_at_header(req, 'X-Object-Sysmeta-Ec-Etag') + self.assertEqual('X-Object-Sysmeta-My-Etag,X-Object-Sysmeta-Ec-Etag', + req.headers['X-Backend-Etag-Is-At']) + with self.assertRaises(ValueError) as cm: + update_etag_is_at_header(req, 'X-Object-Sysmeta-,-Bad') + self.assertEqual('Header name must not contain commas', + cm.exception.message) + + def test_resolve_etag_is_at_header(self): + def do_test(): + req = Request.blank('/v/a/c/o') + # ok to have no X-Backend-Etag-Is-At + self.assertIsNone(resolve_etag_is_at_header(req, metadata)) + + # ok to have no matching metadata + req.headers['X-Backend-Etag-Is-At'] = 'X-Not-There' + self.assertIsNone(resolve_etag_is_at_header(req, metadata)) + + # selects from metadata + req.headers['X-Backend-Etag-Is-At'] = 'X-Object-Sysmeta-Ec-Etag' + self.assertEqual('an etag value', + resolve_etag_is_at_header(req, metadata)) + req.headers['X-Backend-Etag-Is-At'] = 'X-Object-Sysmeta-My-Etag' + self.assertEqual('another etag value', + resolve_etag_is_at_header(req, metadata)) + + # first in list takes precedence + 
req.headers['X-Backend-Etag-Is-At'] = \ + 'X-Object-Sysmeta-My-Etag,X-Object-Sysmeta-Ec-Etag' + self.assertEqual('another etag value', + resolve_etag_is_at_header(req, metadata)) + + # non-existent alternates are passed over + req.headers['X-Backend-Etag-Is-At'] = \ + 'X-Bogus,X-Object-Sysmeta-My-Etag,X-Object-Sysmeta-Ec-Etag' + self.assertEqual('another etag value', + resolve_etag_is_at_header(req, metadata)) + + # spaces in list are ok + alts = 'X-Foo, X-Object-Sysmeta-My-Etag , X-Object-Sysmeta-Ec-Etag' + req.headers['X-Backend-Etag-Is-At'] = alts + self.assertEqual('another etag value', + resolve_etag_is_at_header(req, metadata)) + + # lower case in list is ok + alts = alts.lower() + req.headers['X-Backend-Etag-Is-At'] = alts + self.assertEqual('another etag value', + resolve_etag_is_at_header(req, metadata)) + + # upper case in list is ok + alts = alts.upper() + req.headers['X-Backend-Etag-Is-At'] = alts + self.assertEqual('another etag value', + resolve_etag_is_at_header(req, metadata)) + + metadata = {'X-Object-Sysmeta-Ec-Etag': 'an etag value', + 'X-Object-Sysmeta-My-Etag': 'another etag value'} + do_test() + metadata = dict((k.lower(), v) for k, v in metadata.items()) + do_test() + metadata = dict((k.upper(), v) for k, v in metadata.items()) + do_test() diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 24eba9956a..b85230f395 100755 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -2385,6 +2385,7 @@ class TestObjectController(unittest.TestCase): 'X-Timestamp': utils.Timestamp(time()).internal, 'Content-Type': 'application/octet-stream', 'X-Object-Meta-Xtag': 'madeup', + 'X-Object-Sysmeta-Xtag': 'alternate madeup', } req = Request.blank('/sda1/p/a/c/o', method='PUT', headers=headers) @@ -2400,6 +2401,39 @@ class TestObjectController(unittest.TestCase): resp = req.get_response(self.object_controller) self.assertEqual(resp.status_int, 200) + # match x-backend-etag-is-at, using first in list of alternates + 
req = Request.blank('/sda1/p/a/c/o', headers={ + 'If-Match': 'madeup', + 'X-Backend-Etag-Is-At': + 'X-Object-Meta-Xtag,X-Object-Sysmeta-Z'}) + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 200) + + # match x-backend-etag-is-at, using second in list of alternates + alts = 'X-Object-Sysmeta-Y,X-Object-Meta-Xtag,X-Object-Sysmeta-Z' + req = Request.blank('/sda1/p/a/c/o', headers={ + 'If-Match': 'madeup', + 'X-Backend-Etag-Is-At': alts}) + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 200) + + # match x-backend-etag-is-at, choosing first of multiple alternates + alts = 'X-Object-Sysmeta-Y,X-Object-Meta-Xtag,X-Object-Sysmeta-Xtag' + req = Request.blank('/sda1/p/a/c/o', headers={ + 'If-Match': 'madeup', + 'X-Backend-Etag-Is-At': alts}) + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 200) + + # match x-backend-etag-is-at, choosing first of multiple alternates + # (switches order of second two alternates from previous assertion) + alts = 'X-Object-Sysmeta-Y,X-Object-Sysmeta-Xtag,X-Object-Meta-Xtag' + req = Request.blank('/sda1/p/a/c/o', headers={ + 'If-Match': 'alternate madeup', + 'X-Backend-Etag-Is-At': alts}) + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 200) + # no match x-backend-etag-is-at req = Request.blank('/sda1/p/a/c/o', headers={ 'If-Match': real_etag, diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index be0893dbb2..4495fb0c68 100755 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -122,6 +122,27 @@ class PatchedObjControllerApp(proxy_server.Application): PatchedObjControllerApp, self).__call__(*args, **kwargs) +def make_footers_callback(body=None): + # helper method to create a footers callback that will generate some fake + # footer metadata + cont_etag = 'container update etag may differ' + crypto_etag = 
'20242af0cd21dd7195a10483eb7472c9' + etag_crypto_meta = \ + '{"cipher": "AES_CTR_256", "iv": "sD+PSw/DfqYwpsVGSo0GEw=="}' + etag = md5(body).hexdigest() if body is not None else None + footers_to_add = { + 'X-Object-Sysmeta-Container-Update-Override-Etag': cont_etag, + 'X-Object-Sysmeta-Crypto-Etag': crypto_etag, + 'X-Object-Sysmeta-Crypto-Meta-Etag': etag_crypto_meta, + 'X-I-Feel-Lucky': 'Not blocked', + 'Etag': etag} + + def footers_callback(footers): + footers.update(footers_to_add) + + return footers_callback + + class BaseObjectControllerMixin(object): container_info = { 'status': 200, @@ -253,10 +274,11 @@ class BaseObjectControllerMixin(object): def test_connect_put_node_timeout(self): controller = self.controller_cls( self.app, 'a', 'c', 'o') + req = swift.common.swob.Request.blank('/v1/a/c/o') self.app.conn_timeout = 0.05 with set_http_connect(slow_connect=True): nodes = [dict(ip='', port='', device='')] - res = controller._connect_put_node(nodes, '', '', {}, ('', '')) + res = controller._connect_put_node(nodes, '', req, {}, ('', '')) self.assertTrue(res is None) def test_DELETE_simple(self): @@ -564,6 +586,163 @@ class TestReplicatedObjController(BaseObjectControllerMixin, resp = req.get_response(self.app) self.assertEqual(resp.status_int, 201) + def test_PUT_error_with_footers(self): + footers_callback = make_footers_callback('') + env = {'swift.callback.update_footers': footers_callback} + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + environ=env) + req.headers['content-length'] = '0' + codes = [503] * self.replicas() + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes' + } + + with set_http_connect(*codes, expect_headers=expect_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 503) + + def _test_PUT_with_no_footers(self, test_body='', chunked=False): + # verify that when no footers are required then the PUT uses a regular + # single part body + req = swift.common.swob.Request.blank('/v1/a/c/o', 
method='PUT', + body=test_body) + if chunked: + req.headers['Transfer-Encoding'] = 'chunked' + etag = md5(test_body).hexdigest() + req.headers['Etag'] = etag + + put_requests = defaultdict( + lambda: {'headers': None, 'chunks': [], 'connection': None}) + + def capture_body(conn, chunk): + put_requests[conn.connection_id]['chunks'].append(chunk) + put_requests[conn.connection_id]['connection'] = conn + + def capture_headers(ip, port, device, part, method, path, headers, + **kwargs): + conn_id = kwargs['connection_id'] + put_requests[conn_id]['headers'] = headers + + codes = [201] * self.replicas() + expect_headers = {'X-Obj-Metadata-Footer': 'yes'} + with set_http_connect(*codes, expect_headers=expect_headers, + give_send=capture_body, + give_connect=capture_headers): + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 201) + for connection_id, info in put_requests.items(): + body = ''.join(info['chunks']) + headers = info['headers'] + if chunked: + body = unchunk_body(body) + self.assertEqual('100-continue', headers['Expect']) + self.assertEqual('chunked', headers['Transfer-Encoding']) + else: + self.assertNotIn('Transfer-Encoding', headers) + if body: + self.assertEqual('100-continue', headers['Expect']) + else: + self.assertNotIn('Expect', headers) + self.assertNotIn('X-Backend-Obj-Multipart-Mime-Boundary', headers) + self.assertNotIn('X-Backend-Obj-Metadata-Footer', headers) + self.assertNotIn('X-Backend-Obj-Multiphase-Commit', headers) + self.assertEqual(etag, headers['Etag']) + + self.assertEqual(test_body, body) + self.assertTrue(info['connection'].closed) + + def test_PUT_with_chunked_body_and_no_footers(self): + self._test_PUT_with_no_footers(test_body='asdf', chunked=True) + + def test_PUT_with_body_and_no_footers(self): + self._test_PUT_with_no_footers(test_body='asdf', chunked=False) + + def test_PUT_with_no_body_and_no_footers(self): + self._test_PUT_with_no_footers(test_body='', chunked=False) + + def _test_PUT_with_footers(self, 
test_body=''): + # verify that when footers are required the PUT body is multipart + # and the footers are appended + footers_callback = make_footers_callback(test_body) + env = {'swift.callback.update_footers': footers_callback} + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + environ=env) + req.body = test_body + # send bogus Etag header to differentiate from footer value + req.headers['Etag'] = 'header_etag' + codes = [201] * self.replicas() + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes' + } + + put_requests = defaultdict( + lambda: {'headers': None, 'chunks': [], 'connection': None}) + + def capture_body(conn, chunk): + put_requests[conn.connection_id]['chunks'].append(chunk) + put_requests[conn.connection_id]['connection'] = conn + + def capture_headers(ip, port, device, part, method, path, headers, + **kwargs): + conn_id = kwargs['connection_id'] + put_requests[conn_id]['headers'] = headers + + with set_http_connect(*codes, expect_headers=expect_headers, + give_send=capture_body, + give_connect=capture_headers): + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 201) + for connection_id, info in put_requests.items(): + body = unchunk_body(''.join(info['chunks'])) + headers = info['headers'] + boundary = headers['X-Backend-Obj-Multipart-Mime-Boundary'] + self.assertTrue(boundary is not None, + "didn't get boundary for conn %r" % ( + connection_id,)) + self.assertEqual('chunked', headers['Transfer-Encoding']) + self.assertEqual('100-continue', headers['Expect']) + self.assertEqual('yes', headers['X-Backend-Obj-Metadata-Footer']) + self.assertNotIn('X-Backend-Obj-Multiphase-Commit', headers) + self.assertEqual('header_etag', headers['Etag']) + + # email.parser.FeedParser doesn't know how to take a multipart + # message and boundary together and parse it; it only knows how + # to take a string, parse the headers, and figure out the + # boundary on its own. 
+ parser = email.parser.FeedParser() + parser.feed( + "Content-Type: multipart/nobodycares; boundary=%s\r\n\r\n" % + boundary) + parser.feed(body) + message = parser.close() + + self.assertTrue(message.is_multipart()) # sanity check + mime_parts = message.get_payload() + # notice, no commit confirmation + self.assertEqual(len(mime_parts), 2) + obj_part, footer_part = mime_parts + + self.assertEqual(obj_part['X-Document'], 'object body') + self.assertEqual(test_body, obj_part.get_payload()) + + # validate footer metadata + self.assertEqual(footer_part['X-Document'], 'object metadata') + footer_metadata = json.loads(footer_part.get_payload()) + self.assertTrue(footer_metadata) + expected = {} + footers_callback(expected) + self.assertDictEqual(expected, footer_metadata) + + self.assertTrue(info['connection'].closed) + + def test_PUT_with_body_and_footers(self): + self._test_PUT_with_footers(test_body='asdf') + + def test_PUT_with_no_body_and_footers(self): + self._test_PUT_with_footers() + def test_txn_id_logging_on_PUT(self): req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT') self.app.logger.txn_id = req.environ['swift.trans_id'] = 'test-txn-id' @@ -585,11 +764,15 @@ class TestReplicatedObjController(BaseObjectControllerMixin, req.headers['Content-Length'] = '0' req.headers['Etag'] = '"catbus"' - # The 2-tuple here makes getexpect() return 422, not 100. For - # objects that are >0 bytes, you get a 100 Continue and then a 422 - # Unprocessable Entity after sending the body. For zero-byte - # objects, though, you get the 422 right away. - codes = [FakeStatus((422, 422)) + # The 2-tuple here makes getexpect() return 422, not 100. For objects + # that are >0 bytes, you get a 100 Continue and then a 422 + # Unprocessable Entity after sending the body. For zero-byte objects, + # though, you get the 422 right away because no Expect header is sent + # with zero-byte PUT. 
The second status in the tuple should not be + # consumed, it's just there to make the FakeStatus treat the first as + # an expect status, but we'll make it something other than a 422 so + # that if it is consumed then the test should fail. + codes = [FakeStatus((422, 200)) for _junk in range(self.replicas())] with set_http_connect(*codes): @@ -707,16 +890,24 @@ class TestReplicatedObjController(BaseObjectControllerMixin, class FakeReader(object): def read(self, size): raise Timeout() + conns = [] + + def capture_expect(conn): + # stash connections so that we can verify they all get closed + conns.append(conn) req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT', body='test body') req.environ['wsgi.input'] = FakeReader() req.headers['content-length'] = '6' - with set_http_connect(201, 201, 201): + with set_http_connect(201, 201, 201, give_expect=capture_expect): resp = req.get_response(self.app) self.assertEqual(resp.status_int, 499) + self.assertEqual(self.replicas(), len(conns)) + for conn in conns: + self.assertTrue(conn.closed) def test_PUT_exception_during_transfer_data(self): class FakeReader(object): @@ -1131,6 +1322,108 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): self.assertEqual(resp.status_int, 200) self.assertIn('Accept-Ranges', resp.headers) + def _test_if_match(self, method): + num_responses = self.policy.ec_ndata if method == 'GET' else 1 + + def _do_test(match_value, backend_status, + etag_is_at='X-Object-Sysmeta-Does-Not-Exist'): + req = swift.common.swob.Request.blank( + '/v1/a/c/o', method=method, + headers={'If-Match': match_value, + 'X-Backend-Etag-Is-At': etag_is_at}) + get_resp = [backend_status] * num_responses + resp_headers = {'Etag': 'frag_etag', + 'X-Object-Sysmeta-Ec-Etag': 'data_etag', + 'X-Object-Sysmeta-Alternate-Etag': 'alt_etag'} + with set_http_connect(*get_resp, headers=resp_headers): + resp = req.get_response(self.app) + self.assertEqual('data_etag', resp.headers['Etag']) + return resp + + # 
wildcard + resp = _do_test('*', 200) + self.assertEqual(resp.status_int, 200) + + # match + resp = _do_test('"data_etag"', 200) + self.assertEqual(resp.status_int, 200) + + # no match + resp = _do_test('"frag_etag"', 412) + self.assertEqual(resp.status_int, 412) + + # match wildcard against an alternate etag + resp = _do_test('*', 200, + etag_is_at='X-Object-Sysmeta-Alternate-Etag') + self.assertEqual(resp.status_int, 200) + + # match against an alternate etag + resp = _do_test('"alt_etag"', 200, + etag_is_at='X-Object-Sysmeta-Alternate-Etag') + self.assertEqual(resp.status_int, 200) + + # no match against an alternate etag + resp = _do_test('"data_etag"', 412, + etag_is_at='X-Object-Sysmeta-Alternate-Etag') + self.assertEqual(resp.status_int, 412) + + def test_GET_if_match(self): + self._test_if_match('GET') + + def test_HEAD_if_match(self): + self._test_if_match('HEAD') + + def _test_if_none_match(self, method): + num_responses = self.policy.ec_ndata if method == 'GET' else 1 + + def _do_test(match_value, backend_status, + etag_is_at='X-Object-Sysmeta-Does-Not-Exist'): + req = swift.common.swob.Request.blank( + '/v1/a/c/o', method=method, + headers={'If-None-Match': match_value, + 'X-Backend-Etag-Is-At': etag_is_at}) + get_resp = [backend_status] * num_responses + resp_headers = {'Etag': 'frag_etag', + 'X-Object-Sysmeta-Ec-Etag': 'data_etag', + 'X-Object-Sysmeta-Alternate-Etag': 'alt_etag'} + with set_http_connect(*get_resp, headers=resp_headers): + resp = req.get_response(self.app) + self.assertEqual('data_etag', resp.headers['Etag']) + return resp + + # wildcard + resp = _do_test('*', 304) + self.assertEqual(resp.status_int, 304) + + # match + resp = _do_test('"data_etag"', 304) + self.assertEqual(resp.status_int, 304) + + # no match + resp = _do_test('"frag_etag"', 200) + self.assertEqual(resp.status_int, 200) + + # match wildcard against an alternate etag + resp = _do_test('*', 304, + etag_is_at='X-Object-Sysmeta-Alternate-Etag') + 
self.assertEqual(resp.status_int, 304) + + # match against an alternate etag + resp = _do_test('"alt_etag"', 304, + etag_is_at='X-Object-Sysmeta-Alternate-Etag') + self.assertEqual(resp.status_int, 304) + + # no match against an alternate etag + resp = _do_test('"data_etag"', 200, + etag_is_at='X-Object-Sysmeta-Alternate-Etag') + self.assertEqual(resp.status_int, 200) + + def test_GET_if_none_match(self): + self._test_if_none_match('GET') + + def test_HEAD_if_none_match(self): + self._test_if_none_match('HEAD') + def test_GET_simple_x_newest(self): req = swift.common.swob.Request.blank('/v1/a/c/o', headers={'X-Newest': 'true'}) @@ -1194,6 +1487,42 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): resp = req.get_response(self.app) self.assertEqual(resp.status_int, 201) + def test_PUT_with_body_and_bad_etag(self): + segment_size = self.policy.ec_segment_size + test_body = ('asdf' * segment_size)[:-10] + codes = [201] * self.replicas() + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes', + 'X-Obj-Multiphase-Commit': 'yes' + } + conns = [] + + def capture_expect(conn): + # stash the backend connection so we can verify that it is closed + # (no data will be sent) + conns.append(conn) + + # send a bad etag in the request headers + headers = {'Etag': 'bad etag'} + req = swift.common.swob.Request.blank( + '/v1/a/c/o', method='PUT', headers=headers, body=test_body) + with set_http_connect(*codes, expect_headers=expect_headers, + give_expect=capture_expect): + resp = req.get_response(self.app) + self.assertEqual(422, resp.status_int) + self.assertEqual(self.replicas(), len(conns)) + for conn in conns: + self.assertTrue(conn.closed) + + # make the footers callback send a bad Etag footer + footers_callback = make_footers_callback('not the test body') + env = {'swift.callback.update_footers': footers_callback} + req = swift.common.swob.Request.blank( + '/v1/a/c/o', method='PUT', environ=env, body=test_body) + with set_http_connect(*codes, 
expect_headers=expect_headers): + resp = req.get_response(self.app) + self.assertEqual(422, resp.status_int) + def test_txn_id_logging_ECPUT(self): req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', body='') @@ -1399,9 +1728,15 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): self.assertEqual(resp.status_int, 500) def test_PUT_with_body(self): - req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT') segment_size = self.policy.ec_segment_size test_body = ('asdf' * segment_size)[:-10] + # make the footers callback not include Etag footer so that we can + # verify that the correct EC-calculated Etag is included in footers + # sent to backend + footers_callback = make_footers_callback() + env = {'swift.callback.update_footers': footers_callback} + req = swift.common.swob.Request.blank( + '/v1/a/c/o', method='PUT', environ=env) etag = md5(test_body).hexdigest() size = len(test_body) req.body = test_body @@ -1413,8 +1748,8 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): put_requests = defaultdict(lambda: {'boundary': None, 'chunks': []}) - def capture_body(conn_id, chunk): - put_requests[conn_id]['chunks'].append(chunk) + def capture_body(conn, chunk): + put_requests[conn.connection_id]['chunks'].append(chunk) def capture_headers(ip, port, device, part, method, path, headers, **kwargs): @@ -1471,13 +1806,16 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): self.assertEqual(footer_part['X-Document'], 'object metadata') footer_metadata = json.loads(footer_part.get_payload()) self.assertTrue(footer_metadata) - expected = { - 'X-Object-Sysmeta-EC-Content-Length': str(size), + expected = {} + # update expected with footers from the callback... 
+ footers_callback(expected) + expected.update({ + 'X-Object-Sysmeta-Ec-Content-Length': str(size), 'X-Backend-Container-Update-Override-Size': str(size), - 'X-Object-Sysmeta-EC-Etag': etag, + 'X-Object-Sysmeta-Ec-Etag': etag, 'X-Backend-Container-Update-Override-Etag': etag, - 'X-Object-Sysmeta-EC-Segment-Size': str(segment_size), - } + 'X-Object-Sysmeta-Ec-Segment-Size': str(segment_size), + 'Etag': md5(obj_part.get_payload()).hexdigest()}) for header, value in expected.items(): self.assertEqual(footer_metadata[header], value) @@ -1504,6 +1842,118 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): self.assertEqual(len(test_body), len(expected_body)) self.assertEqual(test_body, expected_body) + def test_PUT_with_footers(self): + # verify footers supplied by a footers callback being added to + # trailing metadata + segment_size = self.policy.ec_segment_size + test_body = ('asdf' * segment_size)[:-10] + etag = md5(test_body).hexdigest() + size = len(test_body) + codes = [201] * self.replicas() + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes', + 'X-Obj-Multiphase-Commit': 'yes' + } + + def do_test(footers_to_add, expect_added): + put_requests = defaultdict( + lambda: {'boundary': None, 'chunks': []}) + + def capture_body(conn, chunk): + put_requests[conn.connection_id]['chunks'].append(chunk) + + def capture_headers(ip, port, device, part, method, path, headers, + **kwargs): + conn_id = kwargs['connection_id'] + put_requests[conn_id]['boundary'] = headers[ + 'X-Backend-Obj-Multipart-Mime-Boundary'] + + def footers_callback(footers): + footers.update(footers_to_add) + env = {'swift.callback.update_footers': footers_callback} + req = swift.common.swob.Request.blank( + '/v1/a/c/o', method='PUT', environ=env, body=test_body) + + with set_http_connect(*codes, expect_headers=expect_headers, + give_send=capture_body, + give_connect=capture_headers): + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 201) + for 
connection_id, info in put_requests.items(): + body = unchunk_body(''.join(info['chunks'])) + # email.parser.FeedParser doesn't know how to take a multipart + # message and boundary together and parse it; it only knows how + # to take a string, parse the headers, and figure out the + # boundary on its own. + parser = email.parser.FeedParser() + parser.feed( + "Content-Type: multipart/nobodycares; boundary=%s\r\n\r\n" + % info['boundary']) + parser.feed(body) + message = parser.close() + + self.assertTrue(message.is_multipart()) # sanity check + mime_parts = message.get_payload() + self.assertEqual(len(mime_parts), 3) + obj_part, footer_part, commit_part = mime_parts + + # validate EC footer metadata - should always be present + self.assertEqual(footer_part['X-Document'], 'object metadata') + footer_metadata = json.loads(footer_part.get_payload()) + self.assertIsNotNone( + footer_metadata.pop('X-Object-Sysmeta-Ec-Frag-Index')) + expected = { + 'X-Object-Sysmeta-Ec-Scheme': + self.policy.ec_scheme_description, + 'X-Object-Sysmeta-Ec-Content-Length': str(size), + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Object-Sysmeta-Ec-Segment-Size': str(segment_size), + 'Etag': md5(obj_part.get_payload()).hexdigest()} + expected.update(expect_added) + for header, value in expected.items(): + self.assertIn(header, footer_metadata) + self.assertEqual(value, footer_metadata[header]) + footer_metadata.pop(header) + self.assertFalse(footer_metadata) + + # sanity check - middleware sets no footer, expect EC overrides + footers_to_add = {} + expect_added = { + 'X-Backend-Container-Update-Override-Size': str(size), + 'X-Backend-Container-Update-Override-Etag': etag} + do_test(footers_to_add, expect_added) + + # middleware cannot overwrite any EC sysmeta + footers_to_add = { + 'X-Object-Sysmeta-Ec-Content-Length': str(size + 1), + 'X-Object-Sysmeta-Ec-Etag': 'other etag', + 'X-Object-Sysmeta-Ec-Segment-Size': str(segment_size + 1), + 'X-Object-Sysmeta-Ec-Unused-But-Reserved': 'ignored'} + 
do_test(footers_to_add, expect_added) + + # middleware can add x-object-sysmeta- headers including + # x-object-sysmeta-container-update-override headers + footers_to_add = { + 'X-Object-Sysmeta-Foo': 'bar', + 'X-Object-Sysmeta-Container-Update-Override-Size': + str(size + 1), + 'X-Object-Sysmeta-Container-Update-Override-Etag': 'other etag', + 'X-Object-Sysmeta-Container-Update-Override-Ping': 'pong' + } + expect_added.update(footers_to_add) + do_test(footers_to_add, expect_added) + + # middleware can also overwrite x-backend-container-update-override + # headers + override_footers = { + 'X-Backend-Container-Update-Override-Wham': 'bam', + 'X-Backend-Container-Update-Override-Size': str(size + 2), + 'X-Backend-Container-Update-Override-Etag': 'another etag'} + footers_to_add.update(override_footers) + expect_added.update(override_footers) + do_test(footers_to_add, expect_added) + def test_PUT_old_obj_server(self): req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', body='') diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 6ae48bc605..f43ca5778e 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -2011,7 +2011,7 @@ class TestObjectController(unittest.TestCase): call_count[0] += 1 commit_confirmation = \ - 'swift.proxy.controllers.obj.ECPutter.send_commit_confirmation' + 'swift.proxy.controllers.obj.MIMEPutter.send_commit_confirmation' with mock.patch('swift.obj.server.md5', busted_md5_constructor), \ mock.patch(commit_confirmation, mock_committer): @@ -2062,7 +2062,7 @@ class TestObjectController(unittest.TestCase): read_footer = \ 'swift.obj.server.ObjectController._read_metadata_footer' commit_confirmation = \ - 'swift.proxy.controllers.obj.ECPutter.send_commit_confirmation' + 'swift.proxy.controllers.obj.MIMEPutter.send_commit_confirmation' with mock.patch(read_footer) as read_footer_call, \ mock.patch(commit_confirmation, mock_committer):