partly fix SA0.6 tests on postgres
This commit is contained in:
parent
42ab0ba796
commit
e4868841d5
@ -403,7 +403,7 @@ class ChangesetTable(object):
|
||||
|
||||
def drop_column(self, column, *p, **kw):
|
||||
"""Drop a column, given its name or definition.
|
||||
|
||||
|
||||
API to :meth:`ChangesetColumn.drop`
|
||||
|
||||
:param column: Column to be droped
|
||||
@ -417,7 +417,7 @@ class ChangesetTable(object):
|
||||
# That column isn't part of the table. We don't need
|
||||
# its entire definition to drop the column, just its
|
||||
# name, so create a dummy column with the same name.
|
||||
column = sqlalchemy.Column(str(column))
|
||||
column = sqlalchemy.Column(str(column), sqlalchemy.Integer())
|
||||
column.drop(table=self, *p, **kw)
|
||||
|
||||
def rename(self, name, *args, **kwargs):
|
||||
|
@ -9,9 +9,11 @@
|
||||
import sys
|
||||
import logging
|
||||
|
||||
import migrate
|
||||
import sqlalchemy
|
||||
|
||||
import migrate
|
||||
import migrate.changeset
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
HEADER = """
|
||||
@ -37,7 +39,6 @@ class ModelGenerator(object):
|
||||
self.diff = diff
|
||||
self.declarative = declarative
|
||||
|
||||
|
||||
def column_repr(self, col):
|
||||
kwarg = []
|
||||
if col.key != col.name:
|
||||
@ -174,9 +175,6 @@ class ModelGenerator(object):
|
||||
|
||||
def applyModel(self):
|
||||
"""Apply model to current database."""
|
||||
# Yuck! We have to import from changeset to apply the
|
||||
# monkey-patch to allow column adding/dropping.
|
||||
from migrate.changeset import schema
|
||||
|
||||
def dbCanHandleThisChange(missingInDatabase, missingInModel, diffDecl):
|
||||
if missingInDatabase and not missingInModel and not diffDecl:
|
||||
|
@ -45,9 +45,9 @@ class ControlledSchema(object):
|
||||
self.table.c.repository_id == str(self.repository.id)))
|
||||
|
||||
data = list(result)[0]
|
||||
except Exception:
|
||||
except:
|
||||
cls, exc, tb = sys.exc_info()
|
||||
raise exceptions.DatabaseNotControlledError, exc.message, tb
|
||||
raise exceptions.DatabaseNotControlledError, exc.__str__(), tb
|
||||
|
||||
self.version = data['version']
|
||||
return data
|
||||
|
2
setup.py
2
setup.py
@ -14,7 +14,7 @@ try:
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
test_requirements = ['nose >= 0.10', 'ScriptTest']
|
||||
test_requirements = ['nose >= 0.10', 'ScriptTest==1.0.1']
|
||||
required_deps = ['sqlalchemy >= 0.5', 'decorator', 'tempita']
|
||||
readme_file = open(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'README'))
|
||||
|
||||
|
@ -3,4 +3,4 @@ nose
|
||||
-e git://github.com/cmheisel/nose-xcover.git#egg=nosexcover
|
||||
pytz
|
||||
psycopg2
|
||||
scripttest
|
||||
scripttest==1.0.1
|
||||
|
@ -41,7 +41,7 @@ class TestAddDropColumn(fixture.DB):
|
||||
# number of cols should be correct in table object and in database
|
||||
self.refresh_table(self.table_name)
|
||||
result = len(self.table.c)
|
||||
|
||||
|
||||
self.assertEquals(result, num_of_expected_cols),
|
||||
if col_k.get('primary_key', None):
|
||||
# new primary key: check its length too
|
||||
@ -273,7 +273,7 @@ class TestAddDropColumn(fixture.DB):
|
||||
col.create(self.table)
|
||||
|
||||
self.table.insert(values={'id': 10}).execute()
|
||||
row = self.table.select(autocommit=True).execute().fetchone()
|
||||
row = self._select_row()
|
||||
self.assertEqual(u'foobar', row['data'])
|
||||
|
||||
col.drop()
|
||||
@ -287,7 +287,7 @@ class TestAddDropColumn(fixture.DB):
|
||||
col.create(self.table, populate_default=True)
|
||||
|
||||
self.table.insert(values={'id': 10}).execute()
|
||||
row = self.table.select(autocommit=True).execute().fetchone()
|
||||
row = self._select_row()
|
||||
self.assertEqual(u'foobar', row['data'])
|
||||
|
||||
col.drop()
|
||||
@ -509,7 +509,7 @@ class TestColumnChange(fixture.DB):
|
||||
# TextClause returned by autoload
|
||||
self.assert_(default in str(self.table.c.data.server_default.arg))
|
||||
self.engine.execute(self.table.insert(), id=12)
|
||||
row = self.table.select(autocommit=True).execute().fetchone()
|
||||
row = self._select_row()
|
||||
self.assertEqual(row['data'], default)
|
||||
|
||||
# Column object
|
||||
@ -526,15 +526,17 @@ class TestColumnChange(fixture.DB):
|
||||
# server_default isn't necessarily None for Oracle
|
||||
#self.assert_(self.table.c.data.server_default is None,self.table.c.data.server_default)
|
||||
self.engine.execute(self.table.insert(), id=11)
|
||||
row = self.table.select(self.table.c.id == 11, autocommit=True).execute().fetchone()
|
||||
if SQLA_06:
|
||||
row = self.table.select(self.table.c.id == 11).execution_options(autocommit=True).execute().fetchone()
|
||||
else:
|
||||
row = self.table.select(self.table.c.id == 11, autocommit=True).execute().fetchone()
|
||||
self.assert_(row['data'] is None, row['data'])
|
||||
|
||||
|
||||
@fixture.usedb(not_supported='firebird')
|
||||
def test_null(self):
|
||||
"""Can change a column's null constraint"""
|
||||
self.assertEquals(self.table.c.data.nullable, True)
|
||||
|
||||
|
||||
# Column object
|
||||
self.table.c.data.alter(Column('data', String(40), nullable=False))
|
||||
self.table.nullable = None
|
||||
@ -607,7 +609,7 @@ class TestColumnChange(fixture.DB):
|
||||
|
||||
# insert data and assert default
|
||||
self.table.insert(values={'id': 10}).execute()
|
||||
row = self.table.select(autocommit=True).execute().fetchone()
|
||||
row = self._select_row()
|
||||
self.assertEqual(u'foobar', row['data_new'])
|
||||
|
||||
|
||||
|
@ -8,6 +8,7 @@ from sqlalchemy import create_engine, Table, MetaData
|
||||
from sqlalchemy.orm import create_session
|
||||
from sqlalchemy.pool import StaticPool
|
||||
|
||||
from migrate.changeset import SQLA_06
|
||||
from migrate.versioning.util import Memoize
|
||||
from tests.fixture.base import Base
|
||||
from tests.fixture.pathed import Pathed
|
||||
@ -131,6 +132,14 @@ class DB(Base):
|
||||
def _not_supported(self, url):
|
||||
return not self._supported(url)
|
||||
|
||||
def _select_row(self):
|
||||
"""Select rows, used in multiple tests"""
|
||||
if SQLA_06:
|
||||
row = self.table.select().execution_options(autocommit=True).execute().fetchone()
|
||||
else:
|
||||
row = self.table.select(autocommit=True).execute().fetchone()
|
||||
return row
|
||||
|
||||
def refresh_table(self, name=None):
|
||||
"""Reload the table from the database
|
||||
Assumes we're working with only a single table, self.table, and
|
||||
|
@ -3,7 +3,9 @@ import os
|
||||
import sqlalchemy
|
||||
from sqlalchemy import *
|
||||
from nose.tools import eq_
|
||||
|
||||
from migrate.versioning import genmodel, schemadiff
|
||||
from migrate.changeset import schema
|
||||
|
||||
from tests import fixture
|
||||
|
||||
@ -37,22 +39,19 @@ class TestSchemaDiff(fixture.DB):
|
||||
def _applyLatestModel(self):
|
||||
diff = schemadiff.getDiffOfModelAgainstDatabase(self.meta, self.engine, excludeTables=['migrate_version'])
|
||||
genmodel.ModelGenerator(diff).applyModel()
|
||||
|
||||
|
||||
@fixture.usedb()
|
||||
def test_rundiffs(self):
|
||||
|
||||
# Yuck! We have to import from changeset to apply the monkey-patch to allow column adding/dropping.
|
||||
from migrate.changeset import schema
|
||||
|
||||
|
||||
def assertDiff(isDiff, tablesMissingInDatabase, tablesMissingInModel, tablesWithDiff):
|
||||
diff = schemadiff.getDiffOfModelAgainstDatabase(self.meta, self.engine, excludeTables=['migrate_version'])
|
||||
eq_(bool(diff), isDiff)
|
||||
eq_( ([t.name for t in diff.tablesMissingInDatabase], [t.name for t in diff.tablesMissingInModel], [t.name for t in diff.tablesWithDiff]),
|
||||
(tablesMissingInDatabase, tablesMissingInModel, tablesWithDiff) )
|
||||
|
||||
|
||||
# Model is defined but database is empty.
|
||||
assertDiff(True, [self.table_name], [], [])
|
||||
|
||||
|
||||
# Check Python upgrade and downgrade of database from updated model.
|
||||
diff = schemadiff.getDiffOfModelAgainstDatabase(self.meta, self.engine, excludeTables=['migrate_version'])
|
||||
decls, upgradeCommands, downgradeCommands = genmodel.ModelGenerator(diff).toUpgradeDowngradePython()
|
||||
|
@ -19,12 +19,12 @@ class TestUtil(fixture.Pathed):
|
||||
self.assert_(engine.name == 'sqlite')
|
||||
|
||||
# keyword arg
|
||||
engine = construct_engine(url, engine_arg_assert_unicode=True)
|
||||
self.assertTrue(engine.dialect.assert_unicode)
|
||||
engine = construct_engine(url, engine_arg_encoding=True)
|
||||
self.assertTrue(engine.dialect.encoding)
|
||||
|
||||
# dict
|
||||
engine = construct_engine(url, engine_dict={'assert_unicode': True})
|
||||
self.assertTrue(engine.dialect.assert_unicode)
|
||||
engine = construct_engine(url, engine_dict={'encoding': True})
|
||||
self.assertTrue(engine.dialect.encoding)
|
||||
|
||||
# engine parameter
|
||||
engine_orig = create_engine('sqlite://')
|
||||
@ -32,9 +32,9 @@ class TestUtil(fixture.Pathed):
|
||||
self.assertEqual(engine, engine_orig)
|
||||
|
||||
# test precedance
|
||||
engine = construct_engine(url, engine_dict={'assert_unicode': False},
|
||||
engine_arg_assert_unicode=True)
|
||||
self.assertTrue(engine.dialect.assert_unicode)
|
||||
engine = construct_engine(url, engine_dict={'encoding': False},
|
||||
engine_arg_encoding=True)
|
||||
self.assertTrue(engine.dialect.encoding)
|
||||
|
||||
# deprecated echo=True parameter
|
||||
engine = construct_engine(url, echo='True')
|
||||
|
Loading…
x
Reference in New Issue
Block a user