Skip to content

Commit

Permalink
add update workflow functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
andrii-i committed Sep 13, 2024
1 parent c276385 commit 2f6938b
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 16 deletions.
6 changes: 3 additions & 3 deletions jupyter_scheduler/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class CreateJob(BaseModel):
output_filename_template: Optional[str] = OUTPUT_FILENAME_TEMPLATE
compute_type: Optional[str] = None
package_input_folder: Optional[bool] = None
depends_on: Optional[str] = None
depends_on: Optional[List[str]] = None
workflow_id: str = None

@root_validator
Expand Down Expand Up @@ -150,7 +150,7 @@ class DescribeJob(BaseModel):
downloaded: bool = False
package_input_folder: Optional[bool] = None
packaged_files: Optional[List[str]] = []
depends_on: Optional[str] = None
depends_on: Optional[List[str]] = None
workflow_id: str = None

class Config:
Expand Down Expand Up @@ -197,7 +197,7 @@ class UpdateJob(BaseModel):
status: Optional[Status] = None
name: Optional[str] = None
compute_type: Optional[str] = None
depends_on: Optional[str] = None
depends_on: Optional[List[str]] = None


class DeleteJob(BaseModel):
Expand Down
13 changes: 10 additions & 3 deletions jupyter_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,13 +570,20 @@ def get_workflow(self, workflow_id: str) -> DescribeWorkflow:
return model

def create_workflow_task(self, workflow_id: str, model: CreateJob) -> str:
job_id = self.scheduler.create_job(model, run=False)
workflow: DescribeWorkflow = self.scheduler.get_workflow(workflow_id)
job_id = self.create_job(model, run=False)
workflow: DescribeWorkflow = self.get_workflow(workflow_id)
updated_tasks = (workflow.tasks or [])[:]
updated_tasks.append(job_id)
self.scheduler.update_workflow(workflow_id, UpdateWorkflow(depends_on=updated_tasks))
self.update_workflow(workflow_id, UpdateWorkflow(tasks=updated_tasks))
return job_id

def update_workflow(self, workflow_id: str, model: UpdateWorkflow):
with self.db_session() as session:
session.query(Workflow).filter(Workflow.workflow_id == workflow_id).update(
model.dict(exclude_none=True)
)
session.commit()

def update_job(self, job_id: str, model: UpdateJob):
with self.db_session() as session:
session.query(Job).filter(Job.job_id == job_id).update(model.dict(exclude_none=True))
Expand Down
12 changes: 2 additions & 10 deletions jupyter_scheduler/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,16 +173,8 @@ class Config:


class UpdateWorkflow(BaseModel):
workflow_id: str
tasks: List[str] = None
status: Status = None
tasks: Optional[List[str]] = None
status: Optional[Status] = None

class Config:
orm_mode = True


class UpdateWorkflow(BaseModel):
status: Optional[Status] = None
name: Optional[str] = None
compute_type: Optional[str] = None
depends_on: Optional[str] = None

0 comments on commit 2f6938b

Please sign in to comment.