
Commit

Merge dev into main
Signed-off-by: spark-rapids automation <[email protected]>
nvauto committed Dec 20, 2024
2 parents be4af4c + 7308c12 commit 7e34d96
Showing 50 changed files with 985 additions and 378 deletions.
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -23,7 +23,7 @@
   <artifactId>rapids-4-spark-tools_2.12</artifactId>
   <name>RAPIDS Accelerator for Apache Spark tools</name>
   <description>RAPIDS Accelerator for Apache Spark tools</description>
-  <version>24.10.3</version>
+  <version>24.10.4-SNAPSHOT</version>
   <packaging>jar</packaging>
   <url>http://github.com/NVIDIA/spark-rapids-tools</url>
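For context: moving from 24.10.3 to 24.10.4-SNAPSHOT follows standard Maven convention, where the -SNAPSHOT suffix marks a mutable, in-development build of the next release, while a bare version number denotes an immutable published release.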

@@ -305,3 +305,4 @@ DecimalSum,1.5
 MaxBy,1.5
 MinBy,1.5
 ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
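These operatorsScore-*.csv files feed the qualification tool's plan scoring: each row pairs an operator with an estimated GPU speedup factor for that platform/GPU combination. The 1.5 given to RunningWindowFunctionExec matches the neighboring recently added entries, which suggests a conservative default rather than a per-platform measurement; that reading is an inference from the pattern, not something this commit states.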
@@ -305,3 +305,4 @@ DecimalSum,1.5
 MaxBy,1.5
 MinBy,1.5
 ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
@@ -293,3 +293,4 @@ DecimalSum,1.5
 MaxBy,1.5
 MinBy,1.5
 ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-dataproc-gke-l4.csv
@@ -287,3 +287,4 @@ DecimalSum,1.5
 MaxBy,1.5
 MinBy,1.5
 ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-dataproc-gke-t4.csv
@@ -287,3 +287,4 @@ DecimalSum,1.5
 MaxBy,1.5
 MinBy,1.5
 ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-dataproc-l4.csv
@@ -293,3 +293,4 @@ DecimalSum,1.5
 MaxBy,1.5
 MinBy,1.5
 ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
@@ -287,3 +287,4 @@ DecimalSum,1.5
 MaxBy,1.5
 MinBy,1.5
 ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-dataproc-t4.csv
@@ -293,3 +293,4 @@ DecimalSum,1.5
 MaxBy,1.5
 MinBy,1.5
 ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-emr-a10.csv
@@ -293,3 +293,4 @@ DecimalSum,1.5
 MaxBy,1.5
 MinBy,1.5
 ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-emr-a10G.csv
@@ -293,3 +293,4 @@ DecimalSum,1.5
 MaxBy,1.5
 MinBy,1.5
 ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-emr-t4.csv
@@ -293,3 +293,4 @@ DecimalSum,1.5
 MaxBy,1.5
 MinBy,1.5
 ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-onprem-a100.csv
@@ -305,3 +305,4 @@ DecimalSum,1.5
 MaxBy,1.5
 MinBy,1.5
 ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/supportedExecs.csv
@@ -57,3 +57,4 @@ WriteFilesExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S,S,S
 CustomShuffleReaderExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
 WindowGroupLimitExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,PS,PS,NS,NS,NS
 MapInArrowExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,NS,NS,NS,NS,PS,NS,PS,NS,NS,NS
+RunningWindowFunctionExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,PS,PS,NS,NS,NS
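For readers scanning these rows: supportedExecs.csv lists one Exec per row followed by per-datatype support flags, where S, PS, and NS are the spark-rapids support codes for supported, partially supported, and not supported. The new RunningWindowFunctionExec row carries exactly the same flag pattern as the WindowGroupLimitExec row above it.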
19 changes: 18 additions & 1 deletion core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
@@ -22,7 +22,7 @@ import com.nvidia.spark.rapids.tool.profiling.ClusterProperties
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.rapids.tool.{ExistingClusterInfo, RecommendedClusterInfo}
-import org.apache.spark.sql.rapids.tool.util.StringUtils
+import org.apache.spark.sql.rapids.tool.util.{SparkRuntime, StringUtils}
 
 /**
  * Utility object containing constants for various platform names.
@@ -132,6 +132,19 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
   var recommendedClusterInfo: Option[RecommendedClusterInfo] = None
   // the number of GPUs to use, this might be updated as we handle different cases
   var numGpus: Int = 1
+  // Default runtime for the platform
+  val defaultRuntime: SparkRuntime.SparkRuntime = SparkRuntime.SPARK
+  // Set of supported runtimes for the platform
+  protected val supportedRuntimes: Set[SparkRuntime.SparkRuntime] = Set(
+    SparkRuntime.SPARK, SparkRuntime.SPARK_RAPIDS
+  )
+
+  /**
+   * Checks if the given runtime is supported by the platform.
+   */
+  def isRuntimeSupported(runtime: SparkRuntime.SparkRuntime): Boolean = {
+    supportedRuntimes.contains(runtime)
+  }
 
   // This function allow us to have one gpu type used by the auto
   // tuner recommendations but have a different GPU used for speedup
@@ -511,6 +524,10 @@ abstract class DatabricksPlatform(gpuDevice: Option[GpuDevice],
   override val defaultGpuDevice: GpuDevice = T4Gpu
   override def isPlatformCSP: Boolean = true
 
+  override val supportedRuntimes: Set[SparkRuntime.SparkRuntime] = Set(
+    SparkRuntime.SPARK, SparkRuntime.SPARK_RAPIDS, SparkRuntime.PHOTON
+  )
+
   // note that Databricks generally sets the spark.executor.memory for the user. Our
   // auto tuner heuristics generally sets it lower then Databricks so go ahead and
   // allow our auto tuner to take affect for this in anticipation that we will use more
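To make the new runtime gating concrete, here is a self-contained sketch of the pattern this hunk introduces: the base platform declares the set of runtimes it supports, a subclass widens that set, and isRuntimeSupported is the single check point. Only the SparkRuntime values and the two member names come from the diff; the surrounding scaffolding is illustrative, not the tool's real API.

    object RuntimeSupportSketch {
      // Mirrors the SparkRuntime values referenced in the diff.
      object SparkRuntime extends Enumeration {
        type SparkRuntime = Value
        val SPARK, SPARK_RAPIDS, PHOTON = Value
      }

      abstract class PlatformSketch {
        // Base platforms accept plain Spark and Spark with the RAPIDS plugin.
        protected val supportedRuntimes: Set[SparkRuntime.SparkRuntime] =
          Set(SparkRuntime.SPARK, SparkRuntime.SPARK_RAPIDS)

        def isRuntimeSupported(runtime: SparkRuntime.SparkRuntime): Boolean =
          supportedRuntimes.contains(runtime)
      }

      // A Databricks-like platform widens the set to include Photon, as the diff does.
      class DatabricksSketch extends PlatformSketch {
        override protected val supportedRuntimes: Set[SparkRuntime.SparkRuntime] =
          Set(SparkRuntime.SPARK, SparkRuntime.SPARK_RAPIDS, SparkRuntime.PHOTON)
      }

      def main(args: Array[String]): Unit = {
        println(new DatabricksSketch().isRuntimeSupported(SparkRuntime.PHOTON)) // true
      }
    }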
@@ -16,6 +16,7 @@
 
 package com.nvidia.spark.rapids.tool.analysis
 
+import scala.collection.breakOut
 import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet}
 
 import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
@@ -265,7 +266,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
     val jobsWithSQL = app.jobIdToInfo.filter { case (_, j) =>
       j.sqlID.nonEmpty
     }
-    val sqlToStages = jobsWithSQL.flatMap { case (jobId, j) =>
+    jobsWithSQL.flatMap { case (jobId, j) =>
       val stages = j.stageIds
       val stagesInJob = app.stageManager.getStagesByIds(stages)
       stagesInJob.map { sModel =>
@@ -283,8 +284,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
         SQLStageInfoProfileResult(appIndex, j.sqlID.get, jobId, sModel.stageInfo.stageId,
           sModel.stageInfo.attemptNumber(), sModel.duration, nodeNames)
       }
-    }
-    sqlToStages.toSeq
+    }(breakOut)
   }
 
   def generateSQLAccums(): Seq[SQLAccumProfileResults] = {
@@ -294,20 +294,11 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
       val driverAccumsOpt = app.driverAccumMap.get(metric.accumulatorId)
       val driverMax = driverAccumsOpt match {
         case Some(accums) =>
-          val filtered = accums.filter { a =>
-            a.sqlID == metric.sqlID
-          }
-          val accumValues = filtered.map(_.value).sortWith(_ < _)
-          if (accumValues.isEmpty) {
-            None
-          } else if (accumValues.length <= 1) {
-            Some(StatisticsMetrics(0L, 0L, 0L, accumValues.sum))
-          } else {
-            Some(StatisticsMetrics(accumValues(0), accumValues(accumValues.size / 2),
-              accumValues(accumValues.size - 1), accumValues.sum))
-          }
-        case None =>
-          None
+          StatisticsMetrics.createOptionalFromArr(accums.collect {
+            case a if a.sqlID == metric.sqlID =>
+              a.value
+          }(breakOut))
+        case _ => None
       }
 
       if (accumTaskStats.isDefined || driverMax.isDefined) {
@@ -325,7 +316,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
       } else {
         None
       }
-    }
+    }(breakOut)
   }
 
   /**
@@ -341,40 +332,31 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
   def generateStageLevelAccums(): Seq[AccumProfileResults] = {
     app.accumManager.accumInfoMap.flatMap { accumMapEntry =>
       val accumInfo = accumMapEntry._2
-      accumInfo.stageValuesMap.keySet.flatMap( stageId => {
-        val stageTaskIds = app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId).toSet
-        // get the task updates that belong to that stage
-        val taskUpatesSubset =
-          accumInfo.taskUpdatesMap.filterKeys(stageTaskIds.contains).values.toSeq.sorted
-        if (taskUpatesSubset.isEmpty) {
-          None
-        } else {
-          val min = taskUpatesSubset.head
-          val max = taskUpatesSubset.last
-          val sum = taskUpatesSubset.sum
-          val median = if (taskUpatesSubset.size % 2 == 0) {
-            val mid = taskUpatesSubset.size / 2
-            (taskUpatesSubset(mid) + taskUpatesSubset(mid - 1)) / 2
-          } else {
-            taskUpatesSubset(taskUpatesSubset.size / 2)
-          }
-          // reuse AccumProfileResults to avoid generating extra memory from allocating new objects
-          val accumProfileResults = AccumProfileResults(
-            appIndex,
-            stageId,
-            accumInfo.infoRef,
-            min = min,
-            median = median,
-            max = max,
-            total = sum)
-          if (accumInfo.infoRef.name.isDiagnosticMetrics()) {
-            updateStageDiagnosticMetrics(accumProfileResults)
-          }
-          Some(accumProfileResults)
+      accumInfo.stageValuesMap.keys.flatMap( stageId => {
+        val stageTaskIds: Set[Long] =
+          app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId)(breakOut)
+        // Get the task updates that belong to that stage
+        StatisticsMetrics.createOptionalFromArr(
+          accumInfo.taskUpdatesMap.filterKeys(stageTaskIds).map(_._2)(breakOut)) match {
+          case Some(stat) =>
+            // Reuse AccumProfileResults to avoid allocating new objects
+            val accumProfileResults = AccumProfileResults(
+              appIndex,
+              stageId,
+              accumInfo.infoRef,
+              min = stat.min,
+              median = stat.med,
+              max = stat.max,
+              total = stat.total)
+            if (accumInfo.infoRef.name.isDiagnosticMetrics()) {
+              updateStageDiagnosticMetrics(accumProfileResults)
+            }
+            Some(accumProfileResults)
+          case _ => None
         }
       })
     }
-  }.toSeq
+  }(breakOut)
   }
 }
 
 object AppSQLPlanAnalyzer {
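Two ideas recur in this file's changes and are worth unpacking. First, scala.collection.breakOut (available in the Scala 2.12 this project builds against, per rapids-4-spark-tools_2.12; it was removed in 2.13) supplies the CanBuildFrom argument of map/flatMap/collect so the target collection is built directly, eliminating the intermediate collection plus .toSeq/.toSet copy the old code paid for. Second, the inline min/median/max/sum computations are centralized in StatisticsMetrics.createOptionalFromArr, whose body is not shown in this diff. The sketch below is therefore a hypothetical reconstruction: it assumes the helper returns None for empty input and (min, median, max, sum) otherwise, and glosses over corner cases where the replaced snippets disagreed (single-element input, even-length medians).

    import scala.collection.breakOut

    // Stand-in mirroring the field names visible in the diff (min, med, max, total).
    case class StatisticsMetrics(min: Long, med: Long, max: Long, total: Long)

    object StatisticsMetrics {
      // Hypothetical reconstruction of createOptionalFromArr; the real helper lives
      // elsewhere in the repo and may handle corner cases differently.
      def createOptionalFromArr(arr: Array[Long]): Option[StatisticsMetrics] = {
        if (arr.isEmpty) None
        else {
          val sorted = arr.sorted
          Some(StatisticsMetrics(sorted.head, sorted(sorted.length / 2), sorted.last, sorted.sum))
        }
      }
    }

    object BreakOutDemo extends App {
      val taskUpdates = Map(1L -> 10L, 2L -> 30L, 3L -> 20L)
      // With breakOut, the Array[Long] is built in a single pass; without it,
      // map would build an intermediate collection that a later copy converts.
      val values: Array[Long] = taskUpdates.map(_._2)(breakOut)
      println(StatisticsMetrics.createOptionalFromArr(values))
      // Some(StatisticsMetrics(10,20,30,60))
    }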
@@ -35,12 +35,13 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
   def getAggRawMetrics(app: AppBase, index: Int, sqlAnalyzer: Option[AppSQLPlanAnalyzer] = None):
       AggRawMetricsResult = {
     val analysisObj = new AppSparkMetricsAnalyzer(app)
+    val sqlMetricsAgg = analysisObj.aggregateSparkMetricsBySql(index)
     AggRawMetricsResult(
       analysisObj.aggregateSparkMetricsByJob(index),
       analysisObj.aggregateSparkMetricsByStage(index),
       analysisObj.shuffleSkewCheck(index),
-      analysisObj.aggregateSparkMetricsBySql(index),
-      analysisObj.aggregateIOMetricsBySql(analysisObj.aggregateSparkMetricsBySql(index)),
+      sqlMetricsAgg,
+      analysisObj.aggregateIOMetricsBySql(sqlMetricsAgg),
       analysisObj.aggregateDurationAndCPUTimeBySql(index),
       Seq(analysisObj.maxTaskInputSizeBytesPerSQL(index)),
       analysisObj.aggregateDiagnosticMetricsByStage(index, sqlAnalyzer))
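The hunk above is a compute-once refactor: previously aggregateSparkMetricsBySql(index) was invoked twice, once for the result field and once as input to aggregateIOMetricsBySql, so the per-SQL aggregation ran twice. Hoisting it into sqlMetricsAgg removes the duplicate pass. A minimal illustration with hypothetical stand-ins (none of these names are the tool's real API):

    object ComputeOnceSketch extends App {
      var aggregationPasses = 0
      def aggregateBySql(index: Int): Seq[Int] = { aggregationPasses += 1; Seq(index, index + 1) }
      def ioMetrics(agg: Seq[Int]): Seq[Int] = agg.map(_ * 2)

      // Before the refactor: two separate calls, so two aggregation passes.
      val resultBefore = (aggregateBySql(1), ioMetrics(aggregateBySql(1)))
      // After the refactor: evaluate once into a val and reuse it.
      val sqlMetricsAgg = aggregateBySql(1)
      val resultAfter = (sqlMetricsAgg, ioMetrics(sqlMetricsAgg))

      println(aggregationPasses) // 3 total: 2 for the "before" shape, 1 for "after"
    }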
(Diffs for the remaining changed files did not load and are not shown.)