diff --git a/lib/Elastic.py b/lib/Elastic.py
index bfd9141d8..6fdb63309 100644
--- a/lib/Elastic.py
+++ b/lib/Elastic.py
@@ -10,7 +10,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from collections import deque
 import elasticsearch
+from elasticsearch import helpers
 import logging
 import json
 import datetime
@@ -25,8 +27,12 @@ browbeat_uuid = uuid.uuid4()
 
 class Elastic(object):
 
-    def __init__(self, config, workload, tool="browbeat"):
+    def __init__(self, config, workload, tool="browbeat", cache_size=1000, max_cache_time=10):
         self.config = config
+        self.cache = deque()
+        self.max_cache_size = cache_size
+        self.last_upload = datetime.datetime.utcnow()
+        self.max_cache_age = datetime.timedelta(minutes=max_cache_time)
         self.logger = logging.getLogger('browbeat.Elastic')
         self.es = elasticsearch.Elasticsearch([
             {'host': self.config['elasticsearch']['host'],
@@ -38,6 +44,9 @@ class Elastic(object):
         self.index = "{}-{}-{}".format(tool, workload,
                                        today.strftime('%Y.%m.%d'))
 
+    def __del__(self):
+        self.flush_cache()
+
     def load_json(self, result):
         json_data = None
         self.logger.info("Loading JSON")
@@ -74,28 +83,37 @@ class Elastic(object):
             sys.exit(1)
         return result
 
-    def index_result(
-            self,
-            result,
-            test_name,
-            result_dir,
-            identifier='',
-            _type='result',
-            _id=None):
+    # Transform the cache into an iterable of Elasticsearch-insertable documents
+    def cache_insertable_iterable(self):
+        output = deque()
+        for item in self.cache:
+            es_item = {}
+            es_item['_id'] = item['_id']
+            es_item['_source'] = item['result']
+            es_item['_type'] = item['_type']
+            es_item['_index'] = self.index
+            output.append(es_item)
+        return output
+
+    def flush_cache(self):
+        if len(self.cache) == 0:
+            return True
         retry = 2
-        result['browbeat_uuid'] = str(browbeat_uuid)
-        result['cloud_name'] = self.config['browbeat']['cloud_name']
-        result['browbeat_config'] = self.config
        for i in range(retry):
             try:
-                self.es.index(index=self.index,
-                              id=_id,
-                              body=result,
-                              doc_type=_type,
-                              refresh=True)
-                self.logger.debug("Pushed data to Elasticsearch to index {}"
-                                  "and browbeat UUID {}".
-                                  format(self.index, result['browbeat_uuid']))
+                to_upload = helpers.parallel_bulk(self.es,
+                                                  self.cache_insertable_iterable())
+                counter = 0
+                num_items = len(self.cache)
+                for item in to_upload:
+                    counter += 1
+                    self.logger.debug("Uploaded {}/{} Elastic objects".format(counter, num_items))
+                output = "Pushed {} items to Elasticsearch to index {}".format(num_items,
+                                                                               self.index)
+                output += " and browbeat UUID {}".format(str(browbeat_uuid))
+                self.logger.info(output)
+                self.cache = deque()
+                self.last_upload = datetime.datetime.utcnow()
                 return True
             except Exception as Err:
                 self.logger.error(
@@ -103,17 +121,25 @@
                     " in 10 seconds")
                 self.logger.error("Exception: {}".format(Err))
                 time.sleep(10)
-                if i == (retry - 1):
-                    self.logger.error(
-                        "Pushing Data to Elasticsearch failed in spite of retry,"
-                        " dumping JSON")
-                    elastic_file = os.path.join(
-                        result_dir, test_name + '-' + identifier + '-elastic' + '.' + 'json')
-                    with open(elastic_file, 'w') as result_file:
-                        json.dump(result, result_file,
-                                  indent=4, sort_keys=True)
-                    self.logger.info(
-                        "Saved Elasticsearch consumable result JSON to {}".format(elastic_file))
+                if i == (retry - 1):
+                    self.logger.error("Pushing Data to Elasticsearch failed in spite of retry,"
+                                      " dumping JSON for {} cached items".format(len(self.cache)))
+                    for item in self.cache:
+                        filename = item['test_name'] + '-' + item['identifier']
+                        filename += '-elastic.json'
+                        elastic_file = os.path.join(item['result_dir'],
+                                                    filename)
+
+                        with open(elastic_file, 'w') as result_file:
+                            json.dump(item['result'],
+                                      result_file,
+                                      indent=4,
+                                      sort_keys=True)
+
+                        self.logger.info("Saved Elasticsearch consumable result JSON to {}".
+                                         format(elastic_file))
+                    self.cache = deque()
+                    self.last_upload = datetime.datetime.utcnow()
         return False
 
     def get_software_metadata(self, index, role, browbeat_uuid):
@@ -233,3 +259,28 @@ class Elastic(object):
             return results['hits']['hits']
         else:
             return False
+
+    def index_result(self,
+                     result,
+                     test_name,
+                     result_dir,
+                     identifier='',
+                     _type='result',
+                     _id=None):
+        data = {}
+        result['browbeat_uuid'] = str(browbeat_uuid)
+        result['cloud_name'] = self.config['browbeat']['cloud_name']
+        result['browbeat_config'] = self.config
+        data['result'] = result
+        data['test_name'] = test_name
+        data['result_dir'] = result_dir
+        data['identifier'] = identifier
+        data['_type'] = _type
+        data['_id'] = _id
+        self.cache.append(data)
+        now = datetime.datetime.utcnow()
+        if len(self.cache) <= self.max_cache_size \
+                and (now - self.last_upload) <= self.max_cache_age:
+            return True
+        else:
+            return self.flush_cache()
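
Usage sketch (illustrative only, not part of the patch): after this change, index_result() simply appends each result to the in-memory deque, and documents reach Elasticsearch in bulk only when the cache grows past cache_size, when more than max_cache_time minutes pass since the last upload, or when flush_cache() is called directly (including via __del__). The config dict and results iterable below are hypothetical minimal stand-ins for a real Browbeat config:

    from lib.Elastic import Elastic

    # Hypothetical minimal config; real Browbeat configs carry many more keys.
    config = {'elasticsearch': {'host': 'localhost', 'port': 9200},
              'browbeat': {'cloud_name': 'demo-cloud'}}
    es = Elastic(config, 'rally', cache_size=100, max_cache_time=5)

    for result in results:  # hypothetical iterable of result dicts
        # Appends to the cache; uploads happen only when a threshold is crossed.
        es.index_result(result, 'demo-test', '/tmp/results')

    # Push anything still sitting in the cache to Elasticsearch.
    es.flush_cache()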