anvil/tools/multipip
Joshua Harlow 86bf535a0a Adjust multipip usage.
Use JSON as the output format, and use stderr only
when the multipip program really fails.

Instead of printing every incompatibility error to the screen,
record the incompatible versions in the output JSON
data structure and use this in the main anvil program.

Use defaultdict + list in multipip where appropriate.

Ensure that, as documented, we select the first matching requirement
that comes out of multipip instead of the last (when incompatible
matches happen).

Adjust documentation for new json output format.

Change-Id: I6403182066ef01cb3f7f26c305ab5577a00043ae
2013-06-05 10:36:34 -07:00

343 lines
11 KiB
Python
Executable File

#!/usr/bin/python
import argparse
import collections
import distutils.spawn
import json
import logging
import os
import subprocess
import sys
import pip.index
import pip.req
from pip.vcs import git, mercurial, subversion, bazaar
import pkg_resources
# Process exit status used when a requirements file or requirement
# specifier cannot be parsed (see parse_requirements).
BAD_REQUIREMENTS = 2
# Root logger; its handler and level are installed by setup_logging().
logger = logging.getLogger()
def create_parser():
    """Build the command-line parser for multipip.

    The default value of ``--pip`` is the first of ``pip`` / ``pip-python``
    found on the executable search path, or ``None`` when neither is found.

    :return: a configured argparse.ArgumentParser
    """
    # distutils is deprecated (PEP 632) and removed in Python 3.12, so prefer
    # shutil.which and only fall back to distutils on interpreters that lack
    # it (Python 2).
    try:
        from shutil import which as find_executable
    except ImportError:
        from distutils.spawn import find_executable

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-r", "--requirement",
        dest="requirements",
        nargs="*",
        default=[],
        metavar="<file>",
        help="Install all the packages listed in the given requirements file")
    parser.add_argument(
        "requirement_specs",
        nargs="*",
        default=[],
        metavar="<requirement specifier>",
        help="Install specified package")
    # The next two options are hidden from --help. The parsed options object
    # is handed to pip's requirements-file parser in parse_requirements();
    # presumably pip reads these attributes there -- TODO confirm.
    parser.add_argument(
        # A regex to be used to skip requirements
        "--skip-requirements-regex",
        default="",
        help=argparse.SUPPRESS)
    parser.add_argument(
        # The default version control system for editables, e.g. 'svn'
        '--default-vcs',
        dest='default_vcs',
        default='',
        help=argparse.SUPPRESS)
    parser.add_argument(
        "--debug", "-d",
        action="store_true",
        default=False,
        help="Print debug information")
    parser.add_argument(
        "--ignore-installed", "-i",
        action="store_true",
        default=False,
        help="Ignore installed packages")
    parser.add_argument(
        "--ignore-packages",
        nargs="*",
        default=[],
        metavar="<requirement specifier>",
        help="Ignore listed packages")
    parser.add_argument(
        "--frozen", "-f",
        action="store_true",
        default=False,
        help="Make requirements meet installed packages (taken from pip freeze)")
    pip_executable = find_executable("pip") or find_executable("pip-python")
    parser.add_argument(
        "--pip",
        metavar="<filename>",
        default=pip_executable,
        help="Full or short name of pip executable (default: %s)" %
             pip_executable)
    return parser
def setup_logging(options):
    """Attach a stderr handler to the module logger and set its level.

    The level is DEBUG when ``options.debug`` is set and WARNING otherwise.

    :param options: parsed command-line options; only ``debug`` is read
    """
    verbose = options.debug
    logger.addHandler(logging.StreamHandler(sys.stderr))
    if verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.WARNING)
# Package key -> list of requirement strings recorded by
# add_incompatible_requirement(); emitted by print_requirements().
incompatibles = collections.defaultdict(list)
# Joined requirements, appended to by join_requirements() and emitted by
# print_requirements().
joined_requirements = []
def install_requirement_ensure_req_field(req):
    """Guarantee that ``req.req`` is set, deriving it from the URL if needed.

    When ``req`` has no truthy ``req`` attribute, the package name is taken
    from the egg fragment of ``req.url`` and parsed into a requirement.

    :param req: install requirement object (modified in place)
    :return: the same ``req`` object
    :raises Exception: when the URL carries no egg fragment
    """
    if hasattr(req, 'req') and req.req:
        return req
    # pip 0.8 or so
    egg_name = pip.index.Link(req.url).egg_fragment
    if not egg_name:
        raise Exception("Cannot find package name from `%s'" % req.url)
    req.req = pkg_resources.Requirement.parse(egg_name)
    return req
def install_requirement_str(req):
    """Render a requirement as text: its URL when set, else ``str(req.req)``.

    :param req: object exposing ``url`` and ``req`` attributes
    :return: the URL if truthy, otherwise the stringified ``req.req``
    """
    location = req.url
    if location:
        return location
    return str(req.req)
def install_requirement_parse(line, comes_from):
    """Parse one requirement line into an install requirement.

    Lines starting with ``-e`` or ``--editable`` are parsed as editable
    requirements (the flag, surrounding whitespace and, for the long form, a
    leading ``=`` are stripped first); everything else is parsed as a plain
    requirement line.

    :param line: requirement text
    :param comes_from: description of where the line originated
    :return: the parsed requirement with its ``req`` field populated
    """
    text = line.strip()
    editable_target = None
    # The two prefixes are mutually exclusive ('--editable' starts with '--').
    if text.startswith('--editable'):
        editable_target = text[len('--editable'):].strip().lstrip('=')
    elif text.startswith('-e'):
        editable_target = text[2:].strip()

    if editable_target is None:
        parsed = pip.req.InstallRequirement.from_line(text, comes_from)
    else:
        parsed = pip.req.InstallRequirement.from_editable(
            editable_target, comes_from=comes_from)
    return install_requirement_ensure_req_field(parsed)
def add_incompatible_requirement(chosen, conflicting):
    """Record an incompatibility under the chosen requirement's package key.

    The first report for a key stores the conflicting requirement string
    followed by the chosen one; each later report for the same key appends
    the chosen requirement string again.

    :param chosen: requirement that was selected
    :param conflicting: requirement that clashes with the chosen one
    """
    # NOTE(review): later reports append `chosen' rather than `conflicting',
    # so repeated conflicts duplicate the chosen entry -- confirm intended.
    global incompatibles
    pkg_key = chosen.req.key
    first_report = pkg_key not in incompatibles
    bucket = incompatibles[pkg_key]
    if first_report:
        bucket.append(install_requirement_str(conflicting))
    bucket.append(install_requirement_str(chosen))
def parse_requirements(options):
    """Parse package requirements from command line and files.

    Any parse failure is logged and terminates the process with the
    BAD_REQUIREMENTS exit status.

    :param options: parsed command-line options; ``requirements``,
        ``requirement_specs`` and ``ignore_packages`` are read
    :return: tuple (all, ignored) of InstallRequirement; ``all`` maps each
        package key to the list of requirements seen for it, ``ignored`` is
        a flat list
    """
    def _bail(source, error):
        # Report the offending input, then stop the program.
        logger.error("Cannot parse `%s': %s" % (source, error))
        sys.exit(BAD_REQUIREMENTS)

    requirements_by_key = collections.defaultdict(list)

    # Requirements files (-r) are read first.
    for req_file in options.requirements:
        try:
            for parsed in pip.req.parse_requirements(req_file, options=options):
                parsed = install_requirement_ensure_req_field(parsed)
                requirements_by_key[parsed.req.key].append(parsed)
        except Exception as exc:
            _bail(req_file, exc)

    # Requirement specifiers given directly on the command line come next.
    for spec_text in options.requirement_specs:
        try:
            parsed = install_requirement_parse(spec_text, "command line")
            requirements_by_key[parsed.req.key].append(parsed)
        except Exception as exc:
            _bail(spec_text, exc)

    # Packages the caller asked to ignore.
    ignored = []
    for spec_text in options.ignore_packages:
        try:
            ignored.append(install_requirement_parse(spec_text, "command line"))
        except Exception as exc:
            _bail(spec_text, exc)

    return requirements_by_key, ignored
def installed_packages(options):
    """Return the requirements reported by ``<pip> freeze``.

    Lines of the freeze output that cannot be parsed are skipped (best
    effort) and reported at debug level.

    :param options: parsed command-line options; ``pip`` names the executable
    :return: list of the ``req`` objects parsed from the freeze output
    """
    freeze_cmd = [options.pip, "freeze"]
    # universal_newlines=True makes the output text rather than bytes on
    # Python 3; with bytes every line would fail to parse and be skipped.
    proc = subprocess.Popen(
        freeze_cmd, stdout=subprocess.PIPE, universal_newlines=True)
    freeze_output, _ = proc.communicate()
    pkg_list = []
    for line in freeze_output.splitlines():
        try:
            pkg_list.append(install_requirement_parse(line, "pip freeze").req)
        except Exception as ex:
            # Deliberately non-fatal: one odd line must not abort the run.
            logger.debug("Skipping unparsable pip freeze line `%s': %s",
                         line, ex)
    return pkg_list
def join_one_requirement(req_list):
    """Join requirement list for one package together.

    Possible returns:
      * ==A - exact version (even when there are conflicts)
      * >=?A,<=?B,(!=C)+ - line segment (no conflicts detected)
      * >=?A,(!=C)+ - more than (also when conflicts detected)

    A single-element list is returned as is. The first requirement (in list
    order) that carries an ``==`` spec is returned unchanged.

    :param req_list: list of pip.req.InstallRequirement
    :return: pip.req.InstallRequirement
    """
    if len(req_list) == 1:
        return req_list[0]
    # Resolved up front: both the exact-version return below and the final
    # join need the package key.
    req_key = req_list[0].req.key
    lower_bound_str = None
    lower_bound_version = None
    upper_bound_str = None
    upper_bound_version = None
    conflicts = []
    for req in req_list:
        for spec in req.req.specs:
            if spec[0] == "==":
                return req
            spec_str = "%s%s" % spec
            if spec[0] == "!=":
                conflicts.append(spec_str)
                continue
            version = pkg_resources.parse_version(spec[1])
            # strict_check is < or >, not <= or >=
            strict_check = len(spec[0]) == 1
            if spec[0][0] == ">":
                # Keep the highest lower bound; at equal versions a strict
                # bound replaces a non-strict one.
                if (lower_bound_version is None or
                        version > lower_bound_version or
                        (strict_check and version == lower_bound_version)):
                    lower_bound_version = version
                    lower_bound_str = spec_str
            else:
                # Keep the lowest upper bound, with the same tie rule.
                if (upper_bound_version is None or
                        version < upper_bound_version or
                        (strict_check and version == upper_bound_version)):
                    upper_bound_version = version
                    upper_bound_str = spec_str
    if lower_bound_version is not None and upper_bound_version is not None:
        if lower_bound_version > upper_bound_version:
            # Empty range: drop the upper bound and keep only the lower one.
            upper_bound_str = None
        elif lower_bound_version == upper_bound_version:
            if lower_bound_str[1] == "=" and upper_bound_str[1] == "=":
                # >=A together with <=A pins the version to exactly A.
                return pip.req.InstallRequirement.from_line(
                    "%s==%s" % (req_key, upper_bound_str[2:]),
                    "compiled")
            # At least one bound is strict, so the range is empty.
            upper_bound_str = None
    req_specs = []
    if lower_bound_str:
        req_specs.append(lower_bound_str)
    if upper_bound_str:
        req_specs.append(upper_bound_str)
    req_specs.extend(conflicts)
    return pip.req.InstallRequirement.from_line(
        "%s%s" % (req_key, ",".join(req_specs)),
        "compiled")
def join_requirements(options):
    """Join the parsed requirements per package and record incompatibilities.

    For every package key that is not skipped, the requirement list is
    reduced to one requirement by join_one_requirement() and its ``req`` is
    appended to the module-level ``joined_requirements``. Requirements that
    clash with the selected one are reported through
    add_incompatible_requirement().

    Skipped keys are those listed in ``--ignore-packages`` and, with
    ``--ignore-installed``, those reported by pip freeze. With ``--frozen``,
    a joined requirement that does not contain the installed version is
    replaced by ``<name>>=<installed version>``.

    :param options: parsed command-line options
    """
    global joined_requirements
    all_requirements, ignored_requirements = parse_requirements(options)
    skip_keys = set(pkg.req.key for pkg in ignored_requirements)
    installed_by_key = {}
    installed_requirements = []
    if options.ignore_installed or options.frozen:
        installed_requirements = installed_packages(options)
    if options.ignore_installed:
        skip_keys |= set(pkg.key for pkg in installed_requirements)
    if options.frozen:
        installed_by_key = dict(
            (pkg.key, pkg) for pkg in installed_requirements)

    # items() rather than iteritems(): works on both Python 2 and Python 3.
    for req_key, req_list in all_requirements.items():
        if req_key in skip_keys:
            continue
        joined_req = join_one_requirement(req_list)
        try:
            installed_req = installed_by_key[req_key]
            installed_version = installed_req.index[0][0]
        except (KeyError, IndexError):
            # Not installed (or no version recorded): nothing to freeze.
            pass
        else:
            if installed_version not in joined_req.req:
                frozen_req = pip.req.InstallRequirement.from_line(
                    "%s>=%s" % (installed_req.project_name,
                                installed_req.specs[0][1]),
                    "pip freeze")
                add_incompatible_requirement(frozen_req, joined_req)
                joined_req = frozen_req
        joined_requirements.append(joined_req.req)

        # Classify the specs of the selected requirement.
        segment_ok = False      # an upper bound survived the join
        lower_version = None
        lower_strict = False    # lower bound is ">" rather than ">="
        exact_version = None
        conflicts = []          # versions excluded by the remaining specs
        for parsed, _, op, _ in joined_req.req.index:
            if op[0] == ">":
                lower_version = parsed
                # Strict means a one-character operator (">"), matching the
                # definition used in join_one_requirement. The previous
                # `len(op) == 2' marked ">=" as strict, so ">A" vs "<=A" went
                # unreported and ">=A" vs "<=A" was wrongly reported.
                lower_strict = len(op) == 1
            elif op[0] == "<":
                segment_ok = True
            elif op[0] == "=":
                exact_version = parsed
            else:
                conflicts.append(parsed)

        if exact_version:
            # Every requirement that rejects the pinned version conflicts.
            for req in req_list:
                if exact_version not in req.req:
                    add_incompatible_requirement(joined_req, req)
        else:
            for req in req_list:
                for parsed, _, op, _ in req.req.index:
                    if op[0] == "=":
                        if parsed in conflicts:
                            add_incompatible_requirement(joined_req, req)
                            break
                    elif not segment_ok and op[0] == "<":
                        # analyse lower bound: x >= A or x > A
                        # The None guard keeps Python 3 from raising on an
                        # unorderable comparison; on Python 2 both
                        # comparisons were simply False.
                        if lower_version is not None and (
                                lower_version > parsed or (
                                    lower_version == parsed and
                                    (lower_strict or len(op) != 2))):
                            add_incompatible_requirement(joined_req, req)
                            break
def print_requirements():
    """Print the joined requirements as one JSON document on stdout.

    The document has two keys: ``compatibles``, the string form of every
    joined requirement whose key has no recorded incompatibility, and
    ``incompatibles``, a copy of the recorded incompatibility mapping.
    """
    global joined_requirements
    global incompatibles
    clean = [str(joined) for joined in joined_requirements
             if joined.key not in incompatibles]
    report = {
        'compatibles': clean,
        'incompatibles': dict(incompatibles),
    }
    print(json.dumps(report))
def main():
    """Script entry point: parse arguments, join requirements, print JSON."""
    options = create_parser().parse_args()
    setup_logging(options)
    join_requirements(options)
    print_requirements()


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()