From 4a40cfe7ba0db6b91b14f250aa3823340a43b72a Mon Sep 17 00:00:00 2001 From: gordon chung Date: Fri, 1 Aug 2014 11:25:56 -0400 Subject: [PATCH] normalise resource data create a table to contain resource specific attributes. the new resource table will contain: - id - resource_id, - project_id, - user_id, - source_id - resource_metadata - metadata_hash we will only capture unique resources (ie. unique ids or metadata) Closes-Bug: #1320257 Implements: blueprint bigger-data-sql Co-Authored-By: Srinivas Sakhamuri Change-Id: I4a32688ad3a78e574e018a154cb145e31da6298e --- ceilometer/storage/impl_sqlalchemy.py | 250 ++++++++++++------ .../versions/038_normalise_tables.py | 172 ++++++++++++ ceilometer/storage/sqlalchemy/models.py | 106 +++++--- ceilometer/storage/sqlalchemy/utils.py | 2 +- 4 files changed, 407 insertions(+), 123 deletions(-) create mode 100644 ceilometer/storage/sqlalchemy/migrate_repo/versions/038_normalise_tables.py diff --git a/ceilometer/storage/impl_sqlalchemy.py b/ceilometer/storage/impl_sqlalchemy.py index 7ee715d10..868a1cef2 100644 --- a/ceilometer/storage/impl_sqlalchemy.py +++ b/ceilometer/storage/impl_sqlalchemy.py @@ -18,6 +18,7 @@ from __future__ import absolute_import import datetime +import hashlib import operator import os @@ -33,6 +34,7 @@ from sqlalchemy import func from sqlalchemy.orm import aliased from ceilometer.openstack.common.gettextutils import _ +from ceilometer.openstack.common import jsonutils from ceilometer.openstack.common import log from ceilometer import storage from ceilometer.storage import base @@ -62,7 +64,7 @@ PARAMETERIZED_AGGREGATES = dict( ), compute=dict( cardinality=lambda p: func.count( - distinct(getattr(models.Sample, p)) + distinct(getattr(models.Resource, p)) ).label('cardinality/%s' % p) ) ) @@ -116,7 +118,7 @@ def apply_metaquery_filter(session, query, metaquery): {"key": k, "value": type(value)}) else: meta_alias = aliased(_model) - on_clause = and_(models.Sample.id == meta_alias.id, + on_clause = and_(models.Resource.internal_id == meta_alias.id, meta_alias.meta_key == key) # outer join is needed to support metaquery # with or operator on non existent metadata field @@ -144,7 +146,7 @@ def make_query_from_filter(session, query, sample_filter, require_meter=True): raise RuntimeError('Missing required meter specifier') if sample_filter.source: query = query.filter( - models.Sample.source_id == sample_filter.source) + models.Resource.source_id == sample_filter.source) if sample_filter.start: ts_start = sample_filter.start if sample_filter.start_timestamp_op == 'gt': @@ -158,13 +160,13 @@ def make_query_from_filter(session, query, sample_filter, require_meter=True): else: query = query.filter(models.Sample.timestamp < ts_end) if sample_filter.user: - query = query.filter(models.Sample.user_id == sample_filter.user) + query = query.filter(models.Resource.user_id == sample_filter.user) if sample_filter.project: query = query.filter( - models.Sample.project_id == sample_filter.project) + models.Resource.project_id == sample_filter.project) if sample_filter.resource: query = query.filter( - models.Sample.resource_id == sample_filter.resource) + models.Resource.resource_id == sample_filter.resource) if sample_filter.message_id: query = query.filter( models.Sample.message_id == sample_filter.message_id) @@ -183,22 +185,29 @@ class Connection(base.Connection): - meter - meter definition - - { id: meter def id + - { id: meter id name: meter name type: meter type unit: meter unit } + - resource + - resource definition + - { internal_id: resource id + resource_id: resource uuid + user_id: user uuid + project_id: project uuid + source_id: source id + resource_metadata: metadata dictionary + metadata_hash: metadata dictionary hash + } - sample - the raw incoming data - { id: sample id meter_id: meter id (->meter.id) - user_id: user uuid - project_id: project uuid - resource_id: resource uuid - source_id: source id - resource_metadata: metadata dictionaries + resource_id: resource id (->resource.internal_id) volume: sample volume timestamp: datetime + recorded_at: datetime message_signature: message signature message_id: message uuid } @@ -230,6 +239,7 @@ class Connection(base.Connection): @staticmethod def _create_meter(session, name, type, unit): + # TODO(gordc): implement lru_cache to improve performance try: nested = session.connection().dialect.name != 'sqlite' with session.begin(nested=nested, @@ -247,6 +257,55 @@ class Connection(base.Connection): return obj + @staticmethod + def _create_resource(session, res_id, user_id, project_id, source_id, + rmeta): + # TODO(gordc): implement lru_cache to improve performance + try: + nested = session.connection().dialect.name != 'sqlite' + m_hash = jsonutils.dumps(rmeta, sort_keys=True) + with session.begin(nested=nested, + subtransactions=not nested): + obj = (session.query(models.Resource.internal_id) + .filter(models.Resource.resource_id == res_id) + .filter(models.Resource.user_id == user_id) + .filter(models.Resource.project_id == project_id) + .filter(models.Resource.source_id == source_id) + .filter(models.Resource.metadata_hash == + hashlib.md5(m_hash).hexdigest()).first()) + obj_id = obj[0] if obj else None + if obj_id is None: + obj = models.Resource(resource_id=res_id, user_id=user_id, + project_id=project_id, + source_id=source_id, + resource_metadata=rmeta) + session.add(obj) + session.flush() + obj_id = obj.internal_id + if rmeta and isinstance(rmeta, dict): + meta_map = {} + for key, v in utils.dict_to_keyval(rmeta): + try: + _model = sql_utils.META_TYPE_MAP[type(v)] + if meta_map.get(_model) is None: + meta_map[_model] = [] + meta_map[_model].append( + {'id': obj_id, 'meta_key': key, + 'value': v}) + except KeyError: + LOG.warn(_("Unknown metadata type. Key (%s) " + "will not be queryable."), key) + for _model in meta_map.keys(): + session.execute(_model.__table__.insert(), + meta_map[_model]) + + except dbexc.DBDuplicateEntry: + # retry function to pick up duplicate committed object + obj_id = Connection._create_resource(session, res_id, user_id, + project_id, source_id, rmeta) + + return obj_id + def record_metering_data(self, data): """Write the data to the backend storage system. @@ -256,36 +315,24 @@ class Connection(base.Connection): session = self._engine_facade.get_session() with session.begin(): # Record the raw data for the sample. - rmetadata = data['resource_metadata'] meter = self._create_meter(session, data['counter_name'], data['counter_type'], data['counter_unit']) - sample = models.Sample(meter_id=meter.id) + res_id = self._create_resource(session, + data['resource_id'], + data['user_id'], + data['project_id'], + data['source'], + data['resource_metadata']) + sample = models.Sample( + meter_id=meter.id, + resource_id=res_id, + timestamp=data['timestamp'], + volume=data['counter_volume'], + message_signature=data['message_signature'], + message_id=data['message_id']) session.add(sample) - sample.resource_id = data['resource_id'] - sample.project_id = data['project_id'] - sample.user_id = data['user_id'] - sample.timestamp = data['timestamp'] - sample.resource_metadata = rmetadata - sample.volume = data['counter_volume'] - sample.message_signature = data['message_signature'] - sample.message_id = data['message_id'] - sample.source_id = data['source'] - session.flush() - - if rmetadata: - if isinstance(rmetadata, dict): - for key, v in utils.dict_to_keyval(rmetadata): - try: - _model = sql_utils.META_TYPE_MAP[type(v)] - except KeyError: - LOG.warn(_("Unknown metadata type. Key (%s) will " - "not be queryable."), key) - else: - session.add(_model(id=sample.id, - meta_key=key, - value=v)) def clear_expired_metering_data(self, ttl): """Clear expired data from the backend storage system. @@ -312,6 +359,9 @@ class Connection(base.Connection): (session.query(models.Meter) .filter(~models.Meter.samples.any()) .delete(synchronize_session='fetch')) + (session.query(models.Resource) + .filter(~models.Resource.samples.any()) + .delete(synchronize_session='fetch')) LOG.info(_("%d samples removed from database"), rows) def get_resources(self, user=None, project=None, source=None, @@ -346,14 +396,19 @@ class Connection(base.Connection): session = self._engine_facade.get_session() # get list of resource_ids - res_q = session.query(distinct(models.Sample.resource_id)) + res_q = session.query(distinct(models.Resource.resource_id)).join( + models.Sample, + models.Sample.resource_id == models.Resource.internal_id) res_q = make_query_from_filter(session, res_q, s_filter, require_meter=False) for res_id in res_q.all(): # get latest Sample max_q = (session.query(models.Sample) - .filter(models.Sample.resource_id == res_id[0])) + .join(models.Resource, + models.Resource.internal_id == + models.Sample.resource_id) + .filter(models.Resource.resource_id == res_id[0])) max_q = make_query_from_filter(session, max_q, s_filter, require_meter=False) max_q = max_q.order_by(models.Sample.timestamp.desc(), @@ -361,7 +416,10 @@ class Connection(base.Connection): # get the min timestamp value. min_q = (session.query(models.Sample.timestamp) - .filter(models.Sample.resource_id == res_id[0])) + .join(models.Resource, + models.Resource.internal_id == + models.Sample.resource_id) + .filter(models.Resource.resource_id == res_id[0])) min_q = make_query_from_filter(session, min_q, s_filter, require_meter=False) min_q = min_q.order_by(models.Sample.timestamp.asc()).limit(1) @@ -369,13 +427,13 @@ class Connection(base.Connection): sample = max_q.first() if sample: yield api_models.Resource( - resource_id=sample.resource_id, - project_id=sample.project_id, + resource_id=sample.resource.resource_id, + project_id=sample.resource.project_id, first_sample_timestamp=min_q.first().timestamp, last_sample_timestamp=sample.timestamp, - source=sample.source_id, - user_id=sample.user_id, - metadata=sample.resource_metadata + source=sample.resource.source_id, + user_id=sample.resource.user_id, + metadata=sample.resource.resource_metadata ) def get_meters(self, user=None, project=None, resource=None, source=None, @@ -399,37 +457,41 @@ class Connection(base.Connection): metaquery=metaquery, resource=resource) + # NOTE(gordc): get latest sample of each meter/resource. we do not + # filter here as we want to filter only on latest record. session = self._engine_facade.get_session() + subq = session.query(func.max(models.Sample.id).label('id')).join( + models.Resource, + models.Resource.internal_id == models.Sample.resource_id).group_by( + models.Sample.meter_id, models.Resource.resource_id) + if resource: + subq = subq.filter(models.Resource.resource_id == resource) + subq = subq.subquery() - # sample_subq is used to reduce sample records - # by selecting a record for each (resource_id, meter_id). - # max() is used to choice a sample record, so the latest record - # is selected for each (resource_id, meter_id). - sample_subq = (session.query( - func.max(models.Sample.id).label('id')) - .group_by(models.Sample.meter_id, - models.Sample.resource_id)) - sample_subq = sample_subq.subquery() - - # SELECT sample.* FROM sample INNER JOIN - # (SELECT max(sample.id) AS id FROM sample - # GROUP BY sample.resource_id, sample.meter_id) AS anon_2 - # ON sample.id = anon_2.id - query_sample = (session.query(models.MeterSample). - join(sample_subq, models.MeterSample.id == - sample_subq.c.id)) + # get meter details for samples. + query_sample = (session.query(models.Sample.meter_id, + models.Meter.name, models.Meter.type, + models.Meter.unit, + models.Resource.resource_id, + models.Resource.project_id, + models.Resource.source_id, + models.Resource.user_id).join( + subq, subq.c.id == models.Sample.id) + .join(models.Meter, models.Meter.id == models.Sample.meter_id) + .join(models.Resource, + models.Resource.internal_id == models.Sample.resource_id)) query_sample = make_query_from_filter(session, query_sample, s_filter, require_meter=False) - for sample in query_sample.all(): + for row in query_sample.all(): yield api_models.Meter( - name=sample.counter_name, - type=sample.counter_type, - unit=sample.counter_unit, - resource_id=sample.resource_id, - project_id=sample.project_id, - source=sample.source_id, - user_id=sample.user_id) + name=row.name, + type=row.type, + unit=row.unit, + resource_id=row.resource_id, + project_id=row.project_id, + source=row.source_id, + user_id=row.user_id) def _retrieve_samples(self, query): samples = query.all() @@ -463,28 +525,41 @@ class Connection(base.Connection): if limit == 0: return [] - table = models.MeterSample session = self._engine_facade.get_session() - query = session.query(table) + query = session.query(models.Sample.timestamp, + models.Sample.recorded_at, + models.Sample.message_id, + models.Sample.message_signature, + models.Sample.volume.label('counter_volume'), + models.Meter.name.label('counter_name'), + models.Meter.type.label('counter_type'), + models.Meter.unit.label('counter_unit'), + models.Resource.source_id, + models.Resource.user_id, + models.Resource.project_id, + models.Resource.resource_metadata, + models.Resource.resource_id).join( + models.Meter, models.Meter.id == models.Sample.meter_id).join( + models.Resource, + models.Resource.internal_id == models.Sample.resource_id).order_by( + models.Sample.timestamp.desc()) query = make_query_from_filter(session, query, sample_filter, require_meter=False) - transformer = sql_utils.QueryTransformer(table, query) - transformer.apply_options(None, - limit) - return self._retrieve_samples(transformer.get_query()) + if limit: + query = query.limit(limit) + return self._retrieve_samples(query) def query_samples(self, filter_expr=None, orderby=None, limit=None): if limit == 0: return [] session = self._engine_facade.get_session() - query = session.query(models.MeterSample) - transformer = sql_utils.QueryTransformer(models.MeterSample, query) + query = session.query(models.FullSample) + transformer = sql_utils.QueryTransformer(models.FullSample, query) if filter_expr is not None: transformer.apply_filter(filter_expr) - transformer.apply_options(orderby, - limit) + transformer.apply_options(orderby, limit) return self._retrieve_samples(transformer.get_query()) @staticmethod @@ -515,22 +590,25 @@ class Connection(base.Connection): def _make_stats_query(self, sample_filter, groupby, aggregate): select = [ - models.Meter.unit, func.min(models.Sample.timestamp).label('tsmin'), func.max(models.Sample.timestamp).label('tsmax'), + models.Meter.unit ] - select.extend(self._get_aggregate_functions(aggregate)) session = self._engine_facade.get_session() if groupby: - group_attributes = [getattr(models.Sample, g) for g in groupby] + group_attributes = [getattr(models.Resource, g) for g in groupby] select.extend(group_attributes) - query = (session.query(*select).join( - models.Sample, models.Meter.id == models.Sample.meter_id). - group_by(models.Meter.unit)) + query = (session.query(*select) + .join(models.Meter, + models.Meter.id == models.Sample.meter_id) + .join( + models.Resource, + models.Resource.internal_id == models.Sample.resource_id) + .group_by(models.Meter.unit)) if groupby: query = query.group_by(*group_attributes) diff --git a/ceilometer/storage/sqlalchemy/migrate_repo/versions/038_normalise_tables.py b/ceilometer/storage/sqlalchemy/migrate_repo/versions/038_normalise_tables.py new file mode 100644 index 000000000..a596185a0 --- /dev/null +++ b/ceilometer/storage/sqlalchemy/migrate_repo/versions/038_normalise_tables.py @@ -0,0 +1,172 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import hashlib + +import migrate +import sqlalchemy as sa + +from ceilometer.openstack.common import jsonutils + + +m_tables = [('metadata_text', sa.Text, True), + ('metadata_bool', sa.Boolean, False), + ('metadata_int', sa.BigInteger, False), + ('metadata_float', sa.Float(53), False)] + + +def _migrate_meta_tables(meta, col, new_col, new_fk): + for t_name, t_type, t_nullable in m_tables: + m_table = sa.Table(t_name, meta, autoload=True) + m_table_new = sa.Table( + '%s_new' % t_name, meta, + sa.Column('id', sa.Integer, sa.ForeignKey(new_fk), + primary_key=True), + sa.Column('meta_key', sa.String(255), + primary_key=True), + sa.Column('value', t_type, nullable=t_nullable), + mysql_engine='InnoDB', + mysql_charset='utf8', + ) + m_table_new.create() + + if m_table.select().scalar() is not None: + m_table_new.insert().from_select( + ['id', 'meta_key', 'value'], + sa.select([new_col, m_table.c.meta_key, + m_table.c.value]).where( + col == m_table.c.id).group_by( + new_col, m_table.c.meta_key, m_table.c.value)).execute() + + m_table.drop() + if meta.bind.engine.name != 'sqlite': + sa.Index('ix_%s_meta_key' % t_name, + m_table_new.c.meta_key).create() + m_table_new.rename(t_name) + + +def upgrade(migrate_engine): + meta = sa.MetaData(bind=migrate_engine) + resource = sa.Table( + 'resource', meta, + sa.Column('internal_id', sa.Integer, primary_key=True), + sa.Column('resource_id', sa.String(255)), + sa.Column('user_id', sa.String(255)), + sa.Column('project_id', sa.String(255)), + sa.Column('source_id', sa.String(255)), + sa.Column('resource_metadata', sa.Text), + sa.Column('metadata_hash', sa.String(32)), + mysql_engine='InnoDB', + mysql_charset='utf8') + resource.create() + + # copy resource data in to resource table + sample = sa.Table('sample', meta, autoload=True) + sa.Column('metadata_hash', sa.String(32)).create(sample) + for row in sa.select([sample.c.id, sample.c.resource_metadata]).execute(): + sample.update().where(sample.c.id == row['id']).values( + {sample.c.metadata_hash: + hashlib.md5(jsonutils.dumps( + row['resource_metadata'], + sort_keys=True)).hexdigest()}).execute() + query = sa.select([sample.c.resource_id, sample.c.user_id, + sample.c.project_id, sample.c.source_id, + sample.c.resource_metadata, + sample.c.metadata_hash]).distinct() + for row in query.execute(): + resource.insert().values( + resource_id=row['resource_id'], + user_id=row['user_id'], + project_id=row['project_id'], + source_id=row['source_id'], + resource_metadata=row['resource_metadata'], + metadata_hash=row['metadata_hash']).execute() + # link sample records to new resource records + sa.Column('resource_id_new', sa.Integer).create(sample) + for row in sa.select([resource]).execute(): + (sample.update(). + where(sa.and_( + sample.c.resource_id == row['resource_id'], + sample.c.user_id == row['user_id'], + sample.c.project_id == row['project_id'], + sample.c.source_id == row['source_id'], + sample.c.metadata_hash == row['metadata_hash'])). + values({sample.c.resource_id_new: row['internal_id']}).execute()) + + sample.c.resource_id.drop() + sample.c.metadata_hash.drop() + sample.c.resource_id_new.alter(name='resource_id') + # re-bind metadata to pick up alter name change + meta = sa.MetaData(bind=migrate_engine) + sample = sa.Table('sample', meta, autoload=True) + resource = sa.Table('resource', meta, autoload=True) + if migrate_engine.name != 'sqlite': + sa.Index('ix_resource_resource_id', resource.c.resource_id).create() + sa.Index('ix_sample_user_id', sample.c.user_id).drop() + sa.Index('ix_sample_project_id', sample.c.project_id).drop() + sa.Index('ix_sample_resource_id', sample.c.resource_id).create() + sa.Index('ix_sample_meter_id_resource_id', + sample.c.meter_id, sample.c.resource_id).create() + + params = {'columns': [sample.c.resource_id], + 'refcolumns': [resource.c.internal_id]} + if migrate_engine.name == 'mysql': + params['name'] = 'fk_sample_resource_internal_id' + migrate.ForeignKeyConstraint(**params).create() + + sample.c.user_id.drop() + sample.c.project_id.drop() + sample.c.source_id.drop() + sample.c.resource_metadata.drop() + + _migrate_meta_tables(meta, sample.c.id, sample.c.resource_id, + 'resource.internal_id') + + +def downgrade(migrate_engine): + meta = sa.MetaData(bind=migrate_engine) + sample = sa.Table('sample', meta, autoload=True) + _migrate_meta_tables(meta, sample.c.resource_id, sample.c.id, + 'sample.id') + + sa.Column('user_id', sa.String(255)).create(sample) + sa.Column('project_id', sa.String(255)).create(sample) + sa.Column('source_id', sa.String(255)).create(sample) + sa.Column('resource_id_new', sa.String(255)).create(sample) + sa.Column('resource_metadata', sa.Text).create(sample) + resource = sa.Table('resource', meta, autoload=True) + + for row in sa.select([resource]).execute(): + (sample.update(). + where(sample.c.resource_id == row['internal_id']). + values({sample.c.resource_id_new: row['resource_id'], + sample.c.user_id: row['user_id'], + sample.c.project_id: row['project_id'], + sample.c.source_id: row['source_id'], + sample.c.resource_metadata: row['resource_metadata']}) + .execute()) + + if migrate_engine.name != 'sqlite': + params = {'columns': [sample.c.resource_id], + 'refcolumns': [resource.c.internal_id]} + if migrate_engine.name == 'mysql': + params['name'] = 'fk_sample_resource_internal_id' + migrate.ForeignKeyConstraint(**params).drop() + sa.Index('ix_sample_meter_id_resource_id', + sample.c.meter_id, sample.c.resource_id).drop() + sa.Index('ix_sample_resource_id', sample.c.resource_id).drop() + sa.Index('ix_sample_user_id', sample.c.user_id).create() + sa.Index('ix_sample_project_id', sample.c.project_id).create() + + resource.drop() + sample.c.resource_id.drop() + sample.c.resource_id_new.alter(name='resource_id') diff --git a/ceilometer/storage/sqlalchemy/models.py b/ceilometer/storage/sqlalchemy/models.py index a494a8bcc..4afae7087 100644 --- a/ceilometer/storage/sqlalchemy/models.py +++ b/ceilometer/storage/sqlalchemy/models.py @@ -16,18 +16,19 @@ """ SQLAlchemy models for Ceilometer data. """ - +import hashlib import json from oslo.utils import timeutils import six from sqlalchemy import (Column, Integer, String, ForeignKey, Index, - UniqueConstraint, BigInteger, join) + UniqueConstraint, BigInteger) +from sqlalchemy import event, select from sqlalchemy import Float, Boolean, Text, DateTime from sqlalchemy.dialects.mysql import DECIMAL from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import backref -from sqlalchemy.orm import column_property +from sqlalchemy.orm import deferred from sqlalchemy.orm import relationship from sqlalchemy.types import TypeDecorator @@ -106,7 +107,7 @@ class MetaText(Base): __table_args__ = ( Index('ix_meta_text_key', 'meta_key'), ) - id = Column(Integer, ForeignKey('sample.id'), primary_key=True) + id = Column(Integer, ForeignKey('resource.internal_id'), primary_key=True) meta_key = Column(String(255), primary_key=True) value = Column(Text) @@ -118,7 +119,7 @@ class MetaBool(Base): __table_args__ = ( Index('ix_meta_bool_key', 'meta_key'), ) - id = Column(Integer, ForeignKey('sample.id'), primary_key=True) + id = Column(Integer, ForeignKey('resource.internal_id'), primary_key=True) meta_key = Column(String(255), primary_key=True) value = Column(Boolean) @@ -130,7 +131,7 @@ class MetaBigInt(Base): __table_args__ = ( Index('ix_meta_int_key', 'meta_key'), ) - id = Column(Integer, ForeignKey('sample.id'), primary_key=True) + id = Column(Integer, ForeignKey('resource.internal_id'), primary_key=True) meta_key = Column(String(255), primary_key=True) value = Column(BigInteger, default=False) @@ -142,7 +143,7 @@ class MetaFloat(Base): __table_args__ = ( Index('ix_meta_float_key', 'meta_key'), ) - id = Column(Integer, ForeignKey('sample.id'), primary_key=True) + id = Column(Integer, ForeignKey('resource.internal_id'), primary_key=True) meta_key = Column(String(255), primary_key=True) value = Column(Float(53), default=False) @@ -153,7 +154,7 @@ class Meter(Base): __tablename__ = 'meter' __table_args__ = ( UniqueConstraint('name', 'type', 'unit', name='def_unique'), - Index('ix_meter_name', 'name') + Index('ix_meter_name', 'name'), ) id = Column(Integer, primary_key=True) name = Column(String(255), nullable=False) @@ -162,55 +163,88 @@ class Meter(Base): samples = relationship("Sample", backref="meter") +class Resource(Base): + """Resource data.""" + + __tablename__ = 'resource' + __table_args__ = ( + # TODO(gordc): this should exist but the attribute values we set + # for user/project/source/resource id's are too large + # for a uuid. + # UniqueConstraint('resource_id', 'user_id', 'project_id', + # 'source_id', 'metadata_hash', + # name='res_def_unique'), + Index('ix_resource_resource_id', 'resource_id'), + ) + + internal_id = Column(Integer, primary_key=True) + user_id = Column(String(255)) + project_id = Column(String(255)) + source_id = Column(String(255)) + resource_id = Column(String(255), nullable=False) + resource_metadata = deferred(Column(JSONEncodedDict())) + metadata_hash = deferred(Column(String(32))) + samples = relationship("Sample", backref="resource") + meta_text = relationship("MetaText", backref="resource", + cascade="all, delete-orphan") + meta_float = relationship("MetaFloat", backref="resource", + cascade="all, delete-orphan") + meta_int = relationship("MetaBigInt", backref="resource", + cascade="all, delete-orphan") + meta_bool = relationship("MetaBool", backref="resource", + cascade="all, delete-orphan") + + +@event.listens_for(Resource, "before_insert") +def before_insert(mapper, connection, target): + metadata = json.dumps(target.resource_metadata, sort_keys=True) + target.metadata_hash = hashlib.md5(metadata).hexdigest() + + class Sample(Base): """Metering data.""" __tablename__ = 'sample' __table_args__ = ( Index('ix_sample_timestamp', 'timestamp'), - Index('ix_sample_user_id', 'user_id'), - Index('ix_sample_project_id', 'project_id'), - Index('ix_sample_meter_id', 'meter_id') + Index('ix_sample_resource_id', 'resource_id'), + Index('ix_sample_meter_id', 'meter_id'), + Index('ix_sample_meter_id_resource_id', 'meter_id', 'resource_id') ) id = Column(Integer, primary_key=True) meter_id = Column(Integer, ForeignKey('meter.id')) - user_id = Column(String(255)) - project_id = Column(String(255)) - resource_id = Column(String(255)) - resource_metadata = Column(JSONEncodedDict()) + resource_id = Column(Integer, ForeignKey('resource.internal_id')) volume = Column(Float(53)) timestamp = Column(PreciseTimestamp(), default=lambda: timeutils.utcnow()) recorded_at = Column(PreciseTimestamp(), default=lambda: timeutils.utcnow()) message_signature = Column(String(1000)) message_id = Column(String(1000)) - source_id = Column(String(255)) - meta_text = relationship("MetaText", backref="sample", - cascade="all, delete-orphan") - meta_float = relationship("MetaFloat", backref="sample", - cascade="all, delete-orphan") - meta_int = relationship("MetaBigInt", backref="sample", - cascade="all, delete-orphan") - meta_bool = relationship("MetaBool", backref="sample", - cascade="all, delete-orphan") -class MeterSample(Base): - """Helper model. +class FullSample(Base): + """Mapper model. - It's needed as many of the filters work against Sample data joined with - Meter data. + It's needed as many of the filters work against raw data which is split + between Meter, Sample, and Resource tables """ meter = Meter.__table__ sample = Sample.__table__ - __table__ = join(meter, sample) - - id = column_property(sample.c.id) - meter_id = column_property(meter.c.id, sample.c.meter_id) - counter_name = column_property(meter.c.name) - counter_type = column_property(meter.c.type) - counter_unit = column_property(meter.c.unit) - counter_volume = column_property(sample.c.volume) + resource = Resource.__table__ + __table__ = (select([sample.c.id, meter.c.name.label('counter_name'), + meter.c.type.label('counter_type'), + meter.c.unit.label('counter_unit'), + sample.c.volume.label('counter_volume'), + resource.c.resource_id, resource.c.source_id, + resource.c.user_id, resource.c.project_id, + resource.c.resource_metadata, resource.c.internal_id, + sample.c.timestamp, sample.c.message_id, + sample.c.message_signature, sample.c.recorded_at]) + .select_from( + sample.join(meter, sample.c.meter_id == meter.c.id).join( + resource, + sample.c.resource_id == resource.c.internal_id)) + .alias()) class Alarm(Base): diff --git a/ceilometer/storage/sqlalchemy/utils.py b/ceilometer/storage/sqlalchemy/utils.py index c42918910..c2f7954b2 100644 --- a/ceilometer/storage/sqlalchemy/utils.py +++ b/ceilometer/storage/sqlalchemy/utils.py @@ -85,7 +85,7 @@ class QueryTransformer(object): field_name = field_name[len('resource_metadata.'):] meta_table = META_TYPE_MAP[type(value)] meta_alias = aliased(meta_table) - on_clause = and_(self.table.id == meta_alias.id, + on_clause = and_(self.table.internal_id == meta_alias.id, meta_alias.meta_key == field_name) # outer join is needed to support metaquery # with or operator on non existent metadata field