diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 09d137e949..5cbeaf28cd 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -1225,6 +1225,10 @@ class ObjectReconstructor(Daemon): recon_update['object_reconstruction_per_disk'] = {} dump_recon_cache(recon_update, self.rcache, self.logger) + def post_multiprocess_run(self): + # This method is called after run_once when using multiple workers. + self.aggregate_recon_update() + def run_once(self, multiprocess_worker_index=None, *args, **kwargs): if multiprocess_worker_index is not None: self._emplace_log_prefix(multiprocess_worker_index) diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index f35b4d37ce..c2d56fbb07 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -2321,6 +2321,54 @@ class TestWorkerReconstructor(unittest.TestCase): } }, data) + def test_recon_aggregation_at_end_of_run_once(self): + reconstructor = object_reconstructor.ObjectReconstructor({ + 'reconstructor_workers': 2, + 'recon_cache_path': self.recon_cache_path + }, logger=self.logger) + reconstructor.all_local_devices = set(['d0', 'd1', 'd2', 'd3']) + start = time.time() - 1000 + for i in range(4): + with mock.patch('swift.obj.reconstructor.time.time', + return_value=start + (300 * i)), \ + mock.patch('swift.obj.reconstructor.os.getpid', + return_value='pid-%s' % i): + reconstructor.final_recon_dump( + i, override_devices=['d%s' % i]) + # sanity + with open(self.rcache) as f: + data = json.load(f) + self.assertEqual({ + 'object_reconstruction_per_disk': { + 'd0': { + 'object_reconstruction_last': start, + 'object_reconstruction_time': 0.0, + 'pid': 'pid-0', + }, + 'd1': { + 'object_reconstruction_last': start + 300, + 'object_reconstruction_time': 1, + 'pid': 'pid-1', + }, + 'd2': { + 'object_reconstruction_last': start + 600, + 'object_reconstruction_time': 2, + 'pid': 'pid-2', + }, + 'd3': { + 'object_reconstruction_last': start + 900, + 'object_reconstruction_time': 3, + 'pid': 'pid-3', + }, + } + }, data) + + reconstructor.post_multiprocess_run() + with open(self.rcache) as f: + data = json.load(f) + self.assertEqual(start + 900, data['object_reconstruction_last']) + self.assertEqual(15, data['object_reconstruction_time']) + def test_recon_aggregation_races_with_final_recon_dump(self): reconstructor = object_reconstructor.ObjectReconstructor({ 'reconstructor_workers': 2, @@ -2349,7 +2397,7 @@ class TestWorkerReconstructor(unittest.TestCase): }, data) # simulate a second worker concurrently dumping to recon cache while - # parent is aggregatng existing results; mock dump_recon_cache as a + # parent is aggregating existing results; mock dump_recon_cache as a # convenient way to interrupt parent aggregate_recon_update and 'pass # control' to second worker updated_data = [] # state of recon cache just after second worker dump