# -*- encoding: utf-8 -*- # # Author: John Tran # Julien Danjou # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """SQLAlchemy storage backend.""" from __future__ import absolute_import import datetime import eventlet import operator import os import types from sqlalchemy import and_ from sqlalchemy import desc from sqlalchemy import func from sqlalchemy.orm import aliased from sqlalchemy import pool from ceilometer.openstack.common.db import exception as dbexc import ceilometer.openstack.common.db.sqlalchemy.session as sqlalchemy_session from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common import log from ceilometer.openstack.common import timeutils from ceilometer.storage import base from ceilometer.storage import models as api_models from ceilometer.storage.sqlalchemy import migration from ceilometer.storage.sqlalchemy import models from ceilometer import utils LOG = log.getLogger(__name__) class SQLAlchemyStorage(base.StorageEngine): """Put the data into a SQLAlchemy database. Tables:: - user - { id: user uuid } - source - { id: source id } - project - { id: project uuid } - meter - the raw incoming data - { id: meter id counter_name: counter name user_id: user uuid (->user.id) project_id: project uuid (->project.id) resource_id: resource uuid (->resource.id) resource_metadata: metadata dictionaries counter_type: counter type counter_unit: counter unit counter_volume: counter volume timestamp: datetime message_signature: message signature message_id: message uuid } - resource - the metadata for resources - { id: resource uuid resource_metadata: metadata dictionaries project_id: project uuid (->project.id) user_id: user uuid (->user.id) } - sourceassoc - the relationships - { meter_id: meter id (->meter.id) project_id: project uuid (->project.id) resource_id: resource uuid (->resource.id) user_id: user uuid (->user.id) source_id: source id (->source.id) } """ @staticmethod def get_connection(conf): """Return a Connection instance based on the configuration settings. """ return Connection(conf) META_TYPE_MAP = {bool: models.MetaBool, str: models.MetaText, unicode: models.MetaText, types.NoneType: models.MetaText, int: models.MetaBigInt, long: models.MetaBigInt, float: models.MetaFloat} def apply_metaquery_filter(session, query, metaquery): """Apply provided metaquery filter to existing query. :param session: session used for original query :param query: Query instance :param metaquery: dict with metadata to match on. """ for k, v in metaquery.iteritems(): key = k[9:] # strip out 'metadata.' prefix try: _model = META_TYPE_MAP[type(v)] except KeyError: raise NotImplementedError(_('Query on %(key)s is of %(value)s ' 'type and is not supported') % {"key": k, "value": type(v)}) else: meta_q = session.query(_model).\ filter(and_(_model.meta_key == key, _model.value == v)).subquery() query = query.filter_by(id=meta_q.c.id) return query def make_query_from_filter(session, query, sample_filter, require_meter=True): """Return a query dictionary based on the settings in the filter. :param filter: SampleFilter instance :param require_meter: If true and the filter does not have a meter, raise an error. """ if sample_filter.meter: query = query.filter(models.Meter.counter_name == sample_filter.meter) elif require_meter: raise RuntimeError(_('Missing required meter specifier')) if sample_filter.source: query = query.filter(models.Meter.sources.any(id=sample_filter.source)) if sample_filter.start: ts_start = sample_filter.start if sample_filter.start_timestamp_op == 'gt': query = query.filter(models.Meter.timestamp > ts_start) else: query = query.filter(models.Meter.timestamp >= ts_start) if sample_filter.end: ts_end = sample_filter.end if sample_filter.end_timestamp_op == 'le': query = query.filter(models.Meter.timestamp <= ts_end) else: query = query.filter(models.Meter.timestamp < ts_end) if sample_filter.user: query = query.filter_by(user_id=sample_filter.user) if sample_filter.project: query = query.filter_by(project_id=sample_filter.project) if sample_filter.resource: query = query.filter_by(resource_id=sample_filter.resource) if sample_filter.metaquery: query = apply_metaquery_filter(session, query, sample_filter.metaquery) return query class Connection(base.Connection): """SqlAlchemy connection.""" def __init__(self, conf): url = conf.database.connection if url == 'sqlite://': conf.database.connection = \ os.environ.get('CEILOMETER_TEST_SQL_URL', url) session = sqlalchemy_session.get_session() engine = session.get_bind() if isinstance(engine.pool, pool.QueuePool): poolsize = engine.pool.size() + engine.pool._max_overflow self.pool = eventlet.GreenPool(poolsize) else: self.pool = None def upgrade(self): session = sqlalchemy_session.get_session() migration.db_sync(session.get_bind()) def clear(self): session = sqlalchemy_session.get_session() engine = session.get_bind() for table in reversed(models.Base.metadata.sorted_tables): engine.execute(table.delete()) @staticmethod def _create_or_update(session, model_class, _id, source=None, **kwargs): if not _id: return None try: # create a nested session for the case of two call of # record_metering_data run in parallel to not fail the # record of this sample # (except for sqlite, that doesn't support nested # transaction and doesn't have concurrency problem) nested = session.connection().dialect.name != 'sqlite' # raise dbexc.DBDuplicateEntry manually for sqlite # to not break the current session if not nested and session.query(model_class).get(str(_id)): raise dbexc.DBDuplicateEntry() with session.begin(nested=nested, subtransactions=not nested): obj = model_class(id=str(_id)) session.add(obj) except dbexc.DBDuplicateEntry: # requery the object from the db if this is an other # parallel/previous call of record_metering_data that # have successfully created this object obj = session.query(model_class).get(str(_id)) # update the object if source and not filter(lambda x: x.id == source.id, obj.sources): obj.sources.append(source) for k in kwargs: setattr(obj, k, kwargs[k]) return obj def record_metering_data(self, data): if self.pool: if self.pool.waiting() > 0: LOG.warn(_("Sqlalchemy connection pool is full, " "perhaps pool_size should be increased")) self.pool.spawn(self._real_record_metering_data, data) else: self._real_record_metering_data(data) @classmethod def _real_record_metering_data(cls, data): """Write the data to the backend storage system. :param data: a dictionary such as returned by ceilometer.meter.meter_message_from_counter """ session = sqlalchemy_session.get_session() with session.begin(): # Record the updated resource metadata rmetadata = data['resource_metadata'] source = cls._create_or_update(session, models.Source, data['source']) user = cls._create_or_update(session, models.User, data['user_id'], source) project = cls._create_or_update(session, models.Project, data['project_id'], source) resource = cls._create_or_update(session, models.Resource, data['resource_id'], source, user=user, project=project, resource_metadata=rmetadata) # Record the raw data for the meter. meter = models.Meter(counter_type=data['counter_type'], counter_unit=data['counter_unit'], counter_name=data['counter_name'], resource=resource) session.add(meter) if not filter(lambda x: x.id == source.id, meter.sources): meter.sources.append(source) meter.project = project meter.user = user meter.timestamp = data['timestamp'] meter.resource_metadata = rmetadata meter.counter_volume = data['counter_volume'] meter.message_signature = data['message_signature'] meter.message_id = data['message_id'] session.flush() if rmetadata: if isinstance(rmetadata, dict): for key, v in utils.dict_to_keyval(rmetadata): try: _model = META_TYPE_MAP[type(v)] except KeyError: LOG.warn(_("Unknown metadata type. Key (%s) will " "not be queryable."), key) else: session.add(_model(id=meter.id, meta_key=key, value=v)) @staticmethod def clear_expired_metering_data(ttl): """Clear expired data from the backend storage system according to the time-to-live. :param ttl: Number of seconds to keep records for. """ session = sqlalchemy_session.get_session() with session.begin(): end = timeutils.utcnow() - datetime.timedelta(seconds=ttl) meter_query = session.query(models.Meter)\ .filter(models.Meter.timestamp < end) for meter_obj in meter_query.all(): session.delete(meter_obj) query = session.query(models.User).filter( ~models.User.id.in_(session.query(models.Meter.user_id) .group_by(models.Meter.user_id)), ~models.User.id.in_(session.query(models.Alarm.user_id) .group_by(models.Alarm.user_id)), ~models.User.id.in_(session.query(models.AlarmChange.user_id) .group_by(models.AlarmChange.user_id)) ) for user_obj in query.all(): session.delete(user_obj) query = session.query(models.Project)\ .filter(~models.Project.id.in_( session.query(models.Meter.project_id) .group_by(models.Meter.project_id)), ~models.Project.id.in_( session.query(models.Alarm.project_id) .group_by(models.Alarm.project_id)), ~models.Project.id.in_( session.query(models.AlarmChange.project_id) .group_by(models.AlarmChange.project_id)), ~models.Project.id.in_( session.query(models.AlarmChange.on_behalf_of) .group_by(models.AlarmChange.on_behalf_of)) ) for project_obj in query.all(): session.delete(project_obj) query = session.query(models.Resource)\ .filter(~models.Resource.id.in_( session.query(models.Meter.resource_id).group_by( models.Meter.resource_id))) for res_obj in query.all(): session.delete(res_obj) @staticmethod def get_users(source=None): """Return an iterable of user id strings. :param source: Optional source filter. """ session = sqlalchemy_session.get_session() query = session.query(models.User.id) if source is not None: query = query.filter(models.User.sources.any(id=source)) return (x[0] for x in query.all()) @staticmethod def get_projects(source=None): """Return an iterable of project id strings. :param source: Optional source filter. """ session = sqlalchemy_session.get_session() query = session.query(models.Project.id) if source: query = query.filter(models.Project.sources.any(id=source)) return (x[0] for x in query.all()) @staticmethod def get_resources(user=None, project=None, source=None, start_timestamp=None, start_timestamp_op=None, end_timestamp=None, end_timestamp_op=None, metaquery={}, resource=None, pagination=None): """Return an iterable of api_models.Resource instances :param user: Optional ID for user that owns the resource. :param project: Optional ID for project that owns the resource. :param source: Optional source filter. :param start_timestamp: Optional modified timestamp start range. :param start_timestamp_op: Optonal start time operator, like gt, ge. :param end_timestamp: Optional modified timestamp end range. :param end_timestamp_op: Optional end time operator, like lt, le. :param metaquery: Optional dict with metadata to match on. :param resource: Optional resource filter. :param pagination: Optional pagination query. """ # We probably want to raise these early, since we don't know from here # if they will be handled. We don't want extra wait or work for it to # just fail. if pagination: raise NotImplementedError(_('Pagination not implemented')) # (thomasm) We need to get the max timestamp first, since that's the # most accurate. We also need to filter down in the subquery to # constrain what we have to JOIN on later. session = sqlalchemy_session.get_session() ts_subquery = session.query( models.Meter.resource_id, func.max(models.Meter.timestamp).label("max_ts"), func.min(models.Meter.timestamp).label("min_ts") ).group_by(models.Meter.resource_id) # Here are the basic 'eq' operation filters for the sample data. for column, value in [(models.Meter.resource_id, resource), (models.Meter.user_id, user), (models.Meter.project_id, project)]: if value: ts_subquery = ts_subquery.filter(column == value) if source: ts_subquery = ts_subquery.filter( models.Meter.sources.any(id=source)) if metaquery: ts_subquery = apply_metaquery_filter(session, ts_subquery, metaquery) # Here we limit the samples being used to a specific time period, # if requested. if start_timestamp: if start_timestamp_op == 'gt': ts_subquery = ts_subquery.filter( models.Meter.timestamp > start_timestamp) else: ts_subquery = ts_subquery.filter( models.Meter.timestamp >= start_timestamp) if end_timestamp: if end_timestamp_op == 'le': ts_subquery = ts_subquery.filter( models.Meter.timestamp <= end_timestamp) else: ts_subquery = ts_subquery.filter( models.Meter.timestamp < end_timestamp) ts_subquery = ts_subquery.subquery() # Now we need to get the max Meter.id out of the leftover results, to # break any ties. agg_subquery = session.query( func.max(models.Meter.id).label("max_id"), ts_subquery ).filter( models.Meter.resource_id == ts_subquery.c.resource_id, models.Meter.timestamp == ts_subquery.c.max_ts ).group_by( ts_subquery.c.resource_id, ts_subquery.c.max_ts, ts_subquery.c.min_ts ).subquery() query = session.query( models.Meter, agg_subquery.c.min_ts, agg_subquery.c.max_ts ).filter( models.Meter.id == agg_subquery.c.max_id ) for meter, first_ts, last_ts in query.all(): yield api_models.Resource( resource_id=meter.resource_id, project_id=meter.project_id, first_sample_timestamp=first_ts, last_sample_timestamp=last_ts, source=meter.sources[0].id, user_id=meter.user_id, metadata=meter.resource_metadata, ) @staticmethod def get_meters(user=None, project=None, resource=None, source=None, metaquery={}, pagination=None): """Return an iterable of api_models.Meter instances :param user: Optional ID for user that owns the resource. :param project: Optional ID for project that owns the resource. :param resource: Optional ID of the resource. :param source: Optional source filter. :param metaquery: Optional dict with metadata to match on. :param pagination: Optional pagination query. """ if pagination: raise NotImplementedError(_('Pagination not implemented')) session = sqlalchemy_session.get_session() # Meter table will store large records and join with resource # will be very slow. # subquery_meter is used to reduce meter records # by selecting a record for each (resource_id, counter_name). # max() is used to choice a meter record, so the latest record # is selected for each (resource_id, counter_name). # subquery_meter = session.query(func.max(models.Meter.id).label('id'))\ .group_by(models.Meter.resource_id, models.Meter.counter_name).subquery() # The SQL of query_meter is essentially: # # SELECT meter.* FROM meter INNER JOIN # (SELECT max(meter.id) AS id FROM meter # GROUP BY meter.resource_id, meter.counter_name) AS anon_2 # ON meter.id = anon_2.id # query_meter = session.query(models.Meter).\ join(subquery_meter, models.Meter.id == subquery_meter.c.id) if metaquery: query_meter = apply_metaquery_filter(session, query_meter, metaquery) alias_meter = aliased(models.Meter, query_meter.subquery()) query = session.query(models.Resource, alias_meter).join( alias_meter, models.Resource.id == alias_meter.resource_id) if user is not None: query = query.filter(models.Resource.user_id == user) if source is not None: query = query.filter(models.Resource.sources.any(id=source)) if resource: query = query.filter(models.Resource.id == resource) if project is not None: query = query.filter(models.Resource.project_id == project) for resource, meter in query.all(): yield api_models.Meter( name=meter.counter_name, type=meter.counter_type, unit=meter.counter_unit, resource_id=resource.id, project_id=resource.project_id, source=resource.sources[0].id, user_id=resource.user_id) @staticmethod def get_samples(sample_filter, limit=None): """Return an iterable of api_models.Samples. :param sample_filter: Filter. :param limit: Maximum number of results to return. """ if limit == 0: return session = sqlalchemy_session.get_session() query = session.query(models.Meter) query = make_query_from_filter(session, query, sample_filter, require_meter=False) if limit: query = query.limit(limit) samples = query.from_self()\ .order_by(desc(models.Meter.timestamp)).all() for s in samples: # Remove the id generated by the database when # the sample was inserted. It is an implementation # detail that should not leak outside of the driver. yield api_models.Sample( # Replace 'sources' with 'source' to meet the caller's # expectation, Meter.sources contains one and only one # source in the current implementation. source=s.sources[0].id, counter_name=s.counter_name, counter_type=s.counter_type, counter_unit=s.counter_unit, counter_volume=s.counter_volume, user_id=s.user_id, project_id=s.project_id, resource_id=s.resource_id, timestamp=s.timestamp, resource_metadata=s.resource_metadata, message_id=s.message_id, message_signature=s.message_signature, ) @staticmethod def _make_stats_query(sample_filter, groupby): select = [ models.Meter.counter_unit.label('unit'), func.min(models.Meter.timestamp).label('tsmin'), func.max(models.Meter.timestamp).label('tsmax'), func.avg(models.Meter.counter_volume).label('avg'), func.sum(models.Meter.counter_volume).label('sum'), func.min(models.Meter.counter_volume).label('min'), func.max(models.Meter.counter_volume).label('max'), func.count(models.Meter.counter_volume).label('count'), ] session = sqlalchemy_session.get_session() if groupby: group_attributes = [getattr(models.Meter, g) for g in groupby] select.extend(group_attributes) query = session.query(*select) if groupby: query = query.group_by(*group_attributes) return make_query_from_filter(session, query, sample_filter) @staticmethod def _stats_result_to_model(result, period, period_start, period_end, groupby): duration = (timeutils.delta_seconds(result.tsmin, result.tsmax) if result.tsmin is not None and result.tsmax is not None else None) return api_models.Statistics( unit=result.unit, count=int(result.count), min=result.min, max=result.max, avg=result.avg, sum=result.sum, duration_start=result.tsmin, duration_end=result.tsmax, duration=duration, period=period, period_start=period_start, period_end=period_end, groupby=(dict((g, getattr(result, g)) for g in groupby) if groupby else None) ) def get_meter_statistics(self, sample_filter, period=None, groupby=None): """Return an iterable of api_models.Statistics instances containing meter statistics described by the query parameters. The filter must have a meter value set. """ if groupby: for group in groupby: if group not in ['user_id', 'project_id', 'resource_id']: raise NotImplementedError( _("Unable to group by these fields")) if not period: for res in self._make_stats_query(sample_filter, groupby): if res.count: yield self._stats_result_to_model(res, 0, res.tsmin, res.tsmax, groupby) return if not sample_filter.start or not sample_filter.end: res = self._make_stats_query(sample_filter, None).first() query = self._make_stats_query(sample_filter, groupby) # HACK(jd) This is an awful method to compute stats by period, but # since we're trying to be SQL agnostic we have to write portable # code, so here it is, admire! We're going to do one request to get # stats by period. We would like to use GROUP BY, but there's no # portable way to manipulate timestamp in SQL, so we can't. for period_start, period_end in base.iter_period( sample_filter.start or res.tsmin, sample_filter.end or res.tsmax, period): q = query.filter(models.Meter.timestamp >= period_start) q = q.filter(models.Meter.timestamp < period_end) for r in q.all(): if r.count: yield self._stats_result_to_model( result=r, period=int(timeutils.delta_seconds(period_start, period_end)), period_start=period_start, period_end=period_end, groupby=groupby ) @staticmethod def _row_to_alarm_model(row): return api_models.Alarm(alarm_id=row.id, enabled=row.enabled, type=row.type, name=row.name, description=row.description, timestamp=row.timestamp, user_id=row.user_id, project_id=row.project_id, state=row.state, state_timestamp=row.state_timestamp, ok_actions=row.ok_actions, alarm_actions=row.alarm_actions, insufficient_data_actions= row.insufficient_data_actions, rule=row.rule, repeat_actions=row.repeat_actions) def get_alarms(self, name=None, user=None, project=None, enabled=None, alarm_id=None, pagination=None): """Yields a lists of alarms that match filters :param user: Optional ID for user that owns the resource. :param project: Optional ID for project that owns the resource. :param enabled: Optional boolean to list disable alarm. :param alarm_id: Optional alarm_id to return one alarm. :param pagination: Optional pagination query. """ if pagination: raise NotImplementedError(_('Pagination not implemented')) session = sqlalchemy_session.get_session() query = session.query(models.Alarm) if name is not None: query = query.filter(models.Alarm.name == name) if enabled is not None: query = query.filter(models.Alarm.enabled == enabled) if user is not None: query = query.filter(models.Alarm.user_id == user) if project is not None: query = query.filter(models.Alarm.project_id == project) if alarm_id is not None: query = query.filter(models.Alarm.id == alarm_id) return (self._row_to_alarm_model(x) for x in query.all()) def create_alarm(self, alarm): """Create an alarm. :param alarm: The alarm to create. """ session = sqlalchemy_session.get_session() with session.begin(): Connection._create_or_update(session, models.User, alarm.user_id) Connection._create_or_update(session, models.Project, alarm.project_id) alarm_row = models.Alarm(id=alarm.alarm_id) alarm_row.update(alarm.as_dict()) session.add(alarm_row) return self._row_to_alarm_model(alarm_row) def update_alarm(self, alarm): """Update an alarm. :param alarm: the new Alarm to update """ session = sqlalchemy_session.get_session() with session.begin(): Connection._create_or_update(session, models.User, alarm.user_id) Connection._create_or_update(session, models.Project, alarm.project_id) alarm_row = session.merge(models.Alarm(id=alarm.alarm_id)) alarm_row.update(alarm.as_dict()) return self._row_to_alarm_model(alarm_row) @staticmethod def delete_alarm(alarm_id): """Delete a alarm :param alarm_id: ID of the alarm to delete """ session = sqlalchemy_session.get_session() with session.begin(): session.query(models.Alarm).filter( models.Alarm.id == alarm_id).delete() @staticmethod def _row_to_alarm_change_model(row): return api_models.AlarmChange(event_id=row.event_id, alarm_id=row.alarm_id, type=row.type, detail=row.detail, user_id=row.user_id, project_id=row.project_id, on_behalf_of=row.on_behalf_of, timestamp=row.timestamp) def get_alarm_changes(self, alarm_id, on_behalf_of, user=None, project=None, type=None, start_timestamp=None, start_timestamp_op=None, end_timestamp=None, end_timestamp_op=None): """Yields list of AlarmChanges describing alarm history Changes are always sorted in reverse order of occurrence, given the importance of currency. Segregation for non-administrative users is done on the basis of the on_behalf_of parameter. This allows such users to have visibility on both the changes initiated by themselves directly (generally creation, rule changes, or deletion) and also on those changes initiated on their behalf by the alarming service (state transitions after alarm thresholds are crossed). :param alarm_id: ID of alarm to return changes for :param on_behalf_of: ID of tenant to scope changes query (None for administrative user, indicating all projects) :param user: Optional ID of user to return changes for :param project: Optional ID of project to return changes for :project type: Optional change type :param start_timestamp: Optional modified timestamp start range :param start_timestamp_op: Optional timestamp start range operation :param end_timestamp: Optional modified timestamp end range :param end_timestamp_op: Optional timestamp end range operation """ session = sqlalchemy_session.get_session() query = session.query(models.AlarmChange) query = query.filter(models.AlarmChange.alarm_id == alarm_id) if on_behalf_of is not None: query = query.filter( models.AlarmChange.on_behalf_of == on_behalf_of) if user is not None: query = query.filter(models.AlarmChange.user_id == user) if project is not None: query = query.filter(models.AlarmChange.project_id == project) if type is not None: query = query.filter(models.AlarmChange.type == type) if start_timestamp: if start_timestamp_op == 'gt': query = query.filter( models.AlarmChange.timestamp > start_timestamp) else: query = query.filter( models.AlarmChange.timestamp >= start_timestamp) if end_timestamp: if end_timestamp_op == 'le': query = query.filter( models.AlarmChange.timestamp <= end_timestamp) else: query = query.filter( models.AlarmChange.timestamp < end_timestamp) query = query.order_by(desc(models.AlarmChange.timestamp)) return (self._row_to_alarm_change_model(x) for x in query.all()) def record_alarm_change(self, alarm_change): """Record alarm change event. """ session = sqlalchemy_session.get_session() with session.begin(): Connection._create_or_update(session, models.User, alarm_change['user_id']) Connection._create_or_update(session, models.Project, alarm_change['project_id']) Connection._create_or_update(session, models.Project, alarm_change['on_behalf_of']) alarm_change_row = models.AlarmChange( event_id=alarm_change['event_id']) alarm_change_row.update(alarm_change) session.add(alarm_change_row) @staticmethod def _get_or_create_trait_type(trait_type, data_type, session=None): """Find if this trait already exists in the database, and if it does not, create a new entry in the trait type table. """ if session is None: session = sqlalchemy_session.get_session() with session.begin(subtransactions=True): tt = session.query(models.TraitType).filter( models.TraitType.desc == trait_type, models.TraitType.data_type == data_type).first() if not tt: tt = models.TraitType(trait_type, data_type) session.add(tt) return tt @classmethod def _make_trait(cls, trait_model, event, session=None): """Make a new Trait from a Trait model. Doesn't flush or add to session. """ trait_type = cls._get_or_create_trait_type(trait_model.name, trait_model.dtype, session) value_map = models.Trait._value_map values = {'t_string': None, 't_float': None, 't_int': None, 't_datetime': None} value = trait_model.value values[value_map[trait_model.dtype]] = value return models.Trait(trait_type, event, **values) @staticmethod def _get_or_create_event_type(event_type, session=None): """Here, we check to see if an event type with the supplied name already exists. If not, we create it and return the record. This may result in a flush. """ if session is None: session = sqlalchemy_session.get_session() with session.begin(subtransactions=True): et = session.query(models.EventType).filter( models.EventType.desc == event_type).first() if not et: et = models.EventType(event_type) session.add(et) return et @classmethod def _record_event(cls, session, event_model): """Store a single Event, including related Traits. """ with session.begin(subtransactions=True): event_type = cls._get_or_create_event_type(event_model.event_type, session=session) event = models.Event(event_model.message_id, event_type, event_model.generated) session.add(event) new_traits = [] if event_model.traits: for trait in event_model.traits: t = cls._make_trait(trait, event, session=session) session.add(t) new_traits.append(t) # Note: we don't flush here, explicitly (unless a new trait or event # does it). Otherwise, just wait until all the Events are staged. return (event, new_traits) def record_events(self, event_models): """Write the events to SQL database via sqlalchemy. :param event_models: a list of model.Event objects. Returns a list of events that could not be saved in a (reason, event) tuple. Reasons are enumerated in storage.model.Event Flush when they're all added, unless new EventTypes or TraitTypes are added along the way. """ session = sqlalchemy_session.get_session() events = [] problem_events = [] for event_model in event_models: event = None try: with session.begin(): event = self._record_event(session, event_model) except dbexc.DBDuplicateEntry: problem_events.append((api_models.Event.DUPLICATE, event_model)) except Exception as e: LOG.exception(_('Failed to record event: %s') % e) problem_events.append((api_models.Event.UNKNOWN_PROBLEM, event_model)) events.append(event) return problem_events def get_events(self, event_filter): """Return an iterable of model.Event objects. :param event_filter: EventFilter instance """ start = event_filter.start_time end = event_filter.end_time session = sqlalchemy_session.get_session() LOG.debug(_("Getting events that match filter: %s") % event_filter) with session.begin(): event_query = session.query(models.Event) # Build up the join conditions event_join_conditions = [models.EventType.id == models.Event.event_type_id] if event_filter.event_type: event_join_conditions\ .append(models.EventType.desc == event_filter.event_type) event_query = event_query.join(models.EventType, and_(*event_join_conditions)) # Build up the where conditions event_filter_conditions = [] if event_filter.message_id: event_filter_conditions\ .append(models.Event.message_id == event_filter.message_id) if start: event_filter_conditions.append(models.Event.generated >= start) if end: event_filter_conditions.append(models.Event.generated <= end) if event_filter_conditions: event_query = event_query\ .filter(and_(*event_filter_conditions)) event_models_dict = {} if event_filter.traits_filter: for trait_filter in event_filter.traits_filter: # Build a sub query that joins Trait to TraitType # where the trait name matches trait_name = trait_filter.pop('key') conditions = [models.Trait.trait_type_id == models.TraitType.id, models.TraitType.desc == trait_name] for key, value in trait_filter.iteritems(): if key == 'string': conditions.append(models.Trait.t_string == value) elif key == 'integer': conditions.append(models.Trait.t_int == value) elif key == 'datetime': conditions.append(models.Trait.t_datetime == value) elif key == 'float': conditions.append(models.Trait.t_float == value) trait_query = session.query(models.Trait.event_id)\ .join(models.TraitType, and_(*conditions)).subquery() event_query = event_query\ .join(trait_query, models.Event.id == trait_query.c.event_id) else: # If there are no trait filters, grab the events from the db query = session.query(models.Event.id, models.Event.generated, models.Event.message_id, models.EventType.desc)\ .join(models.EventType, and_(*event_join_conditions)) if event_filter_conditions: query = query.filter(and_(*event_filter_conditions)) for (id, generated, message_id, desc) in query.all(): event_models_dict[id] = api_models.Event(message_id, desc, generated, []) # Build event models for the events event_query = event_query.subquery() query = session.query(models.Trait)\ .join(models.TraitType, models.Trait.trait_type_id == models.TraitType.id)\ .join(event_query, models.Trait.event_id == event_query.c.id) # Now convert the sqlalchemy objects back into Models ... for trait in query.all(): event = event_models_dict.get(trait.event_id) if not event: event = api_models.Event( trait.event.message_id, trait.event.event_type.desc, trait.event.generated, []) event_models_dict[trait.event_id] = event trait_model = api_models.Trait(trait.trait_type.desc, trait.trait_type.data_type, trait.get_value()) event.append_trait(trait_model) event_models = event_models_dict.values() return sorted(event_models, key=operator.attrgetter('generated')) @staticmethod def get_event_types(): """Return all event types as an iterable of strings. """ session = sqlalchemy_session.get_session() with session.begin(): query = session.query(models.EventType.desc)\ .order_by(models.EventType.desc) for name in query.all(): # The query returns a tuple with one element. yield name[0] @staticmethod def get_trait_types(event_type): """Return a dictionary containing the name and data type of the trait type. Only trait types for the provided event_type are returned. :param event_type: the type of the Event """ session = sqlalchemy_session.get_session() LOG.debug(_("Get traits for %s") % event_type) with session.begin(): query = (session.query(models.TraitType.desc, models.TraitType.data_type) .join(models.Trait, models.Trait.trait_type_id == models.TraitType.id) .join(models.Event, models.Event.id == models.Trait.event_id) .join(models.EventType, and_(models.EventType.id == models.Event.id, models.EventType.desc == event_type)) .group_by(models.TraitType.desc, models.TraitType.data_type) .distinct()) for desc, type in query.all(): yield {'name': desc, 'data_type': type} @staticmethod def get_traits(event_type, trait_type=None): """Return all trait instances associated with an event_type. If trait_type is specified, only return instances of that trait type. :param event_type: the type of the Event to filter by :param trait_type: the name of the Trait to filter by """ session = sqlalchemy_session.get_session() with session.begin(): trait_type_filters = [models.TraitType.id == models.Trait.trait_type_id] if trait_type: trait_type_filters.append(models.TraitType.desc == trait_type) query = (session.query(models.Trait) .join(models.TraitType, and_(*trait_type_filters)) .join(models.Event, models.Event.id == models.Trait.event_id) .join(models.EventType, and_(models.EventType.id == models.Event.event_type_id, models.EventType.desc == event_type))) for trait in query.all(): type = trait.trait_type yield api_models.Trait(name=type.desc, dtype=type.data_type, value=trait.get_value())