diff --git a/tests/etc/default-config.yaml b/tests/etc/default-config.yaml new file mode 100644 index 0000000..0412e01 --- /dev/null +++ b/tests/etc/default-config.yaml @@ -0,0 +1,33 @@ +zuul_server: + gerrit_site: http://review.openstack.org + zuul_site: http://119.9.13.90 + git_origin: git://git.openstack.org/ + gearman_host: localhost + gearman_port: 0 + +debug_log: /var/log/turbo-hipster/debug.log +jobs_working_dir: /var/lib/turbo-hipster/jobs +git_working_dir: /var/lib/turbo-hipster/git +pip_download_cache: /var/cache/pip + +plugins: + - name: gate_real_db_upgrade + datasets_dir: /var/lib/turbo-hipster/datasets_devstack_131007 + function: build:gate-real-db-upgrade_nova_mysql_devstack_131007 + + - name: gate_real_db_upgrade + datasets_dir: /var/lib/turbo-hipster/datasets_user_001 + function: build:gate-real-db-upgrade_nova_mysql_user_001 + + - name: shell_script + function: build:do_something_shelly + +publish_logs: + type: swift + authurl: https://identity.api.rackspacecloud.com/v2.0/ + tenant: XXXX + user: XXXXXX + password: XXXXXX + container: XXXXXX + region: SYD + prepend_url: http://www.rcbops.com/turbo_hipster/results/ diff --git a/tests/etc/shutdown-config.yaml b/tests/etc/shutdown-config.yaml new file mode 100644 index 0000000..9175873 --- /dev/null +++ b/tests/etc/shutdown-config.yaml @@ -0,0 +1,25 @@ +zuul_server: + gerrit_site: http://review.openstack.org + zuul_site: http://119.9.13.90 + git_origin: git://git.openstack.org/ + gearman_host: localhost + gearman_port: 0 + +debug_log: /var/log/turbo-hipster/debug.log +jobs_working_dir: /var/lib/turbo-hipster/jobs +git_working_dir: /var/lib/turbo-hipster/git +pip_download_cache: /var/cache/pip + +plugins: + - name: shell_script + function: build:demo_job_clean + shell_script: /dev/null + - name: shell_script + function: build:demo_job_dirty + shell_script: /dev/null + shutdown-th: true + +publish_logs: + type: local + path: /var/lib/turbo_hipster/logs + prepend_url: http://mylogserver/ \ No newline at end of file diff --git a/tests/fakes.py b/tests/fakes.py index 5f78fbf..1b377cd 100644 --- a/tests/fakes.py +++ b/tests/fakes.py @@ -14,6 +14,11 @@ # License for the specific language governing permissions and limitations # under the License. +import gear +import json +import time +import uuid + class FakeJob(object): def __init__(self): @@ -21,3 +26,43 @@ class FakeJob(object): def sendWorkStatus(self, *args, **kwargs): pass + + +class FakeZuul(object): + """A fake zuul/gearman client to request work from gearman and check + results""" + def __init__(self, server, port): + self.gearman = gear.Client('FakeZuul') + self.gearman.addServer(server, port) + self.gearman.waitForServer() + self.job = None + + def make_zuul_data(self, data={}): + defaults = { + 'ZUUL_UUID': str(uuid.uuid1()), + 'ZUUL_REF': 'a', + 'ZUUL_COMMIT': 'a', + 'ZUUL_PROJECT': 'a', + 'ZUUL_PIPELINE': 'a', + 'ZUUL_URL': 'http://localhost', + 'BASE_LOG_PATH': '56/123456/8', + 'LOG_PATH': '56/123456/8/check/job_name/uuid123' + } + defaults.update(data) + return defaults + + def submit_job(self, name, data): + if not self.job: + self.job = gear.Job(name, + json.dumps(data), + unique=str(time.time())) + self.gearman.submitJob(self.job) + else: + raise Exception('A job already exists in self.job') + + return self.job + + def wait_for_completion(self): + if self.job: + while not self.job.complete: + time.sleep(0.1) diff --git a/tests/fixtures/default-config.json b/tests/fixtures/default-config.json deleted file mode 100644 index e33ea42..0000000 --- a/tests/fixtures/default-config.json +++ /dev/null @@ -1,35 +0,0 @@ -{ - "zuul_server": { - "gerrit_site": "http://review.openstack.org", - "zuul_site": "http://localhost", - "git_origin": "git://git.openstack.org/", - "gearman_host": "localhost", - "gearman_port": 0 - }, - "debug_log": "/home/josh/var/log/turbo-hipster/debug.log", - "jobs_working_dir": "/home/josh/var/lib/turbo-hipster/jobs", - "git_working_dir": "/home/josh/var/lib/turbo-hipster/git", - "pip_download_cache": "/home/josh/var/cache/pip", - "plugins": [ - { - "name": "gate_real_db_upgrade", - "datasets_dir": "/var/lib/turbo-hipster/datasets_devstack_131007", - "function": "build:gate-real-db-upgrade_nova_mysql_devstack_131007" - }, - { - "name": "gate_real_db_upgrade", - "datasets_dir": "/var/lib/turbo-hipster/datasets_user_001", - "function": "build:gate-real-db-upgrade_nova_mysql_user_001" - }, - { - "name": "shell_script", - "function": "build:do_something_shelly" - } - ], - "publish_logs": - { - "type": "local", - "path": "/home/josh/var/www/results/", - "prepend_url": "http://localhost/results/" - } -} diff --git a/tests/test_worker_manager.py b/tests/test_worker_manager.py index 1f3de5e..ff0733f 100644 --- a/tests/test_worker_manager.py +++ b/tests/test_worker_manager.py @@ -15,6 +15,7 @@ # under the License. +import fixtures import gear import logging import os @@ -25,6 +26,8 @@ import yaml import turbo_hipster.task_plugins.gate_real_db_upgrade.task import turbo_hipster.worker_server +import fakes + logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)-32s ' '%(levelname)-8s %(message)s') @@ -36,12 +39,13 @@ class TestWithGearman(testtools.TestCase): def setUp(self): super(TestWithGearman, self).setUp() - - self.config = [] - self._load_config_fixture() - + self.config = None + self.worker_server = None self.gearman_server = gear.Server(0) + def start_server(self): + if not self.config: + self._load_config_fixture() # Grab the port so the clients can connect to it self.config['zuul_server']['gearman_port'] = self.gearman_server.port @@ -57,20 +61,39 @@ class TestWithGearman(testtools.TestCase): self.fail("Failed to start worker_service services") def tearDown(self): - self.worker_server.stop() + if self.worker_server and not self.worker_server.stopped(): + self.worker_server.shutdown() self.gearman_server.shutdown() super(TestWithGearman, self).tearDown() - def _load_config_fixture(self, config_name='default-config.json'): - config_dir = os.path.join(os.path.dirname(__file__), 'fixtures') + def _load_config_fixture(self, config_name='default-config.yaml'): + config_dir = os.path.join(os.path.dirname(__file__), 'etc') with open(os.path.join(config_dir, config_name), 'r') as config_stream: self.config = yaml.safe_load(config_stream) + # Set all of the working dirs etc to a writeable temp dir + temp_path = self.useFixture(fixtures.TempDir()).path + for config_dir in ['debug_log', 'jobs_working_dir', 'git_working_dir', + 'pip_download_cache']: + if config_dir in self.config: + if self.config[config_dir][0] == '/': + self.config[config_dir] = self.config[config_dir][1:] + self.config[config_dir] = os.path.join(temp_path, + self.config[config_dir]) + if self.config['publish_logs']['type'] == 'local': + if self.config['publish_logs']['path'][0] == '/': + self.config['publish_logs']['path'] = \ + self.config['publish_logs']['path'][1:] + self.config['publish_logs']['path'] = os.path.join( + temp_path, self.config[config_dir]) + class TestWorkerServer(TestWithGearman): def test_plugins_load(self): "Test the configured plugins are loaded" + self.start_server() + self.assertFalse(self.worker_server.stopped()) self.assertEqual(3, len(self.worker_server.plugins)) @@ -112,10 +135,12 @@ class TestWorkerServer(TestWithGearman): def test_zuul_client_started(self): "Test the zuul client has been started" + self.start_server() self.assertFalse(self.worker_server.zuul_client.stopped()) def test_zuul_manager_started(self): "Test the zuul manager has been started" + self.start_server() self.assertFalse(self.worker_server.zuul_manager.stopped()) @@ -126,6 +151,9 @@ class TestZuulClient(TestWithGearman): def test_registered_functions(self): "Test the correct functions are registered with gearman" + + self.start_server() + # The client should have all of the functions defined in the config # registered with gearman @@ -160,10 +188,36 @@ class TestZuulClient(TestWithGearman): "Test sending a stop signal to the client exists correctly" pass + def test_job_can_shutdown_th(self): + self._load_config_fixture('shutdown-config.yaml') + self.start_server() + zuul = fakes.FakeZuul(self.config['zuul_server']['gearman_host'], + self.config['zuul_server']['gearman_port']) + + # First check we can run a job that /doesn't/ shut down turbo-hipster + data_req = zuul.make_zuul_data() + zuul.submit_job('build:demo_job_clean', data_req) + zuul.wait_for_completion() + self.assertTrue(zuul.job.complete) + self.assertFalse(self.worker_server.stopped()) + + # Now run a job that leaves the environment dirty and /should/ shut + # down turbo-hipster + zuul.job = None + zuul.submit_job('build:demo_job_dirty', data_req) + zuul.wait_for_completion() + self.assertTrue(zuul.job.complete) + # Give the server a second to shutdown + time.sleep(1) + self.assertTrue(self.worker_server.stopped()) + class TestZuulManager(TestWithGearman): def test_registered_functions(self): "Test the correct functions are registered with gearman" + + self.start_server() + # We need to wait for all the functions to register with the server.. # We'll give it up to 10seconds to do so t0 = time.time() diff --git a/turbo_hipster/cmd/server.py b/turbo_hipster/cmd/server.py index 6dc5e37..2e4df47 100644 --- a/turbo_hipster/cmd/server.py +++ b/turbo_hipster/cmd/server.py @@ -44,7 +44,7 @@ def main(args): server.setup_logging(config['debug_log']) def term_handler(signum, frame): - server.stop() + server.shutdown() signal.signal(signal.SIGTERM, term_handler) if args.background: @@ -56,7 +56,7 @@ def main(args): signal.pause() except KeyboardInterrupt: print "Ctrl + C: asking tasks to exit nicely...\n" - server.stop() + server.shutdown() if __name__ == '__main__': diff --git a/turbo_hipster/lib/models.py b/turbo_hipster/lib/models.py index cd73a73..f140b6c 100644 --- a/turbo_hipster/lib/models.py +++ b/turbo_hipster/lib/models.py @@ -26,8 +26,8 @@ class Task(object): """ A base object for running a job (aka Task) """ log = logging.getLogger("lib.models.Task") - def __init__(self, global_config, plugin_config, job_name): - self.global_config = global_config + def __init__(self, worker_server, plugin_config, job_name): + self.worker_server = worker_server self.plugin_config = plugin_config self.job_name = job_name self._reset() @@ -125,10 +125,10 @@ class Task(object): class ShellTask(Task): log = logging.getLogger("lib.models.ShellTask") - def __init__(self, global_config, plugin_config, job_name): - super(ShellTask, self).__init__(global_config, plugin_config, job_name) + def __init__(self, worker_server, plugin_config, job_name): + super(ShellTask, self).__init__(worker_server, plugin_config, job_name) # Define the number of steps we will do to determine our progress. - self.total_steps = 5 + self.total_steps = 6 def _reset(self): super(ShellTask, self)._reset() @@ -137,21 +137,24 @@ class ShellTask(Task): self.shell_output_log = None def do_job_steps(self): - # Step 1: Prep job working dir + self.log.info('Step 1: Prep job working dir') self._prep_working_dir() - # Step 2: Checkout updates from git + self.log.info('Step 2: Checkout updates from git') self._grab_patchset(self.job_arguments) - # Step 3: Run shell script + self.log.info('Step 3: Run shell script') self._execute_script() - # Step 4: Analyse logs for errors + self.log.info('Step 4: Analyse logs for errors') self._parse_and_check_results() - # Step 5: handle the results (and upload etc) + self.log.info('Step 5: handle the results (and upload etc)') self._handle_results() + self.log.info('Step 6: Handle extra actions such as shutting down') + self._handle_cleanup() + @common.task_step def _prep_working_dir(self): self.job_identifier = utils.determine_job_identifier( @@ -160,7 +163,7 @@ class ShellTask(Task): self.job.unique ) self.job_working_dir = os.path.join( - self.global_config['jobs_working_dir'], + self.worker_server.config['jobs_working_dir'], self.job_identifier ) self.shell_output_log = os.path.join( @@ -176,7 +179,7 @@ class ShellTask(Task): """ Checkout the reference into config['git_working_dir'] """ self.log.debug("Grab the patchset we want to test against") - local_path = os.path.join(self.global_config['git_working_dir'], + local_path = os.path.join(self.worker_server.config['git_working_dir'], self.job_name, job_args['ZUUL_PROJECT']) if not os.path.exists(local_path): os.makedirs(local_path) @@ -185,8 +188,8 @@ class ShellTask(Task): cmd = os.path.join(os.path.join(os.path.dirname(__file__), 'gerrit-git-prep.sh')) - cmd += ' ' + self.global_config['zuul_server']['gerrit_site'] - cmd += ' ' + self.global_config['zuul_server']['zuul_site'] + cmd += ' ' + self.worker_server.config['zuul_server']['gerrit_site'] + cmd += ' ' + self.worker_server.config['zuul_server']['zuul_site'] utils.execute_to_log(cmd, self.shell_output_log, env=git_args, cwd=local_path) self.git_path = local_path @@ -223,10 +226,10 @@ class ShellTask(Task): self.log.debug("Process the resulting files (upload/push)") - if 'publish_logs' in self.global_config: - index_url = utils.push_file(self.job_identifier, - self.shell_output_log, - self.global_config['publish_logs']) + if 'publish_logs' in self.worker_server.config: + index_url = utils.push_file( + self.job_identifier, self.shell_output_log, + self.worker_server.config['publish_logs']) self.log.debug("Index URL found at %s" % index_url) self.work_data['url'] = index_url @@ -234,3 +237,11 @@ class ShellTask(Task): # Upload to zuul's url as instructed utils.zuul_swift_upload(self.job_working_dir, self.job_arguments) self.work_data['url'] = self.job_identifier + + @common.task_step + def _handle_cleanup(self): + """Handle and cleanup functions. Shutdown if requested to so that no + further jobs are ran if the environment is dirty.""" + if ('shutdown-th' in self.plugin_config and + self.plugin_config['shutdown-th']): + self.worker_server.shutdown_gracefully() diff --git a/turbo_hipster/lib/utils.py b/turbo_hipster/lib/utils.py index 85e2183..fa295af 100644 --- a/turbo_hipster/lib/utils.py +++ b/turbo_hipster/lib/utils.py @@ -256,17 +256,8 @@ def scp_push_file(results_set_name, file_path, local_config): def determine_job_identifier(zuul_arguments, job, unique): - if 'build:' in job: - job = job.split('build:')[1] - - path = os.path.join(zuul_arguments['ZUUL_CHANGE'][:2], - zuul_arguments['ZUUL_CHANGE'], - zuul_arguments['ZUUL_PATCHSET'], - zuul_arguments['ZUUL_PIPELINE'], - job, - unique[:7]) - log.info('Converted args: %s, job: %s and unique: %s to %s' - % (zuul_arguments, job, unique, path)) + # use new determined path from zuul + path = zuul_arguments['LOG_PATH'] return path diff --git a/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py b/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py index 668ed88..21946da 100644 --- a/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py +++ b/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py @@ -40,15 +40,15 @@ class Runner(models.ShellTask): log = logging.getLogger("task_plugins.gate_real_db_upgrade.task.Runner") - def __init__(self, global_config, plugin_config, job_name): - super(Runner, self).__init__(global_config, plugin_config, job_name) + def __init__(self, worker_server, plugin_config, job_name): + super(Runner, self).__init__(worker_server, plugin_config, job_name) # Set up the runner worker self.datasets = [] self.job_datasets = [] # Define the number of steps we will do to determine our progress. - self.total_steps = 6 + self.total_steps += 1 def do_job_steps(self): # Step 1: Figure out which datasets to run @@ -74,7 +74,7 @@ class Runner(models.ShellTask): self.job.unique ) dataset['job_log_file_path'] = os.path.join( - self.global_config['jobs_working_dir'], + self.worker_server.config['jobs_working_dir'], dataset['determined_path'], dataset['name'] + '.log' ) @@ -102,7 +102,7 @@ class Runner(models.ShellTask): self.log.debug("Process the resulting files (upload/push)") index_url = handle_results.generate_push_results( self.job_datasets, - self.global_config['publish_logs'] + self.worker_server.config['publish_logs'] ) self.log.debug("Index URL found at %s" % index_url) self.work_data['url'] = index_url @@ -162,8 +162,8 @@ class Runner(models.ShellTask): for dataset in self.job_datasets: cmd = os.path.join(os.path.join(os.path.dirname(__file__), - (self.global_config['baseline_command'] - % self.global_config['flavor']))) + (self.worker_server.config['baseline_command'] + % self.worker_server.config['flavor']))) rc = utils.execute_to_log( cmd, dataset['job_log_file_path'], @@ -187,7 +187,7 @@ class Runner(models.ShellTask): % { 'unique_id': self.job.unique, 'job_working_dir': os.path.join( - self.global_config['jobs_working_dir'], + self.worker_server.config['jobs_working_dir'], dataset['determined_path'] ), 'git_path': self.git_path, @@ -202,7 +202,8 @@ class Runner(models.ShellTask): dataset['dataset_dir'], dataset['config']['logging_conf'] ), - 'pip_cache_dir': self.global_config['pip_download_cache'] + 'pip_cache_dir': + self.worker_server.config['pip_download_cache'] } ) @@ -210,13 +211,13 @@ class Runner(models.ShellTask): syslog = '/var/log/syslog' sqlslo = '/var/log/mysql/slow-queries.log' sqlerr = '/var/log/mysql/error.log' - if 'logs' in self.global_config: - if 'syslog' in self.global_config['logs']: - syslog = self.global_config['logs']['syslog'] - if 'sqlslo' in self.global_config['logs']: - sqlslo = self.global_config['logs']['sqlslo'] - if 'sqlerr' in self.global_config['logs']: - sqlerr = self.global_config['logs']['sqlerr'] + if 'logs' in self.worker_server.config: + if 'syslog' in self.worker_server.config['logs']: + syslog = self.worker_server.config['logs']['syslog'] + if 'sqlslo' in self.worker_server.config['logs']: + sqlslo = self.worker_server.config['logs']['sqlslo'] + if 'sqlerr' in self.worker_server.config['logs']: + sqlerr = self.worker_server.config['logs']['sqlerr'] rc = utils.execute_to_log( cmd, diff --git a/turbo_hipster/worker_manager.py b/turbo_hipster/worker_manager.py index a220425..40a1f35 100644 --- a/turbo_hipster/worker_manager.py +++ b/turbo_hipster/worker_manager.py @@ -18,6 +18,7 @@ import json import logging import os import threading +import time class ZuulManager(threading.Thread): @@ -30,10 +31,13 @@ class ZuulManager(threading.Thread): log = logging.getLogger("worker_manager.ZuulManager") - def __init__(self, config, tasks): + def __init__(self, worker_server, tasks): super(ZuulManager, self).__init__() self._stop = threading.Event() - self.config = config + self.stopping = False + self.running = False + + self.worker_server = worker_server self.tasks = tasks self.gearman_worker = None @@ -44,8 +48,8 @@ class ZuulManager(threading.Thread): self.gearman_worker = gear.Worker('turbo-hipster-manager-%s' % hostname) self.gearman_worker.addServer( - self.config['zuul_server']['gearman_host'], - self.config['zuul_server']['gearman_port'] + self.worker_server.config['zuul_server']['gearman_host'], + self.worker_server.config['zuul_server']['gearman_port'] ) def register_functions(self): @@ -53,6 +57,15 @@ class ZuulManager(threading.Thread): self.gearman_worker.registerFunction( 'stop:turbo-hipster-manager-%s' % hostname) + def stop_gracefully(self): + self.stopping = True + self.gearman_worker.stopWaitingForJobs() + while self.running: + self.log.debug('waiting to finish') + time.sleep(0.1) + self._stop.set() + self.gearman_worker.shutdown() + def stop(self): self._stop.set() # Unblock gearman @@ -64,7 +77,8 @@ class ZuulManager(threading.Thread): return self._stop.isSet() def run(self): - while not self.stopped(): + while not self.stopped() and not self.stopping: + self.running = True try: # gearman_worker.getJob() blocks until a job is available self.log.debug("Waiting for server") @@ -81,6 +95,7 @@ class ZuulManager(threading.Thread): self.log.debug('We were asked to stop waiting for jobs') except: self.log.exception('Unknown exception waiting for job.') + self.running = False self.log.debug("Finished manager thread") def _handle_job(self, job): @@ -101,12 +116,13 @@ class ZuulClient(threading.Thread): log = logging.getLogger("worker_manager.ZuulClient") - def __init__(self, global_config, worker_name): + def __init__(self, worker_server): super(ZuulClient, self).__init__() self._stop = threading.Event() - self.global_config = global_config + self.stopping = False + self.running = False - self.worker_name = worker_name + self.worker_server = worker_server # Set up the runner worker self.gearman_worker = None @@ -118,10 +134,10 @@ class ZuulClient(threading.Thread): def setup_gearman(self): self.log.debug("Set up gearman worker") - self.gearman_worker = gear.Worker(self.worker_name) + self.gearman_worker = gear.Worker(self.worker_server.worker_name) self.gearman_worker.addServer( - self.global_config['zuul_server']['gearman_host'], - self.global_config['zuul_server']['gearman_port'] + self.worker_server.config['zuul_server']['gearman_host'], + self.worker_server.config['zuul_server']['gearman_port'] ) def register_functions(self): @@ -143,11 +159,20 @@ class ZuulClient(threading.Thread): self.gearman_worker.stopWaitingForJobs() self.gearman_worker.shutdown() + def stop_gracefully(self): + self.stopping = True + self.gearman_worker.stopWaitingForJobs() + while self.running: + time.sleep(0.1) + self._stop.set() + self.gearman_worker.shutdown() + def stopped(self): return self._stop.isSet() def run(self): - while not self.stopped(): + while not self.stopped() and not self.stopping: + self.running = True try: # gearman_worker.getJob() blocks until a job is available self.log.debug("Waiting for server") @@ -163,6 +188,7 @@ class ZuulClient(threading.Thread): self.log.debug('We were asked to stop waiting for jobs') except: self.log.exception('Unknown exception waiting for job.') + self.running = False self.log.debug("Finished client thread") def _handle_job(self): diff --git a/turbo_hipster/worker_server.py b/turbo_hipster/worker_server.py old mode 100755 new mode 100644 index 66225ff..35c7882 --- a/turbo_hipster/worker_server.py +++ b/turbo_hipster/worker_server.py @@ -15,9 +15,6 @@ # under the License. -""" worker_server.py is an executable worker server that loads and runs -task_plugins. """ - import logging import os import threading @@ -104,15 +101,14 @@ class Server(threading.Thread): def start_zuul_client(self): """ Run the tasks """ self.log.debug('Starting zuul client') - self.zuul_client = worker_manager.ZuulClient(self.config, - self.worker_name) + self.zuul_client = worker_manager.ZuulClient(self) for task_number, plugin in enumerate(self.plugins): module = plugin['module'] job_name = '%s-%s-%s' % (plugin['plugin_config']['name'], self.worker_name, task_number) self.tasks[job_name] = module.Runner( - self.config, + self, plugin['plugin_config'], job_name ) @@ -122,14 +118,25 @@ class Server(threading.Thread): self.zuul_client.start() def start_zuul_manager(self): - self.zuul_manager = worker_manager.ZuulManager(self.config, self.tasks) + self.zuul_manager = worker_manager.ZuulManager(self, self.tasks) self.zuul_manager.start() - def stop(self): + def shutdown_gracefully(self): + """ Shutdown while no work is currently happening """ + self.log.debug('Graceful shutdown once jobs are complete...') + thread = threading.Thread(target=self._shutdown_gracefully) + thread.start() + + def _shutdown_gracefully(self): + self.zuul_client.stop_gracefully() + self.zuul_manager.stop_gracefully() self._stop.set() - self.log.debug('Exiting...') + + def shutdown(self): + self.log.debug('Shutting down now!...') self.zuul_client.stop() self.zuul_manager.stop() + self._stop.set() def stopped(self): return self._stop.isSet()