Skip to content

Commit

Permalink
feat: httpv2 sink using depot (#11)
Browse files Browse the repository at this point in the history
* feat: httpv2 sink using depot

* feat: httpv2 sink using depot

* feat: httpv2 sink using depot

* feat: httpv2 sink using depot

* docs: add http sink docs

* docs: add httpv2 sink docs

* docs: add httpv2 sink docs

* docs: add httpv2 sink docs

* docs: add httpv2 sink docs

* docs: add httpv2 sink docs

* fix: deprecate HTTP SinkType

* docs: deprecate HTTP sink type in docs

* docs: deprecate HTTP sink type in docs

* docs: deprecate HTTP sink type in docs

* docs: deprecate HTTP sink type in docs

* chore: depot version bump

* chore: depot version bump

* fix: add SINK_RETRYABLE_ERROR in ERROR_TYPES_FOR_RETRY

* docs: add docs for httpv2 sink (#33)

* docs: add docs for httpv2 sink

* docs: add docs for httpv2 sink

* docs: add docs for httpv2 sink

* chore: version bump

* refactor: import httpv2 classes

* fix: resolve conflicts in import

* Update build.gradle

* feat: make httpv2 connection equal to num threads

* feat: refactor to new class

* feat: fix build

* chore: version bump of depot to 0.9.0

* Update build.gradle

* chore: version bump depot for httpv2

* fix: handle null values for kafka consumer mode and sink pool num threads

---------

Co-authored-by: gagan.dhand <[email protected]>
  • Loading branch information
sumitaich1998 and gdgagangeek authored Apr 24, 2024
1 parent 91794d2 commit e021eeb
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 16 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ lombok {
}

group 'com.gotocompany'
version '0.9.10'
version '0.10.0'

def projName = "firehose"

Expand Down Expand Up @@ -101,7 +101,7 @@ dependencies {
implementation platform('com.google.cloud:libraries-bom:20.5.0')
implementation 'com.google.cloud:google-cloud-storage:2.20.1'
implementation 'org.apache.logging.log4j:log4j-core:2.20.0'
implementation group: 'com.gotocompany', name: 'depot', version: '0.8.2'
implementation group: 'com.gotocompany', name: 'depot', version: '0.9.0'
implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'

testImplementation group: 'junit', name: 'junit', version: '4.11'
Expand Down
9 changes: 9 additions & 0 deletions docs/docs/guides/create_firehose.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ event_timestamp {

Firehose [HTTP](https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol) sink allows users to read data from Kafka and write to an HTTP endpoint. it requires the following [variables](../sinks/http-sink.md#http-sink) to be set. You need to create your own HTTP endpoint so that the Firehose can send data to it.

Note: HTTP sink type is deprecated from Firehose version 0.8.11 onwards. Please consider using HTTPV2 sink type instead.

### Supported methods

Firehose supports `PUT` and `POST` verbs in its HTTP sink. The method can be configured using [`SINK_HTTP_REQUEST_METHOD`](../sinks/http-sink.md#sink_http_request_method).
Expand Down Expand Up @@ -140,4 +142,11 @@ _**Note:**_ [_**DATABASE**_](../sinks/influxdb-sink.md#sink_influx_db_name) _**a

- it requires the following environment [variables](https://github.com/goto/depot/blob/main/docs/reference/configuration/bigtable.md) ,which are required by Depot library, to be set along with the generic firehose variables.

## Create an HTTPV2 Sink

- HttpV2 Sink is implemented in Firehose using the Http sink connector implementation in Depot library. For details on all the features supported by HttpV2 Sink, please refer the Depot documentation [here](https://github.com/goto/depot/blob/main/docs/sinks/http.md).
- it requires the following environment [variables](https://github.com/goto/depot/blob/main/docs/reference/configuration/http.md) ,which are required by Depot library, to be set along with the generic firehose variables.

If you'd like to connect to a sink which is not yet supported, you can create a new sink by following the [contribution guidelines](../contribute/contribution.md)


3 changes: 2 additions & 1 deletion docs/docs/introduction.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ Discover why users choose Firehose as their main Kafka Consumer
Following sinks are supported in the Firehose

- [Log](https://en.wikipedia.org/wiki/Log_file) - Standard Output
- [HTTP](https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol) - HTTP services
- [HTTP](https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol) - HTTP services (deprecated in v0.10.0+)
- [HTTP V2](https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol) - HTTP services
- [JDBC](https://en.wikipedia.org/wiki/Java_Database_Connectivity) - Postgres DB
- [InfluxDB](https://en.wikipedia.org/wiki/InfluxDB) - A time-series database
- [Redis](https://en.wikipedia.org/wiki/Redis) - An in-memory Key value store
Expand Down
2 changes: 2 additions & 0 deletions docs/docs/sinks/http-sink.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# HTTP

Note: HTTP sink type is deprecated from Firehose version 0.10.0 onwards. Please consider using HTTPV2 sink type instead.

REST API stands for Representational State Transfer and is an architectural pattern for creating web services.

An Http sink Firehose \(`SINK_TYPE`=`http`\) requires the following variables to be set along with Generic ones.
Expand Down
7 changes: 7 additions & 0 deletions docs/docs/sinks/httpv2-sink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# HttpV2 Sink

HttpV2 Sink is implemented in Firehose using the Http sink connector implementation in Depot library. For details on all the features supported by HttpV2 Sink, please refer the Depot documentation [here](https://github.com/goto/depot/blob/main/docs/sinks/http-sink.md).

### Configuration

For HttpV2 sink in Firehose we need to set first (`SINK_TYPE`=`httpv2`). There are some generic configs which are common across different sink types which need to be set which are mentioned in [generic.md](../advance/generic.md). Http sink specific configs are mentioned in Depot repository. You can check out the Http Sink configs [here](https://github.com/goto/depot/blob/main/docs/reference/configuration/http-sink.md)
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public interface ErrorConfig extends Config, Mutable {
@ConverterClass(SetErrorTypeConverter.class)
@Key("ERROR_TYPES_FOR_RETRY")
@Separator(",")
@DefaultValue("DEFAULT_ERROR")
@DefaultValue("DEFAULT_ERROR,SINK_RETRYABLE_ERROR")
Set<ErrorType> getErrorTypesForRetry();

@ConverterClass(SetErrorTypeConverter.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

public enum SinkType {
JDBC,
/**
* @deprecated
* HTTP is deprecated from Firehose v0.8.11 onwards. Please consider using HTTPV2 instead.
*/
@Deprecated
HTTP,
HTTPV2,
LOG,
CLEVERTAP,
INFLUXDB,
Expand Down
37 changes: 25 additions & 12 deletions src/main/java/com/gotocompany/firehose/sink/SinkFactory.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
package com.gotocompany.firehose.sink;

import com.gotocompany.depot.bigquery.BigQuerySink;
import com.gotocompany.depot.bigquery.BigQuerySinkFactory;
import com.gotocompany.depot.bigtable.BigTableSink;
import com.gotocompany.depot.bigtable.BigTableSinkFactory;
import com.gotocompany.depot.config.BigQuerySinkConfig;
import com.gotocompany.depot.config.BigTableSinkConfig;
import com.gotocompany.depot.config.HttpSinkConfig;
import com.gotocompany.depot.config.RedisSinkConfig;
import com.gotocompany.depot.http.HttpSink;
import com.gotocompany.depot.log.LogSink;
import com.gotocompany.depot.log.LogSinkFactory;
import com.gotocompany.depot.metrics.StatsDReporter;
import com.gotocompany.depot.redis.RedisSink;
import com.gotocompany.depot.redis.RedisSinkFactory;
import com.gotocompany.firehose.config.KafkaConsumerConfig;
import com.gotocompany.firehose.config.enums.SinkType;
import com.gotocompany.firehose.consumer.kafka.OffsetManager;
Expand All @@ -10,22 +24,11 @@
import com.gotocompany.firehose.sink.elasticsearch.EsSinkFactory;
import com.gotocompany.firehose.sink.grpc.GrpcSinkFactory;
import com.gotocompany.firehose.sink.http.HttpSinkFactory;
import com.gotocompany.firehose.sink.httpv2.HttpV2SinkUtils;
import com.gotocompany.firehose.sink.influxdb.InfluxSinkFactory;
import com.gotocompany.firehose.sink.jdbc.JdbcSinkFactory;
import com.gotocompany.firehose.sink.mongodb.MongoSinkFactory;
import com.gotocompany.firehose.sink.prometheus.PromSinkFactory;
import com.gotocompany.depot.bigquery.BigQuerySink;
import com.gotocompany.depot.bigquery.BigQuerySinkFactory;
import com.gotocompany.depot.config.BigQuerySinkConfig;
import com.gotocompany.depot.config.RedisSinkConfig;
import com.gotocompany.depot.bigtable.BigTableSinkFactory;
import com.gotocompany.depot.bigtable.BigTableSink;
import com.gotocompany.depot.config.BigTableSinkConfig;
import com.gotocompany.depot.log.LogSink;
import com.gotocompany.depot.log.LogSinkFactory;
import com.gotocompany.depot.metrics.StatsDReporter;
import com.gotocompany.depot.redis.RedisSink;
import com.gotocompany.depot.redis.RedisSinkFactory;
import com.gotocompany.stencil.client.StencilClient;
import org.aeonbits.owner.ConfigFactory;

Expand All @@ -42,6 +45,7 @@ public class SinkFactory {
private BigTableSinkFactory bigTableSinkFactory;
private LogSinkFactory logSinkFactory;
private RedisSinkFactory redisSinkFactory;
private com.gotocompany.depot.http.HttpSinkFactory httpv2SinkFactory;

public SinkFactory(KafkaConsumerConfig kafkaConsumerConfig,
StatsDReporter statsDReporter,
Expand Down Expand Up @@ -93,6 +97,13 @@ public void init() {
statsDReporter);
bigTableSinkFactory.init();
return;
case HTTPV2:
HttpV2SinkUtils.addAdditionalConfigsForHttpV2Sink(config);
httpv2SinkFactory = new com.gotocompany.depot.http.HttpSinkFactory(
ConfigFactory.create(HttpSinkConfig.class, config),
statsDReporter);
httpv2SinkFactory.init();
return;
default:
throw new ConfigurationException("Invalid Firehose SINK_TYPE");
}
Expand Down Expand Up @@ -126,6 +137,8 @@ public Sink getSink() {
return new GenericSink(new FirehoseInstrumentation(statsDReporter, BigTableSink.class), sinkType.name(), bigTableSinkFactory.create());
case MONGODB:
return MongoSinkFactory.create(config, statsDReporter, stencilClient);
case HTTPV2:
return new GenericSink(new FirehoseInstrumentation(statsDReporter, HttpSink.class), sinkType.name(), httpv2SinkFactory.create());
default:
throw new ConfigurationException("Invalid Firehose SINK_TYPE");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.gotocompany.firehose.sink.httpv2;

import com.gotocompany.firehose.config.enums.KafkaConsumerMode;

import java.util.Map;

public class HttpV2SinkUtils {

public static void addAdditionalConfigsForHttpV2Sink(Map<String, String> env) {

switch (KafkaConsumerMode.valueOf(env.getOrDefault("SOURCE_KAFKA_CONSUMER_MODE", "SYNC"))) {
case SYNC:
env.put("SINK_HTTPV2_MAX_CONNECTIONS", "1");
break;
case ASYNC:
env.put("SINK_HTTPV2_MAX_CONNECTIONS", env.getOrDefault("SINK_POOL_NUM_THREADS", "1"));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.gotocompany.firehose.sink.httpv2;

import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

public class HttpV2SinkUtilsTest {
@Test
public void shouldAddAdditionalConfigsForSyncConsumer() {
Map<String, String> config = new HashMap<String, String>() {{
put("SOURCE_KAFKA_CONSUMER_MODE", "sync");
put("SINK_HTTPV2_MAX_CONNECTIONS", "5");
}};
HttpV2SinkUtils.addAdditionalConfigsForHttpV2Sink(config);
Assert.assertEquals(config.get("SINK_HTTPV2_MAX_CONNECTIONS"), "1");
}

@Test
public void shouldAddAdditionalConfigsForASyncConsumer() {
Map<String, String> config = new HashMap<String, String>() {{
put("SOURCE_KAFKA_CONSUMER_MODE", "async");
put("SINK_POOL_NUM_THREADS", "10");
put("SINK_HTTPV2_MAX_CONNECTIONS", "5");
}};
HttpV2SinkUtils.addAdditionalConfigsForHttpV2Sink(config);
Assert.assertEquals(config.get("SINK_HTTPV2_MAX_CONNECTIONS"), "10");
}
}

0 comments on commit e021eeb

Please sign in to comment.