Merge "Make the object auditor's run-once mode run once."

This commit is contained in:
Jenkins 2016-03-22 19:47:41 +00:00 committed by Gerrit Code Review
commit d9a4f18b49
2 changed files with 128 additions and 6 deletions

View File

@ -284,6 +284,7 @@ class ObjectAuditor(Daemon):
"""Parallel audit loop"""
self.clear_recon_cache('ALL')
self.clear_recon_cache('ZBF')
once = kwargs.get('mode') == 'once'
kwargs['device_dirs'] = override_devices
if parent:
kwargs['zero_byte_fps'] = zbo_fps
@ -310,13 +311,18 @@ class ObjectAuditor(Daemon):
if len(pids) == parallel_proc:
pid = os.wait()[0]
pids.remove(pid)
# ZBF scanner must be restarted as soon as it finishes
if self.conf_zero_byte_fps and pid == zbf_pid:
if self.conf_zero_byte_fps and pid == zbf_pid and once:
# If we're only running one pass and the ZBF scanner
# finished, don't bother restarting it.
zbf_pid = -100
elif self.conf_zero_byte_fps and pid == zbf_pid:
# When we're running forever, the ZBF scanner must
# be restarted as soon as it finishes.
kwargs['device_dirs'] = override_devices
# sleep between ZBF scanner forks
self._sleep()
zbf_pid = self.fork_child(zero_byte_fps=True,
**kwargs)
zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
pids.append(zbf_pid)
else:
kwargs['device_dirs'] = [device_list.pop()]
@ -324,8 +330,9 @@ class ObjectAuditor(Daemon):
while pids:
pid = os.wait()[0]
# ZBF scanner must be restarted as soon as it finishes
# unless we're in run-once mode
if self.conf_zero_byte_fps and pid == zbf_pid and \
len(pids) > 1:
len(pids) > 1 and not once:
kwargs['device_dirs'] = override_devices
# sleep between ZBF scanner forks
self._sleep()

View File

@ -41,6 +41,19 @@ _mocked_policies = [
]
def works_only_once(callable_thing, exception):
called = [False]
def only_once(*a, **kw):
if called[0]:
raise exception
else:
called[0] = True
return callable_thing(*a, **kw)
return only_once
@patch_policies(_mocked_policies)
class TestAuditor(unittest.TestCase):
@ -603,6 +616,9 @@ class TestAuditor(unittest.TestCase):
loop_error = Bogus('exception')
class LetMeOut(BaseException):
pass
class ObjectAuditorMock(object):
check_args = ()
check_kwargs = {}
@ -699,11 +715,13 @@ class TestAuditor(unittest.TestCase):
self.assertEqual(mocker.wait_called, 1)
my_auditor._sleep = mocker.mock_sleep_continue
my_auditor.audit_loop = works_only_once(my_auditor.audit_loop,
LetMeOut())
my_auditor.concurrency = 2
mocker.fork_called = 0
mocker.wait_called = 0
my_auditor.run_once()
self.assertRaises(LetMeOut, my_auditor.run_forever)
# Fork is called no. of devices + (no. of devices)/2 + 1 times
# since zbf process is forked (no.of devices)/2 + 1 times
no_devices = len(os.listdir(self.devices))
@ -716,5 +734,102 @@ class TestAuditor(unittest.TestCase):
os.fork = was_fork
os.wait = was_wait
def test_run_audit_once(self):
my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
mount_check='false',
zero_byte_files_per_second=89,
concurrency=1))
forked_pids = []
next_zbf_pid = [2]
next_normal_pid = [1001]
outstanding_pids = [[]]
def fake_fork_child(**kwargs):
if len(forked_pids) > 10:
# something's gone horribly wrong
raise BaseException("forking too much")
# ZBF pids are all smaller than the normal-audit pids; this way
# we can return them first.
#
# Also, ZBF pids are even and normal-audit pids are odd; this is
# so humans seeing this test fail can better tell what's happening.
if kwargs.get('zero_byte_fps'):
pid = next_zbf_pid[0]
next_zbf_pid[0] += 2
else:
pid = next_normal_pid[0]
next_normal_pid[0] += 2
outstanding_pids[0].append(pid)
forked_pids.append(pid)
return pid
def fake_os_wait():
# Smallest pid first; that's ZBF if we have one, else normal
outstanding_pids[0].sort()
pid = outstanding_pids[0].pop(0)
return (pid, 0) # (pid, status)
with mock.patch("swift.obj.auditor.os.wait", fake_os_wait), \
mock.patch.object(my_auditor, 'fork_child', fake_fork_child), \
mock.patch.object(my_auditor, '_sleep', lambda *a: None):
my_auditor.run_once()
self.assertEqual(sorted(forked_pids), [2, 1001])
def test_run_parallel_audit_once(self):
my_auditor = auditor.ObjectAuditor(
dict(devices=self.devices, mount_check='false',
zero_byte_files_per_second=89, concurrency=2))
# ZBF pids are smaller than the normal-audit pids; this way we can
# return them first from our mocked os.wait().
#
# Also, ZBF pids are even and normal-audit pids are odd; this is so
# humans seeing this test fail can better tell what's happening.
forked_pids = []
next_zbf_pid = [2]
next_normal_pid = [1001]
outstanding_pids = [[]]
def fake_fork_child(**kwargs):
if len(forked_pids) > 10:
# something's gone horribly wrong; try not to hang the test
# run because of it
raise BaseException("forking too much")
if kwargs.get('zero_byte_fps'):
pid = next_zbf_pid[0]
next_zbf_pid[0] += 2
else:
pid = next_normal_pid[0]
next_normal_pid[0] += 2
outstanding_pids[0].append(pid)
forked_pids.append(pid)
return pid
def fake_os_wait():
if not outstanding_pids[0]:
raise BaseException("nobody waiting")
# ZBF auditor finishes first
outstanding_pids[0].sort()
pid = outstanding_pids[0].pop(0)
return (pid, 0) # (pid, status)
# make sure we've got enough devs that the ZBF auditor can finish
# before all the normal auditors have been started
mkdirs(os.path.join(self.devices, 'sdc'))
mkdirs(os.path.join(self.devices, 'sdd'))
with mock.patch("swift.obj.auditor.os.wait", fake_os_wait), \
mock.patch.object(my_auditor, 'fork_child', fake_fork_child), \
mock.patch.object(my_auditor, '_sleep', lambda *a: None):
my_auditor.run_once()
self.assertEqual(sorted(forked_pids), [2, 1001, 1003, 1005, 1007])
if __name__ == '__main__':
unittest.main()