From 671eada5577a17343dcb7f9131b2aa342e73e942 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Mon, 25 Jan 2016 14:32:24 +0200 Subject: [PATCH] Improve fixtures for functional tests Make workers fixtures reusable, the only test that shouldn't use this fixutes is test_complete_workflow - as it will use the same way of bootstraping solar as solar-worker cli script Create all zeromq ipc transports in tmpdir fixture, so that they will be cleaned up after test Change-Id: Icf8f2bfd7c9a71044adc5ac4048775b30e861294 --- solar/test/functional/conftest.py | 53 +++++++++++++++++- .../test_orchestration_scheduling.py | 56 +------------------ solar/test/functional/test_retries.py | 45 --------------- solar/test/functional/test_timelimit.py | 36 +----------- 4 files changed, 57 insertions(+), 133 deletions(-) diff --git a/solar/test/functional/conftest.py b/solar/test/functional/conftest.py index 72d91cfe..398e40e1 100644 --- a/solar/test/functional/conftest.py +++ b/solar/test/functional/conftest.py @@ -15,12 +15,19 @@ import random import string +import gevent import pytest +from solar.core.log import log +from solar.dblayer.model import ModelMeta +from solar.orchestration.executors import zerorpc_executor +from solar.orchestration import workers + + @pytest.fixture -def address(): - return 'ipc:///tmp/solar_test_' + ''.join( +def address(tmpdir): + return 'ipc:///%s/' % tmpdir + ''.join( (random.choice(string.ascii_lowercase) for x in xrange(4))) @@ -37,3 +44,45 @@ def system_log_address(address): @pytest.fixture def scheduler_address(address): return address + 'scheduler' + + +@pytest.fixture +def scheduler(request, scheduler_address): + tasks_client = None + + if 'tasks' in request.node.fixturenames: + tasks_client = zerorpc_executor.Client( + request.getfuncargvalue('tasks_address')) + + worker = workers.scheduler.Scheduler(tasks_client) + + def session_end(ctxt): + log.debug('Session end ID %s', id(gevent.getcurrent())) + ModelMeta.session_end() + + def session_start(ctxt): + log.debug('Session start ID %s', id(gevent.getcurrent())) + ModelMeta.session_start() + + worker.for_all.before(session_start) + worker.for_all.after(session_end) + + executor = zerorpc_executor.Executor(worker, scheduler_address) + gevent.spawn(executor.run) + return worker, zerorpc_executor.Client(scheduler_address) + + +@pytest.fixture +def tasks(request, tasks_address): + worker = workers.tasks.Tasks() + executor = zerorpc_executor.Executor(worker, tasks_address) + worker.for_all.before(executor.register) + if 'scheduler' in request.node.fixturenames: + scheduler_client = workers.scheduler.SchedulerCallbackClient( + zerorpc_executor.Client(request.getfuncargvalue( + 'scheduler_address'))) + worker.for_all.on_success(scheduler_client.update) + worker.for_all.on_error(scheduler_client.error) + + gevent.spawn(executor.run) + return worker, zerorpc_executor.Client(tasks_address) diff --git a/solar/test/functional/test_orchestration_scheduling.py b/solar/test/functional/test_orchestration_scheduling.py index 21d9d2d4..24c0f4fc 100644 --- a/solar/test/functional/test_orchestration_scheduling.py +++ b/solar/test/functional/test_orchestration_scheduling.py @@ -15,56 +15,6 @@ import time import gevent -import pytest - -from solar.core.log import log -from solar.dblayer.model import ModelMeta -from solar.orchestration.executors import zerorpc_executor -from solar.orchestration.workers import scheduler as wscheduler -from solar.orchestration.workers import tasks as wtasks - - -@pytest.fixture -def tasks_worker(): - return wtasks.Tasks() - - -@pytest.fixture -def tasks_for_scheduler(request, tasks_worker, address): - address = address + 'tasks' - executor = zerorpc_executor.Executor(tasks_worker, address) - gevent.spawn(executor.run) - return zerorpc_executor.Client(address) - - -@pytest.fixture -def scheduler(tasks_for_scheduler, scheduler_address): - address = scheduler_address - worker = wscheduler.Scheduler(tasks_for_scheduler) - - def session_end(ctxt): - log.debug('Session end ID %s', id(gevent.getcurrent())) - ModelMeta.session_end() - - def session_start(ctxt): - log.debug('Session start ID %s', id(gevent.getcurrent())) - ModelMeta.session_start() - - worker.for_all.before(session_start) - worker.for_all.after(session_end) - - executor = zerorpc_executor.Executor(worker, address) - gevent.spawn(executor.run) - return worker, zerorpc_executor.Client(address) - - -@pytest.fixture(autouse=True) -def setup_scheduler_callback(scheduler, tasks_worker): - worker, client = scheduler - scheduler_client = wscheduler.SchedulerCallbackClient( - zerorpc_executor.Client(client.connect_to)) - tasks_worker.for_all.on_success(scheduler_client.update) - tasks_worker.for_all.on_error(scheduler_client.update) def _wait_scheduling(plan, wait_time, waiter, client): @@ -73,7 +23,7 @@ def _wait_scheduling(plan, wait_time, waiter, client): waiter.join(timeout=wait_time) -def test_simple_fixture(simple_plan, scheduler): +def test_simple_fixture(simple_plan, scheduler, tasks): worker, client = scheduler scheduling_results = [] expected = [['echo_stuff'], ['just_fail'], []] @@ -89,7 +39,7 @@ def test_simple_fixture(simple_plan, scheduler): assert scheduling_results == expected -def test_sequential_fixture(sequential_plan, scheduler): +def test_sequential_fixture(sequential_plan, scheduler, tasks): worker, client = scheduler scheduling_results = set() expected = {('s1',), ('s2',), ('s3',), ()} @@ -105,7 +55,7 @@ def test_sequential_fixture(sequential_plan, scheduler): assert scheduling_results == expected -def test_two_path_fixture(two_path_plan, scheduler): +def test_two_path_fixture(two_path_plan, scheduler, tasks): worker, client = scheduler scheduling_results = set() expected = {'a', 'b', 'c', 'd', 'e'} diff --git a/solar/test/functional/test_retries.py b/solar/test/functional/test_retries.py index fcf4baf5..4335061b 100644 --- a/solar/test/functional/test_retries.py +++ b/solar/test/functional/test_retries.py @@ -18,13 +18,8 @@ import gevent import mock import pytest -from solar.core.log import log -from solar.dblayer.model import ModelMeta -from solar.orchestration.executors import zerorpc_executor from solar.orchestration import graph from solar.orchestration.traversal import states -from solar.orchestration.workers import scheduler as wscheduler -from solar.orchestration.workers import tasks as wtasks @pytest.fixture @@ -34,46 +29,6 @@ def simple_plan_retries(simple_plan): return simple_plan -@pytest.fixture -def scheduler(request, scheduler_address): - tasks_client = None - if 'tasks' in request.node.fixturenames: - tasks_client = zerorpc_executor.Client( - request.getfuncargvalue('tasks_address')) - worker = wscheduler.Scheduler(tasks_client) - - def session_end(ctxt): - log.debug('Session end ID %s', id(gevent.getcurrent())) - ModelMeta.session_end() - - def session_start(ctxt): - log.debug('Session start ID %s', id(gevent.getcurrent())) - ModelMeta.session_start() - - worker.for_all.before(session_start) - worker.for_all.after(session_end) - - executor = zerorpc_executor.Executor(worker, scheduler_address) - gevent.spawn(executor.run) - return worker, zerorpc_executor.Client(scheduler_address) - - -@pytest.fixture -def tasks(request, tasks_address): - worker = wtasks.Tasks() - executor = zerorpc_executor.Executor(worker, tasks_address) - - if 'scheduler' in request.node.fixturenames: - scheduler_client = wscheduler.SchedulerCallbackClient( - zerorpc_executor.Client(request.getfuncargvalue( - 'scheduler_address'))) - worker.for_all.on_success(scheduler_client.update) - worker.for_all.on_error(scheduler_client.error) - - gevent.spawn(executor.run) - return worker, zerorpc_executor.Client(tasks_address) - - def test_retry_just_fail(scheduler, tasks, simple_plan_retries): timeout = 3 plan = simple_plan_retries diff --git a/solar/test/functional/test_timelimit.py b/solar/test/functional/test_timelimit.py index 01f4a2ae..422e4c7d 100644 --- a/solar/test/functional/test_timelimit.py +++ b/solar/test/functional/test_timelimit.py @@ -17,45 +17,15 @@ import time import gevent -import pytest -from solar.dblayer import ModelMeta from solar.errors import ExecutionTimeout -from solar.orchestration import Client -from solar.orchestration import Executor from solar.orchestration import graph from solar.orchestration.traversal import states -from solar.orchestration import workers -@pytest.fixture(autouse=True) -def scheduler(scheduler_address, tasks_address): - scheduler = workers.Scheduler(Client(tasks_address)) - scheduler_executor = Executor(scheduler, scheduler_address) - scheduler.for_all.before(lambda ctxt: ModelMeta.session_start()) - scheduler.for_all.after(lambda ctxt: ModelMeta.session_end()) - gevent.spawn(scheduler_executor.run) - - -@pytest.fixture(autouse=True) -def tasks(tasks_address, scheduler_address): - scheduler = workers.SchedulerCallbackClient( - Client(scheduler_address)) - tasks = workers.Tasks() - tasks_executor = Executor(tasks, tasks_address) - tasks.for_all.before(tasks_executor.register) - tasks.for_all.on_success(scheduler.update) - tasks.for_all.on_error(scheduler.error) - gevent.spawn(tasks_executor.run) - - -@pytest.fixture -def scheduler_client(scheduler_address): - return Client(scheduler_address) - - -def test_timelimit_plan(timelimit_plan, scheduler_client): - scheduler_client.next({}, timelimit_plan.graph['uid']) +def test_timelimit_plan(timelimit_plan, scheduler, tasks): + worker, client = scheduler + client.next({}, timelimit_plan.graph['uid']) def wait_function(timeout): try: