Workflows backend prototype #542

Closed · wants to merge 44 commits

Commits (44)
abedadf
initialize dask cluster
andrii-i Sep 5, 2024
6c9de64
add workflow handler and endpoint
andrii-i Sep 9, 2024
71aa7fe
make execute_workflow a flow
andrii-i Sep 11, 2024
c8e0bf6
add execute_workflow option to DefaultExecutionManager
andrii-i Sep 11, 2024
b79b917
Update job model to be used with workflows
andrii-i Sep 11, 2024
4ef2373
Add workflow run and jobs endpoints
andrii-i Sep 11, 2024
d77310e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 11, 2024
c5da5b0
split create_workflow and run_workflow endpoints and methods
andrii-i Sep 11, 2024
c64a53b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 11, 2024
b33191c
add GET workflow/{id}
andrii-i Sep 12, 2024
b07a765
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 12, 2024
044e3bb
add workflow/{workflow_id}/tasks endpoint
andrii-i Sep 12, 2024
5c351ac
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 12, 2024
4aa3046
add create_workflow_task to handler and scheduler
andrii-i Sep 12, 2024
c276385
make CreateWorkflow.tasks optional
andrii-i Sep 12, 2024
2f6938b
add update workflow functionality
andrii-i Sep 13, 2024
3ae8997
execute notebook as task via DefaultExecutionManager
andrii-i Sep 16, 2024
b9c466b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 16, 2024
69b34ca
Use prefect task to execute jobs, download files. Add flow and run names
andrii-i Sep 16, 2024
f18071e
Remove Dask
andrii-i Sep 16, 2024
440dfd1
change depends_on and tasks data type to not limit the size
andrii-i Sep 23, 2024
6f9ec65
make workflow_id optional
andrii-i Sep 23, 2024
5a29d03
remove distributed dependency
andrii-i Sep 23, 2024
cdafb4f
don't require workflow_id in the task creation body, only as a part o…
andrii-i Sep 23, 2024
650a566
add status to the workflow models
andrii-i Sep 23, 2024
6466c48
add workflow definitions
andrii-i Sep 23, 2024
9994767
add workflow execution handlers and endpoints
andrii-i Oct 7, 2024
be6a01f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 7, 2024
417f392
serve workflow definition on schedule
andrii-i Oct 9, 2024
5a2621b
fix parameters passed to create_and_run_workflow when served with sch…
andrii-i Oct 14, 2024
87306a8
change worklow_definitions/{id}/activate endpoint to worklow_definiti…
andrii-i Oct 14, 2024
e35607f
add name, parameters fields to Workflow and WorkflowDefinition
andrii-i Oct 14, 2024
78e0818
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 14, 2024
e9d22e1
replace prefect with dask
andrii-i Oct 18, 2024
14f5740
add workflow runner
andrii-i Oct 18, 2024
9321d41
use dask instead of prefect
andrii-i Oct 18, 2024
2b25c01
add trigger_rule enum, field to Jobs and JobDefinitions
andrii-i Oct 18, 2024
c2bcf13
add DRAFT Status enum and set it as a default workflow status
andrii-i Oct 22, 2024
6e162cb
add DEPLOYED status
andrii-i Oct 24, 2024
8866d24
rename activate_workflow_definition into deploy_workflow_definition
andrii-i Oct 24, 2024
d49f930
add create_time to Workflow model
andrii-i Oct 28, 2024
ee9e067
create and run workflow definitions including on schedule
andrii-i Oct 28, 2024
75e0e37
Merge branch 'jupyter-server:main' into workflows-backend
andrii-i Oct 29, 2024
97ac530
add more print statements
andrii-i Oct 29, 2024
149 changes: 145 additions & 4 deletions jupyter_scheduler/executors.py
@@ -1,20 +1,30 @@
import io
import multiprocessing as mp
import os
import shutil
import tarfile
import traceback
from abc import ABC, abstractmethod
from typing import Dict
from functools import lru_cache
from pathlib import Path
from typing import Dict, List

import dask
import fsspec
import nbconvert
import nbformat
from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor

from jupyter_scheduler.models import DescribeJob, JobFeature, Status
from jupyter_scheduler.orm import Job, create_session
from jupyter_scheduler.models import CreateJob, DescribeJob, JobFeature, Status
from jupyter_scheduler.orm import Job, Workflow, WorkflowDefinition, create_session
from jupyter_scheduler.parameterize import add_parameters
from jupyter_scheduler.scheduler import Scheduler
from jupyter_scheduler.utils import get_utc_timestamp
from jupyter_scheduler.workflows import (
CreateWorkflow,
DescribeWorkflow,
DescribeWorkflowDefinition,
)


class ExecutionManager(ABC):
@@ -29,14 +39,42 @@ class ExecutionManager(ABC):
_model = None
_db_session = None

def __init__(self, job_id: str, root_dir: str, db_url: str, staging_paths: Dict[str, str]):
def __init__(
self,
db_url: str,
job_id: str = None,
workflow_id: str = None,
workflow_definition_id: str = None,
root_dir: str = None,
staging_paths: Dict[str, str] = None,
):
self.job_id = job_id
self.workflow_id = workflow_id
self.workflow_definition_id = workflow_definition_id
self.staging_paths = staging_paths
self.root_dir = root_dir
self.db_url = db_url

@property
def model(self):
if self.workflow_id:
with self.db_session() as session:
workflow = (
session.query(Workflow).filter(Workflow.workflow_id == self.workflow_id).first()
)
self._model = DescribeWorkflow.from_orm(workflow)
return self._model
if self.workflow_definition_id:
with self.db_session() as session:
workflow_definition = (
session.query(WorkflowDefinition)
.filter(
WorkflowDefinition.workflow_definition_id == self.workflow_definition_id
)
.first()
)
self._model = DescribeWorkflowDefinition.from_orm(workflow_definition)
return self._model
if self._model is None:
with self.db_session() as session:
job = session.query(Job).filter(Job.job_id == self.job_id).first()
@@ -65,6 +103,18 @@ def process(self):
else:
self.on_complete()

def process_workflow(self):
print(f"calling ExecutionManager(ABC).process_workflow for {self.model}")
self.before_start_workflow()
try:
self.execute_workflow()
except CellExecutionError as e:
self.on_failure_workflow(e)
except Exception as e:
self.on_failure_workflow(e)
else:
self.on_complete_workflow()

@abstractmethod
def execute(self):
"""Performs notebook execution,
@@ -74,6 +124,11 @@ def execute(self):
"""
pass

@abstractmethod
def execute_workflow(self):
"""Performs workflow execution"""
pass

@classmethod
@abstractmethod
def supported_features(cls) -> Dict[JobFeature, bool]:
@@ -98,6 +153,16 @@ def before_start(self):
)
session.commit()

def before_start_workflow(self):
"""Called before start of execute"""
print(f"calling ExecutionManager(ABC).before_start_workflow for {self.model}")
workflow = self.model
with self.db_session() as session:
session.query(Workflow).filter(Workflow.workflow_id == workflow.workflow_id).update(
{"status": Status.IN_PROGRESS}
)
session.commit()

def on_failure(self, e: Exception):
"""Called after failure of execute"""
job = self.model
@@ -109,6 +174,17 @@ def on_failure(self, e: Exception):

traceback.print_exc()

def on_failure_workflow(self, e: Exception):
"""Called after failure of execute"""
workflow = self.model
with self.db_session() as session:
session.query(Workflow).filter(Workflow.workflow_id == workflow.workflow_id).update(
{"status": Status.FAILED, "status_message": str(e)}
)
session.commit()

traceback.print_exc()

def on_complete(self):
"""Called after job is completed"""
job = self.model
@@ -118,10 +194,60 @@ def on_complete(self):
)
session.commit()

def on_complete_workflow(self):
workflow = self.model
with self.db_session() as session:
session.query(Workflow).filter(Workflow.workflow_id == workflow.workflow_id).update(
{"status": Status.COMPLETED}
)
session.commit()


class DefaultExecutionManager(ExecutionManager):
"""Default execution manager that executes notebooks"""

def get_tasks_records(self, task_ids: List[str]) -> List[Job]:
print(f"getting task records for task: {task_ids}")
with self.db_session() as session:
tasks = session.query(Job).filter(Job.job_id.in_(task_ids)).all()
print(f"gotten task records for task {task_ids}: {tasks}")

return tasks

# @dask.delayed(name="Execute workflow")
def execute_workflow(self):
tasks_info: List[Job] = self.get_tasks_records(self.model.tasks)
print(f"tasks_info in execute_workflow: {tasks_info}")
tasks = {task.job_id: task for task in tasks_info}
print(f"tasks in execute_workflow: {tasks}")

@lru_cache(maxsize=None)
def make_task(task_id):
"""Create a delayed object for the given task recursively creating delayed objects for all tasks it depends on"""
print("making task for")
print(task_id)
deps = tasks[task_id].depends_on or []
print(deps)
print(f"dependencies in make_task for {task_id}")
print(deps)

execute_task_delayed = execute_task(
job=tasks[task_id],
root_dir=self.root_dir,
db_url=self.db_url,
dependencies=[make_task(dep_id) for dep_id in deps],
)
print("execute task result from make_task")
print(execute_task_delayed)

return execute_task_delayed

final_tasks = [make_task(task_id) for task_id in tasks]
print("Final tasks:")
print(final_tasks)
print(f"Calling compute after loops")
dask.compute(*final_tasks)

def execute(self):
job = self.model

@@ -144,6 +270,7 @@ def execute(self):
self.add_side_effects_files(staging_dir)
self.create_output_files(job, nb)

# @dask.delayed(name="Check for and add side effect files")
def add_side_effects_files(self, staging_dir: str):
"""Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files"""
input_notebook = os.path.relpath(self.staging_paths["input"])
@@ -166,6 +293,7 @@ def add_side_effects_files(self, staging_dir: str):
)
session.commit()

# @dask.delayed(name="Create output files")
def create_output_files(self, job: DescribeJob, notebook_node):
for output_format in job.output_formats:
cls = nbconvert.get_exporter(output_format)
@@ -201,6 +329,19 @@ def validate(cls, input_path: str) -> bool:
return True


@dask.delayed(name="Execute workflow task")
def execute_task(job: Job, root_dir: str, db_url: str, dependencies: List[str] = []):
print(f"executing task {job.job_id} with dependencies {dependencies}")
staging_paths = Scheduler.get_staging_paths(DescribeJob.from_orm(job))
process_job = DefaultExecutionManager(
job_id=job.job_id,
staging_paths=staging_paths,
root_dir=root_dir,
db_url=db_url,
).process
return process_job()


class ArchivingExecutionManager(DefaultExecutionManager):
"""Execution manager that archives all output files in and under the
output directory into a single archive file
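The execute_workflow method above resolves task ordering by recursively wrapping each task in a dask.delayed call, memoized with lru_cache so that a task shared by several depends_on lists becomes a single node in the graph. A minimal, self-contained sketch of that pattern follows; the task table and the run_task body are hypothetical placeholders standing in for the real Job records and notebook execution:

# Sketch of the dependency-resolution pattern used in execute_workflow.
# Task ids and the run_task body are hypothetical placeholders.
from functools import lru_cache
from typing import Dict, List

import dask

# Hypothetical task table: task id -> ids it depends on.
TASKS: Dict[str, List[str]] = {
    "prepare": [],
    "train": ["prepare"],
    "report": ["prepare", "train"],
}


@dask.delayed(name="Execute workflow task")
def run_task(task_id: str, dependencies: List[str]):
    # Dask computes every delayed object reachable from the arguments before
    # calling this function, which is how the ordering constraint is enforced.
    return task_id


@lru_cache(maxsize=None)
def make_task(task_id: str):
    # Memoization guarantees exactly one delayed node per task id, even when
    # the same task appears in several depends_on lists.
    deps = [make_task(dep_id) for dep_id in TASKS[task_id]]
    return run_task(task_id, deps)


results = dask.compute(*[make_task(task_id) for task_id in TASKS])
print(results)  # ('prepare', 'train', 'report')

Passing the delayed dependencies as an argument is what enforces the ordering: dask does not start a task until every delayed object contained in its arguments has completed.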
60 changes: 60 additions & 0 deletions jupyter_scheduler/extension.py
@@ -6,6 +6,14 @@
from traitlets import Bool, Type, Unicode, default

from jupyter_scheduler.orm import create_tables
from jupyter_scheduler.workflows import (
WorkflowDefinitionsDeploymentHandler,
WorkflowDefinitionsHandler,
WorkflowDefinitionsTasksHandler,
WorkflowsHandler,
WorkflowsRunHandler,
WorkflowsTasksHandler,
)

from .handlers import (
BatchJobHandler,
@@ -20,6 +28,8 @@

JOB_DEFINITION_ID_REGEX = r"(?P<job_definition_id>\w+(?:-\w+)+)"
JOB_ID_REGEX = r"(?P<job_id>\w+(?:-\w+)+)"
WORKFLOW_DEFINITION_ID_REGEX = r"(?P<workflow_definition_id>\w+(?:-\w+)+)"
WORKFLOW_ID_REGEX = r"(?P<workflow_id>\w+(?:-\w+)+)"


class SchedulerApp(ExtensionApp):
@@ -35,6 +45,29 @@ class SchedulerApp(ExtensionApp):
(r"scheduler/job_definitions/%s/jobs" % JOB_DEFINITION_ID_REGEX, JobFromDefinitionHandler),
(r"scheduler/runtime_environments", RuntimeEnvironmentsHandler),
(r"scheduler/config", ConfigHandler),
(r"scheduler/worklows", WorkflowsHandler),
(rf"scheduler/worklows/{WORKFLOW_ID_REGEX}", WorkflowsHandler),
(
rf"scheduler/worklows/{WORKFLOW_ID_REGEX}/run",
WorkflowsRunHandler,
),
(
rf"scheduler/worklows/{WORKFLOW_ID_REGEX}/tasks",
WorkflowsTasksHandler,
),
(r"scheduler/worklow_definitions", WorkflowDefinitionsHandler),
(
rf"scheduler/worklow_definitions/{WORKFLOW_DEFINITION_ID_REGEX}",
WorkflowDefinitionsHandler,
),
(
rf"scheduler/worklow_definitions/{WORKFLOW_DEFINITION_ID_REGEX}/deploy",
WorkflowDefinitionsDeploymentHandler,
),
(
rf"scheduler/worklow_definitions/{WORKFLOW_DEFINITION_ID_REGEX}/tasks",
WorkflowDefinitionsTasksHandler,
),
]

drop_tables = Bool(False, config=True, help="Drop the database tables before starting.")
@@ -91,3 +124,30 @@ def initialize_settings(self):
if scheduler.task_runner:
loop = asyncio.get_event_loop()
loop.create_task(scheduler.task_runner.start())

if scheduler.workflow_runner:
loop = asyncio.get_event_loop()
loop.create_task(scheduler.workflow_runner.start())

async def stop_extension(self):
"""
Public method called by Jupyter Server when the server is stopping.
This calls the cleanup code defined in `self._stop_exception()` inside
an exception handler, as the server halts if this method raises an
exception.
"""
try:
await self._stop_extension()
except Exception as e:
self.log.error("Jupyter Scheduler raised an exception while stopping:")

self.log.exception(e)

async def _stop_extension(self):
"""
Private method that defines the cleanup code to run when the server is
stopping.
"""
if "scheduler" in self.settings:
scheduler: SchedulerApp = self.settings["scheduler"]
await scheduler.stop_extension()
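The workflow handlers are routed with the same id regex convention as the existing job endpoints. A quick sketch of how the workflow id pattern matches a request path, with the path prefix copied exactly as it is registered above and a made-up UUID-style id:

# Sketch of route matching for the workflow run endpoint; the id is made up.
import re

WORKFLOW_ID_REGEX = r"(?P<workflow_id>\w+(?:-\w+)+)"
route = re.compile(rf"scheduler/worklows/{WORKFLOW_ID_REGEX}/run")

match = route.fullmatch("scheduler/worklows/8c0f2d7e-3b5a-4e9d-9f1c-2a6b7c8d9e0f/run")
print(match.group("workflow_id"))  # 8c0f2d7e-3b5a-4e9d-9f1c-2a6b7c8d9e0f

The named group is what Tornado passes to the handler method as the workflow_id argument.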
21 changes: 10 additions & 11 deletions jupyter_scheduler/job_files_manager.py
@@ -4,6 +4,7 @@
from multiprocessing import Process
from typing import Dict, List, Optional, Type

import dask
import fsspec
from jupyter_server.utils import ensure_async

@@ -23,17 +24,14 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals
output_filenames = self.scheduler.get_job_filenames(job)
output_dir = self.scheduler.get_local_output_path(model=job, root_dir_relative=True)

p = Process(
target=Downloader(
output_formats=job.output_formats,
output_filenames=output_filenames,
staging_paths=staging_paths,
output_dir=output_dir,
redownload=redownload,
include_staging_files=job.package_input_folder,
).download
)
p.start()
target = Downloader(
output_formats=job.output_formats,
output_filenames=output_filenames,
staging_paths=staging_paths,
output_dir=output_dir,
redownload=redownload,
include_staging_files=job.package_input_folder,
).download


class Downloader:
@@ -77,6 +75,7 @@ def download_tar(self, archive_format: str = "tar"):
with tarfile.open(fileobj=f, mode=read_mode) as tar:
tar.extractall(self.output_dir)

# @dask.delayed(name="Download job files")
def download(self):
# ensure presence of staging paths
if not self.staging_paths:
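The commented-out decorator above hints at running the download as a dask task rather than in a separate process. A minimal sketch of what wrapping the Downloader in dask.delayed could look like; all argument values here are hypothetical placeholders, not the values produced in copy_from_staging:

# Sketch only: wrap Downloader.download in a dask delayed call.
# Argument values are placeholders; copy_from_staging derives the real ones
# from the scheduler and the job record.
import dask

from jupyter_scheduler.job_files_manager import Downloader

downloader = Downloader(
    output_formats=["ipynb", "html"],
    output_filenames={"ipynb": "job.ipynb", "html": "job.html"},
    staging_paths={"ipynb": "/tmp/staging/job.ipynb", "html": "/tmp/staging/job.html"},
    output_dir="workspace/outputs",
    redownload=False,
    include_staging_files=False,
)

# dask defers the actual download until compute() is called, so it could be
# composed into the same graph as other workflow tasks.
delayed_download = dask.delayed(downloader.download, name="Download job files")()
delayed_download.compute()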