Viewing file: resolvers.py (17.13 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
import collections import operator
from .providers import AbstractResolver from .structs import DirectedGraph, IteratorMapping, build_iter_view
# A (requirement, parent) pair: one requirement contributing to a criterion,
# together with the candidate that introduced it. `Resolution.resolve()` passes
# `parent=None` for the requirements it is given directly.
RequirementInformation = collections.namedtuple(
    "RequirementInformation",
    "requirement parent",
)
class ResolverException(Exception):
    """Root of the exception hierarchy raised by this module.

    Exceptions derived from this class are expected to be handled inside this
    module. One that bubbles past the resolver should be treated as a bug.
    """
class RequirementsConflicted(ResolverException):
    """Signals that a criterion ended up with no usable candidates.

    The offending criterion is available as the `criterion` attribute and is
    also passed on as the sole exception argument.
    """

    def __init__(self, criterion):
        super(RequirementsConflicted, self).__init__(criterion)
        self.criterion = criterion

    def __str__(self):
        # List every requirement that contributed to the conflicting criterion.
        requirements = self.criterion.iter_requirement()
        listing = ", ".join(repr(requirement) for requirement in requirements)
        return "Requirements conflict: {}".format(listing)
class InconsistentCandidate(ResolverException):
    """Signals that a candidate does not satisfy its own criterion.

    Both the `candidate` and the `criterion` it failed against are stored as
    attributes and are passed on as the exception arguments, in that order.
    """

    def __init__(self, candidate, criterion):
        super(InconsistentCandidate, self).__init__(candidate, criterion)
        self.candidate = candidate
        self.criterion = criterion

    def __str__(self):
        # Spell out the requirements the candidate was supposed to satisfy.
        requirements = self.criterion.iter_requirement()
        listing = ", ".join(repr(requirement) for requirement in requirements)
        return "Provided candidate {!r} does not satisfy {}".format(
            self.candidate,
            listing,
        )
class Criterion(object):
    """The possible resolution results for a single package.

    Three attributes are held:

    * `information` is a collection of `RequirementInformation` pairs. Each
      pair is a requirement contributing to this criterion, together with the
      candidate that provides the requirement.
    * `incompatibilities` is a collection of every candidate known not to
      work, which must be excluded from consideration.
    * `candidates` is a collection of all possible candidates deduced from
      the union of the contributing requirements and the known
      incompatibilities. It should never be empty, except when the criterion
      is attached to a raised `RequirementsConflicted` (in which case it is
      always empty).

    .. note::
        This class is intended to be externally immutable. **Do not** mutate
        any of its attribute containers.
    """

    def __init__(self, candidates, information, incompatibilities):
        self.candidates = candidates
        self.information = information
        self.incompatibilities = incompatibilities

    def __repr__(self):
        # Each information entry unpacks into (requirement, parent).
        parts = []
        for req, parent in self.information:
            parts.append("({!r}, via={!r})".format(req, parent))
        return "Criterion({})".format(", ".join(parts))

    def iter_requirement(self):
        """Return an iterator over the requirements in `information`."""
        return (info.requirement for info in self.information)

    def iter_parent(self):
        """Return an iterator over the parents in `information`."""
        return (info.parent for info in self.information)
class ResolutionError(ResolverException):
    """Common base of the errors that end a resolution without a result."""
class ResolutionImpossible(ResolutionError):
    """Signals that the resolution has hit a dead end.

    `causes` is a list of `RequirementInformation` objects. It is stored on
    the instance and also passed on as the sole exception argument.
    """

    def __init__(self, causes):
        self.causes = causes
        super(ResolutionImpossible, self).__init__(causes)
class ResolutionTooDeep(ResolutionError):
    """Signals that the resolution ran out of rounds.

    `round_count` is the number of rounds given to the exception. It is stored
    on the instance and also passed on as the sole exception argument.
    """

    def __init__(self, round_count):
        self.round_count = round_count
        super(ResolutionTooDeep, self).__init__(round_count)
# Snapshot of the resolution at one round: the pins made so far (`mapping`),
# the criteria gathered so far (`criteria`), and the causes recorded by the
# most recent backtrack (`backtrack_causes`).
State = collections.namedtuple(
    "State",
    ["mapping", "criteria", "backtrack_causes"],
)
class Resolution(object):
    """Stateful resolution object.

    This is designed as a one-off object that holds information to kick start
    the resolution process, and holds the results afterwards.

    The provider is kept as `_p`, the reporter as `_r`, and the history of
    `State` snapshots as the list `_states` (the newest state is last).
    """

    def __init__(self, provider, reporter):
        """Store the provider and reporter and start with no state history."""
        self._p = provider
        self._r = reporter
        self._states = []

    @property
    def state(self):
        """The most recently pushed state.

        Raises `AttributeError` when no state has been pushed yet.
        """
        try:
            return self._states[-1]
        except IndexError:
            raise AttributeError("state")

    def _push_new_state(self):
        """Push a new state into history.

        This new state will be used to hold resolution results of the next
        coming round. It starts out as a copy of the current latest state:
        `mapping` and `criteria` are copied with their own `copy()` methods
        and `backtrack_causes` is copied by slicing.
        """
        base = self._states[-1]
        state = State(
            mapping=base.mapping.copy(),
            criteria=base.criteria.copy(),
            backtrack_causes=base.backtrack_causes[:],
        )
        self._states.append(state)

    def _add_to_criteria(self, criteria, requirement, parent):
        """Merge `requirement` (introduced by `parent`) into `criteria`.

        The reporter is notified first. The provider is then asked for the
        candidates matching the requirement together with whatever is already
        recorded under the same identifier. On success, `criteria` is updated
        in place with a new `Criterion` for that identifier.

        Raises `RequirementsConflicted` (carrying the new criterion) when no
        candidate is left; `criteria` is not modified in that case.
        """
        self._r.adding_requirement(requirement=requirement, parent=parent)

        identifier = self._p.identify(requirement_or_candidate=requirement)
        criterion = criteria.get(identifier)
        # Work on a copy so the existing criterion's container is not mutated.
        if criterion:
            incompatibilities = list(criterion.incompatibilities)
        else:
            incompatibilities = []

        # The third argument to IteratorMapping supplies the extra entries for
        # this identifier (the new requirement / the copied incompatibilities).
        matches = self._p.find_matches(
            identifier=identifier,
            requirements=IteratorMapping(
                criteria,
                operator.methodcaller("iter_requirement"),
                {identifier: [requirement]},
            ),
            incompatibilities=IteratorMapping(
                criteria,
                operator.attrgetter("incompatibilities"),
                {identifier: incompatibilities},
            ),
        )

        # Extend a copy of the known information with the new pair.
        if criterion:
            information = list(criterion.information)
            information.append(RequirementInformation(requirement, parent))
        else:
            information = [RequirementInformation(requirement, parent)]

        criterion = Criterion(
            candidates=build_iter_view(matches),
            information=information,
            incompatibilities=incompatibilities,
        )
        if not criterion.candidates:
            raise RequirementsConflicted(criterion)
        criteria[identifier] = criterion

    def _get_preference(self, name):
        """Return the provider's preference value for identifier `name`.

        Used as the sort key when `resolve()` picks the next criterion to pin.
        """
        return self._p.get_preference(
            identifier=name,
            resolutions=self.state.mapping,
            candidates=IteratorMapping(
                self.state.criteria,
                operator.attrgetter("candidates"),
            ),
            information=IteratorMapping(
                self.state.criteria,
                operator.attrgetter("information"),
            ),
            backtrack_causes=self.state.backtrack_causes,
        )

    def _is_current_pin_satisfying(self, name, criterion):
        """Tell whether `name` is pinned to a candidate that fits `criterion`.

        Returns False when `name` has no pin in the current state; otherwise
        returns whether the provider reports the pin as satisfying every
        requirement of `criterion`.
        """
        try:
            current_pin = self.state.mapping[name]
        except KeyError:
            return False
        return all(
            self._p.is_satisfied_by(requirement=r, candidate=current_pin)
            for r in criterion.iter_requirement()
        )

    def _get_updated_criteria(self, candidate):
        """Return a copy of the current criteria extended by `candidate`.

        Every dependency the provider reports for `candidate` is merged into
        the copy with `candidate` as its parent. The current state's criteria
        are left untouched. `RequirementsConflicted` raised by
        `_add_to_criteria()` propagates to the caller.
        """
        criteria = self.state.criteria.copy()
        for requirement in self._p.get_dependencies(candidate=candidate):
            self._add_to_criteria(criteria, requirement, parent=candidate)
        return criteria

    def _attempt_to_pin_criterion(self, name):
        """Try to pin one of the candidates of the criterion called `name`.

        Candidates are tried in iteration order. The first whose dependencies
        merge without conflict is pinned in the current state.

        Returns an empty list when a candidate was pinned; otherwise the list
        of conflicting criteria collected from each failed candidate.

        Raises `InconsistentCandidate` when a candidate does not satisfy the
        criterion it was offered for.
        """
        criterion = self.state.criteria[name]

        causes = []
        for candidate in criterion.candidates:
            try:
                criteria = self._get_updated_criteria(candidate)
            except RequirementsConflicted as e:
                # Remember why this candidate failed and try the next one.
                causes.append(e.criterion)
                continue

            # Check the newly-pinned candidate actually works. This should
            # always pass under normal circumstances, but in the case of a
            # faulty provider, we will raise an error to notify the implementer
            # to fix find_matches() and/or is_satisfied_by().
            satisfied = all(
                self._p.is_satisfied_by(requirement=r, candidate=candidate)
                for r in criterion.iter_requirement()
            )
            if not satisfied:
                raise InconsistentCandidate(candidate, criterion)

            self._r.pinning(candidate=candidate)
            self.state.criteria.update(criteria)

            # Put newly-pinned candidate at the end. This is essential because
            # backtracking looks at this mapping to get the last pin.
            self.state.mapping.pop(name, None)
            self.state.mapping[name] = candidate

            return []

        # All candidates tried, nothing works. This criterion is a dead
        # end, signal for backtracking.
        return causes

    def _backtrack(self):
        """Perform backtracking.

        When we enter here, the stack is like this::

            [ state Z ]
            [ state Y ]
            [ state X ]
            .... earlier states are irrelevant.

        1. No pins worked for Z, so it does not have a pin.
        2. We want to reset state Y to unpinned, and pin another candidate.
        3. State X holds what state Y was before the pin, but does not
           have the incompatibility information gathered in state Y.

        Each iteration of the loop will:

        1.  Discard Z.
        2.  Discard Y but remember its incompatibility information gathered
            previously, and the failure we're dealing with right now.
        3.  Push a new state Y' based on X, and apply the incompatibility
            information from Y to Y'.
        4a. If this causes Y' to conflict, we need to backtrack again. Make Y'
            the new Z and go back to step 2.
        4b. If the incompatibilities apply cleanly, end backtracking.

        Returns True when a usable state was produced, and False when fewer
        than three states remain and there is nothing left to backtrack to.
        """
        while len(self._states) >= 3:
            # Remove the state that triggered backtracking.
            del self._states[-1]

            # Retrieve the last candidate pin and known incompatibilities.
            # popitem() gives the newest pin because pinning always re-inserts
            # the name at the end of the mapping.
            broken_state = self._states.pop()
            name, candidate = broken_state.mapping.popitem()
            incompatibilities_from_broken = [
                (k, list(v.incompatibilities))
                for k, v in broken_state.criteria.items()
            ]

            # Also mark the newly known incompatibility.
            incompatibilities_from_broken.append((name, [candidate]))

            self._r.backtracking(candidate=candidate)

            # Create a new state from the last known-to-work one, and apply
            # the previously gathered incompatibility information.
            def _patch_criteria():
                # Returns False as soon as one identifier is left without
                # candidates, True when every entry was applied.
                for k, incompatibilities in incompatibilities_from_broken:
                    if not incompatibilities:
                        continue
                    try:
                        criterion = self.state.criteria[k]
                    except KeyError:
                        # The new state has no criterion for this identifier.
                        continue
                    matches = self._p.find_matches(
                        identifier=k,
                        requirements=IteratorMapping(
                            self.state.criteria,
                            operator.methodcaller("iter_requirement"),
                        ),
                        incompatibilities=IteratorMapping(
                            self.state.criteria,
                            operator.attrgetter("incompatibilities"),
                            {k: incompatibilities},
                        ),
                    )
                    candidates = build_iter_view(matches)
                    if not candidates:
                        return False
                    # Merge in what the new state already knew before storing.
                    incompatibilities.extend(criterion.incompatibilities)
                    self.state.criteria[k] = Criterion(
                        candidates=candidates,
                        information=list(criterion.information),
                        incompatibilities=incompatibilities,
                    )
                return True

            # The new state must exist before patching: _patch_criteria()
            # reads and writes self.state.
            self._push_new_state()
            success = _patch_criteria()

            # It works! Let's work on this new state.
            if success:
                return True

            # State does not work after applying known incompatibilities.
            # Try the still previous state.

        # No way to backtrack anymore.
        return False

    def resolve(self, requirements, max_rounds):
        """Run the resolution and return the final `State`.

        `requirements` is an iterable of requirements added with no parent;
        `max_rounds` bounds the number of pinning rounds.

        Raises `RuntimeError` if this object already holds states,
        `ResolutionImpossible` if the given requirements conflict among
        themselves or backtracking runs out of options, and
        `ResolutionTooDeep` if `max_rounds` rounds pass without finishing.
        """
        if self._states:
            raise RuntimeError("already resolved")

        self._r.starting()

        # Initialize the root state.
        self._states = [
            State(
                mapping=collections.OrderedDict(),
                criteria={},
                backtrack_causes=[],
            )
        ]
        for r in requirements:
            try:
                self._add_to_criteria(self.state.criteria, r, parent=None)
            except RequirementsConflicted as e:
                raise ResolutionImpossible(e.criterion.information)

        # The root state is saved as a sentinel so the first ever pin can have
        # something to backtrack to if it fails. The root state is basically
        # pinning the virtual "root" package in the graph.
        self._push_new_state()

        for round_index in range(max_rounds):
            self._r.starting_round(index=round_index)

            unsatisfied_names = [
                key
                for key, criterion in self.state.criteria.items()
                if not self._is_current_pin_satisfying(key, criterion)
            ]

            # All criteria are accounted for. Nothing more to pin, we are done!
            if not unsatisfied_names:
                self._r.ending(state=self.state)
                return self.state

            # Choose the most preferred unpinned criterion to try.
            name = min(unsatisfied_names, key=self._get_preference)
            failure_causes = self._attempt_to_pin_criterion(name)

            if failure_causes:
                # Backtrack if pinning fails. The backtrack process puts us in
                # an unpinned state, so we can work on it in the next round.
                success = self._backtrack()
                # Record the causes on whatever state is current after the
                # backtrack, flattening each criterion's information entries.
                self.state.backtrack_causes[:] = [
                    i for c in failure_causes for i in c.information
                ]

                # Dead ends everywhere. Give up.
                if not success:
                    raise ResolutionImpossible(self.state.backtrack_causes)
            else:
                # Pinning was successful. Push a new state to do another pin.
                self._push_new_state()

            self._r.ending_round(index=round_index, state=self.state)

        raise ResolutionTooDeep(max_rounds)
def _has_route_to_root(criteria, key, all_keys, connected): if key in connected: return True if key not in criteria: return False for p in criteria[key].iter_parent(): try: pkey = all_keys[id(p)] except KeyError: continue if pkey in connected: connected.add(key) return True if _has_route_to_root(criteria, pkey, all_keys, connected): connected.add(key) return True return False
# What `_build_result()` hands back: the kept pins (`mapping`), the dependency
# graph (`graph`) and the criteria of the final state (`criteria`).
Result = collections.namedtuple("Result", ["mapping", "graph", "criteria"])
def _build_result(state):
    """Turn the final resolution state into a `Result`.

    Builds a `DirectedGraph` whose vertices are the identifiers that can be
    traced back to the root sentinel `None`, with an edge from each known
    parent to its child. Pins that cannot be traced back to the root are left
    out of the returned mapping; the criteria are passed through unchanged.
    """
    pins = state.mapping
    all_criteria = state.criteria

    # Parents are recorded as candidate objects, so index the pinned
    # candidates by object identity to get back to their identifiers.
    key_by_identity = dict(
        (id(candidate), name) for name, candidate in pins.items()
    )
    key_by_identity[id(None)] = None

    graph = DirectedGraph()
    graph.add(None)  # Sentinel as root dependencies' parent.

    reachable = {None}
    for name, criterion in all_criteria.items():
        if not _has_route_to_root(
            all_criteria, name, key_by_identity, reachable
        ):
            continue
        if name not in graph:
            graph.add(name)
        for parent in criterion.iter_parent():
            parent_identity = id(parent)
            if parent_identity not in key_by_identity:
                # The parent is not one of the pinned candidates.
                continue
            parent_name = key_by_identity[parent_identity]
            if parent_name not in graph:
                graph.add(parent_name)
            graph.connect(parent_name, name)

    kept_pins = {}
    for name, candidate in pins.items():
        if name in reachable:
            kept_pins[name] = candidate

    return Result(mapping=kept_pins, graph=graph, criteria=all_criteria)
class Resolver(AbstractResolver):
    """The thing that performs the actual resolution work."""

    base_exception = ResolverException

    def resolve(self, requirements, max_rounds=100):
        """Resolve a collection of constraints into a final result.

        The return value represents the final resolution result. It is a
        tuple subclass with three public members:

        * `mapping`: A dict of resolved candidates. Each key is an identifier
          of a requirement (as returned by the provider's `identify` method),
          and the value is the resolved candidate.
        * `graph`: A `DirectedGraph` instance representing the dependency
          tree. The vertices are keys of `mapping`, and each edge represents
          *why* a particular package is included. A special vertex `None` is
          included to represent parents of user-supplied requirements.
        * `criteria`: A dict of "criteria" that hold detailed information on
          how edges in the graph are derived. Each key is an identifier of a
          requirement, and the value is a `Criterion` instance.

        These exceptions may be raised when no resolution can be found:

        * `ResolutionImpossible`: A resolution cannot be found for the given
          combination of requirements. The `causes` attribute of the
          exception is a list of (requirement, parent), giving the
          requirements that could not be satisfied.
        * `ResolutionTooDeep`: The dependency tree is too deeply nested and
          the resolver gave up. This is usually caused by a circular
          dependency, but you can try to resolve this by increasing the
          `max_rounds` argument.
        """
        final_state = Resolution(self.provider, self.reporter).resolve(
            requirements,
            max_rounds=max_rounds,
        )
        return _build_result(final_state)
|