15a92ec1ef
We can currently disable computing the signature for metering messages, but HBase still uses the signature as a dimension to generate a unique row key; this leads to different samples being treated as the same one. The message id should be used instead of the message signature, because the id always exists and is also unique. Change-Id: I47cdce59c9934573076cf609ce1a0c37aea75c44 Closes-Bug: #1445227
423 lines
18 KiB
Python
423 lines
18 KiB
Python
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import datetime
|
|
import operator
|
|
import time
|
|
|
|
from oslo_utils import timeutils
|
|
|
|
import ceilometer
|
|
from ceilometer.i18n import _
|
|
from ceilometer.openstack.common import log
|
|
from ceilometer.storage import base
|
|
from ceilometer.storage.hbase import base as hbase_base
|
|
from ceilometer.storage.hbase import migration as hbase_migration
|
|
from ceilometer.storage.hbase import utils as hbase_utils
|
|
from ceilometer.storage import models
|
|
from ceilometer import utils
|
|
|
|
# Module-level logger, named after this module.
LOG = log.getLogger(__name__)
|
|
|
|
|
|
# Query/aggregation features this driver declares; merged into the base
# capability map by the Connection class below.
AVAILABLE_CAPABILITIES = dict(
    meters=dict(query=dict(simple=True, metadata=True)),
    resources=dict(query=dict(simple=True, metadata=True)),
    samples=dict(query=dict(simple=True, metadata=True)),
    statistics=dict(query=dict(simple=True, metadata=True),
                    aggregation=dict(standard=True)),
)


# Storage-level features this driver declares.
AVAILABLE_STORAGE_CAPABILITIES = dict(
    storage=dict(production_ready=True),
)
|
|
|
|
|
|
class Connection(hbase_base.Connection, base.Connection):
    """Put the metering data into a HBase database.

    Collections:

    - meter (describes sample actually):

      - row-key: built from the meter name, the reversed timestamp and
        the message id (the id is what keeps row keys unique)
      - Column Families:

        f: contains the following qualifiers:

          - counter_name: <name of counter>
          - counter_type: <type of counter>
          - counter_unit: <unit of counter>
          - counter_volume: <volume of counter>
          - message: <raw incoming data>
          - message_id: <id of message>
          - message_signature: <signature of message>
          - resource_metadata: raw metadata for corresponding resource
            of the meter
          - project_id: <id of project>
          - resource_id: <id of resource>
          - user_id: <id of user>
          - recorded_at: <datetime when sample has been recorded (utc.now)>
          - flattened metadata with prefix r_metadata. e.g.::

             f:r_metadata.display_name or f:r_metadata.tag

          - rts: <reversed timestamp of entry>
          - timestamp: <meter's timestamp (came from message)>
          - source for meter with prefix 's'

    - resource:

      - row_key: id of the resource
      - Column Families:

        f: contains the following qualifiers:

          - resource_metadata: raw metadata for corresponding resource
          - project_id: <id of project>
          - resource_id: <id of resource>
          - user_id: <id of user>
          - flattened metadata with prefix r_metadata. e.g.::

             f:r_metadata.display_name or f:r_metadata.tag

          - sources for all corresponding meters with prefix 's'
          - all meters with prefix 'm' for this resource in format:

            .. code-block:: python

                  "%s:%s:%s:%s:%s" % (rts, source, counter_name, counter_type,
                                      counter_unit)
    """

    # Base capabilities overlaid with the ones this driver declares.
    CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
                                       AVAILABLE_CAPABILITIES)
    STORAGE_CAPABILITIES = utils.update_nested(
        base.Connection.STORAGE_CAPABILITIES,
        AVAILABLE_STORAGE_CAPABILITIES,
    )
    _memory_instance = None

    # Names of the two HBase tables used by this driver.
    RESOURCE_TABLE = "resource"
    METER_TABLE = "meter"

    def __init__(self, url):
        """Initialise the connection by delegating to the parent classes.

        :param url: connection url, passed through unchanged.
        """
        super(Connection, self).__init__(url)

    def upgrade(self):
        """Create the resource and meter tables and run table migrations.

        Both tables get a single column family ``f`` keeping one version
        per cell.
        """
        tables = [self.RESOURCE_TABLE, self.METER_TABLE]
        column_families = {'f': dict(max_versions=1)}
        with self.conn_pool.connection() as conn:
            hbase_utils.create_tables(conn, tables, column_families)
            hbase_migration.migrate_tables(conn, tables)

    def clear(self):
        """Disable and delete the resource and meter tables (best effort).

        Failures of either step are logged at debug level and ignored, so
        the remaining steps and tables are still attempted.
        """
        LOG.debug(_('Dropping HBase schema...'))
        with self.conn_pool.connection() as conn:
            for table in (self.RESOURCE_TABLE, self.METER_TABLE):
                try:
                    conn.disable_table(table)
                except Exception:
                    LOG.debug(_('Cannot disable table but ignoring error'))
                try:
                    conn.delete_table(table)
                except Exception:
                    LOG.debug(_('Cannot delete table but ignoring error'))

    def record_metering_data(self, data):
        """Write the data to the backend storage system.

        One put updates the resource row (keyed by the resource id) and
        one put stores the sample itself in the meter table.

        :param data: a dictionary such as returned by
                     ceilometer.meter.meter_message_from_counter
        """
        with self.conn_pool.connection() as conn:
            resource_table = conn.table(self.RESOURCE_TABLE)
            meter_table = conn.table(self.METER_TABLE)

            resource_metadata = data.get('resource_metadata', {})
            # Determine the name of new meter
            rts = hbase_utils.timestamp(data['timestamp'])
            new_meter = hbase_utils.prepare_key(
                rts, data['source'], data['counter_name'],
                data['counter_type'], data['counter_unit'])

            # TODO(nprivalova): try not to store resource_id
            resource = hbase_utils.serialize_entry(
                source=data['source'],
                meter={new_meter: data['timestamp']},
                resource_metadata=resource_metadata,
                resource_id=data['resource_id'],
                project_id=data['project_id'],
                user_id=data['user_id'])
            # Here we put entry in HBase with our own timestamp. This is
            # needed when samples arrive out-of-order.
            # If we use timestamp=data['timestamp'] the newest data will be
            # automatically 'on the top'. It is needed to keep metadata
            # up-to-date: metadata from newest samples is considered as
            # actual.
            # NOTE(review): time.mktime() interprets the time tuple as
            # local time - confirm that is intended for these timestamps.
            ts = int(time.mktime(data['timestamp'].timetuple()) * 1000)
            resource_table.put(
                hbase_utils.encode_unicode(data['resource_id']),
                resource, ts)

            # Rowkey is built from the meter name, the reversed timestamp
            # and the message id; the id is there for uniqueness.
            row = hbase_utils.prepare_key(data['counter_name'], rts,
                                          data['message_id'])
            record = hbase_utils.serialize_entry(
                data,
                source=data['source'],
                rts=rts,
                message=data,
                recorded_at=timeutils.utcnow())
            meter_table.put(row, record)

    def get_resources(self, user=None, project=None, source=None,
                      start_timestamp=None, start_timestamp_op=None,
                      end_timestamp=None, end_timestamp_op=None,
                      metaquery=None, resource=None, pagination=None):
        """Return an iterable of models.Resource instances

        :param user: Optional ID for user that owns the resource.
        :param project: Optional ID for project that owns the resource.
        :param source: Optional source filter.
        :param start_timestamp: Optional modified timestamp start range.
        :param start_timestamp_op: Optional start time operator, like ge, gt.
        :param end_timestamp: Optional modified timestamp end range.
        :param end_timestamp_op: Optional end time operator, like lt, le.
        :param metaquery: Optional dict with metadata to match on.
        :param resource: Optional resource filter.
        :param pagination: Optional pagination query.
        :raises ceilometer.NotImplementedError: if pagination is requested.
        """
        if pagination:
            raise ceilometer.NotImplementedError('Pagination not implemented')

        q = hbase_utils.make_query(metaquery=metaquery, user_id=user,
                                   project_id=project,
                                   resource_id=resource, source=source)
        q = hbase_utils.make_meter_query_for_resource(start_timestamp,
                                                      start_timestamp_op,
                                                      end_timestamp,
                                                      end_timestamp_op,
                                                      source, q)
        by_timestamp = operator.itemgetter(1)
        with self.conn_pool.connection() as conn:
            resource_table = conn.table(self.RESOURCE_TABLE)
            LOG.debug(_("Query Resource table: %s") % q)
            for resource_id, data in resource_table.scan(filter=q):
                f_res, _sources, meters, md = hbase_utils.deserialize_entry(
                    data)
                resource_id = hbase_utils.encode_unicode(resource_id)
                # Unfortunately happybase doesn't keep ordered result from
                # HBase. So that's why it's needed to find min and max
                # manually
                first_ts = min(meters, key=by_timestamp)[1]
                last_ts = max(meters, key=by_timestamp)[1]
                # Source is taken from the first meter of the row. A
                # separate local is used so the ``source`` filter argument
                # is not rebound inside the loop.
                meter_source = meters[0][0][1]
                # If we use QualifierFilter then HBase returns only the
                # qualifiers filtered by. It will not return the whole
                # entry, so the remaining qualifiers are fetched manually.
                if 'project_id' not in f_res and 'user_id' not in f_res:
                    row = resource_table.row(
                        resource_id, columns=['f:project_id', 'f:user_id',
                                              'f:resource_metadata'])
                    f_res, _s, _m, md = hbase_utils.deserialize_entry(row)
                yield models.Resource(
                    resource_id=resource_id,
                    first_sample_timestamp=first_ts,
                    last_sample_timestamp=last_ts,
                    project_id=f_res['project_id'],
                    source=meter_source,
                    user_id=f_res['user_id'],
                    metadata=md)

    def get_meters(self, user=None, project=None, resource=None, source=None,
                   metaquery=None, pagination=None):
        """Return an iterable of models.Meter instances

        :param user: Optional ID for user that owns the resource.
        :param project: Optional ID for project that owns the resource.
        :param resource: Optional resource filter.
        :param source: Optional source filter.
        :param metaquery: Optional dict with metadata to match on.
        :param pagination: Optional pagination query.
        :raises ceilometer.NotImplementedError: if pagination is requested.
        """

        metaquery = metaquery or {}

        if pagination:
            raise ceilometer.NotImplementedError(
                _('Pagination not implemented'))
        with self.conn_pool.connection() as conn:
            resource_table = conn.table(self.RESOURCE_TABLE)
            q = hbase_utils.make_query(metaquery=metaquery, user_id=user,
                                       project_id=project,
                                       resource_id=resource,
                                       source=source)
            LOG.debug(_("Query Resource table: %s") % q)

            gen = resource_table.scan(filter=q)
            # We need result set to be sure that user doesn't receive several
            # same meters. Please see bug
            # https://bugs.launchpad.net/ceilometer/+bug/1301371
            result = set()
            for ignored, data in gen:
                flatten_result, _s, meters, _md = (
                    hbase_utils.deserialize_entry(data))
                for m in meters:
                    _m_rts, m_source, name, m_type, unit = m[0]
                    meter_dict = {'name': name,
                                  'type': m_type,
                                  'unit': unit,
                                  'resource_id': flatten_result['resource_id'],
                                  'project_id': flatten_result['project_id'],
                                  'user_id': flatten_result['user_id']}
                    # Source is deliberately left out of the dedup key.
                    frozen_meter = frozenset(meter_dict.items())
                    if frozen_meter in result:
                        continue
                    result.add(frozen_meter)
                    # A falsy source is reported as None.
                    meter_dict['source'] = m_source or None

                    yield models.Meter(**meter_dict)

    def get_samples(self, sample_filter, limit=None):
        """Return an iterable of models.Sample instances.

        :param sample_filter: Filter.
        :param limit: Maximum number of results to return. A limit of 0
                      yields nothing and does not query the table.
        """
        if limit == 0:
            return
        with self.conn_pool.connection() as conn:
            meter_table = conn.table(self.METER_TABLE)
            q, start, stop, columns = (
                hbase_utils.make_sample_query_from_filter(
                    sample_filter, require_meter=False))
            LOG.debug(_("Query Meter Table: %s") % q)
            gen = meter_table.scan(filter=q, row_start=start, row_stop=stop,
                                   limit=limit, columns=columns)
            for ignored, meter in gen:
                d_meter = hbase_utils.deserialize_entry(meter)[0]
                # The sample is rebuilt from the stored raw message, with
                # the separately stored recorded_at added to it.
                d_meter['message']['recorded_at'] = d_meter['recorded_at']
                yield models.Sample(**d_meter['message'])

    @staticmethod
    def _update_meter_stats(stat, meter):
        """Fold one meter record into an aggregated statistics object.

        :param stat: statistics object updated in place; its ``count``
                     must be 0 (falsy) before the first sample is added.
        :param meter: meter record providing ``counter_volume``,
                      ``timestamp`` and ``counter_unit``.
        """
        vol = meter['counter_volume']
        ts = meter['timestamp']
        stat.unit = meter['counter_unit']
        if stat.count:
            stat.min = min(vol, stat.min)
            stat.max = max(vol, stat.max)
        else:
            # First sample of the bucket: the placeholder min/max must not
            # take part in the comparison, otherwise a real minimum of 0
            # is lost and negative maxima are never reported.
            stat.min = stat.max = vol
        stat.sum = vol + (stat.sum or 0)
        stat.count += 1
        stat.avg = (stat.sum / float(stat.count))
        stat.duration_start = min(ts, stat.duration_start or ts)
        stat.duration_end = max(ts, stat.duration_end or ts)
        stat.duration = (timeutils.delta_seconds(stat.duration_start,
                                                 stat.duration_end))

    def get_meter_statistics(self, sample_filter, period=None, groupby=None,
                             aggregate=None):
        """Return an iterable of models.Statistics instances.

        Items are containing meter statistics described by the query
        parameters. The filter must have a meter value set.

        .. note::

          Due to HBase limitations the aggregations are implemented
          in the driver itself, therefore this method will be quite slow
          because of all the Thrift traffic it is going to create.

        :param sample_filter: Filter selecting the samples to aggregate.
        :param period: Optional length of a time bucket in seconds; when
                       not set a single bucket covers the whole range.
        :param groupby: Not supported.
        :param aggregate: Not supported.
        :raises ceilometer.NotImplementedError: if groupby or aggregate
                                                is requested.
        """
        if groupby:
            raise ceilometer.NotImplementedError("Group by not implemented.")

        if aggregate:
            raise ceilometer.NotImplementedError(
                'Selectable aggregates not implemented')

        with self.conn_pool.connection() as conn:
            meter_table = conn.table(self.METER_TABLE)
            q, start, stop, columns = (
                hbase_utils.make_sample_query_from_filter(sample_filter))
            # These fields are used in statistics' calculating
            columns.extend(['f:timestamp', 'f:counter_volume',
                            'f:counter_unit'])
            # Materialize as a real list: the result is indexed, reversed
            # and truth-tested below, which a lazy map object cannot do.
            meters = [hbase_utils.deserialize_entry(meter)
                      for ignored, meter in meter_table.scan(
                          filter=q, row_start=start,
                          row_stop=stop, columns=columns)]

        if sample_filter.start_timestamp:
            start_time = sample_filter.start_timestamp
        elif meters:
            start_time = meters[-1][0]['timestamp']
        else:
            start_time = None

        if sample_filter.end_timestamp:
            end_time = sample_filter.end_timestamp
        elif meters:
            end_time = meters[0][0]['timestamp']
        else:
            end_time = None

        results = []

        if not period:
            period = 0
            period_start = start_time
            period_end = end_time

        # As our HBase meters are stored as newest-first, we need to iterate
        # in the reverse order
        for meter in reversed(meters):
            ts = meter[0]['timestamp']
            if period:
                # Start of the bucket this sample falls into.
                offset = int(timeutils.delta_seconds(
                    start_time, ts) / period) * period
                period_start = start_time + datetime.timedelta(0, offset)

            # Open a new bucket whenever the bucket start changes.
            if not results or results[-1].period_start != period_start:
                if period:
                    period_end = period_start + datetime.timedelta(
                        0, period)
                results.append(
                    models.Statistics(unit='',
                                      count=0,
                                      min=0,
                                      max=0,
                                      avg=0,
                                      sum=0,
                                      period=period,
                                      period_start=period_start,
                                      period_end=period_end,
                                      duration=None,
                                      duration_start=None,
                                      duration_end=None,
                                      groupby=None)
                )
            self._update_meter_stats(results[-1], meter[0])
        return results
|