
run a job via executor
andrii-i committed Sep 12, 2024
1 parent 4aa3046 commit 2b8de36
Showing 1 changed file with 41 additions and 17 deletions.
58 changes: 41 additions & 17 deletions jupyter_scheduler/executors.py
@@ -3,6 +3,7 @@
 import shutil
 import tarfile
 import traceback
+import multiprocessing as mp
 from abc import ABC, abstractmethod
 from functools import lru_cache
 from typing import Dict, List
@@ -15,7 +16,7 @@
 from prefect.futures import as_completed
 from prefect_dask.task_runners import DaskTaskRunner

-from jupyter_scheduler.models import DescribeJob, JobFeature, Status
+from jupyter_scheduler.models import CreateJob, DescribeJob, JobFeature, Status
 from jupyter_scheduler.orm import Job, Workflow, create_session
 from jupyter_scheduler.parameterize import add_parameters
 from jupyter_scheduler.utils import get_utc_timestamp
@@ -187,35 +188,58 @@ class DefaultExecutionManager(ExecutionManager):
     """Default execution manager that executes notebooks"""

     @task(task_run_name="{task_id}")
-    def execute_task(self, task_id: str):
-        print(f"Task {task_id} executed")
-        return task_id
+    def execute_task(self, job: Job):
+
+        # The MP context forces new processes to not be forked on Linux.
+        # This is necessary because `asyncio.get_event_loop()` is bugged in
+        # forked processes in Python versions below 3.12. This method is
+        # called by `jupyter_core` by `nbconvert` in the default executor.
+        #
+        # See: https://github.com/python/cpython/issues/66285
+        # See also: https://github.com/jupyter/jupyter_core/pull/362
+        mp_ctx = mp.get_context("spawn")
+        p = mp_ctx.Process(
+            target=self.execution_manager_class(
+                job_id=job.job_id,
+                staging_paths=self.staging_paths,
+                root_dir=self.root_dir,
+                db_url=self.db_url,
+            ).process
+        )
+        p.start()
+
+        return job.job_id

     @task
-    def get_task_data(self, task_ids: List[str] = []):
+    def get_task_data(self, task_ids: List[str]) -> List[Job]:
         # TODO: get orm objects from Task table of the db, create DescribeTask for each
-        tasks_data_obj = [
-            {"id": "task0", "dependsOn": ["task3"]},
-            {"id": "task4", "dependsOn": ["task0", "task1", "task2", "task3"]},
-            {"id": "task1", "dependsOn": []},
-            {"id": "task2", "dependsOn": ["task1"]},
-            {"id": "task3", "dependsOn": ["task1", "task2"]},
-        ]
+        # tasks_data_obj = [
+        #     {"id": "task0", "dependsOn": ["task3"]},
+        #     {"id": "task4", "dependsOn": ["task0", "task1", "task2", "task3"]},
+        #     {"id": "task1", "dependsOn": []},
+        #     {"id": "task2", "dependsOn": ["task1"]},
+        #     {"id": "task3", "dependsOn": ["task1", "task2"]},
+        # ]
+        tasks = []
+        with self.db_session() as session:
+            for task_id in task_ids:
+                job = session.query(Job).filter(Job.job_id == task_id).first()
+                tasks.append(job)

-        return tasks_data_obj
+        return tasks

     @flow
     def execute_workflow(self):

-        tasks_info = self.get_task_data()
-        tasks = {task["id"]: task for task in tasks_info}
+        tasks_info: List[Job] = self.get_task_data(self.model.tasks)
+        tasks = {task.job_id: task for task in tasks_info}

         # create Prefect tasks, use caching to ensure Prefect tasks are created before wait_for is called on them
         @lru_cache(maxsize=None)
         def make_task(task_id):
-            deps = tasks[task_id]["dependsOn"]
+            deps = tasks[task_id].depends_on
             return self.execute_task.submit(
-                task_id, wait_for=[make_task(dep_id) for dep_id in deps]
+                tasks[task_id], wait_for=[make_task(dep_id) for dep_id in deps]
             )

         final_tasks = [make_task(task_id) for task_id in tasks]
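
The comment in execute_task explains why the child process is spawned rather than forked. Below is a minimal stand-alone sketch of that pattern, not part of the commit; run_notebook_job and the job id are hypothetical stand-ins for ExecutionManager.process.

import asyncio
import multiprocessing as mp


def run_notebook_job(job_id: str) -> None:
    # Hypothetical stand-in for ExecutionManager.process(). Creating an event
    # loop here is safe because a spawned process starts a fresh interpreter
    # instead of inheriting the parent's forked state.
    asyncio.run(asyncio.sleep(0))
    print(f"processed {job_id}")


if __name__ == "__main__":
    ctx = mp.get_context("spawn")  # same start method as the commit uses
    p = ctx.Process(target=run_notebook_job, args=("job-123",))
    p.start()
    p.join()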
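make_task relies on functools.lru_cache so that each job's Prefect task is built exactly once, and only after the tasks it depends on exist and can be passed to wait_for. Here is a rough sketch of that memoized dependency resolution in plain Python, with a made-up dependency graph and no Prefect involved.

from functools import lru_cache

# Hypothetical dependency graph keyed by job_id.
DEPENDS_ON = {
    "a": [],
    "b": ["a"],
    "c": ["a", "b"],
}

submitted = []


@lru_cache(maxsize=None)
def make_task(job_id: str) -> str:
    # Build dependencies first; lru_cache guarantees each job is submitted
    # at most once even if it appears in several dependency lists.
    for dep in DEPENDS_ON[job_id]:
        make_task(dep)
    submitted.append(job_id)
    return job_id


if __name__ == "__main__":
    for job_id in DEPENDS_ON:
        make_task(job_id)
    print(submitted)  # dependencies always come before their dependents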
