
Commit 2b02b25: Merge dev into main

Signed-off-by: spark-rapids automation <[email protected]>
nvauto committed Jul 13, 2023
2 parents dcfbc66 + a849c0d

Showing 48 changed files with 1,292 additions and 1,244 deletions.
611 changes: 313 additions & 298 deletions core/docs/spark-profiling-tool.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion core/pom.xml

@@ -23,7 +23,7 @@
<artifactId>rapids-4-spark-tools_2.12</artifactId>
<name>RAPIDS Accelerator for Apache Spark tools</name>
<description>RAPIDS Accelerator for Apache Spark tools</description>
<version>23.06.1</version>
<version>23.06.2-SNAPSHOT</version>
<packaging>jar</packaging>
<url>http://github.com/NVIDIA/spark-rapids-tools</url>

@@ -32,6 +32,9 @@ case class FileSourceScanExecParser(
val fullExecName = "FileSourceScanExec"

override def parse: ExecInfo = {
// Remove trailing spaces from the node name,
// e.g. "Scan parquet " -> "Scan parquet"
val nodeName = node.name.trim
val accumId = node.metrics.find(_.name == "scan time").map(_.accumulatorId)
val maxDuration = SQLPlanParser.getTotalDuration(accumId, app)

@@ -42,6 +45,6 @@
val overallSpeedup = Math.max((speedupFactor * score), 1.0)

// TODO - add in parsing expressions - average speedup across?
new ExecInfo(sqlID, node.name, "", overallSpeedup, maxDuration, node.id, score > 0, None)
new ExecInfo(sqlID, nodeName, "", overallSpeedup, maxDuration, node.id, score > 0, None)
}
}

@@ -328,7 +328,8 @@ class RecommendationEntry(val name: String,
*/
class AutoTuner(
val clusterProps: ClusterProperties,
val appInfoProvider: AppSummaryInfoBaseProvider) extends Logging {
val appInfoProvider: AppSummaryInfoBaseProvider,
val platform: String) extends Logging {

import AutoTuner._

@@ -987,9 +988,10 @@

private def handleException(
ex: Exception,
appInfo: AppSummaryInfoBaseProvider): AutoTuner = {
appInfo: AppSummaryInfoBaseProvider,
platform: String): AutoTuner = {
logError("Exception: " + ex.getStackTrace.mkString("Array(", ", ", ")"))
val tuning = new AutoTuner(new ClusterProperties(), appInfo)
val tuning = new AutoTuner(new ClusterProperties(), appInfo, platform)
val msg = ex match {
case cEx: ConstructorException => cEx.getContext
case _ => if (ex.getCause != null) ex.getCause.toString else ex.toString
@@ -1033,25 +1035,27 @@
*/
def buildAutoTunerFromProps(
clusterProps: String,
singleAppProvider: AppSummaryInfoBaseProvider): AutoTuner = {
singleAppProvider: AppSummaryInfoBaseProvider,
platform: String = Profiler.DEFAULT_PLATFORM): AutoTuner = {
try {
val clusterPropsOpt = loadClusterPropertiesFromContent(clusterProps)
new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider)
new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform)
} catch {
case e: Exception =>
handleException(e, singleAppProvider)
handleException(e, singleAppProvider, platform)
}
}

def buildAutoTuner(
filePath: String,
singleAppProvider: AppSummaryInfoBaseProvider): AutoTuner = {
singleAppProvider: AppSummaryInfoBaseProvider,
platform: String = Profiler.DEFAULT_PLATFORM): AutoTuner = {
try {
val clusterPropsOpt = loadClusterProps(filePath)
new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider)
new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform)
} catch {
case e: Exception =>
handleException(e, singleAppProvider)
handleException(e, singleAppProvider, platform)
}
}
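
With the platform now threaded through both builders, a caller can construct a
platform-aware tuner directly. A minimal sketch (not part of this diff; the
worker-info path and the app value are placeholders, and the no-argument
getRecommendedProperties() call returning a tuple is an assumption based on the
surrounding Profiler code):

  val autoTuner = AutoTuner.buildAutoTuner("/path/to/worker_info.yaml",
    new SingleAppSummaryInfoProvider(app), platform = "dataproc")
  // Recommended Spark properties plus the comments explaining them.
  val (props, comments) = autoTuner.getRecommendedProperties()
  println(Profiler.getAutoTunerResultsAsString(props, comments))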

@@ -63,6 +63,12 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
opt[Boolean](required = false,
descr = "Print the SQL plans to a file named 'planDescriptions.log'." +
" Default is false.")
val platform: ScallopOption[String] =
opt[String](required = false,
descr = "Cluster platform where Spark GPU workloads were executed. Options include " +
"onprem, dataproc, emr, databricks." +
" Default is onprem.",
default = Some(Profiler.DEFAULT_PLATFORM))
val generateTimeline: ScallopOption[Boolean] =
opt[Boolean](required = false,
descr = "Write an SVG graph out for the full application timeline.")
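
With the new option registered, an illustrative profiling invocation might look
like the following (jar version, event-log path, and the ProfileMain entry
point are placeholders assumed from the usage text above):

  java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/* \
    com.nvidia.spark.rapids.tool.profiling.ProfileMain \
    --platform dataproc /path/to/eventlog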

@@ -470,8 +470,9 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging

if (useAutoTuner) {
val workerInfoPath = appArgs.workerInfo.getOrElse(AutoTuner.DEFAULT_WORKER_INFO_PATH)
val platform = appArgs.platform.getOrElse(Profiler.DEFAULT_PLATFORM)
val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(workerInfoPath,
new SingleAppSummaryInfoProvider(app))
new SingleAppSummaryInfoProvider(app), platform)
// the autotuner allows skipping some properties
// e.g. getRecommendedProperties(Some(Seq("spark.executor.instances"))) skips the
// recommendation related to executor instances.
Expand All @@ -489,6 +490,8 @@ object Profiler {
val COMPARE_LOG_FILE_NAME_PREFIX = "rapids_4_spark_tools_compare"
val COMBINED_LOG_FILE_NAME_PREFIX = "rapids_4_spark_tools_combined"
val SUBDIR = "rapids_4_spark_profile"
val DEFAULT_PLATFORM = "onprem"

def getAutoTunerResultsAsString(props: Seq[RecommendedPropertyResult],
comments: Seq[RecommendedCommentResult]): String = {
val propStr = if (props.nonEmpty) {

@@ -112,7 +112,7 @@ class PluginTypeChecker(platform: String = "onprem") extends Logging {
// Some SQL function names have backquotes(`) around their names and some
// contain spaces, so we remove both before saving.
readSupportedOperators(source, "exprs").map(
x => (x._1.toLowerCase.replaceAll("\\`", ""), x._2))
x => (x._1.toLowerCase.replaceAll("\\`", "").replaceAll(" ",""), x._2))
}

private def readSupportedTypesForPlugin: (
@@ -293,15 +293,7 @@ class PluginTypeChecker(platform: String = "onprem") extends Logging {
}

def isExprSupported(expr: String): Boolean = {
// Remove _ from the string. Example: collect_list => collectlist.
// collect_list is alias for CollectList aggregate function.
// An exception is date_format since the Expression and sql function name is different
// Expression: DateFormatClass, sql function- date_format
val exprLowercase = if (expr.equalsIgnoreCase("date_format")) {
expr
} else {
expr.toLowerCase.replace("_", "")
}
val exprLowercase = expr.toLowerCase
if (supportedExprs.contains(exprLowercase)) {
val exprSupported = supportedExprs.getOrElse(exprLowercase, "NS")
if (exprSupported == "S") {
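
With the backquote/space stripping above and this simplified lookup, names read
from the supported-operators data and expression names parsed from the plan now
normalize identically, which is why the old date_format special case could be
dropped. A small sketch of the two paths (illustrative strings, not from this
diff):

  // Data side (readSupportedOperators): strip backquotes and spaces.
  val fromData = "`date_format`".toLowerCase.replaceAll("\\`", "").replaceAll(" ", "")
  // Plan side (isExprSupported): lowercase only; underscores are kept.
  val fromPlan = "date_format".toLowerCase
  assert(fromData == fromPlan) // both are "date_format"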

@@ -16,7 +16,7 @@

package com.nvidia.spark.rapids.tool.qualification

import scala.collection.mutable.{Buffer, LinkedHashMap, ListBuffer}
import scala.collection.mutable.{ArrayBuffer, Buffer, LinkedHashMap, ListBuffer}

import com.nvidia.spark.rapids.tool.ToolTextFileWriter
import com.nvidia.spark.rapids.tool.planparser.{ExecInfo, PlanInfo}
@@ -146,6 +146,21 @@ class QualOutputWriter(outputDir: String, reportReadSchema: Boolean,
}
}

def writeUnsupportedOperatorsCSVReport(sums: Seq[QualificationSummaryInfo],
order: String): Unit = {
val csvFileWriter = new ToolTextFileWriter(outputDir,
s"${QualOutputWriter.LOGFILE_NAME}_unsupportedOperators.csv",
"Unsupported Operators CSV Report", hadoopConf)
val headersAndSizes = QualOutputWriter.getUnsupportedOperatorsHeaderStringsAndSizes(sums)
csvFileWriter.write(QualOutputWriter.constructOutputRowFromMap(headersAndSizes,
QualOutputWriter.CSV_DELIMITER, false))
sums.foreach { sum =>
val rows = QualOutputWriter.constructUnsupportedOperatorsInfo(sum, headersAndSizes,
QualOutputWriter.CSV_DELIMITER, false)
rows.foreach(row => csvFileWriter.write(row))
}
}
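
The report lands in the tool's output directory as
${QualOutputWriter.LOGFILE_NAME}_unsupportedOperators.csv, one row per
unsupported read format, exec, expression, or write format per application.
Assuming APP_ID_STR resolves to "App ID", the header row built by
getUnsupportedOperatorsHeaderStringsAndSizes (defined further down) would be:

  App ID,Unsupported Type,Details,Notes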

def writePerSqlCSVReport(sums: Seq[QualificationSummaryInfo], maxSQLDescLength: Int): Unit = {
val csvFileWriter = new ToolTextFileWriter(outputDir,
s"${QualOutputWriter.LOGFILE_NAME}_persql.csv",
@@ -362,6 +377,9 @@ object QualOutputWriter {
val CLUSTER_TAGS = "Cluster Tags"
val CLUSTER_ID = "ClusterId"
val JOB_ID = "JobId"
val UNSUPPORTED_TYPE = "Unsupported Type"
val DETAILS = "Details"
val NOTES = "Notes"
val RUN_NAME = "RunName"
val ESTIMATED_FREQUENCY = "Estimated Job Frequency (monthly)"
val ML_FUNCTIONS = "ML Functions"
@@ -493,6 +511,18 @@
prettyPrintValue
}

def getUnsupportedOperatorsHeaderStringsAndSizes(
appInfos: Seq[QualificationSummaryInfo]): LinkedHashMap[String, Int] = {
val detailedHeaderAndFields = LinkedHashMap[String, Int](
APP_ID_STR -> QualOutputWriter.getAppIdSize(appInfos),
UNSUPPORTED_TYPE -> UNSUPPORTED_TYPE.size,
DETAILS -> DETAILS.size,
NOTES -> NOTES.size
)
detailedHeaderAndFields
}


def getDetailedHeaderStringsAndSizes(appInfos: Seq[QualificationSummaryInfo],
reportReadSchema: Boolean): LinkedHashMap[String, Int] = {
val detailedHeadersAndFields = LinkedHashMap[String, Int](
@@ -831,6 +861,93 @@
}
}

def constructUnsupportedOperatorsInfo(
sumInfo: QualificationSummaryInfo,
headersAndSizes: LinkedHashMap[String, Int],
delimiter: String = TEXT_DELIMITER,
prettyPrint: Boolean,
reformatCSV: Boolean = true): Seq[String] = {
val reformatCSVFunc: String => String =
if (reformatCSV) str => StringUtils.reformatCSVString(str) else str => stringIfempty(str)
val appId = sumInfo.appId
val readFormat = sumInfo.readFileFormatAndTypesNotSupported
val writeFormat = sumInfo.writeDataFormat
val unsupportedExecs = sumInfo.unSupportedExecs
val unsupportedExprs = sumInfo.unSupportedExprs
val unsupportedExecExprsMap = sumInfo.unsupportedExecstoExprsMap
val unsupportedOperatorsOutputRows = new ArrayBuffer[String]()

if (readFormat.nonEmpty) {
val unsupportedReadFormatRows = readFormat.map { format =>
val readFormatAndType = format.split("\\[")
val readFormat = readFormatAndType(0)
val readType = if (readFormatAndType.size > 1) {
s"Types not supported - ${readFormatAndType(1).replace("]", "")}"
} else {
""
}
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Read") -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(readFormat) -> headersAndSizes(DETAILS),
reformatCSVFunc(readType) -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
}
unsupportedOperatorsOutputRows ++= unsupportedReadFormatRows
}
if (unsupportedExecs.nonEmpty) {
val unsupportedExecRows = unsupportedExecs.split(";").map { exec =>
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Exec") -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(exec) -> headersAndSizes(DETAILS),
reformatCSVFunc("") -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
}
unsupportedOperatorsOutputRows ++= unsupportedExecRows
}
if (unsupportedExecExprsMap.nonEmpty) {
val unsupportedExecExprMapRows = unsupportedExecExprsMap.map { case (exec, exprs) =>
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Exec") -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(exec) -> headersAndSizes(DETAILS),
reformatCSVFunc(s"$exec Exec is not supported as expressions are " +
s"not supported - `${exprs}`") -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
}.toArray
unsupportedOperatorsOutputRows ++= unsupportedExecExprMapRows
}
if (unsupportedExprs.nonEmpty) {
val unsupportedExprRows = unsupportedExprs.split(";").map { expr =>
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Expression") -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(expr) -> headersAndSizes(DETAILS),
reformatCSVFunc("") -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
}
unsupportedOperatorsOutputRows ++= unsupportedExprRows
}
if (writeFormat.nonEmpty) {
val unsupportedWriteFormatRows = writeFormat.map { format =>
val data = ListBuffer(
reformatCSVFunc(appId) -> headersAndSizes(APP_ID_STR),
reformatCSVFunc("Write") -> headersAndSizes(UNSUPPORTED_TYPE),
reformatCSVFunc(format) -> headersAndSizes(DETAILS),
reformatCSVFunc("") -> headersAndSizes(NOTES)
)
constructOutputRow(data, delimiter, prettyPrint)
}
unsupportedOperatorsOutputRows ++= unsupportedWriteFormatRows
}
unsupportedOperatorsOutputRows
}
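
As an illustration, for an app whose only findings are the unsupported read
format "JDBC[*]" and the unsupported exec "CollectLimit" (values borrowed from
the expectation files below, app ID illustrative), and assuming
reformatCSVString double-quotes each field, the emitted rows would be:

  "app-20211019113801-0001","Read","JDBC","Types not supported - *"
  "app-20211019113801-0001","Exec","CollectLimit",""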

def getAllExecsFromPlan(plans: Seq[PlanInfo]): Set[ExecInfo] = {
val topExecInfo = plans.flatMap(_.execInfo)
topExecInfo.flatMap { e =>

@@ -88,6 +88,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
}
qWriter.writeExecReport(allAppsSum, order)
qWriter.writeStageReport(allAppsSum, order)
qWriter.writeUnsupportedOperatorsCSVReport(allAppsSum, order)
if (mlOpsEnabled) {
if (allAppsSum.exists(x => x.mlFunctions.nonEmpty)) {
qWriter.writeMlFuncsReports(allAppsSum, order)

@@ -439,7 +439,7 @@ class QualificationAppInfo(
val supportedSQLTaskDuration = calculateSQLSupportedTaskDuration(allStagesSummary)
val taskSpeedupFactor = calculateSpeedupFactor(allStagesSummary)
// Get all the unsupported Execs from the plan
val unSupportedExecs = origPlanInfos.flatMap { p =>
val unSupportedExecs = planInfos.flatMap { p =>
// WholeStageCodeGen is excluded from the result.
val topLevelExecs = p.execInfo.filterNot(_.isSupported).filterNot(
x => x.exec.startsWith("WholeStage"))
@@ -454,6 +454,19 @@
_.unsupportedExprs)).flatten.filter(_.nonEmpty).toSet.mkString(";")
.trim.replaceAll("\n", "").replace(",", ":")

// Get all unsupported execs and expressions from the plan in form of map[exec -> exprs]
val unsupportedExecExprsMap = planInfos.flatMap { p =>
val topLevelExecs = p.execInfo.filterNot(_.isSupported).filterNot(
x => x.exec.startsWith("WholeStage"))
val childrenExecs = p.execInfo.flatMap { e =>
e.children.map(x => x.filterNot(_.isSupported))
}.flatten
val execs = topLevelExecs ++ childrenExecs
val exprs = execs.filter(_.unsupportedExprs.nonEmpty).map(
e => e.exec -> e.unsupportedExprs.mkString(";")).toMap
exprs
}.toMap
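
A sketch of the resulting shape (exec and expression names illustrative):

  val unsupportedExecExprsMap: Map[String, String] =
    Map("Project" -> "hex", "Filter" -> "array_contains;hex")

Since the per-plan pairs are flattened with toMap, an exec name that appears in
several SQL plans keeps only the last plan's expression list.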

// check if there are any SparkML/XGBoost functions or expressions if the mlOpsEnabled
// config is true
val mlFunctions = if (mlOpsEnabled) {
@@ -494,7 +507,7 @@ class QualificationAppInfo(
taskSpeedupFactor, info.sparkUser, info.startTime, origPlanInfos,
perSqlStageSummary.map(_.stageSum).flatten, estimatedInfo, perSqlInfos,
unSupportedExecs, unSupportedExprs, clusterTags, allClusterTagsMap, mlFunctions,
mlTotalStageDuration)
mlTotalStageDuration, unsupportedExecExprsMap)
}
}

@@ -708,6 +721,7 @@ case class QualificationSummaryInfo(
allClusterTagsMap: Map[String, String],
mlFunctions: Option[Seq[MLFunctions]],
mlFunctionsStageDurations: Option[Seq[MLFuncsStageDuration]],
unsupportedExecstoExprsMap: Map[String, String],
estimatedFrequency: Option[Long] = None)

case class StageQualSummaryInfo(
@@ -1,2 +1,2 @@
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
"Spark shell","local-1626104300434","Not Recommended",1.01,129484.66,1619.33,2429,1469,131104,2429,88.35,"","","","struct<firstname:string,middlename:array<string>,lastname:string>;struct<current:struct<state:string,city:string>,previous:struct<state:map<string,string>,city:string>>;array<struct<city:string,state:string>>;map<string,string>;map<string,array<string>>;map<string,map<string,string>>;array<array<string>>;array<string>","struct<firstname:string,middlename:array<string>,lastname:string>;struct<current:struct<state:string,city:string>,previous:struct<state:map<string,string>,city:string>>;array<struct<city:string,state:string>>;map<string,array<string>>;map<string,map<string,string>>;array<array<string>>","NESTED COMPLEX TYPE",1260,128847,0,1469,3.0,false,"CollectLimit;ColumnarToRow","",30
"Spark shell","local-1626104300434","Not Recommended",1.01,129484.66,1619.33,2429,1469,131104,2429,88.35,"","","","struct<firstname:string,middlename:array<string>,lastname:string>;struct<current:struct<state:string,city:string>,previous:struct<state:map<string,string>,city:string>>;array<struct<city:string,state:string>>;map<string,string>;map<string,array<string>>;map<string,map<string,string>>;array<array<string>>;array<string>","struct<firstname:string,middlename:array<string>,lastname:string>;struct<current:struct<state:string,city:string>,previous:struct<state:map<string,string>,city:string>>;array<struct<city:string,state:string>>;map<string,array<string>>;map<string,map<string,string>>;array<array<string>>","NESTED COMPLEX TYPE",1260,128847,0,1469,3.0,false,"CollectLimit","",30
@@ -1,2 +1,2 @@
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569385.42,2581.57,3627,19894,571967,3503,28.41,"","JDBC[*]","","","","",1812,544575,677,19217,3.8,false,"Scan JDBCRelation(TBLS) [numPartitions=1] ;Execute CreateViewCommand;CollectLimit","",30
"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569385.42,2581.57,3627,19894,571967,3503,28.41,"","JDBC[*]","","","","",1812,544575,677,19217,3.8,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30
@@ -1,2 +1,2 @@
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
"TPC-DS Like Bench q86","app-20210319163812-1778","Not Applicable",1.36,19120.15,7050.84,9569,4320658,26171,9569,0.0,"24","","","","","",9565,3595714,0,4320658,3.8,false,"Execute CreateViewCommand;ColumnarToRow","",30
"TPC-DS Like Bench q86","app-20210319163812-1778","Not Applicable",1.36,19120.15,7050.84,9569,4320658,26171,9569,0.0,"24","","","","","",9565,3595714,0,4320658,3.8,false,"Execute CreateViewCommand","",30
