Merge "Make final stats dump after reconstructor runs once"
This commit is contained in:
commit
46443b7fbf
@ -1225,6 +1225,10 @@ class ObjectReconstructor(Daemon):
|
|||||||
recon_update['object_reconstruction_per_disk'] = {}
|
recon_update['object_reconstruction_per_disk'] = {}
|
||||||
dump_recon_cache(recon_update, self.rcache, self.logger)
|
dump_recon_cache(recon_update, self.rcache, self.logger)
|
||||||
|
|
||||||
|
def post_multiprocess_run(self):
|
||||||
|
# This method is called after run_once when using multiple workers.
|
||||||
|
self.aggregate_recon_update()
|
||||||
|
|
||||||
def run_once(self, multiprocess_worker_index=None, *args, **kwargs):
|
def run_once(self, multiprocess_worker_index=None, *args, **kwargs):
|
||||||
if multiprocess_worker_index is not None:
|
if multiprocess_worker_index is not None:
|
||||||
self._emplace_log_prefix(multiprocess_worker_index)
|
self._emplace_log_prefix(multiprocess_worker_index)
|
||||||
|
@ -2321,6 +2321,54 @@ class TestWorkerReconstructor(unittest.TestCase):
|
|||||||
}
|
}
|
||||||
}, data)
|
}, data)
|
||||||
|
|
||||||
|
def test_recon_aggregation_at_end_of_run_once(self):
|
||||||
|
reconstructor = object_reconstructor.ObjectReconstructor({
|
||||||
|
'reconstructor_workers': 2,
|
||||||
|
'recon_cache_path': self.recon_cache_path
|
||||||
|
}, logger=self.logger)
|
||||||
|
reconstructor.all_local_devices = set(['d0', 'd1', 'd2', 'd3'])
|
||||||
|
start = time.time() - 1000
|
||||||
|
for i in range(4):
|
||||||
|
with mock.patch('swift.obj.reconstructor.time.time',
|
||||||
|
return_value=start + (300 * i)), \
|
||||||
|
mock.patch('swift.obj.reconstructor.os.getpid',
|
||||||
|
return_value='pid-%s' % i):
|
||||||
|
reconstructor.final_recon_dump(
|
||||||
|
i, override_devices=['d%s' % i])
|
||||||
|
# sanity
|
||||||
|
with open(self.rcache) as f:
|
||||||
|
data = json.load(f)
|
||||||
|
self.assertEqual({
|
||||||
|
'object_reconstruction_per_disk': {
|
||||||
|
'd0': {
|
||||||
|
'object_reconstruction_last': start,
|
||||||
|
'object_reconstruction_time': 0.0,
|
||||||
|
'pid': 'pid-0',
|
||||||
|
},
|
||||||
|
'd1': {
|
||||||
|
'object_reconstruction_last': start + 300,
|
||||||
|
'object_reconstruction_time': 1,
|
||||||
|
'pid': 'pid-1',
|
||||||
|
},
|
||||||
|
'd2': {
|
||||||
|
'object_reconstruction_last': start + 600,
|
||||||
|
'object_reconstruction_time': 2,
|
||||||
|
'pid': 'pid-2',
|
||||||
|
},
|
||||||
|
'd3': {
|
||||||
|
'object_reconstruction_last': start + 900,
|
||||||
|
'object_reconstruction_time': 3,
|
||||||
|
'pid': 'pid-3',
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}, data)
|
||||||
|
|
||||||
|
reconstructor.post_multiprocess_run()
|
||||||
|
with open(self.rcache) as f:
|
||||||
|
data = json.load(f)
|
||||||
|
self.assertEqual(start + 900, data['object_reconstruction_last'])
|
||||||
|
self.assertEqual(15, data['object_reconstruction_time'])
|
||||||
|
|
||||||
def test_recon_aggregation_races_with_final_recon_dump(self):
|
def test_recon_aggregation_races_with_final_recon_dump(self):
|
||||||
reconstructor = object_reconstructor.ObjectReconstructor({
|
reconstructor = object_reconstructor.ObjectReconstructor({
|
||||||
'reconstructor_workers': 2,
|
'reconstructor_workers': 2,
|
||||||
@ -2349,7 +2397,7 @@ class TestWorkerReconstructor(unittest.TestCase):
|
|||||||
}, data)
|
}, data)
|
||||||
|
|
||||||
# simulate a second worker concurrently dumping to recon cache while
|
# simulate a second worker concurrently dumping to recon cache while
|
||||||
# parent is aggregatng existing results; mock dump_recon_cache as a
|
# parent is aggregating existing results; mock dump_recon_cache as a
|
||||||
# convenient way to interrupt parent aggregate_recon_update and 'pass
|
# convenient way to interrupt parent aggregate_recon_update and 'pass
|
||||||
# control' to second worker
|
# control' to second worker
|
||||||
updated_data = [] # state of recon cache just after second worker dump
|
updated_data = [] # state of recon cache just after second worker dump
|
||||||
|
Loading…
Reference in New Issue
Block a user