diff --git a/bin/swift-object-relinker b/bin/swift-object-relinker
index 0712ea7696..7afd7b8735 100755
--- a/bin/swift-object-relinker
+++ b/bin/swift-object-relinker
@@ -28,6 +28,8 @@ if __name__ == '__main__':
                         dest='swift_dir', help='Path to swift directory')
     parser.add_argument('--devices', default='/srv/node',
                         dest='devices', help='Path to swift device directory')
+    parser.add_argument('--device', default=None, dest='device',
+                        help='Device name to relink (default: all)')
     parser.add_argument('--skip-mount-check', default=False,
                         help='Don\'t test if disk is mounted',
                         action="store_true", dest='skip_mount_check')
diff --git a/swift/cli/relinker.py b/swift/cli/relinker.py
index b7b4aaf731..630c0e98ee 100644
--- a/swift/cli/relinker.py
+++ b/swift/cli/relinker.py
@@ -14,8 +14,12 @@
 # limitations under the License.
 
 
+import errno
+import fcntl
+import json
 import logging
 import os
+from functools import partial
 from swift.common.storage_policy import POLICIES
 from swift.common.exceptions import DiskFileDeleted, DiskFileNotExist, \
     DiskFileQuarantined
@@ -24,10 +28,126 @@ from swift.common.utils import replace_partition_in_path, \
 from swift.obj import diskfile
 
 
+LOCK_FILE = '.relink.{datadir}.lock'
+STATE_FILE = 'relink.{datadir}.json'
+STATE_TMP_FILE = '.relink.{datadir}.json.tmp'
+STEP_RELINK = 'relink'
+STEP_CLEANUP = 'cleanup'
+
+
+def devices_filter(device, _, devices):
+    if device:
+        devices = [d for d in devices if d == device]
+
+    return set(devices)
+
+
+def hook_pre_device(locks, states, datadir, device_path):
+    lock_file = os.path.join(device_path, LOCK_FILE.format(datadir=datadir))
+
+    fd = os.open(lock_file, os.O_CREAT | os.O_WRONLY)
+    fcntl.flock(fd, fcntl.LOCK_EX)
+    locks[0] = fd
+
+    state_file = os.path.join(device_path, STATE_FILE.format(datadir=datadir))
+    states.clear()
+    try:
+        with open(state_file, 'rt') as f:
+            tmp = json.load(f)
+            states.update(tmp)
+    except ValueError:
+        # Invalid JSON: remove the file to restart from scratch
+        os.unlink(state_file)
+    except IOError as err:
+        # Ignore file-not-found errors
+        if err.errno != errno.ENOENT:
+            raise
+
+
+def hook_post_device(locks, _):
+    os.close(locks[0])
+    locks[0] = None
+
+
+def partitions_filter(states, step, part_power, next_part_power,
+                      datadir_path, partitions):
+    # Remove all non-partitions first (e.g. auditor_status_ALL.json)
+    partitions = [p for p in partitions if p.isdigit()]
+
+    if not (step == STEP_CLEANUP and part_power == next_part_power):
+        # This is not a cleanup after a cancel: partitions in the upper half
+        # are new partitions, so there is nothing to relink/clean up there
+        partitions = [p for p in partitions
+                      if int(p) < 2 ** next_part_power / 2]
+
+    # Format: { 'part': [relinked, cleaned] }
+    if states:
+        missing = list(set(partitions) - set(states.keys()))
+        if missing:
+            # All missing partitions were created after the first run of the
+            # relinker, i.e. after the new ring was distributed, so they are
+            # already hardlinked in both partitions; they only need to be
+            # cleaned up. Just update the state file.
+            for part in missing:
+                states[part] = [True, False]
+        if step == STEP_RELINK:
+            partitions = [str(p) for p, (r, c) in states.items() if not r]
+        elif step == STEP_CLEANUP:
+            partitions = [str(p) for p, (r, c) in states.items() if not c]
+    else:
+        states.update({str(p): [False, False] for p in partitions})
+
+    # Always scan the partitions in reverse order to minimize the amount of IO
+    # (it actually only matters for relink, not for cleanup).
+    #
+    # Initial situation:
+    #  objects/0/000/00000000000000000000000000000000/12345.data
+    #  -> relinked to objects/1/000/10000000000000000000000000000000/12345.data
+    #
+    # If the relinker then scans partition 1, it will needlessly listdir
+    # that object. Scanning partitions in reverse order avoids this.
+    partitions = sorted(partitions, key=lambda x: int(x), reverse=True)
+
+    return partitions
+
+
+# Save states when a partition is done
+def hook_post_partition(states, step,
+                        partition_path):
+    part = os.path.basename(os.path.abspath(partition_path))
+    datadir_path = os.path.dirname(os.path.abspath(partition_path))
+    device_path = os.path.dirname(os.path.abspath(datadir_path))
+    datadir_name = os.path.basename(os.path.abspath(datadir_path))
+    state_tmp_file = os.path.join(device_path,
+                                  STATE_TMP_FILE.format(datadir=datadir_name))
+    state_file = os.path.join(device_path,
+                              STATE_FILE.format(datadir=datadir_name))
+
+    if step == STEP_RELINK:
+        states[part][0] = True
+    elif step == STEP_CLEANUP:
+        states[part][1] = True
+    with open(state_tmp_file, 'wt') as f:
+        json.dump(states, f)
+        os.fsync(f.fileno())
+    os.rename(state_tmp_file, state_file)
+
+
+def hashes_filter(next_part_power, suff_path, hashes):
+    hashes = list(hashes)
+    # Iterate over a copy: removing items from the list being iterated
+    # over would skip elements
+    for hsh in list(hashes):
+        fname = os.path.join(suff_path, hsh, 'fake-file-name')
+        if replace_partition_in_path(fname, next_part_power) == fname:
+            hashes.remove(hsh)
+    return hashes
+
+
 def relink(swift_dir='/etc/swift',
            devices='/srv/node',
            skip_mount_check=False,
-           logger=logging.getLogger()):
+           logger=logging.getLogger(),
+           device=None):
     mount_check = not skip_mount_check
     run = False
     relinked = errors = 0
@@ -41,10 +161,31 @@ def relink(swift_dir='/etc/swift',
         logging.info('Relinking files for policy %s under %s',
                      policy.name, devices)
         run = True
+        datadir = diskfile.get_data_dir(policy)
+
+        locks = [None]
+        states = {}
+        relink_devices_filter = partial(devices_filter, device)
+        relink_hook_pre_device = partial(hook_pre_device, locks, states,
+                                         datadir)
+        relink_hook_post_device = partial(hook_post_device, locks)
+        relink_partition_filter = partial(partitions_filter,
+                                          states, STEP_RELINK,
+                                          part_power, next_part_power)
+        relink_hook_post_partition = partial(hook_post_partition,
+                                             states, STEP_RELINK)
+        relink_hashes_filter = partial(hashes_filter, next_part_power)
+
         locations = audit_location_generator(
             devices,
-            diskfile.get_data_dir(policy),
-            mount_check=mount_check)
+            datadir,
+            mount_check=mount_check,
+            devices_filter=relink_devices_filter,
+            hook_pre_device=relink_hook_pre_device,
+            hook_post_device=relink_hook_post_device,
+            partitions_filter=relink_partition_filter,
+            hook_post_partition=relink_hook_post_partition,
+            hashes_filter=relink_hashes_filter)
         for fname, _, _ in locations:
             newfname = replace_partition_in_path(fname, next_part_power)
             try:
@@ -67,7 +208,8 @@
 def cleanup(swift_dir='/etc/swift',
             devices='/srv/node',
             skip_mount_check=False,
-            logger=logging.getLogger()):
+            logger=logging.getLogger(),
+            device=None):
     mount_check = not skip_mount_check
     conf = {'devices': devices, 'mount_check': mount_check}
     diskfile_router = diskfile.DiskFileRouter(conf, get_logger(conf))
@@ -83,10 +225,31 @@
         logging.info('Cleaning up files for policy %s under %s',
                      policy.name, devices)
         run = True
+        datadir = diskfile.get_data_dir(policy)
+
+        locks = [None]
+        states = {}
+        cleanup_devices_filter = partial(devices_filter, device)
+        cleanup_hook_pre_device = partial(hook_pre_device, locks, states,
+                                          datadir)
+        cleanup_hook_post_device = partial(hook_post_device, locks)
+        cleanup_partition_filter = partial(partitions_filter,
+                                           states, STEP_CLEANUP,
+                                           part_power, next_part_power)
+        cleanup_hook_post_partition = partial(hook_post_partition,
+                                              states, STEP_CLEANUP)
+        cleanup_hashes_filter = partial(hashes_filter, next_part_power)
+
         locations = audit_location_generator(
             devices,
-            diskfile.get_data_dir(policy),
-            mount_check=mount_check)
+            datadir,
+            mount_check=mount_check,
+            devices_filter=cleanup_devices_filter,
+            hook_pre_device=cleanup_hook_pre_device,
+            hook_post_device=cleanup_hook_post_device,
+            partitions_filter=cleanup_partition_filter,
+            hook_post_partition=cleanup_hook_post_partition,
+            hashes_filter=cleanup_hashes_filter)
         for fname, device, partition in locations:
             expected_fname = replace_partition_in_path(fname, part_power)
             if fname == expected_fname:
@@ -152,8 +315,10 @@ def main(args):
 
     if args.action == 'relink':
         return relink(
-            args.swift_dir, args.devices, args.skip_mount_check, logger)
+            args.swift_dir, args.devices, args.skip_mount_check, logger,
+            device=args.device)
 
     if args.action == 'cleanup':
         return cleanup(
-            args.swift_dir, args.devices, args.skip_mount_check, logger)
+            args.swift_dir, args.devices, args.skip_mount_check, logger,
+            device=args.device)
diff --git a/swift/common/utils.py b/swift/common/utils.py
index df8713d3a8..3014bc6176 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -3152,11 +3152,26 @@ def remove_directory(path):
 
 
 def audit_location_generator(devices, datadir, suffix='',
-                             mount_check=True, logger=None):
+                             mount_check=True, logger=None,
+                             devices_filter=None, partitions_filter=None,
+                             suffixes_filter=None, hashes_filter=None,
+                             hook_pre_device=None, hook_post_device=None,
+                             hook_pre_partition=None, hook_post_partition=None,
+                             hook_pre_suffix=None, hook_post_suffix=None,
+                             hook_pre_hash=None, hook_post_hash=None):
     """
     Given a devices path and a data directory, yield (path, device,
     partition) for all files in that directory
 
+    (devices|partitions|suffixes|hashes)_filter are meant to modify the list
+    of elements that will be iterated, e.g. they can be used to exclude
+    some elements based on a custom condition defined by the caller.
+
+    hook_pre_(device|partition|suffix|hash) are called before yielding the
+    element; hook_post_(device|partition|suffix|hash) are called after the
+    element was yielded. They are meant to do some pre/post processing,
+    e.g. saving a progress status.
+
     :param devices: parent directory of the devices to be audited
     :param datadir: a directory located under self.devices.
         This should be one of the DATADIR constants defined in the account,
@@ -3165,11 +3180,31 @@
     :param mount_check: Flag to check if a mount check should be performed
         on devices
     :param logger: a logger object
+    :param devices_filter: a callable taking (devices, [list of devices])
+        as parameters and returning a [list of devices]
+    :param partitions_filter: a callable taking (datadir_path,
+        [list of parts]) as parameters and returning a [list of parts]
+    :param suffixes_filter: a callable taking (part_path,
+        [list of suffixes]) as parameters and returning a [list of suffixes]
+    :param hashes_filter: a callable taking (suff_path, [list of hashes])
+        as parameters and returning a [list of hashes]
+    :param hook_pre_device: a callable taking device_path as parameter
+    :param hook_post_device: a callable taking device_path as parameter
+    :param hook_pre_partition: a callable taking part_path as parameter
+    :param hook_post_partition: a callable taking part_path as parameter
+    :param hook_pre_suffix: a callable taking suff_path as parameter
+    :param hook_post_suffix: a callable taking suff_path as parameter
+    :param hook_pre_hash: a callable taking hash_path as parameter
+    :param hook_post_hash: a callable taking hash_path as parameter
     """
     device_dir = listdir(devices)
     # randomize devices in case of process restart before sweep completed
     shuffle(device_dir)
+    if devices_filter:
+        device_dir = devices_filter(devices, device_dir)
     for device in device_dir:
+        if hook_pre_device:
+            hook_pre_device(os.path.join(devices, device))
         if mount_check and not ismount(os.path.join(devices, device)):
             if logger:
                 logger.warning(
@@ -3183,24 +3218,36 @@
                     logger.warning(_('Skipping %(datadir)s because %(err)s'),
                                    {'datadir': datadir_path, 'err': e})
             continue
+        if partitions_filter:
+            partitions = partitions_filter(datadir_path, partitions)
         for partition in partitions:
             part_path = os.path.join(datadir_path, partition)
+            if hook_pre_partition:
+                hook_pre_partition(part_path)
             try:
                 suffixes = listdir(part_path)
             except OSError as e:
                 if e.errno != errno.ENOTDIR:
                     raise
                 continue
+            if suffixes_filter:
+                suffixes = suffixes_filter(part_path, suffixes)
             for asuffix in suffixes:
                 suff_path = os.path.join(part_path, asuffix)
+                if hook_pre_suffix:
+                    hook_pre_suffix(suff_path)
                 try:
                     hashes = listdir(suff_path)
                 except OSError as e:
                     if e.errno != errno.ENOTDIR:
                         raise
                     continue
+                if hashes_filter:
+                    hashes = hashes_filter(suff_path, hashes)
                 for hsh in hashes:
                     hash_path = os.path.join(suff_path, hsh)
+                    if hook_pre_hash:
+                        hook_pre_hash(hash_path)
                     try:
                         files = sorted(listdir(hash_path), reverse=True)
                     except OSError as e:
@@ -3212,6 +3259,14 @@
                             continue
                         path = os.path.join(hash_path, fname)
                         yield path, device, partition
+                    if hook_post_hash:
+                        hook_post_hash(hash_path)
+                if hook_post_suffix:
+                    hook_post_suffix(suff_path)
+            if hook_post_partition:
+                hook_post_partition(part_path)
+        if hook_post_device:
+            hook_post_device(os.path.join(devices, device))
 
 
 def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5):
diff --git a/test/unit/cli/test_relinker.py b/test/unit/cli/test_relinker.py
index 8daddb13ff..571f1c2d7f 100644
--- a/test/unit/cli/test_relinker.py
+++ b/test/unit/cli/test_relinker.py
@@ -12,6 +12,9 @@
 # limitations under the License.
 
 import binascii
+import errno
+import fcntl
+import json
 import os
 import shutil
 import struct
@@ -30,6 +33,9 @@ from test.unit import FakeLogger, skip_if_no_xattrs, DEFAULT_TEST_EC_TYPE, \
     patch_policies
 
 
+PART_POWER = 8
+
+
 class TestRelinker(unittest.TestCase):
     def setUp(self):
         skip_if_no_xattrs()
@@ -40,7 +46,7 @@ class TestRelinker(unittest.TestCase):
         os.mkdir(self.testdir)
         os.mkdir(self.devices)
 
-        self.rb = ring.RingBuilder(8, 6.0, 1)
+        self.rb = ring.RingBuilder(PART_POWER, 6.0, 1)
 
         for i in range(6):
             ip = "127.0.0.%s" % i
@@ -55,10 +61,10 @@ class TestRelinker(unittest.TestCase):
         os.mkdir(self.objects)
         self._hash = utils.hash_path('a/c/o')
         digest = binascii.unhexlify(self._hash)
-        part = struct.unpack_from('>I', digest)[0] >> 24
+        self.part = struct.unpack_from('>I', digest)[0] >> 24
         self.next_part = struct.unpack_from('>I', digest)[0] >> 23
         self.objdir = os.path.join(
-            self.objects, str(part), self._hash[-3:], self._hash)
+            self.objects, str(self.part), self._hash[-3:], self._hash)
         os.makedirs(self.objdir)
         self.object_fname = "1278553064.00000.data"
         self.objname = os.path.join(self.objdir, self.object_fname)
@@ -97,6 +103,27 @@ class TestRelinker(unittest.TestCase):
         stat_new = os.stat(self.expected_file)
         self.assertEqual(stat_old.st_ino, stat_new.st_ino)
 
+    def test_relink_device_filter(self):
+        self.rb.prepare_increase_partition_power()
+        self._save_ring()
+        relinker.relink(self.testdir, self.devices, True,
+                        device=self.existing_device)
+
+        self.assertTrue(os.path.isdir(self.expected_dir))
+        self.assertTrue(os.path.isfile(self.expected_file))
+
+        stat_old = os.stat(os.path.join(self.objdir, self.object_fname))
+        stat_new = os.stat(self.expected_file)
+        self.assertEqual(stat_old.st_ino, stat_new.st_ino)
+
+    def test_relink_device_filter_invalid(self):
+        self.rb.prepare_increase_partition_power()
+        self._save_ring()
+        relinker.relink(self.testdir, self.devices, True, device='none')
+
+        self.assertFalse(os.path.isdir(self.expected_dir))
+        self.assertFalse(os.path.isfile(self.expected_file))
+
     def _common_test_cleanup(self, relink=True):
         # Create a ring that has prev_part_power set
         self.rb.prepare_increase_partition_power()
@@ -121,6 +148,187 @@ class TestRelinker(unittest.TestCase):
         self.assertFalse(os.path.isfile(
             os.path.join(self.objdir, self.object_fname)))
 
+    def test_cleanup_device_filter(self):
+        self._common_test_cleanup()
+        self.assertEqual(0, relinker.cleanup(self.testdir, self.devices, True,
+                                             device=self.existing_device))
+
+        # Old objectname should be removed, new should still exist
+        self.assertTrue(os.path.isdir(self.expected_dir))
+        self.assertTrue(os.path.isfile(self.expected_file))
+        self.assertFalse(os.path.isfile(
+            os.path.join(self.objdir, self.object_fname)))
+
+    def test_cleanup_device_filter_invalid(self):
+        self._common_test_cleanup()
+        self.assertEqual(0, relinker.cleanup(self.testdir, self.devices, True,
+                                             device='none'))
+
+        # Old objectname should still exist, new should still exist
+        self.assertTrue(os.path.isdir(self.expected_dir))
+        self.assertTrue(os.path.isfile(self.expected_file))
+        self.assertTrue(os.path.isfile(
+            os.path.join(self.objdir, self.object_fname)))
+
+    def test_relink_cleanup(self):
+        state_file = os.path.join(self.devices, self.existing_device,
+                                  'relink.objects.json')
+
+        self.rb.prepare_increase_partition_power()
+        self._save_ring()
+        relinker.relink(self.testdir, self.devices, True)
+        with open(state_file, 'rt') as f:
+            self.assertEqual(json.load(f), {str(self.part): [True, False]})
+
+        self.rb.increase_partition_power()
+        self.rb._ring = None  # Force builder to reload ring
+        self._save_ring()
+        relinker.cleanup(self.testdir, self.devices, True)
+        with open(state_file, 'rt') as f:
+            self.assertEqual(json.load(f),
+                             {str(self.part): [True, True],
+                              str(self.next_part): [True, True]})
+
+    def test_devices_filter_filtering(self):
+        # With no filtering, returns all devices
+        devices = relinker.devices_filter(None, "", [self.existing_device])
+        self.assertEqual(set([self.existing_device]), devices)
+
+        # With a matching filter, returns what is matching
+        devices = relinker.devices_filter(self.existing_device, "",
+                                          [self.existing_device, 'sda2'])
+        self.assertEqual(set([self.existing_device]), devices)
+
+        # With a non-matching filter, returns nothing
+        devices = relinker.devices_filter('none', "", [self.existing_device])
+        self.assertEqual(set(), devices)
+
+    def test_hook_pre_post_device_locking(self):
+        locks = [None]
+        device_path = os.path.join(self.devices, self.existing_device)
+        datadir = 'object'
+        lock_file = os.path.join(device_path, '.relink.%s.lock' % datadir)
+
+        # The first run gets the lock
+        relinker.hook_pre_device(locks, {}, datadir, device_path)
+        self.assertNotEqual([None], locks)
+
+        # A following run would block
+        with self.assertRaises(IOError) as raised:
+            with open(lock_file, 'a') as f:
+                fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+        self.assertEqual(errno.EAGAIN, raised.exception.errno)
+
+        # hook_post_device releases the lock and resets the locks list
+        relinker.hook_post_device(locks, "")
+        self.assertEqual([None], locks)
+
+        with open(lock_file, 'a') as f:
+            fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+
+    def test_state_file(self):
+        device_path = os.path.join(self.devices, self.existing_device)
+        datadir = 'objects'
+        datadir_path = os.path.join(device_path, datadir)
+        state_file = os.path.join(device_path, 'relink.%s.json' % datadir)
+
+        def call_partition_filter(step, parts):
+            # Partition 312 will be ignored because it must have been created
+            # by the relinker
+            return relinker.partitions_filter(states, step,
+                                              PART_POWER, PART_POWER + 1,
+                                              datadir_path, parts)
+
+        # Start relinking
+        states = {}
+
+        # Load the states: as it starts, it must be empty
+        locks = [None]
+        relinker.hook_pre_device(locks, states, datadir, device_path)
+        self.assertEqual({}, states)
+        os.close(locks[0])  # Release the lock
+
+        # Partition 312 is ignored because it must have been created with the
+        # next_part_power, so it does not need to be relinked
+        # 96 and 227 are in reverse order
+        # auditor_status_ALL.json is ignored because it's not a partition
+        self.assertEqual(['227', '96'],
+                         call_partition_filter(relinker.STEP_RELINK,
+                                               ['96', '227', '312',
+                                                'auditor_status.json']))
+        self.assertEqual(states, {'96': [False, False], '227': [False, False]})
+
+        # Ack partition 96
+        relinker.hook_post_partition(states, relinker.STEP_RELINK,
+                                     os.path.join(datadir_path, '96'))
+        self.assertEqual(states, {'96': [True, False], '227': [False, False]})
+        with open(state_file, 'rt') as f:
+            self.assertEqual(json.load(f), {'96': [True, False],
+                                            '227': [False, False]})
+
+        # Restart relinking after only part 96 was done
+        self.assertEqual(['227'],
+                         call_partition_filter(relinker.STEP_RELINK,
+                                               ['96', '227', '312']))
+        self.assertEqual(states, {'96': [True, False], '227': [False, False]})
+
+        # Ack partition 227
+        relinker.hook_post_partition(states, relinker.STEP_RELINK,
+                                     os.path.join(datadir_path, '227'))
+        self.assertEqual(states, {'96': [True, False], '227': [True, False]})
+        with open(state_file, 'rt') as f:
+            self.assertEqual(json.load(f), {'96': [True, False],
+                                            '227': [True, False]})
+
+        # If the process restarts, it reloads the state
+        locks = [None]
+        states = {}
+        relinker.hook_pre_device(locks, states, datadir, device_path)
+        self.assertEqual(states, {'96': [True, False], '227': [True, False]})
+        os.close(locks[0])  # Release the lock
+
+        # Start cleanup
+        self.assertEqual(['227', '96'],
+                         call_partition_filter(relinker.STEP_CLEANUP,
+                                               ['96', '227', '312']))
+        # Ack partition 227
+        relinker.hook_post_partition(states, relinker.STEP_CLEANUP,
+                                     os.path.join(datadir_path, '227'))
+        self.assertEqual(states, {'96': [True, False], '227': [True, True]})
+        with open(state_file, 'rt') as f:
+            self.assertEqual(json.load(f), {'96': [True, False],
+                                            '227': [True, True]})
+
+        # Restart cleanup after only part 227 was done
+        self.assertEqual(['96'],
+                         call_partition_filter(relinker.STEP_CLEANUP,
+                                               ['96', '227', '312']))
+        self.assertEqual(states, {'96': [True, False], '227': [True, True]})
+
+        # Ack partition 96
+        relinker.hook_post_partition(states, relinker.STEP_CLEANUP,
+                                     os.path.join(datadir_path, '96'))
+        self.assertEqual(states, {'96': [True, True], '227': [True, True]})
+        with open(state_file, 'rt') as f:
+            self.assertEqual(json.load(f), {'96': [True, True],
+                                            '227': [True, True]})
+
+        # At the end, the state is still accurate
+        locks = [None]
+        states = {}
+        relinker.hook_pre_device(locks, states, datadir, device_path)
+        self.assertEqual(states, {'96': [True, True], '227': [True, True]})
+        os.close(locks[0])  # Release the lock
+
+        # If the file gets corrupted, restart from scratch
+        with open(state_file, 'wt') as f:
+            f.write('NOT JSON')
+        locks = [None]
+        states = {}
+        relinker.hook_pre_device(locks, states, datadir, device_path)
+        self.assertEqual(states, {})
+        os.close(locks[0])  # Release the lock
+
     def test_cleanup_not_yet_relinked(self):
         self._common_test_cleanup(relink=False)
         self.assertEqual(1, relinker.cleanup(self.testdir, self.devices, True))
@@ -176,3 +384,7 @@ class TestRelinker(unittest.TestCase):
 
         self.assertIn('failed audit and was quarantined',
                       self.logger.get_lines_for_level('warning')[0])
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index b91be13299..fa5a057fc9 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -6080,6 +6080,136 @@ class TestAuditLocationGenerator(unittest.TestCase):
         self.assertEqual(list(locations),
                          [(obj_path, "drive", "partition2")])
 
+    def test_hooks(self):
+        with temptree([]) as tmpdir:
+            logger = FakeLogger()
+            data = os.path.join(tmpdir, "drive", "data")
+            os.makedirs(data)
+            partition = os.path.join(data, "partition1")
+            os.makedirs(partition)
+            suffix = os.path.join(partition, "suffix1")
+            os.makedirs(suffix)
+            hash_path = os.path.join(suffix, "hash1")
+            os.makedirs(hash_path)
+            obj_path = os.path.join(hash_path, "obj1.dat")
+            with open(obj_path, "w"):
+                pass
+            meta_path = os.path.join(hash_path, "obj1.meta")
+            with open(meta_path, "w"):
+                pass
+            hook_pre_device = MagicMock()
+            hook_post_device = MagicMock()
+            hook_pre_partition = MagicMock()
+            hook_post_partition = MagicMock()
+            hook_pre_suffix = MagicMock()
+            hook_post_suffix = MagicMock()
+            hook_pre_hash = MagicMock()
+            hook_post_hash = MagicMock()
+            locations = utils.audit_location_generator(
+                tmpdir, "data", ".dat", mount_check=False, logger=logger,
+                hook_pre_device=hook_pre_device,
+                hook_post_device=hook_post_device,
+                hook_pre_partition=hook_pre_partition,
+                hook_post_partition=hook_post_partition,
+                hook_pre_suffix=hook_pre_suffix,
+                hook_post_suffix=hook_post_suffix,
+                hook_pre_hash=hook_pre_hash,
+                hook_post_hash=hook_post_hash
+            )
+            list(locations)
+            hook_pre_device.assert_called_once_with(os.path.join(tmpdir,
+                                                                 "drive"))
+            hook_post_device.assert_called_once_with(os.path.join(tmpdir,
+                                                                  "drive"))
+            hook_pre_partition.assert_called_once_with(partition)
+            hook_post_partition.assert_called_once_with(partition)
+            hook_pre_suffix.assert_called_once_with(suffix)
+            hook_post_suffix.assert_called_once_with(suffix)
+            hook_pre_hash.assert_called_once_with(hash_path)
+            hook_post_hash.assert_called_once_with(hash_path)
+
+    def test_filters(self):
+        with temptree([]) as tmpdir:
+            logger = FakeLogger()
+            data = os.path.join(tmpdir, "drive", "data")
+            os.makedirs(data)
+            partition = os.path.join(data, "partition1")
+            os.makedirs(partition)
+            suffix = os.path.join(partition, "suffix1")
+            os.makedirs(suffix)
+            hash_path = os.path.join(suffix, "hash1")
+            os.makedirs(hash_path)
+            obj_path = os.path.join(hash_path, "obj1.dat")
+            with open(obj_path, "w"):
+                pass
+            meta_path = os.path.join(hash_path, "obj1.meta")
+            with open(meta_path, "w"):
+                pass
+
+            def audit_location_generator(**kwargs):
+                return utils.audit_location_generator(
+                    tmpdir, "data", ".dat", mount_check=False, logger=logger,
+                    **kwargs)
+
+            # Patch os.listdir: return the real listing, but record the calls
+
+            with patch('os.listdir', side_effect=os.listdir) as m_listdir:
+                # devices_filter
+                m_listdir.reset_mock()
+                devices_filter = MagicMock(return_value=["drive"])
+                list(audit_location_generator(devices_filter=devices_filter))
+                devices_filter.assert_called_once_with(tmpdir, ["drive"])
+                self.assertIn(((data,),), m_listdir.call_args_list)
+
+                m_listdir.reset_mock()
+                devices_filter = MagicMock(return_value=[])
+                list(audit_location_generator(devices_filter=devices_filter))
+                devices_filter.assert_called_once_with(tmpdir, ["drive"])
+                self.assertNotIn(((data,),), m_listdir.call_args_list)
+
+                # partitions_filter
+                m_listdir.reset_mock()
+                partitions_filter = MagicMock(return_value=["partition1"])
+                list(audit_location_generator(
+                    partitions_filter=partitions_filter))
+                partitions_filter.assert_called_once_with(data,
+                                                          ["partition1"])
+                self.assertIn(((partition,),), m_listdir.call_args_list)
+
+                m_listdir.reset_mock()
+                partitions_filter = MagicMock(return_value=[])
+                list(audit_location_generator(
+                    partitions_filter=partitions_filter))
+                partitions_filter.assert_called_once_with(data,
+                                                          ["partition1"])
+                self.assertNotIn(((partition,),), m_listdir.call_args_list)
+
+                # suffixes_filter
+                m_listdir.reset_mock()
+                suffixes_filter = MagicMock(return_value=["suffix1"])
+                list(audit_location_generator(suffixes_filter=suffixes_filter))
+                suffixes_filter.assert_called_once_with(partition, ["suffix1"])
+                self.assertIn(((suffix,),), m_listdir.call_args_list)
+
+                m_listdir.reset_mock()
+                suffixes_filter = MagicMock(return_value=[])
+                list(audit_location_generator(suffixes_filter=suffixes_filter))
+                suffixes_filter.assert_called_once_with(partition, ["suffix1"])
+                self.assertNotIn(((suffix,),), m_listdir.call_args_list)
+
+                # hashes_filter
+                m_listdir.reset_mock()
+                hashes_filter = MagicMock(return_value=["hash1"])
+                list(audit_location_generator(hashes_filter=hashes_filter))
+                hashes_filter.assert_called_once_with(suffix, ["hash1"])
+                self.assertIn(((hash_path,),), m_listdir.call_args_list)
+
+                m_listdir.reset_mock()
+                hashes_filter = MagicMock(return_value=[])
+                list(audit_location_generator(hashes_filter=hashes_filter))
+                hashes_filter.assert_called_once_with(suffix, ["hash1"])
+                self.assertNotIn(((hash_path,),), m_listdir.call_args_list)
+
 
 class TestGreenAsyncPile(unittest.TestCase):
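
Editor's note: the sketch below is illustrative and not part of the patch. The
filters and hooks added to audit_location_generator are plain callables, so a
caller can bind per-run state with functools.partial, exactly as the relinker
code above does. The my_* names are hypothetical.

    import os
    from functools import partial

    from swift.common.utils import audit_location_generator


    def my_partitions_filter(states, datadir_path, partitions):
        # Seed unseen partitions, then only visit the unfinished ones
        for part in partitions:
            states.setdefault(part, False)
        return [part for part in partitions if not states[part]]


    def my_hook_post_partition(states, part_path):
        # Mark a partition as done once all of its files were yielded
        states[os.path.basename(part_path)] = True


    my_states = {}
    locations = audit_location_generator(
        '/srv/node', 'objects', suffix='.data', mount_check=False,
        partitions_filter=partial(my_partitions_filter, my_states),
        hook_post_partition=partial(my_hook_post_partition, my_states))
    for path, device, partition in locations:
        pass  # process each .data file here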
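
A second illustrative sketch: the state file written by hook_post_partition is
plain JSON mapping each partition to a [relinked, cleaned] pair, for example
{"96": [true, false], "227": [false, false]} after partition 96 was relinked;
a restarted run reloads it in hook_pre_device and skips finished partitions.
Driving both steps from Python mirrors the unit tests above; '/srv/node' and
'sdb1' are example values.

    from swift.cli import relinker

    # Relink only device sdb1 (the third argument is skip_mount_check)
    relinker.relink('/etc/swift', '/srv/node', False, device='sdb1')

    # ... distribute the ring with the increased part power, then:
    relinker.cleanup('/etc/swift', '/srv/node', False, device='sdb1')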