sample table contains redundant/duplicate data

create a meter table to store meter definition attributes:
- name
- type
- unit

blueprint: meter-table-sql
Closes-Bug: #1211985
Change-Id: I4dd510e66d64226b03cfce8d7dd5e77937793ba3
This commit is contained in:
Gordon Chung 2014-01-08 14:45:05 -05:00
parent 461d13f2fc
commit 604fc8258c
3 changed files with 219 additions and 40 deletions

View File

@ -55,17 +55,22 @@ class SQLAlchemyStorage(base.StorageEngine):
- { id: source id }
- project
- { id: project uuid }
- meter
- meter definition
- { id: meter def id
name: meter name
type: meter type
unit: meter unit
}
- sample
- the raw incoming data
- { id: sample id
counter_name: counter name
meter_id: meter id (->meter.id)
user_id: user uuid (->user.id)
project_id: project uuid (->project.id)
resource_id: resource uuid (->resource.id)
resource_metadata: metadata dictionaries
counter_type: counter type
counter_unit: counter unit
counter_volume: counter volume
volume: sample volume
timestamp: datetime
message_signature: message signature
message_id: message uuid
@ -123,7 +128,7 @@ def apply_metaquery_filter(session, query, metaquery):
meta_q = session.query(_model).\
filter(and_(_model.meta_key == key,
_model.value == v)).subquery()
query = query.filter_by(id=meta_q.c.id)
query = query.filter(models.Sample.id == meta_q.c.id)
return query
@ -138,7 +143,7 @@ def make_query_from_filter(session, query, sample_filter, require_meter=True):
"""
if sample_filter.meter:
query = query.filter(models.Sample.counter_name == sample_filter.meter)
query = query.filter(models.Meter.name == sample_filter.meter)
elif require_meter:
raise RuntimeError(_('Missing required meter specifier'))
if sample_filter.source:
@ -157,13 +162,16 @@ def make_query_from_filter(session, query, sample_filter, require_meter=True):
else:
query = query.filter(models.Sample.timestamp < ts_end)
if sample_filter.user:
query = query.filter_by(user_id=sample_filter.user)
query = query.filter(models.Sample.user_id == sample_filter.user)
if sample_filter.project:
query = query.filter_by(project_id=sample_filter.project)
query = query.filter(
models.Sample.project_id == sample_filter.project)
if sample_filter.resource:
query = query.filter_by(resource_id=sample_filter.resource)
query = query.filter(
models.Sample.resource_id == sample_filter.resource)
if sample_filter.message_id:
query = query.filter_by(message_id=sample_filter.message_id)
query = query.filter(
models.Sample.message_id == sample_filter.message_id)
if sample_filter.metaquery:
query = apply_metaquery_filter(session, query,
@ -250,6 +258,28 @@ class Connection(base.Connection):
setattr(obj, k, kwargs[k])
return obj
@staticmethod
def _create_meter(session, name, type, unit):
try:
nested = session.connection().dialect.name != 'sqlite'
if not nested and session.query(models.Meter)\
.filter(models.Meter.name == name)\
.filter(models.Meter.type == type)\
.filter(models.Meter.unit == unit).count() > 0:
raise dbexc.DBDuplicateEntry()
with session.begin(nested=nested,
subtransactions=not nested):
obj = models.Meter(name=name, type=type, unit=unit)
session.add(obj)
except dbexc.DBDuplicateEntry:
obj = session.query(models.Meter)\
.filter(models.Meter.name == name)\
.filter(models.Meter.type == type)\
.filter(models.Meter.unit == unit).first()
return obj
def record_metering_data(self, data):
"""Write the data to the backend storage system.
@ -272,9 +302,11 @@ class Connection(base.Connection):
resource_metadata=rmetadata)
# Record the raw data for the sample.
sample = models.Sample(counter_type=data['counter_type'],
counter_unit=data['counter_unit'],
counter_name=data['counter_name'],
meter = self._create_meter(session,
data['counter_name'],
data['counter_type'],
data['counter_unit'])
sample = models.Sample(meter_id=meter.id,
resource=resource)
session.add(sample)
if not filter(lambda x: x.id == source.id, sample.sources):
@ -283,7 +315,7 @@ class Connection(base.Connection):
sample.user = user
sample.timestamp = data['timestamp']
sample.resource_metadata = rmetadata
sample.counter_volume = data['counter_volume']
sample.volume = data['counter_volume']
sample.message_signature = data['message_signature']
sample.message_id = data['message_id']
session.flush()
@ -466,7 +498,7 @@ class Connection(base.Connection):
last_sample_timestamp=last_ts,
source=sample.sources[0].id,
user_id=sample.user_id,
metadata=sample.resource_metadata,
metadata=sample.resource_metadata
)
def get_meters(self, user=None, project=None, resource=None, source=None,
@ -491,31 +523,34 @@ class Connection(base.Connection):
# subquery_sample is used to reduce sample records
# by selecting a record for each (resource_id, counter_name).
# max() is used to choice a sample record, so the latest record
# is selected for each (resource_id, counter_name).
#
# is selected for each (resource_id, meter.name).
subquery_sample = session.query(
func.max(models.Sample.id).label('id'))\
.join(models.Meter)\
.group_by(models.Sample.resource_id,
models.Sample.counter_name).subquery()
models.Meter.name).subquery()
# The SQL of query_sample is essentially:
#
# SELECT sample.* FROM sample INNER JOIN
# (SELECT max(sample.id) AS id FROM sample
# GROUP BY sample.resource_id, sample.counter_name) AS anon_2
# GROUP BY sample.resource_id, meter.name) AS anon_2
# ON sample.id = anon_2.id
#
query_sample = session.query(models.Sample).\
join(subquery_sample, models.Sample.id == subquery_sample.c.id)
query_sample = session.query(models.MeterSample).\
join(subquery_sample,
models.MeterSample.id == subquery_sample.c.id)
if metaquery:
query_sample = apply_metaquery_filter(session,
query_sample,
metaquery)
alias_sample = aliased(models.Sample, query_sample.subquery())
query = session.query(models.Resource, alias_sample).join(
alias_sample, models.Resource.id == alias_sample.resource_id)
alias_sample = aliased(models.MeterSample,
query_sample.with_labels().subquery())
query = session.query(models.Resource, alias_sample)\
.filter(models.Resource.id == alias_sample.resource_id)
if user is not None:
query = query.filter(models.Resource.user_id == user)
@ -577,7 +612,7 @@ class Connection(base.Connection):
if limit == 0:
return []
table = models.Sample
table = models.MeterSample
session = self._get_db_session()
query = session.query(table)
query = make_query_from_filter(session, query, sample_filter,
@ -606,7 +641,7 @@ class Connection(base.Connection):
limit,
table)
retrieve = {models.Sample: self._retrieve_samples,
retrieve = {models.MeterSample: self._retrieve_samples,
models.Alarm: self._retrieve_alarms,
models.AlarmChange: self._retrieve_alarm_history}
return retrieve[table](query)
@ -615,7 +650,7 @@ class Connection(base.Connection):
return self._retrieve_data(filter_expr,
orderby,
limit,
models.Sample)
models.MeterSample)
def _transform_expression(self, expression_tree, table):
@ -648,14 +683,14 @@ class Connection(base.Connection):
def _make_stats_query(self, sample_filter, groupby):
select = [
models.Sample.counter_unit.label('unit'),
models.Meter.unit,
func.min(models.Sample.timestamp).label('tsmin'),
func.max(models.Sample.timestamp).label('tsmax'),
func.avg(models.Sample.counter_volume).label('avg'),
func.sum(models.Sample.counter_volume).label('sum'),
func.min(models.Sample.counter_volume).label('min'),
func.max(models.Sample.counter_volume).label('max'),
func.count(models.Sample.counter_volume).label('count'),
func.avg(models.Sample.volume).label('avg'),
func.sum(models.Sample.volume).label('sum'),
func.min(models.Sample.volume).label('min'),
func.max(models.Sample.volume).label('max'),
func.count(models.Sample.volume).label('count')
]
session = self._get_db_session()

View File

@ -0,0 +1,115 @@
#
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import migrate
import sqlalchemy as sa
def handle_rid_index(meta, downgrade=False):
if meta.bind.engine.name == 'sqlite':
return
resource = sa.Table('resource', meta, autoload=True)
sample = sa.Table('sample', meta, autoload=True)
params = {'columns': [sample.c.resource_id],
'refcolumns': [resource.c.id],
'name': 'fk_sample_resource_id'}
if meta.bind.engine.name == 'mysql':
# For mysql dialect all dependent FK should be removed
# before index create/delete
migrate.ForeignKeyConstraint(**params).drop()
index = sa.Index('idx_sample_rid_cname', sample.c.resource_id,
sample.c.counter_name)
index.create() if downgrade else index.drop()
if meta.bind.engine.name == 'mysql':
migrate.ForeignKeyConstraint(**params).create()
def upgrade(migrate_engine):
meta = sa.MetaData(bind=migrate_engine)
meter = sa.Table(
'meter', meta,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('name', sa.String(255), nullable=False),
sa.Column('type', sa.String(255)),
sa.Column('unit', sa.String(255)),
sa.UniqueConstraint('name', 'type', 'unit', name='def_unique'),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
meter.create()
sample = sa.Table('sample', meta, autoload=True)
query = sa.select([sample.c.counter_name, sample.c.counter_type,
sample.c.counter_unit]).distinct()
for row in query.execute():
meter.insert().values(name=row['counter_name'],
type=row['counter_type'],
unit=row['counter_unit']).execute()
meter_id = sa.Column('meter_id', sa.Integer)
meter_id.create(sample)
params = {'columns': [sample.c.meter_id],
'refcolumns': [meter.c.id]}
if migrate_engine.name == 'mysql':
params['name'] = 'fk_sample_meter_id'
if migrate_engine.name != 'sqlite':
migrate.ForeignKeyConstraint(**params).create()
index = sa.Index('ix_meter_name', meter.c.name)
index.create(bind=migrate_engine)
for row in sa.select([meter]).execute():
sample.update()\
.where(sa.and_(sample.c.counter_name == row['name'],
sample.c.counter_type == row['type'],
sample.c.counter_unit == row['unit']))\
.values({sample.c.meter_id: row['id']}).execute()
handle_rid_index(meta)
sample.c.counter_name.drop()
sample.c.counter_type.drop()
sample.c.counter_unit.drop()
sample.c.counter_volume.alter(name='volume')
def downgrade(migrate_engine):
meta = sa.MetaData(bind=migrate_engine)
sample = sa.Table('sample', meta, autoload=True)
sample.c.volume.alter(name='counter_volume')
sa.Column('counter_name', sa.String(255)).create(sample)
sa.Column('counter_type', sa.String(255)).create(sample)
sa.Column('counter_unit', sa.String(255)).create(sample)
meter = sa.Table('meter', meta, autoload=True)
for row in sa.select([meter]).execute():
sample.update()\
.where(sample.c.meter_id == row['id'])\
.values({sample.c.counter_name: row['name'],
sample.c.counter_type: row['type'],
sample.c.counter_unit: row['unit']}).execute()
params = {'columns': [sample.c.meter_id],
'refcolumns': [meter.c.id]}
if migrate_engine.name == 'mysql':
params['name'] = 'fk_sample_meter_id'
if migrate_engine.name != 'sqlite':
migrate.ForeignKeyConstraint(**params).drop()
handle_rid_index(meta, True)
sample.c.meter_id.drop()
meter.drop()

View File

@ -23,11 +23,12 @@ import six.moves.urllib.parse as urlparse
from oslo.config import cfg
from sqlalchemy import Column, Integer, String, Table, ForeignKey, \
Index, UniqueConstraint, BigInteger
Index, UniqueConstraint, BigInteger, join
from sqlalchemy import Float, Boolean, Text, DateTime
from sqlalchemy.dialects.mysql import DECIMAL
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import backref
from sqlalchemy.orm import column_property
from sqlalchemy.orm import relationship
from sqlalchemy.types import TypeDecorator
@ -189,6 +190,20 @@ class MetaFloat(Base):
value = Column(Float(53), default=False)
class Meter(Base):
"""Meter definition data."""
__tablename__ = 'meter'
__table_args__ = (
UniqueConstraint('name', 'type', 'unit', name='def_unique'),
Index('ix_meter_name', 'name')
)
id = Column(Integer, primary_key=True)
name = Column(String(255), nullable=False)
type = Column(String(255))
unit = Column(String(255))
class Sample(Base):
"""Metering data."""
@ -197,17 +212,14 @@ class Sample(Base):
Index('ix_sample_timestamp', 'timestamp'),
Index('ix_sample_user_id', 'user_id'),
Index('ix_sample_project_id', 'project_id'),
Index('idx_sample_rid_cname', 'resource_id', 'counter_name'),
)
id = Column(Integer, primary_key=True)
counter_name = Column(String(255))
meter_id = Column(Integer, ForeignKey('meter.id'))
user_id = Column(String(255), ForeignKey('user.id'))
project_id = Column(String(255), ForeignKey('project.id'))
resource_id = Column(String(255), ForeignKey('resource.id'))
resource_metadata = Column(JSONEncodedDict())
counter_type = Column(String(255))
counter_unit = Column(String(255))
counter_volume = Column(Float(53))
volume = Column(Float(53))
timestamp = Column(PreciseTimestamp(), default=timeutils.utcnow)
recorded_at = Column(PreciseTimestamp(), default=timeutils.utcnow)
message_signature = Column(String(1000))
@ -223,6 +235,23 @@ class Sample(Base):
cascade="all, delete-orphan")
class MeterSample(Base):
"""Helper model as many of the filters work against Sample data
joined with Meter data.
"""
meter = Meter.__table__
sample = Sample.__table__
__table__ = join(meter, sample)
id = column_property(sample.c.id)
meter_id = column_property(meter.c.id, sample.c.meter_id)
counter_name = column_property(meter.c.name)
counter_type = column_property(meter.c.type)
counter_unit = column_property(meter.c.unit)
counter_volume = column_property(sample.c.volume)
sources = relationship("Source", secondary=lambda: sourceassoc)
class User(Base):
__tablename__ = 'user'
id = Column(String(255), primary_key=True)