Enhancing Elastic resilience

Previously the same file name was used for each atomic action, causing overwriting.
Changing the logic to write to a different file each time.

+ Dump JSON in the relevant results directory
+ Update Elasticsearch indexing status in summary
+ Add failover to Shaker results also

Sample output: https://gist.github.com/smalleni/c25ef05c9b815a8e3299c2fc904908ef

Change-Id: Ib74cfd2ecf5c63857548c0ef219e7965b6323d56
This commit is contained in:
Sai Sindhur Malleni 2016-10-03 09:46:14 -04:00
parent ec90189db0
commit e06f895455
4 changed files with 97 additions and 112 deletions

View File

@ -85,7 +85,8 @@ class Elastic(object):
""" """
""" """
def index_result(self, result, test_name, _type='result', _id=None): def index_result(self, result, test_name, result_dir, identifier=None, _type='result',
_id=None):
retry = 2 retry = 2
result['browbeat_uuid'] = str(browbeat_uuid) result['browbeat_uuid'] = str(browbeat_uuid)
result['cloud_name'] = self.config['browbeat']['cloud_name'] result['cloud_name'] = self.config['browbeat']['cloud_name']
@ -104,17 +105,19 @@ class Elastic(object):
self.logger.info("Pushed data to Elasticsearch to index {}" self.logger.info("Pushed data to Elasticsearch to index {}"
" and browbeat UUID {}" . " and browbeat UUID {}" .
format(self.index, result['browbeat_uuid'])) format(self.index, result['browbeat_uuid']))
break return True
except Exception: except Exception:
self.logger.error("Error pushing data to Elasticsearch, going to retry" self.logger.error("Error pushing data to Elasticsearch, going to retry"
" in 10 seconds") " in 10 seconds")
time.sleep(10) time.sleep(10)
if i == (retry-1): if i == (retry-1):
self.logger.error("Pushing Data to Elasticsearch failed in spite of retry," self.logger.error("Pushing Data to Elasticsearch failed in spite of retry,"
" dumping JSON") " dumping JSON")
elastic_file = os.path.join(self.config['browbeat']['results'], elastic_file = os.path.join(result_dir,
test_name + '-elastic' + '.' + 'json') test_name + '-' + identifier + '-elastic' +
with open(elastic_file, 'w') as result_file: '.' + 'json')
json.dump(result, result_file, indent=4, sort_keys=True) with open(elastic_file, 'w') as result_file:
self.logger.info("Saved Elasticsearch consumable result JSON to {}". json.dump(result, result_file, indent=4, sort_keys=True)
format(elastic_file)) self.logger.info("Saved Elasticsearch consumable result JSON to {}".
format(elastic_file))
return False

View File

@ -125,9 +125,10 @@ class Rally(WorkloadBase.WorkloadBase):
result['rally_metadata'] = meta result['rally_metadata'] = meta
return result return result
def json_result(self, task_id, scenario_name, run, test_name): def json_result(self, task_id, scenario_name, run, test_name, result_dir):
rally_data = {} rally_data = {}
self.logger.info("Loadding Task_ID {} JSON".format(task_id)) failure = False
self.logger.info("Loading Task_ID {} JSON".format(task_id))
rally_json = self.elastic.load_json(self.gen_scenario_json(task_id)) rally_json = self.elastic.load_json(self.gen_scenario_json(task_id))
es_ts = datetime.datetime.utcnow() es_ts = datetime.datetime.utcnow()
if len(rally_json) < 1: if len(rally_json) < 1:
@ -158,7 +159,10 @@ class Rally(WorkloadBase.WorkloadBase):
'scenario': scenario_name, 'scenario': scenario_name,
} }
error_result = self.elastic.combine_metadata(error) error_result = self.elastic.combine_metadata(error)
self.elastic.index_result(error_result, test_name, 'error') index_status = self.elastic.index_result(error_result, test_name, result_dir,
workload, 'error')
if index_status is False:
failure = True
for workload in rally_data: for workload in rally_data:
if not type(rally_data[workload]) is dict: if not type(rally_data[workload]) is dict:
iteration = 1 iteration = 1
@ -176,8 +180,13 @@ class Rally(WorkloadBase.WorkloadBase):
'rally_setup': rally_json[0]['key'], 'rally_setup': rally_json[0]['key'],
'raw': rally_data[workload]} 'raw': rally_data[workload]}
result = self.elastic.combine_metadata(rally_stats) result = self.elastic.combine_metadata(rally_stats)
self.elastic.index_result(result, test_name) index_status = self.elastic.index_result(result, test_name, result_dir, workload)
return True if index_status is False:
failure = True
if failure:
return False
else:
return True
def start_workloads(self): def start_workloads(self):
"""Iterates through all rally scenarios in browbeat yaml config file""" """Iterates through all rally scenarios in browbeat yaml config file"""
@ -287,14 +296,18 @@ class Rally(WorkloadBase.WorkloadBase):
results[run].append(task_id) results[run].append(task_id)
self.update_pass_tests() self.update_pass_tests()
self.update_total_pass_tests() self.update_total_pass_tests()
self.get_time_dict(
to_time, from_time, benchmark[
'name'], new_test_name,
workload, "pass")
if self.config['elasticsearch']['enabled']: if self.config['elasticsearch']['enabled']:
# Start indexing # Start indexing
self.json_result( index_status = self.json_result(
task_id, scenario_name, run, test_name) task_id, scenario_name, run, test_name, result_dir)
self.get_time_dict(to_time, from_time,
benchmark['name'], new_test_name,
workload, "pass", index_status)
else:
self.get_time_dict(to_time, from_time, benchmark[
'name'], new_test_name,
workload, "pass", )
else: else:
self.logger.error( self.logger.error(
"Cannot find task_id") "Cannot find task_id")

View File

@ -83,8 +83,9 @@ class Shaker(WorkloadBase.WorkloadBase):
# by ElasticSearch and ship the data to ES # by ElasticSearch and ship the data to ES
def send_to_elastic(self, outputfile, browbeat_scenario, def send_to_elastic(self, outputfile, browbeat_scenario,
shaker_uuid, es_ts, es_list, run, test_name): shaker_uuid, es_ts, es_list, run, test_name, result_dir):
fname = outputfile fname = outputfile
failure = False
# Load output json # Load output json
try: try:
with open(fname) as data_file: with open(fname) as data_file:
@ -103,8 +104,11 @@ class Shaker(WorkloadBase.WorkloadBase):
} }
result = self.elastic.combine_metadata(shaker_stats) result = self.elastic.combine_metadata(shaker_stats)
self.elastic.index_result(result, test_name, _type='error') index_status = self.elastic.index_result(result, test_name, result_dir, _type='error')
return if index_status is False:
return False
else:
return True
# Dictionary to capture common test data # Dictionary to capture common test data
shaker_test_meta = {} shaker_test_meta = {}
for scenario in data['scenarios'].iterkeys(): for scenario in data['scenarios'].iterkeys():
@ -171,12 +175,16 @@ class Shaker(WorkloadBase.WorkloadBase):
'grafana_url': [ 'grafana_url': [
self.grafana.grafana_urls()], self.grafana.grafana_urls()],
'shaker_uuid': str(shaker_uuid)} 'shaker_uuid': str(shaker_uuid)}
identifier = elastic_timestamp + '-' + record + '-' + result['result_type']
# Ship Data to ES when record status is ok # Ship Data to ES when record status is ok
if result['value'] is None: if result['value'] is None:
self.logger.debug("Ignoring sending null values to ES") self.logger.debug("Ignoring sending null values to ES")
else: else:
result = self.elastic.combine_metadata(shaker_stats) result = self.elastic.combine_metadata(shaker_stats)
self.elastic.index_result(result, test_name) index_status = self.elastic.index_result(result, test_name, result_dir,
identifier)
if index_status is False:
failure = True
else: else:
# If the status of the record is not ok, ship minimal # If the status of the record is not ok, ship minimal
# shaker_stats dictionary to ES # shaker_stats dictionary to ES
@ -190,8 +198,16 @@ class Shaker(WorkloadBase.WorkloadBase):
'browbeat_scenario': browbeat_scenario, 'browbeat_scenario': browbeat_scenario,
'grafana_url': [self.grafana.grafana_urls()], 'grafana_url': [self.grafana.grafana_urls()],
'shaker_uuid': str(shaker_uuid)} 'shaker_uuid': str(shaker_uuid)}
identifier = record
result = self.elastic.combine_metadata(shaker_stats) result = self.elastic.combine_metadata(shaker_stats)
self.elastic.index_result(result, test_name, _type='error') index_status = self.elastic.index_result(result, test_name, result_dir, identifier,
_type='error')
if index_status is False:
failure = True
if failure:
return False
else:
return True
def set_scenario(self, scenario, fname, default_time): def set_scenario(self, scenario, fname, default_time):
stream = open(fname, 'r') stream = open(fname, 'r')
@ -238,13 +254,8 @@ class Shaker(WorkloadBase.WorkloadBase):
uuidlist.append(key) uuidlist.append(key)
return uuidlist return uuidlist
def result_check( def result_check(self, result_dir, test_name, scenario,
self, to_time, from_time, index_status="disabled"):
result_dir,
test_name,
scenario,
to_time,
from_time):
outputfile = os.path.join(result_dir, test_name + "." + "json") outputfile = os.path.join(result_dir, test_name + "." + "json")
error = False error = False
workload = self.__class__.__name__ workload = self.__class__.__name__
@ -255,88 +266,44 @@ class Shaker(WorkloadBase.WorkloadBase):
with open(outputfile) as data_file: with open(outputfile) as data_file:
data = json.load(data_file) data = json.load(data_file)
except IOError: except IOError:
self.logger.error( self.logger.error("Cannot open outputfile, possible stack creation"
"Cannot open outputfile, possible stack creation failure for test: {}". format( "failure for test: {}". format(scenario['name']))
scenario['name'])) self.error_update(result_dir, test_name, scenario, to_time,
self.error_update( from_time, new_test_name, workload, index_status)
result_dir,
test_name,
scenario,
to_time,
from_time,
new_test_name,
workload)
return return
uuidlist = self.get_uuidlist(data) uuidlist = self.get_uuidlist(data)
for id in uuidlist: for id in uuidlist:
if data['records'][id]['status'] != "ok": if data['records'][id]['status'] != "ok":
error = True error = True
if error: if error:
self.error_update( self.error_update(result_dir, test_name, scenario,
result_dir, to_time, from_time, new_test_name,
test_name, workload, index_status)
scenario,
to_time,
from_time,
new_test_name,
workload)
else: else:
self.success_update( self.success_update(result_dir, test_name, scenario, to_time,
result_dir, from_time, new_test_name, workload, index_status)
test_name,
scenario,
to_time,
from_time,
new_test_name,
workload)
def error_update(self, result_dir, test_name, scenario, to_time, from_time, def error_update(self, result_dir, test_name, scenario, to_time, from_time,
new_test_name, workload): new_test_name, workload, index_status):
self.logger.error("Failed Test: {}".format(scenario['name'])) self.logger.error("Failed Test: {}".format(scenario['name']))
self.logger.error( self.logger.error("saved log to: {}.log".format(os.path.join(result_dir,
"saved log to: {}.log".format( test_name)))
os.path.join(
result_dir,
test_name)))
self.update_fail_tests() self.update_fail_tests()
self.update_total_fail_tests() self.update_total_fail_tests()
self.get_time_dict( self.get_time_dict(to_time, from_time, scenario['name'],
to_time, new_test_name, workload, "fail", index_status)
from_time,
scenario['name'],
new_test_name,
workload,
"fail")
def success_update( def success_update(self, result_dir, test_name, scenario, to_time,
self, from_time, new_test_name, workload, index_status):
result_dir,
test_name,
scenario,
to_time,
from_time,
new_test_name,
workload):
self.logger.info("Completed Test: {}".format(scenario['name'])) self.logger.info("Completed Test: {}".format(scenario['name']))
self.logger.info( self.logger.info("Saved report to: {}.html".
"Saved report to: {}.html".format( format(os.path.join(result_dir,test_name)))
os.path.join( self.logger.info("saved log to: {}.log".format(os.path.join(result_dir,
result_dir, test_name)))
test_name)))
self.logger.info(
"saved log to: {}.log".format(
os.path.join(
result_dir,
test_name)))
self.update_pass_tests() self.update_pass_tests()
self.update_total_pass_tests() self.update_total_pass_tests()
self.get_time_dict( self.get_time_dict(to_time, from_time, scenario['name'],
to_time, new_test_name, workload, "pass", index_status)
from_time,
scenario['name'],
new_test_name,
workload,
"pass")
def run_scenario(self, scenario, result_dir, test_name, filename, def run_scenario(self, scenario, result_dir, test_name, filename,
shaker_uuid, es_ts, es_list, run): shaker_uuid, es_ts, es_list, run):
@ -355,14 +322,8 @@ class Shaker(WorkloadBase.WorkloadBase):
" --os-region-name {7} --agent-join-timeout {6}" " --os-region-name {7} --agent-join-timeout {6}"
" --report {4}/{5}.html --output {4}/{5}.json" " --report {4}/{5}.html --output {4}/{5}.json"
" --book {4}/{5} --debug > {4}/{5}.log 2>&1").format( " --book {4}/{5} --debug > {4}/{5}.log 2>&1").format(
server_endpoint, server_endpoint, port_no, flavor, filename,
port_no, result_dir, test_name, timeout, shaker_region)
flavor,
filename,
result_dir,
test_name,
timeout,
shaker_region)
cmd = ("{}; {}").format(cmd_1, cmd_2) cmd = ("{}; {}").format(cmd_1, cmd_2)
from_ts = int(time.time() * 1000) from_ts = int(time.time() * 1000)
if 'sleep_before' in self.config['shaker']: if 'sleep_before' in self.config['shaker']:
@ -373,7 +334,6 @@ class Shaker(WorkloadBase.WorkloadBase):
self.update_tests() self.update_tests()
self.update_total_tests() self.update_total_tests()
outputfile = os.path.join(result_dir, test_name + "." + "json") outputfile = os.path.join(result_dir, test_name + "." + "json")
self.result_check(result_dir, test_name, scenario, to_time, from_time)
if 'sleep_after' in self.config['shaker']: if 'sleep_after' in self.config['shaker']:
time.sleep(self.config['shaker']['sleep_after']) time.sleep(self.config['shaker']['sleep_after'])
to_ts = int(time.time() * 1000) to_ts = int(time.time() * 1000)
@ -385,8 +345,11 @@ class Shaker(WorkloadBase.WorkloadBase):
self.grafana.run_playbook(from_ts, to_ts, result_dir, test_name) self.grafana.run_playbook(from_ts, to_ts, result_dir, test_name)
# Send Data to elastic # Send Data to elastic
if self.config['elasticsearch']['enabled']: if self.config['elasticsearch']['enabled']:
self.send_to_elastic(outputfile, scenario['name'], shaker_uuid, index_status = self.send_to_elastic(outputfile, scenario['name'], shaker_uuid,
es_ts, es_list, run, test_name) es_ts, es_list, run, test_name, result_dir)
self.result_check(result_dir, test_name, scenario, to_time, from_time, index_status)
else:
self.result_check(result_dir, test_name, scenario, to_time, from_time)
def run_shaker(self): def run_shaker(self):
self.logger.info("Starting Shaker workloads") self.logger.info("Starting Shaker workloads")

View File

@ -65,7 +65,8 @@ class WorkloadBase(object):
self.logger.addHandler(file) self.logger.addHandler(file)
return None return None
def get_time_dict(self, to_time, from_time, benchmark, test_name, workload, status): def get_time_dict(self, to_time, from_time, benchmark, test_name, workload, status,
index_status="disabled"):
time_diff = (to_time - from_time) time_diff = (to_time - from_time)
if workload not in WorkloadBase.browbeat: if workload not in WorkloadBase.browbeat:
WorkloadBase.browbeat[workload] = {} WorkloadBase.browbeat[workload] = {}
@ -73,8 +74,13 @@ class WorkloadBase(object):
WorkloadBase.browbeat[workload][benchmark] = {} WorkloadBase.browbeat[workload][benchmark] = {}
if 'tests' not in WorkloadBase.browbeat[workload][benchmark]: if 'tests' not in WorkloadBase.browbeat[workload][benchmark]:
WorkloadBase.browbeat[workload][benchmark]['tests'] = [] WorkloadBase.browbeat[workload][benchmark]['tests'] = []
if index_status is True:
index_status = "success"
elif index_status is False:
index_status = "failure"
WorkloadBase.browbeat[workload][benchmark]['tests'].append( WorkloadBase.browbeat[workload][benchmark]['tests'].append(
{'Test name': test_name, 'Time': time_diff, 'status': status}) {'Test name': test_name, 'Time': time_diff, 'Test Status': status,
'Elasticsearch Indexing': index_status})
@staticmethod @staticmethod
def print_report(result_dir, time_stamp): def print_report(result_dir, time_stamp):