Make final stats dump after reconstructor runs once

When running in multiprocess mode, the object reconstructor would
periodically aggregate its workers' recon data into a single recon
measurement. However, at the end of the run, all that was left in
recon was the last periodic measurement; any work that took place
after that point was not recored in the aggregate. However, it was
recorded in the per-disk stats that the worker processes emitted.

This commit adds a final recon aggregation after the worker processes
have finished.

Change-Id: Ia6a3a931e9e7a23824765b2ab111a5492e509be8
This commit is contained in:
Samuel Merritt 2018-04-02 17:34:44 -07:00
parent 84f2bfcb2e
commit ecf47553b5
2 changed files with 53 additions and 1 deletions

View File

@ -1225,6 +1225,10 @@ class ObjectReconstructor(Daemon):
recon_update['object_reconstruction_per_disk'] = {}
dump_recon_cache(recon_update, self.rcache, self.logger)
def post_multiprocess_run(self):
# This method is called after run_once when using multiple workers.
self.aggregate_recon_update()
def run_once(self, multiprocess_worker_index=None, *args, **kwargs):
if multiprocess_worker_index is not None:
self._emplace_log_prefix(multiprocess_worker_index)

View File

@ -2321,6 +2321,54 @@ class TestWorkerReconstructor(unittest.TestCase):
}
}, data)
def test_recon_aggregation_at_end_of_run_once(self):
reconstructor = object_reconstructor.ObjectReconstructor({
'reconstructor_workers': 2,
'recon_cache_path': self.recon_cache_path
}, logger=self.logger)
reconstructor.all_local_devices = set(['d0', 'd1', 'd2', 'd3'])
start = time.time() - 1000
for i in range(4):
with mock.patch('swift.obj.reconstructor.time.time',
return_value=start + (300 * i)), \
mock.patch('swift.obj.reconstructor.os.getpid',
return_value='pid-%s' % i):
reconstructor.final_recon_dump(
i, override_devices=['d%s' % i])
# sanity
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_per_disk': {
'd0': {
'object_reconstruction_last': start,
'object_reconstruction_time': 0.0,
'pid': 'pid-0',
},
'd1': {
'object_reconstruction_last': start + 300,
'object_reconstruction_time': 1,
'pid': 'pid-1',
},
'd2': {
'object_reconstruction_last': start + 600,
'object_reconstruction_time': 2,
'pid': 'pid-2',
},
'd3': {
'object_reconstruction_last': start + 900,
'object_reconstruction_time': 3,
'pid': 'pid-3',
},
}
}, data)
reconstructor.post_multiprocess_run()
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual(start + 900, data['object_reconstruction_last'])
self.assertEqual(15, data['object_reconstruction_time'])
def test_recon_aggregation_races_with_final_recon_dump(self):
reconstructor = object_reconstructor.ObjectReconstructor({
'reconstructor_workers': 2,
@ -2349,7 +2397,7 @@ class TestWorkerReconstructor(unittest.TestCase):
}, data)
# simulate a second worker concurrently dumping to recon cache while
# parent is aggregatng existing results; mock dump_recon_cache as a
# parent is aggregating existing results; mock dump_recon_cache as a
# convenient way to interrupt parent aggregate_recon_update and 'pass
# control' to second worker
updated_data = [] # state of recon cache just after second worker dump