[HBase] get_resource optimization
In this change only the Resource table is scanned during get_resources(). This becomes possible because of the following:

1. All meters are stored for each resource.
2. A QualifierFilter is used when a timestamp filter is needed.
3. To keep metadata up to date even when samples arrive in unusual order (old data arriving after new), versioning is used: a timestamp can be set during 'put', so the newest data automatically ends up 'on top'.

Implements blueprint hbase-resource-rowkey-enhancement

Change-Id: Ie189531e6425af3acfeae5636d8b93b102f319d1
parent 1c4f604f96
commit a0667fe040
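Point 3 above relies on HBase cell versioning: happybase's Table.put() accepts an explicit timestamp, and a plain read returns the cell with the greatest timestamp. A minimal sketch of the trick, assuming a local HBase and an existing 'resource' table (connection and table names here are illustrative, not from this change):

    import time

    import happybase

    connection = happybase.Connection('localhost')  # assumed endpoint
    table = connection.table('resource')            # assumed table name

    def put_versioned(row_key, data, sample_dt):
        # Write with the sample's own time (in ms) as the cell timestamp,
        # so a late-arriving old sample can never shadow newer metadata.
        ts = int(time.mktime(sample_dt.timetuple()) * 1000)
        table.put(row_key, data, timestamp=ts)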
@@ -20,12 +20,13 @@
 import copy
 import datetime
 import hashlib
-import itertools
 import json
 import operator
 import os
 import re
+import six
 import six.moves.urllib.parse as urlparse
+import time
 
 import bson.json_util
 import happybase
@@ -34,7 +35,6 @@ from ceilometer.openstack.common.gettextutils import _
 from ceilometer.openstack.common import log
 from ceilometer.openstack.common import network_utils
 from ceilometer.openstack.common import timeutils
-from ceilometer import storage
 from ceilometer.storage import base
 from ceilometer.storage import models
 from ceilometer import utils
@@ -107,8 +107,8 @@ class Connection(base.Connection):
           f:r_metadata.display_name or f:r_metadata.tag
         -sources for all corresponding meters with prefix 's'
         -all meters for this resource in format
-          "%s!%s!%s+%s" % (counter_name, counter_type, counter_unit,
-                           source)
+          "%s+%s+%s!%s!%s" % (rts, source, counter_name, counter_type,
+                              counter_unit)
 
       - alarm
         - row_key: uuid of alarm
@@ -306,16 +306,25 @@ class Connection(base.Connection):
 
         resource_metadata = data.get('resource_metadata', {})
         # Determine the name of new meter
+        rts = timestamp(data['timestamp'])
         new_meter = _format_meter_reference(
             data['counter_name'], data['counter_type'],
-            data['counter_unit'], data['source'])
+            data['counter_unit'], rts, data['source'])
 
         #TODO(nprivalova): try not to store resource_id
         resource = serialize_entry(**{
-            'source': data['source'], 'meter': new_meter,
+            'source': data['source'],
+            'meter': {new_meter: data['timestamp']},
             'resource_metadata': resource_metadata,
             'resource_id': data['resource_id'],
             'project_id': data['project_id'], 'user_id': data['user_id']})
-        resource_table.put(data['resource_id'], resource)
+        # Here we put entry in HBase with our own timestamp. This is needed
+        # when samples arrive out-of-order
+        # If we use timestamp=data['timestamp'] the newest data will be
+        # automatically 'on the top'. It is needed to keep metadata
+        # up-to-date: metadata from newest samples is considered as actual.
+        ts = int(time.mktime(data['timestamp'].timetuple()) * 1000)
+        resource_table.put(data['resource_id'], resource, ts)
 
         #TODO(nprivalova): improve uniqueness
         # Rowkey consists of reversed timestamp, meter and an md5 of
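After this hunk every sample refreshes one meter qualifier on its resource row, and the cell value is the sample timestamp rather than a constant '1'. A resource row might then deserialize to something like the sketch below; all values are made up and the exact wire encoding is handled by the driver's dump()/load() helpers, but the qualifier layout follows the new _format_meter_reference:

    # Sketch of a Resource table row after one sample (illustrative values).
    row = {
        'f:resource_id': 'r-1',
        'f:user_id': 'u-1',
        'f:project_id': 'p-1',
        'f:s_openstack': '1',   # source marker, prefix 's_'
        # meter qualifier: 'm_' + '<rts>+<source>+<name>!<type>!<unit>';
        # the cell value is the sample's timestamp
        'f:m_9223372036808+openstack+cpu!cumulative!ns': '2014-01-01T00:00:00',
    }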
@@ -323,10 +332,6 @@ class Connection(base.Connection):
         m = hashlib.md5()
         m.update("%s%s%s" % (data['user_id'], data['resource_id'],
                              data['project_id']))
 
-        # We use reverse timestamps in rowkeys as they are sorted
-        # alphabetically.
-        rts = timestamp(data['timestamp'])
         row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest())
         record = serialize_entry(data, **{'source': data['source'],
                                           'rts': rts,
@@ -355,47 +360,38 @@ class Connection(base.Connection):
         if pagination:
             raise NotImplementedError('Pagination not implemented')
 
-        metaquery = metaquery or {}
-        sample_filter = storage.SampleFilter(
-            user=user, project=project,
-            start=start_timestamp, start_timestamp_op=start_timestamp_op,
-            end=end_timestamp, end_timestamp_op=end_timestamp_op,
-            resource=resource, source=source, metaquery=metaquery)
-        q, start_row, stop_row, __ = make_sample_query_from_filter(
-            sample_filter, require_meter=False)
+        q = make_query(metaquery=metaquery, user_id=user, project_id=project,
+                       resource_id=resource, source=source)
+        q = make_meter_query_for_resource(start_timestamp, start_timestamp_op,
+                                          end_timestamp, end_timestamp_op,
+                                          source, q)
         with self.conn_pool.connection() as conn:
-            meter_table = conn.table(self.METER_TABLE)
-            LOG.debug(_("Query Meter table: %s") % q)
-            meters = meter_table.scan(filter=q, row_start=start_row,
-                                      row_stop=stop_row)
-            d_meters = []
-            for i, m in meters:
-                d_meters.append(deserialize_entry(m))
-
-            # We have to sort on resource_id before we can group by it.
-            # According to the itertools documentation a new group is
-            # generated when the value of the key function changes
-            # (it breaks there).
-            meters = sorted(d_meters, key=_resource_id_from_record_tuple)
-            for resource_id, r_meters in itertools.groupby(
-                    meters, key=_resource_id_from_record_tuple):
-                # We need deserialized entry(data[0]), sources (data[1]) and
-                # metadata(data[3])
-                meter_rows = [(data[0], data[1], data[3]) for data in sorted(
-                    r_meters, key=_timestamp_from_record_tuple)]
-                latest_data = meter_rows[-1]
-                min_ts = meter_rows[0][0]['timestamp']
-                max_ts = latest_data[0]['timestamp']
+            resource_table = conn.table(self.RESOURCE_TABLE)
+            LOG.debug(_("Query Resource table: %s") % q)
+            for resource_id, data in resource_table.scan(filter=q):
+                f_res, sources, meters, md = deserialize_entry(data)
+                # Unfortunately happybase doesn't keep ordered result from
+                # HBase. So that's why it's needed to find min and max
+                # manually
+                first_ts = min(meters, key=operator.itemgetter(1))[1]
+                last_ts = max(meters, key=operator.itemgetter(1))[1]
+                source = meters[0][0].split('+')[1]
+                # If we use QualifierFilter then HBase returnes only
+                # qualifiers filtered by. It will not return the whole entry.
+                # That's why if we need to ask additional qualifiers manually.
+                if 'project_id' not in f_res and 'user_id' not in f_res:
+                    row = resource_table.row(
+                        resource_id, columns=['f:project_id', 'f:user_id',
+                                              'f:resource_metadata'])
+                    f_res, _s, _m, md = deserialize_entry(row)
                 yield models.Resource(
                     resource_id=resource_id,
-                    first_sample_timestamp=min_ts,
-                    last_sample_timestamp=max_ts,
-                    project_id=latest_data[0]['project_id'],
-                    source=latest_data[1][0],
-                    user_id=latest_data[0]['user_id'],
-                    metadata=latest_data[2],
-                )
+                    first_sample_timestamp=first_ts,
+                    last_sample_timestamp=last_ts,
+                    project_id=f_res['project_id'],
+                    source=source,
+                    user_id=f_res['user_id'],
+                    metadata=md)
 
     def get_meters(self, user=None, project=None, resource=None, source=None,
                    metaquery=None, pagination=None):
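Since deserialize_entry() now yields meters as (qualifier, timestamp) tuples, the first and last sample times in the new get_resources() are a plain min/max over the second tuple element, and the source can be cut out of any qualifier. A self-contained illustration with invented values:

    import datetime
    import operator

    meters = [
        ('9223372036808+openstack+cpu!cumulative!ns',
         datetime.datetime(2014, 1, 1, 0, 5)),
        ('9223372036807+openstack+cpu!cumulative!ns',
         datetime.datetime(2014, 1, 1, 0, 9)),
    ]
    first_ts = min(meters, key=operator.itemgetter(1))[1]  # 00:05
    last_ts = max(meters, key=operator.itemgetter(1))[1]   # 00:09
    source = meters[0][0].split('+')[1]                    # 'openstack'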
@@ -428,8 +424,8 @@ class Connection(base.Connection):
         for ignored, data in gen:
             flatten_result, s, meters, md = deserialize_entry(data)
             for m in meters:
-                meter_raw, m_source = m.split("+")
-                name, type, unit = meter_raw.split('!')
+                _m_rts, m_source, m_raw = m[0].split("+")
+                name, type, unit = m_raw.split('!')
                 meter_dict = {'name': name,
                               'type': type,
                               'unit': unit,
@@ -451,13 +447,15 @@ class Connection(base.Connection):
         :param sample_filter: Filter.
         :param limit: Maximum number of results to return.
         """
+        if limit == 0:
+            return
         with self.conn_pool.connection() as conn:
             meter_table = conn.table(self.METER_TABLE)
             q, start, stop, columns = make_sample_query_from_filter(
                 sample_filter, require_meter=False)
             LOG.debug(_("Query Meter Table: %s") % q)
             gen = meter_table.scan(filter=q, row_start=start, row_stop=stop,
-                                   columns=columns, limit=limit)
+                                   limit=limit)
             for ignored, meter in gen:
                 d_meter = deserialize_entry(meter)[0]
                 d_meter['message']['recorded_at'] = d_meter['recorded_at']
@@ -705,6 +703,10 @@ class Connection(base.Connection):
                 yield trait
 
 
+def _QualifierFilter(op, qualifier):
+    return "QualifierFilter (%s, 'binaryprefix:m_%s')" % (op, qualifier)
+
+
 ###############
 # This is a very crude version of "in-memory HBase", which implements just
 # enough functionality of HappyBase API to support testing of our driver.
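The new helper only renders a string in HBase's filter language, which the server-side scanner then applies to qualifier names. For example:

    def _QualifierFilter(op, qualifier):
        return "QualifierFilter (%s, 'binaryprefix:m_%s')" % (op, qualifier)

    print(_QualifierFilter('>=', '123'))
    # QualifierFilter (>=, 'binaryprefix:m_123')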
@@ -715,27 +717,56 @@ class MTable(object):
     def __init__(self, name, families):
         self.name = name
         self.families = families
-        self._rows = {}
+        self._rows_with_ts = {}
 
-    def row(self, key):
-        return self._rows.get(key, {})
+    def row(self, key, columns=None):
+        if key not in self._rows_with_ts:
+            return {}
+        res = copy.copy(sorted(six.iteritems(
+            self._rows_with_ts.get(key)))[-1][1])
+        if columns:
+            keys = res.keys()
+            for key in keys:
+                if key not in columns:
+                    res.pop(key)
+        return res
 
     def rows(self, keys):
         return ((k, self.row(k)) for k in keys)
 
-    def put(self, key, data):
-        if key not in self._rows:
-            self._rows[key] = data
+    def put(self, key, data, ts=None):
+        # Note: Now we use 'timestamped' but only for one Resource table.
+        # That's why we may put ts='0' in case when ts is None. If it is
+        # needed to use 2 types of put in one table ts=0 cannot be used.
+        if ts is None:
+            ts = "0"
+        if key not in self._rows_with_ts:
+            self._rows_with_ts[key] = {ts: data}
         else:
-            self._rows[key].update(data)
+            if ts in self._rows_with_ts[key]:
+                self._rows_with_ts[key][ts].update(data)
+            else:
+                self._rows_with_ts[key].update({ts: data})
 
     def delete(self, key):
-        del self._rows[key]
+        del self._rows_with_ts[key]
+
+    def _get_latest_dict(self, row):
+        # The idea here is to return latest versions of columns.
+        # In _rows_with_ts we store {row: {ts_1: {data}, ts_2: {data}}}.
+        # res will contain a list of tuples [(ts_1, {data}), (ts_2, {data})]
+        # sorted by ts, i.e. in this list ts_2 is the most latest.
+        # To get result as HBase provides we should iterate in reverse order
+        # and get from "latest" data only key-values that are not in newer data
+        data = {}
+        for i in sorted(six.iteritems(self._rows_with_ts[row])):
+            data.update(i[1])
+        return data
 
     def scan(self, filter=None, columns=None, row_start=None, row_stop=None,
              limit=None):
         columns = columns or []
-        sorted_keys = sorted(self._rows)
+        sorted_keys = sorted(self._rows_with_ts)
         # copy data between row_start and row_stop into a dict
         rows = {}
         for row in sorted_keys:
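Because the in-memory table now stores {row_key: {ts: data}}, MTable.row() must emulate HBase returning the newest cell version; the sorted(...)[-1] idiom picks the greatest timestamp key. This assumes all timestamp keys within one row are mutually comparable (e.g. all ints, or all equal-length strings). A toy check of the idiom:

    rows_with_ts = {'r-1': {100: {'f:a': 'old'}, 200: {'f:a': 'new'}}}
    latest = sorted(rows_with_ts['r-1'].items())[-1][1]
    assert latest == {'f:a': 'new'}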
@@ -743,11 +774,11 @@ class MTable(object):
                 continue
             if row_stop and row > row_stop:
                 break
-            rows[row] = copy.copy(self._rows[row])
+            rows[row] = self._get_latest_dict(row)
 
         if columns:
             ret = {}
-            for row in rows.keys():
-                data = rows[row]
+            for row, data in six.iteritems(rows):
                 for key in data:
                     if key in columns:
                         ret[row] = data
@@ -852,6 +883,33 @@ class MTable(object):
                     pass
         return r
 
+    @staticmethod
+    def QualifierFilter(args, rows):
+        """This method is called from scan() when 'QualifierFilter'
+        is found in the 'filter' argument
+        """
+        op = args[0]
+        value = args[1]
+        if value.startswith('binaryprefix:'):
+            value = value[len('binaryprefix:'):]
+        column = 'f:' + value
+        r = {}
+        for row in rows:
+            data = rows[row]
+            r_data = {}
+            for key in data:
+                if (op == '=' and key.startswith(column)) or \
+                        (op == '>=' and key >= column) or \
+                        (op == '<=' and key <= column):
+                    r_data[key] = data[key]
+                else:
+                    raise NotImplementedError("In-memory QualifierFilter "
+                                              "doesn't support the %s "
+                                              "operation yet" % op)
+            if r_data:
+                r[row] = r_data
+        return r
+
 
 class MConnectionPool(object):
     def __init__(self):
@@ -896,14 +954,14 @@ class MConnection(object):
 def timestamp(dt, reverse=True):
     """Timestamp is count of milliseconds since start of epoch.
 
-    Timestamps is a technique used in HBase rowkey design. When period
-    queries are required the HBase rowkeys must include timestamps, but as
-    rowkeys in HBase are ordered lexicographically.
+    If reverse=True then timestamp will be reversed. Such a technique is used
+    in HBase rowkey design when period queries are required. Because of the
+    fact that rows are sorted lexicographically it's possible to vary whether
+    the 'oldest' entries will be on top of the table or it should be the
+    newest ones (reversed timestamp case).
 
-    Same for the reversed timestamps, but the order will be opposite.
-
-    :param: dt: datetime which is translated to the (reversed or not) timestamp
-    :param: reverse: is a boolean parameter for reverse or straight count of
+    :param: dt: datetime which is translated to timestamp
+    :param: reverse: a boolean parameter for reverse or straight count of
       timestamp in milliseconds
     :return count or reversed count of milliseconds since start of epoch
     """
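A reversed timestamp is conventionally a large sentinel minus the millisecond count, so newer datetimes yield smaller, earlier-sorting keys. A sketch consistent with the docstring above; the sentinel value here is an assumption for illustration, not taken from this diff:

    import calendar
    import datetime

    _MAX_TS = 0x7fffffffffffffff  # assumed sentinel, not from this change

    def rtimestamp(dt):
        ms = calendar.timegm(dt.timetuple()) * 1000
        return _MAX_TS - ms

    older = rtimestamp(datetime.datetime(2014, 1, 1))
    newer = rtimestamp(datetime.datetime(2014, 1, 2))
    assert newer < older  # newest rows sort to the top of the table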
@@ -984,7 +1042,7 @@ def get_start_end_rts(start, start_op, end, end_op):
     rts_start = str(timestamp(start) + 1) if start else ""
     rts_end = str(timestamp(end) + 1) if end else ""
 
-    #By default, we are using ge for lower bound and lt for upper bound
+    # By default, we are using ge for lower bound and lt for upper bound
     if start_op == 'gt':
         rts_start = str(long(rts_start) - 2)
     if end_op == 'le':
@@ -1004,9 +1062,9 @@ def make_query(metaquery=None, trait_query=None, **kwargs):
     q = []
     res_q = None
 
-    # Query for traits if a little differ from others it is constructed with
-    # SingleColumnValueFilter with the possibility to choose an operator for
-    # value
+    # Query for traits differs from others. It is constructed with
+    # SingleColumnValueFilter with the possibility to choose comparision
+    # operator
     if trait_query:
         trait_name = kwargs.pop('key')
         op = kwargs.pop('op', 'eq')
@@ -1103,6 +1161,43 @@ def make_sample_query_from_filter(sample_filter, require_meter=True):
     return res_q, start_row, end_row, columns
 
 
+def make_meter_query_for_resource(start_timestamp, start_timestamp_op,
+                                  end_timestamp, end_timestamp_op, source,
+                                  query=None):
+    """This method is used when Resource table should be filtered by meters.
+    In this method we are looking into all qualifiers with m_ prefix.
+
+    :param start_timestamp: meter's timestamp start range.
+    :param start_timestamp_op: meter's start time operator, like ge, gt.
+    :param end_timestamp: meter's timestamp end range.
+    :param end_timestamp_op: meter's end time operator, like lt, le.
+    :param source: source filter.
+    :param query: a query string to concatenate with.
+    """
+    start_rts, end_rts = get_start_end_rts(start_timestamp,
+                                           start_timestamp_op,
+                                           end_timestamp, end_timestamp_op)
+    mq = []
+
+    if start_rts:
+        filter_value = start_rts + '+' + source if source else start_rts
+        mq.append(_QualifierFilter("<=", filter_value))
+
+    if end_rts:
+        filter_value = end_rts + '+' + source if source else end_rts
+        mq.append(_QualifierFilter(">=", filter_value))
+
+    if mq:
+        meter_q = " AND ".join(mq)
+        # If there is a filtering on time_range we need to point that
+        # qualifiers should start with m_. Overwise in case e.g.
+        # QualifierFilter (>=, 'binaryprefix:m_9222030811134775808')
+        # qualifier 's_test' satisfies the filter and will be returned.
+        meter_q = _QualifierFilter("=", '') + " AND " + meter_q
+        query = meter_q if not query else query + " AND " + meter_q
+    return query
+
+
 def _make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None):
     """If it's filter on some_id without start and end,
     start_row = some_id while end_row = some_id + MAX_BYTE
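Note the inversion in this helper: because rts counts down, the start bound of the time range becomes a '<=' qualifier filter and the end bound a '>='. For a hypothetical range plus source 'openstack' the composed filter string would read roughly like:

    # Hypothetical output of make_meter_query_for_resource (values invented):
    q = ("QualifierFilter (=, 'binaryprefix:m_') AND "
         "QualifierFilter (<=, 'binaryprefix:m_9223372036854000+openstack') "
         "AND "
         "QualifierFilter (>=, 'binaryprefix:m_9223372036853000+openstack')")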
@@ -1117,10 +1212,10 @@ def _make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None):
     return start_row, end_row
 
 
-def _format_meter_reference(counter_name, counter_type, counter_unit, source):
+def _format_meter_reference(c_name, c_type, c_unit, rts, source):
     """Format reference to meter data.
     """
-    return "%s!%s!%s+%s" % (counter_name, counter_type, counter_unit, source)
+    return "%s+%s+%s!%s!%s" % (rts, source, c_name, c_type, c_unit)
 
 
 def _timestamp_from_record_tuple(record):
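With rts leading the reference, a resource's meter qualifiers now sort by (reversed) time first, then source. For example (argument values invented):

    def _format_meter_reference(c_name, c_type, c_unit, rts, source):
        return "%s+%s+%s!%s!%s" % (rts, source, c_name, c_type, c_unit)

    ref = _format_meter_reference('cpu', 'cumulative', 'ns',
                                  9223372036854000, 'openstack')
    # '9223372036854000+openstack+cpu!cumulative!ns'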
@@ -1156,8 +1251,9 @@ def deserialize_entry(entry, get_raw_meta=True):
             sources.append(k[4:])
         elif k.startswith('f:r_metadata.'):
             metadata_flattened[k[len('f:r_metadata.'):]] = load(v)
-        elif k.startswith('f:m_'):
-            meters.append(k[4:])
+        elif k.startswith("f:m_"):
+            meter = (k[4:], load(v))
+            meters.append(meter)
         else:
             flatten_result[k[2:]] = load(v)
     if get_raw_meta:
@@ -1187,9 +1283,9 @@ def serialize_entry(data=None, **kwargs):
             # a separate cell. For this purpose s_ and m_ prefixes are
             # introduced.
             result['f:s_%s' % v] = dump('1')
 
         elif k == 'meter':
-            result['f:m_%s' % v] = dump('1')
+            for meter, ts in v.items():
+                result['f:m_%s' % meter] = dump(ts)
         elif k == 'resource_metadata':
             # keep raw metadata as well as flattened to provide
             # capability with API v2. It will be flattened in another
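On the write side the 'meter' value is now a dict of {reference: sample timestamp}, so serialize_entry() emits one f:m_* column per meter whose cell holds the timestamp that deserialize_entry() later loads back into (qualifier, timestamp) tuples. A rough sketch, with json standing in for the driver's dump()/load() helpers (which use bson.json_util):

    import datetime
    import json

    def dump(data):
        # stand-in for the driver's serializer
        return json.dumps(data, default=str)

    v = {'9223372036854000+openstack+cpu!cumulative!ns':
         datetime.datetime(2014, 1, 1)}
    result = {}
    for meter, ts in v.items():
        result['f:m_%s' % meter] = dump(ts)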
|