Commit 58db4247 by Arun Babu Neelicattu Committed by Bjorn Neergaard

authenticator: allow multiple repos w/ same netloc

Co-authored-by: Agni Sairent <agniczech@gmail.com>
Co-authored-by: Dos Moonen <darsstar@gmail.com>
parent 070ea6b4
...@@ -112,6 +112,20 @@ class Config: ...@@ -112,6 +112,20 @@ class Config:
def raw(self) -> dict[str, Any]: def raw(self) -> dict[str, Any]:
return self._config return self._config
@staticmethod
def _get_environment_repositories() -> dict[str, dict[str, str]]:
repositories = {}
pattern = re.compile(r"POETRY_REPOSITORIES_(?P<name>[A-Z_]+)_URL")
for env_key in os.environ.keys():
match = pattern.match(env_key)
if match:
repositories[match.group("name").lower().replace("_", "-")] = {
"url": os.environ[env_key]
}
return repositories
def get(self, setting_name: str, default: Any = None) -> Any: def get(self, setting_name: str, default: Any = None) -> Any:
""" """
Retrieve a setting value. Retrieve a setting value.
...@@ -121,6 +135,12 @@ class Config: ...@@ -121,6 +135,12 @@ class Config:
# Looking in the environment if the setting # Looking in the environment if the setting
# is set via a POETRY_* environment variable # is set via a POETRY_* environment variable
if self._use_environment: if self._use_environment:
if setting_name == "repositories":
# repositories setting is special for now
repositories = self._get_environment_repositories()
if repositories:
return repositories
env = "POETRY_" + "_".join(k.upper().replace("-", "_") for k in keys) env = "POETRY_" + "_".join(k.upper().replace("-", "_") for k in keys)
env_value = os.getenv(env) env_value = os.getenv(env)
if env_value is not None: if env_value is not None:
......
...@@ -69,14 +69,14 @@ class Publisher: ...@@ -69,14 +69,14 @@ class Publisher:
logger.debug( logger.debug(
f"Found authentication information for {repository_name}." f"Found authentication information for {repository_name}."
) )
username = auth["username"] username = auth.username
password = auth["password"] password = auth.password
resolved_client_cert = client_cert or get_client_cert( resolved_client_cert = client_cert or get_client_cert(
self._poetry.config, repository_name self._poetry.config, repository_name
) )
# Requesting missing credentials but only if there is not a client cert defined. # Requesting missing credentials but only if there is not a client cert defined.
if not resolved_client_cert: if not resolved_client_cert and hasattr(self._io, "ask"):
if username is None: if username is None:
username = self._io.ask("Username:") username = self._io.ask("Username:")
......
from __future__ import annotations from __future__ import annotations
import contextlib import contextlib
import dataclasses
import functools
import logging import logging
import time import time
import urllib.parse import urllib.parse
from os.path import commonprefix
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Any from typing import Any
from typing import Iterator
import requests import requests
import requests.auth import requests.auth
...@@ -20,6 +22,7 @@ from poetry.exceptions import PoetryException ...@@ -20,6 +22,7 @@ from poetry.exceptions import PoetryException
from poetry.locations import REPOSITORY_CACHE_DIR from poetry.locations import REPOSITORY_CACHE_DIR
from poetry.utils.helpers import get_cert from poetry.utils.helpers import get_cert
from poetry.utils.helpers import get_client_cert from poetry.utils.helpers import get_client_cert
from poetry.utils.password_manager import HTTPAuthCredential
from poetry.utils.password_manager import PasswordManager from poetry.utils.password_manager import PasswordManager
...@@ -34,6 +37,50 @@ if TYPE_CHECKING: ...@@ -34,6 +37,50 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@dataclasses.dataclass
class AuthenticatorRepositoryConfig:
name: str
url: str
netloc: str = dataclasses.field(init=False)
path: str = dataclasses.field(init=False)
def __post_init__(self) -> None:
parsed_url = urllib.parse.urlsplit(self.url)
self.netloc = parsed_url.netloc
self.path = parsed_url.path
def certs(self, config: Config) -> dict[str, Path | None]:
return {
"cert": get_client_cert(config, self.name),
"verify": get_cert(config, self.name),
}
@property
def http_credential_keys(self) -> list[str]:
return [self.url, self.netloc, self.name]
def get_http_credentials(
self, password_manager: PasswordManager, username: str | None = None
) -> HTTPAuthCredential:
# try with the repository name via the password manager
credential = HTTPAuthCredential(
**(password_manager.get_http_auth(self.name) or {})
)
if credential.password is None:
# fallback to url and netloc based keyring entries
credential = password_manager.keyring.get_credential(
self.url, self.netloc, username=credential.username
)
if credential.password is not None:
return HTTPAuthCredential(
username=credential.username, password=credential.password
)
return credential
class Authenticator: class Authenticator:
def __init__( def __init__(
self, self,
...@@ -44,10 +91,12 @@ class Authenticator: ...@@ -44,10 +91,12 @@ class Authenticator:
) -> None: ) -> None:
self._config = config self._config = config
self._io = io self._io = io
self._session: requests.Session | None = None
self._sessions_for_netloc: dict[str, requests.Session] = {} self._sessions_for_netloc: dict[str, requests.Session] = {}
self._credentials: dict[str, tuple[str, str]] = {} self._credentials: dict[str, HTTPAuthCredential] = {}
self._certs: dict[str, dict[str, Path | None]] = {} self._certs: dict[str, dict[str, Path | None]] = {}
self._configured_repositories: dict[
str, AuthenticatorRepositoryConfig
] | None = None
self._password_manager = PasswordManager(self._config) self._password_manager = PasswordManager(self._config)
self._cache_control = ( self._cache_control = (
FileCache( FileCache(
...@@ -87,7 +136,7 @@ class Authenticator: ...@@ -87,7 +136,7 @@ class Authenticator:
return self._sessions_for_netloc[netloc] return self._sessions_for_netloc[netloc]
def close(self) -> None: def close(self) -> None:
for session in [self._session, *self._sessions_for_netloc.values()]: for session in self._sessions_for_netloc.values():
if session is not None: if session is not None:
with contextlib.suppress(AttributeError): with contextlib.suppress(AttributeError):
session.close() session.close()
...@@ -101,11 +150,11 @@ class Authenticator: ...@@ -101,11 +150,11 @@ class Authenticator:
def authenticated_url(self, url: str) -> str: def authenticated_url(self, url: str) -> str:
parsed = urllib.parse.urlparse(url) parsed = urllib.parse.urlparse(url)
username, password = self.get_credentials_for_url(url) credential = self.get_credentials_for_url(url)
if username is not None and password is not None: if credential.username is not None and credential.password is not None:
username = urllib.parse.quote(username, safe="") username = urllib.parse.quote(credential.username, safe="")
password = urllib.parse.quote(password, safe="") password = urllib.parse.quote(credential.password, safe="")
return ( return (
f"{parsed.scheme}://{username}:{password}@{parsed.netloc}{parsed.path}" f"{parsed.scheme}://{username}:{password}@{parsed.netloc}{parsed.path}"
...@@ -117,10 +166,12 @@ class Authenticator: ...@@ -117,10 +166,12 @@ class Authenticator:
self, method: str, url: str, raise_for_status: bool = True, **kwargs: Any self, method: str, url: str, raise_for_status: bool = True, **kwargs: Any
) -> requests.Response: ) -> requests.Response:
request = requests.Request(method, url) request = requests.Request(method, url)
username, password = self.get_credentials_for_url(url) credential = self.get_credentials_for_url(url)
if username is not None and password is not None: if credential.username is not None or credential.password is not None:
request = requests.auth.HTTPBasicAuth(username, password)(request) request = requests.auth.HTTPBasicAuth(
credential.username or "", credential.password or ""
)(request)
session = self.get_session(url=url) session = self.get_session(url=url)
prepared_request = session.prepare_request(request) prepared_request = session.prepare_request(request)
...@@ -180,18 +231,51 @@ class Authenticator: ...@@ -180,18 +231,51 @@ class Authenticator:
def post(self, url: str, **kwargs: Any) -> requests.Response: def post(self, url: str, **kwargs: Any) -> requests.Response:
return self.request("post", url, **kwargs) return self.request("post", url, **kwargs)
def get_credentials_for_url(self, url: str) -> tuple[str | None, str | None]: def _get_credentials_for_repository(
parsed_url = urllib.parse.urlsplit(url) self, repository: AuthenticatorRepositoryConfig, username: str | None = None
) -> HTTPAuthCredential:
# cache repository credentials by repository url to avoid multiple keyring
# backend queries when packages are being downloaded from the same source
key = f"{repository.url}#username={username or ''}"
netloc = parsed_url.netloc if key not in self._credentials:
self._credentials[key] = repository.get_http_credentials(
password_manager=self._password_manager, username=username
)
return self._credentials[key]
def _get_credentials_for_url(self, url: str) -> HTTPAuthCredential:
repository = self.get_repository_config_for_url(url)
credentials: tuple[str | None, str | None] = self._credentials.get( credential = (
netloc, (None, None) self._get_credentials_for_repository(repository=repository)
if repository is not None
else HTTPAuthCredential()
) )
if credentials == (None, None): if credential.password is None:
parsed_url = urllib.parse.urlsplit(url)
netloc = parsed_url.netloc
credential = self._password_manager.keyring.get_credential(
url, netloc, username=credential.username
)
return HTTPAuthCredential(
username=credential.username, password=credential.password
)
return credential
def get_credentials_for_url(self, url: str) -> HTTPAuthCredential:
parsed_url = urllib.parse.urlsplit(url)
netloc = parsed_url.netloc
if url not in self._credentials:
if "@" not in netloc: if "@" not in netloc:
credentials = self._get_credentials_for_netloc(netloc) # no credentials were provided in the url, try finding the
# best repository configuration
self._credentials[url] = self._get_credentials_for_url(url)
else: else:
# Split from the right because that's how urllib.parse.urlsplit() # Split from the right because that's how urllib.parse.urlsplit()
# behaves if more than one @ is present (which can be checked using # behaves if more than one @ is present (which can be checked using
...@@ -201,110 +285,89 @@ class Authenticator: ...@@ -201,110 +285,89 @@ class Authenticator:
# behaves if more than one : is present (which again can be checked # behaves if more than one : is present (which again can be checked
# using the password attribute of the return value) # using the password attribute of the return value)
user, password = auth.split(":", 1) if ":" in auth else (auth, "") user, password = auth.split(":", 1) if ":" in auth else (auth, "")
credentials = ( self._credentials[url] = HTTPAuthCredential(
urllib.parse.unquote(user), urllib.parse.unquote(user),
urllib.parse.unquote(password), urllib.parse.unquote(password),
) )
if any(credential is not None for credential in credentials): return self._credentials[url]
credentials = (credentials[0] or "", credentials[1] or "")
self._credentials[netloc] = credentials
return credentials
def get_pypi_token(self, name: str) -> str | None: def get_pypi_token(self, name: str) -> str | None:
return self._password_manager.get_pypi_token(name) return self._password_manager.get_pypi_token(name)
def get_http_auth(self, name: str) -> dict[str, str | None] | None: def get_http_auth(
return self._get_http_auth(name, None) self, name: str, username: str | None = None
) -> HTTPAuthCredential | None:
def _get_http_auth(
self, name: str, netloc: str | None
) -> dict[str, str | None] | None:
if name == "pypi": if name == "pypi":
url = "https://upload.pypi.org/legacy/" repository = AuthenticatorRepositoryConfig(
name, "https://upload.pypi.org/legacy/"
)
else: else:
url = self._config.get(f"repositories.{name}.url") if name not in self.configured_repositories:
if not url:
return None return None
repository = self.configured_repositories[name]
parsed_url = urllib.parse.urlsplit(url) return self._get_credentials_for_repository(
repository=repository, username=username
if netloc is None or netloc == parsed_url.netloc: )
auth = self._password_manager.get_http_auth(name)
auth = auth or {}
if auth.get("password") is None:
username = auth.get("username")
auth = self._get_credentials_for_netloc_from_keyring(
url, parsed_url.netloc, username
)
return auth
return None
def _get_credentials_for_netloc(self, netloc: str) -> tuple[str | None, str | None]:
for repository_name, _ in self._get_repository_netlocs():
auth = self._get_http_auth(repository_name, netloc)
if auth is None:
continue
return auth.get("username"), auth.get("password") @property
def configured_repositories(self) -> dict[str, AuthenticatorRepositoryConfig]:
if self._configured_repositories is None:
self._configured_repositories = {}
for repository_name in self._config.get("repositories", []):
url = self._config.get(f"repositories.{repository_name}.url")
self._configured_repositories[
repository_name
] = AuthenticatorRepositoryConfig(repository_name, url)
return None, None return self._configured_repositories
def get_certs_for_url(self, url: str) -> dict[str, Path | None]: def get_certs_for_url(self, url: str) -> dict[str, Path | None]:
if url not in self._certs:
self._certs[url] = self._get_certs_for_url(url)
return self._certs[url]
@functools.lru_cache(maxsize=None)
def get_repository_config_for_url(
self, url: str
) -> AuthenticatorRepositoryConfig | None:
parsed_url = urllib.parse.urlsplit(url) parsed_url = urllib.parse.urlsplit(url)
candidates_netloc_only = []
candidates_path_match = []
for repository in self.configured_repositories.values():
if repository.netloc == parsed_url.netloc:
if parsed_url.path.startswith(repository.path) or commonprefix(
(parsed_url.path, repository.path)
):
candidates_path_match.append(repository)
continue
candidates_netloc_only.append(repository)
if candidates_path_match:
candidates = candidates_path_match
elif candidates_netloc_only:
candidates = candidates_netloc_only
else:
return None
netloc = parsed_url.netloc if len(candidates) > 1:
logger.debug(
"Multiple source configurations found for %s - %s",
parsed_url.netloc,
", ".join(map(lambda c: c.name, candidates)),
)
# prefer the more specific path
candidates.sort(
key=lambda c: len(commonprefix([parsed_url.path, c.path])), reverse=True
)
return self._certs.setdefault( return candidates[0]
netloc,
self._get_certs_for_netloc_from_config(netloc),
)
def _get_repository_netlocs(self) -> Iterator[tuple[str, str]]: def _get_certs_for_url(self, url: str) -> dict[str, Path | None]:
for repository_name in self._config.get("repositories", []): selected = self.get_repository_config_for_url(url)
url = self._config.get(f"repositories.{repository_name}.url") if selected:
parsed_url = urllib.parse.urlsplit(url) return selected.certs(config=self._config)
yield repository_name, parsed_url.netloc return {"cert": None, "verify": None}
def _get_credentials_for_netloc_from_keyring(
self, url: str, netloc: str, username: str | None
) -> dict[str, str | None] | None:
import keyring
cred = keyring.get_credential(url, username)
if cred is not None:
return {
"username": cred.username,
"password": cred.password,
}
cred = keyring.get_credential(netloc, username)
if cred is not None:
return {
"username": cred.username,
"password": cred.password,
}
if username:
return {
"username": username,
"password": None,
}
return None
def _get_certs_for_netloc_from_config(self, netloc: str) -> dict[str, Path | None]:
certs: dict[str, Path | None] = {"cert": None, "verify": None}
for repository_name, repository_netloc in self._get_repository_netlocs():
if netloc == repository_netloc:
certs["cert"] = get_client_cert(self._config, repository_name)
certs["verify"] = get_cert(self._config, repository_name)
break
return certs
from __future__ import annotations from __future__ import annotations
import dataclasses
import logging import logging
from contextlib import suppress from contextlib import suppress
...@@ -22,6 +23,12 @@ class KeyRingError(Exception): ...@@ -22,6 +23,12 @@ class KeyRingError(Exception):
pass pass
@dataclasses.dataclass
class HTTPAuthCredential:
username: str | None = dataclasses.field(default=None)
password: str | None = dataclasses.field(default=None)
class KeyRing: class KeyRing:
def __init__(self, namespace: str) -> None: def __init__(self, namespace: str) -> None:
self._namespace = namespace self._namespace = namespace
...@@ -32,6 +39,25 @@ class KeyRing: ...@@ -32,6 +39,25 @@ class KeyRing:
def is_available(self) -> bool: def is_available(self) -> bool:
return self._is_available return self._is_available
def get_credential(
self, *names: str, username: str | None = None
) -> HTTPAuthCredential:
default = HTTPAuthCredential(username=username, password=None)
if not self.is_available():
return default
import keyring
for name in names:
credential = keyring.get_credential(name, username)
if credential:
return HTTPAuthCredential(
username=credential.username, password=credential.password
)
return default
def get_password(self, name: str, username: str) -> str | None: def get_password(self, name: str, username: str) -> str | None:
if not self.is_available(): if not self.is_available():
return None return None
......
...@@ -28,6 +28,11 @@ if TYPE_CHECKING: ...@@ -28,6 +28,11 @@ if TYPE_CHECKING:
from _pytest.monkeypatch import MonkeyPatch from _pytest.monkeypatch import MonkeyPatch
@pytest.fixture(autouse=True)
def _use_simple_keyring(with_simple_keyring: None) -> None:
pass
class MockRepository(LegacyRepository): class MockRepository(LegacyRepository):
FIXTURES = Path(__file__).parent / "fixtures" / "legacy" FIXTURES = Path(__file__).parent / "fixtures" / "legacy"
......
...@@ -22,6 +22,11 @@ if TYPE_CHECKING: ...@@ -22,6 +22,11 @@ if TYPE_CHECKING:
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
@pytest.fixture(autouse=True)
def _use_simple_keyring(with_simple_keyring: None) -> None:
pass
class MockRepository(PyPiRepository): class MockRepository(PyPiRepository):
JSON_FIXTURES = Path(__file__).parent / "fixtures" / "pypi.org" / "json" JSON_FIXTURES = Path(__file__).parent / "fixtures" / "pypi.org" / "json"
......
from __future__ import annotations from __future__ import annotations
import base64
import re import re
import uuid import uuid
...@@ -286,19 +287,16 @@ def test_authenticator_request_retries_on_status_code( ...@@ -286,19 +287,16 @@ def test_authenticator_request_retries_on_status_code(
assert sleep.call_count == attempts assert sleep.call_count == attempts
@pytest.fixture
def environment_repository_credentials(monkeypatch: MonkeyPatch) -> None:
monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_USERNAME", "bar")
monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_PASSWORD", "baz")
def test_authenticator_uses_env_provided_credentials( def test_authenticator_uses_env_provided_credentials(
config: Config, config: Config,
environ: None, environ: None,
mock_remote: type[httpretty.httpretty], mock_remote: type[httpretty.httpretty],
http: type[httpretty.httpretty], http: type[httpretty.httpretty],
environment_repository_credentials: None, monkeypatch: MonkeyPatch,
): ):
monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_USERNAME", "bar")
monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_PASSWORD", "baz")
config.merge({"repositories": {"foo": {"url": "https://foo.bar/simple/"}}}) config.merge({"repositories": {"foo": {"url": "https://foo.bar/simple/"}}})
authenticator = Authenticator(config, NullIO()) authenticator = Authenticator(config, NullIO())
...@@ -352,3 +350,177 @@ def test_authenticator_uses_certs_from_config_if_not_provided( ...@@ -352,3 +350,177 @@ def test_authenticator_uses_certs_from_config_if_not_provided(
assert Path(kwargs["verify"]) == Path(cert or configured_cert) assert Path(kwargs["verify"]) == Path(cert or configured_cert)
assert Path(kwargs["cert"]) == Path(client_cert or configured_client_cert) assert Path(kwargs["cert"]) == Path(client_cert or configured_client_cert)
def test_authenticator_uses_credentials_from_config_matched_by_url_path(
config: Config, mock_remote: None, http: type[httpretty.httpretty]
):
config.merge(
{
"repositories": {
"foo-alpha": {"url": "https://foo.bar/alpha/files/simple/"},
"foo-beta": {"url": "https://foo.bar/beta/files/simple/"},
},
"http-basic": {
"foo-alpha": {"username": "bar", "password": "alpha"},
"foo-beta": {"username": "baz", "password": "beta"},
},
}
)
authenticator = Authenticator(config, NullIO())
authenticator.request("get", "https://foo.bar/alpha/files/simple/foo-0.1.0.tar.gz")
request = http.last_request()
basic_auth = base64.b64encode(b"bar:alpha").decode()
assert request.headers["Authorization"] == f"Basic {basic_auth}"
# Make request on second repository with the same netloc but different credentials
authenticator.request("get", "https://foo.bar/beta/files/simple/foo-0.1.0.tar.gz")
request = http.last_request()
basic_auth = base64.b64encode(b"baz:beta").decode()
assert request.headers["Authorization"] == f"Basic {basic_auth}"
def test_authenticator_uses_credentials_from_config_with_at_sign_in_path(
config: Config, mock_remote: None, http: type[httpretty.httpretty]
):
config.merge(
{
"repositories": {
"foo": {"url": "https://foo.bar/beta/files/simple/"},
},
"http-basic": {
"foo": {"username": "bar", "password": "baz"},
},
}
)
authenticator = Authenticator(config, NullIO())
authenticator.request("get", "https://foo.bar/beta/files/simple/f@@-0.1.0.tar.gz")
request = http.last_request()
basic_auth = base64.b64encode(b"bar:baz").decode()
assert request.headers["Authorization"] == f"Basic {basic_auth}"
def test_authenticator_falls_back_to_keyring_url_matched_by_path(
config: Config,
mock_remote: None,
http: type[httpretty.httpretty],
with_simple_keyring: None,
dummy_keyring: DummyBackend,
):
config.merge(
{
"repositories": {
"foo-alpha": {"url": "https://foo.bar/alpha/files/simple/"},
"foo-beta": {"url": "https://foo.bar/beta/files/simple/"},
}
}
)
dummy_keyring.set_password(
"https://foo.bar/alpha/files/simple/", None, SimpleCredential(None, "bar")
)
dummy_keyring.set_password(
"https://foo.bar/beta/files/simple/", None, SimpleCredential(None, "baz")
)
authenticator = Authenticator(config, NullIO())
authenticator.request("get", "https://foo.bar/alpha/files/simple/foo-0.1.0.tar.gz")
request = http.last_request()
basic_auth = base64.b64encode(b":bar").decode()
assert request.headers["Authorization"] == f"Basic {basic_auth}"
authenticator.request("get", "https://foo.bar/beta/files/simple/foo-0.1.0.tar.gz")
request = http.last_request()
basic_auth = base64.b64encode(b":baz").decode()
assert request.headers["Authorization"] == f"Basic {basic_auth}"
def test_authenticator_uses_env_provided_credentials_matched_by_url_path(
config: Config,
environ: None,
mock_remote: type[httpretty.httpretty],
http: type[httpretty.httpretty],
monkeypatch: MonkeyPatch,
):
monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_ALPHA_USERNAME", "bar")
monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_ALPHA_PASSWORD", "alpha")
monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_BETA_USERNAME", "baz")
monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_BETA_PASSWORD", "beta")
config.merge(
{
"repositories": {
"foo-alpha": {"url": "https://foo.bar/alpha/files/simple/"},
"foo-beta": {"url": "https://foo.bar/beta/files/simple/"},
}
}
)
authenticator = Authenticator(config, NullIO())
authenticator.request("get", "https://foo.bar/alpha/files/simple/foo-0.1.0.tar.gz")
request = http.last_request()
basic_auth = base64.b64encode(b"bar:alpha").decode()
assert request.headers["Authorization"] == f"Basic {basic_auth}"
authenticator.request("get", "https://foo.bar/beta/files/simple/foo-0.1.0.tar.gz")
request = http.last_request()
basic_auth = base64.b64encode(b"baz:beta").decode()
assert request.headers["Authorization"] == f"Basic {basic_auth}"
def test_authenticator_azure_feed_guid_credentials(
config: Config,
mock_remote: None,
http: type[httpretty.httpretty],
with_simple_keyring: None,
dummy_keyring: DummyBackend,
):
config.merge(
{
"repositories": {
"alpha": {
"url": "https://foo.bar/org-alpha/_packaging/feed/pypi/simple/"
},
"beta": {
"url": "https://foo.bar/org-beta/_packaging/feed/pypi/simple/"
},
},
"http-basic": {
"alpha": {"username": "foo", "password": "bar"},
"beta": {"username": "baz", "password": "qux"},
},
}
)
authenticator = Authenticator(config, NullIO())
authenticator.request(
"get",
"https://foo.bar/org-alpha/_packaging/GUID/pypi/simple/a/1.0.0/a-1.0.0.whl",
)
request = http.last_request()
basic_auth = base64.b64encode(b"foo:bar").decode()
assert request.headers["Authorization"] == f"Basic {basic_auth}"
authenticator.request(
"get",
"https://foo.bar/org-beta/_packaging/GUID/pypi/simple/b/1.0.0/a-1.0.0.whl",
)
request = http.last_request()
basic_auth = base64.b64encode(b"baz:qux").decode()
assert request.headers["Authorization"] == f"Basic {basic_auth}"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment