Merge "normalise resource data"

This commit is contained in:
Jenkins 2014-09-09 18:02:00 +00:00 committed by Gerrit Code Review
commit fbe05d8900
4 changed files with 407 additions and 123 deletions

View File

@ -18,6 +18,7 @@
from __future__ import absolute_import from __future__ import absolute_import
import datetime import datetime
import hashlib
import operator import operator
import os import os
@ -33,6 +34,7 @@ from sqlalchemy import func
from sqlalchemy.orm import aliased from sqlalchemy.orm import aliased
from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import jsonutils
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer import storage from ceilometer import storage
from ceilometer.storage import base from ceilometer.storage import base
@ -62,7 +64,7 @@ PARAMETERIZED_AGGREGATES = dict(
), ),
compute=dict( compute=dict(
cardinality=lambda p: func.count( cardinality=lambda p: func.count(
distinct(getattr(models.Sample, p)) distinct(getattr(models.Resource, p))
).label('cardinality/%s' % p) ).label('cardinality/%s' % p)
) )
) )
@ -116,7 +118,7 @@ def apply_metaquery_filter(session, query, metaquery):
{"key": k, "value": type(value)}) {"key": k, "value": type(value)})
else: else:
meta_alias = aliased(_model) meta_alias = aliased(_model)
on_clause = and_(models.Sample.id == meta_alias.id, on_clause = and_(models.Resource.internal_id == meta_alias.id,
meta_alias.meta_key == key) meta_alias.meta_key == key)
# outer join is needed to support metaquery # outer join is needed to support metaquery
# with or operator on non existent metadata field # with or operator on non existent metadata field
@ -144,7 +146,7 @@ def make_query_from_filter(session, query, sample_filter, require_meter=True):
raise RuntimeError('Missing required meter specifier') raise RuntimeError('Missing required meter specifier')
if sample_filter.source: if sample_filter.source:
query = query.filter( query = query.filter(
models.Sample.source_id == sample_filter.source) models.Resource.source_id == sample_filter.source)
if sample_filter.start: if sample_filter.start:
ts_start = sample_filter.start ts_start = sample_filter.start
if sample_filter.start_timestamp_op == 'gt': if sample_filter.start_timestamp_op == 'gt':
@ -158,13 +160,13 @@ def make_query_from_filter(session, query, sample_filter, require_meter=True):
else: else:
query = query.filter(models.Sample.timestamp < ts_end) query = query.filter(models.Sample.timestamp < ts_end)
if sample_filter.user: if sample_filter.user:
query = query.filter(models.Sample.user_id == sample_filter.user) query = query.filter(models.Resource.user_id == sample_filter.user)
if sample_filter.project: if sample_filter.project:
query = query.filter( query = query.filter(
models.Sample.project_id == sample_filter.project) models.Resource.project_id == sample_filter.project)
if sample_filter.resource: if sample_filter.resource:
query = query.filter( query = query.filter(
models.Sample.resource_id == sample_filter.resource) models.Resource.resource_id == sample_filter.resource)
if sample_filter.message_id: if sample_filter.message_id:
query = query.filter( query = query.filter(
models.Sample.message_id == sample_filter.message_id) models.Sample.message_id == sample_filter.message_id)
@ -183,22 +185,29 @@ class Connection(base.Connection):
- meter - meter
- meter definition - meter definition
- { id: meter def id - { id: meter id
name: meter name name: meter name
type: meter type type: meter type
unit: meter unit unit: meter unit
} }
- resource
- resource definition
- { internal_id: resource id
resource_id: resource uuid
user_id: user uuid
project_id: project uuid
source_id: source id
resource_metadata: metadata dictionary
metadata_hash: metadata dictionary hash
}
- sample - sample
- the raw incoming data - the raw incoming data
- { id: sample id - { id: sample id
meter_id: meter id (->meter.id) meter_id: meter id (->meter.id)
user_id: user uuid resource_id: resource id (->resource.internal_id)
project_id: project uuid
resource_id: resource uuid
source_id: source id
resource_metadata: metadata dictionaries
volume: sample volume volume: sample volume
timestamp: datetime timestamp: datetime
recorded_at: datetime
message_signature: message signature message_signature: message signature
message_id: message uuid message_id: message uuid
} }
@ -230,6 +239,7 @@ class Connection(base.Connection):
@staticmethod @staticmethod
def _create_meter(session, name, type, unit): def _create_meter(session, name, type, unit):
# TODO(gordc): implement lru_cache to improve performance
try: try:
nested = session.connection().dialect.name != 'sqlite' nested = session.connection().dialect.name != 'sqlite'
with session.begin(nested=nested, with session.begin(nested=nested,
@ -247,6 +257,55 @@ class Connection(base.Connection):
return obj return obj
@staticmethod
def _create_resource(session, res_id, user_id, project_id, source_id,
                     rmeta):
    """Return the internal id of a resource row, creating it if needed.

    Looks up a resource by the full attribute tuple (resource uuid,
    user, project, source) plus an md5 hash of its metadata; when no
    match exists, a new row is inserted together with its queryable
    per-type metadata key/value rows.

    :param session: sqlalchemy session to run the queries in
    :param res_id: resource uuid
    :param user_id: user uuid
    :param project_id: project uuid
    :param source_id: source id
    :param rmeta: resource metadata dictionary
    :returns: ``Resource.internal_id`` of the matching/created row
    """
    # TODO(gordc): implement lru_cache to improve performance
    try:
        # sqlite cannot use a SAVEPOINT-style nested transaction here
        nested = session.connection().dialect.name != 'sqlite'
        m_hash = jsonutils.dumps(rmeta, sort_keys=True)
        with session.begin(nested=nested,
                           subtransactions=not nested):
            obj = (session.query(models.Resource.internal_id)
                   .filter(models.Resource.resource_id == res_id)
                   .filter(models.Resource.user_id == user_id)
                   .filter(models.Resource.project_id == project_id)
                   .filter(models.Resource.source_id == source_id)
                   .filter(models.Resource.metadata_hash ==
                           # encode(): hashlib.md5 requires bytes on
                           # Python 3; JSON output is ASCII so this is
                           # a no-op round-trip on Python 2 strings.
                           hashlib.md5(m_hash.encode('utf-8'))
                           .hexdigest()).first())
            obj_id = obj[0] if obj else None
            if obj_id is None:
                # metadata_hash is not set here: the model's
                # before_insert listener computes it.
                # NOTE(review): the listener hashes json.dumps output
                # while the lookup above hashes jsonutils.dumps output;
                # assumes both serialise identically -- confirm.
                obj = models.Resource(resource_id=res_id, user_id=user_id,
                                      project_id=project_id,
                                      source_id=source_id,
                                      resource_metadata=rmeta)
                session.add(obj)
                session.flush()
                obj_id = obj.internal_id
                if rmeta and isinstance(rmeta, dict):
                    # Bucket flattened metadata items by their typed
                    # storage table, then bulk-insert once per table.
                    meta_map = {}
                    for key, v in utils.dict_to_keyval(rmeta):
                        try:
                            _model = sql_utils.META_TYPE_MAP[type(v)]
                            meta_map.setdefault(_model, []).append(
                                {'id': obj_id, 'meta_key': key,
                                 'value': v})
                        except KeyError:
                            LOG.warn(_("Unknown metadata type. Key (%s) "
                                       "will not be queryable."), key)
                    for _model in meta_map.keys():
                        session.execute(_model.__table__.insert(),
                                        meta_map[_model])
    except dbexc.DBDuplicateEntry:
        # retry function to pick up duplicate committed object
        obj_id = Connection._create_resource(session, res_id, user_id,
                                             project_id, source_id, rmeta)
    return obj_id
def record_metering_data(self, data): def record_metering_data(self, data):
"""Write the data to the backend storage system. """Write the data to the backend storage system.
@ -256,36 +315,24 @@ class Connection(base.Connection):
session = self._engine_facade.get_session() session = self._engine_facade.get_session()
with session.begin(): with session.begin():
# Record the raw data for the sample. # Record the raw data for the sample.
rmetadata = data['resource_metadata']
meter = self._create_meter(session, meter = self._create_meter(session,
data['counter_name'], data['counter_name'],
data['counter_type'], data['counter_type'],
data['counter_unit']) data['counter_unit'])
sample = models.Sample(meter_id=meter.id) res_id = self._create_resource(session,
data['resource_id'],
data['user_id'],
data['project_id'],
data['source'],
data['resource_metadata'])
sample = models.Sample(
meter_id=meter.id,
resource_id=res_id,
timestamp=data['timestamp'],
volume=data['counter_volume'],
message_signature=data['message_signature'],
message_id=data['message_id'])
session.add(sample) session.add(sample)
sample.resource_id = data['resource_id']
sample.project_id = data['project_id']
sample.user_id = data['user_id']
sample.timestamp = data['timestamp']
sample.resource_metadata = rmetadata
sample.volume = data['counter_volume']
sample.message_signature = data['message_signature']
sample.message_id = data['message_id']
sample.source_id = data['source']
session.flush()
if rmetadata:
if isinstance(rmetadata, dict):
for key, v in utils.dict_to_keyval(rmetadata):
try:
_model = sql_utils.META_TYPE_MAP[type(v)]
except KeyError:
LOG.warn(_("Unknown metadata type. Key (%s) will "
"not be queryable."), key)
else:
session.add(_model(id=sample.id,
meta_key=key,
value=v))
def clear_expired_metering_data(self, ttl): def clear_expired_metering_data(self, ttl):
"""Clear expired data from the backend storage system. """Clear expired data from the backend storage system.
@ -312,6 +359,9 @@ class Connection(base.Connection):
(session.query(models.Meter) (session.query(models.Meter)
.filter(~models.Meter.samples.any()) .filter(~models.Meter.samples.any())
.delete(synchronize_session='fetch')) .delete(synchronize_session='fetch'))
(session.query(models.Resource)
.filter(~models.Resource.samples.any())
.delete(synchronize_session='fetch'))
LOG.info(_("%d samples removed from database"), rows) LOG.info(_("%d samples removed from database"), rows)
def get_resources(self, user=None, project=None, source=None, def get_resources(self, user=None, project=None, source=None,
@ -346,14 +396,19 @@ class Connection(base.Connection):
session = self._engine_facade.get_session() session = self._engine_facade.get_session()
# get list of resource_ids # get list of resource_ids
res_q = session.query(distinct(models.Sample.resource_id)) res_q = session.query(distinct(models.Resource.resource_id)).join(
models.Sample,
models.Sample.resource_id == models.Resource.internal_id)
res_q = make_query_from_filter(session, res_q, s_filter, res_q = make_query_from_filter(session, res_q, s_filter,
require_meter=False) require_meter=False)
for res_id in res_q.all(): for res_id in res_q.all():
# get latest Sample # get latest Sample
max_q = (session.query(models.Sample) max_q = (session.query(models.Sample)
.filter(models.Sample.resource_id == res_id[0])) .join(models.Resource,
models.Resource.internal_id ==
models.Sample.resource_id)
.filter(models.Resource.resource_id == res_id[0]))
max_q = make_query_from_filter(session, max_q, s_filter, max_q = make_query_from_filter(session, max_q, s_filter,
require_meter=False) require_meter=False)
max_q = max_q.order_by(models.Sample.timestamp.desc(), max_q = max_q.order_by(models.Sample.timestamp.desc(),
@ -361,7 +416,10 @@ class Connection(base.Connection):
# get the min timestamp value. # get the min timestamp value.
min_q = (session.query(models.Sample.timestamp) min_q = (session.query(models.Sample.timestamp)
.filter(models.Sample.resource_id == res_id[0])) .join(models.Resource,
models.Resource.internal_id ==
models.Sample.resource_id)
.filter(models.Resource.resource_id == res_id[0]))
min_q = make_query_from_filter(session, min_q, s_filter, min_q = make_query_from_filter(session, min_q, s_filter,
require_meter=False) require_meter=False)
min_q = min_q.order_by(models.Sample.timestamp.asc()).limit(1) min_q = min_q.order_by(models.Sample.timestamp.asc()).limit(1)
@ -369,13 +427,13 @@ class Connection(base.Connection):
sample = max_q.first() sample = max_q.first()
if sample: if sample:
yield api_models.Resource( yield api_models.Resource(
resource_id=sample.resource_id, resource_id=sample.resource.resource_id,
project_id=sample.project_id, project_id=sample.resource.project_id,
first_sample_timestamp=min_q.first().timestamp, first_sample_timestamp=min_q.first().timestamp,
last_sample_timestamp=sample.timestamp, last_sample_timestamp=sample.timestamp,
source=sample.source_id, source=sample.resource.source_id,
user_id=sample.user_id, user_id=sample.resource.user_id,
metadata=sample.resource_metadata metadata=sample.resource.resource_metadata
) )
def get_meters(self, user=None, project=None, resource=None, source=None, def get_meters(self, user=None, project=None, resource=None, source=None,
@ -399,37 +457,41 @@ class Connection(base.Connection):
metaquery=metaquery, metaquery=metaquery,
resource=resource) resource=resource)
# NOTE(gordc): get latest sample of each meter/resource. we do not
# filter here as we want to filter only on latest record.
session = self._engine_facade.get_session() session = self._engine_facade.get_session()
subq = session.query(func.max(models.Sample.id).label('id')).join(
models.Resource,
models.Resource.internal_id == models.Sample.resource_id).group_by(
models.Sample.meter_id, models.Resource.resource_id)
if resource:
subq = subq.filter(models.Resource.resource_id == resource)
subq = subq.subquery()
# sample_subq is used to reduce sample records # get meter details for samples.
# by selecting a record for each (resource_id, meter_id). query_sample = (session.query(models.Sample.meter_id,
# max() is used to choice a sample record, so the latest record models.Meter.name, models.Meter.type,
# is selected for each (resource_id, meter_id). models.Meter.unit,
sample_subq = (session.query( models.Resource.resource_id,
func.max(models.Sample.id).label('id')) models.Resource.project_id,
.group_by(models.Sample.meter_id, models.Resource.source_id,
models.Sample.resource_id)) models.Resource.user_id).join(
sample_subq = sample_subq.subquery() subq, subq.c.id == models.Sample.id)
.join(models.Meter, models.Meter.id == models.Sample.meter_id)
# SELECT sample.* FROM sample INNER JOIN .join(models.Resource,
# (SELECT max(sample.id) AS id FROM sample models.Resource.internal_id == models.Sample.resource_id))
# GROUP BY sample.resource_id, sample.meter_id) AS anon_2
# ON sample.id = anon_2.id
query_sample = (session.query(models.MeterSample).
join(sample_subq, models.MeterSample.id ==
sample_subq.c.id))
query_sample = make_query_from_filter(session, query_sample, s_filter, query_sample = make_query_from_filter(session, query_sample, s_filter,
require_meter=False) require_meter=False)
for sample in query_sample.all(): for row in query_sample.all():
yield api_models.Meter( yield api_models.Meter(
name=sample.counter_name, name=row.name,
type=sample.counter_type, type=row.type,
unit=sample.counter_unit, unit=row.unit,
resource_id=sample.resource_id, resource_id=row.resource_id,
project_id=sample.project_id, project_id=row.project_id,
source=sample.source_id, source=row.source_id,
user_id=sample.user_id) user_id=row.user_id)
def _retrieve_samples(self, query): def _retrieve_samples(self, query):
samples = query.all() samples = query.all()
@ -463,28 +525,41 @@ class Connection(base.Connection):
if limit == 0: if limit == 0:
return [] return []
table = models.MeterSample
session = self._engine_facade.get_session() session = self._engine_facade.get_session()
query = session.query(table) query = session.query(models.Sample.timestamp,
models.Sample.recorded_at,
models.Sample.message_id,
models.Sample.message_signature,
models.Sample.volume.label('counter_volume'),
models.Meter.name.label('counter_name'),
models.Meter.type.label('counter_type'),
models.Meter.unit.label('counter_unit'),
models.Resource.source_id,
models.Resource.user_id,
models.Resource.project_id,
models.Resource.resource_metadata,
models.Resource.resource_id).join(
models.Meter, models.Meter.id == models.Sample.meter_id).join(
models.Resource,
models.Resource.internal_id == models.Sample.resource_id).order_by(
models.Sample.timestamp.desc())
query = make_query_from_filter(session, query, sample_filter, query = make_query_from_filter(session, query, sample_filter,
require_meter=False) require_meter=False)
transformer = sql_utils.QueryTransformer(table, query) if limit:
transformer.apply_options(None, query = query.limit(limit)
limit) return self._retrieve_samples(query)
return self._retrieve_samples(transformer.get_query())
def query_samples(self, filter_expr=None, orderby=None, limit=None): def query_samples(self, filter_expr=None, orderby=None, limit=None):
if limit == 0: if limit == 0:
return [] return []
session = self._engine_facade.get_session() session = self._engine_facade.get_session()
query = session.query(models.MeterSample) query = session.query(models.FullSample)
transformer = sql_utils.QueryTransformer(models.MeterSample, query) transformer = sql_utils.QueryTransformer(models.FullSample, query)
if filter_expr is not None: if filter_expr is not None:
transformer.apply_filter(filter_expr) transformer.apply_filter(filter_expr)
transformer.apply_options(orderby, transformer.apply_options(orderby, limit)
limit)
return self._retrieve_samples(transformer.get_query()) return self._retrieve_samples(transformer.get_query())
@staticmethod @staticmethod
@ -515,22 +590,25 @@ class Connection(base.Connection):
def _make_stats_query(self, sample_filter, groupby, aggregate): def _make_stats_query(self, sample_filter, groupby, aggregate):
select = [ select = [
models.Meter.unit,
func.min(models.Sample.timestamp).label('tsmin'), func.min(models.Sample.timestamp).label('tsmin'),
func.max(models.Sample.timestamp).label('tsmax'), func.max(models.Sample.timestamp).label('tsmax'),
models.Meter.unit
] ]
select.extend(self._get_aggregate_functions(aggregate)) select.extend(self._get_aggregate_functions(aggregate))
session = self._engine_facade.get_session() session = self._engine_facade.get_session()
if groupby: if groupby:
group_attributes = [getattr(models.Sample, g) for g in groupby] group_attributes = [getattr(models.Resource, g) for g in groupby]
select.extend(group_attributes) select.extend(group_attributes)
query = (session.query(*select).join( query = (session.query(*select)
models.Sample, models.Meter.id == models.Sample.meter_id). .join(models.Meter,
group_by(models.Meter.unit)) models.Meter.id == models.Sample.meter_id)
.join(
models.Resource,
models.Resource.internal_id == models.Sample.resource_id)
.group_by(models.Meter.unit))
if groupby: if groupby:
query = query.group_by(*group_attributes) query = query.group_by(*group_attributes)

View File

@ -0,0 +1,172 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
import migrate
import sqlalchemy as sa
from ceilometer.openstack.common import jsonutils
m_tables = [('metadata_text', sa.Text, True),
('metadata_bool', sa.Boolean, False),
('metadata_int', sa.BigInteger, False),
('metadata_float', sa.Float(53), False)]
def _migrate_meta_tables(meta, col, new_col, new_fk):
    """Rebuild each metadata table so its 'id' column points at *new_fk*.

    For every metadata table listed in ``m_tables`` a parallel
    '<name>_new' table is created whose primary-key 'id' column is a
    foreign key to *new_fk*.  Existing rows are copied across by joining
    the old key (*col*) to the new key (*new_col*), de-duplicated with a
    GROUP BY, then the old table is dropped and the new one renamed into
    place.

    :param meta: bound sa.MetaData for the engine being migrated
    :param col: old key column the metadata rows currently join on
    :param new_col: column holding the replacement key value
    :param new_fk: dotted 'table.column' string for the new foreign key
    """
    for t_name, t_type, t_nullable in m_tables:
        m_table = sa.Table(t_name, meta, autoload=True)
        m_table_new = sa.Table(
            '%s_new' % t_name, meta,
            sa.Column('id', sa.Integer, sa.ForeignKey(new_fk),
                      primary_key=True),
            sa.Column('meta_key', sa.String(255),
                      primary_key=True),
            sa.Column('value', t_type, nullable=t_nullable),
            mysql_engine='InnoDB',
            mysql_charset='utf8',
        )
        m_table_new.create()
        # only copy data when the old table actually has rows
        if m_table.select().scalar() is not None:
            # GROUP BY collapses duplicates: multiple old keys may map
            # to the same (new key, meta_key, value) triple after the
            # resource normalisation.
            m_table_new.insert().from_select(
                ['id', 'meta_key', 'value'],
                sa.select([new_col, m_table.c.meta_key,
                           m_table.c.value]).where(
                    col == m_table.c.id).group_by(
                    new_col, m_table.c.meta_key, m_table.c.value)).execute()
        m_table.drop()
        # index (re)creation is skipped on sqlite, matching the other
        # sqlite-guarded DDL in this migration
        if meta.bind.engine.name != 'sqlite':
            sa.Index('ix_%s_meta_key' % t_name,
                     m_table_new.c.meta_key).create()
        m_table_new.rename(t_name)
def upgrade(migrate_engine):
    """Normalise resource attributes out of the sample table.

    Creates the new ``resource`` table, back-fills it with one row per
    distinct resource attribute combination found in ``sample``, rewires
    ``sample.resource_id`` to reference ``resource.internal_id``, and
    re-points the metadata tables at the resource rows.
    """
    meta = sa.MetaData(bind=migrate_engine)
    resource = sa.Table(
        'resource', meta,
        sa.Column('internal_id', sa.Integer, primary_key=True),
        sa.Column('resource_id', sa.String(255)),
        sa.Column('user_id', sa.String(255)),
        sa.Column('project_id', sa.String(255)),
        sa.Column('source_id', sa.String(255)),
        sa.Column('resource_metadata', sa.Text),
        sa.Column('metadata_hash', sa.String(32)),
        mysql_engine='InnoDB',
        mysql_charset='utf8')
    resource.create()

    # copy resource data in to resource table
    sample = sa.Table('sample', meta, autoload=True)
    # temporary column: hash of each sample's metadata so identical
    # metadata blobs can be matched/grouped by a short value
    sa.Column('metadata_hash', sa.String(32)).create(sample)
    for row in sa.select([sample.c.id, sample.c.resource_metadata]).execute():
        # NOTE(review): md5 over the sorted-key JSON dump; assumes
        # Python 2 str input (md5 requires bytes on Python 3) -- confirm
        # target runtime.
        sample.update().where(sample.c.id == row['id']).values(
            {sample.c.metadata_hash:
             hashlib.md5(jsonutils.dumps(
                 row['resource_metadata'],
                 sort_keys=True)).hexdigest()}).execute()
    # one resource row per distinct (resource, user, project, source,
    # metadata-hash) combination seen in the samples
    query = sa.select([sample.c.resource_id, sample.c.user_id,
                       sample.c.project_id, sample.c.source_id,
                       sample.c.resource_metadata,
                       sample.c.metadata_hash]).distinct()
    for row in query.execute():
        resource.insert().values(
            resource_id=row['resource_id'],
            user_id=row['user_id'],
            project_id=row['project_id'],
            source_id=row['source_id'],
            resource_metadata=row['resource_metadata'],
            metadata_hash=row['metadata_hash']).execute()

    # link sample records to new resource records
    sa.Column('resource_id_new', sa.Integer).create(sample)
    for row in sa.select([resource]).execute():
        (sample.update().
         where(sa.and_(
             sample.c.resource_id == row['resource_id'],
             sample.c.user_id == row['user_id'],
             sample.c.project_id == row['project_id'],
             sample.c.source_id == row['source_id'],
             sample.c.metadata_hash == row['metadata_hash'])).
         values({sample.c.resource_id_new: row['internal_id']}).execute())

    # swap the integer FK column into place under the old name
    sample.c.resource_id.drop()
    sample.c.metadata_hash.drop()
    sample.c.resource_id_new.alter(name='resource_id')

    # re-bind metadata to pick up alter name change
    meta = sa.MetaData(bind=migrate_engine)
    sample = sa.Table('sample', meta, autoload=True)
    resource = sa.Table('resource', meta, autoload=True)

    # index/FK maintenance is skipped on sqlite
    if migrate_engine.name != 'sqlite':
        sa.Index('ix_resource_resource_id', resource.c.resource_id).create()
        sa.Index('ix_sample_user_id', sample.c.user_id).drop()
        sa.Index('ix_sample_project_id', sample.c.project_id).drop()
        sa.Index('ix_sample_resource_id', sample.c.resource_id).create()
        sa.Index('ix_sample_meter_id_resource_id',
                 sample.c.meter_id, sample.c.resource_id).create()

        params = {'columns': [sample.c.resource_id],
                  'refcolumns': [resource.c.internal_id]}
        # mysql needs an explicit constraint name to drop it later
        if migrate_engine.name == 'mysql':
            params['name'] = 'fk_sample_resource_internal_id'
        migrate.ForeignKeyConstraint(**params).create()

    # resource attributes now live on the resource table only
    sample.c.user_id.drop()
    sample.c.project_id.drop()
    sample.c.source_id.drop()
    sample.c.resource_metadata.drop()

    # metadata rows are re-keyed from sample.id to resource.internal_id
    _migrate_meta_tables(meta, sample.c.id, sample.c.resource_id,
                         'resource.internal_id')
def downgrade(migrate_engine):
    """Reverse the resource normalisation.

    Copies each resource row's attributes back onto its samples, restores
    the string ``resource_id`` column, re-keys the metadata tables to
    ``sample.id`` and finally drops the ``resource`` table.
    """
    meta = sa.MetaData(bind=migrate_engine)
    sample = sa.Table('sample', meta, autoload=True)
    # metadata tables go back to being keyed by sample.id
    _migrate_meta_tables(meta, sample.c.resource_id, sample.c.id,
                         'sample.id')
    sa.Column('user_id', sa.String(255)).create(sample)
    sa.Column('project_id', sa.String(255)).create(sample)
    sa.Column('source_id', sa.String(255)).create(sample)
    # temporary column holding the resource uuid (the current
    # sample.resource_id holds the integer internal id)
    sa.Column('resource_id_new', sa.String(255)).create(sample)
    sa.Column('resource_metadata', sa.Text).create(sample)
    resource = sa.Table('resource', meta, autoload=True)
    # denormalise: copy each resource row's attributes onto its samples
    for row in sa.select([resource]).execute():
        (sample.update().
         where(sample.c.resource_id == row['internal_id']).
         values({sample.c.resource_id_new: row['resource_id'],
                 sample.c.user_id: row['user_id'],
                 sample.c.project_id: row['project_id'],
                 sample.c.source_id: row['source_id'],
                 sample.c.resource_metadata: row['resource_metadata']})
         .execute())
    # index/FK maintenance is skipped on sqlite
    if migrate_engine.name != 'sqlite':
        params = {'columns': [sample.c.resource_id],
                  'refcolumns': [resource.c.internal_id]}
        # mysql created the FK under an explicit name in upgrade()
        if migrate_engine.name == 'mysql':
            params['name'] = 'fk_sample_resource_internal_id'
        migrate.ForeignKeyConstraint(**params).drop()
        sa.Index('ix_sample_meter_id_resource_id',
                 sample.c.meter_id, sample.c.resource_id).drop()
        sa.Index('ix_sample_resource_id', sample.c.resource_id).drop()
        sa.Index('ix_sample_user_id', sample.c.user_id).create()
        sa.Index('ix_sample_project_id', sample.c.project_id).create()
    resource.drop()
    # swap the uuid column back into place under the old name
    sample.c.resource_id.drop()
    sample.c.resource_id_new.alter(name='resource_id')

View File

@ -16,18 +16,19 @@
""" """
SQLAlchemy models for Ceilometer data. SQLAlchemy models for Ceilometer data.
""" """
import hashlib
import json import json
from oslo.utils import timeutils from oslo.utils import timeutils
import six import six
from sqlalchemy import (Column, Integer, String, ForeignKey, Index, from sqlalchemy import (Column, Integer, String, ForeignKey, Index,
UniqueConstraint, BigInteger, join) UniqueConstraint, BigInteger)
from sqlalchemy import event, select
from sqlalchemy import Float, Boolean, Text, DateTime from sqlalchemy import Float, Boolean, Text, DateTime
from sqlalchemy.dialects.mysql import DECIMAL from sqlalchemy.dialects.mysql import DECIMAL
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import backref from sqlalchemy.orm import backref
from sqlalchemy.orm import column_property from sqlalchemy.orm import deferred
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from sqlalchemy.types import TypeDecorator from sqlalchemy.types import TypeDecorator
@ -106,7 +107,7 @@ class MetaText(Base):
__table_args__ = ( __table_args__ = (
Index('ix_meta_text_key', 'meta_key'), Index('ix_meta_text_key', 'meta_key'),
) )
id = Column(Integer, ForeignKey('sample.id'), primary_key=True) id = Column(Integer, ForeignKey('resource.internal_id'), primary_key=True)
meta_key = Column(String(255), primary_key=True) meta_key = Column(String(255), primary_key=True)
value = Column(Text) value = Column(Text)
@ -118,7 +119,7 @@ class MetaBool(Base):
__table_args__ = ( __table_args__ = (
Index('ix_meta_bool_key', 'meta_key'), Index('ix_meta_bool_key', 'meta_key'),
) )
id = Column(Integer, ForeignKey('sample.id'), primary_key=True) id = Column(Integer, ForeignKey('resource.internal_id'), primary_key=True)
meta_key = Column(String(255), primary_key=True) meta_key = Column(String(255), primary_key=True)
value = Column(Boolean) value = Column(Boolean)
@ -130,7 +131,7 @@ class MetaBigInt(Base):
__table_args__ = ( __table_args__ = (
Index('ix_meta_int_key', 'meta_key'), Index('ix_meta_int_key', 'meta_key'),
) )
id = Column(Integer, ForeignKey('sample.id'), primary_key=True) id = Column(Integer, ForeignKey('resource.internal_id'), primary_key=True)
meta_key = Column(String(255), primary_key=True) meta_key = Column(String(255), primary_key=True)
value = Column(BigInteger, default=False) value = Column(BigInteger, default=False)
@ -142,7 +143,7 @@ class MetaFloat(Base):
__table_args__ = ( __table_args__ = (
Index('ix_meta_float_key', 'meta_key'), Index('ix_meta_float_key', 'meta_key'),
) )
id = Column(Integer, ForeignKey('sample.id'), primary_key=True) id = Column(Integer, ForeignKey('resource.internal_id'), primary_key=True)
meta_key = Column(String(255), primary_key=True) meta_key = Column(String(255), primary_key=True)
value = Column(Float(53), default=False) value = Column(Float(53), default=False)
@ -153,7 +154,7 @@ class Meter(Base):
__tablename__ = 'meter' __tablename__ = 'meter'
__table_args__ = ( __table_args__ = (
UniqueConstraint('name', 'type', 'unit', name='def_unique'), UniqueConstraint('name', 'type', 'unit', name='def_unique'),
Index('ix_meter_name', 'name') Index('ix_meter_name', 'name'),
) )
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)
name = Column(String(255), nullable=False) name = Column(String(255), nullable=False)
@ -162,55 +163,88 @@ class Meter(Base):
samples = relationship("Sample", backref="meter") samples = relationship("Sample", backref="meter")
class Resource(Base):
    """Resource data.

    One row per distinct (resource_id, user_id, project_id, source_id,
    metadata) combination; Sample rows reference it via ``internal_id``.
    """
    __tablename__ = 'resource'
    __table_args__ = (
        # TODO(gordc): this should exist but the attribute values we set
        #              for user/project/source/resource id's are too large
        #              for a uuid.
        # UniqueConstraint('resource_id', 'user_id', 'project_id',
        #                  'source_id', 'metadata_hash',
        #                  name='res_def_unique'),
        Index('ix_resource_resource_id', 'resource_id'),
    )
    # surrogate primary key referenced by sample.resource_id
    internal_id = Column(Integer, primary_key=True)
    user_id = Column(String(255))
    project_id = Column(String(255))
    source_id = Column(String(255))
    # the external resource uuid (string), not the surrogate key
    resource_id = Column(String(255), nullable=False)
    # deferred(): the metadata blob and its hash are only loaded from
    # the db when the attribute is actually accessed
    resource_metadata = deferred(Column(JSONEncodedDict()))
    # md5 of the sorted-key JSON metadata dump, set by the
    # before_insert listener below
    metadata_hash = deferred(Column(String(32)))
    samples = relationship("Sample", backref="resource")
    # typed metadata rows keyed by internal_id; deleted with the resource
    meta_text = relationship("MetaText", backref="resource",
                             cascade="all, delete-orphan")
    meta_float = relationship("MetaFloat", backref="resource",
                              cascade="all, delete-orphan")
    meta_int = relationship("MetaBigInt", backref="resource",
                            cascade="all, delete-orphan")
    meta_bool = relationship("MetaBool", backref="resource",
                             cascade="all, delete-orphan")
@event.listens_for(Resource, "before_insert")
def before_insert(mapper, connection, target):
    """Stamp a Resource row's metadata_hash just before INSERT.

    The hash is the md5 hexdigest of the sorted-key JSON dump of the
    metadata, letting lookups compare a short column instead of the
    full metadata blob.
    """
    metadata = json.dumps(target.resource_metadata, sort_keys=True)
    # encode(): hashlib.md5 requires bytes on Python 3; json.dumps
    # emits ASCII by default, so the encode is lossless on Python 2 too.
    target.metadata_hash = hashlib.md5(metadata.encode('utf-8')).hexdigest()
class Sample(Base): class Sample(Base):
"""Metering data.""" """Metering data."""
__tablename__ = 'sample' __tablename__ = 'sample'
__table_args__ = ( __table_args__ = (
Index('ix_sample_timestamp', 'timestamp'), Index('ix_sample_timestamp', 'timestamp'),
Index('ix_sample_user_id', 'user_id'), Index('ix_sample_resource_id', 'resource_id'),
Index('ix_sample_project_id', 'project_id'), Index('ix_sample_meter_id', 'meter_id'),
Index('ix_sample_meter_id', 'meter_id') Index('ix_sample_meter_id_resource_id', 'meter_id', 'resource_id')
) )
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)
meter_id = Column(Integer, ForeignKey('meter.id')) meter_id = Column(Integer, ForeignKey('meter.id'))
user_id = Column(String(255)) resource_id = Column(Integer, ForeignKey('resource.internal_id'))
project_id = Column(String(255))
resource_id = Column(String(255))
resource_metadata = Column(JSONEncodedDict())
volume = Column(Float(53)) volume = Column(Float(53))
timestamp = Column(PreciseTimestamp(), default=lambda: timeutils.utcnow()) timestamp = Column(PreciseTimestamp(), default=lambda: timeutils.utcnow())
recorded_at = Column(PreciseTimestamp(), recorded_at = Column(PreciseTimestamp(),
default=lambda: timeutils.utcnow()) default=lambda: timeutils.utcnow())
message_signature = Column(String(1000)) message_signature = Column(String(1000))
message_id = Column(String(1000)) message_id = Column(String(1000))
source_id = Column(String(255))
meta_text = relationship("MetaText", backref="sample",
cascade="all, delete-orphan")
meta_float = relationship("MetaFloat", backref="sample",
cascade="all, delete-orphan")
meta_int = relationship("MetaBigInt", backref="sample",
cascade="all, delete-orphan")
meta_bool = relationship("MetaBool", backref="sample",
cascade="all, delete-orphan")
class MeterSample(Base): class FullSample(Base):
"""Helper model. """Mapper model.
It's needed as many of the filters work against Sample data joined with It's needed as many of the filters work against raw data which is split
Meter data. between Meter, Sample, and Resource tables
""" """
meter = Meter.__table__ meter = Meter.__table__
sample = Sample.__table__ sample = Sample.__table__
__table__ = join(meter, sample) resource = Resource.__table__
__table__ = (select([sample.c.id, meter.c.name.label('counter_name'),
id = column_property(sample.c.id) meter.c.type.label('counter_type'),
meter_id = column_property(meter.c.id, sample.c.meter_id) meter.c.unit.label('counter_unit'),
counter_name = column_property(meter.c.name) sample.c.volume.label('counter_volume'),
counter_type = column_property(meter.c.type) resource.c.resource_id, resource.c.source_id,
counter_unit = column_property(meter.c.unit) resource.c.user_id, resource.c.project_id,
counter_volume = column_property(sample.c.volume) resource.c.resource_metadata, resource.c.internal_id,
sample.c.timestamp, sample.c.message_id,
sample.c.message_signature, sample.c.recorded_at])
.select_from(
sample.join(meter, sample.c.meter_id == meter.c.id).join(
resource,
sample.c.resource_id == resource.c.internal_id))
.alias())
class Alarm(Base): class Alarm(Base):

View File

@ -85,7 +85,7 @@ class QueryTransformer(object):
field_name = field_name[len('resource_metadata.'):] field_name = field_name[len('resource_metadata.'):]
meta_table = META_TYPE_MAP[type(value)] meta_table = META_TYPE_MAP[type(value)]
meta_alias = aliased(meta_table) meta_alias = aliased(meta_table)
on_clause = and_(self.table.id == meta_alias.id, on_clause = and_(self.table.internal_id == meta_alias.id,
meta_alias.meta_key == field_name) meta_alias.meta_key == field_name)
# outer join is needed to support metaquery # outer join is needed to support metaquery
# with or operator on non existent metadata field # with or operator on non existent metadata field