Commit

…upyter-plugin-fork into spark-properties-bugfix
aditee-accenture committed Jul 26, 2024
2 parents 3858d91 + 64898f6 commit 43360fe
Showing 7 changed files with 108 additions and 99 deletions.
35 changes: 35 additions & 0 deletions dataproc_jupyter_plugin/commons/commands.py
@@ -0,0 +1,35 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import asyncio
import subprocess
import sys
import tempfile


async def async_run_gsutil_subcommand(cmd):
"""Run a specified command and return its output."""
with tempfile.TemporaryFile() as t:
p = await asyncio.create_subprocess_shell(
f"{cmd}",
stdin=subprocess.DEVNULL,
stderr=sys.stderr,
stdout=t,
)
await p.wait()
if p.returncode != 0:
raise subprocess.CalledProcessError(p.returncode, None, None, None)
t.seek(0)
return t.read().decode("UTF-8").strip()
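
For orientation, a minimal sketch (not part of the commit) of how the new helper is meant to be awaited from an async caller; the list_dag_files wrapper and bucket name are illustrative placeholders, and gsutil is assumed to be installed and authenticated.

import asyncio
import subprocess

from dataproc_jupyter_plugin.commons.commands import async_run_gsutil_subcommand


async def list_dag_files(bucket):
    try:
        # Returns the decoded, stripped stdout of the gsutil invocation.
        return await async_run_gsutil_subcommand(f"gsutil ls gs://{bucket}/dags/")
    except subprocess.CalledProcessError as error:
        # On a non-zero exit the helper raises CalledProcessError with that return code.
        print(f"gsutil exited with code {error.returncode}")
        raise


# asyncio.run(list_dag_files("example-composer-bucket"))  # placeholder bucket name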
2 changes: 1 addition & 1 deletion dataproc_jupyter_plugin/controllers/executor.py
@@ -42,7 +42,7 @@ async def get(self):
bucket_name = self.get_argument("bucket_name")
dag_id = self.get_argument("dag_id")
dag_run_id = self.get_argument("dag_run_id")
download_status = client.download_dag_output(
download_status = await client.download_dag_output(
bucket_name, dag_id, dag_run_id
)
self.finish(json.dumps({"status": download_status}))
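
A brief illustrative sketch (not from the diff) of why the added await matters here: download_dag_output is now a coroutine function, so calling it without await would give the handler a coroutine object instead of the download status.

import asyncio


class FakeClient:
    # Stand-in for the executor client; the real method copies the notebook with gsutil.
    async def download_dag_output(self, bucket_name, dag_id, dag_run_id):
        return 0


async def handler():
    client = FakeClient()
    # Without the await, status would be an un-run coroutine object rather than 0.
    status = await client.download_dag_output("bucket", "dag_id", "dag_run_id")
    print(status)  # prints 0


asyncio.run(handler())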
14 changes: 3 additions & 11 deletions dataproc_jupyter_plugin/services/airflow.py
@@ -17,6 +17,7 @@
import urllib

from dataproc_jupyter_plugin import urls
from dataproc_jupyter_plugin.commons.commands import async_run_gsutil_subcommand
from dataproc_jupyter_plugin.commons.constants import (
COMPOSER_SERVICE_NAME,
CONTENT_TYPE,
@@ -95,17 +96,8 @@ async def delete_job(self, composer_name, dag_id, from_page):
) as response:
self.log.info(response)
cmd = f"gsutil rm gs://{bucket}/dags/dag_{dag_id}.py"
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
)
output, _ = process.communicate()
if process.returncode == 0:
return 0
else:
self.log.exception("Error deleting dag")
raise Exception(
f"Error getting airflow uri: {response.reason} {await response.text()}"
)
await async_run_gsutil_subcommand(cmd)
return 0
except Exception as e:
self.log.exception(f"Error deleting dag: {str(e)}")
return {"error": str(e)}
1 change: 0 additions & 1 deletion dataproc_jupyter_plugin/services/bigquery.py
@@ -49,7 +49,6 @@ async def list_datasets(self, page_token, project_id):
try:
bigquery_url = await urls.gcp_service_url(BIGQUERY_SERVICE_NAME)
api_endpoint = f"{bigquery_url}bigquery/v2/projects/{project_id}/datasets?pageToken={page_token}"

async with self.client_session.get(
api_endpoint, headers=self.create_headers()
) as response:
93 changes: 36 additions & 57 deletions dataproc_jupyter_plugin/services/executor.py
@@ -24,6 +24,7 @@
from jinja2 import Environment, PackageLoader, select_autoescape

from dataproc_jupyter_plugin import urls
from dataproc_jupyter_plugin.commons.commands import async_run_gsutil_subcommand
from dataproc_jupyter_plugin.commons.constants import (
COMPOSER_SERVICE_NAME,
CONTENT_TYPE,
@@ -83,51 +84,37 @@ async def get_bucket(self, runtime_env):
self.log.exception(f"Error getting bucket name: {str(e)}")
raise Exception(f"Error getting composer bucket: {str(e)}")

def check_file_exists(self, bucket, file_path):
cmd = f"gsutil ls gs://{bucket}/dataproc-notebooks/{file_path}"
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
)
output, error = process.communicate()
if process.returncode == 0:
async def check_file_exists(self, bucket, file_path):
try:
cmd = f"gsutil ls gs://{bucket}/dataproc-notebooks/{file_path}"
await async_run_gsutil_subcommand(cmd)
return True
else:
if "matched no objects" in error.decode():
return False
else:
self.log.exception(f"Error cheking file existence: {error.decode()}")
raise FileNotFoundError(error.decode)
except subprocess.CalledProcessError as error:
self.log.exception(f"Error checking papermill file: {error.decode()}")
raise IOError(error.decode)

def upload_papermill_to_gcs(self, gcs_dag_bucket):
async def upload_papermill_to_gcs(self, gcs_dag_bucket):
env = Environment(
loader=PackageLoader(PACKAGE_NAME, "dagTemplates"),
autoescape=select_autoescape(["py"]),
)
wrapper_papermill_path = env.get_template("wrapper_papermill.py").filename
cmd = f"gsutil cp '{wrapper_papermill_path}' gs://{gcs_dag_bucket}/dataproc-notebooks/"
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
)
output, error = process.communicate()
print(process.returncode, error, output)
if process.returncode == 0:
try:
cmd = f"gsutil cp '{wrapper_papermill_path}' gs://{gcs_dag_bucket}/dataproc-notebooks/"
await async_run_gsutil_subcommand(cmd)
self.log.info("Papermill file uploaded to gcs successfully")
print(process.returncode, error, output)
else:
except subprocess.CalledProcessError as error:
self.log.exception(
f"Error uploading papermill file to gcs: {error.decode()}"
)
raise IOError(error.decode)

def upload_input_file_to_gcs(self, input, gcs_dag_bucket, job_name):
cmd = f"gsutil cp './{input}' gs://{gcs_dag_bucket}/dataproc-notebooks/{job_name}/input_notebooks/"
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
)
output, error = process.communicate()
if process.returncode == 0:
async def upload_input_file_to_gcs(self, input, gcs_dag_bucket, job_name):
try:
cmd = f"gsutil cp './{input}' gs://{gcs_dag_bucket}/dataproc-notebooks/{job_name}/input_notebooks/"
await async_run_gsutil_subcommand(cmd)
self.log.info("Input file uploaded to gcs successfully")
else:
except subprocess.CalledProcessError as error:
self.log.exception(f"Error uploading input file to gcs: {error.decode()}")
raise IOError(error.decode)

@@ -247,18 +234,14 @@ def prepare_dag(self, job, gcs_dag_bucket, dag_file):
wrapper_papermill_path = env.get_template("wrapper_papermill.py").filename
shutil.copy2(wrapper_papermill_path, LOCAL_DAG_FILE_LOCATION)

def upload_dag_to_gcs(self, job, dag_file, gcs_dag_bucket):
async def upload_dag_to_gcs(self, job, dag_file, gcs_dag_bucket):
LOCAL_DAG_FILE_LOCATION = f"./scheduled-jobs/{job.name}"
file_path = os.path.join(LOCAL_DAG_FILE_LOCATION, dag_file)
cmd = f"gsutil cp '{file_path}' gs://{gcs_dag_bucket}/dags/"
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
)
output, error = process.communicate()
if process.returncode == 0:
try:
cmd = f"gsutil cp '{file_path}' gs://{gcs_dag_bucket}/dags/"
await async_run_gsutil_subcommand(cmd)
self.log.info("Dag file uploaded to gcs successfully")

if process.returncode != 0:
except subprocess.CalledProcessError as error:
self.log.exception(f"Error uploading dag file to gcs: {error.decode()}")
raise IOError(error.decode)

@@ -273,37 +256,33 @@ async def execute(self, input_data):
gcs_dag_bucket = await self.get_bucket(job.composer_environment_name)
wrapper_pappermill_file_path = WRAPPER_PAPPERMILL_FILE

if self.check_file_exists(gcs_dag_bucket, wrapper_pappermill_file_path):
if await self.check_file_exists(
gcs_dag_bucket, wrapper_pappermill_file_path
):
print(
f"The file gs://{gcs_dag_bucket}/{wrapper_pappermill_file_path} exists."
)
else:
self.upload_papermill_to_gcs(gcs_dag_bucket)
await self.upload_papermill_to_gcs(gcs_dag_bucket)
print(
f"The file gs://{gcs_dag_bucket}/{wrapper_pappermill_file_path} does not exist."
)
if not job.input_filename.startswith(GCS):
self.upload_input_file_to_gcs(
await self.upload_input_file_to_gcs(
job.input_filename, gcs_dag_bucket, job_name
)
self.prepare_dag(job, gcs_dag_bucket, dag_file)
self.upload_dag_to_gcs(job, dag_file, gcs_dag_bucket)
await self.upload_dag_to_gcs(job, dag_file, gcs_dag_bucket)
return {"status": 0}
except Exception as e:
return {"error": str(e)}

def download_dag_output(self, bucket_name, dag_id, dag_run_id):
async def download_dag_output(self, bucket_name, dag_id, dag_run_id):
try:
cmd = f"gsutil cp 'gs://{bucket_name}/dataproc-output/{dag_id}/output-notebooks/{dag_id}_{dag_run_id}.ipynb' ./"
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
)
output, _ = process.communicate()
if process.returncode == 0:
return 0
else:
self.log.exception("Error downloading output notebook file")
return 1
except Exception as e:
self.log.exception(f"Error downloading output notebook file: {str(e)}")
return {"error": str(e)}
await async_run_gsutil_subcommand(cmd)
self.log.info("Output notebook file downloaded successfully")
return 0
except subprocess.CalledProcessError as error:
self.log.exception(f"Error downloading output notebook file: {str(error)}")
return {"error": str(error)}
26 changes: 14 additions & 12 deletions dataproc_jupyter_plugin/tests/test_airflow.py
@@ -19,8 +19,6 @@

from dataproc_jupyter_plugin.tests import mocks

from unittest.mock import Mock

import pytest
from google.cloud import jupyter_config

@@ -85,19 +83,20 @@ async def mock_credentials():

@pytest.mark.parametrize("returncode, expected_result", [(0, 0)])
async def test_delete_job(monkeypatch, returncode, expected_result, jp_fetch):
def mock_popen(returncode=0):
def _mock_popen(*args, **kwargs):
mock_process = Mock()
mock_process.communicate.return_value = (b"output", b"")
mock_process.returncode = returncode
return mock_process

return _mock_popen
async def mock_async_command_executor(cmd):
if cmd is None:
raise ValueError("Received None for cmd parameter")
if returncode == 0:
return b"output", b""
else:
raise subprocess.CalledProcessError(
returncode, cmd, output=b"output", stderr=b"error in executing command"
)

mocks.patch_mocks(monkeypatch)
monkeypatch.setattr(airflow.Client, "get_airflow_uri", mock_get_airflow_uri)
monkeypatch.setattr(subprocess, "Popen", mock_popen(returncode))

monkeypatch.setattr(airflow, "async_run_gsutil_subcommand", mock_async_command_executor)
monkeypatch.setattr(aiohttp, "ClientSession", MockClientSession)
mock_composer = "mock_composer"
mock_dag_id = "mock_dag_id"
mock_from_page = "mock_from_page"
@@ -130,6 +129,9 @@ def patch(self, api_endpoint, json, headers=None):
def get(self, api_endpoint, headers=None):
return mocks.MockResponse(None, text="mock log content")

def delete(self, api_endpoint, headers=None):
return mocks.MockResponse({})


async def test_update_job(monkeypatch, jp_fetch):
async def mock_config(config_field):
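
One point worth calling out about the updated test (an explanatory aside, not part of the diff): because airflow.py imports the helper with a from-import, the fake has to be patched onto the name as it is looked up in the module under test, i.e. on airflow rather than on commons.commands. A minimal sketch, assuming pytest's monkeypatch fixture; test_example and fake_gsutil are hypothetical names.

from dataproc_jupyter_plugin.services import airflow


def test_example(monkeypatch):
    async def fake_gsutil(cmd):
        # Pretend every gsutil invocation succeeds with empty output.
        return ""

    # airflow.py did "from ...commons.commands import async_run_gsutil_subcommand",
    # so patch the copy of that name bound in the airflow module, not in commands.
    monkeypatch.setattr(airflow, "async_run_gsutil_subcommand", fake_gsutil)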
36 changes: 19 additions & 17 deletions dataproc_jupyter_plugin/tests/test_executor.py
@@ -17,12 +17,12 @@
import unittest
from unittest.mock import AsyncMock, MagicMock, Mock, patch

import aiohttp
import pytest
import requests
from google.cloud import jupyter_config

from dataproc_jupyter_plugin import credentials
from dataproc_jupyter_plugin.commons import commands
from dataproc_jupyter_plugin.services import executor
from dataproc_jupyter_plugin.tests.test_airflow import MockClientSession


async def mock_credentials():
@@ -93,23 +93,25 @@ async def test_execute_success(


@pytest.mark.parametrize("returncode, expected_result", [(0, 0)])
async def test_downlaod_output(monkeypatch, returncode, expected_result, jp_fetch):
def mock_popen(returncode=0):
def _mock_popen(*args, **kwargs):
mock_process = Mock()
mock_process.communicate.return_value = (b"output", b"")
mock_process.returncode = returncode
return mock_process

return _mock_popen

monkeypatch.setattr(credentials, "get_cached", mock_credentials)
monkeypatch.setattr(jupyter_config, "async_get_gcloud_config", mock_config)
monkeypatch.setattr(requests, "get", mock_get)
monkeypatch.setattr(subprocess, "Popen", mock_popen(returncode))
async def test_download_dag_output(monkeypatch, returncode, expected_result, jp_fetch):

async def mock_async_command_executor(cmd):
if cmd is None:
raise ValueError("Received None for cmd parameter")
if returncode == 0:
return b"output", b""
else:
raise subprocess.CalledProcessError(
returncode, cmd, output=b"output", stderr=b"error in executing command"
)

monkeypatch.setattr(executor, "async_run_gsutil_subcommand", mock_async_command_executor)
monkeypatch.setattr(aiohttp, "ClientSession", MockClientSession)

mock_bucket_name = "mock_bucekt"
mock_dag_id = "mock_dag_id"
mock_dag_run_id = "mock_dag_run_id"
command = f"gsutil cp 'gs://{mock_bucket_name}/dataproc-output/{mock_dag_id}/output-notebooks/{mock_dag_id}_{mock_dag_run_id}.ipynb' ./"
response = await jp_fetch(
"dataproc-plugin",
"downloadOutput",
