Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add app.state #80

Merged
merged 17 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions docs/userguides/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,45 @@ def handle_on_shutdown():

*Changed in 0.2.0*: The behavior of the `@bot.on_startup()` decorator and handler signature have changed. It is now executed only once upon application startup and worker events have moved on `@bot.on_worker_startup()`.

## Application State
johnson2427 marked this conversation as resolved.
Show resolved Hide resolved

Sometimes it is very useful to have access to values in a shared state across your workers.
For example you might have a value or complex reference type that you wish to update during one of your tasks, and read during another.
Silverback provides `app.state` to help with these use cases.
johnson2427 marked this conversation as resolved.
Show resolved Hide resolved

For example, you might want to pre-populate a large dataframe into state on startup, keeping that dataframe in sync with the chain through event logs,
and then use that data to determine a signal under which you want trigger transactions to commit back to the chain.
Such an application might look like this:

```py
@app.on_startup()
johnson2427 marked this conversation as resolved.
Show resolved Hide resolved
def create_table(startup_state):
df = contract.MyEvent.query(..., start_block=startup_state.last_block_processed)
... # Do some further processing on df
app.state.table = df
johnson2427 marked this conversation as resolved.
Show resolved Hide resolved


@app.on_(contract.MyEvent)
johnson2427 marked this conversation as resolved.
Show resolved Hide resolved
def update_table(log):
app.state.table = ... # Update using stuff from `log`
johnson2427 marked this conversation as resolved.
Show resolved Hide resolved


@app.on_(chain.blocks)
johnson2427 marked this conversation as resolved.
Show resolved Hide resolved
def use_table(blk):
if app.state.table[...].mean() > app.state.table[...].sum():
# Trigger your app to send a transaction from `app.signer`
contract.myMethod(..., sender=app.signer)
johnson2427 marked this conversation as resolved.
Show resolved Hide resolved
...
```

```{warning}
You can use `app.state` to store any python variable type, however note that the item is not networked nor threadsafe so it is not recommended to have multiple tasks write to the same value in state at the same time.
johnson2427 marked this conversation as resolved.
Show resolved Hide resolved
```

```{note}
Application startup and application runtime events (e.g. block or event container) are handled distinctly and can be trusted not to execute at the same time.
johnson2427 marked this conversation as resolved.
Show resolved Hide resolved
```

### Signing Transactions

If configured, your bot with have `bot.signer` which is an Ape account that can sign arbitrary transactions you ask it to.
Expand Down
62 changes: 47 additions & 15 deletions example.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from typing import Annotated

from ape import chain
Expand All @@ -6,36 +7,52 @@
from ape_tokens import tokens # type: ignore[import]
from taskiq import Context, TaskiqDepends, TaskiqState

from silverback import AppState, CircuitBreaker, SilverbackApp
from silverback import CircuitBreaker, SilverbackApp, StateSnapshot

# Do this first to initialize your app
app = SilverbackApp()

# Cannot call `app.state` outside of an app function handler
# app.state.something # NOTE: raises AttributeError

# NOTE: Don't do any networking until after initializing app
USDC = tokens["USDC"]
YFI = tokens["YFI"]


@app.on_startup()
def app_startup(startup_state: AppState):
# NOTE: This is called just as the app is put into "run" state,
# and handled by the first available worker
# raise Exception # NOTE: Any exception raised on startup aborts immediately
def app_startup(startup_state: StateSnapshot):
# This is called just as the app is put into "run" state,
# and handled by the first available worker

# Any exception raised on startup aborts immediately:
# raise Exception # NOTE: raises StartupFailure

# This is a great place to set `app.state` values
app.state.logs_processed = 0
# NOTE: Can put anything here, any python object works

return {"block_number": startup_state.last_block_seen}


# Can handle some resource initialization for each worker, like LLMs or database connections
class MyDB:
def execute(self, query: str):
pass
pass # Handle query somehow...


@app.on_worker_startup()
def worker_startup(state: TaskiqState): # NOTE: You need the type hint here
# NOTE: This event is triggered internally, do not use unless you know what you're doing
def worker_startup(worker_state: TaskiqState): # NOTE: You need the type hint to load worker state
# NOTE: Worker state is per-worker, not shared with other workers
# NOTE: Can put anything here, any python object works
state.db = MyDB()
state.block_count = 0
# raise Exception # NOTE: Any exception raised on worker startup aborts immediately
worker_state.db = MyDB()

# Any exception raised on worker startup aborts immediately:
# raise Exception # NOTE: raises StartupFailure

# Cannot call `app.state` because it is not set up yet on worker startup functions
# app.state.something # NOTE: raises AttributeError


# This is how we trigger off of new blocks
Expand All @@ -57,28 +74,43 @@ def exec_event1(log):
# NOTE: By default, if you have 3 tasks fail in a row, the app will shutdown itself
raise ValueError("I don't like the number 3.")

# You can update state whenever you want
app.state.logs_processed += 1

return {"amount": log.amount}


@app.on_(YFI.Approval)
# Any handler function can be async too
async def exec_event2(log: ContractLog):
if log.log_index % 7 == 6:
# All `app.state` values are updated across all workers at the same time
app.state.logs_processed += 1
# Do any other long running tasks...
await asyncio.sleep(5)
return log.amount


@app.on_(chain.blocks)
# NOTE: You can have multiple handlers for any trigger we support
def check_logs(log):
if app.state.logs_processed > 20:
# If you ever want the app to immediately shutdown under some scenario, raise this exception
raise CircuitBreaker("Oopsie!")

return log.amount


# A final job to execute on Silverback shutdown
@app.on_shutdown()
def app_shutdown():
# raise Exception # NOTE: Any exception raised on shutdown is ignored
# NOTE: Any exception raised on worker shutdown is ignored:
# raise Exception
return {"some_metric": 123}


# Just in case you need to release some resources or something inside each worker
@app.on_worker_shutdown()
def worker_shutdown(state: TaskiqState): # NOTE: You need the type hint here
# This is a good time to release resources
state.db = None
# raise Exception # NOTE: Any exception raised on worker shutdown is ignored

# NOTE: Any exception raised on worker shutdown is ignored:
# raise Exception
4 changes: 2 additions & 2 deletions silverback/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from .application import SilverbackApp
from .exceptions import CircuitBreaker, SilverbackException
from .state import AppState
from .state import StateSnapshot

__all__ = [
"AppState",
"StateSnapshot",
"CircuitBreaker",
"SilverbackApp",
"SilverbackException",
Expand Down
121 changes: 112 additions & 9 deletions silverback/application.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import atexit
from collections import defaultdict
from datetime import timedelta
from typing import Any, Callable

Expand All @@ -13,6 +14,7 @@

from .exceptions import ContainerTypeMismatchError, InvalidContainerTypeError
from .settings import Settings
from .state import StateSnapshot
from .types import SilverbackID, TaskType


Expand All @@ -33,6 +35,53 @@ class TaskData(BaseModel):
# NOTE: Any other items here must have a default value


class SharedState(defaultdict):
"""
Class containing the application shared state that all workers can read from and write to.

```{warning}
This is not networked in any way, nor is it multi-process safe, but will be
accessible across multiple thread workers within a single process.
```

Usage example::

@app.on_(...)
def do_something_with_state(value):
# Read from state using `getattr`
... = app.state.something

# Set state using `setattr`
app.state.something = ...

# Read from state using `getitem`
... = app.state["something"]

# Set state using setitem
app.state["something"] = ...
"""

# TODO: This class does not have thread-safe access control, but should remain safe due to
# it being a memory mapping, and writes are strictly controlled to be handled only by
# one worker at a time. There may be issues with using this in production however.

def __init__(self):
# Any unknown key returns None
super().__init__(lambda: None)

def __getattr__(self, attr):
try:
return super().__getattr__(attr)
except AttributeError:
return super().__getitem__(attr)
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved

def __setattr__(self, attr, val):
try:
super().__setattr__(attr, val)
except AttributeError:
super().__setitem__(attr, val)
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved


class SilverbackApp(ManagerAccessMixin):
"""
The application singleton. Must be initialized prior to use.
Expand Down Expand Up @@ -112,6 +161,12 @@ def __init__(self, settings: Settings | None = None):
self._get_user_all_taskdata = self.__register_system_task(
TaskType.SYSTEM_USER_ALL_TASKDATA, self.__get_user_all_taskdata_handler
)
self._load_snapshot = self.__register_system_task(
TaskType.SYSTEM_LOAD_SNAPSHOT, self.__load_snapshot_handler
)
self._create_snapshot = self.__register_system_task(
TaskType.SYSTEM_CREATE_SNAPSHOT, self.__create_snapshot_handler
)

def __register_system_task(
self, task_type: TaskType, task_handler: Callable
Expand Down Expand Up @@ -142,6 +197,34 @@ def __get_user_taskdata_handler(self, task_type: TaskType) -> list[TaskData]:
def __get_user_all_taskdata_handler(self) -> list[TaskData]:
return [v for k, l in self.tasks.items() if str(k).startswith("user:") for v in l]

async def __load_snapshot_handler(self, startup_state: StateSnapshot):
# NOTE: *DO NOT USE* in Runner, as it will not be updated by the app
self.state = SharedState()
# NOTE: attribute does not exist before this task is executed,
# ensuring no one uses it during worker startup

self.state["system:last_block_seen"] = startup_state.last_block_seen
self.state["system:last_block_processed"] = startup_state.last_block_processed
# TODO: Load user custom state (should not start with `system:`)

async def __create_snapshot_handler(
self,
last_block_seen: int | None = None,
last_block_processed: int | None = None,
):
# Task that updates state checkpoints before/after every non-system runtime task/at shutdown
if last_block_seen is not None:
self.state["system:last_block_seen"] = last_block_seen

if last_block_processed is not None:
self.state["system:last_block_processed"] = last_block_processed

return StateSnapshot(
# TODO: Migrate these to parameters (remove explicitly from state)
last_block_seen=self.state.get("system:last_block_seen", -1),
last_block_processed=self.state.get("system:last_block_processed", -1),
)
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved

def broker_task_decorator(
self,
task_type: TaskType,
Expand All @@ -150,6 +233,11 @@ def broker_task_decorator(
"""
Dynamically create a new broker task that handles tasks of ``task_type``.

```{warning}
Dynamically creating a task does not ensure that the runner will be aware of the task
in order to trigger it. Use at your own risk.
```

Args:
task_type: :class:`~silverback.types.TaskType`: The type of task to create.
container: (BlockContainer | ContractEvent): The event source to watch.
Expand Down Expand Up @@ -206,19 +294,21 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask:

def on_startup(self) -> Callable:
"""
Code to execute on one worker upon startup / restart after an error.
Code that will be exected by one worker after worker startup, but before the
application is put into the "run" state by the Runner.

Usage example::

@app.on_startup()
def do_something_on_startup(startup_state):
def do_something_on_startup(startup_state: StateSnapshot):
... # Reprocess missed events or blocks
"""
return self.broker_task_decorator(TaskType.STARTUP)

def on_shutdown(self) -> Callable:
"""
Code to execute on one worker at shutdown.
Code that will be exected by one worker before worker shutdown, after the
Runner has decided to put the application into the "shutdown" state.

Usage example::

Expand All @@ -228,25 +318,37 @@ def do_something_on_shutdown():
"""
return self.broker_task_decorator(TaskType.SHUTDOWN)

# TODO: Abstract away worker startup into dependency system
def on_worker_startup(self) -> Callable:
"""
Code to execute on every worker at startup / restart after an error.
Code to execute on every worker immediately after broker startup.

```{note}
This is a great place to load heavy dependencies for the workers,
such as database connections, ML models, etc.
```

Usage example::

@app.on_startup()
@app.on_worker_startup()
def do_something_on_startup(state):
... # Can provision resources, or add things to `state`.
"""
return self.broker.on_event(TaskiqEvents.WORKER_STARTUP)

# TODO: Abstract away worker shutdown into dependency system
def on_worker_shutdown(self) -> Callable:
"""
Code to execute on every worker at shutdown.
Code to execute on every worker immediately before broker shutdown.

```{note}
This is where you should also release any resources you have loaded during
worker startup.
```

Usage example::

@app.on_shutdown()
@app.on_worker_shutdown()
def do_something_on_shutdown(state):
... # Update some external service, perhaps using information from `state`.
"""
Expand All @@ -255,11 +357,12 @@ def do_something_on_shutdown(state):
def on_(
self,
container: BlockContainer | ContractEvent,
# TODO: possibly remove these
new_block_timeout: int | None = None,
start_block: int | None = None,
):
"""
Create task to handle events created by `container`.
Create task to handle events created by the `container` trigger.

Args:
container: (BlockContainer | ContractEvent): The event source to watch.
Expand Down Expand Up @@ -307,5 +410,5 @@ def on_(
return self.broker_task_decorator(TaskType.EVENT_LOG, container=container)

# TODO: Support account transaction polling
# TODO: Support mempool polling
# TODO: Support mempool polling?
raise InvalidContainerTypeError(container)
Loading
Loading