Merge "Implement timelimit mechanism based on tracking of tasks"

Jenkins 2016-01-26 15:46:52 +00:00 committed by Gerrit Code Review
commit 0d36cb271e
14 changed files with 198 additions and 29 deletions

View File

@@ -33,6 +33,7 @@ C.log_file = 'solar.log'
 C.system_log_address = 'ipc:///tmp/solar_system_log'
 C.tasks_address = 'ipc:///tmp/solar_tasks'
 C.scheduler_address = 'ipc:///tmp/solar_scheduler'
+C.timewatcher_address = 'ipc:///tmp/solar_timewatcher'


 def _lookup_vals(setter, config, prefix=None):

View File

@@ -1041,6 +1041,7 @@ class Task(Model):
     task_type = Field(basestring)
     args = Field(list)
     errmsg = Field(basestring, default=str)
+    timelimit = Field(int, default=int)
     execution = IndexedField(basestring)
     parents = ParentField(default=list)
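
The new field follows the callable-default pattern used by errmsg above: Field(int, default=int) yields int() == 0 when timelimit was never set, and 0 is what the scheduler and puller below read as "no limit". A one-line illustration of the pattern (not repo code):

# Callable defaults produce the type's zero value.
assert int() == 0    # unset timelimit -> 0, i.e. no limit
assert str() == ''   # unset errmsg -> empty string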

View File

@@ -27,6 +27,7 @@ SCHEDULER_CLIENT = Client(C.scheduler_address)
 def construct_scheduler(tasks_address, scheduler_address):
     scheduler = wscheduler.Scheduler(Client(tasks_address))
+    scheduler_executor = Executor(scheduler, scheduler_address)
     scheduler.for_all.before(lambda ctxt: ModelMeta.session_start())
     scheduler.for_all.after(lambda ctxt: ModelMeta.session_end())
-    Executor(scheduler, scheduler_address).run()
+    scheduler_executor.run()
@@ -44,6 +45,8 @@ def construct_tasks(system_log_address, tasks_address, scheduler_address):
     scheduler = wscheduler.SchedulerCallbackClient(
         Client(scheduler_address))
     tasks = Tasks()
+    tasks_executor = Executor(tasks, tasks_address)
+    tasks.for_all.before(tasks_executor.register)
     tasks.for_all.on_success(syslog.commit)
     tasks.for_all.on_error(syslog.error)
     tasks.for_all.on_success(scheduler.update)
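
The new tasks_executor.register before-hook is what ties a task_id to the greenlet that serves it, so a later kill request can target exactly that greenlet. A minimal sketch of what a for_all.before hook chain amounts to (simplified and hypothetical, not the repo's exact hook implementation):

def dispatch(handler, before_hooks, ctxt, *args):
    # Every incoming call runs the before-hooks with ctxt first, which
    # is how tasks_executor.register sees each task before its body runs.
    for hook in before_hooks:
        hook(ctxt)               # e.g. tasks_executor.register(ctxt)
    return handler(ctxt, *args)  # e.g. Tasks.sleep(ctxt, seconds)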

View File

@@ -20,19 +20,22 @@ import zerorpc
 from solar.core.log import log


-class ImprovedPuller(zerorpc.Puller):
+class PoolBasedPuller(zerorpc.Puller):
     """PoolBasedPuller allows controlling a pool of gevent threads and
     tracking assignments of gevent threads
     """

     def __init__(self, pool_size=100, *args, **kwargs):
         # TODO put pool_size into config for each worker
         self._tasks_pool = gevent.pool.Pool(pool_size)
-        super(ImprovedPuller, self).__init__(*args, **kwargs)
+        super(PoolBasedPuller, self).__init__(*args, **kwargs)

     def _receiver(self):
         while True:
             event = self._events.recv()
-            self._tasks_pool.spawn(self._async_event, event)
+            self._handle_event(event)
+
+    def _handle_event(self, event):
+        self._tasks_pool.spawn(self._async_event, event)

     def _async_event(self, event):
         try:
@@ -55,11 +58,26 @@ class ImprovedPuller(zerorpc.Puller):
     def run(self):
         try:
-            super(ImprovedPuller, self).run()
+            super(PoolBasedPuller, self).run()
         finally:
             self._tasks_pool.join(raise_error=True)


+class LimitedExecutionPuller(PoolBasedPuller):
+
+    def _handle_event(self, event):
+        ctxt = event.args[0]
+        timelimit = ctxt.get('timelimit', 0)
+        if timelimit and 'kill' in self._methods:
+            # The greenlet that interrupts pool-based greenlets must
+            # not share a pool with them; otherwise, with only a few
+            # greenlets in the pool, the interrupter itself might
+            # never be scheduled.
+            gevent.spawn_later(
+                timelimit, self._methods['kill'], ctxt, ctxt['task_id'])
+        self._tasks_pool.spawn(self._async_event, event)
+
+
 class Executor(object):

     def __init__(self, worker, bind_to):
@@ -79,7 +97,7 @@ class Executor(object):
         self._tasks_register.pop(task_id)

     def run(self):
-        server = ImprovedPuller(methods=self.worker)
+        server = LimitedExecutionPuller(methods=self.worker)
         server.bind(self.bind_to)
         server.run()
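
Taken together, the pieces above implement the timelimit mechanism: the executor registers the greenlet serving each task_id, LimitedExecutionPuller arms a watchdog with gevent.spawn_later outside the task pool, and the watchdog invokes kill, which raises ExecutionTimeout inside the task's greenlet. A runnable, self-contained sketch of that flow (assumed, simplified names; not the repo's exact code):

import gevent


class ExecutionTimeout(Exception):
    """Stand-in for solar.errors.ExecutionTimeout."""


class MiniExecutor(object):
    """Simplified register/kill pair mirroring Executor above."""

    def __init__(self):
        self._tasks_register = {}  # task_id -> greenlet serving it

    def register(self, ctxt):
        self._tasks_register[ctxt['task_id']] = gevent.getcurrent()

    def kill(self, task_id, exc):
        if task_id in self._tasks_register:
            self._tasks_register.pop(task_id).kill(exc, block=False)


executor = MiniExecutor()


def task(ctxt):
    executor.register(ctxt)  # in solar this happens via a before-hook
    try:
        gevent.sleep(10)     # stands in for the 10-second sleep task
    except ExecutionTimeout:
        return 'killed'
    return 'finished'


g = gevent.spawn(task, {'task_id': 'plan:t1', 'timelimit': 1})
# The watchdog lives outside the task pool, as LimitedExecutionPuller does.
gevent.spawn_later(1, executor.kill, 'plan:t1', ExecutionTimeout)
g.join()
assert g.value == 'killed'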

View File

@@ -39,7 +39,8 @@ def save_graph(graph):
             'target': graph.node[n].get('target', '') or '',
             'task_type': graph.node[n].get('type', ''),
             'args': graph.node[n].get('args', []),
-            'errmsg': graph.node[n].get('errmsg', '') or ''})
+            'errmsg': graph.node[n].get('errmsg', '') or '',
+            'timelimit': graph.node[n].get('timelimit', 0)})
         graph.node[n]['task'] = t
         for pred in graph.predecessors(n):
             pred_task = graph.node[pred]['task']
@@ -77,7 +78,8 @@ def get_graph(uid):
             type=t.task_type, args=t.args,
             target=t.target or None,
             errmsg=t.errmsg or None,
-            task=t)
+            task=t,
+            timelimit=t.timelimit or 0)
         for u in t.parents.all_names():
             dg.add_edge(u, t.name)
     return dg

View File

@@ -1 +1,20 @@
+#
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+from solar.orchestration.workers.scheduler import Scheduler
+from solar.orchestration.workers.scheduler import SchedulerCallbackClient
+from solar.orchestration.workers.system_log import SystemLog
+from solar.orchestration.workers.tasks import Tasks

View File

@@ -46,10 +46,19 @@ class Scheduler(base.Worker):
                 task_id = '{}:{}'.format(dg.graph['uid'], task_name)
                 task_type = dg.node[task_name]['type']
                 dg.node[task_name]['status'] = 'INPROGRESS'
-                ctxt = {'task_id': task_id, 'task_name': task_name}
+                timelimit = dg.node[task_name].get('timelimit', 0)
+                ctxt = {
+                    'task_id': task_id,
+                    'task_name': task_name,
+                    'plan_uid': plan_uid,
+                    'timelimit': timelimit}
                 self._tasks(
                     task_type, ctxt,
                     *dg.node[task_name]['args'])
+                if timelimit:
+                    log.debug(
+                        'Timelimit for task %s will be %s',
+                        task_id, timelimit)
             graph.update_graph(dg)
             log.debug('Scheduled tasks %r', rst)
             # process tasks with tasks client
@@ -64,6 +73,9 @@ class Scheduler(base.Worker):
             graph.update_graph(dg)

     def update_next(self, ctxt, status, errmsg):
+        log.debug(
+            'Received update for TASK %s - %s %s',
+            ctxt['task_id'], status, errmsg)
         plan_uid, task_name = ctxt['task_id'].rsplit(':', 1)
         with Lock(plan_uid, str(get_current_ident()), retries=20, wait=1):
             dg = graph.get_graph(plan_uid)
@@ -75,10 +87,19 @@ class Scheduler(base.Worker):
                 task_id = '{}:{}'.format(dg.graph['uid'], task_name)
                 task_type = dg.node[task_name]['type']
                 dg.node[task_name]['status'] = 'INPROGRESS'
-                ctxt = {'task_id': task_id, 'task_name': task_name}
+                timelimit = dg.node[task_name].get('timelimit', 0)
+                ctxt = {
+                    'task_id': task_id,
+                    'task_name': task_name,
+                    'plan_uid': plan_uid,
+                    'timelimit': timelimit}
                 self._tasks(
                     task_type, ctxt,
                     *dg.node[task_name]['args'])
+                if timelimit:
+                    log.debug(
+                        'Timelimit for task %s will be %s',
+                        task_id, timelimit)
             graph.update_graph(dg)
             log.debug('Scheduled tasks %r', rst)
             return rst
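
For concreteness, the context that both scheduling paths now emit carries everything the puller and the kill handler need (values below are illustrative, not captured output):

# Example ctxt built by Scheduler.next / Scheduler.update_next:
ctxt = {
    'task_id': '<plan_uid>:t1',  # '{plan_uid}:{task_name}'
    'task_name': 't1',
    'plan_uid': '<plan_uid>',
    'timelimit': 1,              # seconds; 0 means no limit
}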

View File

@@ -17,13 +17,17 @@ import time
 from solar.core import actions
 from solar.core.log import log
 from solar.core import resource
+from solar.errors import ExecutionTimeout
 from solar.orchestration.workers import base


 class Tasks(base.Worker):

     def sleep(self, ctxt, seconds):
-        return time.sleep(seconds)
+        log.debug('Received sleep for %s', seconds)
+        time.sleep(seconds)
+        log.debug('Finished sleep %s', seconds)
+        return None

     def error(self, ctxt, message):
         raise Exception(message)
@@ -36,3 +40,10 @@ class Tasks(base.Worker):
             resource_name, action)
         res = resource.load(resource_name)
         return actions.resource_action(res, action)
+
+    def kill(self, ctxt, task_id):
+        log.debug('Received kill request for task_id %s', task_id)
+        if not hasattr(self._executor, 'kill'):
+            raise NotImplementedError(
+                "Current executor doesn't support interrupting tasks")
+        self._executor.kill(task_id, ExecutionTimeout)

View File

@@ -109,6 +109,11 @@ def two_path_plan():
     return plan_from_fixture('two_path')


+@pytest.fixture
+def timelimit_plan():
+    return plan_from_fixture('timelimit')
+
+
 @pytest.fixture
 def sequence_vr(tmpdir):
     base_path = os.path.join(

View File

@@ -22,3 +22,18 @@ import pytest
 def address():
     return 'ipc:///tmp/solar_test_' + ''.join(
         (random.choice(string.ascii_lowercase) for x in xrange(4)))
+
+
+@pytest.fixture
+def tasks_address(address):
+    return address + 'tasks'
+
+
+@pytest.fixture
+def system_log_address(address):
+    return address + 'system_log'
+
+
+@pytest.fixture
+def scheduler_address(address):
+    return address + 'scheduler'

View File

@@ -19,6 +19,7 @@ import pytest
 from solar.core.resource import composer
 from solar.dblayer.model import clear_cache
+from solar.errors import ExecutionTimeout
 from solar import orchestration
 from solar.orchestration.graph import wait_finish
 from solar.orchestration.traversal import states
@@ -26,21 +27,6 @@ from solar.system_log import change
 from solar.system_log import data


-@pytest.fixture
-def tasks_address(address):
-    return address + 'tasks'
-
-
-@pytest.fixture
-def system_log_address(address):
-    return address + 'system_log'
-
-
-@pytest.fixture
-def scheduler_address(address):
-    return address + 'scheduler'
-
-
 @pytest.fixture
 def scheduler_client(scheduler_address):
     return orchestration.Client(scheduler_address)
@@ -78,16 +64,19 @@ def resources(request, sequence_vr):
 @pytest.mark.parametrize('scale', [10])
 def test_concurrent_sequences_with_no_handler(scale, scheduler_client):
     total_resources = scale * 3
-    timeout = scale
+    timeout = scale * 2
     assert len(change.stage_changes()) == total_resources
     plan = change.send_to_orchestration()
     scheduler_client.next({}, plan.graph['uid'])

     def wait_function(timeout):
-        for summary in wait_finish(plan.graph['uid'], timeout):
-            assert summary[states.ERROR.name] == 0
-            time.sleep(0.5)
+        try:
+            for summary in wait_finish(plan.graph['uid'], timeout):
+                assert summary[states.ERROR.name] == 0
+                time.sleep(0.5)
+        except ExecutionTimeout:
+            pass
         return summary
     waiter = gevent.spawn(wait_function, timeout)
     waiter.join(timeout=timeout)

View File

@@ -52,6 +52,7 @@ def scheduler(tasks_for_scheduler, address):
     worker.for_all.before(session_start)
     worker.for_all.after(session_end)
     executor = zerorpc_executor.Executor(worker, address)
+    gevent.spawn(executor.run)
     return worker, zerorpc_executor.Client(address)

View File

@@ -0,0 +1,71 @@
+#
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+import time
+
+import gevent
+import pytest
+
+from solar.dblayer import ModelMeta
+from solar.errors import ExecutionTimeout
+from solar.orchestration import Client
+from solar.orchestration import Executor
+from solar.orchestration import graph
+from solar.orchestration.traversal import states
+from solar.orchestration import workers
+
+
+@pytest.fixture(autouse=True)
+def scheduler(scheduler_address, tasks_address):
+    scheduler = workers.Scheduler(Client(tasks_address))
+    scheduler_executor = Executor(scheduler, scheduler_address)
+    scheduler.for_all.before(lambda ctxt: ModelMeta.session_start())
+    scheduler.for_all.after(lambda ctxt: ModelMeta.session_end())
+    gevent.spawn(scheduler_executor.run)
+
+
+@pytest.fixture(autouse=True)
+def tasks(tasks_address, scheduler_address):
+    scheduler = workers.SchedulerCallbackClient(
+        Client(scheduler_address))
+    tasks = workers.Tasks()
+    tasks_executor = Executor(tasks, tasks_address)
+    tasks.for_all.before(tasks_executor.register)
+    tasks.for_all.on_success(scheduler.update)
+    tasks.for_all.on_error(scheduler.error)
+    gevent.spawn(tasks_executor.run)
+
+
+@pytest.fixture
+def scheduler_client(scheduler_address):
+    return Client(scheduler_address)
+
+
+def test_timelimit_plan(timelimit_plan, scheduler_client):
+    scheduler_client.next({}, timelimit_plan.graph['uid'])
+
+    def wait_function(timeout):
+        try:
+            for summary in graph.wait_finish(
+                    timelimit_plan.graph['uid'], timeout):
+                time.sleep(0.5)
+        except ExecutionTimeout:
+            return summary
+    waiter = gevent.spawn(wait_function, 3)
+    waiter.join(timeout=3)
+    finished_plan = graph.get_graph(timelimit_plan.graph['uid'])
+    assert 'ExecutionTimeout' in finished_plan.node['t1']['errmsg']
+    assert finished_plan.node['t2']['status'] == states.PENDING.name

View File

@@ -0,0 +1,12 @@
+name: timelimit
+tasks:
+  - uid: t1
+    parameters:
+      type: sleep
+      args: [10]
+      timelimit: 1
+  - uid: t2
+    parameters:
+      type: echo
+      args: ['message']
+    after: [t1]
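
Read together with the test above, the fixture encodes the expected behavior: t1 sleeps for 10 seconds but carries a 1-second timelimit, so it is killed with ExecutionTimeout, while t2, which only runs after t1, stays PENDING. A hypothetical programmatic equivalent of the fixture (assuming plan nodes carry the same keys save_graph persists; networkx is used for plans elsewhere in this diff):

import networkx as nx

# Node attributes mirror the keys save_graph() reads: type, args, timelimit.
dg = nx.DiGraph(name='timelimit')
dg.add_node('t1', type='sleep', args=[10], timelimit=1)
dg.add_node('t2', type='echo', args=['message'], timelimit=0)
dg.add_edge('t1', 't2')  # 'after: [t1]' becomes the edge t1 -> t2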