Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve S3 upload speeds using aws transfer manager #17674

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,33 @@ public class S3StorageConfig
@JsonProperty("sse")
private final ServerSideEncryption serverSideEncryption;

/**
* S3 transfer config.
*
* @see S3StorageDruidModule#configure
*/
@JsonProperty("transfer")
private final S3TransferConfig s3TransferConfig;

@JsonCreator
public S3StorageConfig(
@JsonProperty("sse") ServerSideEncryption serverSideEncryption
@JsonProperty("sse") ServerSideEncryption serverSideEncryption,
@JsonProperty("transfer") S3TransferConfig s3TransferConfig
)
{
this.serverSideEncryption = serverSideEncryption == null ? new NoopServerSideEncryption() : serverSideEncryption;
this.s3TransferConfig = s3TransferConfig == null ? new S3TransferConfig() : s3TransferConfig;
}

@JsonProperty("sse")
public ServerSideEncryption getServerSideEncryption()
{
return serverSideEncryption;
}

@JsonProperty("transfer")
public S3TransferConfig getS3TransferConfig()
{
return s3TransferConfig;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.storage.s3;

import com.fasterxml.jackson.annotation.JsonProperty;

import javax.validation.constraints.Min;

/**
*/
public class S3TransferConfig
{
@JsonProperty
private boolean useTransferManager = false;

@JsonProperty
@Min(1)
private long minimumUploadPartSize = 5 * 1024 * 1024L;

@JsonProperty
@Min(1)
private long multipartUploadThreshold = 5 * 1024 * 1024L;

public void setUseTransferManager(boolean useTransferManager)
{
this.useTransferManager = useTransferManager;
}

public void setMinimumUploadPartSize(long minimumUploadPartSize)
{
this.minimumUploadPartSize = minimumUploadPartSize;
}

public void setMultipartUploadThreshold(long multipartUploadThreshold)
{
this.multipartUploadThreshold = multipartUploadThreshold;
}

public boolean isUseTransferManager()
{
return useTransferManager;
}

public long getMinimumUploadPartSize()
{
return minimumUploadPartSize;
}

public long getMultipartUploadThreshold()
{
return multipartUploadThreshold;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public boolean apply(Throwable e)
// This can happen sometimes when AWS isn't able to obtain the credentials for some service:
// https://github.com/aws/aws-sdk-java/issues/2285
return true;
} else if (e instanceof InterruptedException) {
Thread.interrupted(); // Clear interrupted state and not retry
return false;
} else if (e instanceof AmazonClientException) {
return AWSClientUtil.isClientExceptionRecoverable((AmazonClientException) e);
} else {
Expand Down Expand Up @@ -348,15 +351,15 @@ static void uploadFileIfPossible(
String bucket,
String key,
File file
)
) throws InterruptedException
{
final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, file);

if (!disableAcl) {
putObjectRequest.setAccessControlList(S3Utils.grantFullControlToBucketOwner(service, bucket));
}
log.info("Pushing [%s] to bucket[%s] and key[%s].", file, bucket, key);
service.putObject(putObjectRequest);
service.upload(putObjectRequest);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import org.apache.druid.java.util.common.ISE;

import java.io.File;
Expand All @@ -65,11 +68,21 @@ public static Builder builder()

private final AmazonS3 amazonS3;
private final ServerSideEncryption serverSideEncryption;
private final TransferManager transferManager;

public ServerSideEncryptingAmazonS3(AmazonS3 amazonS3, ServerSideEncryption serverSideEncryption)
public ServerSideEncryptingAmazonS3(AmazonS3 amazonS3, ServerSideEncryption serverSideEncryption, S3TransferConfig transferConfig)
{
this.amazonS3 = amazonS3;
this.serverSideEncryption = serverSideEncryption;
if (transferConfig.isUseTransferManager()) {
this.transferManager = TransferManagerBuilder.standard()
.withS3Client(amazonS3)
.withMinimumUploadPartSize(transferConfig.getMinimumUploadPartSize())
.withMultipartUploadThreshold(transferConfig.getMultipartUploadThreshold())
.build();
} else {
this.transferManager = null;
}
}

public AmazonS3 getAmazonS3()
Expand Down Expand Up @@ -173,10 +186,20 @@ public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUp
return amazonS3.completeMultipartUpload(request);
}

public void upload(PutObjectRequest request) throws InterruptedException
{
if (transferManager == null) {
putObject(request);
} else {
Upload transfer = transferManager.upload(serverSideEncryption.decorate(request));
transfer.waitForCompletion();
}
}

public static class Builder
{
private AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3Client.builder();
private S3StorageConfig s3StorageConfig = new S3StorageConfig(new NoopServerSideEncryption());
private S3StorageConfig s3StorageConfig = new S3StorageConfig(new NoopServerSideEncryption(), null);

public Builder setAmazonS3ClientBuilder(AmazonS3ClientBuilder amazonS3ClientBuilder)
{
Expand Down Expand Up @@ -217,7 +240,7 @@ public ServerSideEncryptingAmazonS3 build()
throw new RuntimeException(e);
}

return new ServerSideEncryptingAmazonS3(amazonS3Client, s3StorageConfig.getServerSideEncryption());
return new ServerSideEncryptingAmazonS3(amazonS3Client, s3StorageConfig.getServerSideEncryption(), s3StorageConfig.getS3TransferConfig());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.druid.metadata.DefaultPasswordProvider;
import org.apache.druid.storage.s3.NoopServerSideEncryption;
import org.apache.druid.storage.s3.S3InputDataConfig;
import org.apache.druid.storage.s3.S3TransferConfig;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.apache.druid.testing.InitializedNullHandlingTest;
Expand Down Expand Up @@ -113,7 +114,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
public static final AmazonS3ClientBuilder AMAZON_S3_CLIENT_BUILDER = AmazonS3Client.builder();
public static final ServerSideEncryptingAmazonS3 SERVICE = new ServerSideEncryptingAmazonS3(
S3_CLIENT,
new NoopServerSideEncryption()
new NoopServerSideEncryption(),
new S3TransferConfig()
);
public static final S3InputDataConfig INPUT_DATA_CONFIG;
private static final int MAX_LISTING_LENGTH = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ private static ServerSideEncryptingAmazonS3 makeMockClient(
final List<S3ObjectSummary> objects
)
{
return new ServerSideEncryptingAmazonS3(null, null)
return new ServerSideEncryptingAmazonS3(null, null, new S3TransferConfig())
{
@Override
public ListObjectsV2Result listObjectsV2(final ListObjectsV2Request request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public String getArchiveBaseKey()
private static final Supplier<ServerSideEncryptingAmazonS3> S3_SERVICE = Suppliers.ofInstance(
new ServerSideEncryptingAmazonS3(
EasyMock.createStrictMock(AmazonS3Client.class),
new NoopServerSideEncryption()
new NoopServerSideEncryption(),
new S3TransferConfig()
)
);
private static final S3DataSegmentPuller PULLER = new S3DataSegmentPuller(S3_SERVICE.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@

private MockAmazonS3Client()
{
super(new AmazonS3Client(), new NoopServerSideEncryption());
super(new AmazonS3Client(), new NoopServerSideEncryption(), new S3TransferConfig());

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation Note test

Invoking
AmazonS3Client.AmazonS3Client
should be avoided because it has been deprecated.
}

public boolean didMove()
Expand Down
Loading
Loading