Plugable constructors, runners and hooks

Constructors responsible for preparing worker with executor
to be run by runner
Runner acts like a supervisor for executor/executors, in a
simplest scenario it just runs executor in the same thread

Related to blueprint stevedorize-orchestration

Change-Id: If37c3f6be340b7108ac5d672110071238059bd95
This commit is contained in:
Dmitry Shulyak 2016-01-28 14:13:21 +02:00
parent e0e6e0eb73
commit 1b4e1da872
11 changed files with 115 additions and 56 deletions

View File

@ -69,3 +69,16 @@ solar.orchestration.drivers.scheduler =
solar = solar.orchestration.workers.scheduler:Scheduler solar = solar.orchestration.workers.scheduler:Scheduler
solar.orchestration.drivers.system_log = solar.orchestration.drivers.system_log =
solar = solar.orchestration.workers.system_log:SystemLog solar = solar.orchestration.workers.system_log:SystemLog
solar.orchestration.hooks.tasks.construct =
scheduler_sub = solar.orchestration.workers.scheduler:tasks_subscribe
system_log_sub = solar.orchestration.workers.system_log:tasks_subscribe
solar.orchestration.hooks.system_log.construct =
session_sub = solar.orchestration:wrap_session
solar.orchestration.hooks.scheduler.construct =
session_sub = solar.orchestration:wrap_session
solar.orchestration.runners =
gevent = solar.orchestration.runners.gevent_runner:run_all
solar.orchestration.constructors =
tasks = solar.orchestration:construct_tasks
system_log = solar.orchestration:construct_system_log
scheduler = solar.orchestration:construct_scheduler

View File

@ -38,6 +38,7 @@ C.executor = 'zerorpc'
C.tasks_driver = 'solar' C.tasks_driver = 'solar'
C.scheduler_driver = 'solar' C.scheduler_driver = 'solar'
C.system_log_driver = 'solar' C.system_log_driver = 'solar'
C.runner = 'gevent'
def _lookup_vals(setter, config, prefix=None): def _lookup_vals(setter, config, prefix=None):

View File

@ -17,58 +17,42 @@ from solar.core.log import log
from solar.dblayer import ModelMeta from solar.dblayer import ModelMeta
from solar.orchestration import extensions as loader from solar.orchestration import extensions as loader
from solar.orchestration.executors import Executor from solar.orchestration.executors import Executor
from solar.orchestration.workers.scheduler import SchedulerCallbackClient
SCHEDULER_CLIENT = loader.get_client('scheduler') SCHEDULER_CLIENT = loader.get_client('scheduler')
def wrap_session(extension, clients):
log.debug('DB session for %r', extension)
extension.for_all.before(lambda ctxt: ModelMeta.session_start())
extension.for_all.after(lambda ctxt: ModelMeta.session_end())
def construct_scheduler(extensions, clients): def construct_scheduler(extensions, clients):
scheduler = extensions['scheduler'] scheduler = extensions['scheduler']
loader.load_contruct_hooks('scheduler', extensions, clients)
scheduler_executor = Executor( scheduler_executor = Executor(
scheduler, clients['scheduler'].connect_to) scheduler, clients['scheduler'].connect_to)
scheduler.for_all.before(lambda ctxt: ModelMeta.session_start())
scheduler.for_all.after(lambda ctxt: ModelMeta.session_end())
scheduler_executor.run() scheduler_executor.run()
def construct_system_log(extensions, clients): def construct_system_log(extensions, clients):
syslog = extensions['system_log'] syslog = extensions['system_log']
syslog.for_all.before(lambda ctxt: ModelMeta.session_start()) loader.load_contruct_hooks('system_log', extensions, clients)
syslog.for_all.after(lambda ctxt: ModelMeta.session_end())
Executor(syslog, clients['system_log'].connect_to).run() Executor(syslog, clients['system_log'].connect_to).run()
def construct_tasks(extensions, clients): def construct_tasks(extensions, clients):
syslog = clients['system_log']
# FIXME will be solved by hooks on certain events
# solar.orchestraion.extensions.tasks.before =
# 1 = solar.orchestration.workers.scheduler:subscribe
scheduler = SchedulerCallbackClient(clients['scheduler'])
tasks = extensions['tasks'] tasks = extensions['tasks']
loader.load_contruct_hooks('tasks', extensions, clients)
tasks_executor = Executor(tasks, clients['tasks'].connect_to) tasks_executor = Executor(tasks, clients['tasks'].connect_to)
tasks.for_all.before(tasks_executor.register_task) tasks.for_all.before(tasks_executor.register_task)
tasks.for_all.on_success(syslog.commit)
tasks.for_all.on_error(syslog.error)
tasks.for_all.on_success(scheduler.update)
tasks.for_all.on_error(scheduler.error)
tasks_executor.run() tasks_executor.run()
def main(): def main():
import sys runner = loader.get_runner(C.runner)
from gevent import spawn constructors = loader.get_constructors()
from gevent import joinall
clients = loader.get_clients() clients = loader.get_clients()
mgr = loader.get_extensions(clients) exts = loader.get_extensions(clients)
servers = [ runner.driver(constructors, exts, clients)
spawn(construct_scheduler, mgr, clients),
spawn(construct_system_log, mgr, clients),
spawn(construct_tasks, mgr, clients)
]
try:
log.info('Spawning scheduler, system log and tasks workers.')
joinall(servers)
except KeyboardInterrupt:
log.info('Exit solar-worker')
sys.exit()

View File

@ -74,3 +74,23 @@ def get_extensions(clients):
invoke_on_load=True, invoke_on_load=True,
invoke_args=(clients,)) invoke_args=(clients,))
return ext return ext
def load_contruct_hooks(name, extensions, clients):
extension.ExtensionManager(
namespace='solar.orchestration.hooks.{}.construct'.format(name),
invoke_on_load=True,
invoke_args=(extensions[name], clients))
def get_runner(name):
return driver.DriverManager(
namespace='solar.orchestration.runners',
name=name,
invoke_on_load=False)
def get_constructors():
return extension.ExtensionManager(
namespace='solar.orchestration.constructors',
invoke_on_load=False)

View File

View File

@ -0,0 +1,34 @@
#!/usr/bin/env python
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import gevent
from solar.core.log import log
def run_all(construct_manager, extensions, clients):
def _spawn(constructor, extensions, clients):
return gevent.spawn(constructor.plugin, extensions, clients)
try:
log.info('Spawning scheduler, system log and tasks workers.')
gevent.joinall(
construct_manager.map(_spawn, extensions, clients))
except KeyboardInterrupt:
log.info('Exit solar-worker')
sys.exit()

View File

@ -1,5 +1,4 @@
# # Copyright 2015 Mirantis, Inc.
# Copyright 2015 Mirantis, Inc.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -12,9 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
#
from solar.orchestration.workers.scheduler import Scheduler import solar.orchestration.workers.scheduler
from solar.orchestration.workers.scheduler import SchedulerCallbackClient import solar.orchestration.workers.system_log
from solar.orchestration.workers.system_log import SystemLog import solar.orchestration.workers.tasks
from solar.orchestration.workers.tasks import Tasks

View File

@ -140,3 +140,10 @@ class SchedulerCallbackClient(object):
def error(self, ctxt, result, *args, **kwargs): def error(self, ctxt, result, *args, **kwargs):
self.client.update_next(ctxt, states.ERROR.name, repr(result)) self.client.update_next(ctxt, states.ERROR.name, repr(result))
def tasks_subscribe(tasks, clients):
log.debug('Scheduler subscribes to tasks hooks')
scheduler = SchedulerCallbackClient(clients['scheduler'])
tasks.for_all.on_success(scheduler.update)
tasks.for_all.on_error(scheduler.error)

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from solar.core.log import log
from solar.orchestration.workers import base from solar.orchestration.workers import base
from solar.system_log.operations import move_to_commited from solar.system_log.operations import move_to_commited
from solar.system_log.operations import set_error from solar.system_log.operations import set_error
@ -24,3 +25,10 @@ class SystemLog(base.Worker):
def error(self, ctxt, *args, **kwargs): def error(self, ctxt, *args, **kwargs):
return set_error(ctxt['task_id'].rsplit(':', 1)[-1]) return set_error(ctxt['task_id'].rsplit(':', 1)[-1])
def tasks_subscribe(tasks, clients):
log.debug('System log subscribes to tasks hooks')
syslog = clients['system_log']
tasks.for_all.on_success(syslog.commit)
tasks.for_all.on_error(syslog.error)

View File

@ -18,7 +18,7 @@ import string
import gevent import gevent
import pytest import pytest
from solar.config import C
from solar.core.log import log from solar.core.log import log
from solar.dblayer.model import ModelMeta from solar.dblayer.model import ModelMeta
from solar.orchestration import executors from solar.orchestration import executors
@ -104,3 +104,13 @@ def clients(request):
@pytest.fixture @pytest.fixture
def extensions(clients): def extensions(clients):
return loader.get_extensions(clients) return loader.get_extensions(clients)
@pytest.fixture
def runner():
return loader.get_runner(C.runner)
@pytest.fixture
def constructors():
return loader.get_constructors()

View File

@ -33,24 +33,9 @@ def scheduler_client(scheduler_address):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def tasks(extensions, clients): def prepare_all(constructors, extensions, clients):
gevent.spawn( for cons in constructors:
orchestration.construct_tasks, gevent.spawn(cons.plugin, extensions, clients)
extensions, clients)
@pytest.fixture(autouse=True)
def scheduler(extensions, clients):
gevent.spawn(
orchestration.construct_scheduler,
extensions, clients)
@pytest.fixture(autouse=True)
def system_log(extensions, clients):
gevent.spawn(
orchestration.construct_system_log,
extensions, clients)
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
@ -61,7 +46,7 @@ def resources(request, sequence_vr):
'sequence_%s' % idx, sequence_vr, inputs={'idx': idx}) 'sequence_%s' % idx, sequence_vr, inputs={'idx': idx})
@pytest.mark.parametrize('scale', [10]) @pytest.mark.parametrize('scale', [3])
def test_concurrent_sequences_with_no_handler(scale, clients): def test_concurrent_sequences_with_no_handler(scale, clients):
total_resources = scale * 3 total_resources = scale * 3
timeout = scale * 2 timeout = scale * 2