From abfa6bee72eadd07aa5b2913a553d1c98964c03e Mon Sep 17 00:00:00 2001 From: Tim Burke Date: Wed, 6 Jan 2021 16:18:04 -0800 Subject: [PATCH] relinker: Parallelize per disk Add a new option, workers, that works more or less like the same option from background daemons. Disks will be distributed across N worker sub-processes so we can make the best use of the I/O available. While we're at it, log final stats at warning if there were errors. Co-Authored-By: Clay Gerrard Change-Id: I039d2b8861f69a64bd9d2cdf68f1f534c236b2ba --- etc/object-server.conf-sample | 9 +- swift/cli/relinker.py | 115 ++++++++--- test/unit/cli/test_relinker.py | 360 ++++++++++++++++++++++++--------- 3 files changed, 357 insertions(+), 127 deletions(-) diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index cc6a54b43f..762037178c 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -610,7 +610,12 @@ use = egg:swift#xprofile # log_level = INFO # log_address = /dev/log # -# Target this many relinks/cleanups per second, to reduce the +# Start up to this many sub-processes to process disks in parallel. Each disk +# will be handled by at most one child process. By default, one process is +# spawned per disk. +# workers = auto +# +# Target this many relinks/cleanups per second for each worker, to reduce the # likelihood that the added I/O from a partition-power increase impacts # client traffic. Use zero for unlimited. # files_per_second = 0.0 @@ -632,4 +637,4 @@ use = egg:swift#xprofile # useful if previous partition power increases have failed to cleanup # tombstones from their old locations, causing duplicate tombstones with # different inodes to be relinked to the next partition power location. -# link_check_limit = 2 \ No newline at end of file +# link_check_limit = 2 diff --git a/swift/cli/relinker.py b/swift/cli/relinker.py index 9fd88a87b5..e3e1ddee6e 100644 --- a/swift/cli/relinker.py +++ b/swift/cli/relinker.py @@ -20,10 +20,12 @@ import fcntl import json import logging import os +import time from swift.common.storage_policy import POLICIES from swift.common.utils import replace_partition_in_path, config_true_value, \ audit_location_generator, get_logger, readconf, drop_privileges, \ - RateLimitedIterator, lock_path, non_negative_float, non_negative_int + RateLimitedIterator, lock_path, PrefixLoggerAdapter, distribute_evenly, \ + non_negative_float, non_negative_int, config_auto_int_value from swift.obj import diskfile @@ -45,14 +47,14 @@ def policy(policy_name_or_index): class Relinker(object): - def __init__(self, conf, logger, device, do_cleanup=False): + def __init__(self, conf, logger, device_list=None, do_cleanup=False): self.conf = conf self.logger = logger - self.device = device + self.device_list = device_list or [] self.do_cleanup = do_cleanup self.root = self.conf['devices'] - if self.device is not None: - self.root = os.path.join(self.root, self.device) + if len(self.device_list) == 1: + self.root = os.path.join(self.root, list(self.device_list)[0]) self.part_power = self.next_part_power = None self.diskfile_mgr = None self.dev_lock = None @@ -70,8 +72,8 @@ class Relinker(object): } def devices_filter(self, _, devices): - if self.device: - devices = [d for d in devices if d == self.device] + if self.device_list: + devices = [d for d in devices if d in self.device_list] return set(devices) @@ -494,23 +496,84 @@ class Relinker(object): 'There were unexpected errors while enumerating disk ' 'files: %r', self.stats) - self.logger.info( + if action_errors + listdir_errors + unmounted > 0: + log_method = self.logger.warning + # NB: audit_location_generator logs unmounted disks as warnings, + # but we want to treat them as errors + status = EXIT_ERROR + else: + log_method = self.logger.info + status = EXIT_SUCCESS + + log_method( '%d hash dirs processed (cleanup=%s) (%d files, %d linked, ' '%d removed, %d errors)', hash_dirs, self.do_cleanup, files, linked, removed, action_errors + listdir_errors) - if action_errors + listdir_errors + unmounted > 0: - # NB: audit_location_generator logs unmounted disks as warnings, - # but we want to treat them as errors - return EXIT_ERROR - return EXIT_SUCCESS + return status -def relink(conf, logger, device): - return Relinker(conf, logger, device, do_cleanup=False).run() +def parallel_process(do_cleanup, conf, logger=None, device_list=None): + logger = logger or logging.getLogger() + device_list = sorted(set(device_list or os.listdir(conf['devices']))) + workers = conf['workers'] + if workers == 'auto': + workers = len(device_list) + else: + workers = min(workers, len(device_list)) + if workers == 0 or len(device_list) in (0, 1): + return Relinker( + conf, logger, device_list, do_cleanup=do_cleanup).run() + start = time.time() + children = {} + for worker_devs in distribute_evenly(device_list, workers): + pid = os.fork() + if pid == 0: + dev_logger = PrefixLoggerAdapter(logger, {}) + dev_logger.set_prefix('[pid=%s, devs=%s] ' % ( + os.getpid(), ','.join(worker_devs))) + os._exit(Relinker( + conf, dev_logger, worker_devs, do_cleanup=do_cleanup).run()) + else: + children[pid] = worker_devs -def cleanup(conf, logger, device): - return Relinker(conf, logger, device, do_cleanup=True).run() + final_status = EXIT_SUCCESS + final_messages = [] + while children: + pid, status = os.wait() + sig = status & 0xff + status = status >> 8 + time_delta = time.time() - start + devs = children.pop(pid, ['unknown device']) + worker_desc = '(pid=%s, devs=%s)' % (pid, ','.join(devs)) + if sig != 0: + final_status = EXIT_ERROR + final_messages.append( + 'Worker %s exited in %.1fs after receiving signal: %s' + % (worker_desc, time_delta, sig)) + continue + + if status == EXIT_SUCCESS: + continue + + if status == EXIT_NO_APPLICABLE_POLICY: + if final_status == EXIT_SUCCESS: + final_status = status + continue + + final_status = EXIT_ERROR + if status == EXIT_ERROR: + final_messages.append( + 'Worker %s completed in %.1fs with errors' + % (worker_desc, time_delta)) + else: + final_messages.append( + 'Worker %s exited in %.1fs with unexpected status %s' + % (worker_desc, time_delta, status)) + + for msg in final_messages: + logger.warning(msg) + return final_status def main(args): @@ -529,7 +592,8 @@ def main(args): dest='devices', help='Path to swift device directory') parser.add_argument('--user', default=None, dest='user', help='Drop privileges to this user before relinking') - parser.add_argument('--device', default=None, dest='device', + parser.add_argument('--device', + default=[], dest='device_list', action='append', help='Device name to relink (default: all)') parser.add_argument('--partition', '-p', default=[], dest='partitions', type=non_negative_int, action='append', @@ -541,6 +605,10 @@ def main(args): type=non_negative_float, dest='files_per_second', help='Used to limit I/O. Zero implies no limit ' '(default: no limit).') + parser.add_argument( + '--workers', default=None, type=non_negative_int, help=( + 'Process devices across N workers ' + '(default: one worker per device)')) parser.add_argument('--logfile', default=None, dest='logfile', help='Set log file name. Ignored if using conf_file.') parser.add_argument('--debug', default=False, action='store_true', @@ -584,13 +652,12 @@ def main(args): else non_negative_float(conf.get('files_per_second', '0'))), 'policies': set(args.policies) or POLICIES, 'partitions': set(args.partitions), + 'workers': config_auto_int_value( + conf.get('workers') if args.workers is None else args.workers, + 'auto'), 'link_check_limit': ( args.link_check_limit if args.link_check_limit is not None else non_negative_int(conf.get('link_check_limit', 2))), }) - - if args.action == 'relink': - return relink(conf, logger, device=args.device) - - if args.action == 'cleanup': - return cleanup(conf, logger, device=args.device) + return parallel_process( + args.action == 'cleanup', conf, logger, args.device_list) diff --git a/test/unit/cli/test_relinker.py b/test/unit/cli/test_relinker.py index c2f3c27035..4d1d403003 100644 --- a/test/unit/cli/test_relinker.py +++ b/test/unit/cli/test_relinker.py @@ -161,29 +161,175 @@ class TestRelinker(unittest.TestCase): with mock.patch('swift.common.utils.listdir', mocked): yield + def test_workers_parent(self): + os.mkdir(os.path.join(self.devices, 'sda2')) + self.rb.prepare_increase_partition_power() + self.rb.increase_partition_power() + self._save_ring() + pids = { + 2: 0, + 3: 0, + } + + def mock_wait(): + return pids.popitem() + + with mock.patch('os.fork', side_effect=list(pids.keys())), \ + mock.patch('os.wait', mock_wait): + self.assertEqual(0, relinker.main([ + 'cleanup', + '--swift-dir', self.testdir, + '--devices', self.devices, + '--workers', '2', + '--skip-mount', + ])) + self.assertEqual(pids, {}) + + def test_workers_parent_bubbles_up_errors(self): + def do_test(wait_result, msg): + pids = { + 2: 0, + 3: 0, + 4: 0, + 5: wait_result, + 6: 0, + } + + with mock.patch('os.fork', side_effect=list(pids.keys())), \ + mock.patch('os.wait', lambda: pids.popitem()), \ + mock.patch.object(relinker.logging, 'getLogger', + return_value=self.logger): + self.assertEqual(1, relinker.main([ + 'cleanup', + '--swift-dir', self.testdir, + '--devices', self.devices, + '--skip-mount', + ])) + self.assertEqual(pids, {}) + warning_lines = self.logger.get_lines_for_level('warning') + self.assertTrue( + warning_lines[0].startswith('Worker (pid=5, devs=')) + self.assertTrue( + warning_lines[0].endswith(msg), + 'Expected log line to end with %r; got %r' + % (msg, warning_lines[0])) + self.assertFalse(warning_lines[1:]) + self.logger.clear() + + os.mkdir(os.path.join(self.devices, 'sda2')) + os.mkdir(os.path.join(self.devices, 'sda3')) + os.mkdir(os.path.join(self.devices, 'sda4')) + os.mkdir(os.path.join(self.devices, 'sda5')) + self.rb.prepare_increase_partition_power() + self.rb.increase_partition_power() + self._save_ring() + # signals get the low bits + do_test(9, 'exited in 0.0s after receiving signal: 9') + # exit codes get the high + do_test(1 << 8, 'completed in 0.0s with errors') + do_test(42 << 8, 'exited in 0.0s with unexpected status 42') + + def test_workers_children(self): + os.mkdir(os.path.join(self.devices, 'sda2')) + os.mkdir(os.path.join(self.devices, 'sda3')) + os.mkdir(os.path.join(self.devices, 'sda4')) + os.mkdir(os.path.join(self.devices, 'sda5')) + self.rb.prepare_increase_partition_power() + self.rb.increase_partition_power() + self._save_ring() + + calls = [] + + def fake_fork(): + calls.append('fork') + return 0 + + def fake_run(self): + calls.append(('run', self.device_list)) + return 0 + + def fake_exit(status): + calls.append(('exit', status)) + + with mock.patch('os.fork', fake_fork), \ + mock.patch('os._exit', fake_exit), \ + mock.patch('swift.cli.relinker.Relinker.run', fake_run): + self.assertEqual(0, relinker.main([ + 'cleanup', + '--swift-dir', self.testdir, + '--devices', self.devices, + '--workers', '2', + '--skip-mount', + ])) + self.assertEqual([ + 'fork', + ('run', ['sda1', 'sda3', 'sda5']), + ('exit', 0), + 'fork', + ('run', ['sda2', 'sda4']), + ('exit', 0), + ], calls) + + # test too many workers + calls = [] + + with mock.patch('os.fork', fake_fork), \ + mock.patch('os._exit', fake_exit), \ + mock.patch('swift.cli.relinker.Relinker.run', fake_run): + self.assertEqual(0, relinker.main([ + 'cleanup', + '--swift-dir', self.testdir, + '--devices', self.devices, + '--workers', '6', + '--skip-mount', + ])) + self.assertEqual([ + 'fork', + ('run', ['sda1']), + ('exit', 0), + 'fork', + ('run', ['sda2']), + ('exit', 0), + 'fork', + ('run', ['sda3']), + ('exit', 0), + 'fork', + ('run', ['sda4']), + ('exit', 0), + 'fork', + ('run', ['sda5']), + ('exit', 0), + ], calls) + def _do_test_relinker_drop_privileges(self, command): @contextmanager def do_mocks(): # attach mocks to call_capture so that call order can be asserted call_capture = mock.Mock() - with mock.patch('swift.cli.relinker.drop_privileges') as mock_dp: - with mock.patch('swift.cli.relinker.' + command, - return_value=0) as mock_command: - call_capture.attach_mock(mock_dp, 'drop_privileges') - call_capture.attach_mock(mock_command, command) - yield call_capture + mod = 'swift.cli.relinker.' + with mock.patch(mod + 'Relinker') as mock_relinker, \ + mock.patch(mod + 'drop_privileges') as mock_dp, \ + mock.patch(mod + 'os.listdir', + return_value=['sda', 'sdb']): + mock_relinker.return_value.run.return_value = 0 + call_capture.attach_mock(mock_dp, 'drop_privileges') + call_capture.attach_mock(mock_relinker, 'run') + yield call_capture # no user option with do_mocks() as capture: - self.assertEqual(0, relinker.main([command])) - self.assertEqual([(command, mock.ANY, mock.ANY)], + self.assertEqual(0, relinker.main([command, '--workers', '0'])) + self.assertEqual([mock.call.run(mock.ANY, mock.ANY, ['sda', 'sdb'], + do_cleanup=(command == 'cleanup'))], capture.method_calls) # cli option --user with do_mocks() as capture: - self.assertEqual(0, relinker.main([command, '--user', 'cli_user'])) + self.assertEqual(0, relinker.main([command, '--user', 'cli_user', + '--workers', '0'])) self.assertEqual([('drop_privileges', ('cli_user',), {}), - (command, mock.ANY, mock.ANY)], + mock.call.run(mock.ANY, mock.ANY, ['sda', 'sdb'], + do_cleanup=(command == 'cleanup'))], capture.method_calls) # cli option --user takes precedence over conf file user @@ -191,18 +337,22 @@ class TestRelinker(unittest.TestCase): with mock.patch('swift.cli.relinker.readconf', return_value={'user': 'conf_user'}): self.assertEqual(0, relinker.main([command, 'conf_file', - '--user', 'cli_user'])) + '--user', 'cli_user', + '--workers', '0'])) self.assertEqual([('drop_privileges', ('cli_user',), {}), - (command, mock.ANY, mock.ANY)], + mock.call.run(mock.ANY, mock.ANY, ['sda', 'sdb'], + do_cleanup=(command == 'cleanup'))], capture.method_calls) # conf file user with do_mocks() as capture: with mock.patch('swift.cli.relinker.readconf', - return_value={'user': 'conf_user'}): + return_value={'user': 'conf_user', + 'workers': '0'}): self.assertEqual(0, relinker.main([command, 'conf_file'])) self.assertEqual([('drop_privileges', ('conf_user',), {}), - (command, mock.ANY, mock.ANY)], + mock.call.run(mock.ANY, mock.ANY, ['sda', 'sdb'], + do_cleanup=(command == 'cleanup'))], capture.method_calls) def test_relinker_drop_privileges(self): @@ -290,7 +440,7 @@ class TestRelinker(unittest.TestCase): f.write(dedent(config)) # cite conf file on command line - with mock.patch('swift.cli.relinker.relink') as mock_relink: + with mock.patch('swift.cli.relinker.Relinker') as mock_relinker: relinker.main(['relink', conf_file, '--device', 'sdx', '--debug']) exp_conf = { '__file__': mock.ANY, @@ -302,11 +452,13 @@ class TestRelinker(unittest.TestCase): 'log_name': 'test-relinker', 'log_level': 'DEBUG', 'policies': POLICIES, + 'workers': 'auto', 'partitions': set(), 'link_check_limit': 2, } - mock_relink.assert_called_once_with(exp_conf, mock.ANY, device='sdx') - logger = mock_relink.call_args[0][1] + mock_relinker.assert_called_once_with( + exp_conf, mock.ANY, ['sdx'], do_cleanup=False) + logger = mock_relinker.call_args[0][1] # --debug overrides conf file self.assertEqual(logging.DEBUG, logger.getEffectiveLevel()) self.assertEqual('test-relinker', logger.logger.name) @@ -333,9 +485,9 @@ class TestRelinker(unittest.TestCase): """ with open(conf_file, 'w') as f: f.write(dedent(config)) - with mock.patch('swift.cli.relinker.relink') as mock_relink: + with mock.patch('swift.cli.relinker.Relinker') as mock_relinker: relinker.main(['relink', conf_file, '--device', 'sdx']) - mock_relink.assert_called_once_with({ + mock_relinker.assert_called_once_with({ '__file__': mock.ANY, 'swift_dir': 'test/swift/dir', 'devices': '/test/node', @@ -345,14 +497,15 @@ class TestRelinker(unittest.TestCase): 'log_level': 'WARNING', 'policies': POLICIES, 'partitions': set(), + 'workers': 'auto', 'link_check_limit': 1, - }, mock.ANY, device='sdx') - logger = mock_relink.call_args[0][1] + }, mock.ANY, ['sdx'], do_cleanup=False) + logger = mock_relinker.call_args[0][1] self.assertEqual(logging.WARNING, logger.getEffectiveLevel()) self.assertEqual('test-relinker', logger.logger.name) # override with cli options... - with mock.patch('swift.cli.relinker.relink') as mock_relink: + with mock.patch('swift.cli.relinker.Relinker') as mock_relinker: relinker.main([ 'relink', conf_file, '--device', 'sdx', '--debug', '--swift-dir', 'cli-dir', '--devices', 'cli-devs', @@ -361,7 +514,7 @@ class TestRelinker(unittest.TestCase): '--partition', '123', '--partition', '456', '--link-check-limit', '3', ]) - mock_relink.assert_called_once_with({ + mock_relinker.assert_called_once_with({ '__file__': mock.ANY, 'swift_dir': 'cli-dir', 'devices': 'cli-devs', @@ -371,15 +524,16 @@ class TestRelinker(unittest.TestCase): 'log_name': 'test-relinker', 'policies': {POLICIES[1]}, 'partitions': {123, 456}, + 'workers': 'auto', 'link_check_limit': 3, - }, mock.ANY, device='sdx') + }, mock.ANY, ['sdx'], do_cleanup=False) - with mock.patch('swift.cli.relinker.relink') as mock_relink, \ + with mock.patch('swift.cli.relinker.Relinker') as mock_relinker, \ mock.patch('logging.basicConfig') as mock_logging_config: relinker.main(['relink', '--device', 'sdx', '--swift-dir', 'cli-dir', '--devices', 'cli-devs', '--skip-mount-check']) - mock_relink.assert_called_once_with({ + mock_relinker.assert_called_once_with({ 'swift_dir': 'cli-dir', 'devices': 'cli-devs', 'mount_check': False, @@ -387,12 +541,13 @@ class TestRelinker(unittest.TestCase): 'log_level': 'INFO', 'policies': POLICIES, 'partitions': set(), + 'workers': 'auto', 'link_check_limit': 2, - }, mock.ANY, device='sdx') + }, mock.ANY, ['sdx'], do_cleanup=False) mock_logging_config.assert_called_once_with( format='%(message)s', level=logging.INFO, filename=None) - with mock.patch('swift.cli.relinker.relink') as mock_relink, \ + with mock.patch('swift.cli.relinker.Relinker') as mock_relinker, \ mock.patch('logging.basicConfig') as mock_logging_config: relinker.main([ 'relink', '--debug', @@ -404,7 +559,7 @@ class TestRelinker(unittest.TestCase): '--policy', '1', '--policy', '0', ]) - mock_relink.assert_called_once_with({ + mock_relinker.assert_called_once_with({ 'swift_dir': 'cli-dir', 'devices': 'cli-devs', 'mount_check': False, @@ -412,8 +567,9 @@ class TestRelinker(unittest.TestCase): 'log_level': 'DEBUG', 'policies': set(POLICIES), 'partitions': set(), + 'workers': 'auto', 'link_check_limit': 2 - }, mock.ANY, device='sdx') + }, mock.ANY, ['sdx'], do_cleanup=False) # --debug is now effective mock_logging_config.assert_called_once_with( format='%(message)s', level=logging.DEBUG, filename=None) @@ -740,10 +896,10 @@ class TestRelinker(unittest.TestCase): (('ts', 0),), (('ts', 0),), exp_ret_code=1) - info_lines = self.logger.get_lines_for_level('info') + warning_lines = self.logger.get_lines_for_level('warning') self.assertIn('1 hash dirs processed (cleanup=True) ' '(1 files, 0 linked, 0 removed, 1 errors)', - info_lines) + warning_lines) def test_relink_link_already_exists_but_different_inode(self): self.rb.prepare_increase_partition_power() @@ -769,9 +925,9 @@ class TestRelinker(unittest.TestCase): '[Errno 17] File exists' % (self.objname, self.expected_file), warning_lines[0]) - info_lines = self.logger.get_lines_for_level('info') self.assertIn('1 hash dirs processed (cleanup=False) ' - '(1 files, 0 linked, 0 removed, 1 errors)', info_lines) + '(1 files, 0 linked, 0 removed, 1 errors)', + warning_lines) def test_relink_link_already_exists(self): self.rb.prepare_increase_partition_power() @@ -862,7 +1018,10 @@ class TestRelinker(unittest.TestCase): ])) self.assertEqual(self.logger.get_lines_for_level('warning'), [ 'Skipping sda1 as it is not mounted', - '1 disks were unmounted']) + '1 disks were unmounted', + '0 hash dirs processed (cleanup=False) ' + '(0 files, 0 linked, 0 removed, 0 errors)', + ]) def test_relink_listdir_error(self): self.rb.prepare_increase_partition_power() @@ -878,10 +1037,10 @@ class TestRelinker(unittest.TestCase): ])) self.assertEqual(self.logger.get_lines_for_level('warning'), [ 'Skipping %s because ' % self.objects, - 'There were 1 errors listing partition directories']) - info_lines = self.logger.get_lines_for_level('info') - self.assertIn('0 hash dirs processed (cleanup=False) ' - '(0 files, 0 linked, 0 removed, 1 errors)', info_lines) + 'There were 1 errors listing partition directories', + '0 hash dirs processed (cleanup=False) ' + '(0 files, 0 linked, 0 removed, 1 errors)', + ]) def test_relink_device_filter(self): self.rb.prepare_increase_partition_power() @@ -946,7 +1105,7 @@ class TestRelinker(unittest.TestCase): '--swift-dir', self.testdir, '--devices', self.devices, '--skip-mount', - '--partition', '-1' + '--partition', '-1', ])) self.assertEqual(2, cm.exception.code) @@ -957,7 +1116,7 @@ class TestRelinker(unittest.TestCase): '--swift-dir', self.testdir, '--devices', self.devices, '--skip-mount', - '--partition', 'abc' + '--partition', 'abc', ])) self.assertEqual(2, cm.exception.code) @@ -970,12 +1129,12 @@ class TestRelinker(unittest.TestCase): '--swift-dir', self.testdir, '--devices', self.devices, '--skip-mount', - '--partition', str(self.part + 1) + '--partition', str(self.part + 1), ])) self.assertFalse(os.path.isdir(self.expected_dir)) self.assertEqual( ['Processing files for policy platinum under %s (cleanup=False)' - % self.devices, + % os.path.join(self.devices, 'sda1'), '0 hash dirs processed (cleanup=False) (0 files, 0 linked, ' '0 removed, 0 errors)'], self.logger.get_lines_for_level('info') @@ -990,7 +1149,7 @@ class TestRelinker(unittest.TestCase): '--swift-dir', self.testdir, '--devices', self.devices, '--skip-mount', - '--partition', str(self.part) + '--partition', str(self.part), ])) self.assertTrue(os.path.isdir(self.expected_dir)) self.assertTrue(os.path.isfile(self.expected_file)) @@ -999,7 +1158,7 @@ class TestRelinker(unittest.TestCase): self.assertEqual(stat_old.st_ino, stat_new.st_ino) self.assertEqual( ['Processing files for policy platinum under %s (cleanup=False)' - % self.devices, + % os.path.join(self.devices, 'sda1'), 'Step: relink Device: sda1 Policy: platinum Partitions: 1/3', '1 hash dirs processed (cleanup=False) (1 files, 1 linked, ' '0 removed, 0 errors)'], @@ -1017,7 +1176,7 @@ class TestRelinker(unittest.TestCase): '--skip-mount', '--partition', str(other_objs[0][0]), '-p', str(other_objs[0][0]), # duplicates should be ignored - '-p', str(other_objs[1][0]) + '-p', str(other_objs[1][0]), ])) expected_file = utils.replace_partition_in_path( self.devices, other_objs[0][1], PART_POWER + 1) @@ -1033,7 +1192,7 @@ class TestRelinker(unittest.TestCase): self.assertEqual(stat_old.st_ino, stat_new.st_ino) self.assertEqual( ['Processing files for policy platinum under %s (cleanup=False)' - % self.devices, + % os.path.join(self.devices, 'sda1'), 'Step: relink Device: sda1 Policy: platinum Partitions: 2/3', 'Step: relink Device: sda1 Policy: platinum Partitions: 3/3', '2 hash dirs processed (cleanup=False) (2 files, 2 linked, ' @@ -1398,11 +1557,10 @@ class TestRelinker(unittest.TestCase): (('ts', 0),), # retained (('ts', 0),), exp_ret_code=1) - info_lines = self.logger.get_lines_for_level('info') + warning_lines = self.logger.get_lines_for_level('warning') self.assertIn('2 hash dirs processed (cleanup=False) ' '(2 files, 0 linked, 0 removed, 1 errors)', - info_lines) - warning_lines = self.logger.get_lines_for_level('warning') + warning_lines) self.assertIn('Error relinking: failed to relink', warning_lines[0]) with open(new_filepath, 'r') as fd: self.assertEqual(older_filepath, fd.read()) @@ -1507,10 +1665,10 @@ class TestRelinker(unittest.TestCase): extra_options=['--link-check-limit', '0'], mock_relink_paths=mock_relink_paths, ) - info_lines = self.logger.get_lines_for_level('info') + warning_lines = self.logger.get_lines_for_level('warning') self.assertIn('1 hash dirs processed (cleanup=False) ' '(1 files, 0 linked, 0 removed, 1 errors)', - info_lines) + warning_lines) # one attempt to link from current plus a check/retry from each # possible partition power, stopping at part power 1 @@ -1544,10 +1702,10 @@ class TestRelinker(unittest.TestCase): extra_options=['--link-check-limit', str(PART_POWER + 99)], mock_relink_paths=mock_relink_paths, ) - info_lines = self.logger.get_lines_for_level('info') + warning_lines = self.logger.get_lines_for_level('warning') self.assertIn('1 hash dirs processed (cleanup=False) ' '(1 files, 0 linked, 0 removed, 1 errors)', - info_lines) + warning_lines) # one attempt to link from current plus a check/retry from each # possible partition power, stopping at part power 1 @@ -1629,9 +1787,11 @@ class TestRelinker(unittest.TestCase): 'devices': self.devices, 'mount_check': False, 'files_per_second': 0, - 'policies': POLICIES} - self.assertEqual(0, relinker.relink( - conf, logger=self.logger, device=self.existing_device)) + 'policies': POLICIES, + 'workers': 0} + self.assertEqual(0, relinker.Relinker( + conf, logger=self.logger, device_list=[self.existing_device], + do_cleanup=False).run()) self.rb.increase_partition_power() self._save_ring() @@ -1845,10 +2005,10 @@ class TestRelinker(unittest.TestCase): exp_ret_code=1, relink_errors={'data': OSError(errno.EPERM, 'oops')} ) - info_lines = self.logger.get_lines_for_level('info') + warning_lines = self.logger.get_lines_for_level('warning') self.assertIn('1 hash dirs processed (cleanup=True) ' '(2 files, 0 linked, 0 removed, 1 errors)', - info_lines) + warning_lines) def test_cleanup_missing_meta_file_relink_fails(self): self._cleanup_test((('data', 0), ('meta', 1)), @@ -1859,10 +2019,10 @@ class TestRelinker(unittest.TestCase): exp_ret_code=1, relink_errors={'meta': OSError(errno.EPERM, 'oops')} ) - info_lines = self.logger.get_lines_for_level('info') + warning_lines = self.logger.get_lines_for_level('warning') self.assertIn('1 hash dirs processed (cleanup=True) ' '(2 files, 0 linked, 0 removed, 1 errors)', - info_lines) + warning_lines) def test_cleanup_missing_data_and_meta_file_one_relink_fails(self): self._cleanup_test((('data', 0), ('meta', 1)), @@ -1873,10 +2033,10 @@ class TestRelinker(unittest.TestCase): exp_ret_code=1, relink_errors={'meta': OSError(errno.EPERM, 'oops')} ) - info_lines = self.logger.get_lines_for_level('info') + warning_lines = self.logger.get_lines_for_level('warning') self.assertIn('1 hash dirs processed (cleanup=True) ' '(2 files, 1 linked, 0 removed, 1 errors)', - info_lines) + warning_lines) def test_cleanup_missing_data_and_meta_file_both_relinks_fails(self): self._cleanup_test((('data', 0), ('meta', 1)), @@ -1888,10 +2048,10 @@ class TestRelinker(unittest.TestCase): relink_errors={'data': OSError(errno.EPERM, 'oops'), 'meta': OSError(errno.EPERM, 'oops')} ) - info_lines = self.logger.get_lines_for_level('info') + warning_lines = self.logger.get_lines_for_level('warning') self.assertIn('1 hash dirs processed (cleanup=True) ' '(2 files, 0 linked, 0 removed, 2 errors)', - info_lines) + warning_lines) def test_cleanup_conflicting_data_file(self): self._cleanup_test((('data', 0),), @@ -1900,10 +2060,10 @@ class TestRelinker(unittest.TestCase): (('data', 0),), (('data', 0),), exp_ret_code=1) - info_lines = self.logger.get_lines_for_level('info') + warning_lines = self.logger.get_lines_for_level('warning') self.assertIn('1 hash dirs processed (cleanup=True) ' '(1 files, 0 linked, 0 removed, 1 errors)', - info_lines) + warning_lines) def test_cleanup_conflicting_ts_file(self): self._cleanup_test((('ts', 0),), @@ -1912,10 +2072,10 @@ class TestRelinker(unittest.TestCase): (('ts', 0),), (('ts', 0),), exp_ret_code=1) - info_lines = self.logger.get_lines_for_level('info') + warning_lines = self.logger.get_lines_for_level('warning') self.assertIn('1 hash dirs processed (cleanup=True) ' '(1 files, 0 linked, 0 removed, 1 errors)', - info_lines) + warning_lines) def test_cleanup_conflicting_ts_is_linked_to_part_power_minus_1(self): self._setup_object(lambda part: part >= 2 ** (PART_POWER - 1)) @@ -1965,11 +2125,10 @@ class TestRelinker(unittest.TestCase): (('ts', 0),), # retained (('ts', 0),), exp_ret_code=1) - info_lines = self.logger.get_lines_for_level('info') + warning_lines = self.logger.get_lines_for_level('warning') self.assertIn('2 hash dirs processed (cleanup=True) ' '(2 files, 0 linked, 1 removed, 1 errors)', - info_lines) - warning_lines = self.logger.get_lines_for_level('warning') + warning_lines) self.assertIn('Error relinking (cleanup): failed to relink', warning_lines[0]) with open(new_filepath, 'r') as fd: @@ -2031,10 +2190,10 @@ class TestRelinker(unittest.TestCase): (('data', 0), ('meta', 1)), (('data', 0), ('meta', 1)), exp_ret_code=1) - info_lines = self.logger.get_lines_for_level('info') + warning_lines = self.logger.get_lines_for_level('warning') self.assertIn('1 hash dirs processed (cleanup=True) ' '(2 files, 0 linked, 0 removed, 2 errors)', - info_lines) + warning_lines) def test_cleanup_conflicting_data_file_existing_meta_file(self): # if just one link fails to be created then *nothing* is removed from @@ -2045,10 +2204,10 @@ class TestRelinker(unittest.TestCase): (('data', 0), ('meta', 1)), (('data', 0), ('meta', 1)), exp_ret_code=1) - info_lines = self.logger.get_lines_for_level('info') + warning_lines = self.logger.get_lines_for_level('warning') self.assertIn('1 hash dirs processed (cleanup=True) ' '(2 files, 0 linked, 0 removed, 1 errors)', - info_lines) + warning_lines) def test_cleanup_first_quartile_does_rehash(self): # we need object name in lower half of current part @@ -2160,7 +2319,10 @@ class TestRelinker(unittest.TestCase): ])) self.assertEqual(self.logger.get_lines_for_level('warning'), [ 'Skipping sda1 as it is not mounted', - '1 disks were unmounted']) + '1 disks were unmounted', + '0 hash dirs processed (cleanup=True) ' + '(0 files, 0 linked, 0 removed, 0 errors)', + ]) def test_cleanup_listdir_error(self): self._common_test_cleanup() @@ -2175,10 +2337,10 @@ class TestRelinker(unittest.TestCase): ])) self.assertEqual(self.logger.get_lines_for_level('warning'), [ 'Skipping %s because ' % self.objects, - 'There were 1 errors listing partition directories']) - info_lines = self.logger.get_lines_for_level('info') - self.assertIn('0 hash dirs processed (cleanup=True) ' - '(0 files, 0 linked, 0 removed, 1 errors)', info_lines) + 'There were 1 errors listing partition directories', + '0 hash dirs processed (cleanup=True) ' + '(0 files, 0 linked, 0 removed, 1 errors)', + ]) def test_cleanup_device_filter(self): self._common_test_cleanup() @@ -2269,7 +2431,7 @@ class TestRelinker(unittest.TestCase): self.assertEqual(set([self.existing_device]), devices) # With a non matching filter, returns nothing - r.device = 'none' + r.device_list = ['none'] devices = r.devices_filter("", [self.existing_device]) self.assertEqual(set(), devices) @@ -2539,15 +2701,15 @@ class TestRelinker(unittest.TestCase): with open(self.expected_file, 'r') as fd: self.assertEqual('same but different', fd.read()) warning_lines = self.logger.get_lines_for_level('warning') - self.assertEqual(1, len(warning_lines), warning_lines) + self.assertEqual(2, len(warning_lines), warning_lines) self.assertIn('Error relinking (cleanup): failed to relink %s to %s' % (self.objname, self.expected_file), warning_lines[0]) # suffix should not be invalidated in new partition hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid') self.assertFalse(os.path.exists(hashes_invalid)) - info_lines = self.logger.get_lines_for_level('info') - self.assertIn('1 hash dirs processed (cleanup=True) ' - '(1 files, 0 linked, 0 removed, 1 errors)', info_lines) + self.assertEqual('1 hash dirs processed (cleanup=True) ' + '(1 files, 0 linked, 0 removed, 1 errors)', + warning_lines[1]) def test_cleanup_older_object_in_new_partition(self): # relink of the current object failed, but there is an older version of @@ -2749,14 +2911,12 @@ class TestRelinker(unittest.TestCase): ])) self.assertFalse(os.path.isfile(self.expected_file)) self.assertTrue(os.path.isfile(self.objname)) # old file intact - self.assertEqual( - ['Error relinking (cleanup): failed to relink %s to %s: ' - % (self.objname, self.expected_file)], - self.logger.get_lines_for_level('warning'), - ) - info_lines = self.logger.get_lines_for_level('info') - self.assertIn('1 hash dirs processed (cleanup=True) ' - '(1 files, 0 linked, 0 removed, 1 errors)', info_lines) + self.assertEqual(self.logger.get_lines_for_level('warning'), [ + 'Error relinking (cleanup): failed to relink %s to %s: ' + % (self.objname, self.expected_file), + '1 hash dirs processed (cleanup=True) ' + '(1 files, 0 linked, 0 removed, 1 errors)', + ]) # suffix should not be invalidated in new partition self.assertTrue(os.path.exists(hashes_invalid)) with open(hashes_invalid, 'r') as fd: @@ -2797,13 +2957,11 @@ class TestRelinker(unittest.TestCase): self.assertTrue(os.path.isfile(new_meta_path)) # new file intact self.assertFalse(os.path.isfile(self.objname)) # old file removed self.assertTrue(os.path.isfile(old_meta_path)) # meta file remove fail - self.assertEqual( - ['Error cleaning up %s: OSError()' % old_meta_path], - self.logger.get_lines_for_level('warning'), - ) - info_lines = self.logger.get_lines_for_level('info') - self.assertIn('1 hash dirs processed (cleanup=True) ' - '(2 files, 0 linked, 1 removed, 1 errors)', info_lines) + self.assertEqual(self.logger.get_lines_for_level('warning'), [ + 'Error cleaning up %s: OSError()' % old_meta_path, + '1 hash dirs processed (cleanup=True) ' + '(2 files, 0 linked, 1 removed, 1 errors)', + ]) def test_cleanup_two_files_need_linking(self): meta_file = utils.Timestamp(int(self.obj_ts) + 1).internal + '.meta'