From 2f6938b40f8835c8bb500761cc203194a1eb89de Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Fri, 13 Sep 2024 14:20:05 -0700 Subject: [PATCH] add update workflow functionality --- jupyter_scheduler/models.py | 6 +++--- jupyter_scheduler/scheduler.py | 13 ++++++++++--- jupyter_scheduler/workflows.py | 12 ++---------- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/jupyter_scheduler/models.py b/jupyter_scheduler/models.py index d118673f..d8ad3d62 100644 --- a/jupyter_scheduler/models.py +++ b/jupyter_scheduler/models.py @@ -86,7 +86,7 @@ class CreateJob(BaseModel): output_filename_template: Optional[str] = OUTPUT_FILENAME_TEMPLATE compute_type: Optional[str] = None package_input_folder: Optional[bool] = None - depends_on: Optional[str] = None + depends_on: Optional[List[str]] = None workflow_id: str = None @root_validator @@ -150,7 +150,7 @@ class DescribeJob(BaseModel): downloaded: bool = False package_input_folder: Optional[bool] = None packaged_files: Optional[List[str]] = [] - depends_on: Optional[str] = None + depends_on: Optional[List[str]] = None workflow_id: str = None class Config: @@ -197,7 +197,7 @@ class UpdateJob(BaseModel): status: Optional[Status] = None name: Optional[str] = None compute_type: Optional[str] = None - depends_on: Optional[str] = None + depends_on: Optional[List[str]] = None class DeleteJob(BaseModel): diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index 5aece2be..0906c62b 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -570,13 +570,20 @@ def get_workflow(self, workflow_id: str) -> DescribeWorkflow: return model def create_workflow_task(self, workflow_id: str, model: CreateJob) -> str: - job_id = self.scheduler.create_job(model, run=False) - workflow: DescribeWorkflow = self.scheduler.get_workflow(workflow_id) + job_id = self.create_job(model, run=False) + workflow: DescribeWorkflow = self.get_workflow(workflow_id) updated_tasks = (workflow.tasks or [])[:] updated_tasks.append(job_id) - self.scheduler.update_workflow(workflow_id, UpdateWorkflow(depends_on=updated_tasks)) + self.update_workflow(workflow_id, UpdateWorkflow(tasks=updated_tasks)) return job_id + def update_workflow(self, workflow_id: str, model: UpdateWorkflow): + with self.db_session() as session: + session.query(Workflow).filter(Workflow.workflow_id == workflow_id).update( + model.dict(exclude_none=True) + ) + session.commit() + def update_job(self, job_id: str, model: UpdateJob): with self.db_session() as session: session.query(Job).filter(Job.job_id == job_id).update(model.dict(exclude_none=True)) diff --git a/jupyter_scheduler/workflows.py b/jupyter_scheduler/workflows.py index 33388c4b..be1e231f 100644 --- a/jupyter_scheduler/workflows.py +++ b/jupyter_scheduler/workflows.py @@ -173,16 +173,8 @@ class Config: class UpdateWorkflow(BaseModel): - workflow_id: str - tasks: List[str] = None - status: Status = None + tasks: Optional[List[str]] = None + status: Optional[Status] = None class Config: orm_mode = True - - -class UpdateWorkflow(BaseModel): - status: Optional[Status] = None - name: Optional[str] = None - compute_type: Optional[str] = None - depends_on: Optional[str] = None