Merge "Fix SSYNC failing to replicate unexpired object"
This commit is contained in:
commit
45b17d89c7
@ -320,7 +320,8 @@ def check_delete_headers(request):
|
|||||||
raise HTTPBadRequest(request=request, content_type='text/plain',
|
raise HTTPBadRequest(request=request, content_type='text/plain',
|
||||||
body='Non-integer X-Delete-At')
|
body='Non-integer X-Delete-At')
|
||||||
|
|
||||||
if x_delete_at < time.time():
|
if x_delete_at < time.time() and not utils.config_true_value(
|
||||||
|
request.headers.get('x-backend-replication', 'f')):
|
||||||
raise HTTPBadRequest(request=request, content_type='text/plain',
|
raise HTTPBadRequest(request=request, content_type='text/plain',
|
||||||
body='X-Delete-At in past')
|
body='X-Delete-At in past')
|
||||||
return request
|
return request
|
||||||
|
@ -1920,6 +1920,8 @@ class BaseDiskFile(object):
|
|||||||
:param use_splice: if true, use zero-copy splice() to send data
|
:param use_splice: if true, use zero-copy splice() to send data
|
||||||
:param pipe_size: size of pipe buffer used in zero-copy operations
|
:param pipe_size: size of pipe buffer used in zero-copy operations
|
||||||
:param use_linkat: if True, use open() with linkat() to create obj file
|
:param use_linkat: if True, use open() with linkat() to create obj file
|
||||||
|
:param open_expired: if True, open() will not raise a DiskFileExpired if
|
||||||
|
object is expired
|
||||||
"""
|
"""
|
||||||
reader_cls = None # must be set by subclasses
|
reader_cls = None # must be set by subclasses
|
||||||
writer_cls = None # must be set by subclasses
|
writer_cls = None # must be set by subclasses
|
||||||
@ -1927,7 +1929,7 @@ class BaseDiskFile(object):
|
|||||||
def __init__(self, mgr, device_path, partition,
|
def __init__(self, mgr, device_path, partition,
|
||||||
account=None, container=None, obj=None, _datadir=None,
|
account=None, container=None, obj=None, _datadir=None,
|
||||||
policy=None, use_splice=False, pipe_size=None,
|
policy=None, use_splice=False, pipe_size=None,
|
||||||
use_linkat=False, **kwargs):
|
use_linkat=False, open_expired=False, **kwargs):
|
||||||
self._manager = mgr
|
self._manager = mgr
|
||||||
self._device_path = device_path
|
self._device_path = device_path
|
||||||
self._logger = mgr.logger
|
self._logger = mgr.logger
|
||||||
@ -1936,6 +1938,7 @@ class BaseDiskFile(object):
|
|||||||
self._use_splice = use_splice
|
self._use_splice = use_splice
|
||||||
self._pipe_size = pipe_size
|
self._pipe_size = pipe_size
|
||||||
self._use_linkat = use_linkat
|
self._use_linkat = use_linkat
|
||||||
|
self._open_expired = open_expired
|
||||||
# This might look a lttle hacky i.e tracking number of newly created
|
# This might look a lttle hacky i.e tracking number of newly created
|
||||||
# dirs to fsync only those many later. If there is a better way,
|
# dirs to fsync only those many later. If there is a better way,
|
||||||
# please suggest.
|
# please suggest.
|
||||||
@ -2229,7 +2232,7 @@ class BaseDiskFile(object):
|
|||||||
data_file, "bad metadata x-delete-at value %s" % (
|
data_file, "bad metadata x-delete-at value %s" % (
|
||||||
self._metadata['X-Delete-At']))
|
self._metadata['X-Delete-At']))
|
||||||
else:
|
else:
|
||||||
if x_delete_at <= time.time():
|
if x_delete_at <= time.time() and not self._open_expired:
|
||||||
raise DiskFileExpired(metadata=self._metadata)
|
raise DiskFileExpired(metadata=self._metadata)
|
||||||
try:
|
try:
|
||||||
metadata_size = int(self._metadata['Content-Length'])
|
metadata_size = int(self._metadata['Content-Length'])
|
||||||
|
@ -516,7 +516,8 @@ class ObjectController(BaseStorageServer):
|
|||||||
try:
|
try:
|
||||||
disk_file = self.get_diskfile(
|
disk_file = self.get_diskfile(
|
||||||
device, partition, account, container, obj,
|
device, partition, account, container, obj,
|
||||||
policy=policy)
|
policy=policy, open_expired=config_true_value(
|
||||||
|
request.headers.get('x-backend-replication', 'false')))
|
||||||
except DiskFileDeviceUnavailable:
|
except DiskFileDeviceUnavailable:
|
||||||
return HTTPInsufficientStorage(drive=device, request=request)
|
return HTTPInsufficientStorage(drive=device, request=request)
|
||||||
try:
|
try:
|
||||||
@ -653,9 +654,6 @@ class ObjectController(BaseStorageServer):
|
|||||||
if error_response:
|
if error_response:
|
||||||
return error_response
|
return error_response
|
||||||
new_delete_at = int(request.headers.get('X-Delete-At') or 0)
|
new_delete_at = int(request.headers.get('X-Delete-At') or 0)
|
||||||
if new_delete_at and new_delete_at < time.time():
|
|
||||||
return HTTPBadRequest(body='X-Delete-At in past', request=request,
|
|
||||||
content_type='text/plain')
|
|
||||||
try:
|
try:
|
||||||
fsize = request.message_length()
|
fsize = request.message_length()
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
|
@ -255,7 +255,7 @@ class Receiver(object):
|
|||||||
try:
|
try:
|
||||||
df = self.diskfile_mgr.get_diskfile_from_hash(
|
df = self.diskfile_mgr.get_diskfile_from_hash(
|
||||||
self.device, self.partition, remote['object_hash'],
|
self.device, self.partition, remote['object_hash'],
|
||||||
self.policy, frag_index=self.frag_index)
|
self.policy, frag_index=self.frag_index, open_expired=True)
|
||||||
except exceptions.DiskFileNotExist:
|
except exceptions.DiskFileNotExist:
|
||||||
return {}
|
return {}
|
||||||
try:
|
try:
|
||||||
|
@ -75,7 +75,7 @@ class BaseTest(unittest.TestCase):
|
|||||||
account='a', container='c', obj='o', body='test',
|
account='a', container='c', obj='o', body='test',
|
||||||
extra_metadata=None, policy=None,
|
extra_metadata=None, policy=None,
|
||||||
frag_index=None, timestamp=None, df_mgr=None,
|
frag_index=None, timestamp=None, df_mgr=None,
|
||||||
commit=True):
|
commit=True, verify=True):
|
||||||
policy = policy or POLICIES.legacy
|
policy = policy or POLICIES.legacy
|
||||||
object_parts = account, container, obj
|
object_parts = account, container, obj
|
||||||
timestamp = Timestamp(time.time()) if timestamp is None else timestamp
|
timestamp = Timestamp(time.time()) if timestamp is None else timestamp
|
||||||
@ -86,7 +86,7 @@ class BaseTest(unittest.TestCase):
|
|||||||
frag_index=frag_index)
|
frag_index=frag_index)
|
||||||
write_diskfile(df, timestamp, data=body, extra_metadata=extra_metadata,
|
write_diskfile(df, timestamp, data=body, extra_metadata=extra_metadata,
|
||||||
commit=commit)
|
commit=commit)
|
||||||
if commit:
|
if commit and verify:
|
||||||
# when we write and commit stub data, sanity check it's readable
|
# when we write and commit stub data, sanity check it's readable
|
||||||
# and not quarantined because of any validation check
|
# and not quarantined because of any validation check
|
||||||
with df.open():
|
with df.open():
|
||||||
|
@ -2899,22 +2899,23 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
|||||||
pickle.dumps(metadata, diskfile.PICKLE_PROTOCOL))
|
pickle.dumps(metadata, diskfile.PICKLE_PROTOCOL))
|
||||||
|
|
||||||
def _simple_get_diskfile(self, partition='0', account='a', container='c',
|
def _simple_get_diskfile(self, partition='0', account='a', container='c',
|
||||||
obj='o', policy=None, frag_index=None):
|
obj='o', policy=None, frag_index=None, **kwargs):
|
||||||
policy = policy or POLICIES.default
|
policy = policy or POLICIES.default
|
||||||
df_mgr = self.df_router[policy]
|
df_mgr = self.df_router[policy]
|
||||||
if policy.policy_type == EC_POLICY and frag_index is None:
|
if policy.policy_type == EC_POLICY and frag_index is None:
|
||||||
frag_index = 2
|
frag_index = 2
|
||||||
return df_mgr.get_diskfile(self.existing_device, partition,
|
return df_mgr.get_diskfile(self.existing_device, partition,
|
||||||
account, container, obj,
|
account, container, obj,
|
||||||
policy=policy, frag_index=frag_index)
|
policy=policy, frag_index=frag_index,
|
||||||
|
**kwargs)
|
||||||
|
|
||||||
def _create_test_file(self, data, timestamp=None, metadata=None,
|
def _create_test_file(self, data, timestamp=None, metadata=None,
|
||||||
account='a', container='c', obj='o'):
|
account='a', container='c', obj='o', **kwargs):
|
||||||
if metadata is None:
|
if metadata is None:
|
||||||
metadata = {}
|
metadata = {}
|
||||||
metadata.setdefault('name', '/%s/%s/%s' % (account, container, obj))
|
metadata.setdefault('name', '/%s/%s/%s' % (account, container, obj))
|
||||||
df = self._simple_get_diskfile(account=account, container=container,
|
df = self._simple_get_diskfile(account=account, container=container,
|
||||||
obj=obj)
|
obj=obj, **kwargs)
|
||||||
if timestamp is None:
|
if timestamp is None:
|
||||||
timestamp = time()
|
timestamp = time()
|
||||||
timestamp = Timestamp(timestamp)
|
timestamp = Timestamp(timestamp)
|
||||||
@ -2984,6 +2985,16 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
|||||||
self._create_test_file,
|
self._create_test_file,
|
||||||
'1234567890', metadata={'X-Delete-At': '0'})
|
'1234567890', metadata={'X-Delete-At': '0'})
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._create_test_file('1234567890', open_expired=True,
|
||||||
|
metadata={'X-Delete-At': '0',
|
||||||
|
'X-Object-Meta-Foo': 'bar'})
|
||||||
|
df = self._simple_get_diskfile(open_expired=True)
|
||||||
|
md = df.read_metadata()
|
||||||
|
self.assertEqual(md['X-Object-Meta-Foo'], 'bar')
|
||||||
|
except SwiftException as err:
|
||||||
|
self.fail("Unexpected swift exception raised: %r" % err)
|
||||||
|
|
||||||
def test_open_not_expired(self):
|
def test_open_not_expired(self):
|
||||||
try:
|
try:
|
||||||
self._create_test_file(
|
self._create_test_file(
|
||||||
|
@ -24,7 +24,7 @@ import itertools
|
|||||||
from six.moves import urllib
|
from six.moves import urllib
|
||||||
|
|
||||||
from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
|
from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
|
||||||
DiskFileDeleted
|
DiskFileDeleted, DiskFileExpired
|
||||||
from swift.common import utils
|
from swift.common import utils
|
||||||
from swift.common.storage_policy import POLICIES, EC_POLICY
|
from swift.common.storage_policy import POLICIES, EC_POLICY
|
||||||
from swift.common.utils import Timestamp
|
from swift.common.utils import Timestamp
|
||||||
@ -165,7 +165,9 @@ class TestBaseSsync(BaseTest):
|
|||||||
self.assertNotEqual(v, rx_metadata.pop(k, None))
|
self.assertNotEqual(v, rx_metadata.pop(k, None))
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
self.assertEqual(v, rx_metadata.pop(k), k)
|
actual = rx_metadata.pop(k)
|
||||||
|
self.assertEqual(v, actual, 'Expected %r but got %r for %s' %
|
||||||
|
(v, actual, k))
|
||||||
self.assertFalse(rx_metadata)
|
self.assertFalse(rx_metadata)
|
||||||
expected_body = self._get_object_data(tx_df._name,
|
expected_body = self._get_object_data(tx_df._name,
|
||||||
frag_index=frag_index)
|
frag_index=frag_index)
|
||||||
@ -1344,6 +1346,98 @@ class TestSsyncReplication(TestBaseSsync):
|
|||||||
self.assertEqual(metadata['X-Object-Meta-Test'], oname)
|
self.assertEqual(metadata['X-Object-Meta-Test'], oname)
|
||||||
self.assertEqual(metadata['X-Object-Sysmeta-Test'], 'sys_' + oname)
|
self.assertEqual(metadata['X-Object-Sysmeta-Test'], 'sys_' + oname)
|
||||||
|
|
||||||
|
def _check_no_longer_expired_object(self, obj_name, df, policy):
|
||||||
|
# verify that objects with x-delete-at metadata that are not expired
|
||||||
|
# can be sync'd
|
||||||
|
rx_node_index = 0
|
||||||
|
|
||||||
|
def do_ssync():
|
||||||
|
# create ssync sender instance...
|
||||||
|
suffixes = [os.path.basename(os.path.dirname(df._datadir))]
|
||||||
|
job = {'device': self.device,
|
||||||
|
'partition': self.partition,
|
||||||
|
'policy': policy}
|
||||||
|
node = dict(self.rx_node)
|
||||||
|
node.update({'index': rx_node_index})
|
||||||
|
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
|
||||||
|
# wrap connection from tx to rx to capture ssync messages...
|
||||||
|
sender.connect, trace = self.make_connect_wrapper(sender)
|
||||||
|
|
||||||
|
# run the sync protocol...
|
||||||
|
return sender()
|
||||||
|
|
||||||
|
with self.assertRaises(DiskFileExpired):
|
||||||
|
df.open() # sanity check - expired
|
||||||
|
t1_meta = next(self.ts_iter)
|
||||||
|
df.write_metadata({'X-Timestamp': t1_meta.internal}) # no x-delete-at
|
||||||
|
df.open() # sanity check - no longer expired
|
||||||
|
|
||||||
|
success, in_sync_objs = do_ssync()
|
||||||
|
self.assertEqual(1, len(in_sync_objs))
|
||||||
|
self.assertTrue(success)
|
||||||
|
self._verify_ondisk_files({obj_name: [df]}, policy)
|
||||||
|
|
||||||
|
# update object metadata with x-delete-at in distant future
|
||||||
|
t2_meta = next(self.ts_iter)
|
||||||
|
df.write_metadata({'X-Timestamp': t2_meta.internal,
|
||||||
|
'X-Delete-At': str(int(t2_meta) + 10000)})
|
||||||
|
df.open() # sanity check - not expired
|
||||||
|
|
||||||
|
success, in_sync_objs = do_ssync()
|
||||||
|
self.assertEqual(1, len(in_sync_objs))
|
||||||
|
self.assertTrue(success)
|
||||||
|
self._verify_ondisk_files({obj_name: [df]}, policy)
|
||||||
|
|
||||||
|
# update object metadata with x-delete-at in not so distant future to
|
||||||
|
# check that we can update rx with older x-delete-at than it's current
|
||||||
|
t3_meta = next(self.ts_iter)
|
||||||
|
df.write_metadata({'X-Timestamp': t3_meta.internal,
|
||||||
|
'X-Delete-At': str(int(t2_meta) + 5000)})
|
||||||
|
df.open() # sanity check - not expired
|
||||||
|
|
||||||
|
success, in_sync_objs = do_ssync()
|
||||||
|
self.assertEqual(1, len(in_sync_objs))
|
||||||
|
self.assertTrue(success)
|
||||||
|
self._verify_ondisk_files({obj_name: [df]}, policy)
|
||||||
|
|
||||||
|
def test_no_longer_expired_object_syncs(self):
|
||||||
|
policy = POLICIES.default
|
||||||
|
# simulate o1 that was PUT with x-delete-at that is now expired but
|
||||||
|
# later had a POST that had no x-delete-at: object should not expire.
|
||||||
|
tx_df_mgr = self.daemon._df_router[policy]
|
||||||
|
t1 = next(self.ts_iter)
|
||||||
|
obj_name = 'o1'
|
||||||
|
metadata = {'X-Delete-At': '0', 'Content-Type': 'plain/text'}
|
||||||
|
df = self._make_diskfile(
|
||||||
|
obj=obj_name, body=self._get_object_data('/a/c/%s' % obj_name),
|
||||||
|
extra_metadata=metadata, timestamp=t1, policy=policy,
|
||||||
|
df_mgr=tx_df_mgr, verify=False)
|
||||||
|
|
||||||
|
self._check_no_longer_expired_object(obj_name, df, policy)
|
||||||
|
|
||||||
|
def test_no_longer_expired_object_syncs_meta(self):
|
||||||
|
policy = POLICIES.default
|
||||||
|
# simulate o1 that was PUT with x-delete-at that is now expired but
|
||||||
|
# later had a POST that had no x-delete-at: object should not expire.
|
||||||
|
tx_df_mgr = self.daemon._df_router[policy]
|
||||||
|
rx_df_mgr = self.rx_controller._diskfile_router[policy]
|
||||||
|
t1 = next(self.ts_iter)
|
||||||
|
obj_name = 'o1'
|
||||||
|
metadata = {'X-Delete-At': '0', 'Content-Type': 'plain/text'}
|
||||||
|
df = self._make_diskfile(
|
||||||
|
obj=obj_name, body=self._get_object_data('/a/c/%s' % obj_name),
|
||||||
|
extra_metadata=metadata, timestamp=t1, policy=policy,
|
||||||
|
df_mgr=tx_df_mgr, verify=False)
|
||||||
|
# rx got the .data file but is missing the .meta
|
||||||
|
rx_df = self._make_diskfile(
|
||||||
|
obj=obj_name, body=self._get_object_data('/a/c/%s' % obj_name),
|
||||||
|
extra_metadata=metadata, timestamp=t1, policy=policy,
|
||||||
|
df_mgr=rx_df_mgr, verify=False)
|
||||||
|
with self.assertRaises(DiskFileExpired):
|
||||||
|
rx_df.open() # sanity check - expired
|
||||||
|
|
||||||
|
self._check_no_longer_expired_object(obj_name, df, policy)
|
||||||
|
|
||||||
def test_meta_file_not_synced_to_legacy_receiver(self):
|
def test_meta_file_not_synced_to_legacy_receiver(self):
|
||||||
# verify that the sender does sync a data file to a legacy receiver,
|
# verify that the sender does sync a data file to a legacy receiver,
|
||||||
# but does not PUT meta file content to a legacy receiver
|
# but does not PUT meta file content to a legacy receiver
|
||||||
|
@ -35,6 +35,7 @@ from swift.obj.reconstructor import ObjectReconstructor
|
|||||||
|
|
||||||
from test import listen_zero, unit
|
from test import listen_zero, unit
|
||||||
from test.unit import debug_logger, patch_policies, make_timestamp_iter
|
from test.unit import debug_logger, patch_policies, make_timestamp_iter
|
||||||
|
from test.unit.obj.common import write_diskfile
|
||||||
|
|
||||||
|
|
||||||
@unit.patch_policies()
|
@unit.patch_policies()
|
||||||
@ -665,6 +666,36 @@ class TestReceiver(unittest.TestCase):
|
|||||||
self.assertFalse(self.controller.logger.error.called)
|
self.assertFalse(self.controller.logger.error.called)
|
||||||
self.assertFalse(self.controller.logger.exception.called)
|
self.assertFalse(self.controller.logger.exception.called)
|
||||||
|
|
||||||
|
def test_MISSING_CHECK_missing_meta_expired_data(self):
|
||||||
|
# verify that even when rx disk file has expired x-delete-at, it will
|
||||||
|
# still be opened and checked for missing meta
|
||||||
|
self.controller.logger = mock.MagicMock()
|
||||||
|
ts1 = next(make_timestamp_iter())
|
||||||
|
df = self.controller.get_diskfile(
|
||||||
|
'sda1', '1', self.account1, self.container1, self.object1,
|
||||||
|
POLICIES[0])
|
||||||
|
write_diskfile(df, ts1, extra_metadata={'X-Delete-At': 0})
|
||||||
|
|
||||||
|
# make a request - expect newer metadata to be wanted
|
||||||
|
req = swob.Request.blank(
|
||||||
|
'/sda1/1',
|
||||||
|
environ={'REQUEST_METHOD': 'SSYNC',
|
||||||
|
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '0'},
|
||||||
|
body=':MISSING_CHECK: START\r\n' +
|
||||||
|
self.hash1 + ' ' + ts1.internal + ' m:30d40\r\n'
|
||||||
|
':MISSING_CHECK: END\r\n'
|
||||||
|
':UPDATES: START\r\n:UPDATES: END\r\n')
|
||||||
|
resp = req.get_response(self.controller)
|
||||||
|
self.assertEqual(
|
||||||
|
self.body_lines(resp.body),
|
||||||
|
[':MISSING_CHECK: START',
|
||||||
|
'c2519f265f9633e74f9b2fe3b9bec27d m',
|
||||||
|
':MISSING_CHECK: END',
|
||||||
|
':UPDATES: START', ':UPDATES: END'])
|
||||||
|
self.assertEqual(resp.status_int, 200)
|
||||||
|
self.assertFalse(self.controller.logger.error.called)
|
||||||
|
self.assertFalse(self.controller.logger.exception.called)
|
||||||
|
|
||||||
@patch_policies(with_ec_default=True)
|
@patch_policies(with_ec_default=True)
|
||||||
def test_MISSING_CHECK_missing_durable(self):
|
def test_MISSING_CHECK_missing_durable(self):
|
||||||
self.controller.logger = mock.MagicMock()
|
self.controller.logger = mock.MagicMock()
|
||||||
|
Loading…
Reference in New Issue
Block a user