[FEA] Add IO diagnostic output for GPU slowness in Profiler tool (#1451)

Contributes to #1374

### Changes

- Added an IO diagnostic view to the Profiler output:
`io_diagnostic_metrics.csv`
- Added class `IOAccumDiagnosticMetrics` to store the selected IO-related
metric names and helper methods
- Added class `IODiagnosticResult` to represent each IO diagnostic
result (see the sketch after this list)
- In
`core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala`,
cached the results from `generateSQLAccums` and used them to compute IO
diagnostic metrics in the new function `generateIODiagnosticAccums`
  - Added `IODiagnostics` to class `DiagnosticSummaryInfo`
- Reorganized the presentation of `AccumProfileResults` and
`SQLAccumProfileResults` for better readability
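
For orientation, here is a minimal sketch of the per-stage, per-node record the new view emits. The shape follows the `IODiagnosticResult` constructor in the diff below and the CSV header in the Example Output; the `Sketch` names are hypothetical stand-ins, not the repo's actual definitions.

```scala
// Hypothetical sketch: one row of io_diagnostic_metrics.csv. Each IO metric
// is aggregated as a min/median/max/total quadruple per (sqlId, stageId, nodeId).
case class StatisticsMetricsSketch(min: Long, med: Long, max: Long, total: Long)

case class IODiagnosticResultSketch(
    appIndex: Int,
    appName: String,
    appId: String,
    sqlId: Long,
    stageId: Long,
    stageDurationMs: Long,
    nodeId: Long,
    nodeName: String,
    outputRows: StatisticsMetricsSketch,
    scanTime: StatisticsMetricsSketch,
    outputBatches: StatisticsMetricsSketch,
    bufferTime: StatisticsMetricsSketch,
    shuffleWriteTime: StatisticsMetricsSketch,
    fetchWaitTime: StatisticsMetricsSketch,
    gpuDecodeTime: StatisticsMetricsSketch)
```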


### Testing

- Added unit test "test IO diagnostic metrics" in
`core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala`

### Example Output

```
appIndex,appName,appId,sqlId,stageId,stageDurationMs,nodeId,nodeName,outputRowsMin,outputRowsMedian,outputRowsMax,outputRowsTotal,scanTimeMin,scanTimeMedian,scanTimeMax,scanTimeTotal,outputBatchesMin,outputBatchesMedian,outputBatchesMax,outputBatchesTotal,bufferTimeMin,bufferTimeMedian,bufferTimeMax,bufferTimeTotal,shuffleWriteTimeMin,shuffleWriteTimeMedian,shuffleWriteTimeMax,shuffleWriteTimeTotal,fetchWaitTimeMin,fetchWaitTimeMedian,fetchWaitTimeMax,fetchWaitTimeTotal,gpuDecodeTimeMin,gpuDecodeTimeMedian,gpuDecodeTimeMax,gpuDecodeTimeTotal
1,Spark shell,local-1622814619968,0,0,1743,16,"GpuColumnarExchange",1666666,1666667,1666667,10000000,0,0,0,0,200,200,200,1200,0,0,0,0,41434653,60830365,100858775,400284505,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,0,1743,21,"Scan",1666666,1666667,1666667,10000000,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,1,1631,8,"GpuColumnarExchange",1666666,1666667,1666667,10000000,0,0,0,0,200,200,200,1200,0,0,0,0,37444140,92128351,108992798,508750471,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,1,1631,13,"Scan",1666666,1666667,1666667,10000000,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,2,688,3,"GpuColumnarExchange",1,1,1,200,0,0,0,0,1,1,1,200,0,0,0,0,139875,230038,9747416,93193331,0,0,0,0,0,0,0,0
```
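
As a quick sanity check, a standalone Scala sketch that scans the CSV above for nodes with non-zero total shuffle write time might look like this. It is illustrative only: the file name matches the view added in this PR, and the naive comma split assumes quoted fields contain no embedded commas, which holds for the sample rows.

```scala
import scala.io.Source

// Sketch: report nodes whose total shuffle write time is non-zero.
object IODiagnosticScan {
  def main(args: Array[String]): Unit = {
    val source = Source.fromFile("io_diagnostic_metrics.csv")
    try {
      val lines = source.getLines().toList
      // Map each header name to its column index
      val col = lines.head.split(",").zipWithIndex.toMap
      for (row <- lines.tail.map(_.split(",", -1))) {
        val shuffleTotal = row(col("shuffleWriteTimeTotal")).toLong
        if (shuffleTotal > 0) {
          val sqlId = row(col("sqlId"))
          val stageId = row(col("stageId"))
          val node = row(col("nodeName"))
          println(s"sqlId=$sqlId stageId=$stageId node=$node shuffleWriteTimeTotal=$shuffleTotal")
        }
      }
    } finally {
      source.close()
    }
  }
}
```

Run against the sample above, this flags the three `GpuColumnarExchange` rows; the `Scan` rows report zero shuffle write time.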

### Follow-up Issue
#1454

---------

Signed-off-by: cindyyuanjiang <[email protected]>
cindyyuanjiang authored Jan 27, 2025
1 parent d15a999 commit de411e3
Showing 14 changed files with 758 additions and 123 deletions.

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,15 +19,17 @@ package com.nvidia.spark.rapids.tool.analysis
 import scala.collection.breakOut
 import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet}

-import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
+import com.nvidia.spark.rapids.tool.analysis.util.IOAccumDiagnosticMetrics._
+import com.nvidia.spark.rapids.tool.analysis.util.StageAccumDiagnosticMetrics._
+import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, IODiagnosticResult, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
 import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer

 import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode}
 import org.apache.spark.sql.rapids.tool.{AppBase, RDDCheckHelper, SqlPlanInfoGraphBuffer, SqlPlanInfoGraphEntry}
 import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
 import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo
-import org.apache.spark.sql.rapids.tool.store.DataSourceRecord
+import org.apache.spark.sql.rapids.tool.store.{AccumInfo, DataSourceRecord}
 import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph

 /**
@@ -58,21 +60,78 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
   var allSQLMetrics: ArrayBuffer[SQLMetricInfoCase] = ArrayBuffer[SQLMetricInfoCase]()
   // A map between stage ID and a set of node names
   val stageToNodeNames: HashMap[Long, Seq[String]] = HashMap.empty[Long, Seq[String]]
-  // A map between stage ID and diagnostic metrics results (stored as a map between metric name
-  // and AccumProfileResults)
+
+  // A mapping from stage ID to diagnostic metrics results.
+  // Each stage ID maps to another HashMap, where:
+  // - The key is the diagnostic metric name (String).
+  // - The value is an AccumProfileResults object containing the diagnostic data for that metric.
   val stageToDiagnosticMetrics: HashMap[Long, HashMap[String, AccumProfileResults]] =
     HashMap.empty[Long, HashMap[String, AccumProfileResults]]

+  // A mapping from a unique combination of SQL execution identifiers to a list of IO diagnostic
+  // metrics results.
+  // The key is a tuple consisting of:
+  // - sqlID (Long): The unique identifier for the SQL query.
+  // - nodeID (Long): The unique identifier for the node.
+  // The value is an ArrayBuffer of SQLAccumProfileResults objects, storing the IO diagnostic
+  // metrics for the given key.
+  val IODiagnosticMetricsMap: HashMap[(Long, Long), ArrayBuffer[SQLAccumProfileResults]] =
+    HashMap.empty[(Long, Long), ArrayBuffer[SQLAccumProfileResults]]
+
   /**
-   * Given an input diagnostic metric result, update stageToDiagnosticMetrics mapping
-   * @param accum AccumProfileResults to be analyzed
+   * Updates the stageToDiagnosticMetrics mapping with the provided AccumProfileResults.
+   * @param accum AccumProfileResults instance containing diagnostic metrics to be added
+   *              to the stageToDiagnosticMetrics mapping.
    */
   private def updateStageDiagnosticMetrics(accum: AccumProfileResults): Unit = {
-    val stageId = accum.stageId
-    if (!stageToDiagnosticMetrics.contains(stageId)) {
-      stageToDiagnosticMetrics(stageId) = HashMap.empty[String, AccumProfileResults]
+    // Initialize an empty mapping for the stage if it doesn't already exist
+    if (!stageToDiagnosticMetrics.contains(accum.stageId)) {
+      stageToDiagnosticMetrics(accum.stageId) = HashMap.empty[String, AccumProfileResults]
     }
-    stageToDiagnosticMetrics(stageId)(accum.accMetaRef.getName()) = accum
+
+    stageToDiagnosticMetrics(accum.stageId)(accum.accMetaRef.getName()) = accum
   }

+  /**
+   * Updates the IODiagnosticMetricsMap with the provided SQLAccumProfileResults.
+   * @param accum SQLAccumProfileResults instance containing IO diagnostic metrics
+   *              to be added to the IODiagnosticMetricsMap.
+   */
+  private def updateIODiagnosticMetricsMap(accum: SQLAccumProfileResults): Unit = {
+    val key = (accum.sqlID, accum.nodeID)
+
+    // Initialize an entry if the key does not exist
+    if (!IODiagnosticMetricsMap.contains(key)) {
+      IODiagnosticMetricsMap(key) = ArrayBuffer[SQLAccumProfileResults]()
+    }
+
+    IODiagnosticMetricsMap(key) += accum
+  }
+
+  /**
+   * Retrieves the task IDs associated with a specific stage.
+   *
+   * @param stageId The ID of the stage.
+   * @return A sequence of distinct task IDs corresponding to the given stage ID.
+   */
+  private def getStageTaskIds(stageId: Int): Seq[Long] = {
+    app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId)(breakOut).distinct
+  }
+
+  /**
+   * Retrieves task update values from the accumulator info for the tasks in the
+   * specified stage.
+   *
+   * @param accumInfo AccumInfo object containing the task updates map.
+   * @param stageTaskIds Task IDs of the stage for which task updates need to be retrieved.
+   * @return An array of task update values (`Long`) corresponding to the tasks
+   *         in the specified stage.
+   */
+  private def filterAccumTaskUpdatesForStage(accumInfo: AccumInfo, stageTaskIds: Seq[Long])
+      : Array[Long] = {
+    stageTaskIds.collect {
+      case taskId if accumInfo.taskUpdatesMap.contains(taskId) =>
+        accumInfo.taskUpdatesMap(taskId)
+    }(breakOut)
+  }
+
   /**
@@ -310,15 +369,100 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
         val med = Math.max(taskInfo.med, driverInfo.med)
         val total = Math.max(taskInfo.total, driverInfo.total)

-        Some(SQLAccumProfileResults(appIndex, metric.sqlID,
+        val sqlAccumProfileResult = SQLAccumProfileResults(appIndex, metric.sqlID,
           metric.nodeID, metric.nodeName, metric.accumulatorId, metric.name,
-          min, med, max, total, metric.metricType, metric.stageIds.mkString(",")))
+          min, med, max, total, metric.metricType, metric.stageIds)
+
+        if (isIODiagnosticMetricName(metric.name)) {
+          updateIODiagnosticMetricsMap(sqlAccumProfileResult)
+        }
+
+        Some(sqlAccumProfileResult)
       } else {
         None
       }
     }(breakOut)
   }

+  /**
+   * Generates IO-related diagnostic metrics for the SQL plan. Metrics include:
+   * - Output rows
+   * - Scan time
+   * - Output batches
+   * - Buffer time
+   * - Shuffle write time
+   * - Fetch wait time
+   * - GPU decode time
+   *
+   * This method processes accumulator information for each SQL stage and node and
+   * computes statistical results (min, median, max, sum) for IO-related metrics.
+   *
+   * @return A sequence of `IODiagnosticResult` objects, one per SQL stage and node.
+   */
+  def generateIODiagnosticAccums(): Seq[IODiagnosticResult] = {
+    // Transform the diagnostic metrics map into a sequence of results
+    IODiagnosticMetricsMap.flatMap { case ((sqlId, nodeId), sqlAccums) =>
+      // Process each stage ID and compute diagnostic results
+      // TODO: currently if the stage ID list is empty, the result is skipped
+      val stageIds = sqlAccums.head.stageIds
+      stageIds.flatMap { stageId =>
+        val stageTaskIds = getStageTaskIds(stageId)
+        val nodeName = sqlAccums.head.nodeName
+
+        // Initialize a map to store statistics for each IO metric
+        val metricNameToStatistics = HashMap.empty[String, StatisticsMetrics].
+          withDefaultValue(StatisticsMetrics.ZERO_RECORD)
+
+        // Process each accumulator for the current SQL stage
+        sqlAccums.foreach { sqlAccum =>
+          // TODO: check if accumulator ID is in driverAccumMap, currently skipped
+          val accumInfoOpt = app.accumManager.accumInfoMap.get(sqlAccum.accumulatorId)
+
+          val metricStats: Option[StatisticsMetrics] = accumInfoOpt.flatMap { accumInfo =>
+            if (!accumInfo.stageValuesMap.contains(stageId)) {
+              None
+            } else if (stageIds.size == 1) {
+              // Skip computing statistics when there is only one stage
+              Some(StatisticsMetrics(sqlAccum.min, sqlAccum.median, sqlAccum.max, sqlAccum.total))
+            } else {
+              // Retrieve task updates that correspond to the current stage
+              val filteredTaskUpdates = filterAccumTaskUpdatesForStage(accumInfo, stageTaskIds)
+              StatisticsMetrics.createOptionalFromArr(filteredTaskUpdates)
+            }
+          }
+
+          // Store the metric's statistics if available
+          metricStats.foreach { stat =>
+            val metricKey = normalizeToIODiagnosticMetricKey(sqlAccum.name)
+            metricNameToStatistics(metricKey) = stat
+          }
+        }

+        if (metricNameToStatistics.isEmpty) {
+          // No IO metric statistics were computed for this stage
+          None
+        } else {
+          Some(IODiagnosticResult(
+            appIndex,
+            app.getAppName,
+            app.appId,
+            sqlId,
+            stageId,
+            app.stageManager.getDurationById(stageId),
+            nodeId,
+            nodeName,
+            metricNameToStatistics(OUTPUT_ROWS_METRIC_KEY),
+            metricNameToStatistics(SCAN_TIME_METRIC_KEY),
+            metricNameToStatistics(OUTPUT_BATCHES_METRIC_KEY),
+            metricNameToStatistics(BUFFER_TIME_METRIC_KEY),
+            metricNameToStatistics(SHUFFLE_WRITE_TIME_METRIC_KEY),
+            metricNameToStatistics(FETCH_WAIT_TIME_METRIC_KEY),
+            metricNameToStatistics(GPU_DECODE_TIME_METRIC_KEY)))
+        }
+      }
+    }(breakOut)
+  }
+
   /**
    * Generate the stage level metrics for the SQL plan including GPU metrics if applicable.
    * Along with Spark defined metrics, below is the list of GPU metrics that are collected if they
@@ -333,11 +477,12 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
     app.accumManager.accumInfoMap.flatMap { accumMapEntry =>
       val accumInfo = accumMapEntry._2
       accumInfo.stageValuesMap.keys.flatMap( stageId => {
-        val stageTaskIds: Set[Long] =
-          app.taskManager.getAllTasksStageAttempt(stageId).map(_.taskId)(breakOut)
-        // Get the task updates that belong to that stage
-        StatisticsMetrics.createOptionalFromArr(
-          accumInfo.taskUpdatesMap.filterKeys(stageTaskIds).map(_._2)(breakOut)) match {
+        // Retrieve task updates that correspond to the current stage
+        val filteredTaskUpdates =
+          filterAccumTaskUpdatesForStage(accumInfo, getStageTaskIds(stageId))
+
+        StatisticsMetrics.createOptionalFromArr(filteredTaskUpdates) match {
           case Some(stat) =>
             // Reuse AccumProfileResults to avoid allocating new objects
             val accumProfileResults = AccumProfileResults(
@@ -348,7 +493,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
               median = stat.med,
               max = stat.max,
               total = stat.total)
-            if (accumInfo.infoRef.name.isDiagnosticMetrics()) {
+            if (isDiagnosticMetrics(accumInfo.infoRef.name.value)) {
               updateStageDiagnosticMetrics(accumProfileResults)
             }
             Some(accumProfileResults)
@@ -19,8 +19,8 @@ package com.nvidia.spark.rapids.tool.analysis
 import scala.collection.breakOut
 import scala.collection.mutable.{ArrayBuffer, HashMap, LinkedHashMap}

-import com.nvidia.spark.rapids.tool.analysis.StageAccumDiagnosticMetrics._
 import com.nvidia.spark.rapids.tool.analysis.util.{AggAccumHelper, AggAccumPhotonHelper}
+import com.nvidia.spark.rapids.tool.analysis.util.StageAccumDiagnosticMetrics._
 import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
 import com.nvidia.spark.rapids.tool.profiling._