diff --git a/ceilometer/storage/hbase/__init__.py b/ceilometer/storage/hbase/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/ceilometer/storage/hbase/inmemory.py b/ceilometer/storage/hbase/inmemory.py
new file mode 100644
index 000000000..4d864e743
--- /dev/null
+++ b/ceilometer/storage/hbase/inmemory.py
@@ -0,0 +1,264 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+""" This is a very crude version of "in-memory HBase", which implements just
+    enough functionality of the HappyBase API to support testing of our
+    driver.
+"""
+
+import copy
+import re
+import six
+
+from ceilometer.openstack.common.gettextutils import _
+from ceilometer.openstack.common import log
+
+LOG = log.getLogger(__name__)
+
+
+class MTable(object):
+    """HappyBase.Table mock
+    """
+    def __init__(self, name, families):
+        self.name = name
+        self.families = families
+        self._rows_with_ts = {}
+
+    def row(self, key, columns=None):
+        if key not in self._rows_with_ts:
+            return {}
+        res = copy.copy(sorted(six.iteritems(
+            self._rows_with_ts.get(key)))[-1][1])
+        if columns:
+            keys = res.keys()
+            for key in keys:
+                if key not in columns:
+                    res.pop(key)
+        return res
+
+    def rows(self, keys):
+        return ((k, self.row(k)) for k in keys)
+
+    def put(self, key, data, ts=None):
+        # Note: timestamped puts are currently used only for the Resource
+        # table; that is why it is safe to substitute ts='0' when ts is
+        # None. If both styles of put are ever needed in one table, ts=0
+        # cannot be used.
+        if ts is None:
+            ts = "0"
+        if key not in self._rows_with_ts:
+            self._rows_with_ts[key] = {ts: data}
+        else:
+            if ts in self._rows_with_ts[key]:
+                self._rows_with_ts[key][ts].update(data)
+            else:
+                self._rows_with_ts[key].update({ts: data})
+
+    def delete(self, key):
+        del self._rows_with_ts[key]
+
+    def _get_latest_dict(self, row):
+        # The idea here is to return the latest versions of columns.
+        # In _rows_with_ts we store {row: {ts_1: {data}, ts_2: {data}}},
+        # so iterating over the timestamps in ascending order and updating
+        # a single dict means newer values overwrite older ones, which is
+        # exactly the result HBase would return.
+        data = {}
+        for i in sorted(six.iteritems(self._rows_with_ts[row])):
+            data.update(i[1])
+        return data
+
+    def scan(self, filter=None, columns=None, row_start=None, row_stop=None,
+             limit=None):
+        columns = columns or []
+        sorted_keys = sorted(self._rows_with_ts)
+        # copy data between row_start and row_stop into a dict
+        rows = {}
+        for row in sorted_keys:
+            if row_start and row < row_start:
+                continue
+            if row_stop and row > row_stop:
+                break
+            rows[row] = self._get_latest_dict(row)
+
+        if columns:
+            ret = {}
+            for row, data in six.iteritems(rows):
+                for key in data:
+                    if key in columns:
+                        ret[row] = data
+            rows = ret
+        if filter:
+            # TODO(jdanjou): we should really parse this properly,
+            # but at the moment we are only going to support AND here
+            filters = filter.split('AND')
+            for f in filters:
+                # Extract filter name and its arguments
+                g = re.search(r"(.*)\((.*),?\)", f)
+                fname = g.group(1).strip()
+                fargs = [s.strip().replace('\'', '')
+                         for s in g.group(2).split(',')]
+                m = getattr(self, fname)
+                if callable(m):
+                    # overwrite rows for filtering to take effect
+                    # in case of multiple filters
+                    rows = m(fargs, rows)
+                else:
+                    raise NotImplementedError("%s filter is not implemented, "
+                                              "you may want to add it!"
+                                              % fname)
+        for k in sorted(rows)[:limit]:
+            yield k, rows[k]
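The filter-string parsing in scan() above is deliberately simple: clauses are split on AND and dispatched to a same-named static method. A minimal standalone sketch of that dispatch (the sample filter string is illustrative):

```python
import re

# A filter string in the form happybase passes through to HBase.
filter_str = ("SingleColumnValueFilter ('f', 'rts', <=, 'binary:123') AND "
              "QualifierFilter (=, 'binaryprefix:m_')")

for clause in filter_str.split('AND'):
    # Same regex as scan(): group(1) is the filter name, group(2) its args.
    g = re.search(r"(.*)\((.*),?\)", clause)
    fname = g.group(1).strip()
    fargs = [s.strip().replace("'", "") for s in g.group(2).split(',')]
    print(fname, fargs)
# SingleColumnValueFilter ['f', 'rts', '<=', 'binary:123']
# QualifierFilter ['=', 'binaryprefix:m_']
```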
+
+    @staticmethod
+    def SingleColumnValueFilter(args, rows):
+        """This method is called from scan() when 'SingleColumnValueFilter'
+        is found in the 'filter' argument.
+        """
+        op = args[2]
+        column = "%s:%s" % (args[0], args[1])
+        value = args[3]
+        if value.startswith('binary:'):
+            value = value[7:]
+        r = {}
+        for row in rows:
+            data = rows[row]
+
+            if op == '=':
+                if column in data and data[column] == value:
+                    r[row] = data
+            elif op == '<=':
+                if column in data and data[column] <= value:
+                    r[row] = data
+            elif op == '>=':
+                if column in data and data[column] >= value:
+                    r[row] = data
+            else:
+                raise NotImplementedError("In-memory "
+                                          "SingleColumnValueFilter "
+                                          "doesn't support the %s operation "
+                                          "yet" % op)
+        return r
+
+    @staticmethod
+    def ColumnPrefixFilter(args, rows):
+        """This is a filter for testing "in-memory HBase".
+
+        This method is called from scan() when 'ColumnPrefixFilter' is found
+        in the 'filter' argument.
+
+        :param args: a list of filter arguments, containing the column prefix
+        :param rows: a dict of rows to filter
+        """
+        value = args[0]
+        column = 'f:' + value
+        r = {}
+        for row, data in rows.items():
+            column_dict = {}
+            for key in data:
+                if key.startswith(column):
+                    column_dict[key] = data[key]
+            r[row] = column_dict
+        return r
+
+    @staticmethod
+    def RowFilter(args, rows):
+        """This is a filter for testing "in-memory HBase".
+
+        This method is called from scan() when 'RowFilter' is found in the
+        'filter' argument.
+
+        :param args: a list of filter arguments, containing the comparison
+          operator and the sought string
+        :param rows: a dict of rows to filter
+        """
+        op = args[0]
+        value = args[1]
+        if value.startswith('regexstring:'):
+            value = value[len('regexstring:'):]
+        r = {}
+        for row, data in rows.items():
+            try:
+                g = re.search(value, row).group()
+                if op == '=':
+                    if g == row:
+                        r[row] = data
+                else:
+                    raise NotImplementedError("In-memory "
+                                              "RowFilter doesn't support "
+                                              "the %s operation yet" % op)
+            except AttributeError:
+                pass
+        return r
+
+    @staticmethod
+    def QualifierFilter(args, rows):
+        """This method is called from scan() when 'QualifierFilter'
+        is found in the 'filter' argument.
+        """
+        op = args[0]
+        value = args[1]
+        if value.startswith('binaryprefix:'):
+            value = value[len('binaryprefix:'):]
+        column = 'f:' + value
+        if op not in ('=', '>=', '<='):
+            raise NotImplementedError("In-memory QualifierFilter "
+                                      "doesn't support the %s "
+                                      "operation yet" % op)
+        r = {}
+        for row in rows:
+            data = rows[row]
+            r_data = {}
+            for key in data:
+                if (op == '=' and key.startswith(column)) or \
+                        (op == '>=' and key >= column) or \
+                        (op == '<=' and key <= column):
+                    r_data[key] = data[key]
+            if r_data:
+                r[row] = r_data
+        return r
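To make the contract of these mocks concrete, here is a small hand-run of SingleColumnValueFilter with a rows dict in the same {row: {column: value}} shape that scan() passes in (the data is made up):

```python
rows = {
    'r1': {'f:rts': '100', 'f:message': 'a'},
    'r2': {'f:rts': '300', 'f:message': 'b'},
}
# args = [family, qualifier, op, value], as extracted by scan().
matched = MTable.SingleColumnValueFilter(['f', 'rts', '<=', 'binary:200'],
                                         rows)
assert list(matched) == ['r1']  # string comparison: '100' <= '200'
```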
+
+
+class MConnectionPool(object):
+    def __init__(self):
+        self.conn = MConnection()
+
+    def connection(self):
+        return self.conn
+
+
+class MConnection(object):
+    """HappyBase.Connection mock
+    """
+    def __init__(self):
+        self.tables = {}
+
+    def __enter__(self, *args, **kwargs):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        pass
+
+    def open(self):
+        LOG.debug(_("Opening in-memory HBase connection"))
+
+    def create_table(self, n, families=None):
+        families = families or {}
+        if n in self.tables:
+            return self.tables[n]
+        t = MTable(n, families)
+        self.tables[n] = t
+        return t
+
+    def delete_table(self, name, use_prefix=True):
+        del self.tables[name]
+
+    def table(self, name):
+        return self.create_table(name)
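Taken together, the mock covers just the slice of the happybase API the driver touches. A usage sketch (table name and data are illustrative):

```python
conn = MConnection()
table = conn.create_table('meter', families={'f': dict()})
table.put('cpu_util_1_abc', {'f:counter_volume': '0.8'})
print(table.row('cpu_util_1_abc'))   # {'f:counter_volume': '0.8'}
print(list(table.scan(row_start='cpu', row_stop='cpv')))
```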
diff --git a/ceilometer/storage/hbase/utils.py b/ceilometer/storage/hbase/utils.py
new file mode 100644
index 000000000..ae66f1eef
--- /dev/null
+++ b/ceilometer/storage/hbase/utils.py
@@ -0,0 +1,402 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+""" Various HBase helpers
+"""
+import copy
+import datetime
+import json
+
+import bson.json_util
+
+from ceilometer import utils
+
+DTYPE_NAMES = {'none': 0, 'string': 1, 'integer': 2, 'float': 3,
+               'datetime': 4}
+OP_SIGN = {'eq': '=', 'lt': '<', 'le': '<=', 'ne': '!=', 'gt': '>',
+           'ge': '>='}
+
+
+def _QualifierFilter(op, qualifier):
+    return "QualifierFilter (%s, 'binaryprefix:m_%s')" % (op, qualifier)
+
+
+def timestamp(dt, reverse=True):
+    """Timestamp is the count of microseconds since the start of the epoch.
+
+    If reverse=True then the timestamp will be reversed. Such a technique is
+    used in HBase rowkey design when period queries are required. Because
+    rows are sorted lexicographically, reversing the timestamp determines
+    whether the 'oldest' entries sit at the top of the table or the newest
+    ones do (the reversed timestamp case).
+
+    :param dt: datetime which is translated to a timestamp
+    :param reverse: a boolean parameter for a reversed or straight count of
+      microseconds
+    :return: count or reversed count of microseconds since the start of
+      the epoch
+    """
+    epoch = datetime.datetime(1970, 1, 1)
+    td = dt - epoch
+    ts = td.microseconds + td.seconds * 1000000 + td.days * 86400000000
+    return 0x7fffffffffffffff - ts if reverse else ts
+
+
+def make_events_query_from_filter(event_filter):
+    """Return start and stop rows for filtering and a query based on the
+    selected parameters.
+
+    :param event_filter: storage.EventFilter object.
+    """
+    q = []
+    res_q = None
+    start = "%s" % (timestamp(event_filter.start_time, reverse=False)
+                    if event_filter.start_time else "")
+    stop = "%s" % (timestamp(event_filter.end_time, reverse=False)
+                   if event_filter.end_time else "")
+    if event_filter.event_type:
+        q.append("SingleColumnValueFilter ('f', 'event_type', = , "
+                 "'binary:%s')" % dump(event_filter.event_type))
+    if event_filter.message_id:
+        q.append("RowFilter ( = , 'regexstring:\d*_%s')" %
+                 event_filter.message_id)
+    if len(q):
+        res_q = " AND ".join(q)
+
+    if event_filter.traits_filter:
+        for trait_filter in event_filter.traits_filter:
+            q_trait = make_query(trait_query=True, **trait_filter)
+            if q_trait:
+                if res_q:
+                    res_q += " AND " + q_trait
+                else:
+                    res_q = q_trait
+    return res_q, start, stop
+
+
+def make_timestamp_query(func, start=None, start_op=None, end=None,
+                         end_op=None, bounds_only=False, **kwargs):
+    """Return start and stop rows for filtering and a query that relies on
+    the fact that the CF-name is 'rts'.
+
+    :param start: Optional start timestamp
+    :param start_op: Optional start timestamp operator, like gt, ge
+    :param end: Optional end timestamp
+    :param end_op: Optional end timestamp operator, like lt, le
+    :param bounds_only: if True then the query will not be returned
+    :param func: a function that provides the row format
+    :param kwargs: kwargs for :param func
+    """
+    rts_start, rts_end = get_start_end_rts(start, start_op, end, end_op)
+    start_row, end_row = func(rts_start, rts_end, **kwargs)
+
+    if bounds_only:
+        return start_row, end_row
+
+    q = []
+    # We don't need to dump here because get_start_end_rts returns strings
+    if rts_start:
+        q.append("SingleColumnValueFilter ('f', 'rts', <=, 'binary:%s')" %
+                 rts_start)
+    if rts_end:
+        q.append("SingleColumnValueFilter ('f', 'rts', >=, 'binary:%s')" %
+                 rts_end)
+
+    res_q = None
+    if len(q):
+        res_q = " AND ".join(q)
+
+    return start_row, end_row, res_q
+
+
+def get_start_end_rts(start, start_op, end, end_op):
+
+    rts_start = str(timestamp(start) + 1) if start else ""
+    rts_end = str(timestamp(end) + 1) if end else ""
+
+    # By default, we are using ge for the lower bound and lt for the upper
+    # bound
+    if start_op == 'gt':
+        rts_start = str(long(rts_start) - 2)
+    if end_op == 'le':
+        rts_end = str(long(rts_end) - 1)
+
+    return rts_start, rts_end
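A quick worked example of the reversed-timestamp trick: because timestamp() subtracts from 0x7fffffffffffffff, a later datetime maps to a smaller row-key component, so lexicographic ordering yields newest-first scans. (Dates are illustrative.)

```python
import datetime

earlier = datetime.datetime(2014, 1, 1)
later = datetime.datetime(2014, 1, 2)

assert timestamp(earlier, reverse=False) < timestamp(later, reverse=False)
assert timestamp(earlier) > timestamp(later)  # reversed: newest sorts first
```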
+
+
+def make_query(metaquery=None, trait_query=None, **kwargs):
+    """Return a filter query string based on the selected parameters.
+
+    :param metaquery: optional metaquery dict
+    :param trait_query: optional boolean, for a trait query built from kwargs
+    :param kwargs: key-value pairs to filter on. The key should be a real
+      column name in the db
+    """
+    q = []
+    res_q = None
+
+    # The query for traits differs from the others. It is constructed with
+    # SingleColumnValueFilter and allows the comparison operator to be
+    # chosen.
+    if trait_query:
+        trait_name = kwargs.pop('key')
+        op = kwargs.pop('op', 'eq')
+        for k, v in kwargs.items():
+            if v is not None:
+                res_q = ("SingleColumnValueFilter "
+                         "('f', '%s+%d', %s, 'binary:%s', true, true)" %
+                         (trait_name, DTYPE_NAMES[k], OP_SIGN[op],
+                          dump(v)))
+        return res_q
+
+    # Note: we use the extended constructor for SingleColumnValueFilter
+    # here. It explicitly specifies that an entry should not be returned
+    # if the CF is not found in the table.
+    for key, value in sorted(kwargs.items()):
+        if value is not None:
+            if key == 'source':
+                q.append("SingleColumnValueFilter "
+                         "('f', 's_%s', =, 'binary:%s', true, true)" %
+                         (value, dump('1')))
+            elif key == 'trait_type':
+                q.append("ColumnPrefixFilter('%s')" % value)
+            else:
+                q.append("SingleColumnValueFilter "
+                         "('f', '%s', =, 'binary:%s', true, true)" %
+                         (key, dump(value)))
+    res_q = None
+    if len(q):
+        res_q = " AND ".join(q)
+
+    if metaquery:
+        meta_q = []
+        for k, v in metaquery.items():
+            meta_q.append(
+                "SingleColumnValueFilter ('f', '%s', =, 'binary:%s', "
+                "true, true)"
+                % ('r_' + k, dump(v)))
+        meta_q = " AND ".join(meta_q)
+        # join query and metaquery
+        if res_q is not None:
+            res_q += " AND " + meta_q
+        else:
+            res_q = meta_q  # metaquery only
+
+    return res_q
+
+
+def get_meter_columns(metaquery, **kwargs):
+    """Return a list of required columns in the meter table to be scanned.
+
+    :param metaquery: optional metaquery dict
+    :param kwargs: key-value pairs to filter on. The key should be a real
+      column name in the db
+    """
+    columns = ['f:message', 'f:recorded_at']
+    columns.extend(["f:%s" % k for k, v in kwargs.items() if v])
+    if metaquery:
+        columns.extend(["f:r_%s" % k for k, v in metaquery.items() if v])
+    return columns
+
+
+def make_sample_query_from_filter(sample_filter, require_meter=True):
+    """Return a query, start and stop rows, and columns based on the
+    settings in the filter.
+
+    :param sample_filter: SampleFilter instance
+    :param require_meter: If True and the filter does not have a meter,
+      raise an error.
+    """
+
+    meter = sample_filter.meter
+    if not meter and require_meter:
+        raise RuntimeError('Missing required meter specifier')
+    start_row, end_row, ts_query = make_timestamp_query(
+        make_general_rowkey_scan,
+        start=sample_filter.start, start_op=sample_filter.start_timestamp_op,
+        end=sample_filter.end, end_op=sample_filter.end_timestamp_op,
+        some_id=meter)
+
+    kwargs = dict(user_id=sample_filter.user,
+                  project_id=sample_filter.project,
+                  counter_name=meter,
+                  resource_id=sample_filter.resource,
+                  source=sample_filter.source,
+                  message_id=sample_filter.message_id)
+
+    q = make_query(metaquery=sample_filter.metaquery, **kwargs)
+
+    if q:
+        ts_query = (" AND " + ts_query) if ts_query else ""
+        res_q = q + ts_query if ts_query else q
+    else:
+        res_q = ts_query if ts_query else None
+    columns = get_meter_columns(metaquery=sample_filter.metaquery, **kwargs)
+    return res_q, start_row, end_row, columns
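For reference, this is the kind of filter string make_query() produces: keys are sorted, values are JSON-dumped, and 'source' is rewritten into its s_-prefixed marker column (output wrapped here for readability):

```python
q = make_query(user_id='user-1', source='openstack')
# "SingleColumnValueFilter ('f', 's_openstack', =, 'binary:\"1\"', true, true)
#  AND
#  SingleColumnValueFilter ('f', 'user_id', =, 'binary:\"user-1\"', true, true)"
```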
+
+
+def make_meter_query_for_resource(start_timestamp, start_timestamp_op,
+                                  end_timestamp, end_timestamp_op, source,
+                                  query=None):
+    """This method is used when the Resource table should be filtered by
+    meters. In this method we are looking into all qualifiers with the m_
+    prefix.
+
+    :param start_timestamp: meter's timestamp start range.
+    :param start_timestamp_op: meter's start time operator, like ge, gt.
+    :param end_timestamp: meter's timestamp end range.
+    :param end_timestamp_op: meter's end time operator, like lt, le.
+    :param source: source filter.
+    :param query: a query string to concatenate with.
+    """
+    start_rts, end_rts = get_start_end_rts(start_timestamp,
+                                           start_timestamp_op,
+                                           end_timestamp, end_timestamp_op)
+    mq = []
+
+    if start_rts:
+        filter_value = start_rts + '+' + source if source else start_rts
+        mq.append(_QualifierFilter("<=", filter_value))
+
+    if end_rts:
+        filter_value = end_rts + '+' + source if source else end_rts
+        mq.append(_QualifierFilter(">=", filter_value))
+
+    if mq:
+        meter_q = " AND ".join(mq)
+        # If we are filtering on a time range, we have to require that
+        # qualifiers start with m_. Otherwise, given e.g.
+        # QualifierFilter (>=, 'binaryprefix:m_9222030811134775808'),
+        # a qualifier such as 's_test' would satisfy the filter and be
+        # returned.
+        meter_q = _QualifierFilter("=", '') + " AND " + meter_q
+        query = meter_q if not query else query + " AND " + meter_q
+    return query
+
+
+def make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None):
+    """When filtering on some_id without start and end bounds,
+    start_row = some_id while end_row = some_id + MAX_BYTE.
+    """
+    if some_id is None:
+        return None, None
+    if not rts_start:
+        rts_start = chr(127)
+    end_row = "%s_%s" % (some_id, rts_start)
+    start_row = "%s_%s" % (some_id, rts_end)
+
+    return start_row, end_row
+
+
+def format_meter_reference(c_name, c_type, c_unit, rts, source):
+    """Format a reference to meter data.
+    """
+    return "%s+%s+%s!%s!%s" % (rts, source, c_name, c_type, c_unit)
+
+
+def timestamp_from_record_tuple(record):
+    """Extract the timestamp from an HBase tuple record.
+    """
+    return record[0]['timestamp']
+
+
+def resource_id_from_record_tuple(record):
+    """Extract the resource_id from an HBase tuple record.
+    """
+    return record[0]['resource_id']
+
+
+def deserialize_entry(entry, get_raw_meta=True):
+    """Return a list of flatten_result, sources, meters and metadata.
+
+    flatten_result contains a dict of simple structures such as
+    'resource_id': 1; sources and meters are the lists of sources and
+    meters correspondingly; metadata is a metadata dict. This dict may be
+    returned flattened if get_raw_meta is False.
+
+    :param entry: entry from HBase, without row name and timestamp
+    :param get_raw_meta: If True then raw metadata will be returned,
+      if False metadata will be constructed from 'f:r_metadata.' fields
+    """
+    flatten_result = {}
+    sources = []
+    meters = []
+    metadata_flattened = {}
+    for k, v in entry.items():
+        if k.startswith('f:s_'):
+            sources.append(k[4:])
+        elif k.startswith('f:r_metadata.'):
+            metadata_flattened[k[len('f:r_metadata.'):]] = load(v)
+        elif k.startswith("f:m_"):
+            meter = (k[4:], load(v))
+            meters.append(meter)
+        else:
+            flatten_result[k[2:]] = load(v)
+    if get_raw_meta:
+        metadata = flatten_result.get('resource_metadata', {})
+    else:
+        metadata = metadata_flattened
+
+    return flatten_result, sources, meters, metadata
+
+
+def serialize_entry(data=None, **kwargs):
+    """Return a dict that is ready to be stored to HBase
+
+    :param data: dict to be serialized
+    :param kwargs: additional args
+    """
+    data = data or {}
+    entry_dict = copy.copy(data)
+    entry_dict.update(**kwargs)
+
+    result = {}
+    for k, v in entry_dict.items():
+        if k == 'source':
+            # user, project and resource tables may contain several sources.
+            # Besides, the resource table may contain several meters.
+            # To make insertion safe we need to store all meters and sources
+            # in separate cells. For this purpose the s_ and m_ prefixes are
+            # introduced.
+            result['f:s_%s' % v] = dump('1')
+        elif k == 'meter':
+            for meter, ts in v.items():
+                result['f:m_%s' % meter] = dump(ts)
+        elif k == 'resource_metadata':
+            # keep raw metadata as well as flattened to provide
+            # compatibility with API v2. It will be flattened in another
+            # way on the API level. But we need the flattened form too for
+            # quick filtering.
+            flattened_meta = dump_metadata(v)
+            for k, m in flattened_meta.items():
+                result['f:r_metadata.' + k] = dump(m)
+            result['f:resource_metadata'] = dump(v)
+        else:
+            result['f:' + k] = dump(v)
+    return result
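serialize_entry() and deserialize_entry() are inverses over the common cases; a round-trip sketch with made-up data:

```python
entry = serialize_entry({'resource_id': 'inst-1', 'source': 'openstack'})
# {'f:resource_id': '"inst-1"', 'f:s_openstack': '"1"'}

flat, sources, meters, metadata = deserialize_entry(entry)
assert flat['resource_id'] == 'inst-1'
assert sources == ['openstack']
assert meters == [] and metadata == {}
```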
+
+
+def dump_metadata(meta):
+    resource_metadata = {}
+    for key, v in utils.dict_to_keyval(meta):
+        resource_metadata[key] = v
+    return resource_metadata
+
+
+def dump(data):
+    return json.dumps(data, default=bson.json_util.default)
+
+
+def load(data):
+    return json.loads(data, object_hook=object_hook)
+
+
+# We don't want to have tzinfo in decoded json. This object_hook overrides
+# json_util.object_hook for $date.
+def object_hook(dct):
+    if "$date" in dct:
+        dt = bson.json_util.object_hook(dct)
+        return dt.replace(tzinfo=None)
+    return bson.json_util.object_hook(dct)
diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py
index 717916a62..d620630a3 100644
--- a/ceilometer/storage/impl_hbase.py
+++ b/ceilometer/storage/impl_hbase.py
@@ -1,9 +1,4 @@
 #
-# Copyright 2012, 2013 Dell Inc.
-#
-# Author: Stas Maksimov
-# Author: Shengjie Min
-#
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
 # not use this file except in compliance with the License. You may obtain
 # a copy of the License at
@@ -17,18 +12,13 @@
 # under the License.
 """HBase storage backend
 """
-import copy
 import datetime
 import hashlib
-import json
 import operator
 import os
-import re
-import six
 import six.moves.urllib.parse as urlparse
 import time
 
-import bson.json_util
 import happybase
 
 from ceilometer.alarm.storage import models as alarm_models
@@ -37,6 +27,8 @@ from ceilometer.openstack.common import log
 from ceilometer.openstack.common import network_utils
 from ceilometer.openstack.common import timeutils
 from ceilometer.storage import base
+from ceilometer.storage.hbase import inmemory as hbase_inmemory
+from ceilometer.storage.hbase import utils as hbase_utils
 from ceilometer.storage import models
 from ceilometer import utils
 
@@ -61,11 +53,6 @@ AVAILABLE_STORAGE_CAPABILITIES = {
     'storage': {'production_ready': True},
 }
 
-DTYPE_NAMES = {'none': 0, 'string': 1, 'integer': 2, 'float': 3,
-               'datetime': 4}
-OP_SIGN = {'eq': '=', 'lt': '<', 'le': '<=', 'ne': '!=', 'gt': '>',
-           'ge': '>='}
-
 
 class Connection(base.Connection):
     """Put the data into a HBase database
@@ -185,7 +172,8 @@ class Connection(base.Connection):
             if Connection._memory_instance is None:
                 LOG.debug(_('Creating a new in-memory HBase '
                             'Connection object'))
-                Connection._memory_instance = MConnectionPool()
+                Connection._memory_instance = \
+                    hbase_inmemory.MConnectionPool()
             self.conn_pool = Connection._memory_instance
         else:
             self.conn_pool = self._get_connection_pool(opts)
@@ -259,11 +247,12 @@ class Connection(base.Connection):
         call as_dict()
         """
         _id = alarm.alarm_id
-        alarm_to_store = serialize_entry(alarm.as_dict())
+        alarm_to_store = hbase_utils.serialize_entry(alarm.as_dict())
         with self.conn_pool.connection() as conn:
             alarm_table = conn.table(self.ALARM_TABLE)
             alarm_table.put(_id, alarm_to_store)
-            stored_alarm = deserialize_entry(alarm_table.row(_id))[0]
+            stored_alarm = hbase_utils.deserialize_entry(
+                alarm_table.row(_id))[0]
         return
alarm_models.Alarm(**stored_alarm) create_alarm = update_alarm @@ -281,24 +270,26 @@ class Connection(base.Connection): if meter: raise NotImplementedError('Filter by meter not implemented') - q = make_query(alarm_id=alarm_id, name=name, enabled=enabled, - user_id=user, project_id=project, state=state) + q = hbase_utils.make_query(alarm_id=alarm_id, name=name, + enabled=enabled, user_id=user, + project_id=project, state=state) with self.conn_pool.connection() as conn: alarm_table = conn.table(self.ALARM_TABLE) gen = alarm_table.scan(filter=q) for ignored, data in gen: - stored_alarm = deserialize_entry(data)[0] + stored_alarm = hbase_utils.deserialize_entry(data)[0] yield alarm_models.Alarm(**stored_alarm) def get_alarm_changes(self, alarm_id, on_behalf_of, user=None, project=None, type=None, start_timestamp=None, start_timestamp_op=None, end_timestamp=None, end_timestamp_op=None): - q = make_query(alarm_id=alarm_id, on_behalf_of=on_behalf_of, type=type, - user_id=user, project_id=project) - start_row, end_row = make_timestamp_query( - _make_general_rowkey_scan, + q = hbase_utils.make_query(alarm_id=alarm_id, + on_behalf_of=on_behalf_of, type=type, + user_id=user, project_id=project) + start_row, end_row = hbase_utils.make_timestamp_query( + hbase_utils.make_general_rowkey_scan, start=start_timestamp, start_op=start_timestamp_op, end=end_timestamp, end_op=end_timestamp_op, bounds_only=True, some_id=alarm_id) @@ -307,15 +298,15 @@ class Connection(base.Connection): gen = alarm_history_table.scan(filter=q, row_start=start_row, row_stop=end_row) for ignored, data in gen: - stored_entry = deserialize_entry(data)[0] + stored_entry = hbase_utils.deserialize_entry(data)[0] yield alarm_models.AlarmChange(**stored_entry) def record_alarm_change(self, alarm_change): """Record alarm change event. 
""" - alarm_change_dict = serialize_entry(alarm_change) + alarm_change_dict = hbase_utils.serialize_entry(alarm_change) ts = alarm_change.get('timestamp') or datetime.datetime.now() - rts = timestamp(ts) + rts = hbase_utils.timestamp(ts) with self.conn_pool.connection() as conn: alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE) alarm_history_table.put(alarm_change.get('alarm_id') + "_" + @@ -333,13 +324,13 @@ class Connection(base.Connection): resource_metadata = data.get('resource_metadata', {}) # Determine the name of new meter - rts = timestamp(data['timestamp']) - new_meter = _format_meter_reference( + rts = hbase_utils.timestamp(data['timestamp']) + new_meter = hbase_utils.format_meter_reference( data['counter_name'], data['counter_type'], data['counter_unit'], rts, data['source']) #TODO(nprivalova): try not to store resource_id - resource = serialize_entry(**{ + resource = hbase_utils.serialize_entry(**{ 'source': data['source'], 'meter': {new_meter: data['timestamp']}, 'resource_metadata': resource_metadata, @@ -360,11 +351,9 @@ class Connection(base.Connection): m.update("%s%s%s" % (data['user_id'], data['resource_id'], data['project_id'])) row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest()) - record = serialize_entry(data, **{'source': data['source'], - 'rts': rts, - 'message': data, - 'recorded_at': timeutils.utcnow( - )}) + record = hbase_utils.serialize_entry( + data, **{'source': data['source'], 'rts': rts, + 'message': data, 'recorded_at': timeutils.utcnow()}) meter_table.put(row, record) def get_resources(self, user=None, project=None, source=None, @@ -387,16 +376,20 @@ class Connection(base.Connection): if pagination: raise NotImplementedError('Pagination not implemented') - q = make_query(metaquery=metaquery, user_id=user, project_id=project, - resource_id=resource, source=source) - q = make_meter_query_for_resource(start_timestamp, start_timestamp_op, - end_timestamp, end_timestamp_op, - source, q) + q = hbase_utils.make_query(metaquery=metaquery, user_id=user, + project_id=project, + resource_id=resource, source=source) + q = hbase_utils.make_meter_query_for_resource(start_timestamp, + start_timestamp_op, + end_timestamp, + end_timestamp_op, + source, q) with self.conn_pool.connection() as conn: resource_table = conn.table(self.RESOURCE_TABLE) LOG.debug(_("Query Resource table: %s") % q) for resource_id, data in resource_table.scan(filter=q): - f_res, sources, meters, md = deserialize_entry(data) + f_res, sources, meters, md = hbase_utils.deserialize_entry( + data) # Unfortunately happybase doesn't keep ordered result from # HBase. 
So that's why it's needed to find min and max # manually @@ -410,7 +403,7 @@ class Connection(base.Connection): row = resource_table.row( resource_id, columns=['f:project_id', 'f:user_id', 'f:resource_metadata']) - f_res, _s, _m, md = deserialize_entry(row) + f_res, _s, _m, md = hbase_utils.deserialize_entry(row) yield models.Resource( resource_id=resource_id, first_sample_timestamp=first_ts, @@ -438,9 +431,10 @@ class Connection(base.Connection): raise NotImplementedError(_('Pagination not implemented')) with self.conn_pool.connection() as conn: resource_table = conn.table(self.RESOURCE_TABLE) - q = make_query(metaquery=metaquery, user_id=user, - project_id=project, resource_id=resource, - source=source) + q = hbase_utils.make_query(metaquery=metaquery, user_id=user, + project_id=project, + resource_id=resource, + source=source) LOG.debug(_("Query Resource table: %s") % q) gen = resource_table.scan(filter=q) @@ -449,7 +443,8 @@ class Connection(base.Connection): # https://bugs.launchpad.net/ceilometer/+bug/1301371 result = set() for ignored, data in gen: - flatten_result, s, meters, md = deserialize_entry(data) + flatten_result, s, meters, md = hbase_utils.deserialize_entry( + data) for m in meters: _m_rts, m_source, m_raw = m[0].split("+") name, type, unit = m_raw.split('!') @@ -478,13 +473,14 @@ class Connection(base.Connection): return with self.conn_pool.connection() as conn: meter_table = conn.table(self.METER_TABLE) - q, start, stop, columns = make_sample_query_from_filter( - sample_filter, require_meter=False) + q, start, stop, columns = \ + hbase_utils.make_sample_query_from_filter( + sample_filter, require_meter=False) LOG.debug(_("Query Meter Table: %s") % q) gen = meter_table.scan(filter=q, row_start=start, row_stop=stop, limit=limit) for ignored, meter in gen: - d_meter = deserialize_entry(meter)[0] + d_meter = hbase_utils.deserialize_entry(meter)[0] d_meter['message']['recorded_at'] = d_meter['recorded_at'] yield models.Sample(**d_meter['message']) @@ -534,14 +530,16 @@ class Connection(base.Connection): with self.conn_pool.connection() as conn: meter_table = conn.table(self.METER_TABLE) - q, start, stop, columns = make_sample_query_from_filter( - sample_filter) + q, start, stop, columns = \ + hbase_utils.make_sample_query_from_filter(sample_filter) # These fields are used in statistics' calculating columns.extend(['f:timestamp', 'f:counter_volume', 'f:counter_unit']) - meters = map(deserialize_entry, list(meter for (ignored, meter) in - meter_table.scan(filter=q, row_start=start, - row_stop=stop, columns=columns))) + meters = map(hbase_utils.deserialize_entry, + list(meter for (ignored, meter) in + meter_table.scan( + filter=q, row_start=start, + row_stop=stop, columns=columns))) if sample_filter.start: start_time = sample_filter.start @@ -613,7 +611,7 @@ class Connection(base.Connection): # models.Event or purposes of storage event sorted by # timestamp in the database. 
ts = event_model.generated - row = "%d_%s" % (timestamp(ts, reverse=False), + row = "%d_%s" % (hbase_utils.timestamp(ts, reverse=False), event_model.message_id) event_type = event_model.event_type traits = {} @@ -621,8 +619,9 @@ class Connection(base.Connection): for trait in event_model.traits: key = "%s+%d" % (trait.name, trait.dtype) traits[key] = trait.value - record = serialize_entry(traits, event_type=event_type, - timestamp=ts) + record = hbase_utils.serialize_entry(traits, + event_type=event_type, + timestamp=ts) try: events_table.put(row, record) except Exception as ex: @@ -637,7 +636,8 @@ class Connection(base.Connection): :param event_filter: storage.EventFilter object, consists of filters for events that are stored in database. """ - q, start, stop = make_events_query_from_filter(event_filter) + q, start, stop = hbase_utils.make_events_query_from_filter( + event_filter) with self.conn_pool.connection() as conn: events_table = conn.table(self.EVENT_TABLE) @@ -646,7 +646,7 @@ class Connection(base.Connection): events = [] for event_id, data in gen: traits = [] - events_dict = deserialize_entry(data)[0] + events_dict = hbase_utils.deserialize_entry(data)[0] for key, value in events_dict.items(): if (not key.startswith('event_type') and not key.startswith('timestamp')): @@ -673,7 +673,7 @@ class Connection(base.Connection): event_types = set() for event_id, data in gen: - events_dict = deserialize_entry(data)[0] + events_dict = hbase_utils.deserialize_entry(data)[0] for key, value in events_dict.items(): if key.startswith('event_type'): if value not in event_types: @@ -688,13 +688,13 @@ class Connection(base.Connection): :param event_type: the type of the Event """ - q = make_query(event_type=event_type) + q = hbase_utils.make_query(event_type=event_type) trait_types = set() with self.conn_pool.connection() as conn: events_table = conn.table(self.EVENT_TABLE) gen = events_table.scan(filter=q) for event_id, data in gen: - events_dict = deserialize_entry(data)[0] + events_dict = hbase_utils.deserialize_entry(data)[0] for key, value in events_dict.items(): if (not key.startswith('event_type') and not key.startswith('timestamp')): @@ -717,13 +717,14 @@ class Connection(base.Connection): :param event_type: the type of the Event to filter by :param trait_type: the name of the Trait to filter by """ - q = make_query(event_type=event_type, trait_type=trait_type) + q = hbase_utils.make_query(event_type=event_type, + trait_type=trait_type) traits = [] with self.conn_pool.connection() as conn: events_table = conn.table(self.EVENT_TABLE) gen = events_table.scan(filter=q) for event_id, data in gen: - events_dict = deserialize_entry(data)[0] + events_dict = hbase_utils.deserialize_entry(data)[0] for key, value in events_dict.items(): if (not key.startswith('event_type') and not key.startswith('timestamp')): @@ -732,625 +733,3 @@ class Connection(base.Connection): dtype=int(tt_number), value=value)) for trait in sorted(traits, key=operator.attrgetter('dtype')): yield trait - - -def _QualifierFilter(op, qualifier): - return "QualifierFilter (%s, 'binaryprefix:m_%s')" % (op, qualifier) - - -############### -# This is a very crude version of "in-memory HBase", which implements just -# enough functionality of HappyBase API to support testing of our driver. 
-# -class MTable(object): - """HappyBase.Table mock - """ - def __init__(self, name, families): - self.name = name - self.families = families - self._rows_with_ts = {} - - def row(self, key, columns=None): - if key not in self._rows_with_ts: - return {} - res = copy.copy(sorted(six.iteritems( - self._rows_with_ts.get(key)))[-1][1]) - if columns: - keys = res.keys() - for key in keys: - if key not in columns: - res.pop(key) - return res - - def rows(self, keys): - return ((k, self.row(k)) for k in keys) - - def put(self, key, data, ts=None): - # Note: Now we use 'timestamped' but only for one Resource table. - # That's why we may put ts='0' in case when ts is None. If it is - # needed to use 2 types of put in one table ts=0 cannot be used. - if ts is None: - ts = "0" - if key not in self._rows_with_ts: - self._rows_with_ts[key] = {ts: data} - else: - if ts in self._rows_with_ts[key]: - self._rows_with_ts[key][ts].update(data) - else: - self._rows_with_ts[key].update({ts: data}) - - def delete(self, key): - del self._rows_with_ts[key] - - def _get_latest_dict(self, row): - # The idea here is to return latest versions of columns. - # In _rows_with_ts we store {row: {ts_1: {data}, ts_2: {data}}}. - # res will contain a list of tuples [(ts_1, {data}), (ts_2, {data})] - # sorted by ts, i.e. in this list ts_2 is the most latest. - # To get result as HBase provides we should iterate in reverse order - # and get from "latest" data only key-values that are not in newer data - data = {} - for i in sorted(six.iteritems(self._rows_with_ts[row])): - data.update(i[1]) - return data - - def scan(self, filter=None, columns=None, row_start=None, row_stop=None, - limit=None): - columns = columns or [] - sorted_keys = sorted(self._rows_with_ts) - # copy data between row_start and row_stop into a dict - rows = {} - for row in sorted_keys: - if row_start and row < row_start: - continue - if row_stop and row > row_stop: - break - rows[row] = self._get_latest_dict(row) - - if columns: - ret = {} - for row, data in six.iteritems(rows): - for key in data: - if key in columns: - ret[row] = data - rows = ret - if filter: - # TODO(jdanjou): we should really parse this properly, - # but at the moment we are only going to support AND here - filters = filter.split('AND') - for f in filters: - # Extract filter name and its arguments - g = re.search("(.*)\((.*),?\)", f) - fname = g.group(1).strip() - fargs = [s.strip().replace('\'', '') - for s in g.group(2).split(',')] - m = getattr(self, fname) - if callable(m): - # overwrite rows for filtering to take effect - # in case of multiple filters - rows = m(fargs, rows) - else: - raise NotImplementedError("%s filter is not implemented, " - "you may want to add it!") - for k in sorted(rows)[:limit]: - yield k, rows[k] - - @staticmethod - def SingleColumnValueFilter(args, rows): - """This method is called from scan() when 'SingleColumnValueFilter' - is found in the 'filter' argument. 
- """ - op = args[2] - column = "%s:%s" % (args[0], args[1]) - value = args[3] - if value.startswith('binary:'): - value = value[7:] - r = {} - for row in rows: - data = rows[row] - - if op == '=': - if column in data and data[column] == value: - r[row] = data - elif op == '<=': - if column in data and data[column] <= value: - r[row] = data - elif op == '>=': - if column in data and data[column] >= value: - r[row] = data - else: - raise NotImplementedError("In-memory " - "SingleColumnValueFilter " - "doesn't support the %s operation " - "yet" % op) - return r - - @staticmethod - def ColumnPrefixFilter(args, rows): - """This is filter for testing "in-memory HBase". - - This method is called from scan() when 'ColumnPrefixFilter' is found - in the 'filter' argument. - - :param args: a list of filter arguments, contain prefix of column - :param rows: a dict of row prefixes for filtering - """ - value = args[0] - column = 'f:' + value - r = {} - for row, data in rows.items(): - column_dict = {} - for key in data: - if key.startswith(column): - column_dict[key] = data[key] - r[row] = column_dict - return r - - @staticmethod - def RowFilter(args, rows): - """This is filter for testing "in-memory HBase". - - This method is called from scan() when 'RowFilter' is found in the - 'filter' argument. - - :param args: a list of filter arguments, it contains operator and - sought string - :param rows: a dict of rows which are filtered - """ - op = args[0] - value = args[1] - if value.startswith('regexstring:'): - value = value[len('regexstring:'):] - r = {} - for row, data in rows.items(): - try: - g = re.search(value, row).group() - if op == '=': - if g == row: - r[row] = data - else: - raise NotImplementedError("In-memory " - "RowFilter doesn't support " - "the %s operation yet" % op) - except AttributeError: - pass - return r - - @staticmethod - def QualifierFilter(args, rows): - """This method is called from scan() when 'QualifierFilter' - is found in the 'filter' argument - """ - op = args[0] - value = args[1] - if value.startswith('binaryprefix:'): - value = value[len('binaryprefix:'):] - column = 'f:' + value - r = {} - for row in rows: - data = rows[row] - r_data = {} - for key in data: - if (op == '=' and key.startswith(column)) or \ - (op == '>=' and key >= column) or \ - (op == '<=' and key <= column): - r_data[key] = data[key] - else: - raise NotImplementedError("In-memory QualifierFilter " - "doesn't support the %s " - "operation yet" % op) - if r_data: - r[row] = r_data - return r - - -class MConnectionPool(object): - def __init__(self): - self.conn = MConnection() - - def connection(self): - return self.conn - - -class MConnection(object): - """HappyBase.Connection mock - """ - def __init__(self): - self.tables = {} - - def __enter__(self, *args, **kwargs): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - - def open(self): - LOG.debug(_("Opening in-memory HBase connection")) - - def create_table(self, n, families=None): - families = families or {} - if n in self.tables: - return self.tables[n] - t = MTable(n, families) - self.tables[n] = t - return t - - def delete_table(self, name, use_prefix=True): - del self.tables[name] - - def table(self, name): - return self.create_table(name) - - -################################################# -# Here be various HBase helpers -def timestamp(dt, reverse=True): - """Timestamp is count of milliseconds since start of epoch. - - If reverse=True then timestamp will be reversed. 
Such a technique is used - in HBase rowkey design when period queries are required. Because of the - fact that rows are sorted lexicographically it's possible to vary whether - the 'oldest' entries will be on top of the table or it should be the newest - ones (reversed timestamp case). - - :param dt: datetime which is translated to timestamp - :param reverse: a boolean parameter for reverse or straight count of - timestamp in milliseconds - :return: count or reversed count of milliseconds since start of epoch - """ - epoch = datetime.datetime(1970, 1, 1) - td = dt - epoch - ts = td.microseconds + td.seconds * 1000000 + td.days * 86400000000 - return 0x7fffffffffffffff - ts if reverse else ts - - -def make_events_query_from_filter(event_filter): - """Return start and stop row for filtering and a query which based on the - selected parameter. - - :param event_filter: storage.EventFilter object. - """ - q = [] - res_q = None - start = "%s" % (timestamp(event_filter.start_time, reverse=False) - if event_filter.start_time else "") - stop = "%s" % (timestamp(event_filter.end_time, reverse=False) - if event_filter.end_time else "") - if event_filter.event_type: - q.append("SingleColumnValueFilter ('f', 'event_type', = , " - "'binary:%s')" % dump(event_filter.event_type)) - if event_filter.message_id: - q.append("RowFilter ( = , 'regexstring:\d*_%s')" % - event_filter.message_id) - if len(q): - res_q = " AND ".join(q) - - if event_filter.traits_filter: - for trait_filter in event_filter.traits_filter: - q_trait = make_query(trait_query=True, **trait_filter) - if q_trait: - if res_q: - res_q += " AND " + q_trait - else: - res_q = q_trait - return res_q, start, stop - - -def make_timestamp_query(func, start=None, start_op=None, end=None, - end_op=None, bounds_only=False, **kwargs): - """Return a filter start and stop row for filtering and a query - which based on the fact that CF-name is 'rts'. - - :param start: Optional start timestamp - :param start_op: Optional start timestamp operator, like gt, ge - :param end: Optional end timestamp - :param end_op: Optional end timestamp operator, like lt, le - :param bounds_only: if True than query will not be returned - :param func: a function that provide a format of row - :param kwargs: kwargs for :param func - """ - rts_start, rts_end = get_start_end_rts(start, start_op, end, end_op) - start_row, end_row = func(rts_start, rts_end, **kwargs) - - if bounds_only: - return start_row, end_row - - q = [] - # We dont need to dump here because get_start_end_rts returns strings - if rts_start: - q.append("SingleColumnValueFilter ('f', 'rts', <=, 'binary:%s')" % - rts_start) - if rts_end: - q.append("SingleColumnValueFilter ('f', 'rts', >=, 'binary:%s')" % - rts_end) - - res_q = None - if len(q): - res_q = " AND ".join(q) - - return start_row, end_row, res_q - - -def get_start_end_rts(start, start_op, end, end_op): - - rts_start = str(timestamp(start) + 1) if start else "" - rts_end = str(timestamp(end) + 1) if end else "" - - # By default, we are using ge for lower bound and lt for upper bound - if start_op == 'gt': - rts_start = str(long(rts_start) - 2) - if end_op == 'le': - rts_end = str(long(rts_end) - 1) - - return rts_start, rts_end - - -def make_query(metaquery=None, trait_query=None, **kwargs): - """Return a filter query string based on the selected parameters. - - :param metaquery: optional metaquery dict - :param trait_query: optional boolean, for trait_query from kwargs - :param kwargs: key-value pairs to filter on. 
Key should be a real - column name in db - """ - q = [] - res_q = None - - # Query for traits differs from others. It is constructed with - # SingleColumnValueFilter with the possibility to choose comparision - # operator - if trait_query: - trait_name = kwargs.pop('key') - op = kwargs.pop('op', 'eq') - for k, v in kwargs.items(): - if v is not None: - res_q = ("SingleColumnValueFilter " - "('f', '%s+%d', %s, 'binary:%s', true, true)" % - (trait_name, DTYPE_NAMES[k], OP_SIGN[op], - dump(v))) - return res_q - - # Note: we use extended constructor for SingleColumnValueFilter here. - # It is explicitly specified that entry should not be returned if CF is not - # found in table. - for key, value in sorted(kwargs.items()): - if value is not None: - if key == 'source': - q.append("SingleColumnValueFilter " - "('f', 's_%s', =, 'binary:%s', true, true)" % - (value, dump('1'))) - elif key == 'trait_type': - q.append("ColumnPrefixFilter('%s')" % value) - else: - q.append("SingleColumnValueFilter " - "('f', '%s', =, 'binary:%s', true, true)" % - (key, dump(value))) - res_q = None - if len(q): - res_q = " AND ".join(q) - - if metaquery: - meta_q = [] - for k, v in metaquery.items(): - meta_q.append( - "SingleColumnValueFilter ('f', '%s', =, 'binary:%s', " - "true, true)" - % ('r_' + k, dump(v))) - meta_q = " AND ".join(meta_q) - # join query and metaquery - if res_q is not None: - res_q += " AND " + meta_q - else: - res_q = meta_q # metaquery only - - return res_q - - -def _get_meter_columns(metaquery, **kwargs): - """Return a list of required columns in meter table to be scanned . - - :param metaquery: optional metaquery dict - :param kwargs: key-value pairs to filter on. Key should be a real - column name in db - """ - columns = ['f:message', 'f:recorded_at'] - columns.extend(["f:%s" % k for k, v in kwargs.items() if v]) - if metaquery: - columns.extend(["f:r_%s" % k for k, v in metaquery.items() if v]) - return columns - - -def make_sample_query_from_filter(sample_filter, require_meter=True): - """Return a query dictionary based on the settings in the filter. - - :param sample_filter: SampleFilter instance - :param require_meter: If true and the filter does not have a meter, - raise an error. - """ - - meter = sample_filter.meter - if not meter and require_meter: - raise RuntimeError('Missing required meter specifier') - start_row, end_row, ts_query = make_timestamp_query( - _make_general_rowkey_scan, - start=sample_filter.start, start_op=sample_filter.start_timestamp_op, - end=sample_filter.end, end_op=sample_filter.end_timestamp_op, - some_id=meter) - - kwargs = dict(user_id=sample_filter.user, - project_id=sample_filter.project, - counter_name=meter, - resource_id=sample_filter.resource, - source=sample_filter.source, - message_id=sample_filter.message_id) - - q = make_query(metaquery=sample_filter.metaquery, **kwargs) - - if q: - ts_query = (" AND " + ts_query) if ts_query else "" - res_q = q + ts_query if ts_query else q - else: - res_q = ts_query if ts_query else None - columns = _get_meter_columns(metaquery=sample_filter.metaquery, **kwargs) - return res_q, start_row, end_row, columns - - -def make_meter_query_for_resource(start_timestamp, start_timestamp_op, - end_timestamp, end_timestamp_op, source, - query=None): - """This method is used when Resource table should be filtered by meters. - In this method we are looking into all qualifiers with m_ prefix. - - :param start_timestamp: meter's timestamp start range. - :param start_timestamp_op: meter's start time operator, like ge, gt. 
- :param end_timestamp: meter's timestamp end range. - :param end_timestamp_op: meter's end time operator, like lt, le. - :param source: source filter. - :param query: a query string to concatenate with. - """ - start_rts, end_rts = get_start_end_rts(start_timestamp, - start_timestamp_op, - end_timestamp, end_timestamp_op) - mq = [] - - if start_rts: - filter_value = start_rts + '+' + source if source else start_rts - mq.append(_QualifierFilter("<=", filter_value)) - - if end_rts: - filter_value = end_rts + '+' + source if source else end_rts - mq.append(_QualifierFilter(">=", filter_value)) - - if mq: - meter_q = " AND ".join(mq) - # If there is a filtering on time_range we need to point that - # qualifiers should start with m_. Overwise in case e.g. - # QualifierFilter (>=, 'binaryprefix:m_9222030811134775808') - # qualifier 's_test' satisfies the filter and will be returned. - meter_q = _QualifierFilter("=", '') + " AND " + meter_q - query = meter_q if not query else query + " AND " + meter_q - return query - - -def _make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None): - """If it's filter on some_id without start and end, - start_row = some_id while end_row = some_id + MAX_BYTE - """ - if some_id is None: - return None, None - if not rts_start: - rts_start = chr(127) - end_row = "%s_%s" % (some_id, rts_start) - start_row = "%s_%s" % (some_id, rts_end) - - return start_row, end_row - - -def _format_meter_reference(c_name, c_type, c_unit, rts, source): - """Format reference to meter data. - """ - return "%s+%s+%s!%s!%s" % (rts, source, c_name, c_type, c_unit) - - -def _timestamp_from_record_tuple(record): - """Extract timestamp from HBase tuple record - """ - return record[0]['timestamp'] - - -def _resource_id_from_record_tuple(record): - """Extract resource_id from HBase tuple record - """ - return record[0]['resource_id'] - - -def deserialize_entry(entry, get_raw_meta=True): - """Return a list of flatten_result, sources, meters and metadata - flatten_result contains a dict of simple structures such as 'resource_id':1 - sources/meters are the lists of sources and meters correspondingly. - metadata is metadata dict. This dict may be returned as flattened if - get_raw_meta is False. - - :param entry: entry from HBase, without row name and timestamp - :param get_raw_meta: If true then raw metadata will be returned, - if False metadata will be constructed from 'f:r_metadata.' fields - """ - flatten_result = {} - sources = [] - meters = [] - metadata_flattened = {} - for k, v in entry.items(): - if k.startswith('f:s_'): - sources.append(k[4:]) - elif k.startswith('f:r_metadata.'): - metadata_flattened[k[len('f:r_metadata.'):]] = load(v) - elif k.startswith("f:m_"): - meter = (k[4:], load(v)) - meters.append(meter) - else: - flatten_result[k[2:]] = load(v) - if get_raw_meta: - metadata = flatten_result.get('resource_metadata', {}) - else: - metadata = metadata_flattened - - return flatten_result, sources, meters, metadata - - -def serialize_entry(data=None, **kwargs): - """Return a dict that is ready to be stored to HBase - - :param data: dict to be serialized - :param kwargs: additional args - """ - data = data or {} - entry_dict = copy.copy(data) - entry_dict.update(**kwargs) - - result = {} - for k, v in entry_dict.items(): - if k == 'source': - # user, project and resource tables may contain several sources. - # Besides, resource table may contain several meters. - # To make insertion safe we need to store all meters and sources in - # a separate cell. 
For this purpose s_ and m_ prefixes are - # introduced. - result['f:s_%s' % v] = dump('1') - elif k == 'meter': - for meter, ts in v.items(): - result['f:m_%s' % meter] = dump(ts) - elif k == 'resource_metadata': - # keep raw metadata as well as flattened to provide - # capability with API v2. It will be flattened in another - # way on API level. But we need flattened too for quick filtering. - flattened_meta = dump_metadata(v) - for k, m in flattened_meta.items(): - result['f:r_metadata.' + k] = dump(m) - result['f:resource_metadata'] = dump(v) - else: - result['f:' + k] = dump(v) - return result - - -def dump_metadata(meta): - resource_metadata = {} - for key, v in utils.dict_to_keyval(meta): - resource_metadata[key] = v - return resource_metadata - - -def dump(data): - return json.dumps(data, default=bson.json_util.default) - - -def load(data): - return json.loads(data, object_hook=object_hook) - - -# We don't want to have tzinfo in decoded json.This object_hook is -# overwritten json_util.object_hook for $date -def object_hook(dct): - if "$date" in dct: - dt = bson.json_util.object_hook(dct) - return dt.replace(tzinfo=None) - return bson.json_util.object_hook(dct) diff --git a/ceilometer/tests/storage/test_impl_hbase.py b/ceilometer/tests/storage/test_impl_hbase.py index 6f2f025dd..e72245c04 100644 --- a/ceilometer/tests/storage/test_impl_hbase.py +++ b/ceilometer/tests/storage/test_impl_hbase.py @@ -25,6 +25,7 @@ """ from mock import patch +from ceilometer.storage.hbase import inmemory as hbase_inmemory from ceilometer.storage import impl_hbase as hbase from ceilometer.tests import base as test_base from ceilometer.tests import db as tests_db @@ -36,7 +37,8 @@ class ConnectionTest(tests_db.TestBase, @tests_db.run_with('hbase') def test_hbase_connection(self): conn = hbase.Connection(self.db_manager.url) - self.assertIsInstance(conn.conn_pool.connection(), hbase.MConnection) + self.assertIsInstance(conn.conn_pool.connection(), + hbase_inmemory.MConnection) class TestConn(object): def __init__(self, host, port):
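For context, the updated test exercises the same path as before the split: constructing a Connection with a test-style URL makes the driver fall back to the in-memory backend. A sketch of that flow, assuming the test manager's URL resolves to the driver's in-memory marker (the exact URL comes from the tests_db fixture, not from this diff):

```python
from ceilometer.storage import impl_hbase as hbase
from ceilometer.storage.hbase import inmemory as hbase_inmemory

# 'hbase://__test__' is an assumption here; the real value is supplied by
# the test's db_manager fixture.
conn = hbase.Connection('hbase://__test__')
assert isinstance(conn.conn_pool.connection(), hbase_inmemory.MConnection)
```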