unified warnings, use compare columns in tests
This commit is contained in:
parent
feab45798f
commit
43e0c3caf2
@ -1,7 +1,9 @@
|
||||
"""
|
||||
Schema module providing common schema operations.
|
||||
"""
|
||||
import warnings
|
||||
from UserDict import DictMixin
|
||||
|
||||
import sqlalchemy
|
||||
|
||||
from migrate.changeset import SQLA_06
|
||||
@ -105,8 +107,8 @@ def alter_column(*p, **k):
|
||||
|
||||
# deprecation
|
||||
if len(p) >= 2 and isinstance(p[1], sqlalchemy.Column):
|
||||
raise MigrateDeprecationWarning("Alter column with comparing columns is deprecated. Just pass in parameters instead.")
|
||||
|
||||
warnings.warn("Alter column with comparing columns is deprecated."
|
||||
" Just pass in parameters instead.", MigrateDeprecationWarning)
|
||||
engine = k['engine']
|
||||
delta = ColumnDelta(*p, **k)
|
||||
|
||||
|
@ -27,7 +27,6 @@
|
||||
|
||||
import sys
|
||||
import inspect
|
||||
import warnings
|
||||
import logging
|
||||
|
||||
from migrate.versioning import (exceptions, repository, schema, version,
|
||||
|
@ -140,7 +140,7 @@ class PythonScript(base.BaseScript):
|
||||
script_func(engine)
|
||||
except TypeError:
|
||||
warnings.warn("upgrade/downgrade functions must accept engine"
|
||||
" parameter (since version > 0.5.4)")
|
||||
" parameter (since version > 0.5.4)", MigrateDeprecationWarning)
|
||||
raise
|
||||
|
||||
@property
|
||||
|
@ -14,6 +14,7 @@ from sqlalchemy.pool import StaticPool
|
||||
from migrate.versioning import exceptions
|
||||
from migrate.versioning.util.keyedinstance import KeyedInstance
|
||||
from migrate.versioning.util.importpath import import_path
|
||||
from migrate.changeset.exceptions import *
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@ -30,7 +31,7 @@ def load_model(dotted_name):
|
||||
if ':' not in dotted_name:
|
||||
# backwards compatibility
|
||||
warnings.warn('model should be in form of module.model:User '
|
||||
'and not module.model.User', DeprecationWarning)
|
||||
'and not module.model.User', MigrateDeprecationWarning)
|
||||
dotted_name = ':'.join(dotted_name.rsplit('.', 1))
|
||||
return EntryPoint.parse('x=%s' % dotted_name).load(False)
|
||||
else:
|
||||
@ -126,7 +127,7 @@ def construct_engine(engine, **opts):
|
||||
if echo:
|
||||
warnings.warn('echo=True parameter is deprecated, pass '
|
||||
'engine_arg_echo=True or engine_dict={"echo": True}',
|
||||
DeprecationWarning)
|
||||
MigrateDeprecationWarning)
|
||||
kwargs['echo'] = echo
|
||||
|
||||
# parse keyword arguments
|
||||
|
@ -55,11 +55,11 @@ class TestConstraint(CommonTestConstraint):
|
||||
pk = PrimaryKeyConstraint(table=self.table, name='temp_pk_key', *cols)
|
||||
else:
|
||||
pk = PrimaryKeyConstraint(table=self.table, *cols)
|
||||
self.assertEquals(list(pk.columns), list(cols))
|
||||
self.compare_columns_equal(pk.columns, cols)
|
||||
pk.create()
|
||||
self.refresh_table()
|
||||
if not self.url.startswith('sqlite'):
|
||||
self.assertEquals(list(self.table.primary_key), list(cols))
|
||||
self.compare_columns_equal(self.table.primary_key, cols, ['type', 'autoincrement'])
|
||||
|
||||
# Drop the PK constraint
|
||||
#if (self.engine.name in ('oracle', 'firebird')):
|
||||
@ -85,7 +85,8 @@ class TestConstraint(CommonTestConstraint):
|
||||
name="fk_id_fkey",
|
||||
ondelete="CASCADE")
|
||||
self.assert_(self.table.c.fkey.foreign_keys._list is not [])
|
||||
self.assertEquals(list(fk.columns), [self.table.c.fkey])
|
||||
for key in fk.columns:
|
||||
self.assertEquals(key, self.table.c.fkey.name)
|
||||
self.assertEquals([e.column for e in fk.elements], [self.table.c.id])
|
||||
self.assertEquals(list(fk.referenced), [self.table.c.id])
|
||||
|
||||
@ -186,7 +187,7 @@ class TestAutoname(CommonTestConstraint):
|
||||
self.refresh_table()
|
||||
if not self.url.startswith('sqlite'):
|
||||
# TODO: test for index for sqlite
|
||||
self.assertEquals(list(cons.columns), list(self.table.primary_key))
|
||||
self.compare_columns_equal(cons.columns, self.table.primary_key, ['autoincrement', 'type'])
|
||||
|
||||
# Remove the name, drop the constraint; it should succeed
|
||||
cons.name = None
|
||||
@ -200,7 +201,7 @@ class TestAutoname(CommonTestConstraint):
|
||||
self.refresh_table()
|
||||
if not self.url.startswith('sqlite'):
|
||||
# TODO: test for index for sqlite
|
||||
self.assertEquals(list(cons.columns), list(self.table.primary_key))
|
||||
self.compare_columns_equal(cons.columns, self.table.primary_key)
|
||||
cons.name = None
|
||||
cons.drop()
|
||||
|
||||
|
@ -9,6 +9,7 @@ from sqlalchemy.orm import create_session
|
||||
from sqlalchemy.pool import StaticPool
|
||||
|
||||
from migrate.changeset import SQLA_06
|
||||
from migrate.changeset.schema import ColumnDelta
|
||||
from migrate.versioning.util import Memoize
|
||||
|
||||
from tests.fixture.base import Base
|
||||
@ -156,4 +157,14 @@ class DB(Base):
|
||||
self.meta.clear()
|
||||
self.table = Table(name, self.meta, autoload=True)
|
||||
|
||||
def compare_columns_equal(self, columns1, columns2, ignore=None):
|
||||
"""Loop through all columns and compare them"""
|
||||
for c1, c2 in zip(list(columns1), list(columns2)):
|
||||
diffs = ColumnDelta(c1, c2).diffs
|
||||
if ignore:
|
||||
for key in ignore:
|
||||
diffs.pop(key, None)
|
||||
if diffs:
|
||||
self.fail("Comparing %s to %s failed: %s" % (columns1, columns2, diffs))
|
||||
|
||||
# TODO: document engine.dispose and write tests
|
||||
|
@ -78,15 +78,14 @@ class TestSchemaDiff(fixture.DB):
|
||||
# Check Python code gen from database.
|
||||
diff = schemadiff.getDiffOfModelAgainstDatabase(MetaData(), self.engine, excludeTables=['migrate_version'])
|
||||
src = genmodel.ModelGenerator(diff).toPython()
|
||||
src = src.replace(genmodel.HEADER, '')
|
||||
self.assertEqualsIgnoreWhitespace(src, '''
|
||||
tmp_schemadiff = Table('tmp_schemadiff',meta,
|
||||
Column('id',Integer(),primary_key=True,nullable=False),
|
||||
Column('name',Text(length=None,convert_unicode=False,assert_unicode=None)),
|
||||
Column('data',Text(length=None,convert_unicode=False,assert_unicode=None)),
|
||||
)
|
||||
''')
|
||||
|
||||
|
||||
exec src in locals()
|
||||
|
||||
c1 = Table('tmp_schemadiff', self.meta, autoload=True).c
|
||||
c2 = tmp_schemadiff.c
|
||||
self.compare_columns_equal(c1, c2, ['type'])
|
||||
# TODO: get rid of ignoring type
|
||||
|
||||
if not self.engine.name == 'oracle':
|
||||
# Add data, later we'll make sure it's still present.
|
||||
result = self.engine.execute(self.table.insert(), id=1, name=u'mydata')
|
||||
|
Loading…
Reference in New Issue
Block a user