Merge "Improve test coverage for loopingcall module"
This commit is contained in:
commit
c78ffce11e
@ -12,10 +12,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
from eventlet import greenthread
|
||||
from mox3 import mox
|
||||
import mock
|
||||
from oslotest import base as test_base
|
||||
|
||||
from oslo_service import loopingcall
|
||||
@ -41,6 +38,13 @@ class LoopingCallTestCase(test_base.BaseTestCase):
|
||||
timer = loopingcall.FixedIntervalLoopingCall(_raise_it)
|
||||
self.assertFalse(timer.start(interval=0.5).wait())
|
||||
|
||||
def test_terminate_on_exception(self):
|
||||
def _raise_it():
|
||||
raise RuntimeError()
|
||||
|
||||
timer = loopingcall.FixedIntervalLoopingCall(_raise_it)
|
||||
self.assertRaises(RuntimeError, timer.start(interval=0.5).wait)
|
||||
|
||||
def _wait_for_zero(self):
|
||||
"""Called at an interval until num_runs == 0."""
|
||||
if self.num_runs == 0:
|
||||
@ -54,30 +58,95 @@ class LoopingCallTestCase(test_base.BaseTestCase):
|
||||
timer = loopingcall.FixedIntervalLoopingCall(self._wait_for_zero)
|
||||
self.assertFalse(timer.start(interval=0.5).wait())
|
||||
|
||||
def test_interval_adjustment(self):
|
||||
def assertAlmostEqual(self, expected, actual, precision=7, message=None):
|
||||
self.assertEqual(0, round(actual - expected, precision), message)
|
||||
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
@mock.patch('oslo_service.loopingcall._ts')
|
||||
def test_interval_adjustment(self, time_mock, sleep_mock):
|
||||
"""Ensure the interval is adjusted to account for task duration."""
|
||||
self.num_runs = 3
|
||||
|
||||
now = time.time()
|
||||
now = 1234567890
|
||||
second = 1
|
||||
smidgen = 0.01
|
||||
|
||||
m = mox.Mox()
|
||||
m.StubOutWithMock(greenthread, 'sleep')
|
||||
greenthread.sleep(mox.IsAlmost(0.02))
|
||||
greenthread.sleep(mox.IsAlmost(0.0))
|
||||
greenthread.sleep(mox.IsAlmost(0.0))
|
||||
m.StubOutWithMock(loopingcall, '_ts')
|
||||
loopingcall._ts().AndReturn(now)
|
||||
loopingcall._ts().AndReturn(now + second - smidgen)
|
||||
loopingcall._ts().AndReturn(now)
|
||||
loopingcall._ts().AndReturn(now + second + second)
|
||||
loopingcall._ts().AndReturn(now)
|
||||
loopingcall._ts().AndReturn(now + second + smidgen)
|
||||
loopingcall._ts().AndReturn(now)
|
||||
m.ReplayAll()
|
||||
time_mock.side_effect = [now, # start
|
||||
now + second - smidgen, # end
|
||||
now, # start
|
||||
now + second + second, # end
|
||||
now, # start
|
||||
now + second + smidgen, # end
|
||||
now] # start
|
||||
|
||||
timer = loopingcall.FixedIntervalLoopingCall(self._wait_for_zero)
|
||||
timer.start(interval=1.01).wait()
|
||||
m.UnsetStubs()
|
||||
m.VerifyAll()
|
||||
|
||||
expected_calls = [0.02, 0.00, 0.00]
|
||||
for i, call in enumerate(sleep_mock.call_args_list):
|
||||
expected = expected_calls[i]
|
||||
args, kwargs = call
|
||||
actual = args[0]
|
||||
message = ('Call #%d, expected: %s, actual: %s' %
|
||||
(i, expected, actual))
|
||||
self.assertAlmostEqual(expected, actual, message=message)
|
||||
|
||||
|
||||
class DynamicLoopingCallTestCase(test_base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(DynamicLoopingCallTestCase, self).setUp()
|
||||
self.num_runs = 0
|
||||
|
||||
def test_return_true(self):
|
||||
def _raise_it():
|
||||
raise loopingcall.LoopingCallDone(True)
|
||||
|
||||
timer = loopingcall.DynamicLoopingCall(_raise_it)
|
||||
self.assertTrue(timer.start().wait())
|
||||
|
||||
def test_return_false(self):
|
||||
def _raise_it():
|
||||
raise loopingcall.LoopingCallDone(False)
|
||||
|
||||
timer = loopingcall.DynamicLoopingCall(_raise_it)
|
||||
self.assertFalse(timer.start().wait())
|
||||
|
||||
def test_terminate_on_exception(self):
|
||||
def _raise_it():
|
||||
raise RuntimeError()
|
||||
|
||||
timer = loopingcall.DynamicLoopingCall(_raise_it)
|
||||
self.assertRaises(RuntimeError, timer.start().wait)
|
||||
|
||||
def _wait_for_zero(self):
|
||||
"""Called at an interval until num_runs == 0."""
|
||||
if self.num_runs == 0:
|
||||
raise loopingcall.LoopingCallDone(False)
|
||||
else:
|
||||
self.num_runs = self.num_runs - 1
|
||||
sleep_for = self.num_runs * 10 + 1 # dynamic duration
|
||||
return sleep_for
|
||||
|
||||
def test_repeat(self):
|
||||
self.num_runs = 2
|
||||
|
||||
timer = loopingcall.DynamicLoopingCall(self._wait_for_zero)
|
||||
self.assertFalse(timer.start().wait())
|
||||
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
def test_interval_adjustment(self, sleep_mock):
|
||||
self.num_runs = 2
|
||||
|
||||
timer = loopingcall.DynamicLoopingCall(self._wait_for_zero)
|
||||
timer.start(periodic_interval_max=5).wait()
|
||||
|
||||
sleep_mock.assert_has_calls([mock.call(5), mock.call(1)])
|
||||
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
def test_initial_delay(self, sleep_mock):
|
||||
self.num_runs = 1
|
||||
|
||||
timer = loopingcall.DynamicLoopingCall(self._wait_for_zero)
|
||||
timer.start(initial_delay=3).wait()
|
||||
|
||||
sleep_mock.assert_has_calls([mock.call(3), mock.call(1)])
|
||||
|
Loading…
x
Reference in New Issue
Block a user