Commit 299a885e by Sébastien Eustace Committed by GitHub

Improve password management (#1788)

parent 1c45012e
......@@ -5,8 +5,6 @@ from cleo import argument
from cleo import option
from poetry.factory import Factory
from poetry.utils.helpers import keyring_repository_password_del
from poetry.utils.helpers import keyring_repository_password_set
from .command import Command
......@@ -181,11 +179,14 @@ To remove a repository (repo is a short alias for repositories):
# handle auth
m = re.match(r"^(http-basic|pypi-token)\.(.+)", self.argument("key"))
if m:
from poetry.utils.password_manager import PasswordManager
password_manager = PasswordManager(config)
if self.option("unset"):
keyring_repository_password_del(config, m.group(2))
config.auth_config_source.remove_property(
"{}.{}".format(m.group(1), m.group(2))
)
if m.group(1) == "http-basic":
password_manager.delete_http_password(m.group(2))
elif m.group(1) == "pypi-token":
password_manager.delete_pypi_token(m.group(2))
return 0
......@@ -203,15 +204,7 @@ To remove a repository (repo is a short alias for repositories):
username = values[0]
password = values[1]
property_value = dict(username=username)
try:
keyring_repository_password_set(m.group(2), username, password)
except RuntimeError:
property_value.update(password=password)
config.auth_config_source.add_property(
"{}.{}".format(m.group(1), m.group(2)), property_value
)
password_manager.set_http_password(m.group(2), username, password)
elif m.group(1) == "pypi-token":
if len(values) != 1:
raise ValueError(
......@@ -220,9 +213,7 @@ To remove a repository (repo is a short alias for repositories):
token = values[0]
config.auth_config_source.add_property(
"{}.{}".format(m.group(1), m.group(2)), token
)
password_manager.set_pypi_token(m.group(2), token)
return 0
......
......@@ -53,7 +53,7 @@ class ApplicationConfig(BaseApplicationConfig):
io = event.io
loggers = ["poetry.packages.package"]
loggers = ["poetry.packages.package", "poetry.utils.password_manager"]
loggers += command.loggers
......
......@@ -233,7 +233,8 @@ class Factory:
): # type: (Dict[str, str], Config) -> LegacyRepository
from .repositories.auth import Auth
from .repositories.legacy_repository import LegacyRepository
from .utils.helpers import get_client_cert, get_cert, get_http_basic_auth
from .utils.helpers import get_client_cert, get_cert
from .utils.password_manager import PasswordManager
if "url" in source:
# PyPI-like repository
......@@ -242,11 +243,12 @@ class Factory:
else:
raise RuntimeError("Unsupported source specified")
password_manager = PasswordManager(auth_config)
name = source["name"]
url = source["url"]
credentials = get_http_basic_auth(auth_config, name)
credentials = password_manager.get_http_auth(name)
if credentials:
auth = Auth(url, credentials[0], credentials[1])
auth = Auth(url, credentials["username"], credentials["password"])
else:
auth = None
......
......@@ -2,7 +2,7 @@ import logging
from poetry.utils.helpers import get_cert
from poetry.utils.helpers import get_client_cert
from poetry.utils.helpers import get_http_basic_auth
from poetry.utils.password_manager import PasswordManager
from .uploader import Uploader
......@@ -20,6 +20,7 @@ class Publisher:
self._package = poetry.package
self._io = io
self._uploader = Uploader(poetry, io)
self._password_manager = PasswordManager(poetry.config)
@property
def files(self):
......@@ -60,21 +61,21 @@ class Publisher:
if not (username and password):
# Check if we have a token first
token = self._poetry.config.get("pypi-token.{}".format(repository_name))
token = self._password_manager.get_pypi_token(repository_name)
if token:
logger.debug("Found an API token for {}.".format(repository_name))
username = "__token__"
password = token
else:
auth = get_http_basic_auth(self._poetry.config, repository_name)
auth = self._password_manager.get_http_auth(repository_name)
if auth:
logger.debug(
"Found authentication information for {}.".format(
repository_name
)
)
username = auth[0]
password = auth[1]
username = auth["username"]
password = auth["password"]
resolved_client_cert = client_cert or get_client_cert(
self._poetry.config, repository_name
......
......@@ -8,11 +8,6 @@ from contextlib import contextmanager
from typing import List
from typing import Optional
from keyring import delete_password
from keyring import get_password
from keyring import set_password
from keyring.errors import KeyringError
from poetry.config.config import Config
from poetry.utils._compat import Path
from poetry.version import Version
......@@ -95,53 +90,6 @@ def parse_requires(requires): # type: (str) -> List[str]
return requires_dist
def keyring_service_name(repository_name): # type: (str) -> str
return "{}-{}".format("poetry-repository", repository_name)
def keyring_repository_password_get(
repository_name, username
): # type: (str, str) -> Optional[str]
try:
return get_password(keyring_service_name(repository_name), username)
except (RuntimeError, KeyringError):
return None
def keyring_repository_password_set(
repository_name, username, password
): # type: (str, str, str) -> None
try:
set_password(keyring_service_name(repository_name), username, password)
except (RuntimeError, KeyringError):
raise RuntimeError("Failed to store password in keyring")
def keyring_repository_password_del(
config, repository_name
): # type: (Config, str) -> None
try:
repo_auth = config.get("http-basic.{}".format(repository_name))
if repo_auth and "username" in repo_auth:
delete_password(
keyring_service_name(repository_name), repo_auth["username"]
)
except (RuntimeError, KeyringError):
pass
def get_http_basic_auth(
config, repository_name
): # type: (Config, str) -> Optional[tuple]
repo_auth = config.get("http-basic.{}".format(repository_name))
if repo_auth:
username, password = repo_auth["username"], repo_auth.get("password")
if password is None:
password = keyring_repository_password_get(repository_name, username)
return username, password
return None
def get_cert(config, repository_name): # type: (Config, str) -> Optional[Path]
cert = config.get("certificates.{}.cert".format(repository_name))
if cert:
......
import logging
logger = logging.getLogger(__name__)
class PasswordManagerError(Exception):
pass
class KeyRingError(Exception):
pass
class KeyRing:
def __init__(self, namespace):
self._namespace = namespace
self._is_available = True
self._check()
def is_available(self):
return self._is_available
def get_password(self, name, username):
if not self.is_available():
return
import keyring
import keyring.errors
name = self.get_entry_name(name)
try:
return keyring.get_password(name, username)
except (RuntimeError, keyring.errors.KeyringError):
raise KeyRingError(
"Unable to retrieve the password for {} from the key ring".format(name)
)
def set_password(self, name, username, password):
if not self.is_available():
return
import keyring
import keyring.errors
name = self.get_entry_name(name)
try:
keyring.set_password(name, username, password)
except (RuntimeError, keyring.errors.KeyringError) as e:
raise KeyRingError(
"Unable to store the password for {} in the key ring: {}".format(
name, str(e)
)
)
def delete_password(self, name, username):
if not self.is_available():
return
import keyring
import keyring.errors
name = self.get_entry_name(name)
try:
keyring.delete_password(name, username)
except (RuntimeError, keyring.errors.KeyringError):
raise KeyRingError(
"Unable to delete the password for {} from the key ring".format(name)
)
def get_entry_name(self, name):
return "{}-{}".format(self._namespace, name)
def _check(self):
try:
import keyring
except Exception as e:
logger.debug("An error occurred while importing keyring: {}".format(str(e)))
self._is_available = False
return
backend = keyring.get_keyring()
name = backend.name.split(" ")[0]
if name == "fail":
logger.debug("No suitable keyring backend found")
self._is_available = False
elif "plaintext" in backend.name.lower():
logger.debug("Only a plaintext keyring backend is available. Not using it.")
self._is_available = False
elif name == "chainer":
try:
import keyring.backend
backends = keyring.backend.get_all_keyring()
self._is_available = any(
[
b.name.split(" ")[0] not in ["chainer", "fail"]
and "plaintext" not in b.name.lower()
for b in backends
]
)
except Exception:
self._is_available = False
if not self._is_available:
logger.warning("No suitable keyring backends were found")
class PasswordManager:
def __init__(self, config):
self._config = config
self._keyring = KeyRing("poetry-repository")
if not self._keyring.is_available():
logger.warning("Using a plaintext file to store and retrieve credentials")
@property
def keyring(self):
return self._keyring
def set_pypi_token(self, name, token):
if not self._keyring.is_available():
self._config.auth_config_source.add_property(
"pypi-token.{}".format(name), token
)
else:
self._keyring.set_password(name, "__token__", token)
def get_pypi_token(self, name):
if not self._keyring.is_available():
return self._config.get("pypi-token.{}".format(name))
return self._keyring.get_password(name, "__token__")
def delete_pypi_token(self, name):
if not self._keyring.is_available():
return self._config.auth_config_source.remove_property(
"pypi-token.{}".format(name)
)
self._keyring.delete_password(name, "__token__")
def get_http_auth(self, name):
auth = self._config.get("http-basic.{}".format(name))
if not auth:
return None
username, password = auth["username"], auth.get("password")
if password is None:
password = self._keyring.get_password(name, username)
return {
"username": username,
"password": password,
}
def set_http_password(self, name, username, password):
auth = {"username": username}
if not self._keyring.is_available():
auth["password"] = password
else:
self._keyring.set_password(name, username, password)
self._config.auth_config_source.add_property("http-basic.{}".format(name), auth)
def delete_http_password(self, name):
auth = self.get_http_auth(name)
if not auth or "username" not in auth:
return
try:
self._keyring.delete_password(name, auth["username"])
except KeyRingError:
pass
self._config.auth_config_source.remove_property("http-basic.{}".format(name))
......@@ -52,6 +52,11 @@ def auth_config_source():
@pytest.fixture
def config(config_source, auth_config_source, mocker):
import keyring
from keyring.backends.fail import Keyring
keyring.set_keyring(Keyring())
c = Config()
c.merge(config_source.config)
c.set_config_source(config_source)
......
from poetry.utils._compat import Path
from poetry.utils.helpers import get_cert
from poetry.utils.helpers import get_client_cert
from poetry.utils.helpers import get_http_basic_auth
from poetry.utils.helpers import parse_requires
......@@ -54,22 +53,6 @@ zipfile36>=0.1.0.0,<0.2.0.0
assert result == expected
def test_get_http_basic_auth(config):
config.merge({"http-basic": {"foo": {"username": "foo", "password": "bar"}}})
assert get_http_basic_auth(config, "foo") == ("foo", "bar")
def test_get_http_basic_auth_without_password(config):
config.merge({"http-basic": {"foo": {"username": "foo"}}})
assert get_http_basic_auth(config, "foo") == ("foo", None)
def test_get_http_basic_auth_missing(config):
assert get_http_basic_auth(config, "foo") is None
def test_get_cert(config):
ca_cert = "path/to/ca.pem"
config.merge({"certificates": {"foo": {"cert": ca_cert}}})
......
from uuid import uuid4
import pytest
from keyring import get_keyring
from keyring import set_keyring
from keyring.backend import KeyringBackend
from keyring.errors import KeyringError
from poetry.utils.helpers import keyring_repository_password_del
from poetry.utils.helpers import keyring_repository_password_get
from poetry.utils.helpers import keyring_repository_password_set
from poetry.utils.helpers import keyring_service_name
class DictKeyring(KeyringBackend):
priority = 1
def __init__(self):
self._storage = dict()
def set_password(self, servicename, username, password):
if servicename not in self._storage:
self._storage[servicename] = dict()
self._storage[servicename][username] = password
def get_password(self, servicename, username):
if servicename in self._storage:
return self._storage[servicename].get(username)
def delete_password(self, servicename, username):
if servicename in self._storage:
if username in self._storage[servicename]:
del self._storage[servicename][username]
if not self._storage[servicename]:
del self._storage[servicename]
class BrokenKeyring(KeyringBackend):
priority = 1
def set_password(self, servicename, username, password):
raise KeyringError()
def get_password(self, servicename, username):
raise KeyringError()
def delete_password(self, servicename, username):
raise KeyringError()
@pytest.fixture
def keyring(): # type: () -> KeyringBackend
k = DictKeyring()
set_keyring(k)
return k
@pytest.fixture
def broken_keyring(): # type: () -> KeyringBackend
k = BrokenKeyring()
set_keyring(k)
return k
@pytest.fixture
def repository(): # type: () -> str
return "test"
@pytest.fixture
def username(): # type: () -> str
return "username"
@pytest.fixture
def password(): # type: () -> str
return str(uuid4())
def test_keyring_repository_password_get(keyring, repository, username, password):
keyring.set_password(keyring_service_name(repository), username, password)
assert keyring_repository_password_get(repository, username) == password
def test_keyring_repository_password_get_not_set(keyring, repository, username):
assert keyring.get_password(keyring_service_name(repository), username) is None
assert keyring_repository_password_get(repository, username) is None
def test_keyring_repository_password_get_broken(broken_keyring):
assert get_keyring() == broken_keyring
assert keyring_repository_password_get("repository", "username") is None
def test_keyring_repository_password_set(keyring, repository, username, password):
keyring_repository_password_set(repository, username, password)
assert keyring.get_password(keyring_service_name(repository), username) == password
def test_keyring_repository_password_set_broken(broken_keyring):
assert get_keyring() == broken_keyring
with pytest.raises(RuntimeError):
keyring_repository_password_set(repository, "username", "password")
def test_keyring_repository_password_del(
keyring, config, repository, username, password
):
keyring.set_password(keyring_service_name(repository), username, password)
config.merge({"http-basic": {repository: {"username": username}}})
keyring_repository_password_del(config, repository)
assert keyring.get_password(keyring_service_name(repository), username) is None
def test_keyring_repository_password_del_not_set(keyring, config, repository, username):
config.merge({"http-basic": {repository: {"username": username}}})
keyring_repository_password_del(config, repository)
assert keyring.get_password(keyring_service_name(repository), username) is None
def test_keyring_repository_password_del_broken(broken_keyring, config):
assert get_keyring() == broken_keyring
keyring_repository_password_del(config, "repository")
import pytest
from keyring.backend import KeyringBackend
from poetry.utils.password_manager import KeyRing
from poetry.utils.password_manager import KeyRingError
from poetry.utils.password_manager import PasswordManager
class DummyBackend(KeyringBackend):
def __init__(self):
self._passwords = {}
@classmethod
def priority(cls):
return 42
def set_password(self, service, username, password):
self._passwords[service] = {username: password}
def get_password(self, service, username):
return self._passwords.get(service, {}).get(username)
def delete_password(self, service, username):
if service in self._passwords and username in self._passwords[service]:
del self._passwords[service][username]
@pytest.fixture()
def backend():
return DummyBackend()
@pytest.fixture()
def mock_available_backend(backend):
import keyring
keyring.set_keyring(backend)
@pytest.fixture()
def mock_unavailable_backend():
import keyring
from keyring.backends.fail import Keyring
keyring.set_keyring(Keyring())
@pytest.fixture()
def mock_chainer_backend(mocker):
from keyring.backends.fail import Keyring
mocker.patch("keyring.backend.get_all_keyring", [Keyring()])
import keyring
from keyring.backends.chainer import ChainerBackend
keyring.set_keyring(ChainerBackend())
def test_set_http_password(config, mock_available_backend, backend):
manager = PasswordManager(config)
assert manager.keyring.is_available()
manager.set_http_password("foo", "bar", "baz")
assert "baz" == backend.get_password("poetry-repository-foo", "bar")
auth = config.get("http-basic.foo")
assert "bar" == auth["username"]
assert "password" not in auth
def test_get_http_auth(config, mock_available_backend, backend):
backend.set_password("poetry-repository-foo", "bar", "baz")
config.auth_config_source.add_property("http-basic.foo", {"username": "bar"})
manager = PasswordManager(config)
assert manager.keyring.is_available()
auth = manager.get_http_auth("foo")
assert "bar" == auth["username"]
assert "baz" == auth["password"]
def test_delete_http_password(config, mock_available_backend, backend):
backend.set_password("poetry-repository-foo", "bar", "baz")
config.auth_config_source.add_property("http-basic.foo", {"username": "bar"})
manager = PasswordManager(config)
assert manager.keyring.is_available()
manager.delete_http_password("foo")
assert backend.get_password("poetry-repository-foo", "bar") is None
assert config.get("http-basic.foo") is None
def test_set_pypi_token(config, mock_available_backend, backend):
manager = PasswordManager(config)
assert manager.keyring.is_available()
manager.set_pypi_token("foo", "baz")
assert config.get("pypi-token.foo") is None
assert "baz" == backend.get_password("poetry-repository-foo", "__token__")
def test_get_pypi_token(config, mock_available_backend, backend):
backend.set_password("poetry-repository-foo", "__token__", "baz")
manager = PasswordManager(config)
assert manager.keyring.is_available()
assert "baz" == manager.get_pypi_token("foo")
def test_delete_pypi_token(config, mock_available_backend, backend):
backend.set_password("poetry-repository-foo", "__token__", "baz")
manager = PasswordManager(config)
assert manager.keyring.is_available()
manager.delete_pypi_token("foo")
assert backend.get_password("poetry-repository-foo", "__token__") is None
def test_set_http_password_with_unavailable_backend(config, mock_unavailable_backend):
manager = PasswordManager(config)
assert not manager.keyring.is_available()
manager.set_http_password("foo", "bar", "baz")
auth = config.get("http-basic.foo")
assert "bar" == auth["username"]
assert "baz" == auth["password"]
def test_get_http_auth_with_unavailable_backend(config, mock_unavailable_backend):
config.auth_config_source.add_property(
"http-basic.foo", {"username": "bar", "password": "baz"}
)
manager = PasswordManager(config)
assert not manager.keyring.is_available()
auth = manager.get_http_auth("foo")
assert "bar" == auth["username"]
assert "baz" == auth["password"]
def test_delete_http_password_with_unavailable_backend(
config, mock_unavailable_backend
):
config.auth_config_source.add_property(
"http-basic.foo", {"username": "bar", "password": "baz"}
)
manager = PasswordManager(config)
assert not manager.keyring.is_available()
manager.delete_http_password("foo")
assert config.get("http-basic.foo") is None
def test_set_pypi_token_with_unavailable_backend(config, mock_unavailable_backend):
manager = PasswordManager(config)
assert not manager.keyring.is_available()
manager.set_pypi_token("foo", "baz")
assert "baz" == config.get("pypi-token.foo")
def test_get_pypi_token_with_unavailable_backend(config, mock_unavailable_backend):
config.auth_config_source.add_property("pypi-token.foo", "baz")
manager = PasswordManager(config)
assert not manager.keyring.is_available()
assert "baz" == manager.get_pypi_token("foo")
def test_delete_pypi_token_with_unavailable_backend(config, mock_unavailable_backend):
config.auth_config_source.add_property("pypi-token.foo", "baz")
manager = PasswordManager(config)
assert not manager.keyring.is_available()
manager.delete_pypi_token("foo")
assert config.get("pypi-token.foo") is None
def test_keyring_raises_errors_on_keyring_errors(mocker, mock_unavailable_backend):
mocker.patch("poetry.utils.password_manager.KeyRing._check")
key_ring = KeyRing("poetry")
with pytest.raises(KeyRingError):
key_ring.set_password("foo", "bar", "baz")
with pytest.raises(KeyRingError):
key_ring.get_password("foo", "bar")
with pytest.raises(KeyRingError):
key_ring.delete_password("foo", "bar")
def test_keyring_with_chainer_backend_and_not_compatible_only_should_be_unavailable(
mock_chainer_backend,
):
key_ring = KeyRing("poetry")
assert not key_ring.is_available()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment