
[SPARK-47618][CORE] Use Magic Committer for all S3 buckets by default #45740

Draft
wants to merge 1 commit into master
Conversation

@dongjoon-hyun (Member) commented Mar 27, 2024

What changes were proposed in this pull request?

This PR aims to use Apache Hadoop Magic Committer for all S3 buckets by default in Apache Spark 4.0.0.
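Concretely (a sketch distilled from the diff discussed below; the real change also guards on the cloud committer module being present), the new default amounts to:

```scala
import org.apache.spark.SparkConf

// Sketch of the proposed default: turn on the S3A Magic Committer for
// every bucket via the "*" per-bucket wildcard key, unless the user has
// already configured it.
val conf = new SparkConf()
conf.setIfMissing("spark.hadoop.fs.s3a.bucket.*.committer.magic.enabled", "true")
```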

Why are the changes needed?

The Apache Hadoop Magic Committer has been used with S3 buckets to get the best performance since S3 became fully consistent on December 1st, 2020. As the AWS documentation puts it:

> Amazon S3 provides strong read-after-write consistency for PUT and DELETE requests of objects in your Amazon S3 bucket in all AWS Regions. This behavior applies to both writes to new objects as well as PUT requests that overwrite existing objects and DELETE requests. In addition, read operations on Amazon S3 Select, Amazon S3 access control lists (ACLs), Amazon S3 Object Tags, and object metadata (for example, the HEAD object) are strongly consistent.

Does this PR introduce any user-facing change?

Yes, the migration guide is updated.

How was this patch tested?

Pass the CIs.

Was this patch authored or co-authored using generative AI tooling?

No.

@dongjoon-hyun (Member, Author):

Hi, do you have any concerns about using the S3 Magic Committer by default, @steveloughran?

@dongjoon-hyun (Member, Author):

Also, cc @viirya

@viirya (Member) left a comment:

It looks good to me, unless there are additional concerns from @steveloughran.

@dongjoon-hyun (Member, Author):

Thank you, @viirya.

@dongjoon-hyun (Member, Author):

All tests passed.

```
@@ -420,6 +420,14 @@ class SparkContext(config: SparkConf) extends Logging {
    // HADOOP-19097 Set fs.s3a.connection.establish.timeout to 30s
    // We can remove this after Apache Hadoop 3.4.1 releases
    conf.setIfMissing("spark.hadoop.fs.s3a.connection.establish.timeout", "30s")
    try {
      // Try to enable Magic Committer by default for all buckets
      Utils.classForName("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
```
@yaooqinn (Member):

Use Utils.classIsLoadable?
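For context, a sketch of what the suggestion changes (as it would sit inside SparkContext, where conf and Utils are in scope; the throwing behavior of classForName is visible in the stack trace later in this thread): Utils.classForName throws ClassNotFoundException when the class is absent and therefore needs a try/catch, while Utils.classIsLoadable returns a Boolean and folds into a plain condition:

```scala
// Before: classForName throws if spark-hadoop-cloud is not on the classpath.
try {
  Utils.classForName("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  conf.setIfMissing("spark.hadoop.fs.s3a.bucket.*.committer.magic.enabled", "true")
} catch {
  case _: ClassNotFoundException => // module absent: leave the config alone
}

// After: classIsLoadable probes for the class without throwing.
if (Utils.classIsLoadable("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")) {
  conf.setIfMissing("spark.hadoop.fs.s3a.bucket.*.committer.magic.enabled", "true")
}
```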

@dongjoon-hyun (Member, Author):

Thank you, @yaooqinn.

@yaooqinn (Member):

LGTM from my side. Thank you, @dongjoon-hyun.

Comment on lines 424 to 429:

```scala
if (Utils.classIsLoadable("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")) {
  conf.setIfMissing("spark.hadoop.fs.s3a.bucket.*.committer.magic.enabled", "true")
}
```
Member:

If the class is not there, will setting the config cause any problem?

@dongjoon-hyun (Member, Author):

Yes. At the first commit of this PR, CI showed the following:

```
[info] - SPARK-23731 plans should be canonicalizable after being (de)serialized *** FAILED *** (53 milliseconds)
[info]   java.lang.ClassNotFoundException: org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
[info]   at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
[info]   at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
[info]   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
[info]   at java.base/java.lang.Class.forName0(Native Method)
[info]   at java.base/java.lang.Class.forName(Class.java:467)
[info]   at org.apache.spark.util.SparkClassUtils.classForName(SparkClassUtils.scala:41)
[info]   at org.apache.spark.util.SparkClassUtils.classForName$(SparkClassUtils.scala:36)
[info]   at org.apache.spark.util.Utils$.classForName(Utils.scala:97)
[info]   at org.apache.spark.internal.io.FileCommitProtocol$.instantiate(FileCommitProtocol.scala:213)
[info]   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
```
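Reading the trace: FileCommitProtocol.instantiate reflectively loads whatever commit-protocol class the configuration names, so if the spark-hadoop-cloud jar is absent the load fails at write time. A simplified sketch of that path (not the actual Spark source; the real method takes more parameters):

```scala
// Simplified sketch of the reflective load shown in the trace:
// Utils.classForName throws ClassNotFoundException when the configured
// committer class is not on the classpath.
def instantiateSketch(className: String, jobId: String, outputPath: String): Any = {
  val clazz = Utils.classForName(className) // FileCommitProtocol.scala:213 in the trace
  clazz.getDeclaredConstructor(classOf[String], classOf[String])
    .newInstance(jobId, outputPath)
}
```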

@dongjoon-hyun (Member, Author):

Thank you for the review, @yaooqinn and @viirya.

I might have overlooked side-effect cases on other object storages. I'll convert this PR to a draft and test with Google Cloud Storage at least.

@dongjoon-hyun marked this pull request as draft on March 28, 2024 03:48
@steveloughran (Contributor):

I have no problems with the PR; we have made it the default in our releases.

This could be a good time to revisit why there's some separate PathOutputCommitter stuff at all; originally it existed because Spark built against releases without the new PathOutputCommitter interface. This no longer holds: could anything needed from it be pulled up into the main committer?

One recurrent trouble spot we have with committing work is Parquet; it requires all committers to be a subclass of ParquetOutputCommitter, hence the (ugly, brittle) wrapping stuff. Life would be a lot easier if Parquet didn't mind it being any PathOutputCommitter; it would just skip the schema writing.

Of course, we then come up against the fact that Parquet still wants to build against Hadoop 2.8. Everyone needs to move on, especially as Hadoop's Java 11+ support is 3.2.x+ only.
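For readers unfamiliar with the wrapping being described, a simplified sketch of the binding pattern (class name and structure assumed for illustration, not the actual BindingParquetOutputCommitter source): subclass ParquetOutputCommitter so Parquet's subclass check passes, and delegate the real work to whatever committer PathOutputCommitterFactory resolves for the destination.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.{PathOutputCommitter, PathOutputCommitterFactory}
import org.apache.parquet.hadoop.ParquetOutputCommitter

// Sketch: satisfies Parquet's "must be a ParquetOutputCommitter" check
// while forwarding all commit operations to an FS-specific committer.
class BindingCommitterSketch(path: Path, context: TaskAttemptContext)
  extends ParquetOutputCommitter(path, context) {

  private val delegate: PathOutputCommitter =
    PathOutputCommitterFactory
      .getCommitterFactory(path, context.getConfiguration)
      .createOutputCommitter(path, context)

  override def setupJob(jobContext: JobContext): Unit = delegate.setupJob(jobContext)
  override def setupTask(taskContext: TaskAttemptContext): Unit = delegate.setupTask(taskContext)
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean =
    delegate.needsTaskCommit(taskContext)
  override def commitTask(taskContext: TaskAttemptContext): Unit = delegate.commitTask(taskContext)
  override def commitJob(jobContext: JobContext): Unit = delegate.commitJob(jobContext)
  // Parquet's summary/schema writing is simply skipped, which is the point.
}
```

The real binding referenced in this thread lives in Spark's hadoop-cloud module as org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter.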

@dongjoon-hyun (Member, Author):

Thank you for your feedback, @steveloughran. Yeah, as you mentioned, this is blocked by exactly these two configurations:

```
spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
```

@steveloughran (Contributor):

What code is doing the instanceof check? It's in Parquet, correct? Unless it's been told to save a summary, it shouldn't care...

@dongjoon-hyun (Member, Author):

Oh, I should be clearer.

Unlike the spark.hadoop.fs.s3a.* configurations, the following are applied globally to all filesystems. We cannot do that for non-S3A filesystems. That's the side effect I need to remove in this PR.

```
spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
```
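To make the contrast concrete, a sketch (the bucket name is hypothetical): S3A keys can be scoped to one bucket or to all S3A buckets, while the two SQL keys apply to every output path regardless of scheme:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()

// S3A options can be scoped per bucket ("my-bucket" is hypothetical)...
conf.set("spark.hadoop.fs.s3a.bucket.my-bucket.committer.magic.enabled", "true")
// ...or to all S3A buckets, without touching hdfs://, gs://, or file://:
conf.set("spark.hadoop.fs.s3a.bucket.*.committer.magic.enabled", "true")

// These two, by contrast, are global: they take effect for every output
// path, whatever its filesystem, which is the side effect at issue here.
conf.set("spark.sql.parquet.output.committer.class",
  "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
conf.set("spark.sql.sources.commitProtocolClass",
  "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
```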

@steveloughran (Contributor):

So both of those bindings hand off to PathOutputCommitterFactory(), which looks for a committer factory in the config key mapreduce.outputcommitter.factory.class:

  • FileOutputCommitterFactory: the classic committer
  • NamedCommitterFactory: the class named in mapreduce.outputcommitter.named.classname

and then falls back to the per-scheme mapreduce.outputcommitter.factory.scheme.SCHEMA factory definition.

The idea being: you get an FS-specific committer unless you ask for something else.

  • The Parquet one is there because Parquet is fussy about its committer subclasses; that should be reviewed (where?)
  • And PathOutputCommitProtocol is probably surplus now that Spark can use PathOutputCommitter everywhere...
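A configuration sketch of that lookup chain (the key names are as listed above; the s3a factory value is Hadoop's stock S3ACommitterFactory, shown as an example, and com.example.MyCommitter is a hypothetical placeholder):

```
# 1) An explicit factory, if set, wins:
mapreduce.outputcommitter.factory.class=org.apache.hadoop.mapreduce.lib.output.NamedCommitterFactory
#    NamedCommitterFactory then instantiates exactly this class:
mapreduce.outputcommitter.named.classname=com.example.MyCommitter
# 2) Otherwise, fall back to the per-scheme factory for the destination, e.g. s3a://
mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
```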

github-actions bot:

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@dongjoon-hyun (Member, Author):

I removed the Stale tag.

github-actions bot commented Nov 7, 2024:

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
