apply option parsing patch for Issue 54 by iElectric

This commit is contained in:
jan.dittberner 2009-05-23 22:12:40 +00:00
parent 77c1cae8a9
commit 1b927fa427
4 changed files with 155 additions and 124 deletions

View File

@ -102,6 +102,15 @@ between the script :file:`manage.py` in the current directory and the
script inside the repository is, that the one in the current directory
has the database URL preconfigured.
.. versionchanged:: 0.5.4
Whole command line parsing was rewriten from scratch, with use of OptionParser.
Options passed as kwargs to migrate.versioning.shell.main are now parsed correctly.
Options are passed to commands in the following priority (starting from highest):
- optional (given by --option in commandline)
- normal arguments
- kwargs passed to migrate.versioning.shell.main
Making schema changes
=====================

View File

@ -44,6 +44,7 @@ cls_vernum = version.VerNum
cls_script_python = script_.PythonScript
# deprecated
def help(cmd=None, **opts):
"""%prog help COMMAND
@ -68,7 +69,7 @@ def create(repository, name, **opts):
Create an empty repository at the specified path.
You can specify the version_table to be used; by default, it is
'_version'. This table is created in all version-controlled
'migrate_version'. This table is created in all version-controlled
databases.
"""
try:
@ -103,8 +104,8 @@ def script_sql(database, repository=None, **opts):
or generic ('default').
For instance, manage.py script_sql postgres creates:
repository/versions/001_upgrade_postgres.py and
repository/versions/001_downgrade_postgres.py
repository/versions/001_upgrade_postgres.sql and
repository/versions/001_downgrade_postgres.sql
"""
try:
if repository is None:

View File

@ -1,10 +1,12 @@
"""The migrate command-line tool.
"""
"""The migrate command-line tool."""
import sys
from migrate.versioning.base import *
from optparse import OptionParser,Values
from migrate.versioning import api,exceptions
import inspect
from optparse import OptionParser, BadOptionError
from migrate.versioning.base import *
from migrate.versioning import api, exceptions
alias = dict(
s=api.script,
@ -18,133 +20,139 @@ def alias_setup():
setattr(api,key,val)
alias_setup()
class ShellUsageError(Exception):
def die(self,exitcode=None):
usage="""%%prog COMMAND ...
Available commands:
class PassiveOptionParser(OptionParser):
def _process_args(self, largs, rargs, values):
"""little hack to support all --some_option=value parameters"""
while rargs:
arg = rargs[0]
if arg == "--":
del rargs[0]
return
elif arg[0:2] == "--":
# if parser does not know about the option, pass it along (make it anonymous)
try:
opt = arg.split('=', 1)[0]
self._match_long_opt(opt)
except BadOptionError:
largs.append(arg)
del rargs[0]
else:
self._process_long_opt(rargs, values)
elif arg[:1] == "-" and len(arg) > 1:
self._process_short_opts(rargs, values)
elif self.allow_interspersed_args:
largs.append(arg)
del rargs[0]
else:
return
def main(argv=None, **kwargs):
"""kwargs are default options that can be overriden with passing --some_option to cmdline"""
argv = argv or list(sys.argv[1:])
commands = list(api.__all__)
commands.sort()
usage="""%%prog COMMAND ...
Available commands:
%s
Enter "%%prog help COMMAND" for information on a particular command.
"""
usage = usage.replace("\n"+" "*8,"\n")
commands = list(api.__all__)
commands.sort()
commands = '\n'.join(map((lambda x:'\t'+x),commands))
message = usage%commands
try:
message = message.replace('%prog',sys.argv[0])
except IndexError:
pass
Enter "%%prog help COMMAND" for information on a particular command.
""" % '\n\t'.join(commands)
if self.args[0] is not None:
message += "\nError: %s\n"%str(self.args[0])
if exitcode is None:
exitcode = 1
if exitcode is None:
exitcode = 0
die(message,exitcode)
parser = PassiveOptionParser(usage=usage)
parser.add_option("-v", "--verbose", action="store_true", dest="verbose")
parser.add_option("-d", "--debug", action="store_true", dest="debug")
parser.add_option("-f", "--force", action="store_true", dest="force")
help_commands = ['help', '-h', '--help']
HELP = False
def die(message,exitcode=1):
if message is not None:
sys.stderr.write(message)
sys.stderr.write("\n")
raise SystemExit(int(exitcode))
kwmap = dict(
v='verbose',
d='debug',
f='force',
)
def kwparse(arg):
ret = arg.split('=',1)
if len(ret) == 1:
# No value specified (--kw, not --kw=stuff): use True
ret = [ret[0],True]
return ret
def parse_arg(arg,argnames):
global kwmap
if arg.startswith('--'):
# Keyword-argument; either --keyword or --keyword=value
kw,val = kwparse(arg[2:])
elif arg.startswith('-'):
# Short form of a keyword-argument; map it to a keyword
try:
parg = kwmap.get(arg)
except KeyError:
raise ShellUsageError("Invalid argument: %s"%arg)
kw,val = kwparse(parg)
else:
# Simple positional parameter
val = arg
try:
kw = argnames.pop(0)
except IndexError,e:
raise ShellUsageError("Too many arguments to command")
return kw,val
def parse_args(*args,**kwargs):
"""Map positional arguments to keyword-args"""
args=list(args)
try:
cmdname = args.pop(0)
if cmdname == 'downgrade':
if not args[-1].startswith('--'):
try:
kwargs['version'] = str(int(args[-1]))
args = args[:-1]
except:
pass
command = argv.pop(0)
if command in help_commands:
HELP = True
command = argv.pop(0)
except IndexError:
# No command specified: no error message; just show usage
raise ShellUsageError(None)
parser.print_help()
return
# Special cases: -h and --help should act like 'help'
if cmdname == '-h' or cmdname == '--help':
cmdname = 'help'
command_func = getattr(api, command, None)
if command_func is None or command.startswith('_'):
parser.error("Invalid command %s" % command)
cmdfunc = getattr(api,cmdname,None)
if cmdfunc is None or cmdname.startswith('_'):
raise ShellUsageError("Invalid command %s"%cmdname)
parser.set_usage(inspect.getdoc(command_func))
f_args, f_varargs, f_kwargs, f_defaults = inspect.getargspec(command_func)
for arg in f_args:
parser.add_option(
"--%s" % arg,
dest=arg,
action='store',
type="string")
argnames, p,k, defaults = inspect.getargspec(cmdfunc)
argnames_orig = list(argnames)
# display help of the current command
if HELP:
parser.print_help()
return
options, args = parser.parse_args(argv)
# override kwargs with anonymous parameters
override_kwargs = dict()
for arg in list(args):
if arg.startswith('--'):
args.remove(arg)
if '=' in arg:
opt, value = arg[2:].split('=', 1)
else:
opt = arg[2:]
value = True
override_kwargs[opt] = value
# override kwargs with options if user is overwriting
for key, value in options.__dict__.iteritems():
if value is not None:
override_kwargs[key] = value
# arguments that function accepts without passed kwargs
f_required = list(f_args)
candidates = dict(kwargs)
candidates.update(override_kwargs)
for key, value in candidates.iteritems():
if key in f_args:
f_required.remove(key)
# map function arguments to parsed arguments
for arg in args:
kw,val = parse_arg(arg,argnames)
kwargs[kw] = val
try:
kw = f_required.pop(0)
except IndexError:
parser.error("Too many arguments for command %s: %s" % (command, arg))
kwargs[kw] = arg
if defaults is not None:
num_defaults = len(defaults)
else:
# apply overrides
kwargs.update(override_kwargs)
# check if all args are given
try:
num_defaults = len(f_defaults)
except TypeError:
num_defaults = 0
req_argnames = argnames_orig[:len(argnames_orig)-num_defaults]
for name in req_argnames:
if name not in kwargs:
raise ShellUsageError("Too few arguments: %s not specified"%name)
return cmdfunc,kwargs
def main(argv=None,**kwargs):
if argv is None:
argv = list(sys.argv[1:])
f_args_default = f_args[len(f_args) - num_defaults:]
required = list(set(f_required) - set(f_args_default))
if required:
parser.error("Not enough arguments for command %s: %s not specified" % (command, ', '.join(required)))
# handle command
try:
command, kwargs = parse_args(*argv,**kwargs)
except ShellUsageError,e:
e.die()
try:
ret = command(**kwargs)
ret = command_func(**kwargs)
if ret is not None:
print ret
except exceptions.UsageError,e:
e = ShellUsageError(e.args[0])
e.die()
except exceptions.KnownError,e:
die(e.args[0])
except (exceptions.UsageError, exceptions.KnownError), e:
if e.args[0] is None:
parser.print_help()
parser.error(e.args[0])
if __name__=="__main__":
main()

View File

@ -17,7 +17,7 @@ class Shell(fixture.Shell):
p = map(lambda s: str(s),p)
ret = ' '.join([cls._cmd]+p)
return ret
def execute(self,shell_cmd,runshell=None):
def execute(self, shell_cmd, runshell=None, **kwargs):
"""A crude simulation of a shell command, to speed things up"""
# If we get an fd, the command is already done
if isinstance(shell_cmd, FileType) or isinstance(shell_cmd, StringIO):
@ -46,7 +46,7 @@ class Shell(fixture.Shell):
# Execute this command
try:
try:
shell.main(params)
shell.main(params, **kwargs)
except SystemExit,e:
# Simulate the exit status
fd_close=fd.close
@ -103,7 +103,7 @@ class TestShellCommands(Shell):
output = fd.read()
self.assertNotEquals(output,'')
self.assertSuccess(fd)
def test_create(self):
"""Repositories are created successfully"""
repos=self.tmp_repos()
@ -142,6 +142,7 @@ class TestShellCommands(Shell):
self.assert_(os.path.exists('%s/versions/002_mydb_upgrade.sql' % repos))
self.assert_(os.path.exists('%s/versions/002_mydb_downgrade.sql' % repos))
def test_manage(self):
"""Create a project management script"""
script=self.tmp_py()
@ -156,7 +157,7 @@ class TestShellRepository(Shell):
"""Create repository, python change script"""
self.path_repos=repos=self.tmp_repos()
self.assertSuccess(self.cmd('create',repos,'repository_name'))
def test_version(self):
"""Correctly detect repository version"""
# Version: 0 (no scripts yet); successful execution
@ -172,6 +173,7 @@ class TestShellRepository(Shell):
fd=self.execute(self.cmd('version',self.path_repos))
self.assertEquals(fd.read().strip(),"1")
self.assertSuccess(fd)
def test_source(self):
"""Correctly fetch a script's source"""
self.assertSuccess(self.cmd('script', '--repository=%s' % self.path_repos, 'Desc'))
@ -212,6 +214,18 @@ class TestShellDatabase(Shell,fixture.DB):
# Attempting to drop vc from a database without it should fail
self.assertFailure(self.cmd('drop_version_control',self.url,path_repos))
@fixture.usedb()
def test_wrapped_kwargs(self):
"""Commands with default arguments set by manage.py"""
path_repos=repos=self.tmp_repos()
self.assertSuccess(self.cmd('create', 'repository_name'), repository=path_repos)
self.exitcode(self.cmd('drop_version_control'), url=self.url, repository=path_repos)
self.assertSuccess(self.cmd('version_control'), url=self.url, repository=path_repos)
# Clean up
self.assertSuccess(self.cmd('drop_version_control'), url=self.url, repository=path_repos)
# Attempting to drop vc from a database without it should fail
self.assertFailure(self.cmd('drop_version_control'), url=self.url, repository=path_repos)
@fixture.usedb()
def test_version_control_specified(self):
"""Ensure we can set version control to a particular version"""
@ -469,7 +483,7 @@ class TestShellDatabase(Shell,fixture.DB):
# We're happy with db changes, make first db upgrade script to go from version 0 -> 1.
output, exitcode = self.output_and_exitcode('python %s make_update_script_for_model' % script_path) # intentionally omit a parameter
self.assertEquals('Error: Too few arguments' in output, True)
self.assertEquals('Not enough arguments' in output, True)
output, exitcode = self.output_and_exitcode('python %s make_update_script_for_model --oldmodel=oldtestmodel.meta' % script_path)
assert """from sqlalchemy import *
from migrate import *
@ -500,4 +514,3 @@ def downgrade():
self.assertEquals(exitcode, None)
self.assertEquals(self.cmd_version(repos_path),1)
self.assertEquals(self.cmd_db_version(self.url,repos_path),1)