Skip to content

Commit

Permalink
Dart Support for COS Service Provider (#46)
Browse files Browse the repository at this point in the history
* Add gradle tasks to minimal and dependencies to maven local

* Add capability to dagger to read python udfs from Ali(oss) and Tencent(cosn) storage services
Given the configuration provided correctly. Set the below environment
variables accordingly to access the files stored in the respective
bucket.

Ali(oss)
- OSS_ACCESS_KEY_ID
- OSS_ACCESS_KEY_SECRET

Tencent(cos)
- COS_SECRET_ID
- COS_SECRET_KEY
- COS_REGION

* OSS client endpoint should be configurable via ENV variable

* COS filesystem high availability support
If you need to use COS filesystem for the dagger, provide the cos
bucket/key configuration in the state.backend.fs.checkpointdir,
state.savepoints.dir, high-availability.storageDir to flinkdeployment
manifest.

If the filesystem protocol begins with cosn for the above
configurations, dagger uses the below configurations provided in the
flinkdeployment manifest file.

    fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
    fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
    fs.cosn.userinfo.secretId: <secretID>
    fs.cosn.userinfo.secretKey: <secretKey>
    fs.cosn.bucket.region: <region>
    fs.cosn.bucket.endpoint_suffix: <tencent-provided-prefix.xyz.com>

* Fix checkstyle and made constants as static variables

* Refactor Dart Feature to plug other object storage service providers

* test checkstyle fix

* Dart Support for OSS Service Provider

* fix checkstyle

* Dart Support for COS Service Provider

---------

Co-authored-by: Raju G T <[email protected]>
Co-authored-by: rajuGT <[email protected]>
  • Loading branch information
3 people authored Jan 23, 2025
1 parent fd991c4 commit 3cbad4e
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class Constants {
public static final String UDF_STORE_PROVIDER_KEY = "UDF_STORE_PROVIDER";
public static final String UDF_STORE_PROVIDER_GCS = "GCS";
public static final String UDF_STORE_PROVIDER_OSS = "OSS";
public static final String UDF_STORE_PROVIDER_COS = "COS";

public static final String PYTHON_UDF_CONFIG = "PYTHON_UDF_CONFIG";
public static final String PYTHON_UDF_ENABLE_KEY = "PYTHON_UDF_ENABLE";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DefaultDartDataStore;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos.CosDartClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss.OssDartClient;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
Expand Down Expand Up @@ -155,6 +156,9 @@ private DartDataStore getDartDataSource() {
case Constants.UDF_STORE_PROVIDER_OSS:
dartDataStoreClient = new OssDartClient();
break;
case Constants.UDF_STORE_PROVIDER_COS:
dartDataStoreClient = new CosDartClient();
break;
default:
throw new IllegalArgumentException("Unknown UDF Store Provider: " + udfStoreProvider);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos;

import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager;
import com.gotocompany.dagger.functions.exceptions.TagDoesNotExistException;
import com.gotocompany.dagger.functions.udfs.scalar.dart.DartAspects;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClient;
import com.qcloud.cos.COSClient;
import com.qcloud.cos.ClientConfig;
import com.qcloud.cos.auth.BasicCOSCredentials;
import com.qcloud.cos.auth.COSCredentials;
import com.qcloud.cos.model.COSObject;
import com.qcloud.cos.model.COSObjectInputStream;
import com.qcloud.cos.region.Region;
import com.qcloud.cos.utils.IOUtils;

import java.io.IOException;

import static com.gotocompany.dagger.common.core.Constants.UDF_TELEMETRY_GROUP_KEY;

public class CosDartClient implements DartDataStoreClient {
private static final Double BYTES_TO_KB = 1024.0;
private static final String DART_PATH = "dartpath";

private static final String ENV_COS_SECRET_ID = "COS_SECRET_ID";
private static final String ENV_COS_SECRET_KEY = "COS_SECRET_KEY";
private static final String ENV_COS_REGION = "COS_REGION";

private final COSClient libCosClient;

/**
* Instantiates a new Cos client.
*/
public CosDartClient() {
String secretID = System.getenv(ENV_COS_SECRET_ID);
String secretKey = System.getenv(ENV_COS_SECRET_KEY);
String region = System.getenv(ENV_COS_REGION); // ap-singapore

COSCredentials credentials = new BasicCOSCredentials(secretID, secretKey);
ClientConfig clientConfig = new ClientConfig(new Region(region));
libCosClient = new COSClient(credentials, clientConfig);
}


public String fetchJsonData(String udfName, GaugeStatsManager gaugeStatsManager, String bucketName, String dartName) {
COSObject cosObject = libCosClient.getObject(bucketName, dartName);
String dartJson;
byte[] contentByteArray;
try (COSObjectInputStream inputStream = cosObject.getObjectContent()) {
contentByteArray = IOUtils.toByteArray(inputStream);
dartJson = new String(contentByteArray);
} catch (IOException e) {
throw new TagDoesNotExistException("Could not find the content in oss for + dartName", e);
}
gaugeStatsManager.registerString(UDF_TELEMETRY_GROUP_KEY, udfName, DartAspects.DART_GCS_PATH.getValue(), dartName);
gaugeStatsManager.registerDouble(DART_PATH, dartName, DartAspects.DART_GCS_FILE_SIZE.getValue(), contentByteArray.length / BYTES_TO_KB);
return dartJson;
}

/**
* Instantiates a new Cos client.
* This constructor used for unit test purposes.
*
* @param libCosClient the storage
*/
public CosDartClient(COSClient libCosClient) {
this.libCosClient = libCosClient;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos;

import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager;
import com.qcloud.cos.COSClient;
import com.qcloud.cos.model.COSObject;
import com.qcloud.cos.model.COSObjectInputStream;
import org.apache.http.client.methods.HttpRequestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

import java.io.ByteArrayInputStream;

import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;

public class CosDartClientTest {

@Mock
private COSClient libCosClient;

@Mock
private COSObject cosObject;

@Mock
private GaugeStatsManager gaugeStatsManager;

@Mock
private HttpRequestBase mockRequest;

@Before
public void setup() {
initMocks(this);
}

@Test
public void shouldGetObjectFile() {
String bucketName = "bucket_name";
String udfName = "DartGet";
String dartName = "dart-get/path/to/data.json";
String jsonFileContent = "{\"name\":\"house-stark-dev\"}";

when(libCosClient.getObject(bucketName, dartName)).thenReturn(cosObject);
when(cosObject.getObjectContent()).thenReturn(new COSObjectInputStream(new ByteArrayInputStream(jsonFileContent.getBytes()), mockRequest));

CosDartClient cosDartClient = new CosDartClient(libCosClient);
String jsonData = cosDartClient.fetchJsonData(udfName, gaugeStatsManager, bucketName, dartName);

verify(libCosClient, times(1)).getObject(bucketName, dartName);
verify(cosObject, times(1)).getObjectContent();
Assert.assertEquals(jsonFileContent, jsonData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.mockito.Mock;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;
Expand All @@ -30,7 +29,7 @@ public void setup() {
}

@Test
public void shouldGetObjectFile() throws IOException {
public void shouldGetObjectFile() {
String bucketName = "bucket_name";
String udfName = "DartGet";
String dartName = "dart-get/path/to/data.json";
Expand Down

0 comments on commit 3cbad4e

Please sign in to comment.