Commit 83ec6e9f by Arun Babu Neelicattu Committed by finswimmer

locker: refactor to reduce code complexity

(cherry picked from commit ed43d94b423a7bf53dfb530a549b686d016ab891)
parent 18abfd7b
......@@ -5,11 +5,14 @@ import re
from copy import deepcopy
from hashlib import sha256
from typing import Dict
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Union
from tomlkit import array
......@@ -185,36 +188,107 @@ class Locker(object):
return packages
@staticmethod
def __get_locked_package(
_dependency, packages_by_name
): # type: (Dependency, Dict[str, List[Package]]) -> Optional[Package]
"""
Internal helper to identify corresponding locked package using dependency
version constraints.
"""
for _package in packages_by_name.get(_dependency.name, []):
if _dependency.constraint.allows(_package.version):
return _package
return None
@classmethod
def __walk_dependency_level(
cls,
dependencies,
level,
pinned_versions,
packages_by_name,
project_level_dependencies,
nested_dependencies,
): # type: (List[Dependency], int, bool, Dict[str, List[Package]], Set[str], Dict[Tuple[str, str], Dependency]) -> Dict[Tuple[str, str], Dependency]
if not dependencies:
return nested_dependencies
next_level_dependencies = []
for requirement in dependencies:
locked_package = cls.__get_locked_package(requirement, packages_by_name)
if locked_package:
for require in locked_package.requires:
if require.marker.is_empty():
require.marker = requirement.marker
else:
require.marker = require.marker.intersect(requirement.marker)
require.marker = require.marker.intersect(locked_package.marker)
next_level_dependencies.append(require)
if requirement.name in project_level_dependencies and level == 0:
# project level dependencies take precedence
continue
if locked_package:
# create dependency from locked package to retain dependency metadata
# if this is not done, we can end-up with incorrect nested dependencies
marker = requirement.marker
requirement = locked_package.to_dependency()
requirement.marker = requirement.marker.intersect(marker)
else:
# we make a copy to avoid any side-effects
requirement = deepcopy(requirement)
if pinned_versions:
requirement.set_constraint(
cls.__get_locked_package(requirement, packages_by_name)
.to_dependency()
.constraint
)
# dependencies use extra to indicate that it was activated via parent
# package's extras, this is not required for nested exports as we assume
# the resolver already selected this dependency
requirement.marker = requirement.marker.without_extras()
key = (requirement.name, requirement.pretty_constraint)
if key not in nested_dependencies:
nested_dependencies[key] = requirement
else:
nested_dependencies[key].marker = nested_dependencies[
key
].marker.intersect(requirement.marker)
return cls.__walk_dependency_level(
dependencies=next_level_dependencies,
level=level + 1,
pinned_versions=pinned_versions,
packages_by_name=packages_by_name,
project_level_dependencies=project_level_dependencies,
nested_dependencies=nested_dependencies,
)
@classmethod
def get_project_dependencies(
cls, project_requires, locked_packages, pinned_versions=False, with_nested=False
): # type: (List[Dependency], List[Package], bool, bool) -> Iterable[Dependency]
# group packages entries by name, this is required because requirement might use
# different constraints
# group packages entries by name, this is required because requirement might use different constraints
packages_by_name = {}
for pkg in locked_packages:
if pkg.name not in packages_by_name:
packages_by_name[pkg.name] = []
packages_by_name[pkg.name].append(pkg)
def __get_locked_package(
_dependency,
): # type: (Dependency) -> Optional[Package]
"""
Internal helper to identify corresponding locked package using dependency
version constraints.
"""
for _package in packages_by_name.get(_dependency.name, []):
if _dependency.constraint.allows(_package.version):
return _package
return None
project_level_dependencies = set()
dependencies = []
for dependency in project_requires:
dependency = deepcopy(dependency)
locked_package = __get_locked_package(dependency)
locked_package = cls.__get_locked_package(dependency, packages_by_name)
if locked_package:
locked_dependency = locked_package.to_dependency()
locked_dependency.marker = dependency.marker.intersect(
......@@ -233,68 +307,14 @@ class Locker(object):
# return only with project level dependencies
return dependencies
nested_dependencies = dict()
def __walk_level(
__dependencies, __level
): # type: (List[Dependency], int) -> None
if not __dependencies:
return
__next_level = []
for requirement in __dependencies:
__locked_package = __get_locked_package(requirement)
if __locked_package:
for require in __locked_package.requires:
if require.marker.is_empty():
require.marker = requirement.marker
else:
require.marker = require.marker.intersect(
requirement.marker
)
require.marker = require.marker.intersect(
__locked_package.marker
)
__next_level.append(require)
if requirement.name in project_level_dependencies and __level == 0:
# project level dependencies take precedence
continue
if __locked_package:
# create dependency from locked package to retain dependency metadata
# if this is not done, we can end-up with incorrect nested dependencies
marker = requirement.marker
requirement = __locked_package.to_dependency()
requirement.marker = requirement.marker.intersect(marker)
else:
# we make a copy to avoid any side-effects
requirement = deepcopy(requirement)
if pinned_versions:
requirement.set_constraint(
__get_locked_package(requirement).to_dependency().constraint
)
# dependencies use extra to indicate that it was activated via parent
# package's extras, this is not required for nested exports as we assume
# the resolver already selected this dependency
requirement.marker = requirement.marker.without_extras()
key = (requirement.name, requirement.pretty_constraint)
if key not in nested_dependencies:
nested_dependencies[key] = requirement
else:
nested_dependencies[key].marker = nested_dependencies[
key
].marker.intersect(requirement.marker)
return __walk_level(__next_level, __level + 1)
__walk_level(dependencies, 0)
nested_dependencies = cls.__walk_dependency_level(
dependencies=dependencies,
level=0,
pinned_versions=pinned_versions,
packages_by_name=packages_by_name,
project_level_dependencies=project_level_dependencies,
nested_dependencies=dict(),
)
# Merge same dependencies using marker union
for requirement in dependencies:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment