Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds persistence layer for app state and job results [SBK-363] #45

Merged
merged 50 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
c46ec08
feat: adds persistence layer for app state and job results
mikeshultz Nov 18, 2023
b956d48
style(lint): fix imports
mikeshultz Nov 18, 2023
7ba2f71
fix: fix types
mikeshultz Nov 18, 2023
85a5579
style(lint): disagreement flake8 didn't like black output
mikeshultz Nov 18, 2023
ec43642
refactor: adds silverback_startup task, and on_client_* and on_worker…
mikeshultz Nov 18, 2023
8d2427c
fix: add missing silverback_shutdown task exec in runner
mikeshultz Nov 18, 2023
2564382
style(lint): lint rolling
mikeshultz Nov 18, 2023
e4caa59
refactor: remove unused SilverbackApp.checkpoint() method
mikeshultz Nov 18, 2023
8be2d10
refactor: remove mongo, add sqlite
mikeshultz Nov 20, 2023
82cb68e
fix: type | None is new in 3.10
mikeshultz Nov 20, 2023
c420f49
fix: forgot to save...
mikeshultz Nov 20, 2023
7e9e67a
style: minor cleanup and docstrings
mikeshultz Nov 21, 2023
134748d
style(lint): where that pre-commit hook when you need it
mikeshultz Nov 21, 2023
f58911a
fix: call to self.network.__enter__ should be before we try and touch…
mikeshultz Nov 21, 2023
0506ad5
feat(cli): adds simple `worker` command
mikeshultz Nov 21, 2023
2d67e49
docs: move example, create redis example, update README
mikeshultz Nov 21, 2023
98a7adb
fix(style): remove unused import
mikeshultz Nov 21, 2023
19a458b
style(lint): chill, mypy
mikeshultz Nov 21, 2023
4bb23c8
fix: update examples to fix taskiq issue
mikeshultz Nov 21, 2023
4d50d4c
style(lint): b0rk b0rk b0rk b0rk
mikeshultz Nov 21, 2023
2d219b1
fix: worker command now uses broker app uses
mikeshultz Nov 22, 2023
9273cb0
refactor: simplify app strings for examples
mikeshultz Nov 22, 2023
f7059c4
fix: drop pickle for security, use generic types for HandlerResult re…
mikeshultz Nov 22, 2023
7dc5bd0
refactor: HandlerResult should subclass TaskiqResult
mikeshultz Nov 22, 2023
c300fb6
docs: update docs for worker events, new on_startup behavior, nad ela…
mikeshultz Nov 22, 2023
e96e4f2
refactor: use position arg for reasons
mikeshultz Nov 24, 2023
29fa6ce
fix: surface errors in persistence when saving results
mikeshultz Nov 24, 2023
a544e33
refactor: move results persistence calls to middleware
mikeshultz Nov 24, 2023
00b2bf9
Merge branch 'main' into feat/persistence
mikeshultz Nov 24, 2023
e8dcda3
Update silverback/_cli.py
mikeshultz Nov 27, 2023
ee76273
refactor: use lazy init and not require runner or whatever to have to…
mikeshultz Nov 27, 2023
9779c4b
Merge branch 'feat/persistence' of github.com:ApeWorX/silverback into…
mikeshultz Nov 27, 2023
0f1f7f2
fix: run all requested workers
mikeshultz Nov 27, 2023
8dfb17c
fix: wut
mikeshultz Nov 27, 2023
a0d1c20
docs: add section covering distributed configuration
mikeshultz Nov 27, 2023
e32297d
refactor: remove PERSISTENCE_URI, use first-class SQLITE_PATH
mikeshultz Nov 27, 2023
a7b0719
fix: leftover PERSISTENCE_URI in settings type
mikeshultz Nov 27, 2023
3892e13
refactor: minor cleanup of unused settings arg in BasePersistentStorage
mikeshultz Nov 27, 2023
db7cce6
docs: help string update
mikeshultz Nov 27, 2023
60c4fd1
refactor: remove on_client_* silverback decorators
mikeshultz Nov 27, 2023
f35048c
refactor: storage->store
mikeshultz Nov 27, 2023
40a2041
refactor: instance_state -> state
mikeshultz Nov 27, 2023
0f630b0
fix: remove unnecessary Union types
mikeshultz Nov 27, 2023
4a6a65a
Merge branch 'feat/persistence' of github.com:ApeWorX/silverback into…
mikeshultz Nov 27, 2023
ccc96d8
refactor(docs): move back to unified example script
mikeshultz Nov 27, 2023
ee51bec
docs(refactor): fix example reference
mikeshultz Nov 27, 2023
463f8f6
docs: add 0.2.0 notes on event handler decorator behavior changes
mikeshultz Nov 28, 2023
f7e44c5
docs: update for multi-process configuration
mikeshultz Nov 28, 2023
4f0b11b
docs: arg clarity
mikeshultz Nov 28, 2023
5bf000c
refactor: SilverbackIdent -> SilverbackID
mikeshultz Nov 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
"IPython", # Console for interacting
"ipdb", # Debugger (Must use `export PYTHONBREAKPOINT=ipdb.set_trace`)
],
"mongo": [
"beanie~=1.23.6",
],
}

# NOTE: `pip install -e .[dev]` to install package
Expand All @@ -49,6 +52,7 @@
+ extras_require["doc"]
+ extras_require["release"]
+ extras_require["dev"]
+ extras_require["mongo"]
)

with open("./README.md") as readme:
Expand Down
2 changes: 2 additions & 0 deletions silverback/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from .application import SilverbackApp
from .exceptions import CircuitBreaker, SilverbackException
from .types import SilverbackStartupState

__all__ = [
"CircuitBreaker",
"SilverbackApp",
"SilverbackException",
"SilverbackStartupState",
]
72 changes: 69 additions & 3 deletions silverback/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,67 @@ def __init__(self, settings: Optional[Settings] = None):

def on_startup(self) -> Callable:
"""
Code to execute on worker startup / restart after an error.
Code to execute on one worker upon startup / restart after an error.

Usage example::

@app.on_startup()
def do_something_on_startup(state):
... # Can provision resources, or add things to `state`.
"""
return self.broker.on_event(TaskiqEvents.WORKER_STARTUP)
return self.broker.task(task_name="silverback_startup")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are the pros/cons of moving the startup/shutdown handlers into separate tasks from the worker startup/shutdown events

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They cannot be combined. They're very separate things.

Having on_startup be a worker event means unexpected duplication of execution. Having a separate task also allows us to feed the handler useful information, like last_block_*. This is basically what we need for ApePay.

Renaming the previous handlers to be explicit worker event handlers implies their actual usage and is clearer, IMO. They're two distinct things.

The only real con is we're changing up behavior here. Anyone setting up worker state may need to refactor.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They cannot be combined. They're very separate things.

Having on_startup be a worker event means unexpected duplication of execution. Having a separate task also allows us to feed the handler useful information, like last_block_*. This is basically what we need for ApePay.

Renaming the previous handlers to be explicit worker event handlers implies their actual usage and is clearer, IMO. They're two distinct things.

The only real con is we're changing up behavior here. Anyone setting up worker state may need to refactor.

but won't this mean that only one worker will handle this event on startup? vs. the worker startup event is executed on every worker as it's set up?

Copy link
Contributor Author

@mikeshultz mikeshultz Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. on_startup will only run once upon runner startup. on_worker_startup will run on every worker startup.


def on_shutdown(self) -> Callable:
"""
Code to execute on normal worker shutdown.
Code to execute on one worker at shutdown.

Usage example::

@app.on_shutdown()
def do_something_on_shutdown(state):
... # Update some external service, perhaps using information from `state`.
"""
return self.broker.task(task_name="silverback_shutdown")
mikeshultz marked this conversation as resolved.
Show resolved Hide resolved

def on_client_startup(self) -> Callable:
mikeshultz marked this conversation as resolved.
Show resolved Hide resolved
"""
Code to execute on client startup / restart after an error.

Usage example::

@app.on_startup()
def do_something_on_startup(state):
... # Can provision resources, or add things to `state`.
"""
return self.broker.on_event(TaskiqEvents.CLIENT_STARTUP)
mikeshultz marked this conversation as resolved.
Show resolved Hide resolved

def on_client_shutdown(self) -> Callable:
"""
Code to execute on client shutdown.

Usage example::

@app.on_shutdown()
def do_something_on_shutdown(state):
... # Update some external service, perhaps using information from `state`.
"""
return self.broker.on_event(TaskiqEvents.CLIENT_SHUTDOWN)

def on_worker_startup(self) -> Callable:
"""
Code to execute on every worker at startup / restart after an error.

Usage example::

@app.on_startup()
def do_something_on_startup(state):
... # Can provision resources, or add things to `state`.
"""
return self.broker.on_event(TaskiqEvents.WORKER_STARTUP)

def on_worker_shutdown(self) -> Callable:
"""
Code to execute on every worker at shutdown.

Usage example::

Expand All @@ -97,6 +145,24 @@ def do_something_on_shutdown(state):
"""
return self.broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)

def get_startup_handler(self) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `silverback_startup` events.

Returns:
Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created.
"""
return self.broker.available_tasks.get("silverback_startup")

def get_shutdown_handler(self) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `silverback_shutdown` events.

Returns:
Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created.
"""
return self.broker.available_tasks.get("silverback_shutdown")

def get_block_handler(self) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `block` events.
Expand Down
16 changes: 4 additions & 12 deletions silverback/middlewares.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from ape.logging import logger
from ape.types import ContractLog, HexBytes
from ape.types import ContractLog
from ape.utils import ManagerAccessMixin
from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult

from silverback.utils import hexbytes_dict


class SilverbackMiddleware(TaskiqMiddleware, ManagerAccessMixin):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -47,20 +49,10 @@ def _create_label(self, message: TaskiqMessage) -> str:
return f"{message.task_name}{args}"

def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
def fix_dict(data: dict) -> dict:
fixed_data = {}
for name, value in data.items():
if isinstance(value, str) and value.startswith("0x"):
fixed_data[name] = HexBytes(value)
else:
fixed_data[name] = value

return fixed_data

if message.task_name == "block":
# NOTE: Necessary because we don't know the exact block class
message.args[0] = self.provider.network.ecosystem.decode_block(
fix_dict(message.args[0])
hexbytes_dict(message.args[0])
)

elif "event" in message.task_name:
Expand Down
Loading