Timeout support for scheduled tasks

If there is no update from task X for the configured period
of time, the scheduler will move task X to the ERROR state and
proceed with retries if those are configured.

timeout should protect against network connectivity errors,
unexpected problems with the task worker software, or failures of
the hardware where the task worker is running

timelimit protects against hung tasks, mainly those caused by slow io

timeout is expected to be set slightly higher than timelimit

Change-Id: I429e12037b787db04e36c93e1fadfe74a023c1f8
This commit is contained in:
Dmitry Shulyak 2016-01-26 13:19:23 +02:00
parent b6b6f390d1
commit 627fd801c9
5 changed files with 83 additions and 13 deletions

View File

@ -1043,6 +1043,7 @@ class Task(Model):
errmsg = Field(basestring, default=str)
timelimit = Field(int, default=int)
retry = Field(int, default=int)
timeout = Field(int, default=int)
execution = IndexedField(basestring)
parents = ParentField(default=list)

View File

@ -65,6 +65,11 @@ class PoolBasedPuller(zerorpc.Puller):
class LimitedExecutionPuller(PoolBasedPuller):
def __init__(self, *args, **kwargs):
    """Initialize the puller and the greenlet groups used for timers.

    Two separate groups are kept so shutdown can wait on each kind
    of timer greenlet (timelimit kills and timeout callbacks).
    """
    super(LimitedExecutionPuller, self).__init__(*args, **kwargs)
    # Independent assignments; order does not matter.
    self._timeout_group = gevent.pool.Group()
    self._timelimit_group = gevent.pool.Group()
def _handle_event(self, event):
ctxt = event.args[0]
timelimit = ctxt.get('timelimit', 0)
@ -73,10 +78,22 @@ class LimitedExecutionPuller(PoolBasedPuller):
# share a pool with them, or it is highly possible that
# it wont be ever executed with low number of greenlets in
# a pool
gevent.spawn_later(
timelimit, self._methods['kill'], ctxt, ctxt['task_id'])
self._timelimit_group.add(gevent.spawn_later(
timelimit, self._methods['kill'],
ctxt, ctxt['task_id']))
self._tasks_pool.spawn(self._async_event, event)
def register_timeout(self, seconds, callable_):
    """Schedule *callable_* to fire once after *seconds* seconds.

    The timer greenlet is tracked in ``self._timeout_group`` so that
    ``run`` can wait for outstanding timers during shutdown.
    """
    timer = gevent.spawn_later(seconds, callable_)
    self._timeout_group.add(timer)
def run(self):
    """Run the puller loop, then drain timer greenlets on exit.

    Both greenlet groups are joined in a ``finally`` block so that
    pending timelimit/timeout timers are waited on (and their errors
    surfaced) even when the parent loop exits with an exception.
    """
    try:
        super(LimitedExecutionPuller, self).run()
    finally:
        for group in (self._timelimit_group, self._timeout_group):
            group.join(raise_error=True)
class Executor(object):
@ -85,6 +102,7 @@ class Executor(object):
self.bind_to = bind_to
self._tasks_register = {}
worker._executor = self
self._server = LimitedExecutionPuller(methods=self.worker)
def register(self, ctxt):
if 'task_id' in ctxt:
@ -96,10 +114,12 @@ class Executor(object):
self._tasks_register[task_id].kill(exc, block=True)
self._tasks_register.pop(task_id)
def register_timeout(self, *args):
    """Forward a timeout registration to the underlying puller server."""
    # Thin delegation: the puller owns the timer greenlets.
    self._server.register_timeout(*args)
def run(self):
server = LimitedExecutionPuller(methods=self.worker)
server.bind(self.bind_to)
server.run()
self._server.bind(self.bind_to)
self._server.run()
class Client(object):

View File

@ -31,6 +31,8 @@ def save_graph(graph):
# maybe it is possible to store part of information in AsyncResult backend
uid = graph.graph['uid']
# TODO(dshulyak) remove duplication of parameters
# in solar_models.Task and this object
for n in nx.topological_sort(graph):
t = Task.new(
{'name': n,
@ -41,7 +43,8 @@ def save_graph(graph):
'args': graph.node[n].get('args', []),
'errmsg': graph.node[n].get('errmsg', '') or '',
'timelimit': graph.node[n].get('timelimit', 0),
'retry': graph.node[n].get('retry', 0)})
'retry': graph.node[n].get('retry', 0),
'timeout': graph.node[n].get('timeout', 0)})
graph.node[n]['task'] = t
for pred in graph.predecessors(n):
pred_task = graph.node[pred]['task']
@ -56,6 +59,7 @@ def update_graph(graph, force=False):
task.status = graph.node[n]['status']
task.errmsg = graph.node[n]['errmsg'] or ''
task.retry = graph.node[n].get('retry', 0)
task.timeout = graph.node[n].get('timeout', 0)
task.save(force=force)
@ -81,8 +85,9 @@ def get_graph(uid):
target=t.target or None,
errmsg=t.errmsg or None,
task=t,
timelimit=t.timelimit or 0,
retry=t.retry)
timelimit=t.timelimit,
retry=t.retry,
timeout=t.timeout)
for u in t.parents.all_names():
dg.add_edge(u, t.name)
return dg

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from functools import partial
import time
from solar.core.log import log
@ -65,6 +66,10 @@ class Scheduler(base.Worker):
"""For single update correct state and other relevant data."""
old_status = plan.node[task_name]['status']
if old_status in VISITED:
log.debug(
'Task %s already in visited status %s'
', skipping update to %s',
task_name, old_status, status)
return
retries_count = plan.node[task_name]['retry']
@ -84,18 +89,21 @@ class Scheduler(base.Worker):
task_type = plan.node[task_name]['type']
plan.node[task_name]['status'] = states.INPROGRESS.name
timelimit = plan.node[task_name].get('timelimit', 0)
timeout = plan.node[task_name].get('timeout', 0)
ctxt = {
'task_id': task_id,
'task_name': task_name,
'plan_uid': plan.graph['uid'],
'timelimit': timelimit}
'timelimit': timelimit,
'timeout': timeout}
log.debug(
'Timelimit for task %s - %s, timeout - %s',
task_id, timelimit, timeout)
self._tasks(
task_type, ctxt,
*plan.node[task_name]['args'])
if timelimit:
log.debug(
'Timelimit for task %s will be %s',
task_id, timelimit)
if timeout:
self._configure_timeout(ctxt, timeout)
def update_next(self, ctxt, status, errmsg):
log.debug(
@ -112,6 +120,14 @@ class Scheduler(base.Worker):
log.debug('Scheduled tasks %r', rst)
return rst
def _configure_timeout(self, ctxt, timeout):
    """Arrange for the task in *ctxt* to be forced into ERROR state
    if it does not finish within *timeout* seconds.

    :param ctxt: task context dict (task_id, task_name, plan_uid, ...)
    :param timeout: seconds to wait before declaring a Timeout Error
    :raises NotImplementedError: if the executor cannot register timeouts
    """
    # BUG FIX: the original raised `NotImplemented`, which is the binary
    # operator sentinel, not an exception class -- raising it produces a
    # confusing TypeError.  NotImplementedError is the correct exception.
    if not hasattr(self._executor, 'register_timeout'):
        raise NotImplementedError('Timeout is not supported')
    self._executor.register_timeout(
        timeout,
        partial(self.update_next, ctxt,
                states.ERROR.name, 'Timeout Error'))
class SchedulerCallbackClient(object):

View File

@ -17,6 +17,8 @@
import time
import gevent
import mock
import pytest
from solar.errors import ExecutionTimeout
from solar.orchestration import graph
@ -39,3 +41,29 @@ def test_timelimit_plan(timelimit_plan, scheduler, tasks):
finished_plan = graph.get_graph(timelimit_plan.graph['uid'])
assert 'ExecutionTimeout' in finished_plan.node['t1']['errmsg']
assert finished_plan.node['t2']['status'] == states.PENDING.name
@pytest.fixture
def timeout_plan(simple_plan):
    """Simple plan whose single task carries a 1-second timeout."""
    node = simple_plan.node['echo_stuff']
    node['timeout'] = 1
    graph.update_graph(simple_plan, force=True)
    return simple_plan
def test_timeout_plan(timeout_plan, scheduler):
    """A task whose worker never reports back must end in ERROR state.

    Task execution is mocked out so nothing ever updates the task,
    leaving the scheduler's timeout as the only way it can leave the
    in-progress state.
    """
    worker, client = scheduler
    worker._tasks = mock.Mock()
    uid = timeout_plan.graph['uid']
    client.next({}, uid)

    def wait_function(limit):
        # Poll until exactly one task is reported as ERROR, or the
        # wait_finish generator itself gives up after *limit* seconds.
        for summary in graph.wait_finish(uid, limit):
            if summary[states.ERROR.name] == 1:
                return summary
            time.sleep(0.3)
        return summary

    waiter = gevent.spawn(wait_function, 2)
    waiter.get(block=True, timeout=2)
    finished = graph.get_graph(uid)
    assert finished.node['echo_stuff']['status'] == states.ERROR.name