|
|
|
@ -15,8 +15,8 @@ from nose.plugins.skip import SkipTest
|
|
|
|
|
from migrate.versioning.repository import Repository
|
|
|
|
|
from migrate.versioning import genmodel, shell, api
|
|
|
|
|
from migrate.versioning.exceptions import *
|
|
|
|
|
from tests.fixture import Shell, DB, usedb
|
|
|
|
|
from tests.fixture import models
|
|
|
|
|
from migrate.tests.fixture import Shell, DB, usedb
|
|
|
|
|
from migrate.tests.fixture import models
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestShellCommands(Shell):
|
|
|
|
@ -24,15 +24,15 @@ class TestShellCommands(Shell):
|
|
|
|
|
|
|
|
|
|
def test_help(self):
|
|
|
|
|
"""Displays default help dialog"""
|
|
|
|
|
self.assertEqual(self.env.run('migrate -h').returncode, 0)
|
|
|
|
|
self.assertEqual(self.env.run('migrate --help').returncode, 0)
|
|
|
|
|
self.assertEqual(self.env.run('migrate help').returncode, 0)
|
|
|
|
|
self.assertEqual(self.env.run('bin/migrate -h').returncode, 0)
|
|
|
|
|
self.assertEqual(self.env.run('bin/migrate --help').returncode, 0)
|
|
|
|
|
self.assertEqual(self.env.run('bin/migrate help').returncode, 0)
|
|
|
|
|
|
|
|
|
|
def test_help_commands(self):
|
|
|
|
|
"""Display help on a specific command"""
|
|
|
|
|
# we can only test that we get some output
|
|
|
|
|
for cmd in api.__all__:
|
|
|
|
|
result = self.env.run('migrate help %s' % cmd)
|
|
|
|
|
result = self.env.run('bin/migrate help %s' % cmd)
|
|
|
|
|
self.assertTrue(isinstance(result.stdout, basestring))
|
|
|
|
|
self.assertTrue(result.stdout)
|
|
|
|
|
self.assertFalse(result.stderr)
|
|
|
|
@ -40,10 +40,10 @@ class TestShellCommands(Shell):
|
|
|
|
|
def test_shutdown_logging(self):
|
|
|
|
|
"""Try to shutdown logging output"""
|
|
|
|
|
repos = self.tmp_repos()
|
|
|
|
|
result = self.env.run('migrate create %s repository_name' % repos)
|
|
|
|
|
result = self.env.run('migrate version %s --disable_logging' % repos)
|
|
|
|
|
result = self.env.run('bin/migrate create %s repository_name' % repos)
|
|
|
|
|
result = self.env.run('bin/migrate version %s --disable_logging' % repos)
|
|
|
|
|
self.assertEqual(result.stdout, '')
|
|
|
|
|
result = self.env.run('migrate version %s -q' % repos)
|
|
|
|
|
result = self.env.run('bin/migrate version %s -q' % repos)
|
|
|
|
|
self.assertEqual(result.stdout, '')
|
|
|
|
|
|
|
|
|
|
# TODO: assert logging messages to 0
|
|
|
|
@ -88,7 +88,7 @@ class TestShellCommands(Shell):
|
|
|
|
|
repos = self.tmp_repos()
|
|
|
|
|
|
|
|
|
|
# Creating a file that doesn't exist should succeed
|
|
|
|
|
result = self.env.run('migrate create %s repository_name' % repos)
|
|
|
|
|
result = self.env.run('bin/migrate create %s repository_name' % repos)
|
|
|
|
|
|
|
|
|
|
# Files should actually be created
|
|
|
|
|
self.assert_(os.path.exists(repos))
|
|
|
|
@ -98,35 +98,35 @@ class TestShellCommands(Shell):
|
|
|
|
|
self.assertNotEquals(repos_.config.get('db_settings', 'version_table'), 'None')
|
|
|
|
|
|
|
|
|
|
# Can't create it again: it already exists
|
|
|
|
|
result = self.env.run('migrate create %s repository_name' % repos,
|
|
|
|
|
result = self.env.run('bin/migrate create %s repository_name' % repos,
|
|
|
|
|
expect_error=True)
|
|
|
|
|
self.assertEqual(result.returncode, 2)
|
|
|
|
|
|
|
|
|
|
def test_script(self):
|
|
|
|
|
"""We can create a migration script via the command line"""
|
|
|
|
|
repos = self.tmp_repos()
|
|
|
|
|
result = self.env.run('migrate create %s repository_name' % repos)
|
|
|
|
|
result = self.env.run('bin/migrate create %s repository_name' % repos)
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate script --repository=%s Desc' % repos)
|
|
|
|
|
result = self.env.run('bin/migrate script --repository=%s Desc' % repos)
|
|
|
|
|
self.assert_(os.path.exists('%s/versions/001_Desc.py' % repos))
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate script More %s' % repos)
|
|
|
|
|
result = self.env.run('bin/migrate script More %s' % repos)
|
|
|
|
|
self.assert_(os.path.exists('%s/versions/002_More.py' % repos))
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate script "Some Random name" %s' % repos)
|
|
|
|
|
result = self.env.run('bin/migrate script "Some Random name" %s' % repos)
|
|
|
|
|
self.assert_(os.path.exists('%s/versions/003_Some_Random_name.py' % repos))
|
|
|
|
|
|
|
|
|
|
def test_script_sql(self):
|
|
|
|
|
"""We can create a migration sql script via the command line"""
|
|
|
|
|
repos = self.tmp_repos()
|
|
|
|
|
result = self.env.run('migrate create %s repository_name' % repos)
|
|
|
|
|
result = self.env.run('bin/migrate create %s repository_name' % repos)
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate script_sql mydb %s' % repos)
|
|
|
|
|
result = self.env.run('bin/migrate script_sql mydb %s' % repos)
|
|
|
|
|
self.assert_(os.path.exists('%s/versions/001_mydb_upgrade.sql' % repos))
|
|
|
|
|
self.assert_(os.path.exists('%s/versions/001_mydb_downgrade.sql' % repos))
|
|
|
|
|
|
|
|
|
|
# Test creating a second
|
|
|
|
|
result = self.env.run('migrate script_sql postgres --repository=%s' % repos)
|
|
|
|
|
result = self.env.run('bin/migrate script_sql postgres --repository=%s' % repos)
|
|
|
|
|
self.assert_(os.path.exists('%s/versions/002_postgres_upgrade.sql' % repos))
|
|
|
|
|
self.assert_(os.path.exists('%s/versions/002_postgres_downgrade.sql' % repos))
|
|
|
|
|
|
|
|
|
@ -138,7 +138,7 @@ class TestShellCommands(Shell):
|
|
|
|
|
self.assert_(not os.path.exists(script))
|
|
|
|
|
|
|
|
|
|
# No attempt is made to verify correctness of the repository path here
|
|
|
|
|
result = self.env.run('migrate manage %s --repository=/bla/' % script)
|
|
|
|
|
result = self.env.run('bin/migrate manage %s --repository=/bla/' % script)
|
|
|
|
|
self.assert_(os.path.exists(script))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -149,41 +149,41 @@ class TestShellRepository(Shell):
|
|
|
|
|
"""Create repository, python change script"""
|
|
|
|
|
super(TestShellRepository, self).setUp()
|
|
|
|
|
self.path_repos = self.tmp_repos()
|
|
|
|
|
result = self.env.run('migrate create %s repository_name' % self.path_repos)
|
|
|
|
|
result = self.env.run('bin/migrate create %s repository_name' % self.path_repos)
|
|
|
|
|
|
|
|
|
|
def test_version(self):
|
|
|
|
|
"""Correctly detect repository version"""
|
|
|
|
|
# Version: 0 (no scripts yet); successful execution
|
|
|
|
|
result = self.env.run('migrate version --repository=%s' % self.path_repos)
|
|
|
|
|
result = self.env.run('bin/migrate version --repository=%s' % self.path_repos)
|
|
|
|
|
self.assertEqual(result.stdout.strip(), "0")
|
|
|
|
|
|
|
|
|
|
# Also works as a positional param
|
|
|
|
|
result = self.env.run('migrate version %s' % self.path_repos)
|
|
|
|
|
result = self.env.run('bin/migrate version %s' % self.path_repos)
|
|
|
|
|
self.assertEqual(result.stdout.strip(), "0")
|
|
|
|
|
|
|
|
|
|
# Create a script and version should increment
|
|
|
|
|
result = self.env.run('migrate script Desc %s' % self.path_repos)
|
|
|
|
|
result = self.env.run('migrate version %s' % self.path_repos)
|
|
|
|
|
result = self.env.run('bin/migrate script Desc %s' % self.path_repos)
|
|
|
|
|
result = self.env.run('bin/migrate version %s' % self.path_repos)
|
|
|
|
|
self.assertEqual(result.stdout.strip(), "1")
|
|
|
|
|
|
|
|
|
|
def test_source(self):
|
|
|
|
|
"""Correctly fetch a script's source"""
|
|
|
|
|
result = self.env.run('migrate script Desc --repository=%s' % self.path_repos)
|
|
|
|
|
result = self.env.run('bin/migrate script Desc --repository=%s' % self.path_repos)
|
|
|
|
|
|
|
|
|
|
filename = '%s/versions/001_Desc.py' % self.path_repos
|
|
|
|
|
source = open(filename).read()
|
|
|
|
|
self.assert_(source.find('def upgrade') >= 0)
|
|
|
|
|
|
|
|
|
|
# Version is now 1
|
|
|
|
|
result = self.env.run('migrate version %s' % self.path_repos)
|
|
|
|
|
result = self.env.run('bin/migrate version %s' % self.path_repos)
|
|
|
|
|
self.assertEqual(result.stdout.strip(), "1")
|
|
|
|
|
|
|
|
|
|
# Output/verify the source of version 1
|
|
|
|
|
result = self.env.run('migrate source 1 --repository=%s' % self.path_repos)
|
|
|
|
|
result = self.env.run('bin/migrate source 1 --repository=%s' % self.path_repos)
|
|
|
|
|
self.assertEqual(result.stdout.strip(), source.strip())
|
|
|
|
|
|
|
|
|
|
# We can also send the source to a file... test that too
|
|
|
|
|
result = self.env.run('migrate source 1 %s --repository=%s' %
|
|
|
|
|
result = self.env.run('bin/migrate source 1 %s --repository=%s' %
|
|
|
|
|
(filename, self.path_repos))
|
|
|
|
|
self.assert_(os.path.exists(filename))
|
|
|
|
|
fd = open(filename)
|
|
|
|
@ -203,17 +203,17 @@ class TestShellDatabase(Shell, DB):
|
|
|
|
|
"""Ensure we can set version control on a database"""
|
|
|
|
|
path_repos = repos = self.tmp_repos()
|
|
|
|
|
url = self.url
|
|
|
|
|
result = self.env.run('migrate create %s repository_name' % repos)
|
|
|
|
|
result = self.env.run('bin/migrate create %s repository_name' % repos)
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate drop_version_control %(url)s %(repos)s'\
|
|
|
|
|
result = self.env.run('bin/migrate drop_version_control %(url)s %(repos)s'\
|
|
|
|
|
% locals(), expect_error=True)
|
|
|
|
|
self.assertEqual(result.returncode, 1)
|
|
|
|
|
result = self.env.run('migrate version_control %(url)s %(repos)s' % locals())
|
|
|
|
|
result = self.env.run('bin/migrate version_control %(url)s %(repos)s' % locals())
|
|
|
|
|
|
|
|
|
|
# Clean up
|
|
|
|
|
result = self.env.run('migrate drop_version_control %(url)s %(repos)s' % locals())
|
|
|
|
|
result = self.env.run('bin/migrate drop_version_control %(url)s %(repos)s' % locals())
|
|
|
|
|
# Attempting to drop vc from a database without it should fail
|
|
|
|
|
result = self.env.run('migrate drop_version_control %(url)s %(repos)s'\
|
|
|
|
|
result = self.env.run('bin/migrate drop_version_control %(url)s %(repos)s'\
|
|
|
|
|
% locals(), expect_error=True)
|
|
|
|
|
self.assertEqual(result.returncode, 1)
|
|
|
|
|
|
|
|
|
@ -222,41 +222,41 @@ class TestShellDatabase(Shell, DB):
|
|
|
|
|
"""Commands with default arguments set by manage.py"""
|
|
|
|
|
path_repos = repos = self.tmp_repos()
|
|
|
|
|
url = self.url
|
|
|
|
|
result = self.env.run('migrate create --name=repository_name %s' % repos)
|
|
|
|
|
result = self.env.run('migrate drop_version_control %(url)s %(repos)s' % locals(), expect_error=True)
|
|
|
|
|
result = self.env.run('bin/migrate create --name=repository_name %s' % repos)
|
|
|
|
|
result = self.env.run('bin/migrate drop_version_control %(url)s %(repos)s' % locals(), expect_error=True)
|
|
|
|
|
self.assertEqual(result.returncode, 1)
|
|
|
|
|
result = self.env.run('migrate version_control %(url)s %(repos)s' % locals())
|
|
|
|
|
result = self.env.run('bin/migrate version_control %(url)s %(repos)s' % locals())
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate drop_version_control %(url)s %(repos)s' % locals())
|
|
|
|
|
result = self.env.run('bin/migrate drop_version_control %(url)s %(repos)s' % locals())
|
|
|
|
|
|
|
|
|
|
@usedb()
|
|
|
|
|
def test_version_control_specified(self):
|
|
|
|
|
"""Ensure we can set version control to a particular version"""
|
|
|
|
|
path_repos = self.tmp_repos()
|
|
|
|
|
url = self.url
|
|
|
|
|
result = self.env.run('migrate create --name=repository_name %s' % path_repos)
|
|
|
|
|
result = self.env.run('migrate drop_version_control %(url)s %(path_repos)s' % locals(), expect_error=True)
|
|
|
|
|
result = self.env.run('bin/migrate create --name=repository_name %s' % path_repos)
|
|
|
|
|
result = self.env.run('bin/migrate drop_version_control %(url)s %(path_repos)s' % locals(), expect_error=True)
|
|
|
|
|
self.assertEqual(result.returncode, 1)
|
|
|
|
|
|
|
|
|
|
# Fill the repository
|
|
|
|
|
path_script = self.tmp_py()
|
|
|
|
|
version = 2
|
|
|
|
|
for i in range(version):
|
|
|
|
|
result = self.env.run('migrate script Desc --repository=%s' % path_repos)
|
|
|
|
|
result = self.env.run('bin/migrate script Desc --repository=%s' % path_repos)
|
|
|
|
|
|
|
|
|
|
# Repository version is correct
|
|
|
|
|
result = self.env.run('migrate version %s' % path_repos)
|
|
|
|
|
result = self.env.run('bin/migrate version %s' % path_repos)
|
|
|
|
|
self.assertEqual(result.stdout.strip(), str(version))
|
|
|
|
|
|
|
|
|
|
# Apply versioning to DB
|
|
|
|
|
result = self.env.run('migrate version_control %(url)s %(path_repos)s %(version)s' % locals())
|
|
|
|
|
result = self.env.run('bin/migrate version_control %(url)s %(path_repos)s %(version)s' % locals())
|
|
|
|
|
|
|
|
|
|
# Test db version number (should start at 2)
|
|
|
|
|
result = self.env.run('migrate db_version %(url)s %(path_repos)s' % locals())
|
|
|
|
|
result = self.env.run('bin/migrate db_version %(url)s %(path_repos)s' % locals())
|
|
|
|
|
self.assertEqual(result.stdout.strip(), str(version))
|
|
|
|
|
|
|
|
|
|
# Clean up
|
|
|
|
|
result = self.env.run('migrate drop_version_control %(url)s %(path_repos)s' % locals())
|
|
|
|
|
result = self.env.run('bin/migrate drop_version_control %(url)s %(path_repos)s' % locals())
|
|
|
|
|
|
|
|
|
|
@usedb()
|
|
|
|
|
def test_upgrade(self):
|
|
|
|
@ -264,67 +264,67 @@ class TestShellDatabase(Shell, DB):
|
|
|
|
|
# Create a repository
|
|
|
|
|
repos_name = 'repos_name'
|
|
|
|
|
repos_path = self.tmp()
|
|
|
|
|
result = self.env.run('migrate create %(repos_path)s %(repos_name)s' % locals())
|
|
|
|
|
result = self.env.run('bin/migrate create %(repos_path)s %(repos_name)s' % locals())
|
|
|
|
|
self.assertEquals(self.run_version(repos_path), 0)
|
|
|
|
|
|
|
|
|
|
# Version the DB
|
|
|
|
|
result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
result = self.env.run('migrate version_control %s %s' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
result = self.env.run('bin/migrate version_control %s %s' % (self.url, repos_path))
|
|
|
|
|
|
|
|
|
|
# Upgrades with latest version == 0
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 0)
|
|
|
|
|
result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate upgrade %s %s' % (self.url, repos_path))
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 0)
|
|
|
|
|
result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate upgrade %s %s' % (self.url, repos_path))
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 0)
|
|
|
|
|
result = self.env.run('migrate upgrade %s %s 1' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
result = self.env.run('bin/migrate upgrade %s %s 1' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
self.assertEquals(result.returncode, 1)
|
|
|
|
|
result = self.env.run('migrate upgrade %s %s -1' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
result = self.env.run('bin/migrate upgrade %s %s -1' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
self.assertEquals(result.returncode, 2)
|
|
|
|
|
|
|
|
|
|
# Add a script to the repository; upgrade the db
|
|
|
|
|
result = self.env.run('migrate script Desc --repository=%s' % (repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate script Desc --repository=%s' % (repos_path))
|
|
|
|
|
self.assertEquals(self.run_version(repos_path), 1)
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 0)
|
|
|
|
|
|
|
|
|
|
# Test preview
|
|
|
|
|
result = self.env.run('migrate upgrade %s %s 0 --preview_sql' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('migrate upgrade %s %s 0 --preview_py' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate upgrade %s %s 0 --preview_sql' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate upgrade %s %s 0 --preview_py' % (self.url, repos_path))
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate upgrade %s %s' % (self.url, repos_path))
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 1)
|
|
|
|
|
|
|
|
|
|
# Downgrade must have a valid version specified
|
|
|
|
|
result = self.env.run('migrate downgrade %s %s' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
result = self.env.run('bin/migrate downgrade %s %s' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
self.assertEquals(result.returncode, 2)
|
|
|
|
|
result = self.env.run('migrate downgrade %s %s -1' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
result = self.env.run('bin/migrate downgrade %s %s -1' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
self.assertEquals(result.returncode, 2)
|
|
|
|
|
result = self.env.run('migrate downgrade %s %s 2' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
result = self.env.run('bin/migrate downgrade %s %s 2' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
self.assertEquals(result.returncode, 2)
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 1)
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate downgrade %s %s 0' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate downgrade %s %s 0' % (self.url, repos_path))
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 0)
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate downgrade %s %s 1' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
result = self.env.run('bin/migrate downgrade %s %s 1' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
self.assertEquals(result.returncode, 2)
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 0)
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate drop_version_control %s %s' % (self.url, repos_path))
|
|
|
|
|
|
|
|
|
|
def _run_test_sqlfile(self, upgrade_script, downgrade_script):
|
|
|
|
|
# TODO: add test script that checks if db really changed
|
|
|
|
|
repos_path = self.tmp()
|
|
|
|
|
repos_name = 'repos'
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate create %s %s' % (repos_path, repos_name))
|
|
|
|
|
result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
result = self.env.run('migrate version_control %s %s' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate create %s %s' % (repos_path, repos_name))
|
|
|
|
|
result = self.env.run('bin/migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
result = self.env.run('bin/migrate version_control %s %s' % (self.url, repos_path))
|
|
|
|
|
self.assertEquals(self.run_version(repos_path), 0)
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 0)
|
|
|
|
|
|
|
|
|
|
beforeCount = len(os.listdir(os.path.join(repos_path, 'versions'))) # hmm, this number changes sometimes based on running from svn
|
|
|
|
|
result = self.env.run('migrate script_sql %s --repository=%s' % ('postgres', repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate script_sql %s --repository=%s' % ('postgres', repos_path))
|
|
|
|
|
self.assertEquals(self.run_version(repos_path), 1)
|
|
|
|
|
self.assertEquals(len(os.listdir(os.path.join(repos_path, 'versions'))), beforeCount + 2)
|
|
|
|
|
|
|
|
|
@ -334,11 +334,11 @@ class TestShellDatabase(Shell, DB):
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 0)
|
|
|
|
|
self.assertRaises(Exception, self.engine.text('select * from t_table').execute)
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate upgrade %s %s' % (self.url, repos_path))
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 1)
|
|
|
|
|
self.engine.text('select * from t_table').execute()
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate downgrade %s %s 0' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate downgrade %s %s 0' % (self.url, repos_path))
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 0)
|
|
|
|
|
self.assertRaises(Exception, self.engine.text('select * from t_table').execute)
|
|
|
|
|
|
|
|
|
@ -378,15 +378,15 @@ class TestShellDatabase(Shell, DB):
|
|
|
|
|
repos_name = 'repos_name'
|
|
|
|
|
repos_path = self.tmp()
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate create repository_name --repository=%s' % repos_path)
|
|
|
|
|
result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
result = self.env.run('migrate version_control %s %s' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate create repository_name --repository=%s' % repos_path)
|
|
|
|
|
result = self.env.run('bin/migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
result = self.env.run('bin/migrate version_control %s %s' % (self.url, repos_path))
|
|
|
|
|
self.assertEquals(self.run_version(repos_path), 0)
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 0)
|
|
|
|
|
|
|
|
|
|
# Empty script should succeed
|
|
|
|
|
result = self.env.run('migrate script Desc %s' % repos_path)
|
|
|
|
|
result = self.env.run('migrate test %s %s' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate script Desc %s' % repos_path)
|
|
|
|
|
result = self.env.run('bin/migrate test %s %s' % (self.url, repos_path))
|
|
|
|
|
self.assertEquals(self.run_version(repos_path), 1)
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 0)
|
|
|
|
|
|
|
|
|
@ -408,7 +408,7 @@ class TestShellDatabase(Shell, DB):
|
|
|
|
|
file.write(script_text)
|
|
|
|
|
file.close()
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate test %s %s bla' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
result = self.env.run('bin/migrate test %s %s bla' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
self.assertEqual(result.returncode, 2)
|
|
|
|
|
self.assertEquals(self.run_version(repos_path), 1)
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 0)
|
|
|
|
@ -439,7 +439,7 @@ class TestShellDatabase(Shell, DB):
|
|
|
|
|
file = open(script_path, 'w')
|
|
|
|
|
file.write(script_text)
|
|
|
|
|
file.close()
|
|
|
|
|
result = self.env.run('migrate test %s %s' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate test %s %s' % (self.url, repos_path))
|
|
|
|
|
self.assertEquals(self.run_version(repos_path), 1)
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 0)
|
|
|
|
|
|
|
|
|
@ -450,51 +450,51 @@ class TestShellDatabase(Shell, DB):
|
|
|
|
|
repos_name = 'repos_name'
|
|
|
|
|
repos_path = self.tmp()
|
|
|
|
|
script_path = self.tmp_py()
|
|
|
|
|
model_module = 'tests.fixture.models:meta_rundiffs'
|
|
|
|
|
old_model_module = 'tests.fixture.models:meta_old_rundiffs'
|
|
|
|
|
model_module = 'migrate.tests.fixture.models:meta_rundiffs'
|
|
|
|
|
old_model_module = 'migrate.tests.fixture.models:meta_old_rundiffs'
|
|
|
|
|
|
|
|
|
|
# Create empty repository.
|
|
|
|
|
self.meta = MetaData(self.engine, reflect=True)
|
|
|
|
|
self.meta.reflect()
|
|
|
|
|
self.meta.drop_all() # in case junk tables are lying around in the test database
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate create %s %s' % (repos_path, repos_name))
|
|
|
|
|
result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
result = self.env.run('migrate version_control %s %s' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate create %s %s' % (repos_path, repos_name))
|
|
|
|
|
result = self.env.run('bin/migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
result = self.env.run('bin/migrate version_control %s %s' % (self.url, repos_path))
|
|
|
|
|
self.assertEquals(self.run_version(repos_path), 0)
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 0)
|
|
|
|
|
|
|
|
|
|
# Setup helper script.
|
|
|
|
|
result = self.env.run('migrate manage %s --repository=%s --url=%s --model=%s'\
|
|
|
|
|
result = self.env.run('bin/migrate manage %s --repository=%s --url=%s --model=%s'\
|
|
|
|
|
% (script_path, repos_path, self.url, model_module))
|
|
|
|
|
self.assert_(os.path.exists(script_path))
|
|
|
|
|
|
|
|
|
|
# Model is defined but database is empty.
|
|
|
|
|
result = self.env.run('migrate compare_model_to_db %s %s --model=%s' \
|
|
|
|
|
result = self.env.run('bin/migrate compare_model_to_db %s %s --model=%s' \
|
|
|
|
|
% (self.url, repos_path, model_module))
|
|
|
|
|
self.assert_("tables missing in database: tmp_account_rundiffs" in result.stdout)
|
|
|
|
|
|
|
|
|
|
# Test Deprecation
|
|
|
|
|
result = self.env.run('migrate compare_model_to_db %s %s --model=%s' \
|
|
|
|
|
result = self.env.run('bin/migrate compare_model_to_db %s %s --model=%s' \
|
|
|
|
|
% (self.url, repos_path, model_module.replace(":", ".")), expect_error=True)
|
|
|
|
|
self.assertEqual(result.returncode, 0)
|
|
|
|
|
self.assertTrue("DeprecationWarning" in result.stderr)
|
|
|
|
|
self.assert_("tables missing in database: tmp_account_rundiffs" in result.stdout)
|
|
|
|
|
|
|
|
|
|
# Update db to latest model.
|
|
|
|
|
result = self.env.run('migrate update_db_from_model %s %s %s'\
|
|
|
|
|
result = self.env.run('bin/migrate update_db_from_model %s %s %s'\
|
|
|
|
|
% (self.url, repos_path, model_module))
|
|
|
|
|
self.assertEquals(self.run_version(repos_path), 0)
|
|
|
|
|
self.assertEquals(self.run_db_version(self.url, repos_path), 0) # version did not get bumped yet because new version not yet created
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate compare_model_to_db %s %s %s'\
|
|
|
|
|
result = self.env.run('bin/migrate compare_model_to_db %s %s %s'\
|
|
|
|
|
% (self.url, repos_path, model_module))
|
|
|
|
|
self.assert_("No schema diffs" in result.stdout)
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
result = self.env.run('migrate version_control %s %s' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
|
|
|
|
|
result = self.env.run('bin/migrate version_control %s %s' % (self.url, repos_path))
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate create_model %s %s' % (self.url, repos_path))
|
|
|
|
|
result = self.env.run('bin/migrate create_model %s %s' % (self.url, repos_path))
|
|
|
|
|
temp_dict = dict()
|
|
|
|
|
exec result.stdout in temp_dict
|
|
|
|
|
|
|
|
|
@ -506,10 +506,10 @@ class TestShellDatabase(Shell, DB):
|
|
|
|
|
#Column('passwd', String(length=None, convert_unicode=False, assert_unicode=None))""" in result.stdout)
|
|
|
|
|
|
|
|
|
|
# We're happy with db changes, make first db upgrade script to go from version 0 -> 1.
|
|
|
|
|
result = self.env.run('migrate make_update_script_for_model', expect_error=True)
|
|
|
|
|
result = self.env.run('bin/migrate make_update_script_for_model', expect_error=True)
|
|
|
|
|
self.assertTrue('Not enough arguments' in result.stderr)
|
|
|
|
|
|
|
|
|
|
result_script = self.env.run('migrate make_update_script_for_model %s %s %s %s'\
|
|
|
|
|
result_script = self.env.run('bin/migrate make_update_script_for_model %s %s %s %s'\
|
|
|
|
|
% (self.url, repos_path, old_model_module, model_module))
|
|
|
|
|
self.assertEqualsIgnoreWhitespace(result_script.stdout,
|
|
|
|
|
'''from sqlalchemy import *
|
|
|
|
@ -536,11 +536,11 @@ class TestShellDatabase(Shell, DB):
|
|
|
|
|
tmp_account_rundiffs.drop()''')
|
|
|
|
|
|
|
|
|
|
# Save the upgrade script.
|
|
|
|
|
result = self.env.run('migrate script Desc %s' % repos_path)
|
|
|
|
|
result = self.env.run('bin/migrate script Desc %s' % repos_path)
|
|
|
|
|
upgrade_script_path = '%s/versions/001_Desc.py' % repos_path
|
|
|
|
|
open(upgrade_script_path, 'w').write(result_script.stdout)
|
|
|
|
|
|
|
|
|
|
result = self.env.run('migrate compare_model_to_db %s %s %s'\
|
|
|
|
|
result = self.env.run('bin/migrate compare_model_to_db %s %s %s'\
|
|
|
|
|
% (self.url, repos_path, model_module))
|
|
|
|
|
self.assert_("No schema diffs" in result.stdout)
|
|
|
|
|
|