From 4d1e9d67491cd8ecaddd0d7657b5916ba8543e3f Mon Sep 17 00:00:00 2001 From: rajuGT Date: Thu, 23 Jan 2025 12:46:25 +0530 Subject: [PATCH 1/8] Add gradle tasks to minimal and dependencies to maven local (#40) Co-authored-by: Raju G T --- dagger-core/build.gradle | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/dagger-core/build.gradle b/dagger-core/build.gradle index 0dee7e59e..9f020b2b4 100644 --- a/dagger-core/build.gradle +++ b/dagger-core/build.gradle @@ -273,18 +273,27 @@ task minimalPublish(dependsOn: 'minimalJar') { description('Publishes minimal jar') } +task minimalPublishToMavenLocal(dependsOn: 'minimalJar') { + dependsOn('publishMinimalArtifactPublicationToMavenLocal') + description('Publishes minimal jar to Maven Local') +} + task dependenciesPublish(dependsOn: 'dependenciesJar') { dependsOn('publishDependenciesArtifactPublicationToGitHubPackagesRepository') description('Publishes dependencies jar') } +task dependenciesPublishToMavenLocal(dependsOn: 'dependenciesJar') { + dependsOn('publishDependenciesArtifactPublicationToMavenLocal') + description('Publishes dependencies jar to Maven Local') +} + project.afterEvaluate { tasks.withType(PublishToMavenLocal) { dependsOn minimalJar, dependenciesJar } } - task runFlink(type: JavaExec, dependsOn: classes) { Properties properties = loadEnv() systemProperties['ConfigSource'] = "ENVIRONMENT" @@ -297,4 +306,3 @@ task runFlink(type: JavaExec, dependsOn: classes) { classpath = sourceSets.main.runtimeClasspath environment properties } - From c6a98540ad30a972874ef124a817d6dd2d4a3405 Mon Sep 17 00:00:00 2001 From: rajuGT Date: Thu, 23 Jan 2025 12:59:35 +0530 Subject: [PATCH 2/8] Add capability to dagger to read python udfs from Ali(oss) and Tencent(cosn) storage services (#42) * Add gradle tasks to minimal and dependencies to maven local * Add capability to dagger to read python udfs from Ali(oss) and Tencent(cosn) storage services Given the configuration provided correctly. Set the below environment variables accordingly to access the files stored in the respective bucket. Ali(oss) - OSS_ACCESS_KEY_ID - OSS_ACCESS_KEY_SECRET Tencent(cos) - COS_SECRET_ID - COS_SECRET_KEY - COS_REGION * OSS client endpoint should be configurable via ENV variable * Fix checkstyle and made constants as static variables --------- Co-authored-by: Raju G T Co-authored-by: rajuGT --- dagger-functions/build.gradle | 2 + .../python/file/source/FileSourceFactory.java | 6 ++ .../python/file/source/cos/CosClient.java | 65 +++++++++++++++++++ .../python/file/source/cos/CosFileSource.java | 47 ++++++++++++++ .../python/file/source/oss/OssClient.java | 65 +++++++++++++++++++ .../python/file/source/oss/OssFileSource.java | 47 ++++++++++++++ .../file/source/FileSourceFactoryTest.java | 20 ++++++ .../python/file/source/cos/CosClientTest.java | 52 +++++++++++++++ .../file/source/cos/CosFileSourceTest.java | 38 +++++++++++ .../python/file/source/oss/OssClientTest.java | 48 ++++++++++++++ .../file/source/oss/OssFileSourceTest.java | 38 +++++++++++ 11 files changed, 428 insertions(+) create mode 100644 dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosClient.java create mode 100644 dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosFileSource.java create mode 100644 dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssClient.java create mode 100644 dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssFileSource.java create mode 100644 dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosClientTest.java create mode 100644 dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosFileSourceTest.java create mode 100644 dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssClientTest.java create mode 100644 dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssFileSourceTest.java diff --git a/dagger-functions/build.gradle b/dagger-functions/build.gradle index a14474508..d85f905ca 100644 --- a/dagger-functions/build.gradle +++ b/dagger-functions/build.gradle @@ -64,6 +64,8 @@ dependencies { dependenciesFunctionsJar group: 'org.apache.commons', name: 'commons-jexl3', version: '3.1' dependenciesFunctionsJar group: 'org.isuper', name: 's2-geometry-library-java', version: '0.0.1' dependenciesFunctionsJar group: 'com.google.cloud', name: 'google-cloud-storage', version: '2.23.0' + dependenciesFunctionsJar group: 'com.aliyun.oss', name: 'aliyun-sdk-oss', version: '3.18.1' + dependenciesFunctionsJar group: 'com.qcloud', name: 'cos_api', version: '5.6.227' testImplementation project(':dagger-common').sourceSets.test.output testImplementation group: 'junit', name: 'junit', version: '4.12' diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/FileSourceFactory.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/FileSourceFactory.java index 60f1fe94a..9aab9b500 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/FileSourceFactory.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/FileSourceFactory.java @@ -1,7 +1,9 @@ package com.gotocompany.dagger.functions.udfs.python.file.source; +import com.gotocompany.dagger.functions.udfs.python.file.source.cos.CosFileSource; import com.gotocompany.dagger.functions.udfs.python.file.source.gcs.GcsFileSource; import com.gotocompany.dagger.functions.udfs.python.file.source.local.LocalFileSource; +import com.gotocompany.dagger.functions.udfs.python.file.source.oss.OssFileSource; /** * The type File source factory. @@ -17,6 +19,10 @@ public class FileSourceFactory { public static FileSource getFileSource(String pythonFile) { if ("GS".equals(getFileSourcePrefix(pythonFile))) { return new GcsFileSource(pythonFile); + } else if ("OSS".equals(getFileSourcePrefix(pythonFile))) { + return new OssFileSource(pythonFile); + } else if ("COSN".equals(getFileSourcePrefix(pythonFile))) { + return new CosFileSource(pythonFile); } else { return new LocalFileSource(pythonFile); } diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosClient.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosClient.java new file mode 100644 index 000000000..508212203 --- /dev/null +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosClient.java @@ -0,0 +1,65 @@ +package com.gotocompany.dagger.functions.udfs.python.file.source.cos; + +import com.qcloud.cos.COSClient; +import com.qcloud.cos.ClientConfig; +import com.qcloud.cos.auth.BasicCOSCredentials; +import com.qcloud.cos.auth.COSCredentials; +import com.qcloud.cos.model.COSObject; +import com.qcloud.cos.model.COSObjectInputStream; +import com.qcloud.cos.region.Region; +import com.qcloud.cos.utils.IOUtils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class CosClient { + + private static final String ENV_COS_SECRET_ID = "COS_SECRET_ID"; + private static final String ENV_COS_SECRET_KEY = "COS_SECRET_KEY"; + private static final String ENV_COS_REGION = "COS_REGION"; + + private final COSClient libCosClient; + + /** + * Instantiates a new Cos client. + */ + public CosClient() { + String secretID = System.getenv(ENV_COS_SECRET_ID); + String secretKey = System.getenv(ENV_COS_SECRET_KEY); + String region = System.getenv(ENV_COS_REGION); // ap-singapore + + COSCredentials credentials = new BasicCOSCredentials(secretID, secretKey); + ClientConfig clientConfig = new ClientConfig(new Region(region)); + libCosClient = new COSClient(credentials, clientConfig); + } + + /** + * Instantiates a new Cos client. + * This constructor used for unit test purposes. + * + * @param libCosClient the storage + */ + public CosClient(COSClient libCosClient) { + this.libCosClient = libCosClient; + } + + /** + * Get file byte [ ]. + * + * @param pythonFile the python file + * @return the byte [ ] + */ + public byte[] getFile(String pythonFile) throws IOException { + List file = Arrays.asList(pythonFile.replace("cosn://", "").split("/")); + + String bucketName = file.get(0); + String objectName = file.stream().skip(1).collect(Collectors.joining("/")); + + COSObject cosObject = libCosClient.getObject(bucketName, objectName); + try (COSObjectInputStream inputStream = cosObject.getObjectContent()) { + return IOUtils.toByteArray(inputStream); + } + } +} diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosFileSource.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosFileSource.java new file mode 100644 index 000000000..8f1795fb0 --- /dev/null +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosFileSource.java @@ -0,0 +1,47 @@ +package com.gotocompany.dagger.functions.udfs.python.file.source.cos; + +import com.gotocompany.dagger.functions.udfs.python.file.source.FileSource; + +import java.io.IOException; + +public class CosFileSource implements FileSource { + + private CosClient cosClient; + private final String pythonFile; + + /** + * Instantiates a new Cos file source. + * + * @param pythonFile the python file + */ + public CosFileSource(String pythonFile) { + this.pythonFile = pythonFile; + } + + /** + * Instantiates a new Cos file source. + * + * @param pythonFile the python file + */ + public CosFileSource(String pythonFile, CosClient cosClient) { + this.pythonFile = pythonFile; + this.cosClient = cosClient; + } + + @Override + public byte[] getObjectFile() throws IOException { + return getCosClient().getFile(pythonFile); + } + + /** + * Gets cos client. + * + * @return the cos client + */ + private CosClient getCosClient() { + if (this.cosClient == null) { + this.cosClient = new CosClient(); + } + return this.cosClient; + } +} diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssClient.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssClient.java new file mode 100644 index 000000000..a371242e2 --- /dev/null +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssClient.java @@ -0,0 +1,65 @@ +package com.gotocompany.dagger.functions.udfs.python.file.source.oss; + +import com.aliyun.core.utils.IOUtils; +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; +import com.aliyun.oss.common.auth.CredentialsProviderFactory; +import com.aliyun.oss.model.OSSObject; +import com.aliyuncs.exceptions.ClientException; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class OssClient { + + private static final String ENV_OSS_ENDPOINT = "OSS_ENDPOINT"; + private static final String DEFAULT_OSS_ENDPOINT = "oss-ap-southeast-1.aliyuncs.com"; + + private final OSS libOssClient; + + /** + * Instantiates a new Oss client. + */ + public OssClient() { + String endpoint = System.getenv(ENV_OSS_ENDPOINT); + if (endpoint == null || endpoint.isEmpty()) { + endpoint = DEFAULT_OSS_ENDPOINT; + } + try { + libOssClient = new OSSClientBuilder().build(endpoint, CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider()); + } catch (ClientException e) { + throw new RuntimeException("failed to initialise oss client", e); + } + } + + /** + * Instantiates a new OSS client. + * This constructor used for unit test purposes. + * + * @param libOssClient the storage + */ + public OssClient(OSS libOssClient) { + this.libOssClient = libOssClient; + } + + /** + * Get file byte [ ]. + * + * @param pythonFile the python file + * @return the byte [ ] + */ + public byte[] getFile(String pythonFile) throws IOException { + List file = Arrays.asList(pythonFile.replace("oss://", "").split("/")); + + String bucketName = file.get(0); + String objectName = file.stream().skip(1).collect(Collectors.joining("/")); + + OSSObject ossObject = libOssClient.getObject(bucketName, objectName); + try (InputStream inputStream = ossObject.getObjectContent()) { + return IOUtils.toByteArray(inputStream); + } + } +} diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssFileSource.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssFileSource.java new file mode 100644 index 000000000..cc183005b --- /dev/null +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssFileSource.java @@ -0,0 +1,47 @@ +package com.gotocompany.dagger.functions.udfs.python.file.source.oss; + +import com.gotocompany.dagger.functions.udfs.python.file.source.FileSource; + +import java.io.IOException; + +public class OssFileSource implements FileSource { + + private OssClient ossClient; + private final String pythonFile; + + /** + * Instantiates a new Oss file source. + * + * @param pythonFile the python file + */ + public OssFileSource(String pythonFile) { + this.pythonFile = pythonFile; + } + + /** + * Instantiates a new Oss file source. + * + * @param pythonFile the python file + */ + public OssFileSource(String pythonFile, OssClient ossClient) { + this.pythonFile = pythonFile; + this.ossClient = ossClient; + } + + @Override + public byte[] getObjectFile() throws IOException { + return getOssClient().getFile(pythonFile); + } + + /** + * Gets oss client. + * + * @return the oss client + */ + private OssClient getOssClient() { + if (this.ossClient == null) { + this.ossClient = new OssClient(); + } + return this.ossClient; + } +} diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/FileSourceFactoryTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/FileSourceFactoryTest.java index 9bcaadbf0..4dbfbb81a 100644 --- a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/FileSourceFactoryTest.java +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/FileSourceFactoryTest.java @@ -1,7 +1,9 @@ package com.gotocompany.dagger.functions.udfs.python.file.source; +import com.gotocompany.dagger.functions.udfs.python.file.source.cos.CosFileSource; import com.gotocompany.dagger.functions.udfs.python.file.source.gcs.GcsFileSource; import com.gotocompany.dagger.functions.udfs.python.file.source.local.LocalFileSource; +import com.gotocompany.dagger.functions.udfs.python.file.source.oss.OssFileSource; import org.junit.Assert; import org.junit.Test; @@ -24,4 +26,22 @@ public void shouldGetGcsFileSource() { Assert.assertTrue(fileSource instanceof GcsFileSource); } + + @Test + public void shouldGetOssFileSource() { + String pythonFile = "oss://bucket-name/path/to/file/test_function.py"; + + FileSource fileSource = FileSourceFactory.getFileSource(pythonFile); + + Assert.assertTrue(fileSource instanceof OssFileSource); + } + + @Test + public void shouldGetCosnFileSource() { + String pythonFile = "cosn://bucket-name/path/to/file/test_function.py"; + + FileSource fileSource = FileSourceFactory.getFileSource(pythonFile); + + Assert.assertTrue(fileSource instanceof CosFileSource); + } } diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosClientTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosClientTest.java new file mode 100644 index 000000000..aad1cdb43 --- /dev/null +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosClientTest.java @@ -0,0 +1,52 @@ +package com.gotocompany.dagger.functions.udfs.python.file.source.cos; + +import com.qcloud.cos.COSClient; +import com.qcloud.cos.model.COSObject; +import com.qcloud.cos.model.COSObjectInputStream; +import org.apache.http.client.methods.HttpRequestBase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Arrays; + +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; + +public class CosClientTest { + + @Mock + private COSClient libCosClient; + + @Mock + private COSObject cosObject; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void shouldGetObjectFile() throws IOException { + HttpRequestBase mockRequest = Mockito.mock(HttpRequestBase.class); + + String pythonFile = "cosn://bucket_name/path/to/file/python_udf.zip"; + String bucketName = "bucket_name"; + String objectName = "path/to/file/python_udf.zip"; + String expectedValue = Arrays.toString("objectFile".getBytes()); + + when(libCosClient.getObject(bucketName, objectName)).thenReturn(cosObject); + when(cosObject.getObjectContent()).thenReturn(new COSObjectInputStream(new ByteArrayInputStream("objectFile".getBytes()), mockRequest)); + + CosClient cosClient = new CosClient(libCosClient); + byte[] actualValue = cosClient.getFile(pythonFile); + + verify(libCosClient, times(1)).getObject(bucketName, objectName); + verify(cosObject, times(1)).getObjectContent(); + Assert.assertEquals(expectedValue, Arrays.toString(actualValue)); + } +} diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosFileSourceTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosFileSourceTest.java new file mode 100644 index 000000000..f0700a4bb --- /dev/null +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosFileSourceTest.java @@ -0,0 +1,38 @@ +package com.gotocompany.dagger.functions.udfs.python.file.source.cos; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +public class CosFileSourceTest { + + @Mock + private CosClient cosClient; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void shouldGetObjectFile() throws IOException { + ClassLoader classLoader = getClass().getClassLoader(); + String pythonFile = classLoader.getResource("python_udf.zip").getFile(); + byte[] expectedObject = Files.readAllBytes(Paths.get(pythonFile)); + + when(cosClient.getFile(pythonFile)).thenReturn(expectedObject); + CosFileSource cosFileSource = new CosFileSource(pythonFile, cosClient); + + byte[] actualObject = cosFileSource.getObjectFile(); + + Assert.assertEquals(expectedObject, actualObject); + } +} diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssClientTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssClientTest.java new file mode 100644 index 000000000..f89b4fcc5 --- /dev/null +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssClientTest.java @@ -0,0 +1,48 @@ +package com.gotocompany.dagger.functions.udfs.python.file.source.oss; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.model.OSSObject; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Arrays; + +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; + +public class OssClientTest { + + @Mock + private OSS libOSSClient; + + @Mock + private OSSObject ossObject; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void shouldGetObjectFile() throws IOException { + + String pythonFile = "oss://bucket_name/path/to/file/python_udf.zip"; + String bucketName = "bucket_name"; + String objectName = "path/to/file/python_udf.zip"; + String expectedValue = Arrays.toString("objectFile".getBytes()); + + when(libOSSClient.getObject(bucketName, objectName)).thenReturn(ossObject); + when(ossObject.getObjectContent()).thenReturn(new ByteArrayInputStream("objectFile".getBytes())); + + OssClient ossClient = new OssClient(libOSSClient); + byte[] actualValue = ossClient.getFile(pythonFile); + + verify(libOSSClient, times(1)).getObject(bucketName, objectName); + verify(ossObject, times(1)).getObjectContent(); + Assert.assertEquals(expectedValue, Arrays.toString(actualValue)); + } +} diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssFileSourceTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssFileSourceTest.java new file mode 100644 index 000000000..4b7b444ac --- /dev/null +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssFileSourceTest.java @@ -0,0 +1,38 @@ +package com.gotocompany.dagger.functions.udfs.python.file.source.oss; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +public class OssFileSourceTest { + + @Mock + private OssClient ossClient; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void shouldGetObjectFile() throws IOException { + ClassLoader classLoader = getClass().getClassLoader(); + String pythonFile = classLoader.getResource("python_udf.zip").getFile(); + byte[] expectedObject = Files.readAllBytes(Paths.get(pythonFile)); + + when(ossClient.getFile(pythonFile)).thenReturn(expectedObject); + OssFileSource ossFileSource = new OssFileSource(pythonFile, ossClient); + + byte[] actualObject = ossFileSource.getObjectFile(); + + Assert.assertEquals(expectedObject, actualObject); + } +} From a4663b6164bce10d1d006f27346e6906d978a5df Mon Sep 17 00:00:00 2001 From: rajuGT Date: Thu, 23 Jan 2025 13:13:08 +0530 Subject: [PATCH 3/8] COS filesystem high availability support (#43) * Add gradle tasks to minimal and dependencies to maven local * Add capability to dagger to read python udfs from Ali(oss) and Tencent(cosn) storage services Given the configuration provided correctly. Set the below environment variables accordingly to access the files stored in the respective bucket. Ali(oss) - OSS_ACCESS_KEY_ID - OSS_ACCESS_KEY_SECRET Tencent(cos) - COS_SECRET_ID - COS_SECRET_KEY - COS_REGION * OSS client endpoint should be configurable via ENV variable * COS filesystem high availability support If you need to use COS filesystem for the dagger, provide the cos bucket/key configuration in the state.backend.fs.checkpointdir, state.savepoints.dir, high-availability.storageDir to flinkdeployment manifest. If the filesystem protocol begins with cosn for the above configurations, dagger uses the below configurations provided in the flinkdeployment manifest file. fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN fs.cosn.userinfo.secretId: fs.cosn.userinfo.secretKey: fs.cosn.bucket.region: fs.cosn.bucket.endpoint_suffix: * Fix checkstyle and made constants as static variables --------- Co-authored-by: Raju G T Co-authored-by: rajuGT --- dagger-common/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/dagger-common/build.gradle b/dagger-common/build.gradle index 00b4d3c48..1be353e23 100644 --- a/dagger-common/build.gradle +++ b/dagger-common/build.gradle @@ -65,6 +65,7 @@ dependencies { exclude module: "commons-compress" } dependenciesCommonJar group: 'com.google.cloud.bigdataoss', name: 'gcs-connector', version: 'hadoop2-2.2.16' + dependenciesCommonJar group: 'com.qcloud.cos', name: 'flink-cos-fs-hadoop', version: '1.10.0-0.1.10' dependenciesCommonJar 'org.apache.flink:flink-metrics-dropwizard:' + flinkVersion dependenciesCommonJar 'org.apache.flink:flink-json:' + flinkVersion dependenciesCommonJar 'com.jayway.jsonpath:json-path:2.4.0' From dc18103c0ae4b91ed27d95f9a1e1f37b328df3d6 Mon Sep 17 00:00:00 2001 From: rajuGT Date: Thu, 23 Jan 2025 13:22:37 +0530 Subject: [PATCH 4/8] Refactor Dart Feature to plug other object storage service providers (#44) * Add gradle tasks to minimal and dependencies to maven local * Add capability to dagger to read python udfs from Ali(oss) and Tencent(cosn) storage services Given the configuration provided correctly. Set the below environment variables accordingly to access the files stored in the respective bucket. Ali(oss) - OSS_ACCESS_KEY_ID - OSS_ACCESS_KEY_SECRET Tencent(cos) - COS_SECRET_ID - COS_SECRET_KEY - COS_REGION * OSS client endpoint should be configurable via ENV variable * COS filesystem high availability support If you need to use COS filesystem for the dagger, provide the cos bucket/key configuration in the state.backend.fs.checkpointdir, state.savepoints.dir, high-availability.storageDir to flinkdeployment manifest. If the filesystem protocol begins with cosn for the above configurations, dagger uses the below configurations provided in the flinkdeployment manifest file. fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN fs.cosn.userinfo.secretId: fs.cosn.userinfo.secretKey: fs.cosn.bucket.region: fs.cosn.bucket.endpoint_suffix: * Fix checkstyle and made constants as static variables * Refactor Dart Feature to plug other object storage service providers * test checkstyle fix --------- Co-authored-by: Raju G T Co-authored-by: rajuGT --- .../dagger/common/udfs/ScalarUdf.java | 5 + .../udfs/factories/FunctionFactory.java | 17 +-- .../functions/udfs/scalar/DartContains.java | 44 ++----- .../dagger/functions/udfs/scalar/DartGet.java | 43 ++----- .../udfs/scalar/dart/DartScalarUdf.java | 23 ++++ .../{DataStore.java => DartDataStore.java} | 8 +- ...csDataStore.java => GcsDartDataStore.java} | 72 +++-------- .../udfs/scalar/DartContainsTest.java | 116 ++++++++---------- .../functions/udfs/scalar/DartGetTest.java | 52 ++++---- ...oreTest.java => GcsDartDataStoreTest.java} | 35 ++---- 10 files changed, 164 insertions(+), 251 deletions(-) create mode 100644 dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/DartScalarUdf.java rename dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/{DataStore.java => DartDataStore.java} (52%) rename dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/{GcsDataStore.java => GcsDartDataStore.java} (60%) rename dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/{GcsDataStoreTest.java => GcsDartDataStoreTest.java} (80%) diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/udfs/ScalarUdf.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/udfs/ScalarUdf.java index 1fffb084e..9256d482c 100644 --- a/dagger-common/src/main/java/com/gotocompany/dagger/common/udfs/ScalarUdf.java +++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/udfs/ScalarUdf.java @@ -38,4 +38,9 @@ public String getName() { public GaugeStatsManager getGaugeStatsManager() { return gaugeStatsManager; } + + // For testing purpose only + public void setGaugeStatsManager(GaugeStatsManager gaugeStatsManager) { + this.gaugeStatsManager = gaugeStatsManager; + } } diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java index 963cccf27..41e82f7ee 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java @@ -1,6 +1,8 @@ package com.gotocompany.dagger.functions.udfs.factories; import com.gotocompany.dagger.functions.common.Constants; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartDataStore; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import com.google.gson.Gson; @@ -79,8 +81,9 @@ public FunctionFactory(StreamTableEnvironment streamTableEnvironment, Configurat @Override public HashSet getScalarUdfs() { HashSet scalarUdfs = new HashSet<>(); - scalarUdfs.add(DartContains.withGcsDataStore(getGcsProjectId(), getGcsBucketId())); - scalarUdfs.add(DartGet.withGcsDataStore(getGcsProjectId(), getGcsBucketId())); + DartDataStore dartDataSource = getDartDataSource(); + scalarUdfs.add(new DartContains(dartDataSource)); + scalarUdfs.add(new DartGet(dartDataSource)); scalarUdfs.add(new Distance()); scalarUdfs.add(new ElementAt(getProtosInInputStreams(), stencilClientOrchestrator)); scalarUdfs.add(new EndOfMonth()); @@ -130,12 +133,10 @@ public HashSet getAggregateUdfs() { return aggregateUdfs; } - private String getGcsProjectId() { - return getConfiguration().getString(Constants.UDF_DART_GCS_PROJECT_ID_KEY, Constants.UDF_DART_GCS_PROJECT_ID_DEFAULT); - } - - private String getGcsBucketId() { - return getConfiguration().getString(Constants.UDF_DART_GCS_BUCKET_ID_KEY, Constants.UDF_DART_GCS_BUCKET_ID_DEFAULT); + private DartDataStore getDartDataSource() { + String projectID = getConfiguration().getString(Constants.UDF_DART_GCS_PROJECT_ID_KEY, Constants.UDF_DART_GCS_PROJECT_ID_DEFAULT); + String bucketID = getConfiguration().getString(Constants.UDF_DART_GCS_BUCKET_ID_KEY, Constants.UDF_DART_GCS_BUCKET_ID_DEFAULT); + return new GcsDartDataStore(projectID, bucketID); } private LinkedHashMap getProtosInInputStreams() { diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/DartContains.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/DartContains.java index 4f05db063..f7c41c2a5 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/DartContains.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/DartContains.java @@ -1,22 +1,18 @@ package com.gotocompany.dagger.functions.udfs.scalar; -import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager; -import com.gotocompany.dagger.common.udfs.ScalarUdf; import com.gotocompany.dagger.functions.udfs.scalar.dart.DartAspects; -import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDataStore; +import com.gotocompany.dagger.functions.udfs.scalar.dart.DartScalarUdf; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore; import com.gotocompany.dagger.functions.udfs.scalar.dart.types.SetCache; -import org.apache.flink.table.functions.FunctionContext; import java.util.HashMap; import java.util.Map; -import static com.gotocompany.dagger.common.core.Constants.UDF_TELEMETRY_GROUP_KEY; - /** * The DartContains udf. */ -public class DartContains extends ScalarUdf { - private final GcsDataStore dataStore; +public class DartContains extends DartScalarUdf { + private final DartDataStore dataStore; private final Map setCache; /** @@ -24,32 +20,11 @@ public class DartContains extends ScalarUdf { * * @param dataStore the data store */ - DartContains(GcsDataStore dataStore) { + public DartContains(DartDataStore dataStore) { this.dataStore = dataStore; setCache = new HashMap<>(); } - /** - * With gcs data store dart contains. - * - * @param projectId the project id - * @param bucketId the bucket id - * @return the dart contains - */ - public static DartContains withGcsDataStore(String projectId, String bucketId) { - return new DartContains(new GcsDataStore(projectId, bucketId)); - } - - @Override - public void open(FunctionContext context) throws Exception { - super.open(context); - MeterStatsManager meterStatsManager = new MeterStatsManager(context.getMetricGroup(), true); - meterStatsManager.register(UDF_TELEMETRY_GROUP_KEY, this.getName(), DartAspects.values()); - this.dataStore.setMeterStatsManager(meterStatsManager); - this.dataStore.setGaugeStatsManager(getGaugeStatsManager()); - - } - /** * To check if a data point in the message is present in the Redis collection. * @@ -108,18 +83,17 @@ public boolean eval(String listName, String field, String regex, Integer refresh private SetCache getListData(String listName, String field, int refreshRateInHours) { if (setCache.isEmpty() || !setCache.containsKey(listName) || setCache.get(listName).hasExpired(refreshRateInHours) || setCache.get(listName).isEmpty()) { - setCache.put(listName, dataStore.getSet(listName)); - dataStore.getMeterStatsManager().markEvent(DartAspects.DART_GCS_FETCH_SUCCESS); + setCache.put(listName, dataStore.getSet(listName, getMeterStatsManager(), getGaugeStatsManager())); + getMeterStatsManager().markEvent(DartAspects.DART_GCS_FETCH_SUCCESS); } return setCache.get(listName); } private void updateMetrics(boolean isPresent) { if (isPresent) { - dataStore.getMeterStatsManager().markEvent(DartAspects.DART_CACHE_HIT); + getMeterStatsManager().markEvent(DartAspects.DART_CACHE_HIT); } else { - dataStore.getMeterStatsManager().markEvent(DartAspects.DART_CACHE_MISS); + getMeterStatsManager().markEvent(DartAspects.DART_CACHE_MISS); } } - } diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/DartGet.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/DartGet.java index 90d88ca0f..8242fcfeb 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/DartGet.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/DartGet.java @@ -1,23 +1,19 @@ package com.gotocompany.dagger.functions.udfs.scalar; -import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager; -import com.gotocompany.dagger.common.udfs.ScalarUdf; import com.gotocompany.dagger.functions.exceptions.KeyDoesNotExistException; import com.gotocompany.dagger.functions.udfs.scalar.dart.DartAspects; -import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDataStore; +import com.gotocompany.dagger.functions.udfs.scalar.dart.DartScalarUdf; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore; import com.gotocompany.dagger.functions.udfs.scalar.dart.types.MapCache; -import org.apache.flink.table.functions.FunctionContext; import java.util.HashMap; import java.util.Map; -import static com.gotocompany.dagger.common.core.Constants.UDF_TELEMETRY_GROUP_KEY; - /** * The DartGet udf. */ -public class DartGet extends ScalarUdf { - private final GcsDataStore dataStore; +public class DartGet extends DartScalarUdf { + private final DartDataStore dataStore; private final Map cache; /** @@ -25,31 +21,11 @@ public class DartGet extends ScalarUdf { * * @param dataStore the data store */ - public DartGet(GcsDataStore dataStore) { + public DartGet(DartDataStore dataStore) { this.dataStore = dataStore; cache = new HashMap<>(); } - /** - * With gcs data store dart get. - * - * @param projectId the project id - * @param bucketId the bucket id - * @return the dart get - */ - public static DartGet withGcsDataStore(String projectId, String bucketId) { - return new DartGet(new GcsDataStore(projectId, bucketId)); - } - - @Override - public void open(FunctionContext context) throws Exception { - super.open(context); - MeterStatsManager meterStatsManager = new MeterStatsManager(context.getMetricGroup(), true); - meterStatsManager.register(UDF_TELEMETRY_GROUP_KEY, this.getName(), DartAspects.values()); - dataStore.setMeterStatsManager(meterStatsManager); - dataStore.setGaugeStatsManager(getGaugeStatsManager()); - } - /** * To fetch a corresponding value in a collection given a key from data point. * @@ -62,10 +38,10 @@ public void open(FunctionContext context) throws Exception { */ public String eval(String collectionName, String key, Integer refreshRateInHours) { if (cache.isEmpty() || !cache.containsKey(collectionName) || cache.get(collectionName).hasExpired(refreshRateInHours) || cache.get(collectionName).isEmpty()) { - cache.put(collectionName, dataStore.getMap(collectionName)); - dataStore.getMeterStatsManager().markEvent(DartAspects.DART_GCS_FETCH_SUCCESS); + cache.put(collectionName, dataStore.getMap(collectionName, getMeterStatsManager(), getGaugeStatsManager())); + getMeterStatsManager().markEvent(DartAspects.DART_GCS_FETCH_SUCCESS); } - dataStore.getMeterStatsManager().markEvent(DartAspects.DART_CACHE_HIT); + getMeterStatsManager().markEvent(DartAspects.DART_CACHE_HIT); return cache.get(collectionName).get(key); } @@ -82,9 +58,8 @@ public String eval(String collectionName, String key, Integer refreshRateInHours try { return eval(collectionName, key, refreshRateInHours); } catch (KeyDoesNotExistException e) { - dataStore.getMeterStatsManager().markEvent(DartAspects.DART_CACHE_MISS); + getMeterStatsManager().markEvent(DartAspects.DART_CACHE_MISS); return defaultValue; } } - } diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/DartScalarUdf.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/DartScalarUdf.java new file mode 100644 index 000000000..df0bc89b1 --- /dev/null +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/DartScalarUdf.java @@ -0,0 +1,23 @@ +package com.gotocompany.dagger.functions.udfs.scalar.dart; + +import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager; +import com.gotocompany.dagger.common.udfs.ScalarUdf; +import lombok.Getter; +import lombok.Setter; +import org.apache.flink.table.functions.FunctionContext; + +import static com.gotocompany.dagger.common.core.Constants.UDF_TELEMETRY_GROUP_KEY; + +public abstract class DartScalarUdf extends ScalarUdf { + + @Getter + @Setter // For testing purpose only + private MeterStatsManager meterStatsManager; + + @Override + public void open(FunctionContext context) throws Exception { + super.open(context); + meterStatsManager = new MeterStatsManager(context.getMetricGroup(), true); + meterStatsManager.register(UDF_TELEMETRY_GROUP_KEY, this.getName(), DartAspects.values()); + } +} diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DataStore.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStore.java similarity index 52% rename from dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DataStore.java rename to dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStore.java index 50b8c5aa1..fc1224d84 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DataStore.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStore.java @@ -1,19 +1,21 @@ package com.gotocompany.dagger.functions.udfs.scalar.dart.store; +import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager; +import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager; import com.gotocompany.dagger.functions.udfs.scalar.dart.types.MapCache; import com.gotocompany.dagger.functions.udfs.scalar.dart.types.SetCache; /** * The interface Data store. */ -public interface DataStore { +public interface DartDataStore { /** * Gets set. * * @param setName the set name * @return the set */ - SetCache getSet(String setName); + SetCache getSet(String setName, MeterStatsManager meterStatsManager, GaugeStatsManager gaugeStatsManager); /** * Gets map. @@ -21,5 +23,5 @@ public interface DataStore { * @param mapName the map name * @return the map */ - MapCache getMap(String mapName); + MapCache getMap(String mapName, MeterStatsManager meterStatsManager, GaugeStatsManager gaugeStatsManager); } diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDataStore.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDartDataStore.java similarity index 60% rename from dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDataStore.java rename to dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDartDataStore.java index 1364f91ed..ed2ce69f5 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDataStore.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDartDataStore.java @@ -7,7 +7,7 @@ import com.gotocompany.dagger.functions.udfs.scalar.dart.types.SetCache; import com.gotocompany.dagger.functions.udfs.scalar.DartContains; import com.gotocompany.dagger.functions.udfs.scalar.DartGet; -import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DataStore; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -21,11 +21,10 @@ import java.util.Map; import java.util.Set; - /** * The type Gcs data store. */ -public class GcsDataStore implements DataStore, Serializable { +public class GcsDartDataStore implements DartDataStore, Serializable { private final String projectId; @@ -33,71 +32,34 @@ public class GcsDataStore implements DataStore, Serializable { private GcsClient gcsClient; - private MeterStatsManager meterStatsManager; - private GaugeStatsManager gaugeStatsManager; - /** * Instantiates a new Gcs data store. * * @param projectId the project id * @param bucketId the bucket id */ - public GcsDataStore(String projectId, String bucketId) { + public GcsDartDataStore(String projectId, String bucketId) { this.projectId = projectId; this.bucketId = bucketId; } @Override - public SetCache getSet(String setName) { - - return new SetCache(getSetOfObjects(setName)); + public SetCache getSet(String setName, MeterStatsManager meterStatsManager, GaugeStatsManager gaugeManager) { + return new SetCache(getSetOfObjects(setName, meterStatsManager, gaugeManager)); } @Override - public MapCache getMap(String mapName) { - Map mapOfObjects = getMapOfObjects(mapName); + public MapCache getMap(String mapName, MeterStatsManager meterStatsManager, GaugeStatsManager gaugeManager) { + Map mapOfObjects = getMapOfObjects(mapName, meterStatsManager, gaugeManager); return new MapCache(mapOfObjects); } - /** - * Sets meter stats manager. - * - * @param meterStatsManager the meter stats manager - */ - public void setMeterStatsManager(MeterStatsManager meterStatsManager) { - this.meterStatsManager = meterStatsManager; - } - - /** - * Gets meter stats manager. - * - * @return the meter stats manager - */ - public MeterStatsManager getMeterStatsManager() { - return this.meterStatsManager; - } - - /** - * Sets gauge stats manager. - * - * @param gaugeStatsManager the gauge stats manager - */ - public void setGaugeStatsManager(GaugeStatsManager gaugeStatsManager) { - this.gaugeStatsManager = gaugeStatsManager; - } - - /** - * Gets gauge stats manager. - * - * @return the gauge stats manager - */ - public GaugeStatsManager getGaugeStatsManager() { - return gaugeStatsManager; - } - - private Map getMapOfObjects(String dartName) { - - String jsonData = getGcsClient().fetchJsonData(DartGet.class.getSimpleName(), getGaugeStatsManager(), this.bucketId, "dart-get/" + dartName); + private Map getMapOfObjects(String dartName, MeterStatsManager meterManager, GaugeStatsManager gaugeManager) { + String jsonData = getGcsClient().fetchJsonData( + DartGet.class.getSimpleName(), + gaugeManager, + this.bucketId, + "dart-get/" + dartName); ObjectMapper mapper = new ObjectMapper(); @@ -105,15 +67,15 @@ private Map getMapOfObjects(String dartName) { try { map = mapper.readValue(jsonData, Map.class); } catch (IOException e) { - getMeterStatsManager().markEvent(DartAspects.DART_GCS_FETCH_FAILURES); + meterManager.markEvent(DartAspects.DART_GCS_FETCH_FAILURES); e.printStackTrace(); } return map; } - private Set getSetOfObjects(String dartName) { + private Set getSetOfObjects(String dartName, MeterStatsManager meterManager, GaugeStatsManager gaugeManager) { - String jsonData = getGcsClient().fetchJsonData(DartContains.class.getSimpleName(), getGaugeStatsManager(), this.bucketId, "dart-contains/" + dartName); + String jsonData = getGcsClient().fetchJsonData(DartContains.class.getSimpleName(), gaugeManager, this.bucketId, "dart-contains/" + dartName); ObjectMapper mapper = new ObjectMapper(); try { ObjectNode node = (ObjectNode) mapper.readTree(jsonData); @@ -124,7 +86,7 @@ private Set getSetOfObjects(String dartName) { return new HashSet<>(list); } catch (Exception e) { - getMeterStatsManager().markEvent(DartAspects.DART_GCS_FETCH_FAILURES); + meterManager.markEvent(DartAspects.DART_GCS_FETCH_FAILURES); e.printStackTrace(); } diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/DartContainsTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/DartContainsTest.java index 1393a3b61..7a0b47113 100644 --- a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/DartContainsTest.java +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/DartContainsTest.java @@ -1,9 +1,10 @@ package com.gotocompany.dagger.functions.udfs.scalar; - +import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager; import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager; import com.gotocompany.dagger.functions.exceptions.TagDoesNotExistException; -import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDataStore; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartDataStore; import com.gotocompany.dagger.functions.udfs.scalar.dart.types.SetCache; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; @@ -14,13 +15,13 @@ import org.mockito.Mockito; import static java.util.Collections.singleton; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; import static org.mockito.internal.verification.VerificationModeFactory.times; public class DartContainsTest { - private GcsDataStore dataStore; + private DartDataStore dataStore; @Mock private MetricGroup metricGroup; @@ -31,173 +32,154 @@ public class DartContainsTest { @Mock private MeterStatsManager meterStatsManager; + @Mock + private GaugeStatsManager gaugeStatsManager; + + // Subject + private DartContains dartContains; + @Before public void setUp() { initMocks(this); when(functionContext.getMetricGroup()).thenReturn(metricGroup); when(metricGroup.addGroup("udf", "DartContains")).thenReturn(metricGroup); when(metricGroup.addGroup("DartContains")).thenReturn(metricGroup); - this.dataStore = mock(GcsDataStore.class); - when(dataStore.getMeterStatsManager()).thenReturn(meterStatsManager); + this.dataStore = mock(GcsDartDataStore.class); + + dartContains = new DartContains(dataStore); + + dartContains.setMeterStatsManager(meterStatsManager); + dartContains.setGaugeStatsManager(gaugeStatsManager); } @Test public void shouldReturnTrueWhenFieldContainsTheValue() { - when(dataStore.getSet("someList")).thenReturn(new SetCache(singleton("someField"))); - - DartContains dartContains = new DartContains(dataStore); + when(dataStore.getSet("someList", meterStatsManager, gaugeStatsManager)).thenReturn(new SetCache(singleton("someField"))); - assertEquals(true, dartContains.eval("someList", "someField", 0)); + assertTrue(dartContains.eval("someList", "someField", 0)); } @Test public void shouldReturnTrueWhenFieldContainsTheValueFromDifferentPaths() { - when(dataStore.getSet("someList")).thenReturn(new SetCache(singleton("someField"))); - when(dataStore.getSet("otherList")).thenReturn(new SetCache(singleton("otherField"))); + when(dataStore.getSet("someList", meterStatsManager, gaugeStatsManager)).thenReturn(new SetCache(singleton("someField"))); + when(dataStore.getSet("otherList", meterStatsManager, gaugeStatsManager)).thenReturn(new SetCache(singleton("otherField"))); - DartContains dartContains = new DartContains(dataStore); - - assertEquals(true, dartContains.eval("someList", "someField", 0)); - assertEquals(true, dartContains.eval("otherList", "otherField", 0)); + assertTrue(dartContains.eval("someList", "someField", 0)); + assertTrue(dartContains.eval("otherList", "otherField", 0)); } @Test public void shouldReturnFalseWhenFieldDoesNotContainsTheValue() { - when(dataStore.getSet("someList")).thenReturn(new SetCache(singleton("someField"))); - - DartContains dartContains = new DartContains(dataStore); + when(dataStore.getSet("someList", meterStatsManager, gaugeStatsManager)).thenReturn(new SetCache(singleton("someField"))); - assertEquals(false, dartContains.eval("someList", "otherField", 0)); + assertFalse(dartContains.eval("someList", "otherField", 0)); } @Test(expected = TagDoesNotExistException.class) public void shouldThrowErrorWhenFieldIsNotExist() { - when(dataStore.getSet("nonExistingList")).thenThrow(TagDoesNotExistException.class); - - DartContains dartContains = new DartContains(dataStore); + when(dataStore.getSet("nonExistingList", meterStatsManager, gaugeStatsManager)).thenThrow(TagDoesNotExistException.class); dartContains.eval("nonExistingList", "someField", 0); } @Test public void shouldNotInvokeDataSourceWhenInvokedAgainWithinRefreshRate() { - when(dataStore.getSet("someList")).thenReturn(new SetCache(singleton("someField"))); + when(dataStore.getSet("someList", meterStatsManager, gaugeStatsManager)).thenReturn(new SetCache(singleton("someField"))); - DartContains dartContains = new DartContains(dataStore); dartContains.eval("someList", "someField", 1); dartContains.eval("someList", "otherField", 1); - verify(dataStore, times(1)).getSet("someList"); + verify(dataStore, times(1)).getSet("someList", meterStatsManager, gaugeStatsManager); } @Test public void shouldInvokeDataSourceWhenExceededRefreshRate() { - when(dataStore.getSet("someList")).thenReturn(new SetCache(singleton("someField"))); + when(dataStore.getSet("someList", meterStatsManager, gaugeStatsManager)).thenReturn(new SetCache(singleton("someField"))); - DartContains dartContains = new DartContains(dataStore); dartContains.eval("someList", "someField", -1); dartContains.eval("someList", "otherField", -1); - verify(dataStore, times(2)).getSet("someList"); + verify(dataStore, times(2)).getSet("someList", meterStatsManager, gaugeStatsManager); } @Test public void shouldReturnTrueWhenFieldContainsTheValueInMiddleWithARegex() { - when(dataStore.getSet("someList")).thenReturn(new SetCache(singleton("prefixsomeField"))); - - DartContains dartContains = new DartContains(dataStore); + when(dataStore.getSet("someList", meterStatsManager, gaugeStatsManager)).thenReturn(new SetCache(singleton("prefixsomeField"))); - assertEquals(true, dartContains.eval("someList", "a sentence with prefixsomeField and an end", ".*%s.*")); + assertTrue(dartContains.eval("someList", "a sentence with prefixsomeField and an end", ".*%s.*")); } @Test public void shouldReturnFalseWhenTagContainsSpaceAndFieldDoesNotWithARegex() { - when(dataStore.getSet("someList")).thenReturn(new SetCache(singleton("prefixsomeField "))); - - DartContains dartContains = new DartContains(dataStore); + when(dataStore.getSet("someList", meterStatsManager, gaugeStatsManager)).thenReturn(new SetCache(singleton("prefixsomeField "))); - assertEquals(false, dartContains.eval("someList", "a sentence with prefixsomeFieldsuffix and an end", ".*%s.*")); + assertFalse(dartContains.eval("someList", "a sentence with prefixsomeFieldsuffix and an end", ".*%s.*")); } @Test public void shouldReturnTrueWhenFieldContainsTheValueAtEndWithARegex() { - when(dataStore.getSet("someList")).thenReturn(new SetCache(singleton("prefixsomeField"))); + when(dataStore.getSet("someList", meterStatsManager, gaugeStatsManager)).thenReturn(new SetCache(singleton("prefixsomeField"))); - DartContains dartContains = new DartContains(dataStore); - - assertEquals(true, dartContains.eval("someList", "a sentence that ends with prefixsomeField", ".*%s")); + assertTrue(dartContains.eval("someList", "a sentence that ends with prefixsomeField", ".*%s")); } @Test public void shouldReturnTrueWhenFieldContainsTheValueAtBeginningWithARegex() { - when(dataStore.getSet("someList")).thenReturn(new SetCache(singleton("prefixsomeField"))); - - DartContains dartContains = new DartContains(dataStore); + when(dataStore.getSet("someList", meterStatsManager, gaugeStatsManager)).thenReturn(new SetCache(singleton("prefixsomeField"))); - assertEquals(true, dartContains.eval("someList", "prefixsomeField is the start of this sentence", "%s.*")); + assertTrue(dartContains.eval("someList", "prefixsomeField is the start of this sentence", "%s.*")); } @Test public void shouldReturnTrueWhenFieldContainsEntireValueWithARegex() { - when(dataStore.getSet("someList")).thenReturn(new SetCache(singleton("prefixsomeField"))); + when(dataStore.getSet("someList", meterStatsManager, gaugeStatsManager)).thenReturn(new SetCache(singleton("prefixsomeField"))); - DartContains dartContains = new DartContains(dataStore); - - assertEquals(true, dartContains.eval("someList", "prefixsomeField", "%s")); + assertTrue(dartContains.eval("someList", "prefixsomeField", "%s")); } @Test public void shouldReturnFalseWhenFieldContainsValueNotInSameCaseWithARegex() { - when(dataStore.getSet("someList")).thenReturn(new SetCache(singleton("prefixsomeField"))); - - DartContains dartContains = new DartContains(dataStore); + when(dataStore.getSet("someList", meterStatsManager, gaugeStatsManager)).thenReturn(new SetCache(singleton("prefixsomeField"))); - assertEquals(false, dartContains.eval("someList", "preFixSomEfield", ".*%s.*")); + assertFalse(dartContains.eval("someList", "preFixSomEfield", ".*%s.*")); } @Test public void shouldReturnFalseWhenFieldDoesNotContainsTheValueWithARegex() { - when(dataStore.getSet("someList")).thenReturn(new SetCache(singleton("someField"))); + when(dataStore.getSet("someList", meterStatsManager, gaugeStatsManager)).thenReturn(new SetCache(singleton("someField"))); - DartContains dartContains = new DartContains(dataStore); - - assertEquals(false, dartContains.eval("someList", "other", ".*%s.*")); + assertFalse(dartContains.eval("someList", "other", ".*%s.*")); } @Test(expected = TagDoesNotExistException.class) public void shouldThrowErrorWhenFieldIsNotExistWithARegex() { - when(dataStore.getSet("nonExistingList")).thenThrow(TagDoesNotExistException.class); - - DartContains dartContains = new DartContains(dataStore); + when(dataStore.getSet("nonExistingList", meterStatsManager, gaugeStatsManager)).thenThrow(TagDoesNotExistException.class); dartContains.eval("nonExistingList", "someField", ".*%s.*"); } @Test public void shouldNotInvokeDataSourceWhenInvokedAgainWithinRefreshRateWithARegex() { - when(dataStore.getSet("someList")).thenReturn(new SetCache(singleton("someField"))); + when(dataStore.getSet("someList", meterStatsManager, gaugeStatsManager)).thenReturn(new SetCache(singleton("someField"))); - DartContains dartContains = new DartContains(dataStore); dartContains.eval("someList", "someField", ".*%s.*", 1); dartContains.eval("someList", "otherField", ".*%s.*", 1); - verify(dataStore, times(1)).getSet("someList"); + verify(dataStore, times(1)).getSet("someList", meterStatsManager, gaugeStatsManager); } @Test public void shouldInvokeDataSourceWhenExceededRefreshRateWithARegex() { - when(dataStore.getSet("someList")).thenReturn(new SetCache(singleton("someField"))); + when(dataStore.getSet("someList", meterStatsManager, gaugeStatsManager)).thenReturn(new SetCache(singleton("someField"))); - DartContains dartContains = new DartContains(dataStore); dartContains.eval("someList", "someField", ".*%s.*", -1); dartContains.eval("someList", "otherField", ".*%s.*", -1); - verify(dataStore, times(2)).getSet("someList"); + verify(dataStore, times(2)).getSet("someList", meterStatsManager, gaugeStatsManager); } @Test public void shouldRegisterGauge() throws Exception { - DartContains dartContains = new DartContains(dataStore); dartContains.open(functionContext); verify(metricGroup, Mockito.times(1)).gauge(any(String.class), any(Gauge.class)); } diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/DartGetTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/DartGetTest.java index d896bb2d4..bfcd38805 100644 --- a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/DartGetTest.java +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/DartGetTest.java @@ -1,9 +1,10 @@ package com.gotocompany.dagger.functions.udfs.scalar; +import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager; import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager; import com.gotocompany.dagger.functions.exceptions.KeyDoesNotExistException; import com.gotocompany.dagger.functions.exceptions.TagDoesNotExistException; -import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDataStore; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartDataStore; import com.gotocompany.dagger.functions.udfs.scalar.dart.types.MapCache; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; @@ -18,7 +19,7 @@ import static org.mockito.MockitoAnnotations.initMocks; public class DartGetTest { - private GcsDataStore dataStore; + private GcsDartDataStore dataStore; @Mock private MetricGroup metricGroup; @@ -29,23 +30,31 @@ public class DartGetTest { @Mock private MeterStatsManager meterStatsManager; + @Mock + private GaugeStatsManager gaugeStatsManager; + + // Subject + private DartGet dartGet; + @Before public void setUp() { initMocks(this); when(functionContext.getMetricGroup()).thenReturn(metricGroup); when(metricGroup.addGroup("udf", "DartGet")).thenReturn(metricGroup); when(metricGroup.addGroup("DartGet")).thenReturn(metricGroup); - this.dataStore = mock(GcsDataStore.class); - when(dataStore.getMeterStatsManager()).thenReturn(meterStatsManager); + this.dataStore = mock(GcsDartDataStore.class); + + dartGet = new DartGet(dataStore); + + dartGet.setMeterStatsManager(meterStatsManager); + dartGet.setGaugeStatsManager(gaugeStatsManager); } @Test public void shouldReturnValueWhenMapAndKeyExist() { String key = "some-key"; String value = "expected-value"; - when(dataStore.getMap("someMap")).thenReturn(new MapCache(singletonMap(key, value))); - - DartGet dartGet = new DartGet(dataStore); + when(dataStore.getMap("someMap", meterStatsManager, gaugeStatsManager)).thenReturn(new MapCache(singletonMap(key, value))); assertEquals(value, dartGet.eval("someMap", "some-key", 1)); } @@ -56,10 +65,8 @@ public void shouldReturnDifferentValueWhenMapAndKeyExistForAllOfThem() { String key2 = "other-key"; String value = "expected-value"; String value2 = "other-expected-value"; - when(dataStore.getMap("someMap")).thenReturn(new MapCache(singletonMap(key, value))); - when(dataStore.getMap("otherMap")).thenReturn(new MapCache(singletonMap(key2, value2))); - - DartGet dartGet = new DartGet(dataStore); + when(dataStore.getMap("someMap", meterStatsManager, gaugeStatsManager)).thenReturn(new MapCache(singletonMap(key, value))); + when(dataStore.getMap("otherMap", meterStatsManager, gaugeStatsManager)).thenReturn(new MapCache(singletonMap(key2, value2))); assertEquals(value, dartGet.eval("someMap", "some-key", 1)); assertEquals(value2, dartGet.eval("otherMap", "other-key", 1)); @@ -67,9 +74,7 @@ public void shouldReturnDifferentValueWhenMapAndKeyExistForAllOfThem() { @Test(expected = TagDoesNotExistException.class) public void shouldThrowErrorWhenMapDoesNotExist() { - when(dataStore.getMap("nonExistingMap")).thenThrow(TagDoesNotExistException.class); - - DartGet dartGet = new DartGet(dataStore); + when(dataStore.getMap("nonExistingMap", meterStatsManager, gaugeStatsManager)).thenThrow(TagDoesNotExistException.class); dartGet.eval("nonExistingMap", "some-key", 1); } @@ -77,55 +82,48 @@ public void shouldThrowErrorWhenMapDoesNotExist() { @Test(expected = KeyDoesNotExistException.class) public void shouldThrowErrorWhenKeyDoesNotExistAndDefaultValueNotGiven() { MapCache mapCache = mock(MapCache.class); - when(dataStore.getMap("someMap")).thenReturn(mapCache); + when(dataStore.getMap("someMap", meterStatsManager, gaugeStatsManager)).thenReturn(mapCache); when(mapCache.get("nonExistingKey")).thenThrow(KeyDoesNotExistException.class); - DartGet dartGet = new DartGet(dataStore); - dartGet.eval("someMap", "nonExistingKey", 1); } @Test public void shouldReturnDefaultValueWhenKeyIsNotFoundAndDefaultValueGiven() { MapCache mapCache = mock(MapCache.class); - when(dataStore.getMap("someMap")).thenReturn(mapCache); + when(dataStore.getMap("someMap", meterStatsManager, gaugeStatsManager)).thenReturn(mapCache); when(mapCache.get("nonExistingKey")).thenThrow(KeyDoesNotExistException.class); String defaultValue = "some value"; - DartGet dartGet = new DartGet(dataStore); - assertEquals(defaultValue, dartGet.eval("someMap", "nonExistingKey", 1, defaultValue)); } @Test public void shouldNotInvokeDataSourceWhenNotExceededRefreshRate() { MapCache mapCache = mock(MapCache.class); - when(dataStore.getMap("someMap")).thenReturn(mapCache); + when(dataStore.getMap("someMap", meterStatsManager, gaugeStatsManager)).thenReturn(mapCache); when(mapCache.hasExpired(1)).thenReturn(false); - DartGet dartGet = new DartGet(dataStore); dartGet.eval("someMap", "some-key", 1); dartGet.eval("someMap", "some-key", 1); - verify(dataStore, times(1)).getMap("someMap"); + verify(dataStore, times(1)).getMap("someMap", meterStatsManager, gaugeStatsManager); } @Test public void shouldInvokeDataSourceWhenExceededRefreshRate() { MapCache mapCache = mock(MapCache.class); - when(dataStore.getMap("someMap")).thenReturn(mapCache); + when(dataStore.getMap("someMap", meterStatsManager, gaugeStatsManager)).thenReturn(mapCache); when(mapCache.hasExpired(-1)).thenReturn(true); - DartGet dartGet = new DartGet(dataStore); dartGet.eval("someMap", "some-key", -1); - verify(dataStore, times(1)).getMap("someMap"); + verify(dataStore, times(1)).getMap("someMap", meterStatsManager, gaugeStatsManager); } @Test public void shouldRegisterGauge() throws Exception { - DartGet dartGet = new DartGet(dataStore); dartGet.open(functionContext); verify(metricGroup, times(1)).gauge(any(String.class), any(Gauge.class)); } diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDataStoreTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDartDataStoreTest.java similarity index 80% rename from dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDataStoreTest.java rename to dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDartDataStoreTest.java index 229e0656a..bd6444e38 100644 --- a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDataStoreTest.java +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDartDataStoreTest.java @@ -1,6 +1,6 @@ package com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs; - +import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager; import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager; import com.gotocompany.dagger.functions.exceptions.BucketDoesNotExistException; import com.gotocompany.dagger.functions.exceptions.TagDoesNotExistException; @@ -19,42 +19,39 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class GcsDataStoreTest { - - +public class GcsDartDataStoreTest { private final String defaultListName = "listName"; private final String defaultMapName = "mapName"; @Rule public ExpectedException thrown = ExpectedException.none(); - private GcsDataStore gcsDataStore; + private GcsDartDataStore gcsDataStore; private List listContent; private Map mapContent; private GcsClient gcsClient; private MeterStatsManager meterStatsManager; + private GaugeStatsManager gaugeStatsManager; @Before public void setUp() { - gcsDataStore = mock(GcsDataStore.class); + gcsDataStore = mock(GcsDartDataStore.class); gcsClient = mock(GcsClient.class); meterStatsManager = mock(MeterStatsManager.class); - when(gcsDataStore.getSet(anyString())).thenCallRealMethod(); - when(gcsDataStore.getMap(anyString())).thenCallRealMethod(); + gaugeStatsManager = mock(GaugeStatsManager.class); + when(gcsDataStore.getSet(anyString(), any(), any())).thenCallRealMethod(); + when(gcsDataStore.getMap(anyString(), any(), any())).thenCallRealMethod(); when(gcsDataStore.getGcsClient()).thenReturn(gcsClient); - when(gcsDataStore.getMeterStatsManager()).thenReturn(meterStatsManager); listContent = Arrays.asList("listContent"); mapContent = Collections.singletonMap("key", "value"); - } - @Test public void shouldGetExistingListGivenName() { String jsonData = " { \"data\" : [ \"listContent\" ] } "; when(gcsClient.fetchJsonData(any(), any(), any(), anyString())).thenReturn(jsonData); SetCache setCache = new SetCache(new HashSet<>(listContent)); - Assert.assertEquals(setCache, gcsDataStore.getSet(defaultListName)); + Assert.assertEquals(setCache, gcsDataStore.getSet(defaultListName, meterStatsManager, gaugeStatsManager)); } @Test @@ -62,10 +59,9 @@ public void shouldThrowTagDoesNotExistWhenListIsNotThere() { thrown.expect(TagDoesNotExistException.class); thrown.expectMessage("Could not find the content in gcs for invalidListName"); - when(gcsClient.fetchJsonData(any(), any(), any(), anyString())).thenThrow(new TagDoesNotExistException("Could not find the content in gcs for invalidListName")); - gcsDataStore.getSet("invalidListName"); + gcsDataStore.getSet("invalidListName", meterStatsManager, gaugeStatsManager); } @Test @@ -73,21 +69,18 @@ public void shouldThrowBucketDoesNotExistWhenBucketIsNotThere() { thrown.expect(BucketDoesNotExistException.class); thrown.expectMessage("Could not find the bucket in gcs for invalidListName"); - when(gcsClient.fetchJsonData(any(), any(), any(), anyString())).thenThrow(new BucketDoesNotExistException("Could not find the bucket in gcs for invalidListName")); - gcsDataStore.getSet("invalidListName"); + gcsDataStore.getSet("invalidListName", meterStatsManager, gaugeStatsManager); } - @Test public void shouldGetExistingMapGivenName() { - String jsonData = " { \"key\" : \"value\" } "; when(gcsClient.fetchJsonData(any(), any(), any(), anyString())).thenReturn(jsonData); MapCache mapCache = new MapCache(new HashMap<>(mapContent)); - Assert.assertEquals(mapCache, gcsDataStore.getMap(defaultMapName)); + Assert.assertEquals(mapCache, gcsDataStore.getMap(defaultMapName, meterStatsManager, gaugeStatsManager)); } @Test @@ -97,8 +90,6 @@ public void shouldThrowTagDoesNotExistWhenMapIsNotThere() { when(gcsClient.fetchJsonData(any(), any(), any(), anyString())).thenThrow(new TagDoesNotExistException("Could not find the content in gcs for invalidMapName")); - gcsDataStore.getSet("invalidMapName"); + gcsDataStore.getSet("invalidMapName", meterStatsManager, gaugeStatsManager); } - - } From fd991c4f91f43e368f16499b1a6e20841681e51a Mon Sep 17 00:00:00 2001 From: rajuGT Date: Thu, 23 Jan 2025 14:04:37 +0530 Subject: [PATCH 5/8] Dart Support for OSS Service Provider (#45) * Add gradle tasks to minimal and dependencies to maven local * Add capability to dagger to read python udfs from Ali(oss) and Tencent(cosn) storage services Given the configuration provided correctly. Set the below environment variables accordingly to access the files stored in the respective bucket. Ali(oss) - OSS_ACCESS_KEY_ID - OSS_ACCESS_KEY_SECRET Tencent(cos) - COS_SECRET_ID - COS_SECRET_KEY - COS_REGION * OSS client endpoint should be configurable via ENV variable * COS filesystem high availability support If you need to use COS filesystem for the dagger, provide the cos bucket/key configuration in the state.backend.fs.checkpointdir, state.savepoints.dir, high-availability.storageDir to flinkdeployment manifest. If the filesystem protocol begins with cosn for the above configurations, dagger uses the below configurations provided in the flinkdeployment manifest file. fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN fs.cosn.userinfo.secretId: fs.cosn.userinfo.secretKey: fs.cosn.bucket.region: fs.cosn.bucket.endpoint_suffix: * Fix checkstyle and made constants as static variables * Refactor Dart Feature to plug other object storage service providers * test checkstyle fix * Dart Support for OSS Service Provider * fix checkstyle --------- Co-authored-by: Raju G T Co-authored-by: rajuGT --- .../common/configuration/Configuration.java | 4 ++ .../configuration/ConfigurationTest.java | 6 ++ .../dagger/functions/common/Constants.java | 14 ++++ .../exceptions/TagDoesNotExistException.java | 4 ++ .../udfs/factories/FunctionFactory.java | 30 ++++++-- .../dart/store/DartDataStoreClient.java | 22 ++++++ ...taStore.java => DefaultDartDataStore.java} | 44 +++++------- .../{GcsClient.java => GcsDartClient.java} | 5 +- .../scalar/dart/store/oss/OssDartClient.java | 68 +++++++++++++++++++ .../udfs/scalar/DartContainsTest.java | 4 +- .../functions/udfs/scalar/DartGetTest.java | 6 +- ...est.java => DefaultDartDataStoreTest.java} | 39 +++++------ .../dart/store/oss/OssDartClientTest.java | 49 +++++++++++++ 13 files changed, 238 insertions(+), 57 deletions(-) create mode 100644 dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClient.java rename dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/{gcs/GcsDartDataStore.java => DefaultDartDataStore.java} (72%) rename dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/{GcsClient.java => GcsDartClient.java} (92%) create mode 100644 dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/oss/OssDartClient.java rename dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/{gcs/GcsDartDataStoreTest.java => DefaultDartDataStoreTest.java} (56%) create mode 100644 dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/oss/OssDartClientTest.java diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/configuration/Configuration.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/configuration/Configuration.java index 7a471879f..c819475a1 100644 --- a/dagger-common/src/main/java/com/gotocompany/dagger/common/configuration/Configuration.java +++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/configuration/Configuration.java @@ -15,6 +15,10 @@ public ParameterTool getParam() { return param; } + public String getString(String configKey) { + return param.get(configKey); + } + public String getString(String configKey, String defaultValue) { return param.get(configKey, defaultValue); } diff --git a/dagger-common/src/test/java/com/gotocompany/dagger/common/configuration/ConfigurationTest.java b/dagger-common/src/test/java/com/gotocompany/dagger/common/configuration/ConfigurationTest.java index 2046c04f7..2119f25cf 100644 --- a/dagger-common/src/test/java/com/gotocompany/dagger/common/configuration/ConfigurationTest.java +++ b/dagger-common/src/test/java/com/gotocompany/dagger/common/configuration/ConfigurationTest.java @@ -7,6 +7,7 @@ import org.mockito.Mock; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; @@ -30,6 +31,11 @@ public void shouldGetStringFromParamTool() { assertEquals("test_value", configuration.getString("test_config", "test_default")); } + @Test + public void shouldGetNullIfParamIsNotSet() { + assertNull(configuration.getString("config_not_exist")); + } + @Test public void shouldGetIntegerFromParamTool() { when(parameterTool.getInt("test_config", 1)).thenReturn(2); diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/common/Constants.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/common/Constants.java index bb60d02bc..a469733a4 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/common/Constants.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/common/Constants.java @@ -3,11 +3,25 @@ public class Constants { public static final Integer NUMBER_OF_DATA_TYPES_IN_FEATURE_ROW = 8; public static final Integer NUMBER_OF_ARGUMENTS_IN_FEATURE_ACCUMULATOR = 3; + + @Deprecated public static final String UDF_DART_GCS_PROJECT_ID_KEY = "UDF_DART_GCS_PROJECT_ID"; + @Deprecated public static final String UDF_DART_GCS_PROJECT_ID_DEFAULT = ""; + @Deprecated public static final String UDF_DART_GCS_BUCKET_ID_KEY = "UDF_DART_GCS_BUCKET_ID"; + @Deprecated public static final String UDF_DART_GCS_BUCKET_ID_DEFAULT = ""; + public static final String UDF_DART_PROJECT_ID_KEY = "UDF_DART_PROJECT_ID"; + public static final String UDF_DART_PROJECT_ID_DEFAULT = ""; + public static final String UDF_DART_BUCKET_ID_KEY = "UDF_DART_BUCKET_ID"; + public static final String UDF_DART_BUCKET_ID_DEFAULT = ""; + + public static final String UDF_STORE_PROVIDER_KEY = "UDF_STORE_PROVIDER"; + public static final String UDF_STORE_PROVIDER_GCS = "GCS"; + public static final String UDF_STORE_PROVIDER_OSS = "OSS"; + public static final String PYTHON_UDF_CONFIG = "PYTHON_UDF_CONFIG"; public static final String PYTHON_UDF_ENABLE_KEY = "PYTHON_UDF_ENABLE"; public static final boolean PYTHON_UDF_ENABLE_DEFAULT = false; diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/exceptions/TagDoesNotExistException.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/exceptions/TagDoesNotExistException.java index 65bc9def4..6bdb1eae1 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/exceptions/TagDoesNotExistException.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/exceptions/TagDoesNotExistException.java @@ -12,4 +12,8 @@ public class TagDoesNotExistException extends RuntimeException { public TagDoesNotExistException(String message) { super(message); } + + public TagDoesNotExistException(String message, Throwable cause) { + super(message, cause); + } } diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java index 41e82f7ee..b93287d17 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java @@ -2,7 +2,10 @@ import com.gotocompany.dagger.functions.common.Constants; import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore; -import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartDataStore; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClient; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DefaultDartDataStore; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss.OssDartClient; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import com.google.gson.Gson; @@ -134,9 +137,28 @@ public HashSet getAggregateUdfs() { } private DartDataStore getDartDataSource() { - String projectID = getConfiguration().getString(Constants.UDF_DART_GCS_PROJECT_ID_KEY, Constants.UDF_DART_GCS_PROJECT_ID_DEFAULT); - String bucketID = getConfiguration().getString(Constants.UDF_DART_GCS_BUCKET_ID_KEY, Constants.UDF_DART_GCS_BUCKET_ID_DEFAULT); - return new GcsDartDataStore(projectID, bucketID); + String projectID = getConfiguration().getString(Constants.UDF_DART_PROJECT_ID_KEY, Constants.UDF_DART_PROJECT_ID_DEFAULT); + String bucketID = getConfiguration().getString(Constants.UDF_DART_BUCKET_ID_KEY, Constants.UDF_DART_BUCKET_ID_DEFAULT); + + String udfStoreProvider = getConfiguration().getString(Constants.UDF_STORE_PROVIDER_KEY); + if (udfStoreProvider == null) { + udfStoreProvider = Constants.UDF_STORE_PROVIDER_GCS; + projectID = getConfiguration().getString(Constants.UDF_DART_GCS_PROJECT_ID_KEY, Constants.UDF_DART_GCS_PROJECT_ID_DEFAULT); + bucketID = getConfiguration().getString(Constants.UDF_DART_GCS_BUCKET_ID_KEY, Constants.UDF_DART_GCS_BUCKET_ID_DEFAULT); + } + + DartDataStoreClient dartDataStoreClient; + switch (udfStoreProvider) { + case Constants.UDF_STORE_PROVIDER_GCS: + dartDataStoreClient = new GcsDartClient(projectID); + break; + case Constants.UDF_STORE_PROVIDER_OSS: + dartDataStoreClient = new OssDartClient(); + break; + default: + throw new IllegalArgumentException("Unknown UDF Store Provider: " + udfStoreProvider); + } + return new DefaultDartDataStore(dartDataStoreClient, bucketID); } private LinkedHashMap getProtosInInputStreams() { diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClient.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClient.java new file mode 100644 index 000000000..f534ab6db --- /dev/null +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClient.java @@ -0,0 +1,22 @@ +package com.gotocompany.dagger.functions.udfs.scalar.dart.store; + +import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager; +import com.gotocompany.dagger.functions.exceptions.BucketDoesNotExistException; +import com.gotocompany.dagger.functions.exceptions.TagDoesNotExistException; + +public interface DartDataStoreClient { + + /** + * If a client could provide implementation to this, use the default data store, else implement DartDataStore along with client implementation. + * + * @param udfName either "DartGet" or "DartContains" + * @param gaugeStatsManager an instrumentation provider + * @param bucketName name of the object storage service bucket + * @param dartName from the bucket, this would be either dart-get/path/to/file.json or dart-contains/path/to/file.json + * @return Content of the file in String format from abc://bucket-name/dart-(get/contains)/path/to/file.json + * @throws TagDoesNotExistException if tag doesn't exist + * @throws BucketDoesNotExistException if bucket doesn't exist + */ + String fetchJsonData(String udfName, GaugeStatsManager gaugeStatsManager, String bucketName, String dartName) + throws TagDoesNotExistException, BucketDoesNotExistException; +} diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDartDataStore.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStore.java similarity index 72% rename from dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDartDataStore.java rename to dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStore.java index ed2ce69f5..a1e61b613 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDartDataStore.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStore.java @@ -1,4 +1,4 @@ -package com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs; +package com.gotocompany.dagger.functions.udfs.scalar.dart.store; import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager; import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager; @@ -7,7 +7,7 @@ import com.gotocompany.dagger.functions.udfs.scalar.dart.types.SetCache; import com.gotocompany.dagger.functions.udfs.scalar.DartContains; import com.gotocompany.dagger.functions.udfs.scalar.DartGet; -import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore; +import lombok.Getter; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -22,24 +22,27 @@ import java.util.Set; /** - * The type Gcs data store. + * DefaultDartDataStore would be able to fetch the darts from the object storage services. + * pass the relevant client which implements {@link DartDataStoreClient} */ -public class GcsDartDataStore implements DartDataStore, Serializable { +public class DefaultDartDataStore implements DartDataStore, Serializable { - private final String projectId; + public static final String DART_GET_DIRECTORY = "dart-get/"; + public static final String DART_CONTAINS_DIRECTORY = "dart-contains/"; private final String bucketId; - private GcsClient gcsClient; + @Getter + private final DartDataStoreClient storeClient; /** - * Instantiates a new Gcs data store. + * Instantiates a new data store. * - * @param projectId the project id - * @param bucketId the bucket id + * @param storeClient a {@link DartDataStoreClient} implementation for the respective object storage provider + * @param bucketId the bucket id */ - public GcsDartDataStore(String projectId, String bucketId) { - this.projectId = projectId; + public DefaultDartDataStore(DartDataStoreClient storeClient, String bucketId) { + this.storeClient = storeClient; this.bucketId = bucketId; } @@ -55,11 +58,11 @@ public MapCache getMap(String mapName, MeterStatsManager meterStatsManager, Gaug } private Map getMapOfObjects(String dartName, MeterStatsManager meterManager, GaugeStatsManager gaugeManager) { - String jsonData = getGcsClient().fetchJsonData( + String jsonData = getStoreClient().fetchJsonData( DartGet.class.getSimpleName(), gaugeManager, this.bucketId, - "dart-get/" + dartName); + DART_GET_DIRECTORY + dartName); ObjectMapper mapper = new ObjectMapper(); @@ -74,8 +77,7 @@ private Map getMapOfObjects(String dartName, MeterStatsManager m } private Set getSetOfObjects(String dartName, MeterStatsManager meterManager, GaugeStatsManager gaugeManager) { - - String jsonData = getGcsClient().fetchJsonData(DartContains.class.getSimpleName(), gaugeManager, this.bucketId, "dart-contains/" + dartName); + String jsonData = getStoreClient().fetchJsonData(DartContains.class.getSimpleName(), gaugeManager, this.bucketId, DART_CONTAINS_DIRECTORY + dartName); ObjectMapper mapper = new ObjectMapper(); try { ObjectNode node = (ObjectNode) mapper.readTree(jsonData); @@ -92,16 +94,4 @@ private Set getSetOfObjects(String dartName, MeterStatsManager meterMana return new HashSet<>(); } - - /** - * Gets gcs client. - * - * @return the gcs client - */ - GcsClient getGcsClient() { - if (this.gcsClient == null) { - this.gcsClient = new GcsClient(this.projectId); - } - return this.gcsClient; - } } diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsClient.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDartClient.java similarity index 92% rename from dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsClient.java rename to dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDartClient.java index 35e806199..94a7a6748 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsClient.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDartClient.java @@ -10,13 +10,14 @@ import com.gotocompany.dagger.functions.exceptions.BucketDoesNotExistException; import com.gotocompany.dagger.functions.exceptions.TagDoesNotExistException; import com.gotocompany.dagger.functions.udfs.scalar.dart.DartAspects; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClient; import static com.gotocompany.dagger.common.core.Constants.UDF_TELEMETRY_GROUP_KEY; /** * The type Gcs client. */ -public class GcsClient { +public class GcsDartClient implements DartDataStoreClient { private Storage storage; @@ -28,7 +29,7 @@ public class GcsClient { * * @param projectId the project id */ - public GcsClient(String projectId) { + public GcsDartClient(String projectId) { if (storage == null) { storage = StorageOptions.newBuilder() diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/oss/OssDartClient.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/oss/OssDartClient.java new file mode 100644 index 000000000..63337bf96 --- /dev/null +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/oss/OssDartClient.java @@ -0,0 +1,68 @@ +package com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss; + +import com.aliyun.core.utils.IOUtils; +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; +import com.aliyun.oss.common.auth.CredentialsProviderFactory; +import com.aliyun.oss.model.OSSObject; +import com.aliyuncs.exceptions.ClientException; +import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager; +import com.gotocompany.dagger.functions.exceptions.TagDoesNotExistException; +import com.gotocompany.dagger.functions.udfs.scalar.dart.DartAspects; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClient; + +import java.io.IOException; +import java.io.InputStream; + +import static com.gotocompany.dagger.common.core.Constants.UDF_TELEMETRY_GROUP_KEY; + +public class OssDartClient implements DartDataStoreClient { + + private static final String ENV_OSS_ENDPOINT = "OSS_ENDPOINT"; + private static final String DEFAULT_OSS_ENDPOINT = "oss-ap-southeast-1.aliyuncs.com"; + + private static final Double BYTES_TO_KB = 1024.0; + private static final String DART_PATH = "dartpath"; + + private final OSS libOssClient; + + /** + * Instantiates a new Oss client. + */ + public OssDartClient() { + String endpoint = System.getenv(ENV_OSS_ENDPOINT); + if (endpoint == null || endpoint.isEmpty()) { + endpoint = DEFAULT_OSS_ENDPOINT; + } + try { + libOssClient = new OSSClientBuilder().build(endpoint, CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider()); + } catch (ClientException e) { + throw new RuntimeException("failed to initialise oss client", e); + } + } + + public String fetchJsonData(String udfName, GaugeStatsManager gaugeStatsManager, String bucketName, String dartName) { + OSSObject ossObject = libOssClient.getObject(bucketName, dartName); + String dartJson; + byte[] contentByteArray; + try (InputStream inputStream = ossObject.getObjectContent()) { + contentByteArray = IOUtils.toByteArray(inputStream); + dartJson = new String(contentByteArray); + } catch (IOException e) { + throw new TagDoesNotExistException("Could not find the content in oss for + dartName", e); + } + gaugeStatsManager.registerString(UDF_TELEMETRY_GROUP_KEY, udfName, DartAspects.DART_GCS_PATH.getValue(), dartName); + gaugeStatsManager.registerDouble(DART_PATH, dartName, DartAspects.DART_GCS_FILE_SIZE.getValue(), contentByteArray.length / BYTES_TO_KB); + return dartJson; + } + + /** + * Instantiates a new OSS client. + * This constructor used for unit test purposes. + * + * @param libOssClient the storage + */ + public OssDartClient(OSS libOssClient) { + this.libOssClient = libOssClient; + } +} diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/DartContainsTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/DartContainsTest.java index 7a0b47113..d8643b3e8 100644 --- a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/DartContainsTest.java +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/DartContainsTest.java @@ -4,7 +4,7 @@ import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager; import com.gotocompany.dagger.functions.exceptions.TagDoesNotExistException; import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore; -import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartDataStore; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DefaultDartDataStore; import com.gotocompany.dagger.functions.udfs.scalar.dart.types.SetCache; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; @@ -44,7 +44,7 @@ public void setUp() { when(functionContext.getMetricGroup()).thenReturn(metricGroup); when(metricGroup.addGroup("udf", "DartContains")).thenReturn(metricGroup); when(metricGroup.addGroup("DartContains")).thenReturn(metricGroup); - this.dataStore = mock(GcsDartDataStore.class); + this.dataStore = mock(DefaultDartDataStore.class); dartContains = new DartContains(dataStore); diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/DartGetTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/DartGetTest.java index bfcd38805..81b356a66 100644 --- a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/DartGetTest.java +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/DartGetTest.java @@ -4,7 +4,7 @@ import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager; import com.gotocompany.dagger.functions.exceptions.KeyDoesNotExistException; import com.gotocompany.dagger.functions.exceptions.TagDoesNotExistException; -import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartDataStore; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DefaultDartDataStore; import com.gotocompany.dagger.functions.udfs.scalar.dart.types.MapCache; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; @@ -19,7 +19,7 @@ import static org.mockito.MockitoAnnotations.initMocks; public class DartGetTest { - private GcsDartDataStore dataStore; + private DefaultDartDataStore dataStore; @Mock private MetricGroup metricGroup; @@ -42,7 +42,7 @@ public void setUp() { when(functionContext.getMetricGroup()).thenReturn(metricGroup); when(metricGroup.addGroup("udf", "DartGet")).thenReturn(metricGroup); when(metricGroup.addGroup("DartGet")).thenReturn(metricGroup); - this.dataStore = mock(GcsDartDataStore.class); + this.dataStore = mock(DefaultDartDataStore.class); dartGet = new DartGet(dataStore); diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDartDataStoreTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStoreTest.java similarity index 56% rename from dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDartDataStoreTest.java rename to dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStoreTest.java index bd6444e38..d17f23eaf 100644 --- a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/gcs/GcsDartDataStoreTest.java +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStoreTest.java @@ -1,9 +1,10 @@ -package com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs; +package com.gotocompany.dagger.functions.udfs.scalar.dart.store; import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager; import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager; import com.gotocompany.dagger.functions.exceptions.BucketDoesNotExistException; import com.gotocompany.dagger.functions.exceptions.TagDoesNotExistException; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient; import com.gotocompany.dagger.functions.udfs.scalar.dart.types.MapCache; import com.gotocompany.dagger.functions.udfs.scalar.dart.types.SetCache; import org.junit.Assert; @@ -19,28 +20,28 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class GcsDartDataStoreTest { +public class DefaultDartDataStoreTest { private final String defaultListName = "listName"; private final String defaultMapName = "mapName"; @Rule public ExpectedException thrown = ExpectedException.none(); - private GcsDartDataStore gcsDataStore; + private DefaultDartDataStore defaultDartDataStore; private List listContent; private Map mapContent; - private GcsClient gcsClient; + private GcsDartClient gcsDartClient; private MeterStatsManager meterStatsManager; private GaugeStatsManager gaugeStatsManager; @Before public void setUp() { - gcsDataStore = mock(GcsDartDataStore.class); - gcsClient = mock(GcsClient.class); + defaultDartDataStore = mock(DefaultDartDataStore.class); + gcsDartClient = mock(GcsDartClient.class); meterStatsManager = mock(MeterStatsManager.class); gaugeStatsManager = mock(GaugeStatsManager.class); - when(gcsDataStore.getSet(anyString(), any(), any())).thenCallRealMethod(); - when(gcsDataStore.getMap(anyString(), any(), any())).thenCallRealMethod(); - when(gcsDataStore.getGcsClient()).thenReturn(gcsClient); + when(defaultDartDataStore.getSet(anyString(), any(), any())).thenCallRealMethod(); + when(defaultDartDataStore.getMap(anyString(), any(), any())).thenCallRealMethod(); + when(defaultDartDataStore.getStoreClient()).thenReturn(gcsDartClient); listContent = Arrays.asList("listContent"); mapContent = Collections.singletonMap("key", "value"); } @@ -49,9 +50,9 @@ public void setUp() { public void shouldGetExistingListGivenName() { String jsonData = " { \"data\" : [ \"listContent\" ] } "; - when(gcsClient.fetchJsonData(any(), any(), any(), anyString())).thenReturn(jsonData); + when(gcsDartClient.fetchJsonData(any(), any(), any(), anyString())).thenReturn(jsonData); SetCache setCache = new SetCache(new HashSet<>(listContent)); - Assert.assertEquals(setCache, gcsDataStore.getSet(defaultListName, meterStatsManager, gaugeStatsManager)); + Assert.assertEquals(setCache, defaultDartDataStore.getSet(defaultListName, meterStatsManager, gaugeStatsManager)); } @Test @@ -59,9 +60,9 @@ public void shouldThrowTagDoesNotExistWhenListIsNotThere() { thrown.expect(TagDoesNotExistException.class); thrown.expectMessage("Could not find the content in gcs for invalidListName"); - when(gcsClient.fetchJsonData(any(), any(), any(), anyString())).thenThrow(new TagDoesNotExistException("Could not find the content in gcs for invalidListName")); + when(gcsDartClient.fetchJsonData(any(), any(), any(), anyString())).thenThrow(new TagDoesNotExistException("Could not find the content in gcs for invalidListName")); - gcsDataStore.getSet("invalidListName", meterStatsManager, gaugeStatsManager); + defaultDartDataStore.getSet("invalidListName", meterStatsManager, gaugeStatsManager); } @Test @@ -69,18 +70,18 @@ public void shouldThrowBucketDoesNotExistWhenBucketIsNotThere() { thrown.expect(BucketDoesNotExistException.class); thrown.expectMessage("Could not find the bucket in gcs for invalidListName"); - when(gcsClient.fetchJsonData(any(), any(), any(), anyString())).thenThrow(new BucketDoesNotExistException("Could not find the bucket in gcs for invalidListName")); + when(gcsDartClient.fetchJsonData(any(), any(), any(), anyString())).thenThrow(new BucketDoesNotExistException("Could not find the bucket in gcs for invalidListName")); - gcsDataStore.getSet("invalidListName", meterStatsManager, gaugeStatsManager); + defaultDartDataStore.getSet("invalidListName", meterStatsManager, gaugeStatsManager); } @Test public void shouldGetExistingMapGivenName() { String jsonData = " { \"key\" : \"value\" } "; - when(gcsClient.fetchJsonData(any(), any(), any(), anyString())).thenReturn(jsonData); + when(gcsDartClient.fetchJsonData(any(), any(), any(), anyString())).thenReturn(jsonData); MapCache mapCache = new MapCache(new HashMap<>(mapContent)); - Assert.assertEquals(mapCache, gcsDataStore.getMap(defaultMapName, meterStatsManager, gaugeStatsManager)); + Assert.assertEquals(mapCache, defaultDartDataStore.getMap(defaultMapName, meterStatsManager, gaugeStatsManager)); } @Test @@ -88,8 +89,8 @@ public void shouldThrowTagDoesNotExistWhenMapIsNotThere() { thrown.expect(TagDoesNotExistException.class); thrown.expectMessage("Could not find the content in gcs for invalidMapName"); - when(gcsClient.fetchJsonData(any(), any(), any(), anyString())).thenThrow(new TagDoesNotExistException("Could not find the content in gcs for invalidMapName")); + when(gcsDartClient.fetchJsonData(any(), any(), any(), anyString())).thenThrow(new TagDoesNotExistException("Could not find the content in gcs for invalidMapName")); - gcsDataStore.getSet("invalidMapName", meterStatsManager, gaugeStatsManager); + defaultDartDataStore.getSet("invalidMapName", meterStatsManager, gaugeStatsManager); } } diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/oss/OssDartClientTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/oss/OssDartClientTest.java new file mode 100644 index 000000000..2d1b25d40 --- /dev/null +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/oss/OssDartClientTest.java @@ -0,0 +1,49 @@ +package com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.model.OSSObject; +import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; + +public class OssDartClientTest { + @Mock + private OSS libOSSClient; + + @Mock + private OSSObject ossObject; + + @Mock + private GaugeStatsManager gaugeStatsManager; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void shouldGetObjectFile() throws IOException { + String bucketName = "bucket_name"; + String udfName = "DartGet"; + String dartName = "dart-get/path/to/data.json"; + String jsonFileContent = "{\"name\":\"house-stark-dev\"}"; + + when(libOSSClient.getObject(bucketName, dartName)).thenReturn(ossObject); + when(ossObject.getObjectContent()).thenReturn(new ByteArrayInputStream(jsonFileContent.getBytes())); + + OssDartClient ossDartClient = new OssDartClient(libOSSClient); + String jsonData = ossDartClient.fetchJsonData(udfName, gaugeStatsManager, bucketName, dartName); + + verify(libOSSClient, times(1)).getObject(bucketName, dartName); + verify(ossObject, times(1)).getObjectContent(); + Assert.assertEquals(jsonFileContent, jsonData); + } +} From 3cbad4ed79b13cf0bfe98e74e9d0378da7ffc39d Mon Sep 17 00:00:00 2001 From: rajuGT Date: Thu, 23 Jan 2025 14:14:16 +0530 Subject: [PATCH 6/8] Dart Support for COS Service Provider (#46) * Add gradle tasks to minimal and dependencies to maven local * Add capability to dagger to read python udfs from Ali(oss) and Tencent(cosn) storage services Given the configuration provided correctly. Set the below environment variables accordingly to access the files stored in the respective bucket. Ali(oss) - OSS_ACCESS_KEY_ID - OSS_ACCESS_KEY_SECRET Tencent(cos) - COS_SECRET_ID - COS_SECRET_KEY - COS_REGION * OSS client endpoint should be configurable via ENV variable * COS filesystem high availability support If you need to use COS filesystem for the dagger, provide the cos bucket/key configuration in the state.backend.fs.checkpointdir, state.savepoints.dir, high-availability.storageDir to flinkdeployment manifest. If the filesystem protocol begins with cosn for the above configurations, dagger uses the below configurations provided in the flinkdeployment manifest file. fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN fs.cosn.userinfo.secretId: fs.cosn.userinfo.secretKey: fs.cosn.bucket.region: fs.cosn.bucket.endpoint_suffix: * Fix checkstyle and made constants as static variables * Refactor Dart Feature to plug other object storage service providers * test checkstyle fix * Dart Support for OSS Service Provider * fix checkstyle * Dart Support for COS Service Provider --------- Co-authored-by: Raju G T Co-authored-by: rajuGT --- .../dagger/functions/common/Constants.java | 1 + .../udfs/factories/FunctionFactory.java | 4 ++ .../scalar/dart/store/cos/CosDartClient.java | 68 +++++++++++++++++++ .../dart/store/cos/CosDartClientTest.java | 54 +++++++++++++++ .../dart/store/oss/OssDartClientTest.java | 3 +- 5 files changed, 128 insertions(+), 2 deletions(-) create mode 100644 dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/cos/CosDartClient.java create mode 100644 dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/cos/CosDartClientTest.java diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/common/Constants.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/common/Constants.java index a469733a4..6931108e6 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/common/Constants.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/common/Constants.java @@ -21,6 +21,7 @@ public class Constants { public static final String UDF_STORE_PROVIDER_KEY = "UDF_STORE_PROVIDER"; public static final String UDF_STORE_PROVIDER_GCS = "GCS"; public static final String UDF_STORE_PROVIDER_OSS = "OSS"; + public static final String UDF_STORE_PROVIDER_COS = "COS"; public static final String PYTHON_UDF_CONFIG = "PYTHON_UDF_CONFIG"; public static final String PYTHON_UDF_ENABLE_KEY = "PYTHON_UDF_ENABLE"; diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java index b93287d17..da80540c9 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java @@ -4,6 +4,7 @@ import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore; import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClient; import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DefaultDartDataStore; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos.CosDartClient; import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient; import com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss.OssDartClient; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -155,6 +156,9 @@ private DartDataStore getDartDataSource() { case Constants.UDF_STORE_PROVIDER_OSS: dartDataStoreClient = new OssDartClient(); break; + case Constants.UDF_STORE_PROVIDER_COS: + dartDataStoreClient = new CosDartClient(); + break; default: throw new IllegalArgumentException("Unknown UDF Store Provider: " + udfStoreProvider); } diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/cos/CosDartClient.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/cos/CosDartClient.java new file mode 100644 index 000000000..4b9858dd4 --- /dev/null +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/cos/CosDartClient.java @@ -0,0 +1,68 @@ +package com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos; + +import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager; +import com.gotocompany.dagger.functions.exceptions.TagDoesNotExistException; +import com.gotocompany.dagger.functions.udfs.scalar.dart.DartAspects; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClient; +import com.qcloud.cos.COSClient; +import com.qcloud.cos.ClientConfig; +import com.qcloud.cos.auth.BasicCOSCredentials; +import com.qcloud.cos.auth.COSCredentials; +import com.qcloud.cos.model.COSObject; +import com.qcloud.cos.model.COSObjectInputStream; +import com.qcloud.cos.region.Region; +import com.qcloud.cos.utils.IOUtils; + +import java.io.IOException; + +import static com.gotocompany.dagger.common.core.Constants.UDF_TELEMETRY_GROUP_KEY; + +public class CosDartClient implements DartDataStoreClient { + private static final Double BYTES_TO_KB = 1024.0; + private static final String DART_PATH = "dartpath"; + + private static final String ENV_COS_SECRET_ID = "COS_SECRET_ID"; + private static final String ENV_COS_SECRET_KEY = "COS_SECRET_KEY"; + private static final String ENV_COS_REGION = "COS_REGION"; + + private final COSClient libCosClient; + + /** + * Instantiates a new Cos client. + */ + public CosDartClient() { + String secretID = System.getenv(ENV_COS_SECRET_ID); + String secretKey = System.getenv(ENV_COS_SECRET_KEY); + String region = System.getenv(ENV_COS_REGION); // ap-singapore + + COSCredentials credentials = new BasicCOSCredentials(secretID, secretKey); + ClientConfig clientConfig = new ClientConfig(new Region(region)); + libCosClient = new COSClient(credentials, clientConfig); + } + + + public String fetchJsonData(String udfName, GaugeStatsManager gaugeStatsManager, String bucketName, String dartName) { + COSObject cosObject = libCosClient.getObject(bucketName, dartName); + String dartJson; + byte[] contentByteArray; + try (COSObjectInputStream inputStream = cosObject.getObjectContent()) { + contentByteArray = IOUtils.toByteArray(inputStream); + dartJson = new String(contentByteArray); + } catch (IOException e) { + throw new TagDoesNotExistException("Could not find the content in oss for + dartName", e); + } + gaugeStatsManager.registerString(UDF_TELEMETRY_GROUP_KEY, udfName, DartAspects.DART_GCS_PATH.getValue(), dartName); + gaugeStatsManager.registerDouble(DART_PATH, dartName, DartAspects.DART_GCS_FILE_SIZE.getValue(), contentByteArray.length / BYTES_TO_KB); + return dartJson; + } + + /** + * Instantiates a new Cos client. + * This constructor used for unit test purposes. + * + * @param libCosClient the storage + */ + public CosDartClient(COSClient libCosClient) { + this.libCosClient = libCosClient; + } +} diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/cos/CosDartClientTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/cos/CosDartClientTest.java new file mode 100644 index 000000000..8bdbb004a --- /dev/null +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/cos/CosDartClientTest.java @@ -0,0 +1,54 @@ +package com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos; + +import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager; +import com.qcloud.cos.COSClient; +import com.qcloud.cos.model.COSObject; +import com.qcloud.cos.model.COSObjectInputStream; +import org.apache.http.client.methods.HttpRequestBase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import java.io.ByteArrayInputStream; + +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; + +public class CosDartClientTest { + + @Mock + private COSClient libCosClient; + + @Mock + private COSObject cosObject; + + @Mock + private GaugeStatsManager gaugeStatsManager; + + @Mock + private HttpRequestBase mockRequest; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void shouldGetObjectFile() { + String bucketName = "bucket_name"; + String udfName = "DartGet"; + String dartName = "dart-get/path/to/data.json"; + String jsonFileContent = "{\"name\":\"house-stark-dev\"}"; + + when(libCosClient.getObject(bucketName, dartName)).thenReturn(cosObject); + when(cosObject.getObjectContent()).thenReturn(new COSObjectInputStream(new ByteArrayInputStream(jsonFileContent.getBytes()), mockRequest)); + + CosDartClient cosDartClient = new CosDartClient(libCosClient); + String jsonData = cosDartClient.fetchJsonData(udfName, gaugeStatsManager, bucketName, dartName); + + verify(libCosClient, times(1)).getObject(bucketName, dartName); + verify(cosObject, times(1)).getObjectContent(); + Assert.assertEquals(jsonFileContent, jsonData); + } +} diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/oss/OssDartClientTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/oss/OssDartClientTest.java index 2d1b25d40..d839d6df6 100644 --- a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/oss/OssDartClientTest.java +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/oss/OssDartClientTest.java @@ -9,7 +9,6 @@ import org.mockito.Mock; import java.io.ByteArrayInputStream; -import java.io.IOException; import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; @@ -30,7 +29,7 @@ public void setup() { } @Test - public void shouldGetObjectFile() throws IOException { + public void shouldGetObjectFile() { String bucketName = "bucket_name"; String udfName = "DartGet"; String dartName = "dart-get/path/to/data.json"; From 5e1d8e2066adc7595bfffb595f6139b959b48626 Mon Sep 17 00:00:00 2001 From: rajuGT Date: Thu, 23 Jan 2025 14:22:16 +0530 Subject: [PATCH 7/8] Dart implementation fix - the object storage client aren't serializable (#47) * Add gradle tasks to minimal and dependencies to maven local * Add capability to dagger to read python udfs from Ali(oss) and Tencent(cosn) storage services Given the configuration provided correctly. Set the below environment variables accordingly to access the files stored in the respective bucket. Ali(oss) - OSS_ACCESS_KEY_ID - OSS_ACCESS_KEY_SECRET Tencent(cos) - COS_SECRET_ID - COS_SECRET_KEY - COS_REGION * OSS client endpoint should be configurable via ENV variable * COS filesystem high availability support If you need to use COS filesystem for the dagger, provide the cos bucket/key configuration in the state.backend.fs.checkpointdir, state.savepoints.dir, high-availability.storageDir to flinkdeployment manifest. If the filesystem protocol begins with cosn for the above configurations, dagger uses the below configurations provided in the flinkdeployment manifest file. fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN fs.cosn.userinfo.secretId: fs.cosn.userinfo.secretKey: fs.cosn.bucket.region: fs.cosn.bucket.endpoint_suffix: * Fix checkstyle and made constants as static variables * Refactor Dart Feature to plug other object storage service providers * test checkstyle fix * Dart Support for OSS Service Provider * fix checkstyle * Dart Support for COS Service Provider * Dart implementation fix - the object storage client aren't serializable Most of the client implementation including GCS, is not serializable, so fixed this issue by making client implementation not part of the serialization, and when the client is passed over wire and the client doesn't exist, it initializes as and when it is required. // In a distributed system, we don't intend the client to be serialized and most of the implementations like // GCP Storage implementation doesn't implement java.io.Serializable interface and you may see the below error // Caused by: org.apache.flink.api.common.InvalidProgramException: com.google.api.services.storage.Storage@1c666a8f // is not serializable. The object probably contains or references non serializable fields. // Caused by: java.io.NotSerializableException: com.google.api.services.storage.Storage * checkstyle fix * Add unit tests for DartDataStoreClientProvider * removed comment lines --------- Co-authored-by: Raju G T Co-authored-by: rajuGT --- .../udfs/factories/FunctionFactory.java | 21 +----- .../store/DartDataStoreClientProvider.java | 46 +++++++++++++ .../dart/store/DefaultDartDataStore.java | 17 ++--- .../DartDataStoreClientProviderTest.java | 68 +++++++++++++++++++ .../dart/store/DefaultDartDataStoreTest.java | 9 +-- 5 files changed, 128 insertions(+), 33 deletions(-) create mode 100644 dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClientProvider.java create mode 100644 dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClientProviderTest.java diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java index da80540c9..4a2709a48 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java @@ -2,11 +2,8 @@ import com.gotocompany.dagger.functions.common.Constants; import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore; -import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClient; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClientProvider; import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DefaultDartDataStore; -import com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos.CosDartClient; -import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient; -import com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss.OssDartClient; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import com.google.gson.Gson; @@ -148,21 +145,7 @@ private DartDataStore getDartDataSource() { bucketID = getConfiguration().getString(Constants.UDF_DART_GCS_BUCKET_ID_KEY, Constants.UDF_DART_GCS_BUCKET_ID_DEFAULT); } - DartDataStoreClient dartDataStoreClient; - switch (udfStoreProvider) { - case Constants.UDF_STORE_PROVIDER_GCS: - dartDataStoreClient = new GcsDartClient(projectID); - break; - case Constants.UDF_STORE_PROVIDER_OSS: - dartDataStoreClient = new OssDartClient(); - break; - case Constants.UDF_STORE_PROVIDER_COS: - dartDataStoreClient = new CosDartClient(); - break; - default: - throw new IllegalArgumentException("Unknown UDF Store Provider: " + udfStoreProvider); - } - return new DefaultDartDataStore(dartDataStoreClient, bucketID); + return new DefaultDartDataStore(new DartDataStoreClientProvider(udfStoreProvider, projectID), bucketID); } private LinkedHashMap getProtosInInputStreams() { diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClientProvider.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClientProvider.java new file mode 100644 index 000000000..8adf04474 --- /dev/null +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClientProvider.java @@ -0,0 +1,46 @@ +package com.gotocompany.dagger.functions.udfs.scalar.dart.store; + +import com.gotocompany.dagger.functions.common.Constants; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos.CosDartClient; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss.OssDartClient; + +import java.io.Serializable; + +public class DartDataStoreClientProvider implements Serializable { + private final String udfStoreProvider; + private final String projectID; + + // Do not make this final, if so then the implementation of client should be Serializable + private DartDataStoreClient dartDataStoreClient; + + public DartDataStoreClientProvider(String udfStoreProvider, String projectID) { + this.udfStoreProvider = udfStoreProvider; + this.projectID = projectID; + } + + public DartDataStoreClient getDartDataStoreClient() { + // In a distributed system, we don't intend the client to be serialized and most of the implementations like + // GCP Storage implementation doesn't implement java.io.Serializable interface and you may see the below error + // Caused by: org.apache.flink.api.common.InvalidProgramException: com.google.api.services.storage.Storage@1c666a8f + // is not serializable. The object probably contains or references non serializable fields. + // Caused by: java.io.NotSerializableException: com.google.api.services.storage.Storage + if (dartDataStoreClient != null) { + return dartDataStoreClient; + } + switch (udfStoreProvider) { + case Constants.UDF_STORE_PROVIDER_GCS: + dartDataStoreClient = new GcsDartClient(projectID); + break; + case Constants.UDF_STORE_PROVIDER_OSS: + dartDataStoreClient = new OssDartClient(); + break; + case Constants.UDF_STORE_PROVIDER_COS: + dartDataStoreClient = new CosDartClient(); + break; + default: + throw new IllegalArgumentException("Unknown UDF Store Provider: " + udfStoreProvider); + } + return dartDataStoreClient; + } +} diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStore.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStore.java index a1e61b613..7a9f814fc 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStore.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStore.java @@ -7,7 +7,6 @@ import com.gotocompany.dagger.functions.udfs.scalar.dart.types.SetCache; import com.gotocompany.dagger.functions.udfs.scalar.DartContains; import com.gotocompany.dagger.functions.udfs.scalar.DartGet; -import lombok.Getter; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -30,19 +29,17 @@ public class DefaultDartDataStore implements DartDataStore, Serializable { public static final String DART_GET_DIRECTORY = "dart-get/"; public static final String DART_CONTAINS_DIRECTORY = "dart-contains/"; + private final DartDataStoreClientProvider clientProvider; private final String bucketId; - @Getter - private final DartDataStoreClient storeClient; - /** * Instantiates a new data store. * - * @param storeClient a {@link DartDataStoreClient} implementation for the respective object storage provider - * @param bucketId the bucket id + * @param clientProvider a {@link DartDataStoreClient} implementation for the respective object storage provider + * @param bucketId the bucket id */ - public DefaultDartDataStore(DartDataStoreClient storeClient, String bucketId) { - this.storeClient = storeClient; + public DefaultDartDataStore(DartDataStoreClientProvider clientProvider, String bucketId) { + this.clientProvider = clientProvider; this.bucketId = bucketId; } @@ -58,7 +55,7 @@ public MapCache getMap(String mapName, MeterStatsManager meterStatsManager, Gaug } private Map getMapOfObjects(String dartName, MeterStatsManager meterManager, GaugeStatsManager gaugeManager) { - String jsonData = getStoreClient().fetchJsonData( + String jsonData = clientProvider.getDartDataStoreClient().fetchJsonData( DartGet.class.getSimpleName(), gaugeManager, this.bucketId, @@ -77,7 +74,7 @@ private Map getMapOfObjects(String dartName, MeterStatsManager m } private Set getSetOfObjects(String dartName, MeterStatsManager meterManager, GaugeStatsManager gaugeManager) { - String jsonData = getStoreClient().fetchJsonData(DartContains.class.getSimpleName(), gaugeManager, this.bucketId, DART_CONTAINS_DIRECTORY + dartName); + String jsonData = clientProvider.getDartDataStoreClient().fetchJsonData(DartContains.class.getSimpleName(), gaugeManager, this.bucketId, DART_CONTAINS_DIRECTORY + dartName); ObjectMapper mapper = new ObjectMapper(); try { ObjectNode node = (ObjectNode) mapper.readTree(jsonData); diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClientProviderTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClientProviderTest.java new file mode 100644 index 000000000..f706c776c --- /dev/null +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClientProviderTest.java @@ -0,0 +1,68 @@ +package com.gotocompany.dagger.functions.udfs.scalar.dart.store; + +import com.gotocompany.dagger.functions.common.Constants; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss.OssDartClient; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +public class DartDataStoreClientProviderTest { + private DartDataStoreClientProvider dartDataStoreClientProvider; + + @Before + public void setUp() { + dartDataStoreClientProvider = null; + } + + @Test + public void shouldReturnGcsDartClientWhenUdfStoreProviderIsGcs() { + String udfStoreProvider = Constants.UDF_STORE_PROVIDER_GCS; + String projectID = "test-project"; + + dartDataStoreClientProvider = new DartDataStoreClientProvider(udfStoreProvider, projectID); + + DartDataStoreClient client = dartDataStoreClientProvider.getDartDataStoreClient(); + + assertTrue(client instanceof GcsDartClient); + } + + @Test + public void shouldReturnOssDartClientWhenUdfStoreProviderIsOss() { + String udfStoreProvider = Constants.UDF_STORE_PROVIDER_OSS; + + dartDataStoreClientProvider = new DartDataStoreClientProvider(udfStoreProvider, null); + DartDataStoreClient client = dartDataStoreClientProvider.getDartDataStoreClient(); + + assertTrue(client instanceof OssDartClient); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionForUnknownUdfStoreProvider() { + String udfStoreProvider = "UNKNOWN-PROVIDER"; + + dartDataStoreClientProvider = new DartDataStoreClientProvider(udfStoreProvider, null); + + try { + dartDataStoreClientProvider.getDartDataStoreClient(); + } catch (IllegalArgumentException e) { + Assert.assertEquals("Unknown UDF Store Provider: UNKNOWN-PROVIDER", e.getMessage()); + throw e; + } + } + + @Test + public void shouldReturnSameClientOnSubsequentCalls() { + String udfStoreProvider = Constants.UDF_STORE_PROVIDER_GCS; + String projectID = "test-project"; + + dartDataStoreClientProvider = new DartDataStoreClientProvider(udfStoreProvider, projectID); + + DartDataStoreClient firstClient = dartDataStoreClientProvider.getDartDataStoreClient(); + DartDataStoreClient secondClient = dartDataStoreClientProvider.getDartDataStoreClient(); + + Assert.assertEquals(firstClient, secondClient); + } +} diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStoreTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStoreTest.java index d17f23eaf..eb2f95d70 100644 --- a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStoreTest.java +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStoreTest.java @@ -35,13 +35,14 @@ public class DefaultDartDataStoreTest { @Before public void setUp() { - defaultDartDataStore = mock(DefaultDartDataStore.class); + // Subject + DartDataStoreClientProvider dartDataStoreClientProvider = mock(DartDataStoreClientProvider.class); + defaultDartDataStore = new DefaultDartDataStore(dartDataStoreClientProvider, "test-bucket"); + gcsDartClient = mock(GcsDartClient.class); meterStatsManager = mock(MeterStatsManager.class); gaugeStatsManager = mock(GaugeStatsManager.class); - when(defaultDartDataStore.getSet(anyString(), any(), any())).thenCallRealMethod(); - when(defaultDartDataStore.getMap(anyString(), any(), any())).thenCallRealMethod(); - when(defaultDartDataStore.getStoreClient()).thenReturn(gcsDartClient); + when(dartDataStoreClientProvider.getDartDataStoreClient()).thenReturn(gcsDartClient); listContent = Arrays.asList("listContent"); mapContent = Collections.singletonMap("key", "value"); } From a9e00e86a4a498ed921b6ed17c0e8115855cc2f8 Mon Sep 17 00:00:00 2001 From: rajuGT Date: Thu, 23 Jan 2025 14:33:23 +0530 Subject: [PATCH 8/8] Enable Dagger Parquet Source feature using Ali OSS Service (#49) * Add gradle tasks to minimal and dependencies to maven local * Add capability to dagger to read python udfs from Ali(oss) and Tencent(cosn) storage services Given the configuration provided correctly. Set the below environment variables accordingly to access the files stored in the respective bucket. Ali(oss) - OSS_ACCESS_KEY_ID - OSS_ACCESS_KEY_SECRET Tencent(cos) - COS_SECRET_ID - COS_SECRET_KEY - COS_REGION * OSS client endpoint should be configurable via ENV variable * COS filesystem high availability support If you need to use COS filesystem for the dagger, provide the cos bucket/key configuration in the state.backend.fs.checkpointdir, state.savepoints.dir, high-availability.storageDir to flinkdeployment manifest. If the filesystem protocol begins with cosn for the above configurations, dagger uses the below configurations provided in the flinkdeployment manifest file. fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN fs.cosn.userinfo.secretId: fs.cosn.userinfo.secretKey: fs.cosn.bucket.region: fs.cosn.bucket.endpoint_suffix: * Fix checkstyle and made constants as static variables * Refactor Dart Feature to plug other object storage service providers * test checkstyle fix * Dart Support for OSS Service Provider * fix checkstyle * Dart Support for COS Service Provider * Dart implementation fix - the object storage client aren't serializable Most of the client implementation including GCS, is not serializable, so fixed this issue by making client implementation not part of the serialization, and when the client is passed over wire and the client doesn't exist, it initializes as and when it is required. // In a distributed system, we don't intend the client to be serialized and most of the implementations like // GCP Storage implementation doesn't implement java.io.Serializable interface and you may see the below error // Caused by: org.apache.flink.api.common.InvalidProgramException: com.google.api.services.storage.Storage@1c666a8f // is not serializable. The object probably contains or references non serializable fields. // Caused by: java.io.NotSerializableException: com.google.api.services.storage.Storage * checkstyle fix * Add unit tests for DartDataStoreClientProvider * Enable Dagger Parquet Source feature using Ali OSS Service --------- Co-authored-by: Raju G T Co-authored-by: rajuGT --- dagger-common/build.gradle | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dagger-common/build.gradle b/dagger-common/build.gradle index 1be353e23..0ea9af248 100644 --- a/dagger-common/build.gradle +++ b/dagger-common/build.gradle @@ -66,6 +66,9 @@ dependencies { } dependenciesCommonJar group: 'com.google.cloud.bigdataoss', name: 'gcs-connector', version: 'hadoop2-2.2.16' dependenciesCommonJar group: 'com.qcloud.cos', name: 'flink-cos-fs-hadoop', version: '1.10.0-0.1.10' + + dependenciesCommonJar group: 'org.apache.hadoop', name: 'hadoop-aliyun', version: '3.4.1' + dependenciesCommonJar 'org.apache.flink:flink-metrics-dropwizard:' + flinkVersion dependenciesCommonJar 'org.apache.flink:flink-json:' + flinkVersion dependenciesCommonJar 'com.jayway.jsonpath:json-path:2.4.0'