Merge Distributed Qualification Tools CLI #1516

Draft · wants to merge 6 commits into base: dev
4 changes: 3 additions & 1 deletion user_tools/pyproject.toml
@@ -84,7 +84,9 @@ dependencies = [
# dependency of shap, python [3.9, 3.12]
"scikit-learn==1.5.2",
# used for retrieving available memory on the host
"psutil==6.1.1"
"psutil==6.1.1",
# pyspark for distributed computing
"pyspark==3.5.0"
]
dynamic=["entry-points", "version"]

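The new pyspark pin is what lets the wrapper itself act as a Spark driver. As a minimal illustrative sketch (names here are hypothetical, not taken from this PR), the distributed executor would presumably bootstrap a session along these lines:

    from pyspark.sql import SparkSession

    # Hypothetical sketch: obtain (or reuse) a SparkSession to drive the
    # distributed tools run; the app name is illustrative.
    spark = SparkSession.builder.appName('spark-rapids-tools-distributed').getOrCreate()
    print(spark.version)  # expected to report 3.5.0, matching the pin above

Pinning an exact version trades flexibility for predictability: the driver-side PySpark cannot drift from what the wheel was tested against.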
user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023-2024, NVIDIA CORPORATION.
+# Copyright (c) 2023-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -88,6 +88,9 @@ def create_saving_estimator(self,
def create_local_submission_job(self, job_prop, ctxt) -> Any:
return DBAzureLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

+def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
+    pass

def validate_job_submission_args(self, submission_args: dict) -> dict:
pass

5 changes: 4 additions & 1 deletion user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023-2024, NVIDIA CORPORATION.
+# Copyright (c) 2023-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -130,6 +130,9 @@ def create_saving_estimator(self,
def create_local_submission_job(self, job_prop, ctxt) -> Any:
return DataprocLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

+def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
+    pass

def validate_job_submission_args(self, submission_args: dict) -> dict:
pass

user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023-2024, NVIDIA CORPORATION.
+# Copyright (c) 2023-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -92,6 +92,9 @@ def create_saving_estimator(self,
def create_local_submission_job(self, job_prop, ctxt) -> Any:
return DataprocGkeLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

+def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
+    pass


@dataclass
class DataprocGkeCMDDriver(DataprocCMDDriver):
5 changes: 4 additions & 1 deletion user_tools/src/spark_rapids_pytools/cloud_api/emr.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023-2024, NVIDIA CORPORATION.
+# Copyright (c) 2023-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -115,6 +115,9 @@ def create_saving_estimator(self,
def create_local_submission_job(self, job_prop, ctxt) -> Any:
return EmrLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

+def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
+    pass

def generate_cluster_configuration(self, render_args: dict):
image_version = self.configs.get_value_silent('clusterInference', 'defaultImage')
render_args['IMAGE'] = f'"{image_version}"'
16 changes: 14 additions & 2 deletions user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023-2024, NVIDIA CORPORATION.
+# Copyright (c) 2023-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,14 +19,14 @@
from typing import Any, List, Optional

from spark_rapids_tools import CspEnv
-from spark_rapids_pytools.rapids.rapids_job import RapidsLocalJob
from spark_rapids_pytools.cloud_api.sp_types import PlatformBase, ClusterBase, ClusterNode, \
CMDDriverBase, ClusterGetAccessor, GpuDevice, \
GpuHWInfo, NodeHWInfo, SparkNodeType, SysInfo
from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
from spark_rapids_pytools.common.sys_storage import StorageDriver
from spark_rapids_pytools.pricing.dataproc_pricing import DataprocPriceProvider
from spark_rapids_pytools.pricing.price_provider import SavingsEstimator
+from spark_rapids_pytools.rapids.rapids_job import RapidsLocalJob, RapidsDistributedJob


@dataclass
@@ -49,6 +49,9 @@ def _install_storage_driver(self):
def create_local_submission_job(self, job_prop, ctxt) -> Any:
return OnPremLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

+def create_distributed_submission_job(self, job_prop, ctxt) -> RapidsDistributedJob:
+    return OnPremDistributedRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

def _construct_cluster_from_props(self, cluster: str, props: str = None, is_inferred: bool = False,
is_props_file: bool = False):
return OnPremCluster(self, is_inferred=is_inferred).set_connection(cluster_id=cluster, props=props)
@@ -154,6 +157,15 @@ class OnPremLocalRapidsJob(RapidsLocalJob):
job_label = 'onpremLocal'


+# pylint: disable=abstract-method
+@dataclass
+class OnPremDistributedRapidsJob(RapidsDistributedJob):
+    """
+    Implementation of a RAPIDS job that runs on a distributed cluster
+    """
+    job_label = 'onprem.distributed'


@dataclass
class OnPremNode(ClusterNode):
"""Implementation of Onprem cluster node."""
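On-prem is the only platform wired end-to-end in this PR; the CSP platforms stub create_distributed_submission_job with pass. A hypothetical invocation (flag spelling taken from the --submission_mode docstring in qualification.py below; every other argument is illustrative):

    spark_rapids qualification \
        --platform onprem \
        --eventlogs hdfs:///path/to/eventlogs \
        --submission_mode distributed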
7 changes: 5 additions & 2 deletions user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023-2024, NVIDIA CORPORATION.
+# Copyright (c) 2023-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -22,11 +22,11 @@
from logging import Logger
from typing import Type, Any, List, Callable, Union, Optional, final, Dict

+from spark_rapids_tools import EnumeratedType, CspEnv
from spark_rapids_pytools.common.prop_manager import AbstractPropertiesContainer, JSONPropertiesContainer, \
get_elem_non_safe
from spark_rapids_pytools.common.sys_storage import StorageDriver, FSUtil
from spark_rapids_pytools.common.utilities import ToolLogging, SysCmd, Utils, TemplateGenerator
-from spark_rapids_tools import EnumeratedType, CspEnv


class DeployMode(EnumeratedType):
@@ -884,6 +884,9 @@ def create_saving_estimator(self,
def create_local_submission_job(self, job_prop, ctxt) -> Any:
raise NotImplementedError

+def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
+    raise NotImplementedError

def load_platform_configs(self):
config_file_name = f'{CspEnv.tostring(self.type_id).lower()}-configs.json'
config_path = Utils.resource_path(config_file_name)
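Because the base platform raises NotImplementedError, each platform opts in to distributed submission explicitly. The call site that picks between the two factory methods is not part of this diff; a sketch of the assumed dispatch, modeled on the existing local-job pattern:

    # Assumed dispatch (not from this PR): select the factory method from the
    # submission mode stored in the tool context.
    def create_submission_job(platform, job_prop, ctxt, submission_mode):
        if submission_mode == SubmissionMode.DISTRIBUTED:
            return platform.create_distributed_submission_job(job_prop, ctxt)
        return platform.create_local_submission_job(job_prop, ctxt)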
19 changes: 17 additions & 2 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -29,7 +29,7 @@
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator
from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool
-from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel
+from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel, SubmissionMode
from spark_rapids_tools.storagelib import CspFs
from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender
@@ -153,6 +153,17 @@ def _process_estimation_model_args(self) -> None:
estimation_model_args = QualEstimationModel.create_default_model_args(selected_model)
self.ctxt.set_ctxt('estimationModelArgs', estimation_model_args)

+def _process_submission_mode_arg(self) -> None:
+    """
+    Process the value provided by `--submission_mode` argument.
+    """
+    submission_mode_arg = self.wrapper_options.get('submissionMode')
+    if submission_mode_arg is None or not submission_mode_arg:
+        submission_mode = SubmissionMode.get_default()
+    else:
+        submission_mode = SubmissionMode.fromstring(submission_mode_arg)
+    self.ctxt.set_ctxt('submissionMode', submission_mode)

def _process_custom_args(self) -> None:
"""
Qualification tool processes extra arguments:
@@ -181,6 +192,7 @@ def _process_custom_args(self) -> None:
self._process_estimation_model_args()
self._process_offline_cluster_args()
self._process_eventlogs_args()
+self._process_submission_mode_arg()
# This is noise to dump everything
# self.logger.debug('%s custom arguments = %s', self.pretty_name(), self.ctxt.props['wrapperCtx'])

@@ -375,7 +387,7 @@ def create_stdout_table_pprinter(total_apps: pd.DataFrame,

df = self._read_qualification_output_file('summaryReport')
# 1. Operations related to XGboost modelling
-if self.ctxt.get_ctxt('estimationModelArgs')['xgboostEnabled']:
+if not df.empty and self.ctxt.get_ctxt('estimationModelArgs')['xgboostEnabled']:
try:
df = self.__update_apps_with_prediction_info(df,
self.ctxt.get_ctxt('estimationModelArgs'))
@@ -609,6 +621,9 @@ def _read_qualification_output_file(self, report_name_key: str, file_format_key:
# extract the file name of report from the YAML config (e.g., toolOutput -> csv -> summaryReport -> fileName)
report_file_name = self.ctxt.get_value('toolOutput', file_format_key, report_name_key, 'fileName')
report_file_path = FSUtil.build_path(self.ctxt.get_rapids_output_folder(), report_file_name)
+if not FSUtil.resource_exists(report_file_path):
+    self.logger.warning('Unable to read the report file \'%s\'. File does not exist.', report_file_path)
+    return pd.DataFrame()
return pd.read_csv(report_file_path)

def _read_qualification_metric_file(self, file_name: str) -> Dict[str, pd.DataFrame]:
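SubmissionMode itself lives in spark_rapids_tools.enums and is not shown in this diff. Judging from the calls to get_default() and fromstring() above, and the EnumeratedType pattern used by QualFilterApp, a plausible reconstruction is:

    # Hypothetical reconstruction; the real definition is in
    # spark_rapids_tools/enums.py and is not part of this diff.
    class SubmissionMode(EnumeratedType):
        """Whether the tools JAR runs on the local machine or on a Spark cluster."""
        LOCAL = 'local'
        DISTRIBUTED = 'distributed'

        @classmethod
        def get_default(cls) -> 'SubmissionMode':
            return cls.LOCAL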
65 changes: 54 additions & 11 deletions user_tools/src/spark_rapids_pytools/rapids/rapids_job.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023-2024, NVIDIA CORPORATION.
+# Copyright (c) 2023-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,12 +17,15 @@
import os
from dataclasses import dataclass, field
from logging import Logger
-from typing import List, Optional
+from typing import List, Optional, Union

from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
from spark_rapids_pytools.common.utilities import ToolLogging, Utils
from spark_rapids_pytools.rapids.tool_ctxt import ToolContext
from spark_rapids_tools import CspPath
from spark_rapids_tools.storagelib import LocalPath
+from spark_rapids_tools_distributed.distributed_main import DistributedToolsExecutor
+from spark_rapids_tools_distributed.jar_cmd_args import JarCmdArgs


@dataclass
@@ -38,6 +41,8 @@ def _init_fields(self):
self.props['sparkConfArgs'] = {}
if self.get_value_silent('platformArgs') is None:
self.props['platformArgs'] = {}
+if self.get_value_silent('distributedToolsConfigs') is None:
+    self.props['distributedToolsConfigs'] = {}

def get_jar_file(self):
return self.get_value('rapidsArgs', 'jarFile')
@@ -48,6 +53,9 @@ def get_jar_main_class(self):
def get_rapids_args(self):
return self.get_value('rapidsArgs', 'jarArgs')

+def get_distribution_tools_configs(self):
+    return self.get_value('distributedToolsConfigs')


@dataclass
class RapidsJob:
@@ -90,10 +98,10 @@ def _build_rapids_args(self):
rapids_arguments.extend(extra_rapids_args)
return rapids_arguments

-def _build_submission_cmd(self) -> list:
+def _build_submission_cmd(self) -> Union[list, JarCmdArgs]:
raise NotImplementedError

-def _submit_job(self, cmd_args: list) -> str:
+def _submit_job(self, cmd_args: Union[list, JarCmdArgs]) -> str:
raise NotImplementedError

def _print_job_output(self, job_output: str):
@@ -125,13 +133,6 @@ def run_job(self):
self._cleanup_temp_log4j_files()
return job_output


-@dataclass
-class RapidsLocalJob(RapidsJob):
-    """
-    Implementation of a RAPIDS job that runs local on a machine.
-    """

def _get_hadoop_classpath(self) -> Optional[str]:
"""
Gets the Hadoop's configuration directory from the environment variables.
@@ -202,6 +203,13 @@ def _build_jvm_args(self):
vm_args.append(val)
return vm_args


+@dataclass
+class RapidsLocalJob(RapidsJob):
+    """
+    Implementation of a RAPIDS job that runs local on a machine.
+    """

def _build_submission_cmd(self) -> list:
# env vars are added later as a separate dictionary
classpath_arr = self._build_classpath()
@@ -218,3 +226,38 @@ def _submit_job(self, cmd_args: list) -> str:
out_std = self.exec_ctxt.platform.cli.run_sys_cmd(cmd=cmd_args,
env_vars=env_args)
return out_std


+@dataclass
+class RapidsDistributedJob(RapidsJob):
+    """
+    Implementation of a RAPIDS job that runs distributed on a cluster.
+    """
+
+    def _build_submission_cmd(self) -> JarCmdArgs:
+        classpath_arr = self._build_classpath()
+        hadoop_cp = self._get_hadoop_classpath()
+        jvm_args_arr = self._build_jvm_args()
+        jar_main_class = self.prop_container.get_jar_main_class()
+        jar_output_dir_args = self._get_persistent_rapids_args()
+        extra_rapids_args = self.prop_container.get_rapids_args()
+        return JarCmdArgs(jvm_args_arr, classpath_arr, hadoop_cp, jar_main_class,
+                          jar_output_dir_args, extra_rapids_args)
+
+    def _build_classpath(self) -> List[str]:
+        """
+        Only the Spark RAPIDS Tools JAR file is needed for the classpath.
+        Assumption: Each worker node should have the Spark Jars pre-installed.
+        TODO: Ship the Spark JARs to the cluster to avoid version mismatch issues.
+        """
+        return ['-cp', self.prop_container.get_jar_file()]
+
+    def _submit_job(self, cmd_args: JarCmdArgs) -> None:
+        """
+        Submit the Tools JAR cmd to the Spark cluster.
+        """
+        user_configs = self.prop_container.get_distribution_tools_configs()
+        executor = DistributedToolsExecutor(user_submission_configs=user_configs.submission,
+                                            cli_output_path=CspPath(self.exec_ctxt.get_output_folder()),
+                                            jar_cmd_args=cmd_args)
+        executor.run_as_spark_app()
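JarCmdArgs comes from the new spark_rapids_tools_distributed package, which this PR references but does not show. Inferring from the positional construction in _build_submission_cmd above, a plausible field layout is:

    from dataclasses import dataclass
    from typing import List, Optional

    # Assumed shape, inferred from the positional call above; the real class
    # lives in spark_rapids_tools_distributed/jar_cmd_args.py.
    @dataclass
    class JarCmdArgs:
        jvm_args: List[str]              # JVM options forwarded to the tools JAR
        classpath: List[str]             # ['-cp', '<tools-jar-path>']
        hadoop_classpath: Optional[str]  # from HADOOP_CONF_DIR, if present
        jar_main_class: str              # fully qualified main class
        jar_output_dir_args: List[str]   # persistent output-dir arguments
        extra_rapids_args: List[str]     # remaining tool arguments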