HBase storage driver, initial version.
This is a new storage driver that supports storing of Ceilometer data in an HBase backend. This version does not have metaquery support yet. Implements: blueprint hbase-storage-backend Change-Id: Id3e7ec01434b1be30cce4f91b39461fc389a000f
This commit is contained in:
parent
1d44a182cb
commit
6f7525755f
661
ceilometer/storage/impl_hbase.py
Normal file
661
ceilometer/storage/impl_hbase.py
Normal file
@ -0,0 +1,661 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Copyright © 2012, 2013 Dell Inc.
|
||||
#
|
||||
# Author: Stas Maksimov <Stanislav_M@dell.com>
|
||||
# Author: Shengjie Min <Shengjie_Min@dell.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Openstack Ceilometer HBase storage backend
|
||||
|
||||
.. note::
|
||||
This driver is designed to enable Ceilometer store its data in HBase.
|
||||
The implementation is using HBase Thrift interface so it's necessary to have
|
||||
the HBase Thrift server installed and started:
|
||||
(https://ccp.cloudera.com/display/CDHDOC/HBase+Installation)
|
||||
|
||||
This driver has been tested against HBase 0.92.1/CDH 4.1.1,
|
||||
HBase 0.94.4/HDP 1.2 and HBase 0.94.5/Apache.
|
||||
Versions earlier than 0.92.1 are not supported due to feature
|
||||
incompatibility.
|
||||
|
||||
Due to limitations of HBase the driver implements its own data aggregations
|
||||
which may harm its performance. It is likely that the performance could be
|
||||
improved if co-processors were used, however at the moment the co-processor
|
||||
support is not exposed through Thrift API.
|
||||
|
||||
The following four tables are expected to exist in HBase:
|
||||
create 'project', {NAME=>'f'}
|
||||
create 'user', {NAME=>'f'}
|
||||
create 'resource', {NAME=>'f'}
|
||||
create 'meter', {NAME=>'f'}
|
||||
|
||||
The driver is using HappyBase which is a wrapper library used to interact
|
||||
with HBase via Thrift protocol:
|
||||
http://happybase.readthedocs.org/en/latest/index.html#
|
||||
|
||||
"""
|
||||
|
||||
from urlparse import urlparse
|
||||
import json
|
||||
import hashlib
|
||||
import copy
|
||||
import datetime
|
||||
import happybase
|
||||
from collections import defaultdict
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from ceilometer.openstack.common import log, timeutils
|
||||
from ceilometer.storage import base
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class HBaseStorage(base.StorageEngine):
|
||||
"""Put the data into a HBase database
|
||||
|
||||
Collections:
|
||||
|
||||
- user
|
||||
- { _id: user id
|
||||
source: [ array of source ids reporting for the user ]
|
||||
}
|
||||
- project
|
||||
- { _id: project id
|
||||
source: [ array of source ids reporting for the project ]
|
||||
}
|
||||
- meter
|
||||
- the raw incoming data
|
||||
- resource
|
||||
- the metadata for resources
|
||||
- { _id: uuid of resource,
|
||||
metadata: metadata dictionaries
|
||||
timestamp: datetime of last update
|
||||
user_id: uuid
|
||||
project_id: uuid
|
||||
meter: [ array of {counter_name: string, counter_type: string} ]
|
||||
}
|
||||
"""
|
||||
|
||||
OPTIONS = [
|
||||
cfg.StrOpt('table_prefix',
|
||||
default='',
|
||||
help='Database table prefix',
|
||||
),
|
||||
]
|
||||
|
||||
def register_opts(self, conf):
|
||||
"""Register any configuration options used by this engine.
|
||||
"""
|
||||
conf.register_opts(self.OPTIONS)
|
||||
|
||||
@staticmethod
|
||||
def get_connection(conf):
|
||||
"""Return a Connection instance based on the configuration settings.
|
||||
"""
|
||||
return Connection(conf)
|
||||
|
||||
|
||||
class Connection(base.Connection):
|
||||
"""HBase connection.
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
'''
|
||||
Hbase Connection Initialization
|
||||
'''
|
||||
opts = self._parse_connection_url(conf.database_connection)
|
||||
opts['table_prefix'] = conf.table_prefix
|
||||
self.conn = self._get_connection(opts)
|
||||
self.conn.open()
|
||||
self.project = self.conn.table('project')
|
||||
self.user = self.conn.table('user')
|
||||
self.resource = self.conn.table('resource')
|
||||
self.meter = self.conn.table('meter')
|
||||
|
||||
def upgrade(self, version=None):
|
||||
pass
|
||||
|
||||
def clear(self):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def _get_connection(conf):
|
||||
"""Return a connection to the database.
|
||||
|
||||
.. note::
|
||||
|
||||
The tests use a subclass to override this and return an
|
||||
in-memory connection.
|
||||
"""
|
||||
LOG.debug('connecting to HBase on %s:%s', conf['host'], conf['port'])
|
||||
return happybase.Connection(host=conf['host'], port=conf['port'],
|
||||
table_prefix=conf['table_prefix'])
|
||||
|
||||
@staticmethod
|
||||
def _parse_connection_url(url):
|
||||
"""Parse connection parameters from a database url.
|
||||
|
||||
.. note::
|
||||
|
||||
HBase Thrift does not support authentication and there is no
|
||||
database name, so we are not looking for these in the url.
|
||||
"""
|
||||
opts = {}
|
||||
result = urlparse(url)
|
||||
opts['dbtype'] = result.scheme
|
||||
if ':' in result.netloc:
|
||||
opts['host'], port = result.netloc.split(':')
|
||||
else:
|
||||
opts['host'] = result.netloc
|
||||
port = 9090
|
||||
opts['port'] = port and int(port) or 9090
|
||||
return opts
|
||||
|
||||
def record_metering_data(self, data):
|
||||
"""Write the data to the backend storage system.
|
||||
|
||||
:param data: a dictionary such as returned by
|
||||
ceilometer.meter.meter_message_from_counter
|
||||
"""
|
||||
# Make sure we know about the user and project
|
||||
if data['user_id']:
|
||||
user = self.user.row(data['user_id'])
|
||||
sources = _load_hbase_list(user, 's')
|
||||
# Update if source is new
|
||||
if data['source'] not in sources:
|
||||
user['f:s_%s' % data['source']] = "1"
|
||||
self.user.put(data['user_id'], user)
|
||||
|
||||
project = self.project.row(data['project_id'])
|
||||
sources = _load_hbase_list(project, 's')
|
||||
# Update if source is new
|
||||
if data['source'] not in sources:
|
||||
project['f:s_%s' % data['source']] = "1"
|
||||
self.project.put(data['project_id'], project)
|
||||
|
||||
# Record the updated resource metadata.
|
||||
received_timestamp = timeutils.utcnow()
|
||||
|
||||
resource = self.resource.row(data['resource_id'])
|
||||
new_meter = "%s!%s!%s" % (
|
||||
data['counter_name'], data['counter_type'], data['counter_unit'])
|
||||
new_resource = {'f:resource_id': data['resource_id'],
|
||||
'f:project_id': data['project_id'],
|
||||
'f:user_id': data['user_id'],
|
||||
'f:timestamp': timeutils.strtime(data['timestamp']),
|
||||
'f:received_timestamp': timeutils.strtime(
|
||||
received_timestamp),
|
||||
'f:metadata': json.dumps(data['resource_metadata']),
|
||||
'f:source': data["source"],
|
||||
'f:m_%s' % new_meter: "1",
|
||||
}
|
||||
# Update if resource has new information
|
||||
if new_resource != resource:
|
||||
meters = _load_hbase_list(resource, 'm')
|
||||
if new_meter not in meters:
|
||||
new_resource['f:m_%s' % new_meter] = "1"
|
||||
|
||||
self.resource.put(data['resource_id'], new_resource)
|
||||
|
||||
# Rowkey consists of reversed timestamp, meter and an md5 of
|
||||
# user+resource+project for purposes of uniqueness
|
||||
m = hashlib.md5()
|
||||
m.update("%s%s%s" % (data['user_id'], data['resource_id'],
|
||||
data['project_id']))
|
||||
|
||||
# We use reverse timestamps in rowkeys as they are sorted
|
||||
# alphabetically.
|
||||
rts = reverse_timestamp(data['timestamp'])
|
||||
row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest())
|
||||
|
||||
# Convert timestamp to string as json.dumps won't
|
||||
ts = timeutils.strtime(data['timestamp'])
|
||||
|
||||
record = {'f:timestamp': ts,
|
||||
'f:counter_name': data['counter_name'],
|
||||
'f:counter_type': data['counter_type'],
|
||||
'f:counter_volume': str(data['counter_volume']),
|
||||
'f:counter_unit': data['counter_unit'],
|
||||
# TODO(shengjie) consider using QualifierFilter
|
||||
# keep dimensions as column qualifier for quicker look up
|
||||
# TODO(shengjie) extra dimensions need to be added as CQ
|
||||
'f:user_id': data['user_id'],
|
||||
'f:project_id': data['project_id'],
|
||||
'f:resource_id': data['resource_id'],
|
||||
'f:source': data['source'],
|
||||
# add in reversed_ts here for time range scan
|
||||
'f:rts': str(rts)
|
||||
}
|
||||
# Don't want to be changing the original data object
|
||||
data = copy.copy(data)
|
||||
data['timestamp'] = ts
|
||||
# Save original event
|
||||
record['f:message'] = json.dumps(data)
|
||||
self.meter.put(row, record)
|
||||
|
||||
def get_users(self, source=None):
|
||||
"""Return an iterable of user id strings.
|
||||
|
||||
:param source: Optional source filter.
|
||||
"""
|
||||
LOG.debug("source: %s" % source)
|
||||
scan_args = {}
|
||||
if source:
|
||||
scan_args['columns'] = ['f:s_%s' % source]
|
||||
return sorted(key for key, ignored in self.user.scan(**scan_args))
|
||||
|
||||
def get_projects(self, source=None):
|
||||
"""Return an iterable of project id strings.
|
||||
|
||||
:param source: Optional source filter.
|
||||
"""
|
||||
LOG.debug("source: %s" % source)
|
||||
scan_args = {}
|
||||
if source:
|
||||
scan_args['columns'] = ['f:s_%s' % source]
|
||||
return (key for key, ignored in self.project.scan(**scan_args))
|
||||
|
||||
def get_resources(self, user=None, project=None, source=None,
|
||||
start_timestamp=None, end_timestamp=None,
|
||||
metaquery={}):
|
||||
"""Return an iterable of dictionaries containing resource information.
|
||||
|
||||
:type end_timestamp: object
|
||||
{ 'resource_id': UUID of the resource,
|
||||
'project_id': UUID of project owning the resource,
|
||||
'user_id': UUID of user owning the resource,
|
||||
'timestamp': UTC datetime of last update to the resource,
|
||||
'metadata': most current metadata for the resource,
|
||||
'meter': list of the meters reporting data for the resource,
|
||||
}
|
||||
|
||||
:param user: Optional ID for user that owns the resource.
|
||||
:param project: Optional ID for project that owns the resource.
|
||||
:param source: Optional source filter.
|
||||
:param start_timestamp: Optional modified timestamp start range.
|
||||
:param end_timestamp: Optional modified timestamp end range.
|
||||
"""
|
||||
q, start_row, end_row = make_query(user=user,
|
||||
project=project,
|
||||
source=source,
|
||||
start=start_timestamp,
|
||||
end=end_timestamp,
|
||||
require_meter=False)
|
||||
LOG.debug("q: %s" % q)
|
||||
# TODO implement metaquery support
|
||||
if len(metaquery) > 0:
|
||||
raise NotImplementedError('metaquery not implemented')
|
||||
|
||||
resource_ids = {}
|
||||
if start_timestamp or end_timestamp:
|
||||
# Look for resources matching the above criteria and with
|
||||
# events in the time range we care about, then change the
|
||||
# resource query to return just those resources by id.
|
||||
g = self.meter.scan(filter=q, row_start=start_row,
|
||||
row_stop=end_row)
|
||||
for ignored, data in g:
|
||||
resource_ids[data['f:resource_id']] = data['f:resource_id']
|
||||
|
||||
q = make_query(user=user, project=project, source=source,
|
||||
query_only=True, require_meter=False)
|
||||
LOG.debug("q: %s" % q)
|
||||
for resource_id, data in self.resource.scan(filter=q):
|
||||
if not resource_ids or resource_id in resource_ids:
|
||||
r = {'resource_id': resource_id,
|
||||
'metadata': json.loads(data['f:metadata']),
|
||||
'project_id': data['f:project_id'],
|
||||
'received_timestamp': data['f:received_timestamp'],
|
||||
'source': data['f:source'],
|
||||
'timestamp':
|
||||
timeutils.parse_strtime(data['f:timestamp']),
|
||||
'user_id': data['f:user_id'],
|
||||
'meter': []}
|
||||
|
||||
for m in data:
|
||||
if m.startswith('f:m_'):
|
||||
name, type, unit = m[4:].split("!")
|
||||
r['meter'].append({"counter_name": name,
|
||||
"counter_type": type,
|
||||
"counter_unit": unit})
|
||||
|
||||
yield r
|
||||
|
||||
def get_meters(self, user=None, project=None, resource=None, source=None,
|
||||
metaquery={}):
|
||||
"""Return an iterable of dictionaries containing meter information.
|
||||
|
||||
{ 'name': name of the meter,
|
||||
'type': type of the meter (guage, counter),
|
||||
'unit': unit of the meter,
|
||||
'resource_id': UUID of the resource,
|
||||
'project_id': UUID of project owning the resource,
|
||||
'user_id': UUID of user owning the resource,
|
||||
}
|
||||
|
||||
:param user: Optional ID for user that owns the resource.
|
||||
:param project: Optional ID for project that owns the resource.
|
||||
:param resource: Optional resource filter.
|
||||
:param source: Optional source filter.
|
||||
:param metaquery: Optional dict with metadata to match on.
|
||||
"""
|
||||
q, ignored, ignored = make_query(user=user, project=project,
|
||||
resource=resource, source=source,
|
||||
require_meter=False)
|
||||
LOG.debug("q: %s" % q)
|
||||
# TODO implement metaquery support
|
||||
if len(metaquery) > 0:
|
||||
raise NotImplementedError('metaquery not implemented')
|
||||
|
||||
gen = self.resource.scan(filter=q)
|
||||
|
||||
for ignored, data in gen:
|
||||
# Meter columns are stored like this:
|
||||
# "m_{counter_name}|{counter_type}|{counter_unit}" => "1"
|
||||
# where 'm' is a prefix (m for meter), value is always set to 1
|
||||
meter = None
|
||||
for m in data:
|
||||
if m.startswith('f:m_'):
|
||||
meter = m
|
||||
break
|
||||
if meter is None:
|
||||
continue
|
||||
name, type, unit = meter[4:].split("!")
|
||||
m = {'name': name,
|
||||
'type': type,
|
||||
'unit': unit,
|
||||
'resource_id': data['f:resource_id'],
|
||||
'project_id': data['f:project_id'],
|
||||
'user_id': data['f:user_id'],
|
||||
}
|
||||
yield m
|
||||
|
||||
def get_raw_events(self, event_filter):
|
||||
"""Return an iterable of raw event data as created by
|
||||
:func:`ceilometer.meter.meter_message_from_counter`.
|
||||
"""
|
||||
q, start, stop = make_query_from_filter(event_filter,
|
||||
require_meter=False)
|
||||
LOG.debug("q: %s" % q)
|
||||
|
||||
gen = self.meter.scan(filter=q, row_start=start, row_stop=stop)
|
||||
meters = []
|
||||
for ignored, meter in gen:
|
||||
meter = json.loads(meter['f:message'])
|
||||
meter['timestamp'] = timeutils.parse_strtime(meter['timestamp'])
|
||||
meters.append(meter)
|
||||
return meters
|
||||
|
||||
def _update_meter_stats(self, stat, meter):
|
||||
"""Do the stats calculation on a requested time bucket in stats dict
|
||||
|
||||
:param stats: dict where aggregated stats are kept
|
||||
:param index: time bucket index in stats
|
||||
:param meter: meter record as returned from HBase
|
||||
:param start_time: query start time
|
||||
:param period: length of the time bucket
|
||||
"""
|
||||
vol = int(meter['f:counter_volume'])
|
||||
ts = timeutils.parse_strtime(meter['f:timestamp'])
|
||||
stat['min'] = min(vol, stat['min'] or vol)
|
||||
stat['max'] = max(vol, stat['max'])
|
||||
stat['sum'] = vol + (stat['sum'] or 0)
|
||||
stat['count'] += 1
|
||||
stat['avg'] = (stat['sum'] / float(stat['count']))
|
||||
stat['duration_start'] = min(ts, stat['duration_start'] or ts)
|
||||
stat['duration_end'] = max(ts, stat['duration_end'] or ts)
|
||||
stat['duration'] = \
|
||||
timeutils.delta_seconds(stat['duration_start'],
|
||||
stat['duration_end'])
|
||||
|
||||
def get_meter_statistics(self, event_filter, period=None):
|
||||
"""Return a dictionary containing meter statistics.
|
||||
described by the query parameters.
|
||||
|
||||
The filter must have a meter value set.
|
||||
|
||||
{ 'min':
|
||||
'max':
|
||||
'avg':
|
||||
'sum':
|
||||
'count':
|
||||
'period':
|
||||
'period_start':
|
||||
'period_end':
|
||||
'duration':
|
||||
'duration_start':
|
||||
'duration_end':
|
||||
}
|
||||
|
||||
.. note::
|
||||
|
||||
Due to HBase limitations the aggregations are implemented
|
||||
in the driver itself, therefore this method will be quite slow
|
||||
because of all the Thrift traffic it is going to create.
|
||||
"""
|
||||
q, start, stop = make_query_from_filter(event_filter)
|
||||
|
||||
meters = list(meter for (ignored, meter) in
|
||||
self.meter.scan(filter=q,
|
||||
row_start=start,
|
||||
row_stop=stop)
|
||||
)
|
||||
|
||||
start_time = event_filter.start \
|
||||
or timeutils.parse_strtime(meters[-1]['f:timestamp'])
|
||||
end_time = event_filter.end \
|
||||
or timeutils.parse_strtime(meters[0]['f:timestamp'])
|
||||
|
||||
results = []
|
||||
|
||||
if not period:
|
||||
period = 0
|
||||
period_start = start_time
|
||||
period_end = end_time
|
||||
|
||||
# As our HBase meters are stored as newest-first, we need to iterate
|
||||
# in the reverse order
|
||||
for meter in meters[::-1]:
|
||||
ts = timeutils.parse_strtime(meter['f:timestamp'])
|
||||
if period:
|
||||
offset = int(timeutils.delta_seconds(
|
||||
start_time, ts) / period) * period
|
||||
period_start = start_time + datetime.timedelta(0, offset)
|
||||
|
||||
if not len(results) or not results[-1]['period_start'] == \
|
||||
period_start:
|
||||
if period:
|
||||
period_end = period_start + datetime.timedelta(
|
||||
0, period)
|
||||
results.append({'count': 0,
|
||||
'min': 0,
|
||||
'max': 0,
|
||||
'avg': 0,
|
||||
'sum': 0,
|
||||
'period': period,
|
||||
'period_start': period_start,
|
||||
'period_end': period_end,
|
||||
'duration': None,
|
||||
'duration_start': None,
|
||||
'duration_end': None,
|
||||
})
|
||||
self._update_meter_stats(results[-1], meter)
|
||||
return list(results)
|
||||
|
||||
def get_volume_sum(self, event_filter):
|
||||
"""Return the sum of the volume field for the events
|
||||
described by the query parameters.
|
||||
"""
|
||||
q, start, stop = make_query_from_filter(event_filter)
|
||||
LOG.debug("q: %s" % q)
|
||||
gen = self.meter.scan(filter=q, row_start=start, row_stop=stop)
|
||||
results = defaultdict(int)
|
||||
for ignored, meter in gen:
|
||||
results[meter['f:resource_id']] \
|
||||
+= int(meter['f:counter_volume'])
|
||||
|
||||
return ({'resource_id': k, 'value': v}
|
||||
for (k, v) in results.iteritems())
|
||||
|
||||
def get_volume_max(self, event_filter):
|
||||
"""Return the maximum of the volume field for the events
|
||||
described by the query parameters.
|
||||
"""
|
||||
|
||||
q, start, stop = make_query_from_filter(event_filter)
|
||||
LOG.debug("q: %s" % q)
|
||||
gen = self.meter.scan(filter=q, row_start=start, row_stop=stop)
|
||||
results = defaultdict(int)
|
||||
for ignored, meter in gen:
|
||||
results[meter['f:resource_id']] = \
|
||||
max(results[meter['f:resource_id']],
|
||||
int(meter['f:counter_volume']))
|
||||
return ({'resource_id': k, 'value': v}
|
||||
for (k, v) in results.iteritems())
|
||||
|
||||
def get_event_interval(self, event_filter):
|
||||
"""Return the min and max timestamps from events,
|
||||
using the event_filter to limit the events seen.
|
||||
|
||||
( datetime.datetime(), datetime.datetime() )
|
||||
"""
|
||||
q, start, stop = make_query_from_filter(event_filter)
|
||||
LOG.debug("q: %s" % q)
|
||||
gen = self.meter.scan(filter=q, row_start=start, row_stop=stop)
|
||||
a_min = None
|
||||
a_max = None
|
||||
for ignored, meter in gen:
|
||||
timestamp = timeutils.parse_strtime(meter['f:timestamp'])
|
||||
if a_min is None:
|
||||
a_min = timestamp
|
||||
else:
|
||||
if timestamp < a_min:
|
||||
a_min = timestamp
|
||||
if a_max is None:
|
||||
a_max = timestamp
|
||||
else:
|
||||
if timestamp > a_max:
|
||||
a_max = timestamp
|
||||
|
||||
return a_min, a_max
|
||||
|
||||
|
||||
#################################################
|
||||
# Here be various HBase helpers
|
||||
def reverse_timestamp(dt):
|
||||
"""Reverse timestamp so that newer timestamps are represented by smaller
|
||||
numbers than older ones.
|
||||
|
||||
Reverse timestamps is a technique used in HBase rowkey design. When period
|
||||
queries are required the HBase rowkeys must include timestamps, but as
|
||||
rowkeys in HBase are ordered lexicographically, the timestamps must be
|
||||
reversed.
|
||||
"""
|
||||
epoch = datetime.datetime(1970, 1, 1)
|
||||
td = dt - epoch
|
||||
ts = (td.microseconds +
|
||||
(td.seconds + td.days * 24 * 3600) * 100000) / 100000
|
||||
return 0x7fffffffffffffff - ts
|
||||
|
||||
|
||||
def make_query(user=None, project=None, meter=None,
|
||||
resource=None, source=None, start=None, end=None,
|
||||
require_meter=True, query_only=False):
|
||||
"""Return a filter query based on the selected parameters.
|
||||
:param user: Optional user-id
|
||||
:param project: Optional project-id
|
||||
:param meter: Optional counter-name
|
||||
:param resource: Optional resource-id
|
||||
:param source: Optional source-id
|
||||
:param start: Optional start timestamp
|
||||
:param end: Optional end timestamp
|
||||
:param require_meter: If true and the filter does not have a meter,
|
||||
raise an error.
|
||||
:param query_only: If true only returns the filter query,
|
||||
otherwise also returns start and stop rowkeys
|
||||
"""
|
||||
q = []
|
||||
|
||||
if user:
|
||||
q.append("SingleColumnValueFilter ('f', 'user_id', =, 'binary:%s')"
|
||||
% user)
|
||||
if project:
|
||||
q.append("SingleColumnValueFilter ('f', 'project_id', =, 'binary:%s')"
|
||||
% project)
|
||||
if resource:
|
||||
q.append("SingleColumnValueFilter ('f', 'resource_id', =, 'binary:%s')"
|
||||
% resource)
|
||||
if source:
|
||||
q.append("SingleColumnValueFilter "
|
||||
"('f', 'source', =, 'binary:%s')" % source)
|
||||
# when start_time and end_time is provided,
|
||||
# if it's filtered by meter,
|
||||
# rowkey will be used in the query;
|
||||
# if it's non meter filter query(eg. project_id, user_id etc),
|
||||
# SingleColumnValueFilter against rts will be appended to the query
|
||||
# query other tables should have no start and end passed in
|
||||
stopRow, startRow = "", ""
|
||||
rts_start = str(reverse_timestamp(start) + 1) if start else ""
|
||||
rts_end = str(reverse_timestamp(end) + 1) if end else ""
|
||||
|
||||
if meter:
|
||||
# if it's meter filter without start and end,
|
||||
# startRow = meter while stopRow = meter + MAX_BYTE
|
||||
if not rts_start:
|
||||
rts_start = chr(127)
|
||||
stopRow = "%s_%s" % (meter, rts_start)
|
||||
startRow = "%s_%s" % (meter, rts_end)
|
||||
elif require_meter:
|
||||
raise RuntimeError('Missing required meter specifier')
|
||||
else:
|
||||
if rts_start:
|
||||
q.append("SingleColumnValueFilter ('f', 'rts', <=, 'binary:%s')" %
|
||||
rts_start)
|
||||
if rts_end:
|
||||
q.append("SingleColumnValueFilter ('f', 'rts', >=, 'binary:%s')" %
|
||||
rts_end)
|
||||
|
||||
query_filter = None
|
||||
if len(q):
|
||||
query_filter = " AND ".join(q)
|
||||
if query_only:
|
||||
return query_filter
|
||||
else:
|
||||
return query_filter, startRow, stopRow
|
||||
|
||||
|
||||
def make_query_from_filter(event_filter, require_meter=True):
|
||||
"""Return a query dictionary based on the settings in the filter.
|
||||
|
||||
:param filter: EventFilter instance
|
||||
:param require_meter: If true and the filter does not have a meter,
|
||||
raise an error.
|
||||
"""
|
||||
if event_filter.metaquery is not None and len(event_filter.metaquery) > 0:
|
||||
raise NotImplementedError('metaquery not implemented')
|
||||
|
||||
return make_query(event_filter.user, event_filter.project,
|
||||
event_filter.meter, event_filter.resource,
|
||||
event_filter.source, event_filter.start,
|
||||
event_filter.end, require_meter)
|
||||
|
||||
|
||||
def _load_hbase_list(d, prefix):
|
||||
"""Deserialise dict stored as HBase column family
|
||||
"""
|
||||
ret = []
|
||||
prefix = 'f:%s_' % prefix
|
||||
for key in (k for k in d if k.startswith(prefix)):
|
||||
ret.append(key[len(prefix):])
|
||||
return ret
|
1
setup.py
1
setup.py
@ -127,6 +127,7 @@ setuptools.setup(
|
||||
postgresql = ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage
|
||||
sqlite = ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage
|
||||
test = ceilometer.storage.impl_test:TestDBStorage
|
||||
hbase = ceilometer.storage.impl_hbase:HBaseStorage
|
||||
|
||||
[ceilometer.compute.virt]
|
||||
libvirt = ceilometer.compute.virt.libvirt.inspector:LibvirtInspector
|
||||
|
291
tests/storage/test_impl_hbase.py
Normal file
291
tests/storage/test_impl_hbase.py
Normal file
@ -0,0 +1,291 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Copyright © 2012, 2013 Dell Inc.
|
||||
#
|
||||
# Author: Stas Maksimov <Stanislav_M@dell.com>
|
||||
# Author: Shengjie Min <Shengjie_Min@dell.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Tests for ceilometer/storage/impl_hbase.py
|
||||
|
||||
.. note::
|
||||
To run the tests using in-memory mocked HappyBase API,
|
||||
set the environment variable CEILOMETER_TEST_LIVE=0 (this is the default
|
||||
value)
|
||||
|
||||
In order to run the tests against real HBase server set the environment
|
||||
variable CEILOMETER_TEST_LIVE=1 and set HBASE_URL below to
|
||||
point to that HBase instance before running the tests. Make sure the Thrift
|
||||
server is running on that server.
|
||||
|
||||
"""
|
||||
|
||||
from time import sleep
|
||||
import logging
|
||||
|
||||
import os
|
||||
import copy
|
||||
import re
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from tests.storage import base
|
||||
from ceilometer.storage import impl_hbase
|
||||
|
||||
from ceilometer.storage.impl_hbase import _load_hbase_list
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CEILOMETER_TEST_LIVE = bool(int(os.environ.get('CEILOMETER_TEST_LIVE', 0)))
|
||||
|
||||
# Export this variable before running tests against real HBase
|
||||
# e.g. export CEILOMETER_TEST_HBASE_URL = hbase://192.168.1.100:9090
|
||||
CEILOMETER_TEST_HBASE_URL = os.environ.get('CEILOMETER_TEST_HBASE_URL')
|
||||
if CEILOMETER_TEST_LIVE:
|
||||
if not CEILOMETER_TEST_HBASE_URL:
|
||||
raise RuntimeError("CEILOMETER_TEST_LIVE is on, but "
|
||||
"CEILOMETER_TEST_HBASE_URL is not defined")
|
||||
PROJECT_TABLE = "project"
|
||||
USER_TABLE = "user"
|
||||
RESOURCE_TABLE = "resource"
|
||||
METER_TABLE = "meter"
|
||||
|
||||
TABLES = [PROJECT_TABLE, USER_TABLE, RESOURCE_TABLE, METER_TABLE]
|
||||
|
||||
|
||||
class TestConnection(impl_hbase.Connection):
|
||||
|
||||
def __init__(self, conf):
|
||||
if CEILOMETER_TEST_LIVE:
|
||||
super(TestConnection, self).__init__(conf)
|
||||
else:
|
||||
self.conn = MConnection()
|
||||
self.project = self.conn.table('project')
|
||||
self.user = self.conn.table('user')
|
||||
self.resource = self.conn.table('resource')
|
||||
self.meter = self.conn.table('meter')
|
||||
|
||||
def create_schema(self):
|
||||
LOG.debug('Creating HBase schema...')
|
||||
self.conn.create_table(PROJECT_TABLE, {'f': dict()})
|
||||
self.conn.create_table(USER_TABLE, {'f': dict()})
|
||||
self.conn.create_table(RESOURCE_TABLE, {'f': dict()})
|
||||
self.conn.create_table(METER_TABLE, {'f': dict()})
|
||||
# Real HBase needs some time to propagate create_table changes
|
||||
if CEILOMETER_TEST_LIVE:
|
||||
sleep(10)
|
||||
|
||||
def drop_schema(self):
|
||||
LOG.debug('Dropping HBase schema...')
|
||||
for table in TABLES:
|
||||
try:
|
||||
self.conn.disable_table(table)
|
||||
except:
|
||||
None
|
||||
try:
|
||||
self.conn.delete_table(table)
|
||||
except:
|
||||
None
|
||||
# Real HBase needs some time to propagate delete_table changes
|
||||
if CEILOMETER_TEST_LIVE:
|
||||
sleep(10)
|
||||
|
||||
|
||||
class HBaseEngine(base.DBEngineBase):
|
||||
|
||||
def get_connection(self):
|
||||
self.conf = cfg.CONF
|
||||
|
||||
self.conf.database_connection = CEILOMETER_TEST_HBASE_URL
|
||||
# use prefix so we don't affect any existing tables
|
||||
self.conf.table_prefix = 't'
|
||||
|
||||
self.conn = TestConnection(self.conf)
|
||||
|
||||
self.conn.drop_schema()
|
||||
self.conn.create_schema()
|
||||
|
||||
self.conn.upgrade()
|
||||
return self.conn
|
||||
|
||||
def clean_up(self):
|
||||
pass
|
||||
|
||||
def get_sources_by_project_id(self, id):
|
||||
project = self.conn.project.row(id)
|
||||
return _load_hbase_list(project, 's')
|
||||
|
||||
def get_sources_by_user_id(self, id):
|
||||
user = self.conn.user.row(id)
|
||||
return _load_hbase_list(user, 's')
|
||||
|
||||
|
||||
class HBaseEngineTestBase(base.DBTestBase):
|
||||
|
||||
def get_engine(cls):
|
||||
return HBaseEngine()
|
||||
|
||||
|
||||
class UserTest(base.UserTest, HBaseEngineTestBase):
|
||||
pass
|
||||
|
||||
|
||||
class ProjectTest(base.ProjectTest, HBaseEngineTestBase):
|
||||
pass
|
||||
|
||||
|
||||
class ResourceTest(base.ResourceTest, HBaseEngineTestBase):
|
||||
pass
|
||||
|
||||
|
||||
class MeterTest(base.MeterTest, HBaseEngineTestBase):
|
||||
pass
|
||||
|
||||
|
||||
class RawEventTest(base.RawEventTest, HBaseEngineTestBase):
|
||||
pass
|
||||
|
||||
|
||||
class TestGetEventInterval(base.TestGetEventInterval,
|
||||
HBaseEngineTestBase):
|
||||
pass
|
||||
|
||||
|
||||
class SumTest(base.SumTest, HBaseEngineTestBase):
|
||||
pass
|
||||
|
||||
|
||||
class MaxProjectTest(base.MaxProjectTest, HBaseEngineTestBase):
|
||||
pass
|
||||
|
||||
|
||||
class MaxResourceTest(base.MaxResourceTest, HBaseEngineTestBase):
|
||||
pass
|
||||
|
||||
|
||||
class StatisticsTest(base.StatisticsTest, HBaseEngineTestBase):
|
||||
pass
|
||||
|
||||
|
||||
###############
|
||||
# This is a very crude version of "in-memory HBase", which implements just
|
||||
# enough functionality of HappyBase API to support testing of our driver.
|
||||
#
|
||||
class MTable():
|
||||
"""HappyBase.Table mock
|
||||
"""
|
||||
def __init__(self, name, families):
|
||||
self.name = name
|
||||
self.families = families
|
||||
self.rows = {}
|
||||
|
||||
def row(self, key):
|
||||
return self.rows[key] if key in self.rows else {}
|
||||
|
||||
def put(self, key, data):
|
||||
self.rows[key] = data
|
||||
|
||||
def scan(self, filter=None, columns=[], row_start=None, row_stop=None):
|
||||
sorted_keys = sorted(self.rows)
|
||||
# copy data into a sorted dict
|
||||
rows = {}
|
||||
for row in sorted_keys:
|
||||
if row_start:
|
||||
if row < row_start:
|
||||
continue
|
||||
if row_stop:
|
||||
if row > row_stop:
|
||||
break
|
||||
rows[row] = copy.copy(self.rows[row])
|
||||
if columns:
|
||||
ret = {}
|
||||
for row in rows.keys():
|
||||
data = rows[row]
|
||||
for key in data:
|
||||
# if all(key in columns for key in data):
|
||||
if key in columns:
|
||||
ret[row] = data
|
||||
rows = ret
|
||||
elif filter:
|
||||
# TODO: we should really parse this properly, but at the moment we
|
||||
# are only going to support AND here
|
||||
filters = filter.split('AND')
|
||||
for f in filters:
|
||||
# Extract filter name and its arguments
|
||||
g = re.search("(.*)\((.*),?\)", f)
|
||||
fname = g.group(1).strip()
|
||||
fargs = [s.strip().replace('\'', '').replace('\"', '')
|
||||
for s in g.group(2).split(',')]
|
||||
m = getattr(self, fname)
|
||||
if callable(m):
|
||||
# overwrite rows for filtering to take effect
|
||||
# in case of multiple filters
|
||||
rows = m(fargs, rows)
|
||||
else:
|
||||
raise NotImplementedError("%s filter is not implemented, "
|
||||
"you may want to add it!")
|
||||
for k in sorted(rows):
|
||||
yield k, rows[k]
|
||||
|
||||
def SingleColumnValueFilter(self, args, rows):
|
||||
"""This method is called from scan() when 'SingleColumnValueFilter'
|
||||
is found in the 'filter' argument
|
||||
"""
|
||||
op = args[2]
|
||||
column = "%s:%s" % (args[0], args[1])
|
||||
value = args[3]
|
||||
if value.startswith('binary:'):
|
||||
value = value[7:]
|
||||
r = {}
|
||||
for row in rows:
|
||||
data = rows[row]
|
||||
|
||||
if op == '=':
|
||||
if column in data and data[column] == value:
|
||||
r[row] = data
|
||||
elif op == '<=':
|
||||
if column in data and data[column] <= value:
|
||||
r[row] = data
|
||||
elif op == '>=':
|
||||
if column in data and data[column] >= value:
|
||||
r[row] = data
|
||||
else:
|
||||
raise NotImplementedError("In-memory "
|
||||
"SingleColumnValueFilter "
|
||||
"doesn't support the %s operation "
|
||||
"yet" % op)
|
||||
return r
|
||||
|
||||
|
||||
class MConnection():
|
||||
"""HappyBase.Connection mock
|
||||
"""
|
||||
def __init__(self):
|
||||
self.tables = {}
|
||||
|
||||
def open(self):
|
||||
LOG.debug("Opening in-memory HBase connection")
|
||||
return
|
||||
|
||||
def create_table(self, n, families={}):
|
||||
if n in self.tables:
|
||||
return self.tables[n]
|
||||
t = MTable(n, families)
|
||||
self.tables[n] = t
|
||||
return t
|
||||
|
||||
def delete_table(self, name, use_prefix=True):
|
||||
self.tables.remove(self.tables[name])
|
||||
|
||||
def table(self, name):
|
||||
return self.create_table(name)
|
@ -22,3 +22,4 @@ extras
|
||||
wsme>=0.5b1
|
||||
pyyaml
|
||||
http://tarballs.openstack.org/oslo-config/oslo-config-2013.1b4.tar.gz#egg=oslo-config
|
||||
happybase>=0.4
|
||||
|
Loading…
Reference in New Issue
Block a user