Merge "Make recording and scanning data more determined"
commit 8d849136ee
@@ -27,6 +27,7 @@ import os
 import re
 import six.moves.urllib.parse as urlparse
 
+import bson.json_util
 import happybase
 
 from ceilometer.openstack.common.gettextutils import _  # noqa
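The new `bson.json_util` import exists so datetimes can round-trip through JSON. A minimal sketch of that behavior, with illustrative values; note the tz-aware result, which the `object_hook` added at the end of this commit strips:

```python
import datetime
import json

import bson.json_util

now = datetime.datetime(2014, 1, 30, 12, 0, 0)
encoded = json.dumps({'timestamp': now}, default=bson.json_util.default)
decoded = json.loads(encoded, object_hook=bson.json_util.object_hook)
print(encoded)               # {"timestamp": {"$date": ...}}
print(decoded['timestamp'])  # a datetime again, tz-aware (UTC) by default
```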
@@ -48,22 +49,36 @@ class HBaseStorage(base.StorageEngine):
 
     - user
       - { _id: user id
-          source: [ array of source ids reporting for the user ]
+          s_source_name: each source reported for user is stored with prefix s_
+                         the value of each entry is '1'
+          sources: this field contains the first source reported for user.
+                   This data is not used but stored for simplification of impl
         }
     - project
       - { _id: project id
-          source: [ array of source ids reporting for the project ]
+          s_source_name: the same as for users
+          sources: the same as for users
         }
     - meter
-      - the raw incoming data
+      - { _id_reverted_ts: row key is constructed in this way for efficient
+          filtering
+          parsed_info_from_incoming_data: e.g. counter_name, counter_type
+          resource_metadata: raw metadata for corresponding resource
+          r_metadata_name: flattened metadata for corresponding resource
+          message: raw incoming data
+          recorded_at: when the sample has been recorded
+          source: source for the sample
+        }
     - resource
       - the metadata for resources
       - { _id: uuid of resource,
-          metadata: metadata dictionaries
+          metadata: raw metadata dictionaries
+          r_metadata: flattened metadata for quick filtering
           timestamp: datetime of last update
           user_id: uuid
           project_id: uuid
           meter: [ array of {counter_name: string, counter_type: string} ]
+          source: source of resource
         }
     - alarm
       - the raw incoming alarm data
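To make the schema above concrete, here is a sketch of the cells one resource row might hold after this change. Column names follow the diff; the identifiers and values are made up and shown already JSON-encoded:

```python
# Illustrative only: a resource row under the new flattened schema.
resource_row = {
    'f:resource_id': '"inst-0001"',
    'f:project_id': '"proj-42"',
    'f:user_id': '"user-7"',
    'f:source': '"openstack"',
    'f:s_openstack': '"1"',                   # one cell per source, prefix s_
    'f:m_cpu_util!gauge!%': '"1"',            # one cell per meter, prefix m_
    'f:metadata': '{"display_name": "vm1"}',  # raw metadata blob
    'f:r_metadata.display_name': '"vm1"',     # flattened copy for filtering
}
```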
@@ -184,7 +199,7 @@ class Connection(base.Connection):
 
         alarm_to_store = serialize_entry(alarm.as_dict())
         alarm_table.put(_id, alarm_to_store)
-        stored_alarm = deserialize_entry(alarm_table.row(_id))
+        stored_alarm = deserialize_entry(alarm_table.row(_id))[0]
         return models.Alarm(**stored_alarm)
 
     create_alarm = update_alarm
@@ -201,15 +216,12 @@ class Connection(base.Connection):
 
         alarm_table = self.conn.table(self.ALARM_TABLE)
 
-        #TODO(nprivalova): to be refactored
-        if enabled is not None:
-            enabled = json.dumps(enabled)
         q = make_query(alarm_id=alarm_id, name=name, enabled=enabled,
                        user_id=user, project_id=project)
 
         gen = alarm_table.scan(filter=q)
         for ignored, data in gen:
-            stored_alarm = deserialize_entry(data)
+            stored_alarm = deserialize_entry(data)[0]
             yield models.Alarm(**stored_alarm)
 
     def get_alarm_changes(self, alarm_id, on_behalf_of,
@@ -230,11 +242,7 @@ class Connection(base.Connection):
         gen = alarm_history_table.scan(filter=q, row_start=start_row,
                                        row_stop=end_row)
         for ignored, data in gen:
-            stored_entry = deserialize_entry(data)
-            # It is needed to return 'details' field as string
-            detail = stored_entry['detail']
-            if detail:
-                stored_entry['detail'] = json.dumps(detail)
+            stored_entry = deserialize_entry(data)[0]
             yield models.AlarmChange(**stored_entry)
 
     def record_alarm_change(self, alarm_change):
@@ -259,52 +267,31 @@ class Connection(base.Connection):
         resource_table = self.conn.table(self.RESOURCE_TABLE)
         meter_table = self.conn.table(self.METER_TABLE)
 
-        # store metadata fields with prefix "r_" to make filtering on metadata
-        # faster
-        resource_metadata = {}
-        res_meta_copy = data['resource_metadata']
-        if res_meta_copy:
-            for key, v in utils.dict_to_keyval(res_meta_copy):
-                resource_metadata['f:r_metadata.%s' % key] = unicode(v)
-
         # Make sure we know about the user and project
         if data['user_id']:
-            user = user_table.row(data['user_id'])
-            sources = _load_hbase_list(user, 's')
-            # Update if source is new
-            if data['source'] not in sources:
-                user['f:s_%s' % data['source']] = "1"
-                user_table.put(data['user_id'], user)
-
-        project = project_table.row(data['project_id'])
-        sources = _load_hbase_list(project, 's')
-        # Update if source is new
-        if data['source'] not in sources:
-            project['f:s_%s' % data['source']] = "1"
-            project_table.put(data['project_id'], project)
-
-        rts = reverse_timestamp(data['timestamp'])
-
-        resource = resource_table.row(data['resource_id'])
-
+            self._update_sources(user_table, data['user_id'], data['source'])
+        self._update_sources(project_table, data['project_id'], data['source'])
+
+        # Get metadata from user's data
+        resource_metadata = data.get('resource_metadata', {})
+        # Determine the name of new meter
         new_meter = _format_meter_reference(
             data['counter_name'], data['counter_type'], data['counter_unit'])
-        new_resource = {'f:resource_id': data['resource_id'],
-                        'f:project_id': data['project_id'],
-                        'f:user_id': data['user_id'],
-                        'f:source': data["source"],
-                        # store meters with prefix "m_"
-                        'f:m_%s' % new_meter: "1"
-                        }
-        new_resource.update(resource_metadata)
+        flatten_result, sources, meters, metadata = \
+            deserialize_entry(resource_table.row(data['resource_id']))
 
         # Update if resource has new information
-        if new_resource != resource:
-            meters = _load_hbase_list(resource, 'm')
-            if new_meter not in meters:
-                new_resource['f:m_%s' % new_meter] = "1"
-
-            resource_table.put(data['resource_id'], new_resource)
+        if (data['source'] not in sources) or (new_meter not in meters) or (
+                metadata != resource_metadata):
+            resource_table.put(data['resource_id'],
+                               serialize_entry(
+                                   **{'sources': [data['source']],
+                                      'meters': [new_meter],
+                                      'metadata': resource_metadata,
+                                      'resource_id': data['resource_id'],
+                                      'project_id': data['project_id'],
+                                      'user_id': data['user_id']}))
 
         # Rowkey consists of reversed timestamp, meter and an md5 of
         # user+resource+project for purposes of uniqueness
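The rewritten update check above only touches the resource row when a sample actually adds information. A standalone sketch of that predicate with stand-in data:

```python
# Sketch of the idempotency check: rewrite the resource row only when the
# sample brings a new source, a new meter, or changed metadata.
def needs_resource_update(sources, meters, stored_metadata,
                          source, new_meter, resource_metadata):
    return (source not in sources
            or new_meter not in meters
            or stored_metadata != resource_metadata)

print(needs_resource_update(['openstack'], ['cpu!gauge!%'], {'a': 1},
                            'openstack', 'cpu!gauge!%', {'a': 1}))   # False
print(needs_resource_update(['openstack'], ['cpu!gauge!%'], {'a': 1},
                            'openstack', 'disk!gauge!B', {'a': 1}))  # True
```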
@@ -314,45 +301,20 @@ class Connection(base.Connection):
 
         # We use reverse timestamps in rowkeys as they are sorted
         # alphabetically.
+        rts = reverse_timestamp(data['timestamp'])
         row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest())
-
-        recorded_at = timeutils.utcnow()
-
-        # Convert timestamp to string as json.dumps won't
-        ts = timeutils.strtime(data['timestamp'])
-        recorded_at_ts = timeutils.strtime(recorded_at)
-
-        record = {'f:timestamp': ts,
-                  'f:counter_name': data['counter_name'],
-                  'f:counter_type': data['counter_type'],
-                  'f:counter_volume': str(data['counter_volume']),
-                  'f:counter_unit': data['counter_unit'],
-                  # TODO(shengjie) consider using QualifierFilter
-                  # keep dimensions as column qualifier for quicker look up
-                  # TODO(shengjie) extra dimensions need to be added as CQ
-                  'f:user_id': data['user_id'],
-                  'f:project_id': data['project_id'],
-                  'f:message_id': data['message_id'],
-                  'f:resource_id': data['resource_id'],
-                  'f:source': data['source'],
-                  'f:recorded_at': recorded_at,
-                  # keep raw metadata as well as flattened to provide
-                  # capability with API v2. It will be flattened in another
-                  # way on API level
-                  'f:metadata': data.get('resource_metadata', '{}'),
-                  # add in reversed_ts here for time range scan
-                  'f:rts': str(rts)
-                  }
-        # Need to record resource_metadata for more robust filtering.
-        record.update(resource_metadata)
-        # Don't want to be changing the original data object.
-        data = copy.copy(data)
-        data['timestamp'] = ts
-        data['recorded_at'] = recorded_at_ts
-        # Save original meter.
-        record['f:message'] = json.dumps(data)
+        record = serialize_entry(data, **{'metadata': resource_metadata,
+                                          'rts': rts,
+                                          'message': data,
+                                          'recorded_at': timeutils.utcnow()})
         meter_table.put(row, record)
 
+    def _update_sources(self, table, id, source):
+        user, sources, _, _ = deserialize_entry(table.row(id))
+        if source not in sources:
+            sources.append(source)
+            table.put(id, serialize_entry(user, **{'sources': sources}))
+
     def get_users(self, source=None):
         """Return an iterable of user id strings.
 
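Reverse timestamps make newer samples sort first in a lexicographic scan, which is what the rowkey comment relies on. A self-contained illustration of the idea; the constant and helper here are assumptions for the sketch, not the `reverse_timestamp` used by impl_hbase:

```python
import calendar
import datetime

_TS_MAX = 0x7fffffffffffffff  # assumed sentinel; illustration only


def reverse_ts(dt):
    # Bigger (newer) times produce smaller numbers, so they sort first.
    return _TS_MAX - calendar.timegm(dt.timetuple())


older = datetime.datetime(2014, 1, 1)
newer = datetime.datetime(2014, 2, 1)
rows = sorted('cpu_%d_%s' % (reverse_ts(t), 'md5') for t in (newer, older))
print(rows[0])  # the row built from `newer` comes first in scan order
```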
@@ -394,21 +356,9 @@ class Connection(base.Connection):
         :param resource: Optional resource filter.
         :param pagination: Optional pagination query.
         """
 
         if pagination:
             raise NotImplementedError(_('Pagination not implemented'))
 
-        def make_resource(data, first_ts, last_ts):
-            """Transform HBase fields to Resource model."""
-            return models.Resource(
-                resource_id=data['f:resource_id'],
-                first_sample_timestamp=first_ts,
-                last_sample_timestamp=last_ts,
-                project_id=data['f:project_id'],
-                source=data['f:source'],
-                user_id=data['f:user_id'],
-                metadata=data['f:metadata'],
-            )
-
         meter_table = self.conn.table(self.METER_TABLE)
 
         sample_filter = storage.SampleFilter(
@@ -416,31 +366,36 @@ class Connection(base.Connection):
             start=start_timestamp, start_timestamp_op=start_timestamp_op,
             end=end_timestamp, end_timestamp_op=end_timestamp_op,
             resource=resource, source=source, metaquery=metaquery)
 
         q, start_row, stop_row = make_sample_query_from_filter(
             sample_filter, require_meter=False)
 
         LOG.debug(_("Query Meter table: %s") % q)
         meters = meter_table.scan(filter=q, row_start=start_row,
                                   row_stop=stop_row)
+        d_meters = []
+        for i, m in meters:
+            d_meters.append(deserialize_entry(m))
 
         # We have to sort on resource_id before we can group by it. According
         # to the itertools documentation a new group is generated when the
         # value of the key function changes (it breaks there).
-        meters = sorted(meters, key=_resource_id_from_record_tuple)
+        meters = sorted(d_meters, key=_resource_id_from_record_tuple)
 
         for resource_id, r_meters in itertools.groupby(
                 meters, key=_resource_id_from_record_tuple):
-            meter_rows = [data[1] for data in sorted(
+            # We need deserialized entry(data[0]) and metadata(data[3])
+            meter_rows = [(data[0], data[3]) for data in sorted(
                 r_meters, key=_timestamp_from_record_tuple)]
 
             latest_data = meter_rows[-1]
-            min_ts = timeutils.parse_strtime(meter_rows[0]['f:timestamp'])
-            max_ts = timeutils.parse_strtime(latest_data['f:timestamp'])
-            yield make_resource(
-                latest_data,
-                min_ts,
-                max_ts
+            min_ts = meter_rows[0][0]['timestamp']
+            max_ts = latest_data[0]['timestamp']
+            yield models.Resource(
+                resource_id=resource_id,
+                first_sample_timestamp=min_ts,
+                last_sample_timestamp=max_ts,
+                project_id=latest_data[0]['project_id'],
+                source=latest_data[0]['source'],
+                user_id=latest_data[0]['user_id'],
+                metadata=latest_data[1],
             )
 
     def get_meters(self, user=None, project=None, resource=None, source=None,
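`itertools.groupby` only merges adjacent records, which is why the deserialized entries are sorted on `resource_id` before grouping. Toy data showing what goes wrong without the sort:

```python
import itertools

# Each record mimics a deserialize_entry() result tuple: record[0] is the
# flattened dict (values are illustrative).
records = [({'resource_id': 'b'},), ({'resource_id': 'a'},),
           ({'resource_id': 'b'},)]
key = lambda record: record[0]['resource_id']

for rid, group in itertools.groupby(sorted(records, key=key), key=key):
    # Prints: a 1, then b 2. Unsorted input would yield 'b' twice.
    print(rid, len(list(group)))
```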
@@ -465,42 +420,28 @@ class Connection(base.Connection):
         gen = resource_table.scan(filter=q)
 
         for ignored, data in gen:
-            # Meter columns are stored like this:
-            # "m_{counter_name}|{counter_type}|{counter_unit}" => "1"
-            # where 'm' is a prefix (m for meter), value is always set to 1
-            meter = None
-            for m in data:
-                if m.startswith('f:m_'):
-                    meter = m
-                    break
-            if meter is None:
+            flatten_result, s, m, md = deserialize_entry(data)
+            if not m:
                 continue
-            name, type, unit = meter[4:].split("!")
+            # The meter table may have only one "meter" and "source"; that is
+            # why only the first list element is used in this method.
+            name, type, unit = m[0].split("!")
             yield models.Meter(
                 name=name,
                 type=type,
                 unit=unit,
-                resource_id=data['f:resource_id'],
-                project_id=data['f:project_id'],
-                source=data['f:source'],
-                user_id=data['f:user_id'],
+                resource_id=flatten_result['resource_id'],
+                project_id=flatten_result['project_id'],
+                source=s[0] if s else None,
+                user_id=flatten_result['user_id'],
             )
 
-    @staticmethod
-    def _make_sample(data):
-        """Transform HBase fields to Sample model."""
-        data = json.loads(data['f:message'])
-        data['timestamp'] = timeutils.parse_strtime(data['timestamp'])
-        data['recorded_at'] = timeutils.parse_strtime(data['recorded_at'])
-        return models.Sample(**data)
-
     def get_samples(self, sample_filter, limit=None):
         """Return an iterable of models.Sample instances.
 
         :param sample_filter: Filter.
         :param limit: Maximum number of results to return.
         """
 
         meter_table = self.conn.table(self.METER_TABLE)
 
         q, start, stop = make_sample_query_from_filter(
@@ -513,7 +454,9 @@ class Connection(base.Connection):
                 break
             else:
                 limit -= 1
-            yield self._make_sample(meter)
+            d_meter = deserialize_entry(meter)[0]
+            d_meter['message']['recorded_at'] = d_meter['recorded_at']
+            yield models.Sample(**d_meter['message'])
 
     @staticmethod
     def _update_meter_stats(stat, meter):
@@ -525,9 +468,9 @@ class Connection(base.Connection):
         :param start_time: query start time
         :param period: length of the time bucket
         """
-        vol = int(meter['f:counter_volume'])
-        ts = timeutils.parse_strtime(meter['f:timestamp'])
-        stat.unit = meter['f:counter_unit']
+        vol = meter['counter_volume']
+        ts = meter['timestamp']
+        stat.unit = meter['counter_unit']
         stat.min = min(vol, stat.min or vol)
         stat.max = max(vol, stat.max)
         stat.sum = vol + (stat.sum or 0)
@@ -557,22 +500,21 @@ class Connection(base.Connection):
 
         meter_table = self.conn.table(self.METER_TABLE)
         q, start, stop = make_sample_query_from_filter(sample_filter)
-        meters = list(meter for (ignored, meter) in
+        meters = map(deserialize_entry, list(meter for (ignored, meter) in
                       meter_table.scan(filter=q, row_start=start,
-                                       row_stop=stop)
-                      )
+                                       row_stop=stop)))
 
         if sample_filter.start:
             start_time = sample_filter.start
         elif meters:
-            start_time = timeutils.parse_strtime(meters[-1]['f:timestamp'])
+            start_time = meters[-1][0]['timestamp']
         else:
             start_time = None
 
         if sample_filter.end:
             end_time = sample_filter.end
         elif meters:
-            end_time = timeutils.parse_strtime(meters[0]['f:timestamp'])
+            end_time = meters[0][0]['timestamp']
         else:
             end_time = None
 
@@ -586,7 +528,7 @@ class Connection(base.Connection):
         # As our HBase meters are stored as newest-first, we need to iterate
         # in the reverse order
         for meter in meters[::-1]:
-            ts = timeutils.parse_strtime(meter['f:timestamp'])
+            ts = meter[0]['timestamp']
             if period:
                 offset = int(timeutils.delta_seconds(
                     start_time, ts) / period) * period
@@ -612,7 +554,7 @@ class Connection(base.Connection):
                         duration_end=None,
                         groupby=None)
                 )
-            self._update_meter_stats(results[-1], meter)
+            self._update_meter_stats(results[-1], meter[0])
         return results
 
 
@@ -666,7 +608,7 @@ class MTable(object):
             # Extract filter name and its arguments
             g = re.search("(.*)\((.*),?\)", f)
             fname = g.group(1).strip()
-            fargs = [s.strip().replace('\'', '').replace('\"', '')
+            fargs = [s.strip().replace('\'', '')
                      for s in g.group(2).split(',')]
             m = getattr(self, fname)
             if callable(m):
@@ -769,6 +711,7 @@ def make_timestamp_query(func, start=None, start_op=None, end=None,
         return start_row, end_row
 
     q = []
+    # We don't need to dump here because get_start_end_rts returns strings
     if rts_start:
         q.append("SingleColumnValueFilter ('f', 'rts', <=, 'binary:%s')" %
                  rts_start)
@@ -805,11 +748,14 @@ def make_query(metaquery=None, **kwargs):
               column name in db
     """
     q = []
-    for key, value in kwargs.iteritems():
+    # Note: we use the extended constructor for SingleColumnValueFilter here.
+    # It explicitly specifies that an entry should not be returned if the CF
+    # is not found in the table.
+    for key, value in kwargs.items():
         if value is not None:
             q.append("SingleColumnValueFilter "
-                     "('f', '%s', =, 'binary:%s')" % (key, value))
+                     "('f', '%s', =, 'binary:%s', true, true)" %
+                     (key, dump(value)))
     res_q = None
     if len(q):
         res_q = " AND ".join(q)
@@ -818,8 +764,9 @@ def make_query(metaquery=None, **kwargs):
         meta_q = []
         for k, v in metaquery.items():
             meta_q.append(
-                "SingleColumnValueFilter ('f', '%s', =, 'binary:%s')"
-                % ('r_' + k, v))
+                "SingleColumnValueFilter ('f', '%s', =, 'binary:%s', "
+                "true, true)"
+                % ('r_' + k, dump(v)))
         meta_q = " AND ".join(meta_q)
         # join query and metaquery
         if res_q is not None:
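For reference, this is the filter string the extended constructor arguments produce; the two trailing `true`s correspond to HBase's filterIfMissing and latestVersionOnly flags, so rows lacking the column are dropped instead of passed through. Sample key and value are illustrative:

```python
import json

key, value = 'user_id', 'user-7'
flt = ("SingleColumnValueFilter "
       "('f', '%s', =, 'binary:%s', true, true)" % (key, json.dumps(value)))
print(flt)
# SingleColumnValueFilter ('f', 'user_id', =, 'binary:"user-7"', true, true)
```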
@@ -827,7 +774,7 @@ def make_query(metaquery=None, **kwargs):
     else:
         res_q = meta_q  # metaquery only
 
-    return res_q or ""
+    return res_q
 
 
 def make_sample_query_from_filter(sample_filter, require_meter=True):
@@ -876,16 +823,6 @@ def _make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None):
     return start_row, end_row
 
 
-def _load_hbase_list(d, prefix):
-    """Deserialise dict stored as HBase column family
-    """
-    ret = []
-    prefix = 'f:%s_' % prefix
-    for key in (k for k in d if k.startswith(prefix)):
-        ret.append(key[len(prefix):])
-    return ret
-
-
 def _format_meter_reference(counter_name, counter_type, counter_unit):
     """Format reference to meter data.
     """
@@ -895,45 +832,105 @@ def _format_meter_reference(counter_name, counter_type, counter_unit):
 def _timestamp_from_record_tuple(record):
     """Extract timestamp from HBase tuple record
     """
-    return timeutils.parse_strtime(record[1]['f:timestamp'])
+    return record[0]['timestamp']
 
 
 def _resource_id_from_record_tuple(record):
     """Extract resource_id from HBase tuple record
     """
-    return record[1]['f:resource_id']
-
-
-#TODO(nprivalova): to be refactored, will be used everywhere in impl_hbase
-# without additional ifs
-def serialize_entry(entry_from_user):
-    result_dict = copy.copy(entry_from_user)
-    keys = result_dict.keys()
-    for key in keys:
-        val = result_dict[key]
-        if isinstance(val, datetime.datetime):
-            val = timeutils.strtime(val)
-        if not isinstance(val, basestring):
-            val = json.dumps(val)
-        result_dict['f:' + key] = val
-        del result_dict[key]
-    return result_dict
-
-
-def deserialize_entry(stored_entry):
-    result_entry = copy.copy(stored_entry)
-    keys = result_entry.keys()
-    for key in keys:
-        val = result_entry[key]
-        try:
-            val = json.loads(val)
-        except ValueError:
-            pass
-        if "timestamp" in key and val:
-            val = timeutils.parse_strtime(val)
-        # There is no int in wsme models
-        if isinstance(val, (int, long, float)) and not isinstance(val, bool):
-            val = str(val)
-        result_entry[key[2:]] = val
-        del result_entry[key]
-    return result_entry
+    return record[0]['resource_id']
+
+
+def deserialize_entry(entry, get_raw_meta=True):
+    """Return a list of flatten_result, sources, meters and metadata.
+
+    flatten_result contains a dict of simple structures such as
+    'resource_id': 1; sources and meters are the lists of sources and
+    meters correspondingly; metadata is a metadata dict. This dict may be
+    returned as flattened if get_raw_meta is False.
+
+    :param entry: entry from HBase, without row name and timestamp
+    :param get_raw_meta: If True then raw metadata will be returned,
+                         if False metadata will be constructed from
+                         'f:r_metadata.' fields
+    """
+    flatten_result = {}
+    sources = []
+    meters = []
+    metadata_flattened = {}
+    for k, v in entry.items():
+        if k.startswith('f:s_'):
+            sources.append(k[4:])
+        elif k.startswith('f:m_'):
+            meters.append(k[4:])
+        elif k.startswith('f:r_metadata.'):
+            metadata_flattened[k[len('f:r_metadata.'):]] = load(v)
+        else:
+            flatten_result[k[2:]] = load(v)
+    if get_raw_meta:
+        metadata = flatten_result.get('metadata', {})
+    else:
+        metadata = metadata_flattened
+
+    return flatten_result, sources, meters, metadata
+
+
+def serialize_entry(data={}, **kwargs):
+    """Return a dict that is ready to be stored to HBase.
+
+    :param data: dict to be serialized
+    :param kwargs: additional args
+    """
+    entry_dict = copy.copy(data)
+    entry_dict.update(**kwargs)
+
+    result = {}
+    for k, v in entry_dict.items():
+        if k == 'sources':
+            # The user and project tables may contain several sources and
+            # meters; that's why we store them separately as pairs
+            # "source/meter name: 1". The resource and meter tables contain
+            # only one, so it would be possible to store pairs like
+            # "source/meter: source name/meter name". But to keep things
+            # simple we store all variants in all tables, because it doesn't
+            # break the logic and the overhead is not too big.
+            for source in v:
+                result['f:s_%s' % source] = dump('1')
+            if v:
+                result['f:source'] = dump(v[0])
+        elif k == 'meters':
+            for meter in v:
+                result['f:m_%s' % meter] = dump('1')
+        elif k == 'metadata':
+            # Keep raw metadata as well as flattened to provide
+            # compatibility with API v2. It will be flattened in another
+            # way on the API level, but we need the flattened copy too for
+            # quick filtering.
+            flattened_meta = dump_metadata(v)
+            for k, m in flattened_meta.items():
+                result['f:r_metadata.' + k] = dump(m)
+            result['f:metadata'] = dump(v)
+        else:
+            result['f:' + k] = dump(v)
+    return result
+
+
+def dump_metadata(meta):
+    resource_metadata = {}
+    for key, v in utils.dict_to_keyval(meta):
+        resource_metadata[key] = v
+    return resource_metadata
+
+
+def dump(data):
+    return json.dumps(data, default=bson.json_util.default)
+
+
+def load(data):
+    return json.loads(data, object_hook=object_hook)
+
+
+# We don't want tzinfo in the decoded json, so this object_hook overrides
+# json_util's object_hook for $date.
+def object_hook(dct):
+    if "$date" in dct:
+        dt = bson.json_util.object_hook(dct)
+        return dt.replace(tzinfo=None)
+    return bson.json_util.object_hook(dct)
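A round-trip sketch of the new helpers, inlined here so it runs standalone (requires pymongo's bson; the timestamp is illustrative): `dump()` keeps datetimes JSON-safe via `bson.json_util`, and `load()`'s object_hook strips the tzinfo that bson would otherwise attach.

```python
import datetime
import json

import bson.json_util


def dump(data):
    return json.dumps(data, default=bson.json_util.default)


def object_hook(dct):
    # Strip tzinfo so values compare equal to the naive datetimes we stored.
    if "$date" in dct:
        return bson.json_util.object_hook(dct).replace(tzinfo=None)
    return bson.json_util.object_hook(dct)


def load(data):
    return json.loads(data, object_hook=object_hook)


stored = dump({'timestamp': datetime.datetime(2014, 1, 30, 12, 0)})
restored = load(stored)
print(restored['timestamp'] == datetime.datetime(2014, 1, 30, 12, 0))  # True
```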