From ca003d0e2b24690ab208ceacde4c41880e9dd850 Mon Sep 17 00:00:00 2001 From: Sumit Aich Date: Fri, 13 Dec 2024 13:21:53 +0530 Subject: [PATCH] feat: add oss storage type in blob sink (#60) * feat: add oss storage type in blob sink * feat: add oss storage type in blob sink * docs: add docs for oss blob sink * docs: add docs for oss blob sink * test: add unit tests for ObjectStorageServiceConfig * test: add unit tests for BlobStorageFactory * test: add unit tests for ObjectStorageService * tests: add unit tests for BlobStorageFactory * fix: fix checkstyle * chore: version bump to 0.11.2 * chore: refactor direct config reference to class props --------- Co-authored-by: Eka Winata --- build.gradle | 2 +- docs/docs/sinks/blob-sink.md | 95 +++++- .../firehose/sink/blob/BlobSinkFactory.java | 5 +- .../blobstorage/oss/ObjectStorageService.java | 18 +- .../ObjectStorageServiceConfigTest.java | 296 ++++++++++++++++++ .../blobstorage/BlobStorageFactoryTest.java | 174 ++++++++++ .../oss/ObjectStorageServiceTest.java | 251 ++++++++++++++- 7 files changed, 816 insertions(+), 25 deletions(-) create mode 100644 src/test/java/com/gotocompany/firehose/config/ObjectStorageServiceConfigTest.java create mode 100644 src/test/java/com/gotocompany/firehose/sink/common/blobstorage/BlobStorageFactoryTest.java diff --git a/build.gradle b/build.gradle index 416b77683..4c6c8d96a 100644 --- a/build.gradle +++ b/build.gradle @@ -33,7 +33,7 @@ lombok { } group 'com.gotocompany' -version '0.11.1' +version '0.11.2' def projName = "firehose" diff --git a/docs/docs/sinks/blob-sink.md b/docs/docs/sinks/blob-sink.md index a91d0eb8b..8e7320a12 100644 --- a/docs/docs/sinks/blob-sink.md +++ b/docs/docs/sinks/blob-sink.md @@ -4,9 +4,9 @@ A Blob sink Firehose \(`SINK_TYPE`=`blob`\) requires the following variables to ### `SINK_BLOB_STORAGE_TYPE` -Defines the types of blob storage the destination remote file system the file will be uploaded. Currently, the only supported blob storages are `GCS` (google cloud storage) and `S3` (Amazon S3) . +Defines the types of blob storage the destination remote file system the file will be uploaded. Currently, the only supported blob storages are `GCS` (google cloud storage), `OSS`( Alibaba Cloud Object Storage Service) and `S3` (Amazon S3) . -- Example value: `GCS` or `S3` +- Example value: `GCS`, `OSS` or `S3` - Type: `required` ### `SINK_BLOB_LOCAL_FILE_WRITER_TYPE` @@ -262,3 +262,94 @@ The amount of time to allow the client to complete the execution of an API call. - Example value: `40000` - Type: `optional` - Default value : `40000` + +### `SINK_BLOB_OSS_ENDPOINT` + +The endpoint of the OSS service. Each region has its own endpoint. For example, the endpoint for the China (Hangzhou) region is "oss-cn-hangzhou.aliyuncs.com". For more information, please refer to the [OSS endpoints documentation](https://www.alibabacloud.com/help/en/oss/user-guide/regions-and-endpoints). + +- Example value: `oss-cn-hangzhou.aliyuncs.com` +- Type: `required` + +### `SINK_BLOB_OSS_REGION` + +The region where your OSS bucket is located. This should match the region in your endpoint. For example, if your endpoint is "oss-cn-hangzhou.aliyuncs.com", the region should be "cn-hangzhou". + +- Example value: `cn-hangzhou` +- Type: `required` + +### `SINK_BLOB_OSS_ACCESS_ID` + +The AccessKey ID provided by Alibaba Cloud. This is used for authentication when accessing OSS. You can obtain this from the Alibaba Cloud console under AccessKey management. For security best practices, it's recommended to use RAM user AccessKeys instead of the primary account AccessKey. + +- Example value: `LTAI4FxxxxxxxxxxxxxxxT7PD` +- Type: `required` + +### `SINK_BLOB_OSS_ACCESS_KEY` + +The AccessKey Secret provided by Alibaba Cloud. This is paired with the Access ID for authentication. Keep this secret secure and never share it publicly. + +- Example value: `D6TxxxxxxxxxxxxxxxxxxxxxxxtRcO` +- Type: `required` + +### `SINK_BLOB_OSS_BUCKET_NAME` + +The name of your OSS bucket. Bucket names must be globally unique across all Alibaba Cloud OSS. The name must comply with OSS naming conventions: lowercase letters, numbers, and hyphens only, 3-63 characters long, must start and end with lowercase letter or number. + +- Example value: `my-firehose-bucket` +- Type: `required` + +### `SINK_BLOB_OSS_DIRECTORY_PREFIX` + +The prefix path where objects will be stored in the bucket. This allows you to organize objects in a folder-like structure within your bucket. + +- Example value: `data/firehose/` +- Type: `optional` +- Default value: `` (empty string, root of bucket) + +### `SINK_BLOB_OSS_SOCKET_TIMEOUT_MS` + +The socket timeout in milliseconds. This is the maximum time to wait for data to be transferred over an established connection before giving up. + +- Example value: `50000` +- Type: `optional` +- Default value: `50000` + +### `SINK_BLOB_OSS_CONNECTION_TIMEOUT_MS` + +The connection timeout in milliseconds. This is the maximum time to wait while establishing a connection with OSS. + +- Example value: `50000` +- Type: `optional` +- Default value: `50000` + +### `SINK_BLOB_OSS_CONNECTION_REQUEST_TIMEOUT_MS` + +The timeout in milliseconds for requesting a connection from the connection manager. A value of -1 means no timeout. + +- Example value: `10000` +- Type: `optional` +- Default value: `-1` + +### `SINK_BLOB_OSS_REQUEST_TIMEOUT_MS` + +The maximum time allowed for the entire request operation (connection, upload, server processing, etc.). + +- Example value: `300000` +- Type: `optional` +- Default value: `300000` + +### `SINK_BLOB_OSS_RETRY_ENABLED` + +Whether to enable automatic retry of failed requests. When enabled, failed requests will be retried according to the retry configuration. + +- Example value: `true` +- Type: `optional` +- Default value: `true` + +### `SINK_BLOB_OSS_MAX_RETRY_ATTEMPTS` + +The maximum number of retry attempts for failed requests when retry is enabled. + +- Example value: `3` +- Type: `optional` +- Default value: `3` diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/BlobSinkFactory.java b/src/main/java/com/gotocompany/firehose/sink/blob/BlobSinkFactory.java index 0f52b7392..4df91622c 100644 --- a/src/main/java/com/gotocompany/firehose/sink/blob/BlobSinkFactory.java +++ b/src/main/java/com/gotocompany/firehose/sink/blob/BlobSinkFactory.java @@ -46,7 +46,6 @@ private static Descriptors.Descriptor getMetadataMessageDescriptor(BlobSinkConfi return sinkConfig.getOutputKafkaMetadataColumnName().isEmpty() ? fileDescriptor.findMessageTypeByName(KafkaMetadataProtoMessage.getTypeName()) : fileDescriptor.findMessageTypeByName(NestedKafkaMetadataProtoMessage.getTypeName()); - } private static LocalStorage getLocalFileWriterWrapper(BlobSinkConfig sinkConfig, StencilClient stencilClient, StatsDReporter statsDReporter) { @@ -71,10 +70,12 @@ public static BlobStorage createSinkObjectStorage(BlobSinkConfig sinkConfig, Map case S3: configuration.put("S3_TYPE", "SINK_BLOB"); break; + case OSS: + configuration.put("OSS_TYPE", "SINK_BLOB"); + break; default: throw new IllegalArgumentException("Sink Blob Storage type " + sinkConfig.getBlobStorageType() + "is not supported"); } return BlobStorageFactory.createObjectStorage(sinkConfig.getBlobStorageType(), configuration); - } } diff --git a/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/oss/ObjectStorageService.java b/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/oss/ObjectStorageService.java index a981a78db..f5bad5c94 100644 --- a/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/oss/ObjectStorageService.java +++ b/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/oss/ObjectStorageService.java @@ -24,7 +24,8 @@ public class ObjectStorageService implements BlobStorage { private final OSS oss; - private final ObjectStorageServiceConfig objectStorageServiceConfig; + private final String ossBucketName; + private final String ossDirectoryPrefix; public ObjectStorageService(ObjectStorageServiceConfig objectStorageServiceConfig) { this(objectStorageServiceConfig, initializeOss(objectStorageServiceConfig)); @@ -32,7 +33,8 @@ public ObjectStorageService(ObjectStorageServiceConfig objectStorageServiceConfi public ObjectStorageService(ObjectStorageServiceConfig objectStorageServiceConfig, OSS oss) { this.oss = oss; - this.objectStorageServiceConfig = objectStorageServiceConfig; + this.ossBucketName = objectStorageServiceConfig.getOssBucketName(); + this.ossDirectoryPrefix = objectStorageServiceConfig.getOssDirectoryPrefix(); checkBucket(); } @@ -60,7 +62,7 @@ protected static OSS initializeOss(ObjectStorageServiceConfig objectStorageServi @Override public void store(String objectName, String filePath) throws BlobStorageException { PutObjectRequest putObjectRequest = new PutObjectRequest( - objectStorageServiceConfig.getOssBucketName(), + ossBucketName, buildObjectPath(objectName), new File(filePath) ); @@ -70,7 +72,7 @@ public void store(String objectName, String filePath) throws BlobStorageExceptio @Override public void store(String objectName, byte[] content) throws BlobStorageException { PutObjectRequest putObjectRequest = new PutObjectRequest( - objectStorageServiceConfig.getOssBucketName(), + ossBucketName, buildObjectPath(objectName), new ByteArrayInputStream(content) ); @@ -90,17 +92,17 @@ private void putObject(PutObjectRequest putObjectRequest) throws BlobStorageExce } private String buildObjectPath(String objectName) { - return Optional.ofNullable(objectStorageServiceConfig.getOssDirectoryPrefix()) + return Optional.ofNullable(ossDirectoryPrefix) .map(prefix -> prefix + "/" + objectName) .orElse(objectName); } private void checkBucket() { - BucketList bucketList = oss.listBuckets(new ListBucketsRequest(objectStorageServiceConfig.getOssBucketName(), + BucketList bucketList = oss.listBuckets(new ListBucketsRequest(ossBucketName, null, 1)); if (bucketList.getBucketList().isEmpty()) { - log.error("Bucket does not exist:{}", objectStorageServiceConfig.getOssBucketName()); - log.error("Please create OSS bucket before running firehose: {}", objectStorageServiceConfig.getOssBucketName()); + log.error("Bucket does not exist:{}", ossBucketName); + log.error("Please create OSS bucket before running firehose: {}", ossBucketName); throw new IllegalArgumentException("Bucket does not exist"); } } diff --git a/src/test/java/com/gotocompany/firehose/config/ObjectStorageServiceConfigTest.java b/src/test/java/com/gotocompany/firehose/config/ObjectStorageServiceConfigTest.java new file mode 100644 index 000000000..bbfb44715 --- /dev/null +++ b/src/test/java/com/gotocompany/firehose/config/ObjectStorageServiceConfigTest.java @@ -0,0 +1,296 @@ +package com.gotocompany.firehose.config; + +import org.aeonbits.owner.ConfigFactory; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class ObjectStorageServiceConfigTest { + + private Map properties; + + @Before + public void setup() { + properties = new HashMap<>(); + properties.put("OSS_TYPE", "SINK"); + } + + @Test + public void shouldGetOssEndpoint() { + properties.put("SINK_OSS_ENDPOINT", "http://oss-cn-hangzhou.aliyuncs.com"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals("http://oss-cn-hangzhou.aliyuncs.com", config.getOssEndpoint()); + } + + @Test + public void shouldGetOssRegion() { + properties.put("SINK_OSS_REGION", "cn-hangzhou"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals("cn-hangzhou", config.getOssRegion()); + } + + @Test + public void shouldGetOssAccessId() { + properties.put("SINK_OSS_ACCESS_ID", "test-access-id"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals("test-access-id", config.getOssAccessId()); + } + + @Test + public void shouldGetOssAccessKey() { + properties.put("SINK_OSS_ACCESS_KEY", "test-access-key"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals("test-access-key", config.getOssAccessKey()); + } + + @Test + public void shouldGetOssBucketName() { + properties.put("SINK_OSS_BUCKET_NAME", "test-bucket"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals("test-bucket", config.getOssBucketName()); + } + + @Test + public void shouldGetOssDirectoryPrefix() { + properties.put("SINK_OSS_DIRECTORY_PREFIX", "test/prefix"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals("test/prefix", config.getOssDirectoryPrefix()); + } + + @Test + public void shouldGetDefaultSocketTimeout() { + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals(Integer.valueOf(50000), config.getOssSocketTimeoutMs()); + } + + @Test + public void shouldOverrideDefaultSocketTimeout() { + properties.put("SINK_OSS_SOCKET_TIMEOUT_MS", "30000"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals(Integer.valueOf(30000), config.getOssSocketTimeoutMs()); + } + + @Test + public void shouldGetDefaultConnectionTimeout() { + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals(Integer.valueOf(50000), config.getOssConnectionTimeoutMs()); + } + + @Test + public void shouldOverrideDefaultConnectionTimeout() { + properties.put("SINK_OSS_CONNECTION_TIMEOUT_MS", "20000"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals(Integer.valueOf(20000), config.getOssConnectionTimeoutMs()); + } + + @Test + public void shouldGetDefaultConnectionRequestTimeout() { + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals(Integer.valueOf(-1), config.getOssConnectionRequestTimeoutMs()); + } + + @Test + public void shouldOverrideDefaultConnectionRequestTimeout() { + properties.put("SINK_OSS_CONNECTION_REQUEST_TIMEOUT_MS", "15000"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals(Integer.valueOf(15000), config.getOssConnectionRequestTimeoutMs()); + } + + @Test + public void shouldGetDefaultRequestTimeout() { + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals(Integer.valueOf(300000), config.getOssRequestTimeoutMs()); + } + + @Test + public void shouldOverrideDefaultRequestTimeout() { + properties.put("SINK_OSS_REQUEST_TIMEOUT_MS", "200000"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals(Integer.valueOf(200000), config.getOssRequestTimeoutMs()); + } + + @Test + public void shouldGetDefaultRetryEnabled() { + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertTrue(config.isRetryEnabled()); + } + + @Test + public void shouldOverrideDefaultRetryEnabled() { + properties.put("SINK_OSS_RETRY_ENABLED", "false"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertFalse(config.isRetryEnabled()); + } + + @Test + public void shouldGetDefaultMaxRetryAttempts() { + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals(3, config.getOssMaxRetryAttempts()); + } + + @Test + public void shouldOverrideDefaultMaxRetryAttempts() { + properties.put("SINK_OSS_MAX_RETRY_ATTEMPTS", "5"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals(5, config.getOssMaxRetryAttempts()); + } + + @Test + public void shouldHandleEmptyOssType() { + properties.put("OSS_TYPE", ""); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertNull(config.getOssEndpoint()); + } + + @Test + public void shouldHandleNullOssType() { + properties.remove("OSS_TYPE"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertNull(config.getOssEndpoint()); + } + + @Test + public void shouldHandleSpecialCharactersInBucketName() { + properties.put("SINK_OSS_BUCKET_NAME", "test-bucket-123_special"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals("test-bucket-123_special", config.getOssBucketName()); + } + + @Test + public void shouldHandleInternalEndpoint() { + properties.put("SINK_OSS_ENDPOINT", "http://oss-cn-hangzhou-internal.aliyuncs.com"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals("http://oss-cn-hangzhou-internal.aliyuncs.com", config.getOssEndpoint()); + } + + @Test + public void shouldHandleHttpsEndpoint() { + properties.put("SINK_OSS_ENDPOINT", "https://oss-cn-hangzhou.aliyuncs.com"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals("https://oss-cn-hangzhou.aliyuncs.com", config.getOssEndpoint()); + } + + @Test + public void shouldHandleCustomEndpoint() { + properties.put("SINK_OSS_ENDPOINT", "http://custom-domain.com"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals("http://custom-domain.com", config.getOssEndpoint()); + } + + @Test + public void shouldHandleZeroTimeouts() { + properties.put("SINK_OSS_SOCKET_TIMEOUT_MS", "0"); + properties.put("SINK_OSS_CONNECTION_TIMEOUT_MS", "0"); + properties.put("SINK_OSS_REQUEST_TIMEOUT_MS", "0"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals(Integer.valueOf(0), config.getOssSocketTimeoutMs()); + Assert.assertEquals(Integer.valueOf(0), config.getOssConnectionTimeoutMs()); + Assert.assertEquals(Integer.valueOf(0), config.getOssRequestTimeoutMs()); + } + + @Test + public void shouldHandleNegativeTimeouts() { + properties.put("SINK_OSS_SOCKET_TIMEOUT_MS", "-1"); + properties.put("SINK_OSS_CONNECTION_TIMEOUT_MS", "-1"); + properties.put("SINK_OSS_REQUEST_TIMEOUT_MS", "-1"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals(Integer.valueOf(-1), config.getOssSocketTimeoutMs()); + Assert.assertEquals(Integer.valueOf(-1), config.getOssConnectionTimeoutMs()); + Assert.assertEquals(Integer.valueOf(-1), config.getOssRequestTimeoutMs()); + } + + @Test + public void shouldHandleMaxIntegerTimeouts() { + properties.put("SINK_OSS_SOCKET_TIMEOUT_MS", String.valueOf(Integer.MAX_VALUE)); + properties.put("SINK_OSS_CONNECTION_TIMEOUT_MS", String.valueOf(Integer.MAX_VALUE)); + properties.put("SINK_OSS_REQUEST_TIMEOUT_MS", String.valueOf(Integer.MAX_VALUE)); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals(Integer.valueOf(Integer.MAX_VALUE), config.getOssSocketTimeoutMs()); + Assert.assertEquals(Integer.valueOf(Integer.MAX_VALUE), config.getOssConnectionTimeoutMs()); + Assert.assertEquals(Integer.valueOf(Integer.MAX_VALUE), config.getOssRequestTimeoutMs()); + } + + @Test + public void shouldHandleEmptyDirectoryPrefix() { + properties.put("SINK_OSS_DIRECTORY_PREFIX", ""); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals("", config.getOssDirectoryPrefix()); + } + + @Test + public void shouldHandleMultiLevelDirectoryPrefix() { + properties.put("SINK_OSS_DIRECTORY_PREFIX", "level1/level2/level3"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals("level1/level2/level3", config.getOssDirectoryPrefix()); + } + + @Test + public void shouldHandleDirectoryPrefixWithSpecialCharacters() { + properties.put("SINK_OSS_DIRECTORY_PREFIX", "test-prefix_123/special@chars"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals("test-prefix_123/special@chars", config.getOssDirectoryPrefix()); + } + + @Test + public void shouldHandleMaxRetryAttemptsZero() { + properties.put("SINK_OSS_MAX_RETRY_ATTEMPTS", "0"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals(0, config.getOssMaxRetryAttempts()); + } + + @Test + public void shouldHandleMaxRetryAttemptsNegative() { + properties.put("SINK_OSS_MAX_RETRY_ATTEMPTS", "-1"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals(-1, config.getOssMaxRetryAttempts()); + } + + @Test + public void shouldHandleMaxRetryAttemptsMaxInteger() { + properties.put("SINK_OSS_MAX_RETRY_ATTEMPTS", String.valueOf(Integer.MAX_VALUE)); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals(Integer.MAX_VALUE, config.getOssMaxRetryAttempts()); + } + + @Test + public void shouldHandleDifferentOssTypes() { + properties.put("OSS_TYPE", "DLQ"); + properties.put("DLQ_OSS_ENDPOINT", "http://oss-cn-hangzhou.aliyuncs.com"); + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + Assert.assertEquals("http://oss-cn-hangzhou.aliyuncs.com", config.getOssEndpoint()); + } + + @Test + public void shouldHandleAllConfigurationsTogether() { + properties.put("SINK_OSS_ENDPOINT", "https://oss-cn-beijing.aliyuncs.com"); + properties.put("SINK_OSS_REGION", "cn-beijing"); + properties.put("SINK_OSS_ACCESS_ID", "test-id"); + properties.put("SINK_OSS_ACCESS_KEY", "test-key"); + properties.put("SINK_OSS_BUCKET_NAME", "test-bucket"); + properties.put("SINK_OSS_DIRECTORY_PREFIX", "test/prefix"); + properties.put("SINK_OSS_SOCKET_TIMEOUT_MS", "10000"); + properties.put("SINK_OSS_CONNECTION_TIMEOUT_MS", "10000"); + properties.put("SINK_OSS_CONNECTION_REQUEST_TIMEOUT_MS", "5000"); + properties.put("SINK_OSS_REQUEST_TIMEOUT_MS", "60000"); + properties.put("SINK_OSS_RETRY_ENABLED", "true"); + properties.put("SINK_OSS_MAX_RETRY_ATTEMPTS", "5"); + + ObjectStorageServiceConfig config = ConfigFactory.create(ObjectStorageServiceConfig.class, properties); + + Assert.assertEquals("https://oss-cn-beijing.aliyuncs.com", config.getOssEndpoint()); + Assert.assertEquals("cn-beijing", config.getOssRegion()); + Assert.assertEquals("test-id", config.getOssAccessId()); + Assert.assertEquals("test-key", config.getOssAccessKey()); + Assert.assertEquals("test-bucket", config.getOssBucketName()); + Assert.assertEquals("test/prefix", config.getOssDirectoryPrefix()); + Assert.assertEquals(Integer.valueOf(10000), config.getOssSocketTimeoutMs()); + Assert.assertEquals(Integer.valueOf(10000), config.getOssConnectionTimeoutMs()); + Assert.assertEquals(Integer.valueOf(5000), config.getOssConnectionRequestTimeoutMs()); + Assert.assertEquals(Integer.valueOf(60000), config.getOssRequestTimeoutMs()); + Assert.assertTrue(config.isRetryEnabled()); + Assert.assertEquals(5, config.getOssMaxRetryAttempts()); + } +} diff --git a/src/test/java/com/gotocompany/firehose/sink/common/blobstorage/BlobStorageFactoryTest.java b/src/test/java/com/gotocompany/firehose/sink/common/blobstorage/BlobStorageFactoryTest.java new file mode 100644 index 000000000..296f5be81 --- /dev/null +++ b/src/test/java/com/gotocompany/firehose/sink/common/blobstorage/BlobStorageFactoryTest.java @@ -0,0 +1,174 @@ +package com.gotocompany.firehose.sink.common.blobstorage; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.model.Bucket; +import com.aliyun.oss.model.BucketList; +import com.aliyun.oss.model.ListBucketsRequest; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class BlobStorageFactoryTest { + + private Map validOssConfig; + + @Mock + private OSS ossClient; + + @Mock + private BucketList bucketList; + + @Before + public void setUp() { + validOssConfig = new HashMap<>(); + validOssConfig.put("OSS_TYPE_OSS_ENDPOINT", "oss-cn-hangzhou.aliyuncs.com"); + validOssConfig.put("OSS_TYPE_OSS_REGION", "cn-hangzhou"); + validOssConfig.put("OSS_TYPE_OSS_ACCESS_ID", "test-access-id"); + validOssConfig.put("OSS_TYPE_OSS_ACCESS_KEY", "test-access-key"); + validOssConfig.put("OSS_TYPE_OSS_BUCKET_NAME", "test-bucket"); + validOssConfig.put("OSS_TYPE_OSS_DIRECTORY_PREFIX", "test-prefix"); + + when(ossClient.listBuckets(any(ListBucketsRequest.class))).thenReturn(bucketList); + when(bucketList.getBucketList()).thenReturn(Collections.singletonList(mock(Bucket.class))); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenConfigIsNull() { + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, null); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenConfigIsEmpty() { + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, new HashMap<>()); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenEndpointIsMissing() { + validOssConfig.remove("OSS_TYPE_OSS_ENDPOINT"); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenRegionIsMissing() { + validOssConfig.remove("OSS_TYPE_OSS_REGION"); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenAccessIdIsMissing() { + validOssConfig.remove("OSS_TYPE_OSS_ACCESS_ID"); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenAccessKeyIsMissing() { + validOssConfig.remove("OSS_TYPE_OSS_ACCESS_KEY"); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenBucketNameIsMissing() { + validOssConfig.remove("OSS_TYPE_OSS_BUCKET_NAME"); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenEndpointIsInvalid() { + validOssConfig.put("OSS_TYPE_OSS_ENDPOINT", ""); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenAccessIdIsEmpty() { + validOssConfig.put("OSS_TYPE_OSS_ACCESS_ID", ""); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenAccessKeyIsEmpty() { + validOssConfig.put("OSS_TYPE_OSS_ACCESS_KEY", ""); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenBucketNameIsEmpty() { + validOssConfig.put("OSS_TYPE_OSS_BUCKET_NAME", ""); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenSocketTimeoutIsInvalid() { + validOssConfig.put("OSS_TYPE_OSS_SOCKET_TIMEOUT_MS", "invalid"); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenSocketTimeoutIsNegative() { + validOssConfig.put("OSS_TYPE_OSS_SOCKET_TIMEOUT_MS", "-1000"); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenConnectionTimeoutIsInvalid() { + validOssConfig.put("OSS_TYPE_OSS_CONNECTION_TIMEOUT_MS", "invalid"); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenConnectionTimeoutIsNegative() { + validOssConfig.put("OSS_TYPE_OSS_CONNECTION_TIMEOUT_MS", "-1000"); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenRequestTimeoutIsInvalid() { + validOssConfig.put("OSS_TYPE_OSS_REQUEST_TIMEOUT_MS", "invalid"); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenRequestTimeoutIsNegative() { + validOssConfig.put("OSS_TYPE_OSS_REQUEST_TIMEOUT_MS", "-1000"); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenRetryEnabledIsInvalid() { + validOssConfig.put("OSS_TYPE_OSS_RETRY_ENABLED", "invalid"); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenMaxRetryAttemptsIsInvalid() { + validOssConfig.put("OSS_TYPE_OSS_MAX_RETRY_ATTEMPTS", "invalid"); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenMaxRetryAttemptsIsNegative() { + validOssConfig.put("OSS_TYPE_OSS_MAX_RETRY_ATTEMPTS", "-1"); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenMaxRetryAttemptsIsZero() { + validOssConfig.put("OSS_TYPE_OSS_MAX_RETRY_ATTEMPTS", "0"); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionWhenRegionIsInvalid() { + validOssConfig.put("OSS_TYPE_OSS_REGION", ""); + BlobStorageFactory.createObjectStorage(BlobStorageType.OSS, validOssConfig); + } +} diff --git a/src/test/java/com/gotocompany/firehose/sink/common/blobstorage/oss/ObjectStorageServiceTest.java b/src/test/java/com/gotocompany/firehose/sink/common/blobstorage/oss/ObjectStorageServiceTest.java index 4835bbc00..66a5f67d9 100644 --- a/src/test/java/com/gotocompany/firehose/sink/common/blobstorage/oss/ObjectStorageServiceTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/common/blobstorage/oss/ObjectStorageServiceTest.java @@ -27,7 +27,8 @@ import java.util.Collections; import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; public class ObjectStorageServiceTest { @@ -101,15 +102,15 @@ public void shouldStoreObjectGivenFilePath() throws BlobStorageException { when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); OSS oss = Mockito.spy(OSS.class); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); - when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(null); + when(oss.putObject(any(PutObjectRequest.class))).thenReturn(null); BucketList bucketList = new BucketList(); bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); - when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + when(oss.listBuckets(any(ListBucketsRequest.class))).thenReturn(bucketList); ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); objectStorageService.store("objectName", "filePath"); - Mockito.verify(oss, Mockito.times(1)) + verify(oss, times(1)) .putObject(argumentCaptor.capture()); assertEquals("bucket_name", argumentCaptor.getValue().getBucketName()); assertEquals("dir_prefix/objectName", argumentCaptor.getValue().getKey()); @@ -126,16 +127,16 @@ public void shouldStoreObjectGivenFileContent() throws BlobStorageException, IOE when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); OSS oss = Mockito.spy(OSS.class); - when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(null); + when(oss.putObject(any(PutObjectRequest.class))).thenReturn(null); BucketList bucketList = new BucketList(); bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); - when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + when(oss.listBuckets(any(ListBucketsRequest.class))).thenReturn(bucketList); ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); objectStorageService.store("objectName", "content".getBytes()); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); - Mockito.verify(oss, Mockito.times(1)) + verify(oss, times(1)) .putObject(argumentCaptor.capture()); assertEquals("bucket_name", argumentCaptor.getValue().getBucketName()); assertEquals("dir_prefix/objectName", argumentCaptor.getValue().getKey()); @@ -155,7 +156,7 @@ public void shouldThrowIllegalArgumentExceptionWhenGivenBucketIsNotExists() thro OSS oss = Mockito.spy(OSS.class); BucketList bucketList = new BucketList(); bucketList.setBucketList(new ArrayList<>()); - when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + when(oss.listBuckets(any(ListBucketsRequest.class))).thenReturn(bucketList); ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); objectStorageService.store("objectName", "content".getBytes()); @@ -171,10 +172,10 @@ public void shouldWrapToBlobStorageExceptionWhenClientExceptionIsThrown() throws when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); OSS oss = Mockito.spy(OSS.class); - when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new ClientException("client_error")); + when(oss.putObject(any(PutObjectRequest.class))).thenThrow(new ClientException("client_error")); BucketList bucketList = new BucketList(); bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); - when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + when(oss.listBuckets(any(ListBucketsRequest.class))).thenReturn(bucketList); ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); objectStorageService.store("objectName", "content".getBytes()); @@ -190,10 +191,10 @@ public void shouldWrapToBlobStorageExceptionWhenOSSExceptionIsThrown() throws Bl when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); OSS oss = Mockito.spy(OSS.class); - when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new OSSException("server is down")); + when(oss.putObject(any(PutObjectRequest.class))).thenThrow(new OSSException("server is down")); BucketList bucketList = new BucketList(); bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); - when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + when(oss.listBuckets(any(ListBucketsRequest.class))).thenReturn(bucketList); ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); objectStorageService.store("objectName", "content".getBytes()); @@ -209,4 +210,230 @@ private static String getContent(InputStream inputStream) throws IOException { return result.toString("UTF-8"); } + @Test + public void shouldHandleLargeFileUpload() throws BlobStorageException { + ObjectStorageServiceConfig config = Mockito.mock(ObjectStorageServiceConfig.class); + when(config.getOssBucketName()).thenReturn("bucket_name"); + OSS oss = Mockito.spy(OSS.class); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + when(oss.listBuckets(any())).thenReturn(bucketList); + ObjectStorageService service = new ObjectStorageService(config, oss); + + byte[] largeContent = new byte[10 * 1024 * 1024]; + service.store("large_file.dat", largeContent); + + verify(oss).putObject(any(PutObjectRequest.class)); + } + + @Test(expected = BlobStorageException.class) + public void shouldHandleNetworkDisconnection() throws BlobStorageException { + ObjectStorageServiceConfig config = Mockito.mock(ObjectStorageServiceConfig.class); + when(config.getOssBucketName()).thenReturn("bucket_name"); + OSS oss = Mockito.spy(OSS.class); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + when(oss.listBuckets(any())).thenReturn(bucketList); + when(oss.putObject(any())).thenThrow(new ClientException("Network disconnected")); + ObjectStorageService service = new ObjectStorageService(config, oss); + + service.store("test.txt", "content".getBytes()); + } + + @Test(expected = BlobStorageException.class) + public void shouldHandleInvalidCredentials() throws BlobStorageException { + ObjectStorageServiceConfig config = Mockito.mock(ObjectStorageServiceConfig.class); + when(config.getOssBucketName()).thenReturn("bucket_name"); + OSS oss = Mockito.spy(OSS.class); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + when(oss.listBuckets(any())).thenReturn(bucketList); + when(oss.putObject(any())).thenThrow(new OSSException("InvalidAccessKeyId")); + ObjectStorageService service = new ObjectStorageService(config, oss); + + service.store("test.txt", "content".getBytes()); + } + + @Test(expected = BlobStorageException.class) + public void shouldHandleRateLimitExceeded() throws BlobStorageException { + ObjectStorageServiceConfig config = Mockito.mock(ObjectStorageServiceConfig.class); + when(config.getOssBucketName()).thenReturn("bucket_name"); + OSS oss = Mockito.spy(OSS.class); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + when(oss.listBuckets(any())).thenReturn(bucketList); + when(oss.putObject(any())).thenThrow(new OSSException("RequestTimeTooSkewed")); + ObjectStorageService service = new ObjectStorageService(config, oss); + + service.store("test.txt", "content".getBytes()); + } + + @Test + public void shouldHandleEmptyFile() throws BlobStorageException { + ObjectStorageServiceConfig config = Mockito.mock(ObjectStorageServiceConfig.class); + when(config.getOssBucketName()).thenReturn("bucket_name"); + OSS oss = Mockito.spy(OSS.class); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + when(oss.listBuckets(any())).thenReturn(bucketList); + ObjectStorageService service = new ObjectStorageService(config, oss); + + service.store("empty.txt", new byte[0]); + + verify(oss).putObject(any(PutObjectRequest.class)); + } + + @Test + public void shouldHandleMultipleDirectoryLevels() throws BlobStorageException { + ObjectStorageServiceConfig config = Mockito.mock(ObjectStorageServiceConfig.class); + when(config.getOssBucketName()).thenReturn("bucket_name"); + when(config.getOssDirectoryPrefix()).thenReturn("level1/level2/level3"); + OSS oss = Mockito.spy(OSS.class); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + when(oss.listBuckets(any())).thenReturn(bucketList); + ObjectStorageService service = new ObjectStorageService(config, oss); + + service.store("test.txt", "content".getBytes()); + + ArgumentCaptor captor = ArgumentCaptor.forClass(PutObjectRequest.class); + verify(oss).putObject(captor.capture()); + assertEquals("level1/level2/level3/test.txt", captor.getValue().getKey()); + } + + + @Test(expected = BlobStorageException.class) + public void shouldHandleServerSideError() throws BlobStorageException { + ObjectStorageServiceConfig config = Mockito.mock(ObjectStorageServiceConfig.class); + when(config.getOssBucketName()).thenReturn("bucket_name"); + OSS oss = Mockito.spy(OSS.class); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + when(oss.listBuckets(any())).thenReturn(bucketList); + when(oss.putObject(any())).thenThrow(new OSSException("InternalError")); + ObjectStorageService service = new ObjectStorageService(config, oss); + + service.store("test.txt", "content".getBytes()); + } + + @Test(expected = BlobStorageException.class) + public void shouldHandleBucketPermissionDenied() throws BlobStorageException { + ObjectStorageServiceConfig config = Mockito.mock(ObjectStorageServiceConfig.class); + when(config.getOssBucketName()).thenReturn("bucket_name"); + OSS oss = Mockito.spy(OSS.class); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + when(oss.listBuckets(Mockito.any())).thenReturn(bucketList); + when(oss.putObject(any())).thenThrow(new OSSException("AccessDenied")); + ObjectStorageService service = new ObjectStorageService(config, oss); + + service.store("test.txt", "content".getBytes()); + } + + @Test(expected = BlobStorageException.class) + public void shouldHandleBucketNotFound() throws BlobStorageException { + ObjectStorageServiceConfig config = Mockito.mock(ObjectStorageServiceConfig.class); + when(config.getOssBucketName()).thenReturn("nonexistent_bucket"); + OSS oss = Mockito.spy(OSS.class); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + when(oss.listBuckets(Mockito.any())).thenReturn(bucketList); + when(oss.putObject(any())).thenThrow(new OSSException("NoSuchBucket")); + ObjectStorageService service = new ObjectStorageService(config, oss); + + service.store("test.txt", "content".getBytes()); + } + + @Test + public void shouldHandleVeryLongObjectNames() throws BlobStorageException { + ObjectStorageServiceConfig config = Mockito.mock(ObjectStorageServiceConfig.class); + when(config.getOssBucketName()).thenReturn("bucket_name"); + OSS oss = Mockito.spy(OSS.class); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + when(oss.listBuckets(Mockito.any())).thenReturn(bucketList); + ObjectStorageService service = new ObjectStorageService(config, oss); + + StringBuilder longName = new StringBuilder(); + for (int i = 0; i < 100; i++) { + longName.append("very_long_name_"); + } + + service.store(longName.toString(), "content".getBytes()); + + verify(oss).putObject(any(PutObjectRequest.class)); + } + + @Test(expected = BlobStorageException.class) + public void shouldHandleQuotaExceeded() throws BlobStorageException { + ObjectStorageServiceConfig config = Mockito.mock(ObjectStorageServiceConfig.class); + when(config.getOssBucketName()).thenReturn("bucket_name"); + OSS oss = Mockito.spy(OSS.class); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + when(oss.listBuckets(Mockito.any())).thenReturn(bucketList); + when(oss.putObject(any())).thenThrow(new OSSException("QuotaExceeded")); + ObjectStorageService service = new ObjectStorageService(config, oss); + + service.store("test.txt", "content".getBytes()); + } + + @Test(expected = BlobStorageException.class) + public void shouldHandleEntityTooLarge() throws BlobStorageException { + ObjectStorageServiceConfig config = Mockito.mock(ObjectStorageServiceConfig.class); + when(config.getOssBucketName()).thenReturn("bucket_name"); + OSS oss = Mockito.spy(OSS.class); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + when(oss.listBuckets(Mockito.any())).thenReturn(bucketList); + when(oss.putObject(any())).thenThrow(new OSSException("EntityTooLarge")); + ObjectStorageService service = new ObjectStorageService(config, oss); + + service.store("test.txt", "content".getBytes()); + } + + @Test + public void shouldHandleWhitespaceInObjectName() throws BlobStorageException { + ObjectStorageServiceConfig config = Mockito.mock(ObjectStorageServiceConfig.class); + when(config.getOssBucketName()).thenReturn("bucket_name"); + OSS oss = Mockito.spy(OSS.class); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + when(oss.listBuckets(Mockito.any())).thenReturn(bucketList); + ObjectStorageService service = new ObjectStorageService(config, oss); + + service.store("file with spaces.txt", "content".getBytes()); + + ArgumentCaptor captor = ArgumentCaptor.forClass(PutObjectRequest.class); + verify(oss).putObject(captor.capture()); + assertEquals("file with spaces.txt", captor.getValue().getKey()); + } + + @Test(expected = BlobStorageException.class) + public void shouldHandleSignatureDoesNotMatch() throws BlobStorageException { + ObjectStorageServiceConfig config = Mockito.mock(ObjectStorageServiceConfig.class); + when(config.getOssBucketName()).thenReturn("bucket_name"); + OSS oss = Mockito.spy(OSS.class); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + when(oss.listBuckets(Mockito.any())).thenReturn(bucketList); + when(oss.putObject(any())).thenThrow(new OSSException("SignatureDoesNotMatch")); + ObjectStorageService service = new ObjectStorageService(config, oss); + + service.store("test.txt", "content".getBytes()); + } + + @Test(expected = BlobStorageException.class) + public void shouldHandleRequestTimeout() throws BlobStorageException { + ObjectStorageServiceConfig config = Mockito.mock(ObjectStorageServiceConfig.class); + when(config.getOssBucketName()).thenReturn("bucket_name"); + OSS oss = Mockito.spy(OSS.class); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + when(oss.listBuckets(Mockito.any())).thenReturn(bucketList); + when(oss.putObject(any())).thenThrow(new ClientException("RequestTimeout")); + ObjectStorageService service = new ObjectStorageService(config, oss); + + service.store("test.txt", "content".getBytes()); + } }