Skip to content

Commit

Permalink
add execute_workflow option to DefaultExecutionManager
Browse files Browse the repository at this point in the history
  • Loading branch information
andrii-i committed Sep 11, 2024
1 parent 71aa7fe commit e60d96e
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 13 deletions.
11 changes: 5 additions & 6 deletions jupyter_scheduler/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ class ExecutionManager(ABC):

def __init__(
self,
job_id: str,
workflow_id: str,
root_dir: str,
db_url: str,
staging_paths: Dict[str, str],
job_id: str = None,
workflow_id: str = None,
root_dir: str = None,
staging_paths: Dict[str, str] = None,
):
self.job_id = job_id
self.workflow_id = workflow_id
Expand Down Expand Up @@ -86,7 +86,6 @@ def process(self):
self.on_complete()

def process_workflow(self):

self.before_start_workflow()
try:
self.execute_workflow()
Expand Down Expand Up @@ -205,7 +204,7 @@ def get_task_data(self, task_ids: List[str] = []):

return tasks_data_obj

@flow()
@flow
def execute_workflow(self):

tasks_info = self.get_task_data()
Expand Down
3 changes: 0 additions & 3 deletions jupyter_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,10 +541,7 @@ def create_workflow(self, model: CreateWorkflow) -> str:
session.commit()

execution_manager = self.execution_manager_class(
job_id="123",
staging_paths=dict(),
workflow_id=workflow.workflow_id,
root_dir=self.root_dir,
db_url=self.db_url,
)
execution_manager.process_workflow()
Expand Down
5 changes: 1 addition & 4 deletions jupyter_scheduler/workflows.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import json
from typing import List
from typing import Dict, List

from jupyter_server.utils import ensure_async
from tornado.web import HTTPError, authenticated

from jupyter_scheduler.exceptions import (
IdempotencyTokenError,
InputUriError,
Expand All @@ -26,8 +25,6 @@ async def post(self):
workflow_id = await ensure_async(
self.scheduler.create_workflow(CreateWorkflow(**payload))
)
self.log.info(payload)
print(payload)
except ValidationError as e:
self.log.exception(e)
raise HTTPError(500, str(e)) from e
Expand Down

0 comments on commit e60d96e

Please sign in to comment.