Merge "Log (Watchdog's) Timeouts with duration"
This commit is contained in:
commit
f99a6e5762
@ -1621,8 +1621,10 @@ class LogAdapter(logging.LoggerAdapter, object):
|
|||||||
emsg = '%s: %s' % (exc.__class__.__name__, exc.line)
|
emsg = '%s: %s' % (exc.__class__.__name__, exc.line)
|
||||||
elif isinstance(exc, eventlet.Timeout):
|
elif isinstance(exc, eventlet.Timeout):
|
||||||
emsg = exc.__class__.__name__
|
emsg = exc.__class__.__name__
|
||||||
if hasattr(exc, 'seconds'):
|
detail = '%ss' % exc.seconds
|
||||||
emsg += ' (%ss)' % exc.seconds
|
if hasattr(exc, 'created_at'):
|
||||||
|
detail += ' after %0.2fs' % (time.time() - exc.created_at)
|
||||||
|
emsg += ' (%s)' % detail
|
||||||
if isinstance(exc, swift.common.exceptions.MessageTimeout):
|
if isinstance(exc, swift.common.exceptions.MessageTimeout):
|
||||||
if exc.msg:
|
if exc.msg:
|
||||||
emsg += ' %s' % exc.msg
|
emsg += ' %s' % exc.msg
|
||||||
@ -6218,14 +6220,15 @@ class Watchdog(object):
|
|||||||
|
|
||||||
:param timeout: duration before the timeout expires
|
:param timeout: duration before the timeout expires
|
||||||
:param exc: exception to throw when the timeout expire, must inherit
|
:param exc: exception to throw when the timeout expire, must inherit
|
||||||
from eventlet.timeouts.Timeout
|
from eventlet.Timeout
|
||||||
:param timeout_at: allow to force the expiration timestamp
|
:param timeout_at: allow to force the expiration timestamp
|
||||||
:return: id of the scheduled timeout, needed to cancel it
|
:return: id of the scheduled timeout, needed to cancel it
|
||||||
"""
|
"""
|
||||||
|
now = time.time()
|
||||||
if not timeout_at:
|
if not timeout_at:
|
||||||
timeout_at = time.time() + timeout
|
timeout_at = now + timeout
|
||||||
gth = eventlet.greenthread.getcurrent()
|
gth = eventlet.greenthread.getcurrent()
|
||||||
timeout_definition = (timeout, timeout_at, gth, exc)
|
timeout_definition = (timeout, timeout_at, gth, exc, now)
|
||||||
key = id(timeout_definition)
|
key = id(timeout_definition)
|
||||||
self._timeouts[key] = timeout_definition
|
self._timeouts[key] = timeout_definition
|
||||||
|
|
||||||
@ -6248,8 +6251,7 @@ class Watchdog(object):
|
|||||||
:param key: timeout id, as returned by start()
|
:param key: timeout id, as returned by start()
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
if key in self._timeouts:
|
del(self._timeouts[key])
|
||||||
del(self._timeouts[key])
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@ -6269,15 +6271,14 @@ class Watchdog(object):
|
|||||||
self._next_expiration = None
|
self._next_expiration = None
|
||||||
if self._evt.ready():
|
if self._evt.ready():
|
||||||
self._evt.reset()
|
self._evt.reset()
|
||||||
for k, (timeout, timeout_at, gth, exc) in list(self._timeouts.items()):
|
for k, (timeout, timeout_at, gth, exc,
|
||||||
|
created_at) in list(self._timeouts.items()):
|
||||||
if timeout_at <= now:
|
if timeout_at <= now:
|
||||||
try:
|
self.stop(k)
|
||||||
if k in self._timeouts:
|
|
||||||
del(self._timeouts[k])
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
e = exc()
|
e = exc()
|
||||||
|
# set this after __init__ to keep it off the eventlet scheduler
|
||||||
e.seconds = timeout
|
e.seconds = timeout
|
||||||
|
e.created_at = created_at
|
||||||
eventlet.hubs.get_hub().schedule_call_global(0, gth.throw, e)
|
eventlet.hubs.get_hub().schedule_call_global(0, gth.throw, e)
|
||||||
else:
|
else:
|
||||||
if (self._next_expiration is None
|
if (self._next_expiration is None
|
||||||
|
@ -1199,11 +1199,15 @@ class TestUtils(unittest.TestCase):
|
|||||||
# test eventlet.Timeout
|
# test eventlet.Timeout
|
||||||
with ConnectionTimeout(42, 'my error message') \
|
with ConnectionTimeout(42, 'my error message') \
|
||||||
as connection_timeout:
|
as connection_timeout:
|
||||||
log_exception(connection_timeout)
|
now = time.time()
|
||||||
|
connection_timeout.created_at = now - 123.456
|
||||||
|
with mock.patch('swift.common.utils.time.time',
|
||||||
|
return_value=now):
|
||||||
|
log_exception(connection_timeout)
|
||||||
log_msg = strip_value(sio)
|
log_msg = strip_value(sio)
|
||||||
self.assertNotIn('Traceback', log_msg)
|
self.assertNotIn('Traceback', log_msg)
|
||||||
self.assertTrue('ConnectionTimeout' in log_msg)
|
self.assertTrue('ConnectionTimeout' in log_msg)
|
||||||
self.assertTrue('(42s)' in log_msg)
|
self.assertTrue('(42s after 123.46s)' in log_msg)
|
||||||
self.assertNotIn('my error message', log_msg)
|
self.assertNotIn('my error message', log_msg)
|
||||||
|
|
||||||
with MessageTimeout(42, 'my error message') as message_timeout:
|
with MessageTimeout(42, 'my error message') as message_timeout:
|
||||||
@ -8799,13 +8803,16 @@ class TestWatchdog(unittest.TestCase):
|
|||||||
w._evt.send = mock.Mock(side_effect=w._evt.send)
|
w._evt.send = mock.Mock(side_effect=w._evt.send)
|
||||||
gth = object()
|
gth = object()
|
||||||
|
|
||||||
|
now = time.time()
|
||||||
|
timeout_value = 1.0
|
||||||
with patch('eventlet.greenthread.getcurrent', return_value=gth),\
|
with patch('eventlet.greenthread.getcurrent', return_value=gth),\
|
||||||
patch('time.time', return_value=10.0):
|
patch('time.time', return_value=now):
|
||||||
# On first call, _next_expiration is None, it should unblock
|
# On first call, _next_expiration is None, it should unblock
|
||||||
# greenthread that is blocked for ever
|
# greenthread that is blocked for ever
|
||||||
key = w.start(1.0, Timeout)
|
key = w.start(timeout_value, Timeout)
|
||||||
self.assertIn(key, w._timeouts)
|
self.assertIn(key, w._timeouts)
|
||||||
self.assertEqual(w._timeouts[key], (1.0, 11.0, gth, Timeout))
|
self.assertEqual(w._timeouts[key], (
|
||||||
|
timeout_value, now + timeout_value, gth, Timeout, now))
|
||||||
w._evt.send.assert_called_once()
|
w._evt.send.assert_called_once()
|
||||||
|
|
||||||
w.stop(key)
|
w.stop(key)
|
||||||
|
@ -4927,7 +4927,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
|||||||
for line in error_lines[:nparity]:
|
for line in error_lines[:nparity]:
|
||||||
self.assertIn('retrying', line)
|
self.assertIn('retrying', line)
|
||||||
for line in error_lines[nparity:]:
|
for line in error_lines[nparity:]:
|
||||||
self.assertIn('ChunkReadTimeout (0.01s)', line)
|
self.assertIn('ChunkReadTimeout (0.01s', line)
|
||||||
for line in self.logger.logger.records['ERROR']:
|
for line in self.logger.logger.records['ERROR']:
|
||||||
self.assertIn(req.headers['x-trans-id'], line)
|
self.assertIn(req.headers['x-trans-id'], line)
|
||||||
|
|
||||||
@ -5012,7 +5012,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
|||||||
error_lines = self.logger.get_lines_for_level('error')
|
error_lines = self.logger.get_lines_for_level('error')
|
||||||
self.assertEqual(ndata, len(error_lines))
|
self.assertEqual(ndata, len(error_lines))
|
||||||
for line in error_lines:
|
for line in error_lines:
|
||||||
self.assertIn('ChunkReadTimeout (0.01s)', line)
|
self.assertIn('ChunkReadTimeout (0.01s', line)
|
||||||
for line in self.logger.logger.records['ERROR']:
|
for line in self.logger.logger.records['ERROR']:
|
||||||
self.assertIn(req.headers['x-trans-id'], line)
|
self.assertIn(req.headers['x-trans-id'], line)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user