From 627fd801c981676c4573a6e23662d72f7f6ccba3 Mon Sep 17 00:00:00 2001
From: Dmitry Shulyak
Date: Tue, 26 Jan 2016 13:19:23 +0200
Subject: [PATCH] Timeout support for scheduled tasks

If there is no update from task X for the configured period of time,
the scheduler will update task X to the ERROR state and proceed with
retries if those are configured.

timeout protects against network connectivity errors and unexpected
problems with the tasks worker software, or with the hardware the
tasks worker is running on.

timelimit protects against hung tasks, mainly caused by slow IO.

timeout is expected to be slightly higher than timelimit.

Change-Id: I429e12037b787db04e36c93e1fadfe74a023c1f8
---
 solar/dblayer/solar_models.py                  |  1 +
 .../executors/zerorpc_executor.py              | 30 +++++++++++++++----
 solar/orchestration/graph.py                   | 11 +++++--
 solar/orchestration/workers/scheduler.py       | 26 ++++++++++++----
 ...t.py => test_timelimit_timeout_support.py}  | 28 +++++++++++++++++
 5 files changed, 83 insertions(+), 13 deletions(-)
 rename solar/test/functional/{test_timelimit.py => test_timelimit_timeout_support.py} (62%)

diff --git a/solar/dblayer/solar_models.py b/solar/dblayer/solar_models.py
index f34623f4..ee5a1cd1 100644
--- a/solar/dblayer/solar_models.py
+++ b/solar/dblayer/solar_models.py
@@ -1043,6 +1043,7 @@ class Task(Model):
     errmsg = Field(basestring, default=str)
     timelimit = Field(int, default=int)
     retry = Field(int, default=int)
+    timeout = Field(int, default=int)
 
     execution = IndexedField(basestring)
     parents = ParentField(default=list)
diff --git a/solar/orchestration/executors/zerorpc_executor.py b/solar/orchestration/executors/zerorpc_executor.py
index f4b71af0..286e9ae1 100644
--- a/solar/orchestration/executors/zerorpc_executor.py
+++ b/solar/orchestration/executors/zerorpc_executor.py
@@ -65,6 +65,11 @@ class PoolBasedPuller(zerorpc.Puller):
 
 class LimitedExecutionPuller(PoolBasedPuller):
 
+    def __init__(self, *args, **kwargs):
+        super(LimitedExecutionPuller, self).__init__(*args, **kwargs)
+        self._timelimit_group = gevent.pool.Group()
+        self._timeout_group = gevent.pool.Group()
+
     def _handle_event(self, event):
         ctxt = event.args[0]
         timelimit = ctxt.get('timelimit', 0)
@@ -73,10 +78,22 @@ class LimitedExecutionPuller(PoolBasedPuller):
             # share a pool with them, or it is highly possible that
             # it wont be ever executed with low number of greenlets in
             # a pool
-            gevent.spawn_later(
-                timelimit, self._methods['kill'], ctxt, ctxt['task_id'])
+            self._timelimit_group.add(gevent.spawn_later(
+                timelimit, self._methods['kill'],
+                ctxt, ctxt['task_id']))
         self._tasks_pool.spawn(self._async_event, event)
 
+    def register_timeout(self, seconds, callable_):
+        self._timeout_group.add(
+            gevent.spawn_later(seconds, callable_))
+
+    def run(self):
+        try:
+            super(LimitedExecutionPuller, self).run()
+        finally:
+            self._timelimit_group.join(raise_error=True)
+            self._timeout_group.join(raise_error=True)
+
 
 class Executor(object):
 
@@ -85,6 +102,7 @@ class Executor(object):
         self.bind_to = bind_to
         self._tasks_register = {}
         worker._executor = self
+        self._server = LimitedExecutionPuller(methods=self.worker)
 
     def register(self, ctxt):
         if 'task_id' in ctxt:
@@ -96,10 +114,12 @@ class Executor(object):
             self._tasks_register[task_id].kill(exc, block=True)
             self._tasks_register.pop(task_id)
 
+    def register_timeout(self, *args):
+        self._server.register_timeout(*args)
+
     def run(self):
-        server = LimitedExecutionPuller(methods=self.worker)
-        server.bind(self.bind_to)
-        server.run()
+        self._server.bind(self.bind_to)
+        self._server.run()
 
 
 class Client(object):
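
The executor change above arms both timers with gevent.spawn_later and tracks
them in dedicated gevent.pool.Group objects outside the task pool, so a
saturated pool cannot starve them. The standalone snippet below is only a
minimal sketch of that pattern; fake_kill and the task id are made-up
stand-ins for self._methods['kill'] and the real context.

    import gevent
    import gevent.pool


    def fake_kill(task_id):
        # stand-in for self._methods['kill'](ctxt, task_id)
        print('killing %s after timelimit' % task_id)


    timelimit_group = gevent.pool.Group()

    # arm the timer in its own group so it cannot be starved by a full
    # pool of long-running task greenlets
    timelimit_group.add(gevent.spawn_later(1, fake_kill, 'task-1'))

    # joining the group on shutdown means armed timers either fire or
    # are collected before the process exits
    timelimit_group.join()

In the patch itself the join is done with raise_error=True inside run()'s
finally block, so exceptions raised by the timer greenlets propagate to the
worker instead of being silently dropped.
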
diff --git a/solar/orchestration/graph.py b/solar/orchestration/graph.py
index 6f15fd97..a035b36f 100644
--- a/solar/orchestration/graph.py
+++ b/solar/orchestration/graph.py
@@ -31,6 +31,8 @@ def save_graph(graph):
     # maybe it is possible to store part of information in AsyncResult backend
     uid = graph.graph['uid']
 
+    # TODO(dshulyak) remove duplication of parameters
+    # in solar_models.Task and this object
     for n in nx.topological_sort(graph):
         t = Task.new(
             {'name': n,
@@ -41,7 +43,8 @@ def save_graph(graph):
              'args': graph.node[n].get('args', []),
              'errmsg': graph.node[n].get('errmsg', '') or '',
              'timelimit': graph.node[n].get('timelimit', 0),
-             'retry': graph.node[n].get('retry', 0)})
+             'retry': graph.node[n].get('retry', 0),
+             'timeout': graph.node[n].get('timeout', 0)})
         graph.node[n]['task'] = t
         for pred in graph.predecessors(n):
             pred_task = graph.node[pred]['task']
@@ -56,6 +59,7 @@ def update_graph(graph, force=False):
         task.status = graph.node[n]['status']
         task.errmsg = graph.node[n]['errmsg'] or ''
         task.retry = graph.node[n].get('retry', 0)
+        task.timeout = graph.node[n].get('timeout', 0)
         task.save(force=force)
 
 
@@ -81,8 +85,9 @@ def get_graph(uid):
             target=t.target or None,
             errmsg=t.errmsg or None,
             task=t,
-            timelimit=t.timelimit or 0,
-            retry=t.retry)
+            timelimit=t.timelimit,
+            retry=t.retry,
+            timeout=t.timeout)
         for u in t.parents.all_names():
             dg.add_edge(u, t.name)
     return dg
diff --git a/solar/orchestration/workers/scheduler.py b/solar/orchestration/workers/scheduler.py
index 95600a62..a364d58c 100644
--- a/solar/orchestration/workers/scheduler.py
+++ b/solar/orchestration/workers/scheduler.py
@@ -12,6 +12,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+from functools import partial
 import time
 
 from solar.core.log import log
@@ -65,6 +66,10 @@ class Scheduler(base.Worker):
         """For single update correct state and other relevant data."""
         old_status = plan.node[task_name]['status']
         if old_status in VISITED:
+            log.debug(
+                'Task %s already in visited status %s'
+                ', skipping update to %s',
+                task_name, old_status, status)
             return
 
         retries_count = plan.node[task_name]['retry']
@@ -84,18 +89,21 @@ class Scheduler(base.Worker):
         task_type = plan.node[task_name]['type']
         plan.node[task_name]['status'] = states.INPROGRESS.name
         timelimit = plan.node[task_name].get('timelimit', 0)
+        timeout = plan.node[task_name].get('timeout', 0)
         ctxt = {
             'task_id': task_id,
             'task_name': task_name,
             'plan_uid': plan.graph['uid'],
-            'timelimit': timelimit}
+            'timelimit': timelimit,
+            'timeout': timeout}
+        log.debug(
+            'Timelimit for task %s - %s, timeout - %s',
+            task_id, timelimit, timeout)
         self._tasks(
             task_type, ctxt,
            *plan.node[task_name]['args'])
-        if timelimit:
-            log.debug(
-                'Timelimit for task %s will be %s',
-                task_id, timelimit)
+        if timeout:
+            self._configure_timeout(ctxt, timeout)
 
     def update_next(self, ctxt, status, errmsg):
         log.debug(
@@ -112,6 +120,14 @@ class Scheduler(base.Worker):
         log.debug('Scheduled tasks %r', rst)
         return rst
 
+    def _configure_timeout(self, ctxt, timeout):
+        if not hasattr(self._executor, 'register_timeout'):
+            raise NotImplementedError('Timeout is not supported')
+        self._executor.register_timeout(
+            timeout,
+            partial(self.update_next, ctxt,
+                    states.ERROR.name, 'Timeout Error'))
+
 
 class SchedulerCallbackClient(object):
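
_configure_timeout above only binds the arguments; the executor fires the
callback once the timer elapses. A minimal sketch of that hand-off follows,
with a stub executor and a stub update_next standing in for the real
Scheduler and the zerorpc-based Executor (the ids and the print are
placeholders, not solar API):

    from functools import partial

    import gevent
    import gevent.pool


    class StubExecutor(object):
        # stand-in for zerorpc_executor.Executor.register_timeout

        def __init__(self):
            self._timeout_group = gevent.pool.Group()

        def register_timeout(self, seconds, callable_):
            self._timeout_group.add(gevent.spawn_later(seconds, callable_))


    def update_next(ctxt, status, errmsg):
        # the real Scheduler.update_next reschedules the plan; here we only
        # show which call is made when the timeout fires
        print('%s -> %s (%s)' % (ctxt['task_id'], status, errmsg))


    executor = StubExecutor()
    ctxt = {'task_id': 'plan-uid:t1', 'plan_uid': 'plan-uid', 'timeout': 1}
    executor.register_timeout(
        ctxt['timeout'],
        partial(update_next, ctxt, 'ERROR', 'Timeout Error'))
    executor._timeout_group.join()

Because the callback is bound with partial, the scheduler keeps no timer
state of its own: if the task finishes before the timer fires, update_next
simply finds the task already in a visited state and skips the update, which
is what the new debug message in the VISITED branch documents.
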
diff --git a/solar/test/functional/test_timelimit.py b/solar/test/functional/test_timelimit_timeout_support.py
similarity index 62%
rename from solar/test/functional/test_timelimit.py
rename to solar/test/functional/test_timelimit_timeout_support.py
index 422e4c7d..fcd699f2 100644
--- a/solar/test/functional/test_timelimit.py
+++ b/solar/test/functional/test_timelimit_timeout_support.py
@@ -17,6 +17,8 @@
 import time
 
 import gevent
+import mock
+import pytest
 
 from solar.errors import ExecutionTimeout
 from solar.orchestration import graph
@@ -39,3 +41,29 @@ def test_timelimit_plan(timelimit_plan, scheduler, tasks):
     finished_plan = graph.get_graph(timelimit_plan.graph['uid'])
     assert 'ExecutionTimeout' in finished_plan.node['t1']['errmsg']
     assert finished_plan.node['t2']['status'] == states.PENDING.name
+
+
+@pytest.fixture
+def timeout_plan(simple_plan):
+    simple_plan.node['echo_stuff']['timeout'] = 1
+    graph.update_graph(simple_plan, force=True)
+    return simple_plan
+
+
+def test_timeout_plan(timeout_plan, scheduler):
+    worker, client = scheduler
+    worker._tasks = mock.Mock()
+    client.next({}, timeout_plan.graph['uid'])
+
+    def wait_function(timeout):
+        for summary in graph.wait_finish(
+                timeout_plan.graph['uid'], timeout):
+            if summary[states.ERROR.name] == 1:
+                return summary
+            time.sleep(0.3)
+        return summary
+    waiter = gevent.spawn(wait_function, 2)
+    waiter.get(block=True, timeout=2)
+    timeout_plan = graph.get_graph(timeout_plan.graph['uid'])
+    assert (timeout_plan.node['echo_stuff']['status']
+            == states.ERROR.name)
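
Following the new test fixture, a hedged sketch of how an existing plan could
opt into the field (the plan uid and task name below are placeholders; per
the commit message, timeout should sit slightly above timelimit):

    from solar.orchestration import graph

    # load a previously saved plan; the uid is a placeholder
    plan = graph.get_graph('some-plan-uid')

    # timelimit kills a hung task on the worker, timeout lets the scheduler
    # mark the task ERROR when no update arrives at all
    plan.node['some_task']['timelimit'] = 10
    plan.node['some_task']['timeout'] = 12
    graph.update_graph(plan, force=True)

With timeout left at its default of 0 the scheduler never calls
_configure_timeout, so existing plans keep their current behaviour.
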