Skip to content

Commit

Permalink
Merge pull request #278 from FabienArcellier/277-declare-optional-sch…
Browse files Browse the repository at this point in the history
…ema-on-streamsync-state-1

feat:declare optional schema on streamsync state
  • Loading branch information
ramedina86 authored Mar 18, 2024
2 parents 602300c + 677be06 commit f95fb7e
Show file tree
Hide file tree
Showing 7 changed files with 786 additions and 79 deletions.
1 change: 1 addition & 0 deletions docs/docs/.vitepress/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export default {
{ text: "Page routes", link: "/page-routes" },
{ text: "Sessions", link: "/sessions" },
{ text: "Custom server", link: "/custom-server" },
{ text: "State schema", link: "/state-schema" },
],
},
{
Expand Down
123 changes: 123 additions & 0 deletions docs/docs/state-schema.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# State schema

Schema declaration on the [Application state](./application-state) allows Streamsync to handle complex serialization
scenario and empower your IDE and toolchains to provide autocomplete and type checking.

## Schema declaration

```python
import streamsync as ss

class AppSchema(ss.StreamsyncState):
counter: int

initial_state = ss.init_state({
"counter": 0
}, schema=AppSchema)

# Event handler
# It receives the session state as an argument and mutates it
def increment(state: AppSchema):
state.counter += 1
```

Access to an attribute by its key is always possible.

```python
def increment(state: AppSchema):
state['counter'] += 1
```

Attributes missing from the schema remain accessible by their key.

```python
initial_state = ss.init_state({
"counter": 0,
"message": None
}, schema=AppSchema)

def increment(state: AppSchema):
state['message'] = "Hello pigeon"
```

## Schema composition

Schema composition allows you to model a complex Application state.

```python
class MyappSchema(ss.State):
title: str

class AppSchema(ss.StreamsyncState):
my_app: MyappSchema
counter: int

initial_state = ss.init_state({
"counter": 0,
"my_app": {
"title": "Nested value"
}
}, schema=AppSchema)
```

## Multi-level dictionary

Some components like Vega require specifying a graph in the form of a multi-level dictionary.

A schema allows you to specify to streamsync that an attribute which contains a dictionary
must be treated as a dictionary and not as a group of state.

```python
class AppSchema(ss.StreamsyncState):
vegas_graph: dict

# Without schema, this handler is execute only once
def handle_vega_graph(state: AppSchema):
graph = state.vega_graph
graph['data']['values'][0]['b'] += 1000
state.vega_graph = graph

initial_state = ss.init_state({
"vegas_graph": {
"data": {
"values": [
{"a": "C", "b": 2}, {"a": "C", "b": 7}, {"a": "C", "b": 4},
{"a": "D", "b": 1}, {"a": "D", "b": 2}, {"a": "D", "b": 6},
{"a": "E", "b": 8}, {"a": "E", "b": 4}, {"a": "E", "b": 7}
]
},
"mark": "bar",
"encoding": {
"x": {"field": "a", "type": "nominal"},
"y": {"aggregate": "average", "field": "b", "type": "quantitative"}
}
},
}, schema=AppSchema)
```

## Type checking

A schema allows you to check the integrity of your backend using the type system.
The code below will raise an error with mypy.

```bash
$ mypy apps/myapp/main.py
apps/myapp/main.py:7: error: "AppSchema" has no attribute "countr"; maybe "counter"? [attr-defined]
```

Here is the code, can you spot the error ?

```python
import streamsync as ss

class AppSchema(ss.StreamsyncState):
counter: int

def increment(state: AppSchema):
state.countr += 1

initial_state = ss.init_state({
"counter": 26,
}, schema=AppSchema)
```

43 changes: 31 additions & 12 deletions src/streamsync/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import importlib.metadata
from typing import Any, Dict, Optional, Union
from typing import Union, Optional, Dict, Any, Type, TypeVar, cast

from streamsync.core import (BytesWrapper, Config, FileWrapper, Readable,
base_component_tree, initial_state,
session_manager, session_verifier)
from streamsync.core import Readable, FileWrapper, BytesWrapper, Config, StreamsyncState
from streamsync.core import new_initial_state, base_component_tree, session_manager, session_verifier
from streamsync.ui import StreamsyncUIManager

VERSION = importlib.metadata.version("streamsync")
Expand Down Expand Up @@ -31,17 +33,7 @@ def pack_bytes(raw_data, mime_type: Optional[str] = None):

return BytesWrapper(raw_data, mime_type)


def init_state(state_dict: Dict[str, Any]):
"""
Sets the initial state, which will be used as the starting point for
every session.
"""

initial_state.user_state.state = {}
initial_state.user_state.ingest(state_dict)
return initial_state

S = TypeVar('S', bound=StreamsyncState)

def init_ui() -> StreamsyncUIManager:
"""Initializes and returns an instance of StreamsyncUIManager.
Expand All @@ -58,8 +50,35 @@ def init_ui() -> StreamsyncUIManager:
**Example**::
>>> import streamsync as ss
>>>
>>> with ss.init_ui() as ui:
>>> with ui.Page({"key": "hello"}):
>>> ui.Text({"text": "Hello pigeons"})
"""
return StreamsyncUIManager()


def init_state(raw_state: Dict[str, Any], schema: Optional[Type[S]] = None) -> Union[S, StreamsyncState]:
"""
Sets the initial state, which will be used as the starting point for
every session.
initial_state.user_state.state = {}
initial_state.user_state.ingest(state_dict)
return initial_state
>>> import streamsync as ss
>>> initial_state = ss.init_state({
>>> "counter": 0,
>>> }, schema=AppSchema)
"""
concrete_schema = cast(Type[S], StreamsyncState if schema is None else schema)
if not issubclass(concrete_schema, StreamsyncState):
raise ValueError("Root schema must inherit from StreamsyncState")

_initial_state: S = new_initial_state(concrete_schema, raw_state)
return _initial_state
14 changes: 11 additions & 3 deletions src/streamsync/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ def _execute_user_code(self) -> None:
captured_stdout = f.getvalue()

if captured_stdout:
streamsync.initial_state.add_log_entry(
streamsync.core.initial_state.add_log_entry(
"info", "Stdout message during initialisation", captured_stdout)

def _apply_configuration(self) -> None:
Expand Down Expand Up @@ -326,7 +326,7 @@ def _main(self) -> None:
try:
streamsync.base_component_tree.ingest(self.bmc_components)
except BaseException:
streamsync.initial_state.add_log_entry(
streamsync.core.initial_state.add_log_entry(
"error", "UI Components Error", "Couldn't load components. An exception was raised.", tb.format_exc())
if self.mode == "run":
terminate_early = True
Expand All @@ -336,14 +336,22 @@ def _main(self) -> None:
except BaseException:
# Initialisation errors will be sent to all sessions via mail during session initialisation

streamsync.initial_state.add_log_entry(
streamsync.core.initial_state.add_log_entry(
"error", "Code Error", "Couldn't execute code. An exception was raised.", tb.format_exc())

# Exit if in run mode

if self.mode == "run":
terminate_early = True

try:
streamsync.base_component_tree.ingest(self.bmc_components)
except BaseException:
streamsync.core.initial_state.add_log_entry(
"error", "UI Components Error", "Couldn't load components. An exception was raised.", tb.format_exc())
if self.mode == "run":
terminate_early = True

if terminate_early:
self._terminate_early()
return
Expand Down
Loading

0 comments on commit f95fb7e

Please sign in to comment.