From a0667fe0404f097658752259de577ea011582391 Mon Sep 17 00:00:00 2001 From: Nadya Privalova Date: Thu, 10 Apr 2014 16:18:00 +0400 Subject: [PATCH] [HBase] get_resource optimization In this change only Resource table is scanned during get_resources(). It becomes possible because of the following: 1. Store all meters for each resource. 2. QualifierFilter is used when timestamp filter is needed. 3. To make metadata up-to date in case of unusual order (old comes earlier then new) we use versioning. So during 'put' it's possible to set timestamp and the newest data will be automatically 'on the top' Implements blueprint hbase-resource-rowkey-enhancement Change-Id: Ie189531e6425af3acfeae5636d8b93b102f319d1 --- ceilometer/storage/impl_hbase.py | 258 +++++++++++++++++++++---------- 1 file changed, 177 insertions(+), 81 deletions(-) diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py index df257cbe3..bc8857a26 100644 --- a/ceilometer/storage/impl_hbase.py +++ b/ceilometer/storage/impl_hbase.py @@ -20,12 +20,13 @@ import copy import datetime import hashlib -import itertools import json import operator import os import re +import six import six.moves.urllib.parse as urlparse +import time import bson.json_util import happybase @@ -34,7 +35,6 @@ from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log from ceilometer.openstack.common import network_utils from ceilometer.openstack.common import timeutils -from ceilometer import storage from ceilometer.storage import base from ceilometer.storage import models from ceilometer import utils @@ -107,8 +107,8 @@ class Connection(base.Connection): f:r_metadata.display_name or f:r_metadata.tag -sources for all corresponding meters with prefix 's' -all meters for this resource in format - "%s!%s!%s+%s" % (counter_name, counter_type, counter_unit, - source) + "%s+%s+%s!%s!%s" % (rts, source, counter_name, counter_type, + counter_unit) - alarm - row_key: uuid of alarm 
@@ -306,16 +306,25 @@ class Connection(base.Connection): resource_metadata = data.get('resource_metadata', {}) # Determine the name of new meter + rts = timestamp(data['timestamp']) new_meter = _format_meter_reference( data['counter_name'], data['counter_type'], - data['counter_unit'], data['source']) + data['counter_unit'], rts, data['source']) + #TODO(nprivalova): try not to store resource_id resource = serialize_entry(**{ - 'source': data['source'], 'meter': new_meter, + 'source': data['source'], + 'meter': {new_meter: data['timestamp']}, 'resource_metadata': resource_metadata, 'resource_id': data['resource_id'], 'project_id': data['project_id'], 'user_id': data['user_id']}) - resource_table.put(data['resource_id'], resource) + # Here we put entry in HBase with our own timestamp. This is needed + # when samples arrive out-of-order + # If we use timestamp=data['timestamp'] the newest data will be + # automatically 'on the top'. It is needed to keep metadata + # up-to-date: metadata from newest samples is considered as actual. + ts = int(time.mktime(data['timestamp'].timetuple()) * 1000) + resource_table.put(data['resource_id'], resource, ts) #TODO(nprivalova): improve uniqueness # Rowkey consists of reversed timestamp, meter and an md5 of @@ -323,10 +332,6 @@ class Connection(base.Connection): m = hashlib.md5() m.update("%s%s%s" % (data['user_id'], data['resource_id'], data['project_id'])) - - # We use reverse timestamps in rowkeys as they are sorted - # alphabetically. 
- rts = timestamp(data['timestamp']) row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest()) record = serialize_entry(data, **{'source': data['source'], 'rts': rts, @@ -355,47 +360,38 @@ class Connection(base.Connection): if pagination: raise NotImplementedError('Pagination not implemented') - metaquery = metaquery or {} - - sample_filter = storage.SampleFilter( - user=user, project=project, - start=start_timestamp, start_timestamp_op=start_timestamp_op, - end=end_timestamp, end_timestamp_op=end_timestamp_op, - resource=resource, source=source, metaquery=metaquery) - q, start_row, stop_row, __ = make_sample_query_from_filter( - sample_filter, require_meter=False) + q = make_query(metaquery=metaquery, user_id=user, project_id=project, + resource_id=resource, source=source) + q = make_meter_query_for_resource(start_timestamp, start_timestamp_op, + end_timestamp, end_timestamp_op, + source, q) with self.conn_pool.connection() as conn: - meter_table = conn.table(self.METER_TABLE) - LOG.debug(_("Query Meter table: %s") % q) - meters = meter_table.scan(filter=q, row_start=start_row, - row_stop=stop_row) - d_meters = [] - for i, m in meters: - d_meters.append(deserialize_entry(m)) - - # We have to sort on resource_id before we can group by it. - # According to the itertools documentation a new group is - # generated when the value of the key function changes - # (it breaks there). 
- meters = sorted(d_meters, key=_resource_id_from_record_tuple) - for resource_id, r_meters in itertools.groupby( - meters, key=_resource_id_from_record_tuple): - # We need deserialized entry(data[0]), sources (data[1]) and - # metadata(data[3]) - meter_rows = [(data[0], data[1], data[3]) for data in sorted( - r_meters, key=_timestamp_from_record_tuple)] - latest_data = meter_rows[-1] - min_ts = meter_rows[0][0]['timestamp'] - max_ts = latest_data[0]['timestamp'] + resource_table = conn.table(self.RESOURCE_TABLE) + LOG.debug(_("Query Resource table: %s") % q) + for resource_id, data in resource_table.scan(filter=q): + f_res, sources, meters, md = deserialize_entry(data) + # Unfortunately happybase doesn't keep ordered result from + # HBase. So that's why it's needed to find min and max + # manually + first_ts = min(meters, key=operator.itemgetter(1))[1] + last_ts = max(meters, key=operator.itemgetter(1))[1] + source = meters[0][0].split('+')[1] + # If we use QualifierFilter then HBase returns only + # qualifiers filtered by. It will not return the whole entry. + # That's why we need to ask for additional qualifiers manually. 
+ if 'project_id' not in f_res and 'user_id' not in f_res: + row = resource_table.row( + resource_id, columns=['f:project_id', 'f:user_id', + 'f:resource_metadata']) + f_res, _s, _m, md = deserialize_entry(row) yield models.Resource( resource_id=resource_id, - first_sample_timestamp=min_ts, - last_sample_timestamp=max_ts, - project_id=latest_data[0]['project_id'], - source=latest_data[1][0], - user_id=latest_data[0]['user_id'], - metadata=latest_data[2], - ) + first_sample_timestamp=first_ts, + last_sample_timestamp=last_ts, + project_id=f_res['project_id'], + source=source, + user_id=f_res['user_id'], + metadata=md) def get_meters(self, user=None, project=None, resource=None, source=None, metaquery=None, pagination=None): @@ -428,8 +424,8 @@ class Connection(base.Connection): for ignored, data in gen: flatten_result, s, meters, md = deserialize_entry(data) for m in meters: - meter_raw, m_source = m.split("+") - name, type, unit = meter_raw.split('!') + _m_rts, m_source, m_raw = m[0].split("+") + name, type, unit = m_raw.split('!') meter_dict = {'name': name, 'type': type, 'unit': unit, @@ -451,13 +447,15 @@ class Connection(base.Connection): :param sample_filter: Filter. :param limit: Maximum number of results to return. 
""" + if limit == 0: + return with self.conn_pool.connection() as conn: meter_table = conn.table(self.METER_TABLE) q, start, stop, columns = make_sample_query_from_filter( sample_filter, require_meter=False) LOG.debug(_("Query Meter Table: %s") % q) gen = meter_table.scan(filter=q, row_start=start, row_stop=stop, - columns=columns, limit=limit) + limit=limit) for ignored, meter in gen: d_meter = deserialize_entry(meter)[0] d_meter['message']['recorded_at'] = d_meter['recorded_at'] @@ -705,6 +703,10 @@ class Connection(base.Connection): yield trait +def _QualifierFilter(op, qualifier): + return "QualifierFilter (%s, 'binaryprefix:m_%s')" % (op, qualifier) + + ############### # This is a very crude version of "in-memory HBase", which implements just # enough functionality of HappyBase API to support testing of our driver. @@ -715,27 +717,56 @@ class MTable(object): def __init__(self, name, families): self.name = name self.families = families - self._rows = {} + self._rows_with_ts = {} - def row(self, key): - return self._rows.get(key, {}) + def row(self, key, columns=None): + if key not in self._rows_with_ts: + return {} + res = copy.copy(sorted(six.iteritems( + self._rows_with_ts.get(key)))[-1][1]) + if columns: + keys = res.keys() + for key in keys: + if key not in columns: + res.pop(key) + return res def rows(self, keys): return ((k, self.row(k)) for k in keys) - def put(self, key, data): - if key not in self._rows: - self._rows[key] = data + def put(self, key, data, ts=None): + # Note: Now we use 'timestamped' but only for one Resource table. + # That's why we may put ts='0' in case when ts is None. If it is + # needed to use 2 types of put in one table ts=0 cannot be used. 
+ if ts is None: + ts = "0" + if key not in self._rows_with_ts: + self._rows_with_ts[key] = {ts: data} else: - self._rows[key].update(data) + if ts in self._rows_with_ts[key]: + self._rows_with_ts[key][ts].update(data) + else: + self._rows_with_ts[key].update({ts: data}) def delete(self, key): - del self._rows[key] + del self._rows_with_ts[key] + + def _get_latest_dict(self, row): + # The idea here is to return latest versions of columns. + # In _rows_with_ts we store {row: {ts_1: {data}, ts_2: {data}}}. + # res will contain a list of tuples [(ts_1, {data}), (ts_2, {data})] + # sorted by ts, i.e. in this list ts_2 is the latest. + # To get result as HBase provides we should iterate in reverse order + # and get from "latest" data only key-values that are not in newer data + data = {} + for i in sorted(six.iteritems(self._rows_with_ts[row])): + data.update(i[1]) + return data def scan(self, filter=None, columns=None, row_start=None, row_stop=None, limit=None): columns = columns or [] - sorted_keys = sorted(self._rows) + sorted_keys = sorted(self._rows_with_ts) # copy data between row_start and row_stop into a dict rows = {} for row in sorted_keys: 
key >= column) or \ + (op == '<=' and key <= column): + r_data[key] = data[key] + else: + raise NotImplementedError("In-memory QualifierFilter " + "doesn't support the %s " + "operation yet" % op) + if r_data: + r[row] = r_data + return r + class MConnectionPool(object): def __init__(self): @@ -896,14 +954,14 @@ class MConnection(object): def timestamp(dt, reverse=True): """Timestamp is count of milliseconds since start of epoch. - Timestamps is a technique used in HBase rowkey design. When period - queries are required the HBase rowkeys must include timestamps, but as - rowkeys in HBase are ordered lexicographically. + If reverse=True then timestamp will be reversed. Such a technique is used + in HBase rowkey design when period queries are required. Because of the + fact that rows are sorted lexicographically it's possible to vary whether + the 'oldest' entries will be on top of the table or it should be the newest + ones (reversed timestamp case). - Same for the reversed timestamps, but the order will be opposite. 
- - :param: dt: datetime which is translated to the (reversed or not) timestamp - :param: reverse: is a boolean parameter for reverse or straight count of + :param: dt: datetime which is translated to timestamp + :param: reverse: a boolean parameter for reverse or straight count of timestamp in milliseconds :return count or reversed count of milliseconds since start of epoch """ @@ -984,7 +1042,7 @@ def get_start_end_rts(start, start_op, end, end_op): rts_start = str(timestamp(start) + 1) if start else "" rts_end = str(timestamp(end) + 1) if end else "" - #By default, we are using ge for lower bound and lt for upper bound + # By default, we are using ge for lower bound and lt for upper bound if start_op == 'gt': rts_start = str(long(rts_start) - 2) if end_op == 'le': @@ -1004,9 +1062,9 @@ def make_query(metaquery=None, trait_query=None, **kwargs): q = [] res_q = None - # Query for traits if a little differ from others it is constructed with - # SingleColumnValueFilter with the possibility to choose an operator for - # value + # Query for traits differs from others. It is constructed with + # SingleColumnValueFilter with the possibility to choose comparison + # operator if trait_query: trait_name = kwargs.pop('key') op = kwargs.pop('op', 'eq') @@ -1103,6 +1161,43 @@ def make_sample_query_from_filter(sample_filter, require_meter=True): return res_q, start_row, end_row, columns +def make_meter_query_for_resource(start_timestamp, start_timestamp_op, + end_timestamp, end_timestamp_op, source, + query=None): + """This method is used when Resource table should be filtered by meters. + In this method we are looking into all qualifiers with m_ prefix. + + :param start_timestamp: meter's timestamp start range. + :param start_timestamp_op: meter's start time operator, like ge, gt. + :param end_timestamp: meter's timestamp end range. + :param end_timestamp_op: meter's end time operator, like lt, le. + :param source: source filter. 
+ :param query: a query string to concatenate with. + """ + start_rts, end_rts = get_start_end_rts(start_timestamp, + start_timestamp_op, + end_timestamp, end_timestamp_op) + mq = [] + + if start_rts: + filter_value = start_rts + '+' + source if source else start_rts + mq.append(_QualifierFilter("<=", filter_value)) + + if end_rts: + filter_value = end_rts + '+' + source if source else end_rts + mq.append(_QualifierFilter(">=", filter_value)) + + if mq: + meter_q = " AND ".join(mq) + # If there is a filtering on time_range we need to specify that + # qualifiers should start with m_. Otherwise in case e.g. + # QualifierFilter (>=, 'binaryprefix:m_9222030811134775808') + # qualifier 's_test' satisfies the filter and will be returned. + meter_q = _QualifierFilter("=", '') + " AND " + meter_q + query = meter_q if not query else query + " AND " + meter_q + return query + + +def _make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None): """If it's filter on some_id without start and end, start_row = some_id while end_row = some_id + MAX_BYTE @@ -1117,10 +1212,10 @@ def _make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None): return start_row, end_row -def _format_meter_reference(counter_name, counter_type, counter_unit, source): +def _format_meter_reference(c_name, c_type, c_unit, rts, source): """Format reference to meter data. 
""" - return "%s!%s!%s+%s" % (counter_name, counter_type, counter_unit, source) + return "%s+%s+%s!%s!%s" % (rts, source, c_name, c_type, c_unit) def _timestamp_from_record_tuple(record): @@ -1156,8 +1251,9 @@ def deserialize_entry(entry, get_raw_meta=True): sources.append(k[4:]) elif k.startswith('f:r_metadata.'): metadata_flattened[k[len('f:r_metadata.'):]] = load(v) - elif k.startswith('f:m_'): - meters.append(k[4:]) + elif k.startswith("f:m_"): + meter = (k[4:], load(v)) + meters.append(meter) else: flatten_result[k[2:]] = load(v) if get_raw_meta: @@ -1187,9 +1283,9 @@ def serialize_entry(data=None, **kwargs): # a separate cell. For this purpose s_ and m_ prefixes are # introduced. result['f:s_%s' % v] = dump('1') - elif k == 'meter': - result['f:m_%s' % v] = dump('1') + for meter, ts in v.items(): + result['f:m_%s' % meter] = dump(ts) elif k == 'resource_metadata': # keep raw metadata as well as flattened to provide # capability with API v2. It will be flattened in another