Index Perfkit Result Data into ES

This commit adds indexing perfkit result data into ElasticSearch.
The results will be able to be aggregated and visualized over various
points of metadata.

+ Improved creating a result directory to help support testing PerfKit
  indexing with pre-loaded results from previous runs.
+ Add additional error logging when unable to index results.

Change-Id: Ic18b43f19d75a061c117de6ba41545c68792e27c
This commit is contained in:
akrzos 2016-10-05 21:12:38 -04:00
parent ae07da17a7
commit 03f4db089e
3 changed files with 80 additions and 24 deletions

View File

@ -106,9 +106,10 @@ class Elastic(object):
" and browbeat UUID {}" .
format(self.index, result['browbeat_uuid']))
return True
except Exception:
except Exception as Err:
self.logger.error("Error pushing data to Elasticsearch, going to retry"
" in 10 seconds")
self.logger.error("Exception: {}".format(Err))
time.sleep(10)
if i == (retry-1):
self.logger.error("Pushing Data to Elasticsearch failed in spite of retry,"

View File

@ -12,6 +12,7 @@
import Connmon
import datetime
import Elastic
import glob
import Grafana
import logging
@ -32,6 +33,7 @@ class PerfKit(WorkloadBase.WorkloadBase):
self.tools = Tools.Tools(self.config)
self.connmon = Connmon.Connmon(self.config)
self.grafana = Grafana.Grafana(self.config)
self.elastic = Elastic.Elastic(self.config, self.__class__.__name__.lower())
self.test_count = 0
self.scenario_count = 0
self.pass_count = 0
@ -46,6 +48,17 @@ class PerfKit(WorkloadBase.WorkloadBase):
self.logger.info(
"Current number of Perfkit test failures: {}".format(self.error_count))
def string_to_dict(self, string):
"""Function for converting "|" quoted hash data into python dictionary."""
dict_data = {}
split_data = string.split('|,|')
split_data[0] = split_data[0][1:]
split_data[-1] = split_data[-1][:-1]
for item in split_data:
split_item = item.replace('.', '_').split(':')
dict_data[split_item[0]] = split_item[1]
return dict_data
def update_tests(self):
self.test_count += 1
@ -58,6 +71,47 @@ class PerfKit(WorkloadBase.WorkloadBase):
def update_scenarios(self):
self.scenario_count += 1
def get_error_details(self, result_dir):
error_details = []
with open('{}/pkb.stderr.log'.format(result_dir)) as perfkit_stderr:
for line in perfkit_stderr:
if 'ERROR' in line or 'Error' in line or 'Exception' in line:
error_details.append(line)
return error_details
def index_results(self, sucessful_run, result_dir, test_name, browbeat_rerun, benchmark_config):
complete_result_json = {'browbeat_scenario': benchmark_config}
es_ts = datetime.datetime.utcnow()
if sucessful_run:
complete_result_json['results'] = {'unit':{}, 'value': {}}
# PerfKit json is newline delimited and thus each newline json needs to be loaded
with open('{}/perfkitbenchmarker_results.json'.format(result_dir)) \
as perfkit_results_json:
for json_result in perfkit_results_json:
single_result = self.elastic.load_json(json_result.strip())
if 'browbeat_rerun' not in complete_result_json:
complete_result_json['browbeat_rerun'] = browbeat_rerun
if 'timestamp' not in complete_result_json:
complete_result_json['timestamp'] = str(es_ts).replace(" ", "T")
if 'grafana_url' not in complete_result_json:
complete_result_json['grafana_url'] = self.grafana.grafana_urls()
if 'perfkit_setup' not in complete_result_json:
complete_result_json['perfkit_setup'] = \
self.string_to_dict(single_result['labels'])
result_metric = single_result['metric'].lower().replace(' ', '_'). \
replace('.', '_')
complete_result_json['results']['value'][result_metric] = single_result['value']
complete_result_json['results']['unit'][result_metric] = single_result['unit']
result_type = 'result'
else:
complete_result_json['perfkit_errors'] = self.get_error_details(result_dir)
complete_result_json['browbeat_rerun'] = browbeat_rerun
complete_result_json['timestamp'] = str(es_ts).replace(" ", "T")
complete_result_json['grafana_url'] = self.grafana.grafana_urls()
result_type = 'error'
result = self.elastic.combine_metadata(complete_result_json)
self.elastic.index_result(result, test_name, result_type)
def run_benchmark(self, benchmark_config, result_dir, test_name, cloud_type="OpenStack"):
self.logger.debug("--------------------------------")
self.logger.debug("Benchmark_config: {}".format(benchmark_config))
@ -71,10 +125,6 @@ class PerfKit(WorkloadBase.WorkloadBase):
cmd = ("source /home/stack/overcloudrc; source {0}; "
"/home/stack/perfkit-venv/PerfKitBenchmarker/pkb.py "
"--cloud={1} --run_uri=browbeat".format(self.config['perfkit']['venv'], cloud_type))
# Add default parameters as necessary
for default_item, value in self.config['perfkit']['default'].iteritems():
if default_item not in benchmark_config:
benchmark_config[default_item] = value
for parameter, value in benchmark_config.iteritems():
if not parameter == 'name':
self.logger.debug(
@ -116,24 +166,22 @@ class PerfKit(WorkloadBase.WorkloadBase):
new_test_name = new_test_name[2:]
new_test_name = '-'.join(new_test_name)
# Determine success
success = False
try:
with open("{}/pkb.stderr.log".format(result_dir), 'r') as stderr:
if any('SUCCEEDED' in line for line in stderr):
self.logger.info("Benchmark completed.")
self.update_pass_tests()
self.update_total_pass_tests()
self.get_time_dict(
to_ts, from_ts, benchmark_config[
'benchmarks'], new_test_name,
workload, "pass")
self.get_time_dict(to_ts, from_ts, benchmark_config['benchmarks'],
new_test_name, workload, "pass")
success = True
else:
self.logger.error("Benchmark failed.")
self.update_fail_tests()
self.update_total_fail_tests()
self.get_time_dict(
to_ts, from_ts, benchmark_config[
'benchmarks'], new_test_name,
workload, "fail")
self.get_time_dict(to_ts, from_ts, benchmark_config['benchmarks'],
new_test_name, workload, "fail")
except IOError:
self.logger.error(
"File missing: {}/pkb.stderr.log".format(result_dir))
@ -153,6 +201,8 @@ class PerfKit(WorkloadBase.WorkloadBase):
from_ts, to_ts, result_dir, test_name)
self.grafana.run_playbook(from_ts, to_ts, result_dir, test_name)
return success
def start_workloads(self):
self.logger.info("Starting PerfKitBenchmarker Workloads.")
time_stamp = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%S")
@ -164,17 +214,22 @@ class PerfKit(WorkloadBase.WorkloadBase):
self.logger.info("Benchmark: {}".format(benchmark['name']))
self.update_scenarios()
self.update_total_scenarios()
# Add default parameters as necessary
for default_item, value in self.config['perfkit']['default'].iteritems():
if default_item not in benchmark:
benchmark[default_item] = value
for run in range(self.config['browbeat']['rerun']):
self.update_tests()
self.update_total_tests()
result_dir = self.tools.create_results_dir(
self.config['browbeat']['results'], time_stamp, benchmark['name'], run)
test_name = "{}-{}-{}".format(time_stamp,
benchmark['name'], run)
test_name = "{}-{}-{}".format(time_stamp, benchmark['name'], run)
workload = self.__class__.__name__
self.workload_logger(result_dir, workload)
self.run_benchmark(benchmark, result_dir, test_name)
sucess = self.run_benchmark(benchmark, result_dir, test_name)
self._log_details()
if self.config['elasticsearch']['enabled']:
self.index_results(sucess, result_dir, test_name, run, benchmark)
else:
self.logger.info(
"Skipping {} benchmark, enabled: false".format(benchmark['name']))

View File

@ -54,11 +54,11 @@ class Tools(object):
# Create directory for results
def create_results_dir(self, results_dir, timestamp, service, scenario):
try:
os.makedirs("{}/{}/{}/{}".format(results_dir,
timestamp, service, scenario))
self.logger.debug("{}/{}/{}/{}".format(os.path.dirname(results_dir), timestamp, service,
scenario))
return "{}/{}/{}/{}".format(os.path.dirname(results_dir), timestamp, service, scenario)
except OSError:
return False
the_directory = "{}/{}/{}/{}".format(results_dir, timestamp, service, scenario)
if not os.path.isdir(the_directory):
try:
os.makedirs(the_directory)
except OSError as err:
self.logger.error("Error creating the results directory: {}".format(err))
return False
return the_directory