Commit 1b92e35c by Sébastien Eustace

Cleanup code

parent bb32ad22
......@@ -33,7 +33,7 @@ If you do not specify a version constraint, poetry will choose a suitable one ba
def handle(self):
from poetry.installation import Installer
from poetry.semver.semver import parse_constraint
from poetry.semver import parse_constraint
packages = self.argument('name')
is_dev = self.option('dev')
......
......@@ -22,7 +22,7 @@ class DebugResolveCommand(Command):
from poetry.packages import ProjectPackage
from poetry.puzzle import Solver
from poetry.repositories.repository import Repository
from poetry.semver.semver import parse_constraint
from poetry.semver import parse_constraint
packages = self.argument('package')
......
......@@ -22,7 +22,6 @@ class SelfUpdateCommand(Command):
def handle(self):
from poetry.__version__ import __version__
from poetry.repositories.pypi_repository import PyPiRepository
from poetry.semver.comparison import less_than
version = self.argument('version')
if not version:
......@@ -38,7 +37,7 @@ class SelfUpdateCommand(Command):
key=cmp_to_key(
lambda x, y:
0 if x.version == y.version
else -1 * int(less_than(x.version, y.version) or -1)
else int(x.version < y.version or -1)
)
)
......
......@@ -253,7 +253,7 @@ lists all packages available."""
)
def get_update_status(self, latest, package):
from poetry.semver.semver import parse_constraint
from poetry.semver import parse_constraint
if latest.full_pretty_version == package.full_pretty_version:
return 'up-to-date'
......
......@@ -28,10 +28,9 @@ patch, minor, major, prepatch, preminor, premajor, prerelease.
def handle(self):
version = self.argument('version')
if version in self.RESERVED:
version = self.increment_version(
self.poetry.package.pretty_version, version
)
version = self.increment_version(
self.poetry.package.pretty_version, version
)
self.line(
'Bumping version from <comment>{}</> to <info>{}</>'.format(
......@@ -46,88 +45,42 @@ patch, minor, major, prepatch, preminor, premajor, prerelease.
self.poetry.file.write(content)
def increment_version(self, version, rule):
from poetry.semver.version_parser import VersionParser
from poetry.semver import Version
parser = VersionParser()
version_regex = (
'v?(\d+)(?:\.(\d+))?(?:\.(\d+))?(?:\.(\d+))?{}(?:\+[^\s]+)?'
).format(parser._modifier_regex)
m = re.match(version_regex, version)
if not m:
try:
version = Version.parse(version)
except ValueError:
raise ValueError(
'The project\'s version doesn\'t seem to follow semver'
)
if m.group(3):
index = 2
elif m.group(2):
index = 1
else:
index = 0
matches = m.groups()[:index+1]
base = '.'.join(matches)
extra_matches = list(g or '' for g in m.groups()[4:])
extras = version[len('.'.join(matches)):]
increment = 1
is_prerelease = (extra_matches[0] or extra_matches[1]) != ''
bump_prerelease = rule in {
'premajor', 'preminor', 'prepatch', 'prerelease'
}
position = -1
if rule in {'major', 'premajor'}:
if m.group(1) != '0' or m.group(2) != '0' or not is_prerelease:
position = 0
new = version.next_major
if rule == 'premajor':
new = new.first_prerelease
elif rule in {'minor', 'preminor'}:
if m.group(2) != '0' or not is_prerelease:
position = 1
new = version.next_minor
if rule == 'preminor':
new = new.first_prerelease
elif rule in {'patch', 'prepatch'}:
if not is_prerelease:
position = 2
elif rule == 'prerelease' and not is_prerelease:
position = 2
if position != -1:
extra_matches[0] = None
base = parser._manipulate_version_string(
matches,
position,
increment=increment
)
if bump_prerelease:
# We bump the prerelease part of the version
sep = ''
if not extra_matches[0]:
extra_matches[0] = 'a'
extra_matches[1] = '0'
sep = ''
new = version.next_patch
if rule == 'prepatch':
new = new.first_prerelease
elif rule == 'prerelease':
if version.is_prerelease():
pre = version.prerelease
new_prerelease = int(pre[1]) + 1
new = Version.parse(
'{}.{}.{}-{}'.format(
version.major,
version.minor,
version.patch,
'.'.join([pre[0], str(new_prerelease)])
)
)
else:
if extras.startswith(('.', '_', '-')):
sep = extras[0]
prerelease = extra_matches[1]
if not prerelease:
prerelease = '.1'
psep = ''
if prerelease.startswith(('.', '-')):
psep = prerelease[0]
prerelease = prerelease[1:]
new_prerelease = str(int(prerelease) + 1)
extra_matches[1] = '{}{}'.format(psep, new_prerelease)
extras = '{}{}{}{}'.format(
sep,
extra_matches[0],
extra_matches[1],
extra_matches[2]
)
new = version.next_patch.first_prerelease
else:
extras = ''
new = rule
return '.'.join(base.split('.')[:max(index, position)+1]) + extras
return new
......@@ -15,8 +15,8 @@ from poetry.puzzle.operations.operation import Operation
from poetry.repositories import Pool
from poetry.repositories import Repository
from poetry.repositories.installed_repository import InstalledRepository
from poetry.semver.semver import parse_constraint
from poetry.semver.semver import Version
from poetry.semver import parse_constraint
from poetry.semver import Version
from poetry.utils.helpers import canonicalize_name
from .base_installer import BaseInstaller
......
......@@ -13,7 +13,7 @@ from base64 import urlsafe_b64encode
from io import StringIO
from poetry.__version__ import __version__
from poetry.semver.semver import parse_constraint
from poetry.semver import parse_constraint
from poetry.utils._compat import Path
from ..utils.helpers import normalize_file_permissions
......
from .dependency_graph import DependencyGraph
from .resolver import Resolver
from .version_solver import VersionSolver
def resolve_version(root, provider, locked=None, use_latest=None):
solver = VersionSolver(root, provider, locked=locked, use_latest=use_latest)
with provider.progress():
return solver.solve()
class Conflict:
def __init__(self,
requirement,
requirements,
existing,
possibility_set,
locked_requirement,
requirement_trees,
activated_by_name,
underlying_error):
self.requirement = requirement
self.requirements = requirements
self.existing = existing
self.possibility_set = possibility_set
self.locked_requirement = locked_requirement
self.requirement_trees = requirement_trees,
self.activated_by_name = activated_by_name
self.underlying_error = underlying_error
@property
def possibility(self):
if self.possibility_set and self.possibility_set.latest_version:
return self.possibility_set.latest_version
from .specification_provider import SpecificationProvider
from .ui import UI
from typing import Any
from typing import Dict
from typing import List
from ..conflict import Conflict
from ..dependency_graph import DependencyGraph
class SpecificationProvider(object):
"""
Provides information about specifcations and dependencies to the resolver,
allowing the Resolver class to remain generic while still providing power
and flexibility.
This contract contains the methods
that users of Molinillo must implement
using knowledge of their own model classes.
"""
@property
def name_for_explicit_dependency_source(self): # type: () -> str
return 'user-specified dependency'
@property
def name_for_locking_dependency_source(self): # type: () -> str
return 'Lockfile'
def search_for(self, dependency): # type: (Any) -> List[Any]
"""
Search for the specifications that match the given dependency.
The specifications in the returned list will be considered in reverse
order, so the latest version ought to be last.
"""
return []
def dependencies_for(self, specification): # type: (Any) -> List[Any]
"""
Returns the dependencies of specification.
"""
return []
def is_requirement_satisfied_by(self,
requirement, # type: Any
activated, # type: DependencyGraph
spec # type: Any
): # type: (...) -> Any
"""
Determines whether the given requirement is satisfied by the given
spec, in the context of the current activated dependency graph.
"""
return True
def name_for(self, dependency): # type: (Any) -> str
"""
Returns the name for the given dependency.
"""
return str(dependency)
def sort_dependencies(self,
dependencies, # type: List[Any]
activated, # type: DependencyGraph
conflicts # type: Dict[str, List[Conflict]]
): # type: (...) -> List[Any]
"""
Sort dependencies so that the ones
that are easiest to resolve are first.
Easiest to resolve is (usually) defined by:
1) Is this dependency already activated?
2) How relaxed are the requirements?
3) Are there any conflicts for this dependency?
4) How many possibilities are there to satisfy this dependency?
"""
return sorted(
dependencies,
key=lambda dep: (
activated.vertex_named(self.name_for(dep)).payload is None,
conflicts.get(self.name_for(dep) is None)
)
)
def allow_missing(self, dependency): # type: (Any) -> bool
"""
Returns whether this dependency, which has no possible matching
specifications, can safely be ignored.
"""
return False
import sys
class UI(object):
def __init__(self, debug=False):
self._debug = debug
@property
def output(self):
return sys.stdout
@property
def progress_rate(self): # type: () -> float
return 0.33
def is_debugging(self): # type: () -> bool
return self._debug
def indicate_progress(self): # type: () -> None
self.output.write('.')
def before_resolution(self): # type: () -> None
self.output.write('Resolving dependencies...\n')
def after_resolution(self): # type: () -> None
self.output.write('')
def debug(self, message, depth): # type: (...) -> None
if self.is_debugging():
debug_info = str(message)
debug_info = '\n'.join([
':{}: {}'.format(str(depth).rjust(4), s)
for s in debug_info.split('\n')
]) + '\n'
self.output.write(debug_info)
from .exceptions import CircularDependencyError
from .graph.log import Log
class DependencyGraph:
def __init__(self):
self._vertices = {}
self._log = Log()
@property
def vertices(self):
return self._vertices
@property
def log(self):
return self._log
def tag(self, tag):
return self._log.tag(self, tag)
def rewind_to(self, tag):
return self._log.rewind_to(self, tag)
def add_child_vertex(self, name, payload, parent_names, requirement):
root = True
try:
parent_names.index(None)
except ValueError:
root = False
parent_names = [n for n in parent_names if n is not None]
vertex = self.add_vertex(name, payload, root)
if root:
vertex.explicit_requirements.append(requirement)
for parent_name in parent_names:
parent_vertex = self.vertex_named(parent_name)
self.add_edge(parent_vertex, vertex, requirement)
return vertex
def add_vertex(self, name, payload, root=False):
return self._log.add_vertex(self, name, payload, root)
def detach_vertex_named(self, name):
return self._log.detach_vertex_named(self, name)
def vertex_named(self, name):
return self.vertices.get(name)
def root_vertex_named(self, name):
vertex = self.vertex_named(name)
if vertex and vertex.root:
return vertex
def add_edge(self, origin, destination, requirement):
if destination.has_path_to(origin):
raise CircularDependencyError([origin, destination])
return self.add_edge_no_circular(origin, destination, requirement)
def add_edge_no_circular(self, origin, destination, requirement):
self._log.add_edge_no_circular(
self,
origin.name, destination.name,
requirement
)
def delete_edge(self, edge):
return self._log.delete_edge(
self,
edge.origin.name,
edge.destination.name,
edge.requirement
)
def set_payload(self, name, payload):
return self._log.set_payload(self, name, payload)
def to_dot(self):
dot_vertices = []
dot_edges = []
for n, v in self.vertices.items():
dot_vertices.append(
' {} [label="{}|{}"]'.format(n, n, v.payload or '')
)
for e in v.outgoing_edges:
label = e.requirement
dot_edges.append(
' {} -> {} [label="{}"]'.format(
e.origin.name,
e.destination.name,
label
)
)
dot_vertices = sorted(set(dot_vertices))
dot_edges = sorted(set(dot_edges))
dot_vertices.insert(0, 'digraph G {')
dot_vertices.append('')
dot_edges.append('}')
dot = dot_vertices + dot_edges
return '\n'.join(dot)
def __iter__(self):
return iter(self.vertices.values())
from .helpers import flat_map
class ResolverError(Exception):
pass
class NoSuchDependencyError(ResolverError):
def __init__(self, dependency, required_by=None):
if required_by is None:
required_by = []
sources = ' and '.join(['"{}"'.format(r) for r in required_by])
message = 'Unable to find a specification for "{}"'.format(dependency)
if sources:
message += ' depended upon by {}'.format(sources)
super(NoSuchDependencyError, self).__init__(message)
class CircularDependencyError(ResolverError):
def __init__(self, vertices):
super(CircularDependencyError, self).__init__(
'There is a circular dependency between {}'.format(
' and '.join([v.name for v in vertices])
)
)
self._dependencies = [v.payload.possibilities[-1] for v in vertices]
@property
def dependencies(self):
return self._dependencies
class VersionConflict(ResolverError):
def __init__(self, conflicts, specification_provider):
pairs = []
for conflicting in flat_map(
list(conflicts.values()), lambda x: x.requirements
):
for source, conflict_requirements in conflicting.items():
for c in conflict_requirements:
pairs.append((c, source))
super(VersionConflict, self).__init__(
'Unable to satisfy the following requirements:\n\n'
'{}'.format(
'\n'.join('- "{}" required by "{}"'.format(r, d)
for r, d in pairs)
)
)
self._conflicts = conflicts
self._specification_provider = specification_provider
@property
def conflicts(self):
return self._conflicts
@property
def specification_provider(self):
return self._specification_provider
def message_with_trees(self,
solver_name='Poetry',
possibility_type='possibility named',
reduce_trees=lambda trees: sorted(set(trees), key=str),
printable_requirement=str,
message_for_conflict=None,
version_for_spec=str):
o = []
for name, conflict in sorted(self._conflicts):
o.append(
'\n{} could not find compatible versions for {} "{}"_n'.format(
solver_name, possibility_type, name
)
)
if conflict.locked_requirement:
o.append(
' In snapshot ({}):\n'.format(
self._specification_provider.name_for_locking_dependency_source
)
)
o.append(
' {}\n'.format(
printable_requirement(conflict.locked_requirement)
)
)
o.append('\n')
o.append(
' In {}:\n'.format(
self._specification_provider.name_for_explicit_dependency_source
)
)
trees = reduce_trees(conflict.requirement_trees)
ot = []
for tree in trees:
t = ''
depth = 2
for req in tree:
t += ' ' * depth + str(req)
if tree[-1] != req:
spec = conflict.activated_by_name.get(
self._specification_provider.name_for(req)
)
if spec:
t += ' was resolved to {}, which'.format(
version_for_spec(spec)
)
t += ' depends on'
t += '\n'
depth += 1
ot.append(t)
o.append('\n'.join(ot))
if message_for_conflict:
message_for_conflict(o, name, conflict)
return ''.join(o).strip()
from typing import Any
class Action(object):
def __init__(self):
self.previous = None
self.next = None
@property
def action_name(self): # type: () -> str
raise NotImplementedError()
def up(self, graph): # type: (DependencyGraph) -> Any
"""
Performs the action on the given graph.
"""
raise NotImplementedError()
def down(self, graph): # type: (DependencyGraph) -> None
"""
Reverses the action on the given graph.
"""
raise NotImplementedError()
from .action import Action
from .edge import Edge
class AddEdgeNoCircular(Action):
def __init__(self, origin, destination, requirement):
super(AddEdgeNoCircular, self).__init__()
self._origin = origin
self._destination = destination
self._requirement = requirement
@property
def action_name(self):
return 'add_edge_no_circular'
@property
def origin(self):
return self._origin
@property
def destination(self):
return self._destination
@property
def requirement(self):
return self._requirement
def up(self, graph):
edge = self.make_edge(graph)
edge.origin.outgoing_edges.append(edge)
edge.destination.incoming_edges.append(edge)
return edge
def down(self, graph):
edge = self.make_edge(graph)
self._delete_first(edge.origin.outgoing_edges, edge)
self._delete_first(edge.destination.incoming_edges, edge)
def make_edge(self, graph):
return Edge(
graph.vertex_named(self._origin),
graph.vertex_named(self._destination),
self._requirement
)
def _delete_first(self, elements, element):
"""
:type elements: list
"""
try:
index = elements.index(element)
except ValueError:
return
del elements[index]
from .action import Action
from .vertex import Vertex
_NULL = object()
class AddVertex(Action):
def __init__(self, name, payload, root):
super(AddVertex, self).__init__()
self._name = name
self._payload = payload
self._root = root
self._existing_payload = _NULL
self._existing_root = None
@property
def action_name(self):
return 'add_vertex'
@property
def name(self):
return self._name
@property
def payload(self):
return self._payload
@property
def root(self):
return self._root
def up(self, graph):
existing = graph.vertices.get(self._name)
if existing:
self._existing_payload = existing.payload
self._existing_root = existing.root
vertex = existing or Vertex(self._name, self._payload)
graph.vertices[vertex.name] = vertex
if not vertex.payload:
vertex.payload = self.payload
if not vertex.root:
vertex.root = self.root
return vertex
def down(self, graph):
if self._existing_payload is not _NULL:
vertex = graph.vertices[self._name]
vertex.payload = self._existing_payload
vertex.root = self._existing_root
else:
del graph.vertices[self._name]
from .action import Action
from .edge import Edge
class DeleteEdge(Action):
def __init__(self, origin, destination, requirement):
super(DeleteEdge, self).__init__()
self._origin = origin
self._destination = destination
self._requirement = requirement
@property
def action_name(self):
return 'delete_edge'
@property
def origin(self):
return self._origin
@property
def destination(self):
return self._destination
@property
def requirement(self):
return self._requirement
def up(self, graph):
edge = self.make_edge(graph)
self._delete_first(edge.origin.outgoing_edges, edge)
self._delete_first(edge.destination.incoming_edges, edge)
return edge
def down(self, graph):
edge = self.make_edge(graph)
edge.origin.outgoing_edges.append(edge)
edge.origin.incoming_edges.append(edge)
def make_edge(self, graph):
return Edge(
graph.vertex_named(self._origin),
graph.vertex_named(self._destination),
self._requirement
)
def _delete_first(self, elements, element):
"""
:type elements: list
"""
try:
index = elements.index(element)
except ValueError:
return
del elements[index]
from .action import Action
class DetachVertexNamed(Action):
def __init__(self, name):
super(DetachVertexNamed, self).__init__()
self._name = name
self._vertex = None
@property
def action_name(self):
return 'detach_vertex'
@property
def name(self):
return self._name
def up(self, graph):
if self._name not in graph.vertices:
return []
self._vertex = graph.vertices[self._name]
del graph.vertices[self._name]
removed_vertices = [self._vertex]
for e in self._vertex.outgoing_edges:
v = e.destination
try:
v.incoming_edges.remove(e)
except ValueError:
pass
if not v.root and not v.incoming_edges:
removed_vertices += graph.detach_vertex_named(v.name)
for e in self._vertex.incoming_edges:
v = e.origin
try:
v.outgoing_edges.remove(e)
except ValueError:
pass
return removed_vertices
def down(self, graph):
if self._vertex is None:
return
graph.vertices[self._vertex.name] = self._vertex
for e in self._vertex.outgoing_edges:
e.destination.incoming_edges.append(e)
for e in self._vertex.incoming_edges:
e.origin.outgoing_edges.append(e)
class Edge:
"""
A directed edge of a DependencyGraph
"""
def __init__(self, origin, destination, requirement):
self._origin = origin
self._destination = destination
self._requirement = requirement
@property
def origin(self):
return self._origin
@property
def destination(self):
return self._destination
@property
def requirement(self):
return self._requirement
def __eq__(self, other):
return self._origin == other.origin and self._destination == other.destination
def __repr__(self):
return '<Edge {} -> {}>'.format(
self._origin.name, self._destination.name
)
from .add_edge_no_circular import AddEdgeNoCircular
from .add_vertex import AddVertex
from .delete_edge import DeleteEdge
from .detach_vertex_named import DetachVertexNamed
from .set_payload import SetPayload
from .tag import Tag
class Log:
"""
A log for dependency graph actions.
"""
def __init__(self):
self._current_action = None
self._first_action = None
def tag(self, graph, tag):
"""
Tags the current state of the dependency as the given tag.
"""
return self._push_action(graph, Tag(tag))
def add_vertex(self, graph, name, payload, root):
return self._push_action(graph, AddVertex(name, payload, root))
def detach_vertex_named(self, graph, name):
return self._push_action(graph, DetachVertexNamed(name))
def add_edge_no_circular(self, graph, origin, destination, requirement):
action = AddEdgeNoCircular(origin, destination, requirement)
return self._push_action(graph, action)
def delete_edge(self, graph, origin, destination, requirement):
action = DeleteEdge(origin, destination, requirement)
return self._push_action(graph, action)
def set_payload(self, graph, name, payload):
return self._push_action(graph, SetPayload(name, payload))
def pop(self, graph):
action = self._current_action
if not action:
return
self._current_action = action.previous
if not self._current_action:
self._first_action = None
action.down(graph)
return action
def rewind_to(self, graph, tag):
while True:
action = self.pop(graph)
if not action:
raise ValueError('No tag "{}" found'.format(tag))
if isinstance(action, Tag) and action.tag == tag:
break
def _push_action(self, graph, action):
"""
Adds the given action to the log, running the action
:param graph: The graph
:param action: The action
:type action: Action
"""
action.previous = self._current_action
if self._current_action:
self._current_action.next = action
self._current_action = action
if not self._first_action:
self._first_action = action
return action.up(graph)
from .action import Action
class SetPayload(Action):
def __init__(self, name, payload):
super(SetPayload, self).__init__()
self._name = name
self._payload = payload
self._old_payload = None
@property
def action_name(self):
return 'set_payload'
@property
def name(self):
return self._name
@property
def payload(self):
return self._payload
def up(self, graph):
vertex = graph.vertex_named(self._name)
self._old_payload = vertex.payload
vertex.payload = self._payload
def down(self, graph):
graph.vertex_named(self._name).payload = self._old_payload
from .action import Action
class Tag(Action):
def __init__(self, tag):
super(Tag, self).__init__()
self._tag = tag
@property
def action_name(self):
return 'tag'
@property
def tag(self):
return self._tag
def up(self, graph):
pass
def down(self, graph):
pass
from ..utils import unique
class Vertex:
def __init__(self, name, payload):
self.name = name
self.payload = payload
self.root = False
self._explicit_requirements = []
self.outgoing_edges = []
self.incoming_edges = []
@property
def explicit_requirements(self):
return self._explicit_requirements
@property
def requirements(self):
return unique([
edge.requirement for edge in self.incoming_edges
] + self._explicit_requirements)
@property
def predecessors(self):
return [edge.origin for edge in self.incoming_edges]
@property
def recursive_predecessors(self):
return self._recursive_predecessors()
def _recursive_predecessors(self, vertices=None):
if vertices is None:
vertices = set()
for edge in self.incoming_edges:
vertex = edge.origin
if vertex in vertices:
continue
vertices.add(vertex)
vertex._recursive_predecessors(vertices)
return vertices
@property
def successors(self):
return [
edge.destination for edge in self.outgoing_edges
]
@property
def recursive_successors(self):
return self._recursive_successors()
def _recursive_successors(self, vertices=None):
if vertices is None:
vertices = set()
for edge in self.outgoing_edges:
vertex = edge.destination
if vertex in vertices:
continue
vertices.add(vertex)
vertex._recursive_successors(vertices)
return vertices
def __eq__(self, other):
if not isinstance(other, Vertex):
return NotImplemented
if self is other:
return True
return (
self.name == other.name
and self.payload == other.payload
and set(self.successors) == set(other.successors)
)
def __hash__(self):
return hash(self.name)
def has_path_to(self, other):
return (
self == other
or any([v.has_path_to(other) for v in self.successors])
)
def is_ancestor(self, other):
return other.path_to(self)
def __repr__(self):
return '<Vertex {} ({})>'.format(self.name, self.payload)
def flat_map(iter, callable):
if not isinstance(iter, (list, tuple)):
yield callable(iter)
else:
for v in iter:
for i in flat_map(v, callable):
yield i
......@@ -16,7 +16,7 @@ class PartialSolution:
# what's true for the eventual set of package versions that will comprise the
# total solution.
#
# See https://github.com/dart-lang/pub/tree/master/doc/solver.md#partial-solution.
# See https://github.com/dart-lang/mixology/tree/master/doc/solver.md#partial-solution.
"""
def __init__(self):
......
class PossibilitySet:
def __init__(self, dependencies, possibilities):
self.dependencies = dependencies
self.possibilities = possibilities
@property
def latest_version(self):
if self.possibilities:
return self.possibilities[-1]
def __str__(self):
return '[{}]'.format(', '.join([str(p) for p in self.possibilities]))
def __repr__(self):
return '<PossibilitySet {}>'.format(str(self))
# -*- coding: utf-8 -*-
import logging
from copy import copy
from datetime import datetime
from typing import Any
from typing import List
from .contracts import SpecificationProvider
from .contracts import UI
from .exceptions import CircularDependencyError
from .exceptions import VersionConflict
from .conflict import Conflict
from .dependency_graph import DependencyGraph
from .helpers import flat_map
from .possibility_set import PossibilitySet
from .state import DependencyState
from .unwind_details import UnwindDetails
from .utils import unique
logger = logging.getLogger(__name__)
class Resolution:
def __init__(self,
provider, # type: SpecificationProvider
ui, # type: UI
requested, # type: List[Any]
base # type: DependencyGraph
):
self._provider = provider
self._ui = ui
self._requested = requested
self._original_requested = copy(requested)
self._base = base
self._states = []
self._iteration_counter = 0
self._progress_rate = 0.33
self._iteration_rate = None
self._parents_of = {}
self._started_at = None
@property
def provider(self): # type: () -> SpecificationProvider
return self._provider
@property
def ui(self): # type: () -> UI
return self._ui
@property
def requested(self): # type: () -> List[Any]
return self._requested
@property
def base(self): # type: () -> DependencyGraph
return self._base
@property
def activated(self): # type: () -> DependencyGraph
return self.state.activated
def resolve(self): # type: () -> DependencyGraph
"""
Resolve the original requested dependencies into a full
dependency graph.
"""
self._start()
try:
while self.state:
if not self.state.requirement and not self.state.requirements:
break
self._indicate_progress()
if hasattr(self.state, 'pop_possibility_state'):
self._debug(
'Creating possibility state for {} ({} remaining)'
.format(
str(self.state.requirement),
len(self.state.possibilities)
)
)
s = self.state.pop_possibility_state()
if s:
self._states.append(s)
self.activated.tag(s)
self._process_topmost_state()
return self._resolve_activated_specs()
finally:
self._end()
def _start(self): # type: () -> None
"""
Set up the resolution process.
"""
self._started_at = datetime.now()
self._debug(
'Starting resolution ({})\nRequested dependencies: {}'.format(
self._started_at,
[str(d) for d in self._original_requested]
)
)
self._ui.before_resolution()
self._handle_missing_or_push_dependency_state(self._initial_state())
def _resolve_activated_specs(self): # type: () -> DependencyGraph
for vertex in self.activated.vertices.values():
if not vertex.payload:
continue
latest_version = None
for possibility in reversed(list(vertex.payload.possibilities)):
if all(
[
self._provider.is_requirement_satisfied_by(
req, self.activated, possibility
)
for req in vertex.requirements
]
):
latest_version = possibility
break
self.activated.set_payload(vertex.name, latest_version)
return self.activated
def _end(self): # type: () -> None
"""
Ends the resolution process
"""
elapsed = (datetime.now() - self._started_at).total_seconds()
self._ui.after_resolution()
self._debug(
'Finished resolution ({} steps) '
'in {:.3f} seconds'.format(
self._iteration_counter, elapsed
)
)
def _process_topmost_state(self): # type: () -> None
"""
Processes the topmost available RequirementState on the stack.
"""
try:
if self.possibility:
self._attempt_to_activate()
else:
self._create_conflict()
self._unwind_for_conflict()
except CircularDependencyError as e:
self._create_conflict(e)
self._unwind_for_conflict()
@property
def possibility(self): # type: () -> PossibilitySet
"""
The current possibility that the resolution is trying.
"""
if self.state.possibilities:
return self.state.possibilities[-1]
@property
def state(self): # type: () -> DependencyState
"""
The current state the resolution is operating upon.
"""
if self._states:
return self._states[-1]
@property
def name(self): # type: () -> str
return self.state.name
@property
def requirement(self): # type: () -> Any
return self.state.requirement
def _initial_state(self): # type: () -> DependencyState
"""
Create the initial state for the resolution, based upon the
requested dependencies.
"""
graph = DependencyGraph()
for requested in self._original_requested:
vertex = graph.add_vertex(
self._provider.name_for(requested), None, True
)
vertex.explicit_requirements.append(requested)
graph.tag('initial_state')
requirements = self._provider.sort_dependencies(
self._original_requested, graph, {}
)
initial_requirement = None
if requirements:
initial_requirement = requirements.pop(0)
name = None
if initial_requirement:
name = self._provider.name_for(initial_requirement)
return DependencyState(
name,
requirements,
graph,
initial_requirement,
self._possibilities_for_requirement(initial_requirement, graph),
0,
{},
[]
)
def _unwind_for_conflict(self): # type: () -> None
"""
Unwinds the states stack because a conflict has been encountered
"""
details_for_unwind = self._build_details_for_unwind()
unwind_options = self.state.unused_unwind_options
self._debug(
'Unwinding for conflict: '
'{} to {}'.format(
str(self.state.requirement),
details_for_unwind.state_index // 2
),
self.state.depth
)
conflicts = self.state.conflicts
sliced_states = self._states[details_for_unwind.state_index + 1:]
self._states = self._states[:details_for_unwind.state_index + 1]
self._raise_error_unless_state(conflicts)
if sliced_states:
self.activated.rewind_to(
sliced_states[0] or 'initial_state'
)
self.state.conflicts = conflicts
self.state.unused_unwind_options = unwind_options
self._filter_possibilities_after_unwind(details_for_unwind)
index = len(self._states) - 1
for k, l in self._parents_of.items():
self._parents_of[k] = [x for x in l if x < index]
self.state.unused_unwind_options = [
uw
for uw in self.state.unused_unwind_options
if uw.state_index < index
]
def _raise_error_unless_state(self, conflicts): # type: (dict) -> None
"""
Raise a VersionConflict error, or any underlying error,
if there is no current state
"""
if self.state:
return
errors = [c.underlying_error
for c in conflicts.values()
if c.underlying_error is not None]
if errors:
error = errors[0]
else:
error = VersionConflict(conflicts, self._provider)
raise error
def _build_details_for_unwind(self): # type: () -> UnwindDetails
"""
Return the details of the nearest index to which we could unwind.
"""
# Get the possible unwinds for the current conflict
current_conflict = self.state.conflicts[self.state.name]
binding_requirements = self._binding_requirements_for_conflict(
current_conflict
)
unwind_details = self._unwind_options_for_requirements(
binding_requirements
)
last_detail_for_current_unwind = sorted(unwind_details)[-1]
current_detail = last_detail_for_current_unwind
# Look for past conflicts that could be unwound to affect the
# requirement tree for the current conflict
relevant_unused_unwinds = []
for alternative in self.state.unused_unwind_options:
intersecting_requirements = (
set(last_detail_for_current_unwind.all_requirements)
&
set(alternative.requirements_unwound_to_instead)
)
if not intersecting_requirements:
continue
# Find the highest index unwind whilst looping through
if alternative > current_detail:
current_detail = alternative
relevant_unused_unwinds.append(alternative)
# Add the current unwind options to the `unused_unwind_options` array.
# The "used" option will be filtered out during `unwind_for_conflict`.
self.state.unused_unwind_options += [
detail
for detail in unwind_details
if detail.state_index != -1
]
# Update the requirements_unwound
# to_instead on any relevant unused unwinds
for d in relevant_unused_unwinds:
d.requirements_unwound_to_instead.append(
current_detail.state_requirement
)
for d in unwind_details:
d.requirements_unwound_to_instead.append(
current_detail.state_requirement
)
return current_detail
def _unwind_options_for_requirements(self, binding_requirements
): # type: (list) -> List[UnwindDetails]
unwind_details = []
trees = []
for r in reversed(binding_requirements):
partial_tree = [r]
trees.append(partial_tree)
unwind_details.append(
UnwindDetails(
-1, None, partial_tree, binding_requirements, trees, []
)
)
# If this requirement has alternative possibilities,
# check if any would satisfy the other requirements
# that created this conflict
requirement_state = self._find_state_for(r)
if self._conflict_fixing_possibilities(requirement_state,
binding_requirements):
unwind_details.append(
UnwindDetails(
self._states.index(requirement_state),
r,
partial_tree,
binding_requirements,
trees,
[]
)
)
# Next, look at the parent of this requirement,
# and check if the requirement could have been avoided
# if an alternative PossibilitySet had been chosen
parent_r = self._parent_of(r)
if parent_r is None:
continue
partial_tree.insert(0, parent_r)
requirement_state = self._find_state_for(parent_r)
possibilities = [
r.name in map(lambda x: x.name, set_.dependencies)
for set_ in requirement_state.possibilities
]
if any(possibilities):
unwind_details.append(
UnwindDetails(
self._states.index(requirement_state),
parent_r,
partial_tree,
binding_requirements,
trees,
[]
)
)
# Finally, look at the grandparent and up of this requirement,
# looking for any possibilities that wouldn't
# create their parent requirement
grandparent_r = self._parent_of(parent_r)
while grandparent_r is not None:
partial_tree.insert(0, grandparent_r)
requirement_state = self._find_state_for(grandparent_r)
possibilities = [
parent_r.name in map(lambda x: x.name, set_.dependencies)
for set_ in requirement_state.possibilities
]
if any(possibilities):
unwind_details.append(
UnwindDetails(
self._states.index(requirement_state),
grandparent_r,
partial_tree,
binding_requirements,
trees,
[]
)
)
parent_r = grandparent_r
grandparent_r = self._parent_of(parent_r)
return unwind_details
def _conflict_fixing_possibilities(self, state, binding_requirements):
"""
Return whether or not the given state has any possibilities
that could satisfy the given requirements
:rtype: bool
"""
if not state:
return False
return any([
any([
self._possibility_satisfies_requirements(
poss, binding_requirements
)
])
for possibility_set in state.possibilities
for poss in possibility_set.possibilities
])
def _filter_possibilities_after_unwind(self, unwind_details):
"""
Filter a state's possibilities to remove any that would not fix the
conflict we've just rewound from
:type unwind_details: UnwindDetails
"""
if not self.state or not self.state.possibilities:
return
if unwind_details.unwinding_to_primary_requirement():
self._filter_possibilities_for_primary_unwind(unwind_details)
else:
self._filter_possibilities_for_parent_unwind(unwind_details)
def _filter_possibilities_for_primary_unwind(self, unwind_details):
"""
Filter a state's possibilities to remove any that would not satisfy
the requirements in the conflict we've just rewound from.
:type unwind_details: UnwindDetails
"""
unwinds_to_state = [
uw
for uw in self.state.unused_unwind_options
if uw.state_index == unwind_details.state_index
]
unwinds_to_state.append(unwind_details)
unwind_requirement_sets = [
uw.conflicting_requirements
for uw in unwinds_to_state
]
possibilities = []
for possibility_set in self.state.possibilities:
if not any([
any([
self._possibility_satisfies_requirements(
poss, requirements
)
])
for poss in possibility_set.possibilities
for requirements in unwind_requirement_sets
]):
continue
possibilities.append(possibility_set)
self.state.possibilities = possibilities
def _possibility_satisfies_requirements(self, possibility, requirements):
name = self._provider.name_for(possibility)
self.activated.tag('swap')
if self.activated.vertex_named(name):
self.activated.set_payload(name, possibility)
satisfied = all([
self._provider.is_requirement_satisfied_by(
r, self.activated, possibility
)
for r in requirements
])
self.activated.rewind_to('swap')
return satisfied
def _filter_possibilities_for_parent_unwind(self,
unwind_details # type: UnwindDetails
):
"""
Filter a state's possibilities to remove any that would (eventually)
the requirements in the conflict we've just rewound from.
"""
unwinds_to_state = [
uw
for uw in self.state.unused_unwind_options
if uw.state_index == unwind_details.state_index
]
unwinds_to_state.append(unwind_details)
primary_unwinds = unique([
uw
for uw in unwinds_to_state
if uw.unwinding_to_primary_requirement()
])
parent_unwinds = unique(unwinds_to_state)
parent_unwinds = [uw for uw in parent_unwinds if uw not in primary_unwinds]
allowed_possibility_sets = []
for unwind in primary_unwinds:
for possibility_set in self._states[unwind.state_index].possibilities:
if any([
self._possibility_satisfies_requirements(
poss, unwind.conflicting_requirements
)
for poss in possibility_set.possibilities
]):
allowed_possibility_sets.append(possibility_set)
requirements_to_avoid = list(flat_map(
parent_unwinds,
lambda x: x.sub_dependencies_to_avoid
))
possibilities = []
for possibility_set in self.state.possibilities:
if (
possibility_set in allowed_possibility_sets
or [
r
for r in requirements_to_avoid
if r not in possibility_set.dependencies
]
):
possibilities.append(possibility_set)
self.state.possibilities = possibilities
def _binding_requirements_for_conflict(self, conflict):
"""
Return the minimal list of requirements that would cause the passed
conflict to occur.
:rtype: list
"""
if conflict.possibility is None:
return [conflict.requirement]
possible_binding_requirements_set = list(conflict.requirements.values())
possible_binding_requirements = []
for reqs in possible_binding_requirements_set:
if isinstance(reqs, list):
possible_binding_requirements += reqs
else:
possible_binding_requirements.append(reqs)
possible_binding_requirements = unique(possible_binding_requirements)
# When there’s a `CircularDependency` error the conflicting requirement
# (the one causing the circular) won’t be `conflict.requirement`
# (which won’t be for the right state, because we won’t have created it,
# because it’s circular).
# We need to make sure we have that requirement in the conflict’s list,
# otherwise we won’t be able to unwind properly, so we just return all
# the requirements for the conflict.
if conflict.underlying_error:
return possible_binding_requirements
possibilities = self._provider.search_for(conflict.requirement)
# If all the requirements together don't filter out all possibilities,
# then the only two requirements we need to consider are the initial one
# (where the dependency's version was first chosen) and the last
if self._binding_requirement_in_set(
None, possible_binding_requirements,
possibilities
):
return list(filter(None, [
conflict.requirement,
self._requirement_for_existing_name(
self._provider.name_for(conflict.requirement)
)
]))
# Loop through the possible binding requirements, removing each one
# that doesn't bind. Use a reversed as we want the earliest set of
# binding requirements.
binding_requirements = copy(possible_binding_requirements)
for req in reversed(possible_binding_requirements):
if req == conflict.requirement:
continue
if not self._binding_requirement_in_set(
req, binding_requirements, possibilities
):
index = binding_requirements.index(req)
del binding_requirements[index]
return binding_requirements
def _binding_requirement_in_set(self,
requirement,
possible_binding_requirements,
possibilities): # type: () -> bool
"""
Return whether or not the given requirement is required
to filter out all elements of the list of possibilities.
"""
return any([
self._possibility_satisfies_requirements(
poss,
set(possible_binding_requirements) - set([requirement])
)
for poss in possibilities
])
def _parent_of(self, requirement):
if not requirement:
return
if requirement not in self._parents_of:
self._parents_of[requirement] = []
if not self._parents_of[requirement]:
return
try:
index = self._parents_of[requirement][-1]
except ValueError:
return
try:
parent_state = self._states[index]
except ValueError:
return
return parent_state.requirement
def _requirement_for_existing_name(self, name):
vertex = self.activated.vertex_named(name)
if not vertex:
return
if not vertex.payload:
return
for s in self._states:
if s.name == name:
return s.requirement
def _find_state_for(self, requirement):
if not requirement:
return
for s in self._states:
if s.requirement == requirement:
return s
def _create_conflict(self, underlying_error=None):
vertex = self.activated.vertex_named(self.state.name)
locked_requirement = self._locked_requirement_named(self.state.name)
requirements = {}
if vertex.explicit_requirements:
requirements[self._provider.name_for_explicit_dependency_source] = vertex.explicit_requirements
if locked_requirement:
requirements[self._provider.name_for_locking_dependency_source] = [locked_requirement]
for edge in vertex.incoming_edges:
if edge.origin.payload.latest_version not in requirements:
requirements[edge.origin.payload.latest_version] = []
requirements[edge.origin.payload.latest_version].insert(0, edge.requirement)
activated_by_name = {}
for v in self.activated:
if v.payload:
activated_by_name[v.name] = v.payload.latest_version
conflict = Conflict(
self.requirement,
requirements,
vertex.payload.latest_version if vertex.payload else None,
self.possibility,
locked_requirement,
self.requirement_trees,
activated_by_name,
underlying_error
)
self.state.conflicts[self.name] = conflict
return conflict
@property
def requirement_trees(self):
vertex = self.activated.vertex_named(self.state.name)
return [self._requirement_tree_for(r) for r in vertex.requirements]
def _requirement_tree_for(self, requirement):
tree = []
while requirement:
tree.insert(0, requirement)
requirement = self._parent_of(requirement)
return tree
def _indicate_progress(self):
self._iteration_counter += 1
progress_rate = self._ui.progress_rate or self._progress_rate
if self._iteration_rate is None:
if (datetime.now() - self._started_at).total_seconds() >= progress_rate:
self._iteration_rate = self._iteration_counter
if self._iteration_rate and (self._iteration_counter % self._iteration_rate) == 0:
self._ui.indicate_progress()
def _debug(self, message, depth=0):
self._ui.debug(message, depth)
def _attempt_to_activate(self):
self._debug(
'Attempting to activate {}'.format(str(self.possibility)),
self.state.depth,
)
existing_vertex = self.activated.vertex_named(self.state.name)
if existing_vertex.payload:
self._debug(
'Found existing spec ({})'.format(existing_vertex.payload),
self.state.depth
)
self._attempt_to_filter_existing_spec(existing_vertex)
else:
latest = self.possibility.latest_version
possibilities = []
for possibility in self.possibility.possibilities:
if self._provider.is_requirement_satisfied_by(
self.requirement, self.activated, possibility
):
possibilities.append(possibility)
self.possibility.possibilities = possibilities
if self.possibility.latest_version is None:
# ensure there's a possibility for better error messages
if latest:
self.possibility.possibilities.append(latest)
self._create_conflict()
self._unwind_for_conflict()
else:
self._activate_new_spec()
def _attempt_to_filter_existing_spec(self, vertex):
"""
Attempt to update the existing vertex's
`PossibilitySet` with a filtered version.
"""
filtered_set = self._filtered_possibility_set(vertex)
if filtered_set.possibilities:
self.activated.set_payload(self.name, filtered_set)
new_requirements = copy(self.state.requirements)
self._push_state_for_requirements(new_requirements, False)
else:
self._create_conflict()
self._debug(
'Unsatisfied by existing spec ({})'.format(str(vertex.payload)),
self.state.depth
)
self._unwind_for_conflict()
def _filtered_possibility_set(self, vertex):
possibilities = [
p
for p in vertex.payload.possibilities
if p in self.possibility.possibilities
]
return PossibilitySet(
vertex.payload.dependencies,
possibilities
)
def _locked_requirement_named(self, requirement_name):
vertex = self.base.vertex_named(requirement_name)
if vertex:
return vertex.payload
def _activate_new_spec(self):
if self.state.name in self.state.conflicts:
del self.state.conflicts[self.name]
self._debug(
'Activated {} at {}'.format(self.state.name, str(self.possibility)),
self.state.depth
)
self.activated.set_payload(self.state.name, self.possibility)
self._require_nested_dependencies_for(self.possibility)
def _require_nested_dependencies_for(self, possibility_set):
nested_dependencies = self._provider.dependencies_for(
possibility_set.latest_version
)
self._debug(
'Requiring nested dependencies '
'({})'.format(', '.join([str(d) for d in nested_dependencies])),
self.state.depth
)
for d in nested_dependencies:
self.activated.add_child_vertex(
self._provider.name_for(d),
None,
[self._provider.name_for(possibility_set.latest_version)],
d
)
parent_index = len(self._states) - 1
if d not in self._parents_of:
self._parents_of[d] = []
parents = self._parents_of[d]
if not parents:
parents.append(parent_index)
self._push_state_for_requirements(
self.state.requirements + nested_dependencies,
len(nested_dependencies) > 0
)
def _push_state_for_requirements(self,
new_requirements,
requires_sort=True,
new_activated=None):
if new_activated is None:
new_activated = self.activated
if requires_sort:
new_requirements = self._provider.sort_dependencies(
unique(new_requirements), new_activated, self.state.conflicts
)
while True:
new_requirement = None
if new_requirements:
new_requirement = new_requirements.pop(0)
if (
new_requirement is None
or not any([
s.requirement == new_requirement
for s in self._states
])
):
break
new_name = ''
if new_requirement:
new_name = self._provider.name_for(new_requirement)
possibilities = self._possibilities_for_requirement(new_requirement)
self._handle_missing_or_push_dependency_state(
DependencyState(
new_name, new_requirements, new_activated,
new_requirement, possibilities, self.state.depth,
copy(self.state.conflicts),
copy(self.state.unused_unwind_options)
)
)
def _possibilities_for_requirement(self, requirement, activated=None):
if activated is None:
activated = self.activated
if not requirement:
return []
if self._locked_requirement_named(self._provider.name_for(requirement)):
return self._locked_requirement_possibility_set(
requirement, activated
)
return self._group_possibilities(
self._provider.search_for(requirement)
)
def _locked_requirement_possibility_set(self, requirement, activated=None):
if activated is None:
activated = self.activated
all_possibilities = self._provider.search_for(requirement)
locked_requirement = self._locked_requirement_named(
self._provider.name_for(requirement)
)
# Longwinded way to build a possibilities list with either the locked
# requirement or nothing in it. Required, since the API for
# locked_requirement isn't guaranteed.
locked_possibilities = [
possibility
for possibility in all_possibilities
if self._provider.is_requirement_satisfied_by(
locked_requirement, activated, possibility
)
]
return self._group_possibilities(locked_possibilities)
def _group_possibilities(self, possibilities):
possibility_sets = []
current_possibility_set = None
for possibility in reversed(possibilities):
dependencies = self._provider.dependencies_for(possibility)
if current_possibility_set and current_possibility_set.dependencies == dependencies:
current_possibility_set.possibilities.insert(0, possibility)
else:
possibility_sets.insert(
0, PossibilitySet(dependencies, [possibility])
)
current_possibility_set = possibility_sets[0]
return possibility_sets
def _handle_missing_or_push_dependency_state(self, state):
if (
state.requirement
and not state.possibilities
and self._provider.allow_missing(state.requirement)
):
state.activated.detach_vertex_named(state.name)
self._push_state_for_requirements(
copy(state.requirements), False, state.activated
)
else:
self._states.append(state)
state.activated.tag(state)
from typing import Any
from typing import List
from typing import Union
from .contracts import SpecificationProvider
from .contracts import UI
from .dependency_graph import DependencyGraph
from .resolution import Resolution
class Resolver:
def __init__(self,
specification_provider, # type: SpecificationProvider
resolver_ui # type: UI
):
self._specification_provider = specification_provider
self._resolver_ui = resolver_ui
@property
def specification_provider(self): # type: () -> SpecificationProvider
return self._specification_provider
@property
def ui(self): # type: () -> UI
return self._resolver_ui
def resolve(self,
requested, # type: List[Any]
base=None # type: Union[DependencyGraph, None]
): # type: (...) -> DependencyGraph
if base is None:
base = DependencyGraph()
return Resolution(
self._specification_provider,
self._resolver_ui,
requested,
base
).resolve()
from copy import copy
from .dependency_graph import DependencyGraph
class ResolutionState:
def __init__(self, name, requirements, activated,
requirement, possibilities, depth,
conflicts, unused_unwind_options):
self._name = name
self._requirements = requirements
self._activated = activated
self._requirement = requirement
self.possibilities = possibilities
self._depth = depth
self.conflicts = conflicts
self.unused_unwind_options = unused_unwind_options
@property
def name(self):
return self._name
@property
def requirements(self):
return self._requirements
@property
def activated(self):
return self._activated
@property
def requirement(self):
return self._requirement
@property
def depth(self):
return self._depth
@classmethod
def empty(cls):
return cls(None, [], DependencyGraph(), None, None, 0, {}, [])
def __repr__(self):
return '<{} {} ({})>'.format(
self.__class__.__name__,
self._name,
str(self.requirement)
)
class PossibilityState(ResolutionState):
pass
class DependencyState(ResolutionState):
def pop_possibility_state(self):
state = PossibilityState(
self._name,
copy(self._requirements),
self._activated,
self._requirement,
[self.possibilities.pop() if self.possibilities else None],
self._depth + 1,
copy(self.conflicts),
copy(self.unused_unwind_options)
)
state.activated.tag(state)
return state
class UnwindDetails:
def __init__(self,
state_index,
state_requirement,
requirement_tree,
conflicting_requirements,
requirement_trees,
requirements_unwound_to_instead):
self.state_index = state_index
self.state_requirement = state_requirement
self.requirement_tree = requirement_tree
self.conflicting_requirements = conflicting_requirements
self.requirement_trees = requirement_trees
self.requirements_unwound_to_instead = requirements_unwound_to_instead
self._reversed_requirement_tree_index = None
self._sub_dependencies_to_avoid = None
self._all_requirements = None
@property
def reversed_requirement_tree_index(self):
if self._reversed_requirement_tree_index is None:
if self.state_requirement:
self._reversed_requirement_tree_index = list(reversed(
self.requirement_tree
)).index(self.state_requirement)
else:
self._reversed_requirement_tree_index = 999999
return self._reversed_requirement_tree_index
def unwinding_to_primary_requirement(self):
return self.requirement_tree[-1] == self.state_requirement
@property
def sub_dependencies_to_avoid(self):
if self._sub_dependencies_to_avoid is None:
self._sub_dependencies_to_avoid = []
for tree in self.requirement_trees:
try:
index = tree.index(self.state_requirement)
except ValueError:
continue
if tree[index + 1] is not None:
self._sub_dependencies_to_avoid.append(tree[index + 1])
return self._sub_dependencies_to_avoid
@property
def all_requirements(self):
if self._all_requirements is None:
self._all_requirements = [
x
for tree in self.requirement_trees
for x in tree
]
return self._all_requirements
def __eq__(self, other):
if not isinstance(other, UnwindDetails):
return NotImplemented
return (
self.state_index == other.state_index
and (
self.reversed_requirement_tree_index
== other.reversed_requirement_tree_index
)
)
def __lt__(self, other):
if not isinstance(other, UnwindDetails):
return NotImplemented
return self.state_index < other.state_index
def __le__(self, other):
if not isinstance(other, UnwindDetails):
return NotImplemented
return self.state_index <= other.state_index
def __gt__(self, other):
if not isinstance(other, UnwindDetails):
return NotImplemented
return self.state_index > other.state_index
def __ge__(self, other):
if not isinstance(other, UnwindDetails):
return NotImplemented
return self.state_index >= other.state_index
def __hash__(self):
return hash((id(self), self.state_index, self.state_requirement))
def unique(l):
used = set()
return [x for x in l if x not in used and (used.add(x) or True)]
......@@ -6,8 +6,8 @@ from typing import List
from typing import Union
from poetry.packages import Dependency
from poetry.semver.semver import Version
from poetry.semver.semver import VersionRange
from poetry.semver import Version
from poetry.semver import VersionRange
from .failure import SolveFailure
from .incompatibility import Incompatibility
......
import operator
import re
from poetry.semver.constraints import EmptyConstraint
from poetry.semver.constraints import MultiConstraint
from poetry.semver.constraints.base_constraint import BaseConstraint
from .base_constraint import BaseConstraint
from .empty_constraint import EmptyConstraint
from .multi_constraint import MultiConstraint
class GenericConstraint(BaseConstraint):
......
import poetry.packages
from poetry.semver.constraints import EmptyConstraint
from poetry.semver.constraints import MultiConstraint
from poetry.semver.semver import parse_constraint
from poetry.semver.semver import Version
from poetry.semver.semver import VersionConstraint
from poetry.semver.semver import VersionUnion
from poetry.semver import parse_constraint
from poetry.semver import Version
from poetry.semver import VersionConstraint
from poetry.semver import VersionUnion
from poetry.utils.helpers import canonicalize_name
from .constraints.empty_constraint import EmptyConstraint
from .constraints.generic_constraint import GenericConstraint
from .constraints.multi_constraint import MultiConstraint
class Dependency(object):
......
......@@ -4,18 +4,14 @@ import re
from typing import Union
from poetry.semver.constraints import Constraint
from poetry.semver.constraints import EmptyConstraint
from poetry.semver.helpers import parse_stability
from poetry.semver.version_parser import VersionParser
from poetry.semver.semver import Version
from poetry.semver.semver import parse_constraint
from poetry.semver import Version
from poetry.semver import parse_constraint
from poetry.spdx import license_by_id
from poetry.spdx import License
from poetry.utils._compat import Path
from poetry.utils.helpers import canonicalize_name
from poetry.version import parse as parse_version
from .constraints.empty_constraint import EmptyConstraint
from .constraints.generic_constraint import GenericConstraint
from .dependency import Dependency
from .directory_dependency import DirectoryDependency
......
from .version_solver import VersionSolver
def resolve_version(root, provider, locked=None, use_latest=None):
solver = VersionSolver(root, provider, locked=locked, use_latest=use_latest)
with provider.progress():
return solver.solve()
......@@ -11,11 +11,6 @@ from typing import Dict
from typing import List
from typing import Union
from poetry.mixology import DependencyGraph
from poetry.mixology.conflict import Conflict
from poetry.mixology.contracts import SpecificationProvider
from poetry.mixology.contracts import UI
from poetry.packages import Dependency
from poetry.packages import DirectoryDependency
from poetry.packages import FileDependency
......@@ -23,11 +18,11 @@ from poetry.packages import Package
from poetry.packages import VCSDependency
from poetry.packages import dependency_from_pep_508
from poetry.pub.incompatibility import Incompatibility
from poetry.pub.incompatibility_cause import DependencyCause
from poetry.pub.incompatibility_cause import PlatformCause
from poetry.pub.incompatibility_cause import PythonCause
from poetry.pub.term import Term
from poetry.mixology.incompatibility import Incompatibility
from poetry.mixology.incompatibility_cause import DependencyCause
from poetry.mixology.incompatibility_cause import PlatformCause
from poetry.mixology.incompatibility_cause import PythonCause
from poetry.mixology.term import Term
from poetry.repositories import Pool
......@@ -61,7 +56,7 @@ class Indicator(ProgressIndicator):
return '{:.1f}s'.format(elapsed)
class Provider(SpecificationProvider, UI):
class Provider:
UNSAFE_PACKAGES = {'setuptools', 'distribute', 'pip'}
......@@ -74,10 +69,8 @@ class Provider(SpecificationProvider, UI):
self._pool = pool
self._io = io
self._python_constraint = package.python_constraint
self._base_dg = DependencyGraph()
self._search_for = {}
super(Provider, self).__init__(debug=self._io.is_debug())
self._is_debugging = self._io.is_debug()
@property
def pool(self): # type: () -> Pool
......@@ -91,6 +84,9 @@ class Provider(SpecificationProvider, UI):
def name_for_locking_dependency_source(self): # type: () -> str
return 'pyproject.lock'
def is_debugging(self):
return self._is_debugging
def name_for(self, dependency): # type: (Dependency) -> str
"""
Returns the name for the given dependency.
......@@ -345,44 +341,6 @@ class Provider(SpecificationProvider, UI):
and r.name not in self.UNSAFE_PACKAGES
]
def is_requirement_satisfied_by(self,
requirement, # type: Dependency
activated, # type: DependencyGraph
package # type: Package
): # type: (...) -> bool
"""
Determines whether the given requirement is satisfied by the given
spec, in the context of the current activated dependency graph.
"""
if isinstance(requirement, Package):
return requirement == package
if not requirement.accepts(package):
return False
if package.is_prerelease() and not requirement.allows_prereleases():
vertex = activated.vertex_named(package.name)
if not any([r.allows_prereleases() for r in vertex.requirements]):
return False
return (
self._package.python_constraint.matches(package.python_constraint)
and self._package.platform_constraint.matches(package.platform_constraint)
)
def sort_dependencies(self,
dependencies, # type: List[Dependency]
activated, # type: DependencyGraph
conflicts # type: Dict[str, List[Conflict]]
): # type: (...) -> List[Dependency]
return sorted(dependencies, key=lambda d: [
0 if activated.vertex_named(d.name).payload else 1,
0 if activated.vertex_named(d.name).root else 1,
0 if d.allows_prereleases() else 1,
0 if d.name in conflicts else 1
])
# UI
@property
......
from typing import List
from poetry.pub import resolve_version
from poetry.pub.failure import SolveFailure
from poetry.mixology import resolve_version
from poetry.mixology.failure import SolveFailure
from poetry.packages.constraints.generic_constraint import GenericConstraint
from poetry.semver.semver import parse_constraint
from poetry.semver import parse_constraint
from .exceptions import SolverProblemError
......
......@@ -19,9 +19,9 @@ import poetry.packages
from poetry.locations import CACHE_DIR
from poetry.packages import Package
from poetry.packages import dependency_from_pep_508
from poetry.semver.constraints import Constraint
from poetry.semver.constraints.base_constraint import BaseConstraint
from poetry.semver.version_parser import VersionParser
from poetry.semver import parse_constraint
from poetry.semver import Version
from poetry.semver import VersionConstraint
from poetry.utils._compat import Path
from poetry.version.markers import InvalidMarker
......@@ -70,9 +70,8 @@ class LegacyRepository(PyPiRepository):
packages = []
if constraint is not None and not isinstance(constraint,
BaseConstraint):
version_parser = VersionParser()
constraint = version_parser.parse_constraints(constraint)
VersionConstraint):
constraint = parse_constraint(constraint)
key = name
if constraint:
......@@ -88,9 +87,14 @@ class LegacyRepository(PyPiRepository):
if version in versions:
continue
try:
version = Version.parse(version)
except ValueError:
continue
if (
not constraint
or (constraint and constraint.matches(Constraint('=', version)))
or (constraint and constraint.allows(version))
):
versions.append(version)
......
......@@ -29,9 +29,8 @@ from requests import session
from poetry.locations import CACHE_DIR
from poetry.packages import dependency_from_pep_508
from poetry.packages import Package
from poetry.semver.constraints import Constraint
from poetry.semver.semver import parse_constraint
from poetry.semver.semver import VersionConstraint
from poetry.semver import parse_constraint
from poetry.semver import VersionConstraint
from poetry.utils._compat import Path
from poetry.utils._compat import to_str
from poetry.utils.helpers import parse_requires
......@@ -79,7 +78,7 @@ class PyPiRepository(Repository):
def find_packages(self,
name, # type: str
constraint=None, # type: Union[Constraint, str, None]
constraint=None, # type: Union[VersionConstraint, str, None]
extras=None, # type: Union[list, None]
allow_prereleases=False # type: bool
): # type: (...) -> List[Package]
......
from poetry.semver.semver import parse_constraint
from poetry.semver.semver import VersionConstraint
from poetry.version import parse as parse_version
from poetry.semver import parse_constraint
from poetry.semver import VersionConstraint
from .base_repository import BaseRepository
......
from functools import cmp_to_key
import re
from .comparison import less_than
from .constraints import Constraint
from .helpers import normalize_version
from .empty_constraint import EmptyConstraint
from .patterns import BASIC_CONSTRAINT
from .patterns import CARET_CONSTRAINT
from .patterns import TILDE_CONSTRAINT
from .patterns import X_CONSTRAINT
from .version import Version
from .version_constraint import VersionConstraint
from .version_range import VersionRange
from .version_union import VersionUnion
SORT_ASC = 1
SORT_DESC = -1
def parse_constraint(constraints): # type: (str) -> VersionConstraint
if constraints == '*':
return VersionRange()
def satisfied_by(versions, constraints):
"""
Return all versions that satisfy given constraints.
or_constraints = re.split('\s*\|\|?\s*', constraints.strip())
or_groups = []
for constraints in or_constraints:
and_constraints = re.split(
'(?<!^)(?<![=>< ,]) *(?<!-)[, ](?!-) *(?!,|$)',
constraints
)
constraint_objects = []
:type versions: List[str]
:type constraints: str
if len(and_constraints) > 1:
for constraint in and_constraints:
constraint_objects.append(parse_single_constraint(constraint))
else:
constraint_objects.append(parse_single_constraint(and_constraints[0]))
:rtype: List[str]
"""
return [version for version in versions if statisfies(version, constraints)]
if len(constraint_objects) == 1:
constraint = constraint_objects[0]
else:
constraint = constraint_objects[0]
for next_constraint in constraint_objects[1:]:
constraint = constraint.intersect(next_constraint)
or_groups.append(constraint)
def sort(versions):
return _sort(versions, SORT_ASC)
if len(or_groups) == 1:
return or_groups[0]
else:
return VersionUnion.of(*or_groups)
def rsort(versions):
return _sort(versions, SORT_DESC)
def parse_single_constraint(constraint): # type: (str) -> VersionConstraint
m = re.match('(?i)^v?[xX*](\.[xX*])*$', constraint)
if m:
return VersionRange()
# Tilde range
m = TILDE_CONSTRAINT.match(constraint)
if m:
version = Version.parse(m.group(1))
def _sort(versions, direction):
normalized = [
(i, normalize_version(version))
for i, version in enumerate(versions)
]
normalized.sort(
key=cmp_to_key(
lambda x, y:
0 if x[1] == y[1]
else -direction * int(less_than(x[1], y[1]) or -1)
)
)
high = version.stable.next_minor
if len(m.group(1).split('.')) == 1:
high = version.stable.next_major
return VersionRange(version, high, include_min=True)
# Caret range
m = CARET_CONSTRAINT.match(constraint)
if m:
version = Version.parse(m.group(1))
return VersionRange(version, version.next_breaking, include_min=True)
# X Range
m = X_CONSTRAINT.match(constraint)
if m:
op = m.group(1)
major = int(m.group(2))
minor = m.group(3)
if minor is not None:
version = Version(major, int(minor), 0)
result = VersionRange(version, version.next_minor, include_min=True)
else:
if major == 0:
result = VersionRange(max=Version(1, 0, 0))
else:
version = Version(major, 0, 0)
result = VersionRange(version, version.next_major, include_min=True)
if op == '!=':
result = VersionRange().difference(result)
return result
# Basic comparator
m = BASIC_CONSTRAINT.match(constraint)
if m:
op = m.group(1)
version = m.group(2)
try:
version = Version.parse(version)
except ValueError:
raise ValueError('Could not parse version constraint: {}'.format(constraint))
if op == '<':
return VersionRange(max=version)
elif op == '<=':
return VersionRange(max=version, include_max=True)
elif op == '>':
return VersionRange(min=version)
elif op == '>=':
return VersionRange(min=version, include_min=True)
elif op == '!=':
return VersionUnion(
VersionRange(max=version),
VersionRange(min=version)
)
else:
return version
return [versions[i] for i, _ in normalized]
raise ValueError('Could not parse version constraint: {}'.format(constraint))
from .constraints.constraint import Constraint
def greater_than(version1, version2):
"""
Evaluates the expression: version1 > version2.
:type version1: str
:type version2: str
:rtype: bool
"""
return compare(version1, '>', version2)
def greater_than_or_equal(version1, version2):
"""
Evaluates the expression: version1 >= version2.
:type version1: str
:type version2: str
:rtype: bool
"""
return compare(version1, '>=', version2)
def less_than(version1, version2):
"""
Evaluates the expression: version1 < version2.
:type version1: str
:type version2: str
:rtype: bool
"""
return compare(version1, '<', version2)
def less_than_or_equal(version1, version2):
"""
Evaluates the expression: version1 <= version2.
:type version1: str
:type version2: str
:rtype: bool
"""
return compare(version1, '<=', version2)
def equal(version1, version2):
"""
Evaluates the expression: version1 == version2.
:type version1: str
:type version2: str
:rtype: bool
"""
return compare(version1, '==', version2)
def not_equal(version1, version2):
"""
Evaluates the expression: version1 != version2.
:type version1: str
:type version2: str
:rtype: bool
"""
return compare(version1, '!=', version2)
def compare(version1, operator, version2):
"""
Evaluates the expression: $version1 $operator $version2
:type version1: str
:type operator: str
:type version2: str
:rtype: bool
"""
constraint = Constraint(operator, version2)
return constraint.matches(Constraint('==', version1))
from .constraint import Constraint
from .empty_constraint import EmptyConstraint
from .multi_constraint import MultiConstraint
import operator
from poetry.version import parse as parse_version
from poetry.version import version_compare
from .base_constraint import BaseConstraint
class Constraint(BaseConstraint):
OP_EQ = operator.eq
OP_LT = operator.lt
OP_LE = operator.le
OP_GT = operator.gt
OP_GE = operator.ge
OP_NE = operator.ne
_trans_op_str = {
'=': OP_EQ,
'==': OP_EQ,
'<': OP_LT,
'<=': OP_LE,
'>': OP_GT,
'>=': OP_GE,
'!=': OP_NE
}
_trans_op_int = {
OP_EQ: '==',
OP_LT: '<',
OP_LE: '<=',
OP_GT: '>',
OP_GE: '>=',
OP_NE: '!='
}
def __init__(self, operator, version): # type: (str, str) -> None
if operator not in self.supported_operators:
raise ValueError(
'Invalid operator "{}" given, '
'expected one of: {}'
.format(
operator, ', '.join(self.supported_operators)
)
)
self._operator = self._trans_op_str[operator]
self._string_operator = operator
self._version = str(parse_version(version))
@property
def supported_operators(self): # type: () -> list
return list(self._trans_op_str.keys())
@property
def operator(self):
return self._operator
@property
def string_operator(self):
return self._string_operator
@property
def version(self): # type: () -> str
return self._version
def matches(self, provider):
if (
isinstance(provider, self.__class__)
and provider.__class__ is self.__class__
):
return self.match_specific(provider)
# turn matching around to find a match
return provider.matches(self)
def version_compare(self, a, b, operator
): # type: (str, str, str) -> bool
if operator not in self._trans_op_str:
raise ValueError(
'Invalid operator "{}" given, '
'expected one of: {}'
.format(
operator, ', '.join(self.supported_operators)
)
)
return version_compare(a, b, operator)
def match_specific(self, provider): # type: (Constraint) -> bool
no_equal_op = self._trans_op_int[self._operator].replace('=', '')
provider_no_equal_op = self._trans_op_int[provider.operator].replace('=', '')
is_equal_op = self.OP_EQ is self._operator
is_non_equal_op = self.OP_NE is self._operator
is_provider_equal_op = self.OP_EQ is provider.operator
is_provider_non_equal_op = self.OP_NE is provider.operator
# '!=' operator is match when other operator
# is not '==' operator or version is not match
# these kinds of comparisons always have a solution
if is_non_equal_op or is_provider_non_equal_op:
return (not is_equal_op and not is_provider_equal_op
or self.version_compare(provider.version,
self._version,
'!='))
# An example for the condition is <= 2.0 & < 1.0
# These kinds of comparisons always have a solution
if (self._operator is not self.OP_EQ
and no_equal_op == provider_no_equal_op):
return True
if self.version_compare(
provider.version,
self.version,
self._trans_op_int[self._operator]
):
# special case, e.g. require >= 1.0 and provide < 1.0
# 1.0 >= 1.0 but 1.0 is outside of the provided interval
if (
provider.version == self.version
and self._trans_op_int[provider.operator] == provider_no_equal_op
and self._trans_op_int[self.operator] != no_equal_op
):
return False
return True
return False
def __str__(self):
return '{} {}'.format(
self._trans_op_int[self._operator],
self._version
)
def __repr__(self):
return '<Constraint \'{}\'>'.format(str(self))
import re
_modifier_regex = (
'[._-]?'
'(?:(stable|beta|b|RC|c|pre|alpha|a|patch|pl|p|post|[a-z])'
'((?:[.-]?\d+)*)?)?'
'([.-]?dev)?'
)
def normalize_version(version):
"""
Normalizes a version string to be able to perform comparisons on it.
"""
version = version.strip()
# strip off build metadata
m = re.match('^([^,\s+]+)\+[^\s]+$', version)
if m:
version = m.group(1)
index = None
# Match classic versioning
m = re.match(
'(?i)^v?(\d{{1,5}})(\.\d+)?(\.\d+)?(\.\d+)?{}$'.format(
_modifier_regex
),
version
)
if m:
version = '{}{}{}{}'.format(
m.group(1),
m.group(2) if m.group(2) else '.0',
m.group(3) if m.group(3) else '.0',
m.group(4) if m.group(4) else '.0',
)
index = 5
else:
# Some versions have the form M.m.p-\d+
# which means M.m.p-post\d+
m = re.match(
'(?i)^v?(\d{1,5})(\.\d+)?(\.\d+)?(\.\d+)?-(\d+)$',
version
)
if m:
version = '{}{}{}{}'.format(
m.group(1),
m.group(2) if m.group(2) else '.0',
m.group(3) if m.group(3) else '.0',
m.group(4) if m.group(4) else '.0',
)
if m.group(5):
version += '-post.' + m.group(5)
m = re.match(
'(?i)^v?(\d{{1,5}})(\.\d+)?(\.\d+)?(\.\d+)?{}$'.format(
_modifier_regex
),
version
)
index = 5
else:
# Match date(time) based versioning
m = re.match(
'(?i)^v?(\d{{4}}(?:[.:-]?\d{{2}}){{1,6}}(?:[.:-]?\d{{1,3}})?){}$'.format(
_modifier_regex
),
version
)
if m:
version = re.sub('\D', '.', m.group(1))
index = 2
# add version modifiers if a version was matched
if index is not None:
if len(m.groups()) - 1 >= index and m.group(index):
if m.group(index) == 'post':
# Post releases should be considered
# stable releases
if '-post' in version:
return version
version = '{}-post'.format(version)
else:
version = '{}-{}'.format(
version, _expand_stability(m.group(index))
)
if m.group(index + 1):
version = '{}.{}'.format(
version, m.group(index + 1).lstrip('.-')
)
return version
raise ValueError('Invalid version string "{}"'.format(version))
def normalize_stability(stability): # type: (str) -> str
stability = stability.lower()
if stability == 'rc':
return 'RC'
return stability
def parse_stability(version): # type: (str) -> str
"""
Returns the stability of a version.
"""
version = re.sub('(?i)#.+$', '', version)
if 'dev-' == version[:4] or '-dev' == version[-4:]:
return 'dev'
m = re.search('(?i){}(?:\+.*)?$'.format(_modifier_regex), version.lower())
if m:
if m.group(3):
return 'dev'
if m.group(1):
if m.group(1) in ['beta', 'b']:
return 'beta'
elif m.group(1) in ['alpha', 'a']:
return 'alpha'
elif m.group(1) in ['rc', 'c']:
return 'RC'
elif m.group(1) == 'post':
return 'stable'
else:
return 'dev'
return 'stable'
def _expand_stability(stability): # type: (str) -> str
stability = stability.lower()
if stability == 'a':
return 'alpha'
elif stability == 'b':
return 'beta'
elif stability in ['c', 'pre']:
return 'rc'
elif stability in ['p', 'pl']:
return 'patch'
elif stability in ['post']:
return 'post'
return stability
import re
from .empty_constraint import EmptyConstraint
from .patterns import BASIC_CONSTRAINT
from .patterns import CARET_CONSTRAINT
from .patterns import TILDE_CONSTRAINT
from .patterns import X_CONSTRAINT
from .version import Version
from .version_constraint import VersionConstraint
from .version_range import VersionRange
from .version_union import VersionUnion
def parse_constraint(constraints): # type: (str) -> VersionConstraint
if constraints == '*':
return VersionRange()
or_constraints = re.split('\s*\|\|?\s*', constraints.strip())
or_groups = []
for constraints in or_constraints:
and_constraints = re.split(
'(?<!^)(?<![=>< ,]) *(?<!-)[, ](?!-) *(?!,|$)',
constraints
)
constraint_objects = []
if len(and_constraints) > 1:
for constraint in and_constraints:
constraint_objects.append(parse_single_constraint(constraint))
else:
constraint_objects.append(parse_single_constraint(and_constraints[0]))
if len(constraint_objects) == 1:
constraint = constraint_objects[0]
else:
constraint = constraint_objects[0]
for next_constraint in constraint_objects[1:]:
constraint = constraint.intersect(next_constraint)
or_groups.append(constraint)
if len(or_groups) == 1:
return or_groups[0]
else:
return VersionUnion.of(*or_groups)
def parse_single_constraint(constraint): # type: (str) -> VersionConstraint
m = re.match('(?i)^v?[xX*](\.[xX*])*$', constraint)
if m:
return VersionRange()
# Tilde range
m = TILDE_CONSTRAINT.match(constraint)
if m:
version = Version.parse(m.group(1))
high = version.stable.next_minor
if len(m.group(1).split('.')) == 1:
high = version.stable.next_major
return VersionRange(version, high, include_min=True)
# Caret range
m = CARET_CONSTRAINT.match(constraint)
if m:
version = Version.parse(m.group(1))
return VersionRange(version, version.next_breaking, include_min=True)
# X Range
m = X_CONSTRAINT.match(constraint)
if m:
op = m.group(1)
major = int(m.group(2))
minor = m.group(3)
if minor is not None:
version = Version(major, int(minor), 0)
result = VersionRange(version, version.next_minor, include_min=True)
else:
if major == 0:
result = VersionRange(max=Version(1, 0, 0))
else:
version = Version(major, 0, 0)
result = VersionRange(version, version.next_major, include_min=True)
if op == '!=':
result = VersionRange().difference(result)
return result
# Basic comparator
m = BASIC_CONSTRAINT.match(constraint)
if m:
op = m.group(1)
version = m.group(2)
try:
version = Version.parse(version)
except ValueError:
raise ValueError('Could not parse version constraint: {}'.format(constraint))
if op == '<':
return VersionRange(max=version)
elif op == '<=':
return VersionRange(max=version, include_max=True)
elif op == '>':
return VersionRange(min=version)
elif op == '>=':
return VersionRange(min=version, include_min=True)
elif op == '!=':
return VersionUnion(
VersionRange(max=version),
VersionRange(min=version)
)
else:
return version
raise ValueError('Could not parse version constraint: {}'.format(constraint))
......@@ -126,7 +126,7 @@ class Version(VersionRange):
@property
def first_prerelease(self): # type: () -> Version
return Version(self.major, self.minor, self.patch, '0')
return Version.parse('{}.{}.{}-alpha.0'.format(self.major, self.minor, self.patch))
@property
def min(self):
......
import re
from typing import Tuple
from typing import Union
from .constraints.constraint import Constraint
from .constraints.base_constraint import BaseConstraint
from .constraints.empty_constraint import EmptyConstraint
from .constraints.multi_constraint import MultiConstraint
from .constraints.wildcard_constraint import WilcardConstraint
from .helpers import normalize_version, _expand_stability, parse_stability
class VersionParser:
_modifier_regex = (
'[._-]?'
'(?:(stable|beta|b|RC|alpha|a|patch|post|pl|p)((?:[.-]?\d+)*)?)?'
'([.-]?dev)?'
)
_stabilities = [
'stable', 'RC', 'beta', 'alpha', 'dev'
]
def parse_constraints(
self, constraints
): # type: (str) -> Union[Constraint, MultiConstraint]
"""
Parses a constraint string into
MultiConstraint and/or Constraint objects.
"""
pretty_constraint = constraints
m = re.match(
'(?i)([^,\s]*?)@({})$'.format('|'.join(self._stabilities)),
constraints
)
if m:
constraints = m.group(1)
if not constraints:
constraints = '*'
or_constraints = re.split('\s*\|\|?\s*', constraints.strip())
or_groups = []
for constraints in or_constraints:
and_constraints = re.split(
'(?<!^)(?<![=>< ,]) *(?<!-)[, ](?!-) *(?!,|$)',
constraints
)
if len(and_constraints) > 1:
constraint_objects = []
for constraint in and_constraints:
for parsed_constraint in self._parse_constraint(constraint):
constraint_objects.append(parsed_constraint)
else:
constraint_objects = self._parse_constraint(and_constraints[0])
if len(constraint_objects) == 1:
constraint = constraint_objects[0]
else:
constraint = MultiConstraint(constraint_objects)
or_groups.append(constraint)
if len(or_groups) == 1:
constraint = or_groups[0]
elif len(or_groups) == 2:
# parse the two OR groups and if they are contiguous we collapse
# them into one constraint
a = str(or_groups[0])
b = str(or_groups[1])
pos_a = a.find('<', 4)
pos_b = a.find('<', 4)
if (
isinstance(or_groups[0], MultiConstraint)
and isinstance(or_groups[1], MultiConstraint)
and len(or_groups[0].constraints)
and len(or_groups[1].constraints)
and a[:3] == '>=' and pos_a != -1
and b[:3] == '>=' and pos_b != -1
and a[pos_a + 2:-1] == b[4:pos_b - 5]
):
constraint = MultiConstraint(
Constraint('>=', a[4:pos_a - 5]),
Constraint('<', b[pos_b + 2:-1])
)
else:
constraint = MultiConstraint(or_groups, False)
else:
constraint = MultiConstraint(or_groups, False)
constraint.pretty_string = pretty_constraint
return constraint
def _parse_constraint(
self, constraint
): # type: (str) -> Union[Tuple[BaseConstraint], Tuple[BaseConstraint, BaseConstraint]]
m = re.match('(?i)^v?[xX*](\.[xX*])*$', constraint)
if m:
return EmptyConstraint(),
# Some versions have the form M.m.p-\d+
# which means M.m.p-post\d+
m = re.match(
'(?i)^(~=?|\^|<> ?|!= ?|>=? ?|<=? ?|==? ?)v?(\d{{1,5}})(\.\d+)?(\.\d+)?(\.\d+)?-(\d+){}$'.format(
self._modifier_regex
),
constraint
)
if m:
constraint = '{}{}{}{}{}'.format(
m.group(1),
m.group(2),
m.group(3) if m.group(3) else '.0',
m.group(4) if m.group(4) else '.0',
m.group(5) if m.group(5) else '.0',
)
if m.group(6):
constraint += '-post.' + m.group(6)
version_regex = (
'v?(\d+)(?:\.(\d+))?(?:\.(\d+))?(?:\.(\d+))?{}(?:\+[^\s]+)?'
).format(self._modifier_regex)
# Tilde range
#
# Like wildcard constraints, unsuffixed tilde constraints
# say that they must be greater than the previous version,
# to ensure that unstable instances of the current version are allowed.
# However, if a stability suffix is added to the constraint,
# then a >= match on the current version is used instead.
m = re.match('(?i)^~=?{}$'.format(version_regex), constraint)
if m:
# Work out which position in the version we are operating at
if m.group(4):
position = 3
elif m.group(3):
position = 2
elif m.group(2):
position = 2
else:
position = 0
# Calculate the stability suffix
stability_suffix = ''
if m.group(5):
stability_suffix += '-{}{}'.format(
_expand_stability(m.group(5)),
'.' + m.group(6) if m.group(6) else ''
)
low_version = self._manipulate_version_string(
m.groups(), position, 0
) + stability_suffix
lower_bound = Constraint('>=', low_version)
# For upper bound,
# we increment the position of one more significance,
# but high_position = 0 would be illegal
high_position = max(0, position - 1)
high_version = self._manipulate_version_string(
m.groups(), high_position, 1
)
upper_bound = Constraint('<', high_version)
return lower_bound, upper_bound
# Caret range
#
# Allows changes that do not modify
# the left-most non-zero digit in the [major, minor, patch] tuple.
# In other words, this allows:
# - patch and minor updates for versions 1.0.0 and above,
# - patch updates for versions 0.X >=0.1.0,
# - and no updates for versions 0.0.X
m = re.match('^\^{}($)'.format(version_regex), constraint)
if m:
if m.group(1) != '0' or not m.group(2):
position = 0
elif m.group(2) != '0' or not m.group(3):
position = 1
else:
position = 2
# Calculate the stability suffix
stability_suffix = ''
if m.group(5):
stability_suffix += '-{}{}'.format(
_expand_stability(m.group(5)),
'.' + m.group(6) if m.group(6) else ''
)
low_version = normalize_version(constraint[1:])
lower_bound = Constraint('>=', low_version)
# For upper bound,
# we increment the position of one more significance,
# but high_position = 0 would be illegal
high_version = self._manipulate_version_string(
m.groups(), position, 1
)
upper_bound = Constraint('<', high_version)
return lower_bound, upper_bound
# X range
#
# Any of X, x, or * may be used to "stand in"
# for one of the numeric values in the [major, minor, patch] tuple.
# A partial version range is treated as an X-Range,
# so the special character is in fact optional.
m = re.match(
'^(!= ?|==)?v?(\d+)(?:\.(\d+))?(?:\.(\d+))?(?:\.[xX*])+$',
constraint
)
if m:
# We just leave it as is
return WilcardConstraint(constraint),
# Basic Comparators
m = re.match('^(<>|!=|>=?|<=?|==?)?\s*(.*)', constraint)
if m:
try:
version = normalize_version(m.group(2))
stability = parse_stability(version)
stability_re = re.match(
'(?:[^-]*)(-{})$'.format(self._modifier_regex),
m.group(2).lower()
)
if stability == 'stable' and stability_re:
version = version.split('-')[0] + stability_re.group(1)
return Constraint(m.group(1) or '=', version),
except ValueError:
pass
raise ValueError(
'Could not parse version constraint: {}'.format(constraint)
)
def _manipulate_version_string(self, matches, position,
increment=0, pad='0'):
"""
Increment, decrement, or simply pad a version number.
"""
matches = [matches[i]
if i <= len(matches) - 1 and matches[i] is not None else pad
for i in range(4)]
for i in range(3, -1, -1):
if i > position:
matches[i] = pad
elif i == position and increment:
matches[i] = int(matches[i]) + increment
# If matches[i] was 0, carry the decrement
if matches[i] < 0:
matches[i] = pad
position -= 1
# Return null on a carry overflow
if i == 1:
return
return '{}.{}.{}.{}'.format(matches[0], matches[1],
matches[2], matches[3])
......@@ -42,30 +42,3 @@ def parse(version, # type: str
raise
return LegacyVersion(version)
def version_compare(version1, version2, operator
): # type: (str, str, str) -> bool
from poetry.semver.helpers import normalize_version
if operator in _trans_op:
operator = _trans_op[operator]
elif operator in _trans_op.values():
pass
else:
raise ValueError('Invalid operator')
version1 = parse(version1)
version2 = parse(version2)
try:
version1 = parse(normalize_version(str(version1)))
except ValueError:
pass
try:
version2 = parse(normalize_version(str(version2)))
except ValueError:
pass
return operator(version1, version2)
from poetry.semver.semver import parse_constraint
from poetry.semver.semver import VersionUnion
from poetry.semver import parse_constraint
from poetry.semver import VersionUnion
PYTHON_VERSION = [
'2.7.*',
......
......@@ -17,7 +17,7 @@ from pyparsing import (
from pyparsing import ZeroOrMore, Word, Optional, Regex, Combine
from pyparsing import Literal as L # noqa
from poetry.semver.semver import parse_constraint
from poetry.semver import parse_constraint
from .markers import MARKER_EXPR, Marker
......
import re
from typing import Union
from poetry.packages import Package
from poetry.semver.semver import parse_constraint
from poetry.semver.semver import Version
from poetry.semver import parse_constraint
from poetry.semver import Version
class VersionSelector(object):
......
......@@ -14,25 +14,25 @@ def command():
('0.0.0', 'patch', '0.0.1'),
('0.0.0', 'minor', '0.1.0'),
('0.0.0', 'major', '1.0.0'),
('0.0', 'major', '1.0'),
('0.0', 'minor', '0.1'),
('0.0', 'major', '1.0.0'),
('0.0', 'minor', '0.1.0'),
('0.0', 'patch', '0.0.1'),
('1.2.3', 'patch', '1.2.4'),
('1.2.3', 'minor', '1.3.0'),
('1.2.3', 'major', '2.0.0'),
('1.2.3', 'prepatch', '1.2.4a0'),
('1.2.3', 'preminor', '1.3.0a0'),
('1.2.3', 'premajor', '2.0.0a0'),
('1.2.3', 'prepatch', '1.2.4-alpha.0'),
('1.2.3', 'preminor', '1.3.0-alpha.0'),
('1.2.3', 'premajor', '2.0.0-alpha.0'),
('1.2.3-beta.1', 'patch', '1.2.3'),
('1.2.3-beta.1', 'minor', '1.3.0'),
('1.2.3-beta.1', 'major', '2.0.0'),
('1.2.3-beta.1', 'prerelease', '1.2.3-beta.2'),
('1.2.3-beta1', 'prerelease', '1.2.3-beta2'),
('1.2.3beta1', 'prerelease', '1.2.3beta2'),
('1.2.3b1', 'prerelease', '1.2.3b2'),
('1.2.3', 'prerelease', '1.2.4a0')
('1.2.3-beta1', 'prerelease', '1.2.3-beta.2'),
('1.2.3beta1', 'prerelease', '1.2.3-beta.2'),
('1.2.3b1', 'prerelease', '1.2.3-beta.2'),
('1.2.3', 'prerelease', '1.2.4-alpha.0')
]
)
def test_increment_version(version, rule, expected, command):
assert expected == command.increment_version(version, rule)
assert expected == command.increment_version(version, rule).text
......@@ -52,14 +52,6 @@ def setup(mocker, installer):
p = mocker.patch('poetry.vcs.git.Git.rev_parse')
p.return_value = '9cf87a285a2d3fbb0b9fa621997b3acc3631ed24'
# Patch provider progress rate to have a consistent
# dependency resolution output
p = mocker.patch(
'poetry.puzzle.provider.Provider.progress_rate',
new_callable=mocker.PropertyMock
)
p.return_value = 3600
class Application(BaseApplication):
......
from poetry.packages import Package
from poetry.pub.failure import SolveFailure
from poetry.pub.version_solver import VersionSolver
from poetry.mixology.failure import SolveFailure
from poetry.mixology.version_solver import VersionSolver
def add_to_repo(repository, name, version, deps=None):
......
import pytest
from poetry.semver.constraints.constraint import Constraint
@pytest.mark.parametrize(
'require_op, require_version, provide_op, provide_version',
[
('==', '1', '==', '1'),
('>=', '1', '>=', '2'),
('>=', '2', '>=', '1'),
('>=', '2', '>', '1'),
('<=', '2', '>=', '1'),
('>=', '1', '<=', '2'),
('==', '2', '>=', '2'),
('!=', '1', '!=', '1'),
('!=', '1', '==', '2'),
('!=', '1', '<', '1'),
('!=', '1', '<=', '1'),
('!=', '1', '>', '1'),
('!=', '1', '>=', '1')
]
)
def test_version_match_succeeds(require_op, require_version,
provide_op, provide_version):
require = Constraint(require_op, require_version)
provide = Constraint(provide_op, provide_version)
assert require.matches(provide)
@pytest.mark.parametrize(
'require_op, require_version, provide_op, provide_version',
[
('==', '1', '==', '2'),
('>=', '2', '<=', '1'),
('>=', '2', '<', '2'),
('<=', '2', '>', '2'),
('>', '2', '<=', '2'),
('<=', '1', '>=', '2'),
('>=', '2', '<=', '1'),
('==', '2', '<', '2'),
('!=', '1', '==', '1'),
('==', '1', '!=', '1'),
]
)
def test_version_match_fails(require_op, require_version,
provide_op, provide_version):
require = Constraint(require_op, require_version)
provide = Constraint(provide_op, provide_version)
assert not require.matches(provide)
def test_invalid_operators():
with pytest.raises(ValueError):
Constraint('invalid', '1.2.3')
from poetry.semver.constraints.constraint import Constraint
from poetry.semver.constraints.multi_constraint import MultiConstraint
def test_multi_version_match_succeeds():
require_start = Constraint('>', '1.0')
require_end = Constraint('<', '1.2')
provider = Constraint('==', '1.1')
multi = MultiConstraint((require_start, require_end))
assert multi.matches(provider)
def test_multi_version_provided_match_succeeds():
require_start = Constraint('>', '1.0')
require_end = Constraint('<', '1.2')
provide_start = Constraint('>=', '1.1')
provide_end = Constraint('<', '2.0')
multi_require = MultiConstraint((require_start, require_end))
multi_provide = MultiConstraint((provide_start, provide_end))
assert multi_require.matches(multi_provide)
def test_multi_version_match_fails():
require_start = Constraint('>', '1.0')
require_end = Constraint('<', '1.2')
provider = Constraint('==', '1.2')
multi = MultiConstraint((require_start, require_end))
assert not multi.matches(provider)
import pytest
from poetry.semver.comparison import compare
from poetry.semver.comparison import equal
from poetry.semver.comparison import greater_than
from poetry.semver.comparison import greater_than_or_equal
from poetry.semver.comparison import less_than
from poetry.semver.comparison import less_than_or_equal
from poetry.semver.comparison import not_equal
@pytest.mark.parametrize(
'version1, version2, expected',
[
('1.25.0', '1.24.0', True),
('1.25.0', '1.25.0', False),
('1.25.0', '1.26.0', False),
]
)
def test_greater_than(version1, version2, expected):
if expected is True:
assert greater_than(version1, version2)
else:
assert not greater_than(version1, version2)
@pytest.mark.parametrize(
'version1, version2, expected',
[
('1.25.0', '1.24.0', True),
('1.25.0', '1.25.0', True),
('1.25.0', '1.26.0', False),
]
)
def test_greater_than_or_equal(version1, version2, expected):
if expected is True:
assert greater_than_or_equal(version1, version2)
else:
assert not greater_than_or_equal(version1, version2)
@pytest.mark.parametrize(
'version1, version2, expected',
[
('1.25.0', '1.24.0', False),
('1.25.0', '1.25.0', False),
('1.25.0', '1.26.0', True),
('1.25.0', '1.26.0-beta', True),
('1.25.0', '1.25.0-beta', False),
]
)
def test_less_than(version1, version2, expected):
if expected is True:
assert less_than(version1, version2)
else:
assert not less_than(version1, version2)
@pytest.mark.parametrize(
'version1, version2, expected',
[
('1.25.0', '1.24.0', False),
('1.25.0', '1.25.0', True),
('1.25.0', '1.26.0', True),
]
)
def test_less_than_or_equal(version1, version2, expected):
if expected is True:
assert less_than_or_equal(version1, version2)
else:
assert not less_than_or_equal(version1, version2)
@pytest.mark.parametrize(
'version1, version2, expected',
[
('1.25.0', '1.24.0', False),
('1.25.0', '1.25.0', True),
('1.25.0', '1.26.0', False),
]
)
def test_equal(version1, version2, expected):
if expected is True:
assert equal(version1, version2)
else:
assert not equal(version1, version2)
@pytest.mark.parametrize(
'version1, version2, expected',
[
('1.25.0', '1.24.0', True),
('1.25.0', '1.25.0', False),
('1.25.0', '1.26.0', True),
]
)
def test_not_equal(version1, version2, expected):
if expected is True:
assert not_equal(version1, version2)
else:
assert not not_equal(version1, version2)
@pytest.mark.parametrize(
'version1, operator, version2, expected',
[
('1.25.0', '>', '1.24.0', True),
('1.25.0', '>', '1.25.0', False),
('1.25.0', '>', '1.26.0', False),
('1.25.0', '>=', '1.24.0', True),
('1.25.0', '>=', '1.25.0', True),
('1.25.0', '>=', '1.26.0', False),
('1.25.0', '<', '1.24.0', False),
('1.25.0', '<', '1.25.0', False),
('1.25.0', '<', '1.26.0', True),
('1.25.0-beta2.1', '<', '1.25.0-b.3', True),
('1.25.0-b2.1', '<', '1.25.0beta.3', True),
('1.25.0-b-2.1', '<', '1.25.0-rc', True),
('1.25.0', '<=', '1.24.0', False),
('1.25.0', '<=', '1.25.0', True),
('1.25.0', '<=', '1.26.0', True),
('1.25.0', '==', '1.24.0', False),
('1.25.0', '==', '1.25.0', True),
('1.25.0', '==', '1.26.0', False),
('1.25.0-beta2.1', '==', '1.25.0-b.2.1', True),
('1.25.0beta2.1', '==', '1.25.0-b2.1', True),
('1.25.0', '=', '1.24.0', False),
('1.25.0', '=', '1.25.0', True),
('1.25.0', '=', '1.26.0', False),
('1.25.0', '!=', '1.24.0', True),
('1.25.0', '!=', '1.25.0', False),
('1.25.0', '!=', '1.26.0', True),
]
)
def test_compare(version1, operator, version2, expected):
if expected is True:
assert compare(version1, operator, version2)
else:
assert not compare(version1, operator, version2)
import pytest
from poetry.semver.helpers import normalize_version
@pytest.mark.parametrize(
'version,expected',
[
('1.0.0', '1.0.0.0'),
('1.2.3.4', '1.2.3.4'),
('1.0.0RC1', '1.0.0.0-rc.1'),
('1.0.0rC13', '1.0.0.0-rc.13'),
('1.0.0.RC.15-dev', '1.0.0.0-rc.15'),
('1.0.0-rc1', '1.0.0.0-rc.1'),
('1.0.0.pl3', '1.0.0.0-patch.3'),
('1.0', '1.0.0.0'),
('0', '0.0.0.0'),
('10.4.13-b', '10.4.13.0-beta'),
('10.4.13-b5', '10.4.13.0-beta.5'),
('v1.0.0', '1.0.0.0'),
('2010.01', '2010.01.0.0'),
('2010.01.02', '2010.01.02.0'),
('v20100102', '20100102'),
('2010-01-02', '2010.01.02'),
('2010-01-02.5', '2010.01.02.5'),
('20100102-203040', '20100102.203040'),
('20100102203040-10', '20100102203040.10'),
('20100102-203040-p1', '20100102.203040-patch.1'),
('1.0.0-beta.5+foo', '1.0.0.0-beta.5'),
('0.6c', '0.6.0.0-rc'),
('3.0.17-20140602', '3.0.17.0-post.20140602'),
('3.0pre', '3.0.0.0-rc')
]
)
def test_normalize(version, expected):
assert normalize_version(version) == expected
@pytest.mark.parametrize(
'version',
[
'',
'1.0.0-meh',
'1.0.0.0.0',
'1.0.0+foo bar',
]
)
def test_normalize_fail(version):
with pytest.raises(ValueError):
normalize_version(version)
import pytest
from poetry.semver.semver import parse_constraint
from poetry.semver.semver import Version
from poetry.semver.semver import VersionRange
from poetry.semver import parse_constraint
from poetry.semver import Version
from poetry.semver import VersionRange
@pytest.mark.parametrize(
......
import pytest
from poetry.semver.semver import EmptyConstraint
from poetry.semver.semver import Version
from poetry.semver.semver import VersionRange
from poetry.semver import EmptyConstraint
from poetry.semver import Version
from poetry.semver import VersionRange
@pytest.mark.parametrize(
......
import pytest
from poetry.semver.version_parser import VersionParser
from poetry.semver.constraints.constraint import Constraint
from poetry.semver.constraints.empty_constraint import EmptyConstraint
from poetry.semver.constraints.multi_constraint import MultiConstraint
@pytest.fixture
def parser():
return VersionParser()
@pytest.mark.parametrize(
'input,constraint',
[
('*', EmptyConstraint()),
('*.*', EmptyConstraint()),
('v*.*', EmptyConstraint()),
('*.x.*', EmptyConstraint()),
('x.X.x.*', EmptyConstraint()),
('!=1.0.0', Constraint('!=', '1.0.0.0')),
('>1.0.0', Constraint('>', '1.0.0.0')),
('<1.2.3.4', Constraint('<', '1.2.3.4')),
('<=1.2.3', Constraint('<=', '1.2.3.0')),
('>=1.2.3', Constraint('>=', '1.2.3.0')),
('=1.2.3', Constraint('=', '1.2.3.0')),
('1.2.3', Constraint('=', '1.2.3.0')),
('=1.0', Constraint('=', '1.0.0.0')),
('1.2.3b5', Constraint('=', '1.2.3.0-beta.5')),
('>= 1.2.3', Constraint('>=', '1.2.3.0'))
]
)
def test_parse_constraints_simple(parser, input, constraint):
assert str(parser.parse_constraints(input)) == str(constraint)
@pytest.mark.parametrize(
'input,min,max',
[
('v2.*', Constraint('>=', '2.0.0.0'), Constraint('<', '3.0.0.0')),
('2.*.*', Constraint('>=', '2.0.0.0'), Constraint('<', '3.0.0.0')),
('20.*', Constraint('>=', '20.0.0.0'), Constraint('<', '21.0.0.0')),
('20.*.*', Constraint('>=', '20.0.0.0'), Constraint('<', '21.0.0.0')),
('2.0.*', Constraint('>=', '2.0.0.0'), Constraint('<', '2.1.0.0')),
('2.x', Constraint('>=', '2.0.0.0'), Constraint('<', '3.0.0.0')),
('2.x.x', Constraint('>=', '2.0.0.0'), Constraint('<', '3.0.0.0')),
('2.2.X', Constraint('>=', '2.2.0.0'), Constraint('<', '2.3.0.0')),
('0.*', None, Constraint('<', '1.0.0.0')),
('0.*.*', None, Constraint('<', '1.0.0.0')),
('0.x', None, Constraint('<', '1.0.0.0')),
]
)
def test_parse_constraints_wildcard(parser, input, min, max):
if min:
expected = MultiConstraint((min, max))
else:
expected = max
constraint = parser.parse_constraints(input)
assert str(constraint.constraint) == str(expected)
@pytest.mark.parametrize(
'input,min,max',
[
('!=v2.*', Constraint('<', '2.0.0.0'), Constraint('>=', '3.0.0.0')),
('!=2.*.*', Constraint('<', '2.0.0.0'), Constraint('>=', '3.0.0.0')),
('!=2.0.*', Constraint('<', '2.0.0.0'), Constraint('>=', '2.1.0.0')),
('!=0.*', None, Constraint('>=', '1.0.0.0')),
('!=0.*.*', None, Constraint('>=', '1.0.0.0')),
]
)
def test_parse_constraints_negative_wildcard(parser, input, min, max):
if min:
expected = MultiConstraint((min, max), conjunctive=False)
else:
expected = max
constraint = parser.parse_constraints(input)
assert str(constraint.constraint) == str(expected)
@pytest.mark.parametrize(
'input,min,max',
[
('~v1', Constraint('>=', '1.0.0.0'), Constraint('<', '2.0.0.0')),
('~1.0', Constraint('>=', '1.0.0.0'), Constraint('<', '1.1.0.0')),
('~1.0.0', Constraint('>=', '1.0.0.0'), Constraint('<', '1.1.0.0')),
('~1.2', Constraint('>=', '1.2.0.0'), Constraint('<', '1.3.0.0')),
('~1.2.3', Constraint('>=', '1.2.3.0'), Constraint('<', '1.3.0.0')),
('~1.2.3.4', Constraint('>=', '1.2.3.4'), Constraint('<', '1.2.4.0')),
('~1.2-beta', Constraint('>=', '1.2.0.0-beta'), Constraint('<', '1.3.0.0')),
('~1.2-b2', Constraint('>=', '1.2.0.0-beta.2'), Constraint('<', '1.3.0.0')),
('~0.3', Constraint('>=', '0.3.0.0'), Constraint('<', '0.4.0.0')),
]
)
def test_parse_constraints_tilde(parser, input, min, max):
if min:
expected = MultiConstraint((min, max))
else:
expected = max
assert str(parser.parse_constraints(input)) == str(expected)
@pytest.mark.parametrize(
'input,min,max',
[
('^v1', Constraint('>=', '1.0.0.0'), Constraint('<', '2.0.0.0')),
('^0', Constraint('>=', '0.0.0.0'), Constraint('<', '1.0.0.0')),
('^0.0', Constraint('>=', '0.0.0.0'), Constraint('<', '0.1.0.0')),
('^1.2', Constraint('>=', '1.2.0.0'), Constraint('<', '2.0.0.0')),
('^1.2.3-beta.2', Constraint('>=', '1.2.3.0-beta.2'), Constraint('<', '2.0.0.0')),
('^1.2.3.4', Constraint('>=', '1.2.3.4'), Constraint('<', '2.0.0.0')),
('^1.2.3', Constraint('>=', '1.2.3.0'), Constraint('<', '2.0.0.0')),
('^0.2.3', Constraint('>=', '0.2.3.0'), Constraint('<', '0.3.0.0')),
('^0.2', Constraint('>=', '0.2.0.0'), Constraint('<', '0.3.0.0')),
('^0.2.0', Constraint('>=', '0.2.0.0'), Constraint('<', '0.3.0.0')),
('^0.0.3', Constraint('>=', '0.0.3.0'), Constraint('<', '0.0.4.0')),
]
)
def test_parse_constraints_caret(parser, input, min, max):
if min:
expected = MultiConstraint((min, max))
else:
expected = max
assert str(parser.parse_constraints(input)) == str(expected)
@pytest.mark.parametrize(
'input',
[
'>2.0,<=3.0',
'>2.0 <=3.0',
'>2.0 <=3.0',
'>2.0, <=3.0',
'>2.0 ,<=3.0',
'>2.0 , <=3.0',
'>2.0 , <=3.0',
'> 2.0 <= 3.0',
'> 2.0 , <= 3.0',
' > 2.0 , <= 3.0 ',
]
)
def test_parse_constraints_multi(parser, input):
first = Constraint('>', '2.0.0.0')
second = Constraint('<=', '3.0.0.0')
multi = MultiConstraint((first, second))
assert str(parser.parse_constraints(input)) == str(multi)
@pytest.mark.parametrize(
'input',
[
'>2.0,<2.0.5 | >2.0.6',
'>2.0,<2.0.5 || >2.0.6',
'> 2.0 , <2.0.5 | > 2.0.6',
]
)
def test_parse_constraints_multi2(parser, input):
first = Constraint('>', '2.0.0.0')
second = Constraint('<', '2.0.5.0')
third = Constraint('>', '2.0.6.0')
multi1 = MultiConstraint((first, second))
multi2 = MultiConstraint((multi1, third), False)
assert str(parser.parse_constraints(input)) == str(multi2)
@pytest.mark.parametrize(
'input',
[
'',
'1.0.0-meh',
'>2.0,,<=3.0',
'>2.0 ,, <=3.0',
'>2.0 ||| <=3.0',
]
)
def test_parse_constraints_fail(parser, input):
with pytest.raises(ValueError):
parser.parse_constraints(input)
import pytest
from poetry.semver.semver import EmptyConstraint
from poetry.semver.semver import Version
from poetry.semver.semver import VersionRange
from poetry.semver import EmptyConstraint
from poetry.semver import Version
from poetry.semver import VersionRange
@pytest.fixture()
......
from poetry.version.helpers import format_python_constraint
from poetry.semver.semver import parse_constraint
from poetry.semver import parse_constraint
def test_format_python_constraint():
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment