Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle invalid cluster recommendation for Dataproc #1537

Draft
wants to merge 2 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,10 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
*
* @param sparkProperties A map of Spark properties (combined from application and
* cluster properties)
* @return Optional `RecommendedClusterInfo` containing the GPU cluster configuration
* recommendation.
* @return Either a failure message or the recommended cluster configuration
*/
def createRecommendedGpuClusterInfo(sparkProperties: Map[String, String]): Unit = {
def createRecommendedGpuClusterInfo(
sparkProperties: Map[String, String]): Either[String, RecommendedClusterInfo] = {
// Get the appropriate cluster configuration strategy (either
// 'ClusterPropertyBasedStrategy' based on cluster properties or
// 'EventLogBasedStrategy' based on the event log).
Expand All @@ -485,8 +485,7 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
}

val dynamicAllocSettings = Platform.getDynamicAllocationSettings(sparkProperties)
recommendedNodeInstanceInfo = Some(recommendedNodeInstance)
recommendedClusterInfo = Some(RecommendedClusterInfo(
val recommendedCluster = RecommendedClusterInfo(
vendor = vendor,
coresPerExecutor = clusterConfig.coresPerExec,
numWorkerNodes = numWorkerNodes,
Expand All @@ -498,21 +497,36 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
dynamicAllocationMinExecutors = dynamicAllocSettings.min,
dynamicAllocationInitialExecutors = dynamicAllocSettings.initial,
workerNodeType = Some(recommendedNodeInstance.name)
))
)

validateRecommendedCluster(recommendedCluster).map { validCluster =>
recommendedNodeInstanceInfo = Some(recommendedNodeInstance)
recommendedClusterInfo = Some(validCluster)
validCluster
}

case None =>
logWarning("Failed to generate a cluster recommendation. " +
"Could not determine number of executors. " +
Left("Could not determine number of executors. " +
"Check the Spark properties used for this application or " +
"cluster properties (if provided).")
}

case None =>
logWarning("Failed to generate a cluster recommendation. " +
"Could not determine number of executors. " +
Left("Could not determine number of executors. " +
"Cluster properties are missing and event log does not contain cluster information.")
}
}

/**
 * Hook for platform-specific validation of a recommended cluster configuration.
 * The base implementation accepts every recommendation unconditionally; a
 * subclass overrides this to reject configurations its platform cannot support.
 *
 * @param recommendedClusterInfo Recommended cluster configuration
 * @return Either a failure message or the valid recommended cluster configuration
 */
protected def validateRecommendedCluster(
    recommendedClusterInfo: RecommendedClusterInfo): Either[String, RecommendedClusterInfo] =
  Right(recommendedClusterInfo)
}

abstract class DatabricksPlatform(gpuDevice: Option[GpuDevice],
Expand Down Expand Up @@ -592,10 +606,21 @@ class DataprocPlatform(gpuDevice: Option[GpuDevice],
override val defaultGpuDevice: GpuDevice = T4Gpu
override def isPlatformCSP: Boolean = true
override def maxGpusSupported: Int = 4
private val minWorkerNodes = 2

// Dataproc instance lookup table; keys appear to be (gpu count, core count)
// pairs per the DATAPROC_BY_GPUS_CORES naming -- confirm against the
// PlatformInstanceTypes definition.
override def getInstanceByResourcesMap: Map[(Int, Int), InstanceInfo] =
  PlatformInstanceTypes.DATAPROC_BY_GPUS_CORES

/**
 * Dataproc-specific validation: a recommendation is rejected with an
 * explanatory message unless it requests at least `minWorkerNodes` workers.
 *
 * @param recommendedClusterInfo Recommended cluster configuration
 * @return Either a failure message or the valid recommended cluster configuration
 */
override def validateRecommendedCluster(
    recommendedClusterInfo: RecommendedClusterInfo): Either[String, RecommendedClusterInfo] = {
  val requestedWorkers = recommendedClusterInfo.numWorkerNodes
  Either.cond(
    requestedWorkers >= minWorkerNodes,
    recommendedClusterInfo,
    s"Requested number of worker nodes ($requestedWorkers) " +
      s"is less than the minimum required ($minWorkerNodes) by the platform.")
}
}

class DataprocServerlessPlatform(gpuDevice: Option[GpuDevice],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,12 +379,16 @@ class AutoTuner(
* Returns None if the platform doesn't support specific instance types.
*/
private def configureGPURecommendedInstanceType(): Unit = {
platform.createRecommendedGpuClusterInfo(getAllProperties.toMap)
platform.recommendedClusterInfo.foreach { gpuClusterRec =>
appendRecommendation("spark.executor.cores", gpuClusterRec.coresPerExecutor)
if (gpuClusterRec.numExecutors > 0) {
appendRecommendation("spark.executor.instances", gpuClusterRec.numExecutors)
}
platform.createRecommendedGpuClusterInfo(getAllProperties.toMap) match {
case Right(gpuClusterRec) =>
appendRecommendation("spark.executor.cores", gpuClusterRec.coresPerExecutor)
if (gpuClusterRec.numExecutors > 0) {
appendRecommendation("spark.executor.instances", gpuClusterRec.numExecutors)
}
case Left(reason) =>
val errorMsg = "Failed to generate a cluster recommendation. Reason: " + reason
logWarning(errorMsg)
appendComment(errorMsg)
}
}

Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -1619,7 +1619,12 @@ class QualificationSuite extends BaseTestSuite {
expectedClusterInfoMap.foreach { case (eventlogPath, expectedClusterInfo) =>
test(s"test cluster information JSON - $eventlogPath") {
val logFile = s"$logDir/cluster_information/$eventlogPath"
runQualificationAndTestClusterInfo(logFile, PlatformNames.DEFAULT, expectedClusterInfo)
val actualClusterInfo =
runQualificationAndGetClusterSummary(logFile, PlatformNames.DEFAULT)
.flatMap(_.clusterInfo)
assert(actualClusterInfo == expectedClusterInfo,
s"Actual cluster info does not match the expected cluster info. " +
s"Expected: $expectedClusterInfo, Actual: $actualClusterInfo")
}
}

Expand Down Expand Up @@ -1688,16 +1693,22 @@ class QualificationSuite extends BaseTestSuite {

expectedPlatformClusterInfoMap.foreach { case (platform, expectedClusterInfo) =>
test(s"test cluster information JSON for platform - $platform ") {
val logFile = s"$logDir/cluster_information/platform/$platform"
runQualificationAndTestClusterInfo(logFile, platform, Some(expectedClusterInfo))
val logFile = s"$logDir/cluster_information/platform/valid/$platform"
val actualClusterInfo =
runQualificationAndGetClusterSummary(logFile, platform)
.flatMap(_.clusterInfo)
assert(actualClusterInfo.contains(expectedClusterInfo),
s"Actual cluster info does not match the expected cluster info. " +
s"Expected: $expectedClusterInfo, Actual: $actualClusterInfo")
}
}

/**
* Runs the qualification tool and verifies cluster information against expected values.
* Runs the qualification tool and returns the cluster summary.
*/
private def runQualificationAndTestClusterInfo(eventlogPath: String, platform: String,
expectedClusterInfo: Option[ExistingClusterInfo]): Unit = {
private def runQualificationAndGetClusterSummary(
eventlogPath: String, platform: String): Option[ClusterSummary] = {
var clusterSummary: Option[ClusterSummary] = None
TrampolineUtil.withTempDir { outPath =>
val baseArgs = Array("--output-directory", outPath.getAbsolutePath, "--platform", platform)
val appArgs = new QualificationArgs(baseArgs :+ eventlogPath)
Expand All @@ -1714,10 +1725,9 @@ class QualificationSuite extends BaseTestSuite {
// Read output JSON and create a set of (event log, cluster info)
val outputResultFile = s"$outPath/${QualOutputWriter.LOGFILE_NAME}/" +
s"${QualOutputWriter.LOGFILE_NAME}_cluster_information.json"
val actualClusterInfo = readJson(outputResultFile).headOption.flatMap(_.clusterInfo)
assert(actualClusterInfo == expectedClusterInfo,
"Actual cluster info does not match the expected cluster info.")
clusterSummary = readJson(outputResultFile).headOption
}
clusterSummary
}

test("test cluster information generation is disabled") {
Expand All @@ -1740,6 +1750,18 @@ class QualificationSuite extends BaseTestSuite {
}
}

// TODO: Extend this validation of the recommended cluster information to the
// remaining platforms, not just Dataproc.
test(s"test invalid recommended num workers for platform - dataproc") {
  // Event log whose worker count falls below the Dataproc minimum, so the
  // recommendation should be rejected and the summary left without one.
  val logFile = s"$logDir/cluster_information/platform/invalid/dataproc_invalid_num_workers.zstd"
  val clusterSummary = runQualificationAndGetClusterSummary(logFile, PlatformNames.DATAPROC)
  val actualRecommendedClusterInfo = clusterSummary.flatMap(_.recommendedClusterInfo)
  assert(actualRecommendedClusterInfo.isEmpty,
    "Recommended cluster info is expected to be empty. " +
      s"Actual: $actualRecommendedClusterInfo")
}

test("test status report generation for wildcard event log") {
val logFiles = Array(
s"$logDir/cluster_information/eventlog_3node*") // correct wildcard event log with 3 matches
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ class ProfilingAutoTunerSuite extends BaseAutoTunerSuite {
|- 'spark.sql.files.maxPartitionBytes' was not set.
|- 'spark.task.resource.gpu.amount' should be set to 0.001.
|- Could not infer the cluster configuration, recommendations are generated using default values!
|- Failed to generate a cluster recommendation. Reason: Could not determine number of executors. Cluster properties are missing and event log does not contain cluster information.
|- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")}
|- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")}
|""".stripMargin
Expand Down Expand Up @@ -187,6 +188,7 @@ class ProfilingAutoTunerSuite extends BaseAutoTunerSuite {
|- 'spark.sql.files.maxPartitionBytes' was not set.
|- 'spark.task.resource.gpu.amount' should be set to 0.001.
|- Could not infer the cluster configuration, recommendations are generated using default values!
|- Failed to generate a cluster recommendation. Reason: Could not determine number of executors. Cluster properties are missing and event log does not contain cluster information.
|- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")}
|- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")}
|""".stripMargin
Expand Down Expand Up @@ -241,6 +243,7 @@ We recommend using nodes/workers with more memory. Need at least 17496MB memory.
|- 'spark.sql.files.maxPartitionBytes' was not set.
|- 'spark.task.resource.gpu.amount' should be set to 0.001.
|- Could not infer the cluster configuration, recommendations are generated using default values!
|- Failed to generate a cluster recommendation. Reason: Could not determine number of executors. Cluster properties are missing and event log does not contain cluster information.
|- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.jars.missing")}
|- ${ProfilingAutoTunerConfigsProvider.classPathComments("rapids.shuffle.jars")}
|""".stripMargin
Expand Down