Add watchdog for verifier processes.

Add a watchdog in parent process to check verifier child processes,
and restart if needed.

Change-Id: Icacc4c046a8f4ba949499780cdc4724c9fd54fba
This commit is contained in:
Monsyne Dragon 2016-02-25 16:28:41 +00:00
parent 2b4535c8a2
commit c389b8f8a9
6 changed files with 145 additions and 51 deletions

View File

@ -184,6 +184,7 @@ class BaseVerifierTestCase(StacktachBaseTestCase):
start = datetime.datetime.utcnow()
self.mox.StubOutWithMock(self.verifier_without_notifications, '_utcnow')
self.verifier_without_notifications._utcnow().AndReturn(start)
self.verifier_without_notifications._utcnow().AndReturn(start)
settle_offset = {SETTLE_UNITS: SETTLE_TIME}
ending_max = start - datetime.timedelta(**settle_offset)
self.mox.StubOutWithMock(self.verifier_without_notifications, 'verify_for_range')
@ -231,6 +232,7 @@ class BaseVerifierTestCase(StacktachBaseTestCase):
start = datetime.datetime.utcnow()
self.mox.StubOutWithMock(self.verifier_with_notifications, '_utcnow')
self.verifier_with_notifications._utcnow().AndReturn(start)
self.verifier_with_notifications._utcnow().AndReturn(start)
settle_offset = {SETTLE_UNITS: SETTLE_TIME}
ending_max = start - datetime.timedelta(**settle_offset)
self.mox.StubOutWithMock(self.verifier_with_notifications, 'verify_for_range')

View File

@ -108,13 +108,17 @@ def _is_alphanumeric(attr_name, attr_value, exist_id, instance_uuid):
class Verifier(object):
def __init__(self, config, pool=None, reconciler=None):
def __init__(self, config, pool=None, reconciler=None, stats=None):
self.config = config
self.pool = pool or multiprocessing.Pool(config.pool_size())
self.enable_notifications = config.enable_notifications()
self.reconciler = reconciler
self.results = []
self.failed = []
if stats is None:
self.stats = {}
else:
self.stats = stats
def clean_results(self):
pending = []
@ -147,6 +151,7 @@ class Verifier(object):
settle_units = self.config.settle_units()
settle_time = self.config.settle_time()
while self._keep_running():
self.stats['timestamp'] = self._utcnow()
with transaction.commit_on_success():
now = self._utcnow()
kwargs = {settle_units: settle_time}
@ -175,6 +180,7 @@ class Verifier(object):
def callback(result):
attempt = 0
while attempt < 2:
self.stats['timestamp'] = self._utcnow()
try:
(verified, exist) = result
if verified:
@ -198,6 +204,10 @@ class Verifier(object):
logger.exception(msg)
break
attempt += 1
self.stats['timestamp'] = self._utcnow()
total = self.stats.get('total_processed', 0) + 1
self.stats['total_processed'] = total
try:
self._run(callback=callback)
except Exception, e:

View File

@ -66,6 +66,10 @@ def pool_size():
return config['pool_size']
def process_timeout(default=0):
return config.get('process_timeout', default)
def durable_queue():
return config['rabbit']['durable_queue']

View File

@ -158,8 +158,8 @@ def _verify(exists):
class GlanceVerifier(Verifier):
def __init__(self, config, pool=None):
super(GlanceVerifier, self).__init__(config, pool=pool)
def __init__(self, config, pool=None, stats=None):
super(GlanceVerifier, self).__init__(config, pool=pool, stats=stats)
def verify_exists(self, grouped_exists, callback, verifying_status):
count = len(grouped_exists)

View File

@ -298,10 +298,6 @@ def _verify(exist, validation_level):
class NovaVerifier(base_verifier.Verifier):
def __init__(self, config, pool=None, reconciler=None):
super(NovaVerifier, self).__init__(config,
pool=pool,
reconciler=reconciler)
def send_verified_notification(self, exist, connection, exchange,
routing_keys=None):

View File

@ -5,9 +5,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@ -19,8 +19,9 @@ import json
import os
import signal
import sys
import time
from multiprocessing import Process
from multiprocessing import Process, Manager
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir, os.pardir))
@ -38,61 +39,142 @@ try:
except ImportError:
pass
process = None
processes = {}
log_listener = None
processes = []
stacklog.set_default_logger_name('verifier')
DEFAULT_PROC_TIMEOUT = 3600
RUNNING = True
def _get_parent_logger():
return stacklog.get_logger('verifier', is_parent=True)
def kill_time(signal, frame):
log_listener.end()
print "dying ..."
for process in processes:
process.terminate()
print "rose"
for process in processes:
process.join()
print "bud"
sys.exit(0)
def _load_nova_reconciler():
config_loc = verifier_config.reconciler_config()
with open(config_loc, 'r') as rec_config_file:
rec_config = json.load(rec_config_file)
return reconciler.Reconciler(rec_config)
if __name__ == '__main__':
def make_and_start_verifier(exchange):
# Gotta create it and run it this way so things don't get
# lost when the process is forked.
verifier = None
if exchange == "nova":
reconcile = verifier_config.reconcile()
reconciler = None
if reconcile:
reconciler = _load_nova_reconciler()
verifier = nova_verifier.NovaVerifier(verifier_config,
reconciler=reconciler)
elif exchange == "glance":
verifier = glance_verifier.GlanceVerifier(verifier_config)
verifier.run()
def make_and_start_verifier(exchange, stats=None):
# Gotta create it and run it this way so things don't get
# lost when the process is forked.
verifier = None
if exchange == "nova":
reconcile = verifier_config.reconcile()
reconciler = None
if reconcile:
reconciler = _load_nova_reconciler()
verifier = nova_verifier.NovaVerifier(verifier_config,
reconciler=reconciler,
stats=stats)
elif exchange == "glance":
verifier = glance_verifier.GlanceVerifier(verifier_config,
stats=stats)
verifier_config.load()
log_listener = stacklog.LogListener(_get_parent_logger())
log_listener.start()
verifier.run()
def create_proc_table(manager):
for exchange in verifier_config.topics().keys():
process = Process(target=make_and_start_verifier, args=(exchange,))
process.start()
processes.append(process)
stats = manager.dict()
proc_info = dict(process=None,
pid=0,
exchange=exchange,
stats=stats)
processes[exchange] = proc_info
if len(processes) > 0:
# Only pause parent process if there are children running.
# Otherwise just end...
signal.signal(signal.SIGINT, kill_time)
signal.signal(signal.SIGTERM, kill_time)
signal.pause()
def is_alive(proc_info):
process = proc_info['process']
if not proc_info['pid'] or process is None:
return False
return process.is_alive()
def needs_restart(proc_info):
timeout = verifier_config.process_timeout(DEFAULT_PROC_TIMEOUT)
process = proc_info['process']
stats = proc_info['stats']
age = datetime.datetime.utcnow() - stats['timestamp']
if timeout and (age > datetime.timedelta(seconds=timeout)):
process.terminate()
return True
return False
def start_proc(proc_info):
logger = _get_parent_logger()
if is_alive(proc_info):
if needs_restart(proc_info):
logger.warning("Child process %s (%s) terminated due to "
"heartbeat timeout. Restarting..." % (proc_info['pid'],
proc_info['exchange']))
else:
return False
stats = proc_info['stats']
stats['timestamp'] = datetime.datetime.utcnow()
stats['total_processed'] = 0
stats['processed'] = 0
args = (proc_info['exchange'], stats)
process = Process(target=make_and_start_verifier, args=args)
process.daemon = True
process.start()
proc_info['pid'] = process.pid
proc_info['process'] = process
logger.info("Started child process %s (%s)" % (proc_info['pid'],
proc_info['exchange']))
return True
def check_or_start_all():
for proc_name in sorted(processes.keys()):
if RUNNING:
start_proc(processes[proc_name])
def stop_all():
procs = sorted(processes.keys())
for pname in procs:
process = processes[pname]['process']
if process is not None:
process.terminate()
for pname in procs:
process = processes[pname]['process']
if process is not None:
process.join()
processes[pname]['process'] = None
processes[pname]['pid'] = 0
def kill_time(signal, frame):
global RUNNING
RUNNING = False
stop_all()
if __name__ == '__main__':
verifier_config.load()
logger = _get_parent_logger()
log_listener = stacklog.LogListener(logger)
log_listener.start()
manager = Manager()
create_proc_table(manager)
signal.signal(signal.SIGINT, kill_time)
signal.signal(signal.SIGTERM, kill_time)
logger.info("Starting Verifiers...")
while RUNNING:
check_or_start_all()
time.sleep(30)
logger.info("Verifiers Shutting down...")
#make sure.
stop_all()
log_listener.end()
sys.exit(0)