# # Author: John Tran # Julien Danjou # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """SQLAlchemy storage backend.""" from __future__ import absolute_import import datetime import hashlib import os from oslo.config import cfg from oslo.db import exception as dbexc from oslo.db.sqlalchemy import session as db_session from oslo.serialization import jsonutils from oslo.utils import timeutils import six import sqlalchemy as sa from sqlalchemy import and_ from sqlalchemy import distinct from sqlalchemy import func from sqlalchemy.orm import aliased import ceilometer from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log from ceilometer import storage from ceilometer.storage import base from ceilometer.storage import models as api_models from ceilometer.storage.sqlalchemy import models from ceilometer.storage.sqlalchemy import utils as sql_utils from ceilometer import utils LOG = log.getLogger(__name__) STANDARD_AGGREGATES = dict( avg=func.avg(models.Sample.volume).label('avg'), sum=func.sum(models.Sample.volume).label('sum'), min=func.min(models.Sample.volume).label('min'), max=func.max(models.Sample.volume).label('max'), count=func.count(models.Sample.volume).label('count') ) UNPARAMETERIZED_AGGREGATES = dict( stddev=func.stddev_pop(models.Sample.volume).label('stddev') ) PARAMETERIZED_AGGREGATES = dict( validate=dict( cardinality=lambda p: p in ['resource_id', 'user_id', 'project_id'] ), compute=dict( cardinality=lambda p: func.count( distinct(getattr(models.Resource, p)) ).label('cardinality/%s' % p) ) ) AVAILABLE_CAPABILITIES = { 'meters': {'query': {'simple': True, 'metadata': True}}, 'resources': {'query': {'simple': True, 'metadata': True}}, 'samples': {'pagination': True, 'groupby': True, 'query': {'simple': True, 'metadata': True, 'complex': True}}, 'statistics': {'groupby': True, 'query': {'simple': True, 'metadata': True}, 'aggregation': {'standard': True, 'selectable': { 'max': True, 'min': True, 'sum': True, 'avg': True, 'count': True, 'stddev': True, 'cardinality': True}} }, } AVAILABLE_STORAGE_CAPABILITIES = { 'storage': {'production_ready': True}, } def apply_metaquery_filter(session, query, metaquery): """Apply provided metaquery filter to existing query. :param session: session used for original query :param query: Query instance :param metaquery: dict with metadata to match on. """ for k, value in six.iteritems(metaquery): key = k[9:] # strip out 'metadata.' prefix try: _model = sql_utils.META_TYPE_MAP[type(value)] except KeyError: raise ceilometer.NotImplementedError( 'Query on %(key)s is of %(value)s ' 'type and is not supported' % {"key": k, "value": type(value)}) else: meta_alias = aliased(_model) on_clause = and_(models.Resource.internal_id == meta_alias.id, meta_alias.meta_key == key) # outer join is needed to support metaquery # with or operator on non existent metadata field # see: test_query_non_existing_metadata_with_result # test case. query = query.outerjoin(meta_alias, on_clause) query = query.filter(meta_alias.value == value) return query def make_query_from_filter(session, query, sample_filter, require_meter=True): """Return a query dictionary based on the settings in the filter. :param session: session used for original query :param query: Query instance :param sample_filter: SampleFilter instance :param require_meter: If true and the filter does not have a meter, raise an error. """ if sample_filter.meter: query = query.filter(models.Meter.name == sample_filter.meter) elif require_meter: raise RuntimeError('Missing required meter specifier') if sample_filter.source: query = query.filter( models.Resource.source_id == sample_filter.source) if sample_filter.start: ts_start = sample_filter.start if sample_filter.start_timestamp_op == 'gt': query = query.filter(models.Sample.timestamp > ts_start) else: query = query.filter(models.Sample.timestamp >= ts_start) if sample_filter.end: ts_end = sample_filter.end if sample_filter.end_timestamp_op == 'le': query = query.filter(models.Sample.timestamp <= ts_end) else: query = query.filter(models.Sample.timestamp < ts_end) if sample_filter.user: query = query.filter(models.Resource.user_id == sample_filter.user) if sample_filter.project: query = query.filter( models.Resource.project_id == sample_filter.project) if sample_filter.resource: query = query.filter( models.Resource.resource_id == sample_filter.resource) if sample_filter.message_id: query = query.filter( models.Sample.message_id == sample_filter.message_id) if sample_filter.metaquery: query = apply_metaquery_filter(session, query, sample_filter.metaquery) return query class Connection(base.Connection): """Put the data into a SQLAlchemy database. Tables:: - meter - meter definition - { id: meter id name: meter name type: meter type unit: meter unit } - resource - resource definition - { internal_id: resource id resource_id: resource uuid user_id: user uuid project_id: project uuid source_id: source id resource_metadata: metadata dictionary metadata_hash: metadata dictionary hash } - sample - the raw incoming data - { id: sample id meter_id: meter id (->meter.id) resource_id: resource id (->resource.internal_id) volume: sample volume timestamp: datetime recorded_at: datetime message_signature: message signature message_id: message uuid } """ CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES, AVAILABLE_CAPABILITIES) STORAGE_CAPABILITIES = utils.update_nested( base.Connection.STORAGE_CAPABILITIES, AVAILABLE_STORAGE_CAPABILITIES, ) def __init__(self, url): self._engine_facade = db_session.EngineFacade( url, **dict(cfg.CONF.database.items()) ) def upgrade(self): # NOTE(gordc): to minimise memory, only import migration when needed from oslo.db.sqlalchemy import migration path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'sqlalchemy', 'migrate_repo') migration.db_sync(self._engine_facade.get_engine(), path) def clear(self): engine = self._engine_facade.get_engine() for table in reversed(models.Base.metadata.sorted_tables): engine.execute(table.delete()) self._engine_facade._session_maker.close_all() engine.dispose() @staticmethod def _create_meter(conn, name, type, unit): # TODO(gordc): implement lru_cache to improve performance try: meter = models.Meter.__table__ trans = conn.begin_nested() if conn.dialect.name == 'sqlite': trans = conn.begin() with trans: meter_row = conn.execute( sa.select([meter.c.id]) .where(sa.and_(meter.c.name == name, meter.c.type == type, meter.c.unit == unit))).first() meter_id = meter_row[0] if meter_row else None if meter_id is None: result = conn.execute(meter.insert(), name=name, type=type, unit=unit) meter_id = result.inserted_primary_key[0] except dbexc.DBDuplicateEntry: # retry function to pick up duplicate committed object meter_id = Connection._create_meter(conn, name, type, unit) return meter_id @staticmethod def _create_resource(conn, res_id, user_id, project_id, source_id, rmeta): # TODO(gordc): implement lru_cache to improve performance try: res = models.Resource.__table__ m_hash = hashlib.md5(jsonutils.dumps(rmeta, sort_keys=True)).hexdigest() trans = conn.begin_nested() if conn.dialect.name == 'sqlite': trans = conn.begin() with trans: res_row = conn.execute( sa.select([res.c.internal_id]) .where(sa.and_(res.c.resource_id == res_id, res.c.user_id == user_id, res.c.project_id == project_id, res.c.source_id == source_id, res.c.metadata_hash == m_hash))).first() internal_id = res_row[0] if res_row else None if internal_id is None: result = conn.execute(res.insert(), resource_id=res_id, user_id=user_id, project_id=project_id, source_id=source_id, resource_metadata=rmeta, metadata_hash=m_hash) internal_id = result.inserted_primary_key[0] if rmeta and isinstance(rmeta, dict): meta_map = {} for key, v in utils.dict_to_keyval(rmeta): try: _model = sql_utils.META_TYPE_MAP[type(v)] if meta_map.get(_model) is None: meta_map[_model] = [] meta_map[_model].append( {'id': internal_id, 'meta_key': key, 'value': v}) except KeyError: LOG.warn(_("Unknown metadata type. Key (%s) " "will not be queryable."), key) for _model in meta_map.keys(): conn.execute(_model.__table__.insert(), meta_map[_model]) except dbexc.DBDuplicateEntry: # retry function to pick up duplicate committed object internal_id = Connection._create_resource( conn, res_id, user_id, project_id, source_id, rmeta) return internal_id def record_metering_data(self, data): """Write the data to the backend storage system. :param data: a dictionary such as returned by ceilometer.meter.meter_message_from_counter """ engine = self._engine_facade.get_engine() with engine.begin() as conn: # Record the raw data for the sample. m_id = self._create_meter(conn, data['counter_name'], data['counter_type'], data['counter_unit']) res_id = self._create_resource(conn, data['resource_id'], data['user_id'], data['project_id'], data['source'], data['resource_metadata']) sample = models.Sample.__table__ conn.execute(sample.insert(), meter_id=m_id, resource_id=res_id, timestamp=data['timestamp'], volume=data['counter_volume'], message_signature=data['message_signature'], message_id=data['message_id']) def clear_expired_metering_data(self, ttl): """Clear expired data from the backend storage system. Clearing occurs according to the time-to-live. :param ttl: Number of seconds to keep records for. """ session = self._engine_facade.get_session() with session.begin(): end = timeutils.utcnow() - datetime.timedelta(seconds=ttl) sample_q = (session.query(models.Sample) .filter(models.Sample.timestamp < end)) sample_subq = sample_q.subquery() for table in [models.MetaText, models.MetaBigInt, models.MetaFloat, models.MetaBool]: (session.query(table) .join(sample_subq, sample_subq.c.id == table.id) .delete()) rows = sample_q.delete() # remove Meter definitions with no matching samples (session.query(models.Meter) .filter(~models.Meter.samples.any()) .delete(synchronize_session='fetch')) (session.query(models.Resource) .filter(~models.Resource.samples.any()) .delete(synchronize_session='fetch')) LOG.info(_("%d samples removed from database"), rows) def get_resources(self, user=None, project=None, source=None, start_timestamp=None, start_timestamp_op=None, end_timestamp=None, end_timestamp_op=None, metaquery=None, resource=None, pagination=None): """Return an iterable of api_models.Resource instances :param user: Optional ID for user that owns the resource. :param project: Optional ID for project that owns the resource. :param source: Optional source filter. :param start_timestamp: Optional modified timestamp start range. :param start_timestamp_op: Optional start time operator, like gt, ge. :param end_timestamp: Optional modified timestamp end range. :param end_timestamp_op: Optional end time operator, like lt, le. :param metaquery: Optional dict with metadata to match on. :param resource: Optional resource filter. :param pagination: Optional pagination query. """ if pagination: raise ceilometer.NotImplementedError('Pagination not implemented') s_filter = storage.SampleFilter(user=user, project=project, source=source, start=start_timestamp, start_timestamp_op=start_timestamp_op, end=end_timestamp, end_timestamp_op=end_timestamp_op, metaquery=metaquery, resource=resource) session = self._engine_facade.get_session() # get list of resource_ids res_q = session.query(distinct(models.Resource.resource_id)).join( models.Sample, models.Sample.resource_id == models.Resource.internal_id) res_q = make_query_from_filter(session, res_q, s_filter, require_meter=False) for res_id in res_q.all(): # get latest Sample max_q = (session.query(models.Sample) .join(models.Resource, models.Resource.internal_id == models.Sample.resource_id) .filter(models.Resource.resource_id == res_id[0])) max_q = make_query_from_filter(session, max_q, s_filter, require_meter=False) max_q = max_q.order_by(models.Sample.timestamp.desc(), models.Sample.id.desc()).limit(1) # get the min timestamp value. min_q = (session.query(models.Sample.timestamp) .join(models.Resource, models.Resource.internal_id == models.Sample.resource_id) .filter(models.Resource.resource_id == res_id[0])) min_q = make_query_from_filter(session, min_q, s_filter, require_meter=False) min_q = min_q.order_by(models.Sample.timestamp.asc()).limit(1) sample = max_q.first() if sample: yield api_models.Resource( resource_id=sample.resource.resource_id, project_id=sample.resource.project_id, first_sample_timestamp=min_q.first().timestamp, last_sample_timestamp=sample.timestamp, source=sample.resource.source_id, user_id=sample.resource.user_id, metadata=sample.resource.resource_metadata ) def get_meters(self, user=None, project=None, resource=None, source=None, metaquery=None, pagination=None): """Return an iterable of api_models.Meter instances :param user: Optional ID for user that owns the resource. :param project: Optional ID for project that owns the resource. :param resource: Optional ID of the resource. :param source: Optional source filter. :param metaquery: Optional dict with metadata to match on. :param pagination: Optional pagination query. """ if pagination: raise ceilometer.NotImplementedError('Pagination not implemented') s_filter = storage.SampleFilter(user=user, project=project, source=source, metaquery=metaquery, resource=resource) # NOTE(gordc): get latest sample of each meter/resource. we do not # filter here as we want to filter only on latest record. session = self._engine_facade.get_session() subq = session.query(func.max(models.Sample.id).label('id')).join( models.Resource, models.Resource.internal_id == models.Sample.resource_id).group_by( models.Sample.meter_id, models.Resource.resource_id) if resource: subq = subq.filter(models.Resource.resource_id == resource) subq = subq.subquery() # get meter details for samples. query_sample = (session.query(models.Sample.meter_id, models.Meter.name, models.Meter.type, models.Meter.unit, models.Resource.resource_id, models.Resource.project_id, models.Resource.source_id, models.Resource.user_id).join( subq, subq.c.id == models.Sample.id) .join(models.Meter, models.Meter.id == models.Sample.meter_id) .join(models.Resource, models.Resource.internal_id == models.Sample.resource_id)) query_sample = make_query_from_filter(session, query_sample, s_filter, require_meter=False) for row in query_sample.all(): yield api_models.Meter( name=row.name, type=row.type, unit=row.unit, resource_id=row.resource_id, project_id=row.project_id, source=row.source_id, user_id=row.user_id) def _retrieve_samples(self, query): samples = query.all() for s in samples: # Remove the id generated by the database when # the sample was inserted. It is an implementation # detail that should not leak outside of the driver. yield api_models.Sample( source=s.source_id, counter_name=s.counter_name, counter_type=s.counter_type, counter_unit=s.counter_unit, counter_volume=s.counter_volume, user_id=s.user_id, project_id=s.project_id, resource_id=s.resource_id, timestamp=s.timestamp, recorded_at=s.recorded_at, resource_metadata=s.resource_metadata, message_id=s.message_id, message_signature=s.message_signature, ) def get_samples(self, sample_filter, limit=None): """Return an iterable of api_models.Samples. :param sample_filter: Filter. :param limit: Maximum number of results to return. """ if limit == 0: return [] session = self._engine_facade.get_session() query = session.query(models.Sample.timestamp, models.Sample.recorded_at, models.Sample.message_id, models.Sample.message_signature, models.Sample.volume.label('counter_volume'), models.Meter.name.label('counter_name'), models.Meter.type.label('counter_type'), models.Meter.unit.label('counter_unit'), models.Resource.source_id, models.Resource.user_id, models.Resource.project_id, models.Resource.resource_metadata, models.Resource.resource_id).join( models.Meter, models.Meter.id == models.Sample.meter_id).join( models.Resource, models.Resource.internal_id == models.Sample.resource_id).order_by( models.Sample.timestamp.desc()) query = make_query_from_filter(session, query, sample_filter, require_meter=False) if limit: query = query.limit(limit) return self._retrieve_samples(query) def query_samples(self, filter_expr=None, orderby=None, limit=None): if limit == 0: return [] session = self._engine_facade.get_session() query = session.query(models.FullSample) transformer = sql_utils.QueryTransformer(models.FullSample, query) if filter_expr is not None: transformer.apply_filter(filter_expr) transformer.apply_options(orderby, limit) return self._retrieve_samples(transformer.get_query()) @staticmethod def _get_aggregate_functions(aggregate): if not aggregate: return [f for f in STANDARD_AGGREGATES.values()] functions = [] for a in aggregate: if a.func in STANDARD_AGGREGATES: functions.append(STANDARD_AGGREGATES[a.func]) elif a.func in UNPARAMETERIZED_AGGREGATES: functions.append(UNPARAMETERIZED_AGGREGATES[a.func]) elif a.func in PARAMETERIZED_AGGREGATES['compute']: validate = PARAMETERIZED_AGGREGATES['validate'].get(a.func) if not (validate and validate(a.param)): raise storage.StorageBadAggregate('Bad aggregate: %s.%s' % (a.func, a.param)) compute = PARAMETERIZED_AGGREGATES['compute'][a.func] functions.append(compute(a.param)) else: raise ceilometer.NotImplementedError( 'Selectable aggregate function %s' ' is not supported' % a.func) return functions def _make_stats_query(self, sample_filter, groupby, aggregate): select = [ func.min(models.Sample.timestamp).label('tsmin'), func.max(models.Sample.timestamp).label('tsmax'), models.Meter.unit ] select.extend(self._get_aggregate_functions(aggregate)) session = self._engine_facade.get_session() if groupby: group_attributes = [getattr(models.Resource, g) for g in groupby] select.extend(group_attributes) query = (session.query(*select) .join(models.Meter, models.Meter.id == models.Sample.meter_id) .join( models.Resource, models.Resource.internal_id == models.Sample.resource_id) .group_by(models.Meter.unit)) if groupby: query = query.group_by(*group_attributes) return make_query_from_filter(session, query, sample_filter) @staticmethod def _stats_result_aggregates(result, aggregate): stats_args = {} if isinstance(result.count, (int, long)): stats_args['count'] = result.count for attr in ['min', 'max', 'sum', 'avg']: if hasattr(result, attr): stats_args[attr] = getattr(result, attr) if aggregate: stats_args['aggregate'] = {} for a in aggregate: key = '%s%s' % (a.func, '/%s' % a.param if a.param else '') stats_args['aggregate'][key] = getattr(result, key) return stats_args @staticmethod def _stats_result_to_model(result, period, period_start, period_end, groupby, aggregate): stats_args = Connection._stats_result_aggregates(result, aggregate) stats_args['unit'] = result.unit duration = (timeutils.delta_seconds(result.tsmin, result.tsmax) if result.tsmin is not None and result.tsmax is not None else None) stats_args['duration'] = duration stats_args['duration_start'] = result.tsmin stats_args['duration_end'] = result.tsmax stats_args['period'] = period stats_args['period_start'] = period_start stats_args['period_end'] = period_end stats_args['groupby'] = (dict( (g, getattr(result, g)) for g in groupby) if groupby else None) return api_models.Statistics(**stats_args) def get_meter_statistics(self, sample_filter, period=None, groupby=None, aggregate=None): """Return an iterable of api_models.Statistics instances. Items are containing meter statistics described by the query parameters. The filter must have a meter value set. """ if groupby: for group in groupby: if group not in ['user_id', 'project_id', 'resource_id']: raise ceilometer.NotImplementedError('Unable to group by ' 'these fields') if not period: for res in self._make_stats_query(sample_filter, groupby, aggregate): if res.count: yield self._stats_result_to_model(res, 0, res.tsmin, res.tsmax, groupby, aggregate) return if not sample_filter.start or not sample_filter.end: res = self._make_stats_query(sample_filter, None, aggregate).first() if not res: # NOTE(liusheng):The 'res' may be NoneType, because no # sample has found with sample filter(s). return query = self._make_stats_query(sample_filter, groupby, aggregate) # HACK(jd) This is an awful method to compute stats by period, but # since we're trying to be SQL agnostic we have to write portable # code, so here it is, admire! We're going to do one request to get # stats by period. We would like to use GROUP BY, but there's no # portable way to manipulate timestamp in SQL, so we can't. for period_start, period_end in base.iter_period( sample_filter.start or res.tsmin, sample_filter.end or res.tsmax, period): q = query.filter(models.Sample.timestamp >= period_start) q = q.filter(models.Sample.timestamp < period_end) for r in q.all(): if r.count: yield self._stats_result_to_model( result=r, period=int(timeutils.delta_seconds(period_start, period_end)), period_start=period_start, period_end=period_end, groupby=groupby, aggregate=aggregate )