Merge "Support configurable retries on orchestration layer"
This commit is contained in:
commit
34b5783b20
@ -1042,6 +1042,7 @@ class Task(Model):
|
||||
# Positional arguments passed to the task executor.
args = Field(list)
# Last error message recorded for the task ('' when none).
errmsg = Field(basestring, default=str)
# Execution time limit; presumably 0 means "no limit" — TODO confirm.
timelimit = Field(int, default=int)
# Number of times the task may be re-executed after an ERROR.
# default=int yields 0, i.e. no retries unless configured.
retry = Field(int, default=int)

# Plan (execution graph) uid this task belongs to; indexed for lookup.
execution = IndexedField(basestring)
parents = ParentField(default=list)
|
||||
|
@ -40,7 +40,8 @@ def save_graph(graph):
|
||||
'task_type': graph.node[n].get('type', ''),
|
||||
'args': graph.node[n].get('args', []),
|
||||
'errmsg': graph.node[n].get('errmsg', '') or '',
|
||||
'timelimit': graph.node[n].get('timelimit', 0)})
|
||||
'timelimit': graph.node[n].get('timelimit', 0),
|
||||
'retry': graph.node[n].get('retry', 0)})
|
||||
graph.node[n]['task'] = t
|
||||
for pred in graph.predecessors(n):
|
||||
pred_task = graph.node[pred]['task']
|
||||
@ -49,12 +50,13 @@ def save_graph(graph):
|
||||
t.save()
|
||||
|
||||
|
||||
def update_graph(graph, force=False):
    """Write each node's status, errmsg and retry back to its Task model.

    :param graph: plan graph whose node dicts carry a 'task' model
        instance plus 'status', 'errmsg' and (optionally) 'retry' keys
    :param force: forwarded to ``Task.save`` — presumably overrides a
        save guard in the db layer; TODO confirm save(force=...) semantics
    """
    for name in graph:
        data = graph.node[name]
        task = data['task']
        task.status = data['status']
        task.errmsg = data['errmsg'] or ''
        # Older saved plans may predate the 'retry' attribute.
        task.retry = data.get('retry', 0)
        task.save(force=force)
|
||||
|
||||
|
||||
def set_states(uid, tasks):
|
||||
@ -79,7 +81,8 @@ def get_graph(uid):
|
||||
target=t.target or None,
|
||||
errmsg=t.errmsg or None,
|
||||
task=t,
|
||||
timelimit=t.timelimit or 0)
|
||||
timelimit=t.timelimit or 0,
|
||||
retry=t.retry)
|
||||
for u in t.parents.all_names():
|
||||
dg.add_edge(u, t.name)
|
||||
return dg
|
||||
|
@ -21,6 +21,7 @@ task can be scheduled for execution if it is not yet visited, and state
|
||||
not in SKIPPED, INPROGRESS
|
||||
|
||||
PENDING - task that is scheduled to be executed
|
||||
ERROR_RETRY - task was in error but will be re-executed
|
||||
ERROR - visited node, but failed, can be failed by timeout
|
||||
SUCCESS - visited node, successfull
|
||||
INPROGRESS - task already scheduled, can be moved to ERROR or SUCCESS
|
||||
@ -30,7 +31,9 @@ NOOP - task wont be executed, but should be treated as visited
|
||||
|
||||
from enum import Enum

# Task states used by the traversal logic.  Declaration order matters:
# Enum assigns 1-based values positionally, so new members (ERROR_RETRY)
# are appended at the end.
states = Enum(
    'States',
    ['SUCCESS', 'ERROR', 'NOOP', 'INPROGRESS', 'SKIPPED', 'PENDING',
     'ERROR_RETRY'])

# Terminal states: nodes in these states count as already visited.
VISITED = (states.SUCCESS.name, states.NOOP.name)
# States that block a node from being scheduled.
BLOCKED = (states.INPROGRESS.name, states.SKIPPED.name, states.ERROR.name)
|
||||
|
@ -18,7 +18,9 @@ from solar.core.log import log
|
||||
from solar.dblayer.locking import Lock
|
||||
from solar.orchestration import graph
|
||||
from solar.orchestration import limits
|
||||
from solar.orchestration.traversal import states
|
||||
from solar.orchestration.traversal import traverse
|
||||
from solar.orchestration.traversal import VISITED
|
||||
from solar.orchestration.workers import base
|
||||
from solar.utils import get_current_ident
|
||||
|
||||
@ -33,7 +35,7 @@ class Scheduler(base.Worker):
|
||||
tasks = traverse(dg)
|
||||
filtered_tasks = list(limits.get_default_chain(
|
||||
dg,
|
||||
[t for t in dg if dg.node[t]['status'] == 'INPROGRESS'],
|
||||
[t for t in dg if dg.node[t]['status'] == states.INPROGRESS.name],
|
||||
tasks))
|
||||
return filtered_tasks
|
||||
|
||||
@ -45,8 +47,8 @@ class Scheduler(base.Worker):
|
||||
for task_name in rst:
|
||||
task_id = '{}:{}'.format(dg.graph['uid'], task_name)
|
||||
task_type = dg.node[task_name]['type']
|
||||
dg.node[task_name]['status'] = 'INPROGRESS'
|
||||
timelimit = dg.node[task_name].get('timelimit', 0)
|
||||
dg.node[task_name]['status'] = states.INPROGRESS.name
|
||||
ctxt = {
|
||||
'task_id': task_id,
|
||||
'task_name': task_name,
|
||||
@ -68,10 +70,28 @@ class Scheduler(base.Worker):
|
||||
# Fragment of the scheduler's stop handler: mark all not-yet-started
# tasks as SKIPPED so the plan winds down without executing them.
with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
    dg = graph.get_graph(plan_uid)
    for n in dg:
        # NOTE(review): original referenced states.PENDING_RETRY, which
        # is not a member of the States enum (it defines ERROR_RETRY) and
        # would raise AttributeError.  ERROR_RETRY tasks are awaiting
        # re-execution, so they are skipped here alongside PENDING —
        # TODO confirm this was the intended member.
        if dg.node[n]['status'] in (
                states.PENDING.name, states.ERROR_RETRY.name):
            dg.node[n]['status'] = states.SKIPPED.name
    graph.update_graph(dg)
|
||||
|
||||
def _handle_update(self, plan, task_name, status, errmsg=''):
    """Apply a task status update to *plan*, consuming a retry on error.

    If the task failed (status == ERROR) and it still has retries left,
    the recorded status becomes ERROR_RETRY and the retry counter is
    decremented so the task will be re-executed; otherwise the update is
    final and the end time is stamped.  Nodes already in a terminal
    (VISITED) state are left untouched.

    :param plan: plan graph (node dicts with 'status'/'retry'/'errmsg')
    :param task_name: node name the update refers to
    :param status: new state name, e.g. states.ERROR.name
    :param errmsg: error description to store ('' when successful)
    """
    old_status = plan.node[task_name]['status']
    if old_status in VISITED:
        return
    # Tolerate plans saved before the 'retry' attribute existed
    # (consistent with update_graph's .get('retry', 0)).
    retries_count = plan.node[task_name].get('retry', 0)

    if status == states.ERROR.name and retries_count > 0:
        retries_count -= 1
        status = states.ERROR_RETRY.name
        # Fixed format string: the original had two placeholders for
        # three arguments, so the plan uid was silently dropped.
        log.debug('Retry task %s in plan %s, retries left %s',
                  task_name, plan.graph['uid'], retries_count)
    else:
        plan.node[task_name]['end_time'] = time.time()
    plan.node[task_name]['status'] = status
    plan.node[task_name]['errmsg'] = errmsg
    plan.node[task_name]['retry'] = retries_count
|
||||
|
||||
def update_next(self, ctxt, status, errmsg):
|
||||
log.debug(
|
||||
'Received update for TASK %s - %s %s',
|
||||
@ -79,15 +99,13 @@ class Scheduler(base.Worker):
|
||||
plan_uid, task_name = ctxt['task_id'].rsplit(':', 1)
|
||||
with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
|
||||
dg = graph.get_graph(plan_uid)
|
||||
dg.node[task_name]['status'] = status
|
||||
dg.node[task_name]['errmsg'] = errmsg
|
||||
dg.node[task_name]['end_time'] = time.time()
|
||||
self._handle_update(dg, task_name, status, errmsg=errmsg)
|
||||
rst = self._next(dg)
|
||||
for task_name in rst:
|
||||
task_id = '{}:{}'.format(dg.graph['uid'], task_name)
|
||||
task_type = dg.node[task_name]['type']
|
||||
dg.node[task_name]['status'] = 'INPROGRESS'
|
||||
timelimit = dg.node[task_name].get('timelimit', 0)
|
||||
dg.node[task_name]['status'] = states.INPROGRESS.name
|
||||
ctxt = {
|
||||
'task_id': task_id,
|
||||
'task_name': task_name,
|
||||
@ -111,7 +129,7 @@ class SchedulerCallbackClient(object):
|
||||
self.client = client
|
||||
|
||||
def update(self, ctxt, result, *args, **kwargs):
    """Report a task success back to the scheduler (empty errmsg)."""
    status = states.SUCCESS.name
    self.client.update_next(ctxt, status, '')
|
||||
|
||||
def error(self, ctxt, result, *args, **kwargs):
    """Report a task failure, serialising *result* as the error message."""
    message = repr(result)
    self.client.update_next(ctxt, states.ERROR.name, message)
|
||||
|
@ -30,7 +30,7 @@ def tasks_worker():
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tasks_for_scheduler(tasks_worker, address):
|
||||
def tasks_for_scheduler(request, tasks_worker, address):
|
||||
address = address + 'tasks'
|
||||
executor = zerorpc_executor.Executor(tasks_worker, address)
|
||||
gevent.spawn(executor.run)
|
||||
@ -38,8 +38,8 @@ def tasks_for_scheduler(tasks_worker, address):
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def scheduler(tasks_for_scheduler, address):
|
||||
address = address + 'scheduler'
|
||||
def scheduler(tasks_for_scheduler, scheduler_address):
|
||||
address = scheduler_address
|
||||
worker = wscheduler.Scheduler(tasks_for_scheduler)
|
||||
|
||||
def session_end(ctxt):
|
||||
|
97
solar/test/functional/test_retries.py
Normal file
97
solar/test/functional/test_retries.py
Normal file
@ -0,0 +1,97 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
import gevent
|
||||
import mock
|
||||
import pytest
|
||||
|
||||
from solar.core.log import log
|
||||
from solar.dblayer.model import ModelMeta
|
||||
from solar.orchestration.executors import zerorpc_executor
|
||||
from solar.orchestration import graph
|
||||
from solar.orchestration.traversal import states
|
||||
from solar.orchestration.workers import scheduler as wscheduler
|
||||
from solar.orchestration.workers import tasks as wtasks
|
||||
|
||||
|
||||
@pytest.fixture
def simple_plan_retries(simple_plan):
    """Plan where the 'just_fail' task is allowed one retry."""
    simple_plan.node['just_fail']['retry'] = 1
    # force=True is forwarded to Task.save — presumably needed to
    # overwrite the already-persisted plan; TODO confirm save semantics.
    graph.update_graph(simple_plan, force=True)
    return simple_plan
|
||||
|
||||
|
||||
@pytest.fixture
def scheduler(request, scheduler_address):
    """Run a Scheduler worker behind a zerorpc executor.

    A tasks client is wired in only when the requesting test also uses
    the 'tasks' fixture, so the scheduler can dispatch work to it.
    Returns (worker, client) for the scheduler address.
    """
    tasks_client = None
    if 'tasks' in request.node.fixturenames:
        tasks_client = zerorpc_executor.Client(
            request.getfuncargvalue('tasks_address'))
    worker = wscheduler.Scheduler(tasks_client)

    # Open/close a db session around every handled message, since each
    # message may run in a different greenlet.
    def session_end(ctxt):
        log.debug('Session end ID %s', id(gevent.getcurrent()))
        ModelMeta.session_end()

    def session_start(ctxt):
        log.debug('Session start ID %s', id(gevent.getcurrent()))
        ModelMeta.session_start()

    worker.for_all.before(session_start)
    worker.for_all.after(session_end)

    # The executor serves requests in a background greenlet; the test
    # talks to it through the returned client.
    executor = zerorpc_executor.Executor(worker, scheduler_address)
    gevent.spawn(executor.run)
    return worker, zerorpc_executor.Client(scheduler_address)
|
||||
|
||||
|
||||
@pytest.fixture
def tasks(request, tasks_address):
    """Run a Tasks worker behind a zerorpc executor.

    When the requesting test also uses the 'scheduler' fixture, task
    success/error events are forwarded to the scheduler via a
    SchedulerCallbackClient.  Returns (worker, client).
    """
    worker = wtasks.Tasks()
    executor = zerorpc_executor.Executor(worker, tasks_address)

    if 'scheduler' in request.node.fixturenames:
        scheduler_client = wscheduler.SchedulerCallbackClient(
            zerorpc_executor.Client(request.getfuncargvalue(
                'scheduler_address')))
        worker.for_all.on_success(scheduler_client.update)
        worker.for_all.on_error(scheduler_client.error)

    gevent.spawn(executor.run)
    return worker, zerorpc_executor.Client(tasks_address)
|
||||
|
||||
|
||||
def test_retry_just_fail(scheduler, tasks, simple_plan_retries):
    """A task with retry=1 is re-executed once and still ends in ERROR."""
    timeout = 3
    plan = simple_plan_retries
    worker, client = scheduler
    # Trace every scheduler update so the per-task calls can be asserted.
    tracer = mock.Mock()
    worker.for_all.on_success(tracer.update)

    def wait_function(timeout):
        # Poll until the plan finishes; with one retry, at most one task
        # should ever be counted as ERROR at a time.
        for summary in graph.wait_finish(plan.graph['uid'], timeout):
            assert summary[states.ERROR.name] <= 1
            time.sleep(0.5)
        return summary
    client.next({}, plan.graph['uid'])
    waiter = gevent.spawn(wait_function, timeout)
    waiter.join(timeout=timeout)
    # Presumably 4 updates total for the simple plan: the other task(s)
    # plus 'just_fail' executed twice — TODO confirm plan shape.
    assert len(tracer.update.call_args_list) == 4
    # The last two updates are both the retried 'just_fail' failing.
    for call in tracer.update.call_args_list[2:]:
        args, _ = call
        ctxt, rst, status, msg = args
        assert ctxt['task_name'] == 'just_fail'
        assert status == states.ERROR.name
|
@ -62,7 +62,8 @@ def test_wait_finish(simple):
|
||||
'NOOP': 0,
|
||||
'ERROR': 0,
|
||||
'INPROGRESS': 0,
|
||||
'PENDING': 0
|
||||
'PENDING': 0,
|
||||
'ERROR_RETRY': 0,
|
||||
}
|
||||
|
||||
|
||||
@ -76,7 +77,8 @@ def test_several_updates(simple):
|
||||
'NOOP': 0,
|
||||
'ERROR': 1,
|
||||
'INPROGRESS': 0,
|
||||
'PENDING': 1
|
||||
'PENDING': 1,
|
||||
'ERROR_RETRY': 0,
|
||||
}
|
||||
|
||||
simple.node['echo_stuff']['status'] = states.ERROR.name
|
||||
@ -88,5 +90,6 @@ def test_several_updates(simple):
|
||||
'NOOP': 0,
|
||||
'ERROR': 2,
|
||||
'INPROGRESS': 0,
|
||||
'PENDING': 0
|
||||
'PENDING': 0,
|
||||
'ERROR_RETRY': 0,
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user