Make recording and scanning data more determined

HBase may contain only strings and unicodes.
In current implementation some entries put to db after str() or unicode(),
others are not being modified at all.
It is needed to make this process more determined. It may be achived by
creating serialize() and deserialized() methods. In these methods json library
may be used for safe work with HBase.

Partially implements bp hbase-alarming

Change-Id: If67a103e1d79d98cc93b88fe60e79c97fd17bb96
This commit is contained in:
Nadya Privalova 2014-02-21 20:03:19 +04:00
parent b4bb537e90
commit 0ccd4e7f63

View File

@ -27,6 +27,7 @@ import os
import re
import six.moves.urllib.parse as urlparse
import bson.json_util
import happybase
from ceilometer.openstack.common.gettextutils import _ # noqa
@ -48,22 +49,36 @@ class HBaseStorage(base.StorageEngine):
- user
- { _id: user id
source: [ array of source ids reporting for the user ]
s_source_name: each source reported for user is stored with prefix s_
the value of each entry is '1'
sources: this field contains the first source reported for user.
This data is not used but stored for simplification of impl
}
- project
- { _id: project id
source: [ array of source ids reporting for the project ]
s_source_name: the same as for users
sources: the same as for users
}
- meter
- the raw incoming data
- {_id_reverted_ts: row key is constructed in this way for efficient
filtering
parsed_info_from_incoming_data: e.g. counter_name, counter_type
resource_metadata: raw metadata for corresponding resource
r_metadata_name: flattened metadata for corresponding resource
message: raw incoming data
recorded_at: when the sample has been recorded
source: source for the sample
}
- resource
- the metadata for resources
- { _id: uuid of resource,
metadata: metadata dictionaries
metadata: raw metadata dictionaries
r_metadata: flattened metadata fir quick filtering
timestamp: datetime of last update
user_id: uuid
project_id: uuid
meter: [ array of {counter_name: string, counter_type: string} ]
source: source of resource
}
- alarm
- the raw incoming alarm data
@ -184,7 +199,7 @@ class Connection(base.Connection):
alarm_to_store = serialize_entry(alarm.as_dict())
alarm_table.put(_id, alarm_to_store)
stored_alarm = deserialize_entry(alarm_table.row(_id))
stored_alarm = deserialize_entry(alarm_table.row(_id))[0]
return models.Alarm(**stored_alarm)
create_alarm = update_alarm
@ -201,15 +216,12 @@ class Connection(base.Connection):
alarm_table = self.conn.table(self.ALARM_TABLE)
#TODO(nprivalova): to be refactored
if enabled is not None:
enabled = json.dumps(enabled)
q = make_query(alarm_id=alarm_id, name=name, enabled=enabled,
user_id=user, project_id=project)
gen = alarm_table.scan(filter=q)
for ignored, data in gen:
stored_alarm = deserialize_entry(data)
stored_alarm = deserialize_entry(data)[0]
yield models.Alarm(**stored_alarm)
def get_alarm_changes(self, alarm_id, on_behalf_of,
@ -230,11 +242,7 @@ class Connection(base.Connection):
gen = alarm_history_table.scan(filter=q, row_start=start_row,
row_stop=end_row)
for ignored, data in gen:
stored_entry = deserialize_entry(data)
# It is needed to return 'details' field as string
detail = stored_entry['detail']
if detail:
stored_entry['detail'] = json.dumps(detail)
stored_entry = deserialize_entry(data)[0]
yield models.AlarmChange(**stored_entry)
def record_alarm_change(self, alarm_change):
@ -259,52 +267,31 @@ class Connection(base.Connection):
resource_table = self.conn.table(self.RESOURCE_TABLE)
meter_table = self.conn.table(self.METER_TABLE)
# store metadata fields with prefix "r_" to make filtering on metadata
# faster
resource_metadata = {}
res_meta_copy = data['resource_metadata']
if res_meta_copy:
for key, v in utils.dict_to_keyval(res_meta_copy):
resource_metadata['f:r_metadata.%s' % key] = unicode(v)
# Make sure we know about the user and project
if data['user_id']:
user = user_table.row(data['user_id'])
sources = _load_hbase_list(user, 's')
# Update if source is new
if data['source'] not in sources:
user['f:s_%s' % data['source']] = "1"
user_table.put(data['user_id'], user)
project = project_table.row(data['project_id'])
sources = _load_hbase_list(project, 's')
# Update if source is new
if data['source'] not in sources:
project['f:s_%s' % data['source']] = "1"
project_table.put(data['project_id'], project)
rts = reverse_timestamp(data['timestamp'])
resource = resource_table.row(data['resource_id'])
self._update_sources(user_table, data['user_id'], data['source'])
self._update_sources(project_table, data['project_id'], data['source'])
# Get metadata from user's data
resource_metadata = data.get('resource_metadata', {})
# Determine the name of new meter
new_meter = _format_meter_reference(
data['counter_name'], data['counter_type'], data['counter_unit'])
new_resource = {'f:resource_id': data['resource_id'],
'f:project_id': data['project_id'],
'f:user_id': data['user_id'],
'f:source': data["source"],
# store meters with prefix "m_"
'f:m_%s' % new_meter: "1"
}
new_resource.update(resource_metadata)
flatten_result, sources, meters, metadata = \
deserialize_entry(resource_table.row(data['resource_id']))
# Update if resource has new information
if new_resource != resource:
meters = _load_hbase_list(resource, 'm')
if new_meter not in meters:
new_resource['f:m_%s' % new_meter] = "1"
resource_table.put(data['resource_id'], new_resource)
if (data['source'] not in sources) or (new_meter not in meters) or (
metadata != resource_metadata):
resource_table.put(data['resource_id'],
serialize_entry(
**{'sources': [data['source']],
'meters': [new_meter],
'metadata': resource_metadata,
'resource_id': data['resource_id'],
'project_id': data['project_id'],
'user_id': data['user_id']}))
# Rowkey consists of reversed timestamp, meter and an md5 of
# user+resource+project for purposes of uniqueness
@ -314,45 +301,20 @@ class Connection(base.Connection):
# We use reverse timestamps in rowkeys as they are sorted
# alphabetically.
rts = reverse_timestamp(data['timestamp'])
row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest())
recorded_at = timeutils.utcnow()
# Convert timestamp to string as json.dumps won't
ts = timeutils.strtime(data['timestamp'])
recorded_at_ts = timeutils.strtime(recorded_at)
record = {'f:timestamp': ts,
'f:counter_name': data['counter_name'],
'f:counter_type': data['counter_type'],
'f:counter_volume': str(data['counter_volume']),
'f:counter_unit': data['counter_unit'],
# TODO(shengjie) consider using QualifierFilter
# keep dimensions as column qualifier for quicker look up
# TODO(shengjie) extra dimensions need to be added as CQ
'f:user_id': data['user_id'],
'f:project_id': data['project_id'],
'f:message_id': data['message_id'],
'f:resource_id': data['resource_id'],
'f:source': data['source'],
'f:recorded_at': recorded_at,
# keep raw metadata as well as flattened to provide
# capability with API v2. It will be flattened in another
# way on API level
'f:metadata': data.get('resource_metadata', '{}'),
# add in reversed_ts here for time range scan
'f:rts': str(rts)
}
# Need to record resource_metadata for more robust filtering.
record.update(resource_metadata)
# Don't want to be changing the original data object.
data = copy.copy(data)
data['timestamp'] = ts
data['recorded_at'] = recorded_at_ts
# Save original meter.
record['f:message'] = json.dumps(data)
record = serialize_entry(data, **{'metadata': resource_metadata,
'rts': rts,
'message': data,
'recorded_at': timeutils.utcnow()})
meter_table.put(row, record)
def _update_sources(self, table, id, source):
user, sources, _, _ = deserialize_entry(table.row(id))
if source not in sources:
sources.append(source)
table.put(id, serialize_entry(user, **{'sources': sources}))
def get_users(self, source=None):
"""Return an iterable of user id strings.
@ -394,21 +356,9 @@ class Connection(base.Connection):
:param resource: Optional resource filter.
:param pagination: Optional pagination query.
"""
if pagination:
raise NotImplementedError(_('Pagination not implemented'))
def make_resource(data, first_ts, last_ts):
"""Transform HBase fields to Resource model."""
return models.Resource(
resource_id=data['f:resource_id'],
first_sample_timestamp=first_ts,
last_sample_timestamp=last_ts,
project_id=data['f:project_id'],
source=data['f:source'],
user_id=data['f:user_id'],
metadata=data['f:metadata'],
)
meter_table = self.conn.table(self.METER_TABLE)
sample_filter = storage.SampleFilter(
@ -416,31 +366,36 @@ class Connection(base.Connection):
start=start_timestamp, start_timestamp_op=start_timestamp_op,
end=end_timestamp, end_timestamp_op=end_timestamp_op,
resource=resource, source=source, metaquery=metaquery)
q, start_row, stop_row = make_sample_query_from_filter(
sample_filter, require_meter=False)
LOG.debug(_("Query Meter table: %s") % q)
meters = meter_table.scan(filter=q, row_start=start_row,
row_stop=stop_row)
d_meters = []
for i, m in meters:
d_meters.append(deserialize_entry(m))
# We have to sort on resource_id before we can group by it. According
# to the itertools documentation a new group is generated when the
# value of the key function changes (it breaks there).
meters = sorted(meters, key=_resource_id_from_record_tuple)
meters = sorted(d_meters, key=_resource_id_from_record_tuple)
for resource_id, r_meters in itertools.groupby(
meters, key=_resource_id_from_record_tuple):
meter_rows = [data[1] for data in sorted(
# We need deserialized entry(data[0]) and metadata(data[3])
meter_rows = [(data[0], data[3]) for data in sorted(
r_meters, key=_timestamp_from_record_tuple)]
latest_data = meter_rows[-1]
min_ts = timeutils.parse_strtime(meter_rows[0]['f:timestamp'])
max_ts = timeutils.parse_strtime(latest_data['f:timestamp'])
yield make_resource(
latest_data,
min_ts,
max_ts
min_ts = meter_rows[0][0]['timestamp']
max_ts = latest_data[0]['timestamp']
yield models.Resource(
resource_id=resource_id,
first_sample_timestamp=min_ts,
last_sample_timestamp=max_ts,
project_id=latest_data[0]['project_id'],
source=latest_data[0]['source'],
user_id=latest_data[0]['user_id'],
metadata=latest_data[1],
)
def get_meters(self, user=None, project=None, resource=None, source=None,
@ -465,42 +420,28 @@ class Connection(base.Connection):
gen = resource_table.scan(filter=q)
for ignored, data in gen:
# Meter columns are stored like this:
# "m_{counter_name}|{counter_type}|{counter_unit}" => "1"
# where 'm' is a prefix (m for meter), value is always set to 1
meter = None
for m in data:
if m.startswith('f:m_'):
meter = m
break
if meter is None:
flatten_result, s, m, md = deserialize_entry(data)
if not m:
continue
name, type, unit = meter[4:].split("!")
# Meter table may have only one "meter" and "source". That's why
# only first lists element is get in this method
name, type, unit = m[0].split("!")
yield models.Meter(
name=name,
type=type,
unit=unit,
resource_id=data['f:resource_id'],
project_id=data['f:project_id'],
source=data['f:source'],
user_id=data['f:user_id'],
resource_id=flatten_result['resource_id'],
project_id=flatten_result['project_id'],
source=s[0] if s else None,
user_id=flatten_result['user_id'],
)
@staticmethod
def _make_sample(data):
"""Transform HBase fields to Sample model."""
data = json.loads(data['f:message'])
data['timestamp'] = timeutils.parse_strtime(data['timestamp'])
data['recorded_at'] = timeutils.parse_strtime(data['recorded_at'])
return models.Sample(**data)
def get_samples(self, sample_filter, limit=None):
"""Return an iterable of models.Sample instances.
:param sample_filter: Filter.
:param limit: Maximum number of results to return.
"""
meter_table = self.conn.table(self.METER_TABLE)
q, start, stop = make_sample_query_from_filter(
@ -513,7 +454,9 @@ class Connection(base.Connection):
break
else:
limit -= 1
yield self._make_sample(meter)
d_meter = deserialize_entry(meter)[0]
d_meter['message']['recorded_at'] = d_meter['recorded_at']
yield models.Sample(**d_meter['message'])
@staticmethod
def _update_meter_stats(stat, meter):
@ -525,9 +468,9 @@ class Connection(base.Connection):
:param start_time: query start time
:param period: length of the time bucket
"""
vol = int(meter['f:counter_volume'])
ts = timeutils.parse_strtime(meter['f:timestamp'])
stat.unit = meter['f:counter_unit']
vol = meter['counter_volume']
ts = meter['timestamp']
stat.unit = meter['counter_unit']
stat.min = min(vol, stat.min or vol)
stat.max = max(vol, stat.max)
stat.sum = vol + (stat.sum or 0)
@ -557,22 +500,21 @@ class Connection(base.Connection):
meter_table = self.conn.table(self.METER_TABLE)
q, start, stop = make_sample_query_from_filter(sample_filter)
meters = list(meter for (ignored, meter) in
meter_table.scan(filter=q, row_start=start,
row_stop=stop)
)
meters = map(deserialize_entry, list(meter for (ignored, meter) in
meter_table.scan(filter=q, row_start=start,
row_stop=stop)))
if sample_filter.start:
start_time = sample_filter.start
elif meters:
start_time = timeutils.parse_strtime(meters[-1]['f:timestamp'])
start_time = meters[-1][0]['timestamp']
else:
start_time = None
if sample_filter.end:
end_time = sample_filter.end
elif meters:
end_time = timeutils.parse_strtime(meters[0]['f:timestamp'])
end_time = meters[0][0]['timestamp']
else:
end_time = None
@ -586,7 +528,7 @@ class Connection(base.Connection):
# As our HBase meters are stored as newest-first, we need to iterate
# in the reverse order
for meter in meters[::-1]:
ts = timeutils.parse_strtime(meter['f:timestamp'])
ts = meter[0]['timestamp']
if period:
offset = int(timeutils.delta_seconds(
start_time, ts) / period) * period
@ -612,7 +554,7 @@ class Connection(base.Connection):
duration_end=None,
groupby=None)
)
self._update_meter_stats(results[-1], meter)
self._update_meter_stats(results[-1], meter[0])
return results
@ -666,7 +608,7 @@ class MTable(object):
# Extract filter name and its arguments
g = re.search("(.*)\((.*),?\)", f)
fname = g.group(1).strip()
fargs = [s.strip().replace('\'', '').replace('\"', '')
fargs = [s.strip().replace('\'', '')
for s in g.group(2).split(',')]
m = getattr(self, fname)
if callable(m):
@ -769,6 +711,7 @@ def make_timestamp_query(func, start=None, start_op=None, end=None,
return start_row, end_row
q = []
# We dont need to dump here because get_start_end_rts returns strings
if rts_start:
q.append("SingleColumnValueFilter ('f', 'rts', <=, 'binary:%s')" %
rts_start)
@ -805,11 +748,14 @@ def make_query(metaquery=None, **kwargs):
column name in db
"""
q = []
for key, value in kwargs.iteritems():
# Note: we use extended constructor for SingleColumnValueFilter here.
# It is explicitly specified that entry should not be returned if CF is not
# found in table.
for key, value in kwargs.items():
if value is not None:
q.append("SingleColumnValueFilter "
"('f', '%s', =, 'binary:%s')" % (key, value))
"('f', '%s', =, 'binary:%s', true, true)" %
(key, dump(value)))
res_q = None
if len(q):
res_q = " AND ".join(q)
@ -818,8 +764,9 @@ def make_query(metaquery=None, **kwargs):
meta_q = []
for k, v in metaquery.items():
meta_q.append(
"SingleColumnValueFilter ('f', '%s', =, 'binary:%s')"
% ('r_' + k, v))
"SingleColumnValueFilter ('f', '%s', =, 'binary:%s', "
"true, true)"
% ('r_' + k, dump(v)))
meta_q = " AND ".join(meta_q)
# join query and metaquery
if res_q is not None:
@ -827,7 +774,7 @@ def make_query(metaquery=None, **kwargs):
else:
res_q = meta_q # metaquery only
return res_q or ""
return res_q
def make_sample_query_from_filter(sample_filter, require_meter=True):
@ -876,16 +823,6 @@ def _make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None):
return start_row, end_row
def _load_hbase_list(d, prefix):
"""Deserialise dict stored as HBase column family
"""
ret = []
prefix = 'f:%s_' % prefix
for key in (k for k in d if k.startswith(prefix)):
ret.append(key[len(prefix):])
return ret
def _format_meter_reference(counter_name, counter_type, counter_unit):
"""Format reference to meter data.
"""
@ -895,45 +832,105 @@ def _format_meter_reference(counter_name, counter_type, counter_unit):
def _timestamp_from_record_tuple(record):
"""Extract timestamp from HBase tuple record
"""
return timeutils.parse_strtime(record[1]['f:timestamp'])
return record[0]['timestamp']
def _resource_id_from_record_tuple(record):
"""Extract resource_id from HBase tuple record
"""
return record[1]['f:resource_id']
return record[0]['resource_id']
#TODO(nprivalova): to be refactored, will be used everywhere in impl_hbase
# without additional ifs
def serialize_entry(entry_from_user):
result_dict = copy.copy(entry_from_user)
keys = result_dict.keys()
for key in keys:
val = result_dict[key]
if isinstance(val, datetime.datetime):
val = timeutils.strtime(val)
if not isinstance(val, basestring):
val = json.dumps(val)
result_dict['f:' + key] = val
del result_dict[key]
return result_dict
def deserialize_entry(entry, get_raw_meta=True):
"""Return a list of flatten_result, sources, meters and metadata
flatten_result contains a dict of simple structures such as 'resource_id':1
sources/meters are the lists of sources and meters correspondingly.
metadata is metadata dict. This dict may be returned as flattened if
get_raw_meta is False.
:param entry: entry from HBase, without row name and timestamp
:param get_raw_meta: If true then raw metadata will be returned
If False metadata will be constructed from
'f:r_metadata.' fields
"""
flatten_result = {}
sources = []
meters = []
metadata_flattened = {}
for k, v in entry.items():
if k.startswith('f:s_'):
sources.append(k[4:])
elif k.startswith('f:m_'):
meters.append(k[4:])
elif k.startswith('f:r_metadata.'):
metadata_flattened[k[len('f:r_metadata.'):]] = load(v)
else:
flatten_result[k[2:]] = load(v)
if get_raw_meta:
metadata = flatten_result.get('metadata', {})
else:
metadata = metadata_flattened
return flatten_result, sources, meters, metadata
def deserialize_entry(stored_entry):
result_entry = copy.copy(stored_entry)
keys = result_entry.keys()
for key in keys:
val = result_entry[key]
try:
val = json.loads(val)
except ValueError:
pass
if "timestamp" in key and val:
val = timeutils.parse_strtime(val)
# There is no int in wsme models
if isinstance(val, (int, long, float)) and not isinstance(val, bool):
val = str(val)
result_entry[key[2:]] = val
del result_entry[key]
return result_entry
def serialize_entry(data={}, **kwargs):
"""Return a dict that is ready to be stored to HBase
:param data: dict to be serialized
:param kwargs: additional args
"""
entry_dict = copy.copy(data)
entry_dict.update(**kwargs)
result = {}
for k, v in entry_dict.items():
if k == 'sources':
# user and project tables may contain several sources and meters
# that's why we store it separately as pairs "source/meter name:1".
# Resource and meter table contain only one and it's possible
# to store pairs like "source/meter:source name/meter name". But to
# keep things simple it's possible to store all variants in all
# tables because it doesn't break logic and overhead is not too big
for source in v:
result['f:s_%s' % source] = dump('1')
if v:
result['f:source'] = dump(v[0])
elif k == 'meters':
for meter in v:
result['f:m_%s' % meter] = dump('1')
elif k == 'metadata':
# keep raw metadata as well as flattened to provide
# capability with API v2. It will be flattened in another
# way on API level. But we need flattened too for quick filtering.
flattened_meta = dump_metadata(v)
for k, m in flattened_meta.items():
result['f:r_metadata.' + k] = dump(m)
result['f:metadata'] = dump(v)
else:
result['f:' + k] = dump(v)
return result
def dump_metadata(meta):
resource_metadata = {}
for key, v in utils.dict_to_keyval(meta):
resource_metadata[key] = v
return resource_metadata
def dump(data):
return json.dumps(data, default=bson.json_util.default)
def load(data):
return json.loads(data, object_hook=object_hook)
# We don't want to have tzinfo in decoded json.This object_hook is
# overwritten json_util.object_hook for $date
def object_hook(dct):
if "$date" in dct:
dt = bson.json_util.object_hook(dct)
return dt.replace(tzinfo=None)
return bson.json_util.object_hook(dct)