Commit e512cbee by Chad Crawford Committed by Bjorn Neergaard

Add `poetry.utils.cache` to replace cachy dependency.

parent 633f8f6b
from __future__ import annotations
import contextlib
import dataclasses
import hashlib
import json
import shutil
import time
from pathlib import Path
from typing import Any
from typing import Callable
from typing import Generic
from typing import TypeVar
# Used by Cachy for items that do not expire.
MAX_DATE = 9999999999
T = TypeVar("T")
def decode(string: bytes, encodings: list[str] | None = None) -> str:
"""
Compatiblity decode function pulled from cachy.
:param string: The byte string to decode.
:param encodings: List of encodings to apply
:return: Decoded string
"""
if encodings is None:
encodings = ["utf-8", "latin1", "ascii"]
for encoding in encodings:
with contextlib.suppress(UnicodeDecodeError):
return string.decode(encoding)
return string.decode(encodings[0], errors="ignore")
def encode(string: str, encodings: list[str] | None = None) -> bytes:
"""
Compatibility encode function from cachy.
:param string: The string to encode.
:param encodings: List of encodings to apply
:return: Encoded byte string
"""
if encodings is None:
encodings = ["utf-8", "latin1", "ascii"]
for encoding in encodings:
with contextlib.suppress(UnicodeDecodeError):
return string.encode(encoding)
return string.encode(encodings[0], errors="ignore")
def _expiration(minutes: int) -> int:
"""
Calculates the time in seconds since epoch that occurs 'minutes' from now.
:param minutes: The number of minutes to count forward
"""
return round(time.time()) + minutes * 60
_HASHES = {
"md5": (hashlib.md5, 2),
"sha1": (hashlib.sha1, 4),
"sha256": (hashlib.sha256, 8),
}
@dataclasses.dataclass(frozen=True)
class CacheItem(Generic[T]):
"""
Stores data and metadata for cache items.
"""
data: T
expires: int | None = None
@property
def expired(self) -> bool:
"""
Return true if the cache item has exceeded its expiration period.
"""
return self.expires is not None and time.time() >= self.expires
@dataclasses.dataclass(frozen=True)
class FileCache(Generic[T]):
"""
Cachy-compatible minimal file cache. Stores subsequent data in a JSON format.
:param path: The path that the cache starts at.
:param hash_type: The hash to use for encoding keys/building directories.
"""
path: Path
hash_type: str = "sha256"
def get(self, key: str) -> T | None:
return self._get_payload(key)
def has(self, key: str) -> bool:
"""
Determine if a file exists and has not expired in the cache.
:param key: The cache key
:returns: True if the key exists in the cache
"""
return self.get(key) is not None
def put(self, key: str, value: Any, minutes: int | None = None) -> None:
"""
Store an item in the cache.
:param key: The cache key
:param value: The cache value
:param minutes: The lifetime in minutes of the cached value
"""
payload: CacheItem[Any] = CacheItem(
value, expires=_expiration(minutes) if minutes is not None else None
)
path = self._path(key)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "wb") as f:
f.write(self._serialize(payload))
def forget(self, key: str) -> None:
"""
Remove an item from the cache.
:param key: The cache key
"""
path = self._path(key)
if path.exists():
path.unlink()
def flush(self) -> None:
"""
Clear the cache.
"""
shutil.rmtree(self.path)
def remember(
self, key: str, callback: T | Callable[[], T], minutes: int | None = None
) -> T:
"""
Get an item from the cache, or use a default from callback.
:param key: The cache key
:param callback: Callback function providing default value
:param minutes: The lifetime in minutes of the cached value
"""
value = self.get(key)
if value is None:
value = callback() if callable(callback) else callback
self.put(key, value, minutes)
return value
def _get_payload(self, key: str) -> T | None:
path = self._path(key)
if not path.exists():
return None
with open(path, "rb") as f:
payload = self._deserialize(f.read())
if payload.expired:
self.forget(key)
return None
else:
return payload.data
def _path(self, key: str) -> Path:
hash_type, parts_count = _HASHES[self.hash_type]
h = hash_type(encode(key)).hexdigest()
parts = [h[i : i + 2] for i in range(0, len(h), 2)][:parts_count]
return Path(self.path, *parts, h)
def _serialize(self, payload: CacheItem[T]) -> bytes:
expires = payload.expires or MAX_DATE
data = json.dumps(payload.data)
return encode(f"{expires:010d}{data}")
def _deserialize(self, data_raw: bytes) -> CacheItem[T]:
data_str = decode(data_raw)
data = json.loads(data_str[10:])
expires = int(data_str[:10])
return CacheItem(data, expires)
from __future__ import annotations
from typing import TYPE_CHECKING
from typing import Any
from typing import TypeVar
from typing import Union
from unittest.mock import Mock
import pytest
from cachy import CacheManager
from poetry.utils.cache import FileCache
if TYPE_CHECKING:
from pathlib import Path
from _pytest.monkeypatch import MonkeyPatch
from pytest import FixtureRequest
from pytest_mock import MockerFixture
from tests.conftest import Config
FILE_CACHE = Union[FileCache, CacheManager]
T = TypeVar("T")
@pytest.fixture
def repository_cache_dir(monkeypatch: MonkeyPatch, config: Config) -> Path:
return config.repository_cache_directory
def patch_cachy(cache: CacheManager) -> CacheManager:
old_put = cache.put
old_remember = cache.remember
def new_put(key: str, value: Any, minutes: int | None = None) -> Any:
if minutes is not None:
return old_put(key, value, minutes=minutes)
else:
return cache.forever(key, value)
cache.put = new_put
def new_remember(key: str, value: Any, minutes: int | None = None) -> Any:
if minutes is not None:
return old_remember(key, value, minutes=minutes)
else:
return cache.remember_forever(key, value)
cache.remember = new_remember
return cache
@pytest.fixture
def cachy_file_cache(repository_cache_dir: Path) -> CacheManager:
cache = CacheManager(
{
"default": "cache",
"serializer": "json",
"stores": {
"cache": {"driver": "file", "path": str(repository_cache_dir / "cache")}
},
}
)
return patch_cachy(cache)
@pytest.fixture
def poetry_file_cache(repository_cache_dir: Path) -> FileCache[T]:
return FileCache(repository_cache_dir / "cache")
@pytest.fixture
def cachy_dict_cache() -> CacheManager:
cache = CacheManager(
{
"default": "cache",
"serializer": "json",
"stores": {"cache": {"driver": "dict"}},
}
)
return patch_cachy(cache)
@pytest.mark.parametrize("cache_name", ["cachy_file_cache", "poetry_file_cache"])
def test_cache_get_put_has(cache_name: str, request: FixtureRequest) -> None:
cache = request.getfixturevalue(cache_name)
cache.put("key1", "value")
cache.put("key2", {"a": ["json-encoded", "value"]})
assert cache.get("key1") == "value"
assert cache.get("key2") == {"a": ["json-encoded", "value"]}
assert cache.has("key1")
assert cache.has("key2")
assert not cache.has("key3")
@pytest.mark.parametrize("cache_name", ["cachy_file_cache", "poetry_file_cache"])
def test_cache_forget(cache_name: str, request: FixtureRequest) -> None:
cache = request.getfixturevalue(cache_name)
cache.put("key1", "value")
cache.put("key2", "value")
assert cache.has("key1")
assert cache.has("key2")
cache.forget("key1")
assert not cache.has("key1")
assert cache.has("key2")
@pytest.mark.parametrize("cache_name", ["cachy_file_cache", "poetry_file_cache"])
def test_cache_flush(cache_name: str, request: FixtureRequest) -> None:
cache = request.getfixturevalue(cache_name)
cache.put("key1", "value")
cache.put("key2", "value")
assert cache.has("key1")
assert cache.has("key2")
cache.flush()
assert not cache.has("key1")
assert not cache.has("key2")
@pytest.mark.parametrize("cache_name", ["cachy_file_cache", "poetry_file_cache"])
def test_cache_remember(
cache_name: str, request: FixtureRequest, mocker: MockerFixture
) -> None:
cache = request.getfixturevalue(cache_name)
method = Mock(return_value="value2")
cache.put("key1", "value1")
assert cache.remember("key1", method) == "value1"
method.assert_not_called()
assert cache.remember("key2", method) == "value2"
method.assert_called()
@pytest.mark.parametrize("cache_name", ["cachy_file_cache", "poetry_file_cache"])
def test_cache_get_limited_minutes(
mocker: MockerFixture,
cache_name: str,
request: FixtureRequest,
) -> None:
cache = request.getfixturevalue(cache_name)
# needs to be 10 digits because cachy assumes it's a 10-digit int.
start_time = 1111111111
mocker.patch("time.time", return_value=start_time)
cache.put("key1", "value", minutes=5)
cache.put("key2", "value", minutes=5)
assert cache.get("key1") is not None
assert cache.get("key2") is not None
mocker.patch("time.time", return_value=start_time + 5 * 60 + 1)
# check to make sure that the cache deletes for has() and get()
assert not cache.has("key1")
assert cache.get("key2") is None
def test_cachy_compatibility(
cachy_file_cache: CacheManager, poetry_file_cache: FileCache[T]
) -> None:
"""
The new file cache should be able to support reading legacy caches.
"""
test_str = "value"
test_obj = {"a": ["json", "object"]}
cachy_file_cache.put("key1", test_str)
cachy_file_cache.put("key2", test_obj)
assert poetry_file_cache.get("key1") == test_str
assert poetry_file_cache.get("key2") == test_obj
poetry_file_cache.put("key3", test_str)
poetry_file_cache.put("key4", test_obj)
assert cachy_file_cache.get("key3") == test_str
assert cachy_file_cache.get("key4") == test_obj
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment