Commit 3248fe17 by Randy Döring

refactor: group functions for requirements parsing into a class (#7693)

parent 65486660
......@@ -13,7 +13,7 @@ from tomlkit import inline_table
from poetry.console.commands.command import Command
from poetry.console.commands.env_command import EnvCommand
from poetry.utils.dependency_specification import parse_dependency_specification
from poetry.utils.dependency_specification import RequirementsParser
if TYPE_CHECKING:
......@@ -437,14 +437,10 @@ You can specify a package in the following forms:
except (PyProjectException, RuntimeError):
cwd = Path.cwd()
return [
parse_dependency_specification(
requirement=requirement,
env=self.env if isinstance(self, EnvCommand) else None,
cwd=cwd,
)
for requirement in requirements
]
parser = RequirementsParser(
self.env if isinstance(self, EnvCommand) else None, cwd
)
return [parser.parse(requirement) for requirement in requirements]
def _format_requirements(self, requirements: list[dict[str, str]]) -> Requirements:
requires: Requirements = {}
......
......@@ -26,128 +26,6 @@ if TYPE_CHECKING:
DependencySpec = Dict[str, Union[str, bool, Dict[str, Union[str, bool]], List[str]]]
def _parse_dependency_specification_git_url(
requirement: str, env: Env | None = None
) -> DependencySpec | None:
from poetry.core.vcs.git import Git
from poetry.core.vcs.git import ParsedUrl
parsed = ParsedUrl.parse(requirement)
url = Git.normalize_url(requirement)
pair = {"name": parsed.name, "git": url.url}
if parsed.rev:
pair["rev"] = url.revision
if parsed.subdirectory:
pair["subdirectory"] = parsed.subdirectory
source_root = env.path.joinpath("src") if env else None
package = Provider.get_package_from_vcs(
"git",
url=url.url,
rev=pair.get("rev"),
subdirectory=parsed.subdirectory,
source_root=source_root,
)
pair["name"] = package.name
return pair
def _parse_dependency_specification_url(
requirement: str, env: Env | None = None
) -> DependencySpec | None:
url_parsed = urllib.parse.urlparse(requirement)
if not (url_parsed.scheme and url_parsed.netloc):
return None
if url_parsed.scheme in ["git+https", "git+ssh"]:
return _parse_dependency_specification_git_url(requirement, env)
if url_parsed.scheme in ["http", "https"]:
package = Provider.get_package_from_url(requirement)
assert package.source_url is not None
return {"name": package.name, "url": package.source_url}
return None
def _parse_dependency_specification_path(
requirement: str, cwd: Path
) -> DependencySpec | None:
if (os.path.sep in requirement or "/" in requirement) and (
cwd.joinpath(requirement).exists()
or Path(requirement).expanduser().exists()
and Path(requirement).expanduser().is_absolute()
):
path = Path(requirement).expanduser()
is_absolute = path.is_absolute()
if not path.is_absolute():
path = cwd.joinpath(requirement)
if path.is_file():
package = Provider.get_package_from_file(path.resolve())
else:
package = Provider.get_package_from_directory(path.resolve())
return {
"name": package.name,
"path": (
path.relative_to(cwd).as_posix() if not is_absolute else path.as_posix()
),
}
return None
def _parse_dependency_specification_simple(
requirement: str,
) -> DependencySpec | None:
extras: list[str] = []
pair = re.sub("^([^@=: ]+)(?:@|==|(?<![<>~!])=|:| )(.*)$", "\\1 \\2", requirement)
pair = pair.strip()
require: DependencySpec = {}
if " " in pair:
name, version = pair.split(" ", 1)
extras_m = re.search(r"\[([\w\d,-_]+)\]$", name)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
name, _ = name.split("[")
require["name"] = name
if version != "latest":
require["version"] = version
else:
m = re.match(r"^([^><=!: ]+)((?:>=|<=|>|<|!=|~=|~|\^).*)$", requirement.strip())
if m:
name, constraint = m.group(1), m.group(2)
extras_m = re.search(r"\[([\w\d,-_]+)\]$", name)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
name, _ = name.split("[")
require["name"] = name
require["version"] = constraint
else:
extras_m = re.search(r"\[([\w\d,-_]+)\]$", pair)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
pair, _ = pair.split("[")
require["name"] = pair
if extras:
require["extras"] = extras
return require
BaseSpec = TypeVar("BaseSpec", DependencySpec, InlineTable)
......@@ -178,49 +56,167 @@ def dependency_to_specification(
return specification
def pep508_to_dependency_specification(requirement: str) -> DependencySpec | None:
if " ; " not in requirement and re.search(r"@[\^~!=<>\d]", requirement):
# this is of the form package@<semver>, do not attempt to parse it
return None
class RequirementsParser:
def __init__(self, env: Env | None = None, cwd: Path | None = None) -> None:
self._env = env
self._cwd = cwd or Path.cwd()
def parse(self, requirement: str) -> DependencySpec:
requirement = requirement.strip()
with contextlib.suppress(ValueError):
dependency = Dependency.create_from_pep_508(requirement)
specification: DependencySpec = {}
specification = dependency_to_specification(dependency, specification)
specification = self._parse_pep508(requirement)
if specification is not None:
return specification
extras = []
extras_m = re.search(r"\[([\w\d,-_ ]+)\]$", requirement)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
requirement, _ = requirement.split("[")
specification = (
self._parse_url(requirement)
or self._parse_path(requirement)
or self._parse_simple(requirement)
)
if specification:
specification["name"] = dependency.name
if extras and "extras" not in specification:
specification["extras"] = extras
return specification
return None
raise ValueError(f"Invalid dependency specification: {requirement}")
def _parse_pep508(self, requirement: str) -> DependencySpec | None:
if " ; " not in requirement and re.search(r"@[\^~!=<>\d]", requirement):
# this is of the form package@<semver>, do not attempt to parse it
return None
with contextlib.suppress(ValueError):
dependency = Dependency.create_from_pep_508(requirement)
specification: DependencySpec = {}
specification = dependency_to_specification(dependency, specification)
if specification:
specification["name"] = dependency.name
return specification
return None
def _parse_git_url(self, requirement: str) -> DependencySpec | None:
from poetry.core.vcs.git import Git
from poetry.core.vcs.git import ParsedUrl
parsed = ParsedUrl.parse(requirement)
url = Git.normalize_url(requirement)
pair = {"name": parsed.name, "git": url.url}
if parsed.rev:
pair["rev"] = url.revision
if parsed.subdirectory:
pair["subdirectory"] = parsed.subdirectory
def parse_dependency_specification(
requirement: str, env: Env | None = None, cwd: Path | None = None
) -> DependencySpec:
requirement = requirement.strip()
cwd = cwd or Path.cwd()
source_root = self._env.path.joinpath("src") if self._env else None
package = Provider.get_package_from_vcs(
"git",
url=url.url,
rev=pair.get("rev"),
subdirectory=parsed.subdirectory,
source_root=source_root,
)
pair["name"] = package.name
return pair
specification = pep508_to_dependency_specification(requirement)
def _parse_url(self, requirement: str) -> DependencySpec | None:
url_parsed = urllib.parse.urlparse(requirement)
if not (url_parsed.scheme and url_parsed.netloc):
return None
if specification is not None:
return specification
if url_parsed.scheme in ["git+https", "git+ssh"]:
return self._parse_git_url(requirement)
extras = []
extras_m = re.search(r"\[([\w\d,-_ ]+)\]$", requirement)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
requirement, _ = requirement.split("[")
if url_parsed.scheme in ["http", "https"]:
package = Provider.get_package_from_url(requirement)
assert package.source_url is not None
return {"name": package.name, "url": package.source_url}
specification = (
_parse_dependency_specification_url(requirement, env=env)
or _parse_dependency_specification_path(requirement, cwd=cwd)
or _parse_dependency_specification_simple(requirement)
)
return None
def _parse_path(self, requirement: str) -> DependencySpec | None:
if (os.path.sep in requirement or "/" in requirement) and (
self._cwd.joinpath(requirement).exists()
or Path(requirement).expanduser().exists()
and Path(requirement).expanduser().is_absolute()
):
path = Path(requirement).expanduser()
is_absolute = path.is_absolute()
if not path.is_absolute():
path = self._cwd.joinpath(requirement)
if path.is_file():
package = Provider.get_package_from_file(path.resolve())
else:
package = Provider.get_package_from_directory(path.resolve())
return {
"name": package.name,
"path": (
path.relative_to(self._cwd).as_posix()
if not is_absolute
else path.as_posix()
),
}
return None
if specification:
if extras and "extras" not in specification:
specification["extras"] = extras
return specification
def _parse_simple(
self,
requirement: str,
) -> DependencySpec | None:
extras: list[str] = []
pair = re.sub(
"^([^@=: ]+)(?:@|==|(?<![<>~!])=|:| )(.*)$", "\\1 \\2", requirement
)
pair = pair.strip()
raise ValueError(f"Invalid dependency specification: {requirement}")
require: DependencySpec = {}
if " " in pair:
name, version = pair.split(" ", 1)
extras_m = re.search(r"\[([\w\d,-_]+)\]$", name)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
name, _ = name.split("[")
require["name"] = name
if version != "latest":
require["version"] = version
else:
m = re.match(
r"^([^><=!: ]+)((?:>=|<=|>|<|!=|~=|~|\^).*)$", requirement.strip()
)
if m:
name, constraint = m.group(1), m.group(2)
extras_m = re.search(r"\[([\w\d,-_]+)\]$", name)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
name, _ = name.split("[")
require["name"] = name
require["version"] = constraint
else:
extras_m = re.search(r"\[([\w\d,-_]+)\]$", pair)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
pair, _ = pair.split("[")
require["name"] = pair
if extras:
require["extras"] = extras
return require
......@@ -7,7 +7,7 @@ import pytest
from deepdiff import DeepDiff
from poetry.utils.dependency_specification import parse_dependency_specification
from poetry.utils.dependency_specification import RequirementsParser
if TYPE_CHECKING:
......@@ -116,5 +116,5 @@ def test_parse_dependency_specification(
mocker.patch("pathlib.Path.exists", _mock)
assert not DeepDiff(
parse_dependency_specification(requirement), specification, ignore_order=True
RequirementsParser().parse(requirement), specification, ignore_order=True
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment