diff --git a/core/pom.xml b/core/pom.xml
index 026233640..7ae043493 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -23,7 +23,7 @@
rapids-4-spark-tools_2.12
RAPIDS Accelerator for Apache Spark tools
RAPIDS Accelerator for Apache Spark tools
- 24.10.3
+ 24.10.4-SNAPSHOT
jar
http://github.com/NVIDIA/spark-rapids-tools
diff --git a/core/src/main/resources/operatorsScore-databricks-aws-a10G.csv b/core/src/main/resources/operatorsScore-databricks-aws-a10G.csv
index e5096e1a9..52eb193f2 100644
--- a/core/src/main/resources/operatorsScore-databricks-aws-a10G.csv
+++ b/core/src/main/resources/operatorsScore-databricks-aws-a10G.csv
@@ -305,3 +305,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
diff --git a/core/src/main/resources/operatorsScore-databricks-aws-t4.csv b/core/src/main/resources/operatorsScore-databricks-aws-t4.csv
index e5096e1a9..52eb193f2 100644
--- a/core/src/main/resources/operatorsScore-databricks-aws-t4.csv
+++ b/core/src/main/resources/operatorsScore-databricks-aws-t4.csv
@@ -305,3 +305,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
diff --git a/core/src/main/resources/operatorsScore-databricks-azure-t4.csv b/core/src/main/resources/operatorsScore-databricks-azure-t4.csv
index 66c738016..5ad387036 100644
--- a/core/src/main/resources/operatorsScore-databricks-azure-t4.csv
+++ b/core/src/main/resources/operatorsScore-databricks-azure-t4.csv
@@ -293,3 +293,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
diff --git a/core/src/main/resources/operatorsScore-dataproc-gke-l4.csv b/core/src/main/resources/operatorsScore-dataproc-gke-l4.csv
index 57fc5b44d..902e598a1 100644
--- a/core/src/main/resources/operatorsScore-dataproc-gke-l4.csv
+++ b/core/src/main/resources/operatorsScore-dataproc-gke-l4.csv
@@ -287,3 +287,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
diff --git a/core/src/main/resources/operatorsScore-dataproc-gke-t4.csv b/core/src/main/resources/operatorsScore-dataproc-gke-t4.csv
index 3459e64cb..e30f156f4 100644
--- a/core/src/main/resources/operatorsScore-dataproc-gke-t4.csv
+++ b/core/src/main/resources/operatorsScore-dataproc-gke-t4.csv
@@ -287,3 +287,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
diff --git a/core/src/main/resources/operatorsScore-dataproc-l4.csv b/core/src/main/resources/operatorsScore-dataproc-l4.csv
index 422020970..0660dbdee 100644
--- a/core/src/main/resources/operatorsScore-dataproc-l4.csv
+++ b/core/src/main/resources/operatorsScore-dataproc-l4.csv
@@ -293,3 +293,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
diff --git a/core/src/main/resources/operatorsScore-dataproc-serverless-l4.csv b/core/src/main/resources/operatorsScore-dataproc-serverless-l4.csv
index 61d9e3f1a..8dc9faa90 100644
--- a/core/src/main/resources/operatorsScore-dataproc-serverless-l4.csv
+++ b/core/src/main/resources/operatorsScore-dataproc-serverless-l4.csv
@@ -287,3 +287,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
diff --git a/core/src/main/resources/operatorsScore-dataproc-t4.csv b/core/src/main/resources/operatorsScore-dataproc-t4.csv
index 10ef53900..e2eb69f60 100644
--- a/core/src/main/resources/operatorsScore-dataproc-t4.csv
+++ b/core/src/main/resources/operatorsScore-dataproc-t4.csv
@@ -293,3 +293,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
diff --git a/core/src/main/resources/operatorsScore-emr-a10.csv b/core/src/main/resources/operatorsScore-emr-a10.csv
index 77befd12e..0d350be80 100644
--- a/core/src/main/resources/operatorsScore-emr-a10.csv
+++ b/core/src/main/resources/operatorsScore-emr-a10.csv
@@ -293,3 +293,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
diff --git a/core/src/main/resources/operatorsScore-emr-a10G.csv b/core/src/main/resources/operatorsScore-emr-a10G.csv
index 77befd12e..0d350be80 100644
--- a/core/src/main/resources/operatorsScore-emr-a10G.csv
+++ b/core/src/main/resources/operatorsScore-emr-a10G.csv
@@ -293,3 +293,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
diff --git a/core/src/main/resources/operatorsScore-emr-t4.csv b/core/src/main/resources/operatorsScore-emr-t4.csv
index 3f1296c38..c651cf976 100644
--- a/core/src/main/resources/operatorsScore-emr-t4.csv
+++ b/core/src/main/resources/operatorsScore-emr-t4.csv
@@ -293,3 +293,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
diff --git a/core/src/main/resources/operatorsScore-onprem-a100.csv b/core/src/main/resources/operatorsScore-onprem-a100.csv
index 4dc58f0c0..7cdd59978 100644
--- a/core/src/main/resources/operatorsScore-onprem-a100.csv
+++ b/core/src/main/resources/operatorsScore-onprem-a100.csv
@@ -305,3 +305,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
+RunningWindowFunctionExec,1.5
diff --git a/core/src/main/resources/supportedExecs.csv b/core/src/main/resources/supportedExecs.csv
index 06e35b026..50e47f42f 100644
--- a/core/src/main/resources/supportedExecs.csv
+++ b/core/src/main/resources/supportedExecs.csv
@@ -57,3 +57,4 @@ WriteFilesExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S,S,S
CustomShuffleReaderExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
WindowGroupLimitExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,PS,PS,NS,NS,NS
MapInArrowExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,NS,NS,NS,NS,PS,NS,PS,NS,NS,NS
+RunningWindowFunctionExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,PS,PS,NS,NS,NS
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
index 5ac20b05e..39429085e 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
@@ -22,7 +22,7 @@ import com.nvidia.spark.rapids.tool.profiling.ClusterProperties
import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.{ExistingClusterInfo, RecommendedClusterInfo}
-import org.apache.spark.sql.rapids.tool.util.StringUtils
+import org.apache.spark.sql.rapids.tool.util.{SparkRuntime, StringUtils}
/**
* Utility object containing constants for various platform names.
@@ -132,6 +132,19 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
var recommendedClusterInfo: Option[RecommendedClusterInfo] = None
// the number of GPUs to use, this might be updated as we handle different cases
var numGpus: Int = 1
+ // Default runtime for the platform
+ val defaultRuntime: SparkRuntime.SparkRuntime = SparkRuntime.SPARK
+ // Set of supported runtimes for the platform
+ protected val supportedRuntimes: Set[SparkRuntime.SparkRuntime] = Set(
+ SparkRuntime.SPARK, SparkRuntime.SPARK_RAPIDS
+ )
+
+ /**
+ * Checks if the given runtime is supported by the platform.
+ */
+ def isRuntimeSupported(runtime: SparkRuntime.SparkRuntime): Boolean = {
+ supportedRuntimes.contains(runtime)
+ }
// This function allow us to have one gpu type used by the auto
// tuner recommendations but have a different GPU used for speedup
@@ -511,6 +524,10 @@ abstract class DatabricksPlatform(gpuDevice: Option[GpuDevice],
override val defaultGpuDevice: GpuDevice = T4Gpu
override def isPlatformCSP: Boolean = true
+ override val supportedRuntimes: Set[SparkRuntime.SparkRuntime] = Set(
+ SparkRuntime.SPARK, SparkRuntime.SPARK_RAPIDS, SparkRuntime.PHOTON
+ )
+
// note that Databricks generally sets the spark.executor.memory for the user. Our
// auto tuner heuristics generally sets it lower then Databricks so go ahead and
// allow our auto tuner to take affect for this in anticipation that we will use more
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala
index 9580aa470..7ca4bbb5b 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala
@@ -16,6 +16,7 @@
package com.nvidia.spark.rapids.tool.analysis
+import scala.collection.breakOut
import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet}
import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
@@ -265,7 +266,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
val jobsWithSQL = app.jobIdToInfo.filter { case (_, j) =>
j.sqlID.nonEmpty
}
- val sqlToStages = jobsWithSQL.flatMap { case (jobId, j) =>
+ jobsWithSQL.flatMap { case (jobId, j) =>
val stages = j.stageIds
val stagesInJob = app.stageManager.getStagesByIds(stages)
stagesInJob.map { sModel =>
@@ -283,8 +284,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
SQLStageInfoProfileResult(appIndex, j.sqlID.get, jobId, sModel.stageInfo.stageId,
sModel.stageInfo.attemptNumber(), sModel.duration, nodeNames)
}
- }
- sqlToStages.toSeq
+ }(breakOut)
}
def generateSQLAccums(): Seq[SQLAccumProfileResults] = {
@@ -294,20 +294,11 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
val driverAccumsOpt = app.driverAccumMap.get(metric.accumulatorId)
val driverMax = driverAccumsOpt match {
case Some(accums) =>
- val filtered = accums.filter { a =>
- a.sqlID == metric.sqlID
- }
- val accumValues = filtered.map(_.value).sortWith(_ < _)
- if (accumValues.isEmpty) {
- None
- } else if (accumValues.length <= 1) {
- Some(StatisticsMetrics(0L, 0L, 0L, accumValues.sum))
- } else {
- Some(StatisticsMetrics(accumValues(0), accumValues(accumValues.size / 2),
- accumValues(accumValues.size - 1), accumValues.sum))
- }
- case None =>
- None
+ StatisticsMetrics.createOptionalFromArr(accums.collect {
+ case a if a.sqlID == metric.sqlID =>
+ a.value
+ }(breakOut))
+ case _ => None
}
if (accumTaskStats.isDefined || driverMax.isDefined) {
@@ -325,7 +316,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
} else {
None
}
- }
+ }(breakOut)
}
/**
@@ -341,40 +332,31 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
def generateStageLevelAccums(): Seq[AccumProfileResults] = {
app.accumManager.accumInfoMap.flatMap { accumMapEntry =>
val accumInfo = accumMapEntry._2
- accumInfo.stageValuesMap.keySet.flatMap( stageId => {
- val stageTaskIds = app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId).toSet
- // get the task updates that belong to that stage
- val taskUpatesSubset =
- accumInfo.taskUpdatesMap.filterKeys(stageTaskIds.contains).values.toSeq.sorted
- if (taskUpatesSubset.isEmpty) {
- None
- } else {
- val min = taskUpatesSubset.head
- val max = taskUpatesSubset.last
- val sum = taskUpatesSubset.sum
- val median = if (taskUpatesSubset.size % 2 == 0) {
- val mid = taskUpatesSubset.size / 2
- (taskUpatesSubset(mid) + taskUpatesSubset(mid - 1)) / 2
- } else {
- taskUpatesSubset(taskUpatesSubset.size / 2)
- }
- // reuse AccumProfileResults to avoid generating extra memory from allocating new objects
- val accumProfileResults = AccumProfileResults(
- appIndex,
- stageId,
- accumInfo.infoRef,
- min = min,
- median = median,
- max = max,
- total = sum)
- if (accumInfo.infoRef.name.isDiagnosticMetrics()) {
- updateStageDiagnosticMetrics(accumProfileResults)
- }
- Some(accumProfileResults)
+ accumInfo.stageValuesMap.keys.flatMap( stageId => {
+ val stageTaskIds: Set[Long] =
+ app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId)(breakOut)
+ // Get the task updates that belong to that stage
+ StatisticsMetrics.createOptionalFromArr(
+ accumInfo.taskUpdatesMap.filterKeys(stageTaskIds).map(_._2)(breakOut)) match {
+ case Some(stat) =>
+ // Reuse AccumProfileResults to avoid generating allocating new objects
+ val accumProfileResults = AccumProfileResults(
+ appIndex,
+ stageId,
+ accumInfo.infoRef,
+ min = stat.min,
+ median = stat.med,
+ max = stat.max,
+ total = stat.total)
+ if (accumInfo.infoRef.name.isDiagnosticMetrics()) {
+ updateStageDiagnosticMetrics(accumProfileResults)
+ }
+ Some(accumProfileResults)
+ case _ => None
}
})
- }
- }.toSeq
+ }(breakOut)
+ }
}
object AppSQLPlanAnalyzer {
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala
index 0f43ae8b2..30fb10ac9 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala
@@ -35,12 +35,13 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
def getAggRawMetrics(app: AppBase, index: Int, sqlAnalyzer: Option[AppSQLPlanAnalyzer] = None):
AggRawMetricsResult = {
val analysisObj = new AppSparkMetricsAnalyzer(app)
+ val sqlMetricsAgg = analysisObj.aggregateSparkMetricsBySql(index)
AggRawMetricsResult(
analysisObj.aggregateSparkMetricsByJob(index),
analysisObj.aggregateSparkMetricsByStage(index),
analysisObj.shuffleSkewCheck(index),
- analysisObj.aggregateSparkMetricsBySql(index),
- analysisObj.aggregateIOMetricsBySql(analysisObj.aggregateSparkMetricsBySql(index)),
+ sqlMetricsAgg,
+ analysisObj.aggregateIOMetricsBySql(sqlMetricsAgg),
analysisObj.aggregateDurationAndCPUTimeBySql(index),
Seq(analysisObj.maxTaskInputSizeBytesPerSQL(index)),
analysisObj.aggregateDiagnosticMetricsByStage(index, sqlAnalyzer))
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala
index 33194644e..6b8c3d5e5 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala
@@ -16,17 +16,17 @@
package com.nvidia.spark.rapids.tool.analysis
-import java.util.concurrent.TimeUnit
-
+import scala.collection.breakOut
import scala.collection.mutable.{ArrayBuffer, HashMap, LinkedHashMap}
import com.nvidia.spark.rapids.tool.analysis.StageAccumDiagnosticMetrics._
+import com.nvidia.spark.rapids.tool.analysis.util.{AggAccumHelper, AggAccumPhotonHelper}
import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
-import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult}
+import com.nvidia.spark.rapids.tool.profiling._
import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils}
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
-import org.apache.spark.sql.rapids.tool.store.{AccumInfo, AccumMetaRef, AccumNameRef, TaskModel}
+import org.apache.spark.sql.rapids.tool.store.{AccumInfo, AccumMetaRef}
/**
* Does analysis on the DataFrames from object of AppBase.
@@ -80,60 +80,54 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
* @return sequence of JobAggTaskMetricsProfileResult that contains only Job Ids
*/
def aggregateSparkMetricsByJob(index: Int): Seq[JobAggTaskMetricsProfileResult] = {
- val jobRows = app.jobIdToInfo.flatMap { case (id, jc) =>
+ app.jobIdToInfo.flatMap { case (id, jc) =>
if (jc.stageIds.isEmpty) {
None
} else {
- val profResultsInJob = stageLevelSparkMetrics(index).filterKeys(jc.stageIds.contains).values
- if (profResultsInJob.isEmpty) {
+ val jobAggAccumulator = new AggAccumHelper()
+ val perJobRec = jobAggAccumulator.accumPerJob(
+ jc.stageIds.collect {
+ case stageId if stageLevelSparkMetrics(index).contains(stageId) =>
+ stageLevelSparkMetrics(index)(stageId)
+ })
+ if (perJobRec.isEmptyAggregates) {
None
} else {
- // Recalculate the duration sum, max, min, avg for the job based on the cached
- // stage Profiling results
- val tasksInJob = profResultsInJob.map(_.numTasks).sum
- val durSum = profResultsInJob.map(_.durationSum).sum
- val durMax =
- AppSparkMetricsAnalyzer.maxWithEmptyHandling(profResultsInJob.map(_.durationMax))
- val durMin =
- AppSparkMetricsAnalyzer.minWithEmptyHandling(profResultsInJob.map(_.durationMin))
- val durAvg = ToolUtils.calculateAverage(durSum, tasksInJob, 1)
Some(JobAggTaskMetricsProfileResult(index,
id,
- tasksInJob,
+ perJobRec.numTasks,
jc.duration,
- profResultsInJob.map(_.diskBytesSpilledSum).sum,
- durSum,
- durMax,
- durMin,
- durAvg,
- profResultsInJob.map(_.executorCPUTimeSum).sum,
- profResultsInJob.map(_.executorDeserializeCpuTimeSum).sum,
- profResultsInJob.map(_.executorDeserializeTimeSum).sum,
- profResultsInJob.map(_.executorRunTimeSum).sum,
- profResultsInJob.map(_.inputBytesReadSum).sum,
- profResultsInJob.map(_.inputRecordsReadSum).sum,
- profResultsInJob.map(_.jvmGCTimeSum).sum,
- profResultsInJob.map(_.memoryBytesSpilledSum).sum,
- profResultsInJob.map(_.outputBytesWrittenSum).sum,
- profResultsInJob.map(_.outputRecordsWrittenSum).sum,
- AppSparkMetricsAnalyzer.maxWithEmptyHandling(
- profResultsInJob.map(_.peakExecutionMemoryMax)),
- profResultsInJob.map(_.resultSerializationTimeSum).sum,
- AppSparkMetricsAnalyzer.maxWithEmptyHandling(profResultsInJob.map(_.resultSizeMax)),
- profResultsInJob.map(_.srFetchWaitTimeSum).sum,
- profResultsInJob.map(_.srLocalBlocksFetchedSum).sum,
- profResultsInJob.map(_.srcLocalBytesReadSum).sum,
- profResultsInJob.map(_.srRemoteBlocksFetchSum).sum,
- profResultsInJob.map(_.srRemoteBytesReadSum).sum,
- profResultsInJob.map(_.srRemoteBytesReadToDiskSum).sum,
- profResultsInJob.map(_.srTotalBytesReadSum).sum,
- profResultsInJob.map(_.swBytesWrittenSum).sum,
- profResultsInJob.map(_.swRecordsWrittenSum).sum,
- profResultsInJob.map(_.swWriteTimeSum).sum))
+ perJobRec.diskBytesSpilledSum,
+ perJobRec.durationSum,
+ perJobRec.durationMax,
+ perJobRec.durationMin,
+ perJobRec.durationAvg,
+ perJobRec.executorCPUTimeSum,
+ perJobRec.executorDeserializeCpuTimeSum,
+ perJobRec.executorDeserializeTimeSum,
+ perJobRec.executorRunTimeSum,
+ perJobRec.inputBytesReadSum,
+ perJobRec.inputRecordsReadSum,
+ perJobRec.jvmGCTimeSum,
+ perJobRec.memoryBytesSpilledSum,
+ perJobRec.outputBytesWrittenSum,
+ perJobRec.outputRecordsWrittenSum,
+ perJobRec.peakExecutionMemoryMax,
+ perJobRec.resultSerializationTimeSum,
+ perJobRec.resultSizeMax,
+ perJobRec.srFetchWaitTimeSum,
+ perJobRec.srLocalBlocksFetchedSum,
+ perJobRec.srLocalBytesReadSum,
+ perJobRec.srRemoteBlocksFetchSum,
+ perJobRec.srRemoteBytesReadSum,
+ perJobRec.srRemoteBytesReadToDiskSum,
+ perJobRec.srTotalBytesReadSum,
+ perJobRec.swBytesWrittenSum,
+ perJobRec.swRecordsWrittenSum,
+ perJobRec.swWriteTimeSum))
}
}
- }
- jobRows.toSeq
+ }(breakOut)
}
private case class AverageStageInfo(avgDuration: Double, avgShuffleReadBytes: Double)
@@ -169,7 +163,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
tc.taskId, tc.attempt, tc.duration, avg.avgDuration, tc.sr_totalBytesRead,
avg.avgShuffleReadBytes, tc.peakExecutionMemory, tc.successful, tc.endReason)
}
- }.toSeq
+ }(breakOut)
}
/**
@@ -178,76 +172,64 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
* @return sequence of SQLTaskAggMetricsProfileResult
*/
def aggregateSparkMetricsBySql(index: Int): Seq[SQLTaskAggMetricsProfileResult] = {
- val sqlRows = app.sqlIdToInfo.flatMap { case (sqlId, sqlCase) =>
+ app.sqlIdToInfo.flatMap { case (sqlId, sqlCase) =>
if (app.sqlIdToStages.contains(sqlId)) {
val stagesInSQL = app.sqlIdToStages(sqlId)
// TODO: Should we only consider successful tasks?
- val cachedResBySQL = stageLevelSparkMetrics(index).filterKeys(stagesInSQL.contains).values
- if (cachedResBySQL.isEmpty) {
+ val sqlAggAccumulator = new AggAccumHelper()
+ val preSqlRec = sqlAggAccumulator.accumPerSQL(
+ stagesInSQL.collect {
+ case stageId if stageLevelSparkMetrics(index).contains(stageId) =>
+ stageLevelSparkMetrics(index)(stageId)
+ })
+ if (preSqlRec.isEmptyAggregates) {
None
} else {
- // Recalculate the duration sum, max, min, avg for the job based on the cached
- // stage Profiling results
- val tasksInSql = cachedResBySQL.map(_.numTasks).sum
- val durSum = cachedResBySQL.map(_.durationSum).sum
- val durMax =
- AppSparkMetricsAnalyzer.maxWithEmptyHandling(cachedResBySQL.map(_.durationMax))
- val durMin =
- AppSparkMetricsAnalyzer.minWithEmptyHandling(cachedResBySQL.map(_.durationMin))
- val durAvg = ToolUtils.calculateAverage(durSum, tasksInSql, 1)
- val diskBytes = cachedResBySQL.map(_.diskBytesSpilledSum).sum
- val execCpuTime = cachedResBySQL.map(_.executorCPUTimeSum).sum
- val execRunTime = cachedResBySQL.map(_.executorRunTimeSum).sum
- val execCPURatio = ToolUtils.calculateDurationPercent(execCpuTime, execRunTime)
- val inputBytesRead = cachedResBySQL.map(_.inputBytesReadSum).sum
// set this here, so make sure we don't get it again until later
- sqlCase.sqlCpuTimePercent = execCPURatio
-
+ sqlCase.sqlCpuTimePercent = preSqlRec.executorCpuRatio
Some(SQLTaskAggMetricsProfileResult(index,
app.appId,
sqlId,
sqlCase.description,
- tasksInSql,
+ preSqlRec.numTasks,
sqlCase.duration,
- execCpuTime,
- execRunTime,
- execCPURatio,
- diskBytes,
- durSum,
- durMax,
- durMin,
- durAvg,
- execCpuTime,
- cachedResBySQL.map(_.executorDeserializeCpuTimeSum).sum,
- cachedResBySQL.map(_.executorDeserializeTimeSum).sum,
- execRunTime,
- inputBytesRead,
- inputBytesRead * 1.0 / tasksInSql,
- cachedResBySQL.map(_.inputRecordsReadSum).sum,
- cachedResBySQL.map(_.jvmGCTimeSum).sum,
- cachedResBySQL.map(_.memoryBytesSpilledSum).sum,
- cachedResBySQL.map(_.outputBytesWrittenSum).sum,
- cachedResBySQL.map(_.outputRecordsWrittenSum).sum,
- AppSparkMetricsAnalyzer.maxWithEmptyHandling(
- cachedResBySQL.map(_.peakExecutionMemoryMax)),
- cachedResBySQL.map(_.resultSerializationTimeSum).sum,
- AppSparkMetricsAnalyzer.maxWithEmptyHandling(cachedResBySQL.map(_.resultSizeMax)),
- cachedResBySQL.map(_.srFetchWaitTimeSum).sum,
- cachedResBySQL.map(_.srLocalBlocksFetchedSum).sum,
- cachedResBySQL.map(_.srcLocalBytesReadSum).sum,
- cachedResBySQL.map(_.srRemoteBlocksFetchSum).sum,
- cachedResBySQL.map(_.srRemoteBytesReadSum).sum,
- cachedResBySQL.map(_.srRemoteBytesReadToDiskSum).sum,
- cachedResBySQL.map(_.srTotalBytesReadSum).sum,
- cachedResBySQL.map(_.swBytesWrittenSum).sum,
- cachedResBySQL.map(_.swRecordsWrittenSum).sum,
- cachedResBySQL.map(_.swWriteTimeSum).sum))
+ preSqlRec.executorCPUTimeSum,
+ preSqlRec.executorRunTimeSum,
+ preSqlRec.executorCpuRatio,
+ preSqlRec.diskBytesSpilledSum,
+ preSqlRec.durationSum,
+ preSqlRec.durationMax,
+ preSqlRec.durationMin,
+ preSqlRec.durationAvg,
+ preSqlRec.executorCPUTimeSum,
+ preSqlRec.executorDeserializeCpuTimeSum,
+ preSqlRec.executorDeserializeTimeSum,
+ preSqlRec.executorRunTimeSum,
+ preSqlRec.inputBytesReadSum,
+ preSqlRec.inputBytesReadAvg,
+ preSqlRec.inputRecordsReadSum,
+ preSqlRec.jvmGCTimeSum,
+ preSqlRec.memoryBytesSpilledSum,
+ preSqlRec.outputBytesWrittenSum,
+ preSqlRec.outputRecordsWrittenSum,
+ preSqlRec.peakExecutionMemoryMax,
+ preSqlRec.resultSerializationTimeSum,
+ preSqlRec.resultSizeMax,
+ preSqlRec.srFetchWaitTimeSum,
+ preSqlRec.srLocalBlocksFetchedSum,
+ preSqlRec.srLocalBytesReadSum,
+ preSqlRec.srRemoteBlocksFetchSum,
+ preSqlRec.srRemoteBytesReadSum,
+ preSqlRec.srRemoteBytesReadToDiskSum,
+ preSqlRec.srTotalBytesReadSum,
+ preSqlRec.swBytesWrittenSum,
+ preSqlRec.swRecordsWrittenSum,
+ preSqlRec.swWriteTimeSum))
}
} else {
None
}
- }
- sqlRows.toSeq
+ }(breakOut)
}
/**
@@ -258,7 +240,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
*/
def aggregateIOMetricsBySql(
sqlMetricsAggs: Seq[SQLTaskAggMetricsProfileResult]): Seq[IOAnalysisProfileResult] = {
- val sqlIORows = sqlMetricsAggs.map { sqlAgg =>
+ sqlMetricsAggs.map { sqlAgg =>
IOAnalysisProfileResult(sqlAgg.appIndex,
app.appId,
sqlAgg.sqlId,
@@ -270,8 +252,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
sqlAgg.memoryBytesSpilledSum,
sqlAgg.srTotalBytesReadSum,
sqlAgg.swBytesWrittenSum)
- }
- sqlIORows
+ }(breakOut)
}
/**
@@ -306,7 +287,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
* @return a sequence of SQLDurationExecutorTimeProfileResult or Empty if None.
*/
def aggregateDurationAndCPUTimeBySql(index: Int): Seq[SQLDurationExecutorTimeProfileResult] = {
- val sqlRows = app.sqlIdToInfo.map { case (sqlId, sqlCase) =>
+ app.sqlIdToInfo.map { case (sqlId, sqlCase) =>
// First, build the SQLIssues string by retrieving the potential issues from the
// app.sqlIDtoProblematic map.
val sqlIssues = if (app.sqlIDtoProblematic.contains(sqlId)) {
@@ -318,8 +299,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
SQLDurationExecutorTimeProfileResult(index, app.appId, sqlCase.rootExecutionID,
sqlId, sqlCase.duration, sqlCase.hasDatasetOrRDD,
app.getAppDuration.orElse(Option(0L)), sqlIssues, sqlCase.sqlCpuTimePercent)
- }
- sqlRows.toSeq
+ }(breakOut)
}
/**
@@ -339,8 +319,9 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
app.asInstanceOf[ApplicationInfo].planMetricProcessor
}
val zeroAccumProfileResults =
- AccumProfileResults(0, 0, AccumMetaRef(0L, AccumNameRef("")), 0L, 0L, 0L, 0L)
-
+ AccumProfileResults(0, 0, AccumMetaRef.EMPTY_ACCUM_META_REF, 0L, 0L, 0L, 0L)
+ val emptyNodeNames = Seq.empty[String]
+ val emptyDiagnosticMetrics = HashMap.empty[String, AccumProfileResults]
// TODO: this has stage attempts. we should handle different attempts
app.stageManager.getAllStages.map { sm =>
// TODO: Should we only consider successful tasks?
@@ -348,13 +329,13 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
sm.stageInfo.attemptNumber())
// count duplicate task attempts
val numTasks = tasksInStage.size
- val nodeNames = sqlAnalyzer.stageToNodeNames.
- getOrElse(sm.stageInfo.stageId, Seq.empty[String])
- val diagnosticMetricsMap = sqlAnalyzer.stageToDiagnosticMetrics.
- getOrElse(sm.stageInfo.stageId, HashMap.empty[String, AccumProfileResults]).
- withDefaultValue(zeroAccumProfileResults)
+ val nodeNames = sqlAnalyzer.stageToNodeNames.getOrElse(sm.stageInfo.stageId, emptyNodeNames)
+ val diagnosticMetricsMap =
+ sqlAnalyzer.stageToDiagnosticMetrics
+ .getOrElse(sm.stageInfo.stageId, emptyDiagnosticMetrics)
+ .withDefaultValue(zeroAccumProfileResults)
val srTotalBytesMetrics =
- AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_totalBytesRead))
+ StatisticsMetrics.createFromArr(tasksInStage.map(_.sr_totalBytesRead)(breakOut))
StageDiagnosticResult(index,
app.getAppName,
@@ -375,7 +356,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
diagnosticMetricsMap(SW_WRITE_TIME_METRIC),
diagnosticMetricsMap(GPU_SEMAPHORE_WAIT_METRIC),
nodeNames)
- }.toSeq
+ }(breakOut)
}
/**
@@ -417,10 +398,8 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
// TODO: Should we only consider successful tasks?
val tasksInStage = app.taskManager.getTasks(sm.stageInfo.stageId,
sm.stageInfo.attemptNumber())
- // count duplicate task attempts
- val numAttempts = tasksInStage.size
- val (peakMemoryMax, shuffleWriteTimeSum) = if (app.isPhoton) {
+ val accumHelperObj = if (app.isPhoton) { // If this a photon app, use the photonHelper
// For max peak memory, we need to look at the accumulators at the task level.
val peakMemoryValues = tasksInStage.flatMap { taskModel =>
photonPeakMemoryAccumInfos.flatMap { accumInfo =>
@@ -431,98 +410,46 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
val shuffleWriteValues = photonShuffleWriteTimeAccumInfos.flatMap { accumInfo =>
accumInfo.stageValuesMap.get(sm.stageInfo.stageId)
}
- (AppSparkMetricsAnalyzer.maxWithEmptyHandling(peakMemoryValues),
- TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum))
+ new AggAccumPhotonHelper(shuffleWriteValues, peakMemoryValues)
} else {
// For non-Photon apps, use the task metrics directly.
- val peakMemoryValues = tasksInStage.map(_.peakExecutionMemory)
- val shuffleWriteTime = tasksInStage.map(_.sw_writeTime)
- (AppSparkMetricsAnalyzer.maxWithEmptyHandling(peakMemoryValues),
- shuffleWriteTime.sum)
+ new AggAccumHelper()
}
-
- val (durSum, durMax, durMin, durAvg) = AppSparkMetricsAnalyzer.getDurations(tasksInStage)
+ val perStageRec = accumHelperObj.accumPerStage(tasksInStage)
val stageRow = StageAggTaskMetricsProfileResult(index,
sm.stageInfo.stageId,
- numAttempts, // TODO: why is this numAttempts and not numTasks?
+ // numTasks includes duplicate task attempts
+ perStageRec.numTasks,
sm.duration,
- tasksInStage.map(_.diskBytesSpilled).sum,
- durSum,
- durMax,
- durMin,
- durAvg,
- tasksInStage.map(_.executorCPUTime).sum,
- tasksInStage.map(_.executorDeserializeCPUTime).sum,
- tasksInStage.map(_.executorDeserializeTime).sum,
- tasksInStage.map(_.executorRunTime).sum,
- tasksInStage.map(_.input_bytesRead).sum,
- tasksInStage.map(_.input_recordsRead).sum,
- tasksInStage.map(_.jvmGCTime).sum,
- tasksInStage.map(_.memoryBytesSpilled).sum,
- tasksInStage.map(_.output_bytesWritten).sum,
- tasksInStage.map(_.output_recordsWritten).sum,
- peakMemoryMax,
- tasksInStage.map(_.resultSerializationTime).sum,
- AppSparkMetricsAnalyzer.maxWithEmptyHandling(tasksInStage.map(_.resultSize)),
- tasksInStage.map(_.sr_fetchWaitTime).sum,
- tasksInStage.map(_.sr_localBlocksFetched).sum,
- tasksInStage.map(_.sr_localBytesRead).sum,
- tasksInStage.map(_.sr_remoteBlocksFetched).sum,
- tasksInStage.map(_.sr_remoteBytesRead).sum,
- tasksInStage.map(_.sr_remoteBytesReadToDisk).sum,
- tasksInStage.map(_.sr_totalBytesRead).sum,
- tasksInStage.map(_.sw_bytesWritten).sum,
- tasksInStage.map(_.sw_recordsWritten).sum,
- shuffleWriteTimeSum
- )
+ perStageRec.diskBytesSpilledSum,
+ perStageRec.durationSum,
+ perStageRec.durationMax,
+ perStageRec.durationMin,
+ perStageRec.durationAvg,
+ perStageRec.executorCPUTimeSum,
+ perStageRec.executorDeserializeCpuTimeSum,
+ perStageRec.executorDeserializeTimeSum,
+ perStageRec.executorRunTimeSum,
+ perStageRec.inputBytesReadSum,
+ perStageRec.inputRecordsReadSum,
+ perStageRec.jvmGCTimeSum,
+ perStageRec.memoryBytesSpilledSum,
+ perStageRec.outputBytesWrittenSum,
+ perStageRec.outputRecordsWrittenSum,
+ perStageRec.peakExecutionMemoryMax,
+ perStageRec.resultSerializationTimeSum,
+ perStageRec.resultSizeMax,
+ perStageRec.srFetchWaitTimeSum,
+ perStageRec.srLocalBlocksFetchedSum,
+ perStageRec.srLocalBytesReadSum,
+ perStageRec.srRemoteBlocksFetchSum,
+ perStageRec.srRemoteBytesReadSum,
+ perStageRec.srRemoteBytesReadToDiskSum,
+ perStageRec.srTotalBytesReadSum,
+ perStageRec.swBytesWrittenSum,
+ perStageRec.swRecordsWrittenSum,
+ perStageRec.swWriteTimeSum)
stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, stageRow)
}
}
}
-
-
-object AppSparkMetricsAnalyzer {
- def getDurations(tcs: Iterable[TaskModel]): (Long, Long, Long, Double) = {
- val durations = tcs.map(_.duration)
- if (durations.nonEmpty) {
- (durations.sum, durations.max, durations.min,
- ToolUtils.calculateAverage(durations.sum, durations.size, 1))
- } else {
- (0L, 0L, 0L, 0.toDouble)
- }
- }
-
- /**
- * Given an input iterable, returns its min, median, max and sum.
- */
- def getStatistics(arr: Iterable[Long]): StatisticsMetrics = {
- if (arr.isEmpty) {
- StatisticsMetrics(0L, 0L, 0L, 0L)
- } else {
- val sortedArr = arr.toSeq.sorted
- val len = sortedArr.size
- val med = if (len % 2 == 0) {
- (sortedArr(len / 2) + sortedArr(len / 2 - 1)) / 2
- } else {
- sortedArr(len / 2)
- }
- StatisticsMetrics(sortedArr.head, med, sortedArr(len - 1), sortedArr.sum)
- }
- }
-
- def maxWithEmptyHandling(arr: Iterable[Long]): Long = {
- if (arr.isEmpty) {
- 0L
- } else {
- arr.max
- }
- }
-
- def minWithEmptyHandling(arr: Iterable[Long]): Long = {
- if (arr.isEmpty) {
- 0L
- } else {
- arr.min
- }
- }
-}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/StatisticsMetrics.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/StatisticsMetrics.scala
index 1b88d2d4c..d0a21a6c0 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/StatisticsMetrics.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/StatisticsMetrics.scala
@@ -16,6 +16,8 @@
package com.nvidia.spark.rapids.tool.analysis
+import org.apache.spark.sql.rapids.tool.util.InPlaceMedianArrView.{chooseMidpointPivotInPlace, findMedianInPlace}
+
// Store (min, median, max, total) for a given metric
case class StatisticsMetrics(min: Long, med: Long, max: Long, total: Long)
@@ -23,4 +25,31 @@ object StatisticsMetrics {
// a static variable used to represent zero-statistics instead of allocating a dummy record
// on every calculation.
val ZERO_RECORD: StatisticsMetrics = StatisticsMetrics(0L, 0L, 0L, 0L)
+
+ def createFromArr(arr: Array[Long]): StatisticsMetrics = {
+ if (arr.isEmpty) {
+ return ZERO_RECORD
+ }
+ val medV = findMedianInPlace(arr)(chooseMidpointPivotInPlace)
+ var minV = Long.MaxValue
+ var maxV = Long.MinValue
+ var totalV = 0L
+ arr.foreach { v =>
+ if (v < minV) {
+ minV = v
+ }
+ if (v > maxV) {
+ maxV = v
+ }
+ totalV += v
+ }
+ StatisticsMetrics(minV, medV, maxV, totalV)
+ }
+
+ def createOptionalFromArr(arr: Array[Long]): Option[StatisticsMetrics] = {
+ if (arr.isEmpty) {
+ return None
+ }
+ Some(createFromArr(arr))
+ }
}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumHelper.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumHelper.scala
new file mode 100644
index 000000000..b42ac08b4
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumHelper.scala
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.tool.analysis.util
+
+import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult
+
+import org.apache.spark.sql.rapids.tool.store.TaskModel
+
+/**
+ * A helper class to facilitate the accumulation of aggregate metrics.
+ * This is a separate class to allow further customization in the future. For example,
+ * a parellel processor can be used to split the iterables without changing the caller side.
+ */
+class AggAccumHelper {
+
+ private def accumCachedRecords[R <: TaskMetricsAccumRec](
+ stageRecords: Iterable[StageAggTaskMetricsProfileResult],
+ rec: R): Unit = {
+ stageRecords.foreach(rec.addRecord)
+ rec.finalizeAggregation()
+ }
+
+ protected def createStageAccumRecord(): TaskMetricsAccumRec = {
+ StageAggAccum()
+ }
+
+ def accumPerStage(taskRecords: Iterable[TaskModel]): TaskMetricsAccumRec = {
+ val resRec = createStageAccumRecord()
+ taskRecords.foreach(resRec.addRecord)
+ resRec.finalizeAggregation()
+ resRec
+ }
+
+ def accumPerSQL(stageRecords: Iterable[StageAggTaskMetricsProfileResult]): SQLAggAccum = {
+ val resRec = SQLAggAccum()
+ accumCachedRecords(stageRecords, resRec)
+ resRec
+ }
+
+ def accumPerJob(stageRecords: Iterable[StageAggTaskMetricsProfileResult]): JobAggAccum = {
+ val resRec = JobAggAccum()
+ accumCachedRecords(stageRecords, resRec)
+ resRec
+ }
+}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumPhotonHelper.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumPhotonHelper.scala
new file mode 100644
index 000000000..4f1356960
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/AggAccumPhotonHelper.scala
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.tool.analysis.util
+
+/**
+ * Implementation of AggAccumHelper for Photon.
+ * It takes the shuffleWriteValues and peakMemValues Accumulables as an argument because those
+ * values are not available in the TaskModel.
+ */
+class AggAccumPhotonHelper(
+ shuffleWriteValues: Iterable[Long],
+ peakMemValues: Iterable[Long]) extends AggAccumHelper {
+
+ override def createStageAccumRecord(): TaskMetricsAccumRec = {
+ StageAggPhoton(shuffleWriteValues, peakMemValues)
+ }
+}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/JobAggAccum.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/JobAggAccum.scala
new file mode 100644
index 000000000..a8e5b78db
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/JobAggAccum.scala
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.tool.analysis.util
+
+import org.apache.spark.sql.rapids.tool.store.TaskModel
+
+/**
+ * Accumulator for Job Aggregates.
+ * This is an optimization to avoid using the Scala collections API on each field for the entire
+ * number of tasks/stages in a Job.
+ */
+case class JobAggAccum() extends TaskMetricsAccumRec {
+ override def addRecord(rec: TaskModel): Unit = {
+ throw new UnsupportedOperationException(
+ "Not implemented: JobAggAccum accepts only cached records")
+ }
+}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/SQLAggAccum.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/SQLAggAccum.scala
new file mode 100644
index 000000000..b8222679f
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/SQLAggAccum.scala
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.tool.analysis.util
+
+import org.apache.spark.sql.rapids.tool.ToolUtils
+import org.apache.spark.sql.rapids.tool.store.TaskModel
+
+/**
+ * Accumulator for SQL Aggregates.
+ * This is an optimization to avoid using the Scala collections API on each field for the entire
+ * number of tasks/stages in a SQL.
+ */
+case class SQLAggAccum(
+ var executorCpuRatio: Double = 0,
+ // Not added to the output since it is used only by the AutoTuner
+ var inputBytesReadAvg: Double = 0) extends TaskMetricsAccumRec {
+
+ override def finalizeAggregation(): Unit = {
+ super.finalizeAggregation()
+ executorCpuRatio = ToolUtils.calculateDurationPercent(executorCPUTimeSum, executorRunTimeSum)
+ inputBytesReadAvg = ToolUtils.calculateAverage(inputBytesReadSum, numTasks, 1)
+ }
+
+ override def addRecord(rec: TaskModel): Unit = {
+ throw new UnsupportedOperationException(
+ "Not implemented: SQLAggAccum accepts only cached records")
+ }
+}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggAccum.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggAccum.scala
new file mode 100644
index 000000000..c88f1a77d
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggAccum.scala
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.tool.analysis.util
+
+import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult
+
+/**
+ * Accumulator for Stage Aggregates.
+ * This is an optimization to avoid using the Scala collections API on each field for the entire
+ * number of tasks in a Stage.
+ */
+case class StageAggAccum() extends TaskMetricsAccumRec {
+ override def addRecord(rec: StageAggTaskMetricsProfileResult): Unit = {
+ throw new UnsupportedOperationException("Not implemented: Cannot use cached results to" +
+ "calculate stage aggregates")
+ }
+}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggPhoton.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggPhoton.scala
new file mode 100644
index 000000000..ed7127050
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/StageAggPhoton.scala
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.tool.analysis.util
+
+import java.util.concurrent.TimeUnit
+
+import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult
+
+/**
+ * Implementation of Accumulator object for Photon.
+ * It takes the shuffleWriteValues and peakMemValues Accumulables as an argument because those
+ * values are not available in the TaskModel.
+ */
+case class StageAggPhoton(
+ shuffleWriteValues: Iterable[Long],
+ peakMemValues: Iterable[Long]) extends TaskMetricsAccumRec {
+
+ override def addRecord(rec: StageAggTaskMetricsProfileResult): Unit = {
+ throw new UnsupportedOperationException("Not implemented: Cannot use cached results to" +
+ "calculate stage aggregates")
+ }
+
+ override def finalizeAggregation(): Unit = {
+ // Fix the shuffleWriteTimes and the peakMemoryValues to use the shuffleWriteValues and
+ // the peakMemValues.
+ swWriteTimeSum = 0
+ peakExecutionMemoryMax = 0
+ if (!isEmptyAggregates) {
+ // Re-calculate the photon specific fields only if the accumulator has tasks.
+ // Otherwise, leave it as 0.
+ if (shuffleWriteValues.nonEmpty) {
+ swWriteTimeSum = TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum)
+ }
+ if (peakMemValues.nonEmpty) {
+ peakExecutionMemoryMax = peakMemValues.max
+ }
+ }
+ super.finalizeAggregation()
+ }
+}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/TaskMetricsAccumRec.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/TaskMetricsAccumRec.scala
new file mode 100644
index 000000000..b5d98b9ac
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/util/TaskMetricsAccumRec.scala
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.tool.analysis.util
+
+import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult
+
+import org.apache.spark.sql.rapids.tool.ToolUtils
+import org.apache.spark.sql.rapids.tool.store.TaskModel
+
+/**
+ * Accumulator used for task metrics.
+ * This is an optimization decision to avoid using Scala builtin collections on every field in the
+ * taskModel.
+ */
+class TaskMetricsAccumRec {
+ var numTasks: Int = 0
+ var diskBytesSpilledSum: Long = 0
+ var durationSum: Long = 0
+ var durationMax: Long = Long.MinValue
+ var durationMin: Long = Long.MaxValue
+ var durationAvg: Double = 0.0
+ var executorCPUTimeSum: Long = 0
+ var executorDeserializeCpuTimeSum: Long = 0
+ var executorDeserializeTimeSum: Long = 0
+ var executorRunTimeSum: Long = 0
+ var inputBytesReadSum: Long = 0
+ var inputRecordsReadSum: Long = 0
+ var jvmGCTimeSum: Long = 0
+ var memoryBytesSpilledSum: Long = 0
+ var outputBytesWrittenSum: Long = 0
+ var outputRecordsWrittenSum: Long = 0
+ var peakExecutionMemoryMax: Long = Long.MinValue
+ var resultSerializationTimeSum: Long = 0
+ var resultSizeMax: Long = Long.MinValue
+ var srFetchWaitTimeSum: Long = 0
+ var srLocalBlocksFetchedSum: Long = 0
+ var srLocalBytesReadSum: Long = 0
+ var srRemoteBlocksFetchSum: Long = 0
+ var srRemoteBytesReadSum: Long = 0
+ var srRemoteBytesReadToDiskSum: Long = 0
+ var srTotalBytesReadSum: Long = 0
+ var swBytesWrittenSum: Long = 0
+ var swRecordsWrittenSum: Long = 0
+ var swWriteTimeSum: Long = 0
+
+ /**
+ * Assumption that 0-tasks implies no aggregations on metrics. This means that metrics on
+ * job/SQL levels won't be accumulated as long as no tasks are accounted for.
+ */
+ def isEmptyAggregates: Boolean = numTasks == 0
+
+ /**
+ * Reset all fields to 0. This is used to reset the fields when the Task iterator is empty.
+ * When the iterator is empty, then fields such as "max" should be reset to 0.
+ */
+ def resetFields(): Unit = {
+ durationMax = 0
+ durationMin = 0
+ peakExecutionMemoryMax = 0
+ resultSizeMax = 0
+ }
+
+ def addRecord(rec: TaskModel): Unit = {
+ numTasks += 1
+ // SumFields
+ diskBytesSpilledSum += rec.diskBytesSpilled
+ durationSum += rec.duration
+ executorCPUTimeSum += rec.executorCPUTime
+ executorDeserializeCpuTimeSum += rec.executorDeserializeCPUTime
+ executorDeserializeTimeSum += rec.executorDeserializeTime
+ executorRunTimeSum += rec.executorRunTime
+ inputBytesReadSum += rec.input_bytesRead
+ inputRecordsReadSum += rec.input_recordsRead
+ jvmGCTimeSum += rec.jvmGCTime
+ memoryBytesSpilledSum += rec.memoryBytesSpilled
+ outputBytesWrittenSum += rec.output_bytesWritten
+ outputRecordsWrittenSum += rec.output_recordsWritten
+ resultSerializationTimeSum += rec.resultSerializationTime
+ srFetchWaitTimeSum += rec.sr_fetchWaitTime
+ srLocalBlocksFetchedSum += rec.sr_localBlocksFetched
+ srLocalBytesReadSum += rec.sr_localBytesRead
+ srRemoteBlocksFetchSum += rec.sr_remoteBlocksFetched
+ srRemoteBytesReadSum += rec.sr_remoteBytesRead
+ srRemoteBytesReadToDiskSum += rec.sr_remoteBytesReadToDisk
+ srTotalBytesReadSum += rec.sr_totalBytesRead
+ swBytesWrittenSum += rec.sw_bytesWritten
+ swRecordsWrittenSum += rec.sw_recordsWritten
+ swWriteTimeSum += rec.sw_writeTime
+ // Max fields
+ durationMax = math.max(durationMax, rec.duration)
+ peakExecutionMemoryMax = math.max(peakExecutionMemoryMax, rec.peakExecutionMemory)
+ resultSizeMax = math.max(resultSizeMax, rec.resultSize)
+ // Min Fields
+ durationMin = math.min(durationMin, rec.duration)
+ }
+
+ def addRecord(rec: StageAggTaskMetricsProfileResult): Unit = {
+ // Sums
+ numTasks += rec.numTasks
+ durationSum += rec.durationSum
+ diskBytesSpilledSum += rec.diskBytesSpilledSum
+ executorCPUTimeSum += rec.executorCPUTimeSum
+ executorRunTimeSum += rec.executorRunTimeSum
+ inputBytesReadSum += rec.inputBytesReadSum
+ executorDeserializeCpuTimeSum += rec.executorDeserializeCpuTimeSum
+ executorDeserializeTimeSum += rec.executorDeserializeTimeSum
+ inputRecordsReadSum += rec.inputRecordsReadSum
+ jvmGCTimeSum += rec.jvmGCTimeSum
+ memoryBytesSpilledSum += rec.memoryBytesSpilledSum
+ outputBytesWrittenSum += rec.outputBytesWrittenSum
+ outputRecordsWrittenSum += rec.outputRecordsWrittenSum
+ resultSerializationTimeSum += rec.resultSerializationTimeSum
+ srFetchWaitTimeSum += rec.srFetchWaitTimeSum
+ srLocalBlocksFetchedSum += rec.srLocalBlocksFetchedSum
+ srLocalBytesReadSum += rec.srcLocalBytesReadSum
+ srRemoteBlocksFetchSum += rec.srRemoteBlocksFetchSum
+ srRemoteBytesReadSum += rec.srRemoteBytesReadSum
+ srRemoteBytesReadToDiskSum += rec.srRemoteBytesReadToDiskSum
+ srTotalBytesReadSum += rec.srTotalBytesReadSum
+ swBytesWrittenSum += rec.swBytesWrittenSum
+ swRecordsWrittenSum += rec.swRecordsWrittenSum
+ swWriteTimeSum += rec.swWriteTimeSum
+ // Max
+ durationMax = math.max(durationMax, rec.durationMax)
+ peakExecutionMemoryMax = math.max(peakExecutionMemoryMax, rec.peakExecutionMemoryMax)
+ resultSizeMax = math.max(resultSizeMax, rec.resultSizeMax)
+ // Min
+ durationMin = math.min(durationMin, rec.durationMin)
+ }
+
+ /**
+ * This method should be called to finalize the accumulations of all the metrics.
+ * For example, calculating averages and doing any last transformations on a field before the
+ * results are consumed.
+ */
+ def finalizeAggregation(): Unit = {
+ durationAvg = ToolUtils.calculateAverage(durationSum, numTasks, 1)
+ if (numTasks < 1) {
+ // number of tasks is 0, then we should reset fields such as (max, min) to 0.
+ resetFields()
+ }
+ }
+}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala
index 6295c5533..cc60904be 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/GenericExecParser.scala
@@ -91,7 +91,8 @@ class GenericExecParser(
ExecInfo(
node,
sqlID,
- node.name,
+ // Remove trailing spaces from node name if any
+ node.name.trim,
"",
speedupFactor,
duration,
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala
index 4d9c59dd8..8471e8a57 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala
@@ -483,8 +483,8 @@ object SQLPlanParser extends Logging {
case "AggregateInPandas" | "ArrowEvalPython" | "AQEShuffleRead" | "CartesianProduct"
| "Coalesce" | "CollectLimit" | "CustomShuffleReader" | "FlatMapGroupsInPandas"
| "GlobalLimit" | "LocalLimit" | "InMemoryTableScan" | "MapInPandas"
- | "PythonMapInArrow" | "MapInArrow" | "Range" | "Sample" | "Union"
- | "WindowInPandas" =>
+ | "PythonMapInArrow" | "MapInArrow" | "Range" | "RunningWindowFunction"
+ | "Sample" | "Union" | "WindowInPandas" =>
GenericExecParser(node, checker, sqlID, app = Some(app)).parse
case "BatchScan" =>
BatchScanExecParser(node, checker, sqlID, app).parse
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala
index 783726cd9..44ff839ba 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala
@@ -760,6 +760,7 @@ class AutoTuner(
}
} else if (sparkVersion.contains("amzn")) {
sparkVersion match {
+ case ver if ver.contains("3.5.2") => "352"
case ver if ver.contains("3.5.1") => "351"
case ver if ver.contains("3.5.0") => "350"
case ver if ver.contains("3.4.1") => "341"
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
index 75b4c4590..5f92cf32d 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal
-import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, EventLogInfo, EventLogPathProcessor, FailedEventLog, PlatformFactory, ToolBase}
+import com.nvidia.spark.rapids.tool.{AppSummaryInfoBaseProvider, EventLogInfo, EventLogPathProcessor, FailedEventLog, Platform, PlatformFactory, ToolBase}
import com.nvidia.spark.rapids.tool.profiling.AutoTuner.loadClusterProps
import com.nvidia.spark.rapids.tool.views._
import org.apache.hadoop.conf.Configuration
@@ -43,6 +43,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
private val outputCombined: Boolean = appArgs.combined()
private val useAutoTuner: Boolean = appArgs.autoTuner()
private val outputAlignedSQLIds: Boolean = appArgs.outputSqlIdsAligned()
+ // Unlike qualification tool, profiler tool does not require platform per app
+ private val platform: Platform = PlatformFactory.createInstance(appArgs.platform())
override def getNumThreads: Int = appArgs.numThreads.getOrElse(
Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
@@ -295,9 +297,9 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
private def createApp(path: EventLogInfo, index: Int,
hadoopConf: Configuration): Either[FailureApp, ApplicationInfo] = {
try {
- // This apps only contains 1 app in each loop.
+ // These apps only contains 1 app in each loop.
val startTime = System.currentTimeMillis()
- val app = new ApplicationInfo(hadoopConf, path, index)
+ val app = new ApplicationInfo(hadoopConf, path, index, platform)
EventLogPathProcessor.logApplicationInfo(app)
val endTime = System.currentTimeMillis()
if (!app.isAppMetaDefined) {
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
index b90917cd8..971a8711f 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
@@ -23,7 +23,7 @@ import scala.collection.immutable
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, LinkedHashSet, Map}
import com.nvidia.spark.rapids.SparkRapidsBuildInfoEvent
-import com.nvidia.spark.rapids.tool.{DatabricksEventLog, DatabricksRollingEventLogFilesFileReader, EventLogInfo}
+import com.nvidia.spark.rapids.tool.{DatabricksEventLog, DatabricksRollingEventLogFilesFileReader, EventLogInfo, Platform}
import com.nvidia.spark.rapids.tool.planparser.{HiveParseHelper, ReadParser}
import com.nvidia.spark.rapids.tool.planparser.HiveParseHelper.isHiveTableScanNode
import com.nvidia.spark.rapids.tool.profiling.{BlockManagerRemovedCase, DriverAccumCase, JobInfoClass, ResourceProfileInfoCase, SQLExecutionInfoClass, SQLPlanMetricsCase}
@@ -42,7 +42,8 @@ import org.apache.spark.util.Utils
abstract class AppBase(
val eventLogInfo: Option[EventLogInfo],
- val hadoopConf: Option[Configuration]) extends Logging
+ val hadoopConf: Option[Configuration],
+ val platform: Option[Platform] = None) extends Logging
with ClusterTagPropHandler
with AccumToStageRetriever {
@@ -481,6 +482,7 @@ abstract class AppBase(
protected def postCompletion(): Unit = {
registerAttemptId()
calculateAppDuration()
+ validateSparkRuntime()
}
/**
@@ -491,6 +493,20 @@ abstract class AppBase(
processEventsInternal()
postCompletion()
}
+
+ /**
+ * Validates if the spark runtime (parsed from event log) is supported by the platform.
+ * If the runtime is not supported, an `UnsupportedSparkRuntimeException`
+ * is thrown.
+ */
+ private def validateSparkRuntime(): Unit = {
+ val parsedRuntime = getSparkRuntime
+ platform.foreach { p =>
+ require(p.isRuntimeSupported(parsedRuntime),
+ throw UnsupportedSparkRuntimeException(p, parsedRuntime)
+ )
+ }
+ }
}
object AppBase {
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
index 021337495..455f19147 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.rapids.tool
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
+import com.nvidia.spark.rapids.tool.Platform
import com.nvidia.spark.rapids.tool.planparser.SubqueryExecParser
import com.nvidia.spark.rapids.tool.profiling.ProfileUtils.replaceDelimiter
import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter
@@ -28,7 +29,7 @@ import org.apache.spark.internal.{config, Logging}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphNode}
-import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph
+import org.apache.spark.sql.rapids.tool.util.{SparkRuntime, ToolsPlanGraph}
object ToolUtils extends Logging {
// List of recommended file-encodings on the GPUs.
@@ -441,6 +442,12 @@ case class UnsupportedMetricNameException(metricName: String)
extends AppEventlogProcessException(
s"Unsupported metric name found in the event log: $metricName")
+case class UnsupportedSparkRuntimeException(
+ platform: Platform,
+ sparkRuntime: SparkRuntime.SparkRuntime)
+ extends AppEventlogProcessException(
+ s"Platform '${platform.platformName}' does not support the runtime '$sparkRuntime'")
+
// Class used a container to hold the information of the Tuple
// to simplify arguments of methods and caching.
case class SqlPlanInfoGraphEntry(
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala
index 83a3cbc0b..6fbf2bb68 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.rapids.tool.profiling
import scala.collection.Map
-import com.nvidia.spark.rapids.tool.EventLogInfo
+import com.nvidia.spark.rapids.tool.{EventLogInfo, Platform, PlatformFactory}
import com.nvidia.spark.rapids.tool.analysis.AppSQLPlanAnalyzer
import org.apache.hadoop.conf.Configuration
@@ -184,8 +184,9 @@ object SparkPlanInfoWithStage {
class ApplicationInfo(
hadoopConf: Configuration,
eLogInfo: EventLogInfo,
- val index: Int)
- extends AppBase(Some(eLogInfo), Some(hadoopConf)) with Logging {
+ val index: Int,
+ platform: Platform = PlatformFactory.createInstance())
+ extends AppBase(Some(eLogInfo), Some(hadoopConf), Some(platform)) with Logging {
private lazy val eventProcessor = new EventsProcessor(this)
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
index 1ef8b7315..e3c33203f 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
@@ -41,7 +41,7 @@ class QualificationAppInfo(
mlOpsEnabled: Boolean = false,
penalizeTransitions: Boolean = true,
platform: Platform)
- extends AppBase(eventLogInfo, hadoopConf) with Logging {
+ extends AppBase(eventLogInfo, hadoopConf, Some(platform)) with Logging {
var lastJobEndTime: Option[Long] = None
var lastSQLEndTime: Option[Long] = None
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumInfo.scala
index 0f8e520c6..080a34df3 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumInfo.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumInfo.scala
@@ -16,7 +16,7 @@
package org.apache.spark.sql.rapids.tool.store
-import scala.collection.mutable
+import scala.collection.{breakOut, mutable}
import com.nvidia.spark.rapids.tool.analysis.StatisticsMetrics
@@ -98,22 +98,8 @@ class AccumInfo(val infoRef: AccumMetaRef) {
}
def calculateAccStats(): StatisticsMetrics = {
- val sortedTaskUpdates = taskUpdatesMap.values.toSeq.sorted
- if (sortedTaskUpdates.isEmpty) {
- // do not check stage values because the stats is only meant for task updates
- StatisticsMetrics.ZERO_RECORD
- } else {
- val min = sortedTaskUpdates.head
- val max = sortedTaskUpdates.last
- val sum = sortedTaskUpdates.sum
- val median = if (sortedTaskUpdates.size % 2 == 0) {
- val mid = sortedTaskUpdates.size / 2
- (sortedTaskUpdates(mid) + sortedTaskUpdates(mid - 1)) / 2
- } else {
- sortedTaskUpdates(sortedTaskUpdates.size / 2)
- }
- StatisticsMetrics(min, median, max, sum)
- }
+ // do not check stage values because the stats is only meant for task updates
+ StatisticsMetrics.createFromArr(taskUpdatesMap.map(_._2)(breakOut))
}
def getMaxStageValue: Option[Long] = {
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala
index 7b70bedb2..35c9c19e1 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala
@@ -27,6 +27,7 @@ case class AccumMetaRef(id: Long, name: AccumNameRef) {
}
object AccumMetaRef {
+ val EMPTY_ACCUM_META_REF: AccumMetaRef = new AccumMetaRef(0L, AccumNameRef.EMPTY_ACC_NAME_REF)
def apply(id: Long, name: Option[String]): AccumMetaRef =
new AccumMetaRef(id, AccumNameRef.getOrCreateAccumNameRef(name))
}
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala
index 0172f5229..4ce41e4a5 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumNameRef.scala
@@ -42,7 +42,7 @@ case class AccumNameRef(value: String) {
object AccumNameRef {
// Dummy AccNameRef to represent None accumulator names. This is an optimization to avoid
// storing an option[string] for all accumulable names which leads to "get-or-else" everywhere.
- private val EMPTY_ACC_NAME_REF: AccumNameRef = new AccumNameRef("N/A")
+ val EMPTY_ACC_NAME_REF: AccumNameRef = new AccumNameRef("N/A")
// A global table to store reference to all accumulator names. The map is accessible by all
// threads (different applications) running in parallel. This avoids duplicate work across
// different threads.
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/InPlaceMedianArrView.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/InPlaceMedianArrView.scala
new file mode 100644
index 000000000..1be48a6a7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/InPlaceMedianArrView.scala
@@ -0,0 +1,150 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.rapids.tool.util
+
+import scala.annotation.tailrec
+import scala.language.postfixOps
+
+/**
+ * Allows for in-place partitioning and finding the median.
+ * The tools used to find the median of a sequence by sorting the entire sequence, then returning
+ * the elements in the middle. As we started to capture all the accumulators in Spark plans,
+ * sorting is inefficient for large eventlogs that contain huge number of tasks and
+ * Accumulables. Thus, this class is an optimized version to get the median in a linear
+ * complexity while doing it in place to avoid allocating new array to store the sorted elements.
+ * The code is copied from a Stackoverflow thread:
+ * https://stackoverflow.com/questions/4662292/scala-median-implementation
+ *
+ * Notes:
+ * - The implementation assumes that the array is not empty.
+ */
+case class InPlaceMedianArrView(arr: Array[Long], from: Int, until: Int) {
+ def apply(n: Int): Long = {
+ if (from + n < until) {
+ arr(from + n)
+ } else {
+ throw new ArrayIndexOutOfBoundsException(n)
+ }
+ }
+
+ /**
+ * Returns a new view of the array with the same elements but a different range.
+ * @param p a predicate to apply on the elements to proceed with the partitioning.
+ * @return a tuple of 2 views, the first one contains the elements that satisfy the predicate,
+ * and the second one contains the rest.
+ */
+ def partitionInPlace(p: Long => Boolean): (InPlaceMedianArrView, InPlaceMedianArrView) = {
+ var upper = until - 1
+ var lower = from
+ while (lower < upper) {
+ while (lower < until && p(arr(lower))) lower += 1
+ while (upper >= from && !p(arr(upper))) upper -= 1
+ if (lower < upper) { val tmp = arr(lower); arr(lower) = arr(upper); arr(upper) = tmp }
+ }
+ (copy(until = lower), copy(from = lower))
+ }
+
+ def size: Int = {
+ until - from
+ }
+
+ def isEmpty: Boolean = {
+ size <= 0
+ }
+
+ override def toString = {
+ arr mkString ("ArraySize(", ", ", ")")
+ }
+}
+
+/**
+ * Companion object for InPlaceMedianArrView.
+ */
+object InPlaceMedianArrView {
+
+ def apply(arr: Array[Long]): InPlaceMedianArrView = {
+ InPlaceMedianArrView(arr, 0, arr.size)
+ }
+
+ /**
+ * Finds the median of the array in place.
+ * @param arr the Array[Long] to be processed
+ * @param k the index of the median
+ * @param choosePivot a function to choose the pivot index. This useful to choose different
+ * strategies. For example, choosing the midpoint works better for sorted
+ * arrays.
+ * @return the median of the array.
+ */
+ @tailrec
+ def findKMedianInPlace(arr: InPlaceMedianArrView, k: Int)
+ (implicit choosePivot: InPlaceMedianArrView => Long): Long = {
+ val a = choosePivot(arr)
+ val (s, b) = arr partitionInPlace (a >)
+ if (s.size == k) {
+ a
+ } else if (s.isEmpty) {
+ val (s, b) = arr partitionInPlace (a ==)
+ if (s.size > k) {
+ a
+ } else {
+ findKMedianInPlace(b, k - s.size)
+ }
+ } else if (s.size < k) {
+ findKMedianInPlace(b, k - s.size)
+ } else {
+ findKMedianInPlace(s, k)
+ }
+ }
+
+ /**
+ * Choose a random pivot in the array. This can lead to worst case for sorted arrays.
+ * @param arr the array to choose the pivot from.
+ * @return a random element from the array.
+ */
+ def chooseRandomPivotInPlace(arr: InPlaceMedianArrView): Long = {
+ arr(scala.util.Random.nextInt(arr.size))
+ }
+
+ /**
+ * Choose the element in the middle as a pivot. This works better to find median of sorted arrays.
+ * @param arr the array to choose the pivot from.
+ * @return the element in the middle of the array.
+ */
+ def chooseMidpointPivotInPlace(arr: InPlaceMedianArrView): Long = {
+ arr((arr.size - 1) / 2)
+ }
+
+ /**
+ * Finds the median of the array in place.
+ * @param arr the Array[Long] to be processed.
+ * @param choosePivot a function to choose the pivot index.
+ * @return the median of the array.
+ */
+ def findMedianInPlace(
+ arr: Array[Long])(implicit choosePivot: InPlaceMedianArrView => Long): Long = {
+ val midIndex = (arr.size - 1) / 2
+ if (arr.size % 2 == 0) {
+ // For even-length arrays, find the two middle elements and compute their average
+ val mid1 = findKMedianInPlace(InPlaceMedianArrView(arr), midIndex)
+ val mid2 = findKMedianInPlace(InPlaceMedianArrView(arr), midIndex + 1)
+ (mid1 + mid2) / 2
+ } else {
+ // For odd-length arrays, return the middle element
+ findKMedianInPlace(InPlaceMedianArrView(arr), midIndex)
+ }
+ }
+}
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala
index bd5e7bf25..011e5010e 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala
@@ -144,12 +144,13 @@ object ToolTestUtils extends Logging {
val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
val appArgs = new ProfileArgs(logs)
var index: Int = 1
+ val platform = PlatformFactory.createInstance(appArgs.platform())
for (path <- appArgs.eventlog()) {
val eventLogInfo = EventLogPathProcessor
.getEventLogInfo(path, RapidsToolsConfUtil.newHadoopConf())
- assert(eventLogInfo.size >= 1, s"event log not parsed as expected $path")
+ assert(eventLogInfo.nonEmpty, s"event log not parsed as expected $path")
apps += new ApplicationInfo(RapidsToolsConfUtil.newHadoopConf(),
- eventLogInfo.head._1, index)
+ eventLogInfo.head._1, index, platform)
index += 1
}
apps
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/BasePlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/BasePlanParserSuite.scala
index b7966d4d2..6fe3cd2cd 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/BasePlanParserSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/BasePlanParserSuite.scala
@@ -17,7 +17,7 @@
package com.nvidia.spark.rapids.tool.planparser
import com.nvidia.spark.rapids.BaseTestSuite
-import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory, ToolTestUtils}
+import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory, PlatformNames, ToolTestUtils}
import com.nvidia.spark.rapids.tool.qualification._
import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo
@@ -59,7 +59,8 @@ class BasePlanParserSuite extends BaseTestSuite {
}
}
- def createAppFromEventlog(eventLog: String): QualificationAppInfo = {
+ def createAppFromEventlog(eventLog: String,
+ platformName: String = PlatformNames.DEFAULT): QualificationAppInfo = {
val hadoopConf = RapidsToolsConfUtil.newHadoopConf()
val (_, allEventLogs) = EventLogPathProcessor.processAllPaths(
None, None, List(eventLog), hadoopConf)
@@ -67,7 +68,7 @@ class BasePlanParserSuite extends BaseTestSuite {
assert(allEventLogs.size == 1)
val appResult = QualificationAppInfo.createApp(allEventLogs.head, hadoopConf,
pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false, penalizeTransitions = true,
- PlatformFactory.createInstance())
+ PlatformFactory.createInstance(platformName))
appResult match {
case Right(app) => app
case Left(_) => throw new AssertionError("Cannot create application")
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/PhotonPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/PhotonPlanParserSuite.scala
index edf8095bc..74f237178 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/PhotonPlanParserSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/PhotonPlanParserSuite.scala
@@ -16,6 +16,7 @@
package com.nvidia.spark.rapids.tool.planparser
+import com.nvidia.spark.rapids.tool.PlatformNames
import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker
@@ -34,7 +35,7 @@ class PhotonPlanParserSuite extends BasePlanParserSuite {
test(s"$photonName is parsed as Spark $sparkName") {
val eventLog = s"$qualLogDir/nds_q88_photon_db_13_3.zstd"
val pluginTypeChecker = new PluginTypeChecker()
- val app = createAppFromEventlog(eventLog)
+ val app = createAppFromEventlog(eventLog, platformName = PlatformNames.DATABRICKS_AWS)
assert(app.sqlPlans.nonEmpty)
val parsedPlans = app.sqlPlans.map { case (sqlID, plan) =>
SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "", pluginTypeChecker, app)
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala
index 2b8c3bf12..b7d8b315f 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids.tool.profiling
import java.io.File
-import com.nvidia.spark.rapids.tool.ToolTestUtils
+import com.nvidia.spark.rapids.tool.{PlatformNames, ToolTestUtils}
import com.nvidia.spark.rapids.tool.views.{ProfDataSourceView, RawMetricProfilerView}
import org.scalatest.FunSuite
@@ -139,7 +139,8 @@ class AnalysisSuite extends FunSuite {
s"${fileName}_${metric}_metrics_agg_expectation.csv"
}
testSqlMetricsAggregation(Array(s"${qualLogDir}/${fileName}.zstd"),
- expectFile("sql"), expectFile("job"), expectFile("stage"))
+ expectFile("sql"), expectFile("job"), expectFile("stage"),
+ platformName = PlatformNames.DATABRICKS_AWS)
}
test("test stage-level diagnostic aggregation simple") {
@@ -163,8 +164,10 @@ class AnalysisSuite extends FunSuite {
}
private def testSqlMetricsAggregation(logs: Array[String], expectFileSQL: String,
- expectFileJob: String, expectFileStage: String): Unit = {
- val apps = ToolTestUtils.processProfileApps(logs, sparkSession)
+ expectFileJob: String, expectFileStage: String,
+ platformName: String = PlatformNames.DEFAULT): Unit = {
+ val args = Array("--platform", platformName) ++ logs
+ val apps = ToolTestUtils.processProfileApps(args, sparkSession)
assert(apps.size == logs.size)
val aggResults = RawMetricProfilerView.getAggMetrics(apps)
import sparkSession.implicits._
@@ -256,9 +259,12 @@ class AnalysisSuite extends FunSuite {
}
test("test photon scan metrics") {
- val fileName = "nds_q88_photon_db_13_3"
- val logs = Array(s"${qualLogDir}/${fileName}.zstd")
- val apps = ToolTestUtils.processProfileApps(logs, sparkSession)
+ val args = Array(
+ "--platform",
+ PlatformNames.DATABRICKS_AWS,
+ s"$qualLogDir/nds_q88_photon_db_13_3.zstd"
+ )
+ val apps = ToolTestUtils.processProfileApps(args, sparkSession)
val dataSourceResults = ProfDataSourceView.getRawView(apps)
assert(dataSourceResults.exists(_.scan_time > 0))
}
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala
index 7ff03a943..1d40472c9 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala
@@ -22,7 +22,7 @@ import java.nio.file.{Files, Paths, StandardOpenOption}
import scala.collection.mutable.ArrayBuffer
-import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, StatusReportCounts, ToolTestUtils}
+import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformNames, StatusReportCounts, ToolTestUtils}
import com.nvidia.spark.rapids.tool.views.RawMetricProfilerView
import org.apache.hadoop.io.IOUtils
import org.scalatest.FunSuite
@@ -30,6 +30,7 @@ import org.scalatest.FunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.sql.{SparkSession, TrampolineUtil}
+import org.apache.spark.sql.rapids.tool.UnsupportedSparkRuntimeException
import org.apache.spark.sql.rapids.tool.profiling._
import org.apache.spark.sql.rapids.tool.util.{FSUtils, SparkRuntime}
@@ -1116,17 +1117,56 @@ class ApplicationInfoSuite extends FunSuite with Logging {
}
}
- val sparkRuntimeTestCases: Seq[(SparkRuntime.Value, String)] = Seq(
- SparkRuntime.SPARK -> s"$qualLogDir/nds_q86_test",
- SparkRuntime.SPARK_RAPIDS -> s"$logDir/nds_q66_gpu.zstd",
- SparkRuntime.PHOTON -> s"$qualLogDir/nds_q88_photon_db_13_3.zstd"
+ // scalastyle:off line.size.limit
+ val supportedSparkRuntimeTestCases: Map[String, Seq[(String, SparkRuntime.SparkRuntime)]] = Map(
+ // tests for standard Spark runtime
+ s"$qualLogDir/nds_q86_test" -> Seq(
+ (PlatformNames.DATABRICKS_AWS, SparkRuntime.SPARK), // Expected: SPARK on Databricks AWS
+ (PlatformNames.ONPREM, SparkRuntime.SPARK) // Expected: SPARK on Onprem
+ ),
+ // tests for Spark Rapids runtime
+ s"$logDir/nds_q66_gpu.zstd" -> Seq(
+ (PlatformNames.DATABRICKS_AWS, SparkRuntime.SPARK_RAPIDS), // Expected: SPARK_RAPIDS on Databricks AWS
+ (PlatformNames.ONPREM, SparkRuntime.SPARK_RAPIDS) // Expected: SPARK_RAPIDS on Onprem
+ ),
+ // tests for Photon runtime with fallback to SPARK for unsupported platforms
+ s"$qualLogDir/nds_q88_photon_db_13_3.zstd" -> Seq(
+ (PlatformNames.DATABRICKS_AWS, SparkRuntime.PHOTON), // Expected: PHOTON on Databricks AWS
+ (PlatformNames.DATABRICKS_AZURE, SparkRuntime.PHOTON) // Expected: PHOTON on Databricks Azure
+ )
)
+ // scalastyle:on line.size.limit
+
+ supportedSparkRuntimeTestCases.foreach { case (logPath, platformRuntimeCases) =>
+ val baseFileName = logPath.split("/").last
+ platformRuntimeCases.foreach { case (platform, expectedRuntime) =>
+ test(s"test eventlog $baseFileName on $platform has supported runtime: $expectedRuntime") {
+ val args = Array("--platform", platform, logPath)
+ val apps = ToolTestUtils.processProfileApps(args, sparkSession)
+ assert(apps.size == 1)
+ assert(apps.head.getSparkRuntime == expectedRuntime)
+ }
+ }
+ }
- sparkRuntimeTestCases.foreach { case (expectedSparkRuntime, eventLog) =>
- test(s"test spark runtime property for ${expectedSparkRuntime.toString} eventlog") {
- val apps = ToolTestUtils.processProfileApps(Array(eventLog), sparkSession)
- assert(apps.size == 1)
- assert(apps.head.getSparkRuntime == expectedSparkRuntime)
+ // scalastyle:off line.size.limit
+ val unsupportedSparkRuntimeTestCases: Map[String, Seq[String]] = Map(
+ s"$qualLogDir/nds_q88_photon_db_13_3.zstd" -> Seq(
+ PlatformNames.ONPREM, // Expected: PHOTON runtime on Onprem is not supported
+ PlatformNames.DATAPROC // Expected: PHOTON runtime on Dataproc is not supported
+ )
+ )
+ // scalastyle:on line.size.limit
+
+ unsupportedSparkRuntimeTestCases.foreach { case (logPath, platformNames) =>
+ val baseFileName = logPath.split("/").last
+ platformNames.foreach { platform =>
+ test(s"test eventlog $baseFileName on $platform has unsupported runtime") {
+ val args = Array("--platform", platform, logPath)
+ intercept[UnsupportedSparkRuntimeException] {
+ ToolTestUtils.processProfileApps(args, sparkSession)
+ }
+ }
}
}
}
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
index 03943d463..6de463db1 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
@@ -136,12 +136,15 @@ class QualificationSuite extends BaseTestSuite {
}
}
- private def runQualificationTest(eventLogs: Array[String], expectFileName: String = "",
+ private def runQualificationTest(eventLogs: Array[String],
+ expectFileName: String = "", platformName: String = PlatformNames.DEFAULT,
shouldReturnEmpty: Boolean = false, expectPerSqlFileName: Option[String] = None,
expectedStatus: Option[StatusReportCounts] = None): Unit = {
TrampolineUtil.withTempDir { outpath =>
val qualOutputPrefix = "rapids_4_spark_qualification_output"
val outputArgs = Array(
+ "--platform",
+ platformName,
"--output-directory",
outpath.getAbsolutePath())
@@ -1762,7 +1765,8 @@ class QualificationSuite extends BaseTestSuite {
val logFiles = Array(s"$logDir/nds_q88_photon_db_13_3.zstd") // photon event log
// Status counts: 1 SUCCESS, 0 FAILURE, 0 SKIPPED, 0 UNKNOWN
val expectedStatus = Some(StatusReportCounts(1, 0, 0, 0))
- runQualificationTest(logFiles, expectedStatus = expectedStatus)
+ runQualificationTest(logFiles, platformName = PlatformNames.DATABRICKS_AWS,
+ expectedStatus = expectedStatus)
}
test("process multiple attempts of the same app ID and skip lower attempts") {
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/util/ToolUtilsSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/util/ToolUtilsSuite.scala
index baba6eb79..5e1b6558b 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/util/ToolUtilsSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/util/ToolUtilsSuite.scala
@@ -24,13 +24,13 @@ import scala.concurrent.duration._
import scala.xml.XML
import com.nvidia.spark.rapids.tool.profiling.{ProfileOutputWriter, ProfileResult}
+import org.scalatest.AppendedClues.convertToClueful
import org.scalatest.FunSuite
import org.scalatest.Matchers.{contain, convertToAnyShouldWrapper, equal, not}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.TrampolineUtil
-import org.apache.spark.sql.rapids.tool.util.{FSUtils, RapidsToolsConfUtil, StringUtils, WebCrawlerUtil}
-
+import org.apache.spark.sql.rapids.tool.util.{FSUtils, InPlaceMedianArrView, RapidsToolsConfUtil, StringUtils, WebCrawlerUtil}
class ToolUtilsSuite extends FunSuite with Logging {
test("get page links of a url") {
@@ -210,6 +210,27 @@ class ToolUtilsSuite extends FunSuite with Logging {
}
}
+ test("Finding median of arrays") {
+ val testSet: Map[String, (Array[Long], Long)] = Map(
+ "All same values" -> (Array[Long](5, 5, 5, 5) -> 5L),
+ "Odd number of values [9, 7, 5, 3, 1]" -> (Array[Long](9, 7, 5, 3, 1) -> 5L),
+ "Even number of values [11, 9, 7, 5, 3, 1]" -> (Array[Long](11, 9, 7, 5, 3, 1) -> 6),
+ "Even number of values(2) [15, 13, 11, 9, 7, 5, 3, 1]" ->
+ (Array[Long](15, 13, 11, 9, 7, 5, 3, 1) -> 8),
+ "Even number of values(3) [3, 13, 11, 9, 7, 5, 15, 1]" ->
+ (Array[Long](3, 13, 11, 9, 7, 5, 15, 1) -> 8),
+ "Single element" -> (Array[Long](1) -> 1),
+ "Two elements" -> (Array[Long](1, 2).reverse -> 1)
+ )
+ for ((desc, (arr, expectedMedian)) <- testSet) {
+ val actualMedian =
+ InPlaceMedianArrView.findMedianInPlace(arr)(InPlaceMedianArrView.chooseMidpointPivotInPlace)
+ actualMedian shouldBe expectedMedian withClue s"Failed for $desc. " +
+ s"Expected: $expectedMedian, " +
+ s"Actual: $actualMedian"
+ }
+ }
+
case class MockProfileResults(appID: String, appIndex: Int, nonEnglishField: String,
parentIDs: String) extends ProfileResult {
override val outputHeaders: Seq[String] = Seq("appID", "appIndex", "nonEnglishField",
diff --git a/user_tools/src/spark_rapids_pytools/__init__.py b/user_tools/src/spark_rapids_pytools/__init__.py
index f4ec6f064..13d08a4dd 100644
--- a/user_tools/src/spark_rapids_pytools/__init__.py
+++ b/user_tools/src/spark_rapids_pytools/__init__.py
@@ -16,7 +16,7 @@
from spark_rapids_pytools.build import get_version, get_spark_dep_version
-VERSION = '24.10.3'
+VERSION = '24.10.4'
# defines the default runtime build version for the user tools environment
SPARK_DEP_VERSION = '350'
__version__ = get_version(VERSION)
diff --git a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
index 978f683c1..87b39c40e 100644
--- a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
+++ b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
@@ -400,6 +400,15 @@ def init_extra_arg_cases(self) -> list:
def define_invalid_arg_cases(self) -> None:
super().define_invalid_arg_cases()
self.define_rejected_missing_eventlogs()
+ self.rejected['Missing Platform argument'] = {
+ 'valid': False,
+ 'callable': partial(self.raise_validation_exception,
+ 'Cannot run tool cmd without platform argument. Re-run the command '
+ 'providing the platform argument.'),
+ 'cases': [
+ [ArgValueCase.UNDEFINED, ArgValueCase.IGNORE, ArgValueCase.IGNORE]
+ ]
+ }
self.rejected['Cluster By Name Without Platform Hints'] = {
'valid': False,
'callable': partial(self.raise_validation_exception,
diff --git a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
index 0d46e5025..e50659b46 100644
--- a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
+++ b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
@@ -35,9 +35,10 @@ class ToolsCLI(object): # pylint: disable=too-few-public-methods
"""
def qualification(self,
+ *, # force named arguments
+ platform: str,
eventlogs: str = None,
cluster: str = None,
- platform: str = None,
output_folder: str = None,
filter_apps: str = None,
custom_model_file: str = None,
@@ -55,6 +56,8 @@ def qualification(self,
The cmd will process each app individually, but will group apps with the same name into the
same output row after averaging duration metrics accordingly.
+ :param platform: Defines one of the following: "onprem", "emr", "dataproc", "dataproc-gke",
+ "databricks-aws", and "databricks-azure".
:param eventlogs: Event log filenames or CSP storage directories containing event logs
(comma separated).
@@ -62,8 +65,6 @@ def qualification(self,
cluster name on the CSP.
:param cluster: The CPU cluster on which the Spark application(s) were executed.
Name or ID (for databricks platforms) of cluster or path to cluster-properties.
- :param platform: Defines one of the following: "onprem", "emr", "dataproc", "dataproc-gke",
- "databricks-aws", and "databricks-azure".
:param output_folder: Local path to store the output.
:param tools_jar: Path to a bundled jar including Rapids tool. The path is a local filesystem,
or remote cloud storage url. If missing, the wrapper downloads the latest rapids-4-spark-tools_*.jar
@@ -89,8 +90,8 @@ def qualification(self,
For more details on Qualification tool options, please visit
https://docs.nvidia.com/spark-rapids/user-guide/latest/qualification/jar-usage.html#running-the-qualification-tool-standalone-on-spark-event-logs
"""
- eventlogs = Utils.get_value_or_pop(eventlogs, rapids_options, 'e')
platform = Utils.get_value_or_pop(platform, rapids_options, 'p')
+ eventlogs = Utils.get_value_or_pop(eventlogs, rapids_options, 'e')
tools_jar = Utils.get_value_or_pop(tools_jar, rapids_options, 't')
output_folder = Utils.get_value_or_pop(output_folder, rapids_options, 'o')
filter_apps = Utils.get_value_or_pop(filter_apps, rapids_options, 'f')
@@ -108,9 +109,9 @@ def qualification(self,
if estimation_model_args is None:
return None
qual_args = AbsToolUserArgModel.create_tool_args('qualification',
+ platform=platform,
eventlogs=eventlogs,
cluster=cluster,
- platform=platform,
output_folder=output_folder,
tools_jar=tools_jar,
jvm_heap_size=jvm_heap_size,
@@ -127,9 +128,10 @@ def qualification(self,
return None
def profiling(self,
+ *, # force named arguments
+ platform: str,
eventlogs: str = None,
cluster: str = None,
- platform: str = None,
driverlog: str = None,
output_folder: str = None,
tools_jar: str = None,
@@ -146,14 +148,14 @@ def profiling(self,
The tool also will recommend setting for the application assuming that the job will be able
to use all the cluster resources (CPU and GPU) when it is running.
+ :param platform: defines one of the following "onprem", "emr", "dataproc", "databricks-aws",
+ and "databricks-azure".
:param eventlogs: Event log filenames or cloud storage directories
containing event logs (comma separated). If missing, the wrapper reads the Spark's
property `spark.eventLog.dir` defined in the `cluster`.
:param cluster: The cluster on which the Spark applications were executed. The argument
can be a cluster name or ID (for databricks platforms) or a valid path to the cluster's
properties file (json format) generated by the CSP SDK.
- :param platform: defines one of the following "onprem", "emr", "dataproc", "databricks-aws",
- and "databricks-azure".
:param driverlog: Valid path to the GPU driver log file.
:param output_folder: path to store the output.
:param tools_jar: Path to a bundled jar including Rapids tool. The path is a local filesystem,
@@ -173,9 +175,9 @@ def profiling(self,
For more details on Profiling tool options, please visit
https://docs.nvidia.com/spark-rapids/user-guide/latest/profiling/jar-usage.html#prof-tool-title-options
"""
+ platform = Utils.get_value_or_pop(platform, rapids_options, 'p')
eventlogs = Utils.get_value_or_pop(eventlogs, rapids_options, 'e')
cluster = Utils.get_value_or_pop(cluster, rapids_options, 'c')
- platform = Utils.get_value_or_pop(platform, rapids_options, 'p')
driverlog = Utils.get_value_or_pop(driverlog, rapids_options, 'd')
output_folder = Utils.get_value_or_pop(output_folder, rapids_options, 'o')
tools_jar = Utils.get_value_or_pop(tools_jar, rapids_options, 't')
@@ -184,9 +186,9 @@ def profiling(self,
ToolLogging.enable_debug_mode()
init_environment('prof')
prof_args = AbsToolUserArgModel.create_tool_args('profiling',
+ platform=platform,
eventlogs=eventlogs,
cluster=cluster,
- platform=platform,
driverlog=driverlog,
jvm_heap_size=jvm_heap_size,
jvm_threads=jvm_threads,
diff --git a/user_tools/tests/spark_rapids_tools_e2e/features/event_log_processing.feature b/user_tools/tests/spark_rapids_tools_e2e/features/event_log_processing.feature
index cd66b0bb6..fc7ec2a52 100644
--- a/user_tools/tests/spark_rapids_tools_e2e/features/event_log_processing.feature
+++ b/user_tools/tests/spark_rapids_tools_e2e/features/event_log_processing.feature
@@ -16,6 +16,7 @@ Feature: Event Log Processing
@test_id_ELP_0001
Scenario Outline: Tool spark_rapids runs with different types of event logs
+ Given platform is ""
When spark-rapids tool is executed with "" eventlogs
Then stderr contains the following
"""
@@ -25,12 +26,13 @@ Feature: Event Log Processing
And return code is "0"
Examples:
- | event_logs | expected_stderr | processed_apps_count |
- | invalid_path_eventlog | process.failure.count = 1;invalid_path_eventlog not found, skipping! | 0 |
- | gpu_eventlog.zstd | process.skipped.count = 1;GpuEventLogException: Cannot parse event logs from GPU run: skipping this file | 0 |
- | photon_eventlog.zstd | process.success.count = 1; | 1 |
- | streaming_eventlog.zstd | process.skipped.count = 1;StreamingEventLogException: Encountered Spark Structured Streaming Job: skipping this file! | 0 |
- | incorrect_app_status_eventlog.zstd | process.NA.count = 1;IncorrectAppStatusException: Application status is incorrect. Missing AppInfo | 0 |
+ | platform | event_logs | expected_stderr | processed_apps_count |
+ | onprem | invalid_path_eventlog | process.failure.count = 1;invalid_path_eventlog not found, skipping! | 0 |
+ | onprem | gpu_eventlog.zstd | process.skipped.count = 1;GpuEventLogException: Cannot parse event logs from GPU run: skipping this file | 0 |
+ | onprem | streaming_eventlog.zstd | process.skipped.count = 1;StreamingEventLogException: Encountered Spark Structured Streaming Job: skipping this file! | 0 |
+ | onprem | incorrect_app_status_eventlog.zstd | process.NA.count = 1;IncorrectAppStatusException: Application status is incorrect. Missing AppInfo | 0 |
+ | onprem | photon_eventlog.zstd | process.skipped.count = 1;UnsupportedSparkRuntimeException: Platform 'onprem' does not support the runtime 'PHOTON' | 0 |
+ | databricks-aws | photon_eventlog.zstd | process.success.count = 1; | 1 |
@test_id_ELP_0002
Scenario: Qualification tool JAR crashes
diff --git a/user_tools/tests/spark_rapids_tools_ut/test_tool_argprocessor.py b/user_tools/tests/spark_rapids_tools_ut/test_tool_argprocessor.py
index 300751242..ff4bfff35 100644
--- a/user_tools/tests/spark_rapids_tools_ut/test_tool_argprocessor.py
+++ b/user_tools/tests/spark_rapids_tools_ut/test_tool_argprocessor.py
@@ -131,13 +131,9 @@ def test_with_platform_with_eventlogs(self, get_ut_data_dir, tool_name, csp):
cost_savings_enabled=False,
expected_platform=csp)
- # should pass: platform not provided; event logs are provided
- tool_args = self.create_tool_args_should_pass(tool_name,
- eventlogs=f'{get_ut_data_dir}/eventlogs')
- # for qualification, cost savings should be disabled because cluster is not provided
- self.validate_tool_args(tool_name=tool_name, tool_args=tool_args,
- cost_savings_enabled=False,
- expected_platform=CspEnv.ONPREM)
+ # should fail: platform must be provided
+ self.create_tool_args_should_fail(tool_name,
+ eventlogs=f'{get_ut_data_dir}/eventlogs')
@pytest.mark.parametrize('tool_name', ['qualification', 'profiling'])
@pytest.mark.parametrize('csp', all_csps)
@@ -150,17 +146,19 @@ def test_with_platform_with_eventlogs_with_jar_files(self, get_ut_data_dir, tool
tools_jar=f'{get_ut_data_dir}/tools_mock.jar')
assert tool_args['toolsJar'] == f'{get_ut_data_dir}/tools_mock.jar'
- # should pass: tools_jar is correct
- tool_args = self.create_tool_args_should_pass(tool_name, eventlogs=f'{get_ut_data_dir}/eventlogs',
- tools_jar=f'{get_ut_data_dir}/tools_mock.jar')
- assert tool_args['toolsJar'] == f'{get_ut_data_dir}/tools_mock.jar'
+ # should fail: platform must be provided
+ self.create_tool_args_should_fail(tool_name,
+ eventlogs=f'{get_ut_data_dir}/eventlogs',
+ tools_jar=f'{get_ut_data_dir}/tools_mock.jar')
# should fail: tools_jar does not exist
- self.create_tool_args_should_fail(tool_name, eventlogs=f'{get_ut_data_dir}/eventlogs',
+ self.create_tool_args_should_fail(tool_name, platform=csp,
+ eventlogs=f'{get_ut_data_dir}/eventlogs',
tools_jar=f'{get_ut_data_dir}/tools_mock.txt')
# should fail: tools_jar is not .jar extension
- self.create_tool_args_should_fail(tool_name, eventlogs=f'{get_ut_data_dir}/eventlogs',
+ self.create_tool_args_should_fail(tool_name, platform=csp,
+ eventlogs=f'{get_ut_data_dir}/eventlogs',
tools_jar=f'{get_ut_data_dir}/worker_info.yaml')
@pytest.mark.parametrize('tool_name', ['qualification', 'profiling'])
@@ -230,25 +228,15 @@ def test_with_platform_with_cluster_props(self, get_ut_data_dir, tool_name, csp,
self.validate_tool_args(tool_name=tool_name, tool_args=tool_args,
cost_savings_enabled=True,
expected_platform=csp)
-
- # should pass: platform not provided; missing eventlogs should be accepted for all CSPs (except onPrem)
- # because the eventlogs can be retrieved from the cluster properties
- tool_args = self.create_tool_args_should_pass(tool_name,
- cluster=cluster_prop_file)
- # for qualification, cost savings should be enabled because cluster is provided
- self.validate_tool_args(tool_name=tool_name, tool_args=tool_args,
- cost_savings_enabled=True,
- expected_platform=csp)
else:
# should fail: onprem platform cannot retrieve eventlogs from cluster properties
self.create_tool_args_should_fail(tool_name,
platform=csp,
cluster=cluster_prop_file)
- # should fail: platform not provided; defaults platform to onprem, cannot retrieve eventlogs from
- # cluster properties
- self.create_tool_args_should_fail(tool_name,
- cluster=cluster_prop_file)
+ # should fail: platform must be provided for all CSPs as well as onprem
+ self.create_tool_args_should_fail(tool_name,
+ cluster=cluster_prop_file)
@pytest.mark.parametrize('tool_name', ['qualification', 'profiling'])
@pytest.mark.parametrize('csp,prop_path', all_cpu_cluster_props)
@@ -266,14 +254,10 @@ def test_with_platform_with_cluster_props_with_eventlogs(self, get_ut_data_dir,
cost_savings_enabled=CspEnv(csp) != CspEnv.ONPREM,
expected_platform=csp)
- # should pass: platform not provided; cluster properties and eventlogs are provided
- tool_args = self.create_tool_args_should_pass(tool_name,
- cluster=cluster_prop_file,
- eventlogs=f'{get_ut_data_dir}/eventlogs')
- # for qualification, cost savings should be enabled because cluster is provided (except for onprem)
- self.validate_tool_args(tool_name=tool_name, tool_args=tool_args,
- cost_savings_enabled=CspEnv(csp) != CspEnv.ONPREM,
- expected_platform=csp)
+ # should fail: platform must be provided
+ self.create_tool_args_should_fail(tool_name,
+ cluster=cluster_prop_file,
+ eventlogs=f'{get_ut_data_dir}/eventlogs')
@pytest.mark.parametrize('tool_name', ['profiling'])
@pytest.mark.parametrize('csp', all_csps)
@@ -308,18 +292,15 @@ def test_with_platform_with_autotuner_with_eventlogs(self, get_ut_data_dir, tool
cost_savings_enabled=False,
expected_platform=csp)
- # should pass: platform not provided; autotuner properties and eventlogs are provided
- tool_args = self.create_tool_args_should_pass(tool_name,
- cluster=autotuner_prop_file,
- eventlogs=f'{get_ut_data_dir}/eventlogs')
- # cost savings should be disabled for profiling
- self.validate_tool_args(tool_name=tool_name, tool_args=tool_args,
- cost_savings_enabled=False,
- expected_platform=CspEnv.ONPREM)
+ # should fail: platform must be provided
+ self.create_tool_args_should_fail(tool_name,
+ cluster=autotuner_prop_file,
+ eventlogs=f'{get_ut_data_dir}/eventlogs')
@pytest.mark.parametrize('prop_path', [autotuner_prop_path])
def test_profiler_with_driverlog(self, get_ut_data_dir, prop_path):
prof_args = AbsToolUserArgModel.create_tool_args('profiling',
+ platform=CspEnv.get_default(),
driverlog=f'{get_ut_data_dir}/{prop_path}')
assert not prof_args['requiresEventlogs']
assert prof_args['rapidOptions']['driverlog'] == f'{get_ut_data_dir}/{prop_path}'