Use a stub to store Spark StageInfo #1525
Conversation
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]> Fixes NVIDIA#1524. This commit uses a smaller class StageInfoStub to store Spark's StageInfo. Spark's StageInfo is common to all the Spark implementations, but its constructor takes more fields across different versions. Currently we only use a subset of the class fields; the remaining fields represent overhead or redundant storage, especially when it comes to storing the accumulables and taskMetrics for each stage. To evaluate the memory optimization, a new Checkpoint mechanism was added to allow gathering information at separate stages of the execution. The checkpoint design and implementation can be further improved and extended to build a performance profile to compare different tradeoffs.
Thanks @amahussein. A minor typo. This framework for logging memory usage looks great.
core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/DevRuntimeCheckpoint.scala
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
Thanks @parthosa
core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/DevRuntimeCheckpoint.scala
Thanks @amahussein. LGTM.
-  private def initStageInfo(newStageInfo: StageInfo): StageInfo = {
-    newStageInfo
+  private def initStageInfo(newStageInfo: StageInfo): StageInfoStub = {
+    StageInfoStub.fromStageInfo(newStageInfo)
@amahussein Currently we are reassigning the StageInfo object and updating the StageModel class with the incoming StageInfo object.
Now that we are using a stub and creating a new object, could we not reuse the existing stub object when updating the StageModel and just update its variables? Currently we are doing a new stub allocation in all cases.
Hmm, yes, it is possible to update only the fields that change.
The idea is that:
- This update happens only once, when the stage completes, so it is not a frequent event.
- Updating individual fields could lead to bugs. When we extend this object in the future, developers will have to make sure they handle the fields correctly (which ones may be updated vs. which ones may not).
- Allocating a new object in this case makes the code easier to read and maintain going forward.
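For illustration only, a minimal sketch of the two options discussed in this thread, with assumed class and field names (not the PR's actual code):

// Option chosen in the PR: an immutable stub; a fresh instance is created on completion.
case class ImmutableStageStub(stageId: Int, name: String, completionTime: Option[Long])

// Alternative raised in the review: keep one mutable stub and update only the
// fields that change. Fewer allocations, but updatable vs. fixed fields must be tracked.
class MutableStageStub(val stageId: Int, val name: String) {
  var completionTime: Option[Long] = None
}

object StubTradeoffDemo extends App {
  val beforeCompletion = ImmutableStageStub(1, "stage-1", None)
  val afterCompletion = beforeCompletion.copy(completionTime = Some(1234L)) // fresh allocation
  val reused = new MutableStageStub(1, "stage-1")
  reused.completionTime = Some(1234L) // in-place update, no new allocation
  println(s"$afterCompletion vs ${reused.completionTime}")
}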
Thanks @sayedbilalbari
Thanks @amahussein! A very minor nit.
  } else { // loads the noOp implementation by default
    new NoOpRuntimeCheckpoint
  }
}
nit: add a new line between the two defs
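For context, a hedged, self-contained sketch of the selection logic around this snippet; the class names come from the PR, while the method name and the property lookup are assumptions:

trait RuntimeCheckpointTrait {
  def insertMemoryMarker(label: String): Unit
}

class DevRuntimeCheckpoint extends RuntimeCheckpointTrait {
  // Prints the currently used heap so markers can be compared across a run.
  override def insertMemoryMarker(label: String): Unit = {
    val rt = Runtime.getRuntime
    println(s"[$label] used heap bytes: ${rt.totalMemory() - rt.freeMemory()}")
  }
}

class NoOpRuntimeCheckpoint extends RuntimeCheckpointTrait {
  override def insertMemoryMarker(label: String): Unit = () // disabled by default
}

object RuntimeInjectorSketch {
  // The PR reads the flag from build.properties; a system property of the
  // same name is assumed here purely to keep the sketch self-contained.
  private def loadCheckpointImpl(): RuntimeCheckpointTrait = {
    if (sys.props.getOrElse("benchmarks.checkpoints", "").equalsIgnoreCase("dev")) {
      new DevRuntimeCheckpoint
    } else { // loads the noOp implementation by default
      new NoOpRuntimeCheckpoint
    }
  }

  val checkpoint: RuntimeCheckpointTrait = loadCheckpointImpl()
}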
Signed-off-by: Ahmed Hussein (amahussein) [email protected]
Fixes #1524
This commit uses a smaller class StageInfoStub to store Spark's StageInfo. Spark's StageInfo is common to all the Spark implementations, but its constructor takes more fields across different versions. Currently we only use a subset of the class fields; the remaining fields represent overhead or redundant storage, especially when it comes to storing the accumulables and taskMetrics for each stage.
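Below is a hedged sketch of what such a stub can look like. It is not the PR's actual class: the field set is an assumption drawn from Spark's StageInfo, and a spark-core dependency is assumed for the import.

import org.apache.spark.scheduler.StageInfo

case class StageInfoStubSketch(
    stageId: Int,
    attemptNumber: Int,
    name: String,
    numTasks: Int,
    submissionTime: Option[Long],
    completionTime: Option[Long],
    failureReason: Option[String])

object StageInfoStubSketch {
  // Copies only the lightweight fields; accumulables and taskMetrics are
  // intentionally dropped, which is where the memory saving comes from.
  def fromStageInfo(si: StageInfo): StageInfoStubSketch =
    StageInfoStubSketch(
      si.stageId,
      si.attemptNumber(),
      si.name,
      si.numTasks,
      si.submissionTime,
      si.completionTime,
      si.failureReason)
}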
To evaluate the memory optimization, a new Checkpoint mechanism was added to allow gathering information at separate stages of the execution. The checkpoint design and implementation can be further improved and extended to build a performance profile to compare different tradeoffs.
This pull request introduces a new runtime checkpointing feature for performance benchmarking and debugging in the RAPIDS tools. The main changes include adding new classes for runtime checkpoints, updating build properties, and modifying existing classes to integrate the new checkpointing functionality.
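As an illustration of the checkpoint idea, a minimal sketch of how markers might bracket a processing phase; the trait name, method name, and labels here are assumptions rather than the PR's code:

trait CheckpointMarker { def insertMemoryMarker(label: String): Unit }

object EventProcessingSketch {
  // Markers bracket a phase so before/after memory can be compared.
  def processEvents(checkpoint: CheckpointMarker, events: Seq[String]): Unit = {
    checkpoint.insertMemoryMarker("before processing events")
    events.foreach(_ => ()) // placeholder for the real per-event handling
    checkpoint.insertMemoryMarker("after processing events")
  }
}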
Memory evaluation: the checkpoints are enabled by setting benchmarks.checkpoints to dev. This is achieved by changing the pom.xml file or by passing the property as an argument to the mvn command.
Code Changes
Main core changes
- core/src/main/scala/org/apache/spark/sql/rapids/tool/util/stubs/StageInfoStub.scala: Added a new class StageInfoStub to provide a consistent interface for StageInfo across different Spark versions.
- core/src/main/scala/org/apache/spark/sql/rapids/tool/store/StageModel.scala: Updated StageModel to use StageInfoStub for compatibility with different Spark versions.
Build and Configuration Updates:
- core/pom.xml: Added a new property benchmarks.checkpoints to manage the checkpointing feature.
- core/src/main/resources/configs/build.properties: Updated build properties to include benchmarks.checkpoints (a sketch of reading this property at runtime follows this list).
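For illustration, a hedged sketch of how the benchmarks.checkpoints value could be read from the bundled build.properties at runtime; the resource path and helper name are assumptions, not the PR's code:

import java.util.Properties

object BuildPropsSketch {
  // Loads /configs/build.properties from the classpath and returns the flag,
  // defaulting to an empty string when the resource or the key is missing.
  def checkpointsFlag(): String = {
    val props = new Properties()
    val in = getClass.getResourceAsStream("/configs/build.properties")
    if (in != null) {
      try props.load(in) finally in.close()
    }
    props.getProperty("benchmarks.checkpoints", "") // e.g. "dev" enables the Dev checkpoint
  }
}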
Runtime Checkpointing Feature:
- core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/DevRuntimeCheckpoint.scala: Added a new class DevRuntimeCheckpoint to insert memory markers and print memory information during runtime for performance metrics.
- core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/NoOpRuntimeCheckpoint.scala: Added a new class NoOpRuntimeCheckpoint as a default no-operation implementation for checkpoints.
- core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/RuntimeCheckpointTrait.scala: Introduced a new trait RuntimeCheckpointTrait defining the API for inserting runtime checkpoints.
- core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/RuntimeInjector.scala: Added a new object RuntimeInjector to manage and insert runtime checkpoints based on build properties.
Integration of Checkpoints:
- core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala: Integrated RuntimeInjector to insert a memory marker after processing events.
- core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala: Added a method getJVMHeapInfo to retrieve JVM heap information (a sketch of such a helper follows this list).
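To make the last item concrete, a hedged sketch of a getJVMHeapInfo-style helper; the actual return type and keys used by the PR may differ:

object RuntimeUtilSketch {
  // Snapshots the JVM heap counters; "used" is derived from total minus free.
  def getJVMHeapInfo(): Map[String, Long] = {
    val rt = Runtime.getRuntime
    Map(
      "jvm.heap.max" -> rt.maxMemory(),
      "jvm.heap.total" -> rt.totalMemory(),
      "jvm.heap.free" -> rt.freeMemory(),
      "jvm.heap.used" -> (rt.totalMemory() - rt.freeMemory()))
  }
}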