refactor api.py a bit, lots of PEP8 love

This commit is contained in:
iElectric 2009-06-02 19:50:31 +00:00
parent 6a34d5ca59
commit c6883c0d47
6 changed files with 271 additions and 224 deletions

View File

@ -19,26 +19,26 @@ from sqlalchemy import create_engine
from migrate.versioning import (exceptions, repository, schema, version,
script as script_) # command name conflict
from migrate.versioning.util import asbool
from migrate.versioning.util import asbool, catch_known_errors
__all__ = [
'help',
'create',
'script',
'script_sql',
'make_update_script_for_model',
'version',
'source',
'version_control',
'db_version',
'upgrade',
'downgrade',
'drop_version_control',
'manage',
'test',
'compare_model_to_db',
'create_model',
'update_db_from_model',
'help',
'create',
'script',
'script_sql',
'make_update_script_for_model',
'version',
'source',
'version_control',
'db_version',
'upgrade',
'downgrade',
'drop_version_control',
'manage',
'test',
'compare_model_to_db',
'create_model',
'update_db_from_model',
]
cls_repository = repository.Repository
@ -65,7 +65,7 @@ def help(cmd=None, **opts):
ret = ret.replace('%prog', sys.argv[0])
return ret
@catch_known_errors
def create(repository, name, **opts):
"""%prog create REPOSITORY_PATH NAME [--table=TABLE]
@ -75,13 +75,11 @@ def create(repository, name, **opts):
'migrate_version'. This table is created in all version-controlled
databases.
"""
try:
rep = cls_repository.create(repository, name, **opts)
except exceptions.PathFoundError, e:
raise exceptions.KnownError("The path %s already exists" % e.args[0])
rep = cls_repository.create(repository, name, **opts)
def script(description, repository=None, **opts):
@catch_known_errors
def script(description, repository, **opts):
"""%prog script [--repository=REPOSITORY_PATH] DESCRIPTION
Create an empty change script using the next unused version number
@ -90,16 +88,12 @@ def script(description, repository=None, **opts):
For instance, manage.py script "Add initial tables" creates:
repository/versions/001_Add_initial_tables.py
"""
try:
if repository is None:
raise exceptions.UsageError("A repository must be specified")
repos = cls_repository(repository)
repos.create_script(description, **opts)
except exceptions.PathFoundError, e:
raise exceptions.KnownError("The path %s already exists" % e.args[0])
repos = cls_repository(repository)
repos.create_script(description, **opts)
def script_sql(database, repository=None, **opts):
@catch_known_errors
def script_sql(database, repository, **opts):
"""%prog script_sql [--repository=REPOSITORY_PATH] DATABASE
Create empty change SQL scripts for given DATABASE, where DATABASE
@ -107,16 +101,11 @@ def script_sql(database, repository=None, **opts):
or generic ('default').
For instance, manage.py script_sql postgres creates:
repository/versions/001_upgrade_postgres.sql and
repository/versions/001_downgrade_postgres.sql
repository/versions/001_postgres_upgrade.sql and
repository/versions/001_postgres_postgres.sql
"""
try:
if repository is None:
raise exceptions.UsageError("A repository must be specified")
repos = cls_repository(repository)
repos.create_script_sql(database, **opts)
except exceptions.PathFoundError, e:
raise exceptions.KnownError("The path %s already exists" % e.args[0])
repos = cls_repository(repository)
repos.create_script_sql(database, **opts)
def test(repository, url=None, **opts):
@ -130,21 +119,14 @@ def test(repository, url=None, **opts):
engine = create_engine(url)
repos = cls_repository(repository)
script = repos.version(None).script()
# Upgrade
print "Upgrading...",
try:
script.run(engine, 1)
except:
print "ERROR"
raise
script.run(engine, 1)
print "done"
print "Downgrading...",
try:
script.run(engine, -1)
except:
print "ERROR"
raise
script.run(engine, -1)
print "done"
print "Success"
@ -172,6 +154,7 @@ def source(version, dest=None, repository=None, **opts):
if dest is not None:
dest = open(dest, 'w')
dest.write(ret)
dest.close()
ret = None
return ret
@ -298,7 +281,7 @@ def drop_version_control(url, repository, **opts):
"""
echo = asbool(opts.get('echo', False))
engine = create_engine(url, echo=echo)
schema=cls_schema(engine, repository)
schema = cls_schema(engine, repository)
schema.drop()
@ -347,6 +330,8 @@ def create_model(url, repository, **opts):
print cls_schema.create_model(engine, repository, declarative)
# TODO: get rid of this? if we don't add back path param
@catch_known_errors
def make_update_script_for_model(url, oldmodel, model, repository, **opts):
"""%prog make_update_script_for_model URL OLDMODEL MODEL REPOSITORY_PATH
@ -357,12 +342,8 @@ def make_update_script_for_model(url, oldmodel, model, repository, **opts):
""" # TODO: get rid of EXPERIMENTAL label
echo = asbool(opts.get('echo', False))
engine = create_engine(url, echo=echo)
try:
print cls_script_python.make_update_script_for_model(
engine, oldmodel, model, repository, **opts)
except exceptions.PathFoundError, e:
# TODO: get rid of this? if we don't add back path param
raise exceptions.KnownError("The path %s already exists" % e.args[0])
print cls_script_python.make_update_script_for_model(
engine, oldmodel, model, repository, **opts)
def update_db_from_model(url, model, repository, **opts):

View File

@ -1,5 +1,12 @@
from keyedinstance import KeyedInstance
from importpath import import_path
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from decorator import decorator
from migrate.versioning import exceptions
from migrate.versioning.util.keyedinstance import KeyedInstance
from migrate.versioning.util.importpath import import_path
def loadModel(model):
''' Import module and use module-level variable -- assume model is of form "mod1.mod2.varname". '''
@ -23,3 +30,12 @@ def asbool(obj):
else:
raise ValueError("String is not true/false: %r" % obj)
return bool(obj)
@decorator
def catch_known_errors(f, *a, **kw):
"""Decorator that catches known api usage errors"""
try:
f(*a, **kw)
except exceptions.PathFoundError, e:
raise exceptions.KnownError("The path %s already exists" % e.args[0])

View File

@ -3,8 +3,11 @@ source-dir = docs
build-dir = docs/_build
[egg_info]
tag_svn_revision=1
tag_build=.dev
tag_svn_revision = 1
tag_build = .dev
[nosetests]
pdb = true
[aliases]
release = egg_info -RDb ''

View File

@ -13,6 +13,7 @@ except ImportError:
pass
test_requirements = ['nose >= 0.10']
required_deps = ['sqlalchemy >= 0.5', 'decorator']
setup(
name = "sqlalchemy-migrate",
@ -26,7 +27,7 @@ Inspired by Ruby on Rails' migrations, Migrate provides a way to deal with datab
Migrate extends SQLAlchemy to have database changeset handling. It provides a database change repository mechanism which can be used from the command line as well as from inside python code.
""",
install_requires = ['sqlalchemy >= 0.5'],
install_requires = required_deps,
extras_require = {
'testing': test_requirements,
'docs' : ['sphinx >= 0.5'],

View File

@ -1,45 +1,55 @@
from pathed import *
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import shutil
import sys
from test.fixture.pathed import *
class Shell(Pathed):
"""Base class for command line tests"""
def execute(self,command,*p,**k):
def execute(self, command, *p, **k):
"""Return the fd of a command; can get output (stdout/err) and exitcode"""
# We might be passed a file descriptor for some reason; if so, just return it
if type(command) is file:
if isinstance(command, file):
return command
# Redirect stderr to stdout
# This is a bit of a hack, but I've not found a better way
py_path = os.environ.get('PYTHONPATH', '')
py_path_list = py_path.split(':')
py_path_list.append(os.path.abspath('.'))
os.environ['PYTHONPATH'] = ':'.join(py_path_list)
fd=os.popen(command+' 2>&1',*p,**k)
fd = os.popen(command + ' 2>&1')
if py_path:
py_path = os.environ['PYTHONPATH'] = py_path
else:
del os.environ['PYTHONPATH']
return fd
def output_and_exitcode(self,*p,**k):
fd=self.execute(*p,**k)
def output_and_exitcode(self, *p, **k):
fd=self.execute(*p, **k)
output = fd.read().strip()
exitcode = fd.close()
if k.pop('emit',False):
print output
return (output,exitcode)
def exitcode(self,*p,**k):
return (output, exitcode)
def exitcode(self, *p, **k):
"""Execute a command and return its exit code
...without printing its output/errors
"""
ret = self.output_and_exitcode(*p,**k)
ret = self.output_and_exitcode(*p, **k)
return ret[1]
def assertFailure(self,*p,**k):
output,exitcode = self.output_and_exitcode(*p,**k)
def assertFailure(self, *p, **k):
output,exitcode = self.output_and_exitcode(*p, **k)
assert (exitcode), output
def assertSuccess(self,*p,**k):
output,exitcode = self.output_and_exitcode(*p,**k)
def assertSuccess(self, *p, **k):
output,exitcode = self.output_and_exitcode(*p, **k)
#self.assert_(not exitcode, output)
assert (not exitcode), output

View File

@ -1,41 +1,52 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import shutil
import traceback
from StringIO import StringIO
from types import FileType
import os,shutil
from test import fixture
from migrate.versioning.repository import Repository
from migrate.versioning import genmodel, shell
from StringIO import StringIO
from sqlalchemy import MetaData,Table
python_version = sys.version[0:3]
from migrate.versioning.repository import Repository
from migrate.versioning import genmodel, shell
from test import fixture
python_version = sys.version[:3]
class Shell(fixture.Shell):
_cmd=os.path.join('python migrate', 'versioning', 'shell.py')
_cmd = os.path.join('python migrate', 'versioning', 'shell.py')
@classmethod
def cmd(cls,*p):
p = map(lambda s: str(s),p)
ret = ' '.join([cls._cmd]+p)
return ret
def cmd(cls, *args):
safe_parameters = map(lambda arg: str(arg), args)
return ' '.join([cls._cmd] + safe_parameters)
def execute(self, shell_cmd, runshell=None, **kwargs):
"""A crude simulation of a shell command, to speed things up"""
# If we get an fd, the command is already done
if isinstance(shell_cmd, FileType) or isinstance(shell_cmd, StringIO):
if isinstance(shell_cmd, (FileType, StringIO)):
return shell_cmd
# Analyze the command; see if we can 'fake' the shell
try:
# Forced to run in shell?
#if runshell or '--runshell' in sys.argv:
# if runshell or '--runshell' in sys.argv:
if runshell:
raise Exception
# Remove the command prefix
if not shell_cmd.startswith(self._cmd):
raise Exception
cmd = shell_cmd[(len(self._cmd)+1):]
cmd = shell_cmd[(len(self._cmd) + 1):]
params = cmd.split(' ')
command = params[0]
except:
return super(Shell,self).execute(shell_cmd)
except:
return super(Shell, self).execute(shell_cmd)
# Redirect stdout to an object; redirect stderr to stdout
fd = StringIO()
@ -47,18 +58,18 @@ class Shell(fixture.Shell):
try:
try:
shell.main(params, **kwargs)
except SystemExit,e:
except SystemExit, e:
# Simulate the exit status
fd_close=fd.close
fd_close = fd.close
def close_():
fd_close()
return e.args[0]
fd.close = close_
except Exception,e:
except Exception, e:
# Print the exception, but don't re-raise it
traceback.print_exc()
# Simulate a nonzero exit status
fd_close=fd.close
fd_close = fd.close
def close_():
fd_close()
return 2
@ -70,13 +81,14 @@ class Shell(fixture.Shell):
fd.seek(0)
return fd
def cmd_version(self,repos_path):
fd = self.execute(self.cmd('version',repos_path))
ret = int(fd.read().strip())
def cmd_version(self, repos_path):
fd = self.execute(self.cmd('version', repos_path))
result = int(fd.read().strip())
self.assertSuccess(fd)
return ret
def cmd_db_version(self,url,repos_path):
fd = self.execute(self.cmd('db_version',url,repos_path))
return result
def cmd_db_version(self, url, repos_path):
fd = self.execute(self.cmd('db_version', url, repos_path))
txt = fd.read()
#print txt
ret = int(txt.strip())
@ -86,129 +98,143 @@ class Shell(fixture.Shell):
class TestShellCommands(Shell):
"""Tests migrate.py commands"""
def test_run(self):
"""Runs; displays help"""
# Force this to run in shell...
self.assertSuccess(self.cmd('-h'),runshell=True)
self.assertSuccess(self.cmd('--help'),runshell=True)
def test_help(self):
"""Displays default help dialog"""
self.assertSuccess(self.cmd('-h'), runshell=True)
self.assertSuccess(self.cmd('--help'), runshell=True)
self.assertSuccess(self.cmd('help'), runshell=True)
def test_help_commands(self):
"""Display help on a specific command"""
self.assertSuccess(self.cmd('-h'),runshell=True)
self.assertSuccess(self.cmd('--help'),runshell=True)
for cmd in shell.api.__all__:
fd=self.execute(self.cmd('help',cmd))
fd = self.execute(self.cmd('help', cmd))
# Description may change, so best we can do is ensure it shows up
#self.assertNotEquals(fd.read(),'')
output = fd.read()
self.assertNotEquals(output,'')
self.assertNotEquals(output, '')
self.assertSuccess(fd)
def test_create(self):
"""Repositories are created successfully"""
repos=self.tmp_repos()
name='name'
repos = self.tmp_repos()
# Creating a file that doesn't exist should succeed
cmd=self.cmd('create',repos,name)
cmd = self.cmd('create', repos, 'repository_name')
self.assertSuccess(cmd)
# Files should actually be created
self.assert_(os.path.exists(repos))
# The default table should not be None
repos_ = Repository(repos)
self.assertNotEquals(repos_.config.get('db_settings','version_table'),'None')
self.assertNotEquals(repos_.config.get('db_settings', 'version_table'), 'None')
# Can't create it again: it already exists
self.assertFailure(cmd)
def test_script(self):
"""We can create a migration script via the command line"""
repos=self.tmp_repos()
self.assertSuccess(self.cmd('create',repos,'repository_name'))
repos = self.tmp_repos()
self.assertSuccess(self.cmd('create', repos, 'repository_name'))
self.assertSuccess(self.cmd('script', '--repository=%s' % repos, 'Desc'))
self.assert_(os.path.exists('%s/versions/001_Desc.py' % repos))
# 's' instead of 'script' should work too
self.assertSuccess(self.cmd('script', '--repository=%s' % repos, 'More'))
self.assert_(os.path.exists('%s/versions/002_More.py' % repos))
self.assertSuccess(self.cmd('script', '--repository=%s' % repos, '"Some Random name"'), runshell=True)
self.assert_(os.path.exists('%s/versions/003_Some_Random_name.py' % repos))
def test_script_sql(self):
"""We can create a migration sql script via the command line"""
repos=self.tmp_repos()
self.assertSuccess(self.cmd('create',repos,'repository_name'))
repos = self.tmp_repos()
self.assertSuccess(self.cmd('create', repos, 'repository_name'))
self.assertSuccess(self.cmd('script_sql', '--repository=%s' % repos, 'mydb'))
self.assert_(os.path.exists('%s/versions/001_mydb_upgrade.sql' % repos))
self.assert_(os.path.exists('%s/versions/001_mydb_downgrade.sql' % repos))
# Test creating a second
self.assertSuccess(self.cmd('script_sql', '--repository=%s' % repos, 'mydb'))
self.assert_(os.path.exists('%s/versions/002_mydb_upgrade.sql' % repos))
self.assert_(os.path.exists('%s/versions/002_mydb_downgrade.sql' % repos))
self.assertSuccess(self.cmd('script_sql', '--repository=%s' % repos, 'postgres'))
self.assert_(os.path.exists('%s/versions/002_postgres_upgrade.sql' % repos))
self.assert_(os.path.exists('%s/versions/002_postgres_downgrade.sql' % repos))
def test_manage(self):
"""Create a project management script"""
script=self.tmp_py()
script = self.tmp_py()
self.assert_(not os.path.exists(script))
# No attempt is made to verify correctness of the repository path here
self.assertSuccess(self.cmd('manage',script,'--repository=/path/to/repository'))
self.assertSuccess(self.cmd('manage', script, '--repository=/path/to/repository'))
self.assert_(os.path.exists(script))
class TestShellRepository(Shell):
"""Shell commands on an existing repository/python script"""
def setUp(self):
"""Create repository, python change script"""
self.path_repos=repos=self.tmp_repos()
self.assertSuccess(self.cmd('create',repos,'repository_name'))
self.path_repos = repos = self.tmp_repos()
self.assertSuccess(self.cmd('create', repos, 'repository_name'))
def test_version(self):
"""Correctly detect repository version"""
# Version: 0 (no scripts yet); successful execution
fd=self.execute(self.cmd('version','--repository=%s'%self.path_repos))
self.assertEquals(fd.read().strip(),"0")
fd = self.execute(self.cmd('version','--repository=%s' % self.path_repos))
self.assertEquals(fd.read().strip(), "0")
self.assertSuccess(fd)
# Also works as a positional param
fd=self.execute(self.cmd('version',self.path_repos))
self.assertEquals(fd.read().strip(),"0")
fd = self.execute(self.cmd('version', self.path_repos))
self.assertEquals(fd.read().strip(), "0")
self.assertSuccess(fd)
# Create a script and version should increment
self.assertSuccess(self.cmd('script', '--repository=%s' % self.path_repos, 'Desc'))
fd=self.execute(self.cmd('version',self.path_repos))
self.assertEquals(fd.read().strip(),"1")
fd = self.execute(self.cmd('version',self.path_repos))
self.assertEquals(fd.read().strip(), "1")
self.assertSuccess(fd)
def test_source(self):
"""Correctly fetch a script's source"""
self.assertSuccess(self.cmd('script', '--repository=%s' % self.path_repos, 'Desc'))
filename='%s/versions/001_Desc.py' % self.path_repos
source=open(filename).read()
self.assert_(source.find('def upgrade')>=0)
# Version is now 1
fd=self.execute(self.cmd('version',self.path_repos))
self.assert_(fd.read().strip()=="1")
self.assertSuccess(fd)
# Output/verify the source of version 1
fd=self.execute(self.cmd('source',1,'--repository=%s'%self.path_repos))
result=fd.read()
self.assertSuccess(fd)
self.assert_(result.strip()==source.strip())
# We can also send the source to a file... test that too
self.assertSuccess(self.cmd('source',1,filename,'--repository=%s'%self.path_repos))
self.assert_(os.path.exists(filename))
fd=open(filename)
result=fd.read()
self.assert_(result.strip()==source.strip())
class TestShellDatabase(Shell,fixture.DB):
filename = '%s/versions/001_Desc.py' % self.path_repos
source = open(filename).read()
self.assert_(source.find('def upgrade') >= 0)
# Version is now 1
fd = self.execute(self.cmd('version', self.path_repos))
self.assert_(fd.read().strip() == "1")
self.assertSuccess(fd)
# Output/verify the source of version 1
fd = self.execute(self.cmd('source', 1, '--repository=%s' % self.path_repos))
result = fd.read()
self.assertSuccess(fd)
self.assert_(result.strip() == source.strip())
# We can also send the source to a file... test that too
self.assertSuccess(self.cmd('source', 1, filename, '--repository=%s'%self.path_repos))
self.assert_(os.path.exists(filename))
fd = open(filename)
result = fd.read()
self.assert_(result.strip() == source.strip())
class TestShellDatabase(Shell, fixture.DB):
"""Commands associated with a particular database"""
# We'll need to clean up after ourself, since the shell creates its own txn;
# we need to connect to the DB to see if things worked
level=fixture.DB.CONNECT
level = fixture.DB.CONNECT
@fixture.usedb()
def test_version_control(self):
"""Ensure we can set version control on a database"""
path_repos=repos=self.tmp_repos()
self.assertSuccess(self.cmd('create',path_repos,'repository_name'))
self.exitcode(self.cmd('drop_version_control',self.url,path_repos))
self.assertSuccess(self.cmd('version_control',self.url,path_repos))
path_repos = repos = self.tmp_repos()
self.assertSuccess(self.cmd('create', path_repos, 'repository_name'))
self.exitcode(self.cmd('drop_version_control', self.url, path_repos))
self.assertSuccess(self.cmd('version_control', self.url, path_repos))
# Clean up
self.assertSuccess(self.cmd('drop_version_control',self.url,path_repos))
# Attempting to drop vc from a database without it should fail
@ -217,10 +243,11 @@ class TestShellDatabase(Shell,fixture.DB):
@fixture.usedb()
def test_wrapped_kwargs(self):
"""Commands with default arguments set by manage.py"""
path_repos=repos=self.tmp_repos()
path_repos = repos = self.tmp_repos()
self.assertSuccess(self.cmd('create', 'repository_name'), repository=path_repos)
self.exitcode(self.cmd('drop_version_control'), url=self.url, repository=path_repos)
self.assertSuccess(self.cmd('version_control'), url=self.url, repository=path_repos)
# Clean up
self.assertSuccess(self.cmd('drop_version_control'), url=self.url, repository=path_repos)
# Attempting to drop vc from a database without it should fail
@ -229,26 +256,31 @@ class TestShellDatabase(Shell,fixture.DB):
@fixture.usedb()
def test_version_control_specified(self):
"""Ensure we can set version control to a particular version"""
path_repos=self.tmp_repos()
self.assertSuccess(self.cmd('create',path_repos,'repository_name'))
self.exitcode(self.cmd('drop_version_control',self.url,path_repos))
path_repos = self.tmp_repos()
self.assertSuccess(self.cmd('create', path_repos, 'repository_name'))
self.exitcode(self.cmd('drop_version_control', self.url, path_repos))
# Fill the repository
path_script = self.tmp_py()
version=1
version = 1
for i in range(version):
self.assertSuccess(self.cmd('script', '--repository=%s' % path_repos, 'Desc'))
# Repository version is correct
fd=self.execute(self.cmd('version',path_repos))
self.assertEquals(fd.read().strip(),str(version))
fd = self.execute(self.cmd('version', path_repos))
self.assertEquals(fd.read().strip(), str(version))
self.assertSuccess(fd)
# Apply versioning to DB
self.assertSuccess(self.cmd('version_control',self.url,path_repos,version))
self.assertSuccess(self.cmd('version_control', self.url, path_repos, version))
# Test version number
fd=self.execute(self.cmd('db_version',self.url,path_repos))
self.assertEquals(fd.read().strip(),str(version))
fd = self.execute(self.cmd('db_version', self.url, path_repos))
self.assertEquals(fd.read().strip(), str(version))
self.assertSuccess(fd)
# Clean up
self.assertSuccess(self.cmd('drop_version_control',self.url,path_repos))
self.assertSuccess(self.cmd('drop_version_control', self.url, path_repos))
@fixture.usedb()
def test_upgrade(self):
@ -256,69 +288,72 @@ class TestShellDatabase(Shell,fixture.DB):
# Create a repository
repos_name = 'repos_name'
repos_path = self.tmp()
self.assertSuccess(self.cmd('create',repos_path,repos_name))
self.assertEquals(self.cmd_version(repos_path),0)
self.assertSuccess(self.cmd('create', repos_path,repos_name))
self.assertEquals(self.cmd_version(repos_path), 0)
# Version the DB
self.exitcode(self.cmd('drop_version_control',self.url,repos_path))
self.assertSuccess(self.cmd('version_control',self.url,repos_path))
self.exitcode(self.cmd('drop_version_control', self.url, repos_path))
self.assertSuccess(self.cmd('version_control', self.url, repos_path))
# Upgrades with latest version == 0
self.assertEquals(self.cmd_db_version(self.url,repos_path),0)
self.assertSuccess(self.cmd('upgrade',self.url,repos_path))
self.assertEquals(self.cmd_db_version(self.url,repos_path),0)
self.assertSuccess(self.cmd('upgrade',self.url,repos_path,0))
self.assertEquals(self.cmd_db_version(self.url,repos_path),0)
self.assertFailure(self.cmd('upgrade',self.url,repos_path,1))
self.assertFailure(self.cmd('upgrade',self.url,repos_path,-1))
self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
self.assertSuccess(self.cmd('upgrade', self.url, repos_path))
self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
self.assertSuccess(self.cmd('upgrade', self.url, repos_path, 0))
self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
self.assertFailure(self.cmd('upgrade', self.url, repos_path, 1))
self.assertFailure(self.cmd('upgrade', self.url, repos_path, -1))
# Add a script to the repository; upgrade the db
self.assertSuccess(self.cmd('script', '--repository=%s' % repos_path, 'Desc'))
self.assertEquals(self.cmd_version(repos_path),1)
self.assertEquals(self.cmd_version(repos_path), 1)
self.assertEquals(self.cmd_db_version(self.url,repos_path),0)
self.assertSuccess(self.cmd('upgrade',self.url,repos_path))
self.assertEquals(self.cmd_db_version(self.url,repos_path),1)
self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
self.assertSuccess(self.cmd('upgrade', self.url, repos_path))
self.assertEquals(self.cmd_db_version(self.url, repos_path), 1)
# Downgrade must have a valid version specified
self.assertFailure(self.cmd('downgrade',self.url, repos_path))
self.assertFailure(self.cmd('downgrade',self.url, repos_path, '-1', 2))
#self.assertFailure(self.cmd('downgrade',self.url, repos_path, '1', 2))
self.assertEquals(self.cmd_db_version(self.url, repos_path),1)
self.assertFailure(self.cmd('downgrade', self.url, repos_path))
self.assertFailure(self.cmd('downgrade', self.url, repos_path, '-1', 2))
#self.assertFailure(self.cmd('downgrade', self.url, repos_path, '1', 2))
self.assertEquals(self.cmd_db_version(self.url, repos_path), 1)
self.assertSuccess(self.cmd('downgrade', self.url, repos_path, 0))
self.assertEquals(self.cmd_db_version(self.url,repos_path),0)
self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
self.assertFailure(self.cmd('downgrade',self.url,repos_path,1))
self.assertEquals(self.cmd_db_version(self.url,repos_path),0)
self.assertFailure(self.cmd('downgrade',self.url, repos_path, 1))
self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
self.assertSuccess(self.cmd('drop_version_control',self.url,repos_path))
self.assertSuccess(self.cmd('drop_version_control', self.url, repos_path))
def _run_test_sqlfile(self,upgrade_script,downgrade_script):
def _run_test_sqlfile(self, upgrade_script, downgrade_script):
# TODO: add test script that checks if db really changed
repos_path = self.tmp()
repos_name = 'repos'
self.assertSuccess(self.cmd('create',repos_path,repos_name))
self.exitcode(self.cmd('drop_version_control',self.url,repos_path))
self.assertSuccess(self.cmd('version_control',self.url,repos_path))
self.assertEquals(self.cmd_version(repos_path),0)
self.assertEquals(self.cmd_db_version(self.url,repos_path),0)
self.assertSuccess(self.cmd('create', repos_path, repos_name))
self.exitcode(self.cmd('drop_version_control', self.url, repos_path))
self.assertSuccess(self.cmd('version_control', self.url, repos_path))
self.assertEquals(self.cmd_version(repos_path), 0)
self.assertEquals(self.cmd_db_version(self.url,repos_path), 0)
beforeCount = len(os.listdir(os.path.join(repos_path,'versions'))) # hmm, this number changes sometimes based on running from svn
beforeCount = len(os.listdir(os.path.join(repos_path, 'versions'))) # hmm, this number changes sometimes based on running from svn
self.assertSuccess(self.cmd('script_sql', '--repository=%s' % repos_path, 'postgres'))
self.assertEquals(self.cmd_version(repos_path),1)
self.assertEquals(self.cmd_version(repos_path), 1)
self.assertEquals(len(os.listdir(os.path.join(repos_path,'versions'))), beforeCount + 2)
open('%s/versions/001_postgres_upgrade.sql' % repos_path, 'a').write(upgrade_script)
open('%s/versions/001_postgres_downgrade.sql' % repos_path, 'a').write(downgrade_script)
self.assertEquals(self.cmd_db_version(self.url,repos_path),0)
self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
self.assertRaises(Exception, self.engine.text('select * from t_table').execute)
self.assertSuccess(self.cmd('upgrade',self.url,repos_path))
self.assertEquals(self.cmd_db_version(self.url,repos_path),1)
self.assertSuccess(self.cmd('upgrade', self.url,repos_path))
self.assertEquals(self.cmd_db_version(self.url,repos_path), 1)
self.engine.text('select * from t_table').execute()
self.assertSuccess(self.cmd('downgrade',self.url,repos_path,0))
self.assertEquals(self.cmd_db_version(self.url,repos_path),0)
self.assertSuccess(self.cmd('downgrade', self.url, repos_path, 0))
self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
self.assertRaises(Exception, self.engine.text('select * from t_table').execute)
# The tests below are written with some postgres syntax, but the stuff
@ -335,7 +370,7 @@ class TestShellDatabase(Shell,fixture.DB):
drop table t_table;
"""
self.meta.drop_all()
self._run_test_sqlfile(upgrade_script,downgrade_script)
self._run_test_sqlfile(upgrade_script, downgrade_script)
@fixture.usedb(supported='postgres')
@ -358,17 +393,17 @@ class TestShellDatabase(Shell,fixture.DB):
repos_name = 'repos_name'
repos_path = self.tmp()
self.assertSuccess(self.cmd('create',repos_path,repos_name))
self.exitcode(self.cmd('drop_version_control',self.url,repos_path))
self.assertSuccess(self.cmd('version_control',self.url,repos_path))
self.assertEquals(self.cmd_version(repos_path),0)
self.assertEquals(self.cmd_db_version(self.url,repos_path),0)
self.assertSuccess(self.cmd('create', repos_path, repos_name))
self.exitcode(self.cmd('drop_version_control', self.url, repos_path))
self.assertSuccess(self.cmd('version_control', self.url, repos_path))
self.assertEquals(self.cmd_version(repos_path), 0)
self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
# Empty script should succeed
self.assertSuccess(self.cmd('script', '--repository=%s' % repos_path, 'Desc'))
self.assertSuccess(self.cmd('test',repos_path,self.url))
self.assertEquals(self.cmd_version(repos_path),1)
self.assertEquals(self.cmd_db_version(self.url,repos_path),0)
self.assertSuccess(self.cmd('test', repos_path, self.url))
self.assertEquals(self.cmd_version(repos_path), 1)
self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
# Error script should fail
script_path = self.tmp_py()
@ -384,12 +419,13 @@ class TestShellDatabase(Shell,fixture.DB):
print 'sdfsgf'
raise Exception()
""".replace("\n ","\n")
file=open(script_path,'w')
file = open(script_path, 'w')
file.write(script_text)
file.close()
self.assertFailure(self.cmd('test',repos_path,self.url,'blah blah'))
self.assertEquals(self.cmd_version(repos_path),1)
self.assertEquals(self.cmd_db_version(self.url,repos_path),0)
self.assertFailure(self.cmd('test', repos_path, self.url, 'blah blah'))
self.assertEquals(self.cmd_version(repos_path), 1)
self.assertEquals(self.cmd_db_version(self.url, repos_path),0)
# Nonempty script using migrate_engine should succeed
script_path = self.tmp_py()
@ -412,12 +448,12 @@ class TestShellDatabase(Shell,fixture.DB):
# Operations to reverse the above upgrade go here.
meta.drop_all()
""".replace("\n ","\n")
file=open(script_path,'w')
file = open(script_path, 'w')
file.write(script_text)
file.close()
self.assertSuccess(self.cmd('test',repos_path,self.url))
self.assertEquals(self.cmd_version(repos_path),1)
self.assertEquals(self.cmd_db_version(self.url,repos_path),0)
self.assertSuccess(self.cmd('test', repos_path, self.url))
self.assertEquals(self.cmd_version(repos_path), 1)
self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
@fixture.usedb()
def test_rundiffs_in_shell(self):