From e021eebf630bd991c55c41dfc6950277ca4bea84 Mon Sep 17 00:00:00 2001 From: Sumit Aich Date: Thu, 25 Apr 2024 03:12:04 +0530 Subject: [PATCH] feat: httpv2 sink using depot (#11) * feat: httpv2 sink using depot * feat: httpv2 sink using depot * feat: httpv2 sink using depot * feat: httpv2 sink using depot * docs: add http sink docs * docs: add httpv2 sink docs * docs: add httpv2 sink docs * docs: add httpv2 sink docs * docs: add httpv2 sink docs * docs: add httpv2 sink docs * fix: deprecate HTTP SinkType * docs: deprecate HTTP sink type in docs * docs: deprecate HTTP sink type in docs * docs: deprecate HTTP sink type in docs * docs: deprecate HTTP sink type in docs * chore: depot version bump * chore: depot version bump * fix: add SINK_RETRYABLE_ERROR in ERROR_TYPES_FOR_RETRY * docs: add docs for httpv2 sink (#33) * docs: add docs for httpv2 sink * docs: add docs for httpv2 sink * docs: add docs for httpv2 sink * chore: version bump * refactor: import httpv2 classes * fix: resolve conflicts in import * Update build.gradle * feat: make httpv2 connection equal to num threads * feat: refactor to new class * feat: fix build * chore: version bump of depot to 0.9.0 * Update build.gradle * chore: version bump depot for httpv2 * fix: handle null values for kafka consumer mode and sink pool num threads --------- Co-authored-by: gagan.dhand --- build.gradle | 4 +- docs/docs/guides/create_firehose.md | 9 +++++ docs/docs/introduction.md | 3 +- docs/docs/sinks/http-sink.md | 2 + docs/docs/sinks/httpv2-sink.md | 7 ++++ .../firehose/config/ErrorConfig.java | 2 +- .../firehose/config/enums/SinkType.java | 6 +++ .../firehose/sink/SinkFactory.java | 37 +++++++++++++------ .../firehose/sink/httpv2/HttpV2SinkUtils.java | 19 ++++++++++ .../sink/httpv2/HttpV2SinkUtilsTest.java | 30 +++++++++++++++ 10 files changed, 103 insertions(+), 16 deletions(-) create mode 100644 docs/docs/sinks/httpv2-sink.md create mode 100644 src/main/java/com/gotocompany/firehose/sink/httpv2/HttpV2SinkUtils.java create mode 100644 src/test/java/com/gotocompany/firehose/sink/httpv2/HttpV2SinkUtilsTest.java diff --git a/build.gradle b/build.gradle index 05c128424..dba4ca424 100644 --- a/build.gradle +++ b/build.gradle @@ -33,7 +33,7 @@ lombok { } group 'com.gotocompany' -version '0.9.10' +version '0.10.0' def projName = "firehose" @@ -101,7 +101,7 @@ dependencies { implementation platform('com.google.cloud:libraries-bom:20.5.0') implementation 'com.google.cloud:google-cloud-storage:2.20.1' implementation 'org.apache.logging.log4j:log4j-core:2.20.0' - implementation group: 'com.gotocompany', name: 'depot', version: '0.8.2' + implementation group: 'com.gotocompany', name: 'depot', version: '0.9.0' implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j' testImplementation group: 'junit', name: 'junit', version: '4.11' diff --git a/docs/docs/guides/create_firehose.md b/docs/docs/guides/create_firehose.md index 3835223f9..a4fae4d4c 100644 --- a/docs/docs/guides/create_firehose.md +++ b/docs/docs/guides/create_firehose.md @@ -42,6 +42,8 @@ event_timestamp { Firehose [HTTP](https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol) sink allows users to read data from Kafka and write to an HTTP endpoint. it requires the following [variables](../sinks/http-sink.md#http-sink) to be set. You need to create your own HTTP endpoint so that the Firehose can send data to it. +Note: HTTP sink type is deprecated from Firehose version 0.8.11 onwards. Please consider using HTTPV2 sink type instead. + ### Supported methods Firehose supports `PUT` and `POST` verbs in its HTTP sink. The method can be configured using [`SINK_HTTP_REQUEST_METHOD`](../sinks/http-sink.md#sink_http_request_method). @@ -140,4 +142,11 @@ _**Note:**_ [_**DATABASE**_](../sinks/influxdb-sink.md#sink_influx_db_name) _**a - it requires the following environment [variables](https://github.com/goto/depot/blob/main/docs/reference/configuration/bigtable.md) ,which are required by Depot library, to be set along with the generic firehose variables. +## Create an HTTPV2 Sink + +- HttpV2 Sink is implemented in Firehose using the Http sink connector implementation in Depot library. For details on all the features supported by HttpV2 Sink, please refer the Depot documentation [here](https://github.com/goto/depot/blob/main/docs/sinks/http.md). +- it requires the following environment [variables](https://github.com/goto/depot/blob/main/docs/reference/configuration/http.md) ,which are required by Depot library, to be set along with the generic firehose variables. + If you'd like to connect to a sink which is not yet supported, you can create a new sink by following the [contribution guidelines](../contribute/contribution.md) + + diff --git a/docs/docs/introduction.md b/docs/docs/introduction.md index 2ad38eb1d..88a5a3e53 100644 --- a/docs/docs/introduction.md +++ b/docs/docs/introduction.md @@ -29,7 +29,8 @@ Discover why users choose Firehose as their main Kafka Consumer Following sinks are supported in the Firehose - [Log](https://en.wikipedia.org/wiki/Log_file) - Standard Output -- [HTTP](https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol) - HTTP services +- [HTTP](https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol) - HTTP services (deprecated in v0.10.0+) +- [HTTP V2](https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol) - HTTP services - [JDBC](https://en.wikipedia.org/wiki/Java_Database_Connectivity) - Postgres DB - [InfluxDB](https://en.wikipedia.org/wiki/InfluxDB) - A time-series database - [Redis](https://en.wikipedia.org/wiki/Redis) - An in-memory Key value store diff --git a/docs/docs/sinks/http-sink.md b/docs/docs/sinks/http-sink.md index d1cb66725..845254b57 100644 --- a/docs/docs/sinks/http-sink.md +++ b/docs/docs/sinks/http-sink.md @@ -1,5 +1,7 @@ # HTTP +Note: HTTP sink type is deprecated from Firehose version 0.10.0 onwards. Please consider using HTTPV2 sink type instead. + REST API stands for Representational State Transfer and is an architectural pattern for creating web services. An Http sink Firehose \(`SINK_TYPE`=`http`\) requires the following variables to be set along with Generic ones. diff --git a/docs/docs/sinks/httpv2-sink.md b/docs/docs/sinks/httpv2-sink.md new file mode 100644 index 000000000..18d2569c2 --- /dev/null +++ b/docs/docs/sinks/httpv2-sink.md @@ -0,0 +1,7 @@ +# HttpV2 Sink + +HttpV2 Sink is implemented in Firehose using the Http sink connector implementation in Depot library. For details on all the features supported by HttpV2 Sink, please refer the Depot documentation [here](https://github.com/goto/depot/blob/main/docs/sinks/http-sink.md). + +### Configuration + +For HttpV2 sink in Firehose we need to set first (`SINK_TYPE`=`httpv2`). There are some generic configs which are common across different sink types which need to be set which are mentioned in [generic.md](../advance/generic.md). Http sink specific configs are mentioned in Depot repository. You can check out the Http Sink configs [here](https://github.com/goto/depot/blob/main/docs/reference/configuration/http-sink.md) diff --git a/src/main/java/com/gotocompany/firehose/config/ErrorConfig.java b/src/main/java/com/gotocompany/firehose/config/ErrorConfig.java index a1db59b09..032c6ea09 100644 --- a/src/main/java/com/gotocompany/firehose/config/ErrorConfig.java +++ b/src/main/java/com/gotocompany/firehose/config/ErrorConfig.java @@ -18,7 +18,7 @@ public interface ErrorConfig extends Config, Mutable { @ConverterClass(SetErrorTypeConverter.class) @Key("ERROR_TYPES_FOR_RETRY") @Separator(",") - @DefaultValue("DEFAULT_ERROR") + @DefaultValue("DEFAULT_ERROR,SINK_RETRYABLE_ERROR") Set getErrorTypesForRetry(); @ConverterClass(SetErrorTypeConverter.class) diff --git a/src/main/java/com/gotocompany/firehose/config/enums/SinkType.java b/src/main/java/com/gotocompany/firehose/config/enums/SinkType.java index 771b5366d..bbc93e095 100644 --- a/src/main/java/com/gotocompany/firehose/config/enums/SinkType.java +++ b/src/main/java/com/gotocompany/firehose/config/enums/SinkType.java @@ -2,7 +2,13 @@ public enum SinkType { JDBC, + /** + * @deprecated + * HTTP is deprecated from Firehose v0.8.11 onwards. Please consider using HTTPV2 instead. + */ + @Deprecated HTTP, + HTTPV2, LOG, CLEVERTAP, INFLUXDB, diff --git a/src/main/java/com/gotocompany/firehose/sink/SinkFactory.java b/src/main/java/com/gotocompany/firehose/sink/SinkFactory.java index 160cb5f05..9305e57a1 100644 --- a/src/main/java/com/gotocompany/firehose/sink/SinkFactory.java +++ b/src/main/java/com/gotocompany/firehose/sink/SinkFactory.java @@ -1,5 +1,19 @@ package com.gotocompany.firehose.sink; +import com.gotocompany.depot.bigquery.BigQuerySink; +import com.gotocompany.depot.bigquery.BigQuerySinkFactory; +import com.gotocompany.depot.bigtable.BigTableSink; +import com.gotocompany.depot.bigtable.BigTableSinkFactory; +import com.gotocompany.depot.config.BigQuerySinkConfig; +import com.gotocompany.depot.config.BigTableSinkConfig; +import com.gotocompany.depot.config.HttpSinkConfig; +import com.gotocompany.depot.config.RedisSinkConfig; +import com.gotocompany.depot.http.HttpSink; +import com.gotocompany.depot.log.LogSink; +import com.gotocompany.depot.log.LogSinkFactory; +import com.gotocompany.depot.metrics.StatsDReporter; +import com.gotocompany.depot.redis.RedisSink; +import com.gotocompany.depot.redis.RedisSinkFactory; import com.gotocompany.firehose.config.KafkaConsumerConfig; import com.gotocompany.firehose.config.enums.SinkType; import com.gotocompany.firehose.consumer.kafka.OffsetManager; @@ -10,22 +24,11 @@ import com.gotocompany.firehose.sink.elasticsearch.EsSinkFactory; import com.gotocompany.firehose.sink.grpc.GrpcSinkFactory; import com.gotocompany.firehose.sink.http.HttpSinkFactory; +import com.gotocompany.firehose.sink.httpv2.HttpV2SinkUtils; import com.gotocompany.firehose.sink.influxdb.InfluxSinkFactory; import com.gotocompany.firehose.sink.jdbc.JdbcSinkFactory; import com.gotocompany.firehose.sink.mongodb.MongoSinkFactory; import com.gotocompany.firehose.sink.prometheus.PromSinkFactory; -import com.gotocompany.depot.bigquery.BigQuerySink; -import com.gotocompany.depot.bigquery.BigQuerySinkFactory; -import com.gotocompany.depot.config.BigQuerySinkConfig; -import com.gotocompany.depot.config.RedisSinkConfig; -import com.gotocompany.depot.bigtable.BigTableSinkFactory; -import com.gotocompany.depot.bigtable.BigTableSink; -import com.gotocompany.depot.config.BigTableSinkConfig; -import com.gotocompany.depot.log.LogSink; -import com.gotocompany.depot.log.LogSinkFactory; -import com.gotocompany.depot.metrics.StatsDReporter; -import com.gotocompany.depot.redis.RedisSink; -import com.gotocompany.depot.redis.RedisSinkFactory; import com.gotocompany.stencil.client.StencilClient; import org.aeonbits.owner.ConfigFactory; @@ -42,6 +45,7 @@ public class SinkFactory { private BigTableSinkFactory bigTableSinkFactory; private LogSinkFactory logSinkFactory; private RedisSinkFactory redisSinkFactory; + private com.gotocompany.depot.http.HttpSinkFactory httpv2SinkFactory; public SinkFactory(KafkaConsumerConfig kafkaConsumerConfig, StatsDReporter statsDReporter, @@ -93,6 +97,13 @@ public void init() { statsDReporter); bigTableSinkFactory.init(); return; + case HTTPV2: + HttpV2SinkUtils.addAdditionalConfigsForHttpV2Sink(config); + httpv2SinkFactory = new com.gotocompany.depot.http.HttpSinkFactory( + ConfigFactory.create(HttpSinkConfig.class, config), + statsDReporter); + httpv2SinkFactory.init(); + return; default: throw new ConfigurationException("Invalid Firehose SINK_TYPE"); } @@ -126,6 +137,8 @@ public Sink getSink() { return new GenericSink(new FirehoseInstrumentation(statsDReporter, BigTableSink.class), sinkType.name(), bigTableSinkFactory.create()); case MONGODB: return MongoSinkFactory.create(config, statsDReporter, stencilClient); + case HTTPV2: + return new GenericSink(new FirehoseInstrumentation(statsDReporter, HttpSink.class), sinkType.name(), httpv2SinkFactory.create()); default: throw new ConfigurationException("Invalid Firehose SINK_TYPE"); } diff --git a/src/main/java/com/gotocompany/firehose/sink/httpv2/HttpV2SinkUtils.java b/src/main/java/com/gotocompany/firehose/sink/httpv2/HttpV2SinkUtils.java new file mode 100644 index 000000000..97209a655 --- /dev/null +++ b/src/main/java/com/gotocompany/firehose/sink/httpv2/HttpV2SinkUtils.java @@ -0,0 +1,19 @@ +package com.gotocompany.firehose.sink.httpv2; + +import com.gotocompany.firehose.config.enums.KafkaConsumerMode; + +import java.util.Map; + +public class HttpV2SinkUtils { + + public static void addAdditionalConfigsForHttpV2Sink(Map env) { + + switch (KafkaConsumerMode.valueOf(env.getOrDefault("SOURCE_KAFKA_CONSUMER_MODE", "SYNC"))) { + case SYNC: + env.put("SINK_HTTPV2_MAX_CONNECTIONS", "1"); + break; + case ASYNC: + env.put("SINK_HTTPV2_MAX_CONNECTIONS", env.getOrDefault("SINK_POOL_NUM_THREADS", "1")); + } + } +} diff --git a/src/test/java/com/gotocompany/firehose/sink/httpv2/HttpV2SinkUtilsTest.java b/src/test/java/com/gotocompany/firehose/sink/httpv2/HttpV2SinkUtilsTest.java new file mode 100644 index 000000000..774cd6af6 --- /dev/null +++ b/src/test/java/com/gotocompany/firehose/sink/httpv2/HttpV2SinkUtilsTest.java @@ -0,0 +1,30 @@ +package com.gotocompany.firehose.sink.httpv2; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class HttpV2SinkUtilsTest { + @Test + public void shouldAddAdditionalConfigsForSyncConsumer() { + Map config = new HashMap() {{ + put("SOURCE_KAFKA_CONSUMER_MODE", "sync"); + put("SINK_HTTPV2_MAX_CONNECTIONS", "5"); + }}; + HttpV2SinkUtils.addAdditionalConfigsForHttpV2Sink(config); + Assert.assertEquals(config.get("SINK_HTTPV2_MAX_CONNECTIONS"), "1"); + } + + @Test + public void shouldAddAdditionalConfigsForASyncConsumer() { + Map config = new HashMap() {{ + put("SOURCE_KAFKA_CONSUMER_MODE", "async"); + put("SINK_POOL_NUM_THREADS", "10"); + put("SINK_HTTPV2_MAX_CONNECTIONS", "5"); + }}; + HttpV2SinkUtils.addAdditionalConfigsForHttpV2Sink(config); + Assert.assertEquals(config.get("SINK_HTTPV2_MAX_CONNECTIONS"), "10"); + } +}