#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""HBase storage backend
"""
import datetime
import hashlib
import operator
import os
import time

import happybase
from six.moves.urllib import parse as urlparse

from ceilometer.alarm.storage import models as alarm_models
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
from ceilometer.openstack.common import network_utils
from ceilometer.openstack.common import timeutils
from ceilometer.storage import base
from ceilometer.storage.hbase import inmemory as hbase_inmemory
from ceilometer.storage.hbase import utils as hbase_utils
from ceilometer.storage import models
from ceilometer import utils

LOG = log.getLogger(__name__)


AVAILABLE_CAPABILITIES = {
    'meters': {'query': {'simple': True,
                         'metadata': True}},
    'resources': {'query': {'simple': True,
                            'metadata': True}},
    'samples': {'query': {'simple': True,
                          'metadata': True}},
    'statistics': {'query': {'simple': True,
                             'metadata': True},
                   'aggregation': {'standard': True}},
    'events': {'query': {'simple': True}},
    'alarms': {'query': {'simple': True,
                         'complex': False},
               'history': {'query': {'simple': True,
                                     'complex': False}}},
}


AVAILABLE_STORAGE_CAPABILITIES = {
    'storage': {'production_ready': True},
}


class Connection(base.Connection):
    """Put the data into a HBase database

    Collections:

    - meter (describes sample actually):

      - row-key: consists of reversed timestamp, meter and an md5 of
        user+resource+project for purposes of uniqueness
      - Column Families:

          f: contains the following qualifiers:

          - counter_name:
          - counter_type:
          - counter_unit:
          - counter_volume:
          - message:
          - message_id:
          - message_signature:
          - resource_metadata: raw metadata for corresponding resource
            of the meter
          - project_id:
          - resource_id:
          - user_id:
          - recorded_at:
          - flattened metadata with prefix r_metadata. e.g.::

             f:r_metadata.display_name or f:r_metadata.tag

          - rts:
          - timestamp:
          - source for meter with prefix 's'

    - resource:

      - row_key: uuid of resource
      - Column Families:

          f: contains the following qualifiers:

          - resource_metadata: raw metadata for corresponding resource
          - project_id:
          - resource_id:
          - user_id:
          - flattened metadata with prefix r_metadata. e.g.::

             f:r_metadata.display_name or f:r_metadata.tag

          - sources for all corresponding meters with prefix 's'
          - all meters for this resource in format:

            .. code-block:: python

              "%s+%s+%s!%s!%s" % (rts, source, counter_name, counter_type,
              counter_unit)

    - alarm:

      - row_key: uuid of alarm
      - Column Families:

          f: contains the raw incoming alarm data

    - alarm_h:

      - row_key: uuid of alarm + "_" + reversed timestamp
      - Column Families:

          f: raw incoming alarm_history data. Timestamp becomes now()
          if not determined

    - event:

      - row_key: timestamp of event's generation + uuid of event
        in format: "%d_%s" % (ts, Event.message_id)
      - Column Families:

          f: contains the following qualifiers:

          - event_type: description of event's type
          - timestamp: time stamp of event generation
          - all traits for this event in format:

            .. code-block:: python

              "%s+%d" % (trait_name, trait_type)
    """

    CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
                                       AVAILABLE_CAPABILITIES)
    STORAGE_CAPABILITIES = utils.update_nested(
        base.Connection.STORAGE_CAPABILITIES,
        AVAILABLE_STORAGE_CAPABILITIES,
    )
    # Shared in-memory connection pool, created lazily for unit tests.
    _memory_instance = None

    RESOURCE_TABLE = "resource"
    METER_TABLE = "meter"
    ALARM_TABLE = "alarm"
    ALARM_HISTORY_TABLE = "alarm_h"
    EVENT_TABLE = "event"

    def __init__(self, url):
        """Hbase Connection Initialization."""
        opts = self._parse_connection_url(url)

        if opts['host'] == '__test__':
            url = os.environ.get('CEILOMETER_TEST_HBASE_URL')
            if url:
                # Reparse URL, but from the env variable now
                opts = self._parse_connection_url(url)
                self.conn_pool = self._get_connection_pool(opts)
            else:
                # This is a in-memory usage for unit tests
                if Connection._memory_instance is None:
                    LOG.debug(_('Creating a new in-memory HBase '
                              'Connection object'))
                    Connection._memory_instance = \
                        hbase_inmemory.MConnectionPool()
                self.conn_pool = Connection._memory_instance
        else:
            self.conn_pool = self._get_connection_pool(opts)

    def upgrade(self):
        """Create the resource, meter, alarm, alarm_h and event tables."""
        with self.conn_pool.connection() as conn:
            conn.create_table(self.RESOURCE_TABLE, {'f': dict(max_versions=1)})
            conn.create_table(self.METER_TABLE, {'f': dict(max_versions=1)})
            conn.create_table(self.ALARM_TABLE, {'f': dict()})
            conn.create_table(self.ALARM_HISTORY_TABLE, {'f': dict()})
            conn.create_table(self.EVENT_TABLE, {'f': dict(max_versions=1)})

    def clear(self):
        """Disable and delete every table used by this driver.

        This is best-effort: a failure to disable or delete a table is
        logged at debug level and otherwise ignored.
        """
        LOG.debug(_('Dropping HBase schema...'))
        with self.conn_pool.connection() as conn:
            for table in [self.RESOURCE_TABLE,
                          self.METER_TABLE,
                          self.ALARM_TABLE,
                          self.ALARM_HISTORY_TABLE,
                          self.EVENT_TABLE]:
                try:
                    conn.disable_table(table)
                except Exception:
                    LOG.debug(_('Cannot disable table but ignoring error'))
                try:
                    conn.delete_table(table)
                except Exception:
                    LOG.debug(_('Cannot delete table but ignoring error'))

    @staticmethod
    def _get_connection_pool(conf):
        """Return a connection pool to the database.

        .. note::

          The tests use a subclass to override this and return an
          in-memory connection pool.

        :param conf: dict with 'host', 'port' and 'table_prefix' keys, as
                     produced by _parse_connection_url
        """
        LOG.debug(_('connecting to HBase on %(host)s:%(port)s') % (
                  {'host': conf['host'], 'port': conf['port']}))
        return happybase.ConnectionPool(size=100, host=conf['host'],
                                        port=conf['port'],
                                        table_prefix=conf['table_prefix'])

    @staticmethod
    def _parse_connection_url(url):
        """Parse connection parameters from a database url.

        .. note::

          HBase Thrift does not support authentication and there is no
          database name, so we are not looking for these in the url.

        :param url: connection url; an optional ``table_prefix`` query
                    argument is honoured
        :return: dict with 'table_prefix', 'dbtype', 'host' and 'port' keys
        """
        opts = {}
        result = network_utils.urlsplit(url)
        opts['table_prefix'] = urlparse.parse_qs(
            result.query).get('table_prefix', [None])[0]
        opts['dbtype'] = result.scheme
        if ':' in result.netloc:
            opts['host'], port = result.netloc.split(':')
        else:
            opts['host'] = result.netloc
            port = 9090
        # An empty or zero port falls back to the default 9090.
        port = int(port) if port else 0
        opts['port'] = port or 9090
        return opts

    def update_alarm(self, alarm):
        """Create or update an alarm.

        :param alarm: The alarm to store. It is Alarm object, so we need
                      to call as_dict()
        :return: Alarm model rebuilt from the row read back after the put
        """
        _id = alarm.alarm_id
        alarm_to_store = hbase_utils.serialize_entry(alarm.as_dict())
        with self.conn_pool.connection() as conn:
            alarm_table = conn.table(self.ALARM_TABLE)
            alarm_table.put(_id, alarm_to_store)
            stored_alarm = hbase_utils.deserialize_entry(
                alarm_table.row(_id))[0]
        return alarm_models.Alarm(**stored_alarm)

    create_alarm = update_alarm

    def delete_alarm(self, alarm_id):
        """Delete the alarm row stored under alarm_id."""
        with self.conn_pool.connection() as conn:
            alarm_table = conn.table(self.ALARM_TABLE)
            alarm_table.delete(alarm_id)

    def get_alarms(self, name=None, user=None, state=None, meter=None,
                   project=None, enabled=None, alarm_id=None, pagination=None):
        """Yield Alarm models matching the given filters.

        :raises NotImplementedError: if pagination or meter is supplied.
            As this is a generator, the error surfaces on first iteration.
        """
        if pagination:
            raise NotImplementedError('Pagination not implemented')
        if meter:
            raise NotImplementedError('Filter by meter not implemented')

        q = hbase_utils.make_query(alarm_id=alarm_id, name=name,
                                   enabled=enabled, user_id=user,
                                   project_id=project, state=state)

        with self.conn_pool.connection() as conn:
            alarm_table = conn.table(self.ALARM_TABLE)
            gen = alarm_table.scan(filter=q)
            for ignored, data in gen:
                stored_alarm = hbase_utils.deserialize_entry(data)[0]
                yield alarm_models.Alarm(**stored_alarm)

    def get_alarm_changes(self, alarm_id, on_behalf_of,
                          user=None, project=None, type=None,
                          start_timestamp=None, start_timestamp_op=None,
                          end_timestamp=None, end_timestamp_op=None):
        """Yield AlarmChange models for an alarm.

        The scan over the alarm history table is filtered by the given
        fields and bounded by row keys built from the optional start/end
        timestamps and their operators.
        """
        q = hbase_utils.make_query(alarm_id=alarm_id,
                                   on_behalf_of=on_behalf_of, type=type,
                                   user_id=user, project_id=project)
        start_row, end_row = hbase_utils.make_timestamp_query(
            hbase_utils.make_general_rowkey_scan,
            start=start_timestamp, start_op=start_timestamp_op,
            end=end_timestamp, end_op=end_timestamp_op, bounds_only=True,
            some_id=alarm_id)
        with self.conn_pool.connection() as conn:
            alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
            gen = alarm_history_table.scan(filter=q, row_start=start_row,
                                           row_stop=end_row)
            for ignored, data in gen:
                stored_entry = hbase_utils.deserialize_entry(data)[0]
                yield alarm_models.AlarmChange(**stored_entry)

    def record_alarm_change(self, alarm_change):
        """Record alarm change event.

        The row key is the alarm id, "_" and the value returned by
        hbase_utils.timestamp for the change's timestamp (now() when the
        change carries no timestamp).
        """
        alarm_change_dict = hbase_utils.serialize_entry(alarm_change)
        ts = alarm_change.get('timestamp') or datetime.datetime.now()
        rts = hbase_utils.timestamp(ts)
        with self.conn_pool.connection() as conn:
            alarm_history_table = conn.table(self.ALARM_HISTORY_TABLE)
            alarm_history_table.put(alarm_change.get('alarm_id') + "_" +
                                    str(rts), alarm_change_dict)

    def record_metering_data(self, data):
        """Write the data to the backend storage system.

        :param data: a dictionary such as returned by
          ceilometer.meter.meter_message_from_counter
        """
        with self.conn_pool.connection() as conn:
            resource_table = conn.table(self.RESOURCE_TABLE)
            meter_table = conn.table(self.METER_TABLE)

            resource_metadata = data.get('resource_metadata', {})
            # Determine the name of new meter
            rts = hbase_utils.timestamp(data['timestamp'])
            new_meter = hbase_utils.format_meter_reference(
                data['counter_name'], data['counter_type'],
                data['counter_unit'], rts, data['source'])

            # TODO(nprivalova): try not to store resource_id
            resource = hbase_utils.serialize_entry(**{
                'source': data['source'],
                'meter': {new_meter: data['timestamp']},
                'resource_metadata': resource_metadata,
                'resource_id': data['resource_id'],
                'project_id': data['project_id'], 'user_id': data['user_id']})
            # Here we put entry in HBase with our own timestamp. This is needed
            # when samples arrive out-of-order
            # If we use timestamp=data['timestamp'] the newest data will be
            # automatically 'on the top'. It is needed to keep metadata
            # up-to-date: metadata from newest samples is considered as actual.
            ts = int(time.mktime(data['timestamp'].timetuple()) * 1000)
            resource_table.put(data['resource_id'], resource, ts)

            # TODO(nprivalova): improve uniqueness
            # Rowkey consists of reversed timestamp, meter and an md5 of
            # user+resource+project for purposes of uniqueness
            uniq = "%s%s%s" % (data['user_id'], data['resource_id'],
                               data['project_id'])
            # hashlib digests operate on bytes: encode text explicitly so
            # that this works on Python 3 and with non-ASCII ids. The digest
            # is unchanged for ASCII ids.
            if not isinstance(uniq, bytes):
                uniq = uniq.encode('utf-8')
            m = hashlib.md5()
            m.update(uniq)
            row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest())
            record = hbase_utils.serialize_entry(
                data, **{'source': data['source'], 'rts': rts,
                         'message': data, 'recorded_at': timeutils.utcnow()})
            meter_table.put(row, record)

    def get_resources(self, user=None, project=None, source=None,
                      start_timestamp=None, start_timestamp_op=None,
                      end_timestamp=None, end_timestamp_op=None,
                      metaquery=None, resource=None, pagination=None):
        """Return an iterable of models.Resource instances

        :param user: Optional ID for user that owns the resource.
        :param project: Optional ID for project that owns the resource.
        :param source: Optional source filter.
        :param start_timestamp: Optional modified timestamp start range.
        :param start_timestamp_op: Optional start time operator, like ge, gt.
        :param end_timestamp: Optional modified timestamp end range.
        :param end_timestamp_op: Optional end time operator, like lt, le.
        :param metaquery: Optional dict with metadata to match on.
        :param resource: Optional resource filter.
        :param pagination: Optional pagination query.
        """
        if pagination:
            raise NotImplementedError('Pagination not implemented')

        q = hbase_utils.make_query(metaquery=metaquery, user_id=user,
                                   project_id=project,
                                   resource_id=resource, source=source)
        q = hbase_utils.make_meter_query_for_resource(start_timestamp,
                                                      start_timestamp_op,
                                                      end_timestamp,
                                                      end_timestamp_op,
                                                      source, q)
        with self.conn_pool.connection() as conn:
            resource_table = conn.table(self.RESOURCE_TABLE)
            LOG.debug(_("Query Resource table: %s") % q)
            for resource_id, data in resource_table.scan(filter=q):
                f_res, sources, meters, md = hbase_utils.deserialize_entry(
                    data)
                # Unfortunately happybase doesn't keep ordered result from
                # HBase. So that's why it's needed to find min and max
                # manually
                first_ts = min(meters, key=operator.itemgetter(1))[1]
                last_ts = max(meters, key=operator.itemgetter(1))[1]
                # Meter references look like "rts+source+name!type!unit";
                # a distinct local name keeps the ``source`` filter argument
                # intact.
                res_source = meters[0][0].split('+')[1]
                # If we use QualifierFilter then HBase returns only the
                # qualifiers filtered by. It will not return the whole entry.
                # That's why we need to ask for additional qualifiers
                # manually.
                if 'project_id' not in f_res and 'user_id' not in f_res:
                    row = resource_table.row(
                        resource_id, columns=['f:project_id', 'f:user_id',
                                              'f:resource_metadata'])
                    f_res, _s, _m, md = hbase_utils.deserialize_entry(row)
                yield models.Resource(
                    resource_id=resource_id,
                    first_sample_timestamp=first_ts,
                    last_sample_timestamp=last_ts,
                    project_id=f_res['project_id'],
                    source=res_source,
                    user_id=f_res['user_id'],
                    metadata=md)

    def get_meters(self, user=None, project=None, resource=None, source=None,
                   metaquery=None, pagination=None):
        """Return an iterable of models.Meter instances

        :param user: Optional ID for user that owns the resource.
        :param project: Optional ID for project that owns the resource.
        :param resource: Optional resource filter.
        :param source: Optional source filter.
        :param metaquery: Optional dict with metadata to match on.
        :param pagination: Optional pagination query.
        """

        metaquery = metaquery or {}

        if pagination:
            raise NotImplementedError(_('Pagination not implemented'))
        with self.conn_pool.connection() as conn:
            resource_table = conn.table(self.RESOURCE_TABLE)
            q = hbase_utils.make_query(metaquery=metaquery, user_id=user,
                                       project_id=project,
                                       resource_id=resource,
                                       source=source)
            LOG.debug(_("Query Resource table: %s") % q)

            gen = resource_table.scan(filter=q)
            # We need result set to be sure that user doesn't receive several
            # same meters. Please see bug
            # https://bugs.launchpad.net/ceilometer/+bug/1301371
            result = set()
            for ignored, data in gen:
                flatten_result, s, meters, md = hbase_utils.deserialize_entry(
                    data)
                for m in meters:
                    _m_rts, m_source, m_raw = m[0].split("+")
                    m_name, m_type, m_unit = m_raw.split('!')
                    meter_dict = {'name': m_name,
                                  'type': m_type,
                                  'unit': m_unit,
                                  'resource_id': flatten_result['resource_id'],
                                  'project_id': flatten_result['project_id'],
                                  'user_id': flatten_result['user_id']}
                    # The source is deliberately left out of the uniqueness
                    # key; it is added only after the duplicate check.
                    frozen_meter = frozenset(meter_dict.items())
                    if frozen_meter in result:
                        continue
                    result.add(frozen_meter)
                    meter_dict.update({'source': m_source
                                       if m_source else None})

                    yield models.Meter(**meter_dict)

    def get_samples(self, sample_filter, limit=None):
        """Return an iterable of models.Sample instances.

        :param sample_filter: Filter.
        :param limit: Maximum number of results to return.
        """
        if limit == 0:
            return
        with self.conn_pool.connection() as conn:
            meter_table = conn.table(self.METER_TABLE)
            q, start, stop, columns = \
                hbase_utils.make_sample_query_from_filter(
                    sample_filter, require_meter=False)
            LOG.debug(_("Query Meter Table: %s") % q)
            gen = meter_table.scan(filter=q, row_start=start, row_stop=stop,
                                   limit=limit)
            for ignored, meter in gen:
                d_meter = hbase_utils.deserialize_entry(meter)[0]
                d_meter['message']['recorded_at'] = d_meter['recorded_at']
                yield models.Sample(**d_meter['message'])

    @staticmethod
    def _update_meter_stats(stat, meter):
        """Fold one meter record into a statistics bucket, in place.

        :param stat: statistics object for the current time bucket; its
                     unit, min, max, sum, count, avg and duration fields
                     are updated
        :param meter: meter record as returned from HBase, providing
                      'counter_volume', 'timestamp' and 'counter_unit'
        """
        vol = meter['counter_volume']
        ts = meter['timestamp']
        stat.unit = meter['counter_unit']
        # A fresh bucket carries placeholder min/max values (0), so the first
        # sample must seed them. Comparing against the placeholders would
        # lose a real minimum of 0 and report a maximum of 0 for
        # all-negative volumes.
        is_first = not stat.count
        if is_first or stat.min is None:
            stat.min = vol
        else:
            stat.min = min(vol, stat.min)
        if is_first or stat.max is None:
            stat.max = vol
        else:
            stat.max = max(vol, stat.max)
        stat.sum = vol + (stat.sum or 0)
        stat.count += 1
        stat.avg = (stat.sum / float(stat.count))
        stat.duration_start = min(ts, stat.duration_start or ts)
        stat.duration_end = max(ts, stat.duration_end or ts)
        stat.duration = \
            timeutils.delta_seconds(stat.duration_start,
                                    stat.duration_end)

    def get_meter_statistics(self, sample_filter, period=None, groupby=None,
                             aggregate=None):
        """Return an iterable of models.Statistics instances containing meter
        statistics described by the query parameters.

        The filter must have a meter value set.

        .. note::

          Due to HBase limitations the aggregations are implemented
          in the driver itself, therefore this method will be quite slow
          because of all the Thrift traffic it is going to create.

        :raises NotImplementedError: if groupby or aggregate is supplied
        """
        if groupby:
            raise NotImplementedError("Group by not implemented.")

        if aggregate:
            raise NotImplementedError('Selectable aggregates not implemented')

        with self.conn_pool.connection() as conn:
            meter_table = conn.table(self.METER_TABLE)
            q, start, stop, columns = \
                hbase_utils.make_sample_query_from_filter(sample_filter)
            # These fields are used in statistics' calculating
            columns.extend(['f:timestamp', 'f:counter_volume',
                            'f:counter_unit'])
            # Build a real list: the result is indexed and sliced below,
            # which an iterator (``map`` on Python 3) does not support.
            meters = [hbase_utils.deserialize_entry(meter)
                      for ignored, meter in meter_table.scan(
                          filter=q, row_start=start,
                          row_stop=stop, columns=columns)]

        if sample_filter.start:
            start_time = sample_filter.start
        elif meters:
            start_time = meters[-1][0]['timestamp']
        else:
            start_time = None

        if sample_filter.end:
            end_time = sample_filter.end
        elif meters:
            end_time = meters[0][0]['timestamp']
        else:
            end_time = None

        results = []

        if not period:
            period = 0
            period_start = start_time
            period_end = end_time

        # As our HBase meters are stored as newest-first, we need to iterate
        # in the reverse order
        for meter in meters[::-1]:
            ts = meter[0]['timestamp']
            if period:
                offset = int(timeutils.delta_seconds(
                    start_time, ts) / period) * period
                period_start = start_time + datetime.timedelta(0, offset)

            # Open a new bucket whenever the period start changes.
            if not results or not results[-1].period_start == \
                    period_start:
                if period:
                    period_end = period_start + datetime.timedelta(
                        0, period)
                results.append(
                    models.Statistics(unit='',
                                      count=0,
                                      min=0,
                                      max=0,
                                      avg=0,
                                      sum=0,
                                      period=period,
                                      period_start=period_start,
                                      period_end=period_end,
                                      duration=None,
                                      duration_start=None,
                                      duration_end=None,
                                      groupby=None)
                )
            self._update_meter_stats(results[-1], meter[0])
        return results

    def record_events(self, event_models):
        """Write the events to Hbase.

        :param event_models: a list of models.Event objects.
        :return problem_events: a list of events that could not be saved in a
          (reason, event) tuple. From the reasons that are enumerated in
          storage.models.Event only the UNKNOWN_PROBLEM is applicable here.
        """
        problem_events = []

        with self.conn_pool.connection() as conn:
            events_table = conn.table(self.EVENT_TABLE)
            for event_model in event_models:
                # Row key consists of timestamp and message_id from
                # models.Event for purposes of storing events sorted by
                # timestamp in the database.
                ts = event_model.generated
                row = "%d_%s" % (hbase_utils.timestamp(ts, reverse=False),
                                 event_model.message_id)
                event_type = event_model.event_type
                traits = {}
                if event_model.traits:
                    for trait in event_model.traits:
                        key = "%s+%d" % (trait.name, trait.dtype)
                        traits[key] = trait.value
                record = hbase_utils.serialize_entry(traits,
                                                     event_type=event_type,
                                                     timestamp=ts)
                try:
                    events_table.put(row, record)
                except Exception as ex:
                    LOG.debug(_("Failed to record event: %s") % ex)
                    problem_events.append((models.Event.UNKNOWN_PROBLEM,
                                           event_model))
        return problem_events

    def get_events(self, event_filter):
        """Return an iterable of models.Event objects.

        :param event_filter: storage.EventFilter object, consists of filters
          for events that are stored in database.
        """
        q, start, stop = hbase_utils.make_events_query_from_filter(
            event_filter)
        events = []
        with self.conn_pool.connection() as conn:
            events_table = conn.table(self.EVENT_TABLE)

            gen = events_table.scan(filter=q, row_start=start, row_stop=stop)

            for event_id, data in gen:
                traits = []
                events_dict = hbase_utils.deserialize_entry(data)[0]
                for key, value in events_dict.items():
                    # Every qualifier other than event_type/timestamp is a
                    # trait stored as "<name>+<dtype>".
                    if not key.startswith(('event_type', 'timestamp')):
                        trait_name, trait_dtype = key.rsplit('+', 1)
                        traits.append(models.Trait(name=trait_name,
                                                   dtype=int(trait_dtype),
                                                   value=value))
                # Row key is "<timestamp>_<message_id>".
                _ts, mess = event_id.split('_', 1)

                events.append(models.Event(
                    message_id=mess,
                    event_type=events_dict['event_type'],
                    generated=events_dict['timestamp'],
                    traits=sorted(traits, key=operator.attrgetter('dtype'))
                ))
        return events

    def get_event_types(self):
        """Return all event types as an iterable of strings."""
        with self.conn_pool.connection() as conn:
            events_table = conn.table(self.EVENT_TABLE)
            gen = events_table.scan()

            event_types = set()
            for event_id, data in gen:
                events_dict = hbase_utils.deserialize_entry(data)[0]
                for key, value in events_dict.items():
                    if key.startswith('event_type'):
                        if value not in event_types:
                            event_types.add(value)
                            yield value

    def get_trait_types(self, event_type):
        """Yield a dictionary with the name and data type of each trait.

        Only trait types for the provided event_type are returned.

        :param event_type: the type of the Event
        """

        q = hbase_utils.make_query(event_type=event_type)
        trait_types = set()
        with self.conn_pool.connection() as conn:
            events_table = conn.table(self.EVENT_TABLE)
            gen = events_table.scan(filter=q)
            for event_id, data in gen:
                events_dict = hbase_utils.deserialize_entry(data)[0]
                for key, value in events_dict.items():
                    if not key.startswith(('event_type', 'timestamp')):
                        name, tt_number = key.rsplit('+', 1)
                        if name not in trait_types:
                            # Here we check that our method return only unique
                            # trait types, for ex. if it is found the same
                            # trait types in different events with equal
                            # event_type, method will return only one trait
                            # type. It is proposed that certain trait name
                            # could have only one trait type.
                            trait_types.add(name)
                            data_type = models.Trait.type_names[
                                int(tt_number)]
                            yield {'name': name, 'data_type': data_type}

    def get_traits(self, event_type, trait_type=None):
        """Return all trait instances associated with an event_type.

        If trait_type is specified, only return instances of that trait type.

        :param event_type: the type of the Event to filter by
        :param trait_type: the name of the Trait to filter by
        """
        q = hbase_utils.make_query(event_type=event_type,
                                   trait_type=trait_type)
        traits = []
        with self.conn_pool.connection() as conn:
            events_table = conn.table(self.EVENT_TABLE)
            gen = events_table.scan(filter=q)
            for event_id, data in gen:
                events_dict = hbase_utils.deserialize_entry(data)[0]
                for key, value in events_dict.items():
                    if not key.startswith(('event_type', 'timestamp')):
                        name, tt_number = key.rsplit('+', 1)
                        traits.append(models.Trait(name=name,
                                                   dtype=int(tt_number),
                                                   value=value))
        for trait in sorted(traits, key=operator.attrgetter('dtype')):
            yield trait