Commit 977a72ca by Arun Babu Neelicattu Committed by Steph Samson

site: use importlib instead of adhoc file searches

This change replaces various cases in which installed distribution and
file look-ups used path searches with importlib.metadata backed
look-ups.

This fixes issues with `dist-info` lookup for PEP610 implementation as
well as `.pth` file look-up and `dist-info` removal.
parent bf18a8f7
......@@ -714,19 +714,6 @@ class Executor:
def _should_write_operation(self, operation: Operation) -> bool:
return not operation.skipped or self._dry_run or self._verbose
@staticmethod
def _package_dist_info_path(package: "Package") -> Path:
from poetry.core.masonry.utils.helpers import escape_name
from poetry.core.masonry.utils.helpers import escape_version
return Path(
f"{escape_name(package.pretty_name)}-{escape_version(package.version.text)}.dist-info"
)
@classmethod
def _direct_url_json_path(cls, package: "Package") -> Path:
return cls._package_dist_info_path(package) / "direct_url.json"
def _save_url_reference(self, operation: "OperationTypes") -> None:
"""
Create and store a PEP-610 `direct_url.json` file, if needed.
......@@ -742,11 +729,14 @@ class Executor:
# distribution.
# That's not what we want so we remove the direct_url.json file,
# if it exists.
for direct_url in self._env.site_packages.find(
self._direct_url_json_path(package), True
for (
direct_url_json
) in self._env.site_packages.find_distribution_direct_url_json_files(
distribution_name=package.name, writable_only=True
):
direct_url.unlink()
# We can't use unlink(missing_ok=True) because it's not always available
if direct_url_json.exists():
direct_url_json.unlink()
return
url_reference = None
......@@ -761,15 +751,13 @@ class Executor:
url_reference = self._create_file_url_reference(package)
if url_reference:
for path in self._env.site_packages.find(
self._package_dist_info_path(package), writable_only=True
for dist in self._env.site_packages.distributions(
name=package.name, writable_only=True
):
self._env.site_packages.write_text(
path / "direct_url.json",
dist._path.joinpath("direct_url.json").write_text(
json.dumps(url_reference),
encoding="utf-8",
)
break
def _create_git_url_reference(
self, package: "Package"
......
......@@ -117,7 +117,9 @@ class PipInstaller(BaseInstaller):
raise
# This is a workaround for https://github.com/pypa/pip/issues/4176
for nspkg_pth_file in self._env.site_packages.find(f"{package.name}-nspkg.pth"):
for nspkg_pth_file in self._env.site_packages.find_distribution_nspkg_pth_files(
distribution_name=package.name
):
nspkg_pth_file.unlink()
# If we have a VCS package, remove its source directory
......
......@@ -61,6 +61,15 @@ class EditableBuilder(Builder):
self._run_build_script(self._package.build_script)
for removed in self._env.site_packages.remove_distribution_files(
distribution_name=self._package.name
):
self._debug(
" - Removed <c2>{}</c2> directory from <b>{}</b>".format(
removed.name, removed.parent
)
)
added_files = []
added_files += self._add_pth()
added_files += self._add_scripts()
......@@ -115,6 +124,18 @@ class EditableBuilder(Builder):
content += decode(path + os.linesep)
pth_file = Path(self._module.name).with_suffix(".pth")
# remove any pre-existing pth files for this package
for file in self._env.site_packages.find(path=pth_file, writable_only=True):
self._debug(
" - Removing existing <c2>{}</c2> from <b>{}</b> for {}".format(
file.name, file.parent, self._poetry.file.parent
)
)
# We can't use unlink(missing_ok=True) because it's not always available
if file.exists():
file.unlink()
try:
pth_file = self._env.site_packages.write_text(
pth_file, content, encoding="utf-8"
......@@ -199,20 +220,7 @@ class EditableBuilder(Builder):
added_files = added_files[:]
builder = WheelBuilder(self._poetry)
dist_info_path = Path(builder.dist_info)
for dist_info in self._env.site_packages.find(
dist_info_path, writable_only=True
):
if dist_info.exists():
self._debug(
" - Removing existing <c2>{}</c2> directory from <b>{}</b>".format(
dist_info.name, dist_info.parent
)
)
shutil.rmtree(str(dist_info))
dist_info = self._env.site_packages.mkdir(dist_info_path)
dist_info = self._env.site_packages.mkdir(Path(builder.dist_info))
self._debug(
" - Adding the <c2>{}</c2> directory to <b>{}</b>".format(
......
......@@ -2,7 +2,6 @@ import itertools
import json
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Set
from typing import Tuple
from typing import Union
......@@ -19,9 +18,6 @@ from .repository import Repository
_VENDORS = Path(__file__).parent.parent.joinpath("_vendor")
if TYPE_CHECKING:
from importlib.metadata import Distribution
try:
FileNotFoundError
......@@ -102,7 +98,7 @@ class InstalledRepository(Repository):
@classmethod
def create_package_from_distribution(
cls, distribution: "Distribution", env: "Env"
cls, distribution: metadata.Distribution, env: "Env"
) -> Package:
# We first check for a direct_url.json file to determine
# the type of package.
......@@ -172,7 +168,7 @@ class InstalledRepository(Repository):
return package
@classmethod
def create_package_from_pep610(cls, distribution: "Distribution") -> Package:
def create_package_from_pep610(cls, distribution: metadata.Distribution) -> Package:
path = Path(str(distribution._path))
source_type = None
source_url = None
......
import base64
import hashlib
import itertools
import json
import os
import platform
......@@ -17,6 +18,7 @@ from subprocess import CalledProcessError
from typing import Any
from typing import ContextManager
from typing import Dict
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Optional
......@@ -43,6 +45,7 @@ from poetry.poetry import Poetry
from poetry.utils._compat import decode
from poetry.utils._compat import encode
from poetry.utils._compat import list_to_shell_command
from poetry.utils._compat import metadata
from poetry.utils.helpers import is_dir_writable
from poetry.utils.helpers import paths_csv
from poetry.utils.helpers import temporary_directory
......@@ -166,7 +169,12 @@ class SitePackages:
self._fallbacks = fallbacks or []
self._skip_write_checks = skip_write_checks
self._candidates = list({self._purelib, self._platlib}) + self._fallbacks
self._candidates: List[Path] = []
for path in itertools.chain([self._purelib, self._platlib], self._fallbacks):
if path not in self._candidates:
self._candidates.append(path)
self._writable_candidates = None if not skip_write_checks else self._candidates
@property
......@@ -198,7 +206,9 @@ class SitePackages:
return self._writable_candidates
def make_candidates(self, path: Path, writable_only: bool = False) -> List[Path]:
def make_candidates(
self, path: Path, writable_only: bool = False, strict: bool = False
) -> List[Path]:
candidates = self._candidates if not writable_only else self.writable_candidates
if path.is_absolute():
for candidate in candidates:
......@@ -214,7 +224,94 @@ class SitePackages:
)
)
return [candidate / path for candidate in candidates if candidate]
results = [candidate / path for candidate in candidates if candidate]
if not results and strict:
raise RuntimeError(
'Unable to find a suitable destination for "{}" in {}'.format(
str(path), paths_csv(self._candidates)
)
)
return results
def distributions(
self, name: Optional[str] = None, writable_only: bool = False
) -> Iterable[metadata.PathDistribution]:
path = list(
map(
str, self._candidates if not writable_only else self.writable_candidates
)
)
for distribution in metadata.PathDistribution.discover(
name=name, path=path
): # type: metadata.PathDistribution
yield distribution
def find_distribution(
self, name: str, writable_only: bool = False
) -> Optional[metadata.PathDistribution]:
for distribution in self.distributions(name=name, writable_only=writable_only):
return distribution
else:
return None
def find_distribution_files_with_suffix(
self, distribution_name: str, suffix: str, writable_only: bool = False
) -> Iterable[Path]:
for distribution in self.distributions(
name=distribution_name, writable_only=writable_only
):
for file in distribution.files:
if file.name.endswith(suffix):
yield Path(distribution.locate_file(file))
def find_distribution_files_with_name(
self, distribution_name: str, name: str, writable_only: bool = False
) -> Iterable[Path]:
for distribution in self.distributions(
name=distribution_name, writable_only=writable_only
):
for file in distribution.files:
if file.name == name:
yield Path(distribution.locate_file(file))
def find_distribution_nspkg_pth_files(
self, distribution_name: str, writable_only: bool = False
) -> Iterable[Path]:
return self.find_distribution_files_with_suffix(
distribution_name=distribution_name,
suffix="-nspkg.pth",
writable_only=writable_only,
)
def find_distribution_direct_url_json_files(
self, distribution_name: str, writable_only: bool = False
) -> Iterable[Path]:
return self.find_distribution_files_with_name(
distribution_name=distribution_name,
name="direct_url.json",
writable_only=writable_only,
)
def remove_distribution_files(self, distribution_name: str) -> List[Path]:
paths = []
for distribution in self.distributions(
name=distribution_name, writable_only=True
):
for file in distribution.files:
file = Path(distribution.locate_file(file))
# We can't use unlink(missing_ok=True) because it's not always available
if file.exists():
file.unlink()
if distribution._path.exists():
shutil.rmtree(str(distribution._path))
paths.append(distribution._path)
return paths
def _path_method_wrapper(
self,
......@@ -228,13 +325,8 @@ class SitePackages:
if isinstance(path, str):
path = Path(path)
candidates = self.make_candidates(path, writable_only=writable_only)
if not candidates:
raise RuntimeError(
'Unable to find a suitable destination for "{}" in {}'.format(
str(path), paths_csv(self._candidates)
)
candidates = self.make_candidates(
path, writable_only=writable_only, strict=True
)
results = []
......@@ -244,7 +336,6 @@ class SitePackages:
result = candidate, getattr(candidate, method)(*args, **kwargs)
if return_first:
return result
else:
results.append(result)
except OSError:
# TODO: Replace with PermissionError
......@@ -267,7 +358,11 @@ class SitePackages:
for value in self._path_method_wrapper(path, "exists", return_first=False)
)
def find(self, path: Union[str, Path], writable_only: bool = False) -> List[Path]:
def find(
self,
path: Union[str, Path],
writable_only: bool = False,
) -> List[Path]:
return [
value[0]
for value in self._path_method_wrapper(
......
......@@ -265,6 +265,24 @@ def test_executor_should_delete_incomplete_downloads(
assert not destination_fixture.exists()
def verify_installed_distribution(venv, package, url_reference=None):
distributions = list(venv.site_packages.distributions(name=package.name))
assert len(distributions) == 1
distribution = distributions[0]
metadata = distribution.metadata
assert metadata["Name"] == package.name
assert metadata["Version"] == package.version.text
direct_url_file = distribution._path.joinpath("direct_url.json")
if url_reference is not None:
assert direct_url_file.exists()
assert json.loads(direct_url_file.read_text(encoding="utf-8")) == url_reference
else:
assert not direct_url_file.exists()
def test_executor_should_write_pep610_url_references_for_files(
tmp_venv, pool, config, io
):
......@@ -279,18 +297,9 @@ def test_executor_should_write_pep610_url_references_for_files(
executor = Executor(tmp_venv, pool, config, io)
executor.execute([Install(package)])
dist_info = "demo-0.1.0.dist-info"
assert tmp_venv.site_packages.exists(dist_info)
dist_info = tmp_venv.site_packages.find(dist_info)[0]
direct_url_file = dist_info.joinpath("direct_url.json")
assert direct_url_file.exists()
url_reference = json.loads(direct_url_file.read_text(encoding="utf-8"))
assert url_reference == {"archive_info": {}, "url": url.as_uri()}
verify_installed_distribution(
tmp_venv, package, {"archive_info": {}, "url": url.as_uri()}
)
def test_executor_should_write_pep610_url_references_for_directories(
......@@ -303,18 +312,9 @@ def test_executor_should_write_pep610_url_references_for_directories(
executor = Executor(tmp_venv, pool, config, io)
executor.execute([Install(package)])
dist_info = "simple_project-1.2.3.dist-info"
assert tmp_venv.site_packages.exists(dist_info)
dist_info = tmp_venv.site_packages.find(dist_info)[0]
direct_url_file = dist_info.joinpath("direct_url.json")
assert direct_url_file.exists()
url_reference = json.loads(direct_url_file.read_text(encoding="utf-8"))
assert url_reference == {"dir_info": {}, "url": url.as_uri()}
verify_installed_distribution(
tmp_venv, package, {"dir_info": {}, "url": url.as_uri()}
)
def test_executor_should_write_pep610_url_references_for_editable_directories(
......@@ -331,18 +331,9 @@ def test_executor_should_write_pep610_url_references_for_editable_directories(
executor = Executor(tmp_venv, pool, config, io)
executor.execute([Install(package)])
dist_info_dir = "simple_project-1.2.3.dist-info"
assert tmp_venv.site_packages.exists(dist_info_dir)
dist_info = tmp_venv.site_packages.find(dist_info_dir)[0]
direct_url_file = dist_info.joinpath("direct_url.json")
assert direct_url_file.exists()
url_reference = json.loads(direct_url_file.read_text(encoding="utf-8"))
assert url_reference == {"dir_info": {"editable": True}, "url": url.as_uri()}
verify_installed_distribution(
tmp_venv, package, {"dir_info": {"editable": True}, "url": url.as_uri()}
)
def test_executor_should_write_pep610_url_references_for_urls(
......@@ -357,21 +348,9 @@ def test_executor_should_write_pep610_url_references_for_urls(
executor = Executor(tmp_venv, pool, config, io)
executor.execute([Install(package)])
dist_info = "demo-0.1.0.dist-info"
assert tmp_venv.site_packages.exists(dist_info)
dist_info = tmp_venv.site_packages.find(dist_info)[0]
direct_url_file = dist_info.joinpath("direct_url.json")
assert direct_url_file.exists()
url_reference = json.loads(direct_url_file.read_text(encoding="utf-8"))
assert url_reference == {
"archive_info": {},
"url": "https://files.pythonhosted.org/demo-0.1.0-py2.py3-none-any.whl",
}
verify_installed_distribution(
tmp_venv, package, {"archive_info": {}, "url": package.source_url}
)
def test_executor_should_write_pep610_url_references_for_git(
......@@ -388,22 +367,15 @@ def test_executor_should_write_pep610_url_references_for_git(
executor = Executor(tmp_venv, pool, config, io)
executor.execute([Install(package)])
dist_info = "demo-0.1.2.dist-info"
assert tmp_venv.site_packages.exists(dist_info)
dist_info = tmp_venv.site_packages.find(dist_info)[0]
direct_url_file = dist_info.joinpath("direct_url.json")
assert direct_url_file.exists()
url_reference = json.loads(direct_url_file.read_text(encoding="utf-8"))
assert url_reference == {
verify_installed_distribution(
tmp_venv,
package,
{
"vcs_info": {
"vcs": "git",
"requested_revision": "master",
"commit_id": "123456",
},
"url": "https://github.com/demo/demo.git",
}
"url": package.source_url,
},
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment