From ddd6a1eb74f4b377e7c246e88f4bebb0f6fa5830 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sat, 17 Feb 2024 16:28:38 +0000 Subject: [PATCH] update mirror of ShineyDev/utility (#6) ShineyDev/utility@b245f4c25a814964e57a88fcca5bf081df85e8a1 Co-authored-by: github-actions[bot] --- pretty/utility/_mirror/__init__.py | 38 ++++++ pretty/utility/_mirror/cache.py | 186 ++++++++++++++++++++++++++ pretty/utility/_mirror/typing.py | 26 ++++ pretty/utility/_mirror/version.py | 67 ++++++++++ pretty/utility/_mirror/warning.py | 202 +++++++++++++++++++++++++++++ pretty/utility/_mirror/wrapper.py | 67 ++++++++++ 6 files changed, 586 insertions(+) create mode 100644 pretty/utility/_mirror/__init__.py create mode 100644 pretty/utility/_mirror/cache.py create mode 100644 pretty/utility/_mirror/typing.py create mode 100644 pretty/utility/_mirror/version.py create mode 100644 pretty/utility/_mirror/warning.py create mode 100644 pretty/utility/_mirror/wrapper.py diff --git a/pretty/utility/_mirror/__init__.py b/pretty/utility/_mirror/__init__.py new file mode 100644 index 0000000..dd113d3 --- /dev/null +++ b/pretty/utility/_mirror/__init__.py @@ -0,0 +1,38 @@ +""" +utility: A Python package with utilities I use in several of my other projects. +""" + +from __future__ import annotations +from typing import NamedTuple + +from .cache import * +from .cache import __all__ as _cache__all__ +from .typing import * +from .typing import __all__ as _typing__all__ +from .version import * +from .version import __all__ as _version__all__ +from .warning import * +from .warning import __all__ as _warning__all__ +from .wrapper import * +from .wrapper import __all__ as _wrapper__all__ + + +class _VersionInfo(NamedTuple): + major: int + minor: int + micro: int + release: str + serial: int + + +version: str = "0.1.0a" +version_info: _VersionInfo = _VersionInfo(0, 1, 0, "alpha", 0) + + +__all__ = [ # pyright: ignore[reportUnsupportedDunderAll] + *_cache__all__, + *_typing__all__, + *_version__all__, + *_warning__all__, + *_wrapper__all__, +] diff --git a/pretty/utility/_mirror/cache.py b/pretty/utility/_mirror/cache.py new file mode 100644 index 0000000..413073a --- /dev/null +++ b/pretty/utility/_mirror/cache.py @@ -0,0 +1,186 @@ +from __future__ import annotations +from typing import TYPE_CHECKING, TypeVar + +if TYPE_CHECKING: + from collections.abc import Callable, Collection, Generator + from typing import Any, overload + from typing_extensions import TypeAlias, ParamSpec, Self + + _P = ParamSpec("_P") + _T = TypeVar("_T") + _U = TypeVar("_U") + + _Generator: TypeAlias = Generator[_T, None, Any] + _GeneratorFunc: TypeAlias = Callable[_P, Generator[_T, None, Any]] + +import collections +import typing + +from .typing import MISSING +from .version import SUPPORTS_GENERICBUILTINS + + +def _make_key( + args: tuple[Any, ...], + kwargs: dict[str, Any], +) -> int: + return hash((args, frozenset(kwargs.items()))) + + +if TYPE_CHECKING: + + @overload + def cache_generator( + wrapped: _GeneratorFunc[_P, _T], + /, + ) -> _GeneratorFunc[_P, _T]: ... + + @overload + def cache_generator( + *, + max_size: int | None = ..., + ) -> Callable[[_GeneratorFunc[_P, _T]], _GeneratorFunc[_P, _T]]: ... + + @overload + def cache_generator( + *, + max_size: int | None = ..., + wrapper: Callable[[_Generator[_T]], _U], + ) -> Callable[[_GeneratorFunc[_P, _T]], Callable[_P, _U]]: ... + + +def cache_generator( + wrapped: _GeneratorFunc[_P, _T] = MISSING, + /, + *, + max_size: int | None = MISSING, + wrapper: Callable[[_Generator[_T]], _U] = MISSING, +) -> _GeneratorFunc[_P, _T] | Callable[[_GeneratorFunc[_P, _T]], _GeneratorFunc[_P, _T]] | Callable[[_GeneratorFunc[_P, _T]], Callable[_P, _U]]: + max_size = max_size if max_size is not MISSING else 1024 + + if isinstance(max_size, int): + if max_size < -1 or max_size == 0: + raise ValueError("max_size must be None, -1, or a positive integer") + + if wrapper is not MISSING: + + def decorator_wrapper( + wrapped: _GeneratorFunc[_P, _T], + /, + ) -> Callable[_P, _U]: + cache: dict[int, _U] | None + + if max_size == -1 or max_size is None: + cache = dict() + else: + cache = LRUCache(max_size=max_size) + + def inner( + *args: _P.args, + **kwargs: _P.kwargs, + ) -> _U: + key = _make_key(args, kwargs) + + if key not in cache: + cache[key] = wrapper(wrapped(*args, **kwargs)) + + return cache[key] + + inner.__utility_cache__ = cache + + return inner + + return decorator_wrapper + + else: + + def decorator( + wrapped: _GeneratorFunc[_P, _T], + /, + ) -> _GeneratorFunc[_P, _T]: + cache: dict[int, tuple[Generator[_T, None, Any], list[_T], bool]] + + if max_size == -1 or max_size is None: + cache = dict() + else: + cache = LRUCache(max_size=max_size) + + def inner( + *args: _P.args, + **kwargs: _P.kwargs, + ) -> Generator[_T, None, Any]: + key = _make_key(args, kwargs) + + if key not in cache: + generator = wrapped(*args, **kwargs) + cache[key] = (generator, list(), False) + + generator, items, done = cache[key] + + i = 0 # NOTE: this garbage is all required to support multiple entries before exit + while i < len(items): + yield items[i] + i += 1 + + if not done: + i = 0 + for item in generator: + items.append(item) + yield item + i += 1 + + if cache[key][2]: + yield from items[i:] + return + + cache[key] = (MISSING, items, True) + + inner.__utility_cache__ = cache + + return inner + + if wrapped is MISSING: + return decorator + + return decorator(wrapped) + + +_K = TypeVar("_K") +_V = TypeVar("_V") + + +class LRUCache(collections.OrderedDict[_K, _V] if SUPPORTS_GENERICBUILTINS else typing.OrderedDict[_K, _V]): # type: ignore + """ + TODO + """ + + def __init__( + self: Self, + /, + *, + max_size: int, + ) -> None: + super().__init__() + + self.max_size = max_size + + def __getitem__(self, key: _K) -> _V: + value = super().__getitem__(key) + self.move_to_end(key) + + return value + + def __setitem__(self, key: _K, value: _V) -> None: + if key in self: + self.move_to_end(key) + + super().__setitem__(key, value) + + if len(self) > self.max_size: + self.popitem(last=False) + + +__all__ = [ + "cache_generator", + "LRUCache", +] diff --git a/pretty/utility/_mirror/typing.py b/pretty/utility/_mirror/typing.py new file mode 100644 index 0000000..6a8e4e3 --- /dev/null +++ b/pretty/utility/_mirror/typing.py @@ -0,0 +1,26 @@ +from __future__ import annotations +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Any + + +class _MissingSentinel: + __slots__ = () + + def __repr__(self) -> str: + return "..." + + def __bool__(self) -> bool: + return False + + +MISSING: Any = _MissingSentinel() +""" +TODO +""" + + +__all__ = [ + "MISSING", +] diff --git a/pretty/utility/_mirror/version.py b/pretty/utility/_mirror/version.py new file mode 100644 index 0000000..e6a1a06 --- /dev/null +++ b/pretty/utility/_mirror/version.py @@ -0,0 +1,67 @@ +from __future__ import annotations +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Final + +import importlib.util +import sys + + +try: + from typing_extensions import __all__ as _typing_extensions__all__ +except ImportError: + _typing_extensions__all__ = () + + +PY_39: Final[bool] = sys.version_info >= (3, 9, 0) + +SUPPORTS_ANNOTATED = PY_39 or ("Annotated" in _typing_extensions__all__) # typing.Annotated +SUPPORTS_GENERICBUILTINS = PY_39 # list[int], collections.abc.Sequence[int] +SUPPORTS_ZONEINFO = PY_39 or (importlib.util.find_spec("backports") and importlib.util.find_spec("backports.zoneinfo")) is not None # zoneinfo + +PY_310: Final[bool] = sys.version_info >= (3, 10, 0) + +SUPPORTS_FLATLITERAL = PY_310 # Literal[1, Literal[2]] -> Literal[1, 2] in Literal.__new__ +SUPPORTS_ISTYPEDDICT = PY_310 # typing.is_typeddict +SUPPORTS_UNIONTYPE = PY_310 # types.UnionType + +PY_311: Final[bool] = sys.version_info >= (3, 11, 0) + +SUPPORTS_EXCEPTIONGROUP = PY_311 # ExceptionGroup +SUPPORTS_EXCEPTIONNOTES = PY_311 # BaseException.__notes__ +SUPPORTS_NEVER = PY_311 or ("Never" in _typing_extensions__all__) # typing.Never +SUPPORTS_SELF = PY_311 or ("Self" in _typing_extensions__all__) # typing.Self +SUPPORTS_TOMLLIB = PY_311 # tomllib +SUPPORTS_TYPEDDICTREQUIREDNESS = PY_311 or ("NotRequired" in _typing_extensions__all__ and "Required" in _typing_extensions__all__) # NotRequired[T], Required[T] + +PY_312: Final[bool] = sys.version_info >= (3, 12, 0) + +SUPPORTS_MOREORIGBASES = PY_312 # python/cpython@0056701 +SUPPORTS_SYSMONITORING = PY_312 # sys.monitoring +SUPPORTS_TYPEKEYWORD = PY_312 # type T +SUPPORTS_WARNINGSKIPS = PY_312 + + +__all__ = [ + "PY_39", + "SUPPORTS_ANNOTATED", + "SUPPORTS_GENERICBUILTINS", + "SUPPORTS_ZONEINFO", + "PY_310", + "SUPPORTS_FLATLITERAL", + "SUPPORTS_ISTYPEDDICT", + "SUPPORTS_UNIONTYPE", + "PY_311", + "SUPPORTS_EXCEPTIONGROUP", + "SUPPORTS_EXCEPTIONNOTES", + "SUPPORTS_NEVER", + "SUPPORTS_SELF", + "SUPPORTS_TOMLLIB", + "SUPPORTS_TYPEDDICTREQUIREDNESS", + "PY_312", + "SUPPORTS_MOREORIGBASES", + "SUPPORTS_SYSMONITORING", + "SUPPORTS_TYPEKEYWORD", + "SUPPORTS_WARNINGSKIPS", +] diff --git a/pretty/utility/_mirror/warning.py b/pretty/utility/_mirror/warning.py new file mode 100644 index 0000000..11ee57a --- /dev/null +++ b/pretty/utility/_mirror/warning.py @@ -0,0 +1,202 @@ +from __future__ import annotations +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Any, Callable, TypeVar, overload + from typing_extensions import ParamSpec + + from datetime import date + + _P = ParamSpec("_P") + _T = TypeVar("_T") + +import datetime +import pathlib +import warnings + +from .typing import MISSING +from .version import SUPPORTS_WARNINGSKIPS +from .wrapper import wrap + + +if TYPE_CHECKING: + + @overload + def deprecated( + wrapped: Callable[_P, _T], + /, + ) -> Callable[_P, _T]: ... + + @overload + def deprecated( + *, + condition: Callable[[tuple[Any], dict[str, Any]], bool] = MISSING, + what: str = MISSING, + when: date | str = MISSING, + where: str = MISSING, + who: type[Warning] = MISSING, + why: str = MISSING, + ) -> Callable[[Callable[_P, _T]], Callable[_P, _T]]: ... + + +def deprecated( + wrapped: Callable[_P, _T] = MISSING, + /, + *, + condition: Callable[[tuple[Any, ...], dict[str, Any]], bool] = MISSING, + what: str = MISSING, + when: date | str = MISSING, + where: str = MISSING, + who: type[Warning] = MISSING, + why: str = MISSING, +) -> Callable[_P, _T] | Callable[[Callable[_P, _T]], Callable[_P, _T]]: + """ + |decorator_dynamic| + + Marks a callable as deprecated, causing it to issue a warning when + called. + + + Parameters + ---------- + condition: Callable[[tuple[Any, ...], dict[:class:`str`, Any]], :class:`bool`] + A condition callable given `(args, kwargs)` that should return + a boolean as to whether to issue the warning. Typically used to + issue warnings only to users who use a deprecated parameter. + Defaults to `(_, _) -> True` + what: :class:`str` + The name of the thing that is deprecated. Defaults to the name + of the callable. + when: :class:`~datetime.date` | :class:`str` + The date on which the thing will be changed or removed. + Defaults to the next major version. + where: :class:`str` + The name of a thing to use instead of the deprecated thing. + Defaults to nothing. + who: type[:class:`Warning`] + The category of warning to issue. + why: :class:`str` + An arbitrary sentence to place at the end of the warning + message. Typically the reason for the deprecation. Defaults to + nothing. + + + Examples + -------- + + Issue a deprecation warning for a renamed callable + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + .. code-block:: python + :emphasize-lines: 1 + + @utility.deprecated(where="bar") + def foo(*, a: str, b: int): + ... + + + Issue a deprecation warning for a renamed parameter + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + .. code-block:: python + :emphasize-lines: 1-5 + + @utility.deprecated( + condition=lambda args,kwargs: "a" in kwargs.keys(), + what="foo(a=...)" + where="foo(b=...)", + ) + def foo(*, a: str = MISSING, b: int): + ... + """ + + who = who if who is not MISSING else DeprecationWarning + + if when is not MISSING: + if isinstance(when, datetime.date): + when = when.strftime("%Y-%m-%d") + + when = f"on {when}" + else: + when = "in the next major version" + + def decorator( + wrapped: Callable[_P, _T], + /, + ) -> Callable[_P, _T]: + what_ = what if what is not MISSING else wrapped.__qualname__ + + message = f"{what_} will be removed {when}" + + if where: + message += f", use {where} instead." + else: + message += "." + + if why: + message += f" {why}." + + @wrap(wrapped) + def wrapper( + *args: _P.args, + **kwargs: _P.kwargs, + ) -> _T: + if condition is MISSING or condition(args, kwargs): + warn_once(message, cls=who, level=2) + + return wrapped(*args, **kwargs) + + return wrapper + + if wrapped is not MISSING: + return decorator(wrapped) + else: + return decorator + + +_warning_skips = (str(pathlib.Path(__file__).parent.parent),) + + +def warn( + message: str, + /, + *, + cls: type[Warning], + level: int = 1, +) -> None: + """ + TODO + """ + + if SUPPORTS_WARNINGSKIPS: + warnings.warn(message, cls, 0, skip_file_prefixes=_warning_skips) # type: ignore # skip_file_prefixes does exist + else: + warnings.warn(message, cls, level + 1) + + +_warning_hashes: set[int] = set() + + +def warn_once( + message: str, + /, + *, + cls: type[Warning], + level: int = 1, +) -> None: + """ + TODO + """ + + warning_hash = hash((cls, message)) + + if warning_hash not in _warning_hashes: + _warning_hashes.add(warning_hash) + warn(message, cls=cls, level=level + 1) + + +__all__ = [ + "deprecated", + "warn", + "warn_once", +] diff --git a/pretty/utility/_mirror/wrapper.py b/pretty/utility/_mirror/wrapper.py new file mode 100644 index 0000000..7d2dbee --- /dev/null +++ b/pretty/utility/_mirror/wrapper.py @@ -0,0 +1,67 @@ +from __future__ import annotations +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from collections.abc import Callable + from typing import TypeVar + from typing_extensions import ParamSpec + + _P = ParamSpec("_P") + _T = TypeVar("_T") + + +def wrap( + wrapped: Callable[_P, _T], + /, +) -> Callable[[Callable[_P, _T]], Callable[_P, _T]]: + """ + TODO + """ + + def decorator( + wrapper: Callable[_P, _T], + /, + ) -> Callable[_P, _T]: + wrapper.__doc__ = wrapped.__doc__ + wrapper.__name__ = wrapped.__name__ + wrapper.__qualname__ = wrapped.__qualname__ + wrapper.__wrapped__ = wrapped + + return wrapper + + return decorator + + +def wrap_fallback( + wrapped: Callable[_P, _T], + /, +) -> Callable[[Callable[_P, _T]], Callable[_P, _T]]: + """ + TODO + """ + + def decorator( + wrapper: Callable[_P, _T], + /, + ) -> Callable[_P, _T]: + wrapper = wrap(wrapped)(wrapper) + + @wrap(wrapper) + def inner( + *args: _P.args, + **kwargs: _P.kwargs, + ) -> _T: + try: + return wrapper(*args, **kwargs) + except Exception: + return wrapped(*args, **kwargs) + + return inner + + return decorator + + +__all__ = [ + "wrap", + "wrap_fallback", +]