Skip to content

Commit

Permalink
Dart implementation fix - the object storage client aren't serializab…
Browse files Browse the repository at this point in the history
…le (#47)

* Add gradle tasks to minimal and dependencies to maven local

* Add capability to dagger to read python udfs from Ali(oss) and Tencent(cosn) storage services
Given the configuration provided correctly. Set the below environment
variables accordingly to access the files stored in the respective
bucket.

Ali(oss)
- OSS_ACCESS_KEY_ID
- OSS_ACCESS_KEY_SECRET

Tencent(cos)
- COS_SECRET_ID
- COS_SECRET_KEY
- COS_REGION

* OSS client endpoint should be configurable via ENV variable

* COS filesystem high availability support
If you need to use COS filesystem for the dagger, provide the cos
bucket/key configuration in the state.backend.fs.checkpointdir,
state.savepoints.dir, high-availability.storageDir to flinkdeployment
manifest.

If the filesystem protocol begins with cosn for the above
configurations, dagger uses the below configurations provided in the
flinkdeployment manifest file.

    fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
    fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
    fs.cosn.userinfo.secretId: <secretID>
    fs.cosn.userinfo.secretKey: <secretKey>
    fs.cosn.bucket.region: <region>
    fs.cosn.bucket.endpoint_suffix: <tencent-provided-prefix.xyz.com>

* Fix checkstyle and made constants as static variables

* Refactor Dart Feature to plug other object storage service providers

* test checkstyle fix

* Dart Support for OSS Service Provider

* fix checkstyle

* Dart Support for COS Service Provider

* Dart implementation fix - the object storage client aren't serializable
Most of the client implementation including GCS, is not serializable, so
fixed this issue by making client implementation not part of the
serialization, and when the client is passed over wire and the client
doesn't exist, it initializes as and when it is required.

        // In a distributed system, we don't intend the client to be serialized and most of the implementations like
        // GCP Storage implementation doesn't implement java.io.Serializable interface and you may see the below error
        // Caused by: org.apache.flink.api.common.InvalidProgramException: com.google.api.services.storage.Storage@1c666a8f
        // is not serializable. The object probably contains or references non serializable fields.
        // Caused by: java.io.NotSerializableException: com.google.api.services.storage.Storage

* checkstyle fix

* Add unit tests for DartDataStoreClientProvider

* removed comment lines

---------

Co-authored-by: Raju G T <[email protected]>
Co-authored-by: rajuGT <[email protected]>
  • Loading branch information
3 people authored Jan 23, 2025
1 parent 3cbad4e commit 5e1d8e2
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@

import com.gotocompany.dagger.functions.common.Constants;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClientProvider;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DefaultDartDataStore;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos.CosDartClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss.OssDartClient;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import com.google.gson.Gson;
Expand Down Expand Up @@ -148,21 +145,7 @@ private DartDataStore getDartDataSource() {
bucketID = getConfiguration().getString(Constants.UDF_DART_GCS_BUCKET_ID_KEY, Constants.UDF_DART_GCS_BUCKET_ID_DEFAULT);
}

DartDataStoreClient dartDataStoreClient;
switch (udfStoreProvider) {
case Constants.UDF_STORE_PROVIDER_GCS:
dartDataStoreClient = new GcsDartClient(projectID);
break;
case Constants.UDF_STORE_PROVIDER_OSS:
dartDataStoreClient = new OssDartClient();
break;
case Constants.UDF_STORE_PROVIDER_COS:
dartDataStoreClient = new CosDartClient();
break;
default:
throw new IllegalArgumentException("Unknown UDF Store Provider: " + udfStoreProvider);
}
return new DefaultDartDataStore(dartDataStoreClient, bucketID);
return new DefaultDartDataStore(new DartDataStoreClientProvider(udfStoreProvider, projectID), bucketID);
}

private LinkedHashMap<String, String> getProtosInInputStreams() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.gotocompany.dagger.functions.udfs.scalar.dart.store;

import com.gotocompany.dagger.functions.common.Constants;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos.CosDartClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss.OssDartClient;

import java.io.Serializable;

public class DartDataStoreClientProvider implements Serializable {
private final String udfStoreProvider;
private final String projectID;

// Do not make this final, if so then the implementation of client should be Serializable
private DartDataStoreClient dartDataStoreClient;

public DartDataStoreClientProvider(String udfStoreProvider, String projectID) {
this.udfStoreProvider = udfStoreProvider;
this.projectID = projectID;
}

public DartDataStoreClient getDartDataStoreClient() {
// In a distributed system, we don't intend the client to be serialized and most of the implementations like
// GCP Storage implementation doesn't implement java.io.Serializable interface and you may see the below error
// Caused by: org.apache.flink.api.common.InvalidProgramException: com.google.api.services.storage.Storage@1c666a8f
// is not serializable. The object probably contains or references non serializable fields.
// Caused by: java.io.NotSerializableException: com.google.api.services.storage.Storage
if (dartDataStoreClient != null) {
return dartDataStoreClient;
}
switch (udfStoreProvider) {
case Constants.UDF_STORE_PROVIDER_GCS:
dartDataStoreClient = new GcsDartClient(projectID);
break;
case Constants.UDF_STORE_PROVIDER_OSS:
dartDataStoreClient = new OssDartClient();
break;
case Constants.UDF_STORE_PROVIDER_COS:
dartDataStoreClient = new CosDartClient();
break;
default:
throw new IllegalArgumentException("Unknown UDF Store Provider: " + udfStoreProvider);
}
return dartDataStoreClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.gotocompany.dagger.functions.udfs.scalar.dart.types.SetCache;
import com.gotocompany.dagger.functions.udfs.scalar.DartContains;
import com.gotocompany.dagger.functions.udfs.scalar.DartGet;
import lombok.Getter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -30,19 +29,17 @@ public class DefaultDartDataStore implements DartDataStore, Serializable {
public static final String DART_GET_DIRECTORY = "dart-get/";
public static final String DART_CONTAINS_DIRECTORY = "dart-contains/";

private final DartDataStoreClientProvider clientProvider;
private final String bucketId;

@Getter
private final DartDataStoreClient storeClient;

/**
* Instantiates a new data store.
*
* @param storeClient a {@link DartDataStoreClient} implementation for the respective object storage provider
* @param bucketId the bucket id
* @param clientProvider a {@link DartDataStoreClient} implementation for the respective object storage provider
* @param bucketId the bucket id
*/
public DefaultDartDataStore(DartDataStoreClient storeClient, String bucketId) {
this.storeClient = storeClient;
public DefaultDartDataStore(DartDataStoreClientProvider clientProvider, String bucketId) {
this.clientProvider = clientProvider;
this.bucketId = bucketId;
}

Expand All @@ -58,7 +55,7 @@ public MapCache getMap(String mapName, MeterStatsManager meterStatsManager, Gaug
}

private Map<String, String> getMapOfObjects(String dartName, MeterStatsManager meterManager, GaugeStatsManager gaugeManager) {
String jsonData = getStoreClient().fetchJsonData(
String jsonData = clientProvider.getDartDataStoreClient().fetchJsonData(
DartGet.class.getSimpleName(),
gaugeManager,
this.bucketId,
Expand All @@ -77,7 +74,7 @@ private Map<String, String> getMapOfObjects(String dartName, MeterStatsManager m
}

private Set<String> getSetOfObjects(String dartName, MeterStatsManager meterManager, GaugeStatsManager gaugeManager) {
String jsonData = getStoreClient().fetchJsonData(DartContains.class.getSimpleName(), gaugeManager, this.bucketId, DART_CONTAINS_DIRECTORY + dartName);
String jsonData = clientProvider.getDartDataStoreClient().fetchJsonData(DartContains.class.getSimpleName(), gaugeManager, this.bucketId, DART_CONTAINS_DIRECTORY + dartName);
ObjectMapper mapper = new ObjectMapper();
try {
ObjectNode node = (ObjectNode) mapper.readTree(jsonData);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.gotocompany.dagger.functions.udfs.scalar.dart.store;

import com.gotocompany.dagger.functions.common.Constants;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss.OssDartClient;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertTrue;

public class DartDataStoreClientProviderTest {
private DartDataStoreClientProvider dartDataStoreClientProvider;

@Before
public void setUp() {
dartDataStoreClientProvider = null;
}

@Test
public void shouldReturnGcsDartClientWhenUdfStoreProviderIsGcs() {
String udfStoreProvider = Constants.UDF_STORE_PROVIDER_GCS;
String projectID = "test-project";

dartDataStoreClientProvider = new DartDataStoreClientProvider(udfStoreProvider, projectID);

DartDataStoreClient client = dartDataStoreClientProvider.getDartDataStoreClient();

assertTrue(client instanceof GcsDartClient);
}

@Test
public void shouldReturnOssDartClientWhenUdfStoreProviderIsOss() {
String udfStoreProvider = Constants.UDF_STORE_PROVIDER_OSS;

dartDataStoreClientProvider = new DartDataStoreClientProvider(udfStoreProvider, null);
DartDataStoreClient client = dartDataStoreClientProvider.getDartDataStoreClient();

assertTrue(client instanceof OssDartClient);
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIllegalArgumentExceptionForUnknownUdfStoreProvider() {
String udfStoreProvider = "UNKNOWN-PROVIDER";

dartDataStoreClientProvider = new DartDataStoreClientProvider(udfStoreProvider, null);

try {
dartDataStoreClientProvider.getDartDataStoreClient();
} catch (IllegalArgumentException e) {
Assert.assertEquals("Unknown UDF Store Provider: UNKNOWN-PROVIDER", e.getMessage());
throw e;
}
}

@Test
public void shouldReturnSameClientOnSubsequentCalls() {
String udfStoreProvider = Constants.UDF_STORE_PROVIDER_GCS;
String projectID = "test-project";

dartDataStoreClientProvider = new DartDataStoreClientProvider(udfStoreProvider, projectID);

DartDataStoreClient firstClient = dartDataStoreClientProvider.getDartDataStoreClient();
DartDataStoreClient secondClient = dartDataStoreClientProvider.getDartDataStoreClient();

Assert.assertEquals(firstClient, secondClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ public class DefaultDartDataStoreTest {

@Before
public void setUp() {
defaultDartDataStore = mock(DefaultDartDataStore.class);
// Subject
DartDataStoreClientProvider dartDataStoreClientProvider = mock(DartDataStoreClientProvider.class);
defaultDartDataStore = new DefaultDartDataStore(dartDataStoreClientProvider, "test-bucket");

gcsDartClient = mock(GcsDartClient.class);
meterStatsManager = mock(MeterStatsManager.class);
gaugeStatsManager = mock(GaugeStatsManager.class);
when(defaultDartDataStore.getSet(anyString(), any(), any())).thenCallRealMethod();
when(defaultDartDataStore.getMap(anyString(), any(), any())).thenCallRealMethod();
when(defaultDartDataStore.getStoreClient()).thenReturn(gcsDartClient);
when(dartDataStoreClientProvider.getDartDataStoreClient()).thenReturn(gcsDartClient);
listContent = Arrays.asList("listContent");
mapContent = Collections.singletonMap("key", "value");
}
Expand Down

0 comments on commit 5e1d8e2

Please sign in to comment.