Splits hbase storage code base

The hbase storage file have three different kind of things
* The Connection class
* The In-Memory implementation of HBase
* The a list of helper methods

This change splits this three things into separate files

Partial implements blueprint dedicated-alarm-database
Co-Authored-By: Ala Rezmerita <ala.rezmerita@cloudwatt.com>

Change-Id: I6453dc7795fb24d8ea773b129874e7936befae47
This commit is contained in:
Mehdi Abaakouk 2014-06-09 11:13:00 +02:00
parent 0f735ac50c
commit 7fc5579865
5 changed files with 735 additions and 688 deletions

View File

View File

@ -0,0 +1,264 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
""" This is a very crude version of "in-memory HBase", which implements just
enough functionality of HappyBase API to support testing of our driver.
"""
import copy
import re
import six
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
LOG = log.getLogger(__name__)
class MTable(object):
"""HappyBase.Table mock
"""
def __init__(self, name, families):
self.name = name
self.families = families
self._rows_with_ts = {}
def row(self, key, columns=None):
if key not in self._rows_with_ts:
return {}
res = copy.copy(sorted(six.iteritems(
self._rows_with_ts.get(key)))[-1][1])
if columns:
keys = res.keys()
for key in keys:
if key not in columns:
res.pop(key)
return res
def rows(self, keys):
return ((k, self.row(k)) for k in keys)
def put(self, key, data, ts=None):
# Note: Now we use 'timestamped' but only for one Resource table.
# That's why we may put ts='0' in case when ts is None. If it is
# needed to use 2 types of put in one table ts=0 cannot be used.
if ts is None:
ts = "0"
if key not in self._rows_with_ts:
self._rows_with_ts[key] = {ts: data}
else:
if ts in self._rows_with_ts[key]:
self._rows_with_ts[key][ts].update(data)
else:
self._rows_with_ts[key].update({ts: data})
def delete(self, key):
del self._rows_with_ts[key]
def _get_latest_dict(self, row):
# The idea here is to return latest versions of columns.
# In _rows_with_ts we store {row: {ts_1: {data}, ts_2: {data}}}.
# res will contain a list of tuples [(ts_1, {data}), (ts_2, {data})]
# sorted by ts, i.e. in this list ts_2 is the most latest.
# To get result as HBase provides we should iterate in reverse order
# and get from "latest" data only key-values that are not in newer data
data = {}
for i in sorted(six.iteritems(self._rows_with_ts[row])):
data.update(i[1])
return data
def scan(self, filter=None, columns=None, row_start=None, row_stop=None,
limit=None):
columns = columns or []
sorted_keys = sorted(self._rows_with_ts)
# copy data between row_start and row_stop into a dict
rows = {}
for row in sorted_keys:
if row_start and row < row_start:
continue
if row_stop and row > row_stop:
break
rows[row] = self._get_latest_dict(row)
if columns:
ret = {}
for row, data in six.iteritems(rows):
for key in data:
if key in columns:
ret[row] = data
rows = ret
if filter:
# TODO(jdanjou): we should really parse this properly,
# but at the moment we are only going to support AND here
filters = filter.split('AND')
for f in filters:
# Extract filter name and its arguments
g = re.search("(.*)\((.*),?\)", f)
fname = g.group(1).strip()
fargs = [s.strip().replace('\'', '')
for s in g.group(2).split(',')]
m = getattr(self, fname)
if callable(m):
# overwrite rows for filtering to take effect
# in case of multiple filters
rows = m(fargs, rows)
else:
raise NotImplementedError("%s filter is not implemented, "
"you may want to add it!")
for k in sorted(rows)[:limit]:
yield k, rows[k]
@staticmethod
def SingleColumnValueFilter(args, rows):
"""This method is called from scan() when 'SingleColumnValueFilter'
is found in the 'filter' argument.
"""
op = args[2]
column = "%s:%s" % (args[0], args[1])
value = args[3]
if value.startswith('binary:'):
value = value[7:]
r = {}
for row in rows:
data = rows[row]
if op == '=':
if column in data and data[column] == value:
r[row] = data
elif op == '<=':
if column in data and data[column] <= value:
r[row] = data
elif op == '>=':
if column in data and data[column] >= value:
r[row] = data
else:
raise NotImplementedError("In-memory "
"SingleColumnValueFilter "
"doesn't support the %s operation "
"yet" % op)
return r
@staticmethod
def ColumnPrefixFilter(args, rows):
"""This is filter for testing "in-memory HBase".
This method is called from scan() when 'ColumnPrefixFilter' is found
in the 'filter' argument.
:param args: a list of filter arguments, contain prefix of column
:param rows: a dict of row prefixes for filtering
"""
value = args[0]
column = 'f:' + value
r = {}
for row, data in rows.items():
column_dict = {}
for key in data:
if key.startswith(column):
column_dict[key] = data[key]
r[row] = column_dict
return r
@staticmethod
def RowFilter(args, rows):
"""This is filter for testing "in-memory HBase".
This method is called from scan() when 'RowFilter' is found in the
'filter' argument.
:param args: a list of filter arguments, it contains operator and
sought string
:param rows: a dict of rows which are filtered
"""
op = args[0]
value = args[1]
if value.startswith('regexstring:'):
value = value[len('regexstring:'):]
r = {}
for row, data in rows.items():
try:
g = re.search(value, row).group()
if op == '=':
if g == row:
r[row] = data
else:
raise NotImplementedError("In-memory "
"RowFilter doesn't support "
"the %s operation yet" % op)
except AttributeError:
pass
return r
@staticmethod
def QualifierFilter(args, rows):
"""This method is called from scan() when 'QualifierFilter'
is found in the 'filter' argument
"""
op = args[0]
value = args[1]
if value.startswith('binaryprefix:'):
value = value[len('binaryprefix:'):]
column = 'f:' + value
r = {}
for row in rows:
data = rows[row]
r_data = {}
for key in data:
if (op == '=' and key.startswith(column)) or \
(op == '>=' and key >= column) or \
(op == '<=' and key <= column):
r_data[key] = data[key]
else:
raise NotImplementedError("In-memory QualifierFilter "
"doesn't support the %s "
"operation yet" % op)
if r_data:
r[row] = r_data
return r
class MConnectionPool(object):
def __init__(self):
self.conn = MConnection()
def connection(self):
return self.conn
class MConnection(object):
"""HappyBase.Connection mock
"""
def __init__(self):
self.tables = {}
def __enter__(self, *args, **kwargs):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def open(self):
LOG.debug(_("Opening in-memory HBase connection"))
def create_table(self, n, families=None):
families = families or {}
if n in self.tables:
return self.tables[n]
t = MTable(n, families)
self.tables[n] = t
return t
def delete_table(self, name, use_prefix=True):
del self.tables[name]
def table(self, name):
return self.create_table(name)

View File

@ -0,0 +1,402 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
""" Various HBase helpers
"""
import copy
import datetime
import json
import bson.json_util
from ceilometer import utils
DTYPE_NAMES = {'none': 0, 'string': 1, 'integer': 2, 'float': 3,
'datetime': 4}
OP_SIGN = {'eq': '=', 'lt': '<', 'le': '<=', 'ne': '!=', 'gt': '>',
'ge': '>='}
def _QualifierFilter(op, qualifier):
return "QualifierFilter (%s, 'binaryprefix:m_%s')" % (op, qualifier)
def timestamp(dt, reverse=True):
"""Timestamp is count of milliseconds since start of epoch.
If reverse=True then timestamp will be reversed. Such a technique is used
in HBase rowkey design when period queries are required. Because of the
fact that rows are sorted lexicographically it's possible to vary whether
the 'oldest' entries will be on top of the table or it should be the newest
ones (reversed timestamp case).
:param dt: datetime which is translated to timestamp
:param reverse: a boolean parameter for reverse or straight count of
timestamp in milliseconds
:return: count or reversed count of milliseconds since start of epoch
"""
epoch = datetime.datetime(1970, 1, 1)
td = dt - epoch
ts = td.microseconds + td.seconds * 1000000 + td.days * 86400000000
return 0x7fffffffffffffff - ts if reverse else ts
def make_events_query_from_filter(event_filter):
"""Return start and stop row for filtering and a query which based on the
selected parameter.
:param event_filter: storage.EventFilter object.
"""
q = []
res_q = None
start = "%s" % (timestamp(event_filter.start_time, reverse=False)
if event_filter.start_time else "")
stop = "%s" % (timestamp(event_filter.end_time, reverse=False)
if event_filter.end_time else "")
if event_filter.event_type:
q.append("SingleColumnValueFilter ('f', 'event_type', = , "
"'binary:%s')" % dump(event_filter.event_type))
if event_filter.message_id:
q.append("RowFilter ( = , 'regexstring:\d*_%s')" %
event_filter.message_id)
if len(q):
res_q = " AND ".join(q)
if event_filter.traits_filter:
for trait_filter in event_filter.traits_filter:
q_trait = make_query(trait_query=True, **trait_filter)
if q_trait:
if res_q:
res_q += " AND " + q_trait
else:
res_q = q_trait
return res_q, start, stop
def make_timestamp_query(func, start=None, start_op=None, end=None,
end_op=None, bounds_only=False, **kwargs):
"""Return a filter start and stop row for filtering and a query
which based on the fact that CF-name is 'rts'.
:param start: Optional start timestamp
:param start_op: Optional start timestamp operator, like gt, ge
:param end: Optional end timestamp
:param end_op: Optional end timestamp operator, like lt, le
:param bounds_only: if True than query will not be returned
:param func: a function that provide a format of row
:param kwargs: kwargs for :param func
"""
rts_start, rts_end = get_start_end_rts(start, start_op, end, end_op)
start_row, end_row = func(rts_start, rts_end, **kwargs)
if bounds_only:
return start_row, end_row
q = []
# We dont need to dump here because get_start_end_rts returns strings
if rts_start:
q.append("SingleColumnValueFilter ('f', 'rts', <=, 'binary:%s')" %
rts_start)
if rts_end:
q.append("SingleColumnValueFilter ('f', 'rts', >=, 'binary:%s')" %
rts_end)
res_q = None
if len(q):
res_q = " AND ".join(q)
return start_row, end_row, res_q
def get_start_end_rts(start, start_op, end, end_op):
rts_start = str(timestamp(start) + 1) if start else ""
rts_end = str(timestamp(end) + 1) if end else ""
# By default, we are using ge for lower bound and lt for upper bound
if start_op == 'gt':
rts_start = str(long(rts_start) - 2)
if end_op == 'le':
rts_end = str(long(rts_end) - 1)
return rts_start, rts_end
def make_query(metaquery=None, trait_query=None, **kwargs):
"""Return a filter query string based on the selected parameters.
:param metaquery: optional metaquery dict
:param trait_query: optional boolean, for trait_query from kwargs
:param kwargs: key-value pairs to filter on. Key should be a real
column name in db
"""
q = []
res_q = None
# Query for traits differs from others. It is constructed with
# SingleColumnValueFilter with the possibility to choose comparision
# operator
if trait_query:
trait_name = kwargs.pop('key')
op = kwargs.pop('op', 'eq')
for k, v in kwargs.items():
if v is not None:
res_q = ("SingleColumnValueFilter "
"('f', '%s+%d', %s, 'binary:%s', true, true)" %
(trait_name, DTYPE_NAMES[k], OP_SIGN[op],
dump(v)))
return res_q
# Note: we use extended constructor for SingleColumnValueFilter here.
# It is explicitly specified that entry should not be returned if CF is not
# found in table.
for key, value in sorted(kwargs.items()):
if value is not None:
if key == 'source':
q.append("SingleColumnValueFilter "
"('f', 's_%s', =, 'binary:%s', true, true)" %
(value, dump('1')))
elif key == 'trait_type':
q.append("ColumnPrefixFilter('%s')" % value)
else:
q.append("SingleColumnValueFilter "
"('f', '%s', =, 'binary:%s', true, true)" %
(key, dump(value)))
res_q = None
if len(q):
res_q = " AND ".join(q)
if metaquery:
meta_q = []
for k, v in metaquery.items():
meta_q.append(
"SingleColumnValueFilter ('f', '%s', =, 'binary:%s', "
"true, true)"
% ('r_' + k, dump(v)))
meta_q = " AND ".join(meta_q)
# join query and metaquery
if res_q is not None:
res_q += " AND " + meta_q
else:
res_q = meta_q # metaquery only
return res_q
def get_meter_columns(metaquery, **kwargs):
"""Return a list of required columns in meter table to be scanned .
:param metaquery: optional metaquery dict
:param kwargs: key-value pairs to filter on. Key should be a real
column name in db
"""
columns = ['f:message', 'f:recorded_at']
columns.extend(["f:%s" % k for k, v in kwargs.items() if v])
if metaquery:
columns.extend(["f:r_%s" % k for k, v in metaquery.items() if v])
return columns
def make_sample_query_from_filter(sample_filter, require_meter=True):
"""Return a query dictionary based on the settings in the filter.
:param sample_filter: SampleFilter instance
:param require_meter: If true and the filter does not have a meter,
raise an error.
"""
meter = sample_filter.meter
if not meter and require_meter:
raise RuntimeError('Missing required meter specifier')
start_row, end_row, ts_query = make_timestamp_query(
make_general_rowkey_scan,
start=sample_filter.start, start_op=sample_filter.start_timestamp_op,
end=sample_filter.end, end_op=sample_filter.end_timestamp_op,
some_id=meter)
kwargs = dict(user_id=sample_filter.user,
project_id=sample_filter.project,
counter_name=meter,
resource_id=sample_filter.resource,
source=sample_filter.source,
message_id=sample_filter.message_id)
q = make_query(metaquery=sample_filter.metaquery, **kwargs)
if q:
ts_query = (" AND " + ts_query) if ts_query else ""
res_q = q + ts_query if ts_query else q
else:
res_q = ts_query if ts_query else None
columns = get_meter_columns(metaquery=sample_filter.metaquery, **kwargs)
return res_q, start_row, end_row, columns
def make_meter_query_for_resource(start_timestamp, start_timestamp_op,
end_timestamp, end_timestamp_op, source,
query=None):
"""This method is used when Resource table should be filtered by meters.
In this method we are looking into all qualifiers with m_ prefix.
:param start_timestamp: meter's timestamp start range.
:param start_timestamp_op: meter's start time operator, like ge, gt.
:param end_timestamp: meter's timestamp end range.
:param end_timestamp_op: meter's end time operator, like lt, le.
:param source: source filter.
:param query: a query string to concatenate with.
"""
start_rts, end_rts = get_start_end_rts(start_timestamp,
start_timestamp_op,
end_timestamp, end_timestamp_op)
mq = []
if start_rts:
filter_value = start_rts + '+' + source if source else start_rts
mq.append(_QualifierFilter("<=", filter_value))
if end_rts:
filter_value = end_rts + '+' + source if source else end_rts
mq.append(_QualifierFilter(">=", filter_value))
if mq:
meter_q = " AND ".join(mq)
# If there is a filtering on time_range we need to point that
# qualifiers should start with m_. Overwise in case e.g.
# QualifierFilter (>=, 'binaryprefix:m_9222030811134775808')
# qualifier 's_test' satisfies the filter and will be returned.
meter_q = _QualifierFilter("=", '') + " AND " + meter_q
query = meter_q if not query else query + " AND " + meter_q
return query
def make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None):
"""If it's filter on some_id without start and end,
start_row = some_id while end_row = some_id + MAX_BYTE
"""
if some_id is None:
return None, None
if not rts_start:
rts_start = chr(127)
end_row = "%s_%s" % (some_id, rts_start)
start_row = "%s_%s" % (some_id, rts_end)
return start_row, end_row
def format_meter_reference(c_name, c_type, c_unit, rts, source):
"""Format reference to meter data.
"""
return "%s+%s+%s!%s!%s" % (rts, source, c_name, c_type, c_unit)
def timestamp_from_record_tuple(record):
"""Extract timestamp from HBase tuple record
"""
return record[0]['timestamp']
def resource_id_from_record_tuple(record):
"""Extract resource_id from HBase tuple record
"""
return record[0]['resource_id']
def deserialize_entry(entry, get_raw_meta=True):
"""Return a list of flatten_result, sources, meters and metadata
flatten_result contains a dict of simple structures such as 'resource_id':1
sources/meters are the lists of sources and meters correspondingly.
metadata is metadata dict. This dict may be returned as flattened if
get_raw_meta is False.
:param entry: entry from HBase, without row name and timestamp
:param get_raw_meta: If true then raw metadata will be returned,
if False metadata will be constructed from 'f:r_metadata.' fields
"""
flatten_result = {}
sources = []
meters = []
metadata_flattened = {}
for k, v in entry.items():
if k.startswith('f:s_'):
sources.append(k[4:])
elif k.startswith('f:r_metadata.'):
metadata_flattened[k[len('f:r_metadata.'):]] = load(v)
elif k.startswith("f:m_"):
meter = (k[4:], load(v))
meters.append(meter)
else:
flatten_result[k[2:]] = load(v)
if get_raw_meta:
metadata = flatten_result.get('resource_metadata', {})
else:
metadata = metadata_flattened
return flatten_result, sources, meters, metadata
def serialize_entry(data=None, **kwargs):
"""Return a dict that is ready to be stored to HBase
:param data: dict to be serialized
:param kwargs: additional args
"""
data = data or {}
entry_dict = copy.copy(data)
entry_dict.update(**kwargs)
result = {}
for k, v in entry_dict.items():
if k == 'source':
# user, project and resource tables may contain several sources.
# Besides, resource table may contain several meters.
# To make insertion safe we need to store all meters and sources in
# a separate cell. For this purpose s_ and m_ prefixes are
# introduced.
result['f:s_%s' % v] = dump('1')
elif k == 'meter':
for meter, ts in v.items():
result['f:m_%s' % meter] = dump(ts)
elif k == 'resource_metadata':
# keep raw metadata as well as flattened to provide
# capability with API v2. It will be flattened in another
# way on API level. But we need flattened too for quick filtering.
flattened_meta = dump_metadata(v)
for k, m in flattened_meta.items():
result['f:r_metadata.' + k] = dump(m)
result['f:resource_metadata'] = dump(v)
else:
result['f:' + k] = dump(v)
return result
def dump_metadata(meta):
resource_metadata = {}
for key, v in utils.dict_to_keyval(meta):
resource_metadata[key] = v
return resource_metadata
def dump(data):
return json.dumps(data, default=bson.json_util.default)
def load(data):
return json.loads(data, object_hook=object_hook)
# We don't want to have tzinfo in decoded json.This object_hook is
# overwritten json_util.object_hook for $date
def object_hook(dct):
if "$date" in dct:
dt = bson.json_util.object_hook(dct)
return dt.replace(tzinfo=None)
return bson.json_util.object_hook(dct)

View File

@ -1,9 +1,4 @@
#
# Copyright 2012, 2013 Dell Inc.
#
# Author: Stas Maksimov <Stanislav_M@dell.com>
# Author: Shengjie Min <Shengjie_Min@dell.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -17,18 +12,13 @@
# under the License.
"""HBase storage backend
"""
import copy
import datetime
import hashlib
import json
import operator
import os
import re
import six
import six.moves.urllib.parse as urlparse
import time
import bson.json_util
import happybase
from ceilometer.alarm.storage import models as alarm_models
@ -37,6 +27,8 @@ from ceilometer.openstack.common import log
from ceilometer.openstack.common import network_utils
from ceilometer.openstack.common import timeutils
from ceilometer.storage import base
from ceilometer.storage.hbase import inmemory as hbase_inmemory
from ceilometer.storage.hbase import utils as hbase_utils
from ceilometer.storage import models
from ceilometer import utils
@ -61,11 +53,6 @@ AVAILABLE_STORAGE_CAPABILITIES = {
'storage': {'production_ready': True},
}
DTYPE_NAMES = {'none': 0, 'string': 1, 'integer': 2, 'float': 3,
'datetime': 4}
OP_SIGN = {'eq': '=', 'lt': '<', 'le': '<=', 'ne': '!=', 'gt': '>',
'ge': '>='}
class Connection(base.Connection):
"""Put the data into a HBase database
@ -185,7 +172,8 @@ class Connection(base.Connection):
if Connection._memory_instance is None:
LOG.debug(_('Creating a new in-memory HBase '
'Connection object'))
Connection._memory_instance = MConnectionPool()
Connection._memory_instance = \
hbase_inmemory.MConnectionPool()
self.conn_pool = Connection._memory_instance
else:
self.conn_pool = self._get_connection_pool(opts)
@ -259,11 +247,12 @@ class Connection(base.Connection):
call as_dict()
"""
_id = alarm.alarm_id
alarm_to_store = serialize_entry(alarm.as_dict())
alarm_to_store = hbase_utils.serialize_entry(alarm.as_dict())
with self.conn_pool.connection() as conn:
alarm_table = conn.table(self.ALARM_TABLE)
alarm_table.put(_id, alarm_to_store)
stored_alarm = deserialize_entry(alarm_table.row(_id))[0]
stored_alarm = hbase_utils.deserialize_entry(
alarm_table.row(_id))[0]
return alarm_models.Alarm(**stored_alarm)
create_alarm = update_alarm
@ -281,24 +270,26 @@ class Connection(base.Connection):
if meter:
raise NotImplementedError('Filter by meter not implemented')
q = make_query(alarm_id=alarm_id, name=name, enabled=enabled,
user_id=user, project_id=project, state=state)
q = hbase_utils.make_query(alarm_id=alarm_id, name=name,
enabled=enabled, user_id=user,
project_id=project, state=state)
with self.conn_pool.connection() as conn:
alarm_table = conn.table(self.ALARM_TABLE)
gen = alarm_table.scan(filter=q)
for ignored, data in gen:
stored_alarm = deserialize_entry(data)[0]
stored_alarm = hbase_utils.deserialize_entry(data)[0]
yield alarm_models.Alarm(**stored_alarm)
def get_alarm_changes(self, alarm_id, on_behalf_of,
user=None, project=None, type=None,
start_timestamp=None, start_timestamp_op=None,
end_timestamp=None, end_timestamp_op=None):
q = make_query(alarm_id=alarm_id, on_behalf_of=on_behalf_of, type=type,
user_id=user, project_id=project)
start_row, end_row = make_timestamp_query(
_make_general_rowkey_scan,
q = hbase_utils.make_query(alarm_id=alarm_id,
on_behalf_of=on_behalf_of, type=type,
user_id=user, project_id=project)
start_row, end_row = hbase_utils.make_timestamp_query(
hbase_utils.make_general_rowkey_scan,
start=start_timestamp, start_op=start_timestamp_op,
end=end_timestamp, end_op=end_timestamp_op, bounds_only=True,
some_id=alarm_id)
@ -307,15 +298,15 @@ class Connection(base.Connection):
gen = alarm_history_table.scan(filter=q, row_start=start_row,
row_stop=end_row)
for ignored, data in gen:
stored_entry = deserialize_entry(data)[0]
stored_entry = hbase_utils.deserialize_entry(data)[0]
yield alarm_models.AlarmChange(**stored_entry)
def record_alarm_change(self, alarm_change):
"""Record alarm change event.
"""
alarm_change_dict = serialize_entry(alarm_change)
alarm_change_dict = hbase_utils.serialize_entry(alarm_change)
ts = alarm_change.get('timestamp') or datetime.datetime.now()
rts = timestamp(ts)
rts = hbase_utils.timestamp(ts)
with self.conn_pool.connection() as conn:
alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
alarm_history_table.put(alarm_change.get('alarm_id') + "_" +
@ -333,13 +324,13 @@ class Connection(base.Connection):
resource_metadata = data.get('resource_metadata', {})
# Determine the name of new meter
rts = timestamp(data['timestamp'])
new_meter = _format_meter_reference(
rts = hbase_utils.timestamp(data['timestamp'])
new_meter = hbase_utils.format_meter_reference(
data['counter_name'], data['counter_type'],
data['counter_unit'], rts, data['source'])
#TODO(nprivalova): try not to store resource_id
resource = serialize_entry(**{
resource = hbase_utils.serialize_entry(**{
'source': data['source'],
'meter': {new_meter: data['timestamp']},
'resource_metadata': resource_metadata,
@ -360,11 +351,9 @@ class Connection(base.Connection):
m.update("%s%s%s" % (data['user_id'], data['resource_id'],
data['project_id']))
row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest())
record = serialize_entry(data, **{'source': data['source'],
'rts': rts,
'message': data,
'recorded_at': timeutils.utcnow(
)})
record = hbase_utils.serialize_entry(
data, **{'source': data['source'], 'rts': rts,
'message': data, 'recorded_at': timeutils.utcnow()})
meter_table.put(row, record)
def get_resources(self, user=None, project=None, source=None,
@ -387,16 +376,20 @@ class Connection(base.Connection):
if pagination:
raise NotImplementedError('Pagination not implemented')
q = make_query(metaquery=metaquery, user_id=user, project_id=project,
resource_id=resource, source=source)
q = make_meter_query_for_resource(start_timestamp, start_timestamp_op,
end_timestamp, end_timestamp_op,
source, q)
q = hbase_utils.make_query(metaquery=metaquery, user_id=user,
project_id=project,
resource_id=resource, source=source)
q = hbase_utils.make_meter_query_for_resource(start_timestamp,
start_timestamp_op,
end_timestamp,
end_timestamp_op,
source, q)
with self.conn_pool.connection() as conn:
resource_table = conn.table(self.RESOURCE_TABLE)
LOG.debug(_("Query Resource table: %s") % q)
for resource_id, data in resource_table.scan(filter=q):
f_res, sources, meters, md = deserialize_entry(data)
f_res, sources, meters, md = hbase_utils.deserialize_entry(
data)
# Unfortunately happybase doesn't keep ordered result from
# HBase. So that's why it's needed to find min and max
# manually
@ -410,7 +403,7 @@ class Connection(base.Connection):
row = resource_table.row(
resource_id, columns=['f:project_id', 'f:user_id',
'f:resource_metadata'])
f_res, _s, _m, md = deserialize_entry(row)
f_res, _s, _m, md = hbase_utils.deserialize_entry(row)
yield models.Resource(
resource_id=resource_id,
first_sample_timestamp=first_ts,
@ -438,9 +431,10 @@ class Connection(base.Connection):
raise NotImplementedError(_('Pagination not implemented'))
with self.conn_pool.connection() as conn:
resource_table = conn.table(self.RESOURCE_TABLE)
q = make_query(metaquery=metaquery, user_id=user,
project_id=project, resource_id=resource,
source=source)
q = hbase_utils.make_query(metaquery=metaquery, user_id=user,
project_id=project,
resource_id=resource,
source=source)
LOG.debug(_("Query Resource table: %s") % q)
gen = resource_table.scan(filter=q)
@ -449,7 +443,8 @@ class Connection(base.Connection):
# https://bugs.launchpad.net/ceilometer/+bug/1301371
result = set()
for ignored, data in gen:
flatten_result, s, meters, md = deserialize_entry(data)
flatten_result, s, meters, md = hbase_utils.deserialize_entry(
data)
for m in meters:
_m_rts, m_source, m_raw = m[0].split("+")
name, type, unit = m_raw.split('!')
@ -478,13 +473,14 @@ class Connection(base.Connection):
return
with self.conn_pool.connection() as conn:
meter_table = conn.table(self.METER_TABLE)
q, start, stop, columns = make_sample_query_from_filter(
sample_filter, require_meter=False)
q, start, stop, columns = \
hbase_utils.make_sample_query_from_filter(
sample_filter, require_meter=False)
LOG.debug(_("Query Meter Table: %s") % q)
gen = meter_table.scan(filter=q, row_start=start, row_stop=stop,
limit=limit)
for ignored, meter in gen:
d_meter = deserialize_entry(meter)[0]
d_meter = hbase_utils.deserialize_entry(meter)[0]
d_meter['message']['recorded_at'] = d_meter['recorded_at']
yield models.Sample(**d_meter['message'])
@ -534,14 +530,16 @@ class Connection(base.Connection):
with self.conn_pool.connection() as conn:
meter_table = conn.table(self.METER_TABLE)
q, start, stop, columns = make_sample_query_from_filter(
sample_filter)
q, start, stop, columns = \
hbase_utils.make_sample_query_from_filter(sample_filter)
# These fields are used in statistics' calculating
columns.extend(['f:timestamp', 'f:counter_volume',
'f:counter_unit'])
meters = map(deserialize_entry, list(meter for (ignored, meter) in
meter_table.scan(filter=q, row_start=start,
row_stop=stop, columns=columns)))
meters = map(hbase_utils.deserialize_entry,
list(meter for (ignored, meter) in
meter_table.scan(
filter=q, row_start=start,
row_stop=stop, columns=columns)))
if sample_filter.start:
start_time = sample_filter.start
@ -613,7 +611,7 @@ class Connection(base.Connection):
# models.Event or purposes of storage event sorted by
# timestamp in the database.
ts = event_model.generated
row = "%d_%s" % (timestamp(ts, reverse=False),
row = "%d_%s" % (hbase_utils.timestamp(ts, reverse=False),
event_model.message_id)
event_type = event_model.event_type
traits = {}
@ -621,8 +619,9 @@ class Connection(base.Connection):
for trait in event_model.traits:
key = "%s+%d" % (trait.name, trait.dtype)
traits[key] = trait.value
record = serialize_entry(traits, event_type=event_type,
timestamp=ts)
record = hbase_utils.serialize_entry(traits,
event_type=event_type,
timestamp=ts)
try:
events_table.put(row, record)
except Exception as ex:
@ -637,7 +636,8 @@ class Connection(base.Connection):
:param event_filter: storage.EventFilter object, consists of filters
for events that are stored in database.
"""
q, start, stop = make_events_query_from_filter(event_filter)
q, start, stop = hbase_utils.make_events_query_from_filter(
event_filter)
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
@ -646,7 +646,7 @@ class Connection(base.Connection):
events = []
for event_id, data in gen:
traits = []
events_dict = deserialize_entry(data)[0]
events_dict = hbase_utils.deserialize_entry(data)[0]
for key, value in events_dict.items():
if (not key.startswith('event_type')
and not key.startswith('timestamp')):
@ -673,7 +673,7 @@ class Connection(base.Connection):
event_types = set()
for event_id, data in gen:
events_dict = deserialize_entry(data)[0]
events_dict = hbase_utils.deserialize_entry(data)[0]
for key, value in events_dict.items():
if key.startswith('event_type'):
if value not in event_types:
@ -688,13 +688,13 @@ class Connection(base.Connection):
:param event_type: the type of the Event
"""
q = make_query(event_type=event_type)
q = hbase_utils.make_query(event_type=event_type)
trait_types = set()
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
gen = events_table.scan(filter=q)
for event_id, data in gen:
events_dict = deserialize_entry(data)[0]
events_dict = hbase_utils.deserialize_entry(data)[0]
for key, value in events_dict.items():
if (not key.startswith('event_type') and
not key.startswith('timestamp')):
@ -717,13 +717,14 @@ class Connection(base.Connection):
:param event_type: the type of the Event to filter by
:param trait_type: the name of the Trait to filter by
"""
q = make_query(event_type=event_type, trait_type=trait_type)
q = hbase_utils.make_query(event_type=event_type,
trait_type=trait_type)
traits = []
with self.conn_pool.connection() as conn:
events_table = conn.table(self.EVENT_TABLE)
gen = events_table.scan(filter=q)
for event_id, data in gen:
events_dict = deserialize_entry(data)[0]
events_dict = hbase_utils.deserialize_entry(data)[0]
for key, value in events_dict.items():
if (not key.startswith('event_type') and
not key.startswith('timestamp')):
@ -732,625 +733,3 @@ class Connection(base.Connection):
dtype=int(tt_number), value=value))
for trait in sorted(traits, key=operator.attrgetter('dtype')):
yield trait
def _QualifierFilter(op, qualifier):
return "QualifierFilter (%s, 'binaryprefix:m_%s')" % (op, qualifier)
###############
# This is a very crude version of "in-memory HBase", which implements just
# enough functionality of HappyBase API to support testing of our driver.
#
class MTable(object):
"""HappyBase.Table mock
"""
def __init__(self, name, families):
self.name = name
self.families = families
self._rows_with_ts = {}
def row(self, key, columns=None):
if key not in self._rows_with_ts:
return {}
res = copy.copy(sorted(six.iteritems(
self._rows_with_ts.get(key)))[-1][1])
if columns:
keys = res.keys()
for key in keys:
if key not in columns:
res.pop(key)
return res
def rows(self, keys):
return ((k, self.row(k)) for k in keys)
def put(self, key, data, ts=None):
# Note: Now we use 'timestamped' but only for one Resource table.
# That's why we may put ts='0' in case when ts is None. If it is
# needed to use 2 types of put in one table ts=0 cannot be used.
if ts is None:
ts = "0"
if key not in self._rows_with_ts:
self._rows_with_ts[key] = {ts: data}
else:
if ts in self._rows_with_ts[key]:
self._rows_with_ts[key][ts].update(data)
else:
self._rows_with_ts[key].update({ts: data})
def delete(self, key):
del self._rows_with_ts[key]
def _get_latest_dict(self, row):
# The idea here is to return latest versions of columns.
# In _rows_with_ts we store {row: {ts_1: {data}, ts_2: {data}}}.
# res will contain a list of tuples [(ts_1, {data}), (ts_2, {data})]
# sorted by ts, i.e. in this list ts_2 is the most latest.
# To get result as HBase provides we should iterate in reverse order
# and get from "latest" data only key-values that are not in newer data
data = {}
for i in sorted(six.iteritems(self._rows_with_ts[row])):
data.update(i[1])
return data
def scan(self, filter=None, columns=None, row_start=None, row_stop=None,
limit=None):
columns = columns or []
sorted_keys = sorted(self._rows_with_ts)
# copy data between row_start and row_stop into a dict
rows = {}
for row in sorted_keys:
if row_start and row < row_start:
continue
if row_stop and row > row_stop:
break
rows[row] = self._get_latest_dict(row)
if columns:
ret = {}
for row, data in six.iteritems(rows):
for key in data:
if key in columns:
ret[row] = data
rows = ret
if filter:
# TODO(jdanjou): we should really parse this properly,
# but at the moment we are only going to support AND here
filters = filter.split('AND')
for f in filters:
# Extract filter name and its arguments
g = re.search("(.*)\((.*),?\)", f)
fname = g.group(1).strip()
fargs = [s.strip().replace('\'', '')
for s in g.group(2).split(',')]
m = getattr(self, fname)
if callable(m):
# overwrite rows for filtering to take effect
# in case of multiple filters
rows = m(fargs, rows)
else:
raise NotImplementedError("%s filter is not implemented, "
"you may want to add it!")
for k in sorted(rows)[:limit]:
yield k, rows[k]
@staticmethod
def SingleColumnValueFilter(args, rows):
"""This method is called from scan() when 'SingleColumnValueFilter'
is found in the 'filter' argument.
"""
op = args[2]
column = "%s:%s" % (args[0], args[1])
value = args[3]
if value.startswith('binary:'):
value = value[7:]
r = {}
for row in rows:
data = rows[row]
if op == '=':
if column in data and data[column] == value:
r[row] = data
elif op == '<=':
if column in data and data[column] <= value:
r[row] = data
elif op == '>=':
if column in data and data[column] >= value:
r[row] = data
else:
raise NotImplementedError("In-memory "
"SingleColumnValueFilter "
"doesn't support the %s operation "
"yet" % op)
return r
@staticmethod
def ColumnPrefixFilter(args, rows):
"""This is filter for testing "in-memory HBase".
This method is called from scan() when 'ColumnPrefixFilter' is found
in the 'filter' argument.
:param args: a list of filter arguments, contain prefix of column
:param rows: a dict of row prefixes for filtering
"""
value = args[0]
column = 'f:' + value
r = {}
for row, data in rows.items():
column_dict = {}
for key in data:
if key.startswith(column):
column_dict[key] = data[key]
r[row] = column_dict
return r
@staticmethod
def RowFilter(args, rows):
"""This is filter for testing "in-memory HBase".
This method is called from scan() when 'RowFilter' is found in the
'filter' argument.
:param args: a list of filter arguments, it contains operator and
sought string
:param rows: a dict of rows which are filtered
"""
op = args[0]
value = args[1]
if value.startswith('regexstring:'):
value = value[len('regexstring:'):]
r = {}
for row, data in rows.items():
try:
g = re.search(value, row).group()
if op == '=':
if g == row:
r[row] = data
else:
raise NotImplementedError("In-memory "
"RowFilter doesn't support "
"the %s operation yet" % op)
except AttributeError:
pass
return r
@staticmethod
def QualifierFilter(args, rows):
"""This method is called from scan() when 'QualifierFilter'
is found in the 'filter' argument
"""
op = args[0]
value = args[1]
if value.startswith('binaryprefix:'):
value = value[len('binaryprefix:'):]
column = 'f:' + value
r = {}
for row in rows:
data = rows[row]
r_data = {}
for key in data:
if (op == '=' and key.startswith(column)) or \
(op == '>=' and key >= column) or \
(op == '<=' and key <= column):
r_data[key] = data[key]
else:
raise NotImplementedError("In-memory QualifierFilter "
"doesn't support the %s "
"operation yet" % op)
if r_data:
r[row] = r_data
return r
class MConnectionPool(object):
def __init__(self):
self.conn = MConnection()
def connection(self):
return self.conn
class MConnection(object):
"""HappyBase.Connection mock
"""
def __init__(self):
self.tables = {}
def __enter__(self, *args, **kwargs):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def open(self):
LOG.debug(_("Opening in-memory HBase connection"))
def create_table(self, n, families=None):
families = families or {}
if n in self.tables:
return self.tables[n]
t = MTable(n, families)
self.tables[n] = t
return t
def delete_table(self, name, use_prefix=True):
del self.tables[name]
def table(self, name):
return self.create_table(name)
#################################################
# Here be various HBase helpers
def timestamp(dt, reverse=True):
"""Timestamp is count of milliseconds since start of epoch.
If reverse=True then timestamp will be reversed. Such a technique is used
in HBase rowkey design when period queries are required. Because of the
fact that rows are sorted lexicographically it's possible to vary whether
the 'oldest' entries will be on top of the table or it should be the newest
ones (reversed timestamp case).
:param dt: datetime which is translated to timestamp
:param reverse: a boolean parameter for reverse or straight count of
timestamp in milliseconds
:return: count or reversed count of milliseconds since start of epoch
"""
epoch = datetime.datetime(1970, 1, 1)
td = dt - epoch
ts = td.microseconds + td.seconds * 1000000 + td.days * 86400000000
return 0x7fffffffffffffff - ts if reverse else ts
def make_events_query_from_filter(event_filter):
"""Return start and stop row for filtering and a query which based on the
selected parameter.
:param event_filter: storage.EventFilter object.
"""
q = []
res_q = None
start = "%s" % (timestamp(event_filter.start_time, reverse=False)
if event_filter.start_time else "")
stop = "%s" % (timestamp(event_filter.end_time, reverse=False)
if event_filter.end_time else "")
if event_filter.event_type:
q.append("SingleColumnValueFilter ('f', 'event_type', = , "
"'binary:%s')" % dump(event_filter.event_type))
if event_filter.message_id:
q.append("RowFilter ( = , 'regexstring:\d*_%s')" %
event_filter.message_id)
if len(q):
res_q = " AND ".join(q)
if event_filter.traits_filter:
for trait_filter in event_filter.traits_filter:
q_trait = make_query(trait_query=True, **trait_filter)
if q_trait:
if res_q:
res_q += " AND " + q_trait
else:
res_q = q_trait
return res_q, start, stop
def make_timestamp_query(func, start=None, start_op=None, end=None,
end_op=None, bounds_only=False, **kwargs):
"""Return a filter start and stop row for filtering and a query
which based on the fact that CF-name is 'rts'.
:param start: Optional start timestamp
:param start_op: Optional start timestamp operator, like gt, ge
:param end: Optional end timestamp
:param end_op: Optional end timestamp operator, like lt, le
:param bounds_only: if True than query will not be returned
:param func: a function that provide a format of row
:param kwargs: kwargs for :param func
"""
rts_start, rts_end = get_start_end_rts(start, start_op, end, end_op)
start_row, end_row = func(rts_start, rts_end, **kwargs)
if bounds_only:
return start_row, end_row
q = []
# We dont need to dump here because get_start_end_rts returns strings
if rts_start:
q.append("SingleColumnValueFilter ('f', 'rts', <=, 'binary:%s')" %
rts_start)
if rts_end:
q.append("SingleColumnValueFilter ('f', 'rts', >=, 'binary:%s')" %
rts_end)
res_q = None
if len(q):
res_q = " AND ".join(q)
return start_row, end_row, res_q
def get_start_end_rts(start, start_op, end, end_op):
rts_start = str(timestamp(start) + 1) if start else ""
rts_end = str(timestamp(end) + 1) if end else ""
# By default, we are using ge for lower bound and lt for upper bound
if start_op == 'gt':
rts_start = str(long(rts_start) - 2)
if end_op == 'le':
rts_end = str(long(rts_end) - 1)
return rts_start, rts_end
def make_query(metaquery=None, trait_query=None, **kwargs):
"""Return a filter query string based on the selected parameters.
:param metaquery: optional metaquery dict
:param trait_query: optional boolean, for trait_query from kwargs
:param kwargs: key-value pairs to filter on. Key should be a real
column name in db
"""
q = []
res_q = None
# Query for traits differs from others. It is constructed with
# SingleColumnValueFilter with the possibility to choose comparision
# operator
if trait_query:
trait_name = kwargs.pop('key')
op = kwargs.pop('op', 'eq')
for k, v in kwargs.items():
if v is not None:
res_q = ("SingleColumnValueFilter "
"('f', '%s+%d', %s, 'binary:%s', true, true)" %
(trait_name, DTYPE_NAMES[k], OP_SIGN[op],
dump(v)))
return res_q
# Note: we use extended constructor for SingleColumnValueFilter here.
# It is explicitly specified that entry should not be returned if CF is not
# found in table.
for key, value in sorted(kwargs.items()):
if value is not None:
if key == 'source':
q.append("SingleColumnValueFilter "
"('f', 's_%s', =, 'binary:%s', true, true)" %
(value, dump('1')))
elif key == 'trait_type':
q.append("ColumnPrefixFilter('%s')" % value)
else:
q.append("SingleColumnValueFilter "
"('f', '%s', =, 'binary:%s', true, true)" %
(key, dump(value)))
res_q = None
if len(q):
res_q = " AND ".join(q)
if metaquery:
meta_q = []
for k, v in metaquery.items():
meta_q.append(
"SingleColumnValueFilter ('f', '%s', =, 'binary:%s', "
"true, true)"
% ('r_' + k, dump(v)))
meta_q = " AND ".join(meta_q)
# join query and metaquery
if res_q is not None:
res_q += " AND " + meta_q
else:
res_q = meta_q # metaquery only
return res_q
def _get_meter_columns(metaquery, **kwargs):
"""Return a list of required columns in meter table to be scanned .
:param metaquery: optional metaquery dict
:param kwargs: key-value pairs to filter on. Key should be a real
column name in db
"""
columns = ['f:message', 'f:recorded_at']
columns.extend(["f:%s" % k for k, v in kwargs.items() if v])
if metaquery:
columns.extend(["f:r_%s" % k for k, v in metaquery.items() if v])
return columns
def make_sample_query_from_filter(sample_filter, require_meter=True):
"""Return a query dictionary based on the settings in the filter.
:param sample_filter: SampleFilter instance
:param require_meter: If true and the filter does not have a meter,
raise an error.
"""
meter = sample_filter.meter
if not meter and require_meter:
raise RuntimeError('Missing required meter specifier')
start_row, end_row, ts_query = make_timestamp_query(
_make_general_rowkey_scan,
start=sample_filter.start, start_op=sample_filter.start_timestamp_op,
end=sample_filter.end, end_op=sample_filter.end_timestamp_op,
some_id=meter)
kwargs = dict(user_id=sample_filter.user,
project_id=sample_filter.project,
counter_name=meter,
resource_id=sample_filter.resource,
source=sample_filter.source,
message_id=sample_filter.message_id)
q = make_query(metaquery=sample_filter.metaquery, **kwargs)
if q:
ts_query = (" AND " + ts_query) if ts_query else ""
res_q = q + ts_query if ts_query else q
else:
res_q = ts_query if ts_query else None
columns = _get_meter_columns(metaquery=sample_filter.metaquery, **kwargs)
return res_q, start_row, end_row, columns
def make_meter_query_for_resource(start_timestamp, start_timestamp_op,
end_timestamp, end_timestamp_op, source,
query=None):
"""This method is used when Resource table should be filtered by meters.
In this method we are looking into all qualifiers with m_ prefix.
:param start_timestamp: meter's timestamp start range.
:param start_timestamp_op: meter's start time operator, like ge, gt.
:param end_timestamp: meter's timestamp end range.
:param end_timestamp_op: meter's end time operator, like lt, le.
:param source: source filter.
:param query: a query string to concatenate with.
"""
start_rts, end_rts = get_start_end_rts(start_timestamp,
start_timestamp_op,
end_timestamp, end_timestamp_op)
mq = []
if start_rts:
filter_value = start_rts + '+' + source if source else start_rts
mq.append(_QualifierFilter("<=", filter_value))
if end_rts:
filter_value = end_rts + '+' + source if source else end_rts
mq.append(_QualifierFilter(">=", filter_value))
if mq:
meter_q = " AND ".join(mq)
# If there is a filtering on time_range we need to point that
# qualifiers should start with m_. Overwise in case e.g.
# QualifierFilter (>=, 'binaryprefix:m_9222030811134775808')
# qualifier 's_test' satisfies the filter and will be returned.
meter_q = _QualifierFilter("=", '') + " AND " + meter_q
query = meter_q if not query else query + " AND " + meter_q
return query
def _make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None):
"""If it's filter on some_id without start and end,
start_row = some_id while end_row = some_id + MAX_BYTE
"""
if some_id is None:
return None, None
if not rts_start:
rts_start = chr(127)
end_row = "%s_%s" % (some_id, rts_start)
start_row = "%s_%s" % (some_id, rts_end)
return start_row, end_row
def _format_meter_reference(c_name, c_type, c_unit, rts, source):
"""Format reference to meter data.
"""
return "%s+%s+%s!%s!%s" % (rts, source, c_name, c_type, c_unit)
def _timestamp_from_record_tuple(record):
"""Extract timestamp from HBase tuple record
"""
return record[0]['timestamp']
def _resource_id_from_record_tuple(record):
"""Extract resource_id from HBase tuple record
"""
return record[0]['resource_id']
def deserialize_entry(entry, get_raw_meta=True):
"""Return a list of flatten_result, sources, meters and metadata
flatten_result contains a dict of simple structures such as 'resource_id':1
sources/meters are the lists of sources and meters correspondingly.
metadata is metadata dict. This dict may be returned as flattened if
get_raw_meta is False.
:param entry: entry from HBase, without row name and timestamp
:param get_raw_meta: If true then raw metadata will be returned,
if False metadata will be constructed from 'f:r_metadata.' fields
"""
flatten_result = {}
sources = []
meters = []
metadata_flattened = {}
for k, v in entry.items():
if k.startswith('f:s_'):
sources.append(k[4:])
elif k.startswith('f:r_metadata.'):
metadata_flattened[k[len('f:r_metadata.'):]] = load(v)
elif k.startswith("f:m_"):
meter = (k[4:], load(v))
meters.append(meter)
else:
flatten_result[k[2:]] = load(v)
if get_raw_meta:
metadata = flatten_result.get('resource_metadata', {})
else:
metadata = metadata_flattened
return flatten_result, sources, meters, metadata
def serialize_entry(data=None, **kwargs):
"""Return a dict that is ready to be stored to HBase
:param data: dict to be serialized
:param kwargs: additional args
"""
data = data or {}
entry_dict = copy.copy(data)
entry_dict.update(**kwargs)
result = {}
for k, v in entry_dict.items():
if k == 'source':
# user, project and resource tables may contain several sources.
# Besides, resource table may contain several meters.
# To make insertion safe we need to store all meters and sources in
# a separate cell. For this purpose s_ and m_ prefixes are
# introduced.
result['f:s_%s' % v] = dump('1')
elif k == 'meter':
for meter, ts in v.items():
result['f:m_%s' % meter] = dump(ts)
elif k == 'resource_metadata':
# keep raw metadata as well as flattened to provide
# capability with API v2. It will be flattened in another
# way on API level. But we need flattened too for quick filtering.
flattened_meta = dump_metadata(v)
for k, m in flattened_meta.items():
result['f:r_metadata.' + k] = dump(m)
result['f:resource_metadata'] = dump(v)
else:
result['f:' + k] = dump(v)
return result
def dump_metadata(meta):
resource_metadata = {}
for key, v in utils.dict_to_keyval(meta):
resource_metadata[key] = v
return resource_metadata
def dump(data):
return json.dumps(data, default=bson.json_util.default)
def load(data):
return json.loads(data, object_hook=object_hook)
# We don't want to have tzinfo in decoded json.This object_hook is
# overwritten json_util.object_hook for $date
def object_hook(dct):
if "$date" in dct:
dt = bson.json_util.object_hook(dct)
return dt.replace(tzinfo=None)
return bson.json_util.object_hook(dct)

View File

@ -25,6 +25,7 @@
"""
from mock import patch
from ceilometer.storage.hbase import inmemory as hbase_inmemory
from ceilometer.storage import impl_hbase as hbase
from ceilometer.tests import base as test_base
from ceilometer.tests import db as tests_db
@ -36,7 +37,8 @@ class ConnectionTest(tests_db.TestBase,
@tests_db.run_with('hbase')
def test_hbase_connection(self):
conn = hbase.Connection(self.db_manager.url)
self.assertIsInstance(conn.conn_pool.connection(), hbase.MConnection)
self.assertIsInstance(conn.conn_pool.connection(),
hbase_inmemory.MConnection)
class TestConn(object):
def __init__(self, host, port):