Add validation method for metadata in ECDiskfile

Historically, we've allowed objects to get on disk that may not have had
all of their required EC metadata. Add a new method to sanity-check
metadata in the auditor, and quarantine such invalid data.

Additionally, call validation early in the reconstructor's
reconstruct_fa() method so we do not have to attempt reconstruction for
invalid EC fragments.

Change-Id: I73551007843d27041f594923c59e6fd89caf17e5
This commit is contained in:
indianwhocodes 2022-08-16 09:19:55 -07:00 committed by Tim Burke
parent d7a931191b
commit 745a7f04fe
7 changed files with 33 additions and 0 deletions

View File

@ -258,6 +258,10 @@ class AuditorWorker(object):
try:
with df.open(modernize=True):
metadata = df.get_metadata()
if not df.validate_metadata():
df._quarantine(
df._data_file,
"Metadata failed validation")
obj_size = int(metadata['Content-Length'])
if self.stats_sizes:
self.record_stats(obj_size)

View File

@ -2689,6 +2689,9 @@ class BaseDiskFile(object):
exc = DiskFileDeleted(metadata=metadata)
return exc
def validate_metadata(self):
return ('Content-Length' in self._datafile_metadata)
def _verify_name_matches_hash(self, data_file):
"""
@ -3357,6 +3360,17 @@ class ECDiskFile(BaseDiskFile):
raise DiskFileError(
'Bad frag_prefs: %r: %s' % (frag_prefs, e))
def validate_metadata(self):
required_metadata = [
'Content-Length',
'X-Object-Sysmeta-Ec-Frag-Index',
'X-Object-Sysmeta-Ec-Etag',
]
for header in required_metadata:
if not self._datafile_metadata.get(header):
return False
return True
@property
def durable_timestamp(self):
"""

View File

@ -653,6 +653,9 @@ class ObjectReconstructor(Daemon):
# of the node we're rebuilding to within the primary part list
fi_to_rebuild = node['backend_index']
datafile_metadata = df.get_datafile_metadata()
if not df.validate_metadata():
raise df._quarantine(
df._data_file, "Invalid fragment #%s" % df._frag_index)
local_timestamp = Timestamp(datafile_metadata['X-Timestamp'])
path = datafile_metadata['name']

View File

@ -38,6 +38,7 @@ def write_diskfile(df, timestamp, data=b'test data', frag_index=None,
metadata.update(extra_metadata)
if frag_index is not None:
metadata['X-Object-Sysmeta-Ec-Frag-Index'] = str(frag_index)
metadata['X-Object-Sysmeta-Ec-Etag'] = 'fake-etag'
writer.put(metadata)
if commit and legacy_durable:
# simulate legacy .durable file creation

View File

@ -224,6 +224,11 @@ class TestAuditor(TestAuditorBase):
'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(writer._fd).st_size),
}
if disk_file.policy.policy_type == EC_POLICY:
metadata.update({
'X-Object-Sysmeta-Ec-Frag-Index': '1',
'X-Object-Sysmeta-Ec-Etag': 'fake-etag',
})
writer.put(metadata)
writer.commit(Timestamp(timestamp))
pre_quarantines = auditor_worker.quarantines
@ -368,6 +373,8 @@ class TestAuditor(TestAuditorBase):
'ETag': etag,
'X-Timestamp': timestamp,
'Content-Length': len(data),
'X-Object-Sysmeta-Ec-Frag-Index': '1',
'X-Object-Sysmeta-Ec-Etag': 'fake-etag',
}
writer.put(metadata)
writer.commit(Timestamp(timestamp))
@ -1639,6 +1646,8 @@ class TestAuditWatchers(TestAuditorBase):
'X-Timestamp': timestamp.internal,
'Content-Length': str(len(frag_0)),
'X-Object-Meta-Flavor': 'peach',
'X-Object-Sysmeta-Ec-Frag-Index': '1',
'X-Object-Sysmeta-Ec-Etag': 'fake-etag',
}
writer.put(metadata)
writer.commit(timestamp)

View File

@ -6381,6 +6381,7 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase):
'ETag': md5('test data').hexdigest(),
'X-Timestamp': ts.internal,
'Content-Length': str(len('test data')),
'X-Object-Sysmeta-Ec-Etag': 'fake-etag',
'X-Object-Sysmeta-Ec-Frag-Index': '3',
}

View File

@ -129,6 +129,7 @@ class TestBaseSsync(BaseTest):
frag_index=frag_index)
if policy.policy_type == EC_POLICY:
metadata['X-Object-Sysmeta-Ec-Frag-Index'] = str(frag_index)
metadata['X-Object-Sysmeta-Ec-Etag'] = 'fake-etag'
df = self._make_diskfile(
device=self.device, partition=self.partition, account='a',
container='c', obj=obj_name, body=object_data,