Merge "Timeout support for scheduled tasks"
commit 2164e76012
@@ -1043,6 +1043,7 @@ class Task(Model):
     errmsg = Field(basestring, default=str)
     timelimit = Field(int, default=int)
     retry = Field(int, default=int)
+    timeout = Field(int, default=int)
 
     execution = IndexedField(basestring)
     parents = ParentField(default=list)
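Note: the model now carries both limits. As implemented in this change, timelimit triggers a delayed kill of a task that is already running, while timeout lets the scheduler mark a task failed even if the executor never reports back. A minimal sketch of a plan node carrying both knobs (node name and values are hypothetical; save_graph below reads exactly these attributes):

import networkx as nx

plan = nx.DiGraph(uid='example-uid')  # 'uid' graph attribute, as read by save_graph
plan.add_node('t1', type='echo', args=[], status='', errmsg='',
              retry=0,
              timelimit=10,  # kill the running task after 10 seconds
              timeout=20)    # mark the task ERROR if unfinished after 20 seconds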
@@ -65,6 +65,11 @@ class PoolBasedPuller(zerorpc.Puller):
 
 class LimitedExecutionPuller(PoolBasedPuller):
 
+    def __init__(self, *args, **kwargs):
+        super(LimitedExecutionPuller, self).__init__(*args, **kwargs)
+        self._timelimit_group = gevent.pool.Group()
+        self._timeout_group = gevent.pool.Group()
+
     def _handle_event(self, event):
         ctxt = event.args[0]
         timelimit = ctxt.get('timelimit', 0)
@@ -73,10 +78,22 @@ class LimitedExecutionPuller(PoolBasedPuller):
             # share a pool with them, or it is highly possible that
             # it won't be ever executed with low number of greenlets in
             # a pool
-            gevent.spawn_later(
-                timelimit, self._methods['kill'], ctxt, ctxt['task_id'])
+            self._timelimit_group.add(gevent.spawn_later(
+                timelimit, self._methods['kill'],
+                ctxt, ctxt['task_id']))
         self._tasks_pool.spawn(self._async_event, event)
+
+    def register_timeout(self, seconds, callable_):
+        self._timeout_group.add(
+            gevent.spawn_later(seconds, callable_))
+
+    def run(self):
+        try:
+            super(LimitedExecutionPuller, self).run()
+        finally:
+            self._timelimit_group.join(raise_error=True)
+            self._timeout_group.join(raise_error=True)
 
 
 class Executor(object):
 
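Note: the delayed kill and timeout greenlets deliberately live outside self._tasks_pool; with a small pool they could otherwise wait behind the very tasks they are meant to limit. Tracking them in gevent.pool.Group objects also lets run() drain them on shutdown. A self-contained sketch of the gevent pattern in use here:

import gevent
import gevent.pool

group = gevent.pool.Group()

def on_timeout(task_id):
    print('timeout fired for %s' % task_id)

# spawn_later schedules the callable to run after the given delay;
# Group.add tracks the resulting greenlet so it can be joined later
group.add(gevent.spawn_later(0.1, on_timeout, 'task-1'))
group.join(raise_error=True)  # wait for all tracked greenlets to finish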
@@ -85,6 +102,7 @@ class Executor(object):
         self.bind_to = bind_to
         self._tasks_register = {}
         worker._executor = self
+        self._server = LimitedExecutionPuller(methods=self.worker)
 
     def register(self, ctxt):
         if 'task_id' in ctxt:
@@ -96,10 +114,12 @@ class Executor(object):
             self._tasks_register[task_id].kill(exc, block=True)
             self._tasks_register.pop(task_id)
 
+    def register_timeout(self, *args):
+        self._server.register_timeout(*args)
+
     def run(self):
-        server = LimitedExecutionPuller(methods=self.worker)
-        server.bind(self.bind_to)
-        server.run()
+        self._server.bind(self.bind_to)
+        self._server.run()
 
 
 class Client(object):
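Note: creating the LimitedExecutionPuller once in __init__ (instead of inside run()) gives the executor a stable server object, so register_timeout() reaches the same instance that will later serve requests. A hedged usage sketch (constructor arguments assumed from the surrounding code; the endpoint is hypothetical):

# `worker` is a solar worker object as wired in the code above
executor = Executor(worker, bind_to='ipc:///tmp/solar-executor')
executor.register_timeout(5, lambda: None)  # delegated to self._server
executor.run()  # binds the long-lived puller and serves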
@@ -31,6 +31,8 @@ def save_graph(graph):
     # maybe it is possible to store part of information in AsyncResult backend
     uid = graph.graph['uid']
 
+    # TODO(dshulyak) remove duplication of parameters
+    # in solar_models.Task and this object
     for n in nx.topological_sort(graph):
         t = Task.new(
             {'name': n,
@@ -41,7 +43,8 @@ def save_graph(graph):
              'args': graph.node[n].get('args', []),
              'errmsg': graph.node[n].get('errmsg', '') or '',
              'timelimit': graph.node[n].get('timelimit', 0),
-             'retry': graph.node[n].get('retry', 0)})
+             'retry': graph.node[n].get('retry', 0),
+             'timeout': graph.node[n].get('timeout', 0)})
         graph.node[n]['task'] = t
         for pred in graph.predecessors(n):
             pred_task = graph.node[pred]['task']
@@ -56,6 +59,7 @@ def update_graph(graph, force=False):
         task.status = graph.node[n]['status']
         task.errmsg = graph.node[n]['errmsg'] or ''
         task.retry = graph.node[n].get('retry', 0)
+        task.timeout = graph.node[n].get('timeout', 0)
         task.save(force=force)
 
 
@@ -81,8 +85,9 @@ def get_graph(uid):
             target=t.target or None,
             errmsg=t.errmsg or None,
             task=t,
-            timelimit=t.timelimit or 0,
-            retry=t.retry)
+            timelimit=t.timelimit,
+            retry=t.retry,
+            timeout=t.timeout)
         for u in t.parents.all_names():
             dg.add_edge(u, t.name)
     return dg
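Note: timeout now survives the full round trip: save_graph copies it from the plan into each Task, update_graph writes it back on changes, and get_graph restores it as a node attribute. A short sketch of that round trip (continuing the hypothetical plan from the first note; the uid value is assumed):

from solar.orchestration import graph

graph.save_graph(plan)                     # persists timeout on each Task
restored = graph.get_graph('example-uid')  # hypothetical uid from above
assert restored.node['t1']['timeout'] == 20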
@@ -12,6 +12,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+from functools import partial
 import time
 
 from solar.core.log import log
@@ -65,6 +66,10 @@ class Scheduler(base.Worker):
         """For single update correct state and other relevant data."""
         old_status = plan.node[task_name]['status']
         if old_status in VISITED:
+            log.debug(
+                'Task %s already in visited status %s'
+                ', skipping update to %s',
+                task_name, old_status, status)
             return
         retries_count = plan.node[task_name]['retry']
 
@@ -84,18 +89,21 @@ class Scheduler(base.Worker):
         task_type = plan.node[task_name]['type']
         plan.node[task_name]['status'] = states.INPROGRESS.name
         timelimit = plan.node[task_name].get('timelimit', 0)
+        timeout = plan.node[task_name].get('timeout', 0)
         ctxt = {
             'task_id': task_id,
             'task_name': task_name,
             'plan_uid': plan.graph['uid'],
-            'timelimit': timelimit}
+            'timelimit': timelimit,
+            'timeout': timeout}
+        log.debug(
+            'Timelimit for task %s - %s, timeout - %s',
+            task_id, timelimit, timeout)
         self._tasks(
             task_type, ctxt,
             *plan.node[task_name]['args'])
-        if timelimit:
-            log.debug(
-                'Timelimit for task %s will be %s',
-                task_id, timelimit)
+        if timeout:
+            self._configure_timeout(ctxt, timeout)
 
     def update_next(self, ctxt, status, errmsg):
         log.debug(
@@ -112,6 +120,14 @@ class Scheduler(base.Worker):
         log.debug('Scheduled tasks %r', rst)
         return rst
 
+    def _configure_timeout(self, ctxt, timeout):
+        if not hasattr(self._executor, 'register_timeout'):
+            raise NotImplementedError('Timeout is not supported')
+        self._executor.register_timeout(
+            timeout,
+            partial(self.update_next, ctxt,
+                    states.ERROR.name, 'Timeout Error'))
+
 
 class SchedulerCallbackClient(object):
 
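Note: functools.partial pre-binds the context and the failure arguments, so the executor only has to invoke a zero-argument callable when the clock expires; the task is then pushed to ERROR with 'Timeout Error' through the ordinary update_next path, which skips tasks already in a VISITED state (see above). The mechanism in isolation:

from functools import partial

def update_next(ctxt, status, errmsg):
    print(ctxt, status, errmsg)

cb = partial(update_next, {'task_id': 't1'}, 'ERROR', 'Timeout Error')
cb()  # prints: {'task_id': 't1'} ERROR Timeout Error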
@@ -17,6 +17,8 @@
 import time
 
+import gevent
+import mock
 import pytest
 
 from solar.errors import ExecutionTimeout
 from solar.orchestration import graph
@@ -39,3 +41,29 @@ def test_timelimit_plan(timelimit_plan, scheduler, tasks):
     finished_plan = graph.get_graph(timelimit_plan.graph['uid'])
     assert 'ExecutionTimeout' in finished_plan.node['t1']['errmsg']
     assert finished_plan.node['t2']['status'] == states.PENDING.name
+
+
+@pytest.fixture
+def timeout_plan(simple_plan):
+    simple_plan.node['echo_stuff']['timeout'] = 1
+    graph.update_graph(simple_plan, force=True)
+    return simple_plan
+
+
+def test_timeout_plan(timeout_plan, scheduler):
+    worker, client = scheduler
+    worker._tasks = mock.Mock()
+    client.next({}, timeout_plan.graph['uid'])
+
+    def wait_function(timeout):
+        for summary in graph.wait_finish(
+                timeout_plan.graph['uid'], timeout):
+            if summary[states.ERROR.name] == 1:
+                return summary
+            time.sleep(0.3)
+        return summary
+    waiter = gevent.spawn(wait_function, 2)
+    waiter.get(block=True, timeout=2)
+    timeout_plan = graph.get_graph(timeout_plan.graph['uid'])
+    assert (timeout_plan.node['echo_stuff']['status']
+            == states.ERROR.name)
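Note: worker._tasks is mocked out, so the dispatched task never actually runs and nothing but the registered timeout can move it out of INPROGRESS; the final assertion therefore exercises the timeout path alone. The summaries yielded by graph.wait_finish appear to be per-state task counts (shape assumed from the check in the test):

# hypothetical summary for this one-task plan after the timeout fires
summary = {'PENDING': 0, 'INPROGRESS': 0, 'ERROR': 1, 'SUCCESS': 0}
assert summary['ERROR'] == 1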