Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(worker): add SILVERBACK_FORK_MODE handler execution context #157

Merged
merged 6 commits into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ape-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Ensure we are configured for fork mode correct for example
ethereum:
mainnet-fork:
default_provider: foundry
50 changes: 42 additions & 8 deletions silverback/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import asyncio
import atexit
import inspect
from collections import defaultdict
from datetime import timedelta
from typing import Any, Callable
from functools import wraps
from typing import Any, Awaitable, Callable

from ape.api.networks import LOCAL_NETWORK_NAME
from ape.contracts import ContractEvent, ContractInstance
Expand Down Expand Up @@ -108,17 +111,18 @@ def __init__(self, settings: Settings | None = None):

provider_context = settings.get_provider_context()
# NOTE: This allows using connected ape methods e.g. `Contract`
provider = provider_context.__enter__()
self.provider = provider_context.__enter__()

self.identifier = SilverbackID(
name=settings.BOT_NAME,
network=provider.network.name,
ecosystem=provider.network.ecosystem.name,
network=self.provider.network.name,
ecosystem=self.provider.network.ecosystem.name,
)

# Adjust defaults from connection
if settings.NEW_BLOCK_TIMEOUT is None and (
provider.network.name.endswith("-fork") or provider.network.name == LOCAL_NETWORK_NAME
self.provider.network.name.endswith("-fork")
or self.provider.network.name == LOCAL_NETWORK_NAME
):
settings.NEW_BLOCK_TIMEOUT = int(timedelta(days=1).total_seconds())

Expand All @@ -138,6 +142,7 @@ def __init__(self, settings: Settings | None = None):

self.signer = settings.get_signer()
self.new_block_timeout = settings.NEW_BLOCK_TIMEOUT
self.use_fork = settings.FORK_MODE and not self.provider.network.name.endswith("-fork")

signer_str = f"\n SIGNER={repr(self.signer)}"
new_block_timeout_str = (
Expand All @@ -146,7 +151,9 @@ def __init__(self, settings: Settings | None = None):

network_choice = f"{self.identifier.ecosystem}:{self.identifier.network}"
logger.success(
f'Loaded Silverback Bot:\n NETWORK="{network_choice}"'
"Loaded Silverback Bot:\n"
f' NETWORK="{network_choice}"\n'
f" FORK_MODE={self.use_fork}"
f"{signer_str}{new_block_timeout_str}"
)

Expand Down Expand Up @@ -225,6 +232,28 @@ async def __create_snapshot_handler(
last_block_processed=self.state.get("system:last_block_processed", -1),
)

# To ensure we don't have too many forks at once
# HACK: Until `NetworkManager.fork` (and `ProviderContextManager`) allows concurrency

fork_lock: asyncio.Lock = asyncio.Lock()

def _with_fork_decorator(self, handler: Callable) -> Callable:
# Trigger worker-side handling using fork network by wrapping handler
fork_context = self.provider.network_manager.fork

@wraps(handler)
async def fork_handler(*args, **kwargs):
async with self.fork_lock:
with fork_context():
result = handler(*args, **kwargs)

if inspect.isawaitable(result):
return await result

return result

return fork_handler

def broker_task_decorator(
self,
task_type: TaskType,
Expand Down Expand Up @@ -266,7 +295,9 @@ def broker_task_decorator(
raise ContainerTypeMismatchError(task_type, container)

# Register user function as task handler with our broker
def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask:
def add_taskiq_task(
handler: Callable[..., Any | Awaitable[Any]]
) -> AsyncTaskiqDecoratedTask:
labels = {"task_type": str(task_type)}

# NOTE: Do *not* do `if container` because that does a `len(container)` call,
Expand All @@ -276,14 +307,17 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask:
# Address is almost a certainty if the container is being used as a filter here.
if not (contract_address := getattr(container.contract, "address", None)):
raise InvalidContainerTypeError(
"Please provider a contract event from a valid contract instance."
"Please provide a contract event from a valid contract instance."
)

labels["contract_address"] = contract_address
labels["event_signature"] = container.abi.signature

self.tasks[task_type].append(TaskData(name=handler.__name__, labels=labels))

if self.use_fork:
handler = self._with_fork_decorator(handler)

return self.broker.register_task(
handler,
task_name=handler.__name__,
Expand Down
4 changes: 4 additions & 0 deletions silverback/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ class Settings(BaseSettings, ManagerAccessMixin):
# A unique identifier for this silverback instance
BOT_NAME: str = "bot"

# Execute every handler using an independent fork context
# NOTE: Requires fork-able provider installed and configured for network
FORK_MODE: bool = False

BROKER_CLASS: str = "taskiq:InMemoryBroker"
BROKER_URI: str = "" # To be deprecated in 0.6
BROKER_KWARGS: dict[str, Any] = dict()
Expand Down
Loading