From 38d35797df1d18d58eed5b537faa3696762c2e2a Mon Sep 17 00:00:00 2001 From: Romain LE DISEZ Date: Fri, 14 Apr 2017 17:21:22 +0200 Subject: [PATCH] Fix SSYNC failing to replicate unexpired object Fix a situation where SSYNC would fail to replicate a valid object because the datafile contains expired X-Delete-At information while a metafile contains no X-Delete-At information. Example: - 1454619054.02968.data => contains X-Delete-At: 1454619654 - 1454619056.04876.meta => does not contain X-Delete-At info In this situation, the replicator tries to PUT the datafile, and then to POST the metadata. Previously, if the receiver has the datafile but current time is greater than the X-Delete-At, then it considers it to be expired and requests no updates from the sender, so the metafile is never synced. If the receiver does not have the datafile then it does request updates from the sender, but the ssync PUT subrequest is refused if the current time is greater than the X-Delete-At (the object is expired). If the datafile is transferred, the ssync POST subrequest fails because the object does not exist (expired). This commit allows PUT and POST to work so that the object can be replicated, by enabling the receiver object server to open expired diskfiles when handling replication requests. 
Closes-Bug: #1683689 Co-Authored-By: Alistair Coles Change-Id: I919994ead2b20dbb6c5671c208823e8b7f513715 --- swift/common/constraints.py | 3 +- swift/obj/diskfile.py | 7 +- swift/obj/server.py | 6 +- swift/obj/ssync_receiver.py | 2 +- test/unit/obj/common.py | 4 +- test/unit/obj/test_diskfile.py | 19 ++++-- test/unit/obj/test_ssync.py | 98 +++++++++++++++++++++++++++- test/unit/obj/test_ssync_receiver.py | 31 +++++++++ 8 files changed, 154 insertions(+), 16 deletions(-) diff --git a/swift/common/constraints.py b/swift/common/constraints.py index 005c7875f9..2db9f05ca4 100644 --- a/swift/common/constraints.py +++ b/swift/common/constraints.py @@ -320,7 +320,8 @@ def check_delete_headers(request): raise HTTPBadRequest(request=request, content_type='text/plain', body='Non-integer X-Delete-At') - if x_delete_at < time.time(): + if x_delete_at < time.time() and not utils.config_true_value( + request.headers.get('x-backend-replication', 'f')): raise HTTPBadRequest(request=request, content_type='text/plain', body='X-Delete-At in past') return request diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index fe5fc73804..3626cd8a60 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -1903,6 +1903,8 @@ class BaseDiskFile(object): :param use_splice: if true, use zero-copy splice() to send data :param pipe_size: size of pipe buffer used in zero-copy operations :param use_linkat: if True, use open() with linkat() to create obj file + :param open_expired: if True, open() will not raise a DiskFileExpired if + object is expired """ reader_cls = None # must be set by subclasses writer_cls = None # must be set by subclasses @@ -1910,7 +1912,7 @@ class BaseDiskFile(object): def __init__(self, mgr, device_path, partition, account=None, container=None, obj=None, _datadir=None, policy=None, use_splice=False, pipe_size=None, - use_linkat=False, **kwargs): + use_linkat=False, open_expired=False, **kwargs): self._manager = mgr self._device_path = device_path 
self._logger = mgr.logger @@ -1919,6 +1921,7 @@ class BaseDiskFile(object): self._use_splice = use_splice self._pipe_size = pipe_size self._use_linkat = use_linkat + self._open_expired = open_expired # This might look a lttle hacky i.e tracking number of newly created # dirs to fsync only those many later. If there is a better way, # please suggest. @@ -2217,7 +2220,7 @@ class BaseDiskFile(object): data_file, "bad metadata x-delete-at value %s" % ( self._metadata['X-Delete-At'])) else: - if x_delete_at <= time.time(): + if x_delete_at <= time.time() and not self._open_expired: raise DiskFileExpired(metadata=self._metadata) try: metadata_size = int(self._metadata['Content-Length']) diff --git a/swift/obj/server.py b/swift/obj/server.py index b6d911656a..1efa3997c1 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -516,7 +516,8 @@ class ObjectController(BaseStorageServer): try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy=policy) + policy=policy, open_expired=config_true_value( + request.headers.get('x-backend-replication', 'false'))) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -653,9 +654,6 @@ class ObjectController(BaseStorageServer): if error_response: return error_response new_delete_at = int(request.headers.get('X-Delete-At') or 0) - if new_delete_at and new_delete_at < time.time(): - return HTTPBadRequest(body='X-Delete-At in past', request=request, - content_type='text/plain') try: fsize = request.message_length() except ValueError as e: diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py index 990bdb21a9..3b8b860134 100644 --- a/swift/obj/ssync_receiver.py +++ b/swift/obj/ssync_receiver.py @@ -257,7 +257,7 @@ class Receiver(object): try: df = self.diskfile_mgr.get_diskfile_from_hash( self.device, self.partition, remote['object_hash'], - self.policy, frag_index=self.frag_index) + self.policy, frag_index=self.frag_index, 
open_expired=True) except exceptions.DiskFileNotExist: return {} try: diff --git a/test/unit/obj/common.py b/test/unit/obj/common.py index 3fc3dc2be7..b18ee40d7e 100644 --- a/test/unit/obj/common.py +++ b/test/unit/obj/common.py @@ -75,7 +75,7 @@ class BaseTest(unittest.TestCase): account='a', container='c', obj='o', body='test', extra_metadata=None, policy=None, frag_index=None, timestamp=None, df_mgr=None, - commit=True): + commit=True, verify=True): policy = policy or POLICIES.legacy object_parts = account, container, obj timestamp = Timestamp(time.time()) if timestamp is None else timestamp @@ -86,7 +86,7 @@ class BaseTest(unittest.TestCase): frag_index=frag_index) write_diskfile(df, timestamp, data=body, extra_metadata=extra_metadata, commit=commit) - if commit: + if commit and verify: # when we write and commit stub data, sanity check it's readable # and not quarantined because of any validation check with df.open(): diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index fe7069c91c..dc29196e08 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -2847,22 +2847,23 @@ class DiskFileMixin(BaseDiskFileTestMixin): pickle.dumps(metadata, diskfile.PICKLE_PROTOCOL)) def _simple_get_diskfile(self, partition='0', account='a', container='c', - obj='o', policy=None, frag_index=None): + obj='o', policy=None, frag_index=None, **kwargs): policy = policy or POLICIES.default df_mgr = self.df_router[policy] if policy.policy_type == EC_POLICY and frag_index is None: frag_index = 2 return df_mgr.get_diskfile(self.existing_device, partition, account, container, obj, - policy=policy, frag_index=frag_index) + policy=policy, frag_index=frag_index, + **kwargs) def _create_test_file(self, data, timestamp=None, metadata=None, - account='a', container='c', obj='o'): + account='a', container='c', obj='o', **kwargs): if metadata is None: metadata = {} metadata.setdefault('name', '/%s/%s/%s' % (account, container, obj)) df = 
self._simple_get_diskfile(account=account, container=container, - obj=obj) + obj=obj, **kwargs) if timestamp is None: timestamp = time() timestamp = Timestamp(timestamp) @@ -2932,6 +2933,16 @@ class DiskFileMixin(BaseDiskFileTestMixin): self._create_test_file, '1234567890', metadata={'X-Delete-At': '0'}) + try: + self._create_test_file('1234567890', open_expired=True, + metadata={'X-Delete-At': '0', + 'X-Object-Meta-Foo': 'bar'}) + df = self._simple_get_diskfile(open_expired=True) + md = df.read_metadata() + self.assertEqual(md['X-Object-Meta-Foo'], 'bar') + except SwiftException as err: + self.fail("Unexpected swift exception raised: %r" % err) + def test_open_not_expired(self): try: self._create_test_file( diff --git a/test/unit/obj/test_ssync.py b/test/unit/obj/test_ssync.py index 41a2a8872d..60a38511e9 100644 --- a/test/unit/obj/test_ssync.py +++ b/test/unit/obj/test_ssync.py @@ -24,7 +24,7 @@ import itertools from six.moves import urllib from swift.common.exceptions import DiskFileNotExist, DiskFileError, \ - DiskFileDeleted + DiskFileDeleted, DiskFileExpired from swift.common import utils from swift.common.storage_policy import POLICIES, EC_POLICY from swift.common.utils import Timestamp @@ -164,7 +164,9 @@ class TestBaseSsync(BaseTest): self.assertNotEqual(v, rx_metadata.pop(k, None)) continue else: - self.assertEqual(v, rx_metadata.pop(k), k) + actual = rx_metadata.pop(k) + self.assertEqual(v, actual, 'Expected %r but got %r for %s' % + (v, actual, k)) self.assertFalse(rx_metadata) expected_body = self._get_object_data(tx_df._name, frag_index=frag_index) @@ -1343,6 +1345,98 @@ class TestSsyncReplication(TestBaseSsync): self.assertEqual(metadata['X-Object-Meta-Test'], oname) self.assertEqual(metadata['X-Object-Sysmeta-Test'], 'sys_' + oname) + def _check_no_longer_expired_object(self, obj_name, df, policy): + # verify that objects with x-delete-at metadata that are not expired + # can be sync'd + rx_node_index = 0 + + def do_ssync(): + # create ssync sender 
instance... + suffixes = [os.path.basename(os.path.dirname(df._datadir))] + job = {'device': self.device, + 'partition': self.partition, + 'policy': policy} + node = dict(self.rx_node) + node.update({'index': rx_node_index}) + sender = ssync_sender.Sender(self.daemon, node, job, suffixes) + # wrap connection from tx to rx to capture ssync messages... + sender.connect, trace = self.make_connect_wrapper(sender) + + # run the sync protocol... + return sender() + + with self.assertRaises(DiskFileExpired): + df.open() # sanity check - expired + t1_meta = next(self.ts_iter) + df.write_metadata({'X-Timestamp': t1_meta.internal}) # no x-delete-at + df.open() # sanity check - no longer expired + + success, in_sync_objs = do_ssync() + self.assertEqual(1, len(in_sync_objs)) + self.assertTrue(success) + self._verify_ondisk_files({obj_name: [df]}, policy) + + # update object metadata with x-delete-at in distant future + t2_meta = next(self.ts_iter) + df.write_metadata({'X-Timestamp': t2_meta.internal, + 'X-Delete-At': str(int(t2_meta) + 10000)}) + df.open() # sanity check - not expired + + success, in_sync_objs = do_ssync() + self.assertEqual(1, len(in_sync_objs)) + self.assertTrue(success) + self._verify_ondisk_files({obj_name: [df]}, policy) + + # update object metadata with x-delete-at in not so distant future to + # check that we can update rx with older x-delete-at than it's current + t3_meta = next(self.ts_iter) + df.write_metadata({'X-Timestamp': t3_meta.internal, + 'X-Delete-At': str(int(t2_meta) + 5000)}) + df.open() # sanity check - not expired + + success, in_sync_objs = do_ssync() + self.assertEqual(1, len(in_sync_objs)) + self.assertTrue(success) + self._verify_ondisk_files({obj_name: [df]}, policy) + + def test_no_longer_expired_object_syncs(self): + policy = POLICIES.default + # simulate o1 that was PUT with x-delete-at that is now expired but + # later had a POST that had no x-delete-at: object should not expire. 
+ tx_df_mgr = self.daemon._df_router[policy] + t1 = next(self.ts_iter) + obj_name = 'o1' + metadata = {'X-Delete-At': '0', 'Content-Type': 'plain/text'} + df = self._make_diskfile( + obj=obj_name, body=self._get_object_data('/a/c/%s' % obj_name), + extra_metadata=metadata, timestamp=t1, policy=policy, + df_mgr=tx_df_mgr, verify=False) + + self._check_no_longer_expired_object(obj_name, df, policy) + + def test_no_longer_expired_object_syncs_meta(self): + policy = POLICIES.default + # simulate o1 that was PUT with x-delete-at that is now expired but + # later had a POST that had no x-delete-at: object should not expire. + tx_df_mgr = self.daemon._df_router[policy] + rx_df_mgr = self.rx_controller._diskfile_router[policy] + t1 = next(self.ts_iter) + obj_name = 'o1' + metadata = {'X-Delete-At': '0', 'Content-Type': 'plain/text'} + df = self._make_diskfile( + obj=obj_name, body=self._get_object_data('/a/c/%s' % obj_name), + extra_metadata=metadata, timestamp=t1, policy=policy, + df_mgr=tx_df_mgr, verify=False) + # rx got the .data file but is missing the .meta + rx_df = self._make_diskfile( + obj=obj_name, body=self._get_object_data('/a/c/%s' % obj_name), + extra_metadata=metadata, timestamp=t1, policy=policy, + df_mgr=rx_df_mgr, verify=False) + with self.assertRaises(DiskFileExpired): + rx_df.open() # sanity check - expired + + self._check_no_longer_expired_object(obj_name, df, policy) + def test_meta_file_not_synced_to_legacy_receiver(self): # verify that the sender does sync a data file to a legacy receiver, # but does not PUT meta file content to a legacy receiver diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py index f6be6b1d84..d8a20b658d 100644 --- a/test/unit/obj/test_ssync_receiver.py +++ b/test/unit/obj/test_ssync_receiver.py @@ -35,6 +35,7 @@ from swift.obj.reconstructor import ObjectReconstructor from test import unit from test.unit import debug_logger, patch_policies, make_timestamp_iter +from test.unit.obj.common 
import write_diskfile @unit.patch_policies() @@ -665,6 +666,36 @@ class TestReceiver(unittest.TestCase): self.assertFalse(self.controller.logger.error.called) self.assertFalse(self.controller.logger.exception.called) + def test_MISSING_CHECK_missing_meta_expired_data(self): + # verify that even when rx disk file has expired x-delete-at, it will + # still be opened and checked for missing meta + self.controller.logger = mock.MagicMock() + ts1 = next(make_timestamp_iter()) + df = self.controller.get_diskfile( + 'sda1', '1', self.account1, self.container1, self.object1, + POLICIES[0]) + write_diskfile(df, ts1, extra_metadata={'X-Delete-At': 0}) + + # make a request - expect newer metadata to be wanted + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'SSYNC', + 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '0'}, + body=':MISSING_CHECK: START\r\n' + + self.hash1 + ' ' + ts1.internal + ' m:30d40\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + resp = req.get_response(self.controller) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', + 'c2519f265f9633e74f9b2fe3b9bec27d m', + ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.error.called) + self.assertFalse(self.controller.logger.exception.called) + @patch_policies(with_ec_default=True) def test_MISSING_CHECK_missing_durable(self): self.controller.logger = mock.MagicMock()