diff --git a/etc/config.json b/etc/config.json index 3c0559b..a26337d 100644 --- a/etc/config.json +++ b/etc/config.json @@ -5,7 +5,7 @@ "gearman_port": 4730 }, "debug_log": "/home/josh/var/log/turbo-hipster/debug.log", - "job_working_dir": "/home/josh/var/lib/turbo-hipster/jobs", + "jobs_working_dir": "/home/josh/var/lib/turbo-hipster/jobs", "git_working_dir": "/home/josh/var/lib/turbo-hipster/git", "pip_download_cache": "/home/josh/var/cache/pip", "plugins": ["gate_real_db_upgrade"] diff --git a/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py b/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py index 6d785e2..4749f5c 100644 --- a/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py +++ b/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py @@ -17,12 +17,19 @@ import gear import json import logging import os +import re import threading from lib import utils +import task_plugins.gate_real_db_upgrade.handle_results as handle_results + __worker_name__ = 'sql-migrate-test-runner-%s' % os.uname()[1] +# Regex for log checking +MIGRATION_START_RE = re.compile('([0-9]+) -> ([0-9]+)\.\.\.$') +MIGRATION_END_RE = re.compile('^done$') + class Runner(threading.Thread): @@ -106,7 +113,7 @@ class Runner(threading.Thread): # Step 3: Analyse logs for errors self._do_next_step() - self._check_log_for_errors() + self._check_all_dataset_logs_for_errors() # Step 4: handle the results (and upload etc) self._do_next_step() @@ -114,6 +121,8 @@ class Runner(threading.Thread): # Finally, send completed packet self._send_work_data() + + return if self.work_data['result'] is 'SUCCESS': self.job.sendWorkComplete( json.dumps(self._get_work_data())) @@ -124,21 +133,21 @@ class Runner(threading.Thread): if not self.cancelled: self.job.sendWorkException(str(e).encode('utf-8')) - def _get_logging_file(self, dataset): - return os.path.join( - self.config['job_working_dir'], - self.job.unique, - dataset['name'] + '.log' - ) - def _handle_results(self): """ pass over the results to handle_results.py for post-processing """ - pass + index_url = handle_results.generate_push_results(self._get_datasets()) + self.work_data['url'] = index_url - def _check_log_for_errors(self): - # logging_file = self._get_logging_file(job) + def _check_all_dataset_logs_for_errors(self): + failed = False + for dataset in self._get_datasets(): + # Look for the beginning of the migration start + pass - self.work_data['result'] = "Failed: errors found in log" + if failed: + self.work_data['result'] = "Failed: errors found in dataset log(s)" + else: + self.work_data['result'] = "SUCCESS" def _get_datasets(self): if len(self.datasets) > 0: @@ -153,6 +162,14 @@ class Runner(threading.Thread): dataset = {} dataset['name'] = ent dataset['path'] = os.path.join(datasets_path, ent) + dataset['job_working_dir'] = os.path.join( + self.config['jobs_working_dir'], + self.job.unique + ) + dataset['log_file_path'] = os.path.join( + dataset['job_working_dir'], + dataset['name'] + '.log' + ) with open(os.path.join(dataset['path'], 'config.json'), 'r') as config_stream: dataset['config'] = json.load(config_stream) @@ -177,12 +194,12 @@ class Runner(threading.Thread): # $7 is the path to the dataset to test against # $8 is the pip cache dir cmd += ( - (' %(unique_id)s %(working_dir)s %(git_path)s' + (' %(unique_id)s %(job_working_dir)s %(git_path)s' ' %(dbuser)s %(dbpassword)s %(db)s' ' %(dataset_path)s %(pip_cache_dir)s') % { 'unique_id': self.job.unique, - 'working_dir': self.config['job_working_dir'], + 'job_working_dir': dataset['job_working_dir'], 'git_path': git_path, 'dbuser': dataset['config']['db_user'], 'dbpassword': dataset['config']['db_pass'], @@ -206,7 +223,7 @@ class Runner(threading.Thread): utils.execute_to_log( cmd, - self._get_logging_file(dataset), + dataset['log_file_path'], watch_logs=[ ('[syslog]', syslog), ('[sqlslo]', sqlslo), @@ -226,6 +243,10 @@ class Runner(threading.Thread): ) ) + # reset to zuul's master + repo.reset() + + # Fetch patchset and checkout repo.fetch(zuul_ref) repo.checkout('FETCH_HEAD') @@ -254,12 +275,12 @@ class Runner(threading.Thread): if self.stopped(): self.work_data['result'] = "Failed: Worker interrupted/stopped" self.job.sendWorkStatus(self.current_step, self.total_steps) - raise Exception('Thread stopped', 'stopping') + raise Exception('Thread stopped') elif self.cancelled: self.work_data['result'] = "Failed: Job cancelled" self.job.sendWorkStatus(self.current_step, self.total_steps) self.job.sendWorkFail() - raise Exception('Job cancelled', 'stopping') + raise Exception('Job cancelled') self.current_step += 1 self.job.sendWorkStatus(self.current_step, self.total_steps)