Split and fix live db tests
With this commit, it's now possible to only run live tests (i.e. on real database) on certain backend. Typically, I don't have HBase installed, so now I can run the live tests against MongoDB and MySQL only very easily. Change-Id: I0e584b8243abc27ab9f027fdea17dc6c1192c62d Signed-off-by: Julien Danjou <julien@danjou.info>
This commit is contained in:
parent
a284960c37
commit
473dce84b8
@ -120,10 +120,13 @@ class Connection(base.Connection):
|
||||
opts = self._parse_connection_url(conf.database_connection)
|
||||
opts['table_prefix'] = conf.table_prefix
|
||||
|
||||
# This is a in-memory usage for unit tests
|
||||
if opts['host'] == '__test__':
|
||||
live_tests = bool(int(os.environ.get('CEILOMETER_TEST_LIVE', 0)))
|
||||
if not live_tests:
|
||||
url = os.environ.get('CEILOMETER_TEST_HBASE_URL')
|
||||
if url:
|
||||
# Reparse URL, but from the env variable now
|
||||
opts = self._parse_connection_url(url)
|
||||
else:
|
||||
# This is a in-memory usage for unit tests
|
||||
self.conn = MConnection()
|
||||
self.project = self.conn.table(self.PROJECT_TABLE)
|
||||
self.user = self.conn.table(self.USER_TABLE)
|
||||
@ -131,16 +134,6 @@ class Connection(base.Connection):
|
||||
self.meter = self.conn.table(self.METER_TABLE)
|
||||
return
|
||||
|
||||
# Export this variable before running tests against real HBase
|
||||
# e.g. CEILOMETER_TEST_HBASE_URL=hbase://192.168.1.100:9090
|
||||
url = os.environ.get('CEILOMETER_TEST_HBASE_URL')
|
||||
if not url:
|
||||
raise RuntimeError("CEILOMETER_TEST_LIVE is on, but "
|
||||
"CEILOMETER_TEST_HBASE_URL is not defined")
|
||||
|
||||
# Reparse URL, but from the env variable now
|
||||
opts = self._parse_connection_url(url)
|
||||
|
||||
self.conn = self._get_connection(opts)
|
||||
self.conn.open()
|
||||
self.project = self.conn.table(self.PROJECT_TABLE)
|
||||
|
@ -248,13 +248,8 @@ class Connection(base.Connection):
|
||||
LOG.info('connecting to MongoDB on %s:%s', opts['host'], opts['port'])
|
||||
|
||||
if opts['host'] == '__test__':
|
||||
live_tests = bool(int(os.environ.get('CEILOMETER_TEST_LIVE', 0)))
|
||||
if live_tests:
|
||||
url = os.environ.get('CEILOMETER_TEST_MONGO_URL')
|
||||
if not url:
|
||||
raise RuntimeError("CEILOMETER_TEST_LIVE is on, but "
|
||||
"CEILOMETER_TEST_MONGO_URL "
|
||||
"is not defined")
|
||||
url = os.environ.get('CEILOMETER_TEST_MONGODB_URL')
|
||||
if url:
|
||||
opts = self._parse_connection_url(url)
|
||||
self.conn = pymongo.Connection(opts['host'],
|
||||
opts['port'],
|
||||
|
@ -1,6 +1,7 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Author: John Tran <jhtran@att.com>
|
||||
# Julien Danjou <julien@danjou.info>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -19,6 +20,7 @@
|
||||
from __future__ import absolute_import
|
||||
|
||||
import copy
|
||||
import os
|
||||
from sqlalchemy import func
|
||||
|
||||
from ceilometer.openstack.common import log
|
||||
@ -125,9 +127,11 @@ class Connection(base.Connection):
|
||||
"""SqlAlchemy connection."""
|
||||
|
||||
def __init__(self, conf):
|
||||
LOG.info('connecting to %s', conf.database_connection)
|
||||
self.session = self._get_connection(conf)
|
||||
return
|
||||
url = conf.database_connection
|
||||
if url == 'sqlite://':
|
||||
url = os.environ.get('CEILOMETER_TEST_SQL_URL', url)
|
||||
LOG.info('connecting to %s', url)
|
||||
self.session = sqlalchemy_session.get_session(url, conf)
|
||||
|
||||
def upgrade(self, version=None):
|
||||
migration.db_sync(self.session.get_bind(), version=version)
|
||||
@ -137,10 +141,6 @@ class Connection(base.Connection):
|
||||
for table in reversed(Base.metadata.sorted_tables):
|
||||
engine.execute(table.delete())
|
||||
|
||||
def _get_connection(self, conf):
|
||||
"""Return a connection to the database."""
|
||||
return sqlalchemy_session.get_session()
|
||||
|
||||
def record_metering_data(self, data):
|
||||
"""Write the data to the backend storage system.
|
||||
|
||||
@ -205,7 +205,7 @@ class Connection(base.Connection):
|
||||
|
||||
:param source: Optional source filter.
|
||||
"""
|
||||
query = model_query(User.id, session=self.session)
|
||||
query = self.session.query(User.id)
|
||||
if source is not None:
|
||||
query = query.filter(User.sources.any(id=source))
|
||||
return (x[0] for x in query.all())
|
||||
@ -215,7 +215,7 @@ class Connection(base.Connection):
|
||||
|
||||
:param source: Optional source filter.
|
||||
"""
|
||||
query = model_query(Project.id, session=self.session)
|
||||
query = self.session.query(Project.id)
|
||||
if source:
|
||||
query = query.filter(Project.sources.any(id=source))
|
||||
return (x[0] for x in query.all())
|
||||
@ -241,8 +241,7 @@ class Connection(base.Connection):
|
||||
:param metaquery: Optional dict with metadata to match on.
|
||||
:param resource: Optional resource filter.
|
||||
"""
|
||||
query = model_query(Meter,
|
||||
session=self.session).group_by(Meter.resource_id)
|
||||
query = self.session.query(Meter,).group_by(Meter.resource_id)
|
||||
if user is not None:
|
||||
query = query.filter(Meter.user_id == user)
|
||||
if source is not None:
|
||||
@ -293,7 +292,7 @@ class Connection(base.Connection):
|
||||
:param source: Optional source filter.
|
||||
:param metaquery: Optional dict with metadata to match on.
|
||||
"""
|
||||
query = model_query(Resource, session=self.session)
|
||||
query = self.session.query(Resource)
|
||||
if user is not None:
|
||||
query = query.filter(Resource.user_id == user)
|
||||
if source is not None:
|
||||
@ -326,7 +325,7 @@ class Connection(base.Connection):
|
||||
"""Return an iterable of samples as created by
|
||||
:func:`ceilometer.meter.meter_message_from_counter`.
|
||||
"""
|
||||
query = model_query(Meter, session=self.session)
|
||||
query = self.session.query(Meter)
|
||||
query = make_query_from_filter(query, event_filter,
|
||||
require_meter=False)
|
||||
samples = query.all()
|
||||
@ -346,7 +345,7 @@ class Connection(base.Connection):
|
||||
|
||||
def _make_volume_query(self, event_filter, counter_volume_func):
|
||||
"""Returns complex Meter counter_volume query for max and sum."""
|
||||
subq = model_query(Meter.id, session=self.session)
|
||||
subq = self.session.query(Meter.id)
|
||||
subq = make_query_from_filter(subq, event_filter, require_meter=False)
|
||||
subq = subq.subquery()
|
||||
mainq = self.session.query(Resource.id, counter_volume_func)
|
||||
@ -458,16 +457,6 @@ class Connection(base.Connection):
|
||||
return results
|
||||
|
||||
|
||||
def model_query(*args, **kwargs):
|
||||
"""Query helper.
|
||||
|
||||
:param session: if present, the session to use
|
||||
"""
|
||||
session = kwargs.get('session') or sqlalchemy_session.get_session()
|
||||
query = session.query(*args)
|
||||
return query
|
||||
|
||||
|
||||
def row2dict(row, srcflag=False):
|
||||
"""Convert User, Project, Meter, Resource instance to dictionary object
|
||||
with nested Source(s) and Meter(s)
|
||||
|
@ -1,6 +1,7 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Author: John Tran <jhtran@att.com>
|
||||
# Julien Danjou <julien@danjou.info>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -22,7 +23,6 @@ from migrate.versioning import util as migrate_util
|
||||
import sqlalchemy
|
||||
|
||||
from ceilometer.openstack.common import log
|
||||
from ceilometer.storage.sqlalchemy import session
|
||||
|
||||
|
||||
INIT_VERSION = 1
|
||||
@ -66,7 +66,7 @@ def db_sync(engine, version=None):
|
||||
except ValueError:
|
||||
raise Exception(_("version should be an integer"))
|
||||
|
||||
current_version = db_version()
|
||||
current_version = db_version(engine)
|
||||
repository = _find_migrate_repo()
|
||||
if version is None or version > current_version:
|
||||
return versioning_api.upgrade(engine, repository, version)
|
||||
@ -75,23 +75,23 @@ def db_sync(engine, version=None):
|
||||
version)
|
||||
|
||||
|
||||
def db_version():
|
||||
def db_version(engine):
|
||||
repository = _find_migrate_repo()
|
||||
try:
|
||||
return versioning_api.db_version(session.get_engine(), repository)
|
||||
return versioning_api.db_version(engine,
|
||||
repository)
|
||||
except versioning_exceptions.DatabaseNotControlledError:
|
||||
meta = sqlalchemy.MetaData()
|
||||
engine = session.get_engine()
|
||||
meta.reflect(bind=engine)
|
||||
tables = meta.tables
|
||||
if len(tables) == 0:
|
||||
db_version_control(0)
|
||||
return versioning_api.db_version(session.get_engine(), repository)
|
||||
db_version_control(engine, 0)
|
||||
return versioning_api.db_version(engine, repository)
|
||||
|
||||
|
||||
def db_version_control(version=None):
|
||||
def db_version_control(engine, version=None):
|
||||
repository = _find_migrate_repo()
|
||||
versioning_api.version_control(session.get_engine(), repository, version)
|
||||
versioning_api.version_control(engine, repository, version)
|
||||
return version
|
||||
|
||||
|
||||
|
@ -61,12 +61,13 @@ sql_opts = [
|
||||
cfg.CONF.register_opts(sql_opts)
|
||||
|
||||
|
||||
def get_session(autocommit=True, expire_on_commit=False, autoflush=True):
|
||||
def get_session(database_connection, conf,
|
||||
autocommit=True, expire_on_commit=False, autoflush=True):
|
||||
"""Return a SQLAlchemy session."""
|
||||
global _MAKER
|
||||
|
||||
if _MAKER is None:
|
||||
engine = get_engine()
|
||||
engine = get_engine(database_connection, conf)
|
||||
_MAKER = get_maker(engine, autocommit, expire_on_commit, autoflush)
|
||||
|
||||
session = _MAKER()
|
||||
@ -116,44 +117,44 @@ def is_db_connection_error(args):
|
||||
return False
|
||||
|
||||
|
||||
def get_engine():
|
||||
def get_engine(database_connection, conf):
|
||||
"""Return a SQLAlchemy engine."""
|
||||
global _ENGINE
|
||||
if _ENGINE is None:
|
||||
connection_dict = sqlalchemy.engine.url.make_url(
|
||||
cfg.CONF.database_connection)
|
||||
database_connection)
|
||||
|
||||
engine_args = {
|
||||
"pool_recycle": cfg.CONF.sql_idle_timeout,
|
||||
"pool_recycle": conf.sql_idle_timeout,
|
||||
"echo": False,
|
||||
'convert_unicode': True,
|
||||
}
|
||||
|
||||
# Map our SQL debug level to SQLAlchemy's options
|
||||
if cfg.CONF.sql_connection_debug >= 100:
|
||||
if conf.sql_connection_debug >= 100:
|
||||
engine_args['echo'] = 'debug'
|
||||
elif cfg.CONF.sql_connection_debug >= 50:
|
||||
elif conf.sql_connection_debug >= 50:
|
||||
engine_args['echo'] = True
|
||||
|
||||
if "sqlite" in connection_dict.drivername:
|
||||
engine_args["poolclass"] = pool.NullPool
|
||||
|
||||
if cfg.CONF.database_connection == "sqlite://":
|
||||
if database_connection == "sqlite://":
|
||||
engine_args["poolclass"] = pool.StaticPool
|
||||
engine_args["connect_args"] = {'check_same_thread': False}
|
||||
|
||||
_ENGINE = sqlalchemy.create_engine(cfg.CONF.database_connection,
|
||||
_ENGINE = sqlalchemy.create_engine(database_connection,
|
||||
**engine_args)
|
||||
|
||||
if 'mysql' in connection_dict.drivername:
|
||||
sqlalchemy.event.listen(_ENGINE, 'checkout', ping_listener)
|
||||
elif "sqlite" in connection_dict.drivername:
|
||||
if not cfg.CONF.sqlite_synchronous:
|
||||
if not conf.sqlite_synchronous:
|
||||
sqlalchemy.event.listen(_ENGINE, 'connect',
|
||||
synchronous_switch_listener)
|
||||
sqlalchemy.event.listen(_ENGINE, 'connect', add_regexp_listener)
|
||||
|
||||
if (cfg.CONF.sql_connection_trace and
|
||||
if (conf.sql_connection_trace and
|
||||
_ENGINE.dialect.dbapi.__name__ == 'MySQLdb'):
|
||||
import MySQLdb.cursors
|
||||
_do_query = debug_mysql_do_query()
|
||||
@ -165,7 +166,7 @@ def get_engine():
|
||||
if not is_db_connection_error(e.args[0]):
|
||||
raise
|
||||
|
||||
remaining = cfg.CONF.sql_max_retries
|
||||
remaining = conf.sql_max_retries
|
||||
if remaining == -1:
|
||||
remaining = 'infinite'
|
||||
while True:
|
||||
@ -173,7 +174,7 @@ def get_engine():
|
||||
LOG.warn(msg % remaining)
|
||||
if remaining != 'infinite':
|
||||
remaining -= 1
|
||||
time.sleep(cfg.CONF.sql_retry_interval)
|
||||
time.sleep(conf.sql_retry_interval)
|
||||
try:
|
||||
_ENGINE.connect()
|
||||
break
|
||||
|
@ -19,14 +19,9 @@
|
||||
"""Tests for ceilometer/storage/impl_hbase.py
|
||||
|
||||
.. note::
|
||||
To run the tests using in-memory mocked HappyBase API,
|
||||
set the environment variable CEILOMETER_TEST_LIVE=0 (this is the default
|
||||
value)
|
||||
|
||||
In order to run the tests against real HBase server set the environment
|
||||
variable CEILOMETER_TEST_LIVE=1 and set HBASE_URL below to
|
||||
point to that HBase instance before running the tests. Make sure the Thrift
|
||||
server is running on that server.
|
||||
variable CEILOMETER_TEST_HBASE_URL to point to that HBase instance before
|
||||
running the tests. Make sure the Thrift server is running on that server.
|
||||
|
||||
"""
|
||||
from tests.storage import base
|
||||
|
@ -42,7 +42,7 @@
|
||||
pip install python-spidermonkey
|
||||
|
||||
To run the tests *without* mim, set the environment variable
|
||||
CEILOMETER_TEST_LIVE=1 before running tox.
|
||||
CEILOMETER_TEST_MONGODB_URL to a MongoDB URL before running tox.
|
||||
|
||||
"""
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Author: John Tran <jhtran@att.com>
|
||||
# Julien Danjou <julien@danjou.info>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -14,6 +15,12 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Tests for ceilometer/storage/impl_sqlalchemy.py
|
||||
|
||||
.. note::
|
||||
In order to run the tests against real SQL server set the environment
|
||||
variable CEILOMETER_TEST_SQL_URL to point to a SQL server before running
|
||||
the tests.
|
||||
|
||||
"""
|
||||
|
||||
from oslo.config import cfg
|
||||
|
Loading…
Reference in New Issue
Block a user