Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: allow multiple handlers #66

Merged
merged 19 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a370d3a
refactor: add TaskType enum to types
fubuloubu Apr 8, 2024
7fd52d9
refactor!: use task collection by task type; use dynamic registration
fubuloubu Apr 8, 2024
144a087
refactor: use new task collections api for fetch task handlers
fubuloubu Apr 8, 2024
8ddb0d3
refactor!: use task type to differentiate tasks in middleware, not name
fubuloubu Apr 8, 2024
64c281e
fix: typing bugs, not using Union for 3.8 support
fubuloubu Apr 8, 2024
42247be
fix: use task type to capture event logs
fubuloubu Apr 9, 2024
e0c3dc7
refactor: use defaultdict instead of custom collection type
fubuloubu Apr 9, 2024
8798669
refactor: use standardized labels, use task_name for task_id
fubuloubu Apr 9, 2024
2a7d62a
refactor: remove `.task_name` from message labels
fubuloubu Apr 9, 2024
6aafc83
refactor: convert to TaskType for better processing
fubuloubu Apr 9, 2024
1bf0702
Merge branch 'main' into refactor/allow-multiple-handlers
fubuloubu Apr 10, 2024
5ad9b7a
refactor: use StrEnum if available
fubuloubu Apr 10, 2024
c177a94
docs: add note to deprecate in breaking change
fubuloubu Apr 10, 2024
eadaea5
refactor: make object type clearer when working with labels in middle…
fubuloubu Apr 10, 2024
9519b30
refactor: use official backport
fubuloubu Apr 10, 2024
184a939
docs: update typing and add docs for dynamic broker task decorator fn
fubuloubu Apr 10, 2024
77a11aa
style: ignore mypy typing issues on <3.11
fubuloubu Apr 11, 2024
9ca7692
refactor: avoid div/0 fault, fix duplicate log entry for results w/errs
fubuloubu Apr 11, 2024
897cf95
refactor: rollback weird typing backport issue
fubuloubu Apr 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 49 additions & 63 deletions silverback/application.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,42 @@
import atexit
from dataclasses import dataclass
from datetime import timedelta
from typing import Callable, Dict, Optional, Union

from ape.api.networks import LOCAL_NETWORK_NAME
from ape.contracts import ContractEvent, ContractInstance
from ape.logging import logger
from ape.managers.chain import BlockContainer
from ape.types import AddressType
from ape.utils import ManagerAccessMixin
from taskiq import AsyncTaskiqDecoratedTask, TaskiqEvents

from .exceptions import DuplicateHandlerError, InvalidContainerTypeError
from .exceptions import InvalidContainerTypeError
from .settings import Settings
from .types import TaskType


@dataclass
class Task:
container: Union[BlockContainer, ContractEvent, None]
handler: AsyncTaskiqDecoratedTask


class TaskCollection(dict):
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved
def insert(self, task_type: TaskType, task: Task):
if not isinstance(task_type, TaskType):
raise ValueError("Unexpected key type")

elif not isinstance(task, Task):
raise ValueError("Unexpected value type")

elif task_type is TaskType.NEW_BLOCKS and not isinstance(task.container, BlockContainer):
raise ValueError("Mismatch between key and value types")

elif task_type is TaskType.EVENT_LOG and not isinstance(task.container, ContractEvent):
raise ValueError("Mismatch between key and value types")

task_list = super().get(task_type) or []
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved
super().__setitem__(task_type, task_list + [task])


class SilverbackApp(ManagerAccessMixin):
Expand Down Expand Up @@ -52,7 +77,7 @@ def __init__(self, settings: Optional[Settings] = None):
logger.info(f"Loading Silverback App with settings:\n {settings_str}")

self.broker = settings.get_broker()
self.contract_events: Dict[AddressType, Dict[str, ContractEvent]] = {}
self.tasks = TaskCollection()
self.poll_settings: Dict[str, Dict] = {}

atexit.register(self.network.__exit__, None, None, None)
Expand All @@ -72,6 +97,23 @@ def __init__(self, settings: Optional[Settings] = None):
f"{signer_str}{start_block_str}{new_block_timeout_str}"
)

def broker_task_decorator(
self,
task_type: TaskType,
container: Union[BlockContainer, ContractEvent, None] = None,
):
def add_taskiq_task(handler: Callable):
# TODO: Support generic registration
task = self.broker.register_task(
handler,
task_name=handler.__name__,
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved
task_type=str(task_type),
)
self.tasks.insert(task_type, Task(container=container, handler=task))
return task

return add_taskiq_task

def on_startup(self) -> Callable:
"""
Code to execute on one worker upon startup / restart after an error.
Expand All @@ -82,7 +124,7 @@ def on_startup(self) -> Callable:
def do_something_on_startup(startup_state):
... # Reprocess missed events or blocks
"""
return self.broker.task(task_name="silverback_startup")
return self.broker_task_decorator(TaskType.STARTUP)

def on_shutdown(self) -> Callable:
"""
Expand All @@ -94,7 +136,7 @@ def on_shutdown(self) -> Callable:
def do_something_on_shutdown():
... # Record final state of app
"""
return self.broker.task(task_name="silverback_shutdown")
return self.broker_task_decorator(TaskType.SHUTDOWN)

def on_worker_startup(self) -> Callable:
"""
Expand All @@ -120,48 +162,6 @@ def do_something_on_shutdown(state):
"""
return self.broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)

def get_startup_handler(self) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `silverback_startup` events.

Returns:
Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created.
"""
return self.broker.find_task("silverback_startup")

def get_shutdown_handler(self) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `silverback_shutdown` events.

Returns:
Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created.
"""
return self.broker.find_task("silverback_shutdown")

def get_block_handler(self) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `block` events.

Returns:
Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created.
"""
return self.broker.find_task("block")

def get_event_handler(
self, event_target: AddressType, event_name: str
) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `<event_target>:<event_name>` events.

Args:
event_target (AddressType): The contract address of the target.
event_name: (str): The name of the event emitted by ``event_target``.

Returns:
Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created.
"""
return self.broker.find_task(f"{event_target}/event/{event_name}")

def on_(
self,
container: Union[BlockContainer, ContractEvent],
Expand All @@ -183,9 +183,6 @@ def on_(
If the type of `container` is not configurable for the app.
"""
if isinstance(container, BlockContainer):
if self.get_block_handler():
raise DuplicateHandlerError("block")

if new_block_timeout is not None:
if "_blocks_" in self.poll_settings:
self.poll_settings["_blocks_"]["new_block_timeout"] = new_block_timeout
Expand All @@ -198,21 +195,12 @@ def on_(
else:
self.poll_settings["_blocks_"] = {"start_block": start_block}

return self.broker.task(task_name="block")
return self.broker_task_decorator(TaskType.NEW_BLOCKS, container=container)

elif isinstance(container, ContractEvent) and isinstance(
container.contract, ContractInstance
):
if self.get_event_handler(container.contract.address, container.abi.name):
raise DuplicateHandlerError(
f"event {container.contract.address}:{container.abi.name}"
)

key = container.contract.address
if container.contract.address in self.contract_events:
self.contract_events[key][container.abi.name] = container
else:
self.contract_events[key] = {container.abi.name: container}

if new_block_timeout is not None:
if key in self.poll_settings:
Expand All @@ -226,9 +214,7 @@ def on_(
else:
self.poll_settings[key] = {"start_block": start_block}

return self.broker.task(
task_name=f"{container.contract.address}/event/{container.abi.name}"
)
return self.broker_task_decorator(TaskType.EVENT_LOG, container=container)

# TODO: Support account transaction polling
# TODO: Support mempool polling
Expand Down
5 changes: 0 additions & 5 deletions silverback/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ class ImportFromStringError(Exception):
pass


class DuplicateHandlerError(Exception):
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self, handler_type: str):
super().__init__(f"Only one handler allowed for: {handler_type}")


class InvalidContainerTypeError(Exception):
def __init__(self, container: Any):
super().__init__(f"Invalid container type: {container.__class__}")
Expand Down
51 changes: 30 additions & 21 deletions silverback/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,22 @@
from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult

from silverback.persistence import HandlerResult
from silverback.types import SilverbackID, handler_id_block, handler_id_event
from silverback.types import SilverbackID, TaskType, handler_id_block, handler_id_event
from silverback.utils import hexbytes_dict


def resolve_task(message: TaskiqMessage) -> Tuple[str, Optional[int], Optional[int]]:
block_number = None
log_index = None
block_number = message.labels.get("number") or message.labels.get("block")
log_index = message.labels.get("log_index")
task_id = message.task_name
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved

if task_id == "block":
block_number = message.args[0].number
task_id = handler_id_block(block_number)
elif "event" in task_id:
block_number = message.args[0].block_number
log_index = message.args[0].log_index
if log_index:
# TODO: Should standardize on event signature here instead of name in case of overloading
task_id = handler_id_event(message.args[0].contract_address, message.args[0].event_name)

elif block_number:
task_id = handler_id_block(block_number)

return task_id, block_number, log_index


Expand Down Expand Up @@ -66,34 +64,40 @@ def fix_dict(data: dict, recurse_count: int = 0) -> dict:
return message

def _create_label(self, message: TaskiqMessage) -> str:
if message.task_name == "block":
args = f"[block={message.args[0].hash.hex()}]"

elif "event" in message.task_name:
args = f"[txn={message.args[0].transaction_hash},log_index={message.args[0].log_index}]"
if labels_str := (
",".join(f"{k}={v}" for k, v in message.labels.items() if k != "task_name")
):
return f"{message.task_name}[{labels_str}]"

else:
args = ""

return f"{message.task_name}{args}"
return message.task_name

def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
if message.task_name == "block":
message.labels["task_name"] = message.task_name
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved
task_type = message.labels.pop("task_type", "<unknown>")

# NOTE: Don't compare `str` to `TaskType` using `is`
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved
if task_type == TaskType.NEW_BLOCKS:
# NOTE: Necessary because we don't know the exact block class
message.args[0] = self.provider.network.ecosystem.decode_block(
hexbytes_dict(message.args[0])
)
message.labels["number"] = str(message.args[0].number)
message.labels["hash"] = message.args[0].hash.hex()

elif "event" in message.task_name:
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved
# NOTE: Just in case the user doesn't specify type as `ContractLog`
message.args[0] = ContractLog.model_validate(message.args[0])
message.labels["block"] = str(message.args[0].block_number)
message.labels["txn_id"] = message.args[0].transaction_hash
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved
message.labels["log_index"] = str(message.args[0].log_index)

logger.info(f"{self._create_label(message)} - Started")
logger.debug(f"{self._create_label(message)} - Started")
return message

def post_execute(self, message: TaskiqMessage, result: TaskiqResult):
percentage_time = 100 * (result.execution_time / self.block_time)
logger.info(
logger.success(
f"{self._create_label(message)} "
f"- {result.execution_time:.3f}s ({percentage_time:.1f}%)"
)
Expand All @@ -119,4 +123,9 @@ async def on_error(
result: TaskiqResult,
exception: BaseException,
):
logger.error(f"{message.task_name} - {type(exception).__name__}: {exception}")
percentage_time = 100 * (result.execution_time / self.block_time)
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved
logger.error(
f"{self._create_label(message)} "
f"- {result.execution_time:.3f}s ({percentage_time:.1f}%)"
)
# NOTE: Unless stdout is ignored, error traceback appears in stdout
26 changes: 11 additions & 15 deletions silverback/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from .persistence import BasePersistentStore
from .settings import Settings
from .subscriptions import SubscriptionType, Web3SubscriptionsManager
from .types import SilverbackID, SilverbackStartupState
from .types import SilverbackID, SilverbackStartupState, TaskType
from .utils import async_wrap_iter, hexbytes_dict

settings = Settings()
Expand Down Expand Up @@ -103,8 +103,8 @@ async def run(self):
await self.app.broker.startup()

# Execute Silverback startup task before we init the rest
if startup_handler := self.app.get_startup_handler():
task = await startup_handler.kiq(
for startup_task in self.app.tasks.get(TaskType.STARTUP):
task = await startup_task.handler.kiq(
SilverbackStartupState(
last_block_seen=self.last_block_seen,
last_block_processed=self.last_block_processed,
Expand All @@ -113,26 +113,22 @@ async def run(self):
result = await task.wait_result()
self._handle_result(result)

if block_handler := self.app.get_block_handler():
tasks = [self._block_task(block_handler)]
else:
tasks = []
tasks = []
for task in self.app.tasks.get(TaskType.NEW_BLOCKS):
tasks.append(self._block_task(task.handler))

for contract_address in self.app.contract_events:
for event_name, contract_event in self.app.contract_events[contract_address].items():
if event_handler := self.app.get_event_handler(contract_address, event_name):
tasks.append(self._event_task(contract_event, event_handler))
for task in self.app.tasks.get(TaskType.EVENT_LOG):
tasks.append(self._event_task(task.container, task.handler))

if len(tasks) == 0:
raise Halt("No tasks to execute")

await asyncio.gather(*tasks)

# Execute Silverback shutdown task before shutting down the broker
if shutdown_handler := self.app.get_shutdown_handler():
task = await shutdown_handler.kiq()
result = await task.wait_result()
self._handle_result(result)
for shutdown_task in self.app.tasks.get(TaskType.SHUTDOWN):
task = await shutdown_task.handler.kiq()
result = self._handle_result(await task.wait_result())

await self.app.broker.shutdown()

Expand Down
11 changes: 11 additions & 0 deletions silverback/types.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
from enum import Enum
from typing import Optional, Protocol

from pydantic import BaseModel
from typing_extensions import Self # Introduced 3.11


class TaskType(str, Enum):
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved
STARTUP = "silverback_startup" # TODO: Shorten
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved
NEW_BLOCKS = "block"
EVENT_LOG = "event"
SHUTDOWN = "silverback_shutdown" # TODO: Shorten

def __str__(self) -> str:
return self.value


class ISilverbackSettings(Protocol):
"""Loose approximation of silverback.settings.Settings. If you can, use the class as
a type reference."""
Expand Down
Loading