diff --git a/doc/source/index.rst b/doc/source/index.rst index 86a0bff0f2..8133cfda0f 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -44,6 +44,7 @@ Overview and Concepts overview_replication ratelimit overview_large_objects + overview_object_versioning overview_container_sync overview_expiring_objects diff --git a/doc/source/overview_object_versioning.rst b/doc/source/overview_object_versioning.rst new file mode 100644 index 0000000000..dc184661a3 --- /dev/null +++ b/doc/source/overview_object_versioning.rst @@ -0,0 +1,77 @@ +================= +Object Versioning +================= + +-------- +Overview +-------- + +Object versioning in swift is implemented by setting a flag on the container +to tell swift to version all objects in the container. The flag is the +``X-Versions-Location`` header on the container, and its value is the +container where the versions are stored. It is recommended to use a different +``X-Versions-Location`` container for each container that is being versioned. + +When data is ``PUT`` into a versioned container (a container with the +versioning flag turned on), the existing data in the file is redirected to a +new object and the data in the ``PUT`` request is saved as the data for the +versioned object. The new object name (for the previous version) is +``<length><object_name>/<timestamp>``, where ``length`` +is the 3-character zero-padded hexadecimal length of the ``<object_name>`` and +``<timestamp>`` is the timestamp of when the previous version was created. + +A ``GET`` to a versioned object will return the current version of the object +without having to do any request redirects or metadata lookups. + +A ``POST`` to a versioned object will update the object metadata as normal, +but will not create a new version of the object. In other words, new versions +are only created when the content of the object changes. + +A ``DELETE`` to a versioned object will only remove the current version of the +object. 
If you have 5 total versions of the object, you must delete the +object 5 times to completely remove the object. + +Note: A large object manifest file cannot be versioned, but a large object +manifest may point to versioned segments. + +-------------------------------------------------- +How to Enable Object Versioning in a Swift Cluster +-------------------------------------------------- + +Set ``allow_versions`` to ``True`` in the container server config. + +----------------------- +Examples Using ``curl`` +----------------------- + +First, create a container with the ``X-Versions-Location`` header or add the +header to an existing container. Also make sure the container referenced by +the ``X-Versions-Location`` exists. In this example, the name of that +container is "versions":: + + curl -i -XPUT -H "X-Auth-Token: <token>" \ + -H "X-Versions-Location: versions" http://<storage_url>/container + curl -i -XPUT -H "X-Auth-Token: <token>" http://<storage_url>/versions + +Create an object (the first version):: + + curl -i -XPUT --data-binary 1 -H "X-Auth-Token: <token>" \ + http://<storage_url>/container/myobject + +Now create a new version of that object:: + + curl -i -XPUT --data-binary 2 -H "X-Auth-Token: <token>" \ + http://<storage_url>/container/myobject + +See a listing of the older versions of the object:: + + curl -i -H "X-Auth-Token: <token>" \ + http://<storage_url>/versions?prefix=008myobject/ + +Now delete the current version of the object and see that the older version is +gone:: + + curl -i -XDELETE -H "X-Auth-Token: <token>" \ + http://<storage_url>/container/myobject + curl -i -H "X-Auth-Token: <token>" \ + http://<storage_url>/versions?prefix=008myobject/ diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample index 6f295ade16..4dda5f38eb 100644 --- a/etc/container-server.conf-sample +++ b/etc/container-server.conf-sample @@ -27,6 +27,7 @@ use = egg:swift#container # set log_requests = True # node_timeout = 3 # conn_timeout = 0.5 +# allow_versions = False [container-replicator] # You can override the default log routing for this app here (don't use set!): diff --git 
a/swift/common/exceptions.py b/swift/common/exceptions.py index 369b2f69f0..ed238b687a 100644 --- a/swift/common/exceptions.py +++ b/swift/common/exceptions.py @@ -76,3 +76,17 @@ class EmptyRingError(RingBuilderError): class DuplicateDeviceError(RingBuilderError): pass + + +class ListingIterError(Exception): + pass + + +class ListingIterNotFound(ListingIterError): + pass + + +class ListingIterNotAuthorized(ListingIterError): + + def __init__(self, aresp): + self.aresp = aresp diff --git a/swift/container/server.py b/swift/container/server.py index 8a18cfdbaf..f42dd63580 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -31,7 +31,8 @@ from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPConflict, \ from swift.common.db import ContainerBroker from swift.common.utils import get_logger, get_param, hash_path, \ - normalize_timestamp, storage_directory, split_path, validate_sync_to + normalize_timestamp, storage_directory, split_path, validate_sync_to, \ + TRUE_VALUES from swift.common.constraints import CONTAINER_LISTING_LIMIT, \ check_mount, check_float, check_utf8 from swift.common.bufferedhttp import http_connect @@ -52,7 +53,7 @@ class ContainerController(object): self.logger = get_logger(conf, log_route='container-server') self.root = conf.get('devices', '/srv/node/') self.mount_check = conf.get('mount_check', 'true').lower() in \ - ('true', 't', '1', 'on', 'yes', 'y') + TRUE_VALUES self.node_timeout = int(conf.get('node_timeout', 3)) self.conn_timeout = float(conf.get('conn_timeout', 0.5)) self.allowed_sync_hosts = [h.strip() @@ -62,6 +63,8 @@ class ContainerController(object): ContainerBroker, self.mount_check, logger=self.logger) self.auto_create_account_prefix = \ conf.get('auto_create_account_prefix') or '.' 
+ if conf.get('allow_versions', 'f').lower() in TRUE_VALUES: + self.save_headers.append('x-versions-location') def _get_container_broker(self, drive, part, account, container): """ diff --git a/swift/obj/server.py b/swift/obj/server.py index 9cca16ba5d..2770c22b87 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -368,9 +368,9 @@ class ObjectController(object): x-delete-at, x-object-manifest, ''' - self.allowed_headers = set(i.strip().lower() for i in \ - conf.get('allowed_headers', \ - default_allowed_headers).split(',') if i.strip() and \ + self.allowed_headers = set(i.strip().lower() for i in + conf.get('allowed_headers', + default_allowed_headers).split(',') if i.strip() and i.strip().lower() not in DISALLOWED_HEADERS) self.expiring_objects_account = \ (conf.get('auto_create_account_prefix') or '.') + \ diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 0d15c39838..15d17fcc8e 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -59,7 +59,8 @@ from swift.common.constraints import check_metadata, check_object_creation, \ check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \ MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE from swift.common.exceptions import ChunkReadTimeout, \ - ChunkWriteTimeout, ConnectionTimeout + ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \ + ListingIterNotAuthorized, ListingIterError def update_headers(response, headers): @@ -487,17 +488,20 @@ class Controller(object): read_acl = cache_value['read_acl'] write_acl = cache_value['write_acl'] sync_key = cache_value.get('sync_key') + versions = cache_value.get('versions') if status == 200: - return partition, nodes, read_acl, write_acl, sync_key + return partition, nodes, read_acl, write_acl, sync_key, \ + versions elif status == 404: - return None, None, None, None, None + return None, None, None, None, None, None if not self.account_info(account, autocreate=account_autocreate)[1]: - return None, None, None, None, None + return None, 
None, None, None, None, None result_code = 0 read_acl = None write_acl = None sync_key = None container_size = None + versions = None attempts_left = self.app.container_ring.replica_count headers = {'x-trans-id': self.trans_id, 'Connection': 'close'} for node in self.iter_nodes(partition, nodes, self.app.container_ring): @@ -515,6 +519,7 @@ class Controller(object): sync_key = resp.getheader('x-container-sync-key') container_size = \ resp.getheader('X-Container-Object-Count') + versions = resp.getheader('x-versions-location') break elif resp.status == 404: if result_code == 0: @@ -542,11 +547,12 @@ class Controller(object): 'read_acl': read_acl, 'write_acl': write_acl, 'sync_key': sync_key, - 'container_size': container_size}, + 'container_size': container_size, + 'versions': versions}, timeout=cache_timeout) if result_code == 200: - return partition, nodes, read_acl, write_acl, sync_key - return None, None, None, None, None + return partition, nodes, read_acl, write_acl, sync_key, versions + return None, None, None, None, None, None def iter_nodes(self, partition, nodes, ring): """ @@ -788,9 +794,6 @@ class Controller(object): bodies.append('') possible_source.read() continue - if (req.method == 'GET' and - possible_source.status in (200, 206)) or \ - 200 <= possible_source.status <= 399: if newest: if source: ts = float(source.getheader('x-put-timestamp') or @@ -860,11 +863,42 @@ class ObjectController(Controller): self.container_name = unquote(container_name) self.object_name = unquote(object_name) + def _listing_iter(self, lcontainer, lprefix, env): + lpartition, lnodes = self.app.container_ring.get_nodes( + self.account_name, lcontainer) + marker = '' + while True: + path = '/%s/%s' % (quote(self.account_name), quote(lcontainer)) + lreq = Request.blank( + '%s?prefix=%s&format=json&marker=%s' % + (path, quote(lprefix), quote(marker)), environ=env) + lreq.environ['REQUEST_METHOD'] = 'GET' + lreq.path_info = path + shuffle(lnodes) + lresp = 
self.GETorHEAD_base(lreq, _('Container'), + lpartition, lnodes, lreq.path_info, + self.app.container_ring.replica_count) + if lresp.status_int == 404: + raise ListingIterNotFound() + elif lresp.status_int // 100 != 2: + raise ListingIterError() + if 'swift.authorize' in env: + lreq.acl = lresp.headers.get('x-container-read') + aresp = env['swift.authorize'](lreq) + if aresp: + raise ListingIterNotAuthorized(aresp) + sublisting = json.loads(lresp.body) + if not sublisting: + break + marker = sublisting[-1]['name'] + for obj in sublisting: + yield obj + def GETorHEAD(self, req): """Handle HTTP GET or HEAD requests.""" + _junk, _junk, req.acl, _junk, _junk, object_versions = \ + self.container_info(self.account_name, self.container_name) if 'swift.authorize' in req.environ: - req.acl = \ - self.container_info(self.account_name, self.container_name)[2] aresp = req.environ['swift.authorize'](req) if aresp: return aresp @@ -874,9 +908,10 @@ class ObjectController(Controller): resp = self.GETorHEAD_base(req, _('Object'), partition, self.iter_nodes(partition, nodes, self.app.object_ring), req.path_info, self.app.object_ring.replica_count) + # If we get a 416 Requested Range Not Satisfiable we have to check if - # we were actually requesting a manifest object and then redo the range - # request on the whole object. + # we were actually requesting a manifest and then redo + # the range request on the whole object. 
if resp.status_int == 416: req_range = req.range req.range = None @@ -891,68 +926,17 @@ class ObjectController(Controller): if 'x-object-manifest' in resp.headers: lcontainer, lprefix = \ resp.headers['x-object-manifest'].split('/', 1) - lpartition, lnodes = self.app.container_ring.get_nodes( - self.account_name, lcontainer) - marker = '' - listing = [] - while True: - lreq = Request.blank('/%s/%s?prefix=%s&format=json&marker=%s' % - (quote(self.account_name), quote(lcontainer), - quote(lprefix), quote(marker))) - shuffle(lnodes) - lresp = self.GETorHEAD_base(lreq, _('Container'), lpartition, - lnodes, lreq.path_info, - self.app.container_ring.replica_count) - if lresp.status_int // 100 != 2: - lresp = HTTPNotFound(request=req) - lresp.headers['X-Object-Manifest'] = \ - resp.headers['x-object-manifest'] - return lresp - if 'swift.authorize' in req.environ: - req.acl = lresp.headers.get('x-container-read') - aresp = req.environ['swift.authorize'](req) - if aresp: - return aresp - sublisting = json.loads(lresp.body) - if not sublisting: - break - listing.extend(sublisting) - if len(listing) > CONTAINER_LISTING_LIMIT: - break - marker = sublisting[-1]['name'] + try: + listing = list(self._listing_iter(lcontainer, lprefix, + req.environ)) + except ListingIterNotFound: + return HTTPNotFound(request=req) + except ListingIterNotAuthorized, err: + return err.aresp + except ListingIterError: + return HTTPServerError(request=req) if len(listing) > CONTAINER_LISTING_LIMIT: - # We will serve large objects with a ton of segments with - # chunked transfer encoding. 
- - def listing_iter(): - marker = '' - while True: - lreq = Request.blank( - '/%s/%s?prefix=%s&format=json&marker=%s' % - (quote(self.account_name), quote(lcontainer), - quote(lprefix), quote(marker))) - lresp = self.GETorHEAD_base(lreq, _('Container'), - lpartition, lnodes, lreq.path_info, - self.app.container_ring.replica_count) - if lresp.status_int // 100 != 2: - raise Exception(_('Object manifest GET could not ' - 'continue listing: %s %s') % - (req.path, lreq.path)) - if 'swift.authorize' in req.environ: - req.acl = lresp.headers.get('x-container-read') - aresp = req.environ['swift.authorize'](req) - if aresp: - raise Exception(_('Object manifest GET could ' - 'not continue listing: %s %s') % - (req.path, aresp)) - sublisting = json.loads(lresp.body) - if not sublisting: - break - for obj in sublisting: - yield obj - marker = sublisting[-1]['name'] - resp = Response(headers=resp.headers, request=req, conditional_response=True) if req.method == 'HEAD': @@ -971,7 +955,8 @@ class ObjectController(Controller): return head_response else: resp.app_iter = SegmentedIterable(self, lcontainer, - listing_iter(), resp) + self._listing_iter(lcontainer, lprefix, req.environ), + resp) else: # For objects with a reasonable number of segments, we'll serve @@ -1030,6 +1015,7 @@ class ObjectController(Controller): req.headers['X-Copy-From'] = quote('/%s/%s' % (self.container_name, self.object_name)) req.headers['X-Fresh-Metadata'] = 'true' + req.environ['swift_versioned_copy'] = True resp = self.PUT(req) # Older editions returned 202 Accepted on object POSTs, so we'll # convert any 201 Created responses to that for compatibility with @@ -1041,7 +1027,7 @@ class ObjectController(Controller): error_response = check_metadata(req, 'object') if error_response: return error_response - container_partition, containers, _junk, req.acl, _junk = \ + container_partition, containers, _junk, req.acl, _junk, _junk = \ self.container_info(self.account_name, self.container_name, 
account_autocreate=self.app.account_autocreate) if 'swift.authorize' in req.environ: @@ -1124,7 +1110,7 @@ class ObjectController(Controller): def PUT(self, req): """HTTP PUT request handler.""" (container_partition, containers, _junk, req.acl, - req.environ['swift_sync_key']) = \ + req.environ['swift_sync_key'], object_versions) = \ self.container_info(self.account_name, self.container_name, account_autocreate=self.app.account_autocreate) if 'swift.authorize' in req.environ: @@ -1160,19 +1146,20 @@ class ObjectController(Controller): delete_at_part = delete_at_nodes = None partition, nodes = self.app.object_ring.get_nodes( self.account_name, self.container_name, self.object_name) + # do a HEAD request for container sync and checking object versions + if 'x-timestamp' in req.headers or (object_versions and not + req.environ.get('swift_versioned_copy')): + hreq = Request.blank(req.path_info, headers={'X-Newest': 'True'}, + environ={'REQUEST_METHOD': 'HEAD'}) + hresp = self.GETorHEAD_base(hreq, _('Object'), partition, nodes, + hreq.path_info, self.app.object_ring.replica_count) # Used by container sync feature if 'x-timestamp' in req.headers: try: req.headers['X-Timestamp'] = \ normalize_timestamp(float(req.headers['x-timestamp'])) - # For container sync PUTs, do a HEAD to see if we can - # shortcircuit - hreq = Request.blank(req.path_info, - environ={'REQUEST_METHOD': 'HEAD'}) - self.GETorHEAD_base(hreq, _('Object'), partition, nodes, - hreq.path_info, self.app.object_ring.replica_count) - if 'swift_x_timestamp' in hreq.environ and \ - float(hreq.environ['swift_x_timestamp']) >= \ + if 'swift_x_timestamp' in hresp.environ and \ + float(hresp.environ['swift_x_timestamp']) >= \ float(req.headers['x-timestamp']): return HTTPAccepted(request=req) except ValueError: @@ -1191,6 +1178,39 @@ class ObjectController(Controller): error_response = check_object_creation(req, self.object_name) if error_response: return error_response + if object_versions and not 
req.environ.get('swift_versioned_copy'): + is_manifest = 'x-object-manifest' in req.headers or \ + 'x-object-manifest' in hresp.headers + if hresp.status_int != 404 and not is_manifest: + # This is a version manifest and needs to be handled + # differently. First copy the existing data to a new object, + # then write the data from this request to the version manifest + # object. + lcontainer = object_versions.split('/')[0] + prefix_len = '%03x' % len(self.object_name) + lprefix = prefix_len + self.object_name + '/' + ts_source = hresp.environ.get('swift_x_timestamp') + if ts_source is None: + ts_source = time.mktime(time.strptime( + hresp.headers['last-modified'], + '%a, %d %b %Y %H:%M:%S GMT')) + new_ts = normalize_timestamp(ts_source) + vers_obj_name = lprefix + new_ts + copy_headers = { + 'Destination': '%s/%s' % (lcontainer, vers_obj_name)} + copy_environ = {'REQUEST_METHOD': 'COPY', + 'swift_versioned_copy': True + } + copy_req = Request.blank(req.path_info, headers=copy_headers, + environ=copy_environ) + copy_resp = self.COPY(copy_req) + if copy_resp.status_int // 100 == 4: + # missing container or bad permissions + return HTTPPreconditionFailed(request=req) + elif copy_resp.status_int // 100 != 2: + # could not copy the data, bail + return HTTPServiceUnavailable(request=req) + reader = req.environ['wsgi.input'].read data_source = iter(lambda: reader(self.app.client_chunk_size), '') source_header = req.headers.get('X-Copy-From') @@ -1367,8 +1387,59 @@ class ObjectController(Controller): def DELETE(self, req): """HTTP DELETE request handler.""" (container_partition, containers, _junk, req.acl, - req.environ['swift_sync_key']) = \ + req.environ['swift_sync_key'], object_versions) = \ self.container_info(self.account_name, self.container_name) + if object_versions: + # this is a version manifest and needs to be handled differently + lcontainer = object_versions.split('/')[0] + prefix_len = '%03x' % len(self.object_name) + lprefix = prefix_len + self.object_name 
+ '/' + try: + raw_listing = self._listing_iter(lcontainer, lprefix, + req.environ) + except ListingIterNotFound: + # set raw_listing so that the actual object is deleted + raw_listing = [] + except ListingIterNotAuthorized, err: + return err.aresp + except ListingIterError: + return HTTPServerError(request=req) + last_item = None + for item in raw_listing: # find the last item + last_item = item + if last_item: + # there are older versions so copy the previous version to the + # current object and delete the previous version + orig_container = self.container_name + orig_obj = self.object_name + self.container_name = lcontainer + self.object_name = last_item['name'] + copy_path = '/' + self.account_name + '/' + \ + self.container_name + '/' + self.object_name + copy_headers = {'X-Newest': 'True', + 'Destination': orig_container + '/' + orig_obj + } + copy_environ = {'REQUEST_METHOD': 'COPY', + 'swift_versioned_copy': True + } + creq = Request.blank(copy_path, headers=copy_headers, + environ=copy_environ) + copy_resp = self.COPY(creq) + if copy_resp.status_int // 100 == 4: + # some user error, maybe permissions + return HTTPPreconditionFailed(request=req) + elif copy_resp.status_int // 100 != 2: + # could not copy the data, bail + return HTTPServiceUnavailable(request=req) + # reset these because the COPY changed them + self.container_name = lcontainer + self.object_name = last_item['name'] + new_del_req = Request.blank(copy_path, environ=req.environ) + (container_partition, containers, + _junk, new_del_req.acl, _junk, _junk) = \ + self.container_info(self.account_name, self.container_name) + new_del_req.path_info = copy_path + req = new_del_req if 'swift.authorize' in req.environ: aresp = req.environ['swift.authorize'](req) if aresp: @@ -1435,7 +1506,8 @@ class ContainerController(Controller): # Ensure these are all lowercase pass_through_headers = ['x-container-read', 'x-container-write', - 'x-container-sync-key', 'x-container-sync-to'] + 'x-container-sync-key', 
'x-container-sync-to', + 'x-versions-location'] def __init__(self, app, account_name, container_name, **kwargs): Controller.__init__(self, app) @@ -1473,7 +1545,8 @@ class ContainerController(Controller): 'read_acl': resp.headers.get('x-container-read'), 'write_acl': resp.headers.get('x-container-write'), 'sync_key': resp.headers.get('x-container-sync-key'), - 'container_size': resp.headers.get('x-container-object-count')}, + 'container_size': resp.headers.get('x-container-object-count'), + 'versions': resp.headers.get('x-versions-location')}, timeout=self.app.recheck_container_existence) if 'swift.authorize' in req.environ: diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 364370e85c..4019b0408f 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -71,7 +71,8 @@ def setup(): _orig_container_listing_limit = proxy_server.CONTAINER_LISTING_LIMIT conf = {'devices': _testdir, 'swift_dir': _testdir, 'mount_check': 'false', 'allowed_headers': - 'content-encoding, x-object-manifest, content-disposition, foo'} + 'content-encoding, x-object-manifest, content-disposition, foo', + 'allow_versions': 'True'} prolis = listen(('localhost', 0)) acc1lis = listen(('localhost', 0)) acc2lis = listen(('localhost', 0)) @@ -169,6 +170,10 @@ def fake_http_connect(*code_iter, **kwargs): return self def getexpect(self): + if self.status == -2: + raise HTTPException() + if self.status == -3: + return FakeConn(507) return FakeConn(100) def getheaders(self): @@ -232,7 +237,7 @@ def fake_http_connect(*code_iter, **kwargs): status = code_iter.next() etag = etag_iter.next() timestamp = timestamps_iter.next() - if status == -1: + if status <= 0: raise HTTPException() return FakeConn(status, etag, body=kwargs.get('body', ''), timestamp=timestamp) @@ -626,23 +631,19 @@ class TestProxyServer(unittest.TestCase): proxy_server.get_logger = mock_get_logger test_conf({}) line = snarf.strip_value() - print line 
self.assert_(line.startswith('swift')) self.assert_(line.endswith('INFO')) test_conf({'log_name': 'snarf-test'}) line = snarf.strip_value() - print line self.assert_(line.startswith('snarf-test')) self.assert_(line.endswith('INFO')) test_conf({'log_name': 'snarf-test', 'log_level': 'ERROR'}) line = snarf.strip_value() - print line self.assertFalse(line) test_conf({'log_name': 'snarf-test', 'log_level': 'ERROR', 'access_log_name': 'access-test', 'access_log_level': 'INFO'}) line = snarf.strip_value() - print line self.assert_(line.startswith('access-test')) self.assert_(line.endswith('INFO')) @@ -830,46 +831,12 @@ class TestObjectController(unittest.TestCase): def test_PUT_connect_exceptions(self): - def mock_http_connect(*code_iter, **kwargs): - - class FakeConn(object): - - def __init__(self, status): - self.status = status - self.reason = 'Fake' - - def getresponse(self): - return self - - def read(self, amt=None): - return '' - - def getheader(self, name): - return '' - - def getexpect(self): - if self.status == -2: - raise HTTPException() - if self.status == -3: - return FakeConn(507) - return FakeConn(100) - - code_iter = iter(code_iter) - - def connect(*args, **ckwargs): - status = code_iter.next() - if status == -1: - raise HTTPException() - return FakeConn(status) - - return connect - with save_globals(): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') def test_status_map(statuses, expected): - proxy_server.http_connect = mock_http_connect(*statuses) + proxy_server.http_connect = fake_http_connect(*statuses) self.app.memcache.store = {} req = Request.blank('/a/c/o.jpg', {}) req.content_length = 0 @@ -885,50 +852,13 @@ class TestObjectController(unittest.TestCase): def test_PUT_send_exceptions(self): - def mock_http_connect(*code_iter, **kwargs): - - class FakeConn(object): - - def __init__(self, status): - self.status = status - self.reason = 'Fake' - self.host = '1.2.3.4' - self.port = 1024 - self.etag = md5() - - def 
getresponse(self): - self.etag = self.etag.hexdigest() - self.headers = { - 'etag': self.etag, - } - return self - - def read(self, amt=None): - return '' - - def send(self, amt=None): - if self.status == -1: - raise HTTPException() - else: - self.etag.update(amt) - - def getheader(self, name): - return self.headers.get(name, '') - - def getexpect(self): - return FakeConn(100) - code_iter = iter(code_iter) - - def connect(*args, **ckwargs): - return FakeConn(code_iter.next()) - return connect with save_globals(): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') def test_status_map(statuses, expected): self.app.memcache.store = {} - proxy_server.http_connect = mock_http_connect(*statuses) + proxy_server.http_connect = fake_http_connect(*statuses) req = Request.blank('/a/c/o.jpg', environ={'REQUEST_METHOD': 'PUT'}, body='some data') self.app.update_request(req) @@ -953,44 +883,13 @@ class TestObjectController(unittest.TestCase): def test_PUT_getresponse_exceptions(self): - def mock_http_connect(*code_iter, **kwargs): - - class FakeConn(object): - - def __init__(self, status): - self.status = status - self.reason = 'Fake' - self.host = '1.2.3.4' - self.port = 1024 - - def getresponse(self): - if self.status == -1: - raise HTTPException() - return self - - def read(self, amt=None): - return '' - - def send(self, amt=None): - pass - - def getheader(self, name): - return '' - - def getexpect(self): - return FakeConn(100) - code_iter = iter(code_iter) - - def connect(*args, **ckwargs): - return FakeConn(code_iter.next()) - return connect with save_globals(): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') def test_status_map(statuses, expected): self.app.memcache.store = {} - proxy_server.http_connect = mock_http_connect(*statuses) + proxy_server.http_connect = fake_http_connect(*statuses) req = Request.blank('/a/c/o.jpg', {}) req.content_length = 0 self.app.update_request(req) @@ -1086,12 
+985,12 @@ class TestObjectController(unittest.TestCase): self.assert_('accept-ranges' in res.headers) self.assertEquals(res.headers['accept-ranges'], 'bytes') - test_status_map((200, 404, 404), 200) - test_status_map((200, 500, 404), 200) - test_status_map((304, 500, 404), 304) - test_status_map((404, 404, 404), 404) - test_status_map((404, 404, 500), 404) - test_status_map((500, 500, 500), 503) + test_status_map((200, 200, 200, 404, 404), 200) + test_status_map((200, 200, 200, 500, 404), 200) + test_status_map((200, 200, 304, 500, 404), 304) + test_status_map((200, 200, 404, 404, 404), 404) + test_status_map((200, 200, 404, 404, 500), 404) + test_status_map((200, 200, 500, 500, 500), 503) def test_HEAD_newest(self): with save_globals(): @@ -1111,12 +1010,13 @@ class TestObjectController(unittest.TestCase): self.assertEquals(res.headers.get('last-modified'), expected_timestamp) - test_status_map((200, 200, 200), 200, ('1', '2', '3'), '3') - test_status_map((200, 200, 200), 200, ('1', '3', '2'), '3') - test_status_map((200, 200, 200), 200, ('1', '3', '1'), '3') - test_status_map((200, 200, 200), 200, ('3', '3', '1'), '3') - test_status_map((200, 200, 200), 200, (None, None, None), None) - test_status_map((200, 200, 200), 200, (None, None, '1'), '1') + # acct cont obj obj obj + test_status_map((200, 200, 200, 200, 200), 200, ('0', '0', '1', '2', '3'), '3') + test_status_map((200, 200, 200, 200, 200), 200, ('0', '0', '1', '3', '2'), '3') + test_status_map((200, 200, 200, 200, 200), 200, ('0', '0', '1', '3', '1'), '3') + test_status_map((200, 200, 200, 200, 200), 200, ('0', '0', '3', '3', '1'), '3') + test_status_map((200, 200, 200, 200, 200), 200, ('0', '0', None, None, None), None) + test_status_map((200, 200, 200, 200, 200), 200, ('0', '0', None, None, '1'), '1') def test_GET_newest(self): with save_globals(): @@ -1136,12 +1036,12 @@ class TestObjectController(unittest.TestCase): self.assertEquals(res.headers.get('last-modified'), expected_timestamp) - 
test_status_map((200, 200, 200), 200, ('1', '2', '3'), '3') - test_status_map((200, 200, 200), 200, ('1', '3', '2'), '3') - test_status_map((200, 200, 200), 200, ('1', '3', '1'), '3') - test_status_map((200, 200, 200), 200, ('3', '3', '1'), '3') - test_status_map((200, 200, 200), 200, (None, None, None), None) - test_status_map((200, 200, 200), 200, (None, None, '1'), '1') + test_status_map((200, 200, 200, 200, 200), 200, ('0', '0', '1', '2', '3'), '3') + test_status_map((200, 200, 200, 200, 200), 200, ('0', '0', '1', '3', '2'), '3') + test_status_map((200, 200, 200, 200, 200), 200, ('0', '0', '1', '3', '1'), '3') + test_status_map((200, 200, 200, 200, 200), 200, ('0', '0', '3', '3', '1'), '3') + test_status_map((200, 200, 200, 200, 200), 200, ('0', '0', None, None, None), None) + test_status_map((200, 200, 200, 200, 200), 200, ('0', '0', None, None, '1'), '1') with save_globals(): controller = proxy_server.ObjectController(self.app, 'account', @@ -1160,11 +1060,11 @@ class TestObjectController(unittest.TestCase): self.assertEquals(res.headers.get('last-modified'), expected_timestamp) - test_status_map((200, 200, 200), 200, ('1', '2', '3'), '1') - test_status_map((200, 200, 200), 200, ('1', '3', '2'), '1') - test_status_map((200, 200, 200), 200, ('1', '3', '1'), '1') - test_status_map((200, 200, 200), 200, ('3', '3', '1'), '3') - test_status_map((200, 200, 200), 200, (None, '1', '2'), None) + test_status_map((200, 200, 200, 200, 200), 200, ('0', '0', '1', '2', '3'), '1') + test_status_map((200, 200, 200, 200, 200), 200, ('0', '0', '1', '3', '2'), '1') + test_status_map((200, 200, 200, 200, 200), 200, ('0', '0', '1', '3', '1'), '1') + test_status_map((200, 200, 200, 200, 200), 200, ('0', '0', '3', '3', '1'), '3') + test_status_map((200, 200, 200, 200, 200), 200, ('0', '0', None, '1', '2'), None) def test_POST_meta_val_len(self): with save_globals(): @@ -1516,25 +1416,25 @@ class TestObjectController(unittest.TestCase): proxy_server.shuffle = lambda l: None 
controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - self.assert_status_map(controller.HEAD, (503, 200, 200), 200) + self.assert_status_map(controller.HEAD, (200, 200, 503, 200, 200), 200) self.assertEquals(controller.app.object_ring.devs[0]['errors'], 2) self.assert_('last_error' in controller.app.object_ring.devs[0]) for _junk in xrange(self.app.error_suppression_limit): - self.assert_status_map(controller.HEAD, (503, 503, 503), 503) + self.assert_status_map(controller.HEAD, (200, 200, 503, 503, 503), 503) self.assertEquals(controller.app.object_ring.devs[0]['errors'], self.app.error_suppression_limit + 1) - self.assert_status_map(controller.HEAD, (200, 200, 200), 503) + self.assert_status_map(controller.HEAD, (200, 200, 200, 200, 200), 503) self.assert_('last_error' in controller.app.object_ring.devs[0]) - self.assert_status_map(controller.PUT, (200, 201, 201, 201), 503) + self.assert_status_map(controller.PUT, (200, 200, 200, 201, 201, 201), 503) self.assert_status_map(controller.POST, - (200, 200, 200, 200, 202, 202, 202), 503) + (200, 200, 200, 200, 200, 200, 202, 202, 202), 503) self.assert_status_map(controller.DELETE, - (200, 204, 204, 204), 503) + (200, 200, 200, 204, 204, 204), 503) self.app.error_suppression_interval = -300 - self.assert_status_map(controller.HEAD, (200, 200, 200), 200) + self.assert_status_map(controller.HEAD, (200, 200, 200, 200, 200), 200) self.assertRaises(BaseException, self.assert_status_map, controller.DELETE, - (200, 204, 204, 204), 503, raise_exc=True) + (200, 200, 200, 204, 204, 204), 503, raise_exc=True) def test_acc_or_con_missing_returns_404(self): with save_globals(): @@ -2449,6 +2349,279 @@ class TestObjectController(unittest.TestCase): body = fd.read() self.assertEquals(body, 'oh hai123456789abcdef') + def test_version_manifest(self): + versions_to_create = 3 + # Create a container for our versioned object testing + (prolis, acc1lis, acc2lis, con2lis, con2lis, obj1lis, obj2lis) = \ + 
_test_sockets + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/versions HTTP/1.1\r\nHost: localhost\r\n' + 'Connection: close\r\nX-Storage-Token: t\r\n' + 'Content-Length: 0\r\nX-Versions-Location: vers\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEquals(headers[:len(exp)], exp) + # check that the header was set + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/versions HTTP/1.1\r\nHost: localhost\r\n' + 'Connection: close\r\nX-Storage-Token: t\r\n\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 2' # 2xx series response + self.assertEquals(headers[:len(exp)], exp) + self.assert_('X-Versions-Location: vers' in headers) + # make the container for the object versions + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/vers HTTP/1.1\r\nHost: localhost\r\n' + 'Connection: close\r\nX-Storage-Token: t\r\n' + 'Content-Length: 0\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEquals(headers[:len(exp)], exp) + # Create the versioned file + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/versions/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Storage-Token: ' + 't\r\nContent-Length: 5\r\nContent-Type: text/jibberish0\r\n' + 'X-Object-Meta-Foo: barbaz\r\n\r\n00000\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEquals(headers[:len(exp)], exp) + # Create the object versions + for segment in xrange(1, versions_to_create): + sleep(.01) # guarantee that the timestamp changes + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/versions/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Storage-Token: ' + 't\r\nContent-Length: 
5\r\nContent-Type: text/jibberish%s' + '\r\n\r\n%05d\r\n' % (segment, segment)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEquals(headers[:len(exp)], exp) + # Ensure retrieving the manifest file gets the latest version + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/versions/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Auth-Token: t\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 200' + self.assertEquals(headers[:len(exp)], exp) + self.assert_('Content-Type: text/jibberish%s' % segment in headers) + self.assert_('X-Object-Meta-Foo: barbaz' not in headers) + body = fd.read() + self.assertEquals(body, '%05d' % segment) + # Ensure we have the right number of versions saved + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/vers?prefix=004name/ HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Auth-Token: t\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 200' + self.assertEquals(headers[:len(exp)], exp) + body = fd.read() + versions = [x for x in body.split('\n') if x] + self.assertEquals(len(versions), versions_to_create - 1) + # copy a version and make sure the version info is stripped + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('COPY /v1/a/versions/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Auth-Token: ' + 't\r\nDestination: versions/copied_name\r\n' + 'Content-Length: 0\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 2' # 2xx series response to the COPY + self.assertEquals(headers[:len(exp)], exp) + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/versions/copied_name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Auth-Token: t\r\n\r\n') + fd.flush() + headers = 
readuntil2crlfs(fd) + exp = 'HTTP/1.1 200' + self.assertEquals(headers[:len(exp)], exp) + body = fd.read() + self.assertEquals(body, '%05d' % segment) + # post and make sure it's updated + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('POST /v1/a/versions/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Auth-Token: ' + 't\r\nContent-Type: foo/bar\r\nContent-Length: 0\r\n' + 'X-Object-Meta-Bar: foo\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 2' # 2xx series response to the POST + self.assertEquals(headers[:len(exp)], exp) + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/versions/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Auth-Token: t\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 200' + self.assertEquals(headers[:len(exp)], exp) + self.assert_('Content-Type: foo/bar' in headers) + self.assert_('X-Object-Meta-Bar: foo' in headers) + body = fd.read() + self.assertEquals(body, '%05d' % segment) + # Delete the object versions + for segment in xrange(versions_to_create - 1, 0, -1): + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('DELETE /v1/a/versions/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Storage-Token: t\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 2' # 2xx series response + self.assertEquals(headers[:len(exp)], exp) + # Ensure retrieving the manifest file gets the latest version + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/versions/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Auth-Token: t\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 200' + self.assertEquals(headers[:len(exp)], exp) + self.assert_('Content-Type: text/jibberish%s' % (segment - 1) + in headers) + body = 
fd.read() + self.assertEquals(body, '%05d' % (segment - 1)) + # Ensure we have the right number of versions saved + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/vers?prefix=004name/ HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Auth-Token: t\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 2' # 2xx series response + self.assertEquals(headers[:len(exp)], exp) + body = fd.read() + versions = [x for x in body.split('\n') if x] + self.assertEquals(len(versions), segment - 1) + # there is now one segment left (in the manifest) + # Ensure we have no saved versions + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/vers?prefix=004name/ HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Auth-Token: t\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 204 No Content' + self.assertEquals(headers[:len(exp)], exp) + # delete the last verision + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('DELETE /v1/a/versions/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Storage-Token: t\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 2' # 2xx series response + self.assertEquals(headers[:len(exp)], exp) + # Ensure it's all gone + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/versions/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Auth-Token: t\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 404' + self.assertEquals(headers[:len(exp)], exp) + + # make sure manifest files don't get versioned + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/versions/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Storage-Token: ' + 't\r\nContent-Length: 0\r\nContent-Type: 
text/jibberish0\r\n' + 'Foo: barbaz\r\nX-Object-Manifest: vers/foo_\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEquals(headers[:len(exp)], exp) + # Ensure we have no saved versions + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/vers?prefix=004name/ HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Auth-Token: t\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 204 No Content' + self.assertEquals(headers[:len(exp)], exp) + + # DELETE v1/a/c/obj shouldn't delete v1/a/c/obj/sub versions + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/versions/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Storage-Token: ' + 't\r\nContent-Length: 5\r\nContent-Type: text/jibberish0\r\n' + 'Foo: barbaz\r\n\r\n00000\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEquals(headers[:len(exp)], exp) + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/versions/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Storage-Token: ' + 't\r\nContent-Length: 5\r\nContent-Type: text/jibberish0\r\n' + 'Foo: barbaz\r\n\r\n00001\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEquals(headers[:len(exp)], exp) + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/versions/name/sub HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Storage-Token: ' + 't\r\nContent-Length: 4\r\nContent-Type: text/jibberish0\r\n' + 'Foo: barbaz\r\n\r\nsub1\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEquals(headers[:len(exp)], exp) + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/versions/name/sub HTTP/1.1\r\nHost: ' + 
'localhost\r\nConnection: close\r\nX-Storage-Token: ' + 't\r\nContent-Length: 4\r\nContent-Type: text/jibberish0\r\n' + 'Foo: barbaz\r\n\r\nsub2\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEquals(headers[:len(exp)], exp) + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('DELETE /v1/a/versions/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Storage-Token: t\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 2' # 2xx series response + self.assertEquals(headers[:len(exp)], exp) + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/vers?prefix=008name/sub/ HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Auth-Token: t\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 2' # 2xx series response + self.assertEquals(headers[:len(exp)], exp) + body = fd.read() + versions = [x for x in body.split('\n') if x] + self.assertEquals(len(versions), 1) + def test_chunked_put_lobjects(self): # Create a container for our segmented/manifest object testing (prolis, acc1lis, acc2lis, con2lis, con2lis, obj1lis, obj2lis) = \ @@ -2619,7 +2792,6 @@ class TestObjectController(unittest.TestCase): headers = readuntil2crlfs(fd) exp = 'HTTP/1.1 200' self.assertEquals(headers[:len(exp)], exp) - print headers self.assert_('Content-Type: text/jibberish' in headers) # Check set content type sock = connect_tcp(('localhost', prolis.getsockname()[1])) @@ -2731,7 +2903,7 @@ class TestObjectController(unittest.TestCase): def test_response_bytes_transferred_attr(self): with save_globals(): proxy_server.http_connect = \ - fake_http_connect(200, body='1234567890') + fake_http_connect(200, 200, 200, body='1234567890') controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') req = Request.blank('/a/c/o') @@ -2759,7 +2931,7 @@ class TestObjectController(unittest.TestCase): 
def test_response_client_disconnect_attr(self): with save_globals(): proxy_server.http_connect = \ - fake_http_connect(200, body='1234567890') + fake_http_connect(200, 200, 200, body='1234567890') controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') req = Request.blank('/a/c/o')