Skip to content

Commit

Permalink
feat: bigtable sink (#65)
Browse files Browse the repository at this point in the history
* feat: Bigtable Sink (#33)

* feat: add bigtable sink with stringified odpf messages as values

* feat: add BIGTABLE_CREDENTIAL_PATH to access google cloud

* feat: add wrapper class on bigtable client

* refactor: fix checkstyle

* feat: add bigtable parser tests

* feat: add bigtable sink tests

* feat: add bigtable client tests

* chore: revert version bump

* chore: revert version change in build.gradle

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>

* chore: qa and review fixes

* chore: change default SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH to true

* feat: Bigtable record parser (#39)

* feat: Bigtable Sink (#33)

* feat: add bigtable sink with stringified odpf messages as values

* feat: add BIGTABLE_CREDENTIAL_PATH to access google cloud

* feat: add wrapper class on bigtable client

* refactor: fix checkstyle

* feat: add bigtable parser tests

* feat: add bigtable sink tests

* feat: add bigtable client tests

* chore: revert version bump

* chore: revert version change in build.gradle

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>

* feat: create bigtable records using InputOutputFieldMapping provided as configuration

* refactor: fix checkstyle and add unit tests

* review: minor refactor

* refactor: add BigTableSchemaTest and fix BigTableRecordParserTest

* refactor: fix checkstyle

* tests: add few more tests

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>

* chore: schema refactoring

* chore: naming conventions

* feat: add functionality to create rowkey from configured template (#44)

* chore: refactor Template validation

* chore: change exception message in Template

* feat: add bigtable sink metrics and logging (#51)

* feat: Bigtable Sink (#33)

* feat: add bigtable sink with stringified odpf messages as values

* feat: add BIGTABLE_CREDENTIAL_PATH to access google cloud

* feat: add wrapper class on bigtable client

* refactor: fix checkstyle

* feat: add bigtable parser tests

* feat: add bigtable sink tests

* feat: add bigtable client tests

* chore: revert version bump

* chore: revert version change in build.gradle

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>

* chore: qa and review fixes

* chore: change default SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH to true

* feat: Bigtable record parser (#39)

* feat: Bigtable Sink (#33)

* feat: add bigtable sink with stringified odpf messages as values

* feat: add BIGTABLE_CREDENTIAL_PATH to access google cloud

* feat: add wrapper class on bigtable client

* refactor: fix checkstyle

* feat: add bigtable parser tests

* feat: add bigtable sink tests

* feat: add bigtable client tests

* chore: revert version bump

* chore: revert version change in build.gradle

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>

* feat: create bigtable records using InputOutputFieldMapping provided as configuration

* refactor: fix checkstyle and add unit tests

* review: minor refactor

* refactor: add BigTableSchemaTest and fix BigTableRecordParserTest

* refactor: fix checkstyle

* tests: add few more tests

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>

* chore: schema refactoring

* chore: naming conventions

* feat: add functionality to create rowkey from configured template (#44)

* chore: refactor Template validation

* chore: change exception message in Template

* feat: add bigtable sink metrics and logging (#51)

* feat: Bigtable error parsing (#55)

* feat: Bigtable Sink (#33)

* feat: add bigtable sink with stringified odpf messages as values

* feat: add BIGTABLE_CREDENTIAL_PATH to access google cloud

* feat: add wrapper class on bigtable client

* refactor: fix checkstyle

* feat: add bigtable parser tests

* feat: add bigtable sink tests

* feat: add bigtable client tests

* chore: revert version bump

* chore: revert version change in build.gradle

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>

* chore: qa and review fixes

* chore: change default SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH to true

* feat: Bigtable record parser (#39)

* feat: Bigtable Sink (#33)

* feat: add bigtable sink with stringified odpf messages as values

* feat: add BIGTABLE_CREDENTIAL_PATH to access google cloud

* feat: add wrapper class on bigtable client

* refactor: fix checkstyle

* feat: add bigtable parser tests

* feat: add bigtable sink tests

* feat: add bigtable client tests

* chore: revert version bump

* chore: revert version change in build.gradle

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>

* feat: create bigtable records using InputOutputFieldMapping provided as configuration

* refactor: fix checkstyle and add unit tests

* review: minor refactor

* refactor: add BigTableSchemaTest and fix BigTableRecordParserTest

* refactor: fix checkstyle

* tests: add few more tests

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>

* chore: schema refactoring

* chore: naming conventions

* feat: add functionality to create rowkey from configured template (#44)

* chore: refactor Template validation

* chore: change exception message in Template

* feat: add bigtable sink metrics and logging (#51)

* feat: parse bigtable errors and create odpf response

* feat: capture error metrics with predefined tags

* chore: add tests for bigtable response parser

* chore: remove deprecated jcenter repo from build.gradle

* chore: comment out classpath dependencies

* chore: remove unused classpath dependencies

* chore: minor refactor

* chore: remove unused classpath dependencies

* refactor: change BigtableRecord,Response contracts, improved logging, added tests

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>

* chore: minor fixes (#59)

* feat: Bigtable Sink (#33)

* feat: add bigtable sink with stringified odpf messages as values

* feat: add BIGTABLE_CREDENTIAL_PATH to access google cloud

* feat: add wrapper class on bigtable client

* refactor: fix checkstyle

* feat: add bigtable parser tests

* feat: add bigtable sink tests

* feat: add bigtable client tests

* chore: revert version bump

* chore: revert version change in build.gradle

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>

* chore: qa and review fixes

* chore: change default SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH to true

* feat: Bigtable record parser (#39)

* feat: Bigtable Sink (#33)

* feat: add bigtable sink with stringified odpf messages as values

* feat: add BIGTABLE_CREDENTIAL_PATH to access google cloud

* feat: add wrapper class on bigtable client

* refactor: fix checkstyle

* feat: add bigtable parser tests

* feat: add bigtable sink tests

* feat: add bigtable client tests

* chore: revert version bump

* chore: revert version change in build.gradle

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>

* feat: create bigtable records using InputOutputFieldMapping provided as configuration

* refactor: fix checkstyle and add unit tests

* review: minor refactor

* refactor: add BigTableSchemaTest and fix BigTableRecordParserTest

* refactor: fix checkstyle

* tests: add few more tests

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>

* chore: schema refactoring

* chore: naming conventions

* feat: add functionality to create rowkey from configured template (#44)

* chore: refactor Template validation

* chore: change exception message in Template

* feat: add bigtable sink metrics and logging (#51)

* feat: parse bigtable errors and create odpf response

* feat: capture error metrics with predefined tags

* chore: add tests for bigtable response parser

* chore: remove deprecated jcenter repo from build.gradle

* chore: add bq error logs (#57)

* chore: comment out classpath dependencies

* chore: remove unused classpath dependencies

* chore: minor refactor

* chore: remove unused classpath dependencies

* refactor: change BigtableRecord,Response contracts, improved logging, added tests

* chore: small fixes

Co-authored-by: Mayur Gubrele <[email protected]>
Co-authored-by: mayur.gubrele <[email protected]>

* docs: adds bigtable sink documentation (#58)

* feat: Bigtable Sink (#33)

* feat: add bigtable sink with stringified odpf messages as values

* feat: add BIGTABLE_CREDENTIAL_PATH to access google cloud

* feat: add wrapper class on bigtable client

* refactor: fix checkstyle

* feat: add bigtable parser tests

* feat: add bigtable sink tests

* feat: add bigtable client tests

* chore: revert version bump

* chore: revert version change in build.gradle

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>

* chore: qa and review fixes

* chore: change default SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH to true

* feat: Bigtable record parser (#39)

* feat: Bigtable Sink (#33)

* feat: add bigtable sink with stringified odpf messages as values

* feat: add BIGTABLE_CREDENTIAL_PATH to access google cloud

* feat: add wrapper class on bigtable client

* refactor: fix checkstyle

* feat: add bigtable parser tests

* feat: add bigtable sink tests

* feat: add bigtable client tests

* chore: revert version bump

* chore: revert version change in build.gradle

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>

* feat: create bigtable records using InputOutputFieldMapping provided as configuration

* refactor: fix checkstyle and add unit tests

* review: minor refactor

* refactor: add BigTableSchemaTest and fix BigTableRecordParserTest

* refactor: fix checkstyle

* tests: add few more tests

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>

* chore: schema refactoring

* chore: naming conventions

* feat: add functionality to create rowkey from configured template (#44)

* chore: refactor Template validation

* chore: change exception message in Template

* feat: add bigtable sink metrics and logging (#51)

* docs: add bigtable sink documentation

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>

* chore: rc version

* feat: exclude io.grpc from bigtable library and bump the version to 0.3.4-beta.4

* chore: bump up depot version to 0.3.4

* refactor: fix template test and remove whitespace

Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>
  • Loading branch information
3 people authored Nov 29, 2022
1 parent 7992b44 commit 85435fd
Show file tree
Hide file tree
Showing 37 changed files with 1,646 additions and 35 deletions.
6 changes: 4 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ buildscript {
}
dependencies {
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.17'
classpath "org.jfrog.buildinfo:build-info-extractor-gradle:4.4.7"
classpath "org.jfrog.buildinfo:build-info-extractor-gradle:4.4.7"
classpath "org.ajoberstar:gradle-git:1.6.0"
}
}
Expand All @@ -26,7 +26,6 @@ version '0.3.4'

repositories {
mavenCentral()
jcenter()
mavenLocal()
}

Expand All @@ -37,6 +36,9 @@ dependencies {
implementation group: 'io.odpf', name: 'stencil', version: '0.2.1' exclude group: 'org.slf4j'
implementation group: 'org.aeonbits.owner', name: 'owner', version: '1.0.9'
implementation 'com.google.cloud:google-cloud-bigquery:1.115.0'
implementation (group: 'com.google.cloud', name:'google-cloud-bigtable', version:'2.11.2') {
exclude group: "io.grpc"
}
implementation "io.grpc:grpc-all:1.38.0"
implementation group: 'org.slf4j', name: 'jul-to-slf4j', version: '1.7.35'
implementation group: 'redis.clients', name: 'jedis', version: '3.0.1'
Expand Down
1 change: 1 addition & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ GRPC)
* Log Sink
* Bigquery Sink
* Redis Sink
* Bigtable Sink

Depot is a sink connector, which acts as a bridge between data processing systems and real sink. The APIs in this
library can be used to push data to various sinks. Common sinks implementations will be added in this repo.
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/configuration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ This page contains reference for all the configurations for sink connectors.
* [Generic](generic.md)
* [Stencil Client](stencil-client.md)
* [Bigquery Sink](bigquery-sink.md)
* [Redis Sink](redis.md)
* [Bigtable Sink](bigtable.md)

61 changes: 61 additions & 0 deletions docs/reference/configuration/bigtable.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Bigtable Sink

A Bigtable sink requires the following variables to be set along with Generic ones

## `SINK_BIGTABLE_GOOGLE_CLOUD_PROJECT_ID`

Contains information of google cloud project id of the bigtable table where the records need to be inserted/updated. Further
documentation on google cloud [project id](https://cloud.google.com/resource-manager/docs/creating-managing-projects).

* Example value: `gcp-project-id`
* Type: `required`

## `SINK_BIGTABLE_INSTANCE_ID`

A Bigtable instance is a container for your data, which contains clusters that your applications can connect to. Each cluster contains nodes, compute units that manage your data and perform maintenance tasks.

A table belongs to an instance, not to a cluster or node. Here you provide the name of that bigtable instance your table belongs to. Further
documentation on [bigtable Instances, clusters, and nodes](https://cloud.google.com/bigtable/docs/instances-clusters-nodes).

* Example value: `cloud-bigtable-instance-id`
* Type: `required`

## `SINK_BIGTABLE_CREDENTIAL_PATH`

Full path of google cloud credentials file. Further documentation of google cloud authentication
and [credentials](https://cloud.google.com/docs/authentication/getting-started).

* Example value: `/.secret/google-cloud-credentials.json`
* Type: `required`

## `SINK_BIGTABLE_TABLE_ID`

Bigtable stores data in massively scalable tables, each of which is a sorted key/value map.

Here you provide the name of the table where the records need to be inserted/updated. Further documentation on
[bigtable tables](https://cloud.google.com/bigtable/docs/managing-tables).

* Example value: `depot-sample-table`
* Type: `required`

## `SINK_BIGTABLE_ROW_KEY_TEMPLATE`

Bigtable tables are composed of rows, each of which typically describes a single entity. Each row is indexed by a single row key.

Here you provide a string template which will be used to create row keys using one or many fields of your input data. Further documentation on [Bigtable storage model](https://cloud.google.com/bigtable/docs/overview#storage-model).

In the example below, If field_1 and field_2 are `String` and `Integer` data types respectively with values as `alpha` and `10` for a specific record, row key generated for this record will be: `key-alpha-10`

* Example value: `key-%s-%d, field_1, field_2`
* Type: `required`

## `SINK_BIGTABLE_COLUMN_FAMILY_MAPPING`

Bigtable columns that are related to one another are typically grouped into a column family. Each column is identified by a combination of the column family and a column qualifier, which is a unique name within the column family.

Here you provide the mapping of the table's `column families` and `qualifiers`, and the field names from input data that we intend to insert into the table. Further documentation on [Bigtable storage model](https://cloud.google.com/bigtable/docs/overview#storage-model).

Please note that `Column families` being provided in this configuration, need to exist in the table beforehand. While `Column Qualifiers` will be created if they don't exist.

* Example value: `{ "depot-sample-family" : { "depot-sample-qualifier-1" : "field_1", "depot-sample-qualifier-2" : "field_7", "depot-sample-qualifier-3" : "field_5"} }`
* Type: `required`
18 changes: 16 additions & 2 deletions docs/reference/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ Sinks can have their own metrics, and they will be emmited while using sink conn
## Table of Contents

* [Bigquery Sink](metrics.md#bigquery-sink)
* [Bigtable Sink](metrics.md#bigtable-sink)

## Bigquery Sink

### `Biquery Operation Total`
### `Bigquery Operation Total`

Total number of bigquery API operation performed

Expand All @@ -19,7 +20,20 @@ Time taken for bigquery API operation performed

### `Bigquery Errors Total`

Total numbers of error occurred on bigquery insert operation.
Total numbers of error occurred on bigquery insert operation

## Bigtable Sink

### `Bigtable Operation Total`

Total number of bigtable insert/update operation performed

### `Bigtable Operation Latency`

Time taken for bigtable insert/update operation performed

### `Bigtable Errors Total`

Total numbers of error occurred on bigtable insert/update operation


3 changes: 2 additions & 1 deletion docs/reference/odpf_sink_response.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ These errors are returned by sinks in the OdpfSinkResponse object. The error typ
* UNKNOWN_FIELDS_ERROR
* SINK_4XX_ERROR
* SINK_5XX_ERROR
* SINK_RETRYABLE_ERROR
* SINK_UNKNOWN_ERROR
* DEFAULT_ERROR
* If no error is specified
* If no error is specified (To be deprecated soon)

40 changes: 40 additions & 0 deletions docs/sinks/bigtable.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Bigtable Sink

## Overview
Depot Bigtable Sink translates protobuf messages to bigtable records and inserts them into a bigtable table. Its other responsibilities include validating the provided [column-family-schema](../reference/configuration/bigtable.md#sink_bigtable_column_family_mapping), and checking whether the configured table exists in the [Bigtable instance](../reference/configuration/bigtable.md#sink_bigtable_instance_id) or not.

Depot uses [Java Client Library for the Cloud Bigtable API](https://cloud.google.com/bigtable/docs/reference/libraries) to perform any operations on Bigtable.

## Setup Required
To be able to insert/update records in Bigtable, one must have the following setup in place:

* [Bigtable Instance](../reference/configuration/bigtable.md#sink_bigtable_instance_id) belonging to the [GCP project](../reference/configuration/bigtable.md#sink_bigtable_google_cloud_project_id) provided in configuration
* Bigtable [Table](../reference/configuration/bigtable.md#sink_bigtable_table_id) where the records are supposed to be inserted/updated
* Column families that are provided as part of [column-family-mapping](../reference/configuration/bigtable.md#sink_bigtable_column_family_mapping)
* Google cloud [Bigtable IAM permission](https://cloud.google.com/bigtable/docs/access-control) required to access and modify the configured Bigtable Instance and Table

## Metrics

Check out the list of [metrics](../reference/metrics.md#bigtable-sink) captured under Bigtable Sink.

## Error Handling

[BigtableResponse](../../src/main/java/io/odpf/depot/bigtable/response/BigTableResponse.java) class has the list of failed [mutations](https://cloud.google.com/bigtable/docs/writes#write-types). [BigtableResponseParser](../../src/main/java/io/odpf/depot/bigtable/parser/BigTableResponseParser.java) looks for errors from each failed mutation and creates [ErrorInfo](../../src/main/java/io/odpf/depot/error/ErrorInfo.java) objects based on the type/HttpStatusCode of the underlying error. This error info is then sent to the application.

| Error From Bigtable | Error Type Captured |
| --------------- | -------------------- |
| Retryable Error | SINK_RETRYABLE_ERROR |
| Having status code in range 400-499 | SINK_4XX_ERROR |
| Having status code in range 500-599 | SINK_5XX_ERROR |
| Any other Error | SINK_UNKNOWN_ERROR |

### Error Telemetry

[BigtableResponseParser](../../src/main/java/io/odpf/depot/bigtable/parser/BigTableResponseParser.java) looks for any specific error types sent from Bigtable and capture those under [BigtableTotalErrorMetrics](../reference/metrics.md#bigtable-sink) with suitable error tags.

| Error Type | Error Tag Assigned |
| --------------- | -------------------- |
| Bad Request | BAD_REQUEST |
| Quota Failure | QUOTA_FAILURE |
| Precondition Failure | PRECONDITION_FAILURE |
| Any other Error | RPC_FAILURE |
58 changes: 58 additions & 0 deletions src/main/java/io/odpf/depot/bigtable/BigTableSink.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.odpf.depot.bigtable;

import io.odpf.depot.OdpfSink;
import io.odpf.depot.OdpfSinkResponse;
import io.odpf.depot.bigtable.client.BigTableClient;
import io.odpf.depot.bigtable.model.BigTableRecord;
import io.odpf.depot.bigtable.parser.BigTableRecordParser;
import io.odpf.depot.bigtable.parser.BigTableResponseParser;
import io.odpf.depot.bigtable.response.BigTableResponse;
import io.odpf.depot.error.ErrorInfo;
import io.odpf.depot.message.OdpfMessage;
import io.odpf.depot.metrics.BigTableMetrics;
import io.odpf.depot.metrics.Instrumentation;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * {@link OdpfSink} implementation that writes parsed messages to a Google Cloud
 * Bigtable table.
 *
 * <p>Messages are first converted to {@link BigTableRecord}s; records that fail
 * parsing are reported back immediately, while valid records are sent to
 * Bigtable via the {@link BigTableClient}. Errors returned by Bigtable are
 * translated into {@link ErrorInfo} entries on the response.
 */
public class BigTableSink implements OdpfSink {
    private final BigTableClient bigTableClient;
    private final BigTableRecordParser bigTableRecordParser;
    private final BigTableMetrics bigtableMetrics;
    private final Instrumentation instrumentation;

    /**
     * @param bigTableClient       client used to send mutations to Bigtable
     * @param bigTableRecordParser parser converting {@link OdpfMessage}s to records
     * @param bigtableMetrics      metric names used when capturing sink errors
     * @param instrumentation      logging/metrics wrapper
     */
    public BigTableSink(BigTableClient bigTableClient, BigTableRecordParser bigTableRecordParser, BigTableMetrics bigtableMetrics, Instrumentation instrumentation) {
        this.bigTableClient = bigTableClient;
        this.bigTableRecordParser = bigTableRecordParser;
        this.bigtableMetrics = bigtableMetrics;
        this.instrumentation = instrumentation;
    }

    /**
     * Pushes the given messages to Bigtable.
     *
     * @param messages messages to insert/update
     * @return response containing an {@link ErrorInfo} for every message index
     *         that failed either parsing or the Bigtable write
     */
    @Override
    public OdpfSinkResponse pushToSink(List<OdpfMessage> messages) {
        List<BigTableRecord> records = bigTableRecordParser.convert(messages);
        // Partition parsed records by validity; invalid records already carry their ErrorInfo.
        Map<Boolean, List<BigTableRecord>> splitterRecords = records.stream().collect(Collectors.partitioningBy(BigTableRecord::isValid));
        List<BigTableRecord> invalidRecords = splitterRecords.get(Boolean.FALSE);
        List<BigTableRecord> validRecords = splitterRecords.get(Boolean.TRUE);

        OdpfSinkResponse odpfSinkResponse = new OdpfSinkResponse();
        invalidRecords.forEach(invalidRecord -> odpfSinkResponse.addErrors(invalidRecord.getIndex(), invalidRecord.getErrorInfo()));

        if (!validRecords.isEmpty()) {
            BigTableResponse bigTableResponse = bigTableClient.send(validRecords);
            // A null response means the batch was applied without failures; only
            // parse per-mutation errors when the response reports any.
            if (bigTableResponse != null && bigTableResponse.hasErrors()) {
                instrumentation.logInfo("Found {} Error records in response", bigTableResponse.getErrorCount());
                Map<Long, ErrorInfo> errorInfoMap = BigTableResponseParser.getErrorsFromSinkResponse(validRecords, bigTableResponse, bigtableMetrics, instrumentation);
                errorInfoMap.forEach(odpfSinkResponse::addErrors);
            }
        }

        return odpfSinkResponse;
    }

    @Override
    public void close() throws IOException {
        // No resources owned here; the BigTableClient lifecycle is managed by its creator.
    }
}
85 changes: 85 additions & 0 deletions src/main/java/io/odpf/depot/bigtable/BigTableSinkFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.odpf.depot.bigtable;

import com.timgroup.statsd.NoOpStatsDClient;
import io.odpf.depot.OdpfSink;
import io.odpf.depot.bigtable.client.BigTableClient;
import io.odpf.depot.bigtable.model.BigTableSchema;
import io.odpf.depot.bigtable.parser.BigTableRecordParser;
import io.odpf.depot.bigtable.parser.BigTableRowKeyParser;
import io.odpf.depot.common.Template;
import io.odpf.depot.common.Tuple;
import io.odpf.depot.config.BigTableSinkConfig;
import io.odpf.depot.exception.ConfigurationException;
import io.odpf.depot.exception.InvalidTemplateException;
import io.odpf.depot.message.OdpfMessageParser;
import io.odpf.depot.message.OdpfMessageParserFactory;
import io.odpf.depot.message.OdpfMessageSchema;
import io.odpf.depot.message.SinkConnectorSchemaMessageMode;
import io.odpf.depot.metrics.BigTableMetrics;
import io.odpf.depot.metrics.Instrumentation;
import io.odpf.depot.metrics.StatsDReporter;
import io.odpf.depot.utils.MessageConfigUtils;

import java.io.IOException;

/**
 * Factory that wires up and creates {@link BigTableSink} instances.
 *
 * <p>Call {@link #init()} once before {@link #create()}: init builds the
 * Bigtable client, validates the configured schema against the live table,
 * and prepares the record parser shared by all sinks created afterwards.
 */
public class BigTableSinkFactory {
    private final BigTableSinkConfig sinkConfig;
    private final StatsDReporter statsDReporter;
    // Populated by init(); create() must not be called before init().
    private BigTableClient bigTableClient;
    private BigTableRecordParser bigTableRecordParser;
    private BigTableMetrics bigtableMetrics;

    /**
     * @param sinkConfig     Bigtable sink configuration (project, instance, table, mappings)
     * @param statsDReporter reporter used for metrics and instrumented logging
     */
    public BigTableSinkFactory(BigTableSinkConfig sinkConfig, StatsDReporter statsDReporter) {
        this.sinkConfig = sinkConfig;
        this.statsDReporter = statsDReporter;
    }

    /** Convenience constructor for callers that do not need metrics reporting. */
    public BigTableSinkFactory(BigTableSinkConfig sinkConfig) {
        this(sinkConfig, new StatsDReporter(new NoOpStatsDClient()));
    }


    /**
     * Initializes the shared Bigtable client, validates the configured column
     * families against the target table, and builds the record parser.
     *
     * @throws ConfigurationException if client setup, schema lookup, or the
     *         row-key template is invalid (wraps IOException / InvalidTemplateException)
     */
    public void init() {
        try {
            Instrumentation instrumentation = new Instrumentation(statsDReporter, BigTableSinkFactory.class);
            // Log the effective configuration up front to ease debugging of misconfigured sinks.
            String bigtableConfig = String.format("\n\tbigtable.gcloud.project = %s\n\tbigtable.instance = %s\n\tbigtable.table = %s"
                            + "\n\tbigtable.credential.path = %s\n\tbigtable.row.key.template = %s\n\tbigtable.column.family.mapping = %s\n\t",
                    sinkConfig.getGCloudProjectID(),
                    sinkConfig.getInstanceId(),
                    sinkConfig.getTableId(),
                    sinkConfig.getCredentialPath(),
                    sinkConfig.getRowKeyTemplate(),
                    sinkConfig.getColumnFamilyMapping());

            instrumentation.logInfo(bigtableConfig);
            BigTableSchema bigtableSchema = new BigTableSchema(sinkConfig.getColumnFamilyMapping());
            bigtableMetrics = new BigTableMetrics(sinkConfig);
            bigTableClient = new BigTableClient(sinkConfig, bigtableSchema, bigtableMetrics, new Instrumentation(statsDReporter, BigTableClient.class));
            // Fail fast if the configured column families do not exist in the table.
            bigTableClient.validateBigTableSchema();

            Tuple<SinkConnectorSchemaMessageMode, String> modeAndSchema = MessageConfigUtils.getModeAndSchema(sinkConfig);
            OdpfMessageParser odpfMessageParser = OdpfMessageParserFactory.getParser(sinkConfig, statsDReporter);
            OdpfMessageSchema schema = odpfMessageParser.getSchema(modeAndSchema.getSecond());

            // Row keys are rendered from the configured template using message fields.
            Template keyTemplate = new Template(sinkConfig.getRowKeyTemplate());
            BigTableRowKeyParser bigTableRowKeyParser = new BigTableRowKeyParser(keyTemplate, schema);
            bigTableRecordParser = new BigTableRecordParser(
                    odpfMessageParser,
                    bigTableRowKeyParser,
                    modeAndSchema,
                    schema,
                    bigtableSchema);
            instrumentation.logInfo("Connection to bigtable established successfully");
        } catch (IOException | InvalidTemplateException e) {
            throw new ConfigurationException("Exception occurred while creating sink", e);
        }
    }

    /**
     * Creates a new {@link BigTableSink} backed by the client and parser built
     * in {@link #init()}.
     *
     * @return a ready-to-use sink instance
     */
    public OdpfSink create() {
        return new BigTableSink(
                bigTableClient,
                bigTableRecordParser,
                bigtableMetrics,
                new Instrumentation(statsDReporter, BigTableSink.class));
    }
}
Loading

0 comments on commit 85435fd

Please sign in to comment.