Fix Elastic indexing of Rally

Switching how we index
+ Switching to indexing metadata with every result
+ Break iterations into its own field
+ Add Rally details to result

Change-Id: Ia70a3756fbf3bb3f5f393302a27fcf1cc2e75e8d
This commit is contained in:
Joe Talerico 2016-05-20 10:03:15 -04:00 committed by Joe
parent 1421842c1f
commit 9190701d93
2 changed files with 38 additions and 44 deletions

View File

@ -3,6 +3,7 @@ import logging
import json
import pprint
import numpy
import datetime
class Elastic:
"""
@ -15,7 +16,8 @@ class Elastic:
'port': self.config['elasticsearch']['port']}],
send_get_body_as='POST'
)
self.index = tool
today = datetime.datetime.today()
self.index = "{}-{}".format(tool,today.strftime('%Y.%m.%d'))
"""
"""
@ -54,10 +56,10 @@ class Elastic:
"""
"""
def index_result(self,result,_id=None) :
def index_result(self,result,_type='result',_id=None) :
return self.es.index(index=self.index,
id=_id,
body=result,
doc_type='result',
doc_type=_type,
refresh=True
)

View File

@ -13,6 +13,7 @@ import os
import shutil
import subprocess
import time
import re
class Rally(WorkloadBase):
@ -110,12 +111,13 @@ class Rally(WorkloadBase):
result['rally_metadata'] = meta
return result
def json_result(self,task_id):
def json_result(self,task_id,scenario_name):
rally_data = {}
rally_errors = []
rally_sla = []
self.logger.info("Loadding Task_ID {} JSON".format(task_id))
rally_json = self.elastic.load_json(self.gen_scenario_json(task_id))
es_ts = datetime.datetime.utcnow()
if len(rally_json) < 1 :
self.logger.error("Issue with Rally Results")
return False
@ -128,30 +130,43 @@ class Rally(WorkloadBase):
rally_data[value] = []
rally_data[value].append(metrics[workload][value])
if len(metrics['error']) > 0 :
rally_errors.append({'action_name': value,
'error': metrics['error']})
error = {'action_name': value,
'error': metrics['error'],
'result': task_id,
'timestamp': es_ts,
'scenario' : scenario_name,
}
self.elastic.index_result(error,'config')
rally_doc = []
for workload in rally_data:
if not type(rally_data[workload]) is dict :
rally_stats = {'action': workload,
'90th':numpy.percentile(rally_data[workload], 90),
'95th':numpy.percentile(rally_data[workload], 95),
'Max':numpy.max(rally_data[workload]),
'Min':numpy.min(rally_data[workload]),
'Average':numpy.average(rally_data[workload]),
'Median':numpy.median(rally_data[workload]),
'Raw':rally_data[workload]}
rally_doc.append(rally_stats)
return {'rally_stats' : rally_doc,
'rally_errors' : rally_errors,
'rally_setup' : rally_json[0]['key']}
iteration = 1
workload_name = workload
if workload.find('(') is not -1 :
iteration = re.findall('\d+',workload)
workload_name = workload.split('(')[0]
rally_stats = {'result': task_id,
'action': workload_name,
'iteration': iteration,
#'90th':numpy.percentile(rally_data[workload], 90),
#'95th':numpy.percentile(rally_data[workload], 95),
#'max':numpy.max(rally_data[workload]),
#'min':numpy.min(rally_data[workload]),
#'average':numpy.average(rally_data[workload]),
#'median':numpy.median(rally_data[workload]),
'timestamp': es_ts,
'scenario' : scenario_name,
'rally_setup': rally_json[0]['key'],
'raw':rally_data[workload]}
result = self.elastic.combine_metadata(rally_stats)
self.elastic.index_result(result)
return True
def start_workloads(self):
"""Iterates through all rally scenarios in browbeat yaml config file"""
results = OrderedDict()
self.logger.info("Starting Rally workloads")
es_ts = datetime.datetime.now()
es_ts = datetime.datetime.utcnow()
dir_ts = es_ts.strftime("%Y%m%d-%H%M%S")
self.logger.debug("Time Stamp (Prefix): {}".format(dir_ts))
benchmarks = self.config.get('rally')['benchmarks']
@ -256,30 +271,7 @@ class Rally(WorkloadBase):
workload, "pass")
if self.config['elasticsearch']['enabled'] :
# Start indexing
result_json = self.json_result(task_id)
_meta = {'taskid' : task_id,
'timestamp': es_ts,
'workload' : {
'name' : benchmark['name'],
'scenario' : scenario_name,
'times' : scenario['times'],
'concurrency' : scenario['concurrency']},
'grafana': self.grafana.grafana_urls()
}
if result_json :
result = self.elastic.combine_metadata(
self.rally_metadata(result_json,_meta))
if result is False :
self.logger.error
("Error with ElasticSerach connector")
else :
if len(result) < 1 :
self.logger.error(
"Issue with ElasticSearch Data, \
for task_id {}".format(task_id))
else :
self.elastic.index_result(result,
_id=task_id)
result_json = self.json_result(task_id,scenario_name)
else:
self.logger.error("Cannot find task_id")
self.update_fail_tests()