Merge "[HBase] get_resource optimization"

Jenkins 2014-06-15 21:40:02 +00:00 committed by Gerrit Code Review
commit 3efe03b60a

ceilometer/storage/impl_hbase.py

@@ -20,12 +20,13 @@
 import copy
 import datetime
 import hashlib
-import itertools
 import json
 import operator
 import os
 import re
+import six
 import six.moves.urllib.parse as urlparse
+import time

 import bson.json_util
 import happybase
@@ -34,7 +35,6 @@ from ceilometer.openstack.common.gettextutils import _
 from ceilometer.openstack.common import log
 from ceilometer.openstack.common import network_utils
 from ceilometer.openstack.common import timeutils
-from ceilometer import storage
 from ceilometer.storage import base
 from ceilometer.storage import models
 from ceilometer import utils
@@ -107,8 +107,8 @@ class Connection(base.Connection):
             f:r_metadata.display_name or f:r_metadata.tag
           - sources for all corresponding meters with prefix 's'
           - all meters for this resource in format
-            "%s!%s!%s+%s" % (counter_name, counter_type, counter_unit,
-            source)
+            "%s+%s+%s!%s!%s" % (rts, source, counter_name, counter_type,
+            counter_unit)
       - alarm
         - row_key: uuid of alarm
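The new layout in this docstring puts the reversed timestamp (rts) first, so meter qualifiers sort by recency and can be bounded with prefix filters. A minimal sketch, with made-up values and a hypothetical helper name, of composing and parsing such a qualifier (the commit's real pieces are _format_meter_reference() and the split logic in get_meters()):

def build_meter_qualifier(rts, source, name, c_type, unit):
    # hypothetical helper, mirroring "%s+%s+%s!%s!%s"
    return "%s+%s+%s!%s!%s" % (rts, source, name, c_type, unit)

q = build_meter_qualifier(9222030811134775808, 'openstack',
                          'cpu', 'cumulative', 'ns')
rts, source, raw = q.split('+')      # reversed timestamp comes first
name, c_type, unit = raw.split('!')  # lexicographic order matches time order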
@@ -308,16 +308,25 @@ class Connection(base.Connection):
         resource_metadata = data.get('resource_metadata', {})
         # Determine the name of new meter
+        rts = timestamp(data['timestamp'])
         new_meter = _format_meter_reference(
             data['counter_name'], data['counter_type'],
-            data['counter_unit'], data['source'])
+            data['counter_unit'], rts, data['source'])

         #TODO(nprivalova): try not to store resource_id
         resource = serialize_entry(**{
-            'source': data['source'], 'meter': new_meter,
+            'source': data['source'],
+            'meter': {new_meter: data['timestamp']},
             'resource_metadata': resource_metadata,
             'resource_id': data['resource_id'],
             'project_id': data['project_id'], 'user_id': data['user_id']})
-        resource_table.put(data['resource_id'], resource)
+        # Here we put the entry into HBase with our own timestamp. This is
+        # needed when samples arrive out-of-order: with
+        # timestamp=data['timestamp'] the newest data automatically ends up
+        # 'on top', which keeps the metadata up-to-date (metadata from the
+        # newest samples is considered the actual one).
+        ts = int(time.mktime(data['timestamp'].timetuple()) * 1000)
+        resource_table.put(data['resource_id'], resource, ts)

         #TODO(nprivalova): improve uniqueness
         # Rowkey consists of reversed timestamp, meter and an md5 of
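For context on the timestamped put: HBase keeps multiple versions of a cell ordered by timestamp and serves the highest one by default, so passing the sample's own time as the cell timestamp lets metadata from the newest sample win even when samples arrive out of order. A minimal standalone sketch of the conversion used above, with made-up dates:

import datetime
import time

def cell_ts(dt):
    # same conversion as record_metering_data: seconds -> milliseconds
    return int(time.mktime(dt.timetuple()) * 1000)

older = datetime.datetime(2014, 6, 15, 11, 0)
newer = datetime.datetime(2014, 6, 15, 12, 0)
assert cell_ts(older) < cell_ts(newer)  # the newer cell version wins reads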
@@ -325,10 +334,6 @@ class Connection(base.Connection):
         m = hashlib.md5()
         m.update("%s%s%s" % (data['user_id'], data['resource_id'],
                              data['project_id']))
-        # We use reverse timestamps in rowkeys as they are sorted
-        # alphabetically.
-        rts = timestamp(data['timestamp'])
-
         row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest())
         record = serialize_entry(data, **{'source': data['source'],
                                           'rts': rts,
@@ -357,47 +362,38 @@ class Connection(base.Connection):
         if pagination:
             raise NotImplementedError('Pagination not implemented')

-        metaquery = metaquery or {}
-
-        sample_filter = storage.SampleFilter(
-            user=user, project=project,
-            start=start_timestamp, start_timestamp_op=start_timestamp_op,
-            end=end_timestamp, end_timestamp_op=end_timestamp_op,
-            resource=resource, source=source, metaquery=metaquery)
-        q, start_row, stop_row, __ = make_sample_query_from_filter(
-            sample_filter, require_meter=False)
+        q = make_query(metaquery=metaquery, user_id=user, project_id=project,
+                       resource_id=resource, source=source)
+        q = make_meter_query_for_resource(start_timestamp, start_timestamp_op,
+                                          end_timestamp, end_timestamp_op,
+                                          source, q)
         with self.conn_pool.connection() as conn:
-            meter_table = conn.table(self.METER_TABLE)
-            LOG.debug(_("Query Meter table: %s") % q)
-            meters = meter_table.scan(filter=q, row_start=start_row,
-                                      row_stop=stop_row)
-            d_meters = []
-            for i, m in meters:
-                d_meters.append(deserialize_entry(m))
-
-            # We have to sort on resource_id before we can group by it.
-            # According to the itertools documentation a new group is
-            # generated when the value of the key function changes
-            # (it breaks there).
-            meters = sorted(d_meters, key=_resource_id_from_record_tuple)
-
-            for resource_id, r_meters in itertools.groupby(
-                    meters, key=_resource_id_from_record_tuple):
-                # We need deserialized entry(data[0]), sources (data[1]) and
-                # metadata(data[3])
-                meter_rows = [(data[0], data[1], data[3]) for data in sorted(
-                    r_meters, key=_timestamp_from_record_tuple)]
-                latest_data = meter_rows[-1]
-                min_ts = meter_rows[0][0]['timestamp']
-                max_ts = latest_data[0]['timestamp']
+            resource_table = conn.table(self.RESOURCE_TABLE)
+            LOG.debug(_("Query Resource table: %s") % q)
+            for resource_id, data in resource_table.scan(filter=q):
+                f_res, sources, meters, md = deserialize_entry(data)
+                # Unfortunately happybase doesn't keep the ordered result
+                # from HBase, so the min and max have to be found manually.
+                first_ts = min(meters, key=operator.itemgetter(1))[1]
+                last_ts = max(meters, key=operator.itemgetter(1))[1]
+                source = meters[0][0].split('+')[1]
+                # If we use QualifierFilter, HBase returns only the
+                # qualifiers filtered by; it does not return the whole entry.
+                # That's why additional qualifiers have to be requested
+                # manually.
+                if 'project_id' not in f_res and 'user_id' not in f_res:
+                    row = resource_table.row(
+                        resource_id, columns=['f:project_id', 'f:user_id',
+                                              'f:resource_metadata'])
+                    f_res, _s, _m, md = deserialize_entry(row)
                 yield models.Resource(
                     resource_id=resource_id,
-                    first_sample_timestamp=min_ts,
-                    last_sample_timestamp=max_ts,
-                    project_id=latest_data[0]['project_id'],
-                    source=latest_data[1][0],
-                    user_id=latest_data[0]['user_id'],
-                    metadata=latest_data[2],
-                )
+                    first_sample_timestamp=first_ts,
+                    last_sample_timestamp=last_ts,
+                    project_id=f_res['project_id'],
+                    source=source,
+                    user_id=f_res['user_id'],
+                    metadata=md)

     def get_meters(self, user=None, project=None, resource=None, source=None,
                    metaquery=None, pagination=None):
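Since deserialize_entry() now yields meters as (qualifier, timestamp) tuples, the first and last sample times fall out of a single min/max pass. A sketch with made-up data:

import operator

meters = [('9222030811134775808+openstack+cpu!cumulative!ns', 1402833600000),
          ('9222030807994775808+openstack+cpu!cumulative!ns', 1402837200000)]
first_ts = min(meters, key=operator.itemgetter(1))[1]   # 1402833600000
last_ts = max(meters, key=operator.itemgetter(1))[1]    # 1402837200000
source = meters[0][0].split('+')[1]                     # 'openstack'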
@@ -430,8 +426,8 @@ class Connection(base.Connection):
             for ignored, data in gen:
                 flatten_result, s, meters, md = deserialize_entry(data)
                 for m in meters:
-                    meter_raw, m_source = m.split("+")
-                    name, type, unit = meter_raw.split('!')
+                    _m_rts, m_source, m_raw = m[0].split("+")
+                    name, type, unit = m_raw.split('!')
                     meter_dict = {'name': name,
                                   'type': type,
                                   'unit': unit,
@@ -453,13 +449,15 @@ class Connection(base.Connection):
         :param sample_filter: Filter.
         :param limit: Maximum number of results to return.
         """
+        if limit == 0:
+            return
         with self.conn_pool.connection() as conn:
             meter_table = conn.table(self.METER_TABLE)
             q, start, stop, columns = make_sample_query_from_filter(
                 sample_filter, require_meter=False)
             LOG.debug(_("Query Meter Table: %s") % q)
             gen = meter_table.scan(filter=q, row_start=start, row_stop=stop,
-                                   columns=columns, limit=limit)
+                                   limit=limit)
             for ignored, meter in gen:
                 d_meter = deserialize_entry(meter)[0]
                 d_meter['message']['recorded_at'] = d_meter['recorded_at']
@@ -707,6 +705,10 @@ class Connection(base.Connection):
                 yield trait

+
+def _QualifierFilter(op, qualifier):
+    return "QualifierFilter (%s, 'binaryprefix:m_%s')" % (op, qualifier)
+
+
 ###############
 # This is a very crude version of "in-memory HBase", which implements just
 # enough functionality of HappyBase API to support testing of our driver.
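The new _QualifierFilter() helper just renders a string in the HBase filter language, with the meter prefix m_ baked into the qualifier. For example (reversed timestamp made up):

print(_QualifierFilter("<=", '9222030811134775808+openstack'))
# QualifierFilter (<=, 'binaryprefix:m_9222030811134775808+openstack')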
@@ -717,27 +719,56 @@ class MTable(object):
     def __init__(self, name, families):
         self.name = name
         self.families = families
-        self._rows = {}
+        self._rows_with_ts = {}

-    def row(self, key):
-        return self._rows.get(key, {})
+    def row(self, key, columns=None):
+        if key not in self._rows_with_ts:
+            return {}
+        res = copy.copy(sorted(six.iteritems(
+            self._rows_with_ts.get(key)))[-1][1])
+        if columns:
+            keys = res.keys()
+            for key in keys:
+                if key not in columns:
+                    res.pop(key)
+        return res

     def rows(self, keys):
         return ((k, self.row(k)) for k in keys)

-    def put(self, key, data):
-        if key not in self._rows:
-            self._rows[key] = data
+    def put(self, key, data, ts=None):
+        # Note: 'timestamped' puts are currently used only for the Resource
+        # table. That's why ts='0' may be used when ts is None. If two kinds
+        # of put were needed in one table, ts=0 could not be used.
+        if ts is None:
+            ts = "0"
+        if key not in self._rows_with_ts:
+            self._rows_with_ts[key] = {ts: data}
         else:
-            self._rows[key].update(data)
+            if ts in self._rows_with_ts[key]:
+                self._rows_with_ts[key][ts].update(data)
+            else:
+                self._rows_with_ts[key].update({ts: data})

     def delete(self, key):
-        del self._rows[key]
+        del self._rows_with_ts[key]
+
+    def _get_latest_dict(self, row):
+        # The idea here is to return the latest versions of the columns.
+        # In _rows_with_ts we store {row: {ts_1: {data}, ts_2: {data}}}.
+        # Sorting the items by ts gives [(ts_1, {data}), (ts_2, {data})],
+        # where ts_2 is the latest. To get the result the way HBase provides
+        # it, we apply the versions in ts order so that values from newer
+        # data overwrite those from older data.
+        data = {}
+        for i in sorted(six.iteritems(self._rows_with_ts[row])):
+            data.update(i[1])
+        return data

     def scan(self, filter=None, columns=None, row_start=None, row_stop=None,
              limit=None):
         columns = columns or []
-        sorted_keys = sorted(self._rows)
+        sorted_keys = sorted(self._rows_with_ts)
         # copy data between row_start and row_stop into a dict
         rows = {}
         for row in sorted_keys:
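A worked example of the version merging that _get_latest_dict() performs, with made-up cell versions; applying the versions in timestamp order lets newer values overwrite older ones, the way HBase serves the latest cell versions:

versions = {1000: {'f:resource_metadata': 'old', 'f:user_id': 'u1'},
            2000: {'f:resource_metadata': 'new'}}
data = {}
for _ts, cells in sorted(versions.items()):
    data.update(cells)
assert data == {'f:resource_metadata': 'new', 'f:user_id': 'u1'}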
@@ -745,11 +776,11 @@ class MTable(object):
                 continue
             if row_stop and row > row_stop:
                 break
-            rows[row] = copy.copy(self._rows[row])
+            rows[row] = self._get_latest_dict(row)

         if columns:
             ret = {}
-            for row in rows.keys():
-                data = rows[row]
+            for row, data in six.iteritems(rows):
                 for key in data:
                     if key in columns:
                         ret[row] = data
@@ -854,6 +885,33 @@ class MTable(object):
                 pass
         return r

+    @staticmethod
+    def QualifierFilter(args, rows):
+        """This method is called from scan() when 'QualifierFilter'
+        is found in the 'filter' argument.
+        """
+        op = args[0]
+        value = args[1]
+        if value.startswith('binaryprefix:'):
+            value = value[len('binaryprefix:'):]
+        column = 'f:' + value
+        r = {}
+        for row in rows:
+            data = rows[row]
+            r_data = {}
+            for key in data:
+                if (op == '=' and key.startswith(column)) or \
+                        (op == '>=' and key >= column) or \
+                        (op == '<=' and key <= column):
+                    r_data[key] = data[key]
+                else:
+                    raise NotImplementedError("In-memory QualifierFilter "
+                                              "doesn't support the %s "
+                                              "operation yet" % op)
+            if r_data:
+                r[row] = r_data
+        return r
+

 class MConnectionPool(object):
     def __init__(self):
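A usage sketch of the in-memory QualifierFilter(), with a made-up row whose qualifiers all carry the m_ prefix (matching how the driver queries the Resource table):

rows = {'resource-1': {'f:m_123+src+cpu!cumulative!ns': '1000'}}
filtered = MTable.QualifierFilter(('=', 'binaryprefix:m_'), rows)
assert filtered == rows  # every qualifier matched the 'f:m_' prefix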
@@ -898,14 +956,14 @@ class MConnection(object):
 def timestamp(dt, reverse=True):
     """Timestamp is count of milliseconds since start of epoch.

-    Timestamps is a technique used in HBase rowkey design. When period
-    queries are required the HBase rowkeys must include timestamps, but as
-    rowkeys in HBase are ordered lexicographically.
-
-    Same for the reversed timestamps, but the order will be opposite.
+    If reverse=True then the timestamp will be reversed. Such a technique is
+    used in HBase rowkey design when period queries are required. Because
+    rows are sorted lexicographically, it is possible to choose whether the
+    'oldest' entries or the newest ones (the reversed-timestamp case) end up
+    on top of the table.

-    :param: dt: datetime which is translated to the (reversed or not) timestamp
-    :param: reverse: is a boolean parameter for reverse or straight count of
+    :param: dt: datetime which is translated to timestamp
+    :param: reverse: a boolean parameter for reverse or straight count of
     timestamp in milliseconds
     :return count or reversed count of milliseconds since start of epoch
     """
@@ -1006,9 +1064,9 @@ def make_query(metaquery=None, trait_query=None, **kwargs):
     q = []
     res_q = None

-    # Query for traits if a little differ from others it is constructed with
-    # SingleColumnValueFilter with the possibility to choose an operator for
-    # value
+    # A query for traits differs a little from the others: it is constructed
+    # with SingleColumnValueFilter, with the possibility to choose a
+    # comparison operator for the value.
     if trait_query:
         trait_name = kwargs.pop('key')
         op = kwargs.pop('op', 'eq')
@@ -1105,6 +1163,43 @@ def make_sample_query_from_filter(sample_filter, require_meter=True):

     return res_q, start_row, end_row, columns


+def make_meter_query_for_resource(start_timestamp, start_timestamp_op,
+                                  end_timestamp, end_timestamp_op, source,
+                                  query=None):
+    """This method is used when the Resource table should be filtered by
+    meters. It looks at all qualifiers with the m_ prefix.
+
+    :param start_timestamp: meter's timestamp start range.
+    :param start_timestamp_op: meter's start time operator, like ge, gt.
+    :param end_timestamp: meter's timestamp end range.
+    :param end_timestamp_op: meter's end time operator, like lt, le.
+    :param source: source filter.
+    :param query: a query string to concatenate with.
+    """
+    start_rts, end_rts = get_start_end_rts(start_timestamp,
+                                           start_timestamp_op,
+                                           end_timestamp, end_timestamp_op)
+    mq = []
+
+    if start_rts:
+        filter_value = start_rts + '+' + source if source else start_rts
+        mq.append(_QualifierFilter("<=", filter_value))
+
+    if end_rts:
+        filter_value = end_rts + '+' + source if source else end_rts
+        mq.append(_QualifierFilter(">=", filter_value))
+
+    if mq:
+        meter_q = " AND ".join(mq)
+        # If there is filtering on a time range, we need to require that the
+        # qualifiers start with m_. Otherwise, with e.g.
+        # QualifierFilter (>=, 'binaryprefix:m_9222030811134775808'),
+        # the qualifier 's_test' satisfies the filter and will be returned.
+        meter_q = _QualifierFilter("=", '') + " AND " + meter_q
+        query = meter_q if not query else query + " AND " + meter_q
+    return query
+

 def _make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None):
     """If it's filter on some_id without start and end,
     start_row = some_id while end_row = some_id + MAX_BYTE
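To see why the operators above look inverted: with reversed timestamps a later start time yields a smaller rts, so the start bound becomes '<=' and the end bound '>='. A sketch of the filter string such a call would produce (rts values made up):

# q = make_meter_query_for_resource(start_ts, 'ge', end_ts, 'le', 'openstack')
# q == ("QualifierFilter (=, 'binaryprefix:m_') AND "
#       "QualifierFilter (<=, 'binaryprefix:m_9222030811134775808+openstack') AND "
#       "QualifierFilter (>=, 'binaryprefix:m_9222030807994775808+openstack')")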
@@ -1119,10 +1214,10 @@ def _make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None):
     return start_row, end_row


-def _format_meter_reference(counter_name, counter_type, counter_unit, source):
+def _format_meter_reference(c_name, c_type, c_unit, rts, source):
     """Format reference to meter data.
     """
-    return "%s!%s!%s+%s" % (counter_name, counter_type, counter_unit, source)
+    return "%s+%s+%s!%s!%s" % (rts, source, c_name, c_type, c_unit)


 def _timestamp_from_record_tuple(record):
@@ -1158,8 +1253,9 @@ def deserialize_entry(entry, get_raw_meta=True):
             sources.append(k[4:])
         elif k.startswith('f:r_metadata.'):
             metadata_flattened[k[len('f:r_metadata.'):]] = load(v)
-        elif k.startswith('f:m_'):
-            meters.append(k[4:])
+        elif k.startswith("f:m_"):
+            meter = (k[4:], load(v))
+            meters.append(meter)
         else:
             flatten_result[k[2:]] = load(v)
     if get_raw_meta:
@@ -1189,9 +1285,9 @@ def serialize_entry(data=None, **kwargs):
             # a separate cell. For this purpose s_ and m_ prefixes are
             # introduced.
             result['f:s_%s' % v] = dump('1')
         elif k == 'meter':
-            result['f:m_%s' % v] = dump('1')
+            for meter, ts in v.items():
+                result['f:m_%s' % meter] = dump(ts)
         elif k == 'resource_metadata':
             # keep raw metadata as well as flattened to provide
             # capability with API v2. It will be flattened in another