479 lines
17 KiB
Python
Executable File
479 lines
17 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
from __future__ import print_function
|
|
|
|
import collections
|
|
import itertools
|
|
import logging
|
|
import re
|
|
import sys
|
|
|
|
import argparse
|
|
import six
|
|
|
|
import pip.index
|
|
import pip.req
|
|
import pkg_resources
|
|
|
|
|
|
# Use this for the sorting order of operators
|
|
OP_ORDER = ('!=', '==', '<', '<=', '>', '>=')
|
|
|
|
# Exit codes that are returned on various issues.
|
|
BAD_REQUIREMENTS = 2
|
|
INCOMPATIBLE_REQUIREMENTS = 3
|
|
|
|
LOGGER = logging.getLogger()
|
|
|
|
|
|
class RequirementException(Exception):
|
|
pass
|
|
|
|
|
|
def create_parser():
|
|
parser = argparse.ArgumentParser()
|
|
|
|
parser.add_argument(
|
|
"-r", "--requirement",
|
|
dest="requirements",
|
|
nargs="*",
|
|
default=[],
|
|
metavar="<file>",
|
|
help="Analyze all the packages listed in the given requirements file")
|
|
parser.add_argument(
|
|
"requirement_specs",
|
|
nargs="*",
|
|
default=[],
|
|
metavar="<requirement specifier>",
|
|
help="Analyze specified package")
|
|
parser.add_argument(
|
|
# A regex to be used to skip requirements
|
|
"--skip-requirements-regex",
|
|
default="",
|
|
help=argparse.SUPPRESS)
|
|
parser.add_argument(
|
|
# The default version control system for editables, e.g. 'svn'
|
|
'--default-vcs',
|
|
dest='default_vcs',
|
|
default='',
|
|
help=argparse.SUPPRESS)
|
|
parser.add_argument(
|
|
"--debug", "-d",
|
|
action="store_true",
|
|
default=False,
|
|
help="Print debug information")
|
|
parser.add_argument(
|
|
"--ignore-packages",
|
|
nargs="*",
|
|
default=[],
|
|
metavar="<requirement specifier>",
|
|
help="Ignore listed packages")
|
|
parser.add_argument(
|
|
"--force-packages",
|
|
nargs="*",
|
|
default=[],
|
|
metavar="<requirement specifier>",
|
|
help="Ignore listed packages")
|
|
return parser
|
|
|
|
|
|
def setup_logging(options):
|
|
level = logging.DEBUG if options.debug else logging.WARNING
|
|
handler = logging.StreamHandler(sys.stderr)
|
|
LOGGER.addHandler(handler)
|
|
LOGGER.setLevel(level)
|
|
|
|
|
|
def install_requirement_ensure_req_field(req):
|
|
if not hasattr(req, 'req') or not req.req:
|
|
# pip 0.8 or so
|
|
link = pip.index.Link(req.url)
|
|
name = link.egg_fragment
|
|
if not name:
|
|
raise Exception("Cannot find package name from `%s'" % req.url)
|
|
req.req = pkg_resources.Requirement.parse(name)
|
|
return req
|
|
|
|
|
|
def install_requirement_str(req):
|
|
return req.url or str(req.req)
|
|
|
|
|
|
def install_requirement_parse(line, comes_from):
|
|
line = line.strip()
|
|
if line.startswith('-e') or line.startswith('--editable'):
|
|
if line.startswith('-e'):
|
|
line = line[2:].strip()
|
|
else:
|
|
line = line[len('--editable'):].strip().lstrip('=')
|
|
req = pip.req.InstallRequirement.from_editable(
|
|
line, comes_from=comes_from)
|
|
else:
|
|
req = pip.req.InstallRequirement.from_line(line, comes_from)
|
|
return install_requirement_ensure_req_field(req)
|
|
|
|
|
|
def iter_combinations(elements, include_empty=False):
|
|
"""Iterates over all combinations of the given elements list."""
|
|
if include_empty:
|
|
start = 0
|
|
else:
|
|
start = 1
|
|
for i in range(start, len(elements) + 1):
|
|
for c in itertools.combinations(elements, i):
|
|
yield c
|
|
|
|
|
|
def conflict_scorer(versioned):
|
|
"""Scores a list of (op, version) tuples, a higher score means more likely
|
|
to cause conflicts while a lower score means less likely to cause
|
|
conflicts. A zero score means that no conflicts have been detected (aka
|
|
when installing no version issues will be encountered).
|
|
"""
|
|
if len(versioned) == 1:
|
|
# A single list has no capability to conflict with anything.
|
|
return 0
|
|
# Group by operator (and the versions that those operators are compatible
|
|
# with).
|
|
op_versions = collections.defaultdict(list)
|
|
for (op, version) in versioned:
|
|
op_versions[op].append(version)
|
|
score = 0
|
|
for version in sorted(op_versions.get("==", [])):
|
|
for (op, version2) in versioned:
|
|
# Any request for something not this version is a conflict.
|
|
if version != version2:
|
|
score += 1
|
|
# Any request for something is this version, but isn't '=='
|
|
# is also a conflict.
|
|
if version == version2 and op != "==":
|
|
score += 1
|
|
for version in sorted(op_versions.get("!=", [])):
|
|
for (op, version2) in versioned:
|
|
if op in ["!=", ">", "<"]:
|
|
continue
|
|
# Anything that is includes this version would be a conflict,
|
|
# thats why we exclude !=, <, and > from the above since
|
|
# those exclude versions.
|
|
if version2 == version:
|
|
score += 1
|
|
for version in sorted(op_versions.get(">", [])):
|
|
for (op, version2) in versioned:
|
|
if (op, version2) == (">", version):
|
|
continue
|
|
# A request for a lower version than the desired greater than
|
|
# version is a conflict.
|
|
if op in ["<", "<="] and version2 <= version:
|
|
score += 1
|
|
# A request for an inclusive version and matching with this
|
|
# version is also a conflict (since both can not be satisfied).
|
|
elif op in ["==", ">="] and version2 == version:
|
|
score += 1
|
|
# If another request asks for a version less than this version but
|
|
# also is asking for a greater than operator, that version spans
|
|
# a wider range of compatible versions and therefore we are in
|
|
# more of a conflict than that version.
|
|
elif op == ">" and version2 < version:
|
|
score += 1
|
|
elif op == ">=" and version2 <= version:
|
|
score += 1
|
|
for version in sorted(op_versions.get(">=", [])):
|
|
for (op, version2) in versioned:
|
|
if (op, version2) == (">=", version):
|
|
continue
|
|
if op in ["<", "<="] and version2 < version:
|
|
score += 1
|
|
elif op in [">=", ">"] and version2 < version:
|
|
score += 1
|
|
for version in sorted(op_versions.get("<", [])):
|
|
for (op, version2) in versioned:
|
|
if (op, version2) == ("<", version):
|
|
continue
|
|
if op in [">", ">="] and version2 >= version:
|
|
score += 1
|
|
elif op in ["==", "<="] and version2 == version:
|
|
score += 1
|
|
elif op == "<" and version2 > version:
|
|
score += 1
|
|
elif op == "<=" and version2 >= version:
|
|
score += 1
|
|
for version in sorted(op_versions.get("<=", [])):
|
|
for (op, version2) in versioned:
|
|
if (op, version2) == ("<=", version):
|
|
continue
|
|
if op in [">", ">="] and version2 > version:
|
|
score += 1
|
|
elif op in ["<=", "<"] and version2 > version:
|
|
score += 1
|
|
return score
|
|
|
|
|
|
def find_best_match(versioned, counts, scorer_func):
|
|
"""Iterates over all combinations of the given version and comparator in
|
|
the provided lists and finds the one with the best score (closest to zero
|
|
with the maximum number of elements).
|
|
"""
|
|
scored = []
|
|
for combo in iter_combinations(versioned):
|
|
scored.append((combo, scorer_func(combo)))
|
|
if len(scored) == 0:
|
|
raise ValueError("No version combinations scored")
|
|
|
|
# Find the lowest score with the highest number of elements.
|
|
min_score = sys.maxint
|
|
for (combo, combo_score) in scored:
|
|
if combo_score < min_score:
|
|
min_score = combo_score
|
|
max_elems = -1
|
|
best_matches = []
|
|
for (combo, combo_score) in scored:
|
|
if min_score == combo_score:
|
|
if len(combo) > max_elems:
|
|
best_matches = [combo]
|
|
max_elems = len(combo)
|
|
if len(combo) == max_elems:
|
|
best_matches.append(combo)
|
|
|
|
if len(best_matches) == 1:
|
|
best_match = best_matches[0]
|
|
else:
|
|
# If equivalent scores, then select the one that has the most requests
|
|
# for its combinations over ones that have less requests.
|
|
best_score = -1
|
|
best_match = None
|
|
for match in best_matches:
|
|
match_score = 0
|
|
for combo in match:
|
|
match_score += counts.get(combo, 0)
|
|
if match_score > best_score:
|
|
best_score = match_score
|
|
best_match = match
|
|
incompatibles = set()
|
|
for (combo, combo_score) in scored:
|
|
for spec in combo:
|
|
if spec not in best_match:
|
|
incompatibles.add(spec)
|
|
return (best_match, incompatibles)
|
|
|
|
|
|
def best_match(req_key, req_list, forced_req=None):
|
|
"""Attempts to find the versions which will work the best for the given
|
|
requirement specification list.
|
|
"""
|
|
def fetch_specs(req):
|
|
try:
|
|
specs = req.req.specs
|
|
except (AttributeError, TypeError):
|
|
specs = []
|
|
return tuple(specs)
|
|
|
|
all_specs = []
|
|
for req in req_list:
|
|
all_specs.extend(fetch_specs(req))
|
|
if not all_specs:
|
|
return (req_list, [])
|
|
|
|
def spec_sort(spec1, spec2):
|
|
# Ensure there is always a well defined spec ordering so that the
|
|
# selection of matched specs is also well defined.
|
|
(op1, version1) = spec1
|
|
(op2, version2) = spec2
|
|
c = cmp(version1, version2)
|
|
if c == 0:
|
|
c = cmp(OP_ORDER.index(op1), OP_ORDER.index(op2))
|
|
return c
|
|
|
|
def reform(specs, versions):
|
|
# Covert the parsed versions back into the string versions so that
|
|
# we can return that as matches (instead of the comparable versions).
|
|
cleaned_specs = []
|
|
for (op, version) in specs:
|
|
cleaned_specs.append((op, versions[version]))
|
|
# Try to see if any of the requirements that we had actually had this
|
|
# exact spec, if so then just return that as the requirement, if not
|
|
# create a requirement instead.
|
|
specs = tuple(cleaned_specs)
|
|
sources = []
|
|
for req in req_list:
|
|
if fetch_specs(req) == specs:
|
|
sources.append(req)
|
|
if not sources:
|
|
spec_pieces = []
|
|
for (op, version) in specs:
|
|
spec_pieces.append("%s%s" % (op, version))
|
|
spec = "%s%s" % (req_key, ",".join(spec_pieces))
|
|
sources.append(pip.req.InstallRequirement.from_line(spec, 'compiled'))
|
|
return sources
|
|
|
|
def reform_incompatibles(incompatible_specs, versions):
|
|
causes = []
|
|
for (op, version) in incompatible_specs:
|
|
matches = 0
|
|
version = versions[version]
|
|
for req in req_list:
|
|
matched = False
|
|
for (op2, version2) in fetch_specs(req):
|
|
if (op2, version2) == (op, version):
|
|
matched = True
|
|
break
|
|
if matched:
|
|
if req not in causes:
|
|
causes.append(req)
|
|
matches += 1
|
|
if not matches:
|
|
spec_pieces = "%s%s" % (op, version)
|
|
spec = "%s%s" % (req_key, spec_pieces)
|
|
causes.append(pip.req.InstallRequirement.from_line(spec,
|
|
"compiled conflict"))
|
|
return causes
|
|
|
|
if forced_req is None:
|
|
versions = {}
|
|
versioned = set()
|
|
counts = collections.defaultdict(int)
|
|
for (op, version) in all_specs:
|
|
parsed_version = pkg_resources.parse_version(version)
|
|
versioned.add((op, parsed_version))
|
|
versions[parsed_version] = version
|
|
counts[(op, parsed_version)] += 1
|
|
versioned = list(sorted(versioned, cmp=spec_sort))
|
|
initial_score = conflict_scorer(versioned)
|
|
if initial_score == 0:
|
|
incompatibles = []
|
|
match = versioned
|
|
else:
|
|
match, incompatibles = find_best_match(versioned, counts, conflict_scorer)
|
|
return (reform(match, versions),
|
|
reform_incompatibles(incompatibles, versions))
|
|
else:
|
|
return ([forced_req], req_list)
|
|
|
|
|
|
def parse_requirements(options):
|
|
"""Parse package requirements from command line and files.
|
|
|
|
:return: tuple (all, ignored) of InstallRequirement
|
|
"""
|
|
|
|
def req_key(req):
|
|
return req.req.key
|
|
|
|
all_requirements = {}
|
|
skip_match = None
|
|
if options.skip_requirements_regex:
|
|
skip_match = re.compile(options.skip_requirements_regex)
|
|
for req_spec in options.requirement_specs:
|
|
try:
|
|
req = install_requirement_parse(req_spec, "command line")
|
|
if skip_match and skip_match.search(req_key(req)):
|
|
continue
|
|
all_requirements.setdefault(req_key(req), []).append(req)
|
|
except Exception as ex:
|
|
raise RequirementException("Cannot parse `%s': %s" % (req_spec, ex))
|
|
for filename in options.requirements:
|
|
try:
|
|
for req in pip.req.parse_requirements(filename):
|
|
req = install_requirement_ensure_req_field(req)
|
|
if skip_match and skip_match.search(req_key(req)):
|
|
continue
|
|
all_requirements.setdefault(req_key(req), []).append(req)
|
|
except Exception as ex:
|
|
raise RequirementException("Cannot parse `%s': %s" % (filename, ex))
|
|
ignored_requirements = []
|
|
for req_spec in options.ignore_packages:
|
|
try:
|
|
req = install_requirement_parse(req_spec, "command line")
|
|
ignored_requirements.append(req)
|
|
except Exception as ex:
|
|
raise RequirementException("Cannot parse `%s': %s" % (req_spec, ex))
|
|
forced_requirements = []
|
|
for req_spec in options.force_packages:
|
|
try:
|
|
req = install_requirement_parse(req_spec, "command line")
|
|
forced_requirements.append(req)
|
|
except Exception as ex:
|
|
raise RequirementException("Cannot parse `%s': %s" % (req_spec, ex))
|
|
return (all_requirements, ignored_requirements, forced_requirements)
|
|
|
|
|
|
def join_requirements(requirements, ignored_requirements, forced_requirements):
|
|
skip_keys = set(pkg.req.key for pkg in ignored_requirements)
|
|
incompatibles = {}
|
|
joined_requirements = {}
|
|
forced = set(pkg.req.key for pkg in forced_requirements)
|
|
for (req_key, req_list) in six.iteritems(requirements):
|
|
if req_key in skip_keys:
|
|
continue
|
|
forced_req = None
|
|
if req_key in forced:
|
|
forced_matches = [pkg for pkg in forced_requirements
|
|
if pkg.req.key == req_key]
|
|
# Take the last forced match...
|
|
forced_req = forced_matches[-1]
|
|
req_matches, req_incompatibles = best_match(req_key, req_list,
|
|
forced_req=forced_req)
|
|
joined_requirements[req_key] = req_matches
|
|
if req_incompatibles:
|
|
incompatibles[req_key] = req_incompatibles
|
|
return (joined_requirements, incompatibles)
|
|
|
|
|
|
def print_requirements(joined_requirements):
|
|
formatted_requirements = []
|
|
for req_key in sorted(six.iterkeys(joined_requirements)):
|
|
req = joined_requirements[req_key][0]
|
|
req_prefix = ""
|
|
if req.editable:
|
|
req_prefix = "-e "
|
|
if req.url:
|
|
req = "%s#egg=%s" % (req.url, req.req)
|
|
else:
|
|
req = str(req.req)
|
|
formatted_requirements.append("%s%s" % (req_prefix, req))
|
|
for req in formatted_requirements:
|
|
print(req)
|
|
|
|
|
|
def print_incompatibles(incompatibles, joined_requirements):
|
|
for req_key in sorted(six.iterkeys(incompatibles)):
|
|
req_incompatibles = incompatibles[req_key]
|
|
if not req_incompatibles:
|
|
continue
|
|
print("%s: incompatible requirements" % (req_key),
|
|
file=sys.stderr)
|
|
chosen_reqs = joined_requirements.get(req_key, [])
|
|
if chosen_reqs:
|
|
print("Choosing:", file=sys.stderr)
|
|
for chosen in chosen_reqs:
|
|
print("\t%s: %s" % (chosen.comes_from,
|
|
install_requirement_str(chosen)),
|
|
file=sys.stderr)
|
|
print("Conflicting:", file=sys.stderr)
|
|
for conflicting in req_incompatibles:
|
|
print("\t%s: %s" % (conflicting.comes_from,
|
|
install_requirement_str(conflicting)),
|
|
file=sys.stderr)
|
|
|
|
|
|
def main():
|
|
parser = create_parser()
|
|
options = parser.parse_args()
|
|
setup_logging(options)
|
|
try:
|
|
requirements, ignored_requirements, forced_requirements = parse_requirements(options)
|
|
except RequirementException as ex:
|
|
LOGGER.error("Requirement failure: %s", ex)
|
|
sys.exit(BAD_REQUIREMENTS)
|
|
else:
|
|
joined_requirements, incompatibles = join_requirements(requirements,
|
|
ignored_requirements,
|
|
forced_requirements)
|
|
print_incompatibles(incompatibles, joined_requirements)
|
|
print_requirements(joined_requirements)
|
|
if incompatibles:
|
|
sys.exit(INCOMPATIBLE_REQUIREMENTS)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|