diff --git a/ceilometer/storage/impl_sqlalchemy.py b/ceilometer/storage/impl_sqlalchemy.py new file mode 100644 index 000000000..e3ff3f130 --- /dev/null +++ b/ceilometer/storage/impl_sqlalchemy.py @@ -0,0 +1,298 @@ +# -*- encoding: utf-8 -*- +# +# Author: John Tran +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""SQLAlchemy storage backend +""" + +import copy +import datetime + +from ceilometer.openstack.common import log +from ceilometer.openstack.common import cfg +from ceilometer.storage import base +from ceilometer.storage.sqlalchemy.models import Meter, Project, Resource +from ceilometer.storage.sqlalchemy.models import Source, User +from ceilometer.storage.sqlalchemy.session import get_session +import ceilometer.storage.sqlalchemy.session as session + +LOG = log.getLogger(__name__) + + +class SQLAlchemyStorage(base.StorageEngine): + """Put the data into a SQLAlchemy database + + Tables: + + - user + - { _id: user id + source: [ array of source ids reporting for the user ] + } + - project + - { _id: project id + source: [ array of source ids reporting for the project ] + } + - meter + - the raw incoming data + - resource + - the metadata for resources + - { _id: uuid of resource, + metadata: metadata dictionaries + timestamp: datetime of last update + user_id: uuid + project_id: uuid + meter: [ array of {counter_name: string, counter_type: string} ] + } + """ + + OPTIONS = [] + + def register_opts(self, conf): + """Register any configuration options used by this engine. + """ + conf.register_opts(self.OPTIONS) + + def get_connection(self, conf): + """Return a Connection instance based on the configuration settings. + """ + return Connection(conf) + + +def make_query_from_filter(query, event_filter, require_meter=True): + """Return a query dictionary based on the settings in the filter. + + :param filter: EventFilter instance + :param require_meter: If true and the filter does not have a meter, + raise an error. + """ + + if event_filter.meter: + query = query.filter(Meter.counter_name == event_filter.meter) + elif require_meter: + raise RuntimeError('Missing required meter specifier') + if event_filter.source: + query = query.filter_by(source=event_filter.source) + if event_filter.start: + query = query = query.filter(Meter.timestamp >= event_filter.start) + if event_filter.end: + query = query = query.filter(Meter.timestamp < event_filter.end) + if event_filter.user: + query = query.filter_by(user_id=event_filter.user) + elif event_filter.project: + query = query.filter_by(project_id=event_filter.project) + if event_filter.resource: + query = query.filter_by(resource_id=event_filter.resource) + + return query + + +class Connection(base.Connection): + """SqlAlchemy connection. + """ + + def __init__(self, conf): + LOG.info('connecting to %s', conf.database_connection) + self.session = self._get_connection(conf) + return + + def _get_connection(self, conf): + """Return a connection to the database. + """ + return session.get_session() + + def record_metering_data(self, data): + """Write the data to the backend storage system. + + :param data: a dictionary such as returned by + ceilometer.meter.meter_message_from_counter + """ + if data['source']: + source = self.session.query(Source).get(data['source']) + if not source: + source = Source(id=data['source']) + self.session.add(source) + else: + source = None + + # create/update user && project, add/update their sources list + if data['user_id']: + user = self.session.merge(User(id=data['user_id'])) + if not filter(lambda x: x.id == source.id, user.sources): + user.sources.append(source) + else: + user = None + + if data['project_id']: + project = self.session.merge(Project(id=data['project_id'])) + if not filter(lambda x: x.id == source.id, project.sources): + project.sources.append(source) + else: + project = None + + # Record the updated resource metadata + rtimestamp = datetime.datetime.utcnow() + rmetadata = data['resource_metadata'] + + resource = self.session.merge(Resource(id=data['resource_id'])) + if not filter(lambda x: x.id == source.id, resource.sources): + resource.sources.append(source) + resource.project = project + resource.user = user + resource.timestamp = data['timestamp'] + resource.received_timestamp = rtimestamp + # Current metadata being used and when it was last updated. + resource.resource_metadata = rmetadata + # autoflush didn't catch this one, requires manual flush + self.session.flush() + + # Record the raw data for the event. + meter = Meter(counter_type=data['counter_type'], + counter_name=data['counter_name'], resource=resource) + self.session.add(meter) + if not filter(lambda x: x.id == source.id, meter.sources): + meter.sources.append(source) + meter.project = project + meter.user = user + meter.timestamp = data['timestamp'] + meter.resource_metadata = rmetadata + meter.counter_duration = data['counter_duration'] + meter.counter_volume = data['counter_volume'] + meter.message_signature = data['message_signature'] + meter.message_id = data['message_id'] + + return + + def get_users(self, source=None): + """Return an iterable of user id strings. + + :param source: Optional source filter. + """ + query = model_query(User.id, session=self.session) + if source is not None: + query = query.filter(User.sources.any(id=source)) + return (x[0] for x in query.all()) + + def get_projects(self, source=None): + """Return an iterable of project id strings. + + :param source: Optional source filter. + """ + query = model_query(Project.id, session=self.session) + if source: + query = query.filter(Project.sources.any(id=source)) + return (x[0] for x in query.all()) + + def get_resources(self, user=None, project=None, source=None, + start_timestamp=None, end_timestamp=None, + session=None): + """Return an iterable of dictionaries containing resource information. + + { 'resource_id': UUID of the resource, + 'project_id': UUID of project owning the resource, + 'user_id': UUID of user owning the resource, + 'timestamp': UTC datetime of last update to the resource, + 'metadata': most current metadata for the resource, + 'meter': list of the meters reporting data for the resource, + } + + :param user: Optional ID for user that owns the resource. + :param project: Optional ID for project that owns the resource. + :param source: Optional source filter. + :param start_timestamp: Optional modified timestamp start range. + :param end_timestamp: Optional modified timestamp end range. + """ + query = model_query(Resource, session=session) + if user is not None: + query = query.filter(Resource.user_id == user) + if source is not None: + query = query.filter(Resource.sources.any(id=source)) + if start_timestamp is not None: + query = query.filter(Resource.timestamp >= start_timestamp) + if end_timestamp: + query = query.filter(Resource.timestamp < end_timestamp) + if project is not None: + query = query.filter(Resource.project_id == project) + + for resource in query.all(): + r = row2dict(resource) + # Replace the '_id' key with 'resource_id' to meet the + # caller's expectations. + r['resource_id'] = r['id'] + del r['id'] + yield r + + def get_raw_events(self, event_filter): + """Return an iterable of raw event data as created by + :func:`ceilometer.meter.meter_message_from_counter`. + """ + query = model_query(Meter, session=self.session) + query = make_query_from_filter(query, event_filter, + require_meter=False) + events = query.all() + + for e in events: + # Remove the id generated by the database when + # the event was inserted. It is an implementation + # detail that should not leak outside of the driver. + e = row2dict(e) + del e['id'] + yield e + + def get_volume_sum(self, event_filter): + # it isn't clear these are used + pass + + def get_volume_max(self, event_filter): + # it isn't clear these are used + pass + + def get_event_interval(self, event_filter): + """Return the min and max timestamps from events, + using the event_filter to limit the events seen. + + ( datetime.datetime(), datetime.datetime() ) + """ + func = session.func() + query = self.session.query(func.min(Meter.timestamp), + func.max(Meter.timestamp)) + query = make_query_from_filter(query, event_filter) + results = query.all() + a_min, a_max = results[0] + return (a_min, a_max) + + +############################ + + +def model_query(*args, **kwargs): + """Query helper + + :param session: if present, the session to use + """ + session = kwargs.get('session') or get_session() + query = session.query(*args) + return query + + +def row2dict(row, srcflag=None): + """Convert User, Project, Meter, Resource instance to dictionary object + with nested Source(s) + """ + d = copy.copy(row.__dict__) + for col in ['_sa_instance_state', 'sources']: + if col in d: + del d[col] + if not srcflag: + d['sources'] = map(lambda x: row2dict(x, True), row.sources) + return d diff --git a/ceilometer/storage/migration.py b/ceilometer/storage/migration.py new file mode 100644 index 000000000..16907427d --- /dev/null +++ b/ceilometer/storage/migration.py @@ -0,0 +1,29 @@ +# -*- encoding: utf-8 -*- +# +# Author: John Tran +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Database setup and migration commands.""" + +import ceilometer.storage.sqlalchemy.migration as IMPL + + +def db_sync(version=None): + """Migrate the database to `version` or the most recent version.""" + return IMPL.db_sync(version=version) + + +def db_version(): + """Display the current database version.""" + return IMPL.db_version() diff --git a/ceilometer/storage/sqlalchemy/__init__.py b/ceilometer/storage/sqlalchemy/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ceilometer/storage/sqlalchemy/migrate_repo/README b/ceilometer/storage/sqlalchemy/migrate_repo/README new file mode 100644 index 000000000..6218f8cac --- /dev/null +++ b/ceilometer/storage/sqlalchemy/migrate_repo/README @@ -0,0 +1,4 @@ +This is a database migration repository. + +More information at +http://code.google.com/p/sqlalchemy-migrate/ diff --git a/ceilometer/storage/sqlalchemy/migrate_repo/__init__.py b/ceilometer/storage/sqlalchemy/migrate_repo/__init__.py new file mode 100644 index 000000000..2f288d3cf --- /dev/null +++ b/ceilometer/storage/sqlalchemy/migrate_repo/__init__.py @@ -0,0 +1 @@ +# template repository default module diff --git a/ceilometer/storage/sqlalchemy/migrate_repo/manage.py b/ceilometer/storage/sqlalchemy/migrate_repo/manage.py new file mode 100644 index 000000000..39fa3892e --- /dev/null +++ b/ceilometer/storage/sqlalchemy/migrate_repo/manage.py @@ -0,0 +1,5 @@ +#!/usr/bin/env python +from migrate.versioning.shell import main + +if __name__ == '__main__': + main(debug='False') diff --git a/ceilometer/storage/sqlalchemy/migrate_repo/migrate.cfg b/ceilometer/storage/sqlalchemy/migrate_repo/migrate.cfg new file mode 100644 index 000000000..cd16764f4 --- /dev/null +++ b/ceilometer/storage/sqlalchemy/migrate_repo/migrate.cfg @@ -0,0 +1,25 @@ +[db_settings] +# Used to identify which repository this database is versioned under. +# You can use the name of your project. +repository_id=ceilometer + +# The name of the database table used to track the schema version. +# This name shouldn't already be used by your project. +# If this is changed once a database is under version control, you'll need to +# change the table name in each database too. +version_table=migrate_version + +# When committing a change script, Migrate will attempt to generate the +# sql for all supported databases; normally, if one of them fails - probably +# because you don't have that database installed - it is ignored and the +# commit continues, perhaps ending successfully. +# Databases in this list MUST compile successfully during a commit, or the +# entire commit will fail. List the databases your application will actually +# be using to ensure your updates to that database work properly. +# This must be a list; example: ['postgres','sqlite'] +required_dbs=[] + +# When creating new change scripts, Migrate will stamp the new script with +# a version number. By default this is latest_version + 1. You can set this +# to 'true' to tell Migrate to use the UTC timestamp instead. +use_timestamp_numbering=False diff --git a/ceilometer/storage/sqlalchemy/migrate_repo/versions/001_add_meter_table.py b/ceilometer/storage/sqlalchemy/migrate_repo/versions/001_add_meter_table.py new file mode 100644 index 000000000..f5d02fb83 --- /dev/null +++ b/ceilometer/storage/sqlalchemy/migrate_repo/versions/001_add_meter_table.py @@ -0,0 +1,90 @@ +# -*- encoding: utf-8 -*- +# +# Author: John Tran +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from sqlalchemy import * +from ceilometer.openstack.common import timeutils + +meta = MetaData() + +meter = Table( + 'meter', meta, + Column('id', Integer, primary_key=True, index=True), + Column('counter_name', String(255)), + Column('user_id', String(255), index=True), + Column('project_id', String(255), index=True), + Column('resource_id', String(255)), + Column('resource_metadata', String(5000)), + Column('counter_type', String(255)), + Column('counter_volume', Integer), + Column('counter_duration', Integer), + Column('timestamp', DateTime(timezone=False), index=True), + Column('message_signature', String(1000)), + Column('message_id', String(1000)) +) + +resource = Table( + 'resource', meta, + Column('id', String(255), primary_key=True, index=True), + Column('resource_metadata', String(5000)), + Column('project_id', String(255), index=True), + Column('received_timestamp', DateTime(timezone=False)), + Column('timestamp', DateTime(timezone=False), index=True), + Column('user_id', String(255), index=True) +) + +user = Table( + 'user', meta, + Column('id', String(255), primary_key=True, index=True), +) + +project = Table( + 'project', meta, + Column('id', String(255), primary_key=True, index=True), +) + +sourceassoc = Table( + 'sourceassoc', meta, + Column('source_id', String(255), index=True), + Column('user_id', String(255)), + Column('project_id', String(255)), + Column('resource_id', String(255)), + Column('meter_id', Integer), + Index('idx_su', 'source_id', 'user_id'), + Index('idx_sp', 'source_id', 'project_id'), + Index('idx_sr', 'source_id', 'resource_id'), + Index('idx_sm', 'source_id', 'meter_id') +) + +source = Table( + 'source', meta, + Column('id', String(255), primary_key=True, index=True), + UniqueConstraint('id') +) + + +tables = [meter, project, resource, user, source, sourceassoc] + + +def upgrade(migrate_engine): + meta.bind = migrate_engine + for i in sorted(tables): + i.create() + + +def downgrade(migrate_engine): + meta.bind = migrate_engine + for i in sorted(tables, reverse=True): + i.drop() diff --git a/ceilometer/storage/sqlalchemy/migrate_repo/versions/__init__.py b/ceilometer/storage/sqlalchemy/migrate_repo/versions/__init__.py new file mode 100644 index 000000000..507b5ff6b --- /dev/null +++ b/ceilometer/storage/sqlalchemy/migrate_repo/versions/__init__.py @@ -0,0 +1 @@ +# template repository default versions module diff --git a/ceilometer/storage/sqlalchemy/migration.py b/ceilometer/storage/sqlalchemy/migration.py new file mode 100644 index 000000000..7cc2edc46 --- /dev/null +++ b/ceilometer/storage/sqlalchemy/migration.py @@ -0,0 +1,106 @@ +# -*- encoding: utf-8 -*- +# +# Author: John Tran +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import distutils.version as dist_version +import os + +from ceilometer.storage.sqlalchemy.session import get_engine +from ceilometer.openstack.common import log as logging + + +import migrate +from migrate.versioning import util as migrate_util +import sqlalchemy + +INIT_VERSION = 1 +LOG = logging.getLogger(__name__) + + +@migrate_util.decorator +def patched_with_engine(f, *a, **kw): + url = a[0] + engine = migrate_util.construct_engine(url, **kw) + + try: + kw['engine'] = engine + return f(*a, **kw) + finally: + if isinstance(engine, migrate_util.Engine) and engine is not url: + migrate_util.log.debug('Disposing SQLAlchemy engine %s', engine) + engine.dispose() + + +# TODO(jkoelker) When migrate 0.7.3 is released and nova depends +# on that version or higher, this can be removed +MIN_PKG_VERSION = dist_version.StrictVersion('0.7.3') +if (not hasattr(migrate, '__version__') or + dist_version.StrictVersion(migrate.__version__) < MIN_PKG_VERSION): + migrate_util.with_engine = patched_with_engine + + +# NOTE(jkoelker) Delay importing migrate until we are patched +from migrate import exceptions as versioning_exceptions +from migrate.versioning import api as versioning_api +from migrate.versioning.repository import Repository + +_REPOSITORY = None + + +def db_sync(version=None): + if version is not None: + try: + version = int(version) + except ValueError: + raise Exception(_("version should be an integer")) + + current_version = db_version() + repository = _find_migrate_repo() + if version is None or version > current_version: + return versioning_api.upgrade(get_engine(), repository, version) + else: + return versioning_api.downgrade(get_engine(), repository, + version) + + +def db_version(): + repository = _find_migrate_repo() + try: + return versioning_api.db_version(get_engine(), repository) + except versioning_exceptions.DatabaseNotControlledError: + meta = sqlalchemy.MetaData() + engine = get_engine() + meta.reflect(bind=engine) + tables = meta.tables + if len(tables) == 0: + db_version_control(0) + return versioning_api.db_version(get_engine(), repository) + + +def db_version_control(version=None): + repository = _find_migrate_repo() + versioning_api.version_control(get_engine(), repository, version) + return version + + +def _find_migrate_repo(): + """Get the path for the migrate repository.""" + global _REPOSITORY + path = os.path.join(os.path.abspath(os.path.dirname(__file__)), + 'migrate_repo') + assert os.path.exists(path) + if _REPOSITORY is None: + _REPOSITORY = Repository(path) + return _REPOSITORY diff --git a/ceilometer/storage/sqlalchemy/models.py b/ceilometer/storage/sqlalchemy/models.py new file mode 100644 index 000000000..99a7ee894 --- /dev/null +++ b/ceilometer/storage/sqlalchemy/models.py @@ -0,0 +1,141 @@ +# -*- encoding: utf-8 -*- +# +# Author: John Tran +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +SQLAlchemy models for nova data. +""" + +import json +from sqlalchemy import Column, Integer, String, Table +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy import ForeignKey, DateTime +from sqlalchemy.orm import relationship, backref +from sqlalchemy.types import TypeDecorator, VARCHAR +from urlparse import urlparse + +import ceilometer.openstack.common.cfg as cfg +from ceilometer.openstack.common import timeutils + +sql_opts = [ + cfg.IntOpt('mysql_engine', + default='InnoDB', + help='MySQL engine') + ] + +cfg.CONF.register_opts(sql_opts) + + +def table_args(): + engine_name = urlparse(cfg.CONF.database_connection).scheme + if engine_name == 'mysql': + return {'mysql_engine': cfg.CONF.mysql_engine} + return None + + +class JSONEncodedDict(TypeDecorator): + "Represents an immutable structure as a json-encoded string." + + impl = VARCHAR + + def process_bind_param(self, value, dialect): + if value is not None: + value = json.dumps(value) + return value + + def process_result_value(self, value, dialect): + if value is not None: + value = json.loads(value) + return value + + +class CeilometerBase(object): + """Base class for Ceilometer Models.""" + __table_args__ = table_args() + __table_initialized__ = False + + def __setitem__(self, key, value): + setattr(self, key, value) + + def __getitem__(self, key): + return getattr(self, key) + + +Base = declarative_base(cls=CeilometerBase) + + +sourceassoc = Table('sourceassoc', Base.metadata, + Column('meter_id', Integer, ForeignKey("meter.id")), + Column('project_id', String(255), ForeignKey("project.id")), + Column('resource_id', String(255), ForeignKey("resource.id")), + Column('user_id', String(255), ForeignKey("user.id")), + Column('source_id', String(255), ForeignKey("source.id")) +) + + +class Source(Base): + __tablename__ = 'source' + id = Column(String(255), primary_key=True) + + +class Meter(Base): + """Metering data""" + + __tablename__ = 'meter' + id = Column(Integer, primary_key=True) + counter_name = Column(String(255)) + sources = relationship("Source", secondary=lambda: sourceassoc, + lazy='joined') + user_id = Column(String(255), ForeignKey('user.id')) + project_id = Column(String(255), ForeignKey('project.id')) + resource_id = Column(String(255), ForeignKey('resource.id')) + resource_metadata = Column(JSONEncodedDict) + counter_type = Column(String(255)) + counter_volume = Column(Integer) + counter_duration = Column(Integer) + timestamp = Column(DateTime, default=timeutils.utcnow) + message_signature = Column(String) + message_id = Column(String) + + +class User(Base): + __tablename__ = 'user' + id = Column(String(255), primary_key=True) + sources = relationship("Source", secondary=lambda: sourceassoc, + lazy='joined') + resources = relationship("Resource", backref='user', lazy='joined') + meters = relationship("Meter", backref='user', lazy='joined') + + +class Project(Base): + __tablename__ = 'project' + id = Column(String(255), primary_key=True) + sources = relationship("Source", secondary=lambda: sourceassoc, + lazy='joined') + resources = relationship("Resource", backref='project', lazy='joined') + meters = relationship("Meter", backref='project', lazy='joined') + + +class Resource(Base): + __tablename__ = 'resource' + id = Column(String(255), primary_key=True) + sources = relationship("Source", secondary=lambda: sourceassoc, + lazy='joined') + timestamp = Column(DateTime) + resource_metadata = Column(JSONEncodedDict) + received_timestamp = Column(DateTime, default=timeutils.utcnow) + user_id = Column(String(255), ForeignKey('user.id')) + project_id = Column(String(255), ForeignKey('project.id')) + meters = relationship("Meter", backref='resource', lazy='joined') diff --git a/ceilometer/storage/sqlalchemy/session.py b/ceilometer/storage/sqlalchemy/session.py new file mode 100644 index 000000000..db2bc5a2d --- /dev/null +++ b/ceilometer/storage/sqlalchemy/session.py @@ -0,0 +1,196 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Session Handling for SQLAlchemy backend.""" + +import re +import time + +import sqlalchemy +from sqlalchemy.exc import DisconnectionError, OperationalError +import sqlalchemy.orm +from sqlalchemy.pool import NullPool, StaticPool + +import ceilometer.openstack.common.cfg as cfg +import ceilometer.openstack.common.log as logging + +LOG = logging.getLogger(__name__) + +_MAKER = None +_ENGINE = None + +sql_opts = [ + cfg.IntOpt('sql_connection_debug', + default=0, + help='Verbosity of SQL debugging information. 0=None, ' + '100=Everything'), + cfg.BoolOpt('sql_connection_trace', + default=False, + help='Add python stack traces to SQL as comment strings'), + cfg.BoolOpt('sqlite_synchronous', + default=True, + help='If passed, use synchronous mode for sqlite'), + cfg.IntOpt('sql_idle_timeout', + default=3600, + help='timeout before idle sql connections are reaped'), + cfg.IntOpt('sql_max_retries', + default=10, + help='maximum db connection retries during startup. ' + '(setting -1 implies an infinite retry count)'), + cfg.IntOpt('sql_retry_interval', + default=10, + help='interval between retries of opening a sql connection'), + ] + +cfg.CONF.register_opts(sql_opts) + + +def get_session(autocommit=True, expire_on_commit=False, autoflush=True): + """Return a SQLAlchemy session.""" + global _MAKER + + if _MAKER is None: + engine = get_engine() + _MAKER = get_maker(engine, autocommit, expire_on_commit, autoflush) + + session = _MAKER() + return session + + +def synchronous_switch_listener(dbapi_conn, connection_rec): + """Switch sqlite connections to non-synchronous mode""" + dbapi_conn.execute("PRAGMA synchronous = OFF") + + +def add_regexp_listener(dbapi_con, con_record): + """Add REGEXP function to sqlite connections.""" + + def regexp(expr, item): + reg = re.compile(expr) + return reg.search(unicode(item)) is not None + dbapi_con.create_function('regexp', 2, regexp) + + +def ping_listener(dbapi_conn, connection_rec, connection_proxy): + """ + Ensures that MySQL connections checked out of the + pool are alive. + + Borrowed from: + http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f + """ + try: + dbapi_conn.cursor().execute('select 1') + except dbapi_conn.OperationalError, ex: + if ex.args[0] in (2006, 2013, 2014, 2045, 2055): + LOG.warn('Got mysql server has gone away: %s', ex) + raise DisconnectionError("Database server went away") + else: + raise + + +def is_db_connection_error(args): + """Return True if error in connecting to db.""" + # NOTE(adam_g): This is currently MySQL specific and needs to be extended + # to support Postgres and others. + conn_err_codes = ('2002', '2003', '2006') + for err_code in conn_err_codes: + if args.find(err_code) != -1: + return True + return False + + +def get_engine(): + """Return a SQLAlchemy engine.""" + global _ENGINE + if _ENGINE is None: + connection_dict = sqlalchemy.engine.url.make_url( + cfg.CONF.database_connection) + + engine_args = { + "pool_recycle": cfg.CONF.sql_idle_timeout, + "echo": False, + 'convert_unicode': True, + } + + # Map our SQL debug level to SQLAlchemy's options + if cfg.CONF.sql_connection_debug >= 100: + engine_args['echo'] = 'debug' + elif cfg.CONF.sql_connection_debug >= 50: + engine_args['echo'] = True + + if "sqlite" in connection_dict.drivername: + engine_args["poolclass"] = NullPool + + if cfg.CONF.database_connection == "sqlite://": + engine_args["poolclass"] = StaticPool + engine_args["connect_args"] = {'check_same_thread': False} + + _ENGINE = sqlalchemy.create_engine(cfg.CONF.database_connection, + **engine_args) + + if 'mysql' in connection_dict.drivername: + sqlalchemy.event.listen(_ENGINE, 'checkout', ping_listener) + elif "sqlite" in connection_dict.drivername: + if not cfg.CONF.sqlite_synchronous: + sqlalchemy.event.listen(_ENGINE, 'connect', + synchronous_switch_listener) + sqlalchemy.event.listen(_ENGINE, 'connect', add_regexp_listener) + + if (cfg.CONF.sql_connection_trace and + _ENGINE.dialect.dbapi.__name__ == 'MySQLdb'): + import MySQLdb.cursors + _do_query = debug_mysql_do_query() + setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query) + + try: + _ENGINE.connect() + except OperationalError, e: + if not is_db_connection_error(e.args[0]): + raise + + remaining = cfg.CONF.sql_max_retries + if remaining == -1: + remaining = 'infinite' + while True: + msg = _('SQL connection failed. %s attempts left.') + LOG.warn(msg % remaining) + if remaining != 'infinite': + remaining -= 1 + time.sleep(cfg.CONF.sql_retry_interval) + try: + _ENGINE.connect() + break + except OperationalError, e: + if (remaining != 'infinite' and remaining == 0) or \ + not is_db_connection_error(e.args[0]): + raise + return _ENGINE + + +def get_maker(engine, autocommit=True, expire_on_commit=False, autoflush=True): + """Return a SQLAlchemy sessionmaker using the given engine.""" + return sqlalchemy.orm.sessionmaker(bind=engine, + autocommit=autocommit, + autoflush=autoflush, + expire_on_commit=expire_on_commit) + + +def func(): + # ugly hack sqlalchemy name conflict from impl_sqlalchemy + return sqlalchemy.func diff --git a/setup.py b/setup.py index b4ff50e6e..25d0c8773 100755 --- a/setup.py +++ b/setup.py @@ -57,5 +57,8 @@ setuptools.setup( [ceilometer.storage] log = ceilometer.storage.impl_log:LogStorage mongodb = ceilometer.storage.impl_mongodb:MongoDBStorage + mysql = ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage + postgresql = ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage + sqlite = ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage """), ) diff --git a/tests/storage/test_impl_sqlalchemy.py b/tests/storage/test_impl_sqlalchemy.py new file mode 100644 index 000000000..1ce863154 --- /dev/null +++ b/tests/storage/test_impl_sqlalchemy.py @@ -0,0 +1,466 @@ +# -*- encoding: utf-8 -*- +# +# Author: John Tran +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Tests for ceilometer/storage/impl_sqlalchemy.py +""" + +import datetime +import logging +import os +import re +import unittest + +import mox + +from nose.plugins import skip +from sqlalchemy import MetaData, text + +from ceilometer import counter +from ceilometer import meter +from ceilometer import storage +from ceilometer.storage import migration +import ceilometer.openstack.common.cfg as cfg +from ceilometer.storage import impl_sqlalchemy +from ceilometer.storage.sqlalchemy.models import Meter, Project, Resource +from ceilometer.storage.sqlalchemy.models import Source, User + + +LOG = logging.getLogger(__name__) +CEILOMETER_TEST_LIVE = bool(int(os.environ.get('CEILOMETER_TEST_LIVE', 0))) +if CEILOMETER_TEST_LIVE: + MYSQL_DBNAME = 'ceilometer_test' + MYSQL_URL = 'mysql://ceilometer:somepass@localhost/%s' % MYSQL_DBNAME + + +class Connection(impl_sqlalchemy.Connection): + + def _get_connection(self, conf): + try: + return super(Connection, self)._get_connection(conf) + except: + LOG.debug('Unable to connect to %s' % conf.database_connection) + raise + + +class SQLAlchemyEngineTestBase(unittest.TestCase): + + def tearDown(self): + super(SQLAlchemyEngineTestBase, self).tearDown() + engine_conn = self.session.bind.connect() + if CEILOMETER_TEST_LIVE: + engine_conn.execute(text('drop database %s' % MYSQL_DBNAME)) + engine_conn.execute(text('create database %s' % MYSQL_DBNAME)) + # needed for sqlite in-memory db to destroy + self.session.close_all() + self.session.bind.dispose() + + def setUp(self): + super(SQLAlchemyEngineTestBase, self).setUp() + + self.conf = cfg.CONF + self.conf.database_connection = 'sqlite://' + # Use a real MySQL server if we can connect, but fall back + # to a Sqlite in-memory connection if we cannot. + if CEILOMETER_TEST_LIVE: + # should pull from conf file but for now manually specified + # just make sure ceilometer_test db exists in mysql + self.conf.database_connection = MYSQL_URL + + self.conn = Connection(self.conf) + self.session = self.conn.session + + migration.db_sync() + + self.counter = counter.Counter( + 'test-1', + 'instance', + 'cumulative', + volume=1, + user_id='user-id', + project_id='project-id', + resource_id='resource-id', + timestamp=datetime.datetime(2012, 7, 2, 10, 40), + duration=0, + resource_metadata={'display_name': 'test-server', + 'tag': 'self.counter', + } + ) + self.msg1 = meter.meter_message_from_counter(self.counter) + self.conn.record_metering_data(self.msg1) + + self.counter2 = counter.Counter( + 'test-2', + 'instance', + 'cumulative', + volume=1, + user_id='user-id', + project_id='project-id', + resource_id='resource-id-alternate', + timestamp=datetime.datetime(2012, 7, 2, 10, 41), + duration=0, + resource_metadata={'display_name': 'test-server', + 'tag': 'self.counter2', + } + ) + self.msg2 = meter.meter_message_from_counter(self.counter2) + self.conn.record_metering_data(self.msg2) + + self.counter3 = counter.Counter( + 'test-3', + 'instance', + 'cumulative', + volume=1, + user_id='user-id-alternate', + project_id='project-id', + resource_id='resource-id-alternate', + timestamp=datetime.datetime(2012, 7, 2, 10, 41), + duration=0, + resource_metadata={'display_name': 'test-server', + 'tag': 'self.counter3', + } + ) + self.msg3 = meter.meter_message_from_counter(self.counter3) + self.conn.record_metering_data(self.msg3) + + for i in range(2, 4): + c = counter.Counter( + 'test', + 'instance', + 'cumulative', + 1, + 'user-id-%s' % i, + 'project-id-%s' % i, + 'resource-id-%s' % i, + timestamp=datetime.datetime(2012, 7, 2, 10, 40 + i), + duration=0, + resource_metadata={'display_name': 'test-server', + 'tag': 'counter-%s' % i, + } + ) + msg = meter.meter_message_from_counter(c) + self.conn.record_metering_data(msg) + + +class UserTest(SQLAlchemyEngineTestBase): + + def test_new_user(self): + user = self.session.query(User).get('user-id') + assert user is not None + + def test_new_user_source(self): + user = self.session.query(User).get('user-id') + assert hasattr(user, 'sources') + sources = user.sources + assert map(lambda x: x.id, user.sources) == ['test-1', 'test-2'] + + def test_get_users(self): + users = self.conn.get_users() + xpct = set(['user-id', 'user-id-alternate', 'user-id-2', 'user-id-3']) + assert set(self.conn.get_users()) == xpct + + def test_get_users_by_source(self): + assert set(self.conn.get_users(source='test-1')) == set(['user-id']) + + +class ProjectTest(SQLAlchemyEngineTestBase): + + def test_new_project(self): + project = self.session.query(Project).get('project-id') + assert project is not None + + def test_new_project_source(self): + project = self.session.query(Project).get('project-id') + assert hasattr(project, 'sources') + expected = ['test-1', 'test-2', 'test-3'] + assert map(lambda x: x.id, project.sources) == expected + + def test_get_projects(self): + projects = self.session.query(Project).all() + projects = map(lambda x: x.id, projects) + expect = set(['project-id', 'project-id-2', 'project-id-3']) + assert set(projects) == expect + + def test_get_projects_by_source(self): + projects = self.conn.get_projects(source='test-1') + assert list(projects) == ['project-id'] + + +class ResourceTest(SQLAlchemyEngineTestBase): + + def test_new_resource(self): + resource = self.session.query(Resource).get('resource-id') + assert resource is not None + + def test_new_resource_project(self): + resource = self.session.query(Resource).get('resource-id') + assert hasattr(resource, 'project') + assert resource.project.id == 'project-id' + + def test_new_resource_user(self): + resource = self.session.query(Resource).get('resource-id') + assert hasattr(resource, 'user') + assert resource.user.id == 'user-id' + + def test_new_resource_meter(self): + resource = self.session.query(Resource).filter_by(id='resource-id').\ + filter(Meter.counter_name == 'instance').\ + filter(Meter.counter_type == 'cumulative').first() + assert len(set(resource.meters)) == 1 + foo = map(lambda x: [x.counter_name, x.counter_type], resource.meters) + assert ['instance', 'cumulative'] in foo + + def test_new_resource_metadata(self): + resource = self.session.query(Resource).get('resource-id') + assert hasattr(resource, 'metadata') + metadata = resource.resource_metadata + assert metadata['display_name'] == 'test-server' + + def test_get_resources(self): + resources = list(self.conn.get_resources()) + assert len(resources) == 4 + for resource in resources: + if resource['resource_id'] != 'resource-id': + continue + assert resource['resource_id'] == 'resource-id' + assert resource['project_id'] == 'project-id' + assert resource['user_id'] == 'user-id' + assert 'resource_metadata' in resource + assert 'meters' in resource + foo = map(lambda x: [x['counter_name'], x['counter_type']], + resource['meters']) + assert ['instance', 'cumulative'] in foo + break + else: + assert False, 'Never found resource-id' + + def test_get_resources_start_timestamp(self): + timestamp = datetime.datetime(2012, 7, 2, 10, 42) + resources = list(self.conn.get_resources(start_timestamp=timestamp)) + resource_ids = [r['resource_id'] for r in resources] + expected = set(['resource-id-2', 'resource-id-3']) + assert set(resource_ids) == expected + + def test_get_resources_end_timestamp(self): + timestamp = datetime.datetime(2012, 7, 2, 10, 42) + resources = list(self.conn.get_resources(end_timestamp=timestamp)) + resource_ids = [r['resource_id'] for r in resources] + expected = set(['resource-id', 'resource-id-alternate']) + assert set(resource_ids) == expected + + def test_get_resources_both_timestamps(self): + start_ts = datetime.datetime(2012, 7, 2, 10, 42) + end_ts = datetime.datetime(2012, 7, 2, 10, 43) + resources = list(self.conn.get_resources(start_timestamp=start_ts, + end_timestamp=end_ts) + ) + resource_ids = [r['resource_id'] for r in resources] + assert set(resource_ids) == set(['resource-id-2']) + + def test_get_resources_by_source(self): + resources = list(self.conn.get_resources(source='test-1')) + assert len(resources) == 1 + ids = set(r['resource_id'] for r in resources) + assert ids == set(['resource-id']) + + def test_get_resources_by_user(self): + resources = list(self.conn.get_resources(user='user-id')) + assert len(resources) == 1 + ids = set(r['resource_id'] for r in resources) + assert ids == set(['resource-id']) + + def test_get_resources_by_project(self): + resources = list(self.conn.get_resources(project='project-id')) + assert len(resources) == 2 + ids = set(r['resource_id'] for r in resources) + assert ids == set(['resource-id', 'resource-id-alternate']) + + +class MeterTest(SQLAlchemyEngineTestBase): + + def _compare_raw(self, msg_dict, result_dict): + for k, v in msg_dict.items(): + if k in ['timestamp', 'source']: + continue + if k == 'resource_metadata': + key = result_dict[k] + value = v + else: + key = str(result_dict[k]) + value = str(v) + assert key == value + + def _iterate_msgs(self, results): + for meter in results: + labels = map(lambda x: x['id'], meter['sources']) + # should only have one source + assert len(labels) == 1 + count = re.match('test-(\d+)', labels[0]).group(1) + self._compare_raw(getattr(self, 'msg' + count), meter) + + def test_new_meter(self): + meter = self.session.query(Meter).first() + assert meter is not None + + def test_get_raw_events_by_user(self): + f = storage.EventFilter(user='user-id') + results = list(self.conn.get_raw_events(f)) + assert len(results) == 2 + self._iterate_msgs(results) + + def test_get_raw_events_by_project(self): + f = storage.EventFilter(project='project-id') + results = list(self.conn.get_raw_events(f)) + assert len(results) == 3 + self._iterate_msgs(results) + + def test_get_raw_events_by_resource(self): + f = storage.EventFilter(user='user-id', resource='resource-id') + results = list(self.conn.get_raw_events(f)) + assert len(results) == 1 + self._compare_raw(self.msg1, results[0]) + + def test_get_raw_events_by_start_time(self): + f = storage.EventFilter( + user='user-id', + start=datetime.datetime(2012, 7, 2, 10, 41), + ) + results = list(self.conn.get_raw_events(f)) + assert len(results) == 1 + assert results[0]['timestamp'] == datetime.datetime(2012, 7, 2, 10, 41) + + def test_get_raw_events_by_end_time(self): + f = storage.EventFilter( + user='user-id', + end=datetime.datetime(2012, 7, 2, 10, 41), + ) + results = list(self.conn.get_raw_events(f)) + length = len(results) + assert length == 1 + assert results[0]['timestamp'] == datetime.datetime(2012, 7, 2, 10, 40) + + def test_get_raw_events_by_both_times(self): + f = storage.EventFilter( + start=datetime.datetime(2012, 7, 2, 10, 42), + end=datetime.datetime(2012, 7, 2, 10, 43), + ) + results = list(self.conn.get_raw_events(f)) + length = len(results) + assert length == 1 + assert results[0]['timestamp'] == datetime.datetime(2012, 7, 2, 10, 42) + + def test_get_raw_events_by_meter(self): + f = storage.EventFilter(user='user-id', meter='no-such-meter') + results = list(self.conn.get_raw_events(f)) + assert not results + + def test_get_raw_events_by_meter2(self): + f = storage.EventFilter(user='user-id', meter='instance') + results = list(self.conn.get_raw_events(f)) + assert results + + +class TestGetEventInterval(SQLAlchemyEngineTestBase): + + def setUp(self): + super(TestGetEventInterval, self).setUp() + + # Create events relative to the range and pretend + # that the intervening events exist. + + self.start = datetime.datetime(2012, 8, 28, 0, 0) + self.end = datetime.datetime(2012, 8, 29, 0, 0) + + self.early1 = self.start - datetime.timedelta(minutes=20) + self.early2 = self.start - datetime.timedelta(minutes=10) + + self.middle1 = self.start + datetime.timedelta(minutes=10) + self.middle2 = self.end - datetime.timedelta(minutes=10) + + self.late1 = self.end + datetime.timedelta(minutes=10) + self.late2 = self.end + datetime.timedelta(minutes=20) + + self._filter = storage.EventFilter( + resource='111', + meter='instance', + start=self.start, + end=self.end, + ) + + def _make_events(self, *timestamps): + for t in timestamps: + c = counter.Counter( + 'test', + 'instance', + 'cumulative', + 1, + '11', + '1', + '111', + timestamp=t, + duration=0, + resource_metadata={'display_name': 'test-server', + } + ) + msg = meter.meter_message_from_counter(c) + self.conn.record_metering_data(msg) + + def test_before_range(self): + self._make_events(self.early1, self.early2) + s, e = self.conn.get_event_interval(self._filter) + assert s is None + assert e is None + + def test_overlap_range_start(self): + self._make_events(self.early1, self.start, self.middle1) + s, e = self.conn.get_event_interval(self._filter) + assert s == self.start + assert e == self.middle1 + + def test_within_range(self): + self._make_events(self.middle1, self.middle2) + s, e = self.conn.get_event_interval(self._filter) + assert s == self.middle1 + assert e == self.middle2 + + def test_within_range_zero_duration(self): + self._make_events(self.middle1) + s, e = self.conn.get_event_interval(self._filter) + assert s == self.middle1 + assert e == self.middle1 + + def test_within_range_zero_duration_two_events(self): + self._make_events(self.middle1, self.middle1) + s, e = self.conn.get_event_interval(self._filter) + assert s == self.middle1 + assert e == self.middle1 + + def test_overlap_range_end(self): + self._make_events(self.middle2, self.end, self.late1) + s, e = self.conn.get_event_interval(self._filter) + assert s == self.middle2 + assert e == self.middle2 + + def test_overlap_range_end_with_offset(self): + self._make_events(self.middle2, self.end, self.late1) + self._filter.end = self.late1 + s, e = self.conn.get_event_interval(self._filter) + assert s == self.middle2 + assert e == self.end + + def test_after_range(self): + self._make_events(self.late1, self.late2) + s, e = self.conn.get_event_interval(self._filter) + assert s is None + assert e is None diff --git a/tools/dbsync b/tools/dbsync new file mode 100755 index 000000000..0769e45aa --- /dev/null +++ b/tools/dbsync @@ -0,0 +1,27 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- +# +# Author: John Tran +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Run SQLAlchemy db migration. +""" + +import sys +from ceilometer import storage +from ceilometer.storage import migration +from ceilometer.openstack.common import cfg + +if __name__ == '__main__': + cfg.CONF(sys.argv[1:]) + migration.db_sync()