diff --git a/lib/Elastic.py b/lib/Elastic.py index 2fe7ed925..5fb44871c 100644 --- a/lib/Elastic.py +++ b/lib/Elastic.py @@ -106,9 +106,10 @@ class Elastic(object): " and browbeat UUID {}" . format(self.index, result['browbeat_uuid'])) return True - except Exception: + except Exception as Err: self.logger.error("Error pushing data to Elasticsearch, going to retry" " in 10 seconds") + self.logger.error("Exception: {}".format(Err)) time.sleep(10) if i == (retry-1): self.logger.error("Pushing Data to Elasticsearch failed in spite of retry," diff --git a/lib/PerfKit.py b/lib/PerfKit.py index 5ecd535e6..f8b4c8ce8 100644 --- a/lib/PerfKit.py +++ b/lib/PerfKit.py @@ -12,6 +12,7 @@ import Connmon import datetime +import Elastic import glob import Grafana import logging @@ -32,6 +33,7 @@ class PerfKit(WorkloadBase.WorkloadBase): self.tools = Tools.Tools(self.config) self.connmon = Connmon.Connmon(self.config) self.grafana = Grafana.Grafana(self.config) + self.elastic = Elastic.Elastic(self.config, self.__class__.__name__.lower()) self.test_count = 0 self.scenario_count = 0 self.pass_count = 0 @@ -46,6 +48,17 @@ class PerfKit(WorkloadBase.WorkloadBase): self.logger.info( "Current number of Perfkit test failures: {}".format(self.error_count)) + def string_to_dict(self, string): + """Function for converting "|" quoted hash data into python dictionary.""" + dict_data = {} + split_data = string.split('|,|') + split_data[0] = split_data[0][1:] + split_data[-1] = split_data[-1][:-1] + for item in split_data: + split_item = item.replace('.', '_').split(':') + dict_data[split_item[0]] = split_item[1] + return dict_data + def update_tests(self): self.test_count += 1 @@ -58,6 +71,47 @@ class PerfKit(WorkloadBase.WorkloadBase): def update_scenarios(self): self.scenario_count += 1 + def get_error_details(self, result_dir): + error_details = [] + with open('{}/pkb.stderr.log'.format(result_dir)) as perfkit_stderr: + for line in perfkit_stderr: + if 'ERROR' in line or 'Error' in line or 'Exception' in line: + error_details.append(line) + return error_details + + def index_results(self, sucessful_run, result_dir, test_name, browbeat_rerun, benchmark_config): + complete_result_json = {'browbeat_scenario': benchmark_config} + es_ts = datetime.datetime.utcnow() + if sucessful_run: + complete_result_json['results'] = {'unit':{}, 'value': {}} + # PerfKit json is newline delimited and thus each newline json needs to be loaded + with open('{}/perfkitbenchmarker_results.json'.format(result_dir)) \ + as perfkit_results_json: + for json_result in perfkit_results_json: + single_result = self.elastic.load_json(json_result.strip()) + if 'browbeat_rerun' not in complete_result_json: + complete_result_json['browbeat_rerun'] = browbeat_rerun + if 'timestamp' not in complete_result_json: + complete_result_json['timestamp'] = str(es_ts).replace(" ", "T") + if 'grafana_url' not in complete_result_json: + complete_result_json['grafana_url'] = self.grafana.grafana_urls() + if 'perfkit_setup' not in complete_result_json: + complete_result_json['perfkit_setup'] = \ + self.string_to_dict(single_result['labels']) + result_metric = single_result['metric'].lower().replace(' ', '_'). \ + replace('.', '_') + complete_result_json['results']['value'][result_metric] = single_result['value'] + complete_result_json['results']['unit'][result_metric] = single_result['unit'] + result_type = 'result' + else: + complete_result_json['perfkit_errors'] = self.get_error_details(result_dir) + complete_result_json['browbeat_rerun'] = browbeat_rerun + complete_result_json['timestamp'] = str(es_ts).replace(" ", "T") + complete_result_json['grafana_url'] = self.grafana.grafana_urls() + result_type = 'error' + result = self.elastic.combine_metadata(complete_result_json) + self.elastic.index_result(result, test_name, result_type) + def run_benchmark(self, benchmark_config, result_dir, test_name, cloud_type="OpenStack"): self.logger.debug("--------------------------------") self.logger.debug("Benchmark_config: {}".format(benchmark_config)) @@ -71,10 +125,6 @@ class PerfKit(WorkloadBase.WorkloadBase): cmd = ("source /home/stack/overcloudrc; source {0}; " "/home/stack/perfkit-venv/PerfKitBenchmarker/pkb.py " "--cloud={1} --run_uri=browbeat".format(self.config['perfkit']['venv'], cloud_type)) - # Add default parameters as necessary - for default_item, value in self.config['perfkit']['default'].iteritems(): - if default_item not in benchmark_config: - benchmark_config[default_item] = value for parameter, value in benchmark_config.iteritems(): if not parameter == 'name': self.logger.debug( @@ -116,24 +166,22 @@ class PerfKit(WorkloadBase.WorkloadBase): new_test_name = new_test_name[2:] new_test_name = '-'.join(new_test_name) # Determine success + success = False try: with open("{}/pkb.stderr.log".format(result_dir), 'r') as stderr: if any('SUCCEEDED' in line for line in stderr): self.logger.info("Benchmark completed.") self.update_pass_tests() self.update_total_pass_tests() - self.get_time_dict( - to_ts, from_ts, benchmark_config[ - 'benchmarks'], new_test_name, - workload, "pass") + self.get_time_dict(to_ts, from_ts, benchmark_config['benchmarks'], + new_test_name, workload, "pass") + success = True else: self.logger.error("Benchmark failed.") self.update_fail_tests() self.update_total_fail_tests() - self.get_time_dict( - to_ts, from_ts, benchmark_config[ - 'benchmarks'], new_test_name, - workload, "fail") + self.get_time_dict(to_ts, from_ts, benchmark_config['benchmarks'], + new_test_name, workload, "fail") except IOError: self.logger.error( "File missing: {}/pkb.stderr.log".format(result_dir)) @@ -153,6 +201,8 @@ class PerfKit(WorkloadBase.WorkloadBase): from_ts, to_ts, result_dir, test_name) self.grafana.run_playbook(from_ts, to_ts, result_dir, test_name) + return success + def start_workloads(self): self.logger.info("Starting PerfKitBenchmarker Workloads.") time_stamp = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%S") @@ -164,17 +214,22 @@ class PerfKit(WorkloadBase.WorkloadBase): self.logger.info("Benchmark: {}".format(benchmark['name'])) self.update_scenarios() self.update_total_scenarios() + # Add default parameters as necessary + for default_item, value in self.config['perfkit']['default'].iteritems(): + if default_item not in benchmark: + benchmark[default_item] = value for run in range(self.config['browbeat']['rerun']): self.update_tests() self.update_total_tests() result_dir = self.tools.create_results_dir( self.config['browbeat']['results'], time_stamp, benchmark['name'], run) - test_name = "{}-{}-{}".format(time_stamp, - benchmark['name'], run) + test_name = "{}-{}-{}".format(time_stamp, benchmark['name'], run) workload = self.__class__.__name__ self.workload_logger(result_dir, workload) - self.run_benchmark(benchmark, result_dir, test_name) + sucess = self.run_benchmark(benchmark, result_dir, test_name) self._log_details() + if self.config['elasticsearch']['enabled']: + self.index_results(sucess, result_dir, test_name, run, benchmark) else: self.logger.info( "Skipping {} benchmark, enabled: false".format(benchmark['name'])) diff --git a/lib/Tools.py b/lib/Tools.py index b398c7e53..a6969b3d6 100644 --- a/lib/Tools.py +++ b/lib/Tools.py @@ -54,11 +54,11 @@ class Tools(object): # Create directory for results def create_results_dir(self, results_dir, timestamp, service, scenario): - try: - os.makedirs("{}/{}/{}/{}".format(results_dir, - timestamp, service, scenario)) - self.logger.debug("{}/{}/{}/{}".format(os.path.dirname(results_dir), timestamp, service, - scenario)) - return "{}/{}/{}/{}".format(os.path.dirname(results_dir), timestamp, service, scenario) - except OSError: - return False + the_directory = "{}/{}/{}/{}".format(results_dir, timestamp, service, scenario) + if not os.path.isdir(the_directory): + try: + os.makedirs(the_directory) + except OSError as err: + self.logger.error("Error creating the results directory: {}".format(err)) + return False + return the_directory