Make the object auditor's run-once mode run once.
If you invoked the object auditor with --once, it would run the full-audit checker(s) once, but it would run the ZBF checker over and over until the full-audit checkers were done. Now it runs the ZBF and full-audit checkers once each. Change-Id: Ieeaa6fba4184a069756ee150727f24df7833697a
This commit is contained in:
parent
e354f897c3
commit
04ff0927ee
@ -268,6 +268,7 @@ class ObjectAuditor(Daemon):
|
||||
"""Parallel audit loop"""
|
||||
self.clear_recon_cache('ALL')
|
||||
self.clear_recon_cache('ZBF')
|
||||
once = kwargs.get('mode') == 'once'
|
||||
kwargs['device_dirs'] = override_devices
|
||||
if parent:
|
||||
kwargs['zero_byte_fps'] = zbo_fps
|
||||
@ -294,13 +295,18 @@ class ObjectAuditor(Daemon):
|
||||
if len(pids) == parallel_proc:
|
||||
pid = os.wait()[0]
|
||||
pids.remove(pid)
|
||||
# ZBF scanner must be restarted as soon as it finishes
|
||||
if self.conf_zero_byte_fps and pid == zbf_pid:
|
||||
|
||||
if self.conf_zero_byte_fps and pid == zbf_pid and once:
|
||||
# If we're only running one pass and the ZBF scanner
|
||||
# finished, don't bother restarting it.
|
||||
zbf_pid = -100
|
||||
elif self.conf_zero_byte_fps and pid == zbf_pid:
|
||||
# When we're running forever, the ZBF scanner must
|
||||
# be restarted as soon as it finishes.
|
||||
kwargs['device_dirs'] = override_devices
|
||||
# sleep between ZBF scanner forks
|
||||
self._sleep()
|
||||
zbf_pid = self.fork_child(zero_byte_fps=True,
|
||||
**kwargs)
|
||||
zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
|
||||
pids.append(zbf_pid)
|
||||
else:
|
||||
kwargs['device_dirs'] = [device_list.pop()]
|
||||
@ -308,8 +314,9 @@ class ObjectAuditor(Daemon):
|
||||
while pids:
|
||||
pid = os.wait()[0]
|
||||
# ZBF scanner must be restarted as soon as it finishes
|
||||
# unless we're in run-once mode
|
||||
if self.conf_zero_byte_fps and pid == zbf_pid and \
|
||||
len(pids) > 1:
|
||||
len(pids) > 1 and not once:
|
||||
kwargs['device_dirs'] = override_devices
|
||||
# sleep between ZBF scanner forks
|
||||
self._sleep()
|
||||
|
@ -34,6 +34,19 @@ _mocked_policies = [StoragePolicy(0, 'zero', False),
|
||||
StoragePolicy(1, 'one', True)]
|
||||
|
||||
|
||||
def works_only_once(callable_thing, exception):
|
||||
called = [False]
|
||||
|
||||
def only_once(*a, **kw):
|
||||
if called[0]:
|
||||
raise exception
|
||||
else:
|
||||
called[0] = True
|
||||
return callable_thing(*a, **kw)
|
||||
|
||||
return only_once
|
||||
|
||||
|
||||
@patch_policies(_mocked_policies)
|
||||
class TestAuditor(unittest.TestCase):
|
||||
|
||||
@ -548,6 +561,9 @@ class TestAuditor(unittest.TestCase):
|
||||
|
||||
loop_error = Bogus('exception')
|
||||
|
||||
class LetMeOut(BaseException):
|
||||
pass
|
||||
|
||||
class ObjectAuditorMock(object):
|
||||
check_args = ()
|
||||
check_kwargs = {}
|
||||
@ -644,11 +660,13 @@ class TestAuditor(unittest.TestCase):
|
||||
self.assertEqual(mocker.wait_called, 1)
|
||||
|
||||
my_auditor._sleep = mocker.mock_sleep_continue
|
||||
my_auditor.audit_loop = works_only_once(my_auditor.audit_loop,
|
||||
LetMeOut())
|
||||
|
||||
my_auditor.concurrency = 2
|
||||
mocker.fork_called = 0
|
||||
mocker.wait_called = 0
|
||||
my_auditor.run_once()
|
||||
self.assertRaises(LetMeOut, my_auditor.run_forever)
|
||||
# Fork is called no. of devices + (no. of devices)/2 + 1 times
|
||||
# since zbf process is forked (no.of devices)/2 + 1 times
|
||||
no_devices = len(os.listdir(self.devices))
|
||||
@ -661,5 +679,102 @@ class TestAuditor(unittest.TestCase):
|
||||
os.fork = was_fork
|
||||
os.wait = was_wait
|
||||
|
||||
def test_run_audit_once(self):
|
||||
my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
|
||||
mount_check='false',
|
||||
zero_byte_files_per_second=89,
|
||||
concurrency=1))
|
||||
|
||||
forked_pids = []
|
||||
next_zbf_pid = [2]
|
||||
next_normal_pid = [1001]
|
||||
outstanding_pids = [[]]
|
||||
|
||||
def fake_fork_child(**kwargs):
|
||||
if len(forked_pids) > 10:
|
||||
# something's gone horribly wrong
|
||||
raise BaseException("forking too much")
|
||||
|
||||
# ZBF pids are all smaller than the normal-audit pids; this way
|
||||
# we can return them first.
|
||||
#
|
||||
# Also, ZBF pids are even and normal-audit pids are odd; this is
|
||||
# so humans seeing this test fail can better tell what's happening.
|
||||
if kwargs.get('zero_byte_fps'):
|
||||
pid = next_zbf_pid[0]
|
||||
next_zbf_pid[0] += 2
|
||||
else:
|
||||
pid = next_normal_pid[0]
|
||||
next_normal_pid[0] += 2
|
||||
outstanding_pids[0].append(pid)
|
||||
forked_pids.append(pid)
|
||||
return pid
|
||||
|
||||
def fake_os_wait():
|
||||
# Smallest pid first; that's ZBF if we have one, else normal
|
||||
outstanding_pids[0].sort()
|
||||
pid = outstanding_pids[0].pop(0)
|
||||
return (pid, 0) # (pid, status)
|
||||
|
||||
with mock.patch("swift.obj.auditor.os.wait", fake_os_wait), \
|
||||
mock.patch.object(my_auditor, 'fork_child', fake_fork_child), \
|
||||
mock.patch.object(my_auditor, '_sleep', lambda *a: None):
|
||||
my_auditor.run_once()
|
||||
|
||||
self.assertEqual(sorted(forked_pids), [2, 1001])
|
||||
|
||||
def test_run_parallel_audit_once(self):
|
||||
my_auditor = auditor.ObjectAuditor(
|
||||
dict(devices=self.devices, mount_check='false',
|
||||
zero_byte_files_per_second=89, concurrency=2))
|
||||
|
||||
# ZBF pids are smaller than the normal-audit pids; this way we can
|
||||
# return them first from our mocked os.wait().
|
||||
#
|
||||
# Also, ZBF pids are even and normal-audit pids are odd; this is so
|
||||
# humans seeing this test fail can better tell what's happening.
|
||||
forked_pids = []
|
||||
next_zbf_pid = [2]
|
||||
next_normal_pid = [1001]
|
||||
outstanding_pids = [[]]
|
||||
|
||||
def fake_fork_child(**kwargs):
|
||||
if len(forked_pids) > 10:
|
||||
# something's gone horribly wrong; try not to hang the test
|
||||
# run because of it
|
||||
raise BaseException("forking too much")
|
||||
|
||||
if kwargs.get('zero_byte_fps'):
|
||||
pid = next_zbf_pid[0]
|
||||
next_zbf_pid[0] += 2
|
||||
else:
|
||||
pid = next_normal_pid[0]
|
||||
next_normal_pid[0] += 2
|
||||
outstanding_pids[0].append(pid)
|
||||
forked_pids.append(pid)
|
||||
return pid
|
||||
|
||||
def fake_os_wait():
|
||||
if not outstanding_pids[0]:
|
||||
raise BaseException("nobody waiting")
|
||||
|
||||
# ZBF auditor finishes first
|
||||
outstanding_pids[0].sort()
|
||||
pid = outstanding_pids[0].pop(0)
|
||||
return (pid, 0) # (pid, status)
|
||||
|
||||
# make sure we've got enough devs that the ZBF auditor can finish
|
||||
# before all the normal auditors have been started
|
||||
mkdirs(os.path.join(self.devices, 'sdc'))
|
||||
mkdirs(os.path.join(self.devices, 'sdd'))
|
||||
|
||||
with mock.patch("swift.obj.auditor.os.wait", fake_os_wait), \
|
||||
mock.patch.object(my_auditor, 'fork_child', fake_fork_child), \
|
||||
mock.patch.object(my_auditor, '_sleep', lambda *a: None):
|
||||
my_auditor.run_once()
|
||||
|
||||
self.assertEqual(sorted(forked_pids), [2, 1001, 1003, 1005, 1007])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
Reference in New Issue
Block a user