Add support for configurable qualx label column #1528

Merged
merged 5 commits on Feb 7, 2025
37 changes: 37 additions & 0 deletions user_tools/src/spark_rapids_tools/tools/qualx/config.py
@@ -0,0 +1,37 @@
# Copyright (c) 2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Config module for Qualx, controlled by environment variables.

Environment variables:
- QUALX_CACHE_DIR: cache directory for saving Profiler output.
- QUALX_DATA_DIR: data directory containing eventlogs, primarily used in dataset JSON files.
- QUALX_DIR: root directory for Qualx execution, primarily used in dataset JSON files to locate
dataset-specific plugins.
- QUALX_LABEL: targeted label column for XGBoost model.
- SPARK_RAPIDS_TOOLS_JAR: path to Spark RAPIDS Tools JAR file.
Comment on lines +17 to +23
Collaborator:
  • In the user-tools wrapper we use the RAPIDS_USER_TOOLS_* pattern across all environment variables. Shall we apply the same convention to the QualX-related ones?
  • For QUALX_CACHE_DIR: there is already a cache directory used by the tools wrapper. Can we use the same value for both to reduce the number of variables needed by the tools? The wrapper uses the env variable RAPIDS_USER_TOOLS_CACHE_FOLDER, which defaults to /var/tmp/spark_rapids_user_tools_cache.

Collaborator Author:

@amahussein I think there are a lot of scripts/tools that use these at the moment, so I'd leave renaming for another time. My hope is that this new config.py file will make it easier to refactor/rename in the future (while keeping changes minimal for now).

"""
import os


def get_cache_dir() -> str:
"""Get cache directory to save Profiler output."""
return os.environ.get('QUALX_CACHE_DIR', 'qualx_cache')
Comment on lines +28 to +30
Collaborator:

We can use the utility methods to get/set the env variables.
For RAPIDS_USER_TOOLS environment variables, they take care of adding the prefix:

@classmethod
def find_full_rapids_tools_env_key(cls, actual_key: str) -> str:
    return f'RAPIDS_USER_TOOLS_{actual_key}'

@classmethod
def get_sys_env_var(cls, k: str, def_val=None) -> Optional[str]:
    return os.environ.get(k, def_val)

@classmethod
def get_rapids_tools_env(cls, k: str, def_val=None):
    val = cls.get_sys_env_var(cls.find_full_rapids_tools_env_key(k), def_val)
    return val

@classmethod
def set_rapids_tools_env(cls, k: str, val):
    os.environ[cls.find_full_rapids_tools_env_key(k)] = str(val)

Collaborator Author:

Same comment as above.



def get_label() -> str:
"""Get targeted label column for XGBoost model."""
label = os.environ.get('QUALX_LABEL', 'Duration')
assert label in ['Duration', 'duration_sum']
return label
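
As a quick illustration of the new config module (a minimal sketch, not part of the diff; the environment values below are made up):

import os

# QUALX_LABEL defaults to 'Duration' and QUALX_CACHE_DIR defaults to
# 'qualx_cache' when unset; only 'Duration' and 'duration_sum' are valid labels
os.environ['QUALX_LABEL'] = 'duration_sum'
os.environ['QUALX_CACHE_DIR'] = '/tmp/qualx_cache'

from spark_rapids_tools.tools.qualx.config import get_cache_dir, get_label

print(get_label())      # duration_sum
print(get_cache_dir())  # /tmp/qualx_cache
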
45 changes: 24 additions & 21 deletions user_tools/src/spark_rapids_tools/tools/qualx/model.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2024, NVIDIA CORPORATION.
+# Copyright (c) 2024-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
import pandas as pd
import xgboost as xgb
from xgboost import Booster
+from spark_rapids_tools.tools.qualx.config import get_label
from spark_rapids_tools.tools.qualx.preprocess import expected_raw_features
from spark_rapids_tools.tools.qualx.util import get_logger
# Import optional packages
@@ -42,7 +43,6 @@
'appId',
'appName',
'description',
-'Duration',
'fraction_supported',
'jobStartTime_min',
'pluginEnabled',
@@ -53,8 +53,6 @@
'sqlID'
}

-expected_model_features = expected_raw_features - ignored_features


def train(
cpu_aug_tbl: pd.DataFrame,
@@ -174,13 +172,14 @@ def predict(
preds['y'] = y
preds_df = pd.DataFrame(preds)

+label = get_label()  # Duration, duration_sum
select_columns = [
'appName',
'appId',
'appDuration',
'sqlID',
'scaleFactor',
-'Duration',
+label,
'fraction_supported',
'description',
]
@@ -196,31 +195,35 @@

if 'y' in results_df.columns:
# reconstruct original gpu duration for validation purposes
-results_df['gpuDuration'] = results_df['Duration'] / results_df['y']
-results_df['gpuDuration'] = np.floor(results_df['gpuDuration'])
+results_df[f'gpu_{label}'] = results_df[label] / results_df['y']
+results_df[f'gpu_{label}'] = np.floor(results_df[f'gpu_{label}'])

# adjust raw predictions with stage/sqlID filtering of unsupported ops
-results_df[f'Duration_pred'] = results_df['Duration'] * (
+results_df[f'{label}_pred'] = results_df[label] * (
1.0
- results_df['fraction_supported']
+ (results_df['fraction_supported'] / results_df['y_pred'])
)
# compute fraction of duration in supported ops
-results_df['Duration_supported'] = (
-    results_df['Duration'] * results_df['fraction_supported']
+results_df[f'{label}_supported'] = (
+    results_df[label] * results_df['fraction_supported']
)
# compute adjusted speedup (vs. raw speedup prediction: 'y_pred')
# without qual data, this should be the same as the raw 'y_pred'
-results_df['speedup_pred'] = results_df['Duration'] / results_df['Duration_pred']
+results_df['speedup_pred'] = results_df[label] / results_df[f'{label}_pred']
results_df = results_df.drop(columns=['fraction_supported'])

return results_df


def extract_model_features(
-    df: pd.DataFrame, split_functions: Mapping[str, Callable[[pd.DataFrame], pd.DataFrame]] = None
+    df: pd.DataFrame,
+    split_functions: Mapping[str, Callable[[pd.DataFrame], pd.DataFrame]] = None,
) -> Tuple[pd.DataFrame, List[str], str]:
"""Extract model features from raw features."""
+label = get_label()
+expected_model_features = expected_raw_features - ignored_features
+expected_model_features.remove(label)
missing = expected_raw_features - set(df.columns)
if missing:
logger.warning('Input dataframe is missing expected raw features: %s', missing)
@@ -256,11 +259,11 @@ def extract_model_features(
'appName',
'scaleFactor',
'sqlID',
-'Duration',
+label,
'description',
]
]
-gpu_aug_tbl = gpu_aug_tbl.rename(columns={'Duration': 'xgpu_Duration'})
+gpu_aug_tbl = gpu_aug_tbl.rename(columns={label: f'xgpu_{label}'})
cpu_aug_tbl = cpu_aug_tbl.merge(
gpu_aug_tbl,
on=['appName', 'scaleFactor', 'sqlID', 'description'],
@@ -269,7 +272,7 @@

# warn for possible mismatched sqlIDs
num_rows = len(cpu_aug_tbl)
-num_na = cpu_aug_tbl['xgpu_Duration'].isna().sum()
+num_na = cpu_aug_tbl[f'xgpu_{label}'].isna().sum()
if (
num_na / num_rows > 0.05
): # arbitrary threshold, misaligned sqlIDs still may 'match' most of the time
Expand All @@ -279,14 +282,14 @@ def extract_model_features(
num_rows,
)

-# calculate Duration_speedup
-cpu_aug_tbl['Duration_speedup'] = (
-    cpu_aug_tbl['Duration'] / cpu_aug_tbl['xgpu_Duration']
+# calculate speedup
+cpu_aug_tbl[f'{label}_speedup'] = (
+    cpu_aug_tbl[label] / cpu_aug_tbl[f'xgpu_{label}']
)
-cpu_aug_tbl = cpu_aug_tbl.drop(columns=['xgpu_Duration'])
+cpu_aug_tbl = cpu_aug_tbl.drop(columns=[f'xgpu_{label}'])

-# use Duration_speedup as label
-label_col = 'Duration_speedup'
+# use speedup as label
+label_col = f'{label}_speedup'
else:
# inference dataset with CPU runs only
label_col = None
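
To make the label-parameterized adjustment in predict() concrete, here is a minimal standalone sketch (illustrative numbers, not from the PR) of how the adjusted prediction and speedup are derived from the raw speedup prediction y_pred and fraction_supported:

import pandas as pd

label = 'Duration'  # or 'duration_sum', per QUALX_LABEL

# one hypothetical SQL query: 10000 ms on CPU, 80% of that time in supported
# ops, and a raw model prediction of 2x speedup on the supported fraction
row = pd.DataFrame({label: [10000.0], 'fraction_supported': [0.8], 'y_pred': [2.0]})

# unsupported time runs at 1x; supported time is divided by the predicted speedup
row[f'{label}_pred'] = row[label] * (
    1.0 - row['fraction_supported'] + row['fraction_supported'] / row['y_pred']
)
row['speedup_pred'] = row[label] / row[f'{label}_pred']
# {label}_pred == 6000.0, speedup_pred == ~1.67
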
38 changes: 24 additions & 14 deletions user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py
@@ -22,11 +22,14 @@
import os
import numpy as np
import pandas as pd
+from spark_rapids_tools.tools.qualx.config import (
+    get_cache_dir,
+    get_label,
+)
from spark_rapids_tools.tools.qualx.util import (
ensure_directory,
find_eventlogs,
find_paths,
-    get_cache_dir,
get_logger,
get_dataset_platforms,
load_plugin,
@@ -392,19 +395,20 @@ def infer_app_meta(eventlogs: List[str]) -> Mapping[str, Mapping]:
)

# run any plugin hooks on profile_df
-for ds_name, plugin_path in plugins.items():
-    plugin = load_plugin(plugin_path)
-    if plugin:
-        df_schema = profile_df.dtypes
-        dataset_df = profile_df.loc[
-            (profile_df.appName == ds_name) | (profile_df.appName.str.startswith(f'{ds_name}:'))
-        ]
-        modified_dataset_df = plugin.load_profiles_hook(dataset_df)
-        if modified_dataset_df.index.equals(dataset_df.index):
-            profile_df.update(modified_dataset_df)
-            profile_df.astype(df_schema)
-        else:
-            raise ValueError(f'Plugin: load_profiles_hook for {ds_name} unexpectedly modified row indices.')
+if not profile_df.empty:
+    for ds_name, plugin_path in plugins.items():
+        plugin = load_plugin(plugin_path)
+        if plugin:
+            df_schema = profile_df.dtypes
+            dataset_df = profile_df.loc[
+                (profile_df.appName == ds_name) | (profile_df.appName.str.startswith(f'{ds_name}:'))
+            ]
+            modified_dataset_df = plugin.load_profiles_hook(dataset_df)
+            if modified_dataset_df.index.equals(dataset_df.index):
+                profile_df.update(modified_dataset_df)
+                profile_df.astype(df_schema)
+            else:
+                raise ValueError(f'Plugin: load_profiles_hook for {ds_name} unexpectedly modified row indices.')
return profile_df


@@ -448,6 +452,12 @@ def combine_tables(table_name: str) -> pd.DataFrame:
fallback_reason=f'Empty feature tables found after preprocessing: {empty_tables_str}')
return pd.DataFrame()

+if get_label() == 'duration_sum':
+    # override appDuration with sum(duration_sum) across all stages per appId
+    app_duration_sum = job_stage_agg_tbl.groupby('appId')['duration_sum'].sum().reset_index()
+    app_duration_sum = app_duration_sum.rename(columns={'duration_sum': 'appDuration'})
+    app_tbl = app_tbl.merge(app_duration_sum, on=['appId'], how='left', suffixes=['_orig', None])

# normalize dtypes
app_int_dtypes = ['taskCpu', 'taskGpu']
app_tbl[app_int_dtypes] = app_tbl[app_int_dtypes].fillna(0).astype(int)
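
For reference, a small standalone sketch of what the new duration_sum branch in combine_tables() does (made-up frame contents): stage durations are summed per appId and override appDuration via the left merge, with the original value kept under an '_orig' suffix:

import pandas as pd

job_stage_agg_tbl = pd.DataFrame({
    'appId': ['app-1', 'app-1', 'app-2'],
    'duration_sum': [100, 250, 400],
})
app_tbl = pd.DataFrame({'appId': ['app-1', 'app-2'], 'appDuration': [500, 700]})

app_duration_sum = job_stage_agg_tbl.groupby('appId')['duration_sum'].sum().reset_index()
app_duration_sum = app_duration_sum.rename(columns={'duration_sum': 'appDuration'})
app_tbl = app_tbl.merge(app_duration_sum, on=['appId'], how='left', suffixes=['_orig', None])
# app_tbl now has appDuration_orig = [500, 700] and appDuration = [350, 400]
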