Merge "Bulk indexing of all elastic docs"

Jenkins 2017-06-28 18:56:40 +00:00 committed by Gerrit Code Review
commit 9410abeb7b


@@ -10,7 +10,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from collections import deque
 import elasticsearch
+from elasticsearch import helpers
 import logging
 import json
 import datetime
@@ -25,8 +27,12 @@ browbeat_uuid = uuid.uuid4()
 class Elastic(object):
 
-    def __init__(self, config, workload, tool="browbeat"):
+    def __init__(self, config, workload, tool="browbeat", cache_size=1000, max_cache_time=10):
         self.config = config
+        self.cache = deque()
+        self.max_cache_size = cache_size
+        self.last_upload = datetime.datetime.utcnow()
+        self.max_cache_age = datetime.timedelta(minutes=max_cache_time)
         self.logger = logging.getLogger('browbeat.Elastic')
         self.es = elasticsearch.Elasticsearch([
             {'host': self.config['elasticsearch']['host'],
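
A rough sketch of how the new constructor knobs might be used; the config layout below is inferred from the keys the class reads, and every value is an illustrative assumption:

# Hypothetical config; only the keys Elastic actually references are filled in.
config = {
    'elasticsearch': {'host': 'localhost', 'port': 9200},
    'browbeat': {'cloud_name': 'dev-cloud'},
}

# Buffer up to 500 documents, or 5 minutes' worth, before a bulk flush.
elastic = Elastic(config, 'rally', cache_size=500, max_cache_time=5)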
@@ -38,6 +44,9 @@ class Elastic(object):
         self.index = "{}-{}-{}".format(tool,
                                        workload, today.strftime('%Y.%m.%d'))
 
+    def __del__(self):
+        self.flush_cache()
+
     def load_json(self, result):
         json_data = None
         self.logger.info("Loading JSON")
@@ -74,28 +83,37 @@ class Elastic(object):
             sys.exit(1)
         return result
 
-    def index_result(
-            self,
-            result,
-            test_name,
-            result_dir,
-            identifier='',
-            _type='result',
-            _id=None):
+    # Used to transform the cache dict into an Elastic-insertable iterable
+    def cache_insertable_iterable(self):
+        output = deque()
+        for item in self.cache:
+            es_item = {}
+            es_item['_id'] = item['_id']
+            es_item['_source'] = item['result']
+            es_item['_type'] = item['_type']
+            es_item['_index'] = self.index
+            output.append(es_item)
+        return output
+
+    def flush_cache(self):
+        if len(self.cache) == 0:
+            return True
         retry = 2
-        result['browbeat_uuid'] = str(browbeat_uuid)
-        result['cloud_name'] = self.config['browbeat']['cloud_name']
-        result['browbeat_config'] = self.config
         for i in range(retry):
             try:
-                self.es.index(index=self.index,
-                              id=_id,
-                              body=result,
-                              doc_type=_type,
-                              refresh=True)
-                self.logger.debug("Pushed data to Elasticsearch to index {}"
-                                  "and browbeat UUID {}".
-                                  format(self.index, result['browbeat_uuid']))
+                to_upload = helpers.parallel_bulk(self.es,
+                                                  self.cache_insertable_iterable())
+                counter = 0
+                num_items = len(self.cache)
+                for item in to_upload:
+                    counter += 1
+                    self.logger.debug("{} of {} Elastic objects uploaded".format(counter,
+                                                                                 num_items))
+                output = "Pushed {} items to Elasticsearch to index {}".format(num_items,
+                                                                               self.index)
+                output += " and browbeat UUID {}".format(str(browbeat_uuid))
+                self.logger.info(output)
+                self.cache = deque()
+                self.last_upload = datetime.datetime.utcnow()
                 return True
             except Exception as Err:
                 self.logger.error(
@@ -103,17 +121,25 @@ class Elastic(object):
                     " in 10 seconds")
                 self.logger.error("Exception: {}".format(Err))
                 time.sleep(10)
-                if i == (retry - 1):
-                    self.logger.error(
-                        "Pushing Data to Elasticsearch failed in spite of retry,"
-                        " dumping JSON")
-                    elastic_file = os.path.join(
-                        result_dir, test_name + '-' + identifier + '-elastic' + '.' + 'json')
-                    with open(elastic_file, 'w') as result_file:
-                        json.dump(result, result_file,
-                                  indent=4, sort_keys=True)
-                    self.logger.info(
-                        "Saved Elasticsearch consumable result JSON to {}".format(elastic_file))
+                if i == (retry - 1):
+                    self.logger.error("Pushing Data to Elasticsearch failed in spite of retry,"
+                                      " dumping JSON for {} cached items".format(len(self.cache)))
+                    for item in self.cache:
+                        filename = item['test_name'] + '-' + item['identifier']
+                        filename += '-elastic' + '.' + 'json'
+                        elastic_file = os.path.join(item['result_dir'],
+                                                    filename)
+                        with open(elastic_file, 'w') as result_file:
+                            json.dump(item['result'],
+                                      result_file,
+                                      indent=4,
+                                      sort_keys=True)
+                        self.logger.info("Saved Elasticsearch consumable result JSON to {}".
+                                         format(elastic_file))
+                    self.cache = deque()
+                    self.last_upload = datetime.datetime.utcnow()
                     return False
 
     def get_software_metadata(self, index, role, browbeat_uuid):
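
flush_cache() relies on elasticsearch-py's helpers.parallel_bulk, which chunks an iterable of action dicts across worker threads and yields one (ok, info) tuple per document. It is a lazy generator, which is why flush_cache() loops over to_upload: left unconsumed, nothing is sent. A standalone sketch of that contract, with the index name and document values as illustrative assumptions:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

# Each action mirrors what cache_insertable_iterable() builds; omitting
# '_id' lets Elasticsearch auto-generate one.
actions = [{'_index': 'browbeat-rally-2017.06.28',
            '_type': 'result',
            '_source': {'scenario': 'boot-server', 'duration': 12.3}}]

# parallel_bulk is lazy: the for loop is what actually drives the upload.
for ok, info in helpers.parallel_bulk(es, actions):
    if not ok:
        print(info)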
@@ -233,3 +259,28 @@ class Elastic(object):
             return results['hits']['hits']
         else:
             return False
+
+    def index_result(self,
+                     result,
+                     test_name,
+                     result_dir,
+                     identifier='',
+                     _type='result',
+                     _id=None):
+        data = {}
+        result['browbeat_uuid'] = str(browbeat_uuid)
+        result['cloud_name'] = self.config['browbeat']['cloud_name']
+        result['browbeat_config'] = self.config
+        data['result'] = result
+        data['test_name'] = test_name
+        data['result_dir'] = result_dir
+        data['identifier'] = identifier
+        data['_type'] = _type
+        data['_id'] = _id
+        self.cache.append(data)
+        now = datetime.datetime.utcnow()
+        if len(self.cache) <= self.max_cache_size \
+                and (now - self.last_upload) <= self.max_cache_age:
+            return True
+        else:
+            return self.flush_cache()
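
End to end, the reworked write path batches instead of indexing per call: index_result() now just appends to the in-memory cache and returns, and a bulk flush happens only once max_cache_size or max_cache_age is exceeded (or at teardown via __del__). A hypothetical caller, with elastic constructed as in the earlier sketch and run_workload() standing in for whatever produces result dicts:

# run_workload() and its result dicts are assumptions for illustration.
for result in run_workload():
    elastic.index_result(result, 'rally', '/tmp/results')

# Flush whatever is still cached; __del__ would also do this, but an
# explicit call avoids depending on destructor timing.
elastic.flush_cache()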