Merge "Improve fixtures for functional tests"
This commit is contained in:
commit
d4243be281
@ -15,12 +15,19 @@
|
||||
import random
|
||||
import string
|
||||
|
||||
import gevent
|
||||
import pytest
|
||||
|
||||
|
||||
from solar.core.log import log
|
||||
from solar.dblayer.model import ModelMeta
|
||||
from solar.orchestration.executors import zerorpc_executor
|
||||
from solar.orchestration import workers
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def address():
|
||||
return 'ipc:///tmp/solar_test_' + ''.join(
|
||||
def address(tmpdir):
|
||||
return 'ipc:///%s/' % tmpdir + ''.join(
|
||||
(random.choice(string.ascii_lowercase) for x in xrange(4)))
|
||||
|
||||
|
||||
@ -37,3 +44,45 @@ def system_log_address(address):
|
||||
@pytest.fixture
|
||||
def scheduler_address(address):
|
||||
return address + 'scheduler'
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def scheduler(request, scheduler_address):
|
||||
tasks_client = None
|
||||
|
||||
if 'tasks' in request.node.fixturenames:
|
||||
tasks_client = zerorpc_executor.Client(
|
||||
request.getfuncargvalue('tasks_address'))
|
||||
|
||||
worker = workers.scheduler.Scheduler(tasks_client)
|
||||
|
||||
def session_end(ctxt):
|
||||
log.debug('Session end ID %s', id(gevent.getcurrent()))
|
||||
ModelMeta.session_end()
|
||||
|
||||
def session_start(ctxt):
|
||||
log.debug('Session start ID %s', id(gevent.getcurrent()))
|
||||
ModelMeta.session_start()
|
||||
|
||||
worker.for_all.before(session_start)
|
||||
worker.for_all.after(session_end)
|
||||
|
||||
executor = zerorpc_executor.Executor(worker, scheduler_address)
|
||||
gevent.spawn(executor.run)
|
||||
return worker, zerorpc_executor.Client(scheduler_address)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tasks(request, tasks_address):
|
||||
worker = workers.tasks.Tasks()
|
||||
executor = zerorpc_executor.Executor(worker, tasks_address)
|
||||
worker.for_all.before(executor.register)
|
||||
if 'scheduler' in request.node.fixturenames:
|
||||
scheduler_client = workers.scheduler.SchedulerCallbackClient(
|
||||
zerorpc_executor.Client(request.getfuncargvalue(
|
||||
'scheduler_address')))
|
||||
worker.for_all.on_success(scheduler_client.update)
|
||||
worker.for_all.on_error(scheduler_client.error)
|
||||
|
||||
gevent.spawn(executor.run)
|
||||
return worker, zerorpc_executor.Client(tasks_address)
|
||||
|
@ -15,56 +15,6 @@
|
||||
import time
|
||||
|
||||
import gevent
|
||||
import pytest
|
||||
|
||||
from solar.core.log import log
|
||||
from solar.dblayer.model import ModelMeta
|
||||
from solar.orchestration.executors import zerorpc_executor
|
||||
from solar.orchestration.workers import scheduler as wscheduler
|
||||
from solar.orchestration.workers import tasks as wtasks
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tasks_worker():
|
||||
return wtasks.Tasks()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tasks_for_scheduler(request, tasks_worker, address):
|
||||
address = address + 'tasks'
|
||||
executor = zerorpc_executor.Executor(tasks_worker, address)
|
||||
gevent.spawn(executor.run)
|
||||
return zerorpc_executor.Client(address)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def scheduler(tasks_for_scheduler, scheduler_address):
|
||||
address = scheduler_address
|
||||
worker = wscheduler.Scheduler(tasks_for_scheduler)
|
||||
|
||||
def session_end(ctxt):
|
||||
log.debug('Session end ID %s', id(gevent.getcurrent()))
|
||||
ModelMeta.session_end()
|
||||
|
||||
def session_start(ctxt):
|
||||
log.debug('Session start ID %s', id(gevent.getcurrent()))
|
||||
ModelMeta.session_start()
|
||||
|
||||
worker.for_all.before(session_start)
|
||||
worker.for_all.after(session_end)
|
||||
|
||||
executor = zerorpc_executor.Executor(worker, address)
|
||||
gevent.spawn(executor.run)
|
||||
return worker, zerorpc_executor.Client(address)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_scheduler_callback(scheduler, tasks_worker):
|
||||
worker, client = scheduler
|
||||
scheduler_client = wscheduler.SchedulerCallbackClient(
|
||||
zerorpc_executor.Client(client.connect_to))
|
||||
tasks_worker.for_all.on_success(scheduler_client.update)
|
||||
tasks_worker.for_all.on_error(scheduler_client.update)
|
||||
|
||||
|
||||
def _wait_scheduling(plan, wait_time, waiter, client):
|
||||
@ -73,7 +23,7 @@ def _wait_scheduling(plan, wait_time, waiter, client):
|
||||
waiter.join(timeout=wait_time)
|
||||
|
||||
|
||||
def test_simple_fixture(simple_plan, scheduler):
|
||||
def test_simple_fixture(simple_plan, scheduler, tasks):
|
||||
worker, client = scheduler
|
||||
scheduling_results = []
|
||||
expected = [['echo_stuff'], ['just_fail'], []]
|
||||
@ -89,7 +39,7 @@ def test_simple_fixture(simple_plan, scheduler):
|
||||
assert scheduling_results == expected
|
||||
|
||||
|
||||
def test_sequential_fixture(sequential_plan, scheduler):
|
||||
def test_sequential_fixture(sequential_plan, scheduler, tasks):
|
||||
worker, client = scheduler
|
||||
scheduling_results = set()
|
||||
expected = {('s1',), ('s2',), ('s3',), ()}
|
||||
@ -105,7 +55,7 @@ def test_sequential_fixture(sequential_plan, scheduler):
|
||||
assert scheduling_results == expected
|
||||
|
||||
|
||||
def test_two_path_fixture(two_path_plan, scheduler):
|
||||
def test_two_path_fixture(two_path_plan, scheduler, tasks):
|
||||
worker, client = scheduler
|
||||
scheduling_results = set()
|
||||
expected = {'a', 'b', 'c', 'd', 'e'}
|
||||
|
@ -18,13 +18,8 @@ import gevent
|
||||
import mock
|
||||
import pytest
|
||||
|
||||
from solar.core.log import log
|
||||
from solar.dblayer.model import ModelMeta
|
||||
from solar.orchestration.executors import zerorpc_executor
|
||||
from solar.orchestration import graph
|
||||
from solar.orchestration.traversal import states
|
||||
from solar.orchestration.workers import scheduler as wscheduler
|
||||
from solar.orchestration.workers import tasks as wtasks
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -34,46 +29,6 @@ def simple_plan_retries(simple_plan):
|
||||
return simple_plan
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def scheduler(request, scheduler_address):
|
||||
tasks_client = None
|
||||
if 'tasks' in request.node.fixturenames:
|
||||
tasks_client = zerorpc_executor.Client(
|
||||
request.getfuncargvalue('tasks_address'))
|
||||
worker = wscheduler.Scheduler(tasks_client)
|
||||
|
||||
def session_end(ctxt):
|
||||
log.debug('Session end ID %s', id(gevent.getcurrent()))
|
||||
ModelMeta.session_end()
|
||||
|
||||
def session_start(ctxt):
|
||||
log.debug('Session start ID %s', id(gevent.getcurrent()))
|
||||
ModelMeta.session_start()
|
||||
|
||||
worker.for_all.before(session_start)
|
||||
worker.for_all.after(session_end)
|
||||
|
||||
executor = zerorpc_executor.Executor(worker, scheduler_address)
|
||||
gevent.spawn(executor.run)
|
||||
return worker, zerorpc_executor.Client(scheduler_address)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tasks(request, tasks_address):
|
||||
worker = wtasks.Tasks()
|
||||
executor = zerorpc_executor.Executor(worker, tasks_address)
|
||||
|
||||
if 'scheduler' in request.node.fixturenames:
|
||||
scheduler_client = wscheduler.SchedulerCallbackClient(
|
||||
zerorpc_executor.Client(request.getfuncargvalue(
|
||||
'scheduler_address')))
|
||||
worker.for_all.on_success(scheduler_client.update)
|
||||
worker.for_all.on_error(scheduler_client.error)
|
||||
|
||||
gevent.spawn(executor.run)
|
||||
return worker, zerorpc_executor.Client(tasks_address)
|
||||
|
||||
|
||||
def test_retry_just_fail(scheduler, tasks, simple_plan_retries):
|
||||
timeout = 3
|
||||
plan = simple_plan_retries
|
||||
|
@ -17,45 +17,15 @@
|
||||
import time
|
||||
|
||||
import gevent
|
||||
import pytest
|
||||
|
||||
from solar.dblayer import ModelMeta
|
||||
from solar.errors import ExecutionTimeout
|
||||
from solar.orchestration import Client
|
||||
from solar.orchestration import Executor
|
||||
from solar.orchestration import graph
|
||||
from solar.orchestration.traversal import states
|
||||
from solar.orchestration import workers
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def scheduler(scheduler_address, tasks_address):
|
||||
scheduler = workers.Scheduler(Client(tasks_address))
|
||||
scheduler_executor = Executor(scheduler, scheduler_address)
|
||||
scheduler.for_all.before(lambda ctxt: ModelMeta.session_start())
|
||||
scheduler.for_all.after(lambda ctxt: ModelMeta.session_end())
|
||||
gevent.spawn(scheduler_executor.run)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def tasks(tasks_address, scheduler_address):
|
||||
scheduler = workers.SchedulerCallbackClient(
|
||||
Client(scheduler_address))
|
||||
tasks = workers.Tasks()
|
||||
tasks_executor = Executor(tasks, tasks_address)
|
||||
tasks.for_all.before(tasks_executor.register)
|
||||
tasks.for_all.on_success(scheduler.update)
|
||||
tasks.for_all.on_error(scheduler.error)
|
||||
gevent.spawn(tasks_executor.run)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def scheduler_client(scheduler_address):
|
||||
return Client(scheduler_address)
|
||||
|
||||
|
||||
def test_timelimit_plan(timelimit_plan, scheduler_client):
|
||||
scheduler_client.next({}, timelimit_plan.graph['uid'])
|
||||
def test_timelimit_plan(timelimit_plan, scheduler, tasks):
|
||||
worker, client = scheduler
|
||||
client.next({}, timelimit_plan.graph['uid'])
|
||||
|
||||
def wait_function(timeout):
|
||||
try:
|
||||
|
Loading…
x
Reference in New Issue
Block a user