#!/usr/bin/python2 # # Copyright 2013 Rackspace Australia # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import gear import json import threading import os import re import time from turbo_hipster.worker_manager import ZuulManager from turbo_hipster.task_plugins.gate_real_db_upgrade.task import Runner\ as RealDbUpgradeRunner class FakeZuulManager(ZuulManager): def __init__(self, config, tasks, test): self.test = test super(FakeZuulManager, self).__init__(config, tasks) def setup_gearman(self): hostname = os.uname()[1] self.gearman_worker = FakeWorker('turbo-hipster-manager-%s' % hostname, self.test) self.gearman_worker.addServer( self.config['zuul_server']['gearman_host'], self.config['zuul_server']['gearman_port'] ) self.gearman_worker.registerFunction( 'stop:turbo-hipster-manager-%s' % hostname) class FakeWorker(gear.Worker): def __init__(self, worker_id, test): super(FakeWorker, self).__init__(worker_id) self.gearman_jobs = {} self.build_history = [] self.running_builds = [] self.build_counter = 0 self.fail_tests = {} self.test = test self.hold_jobs_in_build = False self.lock = threading.Lock() self.__work_thread = threading.Thread(target=self.work) self.__work_thread.daemon = True self.__work_thread.start() def handleJob(self, job): parts = job.name.split(":") cmd = parts[0] name = parts[1] if len(parts) > 2: node = parts[2] else: node = None if cmd == 'build': self.handleBuild(job, name, node) elif cmd == 'stop': self.handleStop(job, name) elif cmd == 'set_description': self.handleSetDescription(job, name) def handleBuild(self, job, name, node): build = FakeBuild(self, job, self.build_counter, node) job.build = build self.gearman_jobs[job.unique] = job self.build_counter += 1 self.running_builds.append(build) build.start() def handleStop(self, job, name): self.log.debug("handle stop") parameters = json.loads(job.arguments) name = parameters['name'] number = parameters['number'] for build in self.running_builds: if build.name == name and build.number == number: build.aborted = True build.release() job.sendWorkComplete() return job.sendWorkFail() def handleSetDescription(self, job, name): self.log.debug("handle set description") parameters = json.loads(job.arguments) name = parameters['name'] number = parameters['number'] descr = parameters['html_description'] for build in self.running_builds: if build.name == name and build.number == number: build.description = descr job.sendWorkComplete() return for build in self.build_history: if build.name == name and build.number == number: build.description = descr job.sendWorkComplete() return job.sendWorkFail() def work(self): while self.running: try: job = self.getJob() except gear.InterruptedError: continue try: self.handleJob(job) except: self.log.exception("Worker exception:") def addFailTest(self, name, change): l = self.fail_tests.get(name, []) l.append(change) self.fail_tests[name] = l def shouldFailTest(self, name, ref): l = self.fail_tests.get(name, []) for change in l: if self.test.ref_has_change(ref, change): return True return False def release(self, regex=None): builds = self.running_builds[:] self.log.debug("releasing build %s (%s)" % (regex, len(self.running_builds))) for build in builds: if not regex or re.match(regex, build.name): self.log.debug("releasing build %s" % (build.parameters['ZUUL_UUID'])) build.release() else: self.log.debug("not releasing build %s" % (build.parameters['ZUUL_UUID'])) self.log.debug("done releasing builds %s (%s)" % (regex, len(self.running_builds))) class FakeRealDbUpgradeRunner(RealDbUpgradeRunner): def __init__(self, global_config, plugin_config, worker_name, test): self.test = test super(FakeRealDbUpgradeRunner, self).__init__(global_config, plugin_config, worker_name) class BuildHistory(object): def __init__(self, **kw): self.__dict__.update(kw) def __repr__(self): return ("" % (self.result, self.name, self.number, self.changes)) class FakeBuild(threading.Thread): def __init__(self, worker, job, number, node): threading.Thread.__init__(self) self.daemon = True self.worker = worker self.job = job self.name = job.name.split(':')[1] self.number = number self.node = node self.parameters = json.loads(job.arguments) self.unique = self.parameters['ZUUL_UUID'] self.wait_condition = threading.Condition() self.waiting = False self.aborted = False self.created = time.time() self.description = '' def release(self): self.wait_condition.acquire() self.wait_condition.notify() self.waiting = False self.log.debug("Build %s released" % self.unique) self.wait_condition.release() def isWaiting(self): self.wait_condition.acquire() if self.waiting: ret = True else: ret = False self.wait_condition.release() return ret def _wait(self): self.wait_condition.acquire() self.waiting = True self.log.debug("Build %s waiting" % self.unique) self.wait_condition.wait() self.wait_condition.release() def run(self): data = { 'url': 'https://server/job/%s/%s/' % (self.name, self.number), 'name': self.name, 'number': self.number, 'manager': self.worker.worker_id, } self.job.sendWorkData(json.dumps(data)) self.job.sendWorkStatus(0, 100) if self.worker.hold_jobs_in_build: self._wait() self.log.debug("Build %s continuing" % self.unique) self.worker.lock.acquire() result = 'SUCCESS' if (('ZUUL_REF' in self.parameters) and self.worker.shouldFailTest(self.name, self.parameters['ZUUL_REF'])): result = 'FAILURE' if self.aborted: result = 'ABORTED' data = {'result': result} changes = None if 'ZUUL_CHANGE_IDS' in self.parameters: changes = self.parameters['ZUUL_CHANGE_IDS'] self.worker.build_history.append( BuildHistory(name=self.name, number=self.number, result=result, changes=changes, node=self.node, uuid=self.unique, description=self.description, pipeline=self.parameters['ZUUL_PIPELINE']) ) self.job.sendWorkComplete(json.dumps(data)) del self.worker.gearman_jobs[self.job.unique] self.worker.running_builds.remove(self) self.worker.lock.release() class FakeGearmanServer(gear.Server): def __init__(self, port=4730): self.hold_jobs_in_queue = False super(FakeGearmanServer, self).__init__(port) def getJobForConnection(self, connection, peek=False): for queue in [self.high_queue, self.normal_queue, self.low_queue]: for job in queue: if not hasattr(job, 'waiting'): if job.name.startswith('build:'): job.waiting = self.hold_jobs_in_queue else: job.waiting = False if job.waiting: continue if job.name in connection.functions: if not peek: queue.remove(job) connection.related_jobs[job.handle] = job job.worker_connection = connection job.running = True return job return None def release(self, regex=None): released = False qlen = (len(self.high_queue) + len(self.normal_queue) + len(self.low_queue)) self.log.debug("releasing queued job %s (%s)" % (regex, qlen)) for job in self.getQueue(): cmd, name = job.name.split(':') if cmd != 'build': continue if not regex or re.match(regex, name): self.log.debug("releasing queued job %s" % job.unique) job.waiting = False released = True else: self.log.debug("not releasing queued job %s" % job.unique) if released: self.wakeConnections() qlen = (len(self.high_queue) + len(self.normal_queue) + len(self.low_queue)) self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen)) class FakeJob(object): def __init__(self): pass def sendWorkStatus(self, *args, **kwargs): pass