Remove duplicated code in orchestration/scheduler

Create _do_scheduling method which will wrap setting correct state
and sending messages to various components

Change-Id: I162f63b0f55a7874403313b828e8c59b2b8ff904
This commit is contained in:
Dmitry Shulyak 2016-01-25 17:26:39 +02:00
parent 671eada557
commit b6b6f390d1

View File

@ -31,51 +31,38 @@ class Scheduler(base.Worker):
self._tasks = tasks_client self._tasks = tasks_client
super(Scheduler, self).__init__() super(Scheduler, self).__init__()
def _next(self, dg): def _next(self, plan):
tasks = traverse(dg) tasks = traverse(plan)
filtered_tasks = list(limits.get_default_chain( filtered_tasks = list(limits.get_default_chain(
dg, plan,
[t for t in dg if dg.node[t]['status'] == states.INPROGRESS.name], [t for t in plan
if plan.node[t]['status'] == states.INPROGRESS.name],
tasks)) tasks))
return filtered_tasks return filtered_tasks
def next(self, ctxt, plan_uid): def next(self, ctxt, plan_uid):
with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1): with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
log.debug('Received *next* event for %s', plan_uid) log.debug('Received *next* event for %s', plan_uid)
dg = graph.get_graph(plan_uid) plan = graph.get_graph(plan_uid)
rst = self._next(dg) rst = self._next(plan)
for task_name in rst: for task_name in rst:
task_id = '{}:{}'.format(dg.graph['uid'], task_name) self._do_scheduling(plan, task_name)
task_type = dg.node[task_name]['type'] graph.update_graph(plan)
timelimit = dg.node[task_name].get('timelimit', 0)
dg.node[task_name]['status'] = states.INPROGRESS.name
ctxt = {
'task_id': task_id,
'task_name': task_name,
'plan_uid': plan_uid,
'timelimit': timelimit}
self._tasks(
task_type, ctxt,
*dg.node[task_name]['args'])
if timelimit:
log.debug(
'Timelimit for task %s will be %s',
task_id, timelimit)
graph.update_graph(dg)
log.debug('Scheduled tasks %r', rst) log.debug('Scheduled tasks %r', rst)
# process tasks with tasks client # process tasks with tasks client
return rst return rst
def soft_stop(self, ctxt, plan_uid): def soft_stop(self, ctxt, plan_uid):
with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1): with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
dg = graph.get_graph(plan_uid) plan = graph.get_graph(plan_uid)
for n in dg: for n in plan:
if dg.node[n]['status'] in ( if plan.node[n]['status'] in (
states.PENDING.name, states.PENDING_RETRY.name): states.PENDING.name, states.PENDING_RETRY.name):
dg.node[n]['status'] = states.SKIPPED.name plan.node[n]['status'] = states.SKIPPED.name
graph.update_graph(dg) graph.update_graph(plan)
def _handle_update(self, plan, task_name, status, errmsg=''): def _do_update(self, plan, task_name, status, errmsg=''):
"""For single update correct state and other relevant data."""
old_status = plan.node[task_name]['status'] old_status = plan.node[task_name]['status']
if old_status in VISITED: if old_status in VISITED:
return return
@ -92,33 +79,36 @@ class Scheduler(base.Worker):
plan.node[task_name]['errmsg'] = errmsg plan.node[task_name]['errmsg'] = errmsg
plan.node[task_name]['retry'] = retries_count plan.node[task_name]['retry'] = retries_count
def _do_scheduling(self, plan, task_name):
task_id = '{}:{}'.format(plan.graph['uid'], task_name)
task_type = plan.node[task_name]['type']
plan.node[task_name]['status'] = states.INPROGRESS.name
timelimit = plan.node[task_name].get('timelimit', 0)
ctxt = {
'task_id': task_id,
'task_name': task_name,
'plan_uid': plan.graph['uid'],
'timelimit': timelimit}
self._tasks(
task_type, ctxt,
*plan.node[task_name]['args'])
if timelimit:
log.debug(
'Timelimit for task %s will be %s',
task_id, timelimit)
def update_next(self, ctxt, status, errmsg): def update_next(self, ctxt, status, errmsg):
log.debug( log.debug(
'Received update for TASK %s - %s %s', 'Received update for TASK %s - %s %s',
ctxt['task_id'], status, errmsg) ctxt['task_id'], status, errmsg)
plan_uid, task_name = ctxt['task_id'].rsplit(':', 1) plan_uid, task_name = ctxt['task_id'].rsplit(':', 1)
with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1): with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
dg = graph.get_graph(plan_uid) plan = graph.get_graph(plan_uid)
self._handle_update(dg, task_name, status, errmsg=errmsg) self._do_update(plan, task_name, status, errmsg=errmsg)
rst = self._next(dg) rst = self._next(plan)
for task_name in rst: for task_name in rst:
task_id = '{}:{}'.format(dg.graph['uid'], task_name) self._do_scheduling(plan, task_name)
task_type = dg.node[task_name]['type'] graph.update_graph(plan)
timelimit = dg.node[task_name].get('timelimit', 0)
dg.node[task_name]['status'] = states.INPROGRESS.name
ctxt = {
'task_id': task_id,
'task_name': task_name,
'plan_uid': plan_uid,
'timelimit': timelimit}
self._tasks(
task_type, ctxt,
*dg.node[task_name]['args'])
if timelimit:
log.debug(
'Timelimit for task %s will be %s',
task_id, timelimit)
graph.update_graph(dg)
log.debug('Scheduled tasks %r', rst) log.debug('Scheduled tasks %r', rst)
return rst return rst