
(WIP) Update Spark Optimization Practices for latest releases #29

Open

secretazianman opened this issue Jun 11, 2024 · 1 comment

@secretazianman

A few notable changes:

Managed Scaling

  • As of EMR 6.11.0 (Hadoop 3.3.3), EMR scale-down is no longer Spark shuffle- or cache-aware with default settings
    • Set yarn.resourcemanager.decommissioning-nodes-watcher.wait-for-applications = true to restore the old behavior
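
A minimal launch-time sketch, assuming boto3; the cluster name, instance sizing, and IAM roles are placeholders, and only the yarn-site override comes from the note above:

```python
import boto3

emr = boto3.client("emr")

# Placeholder cluster definition; the yarn-site Configuration entry is the point.
response = emr.run_job_flow(
    Name="spark-cluster",                      # hypothetical name
    ReleaseLabel="emr-6.11.0",
    Applications=[{"Name": "Spark"}],
    Configurations=[
        {
            "Classification": "yarn-site",
            "Properties": {
                # Wait for running applications before decommissioning nodes
                "yarn.resourcemanager.decommissioning-nodes-watcher.wait-for-applications": "true"
            },
        }
    ],
    Instances={
        "MasterInstanceType": "m5.xlarge",     # placeholder sizing
        "SlaveInstanceType": "m5.xlarge",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)
print(response["JobFlowId"])
```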

Spot Instances

  • Spot Instances + Spark Caching + Intermediate Tables
    • A lost RDD block results in the entire cache being recomputed, which can be expensive. Without caching, lost Spark data can be recomputed incrementally, so it may be better not to cache
    • Consider using On-Demand Instances only if caching is necessary
    • Consider storing intermediate tables in HDFS or S3 instead (see the sketch after this list); weigh the cost of reading/writing/storing intermediate data against re-computing it
    • Caching forces Spark evaluation at that point in time, potentially losing improvements from the SQL optimizer that occur when the DAG is analyzed in its entirety
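
A minimal PySpark sketch of the intermediate-table approach; the S3 paths and the aggregation are hypothetical placeholders:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("intermediate-to-s3").getOrCreate()

events = spark.read.orc("s3://my-bucket/input/events/")   # hypothetical input

# Expensive intermediate result we would otherwise .cache()
intermediate = events.groupBy("user_id").count()

# Write once, then read back: a lost Spot node costs a re-read, not a recompute
intermediate.write.mode("overwrite").orc("s3://my-bucket/tmp/intermediate/")
intermediate = spark.read.orc("s3://my-bucket/tmp/intermediate/")

# Downstream stages read from S3 instead of replaying the full lineage
result = intermediate.filter("`count` > 10")
result.write.mode("overwrite").orc("s3://my-bucket/output/")
```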

AQE

  • Since EMR 6.6/Spark 3.2, default settings force AQE to run in a legacy mode, "to avoid performance regression when enabling adaptive query execution"

    • Enable AQE by setting spark.sql.adaptive.coalescePartitions.parallelismFirst = false (see the configuration sketch after this list)
    • The query plan should then show "AQEShuffleRead coalesced" when it is working
  • Optimizing AQE

    • Set spark.sql.adaptive.coalescePartitions.initialPartitionNum to a large number, such as 10x what you might set spark.sql.shuffle.partitions to. This gives AQE initial partitions small enough to optimize against the advisoryPartitionSizeInBytes setting.
    • Set spark.sql.adaptive.advisoryPartitionSizeInBytes by analyzing the resulting task memory pressure on the executor; consider increasing the value if memory is underutilized
    • Optimization Example
      • Environment Setup
        • Instance Choice: r6.4xlarge
        • Core Units: 64 units
        • Task Units: 500 units
        • Spark Executor Memory: 32GB
        • Spark Executor Cores: 5
        • spark.sql.adaptive.coalescePartitions.initialPartitionNum: 100,000
        • Dataset: S3 - 523GB - 2,700 files - Orc+Snappy - 4,584,646,650 rows
        • Spark Query performs wide joins
    • Spark Shuffle = 10,000 and AQE Disabled
      • TODO
    • AQE Enabled and advisoryPartitionSizeInBytes=64MB
      • TODO
    • AQE Enabled and advisoryPartitionSizeInBytes=256MB
      • TODO
    • AQE Enabled and advisoryPartitionSizeInBytes=512MB
      • TODO
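
A minimal session-level sketch, assuming PySpark, wiring the settings above together; the dataset path is hypothetical and the numbers mirror the example environment rather than tuned values:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("aqe-tuning")
    .config("spark.sql.adaptive.enabled", "true")
    # Opt out of the legacy coalescing behavior (EMR 6.6+/Spark 3.2+ default)
    .config("spark.sql.adaptive.coalescePartitions.parallelismFirst", "false")
    # Start with many small partitions so AQE has room to coalesce
    .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "100000")
    # Target post-coalesce partition size; raise it if executor memory sits idle
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "256MB")
    .getOrCreate()
)

df = spark.read.orc("s3://my-bucket/orc-dataset/")        # hypothetical dataset
df.groupBy("key").count().explain()  # plan should show "AQEShuffleRead coalesced"
```
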
@mattliemAWS
Contributor

Thanks! Your notes around Spark + Spot are super useful. Let me figure out a way to incorporate some of these recommendations.
