Commit 8cb3aab3 by Arun Babu Neelicattu Committed by Bjorn Neergaard

config: allow bool values for repo cert

This change allows certificates.<repo>.cert configuration to accept
boolean values in addition to certificate paths. This allows for
repositories to skip TLS certificate validation for cases where
self-signed certificats are used by package sources.

In addition to the above, the certificate configuration handling has
now been delegated to a dedicated dataclass.

Co-authored-by: Celeborn2BeAlive <laurent.noel.c2ba@gmail.com>
Co-authored-by: Maayan Bar <maayanbar13@gmail.com>
parent 6a6034e1
......@@ -315,12 +315,15 @@ for more information.
### `certificates.<name>.cert`:
**Type**: string
**Type**: string | bool
Set custom certificate authority for repository `<name>`.
See [Repositories - Configuring credentials - Custom certificate authority]({{< relref "repositories#custom-certificate-authority-and-mutual-tls-authentication" >}})
for more information.
This configuration can be set to `false`, if TLS certificate verification should be skipped for this
repository.
### `certificates.<name>.client-cert`:
**Type**: string
......
......@@ -384,6 +384,21 @@ poetry config certificates.foo.cert /path/to/ca.pem
poetry config certificates.foo.client-cert /path/to/client.pem
```
{{% note %}}
The value of `certificates.<repository>.cert` can be set to `false` if certificate verification is
required to be skipped. This is useful for cases where a package source with self-signed certificates
are used.
```bash
poetry config certificates.foo.cert false
```
{{% warning %}}
Disabling certificate verification is not recommended as it is does not conform to security
best practices.
{{% /warning %}}
{{% /note %}}
## Caches
Poetry employs multiple caches for package sources in order to improve user experience and avoid duplicate network
......
......@@ -3,6 +3,7 @@ from __future__ import annotations
import json
import re
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from typing import cast
......@@ -11,7 +12,11 @@ from cleo.helpers import argument
from cleo.helpers import option
from poetry.config.config import PackageFilterPolicy
from poetry.config.config import boolean_normalizer
from poetry.config.config import boolean_validator
from poetry.config.config import int_normalizer
from poetry.console.commands.command import Command
from poetry.locations import DEFAULT_CACHE_DIR
if TYPE_CHECKING:
......@@ -48,13 +53,6 @@ To remove a repository (repo is a short alias for repositories):
@property
def unique_config_values(self) -> dict[str, tuple[Any, Any, Any]]:
from pathlib import Path
from poetry.config.config import boolean_normalizer
from poetry.config.config import boolean_validator
from poetry.config.config import int_normalizer
from poetry.locations import DEFAULT_CACHE_DIR
unique_config_values = {
"cache-dir": (
str,
......@@ -275,20 +273,26 @@ To remove a repository (repo is a short alias for repositories):
return 0
# handle certs
m = re.match(
r"(?:certificates)\.([^.]+)\.(cert|client-cert)", self.argument("key")
)
m = re.match(r"certificates\.([^.]+)\.(cert|client-cert)", self.argument("key"))
if m:
repository = m.group(1)
key = m.group(2)
if self.option("unset"):
config.auth_config_source.remove_property(
f"certificates.{m.group(1)}.{m.group(2)}"
f"certificates.{repository}.{key}"
)
return 0
if len(values) == 1:
new_value: str | bool = values[0]
if key == "cert" and boolean_validator(values[0]):
new_value = boolean_normalizer(values[0])
config.auth_config_source.add_property(
f"certificates.{m.group(1)}.{m.group(2)}", values[0]
f"certificates.{repository}.{key}", new_value
)
else:
raise ValueError("You must pass exactly 1 value")
......
......@@ -63,11 +63,17 @@ class PipInstaller(BaseInstaller):
args += ["--trusted-host", parsed.hostname]
if isinstance(repository, HTTPRepository):
if repository.cert:
args += ["--cert", str(repository.cert)]
certificates = repository.certificates
if repository.client_cert:
args += ["--client-cert", str(repository.client_cert)]
if certificates.cert:
args += ["--cert", str(certificates.cert)]
if parsed.scheme == "https" and not certificates.verify:
assert parsed.hostname is not None
args += ["--trusted-host", parsed.hostname]
if certificates.client_cert:
args += ["--client-cert", str(certificates.client_cert)]
index_url = repository.authenticated_url
......
......@@ -6,8 +6,6 @@ from typing import TYPE_CHECKING
from poetry.publishing.uploader import Uploader
from poetry.utils.authenticator import Authenticator
from poetry.utils.helpers import get_cert
from poetry.utils.helpers import get_client_cert
if TYPE_CHECKING:
......@@ -72,9 +70,10 @@ class Publisher:
username = auth.username
password = auth.password
resolved_client_cert = client_cert or get_client_cert(
self._poetry.config, repository_name
)
certificates = self._authenticator.get_certs_for_repository(repository_name)
resolved_cert = cert or certificates.cert or certificates.verify
resolved_client_cert = client_cert or certificates.client_cert
# Requesting missing credentials but only if there is not a client cert defined.
if not resolved_client_cert and hasattr(self._io, "ask"):
if username is None:
......@@ -96,7 +95,7 @@ class Publisher:
self._uploader.upload(
url,
cert=cert or get_cert(self._poetry.config, repository_name),
cert=resolved_cert,
client_cert=resolved_client_cert,
dry_run=dry_run,
skip_existing=skip_existing,
......
......@@ -3,6 +3,7 @@ from __future__ import annotations
import hashlib
import io
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
......@@ -25,8 +26,6 @@ from poetry.utils.patterns import wheel_file_re
if TYPE_CHECKING:
from pathlib import Path
from cleo.io.null_io import NullIO
from poetry.poetry import Poetry
......@@ -114,15 +113,14 @@ class Uploader:
def upload(
self,
url: str,
cert: Path | None = None,
cert: Path | bool = True,
client_cert: Path | None = None,
dry_run: bool = False,
skip_existing: bool = False,
) -> None:
session = self.make_session()
if cert:
session.verify = str(cert)
session.verify = str(cert) if isinstance(cert, Path) else cert
if client_cert:
session.cert = str(client_cert)
......
......@@ -31,6 +31,7 @@ from poetry.utils.patterns import wheel_file_re
if TYPE_CHECKING:
from poetry.config.config import Config
from poetry.inspection.info import PackageInfo
from poetry.utils.authenticator import RepositoryCertificateConfig
class HTTPRepository(CachedRepository, ABC):
......@@ -59,18 +60,8 @@ class HTTPRepository(CachedRepository, ABC):
return self._url
@property
def cert(self) -> Path | None:
cert = self._authenticator.get_certs_for_url(self.url).get("verify")
if cert:
return Path(cert)
return None
@property
def client_cert(self) -> Path | None:
cert = self._authenticator.get_certs_for_url(self.url).get("cert")
if cert:
return Path(cert)
return None
def certificates(self) -> RepositoryCertificateConfig:
return self._authenticator.get_certs_for_url(self.url)
@property
def authenticated_url(self) -> str:
......
......@@ -8,6 +8,7 @@ import time
import urllib.parse
from os.path import commonprefix
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
......@@ -20,21 +21,42 @@ from cachecontrol.caches import FileCache
from poetry.config.config import Config
from poetry.exceptions import PoetryException
from poetry.utils.helpers import get_cert
from poetry.utils.helpers import get_client_cert
from poetry.utils.password_manager import HTTPAuthCredential
from poetry.utils.password_manager import PasswordManager
if TYPE_CHECKING:
from pathlib import Path
from cleo.io.io import IO
logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True)
class RepositoryCertificateConfig:
cert: Path | None = dataclasses.field(default=None)
client_cert: Path | None = dataclasses.field(default=None)
verify: bool = dataclasses.field(default=True)
@classmethod
def create(
cls, repository: str, config: Config | None
) -> RepositoryCertificateConfig:
config = config if config else Config.create()
verify: str | bool = config.get(
f"certificates.{repository}.verify",
config.get(f"certificates.{repository}.cert", True),
)
client_cert: str = config.get(f"certificates.{repository}.client-cert")
return cls(
cert=Path(verify) if isinstance(verify, str) else None,
client_cert=Path(client_cert) if client_cert else None,
verify=verify if isinstance(verify, bool) else True,
)
@dataclasses.dataclass
class AuthenticatorRepositoryConfig:
name: str
......@@ -47,11 +69,8 @@ class AuthenticatorRepositoryConfig:
self.netloc = parsed_url.netloc
self.path = parsed_url.path
def certs(self, config: Config) -> dict[str, Path | None]:
return {
"cert": get_client_cert(config, self.name),
"verify": get_cert(config, self.name),
}
def certs(self, config: Config) -> RepositoryCertificateConfig:
return RepositoryCertificateConfig.create(self.name, config)
@property
def http_credential_keys(self) -> list[str]:
......@@ -91,7 +110,7 @@ class Authenticator:
self._io = io
self._sessions_for_netloc: dict[str, requests.Session] = {}
self._credentials: dict[str, HTTPAuthCredential] = {}
self._certs: dict[str, dict[str, Path | None]] = {}
self._certs: dict[str, RepositoryCertificateConfig] = {}
self._configured_repositories: dict[
str, AuthenticatorRepositoryConfig
] | None = None
......@@ -186,14 +205,13 @@ class Authenticator:
stream = kwargs.get("stream")
certs = self.get_certs_for_url(url)
verify = kwargs.get("verify") or certs.get("verify")
cert = kwargs.get("cert") or certs.get("cert")
verify = kwargs.get("verify") or certs.cert or certs.verify
cert = kwargs.get("cert") or certs.client_cert
if cert is not None:
cert = str(cert)
if verify is not None:
verify = str(verify)
verify = str(verify) if isinstance(verify, Path) else verify
settings = session.merge_environment_settings( # type: ignore[no-untyped-call]
prepared_request.url, proxies, stream, verify, cert
......@@ -332,6 +350,11 @@ class Authenticator:
repository=repository, username=username
)
def get_certs_for_repository(self, name: str) -> RepositoryCertificateConfig:
if name.lower() == "pypi" or name not in self.configured_repositories:
return RepositoryCertificateConfig()
return self.configured_repositories[name].certs(self._config)
@property
def configured_repositories(self) -> dict[str, AuthenticatorRepositoryConfig]:
if self._configured_repositories is None:
......@@ -352,7 +375,7 @@ class Authenticator:
self.configured_repositories[name] = AuthenticatorRepositoryConfig(name, url)
self.reset_credentials_cache()
def get_certs_for_url(self, url: str) -> dict[str, Path | None]:
def get_certs_for_url(self, url: str) -> RepositoryCertificateConfig:
if url not in self._certs:
self._certs[url] = self._get_certs_for_url(url)
return self._certs[url]
......@@ -398,11 +421,11 @@ class Authenticator:
return candidates[0]
def _get_certs_for_url(self, url: str) -> dict[str, Path | None]:
def _get_certs_for_url(self, url: str) -> RepositoryCertificateConfig:
selected = self.get_repository_config_for_url(url)
if selected:
return selected.certs(config=self._config)
return {"cert": None, "verify": None}
return RepositoryCertificateConfig()
_authenticator: Authenticator | None = None
......
......@@ -18,7 +18,6 @@ if TYPE_CHECKING:
from poetry.core.packages.package import Package
from requests import Session
from poetry.config.config import Config
from poetry.utils.authenticator import Authenticator
......@@ -33,22 +32,6 @@ def module_name(name: str) -> str:
return canonicalize_name(name).replace(".", "_").replace("-", "_")
def get_cert(config: Config, repository_name: str) -> Path | None:
cert = config.get(f"certificates.{repository_name}.cert")
if cert:
return Path(cert)
else:
return None
def get_client_cert(config: Config, repository_name: str) -> Path | None:
client_cert = config.get(f"certificates.{repository_name}.client-cert")
if client_cert:
return Path(client_cert)
else:
return None
def _on_rm_error(func: Callable[[str], None], path: str, exc_info: Exception) -> None:
if not os.path.exists(path):
return
......
......@@ -175,16 +175,26 @@ def test_set_client_cert(
)
@pytest.mark.parametrize(
("value", "result"),
[
("path/to/ca.pem", "path/to/ca.pem"),
("true", True),
("false", False),
],
)
def test_set_cert(
tester: CommandTester,
auth_config_source: DictConfigSource,
mocker: MockerFixture,
value: str,
result: str | bool,
):
mocker.spy(ConfigSource, "__init__")
tester.execute("certificates.foo.cert path/to/ca.pem")
tester.execute(f"certificates.foo.cert {value}")
assert auth_config_source.config["certificates"]["foo"]["cert"] == "path/to/ca.pem"
assert auth_config_source.config["certificates"]["foo"]["cert"] == result
def test_config_installer_parallel(
......
......@@ -14,6 +14,7 @@ from poetry.core.packages.package import Package
from poetry.installation.pip_installer import PipInstaller
from poetry.repositories.legacy_repository import LegacyRepository
from poetry.repositories.pool import Pool
from poetry.utils.authenticator import RepositoryCertificateConfig
from poetry.utils.env import NullEnv
......@@ -21,6 +22,7 @@ if TYPE_CHECKING:
from pytest_mock import MockerFixture
from poetry.utils.env import VirtualEnv
from tests.conftest import Config
@pytest.fixture
......@@ -120,15 +122,15 @@ def test_install_with_non_pypi_default_repository(pool: Pool, installer: PipInst
@pytest.mark.parametrize(
("key", "option"),
[
("cert", "client-cert"),
("verify", "cert"),
("client_cert", "client-cert"),
("cert", "cert"),
],
)
def test_install_with_certs(mocker: MockerFixture, key: str, option: str):
client_path = "path/to/client.pem"
mocker.patch(
"poetry.utils.authenticator.Authenticator.get_certs_for_url",
return_value={key: client_path},
return_value=RepositoryCertificateConfig(**{key: Path(client_path)}),
)
default = LegacyRepository("default", "https://foo.bar")
......@@ -204,3 +206,31 @@ def test_uninstall_git_package_nspkg_pth_cleanup(
# any command in the virtual environment should trigger the error message
output = tmp_venv.run("python", "-m", "site")
assert not re.match(rf"Error processing line 1 of .*{pth_file}", output)
def test_install_with_trusted_host(config: Config):
config.merge({"certificates": {"default": {"cert": False}}})
default = LegacyRepository("default", "https://foo.bar")
pool = Pool()
pool.add_repository(default, default=True)
null_env = NullEnv()
installer = PipInstaller(null_env, NullIO(), pool)
foo = Package(
"foo",
"0.0.0",
source_type="legacy",
source_reference=default.name,
source_url=default.url,
)
installer.install(foo)
assert len(null_env.executed) == 1
cmd = null_env.executed[0]
assert "--trusted-host" in cmd
cert_index = cmd.index("--trusted-host")
assert cmd[cert_index + 1] == "foo.bar"
......@@ -38,7 +38,7 @@ def test_publish_publishes_to_pypi_by_default(
assert [("foo", "bar")] == uploader_auth.call_args
assert [
("https://upload.pypi.org/legacy/",),
{"cert": None, "client_cert": None, "dry_run": False, "skip_existing": False},
{"cert": True, "client_cert": None, "dry_run": False, "skip_existing": False},
] == uploader_upload.call_args
......@@ -70,7 +70,7 @@ def test_publish_can_publish_to_given_repository(
assert [("foo", "bar")] == uploader_auth.call_args
assert [
("http://foo.bar",),
{"cert": None, "client_cert": None, "dry_run": False, "skip_existing": False},
{"cert": True, "client_cert": None, "dry_run": False, "skip_existing": False},
] == uploader_upload.call_args
assert "Publishing my-package (1.2.3) to foo" in io.fetch_output()
......@@ -104,7 +104,7 @@ def test_publish_uses_token_if_it_exists(
assert [("__token__", "my-token")] == uploader_auth.call_args
assert [
("https://upload.pypi.org/legacy/",),
{"cert": None, "client_cert": None, "dry_run": False, "skip_existing": False},
{"cert": True, "client_cert": None, "dry_run": False, "skip_existing": False},
] == uploader_upload.call_args
......@@ -159,7 +159,7 @@ def test_publish_uses_client_cert(
assert [
("https://foo.bar",),
{
"cert": None,
"cert": True,
"client_cert": Path(client_cert),
"dry_run": False,
"skip_existing": False,
......@@ -186,5 +186,5 @@ def test_publish_read_from_environment_variable(
assert [("bar", "baz")] == uploader_auth.call_args
assert [
("https://foo.bar",),
{"cert": None, "client_cert": None, "dry_run": False, "skip_existing": False},
{"cert": True, "client_cert": None, "dry_run": False, "skip_existing": False},
] == uploader_upload.call_args
......@@ -16,6 +16,7 @@ import requests
from cleo.io.null_io import NullIO
from poetry.utils.authenticator import Authenticator
from poetry.utils.authenticator import RepositoryCertificateConfig
if TYPE_CHECKING:
......@@ -599,3 +600,52 @@ def test_authenticator_git_repositories(
three = authenticator.get_credentials_for_git_url("https://foo.bar/org/three.git")
assert not three.username
assert not three.password
@pytest.mark.parametrize(
("ca_cert", "client_cert", "result"),
[
(None, None, RepositoryCertificateConfig()),
(
"path/to/ca.pem",
"path/to/client.pem",
RepositoryCertificateConfig(
Path("path/to/ca.pem"), Path("path/to/client.pem")
),
),
(
None,
"path/to/client.pem",
RepositoryCertificateConfig(None, Path("path/to/client.pem")),
),
(
"path/to/ca.pem",
None,
RepositoryCertificateConfig(Path("path/to/ca.pem"), None),
),
(True, None, RepositoryCertificateConfig()),
(False, None, RepositoryCertificateConfig(verify=False)),
(
False,
"path/to/client.pem",
RepositoryCertificateConfig(None, Path("path/to/client.pem"), verify=False),
),
],
)
def test_repository_certificate_configuration_create(
ca_cert: str | bool | None,
client_cert: str | None,
result: RepositoryCertificateConfig,
config: Config,
) -> None:
cert_config = {}
if ca_cert is not None:
cert_config["cert"] = ca_cert
if client_cert is not None:
cert_config["client-cert"] = client_cert
config.merge({"certificates": {"foo": cert_config}})
assert RepositoryCertificateConfig.create("foo", config) == result
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from poetry.core.utils.helpers import parse_requires
from poetry.utils.helpers import canonicalize_name
from poetry.utils.helpers import get_cert
from poetry.utils.helpers import get_client_cert
if TYPE_CHECKING:
from tests.conftest import Config
def test_parse_requires():
......@@ -72,20 +63,6 @@ isort@ git+git://github.com/timothycrosley/isort.git@e63ae06ec7d70b06df9e5283576
assert result == expected
def test_get_cert(config: Config):
ca_cert = "path/to/ca.pem"
config.merge({"certificates": {"foo": {"cert": ca_cert}}})
assert get_cert(config, "foo") == Path(ca_cert)
def test_get_client_cert(config: Config):
client_cert = "path/to/client.pem"
config.merge({"certificates": {"foo": {"client-cert": client_cert}}})
assert get_client_cert(config, "foo") == Path(client_cert)
test_canonicalize_name_cases = [
("flask", "flask"),
("Flask", "flask"),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment