Commit bdfdc628 by Arun Babu Neelicattu

repositories: dedupe logic

This change unifies shared logic across the repository implementations and
improves the inheritance model.
parent a8088463
......@@ -98,7 +98,7 @@ class DebugResolveCommand(InitCommand):
show_command.init_styles(self.io)
packages = [op.package for op in ops]
repo = Repository(packages)
repo = Repository(packages=packages)
requires = package.all_requires
for pkg in repo.packages:
......
......@@ -14,8 +14,8 @@ if TYPE_CHECKING:
from poetry.core.packages.package import Package
from poetry.packages.project_package import ProjectPackage
from poetry.repositories import Repository
from poetry.repositories.installed_repository import InstalledRepository
from poetry.repositories.repository import Repository
class ShowCommand(GroupCommand):
......
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from poetry.core.packages.dependency import Dependency
from poetry.core.packages.package import Package
class BaseRepository:
    """Abstract package store: concrete repositories implement the lookups."""

    def __init__(self) -> None:
        # Subclasses are responsible for populating this list.
        self._packages: list[Package] = []

    @property
    def packages(self) -> list[Package]:
        """All packages currently held by this repository."""
        return self._packages

    def has_package(self, package: Package) -> bool:
        """Whether *package* is available here. Subclasses must override."""
        raise NotImplementedError()

    def package(
        self, name: str, version: str, extras: list[str] | None = None
    ) -> Package:
        """Return the package matching *name* and *version*. Subclasses must override."""
        raise NotImplementedError()

    def find_packages(self, dependency: Dependency) -> list[Package]:
        """Return packages satisfying *dependency*. Subclasses must override."""
        raise NotImplementedError()

    def search(self, query: str) -> list[Package]:
        """Return packages matching the free-text *query*. Subclasses must override."""
        raise NotImplementedError()
from __future__ import annotations
from abc import ABC
from abc import abstractmethod
from typing import TYPE_CHECKING
from cachecontrol.caches import FileCache
from cachy import CacheManager
from poetry.core.semver.helpers import parse_constraint
from poetry.locations import REPOSITORY_CACHE_DIR
from poetry.repositories.repository import Repository
if TYPE_CHECKING:
from poetry.core.packages.package import Package
from poetry.inspection.info import PackageInfo
class CachedRepository(Repository, ABC):
    """Repository base class that caches release information on disk.

    Subclasses implement ``_get_release_info`` to fetch metadata from their
    backing source; results are memoized in a file-backed cache keyed by
    ``name:version``.
    """

    # Bump this when the cached payload schema changes; entries written with a
    # different version are refreshed on access.
    CACHE_VERSION = parse_constraint("1.0.0")

    def __init__(self, name: str, cache_group: str, disable_cache: bool = False):
        super().__init__(name)
        self._disable_cache = disable_cache
        self._cache_dir = REPOSITORY_CACHE_DIR / name
        # "releases" persists to disk; "packages" and "matches" are in-memory only.
        self._cache = CacheManager(
            {
                "default": "releases",
                "serializer": "json",
                "stores": {
                    "releases": {"driver": "file", "path": str(self._cache_dir)},
                    "packages": {"driver": "dict"},
                    "matches": {"driver": "dict"},
                },
            }
        )
        # HTTP-level cache (cachecontrol) namespaced per cache_group, for
        # subclasses that wrap a requests session.
        self._cache_control_cache = FileCache(str(self._cache_dir / cache_group))

    @abstractmethod
    def _get_release_info(self, name: str, version: str) -> dict:
        """Fetch raw release metadata from the backing source (no caching)."""
        raise NotImplementedError()

    def get_release_info(self, name: str, version: str) -> PackageInfo:
        """
        Return the release information given a package name and a version.

        The information is returned from the cache if it exists
        or retrieved from the remote server.
        """
        from poetry.inspection.info import PackageInfo

        if self._disable_cache:
            return PackageInfo.load(self._get_release_info(name, version))

        cached = self._cache.remember_forever(
            f"{name}:{version}", lambda: self._get_release_info(name, version)
        )

        # Entries with no recorded version predate versioned caching ("0.0.0").
        cache_version = cached.get("_cache_version", "0.0.0")
        if parse_constraint(cache_version) != self.CACHE_VERSION:
            # The cache must be updated
            self._log(
                f"The cache for {name} {version} is outdated. Refreshing.",
                level="debug",
            )
            cached = self._get_release_info(name, version)
            self._cache.forever(f"{name}:{version}", cached)

        return PackageInfo.load(cached)

    def package(
        self,
        name: str,
        version: str,
        extras: list[str] | None = None,
    ) -> Package:
        """Return a Package built from the (possibly cached) release info."""
        return self.get_release_info(name, version).to_package(name=name, extras=extras)
from __future__ import annotations
import contextlib
import hashlib
import os
import urllib
from abc import ABC
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.parse import quote
import requests
import requests.auth
from cachecontrol import CacheControl
from poetry.core.packages.dependency import Dependency
from poetry.core.packages.utils.link import Link
from poetry.core.version.markers import parse_marker
from poetry.config.config import Config
from poetry.repositories.cached import CachedRepository
from poetry.repositories.exceptions import PackageNotFound
from poetry.repositories.exceptions import RepositoryError
from poetry.repositories.link_sources.html import HTMLPage
from poetry.utils.authenticator import Authenticator
from poetry.utils.helpers import download_file
from poetry.utils.helpers import temporary_directory
from poetry.utils.patterns import wheel_file_re
if TYPE_CHECKING:
from poetry.inspection.info import PackageInfo
class HTTPRepository(CachedRepository, ABC):
    """Cached repository whose distributions are fetched over HTTP(S).

    Provides an authenticated, cache-controlled ``requests`` session plus
    helpers that derive package metadata (requires_dist, hashes, ...) from
    wheel and sdist distribution links.
    """

    def __init__(
        self,
        name: str,
        url: str,
        config: Config | None = None,
        disable_cache: bool = False,
        cert: Path | None = None,
        client_cert: Path | None = None,
    ) -> None:
        super().__init__(name, "_http", disable_cache)
        self._url = url
        self._client_cert = client_cert
        self._cert = cert
        self._authenticator = Authenticator(
            config=config or Config(use_environment=True)
        )
        # Wrap the authenticator's session with HTTP-level response caching.
        self._session = CacheControl(
            self._authenticator.session, cache=self._cache_control_cache
        )

        username, password = self._authenticator.get_credentials_for_url(self._url)
        if username is not None and password is not None:
            self._authenticator.session.auth = requests.auth.HTTPBasicAuth(
                username, password
            )

        # TLS configuration: custom CA bundle and/or client certificate.
        if self._cert:
            self._authenticator.session.verify = str(self._cert)

        if self._client_cert:
            self._authenticator.session.cert = str(self._client_cert)

    @property
    def session(self) -> CacheControl:
        return self._session

    def __del__(self) -> None:
        # _session may not exist if __init__ failed early; suppress then.
        with contextlib.suppress(AttributeError):
            self._session.close()

    @property
    def url(self) -> str:
        return self._url

    @property
    def cert(self) -> Path | None:
        return self._cert

    @property
    def client_cert(self) -> Path | None:
        return self._client_cert

    @property
    def authenticated_url(self) -> str:
        """Return the repository URL with basic-auth credentials embedded."""
        if not self._session.auth:
            return self.url

        parsed = urllib.parse.urlparse(self.url)
        # Quote credentials so special characters survive URL embedding.
        username = quote(self._session.auth.username, safe="")
        password = quote(self._session.auth.password, safe="")

        return f"{parsed.scheme}://{username}:{password}@{parsed.netloc}{parsed.path}"

    def _download(self, url: str, dest: str) -> None:
        """Download *url* to *dest* through the cached, authenticated session."""
        return download_file(url, dest, session=self.session)

    def _get_info_from_wheel(self, url: str) -> PackageInfo:
        """Download the wheel at *url* into a temp dir and read its metadata."""
        from poetry.inspection.info import PackageInfo

        wheel_name = urllib.parse.urlparse(url).path.rsplit("/")[-1]

        self._log(f"Downloading wheel: {wheel_name}", level="debug")

        filename = os.path.basename(wheel_name)

        with temporary_directory() as temp_dir:
            filepath = Path(temp_dir) / filename
            self._download(url, str(filepath))

            return PackageInfo.from_wheel(filepath)

    def _get_info_from_sdist(self, url: str) -> PackageInfo:
        """Download the sdist at *url* into a temp dir and read its metadata."""
        from poetry.inspection.info import PackageInfo

        sdist_name = urllib.parse.urlparse(url).path
        sdist_name_log = sdist_name.rsplit("/")[-1]

        self._log(f"Downloading sdist: {sdist_name_log}", level="debug")

        filename = os.path.basename(sdist_name)

        with temporary_directory() as temp_dir:
            filepath = Path(temp_dir) / filename
            self._download(url, str(filepath))

            return PackageInfo.from_sdist(filepath)

    def _get_info_from_urls(self, urls: dict[str, list[str]]) -> PackageInfo:
        """Pick the most informative distribution in *urls* and return its metadata.

        Preference order: universal (py2.py3) wheel; merged metadata from a
        py2 + py3 wheel pair; py3-only wheel; py2-only wheel; a platform wheel
        (only when no sdist exists); finally the first sdist.
        """
        # Checking wheels first as they are more likely to hold
        # the necessary information
        if "bdist_wheel" in urls:
            # Check for a universal wheel
            wheels = urls["bdist_wheel"]

            universal_wheel = None
            universal_python2_wheel = None
            universal_python3_wheel = None
            platform_specific_wheels = []
            for wheel in wheels:
                link = Link(wheel)
                m = wheel_file_re.match(link.filename)
                if not m:
                    continue

                pyver = m.group("pyver")
                abi = m.group("abi")
                plat = m.group("plat")
                if abi == "none" and plat == "any":
                    # Universal wheel
                    if pyver == "py2.py3":
                        # Any Python
                        universal_wheel = wheel
                    elif pyver == "py2":
                        universal_python2_wheel = wheel
                    else:
                        universal_python3_wheel = wheel
                else:
                    platform_specific_wheels.append(wheel)

            if universal_wheel is not None:
                return self._get_info_from_wheel(universal_wheel)

            info = None
            if universal_python2_wheel and universal_python3_wheel:
                info = self._get_info_from_wheel(universal_python2_wheel)

                py3_info = self._get_info_from_wheel(universal_python3_wheel)
                if py3_info.requires_dist:
                    if not info.requires_dist:
                        # Nothing to merge: take the py3 requirements wholesale.
                        info.requires_dist = py3_info.requires_dist

                        return info

                    # Canonicalize both requirement sets as PEP 508 strings so
                    # they can be compared with set operations.
                    py2_requires_dist = {
                        Dependency.create_from_pep_508(r).to_pep_508()
                        for r in info.requires_dist
                    }
                    py3_requires_dist = {
                        Dependency.create_from_pep_508(r).to_pep_508()
                        for r in py3_info.requires_dist
                    }
                    base_requires_dist = py2_requires_dist & py3_requires_dist
                    py2_only_requires_dist = py2_requires_dist - py3_requires_dist
                    py3_only_requires_dist = py3_requires_dist - py2_requires_dist

                    # Normalizing requires_dist
                    requires_dist = list(base_requires_dist)
                    # Python-specific requirements get a python_version marker.
                    for requirement in py2_only_requires_dist:
                        dep = Dependency.create_from_pep_508(requirement)
                        dep.marker = dep.marker.intersect(
                            parse_marker("python_version == '2.7'")
                        )
                        requires_dist.append(dep.to_pep_508())

                    for requirement in py3_only_requires_dist:
                        dep = Dependency.create_from_pep_508(requirement)
                        dep.marker = dep.marker.intersect(
                            parse_marker("python_version >= '3'")
                        )
                        requires_dist.append(dep.to_pep_508())

                    info.requires_dist = sorted(set(requires_dist))

            if info:
                return info

            # Prefer non platform specific wheels
            if universal_python3_wheel:
                return self._get_info_from_wheel(universal_python3_wheel)

            if universal_python2_wheel:
                return self._get_info_from_wheel(universal_python2_wheel)

            if platform_specific_wheels and "sdist" not in urls:
                # Pick the first wheel available and hope for the best
                return self._get_info_from_wheel(platform_specific_wheels[0])

        return self._get_info_from_sdist(urls["sdist"][0])

    def _links_to_data(self, links: list[Link], data: PackageInfo) -> dict:
        """Populate *data* with file hashes and metadata derived from *links*.

        Raises PackageNotFound when *links* is empty.
        """
        if not links:
            raise PackageNotFound(
                f'No valid distribution links found for package: "{data.name}" version:'
                f' "{data.version}"'
            )
        urls = defaultdict(list)
        files = []
        for link in links:
            if link.is_wheel:
                urls["bdist_wheel"].append(link.url)
            elif link.filename.endswith(
                (".tar.gz", ".zip", ".bz2", ".xz", ".Z", ".tar")
            ):
                urls["sdist"].append(link.url)

            file_hash = f"{link.hash_name}:{link.hash}" if link.hash else None

            # When the advertised hash is missing, or uses a non-standard
            # algorithm that hashlib still knows, download the file to verify
            # the advertised hash and recompute a sha256 in its place.
            if not link.hash or (
                link.hash_name not in ("sha256", "sha384", "sha512")
                and hasattr(hashlib, link.hash_name)
            ):
                with temporary_directory() as temp_dir:
                    filepath = Path(temp_dir) / link.filename
                    self._download(link.url, str(filepath))

                    known_hash = (
                        getattr(hashlib, link.hash_name)() if link.hash_name else None
                    )
                    required_hash = hashlib.sha256()

                    chunksize = 4096
                    with filepath.open("rb") as f:
                        while True:
                            chunk = f.read(chunksize)
                            if not chunk:
                                break
                            if known_hash:
                                known_hash.update(chunk)
                            required_hash.update(chunk)

                    # Only substitute the recomputed hash when the advertised
                    # one (if any) checks out.
                    if not known_hash or known_hash.hexdigest() == link.hash:
                        file_hash = f"{required_hash.name}:{required_hash.hexdigest()}"

            files.append({"file": link.filename, "hash": file_hash})

        data.files = files

        info = self._get_info_from_urls(urls)

        data.summary = info.summary
        data.requires_dist = info.requires_dist
        data.requires_python = info.requires_python

        return data.asdict()

    def _get_response(self, endpoint: str) -> requests.Response | None:
        """GET *endpoint* relative to the repository URL.

        Returns None on 401/403 (logged as a warning) and on 404; raises
        RepositoryError for any other HTTP error.
        """
        url = self._url + endpoint
        try:
            response = self.session.get(url)
            if response.status_code in (401, 403):
                self._log(
                    f"Authorization error accessing {url}",
                    level="warning",
                )
                return None
            if response.status_code == 404:
                return None
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            raise RepositoryError(e)

        if response.url != url:
            # A redirect happened; note it for debugging.
            self._log(
                f"Response URL {response.url} differs from request URL {url}",
                level="debug",
            )
        return response

    def _get_page(self, endpoint: str) -> HTMLPage | None:
        """Fetch *endpoint* and parse it as an HTML link page, or None on miss."""
        response = self._get_response(endpoint)
        if not response:
            return None
        return HTMLPage(response.url, response.text)
from __future__ import annotations
import cgi
import hashlib
import re
import urllib.parse
import warnings
from collections import defaultdict
from html import unescape
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from typing import Iterator
from urllib.parse import quote
import requests.auth
import requests.exceptions
from cachecontrol import CacheControl
from cachecontrol.caches.file_cache import FileCache
from cachy import CacheManager
from poetry.core.packages.package import Package
from poetry.core.packages.utils.link import Link
from poetry.core.semver.helpers import parse_constraint
from poetry.core.semver.version import Version
from poetry.core.semver.version_constraint import VersionConstraint
from poetry.core.semver.version_range import VersionRange
from poetry.config.config import Config
from poetry.inspection.info import PackageInfo
from poetry.locations import REPOSITORY_CACHE_DIR
from poetry.repositories.exceptions import PackageNotFound
from poetry.repositories.exceptions import RepositoryError
from poetry.repositories.pypi_repository import PyPiRepository
from poetry.utils.authenticator import Authenticator
from poetry.repositories.http import HTTPRepository
from poetry.repositories.link_sources.html import SimpleRepositoryPage
from poetry.utils.helpers import canonicalize_name
from poetry.utils.helpers import download_file
from poetry.utils.helpers import temporary_directory
from poetry.utils.patterns import wheel_file_re
if TYPE_CHECKING:
from poetry.core.packages.dependency import Dependency
with warnings.catch_warnings():
warnings.simplefilter("ignore")
import html5lib
class Page:
VERSION_REGEX = re.compile(r"(?i)([a-z0-9_\-.]+?)-(?=\d)([a-z0-9_.!+-]+)")
SUPPORTED_FORMATS = [
".tar.gz",
".whl",
".zip",
".tar.bz2",
".tar.xz",
".tar.Z",
".tar",
]
def __init__(self, url: str, content: str, headers: dict[str, Any]) -> None:
if not url.endswith("/"):
url += "/"
self._url = url
encoding = None
if headers and "Content-Type" in headers:
content_type, params = cgi.parse_header(headers["Content-Type"])
if "charset" in params:
encoding = params["charset"]
self._content = content
if encoding is None:
self._parsed = html5lib.parse(content, namespaceHTMLElements=False)
else:
self._parsed = html5lib.parse(
content, transport_encoding=encoding, namespaceHTMLElements=False
)
@property
def versions(self) -> Iterator[Version]:
seen = set()
for link in self.links:
version = self.link_version(link)
if not version:
continue
if version in seen:
continue
seen.add(version)
from pathlib import Path
yield version
@property
def links(self) -> Iterator[Link]:
for anchor in self._parsed.findall(".//a"):
if anchor.get("href"):
href = anchor.get("href")
url = self.clean_link(urllib.parse.urljoin(self._url, href))
pyrequire = anchor.get("data-requires-python")
pyrequire = unescape(pyrequire) if pyrequire else None
link = Link(url, self, requires_python=pyrequire)
if link.ext not in self.SUPPORTED_FORMATS:
continue
yield link
def links_for_version(self, version: Version) -> Iterator[Link]:
for link in self.links:
if self.link_version(link) == version:
yield link
def link_version(self, link: Link) -> Version | None:
m = wheel_file_re.match(link.filename)
if m:
version = m.group("ver")
else:
info, ext = link.splitext()
match = self.VERSION_REGEX.match(info)
if not match:
return None
version = match.group(2)
try:
version = Version.parse(version)
except ValueError:
return None
return version
_clean_re = re.compile(r"[^a-z0-9$&+,/:;=?@.#%_\\|-]", re.I)
from poetry.core.packages.dependency import Dependency
from poetry.core.packages.utils.link import Link
def clean_link(self, url: str) -> str:
"""Makes sure a link is fully encoded. That is, if a ' ' shows up in
the link, it will be rewritten to %20 (while not over-quoting
% or other characters)."""
return self._clean_re.sub(lambda match: f"%{ord(match.group(0)):02x}", url)
from poetry.config.config import Config
# TODO: revisit whether the LegacyRepository should inherit from PyPiRepository.
# <https://github.com/python-poetry/poetry/pull/4755#discussion_r748865374>.
class LegacyRepository(PyPiRepository):
class LegacyRepository(HTTPRepository):
def __init__(
self,
name: str,
......@@ -162,83 +34,15 @@ class LegacyRepository(PyPiRepository):
if name == "pypi":
raise ValueError("The name [pypi] is reserved for repositories")
self._packages = []
self._name = name
self._url = url.rstrip("/")
self._client_cert = client_cert
self._cert = cert
self._cache_dir = REPOSITORY_CACHE_DIR / name
self._cache = CacheManager(
{
"default": "releases",
"serializer": "json",
"stores": {
"releases": {"driver": "file", "path": str(self._cache_dir)},
"packages": {"driver": "dict"},
"matches": {"driver": "dict"},
},
}
)
self._authenticator = Authenticator(
config=config or Config(use_environment=True)
super().__init__(
name, url.rstrip("/"), config, disable_cache, cert, client_cert
)
self._session = CacheControl(
self._authenticator.session, cache=FileCache(str(self._cache_dir / "_http"))
)
username, password = self._authenticator.get_credentials_for_url(self._url)
if username is not None and password is not None:
self._authenticator.session.auth = requests.auth.HTTPBasicAuth(
username, password
)
if self._cert:
self._authenticator.session.verify = str(self._cert)
if self._client_cert:
self._authenticator.session.cert = str(self._client_cert)
self._disable_cache = disable_cache
@property
def cert(self) -> Path | None:
return self._cert
@property
def client_cert(self) -> Path | None:
return self._client_cert
@property
def authenticated_url(self) -> str:
if not self._session.auth:
return self.url
parsed = urllib.parse.urlparse(self.url)
username = quote(self._session.auth.username, safe="")
password = quote(self._session.auth.password, safe="")
return f"{parsed.scheme}://{username}:{password}@{parsed.netloc}{parsed.path}"
def find_packages(self, dependency: Dependency) -> list[Package]:
packages = []
constraint = dependency.constraint
if constraint is None:
constraint = "*"
if not isinstance(constraint, VersionConstraint):
constraint = parse_constraint(constraint)
allow_prereleases = dependency.allows_prereleases()
if isinstance(constraint, VersionRange) and (
constraint.max is not None
and constraint.max.is_unstable()
or constraint.min is not None
and constraint.min.is_unstable()
):
allow_prereleases = True
constraint, allow_prereleases = self._get_constraints_from_dependency(
dependency
)
key = dependency.name
if not constraint.is_any():
......@@ -254,7 +58,7 @@ class LegacyRepository(PyPiRepository):
return []
versions = []
for version in page.versions:
for version in page.versions(dependency.name):
if version.is_unstable() and not allow_prereleases:
if constraint.is_any():
# we need this when all versions of the package are pre-releases
......@@ -320,14 +124,18 @@ class LegacyRepository(PyPiRepository):
if page is None:
return []
return list(page.links_for_version(package.version))
return list(page.links_for_version(package.name, package.version))
def _get_release_info(self, name: str, version: str) -> dict:
page = self._get_page(f"/{canonicalize_name(name).replace('.', '-')}/")
if page is None:
raise PackageNotFound(f'No package named "{name}"')
data = PackageInfo(
links = list(page.links_for_version(name, Version.parse(version)))
return self._links_to_data(
links,
PackageInfo(
name=name,
version=version,
summary="",
......@@ -336,87 +144,11 @@ class LegacyRepository(PyPiRepository):
requires_python=None,
files=[],
cache_version=str(self.CACHE_VERSION),
),
)
links = list(page.links_for_version(Version.parse(version)))
if not links:
raise PackageNotFound(
f'No valid distribution links found for package: "{name}" version:'
f' "{version}"'
)
urls = defaultdict(list)
files = []
for link in links:
if link.is_wheel:
urls["bdist_wheel"].append(link.url)
elif link.filename.endswith(
(".tar.gz", ".zip", ".bz2", ".xz", ".Z", ".tar")
):
urls["sdist"].append(link.url)
file_hash = f"{link.hash_name}:{link.hash}" if link.hash else None
if not link.hash or (
link.hash_name not in ("sha256", "sha384", "sha512")
and hasattr(hashlib, link.hash_name)
):
with temporary_directory() as temp_dir:
filepath = Path(temp_dir) / link.filename
self._download(link.url, str(filepath))
known_hash = (
getattr(hashlib, link.hash_name)() if link.hash_name else None
)
required_hash = hashlib.sha256()
chunksize = 4096
with filepath.open("rb") as f:
while True:
chunk = f.read(chunksize)
if not chunk:
break
if known_hash:
known_hash.update(chunk)
required_hash.update(chunk)
if not known_hash or known_hash.hexdigest() == link.hash:
file_hash = f"{required_hash.name}:{required_hash.hexdigest()}"
files.append({"file": link.filename, "hash": file_hash})
data.files = files
info = self._get_info_from_urls(urls)
data.summary = info.summary
data.requires_dist = info.requires_dist
data.requires_python = info.requires_python
return data.asdict()
def _get_page(self, endpoint: str) -> Page | None:
url = self._url + endpoint
try:
response = self.session.get(url)
if response.status_code in (401, 403):
self._log(
f"Authorization error accessing {url}",
level="warning",
)
return None
if response.status_code == 404:
def _get_page(self, endpoint: str) -> SimpleRepositoryPage | None:
response = self._get_response(endpoint)
if not response:
return None
response.raise_for_status()
except requests.exceptions.HTTPError as e:
raise RepositoryError(e)
if response.url != url:
self._log(
f"Response URL {response.url} differs from request URL {url}",
level="debug",
)
return Page(response.url, response.content, response.headers)
def _download(self, url: str, dest: str) -> None:
return download_file(url, dest, session=self.session)
return SimpleRepositoryPage(response.url, response.text)
from __future__ import annotations
import contextlib
import re
from abc import abstractmethod
from typing import TYPE_CHECKING
from typing import Iterator
from poetry.core.packages.package import Package
from poetry.core.semver.version import Version
from poetry.utils.helpers import canonicalize_name
from poetry.utils.patterns import sdist_file_re
from poetry.utils.patterns import wheel_file_re
if TYPE_CHECKING:
from poetry.core.packages.utils.link import Link
class LinkSource:
    """Base class for pages/sources that expose distribution links."""

    VERSION_REGEX = re.compile(r"(?i)([a-z0-9_\-.]+?)-(?=\d)([a-z0-9_.!+-]+)")
    CLEAN_REGEX = re.compile(r"[^a-z0-9$&+,/:;=?@.#%_\\|-]", re.I)
    SUPPORTED_FORMATS = [
        ".tar.gz",
        ".whl",
        ".zip",
        ".tar.bz2",
        ".tar.xz",
        ".tar.Z",
        ".tar",
    ]

    def __init__(self, url: str) -> None:
        self._url = url

    @property
    def url(self) -> str:
        """The base URL this link source was built from."""
        return self._url

    def versions(self, name: str) -> Iterator[Version]:
        """Yield each distinct version of *name* found among the links."""
        name = canonicalize_name(name)
        yielded: set[Version] = set()
        for link in self.links:
            candidate = self.link_package_data(link)
            if candidate.name != name or not candidate.version:
                continue
            if candidate.version in yielded:
                continue
            yielded.add(candidate.version)
            yield candidate.version

    @property
    def packages(self) -> Iterator[Package]:
        """Yield a package for every link whose name and version were parsed."""
        for link in self.links:
            candidate = self.link_package_data(link)
            if candidate.name and candidate.version:
                yield candidate

    @property
    @abstractmethod
    def links(self) -> Iterator[Link]:
        raise NotImplementedError()

    def link_package_data(self, link: Link) -> Package:
        """Best-effort extraction of (name, version) from a distribution filename."""
        name = None
        version = None
        matched = wheel_file_re.match(link.filename) or sdist_file_re.match(
            link.filename
        )
        if matched:
            name = canonicalize_name(matched.group("name"))
            version = matched.group("ver")
        else:
            stem, _ = link.splitext()
            fallback = self.VERSION_REGEX.match(stem)
            if fallback:
                version = fallback.group(2)
        # Keep the raw string when it does not parse as a valid version.
        with contextlib.suppress(ValueError):
            version = Version.parse(version)
        return Package(name, version, source_url=link.url)

    def links_for_version(self, name: str, version: Version) -> Iterator[Link]:
        """Yield the links whose parsed name and version match the arguments."""
        name = canonicalize_name(name)
        for link in self.links:
            candidate = self.link_package_data(link)
            if candidate.version and candidate.name == name and candidate.version == version:
                yield link

    def clean_link(self, url: str) -> str:
        """Makes sure a link is fully encoded. That is, if a ' ' shows up in
        the link, it will be rewritten to %20 (while not over-quoting
        % or other characters)."""
        return self.CLEAN_REGEX.sub(lambda bad: f"%{ord(bad.group(0)):02x}", url)
from __future__ import annotations
import urllib.parse
import warnings
from html import unescape
from typing import Iterator
from poetry.core.packages.utils.link import Link
from poetry.repositories.link_sources.base import LinkSource
with warnings.catch_warnings():
warnings.simplefilter("ignore")
import html5lib
class HTMLPage(LinkSource):
    """A link source backed by a parsed HTML document."""

    def __init__(self, url: str, content: str) -> None:
        super().__init__(url=url)
        self._parsed = html5lib.parse(content, namespaceHTMLElements=False)

    @property
    def links(self) -> Iterator[Link]:
        """Yield a Link for every supported-format anchor in the document."""
        for anchor in self._parsed.findall(".//a"):
            href = anchor.get("href")
            if not href:
                continue
            url = self.clean_link(urllib.parse.urljoin(self._url, href))
            pyrequire = anchor.get("data-requires-python")
            link = Link(
                url, self, requires_python=unescape(pyrequire) if pyrequire else None
            )
            if link.ext in self.SUPPORTED_FORMATS:
                yield link
class SimpleRepositoryPage(HTMLPage):
    """An HTML page served by a PEP 503 "simple" index."""

    def __init__(self, url: str, content: str) -> None:
        # Simple-index pages are rooted at a trailing-slash URL so relative
        # hrefs resolve correctly.
        normalized = url if url.endswith("/") else url + "/"
        super().__init__(url=normalized, content=content)
......@@ -3,23 +3,23 @@ from __future__ import annotations
from contextlib import suppress
from typing import TYPE_CHECKING
from poetry.repositories.base_repository import BaseRepository
from poetry.repositories.exceptions import PackageNotFound
from poetry.repositories.repository import Repository
if TYPE_CHECKING:
from poetry.core.packages.dependency import Dependency
from poetry.core.packages.package import Package
from poetry.repositories.repository import Repository
class Pool(BaseRepository):
class Pool(Repository):
def __init__(
self,
repositories: list[Repository] | None = None,
ignore_repository_names: bool = False,
) -> None:
super().__init__()
if repositories is None:
repositories = []
......
from __future__ import annotations
import logging
import os
import urllib.parse
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING
import requests
from cachecontrol import CacheControl
from cachecontrol.caches.file_cache import FileCache
from cachecontrol.controller import logger as cache_control_logger
from cachy import CacheManager
from html5lib.html5parser import parse
from poetry.core.packages.dependency import Dependency
from poetry.core.packages.package import Package
from poetry.core.packages.utils.link import Link
from poetry.core.semver.helpers import parse_constraint
from poetry.core.semver.version_constraint import VersionConstraint
from poetry.core.semver.version_range import VersionRange
from poetry.core.version.exceptions import InvalidVersion
from poetry.core.version.markers import parse_marker
from poetry.locations import REPOSITORY_CACHE_DIR
from poetry.repositories.exceptions import PackageNotFound
from poetry.repositories.remote_repository import RemoteRepository
from poetry.repositories.http import HTTPRepository
from poetry.utils._compat import to_str
from poetry.utils.helpers import download_file
from poetry.utils.helpers import temporary_directory
from poetry.utils.patterns import wheel_file_re
cache_control_logger.setLevel(logging.ERROR)
......@@ -39,70 +24,30 @@ logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from poetry.inspection.info import PackageInfo
from poetry.core.packages.dependency import Dependency
class PyPiRepository(RemoteRepository):
CACHE_VERSION = parse_constraint("1.0.0")
class PyPiRepository(HTTPRepository):
def __init__(
self,
url: str = "https://pypi.org/",
disable_cache: bool = False,
fallback: bool = True,
) -> None:
super().__init__(url.rstrip("/") + "/simple/")
super().__init__(
"PyPI", url.rstrip("/") + "/simple/", disable_cache=disable_cache
)
self._base_url = url
self._disable_cache = disable_cache
self._fallback = fallback
release_cache_dir = REPOSITORY_CACHE_DIR / "pypi"
self._cache = CacheManager(
{
"default": "releases",
"serializer": "json",
"stores": {
"releases": {"driver": "file", "path": str(release_cache_dir)},
"packages": {"driver": "dict"},
},
}
)
self._cache_control_cache = FileCache(str(release_cache_dir / "_http"))
self._session = CacheControl(
requests.session(), cache=self._cache_control_cache
)
self._name = "PyPI"
@property
def session(self) -> CacheControl:
return self._session
def __del__(self) -> None:
self._session.close()
def find_packages(self, dependency: Dependency) -> list[Package]:
"""
Find packages on the remote server.
"""
constraint = dependency.constraint
if constraint is None:
constraint = "*"
if not isinstance(constraint, VersionConstraint):
constraint = parse_constraint(constraint)
allow_prereleases = dependency.allows_prereleases()
if isinstance(constraint, VersionRange) and (
constraint.max is not None
and constraint.max.is_unstable()
or constraint.min is not None
and constraint.min.is_unstable()
):
allow_prereleases = True
constraint, allow_prereleases = self._get_constraints_from_dependency(
dependency
)
try:
info = self.get_package_info(dependency.name)
......@@ -152,14 +97,6 @@ class PyPiRepository(RemoteRepository):
return packages or ignored_pre_release_packages
def package(
self,
name: str,
version: str,
extras: (list | None) = None,
) -> Package:
return self.get_release_info(name, version).to_package(name=name, extras=extras)
def search(self, query: str) -> list[Package]:
results = []
......@@ -212,35 +149,6 @@ class PyPiRepository(RemoteRepository):
return data
def get_release_info(self, name: str, version: str) -> PackageInfo:
"""
Return the release information given a package name and a version.
The information is returned from the cache if it exists
or retrieved from the remote server.
"""
from poetry.inspection.info import PackageInfo
if self._disable_cache:
return PackageInfo.load(self._get_release_info(name, version))
cached = self._cache.remember_forever(
f"{name}:{version}", lambda: self._get_release_info(name, version)
)
cache_version = cached.get("_cache_version", "0.0.0")
if parse_constraint(cache_version) != self.CACHE_VERSION:
# The cache must be updated
self._log(
f"The cache for {name} {version} is outdated. Refreshing.",
level="debug",
)
cached = self._get_release_info(name, version)
self._cache.forever(f"{name}:{version}", cached)
return PackageInfo.load(cached)
def find_links_for_package(self, package: Package) -> list[Link]:
json_data = self._get(f"pypi/{package.name}/{package.version}/json")
if json_data is None:
......@@ -253,7 +161,9 @@ class PyPiRepository(RemoteRepository):
return links
def _get_release_info(self, name: str, version: str) -> dict:
def _get_release_info(
self, name: str, version: str
) -> dict[str, str | list[str] | None]:
from poetry.inspection.info import PackageInfo
self._log(f"Getting info for {name} ({version}) from PyPI", "debug")
......@@ -329,127 +239,3 @@ class PyPiRepository(RemoteRepository):
return None
return json_response.json()
def _get_info_from_urls(self, urls: dict[str, list[str]]) -> PackageInfo:
# Checking wheels first as they are more likely to hold
# the necessary information
if "bdist_wheel" in urls:
# Check for a universal wheel
wheels = urls["bdist_wheel"]
universal_wheel = None
universal_python2_wheel = None
universal_python3_wheel = None
platform_specific_wheels = []
for wheel in wheels:
link = Link(wheel)
m = wheel_file_re.match(link.filename)
if not m:
continue
pyver = m.group("pyver")
abi = m.group("abi")
plat = m.group("plat")
if abi == "none" and plat == "any":
# Universal wheel
if pyver == "py2.py3":
# Any Python
universal_wheel = wheel
elif pyver == "py2":
universal_python2_wheel = wheel
else:
universal_python3_wheel = wheel
else:
platform_specific_wheels.append(wheel)
if universal_wheel is not None:
return self._get_info_from_wheel(universal_wheel)
info = None
if universal_python2_wheel and universal_python3_wheel:
info = self._get_info_from_wheel(universal_python2_wheel)
py3_info = self._get_info_from_wheel(universal_python3_wheel)
if py3_info.requires_dist:
if not info.requires_dist:
info.requires_dist = py3_info.requires_dist
return info
py2_requires_dist = {
Dependency.create_from_pep_508(r).to_pep_508()
for r in info.requires_dist
}
py3_requires_dist = {
Dependency.create_from_pep_508(r).to_pep_508()
for r in py3_info.requires_dist
}
base_requires_dist = py2_requires_dist & py3_requires_dist
py2_only_requires_dist = py2_requires_dist - py3_requires_dist
py3_only_requires_dist = py3_requires_dist - py2_requires_dist
# Normalizing requires_dist
requires_dist = list(base_requires_dist)
for requirement in py2_only_requires_dist:
dep = Dependency.create_from_pep_508(requirement)
dep.marker = dep.marker.intersect(
parse_marker("python_version == '2.7'")
)
requires_dist.append(dep.to_pep_508())
for requirement in py3_only_requires_dist:
dep = Dependency.create_from_pep_508(requirement)
dep.marker = dep.marker.intersect(
parse_marker("python_version >= '3'")
)
requires_dist.append(dep.to_pep_508())
info.requires_dist = sorted(set(requires_dist))
if info:
return info
# Prefer non platform specific wheels
if universal_python3_wheel:
return self._get_info_from_wheel(universal_python3_wheel)
if universal_python2_wheel:
return self._get_info_from_wheel(universal_python2_wheel)
if platform_specific_wheels and "sdist" not in urls:
# Pick the first wheel available and hope for the best
return self._get_info_from_wheel(platform_specific_wheels[0])
return self._get_info_from_sdist(urls["sdist"][0])
def _get_info_from_wheel(self, url: str) -> PackageInfo:
    """Download the wheel at *url* into a scratch directory and extract its
    package metadata."""
    from poetry.inspection.info import PackageInfo

    # The wheel's file name is the last path segment of the URL.
    wheel_name = urllib.parse.urlparse(url).path.rsplit("/")[-1]
    self._log(f"Downloading wheel: {wheel_name}", level="debug")

    basename = os.path.basename(wheel_name)
    with temporary_directory() as scratch:
        target = Path(scratch) / basename
        self._download(url, str(target))
        return PackageInfo.from_wheel(target)
def _get_info_from_sdist(self, url: str) -> PackageInfo:
    """Download the sdist at *url* into a scratch directory and extract its
    package metadata."""
    from poetry.inspection.info import PackageInfo

    sdist_name = urllib.parse.urlparse(url).path
    short_name = sdist_name.rsplit("/")[-1]
    self._log(f"Downloading sdist: {short_name}", level="debug")

    basename = os.path.basename(sdist_name)
    with temporary_directory() as scratch:
        target = Path(scratch) / basename
        self._download(url, str(target))
        return PackageInfo.from_sdist(target)
def _download(self, url: str, dest: str) -> None:
    """Download *url* to the path *dest*, reusing this repository's session."""
    return download_file(url, dest, session=self.session)
def _log(self, msg: str, level: str = "info") -> None:
    """Log *msg* at *level*, prefixed with this repository's name.

    ``level`` must be the name of a ``logging`` method ("debug", "info", ...).
    """
    getattr(logger, level)(f"<debug>{self._name}:</debug> {msg}")
from __future__ import annotations
from poetry.repositories.repository import Repository
class RemoteRepository(Repository):
    """A package repository addressed by a remote URL."""

    def __init__(self, url: str) -> None:
        self._url = url
        super().__init__()

    @property
    def url(self) -> str:
        """The URL this repository was configured with."""
        return self._url

    @property
    def authenticated_url(self) -> str:
        """URL with credentials embedded; this base implementation holds no
        credentials, so it returns the plain URL unchanged."""
        return self._url
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from poetry.repositories.base_repository import BaseRepository
from poetry.core.semver.helpers import parse_constraint
from poetry.core.semver.version_constraint import VersionConstraint
from poetry.core.semver.version_range import VersionRange
if TYPE_CHECKING:
from poetry.core.packages.dependency import Dependency
from poetry.core.packages.package import Package
from poetry.core.packages.utils.link import Link
from poetry.core.semver.helpers import VersionTypes
class Repository(BaseRepository):
    """An in-memory package repository with an optional display name.

    Defect fixed: the diff rendering left both the old and the new versions
    of the class header and ``__init__`` interleaved (duplicate ``class``
    lines and ``for`` loops); this reconstructs the intended post-commit
    code. The new signature puts ``name`` first, matching the updated call
    site ``Repository(packages=packages)``.
    """

    def __init__(
        self, name: str | None = None, packages: list[Package] | None = None
    ) -> None:
        # BaseRepository.__init__ initialises the shared ``_packages`` store.
        super().__init__()
        self._name = name
        # ``packages or []`` tolerates both an omitted and an explicit
        # ``None`` argument.
        for package in packages or []:
            self.add_package(package)
@property
def name(self) -> str | None:
    """The repository's display name, or ``None`` if none was given."""
    return self._name
# NOTE(review): this definition is shadowed by a later duplicate of
# ``package`` in this file (diff-rendering artifact) — confirm which copy
# should remain.
def package(
    self, name: str, version: str, extras: list[str] | None = None
) -> Package:
    """Return a clone of the first stored package whose name equals *name*
    (lower-cased) and whose version text equals *version* exactly.

    Implicitly returns ``None`` when nothing matches. ``extras`` is accepted
    for interface compatibility but unused here.
    """
    name = name.lower()
    for package in self.packages:
        if name == package.name and package.version.text == version:
            return package.clone()
@property
def packages(self) -> list[Package]:
    # Returns the internal list itself (not a copy), so callers can
    # observe later mutations.
    return self._packages
def find_packages(self, dependency: Dependency) -> list[Package]:
from poetry.core.semver.helpers import parse_constraint
from poetry.core.semver.version_constraint import VersionConstraint
from poetry.core.semver.version_range import VersionRange
constraint = dependency.constraint
packages = []
ignored_pre_release_packages = []
if constraint is None:
constraint = "*"
if not isinstance(constraint, VersionConstraint):
constraint = parse_constraint(constraint)
allow_prereleases = dependency.allows_prereleases()
if isinstance(constraint, VersionRange) and (
constraint.max is not None
and constraint.max.is_unstable()
or constraint.min is not None
and constraint.min.is_unstable()
):
allow_prereleases = True
constraint, allow_prereleases = self._get_constraints_from_dependency(
dependency
)
for package in self.packages:
if dependency.name == package.name:
......@@ -103,9 +82,6 @@ class Repository(BaseRepository):
if index is not None:
del self._packages[index]
def find_links_for_package(self, package: Package) -> list[Link]:
return []
def search(self, query: str) -> list[Package]:
results: list[Package] = []
......@@ -115,5 +91,44 @@ class Repository(BaseRepository):
return results
@staticmethod
def _get_constraints_from_dependency(
    dependency: Dependency,
) -> tuple[VersionTypes, bool]:
    """Normalise a dependency's constraint and decide whether pre-release
    versions may satisfy it.

    Returns the parsed constraint together with a flag that is True when
    the dependency explicitly allows pre-releases, or when either bound of
    a version range is itself an unstable (pre-release) version.
    """
    constraint = dependency.constraint
    if constraint is None:
        constraint = "*"
    if not isinstance(constraint, VersionConstraint):
        constraint = parse_constraint(constraint)

    allow_prereleases = dependency.allows_prereleases()
    if isinstance(constraint, VersionRange):
        upper = constraint.max
        lower = constraint.min
        has_unstable_bound = (upper is not None and upper.is_unstable()) or (
            lower is not None and lower.is_unstable()
        )
        if has_unstable_bound:
            allow_prereleases = True

    return constraint, allow_prereleases
def _log(self, msg: str, level: str = "info") -> None:
    """Emit *msg* through a logger named after the concrete class, using the
    logging method selected by *level*."""
    class_logger = logging.getLogger(self.__class__.__name__)
    emit = getattr(class_logger, level)
    emit(f"<debug>{self.name}:</debug> {msg}")
def __len__(self) -> int:
    # Number of packages currently held by this repository.
    return len(self._packages)
def find_links_for_package(self, package: Package) -> list[Link]:
return []
def package(
    self, name: str, version: str, extras: list[str] | None = None
) -> Package:
    """Return a clone of the first stored package whose name equals *name*
    (lower-cased) and whose version text equals *version* exactly.

    Falls through (yielding ``None``) when nothing matches; ``extras`` is
    accepted for interface compatibility but unused here.
    """
    wanted = name.lower()
    for candidate in self.packages:
        if candidate.name == wanted and candidate.version.text == version:
            return candidate.clone()
......@@ -12,3 +12,8 @@ wheel_file_re = re.compile(
r"\.whl|\.dist-info$",
re.VERBOSE,
)
# Matches sdist archive file names, capturing the project name ("name"),
# the version starting with a digit ("ver"), and the archive format
# ("format": zip, or tar with an optional gz/bz2/xz/Z compression suffix).
sdist_file_re = re.compile(
    r"^(?P<namever>(?P<name>.+?)-(?P<ver>\d.*?))"
    r"(\.sdist)?\.(?P<format>(zip|tar(\.(gz|bz2|xz|Z))?))$"
)
......@@ -191,7 +191,7 @@ def download_mock(mocker: MockerFixture) -> None:
# Patch download to not download anything but to just copy from fixtures
mocker.patch("poetry.utils.helpers.download_file", new=mock_download)
mocker.patch("poetry.puzzle.provider.download_file", new=mock_download)
mocker.patch("poetry.repositories.pypi_repository.download_file", new=mock_download)
mocker.patch("poetry.repositories.http.download_file", new=mock_download)
@pytest.fixture(autouse=True)
......
......@@ -14,7 +14,7 @@ from poetry.factory import Factory
from poetry.repositories.exceptions import PackageNotFound
from poetry.repositories.exceptions import RepositoryError
from poetry.repositories.legacy_repository import LegacyRepository
from poetry.repositories.legacy_repository import Page
from poetry.repositories.link_sources.html import SimpleRepositoryPage
try:
......@@ -35,7 +35,7 @@ class MockRepository(LegacyRepository):
def __init__(self) -> None:
super().__init__("legacy", url="http://legacy.foo.bar", disable_cache=True)
def _get_page(self, endpoint: str) -> Page | None:
def _get_page(self, endpoint: str) -> SimpleRepositoryPage | None:
parts = endpoint.split("/")
name = parts[1]
......@@ -44,7 +44,7 @@ class MockRepository(LegacyRepository):
return
with fixture.open(encoding="utf-8") as f:
return Page(self._url + endpoint, f.read(), {})
return SimpleRepositoryPage(self._url + endpoint, f.read())
def _download(self, url: str, dest: Path) -> None:
filename = urlparse.urlparse(url).path.rsplit("/")[-1]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment