Skip to content

Commit

Permalink
feat: declare optional schema on streamsync state
Browse files Browse the repository at this point in the history
  • Loading branch information
FabienArcellier committed Mar 4, 2024
1 parent 26eb89a commit c95a645
Show file tree
Hide file tree
Showing 7 changed files with 578 additions and 65 deletions.
1 change: 1 addition & 0 deletions docs/docs/.vitepress/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export default {
{ text: "Page routes", link: "/page-routes" },
{ text: "Sessions", link: "/sessions" },
{ text: "Custom server", link: "/custom-server" },
{ text: "State schema", link: "/state-schema" },
],
},
{
Expand Down
123 changes: 123 additions & 0 deletions docs/docs/state-schema.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# State schema

Schema declaration on the [Application state](./application-state) allows Streamsync to handle complex serialization
scenario and empower your IDE and toolchains to provide autocomplete and type checking.

## Schema declaration

```python
import streamsync as ss

class AppSchema(ss.StreamsyncState):
counter: int

initial_state = ss.init_state({
"counter": 0
}, schema=AppSchema)

# Event handler
# It receives the session state as an argument and mutates it
def increment(state: AppSchema):
state.counter += 1
```

Access to an attribute by its key is always possible.

```python
def increment(state: AppSchema):
state['counter'] += 1
```

Attributes missing from the schema remain accessible by their key.

```python
initial_state = ss.init_state({
"counter": 0,
"message": None
}, schema=AppSchema)

def increment(state: AppSchema):
state['message'] = "Hello pigeon"
```

## Schema composition

Schema composition allows you to model a complex Application state.

```python
class MyappSchema(ss.State):
title: str

class AppSchema(ss.StreamsyncState):
my_app: MyappSchema
counter: int

initial_state = ss.init_state({
"counter": 0,
"my_app": {
"title": "Nested value"
}
}, schema=AppSchema)
```

## Multi-level dictionary

Some components like Vega require specifying a graph in the form of a multi-level dictionary.

A schema allows you to specify to streamsync that an attribute which contains a dictionary
must be treated as a dictionary and not as a group of state.

```python
class AppSchema(ss.StreamsyncState):
vegas_graph: dict

# Without schema, this handler is execute only once
def handle_vega_graph(state: AppSchema):
graph = state.vega_graph
graph['data']['values'][0]['b'] += 1000
state.vega_graph = graph

initial_state = ss.init_state({
"vegas_graph": {
"data": {
"values": [
{"a": "C", "b": 2}, {"a": "C", "b": 7}, {"a": "C", "b": 4},
{"a": "D", "b": 1}, {"a": "D", "b": 2}, {"a": "D", "b": 6},
{"a": "E", "b": 8}, {"a": "E", "b": 4}, {"a": "E", "b": 7}
]
},
"mark": "bar",
"encoding": {
"x": {"field": "a", "type": "nominal"},
"y": {"aggregate": "average", "field": "b", "type": "quantitative"}
}
},
}, schema=AppSchema)
```

## Type checking

A schema allows you to check the integrity of your backend using the type system.
The code below will raise an error with mypy.

```bash
$ mypy apps/myapp/main.py
apps/myapp/main.py:7: error: "AppSchema" has no attribute "countr"; maybe "counter"? [attr-defined]
```

Here is the code, can you spot the error ?

```python
import streamsync as ss

class AppSchema(ss.StreamsyncState):
counter: int

def increment(state: AppSchema):
state.countr += 1

initial_state = ss.init_state({
"counter": 26,
}, schema=AppSchema)
```

25 changes: 18 additions & 7 deletions src/streamsync/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import importlib.metadata
from typing import Union, Optional, Dict, Any
from streamsync.core import Readable, FileWrapper, BytesWrapper, Config
from streamsync.core import initial_state, base_component_tree, session_manager, session_verifier
from typing import Union, Optional, Dict, Any, Type, TypeVar, cast
from streamsync.core import Readable, FileWrapper, BytesWrapper, Config, StreamsyncState
from streamsync.core import new_initial_state, base_component_tree, session_manager, session_verifier

VERSION = importlib.metadata.version("streamsync")

Expand All @@ -28,13 +28,24 @@ def pack_bytes(raw_data, mime_type: Optional[str] = None):

return BytesWrapper(raw_data, mime_type)

S = TypeVar('S', bound=StreamsyncState)

def init_state(state_dict: Dict[str, Any]):

def init_state(state_dict: Dict[str, Any], schema: Optional[Type[S]] = None) -> Union[S, StreamsyncState]:
"""
Sets the initial state, which will be used as the starting point for
every session.
>>> import streamsync as ss
>>> initial_state = ss.init_state({
>>> "counter": 0,
>>> }, schema=AppSchema)
"""
concrete_schema = cast(Type[S], StreamsyncState if schema is None else schema)
if not issubclass(concrete_schema, StreamsyncState):
raise ValueError("Root schema must inherit from StreamsyncState")

initial_state.user_state.state = {}
initial_state.user_state.ingest(state_dict)
return initial_state
_initial_state: S = new_initial_state(concrete_schema)
_initial_state.ingest(state_dict)
return _initial_state
6 changes: 3 additions & 3 deletions src/streamsync/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def _execute_user_code(self) -> None:
captured_stdout = f.getvalue()

if captured_stdout:
streamsync.initial_state.add_log_entry(
streamsync.core.initial_state.add_log_entry(
"info", "Stdout message during initialisation", captured_stdout)

def _apply_configuration(self) -> None:
Expand Down Expand Up @@ -329,7 +329,7 @@ def _main(self) -> None:
except BaseException:
# Initialisation errors will be sent to all sessions via mail during session initialisation

streamsync.initial_state.add_log_entry(
streamsync.core.initial_state.add_log_entry(
"error", "Code Error", "Couldn't execute code. An exception was raised.", tb.format_exc())

# Exit if in run mode
Expand All @@ -340,7 +340,7 @@ def _main(self) -> None:
try:
streamsync.base_component_tree.ingest(self.bmc_components)
except BaseException:
streamsync.initial_state.add_log_entry(
streamsync.core.initial_state.add_log_entry(
"error", "UI Components Error", "Couldn't load components. An exception was raised.", tb.format_exc())
if self.mode == "run":
terminate_early = True
Expand Down
Loading

0 comments on commit c95a645

Please sign in to comment.