diff --git a/.testr.conf b/.testr.conf index 3aa8cae..b951a05 100644 --- a/.testr.conf +++ b/.testr.conf @@ -6,3 +6,4 @@ test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \ test_id_option=--load-list $IDFILE test_list_option=--list +test_run_concurrency=echo 1 diff --git a/requirements.txt b/requirements.txt index d64b484..26e13b2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ pbr>=0.5.21,<0.6 -gear +gear>=0.5.4,<1.0.0 python-swiftclient python-keystoneclient @@ -13,4 +13,4 @@ sphinxcontrib-seqdiag mysql-python requests -PyYAML +PyYAML>=3.1.0,<4.0.0 diff --git a/tests/fakes.py b/tests/fakes.py index 07bac13..5f78fbf 100644 --- a/tests/fakes.py +++ b/tests/fakes.py @@ -14,292 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import gear -import json -import threading -import os -import re -import time - -from turbo_hipster.worker_manager import ZuulManager -from turbo_hipster.task_plugins.gate_real_db_upgrade.task import Runner\ - as RealDbUpgradeRunner - - -class FakeZuulManager(ZuulManager): - def __init__(self, config, tasks, test): - self.test = test - super(FakeZuulManager, self).__init__(config, tasks) - - def setup_gearman(self): - hostname = os.uname()[1] - self.gearman_worker = FakeWorker('turbo-hipster-manager-%s' - % hostname, self.test) - self.gearman_worker.addServer( - self.config['zuul_server']['gearman_host'], - self.config['zuul_server']['gearman_port'] - ) - self.gearman_worker.registerFunction( - 'stop:turbo-hipster-manager-%s' % hostname) - - -class FakeWorker(gear.Worker): - def __init__(self, worker_id, test): - super(FakeWorker, self).__init__(worker_id) - self.gearman_jobs = {} - self.build_history = [] - self.running_builds = [] - self.build_counter = 0 - self.fail_tests = {} - self.test = test - - self.hold_jobs_in_build = False - self.lock = threading.Lock() - self.__work_thread = threading.Thread(target=self.work) - self.__work_thread.daemon = True - self.__work_thread.start() - - def handleJob(self, job): - parts = job.name.split(":") - cmd = parts[0] - name = parts[1] - if len(parts) > 2: - node = parts[2] - else: - node = None - if cmd == 'build': - self.handleBuild(job, name, node) - elif cmd == 'stop': - self.handleStop(job, name) - elif cmd == 'set_description': - self.handleSetDescription(job, name) - - def handleBuild(self, job, name, node): - build = FakeBuild(self, job, self.build_counter, node) - job.build = build - self.gearman_jobs[job.unique] = job - self.build_counter += 1 - - self.running_builds.append(build) - build.start() - - def handleStop(self, job, name): - self.log.debug("handle stop") - parameters = json.loads(job.arguments) - name = parameters['name'] - number = parameters['number'] - for build in self.running_builds: - if build.name == name and build.number == number: - build.aborted = True - build.release() - job.sendWorkComplete() - return - job.sendWorkFail() - - def handleSetDescription(self, job, name): - self.log.debug("handle set description") - parameters = json.loads(job.arguments) - name = parameters['name'] - number = parameters['number'] - descr = parameters['html_description'] - for build in self.running_builds: - if build.name == name and build.number == number: - build.description = descr - job.sendWorkComplete() - return - for build in self.build_history: - if build.name == name and build.number == number: - build.description = descr - job.sendWorkComplete() - return - job.sendWorkFail() - - def work(self): - while self.running: - try: - job = self.getJob() - except gear.InterruptedError: - continue - try: - self.handleJob(job) - except: - self.log.exception("Worker exception:") - - def addFailTest(self, name, change): - l = self.fail_tests.get(name, []) - l.append(change) - self.fail_tests[name] = l - - def shouldFailTest(self, name, ref): - l = self.fail_tests.get(name, []) - for change in l: - if self.test.ref_has_change(ref, change): - return True - return False - - def release(self, regex=None): - builds = self.running_builds[:] - self.log.debug("releasing build %s (%s)" % (regex, - len(self.running_builds))) - for build in builds: - if not regex or re.match(regex, build.name): - self.log.debug("releasing build %s" % - (build.parameters['ZUUL_UUID'])) - build.release() - else: - self.log.debug("not releasing build %s" % - (build.parameters['ZUUL_UUID'])) - self.log.debug("done releasing builds %s (%s)" % - (regex, len(self.running_builds))) - - -class FakeRealDbUpgradeRunner(RealDbUpgradeRunner): - def __init__(self, global_config, plugin_config, worker_name, test): - self.test = test - super(FakeRealDbUpgradeRunner, self).__init__(global_config, - plugin_config, - worker_name) - - -class BuildHistory(object): - def __init__(self, **kw): - self.__dict__.update(kw) - - def __repr__(self): - return ("" % - (self.result, self.name, self.number, self.changes)) - - -class FakeBuild(threading.Thread): - def __init__(self, worker, job, number, node): - threading.Thread.__init__(self) - self.daemon = True - self.worker = worker - self.job = job - self.name = job.name.split(':')[1] - self.number = number - self.node = node - self.parameters = json.loads(job.arguments) - self.unique = self.parameters['ZUUL_UUID'] - self.wait_condition = threading.Condition() - self.waiting = False - self.aborted = False - self.created = time.time() - self.description = '' - - def release(self): - self.wait_condition.acquire() - self.wait_condition.notify() - self.waiting = False - self.log.debug("Build %s released" % self.unique) - self.wait_condition.release() - - def isWaiting(self): - self.wait_condition.acquire() - if self.waiting: - ret = True - else: - ret = False - self.wait_condition.release() - return ret - - def _wait(self): - self.wait_condition.acquire() - self.waiting = True - self.log.debug("Build %s waiting" % self.unique) - self.wait_condition.wait() - self.wait_condition.release() - - def run(self): - data = { - 'url': 'https://server/job/%s/%s/' % (self.name, self.number), - 'name': self.name, - 'number': self.number, - 'manager': self.worker.worker_id, - } - - self.job.sendWorkData(json.dumps(data)) - self.job.sendWorkStatus(0, 100) - - if self.worker.hold_jobs_in_build: - self._wait() - self.log.debug("Build %s continuing" % self.unique) - - self.worker.lock.acquire() - - result = 'SUCCESS' - if (('ZUUL_REF' in self.parameters) and - self.worker.shouldFailTest(self.name, - self.parameters['ZUUL_REF'])): - result = 'FAILURE' - if self.aborted: - result = 'ABORTED' - - data = {'result': result} - changes = None - if 'ZUUL_CHANGE_IDS' in self.parameters: - changes = self.parameters['ZUUL_CHANGE_IDS'] - - self.worker.build_history.append( - BuildHistory(name=self.name, number=self.number, - result=result, changes=changes, node=self.node, - uuid=self.unique, description=self.description, - pipeline=self.parameters['ZUUL_PIPELINE']) - ) - - self.job.sendWorkComplete(json.dumps(data)) - del self.worker.gearman_jobs[self.job.unique] - self.worker.running_builds.remove(self) - self.worker.lock.release() - - -class FakeGearmanServer(gear.Server): - def __init__(self, port=4730): - self.hold_jobs_in_queue = False - super(FakeGearmanServer, self).__init__(port) - - def getJobForConnection(self, connection, peek=False): - for queue in [self.high_queue, self.normal_queue, self.low_queue]: - for job in queue: - if not hasattr(job, 'waiting'): - if job.name.startswith('build:'): - job.waiting = self.hold_jobs_in_queue - else: - job.waiting = False - if job.waiting: - continue - if job.name in connection.functions: - if not peek: - queue.remove(job) - connection.related_jobs[job.handle] = job - job.worker_connection = connection - job.running = True - return job - return None - - def release(self, regex=None): - released = False - qlen = (len(self.high_queue) + len(self.normal_queue) + - len(self.low_queue)) - self.log.debug("releasing queued job %s (%s)" % (regex, qlen)) - for job in self.getQueue(): - cmd, name = job.name.split(':') - if cmd != 'build': - continue - if not regex or re.match(regex, name): - self.log.debug("releasing queued job %s" % - job.unique) - job.waiting = False - released = True - else: - self.log.debug("not releasing queued job %s" % - job.unique) - if released: - self.wakeConnections() - qlen = (len(self.high_queue) + len(self.normal_queue) + - len(self.low_queue)) - self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen)) - class FakeJob(object): def __init__(self): diff --git a/tests/fixtures/default-config.json b/tests/fixtures/default-config.json new file mode 100644 index 0000000..e33ea42 --- /dev/null +++ b/tests/fixtures/default-config.json @@ -0,0 +1,35 @@ +{ + "zuul_server": { + "gerrit_site": "http://review.openstack.org", + "zuul_site": "http://localhost", + "git_origin": "git://git.openstack.org/", + "gearman_host": "localhost", + "gearman_port": 0 + }, + "debug_log": "/home/josh/var/log/turbo-hipster/debug.log", + "jobs_working_dir": "/home/josh/var/lib/turbo-hipster/jobs", + "git_working_dir": "/home/josh/var/lib/turbo-hipster/git", + "pip_download_cache": "/home/josh/var/cache/pip", + "plugins": [ + { + "name": "gate_real_db_upgrade", + "datasets_dir": "/var/lib/turbo-hipster/datasets_devstack_131007", + "function": "build:gate-real-db-upgrade_nova_mysql_devstack_131007" + }, + { + "name": "gate_real_db_upgrade", + "datasets_dir": "/var/lib/turbo-hipster/datasets_user_001", + "function": "build:gate-real-db-upgrade_nova_mysql_user_001" + }, + { + "name": "shell_script", + "function": "build:do_something_shelly" + } + ], + "publish_logs": + { + "type": "local", + "path": "/home/josh/var/www/results/", + "prepend_url": "http://localhost/results/" + } +} diff --git a/tests/test_worker_manager.py b/tests/test_worker_manager.py index ae2d5b5..1f3de5e 100644 --- a/tests/test_worker_manager.py +++ b/tests/test_worker_manager.py @@ -15,58 +15,171 @@ # under the License. +import gear +import logging import os import testtools import time import yaml -from fakes import FakeZuulManager, FakeGearmanServer,\ - FakeRealDbUpgradeRunner -CONFIG_DIR = os.path.join(os.path.dirname(__file__), 'etc') -with open(os.path.join(CONFIG_DIR, 'config.yaml'), 'r') as config_stream: - CONFIG = yaml.safe_load(config_stream) +import turbo_hipster.task_plugins.gate_real_db_upgrade.task +import turbo_hipster.worker_server + +logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s %(name)-32s ' + '%(levelname)-8s %(message)s') -class TestZuulManager(testtools.TestCase): +class TestWithGearman(testtools.TestCase): + + log = logging.getLogger("TestWithGearman") + def setUp(self): - super(TestZuulManager, self).setUp() - self.config = CONFIG - self.gearman_server = FakeGearmanServer( - self.config['zuul_server']['gearman_port']) + super(TestWithGearman, self).setUp() + + self.config = [] + self._load_config_fixture() + + self.gearman_server = gear.Server(0) + + # Grab the port so the clients can connect to it self.config['zuul_server']['gearman_port'] = self.gearman_server.port - self.task = FakeRealDbUpgradeRunner(self.config, - self.config['plugins'][0], - 'test-worker-1', self) - self.tasks = dict(FakeRealDbUpgradeRunner_worker=self.task) - - self.gearman_manager = FakeZuulManager(self.config, self.tasks, self) + self.worker_server = turbo_hipster.worker_server.Server(self.config) + self.worker_server.setup_logging() + self.worker_server.start() + t0 = time.time() + while time.time() - t0 < 10: + if self.worker_server.services_started: + break + time.sleep(0.01) + if not self.worker_server.services_started: + self.fail("Failed to start worker_service services") def tearDown(self): - super(TestZuulManager, self).tearDown() + self.worker_server.stop() self.gearman_server.shutdown() + super(TestWithGearman, self).tearDown() - def test_manager_function_registered(self): - """ Check the manager is set up correctly and registered with the - gearman server with an appropriate function """ + def _load_config_fixture(self, config_name='default-config.json'): + config_dir = os.path.join(os.path.dirname(__file__), 'fixtures') + with open(os.path.join(config_dir, config_name), 'r') as config_stream: + self.config = yaml.safe_load(config_stream) - # Give the gearman server up to 5 seconds to register the function - for x in range(500): - time.sleep(0.01) - if len(self.gearman_server.functions) > 0: + +class TestWorkerServer(TestWithGearman): + def test_plugins_load(self): + "Test the configured plugins are loaded" + + self.assertFalse(self.worker_server.stopped()) + self.assertEqual(3, len(self.worker_server.plugins)) + + plugin0_config = { + "name": "gate_real_db_upgrade", + "datasets_dir": "/var/lib/turbo-hipster/datasets_devstack_131007", + "function": "build:gate-real-db-upgrade_nova_mysql_devstack_131007" + } + plugin1_config = { + "name": "gate_real_db_upgrade", + "datasets_dir": "/var/lib/turbo-hipster/datasets_user_001", + "function": "build:gate-real-db-upgrade_nova_mysql_user_001" + } + plugin2_config = { + "name": "shell_script", + "function": "build:do_something_shelly" + } + + self.assertEqual(plugin0_config, + self.worker_server.plugins[0]['plugin_config']) + self.assertEqual( + 'turbo_hipster.task_plugins.gate_real_db_upgrade.task', + self.worker_server.plugins[0]['module'].__name__ + ) + + self.assertEqual(plugin1_config, + self.worker_server.plugins[1]['plugin_config']) + self.assertEqual( + 'turbo_hipster.task_plugins.gate_real_db_upgrade.task', + self.worker_server.plugins[1]['module'].__name__ + ) + + self.assertEqual(plugin2_config, + self.worker_server.plugins[2]['plugin_config']) + self.assertEqual( + 'turbo_hipster.task_plugins.shell_script.task', + self.worker_server.plugins[2]['module'].__name__ + ) + + def test_zuul_client_started(self): + "Test the zuul client has been started" + self.assertFalse(self.worker_server.zuul_client.stopped()) + + def test_zuul_manager_started(self): + "Test the zuul manager has been started" + self.assertFalse(self.worker_server.zuul_manager.stopped()) + + +class TestZuulClient(TestWithGearman): + def test_setup_gearman_worker(self): + "Make sure the client is registered as a worker with gearman" + pass + + def test_registered_functions(self): + "Test the correct functions are registered with gearman" + # The client should have all of the functions defined in the config + # registered with gearman + + # We need to wait for all the functions to register with the server.. + # We'll give it up to 10seconds to do so + t0 = time.time() + failed = True + while time.time() - t0 < 10: + # There should be 4 functions. 1 for each plugin + 1 for the + # manager + if len(self.gearman_server.functions) == 4: + failed = False break + time.sleep(0.01) + if failed: + self.log.debug(self.gearman_server.functions) + self.fail("The correct number of functions haven't registered with" + " gearman") + + self.assertIn('build:gate-real-db-upgrade_nova_mysql_devstack_131007', + self.gearman_server.functions) + self.assertIn('build:gate-real-db-upgrade_nova_mysql_user_001', + self.gearman_server.functions) + self.assertIn('build:do_something_shelly', + self.gearman_server.functions) + + def test_waiting_for_job(self): + "Make sure the client waits for jobs as expected" + pass + + def test_stop(self): + "Test sending a stop signal to the client exists correctly" + pass + + +class TestZuulManager(TestWithGearman): + def test_registered_functions(self): + "Test the correct functions are registered with gearman" + # We need to wait for all the functions to register with the server.. + # We'll give it up to 10seconds to do so + t0 = time.time() + failed = True + while time.time() - t0 < 10: + # There should be 4 functions. 1 for each plugin + 1 for the + # manager + if len(self.gearman_server.functions) == 4: + failed = False + break + time.sleep(0.01) + if failed: + self.log.debug(self.gearman_server.functions) + self.fail("The correct number of functions haven't registered with" + " gearman") hostname = os.uname()[1] - function_name = 'stop:turbo-hipster-manager-%s' % hostname - - self.assertIn(function_name, self.gearman_server.functions) - - def test_task_registered_with_manager(self): - """ Check the FakeRealDbUpgradeRunner_worker task is registered """ - self.assertIn('FakeRealDbUpgradeRunner_worker', - self.gearman_manager.tasks.keys()) - - def test_stop_task(self): - """ Check that the manager successfully stops a task when requested - """ - pass + self.assertIn('stop:turbo-hipster-manager-%s' % hostname, + self.gearman_server.functions) diff --git a/tox.ini b/tox.ini index f0e38f0..99af832 100644 --- a/tox.ini +++ b/tox.ini @@ -4,7 +4,7 @@ # and then run "tox" from this directory. [tox] -envlist = pep8, pyflakes, py27 +envlist = pep8, py27 [testenv] setenv = VIRTUAL_ENV={envdir} @@ -28,9 +28,6 @@ commands = flake8 commands = python setup.py testr --coverage -[testenv:pyflakes] -commands = pyflakes turbo_hipster setup.py - [testenv:venv] commands = {posargs} diff --git a/turbo_hipster/cmd/server.py b/turbo_hipster/cmd/server.py index 9cbf4e3..6dc5e37 100644 --- a/turbo_hipster/cmd/server.py +++ b/turbo_hipster/cmd/server.py @@ -35,7 +35,13 @@ def main(args): with open(args.config, 'r') as config_stream: config = yaml.safe_load(config_stream) + if not config['debug_log']: + # NOTE(mikal): debug logging _must_ be enabled for the log writing + # in lib.utils.execute_to_log to work correctly. + raise Exception('Debug log not configured') + server = worker_server.Server(config) + server.setup_logging(config['debug_log']) def term_handler(signum, frame): server.stop() diff --git a/turbo_hipster/worker_manager.py b/turbo_hipster/worker_manager.py index 3923d35..a220425 100644 --- a/turbo_hipster/worker_manager.py +++ b/turbo_hipster/worker_manager.py @@ -57,6 +57,7 @@ class ZuulManager(threading.Thread): self._stop.set() # Unblock gearman self.log.debug("Telling gearman to stop waiting for jobs") + self.gearman_worker.stopWaitingForJobs() self.gearman_worker.shutdown() def stopped(self): @@ -76,8 +77,10 @@ class ZuulManager(threading.Thread): self.current_step = 0 job = self.gearman_worker.getJob() self._handle_job(job) + except gear.InterruptedError: + self.log.debug('We were asked to stop waiting for jobs') except: - logging.exception('Exception retrieving log event.') + self.log.exception('Unknown exception waiting for job.') self.log.debug("Finished manager thread") def _handle_job(self, job): @@ -137,6 +140,7 @@ class ZuulClient(threading.Thread): task.stop_working() # Unblock gearman self.log.debug("Telling gearman to stop waiting for jobs") + self.gearman_worker.stopWaitingForJobs() self.gearman_worker.shutdown() def stopped(self): @@ -155,8 +159,10 @@ class ZuulClient(threading.Thread): self.log.debug("Waiting for job") self.job = self.gearman_worker.getJob() self._handle_job() + except gear.InterruptedError: + self.log.debug('We were asked to stop waiting for jobs') except: - self.log.exception('Exception waiting for job.') + self.log.exception('Unknown exception waiting for job.') self.log.debug("Finished client thread") def _handle_job(self): diff --git a/turbo_hipster/worker_server.py b/turbo_hipster/worker_server.py index a0f3cf2..66225ff 100755 --- a/turbo_hipster/worker_server.py +++ b/turbo_hipster/worker_server.py @@ -78,17 +78,14 @@ class Server(threading.Thread): self.log.warn("conf_d parameter '%s' isn't a directory" % (self.config["conf_d"])) - def setup_logging(self): - if not self.debug_log: - raise Exception('Debug log not configured') - - # NOTE(mikal): debug logging _must_ be enabled for the log writing - # in lib.utils.execute_to_log to work correctly. - if not os.path.isdir(os.path.dirname(self.debug_log)): - os.makedirs(os.path.dirname(self.debug_log)) + def setup_logging(self, log_file=None): + if log_file: + if not os.path.isdir(os.path.dirname(log_file)): + os.makedirs(os.path.dirname(log_file)) logging.basicConfig(format='%(asctime)s %(name)-32s ' '%(levelname)-8s %(message)s', - filename=self.debug_log, level=logging.DEBUG) + filename=log_file, + level=logging.DEBUG) def load_plugins(self): """ Load the available plugins from task_plugins """