diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 3a9cfe99ee..09d137e949 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -33,7 +33,8 @@ from swift.common.utils import ( whataremyips, unlink_older_than, compute_eta, get_logger, dump_recon_cache, mkdirs, config_true_value, tpool_reraise, GreenAsyncPile, Timestamp, remove_file, - load_recon_cache, parse_override_options, distribute_evenly) + load_recon_cache, parse_override_options, distribute_evenly, + PrefixLoggerAdapter) from swift.common.header_key_dict import HeaderKeyDict from swift.common.bufferedhttp import http_connect from swift.common.daemon import Daemon @@ -142,8 +143,8 @@ class ObjectReconstructor(Daemon): :param logger: logging object """ self.conf = conf - self.logger = logger or get_logger( - conf, log_route='object-reconstructor') + self.logger = PrefixLoggerAdapter( + logger or get_logger(conf, log_route='object-reconstructor'), {}) self.devices_dir = conf.get('devices', '/srv/node') self.mount_check = config_true_value(conf.get('mount_check', 'true')) self.swift_dir = conf.get('swift_dir', '/etc/swift') @@ -225,16 +226,21 @@ class ObjectReconstructor(Daemon): if not devices: # we only need a single worker to do nothing until a ring change yield dict(override_devices=override_opts.devices, - override_partitions=override_opts.partitions) + override_partitions=override_opts.partitions, + multiprocess_worker_index=0) return + # for somewhat uniform load per worker use same # max_devices_per_worker when handling all devices or just override # devices, but only use enough workers for the actual devices being # handled - n_workers = min(self.reconstructor_workers, len(devices)) - for ods in distribute_evenly(devices, n_workers): + self.reconstructor_workers = min(self.reconstructor_workers, + len(devices)) + for index, ods in enumerate(distribute_evenly( + devices, self.reconstructor_workers)): yield dict(override_partitions=override_opts.partitions, - override_devices=ods) + override_devices=ods, + multiprocess_worker_index=index) def is_healthy(self): """ @@ -571,6 +577,12 @@ class ObjectReconstructor(Daemon): _("Nothing reconstructed for %s seconds."), (time.time() - self.start)) + def _emplace_log_prefix(self, worker_index): + self.logger.set_prefix("[worker %d/%d pid=%s] " % ( + worker_index + 1, # use 1-based indexing for more readable logs + self.reconstructor_workers, + os.getpid())) + def kill_coros(self): """Utility function that kills all coroutines currently running.""" for coro in list(self.run_pool.coroutines_running): @@ -1213,7 +1225,9 @@ class ObjectReconstructor(Daemon): recon_update['object_reconstruction_per_disk'] = {} dump_recon_cache(recon_update, self.rcache, self.logger) - def run_once(self, *args, **kwargs): + def run_once(self, multiprocess_worker_index=None, *args, **kwargs): + if multiprocess_worker_index is not None: + self._emplace_log_prefix(multiprocess_worker_index) start = time.time() self.logger.info(_("Running object reconstructor in script mode.")) override_opts = parse_override_options(once=True, **kwargs) @@ -1231,7 +1245,9 @@ class ObjectReconstructor(Daemon): total, override_devices=override_opts.devices, override_partitions=override_opts.partitions) - def run_forever(self, *args, **kwargs): + def run_forever(self, multiprocess_worker_index=None, *args, **kwargs): + if multiprocess_worker_index is not None: + self._emplace_log_prefix(multiprocess_worker_index) self.logger.info(_("Starting object reconstructor in daemon mode.")) # Run the reconstructor continually while True: diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index be05adc089..f35b4d37ce 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -755,7 +755,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): with mock.patch('swift.obj.reconstructor.ObjectReconstructor.' 'check_ring', return_value=False): self.reconstructor.reconstruct() - msgs = self.reconstructor.logger.get_lines_for_level('info') + msgs = self.logger.get_lines_for_level('info') self.assertIn('Ring change detected. Aborting' ' current reconstruction pass.', msgs[0]) self.assertEqual(self.reconstructor.reconstruction_count, 0) @@ -802,7 +802,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): ], found) self.assertEqual(found_job_types, {object_reconstructor.REVERT}) # but failures keep handoffs remaining - msgs = self.reconstructor.logger.get_lines_for_level('info') + msgs = self.logger.get_lines_for_level('info') self.assertIn('Next pass will continue to revert handoffs', msgs[-1]) self.logger._clear() @@ -818,7 +818,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.reconstructor.reconstruct() self.assertEqual(found_job_types, {object_reconstructor.REVERT}) # it's time to turn off handoffs_only - msgs = self.reconstructor.logger.get_lines_for_level('warning') + msgs = self.logger.get_lines_for_level('warning') self.assertIn('You should disable handoffs_only', msgs[-1]) def test_get_partners(self): @@ -895,7 +895,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.reconstructor.reconstruct() self.assertFalse(os.path.exists(pol_1_part_1_path)) - warnings = self.reconstructor.logger.get_lines_for_level('warning') + warnings = self.logger.get_lines_for_level('warning') self.assertEqual(2, len(warnings)) # first warning is due to get_hashes failing to take lock on non-dir self.assertIn(pol_1_part_1_path + '/hashes.pkl', warnings[0]) @@ -934,7 +934,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.reconstructor._reset_stats() for part_info in self.reconstructor.collect_parts(): self.assertNotIn(part_info['part_path'], status_paths) - warnings = self.reconstructor.logger.get_lines_for_level('warning') + warnings = self.logger.get_lines_for_level('warning') self.assertEqual(0, len(warnings)) for status_path in status_paths: self.assertTrue(os.path.exists(status_path)) @@ -1066,9 +1066,9 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): # failed jobs don't sync suffixes self.assertFalse( - self.reconstructor.logger.get_lines_for_level('warning')) + self.logger.get_lines_for_level('warning')) self.assertFalse( - self.reconstructor.logger.get_lines_for_level('error')) + self.logger.get_lines_for_level('error')) # handoffs remaining and part exists self.assertEqual(2, self.reconstructor.handoffs_remaining) self.assertTrue(os.path.exists(self.parts_1['2'])) @@ -1099,10 +1099,10 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): set((r['ip'], r['path']) for r in request_log.requests)) self.assertFalse( - self.reconstructor.logger.get_lines_for_level('error')) + self.logger.get_lines_for_level('error')) # handoffs are cleaned up self.assertEqual(0, self.reconstructor.handoffs_remaining) - warning_msgs = self.reconstructor.logger.get_lines_for_level('warning') + warning_msgs = self.logger.get_lines_for_level('warning') self.assertEqual(1, len(warning_msgs)) self.assertIn('no handoffs remaining', warning_msgs[0]) @@ -1335,7 +1335,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.reconstructor.reconstruct() self.assertEqual(0, self.reconstructor.reconstruction_count) - warnings = self.reconstructor.logger.get_lines_for_level('warning') + warnings = self.logger.get_lines_for_level('warning') self.assertIn( "next_part_power set in policy 'one'. Skipping", warnings) @@ -1383,7 +1383,8 @@ class TestWorkerReconstructor(unittest.TestCase): self.assertEqual(num_workers, reconstructor.reconstructor_workers) self.assertEqual(1, len(list(reconstructor.get_worker_args()))) self.assertEqual([ - {'override_partitions': [], 'override_devices': []}, + {'override_partitions': [], 'override_devices': [], + 'multiprocess_worker_index': 0}, ], list(reconstructor.get_worker_args())) do_test(1) do_test(10) @@ -1399,15 +1400,18 @@ class TestWorkerReconstructor(unittest.TestCase): once=True, devices='sdz')) self.assertEqual(1, len(worker_args)) self.assertEqual([{'override_partitions': [], - 'override_devices': ['sdz']}], + 'override_devices': ['sdz'], + 'multiprocess_worker_index': 0}], worker_args) # overrides are ignored in forever mode worker_args = list(reconstructor.get_worker_args( once=False, devices='sdz')) self.assertEqual(2, len(worker_args)) self.assertEqual([ - {'override_partitions': [], 'override_devices': ['sdb']}, - {'override_partitions': [], 'override_devices': ['sdc']} + {'override_partitions': [], 'override_devices': ['sdb'], + 'multiprocess_worker_index': 0}, + {'override_partitions': [], 'override_devices': ['sdc'], + 'multiprocess_worker_index': 1}, ], worker_args) def test_workers_with_devices(self): @@ -1417,8 +1421,10 @@ class TestWorkerReconstructor(unittest.TestCase): self.assertEqual(2, reconstructor.reconstructor_workers) self.assertEqual(2, len(list(reconstructor.get_worker_args()))) expected = [ - {'override_partitions': [], 'override_devices': ['sdb']}, - {'override_partitions': [], 'override_devices': ['sdc']}, + {'override_partitions': [], 'override_devices': ['sdb'], + 'multiprocess_worker_index': 0}, + {'override_partitions': [], 'override_devices': ['sdc'], + 'multiprocess_worker_index': 1}, ] worker_args = list(reconstructor.get_worker_args(once=False)) self.assertEqual(2, len(worker_args)) @@ -1439,15 +1445,21 @@ class TestWorkerReconstructor(unittest.TestCase): once=True, devices='sdb,sdz', partitions='99,333')) self.assertEqual(1, len(worker_args)) self.assertEqual( - [{'override_partitions': [99, 333], 'override_devices': ['sdb']}], + [{'override_partitions': [99, 333], 'override_devices': ['sdb'], + 'multiprocess_worker_index': 0}], worker_args) + # overrides are ignored in forever mode + reconstructor = object_reconstructor.ObjectReconstructor( + {'reconstructor_workers': '2'}, logger=self.logger) + reconstructor.get_local_devices = lambda: ['sdb', 'sdc'] worker_args = list(reconstructor.get_worker_args( once=False, devices='sdb,sdz', partitions='99,333')) - self.assertEqual(2, len(worker_args)) self.assertEqual([ - {'override_partitions': [], 'override_devices': ['sdb']}, - {'override_partitions': [], 'override_devices': ['sdc']} + {'override_partitions': [], 'override_devices': ['sdb'], + 'multiprocess_worker_index': 0}, + {'override_partitions': [], 'override_devices': ['sdc'], + 'multiprocess_worker_index': 1} ], worker_args) def test_workers_with_lots_of_devices(self): @@ -1458,10 +1470,12 @@ class TestWorkerReconstructor(unittest.TestCase): self.assertEqual(2, reconstructor.reconstructor_workers) self.assertEqual(2, len(list(reconstructor.get_worker_args()))) self.assertEqual([ - {'override_partitions': [], 'override_devices': [ - 'sdb', 'sdd', 'sdf']}, - {'override_partitions': [], 'override_devices': [ - 'sdc', 'sde']}, + {'override_partitions': [], + 'override_devices': ['sdb', 'sdd', 'sdf'], + 'multiprocess_worker_index': 0}, + {'override_partitions': [], + 'override_devices': ['sdc', 'sde'], + 'multiprocess_worker_index': 1}, ], list(reconstructor.get_worker_args())) def test_workers_with_lots_of_devices_and_overrides(self): @@ -1479,9 +1493,11 @@ class TestWorkerReconstructor(unittest.TestCase): self.assertEqual([{ 'override_partitions': [99, 333], 'override_devices': ['sdb', 'sdf'], + 'multiprocess_worker_index': 0, }, { 'override_partitions': [99, 333], 'override_devices': ['sdd'], + 'multiprocess_worker_index': 1, }], worker_args) # with 4 override devices, expect 2 per worker @@ -1489,10 +1505,12 @@ class TestWorkerReconstructor(unittest.TestCase): once=True, devices='sdb,sdc,sdd,sdf', partitions='99,333')) self.assertEqual(2, len(worker_args)) self.assertEqual([ - {'override_partitions': [99, 333], 'override_devices': [ - 'sdb', 'sdd']}, - {'override_partitions': [99, 333], 'override_devices': [ - 'sdc', 'sdf']}, + {'override_partitions': [99, 333], + 'override_devices': ['sdb', 'sdd'], + 'multiprocess_worker_index': 0}, + {'override_partitions': [99, 333], + 'override_devices': ['sdc', 'sdf'], + 'multiprocess_worker_index': 1}, ], worker_args) def test_workers_with_lots_of_workers(self): @@ -1502,8 +1520,10 @@ class TestWorkerReconstructor(unittest.TestCase): self.assertEqual(10, reconstructor.reconstructor_workers) self.assertEqual(2, len(list(reconstructor.get_worker_args()))) self.assertEqual([ - {'override_partitions': [], 'override_devices': ['sdb']}, - {'override_partitions': [], 'override_devices': ['sdc']}, + {'override_partitions': [], 'override_devices': ['sdb'], + 'multiprocess_worker_index': 0}, + {'override_partitions': [], 'override_devices': ['sdc'], + 'multiprocess_worker_index': 1}, ], list(reconstructor.get_worker_args())) def test_workers_with_lots_of_workers_and_devices(self): @@ -1514,11 +1534,16 @@ class TestWorkerReconstructor(unittest.TestCase): self.assertEqual(10, reconstructor.reconstructor_workers) self.assertEqual(5, len(list(reconstructor.get_worker_args()))) self.assertEqual([ - {'override_partitions': [], 'override_devices': ['sdb']}, - {'override_partitions': [], 'override_devices': ['sdc']}, - {'override_partitions': [], 'override_devices': ['sdd']}, - {'override_partitions': [], 'override_devices': ['sde']}, - {'override_partitions': [], 'override_devices': ['sdf']}, + {'override_partitions': [], 'override_devices': ['sdb'], + 'multiprocess_worker_index': 0}, + {'override_partitions': [], 'override_devices': ['sdc'], + 'multiprocess_worker_index': 1}, + {'override_partitions': [], 'override_devices': ['sdd'], + 'multiprocess_worker_index': 2}, + {'override_partitions': [], 'override_devices': ['sde'], + 'multiprocess_worker_index': 3}, + {'override_partitions': [], 'override_devices': ['sdf'], + 'multiprocess_worker_index': 4}, ], list(reconstructor.get_worker_args())) def test_workers_with_some_workers_and_devices(self): @@ -1562,17 +1587,28 @@ class TestWorkerReconstructor(unittest.TestCase): # Spot check one full result for sanity's sake reconstructor.reconstructor_workers = 11 self.assertEqual([ - {'override_partitions': [], 'override_devices': ['d1', 'd12']}, - {'override_partitions': [], 'override_devices': ['d2', 'd13']}, - {'override_partitions': [], 'override_devices': ['d3', 'd14']}, - {'override_partitions': [], 'override_devices': ['d4', 'd15']}, - {'override_partitions': [], 'override_devices': ['d5', 'd16']}, - {'override_partitions': [], 'override_devices': ['d6', 'd17']}, - {'override_partitions': [], 'override_devices': ['d7', 'd18']}, - {'override_partitions': [], 'override_devices': ['d8', 'd19']}, - {'override_partitions': [], 'override_devices': ['d9', 'd20']}, - {'override_partitions': [], 'override_devices': ['d10', 'd21']}, - {'override_partitions': [], 'override_devices': ['d11']}, + {'override_partitions': [], 'override_devices': ['d1', 'd12'], + 'multiprocess_worker_index': 0}, + {'override_partitions': [], 'override_devices': ['d2', 'd13'], + 'multiprocess_worker_index': 1}, + {'override_partitions': [], 'override_devices': ['d3', 'd14'], + 'multiprocess_worker_index': 2}, + {'override_partitions': [], 'override_devices': ['d4', 'd15'], + 'multiprocess_worker_index': 3}, + {'override_partitions': [], 'override_devices': ['d5', 'd16'], + 'multiprocess_worker_index': 4}, + {'override_partitions': [], 'override_devices': ['d6', 'd17'], + 'multiprocess_worker_index': 5}, + {'override_partitions': [], 'override_devices': ['d7', 'd18'], + 'multiprocess_worker_index': 6}, + {'override_partitions': [], 'override_devices': ['d8', 'd19'], + 'multiprocess_worker_index': 7}, + {'override_partitions': [], 'override_devices': ['d9', 'd20'], + 'multiprocess_worker_index': 8}, + {'override_partitions': [], 'override_devices': ['d10', 'd21'], + 'multiprocess_worker_index': 9}, + {'override_partitions': [], 'override_devices': ['d11'], + 'multiprocess_worker_index': 10}, ], list(reconstructor.get_worker_args())) def test_next_rcache_update_configured_with_stats_interval(self): @@ -2395,6 +2431,33 @@ class TestWorkerReconstructor(unittest.TestCase): } }, data) + def test_worker_logging(self): + reconstructor = object_reconstructor.ObjectReconstructor({ + 'reconstructor_workers': 4, + 'recon_cache_path': self.recon_cache_path + }, logger=self.logger) + + def log_some_stuff(*a, **kw): + reconstructor.logger.debug("debug message") + reconstructor.logger.info("info message") + reconstructor.logger.warning("warning message") + reconstructor.logger.error("error message") + + with mock.patch.object(reconstructor, 'reconstruct', + log_some_stuff), \ + mock.patch("os.getpid", lambda: 20641): + reconstructor.get_worker_args() + reconstructor.run_once(multiprocess_worker_index=1, + override_devices=['sda', 'sdb']) + + prefix = "[worker 2/4 pid=20641] " + for level, lines in self.logger.logger.all_log_lines().items(): + for line in lines: + self.assertTrue( + line.startswith(prefix), + "%r doesn't start with %r (level %s)" % ( + line, prefix, level)) + @patch_policies(with_ec_default=True) class BaseTestObjectReconstructor(unittest.TestCase): diff --git a/test/unit/obj/test_ssync.py b/test/unit/obj/test_ssync.py index f564d92d40..0d485c9d65 100644 --- a/test/unit/obj/test_ssync.py +++ b/test/unit/obj/test_ssync.py @@ -309,8 +309,8 @@ class TestBaseSsyncEC(TestBaseSsync): def setUp(self): super(TestBaseSsyncEC, self).setUp() self.policy = POLICIES.default - self.daemon = ObjectReconstructor(self.daemon_conf, - debug_logger('test-ssync-sender')) + self.logger = debug_logger('test-ssync-sender') + self.daemon = ObjectReconstructor(self.daemon_conf, self.logger) def _get_object_data(self, path, frag_index=None, **kwargs): # return a frag archive for given object name and frag index. @@ -674,7 +674,7 @@ class TestSsyncEC(TestBaseSsyncEC): self.daemon, self.rx_node, job, ['abc']) success, _ = sender() self.assertFalse(success) - error_log_lines = self.daemon.logger.get_lines_for_level('error') + error_log_lines = self.logger.get_lines_for_level('error') self.assertEqual(1, len(error_log_lines)) error_msg = error_log_lines[0] self.assertIn("Expected status 200; got 400", error_msg) @@ -857,7 +857,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): self.policy.object_ring, 'get_part_nodes', fake_get_part_nodes): self.reconstructor = ObjectReconstructor( - {}, logger=debug_logger('test_reconstructor')) + {}, logger=self.logger) job = { 'device': self.device, 'partition': self.partition, @@ -892,7 +892,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): pass # expected outcome if msgs: self.fail('Failed with:\n%s' % '\n'.join(msgs)) - log_lines = self.daemon.logger.get_lines_for_level('error') + log_lines = self.logger.get_lines_for_level('error') self.assertIn('Sent data length does not match content-length', log_lines[0]) self.assertFalse(log_lines[1:]) @@ -926,7 +926,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): pass # expected outcome if msgs: self.fail('Failed with:\n%s' % '\n'.join(msgs)) - log_lines = self.daemon.logger.get_lines_for_level('error') + log_lines = self.logger.get_lines_for_level('error') self.assertIn('Sent data length does not match content-length', log_lines[0]) self.assertFalse(log_lines[1:]) @@ -969,12 +969,11 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): if msgs: self.fail('Failed with:\n%s' % '\n'.join(msgs)) - log_lines = self.reconstructor.logger.get_lines_for_level('error') + log_lines = self.logger.get_lines_for_level('error') self.assertIn('Error trying to rebuild', log_lines[0]) - log_lines = self.daemon.logger.get_lines_for_level('error') self.assertIn('Sent data length does not match content-length', - log_lines[0]) - self.assertFalse(log_lines[1:]) + log_lines[1]) + self.assertFalse(log_lines[2:]) # trampoline for the receiver to write a log eventlet.sleep(0) log_lines = self.rx_logger.get_lines_for_level('warning') @@ -1027,8 +1026,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): pass # expected outcome if msgs: self.fail('Failed with:\n%s' % '\n'.join(msgs)) - self.assertFalse(self.daemon.logger.get_lines_for_level('error')) - log_lines = self.reconstructor.logger.get_lines_for_level('error') + log_lines = self.logger.get_lines_for_level('error') self.assertIn('Unable to get enough responses', log_lines[0]) # trampoline for the receiver to write a log eventlet.sleep(0) @@ -1063,9 +1061,9 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): msgs.append('Missing rx diskfile for %r' % obj_name) if msgs: self.fail('Failed with:\n%s' % '\n'.join(msgs)) - self.assertFalse(self.daemon.logger.get_lines_for_level('error')) + self.assertFalse(self.logger.get_lines_for_level('error')) self.assertFalse( - self.reconstructor.logger.get_lines_for_level('error')) + self.logger.get_lines_for_level('error')) # trampoline for the receiver to write a log eventlet.sleep(0) self.assertFalse(self.rx_logger.get_lines_for_level('warning')) @@ -1076,8 +1074,8 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): class TestSsyncReplication(TestBaseSsync): def setUp(self): super(TestSsyncReplication, self).setUp() - self.daemon = ObjectReplicator(self.daemon_conf, - debug_logger('test-ssync-sender')) + self.logger = debug_logger('test-ssync-sender') + self.daemon = ObjectReplicator(self.daemon_conf, self.logger) def test_sync(self): policy = POLICIES.default