diff --git a/ceilometer/storage/impl_sqlalchemy.py b/ceilometer/storage/impl_sqlalchemy.py index fe95443d7..d44da0a70 100644 --- a/ceilometer/storage/impl_sqlalchemy.py +++ b/ceilometer/storage/impl_sqlalchemy.py @@ -55,17 +55,22 @@ class SQLAlchemyStorage(base.StorageEngine): - { id: source id } - project - { id: project uuid } + - meter + - meter definition + - { id: meter def id + name: meter name + type: meter type + unit: meter unit + } - sample - the raw incoming data - { id: sample id - counter_name: counter name + meter_id: meter id (->meter.id) user_id: user uuid (->user.id) project_id: project uuid (->project.id) resource_id: resource uuid (->resource.id) resource_metadata: metadata dictionaries - counter_type: counter type - counter_unit: counter unit - counter_volume: counter volume + volume: sample volume timestamp: datetime message_signature: message signature message_id: message uuid @@ -123,7 +128,7 @@ def apply_metaquery_filter(session, query, metaquery): meta_q = session.query(_model).\ filter(and_(_model.meta_key == key, _model.value == v)).subquery() - query = query.filter_by(id=meta_q.c.id) + query = query.filter(models.Sample.id == meta_q.c.id) return query @@ -138,7 +143,7 @@ def make_query_from_filter(session, query, sample_filter, require_meter=True): """ if sample_filter.meter: - query = query.filter(models.Sample.counter_name == sample_filter.meter) + query = query.filter(models.Meter.name == sample_filter.meter) elif require_meter: raise RuntimeError(_('Missing required meter specifier')) if sample_filter.source: @@ -157,13 +162,16 @@ def make_query_from_filter(session, query, sample_filter, require_meter=True): else: query = query.filter(models.Sample.timestamp < ts_end) if sample_filter.user: - query = query.filter_by(user_id=sample_filter.user) + query = query.filter(models.Sample.user_id == sample_filter.user) if sample_filter.project: - query = query.filter_by(project_id=sample_filter.project) + query = query.filter( + models.Sample.project_id == sample_filter.project) if sample_filter.resource: - query = query.filter_by(resource_id=sample_filter.resource) + query = query.filter( + models.Sample.resource_id == sample_filter.resource) if sample_filter.message_id: - query = query.filter_by(message_id=sample_filter.message_id) + query = query.filter( + models.Sample.message_id == sample_filter.message_id) if sample_filter.metaquery: query = apply_metaquery_filter(session, query, @@ -250,6 +258,28 @@ class Connection(base.Connection): setattr(obj, k, kwargs[k]) return obj + @staticmethod + def _create_meter(session, name, type, unit): + try: + nested = session.connection().dialect.name != 'sqlite' + if not nested and session.query(models.Meter)\ + .filter(models.Meter.name == name)\ + .filter(models.Meter.type == type)\ + .filter(models.Meter.unit == unit).count() > 0: + raise dbexc.DBDuplicateEntry() + + with session.begin(nested=nested, + subtransactions=not nested): + obj = models.Meter(name=name, type=type, unit=unit) + session.add(obj) + except dbexc.DBDuplicateEntry: + obj = session.query(models.Meter)\ + .filter(models.Meter.name == name)\ + .filter(models.Meter.type == type)\ + .filter(models.Meter.unit == unit).first() + + return obj + def record_metering_data(self, data): """Write the data to the backend storage system. @@ -272,9 +302,11 @@ class Connection(base.Connection): resource_metadata=rmetadata) # Record the raw data for the sample. - sample = models.Sample(counter_type=data['counter_type'], - counter_unit=data['counter_unit'], - counter_name=data['counter_name'], + meter = self._create_meter(session, + data['counter_name'], + data['counter_type'], + data['counter_unit']) + sample = models.Sample(meter_id=meter.id, resource=resource) session.add(sample) if not filter(lambda x: x.id == source.id, sample.sources): @@ -283,7 +315,7 @@ class Connection(base.Connection): sample.user = user sample.timestamp = data['timestamp'] sample.resource_metadata = rmetadata - sample.counter_volume = data['counter_volume'] + sample.volume = data['counter_volume'] sample.message_signature = data['message_signature'] sample.message_id = data['message_id'] session.flush() @@ -466,7 +498,7 @@ class Connection(base.Connection): last_sample_timestamp=last_ts, source=sample.sources[0].id, user_id=sample.user_id, - metadata=sample.resource_metadata, + metadata=sample.resource_metadata ) def get_meters(self, user=None, project=None, resource=None, source=None, @@ -491,31 +523,34 @@ class Connection(base.Connection): # subquery_sample is used to reduce sample records # by selecting a record for each (resource_id, counter_name). # max() is used to choice a sample record, so the latest record - # is selected for each (resource_id, counter_name). - # + # is selected for each (resource_id, meter.name). + subquery_sample = session.query( func.max(models.Sample.id).label('id'))\ + .join(models.Meter)\ .group_by(models.Sample.resource_id, - models.Sample.counter_name).subquery() + models.Meter.name).subquery() # The SQL of query_sample is essentially: # # SELECT sample.* FROM sample INNER JOIN # (SELECT max(sample.id) AS id FROM sample - # GROUP BY sample.resource_id, sample.counter_name) AS anon_2 + # GROUP BY sample.resource_id, meter.name) AS anon_2 # ON sample.id = anon_2.id - # - query_sample = session.query(models.Sample).\ - join(subquery_sample, models.Sample.id == subquery_sample.c.id) + + query_sample = session.query(models.MeterSample).\ + join(subquery_sample, + models.MeterSample.id == subquery_sample.c.id) if metaquery: query_sample = apply_metaquery_filter(session, query_sample, metaquery) - alias_sample = aliased(models.Sample, query_sample.subquery()) - query = session.query(models.Resource, alias_sample).join( - alias_sample, models.Resource.id == alias_sample.resource_id) + alias_sample = aliased(models.MeterSample, + query_sample.with_labels().subquery()) + query = session.query(models.Resource, alias_sample)\ + .filter(models.Resource.id == alias_sample.resource_id) if user is not None: query = query.filter(models.Resource.user_id == user) @@ -577,7 +612,7 @@ class Connection(base.Connection): if limit == 0: return [] - table = models.Sample + table = models.MeterSample session = self._get_db_session() query = session.query(table) query = make_query_from_filter(session, query, sample_filter, @@ -606,7 +641,7 @@ class Connection(base.Connection): limit, table) - retrieve = {models.Sample: self._retrieve_samples, + retrieve = {models.MeterSample: self._retrieve_samples, models.Alarm: self._retrieve_alarms, models.AlarmChange: self._retrieve_alarm_history} return retrieve[table](query) @@ -615,7 +650,7 @@ class Connection(base.Connection): return self._retrieve_data(filter_expr, orderby, limit, - models.Sample) + models.MeterSample) def _transform_expression(self, expression_tree, table): @@ -648,14 +683,14 @@ class Connection(base.Connection): def _make_stats_query(self, sample_filter, groupby): select = [ - models.Sample.counter_unit.label('unit'), + models.Meter.unit, func.min(models.Sample.timestamp).label('tsmin'), func.max(models.Sample.timestamp).label('tsmax'), - func.avg(models.Sample.counter_volume).label('avg'), - func.sum(models.Sample.counter_volume).label('sum'), - func.min(models.Sample.counter_volume).label('min'), - func.max(models.Sample.counter_volume).label('max'), - func.count(models.Sample.counter_volume).label('count'), + func.avg(models.Sample.volume).label('avg'), + func.sum(models.Sample.volume).label('sum'), + func.min(models.Sample.volume).label('min'), + func.max(models.Sample.volume).label('max'), + func.count(models.Sample.volume).label('count') ] session = self._get_db_session() diff --git a/ceilometer/storage/sqlalchemy/migrate_repo/versions/031_add_new_meter_table.py b/ceilometer/storage/sqlalchemy/migrate_repo/versions/031_add_new_meter_table.py new file mode 100644 index 000000000..b02674bf4 --- /dev/null +++ b/ceilometer/storage/sqlalchemy/migrate_repo/versions/031_add_new_meter_table.py @@ -0,0 +1,115 @@ +# +# Copyright 2013 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import migrate +import sqlalchemy as sa + + +def handle_rid_index(meta, downgrade=False): + if meta.bind.engine.name == 'sqlite': + return + + resource = sa.Table('resource', meta, autoload=True) + sample = sa.Table('sample', meta, autoload=True) + params = {'columns': [sample.c.resource_id], + 'refcolumns': [resource.c.id], + 'name': 'fk_sample_resource_id'} + if meta.bind.engine.name == 'mysql': + # For mysql dialect all dependent FK should be removed + # before index create/delete + migrate.ForeignKeyConstraint(**params).drop() + + index = sa.Index('idx_sample_rid_cname', sample.c.resource_id, + sample.c.counter_name) + index.create() if downgrade else index.drop() + + if meta.bind.engine.name == 'mysql': + migrate.ForeignKeyConstraint(**params).create() + + +def upgrade(migrate_engine): + meta = sa.MetaData(bind=migrate_engine) + meter = sa.Table( + 'meter', meta, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('name', sa.String(255), nullable=False), + sa.Column('type', sa.String(255)), + sa.Column('unit', sa.String(255)), + sa.UniqueConstraint('name', 'type', 'unit', name='def_unique'), + mysql_engine='InnoDB', + mysql_charset='utf8' + ) + meter.create() + sample = sa.Table('sample', meta, autoload=True) + query = sa.select([sample.c.counter_name, sample.c.counter_type, + sample.c.counter_unit]).distinct() + for row in query.execute(): + meter.insert().values(name=row['counter_name'], + type=row['counter_type'], + unit=row['counter_unit']).execute() + + meter_id = sa.Column('meter_id', sa.Integer) + meter_id.create(sample) + params = {'columns': [sample.c.meter_id], + 'refcolumns': [meter.c.id]} + if migrate_engine.name == 'mysql': + params['name'] = 'fk_sample_meter_id' + if migrate_engine.name != 'sqlite': + migrate.ForeignKeyConstraint(**params).create() + + index = sa.Index('ix_meter_name', meter.c.name) + index.create(bind=migrate_engine) + + for row in sa.select([meter]).execute(): + sample.update()\ + .where(sa.and_(sample.c.counter_name == row['name'], + sample.c.counter_type == row['type'], + sample.c.counter_unit == row['unit']))\ + .values({sample.c.meter_id: row['id']}).execute() + + handle_rid_index(meta) + + sample.c.counter_name.drop() + sample.c.counter_type.drop() + sample.c.counter_unit.drop() + sample.c.counter_volume.alter(name='volume') + + +def downgrade(migrate_engine): + meta = sa.MetaData(bind=migrate_engine) + sample = sa.Table('sample', meta, autoload=True) + sample.c.volume.alter(name='counter_volume') + sa.Column('counter_name', sa.String(255)).create(sample) + sa.Column('counter_type', sa.String(255)).create(sample) + sa.Column('counter_unit', sa.String(255)).create(sample) + meter = sa.Table('meter', meta, autoload=True) + for row in sa.select([meter]).execute(): + sample.update()\ + .where(sample.c.meter_id == row['id'])\ + .values({sample.c.counter_name: row['name'], + sample.c.counter_type: row['type'], + sample.c.counter_unit: row['unit']}).execute() + + params = {'columns': [sample.c.meter_id], + 'refcolumns': [meter.c.id]} + if migrate_engine.name == 'mysql': + params['name'] = 'fk_sample_meter_id' + if migrate_engine.name != 'sqlite': + migrate.ForeignKeyConstraint(**params).drop() + + handle_rid_index(meta, True) + + sample.c.meter_id.drop() + meter.drop() diff --git a/ceilometer/storage/sqlalchemy/models.py b/ceilometer/storage/sqlalchemy/models.py index 8a8f437da..6ce20d6b3 100644 --- a/ceilometer/storage/sqlalchemy/models.py +++ b/ceilometer/storage/sqlalchemy/models.py @@ -23,11 +23,12 @@ import six.moves.urllib.parse as urlparse from oslo.config import cfg from sqlalchemy import Column, Integer, String, Table, ForeignKey, \ - Index, UniqueConstraint, BigInteger + Index, UniqueConstraint, BigInteger, join from sqlalchemy import Float, Boolean, Text, DateTime from sqlalchemy.dialects.mysql import DECIMAL from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import backref +from sqlalchemy.orm import column_property from sqlalchemy.orm import relationship from sqlalchemy.types import TypeDecorator @@ -189,6 +190,20 @@ class MetaFloat(Base): value = Column(Float(53), default=False) +class Meter(Base): + """Meter definition data.""" + + __tablename__ = 'meter' + __table_args__ = ( + UniqueConstraint('name', 'type', 'unit', name='def_unique'), + Index('ix_meter_name', 'name') + ) + id = Column(Integer, primary_key=True) + name = Column(String(255), nullable=False) + type = Column(String(255)) + unit = Column(String(255)) + + class Sample(Base): """Metering data.""" @@ -197,17 +212,14 @@ class Sample(Base): Index('ix_sample_timestamp', 'timestamp'), Index('ix_sample_user_id', 'user_id'), Index('ix_sample_project_id', 'project_id'), - Index('idx_sample_rid_cname', 'resource_id', 'counter_name'), ) id = Column(Integer, primary_key=True) - counter_name = Column(String(255)) + meter_id = Column(Integer, ForeignKey('meter.id')) user_id = Column(String(255), ForeignKey('user.id')) project_id = Column(String(255), ForeignKey('project.id')) resource_id = Column(String(255), ForeignKey('resource.id')) resource_metadata = Column(JSONEncodedDict()) - counter_type = Column(String(255)) - counter_unit = Column(String(255)) - counter_volume = Column(Float(53)) + volume = Column(Float(53)) timestamp = Column(PreciseTimestamp(), default=timeutils.utcnow) recorded_at = Column(PreciseTimestamp(), default=timeutils.utcnow) message_signature = Column(String(1000)) @@ -223,6 +235,23 @@ class Sample(Base): cascade="all, delete-orphan") +class MeterSample(Base): + """Helper model as many of the filters work against Sample data + joined with Meter data. + """ + meter = Meter.__table__ + sample = Sample.__table__ + __table__ = join(meter, sample) + + id = column_property(sample.c.id) + meter_id = column_property(meter.c.id, sample.c.meter_id) + counter_name = column_property(meter.c.name) + counter_type = column_property(meter.c.type) + counter_unit = column_property(meter.c.unit) + counter_volume = column_property(sample.c.volume) + sources = relationship("Source", secondary=lambda: sourceassoc) + + class User(Base): __tablename__ = 'user' id = Column(String(255), primary_key=True)