diff --git a/doc/source/installation.rst b/doc/source/installation.rst index 9916eb7..a0a0732 100644 --- a/doc/source/installation.rst +++ b/doc/source/installation.rst @@ -67,10 +67,11 @@ Modify the config.json appropriately:: for projects. This is the cache directory used by pip. **plugins** A list of enabled plugins and their settings in a dictionary. - The only required parameter is *name* which should be the - same as the folder containing the plugin module. Any other - parameters are specified by the plugin themselves as - required. + The only required parameters are *name*, which should be the + same as the folder containing the plugin module, and + *function*, which is the function registered with zuul. + Any other parameters are specified by the plugin themselves + as required. **publish_logs** Log results from plugins can be published using multiple methods. Currently only a local copy is fully implemented. diff --git a/etc/turbo-hipster/config.json b/etc/turbo-hipster/config.json index 2afd604..325a373 100644 --- a/etc/turbo-hipster/config.json +++ b/etc/turbo-hipster/config.json @@ -11,8 +11,8 @@ "plugins": [ { "name": "gate_real_db_upgrade", - "datasets_dir": "/var/lib/turbo-hipster/datasets", - "job": "gate-real-db-upgrade_nova_mysql" + "function": "build:gate-real-db-upgrade_nova_mysql", + "datasets_dir": "/var/lib/turbo-hipster/datasets" } ], "publish_logs": { diff --git a/tests/fakes.py b/tests/fakes.py index 57d60aa..a4b3b60 100644 --- a/tests/fakes.py +++ b/tests/fakes.py @@ -21,15 +21,15 @@ import os import re import time -from turbo_hipster.worker_manager import GearmanManager +from turbo_hipster.worker_manager import ZuulManager from turbo_hipster.task_plugins.gate_real_db_upgrade.task import Runner\ as RealDbUpgradeRunner -class FakeGearmanManager(GearmanManager): +class FakeZuulManager(ZuulManager): def __init__(self, config, tasks, test): self.test = test - super(FakeGearmanManager, self).__init__(config, tasks) + super(FakeZuulManager, self).__init__(config, tasks) def setup_gearman(self): hostname = os.uname()[1] @@ -160,16 +160,6 @@ class FakeRealDbUpgradeRunner(RealDbUpgradeRunner): plugin_config, worker_name) - def setup_gearman(self): - self.log.debug("Set up real_db gearman worker") - self.gearman_worker = FakeWorker('FakeRealDbUpgradeRunner_worker', - self.test) - self.gearman_worker.addServer( - self.global_config['zuul_server']['gearman_host'], - self.global_config['zuul_server']['gearman_port'] - ) - self.register_functions() - class BuildHistory(object): def __init__(self, **kw): diff --git a/tests/test_worker_manager.py b/tests/test_worker_manager.py index 11fc960..89e02b2 100644 --- a/tests/test_worker_manager.py +++ b/tests/test_worker_manager.py @@ -19,7 +19,7 @@ import json import os import testtools import time -from fakes import FakeGearmanManager, FakeGearmanServer,\ +from fakes import FakeZuulManager, FakeGearmanServer,\ FakeRealDbUpgradeRunner CONFIG_DIR = os.path.join(os.path.dirname(__file__), 'etc') @@ -40,9 +40,7 @@ class TestGearmanManager(testtools.TestCase): 'test-worker-1', self) self.tasks = dict(FakeRealDbUpgradeRunner_worker=self.task) - self.gearman_manager = FakeGearmanManager(self.config, - self.tasks, - self) + self.gearman_manager = FakeZuulManager(self.config, self.tasks, self) def test_manager_function_registered(self): """ Check the manager is set up correctly and registered with the diff --git a/turbo_hipster/lib/utils.py b/turbo_hipster/lib/utils.py index 5a0237d..11a3ade 100644 --- a/turbo_hipster/lib/utils.py +++ b/turbo_hipster/lib/utils.py @@ -224,6 +224,8 @@ def scp_push_file(job_log_dir, file_path, local_config): def determine_job_identifier(zuul_arguments, job, unique): + if 'build:' in job: + job = job.split('build:')[1] return os.path.join(zuul_arguments['ZUUL_CHANGE'][:2], zuul_arguments['ZUUL_CHANGE'], zuul_arguments['ZUUL_PATCHSET'], diff --git a/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py b/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py index 7909fd1..a9f9e4b 100644 --- a/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py +++ b/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py @@ -13,12 +13,10 @@ # under the License. -import gear import json import logging import os import re -import threading from turbo_hipster.lib import utils @@ -31,7 +29,7 @@ MIGRATION_START_RE = re.compile('([0-9]+) -> ([0-9]+)\.\.\.$') MIGRATION_END_RE = re.compile('^done$') -class Runner(threading.Thread): +class Runner(object): """ This thread handles the actual sql-migration tests. It pulls in a gearman job from the build:gate-real-db-upgrade @@ -39,16 +37,12 @@ class Runner(threading.Thread): log = logging.getLogger("task_plugins.gate_real_db_upgrade.task.Runner") - def __init__(self, global_config, plugin_config, worker_name): - super(Runner, self).__init__() - self._stop = threading.Event() + def __init__(self, global_config, plugin_config, job_name): self.global_config = global_config self.plugin_config = plugin_config - - self.worker_name = worker_name + self.job_name = job_name # Set up the runner worker - self.gearman_worker = None self.datasets = [] self.job = None @@ -61,31 +55,6 @@ class Runner(threading.Thread): self.current_step = 0 self.total_steps = 4 - self.setup_gearman() - - def setup_gearman(self): - self.log.debug("Set up real_db gearman worker") - self.gearman_worker = gear.Worker(self.worker_name) - self.gearman_worker.addServer( - self.global_config['zuul_server']['gearman_host'], - self.global_config['zuul_server']['gearman_port'] - ) - self.register_functions() - - def register_functions(self): - self.gearman_worker.registerFunction( - 'build:' + self.plugin_config['job']) - - def stop(self): - self._stop.set() - # Unblock gearman - self.log.debug("Telling gearman to stop waiting for jobs") - self.gearman_worker.stopWaitingForJobs() - self.gearman_worker.shutdown() - - def stopped(self): - return self._stop.isSet() - def stop_worker(self, number): # Check the number is for this job instance # (makes it possible to run multiple workers with this task @@ -93,22 +62,10 @@ class Runner(threading.Thread): if number == self.job.unique: self.log.debug("We've been asked to stop by our gearman manager") self.cancelled = True + # TODO: Work out how to kill current step - def run(self): - while True and not self.stopped(): - try: - # Reset job information: - self.current_step = 0 - self.cancelled = False - self.work_data = None - # gearman_worker.getJob() blocks until a job is available - self.log.debug("Waiting for job") - self.job = self.gearman_worker.getJob() - self._handle_job() - except: - self.log.exception('Exception retrieving log event.') - - def _handle_job(self): + def start_job(self, job): + self.job = job if self.job is not None: try: self.job_arguments = \ @@ -215,7 +172,7 @@ class Runner(threading.Thread): dataset['config']['project'] and self._get_project_command(dataset['config']['type'])): dataset['determined_path'] = utils.determine_job_identifier( - self.job_arguments, self.plugin_config['job'], + self.job_arguments, self.plugin_config['function'], self.job.unique ) dataset['job_log_file_path'] = os.path.join( @@ -315,7 +272,7 @@ class Runner(threading.Thread): project_name + '/.git', os.path.join( self.global_config['git_working_dir'], - self.worker_name, + self.job_name, project_name ) ) @@ -333,7 +290,7 @@ class Runner(threading.Thread): if self.work_data is None: hostname = os.uname()[1] self.work_data = dict( - name=self.worker_name, + name=self.job_name, number=self.job.unique, manager='turbo-hipster-manager-%s' % hostname, url='http://localhost', diff --git a/turbo_hipster/worker_manager.py b/turbo_hipster/worker_manager.py index 72034fb..4b2e02f 100644 --- a/turbo_hipster/worker_manager.py +++ b/turbo_hipster/worker_manager.py @@ -20,7 +20,7 @@ import os import threading -class GearmanManager(threading.Thread): +class ZuulManager(threading.Thread): """ This thread manages all of the launched gearman workers. As required by the zuul protocol it handles stopping builds when they @@ -31,7 +31,7 @@ class GearmanManager(threading.Thread): log = logging.getLogger("worker_manager.GearmanManager") def __init__(self, config, tasks): - super(GearmanManager, self).__init__() + super(ZuulManager, self).__init__() self._stop = threading.Event() self.config = config self.tasks = tasks @@ -81,3 +81,67 @@ class GearmanManager(threading.Thread): except Exception as e: self.log.exception('Exception handling log event.') job.sendWorkException(str(e).encode('utf-8')) + + +class ZuulClient(threading.Thread): + + """ ...""" + + log = logging.getLogger("worker_manager.ZuulClient") + + def __init__(self, global_config, worker_name): + super(ZuulClient, self).__init__() + self._stop = threading.Event() + self.global_config = global_config + + self.worker_name = worker_name + + # Set up the runner worker + self.gearman_worker = None + self.functions = {} + + self.job = None + self.cancelled = False + + self.setup_gearman() + + def setup_gearman(self): + self.log.debug("Set up gearman worker") + self.gearman_worker = gear.Worker(self.worker_name) + self.gearman_worker.addServer( + self.global_config['zuul_server']['gearman_host'], + self.global_config['zuul_server']['gearman_port'] + ) + self.register_functions() + + def register_functions(self): + for function_name, plugin in self.functions.items(): + self.gearman_worker.registerFunction(function_name) + + def add_function(self, function_name, plugin): + self.functions[function_name] = plugin + + def stop(self): + self._stop.set() + # Unblock gearman + self.log.debug("Telling gearman to stop waiting for jobs") + self.gearman_worker.stopWaitingForJobs() + self.gearman_worker.shutdown() + + def stopped(self): + return self._stop.isSet() + + def run(self): + while True and not self.stopped(): + try: + self.cancelled = False + # gearman_worker.getJob() blocks until a job is available + self.log.debug("Waiting for job") + self.job = self.gearman_worker.getJob() + self._handle_job() + except: + self.log.exception('Exception retrieving log event.') + + def _handle_job(self): + """ We have a job, give it to the right plugin """ + self.functions[self.job.name].start_job(self.job) diff --git a/turbo_hipster/worker_server.py b/turbo_hipster/worker_server.py index 8bbf42d..556dcf9 100755 --- a/turbo_hipster/worker_server.py +++ b/turbo_hipster/worker_server.py @@ -40,16 +40,18 @@ class Server(object): log = logging.getLogger("worker_server.Server") def __init__(self, config): - # Config init - self.config = config - self.manager = None - self.plugins = [] - self.load_plugins() - # Python logging output file. self.debug_log = self.config['debug_log'] + # Config init + self.config = config + self.zuul_manager = None + self.zuul_client = None + self.plugins = [] + self.worker_name = os.uname()[1] + self.tasks = {} + self.load_plugins() def setup_logging(self): if self.debug_log: @@ -74,23 +76,30 @@ class Server(object): 'plugin_config': plugin }) - def run_tasks(self): + def start_gearman_workers(self): """ Run the tasks """ - for thread_number, plugin in enumerate(self.plugins): + self.zuul_client = worker_manager.ZuulClient(self.config, + self.worker_name) + + for task_number, plugin in enumerate(self.plugins): module = plugin['module'] - worker_name = '%s-%s-%s' % (plugin['plugin_config']['name'], - os.uname()[1], thread_number) - self.tasks[worker_name] = module.Runner( + job_name = '%s-%s-%s' % (plugin['plugin_config']['name'], + self.worker_name, task_number) + self.tasks[job_name] = module.Runner( self.config, plugin['plugin_config'], - worker_name + job_name ) - self.tasks[worker_name].daemon = True - self.tasks[worker_name].start() + self.zuul_client.add_function(plugin['plugin_config']['function'], + self.tasks[job_name]) - self.manager = worker_manager.GearmanManager(self.config, self.tasks) - self.manager.daemon = True - self.manager.start() + self.zuul_client.register_functions() + self.zuul_client.daemon = True + self.zuul_client.start() + + self.zuul_manager = worker_manager.ZuulManager(self.config, self.tasks) + self.zuul_manager.daemon = True + self.zuul_manager.start() def exit_handler(self, signum): signal.signal(signal.SIGUSR1, signal.SIG_IGN) @@ -101,7 +110,7 @@ class Server(object): def main(self): self.setup_logging() - self.run_tasks() + self.start_gearman_workers() while True: try: