From b4b5c374886284b8a9d71778a134b6c08184c6af Mon Sep 17 00:00:00 2001 From: Chris O'Hara Date: Mon, 11 Mar 2024 09:25:42 +1000 Subject: [PATCH 1/4] Experiment with using dill for serialization --- pyproject.toml | 1 + src/dispatch/scheduler.py | 22 ++++++++++++++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a9d36b6f..4f3b2f1e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "grpc-stubs >= 1.53.0.5", "http-message-signatures >= 0.4.4", "tblib >= 3.0.0", + "dill >= 0.3.8" ] [project.optional-dependencies] diff --git a/src/dispatch/scheduler.py b/src/dispatch/scheduler.py index 320b2193..db10966f 100644 --- a/src/dispatch/scheduler.py +++ b/src/dispatch/scheduler.py @@ -1,9 +1,13 @@ import logging +import os import pickle import sys from dataclasses import dataclass from typing import Any, Callable, Protocol, TypeAlias +import dill # type: ignore +import dill.detect # type: ignore + from dispatch.coroutine import Gather from dispatch.error import IncompatibleStateError from dispatch.experimental.durable.function import DurableCoroutine, DurableGenerator @@ -245,7 +249,7 @@ def _rebuild_state(self, input: Input): "resuming scheduler with %d bytes of state", len(input.coroutine_state) ) try: - state = pickle.loads(input.coroutine_state) + state = deserialize(input.coroutine_state) if not isinstance(state, State): raise ValueError("invalid state") if state.version != self.version: @@ -421,7 +425,7 @@ def _run(self, input: Input) -> Output: # Serialize coroutines and scheduler state. logger.debug("serializing state") try: - serialized_state = pickle.dumps(state) + serialized_state = serialize(state) except pickle.PickleError as e: logger.exception("state could not be serialized") return Output.error(Error.from_exception(e, status=Status.PERMANENT_ERROR)) @@ -446,6 +450,20 @@ def _run(self, input: Input) -> Output: ) +TRACE = os.getenv("DISPATCH_TRACE") + + +def serialize(obj: Any) -> bytes: + if TRACE: + with dill.detect.trace(): + return dill.dumps(obj, byref=True) + return dill.dumps(obj, byref=True) + + +def deserialize(state: bytes) -> Any: + return dill.loads(state) + + def correlation_id(coroutine_id: CoroutineID, call_id: CallID) -> CorrelationID: return coroutine_id << 32 | call_id From b4a4ff2d9071ba9dfba9ee855fcb4aa492f1fae8 Mon Sep 17 00:00:00 2001 From: Chris O'Hara Date: Mon, 11 Mar 2024 09:40:13 +1000 Subject: [PATCH 2/4] Update README --- README.md | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index c175146c..22872852 100644 --- a/README.md +++ b/README.md @@ -203,8 +203,10 @@ async def transform(msg): ### Serialization -Dispatch uses the [pickle] library to serialize coroutines. +Dispatch uses the [dill] library to serialize coroutines, which is +an extension of [pickle] from the standard library. +[dill]: https://dill.readthedocs.io/en/latest/ [pickle]: https://docs.python.org/3/library/pickle.html Serialization of coroutines is enabled by a CPython extension. @@ -213,10 +215,10 @@ The user must ensure that the contents of their stack frames are serializable. That is, users should avoid using variables inside coroutines that cannot be pickled. -If a pickle error is encountered, serialization tracing can be enabled -with the `DISPATCH_TRACE=1` environment variable to debug the issue. The -stacks of coroutines and generators will be printed to stdout before -the pickle library attempts serialization. +If a serialization error is encountered, tracing can be enabled with the +`DISPATCH_TRACE=1` environment variable. The object graph, and the stacks +of coroutines and generators, will be printed to stdout before any +serialization is attempted. For help with a serialization issues, please submit a [GitHub issue][issues]. From 3088949fb48204147c4d491be359cd25682d8a15 Mon Sep 17 00:00:00 2001 From: Chris O'Hara Date: Mon, 11 Mar 2024 09:49:05 +1000 Subject: [PATCH 3/4] Provide a way to make breaking changes to scheduler state --- src/dispatch/scheduler.py | 43 +++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/src/dispatch/scheduler.py b/src/dispatch/scheduler.py index db10966f..665532e3 100644 --- a/src/dispatch/scheduler.py +++ b/src/dispatch/scheduler.py @@ -148,9 +148,12 @@ def __repr__(self): class State: """State of the scheduler and the coroutines it's managing.""" - version: str + interpreter_version: str + scheduler_version: int + suspended: dict[CoroutineID, Coroutine] ready: list[Coroutine] + next_coroutine_id: int next_call_id: int @@ -159,6 +162,11 @@ class State: outstanding_calls: int +# Version of the scheduler and its state. Increment this when a breaking +# change is introduced. +SCHEDULER_VERSION = 1 + + class OneShotScheduler: """Scheduler for local coroutines. @@ -169,7 +177,7 @@ class OneShotScheduler: __slots__ = ( "entry_point", - "version", + "interpreter_version", "poll_min_results", "poll_max_results", "poll_max_wait_seconds", @@ -178,7 +186,7 @@ class OneShotScheduler: def __init__( self, entry_point: Callable, - version: str = sys.version, + interpreter_version: str = sys.version, poll_min_results: int = 1, poll_max_results: int = 10, poll_max_wait_seconds: int | None = None, @@ -188,9 +196,9 @@ def __init__( Args: entry_point: Entry point for the main coroutine. - version: Version string to attach to scheduler/coroutine state. - If the scheduler sees a version mismatch, it will respond to - Dispatch with an INCOMPATIBLE_STATE status code. + interpreter_version: Version string to attach to scheduler / + coroutine state. If the scheduler sees a version mismatch it will + respond to Dispatch with an INCOMPATIBLE_STATE status code. poll_min_results: Minimum number of call results to wait for before coroutine execution should continue. Dispatch waits until this @@ -204,14 +212,15 @@ def __init__( while waiting for call results. Optional. """ self.entry_point = entry_point - self.version = version + self.interpreter_version = interpreter_version self.poll_min_results = poll_min_results self.poll_max_results = poll_max_results self.poll_max_wait_seconds = poll_max_wait_seconds logger.debug( - "booting coroutine scheduler with entry point '%s' version '%s'", + "booting coroutine scheduler with entry point '%s', interpreter version '%s', scheduler version %d", entry_point.__qualname__, - version, + self.interpreter_version, + SCHEDULER_VERSION, ) def run(self, input: Input) -> Output: @@ -235,7 +244,8 @@ def _init_state(self, input: Input) -> State: raise ValueError("entry point is not a @dispatch.function") return State( - version=sys.version, + interpreter_version=sys.version, + scheduler_version=SCHEDULER_VERSION, suspended={}, ready=[Coroutine(id=0, parent_id=None, coroutine=main)], next_coroutine_id=1, @@ -252,12 +262,19 @@ def _rebuild_state(self, input: Input): state = deserialize(input.coroutine_state) if not isinstance(state, State): raise ValueError("invalid state") - if state.version != self.version: + + if state.interpreter_version != self.interpreter_version: raise ValueError( - f"version mismatch: '{state.version}' vs. current '{self.version}'" + f"interpreter version mismatch: '{state.interpreter_version}' vs. current '{self.interpreter_version}'" ) + if state.scheduler_version != SCHEDULER_VERSION: + raise ValueError( + f"scheduler version mismatch: {state.scheduler_version} vs. current {SCHEDULER_VERSION}" + ) + return state - except (pickle.PickleError, ValueError) as e: + + except (pickle.PickleError, AttributeError, ValueError) as e: logger.warning("state is incompatible", exc_info=True) raise IncompatibleStateError from e From d2656243e7dd7b7ddd4337723e40f62445d3803c Mon Sep 17 00:00:00 2001 From: Chris O'Hara Date: Mon, 11 Mar 2024 09:58:12 +1000 Subject: [PATCH 4/4] Clarify how serialization works --- README.md | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 22872852..26a36b66 100644 --- a/README.md +++ b/README.md @@ -203,22 +203,24 @@ async def transform(msg): ### Serialization -Dispatch uses the [dill] library to serialize coroutines, which is -an extension of [pickle] from the standard library. +Serialization of coroutines is enabled by a CPython extension that +exposes internal details about stack frames. + +Dispatch then uses the [dill] library to serialize these stack frames. +Note that `dill` is an extension of [pickle] from the standard library. [dill]: https://dill.readthedocs.io/en/latest/ [pickle]: https://docs.python.org/3/library/pickle.html -Serialization of coroutines is enabled by a CPython extension. - The user must ensure that the contents of their stack frames are -serializable. That is, users should avoid using variables inside -coroutines that cannot be pickled. +serializable. That is, users should either avoid using variables inside +coroutines that cannot be pickled, or should wrap them in a container +that is serializable. If a serialization error is encountered, tracing can be enabled with the `DISPATCH_TRACE=1` environment variable. The object graph, and the stacks -of coroutines and generators, will be printed to stdout before any -serialization is attempted. +of coroutines and generators, will be printed to stdout before serialization +is attempted. This allows users to pinpoint where serialization issues occur. For help with a serialization issues, please submit a [GitHub issue][issues].