Drop reconstructor stats when worker has no devices
If you're watching (new) node's reconstruction_last time to ensure a cycle finishes since the last ring rebalance you won't ever see reconstructors with no devices drop recon stats. Change-Id: I84c07fc6841119b00d1a74078fe53f4ce637187b
This commit is contained in:
parent
1f751a7979
commit
63ca3a74ef
@ -1237,8 +1237,8 @@ class ObjectReconstructor(Daemon):
|
|||||||
'object_reconstruction_last': time.time(),
|
'object_reconstruction_last': time.time(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.reconstructor_workers > 0:
|
devices = override_devices or self.all_local_devices
|
||||||
devices = override_devices or self.all_local_devices
|
if self.reconstructor_workers > 0 and devices:
|
||||||
recon_update['pid'] = os.getpid()
|
recon_update['pid'] = os.getpid()
|
||||||
recon_update = {'object_reconstruction_per_disk': {
|
recon_update = {'object_reconstruction_per_disk': {
|
||||||
d: recon_update for d in devices}}
|
d: recon_update for d in devices}}
|
||||||
|
@ -1653,37 +1653,49 @@ class TestWorkerReconstructor(unittest.TestCase):
|
|||||||
'object_reconstruction_last': now,
|
'object_reconstruction_last': now,
|
||||||
'object_reconstruction_time': total,
|
'object_reconstruction_time': total,
|
||||||
}, data)
|
}, data)
|
||||||
# per_disk_stats with workers
|
|
||||||
|
def check_per_disk_stats(before, now, old_total, total,
|
||||||
|
override_devices):
|
||||||
|
with mock.patch('swift.obj.reconstructor.time.time',
|
||||||
|
return_value=now), \
|
||||||
|
mock.patch('swift.obj.reconstructor.os.getpid',
|
||||||
|
return_value='pid-1'):
|
||||||
|
reconstructor.final_recon_dump(
|
||||||
|
total, override_devices=override_devices)
|
||||||
|
with open(self.rcache) as f:
|
||||||
|
data = json.load(f)
|
||||||
|
self.assertEqual({
|
||||||
|
'object_reconstruction_last': before,
|
||||||
|
'object_reconstruction_time': old_total,
|
||||||
|
'object_reconstruction_per_disk': {
|
||||||
|
'sda': {
|
||||||
|
'object_reconstruction_last': now,
|
||||||
|
'object_reconstruction_time': total,
|
||||||
|
'pid': 'pid-1',
|
||||||
|
},
|
||||||
|
'sdc': {
|
||||||
|
'object_reconstruction_last': now,
|
||||||
|
'object_reconstruction_time': total,
|
||||||
|
'pid': 'pid-1',
|
||||||
|
},
|
||||||
|
|
||||||
|
},
|
||||||
|
}, data)
|
||||||
|
|
||||||
|
# per_disk_stats with workers and local_devices
|
||||||
reconstructor.reconstructor_workers = 1
|
reconstructor.reconstructor_workers = 1
|
||||||
old_total = total
|
old_total = total
|
||||||
total = 16.0
|
total = 16.0
|
||||||
before = now
|
before = now
|
||||||
now += total * 60
|
now += total * 60
|
||||||
with mock.patch('swift.obj.reconstructor.time.time',
|
check_per_disk_stats(before, now, old_total, total, ['sda', 'sdc'])
|
||||||
return_value=now), \
|
|
||||||
mock.patch('swift.obj.reconstructor.os.getpid',
|
# per_disk_stats with workers and local_devices but no overrides
|
||||||
return_value='pid-1'):
|
reconstructor.reconstructor_workers = 1
|
||||||
reconstructor.final_recon_dump(total, override_devices=[
|
total = 17.0
|
||||||
'sda', 'sdc'])
|
now += total * 60
|
||||||
with open(self.rcache) as f:
|
check_per_disk_stats(before, now, old_total, total, [])
|
||||||
data = json.load(f)
|
|
||||||
self.assertEqual({
|
|
||||||
'object_reconstruction_last': before,
|
|
||||||
'object_reconstruction_time': old_total,
|
|
||||||
'object_reconstruction_per_disk': {
|
|
||||||
'sda': {
|
|
||||||
'object_reconstruction_last': now,
|
|
||||||
'object_reconstruction_time': total,
|
|
||||||
'pid': 'pid-1',
|
|
||||||
},
|
|
||||||
'sdc': {
|
|
||||||
'object_reconstruction_last': now,
|
|
||||||
'object_reconstruction_time': total,
|
|
||||||
'pid': 'pid-1',
|
|
||||||
},
|
|
||||||
|
|
||||||
},
|
|
||||||
}, data)
|
|
||||||
# and without workers we clear it out
|
# and without workers we clear it out
|
||||||
reconstructor.reconstructor_workers = 0
|
reconstructor.reconstructor_workers = 0
|
||||||
total = 18.0
|
total = 18.0
|
||||||
@ -1697,6 +1709,27 @@ class TestWorkerReconstructor(unittest.TestCase):
|
|||||||
'object_reconstruction_time': total,
|
'object_reconstruction_time': total,
|
||||||
}, data)
|
}, data)
|
||||||
|
|
||||||
|
# set per disk stats again...
|
||||||
|
reconstructor.reconstructor_workers = 1
|
||||||
|
old_total = total
|
||||||
|
total = 18.0
|
||||||
|
before = now
|
||||||
|
now += total * 60
|
||||||
|
check_per_disk_stats(before, now, old_total, total, ['sda', 'sdc'])
|
||||||
|
|
||||||
|
# ...then remove all devices and check we clear out per-disk stats
|
||||||
|
reconstructor.all_local_devices = []
|
||||||
|
total = 20.0
|
||||||
|
now += total * 60
|
||||||
|
with mock.patch('swift.obj.reconstructor.time.time', return_value=now):
|
||||||
|
reconstructor.final_recon_dump(total)
|
||||||
|
with open(self.rcache) as f:
|
||||||
|
data = json.load(f)
|
||||||
|
self.assertEqual({
|
||||||
|
'object_reconstruction_last': now,
|
||||||
|
'object_reconstruction_time': total,
|
||||||
|
}, data)
|
||||||
|
|
||||||
def test_dump_recon_run_once_inline(self):
|
def test_dump_recon_run_once_inline(self):
|
||||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||||
{'recon_cache_path': self.recon_cache_path},
|
{'recon_cache_path': self.recon_cache_path},
|
||||||
@ -1778,9 +1811,14 @@ class TestWorkerReconstructor(unittest.TestCase):
|
|||||||
# a recon dump, but it'll get cleaned up in the next aggregation
|
# a recon dump, but it'll get cleaned up in the next aggregation
|
||||||
os.unlink(self.rcache)
|
os.unlink(self.rcache)
|
||||||
do_test(dict(devices='sdz'), 'sdz')
|
do_test(dict(devices='sdz'), 'sdz')
|
||||||
|
# repeat with no local devices
|
||||||
|
reconstructor.get_local_devices = lambda: set()
|
||||||
|
os.unlink(self.rcache)
|
||||||
|
do_test(dict(devices='sdz'), 'sdz')
|
||||||
|
|
||||||
# now disable workers and check that inline run_once updates rcache
|
# now disable workers and check that inline run_once updates rcache
|
||||||
# and clears out per disk stats
|
# and clears out per disk stats
|
||||||
|
reconstructor.get_local_devices = lambda: {'sda'}
|
||||||
now = time.time()
|
now = time.time()
|
||||||
later = now + 600 # 10 mins
|
later = now + 600 # 10 mins
|
||||||
reconstructor.reconstructor_workers = 0
|
reconstructor.reconstructor_workers = 0
|
||||||
@ -1973,6 +2011,63 @@ class TestWorkerReconstructor(unittest.TestCase):
|
|||||||
}
|
}
|
||||||
}, data)
|
}, data)
|
||||||
|
|
||||||
|
def test_run_forever_recon_no_devices(self):
|
||||||
|
|
||||||
|
class StopForever(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
reconstructor = object_reconstructor.ObjectReconstructor({
|
||||||
|
'reconstructor_workers': 2,
|
||||||
|
'recon_cache_path': self.recon_cache_path
|
||||||
|
}, logger=self.logger)
|
||||||
|
|
||||||
|
def run_forever_but_stop(pid, mock_times, worker_kwargs):
|
||||||
|
with mock.patch('swift.obj.reconstructor.time.time',
|
||||||
|
side_effect=mock_times), \
|
||||||
|
mock.patch('swift.obj.reconstructor.os.getpid',
|
||||||
|
return_value=pid), \
|
||||||
|
mock.patch('swift.obj.reconstructor.sleep',
|
||||||
|
side_effect=[StopForever]), \
|
||||||
|
Timeout(.3), quiet_eventlet_exceptions(), \
|
||||||
|
self.assertRaises(StopForever):
|
||||||
|
gt = spawn(reconstructor.run_forever, **worker_kwargs)
|
||||||
|
gt.wait()
|
||||||
|
|
||||||
|
reconstructor.reconstruct = mock.MagicMock()
|
||||||
|
now = time.time()
|
||||||
|
# first run_forever with no devices
|
||||||
|
reconstructor.get_local_devices = lambda: []
|
||||||
|
later = now + 6 # 6 sec
|
||||||
|
worker_args = list(
|
||||||
|
# include 'devices' kwarg as a sanity check - it should be ignored
|
||||||
|
# in run_forever mode
|
||||||
|
reconstructor.get_worker_args(once=False, devices='sda'))
|
||||||
|
run_forever_but_stop('pid-1', [now, later, later], worker_args[0])
|
||||||
|
# override args are passed to reconstruct
|
||||||
|
self.assertEqual([mock.call(
|
||||||
|
override_devices=[],
|
||||||
|
override_partitions=[]
|
||||||
|
)], reconstructor.reconstruct.call_args_list)
|
||||||
|
# forever mode with no args, we expect total recon dumps
|
||||||
|
self.assertTrue(os.path.exists(self.rcache))
|
||||||
|
with open(self.rcache) as f:
|
||||||
|
data = json.load(f)
|
||||||
|
expected = {
|
||||||
|
'object_reconstruction_last': later,
|
||||||
|
'object_reconstruction_time': 0.1,
|
||||||
|
}
|
||||||
|
self.assertEqual(expected, data)
|
||||||
|
reconstructor.reconstruct.reset_mock()
|
||||||
|
|
||||||
|
# aggregation is done in the parent thread even later
|
||||||
|
now = later + 300
|
||||||
|
with mock.patch('swift.obj.reconstructor.time.time',
|
||||||
|
side_effect=[now]):
|
||||||
|
reconstructor.aggregate_recon_update()
|
||||||
|
with open(self.rcache) as f:
|
||||||
|
data = json.load(f)
|
||||||
|
self.assertEqual(expected, data)
|
||||||
|
|
||||||
def test_recon_aggregation_waits_for_all_devices(self):
|
def test_recon_aggregation_waits_for_all_devices(self):
|
||||||
reconstructor = object_reconstructor.ObjectReconstructor({
|
reconstructor = object_reconstructor.ObjectReconstructor({
|
||||||
'reconstructor_workers': 2,
|
'reconstructor_workers': 2,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user