From 19f2c18a5036313aebaf21de61192337f5ff4bc0 Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Thu, 31 Aug 2023 23:36:39 +0530 Subject: [PATCH 1/6] chore: add keepalive and deadline configuration for grpc sink --- .../firehose/config/GrpcSinkConfig.java | 11 ++++ .../gotocompany/firehose/metrics/Metrics.java | 2 + .../firehose/sink/grpc/GrpcSinkFactory.java | 16 +++++- .../firehose/sink/grpc/client/GrpcClient.java | 54 ++++++++++++------- .../sink/grpc/GrpcSinkFactoryTest.java | 50 ++++++++++++++++- .../grpc/{ => client}/GrpcClientTest.java | 47 ++++++++++++++-- 6 files changed, 153 insertions(+), 27 deletions(-) rename src/test/java/com/gotocompany/firehose/sink/grpc/{ => client}/GrpcClientTest.java (77%) diff --git a/src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java b/src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java index 9c2e61390..8a6bc5066 100644 --- a/src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java +++ b/src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java @@ -17,4 +17,15 @@ public interface GrpcSinkConfig extends AppConfig { @Config.Key("SINK_GRPC_RESPONSE_SCHEMA_PROTO_CLASS") String getSinkGrpcResponseSchemaProtoClass(); + @Config.Key("SINK_GRPC_ARG_KEEPALIVE_TIME_MS") + @Config.DefaultValue("-1") + Integer getSinkGrpcArgKeepaliveTimeMS(); + + @Config.Key("SINK_GRPC_ARG_KEEPALIVE_TIMEOUT_MS") + @Config.DefaultValue("-1") + Integer getSinkGrpcArgKeepaliveTimeoutMS(); + + @Config.Key("SINK_GRPC_ARG_DEADLINE_MS") + @Config.DefaultValue("-1") + Integer getSinkGrpcArgDeadlineMS(); } diff --git a/src/main/java/com/gotocompany/firehose/metrics/Metrics.java b/src/main/java/com/gotocompany/firehose/metrics/Metrics.java index 568f8399a..4422db62c 100644 --- a/src/main/java/com/gotocompany/firehose/metrics/Metrics.java +++ b/src/main/java/com/gotocompany/firehose/metrics/Metrics.java @@ -11,6 +11,7 @@ public class Metrics { //SINK PREFIXES public static final String SINK_PREFIX = "sink_"; public static final String HTTP_SINK_PREFIX = "http_"; + public static final String GRPC_SINK_PREFIX = "grpc_"; public static final String BLOB_SINK_PREFIX = "blob_"; public static final String MONGO_SINK_PREFIX = "mongo_"; @@ -43,6 +44,7 @@ public class Metrics { public static final String SINK_MESSAGES_DROP_TOTAL = APPLICATION_PREFIX + SINK_PREFIX + "messages_drop_total"; public static final String SINK_HTTP_RESPONSE_CODE_TOTAL = APPLICATION_PREFIX + SINK_PREFIX + HTTP_SINK_PREFIX + "response_code_total"; public static final String SINK_PUSH_BATCH_SIZE_TOTAL = APPLICATION_PREFIX + SINK_PREFIX + "push_batch_size_total"; + public static final String SINK_GRPC_ERROR_TOTAL = APPLICATION_PREFIX + GRPC_SINK_PREFIX + "error_total"; // MONGO SINK MEASUREMENTS public static final String SINK_MONGO_INSERTED_TOTAL = APPLICATION_PREFIX + SINK_PREFIX + MONGO_SINK_PREFIX + "inserted_total"; diff --git a/src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactory.java b/src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactory.java index 9cf02ca0e..14213c36d 100644 --- a/src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactory.java +++ b/src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactory.java @@ -12,6 +12,7 @@ import org.aeonbits.owner.ConfigFactory; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * Factory class to create the GrpcSink. @@ -29,12 +30,25 @@ public static AbstractSink create(Map configuration, StatsDRepor grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort(), grpcConfig.getSinkGrpcMethodUrl(), grpcConfig.getSinkGrpcResponseSchemaProtoClass()); firehoseInstrumentation.logDebug(grpcSinkConfig); - ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort()).usePlaintext().build(); + ManagedChannelBuilder managedChannelBuilder = ManagedChannelBuilder.forAddress(grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort()) + .usePlaintext(); + ManagedChannel managedChannel = decorateManagedChannelBuilder(grpcConfig, managedChannelBuilder).build(); GrpcClient grpcClient = new GrpcClient(new FirehoseInstrumentation(statsDReporter, GrpcClient.class), grpcConfig, managedChannel, stencilClient); + grpcClient.initialize(); firehoseInstrumentation.logInfo("GRPC connection established"); return new GrpcSink(new FirehoseInstrumentation(statsDReporter, GrpcSink.class), grpcClient, stencilClient); } + protected static ManagedChannelBuilder decorateManagedChannelBuilder(GrpcSinkConfig grpcConfig, ManagedChannelBuilder channelBuilder) { + if (grpcConfig.getSinkGrpcArgKeepaliveTimeMS() != null && grpcConfig.getSinkGrpcArgKeepaliveTimeMS() > 0) { + channelBuilder = channelBuilder.keepAliveTime(grpcConfig.getSinkGrpcArgKeepaliveTimeMS(), TimeUnit.MILLISECONDS); + } + if (grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS() != null && grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS() > 0) { + channelBuilder = channelBuilder.keepAliveTimeout(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS(), TimeUnit.MILLISECONDS); + } + return channelBuilder; + } + } diff --git a/src/main/java/com/gotocompany/firehose/sink/grpc/client/GrpcClient.java b/src/main/java/com/gotocompany/firehose/sink/grpc/client/GrpcClient.java index 44b11a591..181e356b4 100644 --- a/src/main/java/com/gotocompany/firehose/sink/grpc/client/GrpcClient.java +++ b/src/main/java/com/gotocompany/firehose/sink/grpc/client/GrpcClient.java @@ -1,19 +1,21 @@ package com.gotocompany.firehose.sink.grpc.client; - import com.gotocompany.firehose.config.GrpcSinkConfig; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; import com.google.protobuf.DynamicMessage; -import io.grpc.ManagedChannel; +import com.gotocompany.firehose.metrics.Metrics; + +import io.grpc.CallOptions; import io.grpc.Metadata; +import io.grpc.Channel; +import io.grpc.StatusRuntimeException; +import io.grpc.ClientInterceptors; +import io.grpc.ManagedChannel; import io.grpc.MethodDescriptor; import io.grpc.stub.ClientCalls; import io.grpc.stub.MetadataUtils; -import io.grpc.Channel; -import io.grpc.ClientInterceptors; -import io.grpc.CallOptions; import com.gotocompany.stencil.client.StencilClient; import org.apache.commons.io.IOUtils; import org.apache.kafka.common.header.Header; @@ -22,6 +24,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.TimeUnit; /** @@ -33,6 +36,7 @@ public class GrpcClient { private final GrpcSinkConfig grpcSinkConfig; private StencilClient stencilClient; private ManagedChannel managedChannel; + private MethodDescriptor methodeBuilder; public GrpcClient(FirehoseInstrumentation firehoseInstrumentation, GrpcSinkConfig grpcSinkConfig, ManagedChannel managedChannel, StencilClient stencilClient) { this.firehoseInstrumentation = firehoseInstrumentation; @@ -41,39 +45,49 @@ public GrpcClient(FirehoseInstrumentation firehoseInstrumentation, GrpcSinkConfi this.managedChannel = managedChannel; } - public DynamicMessage execute(byte[] logMessage, Headers headers) { - + public void initialize() { MethodDescriptor.Marshaller marshaller = getMarshaller(); - DynamicMessage dynamicMessage; - - try { + this.methodeBuilder = MethodDescriptor.newBuilder(marshaller, marshaller) + .setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName(grpcSinkConfig.getSinkGrpcMethodUrl()) + .build(); + } + public DynamicMessage execute(byte[] logMessage, Headers headers) { + CallOptions callOption = CallOptions.DEFAULT; + try { Metadata metadata = new Metadata(); for (Header header : headers) { metadata.put(Metadata.Key.of(header.key(), Metadata.ASCII_STRING_MARSHALLER), new String(header.value())); } - Channel decoratedChannel = ClientInterceptors.intercept(managedChannel, MetadataUtils.newAttachHeadersInterceptor(metadata)); + callOption = decorateCallOptions(callOption); byte[] response = ClientCalls.blockingUnaryCall( decoratedChannel, - MethodDescriptor.newBuilder(marshaller, marshaller) - .setType(MethodDescriptor.MethodType.UNARY) - .setFullMethodName(grpcSinkConfig.getSinkGrpcMethodUrl()) - .build(), - CallOptions.DEFAULT, + methodeBuilder, + callOption, logMessage); - dynamicMessage = stencilClient.parse(grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass(), response); + return stencilClient.parse(grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass(), response); + } catch (StatusRuntimeException sre) { + firehoseInstrumentation.logWarn(sre.getMessage()); + firehoseInstrumentation.incrementCounter(Metrics.SINK_GRPC_ERROR_TOTAL, "status=" + sre.getStatus().getCode()); } catch (Exception e) { + e.printStackTrace(); firehoseInstrumentation.logWarn(e.getMessage()); - dynamicMessage = DynamicMessage.newBuilder(this.stencilClient.get(this.grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass())).build(); - + firehoseInstrumentation.incrementCounter(Metrics.SINK_GRPC_ERROR_TOTAL, "status=UNIDENTIFIED"); } + return DynamicMessage.newBuilder(this.stencilClient.get(this.grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass())).build(); + } - return dynamicMessage; + protected CallOptions decorateCallOptions(CallOptions defaultCallOptions) { + if (grpcSinkConfig.getSinkGrpcArgDeadlineMS() != null && grpcSinkConfig.getSinkGrpcArgDeadlineMS() > 0) { + defaultCallOptions = defaultCallOptions.withDeadlineAfter(grpcSinkConfig.getSinkGrpcArgDeadlineMS(), TimeUnit.MILLISECONDS); + } + return defaultCallOptions; } private MethodDescriptor.Marshaller getMarshaller() { diff --git a/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactoryTest.java b/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactoryTest.java index ce7235005..bc7358637 100644 --- a/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactoryTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactoryTest.java @@ -1,9 +1,11 @@ package com.gotocompany.firehose.sink.grpc; +import com.gotocompany.firehose.config.GrpcSinkConfig; import com.gotocompany.firehose.exception.DeserializerException; import com.gotocompany.firehose.sink.Sink; import com.gotocompany.depot.metrics.StatsDReporter; import com.gotocompany.firehose.consumer.TestServerGrpc; +import io.grpc.ManagedChannelBuilder; import io.grpc.Server; import io.grpc.ServerBuilder; import com.gotocompany.stencil.client.StencilClient; @@ -15,8 +17,10 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.times; import static org.mockito.MockitoAnnotations.initMocks; public class GrpcSinkFactoryTest { @@ -30,18 +34,25 @@ public class GrpcSinkFactoryTest { @Mock private StencilClient stencilClient; + @Mock + private GrpcSinkConfig grpcConfig; + + @Mock + private ManagedChannelBuilder channelBuilder; + // private static ConsulClient consulClient; @Before public void setUp() { initMocks(this); + } @Test public void shouldCreateChannelPoolWithHostAndPort() throws IOException, DeserializerException { - when(testGrpcService.bindService()).thenCallRealMethod(); + when(testGrpcService.bindService()).thenCallRealMethod(); Server server = ServerBuilder .forPort(5000) .addService(testGrpcService.bindService()) @@ -59,4 +70,39 @@ public void shouldCreateChannelPoolWithHostAndPort() throws IOException, Deseria Assert.assertNotNull(sink); server.shutdownNow(); } + + @Test + public void channelBuilderShouldBeDecoratedWithKeepaliveAndTimeOutMS() { + when(grpcConfig.getSinkGrpcArgKeepaliveTimeMS()).thenReturn(1000); + when(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS()).thenReturn(100); + when(channelBuilder.keepAliveTime(Integer.parseInt("1000"), TimeUnit.MILLISECONDS)).thenReturn(channelBuilder); + + GrpcSinkFactory.decorateManagedChannelBuilder(grpcConfig, channelBuilder); + + verify(channelBuilder, times(1)).keepAliveTimeout(Integer.parseInt("100"), TimeUnit.MILLISECONDS); + verify(channelBuilder, times(1)).keepAliveTime(Integer.parseInt("1000"), TimeUnit.MILLISECONDS); + } + + @Test + public void channelBuilderShouldBeDecoratedWithOnlyKeepaliveMS() { + when(grpcConfig.getSinkGrpcArgKeepaliveTimeMS()).thenReturn(1000); + when(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS()).thenReturn(-1); + when(channelBuilder.keepAliveTime(anyInt(), eq(TimeUnit.MILLISECONDS))).thenReturn(channelBuilder); + + GrpcSinkFactory.decorateManagedChannelBuilder(grpcConfig, channelBuilder); + + verify(channelBuilder, times(0)).keepAliveTimeout(anyInt(), eq(TimeUnit.MILLISECONDS)); + verify(channelBuilder, times(1)).keepAliveTime(Integer.parseInt("1000"), TimeUnit.MILLISECONDS); + } + + @Test + public void channelBuilderShouldNotBeDecoratedWithKeepaliveAndTimeoutMS() { + when(grpcConfig.getSinkGrpcArgKeepaliveTimeMS()).thenReturn(-1); + when(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS()).thenReturn(-1); + //when(channelBuilder.keepAliveTime(anyInt(), eq(TimeUnit.MILLISECONDS))).thenReturn(channelBuilder); + + GrpcSinkFactory.decorateManagedChannelBuilder(grpcConfig, channelBuilder); + verify(channelBuilder, times(0)).keepAliveTimeout(anyInt(), eq(TimeUnit.MILLISECONDS)); + verify(channelBuilder, times(0)).keepAliveTime(anyInt(), eq(TimeUnit.MILLISECONDS)); + } } diff --git a/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcClientTest.java b/src/test/java/com/gotocompany/firehose/sink/grpc/client/GrpcClientTest.java similarity index 77% rename from src/test/java/com/gotocompany/firehose/sink/grpc/GrpcClientTest.java rename to src/test/java/com/gotocompany/firehose/sink/grpc/client/GrpcClientTest.java index 44c9b0c34..ed5304dd9 100644 --- a/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcClientTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/grpc/client/GrpcClientTest.java @@ -1,8 +1,8 @@ -package com.gotocompany.firehose.sink.grpc; +package com.gotocompany.firehose.sink.grpc.client; + import com.gotocompany.firehose.config.GrpcSinkConfig; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; -import com.gotocompany.firehose.sink.grpc.client.GrpcClient; import com.gotocompany.firehose.consumer.Error; import com.gotocompany.firehose.consumer.TestGrpcRequest; import com.gotocompany.firehose.consumer.TestGrpcResponse; @@ -19,6 +19,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.stubbing.Stubber; @@ -32,6 +33,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.*; public class GrpcClientTest { @@ -43,9 +46,12 @@ public class GrpcClientTest { private static final List HEADER_KEYS = Arrays.asList("test-header-key-1", "test-header-key-2"); private HeaderTestInterceptor headerTestInterceptor; + @Mock + private FirehoseInstrumentation firehoseInstrumentation; + @Before public void setup() throws IOException { - FirehoseInstrumentation firehoseInstrumentation = Mockito.mock(FirehoseInstrumentation.class); + firehoseInstrumentation = Mockito.mock(FirehoseInstrumentation.class); testGrpcService = Mockito.mock(TestServerGrpc.TestServerImplBase.class, CALLS_REAL_METHODS); headerTestInterceptor = new HeaderTestInterceptor(); headerTestInterceptor.setHeaderKeys(HEADER_KEYS); @@ -64,6 +70,7 @@ public void setup() throws IOException { StencilClient stencilClient = StencilClientFactory.getClient(); ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(grpcSinkConfig.getSinkGrpcServiceHost(), grpcSinkConfig.getSinkGrpcServicePort()).usePlaintext().build(); grpcClient = new GrpcClient(firehoseInstrumentation, grpcSinkConfig, managedChannel, stencilClient); + grpcClient.initialize(); headers = new RecordHeaders(); } @@ -111,7 +118,6 @@ public void shouldCallTheGivenRpcMethodAndGetSuccessResponse() { .setField2("field2") .build(); DynamicMessage response = grpcClient.execute(request.toByteArray(), headers); - System.out.println(response.toString()); assertTrue(Boolean.parseBoolean(String.valueOf(response.getField(TestGrpcResponse.getDescriptor().findFieldByName("success"))))); } @@ -155,6 +161,29 @@ public void shouldReturnErrorWhenBytesAreNull() { assertFalse(Boolean.parseBoolean(String.valueOf(response.getField(response.getDescriptorForType().findFieldByName("success"))))); } + @Test + public void shouldNotDecorateCallOptionsWithDeadline() { + CallOptions decorateCallOptions = grpcClient.decorateCallOptions(CallOptions.DEFAULT); + assertNull(decorateCallOptions.getDeadline()); + } + + @Test + public void shouldDecorateCallOptionsWithDeadline() { + Map config = new HashMap<>(); + config.put("SINK_GRPC_SERVICE_HOST", "localhost"); + config.put("SINK_GRPC_SERVICE_PORT", "5000"); + config.put("SINK_GRPC_METHOD_URL", "com.gotocompany.firehose.consumer.TestServer/TestRpcMethod"); + config.put("SINK_GRPC_RESPONSE_SCHEMA_PROTO_CLASS", "com.gotocompany.firehose.consumer.TestGrpcResponse"); + config.put("SINK_GRPC_ARG_DEADLINE_MS", "1000"); + GrpcSinkConfig grpcSinkConfig = ConfigFactory.create(GrpcSinkConfig.class, config); + StencilClient stencilClient = StencilClientFactory.getClient(); + ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(grpcSinkConfig.getSinkGrpcServiceHost(), grpcSinkConfig.getSinkGrpcServicePort()).usePlaintext().build(); + grpcClient = new GrpcClient(firehoseInstrumentation, grpcSinkConfig, managedChannel, stencilClient); + + CallOptions decorateCallOptions = grpcClient.decorateCallOptions(CallOptions.DEFAULT); + assertNotNull(decorateCallOptions.getDeadline()); + } + @Test public void shouldReturnErrorWhenGrpcException() { doThrow(new RuntimeException("error")).when(testGrpcService).testRpcMethod(any(TestGrpcRequest.class), any()); @@ -166,6 +195,16 @@ public void shouldReturnErrorWhenGrpcException() { assertFalse(Boolean.parseBoolean(String.valueOf(response.getField(response.getDescriptorForType().findFieldByName("success"))))); } + @Test + public void shouldReportMetricsWhenGrpcException() { + doThrow(new StatusRuntimeException(Status.UNKNOWN)).when(testGrpcService).testRpcMethod(any(TestGrpcRequest.class), any()); + TestGrpcRequest request = TestGrpcRequest.newBuilder() + .setField1("field1") + .setField2("field2") + .build(); + grpcClient.execute(request.toByteArray(), headers); + verify(firehoseInstrumentation, times(1)).incrementCounter("firehose_grpc_error_total", "status=" + Status.UNKNOWN.getCode()); + } private Stubber doAnswerProtoReponse(T response) { return doAnswer(invocation -> { From 2208da5dcd1edf30593b957805512d8ac0a56f99 Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Fri, 1 Sep 2023 00:00:51 +0530 Subject: [PATCH 2/6] chore: documentation and removed default value --- docs/docs/sinks/grpc-sink.md | 25 +++++++++++++++++++ .../firehose/config/GrpcSinkConfig.java | 3 --- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/docs/docs/sinks/grpc-sink.md b/docs/docs/sinks/grpc-sink.md index 63981d8e8..4232e9c50 100644 --- a/docs/docs/sinks/grpc-sink.md +++ b/docs/docs/sinks/grpc-sink.md @@ -31,3 +31,28 @@ Defines the Proto which would be the response of the GRPC Method. - Example value: `com.tests.SampleGrpcResponse` - Type: `required` + +### `SINK_GRPC_ARG_KEEPALIVE_TIME_MS` + +The keepalive ping is a way to check if a channel is currently working by sending HTTP2 pings over the transport. It is sent periodically, and if the ping is not acknowledged by the peer within a certain timeout period, the transport is disconnected. Other keepalive configurations are described [here](https://github.com/grpc/grpc/blob/master/doc/keepalive.md). + +Defines the period (in milliseconds) after which a keepalive ping is sent on the transport. If smaller than 10000, 10000 will be used instead. + +- Example value: `60000` +- Type: `optional` +- Default value: `infinite` + +### `SINK_GRPC_ARG_KEEPALIVE_TIMEOUT_MS` + +Defines the amount of time (in milliseconds) the sender of the keepalive ping waits for an acknowledgement. If it does not receive an acknowledgment within this time, it will close the connection. + +- Example value: `5000` +- Type: `optional` +- Default value: `20000` + +### `SINK_GRPC_ARG_DEADLINE_MS` + +Defines the amount of time (in milliseconds) gRPC clients are willing to wait for an RPC to complete before the RPC is terminated with the error [DEADLINE_EXCEEDED](https://grpc.io/docs/guides/deadlines/#:~:text=By%20default%2C%20gRPC%20does%20not,realistic%20deadline%20in%20your%20clients.) + +- Example value: `1000` +- Type: `optional` diff --git a/src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java b/src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java index 8a6bc5066..286ff729b 100644 --- a/src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java +++ b/src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java @@ -18,14 +18,11 @@ public interface GrpcSinkConfig extends AppConfig { String getSinkGrpcResponseSchemaProtoClass(); @Config.Key("SINK_GRPC_ARG_KEEPALIVE_TIME_MS") - @Config.DefaultValue("-1") Integer getSinkGrpcArgKeepaliveTimeMS(); @Config.Key("SINK_GRPC_ARG_KEEPALIVE_TIMEOUT_MS") - @Config.DefaultValue("-1") Integer getSinkGrpcArgKeepaliveTimeoutMS(); @Config.Key("SINK_GRPC_ARG_DEADLINE_MS") - @Config.DefaultValue("-1") Integer getSinkGrpcArgDeadlineMS(); } From be4097d709336b22e6917692880886df3223c345 Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Mon, 4 Sep 2023 12:21:53 +0530 Subject: [PATCH 3/6] fix: typo in grpc clinet --- .../firehose/sink/grpc/client/GrpcClient.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/gotocompany/firehose/sink/grpc/client/GrpcClient.java b/src/main/java/com/gotocompany/firehose/sink/grpc/client/GrpcClient.java index 181e356b4..b8673cc06 100644 --- a/src/main/java/com/gotocompany/firehose/sink/grpc/client/GrpcClient.java +++ b/src/main/java/com/gotocompany/firehose/sink/grpc/client/GrpcClient.java @@ -36,7 +36,7 @@ public class GrpcClient { private final GrpcSinkConfig grpcSinkConfig; private StencilClient stencilClient; private ManagedChannel managedChannel; - private MethodDescriptor methodeBuilder; + private MethodDescriptor methodDescriptor; public GrpcClient(FirehoseInstrumentation firehoseInstrumentation, GrpcSinkConfig grpcSinkConfig, ManagedChannel managedChannel, StencilClient stencilClient) { this.firehoseInstrumentation = firehoseInstrumentation; @@ -47,7 +47,7 @@ public GrpcClient(FirehoseInstrumentation firehoseInstrumentation, GrpcSinkConfi public void initialize() { MethodDescriptor.Marshaller marshaller = getMarshaller(); - this.methodeBuilder = MethodDescriptor.newBuilder(marshaller, marshaller) + this.methodDescriptor = MethodDescriptor.newBuilder(marshaller, marshaller) .setType(MethodDescriptor.MethodType.UNARY) .setFullMethodName(grpcSinkConfig.getSinkGrpcMethodUrl()) .build(); @@ -55,7 +55,6 @@ public void initialize() { public DynamicMessage execute(byte[] logMessage, Headers headers) { - CallOptions callOption = CallOptions.DEFAULT; try { Metadata metadata = new Metadata(); for (Header header : headers) { @@ -63,11 +62,10 @@ public DynamicMessage execute(byte[] logMessage, Headers headers) { } Channel decoratedChannel = ClientInterceptors.intercept(managedChannel, MetadataUtils.newAttachHeadersInterceptor(metadata)); - callOption = decorateCallOptions(callOption); byte[] response = ClientCalls.blockingUnaryCall( decoratedChannel, - methodeBuilder, - callOption, + methodDescriptor, + decorateCallOptions(CallOptions.DEFAULT), logMessage); return stencilClient.parse(grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass(), response); @@ -83,11 +81,11 @@ public DynamicMessage execute(byte[] logMessage, Headers headers) { return DynamicMessage.newBuilder(this.stencilClient.get(this.grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass())).build(); } - protected CallOptions decorateCallOptions(CallOptions defaultCallOptions) { + protected CallOptions decorateCallOptions(CallOptions defaultCallOption) { if (grpcSinkConfig.getSinkGrpcArgDeadlineMS() != null && grpcSinkConfig.getSinkGrpcArgDeadlineMS() > 0) { - defaultCallOptions = defaultCallOptions.withDeadlineAfter(grpcSinkConfig.getSinkGrpcArgDeadlineMS(), TimeUnit.MILLISECONDS); + defaultCallOption = defaultCallOption.withDeadlineAfter(grpcSinkConfig.getSinkGrpcArgDeadlineMS(), TimeUnit.MILLISECONDS); } - return defaultCallOptions; + return defaultCallOption; } private MethodDescriptor.Marshaller getMarshaller() { From 1dfd8db746bfea45ba971f058d82066edf1ada6a Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Mon, 9 Oct 2023 13:10:35 +0530 Subject: [PATCH 4/6] fix: default value for keepalive and keepalive timeout --- .../firehose/config/GrpcSinkConfig.java | 8 ++-- .../firehose/sink/grpc/GrpcSinkFactory.java | 18 ++------ .../firehose/sink/grpc/client/GrpcClient.java | 17 ++++---- .../sink/grpc/GrpcSinkFactoryTest.java | 41 ++----------------- .../sink/grpc/client/GrpcClientTest.java | 15 +++---- 5 files changed, 25 insertions(+), 74 deletions(-) diff --git a/src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java b/src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java index 286ff729b..8ebef0f6d 100644 --- a/src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java +++ b/src/main/java/com/gotocompany/firehose/config/GrpcSinkConfig.java @@ -18,11 +18,13 @@ public interface GrpcSinkConfig extends AppConfig { String getSinkGrpcResponseSchemaProtoClass(); @Config.Key("SINK_GRPC_ARG_KEEPALIVE_TIME_MS") - Integer getSinkGrpcArgKeepaliveTimeMS(); + @Config.DefaultValue("9223372036854775807") + Long getSinkGrpcArgKeepaliveTimeMS(); @Config.Key("SINK_GRPC_ARG_KEEPALIVE_TIMEOUT_MS") - Integer getSinkGrpcArgKeepaliveTimeoutMS(); + @DefaultValue("20000") + Long getSinkGrpcArgKeepaliveTimeoutMS(); @Config.Key("SINK_GRPC_ARG_DEADLINE_MS") - Integer getSinkGrpcArgDeadlineMS(); + Long getSinkGrpcArgDeadlineMS(); } diff --git a/src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactory.java b/src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactory.java index 14213c36d..a405a5f57 100644 --- a/src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactory.java +++ b/src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactory.java @@ -30,25 +30,15 @@ public static AbstractSink create(Map configuration, StatsDRepor grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort(), grpcConfig.getSinkGrpcMethodUrl(), grpcConfig.getSinkGrpcResponseSchemaProtoClass()); firehoseInstrumentation.logDebug(grpcSinkConfig); - ManagedChannelBuilder managedChannelBuilder = ManagedChannelBuilder.forAddress(grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort()) - .usePlaintext(); - ManagedChannel managedChannel = decorateManagedChannelBuilder(grpcConfig, managedChannelBuilder).build(); + ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort()) + .keepAliveTime(grpcConfig.getSinkGrpcArgKeepaliveTimeMS(), TimeUnit.MILLISECONDS) + .keepAliveTimeout(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS(), TimeUnit.MILLISECONDS) + .usePlaintext().build(); GrpcClient grpcClient = new GrpcClient(new FirehoseInstrumentation(statsDReporter, GrpcClient.class), grpcConfig, managedChannel, stencilClient); - grpcClient.initialize(); firehoseInstrumentation.logInfo("GRPC connection established"); return new GrpcSink(new FirehoseInstrumentation(statsDReporter, GrpcSink.class), grpcClient, stencilClient); } - protected static ManagedChannelBuilder decorateManagedChannelBuilder(GrpcSinkConfig grpcConfig, ManagedChannelBuilder channelBuilder) { - if (grpcConfig.getSinkGrpcArgKeepaliveTimeMS() != null && grpcConfig.getSinkGrpcArgKeepaliveTimeMS() > 0) { - channelBuilder = channelBuilder.keepAliveTime(grpcConfig.getSinkGrpcArgKeepaliveTimeMS(), TimeUnit.MILLISECONDS); - } - if (grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS() != null && grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS() > 0) { - channelBuilder = channelBuilder.keepAliveTimeout(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS(), TimeUnit.MILLISECONDS); - } - return channelBuilder; - } - } diff --git a/src/main/java/com/gotocompany/firehose/sink/grpc/client/GrpcClient.java b/src/main/java/com/gotocompany/firehose/sink/grpc/client/GrpcClient.java index b8673cc06..7178ab1f9 100644 --- a/src/main/java/com/gotocompany/firehose/sink/grpc/client/GrpcClient.java +++ b/src/main/java/com/gotocompany/firehose/sink/grpc/client/GrpcClient.java @@ -36,21 +36,20 @@ public class GrpcClient { private final GrpcSinkConfig grpcSinkConfig; private StencilClient stencilClient; private ManagedChannel managedChannel; - private MethodDescriptor methodDescriptor; + private final MethodDescriptor methodDescriptor; + private final DynamicMessage emptyResponse; public GrpcClient(FirehoseInstrumentation firehoseInstrumentation, GrpcSinkConfig grpcSinkConfig, ManagedChannel managedChannel, StencilClient stencilClient) { this.firehoseInstrumentation = firehoseInstrumentation; this.grpcSinkConfig = grpcSinkConfig; this.stencilClient = stencilClient; this.managedChannel = managedChannel; - } - - public void initialize() { MethodDescriptor.Marshaller marshaller = getMarshaller(); this.methodDescriptor = MethodDescriptor.newBuilder(marshaller, marshaller) .setType(MethodDescriptor.MethodType.UNARY) .setFullMethodName(grpcSinkConfig.getSinkGrpcMethodUrl()) .build(); + this.emptyResponse = DynamicMessage.newBuilder(this.stencilClient.get(this.grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass())).build(); } public DynamicMessage execute(byte[] logMessage, Headers headers) { @@ -65,7 +64,7 @@ public DynamicMessage execute(byte[] logMessage, Headers headers) { byte[] response = ClientCalls.blockingUnaryCall( decoratedChannel, methodDescriptor, - decorateCallOptions(CallOptions.DEFAULT), + decoratedDefaultCallOptions(), logMessage); return stencilClient.parse(grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass(), response); @@ -74,16 +73,16 @@ public DynamicMessage execute(byte[] logMessage, Headers headers) { firehoseInstrumentation.logWarn(sre.getMessage()); firehoseInstrumentation.incrementCounter(Metrics.SINK_GRPC_ERROR_TOTAL, "status=" + sre.getStatus().getCode()); } catch (Exception e) { - e.printStackTrace(); firehoseInstrumentation.logWarn(e.getMessage()); firehoseInstrumentation.incrementCounter(Metrics.SINK_GRPC_ERROR_TOTAL, "status=UNIDENTIFIED"); } - return DynamicMessage.newBuilder(this.stencilClient.get(this.grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass())).build(); + return emptyResponse; } - protected CallOptions decorateCallOptions(CallOptions defaultCallOption) { + protected CallOptions decoratedDefaultCallOptions() { + CallOptions defaultCallOption = CallOptions.DEFAULT; if (grpcSinkConfig.getSinkGrpcArgDeadlineMS() != null && grpcSinkConfig.getSinkGrpcArgDeadlineMS() > 0) { - defaultCallOption = defaultCallOption.withDeadlineAfter(grpcSinkConfig.getSinkGrpcArgDeadlineMS(), TimeUnit.MILLISECONDS); + return defaultCallOption.withDeadlineAfter(grpcSinkConfig.getSinkGrpcArgDeadlineMS(), TimeUnit.MILLISECONDS); } return defaultCallOption; } diff --git a/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactoryTest.java b/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactoryTest.java index bc7358637..807bee2a4 100644 --- a/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactoryTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactoryTest.java @@ -17,10 +17,9 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.TimeUnit; -import static org.mockito.Mockito.*; -import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + import static org.mockito.MockitoAnnotations.initMocks; public class GrpcSinkFactoryTest { @@ -68,41 +67,7 @@ public void shouldCreateChannelPoolWithHostAndPort() throws IOException, Deseria Sink sink = GrpcSinkFactory.create(config, statsDReporter, stencilClient); Assert.assertNotNull(sink); - server.shutdownNow(); - } - - @Test - public void channelBuilderShouldBeDecoratedWithKeepaliveAndTimeOutMS() { - when(grpcConfig.getSinkGrpcArgKeepaliveTimeMS()).thenReturn(1000); - when(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS()).thenReturn(100); - when(channelBuilder.keepAliveTime(Integer.parseInt("1000"), TimeUnit.MILLISECONDS)).thenReturn(channelBuilder); - - GrpcSinkFactory.decorateManagedChannelBuilder(grpcConfig, channelBuilder); - - verify(channelBuilder, times(1)).keepAliveTimeout(Integer.parseInt("100"), TimeUnit.MILLISECONDS); - verify(channelBuilder, times(1)).keepAliveTime(Integer.parseInt("1000"), TimeUnit.MILLISECONDS); - } - @Test - public void channelBuilderShouldBeDecoratedWithOnlyKeepaliveMS() { - when(grpcConfig.getSinkGrpcArgKeepaliveTimeMS()).thenReturn(1000); - when(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS()).thenReturn(-1); - when(channelBuilder.keepAliveTime(anyInt(), eq(TimeUnit.MILLISECONDS))).thenReturn(channelBuilder); - - GrpcSinkFactory.decorateManagedChannelBuilder(grpcConfig, channelBuilder); - - verify(channelBuilder, times(0)).keepAliveTimeout(anyInt(), eq(TimeUnit.MILLISECONDS)); - verify(channelBuilder, times(1)).keepAliveTime(Integer.parseInt("1000"), TimeUnit.MILLISECONDS); - } - - @Test - public void channelBuilderShouldNotBeDecoratedWithKeepaliveAndTimeoutMS() { - when(grpcConfig.getSinkGrpcArgKeepaliveTimeMS()).thenReturn(-1); - when(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS()).thenReturn(-1); - //when(channelBuilder.keepAliveTime(anyInt(), eq(TimeUnit.MILLISECONDS))).thenReturn(channelBuilder); - - GrpcSinkFactory.decorateManagedChannelBuilder(grpcConfig, channelBuilder); - verify(channelBuilder, times(0)).keepAliveTimeout(anyInt(), eq(TimeUnit.MILLISECONDS)); - verify(channelBuilder, times(0)).keepAliveTime(anyInt(), eq(TimeUnit.MILLISECONDS)); + server.shutdownNow(); } } diff --git a/src/test/java/com/gotocompany/firehose/sink/grpc/client/GrpcClientTest.java b/src/test/java/com/gotocompany/firehose/sink/grpc/client/GrpcClientTest.java index ed5304dd9..1a796520b 100644 --- a/src/test/java/com/gotocompany/firehose/sink/grpc/client/GrpcClientTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/grpc/client/GrpcClientTest.java @@ -30,11 +30,7 @@ import java.util.List; import java.util.Map; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.*; import static org.mockito.Mockito.*; public class GrpcClientTest { @@ -70,7 +66,6 @@ public void setup() throws IOException { StencilClient stencilClient = StencilClientFactory.getClient(); ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(grpcSinkConfig.getSinkGrpcServiceHost(), grpcSinkConfig.getSinkGrpcServicePort()).usePlaintext().build(); grpcClient = new GrpcClient(firehoseInstrumentation, grpcSinkConfig, managedChannel, stencilClient); - grpcClient.initialize(); headers = new RecordHeaders(); } @@ -163,8 +158,8 @@ public void shouldReturnErrorWhenBytesAreNull() { @Test public void shouldNotDecorateCallOptionsWithDeadline() { - CallOptions decorateCallOptions = grpcClient.decorateCallOptions(CallOptions.DEFAULT); - assertNull(decorateCallOptions.getDeadline()); + CallOptions decoratedCallOptions = grpcClient.decoratedDefaultCallOptions(); + assertNull(decoratedCallOptions.getDeadline()); } @Test @@ -180,8 +175,8 @@ public void shouldDecorateCallOptionsWithDeadline() { ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(grpcSinkConfig.getSinkGrpcServiceHost(), grpcSinkConfig.getSinkGrpcServicePort()).usePlaintext().build(); grpcClient = new GrpcClient(firehoseInstrumentation, grpcSinkConfig, managedChannel, stencilClient); - CallOptions decorateCallOptions = grpcClient.decorateCallOptions(CallOptions.DEFAULT); - assertNotNull(decorateCallOptions.getDeadline()); + CallOptions decoratedCallOptions = grpcClient.decoratedDefaultCallOptions(); + assertNotNull(decoratedCallOptions.getDeadline()); } @Test From d69bae9ef3b49d0277b610e92cbf5863b0f9f866 Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Fri, 27 Oct 2023 17:11:31 +0530 Subject: [PATCH 5/6] fix: unit test for grpcsink factory test --- .../sink/grpc/GrpcSinkFactoryTest.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactoryTest.java b/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactoryTest.java index 807bee2a4..b4de43727 100644 --- a/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactoryTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/grpc/GrpcSinkFactoryTest.java @@ -1,14 +1,17 @@ package com.gotocompany.firehose.sink.grpc; import com.gotocompany.firehose.config.GrpcSinkConfig; +import com.gotocompany.firehose.consumer.TestGrpcResponse; import com.gotocompany.firehose.exception.DeserializerException; import com.gotocompany.firehose.sink.Sink; import com.gotocompany.depot.metrics.StatsDReporter; import com.gotocompany.firehose.consumer.TestServerGrpc; +import com.gotocompany.stencil.StencilClientFactory; import io.grpc.ManagedChannelBuilder; import io.grpc.Server; import io.grpc.ServerBuilder; import com.gotocompany.stencil.client.StencilClient; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -30,7 +33,6 @@ public class GrpcSinkFactoryTest { @Mock private TestServerGrpc.TestServerImplBase testGrpcService; - @Mock private StencilClient stencilClient; @Mock @@ -39,12 +41,19 @@ public class GrpcSinkFactoryTest { @Mock private ManagedChannelBuilder channelBuilder; -// private static ConsulClient consulClient; + private Server server; @Before public void setUp() { initMocks(this); + stencilClient = StencilClientFactory.getClient(); + } + @After + public void tearDown() { + if (server != null) { + server.shutdown(); + } } @@ -52,7 +61,7 @@ public void setUp() { public void shouldCreateChannelPoolWithHostAndPort() throws IOException, DeserializerException { when(testGrpcService.bindService()).thenCallRealMethod(); - Server server = ServerBuilder + server = ServerBuilder .forPort(5000) .addService(testGrpcService.bindService()) .build() @@ -62,12 +71,10 @@ public void shouldCreateChannelPoolWithHostAndPort() throws IOException, Deseria config.put("SINK_GRPC_METHOD_URL", "com.gotocompany.firehose.consumer.TestServer/TestRpcMethod"); config.put("SINK_GRPC_SERVICE_HOST", "localhost"); config.put("SINK_GRPC_SERVICE_PORT", "5000"); - + config.put("SINK_GRPC_RESPONSE_SCHEMA_PROTO_CLASS", TestGrpcResponse.class.getName()); Sink sink = GrpcSinkFactory.create(config, statsDReporter, stencilClient); Assert.assertNotNull(sink); - - server.shutdownNow(); } } From f3cb6b21a7017ff2e4eba3f1339e260469230cd1 Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Tue, 16 Jan 2024 15:55:05 +0530 Subject: [PATCH 6/6] chore: bump version for grpc keepalive and deadline configuration --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 17af10512..d5bc72bc6 100644 --- a/build.gradle +++ b/build.gradle @@ -33,7 +33,7 @@ lombok { } group 'com.gotocompany' -version '0.9.6' +version '0.9.7' def projName = "firehose"