diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py
index 81aef3d9..7be02242 100644
--- a/jupyter_scheduler/executors.py
+++ b/jupyter_scheduler/executors.py
@@ -14,7 +14,6 @@
 from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor
 from prefect import flow, task
 from prefect.futures import as_completed
-from prefect_dask.task_runners import DaskTaskRunner
 
 from jupyter_scheduler.models import CreateJob, DescribeJob, JobFeature, Status
 from jupyter_scheduler.orm import Job, Workflow, create_session
diff --git a/jupyter_scheduler/extension.py b/jupyter_scheduler/extension.py
index d06e6dc0..086fc34a 100644
--- a/jupyter_scheduler/extension.py
+++ b/jupyter_scheduler/extension.py
@@ -107,25 +107,3 @@ def initialize_settings(self):
         if scheduler.task_runner:
             loop = asyncio.get_event_loop()
             loop.create_task(scheduler.task_runner.start())
-
-    async def stop_extension(self):
-        """
-        Public method called by Jupyter Server when the server is stopping.
-        This calls the cleanup code defined in `self._stop_exception()` inside
-        an exception handler, as the server halts if this method raises an
-        exception.
-        """
-        try:
-            await self._stop_extension()
-        except Exception as e:
-            self.log.error("Jupyter Scheduler raised an exception while stopping:")
-            self.log.exception(e)
-
-    async def _stop_extension(self):
-        """
-        Private method that defines the cleanup code to run when the server is
-        stopping.
-        """
-        if "scheduler" in self.settings:
-            scheduler: SchedulerApp = self.settings["scheduler"]
-            await scheduler.stop_extension()
diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py
index f2745ca0..724e18b0 100644
--- a/jupyter_scheduler/scheduler.py
+++ b/jupyter_scheduler/scheduler.py
@@ -5,7 +5,6 @@
 
 import fsspec
 import psutil
-from dask.distributed import Client as DaskClient
 from distributed import LocalCluster
 from jupyter_core.paths import jupyter_data_dir
 from jupyter_server.transutils import _i18n
@@ -399,12 +398,6 @@ def get_local_output_path(
         else:
             return os.path.join(self.root_dir, self.output_directory, output_dir_name)
 
-    async def stop_extension(self):
-        """
-        Placeholder method for a cleanup code to run when the server is stopping.
-        """
-        pass
-
 
 class Scheduler(BaseScheduler):
     _db_session = None
@@ -419,12 +412,6 @@ class Scheduler(BaseScheduler):
         ),
     )
 
-    dask_cluster_url = Unicode(
-        allow_none=True,
-        config=True,
-        help="URL of the Dask cluster to connect to.",
-    )
-
     db_url = Unicode(help=_i18n("Scheduler database url"))
 
     task_runner = Instance(allow_none=True, klass="jupyter_scheduler.task_runner.BaseTaskRunner")
@@ -444,15 +431,6 @@ def __init__(
 
         if self.task_runner_class:
             self.task_runner = self.task_runner_class(scheduler=self, config=config)
-        self.dask_client: DaskClient = self._get_dask_client()
-
-    def _get_dask_client(self):
-        """Creates and configures a Dask client."""
-        if self.dask_cluster_url:
-            return DaskClient(self.dask_cluster_url)
-        cluster = LocalCluster(processes=True)
-        return DaskClient(cluster)
-
     @property
     def db_session(self):
         if not self._db_session:
@@ -875,13 +853,6 @@ def get_staging_paths(model: Union[DescribeJob, DescribeJobDefinition]) -> Dict[
 
         return staging_paths
 
-    async def stop_extension(self):
-        """
-        Cleanup code to run when the server is stopping.
-        """
-        if self.dask_client:
-            await self.dask_client.close()
-
 
 class ArchivingScheduler(Scheduler):
     """Scheduler that captures all files in output directory in an archive."""
diff --git a/jupyter_scheduler/workflows.py b/jupyter_scheduler/workflows.py
index be1e231f..ec45b00e 100644
--- a/jupyter_scheduler/workflows.py
+++ b/jupyter_scheduler/workflows.py
@@ -63,7 +63,6 @@ async def get(self, workflow_id: str = None):
 class WorkflowsTasksHandler(ExtensionHandlerMixin, JobHandlersMixin, APIHandler):
     @authenticated
     async def post(self, workflow_id: str):
-        print("WorkflowsTasksHandler post")
         payload = self.get_json_body()
         if workflow_id != payload.get("workflow_id"):
             raise HTTPError(