Skip to content

Commit

Permalink
Dart Support for OSS Service Provider (#45)
Browse files Browse the repository at this point in the history
* Add gradle tasks to minimal and dependencies to maven local

* Add capability to dagger to read python udfs from Ali(oss) and Tencent(cosn) storage services
Given the configuration provided correctly. Set the below environment
variables accordingly to access the files stored in the respective
bucket.

Ali(oss)
- OSS_ACCESS_KEY_ID
- OSS_ACCESS_KEY_SECRET

Tencent(cos)
- COS_SECRET_ID
- COS_SECRET_KEY
- COS_REGION

* OSS client endpoint should be configurable via ENV variable

* COS filesystem high availability support
If you need to use COS filesystem for the dagger, provide the cos
bucket/key configuration in the state.backend.fs.checkpointdir,
state.savepoints.dir, high-availability.storageDir to flinkdeployment
manifest.

If the filesystem protocol begins with cosn for the above
configurations, dagger uses the below configurations provided in the
flinkdeployment manifest file.

    fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
    fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
    fs.cosn.userinfo.secretId: <secretID>
    fs.cosn.userinfo.secretKey: <secretKey>
    fs.cosn.bucket.region: <region>
    fs.cosn.bucket.endpoint_suffix: <tencent-provided-prefix.xyz.com>

* Fix checkstyle and made constants as static variables

* Refactor Dart Feature to plug other object storage service providers

* test checkstyle fix

* Dart Support for OSS Service Provider

* fix checkstyle

---------

Co-authored-by: Raju G T <[email protected]>
Co-authored-by: rajuGT <[email protected]>
  • Loading branch information
3 people authored Jan 23, 2025
1 parent dc18103 commit fd991c4
Show file tree
Hide file tree
Showing 13 changed files with 238 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ public ParameterTool getParam() {
return param;
}

public String getString(String configKey) {
return param.get(configKey);
}

public String getString(String configKey, String defaultValue) {
return param.get(configKey, defaultValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.mockito.Mock;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

Expand All @@ -30,6 +31,11 @@ public void shouldGetStringFromParamTool() {
assertEquals("test_value", configuration.getString("test_config", "test_default"));
}

@Test
public void shouldGetNullIfParamIsNotSet() {
assertNull(configuration.getString("config_not_exist"));
}

@Test
public void shouldGetIntegerFromParamTool() {
when(parameterTool.getInt("test_config", 1)).thenReturn(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,25 @@
public class Constants {
public static final Integer NUMBER_OF_DATA_TYPES_IN_FEATURE_ROW = 8;
public static final Integer NUMBER_OF_ARGUMENTS_IN_FEATURE_ACCUMULATOR = 3;

@Deprecated
public static final String UDF_DART_GCS_PROJECT_ID_KEY = "UDF_DART_GCS_PROJECT_ID";
@Deprecated
public static final String UDF_DART_GCS_PROJECT_ID_DEFAULT = "";
@Deprecated
public static final String UDF_DART_GCS_BUCKET_ID_KEY = "UDF_DART_GCS_BUCKET_ID";
@Deprecated
public static final String UDF_DART_GCS_BUCKET_ID_DEFAULT = "";

public static final String UDF_DART_PROJECT_ID_KEY = "UDF_DART_PROJECT_ID";
public static final String UDF_DART_PROJECT_ID_DEFAULT = "";
public static final String UDF_DART_BUCKET_ID_KEY = "UDF_DART_BUCKET_ID";
public static final String UDF_DART_BUCKET_ID_DEFAULT = "";

public static final String UDF_STORE_PROVIDER_KEY = "UDF_STORE_PROVIDER";
public static final String UDF_STORE_PROVIDER_GCS = "GCS";
public static final String UDF_STORE_PROVIDER_OSS = "OSS";

public static final String PYTHON_UDF_CONFIG = "PYTHON_UDF_CONFIG";
public static final String PYTHON_UDF_ENABLE_KEY = "PYTHON_UDF_ENABLE";
public static final boolean PYTHON_UDF_ENABLE_DEFAULT = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ public class TagDoesNotExistException extends RuntimeException {
public TagDoesNotExistException(String message) {
super(message);
}

public TagDoesNotExistException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import com.gotocompany.dagger.functions.common.Constants;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartDataStore;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DefaultDartDataStore;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss.OssDartClient;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import com.google.gson.Gson;
Expand Down Expand Up @@ -134,9 +137,28 @@ public HashSet<AggregateUdf> getAggregateUdfs() {
}

private DartDataStore getDartDataSource() {
String projectID = getConfiguration().getString(Constants.UDF_DART_GCS_PROJECT_ID_KEY, Constants.UDF_DART_GCS_PROJECT_ID_DEFAULT);
String bucketID = getConfiguration().getString(Constants.UDF_DART_GCS_BUCKET_ID_KEY, Constants.UDF_DART_GCS_BUCKET_ID_DEFAULT);
return new GcsDartDataStore(projectID, bucketID);
String projectID = getConfiguration().getString(Constants.UDF_DART_PROJECT_ID_KEY, Constants.UDF_DART_PROJECT_ID_DEFAULT);
String bucketID = getConfiguration().getString(Constants.UDF_DART_BUCKET_ID_KEY, Constants.UDF_DART_BUCKET_ID_DEFAULT);

String udfStoreProvider = getConfiguration().getString(Constants.UDF_STORE_PROVIDER_KEY);
if (udfStoreProvider == null) {
udfStoreProvider = Constants.UDF_STORE_PROVIDER_GCS;
projectID = getConfiguration().getString(Constants.UDF_DART_GCS_PROJECT_ID_KEY, Constants.UDF_DART_GCS_PROJECT_ID_DEFAULT);
bucketID = getConfiguration().getString(Constants.UDF_DART_GCS_BUCKET_ID_KEY, Constants.UDF_DART_GCS_BUCKET_ID_DEFAULT);
}

DartDataStoreClient dartDataStoreClient;
switch (udfStoreProvider) {
case Constants.UDF_STORE_PROVIDER_GCS:
dartDataStoreClient = new GcsDartClient(projectID);
break;
case Constants.UDF_STORE_PROVIDER_OSS:
dartDataStoreClient = new OssDartClient();
break;
default:
throw new IllegalArgumentException("Unknown UDF Store Provider: " + udfStoreProvider);
}
return new DefaultDartDataStore(dartDataStoreClient, bucketID);
}

private LinkedHashMap<String, String> getProtosInInputStreams() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.gotocompany.dagger.functions.udfs.scalar.dart.store;

import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager;
import com.gotocompany.dagger.functions.exceptions.BucketDoesNotExistException;
import com.gotocompany.dagger.functions.exceptions.TagDoesNotExistException;

public interface DartDataStoreClient {

/**
* If a client could provide implementation to this, use the default data store, else implement DartDataStore along with client implementation.
*
* @param udfName either "DartGet" or "DartContains"
* @param gaugeStatsManager an instrumentation provider
* @param bucketName name of the object storage service bucket
* @param dartName from the bucket, this would be either dart-get/path/to/file.json or dart-contains/path/to/file.json
* @return Content of the file in String format from abc://bucket-name/dart-(get/contains)/path/to/file.json
* @throws TagDoesNotExistException if tag doesn't exist
* @throws BucketDoesNotExistException if bucket doesn't exist
*/
String fetchJsonData(String udfName, GaugeStatsManager gaugeStatsManager, String bucketName, String dartName)
throws TagDoesNotExistException, BucketDoesNotExistException;
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs;
package com.gotocompany.dagger.functions.udfs.scalar.dart.store;

import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager;
import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager;
Expand All @@ -7,7 +7,7 @@
import com.gotocompany.dagger.functions.udfs.scalar.dart.types.SetCache;
import com.gotocompany.dagger.functions.udfs.scalar.DartContains;
import com.gotocompany.dagger.functions.udfs.scalar.DartGet;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore;
import lombok.Getter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -22,24 +22,27 @@
import java.util.Set;

/**
* The type Gcs data store.
* DefaultDartDataStore would be able to fetch the darts from the object storage services.
* pass the relevant client which implements {@link DartDataStoreClient}
*/
public class GcsDartDataStore implements DartDataStore, Serializable {
public class DefaultDartDataStore implements DartDataStore, Serializable {

private final String projectId;
public static final String DART_GET_DIRECTORY = "dart-get/";
public static final String DART_CONTAINS_DIRECTORY = "dart-contains/";

private final String bucketId;

private GcsClient gcsClient;
@Getter
private final DartDataStoreClient storeClient;

/**
* Instantiates a new Gcs data store.
* Instantiates a new data store.
*
* @param projectId the project id
* @param bucketId the bucket id
* @param storeClient a {@link DartDataStoreClient} implementation for the respective object storage provider
* @param bucketId the bucket id
*/
public GcsDartDataStore(String projectId, String bucketId) {
this.projectId = projectId;
public DefaultDartDataStore(DartDataStoreClient storeClient, String bucketId) {
this.storeClient = storeClient;
this.bucketId = bucketId;
}

Expand All @@ -55,11 +58,11 @@ public MapCache getMap(String mapName, MeterStatsManager meterStatsManager, Gaug
}

private Map<String, String> getMapOfObjects(String dartName, MeterStatsManager meterManager, GaugeStatsManager gaugeManager) {
String jsonData = getGcsClient().fetchJsonData(
String jsonData = getStoreClient().fetchJsonData(
DartGet.class.getSimpleName(),
gaugeManager,
this.bucketId,
"dart-get/" + dartName);
DART_GET_DIRECTORY + dartName);

ObjectMapper mapper = new ObjectMapper();

Expand All @@ -74,8 +77,7 @@ private Map<String, String> getMapOfObjects(String dartName, MeterStatsManager m
}

private Set<String> getSetOfObjects(String dartName, MeterStatsManager meterManager, GaugeStatsManager gaugeManager) {

String jsonData = getGcsClient().fetchJsonData(DartContains.class.getSimpleName(), gaugeManager, this.bucketId, "dart-contains/" + dartName);
String jsonData = getStoreClient().fetchJsonData(DartContains.class.getSimpleName(), gaugeManager, this.bucketId, DART_CONTAINS_DIRECTORY + dartName);
ObjectMapper mapper = new ObjectMapper();
try {
ObjectNode node = (ObjectNode) mapper.readTree(jsonData);
Expand All @@ -92,16 +94,4 @@ private Set<String> getSetOfObjects(String dartName, MeterStatsManager meterMana

return new HashSet<>();
}

/**
* Gets gcs client.
*
* @return the gcs client
*/
GcsClient getGcsClient() {
if (this.gcsClient == null) {
this.gcsClient = new GcsClient(this.projectId);
}
return this.gcsClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
import com.gotocompany.dagger.functions.exceptions.BucketDoesNotExistException;
import com.gotocompany.dagger.functions.exceptions.TagDoesNotExistException;
import com.gotocompany.dagger.functions.udfs.scalar.dart.DartAspects;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClient;

import static com.gotocompany.dagger.common.core.Constants.UDF_TELEMETRY_GROUP_KEY;

/**
* The type Gcs client.
*/
public class GcsClient {
public class GcsDartClient implements DartDataStoreClient {

private Storage storage;

Expand All @@ -28,7 +29,7 @@ public class GcsClient {
*
* @param projectId the project id
*/
public GcsClient(String projectId) {
public GcsDartClient(String projectId) {

if (storage == null) {
storage = StorageOptions.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss;

import com.aliyun.core.utils.IOUtils;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
import com.aliyun.oss.model.OSSObject;
import com.aliyuncs.exceptions.ClientException;
import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager;
import com.gotocompany.dagger.functions.exceptions.TagDoesNotExistException;
import com.gotocompany.dagger.functions.udfs.scalar.dart.DartAspects;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClient;

import java.io.IOException;
import java.io.InputStream;

import static com.gotocompany.dagger.common.core.Constants.UDF_TELEMETRY_GROUP_KEY;

public class OssDartClient implements DartDataStoreClient {

private static final String ENV_OSS_ENDPOINT = "OSS_ENDPOINT";
private static final String DEFAULT_OSS_ENDPOINT = "oss-ap-southeast-1.aliyuncs.com";

private static final Double BYTES_TO_KB = 1024.0;
private static final String DART_PATH = "dartpath";

private final OSS libOssClient;

/**
* Instantiates a new Oss client.
*/
public OssDartClient() {
String endpoint = System.getenv(ENV_OSS_ENDPOINT);
if (endpoint == null || endpoint.isEmpty()) {
endpoint = DEFAULT_OSS_ENDPOINT;
}
try {
libOssClient = new OSSClientBuilder().build(endpoint, CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider());
} catch (ClientException e) {
throw new RuntimeException("failed to initialise oss client", e);
}
}

public String fetchJsonData(String udfName, GaugeStatsManager gaugeStatsManager, String bucketName, String dartName) {
OSSObject ossObject = libOssClient.getObject(bucketName, dartName);
String dartJson;
byte[] contentByteArray;
try (InputStream inputStream = ossObject.getObjectContent()) {
contentByteArray = IOUtils.toByteArray(inputStream);
dartJson = new String(contentByteArray);
} catch (IOException e) {
throw new TagDoesNotExistException("Could not find the content in oss for + dartName", e);
}
gaugeStatsManager.registerString(UDF_TELEMETRY_GROUP_KEY, udfName, DartAspects.DART_GCS_PATH.getValue(), dartName);
gaugeStatsManager.registerDouble(DART_PATH, dartName, DartAspects.DART_GCS_FILE_SIZE.getValue(), contentByteArray.length / BYTES_TO_KB);
return dartJson;
}

/**
* Instantiates a new OSS client.
* This constructor used for unit test purposes.
*
* @param libOssClient the storage
*/
public OssDartClient(OSS libOssClient) {
this.libOssClient = libOssClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager;
import com.gotocompany.dagger.functions.exceptions.TagDoesNotExistException;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartDataStore;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DefaultDartDataStore;
import com.gotocompany.dagger.functions.udfs.scalar.dart.types.SetCache;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
Expand Down Expand Up @@ -44,7 +44,7 @@ public void setUp() {
when(functionContext.getMetricGroup()).thenReturn(metricGroup);
when(metricGroup.addGroup("udf", "DartContains")).thenReturn(metricGroup);
when(metricGroup.addGroup("DartContains")).thenReturn(metricGroup);
this.dataStore = mock(GcsDartDataStore.class);
this.dataStore = mock(DefaultDartDataStore.class);

dartContains = new DartContains(dataStore);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import com.gotocompany.dagger.common.metrics.managers.MeterStatsManager;
import com.gotocompany.dagger.functions.exceptions.KeyDoesNotExistException;
import com.gotocompany.dagger.functions.exceptions.TagDoesNotExistException;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartDataStore;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DefaultDartDataStore;
import com.gotocompany.dagger.functions.udfs.scalar.dart.types.MapCache;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
Expand All @@ -19,7 +19,7 @@
import static org.mockito.MockitoAnnotations.initMocks;

public class DartGetTest {
private GcsDartDataStore dataStore;
private DefaultDartDataStore dataStore;

@Mock
private MetricGroup metricGroup;
Expand All @@ -42,7 +42,7 @@ public void setUp() {
when(functionContext.getMetricGroup()).thenReturn(metricGroup);
when(metricGroup.addGroup("udf", "DartGet")).thenReturn(metricGroup);
when(metricGroup.addGroup("DartGet")).thenReturn(metricGroup);
this.dataStore = mock(GcsDartDataStore.class);
this.dataStore = mock(DefaultDartDataStore.class);

dartGet = new DartGet(dataStore);

Expand Down
Loading

0 comments on commit fd991c4

Please sign in to comment.