From 0a6a9c83a08447a2bb3becbf9595199778f63e52 Mon Sep 17 00:00:00 2001 From: Doggie B <3859395+fubuloubu@users.noreply.github.com> Date: Sat, 26 Oct 2024 14:30:10 -0400 Subject: [PATCH 1/4] feat(worker): add SILVERBACK_FORK_MODE handler execution context --- ape-config.yaml | 4 ++++ silverback/main.py | 21 ++++++++++++++++++++- silverback/settings.py | 4 ++++ 3 files changed, 28 insertions(+), 1 deletion(-) create mode 100644 ape-config.yaml diff --git a/ape-config.yaml b/ape-config.yaml new file mode 100644 index 00000000..4b16339d --- /dev/null +++ b/ape-config.yaml @@ -0,0 +1,4 @@ +# Ensure we are configured for fork mode correct for example +ethereum: + mainnet-fork: + default_provider: foundry diff --git a/silverback/main.py b/silverback/main.py index 015ec1a5..b5061b96 100644 --- a/silverback/main.py +++ b/silverback/main.py @@ -1,6 +1,8 @@ import atexit +import inspect from collections import defaultdict from datetime import timedelta +from functools import wraps from typing import Any, Callable from ape.api.networks import LOCAL_NETWORK_NAME @@ -138,6 +140,7 @@ def __init__(self, settings: Settings | None = None): self.signer = settings.get_signer() self.new_block_timeout = settings.NEW_BLOCK_TIMEOUT + self.use_fork = settings.FORK_MODE signer_str = f"\n SIGNER={repr(self.signer)}" new_block_timeout_str = ( @@ -146,7 +149,7 @@ def __init__(self, settings: Settings | None = None): network_choice = f"{self.identifier.ecosystem}:{self.identifier.network}" logger.success( - f'Loaded Silverback Bot:\n NETWORK="{network_choice}"' + f'Loaded Silverback Bot:\n NETWORK="{network_choice}"\n FORK_MODE={self.use_fork}' f"{signer_str}{new_block_timeout_str}" ) @@ -284,6 +287,22 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask: self.tasks[task_type].append(TaskData(name=handler.__name__, labels=labels)) + if self.use_fork: + from ape import networks # NOTE: Defer import for load speed + + # Trigger worker-side handling using fork network by wrapping handler + is_awaitable = inspect.isawaitable(handler) + + @wraps(handler) + async def fork_handler(*args, **kwargs): + with networks.fork(): + if is_awaitable: + return await handler(*args, **kwargs) + else: + return handler(*args, **kwargs) + + handler = fork_handler + return self.broker.register_task( handler, task_name=handler.__name__, diff --git a/silverback/settings.py b/silverback/settings.py index 7b756484..acd2b6d4 100644 --- a/silverback/settings.py +++ b/silverback/settings.py @@ -28,6 +28,10 @@ class Settings(BaseSettings, ManagerAccessMixin): # A unique identifier for this silverback instance BOT_NAME: str = "bot" + # Execute every handler using an independent fork context + # NOTE: Requires fork-able provider installed and configured for network + FORK_MODE: bool = False + BROKER_CLASS: str = "taskiq:InMemoryBroker" BROKER_URI: str = "" # To be deprecated in 0.6 BROKER_KWARGS: dict[str, Any] = dict() From 138d61a0fbb54a2214779e7780e0519c5bad56d4 Mon Sep 17 00:00:00 2001 From: Doggie B <3859395+fubuloubu@users.noreply.github.com> Date: Sat, 26 Oct 2024 20:51:14 -0400 Subject: [PATCH 2/4] refactor: use internal variable for fork detection to prevent scope bug --- silverback/main.py | 54 ++++++++++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/silverback/main.py b/silverback/main.py index b5061b96..5590c61c 100644 --- a/silverback/main.py +++ b/silverback/main.py @@ -3,7 +3,7 @@ from collections import defaultdict from datetime import timedelta from functools import wraps -from typing import Any, Callable +from typing import Any, Awaitable, Callable from ape.api.networks import LOCAL_NETWORK_NAME from ape.contracts import ContractEvent, ContractInstance @@ -110,17 +110,18 @@ def __init__(self, settings: Settings | None = None): provider_context = settings.get_provider_context() # NOTE: This allows using connected ape methods e.g. `Contract` - provider = provider_context.__enter__() + self.provider = provider_context.__enter__() self.identifier = SilverbackID( name=settings.BOT_NAME, - network=provider.network.name, - ecosystem=provider.network.ecosystem.name, + network=self.provider.network.name, + ecosystem=self.provider.network.ecosystem.name, ) # Adjust defaults from connection if settings.NEW_BLOCK_TIMEOUT is None and ( - provider.network.name.endswith("-fork") or provider.network.name == LOCAL_NETWORK_NAME + self.provider.network.name.endswith("-fork") + or self.provider.network.name == LOCAL_NETWORK_NAME ): settings.NEW_BLOCK_TIMEOUT = int(timedelta(days=1).total_seconds()) @@ -140,7 +141,7 @@ def __init__(self, settings: Settings | None = None): self.signer = settings.get_signer() self.new_block_timeout = settings.NEW_BLOCK_TIMEOUT - self.use_fork = settings.FORK_MODE + self.use_fork = settings.FORK_MODE and not self.provider.network.name.endswith("-fork") signer_str = f"\n SIGNER={repr(self.signer)}" new_block_timeout_str = ( @@ -149,7 +150,9 @@ def __init__(self, settings: Settings | None = None): network_choice = f"{self.identifier.ecosystem}:{self.identifier.network}" logger.success( - f'Loaded Silverback Bot:\n NETWORK="{network_choice}"\n FORK_MODE={self.use_fork}' + "Loaded Silverback Bot:\n" + f' NETWORK="{network_choice}"\n' + f" FORK_MODE={self.use_fork}" f"{signer_str}{new_block_timeout_str}" ) @@ -228,6 +231,22 @@ async def __create_snapshot_handler( last_block_processed=self.state.get("system:last_block_processed", -1), ) + def _with_fork_decorator(self, handler: Callable) -> Callable: + # Trigger worker-side handling using fork network by wrapping handler + fork_context = self.provider.network_manager.fork + + @wraps(handler) + async def fork_handler(*args, **kwargs): + with fork_context(): + result = handler(*args, **kwargs) + + if inspect.isawaitable(result): + return await result + + return result + + return fork_handler + def broker_task_decorator( self, task_type: TaskType, @@ -269,7 +288,9 @@ def broker_task_decorator( raise ContainerTypeMismatchError(task_type, container) # Register user function as task handler with our broker - def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask: + def add_taskiq_task( + handler: Callable[..., Any | Awaitable[Any]] + ) -> AsyncTaskiqDecoratedTask: labels = {"task_type": str(task_type)} # NOTE: Do *not* do `if container` because that does a `len(container)` call, @@ -279,7 +300,7 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask: # Address is almost a certainty if the container is being used as a filter here. if not (contract_address := getattr(container.contract, "address", None)): raise InvalidContainerTypeError( - "Please provider a contract event from a valid contract instance." + "Please provide a contract event from a valid contract instance." ) labels["contract_address"] = contract_address @@ -288,20 +309,7 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask: self.tasks[task_type].append(TaskData(name=handler.__name__, labels=labels)) if self.use_fork: - from ape import networks # NOTE: Defer import for load speed - - # Trigger worker-side handling using fork network by wrapping handler - is_awaitable = inspect.isawaitable(handler) - - @wraps(handler) - async def fork_handler(*args, **kwargs): - with networks.fork(): - if is_awaitable: - return await handler(*args, **kwargs) - else: - return handler(*args, **kwargs) - - handler = fork_handler + handler = self._with_fork_decorator(handler) return self.broker.register_task( handler, From 5dc3025a570fbb4a787c73b885ba70ec72601520 Mon Sep 17 00:00:00 2001 From: Doggie B <3859395+fubuloubu@users.noreply.github.com> Date: Sat, 26 Oct 2024 21:07:56 -0400 Subject: [PATCH 3/4] fix: block other forks until https://github.com/ApeWorX/ape/issues/2348 --- silverback/main.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/silverback/main.py b/silverback/main.py index 5590c61c..16e515cc 100644 --- a/silverback/main.py +++ b/silverback/main.py @@ -1,3 +1,4 @@ +import asyncio import atexit import inspect from collections import defaultdict @@ -231,19 +232,25 @@ async def __create_snapshot_handler( last_block_processed=self.state.get("system:last_block_processed", -1), ) + # To ensure we don't have too many forks at once + # HACK: Until `NetworkManager.fork` (and `ProviderContextManager`) allows concurrency + + fork_lock: asyncio.Lock = asyncio.Lock() + def _with_fork_decorator(self, handler: Callable) -> Callable: # Trigger worker-side handling using fork network by wrapping handler fork_context = self.provider.network_manager.fork @wraps(handler) async def fork_handler(*args, **kwargs): - with fork_context(): - result = handler(*args, **kwargs) + async with self.fork_lock: + with fork_context(): + result = handler(*args, **kwargs) - if inspect.isawaitable(result): - return await result + if inspect.isawaitable(result): + return await result - return result + return result return fork_handler From 6f4b117a5e29d438e617fd194635488250e81794 Mon Sep 17 00:00:00 2001 From: Doggie B <3859395+fubuloubu@users.noreply.github.com> Date: Mon, 28 Oct 2024 17:34:02 -0400 Subject: [PATCH 4/4] refactor(fork): working with fork fix from ape core depends: https://github.com/ApeWorX/ape/pull/2349 --- silverback/main.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/silverback/main.py b/silverback/main.py index 16e515cc..eccedd5e 100644 --- a/silverback/main.py +++ b/silverback/main.py @@ -1,4 +1,3 @@ -import asyncio import atexit import inspect from collections import defaultdict @@ -235,22 +234,19 @@ async def __create_snapshot_handler( # To ensure we don't have too many forks at once # HACK: Until `NetworkManager.fork` (and `ProviderContextManager`) allows concurrency - fork_lock: asyncio.Lock = asyncio.Lock() - def _with_fork_decorator(self, handler: Callable) -> Callable: # Trigger worker-side handling using fork network by wrapping handler fork_context = self.provider.network_manager.fork @wraps(handler) async def fork_handler(*args, **kwargs): - async with self.fork_lock: - with fork_context(): - result = handler(*args, **kwargs) + with fork_context(): + result = handler(*args, **kwargs) - if inspect.isawaitable(result): - return await result + if inspect.isawaitable(result): + return await result - return result + return result return fork_handler