diff --git a/swift/common/swob.py b/swift/common/swob.py index 1c43316ba4..729cdd96fd 100644 --- a/swift/common/swob.py +++ b/swift/common/swob.py @@ -928,6 +928,10 @@ class Request(object): if entity_path is not None: return '/' + entity_path + @property + def is_chunked(self): + return 'chunked' in self.headers.get('transfer-encoding', '') + @property def url(self): "Provides the full url of the request" diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 1b9bcab61d..70b0d0cf68 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -52,12 +52,13 @@ from swift.common.http import ( HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_INTERNAL_SERVER_ERROR, HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE, HTTP_PRECONDITION_FAILED, HTTP_CONFLICT) +from swift.common.storage_policy import POLICIES from swift.proxy.controllers.base import Controller, delay_denial, \ cors_validation from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \ HTTPServerError, HTTPServiceUnavailable, Request, \ - HTTPClientDisconnect, HeaderKeyDict + HTTPClientDisconnect, HeaderKeyDict, HTTPException from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \ remove_items, copy_header_subset @@ -321,7 +322,13 @@ class ObjectController(Controller): def _connect_put_node(self, nodes, part, path, headers, logger_thread_locals): - """Method for a file PUT connect""" + """ + Make a connection for a replicated object. + + Connects to the first working node that it finds in node_iter + and sends over the request headers. Returns an HTTPConnection + object to handle the rest of the streaming. + """ self.app.logger.thread_locals = logger_thread_locals for node in nodes: try: @@ -350,36 +357,41 @@ class ObjectController(Controller): self.app.error_limit(node, _('ERROR Insufficient Storage')) elif is_server_error(resp.status): self.app.error_occurred( - node, _('ERROR %(status)d Expect: 100-continue ' - 'From Object Server') % { - 'status': resp.status}) + node, + _('ERROR %(status)d Expect: 100-continue ' + 'From Object Server') % { + 'status': resp.status}) except (Exception, Timeout): self.app.exception_occurred( node, _('Object'), _('Expect: 100-continue on %s') % path) + def _await_response(self, conn, **kwargs): + with Timeout(self.app.node_timeout): + if conn.resp: + return conn.resp + else: + return conn.getresponse() + + def _get_conn_response(self, conn, req, **kwargs): + try: + resp = self._await_response(conn, **kwargs) + return (conn, resp) + except (Exception, Timeout): + self.app.exception_occurred( + conn.node, _('Object'), + _('Trying to get final status of PUT to %s') % req.path) + return (None, None) + def _get_put_responses(self, req, conns, nodes): statuses = [] reasons = [] bodies = [] etags = set() - def get_conn_response(conn): - try: - with Timeout(self.app.node_timeout): - if conn.resp: - return (conn, conn.resp) - else: - return (conn, conn.getresponse()) - except (Exception, Timeout): - self.app.exception_occurred( - conn.node, _('Object'), - _('Trying to get final status of PUT to %s') % req.path) - return (None, None) - pile = GreenAsyncPile(len(conns)) for conn in conns: - pile.spawn(get_conn_response, conn) + pile.spawn(self._get_conn_response, conn, req) def _handle_response(conn, response): statuses.append(response.status) @@ -440,56 +452,135 @@ class ObjectController(Controller): return req, delete_at_container, delete_at_part, delete_at_nodes - @public - @cors_validation - @delay_denial - def PUT(self, req): - """HTTP PUT request handler.""" - if req.if_none_match is not None and '*' not in req.if_none_match: - # Sending an etag with if-none-match isn't currently supported - return HTTPBadRequest(request=req, content_type='text/plain', - body='If-None-Match only supports *') + def _handle_copy_request(self, req): + """ + This method handles copying objects based on values set in the headers + 'X-Copy-From' and 'X-Copy-From-Account' + + This method was added as part of the refactoring of the PUT method and + the functionality is expected to be moved to middleware + """ + if req.environ.get('swift.orig_req_method', req.method) != 'POST': + req.environ.setdefault('swift.log_info', []).append( + 'x-copy-from:%s' % req.headers['X-Copy-From']) + ver, acct, _rest = req.split_path(2, 3, True) + src_account_name = req.headers.get('X-Copy-From-Account', None) + if src_account_name: + src_account_name = check_account_format(req, src_account_name) + else: + src_account_name = acct + src_container_name, src_obj_name = check_copy_from_header(req) + source_header = '/%s/%s/%s/%s' % ( + ver, src_account_name, src_container_name, src_obj_name) + source_req = req.copy_get() + + # make sure the source request uses it's container_info + source_req.headers.pop('X-Backend-Storage-Policy-Index', None) + source_req.path_info = source_header + source_req.headers['X-Newest'] = 'true' + + orig_obj_name = self.object_name + orig_container_name = self.container_name + orig_account_name = self.account_name + sink_req = Request.blank(req.path_info, + environ=req.environ, headers=req.headers) + + self.object_name = src_obj_name + self.container_name = src_container_name + self.account_name = src_account_name + source_resp = self.GET(source_req) + + # This gives middlewares a way to change the source; for example, + # this lets you COPY a SLO manifest and have the new object be the + # concatenation of the segments (like what a GET request gives + # the client), not a copy of the manifest file. + hook = req.environ.get( + 'swift.copy_hook', + (lambda source_req, source_resp, sink_req: source_resp)) + source_resp = hook(source_req, source_resp, sink_req) + + # reset names + self.object_name = orig_obj_name + self.container_name = orig_container_name + self.account_name = orig_account_name + + if source_resp.status_int >= HTTP_MULTIPLE_CHOICES: + # this is a bit of ugly code, but I'm willing to live with it + # until copy request handling moves to middleware + return source_resp, None, None, None + if source_resp.content_length is None: + # This indicates a transfer-encoding: chunked source object, + # which currently only happens because there are more than + # CONTAINER_LISTING_LIMIT segments in a segmented object. In + # this case, we're going to refuse to do the server-side copy. + raise HTTPRequestEntityTooLarge(request=req) + if source_resp.content_length > constraints.MAX_FILE_SIZE: + raise HTTPRequestEntityTooLarge(request=req) + + data_source = iter(source_resp.app_iter) + sink_req.content_length = source_resp.content_length + sink_req.etag = source_resp.etag + + # we no longer need the X-Copy-From header + del sink_req.headers['X-Copy-From'] + if 'X-Copy-From-Account' in sink_req.headers: + del sink_req.headers['X-Copy-From-Account'] + if not req.content_type_manually_set: + sink_req.headers['Content-Type'] = \ + source_resp.headers['Content-Type'] + if config_true_value( + sink_req.headers.get('x-fresh-metadata', 'false')): + # post-as-copy: ignore new sysmeta, copy existing sysmeta + condition = lambda k: is_sys_meta('object', k) + remove_items(sink_req.headers, condition) + copy_header_subset(source_resp, sink_req, condition) + else: + # copy/update existing sysmeta and user meta + copy_headers_into(source_resp, sink_req) + copy_headers_into(req, sink_req) + + # copy over x-static-large-object for POSTs and manifest copies + if 'X-Static-Large-Object' in source_resp.headers and \ + req.params.get('multipart-manifest') == 'get': + sink_req.headers['X-Static-Large-Object'] = \ + source_resp.headers['X-Static-Large-Object'] + + req = sink_req + + def update_response(req, resp): + acct, path = source_resp.environ['PATH_INFO'].split('/', 3)[2:4] + resp.headers['X-Copied-From-Account'] = quote(acct) + resp.headers['X-Copied-From'] = quote(path) + if 'last-modified' in source_resp.headers: + resp.headers['X-Copied-From-Last-Modified'] = \ + source_resp.headers['last-modified'] + copy_headers_into(req, resp) + return resp + + # this is a bit of ugly code, but I'm willing to live with it + # until copy request handling moves to middleware + return None, req, data_source, update_response + + def _handle_object_versions(self, req): + """ + This method handles versionining of objects in containers that + have the feature enabled. + + When a new PUT request is sent, the proxy checks for previous versions + of that same object name. If found, it is copied to a different + container and the new version is stored in its place. + + This method was added as part of the PUT method refactoring and the + functionality is expected to be moved to middleware + """ container_info = self.container_info( self.account_name, self.container_name, req) policy_index = req.headers.get('X-Backend-Storage-Policy-Index', container_info['storage_policy']) obj_ring = self.app.get_object_ring(policy_index) - - # pass the policy index to storage nodes via req header - req.headers['X-Backend-Storage-Policy-Index'] = policy_index - container_partition = container_info['partition'] - containers = container_info['nodes'] - req.acl = container_info['write_acl'] - req.environ['swift_sync_key'] = container_info['sync_key'] - object_versions = container_info['versions'] - if 'swift.authorize' in req.environ: - aresp = req.environ['swift.authorize'](req) - if aresp: - return aresp - - if not containers: - return HTTPNotFound(request=req) - - # Sometimes the 'content-type' header exists, but is set to None. - content_type_manually_set = True - detect_content_type = \ - config_true_value(req.headers.get('x-detect-content-type')) - if detect_content_type or not req.headers.get('content-type'): - guessed_type, _junk = mimetypes.guess_type(req.path_info) - req.headers['Content-Type'] = guessed_type or \ - 'application/octet-stream' - if detect_content_type: - req.headers.pop('x-detect-content-type') - else: - content_type_manually_set = False - - error_response = check_object_creation(req, self.object_name) or \ - check_content_type(req) - if error_response: - return error_response - partition, nodes = obj_ring.get_nodes( self.account_name, self.container_name, self.object_name) + object_versions = container_info['versions'] # do a HEAD request for checking object versions if object_versions and not req.environ.get('swift_versioned_copy'): @@ -502,20 +593,6 @@ class ObjectController(Controller): hreq, _('Object'), obj_ring, partition, hreq.swift_entity_path) - # Used by container sync feature - if 'x-timestamp' in req.headers: - try: - req_timestamp = Timestamp(req.headers['X-Timestamp']) - except ValueError: - return HTTPBadRequest( - request=req, content_type='text/plain', - body='X-Timestamp should be a UNIX timestamp float value; ' - 'was %r' % req.headers['x-timestamp']) - req.headers['X-Timestamp'] = req_timestamp.internal - else: - req.headers['X-Timestamp'] = Timestamp(time.time()).internal - - if object_versions and not req.environ.get('swift_versioned_copy'): is_manifest = 'X-Object-Manifest' in req.headers or \ 'X-Object-Manifest' in hresp.headers if hresp.status_int != HTTP_NOT_FOUND and not is_manifest: @@ -543,120 +620,41 @@ class ObjectController(Controller): copy_resp = self.COPY(copy_req) if is_client_error(copy_resp.status_int): # missing container or bad permissions - return HTTPPreconditionFailed(request=req) + raise HTTPPreconditionFailed(request=req) elif not is_success(copy_resp.status_int): # could not copy the data, bail - return HTTPServiceUnavailable(request=req) + raise HTTPServiceUnavailable(request=req) - reader = req.environ['wsgi.input'].read - data_source = iter(lambda: reader(self.app.client_chunk_size), '') - source_header = req.headers.get('X-Copy-From') - source_resp = None - if source_header: - if req.environ.get('swift.orig_req_method', req.method) != 'POST': - req.environ.setdefault('swift.log_info', []).append( - 'x-copy-from:%s' % source_header) - ver, acct, _rest = req.split_path(2, 3, True) - src_account_name = req.headers.get('X-Copy-From-Account', None) - if src_account_name: - src_account_name = check_account_format(req, src_account_name) + def _update_content_type(self, req): + # Sometimes the 'content-type' header exists, but is set to None. + req.content_type_manually_set = True + detect_content_type = \ + config_true_value(req.headers.get('x-detect-content-type')) + if detect_content_type or not req.headers.get('content-type'): + guessed_type, _junk = mimetypes.guess_type(req.path_info) + req.headers['Content-Type'] = guessed_type or \ + 'application/octet-stream' + if detect_content_type: + req.headers.pop('x-detect-content-type') else: - src_account_name = acct - src_container_name, src_obj_name = check_copy_from_header(req) - source_header = '/%s/%s/%s/%s' % ( - ver, src_account_name, src_container_name, src_obj_name) - source_req = req.copy_get() + req.content_type_manually_set = False - # make sure the source request uses it's container_info - source_req.headers.pop('X-Backend-Storage-Policy-Index', None) - source_req.path_info = source_header - source_req.headers['X-Newest'] = 'true' - orig_obj_name = self.object_name - orig_container_name = self.container_name - orig_account_name = self.account_name - self.object_name = src_obj_name - self.container_name = src_container_name - self.account_name = src_account_name - sink_req = Request.blank(req.path_info, - environ=req.environ, headers=req.headers) - source_resp = self.GET(source_req) - - # This gives middlewares a way to change the source; for example, - # this lets you COPY a SLO manifest and have the new object be the - # concatenation of the segments (like what a GET request gives - # the client), not a copy of the manifest file. - hook = req.environ.get( - 'swift.copy_hook', - (lambda source_req, source_resp, sink_req: source_resp)) - source_resp = hook(source_req, source_resp, sink_req) - - if source_resp.status_int >= HTTP_MULTIPLE_CHOICES: - return source_resp - self.object_name = orig_obj_name - self.container_name = orig_container_name - self.account_name = orig_account_name - data_source = iter(source_resp.app_iter) - sink_req.content_length = source_resp.content_length - if sink_req.content_length is None: - # This indicates a transfer-encoding: chunked source object, - # which currently only happens because there are more than - # CONTAINER_LISTING_LIMIT segments in a segmented object. In - # this case, we're going to refuse to do the server-side copy. - return HTTPRequestEntityTooLarge(request=req) - if sink_req.content_length > constraints.MAX_FILE_SIZE: - return HTTPRequestEntityTooLarge(request=req) - sink_req.etag = source_resp.etag - - # we no longer need the X-Copy-From header - del sink_req.headers['X-Copy-From'] - if 'X-Copy-From-Account' in sink_req.headers: - del sink_req.headers['X-Copy-From-Account'] - if not content_type_manually_set: - sink_req.headers['Content-Type'] = \ - source_resp.headers['Content-Type'] - if config_true_value( - sink_req.headers.get('x-fresh-metadata', 'false')): - # post-as-copy: ignore new sysmeta, copy existing sysmeta - condition = lambda k: is_sys_meta('object', k) - remove_items(sink_req.headers, condition) - copy_header_subset(source_resp, sink_req, condition) - else: - # copy/update existing sysmeta and user meta - copy_headers_into(source_resp, sink_req) - copy_headers_into(req, sink_req) - - # copy over x-static-large-object for POSTs and manifest copies - if 'X-Static-Large-Object' in source_resp.headers and \ - req.params.get('multipart-manifest') == 'get': - sink_req.headers['X-Static-Large-Object'] = \ - source_resp.headers['X-Static-Large-Object'] - - req = sink_req - - req, delete_at_container, delete_at_part, \ - delete_at_nodes = self._config_obj_expiration(req) - - node_iter = GreenthreadSafeIterator( - self.iter_nodes_local_first(obj_ring, partition)) - pile = GreenPile(len(nodes)) - te = req.headers.get('transfer-encoding', '') - chunked = ('chunked' in te) - - outgoing_headers = self._backend_requests( - req, len(nodes), container_partition, containers, - delete_at_container, delete_at_part, delete_at_nodes) - - for nheaders in outgoing_headers: - # RFC2616:8.2.3 disallows 100-continue without a body - if (req.content_length > 0) or chunked: - nheaders['Expect'] = '100-continue' - pile.spawn(self._connect_put_node, node_iter, partition, - req.swift_entity_path, nheaders, - self.app.logger.thread_locals) - - conns = [conn for conn in pile if conn] - min_conns = quorum_size(len(nodes)) + def _update_x_timestamp(self, req): + # Used by container sync feature + if 'x-timestamp' in req.headers: + try: + req_timestamp = Timestamp(req.headers['X-Timestamp']) + except ValueError: + raise HTTPBadRequest( + request=req, content_type='text/plain', + body='X-Timestamp should be a UNIX timestamp float value; ' + 'was %r' % req.headers['x-timestamp']) + req.headers['X-Timestamp'] = req_timestamp.internal + else: + req.headers['X-Timestamp'] = Timestamp(time.time()).internal + return None + def _check_failure_put_connections(self, conns, req, nodes): if req.if_none_match is not None and '*' in req.if_none_match: statuses = [conn.resp.status for conn in conns if conn.resp] if HTTP_PRECONDITION_FAILED in statuses: @@ -664,7 +662,7 @@ class ObjectController(Controller): self.app.logger.debug( _('Object PUT returning 412, %(statuses)r'), {'statuses': statuses}) - return HTTPPreconditionFailed(request=req) + raise HTTPPreconditionFailed(request=req) if any(conn for conn in conns if conn.resp and conn.resp.status == HTTP_CONFLICT): @@ -675,14 +673,44 @@ class ObjectController(Controller): '%(req_timestamp)s <= %(timestamps)r'), {'req_timestamp': req.timestamp.internal, 'timestamps': ', '.join(timestamps)}) - return HTTPAccepted(request=req) + raise HTTPAccepted(request=req) + + min_conns = quorum_size(len(nodes)) + self._check_min_conn(req, conns, min_conns) + + def _get_put_connections(self, req, nodes, partition, outgoing_headers, + policy, expect): + """ + Establish connections to storage nodes for PUT request + """ + obj_ring = policy.object_ring + node_iter = GreenthreadSafeIterator( + self.iter_nodes_local_first(obj_ring, partition)) + pile = GreenPile(len(nodes)) + + for nheaders in outgoing_headers: + if expect: + nheaders['Expect'] = '100-continue' + pile.spawn(self._connect_put_node, node_iter, partition, + req.swift_entity_path, nheaders, + self.app.logger.thread_locals) + + conns = [conn for conn in pile if conn] + + return conns + + def _check_min_conn(self, req, conns, min_conns, msg=None): + msg = msg or 'Object PUT returning 503, %(conns)s/%(nodes)s ' \ + 'required connections' if len(conns) < min_conns: - self.app.logger.error( - _('Object PUT returning 503, %(conns)s/%(nodes)s ' - 'required connections'), - {'conns': len(conns), 'nodes': min_conns}) - return HTTPServiceUnavailable(request=req) + self.app.logger.error((msg), + {'conns': len(conns), 'nodes': min_conns}) + raise HTTPServiceUnavailable(request=req) + + def _transfer_data(self, req, data_source, conns, nodes): + min_conns = quorum_size(len(nodes)) + bytes_transferred = 0 try: with ContextPool(len(nodes)) as pool: @@ -695,48 +723,90 @@ class ObjectController(Controller): try: chunk = next(data_source) except StopIteration: - if chunked: + if req.is_chunked: for conn in conns: conn.queue.put('0\r\n\r\n') break bytes_transferred += len(chunk) if bytes_transferred > constraints.MAX_FILE_SIZE: - return HTTPRequestEntityTooLarge(request=req) + raise HTTPRequestEntityTooLarge(request=req) for conn in list(conns): if not conn.failed: conn.queue.put( '%x\r\n%s\r\n' % (len(chunk), chunk) - if chunked else chunk) + if req.is_chunked else chunk) else: + conn.close() conns.remove(conn) - if len(conns) < min_conns: - self.app.logger.error(_( - 'Object PUT exceptions during' - ' send, %(conns)s/%(nodes)s required connections'), - {'conns': len(conns), 'nodes': min_conns}) - return HTTPServiceUnavailable(request=req) + self._check_min_conn( + req, conns, min_conns, + msg='Object PUT exceptions during' + ' send, %(conns)s/%(nodes)s required connections') for conn in conns: if conn.queue.unfinished_tasks: conn.queue.join() conns = [conn for conn in conns if not conn.failed] + self._check_min_conn( + req, conns, min_conns, + msg='Object PUT exceptions after last send, ' + '%(conns)s/%(nodes)s required connections') except ChunkReadTimeout as err: self.app.logger.warn( _('ERROR Client read timeout (%ss)'), err.seconds) self.app.logger.increment('client_timeouts') - return HTTPRequestTimeout(request=req) + raise HTTPRequestTimeout(request=req) + except HTTPException: + raise except (Exception, Timeout): self.app.logger.exception( _('ERROR Exception causing client disconnect')) - return HTTPClientDisconnect(request=req) + raise HTTPClientDisconnect(request=req) if req.content_length and bytes_transferred < req.content_length: req.client_disconnect = True self.app.logger.warn( _('Client disconnected without sending enough data')) self.app.logger.increment('client_disconnects') - return HTTPClientDisconnect(request=req) + raise HTTPClientDisconnect(request=req) - statuses, reasons, bodies, etags = self._get_put_responses(req, conns, - nodes) + def _store_object(self, req, data_source, nodes, partition, + outgoing_headers): + """ + Store a replicated object. + + This method is responsible for establishing connection + with storage nodes and sending object to each one of those + nodes. After sending the data, the "best" reponse will be + returned based on statuses from all connections + """ + policy_idx = req.headers.get('X-Backend-Storage-Policy-Index') + policy = POLICIES.get_by_index(policy_idx) + if not nodes: + return HTTPNotFound() + + # RFC2616:8.2.3 disallows 100-continue without a body + if (req.content_length > 0) or req.is_chunked: + expect = True + else: + expect = False + conns = self._get_put_connections(req, nodes, partition, + outgoing_headers, policy, expect) + + try: + # check that a minimum number of connections were established and + # meet all the correct conditions set in the request + self._check_failure_put_connections(conns, req, nodes) + + # transfer data + self._transfer_data(req, data_source, conns, nodes) + + # get responses + statuses, reasons, bodies, etags = self._get_put_responses( + req, conns, nodes) + except HTTPException as resp: + return resp + finally: + for conn in conns: + conn.close() if len(etags) > 1: self.app.logger.error( @@ -745,18 +815,83 @@ class ObjectController(Controller): etag = etags.pop() if len(etags) else None resp = self.best_response(req, statuses, reasons, bodies, _('Object PUT'), etag=etag) - if source_header: - acct, path = source_header.split('/', 3)[2:4] - resp.headers['X-Copied-From-Account'] = quote(acct) - resp.headers['X-Copied-From'] = quote(path) - if 'last-modified' in source_resp.headers: - resp.headers['X-Copied-From-Last-Modified'] = \ - source_resp.headers['last-modified'] - copy_headers_into(req, resp) resp.last_modified = math.ceil( float(Timestamp(req.headers['X-Timestamp']))) return resp + @public + @cors_validation + @delay_denial + def PUT(self, req): + """HTTP PUT request handler.""" + if req.if_none_match is not None and '*' not in req.if_none_match: + # Sending an etag with if-none-match isn't currently supported + return HTTPBadRequest(request=req, content_type='text/plain', + body='If-None-Match only supports *') + container_info = self.container_info( + self.account_name, self.container_name, req) + policy_index = req.headers.get('X-Backend-Storage-Policy-Index', + container_info['storage_policy']) + obj_ring = self.app.get_object_ring(policy_index) + container_nodes = container_info['nodes'] + container_partition = container_info['partition'] + partition, nodes = obj_ring.get_nodes( + self.account_name, self.container_name, self.object_name) + + # pass the policy index to storage nodes via req header + req.headers['X-Backend-Storage-Policy-Index'] = policy_index + req.acl = container_info['write_acl'] + req.environ['swift_sync_key'] = container_info['sync_key'] + + # is request authorized + if 'swift.authorize' in req.environ: + aresp = req.environ['swift.authorize'](req) + if aresp: + return aresp + + if not container_info['nodes']: + return HTTPNotFound(request=req) + + # update content type in case it is missing + self._update_content_type(req) + + # check constraints on object name and request headers + error_response = check_object_creation(req, self.object_name) or \ + check_content_type(req) + if error_response: + return error_response + + self._update_x_timestamp(req) + + # check if versioning is enabled and handle copying previous version + self._handle_object_versions(req) + + # check if request is a COPY of an existing object + source_header = req.headers.get('X-Copy-From') + if source_header: + error_response, req, data_source, update_response = \ + self._handle_copy_request(req) + if error_response: + return error_response + else: + reader = req.environ['wsgi.input'].read + data_source = iter(lambda: reader(self.app.client_chunk_size), '') + update_response = lambda req, resp: resp + + # check if object is set to be automaticaly deleted (i.e. expired) + req, delete_at_container, delete_at_part, \ + delete_at_nodes = self._config_obj_expiration(req) + + # add special headers to be handled by storage nodes + outgoing_headers = self._backend_requests( + req, len(nodes), container_partition, container_nodes, + delete_at_container, delete_at_part, delete_at_nodes) + + # send object to storage nodes + resp = self._store_object( + req, data_source, nodes, partition, outgoing_headers) + return update_response(req, resp) + @public @cors_validation @delay_denial diff --git a/test/unit/__init__.py b/test/unit/__init__.py index 0e10d3bac0..da7212c987 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -733,6 +733,9 @@ def fake_http_connect(*code_iter, **kwargs): def getheader(self, name, default=None): return swob.HeaderKeyDict(self.getheaders()).get(name, default) + def close(self): + pass + timestamps_iter = iter(kwargs.get('timestamps') or ['1'] * len(code_iter)) etag_iter = iter(kwargs.get('etags') or [None] * len(code_iter)) if isinstance(kwargs.get('headers'), list): diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 6d5bf0ed54..1b3b2e06d4 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -993,7 +993,10 @@ class TestObjectController(unittest.TestCase): headers={'Content-Length': '0', 'Content-Type': 'text/plain'}) self.app.update_request(req) - res = method(req) + try: + res = method(req) + except HTTPException as res: + pass self.assertEquals(res.status_int, expected) # repeat test @@ -1003,7 +1006,10 @@ class TestObjectController(unittest.TestCase): headers={'Content-Length': '0', 'Content-Type': 'text/plain'}) self.app.update_request(req) - res = method(req) + try: + res = method(req) + except HTTPException as res: + pass self.assertEquals(res.status_int, expected) @unpatch_policies @@ -1734,7 +1740,10 @@ class TestObjectController(unittest.TestCase): req = Request.blank('/v1/a/c/o.jpg', {}) req.content_length = 0 self.app.update_request(req) - res = controller.PUT(req) + try: + res = controller.PUT(req) + except HTTPException as res: + pass expected = str(expected) self.assertEquals(res.status[:len(expected)], expected) test_status_map((200, 200, 201, 201, -1), 201) # connect exc @@ -1763,7 +1772,10 @@ class TestObjectController(unittest.TestCase): environ={'REQUEST_METHOD': 'PUT'}, body='some data') self.app.update_request(req) - res = controller.PUT(req) + try: + res = controller.PUT(req) + except HTTPException as res: + pass expected = str(expected) self.assertEquals(res.status[:len(expected)], expected) test_status_map((200, 200, 201, -1, 201), 201) @@ -1805,7 +1817,10 @@ class TestObjectController(unittest.TestCase): req = Request.blank('/v1/a/c/o.jpg', {}) req.content_length = 0 self.app.update_request(req) - res = controller.PUT(req) + try: + res = controller.PUT(req) + except HTTPException as res: + pass expected = str(expected) self.assertEquals(res.status[:len(str(expected))], str(expected)) @@ -3391,7 +3406,10 @@ class TestObjectController(unittest.TestCase): self.app.update_request(req) self.app.memcache.store = {} - resp = controller.PUT(req) + try: + resp = controller.PUT(req) + except HTTPException as resp: + pass self.assertEquals(resp.status_int, 413) def test_basic_COPY(self): @@ -3632,7 +3650,10 @@ class TestObjectController(unittest.TestCase): kwargs = dict(body=copy_from_obj_body) with self.controller_context(req, *status_list, **kwargs) as controller: - resp = controller.COPY(req) + try: + resp = controller.COPY(req) + except HTTPException as resp: + pass self.assertEquals(resp.status_int, 413) @_limit_max_file_size @@ -3656,7 +3677,10 @@ class TestObjectController(unittest.TestCase): kwargs = dict(body=copy_from_obj_body) with self.controller_context(req, *status_list, **kwargs) as controller: - resp = controller.COPY(req) + try: + resp = controller.COPY(req) + except HTTPException as resp: + pass self.assertEquals(resp.status_int, 413) def test_COPY_newest(self): @@ -3698,41 +3722,46 @@ class TestObjectController(unittest.TestCase): def test_COPY_delete_at(self): with save_globals(): - given_headers = {} + backend_requests = [] - def fake_connect_put_node(nodes, part, path, headers, - logger_thread_locals): - given_headers.update(headers) + def capture_requests(ipaddr, port, device, partition, method, path, + headers=None, query_string=None): + backend_requests.append((method, path, headers)) controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o') - controller._connect_put_node = fake_connect_put_node - set_http_connect(200, 200, 200, 200, 200, 201, 201, 201) + set_http_connect(200, 200, 200, 200, 200, 201, 201, 201, + give_connect=capture_requests) self.app.memcache.store = {} req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'COPY'}, headers={'Destination': '/c/o'}) self.app.update_request(req) - controller.COPY(req) - self.assertEquals(given_headers.get('X-Delete-At'), '9876543210') - self.assertTrue('X-Delete-At-Host' in given_headers) - self.assertTrue('X-Delete-At-Device' in given_headers) - self.assertTrue('X-Delete-At-Partition' in given_headers) - self.assertTrue('X-Delete-At-Container' in given_headers) + resp = controller.COPY(req) + self.assertEqual(201, resp.status_int) # sanity + for method, path, given_headers in backend_requests: + if method != 'PUT': + continue + self.assertEquals(given_headers.get('X-Delete-At'), + '9876543210') + self.assertTrue('X-Delete-At-Host' in given_headers) + self.assertTrue('X-Delete-At-Device' in given_headers) + self.assertTrue('X-Delete-At-Partition' in given_headers) + self.assertTrue('X-Delete-At-Container' in given_headers) def test_COPY_account_delete_at(self): with save_globals(): - given_headers = {} + backend_requests = [] - def fake_connect_put_node(nodes, part, path, headers, - logger_thread_locals): - given_headers.update(headers) + def capture_requests(ipaddr, port, device, partition, method, path, + headers=None, query_string=None): + backend_requests.append((method, path, headers)) controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o') - controller._connect_put_node = fake_connect_put_node - set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201) + set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201, + give_connect=capture_requests) self.app.memcache.store = {} req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'COPY'}, @@ -3740,12 +3769,17 @@ class TestObjectController(unittest.TestCase): 'Destination-Account': 'a1'}) self.app.update_request(req) - controller.COPY(req) - self.assertEquals(given_headers.get('X-Delete-At'), '9876543210') - self.assertTrue('X-Delete-At-Host' in given_headers) - self.assertTrue('X-Delete-At-Device' in given_headers) - self.assertTrue('X-Delete-At-Partition' in given_headers) - self.assertTrue('X-Delete-At-Container' in given_headers) + resp = controller.COPY(req) + self.assertEqual(201, resp.status_int) # sanity + for method, path, given_headers in backend_requests: + if method != 'PUT': + continue + self.assertEquals(given_headers.get('X-Delete-At'), + '9876543210') + self.assertTrue('X-Delete-At-Host' in given_headers) + self.assertTrue('X-Delete-At-Device' in given_headers) + self.assertTrue('X-Delete-At-Partition' in given_headers) + self.assertTrue('X-Delete-At-Container' in given_headers) def test_chunked_put(self): diff --git a/test/unit/proxy/test_sysmeta.py b/test/unit/proxy/test_sysmeta.py index c3b6731082..d80f2855e4 100644 --- a/test/unit/proxy/test_sysmeta.py +++ b/test/unit/proxy/test_sysmeta.py @@ -70,6 +70,9 @@ class FakeServerConnection(WSGIContext): def send(self, data): self.data += data + def close(self): + pass + def __call__(self, ipaddr, port, device, partition, method, path, headers=None, query_string=None): self.path = quote('/' + device + '/' + str(partition) + path)