Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds persistence layer for app state and job results [SBK-363] #45

Merged
merged 50 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
c46ec08
feat: adds persistence layer for app state and job results
mikeshultz Nov 18, 2023
b956d48
style(lint): fix imports
mikeshultz Nov 18, 2023
7ba2f71
fix: fix types
mikeshultz Nov 18, 2023
85a5579
style(lint): disagreement flake8 didn't like black output
mikeshultz Nov 18, 2023
ec43642
refactor: adds silverback_startup task, and on_client_* and on_worker…
mikeshultz Nov 18, 2023
8d2427c
fix: add missing silverback_shutdown task exec in runner
mikeshultz Nov 18, 2023
2564382
style(lint): lint rolling
mikeshultz Nov 18, 2023
e4caa59
refactor: remove unused SilverbackApp.checkpoint() method
mikeshultz Nov 18, 2023
8be2d10
refactor: remove mongo, add sqlite
mikeshultz Nov 20, 2023
82cb68e
fix: type | None is new in 3.10
mikeshultz Nov 20, 2023
c420f49
fix: forgot to save...
mikeshultz Nov 20, 2023
7e9e67a
style: minor cleanup and docstrings
mikeshultz Nov 21, 2023
134748d
style(lint): where that pre-commit hook when you need it
mikeshultz Nov 21, 2023
f58911a
fix: call to self.network.__enter__ should be before we try and touch…
mikeshultz Nov 21, 2023
0506ad5
feat(cli): adds simple `worker` command
mikeshultz Nov 21, 2023
2d67e49
docs: move example, create redis example, update README
mikeshultz Nov 21, 2023
98a7adb
fix(style): remove unused import
mikeshultz Nov 21, 2023
19a458b
style(lint): chill, mypy
mikeshultz Nov 21, 2023
4bb23c8
fix: update examples to fix taskiq issue
mikeshultz Nov 21, 2023
4d50d4c
style(lint): b0rk b0rk b0rk b0rk
mikeshultz Nov 21, 2023
2d219b1
fix: worker command now uses broker app uses
mikeshultz Nov 22, 2023
9273cb0
refactor: simplify app strings for examples
mikeshultz Nov 22, 2023
f7059c4
fix: drop pickle for security, use generic types for HandlerResult re…
mikeshultz Nov 22, 2023
7dc5bd0
refactor: HandlerResult should subclass TaskiqResult
mikeshultz Nov 22, 2023
c300fb6
docs: update docs for worker events, new on_startup behavior, nad ela…
mikeshultz Nov 22, 2023
e96e4f2
refactor: use position arg for reasons
mikeshultz Nov 24, 2023
29fa6ce
fix: surface errors in persistence when saving results
mikeshultz Nov 24, 2023
a544e33
refactor: move results persistence calls to middleware
mikeshultz Nov 24, 2023
00b2bf9
Merge branch 'main' into feat/persistence
mikeshultz Nov 24, 2023
e8dcda3
Update silverback/_cli.py
mikeshultz Nov 27, 2023
ee76273
refactor: use lazy init and not require runner or whatever to have to…
mikeshultz Nov 27, 2023
9779c4b
Merge branch 'feat/persistence' of github.com:ApeWorX/silverback into…
mikeshultz Nov 27, 2023
0f1f7f2
fix: run all requested workers
mikeshultz Nov 27, 2023
8dfb17c
fix: wut
mikeshultz Nov 27, 2023
a0d1c20
docs: add section covering distributed configuration
mikeshultz Nov 27, 2023
e32297d
refactor: remove PERSISTENCE_URI, use first-class SQLITE_PATH
mikeshultz Nov 27, 2023
a7b0719
fix: leftover PERSISTENCE_URI in settings type
mikeshultz Nov 27, 2023
3892e13
refactor: minor cleanup of unused settings arg in BasePersistentStorage
mikeshultz Nov 27, 2023
db7cce6
docs: help string update
mikeshultz Nov 27, 2023
60c4fd1
refactor: remove on_client_* silverback decorators
mikeshultz Nov 27, 2023
f35048c
refactor: storage->store
mikeshultz Nov 27, 2023
40a2041
refactor: instance_state -> state
mikeshultz Nov 27, 2023
0f630b0
fix: remove unnecessary Union types
mikeshultz Nov 27, 2023
4a6a65a
Merge branch 'feat/persistence' of github.com:ApeWorX/silverback into…
mikeshultz Nov 27, 2023
ccc96d8
refactor(docs): move back to unified example script
mikeshultz Nov 27, 2023
ee51bec
docs(refactor): fix example reference
mikeshultz Nov 27, 2023
463f8f6
docs: add 0.2.0 notes on event handler decorator behavior changes
mikeshultz Nov 28, 2023
f7e44c5
docs: update for multi-process configuration
mikeshultz Nov 28, 2023
4f0b11b
docs: arg clarity
mikeshultz Nov 28, 2023
5bf000c
refactor: SilverbackIdent -> SilverbackID
mikeshultz Nov 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,18 @@ python3 setup.py install

## Quick Usage

Checkout [the example](./example.py) to see how to use the library.
Checkout [the example](./examples/memory/main.py) to see how to use the library.

To run your bot against a live network, this SDK includes a simple runner you can use via:

```sh
$ silverback run "example:app" --network :mainnet:alchemy
$ silverback run "examples.memory:app" --network :mainnet:alchemy
```

## Docker Usage

```sh
$ docker run --volume $PWD:/home/harambe/project --volume ~/.tokenlists:/home/harambe/.tokenlists apeworx/silverback:latest run "example:app" --network :mainnet:alchemy
$ docker run --volume $PWD:/home/harambe/project --volume ~/.tokenlists:/home/harambe/.tokenlists apeworx/silverback:latest run "examples.memory:app" --network :mainnet:alchemy
```

## Development
Expand Down
76 changes: 71 additions & 5 deletions docs/userguides/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,57 @@ Any errors you raise during this function will get captured by the client, and r

## Startup and Shutdown

If you have heavier resources you want to load during startup, or otherwise perform some data collection prior to starting the bot, you can add a startup function like so:
### Worker Events

If you have heavier resources you want to load during startup, or want to initialize things like database connections, you can add a worker startup function like so:

```py
@app.on_startup()
@app.on_worker_startup()
def handle_on_worker_startup(state):
# Connect to DB, set initial state, etc
...

@app.on_worker_shutdown()
def handle_on_worker_shutdown(state):
# cleanup resources, close connections cleanly, etc
...
```

This function comes a parameter `state` that you can use for storing the results of your startup computation or resources that you have provisioned.
It's import to note that this is useful for ensuring that your workers (of which there can be multiple) have the resources necessary to properly handle any updates you want to make in your handler functions, such as connecting to the Telegram API, an SQL or NoSQL database connection, or something else.
The `state` variable is also useful as this gets made available to each handler method so other stateful quantities can be maintained for other uses.

TODO: Add more information about `state`
It's import to note that this is useful for ensuring that your workers (of which there can be multiple) have the resources necessary to properly handle any updates you want to make in your handler functions, such as connecting to the Telegram API, an SQL or NoSQL database connection, or something else. **This function will run on every worker process**.

#### Worker State

The `state` variable is also useful as this can be made available to each handler method so other stateful quantities can be maintained for other uses. Each distributed worker has its own instance of state.

To access the state from a handler, you must annotate `context` as a dependency like so:

```py
from typing import Annotated
from taskiq import Context, TaskiqDepends

@app.on_(chain.blocks)
def block_handler(block, context: Annotated[Context, TaskiqDepends()]):
# Access state via context.state
...
```

### Application Events

You can also add an application startup and shutdown handler that will be **executed once upon every application startup**. This may be useful for things like processing historical events since the application was shutdown or other one-time actions to perform at startup.

```py
@app.on_startup()
def handle_on_startup(state):
# Process missed events, etc
...

@app.on_shutdown()
def handle_on_startup(state):
# Record final state, etc
...
```

## Running your Application

Expand Down Expand Up @@ -101,6 +139,34 @@ If you configure your application to use a signer, and that signer signs anythin
Always test your applications throughly before deploying.
```

### Distributed Execution

Using only the `silverback run ...` command in a defualt configuration executes everything in one process and the job queue is completely in-memory with a shared state. In some high volume environments, you may want to deploy your Silverback application in a distributed configuration.
mikeshultz marked this conversation as resolved.
Show resolved Hide resolved

The primary components are the client and workers. The client handles Silverback events (blocks and contract event logs) and creates jobs for the workers to process in an asynchronous manner.

For this to work, you must configure a [TaskIQ broker](https://taskiq-python.github.io/guide/architecture-overview.html#broker) capable of distributed processing. For instance, with [`taskiq_redis`](https://github.com/taskiq-python/taskiq-redis) you could do something like this for the client:
mikeshultz marked this conversation as resolved.
Show resolved Hide resolved

```bash
export SILVERBACK_BROKER_CLASS="taskiq_redis:ListQueueBroker"
export SILVERBACK_BROKER_URI="redis://127.0.0.1:6379"

silverback run "examples.redis.main:app" \
--network :mainnet:alchemy \
--runner "silverback.runner:WebsocketRunner"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question(non-blocking): should we standardize runner to client?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hard question, tbh. Would it be right to say "choose your client" to the user who wants to say use the WebsocketRunner?

```

And then the worker process with 2 worker subprocesses:

```bash
export SILVERBACK_BROKER_CLASS="taskiq_redis:ListQueueBroker"
export SILVERBACK_BROKER_URI="redis://127.0.0.1:6379"

silverback worker -w2 "examples.redis.main:app"
```

This will run one client and 2 workers and all queue data will be go through Redis.

## Testing your Application

TODO: Add backtesting mode w/ `silverback test`
Expand Down
48 changes: 0 additions & 48 deletions example.py

This file was deleted.

3 changes: 3 additions & 0 deletions examples/memory/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .main import app

__all__ = ["app"]
74 changes: 74 additions & 0 deletions examples/memory/main.py
mikeshultz marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from typing import Annotated

from ape import chain
from ape.api import BlockAPI
from ape.types import ContractLog
from ape_tokens import tokens # type: ignore[import]
from taskiq import Context, TaskiqDepends, TaskiqState

from silverback import CircuitBreaker, SilverbackApp, SilverbackStartupState

# Do this to initialize your app
app = SilverbackApp()

# NOTE: Don't do any networking until after initializing app
USDC = tokens["USDC"]
YFI = tokens["YFI"]


@app.on_startup()
def app_startup(startup_state: SilverbackStartupState):
return {"message": "Starting...", "block_number": startup_state.last_block_seen}


@app.on_client_startup()
def client_startup(state):
return {"message": "Client started."}


# Can handle some initialization on startup, like models or network connections
@app.on_worker_startup()
def worker_startup(state: TaskiqState):
state.block_count = 0
# state.db = MyDB()
return {"message": "Worker started."}


# This is how we trigger off of new blocks
@app.on_(chain.blocks)
# context must be a type annotated kwarg to be provided to the task
def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]):
context.state.block_count += 1
return len(block.transactions)


# This is how we trigger off of events
# Set new_block_timeout to adjust the expected block time.
@app.on_(USDC.Transfer, start_block=18588777, new_block_timeout=25)
# NOTE: Typing isn't required
def exec_event1(log):
if log.log_index % 7 == 3:
# If you ever want the app to shutdown under some scenario, call this exception
raise CircuitBreaker("Oopsie!")
return {"amount": log.amount}


@app.on_(YFI.Approval)
# Any handler function can be async too
async def exec_event2(log: ContractLog):
return log.amount


# Just in case you need to release some resources or something
@app.on_worker_shutdown()
def worker_shutdown(state):
return {
"message": f"Worker stopped after handling {state.block_count} blocks.",
"block_count": state.block_count,
}


# A final job to execute on Silverback shutdown
@app.on_shutdown()
def app_shutdown(state):
return {"message": "Stopping..."}
3 changes: 3 additions & 0 deletions examples/redis/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .main import app

__all__ = ["app"]
75 changes: 75 additions & 0 deletions examples/redis/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from typing import Annotated

from ape import chain
from ape.api import BlockAPI
from ape.types import ContractLog
from ape_tokens import tokens # type: ignore[import]
from taskiq import Context, TaskiqDepends, TaskiqState

from silverback import CircuitBreaker, SilverbackApp, SilverbackStartupState

# Do this to initialize your app
app = SilverbackApp()

# NOTE: Don't do any networking until after initializing app
USDC = tokens["USDC"]
YFI = tokens["YFI"]


# Can handle some stuff on startup, like loading a heavy model or something
@app.on_startup()
def app_startup(startup_state: SilverbackStartupState):
return {"message": "Starting...", "block_number": startup_state.last_block_seen}


@app.on_client_startup()
def client_startup(state):
return {"message": "Client started."}


@app.on_worker_startup()
def worker_startup(state: TaskiqState):
state.block_count = 0
# state.db = MyDB()
mikeshultz marked this conversation as resolved.
Show resolved Hide resolved
return {"message": "Worker started."}


# This is how we trigger off of new blocks
@app.on_(chain.blocks)
# context must be a type annotated kwarg to be provided to the task
def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]):
context.state.block_count += 1
return len(block.transactions)


# This is how we trigger off of events
# Set new_block_timeout to adjust the expected block time.
@app.on_(USDC.Transfer, start_block=18588777, new_block_timeout=25)
# NOTE: Typing isn't required
def exec_event1(log):
if log.log_index % 7 == 3:
# If you ever want the app to shutdown under some scenario, call this exception
raise CircuitBreaker("Oopsie!")
return {"amount": log.amount}


@app.on_(YFI.Approval)
# Any handler function can be async too
async def exec_event2(log: ContractLog):
return log.amount


# Just in case you need to release some resources or something
@app.on_worker_shutdown()
def worker_shutdown(state):
block_count = state.block_count if hasattr(state, "block_count") else 0
return {
"message": f"Worker stopped after handling {block_count} blocks.",
"block_count": block_count,
}


# A final job to execute on Silverback shutdown
@app.on_shutdown()
def app_shutdown(state):
return {"message": "Stopping..."}
2 changes: 2 additions & 0 deletions silverback/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from .application import SilverbackApp
from .exceptions import CircuitBreaker, SilverbackException
from .types import SilverbackStartupState

__all__ = [
"CircuitBreaker",
"SilverbackApp",
"SilverbackException",
"SilverbackStartupState",
]
40 changes: 39 additions & 1 deletion silverback/_cli.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor

import click
from ape.cli import AccountAliasPromptChoice, ape_cli_context, network_option, verbosity_option
from taskiq import AsyncBroker
from taskiq.cli.worker.run import shutdown_broker
from taskiq.receiver import Receiver

from silverback._importer import import_from_string
from silverback.runner import PollingRunner
Expand Down Expand Up @@ -40,7 +44,27 @@ def _network_callback(ctx, param, val):
return val


@cli.command()
async def run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90):
try:
tasks = []
with ThreadPoolExecutor(max_workers=worker_count) as pool:
for _ in range(worker_count):
receiver = Receiver(
broker=broker,
executor=pool,
validate_params=True,
max_async_tasks=1,
max_prefetch=0,
)
broker.is_worker_process = True
tasks.append(receiver.listen())

await asyncio.gather(*tasks)
finally:
await shutdown_broker(broker, shutdown_timeout)


@cli.command(help="Run Silverback application client")
@ape_cli_context()
@verbosity_option()
@network_option(default=None, callback=_network_callback)
Expand All @@ -57,3 +81,17 @@ def run(cli_ctx, network, account, runner, max_exceptions, path):
app = import_from_string(path)
runner = runner(app, max_exceptions=max_exceptions)
asyncio.run(runner.run())


@cli.command(help="Run Silverback application task workers")
@ape_cli_context()
@verbosity_option()
@network_option(default=None, callback=_network_callback)
@click.option("--account", type=AccountAliasPromptChoice(), callback=_account_callback)
@click.option("-w", "--workers", type=int, default=2)
@click.option("-x", "--max-exceptions", type=int, default=3)
@click.option("-s", "--shutdown_timeout", type=int, default=90)
@click.argument("path")
def worker(cli_ctx, network, account, workers, max_exceptions, shutdown_timeout, path):
app = import_from_string(path)
asyncio.run(run_worker(app.broker, worker_count=workers, shutdown_timeout=shutdown_timeout))
Loading