Monkey-patch in tests using mock.patch

Change-Id: Idf2205074e8f12bc0e515c860e9c1bddf59561d1
This commit is contained in:
Tim Burke 2021-01-14 14:11:41 -08:00
parent 267d432f86
commit 14a42d573a

View File

@ -1374,55 +1374,46 @@ class TestServer(unittest.TestCase):
def sleep(self, *args, **kwargs):
pass
with temptree([]) as t:
old_stdout = sys.stdout
old_wait = manager.WARNING_WAIT
old_time = manager.time
try:
manager.WARNING_WAIT = 0.01
manager.time = MockTime()
with open(os.path.join(t, 'output'), 'w+') as f:
# actually capture the read stdout (for prints)
sys.stdout = f
# test closing pipe in subprocess unblocks read
with MockProcess() as proc:
server.procs = [proc]
status = server.wait()
self.assertEqual(status, 0)
# wait should return before process exits
self.assertTrue(proc.is_alive())
self.assertFalse(proc.finished)
self.assertTrue(proc.finished) # make sure it did finish
# test output kwarg prints subprocess output
with MockProcess() as proc:
server.procs = [proc]
status = server.wait(output=True)
output = pop_stream(f)
self.assertIn('mock process started', output)
self.assertIn('setup complete', output)
# make sure we don't get prints after stdout was closed
self.assertNotIn('mock process finished', output)
# test process which fails to start
with MockProcess(fail_to_start=True) as proc:
server.procs = [proc]
status = server.wait()
self.assertEqual(status, 1)
self.assertIn('failed', pop_stream(f))
# test multiple procs
procs = [MockProcess(delay=.5) for i in range(3)]
for proc in procs:
proc.start()
server.procs = procs
status = server.wait()
self.assertEqual(status, 0)
for proc in procs:
self.assertTrue(proc.is_alive())
for proc in procs:
proc.join()
finally:
sys.stdout = old_stdout
manager.WARNING_WAIT = old_wait
manager.time = old_time
with temptree([]) as t, open(os.path.join(t, 'output'), 'w+') as f, \
mock.patch.object(sys, 'stdout', f), \
mock.patch.object(manager, 'WARNING_WAIT', 0.01), \
mock.patch.object(manager, 'time', MockTime()):
# Note that we actually capture the read stdout (for prints)
# test closing pipe in subprocess unblocks read
with MockProcess() as proc:
server.procs = [proc]
status = server.wait()
self.assertEqual(status, 0)
# wait should return before process exits
self.assertTrue(proc.is_alive())
self.assertFalse(proc.finished)
self.assertTrue(proc.finished) # make sure it did finish
# test output kwarg prints subprocess output
with MockProcess() as proc:
server.procs = [proc]
status = server.wait(output=True)
output = pop_stream(f)
self.assertIn('mock process started', output)
self.assertIn('setup complete', output)
# make sure we don't get prints after stdout was closed
self.assertNotIn('mock process finished', output)
# test process which fails to start
with MockProcess(fail_to_start=True) as proc:
server.procs = [proc]
status = server.wait()
self.assertEqual(status, 1)
self.assertIn('failed', pop_stream(f))
# test multiple procs
procs = [MockProcess(delay=.5) for i in range(3)]
for proc in procs:
proc.start()
server.procs = procs
status = server.wait()
self.assertEqual(status, 0)
for proc in procs:
self.assertTrue(proc.is_alive())
for proc in procs:
proc.join()
def test_interact(self):
class MockProcess(object):