diff --git a/build.gradle b/build.gradle index 2b76293e..b846dcc6 100644 --- a/build.gradle +++ b/build.gradle @@ -4,7 +4,7 @@ buildscript { } dependencies { classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.17' - classpath "org.jfrog.buildinfo:build-info-extractor-gradle:4.4.7" + classpath "org.jfrog.buildinfo:build-info-extractor-gradle:4.4.7" classpath "org.ajoberstar:gradle-git:1.6.0" } } @@ -26,7 +26,6 @@ version '0.3.4' repositories { mavenCentral() - jcenter() mavenLocal() } @@ -37,6 +36,9 @@ dependencies { implementation group: 'io.odpf', name: 'stencil', version: '0.2.1' exclude group: 'org.slf4j' implementation group: 'org.aeonbits.owner', name: 'owner', version: '1.0.9' implementation 'com.google.cloud:google-cloud-bigquery:1.115.0' + implementation (group: 'com.google.cloud', name:'google-cloud-bigtable', version:'2.11.2') { + exclude group: "io.grpc" + } implementation "io.grpc:grpc-all:1.38.0" implementation group: 'org.slf4j', name: 'jul-to-slf4j', version: '1.7.35' implementation group: 'redis.clients', name: 'jedis', version: '3.0.1' diff --git a/docs/README.md b/docs/README.md index 31a55f8d..f3285d8c 100644 --- a/docs/README.md +++ b/docs/README.md @@ -11,6 +11,7 @@ GRPC) * Log Sink * Bigquery Sink * Redis Sink +* Bigtable Sink Depot is a sink connector, which acts as a bridge between data processing systems and real sink. The APIs in this library can be used to push data to various sinks. Common sinks implementations will be added in this repo. diff --git a/docs/reference/configuration/README.md b/docs/reference/configuration/README.md index 642d1f34..859a854b 100644 --- a/docs/reference/configuration/README.md +++ b/docs/reference/configuration/README.md @@ -7,4 +7,6 @@ This page contains reference for all the configurations for sink connectors. * [Generic](generic.md) * [Stencil Client](stencil-client.md) * [Bigquery Sink](bigquery-sink.md) +* [Redis Sink](redis.md) +* [Bigtable Sink](bigtable.md) diff --git a/docs/reference/configuration/bigtable.md b/docs/reference/configuration/bigtable.md new file mode 100644 index 00000000..d99066a4 --- /dev/null +++ b/docs/reference/configuration/bigtable.md @@ -0,0 +1,61 @@ +# Bigtable Sink + +A Bigtable sink requires the following variables to be set along with Generic ones + +## `SINK_BIGTABLE_GOOGLE_CLOUD_PROJECT_ID` + +Contains information of google cloud project id of the bigtable table where the records need to be inserted/updated. Further +documentation on google cloud [project id](https://cloud.google.com/resource-manager/docs/creating-managing-projects). + +* Example value: `gcp-project-id` +* Type: `required` + +## `SINK_BIGTABLE_INSTANCE_ID` + +A Bigtable instance is a container for your data, which contain clusters that your applications can connect to. Each cluster contains nodes, compute units that manage your data and perform maintenance tasks. + +A table belongs to an instance, not to a cluster or node. Here you provide the name of that bigtable instance your table belongs to. Further +documentation on [bigtable Instances, clusters, and nodes](https://cloud.google.com/bigtable/docs/instances-clusters-nodes). + +* Example value: `cloud-bigtable-instance-id` +* Type: `required` + +## `SINK_BIGTABLE_CREDENTIAL_PATH` + +Full path of google cloud credentials file. Further documentation of google cloud authentication +and [credentials](https://cloud.google.com/docs/authentication/getting-started). + +* Example value: `/.secret/google-cloud-credentials.json` +* Type: `required` + +## `SINK_BIGTABLE_TABLE_ID` + +Bigtable stores data in massively scalable tables, each of which is a sorted key/value map. + +Here you provide the name of the table where the records need to be inserted/updated. Further documentation on +[bigtable tables](https://cloud.google.com/bigtable/docs/managing-tables). + +* Example value: `depot-sample-table` +* Type: `required` + +## `SINK_BIGTABLE_ROW_KEY_TEMPLATE` + +Bigtable tables are composed of rows, each of which typically describes a single entity. Each row is indexed by a single row key. + +Here you provide a string template which will be used to create row keys using one or many fields of your input data. Further documentation on [Bigtable storage model](https://cloud.google.com/bigtable/docs/overview#storage-model). + +In the example below, If field_1 and field_2 are `String` and `Integer` data types respectively with values as `alpha` and `10` for a specific record, row key generated for this record will be: `key-alpha-10` + +* Example value: `key-%s-%d, field_1, field_2` +* Type: `required` + +## `SINK_BIGTABLE_COLUMN_FAMILY_MAPPING` + +Bigtable columns that are related to one another are typically grouped into a column family. Each column is identified by a combination of the column family and a column qualifier, which is a unique name within the column family. + +Here you provide the mapping of the table's `column families` and `qualifiers`, and the field names from input data that we intent to insert into the table. Further documentation on [Bigtable storage model](https://cloud.google.com/bigtable/docs/overview#storage-model). + +Please note that `Column families` being provided in this configuration, need to exist in the table beforehand. While `Column Qualifiers` will be created if they don't exist. + +* Example value: `{ "depot-sample-family" : { "depot-sample-qualifier-1" : "field_1", "depot-sample-qualifier-2" : "field_7", "depot-sample-qualifier-3" : "field_5"} }` +* Type: `required` diff --git a/docs/reference/metrics.md b/docs/reference/metrics.md index 8b926388..2ee2bdb6 100644 --- a/docs/reference/metrics.md +++ b/docs/reference/metrics.md @@ -6,10 +6,11 @@ Sinks can have their own metrics, and they will be emmited while using sink conn ## Table of Contents * [Bigquery Sink](metrics.md#bigquery-sink) +* [Bigtable Sink](metrics.md#bigtable-sink) ## Bigquery Sink -### `Biquery Operation Total` +### `Bigquery Operation Total` Total number of bigquery API operation performed @@ -19,7 +20,20 @@ Time taken for bigquery API operation performed ### `Bigquery Errors Total` -Total numbers of error occurred on bigquery insert operation. +Total numbers of error occurred on bigquery insert operation +## Bigtable Sink + +### `Bigtable Operation Total` + +Total number of bigtable insert/update operation performed + +### `Bigtable Operation Latency` + +Time taken for bigtable insert/update operation performed + +### `Bigtable Errors Total` + +Total numbers of error occurred on bigtable insert/update operation diff --git a/docs/reference/odpf_sink_response.md b/docs/reference/odpf_sink_response.md index d2cf9880..1fe749f3 100644 --- a/docs/reference/odpf_sink_response.md +++ b/docs/reference/odpf_sink_response.md @@ -17,7 +17,8 @@ These errors are returned by sinks in the OdpfSinkResponse object. The error typ * UNKNOWN_FIELDS_ERROR * SINK_4XX_ERROR * SINK_5XX_ERROR +* SINK_RETRYABLE_ERROR * SINK_UNKNOWN_ERROR * DEFAULT_ERROR - * If no error is specified + * If no error is specified (To be deprecated soon) diff --git a/docs/sinks/bigtable.md b/docs/sinks/bigtable.md new file mode 100644 index 00000000..2bcb7f52 --- /dev/null +++ b/docs/sinks/bigtable.md @@ -0,0 +1,40 @@ +# Bigtable Sink + +## Overview +Depot Bigtable Sink translates protobuf messages to bigtable records and insert them to a bigtable table. Its other responsibilities include validating the provided [column-family-schema](../reference/configuration/bigtable.md#sink_bigtable_column_family_mapping), and check whether the configured table exists in [Bigtable instance](../reference/configuration/bigtable.md#sink_bigtable_instance_id) or not. + +Depot uses [Java Client Library for the Cloud Bigtable API](https://cloud.google.com/bigtable/docs/reference/libraries) to perform any operations on Bigtable. + +## Setup Required +To be able to insert/update records in Bigtable, One must have following setup in place: + +* [Bigtable Instance](../reference/configuration/bigtable.md#sink_bigtable_instance_id) belonging to the [GCP project](../reference/configuration/bigtable.md#sink_bigtable_google_cloud_project_id) provided in configuration +* Bigtable [Table](../reference/configuration/bigtable.md#sink_bigtable_table_id) where the records are supposed to be inserted/updated +* Column families that are provided as part of [column-family-mapping](../reference/configuration/bigtable.md#sink_bigtable_column_family_mapping) +* Google cloud [Bigtable IAM permission](https://cloud.google.com/bigtable/docs/access-control) required to access and modify the configured Bigtable Instance and Table + +## Metrics + +Check out the list of [metrics](../reference/metrics.md#bigtable-sink) captured under Bigtable Sink. + +## Error Handling + +[BigtableResponse](../../src/main/java/io/odpf/depot/bigtable/response/BigTableResponse.java) class have the list of failed [mutations](https://cloud.google.com/bigtable/docs/writes#write-types). [BigtableResponseParser](../../src/main/java/io/odpf/depot/bigtable/parser/BigTableResponseParser.java) looks for errors from each failed mutation and create [ErrorInfo](../../src/main/java/io/odpf/depot/error/ErrorInfo.java) objects based on the type/HttpStatusCode of the underlying error. This error info is then sent to the application. + +| Error From Bigtable | Error Type Captured | +| --------------- | -------------------- | +| Retryable Error | SINK_RETRYABLE_ERROR | +| Having status code in range 400-499 | SINK_4XX_ERROR | +| Having status code in range 500-599 | SINK_5XX_ERROR | +| Any other Error | SINK_UNKNOWN_ERROR | + +### Error Telemetry + +[BigtableResponseParser](../../src/main/java/io/odpf/depot/bigtable/parser/BigTableResponseParser.java) looks for any specific error types sent from Bigtable and capture those under [BigtableTotalErrorMetrics](../reference/metrics.md#bigtable-sink) with suitable error tags. + +| Error Type | Error Tag Assigned | +| --------------- | -------------------- | +| Bad Request | BAD_REQUEST | +| Quota Failure | QUOTA_FAILURE | +| Precondition Failure | PRECONDITION_FAILURE | +| Any other Error | RPC_FAILURE | diff --git a/src/main/java/io/odpf/depot/bigtable/BigTableSink.java b/src/main/java/io/odpf/depot/bigtable/BigTableSink.java new file mode 100644 index 00000000..f59422e6 --- /dev/null +++ b/src/main/java/io/odpf/depot/bigtable/BigTableSink.java @@ -0,0 +1,58 @@ +package io.odpf.depot.bigtable; + +import io.odpf.depot.OdpfSink; +import io.odpf.depot.OdpfSinkResponse; +import io.odpf.depot.bigtable.client.BigTableClient; +import io.odpf.depot.bigtable.model.BigTableRecord; +import io.odpf.depot.bigtable.parser.BigTableRecordParser; +import io.odpf.depot.bigtable.parser.BigTableResponseParser; +import io.odpf.depot.bigtable.response.BigTableResponse; +import io.odpf.depot.error.ErrorInfo; +import io.odpf.depot.message.OdpfMessage; +import io.odpf.depot.metrics.BigTableMetrics; +import io.odpf.depot.metrics.Instrumentation; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class BigTableSink implements OdpfSink { + private final BigTableClient bigTableClient; + private final BigTableRecordParser bigTableRecordParser; + private final BigTableMetrics bigtableMetrics; + private final Instrumentation instrumentation; + + public BigTableSink(BigTableClient bigTableClient, BigTableRecordParser bigTableRecordParser, BigTableMetrics bigtableMetrics, Instrumentation instrumentation) { + this.bigTableClient = bigTableClient; + this.bigTableRecordParser = bigTableRecordParser; + this.bigtableMetrics = bigtableMetrics; + this.instrumentation = instrumentation; + } + + @Override + public OdpfSinkResponse pushToSink(List messages) { + List records = bigTableRecordParser.convert(messages); + Map> splitterRecords = records.stream().collect(Collectors.partitioningBy(BigTableRecord::isValid)); + List invalidRecords = splitterRecords.get(Boolean.FALSE); + List validRecords = splitterRecords.get(Boolean.TRUE); + + OdpfSinkResponse odpfSinkResponse = new OdpfSinkResponse(); + invalidRecords.forEach(invalidRecord -> odpfSinkResponse.addErrors(invalidRecord.getIndex(), invalidRecord.getErrorInfo())); + + if (validRecords.size() > 0) { + BigTableResponse bigTableResponse = bigTableClient.send(validRecords); + if (bigTableResponse != null && bigTableResponse.hasErrors()) { + instrumentation.logInfo("Found {} Error records in response", bigTableResponse.getErrorCount()); + Map errorInfoMap = BigTableResponseParser.getErrorsFromSinkResponse(validRecords, bigTableResponse, bigtableMetrics, instrumentation); + errorInfoMap.forEach(odpfSinkResponse::addErrors); + } + } + + return odpfSinkResponse; + } + + @Override + public void close() throws IOException { + } +} diff --git a/src/main/java/io/odpf/depot/bigtable/BigTableSinkFactory.java b/src/main/java/io/odpf/depot/bigtable/BigTableSinkFactory.java new file mode 100644 index 00000000..c1c2cf1e --- /dev/null +++ b/src/main/java/io/odpf/depot/bigtable/BigTableSinkFactory.java @@ -0,0 +1,85 @@ +package io.odpf.depot.bigtable; + +import com.timgroup.statsd.NoOpStatsDClient; +import io.odpf.depot.OdpfSink; +import io.odpf.depot.bigtable.client.BigTableClient; +import io.odpf.depot.bigtable.model.BigTableSchema; +import io.odpf.depot.bigtable.parser.BigTableRecordParser; +import io.odpf.depot.bigtable.parser.BigTableRowKeyParser; +import io.odpf.depot.common.Template; +import io.odpf.depot.common.Tuple; +import io.odpf.depot.config.BigTableSinkConfig; +import io.odpf.depot.exception.ConfigurationException; +import io.odpf.depot.exception.InvalidTemplateException; +import io.odpf.depot.message.OdpfMessageParser; +import io.odpf.depot.message.OdpfMessageParserFactory; +import io.odpf.depot.message.OdpfMessageSchema; +import io.odpf.depot.message.SinkConnectorSchemaMessageMode; +import io.odpf.depot.metrics.BigTableMetrics; +import io.odpf.depot.metrics.Instrumentation; +import io.odpf.depot.metrics.StatsDReporter; +import io.odpf.depot.utils.MessageConfigUtils; + +import java.io.IOException; + +public class BigTableSinkFactory { + private final BigTableSinkConfig sinkConfig; + private final StatsDReporter statsDReporter; + private BigTableClient bigTableClient; + private BigTableRecordParser bigTableRecordParser; + private BigTableMetrics bigtableMetrics; + + public BigTableSinkFactory(BigTableSinkConfig sinkConfig, StatsDReporter statsDReporter) { + this.sinkConfig = sinkConfig; + this.statsDReporter = statsDReporter; + } + + public BigTableSinkFactory(BigTableSinkConfig sinkConfig) { + this(sinkConfig, new StatsDReporter(new NoOpStatsDClient())); + } + + + public void init() { + try { + Instrumentation instrumentation = new Instrumentation(statsDReporter, BigTableSinkFactory.class); + String bigtableConfig = String.format("\n\tbigtable.gcloud.project = %s\n\tbigtable.instance = %s\n\tbigtable.table = %s" + + "\n\tbigtable.credential.path = %s\n\tbigtable.row.key.template = %s\n\tbigtable.column.family.mapping = %s\n\t", + sinkConfig.getGCloudProjectID(), + sinkConfig.getInstanceId(), + sinkConfig.getTableId(), + sinkConfig.getCredentialPath(), + sinkConfig.getRowKeyTemplate(), + sinkConfig.getColumnFamilyMapping()); + + instrumentation.logInfo(bigtableConfig); + BigTableSchema bigtableSchema = new BigTableSchema(sinkConfig.getColumnFamilyMapping()); + bigtableMetrics = new BigTableMetrics(sinkConfig); + bigTableClient = new BigTableClient(sinkConfig, bigtableSchema, bigtableMetrics, new Instrumentation(statsDReporter, BigTableClient.class)); + bigTableClient.validateBigTableSchema(); + + Tuple modeAndSchema = MessageConfigUtils.getModeAndSchema(sinkConfig); + OdpfMessageParser odpfMessageParser = OdpfMessageParserFactory.getParser(sinkConfig, statsDReporter); + OdpfMessageSchema schema = odpfMessageParser.getSchema(modeAndSchema.getSecond()); + + Template keyTemplate = new Template(sinkConfig.getRowKeyTemplate()); + BigTableRowKeyParser bigTableRowKeyParser = new BigTableRowKeyParser(keyTemplate, schema); + bigTableRecordParser = new BigTableRecordParser( + odpfMessageParser, + bigTableRowKeyParser, + modeAndSchema, + schema, + bigtableSchema); + instrumentation.logInfo("Connection to bigtable established successfully"); + } catch (IOException | InvalidTemplateException e) { + throw new ConfigurationException("Exception occurred while creating sink", e); + } + } + + public OdpfSink create() { + return new BigTableSink( + bigTableClient, + bigTableRecordParser, + bigtableMetrics, + new Instrumentation(statsDReporter, BigTableSink.class)); + } +} diff --git a/src/main/java/io/odpf/depot/bigtable/client/BigTableClient.java b/src/main/java/io/odpf/depot/bigtable/client/BigTableClient.java new file mode 100644 index 00000000..2f37f715 --- /dev/null +++ b/src/main/java/io/odpf/depot/bigtable/client/BigTableClient.java @@ -0,0 +1,120 @@ +package io.odpf.depot.bigtable.client; + +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient; +import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings; +import com.google.cloud.bigtable.admin.v2.models.ColumnFamily; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.BulkMutation; +import com.google.cloud.bigtable.data.v2.models.MutateRowsException; +import io.odpf.depot.bigtable.exception.BigTableInvalidSchemaException; +import io.odpf.depot.bigtable.model.BigTableRecord; +import io.odpf.depot.bigtable.model.BigTableSchema; +import io.odpf.depot.bigtable.response.BigTableResponse; +import io.odpf.depot.config.BigTableSinkConfig; +import io.odpf.depot.metrics.BigTableMetrics; +import io.odpf.depot.metrics.Instrumentation; + +import java.io.FileInputStream; +import java.io.IOException; +import java.time.Instant; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public class BigTableClient { + private final BigtableTableAdminClient bigtableTableAdminClient; + private final BigtableDataClient bigtableDataClient; + private final BigTableSinkConfig sinkConfig; + private final BigTableSchema bigtableSchema; + private final BigTableMetrics bigtableMetrics; + private final Instrumentation instrumentation; + + public BigTableClient(BigTableSinkConfig sinkConfig, BigTableSchema bigtableSchema, BigTableMetrics bigtableMetrics, Instrumentation instrumentation) throws IOException { + this(sinkConfig, getBigTableDataClient(sinkConfig), getBigTableAdminClient(sinkConfig), bigtableSchema, bigtableMetrics, instrumentation); + } + + public BigTableClient(BigTableSinkConfig sinkConfig, BigtableDataClient bigtableDataClient, BigtableTableAdminClient bigtableTableAdminClient, BigTableSchema bigtableSchema, BigTableMetrics bigtableMetrics, Instrumentation instrumentation) { + this.sinkConfig = sinkConfig; + this.bigtableDataClient = bigtableDataClient; + this.bigtableTableAdminClient = bigtableTableAdminClient; + this.bigtableSchema = bigtableSchema; + this.bigtableMetrics = bigtableMetrics; + this.instrumentation = instrumentation; + } + + private static BigtableDataClient getBigTableDataClient(BigTableSinkConfig sinkConfig) throws IOException { + BigtableDataSettings settings = BigtableDataSettings.newBuilder() + .setProjectId(sinkConfig.getGCloudProjectID()) + .setInstanceId(sinkConfig.getInstanceId()) + .setCredentialsProvider(FixedCredentialsProvider.create(GoogleCredentials.fromStream(new FileInputStream(sinkConfig.getCredentialPath())))) + .build(); + return BigtableDataClient.create(settings); + } + + private static BigtableTableAdminClient getBigTableAdminClient(BigTableSinkConfig sinkConfig) throws IOException { + BigtableTableAdminSettings settings = BigtableTableAdminSettings.newBuilder() + .setProjectId(sinkConfig.getGCloudProjectID()) + .setInstanceId(sinkConfig.getInstanceId()) + .setCredentialsProvider(FixedCredentialsProvider.create(GoogleCredentials.fromStream(new FileInputStream(sinkConfig.getCredentialPath())))) + .build(); + return BigtableTableAdminClient.create(settings); + } + + public BigTableResponse send(List records) { + BigTableResponse bigTableResponse = null; + BulkMutation batch = BulkMutation.create(sinkConfig.getTableId()); + records.forEach(record -> batch.add(record.getRowMutationEntry())); + try { + Instant startTime = Instant.now(); + bigtableDataClient.bulkMutateRows(batch); + instrument(startTime, batch.getEntryCount()); + } catch (MutateRowsException e) { + bigTableResponse = new BigTableResponse(e); + instrumentation.logError("Some entries failed to be applied. {}", e.getCause()); + } + return bigTableResponse; + } + + private void instrument(Instant startTime, long entryCount) { + instrumentation.captureDurationSince( + bigtableMetrics.getBigtableOperationLatencyMetric(), + startTime, + String.format(BigTableMetrics.BIGTABLE_INSTANCE_TAG, sinkConfig.getInstanceId()), + String.format(BigTableMetrics.BIGTABLE_TABLE_TAG, sinkConfig.getTableId())); + instrumentation.captureCount( + bigtableMetrics.getBigtableOperationTotalMetric(), + entryCount, + String.format(BigTableMetrics.BIGTABLE_INSTANCE_TAG, sinkConfig.getInstanceId()), + String.format(BigTableMetrics.BIGTABLE_TABLE_TAG, sinkConfig.getTableId())); + } + + public void validateBigTableSchema() throws BigTableInvalidSchemaException { + String tableId = sinkConfig.getTableId(); + instrumentation.logDebug(String.format("Validating schema for table: %s...", tableId)); + checkIfTableExists(tableId); + checkIfColumnFamiliesExist(tableId); + instrumentation.logDebug("Validation complete, Schema is valid."); + } + + private void checkIfTableExists(String tableId) throws BigTableInvalidSchemaException { + if (!bigtableTableAdminClient.exists(tableId)) { + throw new BigTableInvalidSchemaException(String.format("Table: %s does not exist!", tableId)); + } + } + + private void checkIfColumnFamiliesExist(String tableId) throws BigTableInvalidSchemaException { + Set existingColumnFamilies = bigtableTableAdminClient.getTable(tableId) + .getColumnFamilies() + .stream() + .map(ColumnFamily::getId) + .collect(Collectors.toSet()); + Set missingColumnFamilies = bigtableSchema.getMissingColumnFamilies(existingColumnFamilies); + if (missingColumnFamilies.size() > 0) { + throw new BigTableInvalidSchemaException( + String.format("Column families %s do not exist in table %s!", missingColumnFamilies, tableId)); + } + } +} diff --git a/src/main/java/io/odpf/depot/bigtable/exception/BigTableInvalidSchemaException.java b/src/main/java/io/odpf/depot/bigtable/exception/BigTableInvalidSchemaException.java new file mode 100644 index 00000000..fba1a8b7 --- /dev/null +++ b/src/main/java/io/odpf/depot/bigtable/exception/BigTableInvalidSchemaException.java @@ -0,0 +1,11 @@ +package io.odpf.depot.bigtable.exception; + +public class BigTableInvalidSchemaException extends RuntimeException { + public BigTableInvalidSchemaException(String message, Throwable cause) { + super(message, cause); + } + + public BigTableInvalidSchemaException(String messsage) { + super(messsage); + } +} diff --git a/src/main/java/io/odpf/depot/bigtable/model/BigTableRecord.java b/src/main/java/io/odpf/depot/bigtable/model/BigTableRecord.java new file mode 100644 index 00000000..5cea4b51 --- /dev/null +++ b/src/main/java/io/odpf/depot/bigtable/model/BigTableRecord.java @@ -0,0 +1,21 @@ +package io.odpf.depot.bigtable.model; + +import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import io.odpf.depot.error.ErrorInfo; +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.util.Map; + +@AllArgsConstructor +@Getter +public class BigTableRecord { + private final RowMutationEntry rowMutationEntry; + private final long index; + private final ErrorInfo errorInfo; + private final Map metadata; + + public boolean isValid() { + return errorInfo == null; + } +} diff --git a/src/main/java/io/odpf/depot/bigtable/model/BigTableSchema.java b/src/main/java/io/odpf/depot/bigtable/model/BigTableSchema.java new file mode 100644 index 00000000..3311eb2d --- /dev/null +++ b/src/main/java/io/odpf/depot/bigtable/model/BigTableSchema.java @@ -0,0 +1,44 @@ +package io.odpf.depot.bigtable.model; + +import io.odpf.depot.exception.ConfigurationException; +import org.json.JSONObject; + +import java.util.HashSet; +import java.util.Set; + +public class BigTableSchema { + + private final JSONObject columnFamilyMapping; + + public BigTableSchema(String columnMapping) { + if (columnMapping == null || columnMapping.isEmpty()) { + throw new ConfigurationException("Column Mapping should not be empty or null"); + } + this.columnFamilyMapping = new JSONObject(columnMapping); + } + + public String getField(String columnFamily, String columnName) { + JSONObject columns = columnFamilyMapping.getJSONObject(columnFamily); + return columns.getString(columnName); + } + + public Set getColumnFamilies() { + return columnFamilyMapping.keySet(); + } + + public Set getColumns(String family) { + return columnFamilyMapping.getJSONObject(family).keySet(); + } + + /** + * Returns missing column families. + * + * @param existingColumnFamilies existing column families in a table. + * @return set of missing column families + */ + public Set getMissingColumnFamilies(Set existingColumnFamilies) { + Set tempSet = new HashSet<>(getColumnFamilies()); + tempSet.removeAll(existingColumnFamilies); + return tempSet; + } +} diff --git a/src/main/java/io/odpf/depot/bigtable/parser/BigTableRecordParser.java b/src/main/java/io/odpf/depot/bigtable/parser/BigTableRecordParser.java new file mode 100644 index 00000000..8e5ffa61 --- /dev/null +++ b/src/main/java/io/odpf/depot/bigtable/parser/BigTableRecordParser.java @@ -0,0 +1,85 @@ +package io.odpf.depot.bigtable.parser; + +import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import io.odpf.depot.bigtable.model.BigTableRecord; +import io.odpf.depot.bigtable.model.BigTableSchema; +import io.odpf.depot.common.Tuple; +import io.odpf.depot.error.ErrorInfo; +import io.odpf.depot.error.ErrorType; +import io.odpf.depot.exception.ConfigurationException; +import io.odpf.depot.exception.DeserializerException; +import io.odpf.depot.exception.EmptyMessageException; +import io.odpf.depot.message.OdpfMessage; +import io.odpf.depot.message.ParsedOdpfMessage; +import io.odpf.depot.message.OdpfMessageSchema; +import io.odpf.depot.message.OdpfMessageParser; +import io.odpf.depot.message.SinkConnectorSchemaMessageMode; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +@Slf4j +public class BigTableRecordParser { + private final OdpfMessageParser odpfMessageParser; + private final BigTableRowKeyParser bigTableRowKeyParser; + private final BigTableSchema bigTableSchema; + private final OdpfMessageSchema schema; + private final Tuple modeAndSchema; + + public BigTableRecordParser(OdpfMessageParser odpfMessageParser, + BigTableRowKeyParser bigTableRowKeyParser, + Tuple modeAndSchema, + OdpfMessageSchema schema, + BigTableSchema bigTableSchema) { + this.odpfMessageParser = odpfMessageParser; + this.bigTableRowKeyParser = bigTableRowKeyParser; + this.modeAndSchema = modeAndSchema; + this.schema = schema; + this.bigTableSchema = bigTableSchema; + } + + public List convert(List messages) { + ArrayList records = new ArrayList<>(); + for (int index = 0; index < messages.size(); index++) { + OdpfMessage message = messages.get(index); + BigTableRecord record = createRecord(message, index); + records.add(record); + } + return records; + } + + private BigTableRecord createRecord(OdpfMessage message, long index) { + try { + ParsedOdpfMessage parsedOdpfMessage = odpfMessageParser.parse(message, modeAndSchema.getFirst(), modeAndSchema.getSecond()); + String rowKey = bigTableRowKeyParser.parse(parsedOdpfMessage); + RowMutationEntry rowMutationEntry = RowMutationEntry.create(rowKey); + bigTableSchema.getColumnFamilies().forEach( + columnFamily -> bigTableSchema + .getColumns(columnFamily) + .forEach(column -> { + String fieldName = bigTableSchema.getField(columnFamily, column); + String value = String.valueOf(parsedOdpfMessage.getFieldByName(fieldName, schema)); + rowMutationEntry.setCell(columnFamily, column, value); + })); + BigTableRecord bigTableRecord = new BigTableRecord(rowMutationEntry, index, null, message.getMetadata()); + if (log.isDebugEnabled()) { + log.debug(bigTableRecord.toString()); + } + return bigTableRecord; + } catch (EmptyMessageException e) { + return createErrorRecord(e, ErrorType.INVALID_MESSAGE_ERROR, index, message.getMetadata()); + } catch (ConfigurationException | IllegalArgumentException e) { + return createErrorRecord(e, ErrorType.UNKNOWN_FIELDS_ERROR, index, message.getMetadata()); + } catch (DeserializerException | IOException e) { + return createErrorRecord(e, ErrorType.DESERIALIZATION_ERROR, index, message.getMetadata()); + } + } + + private BigTableRecord createErrorRecord(Exception e, ErrorType type, long index, Map metadata) { + ErrorInfo errorInfo = new ErrorInfo(e, type); + return new BigTableRecord(null, index, errorInfo, metadata); + } +} diff --git a/src/main/java/io/odpf/depot/bigtable/parser/BigTableResponseParser.java b/src/main/java/io/odpf/depot/bigtable/parser/BigTableResponseParser.java new file mode 100644 index 00000000..4bf8bb7c --- /dev/null +++ b/src/main/java/io/odpf/depot/bigtable/parser/BigTableResponseParser.java @@ -0,0 +1,54 @@ +package io.odpf.depot.bigtable.parser; + +import com.google.cloud.bigtable.data.v2.models.MutateRowsException; +import io.odpf.depot.bigtable.model.BigTableRecord; +import io.odpf.depot.bigtable.response.BigTableResponse; +import io.odpf.depot.error.ErrorInfo; +import io.odpf.depot.error.ErrorType; +import io.odpf.depot.metrics.BigTableMetrics; +import io.odpf.depot.metrics.Instrumentation; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class BigTableResponseParser { + public static Map getErrorsFromSinkResponse(List validRecords, BigTableResponse bigTableResponse, BigTableMetrics bigtableMetrics, Instrumentation instrumentation) { + HashMap errorInfoMap = new HashMap<>(); + for (MutateRowsException.FailedMutation fm : bigTableResponse.getFailedMutations()) { + BigTableRecord record = validRecords.get(fm.getIndex()); + long messageIndex = record.getIndex(); + + String httpStatusCode = String.valueOf(fm.getError().getStatusCode().getCode().getHttpStatusCode()); + if (fm.getError().isRetryable()) { + errorInfoMap.put(messageIndex, new ErrorInfo(fm.getError(), ErrorType.SINK_RETRYABLE_ERROR)); + } else if (httpStatusCode.startsWith("4")) { + errorInfoMap.put(messageIndex, new ErrorInfo(fm.getError(), ErrorType.SINK_4XX_ERROR)); + } else if (httpStatusCode.startsWith("5")) { + errorInfoMap.put(messageIndex, new ErrorInfo(fm.getError(), ErrorType.SINK_5XX_ERROR)); + } else { + errorInfoMap.put(messageIndex, new ErrorInfo(fm.getError(), ErrorType.SINK_UNKNOWN_ERROR)); + } + + instrumentation.logError("Error while inserting to Bigtable. Record Metadata: {}, Cause: {}, Reason: {}, StatusCode: {}, HttpCode: {}", + record.getMetadata(), + fm.getError().getCause(), + fm.getError().getReason(), + fm.getError().getStatusCode().getCode(), + fm.getError().getStatusCode().getCode().getHttpStatusCode()); + + if (fm.getError().getErrorDetails() == null) { + instrumentation.incrementCounter(bigtableMetrics.getBigtableTotalErrorsMetrics(), String.format(BigTableMetrics.BIGTABLE_ERROR_TAG, BigTableMetrics.BigTableErrorType.RPC_FAILURE)); + } else if (fm.getError().getErrorDetails().getBadRequest() != null) { + instrumentation.incrementCounter(bigtableMetrics.getBigtableTotalErrorsMetrics(), String.format(BigTableMetrics.BIGTABLE_ERROR_TAG, BigTableMetrics.BigTableErrorType.BAD_REQUEST)); + } else if (fm.getError().getErrorDetails().getQuotaFailure() != null) { + instrumentation.incrementCounter(bigtableMetrics.getBigtableTotalErrorsMetrics(), String.format(BigTableMetrics.BIGTABLE_ERROR_TAG, BigTableMetrics.BigTableErrorType.QUOTA_FAILURE)); + } else if (fm.getError().getErrorDetails().getPreconditionFailure() != null) { + instrumentation.incrementCounter(bigtableMetrics.getBigtableTotalErrorsMetrics(), String.format(BigTableMetrics.BIGTABLE_ERROR_TAG, BigTableMetrics.BigTableErrorType.PRECONDITION_FAILURE)); + } else { + instrumentation.incrementCounter(bigtableMetrics.getBigtableTotalErrorsMetrics(), String.format(BigTableMetrics.BIGTABLE_ERROR_TAG, BigTableMetrics.BigTableErrorType.RPC_FAILURE)); + } + } + return errorInfoMap; + } +} diff --git a/src/main/java/io/odpf/depot/bigtable/parser/BigTableRowKeyParser.java b/src/main/java/io/odpf/depot/bigtable/parser/BigTableRowKeyParser.java new file mode 100644 index 00000000..1130031c --- /dev/null +++ b/src/main/java/io/odpf/depot/bigtable/parser/BigTableRowKeyParser.java @@ -0,0 +1,16 @@ +package io.odpf.depot.bigtable.parser; + +import io.odpf.depot.common.Template; +import io.odpf.depot.message.OdpfMessageSchema; +import io.odpf.depot.message.ParsedOdpfMessage; +import lombok.AllArgsConstructor; + +@AllArgsConstructor +public class BigTableRowKeyParser { + private final Template keyTemplate; + private final OdpfMessageSchema schema; + + public String parse(ParsedOdpfMessage parsedOdpfMessage) { + return keyTemplate.parse(parsedOdpfMessage, schema); + } +} diff --git a/src/main/java/io/odpf/depot/bigtable/response/BigTableResponse.java b/src/main/java/io/odpf/depot/bigtable/response/BigTableResponse.java new file mode 100644 index 00000000..e74116f7 --- /dev/null +++ b/src/main/java/io/odpf/depot/bigtable/response/BigTableResponse.java @@ -0,0 +1,23 @@ +package io.odpf.depot.bigtable.response; + +import com.google.cloud.bigtable.data.v2.models.MutateRowsException; +import lombok.Getter; + +import java.util.List; + +@Getter +public class BigTableResponse { + private final List failedMutations; + + public BigTableResponse(MutateRowsException e) { + failedMutations = e.getFailedMutations(); + } + + public boolean hasErrors() { + return !failedMutations.isEmpty(); + } + + public int getErrorCount() { + return failedMutations.size(); + } +} diff --git a/src/main/java/io/odpf/depot/redis/parsers/Template.java b/src/main/java/io/odpf/depot/common/Template.java similarity index 80% rename from src/main/java/io/odpf/depot/redis/parsers/Template.java rename to src/main/java/io/odpf/depot/common/Template.java index 55e605e2..dc62ff3d 100644 --- a/src/main/java/io/odpf/depot/redis/parsers/Template.java +++ b/src/main/java/io/odpf/depot/common/Template.java @@ -1,6 +1,7 @@ -package io.odpf.depot.redis.parsers; +package io.odpf.depot.common; import com.google.common.base.Splitter; +import io.odpf.depot.exception.InvalidTemplateException; import io.odpf.depot.message.OdpfMessageSchema; import io.odpf.depot.message.ParsedOdpfMessage; import io.odpf.depot.utils.StringUtils; @@ -12,9 +13,9 @@ public class Template { private final String templatePattern; private final List patternVariableFieldNames; - public Template(String template) { + public Template(String template) throws InvalidTemplateException { if (template == null || template.isEmpty()) { - throw new IllegalArgumentException("Template '" + template + "' is invalid"); + throw new InvalidTemplateException("Template cannot be empty"); } List templateStrings = new ArrayList<>(); Splitter.on(",").omitEmptyStrings().split(template).forEach(s -> templateStrings.add(s.trim())); @@ -23,12 +24,12 @@ public Template(String template) { validate(); } - private void validate() { + private void validate() throws InvalidTemplateException { int validArgs = StringUtils.countVariables(templatePattern); int values = patternVariableFieldNames.size(); int variables = StringUtils.count(templatePattern, '%'); if (validArgs != values || variables != values) { - throw new IllegalArgumentException(String.format("Template is not valid, variables=%d, validArgs=%d, values=%d", variables, validArgs, values)); + throw new InvalidTemplateException(String.format("Template is not valid, variables=%d, validArgs=%d, values=%d", variables, validArgs, values)); } } diff --git a/src/main/java/io/odpf/depot/config/BigTableSinkConfig.java b/src/main/java/io/odpf/depot/config/BigTableSinkConfig.java new file mode 100644 index 00000000..675c9207 --- /dev/null +++ b/src/main/java/io/odpf/depot/config/BigTableSinkConfig.java @@ -0,0 +1,24 @@ +package io.odpf.depot.config; + +import org.aeonbits.owner.Config; + +@Config.DisableFeature(Config.DisableableFeature.PARAMETER_FORMATTING) +public interface BigTableSinkConfig extends OdpfSinkConfig { + @Key("SINK_BIGTABLE_GOOGLE_CLOUD_PROJECT_ID") + String getGCloudProjectID(); + + @Key("SINK_BIGTABLE_INSTANCE_ID") + String getInstanceId(); + + @Key("SINK_BIGTABLE_TABLE_ID") + String getTableId(); + + @Key("SINK_BIGTABLE_CREDENTIAL_PATH") + String getCredentialPath(); + + @Key("SINK_BIGTABLE_ROW_KEY_TEMPLATE") + String getRowKeyTemplate(); + + @Key("SINK_BIGTABLE_COLUMN_FAMILY_MAPPING") + String getColumnFamilyMapping(); +} diff --git a/src/main/java/io/odpf/depot/config/OdpfSinkConfig.java b/src/main/java/io/odpf/depot/config/OdpfSinkConfig.java index b183b35a..0d36e1d5 100644 --- a/src/main/java/io/odpf/depot/config/OdpfSinkConfig.java +++ b/src/main/java/io/odpf/depot/config/OdpfSinkConfig.java @@ -42,7 +42,7 @@ public interface OdpfSinkConfig extends Config { List
getSchemaRegistryStencilFetchHeaders(); @Key("SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH") - @DefaultValue("false") + @DefaultValue("true") Boolean getSchemaRegistryStencilCacheAutoRefresh(); @Key("SCHEMA_REGISTRY_STENCIL_CACHE_TTL_MS") diff --git a/src/main/java/io/odpf/depot/error/ErrorType.java b/src/main/java/io/odpf/depot/error/ErrorType.java index bb3c5a28..e11f1c15 100644 --- a/src/main/java/io/odpf/depot/error/ErrorType.java +++ b/src/main/java/io/odpf/depot/error/ErrorType.java @@ -6,6 +6,7 @@ public enum ErrorType { UNKNOWN_FIELDS_ERROR, SINK_4XX_ERROR, SINK_5XX_ERROR, + SINK_RETRYABLE_ERROR, SINK_UNKNOWN_ERROR, - DEFAULT_ERROR + DEFAULT_ERROR //Deprecated } diff --git a/src/main/java/io/odpf/depot/exception/ConfigurationException.java b/src/main/java/io/odpf/depot/exception/ConfigurationException.java index 5ec3e107..9fb93bca 100644 --- a/src/main/java/io/odpf/depot/exception/ConfigurationException.java +++ b/src/main/java/io/odpf/depot/exception/ConfigurationException.java @@ -4,4 +4,8 @@ public class ConfigurationException extends RuntimeException { public ConfigurationException(String message) { super(message); } + + public ConfigurationException(String message, Throwable cause) { + super(message, cause); + } } diff --git a/src/main/java/io/odpf/depot/exception/InvalidTemplateException.java b/src/main/java/io/odpf/depot/exception/InvalidTemplateException.java new file mode 100644 index 00000000..db618f4a --- /dev/null +++ b/src/main/java/io/odpf/depot/exception/InvalidTemplateException.java @@ -0,0 +1,7 @@ +package io.odpf.depot.exception; + +public class InvalidTemplateException extends Exception { + public InvalidTemplateException(String message) { + super(message); + } +} diff --git a/src/main/java/io/odpf/depot/metrics/BigTableMetrics.java b/src/main/java/io/odpf/depot/metrics/BigTableMetrics.java new file mode 100644 index 00000000..125a8982 --- /dev/null +++ b/src/main/java/io/odpf/depot/metrics/BigTableMetrics.java @@ -0,0 +1,35 @@ +package io.odpf.depot.metrics; + +import io.odpf.depot.config.OdpfSinkConfig; + +public class BigTableMetrics extends SinkMetrics { + + public static final String BIGTABLE_SINK_PREFIX = "bigtable_"; + public static final String BIGTABLE_INSTANCE_TAG = "instance=%s"; + public static final String BIGTABLE_TABLE_TAG = "table=%s"; + public static final String BIGTABLE_ERROR_TAG = "error=%s"; + + public BigTableMetrics(OdpfSinkConfig config) { + super(config); + } + + + public enum BigTableErrorType { + QUOTA_FAILURE, // A quota check failed. + PRECONDITION_FAILURE, // Some preconditions have failed. + BAD_REQUEST, // Violations in a client request + RPC_FAILURE, + } + + public String getBigtableOperationLatencyMetric() { + return getApplicationPrefix() + SINK_PREFIX + BIGTABLE_SINK_PREFIX + "operation_latency_milliseconds"; + } + + public String getBigtableOperationTotalMetric() { + return getApplicationPrefix() + SINK_PREFIX + BIGTABLE_SINK_PREFIX + "operation_total"; + } + + public String getBigtableTotalErrorsMetrics() { + return getApplicationPrefix() + SINK_PREFIX + BIGTABLE_SINK_PREFIX + "errors_total"; + } +} diff --git a/src/main/java/io/odpf/depot/redis/parsers/RedisEntryParserFactory.java b/src/main/java/io/odpf/depot/redis/parsers/RedisEntryParserFactory.java index 8f519b87..974522c6 100644 --- a/src/main/java/io/odpf/depot/redis/parsers/RedisEntryParserFactory.java +++ b/src/main/java/io/odpf/depot/redis/parsers/RedisEntryParserFactory.java @@ -1,6 +1,8 @@ package io.odpf.depot.redis.parsers; +import io.odpf.depot.common.Template; import io.odpf.depot.config.RedisSinkConfig; +import io.odpf.depot.exception.InvalidTemplateException; import io.odpf.depot.message.OdpfMessageSchema; import io.odpf.depot.metrics.StatsDReporter; @@ -17,7 +19,12 @@ public static RedisEntryParser getRedisEntryParser( RedisSinkConfig redisSinkConfig, StatsDReporter statsDReporter, OdpfMessageSchema schema) { - Template keyTemplate = new Template(redisSinkConfig.getSinkRedisKeyTemplate()); + Template keyTemplate; + try { + keyTemplate = new Template(redisSinkConfig.getSinkRedisKeyTemplate()); + } catch (InvalidTemplateException e) { + throw new IllegalArgumentException(e.getMessage()); + } switch (redisSinkConfig.getSinkRedisDataType()) { case KEYVALUE: String fieldName = redisSinkConfig.getSinkRedisKeyValueDataFieldName(); @@ -36,8 +43,15 @@ public static RedisEntryParser getRedisEntryParser( if (properties == null || properties.isEmpty()) { throw new IllegalArgumentException("Empty config SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING found"); } + Map fieldTemplates = properties.entrySet().stream().collect(Collectors.toMap( - kv -> kv.getKey().toString(), kv -> new Template(kv.getValue().toString()) + kv -> kv.getKey().toString(), kv -> { + try { + return new Template(kv.getValue().toString()); + } catch (InvalidTemplateException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } )); return new RedisHashSetEntryParser(statsDReporter, keyTemplate, fieldTemplates, schema); } diff --git a/src/main/java/io/odpf/depot/redis/parsers/RedisHashSetEntryParser.java b/src/main/java/io/odpf/depot/redis/parsers/RedisHashSetEntryParser.java index 74e000f5..209a341b 100644 --- a/src/main/java/io/odpf/depot/redis/parsers/RedisHashSetEntryParser.java +++ b/src/main/java/io/odpf/depot/redis/parsers/RedisHashSetEntryParser.java @@ -1,5 +1,6 @@ package io.odpf.depot.redis.parsers; +import io.odpf.depot.common.Template; import io.odpf.depot.message.OdpfMessageSchema; import io.odpf.depot.message.ParsedOdpfMessage; import io.odpf.depot.metrics.Instrumentation; diff --git a/src/main/java/io/odpf/depot/redis/parsers/RedisKeyValueEntryParser.java b/src/main/java/io/odpf/depot/redis/parsers/RedisKeyValueEntryParser.java index 51e31b1c..89546cbd 100644 --- a/src/main/java/io/odpf/depot/redis/parsers/RedisKeyValueEntryParser.java +++ b/src/main/java/io/odpf/depot/redis/parsers/RedisKeyValueEntryParser.java @@ -1,5 +1,6 @@ package io.odpf.depot.redis.parsers; +import io.odpf.depot.common.Template; import io.odpf.depot.message.OdpfMessageSchema; import io.odpf.depot.message.ParsedOdpfMessage; import io.odpf.depot.metrics.Instrumentation; diff --git a/src/main/java/io/odpf/depot/redis/parsers/RedisListEntryParser.java b/src/main/java/io/odpf/depot/redis/parsers/RedisListEntryParser.java index 3a730746..6db0e6b8 100644 --- a/src/main/java/io/odpf/depot/redis/parsers/RedisListEntryParser.java +++ b/src/main/java/io/odpf/depot/redis/parsers/RedisListEntryParser.java @@ -1,6 +1,7 @@ package io.odpf.depot.redis.parsers; +import io.odpf.depot.common.Template; import io.odpf.depot.message.OdpfMessageSchema; import io.odpf.depot.message.ParsedOdpfMessage; import io.odpf.depot.metrics.Instrumentation; diff --git a/src/test/java/io/odpf/depot/bigtable/BigTableSinkTest.java b/src/test/java/io/odpf/depot/bigtable/BigTableSinkTest.java new file mode 100644 index 00000000..b7442882 --- /dev/null +++ b/src/test/java/io/odpf/depot/bigtable/BigTableSinkTest.java @@ -0,0 +1,92 @@ +package io.odpf.depot.bigtable; + +import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import io.odpf.depot.OdpfSinkResponse; +import io.odpf.depot.TestBookingLogKey; +import io.odpf.depot.TestBookingLogMessage; +import io.odpf.depot.TestServiceType; +import io.odpf.depot.bigtable.client.BigTableClient; +import io.odpf.depot.bigtable.model.BigTableRecord; +import io.odpf.depot.bigtable.parser.BigTableRecordParser; +import io.odpf.depot.error.ErrorInfo; +import io.odpf.depot.error.ErrorType; +import io.odpf.depot.message.OdpfMessage; +import io.odpf.depot.metrics.BigTableMetrics; +import io.odpf.depot.metrics.Instrumentation; +import io.odpf.depot.metrics.StatsDReporter; +import org.aeonbits.owner.util.Collections; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.util.List; + +public class BigTableSinkTest { + + @Mock + private BigTableRecordParser bigTableRecordParser; + @Mock + private BigTableClient bigTableClient; + @Mock + private StatsDReporter statsDReporter; + @Mock + private BigTableMetrics bigtableMetrics; + + private BigTableSink bigTableSink; + private List messages; + private List validRecords; + private List invalidRecords; + private ErrorInfo errorInfo; + + @Before + public void setUp() { + MockitoAnnotations.openMocks(this); + TestBookingLogKey bookingLogKey1 = TestBookingLogKey.newBuilder().setOrderNumber("order#1").setOrderUrl("order-url#1").build(); + TestBookingLogMessage bookingLogMessage1 = TestBookingLogMessage.newBuilder().setOrderNumber("order#1").setOrderUrl("order-url#1").setServiceType(TestServiceType.Enum.GO_SEND).build(); + TestBookingLogKey bookingLogKey2 = TestBookingLogKey.newBuilder().setOrderNumber("order#2").setOrderUrl("order-url#2").build(); + TestBookingLogMessage bookingLogMessage2 = TestBookingLogMessage.newBuilder().setOrderNumber("order#2").setOrderUrl("order-url#2").setServiceType(TestServiceType.Enum.GO_SHOP).build(); + + OdpfMessage message1 = new OdpfMessage(bookingLogKey1.toByteArray(), bookingLogMessage1.toByteArray()); + OdpfMessage message2 = new OdpfMessage(bookingLogKey2.toByteArray(), bookingLogMessage2.toByteArray()); + messages = Collections.list(message1, message2); + + RowMutationEntry rowMutationEntry = RowMutationEntry.create("rowKey").setCell("family", "qualifier", "value"); + BigTableRecord bigTableRecord1 = new BigTableRecord(rowMutationEntry, 1, null, message1.getMetadata()); + BigTableRecord bigTableRecord2 = new BigTableRecord(rowMutationEntry, 2, null, message2.getMetadata()); + validRecords = Collections.list(bigTableRecord1, bigTableRecord2); + + errorInfo = new ErrorInfo(new Exception("test-exception-message"), ErrorType.DEFAULT_ERROR); + BigTableRecord bigTableRecord3 = new BigTableRecord(null, 3, errorInfo, message1.getMetadata()); + BigTableRecord bigTableRecord4 = new BigTableRecord(null, 4, errorInfo, message2.getMetadata()); + invalidRecords = Collections.list(bigTableRecord3, bigTableRecord4); + + bigTableSink = new BigTableSink(bigTableClient, bigTableRecordParser, bigtableMetrics, new Instrumentation(statsDReporter, BigTableSink.class)); + } + + @Test + public void shouldSendValidBigTableRecordsToBigTableSink() { + Mockito.when(bigTableRecordParser.convert(messages)).thenReturn(validRecords); + Mockito.when(bigTableClient.send(validRecords)).thenReturn(null); + + OdpfSinkResponse response = bigTableSink.pushToSink(messages); + + Mockito.verify(bigTableClient, Mockito.times(1)).send(validRecords); + Assert.assertEquals(0, response.getErrors().size()); + } + + @Test + public void shouldAddErrorsFromInvalidRecordsToOdpfResponse() { + Mockito.when(bigTableRecordParser.convert(messages)).thenReturn(invalidRecords); + + OdpfSinkResponse response = bigTableSink.pushToSink(messages); + + Mockito.verify(bigTableClient, Mockito.times(0)).send(validRecords); + Assert.assertTrue(response.hasErrors()); + Assert.assertEquals(2, response.getErrors().size()); + Assert.assertEquals(errorInfo, response.getErrorsFor(3)); + Assert.assertEquals(errorInfo, response.getErrorsFor(4)); + } +} diff --git a/src/test/java/io/odpf/depot/bigtable/client/BigTableClientTest.java b/src/test/java/io/odpf/depot/bigtable/client/BigTableClientTest.java new file mode 100644 index 00000000..f2524e66 --- /dev/null +++ b/src/test/java/io/odpf/depot/bigtable/client/BigTableClientTest.java @@ -0,0 +1,155 @@ +package io.odpf.depot.bigtable.client; + +import com.google.api.gax.rpc.ApiException; +import com.google.bigtable.admin.v2.ColumnFamily; +import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient; +import com.google.cloud.bigtable.admin.v2.models.Table; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.models.BulkMutation; +import com.google.cloud.bigtable.data.v2.models.MutateRowsException; +import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import io.odpf.depot.TestBookingLogKey; +import io.odpf.depot.TestBookingLogMessage; +import io.odpf.depot.TestServiceType; +import io.odpf.depot.bigtable.exception.BigTableInvalidSchemaException; +import io.odpf.depot.bigtable.model.BigTableRecord; +import io.odpf.depot.bigtable.model.BigTableSchema; +import io.odpf.depot.bigtable.response.BigTableResponse; +import io.odpf.depot.config.BigTableSinkConfig; +import io.odpf.depot.message.OdpfMessage; +import io.odpf.depot.message.SinkConnectorSchemaMessageMode; +import io.odpf.depot.metrics.BigTableMetrics; +import io.odpf.depot.metrics.Instrumentation; +import org.aeonbits.owner.ConfigFactory; +import org.aeonbits.owner.util.Collections; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doThrow; + +public class BigTableClientTest { + + @Mock + private BigtableDataClient bigTableDataClient; + @Mock + private BigtableTableAdminClient bigtableTableAdminClient; + @Mock + private ApiException apiException; + @Mock + private BigTableMetrics bigtableMetrics; + @Mock + private Instrumentation instrumentation; + + private BigTableClient bigTableClient; + private List validRecords; + private BigTableSinkConfig sinkConfig; + + @Before + public void setUp() throws IOException { + MockitoAnnotations.openMocks(this); + System.setProperty("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", "io.odpf.depot.TestBookingLogMessage"); + System.setProperty("SINK_CONNECTOR_SCHEMA_MESSAGE_MODE", String.valueOf(SinkConnectorSchemaMessageMode.LOG_MESSAGE)); + System.setProperty("SINK_BIGTABLE_GOOGLE_CLOUD_PROJECT_ID", "test-gcloud-project"); + System.setProperty("SINK_BIGTABLE_INSTANCE_ID", "test-instance"); + System.setProperty("SINK_BIGTABLE_TABLE_ID", "test-table"); + System.setProperty("SINK_BIGTABLE_CREDENTIAL_PATH", "Users/github/bigtable/test-credential"); + System.setProperty("SINK_BIGTABLE_COLUMN_FAMILY_MAPPING", "{ \"family-test\" : { \"qualifier_name1\" : \"input_field1\", \"qualifier_name2\" : \"input_field2\"} }"); + System.setProperty("SINK_BIGTABLE_ROW_KEY_TEMPLATE", "row-key-constant-string"); + + TestBookingLogKey bookingLogKey1 = TestBookingLogKey.newBuilder().setOrderNumber("order#1").setOrderUrl("order-url#1").build(); + TestBookingLogMessage bookingLogMessage1 = TestBookingLogMessage.newBuilder().setOrderNumber("order#1").setOrderUrl("order-url#1").setServiceType(TestServiceType.Enum.GO_SEND).build(); + TestBookingLogKey bookingLogKey2 = TestBookingLogKey.newBuilder().setOrderNumber("order#2").setOrderUrl("order-url#2").build(); + TestBookingLogMessage bookingLogMessage2 = TestBookingLogMessage.newBuilder().setOrderNumber("order#2").setOrderUrl("order-url#2").setServiceType(TestServiceType.Enum.GO_SHOP).build(); + + OdpfMessage message1 = new OdpfMessage(bookingLogKey1.toByteArray(), bookingLogMessage1.toByteArray()); + OdpfMessage message2 = new OdpfMessage(bookingLogKey2.toByteArray(), bookingLogMessage2.toByteArray()); + + RowMutationEntry rowMutationEntry = RowMutationEntry.create("rowKey").setCell("family", "qualifier", "value"); + BigTableRecord bigTableRecord1 = new BigTableRecord(rowMutationEntry, 1, null, message1.getMetadata()); + BigTableRecord bigTableRecord2 = new BigTableRecord(rowMutationEntry, 2, null, message2.getMetadata()); + validRecords = Collections.list(bigTableRecord1, bigTableRecord2); + sinkConfig = ConfigFactory.create(BigTableSinkConfig.class, System.getProperties()); + BigTableSchema schema = new BigTableSchema(sinkConfig.getColumnFamilyMapping()); + bigTableClient = new BigTableClient(sinkConfig, bigTableDataClient, bigtableTableAdminClient, schema, bigtableMetrics, instrumentation); + } + + @Test + public void shouldReturnNullBigTableResponseWhenBulkMutateRowsDoesNotThrowAnException() { + doNothing().when(bigTableDataClient).bulkMutateRows(isA(BulkMutation.class)); + + BigTableResponse bigTableResponse = bigTableClient.send(validRecords); + + Assert.assertNull(bigTableResponse); + } + + @Test + public void shouldReturnBigTableResponseWithFailedMutationsWhenBulkMutateRowsThrowsMutateRowsException() { + List failedMutations = new ArrayList<>(); + failedMutations.add(MutateRowsException.FailedMutation.create(0, apiException)); + failedMutations.add(MutateRowsException.FailedMutation.create(1, apiException)); + MutateRowsException mutateRowsException = new MutateRowsException(null, failedMutations, false); + + doThrow(mutateRowsException).when(bigTableDataClient).bulkMutateRows(isA(BulkMutation.class)); + + BigTableResponse bigTableResponse = bigTableClient.send(validRecords); + + Assert.assertTrue(bigTableResponse.hasErrors()); + Assert.assertEquals(2, bigTableResponse.getFailedMutations().size()); + Mockito.verify(instrumentation, Mockito.times(1)).logError("Some entries failed to be applied. {}", mutateRowsException.getCause()); + } + + @Test + public void shouldThrowInvalidSchemaExceptionIfTableDoesNotExist() { + when(bigtableTableAdminClient.exists(sinkConfig.getTableId())).thenReturn(false); + try { + bigTableClient.validateBigTableSchema(); + } catch (BigTableInvalidSchemaException e) { + Assert.assertEquals("Table: " + sinkConfig.getTableId() + " does not exist!", e.getMessage()); + } + } + + @Test + public void shouldThrowInvalidSchemaExceptionIfColumnFamilyDoesNotExist() { + Table testTable = Table.fromProto(com.google.bigtable.admin.v2.Table.newBuilder() + .setName("projects/" + sinkConfig.getGCloudProjectID() + "/instances/" + sinkConfig.getInstanceId() + "/tables/" + sinkConfig.getTableId()) + .putColumnFamilies("existing-family-test", ColumnFamily.newBuilder().build()) + .build()); + + when(bigtableTableAdminClient.exists(sinkConfig.getTableId())).thenReturn(true); + when(bigtableTableAdminClient.getTable(sinkConfig.getTableId())).thenReturn(testTable); + try { + bigTableClient.validateBigTableSchema(); + } catch (BigTableInvalidSchemaException e) { + Assert.assertEquals("Column families [family-test] do not exist in table test-table!", e.getMessage()); + } + } + + @Test + public void shouldCaptureBigtableMetricsWhenBulkMutateRowsDoesNotThrowAnException() { + doNothing().when(bigTableDataClient).bulkMutateRows(isA(BulkMutation.class)); + + bigTableClient.send(validRecords); + + Mockito.verify(instrumentation, Mockito.times(1)).captureDurationSince(eq(bigtableMetrics.getBigtableOperationLatencyMetric()), + any(), + eq(String.format(BigTableMetrics.BIGTABLE_INSTANCE_TAG, sinkConfig.getInstanceId())), + eq(String.format(BigTableMetrics.BIGTABLE_TABLE_TAG, sinkConfig.getTableId()))); + Mockito.verify(instrumentation, Mockito.times(1)).captureCount(eq(bigtableMetrics.getBigtableOperationTotalMetric()), + any(), + eq(String.format(BigTableMetrics.BIGTABLE_INSTANCE_TAG, sinkConfig.getInstanceId())), + eq(String.format(BigTableMetrics.BIGTABLE_TABLE_TAG, sinkConfig.getTableId()))); + } +} diff --git a/src/test/java/io/odpf/depot/bigtable/model/BigTableSchemaTest.java b/src/test/java/io/odpf/depot/bigtable/model/BigTableSchemaTest.java new file mode 100644 index 00000000..811dbd34 --- /dev/null +++ b/src/test/java/io/odpf/depot/bigtable/model/BigTableSchemaTest.java @@ -0,0 +1,137 @@ +package io.odpf.depot.bigtable.model; + +import io.odpf.depot.config.BigTableSinkConfig; +import io.odpf.depot.exception.ConfigurationException; +import org.aeonbits.owner.ConfigFactory; +import org.json.JSONException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.*; + +public class BigTableSchemaTest { + private BigTableSchema bigtableSchema; + + @Before + public void setUp() { + System.setProperty("SINK_BIGTABLE_COLUMN_FAMILY_MAPPING", "{\n" + + "\"family_name1\" : {\n" + + "\"qualifier_name1\" : \"data.is_complete\",\n" + + "\"qualifier_name2\" : \"data.content\"\n" + + "},\n" + + "\"family_name2\" : {\n" + + "\"qualifier_name3\" : \"base_content3\",\n" + + "\"qualifier_name4\" : \"base_content4\"\n" + + "}\n" + + "}"); + BigTableSinkConfig sinkConfig = ConfigFactory.create(BigTableSinkConfig.class, System.getProperties()); + bigtableSchema = new BigTableSchema(sinkConfig.getColumnFamilyMapping()); + } + + @Test + public void shouldGeSetOfColumnFamilies() { + Set columnFamilies = bigtableSchema.getColumnFamilies(); + assertEquals(2, columnFamilies.size()); + assertTrue(columnFamilies.contains("family_name1")); + assertTrue(columnFamilies.contains("family_name2")); + } + + @Test + public void shouldGetFieldNameForGivenColumnFamilyAndQualifier() { + assertEquals("data.is_complete", bigtableSchema.getField("family_name1", "qualifier_name1")); + assertEquals("data.content", bigtableSchema.getField("family_name1", "qualifier_name2")); + + assertEquals("base_content3", bigtableSchema.getField("family_name2", "qualifier_name3")); + assertEquals("base_content4", bigtableSchema.getField("family_name2", "qualifier_name4")); + } + + @Test + public void shouldGetColumnsForGivenColumnFamily() { + assertEquals(2, bigtableSchema.getColumns("family_name1").size()); + assertTrue(bigtableSchema.getColumns("family_name1").contains("qualifier_name1")); + assertTrue(bigtableSchema.getColumns("family_name1").contains("qualifier_name2")); + + assertEquals(2, bigtableSchema.getColumns("family_name2").size()); + assertTrue(bigtableSchema.getColumns("family_name2").contains("qualifier_name3")); + assertTrue(bigtableSchema.getColumns("family_name2").contains("qualifier_name4")); + } + + @Test + public void shouldThrowConfigurationExceptionWhenColumnMappingIsEmpty() { + System.setProperty("SINK_BIGTABLE_COLUMN_FAMILY_MAPPING", ""); + BigTableSinkConfig sinkConfig = ConfigFactory.create(BigTableSinkConfig.class, System.getProperties()); + ConfigurationException configurationException = assertThrows(ConfigurationException.class, () -> new BigTableSchema(sinkConfig.getColumnFamilyMapping())); + Assert.assertEquals("Column Mapping should not be empty or null", configurationException.getMessage()); + } + + @Test + public void shouldReturnEmptySetIfNoColumnFamilies() { + System.setProperty("SINK_BIGTABLE_COLUMN_FAMILY_MAPPING", "{}"); + BigTableSinkConfig sinkConfig = ConfigFactory.create(BigTableSinkConfig.class, System.getProperties()); + bigtableSchema = new BigTableSchema(sinkConfig.getColumnFamilyMapping()); + Set columnFamilies = bigtableSchema.getColumnFamilies(); + Assert.assertEquals(0, columnFamilies.size()); + } + + @Test + public void shouldReturnEmptySetIfNoColumnsPresent() { + System.setProperty("SINK_BIGTABLE_COLUMN_FAMILY_MAPPING", "{\n" + + "\"family_name1\" : {\n" + + "\"qualifier_name1\" : \"data.is_complete\",\n" + + "\"qualifier_name2\" : \"data.content\"\n" + + "},\n" + + "\"family_name2\" : {}\n" + + "}"); + BigTableSinkConfig sinkConfig = ConfigFactory.create(BigTableSinkConfig.class, System.getProperties()); + bigtableSchema = new BigTableSchema(sinkConfig.getColumnFamilyMapping()); + Set columnFamilies = bigtableSchema.getColumnFamilies(); + Assert.assertEquals(2, columnFamilies.size()); + Set columns = bigtableSchema.getColumns("family_name2"); + Assert.assertEquals(0, columns.size()); + } + + @Test + public void shouldThrowJsonException() { + System.setProperty("SINK_BIGTABLE_COLUMN_FAMILY_MAPPING", "{\n" + + "\"family_name1\" : {\n" + + "\"qualifier_name1\" : \"data.is_complete\",\n" + + "\"qualifier_name2\" : \"data.content\"\n" + + "},\n" + + "\"family_name2\" : {}\n" + + "}"); + BigTableSinkConfig sinkConfig = ConfigFactory.create(BigTableSinkConfig.class, System.getProperties()); + bigtableSchema = new BigTableSchema(sinkConfig.getColumnFamilyMapping()); + Set columnFamilies = bigtableSchema.getColumnFamilies(); + Assert.assertEquals(2, columnFamilies.size()); + JSONException jsonException = assertThrows(JSONException.class, () -> bigtableSchema.getColumns("family_name3")); + Assert.assertEquals("JSONObject[\"family_name3\"] not found.", jsonException.getMessage()); + + jsonException = assertThrows(JSONException.class, () -> bigtableSchema.getField("family_name1", "qualifier_name3")); + Assert.assertEquals("JSONObject[\"qualifier_name3\"] not found.", jsonException.getMessage()); + } + + @Test + public void shouldReturnEmptySetOfMissingColumnFamilies() { + Set missingColumnFamilies = bigtableSchema.getMissingColumnFamilies(new HashSet() {{ + add("family_name1"); + add("family_name2"); + }}); + Assert.assertEquals(0, missingColumnFamilies.size()); + } + + @Test + public void shouldReturnMissingColumnFamilies() { + Set missingColumnFamilies = bigtableSchema.getMissingColumnFamilies(new HashSet() {{ + add("family_name3"); + add("family_name2"); + add("family_name4"); + }}); + Assert.assertEquals(new HashSet() {{ + add("family_name1"); + }}, missingColumnFamilies); + } +} diff --git a/src/test/java/io/odpf/depot/bigtable/parser/BigTableRecordParserTest.java b/src/test/java/io/odpf/depot/bigtable/parser/BigTableRecordParserTest.java new file mode 100644 index 00000000..9d5c7979 --- /dev/null +++ b/src/test/java/io/odpf/depot/bigtable/parser/BigTableRecordParserTest.java @@ -0,0 +1,174 @@ +package io.odpf.depot.bigtable.parser; + +import io.odpf.depot.TestBookingLogKey; +import io.odpf.depot.TestBookingLogMessage; +import io.odpf.depot.TestServiceType; +import io.odpf.depot.bigtable.model.BigTableRecord; +import io.odpf.depot.bigtable.model.BigTableSchema; +import io.odpf.depot.common.Template; +import io.odpf.depot.common.Tuple; +import io.odpf.depot.config.BigTableSinkConfig; +import io.odpf.depot.error.ErrorType; +import io.odpf.depot.exception.ConfigurationException; +import io.odpf.depot.exception.EmptyMessageException; +import io.odpf.depot.exception.InvalidTemplateException; +import io.odpf.depot.message.OdpfMessage; +import io.odpf.depot.message.OdpfMessageSchema; +import io.odpf.depot.message.ParsedOdpfMessage; +import io.odpf.depot.message.OdpfMessageParser; +import io.odpf.depot.message.SinkConnectorSchemaMessageMode; +import io.odpf.depot.message.proto.ProtoOdpfMessageParser; +import io.odpf.depot.utils.MessageConfigUtils; +import io.odpf.stencil.client.ClassLoadStencilClient; +import org.aeonbits.owner.ConfigFactory; +import org.aeonbits.owner.util.Collections; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import java.io.IOException; +import java.util.List; + +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.any; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + +public class BigTableRecordParserTest { + + @Mock + private ClassLoadStencilClient stencilClient; + @Mock + private OdpfMessageSchema schema; + @Mock + private OdpfMessageParser mockOdpfMessageParser; + @Mock + private BigTableRowKeyParser mockBigTableRowKeyParser; + @Mock + private ParsedOdpfMessage mockParsedOdpfMessage; + private BigTableRecordParser bigTableRecordParser; + private List messages; + private BigTableSinkConfig sinkConfig; + + @Before + public void setUp() throws IOException, InvalidTemplateException { + MockitoAnnotations.openMocks(this); + System.setProperty("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", "io.odpf.depot.TestBookingLogMessage"); + System.setProperty("SINK_CONNECTOR_SCHEMA_MESSAGE_MODE", String.valueOf(SinkConnectorSchemaMessageMode.LOG_MESSAGE)); + System.setProperty("SINK_BIGTABLE_COLUMN_FAMILY_MAPPING", "{}"); + System.setProperty("SINK_BIGTABLE_ROW_KEY_TEMPLATE", "row-key-constant-string"); + + + TestBookingLogKey bookingLogKey1 = TestBookingLogKey.newBuilder().setOrderNumber("order#1").setOrderUrl("order-url#1").build(); + TestBookingLogMessage bookingLogMessage1 = TestBookingLogMessage.newBuilder().setOrderNumber("order#1").setOrderUrl("order-url#1").setServiceType(TestServiceType.Enum.GO_SEND).build(); + TestBookingLogKey bookingLogKey2 = TestBookingLogKey.newBuilder().setOrderNumber("order#2").setOrderUrl("order-url#2").build(); + TestBookingLogMessage bookingLogMessage2 = TestBookingLogMessage.newBuilder().setOrderNumber("order#2").setOrderUrl("order-url#2").setServiceType(TestServiceType.Enum.GO_SHOP).build(); + + OdpfMessage message1 = new OdpfMessage(bookingLogKey1.toByteArray(), bookingLogMessage1.toByteArray()); + OdpfMessage message2 = new OdpfMessage(bookingLogKey2.toByteArray(), bookingLogMessage2.toByteArray()); + messages = Collections.list(message1, message2); + + stencilClient = Mockito.mock(ClassLoadStencilClient.class, CALLS_REAL_METHODS); + ProtoOdpfMessageParser protoOdpfMessageParser = new ProtoOdpfMessageParser(stencilClient); + sinkConfig = ConfigFactory.create(BigTableSinkConfig.class, System.getProperties()); + Tuple modeAndSchema = MessageConfigUtils.getModeAndSchema(sinkConfig); + BigTableSchema bigtableSchema = new BigTableSchema(sinkConfig.getColumnFamilyMapping()); + BigTableRowKeyParser bigTableRowKeyParser = new BigTableRowKeyParser(new Template(sinkConfig.getRowKeyTemplate()), schema); + + bigTableRecordParser = new BigTableRecordParser(protoOdpfMessageParser, bigTableRowKeyParser, modeAndSchema, schema, bigtableSchema); + } + + @Test + public void shouldReturnValidRecordsForListOfValidOdpfMessages() { + List records = bigTableRecordParser.convert(messages); + assertTrue(records.get(0).isValid()); + assertTrue(records.get(1).isValid()); + assertNull(records.get(0).getErrorInfo()); + assertNull(records.get(1).getErrorInfo()); + } + + @Test + public void shouldReturnInvalidRecordForAnyNullOdpfMessage() { + List records = bigTableRecordParser.convert(Collections.list(new OdpfMessage(null, null))); + assertFalse(records.get(0).isValid()); + assertNotNull(records.get(0).getErrorInfo()); + } + + @Test + public void shouldCatchEmptyMessageExceptionAndReturnAnInvalidBigtableRecordWithErrorTypeAsInvalidMessageError() throws IOException { + bigTableRecordParser = new BigTableRecordParser(mockOdpfMessageParser, + mockBigTableRowKeyParser, + MessageConfigUtils.getModeAndSchema(sinkConfig), + schema, + new BigTableSchema(sinkConfig.getColumnFamilyMapping()) + ); + when(mockOdpfMessageParser.parse(any(), any(), any())).thenThrow(EmptyMessageException.class); + + List bigTableRecords = bigTableRecordParser.convert(messages); + + for (BigTableRecord record : bigTableRecords) { + assertFalse(record.isValid()); + assertEquals(ErrorType.INVALID_MESSAGE_ERROR, record.getErrorInfo().getErrorType()); + } + } + + @Test + public void shouldCatchConfigurationExceptionAndReturnAnInvalidBigtableRecordWithErrorTypeAsUnknownFieldsError() throws IOException { + bigTableRecordParser = new BigTableRecordParser(mockOdpfMessageParser, + mockBigTableRowKeyParser, + MessageConfigUtils.getModeAndSchema(sinkConfig), + schema, + new BigTableSchema(sinkConfig.getColumnFamilyMapping()) + ); + when(mockOdpfMessageParser.parse(any(), any(), any())).thenThrow(ConfigurationException.class); + + List bigTableRecords = bigTableRecordParser.convert(messages); + + for (BigTableRecord record : bigTableRecords) { + assertFalse(record.isValid()); + assertEquals(ErrorType.UNKNOWN_FIELDS_ERROR, record.getErrorInfo().getErrorType()); + } + } + + @Test + public void shouldCatchIOExceptionAndReturnAnInvalidBigtableRecordWithErrorTypeAsDeserializationError() throws IOException { + bigTableRecordParser = new BigTableRecordParser(mockOdpfMessageParser, + mockBigTableRowKeyParser, + MessageConfigUtils.getModeAndSchema(sinkConfig), + schema, + new BigTableSchema(sinkConfig.getColumnFamilyMapping()) + ); + when(mockOdpfMessageParser.parse(any(), any(), any())).thenThrow(IOException.class); + + List bigTableRecords = bigTableRecordParser.convert(messages); + + for (BigTableRecord record : bigTableRecords) { + assertFalse(record.isValid()); + assertEquals(ErrorType.DESERIALIZATION_ERROR, record.getErrorInfo().getErrorType()); + } + } + + @Test + public void shouldCatchIllegalArgumentExceptionAndReturnAnInvalidBigtableRecordWithErrorTypeAsUnknownFieldsError() throws IOException { + bigTableRecordParser = new BigTableRecordParser(mockOdpfMessageParser, + mockBigTableRowKeyParser, + MessageConfigUtils.getModeAndSchema(sinkConfig), + schema, + new BigTableSchema(sinkConfig.getColumnFamilyMapping()) + ); + when(mockOdpfMessageParser.parse(any(), any(), any())).thenReturn(mockParsedOdpfMessage); + when(mockBigTableRowKeyParser.parse(mockParsedOdpfMessage)).thenThrow(IllegalArgumentException.class); + + List bigTableRecords = bigTableRecordParser.convert(messages); + + for (BigTableRecord record : bigTableRecords) { + assertFalse(record.isValid()); + assertEquals(ErrorType.UNKNOWN_FIELDS_ERROR, record.getErrorInfo().getErrorType()); + } + } +} diff --git a/src/test/java/io/odpf/depot/bigtable/parser/BigTableResponseParserTest.java b/src/test/java/io/odpf/depot/bigtable/parser/BigTableResponseParserTest.java new file mode 100644 index 00000000..7c5df022 --- /dev/null +++ b/src/test/java/io/odpf/depot/bigtable/parser/BigTableResponseParserTest.java @@ -0,0 +1,230 @@ +package io.odpf.depot.bigtable.parser; + +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.ErrorDetails; +import com.google.api.gax.rpc.StatusCode; +import com.google.cloud.bigtable.data.v2.models.MutateRowsException; +import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import com.google.rpc.BadRequest; +import com.google.rpc.PreconditionFailure; +import com.google.rpc.QuotaFailure; +import io.odpf.depot.TestBookingLogKey; +import io.odpf.depot.TestBookingLogMessage; +import io.odpf.depot.TestServiceType; +import io.odpf.depot.bigtable.model.BigTableRecord; +import io.odpf.depot.bigtable.response.BigTableResponse; +import io.odpf.depot.error.ErrorInfo; +import io.odpf.depot.error.ErrorType; +import io.odpf.depot.message.OdpfMessage; +import io.odpf.depot.metrics.BigTableMetrics; +import io.odpf.depot.metrics.Instrumentation; +import org.aeonbits.owner.util.Collections; +import org.junit.Before; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class BigTableResponseParserTest { + @Mock + private BigTableMetrics bigtableMetrics; + @Mock + private Instrumentation instrumentation; + @Mock + private ApiException apiException; + @Mock + private StatusCode statusCode; + @Mock + private StatusCode.Code code; + @Mock + private ErrorDetails errorDetails; + private List validRecords; + + @Before + public void setUp() { + MockitoAnnotations.openMocks(this); + Mockito.when(apiException.getStatusCode()).thenReturn(statusCode); + Mockito.when(statusCode.getCode()).thenReturn(code); + Mockito.when(apiException.getErrorDetails()).thenReturn(errorDetails); + Mockito.when(apiException.getReason()).thenReturn("REASON_STRING"); + Mockito.when(apiException.isRetryable()).thenReturn(Boolean.FALSE); + Mockito.when(apiException.getCause()).thenReturn(apiException); + Mockito.when(errorDetails.getBadRequest()).thenReturn(null); + Mockito.when(errorDetails.getQuotaFailure()).thenReturn(null); + Mockito.when(errorDetails.getPreconditionFailure()).thenReturn(null); + + TestBookingLogKey bookingLogKey1 = TestBookingLogKey.newBuilder().setOrderNumber("order#1").setOrderUrl("order-url#1").build(); + TestBookingLogMessage bookingLogMessage1 = TestBookingLogMessage.newBuilder().setOrderNumber("order#1").setOrderUrl("order-url#1").setServiceType(TestServiceType.Enum.GO_SEND).build(); + TestBookingLogKey bookingLogKey2 = TestBookingLogKey.newBuilder().setOrderNumber("order#2").setOrderUrl("order-url#2").build(); + TestBookingLogMessage bookingLogMessage2 = TestBookingLogMessage.newBuilder().setOrderNumber("order#2").setOrderUrl("order-url#2").setServiceType(TestServiceType.Enum.GO_SHOP).build(); + + OdpfMessage message1 = new OdpfMessage(bookingLogKey1.toByteArray(), bookingLogMessage1.toByteArray()); + OdpfMessage message2 = new OdpfMessage(bookingLogKey2.toByteArray(), bookingLogMessage2.toByteArray()); + + RowMutationEntry rowMutationEntry1 = RowMutationEntry.create("rowKey1").setCell("family1", "qualifier1", "value1"); + RowMutationEntry rowMutationEntry2 = RowMutationEntry.create("rowKey2").setCell("family2", "qualifier2", "value2"); + BigTableRecord bigTableRecord1 = new BigTableRecord(rowMutationEntry1, 0, null, message1.getMetadata()); + BigTableRecord bigTableRecord2 = new BigTableRecord(rowMutationEntry2, 1, null, message2.getMetadata()); + validRecords = Collections.list(bigTableRecord1, bigTableRecord2); + } + + @Test + public void shouldReturnErrorInfoMapWithRetryableError() { + List failedMutations = new ArrayList<>(); + failedMutations.add(MutateRowsException.FailedMutation.create(1, apiException)); + MutateRowsException mutateRowsException = new MutateRowsException(null, failedMutations, false); + BigTableResponse bigtableResponse = new BigTableResponse(mutateRowsException); + + Mockito.when(code.getHttpStatusCode()).thenReturn(400); + Mockito.when(apiException.isRetryable()).thenReturn(Boolean.TRUE); + + Map errorsFromSinkResponse = BigTableResponseParser.getErrorsFromSinkResponse(validRecords, bigtableResponse, bigtableMetrics, instrumentation); + + Assertions.assertEquals(ErrorType.SINK_RETRYABLE_ERROR, errorsFromSinkResponse.get(1L).getErrorType()); + Assertions.assertEquals(apiException, errorsFromSinkResponse.get(1L).getException()); + } + + @Test + public void shouldReturnErrorInfoMapWith4XXError() { + List failedMutations = new ArrayList<>(); + failedMutations.add(MutateRowsException.FailedMutation.create(1, apiException)); + MutateRowsException mutateRowsException = new MutateRowsException(null, failedMutations, false); + BigTableResponse bigtableResponse = new BigTableResponse(mutateRowsException); + + Mockito.when(code.getHttpStatusCode()).thenReturn(400); + + Map errorsFromSinkResponse = BigTableResponseParser.getErrorsFromSinkResponse(validRecords, bigtableResponse, bigtableMetrics, instrumentation); + + Assertions.assertEquals(ErrorType.SINK_4XX_ERROR, errorsFromSinkResponse.get(1L).getErrorType()); + Assertions.assertEquals(apiException, errorsFromSinkResponse.get(1L).getException()); + } + + @Test + public void shouldReturnErrorInfoMapWith5XXError() { + List failedMutations = new ArrayList<>(); + failedMutations.add(MutateRowsException.FailedMutation.create(1, apiException)); + MutateRowsException mutateRowsException = new MutateRowsException(null, failedMutations, false); + BigTableResponse bigtableResponse = new BigTableResponse(mutateRowsException); + + Mockito.when(code.getHttpStatusCode()).thenReturn(500); + + Map errorsFromSinkResponse = BigTableResponseParser.getErrorsFromSinkResponse(validRecords, bigtableResponse, bigtableMetrics, instrumentation); + + Assertions.assertEquals(ErrorType.SINK_5XX_ERROR, errorsFromSinkResponse.get(1L).getErrorType()); + Assertions.assertEquals(apiException, errorsFromSinkResponse.get(1L).getException()); + } + + @Test + public void shouldReturnErrorInfoMapWithUnknownError() { + List failedMutations = new ArrayList<>(); + failedMutations.add(MutateRowsException.FailedMutation.create(1, apiException)); + MutateRowsException mutateRowsException = new MutateRowsException(null, failedMutations, false); + BigTableResponse bigtableResponse = new BigTableResponse(mutateRowsException); + + Mockito.when(code.getHttpStatusCode()).thenReturn(0); + + Map errorsFromSinkResponse = BigTableResponseParser.getErrorsFromSinkResponse(validRecords, bigtableResponse, bigtableMetrics, instrumentation); + + Assertions.assertEquals(ErrorType.SINK_UNKNOWN_ERROR, errorsFromSinkResponse.get(1L).getErrorType()); + Assertions.assertEquals(apiException, errorsFromSinkResponse.get(1L).getException()); + } + + @Test + public void shouldCaptureMetricBigtableErrorTypeBadRequest() { + List failedMutations = new ArrayList<>(); + failedMutations.add(MutateRowsException.FailedMutation.create(1, apiException)); + MutateRowsException mutateRowsException = new MutateRowsException(null, failedMutations, false); + BigTableResponse bigtableResponse = new BigTableResponse(mutateRowsException); + + Mockito.when(code.getHttpStatusCode()).thenReturn(0); + Mockito.when(errorDetails.getBadRequest()).thenReturn(BadRequest.getDefaultInstance()); + + BigTableResponseParser.getErrorsFromSinkResponse(validRecords, bigtableResponse, bigtableMetrics, instrumentation); + + Mockito.verify(instrumentation, Mockito.times(1)).incrementCounter(bigtableMetrics.getBigtableTotalErrorsMetrics(), String.format(BigTableMetrics.BIGTABLE_ERROR_TAG, BigTableMetrics.BigTableErrorType.BAD_REQUEST)); + } + + @Test + public void shouldCaptureMetricBigtableErrorTypeQuotaFailure() { + List failedMutations = new ArrayList<>(); + failedMutations.add(MutateRowsException.FailedMutation.create(1, apiException)); + MutateRowsException mutateRowsException = new MutateRowsException(null, failedMutations, false); + BigTableResponse bigtableResponse = new BigTableResponse(mutateRowsException); + + Mockito.when(code.getHttpStatusCode()).thenReturn(0); + Mockito.when(errorDetails.getQuotaFailure()).thenReturn(QuotaFailure.getDefaultInstance()); + + BigTableResponseParser.getErrorsFromSinkResponse(validRecords, bigtableResponse, bigtableMetrics, instrumentation); + + Mockito.verify(instrumentation, Mockito.times(1)).incrementCounter(bigtableMetrics.getBigtableTotalErrorsMetrics(), String.format(BigTableMetrics.BIGTABLE_ERROR_TAG, BigTableMetrics.BigTableErrorType.QUOTA_FAILURE)); + } + + @Test + public void shouldCaptureMetricBigtableErrorTypePreconditionFailure() { + List failedMutations = new ArrayList<>(); + failedMutations.add(MutateRowsException.FailedMutation.create(1, apiException)); + MutateRowsException mutateRowsException = new MutateRowsException(null, failedMutations, false); + BigTableResponse bigtableResponse = new BigTableResponse(mutateRowsException); + + Mockito.when(code.getHttpStatusCode()).thenReturn(0); + Mockito.when(errorDetails.getPreconditionFailure()).thenReturn(PreconditionFailure.getDefaultInstance()); + + BigTableResponseParser.getErrorsFromSinkResponse(validRecords, bigtableResponse, bigtableMetrics, instrumentation); + + Mockito.verify(instrumentation, Mockito.times(1)).incrementCounter(bigtableMetrics.getBigtableTotalErrorsMetrics(), String.format(BigTableMetrics.BIGTABLE_ERROR_TAG, BigTableMetrics.BigTableErrorType.PRECONDITION_FAILURE)); + } + + @Test + public void shouldCaptureMetricBigtableErrorTypeRpcFailureByDefault() { + List failedMutations = new ArrayList<>(); + failedMutations.add(MutateRowsException.FailedMutation.create(1, apiException)); + MutateRowsException mutateRowsException = new MutateRowsException(null, failedMutations, false); + BigTableResponse bigtableResponse = new BigTableResponse(mutateRowsException); + + Mockito.when(code.getHttpStatusCode()).thenReturn(0); + + BigTableResponseParser.getErrorsFromSinkResponse(validRecords, bigtableResponse, bigtableMetrics, instrumentation); + + Mockito.verify(instrumentation, Mockito.times(1)).incrementCounter(bigtableMetrics.getBigtableTotalErrorsMetrics(), String.format(BigTableMetrics.BIGTABLE_ERROR_TAG, BigTableMetrics.BigTableErrorType.RPC_FAILURE)); + } + + @Test + public void shouldCaptureMetricBigtableErrorTypeRpcFailureIfErrorDetailsIsNull() { + List failedMutations = new ArrayList<>(); + failedMutations.add(MutateRowsException.FailedMutation.create(1, apiException)); + MutateRowsException mutateRowsException = new MutateRowsException(null, failedMutations, false); + BigTableResponse bigtableResponse = new BigTableResponse(mutateRowsException); + + Mockito.when(apiException.getErrorDetails()).thenReturn(null); + Mockito.when(code.getHttpStatusCode()).thenReturn(0); + + BigTableResponseParser.getErrorsFromSinkResponse(validRecords, bigtableResponse, bigtableMetrics, instrumentation); + + Mockito.verify(instrumentation, Mockito.times(1)).incrementCounter(bigtableMetrics.getBigtableTotalErrorsMetrics(), String.format(BigTableMetrics.BIGTABLE_ERROR_TAG, BigTableMetrics.BigTableErrorType.RPC_FAILURE)); + } + + @Test + public void shouldLogErrorRecordWithReasonAndStatusCode() { + List failedMutations = new ArrayList<>(); + failedMutations.add(MutateRowsException.FailedMutation.create(1, apiException)); + MutateRowsException mutateRowsException = new MutateRowsException(null, failedMutations, false); + BigTableResponse bigtableResponse = new BigTableResponse(mutateRowsException); + + Mockito.when(code.getHttpStatusCode()).thenReturn(0); + Mockito.when(errorDetails.getPreconditionFailure()).thenReturn(PreconditionFailure.getDefaultInstance()); + + BigTableResponseParser.getErrorsFromSinkResponse(validRecords, bigtableResponse, bigtableMetrics, instrumentation); + + Mockito.verify(instrumentation, Mockito.times(1)).logError("Error while inserting to Bigtable. Record Metadata: {}, Cause: {}, Reason: {}, StatusCode: {}, HttpCode: {}", + validRecords.get(1).getMetadata(), + failedMutations.get(0).getError().getCause(), + failedMutations.get(0).getError().getReason(), + failedMutations.get(0).getError().getStatusCode().getCode(), + failedMutations.get(0).getError().getStatusCode().getCode().getHttpStatusCode()); + } +} diff --git a/src/test/java/io/odpf/depot/bigtable/parser/BigTableRowKeyParserTest.java b/src/test/java/io/odpf/depot/bigtable/parser/BigTableRowKeyParserTest.java new file mode 100644 index 00000000..cfe45f94 --- /dev/null +++ b/src/test/java/io/odpf/depot/bigtable/parser/BigTableRowKeyParserTest.java @@ -0,0 +1,94 @@ +package io.odpf.depot.bigtable.parser; + +import com.google.protobuf.Descriptors; +import com.timgroup.statsd.NoOpStatsDClient; +import io.odpf.depot.TestKey; +import io.odpf.depot.TestMessage; +import io.odpf.depot.TestNestedMessage; +import io.odpf.depot.TestNestedRepeatedMessage; +import io.odpf.depot.common.Template; +import io.odpf.depot.config.BigTableSinkConfig; +import io.odpf.depot.exception.InvalidTemplateException; +import io.odpf.depot.message.OdpfMessage; +import io.odpf.depot.message.OdpfMessageSchema; +import io.odpf.depot.message.ParsedOdpfMessage; +import io.odpf.depot.message.SinkConnectorSchemaMessageMode; +import io.odpf.depot.message.proto.ProtoOdpfMessageParser; +import io.odpf.depot.metrics.StatsDReporter; +import org.aeonbits.owner.ConfigFactory; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class BigTableRowKeyParserTest { + + private final Map descriptorsMap = new HashMap() {{ + put(String.format("%s", TestKey.class.getName()), TestKey.getDescriptor()); + put(String.format("%s", TestMessage.class.getName()), TestMessage.getDescriptor()); + put(String.format("%s", TestNestedMessage.class.getName()), TestNestedMessage.getDescriptor()); + put(String.format("%s", TestNestedRepeatedMessage.class.getName()), TestNestedRepeatedMessage.getDescriptor()); + }}; + + @Test + public void shouldReturnParsedRowKeyForValidParameterisedTemplate() throws IOException, InvalidTemplateException { + System.setProperty("SINK_BIGTABLE_ROW_KEY_TEMPLATE", "row-%s$key#%s*test,order_number,order_details"); + System.setProperty("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", "io.odpf.depot.TestMessage"); + BigTableSinkConfig sinkConfig = ConfigFactory.create(BigTableSinkConfig.class, System.getProperties()); + + ProtoOdpfMessageParser odpfMessageParser = new ProtoOdpfMessageParser(sinkConfig, new StatsDReporter(new NoOpStatsDClient()), null); + OdpfMessageSchema schema = odpfMessageParser.getSchema(sinkConfig.getSinkConnectorSchemaProtoMessageClass(), descriptorsMap); + + byte[] logMessage = TestMessage.newBuilder() + .setOrderNumber("xyz-order") + .setOrderDetails("eureka") + .build() + .toByteArray(); + OdpfMessage message = new OdpfMessage(null, logMessage); + ParsedOdpfMessage parsedOdpfMessage = odpfMessageParser.parse(message, SinkConnectorSchemaMessageMode.LOG_MESSAGE, sinkConfig.getSinkConnectorSchemaProtoMessageClass()); + + BigTableRowKeyParser bigTableRowKeyParser = new BigTableRowKeyParser(new Template(sinkConfig.getRowKeyTemplate()), schema); + String parsedRowKey = bigTableRowKeyParser.parse(parsedOdpfMessage); + assertEquals("row-xyz-order$key#eureka*test", parsedRowKey); + } + + @Test + public void shouldReturnTheRowKeySameAsTemplateWhenTemplateIsValidAndContainsOnlyConstantStrings() throws IOException, InvalidTemplateException { + System.setProperty("SINK_BIGTABLE_ROW_KEY_TEMPLATE", "row-key#constant$String"); + System.setProperty("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", "io.odpf.depot.TestMessage"); + BigTableSinkConfig sinkConfig = ConfigFactory.create(BigTableSinkConfig.class, System.getProperties()); + + ProtoOdpfMessageParser odpfMessageParser = new ProtoOdpfMessageParser(sinkConfig, new StatsDReporter(new NoOpStatsDClient()), null); + OdpfMessageSchema schema = odpfMessageParser.getSchema(sinkConfig.getSinkConnectorSchemaProtoMessageClass(), descriptorsMap); + + byte[] logMessage = TestMessage.newBuilder() + .setOrderNumber("xyz-order") + .setOrderDetails("eureka") + .build() + .toByteArray(); + OdpfMessage message = new OdpfMessage(null, logMessage); + ParsedOdpfMessage parsedOdpfMessage = odpfMessageParser.parse(message, SinkConnectorSchemaMessageMode.LOG_MESSAGE, sinkConfig.getSinkConnectorSchemaProtoMessageClass()); + + BigTableRowKeyParser bigTableRowKeyParser = new BigTableRowKeyParser(new Template(sinkConfig.getRowKeyTemplate()), schema); + String parsedRowKey = bigTableRowKeyParser.parse(parsedOdpfMessage); + assertEquals("row-key#constant$String", parsedRowKey); + } + + @Test + public void shouldThrowErrorForInvalidTemplate() throws IOException { + System.setProperty("SINK_BIGTABLE_ROW_KEY_TEMPLATE", "row-key%s"); + System.setProperty("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", "io.odpf.depot.TestMessage"); + BigTableSinkConfig sinkConfig = ConfigFactory.create(BigTableSinkConfig.class, System.getProperties()); + + ProtoOdpfMessageParser odpfMessageParser = new ProtoOdpfMessageParser(sinkConfig, new StatsDReporter(new NoOpStatsDClient()), null); + OdpfMessageSchema schema = odpfMessageParser.getSchema(sinkConfig.getSinkConnectorSchemaProtoMessageClass(), descriptorsMap); + + InvalidTemplateException illegalArgumentException = Assertions.assertThrows(InvalidTemplateException.class, () -> new BigTableRowKeyParser(new Template(sinkConfig.getRowKeyTemplate()), schema)); + assertEquals("Template is not valid, variables=1, validArgs=1, values=0", illegalArgumentException.getMessage()); + } + +} diff --git a/src/test/java/io/odpf/depot/redis/parsers/TemplateTest.java b/src/test/java/io/odpf/depot/common/TemplateTest.java similarity index 81% rename from src/test/java/io/odpf/depot/redis/parsers/TemplateTest.java rename to src/test/java/io/odpf/depot/common/TemplateTest.java index 93dbcda6..7f627ad7 100644 --- a/src/test/java/io/odpf/depot/redis/parsers/TemplateTest.java +++ b/src/test/java/io/odpf/depot/common/TemplateTest.java @@ -1,4 +1,4 @@ -package io.odpf.depot.redis.parsers; +package io.odpf.depot.common; import com.google.protobuf.Descriptors; import com.google.protobuf.util.JsonFormat; @@ -8,8 +8,9 @@ import io.odpf.depot.TestKey; import io.odpf.depot.TestLocation; import io.odpf.depot.TestMessage; -import io.odpf.depot.config.RedisSinkConfig; +import io.odpf.depot.config.OdpfSinkConfig; import io.odpf.depot.config.enums.SinkConnectorSchemaDataType; +import io.odpf.depot.exception.InvalidTemplateException; import io.odpf.depot.message.OdpfMessage; import io.odpf.depot.message.OdpfMessageParserFactory; import io.odpf.depot.message.OdpfMessageSchema; @@ -43,7 +44,7 @@ public class TemplateTest { .preservingProtoFieldNames() .includingDefaultValueFields(); @Mock - private RedisSinkConfig redisSinkConfig; + private OdpfSinkConfig sinkConfig; @Mock private StatsDReporter statsDReporter; private ParsedOdpfMessage parsedTestMessage; @@ -69,65 +70,65 @@ public void setUp() throws Exception { parsedTestMessage = new ProtoOdpfParsedMessage(protoParserTest.parse((byte[]) message.getLogMessage()), configuration, jsonPrinter); Parser protoParserBooking = StencilClientFactory.getClient().getParser(TestBookingLogMessage.class.getName()); parsedBookingMessage = new ProtoOdpfParsedMessage(protoParserBooking.parse((byte[]) bookingMessage.getLogMessage()), configuration, jsonPrinter); - when(redisSinkConfig.getSinkConnectorSchemaDataType()).thenReturn(SinkConnectorSchemaDataType.PROTOBUF); - ProtoOdpfMessageParser messageParser = (ProtoOdpfMessageParser) OdpfMessageParserFactory.getParser(redisSinkConfig, statsDReporter); + when(sinkConfig.getSinkConnectorSchemaDataType()).thenReturn(SinkConnectorSchemaDataType.PROTOBUF); + ProtoOdpfMessageParser messageParser = (ProtoOdpfMessageParser) OdpfMessageParserFactory.getParser(sinkConfig, statsDReporter); schemaTest = messageParser.getSchema("io.odpf.depot.TestMessage", descriptorsMap); schemaBooking = messageParser.getSchema("io.odpf.depot.TestBookingLogMessage", descriptorsMap); } @Test - public void shouldParseStringMessageForCollectionKeyTemplate() { + public void shouldParseStringMessageForCollectionKeyTemplate() throws InvalidTemplateException { Template template = new Template("Test-%s,order_number"); assertEquals("Test-test-order", template.parse(parsedTestMessage, schemaTest)); } @Test - public void shouldParseStringMessageWithSpacesForCollectionKeyTemplate() { + public void shouldParseStringMessageWithSpacesForCollectionKeyTemplate() throws InvalidTemplateException { Template template = new Template("Test-%s, order_number"); assertEquals("Test-test-order", template.parse(parsedTestMessage, schemaTest)); } @Test - public void shouldParseFloatMessageForCollectionKeyTemplate() { - Template template = new Template("Test-%s,amount_paid_by_cash"); - assertEquals("Test-12.3", template.parse(parsedBookingMessage, schemaBooking)); + public void shouldParseFloatMessageForCollectionKeyTemplate() throws InvalidTemplateException { + Template template = new Template("Test-%.2f,amount_paid_by_cash"); + assertEquals("Test-12.30", template.parse(parsedBookingMessage, schemaBooking)); } @Test - public void shouldParseLongMessageForCollectionKeyTemplate() { + public void shouldParseLongMessageForCollectionKeyTemplate() throws InvalidTemplateException { Template template = new Template("Test-%s,customer_total_fare_without_surge"); assertEquals("Test-2000", template.parse(parsedBookingMessage, schemaBooking)); } @Test public void shouldThrowExceptionForNullCollectionKeyTemplate() { - IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> new Template(null)); - assertEquals("Template 'null' is invalid", e.getMessage()); + InvalidTemplateException e = assertThrows(InvalidTemplateException.class, () -> new Template(null)); + assertEquals("Template cannot be empty", e.getMessage()); } @Test public void shouldThrowExceptionForEmptyCollectionKeyTemplate() { - IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> new Template("")); - assertEquals("Template '' is invalid", e.getMessage()); + InvalidTemplateException e = assertThrows(InvalidTemplateException.class, () -> new Template("")); + assertEquals("Template cannot be empty", e.getMessage()); } @Test - public void shouldAcceptStringForCollectionKey() { + public void shouldAcceptStringForCollectionKey() throws InvalidTemplateException { Template template = new Template("Test"); assertEquals("Test", template.parse(parsedBookingMessage, schemaBooking)); } @Test public void shouldNotAcceptStringWithPatternForCollectionKeyWithEmptyVariables() { - IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> new Template("Test-%s%d%b,t1,t2")); + InvalidTemplateException e = assertThrows(InvalidTemplateException.class, () -> new Template("Test-%s%d%b,t1,t2")); Assert.assertEquals("Template is not valid, variables=3, validArgs=3, values=2", e.getMessage()); - e = assertThrows(IllegalArgumentException.class, () -> new Template("Test-%s%s%y,order_number,order_details")); + e = assertThrows(InvalidTemplateException.class, () -> new Template("Test-%s%s%y,order_number,order_details")); Assert.assertEquals("Template is not valid, variables=3, validArgs=2, values=2", e.getMessage()); } @Test - public void shouldAcceptStringWithPatternForCollectionKeyWithMultipleVariables() { + public void shouldAcceptStringWithPatternForCollectionKeyWithMultipleVariables() throws InvalidTemplateException { Template template = new Template("Test-%s::%s, order_number, order_details"); assertEquals("Test-test-order::ORDER-DETAILS", template.parse(parsedTestMessage, schemaTest)); } diff --git a/src/test/java/io/odpf/depot/redis/parsers/RedisEntryParserFactoryTest.java b/src/test/java/io/odpf/depot/redis/parsers/RedisEntryParserFactoryTest.java index 73e7b44a..eaebe1c0 100644 --- a/src/test/java/io/odpf/depot/redis/parsers/RedisEntryParserFactoryTest.java +++ b/src/test/java/io/odpf/depot/redis/parsers/RedisEntryParserFactoryTest.java @@ -78,7 +78,7 @@ public void shouldThrowExceptionForEmptyMappingKeyHashSet() { when(redisSinkConfig.getSinkRedisHashsetFieldToColumnMapping()).thenReturn(new JsonToPropertiesConverter().convert(null, "{\"order_details\":\"\"}")); IllegalArgumentException e = Assert.assertThrows(IllegalArgumentException.class, () -> RedisEntryParserFactory.getRedisEntryParser(redisSinkConfig, statsDReporter, schema)); - assertEquals("Template '' is invalid", e.getMessage()); + assertEquals("Template cannot be empty", e.getMessage()); } @Test @@ -104,6 +104,6 @@ public void shouldThrowExceptionForEmptyRedisTemplate() { when(redisSinkConfig.getSinkRedisKeyTemplate()).thenReturn(""); IllegalArgumentException illegalArgumentException = assertThrows(IllegalArgumentException.class, () -> RedisEntryParserFactory.getRedisEntryParser(redisSinkConfig, statsDReporter, schema)); - assertEquals("Template '' is invalid", illegalArgumentException.getMessage()); + assertEquals("Template cannot be empty", illegalArgumentException.getMessage()); } } diff --git a/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 00000000..ca6ee9ce --- /dev/null +++ b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file