From 732784a226091636b928d5faff462e956e9b661e Mon Sep 17 00:00:00 2001 From: Ramiro Medina <64783088+ramedina86@users.noreply.github.com> Date: Fri, 15 Nov 2024 16:43:48 +0000 Subject: [PATCH 01/20] test: Workflows --- ...ts-workflows_workflow-0-auxjfi7lssb268ly.jsonl | 8 +++----- tests/e2e/presets/workflows/.wf/metadata.json | 2 +- tests/e2e/tests/workflows.spec.ts | 15 +++++++-------- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/tests/e2e/presets/workflows/.wf/components-workflows_workflow-0-auxjfi7lssb268ly.jsonl b/tests/e2e/presets/workflows/.wf/components-workflows_workflow-0-auxjfi7lssb268ly.jsonl index c9267239c..8a10d040c 100644 --- a/tests/e2e/presets/workflows/.wf/components-workflows_workflow-0-auxjfi7lssb268ly.jsonl +++ b/tests/e2e/presets/workflows/.wf/components-workflows_workflow-0-auxjfi7lssb268ly.jsonl @@ -1,6 +1,4 @@ {"id": "auxjfi7lssb268ly", "type": "workflows_workflow", "content": {"key": "handle_object"}, "handlers": {}, "isCodeManaged": false, "parentId": "workflows_root", "position": 0} -{"id": "bgfri5xbo3l5916z", "type": "workflows_writeraddchatmessage", "content": {"conversationStateElement": "convo", "message": "@{payload}"}, "handlers": {}, "isCodeManaged": false, "outs": [{"toNodeId": "36w4ghu0oioub912", "outId": "success"}], "parentId": "auxjfi7lssb268ly", "position": 0, "x": 473, "y": 204} -{"id": "36w4ghu0oioub912", "type": "workflows_writerchat", "content": {"conversationStateElement": "convo", "tools": "{\"get_employee_info\":{\"description\":\"Gets info for an employee, given an employee id\",\"parameters\":{\"id\":{\"type\":\"string\",\"description\":\"Id of the employee\"}},\"type\":\"function\"}}", "useStreaming": "no"}, "handlers": {}, "isCodeManaged": false, "outs": [{"toNodeId": "6ktplk37vue14rog", "outId": "tools_get_employee_info"}], "parentId": "auxjfi7lssb268ly", "position": 1, "x": 889, "y": 173} -{"id": "6ktplk37vue14rog", "type": "workflows_httprequest", "content": {"url": "https://reqres.in/api/users/@{id}"}, "handlers": {}, "isCodeManaged": false, "outs": [{"toNodeId": "8ly2cojv5m77lfpd", "outId": "success"}], "parentId": "auxjfi7lssb268ly", "position": 2, "x": 1382, "y": 152} -{"id": "i183fmlmluglk2tg", "type": "workflows_writerinitchat", "content": {"alias": "", "conversationStateElement": "convo"}, "handlers": {}, "isCodeManaged": false, "outs": [{"toNodeId": "bgfri5xbo3l5916z", "outId": "success"}], "parentId": "auxjfi7lssb268ly", "position": 3, "x": 52, "y": 203} -{"id": "8ly2cojv5m77lfpd", "type": "workflows_returnvalue", "content": {"value": "@{result.body}"}, "handlers": {}, "isCodeManaged": false, "parentId": "auxjfi7lssb268ly", "position": 4, "x": 1744, "y": 170} +{"id": "8y56lmia3wu99jhl", "type": "workflows_parsejson", "content": {"plainText": "{\"color\": \"@{payload}\", \"object\": \"@{context.item.object}\"}"}, "handlers": {}, "isCodeManaged": false, "outs": [{"toNodeId": "xy6vdzh2pm55alc0", "outId": "success"}], "parentId": "auxjfi7lssb268ly", "position": 0, "x": 208, "y": 321} +{"id": "xy6vdzh2pm55alc0", "type": "workflows_setstate", "content": {"alias": "Save the JSON", "element": "json_e2e", "value": "@{result}"}, "handlers": {}, "isCodeManaged": false, "outs": [{"toNodeId": "mve8ssvtk0pvw5yf", "outId": "success"}], "parentId": "auxjfi7lssb268ly", "position": 1, "x": 537, "y": 321} +{"id": "mve8ssvtk0pvw5yf", "type": "workflows_returnvalue", "content": {"alias": "", "value": "@{json_e2e}"}, "handlers": {}, "isCodeManaged": false, "parentId": "auxjfi7lssb268ly", "position": 2, "x": 876, "y": 317} diff --git a/tests/e2e/presets/workflows/.wf/metadata.json b/tests/e2e/presets/workflows/.wf/metadata.json index cab48d17b..f3c150d97 100644 --- a/tests/e2e/presets/workflows/.wf/metadata.json +++ b/tests/e2e/presets/workflows/.wf/metadata.json @@ -1,3 +1,3 @@ { - "writer_version": "0.8.0rc6" + "writer_version": "0.8.0rc7" } \ No newline at end of file diff --git a/tests/e2e/tests/workflows.spec.ts b/tests/e2e/tests/workflows.spec.ts index 7d4bd980a..2f145c73c 100644 --- a/tests/e2e/tests/workflows.spec.ts +++ b/tests/e2e/tests/workflows.spec.ts @@ -1,4 +1,3 @@ -/* import { test, expect } from "@playwright/test"; const setTextField = async (page, text) => { @@ -8,7 +7,7 @@ const setTextField = async (page, text) => { .fill(text); } -test.describe("state autocompletion", () => { +test.describe("Workflows", () => { let url: string; test.beforeAll(async ({request}) => { @@ -25,13 +24,14 @@ test.describe("state autocompletion", () => { await page.goto(url); }); - test.describe("text", () => { + test.describe("Payload and context", () => { + + const instancePaths = ["root:0,c0f99a9e-5004-4e75-a6c6-36f17490b134:0,ixxb26ukbvr0sknw:0,iftqnmjw8ipaknex:0,7no34ag7gmwgm1rd:0", "root:0,c0f99a9e-5004-4e75-a6c6-36f17490b134:0,ixxb26ukbvr0sknw:0,iftqnmjw8ipaknex:0,7no34ag7gmwgm1rd:0"]; + test("completion", async ({ page }) => { - - const instancePaths = ["root:0,c0f99a9e-5004-4e75-a6c6-36f17490b134:0,ixxb26ukbvr0sknw:0,iftqnmjw8ipaknex:0,7no34ag7gmwgm1rd:0", "root:0,c0f99a9e-5004-4e75-a6c6-36f17490b134:0,ixxb26ukbvr0sknw:0,iftqnmjw8ipaknex:0,7no34ag7gmwgm1rd:0"]; + page.locator('.BuilderFieldsText[data-automation-key="text"] .fieldStateAutocomplete span.prop:text-matches("string")').click(); await setTextField(page, "@{types."); - page.locator('.BuilderFieldsText[data-automation-key="text"] .fieldStateAutocomplete span.prop:text-matches("string")').click(); await expect(page .locator('.BuilderFieldsText[data-automation-key="text"] .templateInput')) .toHaveValue("@{types.string"); @@ -39,5 +39,4 @@ test.describe("state autocompletion", () => { }); -}); -*/ \ No newline at end of file +}); \ No newline at end of file From e7a82fb7263be4891dfadec83169e2c876846a52 Mon Sep 17 00:00:00 2001 From: Ramiro Medina <64783088+ramedina86@users.noreply.github.com> Date: Tue, 19 Nov 2024 09:28:09 +0000 Subject: [PATCH 02/20] chore: Refactor evaluator --- src/writer/core.py | 214 ++------------------------------ src/writer/evaluator.py | 207 ++++++++++++++++++++++++++++++ tests/backend/test_core.py | 126 ------------------- tests/backend/test_evaluator.py | 172 +++++++++++++++++++++++++ 4 files changed, 389 insertions(+), 330 deletions(-) create mode 100644 src/writer/evaluator.py create mode 100644 tests/backend/test_evaluator.py diff --git a/src/writer/core.py b/src/writer/core.py index d3e8c9e6f..026eaed9f 100644 --- a/src/writer/core.py +++ b/src/writer/core.py @@ -7,12 +7,10 @@ import functools import inspect import io -import json import logging import math import multiprocessing import numbers -import os import re import secrets import time @@ -45,14 +43,13 @@ import pyarrow # type: ignore import writer.workflows -from writer import core_ui +from writer import core_ui, evaluator from writer.core_ui import Component from writer.ss_types import ( DataframeRecordAdded, DataframeRecordRemoved, DataframeRecordUpdated, InstancePath, - InstancePathItem, Readable, ServeMode, WorkflowExecutionLog, @@ -1203,8 +1200,8 @@ class EventDeserialiser: Its main goal is to deserialise incoming content in a controlled and predictable way, applying sanitisation of inputs where relevant.""" - def __init__(self, session_state: WriterState, session_component_tree: core_ui.ComponentTree): - self.evaluator = Evaluator(session_state, session_component_tree) + def __init__(self, session: "WriterSession"): + self.evaluator = evaluator.Evaluator(session) def transform(self, ev: WriterEvent) -> None: # Events without payloads are safe @@ -1448,198 +1445,6 @@ def _transform_dataframe_action(self, ev: WriterEvent) -> Optional[Dict]: return payload -class Evaluator: - - """ - Evaluates templates and expressions in the backend. - It allows for the sanitisation of frontend inputs. - """ - - template_regex = re.compile(r"[\\]?@{([^{]*?)}") - - def __init__(self, session_state: WriterState, session_component_tree: core_ui.ComponentTree): - self.wf = session_state - self.ct = session_component_tree - - def evaluate_field(self, instance_path: InstancePath, field_key: str, as_json=False, default_field_value="", base_context={}) -> Any: - def replacer(matched): - if matched.string[0] == "\\": # Escaped @, don't evaluate - return matched.string - expr = matched.group(1).strip() - expr_value = self.evaluate_expression(expr, instance_path, base_context) - - try: - if as_json: - serialised_value = state_serialiser.serialise(expr_value) - if not isinstance(serialised_value, str): - serialised_value = json.dumps(serialised_value) - return serialised_value - return expr_value - except BaseException: - raise ValueError( - f"""Couldn't serialise value of type "{ type(expr_value) }" when evaluating field "{ field_key }".""") - - component_id = instance_path[-1]["componentId"] - component = self.ct.get_component(component_id) - if component: - field_value = component.content.get(field_key) or default_field_value - replaced = None - full_match = self.template_regex.fullmatch(field_value) - - if full_match is None: - replaced = self.template_regex.sub(lambda m: str(replacer(m)), field_value) - else: - replaced = replacer(full_match) - - if (replaced is not None) and as_json: - replaced_as_json = None - try: - replaced_as_json = json.loads(replaced) - except json.JSONDecodeError: - replaced_as_json = json.loads(default_field_value) - return replaced_as_json - else: - return replaced - else: - raise ValueError(f"Couldn't acquire a component by ID '{component_id}'") - - - def get_context_data(self, instance_path: InstancePath, base_context={}) -> Dict[str, Any]: - context: Dict[str, Any] = base_context - for i in range(len(instance_path)): - path_item = instance_path[i] - component_id = path_item["componentId"] - component = self.ct.get_component(component_id) - if not component: - continue - if component.type != "repeater": - continue - if i + 1 >= len(instance_path): - continue - repeater_instance_path = instance_path[0:i+1] - next_instance_path = instance_path[0:i+2] - instance_number = next_instance_path[-1]["instanceNumber"] - repeater_object = self.evaluate_field( - repeater_instance_path, "repeaterObject", True, """{ "a": { "desc": "Option A" }, "b": { "desc": "Option B" } }""") - key_variable = self.evaluate_field( - repeater_instance_path, "keyVariable", False, "itemId") - value_variable = self.evaluate_field( - repeater_instance_path, "valueVariable", False, "item") - - repeater_items: List[Tuple[Any, Any]] = [] - if isinstance(repeater_object, dict): - repeater_items = list(repeater_object.items()) - elif isinstance(repeater_object, list): - repeater_items = list(enumerate(repeater_object)) - else: - raise ValueError( - "Cannot produce context. Repeater object must evaluate to a dictionary.") - - context[key_variable] = repeater_items[instance_number][0] - context[value_variable] = repeater_items[instance_number][1] - - if len(instance_path) > 0: - context['target'] = instance_path[-1]['componentId'] - - return context - - def set_state(self, expr: str, instance_path: InstancePath, value: Any, base_context = {}) -> None: - accessors = self.parse_expression(expr, instance_path, base_context) - state_ref: StateProxy = self.wf.user_state - leaf_state_ref: StateProxy = state_ref - - for accessor in accessors[:-1]: - if isinstance(state_ref, StateProxy): - leaf_state_ref = state_ref - - if isinstance(state_ref, list): - state_ref = state_ref[int(accessor)] - else: - state_ref = state_ref[accessor] - - if not isinstance(state_ref, (StateProxy, dict)): - raise ValueError( - f"Incorrect state reference. Reference \"{expr}\" isn't part of a StateProxy or dict.") - - state_ref[accessors[-1]] = value - leaf_state_ref.apply_mutation_marker(recursive=True) - - def parse_expression(self, expr: str, instance_path: Optional[InstancePath] = None, base_context = {}) -> List[str]: - - """ Returns a list of accessors from an expression. """ - - accessors: List[str] = [] - s = "" - level = 0 - - i = 0 - while i < len(expr): - character = expr[i] - if character == "\\": - if i + 1 < len(expr): - s += expr[i + 1] - i += 1 - elif character == ".": - if level == 0: - accessors.append(s) - s = "" - else: - s += character - elif character == "[": - if level == 0: - accessors.append(s) - s = "" - else: - s += character - level += 1 - elif character == "]": - level -= 1 - if level == 0: - s = str(self.evaluate_expression(s, instance_path, base_context)) - else: - s += character - else: - s += character - - i += 1 - - if s: - accessors.append(s) - - return accessors - - def get_env_variable_value(self, expr: str): - return os.getenv(expr[1:]) - - def evaluate_expression(self, expr: str, instance_path: Optional[InstancePath] = None, base_context = {}) -> Any: - context_data = base_context - result = None - if instance_path: - context_data = self.get_context_data(instance_path, base_context) - context_ref: Any = context_data - state_ref: Any = self.wf.user_state.state - accessors: List[str] = self.parse_expression(expr, instance_path, base_context) - - for accessor in accessors: - if isinstance(state_ref, (StateProxy, dict)) and accessor in state_ref: - state_ref = state_ref.get(accessor) - result = state_ref - elif isinstance(state_ref, (list)) and state_ref[int(accessor)] is not None: - state_ref = state_ref[int(accessor)] - result = state_ref - elif isinstance(context_ref, dict) and accessor in context_ref: - context_ref = context_ref.get(accessor) - result = context_ref - - if isinstance(result, StateProxy): - return result.to_dict() - - if result is None and expr.startswith("$"): - return self.get_env_variable_value(expr) - - return result - - class WriterSession: """ @@ -1770,8 +1575,9 @@ def __init__(self, session: WriterSession) -> None: self.session = session self.session_state = session.session_state self.session_component_tree = session.session_component_tree - self.deser = EventDeserialiser(self.session_state, self.session_component_tree) - self.evaluator = Evaluator(self.session_state, self.session_component_tree) + self.deser = EventDeserialiser(session) + self.evaluator = evaluator.Evaluator(session) + self.workflow_runner = writer.workflows.WorkflowRunner(session) def _handle_binding(self, event_type, target_component, instance_path, payload) -> None: @@ -1784,15 +1590,15 @@ def _handle_binding(self, event_type, target_component, instance_path, payload) def _get_workflow_callable(self, workflow_key: Optional[str], workflow_id: Optional[str]): def fn(payload, context, session): - execution_env = { + execution_environment = { "payload": payload, "context": context, "session": session } if workflow_key: - writer.workflows.run_workflow_by_key(self.session, workflow_key, execution_env) + self.workflow_runner.run_workflow_by_key( workflow_key, execution_environment) elif workflow_id: - writer.workflows.run_workflow(self.session, workflow_id, execution_env) + self.workflow_runner.run_workflow(workflow_id, execution_environment) return fn def _get_handler_callable(self, target_component: Component, event_type: str) -> Optional[Callable]: @@ -1819,7 +1625,7 @@ def _call_handler_callable( self, event_type: str, target_component: Component, - instance_path: List[InstancePathItem], + instance_path: InstancePath, payload: Any ) -> Any: diff --git a/src/writer/evaluator.py b/src/writer/evaluator.py new file mode 100644 index 000000000..7bd5ba8af --- /dev/null +++ b/src/writer/evaluator.py @@ -0,0 +1,207 @@ +import json +import os +import re +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple + +from writer.ss_types import ( + InstancePath, +) + +import writer.core + +if TYPE_CHECKING: + from writer.core import WriterState, WriterSession + from writer.core_ui import ComponentTree + + +class Evaluator: + + """ + Evaluates templates and expressions in the backend. + It allows for the sanitisation of frontend inputs. + """ + + TEMPLATE_REGEX = re.compile(r"[\\]?@{([^{]*?)}") + + def __init__(self, session: "WriterSession"): + self.state = session.session_state + self.component_tree = session.component_tree + self.serializer = writer.core.StateSerialiser() + + def evaluate_field(self, instance_path: InstancePath, field_key: str, as_json=False, default_field_value="", base_context={}) -> Any: + def replacer(matched): + if matched.string[0] == "\\": # Escaped @, don't evaluate + return matched.string + expr = matched.group(1).strip() + expr_value = self.evaluate_expression(expr, instance_path, base_context) + + try: + if as_json: + serialized_value = self.serializer.serialise(expr_value) + if not isinstance(serialized_value, str): + serialized_value = json.dumps(serialized_value) + return serialized_value + return expr_value + except BaseException: + raise ValueError( + f"""Couldn't serialize value of type "{ type(expr_value) }" when evaluating field "{ field_key }".""") + + component_id = instance_path[-1]["componentId"] + component = self.component_tree.get_component(component_id) + if component: + field_value = component.content.get(field_key) or default_field_value + replaced = None + full_match = self.TEMPLATE_REGEX.fullmatch(field_value) + + if full_match is None: + replaced = self.TEMPLATE_REGEX.sub(lambda m: str(replacer(m)), field_value) + else: + replaced = replacer(full_match) + + if (replaced is not None) and as_json: + replaced_as_json = None + try: + replaced_as_json = json.loads(replaced) + except json.JSONDecodeError: + replaced_as_json = json.loads(default_field_value) + return replaced_as_json + else: + return replaced + else: + raise ValueError(f"Couldn't acquire a component by ID '{component_id}'") + + + def get_context_data(self, instance_path: InstancePath, base_context={}) -> Dict[str, Any]: + context: Dict[str, Any] = base_context + for i in range(len(instance_path)): + path_item = instance_path[i] + component_id = path_item["componentId"] + component = self.component_tree.get_component(component_id) + if not component: + continue + if component.type != "repeater": + continue + if i + 1 >= len(instance_path): + continue + repeater_instance_path = instance_path[0:i+1] + next_instance_path = instance_path[0:i+2] + instance_number = next_instance_path[-1]["instanceNumber"] + repeater_object = self.evaluate_field( + repeater_instance_path, "repeaterObject", True, """{ "a": { "desc": "Option A" }, "b": { "desc": "Option B" } }""") + key_variable = self.evaluate_field( + repeater_instance_path, "keyVariable", False, "itemId") + value_variable = self.evaluate_field( + repeater_instance_path, "valueVariable", False, "item") + + repeater_items: List[Tuple[Any, Any]] = [] + if isinstance(repeater_object, dict): + repeater_items = list(repeater_object.items()) + elif isinstance(repeater_object, list): + repeater_items = list(enumerate(repeater_object)) + else: + raise ValueError( + "Cannot produce context. Repeater object must evaluate to a dictionary.") + + context[key_variable] = repeater_items[instance_number][0] + context[value_variable] = repeater_items[instance_number][1] + + if len(instance_path) > 0: + context['target'] = instance_path[-1]['componentId'] + + return context + + def set_state(self, expr: str, instance_path: InstancePath, value: Any, base_context = {}) -> None: + accessors = self.parse_expression(expr, instance_path, base_context) + state_ref: writer.core.StateProxy = self.state.user_state + leaf_state_ref: writer.core.StateProxy = state_ref + + for accessor in accessors[:-1]: + if isinstance(state_ref, writer.core.StateProxy): + leaf_state_ref = state_ref + + if isinstance(state_ref, list): + state_ref = state_ref[int(accessor)] + else: + state_ref = state_ref[accessor] + + if not isinstance(state_ref, (writer.core.StateProxy, dict)): + raise ValueError( + f"Incorrect state reference. Reference \"{expr}\" isn't part of a StateProxy or dict.") + + state_ref[accessors[-1]] = value + leaf_state_ref.apply_mutation_marker(recursive=True) + + def parse_expression(self, expr: str, instance_path: Optional[InstancePath] = None, base_context = {}) -> List[str]: + + """ Returns a list of accessors from an expression. """ + + accessors: List[str] = [] + s = "" + level = 0 + + i = 0 + while i < len(expr): + character = expr[i] + if character == "\\": + if i + 1 < len(expr): + s += expr[i + 1] + i += 1 + elif character == ".": + if level == 0: + accessors.append(s) + s = "" + else: + s += character + elif character == "[": + if level == 0: + accessors.append(s) + s = "" + else: + s += character + level += 1 + elif character == "]": + level -= 1 + if level == 0: + s = str(self.evaluate_expression(s, instance_path, base_context)) + else: + s += character + else: + s += character + + i += 1 + + if s: + accessors.append(s) + + return accessors + + def get_env_variable_value(self, expr: str): + return os.getenv(expr[1:]) + + def evaluate_expression(self, expr: str, instance_path: Optional[InstancePath] = None, base_context = {}) -> Any: + context_data = base_context + result = None + if instance_path: + context_data = self.get_context_data(instance_path, base_context) + context_ref: Any = context_data + state_ref: Any = self.state.user_state + accessors: List[str] = self.parse_expression(expr, instance_path, base_context) + + for accessor in accessors: + if isinstance(state_ref, (writer.core.StateProxy, dict)) and accessor in state_ref: + state_ref = state_ref.get(accessor) + result = state_ref + elif isinstance(state_ref, (list)) and state_ref[int(accessor)] is not None: + state_ref = state_ref[int(accessor)] + result = state_ref + elif isinstance(context_ref, dict) and accessor in context_ref: + context_ref = context_ref.get(accessor) + result = context_ref + + if isinstance(result, writer.core.StateProxy): + return result.to_dict() + + if result is None and expr.startswith("$"): + return self.get_env_variable_value(expr) + + return result \ No newline at end of file diff --git a/tests/backend/test_core.py b/tests/backend/test_core.py index dbada5043..ce19e01b4 100644 --- a/tests/backend/test_core.py +++ b/tests/backend/test_core.py @@ -17,7 +17,6 @@ from writer import audit_and_fix, wf_project from writer.core import ( BytesWrapper, - Evaluator, EventDeserialiser, FileWrapper, MutableValue, @@ -29,14 +28,10 @@ import_failure, parse_state_variable_expression, ) -from writer.core_ui import Component from writer.ss_types import WriterEvent from tests.backend import test_app_dir from tests.backend.fixtures import ( - core_ui_fixtures, - file_fixtures, - load_fixture_content, writer_fixtures, ) @@ -1144,127 +1139,6 @@ def test_polars_df(self) -> None: assert table.column("name")[0].as_py() == "Byte" assert table.column("length_cm")[2].as_py() == 32 -class TestEvaluator: - - def test_evaluate_field_simple(self) -> None: - - instance_path = [ - {"componentId": "root", "instanceNumber": 0}, - {"componentId": "4b6f14b0-b2d9-43e7-8aba-8d3e939c1f83", "instanceNumber": 0}, - {"componentId": "0cd59329-29c8-4887-beee-39794065221e", "instanceNumber": 0} - - ] - st = WriterState({ - "counter": 8 - }) - ct = session.session_component_tree - e = Evaluator(st, ct) - evaluated = e.evaluate_field(instance_path, "text") - assert evaluated == "The counter is 8" - - def test_evaluate_field_repeater(self) -> None: - instance_path_base = [ - {"componentId": "root", "instanceNumber": 0}, - {"componentId": "4b6f14b0-b2d9-43e7-8aba-8d3e939c1f83", "instanceNumber": 0}, - {"componentId": "f811ca14-8915-443d-8dd3-77ae69fb80f4", "instanceNumber": 0} - ] - instance_path_0 = instance_path_base + [ - {"componentId": "2e688107-f865-419b-a07b-95103197e3fd", "instanceNumber": 0} - ] - instance_path_2 = instance_path_base + [ - {"componentId": "2e688107-f865-419b-a07b-95103197e3fd", "instanceNumber": 2} - ] - st = WriterState({ - "prog_languages": { - "c": "C", - "py": "Python", - "js": "JavaScript", - "ts": "TypeScript" - } - }) - ct = session.session_component_tree - e = Evaluator(st, ct) - assert e.evaluate_field( - instance_path_0, "text") == "The id is c and the name is C" - assert e.evaluate_field( - instance_path_2, "text") == "The id is js and the name is JavaScript" - - def test_set_state(self) -> None: - instance_path = [ - {"componentId": "root", "instanceNumber": 0} - ] - st = WriterState(raw_state_dict) - ct = session.session_component_tree - e = Evaluator(st, ct) - e.set_state("name", instance_path, "Roger") - e.set_state("dynamic_prop", instance_path, "height") - e.set_state("features[dynamic_prop]", instance_path, "toddler height") - e.set_state("features.new_feature", instance_path, "blue") - assert st["name"] == "Roger" - assert st["features"]["height"] == "toddler height" - assert st["features"]["new_feature"] == "blue" - - def test_evaluate_expression(self) -> None: - instance_path = [ - {"componentId": "root", "instanceNumber": 0} - ] - st = WriterState(raw_state_dict) - ct = session.session_component_tree - e = Evaluator(st, ct) - assert e.evaluate_expression("features.eyes", instance_path) == "green" - assert e.evaluate_expression("best_feature", instance_path) == "eyes" - assert e.evaluate_expression("features[best_feature]", instance_path) == "green" - assert e.evaluate_expression("a\.b", instance_path) == 3 - - def test_get_context_data_should_return_the_target_of_event(self) -> None: - """ - Test that the target of the event is correctly returned by the get_context_data method - - Here we reproduce a click on a button - """ - # Given - st = WriterState({}) - ct = core_ui_fixtures.build_fake_component_tree([ - Component(id="button1", parentId="root", type="button") - ], init_root=True) - - e = Evaluator(st, ct) - - # When - context = e.get_context_data([ - {"componentId": "root", "instanceNumber": 0}, - {"componentId": "button1", "instanceNumber": 0} - ]) - - # Then - assert context.get("target") == "button1" - - def test_get_context_data_should_return_the_repeater_position_and_the_target_inside_the_repeater(self) -> None: - """ - Test that the repeater position and target of the event is correctly returned by the get_context_data method - - Here we reproduce a click on a button - """ - # Given - st = WriterState({}) - ct = core_ui_fixtures.build_fake_component_tree([ - Component(id="repeater1", parentId="root", type="repeater", content={'keyVariable': 'item', 'valueVariable': 'value', 'repeaterObject': json.dumps({'a': 'A', 'b': 'B'})}), - Component(id="button1", parentId="repeater1", type="button") - ], init_root=True) - - e = Evaluator(st, ct) - - # When - context = e.get_context_data([ - {"componentId": "root", "instanceNumber": 0}, - {"componentId": "repeater1", "instanceNumber": 0}, - {"componentId": "button1", "instanceNumber": 1} - ]) - - # Then - assert context.get("target") == "button1" - assert context.get("item") == "b" - assert context.get("value") == "B" class TestSessionManager: diff --git a/tests/backend/test_evaluator.py b/tests/backend/test_evaluator.py new file mode 100644 index 000000000..269c09b46 --- /dev/null +++ b/tests/backend/test_evaluator.py @@ -0,0 +1,172 @@ +import json + +import numpy as np +import writer as wf +from writer import audit_and_fix, wf_project +from writer import evaluator +from writer.core import ( + WriterState, +) +from writer.core_ui import Component + +from tests.backend import test_app_dir +from tests.backend.fixtures import ( + core_ui_fixtures, +) + +raw_state_dict = { + "name": "Robert", + "age": 1, + "interests": ["lamps", "cars"], + "state.with.dots": { + "photo.jpeg": "Not available", + }, + "features": { + "eyes": "green", + "height": "very short" + }, + "best_feature": "eyes", + "utfࠀ": 23, + "counter": 4, + "_private": 3, + # Used as an example of something unserialisable yet pickable + "_private_unserialisable": np.array([[1+2j, 2, 3+3j]]), + "a.b": 3 +} + +simple_dict = {"items": { + "Apple": {"name": "Apple", "type": "fruit"}, + "Cucumber": {"name": "Cucumber", "type": "vegetable"}, + "Lettuce": {"name": "Lettuce", "type": "vegetable"} + }} + +wf.Config.is_mail_enabled_for_log = True +wf.init_state(raw_state_dict) + +_, sc = wf_project.read_files(test_app_dir) +sc = audit_and_fix.fix_components(sc) + +session = wf.session_manager.get_new_session() +session.session_component_tree.ingest(sc) + +class TestEvaluator: + + def test_evaluate_field_simple(self) -> None: + + instance_path = [ + {"componentId": "root", "instanceNumber": 0}, + {"componentId": "4b6f14b0-b2d9-43e7-8aba-8d3e939c1f83", "instanceNumber": 0}, + {"componentId": "0cd59329-29c8-4887-beee-39794065221e", "instanceNumber": 0} + + ] + st = WriterState({ + "counter": 8 + }) + ct = session.session_component_tree + e = evaluator.Evaluator(st, ct) + evaluated = e.evaluate_field(instance_path, "text") + assert evaluated == "The counter is 8" + + def test_evaluate_field_repeater(self) -> None: + instance_path_base = [ + {"componentId": "root", "instanceNumber": 0}, + {"componentId": "4b6f14b0-b2d9-43e7-8aba-8d3e939c1f83", "instanceNumber": 0}, + {"componentId": "f811ca14-8915-443d-8dd3-77ae69fb80f4", "instanceNumber": 0} + ] + instance_path_0 = instance_path_base + [ + {"componentId": "2e688107-f865-419b-a07b-95103197e3fd", "instanceNumber": 0} + ] + instance_path_2 = instance_path_base + [ + {"componentId": "2e688107-f865-419b-a07b-95103197e3fd", "instanceNumber": 2} + ] + st = WriterState({ + "prog_languages": { + "c": "C", + "py": "Python", + "js": "JavaScript", + "ts": "TypeScript" + } + }) + ct = session.session_component_tree + e = evaluator.Evaluator(st, ct) + assert e.evaluate_field( + instance_path_0, "text") == "The id is c and the name is C" + assert e.evaluate_field( + instance_path_2, "text") == "The id is js and the name is JavaScript" + + def test_set_state(self) -> None: + instance_path = [ + {"componentId": "root", "instanceNumber": 0} + ] + st = WriterState(raw_state_dict) + ct = session.session_component_tree + e = evaluator.Evaluator(st, ct) + e.set_state("name", instance_path, "Roger") + e.set_state("dynamic_prop", instance_path, "height") + e.set_state("features[dynamic_prop]", instance_path, "toddler height") + e.set_state("features.new_feature", instance_path, "blue") + assert st["name"] == "Roger" + assert st["features"]["height"] == "toddler height" + assert st["features"]["new_feature"] == "blue" + + def test_evaluate_expression(self) -> None: + instance_path = [ + {"componentId": "root", "instanceNumber": 0} + ] + st = WriterState(raw_state_dict) + ct = session.session_component_tree + e = evaluator.Evaluator(st, ct) + assert e.evaluate_expression("features.eyes", instance_path) == "green" + assert e.evaluate_expression("best_feature", instance_path) == "eyes" + assert e.evaluate_expression("features[best_feature]", instance_path) == "green" + assert e.evaluate_expression("a\.b", instance_path) == 3 + + def test_get_context_data_should_return_the_target_of_event(self) -> None: + """ + Test that the target of the event is correctly returned by the get_context_data method + + Here we reproduce a click on a button + """ + # Given + st = WriterState({}) + ct = core_ui_fixtures.build_fake_component_tree([ + Component(id="button1", parentId="root", type="button") + ], init_root=True) + + e = evaluator.Evaluator(st, ct) + + # When + context = e.get_context_data([ + {"componentId": "root", "instanceNumber": 0}, + {"componentId": "button1", "instanceNumber": 0} + ]) + + # Then + assert context.get("target") == "button1" + + def test_get_context_data_should_return_the_repeater_position_and_the_target_inside_the_repeater(self) -> None: + """ + Test that the repeater position and target of the event is correctly returned by the get_context_data method + + Here we reproduce a click on a button + """ + # Given + st = WriterState({}) + ct = core_ui_fixtures.build_fake_component_tree([ + Component(id="repeater1", parentId="root", type="repeater", content={'keyVariable': 'item', 'valueVariable': 'value', 'repeaterObject': json.dumps({'a': 'A', 'b': 'B'})}), + Component(id="button1", parentId="repeater1", type="button") + ], init_root=True) + + e = evaluator.Evaluator(st, ct) + + # When + context = e.get_context_data([ + {"componentId": "root", "instanceNumber": 0}, + {"componentId": "repeater1", "instanceNumber": 0}, + {"componentId": "button1", "instanceNumber": 1} + ]) + + # Then + assert context.get("target") == "button1" + assert context.get("item") == "b" + assert context.get("value") == "B" \ No newline at end of file From f3d020eff059daec12064532e1d2971ea9c928ff Mon Sep 17 00:00:00 2001 From: Ramiro Medina <64783088+ramedina86@users.noreply.github.com> Date: Tue, 19 Nov 2024 09:30:17 +0000 Subject: [PATCH 03/20] fix: Whitespace --- src/writer/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/writer/core.py b/src/writer/core.py index 026eaed9f..ffe776c68 100644 --- a/src/writer/core.py +++ b/src/writer/core.py @@ -1598,7 +1598,7 @@ def fn(payload, context, session): if workflow_key: self.workflow_runner.run_workflow_by_key( workflow_key, execution_environment) elif workflow_id: - self.workflow_runner.run_workflow(workflow_id, execution_environment) + self.workflow_runner.run_workflow(workflow_id, execution_environment) return fn def _get_handler_callable(self, target_component: Component, event_type: str) -> Optional[Callable]: From ef32f7587317d4a56cd0f28ff4ac96630d6ef69b Mon Sep 17 00:00:00 2001 From: Ramiro Medina <64783088+ramedina86@users.noreply.github.com> Date: Tue, 19 Nov 2024 09:31:58 +0000 Subject: [PATCH 04/20] chore: Workflows refactor --- src/writer/workflows.py | 314 +++++++++++++++++++++++----------------- 1 file changed, 183 insertions(+), 131 deletions(-) diff --git a/src/writer/workflows.py b/src/writer/workflows.py index e3266edc4..c1cc6b7ed 100644 --- a/src/writer/workflows.py +++ b/src/writer/workflows.py @@ -1,142 +1,194 @@ import time -from typing import Any, Dict, List, Literal, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Tuple, Type import writer.core -import writer.workflows_blocks -from writer.core_ui import Component from writer.ss_types import WorkflowExecutionLog -from writer.workflows_blocks.blocks import WorkflowBlock - - -def _get_workflow_nodes(component_id): - return writer.core.base_component_tree.get_descendents(component_id) - -def run_workflow_by_key(session, workflow_key: str, execution_env: Dict): - all_components = writer.core.base_component_tree.components.values() - workflows = list(filter(lambda c: c.type == "workflows_workflow" and c.content.get("key") == workflow_key, all_components)) - if len(workflows) == 0: - return - workflow = workflows[0] - - return run_workflow(session, workflow.id, execution_env) - - -def run_workflow(session, component_id: str, execution_env: Dict): - execution: Dict[str, WorkflowBlock] = {} - nodes = _get_workflow_nodes(component_id) - return_value = None - try: - for node in get_terminal_nodes(nodes): - run_node(node, nodes, execution, session, execution_env) - for component_id, tool in execution.items(): - if tool and tool.return_value is not None: - return_value = tool.return_value - except BaseException as e: - _generate_run_log(session, execution, "Failed workflow execution", "error") - raise e - else: - _generate_run_log(session, execution, "Workflow execution", "info", return_value) - return return_value - -def _generate_run_log(session: "writer.core.WriterSession", execution: Dict[str, WorkflowBlock], title: str, entry_type: Literal["info", "error"], return_value: Optional[Any] = None): - if not writer.core.Config.is_mail_enabled_for_log: - return - exec_log:WorkflowExecutionLog = WorkflowExecutionLog(summary=[]) - for component_id, tool in execution.items(): - exec_log.summary.append({ - "componentId": component_id, - "outcome": tool.outcome, - "result": tool.result, - "returnValue": tool.return_value, - "executionEnvironment": tool.execution_env, - "executionTimeInSeconds": tool.execution_time_in_seconds - }) - msg = "Execution finished." - state = session.session_state - state.add_log_entry(entry_type, title, msg, workflow_execution=exec_log) - - -def get_terminal_nodes(nodes): - return [node for node in nodes if not node.outs] - -def _get_node_dependencies(target_node: "Component", nodes: List["Component"]): - dependencies:List[Tuple] = [] - parent_id = target_node.parentId - if not parent_id: - return [] - for node in nodes: - if not node.outs: - continue - for out in node.outs: - to_node_id = out.get("toNodeId") - out_id = out.get("outId") - if to_node_id == target_node.id: - dependencies.append((node, out_id)) - return dependencies - - -def get_branch_nodes(root_node_id: str): - root_node = writer.core.base_component_tree.get_component(root_node_id) - if not root_node: - return [] - branch_nodes = [root_node] - if not root_node.outs: - return branch_nodes - for out in root_node.outs: - branch_nodes += get_branch_nodes(out.get("toNodeId")) - return branch_nodes - +from typing import Dict, List +from writer.ss_types import InstancePath + +WorkflowBlock_T = Type["WorkflowBlock"] +block_map:Dict[str, WorkflowBlock_T] = {} + +if TYPE_CHECKING: + from writer.core import WriterSession, Component, Config + +class WorkflowRunner(): + + def __init__(self, session: "WriterSession"): + import writer.blocks + + self.execution: Dict[str, WorkflowBlock] = {} + self.state = session.session_state + self.component_tree = session.session_component_tree + self.evaluator = writer.evaluator.Evaluator(session) + + def run_workflow_by_key(self, workflow_key: str, execution_environment: Dict = {}): + all_components = self.component_tree.components.values() + workflows = list(filter(lambda c: c.type == "workflows_workflow" and c.content.get("key") == workflow_key, all_components)) + if len(workflows) == 0: + raise ValueError(f'Workflow with key "{workflow_key}" not found.') + workflow = workflows[0] + return self.run_workflow(workflow.id, execution_environment) + + def _get_workflow_nodes(self, component_id): + return self.component_tree.get_descendents(component_id) + + def _get_branch_nodes(self, base_component_id: str, base_outcome: str): + base_component = self.component_tree.get_component(base_component_id) + if not base_component: + raise RuntimeError(f'Cannot obtain branch. Could not find component "{base_component_id}".') + outs = base_component.outs + nodes:List["Component"] = [] + if not outs: + return nodes + for out in outs: + if out.get("outId") == base_outcome: + component_id = out.get("toNodeId") + component = self.component_tree.get_component(component_id) + if not component: + continue + nodes.append(component) + return nodes + + def run_branch(self, base_component_id: str, base_outcome: str, execution_environment: Dict): + nodes = self._get_branch_nodes(base_component_id, base_outcome) + return self.run_nodes(nodes, execution_environment) + + def run_workflow(self, component_id: str, execution_environment): + nodes = self._get_workflow_nodes(component_id) + return self.run_nodes(nodes, execution_environment) + + def run_nodes(self, nodes: List["Component"], execution_environment: Dict): + execution: Dict[str, WorkflowBlock] = {} + return_value = None + try: + for node in self.get_terminal_nodes(nodes): + self.run_node(node, nodes, execution_environment) + for tool in execution.values(): + if tool and tool.return_value is not None: + return_value = tool.return_value + except BaseException as e: + self._generate_run_log("Failed workflow execution", "error") + raise e + else: + self._generate_run_log("Workflow execution", "info", return_value) + return return_value -def _is_outcome_managed(target_node: "Component", target_out_id: str): - if not target_node.outs: + def _generate_run_log(self, title: str, entry_type: Literal["info", "error"], return_value: Optional[Any] = None): + if not writer.core.Config.is_mail_enabled_for_log: + return + exec_log:WorkflowExecutionLog = WorkflowExecutionLog(summary=[]) + for component_id, tool in self.execution.items(): + exec_log.summary.append({ + "componentId": component_id, + "outcome": tool.outcome, + "result": tool.result, + "returnValue": tool.return_value, + "executionEnvironment": tool.execution_environment, + "executionTimeInSeconds": tool.execution_time_in_seconds + }) + msg = "Execution finished." + self.state.add_log_entry(entry_type, title, msg, workflow_execution=exec_log) + + + def get_terminal_nodes(self, nodes): + return [node for node in nodes if not node.outs] + + def _get_node_dependencies(self, target_node: "Component", nodes: List["Component"]): + dependencies:List[Tuple] = [] + parent_id = target_node.parentId + if not parent_id: + return [] + for node in nodes: + if not node.outs: + continue + for out in node.outs: + to_node_id = out.get("toNodeId") + out_id = out.get("outId") + if to_node_id == target_node.id: + dependencies.append((node, out_id)) + return dependencies + + def _is_outcome_managed(self, target_node: "Component", target_out_id: str): + if not target_node.outs: + return False + for out in target_node.outs: + if out.get("outId") == target_out_id: + return True return False - for out in target_node.outs: - if out.get("outId") == target_out_id: - return True - return False - -def run_node(target_node: "Component", nodes: List["Component"], execution: Dict, session: "writer.core.WriterSession", execution_env: Dict): - tool_class = writer.workflows_blocks.blocks.block_map.get(target_node.type) - if not tool_class: - raise RuntimeError(f"Couldn't find tool for {target_node.type}.") - dependencies = _get_node_dependencies(target_node, nodes) - - tool = execution.get(target_node.id) - if tool: - return tool - - result = None - matched_dependencies = 0 - for node, out_id in dependencies: - tool = run_node(node, nodes, execution, session, execution_env) - if not tool: - continue - if tool.outcome == out_id: - matched_dependencies += 1 - result = tool.result - if tool.return_value is not None: + def run_node(self, target_node: "Component", nodes: List["Component"], execution_environment: Dict): + tool_class = block_map.get(target_node.type) + if not tool_class: + raise RuntimeError(f'Could not find tool for "{target_node.type}".') + dependencies = self._get_node_dependencies(target_node, nodes) + + tool = self.execution.get(target_node.id) + if tool: + return tool + + result = None + matched_dependencies = 0 + for node, out_id in dependencies: + tool = self.run_node(node, nodes, execution_environment) + if not tool: + continue + if tool.outcome == out_id: + matched_dependencies += 1 + result = tool.result + if tool.return_value is not None: + return + + if len(dependencies) > 0 and matched_dependencies == 0: return - if len(dependencies) > 0 and matched_dependencies == 0: - return + expanded_execution_environment = execution_environment | { + "result": result, + "results": { k:v.result for k,v in self.execution.items() } + } + tool = tool_class(target_node, self, expanded_execution_environment) + + try: + start_time = time.time() + tool.run() + tool.execution_time_in_seconds = time.time() - start_time + except BaseException as e: + if tool and not tool.result: + tool.result = repr(e) + if not tool.outcome or not self._is_outcome_managed(target_node, tool.outcome): + raise e + finally: + self.execution[target_node.id] = tool + + return tool - results = { - "result": result, - "results": { k:v.result for k,v in execution.items() } - } - tool = tool_class(target_node, execution, session, execution_env | results) - - try: - start_time = time.time() - tool.run() - tool.execution_time_in_seconds = time.time() - start_time - except BaseException as e: - if tool and not tool.result: - tool.result = repr(e) - if not tool.outcome or not _is_outcome_managed(target_node, tool.outcome): - raise e - finally: - execution[target_node.id] = tool +class WorkflowBlock: - return tool \ No newline at end of file + @classmethod + def register(cls, type: str): + block_map[type] = cls + + def __init__(self, component: "Component", runner: WorkflowRunner, execution_environment: Dict): + self.outcome = None + self.component = component + self.runner = runner + self.execution_time_in_seconds = -1.0 + self.execution_environment = execution_environment + self.result:Any = None + self.return_value = None + self.instance_path: InstancePath = [{"componentId": self.component.id, "instanceNumber": 0}] + + def _get_field(self, field_key: str, as_json=False, default_field_value=None): + if default_field_value is None: + if as_json: + default_field_value = "{}" + else: + default_field_value = "" + value = self.runner.evaluator.evaluate_field(self.instance_path, field_key, as_json, default_field_value, self.execution_environment) + return value + + def _set_state(self, expr: str, value: Any): + self.runner.evaluator.set_state(expr, self.instance_path, value, base_context=self.execution_environment) + + def run(self): + pass \ No newline at end of file From 0d97d09890c4e9bdbe31ff7ae7330fdc44248b11 Mon Sep 17 00:00:00 2001 From: Ramiro Medina <64783088+ramedina86@users.noreply.github.com> Date: Tue, 19 Nov 2024 09:32:20 +0000 Subject: [PATCH 05/20] chore: Workflows refactor --- src/writer/blocks/__init__.py | 31 +++++++++++ .../addtostatelist.py | 6 +-- .../calleventhandler.py | 4 +- .../{workflows_blocks => blocks}/foreach.py | 13 ++--- .../httprequest.py | 2 +- .../logmessage.py | 2 +- .../{workflows_blocks => blocks}/parsejson.py | 2 +- .../returnvalue.py | 2 +- .../runworkflow.py | 6 +-- .../{workflows_blocks => blocks}/setstate.py | 4 +- .../writeraddchatmessage.py | 6 +-- .../writerchat.py | 39 +++++--------- .../writerclassification.py | 2 +- .../writercompletion.py | 2 +- .../writerinitchat.py | 10 ++-- .../writernocodeapp.py | 4 +- src/writer/workflows_blocks/__init__.py | 31 ----------- src/writer/workflows_blocks/blocks.py | 53 ------------------- 18 files changed, 77 insertions(+), 142 deletions(-) create mode 100644 src/writer/blocks/__init__.py rename src/writer/{workflows_blocks => blocks}/addtostatelist.py (86%) rename src/writer/{workflows_blocks => blocks}/calleventhandler.py (96%) rename src/writer/{workflows_blocks => blocks}/foreach.py (84%) rename src/writer/{workflows_blocks => blocks}/httprequest.py (98%) rename src/writer/{workflows_blocks => blocks}/logmessage.py (97%) rename src/writer/{workflows_blocks => blocks}/parsejson.py (96%) rename src/writer/{workflows_blocks => blocks}/returnvalue.py (96%) rename src/writer/{workflows_blocks => blocks}/runworkflow.py (91%) rename src/writer/{workflows_blocks => blocks}/setstate.py (91%) rename src/writer/{workflows_blocks => blocks}/writeraddchatmessage.py (89%) rename src/writer/{workflows_blocks => blocks}/writerchat.py (74%) rename src/writer/{workflows_blocks => blocks}/writerclassification.py (97%) rename src/writer/{workflows_blocks => blocks}/writercompletion.py (97%) rename src/writer/{workflows_blocks => blocks}/writerinitchat.py (84%) rename src/writer/{workflows_blocks => blocks}/writernocodeapp.py (96%) delete mode 100644 src/writer/workflows_blocks/__init__.py delete mode 100644 src/writer/workflows_blocks/blocks.py diff --git a/src/writer/blocks/__init__.py b/src/writer/blocks/__init__.py new file mode 100644 index 000000000..c08e77125 --- /dev/null +++ b/src/writer/blocks/__init__.py @@ -0,0 +1,31 @@ +from writer.blocks.addtostatelist import AddToStateList +from writer.blocks.calleventhandler import CallEventHandler +from writer.blocks.foreach import ForEach +from writer.blocks.httprequest import HTTPRequest +from writer.blocks.logmessage import LogMessage +from writer.blocks.parsejson import ParseJSON +from writer.blocks.returnvalue import ReturnValue +from writer.blocks.runworkflow import RunWorkflow +from writer.blocks.setstate import SetState +from writer.blocks.writeraddchatmessage import WriterAddChatMessage +from writer.blocks.writerchat import WriterChat +from writer.blocks.writerclassification import WriterClassification +from writer.blocks.writercompletion import WriterCompletion +from writer.blocks.writerinitchat import WriterInitChat +from writer.blocks.writernocodeapp import WriterNoCodeApp + +SetState.register("workflows_setstate") +WriterClassification.register("workflows_writerclassification") +WriterCompletion.register("workflows_writercompletion") +HTTPRequest.register("workflows_httprequest") +RunWorkflow.register("workflows_runworkflow") +WriterNoCodeApp.register("workflows_writernocodeapp") +ForEach.register("workflows_foreach") +LogMessage.register("workflows_logmessage") +WriterChat.register("workflows_writerchat") +WriterAddChatMessage.register("workflows_writeraddchatmessage") +ParseJSON.register("workflows_parsejson") +CallEventHandler.register("workflows_calleventhandler") +AddToStateList.register("workflows_addtostatelist") +ReturnValue.register("workflows_returnvalue") +WriterInitChat.register("workflows_writerinitchat") \ No newline at end of file diff --git a/src/writer/workflows_blocks/addtostatelist.py b/src/writer/blocks/addtostatelist.py similarity index 86% rename from src/writer/workflows_blocks/addtostatelist.py rename to src/writer/blocks/addtostatelist.py index f2b3e0f97..37225c47c 100644 --- a/src/writer/workflows_blocks/addtostatelist.py +++ b/src/writer/blocks/addtostatelist.py @@ -1,6 +1,6 @@ from writer.abstract import register_abstract_template from writer.ss_types import AbstractTemplate -from writer.workflows_blocks.blocks import WorkflowBlock +from writer.workflows import WorkflowBlock class AddToStateList(WorkflowBlock): @@ -45,7 +45,7 @@ def run(self): element_expr = self._get_field("element") value = self._get_field("value") - element = self.evaluator.evaluate_expression(element_expr, self.instance_path, self.execution_env) + element = self.runner.evaluator.evaluate_expression(element_expr, self.instance_path, self.execution_environment) if not element: element = [] @@ -53,7 +53,7 @@ def run(self): element = [element] element.append(value) - self.evaluator.set_state(element_expr, self.instance_path, element, base_context=self.execution_env) + self._set_state(element_expr, element) self.outcome = "success" except BaseException as e: self.outcome = "error" diff --git a/src/writer/workflows_blocks/calleventhandler.py b/src/writer/blocks/calleventhandler.py similarity index 96% rename from src/writer/workflows_blocks/calleventhandler.py rename to src/writer/blocks/calleventhandler.py index 62798494f..d3decbd51 100644 --- a/src/writer/workflows_blocks/calleventhandler.py +++ b/src/writer/blocks/calleventhandler.py @@ -4,7 +4,7 @@ import writer.workflows from writer.abstract import register_abstract_template from writer.ss_types import AbstractTemplate -from writer.workflows_blocks.blocks import WorkflowBlock +from writer.workflows import WorkflowBlock class CallEventHandler(WorkflowBlock): @@ -58,7 +58,7 @@ def run(self): args = { "state": self.session.session_state, - "context": self.execution_env, + "context": self.execution_environment, "session": writer.core._event_handler_session_info(), } | additional_args diff --git a/src/writer/workflows_blocks/foreach.py b/src/writer/blocks/foreach.py similarity index 84% rename from src/writer/workflows_blocks/foreach.py rename to src/writer/blocks/foreach.py index 183064abf..e5a67348f 100644 --- a/src/writer/workflows_blocks/foreach.py +++ b/src/writer/blocks/foreach.py @@ -1,7 +1,7 @@ import writer.workflows from writer.abstract import register_abstract_template from writer.ss_types import AbstractTemplate -from writer.workflows_blocks.blocks import WorkflowBlock +from writer.workflows import WorkflowBlock class ForEach(WorkflowBlock): @@ -52,22 +52,23 @@ def register(cls, type: str): } )) - def _run_workflow_for_item(self, workflow_key, base_execution_env, item_id, item): - return writer.workflows.run_workflow_by_key(self.session, workflow_key, base_execution_env | { "itemId": item_id, "item": item }) + def _run_workflow_for_item(self, workflow_key, base_execution_environment, item_id, item): + expanded_execution_environment = base_execution_environment | { "itemId": item_id, "item": item } + return self.runner.run_workflow_by_key(workflow_key, expanded_execution_environment) def run(self): try: workflow_key = self._get_field("workflowKey") items = self._get_field("items", as_json=True) - base_execution_env = self._get_field("executionEnv", as_json=True) + base_execution_environment = self._get_field("executionEnv", as_json=True) std_items = items result = None if isinstance(items, list): std_items = enumerate(std_items, 0) - result = [self._run_workflow_for_item(workflow_key, base_execution_env, item_id, item) for item_id, item in std_items] + result = [self._run_workflow_for_item(workflow_key, base_execution_environment, item_id, item) for item_id, item in std_items] elif isinstance(items, dict): std_items = items.items() - result = {item_id:self._run_workflow_for_item(workflow_key, base_execution_env, item_id, item) for item_id, item in std_items} + result = {item_id:self._run_workflow_for_item(workflow_key, base_execution_environment, item_id, item) for item_id, item in std_items} self.result = result self.outcome = "success" diff --git a/src/writer/workflows_blocks/httprequest.py b/src/writer/blocks/httprequest.py similarity index 98% rename from src/writer/workflows_blocks/httprequest.py rename to src/writer/blocks/httprequest.py index aa77f4977..6936ba2ca 100644 --- a/src/writer/workflows_blocks/httprequest.py +++ b/src/writer/blocks/httprequest.py @@ -2,7 +2,7 @@ from writer.abstract import register_abstract_template from writer.ss_types import AbstractTemplate -from writer.workflows_blocks.blocks import WorkflowBlock +from writer.workflows import WorkflowBlock class HTTPRequest(WorkflowBlock): diff --git a/src/writer/workflows_blocks/logmessage.py b/src/writer/blocks/logmessage.py similarity index 97% rename from src/writer/workflows_blocks/logmessage.py rename to src/writer/blocks/logmessage.py index 2cba940e5..72186b4f4 100644 --- a/src/writer/workflows_blocks/logmessage.py +++ b/src/writer/blocks/logmessage.py @@ -1,7 +1,7 @@ import writer.workflows from writer.abstract import register_abstract_template from writer.ss_types import AbstractTemplate -from writer.workflows_blocks.blocks import WorkflowBlock +from writer.workflows import WorkflowBlock class LogMessage(WorkflowBlock): diff --git a/src/writer/workflows_blocks/parsejson.py b/src/writer/blocks/parsejson.py similarity index 96% rename from src/writer/workflows_blocks/parsejson.py rename to src/writer/blocks/parsejson.py index 784d1427a..af8ffae94 100644 --- a/src/writer/workflows_blocks/parsejson.py +++ b/src/writer/blocks/parsejson.py @@ -2,7 +2,7 @@ from writer.abstract import register_abstract_template from writer.ss_types import AbstractTemplate -from writer.workflows_blocks.blocks import WorkflowBlock +from writer.workflows import WorkflowBlock class ParseJSON(WorkflowBlock): diff --git a/src/writer/workflows_blocks/returnvalue.py b/src/writer/blocks/returnvalue.py similarity index 96% rename from src/writer/workflows_blocks/returnvalue.py rename to src/writer/blocks/returnvalue.py index 6770565cd..b8bd112d5 100644 --- a/src/writer/workflows_blocks/returnvalue.py +++ b/src/writer/blocks/returnvalue.py @@ -1,7 +1,7 @@ import writer.workflows from writer.abstract import register_abstract_template from writer.ss_types import AbstractTemplate -from writer.workflows_blocks.blocks import WorkflowBlock +from writer.workflows import WorkflowBlock class ReturnValue(WorkflowBlock): diff --git a/src/writer/workflows_blocks/runworkflow.py b/src/writer/blocks/runworkflow.py similarity index 91% rename from src/writer/workflows_blocks/runworkflow.py rename to src/writer/blocks/runworkflow.py index 247e2c236..4444944ae 100644 --- a/src/writer/workflows_blocks/runworkflow.py +++ b/src/writer/blocks/runworkflow.py @@ -1,7 +1,7 @@ import writer.workflows from writer.abstract import register_abstract_template from writer.ss_types import AbstractTemplate -from writer.workflows_blocks.blocks import WorkflowBlock +from writer.workflows import WorkflowBlock class RunWorkflow(WorkflowBlock): @@ -46,9 +46,9 @@ def register(cls, type: str): def run(self): try: workflow_key = self._get_field("workflowKey") - execution_env = self._get_field("executionEnv", as_json=True) + execution_environment = self._get_field("executionEnv", as_json=True) - return_value = writer.workflows.run_workflow_by_key(self.session, workflow_key, execution_env) + return_value = writer.workflows.run_workflow_by_key(workflow_key, execution_environment) self.result = return_value self.outcome = "success" except BaseException as e: diff --git a/src/writer/workflows_blocks/setstate.py b/src/writer/blocks/setstate.py similarity index 91% rename from src/writer/workflows_blocks/setstate.py rename to src/writer/blocks/setstate.py index 389c8a224..2ffae35b6 100644 --- a/src/writer/workflows_blocks/setstate.py +++ b/src/writer/blocks/setstate.py @@ -1,6 +1,6 @@ from writer.abstract import register_abstract_template from writer.ss_types import AbstractTemplate -from writer.workflows_blocks.blocks import WorkflowBlock +from writer.workflows import WorkflowBlock class SetState(WorkflowBlock): @@ -44,7 +44,7 @@ def run(self): try: element = self._get_field("element") value = self._get_field("value") - self.evaluator.set_state(element, self.instance_path, value, base_context=self.execution_env) + self._set_state(element, value) self.result = value self.outcome = "success" except BaseException as e: diff --git a/src/writer/workflows_blocks/writeraddchatmessage.py b/src/writer/blocks/writeraddchatmessage.py similarity index 89% rename from src/writer/workflows_blocks/writeraddchatmessage.py rename to src/writer/blocks/writeraddchatmessage.py index 1d9a9f235..277edce4c 100644 --- a/src/writer/workflows_blocks/writeraddchatmessage.py +++ b/src/writer/blocks/writeraddchatmessage.py @@ -1,6 +1,6 @@ from writer.abstract import register_abstract_template from writer.ss_types import AbstractTemplate -from writer.workflows_blocks.blocks import WorkflowBlock +from writer.workflows import WorkflowBlock class WriterAddChatMessage(WorkflowBlock): @@ -47,7 +47,7 @@ def run(self): conversation_state_element = self._get_field("conversationStateElement") message = self._get_field("message", as_json=True) - conversation = self.evaluator.evaluate_expression(conversation_state_element, self.instance_path, self.execution_env) + conversation = self.runner.evaluator.evaluate_expression(conversation_state_element, self.instance_path, self.execution_environment) if conversation is None or not isinstance(conversation, writer.ai.Conversation): self.result = "The state element specified doesn't contain a conversation. Initialize one using the block 'Initialize chat'." @@ -70,7 +70,7 @@ def run(self): conversation += message - self.evaluator.set_state(conversation_state_element, self.instance_path, conversation, base_context=self.execution_env) + self._set_state(conversation_state_element, conversation) self.result = "Success" self.outcome = "success" except BaseException as e: diff --git a/src/writer/workflows_blocks/writerchat.py b/src/writer/blocks/writerchat.py similarity index 74% rename from src/writer/workflows_blocks/writerchat.py rename to src/writer/blocks/writerchat.py index 02afe9558..f0e35185f 100644 --- a/src/writer/workflows_blocks/writerchat.py +++ b/src/writer/blocks/writerchat.py @@ -1,7 +1,7 @@ import writer.workflows from writer.abstract import register_abstract_template from writer.ss_types import AbstractTemplate -from writer.workflows_blocks.blocks import WorkflowBlock +from writer.workflows import WorkflowBlock class WriterChat(WorkflowBlock): @@ -58,28 +58,17 @@ def register(cls, type: str): } )) - def run_branch(self, outcome: str, **args): - branch_root_nodes = self._get_nodes_at_outcome(outcome) - return_value = None - for branch_root_node in branch_root_nodes: - branch_nodes = writer.workflows.get_branch_nodes(branch_root_node.id) - terminal_nodes = writer.workflows.get_terminal_nodes(branch_nodes) - - for terminal_node in terminal_nodes: - tool = writer.workflows.run_node(terminal_node, branch_nodes, self.execution, self.session, self.execution_env | args) - if tool and tool.return_value: - return_value = tool.return_value - - if return_value is None: - self.result = f"No value has been returned for the outcome branch '{outcome}'. Use the block 'Return value' to specify one." - self.outcome = "error" - raise ValueError("No value available") - - return repr(return_value) - def _make_callable(self, tool_name: str): def callable(**args): - return self.run_branch(f"tools_{tool_name}", **args) + expanded_execution_environment = self.execution_environment | args + return_value = self.run_branch(self.component.id, f"tools_{tool_name}", expanded_execution_environment) + + if return_value is None: + self.result = f'No value has been returned for the outcome branch "{tool_name}". Use the block "Return value" to specify one.' + self.outcome = "error" + raise ValueError(f'No value has been returned for the outcome branch "{tool_name}".') + + return return_value return callable def run(self): @@ -111,7 +100,7 @@ def run(self): continue tools.append(tool) - conversation = self.evaluator.evaluate_expression(conversation_state_element, self.instance_path, self.execution_env) + conversation = self.runner.evaluator.evaluate_expression(conversation_state_element, self.instance_path, self.execution_environment) if conversation is None or not isinstance(conversation, writer.ai.Conversation): self.result = "The state element specified doesn't contain a conversation. Initialize one using the block 'Initialize chat'." @@ -123,21 +112,21 @@ def run(self): if not use_streaming: msg = conversation.complete(tools=tools) conversation += msg - self.evaluator.set_state(conversation_state_element, self.instance_path, conversation, base_context=self.execution_env) + self._set_state(conversation_state_element, conversation) else: for chunk in conversation.stream_complete(tools=tools): if chunk.get("content") is None: chunk["content"] = "" msg += chunk.get("content") conversation += chunk - self.evaluator.set_state(conversation_state_element, self.instance_path, conversation, base_context=self.execution_env) + self._set_state(conversation_state_element, conversation) except BaseException: msg = { "role": "assistant", "content": "Couldn't process the request." } conversation += msg - self.evaluator.set_state(conversation_state_element, self.instance_path, conversation, base_context=self.execution_env) + self._set_state(conversation_state_element, conversation) if not self.outcome: self.result = msg diff --git a/src/writer/workflows_blocks/writerclassification.py b/src/writer/blocks/writerclassification.py similarity index 97% rename from src/writer/workflows_blocks/writerclassification.py rename to src/writer/blocks/writerclassification.py index b9f9f7826..61002503a 100644 --- a/src/writer/workflows_blocks/writerclassification.py +++ b/src/writer/blocks/writerclassification.py @@ -2,7 +2,7 @@ from writer.abstract import register_abstract_template from writer.ss_types import AbstractTemplate -from writer.workflows_blocks.blocks import WorkflowBlock +from writer.workflows import WorkflowBlock class WriterClassification(WorkflowBlock): diff --git a/src/writer/workflows_blocks/writercompletion.py b/src/writer/blocks/writercompletion.py similarity index 97% rename from src/writer/workflows_blocks/writercompletion.py rename to src/writer/blocks/writercompletion.py index 213dc7eb7..bfd13bb4d 100644 --- a/src/writer/workflows_blocks/writercompletion.py +++ b/src/writer/blocks/writercompletion.py @@ -1,6 +1,6 @@ from writer.abstract import register_abstract_template from writer.ss_types import AbstractTemplate -from writer.workflows_blocks.blocks import WorkflowBlock +from writer.workflows import WorkflowBlock DEFAULT_MODEL = "palmyra-x-003-instruct" diff --git a/src/writer/workflows_blocks/writerinitchat.py b/src/writer/blocks/writerinitchat.py similarity index 84% rename from src/writer/workflows_blocks/writerinitchat.py rename to src/writer/blocks/writerinitchat.py index 68c52a8ff..a57b83e1d 100644 --- a/src/writer/workflows_blocks/writerinitchat.py +++ b/src/writer/blocks/writerinitchat.py @@ -1,6 +1,6 @@ from writer.abstract import register_abstract_template from writer.ss_types import AbstractTemplate -from writer.workflows_blocks.blocks import WorkflowBlock +from writer.workflows import WorkflowBlock DEFAULT_MODEL = "palmyra-x-004" @@ -14,7 +14,7 @@ def register(cls, type: str): baseType="workflows_node", writer={ "name": "Initialize chat", - "description": "If it doesn't already exist, initializes a conversation for Chat Completion", + "description": "If it doesn't already exist, initializes a conversation for Chat completion", "category": "Writer", "fields": { "conversationStateElement": { @@ -57,10 +57,10 @@ def run(self): model_id = self._get_field("modelId", False, default_field_value=DEFAULT_MODEL) config = { "temperature": temperature, "model": model_id} - conversation = self.evaluator.evaluate_expression(conversation_state_element, self.instance_path, self.execution_env) + conversation = self.runner.evaluator.evaluate_expression(conversation_state_element, self.instance_path, self.execution_environment) if conversation is not None and not isinstance(conversation, writer.ai.Conversation): - self.result = f"The state element specified doesn't contain a Conversation. A value of type {type(conversation)} was found." + self.result = f'The state element specified does not contain a Conversation. A value of type "{type(conversation)}" was found.' self.outcome = "error" return elif conversation is not None: @@ -69,7 +69,7 @@ def run(self): return conversation = writer.ai.Conversation(config=config) - self.evaluator.set_state(conversation_state_element, self.instance_path, conversation, base_context=self.execution_env) + self._set_state(conversation_state_element, conversation) self.result = None self.outcome = "success" except BaseException as e: diff --git a/src/writer/workflows_blocks/writernocodeapp.py b/src/writer/blocks/writernocodeapp.py similarity index 96% rename from src/writer/workflows_blocks/writernocodeapp.py rename to src/writer/blocks/writernocodeapp.py index 8fb55a0b9..87d0689e5 100644 --- a/src/writer/workflows_blocks/writernocodeapp.py +++ b/src/writer/blocks/writernocodeapp.py @@ -1,8 +1,6 @@ -import json - from writer.abstract import register_abstract_template from writer.ss_types import AbstractTemplate -from writer.workflows_blocks.blocks import WorkflowBlock +from writer.workflows import WorkflowBlock class WriterNoCodeApp(WorkflowBlock): diff --git a/src/writer/workflows_blocks/__init__.py b/src/writer/workflows_blocks/__init__.py deleted file mode 100644 index b6f73b483..000000000 --- a/src/writer/workflows_blocks/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -from writer.workflows_blocks.addtostatelist import AddToStateList -from writer.workflows_blocks.calleventhandler import CallEventHandler -from writer.workflows_blocks.foreach import ForEach -from writer.workflows_blocks.httprequest import HTTPRequest -from writer.workflows_blocks.logmessage import LogMessage -from writer.workflows_blocks.parsejson import ParseJSON -from writer.workflows_blocks.returnvalue import ReturnValue -from writer.workflows_blocks.runworkflow import RunWorkflow -from writer.workflows_blocks.setstate import SetState -from writer.workflows_blocks.writeraddchatmessage import WriterAddChatMessage -from writer.workflows_blocks.writerchat import WriterChat -from writer.workflows_blocks.writerclassification import WriterClassification -from writer.workflows_blocks.writercompletion import WriterCompletion -from writer.workflows_blocks.writerinitchat import WriterInitChat -from writer.workflows_blocks.writernocodeapp import WriterNoCodeApp - -SetState.register("workflows_setstate") -WriterClassification.register("workflows_writerclassification") -WriterCompletion.register("workflows_writercompletion") -HTTPRequest.register("workflows_httprequest") -RunWorkflow.register("workflows_runworkflow") -WriterNoCodeApp.register("workflows_writernocodeapp") -ForEach.register("workflows_foreach") -LogMessage.register("workflows_logmessage") -WriterChat.register("workflows_writerchat") -WriterAddChatMessage.register("workflows_writeraddchatmessage") -ParseJSON.register("workflows_parsejson") -CallEventHandler.register("workflows_calleventhandler") -AddToStateList.register("workflows_addtostatelist") -ReturnValue.register("workflows_returnvalue") -WriterInitChat.register("workflows_writerinitchat") \ No newline at end of file diff --git a/src/writer/workflows_blocks/blocks.py b/src/writer/workflows_blocks/blocks.py deleted file mode 100644 index 3f7023d04..000000000 --- a/src/writer/workflows_blocks/blocks.py +++ /dev/null @@ -1,53 +0,0 @@ -from typing import Dict, List - -import writer.core -import writer.core_ui -import writer.workflows_blocks -from writer.ss_types import InstancePath - -block_map = {} - -class WorkflowBlock: - - @classmethod - def register(cls, type: str): - block_map[type] = cls - - def __init__(self, component: "writer.core_ui.Component", execution: Dict, session: "writer.core.WriterSession", execution_env: Dict): - self.outcome = None - self.component = component - self.execution = execution - self.execution_time_in_seconds = -1.0 - self.session = session - self.execution_env = execution_env - self.result = None - self.return_value = None - self.evaluator = writer.core.Evaluator(session.session_state, session.session_component_tree) - self.instance_path: InstancePath = [{"componentId": self.component.id, "instanceNumber": 0}] - - def _get_nodes_at_outcome(self, target_outcome: str): - outs = self.component.outs - nodes:List["writer.core_ui.Component"] = [] - if not outs: - return nodes - for out in outs: - if out.get("outId") == target_outcome: - component_id = out.get("toNodeId") - component = writer.core.base_component_tree.get_component(component_id) - if not component: - continue - nodes.append(component) - return nodes - - def _get_field(self, field_key: str, as_json=False, default_field_value=None): - if default_field_value is None: - if as_json: - default_field_value = "{}" - else: - default_field_value = "" - v = self.evaluator.evaluate_field(self.instance_path, field_key, base_context=self.execution_env, as_json=as_json, default_field_value=default_field_value) - - return v - - def run(self): - pass \ No newline at end of file From dcc1e534a12b2a280fa17430a6b7a56cca338dc4 Mon Sep 17 00:00:00 2001 From: Ramiro Medina <64783088+ramedina86@users.noreply.github.com> Date: Tue, 19 Nov 2024 09:34:59 +0000 Subject: [PATCH 06/20] fix: Imports / type checking --- src/writer/evaluator.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/writer/evaluator.py b/src/writer/evaluator.py index 7bd5ba8af..3a3e87937 100644 --- a/src/writer/evaluator.py +++ b/src/writer/evaluator.py @@ -7,12 +7,11 @@ InstancePath, ) + import writer.core if TYPE_CHECKING: - from writer.core import WriterState, WriterSession - from writer.core_ui import ComponentTree - + from writer.core import WriterSession class Evaluator: @@ -25,7 +24,7 @@ class Evaluator: def __init__(self, session: "WriterSession"): self.state = session.session_state - self.component_tree = session.component_tree + self.component_tree = session.session_component_tree self.serializer = writer.core.StateSerialiser() def evaluate_field(self, instance_path: InstancePath, field_key: str, as_json=False, default_field_value="", base_context={}) -> Any: From 396b781cf95331536840a883ca7b98dceb871d7c Mon Sep 17 00:00:00 2001 From: Ramiro Medina <64783088+ramedina86@users.noreply.github.com> Date: Tue, 19 Nov 2024 09:35:34 +0000 Subject: [PATCH 07/20] test: Fix tests --- tests/backend/blocks/test_parsejson.py | 5 ++++ tests/backend/test_core.py | 5 ++-- tests/backend/test_evaluator.py | 36 ++++++++++++-------------- 3 files changed, 23 insertions(+), 23 deletions(-) create mode 100644 tests/backend/blocks/test_parsejson.py diff --git a/tests/backend/blocks/test_parsejson.py b/tests/backend/blocks/test_parsejson.py new file mode 100644 index 000000000..eda8ced03 --- /dev/null +++ b/tests/backend/blocks/test_parsejson.py @@ -0,0 +1,5 @@ +from writer.blocks.parsejson import ParseJSON + +def test_valid_json(): + assert 1 == 1 + # tool = ParseJSON() \ No newline at end of file diff --git a/tests/backend/test_core.py b/tests/backend/test_core.py index ce19e01b4..b22567462 100644 --- a/tests/backend/test_core.py +++ b/tests/backend/test_core.py @@ -681,9 +681,8 @@ def test_unpickable_members(self) -> None: class TestEventDeserialiser: root_instance_path = [{"componentId": "root", "instanceNumber": 0}] - session_state = WriterState(raw_state_dict) - component_tree = session.session_component_tree - ed = EventDeserialiser(session_state, component_tree) + session.session_state = WriterState(raw_state_dict) + ed = EventDeserialiser(session) def test_unknown_no_payload(self) -> None: ev = WriterEvent( diff --git a/tests/backend/test_evaluator.py b/tests/backend/test_evaluator.py index 269c09b46..539d4d3ff 100644 --- a/tests/backend/test_evaluator.py +++ b/tests/backend/test_evaluator.py @@ -59,11 +59,10 @@ def test_evaluate_field_simple(self) -> None: {"componentId": "0cd59329-29c8-4887-beee-39794065221e", "instanceNumber": 0} ] - st = WriterState({ + session.session_state = WriterState({ "counter": 8 }) - ct = session.session_component_tree - e = evaluator.Evaluator(st, ct) + e = evaluator.Evaluator(session) evaluated = e.evaluate_field(instance_path, "text") assert evaluated == "The counter is 8" @@ -79,7 +78,7 @@ def test_evaluate_field_repeater(self) -> None: instance_path_2 = instance_path_base + [ {"componentId": "2e688107-f865-419b-a07b-95103197e3fd", "instanceNumber": 2} ] - st = WriterState({ + session.session_state = WriterState({ "prog_languages": { "c": "C", "py": "Python", @@ -87,8 +86,7 @@ def test_evaluate_field_repeater(self) -> None: "ts": "TypeScript" } }) - ct = session.session_component_tree - e = evaluator.Evaluator(st, ct) + e = evaluator.Evaluator(session) assert e.evaluate_field( instance_path_0, "text") == "The id is c and the name is C" assert e.evaluate_field( @@ -98,24 +96,22 @@ def test_set_state(self) -> None: instance_path = [ {"componentId": "root", "instanceNumber": 0} ] - st = WriterState(raw_state_dict) - ct = session.session_component_tree - e = evaluator.Evaluator(st, ct) + session.session_state = WriterState(raw_state_dict) + e = evaluator.Evaluator(session) e.set_state("name", instance_path, "Roger") e.set_state("dynamic_prop", instance_path, "height") e.set_state("features[dynamic_prop]", instance_path, "toddler height") e.set_state("features.new_feature", instance_path, "blue") - assert st["name"] == "Roger" - assert st["features"]["height"] == "toddler height" - assert st["features"]["new_feature"] == "blue" + assert session.session_state["name"] == "Roger" + assert session.session_state["features"]["height"] == "toddler height" + assert session.session_state["features"]["new_feature"] == "blue" def test_evaluate_expression(self) -> None: instance_path = [ {"componentId": "root", "instanceNumber": 0} ] - st = WriterState(raw_state_dict) - ct = session.session_component_tree - e = evaluator.Evaluator(st, ct) + session.session_state = WriterState(raw_state_dict) + e = evaluator.Evaluator(session) assert e.evaluate_expression("features.eyes", instance_path) == "green" assert e.evaluate_expression("best_feature", instance_path) == "eyes" assert e.evaluate_expression("features[best_feature]", instance_path) == "green" @@ -128,12 +124,12 @@ def test_get_context_data_should_return_the_target_of_event(self) -> None: Here we reproduce a click on a button """ # Given - st = WriterState({}) - ct = core_ui_fixtures.build_fake_component_tree([ + + session.session_component_tree = core_ui_fixtures.build_fake_component_tree([ Component(id="button1", parentId="root", type="button") ], init_root=True) - e = evaluator.Evaluator(st, ct) + e = evaluator.Evaluator(session) # When context = e.get_context_data([ @@ -152,12 +148,12 @@ def test_get_context_data_should_return_the_repeater_position_and_the_target_ins """ # Given st = WriterState({}) - ct = core_ui_fixtures.build_fake_component_tree([ + session.session_component_tree = core_ui_fixtures.build_fake_component_tree([ Component(id="repeater1", parentId="root", type="repeater", content={'keyVariable': 'item', 'valueVariable': 'value', 'repeaterObject': json.dumps({'a': 'A', 'b': 'B'})}), Component(id="button1", parentId="repeater1", type="button") ], init_root=True) - e = evaluator.Evaluator(st, ct) + e = evaluator.Evaluator(session) # When context = e.get_context_data([ From 9b5183b9529117fa8d6c41de53cb93d19d20ba5a Mon Sep 17 00:00:00 2001 From: Ramiro Medina <64783088+ramedina86@users.noreply.github.com> Date: Tue, 19 Nov 2024 10:37:45 +0100 Subject: [PATCH 08/20] Squashed commit of the following: commit 202406d858e8f8053795efb9b98c2a9d0567def1 Merge: f7bd8e7 f571e48 Author: Ramiro Medina <64783088+ramedina86@users.noreply.github.com> Date: Tue Nov 19 10:18:54 2024 +0100 Merge pull request #636 from mmikita95/fix-tool-calls-finish-processing fix: change logic for finish reason processing commit f7bd8e7a8e3454663951dd400426f248ec38645b Merge: b538673 0e379de Author: Ramiro Medina <64783088+ramedina86@users.noreply.github.com> Date: Tue Nov 19 09:55:16 2024 +0100 Merge pull request #637 from FabienArcellier/WF-80-clean-up-core chore: clean up core - WF-80 commit 0e379de40b3649655def3c2a207f48f97801b7c1 Author: Fabien Arcellier Date: Mon Nov 18 21:56:20 2024 +0100 chore: clean up core * chore: move dataframe dedicated method into core_df commit a30d46b47ea9dc059c2b75c1a4c4c64f4f9ff8cb Author: Fabien Arcellier Date: Mon Nov 18 21:44:19 2024 +0100 chore: clean up core * chore: move all functions that start with writer_event_handler_* to EventHandlerExecutor class as static commit f571e489efd56310ff7b6f58371e2389764eae90 Author: mmikita95 Date: Mon Nov 18 13:15:56 2024 +0300 fix: change logic for finish reason processing --- src/writer/__init__.py | 2 +- src/writer/ai.py | 14 +- src/writer/core.py | 624 ++++++----------------------------------- src/writer/core_df.py | 479 +++++++++++++++++++++++++++++++ 4 files changed, 569 insertions(+), 550 deletions(-) create mode 100644 src/writer/core_df.py diff --git a/src/writer/__init__.py b/src/writer/__init__.py index 29fb3548e..b0ff35089 100644 --- a/src/writer/__init__.py +++ b/src/writer/__init__.py @@ -7,7 +7,6 @@ from writer.core import ( BytesWrapper, Config, - EditableDataframe, FileWrapper, Readable, State, @@ -22,6 +21,7 @@ from writer.core import ( writerproperty as property, ) +from writer.core_df import EditableDataframe try: from writer.ui import WriterUIManager diff --git a/src/writer/ai.py b/src/writer/ai.py index bb260a9ad..569bb63bb 100644 --- a/src/writer/ai.py +++ b/src/writer/ai.py @@ -1663,10 +1663,15 @@ def _process_stream_response( chunk |= {"chunk": True} # Handling tool call fragments - if chunk.get("tool_calls") is not None: - self += chunk - self._process_streaming_tool_calls(chunk) - if chunk_data.get("finish_reason") == "tool_calls": + tool_calls_present = chunk.get("tool_calls") is not None + tool_calls_need_processing = \ + chunk_data.get("finish_reason") == "tool_calls" + if tool_calls_present or tool_calls_need_processing: + # Handle tool calls chunks + if tool_calls_present: + self += chunk + self._process_streaming_tool_calls(chunk) + if tool_calls_need_processing: # Send follow-up call to LLM self.messages += self._gather_tool_calls_results() follow_up_response = cast( @@ -1690,7 +1695,6 @@ def _process_stream_response( ) finally: follow_up_response.close() - else: # Handle regular message chunks if chunk.get("content") is not None: diff --git a/src/writer/core.py b/src/writer/core.py index ffe776c68..f0391ac2f 100644 --- a/src/writer/core.py +++ b/src/writer/core.py @@ -16,7 +16,6 @@ import time import traceback import urllib.request -from abc import ABCMeta from contextvars import ContextVar from multiprocessing.process import BaseProcess from types import ModuleType @@ -46,9 +45,6 @@ from writer import core_ui, evaluator from writer.core_ui import Component from writer.ss_types import ( - DataframeRecordAdded, - DataframeRecordRemoved, - DataframeRecordUpdated, InstancePath, Readable, ServeMode, @@ -60,7 +56,6 @@ if TYPE_CHECKING: import pandas - import polars from writer.app_runner import AppProcess from writer.ss_types import AppProcessServerRequest @@ -261,6 +256,8 @@ class StateSerialiser: """ def serialise(self, v: Any) -> Union[Dict, List, str, bool, int, float, None]: from writer.ai import Conversation + from writer.core_df import EditableDataframe + if isinstance(v, State): return self._serialise_dict_recursively(v.to_dict()) if isinstance(v, Conversation): @@ -461,7 +458,7 @@ def __setitem__(self, key: str, raw_value: Any) -> None: "new_value": raw_value } - writer_event_handler_invoke(local_mutation.handler, { + EventHandlerExecutor.invoke(local_mutation.handler, { "state": local_mutation.state, "context": context_data, "payload": payload, @@ -831,7 +828,7 @@ def subscribe_mutation(self, # existing states. To cause this, we trigger manually # the handler. if initial_triggered is True: - writer_event_handler_invoke(handler, { + EventHandlerExecutor.invoke(handler, { "state": self, "context": {"event": "init"}, "payload": {}, @@ -1080,7 +1077,7 @@ def __init__(self, middleware: Callable): @contextlib.contextmanager def execute(self, args: dict): - middleware_args = writer_event_handler_build_arguments(self.middleware, args) + middleware_args = EventHandlerExecutor.build_arguments(self.middleware, args) it = self.middleware(*middleware_args) try: yield from it @@ -1651,7 +1648,7 @@ def _call_handler_callable( with core_ui.use_component_tree(self.session.session_component_tree), \ contextlib.redirect_stdout(io.StringIO()) as f: middlewares_executors = current_app_process.middleware_registry.executors() - result = writer_event_handler_invoke_with_middlewares(middlewares_executors, handler_callable, writer_args) + result = EventHandlerExecutor.invoke_with_middlewares(middlewares_executors, handler_callable, writer_args) captured_stdout = f.getvalue() if captured_stdout: @@ -1692,6 +1689,80 @@ def handle(self, ev: WriterEvent) -> WriterEventResult: return {"ok": ok, "result": result} +class EventHandlerExecutor: + + @staticmethod + def build_arguments(func: Callable, writer_args: dict) -> List[Any]: + """ + Constructs the list of arguments based on the signature of the function + which can be a handler or middleware. + + >>> def my_event_handler(state, context): + >>> yield + + >>> args = EventHandlerExecutor.build_arguments(my_event_handler, {'state': {}, 'payload': {}, 'context': {"target": '11'}, 'session': None, 'ui': None}) + >>> [{}, {"target": '11'}] + + :param func: the function that will be called + :param writer_args: the possible arguments in writer (state, payload, ...) + """ + handler_args = inspect.getfullargspec(func).args + func_args = [] + for arg in handler_args: + if arg in writer_args: + func_args.append(writer_args[arg]) + + return func_args + + @staticmethod + def invoke(callable_handler: Callable, writer_args: dict) -> Any: + """ + Runs a handler based on its signature. + + If the handler is asynchronous, it is executed asynchronously. + If the handler only has certain parameters, only these are passed as arguments + + >>> def my_handler(state): + >>> state['a'] = 2 + >>> + >>> EventHandlerExecutor.invoke(my_handler, {'state': {'a': 1}, 'payload': None, 'context': None, 'session': None, 'ui': None}) + """ + is_async_handler = inspect.iscoroutinefunction(callable_handler) + if (not callable(callable_handler) and not is_async_handler): + raise ValueError("Invalid handler. The handler isn't a callable object.") + + handler_args = EventHandlerExecutor.build_arguments(callable_handler, writer_args) + + if is_async_handler: + async_wrapper = _async_wrapper_internal(callable_handler, handler_args) + result = asyncio.run(async_wrapper) + else: + result = callable_handler(*handler_args) + + return result + + @staticmethod + def invoke_with_middlewares(middlewares_executors: List[MiddlewareExecutor], callable_handler: Callable, writer_args: dict) -> Any: + """ + Runs the middlewares then the handler. This function allows you to manage exceptions that are triggered in middleware + + :param middlewares_executors: The list of middleware to run + :param callable_handler: The target handler + + >>> @wf.middleware() + >>> def my_middleware(state, payload, context, session, ui): + >>> yield + + >>> executor = MiddlewareExecutor(my_middleware, {'state': {}, 'payload': None, 'context': None, 'session': None, 'ui': None}) + >>> EventHandlerExecutor.invoke_with_middlewares([executor], my_handler, {'state': {}, 'payload': None, 'context': None, 'session': None, 'ui': None} + """ + if len(middlewares_executors) == 0: + return EventHandlerExecutor.invoke(callable_handler, writer_args) + else: + executor = middlewares_executors[0] + with executor.execute(writer_args): + return EventHandlerExecutor.invoke_with_middlewares(middlewares_executors[1:], callable_handler, writer_args) + class DictPropertyProxy: """ @@ -1739,412 +1810,6 @@ def __set__(self, instance, value): proxy[self.key] = value -class DataframeRecordRemove: - pass - - -class DataframeRecordProcessor(): - """ - This interface defines the signature of the methods to process the events of a - dataframe compatible with EditableDataframe. - - A Dataframe can be any structure composed of tabular data. - - This class defines the signature of the methods to be implemented. - """ - __metaclass__ = ABCMeta - - @staticmethod - def match(df: Any) -> bool: - """ - This method checks if the dataframe is compatible with the processor. - """ - raise NotImplementedError - - @staticmethod - def record(df: Any, record_index: int) -> dict: - """ - This method read a record at the given line and get it back as dictionary - - >>> edf = EditableDataframe(df) - >>> r = edf.record(1) - """ - raise NotImplementedError - - @staticmethod - def record_add(df: Any, payload: DataframeRecordAdded) -> Any: - """ - signature of the methods to be implemented to process wf-dataframe-add event - - >>> edf = EditableDataframe(df) - >>> edf.record_add({"record": {"a": 1, "b": 2}}) - """ - raise NotImplementedError - - @staticmethod - def record_update(df: Any, payload: DataframeRecordUpdated) -> Any: - """ - signature of the methods to be implemented to process wf-dataframe-update event - - >>> edf = EditableDataframe(df) - >>> edf.record_update({"record_index": 12, "record": {"a": 1, "b": 2}}) - """ - raise NotImplementedError - - @staticmethod - def record_remove(df: Any, payload: DataframeRecordRemoved) -> Any: - """ - signature of the methods to be implemented to process wf-dataframe-action event - - >>> edf = EditableDataframe(df) - >>> edf.record_remove({"record_index": 12}) - """ - raise NotImplementedError - - @staticmethod - def pyarrow_table(df: Any) -> pyarrow.Table: - """ - Serializes the dataframe into a pyarrow table - """ - raise NotImplementedError - - -class PandasRecordProcessor(DataframeRecordProcessor): - """ - PandasRecordProcessor processes records from a pandas dataframe saved into an EditableDataframe - - >>> df = pandas.DataFrame({"a": [1, 2], "b": [3, 4]}) - >>> edf = EditableDataframe(df) - >>> edf.record_add({"a": 1, "b": 2}) - """ - - @staticmethod - @import_failure(rvalue=False) - def match(df: Any) -> bool: - import pandas - return True if isinstance(df, pandas.DataFrame) else False - - @staticmethod - def record(df: 'pandas.DataFrame', record_index: int) -> dict: - """ - - >>> edf = EditableDataframe(df) - >>> r = edf.record(1) - """ - import pandas - - record = df.iloc[record_index] - if not isinstance(df.index, pandas.RangeIndex): - index_list = df.index.tolist() - record_index_content = index_list[record_index] - if isinstance(record_index_content, tuple): - for i, n in enumerate(df.index.names): - record[n] = record_index_content[i] - else: - record[df.index.names[0]] = record_index_content - - return dict(record) - - @staticmethod - def record_add(df: 'pandas.DataFrame', payload: DataframeRecordAdded) -> 'pandas.DataFrame': - """ - >>> edf = EditableDataframe(df) - >>> edf.record_add({"record": {"a": 1, "b": 2}}) - """ - import pandas - - _assert_record_match_pandas_df(df, payload['record']) - - record, index = _split_record_as_pandas_record_and_index(payload['record'], df.index.names) - - if isinstance(df.index, pandas.RangeIndex): - new_df = pandas.DataFrame([record]) - return pandas.concat([df, new_df], ignore_index=True) - else: - new_df = pandas.DataFrame([record], index=[index]) - return pandas.concat([df, new_df]) - - @staticmethod - def record_update(df: 'pandas.DataFrame', payload: DataframeRecordUpdated) -> 'pandas.DataFrame': - """ - >>> edf = EditableDataframe(df) - >>> edf.record_update({"record_index": 12, "record": {"a": 1, "b": 2}}) - """ - import pandas - - _assert_record_match_pandas_df(df, payload['record']) - - record: dict - record, index = _split_record_as_pandas_record_and_index(payload['record'], df.index.names) - - record_index = payload['record_index'] - - if isinstance(df.index, pandas.RangeIndex): - df.iloc[record_index] = record # type: ignore - else: - df.iloc[record_index] = record # type: ignore - index_list = df.index.tolist() - index_list[record_index] = index - df.index = index_list # type: ignore - - return df - - @staticmethod - def record_remove(df: 'pandas.DataFrame', payload: DataframeRecordRemoved) -> 'pandas.DataFrame': - """ - >>> edf = EditableDataframe(df) - >>> edf.record_remove({"record_index": 12}) - """ - record_index: int = payload['record_index'] - idx = df.index[record_index] - df = df.drop(idx) - - return df - - @staticmethod - def pyarrow_table(df: 'pandas.DataFrame') -> pyarrow.Table: - """ - Serializes the dataframe into a pyarrow table - """ - table = pyarrow.Table.from_pandas(df=df) - return table - - -class PolarRecordProcessor(DataframeRecordProcessor): - """ - PolarRecordProcessor processes records from a polar dataframe saved into an EditableDataframe - - >>> df = polars.DataFrame({"a": [1, 2], "b": [3, 4]}) - >>> edf = EditableDataframe(df) - >>> edf.record_add({"record": {"a": 1, "b": 2}}) - """ - - @staticmethod - @import_failure(rvalue=False) - def match(df: Any) -> bool: - import polars - return True if isinstance(df, polars.DataFrame) else False - - @staticmethod - def record(df: 'polars.DataFrame', record_index: int) -> dict: - """ - - >>> edf = EditableDataframe(df) - >>> r = edf.record(1) - """ - record = {} - r = df[record_index] - for c in r.columns: - record[c] = df[record_index, c] - - return record - - - @staticmethod - def record_add(df: 'polars.DataFrame', payload: DataframeRecordAdded) -> 'polars.DataFrame': - _assert_record_match_polar_df(df, payload['record']) - - import polars - new_df = polars.DataFrame([payload['record']]) - return polars.concat([df, new_df]) - - @staticmethod - def record_update(df: 'polars.DataFrame', payload: DataframeRecordUpdated) -> 'polars.DataFrame': - # This implementation works but is not optimal. - # I didn't find a better way to update a record in polars - # - # https://github.com/pola-rs/polars/issues/5973 - _assert_record_match_polar_df(df, payload['record']) - - record = payload['record'] - record_index = payload['record_index'] - for r in record: - df[record_index, r] = record[r] - - return df - - @staticmethod - def record_remove(df: 'polars.DataFrame', payload: DataframeRecordRemoved) -> 'polars.DataFrame': - import polars - - record_index: int = payload['record_index'] - df_filtered = polars.concat([df[:record_index], df[record_index + 1:]]) - return df_filtered - - @staticmethod - def pyarrow_table(df: 'polars.DataFrame') -> pyarrow.Table: - """ - Serializes the dataframe into a pyarrow table - """ - import pyarrow.interchange - table: pyarrow.Table = pyarrow.interchange.from_dataframe(df) - return table - -class RecordListRecordProcessor(DataframeRecordProcessor): - """ - RecordListRecordProcessor processes records from a list of record saved into an EditableDataframe - - >>> df = [{"a": 1, "b": 2}, {"a": 3, "b": 4}] - >>> edf = EditableDataframe(df) - >>> edf.record_add({"record": {"a": 1, "b": 2}}) - """ - - @staticmethod - def match(df: Any) -> bool: - return True if isinstance(df, list) else False - - - @staticmethod - def record(df: List[Dict[str, Any]], record_index: int) -> dict: - """ - - >>> edf = EditableDataframe(df) - >>> r = edf.record(1) - """ - r = df[record_index] - return copy.copy(r) - - @staticmethod - def record_add(df: List[Dict[str, Any]], payload: DataframeRecordAdded) -> List[Dict[str, Any]]: - _assert_record_match_list_of_records(df, payload['record']) - df.append(payload['record']) - return df - - @staticmethod - def record_update(df: List[Dict[str, Any]], payload: DataframeRecordUpdated) -> List[Dict[str, Any]]: - _assert_record_match_list_of_records(df, payload['record']) - - record_index = payload['record_index'] - record = payload['record'] - - df[record_index] = record - return df - - @staticmethod - def record_remove(df: List[Dict[str, Any]], payload: DataframeRecordRemoved) -> List[Dict[str, Any]]: - del(df[payload['record_index']]) - return df - - @staticmethod - def pyarrow_table(df: List[Dict[str, Any]]) -> pyarrow.Table: - """ - Serializes the dataframe into a pyarrow table - """ - column_names = list(df[0].keys()) - columns = {key: [record[key] for record in df] for key in column_names} - - pyarrow_columns = {key: pyarrow.array(values) for key, values in columns.items()} - schema = pyarrow.schema([(key, pyarrow_columns[key].type) for key in pyarrow_columns]) - table = pyarrow.Table.from_arrays( - [pyarrow_columns[key] for key in column_names], - schema=schema - ) - - return table - -class EditableDataframe(MutableValue): - """ - Editable Dataframe makes it easier to process events from components - that modify a dataframe like the dataframe editor. - - >>> initial_state = wf.init_state({ - >>> "df": wf.EditableDataframe(df) - >>> }) - - Editable Dataframe is compatible with a pandas, thrillers or record list dataframe - """ - processors = [PandasRecordProcessor, PolarRecordProcessor, RecordListRecordProcessor] - - def __init__(self, df: Union['pandas.DataFrame', 'polars.DataFrame', List[dict]]): - super().__init__() - self._df = df - self.processor: Type[DataframeRecordProcessor] - for processor in self.processors: - if processor.match(self.df): - self.processor = processor - break - - if self.processor is None: - raise ValueError("The dataframe must be a pandas, polar Dataframe or a list of record") - - @property - def df(self) -> Union['pandas.DataFrame', 'polars.DataFrame', List[dict]]: - return self._df - - @df.setter - def df(self, value: Union['pandas.DataFrame', 'polars.DataFrame', List[dict]]) -> None: - self._df = value - self.mutate() - - def record_add(self, payload: DataframeRecordAdded) -> None: - """ - Adds a record to the dataframe - - >>> df = pandas.DataFrame({"a": [1, 2], "b": [3, 4]}) - >>> edf = EditableDataframe(df) - >>> edf.record_add({"record": {"a": 1, "b": 2}}) - """ - assert self.processor is not None - - self._df = self.processor.record_add(self.df, payload) - self.mutate() - - def record_update(self, payload: DataframeRecordUpdated) -> None: - """ - Updates a record in the dataframe - - The record must be complete otherwise an error is raised (ValueError). - It must a value for each index / column. - - >>> df = pandas.DataFrame({"a": [1, 2], "b": [3, 4]}) - >>> edf = EditableDataframe(df) - >>> edf.record_update({"record_index": 0, "record": {"a": 2, "b": 2}}) - """ - assert self.processor is not None - - self._df = self.processor.record_update(self.df, payload) - self.mutate() - - def record_remove(self, payload: DataframeRecordRemoved) -> None: - """ - Removes a record from the dataframe - - >>> df = pandas.DataFrame({"a": [1, 2], "b": [3, 4]}) - >>> edf = EditableDataframe(df) - >>> edf.record_remove({"record_index": 0}) - """ - assert self.processor is not None - - self._df = self.processor.record_remove(self.df, payload) - self.mutate() - - def pyarrow_table(self) -> pyarrow.Table: - """ - Serializes the dataframe into a pyarrow table - - This mechanism is used for serializing data for transmission to the frontend. - - >>> df = pandas.DataFrame({"a": [1, 2], "b": [3, 4]}) - >>> edf = EditableDataframe(df) - >>> pa_table = edf.pyarrow_table() - """ - assert self.processor is not None - - pa_table = self.processor.pyarrow_table(self.df) - return pa_table - - def record(self, record_index: int): - """ - Retrieves a specific record in dictionary form. - - :param record_index: - :return: - """ - assert self.processor is not None - - record = self.processor.record(self.df, record_index) - return record - S = TypeVar("S", bound=WriterState) def new_initial_state(klass: Type[S], raw_state: dict) -> S: @@ -2329,119 +1994,10 @@ def parse_state_variable_expression(p: str): return parts -def writer_event_handler_build_arguments(func: Callable, writer_args: dict) -> List[Any]: - """ - Constructs the list of arguments based on the signature of the function - which can be a handler or middleware. - - >>> def my_event_handler(state, context): - >>> yield - - >>> args = writer_event_handler_build_arguments(my_event_handler, {'state': {}, 'payload': {}, 'context': {"target": '11'}, 'session': None, 'ui': None}) - >>> [{}, {"target": '11'}] - - :param func: the function that will be called - :param writer_args: the possible arguments in writer (state, payload, ...) - """ - handler_args = inspect.getfullargspec(func).args - func_args = [] - for arg in handler_args: - if arg in writer_args: - func_args.append(writer_args[arg]) - - return func_args - - -def writer_event_handler_invoke(callable_handler: Callable, writer_args: dict) -> Any: - """ - Runs a handler based on its signature. - - If the handler is asynchronous, it is executed asynchronously. - If the handler only has certain parameters, only these are passed as arguments - - >>> def my_handler(state): - >>> state['a'] = 2 - >>> - >>> writer_event_handler_invoke(my_handler, {'state': {'a': 1}, 'payload': None, 'context': None, 'session': None, 'ui': None}) - """ - is_async_handler = inspect.iscoroutinefunction(callable_handler) - if (not callable(callable_handler) and not is_async_handler): - raise ValueError("Invalid handler. The handler isn't a callable object.") - - handler_args = writer_event_handler_build_arguments(callable_handler, writer_args) - - if is_async_handler: - async_wrapper = _async_wrapper_internal(callable_handler, handler_args) - result = asyncio.run(async_wrapper) - else: - result = callable_handler(*handler_args) - - return result - -def writer_event_handler_invoke_with_middlewares(middlewares_executors: List[MiddlewareExecutor], callable_handler: Callable, writer_args: dict) -> Any: - """ - Runs the middlewares then the handler. This function allows you to manage exceptions that are triggered in middleware - - :param middlewares_executors: The list of middleware to run - :param callable_handler: The target handler - - >>> @wf.middleware() - >>> def my_middleware(state, payload, context, session, ui): - >>> yield - - >>> executor = MiddlewareExecutor(my_middleware, {'state': {}, 'payload': None, 'context': None, 'session': None, 'ui': None}) - >>> writer_event_handler_invoke_with_middlewares([executor], my_handler, {'state': {}, 'payload': None, 'context': None, 'session': None, 'ui': None} - """ - if len(middlewares_executors) == 0: - return writer_event_handler_invoke(callable_handler, writer_args) - else: - executor = middlewares_executors[0] - with executor.execute(writer_args): - return writer_event_handler_invoke_with_middlewares(middlewares_executors[1:], callable_handler, writer_args) - - async def _async_wrapper_internal(callable_handler: Callable, arg_values: List[Any]) -> Any: result = await callable_handler(*arg_values) return result -def _assert_record_match_pandas_df(df: 'pandas.DataFrame', record: Dict[str, Any]) -> None: - """ - Asserts that the record matches the dataframe columns & index - - >>> _assert_record_match_pandas_df(pandas.DataFrame({"a": [1, 2], "b": [3, 4]}), {"a": 1, "b": 2}) - """ - import pandas - - columns = set(list(df.columns.values) + df.index.names) if isinstance(df.index, pandas.RangeIndex) is False else set(df.columns.values) - columns_record = set(record.keys()) - if columns != columns_record: - raise ValueError(f"Columns mismatch. Expected {columns}, got {columns_record}") - -def _assert_record_match_polar_df(df: 'polars.DataFrame', record: Dict[str, Any]) -> None: - """ - Asserts that the record matches the columns of polar dataframe - - >>> _assert_record_match_pandas_df(polars.DataFrame({"a": [1, 2], "b": [3, 4]}), {"a": 1, "b": 2}) - """ - columns = set(df.columns) - columns_record = set(record.keys()) - if columns != columns_record: - raise ValueError(f"Columns mismatch. Expected {columns}, got {columns_record}") - -def _assert_record_match_list_of_records(df: List[Dict[str, Any]], record: Dict[str, Any]) -> None: - """ - Asserts that the record matches the key in the record list (it use the first record to check) - - >>> _assert_record_match_list_of_records([{"a": 1, "b": 2}, {"a": 3, "b": 4}], {"a": 1, "b": 2}) - """ - if len(df) == 0: - return - - columns = set(df[0].keys()) - columns_record = set(record.keys()) - if columns != columns_record: - raise ValueError(f"Columns mismatch. Expected {columns}, got {columns_record}") - def _event_handler_session_info() -> Dict[str, Any]: """ Returns the session information for the current event handler. @@ -2467,26 +2023,6 @@ def _event_handler_ui_manager(): else: raise RuntimeError(_get_ui_runtime_error_message()) - -def _split_record_as_pandas_record_and_index(param: dict, index_columns: list) -> Tuple[dict, tuple]: - """ - Separates a record into the record part and the index part to be able to - create or update a row in a dataframe. - - >>> record, index = _split_record_as_pandas_record_and_index({"a": 1, "b": 2}, ["a"]) - >>> print(record) # {"b": 2} - >>> print(index) # (1,) - """ - final_record = {} - final_index = [] - for key, value in param.items(): - if key in index_columns: - final_index.append(value) - else: - final_record[key] = value - - return final_record, tuple(final_index) - def _deserialize_bigint_format(payload: Optional[Union[dict, list]]): """ Decodes the payload of a big int serialization diff --git a/src/writer/core_df.py b/src/writer/core_df.py new file mode 100644 index 000000000..f5e694798 --- /dev/null +++ b/src/writer/core_df.py @@ -0,0 +1,479 @@ +""" +`core_df` contains classes and functions that allow you to manipulate editable dataframes. +""" +import copy +from abc import ABCMeta +from typing import TYPE_CHECKING, Any, Dict, List, Tuple, Type, Union + +import pyarrow # type: ignore + +from .core import MutableValue, import_failure +from .ss_types import DataframeRecordAdded, DataframeRecordRemoved, DataframeRecordUpdated + +if TYPE_CHECKING: + import pandas + import polars + + +class DataframeRecordProcessor(): + """ + This interface defines the signature of the methods to process the events of a + dataframe compatible with EditableDataframe. + + A Dataframe can be any structure composed of tabular data. + + This class defines the signature of the methods to be implemented. + """ + __metaclass__ = ABCMeta + + @staticmethod + def match(df: Any) -> bool: + """ + This method checks if the dataframe is compatible with the processor. + """ + raise NotImplementedError + + @staticmethod + def record(df: Any, record_index: int) -> dict: + """ + This method read a record at the given line and get it back as dictionary + + >>> edf = EditableDataframe(df) + >>> r = edf.record(1) + """ + raise NotImplementedError + + @staticmethod + def record_add(df: Any, payload: DataframeRecordAdded) -> Any: + """ + signature of the methods to be implemented to process wf-dataframe-add event + + >>> edf = EditableDataframe(df) + >>> edf.record_add({"record": {"a": 1, "b": 2}}) + """ + raise NotImplementedError + + @staticmethod + def record_update(df: Any, payload: DataframeRecordUpdated) -> Any: + """ + signature of the methods to be implemented to process wf-dataframe-update event + + >>> edf = EditableDataframe(df) + >>> edf.record_update({"record_index": 12, "record": {"a": 1, "b": 2}}) + """ + raise NotImplementedError + + @staticmethod + def record_remove(df: Any, payload: DataframeRecordRemoved) -> Any: + """ + signature of the methods to be implemented to process wf-dataframe-action event + + >>> edf = EditableDataframe(df) + >>> edf.record_remove({"record_index": 12}) + """ + raise NotImplementedError + + @staticmethod + def pyarrow_table(df: Any) -> pyarrow.Table: + """ + Serializes the dataframe into a pyarrow table + """ + raise NotImplementedError + + +class PandasRecordProcessor(DataframeRecordProcessor): + """ + PandasRecordProcessor processes records from a pandas dataframe saved into an EditableDataframe + + >>> df = pandas.DataFrame({"a": [1, 2], "b": [3, 4]}) + >>> edf = EditableDataframe(df) + >>> edf.record_add({"a": 1, "b": 2}) + """ + + @staticmethod + @import_failure(rvalue=False) + def match(df: Any) -> bool: + import pandas + return True if isinstance(df, pandas.DataFrame) else False + + @staticmethod + def record(df: 'pandas.DataFrame', record_index: int) -> dict: + """ + + >>> edf = EditableDataframe(df) + >>> r = edf.record(1) + """ + import pandas + + record = df.iloc[record_index] + if not isinstance(df.index, pandas.RangeIndex): + index_list = df.index.tolist() + record_index_content = index_list[record_index] + if isinstance(record_index_content, tuple): + for i, n in enumerate(df.index.names): + record[n] = record_index_content[i] + else: + record[df.index.names[0]] = record_index_content + + return dict(record) + + @staticmethod + def record_add(df: 'pandas.DataFrame', payload: DataframeRecordAdded) -> 'pandas.DataFrame': + """ + >>> edf = EditableDataframe(df) + >>> edf.record_add({"record": {"a": 1, "b": 2}}) + """ + import pandas + + _assert_record_match_pandas_df(df, payload['record']) + + record, index = _split_record_as_pandas_record_and_index(payload['record'], df.index.names) + + if isinstance(df.index, pandas.RangeIndex): + new_df = pandas.DataFrame([record]) + return pandas.concat([df, new_df], ignore_index=True) + else: + new_df = pandas.DataFrame([record], index=[index]) + return pandas.concat([df, new_df]) + + @staticmethod + def record_update(df: 'pandas.DataFrame', payload: DataframeRecordUpdated) -> 'pandas.DataFrame': + """ + >>> edf = EditableDataframe(df) + >>> edf.record_update({"record_index": 12, "record": {"a": 1, "b": 2}}) + """ + import pandas + + _assert_record_match_pandas_df(df, payload['record']) + + record: dict + record, index = _split_record_as_pandas_record_and_index(payload['record'], df.index.names) + + record_index = payload['record_index'] + + if isinstance(df.index, pandas.RangeIndex): + df.iloc[record_index] = record # type: ignore + else: + df.iloc[record_index] = record # type: ignore + index_list = df.index.tolist() + index_list[record_index] = index + df.index = index_list # type: ignore + + return df + + @staticmethod + def record_remove(df: 'pandas.DataFrame', payload: DataframeRecordRemoved) -> 'pandas.DataFrame': + """ + >>> edf = EditableDataframe(df) + >>> edf.record_remove({"record_index": 12}) + """ + record_index: int = payload['record_index'] + idx = df.index[record_index] + df = df.drop(idx) + + return df + + @staticmethod + def pyarrow_table(df: 'pandas.DataFrame') -> pyarrow.Table: + """ + Serializes the dataframe into a pyarrow table + """ + table = pyarrow.Table.from_pandas(df=df) + return table + + +class PolarRecordProcessor(DataframeRecordProcessor): + """ + PolarRecordProcessor processes records from a polar dataframe saved into an EditableDataframe + + >>> df = polars.DataFrame({"a": [1, 2], "b": [3, 4]}) + >>> edf = EditableDataframe(df) + >>> edf.record_add({"record": {"a": 1, "b": 2}}) + """ + + @staticmethod + @import_failure(rvalue=False) + def match(df: Any) -> bool: + import polars + return True if isinstance(df, polars.DataFrame) else False + + @staticmethod + def record(df: 'polars.DataFrame', record_index: int) -> dict: + """ + + >>> edf = EditableDataframe(df) + >>> r = edf.record(1) + """ + record = {} + r = df[record_index] + for c in r.columns: + record[c] = df[record_index, c] + + return record + + + @staticmethod + def record_add(df: 'polars.DataFrame', payload: DataframeRecordAdded) -> 'polars.DataFrame': + _assert_record_match_polar_df(df, payload['record']) + + import polars + new_df = polars.DataFrame([payload['record']]) + return polars.concat([df, new_df]) + + @staticmethod + def record_update(df: 'polars.DataFrame', payload: DataframeRecordUpdated) -> 'polars.DataFrame': + # This implementation works but is not optimal. + # I didn't find a better way to update a record in polars + # + # https://github.com/pola-rs/polars/issues/5973 + _assert_record_match_polar_df(df, payload['record']) + + record = payload['record'] + record_index = payload['record_index'] + for r in record: + df[record_index, r] = record[r] + + return df + + @staticmethod + def record_remove(df: 'polars.DataFrame', payload: DataframeRecordRemoved) -> 'polars.DataFrame': + import polars + + record_index: int = payload['record_index'] + df_filtered = polars.concat([df[:record_index], df[record_index + 1:]]) + return df_filtered + + @staticmethod + def pyarrow_table(df: 'polars.DataFrame') -> pyarrow.Table: + """ + Serializes the dataframe into a pyarrow table + """ + import pyarrow.interchange # type: ignore + table: pyarrow.Table = pyarrow.interchange.from_dataframe(df) + return table + +class RecordListRecordProcessor(DataframeRecordProcessor): + """ + RecordListRecordProcessor processes records from a list of record saved into an EditableDataframe + + >>> df = [{"a": 1, "b": 2}, {"a": 3, "b": 4}] + >>> edf = EditableDataframe(df) + >>> edf.record_add({"record": {"a": 1, "b": 2}}) + """ + + @staticmethod + def match(df: Any) -> bool: + return True if isinstance(df, list) else False + + + @staticmethod + def record(df: List[Dict[str, Any]], record_index: int) -> dict: + """ + + >>> edf = EditableDataframe(df) + >>> r = edf.record(1) + """ + r = df[record_index] + return copy.copy(r) + + @staticmethod + def record_add(df: List[Dict[str, Any]], payload: DataframeRecordAdded) -> List[Dict[str, Any]]: + _assert_record_match_list_of_records(df, payload['record']) + df.append(payload['record']) + return df + + @staticmethod + def record_update(df: List[Dict[str, Any]], payload: DataframeRecordUpdated) -> List[Dict[str, Any]]: + _assert_record_match_list_of_records(df, payload['record']) + + record_index = payload['record_index'] + record = payload['record'] + + df[record_index] = record + return df + + @staticmethod + def record_remove(df: List[Dict[str, Any]], payload: DataframeRecordRemoved) -> List[Dict[str, Any]]: + del(df[payload['record_index']]) + return df + + @staticmethod + def pyarrow_table(df: List[Dict[str, Any]]) -> pyarrow.Table: + """ + Serializes the dataframe into a pyarrow table + """ + column_names = list(df[0].keys()) + columns = {key: [record[key] for record in df] for key in column_names} + + pyarrow_columns = {key: pyarrow.array(values) for key, values in columns.items()} + schema = pyarrow.schema([(key, pyarrow_columns[key].type) for key in pyarrow_columns]) + table = pyarrow.Table.from_arrays( + [pyarrow_columns[key] for key in column_names], + schema=schema + ) + + return table + +class EditableDataframe(MutableValue): + """ + Editable Dataframe makes it easier to process events from components + that modify a dataframe like the dataframe editor. + + >>> initial_state = wf.init_state({ + >>> "df": wf.EditableDataframe(df) + >>> }) + + Editable Dataframe is compatible with a pandas, thrillers or record list dataframe + """ + processors = [PandasRecordProcessor, PolarRecordProcessor, RecordListRecordProcessor] + + def __init__(self, df: Union['pandas.DataFrame', 'polars.DataFrame', List[dict]]): + super().__init__() + self._df = df + self.processor: Type[DataframeRecordProcessor] + for processor in self.processors: + if processor.match(self.df): + self.processor = processor + break + + if self.processor is None: + raise ValueError("The dataframe must be a pandas, polar Dataframe or a list of record") + + @property + def df(self) -> Union['pandas.DataFrame', 'polars.DataFrame', List[dict]]: + return self._df + + @df.setter + def df(self, value: Union['pandas.DataFrame', 'polars.DataFrame', List[dict]]) -> None: + self._df = value + self.mutate() + + def record_add(self, payload: DataframeRecordAdded) -> None: + """ + Adds a record to the dataframe + + >>> df = pandas.DataFrame({"a": [1, 2], "b": [3, 4]}) + >>> edf = EditableDataframe(df) + >>> edf.record_add({"record": {"a": 1, "b": 2}}) + """ + assert self.processor is not None + + self._df = self.processor.record_add(self.df, payload) + self.mutate() + + def record_update(self, payload: DataframeRecordUpdated) -> None: + """ + Updates a record in the dataframe + + The record must be complete otherwise an error is raised (ValueError). + It must a value for each index / column. + + >>> df = pandas.DataFrame({"a": [1, 2], "b": [3, 4]}) + >>> edf = EditableDataframe(df) + >>> edf.record_update({"record_index": 0, "record": {"a": 2, "b": 2}}) + """ + assert self.processor is not None + + self._df = self.processor.record_update(self.df, payload) + self.mutate() + + def record_remove(self, payload: DataframeRecordRemoved) -> None: + """ + Removes a record from the dataframe + + >>> df = pandas.DataFrame({"a": [1, 2], "b": [3, 4]}) + >>> edf = EditableDataframe(df) + >>> edf.record_remove({"record_index": 0}) + """ + assert self.processor is not None + + self._df = self.processor.record_remove(self.df, payload) + self.mutate() + + def pyarrow_table(self) -> pyarrow.Table: + """ + Serializes the dataframe into a pyarrow table + + This mechanism is used for serializing data for transmission to the frontend. + + >>> df = pandas.DataFrame({"a": [1, 2], "b": [3, 4]}) + >>> edf = EditableDataframe(df) + >>> pa_table = edf.pyarrow_table() + """ + assert self.processor is not None + + pa_table = self.processor.pyarrow_table(self.df) + return pa_table + + def record(self, record_index: int): + """ + Retrieves a specific record in dictionary form. + + :param record_index: + :return: + """ + assert self.processor is not None + + record = self.processor.record(self.df, record_index) + return record + + + +def _assert_record_match_pandas_df(df: 'pandas.DataFrame', record: Dict[str, Any]) -> None: + """ + Asserts that the record matches the dataframe columns & index + + >>> _assert_record_match_pandas_df(pandas.DataFrame({"a": [1, 2], "b": [3, 4]}), {"a": 1, "b": 2}) + """ + import pandas + + columns = set(list(df.columns.values) + df.index.names) if isinstance(df.index, pandas.RangeIndex) is False else set(df.columns.values) + columns_record = set(record.keys()) + if columns != columns_record: + raise ValueError(f"Columns mismatch. Expected {columns}, got {columns_record}") + +def _assert_record_match_polar_df(df: 'polars.DataFrame', record: Dict[str, Any]) -> None: + """ + Asserts that the record matches the columns of polar dataframe + + >>> _assert_record_match_pandas_df(polars.DataFrame({"a": [1, 2], "b": [3, 4]}), {"a": 1, "b": 2}) + """ + columns = set(df.columns) + columns_record = set(record.keys()) + if columns != columns_record: + raise ValueError(f"Columns mismatch. Expected {columns}, got {columns_record}") + +def _assert_record_match_list_of_records(df: List[Dict[str, Any]], record: Dict[str, Any]) -> None: + """ + Asserts that the record matches the key in the record list (it use the first record to check) + + >>> _assert_record_match_list_of_records([{"a": 1, "b": 2}, {"a": 3, "b": 4}], {"a": 1, "b": 2}) + """ + if len(df) == 0: + return + + columns = set(df[0].keys()) + columns_record = set(record.keys()) + if columns != columns_record: + raise ValueError(f"Columns mismatch. Expected {columns}, got {columns_record}") + + + +def _split_record_as_pandas_record_and_index(param: dict, index_columns: list) -> Tuple[dict, tuple]: + """ + Separates a record into the record part and the index part to be able to + create or update a row in a dataframe. + + >>> record, index = _split_record_as_pandas_record_and_index({"a": 1, "b": 2}, ["a"]) + >>> print(record) # {"b": 2} + >>> print(index) # (1,) + """ + final_record = {} + final_index = [] + for key, value in param.items(): + if key in index_columns: + final_index.append(value) + else: + final_record[key] = value + + return final_record, tuple(final_index) From a8e4233168b6125d654696013fa3a6a4874e0dfa Mon Sep 17 00:00:00 2001 From: Ramiro Medina <64783088+ramedina86@users.noreply.github.com> Date: Tue, 19 Nov 2024 10:49:20 +0000 Subject: [PATCH 09/20] fix: Mypy checks --- src/writer/core.py | 12 ++++++++---- src/writer/evaluator.py | 6 ++---- src/writer/workflows.py | 9 ++++----- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/writer/core.py b/src/writer/core.py index f0391ac2f..823c6f98a 100644 --- a/src/writer/core.py +++ b/src/writer/core.py @@ -41,8 +41,7 @@ import pyarrow # type: ignore -import writer.workflows -from writer import core_ui, evaluator +from writer import core_ui from writer.core_ui import Component from writer.ss_types import ( InstancePath, @@ -1198,7 +1197,9 @@ class EventDeserialiser: applying sanitisation of inputs where relevant.""" def __init__(self, session: "WriterSession"): - self.evaluator = evaluator.Evaluator(session) + import writer.evaluator + + self.evaluator = writer.evaluator.Evaluator(session) def transform(self, ev: WriterEvent) -> None: # Events without payloads are safe @@ -1569,11 +1570,14 @@ class EventHandler: """ def __init__(self, session: WriterSession) -> None: + import writer.workflows + import writer.evaluator + self.session = session self.session_state = session.session_state self.session_component_tree = session.session_component_tree self.deser = EventDeserialiser(session) - self.evaluator = evaluator.Evaluator(session) + self.evaluator = writer.evaluator.Evaluator(session) self.workflow_runner = writer.workflows.WorkflowRunner(session) diff --git a/src/writer/evaluator.py b/src/writer/evaluator.py index 3a3e87937..733a4c973 100644 --- a/src/writer/evaluator.py +++ b/src/writer/evaluator.py @@ -1,7 +1,7 @@ import json import os import re -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple from writer.ss_types import ( InstancePath, @@ -9,9 +9,7 @@ import writer.core - -if TYPE_CHECKING: - from writer.core import WriterSession +from writer.core import WriterSession class Evaluator: diff --git a/src/writer/workflows.py b/src/writer/workflows.py index c1cc6b7ed..860a1ee3c 100644 --- a/src/writer/workflows.py +++ b/src/writer/workflows.py @@ -1,7 +1,6 @@ import time -from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Tuple, Type +from typing import Any, Dict, List, Literal, Optional, Tuple, Type -import writer.core from writer.ss_types import WorkflowExecutionLog from typing import Dict, List from writer.ss_types import InstancePath @@ -9,12 +8,12 @@ WorkflowBlock_T = Type["WorkflowBlock"] block_map:Dict[str, WorkflowBlock_T] = {} -if TYPE_CHECKING: - from writer.core import WriterSession, Component, Config +import writer.core +from writer.core import WriterSession, Component class WorkflowRunner(): - def __init__(self, session: "WriterSession"): + def __init__(self, session: WriterSession): import writer.blocks self.execution: Dict[str, WorkflowBlock] = {} From d32c0d3beed1f12c9893782ef2968e28d8a7a92a Mon Sep 17 00:00:00 2001 From: Ramiro Medina <64783088+ramedina86@users.noreply.github.com> Date: Tue, 19 Nov 2024 11:54:55 +0000 Subject: [PATCH 10/20] fix: Ruff --- src/writer/core.py | 2 +- src/writer/evaluator.py | 5 ++--- src/writer/workflows.py | 8 +++----- tests/backend/blocks/test_parsejson.py | 1 + tests/backend/test_evaluator.py | 4 +--- 5 files changed, 8 insertions(+), 12 deletions(-) diff --git a/src/writer/core.py b/src/writer/core.py index 823c6f98a..822c98b9c 100644 --- a/src/writer/core.py +++ b/src/writer/core.py @@ -1570,8 +1570,8 @@ class EventHandler: """ def __init__(self, session: WriterSession) -> None: - import writer.workflows import writer.evaluator + import writer.workflows self.session = session self.session_state = session.session_state diff --git a/src/writer/evaluator.py b/src/writer/evaluator.py index 733a4c973..68e2dbca5 100644 --- a/src/writer/evaluator.py +++ b/src/writer/evaluator.py @@ -3,14 +3,13 @@ import re from typing import Any, Dict, List, Optional, Tuple +import writer.core +from writer.core import WriterSession from writer.ss_types import ( InstancePath, ) -import writer.core -from writer.core import WriterSession - class Evaluator: """ diff --git a/src/writer/workflows.py b/src/writer/workflows.py index 860a1ee3c..55e3fd939 100644 --- a/src/writer/workflows.py +++ b/src/writer/workflows.py @@ -1,15 +1,13 @@ import time from typing import Any, Dict, List, Literal, Optional, Tuple, Type -from writer.ss_types import WorkflowExecutionLog -from typing import Dict, List -from writer.ss_types import InstancePath +import writer.core +from writer.core import Component, WriterSession +from writer.ss_types import InstancePath, WorkflowExecutionLog WorkflowBlock_T = Type["WorkflowBlock"] block_map:Dict[str, WorkflowBlock_T] = {} -import writer.core -from writer.core import WriterSession, Component class WorkflowRunner(): diff --git a/tests/backend/blocks/test_parsejson.py b/tests/backend/blocks/test_parsejson.py index eda8ced03..e7eefca62 100644 --- a/tests/backend/blocks/test_parsejson.py +++ b/tests/backend/blocks/test_parsejson.py @@ -1,5 +1,6 @@ from writer.blocks.parsejson import ParseJSON + def test_valid_json(): assert 1 == 1 # tool = ParseJSON() \ No newline at end of file diff --git a/tests/backend/test_evaluator.py b/tests/backend/test_evaluator.py index 539d4d3ff..d2888cdd8 100644 --- a/tests/backend/test_evaluator.py +++ b/tests/backend/test_evaluator.py @@ -2,8 +2,7 @@ import numpy as np import writer as wf -from writer import audit_and_fix, wf_project -from writer import evaluator +from writer import audit_and_fix, evaluator, wf_project from writer.core import ( WriterState, ) @@ -147,7 +146,6 @@ def test_get_context_data_should_return_the_repeater_position_and_the_target_ins Here we reproduce a click on a button """ # Given - st = WriterState({}) session.session_component_tree = core_ui_fixtures.build_fake_component_tree([ Component(id="repeater1", parentId="root", type="repeater", content={'keyVariable': 'item', 'valueVariable': 'value', 'repeaterObject': json.dumps({'a': 'A', 'b': 'B'})}), Component(id="button1", parentId="repeater1", type="button") From 18794a08281454c5705966de0575109f6bcbae42 Mon Sep 17 00:00:00 2001 From: Ramiro Medina <64783088+ramedina86@users.noreply.github.com> Date: Tue, 19 Nov 2024 12:13:39 +0000 Subject: [PATCH 11/20] test: E2E workflows test --- tests/e2e/tests/workflows.spec.ts | 108 ++++++++++++++++++++++++------ 1 file changed, 87 insertions(+), 21 deletions(-) diff --git a/tests/e2e/tests/workflows.spec.ts b/tests/e2e/tests/workflows.spec.ts index 2f145c73c..41237c906 100644 --- a/tests/e2e/tests/workflows.spec.ts +++ b/tests/e2e/tests/workflows.spec.ts @@ -1,22 +1,15 @@ import { test, expect } from "@playwright/test"; -const setTextField = async (page, text) => { - await page.locator('div.CoreText.component').click(); - await page - .locator('.BuilderFieldsText[data-automation-key="text"] .templateInput') - .fill(text); -} - test.describe("Workflows", () => { let url: string; - test.beforeAll(async ({request}) => { + test.beforeAll(async ({ request }) => { const response = await request.post(`/preset/workflows`); expect(response.ok()).toBeTruthy(); - ({url} = await response.json()); + ({ url } = await response.json()); }); - test.afterAll(async ({request}) => { + test.afterAll(async ({ request }) => { await request.delete(url); }); @@ -24,19 +17,92 @@ test.describe("Workflows", () => { await page.goto(url); }); - test.describe("Payload and context", () => { - - const instancePaths = ["root:0,c0f99a9e-5004-4e75-a6c6-36f17490b134:0,ixxb26ukbvr0sknw:0,iftqnmjw8ipaknex:0,7no34ag7gmwgm1rd:0", "root:0,c0f99a9e-5004-4e75-a6c6-36f17490b134:0,ixxb26ukbvr0sknw:0,iftqnmjw8ipaknex:0,7no34ag7gmwgm1rd:0"]; + const inputData = [ + { object: "plant", color: "green" }, + { object: "cup", color: "pink" }, + ]; - test("completion", async ({ page }) => { - page.locator('.BuilderFieldsText[data-automation-key="text"] .fieldStateAutocomplete span.prop:text-matches("string")').click(); - - await setTextField(page, "@{types."); - await expect(page - .locator('.BuilderFieldsText[data-automation-key="text"] .templateInput')) - .toHaveValue("@{types.string"); + inputData.forEach(({ object, color }) => { + test(`Test context and payload in Workflows for ${object} ${color}`, async ({ + page, + }) => { + await page.getByPlaceholder(object).fill(color); + await page.locator(`[data-automation-action="toggle-log-panel"]`).click(); + const rowLocator = page + .locator(".BuilderLogPanel div.row") + .filter({ hasText: "Return value" }); + await rowLocator.getByRole("button", { name: "Details" }).click(); + const resultsLocator = page.locator( + `.BuilderModal [data-automation-key="result"]`, + ); + const returnValueLocator = page.locator( + `.BuilderModal [data-automation-key="return-value"]`, + ); + const expectedTexts = ["color", color, "object", object]; + expectedTexts.forEach( + async (text) => await expect(resultsLocator).toContainText(text), + ); + expectedTexts.forEach( + async (text) => await expect(returnValueLocator).toContainText(text), + ); }); - }); + test("Create workflow and run workflow handle_object from it", async ({ + page, + }) => { + await page.locator(`[data-automation-action="set-mode-workflows"]`).click(); + await page.locator(`[data-automation-action="add-workflow"]`).click(); + + await page + .locator( + `div.component.button[data-component-type="workflows_runworkflow"]`, + ) + .dragTo(page.locator(".WorkflowsWorkflow"), { + targetPosition: { x: 100, y: 100 }, + }); + const runWorkflowBlock = page.locator(`.WorkflowsNode.wf-type-workflows_runworkflow`); + + await page + .locator( + `div.component.button[data-component-type="workflows_returnvalue"]`, + ) + .dragTo(page.locator(".WorkflowsWorkflow"), { + targetPosition: { x: 400, y: 100 }, + }); + const returnValueBlock = page.locator(`.WorkflowsNode.wf-type-workflows_returnvalue`); + + await runWorkflowBlock.click(); + await page.locator(`.BuilderFieldsText[data-automation-key="workflowKey"] input`).fill("handle_object"); + const executionEnv = { + "payload": "blue", + "context": { + "item": { + "object": "bottle" + } + } + }; + await page.locator(`.BuilderFieldsObject[data-automation-key="executionEnv"] textarea`).fill(JSON.stringify(executionEnv)); + await page.locator(`[data-automation-action="close-settings"]`).click(); + + await runWorkflowBlock.locator(".ball.success").dragTo(returnValueBlock); + + await returnValueBlock.click(); + await page.locator(`.BuilderFieldsText[data-automation-key="value"] textarea`).fill("@{result}"); + + await page.locator(`[data-automation-action="run-workflow"]`).click(); + + await page.locator(`[data-automation-action="toggle-log-panel"]`).click(); + const rowLocator = page + .locator(".BuilderLogPanel div.row") + .filter({ hasText: "Return value" }).first();; + await rowLocator.getByRole("button", { name: "Details" }).click(); + const returnValueLocator = page.locator( + `.BuilderModal [data-automation-key="return-value"]`, + ); + const expectedTexts = ["color", "blue", "object", "bottle"]; + expectedTexts.forEach( + async (text) => await expect(returnValueLocator).toContainText(text), + ); + }); }); \ No newline at end of file From 55d9954a180ca58e6f870ce9fae1ec6d49d7b492 Mon Sep 17 00:00:00 2001 From: Ramiro Medina <64783088+ramedina86@users.noreply.github.com> Date: Tue, 19 Nov 2024 12:13:50 +0000 Subject: [PATCH 12/20] test: Workflows test app --- ...components-page-0-c0f99a9e-5004-4e75-a6c6-36f17490b134.jsonl | 2 +- .../workflows/.wf/components-page-1-i9io5f734z9esrxs.jsonl | 2 -- .../.wf/components-workflows_workflow-0-auxjfi7lssb268ly.jsonl | 2 +- tests/e2e/presets/workflows/.wf/metadata.json | 2 +- 4 files changed, 3 insertions(+), 5 deletions(-) delete mode 100644 tests/e2e/presets/workflows/.wf/components-page-1-i9io5f734z9esrxs.jsonl diff --git a/tests/e2e/presets/workflows/.wf/components-page-0-c0f99a9e-5004-4e75-a6c6-36f17490b134.jsonl b/tests/e2e/presets/workflows/.wf/components-page-0-c0f99a9e-5004-4e75-a6c6-36f17490b134.jsonl index 110c00bf7..ee3033be5 100644 --- a/tests/e2e/presets/workflows/.wf/components-page-0-c0f99a9e-5004-4e75-a6c6-36f17490b134.jsonl +++ b/tests/e2e/presets/workflows/.wf/components-page-0-c0f99a9e-5004-4e75-a6c6-36f17490b134.jsonl @@ -2,4 +2,4 @@ {"id": "bebc5fe9-63a7-46a7-b0fa-62303555cfaf", "type": "header", "content": {"text": "Workflows Test App"}, "handlers": {}, "isCodeManaged": false, "parentId": "c0f99a9e-5004-4e75-a6c6-36f17490b134", "position": 0, "visible": {"binding": "", "expression": true, "reversed": false}} {"id": "ixxb26ukbvr0sknw", "type": "repeater", "content": {"keyVariable": "itemId", "repeaterObject": "{ \"pl\": { \"object\": \"plant\" }, \"cu\": { \"object\": \"cup\" }}", "valueVariable": "item"}, "handlers": {}, "isCodeManaged": false, "parentId": "c0f99a9e-5004-4e75-a6c6-36f17490b134", "position": 1} {"id": "iftqnmjw8ipaknex", "type": "section", "content": {"title": "@{itemId}: @{item.object}"}, "handlers": {}, "isCodeManaged": false, "parentId": "ixxb26ukbvr0sknw", "position": 0} -{"id": "7no34ag7gmwgm1rd", "type": "textinput", "content": {"label": ""}, "handlers": {"wf-change": "$runWorkflow_handle_object"}, "isCodeManaged": false, "parentId": "iftqnmjw8ipaknex", "position": 0} +{"id": "7no34ag7gmwgm1rd", "type": "textinput", "content": {"label": "", "placeholder": "@{item.object}"}, "handlers": {"wf-change": "$runWorkflow_handle_object"}, "isCodeManaged": false, "parentId": "iftqnmjw8ipaknex", "position": 0} diff --git a/tests/e2e/presets/workflows/.wf/components-page-1-i9io5f734z9esrxs.jsonl b/tests/e2e/presets/workflows/.wf/components-page-1-i9io5f734z9esrxs.jsonl deleted file mode 100644 index 5a00a5f42..000000000 --- a/tests/e2e/presets/workflows/.wf/components-page-1-i9io5f734z9esrxs.jsonl +++ /dev/null @@ -1,2 +0,0 @@ -{"id": "i9io5f734z9esrxs", "type": "page", "content": {}, "handlers": {}, "isCodeManaged": false, "parentId": "root", "position": 1} -{"id": "16eeo21o6k8tcxf7", "type": "chatbot", "content": {"conversation": "@{convo}"}, "handlers": {"wf-chatbot-message": "$runWorkflow_handle_object"}, "isCodeManaged": false, "parentId": "i9io5f734z9esrxs", "position": 0} diff --git a/tests/e2e/presets/workflows/.wf/components-workflows_workflow-0-auxjfi7lssb268ly.jsonl b/tests/e2e/presets/workflows/.wf/components-workflows_workflow-0-auxjfi7lssb268ly.jsonl index 8a10d040c..4216ffb42 100644 --- a/tests/e2e/presets/workflows/.wf/components-workflows_workflow-0-auxjfi7lssb268ly.jsonl +++ b/tests/e2e/presets/workflows/.wf/components-workflows_workflow-0-auxjfi7lssb268ly.jsonl @@ -1,4 +1,4 @@ {"id": "auxjfi7lssb268ly", "type": "workflows_workflow", "content": {"key": "handle_object"}, "handlers": {}, "isCodeManaged": false, "parentId": "workflows_root", "position": 0} -{"id": "8y56lmia3wu99jhl", "type": "workflows_parsejson", "content": {"plainText": "{\"color\": \"@{payload}\", \"object\": \"@{context.item.object}\"}"}, "handlers": {}, "isCodeManaged": false, "outs": [{"toNodeId": "xy6vdzh2pm55alc0", "outId": "success"}], "parentId": "auxjfi7lssb268ly", "position": 0, "x": 208, "y": 321} +{"id": "8y56lmia3wu99jhl", "type": "workflows_parsejson", "content": {"plainText": "{\"color\": \"@{payload}\", \"object\": \"@{context.item.object}\"}"}, "handlers": {}, "isCodeManaged": false, "outs": [{"toNodeId": "xy6vdzh2pm55alc0", "outId": "success"}], "parentId": "auxjfi7lssb268ly", "position": 0, "x": 150, "y": 319} {"id": "xy6vdzh2pm55alc0", "type": "workflows_setstate", "content": {"alias": "Save the JSON", "element": "json_e2e", "value": "@{result}"}, "handlers": {}, "isCodeManaged": false, "outs": [{"toNodeId": "mve8ssvtk0pvw5yf", "outId": "success"}], "parentId": "auxjfi7lssb268ly", "position": 1, "x": 537, "y": 321} {"id": "mve8ssvtk0pvw5yf", "type": "workflows_returnvalue", "content": {"alias": "", "value": "@{json_e2e}"}, "handlers": {}, "isCodeManaged": false, "parentId": "auxjfi7lssb268ly", "position": 2, "x": 876, "y": 317} diff --git a/tests/e2e/presets/workflows/.wf/metadata.json b/tests/e2e/presets/workflows/.wf/metadata.json index f3c150d97..da9c77c7c 100644 --- a/tests/e2e/presets/workflows/.wf/metadata.json +++ b/tests/e2e/presets/workflows/.wf/metadata.json @@ -1,3 +1,3 @@ { - "writer_version": "0.8.0rc7" + "writer_version": "0.8.0rc3" } \ No newline at end of file From ab9f32014cda61c1dd5b8bae2a12071faaf0c87d Mon Sep 17 00:00:00 2001 From: Ramiro Medina <64783088+ramedina86@users.noreply.github.com> Date: Tue, 19 Nov 2024 13:27:44 +0100 Subject: [PATCH 13/20] chore: Tests and automation actions --- src/ui/src/builder/BuilderHeader.vue | 3 +- .../builder/BuilderLogWorkflowExecution.vue | 39 +++++++------------ src/ui/src/builder/BuilderSettings.vue | 2 +- src/ui/src/builder/BuilderSidebarTree.vue | 12 +++++- src/ui/src/builder/BuilderSwitcher.vue | 8 +++- .../workflows/WorkflowsWorkflow.vue | 1 + src/ui/src/core/auditAndFix.ts | 1 + tests/e2e/tests/reuse.spec.ts | 2 +- 8 files changed, 38 insertions(+), 30 deletions(-) diff --git a/src/ui/src/builder/BuilderHeader.vue b/src/ui/src/builder/BuilderHeader.vue index 09cc97e85..a2be0bf07 100644 --- a/src/ui/src/builder/BuilderHeader.vue +++ b/src/ui/src/builder/BuilderHeader.vue @@ -49,7 +49,7 @@ variant="primary" size="small" class="panelToggler" - title="Toggle Code panel" + data-automation-action="toggle-code-panel" :class="{ active: ssbm.openPanels.has('code') }" @click="togglePanel('code')" > @@ -60,6 +60,7 @@ variant="primary" size="small" class="panelToggler" + data-automation-action="toggle-log-panel" title="Toggle Log panel" :class="{ active: ssbm.openPanels.has('log') }" @click="togglePanel('log')" diff --git a/src/ui/src/builder/BuilderLogWorkflowExecution.vue b/src/ui/src/builder/BuilderLogWorkflowExecution.vue index 6a5a3f266..62171723c 100644 --- a/src/ui/src/builder/BuilderLogWorkflowExecution.vue +++ b/src/ui/src/builder/BuilderLogWorkflowExecution.vue @@ -31,10 +31,10 @@
-
+
@@ -68,12 +68,8 @@

@@ -89,10 +85,13 @@ determine the result of 'Run workflow' blocks and 'Chat completion' tool calls.

-
+
@@ -111,15 +110,7 @@ " variant="secondary" size="small" - @click=" - () => - (displayedDetails = { - executionEnvironment: - item.executionEnvironment, - result: item.result, - returnValue: item.returnValue, - }) - " + @click="() => (displayedItemId = itemId)" > find_in_page Details @@ -151,7 +142,7 @@ const { goToComponentParentPage } = useComponentActions(wf, wfbm); const props = defineProps<{ executionLog: WorkflowExecutionLog; }>(); -const displayedDetails = ref(null); +const displayedItemId = ref(null); type EnrichedExecutionLog = WorkflowExecutionLog & { summary: { diff --git a/src/ui/src/builder/BuilderSettings.vue b/src/ui/src/builder/BuilderSettings.vue index d514b7ed1..679253caf 100644 --- a/src/ui/src/builder/BuilderSettings.vue +++ b/src/ui/src/builder/BuilderSettings.vue @@ -19,7 +19,7 @@ class="windowAction" tabindex="0" title="Close (Esc)" - data-automation-action="close" + data-automation-action="close-settings" @click="closeSettings" > close diff --git a/src/ui/src/builder/BuilderSidebarTree.vue b/src/ui/src/builder/BuilderSidebarTree.vue index 9b41af95d..c03a028af 100644 --- a/src/ui/src/builder/BuilderSidebarTree.vue +++ b/src/ui/src/builder/BuilderSidebarTree.vue @@ -50,11 +50,19 @@
- - diff --git a/src/ui/src/builder/BuilderSwitcher.vue b/src/ui/src/builder/BuilderSwitcher.vue index 2b0e4ccc2..7a3cf597d 100644 --- a/src/ui/src/builder/BuilderSwitcher.vue +++ b/src/ui/src/builder/BuilderSwitcher.vue @@ -1,11 +1,16 @@