Merge pull request #52 from Mirantis/limits

Introduce limits for scheduling tasks
Łukasz Oleś 2015-07-24 12:44:01 +02:00
commit e2017bfc98
13 changed files with 294 additions and 323 deletions
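At a glance, scheduling now happens in three steps: traverse() picks the tasks whose predecessors are done, a chain of limit rules (items_rule, target_based_rule, type_based_rule) decides how many of them may start right now, and the new executor turns the survivors into a Celery group. A minimal sketch of the first two steps, assuming the solar package from this branch is importable; the graph, task names and attributes below are illustrative, not taken from a real plan:

import networkx as nx

from solar.orchestration import limits
from solar.orchestration.traversal import traverse

# Toy plan: two PENDING tasks that target the same node '1'.
dg = nx.DiGraph()
dg.add_node('hosts_file1.run', status='PENDING', target='1')
dg.add_node('hosts_file2.run', status='PENDING', target='1')

ready = list(traverse(dg))
in_progress = [t for t in dg if dg.node[t]['status'] == 'INPROGRESS']
allowed = list(limits.get_default_chain(dg, in_progress, ready))

# traverse() offers both tasks, but the default per-target limit of one
# concurrent task per target lets only one of them start.
assert len(ready) == 2
assert len(allowed) == 1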

View File

@@ -16,7 +16,7 @@ fi
. $VENV/bin/activate
pip install -r solar/requirements.txt --download-cache=/tmp/$JOB_NAME
pip install -r solar/test-requirements.txt --download-cache=/tmp/$JOB_NAME
pushd solar/solar

View File

@@ -6,7 +6,6 @@ networkx==1.9.1
PyYAML==3.11
jsonschema==2.4.0
requests==2.7.0
#mock
dictdiffer==0.4.0
enum34==1.0.4
redis==2.10.3
@@ -16,3 +15,4 @@ inflection
Fabric==1.10.2
tabulate==0.7.5
ansible
celery

View File

@@ -0,0 +1,37 @@
from solar.orchestration.runner import app
from celery import group
def celery_executor(dg, tasks, control_tasks=()):
to_execute = []
for task_name in tasks:
# task_id needs to be unique, so for each plan we will use
# generated uid of this plan and task_name
task_id = '{}:{}'.format(dg.graph['uid'], task_name)
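        # e.g. a plan whose uid is 'a1b2c3' and a task named 'node1.run' would
        # get the task_id 'a1b2c3:node1.run' (hypothetical values, for illustration)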
task = app.tasks[dg.node[task_name]['type']]
if all_success(dg, dg.predecessors(task_name)) or task_name in control_tasks:
dg.node[task_name]['status'] = 'INPROGRESS'
for t in generate_task(task, dg.node[task_name], task_id):
to_execute.append(t)
return group(to_execute)
def generate_task(task, data, task_id):
subtask = task.subtask(
data['args'], task_id=task_id,
time_limit=data.get('time_limit', None),
soft_time_limit=data.get('soft_time_limit', None))
if data.get('target', None):
subtask.set(queue=data['target'])
yield subtask
def all_success(dg, nodes):
return all((dg.node[n]['status'] == 'SUCCESS' for n in nodes))

View File

@@ -0,0 +1,63 @@
class Chain(object):
def __init__(self, dg, inprogress, added):
self.dg = dg
self.inprogress = inprogress
self.added = added
self.rules = []
def add_rule(self, rule):
self.rules.append(rule)
@property
def filtered(self):
for item in self.added:
for rule in self.rules:
if not rule(self.dg, self.inprogress, item):
break
else:
self.inprogress.append(item)
yield item
def __iter__(self):
return iter(self.filtered)
def get_default_chain(dg, inprogress, added):
chain = Chain(dg, inprogress, added)
chain.add_rule(items_rule)
chain.add_rule(target_based_rule)
chain.add_rule(type_based_rule)
return chain
def type_based_rule(dg, inprogress, item):
"""condition will be specified like:
type_limit: 2
"""
_type = dg.node[item].get('resource_type')
    if 'type_limit' not in dg.node[item]:
        return True
    if not _type:
        return True
type_count = 0
for n in inprogress:
if dg.node[n].get('resource_type') == _type:
type_count += 1
return dg.node[item]['type_limit'] > type_count
def target_based_rule(dg, inprogress, item, limit=1):
target = dg.node[item].get('target')
    if not target:
        return True
target_count = 0
for n in inprogress:
if dg.node[n].get('target') == target:
target_count += 1
return limit > target_count
def items_rule(dg, inprogress, item, limit=100):
return len(inprogress) < limit
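Each rule is a plain callable with the signature (dg, inprogress, item), so the chain is easy to extend beyond get_default_chain. A small sketch, assuming the solar package from this branch is importable; max_errors_rule is an illustrative custom rule, not part of this change:

import networkx as nx

from solar.orchestration.limits import Chain, items_rule, target_based_rule


def max_errors_rule(dg, inprogress, item, limit=1):
    # Illustrative custom rule: stop scheduling once too many tasks have failed.
    errors = sum(1 for n in dg if dg.node[n]['status'] == 'ERROR')
    return errors < limit


dg = nx.DiGraph()
dg.add_node('t1', status='PENDING', target='1')
dg.add_node('t2', status='PENDING', target='2')
dg.add_node('t3', status='ERROR', target='3')

chain = Chain(dg, [], ['t1', 't2'])
chain.add_rule(items_rule)
chain.add_rule(target_based_rule)
chain.add_rule(max_errors_rule)
assert list(chain) == []  # the ERROR task blocks any further scheduling

Rules short-circuit: the first rule that returns False stops the item from being yielded and from being counted as in progress for the rules that follow.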

View File

@@ -5,8 +5,6 @@ import subprocess
import time
from celery.app import task
from celery import group
from celery.exceptions import Ignore
import redis
from solar.orchestration import graph
@@ -14,6 +12,9 @@ from solar.core import actions
from solar.core import resource
from solar.system_log.tasks import commit_logitem, error_logitem
from solar.orchestration.runner import app
from solar.orchestration.traversal import traverse
from solar.orchestration import limits
from solar.orchestration import executor
r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1)
@@ -23,7 +24,7 @@ __all__ = ['solar_resource', 'cmd', 'sleep',
'error', 'fault_tolerance', 'schedule_start', 'schedule_next']
# NOTE(dshulyak) i am not using celery.signals because it is not possible
# to extrace task_id from *task_success* signal
# to extract task_id from *task_success* signal
class ReportTask(task.Task):
def on_success(self, retval, task_id, args, kwargs):
@@ -41,13 +42,13 @@ class ReportTask(task.Task):
report_task = partial(app.task, base=ReportTask, bind=True)
@report_task
@report_task(name='solar_resource')
def solar_resource(ctxt, resource_name, action):
res = resource.load(resource_name)
return actions.resource_action(res, action)
@report_task
@report_task(name='cmd')
def cmd(ctxt, cmd):
popen = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
@@ -58,17 +59,17 @@ def cmd(ctxt, cmd):
return popen.returncode, out, err
@report_task
@report_task(name='sleep')
def sleep(ctxt, seconds):
time.sleep(seconds)
@report_task
@report_task(name='error')
def error(ctxt, message):
    raise Exception(message)
@report_task
@report_task(name='fault_tolerance')
def fault_tolerance(ctxt, percent):
task_id = ctxt.request.id
plan_uid, task_name = task_id.rsplit(':', 1)
@@ -88,12 +89,12 @@ def fault_tolerance(ctxt, percent):
succes_percent, percent))
@report_task
@report_task(name='echo')
def echo(ctxt, message):
return message
@report_task
@report_task(name='anchor')
def anchor(ctxt, *args):
    # such tasks should be walked when at least 1/3/exact number of resources has been visited
dg = graph.get_graph('current')
@@ -103,12 +104,18 @@ def anchor(ctxt, *args):
def schedule(plan_uid, dg):
next_tasks = list(traverse(dg))
tasks = traverse(dg)
limit_chain = limits.get_default_chain(
dg,
[t for t in dg if dg.node[t]['status'] == 'INPROGRESS'],
tasks)
execution = executor.celery_executor(
dg, limit_chain, control_tasks=('fault_tolerance',))
graph.save_graph(plan_uid, dg)
group(next_tasks)()
execution()
@app.task
@app.task(name='schedule_start')
def schedule_start(plan_uid, start=None, end=None):
"""On receive finished task should update storage with task result:
@@ -119,7 +126,7 @@ def schedule_start(plan_uid, start=None, end=None):
schedule(plan_uid, dg)
@app.task
@app.task(name='soft_stop')
def soft_stop(plan_uid):
dg = graph.get_graph(plan_uid)
for n in dg:
@@ -128,7 +135,7 @@ def soft_stop(plan_uid):
graph.save_graph(plan_uid, dg)
@app.task
@app.task(name='schedule_next')
def schedule_next(task_id, status, errmsg=None):
plan_uid, task_name = task_id.rsplit(':', 1)
dg = graph.get_graph(plan_uid)
@@ -136,62 +143,3 @@ def schedule_next(task_id, status, errmsg=None):
dg.node[task_name]['errmsg'] = errmsg
schedule(plan_uid, dg)
# TODO(dshulyak) some tasks should be evaluated even if not all predecessors
# succeeded, how to identify this?
# - add ignore_error on edge
# - add ignore_predecessor_errors on the task under consideration
# - make fault_tolerance not a task but a policy for all tasks
control_tasks = [fault_tolerance, anchor]
def traverse(dg):
"""
1. Node should be visited only when all predecessors already visited
2. Visited nodes should have any state except PENDING, INPROGRESS, for now
is SUCCESS or ERROR, but it can be extended
3. If node is INPROGRESS it should not be visited once again
"""
visited = set()
for node in dg:
data = dg.node[node]
if data['status'] not in ('PENDING', 'INPROGRESS', 'SKIPPED'):
visited.add(node)
for node in dg:
data = dg.node[node]
if node in visited:
continue
elif data['status'] in ('INPROGRESS', 'SKIPPED'):
continue
predecessors = set(dg.predecessors(node))
if predecessors <= visited:
task_id = '{}:{}'.format(dg.graph['uid'], node)
task_name = '{}.{}'.format(__name__, data['type'])
task = app.tasks[task_name]
if all_success(dg, predecessors) or task in control_tasks:
dg.node[node]['status'] = 'INPROGRESS'
for t in generate_task(task, dg, data, task_id):
yield t
def generate_task(task, dg, data, task_id):
subtask = task.subtask(
data['args'], task_id=task_id,
time_limit=data.get('time_limit', None),
soft_time_limit=data.get('soft_time_limit', None))
if data.get('target', None):
subtask.set(queue=data['target'])
yield subtask
def all_success(dg, nodes):
return all((dg.node[n]['status'] == 'SUCCESS' for n in nodes))

View File

@@ -0,0 +1,36 @@
"""
task should be visited only when predecessors are visited,
visited node could be only in SUCCESS or ERROR
task can be scheduled for execution if it is not yet visited, and state
not in SKIPPED, INPROGRESS
PENDING - task that is scheduled to be executed
ERROR - visited node, but failed, can be failed by timeout
SUCCESS - visited node, successfull
INPROGRESS - task already scheduled, can be moved to ERROR or SUCCESS
SKIPPED - not visited, and should be skipped from execution
"""
VISITED = ('SUCCESS', 'ERROR', 'NOOP')
BLOCKED = ('INPROGRESS', 'SKIPPED')
def traverse(dg):
visited = set()
for node in dg:
data = dg.node[node]
if data['status'] in VISITED:
visited.add(node)
for node in dg:
data = dg.node[node]
if node in visited or data['status'] in BLOCKED:
continue
if set(dg.predecessors(node)) <= visited:
yield node
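Note that traverse() treats ERROR (and NOOP) parents as visited: the child of a failed task is still yielded here, and it is the executor's all_success() check that decides whether it actually runs. A tiny illustration with a throwaway two-node graph, assuming the solar package from this branch is importable:

import networkx as nx

from solar.orchestration.traversal import traverse

dg = nx.DiGraph()
dg.add_node('a', status='ERROR')
dg.add_node('b', status='PENDING')
dg.add_edge('a', 'b')

assert list(traverse(dg)) == ['b']  # 'a' counts as visited even though it failed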

View File

@@ -0,0 +1,22 @@
import networkx as nx
from pytest import fixture
from mock import patch
from solar.orchestration import executor
@fixture
def dg():
ex = nx.DiGraph()
ex.add_node('t1', args=['t'], status='PENDING', type='echo')
ex.graph['uid'] = 'some_string'
return ex
@patch.object(executor, 'app')
def test_celery_executor(mapp, dg):
"""Just check that it doesnt fail for now.
"""
assert executor.celery_executor(dg, ['t1'])
assert dg.node['t1']['status'] == 'INPROGRESS'

View File

@@ -3,7 +3,7 @@ from pytest import fixture
from dictdiffer import revert, patch
import networkx as nx
from solar import operations
from solar.system_log import change
from solar.core.resource import wrap_resource
@@ -32,12 +32,12 @@ def commited():
@fixture
def full_diff(staged):
return operations.create_diff(staged, {})
return change.create_diff(staged, {})
@fixture
def diff_for_update(staged, commited):
return operations.create_diff(staged, commited)
return change.create_diff(staged, commited)
def test_create_diff_with_empty_commited(full_diff):
@@ -98,7 +98,7 @@ def conn_graph():
def test_stage_changes(resources, conn_graph):
commited = {}
log = operations._stage_changes(resources, conn_graph, commited, [])
log = change._stage_changes(resources, conn_graph, commited, [])
assert len(log) == 3
assert [l.res for l in log] == ['n.1', 'r.1', 'h.1']

View File

@@ -0,0 +1,50 @@
from pytest import fixture
import networkx as nx
from solar.orchestration import limits
@fixture
def dg():
ex = nx.DiGraph()
ex.add_node('t1', status='PENDING', target='1',
resource_type='node', type_limit=2)
ex.add_node('t2', status='PENDING', target='1',
resource_type='node', type_limit=2)
ex.add_node('t3', status='PENDING', target='1',
resource_type='node', type_limit=2)
return ex
def test_target_rule(dg):
    assert limits.target_based_rule(dg, [], 't1')
    assert not limits.target_based_rule(dg, ['t1'], 't2')
def test_type_limit_rule(dg):
    assert limits.type_based_rule(dg, ['t1'], 't2')
    assert not limits.type_based_rule(dg, ['t1', 't2'], 't3')
def test_items_rule(dg):
    assert limits.items_rule(dg, ['1']*99, '2')
    assert not limits.items_rule(dg, ['1']*99, '2', limit=10)
@fixture
def target_dg():
ex = nx.DiGraph()
ex.add_node('t1', status='PENDING', target='1')
ex.add_node('t2', status='PENDING', target='1')
return ex
def test_filtering_chain(target_dg):
chain = limits.get_default_chain(target_dg, [], ['t1', 't2'])
assert list(chain) == ['t1']

View File

@@ -1,74 +0,0 @@
import pytest
from solar.core import resource
from solar import operations
from solar import state
@pytest.fixture
def default_resources():
from solar.core import signals
from solar.core import resource
node1 = resource.wrap_resource(
{'id': 'node1',
'input': {'ip': {'value':'10.0.0.3'}}})
rabbitmq_service1 = resource.wrap_resource(
{'id':'rabbitmq',
'input': {
'ip' : {'value': ''},
'image': {'value': 'rabbitmq:3-management'}}})
signals.connect(node1, rabbitmq_service1)
return resource.load_all()
@pytest.mark.usefixtures("default_resources")
def test_changes_on_update_image():
log = operations.stage_changes()
assert len(log) == 2
operations.commit_changes()
rabbitmq = resource.load('rabbitmq')
rabbitmq.update({'image': 'different'})
log = operations.stage_changes()
assert len(log) == 1
item = log.items[0]
assert item.diff == [
('change', u'input.image.value',
(u'rabbitmq:3-management', u'different')),
('change', u'metadata.input.image.value',
(u'rabbitmq:3-management', u'different'))]
assert item.action == 'update'
operations.commit_changes()
commited = state.CD()
assert commited['rabbitmq']['input']['image'] == {
u'emitter': None, u'value': u'different'}
reverse = operations.rollback(state.CL().items[-1])
assert reverse.diff == [
('change', u'input.image.value',
(u'different', u'rabbitmq:3-management')),
('change', u'metadata.input.image.value',
(u'different', u'rabbitmq:3-management'))]
operations.commit_changes()
commited = state.CD()
assert commited['rabbitmq']['input']['image'] == {
u'emitter': None, u'value': u'rabbitmq:3-management'}

View File

@@ -0,0 +1,56 @@
import networkx as nx
from pytest import fixture
from solar.orchestration.traversal import traverse
@fixture
def tasks():
return [
{'id': 't1', 'status': 'PENDING'},
{'id': 't2', 'status': 'PENDING'},
{'id': 't3', 'status': 'PENDING'},
{'id': 't4', 'status': 'PENDING'},
{'id': 't5', 'status': 'PENDING'}]
@fixture
def dg(tasks):
ex = nx.DiGraph()
for t in tasks:
ex.add_node(t['id'], status=t['status'])
return ex
def test_parallel(dg):
dg.add_path(['t1', 't3', 't4', 't5'])
dg.add_path(['t2', 't3'])
assert set(traverse(dg)) == {'t1', 't2'}
def test_walked_only_when_all_predecessors_visited(dg):
dg.add_path(['t1', 't3', 't4', 't5'])
dg.add_path(['t2', 't3'])
dg.node['t1']['status'] = 'SUCCESS'
dg.node['t2']['status'] = 'INPROGRESS'
assert set(traverse(dg)) == set()
dg.node['t2']['status'] = 'SUCCESS'
assert set(traverse(dg)) == {'t3'}
def test_nothing_will_be_walked_if_parent_is_skipped(dg):
dg.add_path(['t1', 't2', 't3', 't4', 't5'])
dg.node['t1']['status'] = 'SKIPPED'
assert set(traverse(dg)) == set()
def test_node_will_be_walked_if_parent_is_noop(dg):
dg.add_path(['t1', 't2', 't3', 't4', 't5'])
dg.node['t1']['status'] = 'NOOP'
assert set(traverse(dg)) == {'t2'}

View File

@@ -1,169 +0,0 @@
import pytest
from solar.core import signals
from solar.core import resource
from solar import operations
@pytest.fixture
def resources():
node1 = resource.wrap_resource(
{'id': 'node1',
'input': {'ip': {'value': '10.0.0.3'}}})
mariadb_service1 = resource.wrap_resource(
{'id': 'mariadb',
'input': {
'port' : {'value': 3306},
'ip': {'value': ''}}})
keystone_db = resource.wrap_resource(
{'id':'keystone_db',
'input': {
'login_port' : {'value': ''},
'ip': {'value': ''}}})
signals.connect(node1, mariadb_service1)
signals.connect(node1, keystone_db)
signals.connect(mariadb_service1, keystone_db, {'port': 'login_port'})
return resource.load_all()
def test_update_port_on_mariadb(resources):
operations.stage_changes()
operations.commit_changes()
mariadb = resources['mariadb']
mariadb.update({'port': 4400})
log = operations.stage_changes()
assert len(log) == 2
mariadb_log = log.items[0]
assert mariadb_log.diff == [
('change', u'input.port.value', (3306, 4400)),
('change', u'metadata.input.port.value', (3306, 4400))]
keystone_db_log = log.items[1]
assert keystone_db_log.diff == [
('change', u'input.login_port.value', (3306, 4400)),
('change', u'metadata.input.login_port.value', (3306, 4400))]
@pytest.fixture
def simple_input():
res1 = resource.wrap_resource(
{'id': 'res1',
'input': {'ip': {'value': '10.10.0.2'}}})
res2 = resource.wrap_resource(
{'id': 'res2',
'input': {'ip': {'value': '10.10.0.3'}}})
signals.connect(res1, res2)
return resource.load_all()
def test_update_simple_resource(simple_input):
operations.stage_changes()
operations.commit_changes()
res1 = simple_input['res1']
res1.update({'ip': '10.0.0.3'})
log = operations.stage_changes()
assert len(log) == 2
assert log.items[0].diff == [
('change', u'input.ip.value', ('10.10.0.2', '10.0.0.3')),
('change', 'metadata.input.ip.value', ('10.10.0.2', '10.0.0.3')),
]
assert log.items[1].diff == [
('change', u'input.ip.value', ('10.10.0.2', '10.0.0.3')),
('change', 'metadata.input.ip.value', ('10.10.0.2', '10.0.0.3')),
]
operations.commit_changes()
assert simple_input['res1'].args_dict() == {
'ip': '10.0.0.3',
}
assert simple_input['res2'].args_dict() == {
'ip': '10.0.0.3',
}
log_item = operations.rollback_last()
assert log_item.diff == [
('change', u'input.ip.value', (u'10.0.0.3', u'10.10.0.2')),
('change', 'metadata.input.ip.value', ('10.0.0.3', '10.10.0.2')),
]
res2 = resource.load('res2')
assert res2.args_dict() == {
'ip': '10.10.0.2',
}
@pytest.fixture
def list_input():
res1 = resource.wrap_resource(
{'id': 'res1',
'input': {'ip': {'value': '10.10.0.2'}}})
res2 = resource.wrap_resource(
{'id': 'res2',
'input': {'ip': {'value': '10.10.0.3'}}})
consumer = resource.wrap_resource(
{'id': 'consumer',
'input':
{'ips': {'value': [],
'schema': ['str']}}})
signals.connect(res1, consumer, {'ip': 'ips'})
signals.connect(res2, consumer, {'ip': 'ips'})
return resource.load_all()
def test_update_list_resource(list_input):
operations.stage_changes()
operations.commit_changes()
res3 = resource.wrap_resource(
{'id': 'res3',
'input': {'ip': {'value': '10.10.0.4'}}})
signals.connect(res3, list_input['consumer'], {'ip': 'ips'})
log = operations.stage_changes()
assert len(log) == 2
assert log.items[0].res == res3.name
assert log.items[1].diff == [
('add', u'connections', [(2, ['res3', u'consumer', ['ip', 'ips']])]),
('add', u'input.ips', [
(2, {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'})]),
('add', u'metadata.input.ips.value',
[(2, {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'})])]
operations.commit_changes()
assert list_input['consumer'].args_dict() == {
u'ips': [
{u'emitter_attached_to': u'res1', u'emitter': u'ip', u'value': u'10.10.0.2'},
{u'emitter_attached_to': u'res2', u'emitter': u'ip', u'value': u'10.10.0.3'},
{u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'}]}
log_item = operations.rollback_last()
assert log_item.diff == [
('remove', u'connections', [(2, ['res3', u'consumer', ['ip', 'ips']])]),
('remove', u'input.ips', [
(2, {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'})]),
('remove', u'metadata.input.ips.value',
[(2, {u'emitter_attached_to': u'res3', u'emitter': u'ip', u'value': u'10.10.0.4'})])]
consumer = resource.load('consumer')
assert consumer.args_dict() == {
u'ips': [{u'emitter': u'ip',
u'emitter_attached_to': u'res1',
u'value': u'10.10.0.2'},
{u'emitter': u'ip',
u'emitter_attached_to': u'res2',
u'value': u'10.10.0.3'}]}

View File

@@ -0,0 +1,2 @@
-r requirements.txt
mock