Make ECDiskFile report all fragments found on disk

Refactor the disk file get_ondisk_files logic to enable
ECDiskFile to gather *all* fragments found on disk (not just those
with a matching .durable file) and to make the fragments available
via the DiskFile interface as a dict mapping:

    Timestamp --> list of fragment indexes

Also, if a durable fragment has been found, then its timestamp is
exposed via the DiskFile interface as the durable_timestamp property.
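
For illustration, a minimal sketch of the shape of the mapping that the
new fragments property returns (the timestamps and fragment indexes here
are invented):

    from swift.common.utils import Timestamp

    # hypothetical value of ECDiskFile.fragments: every fragment found on
    # disk, grouped by timestamp, whether or not a matching .durable exists
    fragments = {
        Timestamp('1443711600.00000'): [0, 2],  # durable fragment set
        Timestamp('1443711601.00000'): [3],     # fragment with no .durable
    }
    for ts, frag_indexes in fragments.items():
        print('%s -> %r' % (ts.internal, frag_indexes))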

Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Change-Id: I55e20a999685b94023d47b231d51007045ac920e
Alistair Coles 2015-10-05 16:15:29 +01:00 committed by Alistair Coles
parent ea3a0d38c1
commit 60b2e02905
6 changed files with 615 additions and 338 deletions

View File

@ -831,6 +831,9 @@ class Timestamp(object):
other = Timestamp(other)
return cmp(self.internal, other.internal)
def __hash__(self):
return hash(self.internal)
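The __hash__ addition above is what lets equal Timestamp instances be used
interchangeably as dict keys, which the fragments mapping relies on; a small
sketch (timestamp value invented):

    from swift.common.utils import Timestamp

    ts_a = Timestamp('1402444821.72589')
    ts_b = Timestamp('1402444821.72589')
    frag_map = {ts_a: [0, 2]}
    # without a value-based __hash__, two equal Timestamp instances are not
    # guaranteed to hash alike, so this lookup could miss; with it, an equal
    # instance finds the same entry
    assert ts_b in frag_map
    assert hash(ts_a) == hash(ts_b)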
def normalize_timestamp(timestamp):
"""

View File

@ -460,92 +460,175 @@ class BaseDiskFileManager(object):
"""
raise NotImplementedError
def _gather_on_disk_file(self, filename, ext, context, frag_index=None,
**kwargs):
def _process_ondisk_files(self, exts, results, **kwargs):
"""
Called by gather_ondisk_files() for each file in an object
datadir in reverse sorted order. If a file is considered part of a
valid on-disk file set it will be added to the context dict, keyed by
its extension. If a file is considered to be obsolete it will be added
to a list stored under the key 'obsolete' in the context dict.
Called by get_ondisk_files(). Should be overridden to implement
subclass-specific handling of files.
:param filename: name of file to be accepted or not
:param ext: extension part of filename
:param context: a context dict that may have been populated by previous
calls to this method
:returns: True if a valid file set has been found, False otherwise
:param exts: dict of lists of file info, keyed by extension
:param results: a dict that may be updated with results
"""
raise NotImplementedError
def _verify_on_disk_files(self, accepted_files, **kwargs):
def _verify_ondisk_files(self, results, **kwargs):
"""
Verify that the final combination of on disk files complies with the
diskfile contract.
:param accepted_files: files that have been found and accepted
:param results: files that have been found and accepted
:returns: True if the file combination is compliant, False otherwise
"""
raise NotImplementedError
data_file, meta_file, ts_file = tuple(
[results[key]
for key in ('data_file', 'meta_file', 'ts_file')])
def gather_ondisk_files(self, files, include_obsolete=False,
verify=False, **kwargs):
return ((data_file is None and meta_file is None and ts_file is None)
or (ts_file is not None and data_file is None
and meta_file is None)
or (data_file is not None and ts_file is None))
def _split_list(self, original_list, condition):
"""
Given a simple list of file names, iterate over them to determine the
files that constitute a valid object, and optionally determine the
files that are obsolete and could be deleted. Note that some files may
fall into neither category.
Split a list into two lists. The first list contains the first N items
of the original list, in their original order, where 0 < N <=
len(original_list). The second list contains the remaining items of the
original list, in their original order.
The index, N, at which the original list is split is the index of the
first item in the list that does not satisfy the given condition. Note
that the original list should be appropriately sorted if the second
list is to contain no items that satisfy the given condition.
:param original_list: the list to be split.
:param condition: a single argument function that will be used to test
for the list item to split on.
:return: a tuple of two lists.
"""
for i, item in enumerate(original_list):
if not condition(item):
return original_list[:i], original_list[i:]
return original_list, []
def _split_gt_timestamp(self, file_info_list, timestamp):
"""
Given a list of file info dicts, reverse sorted by timestamp, split the
list into two: items newer than the timestamp, and items at the same
time as or older than the timestamp.
:param file_info_list: a list of file_info dicts.
:param timestamp: a Timestamp.
:return: a tuple of two lists.
"""
return self._split_list(
file_info_list, lambda x: x['timestamp'] > timestamp)
def _split_gte_timestamp(self, file_info_list, timestamp):
"""
Given a list of file info dicts, reverse sorted by timestamp, split the
list into two: items newer than or at the same time as the timestamp,
and items older than the timestamp.
:param file_info_list: a list of file_info dicts.
:param timestamp: a Timestamp.
:return: a tuple of two lists.
"""
return self._split_list(
file_info_list, lambda x: x['timestamp'] >= timestamp)
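As a standalone illustration of the splitting contract, here is a sketch
using plain dicts in place of parsed file info and a free function in place
of the bound methods (values invented):

    # reverse-time-ordered file info, as the helpers expect
    infos = [{'timestamp': 5}, {'timestamp': 3},
             {'timestamp': 3}, {'timestamp': 1}]

    def split_list(original_list, condition):
        # same contract as _split_list above
        for i, item in enumerate(original_list):
            if not condition(item):
                return original_list[:i], original_list[i:]
        return original_list, []

    # the _split_gt_timestamp equivalent: strictly newer vs. the rest
    newer, rest = split_list(infos, lambda x: x['timestamp'] > 3)
    assert newer == [{'timestamp': 5}]
    assert rest == [{'timestamp': 3}, {'timestamp': 3}, {'timestamp': 1}]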
def get_ondisk_files(self, files, datadir, verify=True, **kwargs):
"""
Given a simple list of file names, determine the files that constitute
a valid fileset i.e. a set of files that defines the state of an
object, and determine the files that are obsolete and could be deleted.
Note that some files may fall into neither category.
If a file is considered part of a valid fileset then its info dict will
be added to the results dict, keyed by <extension>_info. Any files that
are no longer required will have their info dicts added to a list
stored under the key 'obsolete'.
The results dict will always contain entries with keys 'ts_file',
'data_file' and 'meta_file'. Their values will be the fully qualified
path to a file of the corresponding type if there is such a file in the
valid fileset, or None.
:param files: a list of file names.
:param include_obsolete: By default the iteration will stop when a
valid file set has been found. Setting this
argument to True will cause the iteration to
continue in order to find all obsolete files.
:param datadir: directory name files are from.
:param verify: if True verify that the ondisk file contract has not
been violated, otherwise do not verify.
:returns: a dict that may contain: valid on disk files keyed by their
filename extension; a list of obsolete files stored under the
key 'obsolete'.
:returns: a dict that will contain keys:
ts_file -> path to a .ts file or None
data_file -> path to a .data file or None
meta_file -> path to a .meta file or None
and may contain keys:
ts_info -> a file info dict for a .ts file
data_info -> a file info dict for a .data file
meta_info -> a file info dict for a .meta file
obsolete -> a list of file info dicts for obsolete files
"""
files.sort(reverse=True)
results = {}
# Build the exts data structure:
# exts is a dict that maps file extensions to a list of file_info
# dicts for the files having that extension. The file_info dicts are of
# the form returned by parse_on_disk_filename, with the filename added.
# Each list is sorted in reverse timestamp order.
#
# The exts dict will be modified during subsequent processing as files
# are removed to be discarded or ignored.
exts = defaultdict(list)
for afile in files:
ts_file = results.get('.ts')
data_file = results.get('.data')
if not include_obsolete:
assert ts_file is None, "On-disk file search loop" \
" continuing after tombstone, %s, encountered" % ts_file
assert data_file is None, "On-disk file search loop" \
" continuing after data file, %s, encountered" % data_file
# Categorize files by extension
try:
file_info = self.parse_on_disk_filename(afile)
file_info['filename'] = afile
exts[file_info['ext']].append(file_info)
except DiskFileError as e:
self.logger.warning('Unexpected file %s: %s' %
(os.path.join(datadir or '', afile), e))
for ext in exts:
# For each extension sort files into reverse chronological order.
exts[ext] = sorted(
exts[ext], key=lambda info: info['timestamp'], reverse=True)
ext = splitext(afile)[1]
if self._gather_on_disk_file(
afile, ext, results, **kwargs):
if not include_obsolete:
break
# the results dict is used to collect results of file filtering
results = {}
# non-tombstones older than or equal to latest tombstone are obsolete
if exts.get('.ts'):
for ext in filter(lambda ext: ext != '.ts', exts.keys()):
exts[ext], older = self._split_gt_timestamp(
exts[ext], exts['.ts'][0]['timestamp'])
results.setdefault('obsolete', []).extend(older)
# all but most recent .meta and .ts are obsolete
for ext in ('.meta', '.ts'):
if ext in exts:
results.setdefault('obsolete', []).extend(exts[ext][1:])
exts[ext] = exts[ext][:1]
# delegate to subclass handler
self._process_ondisk_files(exts, results, **kwargs)
# set final choice of files
if exts.get('.ts'):
results['ts_info'] = exts['.ts'][0]
if 'data_info' in results and exts.get('.meta'):
# only report a meta file if there is a data file
results['meta_info'] = exts['.meta'][0]
# set ts_file, data_file and meta_file with path to chosen file or None
for info_key in ('data_info', 'meta_info', 'ts_info'):
info = results.get(info_key)
key = info_key[:-5] + '_file'
results[key] = join(datadir, info['filename']) if info else None
if verify:
assert self._verify_on_disk_files(
assert self._verify_ondisk_files(
results, **kwargs), \
"On-disk file search algorithm contract is broken: %s" \
% results.values()
% str(results)
return results
def get_ondisk_files(self, files, datadir, **kwargs):
"""
Given a simple list of file names, determine the files to use.
:param files: simple set of files as a python list
:param datadir: directory name files are from for convenience
:returns: dict of files to use having keys 'data_file', 'ts_file',
'meta_file' and optionally other policy specific keys
"""
file_info = self.gather_ondisk_files(files, verify=True, **kwargs)
for ext in ('.data', '.meta', '.ts'):
filename = file_info.get(ext)
key = '%s_file' % ext[1:]
file_info[key] = join(datadir, filename) if filename else None
return file_info
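To make the new result shape concrete, a hedged sketch of what the
refactored get_ondisk_files might return for a replication-policy hash dir
(paths and timestamps invented; the exact info dict contents are elided):

    files = ['1443712632.01234.meta',
             '1443712631.01234.data',
             '1443712630.01234.data']
    # df_mgr.get_ondisk_files(files, datadir) would be expected to return
    # something along the lines of:
    # {'data_file': datadir + '/1443712631.01234.data',
    #  'meta_file': datadir + '/1443712632.01234.meta',
    #  'ts_file': None,
    #  'data_info': {...}, 'meta_info': {...},
    #  'obsolete': [<info dict for 1443712630.01234.data>]}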
def cleanup_ondisk_files(self, hsh_path, reclaim_age=ONE_WEEK, **kwargs):
"""
Clean up on-disk files that are obsolete and gather the set of valid
@ -560,27 +643,24 @@ class BaseDiskFileManager(object):
key 'obsolete'; a list of files remaining in the directory,
reverse sorted, stored under the key 'files'.
"""
def is_reclaimable(filename):
timestamp = self.parse_on_disk_filename(filename)['timestamp']
def is_reclaimable(timestamp):
return (time.time() - float(timestamp)) > reclaim_age
files = listdir(hsh_path)
files.sort(reverse=True)
results = self.gather_ondisk_files(files, include_obsolete=True,
**kwargs)
# TODO ref to durables here
if '.durable' in results and not results.get('fragments'):
# a .durable with no .data is deleted as soon as it is found
results.setdefault('obsolete', []).append(results.pop('.durable'))
if '.ts' in results and is_reclaimable(results['.ts']):
results.setdefault('obsolete', []).append(results.pop('.ts'))
for filename in results.get('fragments_without_durable', []):
results = self.get_ondisk_files(
files, hsh_path, verify=False, **kwargs)
if 'ts_info' in results and is_reclaimable(
results['ts_info']['timestamp']):
remove_file(join(hsh_path, results['ts_info']['filename']))
files.remove(results.pop('ts_info')['filename'])
for file_info in results.get('possible_reclaim', []):
# stray fragments are not deleted until reclaim-age
if is_reclaimable(filename):
results.setdefault('obsolete', []).append(filename)
for filename in results.get('obsolete', []):
remove_file(join(hsh_path, filename))
files.remove(filename)
if is_reclaimable(file_info['timestamp']):
results.setdefault('obsolete', []).append(file_info)
for file_info in results.get('obsolete', []):
remove_file(join(hsh_path, file_info['filename']))
files.remove(file_info['filename'])
results['files'] = files
return results
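The reclaim check used above is simple enough to sketch standalone; this
mirrors the is_reclaimable closure (the one-week figure is the ONE_WEEK
default used for reclaim_age):

    import time

    ONE_WEEK = 7 * 24 * 60 * 60  # default reclaim_age in seconds

    def is_reclaimable(timestamp, reclaim_age=ONE_WEEK):
        # a tombstone or stray (non-durable) fragment older than
        # reclaim_age is eligible for removal
        return (time.time() - float(timestamp)) > reclaim_age

    print(is_reclaimable('1443711600.00000'))  # True once the ts is old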
@ -915,9 +995,9 @@ class BaseDiskFileManager(object):
(os.path.join(partition_path, suffix), suffix)
for suffix in suffixes)
key_preference = (
('ts_meta', '.meta'),
('ts_data', '.data'),
('ts_data', '.ts'),
('ts_meta', 'meta_info'),
('ts_data', 'data_info'),
('ts_data', 'ts_info'),
)
for suffix_path, suffix in suffixes:
for object_hash in self._listdir(suffix_path):
@ -926,14 +1006,13 @@ class BaseDiskFileManager(object):
results = self.cleanup_ondisk_files(
object_path, self.reclaim_age, **kwargs)
timestamps = {}
for ts_key, ext in key_preference:
if ext not in results:
for ts_key, info_key in key_preference:
if info_key not in results:
continue
timestamps[ts_key] = self.parse_on_disk_filename(
results[ext])['timestamp']
timestamps[ts_key] = results[info_key]['timestamp']
if 'ts_data' not in timestamps:
# file sets that do not include a .data or .ts
# file can not be opened and therefore can not
# file cannot be opened and therefore cannot
# be ssync'd
continue
yield (object_path, object_hash, timestamps)
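For reference, a hedged example of the shape of each item yield_hashes now
yields, with the timestamps taken directly from the *_info dicts rather than
re-parsed from filenames (all values invented):

    from swift.common.utils import Timestamp

    example = ('/srv/node/sda1/objects/0/abc/ffffffffffffffffffffffffffffffff',
               'ffffffffffffffffffffffffffffffff',
               {'ts_data': Timestamp('1443711600.00000'),
                'ts_meta': Timestamp('1443711601.00000')})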
@ -1430,6 +1509,7 @@ class BaseDiskFile(object):
self._obj = None
self._datadir = None
self._tmpdir = join(device_path, get_tmp_dir(policy))
self._ondisk_info = None
self._metadata = None
self._datafile_metadata = None
self._metafile_metadata = None
@ -1479,6 +1559,26 @@ class BaseDiskFile(object):
raise DiskFileNotOpen()
return Timestamp(self._datafile_metadata.get('X-Timestamp'))
@property
def durable_timestamp(self):
"""
Provides the timestamp of the newest data file found in the object
directory.
:return: A Timestamp instance, or None if no data file was found.
:raises DiskFileNotOpen: if the open() method has not been previously
called on this instance.
"""
if self._ondisk_info is None:
raise DiskFileNotOpen()
if self._datafile_metadata:
return Timestamp(self._datafile_metadata.get('X-Timestamp'))
return None
@property
def fragments(self):
return None
@classmethod
def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition, policy):
return cls(mgr, device_path, None, partition, _datadir=hash_dir_path,
@ -1524,8 +1624,8 @@ class BaseDiskFile(object):
# The data directory does not exist, so the object cannot exist.
files = []
# gather info about the valid files to us to open the DiskFile
file_info = self._get_ondisk_file(files)
# gather info about the valid files to use to open the DiskFile
file_info = self._get_ondisk_files(files)
self._data_file = file_info.get('data_file')
if not self._data_file:
@ -1579,7 +1679,7 @@ class BaseDiskFile(object):
self._logger.increment('quarantines')
return DiskFileQuarantined(msg)
def _get_ondisk_file(self, files):
def _get_ondisk_files(self, files):
"""
Determine the on-disk files to use.
@ -1950,8 +2050,9 @@ class DiskFile(BaseDiskFile):
reader_cls = DiskFileReader
writer_cls = DiskFileWriter
def _get_ondisk_file(self, files):
return self.manager.get_ondisk_files(files, self._datadir)
def _get_ondisk_files(self, files):
self._ondisk_info = self.manager.get_ondisk_files(files, self._datadir)
return self._ondisk_info
@DiskFileRouter.register(REPL_POLICY)
@ -1967,89 +2068,44 @@ class DiskFileManager(BaseDiskFileManager):
* timestamp is a :class:`~swift.common.utils.Timestamp`
* ext is a string, the file extension including the leading dot or
the empty string if the filename has no extenstion.
the empty string if the filename has no extension.
:raises DiskFileError: if any part of the filename is not able to be
validated.
"""
filename, ext = splitext(filename)
float_part, ext = splitext(filename)
try:
timestamp = Timestamp(float_part)
except ValueError:
raise DiskFileError('Invalid Timestamp value in filename %r'
% filename)
return {
'timestamp': Timestamp(filename),
'timestamp': timestamp,
'ext': ext,
}
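A small sketch of the parsing performed for replication-policy filenames
(filename invented); this mirrors the logic above rather than calling the
manager:

    from os.path import splitext
    from swift.common.utils import Timestamp

    fname = '1443711600.00000.data'
    float_part, ext = splitext(fname)
    parsed = {'timestamp': Timestamp(float_part), 'ext': ext}
    # parsed == {'timestamp': Timestamp('1443711600.00000'), 'ext': '.data'};
    # a name with no valid timestamp, e.g. 'junk', raises DiskFileError in
    # the real method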
def _gather_on_disk_file(self, filename, ext, context, frag_index=None,
**kwargs):
def _process_ondisk_files(self, exts, results, **kwargs):
"""
Called by gather_ondisk_files() for each file in an object
datadir in reverse sorted order. If a file is considered part of a
valid on-disk file set it will be added to the context dict, keyed by
its extension. If a file is considered to be obsolete it will be added
to a list stored under the key 'obsolete' in the context dict.
Implement replication policy specific handling of .data files.
:param filename: name of file to be accepted or not
:param ext: extension part of filename
:param context: a context dict that may have been populated by previous
calls to this method
:returns: True if a valid file set has been found, False otherwise
:param exts: dict of lists of file info, keyed by extension
:param results: a dict that may be updated with results
"""
if exts.get('.data'):
for ext in exts.keys():
if ext == '.data':
# older .data's are obsolete
exts[ext], obsolete = self._split_gte_timestamp(
exts[ext], exts['.data'][0]['timestamp'])
else:
# other files at same or older timestamp as most recent
# data are obsolete
exts[ext], obsolete = self._split_gt_timestamp(
exts[ext], exts['.data'][0]['timestamp'])
results.setdefault('obsolete', []).extend(obsolete)
# if first file with given extension then add filename to context
# dict and return True
accept_first = lambda: context.setdefault(ext, filename) == filename
# add the filename to the list of obsolete files in context dict
discard = lambda: context.setdefault('obsolete', []).append(filename)
# set a flag in the context dict indicating that a valid fileset has
# been found
set_valid_fileset = lambda: context.setdefault('found_valid', True)
# return True if the valid fileset flag is set in the context dict
have_valid_fileset = lambda: context.get('found_valid')
if ext == '.data':
if have_valid_fileset():
# valid fileset means we must have a newer
# .data or .ts, so discard the older .data file
discard()
else:
accept_first()
set_valid_fileset()
elif ext == '.ts':
if have_valid_fileset() or not accept_first():
# newer .data or .ts already found so discard this
discard()
if not have_valid_fileset():
# remove any .meta that may have been previously found
context.pop('.meta', None)
set_valid_fileset()
elif ext == '.meta':
if have_valid_fileset() or not accept_first():
# newer .data, .durable or .ts already found so discard this
discard()
else:
# ignore unexpected files
pass
return have_valid_fileset()
def _verify_on_disk_files(self, accepted_files, **kwargs):
"""
Verify that the final combination of on disk files complies with the
replicated diskfile contract.
:param accepted_files: files that have been found and accepted
:returns: True if the file combination is compliant, False otherwise
"""
# mimic legacy behavior - .meta is ignored when .ts is found
if accepted_files.get('.ts'):
accepted_files.pop('.meta', None)
data_file, meta_file, ts_file, durable_file = tuple(
[accepted_files.get(ext)
for ext in ('.data', '.meta', '.ts', '.durable')])
return ((data_file is None and meta_file is None and ts_file is None)
or (ts_file is not None and data_file is None
and meta_file is None)
or (data_file is not None and ts_file is None))
# set results
results['data_info'] = exts['.data'][0]
def _hash_suffix(self, path, reclaim_age):
"""
@ -2153,14 +2209,47 @@ class ECDiskFile(BaseDiskFile):
if frag_index is not None:
self._frag_index = self.manager.validate_fragment_index(frag_index)
def _get_ondisk_file(self, files):
@property
def durable_timestamp(self):
"""
Provides the timestamp of the newest durable file found in the object
directory.
:return: A Timestamp instance, or None if no durable file was found.
:raises DiskFileNotOpen: if the open() method has not been previously
called on this instance.
"""
if self._ondisk_info is None:
raise DiskFileNotOpen()
if self._ondisk_info.get('durable_frag_set'):
return self._ondisk_info['durable_frag_set'][0]['timestamp']
return None
@property
def fragments(self):
"""
Provides information about all fragments that were found in the object
directory, including fragments without a matching durable file, and
including any fragment chosen to construct the opened diskfile.
:return: A dict mapping <Timestamp instance> -> <list of frag indexes>,
or None if the diskfile has not been opened or no fragments
were found.
"""
if self._ondisk_info:
frag_sets = self._ondisk_info['frag_sets']
return dict([(ts, [info['frag_index'] for info in frag_set])
for ts, frag_set in frag_sets.items()])
def _get_ondisk_files(self, files):
"""
The only difference between this method and the replication policy
DiskFile method is passing in the frag_index kwarg to our manager's
get_ondisk_files method.
"""
return self.manager.get_ondisk_files(
self._ondisk_info = self.manager.get_ondisk_files(
files, self._datadir, frag_index=self._frag_index)
return self._ondisk_info
def purge(self, timestamp, frag_index):
"""
@ -2254,9 +2343,13 @@ class ECDiskFileManager(BaseDiskFileManager):
validated.
"""
frag_index = None
filename, ext = splitext(filename)
parts = filename.split('#', 1)
timestamp = parts[0]
float_frag, ext = splitext(filename)
parts = float_frag.split('#', 1)
try:
timestamp = Timestamp(parts[0])
except ValueError:
raise DiskFileError('Invalid Timestamp value in filename %r'
% filename)
if ext == '.data':
# it is an error for an EC data file to not have a valid
# fragment index
@ -2267,137 +2360,94 @@ class ECDiskFileManager(BaseDiskFileManager):
pass
frag_index = self.validate_fragment_index(frag_index)
return {
'timestamp': Timestamp(timestamp),
'timestamp': timestamp,
'frag_index': frag_index,
'ext': ext,
}
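Similarly, a sketch of how an EC fragment filename decomposes: the fragment
index follows a '#' in the stem (filename invented; the real method also
validates the index via validate_fragment_index):

    from os.path import splitext
    from swift.common.utils import Timestamp

    fname = '1443711600.00000#3.data'
    float_frag, ext = splitext(fname)
    ts_part, _sep, frag_part = float_frag.partition('#')
    parsed = {'timestamp': Timestamp(ts_part),
              'frag_index': int(frag_part),
              'ext': ext}
    # parsed mirrors the dict returned above for a valid EC .data filename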
def is_obsolete(self, filename, other_filename):
def _process_ondisk_files(self, exts, results, frag_index=None, **kwargs):
"""
Test if a given file is considered to be obsolete with respect to
another file in an object storage dir.
Implement EC policy specific handling of .data and .durable files.
Implements EC policy specific behavior when comparing files against a
.durable file.
A simple string comparison would consider t2#1.data to be older than
t2.durable (since t2#1.data < t2.durable). By stripping off the file
extensions we get the desired behavior: t2#1 > t2 without compromising
the detection of t1#1 < t2.
:param filename: a string representing an absolute filename
:param other_filename: a string representing an absolute filename
:returns: True if filename is considered obsolete, False otherwise.
"""
if other_filename.endswith('.durable'):
return splitext(filename)[0] < splitext(other_filename)[0]
return filename < other_filename
def _gather_on_disk_file(self, filename, ext, context, frag_index=None,
**kwargs):
"""
Called by gather_ondisk_files() for each file in an object
datadir in reverse sorted order. If a file is considered part of a
valid on-disk file set it will be added to the context dict, keyed by
its extension. If a file is considered to be obsolete it will be added
to a list stored under the key 'obsolete' in the context dict.
:param filename: name of file to be accepted or not
:param ext: extension part of filename
:param context: a context dict that may have been populated by previous
calls to this method
:param exts: dict of lists of file info, keyed by extension
:param results: a dict that may be updated with results
:param frag_index: if set, search for a specific fragment index .data
file, otherwise accept the first valid .data file.
:returns: True if a valid file set has been found, False otherwise
"""
durable_info = None
if exts.get('.durable'):
durable_info = exts['.durable'][0]
# Mark everything older than most recent .durable as obsolete
# and remove from the exts dict.
for ext in exts.keys():
exts[ext], older = self._split_gte_timestamp(
exts[ext], durable_info['timestamp'])
results.setdefault('obsolete', []).extend(older)
# if first file with given extension then add filename to context
# dict and return True
accept_first = lambda: context.setdefault(ext, filename) == filename
# add the filename to the list of obsolete files in context dict
discard = lambda: context.setdefault('obsolete', []).append(filename)
# set a flag in the context dict indicating that a valid fileset has
# been found
set_valid_fileset = lambda: context.setdefault('found_valid', True)
# return True if the valid fileset flag is set in the context dict
have_valid_fileset = lambda: context.get('found_valid')
# Split the list of .data files into sets of frags having the same
# timestamp, identifying the durable and newest sets (if any) as we go.
# To do this we can take advantage of the list of .data files being
# reverse-time ordered. Keep the resulting per-timestamp frag sets in
# a frag_sets dict mapping a Timestamp instance -> frag_set.
all_frags = exts.get('.data')
frag_sets = {}
durable_frag_set = None
while all_frags:
frag_set, all_frags = self._split_gte_timestamp(
all_frags, all_frags[0]['timestamp'])
# sort the frag set into ascending frag_index order
frag_set.sort(key=lambda info: info['frag_index'])
timestamp = frag_set[0]['timestamp']
frag_sets[timestamp] = frag_set
if durable_info and durable_info['timestamp'] == timestamp:
durable_frag_set = frag_set
if context.get('.durable'):
# a .durable file has been found
if ext == '.data':
if self.is_obsolete(filename, context.get('.durable')):
# this and remaining data files are older than durable
discard()
set_valid_fileset()
else:
# accept the first .data file if it matches requested
# frag_index, or if no specific frag_index is requested
fi = self.parse_on_disk_filename(filename)['frag_index']
if frag_index is None or frag_index == int(fi):
accept_first()
set_valid_fileset()
# else: keep searching for a .data file to match frag_index
context.setdefault('fragments', []).append(filename)
# Select a single chosen frag from the durable frag set, by either
# matching against a specified frag_index or taking the highest index.
chosen_frag = None
if durable_frag_set:
if frag_index is not None:
# search the frag set to find the exact frag_index
for info in durable_frag_set:
if info['frag_index'] == frag_index:
chosen_frag = info
break
else:
# there can no longer be a matching .data file so mark what has
# been found so far as the valid fileset
discard()
set_valid_fileset()
elif ext == '.data':
# not yet found a .durable
if have_valid_fileset():
# valid fileset means we must have a newer
# .ts, so discard the older .data file
discard()
else:
# .data newer than a .durable or .ts, don't discard yet
context.setdefault('fragments_without_durable', []).append(
filename)
elif ext == '.ts':
if have_valid_fileset() or not accept_first():
# newer .data, .durable or .ts already found so discard this
discard()
if not have_valid_fileset():
# remove any .meta that may have been previously found
context.pop('.meta', None)
set_valid_fileset()
elif ext in ('.meta', '.durable'):
if have_valid_fileset() or not accept_first():
# newer .data, .durable or .ts already found so discard this
discard()
else:
# ignore unexpected files
pass
return have_valid_fileset()
chosen_frag = durable_frag_set[-1]
def _verify_on_disk_files(self, accepted_files, frag_index=None, **kwargs):
# If we successfully found a frag then set results
if chosen_frag:
results['data_info'] = chosen_frag
results['durable_frag_set'] = durable_frag_set
results['frag_sets'] = frag_sets
# Mark any isolated .durable as obsolete
if exts.get('.durable') and not durable_frag_set:
results.setdefault('obsolete', []).extend(exts['.durable'])
exts.pop('.durable')
# Fragments *may* be ready for reclaim, unless they are durable or
# at the timestamp we have just chosen for constructing the diskfile.
for frag_set in frag_sets.values():
if frag_set == durable_frag_set:
continue
results.setdefault('possible_reclaim', []).extend(frag_set)
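As a standalone illustration of the frag_set grouping described above (not
the method itself), with invented timestamps and indexes:

    # .data file infos arrive reverse-time ordered; group them by timestamp
    infos = [{'timestamp': 7, 'frag_index': 2},
             {'timestamp': 7, 'frag_index': 0},
             {'timestamp': 5, 'frag_index': 1}]
    frag_sets = {}
    for info in infos:
        frag_sets.setdefault(info['timestamp'], []).append(info)
    for frag_set in frag_sets.values():
        # each set ends up in ascending frag_index order
        frag_set.sort(key=lambda info: info['frag_index'])
    assert [i['frag_index'] for i in frag_sets[7]] == [0, 2]
    assert [i['frag_index'] for i in frag_sets[5]] == [1]
    # the set whose timestamp matches the newest .durable becomes
    # durable_frag_set, from which data_info (the chosen frag) is picked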
def _verify_ondisk_files(self, results, frag_index=None, **kwargs):
"""
Verify that the final combination of on disk files complies with the
erasure-coded diskfile contract.
:param accepted_files: files that have been found and accepted
:param results: files that have been found and accepted
:param frag_index: specifies a specific fragment index .data file
:returns: True if the file combination is compliant, False otherwise
"""
if not accepted_files.get('.data'):
# We may find only a .meta, which doesn't mean the on disk
# contract is broken. So we clear it to comply with
# superclass assertions.
accepted_files.pop('.meta', None)
data_file, meta_file, ts_file, durable_file = tuple(
[accepted_files.get(ext)
for ext in ('.data', '.meta', '.ts', '.durable')])
return ((data_file is None or durable_file is not None)
and (data_file is None and meta_file is None
and ts_file is None and durable_file is None)
or (ts_file is not None and data_file is None
and meta_file is None and durable_file is None)
or (data_file is not None and durable_file is not None
and ts_file is None)
or (durable_file is not None and meta_file is None
and ts_file is None))
if super(ECDiskFileManager, self)._verify_ondisk_files(
results, **kwargs):
have_data_file = results['data_file'] is not None
have_durable = results.get('durable_frag_set') is not None
return have_data_file == have_durable
return False
def _hash_suffix(self, path, reclaim_age):
"""
@ -2412,12 +2462,12 @@ class ECDiskFileManager(BaseDiskFileManager):
# here we flatten out the hashers hexdigest into a dictionary instead
# of just returning the one hexdigest for the whole suffix
def mapper(filename):
info = self.parse_on_disk_filename(filename)
fi = info['frag_index']
if fi is None:
return None, filename
else:
return fi, info['timestamp'].internal
hash_per_fi = self._hash_suffix_dir(path, mapper, reclaim_age)
return dict((fi, md5.hexdigest()) for fi, md5 in hash_per_fi.items())

View File

@ -254,6 +254,7 @@ class DiskFile(object):
self._metadata = None
self._fp = None
self._filesystem = fs
self.fragments = None
def open(self):
"""
@ -421,3 +422,5 @@ class DiskFile(object):
return Timestamp(self._metadata.get('X-Timestamp'))
data_timestamp = timestamp
durable_timestamp = timestamp

View File

@ -779,6 +779,15 @@ class TestTimestamp(unittest.TestCase):
self.assertEqual(
sorted([t.internal for t in timestamps]), expected)
def test_hashable(self):
ts_0 = utils.Timestamp('1402444821.72589')
ts_0_also = utils.Timestamp('1402444821.72589')
self.assertEqual(ts_0, ts_0_also) # sanity
self.assertEqual(hash(ts_0), hash(ts_0_also))
d = {ts_0: 'whatever'}
self.assertIn(ts_0, d) # sanity
self.assertIn(ts_0_also, d)
class TestUtils(unittest.TestCase):
"""Tests for swift.common.utils """

View File

@ -20,6 +20,7 @@ import six.moves.cPickle as pickle
import os
import errno
import itertools
from unittest.util import safe_repr
import mock
import unittest
import email
@ -462,6 +463,35 @@ class BaseDiskFileTestMixin(object):
return '.'.join([
mgr_cls.__module__, mgr_cls.__name__, manager_attribute_name])
def _assertDictContainsSubset(self, subset, dictionary, msg=None):
"""Checks whether dictionary is a superset of subset."""
# This is almost identical to the method in the python3.4 version of
# unittest.case.TestCase.assertDictContainsSubset, reproduced here to
# avoid the deprecation warning in the original when using python3.
missing = []
mismatched = []
for key, value in subset.items():
if key not in dictionary:
missing.append(key)
elif value != dictionary[key]:
mismatched.append('%s, expected: %s, actual: %s' %
(safe_repr(key), safe_repr(value),
safe_repr(dictionary[key])))
if not (missing or mismatched):
return
standardMsg = ''
if missing:
standardMsg = 'Missing: %s' % ','.join(safe_repr(m) for m in
missing)
if mismatched:
if standardMsg:
standardMsg += '; '
standardMsg += 'Mismatched values: %s' % ','.join(mismatched)
self.fail(self._formatMessage(msg, standardMsg))
class DiskFileManagerMixin(BaseDiskFileTestMixin):
"""
@ -516,8 +546,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
for _order in ('ordered', 'shuffled', 'shuffled'):
class_under_test = self._get_diskfile(policy, frag_index)
try:
actual = class_under_test._get_ondisk_file(files)
self.assertDictContainsSubset(
actual = class_under_test._get_ondisk_files(files)
self._assertDictContainsSubset(
expected, actual,
'Expected %s from %s but got %s'
% (expected, files, actual))
@ -593,14 +623,38 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
df_mgr = self.df_router[policy]
datadir = os.path.join('/srv/node/sdb1/',
diskfile.get_data_dir(policy))
self.assertEqual(expected, df_mgr.get_ondisk_files(
files, datadir))
actual = df_mgr.get_ondisk_files(files, datadir)
self._assertDictContainsSubset(expected, actual)
# check diskfile under the hood
df = self._get_diskfile(policy, frag_index=frag_index)
self.assertEqual(expected, df._get_ondisk_file(files))
actual = df._get_ondisk_files(files)
self._assertDictContainsSubset(expected, actual)
# check diskfile open
self.assertRaises(DiskFileNotExist, df.open)
def test_get_ondisk_files_with_unexpected_file(self):
unexpected_files = ['junk', 'junk.data', '.junk']
timestamp = next(make_timestamp_iter())
tomb_file = timestamp.internal + '.ts'
for policy in POLICIES:
for unexpected in unexpected_files:
files = [unexpected, tomb_file]
df_mgr = self.df_router[policy]
df_mgr.logger = FakeLogger()
datadir = os.path.join('/srv/node/sdb1/',
diskfile.get_data_dir(policy))
results = df_mgr.get_ondisk_files(files, datadir)
expected = {'ts_file': os.path.join(datadir, tomb_file)}
self._assertDictContainsSubset(expected, results)
log_lines = df_mgr.logger.get_lines_for_level('warning')
self.assertTrue(
log_lines[0].startswith(
'Unexpected file %s'
% os.path.join(datadir, unexpected)))
def test_construct_dev_path(self):
res_path = self.df_mgr.construct_dev_path('abc')
self.assertEqual(os.path.join(self.df_mgr.devices, 'abc'), res_path)
@ -1014,14 +1068,59 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
self._test_yield_hashes_cleanup(scenarios, POLICIES[0])
def test_get_ondisk_files_with_stray_meta(self):
# get_ondisk_files does not tolerate a stray .meta file
# get_ondisk_files ignores a stray .meta file
class_under_test = self._get_diskfile(POLICIES[0])
files = ['0000000007.00000.meta']
self.assertRaises(AssertionError,
class_under_test.manager.get_ondisk_files, files,
self.testdir)
with mock.patch('swift.obj.diskfile.os.listdir', lambda *args: files):
self.assertRaises(DiskFileNotExist, class_under_test.open)
def test_verify_ondisk_files(self):
# ._verify_ondisk_files should only return False if get_ondisk_files
# has produced a bad set of files due to a bug, so to test it we need
# to probe it directly.
mgr = self.df_router[POLICIES.default]
ok_scenarios = (
{'ts_file': None, 'data_file': None, 'meta_file': None},
{'ts_file': None, 'data_file': 'a_file', 'meta_file': None},
{'ts_file': None, 'data_file': 'a_file', 'meta_file': 'a_file'},
{'ts_file': 'a_file', 'data_file': None, 'meta_file': None},
)
for scenario in ok_scenarios:
self.assertTrue(mgr._verify_ondisk_files(scenario),
'Unexpected result for scenario %s' % scenario)
# construct every possible invalid combination of results
vals = (None, 'a_file')
for ts_file, data_file, meta_file in [
(a, b, c) for a in vals for b in vals for c in vals]:
scenario = {
'ts_file': ts_file,
'data_file': data_file,
'meta_file': meta_file}
if scenario in ok_scenarios:
continue
self.assertFalse(mgr._verify_ondisk_files(scenario),
'Unexpected result for scenario %s' % scenario)
def test_parse_on_disk_filename(self):
mgr = self.df_router[POLICIES.default]
for ts in (Timestamp('1234567890.00001'),
Timestamp('1234567890.00001', offset=17)):
for ext in ('.meta', '.data', '.ts'):
fname = '%s%s' % (ts.internal, ext)
info = mgr.parse_on_disk_filename(fname)
self.assertEqual(ts, info['timestamp'])
self.assertEqual(ext, info['ext'])
def test_parse_on_disk_filename_errors(self):
mgr = self.df_router[POLICIES.default]
with self.assertRaises(DiskFileError) as cm:
mgr.parse_on_disk_filename('junk')
self.assertEqual("Invalid Timestamp value in filename 'junk'",
str(cm.exception))
def test_hash_cleanup_listdir_reclaim(self):
# Each scenario specifies a list of (filename, extension, [survives])
@ -1187,6 +1286,10 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
# data with no durable is ignored
[('0000000007.00000#0.data', False, True)],
# data newer than tombstone with no durable is ignored
[('0000000007.00000#0.data', False, True),
('0000000006.00000.ts', '.ts', True)],
# data newer than durable is ignored
[('0000000008.00000#1.data', False, True),
('0000000007.00000.durable', '.durable'),
@ -1365,7 +1468,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
reclaim_age=1000)
def test_get_ondisk_files_with_stray_meta(self):
# get_ondisk_files does not tolerate a stray .meta file
# get_ondisk_files ignores a stray .meta file
class_under_test = self._get_diskfile(POLICIES.default)
@contextmanager
@ -1408,6 +1511,41 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
self.fail('expected DiskFileNotExist opening %s with %r' % (
class_under_test.__class__.__name__, files))
def test_verify_ondisk_files(self):
# _verify_ondisk_files should only return False if get_ondisk_files
# has produced a bad set of files due to a bug, so to test it we need
# to probe it directly.
mgr = self.df_router[POLICIES.default]
ok_scenarios = (
{'ts_file': None, 'data_file': None, 'meta_file': None,
'durable_frag_set': None},
{'ts_file': None, 'data_file': 'a_file', 'meta_file': None,
'durable_frag_set': ['a_file']},
{'ts_file': None, 'data_file': 'a_file', 'meta_file': 'a_file',
'durable_frag_set': ['a_file']},
{'ts_file': 'a_file', 'data_file': None, 'meta_file': None,
'durable_frag_set': None},
)
for scenario in ok_scenarios:
self.assertTrue(mgr._verify_ondisk_files(scenario),
'Unexpected result for scenario %s' % scenario)
# construct every possible invalid combination of results
vals = (None, 'a_file')
for ts_file, data_file, meta_file, durable_frag in [
(a, b, c, d)
for a in vals for b in vals for c in vals for d in vals]:
scenario = {
'ts_file': ts_file,
'data_file': data_file,
'meta_file': meta_file,
'durable_frag_set': [durable_frag] if durable_frag else None}
if scenario in ok_scenarios:
continue
self.assertFalse(mgr._verify_ondisk_files(scenario),
'Unexpected result for scenario %s' % scenario)
def test_parse_on_disk_filename(self):
mgr = self.df_router[POLICIES.default]
for ts in (Timestamp('1234567890.00001'),
@ -1416,6 +1554,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
fname = '%s#%s.data' % (ts.internal, frag)
info = mgr.parse_on_disk_filename(fname)
self.assertEqual(ts, info['timestamp'])
self.assertEqual('.data', info['ext'])
self.assertEqual(frag, info['frag_index'])
self.assertEqual(mgr.make_on_disk_filename(**info), fname)
@ -1423,6 +1562,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
fname = '%s%s' % (ts.internal, ext)
info = mgr.parse_on_disk_filename(fname)
self.assertEqual(ts, info['timestamp'])
self.assertEqual(ext, info['ext'])
self.assertEqual(None, info['frag_index'])
self.assertEqual(mgr.make_on_disk_filename(**info), fname)
@ -1431,12 +1571,9 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
for ts in (Timestamp('1234567890.00001'),
Timestamp('1234567890.00001', offset=17)):
fname = '%s.data' % ts.internal
try:
with self.assertRaises(DiskFileError) as cm:
mgr.parse_on_disk_filename(fname)
msg = 'Expected DiskFileError for filename %s' % fname
self.fail(msg)
except DiskFileError:
pass
self.assertTrue(str(cm.exception).startswith("Bad fragment index"))
expected = {
'': 'bad',
@ -1451,13 +1588,14 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
for frag, msg in expected.items():
fname = '%s#%s.data' % (ts.internal, frag)
try:
with self.assertRaises(DiskFileError) as cm:
mgr.parse_on_disk_filename(fname)
except DiskFileError as e:
self.assertTrue(msg in str(e).lower())
else:
msg = 'Expected DiskFileError for filename %s' % fname
self.fail(msg)
self.assertTrue(msg in str(cm.exception).lower())
with self.assertRaises(DiskFileError) as cm:
mgr.parse_on_disk_filename('junk')
self.assertEqual("Invalid Timestamp value in filename 'junk'",
str(cm.exception))
def test_make_on_disk_filename(self):
mgr = self.df_router[POLICIES.default]
@ -1524,34 +1662,6 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
actual = mgr.make_on_disk_filename(ts, ext, frag_index=frag)
self.assertEqual(expected, actual)
def test_is_obsolete(self):
mgr = self.df_router[POLICIES.default]
for ts in (Timestamp('1234567890.00001'),
Timestamp('1234567890.00001', offset=17)):
for ts2 in (Timestamp('1234567890.99999'),
Timestamp('1234567890.99999', offset=17),
ts):
f_2 = mgr.make_on_disk_filename(ts, '.durable')
for fi in (0, 2):
for ext in ('.data', '.meta', '.durable', '.ts'):
f_1 = mgr.make_on_disk_filename(
ts2, ext, frag_index=fi)
self.assertFalse(mgr.is_obsolete(f_1, f_2),
'%s should not be obsolete w.r.t. %s'
% (f_1, f_2))
for ts2 in (Timestamp('1234567890.00000'),
Timestamp('1234500000.00000', offset=0),
Timestamp('1234500000.00000', offset=17)):
f_2 = mgr.make_on_disk_filename(ts, '.durable')
for fi in (0, 2):
for ext in ('.data', '.meta', '.durable', '.ts'):
f_1 = mgr.make_on_disk_filename(
ts2, ext, frag_index=fi)
self.assertTrue(mgr.is_obsolete(f_1, f_2),
'%s should not be w.r.t. %s'
% (f_1, f_2))
def test_yield_hashes(self):
old_ts = '1383180000.12345'
fresh_ts = Timestamp(time() - 10).internal
@ -1724,6 +1834,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
# missing frag index
'9444a92d072897b136b3fc06595b7456': [
ts1.internal + '.data'],
# junk
'9555a92d072897b136b3fc06595b8456': [
'junk_file'],
# missing .durable
@ -1733,6 +1844,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
# .meta files w/o .data files can't be opened, and are ignored
'9777a92d072897b136b3fc06595ba456': [
ts1.internal + '.meta'],
# multiple meta files with no data
'9888a92d072897b136b3fc06595bb456': [
ts1.internal + '.meta',
ts2.internal + '.meta'],
@ -2259,12 +2371,13 @@ class DiskFileMixin(BaseDiskFileTestMixin):
def _get_open_disk_file(self, invalid_type=None, obj_name='o', fsize=1024,
csize=8, mark_deleted=False, prealloc=False,
ts=None, mount_check=False, extra_metadata=None,
policy=None, frag_index=None):
policy=None, frag_index=None, data=None,
commit=True):
'''returns a DiskFile'''
policy = policy or POLICIES.legacy
df = self._simple_get_diskfile(obj=obj_name, policy=policy,
frag_index=frag_index)
data = '0' * fsize
data = data or '0' * fsize
etag = md5()
if ts:
timestamp = Timestamp(ts)
@ -2304,7 +2417,8 @@ class DiskFileMixin(BaseDiskFileTestMixin):
elif invalid_type == 'Bad-X-Delete-At':
metadata['X-Delete-At'] = 'bad integer'
diskfile.write_metadata(writer._fd, metadata)
writer.commit(timestamp)
if commit:
writer.commit(timestamp)
if mark_deleted:
df.delete(timestamp)
@ -3181,6 +3295,33 @@ class DiskFileMixin(BaseDiskFileTestMixin):
with self.assertRaises(DiskFileNotOpen):
df.data_timestamp
def test_durable_timestamp(self):
ts_1 = self.ts()
df = self._get_open_disk_file(ts=ts_1.internal)
with df.open():
self.assertEqual(df.durable_timestamp, ts_1.internal)
# verify durable timestamp does not change when metadata is written
ts_2 = self.ts()
df.write_metadata({'X-Timestamp': ts_2.internal})
with df.open():
self.assertEqual(df.durable_timestamp, ts_1.internal)
def test_durable_timestamp_not_open(self):
df = self._simple_get_diskfile()
with self.assertRaises(DiskFileNotOpen):
df.durable_timestamp
def test_durable_timestamp_no_data_file(self):
df = self._get_open_disk_file(self.ts().internal)
for f in os.listdir(df._datadir):
if f.endswith('.data'):
os.unlink(os.path.join(df._datadir, f))
df = self._simple_get_diskfile()
with self.assertRaises(DiskFileNotExist):
df.open()
# open() was attempted, but no data file so expect None
self.assertIsNone(df.durable_timestamp)
def test_error_in_hash_cleanup_listdir(self):
def mock_hcl(*args, **kwargs):
@ -3914,6 +4055,72 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase):
'a', 'c', 'o', policy=policy)
self.assertRaises(DiskFileNotExist, df.read_metadata)
def test_fragments(self):
ts_1 = self.ts()
self._get_open_disk_file(ts=ts_1.internal, frag_index=0)
df = self._get_open_disk_file(ts=ts_1.internal, frag_index=2)
self.assertEqual(df.fragments, {ts_1: [0, 2]})
# now add a newer datafile for frag index 3 but don't write a
# durable with it (so ignore the error when we try to open)
ts_2 = self.ts()
try:
df = self._get_open_disk_file(ts=ts_2.internal, frag_index=3,
commit=False)
except DiskFileNotExist:
pass
# sanity check: should have two ts_1 .data files, one .durable and one
# ts_2 .data file
files = os.listdir(df._datadir)
self.assertEqual(4, len(files))
with df.open():
self.assertEqual(df.fragments, {ts_1: [0, 2], ts_2: [3]})
# verify frags available even if open fails e.g. if .durable missing
for f in filter(lambda f: f.endswith('.durable'), files):
os.remove(os.path.join(df._datadir, f))
self.assertRaises(DiskFileNotExist, df.open)
self.assertEqual(df.fragments, {ts_1: [0, 2], ts_2: [3]})
def test_fragments_not_open(self):
df = self._simple_get_diskfile()
self.assertIsNone(df.fragments)
def test_durable_timestamp_no_durable_file(self):
try:
self._get_open_disk_file(self.ts().internal, commit=False)
except DiskFileNotExist:
pass
df = self._simple_get_diskfile()
with self.assertRaises(DiskFileNotExist):
df.open()
# open() was attempted, but no durable file so expect None
self.assertIsNone(df.durable_timestamp)
def test_durable_timestamp_missing_frag_index(self):
ts1 = self.ts()
self._get_open_disk_file(ts=ts1.internal, frag_index=1)
df = self._simple_get_diskfile(frag_index=2)
with self.assertRaises(DiskFileNotExist):
df.open()
# open() was attempted, but no data file for frag index so expect None
self.assertIsNone(df.durable_timestamp)
def test_durable_timestamp_newer_non_durable_data_file(self):
ts1 = self.ts()
self._get_open_disk_file(ts=ts1.internal)
ts2 = self.ts()
try:
self._get_open_disk_file(ts=ts2.internal, commit=False)
except DiskFileNotExist:
pass
df = self._simple_get_diskfile()
# sanity check - one .durable file, two .data files
self.assertEqual(3, len(os.listdir(df._datadir)))
df.open()
self.assertEqual(ts1, df.durable_timestamp)
@patch_policies(with_ec_default=True)
class TestSuffixHashes(unittest.TestCase):
@ -4493,15 +4700,19 @@ class TestSuffixHashes(unittest.TestCase):
filename += '#%s' % df._frag_index
filename += suff
open(os.path.join(df._datadir, filename), 'w').close()
meta_timestamp = Timestamp(now)
metadata_filename = meta_timestamp.internal + '.meta'
open(os.path.join(df._datadir, metadata_filename), 'w').close()
# call get_hashes and it should clean things up
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
data_filename = timestamp.internal
if policy.policy_type == EC_POLICY:
data_filename += '#%s' % df._frag_index
data_filename += '.data'
metadata_filename = timestamp.internal + '.meta'
durable_filename = timestamp.internal + '.durable'
if policy.policy_type == EC_POLICY:
durable_filename = timestamp.internal + '.durable'
hasher = md5()
hasher.update(metadata_filename)
hasher.update(durable_filename)

View File

@ -1499,15 +1499,16 @@ class TestSender(BaseTest):
'%(body)s\r\n' % expected)
def test_send_post(self):
ts_iter = make_timestamp_iter()
# create .data file
extra_metadata = {'X-Object-Meta-Foo': 'old_value',
'X-Object-Sysmeta-Test': 'test_sysmeta',
'Content-Type': 'test_content_type'}
ts_0 = next(make_timestamp_iter())
ts_0 = next(ts_iter)
df = self._make_open_diskfile(extra_metadata=extra_metadata,
timestamp=ts_0)
# create .meta file
ts_1 = next(make_timestamp_iter())
ts_1 = next(ts_iter)
newer_metadata = {'X-Object-Meta-Foo': 'new_value',
'X-Timestamp': ts_1.internal}
df.write_metadata(newer_metadata)