From b63fe212f7550673cee61353e9e5cb061ad4c53d Mon Sep 17 00:00:00 2001
From: "Ahmed Hussein (amahussein)"
Date: Tue, 4 Feb 2025 10:07:19 -0600
Subject: [PATCH 1/2] Use a stub to store Spark StageInfo
Signed-off-by: Ahmed Hussein (amahussein)
Fixes #1524
This commit uses a smaller class `StageInfoStub` to store Spark's
StageInfo. StageInfo is common to all the Spark implementations, but
its constructor signature differs across versions.
Currently we only use a subset of the class fields. The remaining fields
are overhead or redundant storage, especially when it comes to
storing the accumulables and taskMetrics for each stage.
To evaluate the memory optimization, a new `Checkpoint` mechanism was
added to allow gathering information at separate stages of the
execution.
The `checkpoint` design and implementation can be further improved and
extended to build a performance profile to compare different tradeoffs.
---
core/pom.xml | 1 +
.../main/resources/configs/build.properties | 3 +-
.../benchmarks/DevRuntimeCheckpoint.scala | 37 +++++++++++++
.../benchmarks/NoOpRuntimeCheckpoint.scala | 29 ++++++++++
.../benchmarks/RuntimeCheckpointTrait.scala | 29 ++++++++++
.../tool/benchmarks/RuntimeInjector.scala | 44 +++++++++++++++
.../spark/sql/rapids/tool/AppBase.scala | 2 +
.../sql/rapids/tool/store/StageModel.scala | 9 ++--
.../sql/rapids/tool/util/RuntimeUtil.scala | 14 ++++-
.../tool/util/stubs/StageInfoStub.scala | 53 +++++++++++++++++++
10 files changed, 215 insertions(+), 6 deletions(-)
create mode 100644 core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/DevRuntimeCheckpoint.scala
create mode 100644 core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/NoOpRuntimeCheckpoint.scala
create mode 100644 core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/RuntimeCheckpointTrait.scala
create mode 100644 core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/RuntimeInjector.scala
create mode 100644 core/src/main/scala/org/apache/spark/sql/rapids/tool/util/stubs/StageInfoStub.scala
diff --git a/core/pom.xml b/core/pom.xml
index 77157830c..071142bf2 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -417,6 +417,7 @@
3.9.0
-Xlint:all,-serial,-path,-try
com.nvidia.shaded.spark
+ <benchmarks.checkpoints>noOp</benchmarks.checkpoints>
1.16.1
1.0.1
diff --git a/core/src/main/resources/configs/build.properties b/core/src/main/resources/configs/build.properties
index 260cfed6e..d19852da4 100644
--- a/core/src/main/resources/configs/build.properties
+++ b/core/src/main/resources/configs/build.properties
@@ -1,5 +1,5 @@
#
-# Copyright (c) 2024, NVIDIA CORPORATION.
+# Copyright (c) 2024-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -23,3 +23,4 @@ build.spark.version=${spark.version}
build.hadoop.version=${hadoop.version}
build.java.version=${java.version}
build.scala.version=${scala.version}
+build.benchmarks.checkpoints=${benchmarks.checkpoints}
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/DevRuntimeCheckpoint.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/DevRuntimeCheckpoint.scala
new file mode 100644
index 000000000..3787b4220
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/DevRuntimeCheckpoint.scala
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2025, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rapids.tool.benchmarks
+
+import org.apache.spark.sql.rapids.tool.util.RuntimeUtil
+
+/**
+ * A simple implementation to insert checkpoints during runtime to pull some performance metrics
+ * related to Tools. It is disabled by default; RuntimeInjector selects it when the build
+ * property `build.benchmarks.checkpoints` contains "dev".
+ */
+class DevRuntimeCheckpoint extends RuntimeCheckpointTrait {
+ /**
+ * Requests a GC, reads the JVM heap sizes, and prints them (one per line) after the label.
+ * @param label the label for the memory marker
+ */
+ override def insertMemoryMarker(label: String): Unit = {
+ val memoryInfo = RuntimeUtil.getJVMHeapInfo(runGC = true)
+ // scalastyle:off println
+ println(s"Memory Marker: $label, ${memoryInfo.mkString("\n")}")
+ // scalastyle:oon println
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/NoOpRuntimeCheckpoint.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/NoOpRuntimeCheckpoint.scala
new file mode 100644
index 000000000..8bd4d33da
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/NoOpRuntimeCheckpoint.scala
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2025, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rapids.tool.benchmarks
+
+import scala.annotation.nowarn
+
+/**
+ * Checkpoint implementation whose markers do nothing at all.
+ * This is the default implementation that will be used in production and normal builds.
+ */
+class NoOpRuntimeCheckpoint extends RuntimeCheckpointTrait {
+  // Nothing is measured or printed here: the label is ignored on purpose, and
+  // `@nowarn` keeps the unused parameter from tripping compiler warnings.
+  override def insertMemoryMarker(@nowarn label: String): Unit = ()
+}
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/RuntimeCheckpointTrait.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/RuntimeCheckpointTrait.scala
new file mode 100644
index 000000000..136f676bb
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/RuntimeCheckpointTrait.scala
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2025, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rapids.tool.benchmarks
+
+/**
+ * API for inserting checkpoints in runtime, used for debugging and benchmarking purposes.
+ * Implemented by DevRuntimeCheckpoint, NoOpRuntimeCheckpoint and the RuntimeInjector object.
+ */
+trait RuntimeCheckpointTrait {
+ /**
+ * Insert a memory marker with the given label. What a marker does is up to the implementation.
+ * @param label the label for the memory marker
+ */
+ def insertMemoryMarker(label: String): Unit
+}
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/RuntimeInjector.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/RuntimeInjector.scala
new file mode 100644
index 000000000..33d4dd329
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/RuntimeInjector.scala
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2025, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rapids.tool.benchmarks
+
+import org.apache.spark.sql.rapids.tool.util.RapidsToolsConfUtil
+
+/**
+ * The global runtime injector that will be used to insert checkpoints during runtime.
+ * This is used to pull some performance metrics related to Tools.
+ */
+object RuntimeInjector extends RuntimeCheckpointTrait {
+  /**
+   * Picks the checkpoint implementation from the build property "build.benchmarks.checkpoints".
+   * @return DevRuntimeCheckpoint if the value contains "dev"; NoOpRuntimeCheckpoint otherwise
+   */
+  private def loadRuntimeCheckPoint(): RuntimeCheckpointTrait = {
+    val buildProps = RapidsToolsConfUtil.loadBuildProperties
+    // getProperty returns null for an absent key; Option(...) turns that into the noOp default.
+    val mode = Option(buildProps.getProperty("build.benchmarks.checkpoints"))
+    if (mode.exists(_.contains("dev"))) { // The benchmark injection is enabled.
+      new DevRuntimeCheckpoint
+    } else { // loads the noOp implementation by default
+      new NoOpRuntimeCheckpoint
+    }
+  }
+  private lazy val runtimeCheckpoint: RuntimeCheckpointTrait = loadRuntimeCheckPoint()
+
+  override def insertMemoryMarker(label: String): Unit =
+    runtimeCheckpoint.insertMemoryMarker(label)
+}
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
index 432f3b1c9..bcc4d22ac 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.deploy.history.{EventLogFileReader, EventLogFileWriter}
import org.apache.spark.internal.Logging
+import org.apache.spark.rapids.tool.benchmarks.RuntimeInjector
import org.apache.spark.scheduler.{SparkListenerEvent, StageInfo}
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
@@ -492,6 +493,7 @@ abstract class AppBase(
def processEvents(): Unit = {
processEventsInternal()
postCompletion()
+ RuntimeInjector.insertMemoryMarker("Post processing events")
}
/**
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/StageModel.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/StageModel.scala
index 724585702..79a8d0813 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/StageModel.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/StageModel.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import com.nvidia.spark.rapids.tool.profiling.ProfileUtils
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.sql.rapids.tool.annotation.{Calculated, Since, WallClock}
+import org.apache.spark.sql.rapids.tool.util.stubs.StageInfoStub
/**
* StageModel is a class to store the information of a stage.
@@ -31,7 +32,7 @@ import org.apache.spark.sql.rapids.tool.annotation.{Calculated, Since, WallClock
@Since("24.02.3")
class StageModel private(sInfo: StageInfo) {
- var stageInfo: StageInfo = _
+ var stageInfo: StageInfoStub = _
updateInfo(sInfo)
/**
@@ -39,8 +40,8 @@ class StageModel private(sInfo: StageInfo) {
* @return a new StageInfo object.
* TODO: https://github.com/NVIDIA/spark-rapids-tools/issues/1260
*/
- private def initStageInfo(newStageInfo: StageInfo): StageInfo = {
- newStageInfo
+ private def initStageInfo(newStageInfo: StageInfo): StageInfoStub = {
+ StageInfoStub.fromStageInfo(newStageInfo)
}
@WallClock
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala
index a5babf35d..f8522aa0b 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -103,4 +103,16 @@ object RuntimeUtil extends Logging {
}
}.toMap
}
+
+  /** Returns the max/total/free heap sizes reported by `Runtime`, keyed by "jvm.heap.*". */
+  def getJVMHeapInfo(runGC: Boolean = true): Map[String, String] = {
+    // System.gc() is only a request; the JVM may ignore it.
+    if (runGC) System.gc()
+    val jvm = Runtime.getRuntime
+    Seq(
+      "jvm.heap.max" -> jvm.maxMemory(),
+      "jvm.heap.total" -> jvm.totalMemory(),
+      "jvm.heap.free" -> jvm.freeMemory()
+    ).map { case (key, bytes) => key -> bytes.toString }.toMap
+  }
}
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/stubs/StageInfoStub.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/stubs/StageInfoStub.scala
new file mode 100644
index 000000000..9d7277b1d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/stubs/StageInfoStub.scala
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2025, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.rapids.tool.util.stubs
+
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.sql.rapids.tool.annotation.ToolsReflection
+
+@ToolsReflection("Common",
+ "StageInfo is a common class used in all versions of Spark but the constructor signature is" +
+ " different across versions.")
+case class StageInfoStub( // slim copy of Spark's StageInfo; built via StageInfoStub.fromStageInfo
+ stageId: Int,
+ attemptId: Int, // filled from StageInfo.attemptNumber()
+ name: String,
+ numTasks: Int,
+ details: String,
+ /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
+ submissionTime: Option[Long] = None,
+ /** Time when the stage completed or when the stage was cancelled. */
+ completionTime: Option[Long] = None,
+ /** If the stage failed, the reason why. */
+ failureReason: Option[String] = None) {
+ /** Returns attemptId under the accessor name that StageInfo exposes. */
+ def attemptNumber(): Int = attemptId
+}
+
+object StageInfoStub {
+  /** Builds a stub holding only the StageInfo fields declared on the case class. */
+  def fromStageInfo(stageInfo: StageInfo): StageInfoStub =
+    new StageInfoStub(
+      stageId = stageInfo.stageId,
+      attemptId = stageInfo.attemptNumber(),
+      name = stageInfo.name,
+      numTasks = stageInfo.numTasks,
+      details = stageInfo.details,
+      submissionTime = stageInfo.submissionTime,
+      completionTime = stageInfo.completionTime,
+      failureReason = stageInfo.failureReason)
+}
From 8f2122ae977c1cd91f1ccc5bd2045a28dbab93a1 Mon Sep 17 00:00:00 2001
From: "Ahmed Hussein (amahussein)"
Date: Wed, 5 Feb 2025 09:03:48 -0600
Subject: [PATCH 2/2] Fix typo in DevRuntimeCheckpoint
Signed-off-by: Ahmed Hussein (amahussein)
---
.../spark/rapids/tool/benchmarks/DevRuntimeCheckpoint.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/DevRuntimeCheckpoint.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/DevRuntimeCheckpoint.scala
index 3787b4220..01c89a25d 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/DevRuntimeCheckpoint.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/DevRuntimeCheckpoint.scala
@@ -32,6 +32,6 @@ class DevRuntimeCheckpoint extends RuntimeCheckpointTrait {
val memoryInfo = RuntimeUtil.getJVMHeapInfo(runGC = true)
// scalastyle:off println
println(s"Memory Marker: $label, ${memoryInfo.mkString("\n")}")
- // scalastyle:oon println
+ // scalastyle:on println
}
}