From 06fbb850c152c98cbd7b04f694dea3434fbffd6b Mon Sep 17 00:00:00 2001 From: saranyaloganathan23 Date: Wed, 16 Oct 2024 12:25:22 +0530 Subject: [PATCH 1/9] gsutil changes --- dataproc_jupyter_plugin/commons/constants.py | 2 +- dataproc_jupyter_plugin/services/airflow.py | 18 ++-- dataproc_jupyter_plugin/services/executor.py | 91 ++++++++++---------- 3 files changed, 61 insertions(+), 50 deletions(-) diff --git a/dataproc_jupyter_plugin/commons/constants.py b/dataproc_jupyter_plugin/commons/constants.py index 354fef69..7483a469 100644 --- a/dataproc_jupyter_plugin/commons/constants.py +++ b/dataproc_jupyter_plugin/commons/constants.py @@ -51,7 +51,7 @@ # DAG run IDs are largely free-form, but we still enforce some sanity checking # on them in case the generated ID might cause issues with how we generate # output file names. -DAG_RUN_ID_REGEXP = re.compile("[a-zA-Z0-9_:\\+-]+") +DAG_RUN_ID_REGEXP = re.compile("[a-zA-Z0-9_:\\+.-]+") # This matches the requirements set by the scheduler form. AIRFLOW_JOB_REGEXP = re.compile("[a-zA-Z0-9_-]+") diff --git a/dataproc_jupyter_plugin/services/airflow.py b/dataproc_jupyter_plugin/services/airflow.py index 67dace05..7c4763c0 100644 --- a/dataproc_jupyter_plugin/services/airflow.py +++ b/dataproc_jupyter_plugin/services/airflow.py @@ -15,6 +15,7 @@ import re import subprocess import urllib +from google.cloud import storage from dataproc_jupyter_plugin import urls from dataproc_jupyter_plugin.commons.commands import async_run_gsutil_subcommand @@ -86,20 +87,27 @@ async def list_jobs(self, composer_name): self.log.exception(f"Error getting dag list: {str(e)}") return {"error": str(e)} + async def delete_job(self, composer_name, dag_id, from_page): - airflow_uri, bucket = await self.get_airflow_uri(composer_name) + airflow_uri, bucket_name = await self.get_airflow_uri(composer_name) try: - api_endpoint = f"{airflow_uri}/api/v1/dags/{dag_id}" + api_endpoint = f"{airflow_uri}/api/v1/dags/{dag_id}" + # Delete the DAG via the Airflow API if from_page is None if from_page is None: async with self.client_session.delete( api_endpoint, headers=self.create_headers() ) as response: self.log.info(response) - cmd = f"gsutil rm gs://{bucket}/dags/dag_{dag_id}.py" - await async_run_gsutil_subcommand(cmd) + bucket = storage.Client().bucket(bucket_name) + blob_name = f"dags/dag_{dag_id}.py" + blob = bucket.blob(blob_name) + blob.delete() + + self.log.info(f"Deleted {blob_name} from bucket {bucket_name}") + return 0 except Exception as e: - self.log.exception(f"Error deleting dag: {str(e)}") + self.log.exception(f"Error deleting DAG: {str(e)}") return {"error": str(e)} async def update_job(self, composer_name, dag_id, status): diff --git a/dataproc_jupyter_plugin/services/executor.py b/dataproc_jupyter_plugin/services/executor.py index f8517702..32bf7985 100644 --- a/dataproc_jupyter_plugin/services/executor.py +++ b/dataproc_jupyter_plugin/services/executor.py @@ -19,6 +19,8 @@ from datetime import datetime, timedelta from google.cloud import storage from google.api_core.exceptions import NotFound +import google.oauth2.credentials as oauth2 +import aiofiles import aiohttp import pendulum @@ -99,30 +101,32 @@ async def check_file_exists(self, bucket_name, file_path): self.log.exception(f"Error checking file: {error}") raise IOError(f"Error creating dag: {error}") - async def upload_papermill_to_gcs(self, gcs_dag_bucket): - env = Environment( - loader=PackageLoader(PACKAGE_NAME, "dagTemplates"), - autoescape=select_autoescape(["py"]), - ) - wrapper_papermill_path = 
env.get_template("wrapper_papermill.py").filename + + async def upload_to_gcs(self, gcs_dag_bucket, file_path=None, template_name=None, destination_dir=None): try: - cmd = f"gsutil cp '{wrapper_papermill_path}' gs://{gcs_dag_bucket}/dataproc-notebooks/" - await async_run_gsutil_subcommand(cmd) - self.log.info("Papermill file uploaded to gcs successfully") - except subprocess.CalledProcessError as error: - self.log.exception( - f"Error uploading papermill file to gcs: {error.decode()}" - ) - raise IOError(error.decode) - - async def upload_input_file_to_gcs(self, input, gcs_dag_bucket, job_name): - try: - cmd = f"gsutil cp './{input}' gs://{gcs_dag_bucket}/dataproc-notebooks/{job_name}/input_notebooks/" - await async_run_gsutil_subcommand(cmd) - self.log.info("Input file uploaded to gcs successfully") - except subprocess.CalledProcessError as error: - self.log.exception(f"Error uploading input file to gcs: {error.decode()}") - raise IOError(error.decode) + storage_client = storage.Client() + bucket = storage_client.bucket(gcs_dag_bucket) + if template_name: + env = Environment( + loader=PackageLoader(PACKAGE_NAME, TEMPLATES_FOLDER_PATH), + autoescape=select_autoescape(["py"]), + ) + file_path = env.get_template(template_name).filename + + if not file_path: + raise ValueError("No file path or template name provided for upload.") + if destination_dir: + blob_name = f"{destination_dir}/{file_path.split('/')[-1]}" + else: + blob_name = f"{file_path.split('/')[-1]}" + + blob = bucket.blob(blob_name) + blob.upload_from_filename(file_path) + self.log.info(f"File {file_path} uploaded to gcs successfully") + + except Exception as error: + self.log.exception(f"Error uploading file to GCS: {str(error)}") + raise IOError(str(error)) def prepare_dag(self, job, gcs_dag_bucket, dag_file): self.log.info("Generating dag file") @@ -239,17 +243,8 @@ def prepare_dag(self, job, gcs_dag_bucket, dag_file): ) wrapper_papermill_path = env.get_template("wrapper_papermill.py").filename shutil.copy2(wrapper_papermill_path, LOCAL_DAG_FILE_LOCATION) + return file_path - async def upload_dag_to_gcs(self, job, dag_file, gcs_dag_bucket): - LOCAL_DAG_FILE_LOCATION = f"./scheduled-jobs/{job.name}" - file_path = os.path.join(LOCAL_DAG_FILE_LOCATION, dag_file) - try: - cmd = f"gsutil cp '{file_path}' gs://{gcs_dag_bucket}/dags/" - await async_run_gsutil_subcommand(cmd) - self.log.info("Dag file uploaded to gcs successfully") - except subprocess.CalledProcessError as error: - self.log.exception(f"Error uploading dag file to gcs: {error.decode()}") - raise IOError(error.decode) async def execute(self, input_data): try: @@ -269,20 +264,19 @@ async def execute(self, input_data): f"The file gs://{gcs_dag_bucket}/{wrapper_pappermill_file_path} exists." ) else: - await self.upload_papermill_to_gcs(gcs_dag_bucket) + await self.upload_to_gcs(gcs_dag_bucket, template_name=WRAPPER_PAPPERMILL_FILE, destination_dir="dataproc-notebooks") print( f"The file gs://{gcs_dag_bucket}/{wrapper_pappermill_file_path} does not exist." 
) if not job.input_filename.startswith(GCS): - await self.upload_input_file_to_gcs( - job.input_filename, gcs_dag_bucket, job_name - ) - self.prepare_dag(job, gcs_dag_bucket, dag_file) - await self.upload_dag_to_gcs(job, dag_file, gcs_dag_bucket) + await self.upload_to_gcs(gcs_dag_bucket, file_path=f"./{job.input_filename}", destination_dir=f"dataproc-notebooks/{job_name}/input_notebooks") + file_path = self.prepare_dag(job, gcs_dag_bucket, dag_file) + await self.upload_to_gcs(gcs_dag_bucket, file_path=file_path, destination_dir="dags") return {"status": 0} except Exception as e: return {"error": str(e)} + async def download_dag_output( self, composer_environment_name, bucket_name, dag_id, dag_run_id ): @@ -292,11 +286,20 @@ async def download_dag_output( ) except Exception as ex: return {"error": f"Invalid DAG run ID {dag_run_id}"} + try: - cmd = f"gsutil cp 'gs://{bucket_name}/dataproc-output/{dag_id}/output-notebooks/{dag_id}_{dag_run_id}.ipynb' ./" - await async_run_gsutil_subcommand(cmd) - self.log.info("Output notebook file downloaded successfully") + credentials = oauth2.Credentials(self._access_token) + storage_client = storage.Client(credentials=credentials) + blob_name = f"dataproc-output/{dag_id}/output-notebooks/{dag_id}_{dag_run_id}.ipynb" + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(blob_name) + original_file_name = os.path.basename(blob_name) + destination_file_name = os.path.join(".", original_file_name) + async with aiofiles.open(destination_file_name, "wb") as f: + file_data = blob.download_as_bytes() + await f.write(file_data) + self.log.info(f"Output notebook file '{original_file_name}' downloaded successfully") return 0 - except subprocess.CalledProcessError as error: + except Exception as error: self.log.exception(f"Error downloading output notebook file: {str(error)}") - return {"error": str(error)} + return {"error": str(error)} \ No newline at end of file From 187347b8af991a9497c173ee2af78944a0229e42 Mon Sep 17 00:00:00 2001 From: saranyaloganathan23 Date: Wed, 16 Oct 2024 13:22:29 +0530 Subject: [PATCH 2/9] FE changes --- src/scheduler/createNotebookScheduler.tsx | 184 +++++++++++----------- 1 file changed, 94 insertions(+), 90 deletions(-) diff --git a/src/scheduler/createNotebookScheduler.tsx b/src/scheduler/createNotebookScheduler.tsx index 1e895e9f..d8f503ec 100644 --- a/src/scheduler/createNotebookScheduler.tsx +++ b/src/scheduler/createNotebookScheduler.tsx @@ -121,8 +121,8 @@ const CreateNotebookScheduler = ({ const [editMode, setEditMode] = useState(false); const [dagListCall, setDagListCall] = useState(false); const [isLoadingKernelDetail, setIsLoadingKernelDetail] = useState(false); + const [isLocalKernel, setIsLocalKernel] = useState(false); - const [isBigQueryNotebook, setIsBigQueryNotebook] = useState(false); const listClustersAPI = async () => { await SchedulerService.listClustersAPIService( @@ -266,6 +266,7 @@ const CreateNotebookScheduler = ({ schedule_value: scheduleMode === 'runNow' ? '' : scheduleValue, stop_cluster: stopCluster, dag_id: randomDagId, + local_kernel: isLocalKernel, time_zone: scheduleMode !== 'runNow' ? timeZoneSelected : '', [selectedMode === 'cluster' ? 'cluster_name' : 'serverless_name']: selectedMode === 'cluster' ? 
clusterSelected : serverlessDataSelected @@ -306,8 +307,8 @@ const CreateNotebookScheduler = ({ (!jobNameUniqueValidation && !editMode) || inputFileSelected === '' || composerSelected === '' || - (selectedMode === 'cluster' && clusterSelected === '') || - (selectedMode === 'serverless' && serverlessSelected === '') || + (selectedMode === 'cluster' && clusterSelected === '' && !isLocalKernel) || + (selectedMode === 'serverless' && serverlessSelected === '' && !isLocalKernel) || ((emailOnFailure || emailOnRetry || emailOnSuccess) && emailList.length === 0) ); @@ -327,6 +328,12 @@ const CreateNotebookScheduler = ({ const kernels = kernelSpecs.kernelspecs; if (kernels && context.sessionContext.kernelPreference.name) { + console.log('aaaaa', context.sessionContext.kernelDisplayName); + if (context.sessionContext.kernelDisplayName.includes('Local')) { + setIsLocalKernel(true); + } else { + setIsLocalKernel(false); + } if ( kernels[context.sessionContext.kernelPreference.name].resources .endpointParentResource @@ -369,10 +376,6 @@ const CreateNotebookScheduler = ({ if (context !== '') { setInputFileSelected(context.path); - if (context.path.toLowerCase().startsWith('bigframes')) { - setIsBigQueryNotebook(true); - setSelectedMode('serverless'); - } } setJobNameSelected(''); if (!editMode) { @@ -553,93 +556,94 @@ const CreateNotebookScheduler = ({ fromPage="scheduler" /> - {!isBigQueryNotebook && ( -
- - - } - label={ - Cluster - } + + {!isLocalKernel && ( + <> +
+ + + } + label={ + Cluster + } + /> + } + label={ + + Serverless + + } + /> + + +
+
+ {isLoadingKernelDetail && ( + - } - label={ - - Serverless - - } + )} + {selectedMode === 'cluster' && !isLoadingKernelDetail && ( + handleClusterSelected(val)} + renderInput={params => ( + + )} /> - - -
- )} -
- {isLoadingKernelDetail && ( - - )} - {!isBigQueryNotebook && - selectedMode === 'cluster' && - !isLoadingKernelDetail && ( - handleClusterSelected(val)} - renderInput={params => ( - - )} - /> - )} - {selectedMode === 'serverless' && !isLoadingKernelDetail && ( - handleServerlessSelected(val)} - renderInput={params => ( - )} - /> - )} -
- {!isBigQueryNotebook && selectedMode === 'cluster' && ( -
- - handleServerlessSelected(val)} + renderInput={params => ( + + )} + /> + )} +
+ {selectedMode === 'cluster' && ( +
+ + + } + className="create-scheduler-label-style" + label={ + + Stop the cluster after notebook execution + + } /> - } - className="create-scheduler-label-style" - label={ - - Stop the cluster after notebook execution - - } - /> - -
+ +
+ )} + )}
Date: Wed, 16 Oct 2024 15:30:59 +0530 Subject: [PATCH 3/9] Revert "FE changes" This reverts commit 187347b8af991a9497c173ee2af78944a0229e42. --- src/scheduler/createNotebookScheduler.tsx | 184 +++++++++++----------- 1 file changed, 90 insertions(+), 94 deletions(-) diff --git a/src/scheduler/createNotebookScheduler.tsx b/src/scheduler/createNotebookScheduler.tsx index d8f503ec..1e895e9f 100644 --- a/src/scheduler/createNotebookScheduler.tsx +++ b/src/scheduler/createNotebookScheduler.tsx @@ -121,8 +121,8 @@ const CreateNotebookScheduler = ({ const [editMode, setEditMode] = useState(false); const [dagListCall, setDagListCall] = useState(false); const [isLoadingKernelDetail, setIsLoadingKernelDetail] = useState(false); - const [isLocalKernel, setIsLocalKernel] = useState(false); + const [isBigQueryNotebook, setIsBigQueryNotebook] = useState(false); const listClustersAPI = async () => { await SchedulerService.listClustersAPIService( @@ -266,7 +266,6 @@ const CreateNotebookScheduler = ({ schedule_value: scheduleMode === 'runNow' ? '' : scheduleValue, stop_cluster: stopCluster, dag_id: randomDagId, - local_kernel: isLocalKernel, time_zone: scheduleMode !== 'runNow' ? timeZoneSelected : '', [selectedMode === 'cluster' ? 'cluster_name' : 'serverless_name']: selectedMode === 'cluster' ? clusterSelected : serverlessDataSelected @@ -307,8 +306,8 @@ const CreateNotebookScheduler = ({ (!jobNameUniqueValidation && !editMode) || inputFileSelected === '' || composerSelected === '' || - (selectedMode === 'cluster' && clusterSelected === '' && !isLocalKernel) || - (selectedMode === 'serverless' && serverlessSelected === '' && !isLocalKernel) || + (selectedMode === 'cluster' && clusterSelected === '') || + (selectedMode === 'serverless' && serverlessSelected === '') || ((emailOnFailure || emailOnRetry || emailOnSuccess) && emailList.length === 0) ); @@ -328,12 +327,6 @@ const CreateNotebookScheduler = ({ const kernels = kernelSpecs.kernelspecs; if (kernels && context.sessionContext.kernelPreference.name) { - console.log('aaaaa', context.sessionContext.kernelDisplayName); - if (context.sessionContext.kernelDisplayName.includes('Local')) { - setIsLocalKernel(true); - } else { - setIsLocalKernel(false); - } if ( kernels[context.sessionContext.kernelPreference.name].resources .endpointParentResource @@ -376,6 +369,10 @@ const CreateNotebookScheduler = ({ if (context !== '') { setInputFileSelected(context.path); + if (context.path.toLowerCase().startsWith('bigframes')) { + setIsBigQueryNotebook(true); + setSelectedMode('serverless'); + } } setJobNameSelected(''); if (!editMode) { @@ -556,94 +553,93 @@ const CreateNotebookScheduler = ({ fromPage="scheduler" /> - - {!isLocalKernel && ( - <> -
- - - } - label={ - Cluster - } - /> - } - label={ - - Serverless - - } - /> - - -
-
- {isLoadingKernelDetail && ( - + + + } + label={ + Cluster + } /> - )} - {selectedMode === 'cluster' && !isLoadingKernelDetail && ( - handleClusterSelected(val)} - renderInput={params => ( - - )} - /> - )} - {selectedMode === 'serverless' && !isLoadingKernelDetail && ( - handleServerlessSelected(val)} - renderInput={params => ( - - )} + } + label={ + + Serverless + + } /> + + +
+ )} +
+ {isLoadingKernelDetail && ( + + )} + {!isBigQueryNotebook && + selectedMode === 'cluster' && + !isLoadingKernelDetail && ( + handleClusterSelected(val)} + renderInput={params => ( + + )} + /> + )} + {selectedMode === 'serverless' && !isLoadingKernelDetail && ( + handleServerlessSelected(val)} + renderInput={params => ( + )} -
- {selectedMode === 'cluster' && ( -
- - - } - className="create-scheduler-label-style" - label={ - - Stop the cluster after notebook execution - - } + /> + )} +
+ {!isBigQueryNotebook && selectedMode === 'cluster' && ( +
+ + - -
- )} - + } + className="create-scheduler-label-style" + label={ + + Stop the cluster after notebook execution + + } + /> + +
)}
Date: Thu, 17 Oct 2024 09:57:21 +0530 Subject: [PATCH 4/9] test file changes --- dataproc_jupyter_plugin/tests/test_airflow.py | 63 +++++++++---------- .../tests/test_executor.py | 28 ++++----- 2 files changed, 43 insertions(+), 48 deletions(-) diff --git a/dataproc_jupyter_plugin/tests/test_airflow.py b/dataproc_jupyter_plugin/tests/test_airflow.py index 1d3ed5ef..05ecd920 100644 --- a/dataproc_jupyter_plugin/tests/test_airflow.py +++ b/dataproc_jupyter_plugin/tests/test_airflow.py @@ -14,6 +14,7 @@ import json import subprocess +from unittest.mock import AsyncMock, MagicMock import aiohttp @@ -81,40 +82,34 @@ async def mock_credentials(): assert payload == {"error": "Missing required credentials"} -@pytest.mark.parametrize("returncode, expected_result", [(0, 0)]) -async def test_delete_job(monkeypatch, returncode, expected_result, jp_fetch): - - async def mock_async_command_executor(cmd): - if cmd is None: - raise ValueError("Received None for cmd parameter") - if returncode == 0: - return b"output", b"" - else: - raise subprocess.CalledProcessError( - returncode, cmd, output=b"output", stderr=b"error in executing command" - ) - - monkeypatch.setattr(airflow.Client, "get_airflow_uri", mock_get_airflow_uri) - monkeypatch.setattr( - airflow, "async_run_gsutil_subcommand", mock_async_command_executor - ) - monkeypatch.setattr(aiohttp, "ClientSession", MockClientSession) - mock_composer = "mock-composer" - mock_dag_id = "mock_dag_id" - mock_from_page = "mock_from_page" - response = await jp_fetch( - "dataproc-plugin", - "dagDelete", - params={ - "composer": mock_composer, - "dag_id": mock_dag_id, - "from_page": mock_from_page, - }, - method="DELETE", - ) - assert response.code == 200 - payload = json.loads(response.body) - assert payload["status"] == 0 + @pytest.mark.asyncio + @pytest.mark.parametrize("from_page, expected_status", [(None, 0), ("some_page", 0)]) + async def test_delete_job(monkeypatch, from_page, expected_status, jp_fetch): + monkeypatch.setattr(airflow.Client, "get_airflow_uri", mock_get_airflow_uri) + mock_delete = AsyncMock() + mock_delete.return_value.__aenter__.return_value.status = 200 + mock_client_session = MagicMock() + mock_client_session.delete = mock_delete + monkeypatch.setattr("dagDelete.aiohttp.ClientSession", lambda: mock_client_session) + mock_bucket = MagicMock() + mock_blob = MagicMock() + mock_bucket.blob.return_value = mock_blob + mock_storage_client = MagicMock() + mock_storage_client.bucket.return_value = mock_bucket + monkeypatch.setattr("dagDelete.storage.Client", lambda: mock_storage_client) + response = await jp_fetch( + "dataproc-plugin", + "dagDelete", + method="DELETE", + params={ + "composer": "mock-composer", + "dag_id": "mock_dag_id", + "from_page": from_page, + }, + ) + assert response.code == 200 + payload = json.loads(response.body) + assert payload["status"] == expected_status class MockClientSession: diff --git a/dataproc_jupyter_plugin/tests/test_executor.py b/dataproc_jupyter_plugin/tests/test_executor.py index 67638e40..8fd6e5b6 100644 --- a/dataproc_jupyter_plugin/tests/test_executor.py +++ b/dataproc_jupyter_plugin/tests/test_executor.py @@ -19,7 +19,9 @@ import aiohttp import pytest +from google.cloud import storage +from dataproc_jupyter_plugin import credentials from dataproc_jupyter_plugin.commons import commands from dataproc_jupyter_plugin.services import airflow from dataproc_jupyter_plugin.services import executor @@ -96,30 +98,28 @@ async def test_execute_success( @pytest.mark.parametrize("returncode, expected_result", [(0, 
0)]) async def test_download_dag_output(monkeypatch, returncode, expected_result, jp_fetch): - async def mock_async_command_executor(cmd): - if cmd is None: - raise ValueError("Received None for cmd parameter") - if returncode == 0: - return b"output", b"" - else: - raise subprocess.CalledProcessError( - returncode, cmd, output=b"output", stderr=b"error in executing command" - ) - async def mock_list_dag_run_task(*args, **kwargs): return None monkeypatch.setattr(airflow.Client, "list_dag_run_task", mock_list_dag_run_task) - monkeypatch.setattr( - executor, "async_run_gsutil_subcommand", mock_async_command_executor - ) monkeypatch.setattr(aiohttp, "ClientSession", MockClientSession) + mock_blob = MagicMock() + mock_blob.download_as_bytes.return_value = b"mock file content" + + mock_bucket = MagicMock() + mock_bucket.blob.return_value = mock_blob + + mock_storage_client = MagicMock() + mock_storage_client.bucket.return_value = mock_bucket + monkeypatch.setattr(credentials, "get_cached", mock_credentials) + monkeypatch.setattr(storage, "Client", lambda credentials=None: mock_storage_client) mock_composer_name = "mock-composer" mock_bucket_name = "mock_bucket" mock_dag_id = "mock-dag-id" mock_dag_run_id = "258" - command = f"gsutil cp 'gs://{mock_bucket_name}/dataproc-output/{mock_dag_id}/output-notebooks/{mock_dag_id}_{mock_dag_run_id}.ipynb' ./" + + response = await jp_fetch( "dataproc-plugin", "downloadOutput", From f9a0cfb307d763798e509aaaad9910691647dea1 Mon Sep 17 00:00:00 2001 From: saranyaloganathan23 Date: Thu, 17 Oct 2024 10:38:30 +0530 Subject: [PATCH 5/9] code formatting --- dataproc_jupyter_plugin/services/airflow.py | 5 +- dataproc_jupyter_plugin/services/executor.py | 47 ++++++++++++------- dataproc_jupyter_plugin/tests/test_airflow.py | 9 ++-- .../tests/test_executor.py | 1 - 4 files changed, 38 insertions(+), 24 deletions(-) diff --git a/dataproc_jupyter_plugin/services/airflow.py b/dataproc_jupyter_plugin/services/airflow.py index 7c4763c0..75157c92 100644 --- a/dataproc_jupyter_plugin/services/airflow.py +++ b/dataproc_jupyter_plugin/services/airflow.py @@ -87,11 +87,10 @@ async def list_jobs(self, composer_name): self.log.exception(f"Error getting dag list: {str(e)}") return {"error": str(e)} - async def delete_job(self, composer_name, dag_id, from_page): airflow_uri, bucket_name = await self.get_airflow_uri(composer_name) try: - api_endpoint = f"{airflow_uri}/api/v1/dags/{dag_id}" + api_endpoint = f"{airflow_uri}/api/v1/dags/{dag_id}" # Delete the DAG via the Airflow API if from_page is None if from_page is None: async with self.client_session.delete( @@ -104,7 +103,7 @@ async def delete_job(self, composer_name, dag_id, from_page): blob.delete() self.log.info(f"Deleted {blob_name} from bucket {bucket_name}") - + return 0 except Exception as e: self.log.exception(f"Error deleting DAG: {str(e)}") diff --git a/dataproc_jupyter_plugin/services/executor.py b/dataproc_jupyter_plugin/services/executor.py index 32bf7985..9805b95e 100644 --- a/dataproc_jupyter_plugin/services/executor.py +++ b/dataproc_jupyter_plugin/services/executor.py @@ -101,8 +101,9 @@ async def check_file_exists(self, bucket_name, file_path): self.log.exception(f"Error checking file: {error}") raise IOError(f"Error creating dag: {error}") - - async def upload_to_gcs(self, gcs_dag_bucket, file_path=None, template_name=None, destination_dir=None): + async def upload_to_gcs( + self, gcs_dag_bucket, file_path=None, template_name=None, destination_dir=None + ): try: storage_client = storage.Client() bucket = 
storage_client.bucket(gcs_dag_bucket) @@ -112,18 +113,18 @@ async def upload_to_gcs(self, gcs_dag_bucket, file_path=None, template_name=None autoescape=select_autoescape(["py"]), ) file_path = env.get_template(template_name).filename - + if not file_path: raise ValueError("No file path or template name provided for upload.") if destination_dir: blob_name = f"{destination_dir}/{file_path.split('/')[-1]}" else: blob_name = f"{file_path.split('/')[-1]}" - + blob = bucket.blob(blob_name) blob.upload_from_filename(file_path) self.log.info(f"File {file_path} uploaded to gcs successfully") - + except Exception as error: self.log.exception(f"Error uploading file to GCS: {str(error)}") raise IOError(str(error)) @@ -245,7 +246,6 @@ def prepare_dag(self, job, gcs_dag_bucket, dag_file): shutil.copy2(wrapper_papermill_path, LOCAL_DAG_FILE_LOCATION) return file_path - async def execute(self, input_data): try: job = DescribeJob(**input_data) @@ -264,19 +264,28 @@ async def execute(self, input_data): f"The file gs://{gcs_dag_bucket}/{wrapper_pappermill_file_path} exists." ) else: - await self.upload_to_gcs(gcs_dag_bucket, template_name=WRAPPER_PAPPERMILL_FILE, destination_dir="dataproc-notebooks") + await self.upload_to_gcs( + gcs_dag_bucket, + template_name=WRAPPER_PAPPERMILL_FILE, + destination_dir="dataproc-notebooks", + ) print( f"The file gs://{gcs_dag_bucket}/{wrapper_pappermill_file_path} does not exist." ) if not job.input_filename.startswith(GCS): - await self.upload_to_gcs(gcs_dag_bucket, file_path=f"./{job.input_filename}", destination_dir=f"dataproc-notebooks/{job_name}/input_notebooks") + await self.upload_to_gcs( + gcs_dag_bucket, + file_path=f"./{job.input_filename}", + destination_dir=f"dataproc-notebooks/{job_name}/input_notebooks", + ) file_path = self.prepare_dag(job, gcs_dag_bucket, dag_file) - await self.upload_to_gcs(gcs_dag_bucket, file_path=file_path, destination_dir="dags") + await self.upload_to_gcs( + gcs_dag_bucket, file_path=file_path, destination_dir="dags" + ) return {"status": 0} except Exception as e: return {"error": str(e)} - async def download_dag_output( self, composer_environment_name, bucket_name, dag_id, dag_run_id ): @@ -286,20 +295,24 @@ async def download_dag_output( ) except Exception as ex: return {"error": f"Invalid DAG run ID {dag_run_id}"} - + try: credentials = oauth2.Credentials(self._access_token) storage_client = storage.Client(credentials=credentials) - blob_name = f"dataproc-output/{dag_id}/output-notebooks/{dag_id}_{dag_run_id}.ipynb" + blob_name = ( + f"dataproc-output/{dag_id}/output-notebooks/{dag_id}_{dag_run_id}.ipynb" + ) bucket = storage_client.bucket(bucket_name) blob = bucket.blob(blob_name) - original_file_name = os.path.basename(blob_name) + original_file_name = os.path.basename(blob_name) destination_file_name = os.path.join(".", original_file_name) async with aiofiles.open(destination_file_name, "wb") as f: - file_data = blob.download_as_bytes() - await f.write(file_data) - self.log.info(f"Output notebook file '{original_file_name}' downloaded successfully") + file_data = blob.download_as_bytes() + await f.write(file_data) + self.log.info( + f"Output notebook file '{original_file_name}' downloaded successfully" + ) return 0 except Exception as error: self.log.exception(f"Error downloading output notebook file: {str(error)}") - return {"error": str(error)} \ No newline at end of file + return {"error": str(error)} diff --git a/dataproc_jupyter_plugin/tests/test_airflow.py b/dataproc_jupyter_plugin/tests/test_airflow.py index 
05ecd920..59eb79fa 100644 --- a/dataproc_jupyter_plugin/tests/test_airflow.py +++ b/dataproc_jupyter_plugin/tests/test_airflow.py @@ -81,16 +81,19 @@ async def mock_credentials(): payload = json.loads(response.body) assert payload == {"error": "Missing required credentials"} - @pytest.mark.asyncio - @pytest.mark.parametrize("from_page, expected_status", [(None, 0), ("some_page", 0)]) + @pytest.mark.parametrize( + "from_page, expected_status", [(None, 0), ("some_page", 0)] + ) async def test_delete_job(monkeypatch, from_page, expected_status, jp_fetch): monkeypatch.setattr(airflow.Client, "get_airflow_uri", mock_get_airflow_uri) mock_delete = AsyncMock() mock_delete.return_value.__aenter__.return_value.status = 200 mock_client_session = MagicMock() mock_client_session.delete = mock_delete - monkeypatch.setattr("dagDelete.aiohttp.ClientSession", lambda: mock_client_session) + monkeypatch.setattr( + "dagDelete.aiohttp.ClientSession", lambda: mock_client_session + ) mock_bucket = MagicMock() mock_blob = MagicMock() mock_bucket.blob.return_value = mock_blob diff --git a/dataproc_jupyter_plugin/tests/test_executor.py b/dataproc_jupyter_plugin/tests/test_executor.py index 8fd6e5b6..3bb10cd6 100644 --- a/dataproc_jupyter_plugin/tests/test_executor.py +++ b/dataproc_jupyter_plugin/tests/test_executor.py @@ -119,7 +119,6 @@ async def mock_list_dag_run_task(*args, **kwargs): mock_dag_id = "mock-dag-id" mock_dag_run_id = "258" - response = await jp_fetch( "dataproc-plugin", "downloadOutput", From bcaac28d7628f147828a16ca33f2bd1f67ee77c5 Mon Sep 17 00:00:00 2001 From: saranyaloganathan23 Date: Thu, 17 Oct 2024 11:12:06 +0530 Subject: [PATCH 6/9] adding aiofiles dependency --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index f126ad39..462557ad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,8 @@ dependencies = [ "pydantic~=1.10.0", "bigframes~=0.22.0", "aiohttp~=3.9.5", - "google-cloud-storage~=2.18.2" + "google-cloud-storage~=2.18.2", + "aiofiles~=24.1.0" ] dynamic = ["version", "description", "authors", "urls", "keywords"] From 9e8ee79bdf3b9af399a9589b971f2393c8618b7e Mon Sep 17 00:00:00 2001 From: saranyaloganathan23 Date: Fri, 18 Oct 2024 19:07:23 +0530 Subject: [PATCH 7/9] aiofiles version change --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 462557ad..e8e370a7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,7 @@ dependencies = [ "bigframes~=0.22.0", "aiohttp~=3.9.5", "google-cloud-storage~=2.18.2", - "aiofiles~=24.1.0" + "aiofiles>=22.1.0" ] dynamic = ["version", "description", "authors", "urls", "keywords"] From e1eebc951d0b36588f72b384aa85dc5adbd25a07 Mon Sep 17 00:00:00 2001 From: saranyaloganathan23 Date: Mon, 21 Oct 2024 10:20:29 +0530 Subject: [PATCH 8/9] aiofiles version update --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index e8e370a7..8c00162e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,7 @@ dependencies = [ "bigframes~=0.22.0", "aiohttp~=3.9.5", "google-cloud-storage~=2.18.2", - "aiofiles>=22.1.0" + "aiofiles~=22.1.0" ] dynamic = ["version", "description", "authors", "urls", "keywords"] From 1ef78eaa417a28481256ec4d40f6d258e02a25f1 Mon Sep 17 00:00:00 2001 From: saranyaloganathan23 Date: Mon, 21 Oct 2024 14:02:57 +0530 Subject: [PATCH 9/9] version range update --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 8c00162e..3fb83114 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,7 @@ dependencies = [ "bigframes~=0.22.0", "aiohttp~=3.9.5", "google-cloud-storage~=2.18.2", - "aiofiles~=22.1.0" + "aiofiles>=22.1.0,<23" ] dynamic = ["version", "description", "authors", "urls", "keywords"]
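For reference, the pattern this series adopts in place of the gsutil subprocess calls: uploads and downloads go through the google-cloud-storage client, and the downloaded bytes are written out with aiofiles (hence the new dependency above). A minimal standalone sketch of that pattern, using purely illustrative bucket/blob/file names and assuming application-default credentials:

    import aiofiles
    from google.cloud import storage

    def upload_file(bucket_name: str, local_path: str, blob_name: str) -> None:
        # Equivalent of `gsutil cp <local_path> gs://<bucket_name>/<blob_name>`
        client = storage.Client()
        client.bucket(bucket_name).blob(blob_name).upload_from_filename(local_path)

    async def download_file(bucket_name: str, blob_name: str, destination: str) -> None:
        # Equivalent of `gsutil cp gs://<bucket_name>/<blob_name> <destination>`;
        # the blob bytes are fetched in one synchronous call, then written via aiofiles.
        client = storage.Client()
        data = client.bucket(bucket_name).blob(blob_name).download_as_bytes()
        async with aiofiles.open(destination, "wb") as f:
            await f.write(data)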