From a5f2b68b4d105d0a5a4d00047641fe86c6c98a40 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Mon, 27 Jan 2025 22:24:32 +0800 Subject: [PATCH] shu/get rid off observer cruise part3 (#1658) --- skyvern/forge/sdk/executor/async_executor.py | 2 +- skyvern/forge/sdk/routes/agent_protocol.py | 2 +- .../forge/sdk/services/observer_service.py | 46 +++++++++---------- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/skyvern/forge/sdk/executor/async_executor.py b/skyvern/forge/sdk/executor/async_executor.py index 342bdb80fc..cb80bf30b6 100644 --- a/skyvern/forge/sdk/executor/async_executor.py +++ b/skyvern/forge/sdk/executor/async_executor.py @@ -176,7 +176,7 @@ async def execute_cruise( if background_tasks: background_tasks.add_task( - observer_service.run_observer_cruise, + observer_service.run_observer_task, organization=organization, observer_cruise_id=observer_cruise_id, max_iterations_override=max_iterations_override, diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 2533e3db6e..df0b2fac84 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -1152,7 +1152,7 @@ async def observer_task( LOG.info("Overriding max iterations for observer", max_iterations_override=x_max_iterations_override) try: - observer_task = await observer_service.initialize_observer_cruise( + observer_task = await observer_service.initialize_observer_task( organization=organization, user_prompt=data.user_prompt, user_url=str(data.url) if data.url else None, diff --git a/skyvern/forge/sdk/services/observer_service.py b/skyvern/forge/sdk/services/observer_service.py index dc87db1d9d..4ea7ff5077 100644 --- a/skyvern/forge/sdk/services/observer_service.py +++ b/skyvern/forge/sdk/services/observer_service.py @@ -85,7 +85,7 @@ def _generate_data_extraction_schema_for_loop(loop_values_key: str) -> dict: } -async def initialize_observer_cruise( +async def initialize_observer_task( organization: Organization, user_prompt: str, user_url: str | None = None, @@ -95,7 +95,7 @@ async def initialize_observer_cruise( webhook_callback_url: str | None = None, publish_workflow: bool = False, ) -> ObserverTask: - observer_cruise = await app.DATABASE.create_observer_cruise( + observer_task = await app.DATABASE.create_observer_cruise( prompt=user_prompt, organization_id=organization.organization_id, totp_verification_url=totp_verification_url, @@ -106,10 +106,10 @@ async def initialize_observer_cruise( # set observer cruise id in context context = skyvern_context.current() if context: - context.observer_cruise_id = observer_cruise.observer_cruise_id + context.observer_cruise_id = observer_task.observer_cruise_id observer_thought = await app.DATABASE.create_observer_thought( - observer_cruise_id=observer_cruise.observer_cruise_id, + observer_cruise_id=observer_task.observer_cruise_id, organization_id=organization.organization_id, observer_thought_type=ObserverThoughtType.metadata, observer_thought_scenario=ObserverThoughtScenario.generate_metadata, @@ -173,8 +173,8 @@ async def initialize_observer_cruise( # update oserver cruise try: - observer_cruise = await app.DATABASE.update_observer_cruise( - observer_cruise_id=observer_cruise.observer_cruise_id, + observer_task = await app.DATABASE.update_observer_cruise( + observer_cruise_id=observer_task.observer_cruise_id, workflow_run_id=workflow_run.workflow_run_id, workflow_id=new_workflow.workflow_id, workflow_permanent_id=new_workflow.workflow_permanent_id, @@ -190,10 +190,10 @@ async def initialize_observer_cruise( ) raise - return observer_cruise + return observer_task -async def run_observer_cruise( +async def run_observer_task( organization: Organization, observer_cruise_id: str, request_id: str | None = None, @@ -210,14 +210,14 @@ async def run_observer_cruise( organization_id=organization_id, exc_info=True, ) - return await mark_observer_cruise_as_failed(observer_cruise_id, organization_id=organization_id) + return await mark_observer_task_as_failed(observer_cruise_id, organization_id=organization_id) if not observer_task: LOG.error("Observer cruise not found", observer_cruise_id=observer_cruise_id, organization_id=organization_id) raise ObserverCruiseNotFound(observer_cruise_id=observer_cruise_id) workflow, workflow_run = None, None try: - workflow, workflow_run, observer_task = await run_observer_cruise_helper( + workflow, workflow_run, observer_task = await run_observer_task_helper( organization=organization, observer_task=observer_task, request_id=request_id, @@ -226,7 +226,7 @@ async def run_observer_cruise( ) except OperationalError: LOG.error("Database error when running observer cruise", exc_info=True) - observer_task = await mark_observer_cruise_as_failed( + observer_task = await mark_observer_task_as_failed( observer_cruise_id, workflow_run_id=observer_task.workflow_run_id, failure_reason="Database error when running task 2.0", @@ -235,7 +235,7 @@ async def run_observer_cruise( except Exception as e: LOG.error("Failed to run observer cruise", exc_info=True) failure_reason = f"Failed to run task 2.0: {str(e)}" - observer_task = await mark_observer_cruise_as_failed( + observer_task = await mark_observer_task_as_failed( observer_cruise_id, workflow_run_id=observer_task.workflow_run_id, failure_reason=failure_reason, @@ -257,7 +257,7 @@ async def run_observer_cruise( return observer_task -async def run_observer_cruise_helper( +async def run_observer_task_helper( organization: Organization, observer_task: ObserverTask, request_id: str | None = None, @@ -419,7 +419,7 @@ async def run_observer_cruise_helper( iteration=i, workflow_run_id=workflow_run_id, ) - observer_task = await _summarize_observer_cruise( + observer_task = await _summarize_observer_task( observer_task=observer_task, task_history=task_history, context=context, @@ -610,7 +610,7 @@ async def run_observer_cruise_helper( workflow_run_id=workflow_run_id, completion_resp=completion_resp, ) - observer_task = await _summarize_observer_cruise( + observer_task = await _summarize_observer_task( observer_task=observer_task, task_history=task_history, context=context, @@ -623,7 +623,7 @@ async def run_observer_cruise_helper( max_iterations=max_iterations, workflow_run_id=workflow_run_id, ) - observer_task = await mark_observer_cruise_as_failed( + observer_task = await mark_observer_task_as_failed( observer_cruise_id=observer_cruise_id, workflow_run_id=workflow_run_id, # TODO: add a better failure reason with LLM @@ -1087,7 +1087,7 @@ async def get_observer_cruise(observer_cruise_id: str, organization_id: str | No return await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id) -async def mark_observer_cruise_as_failed( +async def mark_observer_task_as_failed( observer_cruise_id: str, workflow_run_id: str | None = None, failure_reason: str | None = None, @@ -1102,11 +1102,11 @@ async def mark_observer_cruise_as_failed( await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( workflow_run_id, failure_reason=failure_reason or "Skyvern task 2.0 failed" ) - await send_observer_cruise_webhook(observer_task) + await send_observer_task_webhook(observer_task) return observer_task -async def mark_observer_cruise_as_completed( +async def mark_observer_task_as_completed( observer_cruise_id: str, workflow_run_id: str | None = None, organization_id: str | None = None, @@ -1123,7 +1123,7 @@ async def mark_observer_cruise_as_completed( if workflow_run_id: await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id) - await send_observer_cruise_webhook(observer_task) + await send_observer_task_webhook(observer_task) return observer_task @@ -1196,7 +1196,7 @@ def _get_extracted_data_from_block_result( return None -async def _summarize_observer_cruise( +async def _summarize_observer_task( observer_task: ObserverTask, task_history: list[dict], context: SkyvernContext, @@ -1234,7 +1234,7 @@ async def _summarize_observer_cruise( output=observer_summary_resp, ) - return await mark_observer_cruise_as_completed( + return await mark_observer_task_as_completed( observer_cruise_id=observer_task.observer_cruise_id, workflow_run_id=observer_task.workflow_run_id, organization_id=observer_task.organization_id, @@ -1243,7 +1243,7 @@ async def _summarize_observer_cruise( ) -async def send_observer_cruise_webhook(observer_task: ObserverTask) -> None: +async def send_observer_task_webhook(observer_task: ObserverTask) -> None: if not observer_task.webhook_callback_url: return organization_id = observer_task.organization_id