hbase metaquery support
- move HBase configuration comment to docs - store metadata in resource table with prefix "r_" - get_resources and get_meters, get_samples support metaquery - enable api v1 metaquery tests Change-Id: I3285bb420283c2385e6f340ff30e951d58dcb450 Implements: blueprint hbase-metadata-query Fixes: bug #1146655
This commit is contained in:
parent
9d339a59c6
commit
09b4623c57
@ -16,36 +16,9 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Openstack Ceilometer HBase storage backend
|
||||
|
||||
.. note::
|
||||
This driver is designed to enable Ceilometer to store its data in HBase.
|
||||
The implementation is using HBase Thrift interface so it's necessary to have
|
||||
the HBase Thrift server installed and started:
|
||||
(https://ccp.cloudera.com/display/CDHDOC/HBase+Installation)
|
||||
|
||||
This driver has been tested against HBase 0.92.1/CDH 4.1.1,
|
||||
HBase 0.94.4/HDP 1.2 and HBase 0.94.5/Apache.
|
||||
Versions earlier than 0.92.1 are not supported due to feature
|
||||
incompatibility.
|
||||
|
||||
Due to limitations of HBase the driver implements its own data aggregations
|
||||
which may harm its performance. It is likely that the performance could be
|
||||
improved if co-processors were used, however at the moment the co-processor
|
||||
support is not exposed through Thrift API.
|
||||
|
||||
The following four tables are expected to exist in HBase:
|
||||
create 'project', {NAME=>'f'}
|
||||
create 'user', {NAME=>'f'}
|
||||
create 'resource', {NAME=>'f'}
|
||||
create 'meter', {NAME=>'f'}
|
||||
|
||||
The driver is using HappyBase which is a wrapper library used to interact
|
||||
with HBase via Thrift protocol:
|
||||
http://happybase.readthedocs.org/en/latest/index.html#
|
||||
|
||||
"""HBase storage backend
|
||||
"""
|
||||
|
||||
from sets import Set
|
||||
from urlparse import urlparse
|
||||
import json
|
||||
import hashlib
|
||||
@ -223,16 +196,24 @@ class Connection(base.Connection):
|
||||
project['f:s_%s' % data['source']] = "1"
|
||||
self.project.put(data['project_id'], project)
|
||||
|
||||
rts = reverse_timestamp(data['timestamp'])
|
||||
|
||||
resource = self.resource.row(data['resource_id'])
|
||||
new_meter = "%s!%s!%s" % (
|
||||
data['counter_name'], data['counter_type'], data['counter_unit'])
|
||||
new_resource = {'f:resource_id': data['resource_id'],
|
||||
'f:project_id': data['project_id'],
|
||||
'f:user_id': data['user_id'],
|
||||
'f:metadata': json.dumps(data['resource_metadata']),
|
||||
'f:source': data["source"],
|
||||
'f:m_%s' % new_meter: "1",
|
||||
# store meters with prefix "m_"
|
||||
'f:m_%s' % new_meter: "1"
|
||||
}
|
||||
# store metadata fields with prefix "r_"
|
||||
resource_metadata = dict(('f:r_%s' % k, v)
|
||||
for (k, v)
|
||||
in data['resource_metadata'].iteritems())
|
||||
new_resource.update(resource_metadata)
|
||||
|
||||
# Update if resource has new information
|
||||
if new_resource != resource:
|
||||
meters = _load_hbase_list(resource, 'm')
|
||||
@ -249,7 +230,6 @@ class Connection(base.Connection):
|
||||
|
||||
# We use reverse timestamps in rowkeys as they are sorted
|
||||
# alphabetically.
|
||||
rts = reverse_timestamp(data['timestamp'])
|
||||
row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest())
|
||||
|
||||
# Convert timestamp to string as json.dumps won't
|
||||
@ -309,34 +289,22 @@ class Connection(base.Connection):
|
||||
:param source: Optional source filter.
|
||||
:param start_timestamp: Optional modified timestamp start range.
|
||||
:param end_timestamp: Optional modified timestamp end range.
|
||||
:param metaquery: Optional dict with metadata to match on.
|
||||
"""
|
||||
q, start_row, end_row = make_query(user=user,
|
||||
project=project,
|
||||
source=source,
|
||||
start=start_timestamp,
|
||||
end=end_timestamp,
|
||||
require_meter=False)
|
||||
LOG.debug("q: %s" % q)
|
||||
# TODO implement metaquery support
|
||||
if len(metaquery) > 0:
|
||||
raise NotImplementedError('metaquery not implemented')
|
||||
def make_resource(data):
|
||||
""" transform HBase fields to Resource model
|
||||
"""
|
||||
# convert HBase metadata e.g. f:r_display_name to display_name
|
||||
data['f:metadata'] = dict((k[4:], v)
|
||||
for k, v in data.iteritems()
|
||||
if k.startswith('f:r_'))
|
||||
|
||||
resource_ids = {}
|
||||
g = self.meter.scan(filter=q, row_start=start_row,
|
||||
row_stop=end_row)
|
||||
for ignored, data in g:
|
||||
resource_ids[data['f:resource_id']] = data['f:resource_id']
|
||||
|
||||
q = make_query(user=user, project=project, source=source,
|
||||
query_only=True, require_meter=False)
|
||||
LOG.debug("q: %s" % q)
|
||||
for resource_id, data in self.resource.rows(resource_ids):
|
||||
yield models.Resource(
|
||||
resource_id=resource_id,
|
||||
return models.Resource(
|
||||
resource_id=data['f:resource_id'],
|
||||
project_id=data['f:project_id'],
|
||||
source=data['f:source'],
|
||||
user_id=data['f:user_id'],
|
||||
metadata=json.loads(data['f:metadata']),
|
||||
metadata=data['f:metadata'],
|
||||
meter=[
|
||||
models.ResourceMeter(*(m[4:].split("!")))
|
||||
for m in data
|
||||
@ -344,6 +312,35 @@ class Connection(base.Connection):
|
||||
],
|
||||
)
|
||||
|
||||
q, start_row, stop_row = make_query(user=user,
|
||||
project=project,
|
||||
source=source,
|
||||
start=start_timestamp,
|
||||
end=end_timestamp,
|
||||
require_meter=False,
|
||||
query_only=False)
|
||||
LOG.debug("Query Meter table: %s" % q)
|
||||
gen = self.meter.scan(filter=q, row_start=start_row, row_stop=stop_row)
|
||||
|
||||
# put all the resource_ids in a Set
|
||||
resource_ids = Set()
|
||||
for ignored, data in gen:
|
||||
resource_ids.add(data['f:resource_id'])
|
||||
|
||||
# handle metaquery
|
||||
if len(metaquery) > 0:
|
||||
for ignored, data in self.resource.rows(resource_ids):
|
||||
for k, v in metaquery.iteritems():
|
||||
# if metaquery matches, yield the resource model
|
||||
# e.g. metaquery: metadata.display_name
|
||||
# equals
|
||||
# HBase: f:r_display_name
|
||||
if data['f:r_' + k.split('.', 1)[1]] == v:
|
||||
yield make_resource(data)
|
||||
else:
|
||||
for ignored, data in self.resource.rows(resource_ids):
|
||||
yield make_resource(data)
|
||||
|
||||
def get_meters(self, user=None, project=None, resource=None, source=None,
|
||||
metaquery={}):
|
||||
"""Return an iterable of models.Meter instances
|
||||
@ -354,13 +351,23 @@ class Connection(base.Connection):
|
||||
:param source: Optional source filter.
|
||||
:param metaquery: Optional dict with metadata to match on.
|
||||
"""
|
||||
q, ignored, ignored = make_query(user=user, project=project,
|
||||
resource=resource, source=source,
|
||||
require_meter=False)
|
||||
LOG.debug("q: %s" % q)
|
||||
# TODO implement metaquery support
|
||||
q = make_query(user=user, project=project, resource=resource,
|
||||
source=source, require_meter=False, query_only=True)
|
||||
LOG.debug("Query Resource table: %s" % q)
|
||||
|
||||
# handle metaquery
|
||||
if len(metaquery) > 0:
|
||||
raise NotImplementedError('metaquery not implemented')
|
||||
meta_q = []
|
||||
for k, v in metaquery.iteritems():
|
||||
meta_q.append(
|
||||
"SingleColumnValueFilter ('f', '%s', =, 'binary:%s')"
|
||||
% ('r_' + k.split('.', 1)[1], v))
|
||||
meta_q = " AND ".join(meta_q)
|
||||
# join query and metaquery
|
||||
if q is not None:
|
||||
q += " AND " + meta_q
|
||||
else:
|
||||
q = meta_q # metaquery only
|
||||
|
||||
gen = self.resource.scan(filter=q)
|
||||
|
||||
@ -389,15 +396,36 @@ class Connection(base.Connection):
|
||||
def get_samples(self, sample_filter):
|
||||
"""Return an iterable of models.Sample instances
|
||||
"""
|
||||
def make_sample(data):
|
||||
""" transform HBase fields to Sample model
|
||||
"""
|
||||
data = json.loads(data['f:message'])
|
||||
data['timestamp'] = timeutils.parse_strtime(data['timestamp'])
|
||||
return models.Sample(**data)
|
||||
|
||||
q, start, stop = make_query_from_filter(sample_filter,
|
||||
require_meter=False)
|
||||
LOG.debug("q: %s" % q)
|
||||
LOG.debug("Query Meter Table: %s" % q)
|
||||
|
||||
gen = self.meter.scan(filter=q, row_start=start, row_stop=stop)
|
||||
|
||||
for ignored, meter in gen:
|
||||
meter = json.loads(meter['f:message'])
|
||||
meter['timestamp'] = timeutils.parse_strtime(meter['timestamp'])
|
||||
yield models.Sample(**meter)
|
||||
# TODO (shengjie) put this implementation here because it's failing
|
||||
# the test. bp hbase-meter-table-enhancement will address this
|
||||
# properly.
|
||||
# handle metaquery
|
||||
metaquery = sample_filter.metaquery
|
||||
if len(metaquery) > 0:
|
||||
# metaquery checks resource table
|
||||
resource = self.resource.row(meter['f:resource_id'])
|
||||
|
||||
for k, v in metaquery.iteritems():
|
||||
if resource['f:r_' + k.split('.', 1)[1]] != v:
|
||||
break # if one metaquery doesn't match, break
|
||||
else:
|
||||
yield make_sample(meter)
|
||||
else:
|
||||
yield make_sample(meter)
|
||||
|
||||
def _update_meter_stats(self, stat, meter):
|
||||
"""Do the stats calculation on a requested time bucket in stats dict
|
||||
@ -660,7 +688,8 @@ def reverse_timestamp(dt):
|
||||
def make_query(user=None, project=None, meter=None,
|
||||
resource=None, source=None, start=None, end=None,
|
||||
require_meter=True, query_only=False):
|
||||
"""Return a filter query based on the selected parameters.
|
||||
"""Return a filter query string based on the selected parameters.
|
||||
|
||||
:param user: Optional user-id
|
||||
:param project: Optional project-id
|
||||
:param meter: Optional counter-name
|
||||
@ -687,23 +716,19 @@ def make_query(user=None, project=None, meter=None,
|
||||
if source:
|
||||
q.append("SingleColumnValueFilter "
|
||||
"('f', 'source', =, 'binary:%s')" % source)
|
||||
# when start_time and end_time is provided,
|
||||
# if it's filtered by meter,
|
||||
# rowkey will be used in the query;
|
||||
# if it's non meter filter query(eg. project_id, user_id etc),
|
||||
# SingleColumnValueFilter against rts will be appended to the query
|
||||
# query other tables should have no start and end passed in
|
||||
stopRow, startRow = "", ""
|
||||
|
||||
start_row, end_row = "", ""
|
||||
rts_start = str(reverse_timestamp(start) + 1) if start else ""
|
||||
rts_end = str(reverse_timestamp(end) + 1) if end else ""
|
||||
|
||||
# when start_time and end_time are provided,
|
||||
# if it's filtered by meter,
|
||||
# rowkey will be used in the query;
|
||||
# else it's non meter filter query(e.g. project_id, user_id etc),
|
||||
# SingleColumnValueFilter against rts will be appended to the query
|
||||
# queries against other tables should have no start and end passed in
|
||||
if meter:
|
||||
# if it's meter filter without start and end,
|
||||
# startRow = meter while stopRow = meter + MAX_BYTE
|
||||
if not rts_start:
|
||||
rts_start = chr(127)
|
||||
stopRow = "%s_%s" % (meter, rts_start)
|
||||
startRow = "%s_%s" % (meter, rts_end)
|
||||
start_row, end_row = _make_rowkey_scan(meter, rts_start, rts_end)
|
||||
elif require_meter:
|
||||
raise RuntimeError('Missing required meter specifier')
|
||||
else:
|
||||
@ -717,10 +742,11 @@ def make_query(user=None, project=None, meter=None,
|
||||
sample_filter = None
|
||||
if len(q):
|
||||
sample_filter = " AND ".join(q)
|
||||
|
||||
if query_only:
|
||||
return sample_filter
|
||||
else:
|
||||
return sample_filter, startRow, stopRow
|
||||
return sample_filter, start_row, end_row
|
||||
|
||||
|
||||
def make_query_from_filter(sample_filter, require_meter=True):
|
||||
@ -730,16 +756,24 @@ def make_query_from_filter(sample_filter, require_meter=True):
|
||||
:param require_meter: If true and the filter does not have a meter,
|
||||
raise an error.
|
||||
"""
|
||||
if sample_filter.metaquery is not None and \
|
||||
len(sample_filter.metaquery) > 0:
|
||||
raise NotImplementedError('metaquery not implemented')
|
||||
|
||||
return make_query(sample_filter.user, sample_filter.project,
|
||||
sample_filter.meter, sample_filter.resource,
|
||||
sample_filter.source, sample_filter.start,
|
||||
sample_filter.end, require_meter)
|
||||
|
||||
|
||||
def _make_rowkey_scan(meter, rts_start=None, rts_end=None):
|
||||
""" if it's meter filter without start and end,
|
||||
start_row = meter while end_row = meter + MAX_BYTE
|
||||
"""
|
||||
if not rts_start:
|
||||
rts_start = chr(127)
|
||||
end_row = "%s_%s" % (meter, rts_start)
|
||||
start_row = "%s_%s" % (meter, rts_end)
|
||||
|
||||
return start_row, end_row
|
||||
|
||||
|
||||
def _load_hbase_list(d, prefix):
|
||||
"""Deserialise dict stored as HBase column family
|
||||
"""
|
||||
|
@ -34,6 +34,10 @@ class TestListEvents(list_events.TestListEvents):
|
||||
database_connection = 'hbase://__test__'
|
||||
|
||||
|
||||
class TestListEventsMetaQuery(list_events.TestListEventsMetaquery):
|
||||
database_connection = 'hbase://__test__'
|
||||
|
||||
|
||||
class TestListEmptyMeters(list_meters.TestListEmptyMeters):
|
||||
database_connection = 'hbase://__test__'
|
||||
|
||||
@ -42,6 +46,10 @@ class TestListMeters(list_meters.TestListMeters):
|
||||
database_connection = 'hbase://__test__'
|
||||
|
||||
|
||||
class TestListMetersMetaquery(list_meters.TestListMetersMetaquery):
|
||||
database_connection = 'hbase://__test__'
|
||||
|
||||
|
||||
class TestListEmptyUsers(list_users.TestListEmptyUsers):
|
||||
database_connection = 'hbase://__test__'
|
||||
|
||||
@ -70,6 +78,10 @@ class TestListResources(list_resources.TestListResources):
|
||||
database_connection = 'hbase://__test__'
|
||||
|
||||
|
||||
class TestListResourcesMetaquery(list_resources.TestListResourcesMetaquery):
|
||||
database_connection = 'hbase://__test__'
|
||||
|
||||
|
||||
class TestListSource(list_sources.TestListSource):
|
||||
database_connection = 'hbase://__test__'
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user