Implement redesigned separator in names of columns in HBase
This change improves constructing "keys" for stored data in HBase. Now we are using separators "_!+" to construct the "key", but we don't restrict any character inside the names of metrics, so it could be a reason of incorrect parsing of stored data. In this patch only one separator ":" is used, and method "prepare_key" helps to avoid incorrect parsing by "quoting" columns and rows names. Closes-bug: #1328114 Change-Id: I8f816055bb76e67d239de73a57b7cb0baa94dee5
This commit is contained in:
parent
3643ca5b3b
commit
d71753f4ab
@ -59,7 +59,7 @@ class Connection(base.Connection):
|
||||
|
||||
- alarm_h:
|
||||
|
||||
- row_key: uuid of alarm + "_" + reversed timestamp
|
||||
- row_key: uuid of alarm + ":" + reversed timestamp
|
||||
- Column Families:
|
||||
|
||||
f: raw incoming alarm_history data. Timestamp becomes now()
|
||||
@ -225,5 +225,6 @@ class Connection(base.Connection):
|
||||
rts = hbase_utils.timestamp(ts)
|
||||
with self.conn_pool.connection() as conn:
|
||||
alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
|
||||
alarm_history_table.put(alarm_change.get('alarm_id') + "_" +
|
||||
str(rts), alarm_change_dict)
|
||||
alarm_history_table.put(
|
||||
hbase_utils.prepare_key(alarm_change.get('alarm_id'), rts),
|
||||
alarm_change_dict)
|
||||
|
@ -148,9 +148,9 @@ def make_query(metaquery=None, trait_query=None, **kwargs):
|
||||
for k, v in kwargs.items():
|
||||
if v is not None:
|
||||
res_q = ("SingleColumnValueFilter "
|
||||
"('f', '%s+%d', %s, 'binary:%s', true, true)" %
|
||||
(trait_name, EVENT_TRAIT_TYPES[k], OP_SIGN[op],
|
||||
dump(v)))
|
||||
"('f', '%s', %s, 'binary:%s', true, true)" %
|
||||
(prepare_key(trait_name, EVENT_TRAIT_TYPES[k]),
|
||||
OP_SIGN[op], dump(v)))
|
||||
return res_q
|
||||
|
||||
# Note: we use extended constructor for SingleColumnValueFilter here.
|
||||
@ -165,11 +165,11 @@ def make_query(metaquery=None, trait_query=None, **kwargs):
|
||||
elif key == 'trait_type':
|
||||
q.append("ColumnPrefixFilter('%s')" % value)
|
||||
elif key == 'event_id':
|
||||
q.append("RowFilter ( = , 'regexstring:\d*_%s')" % value)
|
||||
q.append("RowFilter ( = , 'regexstring:\d*:%s')" % value)
|
||||
else:
|
||||
q.append("SingleColumnValueFilter "
|
||||
"('f', '%s', =, 'binary:%s', true, true)" %
|
||||
(key, dump(value)))
|
||||
(quote(key), dump(value)))
|
||||
res_q = None
|
||||
if len(q):
|
||||
res_q = " AND ".join(q)
|
||||
@ -282,11 +282,13 @@ def make_meter_query_for_resource(start_timestamp, start_timestamp_op,
|
||||
end_op = end_timestamp_op or 'lt'
|
||||
|
||||
if start_rts:
|
||||
filter_value = start_rts + '+' + source if source else start_rts
|
||||
filter_value = (start_rts + ':' + quote(source) if source
|
||||
else start_rts)
|
||||
mq.append(_QualifierFilter(OP_SIGN_REV[start_op], filter_value))
|
||||
|
||||
if end_rts:
|
||||
filter_value = end_rts + '+' + source if source else end_rts
|
||||
filter_value = (end_rts + ':' + quote(source) if source
|
||||
else end_rts)
|
||||
mq.append(_QualifierFilter(OP_SIGN_REV[end_op], filter_value))
|
||||
|
||||
if mq:
|
||||
@ -308,18 +310,17 @@ def make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None):
|
||||
if some_id is None:
|
||||
return None, None
|
||||
if not rts_start:
|
||||
rts_start = chr(127)
|
||||
end_row = "%s_%s" % (some_id, rts_start)
|
||||
start_row = "%s_%s" % (some_id, rts_end)
|
||||
# NOTE(idegtiarov): Here we could not use chr > 122 because chr >= 123
|
||||
# will be quoted and character will be turn in a composition that is
|
||||
# started with '%' (chr(37)) that lexicographically is less then chr
|
||||
# of number
|
||||
rts_start = chr(122)
|
||||
end_row = prepare_key(some_id, rts_start)
|
||||
start_row = prepare_key(some_id, rts_end)
|
||||
|
||||
return start_row, end_row
|
||||
|
||||
|
||||
def format_meter_reference(c_name, c_type, c_unit, rts, source):
|
||||
"""Format reference to meter data."""
|
||||
return "%s+%s+%s!%s!%s" % (rts, source, c_name, c_type, c_unit)
|
||||
|
||||
|
||||
def prepare_key(*args):
|
||||
"""Prepares names for rows and columns with correct separator.
|
||||
|
||||
@ -367,10 +368,14 @@ def deserialize_entry(entry, get_raw_meta=True):
|
||||
elif k.startswith('f:r_metadata.'):
|
||||
metadata_flattened[k[len('f:r_metadata.'):]] = load(v)
|
||||
elif k.startswith("f:m_"):
|
||||
meter = (k[4:], load(v))
|
||||
meter = ([unquote(i) for i in k[4:].split(':')], load(v))
|
||||
meters.append(meter)
|
||||
else:
|
||||
flatten_result[k[2:]] = load(v)
|
||||
if ':' in k[2:]:
|
||||
key = tuple([unquote(i) for i in k[2:].split(':')])
|
||||
else:
|
||||
key = unquote(k[2:])
|
||||
flatten_result[key] = load(v)
|
||||
if get_raw_meta:
|
||||
metadata = flatten_result.get('resource_metadata', {})
|
||||
else:
|
||||
@ -406,11 +411,11 @@ def serialize_entry(data=None, **kwargs):
|
||||
# capability with API v2. It will be flattened in another
|
||||
# way on API level. But we need flattened too for quick filtering.
|
||||
flattened_meta = dump_metadata(v)
|
||||
for k, m in flattened_meta.items():
|
||||
result['f:r_metadata.' + k] = dump(m)
|
||||
for key, m in flattened_meta.items():
|
||||
result['f:r_metadata.' + key] = dump(m)
|
||||
result['f:resource_metadata'] = dump(v)
|
||||
else:
|
||||
result['f:' + k] = dump(v)
|
||||
result['f:' + quote(k, ':')] = dump(v)
|
||||
return result
|
||||
|
||||
|
||||
|
@ -105,17 +105,17 @@ class Connection(base.Connection):
|
||||
f:r_metadata.display_name or f:r_metadata.tag
|
||||
|
||||
- sources for all corresponding meters with prefix 's'
|
||||
- all meters for this resource in format:
|
||||
- all meters with prefix 'm' for this resource in format:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
"%s+%s+%s!%s!%s" % (rts, source, counter_name, counter_type,
|
||||
"%s:%s:%s:%s:%s" % (rts, source, counter_name, counter_type,
|
||||
counter_unit)
|
||||
|
||||
- events:
|
||||
|
||||
- row_key: timestamp of event's generation + uuid of event
|
||||
in format: "%s+%s" % (ts, Event.message_id)
|
||||
in format: "%s:%s" % (ts, Event.message_id)
|
||||
- Column Families:
|
||||
|
||||
f: contains the following qualifiers:
|
||||
@ -126,7 +126,7 @@ class Connection(base.Connection):
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
"%s+%s" % (trait_name, trait_type)
|
||||
"%s:%s" % (trait_name, trait_type)
|
||||
"""
|
||||
|
||||
CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
|
||||
@ -234,9 +234,9 @@ class Connection(base.Connection):
|
||||
resource_metadata = data.get('resource_metadata', {})
|
||||
# Determine the name of new meter
|
||||
rts = hbase_utils.timestamp(data['timestamp'])
|
||||
new_meter = hbase_utils.format_meter_reference(
|
||||
data['counter_name'], data['counter_type'],
|
||||
data['counter_unit'], rts, data['source'])
|
||||
new_meter = hbase_utils.prepare_key(
|
||||
rts, data['source'], data['counter_name'],
|
||||
data['counter_type'], data['counter_unit'])
|
||||
|
||||
# TODO(nprivalova): try not to store resource_id
|
||||
resource = hbase_utils.serialize_entry(**{
|
||||
@ -255,8 +255,8 @@ class Connection(base.Connection):
|
||||
|
||||
# Rowkey consists of reversed timestamp, meter and a
|
||||
# message signature for purposes of uniqueness
|
||||
row = "%s_%d_%s" % (data['counter_name'], rts,
|
||||
data['message_signature'])
|
||||
row = hbase_utils.prepare_key(data['counter_name'], rts,
|
||||
data['message_signature'])
|
||||
record = hbase_utils.serialize_entry(
|
||||
data, **{'source': data['source'], 'rts': rts,
|
||||
'message': data, 'recorded_at': timeutils.utcnow()})
|
||||
@ -301,7 +301,7 @@ class Connection(base.Connection):
|
||||
# manually
|
||||
first_ts = min(meters, key=operator.itemgetter(1))[1]
|
||||
last_ts = max(meters, key=operator.itemgetter(1))[1]
|
||||
source = meters[0][0].split('+')[1]
|
||||
source = meters[0][0][1]
|
||||
# If we use QualifierFilter then HBase returnes only
|
||||
# qualifiers filtered by. It will not return the whole entry.
|
||||
# That's why if we need to ask additional qualifiers manually.
|
||||
@ -353,10 +353,9 @@ class Connection(base.Connection):
|
||||
flatten_result, s, meters, md = hbase_utils.deserialize_entry(
|
||||
data)
|
||||
for m in meters:
|
||||
_m_rts, m_source, m_raw = m[0].split("+")
|
||||
name, type, unit = m_raw.split('!')
|
||||
_m_rts, m_source, name, m_type, unit = m[0]
|
||||
meter_dict = {'name': name,
|
||||
'type': type,
|
||||
'type': m_type,
|
||||
'unit': unit,
|
||||
'resource_id': flatten_result['resource_id'],
|
||||
'project_id': flatten_result['project_id'],
|
||||
@ -365,8 +364,8 @@ class Connection(base.Connection):
|
||||
if frozen_meter in result:
|
||||
continue
|
||||
result.add(frozen_meter)
|
||||
meter_dict.update({'source':
|
||||
m_source if m_source else None})
|
||||
meter_dict.update({'source': m_source
|
||||
if m_source else None})
|
||||
|
||||
yield models.Meter(**meter_dict)
|
||||
|
||||
@ -517,13 +516,14 @@ class Connection(base.Connection):
|
||||
# models.Event or purposes of storage event sorted by
|
||||
# timestamp in the database.
|
||||
ts = event_model.generated
|
||||
row = "%d_%s" % (hbase_utils.timestamp(ts, reverse=False),
|
||||
event_model.message_id)
|
||||
row = hbase_utils.prepare_key(
|
||||
hbase_utils.timestamp(ts, reverse=False),
|
||||
event_model.message_id)
|
||||
event_type = event_model.event_type
|
||||
traits = {}
|
||||
if event_model.traits:
|
||||
for trait in event_model.traits:
|
||||
key = "%s+%d" % (trait.name, trait.dtype)
|
||||
key = hbase_utils.prepare_key(trait.name, trait.dtype)
|
||||
traits[key] = trait.value
|
||||
record = hbase_utils.serialize_entry(traits,
|
||||
event_type=event_type,
|
||||
@ -553,16 +553,15 @@ class Connection(base.Connection):
|
||||
traits = []
|
||||
events_dict = hbase_utils.deserialize_entry(data)[0]
|
||||
for key, value in events_dict.items():
|
||||
if (not key.startswith('event_type')
|
||||
and not key.startswith('timestamp')):
|
||||
trait_name, trait_dtype = key.rsplit('+', 1)
|
||||
if isinstance(key, tuple):
|
||||
trait_name, trait_dtype = key
|
||||
traits.append(ev_models.Trait(name=trait_name,
|
||||
dtype=int(trait_dtype),
|
||||
value=value))
|
||||
ts, mess = event_id.split('_', 1)
|
||||
ts, mess = event_id.split(':')
|
||||
|
||||
yield ev_models.Event(
|
||||
message_id=mess,
|
||||
message_id=hbase_utils.unquote(mess),
|
||||
event_type=events_dict['event_type'],
|
||||
generated=events_dict['timestamp'],
|
||||
traits=sorted(traits,
|
||||
@ -579,7 +578,7 @@ class Connection(base.Connection):
|
||||
for event_id, data in gen:
|
||||
events_dict = hbase_utils.deserialize_entry(data)[0]
|
||||
for key, value in events_dict.items():
|
||||
if key.startswith('event_type'):
|
||||
if not isinstance(key, tuple) and key.startswith('event_type'):
|
||||
if value not in event_types:
|
||||
event_types.add(value)
|
||||
yield value
|
||||
@ -600,9 +599,8 @@ class Connection(base.Connection):
|
||||
for event_id, data in gen:
|
||||
events_dict = hbase_utils.deserialize_entry(data)[0]
|
||||
for key, value in events_dict.items():
|
||||
if (not key.startswith('event_type') and
|
||||
not key.startswith('timestamp')):
|
||||
trait_name, trait_type = key.rsplit('+', 1)
|
||||
if isinstance(key, tuple):
|
||||
trait_name, trait_type = key
|
||||
if trait_name not in trait_names:
|
||||
# Here we check that our method return only unique
|
||||
# trait types, for ex. if it is found the same trait
|
||||
@ -629,8 +627,7 @@ class Connection(base.Connection):
|
||||
for event_id, data in gen:
|
||||
events_dict = hbase_utils.deserialize_entry(data)[0]
|
||||
for key, value in events_dict.items():
|
||||
if (not key.startswith('event_type') and
|
||||
not key.startswith('timestamp')):
|
||||
trait_name, trait_type = key.rsplit('+', 1)
|
||||
if isinstance(key, tuple):
|
||||
trait_name, trait_type = key
|
||||
yield ev_models.Trait(name=trait_name,
|
||||
dtype=int(trait_type), value=value)
|
||||
|
Loading…
x
Reference in New Issue
Block a user