Allow jobs to shutdown turbo-hipster

This lets jobs turn off and exit turbo-hipster once they are done.
This is useful for when using nodepool or when a job leaves the
environment dirty and we can't run more jobs on this worker.

Change-Id: I823be4196a5bf9ca92a14d9caf26163398a9434c
This commit is contained in:
Joshua Hesketh 2014-03-25 16:26:45 +11:00
parent 5cd8ea3ce3
commit 96adb287f8
11 changed files with 268 additions and 110 deletions

View File

@ -0,0 +1,33 @@
zuul_server:
gerrit_site: http://review.openstack.org
zuul_site: http://119.9.13.90
git_origin: git://git.openstack.org/
gearman_host: localhost
gearman_port: 0
debug_log: /var/log/turbo-hipster/debug.log
jobs_working_dir: /var/lib/turbo-hipster/jobs
git_working_dir: /var/lib/turbo-hipster/git
pip_download_cache: /var/cache/pip
plugins:
- name: gate_real_db_upgrade
datasets_dir: /var/lib/turbo-hipster/datasets_devstack_131007
function: build:gate-real-db-upgrade_nova_mysql_devstack_131007
- name: gate_real_db_upgrade
datasets_dir: /var/lib/turbo-hipster/datasets_user_001
function: build:gate-real-db-upgrade_nova_mysql_user_001
- name: shell_script
function: build:do_something_shelly
publish_logs:
type: swift
authurl: https://identity.api.rackspacecloud.com/v2.0/
tenant: XXXX
user: XXXXXX
password: XXXXXX
container: XXXXXX
region: SYD
prepend_url: http://www.rcbops.com/turbo_hipster/results/

View File

@ -0,0 +1,25 @@
zuul_server:
gerrit_site: http://review.openstack.org
zuul_site: http://119.9.13.90
git_origin: git://git.openstack.org/
gearman_host: localhost
gearman_port: 0
debug_log: /var/log/turbo-hipster/debug.log
jobs_working_dir: /var/lib/turbo-hipster/jobs
git_working_dir: /var/lib/turbo-hipster/git
pip_download_cache: /var/cache/pip
plugins:
- name: shell_script
function: build:demo_job_clean
shell_script: /dev/null
- name: shell_script
function: build:demo_job_dirty
shell_script: /dev/null
shutdown-th: true
publish_logs:
type: local
path: /var/lib/turbo_hipster/logs
prepend_url: http://mylogserver/

View File

@ -14,6 +14,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import gear
import json
import time
import uuid
class FakeJob(object):
def __init__(self):
@ -21,3 +26,43 @@ class FakeJob(object):
def sendWorkStatus(self, *args, **kwargs):
pass
class FakeZuul(object):
"""A fake zuul/gearman client to request work from gearman and check
results"""
def __init__(self, server, port):
self.gearman = gear.Client('FakeZuul')
self.gearman.addServer(server, port)
self.gearman.waitForServer()
self.job = None
def make_zuul_data(self, data={}):
defaults = {
'ZUUL_UUID': str(uuid.uuid1()),
'ZUUL_REF': 'a',
'ZUUL_COMMIT': 'a',
'ZUUL_PROJECT': 'a',
'ZUUL_PIPELINE': 'a',
'ZUUL_URL': 'http://localhost',
'BASE_LOG_PATH': '56/123456/8',
'LOG_PATH': '56/123456/8/check/job_name/uuid123'
}
defaults.update(data)
return defaults
def submit_job(self, name, data):
if not self.job:
self.job = gear.Job(name,
json.dumps(data),
unique=str(time.time()))
self.gearman.submitJob(self.job)
else:
raise Exception('A job already exists in self.job')
return self.job
def wait_for_completion(self):
if self.job:
while not self.job.complete:
time.sleep(0.1)

View File

@ -1,35 +0,0 @@
{
"zuul_server": {
"gerrit_site": "http://review.openstack.org",
"zuul_site": "http://localhost",
"git_origin": "git://git.openstack.org/",
"gearman_host": "localhost",
"gearman_port": 0
},
"debug_log": "/home/josh/var/log/turbo-hipster/debug.log",
"jobs_working_dir": "/home/josh/var/lib/turbo-hipster/jobs",
"git_working_dir": "/home/josh/var/lib/turbo-hipster/git",
"pip_download_cache": "/home/josh/var/cache/pip",
"plugins": [
{
"name": "gate_real_db_upgrade",
"datasets_dir": "/var/lib/turbo-hipster/datasets_devstack_131007",
"function": "build:gate-real-db-upgrade_nova_mysql_devstack_131007"
},
{
"name": "gate_real_db_upgrade",
"datasets_dir": "/var/lib/turbo-hipster/datasets_user_001",
"function": "build:gate-real-db-upgrade_nova_mysql_user_001"
},
{
"name": "shell_script",
"function": "build:do_something_shelly"
}
],
"publish_logs":
{
"type": "local",
"path": "/home/josh/var/www/results/",
"prepend_url": "http://localhost/results/"
}
}

View File

@ -15,6 +15,7 @@
# under the License.
import fixtures
import gear
import logging
import os
@ -25,6 +26,8 @@ import yaml
import turbo_hipster.task_plugins.gate_real_db_upgrade.task
import turbo_hipster.worker_server
import fakes
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)-32s '
'%(levelname)-8s %(message)s')
@ -36,12 +39,13 @@ class TestWithGearman(testtools.TestCase):
def setUp(self):
super(TestWithGearman, self).setUp()
self.config = []
self._load_config_fixture()
self.config = None
self.worker_server = None
self.gearman_server = gear.Server(0)
def start_server(self):
if not self.config:
self._load_config_fixture()
# Grab the port so the clients can connect to it
self.config['zuul_server']['gearman_port'] = self.gearman_server.port
@ -57,20 +61,39 @@ class TestWithGearman(testtools.TestCase):
self.fail("Failed to start worker_service services")
def tearDown(self):
self.worker_server.stop()
if self.worker_server and not self.worker_server.stopped():
self.worker_server.shutdown()
self.gearman_server.shutdown()
super(TestWithGearman, self).tearDown()
def _load_config_fixture(self, config_name='default-config.json'):
config_dir = os.path.join(os.path.dirname(__file__), 'fixtures')
def _load_config_fixture(self, config_name='default-config.yaml'):
config_dir = os.path.join(os.path.dirname(__file__), 'etc')
with open(os.path.join(config_dir, config_name), 'r') as config_stream:
self.config = yaml.safe_load(config_stream)
# Set all of the working dirs etc to a writeable temp dir
temp_path = self.useFixture(fixtures.TempDir()).path
for config_dir in ['debug_log', 'jobs_working_dir', 'git_working_dir',
'pip_download_cache']:
if config_dir in self.config:
if self.config[config_dir][0] == '/':
self.config[config_dir] = self.config[config_dir][1:]
self.config[config_dir] = os.path.join(temp_path,
self.config[config_dir])
if self.config['publish_logs']['type'] == 'local':
if self.config['publish_logs']['path'][0] == '/':
self.config['publish_logs']['path'] = \
self.config['publish_logs']['path'][1:]
self.config['publish_logs']['path'] = os.path.join(
temp_path, self.config[config_dir])
class TestWorkerServer(TestWithGearman):
def test_plugins_load(self):
"Test the configured plugins are loaded"
self.start_server()
self.assertFalse(self.worker_server.stopped())
self.assertEqual(3, len(self.worker_server.plugins))
@ -112,10 +135,12 @@ class TestWorkerServer(TestWithGearman):
def test_zuul_client_started(self):
"Test the zuul client has been started"
self.start_server()
self.assertFalse(self.worker_server.zuul_client.stopped())
def test_zuul_manager_started(self):
"Test the zuul manager has been started"
self.start_server()
self.assertFalse(self.worker_server.zuul_manager.stopped())
@ -126,6 +151,9 @@ class TestZuulClient(TestWithGearman):
def test_registered_functions(self):
"Test the correct functions are registered with gearman"
self.start_server()
# The client should have all of the functions defined in the config
# registered with gearman
@ -160,10 +188,36 @@ class TestZuulClient(TestWithGearman):
"Test sending a stop signal to the client exists correctly"
pass
def test_job_can_shutdown_th(self):
self._load_config_fixture('shutdown-config.yaml')
self.start_server()
zuul = fakes.FakeZuul(self.config['zuul_server']['gearman_host'],
self.config['zuul_server']['gearman_port'])
# First check we can run a job that /doesn't/ shut down turbo-hipster
data_req = zuul.make_zuul_data()
zuul.submit_job('build:demo_job_clean', data_req)
zuul.wait_for_completion()
self.assertTrue(zuul.job.complete)
self.assertFalse(self.worker_server.stopped())
# Now run a job that leaves the environment dirty and /should/ shut
# down turbo-hipster
zuul.job = None
zuul.submit_job('build:demo_job_dirty', data_req)
zuul.wait_for_completion()
self.assertTrue(zuul.job.complete)
# Give the server a second to shutdown
time.sleep(1)
self.assertTrue(self.worker_server.stopped())
class TestZuulManager(TestWithGearman):
def test_registered_functions(self):
"Test the correct functions are registered with gearman"
self.start_server()
# We need to wait for all the functions to register with the server..
# We'll give it up to 10seconds to do so
t0 = time.time()

View File

@ -44,7 +44,7 @@ def main(args):
server.setup_logging(config['debug_log'])
def term_handler(signum, frame):
server.stop()
server.shutdown()
signal.signal(signal.SIGTERM, term_handler)
if args.background:
@ -56,7 +56,7 @@ def main(args):
signal.pause()
except KeyboardInterrupt:
print "Ctrl + C: asking tasks to exit nicely...\n"
server.stop()
server.shutdown()
if __name__ == '__main__':

View File

@ -26,8 +26,8 @@ class Task(object):
""" A base object for running a job (aka Task) """
log = logging.getLogger("lib.models.Task")
def __init__(self, global_config, plugin_config, job_name):
self.global_config = global_config
def __init__(self, worker_server, plugin_config, job_name):
self.worker_server = worker_server
self.plugin_config = plugin_config
self.job_name = job_name
self._reset()
@ -125,10 +125,10 @@ class Task(object):
class ShellTask(Task):
log = logging.getLogger("lib.models.ShellTask")
def __init__(self, global_config, plugin_config, job_name):
super(ShellTask, self).__init__(global_config, plugin_config, job_name)
def __init__(self, worker_server, plugin_config, job_name):
super(ShellTask, self).__init__(worker_server, plugin_config, job_name)
# Define the number of steps we will do to determine our progress.
self.total_steps = 5
self.total_steps = 6
def _reset(self):
super(ShellTask, self)._reset()
@ -137,21 +137,24 @@ class ShellTask(Task):
self.shell_output_log = None
def do_job_steps(self):
# Step 1: Prep job working dir
self.log.info('Step 1: Prep job working dir')
self._prep_working_dir()
# Step 2: Checkout updates from git
self.log.info('Step 2: Checkout updates from git')
self._grab_patchset(self.job_arguments)
# Step 3: Run shell script
self.log.info('Step 3: Run shell script')
self._execute_script()
# Step 4: Analyse logs for errors
self.log.info('Step 4: Analyse logs for errors')
self._parse_and_check_results()
# Step 5: handle the results (and upload etc)
self.log.info('Step 5: handle the results (and upload etc)')
self._handle_results()
self.log.info('Step 6: Handle extra actions such as shutting down')
self._handle_cleanup()
@common.task_step
def _prep_working_dir(self):
self.job_identifier = utils.determine_job_identifier(
@ -160,7 +163,7 @@ class ShellTask(Task):
self.job.unique
)
self.job_working_dir = os.path.join(
self.global_config['jobs_working_dir'],
self.worker_server.config['jobs_working_dir'],
self.job_identifier
)
self.shell_output_log = os.path.join(
@ -176,7 +179,7 @@ class ShellTask(Task):
""" Checkout the reference into config['git_working_dir'] """
self.log.debug("Grab the patchset we want to test against")
local_path = os.path.join(self.global_config['git_working_dir'],
local_path = os.path.join(self.worker_server.config['git_working_dir'],
self.job_name, job_args['ZUUL_PROJECT'])
if not os.path.exists(local_path):
os.makedirs(local_path)
@ -185,8 +188,8 @@ class ShellTask(Task):
cmd = os.path.join(os.path.join(os.path.dirname(__file__),
'gerrit-git-prep.sh'))
cmd += ' ' + self.global_config['zuul_server']['gerrit_site']
cmd += ' ' + self.global_config['zuul_server']['zuul_site']
cmd += ' ' + self.worker_server.config['zuul_server']['gerrit_site']
cmd += ' ' + self.worker_server.config['zuul_server']['zuul_site']
utils.execute_to_log(cmd, self.shell_output_log, env=git_args,
cwd=local_path)
self.git_path = local_path
@ -223,10 +226,10 @@ class ShellTask(Task):
self.log.debug("Process the resulting files (upload/push)")
if 'publish_logs' in self.global_config:
index_url = utils.push_file(self.job_identifier,
self.shell_output_log,
self.global_config['publish_logs'])
if 'publish_logs' in self.worker_server.config:
index_url = utils.push_file(
self.job_identifier, self.shell_output_log,
self.worker_server.config['publish_logs'])
self.log.debug("Index URL found at %s" % index_url)
self.work_data['url'] = index_url
@ -234,3 +237,11 @@ class ShellTask(Task):
# Upload to zuul's url as instructed
utils.zuul_swift_upload(self.job_working_dir, self.job_arguments)
self.work_data['url'] = self.job_identifier
@common.task_step
def _handle_cleanup(self):
"""Handle and cleanup functions. Shutdown if requested to so that no
further jobs are ran if the environment is dirty."""
if ('shutdown-th' in self.plugin_config and
self.plugin_config['shutdown-th']):
self.worker_server.shutdown_gracefully()

View File

@ -256,17 +256,8 @@ def scp_push_file(results_set_name, file_path, local_config):
def determine_job_identifier(zuul_arguments, job, unique):
if 'build:' in job:
job = job.split('build:')[1]
path = os.path.join(zuul_arguments['ZUUL_CHANGE'][:2],
zuul_arguments['ZUUL_CHANGE'],
zuul_arguments['ZUUL_PATCHSET'],
zuul_arguments['ZUUL_PIPELINE'],
job,
unique[:7])
log.info('Converted args: %s, job: %s and unique: %s to %s'
% (zuul_arguments, job, unique, path))
# use new determined path from zuul
path = zuul_arguments['LOG_PATH']
return path

View File

@ -40,15 +40,15 @@ class Runner(models.ShellTask):
log = logging.getLogger("task_plugins.gate_real_db_upgrade.task.Runner")
def __init__(self, global_config, plugin_config, job_name):
super(Runner, self).__init__(global_config, plugin_config, job_name)
def __init__(self, worker_server, plugin_config, job_name):
super(Runner, self).__init__(worker_server, plugin_config, job_name)
# Set up the runner worker
self.datasets = []
self.job_datasets = []
# Define the number of steps we will do to determine our progress.
self.total_steps = 6
self.total_steps += 1
def do_job_steps(self):
# Step 1: Figure out which datasets to run
@ -74,7 +74,7 @@ class Runner(models.ShellTask):
self.job.unique
)
dataset['job_log_file_path'] = os.path.join(
self.global_config['jobs_working_dir'],
self.worker_server.config['jobs_working_dir'],
dataset['determined_path'],
dataset['name'] + '.log'
)
@ -102,7 +102,7 @@ class Runner(models.ShellTask):
self.log.debug("Process the resulting files (upload/push)")
index_url = handle_results.generate_push_results(
self.job_datasets,
self.global_config['publish_logs']
self.worker_server.config['publish_logs']
)
self.log.debug("Index URL found at %s" % index_url)
self.work_data['url'] = index_url
@ -162,8 +162,8 @@ class Runner(models.ShellTask):
for dataset in self.job_datasets:
cmd = os.path.join(os.path.join(os.path.dirname(__file__),
(self.global_config['baseline_command']
% self.global_config['flavor'])))
(self.worker_server.config['baseline_command']
% self.worker_server.config['flavor'])))
rc = utils.execute_to_log(
cmd,
dataset['job_log_file_path'],
@ -187,7 +187,7 @@ class Runner(models.ShellTask):
% {
'unique_id': self.job.unique,
'job_working_dir': os.path.join(
self.global_config['jobs_working_dir'],
self.worker_server.config['jobs_working_dir'],
dataset['determined_path']
),
'git_path': self.git_path,
@ -202,7 +202,8 @@ class Runner(models.ShellTask):
dataset['dataset_dir'],
dataset['config']['logging_conf']
),
'pip_cache_dir': self.global_config['pip_download_cache']
'pip_cache_dir':
self.worker_server.config['pip_download_cache']
}
)
@ -210,13 +211,13 @@ class Runner(models.ShellTask):
syslog = '/var/log/syslog'
sqlslo = '/var/log/mysql/slow-queries.log'
sqlerr = '/var/log/mysql/error.log'
if 'logs' in self.global_config:
if 'syslog' in self.global_config['logs']:
syslog = self.global_config['logs']['syslog']
if 'sqlslo' in self.global_config['logs']:
sqlslo = self.global_config['logs']['sqlslo']
if 'sqlerr' in self.global_config['logs']:
sqlerr = self.global_config['logs']['sqlerr']
if 'logs' in self.worker_server.config:
if 'syslog' in self.worker_server.config['logs']:
syslog = self.worker_server.config['logs']['syslog']
if 'sqlslo' in self.worker_server.config['logs']:
sqlslo = self.worker_server.config['logs']['sqlslo']
if 'sqlerr' in self.worker_server.config['logs']:
sqlerr = self.worker_server.config['logs']['sqlerr']
rc = utils.execute_to_log(
cmd,

View File

@ -18,6 +18,7 @@ import json
import logging
import os
import threading
import time
class ZuulManager(threading.Thread):
@ -30,10 +31,13 @@ class ZuulManager(threading.Thread):
log = logging.getLogger("worker_manager.ZuulManager")
def __init__(self, config, tasks):
def __init__(self, worker_server, tasks):
super(ZuulManager, self).__init__()
self._stop = threading.Event()
self.config = config
self.stopping = False
self.running = False
self.worker_server = worker_server
self.tasks = tasks
self.gearman_worker = None
@ -44,8 +48,8 @@ class ZuulManager(threading.Thread):
self.gearman_worker = gear.Worker('turbo-hipster-manager-%s'
% hostname)
self.gearman_worker.addServer(
self.config['zuul_server']['gearman_host'],
self.config['zuul_server']['gearman_port']
self.worker_server.config['zuul_server']['gearman_host'],
self.worker_server.config['zuul_server']['gearman_port']
)
def register_functions(self):
@ -53,6 +57,15 @@ class ZuulManager(threading.Thread):
self.gearman_worker.registerFunction(
'stop:turbo-hipster-manager-%s' % hostname)
def stop_gracefully(self):
self.stopping = True
self.gearman_worker.stopWaitingForJobs()
while self.running:
self.log.debug('waiting to finish')
time.sleep(0.1)
self._stop.set()
self.gearman_worker.shutdown()
def stop(self):
self._stop.set()
# Unblock gearman
@ -64,7 +77,8 @@ class ZuulManager(threading.Thread):
return self._stop.isSet()
def run(self):
while not self.stopped():
while not self.stopped() and not self.stopping:
self.running = True
try:
# gearman_worker.getJob() blocks until a job is available
self.log.debug("Waiting for server")
@ -81,6 +95,7 @@ class ZuulManager(threading.Thread):
self.log.debug('We were asked to stop waiting for jobs')
except:
self.log.exception('Unknown exception waiting for job.')
self.running = False
self.log.debug("Finished manager thread")
def _handle_job(self, job):
@ -101,12 +116,13 @@ class ZuulClient(threading.Thread):
log = logging.getLogger("worker_manager.ZuulClient")
def __init__(self, global_config, worker_name):
def __init__(self, worker_server):
super(ZuulClient, self).__init__()
self._stop = threading.Event()
self.global_config = global_config
self.stopping = False
self.running = False
self.worker_name = worker_name
self.worker_server = worker_server
# Set up the runner worker
self.gearman_worker = None
@ -118,10 +134,10 @@ class ZuulClient(threading.Thread):
def setup_gearman(self):
self.log.debug("Set up gearman worker")
self.gearman_worker = gear.Worker(self.worker_name)
self.gearman_worker = gear.Worker(self.worker_server.worker_name)
self.gearman_worker.addServer(
self.global_config['zuul_server']['gearman_host'],
self.global_config['zuul_server']['gearman_port']
self.worker_server.config['zuul_server']['gearman_host'],
self.worker_server.config['zuul_server']['gearman_port']
)
def register_functions(self):
@ -143,11 +159,20 @@ class ZuulClient(threading.Thread):
self.gearman_worker.stopWaitingForJobs()
self.gearman_worker.shutdown()
def stop_gracefully(self):
self.stopping = True
self.gearman_worker.stopWaitingForJobs()
while self.running:
time.sleep(0.1)
self._stop.set()
self.gearman_worker.shutdown()
def stopped(self):
return self._stop.isSet()
def run(self):
while not self.stopped():
while not self.stopped() and not self.stopping:
self.running = True
try:
# gearman_worker.getJob() blocks until a job is available
self.log.debug("Waiting for server")
@ -163,6 +188,7 @@ class ZuulClient(threading.Thread):
self.log.debug('We were asked to stop waiting for jobs')
except:
self.log.exception('Unknown exception waiting for job.')
self.running = False
self.log.debug("Finished client thread")
def _handle_job(self):

25
turbo_hipster/worker_server.py Executable file → Normal file
View File

@ -15,9 +15,6 @@
# under the License.
""" worker_server.py is an executable worker server that loads and runs
task_plugins. """
import logging
import os
import threading
@ -104,15 +101,14 @@ class Server(threading.Thread):
def start_zuul_client(self):
""" Run the tasks """
self.log.debug('Starting zuul client')
self.zuul_client = worker_manager.ZuulClient(self.config,
self.worker_name)
self.zuul_client = worker_manager.ZuulClient(self)
for task_number, plugin in enumerate(self.plugins):
module = plugin['module']
job_name = '%s-%s-%s' % (plugin['plugin_config']['name'],
self.worker_name, task_number)
self.tasks[job_name] = module.Runner(
self.config,
self,
plugin['plugin_config'],
job_name
)
@ -122,14 +118,25 @@ class Server(threading.Thread):
self.zuul_client.start()
def start_zuul_manager(self):
self.zuul_manager = worker_manager.ZuulManager(self.config, self.tasks)
self.zuul_manager = worker_manager.ZuulManager(self, self.tasks)
self.zuul_manager.start()
def stop(self):
def shutdown_gracefully(self):
""" Shutdown while no work is currently happening """
self.log.debug('Graceful shutdown once jobs are complete...')
thread = threading.Thread(target=self._shutdown_gracefully)
thread.start()
def _shutdown_gracefully(self):
self.zuul_client.stop_gracefully()
self.zuul_manager.stop_gracefully()
self._stop.set()
self.log.debug('Exiting...')
def shutdown(self):
self.log.debug('Shutting down now!...')
self.zuul_client.stop()
self.zuul_manager.stop()
self._stop.set()
def stopped(self):
return self._stop.isSet()