Use storage scenario test base to test migration

This change make the migration test running based on real storage backend
scenario.

Change-Id: If4cb8960406545bfa4157be40879f1fbf301724c
Closes-Bug: 1481875
This commit is contained in:
liu-sheng 2015-08-07 11:00:47 +08:00
parent 12ca732740
commit a3d8780049
3 changed files with 14 additions and 26 deletions

View File

@ -66,7 +66,7 @@ class PreciseTimestamp(TypeDecorator):
def compare_against_backend(self, dialect, conn_type):
if dialect.name == 'mysql':
return issubclass(type(conn_type), DECIMAL)
return issubclass(type(conn_type), type(self.impl))
return issubclass(type(conn_type), DateTime)
@staticmethod
def process_result_value(value, dialect):

View File

@ -185,8 +185,9 @@ def run_with(*drivers):
if isinstance(test, type) and issubclass(test, TestBase):
# Decorate all test methods
for attr in dir(test):
if attr.startswith('test_'):
value = getattr(test, attr)
if callable(value) and attr.startswith('test_'):
if callable(value):
if six.PY3:
value._run_with = drivers
else:

View File

@ -15,48 +15,35 @@
import abc
import mock
from oslo_config import fixture as fixture_config
from oslo_db.sqlalchemy import test_migrations
import six
import six.moves.urllib.parse as urlparse
from aodh import service
from aodh.storage import impl_sqlalchemy
from aodh.storage.sqlalchemy import models
from aodh.tests import base
from aodh.tests import db as tests_db
class ABCSkip(base.SkipNotImplementedMeta, abc.ABCMeta):
pass
@tests_db.run_with('mysql', 'pgsql', 'sqlite')
class ModelsMigrationsSync(
six.with_metaclass(ABCSkip,
base.BaseTestCase,
tests_db.TestBase,
tests_db.MixinTestsWithBackendScenarios,
test_migrations.ModelsMigrationsSync)):
def setUp(self):
super(ModelsMigrationsSync, self).setUp()
self.db = mock.Mock()
conf = service.prepare_service([])
self.conf = self.useFixture(fixture_config.Config(conf)).conf
db_url = self.conf.database.connection
if not db_url:
self.skipTest("The db connection option should be specified.")
connection_scheme = urlparse.urlparse(db_url).scheme
engine_name = connection_scheme.split('+')[0]
if engine_name not in ('postgresql', 'mysql', 'sqlite'):
self.skipTest("This test only works with PostgreSQL or MySQL or"
" SQLite")
self.conn = impl_sqlalchemy.Connection(self.conf,
self.conf.database.connection)
@staticmethod
def get_metadata():
return models.Base.metadata
def get_engine(self):
return self.conn._engine_facade.get_engine()
return self.alarm_conn._engine_facade.get_engine()
def db_sync(self, engine):
self.conn.upgrade(nocreate=True)
pass