Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: support keepalive and deadline configuration for grpc sink #19

Merged
merged 6 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ lombok {
}

group 'com.gotocompany'
version '0.9.6'
version '0.9.7'

def projName = "firehose"

Expand Down
25 changes: 25 additions & 0 deletions docs/docs/sinks/grpc-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,28 @@ Defines the Proto which would be the response of the GRPC Method.

- Example value: `com.tests.SampleGrpcResponse`
- Type: `required`

### `SINK_GRPC_ARG_KEEPALIVE_TIME_MS`

The keepalive ping is a way to check if a channel is currently working by sending HTTP2 pings over the transport. It is sent periodically, and if the ping is not acknowledged by the peer within a certain timeout period, the transport is disconnected. Other keepalive configurations are described [here](https://github.com/grpc/grpc/blob/master/doc/keepalive.md).

Defines the period (in milliseconds) after which a keepalive ping is sent on the transport. If smaller than 10000, 10000 will be used instead.

- Example value: `60000`
- Type: `optional`
- Default value: `infinite`

### `SINK_GRPC_ARG_KEEPALIVE_TIMEOUT_MS`

Defines the amount of time (in milliseconds) the sender of the keepalive ping waits for an acknowledgement. If it does not receive an acknowledgment within this time, it will close the connection.

- Example value: `5000`
- Type: `optional`
- Default value: `20000`

### `SINK_GRPC_ARG_DEADLINE_MS`

Defines the amount of time (in milliseconds) gRPC clients are willing to wait for an RPC to complete before the RPC is terminated with the error [DEADLINE_EXCEEDED](https://grpc.io/docs/guides/deadlines/#:~:text=By%20default%2C%20gRPC%20does%20not,realistic%20deadline%20in%20your%20clients.)

- Example value: `1000`
- Type: `optional`
10 changes: 10 additions & 0 deletions src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,14 @@ public interface GrpcSinkConfig extends AppConfig {
@Config.Key("SINK_GRPC_RESPONSE_SCHEMA_PROTO_CLASS")
String getSinkGrpcResponseSchemaProtoClass();

@Config.Key("SINK_GRPC_ARG_KEEPALIVE_TIME_MS")
@Config.DefaultValue("9223372036854775807")
Long getSinkGrpcArgKeepaliveTimeMS();

@Config.Key("SINK_GRPC_ARG_KEEPALIVE_TIMEOUT_MS")
@DefaultValue("20000")
Long getSinkGrpcArgKeepaliveTimeoutMS();

@Config.Key("SINK_GRPC_ARG_DEADLINE_MS")
Long getSinkGrpcArgDeadlineMS();
}
2 changes: 2 additions & 0 deletions src/main/java/com/gotocompany/firehose/metrics/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class Metrics {
//SINK PREFIXES
public static final String SINK_PREFIX = "sink_";
public static final String HTTP_SINK_PREFIX = "http_";
public static final String GRPC_SINK_PREFIX = "grpc_";
public static final String BLOB_SINK_PREFIX = "blob_";

public static final String MONGO_SINK_PREFIX = "mongo_";
Expand Down Expand Up @@ -43,6 +44,7 @@ public class Metrics {
public static final String SINK_MESSAGES_DROP_TOTAL = APPLICATION_PREFIX + SINK_PREFIX + "messages_drop_total";
public static final String SINK_HTTP_RESPONSE_CODE_TOTAL = APPLICATION_PREFIX + SINK_PREFIX + HTTP_SINK_PREFIX + "response_code_total";
public static final String SINK_PUSH_BATCH_SIZE_TOTAL = APPLICATION_PREFIX + SINK_PREFIX + "push_batch_size_total";
public static final String SINK_GRPC_ERROR_TOTAL = APPLICATION_PREFIX + GRPC_SINK_PREFIX + "error_total";

// MONGO SINK MEASUREMENTS
public static final String SINK_MONGO_INSERTED_TOTAL = APPLICATION_PREFIX + SINK_PREFIX + MONGO_SINK_PREFIX + "inserted_total";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.aeonbits.owner.ConfigFactory;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Factory class to create the GrpcSink.
Expand All @@ -29,7 +30,10 @@ public static AbstractSink create(Map<String, String> configuration, StatsDRepor
grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort(), grpcConfig.getSinkGrpcMethodUrl(), grpcConfig.getSinkGrpcResponseSchemaProtoClass());
firehoseInstrumentation.logDebug(grpcSinkConfig);

ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort()).usePlaintext().build();
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort())
.keepAliveTime(grpcConfig.getSinkGrpcArgKeepaliveTimeMS(), TimeUnit.MILLISECONDS)
.keepAliveTimeout(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS(), TimeUnit.MILLISECONDS)
.usePlaintext().build();

GrpcClient grpcClient = new GrpcClient(new FirehoseInstrumentation(statsDReporter, GrpcClient.class), grpcConfig, managedChannel, stencilClient);
firehoseInstrumentation.logInfo("GRPC connection established");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package com.gotocompany.firehose.sink.grpc.client;



import com.gotocompany.firehose.config.GrpcSinkConfig;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
import com.google.protobuf.DynamicMessage;

import io.grpc.ManagedChannel;
import com.gotocompany.firehose.metrics.Metrics;

import io.grpc.CallOptions;
import io.grpc.Metadata;
import io.grpc.Channel;
import io.grpc.StatusRuntimeException;
import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.MetadataUtils;
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import io.grpc.CallOptions;
import com.gotocompany.stencil.client.StencilClient;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.common.header.Header;
Expand All @@ -22,6 +24,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;


/**
Expand All @@ -33,47 +36,55 @@ public class GrpcClient {
private final GrpcSinkConfig grpcSinkConfig;
private StencilClient stencilClient;
private ManagedChannel managedChannel;
private final MethodDescriptor<byte[], byte[]> methodDescriptor;
private final DynamicMessage emptyResponse;

public GrpcClient(FirehoseInstrumentation firehoseInstrumentation, GrpcSinkConfig grpcSinkConfig, ManagedChannel managedChannel, StencilClient stencilClient) {
this.firehoseInstrumentation = firehoseInstrumentation;
this.grpcSinkConfig = grpcSinkConfig;
this.stencilClient = stencilClient;
this.managedChannel = managedChannel;
MethodDescriptor.Marshaller<byte[]> marshaller = getMarshaller();
this.methodDescriptor = MethodDescriptor.newBuilder(marshaller, marshaller)
mayankrai09 marked this conversation as resolved.
Show resolved Hide resolved
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(grpcSinkConfig.getSinkGrpcMethodUrl())
.build();
this.emptyResponse = DynamicMessage.newBuilder(this.stencilClient.get(this.grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass())).build();
}

public DynamicMessage execute(byte[] logMessage, Headers headers) {

MethodDescriptor.Marshaller<byte[]> marshaller = getMarshaller();
DynamicMessage dynamicMessage;

try {


Metadata metadata = new Metadata();
for (Header header : headers) {
metadata.put(Metadata.Key.of(header.key(), Metadata.ASCII_STRING_MARSHALLER), new String(header.value()));
}

Channel decoratedChannel = ClientInterceptors.intercept(managedChannel,
MetadataUtils.newAttachHeadersInterceptor(metadata));
byte[] response = ClientCalls.blockingUnaryCall(
decoratedChannel,
MethodDescriptor.newBuilder(marshaller, marshaller)
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(grpcSinkConfig.getSinkGrpcMethodUrl())
.build(),
CallOptions.DEFAULT,
methodDescriptor,
decoratedDefaultCallOptions(),
logMessage);

dynamicMessage = stencilClient.parse(grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass(), response);
return stencilClient.parse(grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass(), response);

} catch (StatusRuntimeException sre) {
firehoseInstrumentation.logWarn(sre.getMessage());
firehoseInstrumentation.incrementCounter(Metrics.SINK_GRPC_ERROR_TOTAL, "status=" + sre.getStatus().getCode());
} catch (Exception e) {
firehoseInstrumentation.logWarn(e.getMessage());
dynamicMessage = DynamicMessage.newBuilder(this.stencilClient.get(this.grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass())).build();

firehoseInstrumentation.incrementCounter(Metrics.SINK_GRPC_ERROR_TOTAL, "status=UNIDENTIFIED");
}
return emptyResponse;
}

return dynamicMessage;
protected CallOptions decoratedDefaultCallOptions() {
CallOptions defaultCallOption = CallOptions.DEFAULT;
if (grpcSinkConfig.getSinkGrpcArgDeadlineMS() != null && grpcSinkConfig.getSinkGrpcArgDeadlineMS() > 0) {
return defaultCallOption.withDeadlineAfter(grpcSinkConfig.getSinkGrpcArgDeadlineMS(), TimeUnit.MILLISECONDS);
}
return defaultCallOption;
}

private MethodDescriptor.Marshaller<byte[]> getMarshaller() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package com.gotocompany.firehose.sink.grpc;

import com.gotocompany.firehose.config.GrpcSinkConfig;
import com.gotocompany.firehose.consumer.TestGrpcResponse;
import com.gotocompany.firehose.exception.DeserializerException;
import com.gotocompany.firehose.sink.Sink;
import com.gotocompany.depot.metrics.StatsDReporter;
import com.gotocompany.firehose.consumer.TestServerGrpc;
import com.gotocompany.stencil.StencilClientFactory;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import com.gotocompany.stencil.client.StencilClient;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -17,6 +22,7 @@
import java.util.Map;

import static org.mockito.Mockito.when;

import static org.mockito.MockitoAnnotations.initMocks;

public class GrpcSinkFactoryTest {
Expand All @@ -27,22 +33,35 @@ public class GrpcSinkFactoryTest {
@Mock
private TestServerGrpc.TestServerImplBase testGrpcService;

@Mock
private StencilClient stencilClient;

// private static ConsulClient consulClient;
@Mock
private GrpcSinkConfig grpcConfig;

@Mock
private ManagedChannelBuilder channelBuilder;

private Server server;

@Before
public void setUp() {
initMocks(this);
stencilClient = StencilClientFactory.getClient();
}

@After
public void tearDown() {
if (server != null) {
server.shutdown();
}
}


@Test
public void shouldCreateChannelPoolWithHostAndPort() throws IOException, DeserializerException {
when(testGrpcService.bindService()).thenCallRealMethod();

Server server = ServerBuilder
when(testGrpcService.bindService()).thenCallRealMethod();
server = ServerBuilder
.forPort(5000)
.addService(testGrpcService.bindService())
.build()
Expand All @@ -52,11 +71,10 @@ public void shouldCreateChannelPoolWithHostAndPort() throws IOException, Deseria
config.put("SINK_GRPC_METHOD_URL", "com.gotocompany.firehose.consumer.TestServer/TestRpcMethod");
config.put("SINK_GRPC_SERVICE_HOST", "localhost");
config.put("SINK_GRPC_SERVICE_PORT", "5000");

config.put("SINK_GRPC_RESPONSE_SCHEMA_PROTO_CLASS", TestGrpcResponse.class.getName());

Sink sink = GrpcSinkFactory.create(config, statsDReporter, stencilClient);

Assert.assertNotNull(sink);
server.shutdownNow();
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.gotocompany.firehose.sink.grpc;
package com.gotocompany.firehose.sink.grpc.client;


import com.gotocompany.firehose.config.GrpcSinkConfig;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
import com.gotocompany.firehose.sink.grpc.client.GrpcClient;
import com.gotocompany.firehose.consumer.Error;
import com.gotocompany.firehose.consumer.TestGrpcRequest;
import com.gotocompany.firehose.consumer.TestGrpcResponse;
Expand All @@ -19,6 +19,7 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.stubbing.Stubber;

Expand All @@ -29,9 +30,7 @@
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

public class GrpcClientTest {
Expand All @@ -43,9 +42,12 @@ public class GrpcClientTest {
private static final List<String> HEADER_KEYS = Arrays.asList("test-header-key-1", "test-header-key-2");
private HeaderTestInterceptor headerTestInterceptor;

@Mock
private FirehoseInstrumentation firehoseInstrumentation;

@Before
public void setup() throws IOException {
FirehoseInstrumentation firehoseInstrumentation = Mockito.mock(FirehoseInstrumentation.class);
firehoseInstrumentation = Mockito.mock(FirehoseInstrumentation.class);
testGrpcService = Mockito.mock(TestServerGrpc.TestServerImplBase.class, CALLS_REAL_METHODS);
headerTestInterceptor = new HeaderTestInterceptor();
headerTestInterceptor.setHeaderKeys(HEADER_KEYS);
Expand Down Expand Up @@ -111,7 +113,6 @@ public void shouldCallTheGivenRpcMethodAndGetSuccessResponse() {
.setField2("field2")
.build();
DynamicMessage response = grpcClient.execute(request.toByteArray(), headers);
System.out.println(response.toString());
assertTrue(Boolean.parseBoolean(String.valueOf(response.getField(TestGrpcResponse.getDescriptor().findFieldByName("success")))));
}

Expand Down Expand Up @@ -155,6 +156,29 @@ public void shouldReturnErrorWhenBytesAreNull() {
assertFalse(Boolean.parseBoolean(String.valueOf(response.getField(response.getDescriptorForType().findFieldByName("success")))));
}

@Test
public void shouldNotDecorateCallOptionsWithDeadline() {
CallOptions decoratedCallOptions = grpcClient.decoratedDefaultCallOptions();
assertNull(decoratedCallOptions.getDeadline());
}

@Test
public void shouldDecorateCallOptionsWithDeadline() {
Map<String, String> config = new HashMap<>();
config.put("SINK_GRPC_SERVICE_HOST", "localhost");
config.put("SINK_GRPC_SERVICE_PORT", "5000");
config.put("SINK_GRPC_METHOD_URL", "com.gotocompany.firehose.consumer.TestServer/TestRpcMethod");
config.put("SINK_GRPC_RESPONSE_SCHEMA_PROTO_CLASS", "com.gotocompany.firehose.consumer.TestGrpcResponse");
config.put("SINK_GRPC_ARG_DEADLINE_MS", "1000");
GrpcSinkConfig grpcSinkConfig = ConfigFactory.create(GrpcSinkConfig.class, config);
StencilClient stencilClient = StencilClientFactory.getClient();
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(grpcSinkConfig.getSinkGrpcServiceHost(), grpcSinkConfig.getSinkGrpcServicePort()).usePlaintext().build();
grpcClient = new GrpcClient(firehoseInstrumentation, grpcSinkConfig, managedChannel, stencilClient);

CallOptions decoratedCallOptions = grpcClient.decoratedDefaultCallOptions();
assertNotNull(decoratedCallOptions.getDeadline());
}

@Test
public void shouldReturnErrorWhenGrpcException() {
doThrow(new RuntimeException("error")).when(testGrpcService).testRpcMethod(any(TestGrpcRequest.class), any());
Expand All @@ -166,6 +190,16 @@ public void shouldReturnErrorWhenGrpcException() {
assertFalse(Boolean.parseBoolean(String.valueOf(response.getField(response.getDescriptorForType().findFieldByName("success")))));
}

@Test
public void shouldReportMetricsWhenGrpcException() {
doThrow(new StatusRuntimeException(Status.UNKNOWN)).when(testGrpcService).testRpcMethod(any(TestGrpcRequest.class), any());
TestGrpcRequest request = TestGrpcRequest.newBuilder()
.setField1("field1")
.setField2("field2")
.build();
grpcClient.execute(request.toByteArray(), headers);
verify(firehoseInstrumentation, times(1)).incrementCounter("firehose_grpc_error_total", "status=" + Status.UNKNOWN.getCode());
}

private <T extends AbstractMessage> Stubber doAnswerProtoReponse(T response) {
return doAnswer(invocation -> {
Expand Down
Loading