diff --git a/setup.cfg b/setup.cfg index e0ebd438..801fc0aa 100644 --- a/setup.cfg +++ b/setup.cfg @@ -55,3 +55,17 @@ solar.handlers = naive_sync = solar.core.handlers.naive_sync:NaiveSync solar.orchestration.executors = zerorpc = solar.orchestration.executors.zerorpc_executor +solar.orchestration.extensions = + tasks = solar.orchestration.extensions:tasks + system_log = solar.orchestration.extensions:system_log + scheduler = solar.orchestration.extensions:scheduler +solar.orchestration.extensions_clients = + tasks = solar.orchestration.extensions:tasks_client + system_log = solar.orchestration.extensions:system_log_client + scheduler = solar.orchestration.extensions:scheduler_client +solar.orchestration.drivers.tasks = + solar = solar.orchestration.workers.tasks:Tasks +solar.orchestration.drivers.scheduler = + solar = solar.orchestration.workers.scheduler:Scheduler +solar.orchestration.drivers.system_log = + solar = solar.orchestration.workers.system_log:SystemLog diff --git a/solar/config.py b/solar/config.py index 6637b50f..ecbf81ca 100644 --- a/solar/config.py +++ b/solar/config.py @@ -35,6 +35,9 @@ C.tasks_address = 'ipc:///tmp/solar_tasks' C.scheduler_address = 'ipc:///tmp/solar_scheduler' C.timewatcher_address = 'ipc:///tmp/solar_timewatcher' C.executor = 'zerorpc' +C.tasks_driver = 'solar' +C.scheduler_driver = 'solar' +C.system_log_driver = 'solar' def _lookup_vals(setter, config, prefix=None): diff --git a/solar/orchestration/__init__.py b/solar/orchestration/__init__.py index 7841bc02..3aff3328 100644 --- a/solar/orchestration/__init__.py +++ b/solar/orchestration/__init__.py @@ -15,54 +15,56 @@ from solar.config import C from solar.core.log import log from solar.dblayer import ModelMeta -from solar.orchestration.executors import Client +from solar.orchestration import extensions as loader from solar.orchestration.executors import Executor -from solar.orchestration.workers import scheduler as wscheduler -from solar.orchestration.workers.system_log import SystemLog -from solar.orchestration.workers.tasks import Tasks +from solar.orchestration.workers.scheduler import SchedulerCallbackClient -SCHEDULER_CLIENT = Client(C.scheduler_address) +SCHEDULER_CLIENT = loader.get_client('scheduler') -def construct_scheduler(tasks_address, scheduler_address): - scheduler = wscheduler.Scheduler(Client(tasks_address)) - scheduler_executor = Executor(scheduler, scheduler_address) +def construct_scheduler(extensions, clients): + scheduler = extensions['scheduler'] + scheduler_executor = Executor( + scheduler, clients['scheduler'].connect_to) scheduler.for_all.before(lambda ctxt: ModelMeta.session_start()) scheduler.for_all.after(lambda ctxt: ModelMeta.session_end()) - Executor(scheduler, scheduler_address).run() + scheduler_executor.run() -def construct_system_log(system_log_address): - syslog = SystemLog() +def construct_system_log(extensions, clients): + syslog = extensions['system_log'] syslog.for_all.before(lambda ctxt: ModelMeta.session_start()) syslog.for_all.after(lambda ctxt: ModelMeta.session_end()) - Executor(syslog, system_log_address).run() + Executor(syslog, clients['system_log'].connect_to).run() -def construct_tasks(system_log_address, tasks_address, scheduler_address): - syslog = Client(system_log_address) - scheduler = wscheduler.SchedulerCallbackClient( - Client(scheduler_address)) - tasks = Tasks() - tasks_executor = Executor(tasks, tasks_address) +def construct_tasks(extensions, clients): + syslog = clients['system_log'] + # FIXME will be solved by hooks on certain events + # solar.orchestraion.extensions.tasks.before = + # 1 = solar.orchestration.workers.scheduler:subscribe + scheduler = SchedulerCallbackClient(clients['scheduler']) + tasks = extensions['tasks'] + tasks_executor = Executor(tasks, clients['tasks'].connect_to) tasks.for_all.before(tasks_executor.register_task) tasks.for_all.on_success(syslog.commit) tasks.for_all.on_error(syslog.error) tasks.for_all.on_success(scheduler.update) tasks.for_all.on_error(scheduler.error) - Executor(tasks, tasks_address).run() + tasks_executor.run() def main(): import sys from gevent import spawn from gevent import joinall + clients = loader.get_clients() + mgr = loader.get_extensions(clients) servers = [ - spawn(construct_scheduler, C.tasks_address, C.scheduler_address), - spawn(construct_system_log, C.system_log_address), - spawn(construct_tasks, C.system_log_address, C.tasks_address, - C.scheduler_address) + spawn(construct_scheduler, mgr, clients), + spawn(construct_system_log, mgr, clients), + spawn(construct_tasks, mgr, clients) ] try: log.info('Spawning scheduler, system log and tasks workers.') diff --git a/solar/orchestration/extensions.py b/solar/orchestration/extensions.py new file mode 100644 index 00000000..b31bc1bf --- /dev/null +++ b/solar/orchestration/extensions.py @@ -0,0 +1,76 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from functools import partial + +from stevedore import driver +from stevedore import extension + +from solar.config import C +from solar.orchestration.executors import Client + + +def client(address): + return Client(address) + + +tasks_client = partial(client, C.tasks_address) +scheduler_client = partial(client, C.scheduler_address) +system_log_client = partial(client, C.system_log_address) + + +def get_driver(extension, implementation): + mgr = driver.DriverManager( + namespace='solar.orchestration.drivers.%s' % extension, + name=implementation, + invoke_on_load=False, + ) + return mgr.driver + + +def tasks(clients): + return get_driver('tasks', C.tasks_driver)() + + +def scheduler(clients): + return get_driver('scheduler', C.scheduler_driver)(clients['tasks']) + + +def system_log(clients): + return get_driver('system_log', C.system_log_driver)() + + +class GetObjExtensionManager(extension.ExtensionManager): + + def __getitem__(self, name): + ext = super(GetObjExtensionManager, self).__getitem__(name) + return ext.obj + + +def get_clients(): + return GetObjExtensionManager( + namespace='solar.orchestration.extensions_clients', + invoke_on_load=True) + + +def get_client(name): + return get_clients()[name] + + +def get_extensions(clients): + ext = GetObjExtensionManager( + namespace='solar.orchestration.extensions', + invoke_on_load=True, + invoke_args=(clients,)) + return ext diff --git a/solar/test/functional/conftest.py b/solar/test/functional/conftest.py index becc659e..56b2ca8f 100644 --- a/solar/test/functional/conftest.py +++ b/solar/test/functional/conftest.py @@ -22,6 +22,7 @@ import pytest from solar.core.log import log from solar.dblayer.model import ModelMeta from solar.orchestration import executors +from solar.orchestration import extensions as loader from solar.orchestration import workers @@ -86,3 +87,20 @@ def tasks(request, tasks_address): gevent.spawn(executor.run) return worker, executors.Client(tasks_address) + + +@pytest.fixture +def clients(request): + rst = {} + rst['tasks'] = executors.Client(request.getfuncargvalue( + 'tasks_address')) + rst['scheduler'] = executors.Client(request.getfuncargvalue( + 'scheduler_address')) + rst['system_log'] = executors.Client(request.getfuncargvalue( + 'system_log_address')) + return rst + + +@pytest.fixture +def extensions(clients): + return loader.get_extensions(clients) diff --git a/solar/test/functional/test_complete_solar_workflow.py b/solar/test/functional/test_complete_solar_workflow.py index f6a0e0d5..66b509bb 100644 --- a/solar/test/functional/test_complete_solar_workflow.py +++ b/solar/test/functional/test_complete_solar_workflow.py @@ -33,24 +33,24 @@ def scheduler_client(scheduler_address): @pytest.fixture(autouse=True) -def tasks(system_log_address, tasks_address, scheduler_address): +def tasks(extensions, clients): gevent.spawn( orchestration.construct_tasks, - system_log_address, tasks_address, scheduler_address) + extensions, clients) @pytest.fixture(autouse=True) -def scheduler(tasks_address, scheduler_address): +def scheduler(extensions, clients): gevent.spawn( orchestration.construct_scheduler, - tasks_address, scheduler_address) + extensions, clients) @pytest.fixture(autouse=True) -def system_log(system_log_address): +def system_log(extensions, clients): gevent.spawn( orchestration.construct_system_log, - system_log_address) + extensions, clients) @pytest.fixture(autouse=True) @@ -62,9 +62,10 @@ def resources(request, sequence_vr): @pytest.mark.parametrize('scale', [10]) -def test_concurrent_sequences_with_no_handler(scale, scheduler_client): +def test_concurrent_sequences_with_no_handler(scale, clients): total_resources = scale * 3 timeout = scale * 2 + scheduler_client = clients['scheduler'] assert len(change.stage_changes()) == total_resources plan = change.send_to_orchestration()