diff --git a/lib/Elastic.py b/lib/Elastic.py index 88d46a546..2fe7ed925 100644 --- a/lib/Elastic.py +++ b/lib/Elastic.py @@ -85,7 +85,8 @@ class Elastic(object): """ """ - def index_result(self, result, test_name, _type='result', _id=None): + def index_result(self, result, test_name, result_dir, identifier=None, _type='result', + _id=None): retry = 2 result['browbeat_uuid'] = str(browbeat_uuid) result['cloud_name'] = self.config['browbeat']['cloud_name'] @@ -104,17 +105,19 @@ class Elastic(object): self.logger.info("Pushed data to Elasticsearch to index {}" " and browbeat UUID {}" . format(self.index, result['browbeat_uuid'])) - break + return True except Exception: self.logger.error("Error pushing data to Elasticsearch, going to retry" " in 10 seconds") time.sleep(10) if i == (retry-1): - self.logger.error("Pushing Data to Elasticsearch failed in spite of retry," - " dumping JSON") - elastic_file = os.path.join(self.config['browbeat']['results'], - test_name + '-elastic' + '.' + 'json') - with open(elastic_file, 'w') as result_file: - json.dump(result, result_file, indent=4, sort_keys=True) - self.logger.info("Saved Elasticsearch consumable result JSON to {}". - format(elastic_file)) + self.logger.error("Pushing Data to Elasticsearch failed in spite of retry," + " dumping JSON") + elastic_file = os.path.join(result_dir, + test_name + '-' + identifier + '-elastic' + + '.' + 'json') + with open(elastic_file, 'w') as result_file: + json.dump(result, result_file, indent=4, sort_keys=True) + self.logger.info("Saved Elasticsearch consumable result JSON to {}". + format(elastic_file)) + return False diff --git a/lib/Rally.py b/lib/Rally.py index 21ec48d56..c6e22efe4 100644 --- a/lib/Rally.py +++ b/lib/Rally.py @@ -125,9 +125,10 @@ class Rally(WorkloadBase.WorkloadBase): result['rally_metadata'] = meta return result - def json_result(self, task_id, scenario_name, run, test_name): + def json_result(self, task_id, scenario_name, run, test_name, result_dir): rally_data = {} - self.logger.info("Loadding Task_ID {} JSON".format(task_id)) + failure = False + self.logger.info("Loading Task_ID {} JSON".format(task_id)) rally_json = self.elastic.load_json(self.gen_scenario_json(task_id)) es_ts = datetime.datetime.utcnow() if len(rally_json) < 1: @@ -158,7 +159,10 @@ class Rally(WorkloadBase.WorkloadBase): 'scenario': scenario_name, } error_result = self.elastic.combine_metadata(error) - self.elastic.index_result(error_result, test_name, 'error') + index_status = self.elastic.index_result(error_result, test_name, result_dir, + workload, 'error') + if index_status is False: + failure = True for workload in rally_data: if not type(rally_data[workload]) is dict: iteration = 1 @@ -176,8 +180,13 @@ class Rally(WorkloadBase.WorkloadBase): 'rally_setup': rally_json[0]['key'], 'raw': rally_data[workload]} result = self.elastic.combine_metadata(rally_stats) - self.elastic.index_result(result, test_name) - return True + index_status = self.elastic.index_result(result, test_name, result_dir, workload) + if index_status is False: + failure = True + if failure: + return False + else: + return True def start_workloads(self): """Iterates through all rally scenarios in browbeat yaml config file""" @@ -287,14 +296,18 @@ class Rally(WorkloadBase.WorkloadBase): results[run].append(task_id) self.update_pass_tests() self.update_total_pass_tests() - self.get_time_dict( - to_time, from_time, benchmark[ - 'name'], new_test_name, - workload, "pass") if self.config['elasticsearch']['enabled']: # Start indexing - self.json_result( - task_id, scenario_name, run, test_name) + index_status = self.json_result( + task_id, scenario_name, run, test_name, result_dir) + self.get_time_dict(to_time, from_time, + benchmark['name'], new_test_name, + workload, "pass", index_status) + else: + self.get_time_dict(to_time, from_time, benchmark[ + 'name'], new_test_name, + workload, "pass", ) + else: self.logger.error( "Cannot find task_id") diff --git a/lib/Shaker.py b/lib/Shaker.py index 9180c72e8..f40bd0ddb 100644 --- a/lib/Shaker.py +++ b/lib/Shaker.py @@ -83,8 +83,9 @@ class Shaker(WorkloadBase.WorkloadBase): # by ElasticSearch and ship the data to ES def send_to_elastic(self, outputfile, browbeat_scenario, - shaker_uuid, es_ts, es_list, run, test_name): + shaker_uuid, es_ts, es_list, run, test_name, result_dir): fname = outputfile + failure = False # Load output json try: with open(fname) as data_file: @@ -103,8 +104,11 @@ class Shaker(WorkloadBase.WorkloadBase): } result = self.elastic.combine_metadata(shaker_stats) - self.elastic.index_result(result, test_name, _type='error') - return + index_status = self.elastic.index_result(result, test_name, result_dir, _type='error') + if index_status is False: + return False + else: + return True # Dictionary to capture common test data shaker_test_meta = {} for scenario in data['scenarios'].iterkeys(): @@ -171,12 +175,16 @@ class Shaker(WorkloadBase.WorkloadBase): 'grafana_url': [ self.grafana.grafana_urls()], 'shaker_uuid': str(shaker_uuid)} + identifier = elastic_timestamp + '-' + record + '-' + result['result_type'] # Ship Data to ES when record status is ok if result['value'] is None: self.logger.debug("Ignoring sending null values to ES") else: result = self.elastic.combine_metadata(shaker_stats) - self.elastic.index_result(result, test_name) + index_status = self.elastic.index_result(result, test_name, result_dir, + identifier) + if index_status is False: + failure = True else: # If the status of the record is not ok, ship minimal # shaker_stats dictionary to ES @@ -190,8 +198,16 @@ class Shaker(WorkloadBase.WorkloadBase): 'browbeat_scenario': browbeat_scenario, 'grafana_url': [self.grafana.grafana_urls()], 'shaker_uuid': str(shaker_uuid)} + identifier = record result = self.elastic.combine_metadata(shaker_stats) - self.elastic.index_result(result, test_name, _type='error') + index_status = self.elastic.index_result(result, test_name, result_dir, identifier, + _type='error') + if index_status is False: + failure = True + if failure: + return False + else: + return True def set_scenario(self, scenario, fname, default_time): stream = open(fname, 'r') @@ -238,13 +254,8 @@ class Shaker(WorkloadBase.WorkloadBase): uuidlist.append(key) return uuidlist - def result_check( - self, - result_dir, - test_name, - scenario, - to_time, - from_time): + def result_check(self, result_dir, test_name, scenario, + to_time, from_time, index_status="disabled"): outputfile = os.path.join(result_dir, test_name + "." + "json") error = False workload = self.__class__.__name__ @@ -255,88 +266,44 @@ class Shaker(WorkloadBase.WorkloadBase): with open(outputfile) as data_file: data = json.load(data_file) except IOError: - self.logger.error( - "Cannot open outputfile, possible stack creation failure for test: {}". format( - scenario['name'])) - self.error_update( - result_dir, - test_name, - scenario, - to_time, - from_time, - new_test_name, - workload) + self.logger.error("Cannot open outputfile, possible stack creation" + "failure for test: {}". format(scenario['name'])) + self.error_update(result_dir, test_name, scenario, to_time, + from_time, new_test_name, workload, index_status) return uuidlist = self.get_uuidlist(data) for id in uuidlist: if data['records'][id]['status'] != "ok": error = True if error: - self.error_update( - result_dir, - test_name, - scenario, - to_time, - from_time, - new_test_name, - workload) + self.error_update(result_dir, test_name, scenario, + to_time, from_time, new_test_name, + workload, index_status) else: - self.success_update( - result_dir, - test_name, - scenario, - to_time, - from_time, - new_test_name, - workload) + self.success_update(result_dir, test_name, scenario, to_time, + from_time, new_test_name, workload, index_status) def error_update(self, result_dir, test_name, scenario, to_time, from_time, - new_test_name, workload): + new_test_name, workload, index_status): self.logger.error("Failed Test: {}".format(scenario['name'])) - self.logger.error( - "saved log to: {}.log".format( - os.path.join( - result_dir, - test_name))) + self.logger.error("saved log to: {}.log".format(os.path.join(result_dir, + test_name))) self.update_fail_tests() self.update_total_fail_tests() - self.get_time_dict( - to_time, - from_time, - scenario['name'], - new_test_name, - workload, - "fail") + self.get_time_dict(to_time, from_time, scenario['name'], + new_test_name, workload, "fail", index_status) - def success_update( - self, - result_dir, - test_name, - scenario, - to_time, - from_time, - new_test_name, - workload): + def success_update(self, result_dir, test_name, scenario, to_time, + from_time, new_test_name, workload, index_status): self.logger.info("Completed Test: {}".format(scenario['name'])) - self.logger.info( - "Saved report to: {}.html".format( - os.path.join( - result_dir, - test_name))) - self.logger.info( - "saved log to: {}.log".format( - os.path.join( - result_dir, - test_name))) + self.logger.info("Saved report to: {}.html". + format(os.path.join(result_dir,test_name))) + self.logger.info("saved log to: {}.log".format(os.path.join(result_dir, + test_name))) self.update_pass_tests() self.update_total_pass_tests() - self.get_time_dict( - to_time, - from_time, - scenario['name'], - new_test_name, - workload, - "pass") + self.get_time_dict(to_time, from_time, scenario['name'], + new_test_name, workload, "pass", index_status) def run_scenario(self, scenario, result_dir, test_name, filename, shaker_uuid, es_ts, es_list, run): @@ -355,14 +322,8 @@ class Shaker(WorkloadBase.WorkloadBase): " --os-region-name {7} --agent-join-timeout {6}" " --report {4}/{5}.html --output {4}/{5}.json" " --book {4}/{5} --debug > {4}/{5}.log 2>&1").format( - server_endpoint, - port_no, - flavor, - filename, - result_dir, - test_name, - timeout, - shaker_region) + server_endpoint, port_no, flavor, filename, + result_dir, test_name, timeout, shaker_region) cmd = ("{}; {}").format(cmd_1, cmd_2) from_ts = int(time.time() * 1000) if 'sleep_before' in self.config['shaker']: @@ -373,7 +334,6 @@ class Shaker(WorkloadBase.WorkloadBase): self.update_tests() self.update_total_tests() outputfile = os.path.join(result_dir, test_name + "." + "json") - self.result_check(result_dir, test_name, scenario, to_time, from_time) if 'sleep_after' in self.config['shaker']: time.sleep(self.config['shaker']['sleep_after']) to_ts = int(time.time() * 1000) @@ -385,8 +345,11 @@ class Shaker(WorkloadBase.WorkloadBase): self.grafana.run_playbook(from_ts, to_ts, result_dir, test_name) # Send Data to elastic if self.config['elasticsearch']['enabled']: - self.send_to_elastic(outputfile, scenario['name'], shaker_uuid, - es_ts, es_list, run, test_name) + index_status = self.send_to_elastic(outputfile, scenario['name'], shaker_uuid, + es_ts, es_list, run, test_name, result_dir) + self.result_check(result_dir, test_name, scenario, to_time, from_time, index_status) + else: + self.result_check(result_dir, test_name, scenario, to_time, from_time) def run_shaker(self): self.logger.info("Starting Shaker workloads") diff --git a/lib/WorkloadBase.py b/lib/WorkloadBase.py index 832e36c21..c38a59011 100644 --- a/lib/WorkloadBase.py +++ b/lib/WorkloadBase.py @@ -65,7 +65,8 @@ class WorkloadBase(object): self.logger.addHandler(file) return None - def get_time_dict(self, to_time, from_time, benchmark, test_name, workload, status): + def get_time_dict(self, to_time, from_time, benchmark, test_name, workload, status, + index_status="disabled"): time_diff = (to_time - from_time) if workload not in WorkloadBase.browbeat: WorkloadBase.browbeat[workload] = {} @@ -73,8 +74,13 @@ class WorkloadBase(object): WorkloadBase.browbeat[workload][benchmark] = {} if 'tests' not in WorkloadBase.browbeat[workload][benchmark]: WorkloadBase.browbeat[workload][benchmark]['tests'] = [] + if index_status is True: + index_status = "success" + elif index_status is False: + index_status = "failure" WorkloadBase.browbeat[workload][benchmark]['tests'].append( - {'Test name': test_name, 'Time': time_diff, 'status': status}) + {'Test name': test_name, 'Time': time_diff, 'Test Status': status, + 'Elasticsearch Indexing': index_status}) @staticmethod def print_report(result_dir, time_stamp):