diff --git a/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py b/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py index 792f9f6..b73e6e2 100644 --- a/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py +++ b/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py @@ -28,6 +28,8 @@ class Runner(threading.Thread): self.gearman_worker = None self.setup_gearman() + self.datasets = None + self.job = None self.work_data = None self.cancelled = False @@ -63,78 +65,79 @@ class Runner(threading.Thread): logging.debug("Waiting for job") self.current_step = 0 self.cancelled = False - job = self.gearman_worker.getJob() - self._handle_job(job) + self.job = self.gearman_worker.getJob() + self._handle_job() return except: logging.exception('Exception retrieving log event.') - def _handle_job(self, job): - try: - job_arguments = json.loads(job.arguments.decode('utf-8')) - self.log.debug("Got job from ZUUL %s" % job_arguments) + def _handle_job(self): + if self.job is not None: + try: + job_arguments = json.loads(self.job.arguments.decode('utf-8')) + self.log.debug("Got job from ZUUL %s" % job_arguments) - # Send an initial WORK_DATA and WORK_STATUS packets - self._send_work_data(job) + # Send an initial WORK_DATA and WORK_STATUS packets + self._send_work_data() - # Step 1: Checkout updates from git! - self._do_next_step(job) + # Step 1: Checkout updates from git! + self._do_next_step() + git_path = self._grab_patchset( + job_arguments['ZUUL_PROJECT'], + job_arguments['ZUUL_REF'] + ) - # Checkout the patchset - git_path = self._grab_patchset( - job_arguments['ZUUL_PROJECT'], - job_arguments['ZUUL_REF'] - ) + # Step 2: Run migrations on datasets + self._do_next_step() + self._execute_migrations(git_path) - # Step 2: - self._do_next_step(job) - self._execute_migrations(job, git_path) + # Step 3: Analyse logs for errors + self._do_next_step() + self._check_log_for_errors() - # Step 3: - self._do_next_step(job) - self._check_log_for_errors(job) + # Final step, send completed packet + self._send_work_data() + self.job.sendWorkComplete(json.dumps(self._get_work_data())) + except Exception as e: + self.log.exception('Exception handling log event.') + if not self.cancelled: + self.job.sendWorkException(str(e).encode('utf-8')) - # Final step, send completed packet - self._send_work_data(job) - job.sendWorkComplete(json.dumps(self._get_work_data())) - except Exception as e: - self.log.exception('Exception handling log event.') - if not self.cancelled: - job.sendWorkException(str(e).encode('utf-8')) - - def _get_logging_file(self, job): + def _get_logging_file(self, dataset): return os.path.join( self.config['job_working_dir'], - job.unique, - 'testing.log' + self.job.unique, + dataset + '.log' ) - def _check_log_for_errors(self, job): - logging_file = self._get_logging_file(job) + def _check_log_for_errors(self): + #logging_file = self._get_logging_file(job) self.work_data['result'] = "Failed: errors found in log" - job.sendWorkStatus(self.current_step, self.total_steps) - job.sendWorkFail() + self.job.sendWorkStatus(self.current_step, self.total_steps) + self.job.sendWorkFail() def _get_datasets(self): + if self.datasets is not None: + return self.datasets + datasets_path = os.path.join(os.path.dirname(__file__), 'datasets') - datasets = [] + self.datasets = {} for ent in os.listdir(datasets_path): if (os.path.isdir(os.path.join(datasets_path, ent)) and os.path.isfile( os.path.join(datasets_path, ent, 'config.json'))): - datasets.append( - os.path.join(datasets_path, ent)) + self.datasets[ent] = os.path.join(datasets_path, ent) - return datasets + return self.datasets - def _execute_migrations(self, job, git_path): + def _execute_migrations(self, git_path): """ Execute the migration on each dataset in datasets """ - for dataset_path in self._get_datasets(): + for dataset, dataset_path in self._get_datasets().items(): with open(os.path.join(dataset_path, 'config.json'), 'r') as config_stream: dataset_config = json.load(config_stream) @@ -154,7 +157,7 @@ class Runner(threading.Thread): ' %(dbuser)s %(dbpassword)s %(db)s' ' %(dataset_path)s %(pip_cache_dir)s') % { - 'unique_id': job.unique, + 'unique_id': self.job.unique, 'working_dir': self.config['job_working_dir'], 'git_path': git_path, 'dbuser': dataset_config['db_user'], @@ -167,7 +170,7 @@ class Runner(threading.Thread): utils.execute_to_log( cmd, - self._get_logging_file(job) + self._get_logging_file(dataset) ) def _grab_patchset(self, project_name, zuul_ref): @@ -198,24 +201,24 @@ class Runner(threading.Thread): ) return self.work_data - def _send_work_data(self, job): + def _send_work_data(self): """ Send the WORK DATA in json format for job """ - job.sendWorkData(json.dumps(self._get_work_data())) + self.job.sendWorkData(json.dumps(self._get_work_data())) - def _do_next_step(self, job): + def _do_next_step(self): """ Send a WORK_STATUS command to the gearman server. This can provide a progress bar. """ # Each opportunity we should check if we need to stop if self.stopped(): self.work_data['result'] = "Failed: Worker interrupted/stopped" - job.sendWorkStatus(self.current_step, self.total_steps) + self.job.sendWorkStatus(self.current_step, self.total_steps) raise Exception('Thread stopped', 'stopping') elif self.cancelled: self.work_data['result'] = "Failed: Job cancelled" - job.sendWorkStatus(self.current_step, self.total_steps) - job.sendWorkFail() + self.job.sendWorkStatus(self.current_step, self.total_steps) + self.job.sendWorkFail() raise Exception('Job cancelled', 'stopping') self.current_step += 1 - job.sendWorkStatus(self.current_step, self.total_steps) + self.job.sendWorkStatus(self.current_step, self.total_steps)