Merge "Modify _get_hashes() arguments to be more generic"
This commit is contained in:
commit
a22208043f
@ -328,6 +328,14 @@ def invalidate_hash(suffix_dir):
|
|||||||
inv_fh.write(suffix + "\n")
|
inv_fh.write(suffix + "\n")
|
||||||
|
|
||||||
|
|
||||||
|
def get_part_path(dev_path, policy, partition):
|
||||||
|
"""
|
||||||
|
Given the device path, policy, and partition, returns the full
|
||||||
|
path to the partition
|
||||||
|
"""
|
||||||
|
return os.path.join(dev_path, get_data_dir(policy), str(partition))
|
||||||
|
|
||||||
|
|
||||||
class AuditLocation(object):
|
class AuditLocation(object):
|
||||||
"""
|
"""
|
||||||
Represents an object location to be audited.
|
Represents an object location to be audited.
|
||||||
@ -1029,13 +1037,16 @@ class BaseDiskFileManager(object):
|
|||||||
hashes.pop('valid', None)
|
hashes.pop('valid', None)
|
||||||
return hashed, hashes
|
return hashed, hashes
|
||||||
|
|
||||||
def __get_hashes(self, partition_path, recalculate=None, do_listdir=False):
|
def __get_hashes(self, device, partition, policy, recalculate=None,
|
||||||
|
do_listdir=False):
|
||||||
"""
|
"""
|
||||||
Get hashes for each suffix dir in a partition. do_listdir causes it to
|
Get hashes for each suffix dir in a partition. do_listdir causes it to
|
||||||
mistrust the hash cache for suffix existence at the (unexpectedly high)
|
mistrust the hash cache for suffix existence at the (unexpectedly high)
|
||||||
cost of a listdir.
|
cost of a listdir.
|
||||||
|
|
||||||
:param partition_path: absolute path of partition to get hashes for
|
:param device: name of target device
|
||||||
|
:param partition: partition on the device in which the object lives
|
||||||
|
:param policy: the StoragePolicy instance
|
||||||
:param recalculate: list of suffixes which should be recalculated when
|
:param recalculate: list of suffixes which should be recalculated when
|
||||||
got
|
got
|
||||||
:param do_listdir: force existence check for all hashes in the
|
:param do_listdir: force existence check for all hashes in the
|
||||||
@ -1044,6 +1055,8 @@ class BaseDiskFileManager(object):
|
|||||||
:returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
|
:returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
|
||||||
"""
|
"""
|
||||||
hashed = 0
|
hashed = 0
|
||||||
|
dev_path = self.get_dev_path(device)
|
||||||
|
partition_path = get_part_path(dev_path, policy, partition)
|
||||||
hashes_file = join(partition_path, HASH_FILE)
|
hashes_file = join(partition_path, HASH_FILE)
|
||||||
modified = False
|
modified = False
|
||||||
orig_hashes = {'valid': False}
|
orig_hashes = {'valid': False}
|
||||||
@ -1100,7 +1113,9 @@ class BaseDiskFileManager(object):
|
|||||||
if read_hashes(partition_path) == orig_hashes:
|
if read_hashes(partition_path) == orig_hashes:
|
||||||
write_hashes(partition_path, hashes)
|
write_hashes(partition_path, hashes)
|
||||||
return hashed, hashes
|
return hashed, hashes
|
||||||
return self.__get_hashes(partition_path, recalculate, do_listdir)
|
return self.__get_hashes(device, partition, policy,
|
||||||
|
recalculate=recalculate,
|
||||||
|
do_listdir=do_listdir)
|
||||||
else:
|
else:
|
||||||
return hashed, hashes
|
return hashed, hashes
|
||||||
|
|
||||||
@ -1289,12 +1304,11 @@ class BaseDiskFileManager(object):
|
|||||||
dev_path = self.get_dev_path(device)
|
dev_path = self.get_dev_path(device)
|
||||||
if not dev_path:
|
if not dev_path:
|
||||||
raise DiskFileDeviceUnavailable()
|
raise DiskFileDeviceUnavailable()
|
||||||
partition_path = os.path.join(dev_path, get_data_dir(policy),
|
partition_path = get_part_path(dev_path, policy, partition)
|
||||||
partition)
|
|
||||||
if not os.path.exists(partition_path):
|
if not os.path.exists(partition_path):
|
||||||
mkdirs(partition_path)
|
mkdirs(partition_path)
|
||||||
_junk, hashes = tpool_reraise(
|
_junk, hashes = tpool_reraise(
|
||||||
self._get_hashes, partition_path, recalculate=suffixes)
|
self._get_hashes, device, partition, policy, recalculate=suffixes)
|
||||||
return hashes
|
return hashes
|
||||||
|
|
||||||
def _listdir(self, path):
|
def _listdir(self, path):
|
||||||
@ -1322,8 +1336,7 @@ class BaseDiskFileManager(object):
|
|||||||
dev_path = self.get_dev_path(device)
|
dev_path = self.get_dev_path(device)
|
||||||
if not dev_path:
|
if not dev_path:
|
||||||
raise DiskFileDeviceUnavailable()
|
raise DiskFileDeviceUnavailable()
|
||||||
partition_path = os.path.join(dev_path, get_data_dir(policy),
|
partition_path = get_part_path(dev_path, policy, partition)
|
||||||
partition)
|
|
||||||
for suffix in self._listdir(partition_path):
|
for suffix in self._listdir(partition_path):
|
||||||
if len(suffix) != 3:
|
if len(suffix) != 3:
|
||||||
continue
|
continue
|
||||||
@ -1364,9 +1377,7 @@ class BaseDiskFileManager(object):
|
|||||||
if suffixes is None:
|
if suffixes is None:
|
||||||
suffixes = self.yield_suffixes(device, partition, policy)
|
suffixes = self.yield_suffixes(device, partition, policy)
|
||||||
else:
|
else:
|
||||||
partition_path = os.path.join(dev_path,
|
partition_path = get_part_path(dev_path, policy, partition)
|
||||||
get_data_dir(policy),
|
|
||||||
str(partition))
|
|
||||||
suffixes = (
|
suffixes = (
|
||||||
(os.path.join(partition_path, suffix), suffix)
|
(os.path.join(partition_path, suffix), suffix)
|
||||||
for suffix in suffixes)
|
for suffix in suffixes)
|
||||||
|
@ -505,11 +505,12 @@ class ObjectReconstructor(Daemon):
|
|||||||
self.kill_coros()
|
self.kill_coros()
|
||||||
self.last_reconstruction_count = self.reconstruction_count
|
self.last_reconstruction_count = self.reconstruction_count
|
||||||
|
|
||||||
def _get_hashes(self, policy, path, recalculate=None, do_listdir=False):
|
def _get_hashes(self, device, partition, policy, recalculate=None,
|
||||||
|
do_listdir=False):
|
||||||
df_mgr = self._df_router[policy]
|
df_mgr = self._df_router[policy]
|
||||||
hashed, suffix_hashes = tpool_reraise(
|
hashed, suffix_hashes = tpool_reraise(
|
||||||
df_mgr._get_hashes, path, recalculate=recalculate,
|
df_mgr._get_hashes, device, partition, policy,
|
||||||
do_listdir=do_listdir)
|
recalculate=recalculate, do_listdir=do_listdir)
|
||||||
self.logger.update_stats('suffix.hashes', hashed)
|
self.logger.update_stats('suffix.hashes', hashed)
|
||||||
return suffix_hashes
|
return suffix_hashes
|
||||||
|
|
||||||
@ -602,8 +603,9 @@ class ObjectReconstructor(Daemon):
|
|||||||
node['index'])
|
node['index'])
|
||||||
# now recalculate local hashes for suffixes that don't
|
# now recalculate local hashes for suffixes that don't
|
||||||
# match so we're comparing the latest
|
# match so we're comparing the latest
|
||||||
local_suff = self._get_hashes(job['policy'], job['path'],
|
local_suff = self._get_hashes(job['local_dev']['device'],
|
||||||
recalculate=suffixes)
|
job['partition'],
|
||||||
|
job['policy'], recalculate=suffixes)
|
||||||
|
|
||||||
suffixes = self.get_suffix_delta(local_suff,
|
suffixes = self.get_suffix_delta(local_suff,
|
||||||
job['frag_index'],
|
job['frag_index'],
|
||||||
@ -769,7 +771,8 @@ class ObjectReconstructor(Daemon):
|
|||||||
"""
|
"""
|
||||||
# find all the fi's in the part, and which suffixes have them
|
# find all the fi's in the part, and which suffixes have them
|
||||||
try:
|
try:
|
||||||
hashes = self._get_hashes(policy, part_path, do_listdir=True)
|
hashes = self._get_hashes(local_dev['device'], partition, policy,
|
||||||
|
do_listdir=True)
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
if e.errno != errno.ENOTDIR:
|
if e.errno != errno.ENOTDIR:
|
||||||
raise
|
raise
|
||||||
|
@ -408,7 +408,8 @@ class ObjectReplicator(Daemon):
|
|||||||
df_mgr = self._df_router[job['policy']]
|
df_mgr = self._df_router[job['policy']]
|
||||||
try:
|
try:
|
||||||
hashed, local_hash = tpool_reraise(
|
hashed, local_hash = tpool_reraise(
|
||||||
df_mgr._get_hashes, job['path'],
|
df_mgr._get_hashes, job['device'],
|
||||||
|
job['partition'], job['policy'],
|
||||||
do_listdir=_do_listdir(
|
do_listdir=_do_listdir(
|
||||||
int(job['partition']),
|
int(job['partition']),
|
||||||
self.replication_cycle))
|
self.replication_cycle))
|
||||||
@ -462,7 +463,8 @@ class ObjectReplicator(Daemon):
|
|||||||
continue
|
continue
|
||||||
hashed, recalc_hash = tpool_reraise(
|
hashed, recalc_hash = tpool_reraise(
|
||||||
df_mgr._get_hashes,
|
df_mgr._get_hashes,
|
||||||
job['path'], recalculate=suffixes)
|
job['device'], job['partition'], job['policy'],
|
||||||
|
recalculate=suffixes)
|
||||||
self.logger.update_stats('suffix.hashes', hashed)
|
self.logger.update_stats('suffix.hashes', hashed)
|
||||||
local_hash = recalc_hash
|
local_hash = recalc_hash
|
||||||
suffixes = [suffix for suffix in local_hash if
|
suffixes = [suffix for suffix in local_hash if
|
||||||
|
@ -275,6 +275,17 @@ class TestDiskFileModuleMethods(unittest.TestCase):
|
|||||||
# check tempdir
|
# check tempdir
|
||||||
self.assertTrue(os.path.isdir(tmp_path))
|
self.assertTrue(os.path.isdir(tmp_path))
|
||||||
|
|
||||||
|
def test_get_part_path(self):
|
||||||
|
# partition passed as 'str'
|
||||||
|
part_dir = diskfile.get_part_path('/srv/node/sda1', POLICIES[0], '123')
|
||||||
|
exp_dir = '/srv/node/sda1/objects/123'
|
||||||
|
self.assertEqual(part_dir, exp_dir)
|
||||||
|
|
||||||
|
# partition passed as 'int'
|
||||||
|
part_dir = diskfile.get_part_path('/srv/node/sdb5', POLICIES[1], 123)
|
||||||
|
exp_dir = '/srv/node/sdb5/objects-1/123'
|
||||||
|
self.assertEqual(part_dir, exp_dir)
|
||||||
|
|
||||||
|
|
||||||
@patch_policies
|
@patch_policies
|
||||||
class TestObjectAuditLocationGenerator(unittest.TestCase):
|
class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||||
|
@ -1793,7 +1793,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
|||||||
diskfile.HASH_FILE)
|
diskfile.HASH_FILE)
|
||||||
self.assertTrue(os.path.exists(hashes_file))
|
self.assertTrue(os.path.exists(hashes_file))
|
||||||
suffixes = self.reconstructor._get_hashes(
|
suffixes = self.reconstructor._get_hashes(
|
||||||
self.policy, part_path, do_listdir=True)
|
self.local_dev['device'], 0, self.policy, do_listdir=True)
|
||||||
self.assertEqual(suffixes, {})
|
self.assertEqual(suffixes, {})
|
||||||
|
|
||||||
def test_build_jobs_no_hashes(self):
|
def test_build_jobs_no_hashes(self):
|
||||||
|
@ -1627,8 +1627,11 @@ class TestObjectReplicator(unittest.TestCase):
|
|||||||
# if a timeout occurs while replicating one partition to one node.
|
# if a timeout occurs while replicating one partition to one node.
|
||||||
timeouts = [Timeout()]
|
timeouts = [Timeout()]
|
||||||
|
|
||||||
def fake_get_hashes(df_mgr, part_path, **kwargs):
|
def fake_get_hashes(df_mgr, device, partition, policy, **kwargs):
|
||||||
self.get_hash_count += 1
|
self.get_hash_count += 1
|
||||||
|
dev_path = df_mgr.get_dev_path(device)
|
||||||
|
part_path = os.path.join(dev_path, diskfile.get_data_dir(policy),
|
||||||
|
str(partition))
|
||||||
# Simulate a REPLICATE timeout by raising Timeout for second call
|
# Simulate a REPLICATE timeout by raising Timeout for second call
|
||||||
# to get_hashes (with recalculate suffixes) for a specific
|
# to get_hashes (with recalculate suffixes) for a specific
|
||||||
# partition
|
# partition
|
||||||
@ -1750,7 +1753,7 @@ class TestObjectReplicator(unittest.TestCase):
|
|||||||
mock_do_listdir.side_effect = do_listdir_results
|
mock_do_listdir.side_effect = do_listdir_results
|
||||||
expected_tpool_calls = [
|
expected_tpool_calls = [
|
||||||
mock.call(self.replicator._df_router[job['policy']]._get_hashes,
|
mock.call(self.replicator._df_router[job['policy']]._get_hashes,
|
||||||
job['path'],
|
job['device'], job['partition'], job['policy'],
|
||||||
do_listdir=do_listdir)
|
do_listdir=do_listdir)
|
||||||
for job, do_listdir in zip(jobs, do_listdir_results)
|
for job, do_listdir in zip(jobs, do_listdir_results)
|
||||||
]
|
]
|
||||||
|
Loading…
Reference in New Issue
Block a user