From 77eb9917b89e1e24ce380919806d347639480717 Mon Sep 17 00:00:00 2001 From: Suraj Goel Date: Tue, 28 Jan 2025 09:56:59 +0530 Subject: [PATCH 1/3] Improve S3 upload speeds using aws transfer manager --- .../druid/storage/s3/S3StorageConfig.java | 18 +++- .../druid/storage/s3/S3TransferConfig.java | 71 ++++++++++++++ .../org/apache/druid/storage/s3/S3Utils.java | 7 +- .../s3/ServerSideEncryptingAmazonS3.java | 29 +++++- .../data/input/s3/S3InputSourceTest.java | 4 +- .../storage/s3/ObjectSummaryIteratorTest.java | 2 +- .../storage/s3/S3DataSegmentArchiverTest.java | 3 +- .../storage/s3/S3DataSegmentMoverTest.java | 2 +- .../storage/s3/S3DataSegmentPusherTest.java | 93 ++++++++++--------- .../s3/S3StorageConnectorProviderTest.java | 2 +- .../druid/storage/s3/S3TaskLogsTest.java | 24 ++--- .../s3/TestAWSCredentialsProvider.java | 4 +- .../output/RetryableS3OutputStreamTest.java | 3 +- 13 files changed, 190 insertions(+), 72 deletions(-) create mode 100644 extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TransferConfig.java diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageConfig.java index cfae0eb084b7..b52d13cd518e 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageConfig.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageConfig.java @@ -36,12 +36,22 @@ public class S3StorageConfig @JsonProperty("sse") private final ServerSideEncryption serverSideEncryption; + /** + * S3 transfer config. + * + * @see S3StorageDruidModule#configure + */ + @JsonProperty("transfer") + private final S3TransferConfig s3TransferConfig; + @JsonCreator public S3StorageConfig( - @JsonProperty("sse") ServerSideEncryption serverSideEncryption + @JsonProperty("sse") ServerSideEncryption serverSideEncryption, + @JsonProperty("transfer") S3TransferConfig s3TransferConfig ) { this.serverSideEncryption = serverSideEncryption == null ? new NoopServerSideEncryption() : serverSideEncryption; + this.s3TransferConfig = s3TransferConfig == null ? new S3TransferConfig() : s3TransferConfig; } @JsonProperty("sse") @@ -49,4 +59,10 @@ public ServerSideEncryption getServerSideEncryption() { return serverSideEncryption; } + + @JsonProperty("transfer") + public S3TransferConfig getS3TransferConfig() + { + return s3TransferConfig; + } } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TransferConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TransferConfig.java new file mode 100644 index 000000000000..fc8bd8903fad --- /dev/null +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TransferConfig.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.s3; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.validation.constraints.Min; + +/** + */ +public class S3TransferConfig +{ + @JsonProperty + private boolean useTransferManager = false; + + @JsonProperty + @Min(1) + private long minimumUploadPartSize = 5 * 1024 * 1024L; + + @JsonProperty + @Min(1) + private long multipartUploadThreshold = 5 * 1024 * 1024L; + + public void setUseTransferManager(boolean useTransferManager) + { + this.useTransferManager = useTransferManager; + } + + public void setMinimumUploadPartSize(long minimumUploadPartSize) + { + this.minimumUploadPartSize = minimumUploadPartSize; + } + + public void setMultipartUploadThreshold(long multipartUploadThreshold) + { + this.multipartUploadThreshold = multipartUploadThreshold; + } + + public boolean isUseTransferManager() + { + return useTransferManager; + } + + public long getMinimumUploadPartSize() + { + return minimumUploadPartSize; + } + + public long getMultipartUploadThreshold() + { + return multipartUploadThreshold; + } + +} diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java index b299d4f9dd80..1eba9907ab36 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java @@ -96,6 +96,9 @@ public boolean apply(Throwable e) // This can happen sometimes when AWS isn't able to obtain the credentials for some service: // https://github.com/aws/aws-sdk-java/issues/2285 return true; + } else if (e instanceof InterruptedException) { + Thread.interrupted(); // Clear interrupted state and not retry + return false; } else if (e instanceof AmazonClientException) { return AWSClientUtil.isClientExceptionRecoverable((AmazonClientException) e); } else { @@ -348,7 +351,7 @@ static void uploadFileIfPossible( String bucket, String key, File file - ) + ) throws InterruptedException { final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, file); @@ -356,7 +359,7 @@ static void uploadFileIfPossible( putObjectRequest.setAccessControlList(S3Utils.grantFullControlToBucketOwner(service, bucket)); } log.info("Pushing [%s] to bucket[%s] and key[%s].", file, bucket, key); - service.putObject(putObjectRequest); + service.upload(putObjectRequest); } @Nullable diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java index 31120ba883c4..13725e3aa232 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java @@ -43,6 +43,9 @@ import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartResult; +import com.amazonaws.services.s3.transfer.TransferManager; +import com.amazonaws.services.s3.transfer.TransferManagerBuilder; +import com.amazonaws.services.s3.transfer.Upload; import org.apache.druid.java.util.common.ISE; import java.io.File; @@ -65,11 +68,21 @@ public static Builder builder() private final AmazonS3 amazonS3; private final ServerSideEncryption serverSideEncryption; + private final TransferManager transferManager; - public ServerSideEncryptingAmazonS3(AmazonS3 amazonS3, ServerSideEncryption serverSideEncryption) + public ServerSideEncryptingAmazonS3(AmazonS3 amazonS3, ServerSideEncryption serverSideEncryption, S3TransferConfig transferConfig) { this.amazonS3 = amazonS3; this.serverSideEncryption = serverSideEncryption; + if (transferConfig.isUseTransferManager()) { + this.transferManager = TransferManagerBuilder.standard() + .withS3Client(amazonS3) + .withMinimumUploadPartSize(transferConfig.getMinimumUploadPartSize()) + .withMultipartUploadThreshold(transferConfig.getMultipartUploadThreshold()) + .build(); + } else { + this.transferManager = null; + } } public AmazonS3 getAmazonS3() @@ -173,10 +186,20 @@ public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUp return amazonS3.completeMultipartUpload(request); } + public void upload(PutObjectRequest request) throws InterruptedException + { + if (transferManager == null) { + putObject(request); + } else { + Upload transfer = transferManager.upload(serverSideEncryption.decorate(request)); + transfer.waitForCompletion(); + } + } + public static class Builder { private AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3Client.builder(); - private S3StorageConfig s3StorageConfig = new S3StorageConfig(new NoopServerSideEncryption()); + private S3StorageConfig s3StorageConfig = new S3StorageConfig(new NoopServerSideEncryption(), null); public Builder setAmazonS3ClientBuilder(AmazonS3ClientBuilder amazonS3ClientBuilder) { @@ -217,7 +240,7 @@ public ServerSideEncryptingAmazonS3 build() throw new RuntimeException(e); } - return new ServerSideEncryptingAmazonS3(amazonS3Client, s3StorageConfig.getServerSideEncryption()); + return new ServerSideEncryptingAmazonS3(amazonS3ClientBuilder.build(), s3StorageConfig.getServerSideEncryption(), s3StorageConfig.getS3TransferConfig()); } } } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java index 4f14364e7222..d2c2a33293f9 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java @@ -74,6 +74,7 @@ import org.apache.druid.metadata.DefaultPasswordProvider; import org.apache.druid.storage.s3.NoopServerSideEncryption; import org.apache.druid.storage.s3.S3InputDataConfig; +import org.apache.druid.storage.s3.S3TransferConfig; import org.apache.druid.storage.s3.S3Utils; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; import org.apache.druid.testing.InitializedNullHandlingTest; @@ -113,7 +114,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest public static final AmazonS3ClientBuilder AMAZON_S3_CLIENT_BUILDER = AmazonS3Client.builder(); public static final ServerSideEncryptingAmazonS3 SERVICE = new ServerSideEncryptingAmazonS3( S3_CLIENT, - new NoopServerSideEncryption() + new NoopServerSideEncryption(), + new S3TransferConfig() ); public static final S3InputDataConfig INPUT_DATA_CONFIG; private static final int MAX_LISTING_LENGTH = 10; diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java index ea2ca4af26c1..8ee6c826718d 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ObjectSummaryIteratorTest.java @@ -195,7 +195,7 @@ private static ServerSideEncryptingAmazonS3 makeMockClient( final List objects ) { - return new ServerSideEncryptingAmazonS3(null, null) + return new ServerSideEncryptingAmazonS3(null, null, new S3TransferConfig()) { @Override public ListObjectsV2Result listObjectsV2(final ListObjectsV2Request request) diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentArchiverTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentArchiverTest.java index f5005c706e01..4acf553fae50 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentArchiverTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentArchiverTest.java @@ -76,7 +76,8 @@ public String getArchiveBaseKey() private static final Supplier S3_SERVICE = Suppliers.ofInstance( new ServerSideEncryptingAmazonS3( EasyMock.createStrictMock(AmazonS3Client.class), - new NoopServerSideEncryption() + new NoopServerSideEncryption(), + new S3TransferConfig() ) ); private static final S3DataSegmentPuller PULLER = new S3DataSegmentPuller(S3_SERVICE.get()); diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java index 550a72cef43c..8f653e956a83 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java @@ -201,7 +201,7 @@ private static class MockAmazonS3Client extends ServerSideEncryptingAmazonS3 private MockAmazonS3Client() { - super(new AmazonS3Client(), new NoopServerSideEncryption()); + super(new AmazonS3Client(), new NoopServerSideEncryption(), new S3TransferConfig()); } public boolean didMove() diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherTest.java index ba1aba1305ee..698f9d6e63f4 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherTest.java @@ -25,7 +25,7 @@ import com.amazonaws.services.s3.model.Grant; import com.amazonaws.services.s3.model.Owner; import com.amazonaws.services.s3.model.Permission; -import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.PutObjectRequest; import com.google.common.io.Files; import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.Intervals; @@ -41,9 +41,9 @@ import org.junit.rules.TemporaryFolder; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; -import java.util.function.Consumer; import java.util.regex.Pattern; /** @@ -58,12 +58,8 @@ public class S3DataSegmentPusherTest public void testPush() throws Exception { testPushInternal( - false, - "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/index\\.zip", - client -> - EasyMock.expect(client.putObject(EasyMock.anyObject())) - .andReturn(new PutObjectResult()) - .once() + false, + "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/index\\.zip" ); } @@ -71,12 +67,8 @@ public void testPush() throws Exception public void testPushUseUniquePath() throws Exception { testPushInternal( - true, - "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip", - client -> - EasyMock.expect(client.putObject(EasyMock.anyObject())) - .andReturn(new PutObjectResult()) - .once() + true, + "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip" ); } @@ -86,30 +78,19 @@ public void testEntityTooLarge() final DruidException exception = Assert.assertThrows( DruidException.class, () -> - testPushInternal( + testPushInternalForEntityTooLarge( false, - "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/index\\.zip", - client -> { - final AmazonS3Exception e = new AmazonS3Exception("whoa too many bytes"); - e.setErrorCode(S3Utils.ERROR_ENTITY_TOO_LARGE); - EasyMock.expect(client.putObject(EasyMock.anyObject())) - .andThrow(e) - .once(); - } - ) + "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/index\\.zip" + ) ); MatcherAssert.assertThat( - exception, - ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Got error[EntityTooLarge] from S3")) + exception, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Got error[EntityTooLarge] from S3")) ); } - private void testPushInternal( - boolean useUniquePath, - String matcher, - Consumer clientDecorator - ) throws Exception + private void testPushInternal(boolean useUniquePath, String matcher) throws Exception { ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class); @@ -118,10 +99,36 @@ private void testPushInternal( acl.grantAllPermissions(new Grant(new CanonicalGrantee(acl.getOwner().getId()), Permission.FullControl)); EasyMock.expect(s3Client.getBucketAcl(EasyMock.eq("bucket"))).andReturn(acl).once(); - clientDecorator.accept(s3Client); + s3Client.upload(EasyMock.anyObject(PutObjectRequest.class)); + EasyMock.expectLastCall().once(); EasyMock.replay(s3Client); + validate(useUniquePath, matcher, s3Client); + } + + private void testPushInternalForEntityTooLarge(boolean useUniquePath, String matcher) throws Exception + { + ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class); + final AmazonS3Exception e = new AmazonS3Exception("whoa too many bytes"); + e.setErrorCode(S3Utils.ERROR_ENTITY_TOO_LARGE); + + + final AccessControlList acl = new AccessControlList(); + acl.setOwner(new Owner("ownerId", "owner")); + acl.grantAllPermissions(new Grant(new CanonicalGrantee(acl.getOwner().getId()), Permission.FullControl)); + EasyMock.expect(s3Client.getBucketAcl(EasyMock.eq("bucket"))).andReturn(acl).once(); + + s3Client.upload(EasyMock.anyObject(PutObjectRequest.class)); + EasyMock.expectLastCall().andThrow(e).once(); + + EasyMock.replay(s3Client); + + validate(useUniquePath, matcher, s3Client); + } + + private void validate(boolean useUniquePath, String matcher, ServerSideEncryptingAmazonS3 s3Client) throws IOException + { S3DataSegmentPusherConfig config = new S3DataSegmentPusherConfig(); config.setBucket("bucket"); config.setBaseKey("key"); @@ -136,15 +143,15 @@ private void testPushInternal( final long size = data.length; DataSegment segmentToPush = new DataSegment( - "foo", - Intervals.of("2015/2016"), - "0", - new HashMap<>(), - new ArrayList<>(), - new ArrayList<>(), - NoneShardSpec.instance(), - 0, - size + "foo", + Intervals.of("2015/2016"), + "0", + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + 0, + size ); DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, useUniquePath); @@ -153,8 +160,8 @@ private void testPushInternal( Assert.assertEquals(1, (int) segment.getBinaryVersion()); Assert.assertEquals("bucket", segment.getLoadSpec().get("bucket")); Assert.assertTrue( - segment.getLoadSpec().get("key").toString(), - Pattern.compile(matcher).matcher(segment.getLoadSpec().get("key").toString()).matches() + segment.getLoadSpec().get("key").toString(), + Pattern.compile(matcher).matcher(segment.getLoadSpec().get("key").toString()).matches() ); Assert.assertEquals("s3_zip", segment.getLoadSpec().get("type")); diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java index 3210a26cc584..8f898848c87d 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java @@ -148,7 +148,7 @@ public void configure(Binder binder) new InjectableValues.Std() .addValue( ServerSideEncryptingAmazonS3.class, - new ServerSideEncryptingAmazonS3(null, new NoopServerSideEncryption()) + new ServerSideEncryptingAmazonS3(null, new NoopServerSideEncryption(), new S3TransferConfig()) ) .addValue( S3UploadManager.class, diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java index 011dc4888456..4fa8cf7e044f 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java @@ -29,7 +29,6 @@ import com.amazonaws.services.s3.model.Owner; import com.amazonaws.services.s3.model.Permission; import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.PutObjectResult; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.base.Optional; @@ -123,11 +122,9 @@ public void testTaskLogsPushWithAclEnabled() throws Exception } @Test - public void test_pushTaskStatus() throws IOException + public void test_pushTaskStatus() throws IOException, InterruptedException { - EasyMock.expect(s3Client.putObject(EasyMock.anyObject(PutObjectRequest.class))) - .andReturn(new PutObjectResult()) - .once(); + s3Client.upload(EasyMock.anyObject(PutObjectRequest.class)); EasyMock.replay(s3Client); @@ -148,12 +145,11 @@ public void test_pushTaskStatus() throws IOException } @Test - public void test_pushTaskPayload() throws IOException + public void test_pushTaskPayload() throws IOException, InterruptedException { Capture putObjectRequestCapture = Capture.newInstance(CaptureType.FIRST); - EasyMock.expect(s3Client.putObject(EasyMock.capture(putObjectRequestCapture))) - .andReturn(new PutObjectResult()) - .once(); + s3Client.upload(EasyMock.capture(putObjectRequestCapture)); + EasyMock.expectLastCall().once(); EasyMock.replay(s3Client); @@ -617,9 +613,8 @@ private S3TaskLogs getS3TaskLogs() private List testPushInternal(boolean disableAcl, String ownerId, String ownerDisplayName) throws Exception { - EasyMock.expect(s3Client.putObject(EasyMock.anyObject())) - .andReturn(new PutObjectResult()) - .once(); + s3Client.upload(EasyMock.anyObject(PutObjectRequest.class)); + EasyMock.expectLastCall().once(); AccessControlList aclExpected = new AccessControlList(); aclExpected.setOwner(new Owner(ownerId, ownerDisplayName)); @@ -628,9 +623,8 @@ private List testPushInternal(boolean disableAcl, String ownerId, String .andReturn(aclExpected) .once(); - EasyMock.expect(s3Client.putObject(EasyMock.anyObject(PutObjectRequest.class))) - .andReturn(new PutObjectResult()) - .once(); + s3Client.upload(EasyMock.anyObject(PutObjectRequest.class)); + EasyMock.expectLastCall().once(); EasyMock.replay(s3Client); diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/TestAWSCredentialsProvider.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/TestAWSCredentialsProvider.java index 3685fc6fa19b..fefcb8c3c38b 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/TestAWSCredentialsProvider.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/TestAWSCredentialsProvider.java @@ -67,7 +67,7 @@ public void testWithFixedAWSKeys() new AWSProxyConfig(), new AWSEndpointConfig(), new AWSClientConfig(), - new S3StorageConfig(new NoopServerSideEncryption()) + new S3StorageConfig(new NoopServerSideEncryption(), null) ); s3Module.getAmazonS3Client( @@ -102,7 +102,7 @@ public void testWithFileSessionCredentials() throws IOException new AWSProxyConfig(), new AWSEndpointConfig(), new AWSClientConfig(), - new S3StorageConfig(new NoopServerSideEncryption()) + new S3StorageConfig(new NoopServerSideEncryption(), null) ); s3Module.getAmazonS3Client( diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java index ead65a89f771..437f635433bd 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java @@ -36,6 +36,7 @@ import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.DruidProcessingConfigTest; import org.apache.druid.storage.s3.NoopServerSideEncryption; +import org.apache.druid.storage.s3.S3TransferConfig; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; import org.easymock.EasyMock; import org.junit.Assert; @@ -230,7 +231,7 @@ private static class TestAmazonS3 extends ServerSideEncryptingAmazonS3 private TestAmazonS3(int totalUploadFailures) { - super(EasyMock.createMock(AmazonS3.class), new NoopServerSideEncryption()); + super(EasyMock.createMock(AmazonS3.class), new NoopServerSideEncryption(), new S3TransferConfig()); this.uploadFailuresLeft = totalUploadFailures; } From b6758ef81494ab3d55b10609c744c0130e351e80 Mon Sep 17 00:00:00 2001 From: Suraj Goel Date: Tue, 28 Jan 2025 12:04:11 +0530 Subject: [PATCH 2/3] Pass correct amazonS3Client to ServerSideEncryptingAmazonS3 --- .../apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java index 13725e3aa232..e747ddf9f4e1 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java @@ -240,7 +240,7 @@ public ServerSideEncryptingAmazonS3 build() throw new RuntimeException(e); } - return new ServerSideEncryptingAmazonS3(amazonS3ClientBuilder.build(), s3StorageConfig.getServerSideEncryption(), s3StorageConfig.getS3TransferConfig()); + return new ServerSideEncryptingAmazonS3(amazonS3Client, s3StorageConfig.getServerSideEncryption(), s3StorageConfig.getS3TransferConfig()); } } } From a103181e2f1c32ff1ce39ce6411da308dd0ee130 Mon Sep 17 00:00:00 2001 From: Suraj Goel Date: Tue, 28 Jan 2025 15:53:12 +0530 Subject: [PATCH 3/3] Add Unit Test Cases --- .../storage/s3/S3TransferConfigTest.java | 59 +++++++ .../s3/ServerSideEncryptingAmazonS3Test.java | 148 ++++++++++++++++++ 2 files changed, 207 insertions(+) create mode 100644 extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TransferConfigTest.java create mode 100644 extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3Test.java diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TransferConfigTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TransferConfigTest.java new file mode 100644 index 000000000000..7af20431ab23 --- /dev/null +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TransferConfigTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.s3; + +import org.junit.Assert; +import org.junit.Test; + +public class S3TransferConfigTest +{ + @Test + public void testDefaultValues() + { + S3TransferConfig config = new S3TransferConfig(); + Assert.assertFalse(config.isUseTransferManager()); + Assert.assertEquals(5 * 1024 * 1024L, config.getMinimumUploadPartSize()); + Assert.assertEquals(5 * 1024 * 1024L, config.getMultipartUploadThreshold()); + } + + @Test + public void testSetUseTransferManager() + { + S3TransferConfig config = new S3TransferConfig(); + config.setUseTransferManager(true); + Assert.assertTrue(config.isUseTransferManager()); + } + + @Test + public void testSetMinimumUploadPartSize() + { + S3TransferConfig config = new S3TransferConfig(); + config.setMinimumUploadPartSize(10 * 1024 * 1024L); + Assert.assertEquals(10 * 1024 * 1024L, config.getMinimumUploadPartSize()); + } + + @Test + public void testSetMultipartUploadThreshold() + { + S3TransferConfig config = new S3TransferConfig(); + config.setMultipartUploadThreshold(10 * 1024 * 1024L); + Assert.assertEquals(10 * 1024 * 1024L, config.getMultipartUploadThreshold()); + } +} diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3Test.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3Test.java new file mode 100644 index 000000000000..75e1a72da0d2 --- /dev/null +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3Test.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.s3; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.transfer.TransferManager; +import com.amazonaws.services.s3.transfer.Upload; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +import java.lang.reflect.Field; + + + +public class ServerSideEncryptingAmazonS3Test +{ + private AmazonS3 mockAmazonS3; + private ServerSideEncryption mockServerSideEncryption; + private S3TransferConfig mockTransferConfig; + private TransferManager mockTransferManager; + + @Before + public void setup() + { + mockAmazonS3 = EasyMock.createMock(AmazonS3.class); + mockServerSideEncryption = EasyMock.createMock(ServerSideEncryption.class); + mockTransferConfig = EasyMock.createMock(S3TransferConfig.class); + mockTransferManager = EasyMock.createMock(TransferManager.class); + } + + @Test + public void testConstructor_WithTransferManager() throws NoSuchFieldException, IllegalAccessException + { + EasyMock.expect(mockTransferConfig.isUseTransferManager()).andReturn(true); + EasyMock.expect(mockTransferConfig.getMinimumUploadPartSize()).andReturn(5L); + EasyMock.expect(mockTransferConfig.getMultipartUploadThreshold()).andReturn(10L); + EasyMock.replay(mockTransferConfig); + + ServerSideEncryptingAmazonS3 s3 = new ServerSideEncryptingAmazonS3(mockAmazonS3, mockServerSideEncryption, mockTransferConfig); + + Field transferManagerField = ServerSideEncryptingAmazonS3.class.getDeclaredField("transferManager"); + transferManagerField.setAccessible(true); + Object transferManager = transferManagerField.get(s3); + + Assert.assertNotNull("TransferManager should be initialized", transferManager); + Assert.assertNotNull(s3); + EasyMock.verify(mockTransferConfig); + } + + @Test + public void testConstructor_WithoutTransferManager() throws NoSuchFieldException, IllegalAccessException + { + + EasyMock.expect(mockTransferConfig.isUseTransferManager()).andReturn(false); + EasyMock.replay(mockTransferConfig); + + ServerSideEncryptingAmazonS3 s3 = new ServerSideEncryptingAmazonS3(mockAmazonS3, mockServerSideEncryption, mockTransferConfig); + + Field transferManagerField = ServerSideEncryptingAmazonS3.class.getDeclaredField("transferManager"); + transferManagerField.setAccessible(true); + Object transferManager = transferManagerField.get(s3); + + Assert.assertNull("TransferManager should not be initialized", transferManager); + Assert.assertNotNull(s3); + EasyMock.verify(mockTransferConfig); + } + + @Test + public void testUpload_WithoutTransferManager() throws InterruptedException + { + PutObjectRequest originalRequest = new PutObjectRequest("bucket", "key", "file"); + PutObjectRequest decoratedRequest = new PutObjectRequest("bucket", "key", "file-encrypted"); + PutObjectResult mockResult = new PutObjectResult(); + + EasyMock.expect(mockTransferConfig.isUseTransferManager()).andReturn(false); + EasyMock.replay(mockTransferConfig); + + EasyMock.expect(mockServerSideEncryption.decorate(originalRequest)).andReturn(decoratedRequest); + EasyMock.replay(mockServerSideEncryption); + + EasyMock.expect(mockAmazonS3.putObject(decoratedRequest)).andReturn(mockResult).once(); + EasyMock.replay(mockAmazonS3); + + ServerSideEncryptingAmazonS3 s3 = new ServerSideEncryptingAmazonS3(mockAmazonS3, mockServerSideEncryption, mockTransferConfig); + s3.upload(originalRequest); + + EasyMock.verify(mockServerSideEncryption); + EasyMock.verify(mockAmazonS3); + EasyMock.verify(mockTransferConfig); + } + + @Test + public void testUpload_WithTransferManager() throws InterruptedException, NoSuchFieldException, IllegalAccessException + { + PutObjectRequest originalRequest = new PutObjectRequest("bucket", "key", "file"); + PutObjectRequest decoratedRequest = new PutObjectRequest("bucket", "key", "file-encrypted"); + Upload mockUpload = EasyMock.createMock(Upload.class); + + EasyMock.expect(mockTransferConfig.isUseTransferManager()).andReturn(true).once(); + EasyMock.expect(mockTransferConfig.getMinimumUploadPartSize()).andReturn(5242880L).once(); // 5 MB + EasyMock.expect(mockTransferConfig.getMultipartUploadThreshold()).andReturn(10485760L).once(); // 10 MB + EasyMock.replay(mockTransferConfig); + + EasyMock.expect(mockServerSideEncryption.decorate(originalRequest)).andReturn(decoratedRequest); + EasyMock.replay(mockServerSideEncryption); + + EasyMock.expect(mockTransferManager.upload(decoratedRequest)).andReturn(mockUpload); + EasyMock.replay(mockTransferManager); + + mockUpload.waitForCompletion(); + EasyMock.expectLastCall(); + EasyMock.replay(mockUpload); + + ServerSideEncryptingAmazonS3 s3 = new ServerSideEncryptingAmazonS3(mockAmazonS3, mockServerSideEncryption, mockTransferConfig); + + Field transferManagerField = ServerSideEncryptingAmazonS3.class.getDeclaredField("transferManager"); + transferManagerField.setAccessible(true); + transferManagerField.set(s3, mockTransferManager); + + s3.upload(originalRequest); + + EasyMock.verify(mockServerSideEncryption); + EasyMock.verify(mockTransferManager); + EasyMock.verify(mockUpload); + } +}