Skip to content

Commit

Permalink
shu/get rid off observer cruise part3 (#1658)
Browse files Browse the repository at this point in the history
  • Loading branch information
wintonzheng authored Jan 27, 2025
1 parent b40c802 commit a5f2b68
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 25 deletions.
2 changes: 1 addition & 1 deletion skyvern/forge/sdk/executor/async_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ async def execute_cruise(

if background_tasks:
background_tasks.add_task(
observer_service.run_observer_cruise,
observer_service.run_observer_task,
organization=organization,
observer_cruise_id=observer_cruise_id,
max_iterations_override=max_iterations_override,
Expand Down
2 changes: 1 addition & 1 deletion skyvern/forge/sdk/routes/agent_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -1152,7 +1152,7 @@ async def observer_task(
LOG.info("Overriding max iterations for observer", max_iterations_override=x_max_iterations_override)

try:
observer_task = await observer_service.initialize_observer_cruise(
observer_task = await observer_service.initialize_observer_task(
organization=organization,
user_prompt=data.user_prompt,
user_url=str(data.url) if data.url else None,
Expand Down
46 changes: 23 additions & 23 deletions skyvern/forge/sdk/services/observer_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def _generate_data_extraction_schema_for_loop(loop_values_key: str) -> dict:
}


async def initialize_observer_cruise(
async def initialize_observer_task(
organization: Organization,
user_prompt: str,
user_url: str | None = None,
Expand All @@ -95,7 +95,7 @@ async def initialize_observer_cruise(
webhook_callback_url: str | None = None,
publish_workflow: bool = False,
) -> ObserverTask:
observer_cruise = await app.DATABASE.create_observer_cruise(
observer_task = await app.DATABASE.create_observer_cruise(
prompt=user_prompt,
organization_id=organization.organization_id,
totp_verification_url=totp_verification_url,
Expand All @@ -106,10 +106,10 @@ async def initialize_observer_cruise(
# set observer cruise id in context
context = skyvern_context.current()
if context:
context.observer_cruise_id = observer_cruise.observer_cruise_id
context.observer_cruise_id = observer_task.observer_cruise_id

observer_thought = await app.DATABASE.create_observer_thought(
observer_cruise_id=observer_cruise.observer_cruise_id,
observer_cruise_id=observer_task.observer_cruise_id,
organization_id=organization.organization_id,
observer_thought_type=ObserverThoughtType.metadata,
observer_thought_scenario=ObserverThoughtScenario.generate_metadata,
Expand Down Expand Up @@ -173,8 +173,8 @@ async def initialize_observer_cruise(

# update oserver cruise
try:
observer_cruise = await app.DATABASE.update_observer_cruise(
observer_cruise_id=observer_cruise.observer_cruise_id,
observer_task = await app.DATABASE.update_observer_cruise(
observer_cruise_id=observer_task.observer_cruise_id,
workflow_run_id=workflow_run.workflow_run_id,
workflow_id=new_workflow.workflow_id,
workflow_permanent_id=new_workflow.workflow_permanent_id,
Expand All @@ -190,10 +190,10 @@ async def initialize_observer_cruise(
)
raise

return observer_cruise
return observer_task


async def run_observer_cruise(
async def run_observer_task(
organization: Organization,
observer_cruise_id: str,
request_id: str | None = None,
Expand All @@ -210,14 +210,14 @@ async def run_observer_cruise(
organization_id=organization_id,
exc_info=True,
)
return await mark_observer_cruise_as_failed(observer_cruise_id, organization_id=organization_id)
return await mark_observer_task_as_failed(observer_cruise_id, organization_id=organization_id)
if not observer_task:
LOG.error("Observer cruise not found", observer_cruise_id=observer_cruise_id, organization_id=organization_id)
raise ObserverCruiseNotFound(observer_cruise_id=observer_cruise_id)

workflow, workflow_run = None, None
try:
workflow, workflow_run, observer_task = await run_observer_cruise_helper(
workflow, workflow_run, observer_task = await run_observer_task_helper(
organization=organization,
observer_task=observer_task,
request_id=request_id,
Expand All @@ -226,7 +226,7 @@ async def run_observer_cruise(
)
except OperationalError:
LOG.error("Database error when running observer cruise", exc_info=True)
observer_task = await mark_observer_cruise_as_failed(
observer_task = await mark_observer_task_as_failed(
observer_cruise_id,
workflow_run_id=observer_task.workflow_run_id,
failure_reason="Database error when running task 2.0",
Expand All @@ -235,7 +235,7 @@ async def run_observer_cruise(
except Exception as e:
LOG.error("Failed to run observer cruise", exc_info=True)
failure_reason = f"Failed to run task 2.0: {str(e)}"
observer_task = await mark_observer_cruise_as_failed(
observer_task = await mark_observer_task_as_failed(
observer_cruise_id,
workflow_run_id=observer_task.workflow_run_id,
failure_reason=failure_reason,
Expand All @@ -257,7 +257,7 @@ async def run_observer_cruise(
return observer_task


async def run_observer_cruise_helper(
async def run_observer_task_helper(
organization: Organization,
observer_task: ObserverTask,
request_id: str | None = None,
Expand Down Expand Up @@ -419,7 +419,7 @@ async def run_observer_cruise_helper(
iteration=i,
workflow_run_id=workflow_run_id,
)
observer_task = await _summarize_observer_cruise(
observer_task = await _summarize_observer_task(
observer_task=observer_task,
task_history=task_history,
context=context,
Expand Down Expand Up @@ -610,7 +610,7 @@ async def run_observer_cruise_helper(
workflow_run_id=workflow_run_id,
completion_resp=completion_resp,
)
observer_task = await _summarize_observer_cruise(
observer_task = await _summarize_observer_task(
observer_task=observer_task,
task_history=task_history,
context=context,
Expand All @@ -623,7 +623,7 @@ async def run_observer_cruise_helper(
max_iterations=max_iterations,
workflow_run_id=workflow_run_id,
)
observer_task = await mark_observer_cruise_as_failed(
observer_task = await mark_observer_task_as_failed(
observer_cruise_id=observer_cruise_id,
workflow_run_id=workflow_run_id,
# TODO: add a better failure reason with LLM
Expand Down Expand Up @@ -1087,7 +1087,7 @@ async def get_observer_cruise(observer_cruise_id: str, organization_id: str | No
return await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id)


async def mark_observer_cruise_as_failed(
async def mark_observer_task_as_failed(
observer_cruise_id: str,
workflow_run_id: str | None = None,
failure_reason: str | None = None,
Expand All @@ -1102,11 +1102,11 @@ async def mark_observer_cruise_as_failed(
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
workflow_run_id, failure_reason=failure_reason or "Skyvern task 2.0 failed"
)
await send_observer_cruise_webhook(observer_task)
await send_observer_task_webhook(observer_task)
return observer_task


async def mark_observer_cruise_as_completed(
async def mark_observer_task_as_completed(
observer_cruise_id: str,
workflow_run_id: str | None = None,
organization_id: str | None = None,
Expand All @@ -1123,7 +1123,7 @@ async def mark_observer_cruise_as_completed(
if workflow_run_id:
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id)

await send_observer_cruise_webhook(observer_task)
await send_observer_task_webhook(observer_task)
return observer_task


Expand Down Expand Up @@ -1196,7 +1196,7 @@ def _get_extracted_data_from_block_result(
return None


async def _summarize_observer_cruise(
async def _summarize_observer_task(
observer_task: ObserverTask,
task_history: list[dict],
context: SkyvernContext,
Expand Down Expand Up @@ -1234,7 +1234,7 @@ async def _summarize_observer_cruise(
output=observer_summary_resp,
)

return await mark_observer_cruise_as_completed(
return await mark_observer_task_as_completed(
observer_cruise_id=observer_task.observer_cruise_id,
workflow_run_id=observer_task.workflow_run_id,
organization_id=observer_task.organization_id,
Expand All @@ -1243,7 +1243,7 @@ async def _summarize_observer_cruise(
)


async def send_observer_cruise_webhook(observer_task: ObserverTask) -> None:
async def send_observer_task_webhook(observer_task: ObserverTask) -> None:
if not observer_task.webhook_callback_url:
return
organization_id = observer_task.organization_id
Expand Down

0 comments on commit a5f2b68

Please sign in to comment.