Increase Persister Performance

The main improvement comes from using the Influxdb Line Protocol. The
encoding methods in line_utils.py are like the ones used in the influxdb
client but optimized for our data

Additional improvement comes from avoiding calls to encode('utf8') as
the influxdb client already does that.

On my test system, these changes increased the number of measurements
processed from about 2200/second to about 3700/second. Measurement
processing time is now dominated by Kafka. Approximately, 35% of time
is spent reading from Kafka and approximately 22% of time is committing
offsets. Only 10% of the time is spent writing to Influxdb. About 30% of
the time is spent converting messages from the json string read from
Kafka into the Line Protocol format for Influxdb.

Once monasca-common is modified to use the faster kafka library,
performance should be even better.

I did try using ujson, but my tests showed it wasn't any faster than
the json package.

Change-Id: I2acf76d9a5f583c74a272e18350b9c0ad5883f95
This commit is contained in:
Craig Bryant 2017-02-08 18:31:40 -07:00
parent 23c3684a82
commit a7112fd30b
6 changed files with 140 additions and 55 deletions

View File

@ -1,4 +1,4 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP # (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -34,4 +34,4 @@ class AbstractInfluxdbRepository(abstract_repository.AbstractRepository):
self.conf.influxdb.database_name) self.conf.influxdb.database_name)
def write_batch(self, data_points): def write_batch(self, data_points):
self._influxdb_client.write_points(data_points, 'ms') self._influxdb_client.write_points(data_points, 'ms', protocol='line')

View File

@ -1,4 +1,4 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP # (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -12,13 +12,12 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from datetime import datetime
import json import json
from oslo_log import log from oslo_log import log
import pytz
from monasca_persister.repositories.influxdb import abstract_repository from monasca_persister.repositories.influxdb import abstract_repository
from monasca_persister.repositories.influxdb import line_utils
from monasca_persister.repositories.utils import parse_alarm_state_hist_message from monasca_persister.repositories.utils import parse_alarm_state_hist_message
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -39,28 +38,27 @@ class AlarmStateHistInfluxdbRepository(
time_stamp) = parse_alarm_state_hist_message( time_stamp) = parse_alarm_state_hist_message(
message) message)
ts = time_stamp / 1000.0 name = u'alarm_state_history'
fields = []
fields.append(u'tenant_id=' + line_utils.escape_value(tenant_id))
fields.append(u'alarm_id=' + line_utils.escape_value(alarm_id))
fields.append(u'metrics=' + line_utils.escape_value(
json.dumps(metrics, ensure_ascii=False)))
fields.append(u'new_state=' + line_utils.escape_value(new_state))
fields.append(u'old_state=' + line_utils.escape_value(old_state))
fields.append(u'link=' + line_utils.escape_value(link))
fields.append(u'lifecycle_state=' + line_utils.escape_value(
lifecycle_state))
fields.append(u'reason=' + line_utils.escape_value(
state_change_reason))
fields.append(u'reason_data=' + line_utils.escape_value("{}"))
fields.append(u'sub_alarms=' + line_utils.escape_value(
sub_alarms_json_snake_case))
data = {"measurement": 'alarm_state_history', line = name + u',tenant_id=' + line_utils.escape_tag(tenant_id)
"time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime( line += u' ' + u','.join(fields)
'%Y-%m-%dT%H:%M:%S.%fZ'), line += u' ' + str(int(time_stamp))
"fields": {
"tenant_id": tenant_id.encode('utf8'),
"alarm_id": alarm_id.encode('utf8'),
"metrics": json.dumps(metrics, ensure_ascii=False).encode(
'utf8'),
"new_state": new_state.encode('utf8'),
"old_state": old_state.encode('utf8'),
"link": link.encode('utf8'),
"lifecycle_state": lifecycle_state.encode('utf8'),
"reason": state_change_reason.encode('utf8'),
"reason_data": "{}".encode('utf8'),
"sub_alarms": sub_alarms_json_snake_case.encode('utf8')
},
"tags": {
"tenant_id": tenant_id.encode('utf8')
}}
LOG.debug(data) LOG.debug(line)
return data return line

View File

@ -0,0 +1,46 @@
# (C) Copyright 2017 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from six import PY2
def escape_tag(tag):
tag = get_unicode(tag)
return tag.replace(
u"\\", u"\\\\"
).replace(
u" ", u"\\ "
).replace(
u",", u"\\,"
).replace(
u"=", u"\\="
)
def get_unicode(data):
if PY2:
return unicode(data)
else:
return str(data)
def escape_value(value):
return u"\"{0}\"".format(
get_unicode(value).replace(
u"\\", u"\\\\"
).replace(
u"\"", u"\\\""
).replace(
u"\n", u"\\n"
)
)

View File

@ -1,4 +1,4 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP # (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -12,13 +12,12 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from datetime import datetime
import json import json
from oslo_log import log from oslo_log import log
import pytz
from monasca_persister.repositories.influxdb import abstract_repository from monasca_persister.repositories.influxdb import abstract_repository
from monasca_persister.repositories.influxdb import line_utils
from monasca_persister.repositories.utils import parse_measurement_message from monasca_persister.repositories.utils import parse_measurement_message
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -36,20 +35,27 @@ class MetricInfluxdbRepository(abstract_repository.AbstractInfluxdbRepository):
value_meta) = parse_measurement_message(message) value_meta) = parse_measurement_message(message)
tags = dimensions tags = dimensions
tags['_tenant_id'] = tenant_id.encode('utf8') tags[u'_tenant_id'] = tenant_id
tags['_region'] = region.encode('utf8') tags[u'_region'] = region
ts = time_stamp / 1000.0 if not value_meta:
value_meta_str = u'"{}"'
else:
value_meta_str = line_utils.escape_value(json.dumps(value_meta, ensure_ascii=False))
data = {"measurement": metric_name.encode('utf8'), key_values = [line_utils.escape_tag(metric_name)]
"time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime(
'%Y-%m-%dT%H:%M:%S.%fZ'), # tags should be sorted client-side to take load off server
"fields": { for key in sorted(tags.keys()):
"value": value, key_tag = line_utils.escape_tag(key)
"value_meta": json.dumps(value_meta, value_tag = line_utils.escape_tag(tags[key])
ensure_ascii=False).encode('utf8') key_values.append(key_tag + u'=' + value_tag)
}, key_values = u','.join(key_values)
"tags": tags}
value_field = u'value={}'.format(value)
value_meta_field = u'value_meta=' + value_meta_str
data = key_values + u' ' + value_field + u',' + value_meta_field + u' ' + str(int(time_stamp))
LOG.debug(data) LOG.debug(data)

View File

@ -1,4 +1,4 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP # (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -27,24 +27,17 @@ def parse_measurement_message(message):
tenant_id = decoded_message['meta']['tenantId'] tenant_id = decoded_message['meta']['tenantId']
dimensions = {}
if 'dimensions' in metric:
for dimension_name in metric['dimensions']:
dimensions[dimension_name.encode('utf8')] = (
metric['dimensions'][dimension_name].encode('utf8'))
time_stamp = metric['timestamp'] time_stamp = metric['timestamp']
value = float(metric['value']) value = float(metric['value'])
if 'value_meta' in metric and metric['value_meta']: value_meta = metric.get('value_meta', {})
value_meta = metric['value_meta'] if 'value_meta' is None:
# Ensure value_meta is a dict
else:
value_meta = {} value_meta = {}
return (dimensions, metric_name, region, tenant_id, time_stamp, value, return (metric.get('dimensions', {}), metric_name, region, tenant_id,
value_meta) time_stamp, value, value_meta)
def parse_alarm_state_hist_message(message): def parse_alarm_state_hist_message(message):

View File

@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
# (C) Copyright 2017 Hewlett Packard Enterprise Development LP
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslotest import base
from monasca_persister.repositories.influxdb import line_utils
class TestInfluxdb(base.BaseTestCase):
def setUp(self):
super(TestInfluxdb, self).setUp()
def tearDown(self):
super(TestInfluxdb, self).tearDown()
def test_line_utils_handles_utf8(self):
utf8_name = u''
self.assertEqual(u'"' + utf8_name + u'"', line_utils.escape_value(utf8_name))
self.assertEqual(utf8_name, line_utils.escape_tag(utf8_name))
def test_line_utils_escape_tag(self):
simple = u"aaaaa"
self.assertEqual(simple, line_utils.escape_tag(simple))
complex = u"a\\ b,c="
self.assertEqual("a\\\\\\ b\\,c\\=", line_utils.escape_tag(complex))
def test_line_utils_escape_value(self):
simple = u"aaaaa"
self.assertEqual(u'"' + simple + u'"', line_utils.escape_value(simple))
complex = u"a\\b\"\n"
self.assertEqual(u"\"a\\\\b\\\"\\n\"", line_utils.escape_value(complex))