From b1eda4aef8a228961d5aafe7e4fbd4e812d233ad Mon Sep 17 00:00:00 2001 From: Samuel Merritt Date: Tue, 16 Sep 2014 18:40:41 -0700 Subject: [PATCH] Allow sending object metadata after data This lets the proxy server send object metadata to the object server after the object data. This is necessary for EC, as it allows us to compute the etag of the object in the proxy server and still store it with the object. The wire format is a multipart MIME document. For sanity during a rolling upgrade, the multipart MIME document is only sent to the object server if it indicates, via 100 Continue header, that it knows how to consume it. Example 1 (new proxy, new obj server): proxy: PUT /p/a/c/o X-Backend-Obj-Metadata-Footer: yes obj: 100 Continue X-Obj-Metadata-Footer: yes proxy: --MIMEmimeMIMEmime... Example2: (new proxy, old obj server) proxy: PUT /p/a/c/o X-Backend-Obj-Metadata-Footer: yes obj: 100 Continue proxy: Co-Authored-By: Alistair Coles Co-Authored-By: Thiago da Silva Co-Authored-By: John Dickinson Co-Authored-By: Clay Gerrard Co-Authored-By: Tushar Gohad Co-Authored-By: Paul Luse Co-Authored-By: Christian Schwede Co-Authored-By: Yuan Zhou Change-Id: Id38f7e93e3473f19ff88123ae0501000ed9b2e89 --- swift/common/request_helpers.py | 28 +- swift/common/swob.py | 63 +- swift/obj/server.py | 275 +++++-- swift/obj/ssync_receiver.py | 17 +- swift/proxy/controllers/obj.py | 3 + test/unit/common/test_request_helpers.py | 81 +- test/unit/common/test_swob.py | 21 + test/unit/obj/test_server.py | 954 +++++++++++++++++++++-- test/unit/obj/test_ssync_receiver.py | 66 +- 9 files changed, 1333 insertions(+), 175 deletions(-) diff --git a/swift/common/request_helpers.py b/swift/common/request_helpers.py index 08e0ab5dc4..14b9fd8849 100644 --- a/swift/common/request_helpers.py +++ b/swift/common/request_helpers.py @@ -26,10 +26,12 @@ import time from contextlib import contextmanager from urllib import unquote from swift import gettext_ as _ +from swift.common.storage_policy import POLICIES from swift.common.constraints import FORMAT2CONTENT_TYPE from swift.common.exceptions import ListingIterError, SegmentError from swift.common.http import is_success -from swift.common.swob import HTTPBadRequest, HTTPNotAcceptable +from swift.common.swob import (HTTPBadRequest, HTTPNotAcceptable, + HTTPServiceUnavailable) from swift.common.utils import split_path, validate_device_partition from swift.common.wsgi import make_subrequest @@ -82,21 +84,27 @@ def get_listing_content_type(req): def get_name_and_placement(request, minsegs=1, maxsegs=None, rest_with_last=False): """ - Utility function to split and validate the request path and - storage_policy_index. The storage_policy_index is extracted from - the headers of the request and converted to an integer, and then the - args are passed through to :meth:`split_and_validate_path`. + Utility function to split and validate the request path and storage + policy. The storage policy index is extracted from the headers of + the request and converted to a StoragePolicy instance. The + remaining args are passed through to + :meth:`split_and_validate_path`. :returns: a list, result of :meth:`split_and_validate_path` with - storage_policy_index appended on the end - :raises: HTTPBadRequest + the BaseStoragePolicy instance appended on the end + :raises: HTTPServiceUnavailable if the path is invalid or no policy exists + with the extracted policy_index. """ - policy_idx = request.headers.get('X-Backend-Storage-Policy-Index', '0') - policy_idx = int(policy_idx) + policy_index = request.headers.get('X-Backend-Storage-Policy-Index') + policy = POLICIES.get_by_index(policy_index) + if not policy: + raise HTTPServiceUnavailable( + body=_("No policy with index %s") % policy_index, + request=request, content_type='text/plain') results = split_and_validate_path(request, minsegs=minsegs, maxsegs=maxsegs, rest_with_last=rest_with_last) - results.append(policy_idx) + results.append(policy) return results diff --git a/swift/common/swob.py b/swift/common/swob.py index 729cdd96fd..c2e3afb4e8 100644 --- a/swift/common/swob.py +++ b/swift/common/swob.py @@ -36,7 +36,7 @@ needs to change. """ from collections import defaultdict -from cStringIO import StringIO +from StringIO import StringIO import UserDict import time from functools import partial @@ -128,6 +128,20 @@ class _UTC(tzinfo): UTC = _UTC() +class WsgiStringIO(StringIO): + """ + This class adds support for the additional wsgi.input methods defined on + eventlet.wsgi.Input to the StringIO class which would otherwise be a fine + stand-in for the file-like object in the WSGI environment. + """ + + def set_hundred_continue_response_headers(self, headers): + pass + + def send_hundred_continue_response(self): + pass + + def _datetime_property(header): """ Set and retrieve the datetime value of self.headers[header] @@ -743,16 +757,16 @@ def _req_environ_property(environ_field): def _req_body_property(): """ Set and retrieve the Request.body parameter. It consumes wsgi.input and - returns the results. On assignment, uses a StringIO to create a new + returns the results. On assignment, uses a WsgiStringIO to create a new wsgi.input. """ def getter(self): body = self.environ['wsgi.input'].read() - self.environ['wsgi.input'] = StringIO(body) + self.environ['wsgi.input'] = WsgiStringIO(body) return body def setter(self, value): - self.environ['wsgi.input'] = StringIO(value) + self.environ['wsgi.input'] = WsgiStringIO(value) self.environ['CONTENT_LENGTH'] = str(len(value)) return property(getter, setter, doc="Get and set the request body str") @@ -820,7 +834,7 @@ class Request(object): :param path: encoded, parsed, and unquoted into PATH_INFO :param environ: WSGI environ dictionary :param headers: HTTP headers - :param body: stuffed in a StringIO and hung on wsgi.input + :param body: stuffed in a WsgiStringIO and hung on wsgi.input :param kwargs: any environ key with an property setter """ headers = headers or {} @@ -855,10 +869,10 @@ class Request(object): } env.update(environ) if body is not None: - env['wsgi.input'] = StringIO(body) + env['wsgi.input'] = WsgiStringIO(body) env['CONTENT_LENGTH'] = str(len(body)) elif 'wsgi.input' not in env: - env['wsgi.input'] = StringIO('') + env['wsgi.input'] = WsgiStringIO('') req = Request(env) for key, val in headers.iteritems(): req.headers[key] = val @@ -965,7 +979,7 @@ class Request(object): env.update({ 'REQUEST_METHOD': 'GET', 'CONTENT_LENGTH': '0', - 'wsgi.input': StringIO(''), + 'wsgi.input': WsgiStringIO(''), }) return Request(env) @@ -1102,10 +1116,12 @@ class Response(object): app_iter = _resp_app_iter_property() def __init__(self, body=None, status=200, headers=None, app_iter=None, - request=None, conditional_response=False, **kw): + request=None, conditional_response=False, + conditional_etag=None, **kw): self.headers = HeaderKeyDict( [('Content-Type', 'text/html; charset=UTF-8')]) self.conditional_response = conditional_response + self._conditional_etag = conditional_etag self.request = request self.body = body self.app_iter = app_iter @@ -1131,6 +1147,26 @@ class Response(object): if 'charset' in kw and 'content_type' in kw: self.charset = kw['charset'] + @property + def conditional_etag(self): + """ + The conditional_etag keyword argument for Response will allow the + conditional match value of a If-Match request to be compared to a + non-standard value. + + This is available for Storage Policies that do not store the client + object data verbatim on the storage nodes, but still need support + conditional requests. + + It's most effectively used with X-Backend-Etag-Is-At which would + define the additional Metadata key where the original ETag of the + clear-form client request data. + """ + if self._conditional_etag is not None: + return self._conditional_etag + else: + return self.etag + def _prepare_for_ranges(self, ranges): """ Prepare the Response for multiple ranges. @@ -1161,15 +1197,16 @@ class Response(object): return content_size, content_type def _response_iter(self, app_iter, body): + etag = self.conditional_etag if self.conditional_response and self.request: - if self.etag and self.request.if_none_match and \ - self.etag in self.request.if_none_match: + if etag and self.request.if_none_match and \ + etag in self.request.if_none_match: self.status = 304 self.content_length = 0 return [''] - if self.etag and self.request.if_match and \ - self.etag not in self.request.if_match: + if etag and self.request.if_match and \ + etag not in self.request.if_match: self.status = 412 self.content_length = 0 return [''] diff --git a/swift/obj/server.py b/swift/obj/server.py index f4dbb4264b..0ebaeb5f05 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -16,10 +16,12 @@ """ Object Server for Swift """ import cPickle as pickle +import json import os import multiprocessing import time import traceback +import rfc822 import socket import math from swift import gettext_ as _ @@ -30,7 +32,7 @@ from eventlet import sleep, wsgi, Timeout from swift.common.utils import public, get_logger, \ config_true_value, timing_stats, replication, \ normalize_delete_at_timestamp, get_log_line, Timestamp, \ - get_expirer_container + get_expirer_container, iter_multipart_mime_documents from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_object_creation, \ valid_timestamp, check_utf8 @@ -48,8 +50,35 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \ HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \ HTTPClientDisconnect, HTTPMethodNotAllowed, Request, Response, \ HTTPInsufficientStorage, HTTPForbidden, HTTPException, HeaderKeyDict, \ - HTTPConflict -from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileManager + HTTPConflict, HTTPServerError +from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileRouter + + +def iter_mime_headers_and_bodies(wsgi_input, mime_boundary, read_chunk_size): + mime_documents_iter = iter_multipart_mime_documents( + wsgi_input, mime_boundary, read_chunk_size) + + for file_like in mime_documents_iter: + hdrs = HeaderKeyDict(rfc822.Message(file_like, 0)) + yield (hdrs, file_like) + + +def drain(file_like, read_size, timeout): + """ + Read and discard any bytes from file_like. + + :param file_like: file-like object to read from + :param read_size: how big a chunk to read at a time + :param timeout: how long to wait for a read (use None for no timeout) + + :raises ChunkReadTimeout: if no chunk was read in time + """ + + while True: + with ChunkReadTimeout(timeout): + chunk = file_like.read(read_size) + if not chunk: + break class EventletPlungerString(str): @@ -142,7 +171,7 @@ class ObjectController(BaseStorageServer): # Common on-disk hierarchy shared across account, container and object # servers. - self._diskfile_mgr = DiskFileManager(conf, self.logger) + self._diskfile_router = DiskFileRouter(conf, self.logger) # This is populated by global_conf_callback way below as the semaphore # is shared by all workers. if 'replication_semaphore' in conf: @@ -156,7 +185,7 @@ class ObjectController(BaseStorageServer): conf.get('replication_failure_ratio') or 1.0) def get_diskfile(self, device, partition, account, container, obj, - policy_idx, **kwargs): + policy, **kwargs): """ Utility method for instantiating a DiskFile object supporting a given REST API. @@ -165,11 +194,11 @@ class ObjectController(BaseStorageServer): DiskFile class would simply over-ride this method to provide that behavior. """ - return self._diskfile_mgr.get_diskfile( - device, partition, account, container, obj, policy_idx, **kwargs) + return self._diskfile_router[policy].get_diskfile( + device, partition, account, container, obj, policy, **kwargs) def async_update(self, op, account, container, obj, host, partition, - contdevice, headers_out, objdevice, policy_index): + contdevice, headers_out, objdevice, policy): """ Sends or saves an async update. @@ -183,7 +212,7 @@ class ObjectController(BaseStorageServer): :param headers_out: dictionary of headers to send in the container request :param objdevice: device name that the object is in - :param policy_index: the associated storage policy index + :param policy: the associated BaseStoragePolicy instance """ headers_out['user-agent'] = 'object-server %s' % os.getpid() full_path = '/%s/%s/%s' % (account, container, obj) @@ -213,12 +242,11 @@ class ObjectController(BaseStorageServer): data = {'op': op, 'account': account, 'container': container, 'obj': obj, 'headers': headers_out} timestamp = headers_out['x-timestamp'] - self._diskfile_mgr.pickle_async_update(objdevice, account, container, - obj, data, timestamp, - policy_index) + self._diskfile_router[policy].pickle_async_update( + objdevice, account, container, obj, data, timestamp, policy) def container_update(self, op, account, container, obj, request, - headers_out, objdevice, policy_idx): + headers_out, objdevice, policy): """ Update the container when objects are updated. @@ -230,6 +258,7 @@ class ObjectController(BaseStorageServer): :param headers_out: dictionary of headers to send in the container request(s) :param objdevice: device name that the object is in + :param policy: the BaseStoragePolicy instance """ headers_in = request.headers conthosts = [h.strip() for h in @@ -255,14 +284,14 @@ class ObjectController(BaseStorageServer): headers_out['x-trans-id'] = headers_in.get('x-trans-id', '-') headers_out['referer'] = request.as_referer() - headers_out['X-Backend-Storage-Policy-Index'] = policy_idx + headers_out['X-Backend-Storage-Policy-Index'] = int(policy) for conthost, contdevice in updates: self.async_update(op, account, container, obj, conthost, contpartition, contdevice, headers_out, - objdevice, policy_idx) + objdevice, policy) def delete_at_update(self, op, delete_at, account, container, obj, - request, objdevice, policy_index): + request, objdevice, policy): """ Update the expiring objects container when objects are updated. @@ -273,7 +302,7 @@ class ObjectController(BaseStorageServer): :param obj: object name :param request: the original request driving the update :param objdevice: device name that the object is in - :param policy_index: the policy index to be used for tmp dir + :param policy: the BaseStoragePolicy instance (used for tmp dir) """ if config_true_value( request.headers.get('x-backend-replication', 'f')): @@ -333,13 +362,66 @@ class ObjectController(BaseStorageServer): op, self.expiring_objects_account, delete_at_container, '%s-%s/%s/%s' % (delete_at, account, container, obj), host, partition, contdevice, headers_out, objdevice, - policy_index) + policy) + + def _make_timeout_reader(self, file_like): + def timeout_reader(): + with ChunkReadTimeout(self.client_timeout): + return file_like.read(self.network_chunk_size) + return timeout_reader + + def _read_put_commit_message(self, mime_documents_iter): + rcvd_commit = False + try: + with ChunkReadTimeout(self.client_timeout): + commit_hdrs, commit_iter = next(mime_documents_iter) + if commit_hdrs.get('X-Document', None) == "put commit": + rcvd_commit = True + drain(commit_iter, self.network_chunk_size, self.client_timeout) + except ChunkReadTimeout: + raise HTTPClientDisconnect() + except StopIteration: + raise HTTPBadRequest(body="couldn't find PUT commit MIME doc") + return rcvd_commit + + def _read_metadata_footer(self, mime_documents_iter): + try: + with ChunkReadTimeout(self.client_timeout): + footer_hdrs, footer_iter = next(mime_documents_iter) + except ChunkReadTimeout: + raise HTTPClientDisconnect() + except StopIteration: + raise HTTPBadRequest(body="couldn't find footer MIME doc") + + timeout_reader = self._make_timeout_reader(footer_iter) + try: + footer_body = ''.join(iter(timeout_reader, '')) + except ChunkReadTimeout: + raise HTTPClientDisconnect() + + footer_md5 = footer_hdrs.get('Content-MD5') + if not footer_md5: + raise HTTPBadRequest(body="no Content-MD5 in footer") + if footer_md5 != md5(footer_body).hexdigest(): + raise HTTPUnprocessableEntity(body="footer MD5 mismatch") + + try: + return HeaderKeyDict(json.loads(footer_body)) + except ValueError: + raise HTTPBadRequest("invalid JSON for footer doc") + + def _check_container_override(self, update_headers, metadata): + for key, val in metadata.iteritems(): + override_prefix = 'x-backend-container-update-override-' + if key.lower().startswith(override_prefix): + override = key.lower().replace(override_prefix, 'x-') + update_headers[override] = val @public @timing_stats() def POST(self, request): """Handle HTTP POST requests for the Swift Object Server.""" - device, partition, account, container, obj, policy_idx = \ + device, partition, account, container, obj, policy = \ get_name_and_placement(request, 5, 5, True) req_timestamp = valid_timestamp(request) new_delete_at = int(request.headers.get('X-Delete-At') or 0) @@ -349,7 +431,7 @@ class ObjectController(BaseStorageServer): try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy_idx=policy_idx) + policy=policy) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -374,11 +456,11 @@ class ObjectController(BaseStorageServer): if orig_delete_at != new_delete_at: if new_delete_at: self.delete_at_update('PUT', new_delete_at, account, container, - obj, request, device, policy_idx) + obj, request, device, policy) if orig_delete_at: self.delete_at_update('DELETE', orig_delete_at, account, container, obj, request, device, - policy_idx) + policy) try: disk_file.write_metadata(metadata) except (DiskFileXattrNotSupported, DiskFileNoSpace): @@ -389,7 +471,7 @@ class ObjectController(BaseStorageServer): @timing_stats() def PUT(self, request): """Handle HTTP PUT requests for the Swift Object Server.""" - device, partition, account, container, obj, policy_idx = \ + device, partition, account, container, obj, policy = \ get_name_and_placement(request, 5, 5, True) req_timestamp = valid_timestamp(request) error_response = check_object_creation(request, obj) @@ -404,10 +486,22 @@ class ObjectController(BaseStorageServer): except ValueError as e: return HTTPBadRequest(body=str(e), request=request, content_type='text/plain') + + # In case of multipart-MIME put, the proxy sends a chunked request, + # but may let us know the real content length so we can verify that + # we have enough disk space to hold the object. + if fsize is None: + fsize = request.headers.get('X-Backend-Obj-Content-Length') + if fsize is not None: + try: + fsize = int(fsize) + except ValueError as e: + return HTTPBadRequest(body=str(e), request=request, + content_type='text/plain') try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy_idx=policy_idx) + policy=policy) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -439,13 +533,51 @@ class ObjectController(BaseStorageServer): with disk_file.create(size=fsize) as writer: upload_size = 0 - def timeout_reader(): - with ChunkReadTimeout(self.client_timeout): - return request.environ['wsgi.input'].read( - self.network_chunk_size) + # If the proxy wants to send us object metadata after the + # object body, it sets some headers. We have to tell the + # proxy, in the 100 Continue response, that we're able to + # parse a multipart MIME document and extract the object and + # metadata from it. If we don't, then the proxy won't + # actually send the footer metadata. + have_metadata_footer = False + use_multiphase_commit = False + mime_documents_iter = iter([]) + obj_input = request.environ['wsgi.input'] + hundred_continue_headers = [] + if config_true_value( + request.headers.get( + 'X-Backend-Obj-Multiphase-Commit')): + use_multiphase_commit = True + hundred_continue_headers.append( + ('X-Obj-Multiphase-Commit', 'yes')) + + if config_true_value( + request.headers.get('X-Backend-Obj-Metadata-Footer')): + have_metadata_footer = True + hundred_continue_headers.append( + ('X-Obj-Metadata-Footer', 'yes')) + + if have_metadata_footer or use_multiphase_commit: + obj_input.set_hundred_continue_response_headers( + hundred_continue_headers) + mime_boundary = request.headers.get( + 'X-Backend-Obj-Multipart-Mime-Boundary') + if not mime_boundary: + return HTTPBadRequest("no MIME boundary") + + try: + with ChunkReadTimeout(self.client_timeout): + mime_documents_iter = iter_mime_headers_and_bodies( + request.environ['wsgi.input'], + mime_boundary, self.network_chunk_size) + _junk_hdrs, obj_input = next(mime_documents_iter) + except ChunkReadTimeout: + return HTTPRequestTimeout(request=request) + + timeout_reader = self._make_timeout_reader(obj_input) try: - for chunk in iter(lambda: timeout_reader(), ''): + for chunk in iter(timeout_reader, ''): start_time = time.time() if start_time > upload_expiration: self.logger.increment('PUT.timeouts') @@ -461,9 +593,16 @@ class ObjectController(BaseStorageServer): upload_size) if fsize is not None and fsize != upload_size: return HTTPClientDisconnect(request=request) + + footer_meta = {} + if have_metadata_footer: + footer_meta = self._read_metadata_footer( + mime_documents_iter) + + request_etag = (footer_meta.get('etag') or + request.headers.get('etag', '')).lower() etag = etag.hexdigest() - if 'etag' in request.headers and \ - request.headers['etag'].lower() != etag: + if request_etag and request_etag != etag: return HTTPUnprocessableEntity(request=request) metadata = { 'X-Timestamp': request.timestamp.internal, @@ -473,6 +612,8 @@ class ObjectController(BaseStorageServer): } metadata.update(val for val in request.headers.iteritems() if is_sys_or_user_meta('object', val[0])) + metadata.update(val for val in footer_meta.iteritems() + if is_sys_or_user_meta('object', val[0])) headers_to_copy = ( request.headers.get( 'X-Backend-Replication-Headers', '').split() + @@ -482,39 +623,63 @@ class ObjectController(BaseStorageServer): header_caps = header_key.title() metadata[header_caps] = request.headers[header_key] writer.put(metadata) + + # if the PUT requires a two-phase commit (a data and a commit + # phase) send the proxy server another 100-continue response + # to indicate that we are finished writing object data + if use_multiphase_commit: + request.environ['wsgi.input'].\ + send_hundred_continue_response() + if not self._read_put_commit_message(mime_documents_iter): + return HTTPServerError(request=request) + # got 2nd phase confirmation, write a timestamp.durable + # state file to indicate a successful PUT + + writer.commit(request.timestamp) + + # Drain any remaining MIME docs from the socket. There + # shouldn't be any, but we must read the whole request body. + try: + while True: + with ChunkReadTimeout(self.client_timeout): + _junk_hdrs, _junk_body = next(mime_documents_iter) + drain(_junk_body, self.network_chunk_size, + self.client_timeout) + except ChunkReadTimeout: + raise HTTPClientDisconnect() + except StopIteration: + pass + except (DiskFileXattrNotSupported, DiskFileNoSpace): return HTTPInsufficientStorage(drive=device, request=request) if orig_delete_at != new_delete_at: if new_delete_at: self.delete_at_update( 'PUT', new_delete_at, account, container, obj, request, - device, policy_idx) + device, policy) if orig_delete_at: self.delete_at_update( 'DELETE', orig_delete_at, account, container, obj, - request, device, policy_idx) + request, device, policy) update_headers = HeaderKeyDict({ 'x-size': metadata['Content-Length'], 'x-content-type': metadata['Content-Type'], 'x-timestamp': metadata['X-Timestamp'], 'x-etag': metadata['ETag']}) # apply any container update header overrides sent with request - for key, val in request.headers.iteritems(): - override_prefix = 'x-backend-container-update-override-' - if key.lower().startswith(override_prefix): - override = key.lower().replace(override_prefix, 'x-') - update_headers[override] = val + self._check_container_override(update_headers, request.headers) + self._check_container_override(update_headers, footer_meta) self.container_update( 'PUT', account, container, obj, request, update_headers, - device, policy_idx) + device, policy) return HTTPCreated(request=request, etag=etag) @public @timing_stats() def GET(self, request): """Handle HTTP GET requests for the Swift Object Server.""" - device, partition, account, container, obj, policy_idx = \ + device, partition, account, container, obj, policy = \ get_name_and_placement(request, 5, 5, True) keep_cache = self.keep_cache_private or ( 'X-Auth-Token' not in request.headers and @@ -522,7 +687,7 @@ class ObjectController(BaseStorageServer): try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy_idx=policy_idx) + policy=policy) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -533,9 +698,14 @@ class ObjectController(BaseStorageServer): keep_cache = (self.keep_cache_private or ('X-Auth-Token' not in request.headers and 'X-Storage-Token' not in request.headers)) + conditional_etag = None + if 'X-Backend-Etag-Is-At' in request.headers: + conditional_etag = metadata.get( + request.headers['X-Backend-Etag-Is-At']) response = Response( app_iter=disk_file.reader(keep_cache=keep_cache), - request=request, conditional_response=True) + request=request, conditional_response=True, + conditional_etag=conditional_etag) response.headers['Content-Type'] = metadata.get( 'Content-Type', 'application/octet-stream') for key, value in metadata.iteritems(): @@ -567,12 +737,12 @@ class ObjectController(BaseStorageServer): @timing_stats(sample_rate=0.8) def HEAD(self, request): """Handle HTTP HEAD requests for the Swift Object Server.""" - device, partition, account, container, obj, policy_idx = \ + device, partition, account, container, obj, policy = \ get_name_and_placement(request, 5, 5, True) try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy_idx=policy_idx) + policy=policy) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -585,7 +755,12 @@ class ObjectController(BaseStorageServer): headers['X-Backend-Timestamp'] = e.timestamp.internal return HTTPNotFound(request=request, headers=headers, conditional_response=True) - response = Response(request=request, conditional_response=True) + conditional_etag = None + if 'X-Backend-Etag-Is-At' in request.headers: + conditional_etag = metadata.get( + request.headers['X-Backend-Etag-Is-At']) + response = Response(request=request, conditional_response=True, + conditional_etag=conditional_etag) response.headers['Content-Type'] = metadata.get( 'Content-Type', 'application/octet-stream') for key, value in metadata.iteritems(): @@ -609,13 +784,13 @@ class ObjectController(BaseStorageServer): @timing_stats() def DELETE(self, request): """Handle HTTP DELETE requests for the Swift Object Server.""" - device, partition, account, container, obj, policy_idx = \ + device, partition, account, container, obj, policy = \ get_name_and_placement(request, 5, 5, True) req_timestamp = valid_timestamp(request) try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy_idx=policy_idx) + policy=policy) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -667,13 +842,13 @@ class ObjectController(BaseStorageServer): if orig_delete_at: self.delete_at_update('DELETE', orig_delete_at, account, container, obj, request, device, - policy_idx) + policy) if orig_timestamp < req_timestamp: disk_file.delete(req_timestamp) self.container_update( 'DELETE', account, container, obj, request, HeaderKeyDict({'x-timestamp': req_timestamp.internal}), - device, policy_idx) + device, policy) return response_class( request=request, headers={'X-Backend-Timestamp': response_timestamp.internal}) @@ -694,7 +869,7 @@ class ObjectController(BaseStorageServer): get_name_and_placement(request, 2, 3, True) suffixes = suffix_parts.split('-') if suffix_parts else [] try: - hashes = self._diskfile_mgr.get_hashes( + hashes = self._diskfile_router[policy].get_hashes( device, partition, suffixes, policy) except DiskFileDeviceUnavailable: resp = HTTPInsufficientStorage(drive=device, request=request) diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py index 248715d006..99495cd48d 100644 --- a/swift/obj/ssync_receiver.py +++ b/swift/obj/ssync_receiver.py @@ -24,6 +24,7 @@ from swift.common import exceptions from swift.common import http from swift.common import swob from swift.common import utils +from swift.common import request_helpers class Receiver(object): @@ -98,7 +99,7 @@ class Receiver(object): if not self.app.replication_semaphore.acquire(False): raise swob.HTTPServiceUnavailable() try: - with self.app._diskfile_mgr.replication_lock(self.device): + with self.diskfile_mgr.replication_lock(self.device): for data in self.missing_check(): yield data for data in self.updates(): @@ -166,14 +167,14 @@ class Receiver(object): """ # The following is the setting we talk about above in _ensure_flush. self.request.environ['eventlet.minimum_write_chunk_size'] = 0 - self.device, self.partition = utils.split_path( - urllib.unquote(self.request.path), 2, 2, False) + self.device, self.partition, self.policy = \ + request_helpers.get_name_and_placement(self.request, 2, 2, False) self.policy_idx = \ int(self.request.headers.get('X-Backend-Storage-Policy-Index', 0)) utils.validate_device_partition(self.device, self.partition) - if self.app._diskfile_mgr.mount_check and \ - not constraints.check_mount( - self.app._diskfile_mgr.devices, self.device): + self.diskfile_mgr = self.app._diskfile_router[self.policy] + if self.diskfile_mgr.mount_check and not constraints.check_mount( + self.diskfile_mgr.devices, self.device): raise swob.HTTPInsufficientStorage(drive=self.device) self.fp = self.request.environ['wsgi.input'] for data in self._ensure_flush(): @@ -229,8 +230,8 @@ class Receiver(object): object_hash, timestamp = [urllib.unquote(v) for v in line.split()] want = False try: - df = self.app._diskfile_mgr.get_diskfile_from_hash( - self.device, self.partition, object_hash, self.policy_idx) + df = self.diskfile_mgr.get_diskfile_from_hash( + self.device, self.partition, object_hash, self.policy) except exceptions.DiskFileNotExist: want = True else: diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 70b0d0cf68..5407c0e734 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -882,6 +882,9 @@ class ObjectController(Controller): req, delete_at_container, delete_at_part, \ delete_at_nodes = self._config_obj_expiration(req) + # XXX hack for PUT to EC until the proxy learns how to encode + req.headers['X-Object-Sysmeta-Ec-Archive-Index'] = 0 + # add special headers to be handled by storage nodes outgoing_headers = self._backend_requests( req, len(nodes), container_partition, container_nodes, diff --git a/test/unit/common/test_request_helpers.py b/test/unit/common/test_request_helpers.py index c87a39979b..d2dc02c48b 100644 --- a/test/unit/common/test_request_helpers.py +++ b/test/unit/common/test_request_helpers.py @@ -16,10 +16,13 @@ """Tests for swift.common.request_helpers""" import unittest -from swift.common.swob import Request +from swift.common.swob import Request, HTTPException +from swift.common.storage_policy import POLICIES, EC_POLICY, REPL_POLICY from swift.common.request_helpers import is_sys_meta, is_user_meta, \ is_sys_or_user_meta, strip_sys_meta_prefix, strip_user_meta_prefix, \ - remove_items, copy_header_subset + remove_items, copy_header_subset, get_name_and_placement + +from test.unit import patch_policies server_types = ['account', 'container', 'object'] @@ -81,3 +84,77 @@ class TestRequestHelpers(unittest.TestCase): self.assertEqual(to_req.headers['A'], 'b') self.assertFalse('c' in to_req.headers) self.assertFalse('C' in to_req.headers) + + @patch_policies(with_ec_default=True) + def test_get_name_and_placement_object_req(self): + path = '/device/part/account/container/object' + req = Request.blank(path, headers={ + 'X-Backend-Storage-Policy-Index': '0'}) + device, part, account, container, obj, policy = \ + get_name_and_placement(req, 5, 5, True) + self.assertEqual(device, 'device') + self.assertEqual(part, 'part') + self.assertEqual(account, 'account') + self.assertEqual(container, 'container') + self.assertEqual(obj, 'object') + self.assertEqual(policy, POLICIES[0]) + self.assertEqual(policy.policy_type, EC_POLICY) + + req.headers['X-Backend-Storage-Policy-Index'] = 1 + device, part, account, container, obj, policy = \ + get_name_and_placement(req, 5, 5, True) + self.assertEqual(device, 'device') + self.assertEqual(part, 'part') + self.assertEqual(account, 'account') + self.assertEqual(container, 'container') + self.assertEqual(obj, 'object') + self.assertEqual(policy, POLICIES[1]) + self.assertEqual(policy.policy_type, REPL_POLICY) + + req.headers['X-Backend-Storage-Policy-Index'] = 'foo' + try: + device, part, account, container, obj, policy = \ + get_name_and_placement(req, 5, 5, True) + except HTTPException as e: + self.assertEqual(e.status_int, 503) + self.assertEqual(str(e), '503 Service Unavailable') + self.assertEqual(e.body, "No policy with index foo") + else: + self.fail('get_name_and_placement did not raise error ' + 'for invalid storage policy index') + + @patch_policies(with_ec_default=True) + def test_get_name_and_placement_object_replication(self): + # yup, suffixes are sent '-'.joined in the path + path = '/device/part/012-345-678-9ab-cde' + req = Request.blank(path, headers={ + 'X-Backend-Storage-Policy-Index': '0'}) + device, partition, suffix_parts, policy = \ + get_name_and_placement(req, 2, 3, True) + self.assertEqual(device, 'device') + self.assertEqual(partition, 'part') + self.assertEqual(suffix_parts, '012-345-678-9ab-cde') + self.assertEqual(policy, POLICIES[0]) + self.assertEqual(policy.policy_type, EC_POLICY) + + path = '/device/part' + req = Request.blank(path, headers={ + 'X-Backend-Storage-Policy-Index': '1'}) + device, partition, suffix_parts, policy = \ + get_name_and_placement(req, 2, 3, True) + self.assertEqual(device, 'device') + self.assertEqual(partition, 'part') + self.assertEqual(suffix_parts, None) # false-y + self.assertEqual(policy, POLICIES[1]) + self.assertEqual(policy.policy_type, REPL_POLICY) + + path = '/device/part/' # with a trailing slash + req = Request.blank(path, headers={ + 'X-Backend-Storage-Policy-Index': '1'}) + device, partition, suffix_parts, policy = \ + get_name_and_placement(req, 2, 3, True) + self.assertEqual(device, 'device') + self.assertEqual(partition, 'part') + self.assertEqual(suffix_parts, '') # still false-y + self.assertEqual(policy, POLICIES[1]) + self.assertEqual(policy.policy_type, REPL_POLICY) diff --git a/test/unit/common/test_swob.py b/test/unit/common/test_swob.py index fffb33ecf1..7015abb8eb 100644 --- a/test/unit/common/test_swob.py +++ b/test/unit/common/test_swob.py @@ -1553,6 +1553,17 @@ class TestConditionalIfMatch(unittest.TestCase): self.assertEquals(resp.status_int, 200) self.assertEquals(body, 'hi') + def test_simple_conditional_etag_match(self): + # if etag matches, proceed as normal + req = swift.common.swob.Request.blank( + '/', headers={'If-Match': 'not-the-etag'}) + resp = req.get_response(self.fake_app) + resp.conditional_response = True + resp._conditional_etag = 'not-the-etag' + body = ''.join(resp(req.environ, self.fake_start_response)) + self.assertEquals(resp.status_int, 200) + self.assertEquals(body, 'hi') + def test_quoted_simple_match(self): # double quotes or not, doesn't matter req = swift.common.swob.Request.blank( @@ -1573,6 +1584,16 @@ class TestConditionalIfMatch(unittest.TestCase): self.assertEquals(resp.status_int, 412) self.assertEquals(body, '') + def test_simple_conditional_etag_no_match(self): + req = swift.common.swob.Request.blank( + '/', headers={'If-Match': 'the-etag'}) + resp = req.get_response(self.fake_app) + resp.conditional_response = True + resp._conditional_etag = 'not-the-etag' + body = ''.join(resp(req.environ, self.fake_start_response)) + self.assertEquals(resp.status_int, 412) + self.assertEquals(body, '') + def test_match_star(self): # "*" means match anything; see RFC 2616 section 14.24 req = swift.common.swob.Request.blank( diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index ee0d364c92..cfb9fa281c 100755 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -18,6 +18,7 @@ import cPickle as pickle import datetime +import json import errno import operator import os @@ -39,17 +40,19 @@ from eventlet.green import httplib from nose import SkipTest from swift import __version__ as swift_version +from swift.common.http import is_success from test.unit import FakeLogger, debug_logger, mocked_http_conn from test.unit import connect_tcp, readuntil2crlfs, patch_policies from swift.obj import server as object_server from swift.obj import diskfile -from swift.common import utils, storage_policy, bufferedhttp +from swift.common import utils, bufferedhttp from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \ NullLogger, storage_directory, public, replication from swift.common import constraints -from swift.common.swob import Request, HeaderKeyDict +from swift.common.swob import Request, HeaderKeyDict, WsgiStringIO from swift.common.splice import splice -from swift.common.storage_policy import POLICIES +from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy, + POLICIES, EC_POLICY) from swift.common.exceptions import DiskFileDeviceUnavailable @@ -57,7 +60,14 @@ def mock_time(*args, **kwargs): return 5000.0 -@patch_policies +test_policies = [ + StoragePolicy(0, name='zero', is_default=True), + ECStoragePolicy(1, name='one', ec_type='jerasure_rs_vand', + ec_ndata=10, ec_nparity=4), +] + + +@patch_policies(test_policies) class TestObjectController(unittest.TestCase): """Test swift.obj.server.ObjectController""" @@ -733,6 +743,241 @@ class TestObjectController(unittest.TestCase): 'X-Object-Meta-1': 'One', 'X-Object-Meta-Two': 'Two'}) + def test_PUT_etag_in_footer(self): + timestamp = normalize_timestamp(time()) + req = Request.blank( + '/sda1/p/a/c/o', + headers={'X-Timestamp': timestamp, + 'Content-Type': 'text/plain', + 'Transfer-Encoding': 'chunked', + 'Etag': 'other-etag', + 'X-Backend-Obj-Metadata-Footer': 'yes', + 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary'}, + environ={'REQUEST_METHOD': 'PUT'}) + + obj_etag = md5("obj data").hexdigest() + footer_meta = json.dumps({"Etag": obj_etag}) + footer_meta_cksum = md5(footer_meta).hexdigest() + + req.body = "\r\n".join(( + "--boundary", + "", + "obj data", + "--boundary", + "Content-MD5: " + footer_meta_cksum, + "", + footer_meta, + "--boundary--", + )) + req.headers.pop("Content-Length", None) + + resp = req.get_response(self.object_controller) + self.assertEqual(resp.etag, obj_etag) + self.assertEqual(resp.status_int, 201) + + objfile = os.path.join( + self.testdir, 'sda1', + storage_directory(diskfile.get_data_dir(POLICIES[0]), 'p', + hash_path('a', 'c', 'o')), + utils.Timestamp(timestamp).internal + '.data') + with open(objfile) as fh: + self.assertEqual(fh.read(), "obj data") + + def test_PUT_etag_in_footer_mismatch(self): + timestamp = normalize_timestamp(time()) + req = Request.blank( + '/sda1/p/a/c/o', + headers={'X-Timestamp': timestamp, + 'Content-Type': 'text/plain', + 'Transfer-Encoding': 'chunked', + 'X-Backend-Obj-Metadata-Footer': 'yes', + 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary'}, + environ={'REQUEST_METHOD': 'PUT'}) + + footer_meta = json.dumps({"Etag": md5("green").hexdigest()}) + footer_meta_cksum = md5(footer_meta).hexdigest() + + req.body = "\r\n".join(( + "--boundary", + "", + "blue", + "--boundary", + "Content-MD5: " + footer_meta_cksum, + "", + footer_meta, + "--boundary--", + )) + req.headers.pop("Content-Length", None) + + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 422) + + def test_PUT_meta_in_footer(self): + timestamp = normalize_timestamp(time()) + req = Request.blank( + '/sda1/p/a/c/o', + headers={'X-Timestamp': timestamp, + 'Content-Type': 'text/plain', + 'Transfer-Encoding': 'chunked', + 'X-Object-Meta-X': 'Z', + 'X-Object-Sysmeta-X': 'Z', + 'X-Backend-Obj-Metadata-Footer': 'yes', + 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary'}, + environ={'REQUEST_METHOD': 'PUT'}) + + footer_meta = json.dumps({ + 'X-Object-Meta-X': 'Y', + 'X-Object-Sysmeta-X': 'Y', + }) + footer_meta_cksum = md5(footer_meta).hexdigest() + + req.body = "\r\n".join(( + "--boundary", + "", + "stuff stuff stuff", + "--boundary", + "Content-MD5: " + footer_meta_cksum, + "", + footer_meta, + "--boundary--", + )) + req.headers.pop("Content-Length", None) + + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 201) + + timestamp = normalize_timestamp(time()) + req = Request.blank( + '/sda1/p/a/c/o', + headers={'X-Timestamp': timestamp}, + environ={'REQUEST_METHOD': 'HEAD'}) + resp = req.get_response(self.object_controller) + self.assertEqual(resp.headers.get('X-Object-Meta-X'), 'Y') + self.assertEqual(resp.headers.get('X-Object-Sysmeta-X'), 'Y') + + def test_PUT_missing_footer_checksum(self): + timestamp = normalize_timestamp(time()) + req = Request.blank( + '/sda1/p/a/c/o', + headers={'X-Timestamp': timestamp, + 'Content-Type': 'text/plain', + 'Transfer-Encoding': 'chunked', + 'X-Backend-Obj-Metadata-Footer': 'yes', + 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary'}, + environ={'REQUEST_METHOD': 'PUT'}) + + footer_meta = json.dumps({"Etag": md5("obj data").hexdigest()}) + + req.body = "\r\n".join(( + "--boundary", + "", + "obj data", + "--boundary", + # no Content-MD5 + "", + footer_meta, + "--boundary--", + )) + req.headers.pop("Content-Length", None) + + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 400) + + def test_PUT_bad_footer_checksum(self): + timestamp = normalize_timestamp(time()) + req = Request.blank( + '/sda1/p/a/c/o', + headers={'X-Timestamp': timestamp, + 'Content-Type': 'text/plain', + 'Transfer-Encoding': 'chunked', + 'X-Backend-Obj-Metadata-Footer': 'yes', + 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary'}, + environ={'REQUEST_METHOD': 'PUT'}) + + footer_meta = json.dumps({"Etag": md5("obj data").hexdigest()}) + bad_footer_meta_cksum = md5(footer_meta + "bad").hexdigest() + + req.body = "\r\n".join(( + "--boundary", + "", + "obj data", + "--boundary", + "Content-MD5: " + bad_footer_meta_cksum, + "", + footer_meta, + "--boundary--", + )) + req.headers.pop("Content-Length", None) + + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 422) + + def test_PUT_bad_footer_json(self): + timestamp = normalize_timestamp(time()) + req = Request.blank( + '/sda1/p/a/c/o', + headers={'X-Timestamp': timestamp, + 'Content-Type': 'text/plain', + 'Transfer-Encoding': 'chunked', + 'X-Backend-Obj-Metadata-Footer': 'yes', + 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary'}, + environ={'REQUEST_METHOD': 'PUT'}) + + footer_meta = "{{{[[{{[{[[{[{[[{{{[{{{{[[{{[{[" + footer_meta_cksum = md5(footer_meta).hexdigest() + + req.body = "\r\n".join(( + "--boundary", + "", + "obj data", + "--boundary", + "Content-MD5: " + footer_meta_cksum, + "", + footer_meta, + "--boundary--", + )) + req.headers.pop("Content-Length", None) + + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 400) + + def test_PUT_extra_mime_docs_ignored(self): + timestamp = normalize_timestamp(time()) + req = Request.blank( + '/sda1/p/a/c/o', + headers={'X-Timestamp': timestamp, + 'Content-Type': 'text/plain', + 'Transfer-Encoding': 'chunked', + 'X-Backend-Obj-Metadata-Footer': 'yes', + 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary'}, + environ={'REQUEST_METHOD': 'PUT'}) + + footer_meta = json.dumps({'X-Object-Meta-Mint': 'pepper'}) + footer_meta_cksum = md5(footer_meta).hexdigest() + + req.body = "\r\n".join(( + "--boundary", + "", + "obj data", + "--boundary", + "Content-MD5: " + footer_meta_cksum, + "", + footer_meta, + "--boundary", + "This-Document-Is-Useless: yes", + "", + "blah blah I take up space", + "--boundary--" + )) + req.headers.pop("Content-Length", None) + + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 201) + + # swob made this into a StringIO for us + wsgi_input = req.environ['wsgi.input'] + self.assertEqual(wsgi_input.tell(), len(wsgi_input.getvalue())) + def test_PUT_user_metadata_no_xattr(self): timestamp = normalize_timestamp(time()) req = Request.blank( @@ -772,7 +1017,7 @@ class TestObjectController(unittest.TestCase): headers={'X-Timestamp': timestamp, 'Content-Type': 'text/plain', 'Content-Length': '6'}) - req.environ['wsgi.input'] = StringIO('VERIFY') + req.environ['wsgi.input'] = WsgiStringIO('VERIFY') resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 408) @@ -1021,6 +1266,40 @@ class TestObjectController(unittest.TestCase): finally: object_server.http_connect = old_http_connect + def test_PUT_durable_files(self): + for policy in POLICIES: + timestamp = utils.Timestamp(int(time())).internal + data_file_tail = '.data' + headers = {'X-Timestamp': timestamp, + 'Content-Length': '6', + 'Content-Type': 'application/octet-stream', + 'X-Backend-Storage-Policy-Index': int(policy)} + if policy.policy_type == EC_POLICY: + headers['X-Object-Sysmeta-Ec-Frag-Index'] = '2' + data_file_tail = '#2.data' + req = Request.blank( + '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers=headers) + req.body = 'VERIFY' + resp = req.get_response(self.object_controller) + + self.assertEquals(resp.status_int, 201) + obj_dir = os.path.join( + self.testdir, 'sda1', + storage_directory(diskfile.get_data_dir(int(policy)), + 'p', hash_path('a', 'c', 'o'))) + data_file = os.path.join(obj_dir, timestamp) + data_file_tail + self.assertTrue(os.path.isfile(data_file), + 'Expected file %r not found in %r for policy %r' + % (data_file, os.listdir(obj_dir), int(policy))) + durable_file = os.path.join(obj_dir, timestamp) + '.durable' + if policy.policy_type == EC_POLICY: + self.assertTrue(os.path.isfile(durable_file)) + self.assertFalse(os.path.getsize(durable_file)) + else: + self.assertFalse(os.path.isfile(durable_file)) + rmtree(obj_dir) + def test_HEAD(self): # Test swift.obj.server.ObjectController.HEAD req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'HEAD'}) @@ -1295,6 +1574,58 @@ class TestObjectController(unittest.TestCase): resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 412) + def test_GET_if_match_etag_is_at(self): + headers = { + 'X-Timestamp': utils.Timestamp(time()).internal, + 'Content-Type': 'application/octet-stream', + 'X-Object-Meta-Xtag': 'madeup', + } + req = Request.blank('/sda1/p/a/c/o', method='PUT', + headers=headers) + req.body = 'test' + resp = req.get_response(self.object_controller) + self.assertEquals(resp.status_int, 201) + real_etag = resp.etag + + # match x-backend-etag-is-at + req = Request.blank('/sda1/p/a/c/o', headers={ + 'If-Match': 'madeup', + 'X-Backend-Etag-Is-At': 'X-Object-Meta-Xtag'}) + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 200) + + # no match x-backend-etag-is-at + req = Request.blank('/sda1/p/a/c/o', headers={ + 'If-Match': real_etag, + 'X-Backend-Etag-Is-At': 'X-Object-Meta-Xtag'}) + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 412) + + # etag-is-at metadata doesn't exist, default to real etag + req = Request.blank('/sda1/p/a/c/o', headers={ + 'If-Match': real_etag, + 'X-Backend-Etag-Is-At': 'X-Object-Meta-Missing'}) + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 200) + + # sanity no-match with no etag-is-at + req = Request.blank('/sda1/p/a/c/o', headers={ + 'If-Match': 'madeup'}) + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 412) + + # sanity match with no etag-is-at + req = Request.blank('/sda1/p/a/c/o', headers={ + 'If-Match': real_etag}) + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 200) + + # sanity with no if-match + req = Request.blank('/sda1/p/a/c/o', headers={ + 'X-Backend-Etag-Is-At': 'X-Object-Meta-Xtag'}) + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 200) + def test_HEAD_if_match(self): req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={ @@ -2191,7 +2522,7 @@ class TestObjectController(unittest.TestCase): def test_call_bad_request(self): # Test swift.obj.server.ObjectController.__call__ - inbuf = StringIO() + inbuf = WsgiStringIO() errbuf = StringIO() outbuf = StringIO() @@ -2218,7 +2549,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(outbuf.getvalue()[:4], '400 ') def test_call_not_found(self): - inbuf = StringIO() + inbuf = WsgiStringIO() errbuf = StringIO() outbuf = StringIO() @@ -2245,7 +2576,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(outbuf.getvalue()[:4], '404 ') def test_call_bad_method(self): - inbuf = StringIO() + inbuf = WsgiStringIO() errbuf = StringIO() outbuf = StringIO() @@ -2281,7 +2612,7 @@ class TestObjectController(unittest.TestCase): with mock.patch("swift.obj.diskfile.hash_path", my_hash_path): with mock.patch("swift.obj.server.check_object_creation", my_check): - inbuf = StringIO() + inbuf = WsgiStringIO() errbuf = StringIO() outbuf = StringIO() @@ -2310,7 +2641,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(errbuf.getvalue(), '') self.assertEquals(outbuf.getvalue()[:4], '201 ') - inbuf = StringIO() + inbuf = WsgiStringIO() errbuf = StringIO() outbuf = StringIO() @@ -2461,6 +2792,9 @@ class TestObjectController(unittest.TestCase): return ' ' return '' + def set_hundred_continue_response_headers(*a, **kw): + pass + req = Request.blank( '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': SlowBody()}, @@ -2490,6 +2824,9 @@ class TestObjectController(unittest.TestCase): return ' ' return '' + def set_hundred_continue_response_headers(*a, **kw): + pass + req = Request.blank( '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': ShortBody()}, @@ -2572,10 +2909,13 @@ class TestObjectController(unittest.TestCase): 'user-agent': 'object-server %s' % os.getpid(), 'X-Backend-Storage-Policy-Index': int(policy)}]) - @patch_policies([storage_policy.StoragePolicy(0, 'zero', True), - storage_policy.StoragePolicy(1, 'one'), - storage_policy.StoragePolicy(37, 'fantastico')]) + @patch_policies([StoragePolicy(0, 'zero', True), + StoragePolicy(1, 'one'), + StoragePolicy(37, 'fantastico')]) def test_updating_multiple_delete_at_container_servers(self): + # update router post patch + self.object_controller._diskfile_router = diskfile.DiskFileRouter( + self.conf, self.object_controller.logger) policy = random.choice(list(POLICIES)) self.object_controller.expiring_objects_account = 'exp' self.object_controller.expiring_objects_container_divisor = 60 @@ -2691,10 +3031,13 @@ class TestObjectController(unittest.TestCase): 'X-Backend-Storage-Policy-Index': 0, 'x-trans-id': '-'})}) - @patch_policies([storage_policy.StoragePolicy(0, 'zero', True), - storage_policy.StoragePolicy(1, 'one'), - storage_policy.StoragePolicy(26, 'twice-thirteen')]) + @patch_policies([StoragePolicy(0, 'zero', True), + StoragePolicy(1, 'one'), + StoragePolicy(26, 'twice-thirteen')]) def test_updating_multiple_container_servers(self): + # update router post patch + self.object_controller._diskfile_router = diskfile.DiskFileRouter( + self.conf, self.object_controller.logger) http_connect_args = [] def fake_http_connect(ipaddr, port, device, partition, method, path, @@ -2807,6 +3150,8 @@ class TestObjectController(unittest.TestCase): 'X-Delete-At-Host': '10.0.0.2:6002', 'X-Delete-At-Device': 'sda1', 'X-Backend-Storage-Policy-Index': int(policy)} + if policy.policy_type == EC_POLICY: + headers['X-Object-Sysmeta-Ec-Frag-Index'] = '2' req = Request.blank( '/sda1/p/a/c/o', method='PUT', body='', headers=headers) with mocked_http_conn( @@ -3010,6 +3355,7 @@ class TestObjectController(unittest.TestCase): utils.HASH_PATH_PREFIX = _prefix def test_container_update_no_async_update(self): + policy = random.choice(list(POLICIES)) given_args = [] def fake_async_update(*args): @@ -3020,12 +3366,13 @@ class TestObjectController(unittest.TestCase): '/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': 1, - 'X-Trans-Id': '1234'}) + 'X-Trans-Id': '1234', + 'X-Backend-Storage-Policy-Index': int(policy)}) self.object_controller.container_update( 'PUT', 'a', 'c', 'o', req, { 'x-size': '0', 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e', 'x-content-type': 'text/plain', 'x-timestamp': '1'}, - 'sda1', 0) + 'sda1', policy) self.assertEquals(given_args, []) def test_container_update_success(self): @@ -3107,6 +3454,7 @@ class TestObjectController(unittest.TestCase): 'x-foo': 'bar'})) def test_container_update_async(self): + policy = random.choice(list(POLICIES)) req = Request.blank( '/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, @@ -3115,26 +3463,28 @@ class TestObjectController(unittest.TestCase): 'X-Container-Host': 'chost:cport', 'X-Container-Partition': 'cpartition', 'X-Container-Device': 'cdevice', - 'Content-Type': 'text/plain'}, body='') + 'Content-Type': 'text/plain', + 'X-Object-Sysmeta-Ec-Frag-Index': 0, + 'X-Backend-Storage-Policy-Index': int(policy)}, body='') given_args = [] def fake_pickle_async_update(*args): given_args[:] = args - self.object_controller._diskfile_mgr.pickle_async_update = \ - fake_pickle_async_update + diskfile_mgr = self.object_controller._diskfile_router[policy] + diskfile_mgr.pickle_async_update = fake_pickle_async_update with mocked_http_conn(500) as fake_conn: resp = req.get_response(self.object_controller) self.assertRaises(StopIteration, fake_conn.code_iter.next) self.assertEqual(resp.status_int, 201) self.assertEqual(len(given_args), 7) (objdevice, account, container, obj, data, timestamp, - policy_index) = given_args + policy) = given_args self.assertEqual(objdevice, 'sda1') self.assertEqual(account, 'a') self.assertEqual(container, 'c') self.assertEqual(obj, 'o') self.assertEqual(timestamp, utils.Timestamp(1).internal) - self.assertEqual(policy_index, 0) + self.assertEqual(policy, policy) self.assertEqual(data, { 'headers': HeaderKeyDict({ 'X-Size': '0', @@ -3143,7 +3493,7 @@ class TestObjectController(unittest.TestCase): 'X-Timestamp': utils.Timestamp(1).internal, 'X-Trans-Id': '123', 'Referer': 'PUT http://localhost/sda1/0/a/c/o', - 'X-Backend-Storage-Policy-Index': '0', + 'X-Backend-Storage-Policy-Index': int(policy), 'X-Etag': 'd41d8cd98f00b204e9800998ecf8427e'}), 'obj': 'o', 'account': 'a', @@ -3151,6 +3501,7 @@ class TestObjectController(unittest.TestCase): 'op': 'PUT'}) def test_container_update_bad_args(self): + policy = random.choice(list(POLICIES)) given_args = [] def fake_async_update(*args): @@ -3163,7 +3514,8 @@ class TestObjectController(unittest.TestCase): 'X-Trans-Id': '123', 'X-Container-Host': 'chost,badhost', 'X-Container-Partition': 'cpartition', - 'X-Container-Device': 'cdevice'}) + 'X-Container-Device': 'cdevice', + 'X-Backend-Storage-Policy-Index': int(policy)}) with mock.patch.object(self.object_controller, 'async_update', fake_async_update): self.object_controller.container_update( @@ -3171,7 +3523,7 @@ class TestObjectController(unittest.TestCase): 'x-size': '0', 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e', 'x-content-type': 'text/plain', 'x-timestamp': '1'}, - 'sda1', 0) + 'sda1', policy) self.assertEqual(given_args, []) errors = self.object_controller.logger.get_lines_for_level('error') self.assertEqual(len(errors), 1) @@ -3184,6 +3536,7 @@ class TestObjectController(unittest.TestCase): def test_delete_at_update_on_put(self): # Test how delete_at_update works when issued a delete for old # expiration info after a new put with no new expiration info. + policy = random.choice(list(POLICIES)) given_args = [] def fake_async_update(*args): @@ -3193,11 +3546,12 @@ class TestObjectController(unittest.TestCase): '/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': 1, - 'X-Trans-Id': '123'}) + 'X-Trans-Id': '123', + 'X-Backend-Storage-Policy-Index': int(policy)}) with mock.patch.object(self.object_controller, 'async_update', fake_async_update): self.object_controller.delete_at_update( - 'DELETE', 2, 'a', 'c', 'o', req, 'sda1', 0) + 'DELETE', 2, 'a', 'c', 'o', req, 'sda1', policy) self.assertEquals( given_args, [ 'DELETE', '.expiring_objects', '0000000000', @@ -3207,12 +3561,13 @@ class TestObjectController(unittest.TestCase): 'x-timestamp': utils.Timestamp('1').internal, 'x-trans-id': '123', 'referer': 'PUT http://localhost/v1/a/c/o'}), - 'sda1', 0]) + 'sda1', policy]) def test_delete_at_negative(self): # Test how delete_at_update works when issued a delete for old # expiration info after a new put with no new expiration info. # Test negative is reset to 0 + policy = random.choice(list(POLICIES)) given_args = [] def fake_async_update(*args): @@ -3223,23 +3578,26 @@ class TestObjectController(unittest.TestCase): '/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': 1, - 'X-Trans-Id': '1234'}) + 'X-Trans-Id': '1234', 'X-Backend-Storage-Policy-Index': + int(policy)}) self.object_controller.delete_at_update( - 'DELETE', -2, 'a', 'c', 'o', req, 'sda1', 0) + 'DELETE', -2, 'a', 'c', 'o', req, 'sda1', policy) self.assertEquals(given_args, [ 'DELETE', '.expiring_objects', '0000000000', '0000000000-a/c/o', None, None, None, HeaderKeyDict({ + # the expiring objects account is always 0 'X-Backend-Storage-Policy-Index': 0, 'x-timestamp': utils.Timestamp('1').internal, 'x-trans-id': '1234', 'referer': 'PUT http://localhost/v1/a/c/o'}), - 'sda1', 0]) + 'sda1', policy]) def test_delete_at_cap(self): # Test how delete_at_update works when issued a delete for old # expiration info after a new put with no new expiration info. # Test past cap is reset to cap + policy = random.choice(list(POLICIES)) given_args = [] def fake_async_update(*args): @@ -3250,9 +3608,10 @@ class TestObjectController(unittest.TestCase): '/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': 1, - 'X-Trans-Id': '1234'}) + 'X-Trans-Id': '1234', + 'X-Backend-Storage-Policy-Index': int(policy)}) self.object_controller.delete_at_update( - 'DELETE', 12345678901, 'a', 'c', 'o', req, 'sda1', 0) + 'DELETE', 12345678901, 'a', 'c', 'o', req, 'sda1', policy) expiring_obj_container = given_args.pop(2) expected_exp_cont = utils.get_expirer_container( utils.normalize_delete_at_timestamp(12345678901), @@ -3267,12 +3626,13 @@ class TestObjectController(unittest.TestCase): 'x-timestamp': utils.Timestamp('1').internal, 'x-trans-id': '1234', 'referer': 'PUT http://localhost/v1/a/c/o'}), - 'sda1', 0]) + 'sda1', policy]) def test_delete_at_update_put_with_info(self): # Keep next test, # test_delete_at_update_put_with_info_but_missing_container, in sync # with this one but just missing the X-Delete-At-Container header. + policy = random.choice(list(POLICIES)) given_args = [] def fake_async_update(*args): @@ -3287,14 +3647,16 @@ class TestObjectController(unittest.TestCase): 'X-Delete-At-Container': '0', 'X-Delete-At-Host': '127.0.0.1:1234', 'X-Delete-At-Partition': '3', - 'X-Delete-At-Device': 'sdc1'}) + 'X-Delete-At-Device': 'sdc1', + 'X-Backend-Storage-Policy-Index': int(policy)}) self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o', - req, 'sda1', 0) + req, 'sda1', policy) self.assertEquals( given_args, [ 'PUT', '.expiring_objects', '0000000000', '0000000002-a/c/o', '127.0.0.1:1234', '3', 'sdc1', HeaderKeyDict({ + # the .expiring_objects account is always policy-0 'X-Backend-Storage-Policy-Index': 0, 'x-size': '0', 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e', @@ -3302,11 +3664,12 @@ class TestObjectController(unittest.TestCase): 'x-timestamp': utils.Timestamp('1').internal, 'x-trans-id': '1234', 'referer': 'PUT http://localhost/v1/a/c/o'}), - 'sda1', 0]) + 'sda1', policy]) def test_delete_at_update_put_with_info_but_missing_container(self): # Same as previous test, test_delete_at_update_put_with_info, but just # missing the X-Delete-At-Container header. + policy = random.choice(list(POLICIES)) given_args = [] def fake_async_update(*args): @@ -3321,9 +3684,10 @@ class TestObjectController(unittest.TestCase): 'X-Trans-Id': '1234', 'X-Delete-At-Host': '127.0.0.1:1234', 'X-Delete-At-Partition': '3', - 'X-Delete-At-Device': 'sdc1'}) + 'X-Delete-At-Device': 'sdc1', + 'X-Backend-Storage-Policy-Index': int(policy)}) self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o', - req, 'sda1', 0) + req, 'sda1', policy) self.assertEquals( self.logger.get_lines_for_level('warning'), ['X-Delete-At-Container header must be specified for expiring ' @@ -3331,6 +3695,7 @@ class TestObjectController(unittest.TestCase): 'to the container name for now.']) def test_delete_at_update_delete(self): + policy = random.choice(list(POLICIES)) given_args = [] def fake_async_update(*args): @@ -3341,9 +3706,10 @@ class TestObjectController(unittest.TestCase): '/v1/a/c/o', environ={'REQUEST_METHOD': 'DELETE'}, headers={'X-Timestamp': 1, - 'X-Trans-Id': '1234'}) + 'X-Trans-Id': '1234', + 'X-Backend-Storage-Policy-Index': int(policy)}) self.object_controller.delete_at_update('DELETE', 2, 'a', 'c', 'o', - req, 'sda1', 0) + req, 'sda1', policy) self.assertEquals( given_args, [ 'DELETE', '.expiring_objects', '0000000000', @@ -3353,11 +3719,12 @@ class TestObjectController(unittest.TestCase): 'x-timestamp': utils.Timestamp('1').internal, 'x-trans-id': '1234', 'referer': 'DELETE http://localhost/v1/a/c/o'}), - 'sda1', 0]) + 'sda1', policy]) def test_delete_backend_replication(self): # If X-Backend-Replication: True delete_at_update should completely # short-circuit. + policy = random.choice(list(POLICIES)) given_args = [] def fake_async_update(*args): @@ -3369,12 +3736,14 @@ class TestObjectController(unittest.TestCase): environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': 1, 'X-Trans-Id': '1234', - 'X-Backend-Replication': 'True'}) + 'X-Backend-Replication': 'True', + 'X-Backend-Storage-Policy-Index': int(policy)}) self.object_controller.delete_at_update( - 'DELETE', -2, 'a', 'c', 'o', req, 'sda1', 0) + 'DELETE', -2, 'a', 'c', 'o', req, 'sda1', policy) self.assertEquals(given_args, []) def test_POST_calls_delete_at(self): + policy = random.choice(list(POLICIES)) given_args = [] def fake_delete_at_update(*args): @@ -3386,7 +3755,9 @@ class TestObjectController(unittest.TestCase): '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': normalize_timestamp(time()), 'Content-Length': '4', - 'Content-Type': 'application/octet-stream'}) + 'Content-Type': 'application/octet-stream', + 'X-Backend-Storage-Policy-Index': int(policy), + 'X-Object-Sysmeta-Ec-Frag-Index': 2}) req.body = 'TEST' resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 201) @@ -3397,7 +3768,8 @@ class TestObjectController(unittest.TestCase): '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': normalize_timestamp(time()), - 'Content-Type': 'application/x-test'}) + 'Content-Type': 'application/x-test', + 'X-Backend-Storage-Policy-Index': int(policy)}) resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 202) self.assertEquals(given_args, []) @@ -3410,13 +3782,14 @@ class TestObjectController(unittest.TestCase): environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': timestamp1, 'Content-Type': 'application/x-test', - 'X-Delete-At': delete_at_timestamp1}) + 'X-Delete-At': delete_at_timestamp1, + 'X-Backend-Storage-Policy-Index': int(policy)}) resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 202) self.assertEquals( given_args, [ 'PUT', int(delete_at_timestamp1), 'a', 'c', 'o', - given_args[5], 'sda1', 0]) + given_args[5], 'sda1', policy]) while given_args: given_args.pop() @@ -3429,17 +3802,19 @@ class TestObjectController(unittest.TestCase): environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': timestamp2, 'Content-Type': 'application/x-test', - 'X-Delete-At': delete_at_timestamp2}) + 'X-Delete-At': delete_at_timestamp2, + 'X-Backend-Storage-Policy-Index': int(policy)}) resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 202) self.assertEquals( given_args, [ 'PUT', int(delete_at_timestamp2), 'a', 'c', 'o', - given_args[5], 'sda1', 0, + given_args[5], 'sda1', policy, 'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o', - given_args[5], 'sda1', 0]) + given_args[5], 'sda1', policy]) def test_PUT_calls_delete_at(self): + policy = random.choice(list(POLICIES)) given_args = [] def fake_delete_at_update(*args): @@ -3451,7 +3826,9 @@ class TestObjectController(unittest.TestCase): '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': normalize_timestamp(time()), 'Content-Length': '4', - 'Content-Type': 'application/octet-stream'}) + 'Content-Type': 'application/octet-stream', + 'X-Backend-Storage-Policy-Index': int(policy), + 'X-Object-Sysmeta-Ec-Frag-Index': 4}) req.body = 'TEST' resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 201) @@ -3465,14 +3842,16 @@ class TestObjectController(unittest.TestCase): headers={'X-Timestamp': timestamp1, 'Content-Length': '4', 'Content-Type': 'application/octet-stream', - 'X-Delete-At': delete_at_timestamp1}) + 'X-Delete-At': delete_at_timestamp1, + 'X-Backend-Storage-Policy-Index': int(policy), + 'X-Object-Sysmeta-Ec-Frag-Index': 3}) req.body = 'TEST' resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 201) self.assertEquals( given_args, [ 'PUT', int(delete_at_timestamp1), 'a', 'c', 'o', - given_args[5], 'sda1', 0]) + given_args[5], 'sda1', policy]) while given_args: given_args.pop() @@ -3486,16 +3865,18 @@ class TestObjectController(unittest.TestCase): headers={'X-Timestamp': timestamp2, 'Content-Length': '4', 'Content-Type': 'application/octet-stream', - 'X-Delete-At': delete_at_timestamp2}) + 'X-Delete-At': delete_at_timestamp2, + 'X-Backend-Storage-Policy-Index': int(policy), + 'X-Object-Sysmeta-Ec-Frag-Index': 3}) req.body = 'TEST' resp = req.get_response(self.object_controller) self.assertEquals(resp.status_int, 201) self.assertEquals( given_args, [ 'PUT', int(delete_at_timestamp2), 'a', 'c', 'o', - given_args[5], 'sda1', 0, + given_args[5], 'sda1', policy, 'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o', - given_args[5], 'sda1', 0]) + given_args[5], 'sda1', policy]) def test_GET_but_expired(self): test_time = time() + 10000 @@ -3917,7 +4298,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 201) self.assertEquals(given_args, [ 'PUT', int(delete_at_timestamp1), 'a', 'c', 'o', - given_args[5], 'sda1', 0]) + given_args[5], 'sda1', POLICIES[0]]) while given_args: given_args.pop() @@ -3933,7 +4314,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 204) self.assertEquals(given_args, [ 'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o', - given_args[5], 'sda1', 0]) + given_args[5], 'sda1', POLICIES[0]]) def test_PUT_delete_at_in_past(self): req = Request.blank( @@ -4132,7 +4513,7 @@ class TestObjectController(unittest.TestCase): def test_correct_allowed_method(self): # Test correct work for allowed method using # swift.obj.server.ObjectController.__call__ - inbuf = StringIO() + inbuf = WsgiStringIO() errbuf = StringIO() outbuf = StringIO() self.object_controller = object_server.app_factory( @@ -4170,7 +4551,7 @@ class TestObjectController(unittest.TestCase): def test_not_allowed_method(self): # Test correct work for NOT allowed method using # swift.obj.server.ObjectController.__call__ - inbuf = StringIO() + inbuf = WsgiStringIO() errbuf = StringIO() outbuf = StringIO() self.object_controller = object_server.ObjectController( @@ -4253,7 +4634,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(outbuf.getvalue()[:4], '405 ') def test_not_utf8_and_not_logging_requests(self): - inbuf = StringIO() + inbuf = WsgiStringIO() errbuf = StringIO() outbuf = StringIO() self.object_controller = object_server.ObjectController( @@ -4291,7 +4672,7 @@ class TestObjectController(unittest.TestCase): self.assertEqual(self.logger.get_lines_for_level('info'), []) def test__call__returns_500(self): - inbuf = StringIO() + inbuf = WsgiStringIO() errbuf = StringIO() outbuf = StringIO() self.logger = debug_logger('test') @@ -4337,7 +4718,7 @@ class TestObjectController(unittest.TestCase): self.assertEqual(self.logger.get_lines_for_level('info'), []) def test_PUT_slow(self): - inbuf = StringIO() + inbuf = WsgiStringIO() errbuf = StringIO() outbuf = StringIO() self.object_controller = object_server.ObjectController( @@ -4398,9 +4779,12 @@ class TestObjectController(unittest.TestCase): ['1.2.3.4 - - [01/Jan/1970:02:46:41 +0000] "HEAD /sda1/p/a/c/o" ' '404 - "-" "-" "-" 2.0000 "-" 1234 -']) - @patch_policies([storage_policy.StoragePolicy(0, 'zero', True), - storage_policy.StoragePolicy(1, 'one', False)]) + @patch_policies([StoragePolicy(0, 'zero', True), + StoragePolicy(1, 'one', False)]) def test_dynamic_datadir(self): + # update router post patch + self.object_controller._diskfile_router = diskfile.DiskFileRouter( + self.conf, self.object_controller.logger) timestamp = normalize_timestamp(time()) req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': timestamp, @@ -4434,8 +4818,50 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 201) self.assertTrue(os.path.isdir(object_dir)) + def test_storage_policy_index_is_validated(self): + # sanity check that index for existing policy is ok + ts = (utils.Timestamp(t).internal for t in + itertools.count(int(time()))) + methods = ('PUT', 'POST', 'GET', 'HEAD', 'REPLICATE', 'DELETE') + valid_indices = sorted([int(policy) for policy in POLICIES]) + for index in valid_indices: + object_dir = self.testdir + "/sda1/objects" + if index > 0: + object_dir = "%s-%s" % (object_dir, index) + self.assertFalse(os.path.isdir(object_dir)) + for method in methods: + headers = { + 'X-Timestamp': ts.next(), + 'Content-Type': 'application/x-test', + 'X-Backend-Storage-Policy-Index': index} + if POLICIES[index].policy_type == EC_POLICY: + headers['X-Object-Sysmeta-Ec-Frag-Index'] = '2' + req = Request.blank( + '/sda1/p/a/c/o', + environ={'REQUEST_METHOD': method}, + headers=headers) + req.body = 'VERIFY' + resp = req.get_response(self.object_controller) + self.assertTrue(is_success(resp.status_int), + '%s method failed: %r' % (method, resp.status)) -@patch_policies + # index for non-existent policy should return 503 + index = valid_indices[-1] + 1 + for method in methods: + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': method}, + headers={ + 'X-Timestamp': ts.next(), + 'Content-Type': 'application/x-test', + 'X-Backend-Storage-Policy-Index': index}) + req.body = 'VERIFY' + object_dir = self.testdir + "/sda1/objects-%s" % index + resp = req.get_response(self.object_controller) + self.assertEquals(resp.status_int, 503) + self.assertFalse(os.path.isdir(object_dir)) + + +@patch_policies(test_policies) class TestObjectServer(unittest.TestCase): def setUp(self): @@ -4447,13 +4873,13 @@ class TestObjectServer(unittest.TestCase): for device in ('sda1', 'sdb1'): os.makedirs(os.path.join(self.devices, device)) - conf = { + self.conf = { 'devices': self.devices, 'swift_dir': self.tempdir, 'mount_check': 'false', } self.logger = debug_logger('test-object-server') - app = object_server.ObjectController(conf, logger=self.logger) + app = object_server.ObjectController(self.conf, logger=self.logger) sock = listen(('127.0.0.1', 0)) self.server = spawn(wsgi.server, sock, app, utils.NullLogger()) self.port = sock.getsockname()[1] @@ -4486,6 +4912,23 @@ class TestObjectServer(unittest.TestCase): resp.read() resp.close() + def test_expect_on_put_footer(self): + test_body = 'test' + headers = { + 'Expect': '100-continue', + 'Content-Length': len(test_body), + 'X-Timestamp': utils.Timestamp(time()).internal, + 'X-Backend-Obj-Metadata-Footer': 'yes', + 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123', + } + conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0', + 'PUT', '/a/c/o', headers=headers) + resp = conn.getexpect() + self.assertEqual(resp.status, 100) + headers = HeaderKeyDict(resp.getheaders()) + self.assertEqual(headers['X-Obj-Metadata-Footer'], 'yes') + resp.close() + def test_expect_on_put_conflict(self): test_body = 'test' put_timestamp = utils.Timestamp(time()) @@ -4514,6 +4957,377 @@ class TestObjectServer(unittest.TestCase): resp.read() resp.close() + def test_multiphase_put_no_mime_boundary(self): + test_data = 'obj data' + put_timestamp = utils.Timestamp(time()).internal + headers = { + 'Content-Type': 'text/plain', + 'X-Timestamp': put_timestamp, + 'Transfer-Encoding': 'chunked', + 'Expect': '100-continue', + 'X-Backend-Obj-Content-Length': len(test_data), + 'X-Backend-Obj-Multiphase-Commit': 'yes', + } + conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0', + 'PUT', '/a/c/o', headers=headers) + resp = conn.getexpect() + self.assertEqual(resp.status, 400) + resp.read() + resp.close() + + def test_expect_on_multiphase_put(self): + test_data = 'obj data' + test_doc = "\r\n".join(( + "--boundary123", + "X-Document: object body", + "", + test_data, + "--boundary123", + )) + + put_timestamp = utils.Timestamp(time()).internal + headers = { + 'Content-Type': 'text/plain', + 'X-Timestamp': put_timestamp, + 'Transfer-Encoding': 'chunked', + 'Expect': '100-continue', + 'X-Backend-Obj-Content-Length': len(test_data), + 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123', + 'X-Backend-Obj-Multiphase-Commit': 'yes', + } + conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0', + 'PUT', '/a/c/o', headers=headers) + resp = conn.getexpect() + self.assertEqual(resp.status, 100) + headers = HeaderKeyDict(resp.getheaders()) + self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes') + + to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc) + conn.send(to_send) + + # verify 100-continue response to mark end of phase1 + resp = conn.getexpect() + self.assertEqual(resp.status, 100) + resp.close() + + def test_multiphase_put_metadata_footer(self): + # Test 2-phase commit conversation - end of 1st phase marked + # by 100-continue response from the object server, with a + # successful 2nd phase marked by the presence of a .durable + # file along with .data file in the object data directory + test_data = 'obj data' + footer_meta = { + "X-Object-Sysmeta-Ec-Frag-Index": "2", + "Etag": md5(test_data).hexdigest(), + } + footer_json = json.dumps(footer_meta) + footer_meta_cksum = md5(footer_json).hexdigest() + test_doc = "\r\n".join(( + "--boundary123", + "X-Document: object body", + "", + test_data, + "--boundary123", + "X-Document: object metadata", + "Content-MD5: " + footer_meta_cksum, + "", + footer_json, + "--boundary123", + )) + + # phase1 - PUT request with object metadata in footer and + # multiphase commit conversation + put_timestamp = utils.Timestamp(time()).internal + headers = { + 'Content-Type': 'text/plain', + 'X-Timestamp': put_timestamp, + 'Transfer-Encoding': 'chunked', + 'Expect': '100-continue', + 'X-Backend-Storage-Policy-Index': '1', + 'X-Backend-Obj-Content-Length': len(test_data), + 'X-Backend-Obj-Metadata-Footer': 'yes', + 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123', + 'X-Backend-Obj-Multiphase-Commit': 'yes', + } + conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0', + 'PUT', '/a/c/o', headers=headers) + resp = conn.getexpect() + self.assertEqual(resp.status, 100) + headers = HeaderKeyDict(resp.getheaders()) + self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes') + self.assertEqual(headers['X-Obj-Metadata-Footer'], 'yes') + + to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc) + conn.send(to_send) + # verify 100-continue response to mark end of phase1 + resp = conn.getexpect() + self.assertEqual(resp.status, 100) + + # send commit confirmation to start phase2 + commit_confirmation_doc = "\r\n".join(( + "X-Document: put commit", + "", + "commit_confirmation", + "--boundary123--", + )) + to_send = "%x\r\n%s\r\n0\r\n\r\n" % \ + (len(commit_confirmation_doc), commit_confirmation_doc) + conn.send(to_send) + + # verify success (2xx) to make end of phase2 + resp = conn.getresponse() + self.assertEqual(resp.status, 201) + resp.read() + resp.close() + + # verify successful object data and durable state file write + obj_basename = os.path.join( + self.devices, 'sda1', + storage_directory(diskfile.get_data_dir(POLICIES[1]), '0', + hash_path('a', 'c', 'o')), + put_timestamp) + obj_datafile = obj_basename + '#2.data' + self.assertTrue(os.path.isfile(obj_datafile)) + obj_durablefile = obj_basename + '.durable' + self.assertTrue(os.path.isfile(obj_durablefile)) + + def test_multiphase_put_no_metadata_footer(self): + # Test 2-phase commit conversation, with no metadata footer + # at the end of object data - end of 1st phase marked + # by 100-continue response from the object server, with a + # successful 2nd phase marked by the presence of a .durable + # file along with .data file in the object data directory + # (No metadata footer case) + test_data = 'obj data' + test_doc = "\r\n".join(( + "--boundary123", + "X-Document: object body", + "", + test_data, + "--boundary123", + )) + + # phase1 - PUT request with multiphase commit conversation + # no object metadata in footer + put_timestamp = utils.Timestamp(time()).internal + headers = { + 'Content-Type': 'text/plain', + 'X-Timestamp': put_timestamp, + 'Transfer-Encoding': 'chunked', + 'Expect': '100-continue', + # normally the frag index gets sent in the MIME footer (which this + # test doesn't have, see `test_multiphase_put_metadata_footer`), + # but the proxy *could* send the frag index in the headers and + # this test verifies that would work. + 'X-Object-Sysmeta-Ec-Frag-Index': '2', + 'X-Backend-Storage-Policy-Index': '1', + 'X-Backend-Obj-Content-Length': len(test_data), + 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123', + 'X-Backend-Obj-Multiphase-Commit': 'yes', + } + conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0', + 'PUT', '/a/c/o', headers=headers) + resp = conn.getexpect() + self.assertEqual(resp.status, 100) + headers = HeaderKeyDict(resp.getheaders()) + self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes') + + to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc) + conn.send(to_send) + # verify 100-continue response to mark end of phase1 + resp = conn.getexpect() + self.assertEqual(resp.status, 100) + + # send commit confirmation to start phase2 + commit_confirmation_doc = "\r\n".join(( + "X-Document: put commit", + "", + "commit_confirmation", + "--boundary123--", + )) + to_send = "%x\r\n%s\r\n0\r\n\r\n" % \ + (len(commit_confirmation_doc), commit_confirmation_doc) + conn.send(to_send) + + # verify success (2xx) to make end of phase2 + resp = conn.getresponse() + self.assertEqual(resp.status, 201) + resp.read() + resp.close() + + # verify successful object data and durable state file write + obj_basename = os.path.join( + self.devices, 'sda1', + storage_directory(diskfile.get_data_dir(POLICIES[1]), '0', + hash_path('a', 'c', 'o')), + put_timestamp) + obj_datafile = obj_basename + '#2.data' + self.assertTrue(os.path.isfile(obj_datafile)) + obj_durablefile = obj_basename + '.durable' + self.assertTrue(os.path.isfile(obj_durablefile)) + + def test_multiphase_put_draining(self): + # We want to ensure that we read the whole response body even if + # it's multipart MIME and there's document parts that we don't + # expect or understand. This'll help save our bacon if we ever jam + # more stuff in there. + in_a_timeout = [False] + + # inherit from BaseException so we get a stack trace when the test + # fails instead of just a 500 + class NotInATimeout(BaseException): + pass + + class FakeTimeout(BaseException): + def __enter__(self): + in_a_timeout[0] = True + + def __exit__(self, typ, value, tb): + in_a_timeout[0] = False + + class PickyWsgiStringIO(WsgiStringIO): + def read(self, *a, **kw): + if not in_a_timeout[0]: + raise NotInATimeout() + return WsgiStringIO.read(self, *a, **kw) + + def readline(self, *a, **kw): + if not in_a_timeout[0]: + raise NotInATimeout() + return WsgiStringIO.readline(self, *a, **kw) + + test_data = 'obj data' + footer_meta = { + "X-Object-Sysmeta-Ec-Frag-Index": "7", + "Etag": md5(test_data).hexdigest(), + } + footer_json = json.dumps(footer_meta) + footer_meta_cksum = md5(footer_json).hexdigest() + test_doc = "\r\n".join(( + "--boundary123", + "X-Document: object body", + "", + test_data, + "--boundary123", + "X-Document: object metadata", + "Content-MD5: " + footer_meta_cksum, + "", + footer_json, + "--boundary123", + "X-Document: we got cleverer", + "", + "stuff stuff meaningless stuuuuuuuuuuff", + "--boundary123", + "X-Document: we got even cleverer; can you believe it?", + "Waneshaft: ambifacient lunar", + "Casing: malleable logarithmic", + "", + "potato potato potato potato potato potato potato", + "--boundary123--" + )) + + # phase1 - PUT request with object metadata in footer and + # multiphase commit conversation + put_timestamp = utils.Timestamp(time()).internal + headers = { + 'Content-Type': 'text/plain', + 'X-Timestamp': put_timestamp, + 'Transfer-Encoding': 'chunked', + 'Expect': '100-continue', + 'X-Backend-Storage-Policy-Index': '1', + 'X-Backend-Obj-Content-Length': len(test_data), + 'X-Backend-Obj-Metadata-Footer': 'yes', + 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123', + } + wsgi_input = PickyWsgiStringIO(test_doc) + req = Request.blank( + "/sda1/0/a/c/o", + environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': wsgi_input}, + headers=headers) + + app = object_server.ObjectController(self.conf, logger=self.logger) + with mock.patch('swift.obj.server.ChunkReadTimeout', FakeTimeout): + resp = req.get_response(app) + self.assertEqual(resp.status_int, 201) # sanity check + + in_a_timeout[0] = True # so we can check without an exception + self.assertEqual(wsgi_input.read(), '') # we read all the bytes + + def test_multiphase_put_bad_commit_message(self): + # Test 2-phase commit conversation - end of 1st phase marked + # by 100-continue response from the object server, with 2nd + # phase commit confirmation being received corrupt + test_data = 'obj data' + footer_meta = { + "X-Object-Sysmeta-Ec-Frag-Index": "7", + "Etag": md5(test_data).hexdigest(), + } + footer_json = json.dumps(footer_meta) + footer_meta_cksum = md5(footer_json).hexdigest() + test_doc = "\r\n".join(( + "--boundary123", + "X-Document: object body", + "", + test_data, + "--boundary123", + "X-Document: object metadata", + "Content-MD5: " + footer_meta_cksum, + "", + footer_json, + "--boundary123", + )) + + # phase1 - PUT request with object metadata in footer and + # multiphase commit conversation + put_timestamp = utils.Timestamp(time()).internal + headers = { + 'Content-Type': 'text/plain', + 'X-Timestamp': put_timestamp, + 'Transfer-Encoding': 'chunked', + 'Expect': '100-continue', + 'X-Backend-Storage-Policy-Index': '1', + 'X-Backend-Obj-Content-Length': len(test_data), + 'X-Backend-Obj-Metadata-Footer': 'yes', + 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123', + 'X-Backend-Obj-Multiphase-Commit': 'yes', + } + conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0', + 'PUT', '/a/c/o', headers=headers) + resp = conn.getexpect() + self.assertEqual(resp.status, 100) + headers = HeaderKeyDict(resp.getheaders()) + self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes') + self.assertEqual(headers['X-Obj-Metadata-Footer'], 'yes') + + to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc) + conn.send(to_send) + # verify 100-continue response to mark end of phase1 + resp = conn.getexpect() + self.assertEqual(resp.status, 100) + + # send commit confirmation to start phase2 + commit_confirmation_doc = "\r\n".join(( + "junkjunk", + "--boundary123--", + )) + to_send = "%x\r\n%s\r\n0\r\n\r\n" % \ + (len(commit_confirmation_doc), commit_confirmation_doc) + conn.send(to_send) + resp = conn.getresponse() + self.assertEqual(resp.status, 500) + resp.read() + resp.close() + # verify that durable file was NOT created + obj_basename = os.path.join( + self.devices, 'sda1', + storage_directory(diskfile.get_data_dir(1), '0', + hash_path('a', 'c', 'o')), + put_timestamp) + obj_datafile = obj_basename + '#7.data' + self.assertTrue(os.path.isfile(obj_datafile)) + obj_durablefile = obj_basename + '.durable' + self.assertFalse(os.path.isfile(obj_durablefile)) + @patch_policies class TestZeroCopy(unittest.TestCase): diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py index 9af76185b1..002a08a72c 100644 --- a/test/unit/obj/test_ssync_receiver.py +++ b/test/unit/obj/test_ssync_receiver.py @@ -27,6 +27,7 @@ from swift.common import constraints from swift.common import exceptions from swift.common import swob from swift.common import utils +from swift.common.storage_policy import POLICIES from swift.obj import diskfile from swift.obj import server from swift.obj import ssync_receiver @@ -34,6 +35,7 @@ from swift.obj import ssync_receiver from test import unit +@unit.patch_policies() class TestReceiver(unittest.TestCase): def setUp(self): @@ -46,12 +48,12 @@ class TestReceiver(unittest.TestCase): self.testdir = os.path.join( tempfile.mkdtemp(), 'tmp_test_ssync_receiver') utils.mkdirs(os.path.join(self.testdir, 'sda1', 'tmp')) - conf = { + self.conf = { 'devices': self.testdir, 'mount_check': 'false', 'replication_one_per_device': 'false', 'log_requests': 'false'} - self.controller = server.ObjectController(conf) + self.controller = server.ObjectController(self.conf) self.controller.bytes_per_sync = 1 self.account1 = 'a' @@ -111,8 +113,8 @@ class TestReceiver(unittest.TestCase): def test_REPLICATION_calls_replication_lock(self): with mock.patch.object( - self.controller._diskfile_mgr, 'replication_lock') as \ - mocked_replication_lock: + self.controller._diskfile_router[POLICIES.legacy], + 'replication_lock') as mocked_replication_lock: req = swob.Request.blank( '/sda1/1', environ={'REQUEST_METHOD': 'REPLICATION'}, @@ -140,9 +142,8 @@ class TestReceiver(unittest.TestCase): body_lines, [':MISSING_CHECK: START', ':MISSING_CHECK: END', ':UPDATES: START', ':UPDATES: END']) - self.assertEqual(rcvr.policy_idx, 0) + self.assertEqual(rcvr.policy, POLICIES[0]) - @unit.patch_policies() def test_Receiver_with_storage_policy_index_header(self): req = swob.Request.blank( '/sda1/1', @@ -157,15 +158,30 @@ class TestReceiver(unittest.TestCase): body_lines, [':MISSING_CHECK: START', ':MISSING_CHECK: END', ':UPDATES: START', ':UPDATES: END']) - self.assertEqual(rcvr.policy_idx, 1) + self.assertEqual(rcvr.policy, POLICIES[1]) + + def test_Receiver_with_bad_storage_policy_index_header(self): + valid_indices = sorted([int(policy) for policy in POLICIES]) + bad_index = valid_indices[-1] + 1 + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'SSYNC', + 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': bad_index}, + body=':MISSING_CHECK: START\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + self.controller.logger = mock.MagicMock() + receiver = ssync_receiver.Receiver(self.controller, req) + body_lines = [chunk.strip() for chunk in receiver() if chunk.strip()] + self.assertEqual(body_lines, [":ERROR: 503 'No policy with index 2'"]) def test_REPLICATION_replication_lock_fail(self): def _mock(path): with exceptions.ReplicationLockTimeout(0.01, '/somewhere/' + path): eventlet.sleep(0.05) with mock.patch.object( - self.controller._diskfile_mgr, 'replication_lock', _mock): - self.controller._diskfile_mgr + self.controller._diskfile_router[POLICIES.legacy], + 'replication_lock', _mock): self.controller.logger = mock.MagicMock() req = swob.Request.blank( '/sda1/1', @@ -190,7 +206,7 @@ class TestReceiver(unittest.TestCase): resp = req.get_response(self.controller) self.assertEqual( self.body_lines(resp.body), - [":ERROR: 0 'Invalid path: /device'"]) + [":ERROR: 400 'Invalid path: /device'"]) self.assertEqual(resp.status_int, 200) self.assertFalse(mocked_replication_semaphore.acquire.called) self.assertFalse(mocked_replication_semaphore.release.called) @@ -203,7 +219,7 @@ class TestReceiver(unittest.TestCase): resp = req.get_response(self.controller) self.assertEqual( self.body_lines(resp.body), - [":ERROR: 0 'Invalid path: /device/'"]) + [":ERROR: 400 'Invalid path: /device/'"]) self.assertEqual(resp.status_int, 200) self.assertFalse(mocked_replication_semaphore.acquire.called) self.assertFalse(mocked_replication_semaphore.release.called) @@ -230,7 +246,7 @@ class TestReceiver(unittest.TestCase): resp = req.get_response(self.controller) self.assertEqual( self.body_lines(resp.body), - [":ERROR: 0 'Invalid path: /device/partition/junk'"]) + [":ERROR: 400 'Invalid path: /device/partition/junk'"]) self.assertEqual(resp.status_int, 200) self.assertFalse(mocked_replication_semaphore.acquire.called) self.assertFalse(mocked_replication_semaphore.release.called) @@ -240,7 +256,8 @@ class TestReceiver(unittest.TestCase): mock.patch.object( self.controller, 'replication_semaphore'), mock.patch.object( - self.controller._diskfile_mgr, 'mount_check', False), + self.controller._diskfile_router[POLICIES.legacy], + 'mount_check', False), mock.patch.object( constraints, 'check_mount', return_value=False)) as ( mocked_replication_semaphore, @@ -259,7 +276,8 @@ class TestReceiver(unittest.TestCase): mock.patch.object( self.controller, 'replication_semaphore'), mock.patch.object( - self.controller._diskfile_mgr, 'mount_check', True), + self.controller._diskfile_router[POLICIES.legacy], + 'mount_check', True), mock.patch.object( constraints, 'check_mount', return_value=False)) as ( mocked_replication_semaphore, @@ -275,7 +293,8 @@ class TestReceiver(unittest.TestCase): "device

'"]) self.assertEqual(resp.status_int, 200) mocked_check_mount.assert_called_once_with( - self.controller._diskfile_mgr.devices, 'device') + self.controller._diskfile_router[POLICIES.legacy].devices, + 'device') mocked_check_mount.reset_mock() mocked_check_mount.return_value = True @@ -287,7 +306,8 @@ class TestReceiver(unittest.TestCase): [':ERROR: 0 "Looking for :MISSING_CHECK: START got \'\'"']) self.assertEqual(resp.status_int, 200) mocked_check_mount.assert_called_once_with( - self.controller._diskfile_mgr.devices, 'device') + self.controller._diskfile_router[POLICIES.legacy].devices, + 'device') def test_REPLICATION_Exception(self): @@ -486,7 +506,8 @@ class TestReceiver(unittest.TestCase): def test_MISSING_CHECK_have_one_exact(self): object_dir = utils.storage_directory( - os.path.join(self.testdir, 'sda1', diskfile.get_data_dir(0)), + os.path.join(self.testdir, 'sda1', + diskfile.get_data_dir(POLICIES[0])), '1', self.hash1) utils.mkdirs(object_dir) fp = open(os.path.join(object_dir, self.ts1 + '.data'), 'w+') @@ -515,10 +536,10 @@ class TestReceiver(unittest.TestCase): self.assertFalse(self.controller.logger.error.called) self.assertFalse(self.controller.logger.exception.called) - @unit.patch_policies def test_MISSING_CHECK_storage_policy(self): object_dir = utils.storage_directory( - os.path.join(self.testdir, 'sda1', diskfile.get_data_dir(1)), + os.path.join(self.testdir, 'sda1', + diskfile.get_data_dir(POLICIES[1])), '1', self.hash1) utils.mkdirs(object_dir) fp = open(os.path.join(object_dir, self.ts1 + '.data'), 'w+') @@ -550,7 +571,8 @@ class TestReceiver(unittest.TestCase): def test_MISSING_CHECK_have_one_newer(self): object_dir = utils.storage_directory( - os.path.join(self.testdir, 'sda1', diskfile.get_data_dir(0)), + os.path.join(self.testdir, 'sda1', + diskfile.get_data_dir(POLICIES[0])), '1', self.hash1) utils.mkdirs(object_dir) newer_ts1 = utils.normalize_timestamp(float(self.ts1) + 1) @@ -583,7 +605,8 @@ class TestReceiver(unittest.TestCase): def test_MISSING_CHECK_have_one_older(self): object_dir = utils.storage_directory( - os.path.join(self.testdir, 'sda1', diskfile.get_data_dir(0)), + os.path.join(self.testdir, 'sda1', + diskfile.get_data_dir(POLICIES[0])), '1', self.hash1) utils.mkdirs(object_dir) older_ts1 = utils.normalize_timestamp(float(self.ts1) - 1) @@ -1072,7 +1095,6 @@ class TestReceiver(unittest.TestCase): 'content-encoding specialty-header')}) self.assertEqual(req.read_body, '1') - @unit.patch_policies() def test_UPDATES_with_storage_policy(self): _PUT_request = [None]