diff --git a/browbeat.py b/browbeat.py
index 39fdf059b..6b3d475f0 100755
--- a/browbeat.py
+++ b/browbeat.py
@@ -41,6 +41,8 @@ def main():
     parser.add_argument('workloads', nargs='*', help='Browbeat workload(s). Takes a space separated'
                         ' list of workloads ({}) or \"all\"'.format(', '.join(_workload_opts)))
     parser.add_argument('--debug', action='store_true', help='Enable Debug messages')
+    parser.add_argument('-p', '--postprocess',
+                        dest="path", help="Path to process, e.g. results/20170101/")
     _cli_args = parser.parse_args()
 
     _logger = logging.getLogger('browbeat')
@@ -67,6 +69,8 @@ def main():
     # Default to all workloads
     if _cli_args.workloads == []:
         _cli_args.workloads.append('all')
+    if _cli_args.path:
+        return tools.post_process(_cli_args)
 
     if len(_cli_args.workloads) == 1 and 'all' in _cli_args.workloads:
         _cli_args.workloads = _workload_opts
diff --git a/lib/Rally.py b/lib/Rally.py
index b616d67f1..c6cd3f1c0 100644
--- a/lib/Rally.py
+++ b/lib/Rally.py
@@ -23,6 +23,7 @@ import shutil
 import time
 import Tools
 import WorkloadBase
+import json
 
 
 class Rally(WorkloadBase.WorkloadBase):
@@ -33,7 +34,8 @@ class Rally(WorkloadBase.WorkloadBase):
         self.tools = Tools.Tools(self.config)
         self.connmon = Connmon.Connmon(self.config)
         self.grafana = Grafana.Grafana(self.config)
-        self.elastic = Elastic.Elastic(self.config, self.__class__.__name__.lower())
+        self.elastic = Elastic.Elastic(
+            self.config, self.__class__.__name__.lower())
         self.error_count = 0
         self.pass_count = 0
         self.test_count = 0
@@ -125,16 +127,44 @@ class Rally(WorkloadBase.WorkloadBase):
         result['rally_metadata'] = meta
         return result
 
-    def json_result(self, task_id, scenario_name, run, test_name, result_dir):
+    def file_to_json(self, filename, push_to_es=False):
+        self.logger.info("Loading Rally JSON file {}".format(filename))
+        rally_json = self.elastic.load_json_file(filename)
+        errors, results = self.json_parse(rally_json)
+        for error in errors:
+            error_result = self.elastic.combine_metadata(error)
+            with open("{}/{}-error_index-es.json".format(os.path.dirname(filename),
+                                                         os.path.basename(filename)),
+                      'w+') as error_file:
+                json.dump(error_result, error_file)
+        for result in results:
+            result_doc = self.elastic.combine_metadata(result)
+            with open("{}/{}-result_index-es.json".format(os.path.dirname(filename),
+                                                          os.path.splitext(
+                                                              os.path.basename(filename))[0]),
+                      'w+') as result_file:
+                json.dump(result_doc, result_file)
+        return errors, results
+
+    def json_parse(self, json_doc, metadata={}):
+        """Extract error and result data from a Rally JSON document.
+
+        Args:
+            json_doc (json): json document to parse
+            metadata (dict): dict containing run-specific metadata, e.g. the Rally UUID.
+
+        Returns:
+            errors (list): errors contained within the json_doc
+            results (list): results contained within the json_doc
+        """
         rally_data = {}
-        failure = False
-        self.logger.info("Loading Task_ID {} JSON".format(task_id))
-        rally_json = self.elastic.load_json(self.gen_scenario_json(task_id))
-        es_ts = datetime.datetime.utcnow()
-        if len(rally_json) < 1:
-            self.logger.error("Issue with Rally Results")
+        errors = []
+        results = []
+        if len(json_doc) < 1:
+            self.logger.error("Issue with JSON document")
             return False
-        for metrics in rally_json[0]['result']:
+        es_ts = datetime.datetime.utcnow()
+        for metrics in json_doc[0]['result']:
             for workload in metrics:
                 if type(metrics[workload]) is dict:
                     for value in metrics[workload]:
@@ -146,47 +176,56 @@ class Rally(WorkloadBase.WorkloadBase):
                     iteration = 1
                     workload_name = value
                     if value.find('(') is not -1:
-                        iteration = re.findall('\d+', value)
+                        iteration = re.findall('\d+', value)[0]
                         workload_name = value.split('(')[0]
                     error = {'action': workload_name.strip(),
-                             'browbeat_rerun': run,
                              'iteration': iteration,
                              'error_type': metrics['error'][0],
                              'error_msg': metrics['error'][1],
-                             'result': task_id,
                              'timestamp': str(es_ts).replace(" ", "T"),
-                             'rally_setup': rally_json[0]['key'],
-                             'scenario': scenario_name,
+                             'rally_setup': json_doc[0]['key']
                              }
-                    error_result = self.elastic.combine_metadata(error)
-                    index_status = self.elastic.index_result(error_result, test_name, result_dir,
-                                                             workload, 'error')
-                    if index_status is False:
-                        failure = True
+                    if len(metadata) > 0:
+                        error.update(metadata)
+                    errors.append(error)
         for workload in rally_data:
             if not type(rally_data[workload]) is dict:
                 iteration = 1
                 workload_name = workload
                 if workload.find('(') is not -1:
-                    iteration = re.findall('\d+', workload)
+                    iteration = re.findall('\d+', workload)[0]
                     workload_name = workload.split('(')[0]
-                rally_stats = {'result': task_id,
-                               'action': workload_name.strip(),
-                               'browbeat_rerun': run,
+                rally_stats = {'action': workload_name.strip(),
                                'iteration': iteration,
                                'timestamp': str(es_ts).replace(" ", "T"),
                                'grafana_url': [self.grafana.grafana_urls()],
-                               'scenario': scenario_name,
-                               'rally_setup': rally_json[0]['key'],
+                               'rally_setup': json_doc[0]['key'],
                                'raw': rally_data[workload]}
-                result = self.elastic.combine_metadata(rally_stats)
-                index_status = self.elastic.index_result(result, test_name, result_dir, workload)
-                if index_status is False:
-                    failure = True
-        if failure:
-            return False
-        else:
-            return True
+                if len(metadata) > 0:
+                    rally_stats.update(metadata)
+                results.append(rally_stats)
+        return errors, results
+
+    def json_result(self, task_id, scenario_name, run, test_name, result_dir):
+        success = True
+        self.logger.info("Loading Task_ID {} JSON".format(task_id))
+        rally_json = self.elastic.load_json(self.gen_scenario_json(task_id))
+        errors, results = self.json_parse(rally_json, {'scenario': scenario_name,
+                                                       'browbeat_rerun': run,
+                                                       'result': task_id})
+        for error in errors:
+            error_result = self.elastic.combine_metadata(error)
+            status = self.elastic.index_result(error_result, test_name, result_dir,
+                                               'rally', 'error')
+            if not status:
+                success = False
+        for result in results:
+            result = self.elastic.combine_metadata(result)
+            status = self.elastic.index_result(
+                result, test_name, result_dir, 'rally')
+            if not status:
+                success = False
+        return success
 
     def start_workloads(self):
         """Iterates through all rally scenarios in browbeat yaml config file"""
@@ -239,7 +278,8 @@ class Rally(WorkloadBase.WorkloadBase):
                     del scenario['concurrency']
                 else:
                     concurrencies = def_concurrencies
-                concurrency_count_dict = collections.Counter(concurrencies)
+                concurrency_count_dict = collections.Counter(
+                    concurrencies)
 
                 if 'times' not in scenario:
                     scenario['times'] = def_times
@@ -260,7 +300,8 @@ class Rally(WorkloadBase.WorkloadBase):
                         self.logger.debug("Duplicate concurrency {} found,"
                                           " setting test name"
                                           " to {}".format(concurrency, test_name))
-                        concurrency_count_dict[concurrency] -= 1
+                        concurrency_count_dict[
+                            concurrency] -= 1
 
                     if not result_dir:
                         self.logger.error(
@@ -311,7 +352,8 @@ class Rally(WorkloadBase.WorkloadBase):
                                 index_status = self.json_result(
                                     task_id, scenario_name, run, test_name, result_dir)
                                 self.get_time_dict(to_time, from_time,
-                                                   benchmark['name'], new_test_name,
+                                                   benchmark[
+                                                       'name'], new_test_name,
                                                    workload, "pass", index_status)
                             else:
                                 self.get_time_dict(to_time, from_time, benchmark[
diff --git a/lib/Tools.py b/lib/Tools.py
index 39b2afcb5..22a2f2d31 100644
--- a/lib/Tools.py
+++ b/lib/Tools.py
@@ -17,9 +17,11 @@ import logging
 import os
 import subprocess
 import yaml
+import re
 from pykwalify import core as pykwalify_core
 from pykwalify import errors as pykwalify_errors
 
+
 class Tools(object):
 
     def __init__(self, config=None):
@@ -59,32 +61,38 @@ class Tools(object):
 
     # Create directory for results
     def create_results_dir(self, results_dir, timestamp, service, scenario):
-        the_directory = "{}/{}/{}/{}".format(results_dir, timestamp, service, scenario)
+        the_directory = "{}/{}/{}/{}".format(results_dir,
+                                             timestamp, service, scenario)
         if not os.path.isdir(the_directory):
             try:
                 os.makedirs(the_directory)
             except OSError as err:
-                self.logger.error("Error creating the results directory: {}".format(err))
+                self.logger.error(
+                    "Error creating the results directory: {}".format(err))
                 return False
         return the_directory
 
-    def _load_config(self, path):
+    def _load_config(self, path, validate=True):
         try:
             stream = open(path, 'r')
         except IOError:
-            self.logger.error("Configuration file {} passed is missing".format(path))
+            self.logger.error(
+                "Configuration file {} passed is missing".format(path))
             exit(1)
         config = yaml.load(stream)
         stream.close()
         self.config = config
-        self.validate_yaml()
+        if validate:
+            self.validate_yaml()
         return config
 
     def validate_yaml(self):
-        self.logger.info("Validating the configuration file passed by the user")
+        self.logger.info(
+            "Validating the configuration file passed by the user")
         stream = open("lib/validate.yaml", 'r')
         schema = yaml.load(stream)
-        check = pykwalify_core.Core(source_data=self.config, schema_data=schema)
+        check = pykwalify_core.Core(
+            source_data=self.config, schema_data=schema)
         try:
             check.validate(raise_exception=True)
             self.logger.info("Validation successful")
@@ -110,12 +118,14 @@ class Tools(object):
         meta = self.config['elasticsearch']['metadata_files']
         for _meta in meta:
             if not os.path.isfile(_meta['file']):
-                self.logger.error("Metadata file {} is not present".format(_meta['file']))
+                self.logger.error(
+                    "Metadata file {} is not present".format(_meta['file']))
                 return False
         return True
 
     def gather_metadata(self):
-        os.putenv("ANSIBLE_SSH_ARGS"," -F {}".format(self.config['ansible']['ssh_config']))
+        os.putenv("ANSIBLE_SSH_ARGS",
+                  " -F {}".format(self.config['ansible']['ssh_config']))
         ansible_cmd = \
             'ansible-playbook -i {} {}' \
             .format(self.config['ansible']['hosts'], self.config['ansible']['metadata'])
@@ -126,3 +136,55 @@ class Tools(object):
         else:
             self.logger.info("Metadata about cloud has been gathered")
             return True
+
+    def post_process(self, cli):
+        import Rally  # deferred import: Rally imports Tools at module level
+        workloads = {}
+        workloads['shaker'] = re.compile("shaker")
+        workloads['perfkit'] = re.compile("perfkit")
+        workloads['rally'] = re.compile("^((?!perfkit|shaker).)*$")
+        """ Iterate through dir structure """
+        results = {}
+        if os.path.isdir(cli.path):
+            for dirname, dirnames, files in os.walk(cli.path):
+                self.logger.info("Inspecting: %s" % dirname)
+                results[dirname] = files
+        else:
+            self.logger.error("Path does not exist")
+            return False
+
+        """ Capture per-workload results """
+        workload_results = {}
+        json = re.compile("\.json")
+        if len(results) > 0:
+            for path in results:
+                for regex in workloads:
+                    if re.findall(workloads[regex], path):
+                        if regex not in workload_results:
+                            workload_results[regex] = []
+                        for file in results[path]:
+                            if (re.findall(json, file) and
+                                    'result_index-es' not in file):
+                                workload_results[regex].append(
+                                    "{}/{}".format(path, file))
+        else:
+            self.logger.error("Results are empty")
+            return False
+
+        """ Iterate through each workload result, generate ES JSON """
+        if len(workload_results) > 0:
+            for workload in workload_results:
+                if workload == "rally":
+                    rally = Rally.Rally(self.config)
+                    for file in workload_results[workload]:
+                        errors, results = rally.file_to_json(file)
+                        if getattr(cli, 'es', False):
+                            # Stub for when we want to push to ES.
+                            continue
+                if workload == "shaker":
+                    # Stub for Shaker.
+                    continue
+                if workload == "perfkit":
+                    # Stub for PerfKit.
+                    continue
+        return True
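
For review context, here is a rough, hypothetical sketch of how the new post-processing path is meant to be exercised end to end. Nothing below is part of the change itself: the config filename, the results path, and the ad-hoc args object are placeholders standing in for the argparse namespace that ./browbeat.py -p results/20170101/ builds before calling Tools.post_process.

# Hypothetical driver for the post-processing entry point added above.
# Assumes it runs from a browbeat checkout (so the lib/ modules resolve) and
# that a full browbeat config plus a results/20170101/ tree containing Rally
# task JSON files already exist.
import sys
sys.path.append('lib')

import Tools


class FakeArgs(object):
    # Stand-in for the argparse namespace produced by browbeat.py -p/--postprocess.
    path = 'results/20170101/'
    es = False


tools = Tools.Tools()
# validate=False mirrors the new _load_config flag; re-processing old results
# should not require full schema validation.
tools._load_config('browbeat-config.yaml', validate=False)

# Walks the results tree, classifies the Rally JSON files it finds, and writes
# <name>-result_index-es.json / <name>-error_index-es.json beside each of them.
tools.post_process(FakeArgs())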