Quarantine objects with busted metadata.
Before, if you encountered an object with corrupt or missing xattrs, the object server would return a 500 on GET, and wouldn't quarantine anything. Now the object server returns a 404 for that GET and the corrupted file is quarantined, thus giving replication a chance to fix it. Change-Id: Ib1d7ab965391742c88fde3d83dc0b5afe85bada9
This commit is contained in:
parent
0717133197
commit
f52d96b904
@ -772,7 +772,7 @@ class DiskFile(object):
|
||||
|
||||
:param data_file: full path of data file to quarantine
|
||||
:param msg: reason for quarantining to be included in the exception
|
||||
:raises DiskFileQuarantine:
|
||||
:raises DiskFileQuarantined:
|
||||
"""
|
||||
self._quarantined_dir = self._threadpool.run_in_thread(
|
||||
quarantine_renamer, self._device_path, data_file)
|
||||
@ -849,13 +849,19 @@ class DiskFile(object):
|
||||
if not ts_file:
|
||||
exc = DiskFileNotExist()
|
||||
else:
|
||||
metadata = read_metadata(ts_file)
|
||||
# All well and good that we have found a tombstone file, but
|
||||
# we don't have a data file so we are just going to raise an
|
||||
# exception that we could not find the object, providing the
|
||||
# tombstone's timestamp.
|
||||
exc = DiskFileDeleted()
|
||||
exc.timestamp = metadata['X-Timestamp']
|
||||
try:
|
||||
metadata = self._failsafe_read_metadata(ts_file, ts_file)
|
||||
except DiskFileQuarantined:
|
||||
# If the tombstone's corrupted, quarantine it and pretend it
|
||||
# wasn't there
|
||||
exc = DiskFileNotExist()
|
||||
else:
|
||||
# All well and good that we have found a tombstone file, but
|
||||
# we don't have a data file so we are just going to raise an
|
||||
# exception that we could not find the object, providing the
|
||||
# tombstone's timestamp.
|
||||
exc = DiskFileDeleted()
|
||||
exc.timestamp = metadata['X-Timestamp']
|
||||
return exc
|
||||
|
||||
def _verify_data_file(self, data_file, fp):
|
||||
@ -925,6 +931,15 @@ class DiskFile(object):
|
||||
metadata_size, statbuf.st_size))
|
||||
return obj_size
|
||||
|
||||
def _failsafe_read_metadata(self, source, quarantine_filename=None):
|
||||
# Takes source and filename separately so we can read from an open
|
||||
# file if we have one
|
||||
try:
|
||||
return read_metadata(source)
|
||||
except Exception as err:
|
||||
self._quarantine(quarantine_filename,
|
||||
"Exception reading metadata: %s" % err.message)
|
||||
|
||||
def _construct_from_data_file(self, data_file, meta_file):
|
||||
"""
|
||||
Open the `.data` file to fetch its metadata, and fetch the metadata
|
||||
@ -938,9 +953,9 @@ class DiskFile(object):
|
||||
:func:`swift.obj.diskfile.DiskFile._verify_data_file`
|
||||
"""
|
||||
fp = open(data_file, 'rb')
|
||||
datafile_metadata = read_metadata(fp)
|
||||
datafile_metadata = self._failsafe_read_metadata(fp, data_file)
|
||||
if meta_file:
|
||||
self._metadata = read_metadata(meta_file)
|
||||
self._metadata = self._failsafe_read_metadata(meta_file, meta_file)
|
||||
sys_metadata = dict(
|
||||
[(key, val) for key, val in datafile_metadata.iteritems()
|
||||
if key.lower() in DATAFILE_SYSTEM_META])
|
||||
|
@ -25,6 +25,7 @@ import mock
|
||||
import unittest
|
||||
import email
|
||||
import tempfile
|
||||
import xattr
|
||||
from shutil import rmtree
|
||||
from time import time
|
||||
from tempfile import mkdtemp
|
||||
@ -34,7 +35,6 @@ from gzip import GzipFile
|
||||
|
||||
from eventlet import tpool
|
||||
from test.unit import FakeLogger, mock as unit_mock
|
||||
from test.unit import _setxattr as setxattr
|
||||
|
||||
from swift.obj import diskfile
|
||||
from swift.common import utils
|
||||
@ -384,8 +384,8 @@ class TestDiskFile(unittest.TestCase):
|
||||
data_file = os.path.join(df._datadir, timestamp + ext)
|
||||
with open(data_file, 'wb') as f:
|
||||
f.write(data)
|
||||
setxattr(f.fileno(), diskfile.METADATA_KEY,
|
||||
pickle.dumps(metadata, diskfile.PICKLE_PROTOCOL))
|
||||
xattr.setxattr(f.fileno(), diskfile.METADATA_KEY,
|
||||
pickle.dumps(metadata, diskfile.PICKLE_PROTOCOL))
|
||||
|
||||
def _create_test_file(self, data, timestamp=None, metadata=None):
|
||||
df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o')
|
||||
@ -552,19 +552,35 @@ class TestDiskFile(unittest.TestCase):
|
||||
etag = etag.hexdigest()
|
||||
metadata['ETag'] = etag
|
||||
diskfile.write_metadata(writer._fd, metadata)
|
||||
if invalid_type == 'Content-Length':
|
||||
elif invalid_type == 'Content-Length':
|
||||
metadata['Content-Length'] = fsize - 1
|
||||
diskfile.write_metadata(writer._fd, metadata)
|
||||
if invalid_type == 'Bad-Content-Length':
|
||||
elif invalid_type == 'Bad-Content-Length':
|
||||
metadata['Content-Length'] = 'zero'
|
||||
diskfile.write_metadata(writer._fd, metadata)
|
||||
if invalid_type == 'Missing-Content-Length':
|
||||
elif invalid_type == 'Missing-Content-Length':
|
||||
del metadata['Content-Length']
|
||||
diskfile.write_metadata(writer._fd, metadata)
|
||||
|
||||
if mark_deleted:
|
||||
df.delete(timestamp)
|
||||
|
||||
data_files = [os.path.join(df._datadir, fname)
|
||||
for fname in sorted(os.listdir(df._datadir),
|
||||
reverse=True)
|
||||
if fname.endswith('.data')]
|
||||
if invalid_type == 'Corrupt-Xattrs':
|
||||
# We have to go below read_metadata/write_metadata to get proper
|
||||
# corruption.
|
||||
meta_xattr = xattr.getxattr(data_files[0], "user.swift.metadata")
|
||||
wrong_byte = 'X' if meta_xattr[0] != 'X' else 'Y'
|
||||
xattr.setxattr(data_files[0], "user.swift.metadata",
|
||||
wrong_byte + meta_xattr[1:])
|
||||
elif invalid_type == 'Truncated-Xattrs':
|
||||
meta_xattr = xattr.getxattr(data_files[0], "user.swift.metadata")
|
||||
xattr.setxattr(data_files[0], "user.swift.metadata",
|
||||
meta_xattr[:-1])
|
||||
|
||||
self.conf['disk_chunk_size'] = csize
|
||||
self.conf['mount_check'] = mount_check
|
||||
self.df_mgr = diskfile.DiskFileManager(self.conf, FakeLogger())
|
||||
@ -638,7 +654,8 @@ class TestDiskFile(unittest.TestCase):
|
||||
def run_quarantine_invalids(self, invalid_type):
|
||||
|
||||
def verify(*args, **kwargs):
|
||||
open_exc = invalid_type in ('Content-Length', 'Bad-Content-Length')
|
||||
open_exc = invalid_type in ('Content-Length', 'Bad-Content-Length',
|
||||
'Corrupt-Xattrs', 'Truncated-Xattrs')
|
||||
reader = None
|
||||
try:
|
||||
df = self._get_open_disk_file(**kwargs)
|
||||
@ -670,7 +687,8 @@ class TestDiskFile(unittest.TestCase):
|
||||
|
||||
def verify_air(params, start=0, adjustment=0):
|
||||
"""verify (a)pp (i)ter (r)ange"""
|
||||
open_exc = invalid_type in ('Content-Length', 'Bad-Content-Length')
|
||||
open_exc = invalid_type in ('Content-Length', 'Bad-Content-Length',
|
||||
'Corrupt-Xattrs', 'Truncated-Xattrs')
|
||||
reader = None
|
||||
try:
|
||||
df = self._get_open_disk_file(**params)
|
||||
@ -701,6 +719,12 @@ class TestDiskFile(unittest.TestCase):
|
||||
|
||||
verify_air(dict(invalid_type=invalid_type, obj_name='8'), 1, 1)
|
||||
|
||||
def test_quarantine_corrupt_xattrs(self):
|
||||
self.run_quarantine_invalids('Corrupt-Xattrs')
|
||||
|
||||
def test_quarantine_truncated_xattrs(self):
|
||||
self.run_quarantine_invalids('Truncated-Xattrs')
|
||||
|
||||
def test_quarantine_invalid_etag(self):
|
||||
self.run_quarantine_invalids('ETag')
|
||||
|
||||
@ -804,6 +828,25 @@ class TestDiskFile(unittest.TestCase):
|
||||
df = self.df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o')
|
||||
self.assertRaises(DiskFileDeleted, df.open)
|
||||
|
||||
def test_open_deleted_with_corrupt_tombstone(self):
|
||||
df = self._get_open_disk_file()
|
||||
ts = time()
|
||||
df.delete(ts)
|
||||
exp_name = '%s.ts' % str(normalize_timestamp(ts))
|
||||
dl = os.listdir(df._datadir)
|
||||
self.assertEquals(len(dl), 1)
|
||||
self.assertTrue(exp_name in set(dl))
|
||||
# it's pickle-format, so removing the last byte is sufficient to
|
||||
# corrupt it
|
||||
ts_fullpath = os.path.join(df._datadir, exp_name)
|
||||
self.assertTrue(os.path.exists(ts_fullpath)) # sanity check
|
||||
meta_xattr = xattr.getxattr(ts_fullpath, "user.swift.metadata")
|
||||
xattr.setxattr(ts_fullpath, "user.swift.metadata", meta_xattr[:-1])
|
||||
|
||||
df = self.df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o')
|
||||
self.assertRaises(DiskFileNotExist, df.open)
|
||||
self.assertFalse(os.path.exists(ts_fullpath))
|
||||
|
||||
def test_close_error(self):
|
||||
|
||||
def mock_handle_close_quarantine():
|
||||
|
Loading…
x
Reference in New Issue
Block a user