From c389b8f8a9fa2b55ce74b3b1bc7f56542802b91a Mon Sep 17 00:00:00 2001 From: Monsyne Dragon Date: Thu, 25 Feb 2016 16:28:41 +0000 Subject: [PATCH] Add watchdog for verifier processes. Add a watchdog in parent process to check verifier child processes, and restart if needed. Change-Id: Icacc4c046a8f4ba949499780cdc4724c9fd54fba --- tests/unit/test_base_verifier.py | 2 + verifier/base_verifier.py | 12 ++- verifier/config.py | 4 + verifier/glance_verifier.py | 4 +- verifier/nova_verifier.py | 4 - verifier/start_verifier.py | 170 +++++++++++++++++++++++-------- 6 files changed, 145 insertions(+), 51 deletions(-) diff --git a/tests/unit/test_base_verifier.py b/tests/unit/test_base_verifier.py index b4f6985..19decce 100644 --- a/tests/unit/test_base_verifier.py +++ b/tests/unit/test_base_verifier.py @@ -184,6 +184,7 @@ class BaseVerifierTestCase(StacktachBaseTestCase): start = datetime.datetime.utcnow() self.mox.StubOutWithMock(self.verifier_without_notifications, '_utcnow') self.verifier_without_notifications._utcnow().AndReturn(start) + self.verifier_without_notifications._utcnow().AndReturn(start) settle_offset = {SETTLE_UNITS: SETTLE_TIME} ending_max = start - datetime.timedelta(**settle_offset) self.mox.StubOutWithMock(self.verifier_without_notifications, 'verify_for_range') @@ -231,6 +232,7 @@ class BaseVerifierTestCase(StacktachBaseTestCase): start = datetime.datetime.utcnow() self.mox.StubOutWithMock(self.verifier_with_notifications, '_utcnow') self.verifier_with_notifications._utcnow().AndReturn(start) + self.verifier_with_notifications._utcnow().AndReturn(start) settle_offset = {SETTLE_UNITS: SETTLE_TIME} ending_max = start - datetime.timedelta(**settle_offset) self.mox.StubOutWithMock(self.verifier_with_notifications, 'verify_for_range') diff --git a/verifier/base_verifier.py b/verifier/base_verifier.py index 7f2e3c8..377cd2f 100644 --- a/verifier/base_verifier.py +++ b/verifier/base_verifier.py @@ -108,13 +108,17 @@ def _is_alphanumeric(attr_name, attr_value, exist_id, instance_uuid): class Verifier(object): - def __init__(self, config, pool=None, reconciler=None): + def __init__(self, config, pool=None, reconciler=None, stats=None): self.config = config self.pool = pool or multiprocessing.Pool(config.pool_size()) self.enable_notifications = config.enable_notifications() self.reconciler = reconciler self.results = [] self.failed = [] + if stats is None: + self.stats = {} + else: + self.stats = stats def clean_results(self): pending = [] @@ -147,6 +151,7 @@ class Verifier(object): settle_units = self.config.settle_units() settle_time = self.config.settle_time() while self._keep_running(): + self.stats['timestamp'] = self._utcnow() with transaction.commit_on_success(): now = self._utcnow() kwargs = {settle_units: settle_time} @@ -175,6 +180,7 @@ class Verifier(object): def callback(result): attempt = 0 while attempt < 2: + self.stats['timestamp'] = self._utcnow() try: (verified, exist) = result if verified: @@ -198,6 +204,10 @@ class Verifier(object): logger.exception(msg) break attempt += 1 + self.stats['timestamp'] = self._utcnow() + total = self.stats.get('total_processed', 0) + 1 + self.stats['total_processed'] = total + try: self._run(callback=callback) except Exception, e: diff --git a/verifier/config.py b/verifier/config.py index f7c711b..b9f4eee 100644 --- a/verifier/config.py +++ b/verifier/config.py @@ -66,6 +66,10 @@ def pool_size(): return config['pool_size'] +def process_timeout(default=0): + return config.get('process_timeout', default) + + def durable_queue(): return config['rabbit']['durable_queue'] diff --git a/verifier/glance_verifier.py b/verifier/glance_verifier.py index be9ca4d..2b2a9d3 100644 --- a/verifier/glance_verifier.py +++ b/verifier/glance_verifier.py @@ -158,8 +158,8 @@ def _verify(exists): class GlanceVerifier(Verifier): - def __init__(self, config, pool=None): - super(GlanceVerifier, self).__init__(config, pool=pool) + def __init__(self, config, pool=None, stats=None): + super(GlanceVerifier, self).__init__(config, pool=pool, stats=stats) def verify_exists(self, grouped_exists, callback, verifying_status): count = len(grouped_exists) diff --git a/verifier/nova_verifier.py b/verifier/nova_verifier.py index 41f26f3..0700a33 100644 --- a/verifier/nova_verifier.py +++ b/verifier/nova_verifier.py @@ -298,10 +298,6 @@ def _verify(exist, validation_level): class NovaVerifier(base_verifier.Verifier): - def __init__(self, config, pool=None, reconciler=None): - super(NovaVerifier, self).__init__(config, - pool=pool, - reconciler=reconciler) def send_verified_notification(self, exist, connection, exchange, routing_keys=None): diff --git a/verifier/start_verifier.py b/verifier/start_verifier.py index 5baf7b3..ef98086 100644 --- a/verifier/start_verifier.py +++ b/verifier/start_verifier.py @@ -5,9 +5,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,8 +19,9 @@ import json import os import signal import sys +import time -from multiprocessing import Process +from multiprocessing import Process, Manager POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), os.pardir, os.pardir)) @@ -38,61 +39,142 @@ try: except ImportError: pass -process = None +processes = {} log_listener = None -processes = [] stacklog.set_default_logger_name('verifier') +DEFAULT_PROC_TIMEOUT = 3600 +RUNNING = True + def _get_parent_logger(): return stacklog.get_logger('verifier', is_parent=True) -def kill_time(signal, frame): - log_listener.end() - print "dying ..." - for process in processes: - process.terminate() - print "rose" - for process in processes: - process.join() - print "bud" - sys.exit(0) - - def _load_nova_reconciler(): config_loc = verifier_config.reconciler_config() with open(config_loc, 'r') as rec_config_file: rec_config = json.load(rec_config_file) return reconciler.Reconciler(rec_config) -if __name__ == '__main__': - def make_and_start_verifier(exchange): - # Gotta create it and run it this way so things don't get - # lost when the process is forked. - verifier = None - if exchange == "nova": - reconcile = verifier_config.reconcile() - reconciler = None - if reconcile: - reconciler = _load_nova_reconciler() - verifier = nova_verifier.NovaVerifier(verifier_config, - reconciler=reconciler) - elif exchange == "glance": - verifier = glance_verifier.GlanceVerifier(verifier_config) - verifier.run() +def make_and_start_verifier(exchange, stats=None): + # Gotta create it and run it this way so things don't get + # lost when the process is forked. + verifier = None + if exchange == "nova": + reconcile = verifier_config.reconcile() + reconciler = None + if reconcile: + reconciler = _load_nova_reconciler() + verifier = nova_verifier.NovaVerifier(verifier_config, + reconciler=reconciler, + stats=stats) + elif exchange == "glance": + verifier = glance_verifier.GlanceVerifier(verifier_config, + stats=stats) - verifier_config.load() - log_listener = stacklog.LogListener(_get_parent_logger()) - log_listener.start() + verifier.run() + + +def create_proc_table(manager): for exchange in verifier_config.topics().keys(): - process = Process(target=make_and_start_verifier, args=(exchange,)) - process.start() - processes.append(process) + stats = manager.dict() + proc_info = dict(process=None, + pid=0, + exchange=exchange, + stats=stats) + processes[exchange] = proc_info - if len(processes) > 0: - # Only pause parent process if there are children running. - # Otherwise just end... - signal.signal(signal.SIGINT, kill_time) - signal.signal(signal.SIGTERM, kill_time) - signal.pause() + +def is_alive(proc_info): + process = proc_info['process'] + if not proc_info['pid'] or process is None: + return False + return process.is_alive() + + +def needs_restart(proc_info): + timeout = verifier_config.process_timeout(DEFAULT_PROC_TIMEOUT) + process = proc_info['process'] + stats = proc_info['stats'] + age = datetime.datetime.utcnow() - stats['timestamp'] + if timeout and (age > datetime.timedelta(seconds=timeout)): + process.terminate() + return True + return False + + +def start_proc(proc_info): + logger = _get_parent_logger() + if is_alive(proc_info): + if needs_restart(proc_info): + logger.warning("Child process %s (%s) terminated due to " + "heartbeat timeout. Restarting..." % (proc_info['pid'], + proc_info['exchange'])) + else: + return False + stats = proc_info['stats'] + stats['timestamp'] = datetime.datetime.utcnow() + stats['total_processed'] = 0 + stats['processed'] = 0 + args = (proc_info['exchange'], stats) + process = Process(target=make_and_start_verifier, args=args) + process.daemon = True + process.start() + proc_info['pid'] = process.pid + proc_info['process'] = process + logger.info("Started child process %s (%s)" % (proc_info['pid'], + proc_info['exchange'])) + return True + + +def check_or_start_all(): + for proc_name in sorted(processes.keys()): + if RUNNING: + start_proc(processes[proc_name]) + + +def stop_all(): + procs = sorted(processes.keys()) + for pname in procs: + process = processes[pname]['process'] + if process is not None: + process.terminate() + for pname in procs: + process = processes[pname]['process'] + if process is not None: + process.join() + processes[pname]['process'] = None + processes[pname]['pid'] = 0 + + +def kill_time(signal, frame): + global RUNNING + RUNNING = False + stop_all() + + +if __name__ == '__main__': + verifier_config.load() + + logger = _get_parent_logger() + log_listener = stacklog.LogListener(logger) + log_listener.start() + manager = Manager() + + create_proc_table(manager) + + signal.signal(signal.SIGINT, kill_time) + signal.signal(signal.SIGTERM, kill_time) + + logger.info("Starting Verifiers...") + while RUNNING: + check_or_start_all() + time.sleep(30) + logger.info("Verifiers Shutting down...") + + #make sure. + stop_all() + + log_listener.end() + sys.exit(0)