Skip to content

Commit

Permalink
Enable Dagger Parquet Source feature using Tencent COS Service (#50)
Browse files Browse the repository at this point in the history
* Add gradle tasks to minimal and dependencies to maven local

* Add capability to dagger to read python udfs from Ali(oss) and Tencent(cosn) storage services
Given the configuration provided correctly. Set the below environment
variables accordingly to access the files stored in the respective
bucket.

Ali(oss)
- OSS_ACCESS_KEY_ID
- OSS_ACCESS_KEY_SECRET

Tencent(cos)
- COS_SECRET_ID
- COS_SECRET_KEY
- COS_REGION

* OSS client endpoint should be configurable via ENV variable

* COS filesystem high availability support
If you need to use COS filesystem for the dagger, provide the cos
bucket/key configuration in the state.backend.fs.checkpointdir,
state.savepoints.dir, high-availability.storageDir to flinkdeployment
manifest.

If the filesystem protocol begins with cosn for the above
configurations, dagger uses the below configurations provided in the
flinkdeployment manifest file.

    fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
    fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
    fs.cosn.userinfo.secretId: <secretID>
    fs.cosn.userinfo.secretKey: <secretKey>
    fs.cosn.bucket.region: <region>
    fs.cosn.bucket.endpoint_suffix: <tencent-provided-prefix.xyz.com>

* Fix checkstyle and made constants as static variables

* Refactor Dart Feature to plug other object storage service providers

* test checkstyle fix

* Dart Support for OSS Service Provider

* fix checkstyle

* Dart Support for COS Service Provider

* Dart implementation fix - the object storage client aren't serializable
Most of the client implementation including GCS, is not serializable, so
fixed this issue by making client implementation not part of the
serialization, and when the client is passed over wire and the client
doesn't exist, it initializes as and when it is required.

        // In a distributed system, we don't intend the client to be serialized and most of the implementations like
        // GCP Storage implementation doesn't implement java.io.Serializable interface and you may see the below error
        // Caused by: org.apache.flink.api.common.InvalidProgramException: com.google.api.services.storage.Storage@1c666a8f
        // is not serializable. The object probably contains or references non serializable fields.
        // Caused by: java.io.NotSerializableException: com.google.api.services.storage.Storage

* checkstyle fix

* Add unit tests for DartDataStoreClientProvider

* Enable Dagger Parquet Source feature using Ali OSS Service

* Enable Dagger Parquet Source feature using Tencent COS Service

* merge conflict correction

---------

Co-authored-by: Raju G T <[email protected]>
Co-authored-by: rajuGT <[email protected]>
  • Loading branch information
3 people authored Jan 28, 2025
1 parent 880b113 commit 5dbe07c
Showing 1 changed file with 1 addition and 0 deletions.
1 change: 1 addition & 0 deletions dagger-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ dependencies {
dependenciesCommonJar group: 'com.qcloud.cos', name: 'flink-cos-fs-hadoop', version: '1.10.0-0.1.10'

dependenciesCommonJar group: 'org.apache.hadoop', name: 'hadoop-aliyun', version: '3.4.1'
dependenciesCommonJar group: 'org.apache.hadoop', name: 'hadoop-cos', version: '3.3.5'

dependenciesCommonJar 'org.apache.flink:flink-metrics-dropwizard:' + flinkVersion
dependenciesCommonJar 'org.apache.flink:flink-json:' + flinkVersion
Expand Down

0 comments on commit 5dbe07c

Please sign in to comment.