From e8fe6b091f26223089bb5b752998b119e96d0927 Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Tue, 29 Aug 2023 12:00:18 +0530 Subject: [PATCH 1/7] chore: add keepalive and keepalive timeout for grpc external processor --- .../external/grpc/GrpcSourceConfig.java | 78 ++++++++++++++----- .../grpc/GrpcSourceConfigBuilder.java | 14 +++- .../external/grpc/client/GrpcClient.java | 36 +++++---- .../external/grpc/client/GrpcClientTest.java | 65 +++++++++++++--- docs/docs/advance/post_processor.md | 18 +++++ 5 files changed, 166 insertions(+), 45 deletions(-) diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfig.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfig.java index d9e1b1680..2f8a337ff 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfig.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfig.java @@ -24,6 +24,8 @@ public class GrpcSourceConfig implements Serializable, SourceConfig { private String grpcMethodUrl; private String requestPattern; private String requestVariables; + private String grpcArgKeepaliveTimeMs; + private String grpcArgKeepaliveTimeoutMs; private String streamTimeout; private String connectTimeout; private boolean failOnErrors; @@ -63,30 +65,34 @@ public GrpcSourceConfig(String endpoint, int servicePort, String grpcRequestProt /** * Instantiates a new Grpc source config with specified grpc stencil url. * - * @param endpoint the endpoint - * @param servicePort the service port - * @param grpcRequestProtoSchema the grpc request proto schema - * @param grpcResponseProtoSchema the grpc response proto schema - * @param grpcMethodUrl the grpc method url - * @param requestPattern the request pattern - * @param requestVariables the request variables - * @param streamTimeout the stream timeout - * @param connectTimeout the connect timeout - * @param failOnErrors the fail on errors - * @param grpcStencilUrl the grpc stencil url - * @param type the type - * @param retainResponseType the retain response type - * @param headers the headers - * @param outputMapping the output mapping - * @param metricId the metric id - * @param capacity the capacity + * @param endpoint the endpoint + * @param servicePort the service port + * @param grpcRequestProtoSchema the grpc request proto schema + * @param grpcResponseProtoSchema the grpc response proto schema + * @param grpcMethodUrl the grpc method url + * @param requestPattern the request pattern + * @param grpcArgKeepaliveTimeMs the grpc Keepalive Time ms + * @param grpcArgKeepaliveTimeoutMs the grpc Keepalive Timeout ms + * @param requestVariables the request variables + * @param streamTimeout the stream timeout + * @param connectTimeout the connect timeout + * @param failOnErrors the fail on errors + * @param grpcStencilUrl the grpc stencil url + * @param type the type + * @param retainResponseType the retain response type + * @param headers the headers + * @param outputMapping the output mapping + * @param metricId the metric id + * @param capacity the capacity */ - public GrpcSourceConfig(String endpoint, int servicePort, String grpcRequestProtoSchema, String grpcResponseProtoSchema, String grpcMethodUrl, String requestPattern, String requestVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String grpcStencilUrl, String type, boolean retainResponseType, Map headers, Map outputMapping, String metricId, int capacity) { + public GrpcSourceConfig(String endpoint, int servicePort, String grpcRequestProtoSchema, String grpcResponseProtoSchema, String grpcMethodUrl, String grpcArgKeepaliveTimeMs, String grpcArgKeepaliveTimeoutMs , String requestPattern, String requestVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String grpcStencilUrl, String type, boolean retainResponseType, Map headers, Map outputMapping, String metricId, int capacity) { this.endpoint = endpoint; this.servicePort = servicePort; this.grpcRequestProtoSchema = grpcRequestProtoSchema; this.grpcResponseProtoSchema = grpcResponseProtoSchema; this.grpcMethodUrl = grpcMethodUrl; + this.grpcArgKeepaliveTimeMs = grpcArgKeepaliveTimeMs; + this.grpcArgKeepaliveTimeoutMs = grpcArgKeepaliveTimeoutMs; this.requestPattern = requestPattern; this.requestVariables = requestVariables; this.streamTimeout = streamTimeout; @@ -209,6 +215,42 @@ public String getGrpcMethodUrl() { return grpcMethodUrl; } + /** + * Gets grpc arg keepalive time ms. + * + * @return grpc arg keepalive time ms + */ + public String getGrpcArgKeepaliveTimeMs() { + return grpcArgKeepaliveTimeMs; + } + + /** + * Gets grpc arg keepalive timeout ms. + * + * @return grpc arg keepalive timeout ms + */ + public String getGrpcArgKeepaliveTimeoutMs() { + return grpcArgKeepaliveTimeoutMs; + } + + /** + * Sets grpc arg keepalive time ms. + * + * @param grpcArgKeepaliveTimeMs the grpc arg keepalive time ms + */ + public void setGrpcArgKeepaliveTimeMs(String grpcArgKeepaliveTimeMs) { + this.grpcArgKeepaliveTimeMs = grpcArgKeepaliveTimeMs; + } + + /** + * Sets grpc arg keepalive timeout ms. + * + * @param grpcArgKeepaliveTimeOutMs the grpc arg keepalive timeout ms + */ + public void setGrpcArgKeepaliveTimeOutMs(String grpcArgKeepaliveTimeOutMs) { + this.grpcArgKeepaliveTimeoutMs = grpcArgKeepaliveTimeOutMs; + } + /** * Gets service port. * diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfigBuilder.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfigBuilder.java index 729975177..fc260b70f 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfigBuilder.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfigBuilder.java @@ -10,6 +10,8 @@ public class GrpcSourceConfigBuilder { private String grpcRequestProtoSchema; private String grpcResponseProtoSchema; private String grpcMethodUrl; + private String grpcArgKeepaliveTimeMs; + private String grpcArgKeepaliveTimeOutMs; private String requestPattern; private String requestVariables; private Map outputMapping; @@ -108,8 +110,18 @@ public GrpcSourceConfigBuilder setCapacity(int capacity) { return this; } + public GrpcSourceConfigBuilder setGrpcArgKeepaliveTimeMs(String grpcArgKeepaliveTimeMs) { + this.grpcArgKeepaliveTimeMs = grpcArgKeepaliveTimeMs; + return this; + } + + public GrpcSourceConfigBuilder setGrpcArgKeepaliveTimeOutMs(String grpcArgKeepaliveTimeOutMs) { + this.grpcArgKeepaliveTimeOutMs = grpcArgKeepaliveTimeOutMs; + return this; + } + public GrpcSourceConfig createGrpcSourceConfig() { - return new GrpcSourceConfig(endpoint, servicePort, grpcRequestProtoSchema, grpcResponseProtoSchema, grpcMethodUrl, requestPattern, requestVariables, + return new GrpcSourceConfig(endpoint, servicePort, grpcRequestProtoSchema, grpcResponseProtoSchema, grpcMethodUrl, grpcArgKeepaliveTimeMs, grpcArgKeepaliveTimeOutMs, requestPattern, requestVariables, streamTimeout, connectTimeout, failOnErrors, grpcStencilUrl, type, retainResponseType, headers, outputMapping, metricId, capacity); } } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java index f77a54f48..14a2c1f50 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java @@ -5,18 +5,14 @@ import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.DynamicMessage; -import io.grpc.Channel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.ClientInterceptors; -import io.grpc.Metadata; -import io.grpc.CallOptions; -import io.grpc.ClientCall; -import io.grpc.MethodDescriptor; +import io.grpc.*; import io.grpc.stub.ClientCalls; import io.grpc.stub.MetadataUtils; import io.grpc.stub.StreamObserver; +import org.apache.commons.lang3.StringUtils; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * The Grpc client. @@ -24,7 +20,7 @@ public class GrpcClient { private final GrpcSourceConfig grpcConfig; - private Channel decoratedChannel; + private ManagedChannel decoratedChannel; /** * Instantiates a new Grpc client. @@ -39,19 +35,27 @@ public GrpcClient(GrpcSourceConfig grpcConfig) { * Add channel. */ public void addChannel() { - Channel channel = ManagedChannelBuilder.forAddress(grpcConfig.getEndpoint(), grpcConfig.getServicePort()).usePlaintext().build(); - - Metadata metadata = new Metadata(); + ManagedChannelBuilder channelBuilder = ManagedChannelBuilder.forAddress(grpcConfig.getEndpoint(), grpcConfig.getServicePort()).usePlaintext(); + channelBuilder = decorateManagedChannelBuilder(channelBuilder); + decoratedChannel = channelBuilder.build(); + } + protected ManagedChannelBuilder decorateManagedChannelBuilder(ManagedChannelBuilder channelBuilder){ + if(StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeMs())){ + channelBuilder = channelBuilder.keepAliveTime(Long.parseLong(grpcConfig.getGrpcArgKeepaliveTimeMs()), TimeUnit.MILLISECONDS); + } + if(StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeoutMs())){ + channelBuilder = channelBuilder.keepAliveTimeout(Long.parseLong(grpcConfig.getGrpcArgKeepaliveTimeoutMs()), TimeUnit.MILLISECONDS); + } if (grpcConfig.getHeaders() != null && !grpcConfig.getHeaders().isEmpty()) { + Metadata metadata = new Metadata(); for (Map.Entry header : grpcConfig.getHeaders().entrySet()) { metadata.put(Metadata.Key.of(header.getKey(), Metadata.ASCII_STRING_MARSHALLER), header.getValue()); } + channelBuilder.intercept(MetadataUtils.newAttachHeadersInterceptor(metadata)); } - decoratedChannel = ClientInterceptors.intercept(channel, - MetadataUtils.newAttachHeadersInterceptor(metadata)); - + return channelBuilder; } /** @@ -89,6 +93,10 @@ private ClientCall createCall(CallOptions callOp * Close channel. */ public void close() { + if(decoratedChannel !=null && decoratedChannel.isShutdown()) + { + decoratedChannel.shutdown(); + } this.decoratedChannel = null; } diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClientTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClientTest.java index b3d4078de..023449073 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClientTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClientTest.java @@ -1,27 +1,43 @@ package com.gotocompany.dagger.core.processors.external.grpc.client; +import com.gotocompany.dagger.common.exceptions.DescriptorNotFoundException; +import com.gotocompany.dagger.core.processors.common.OutputMapping; +import com.gotocompany.dagger.core.processors.external.grpc.GrpcSourceConfigBuilder; import io.grpc.Channel; import com.gotocompany.dagger.core.processors.external.grpc.GrpcSourceConfig; +import io.grpc.ManagedChannelBuilder; +import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; + +import java.util.HashMap; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.times; +import static org.mockito.MockitoAnnotations.initMocks; public class GrpcClientTest { + @Mock private GrpcSourceConfig grpcSourceConfig; + @Mock + ManagedChannelBuilder channelBuilder; + + @Before + public void setUp() { + initMocks(this); + when(grpcSourceConfig.getEndpoint()).thenReturn("localhost"); + when(grpcSourceConfig.getServicePort()).thenReturn(8080); + } @Test public void channelShouldBeAddedForAHostAndPort() { - grpcSourceConfig = mock(GrpcSourceConfig.class); - GrpcClient grpcClient = new GrpcClient(grpcSourceConfig); - when(grpcSourceConfig.getEndpoint()).thenReturn("localhost"); - when(grpcSourceConfig.getServicePort()).thenReturn(8080); - grpcClient.addChannel(); Channel decoratedChannel = grpcClient.getDecoratedChannel(); @@ -30,14 +46,39 @@ public void channelShouldBeAddedForAHostAndPort() { } @Test - public void grpcClientCloseShouldWork() { + public void channelBuilderShouldNotBeDecoratedWithKeepaliveORTimeoutMS() { + GrpcClient grpcClient = new GrpcClient(grpcSourceConfig); + grpcClient.decorateManagedChannelBuilder(channelBuilder); + verify(channelBuilder, never()).keepAliveTime(anyLong(), any()); + verify(channelBuilder, never()).keepAliveTimeout(anyLong(), any()); + } - grpcSourceConfig = mock(GrpcSourceConfig.class); + @Test + public void channelBuilderShouldBeDecoratedWithKeepaliveMS() { + when(grpcSourceConfig.getGrpcArgKeepaliveTimeMs()).thenReturn("1000"); GrpcClient grpcClient = new GrpcClient(grpcSourceConfig); + grpcClient.decorateManagedChannelBuilder(channelBuilder); + verify(channelBuilder, times(1)).keepAliveTime(Long.parseLong("1000"),TimeUnit.MILLISECONDS); + verify(channelBuilder, never()).keepAliveTimeout(anyLong(), any()); + } - when(grpcSourceConfig.getEndpoint()).thenReturn("localhost"); - when(grpcSourceConfig.getServicePort()).thenReturn(8080); + @Test + public void channelBuilderShouldBeDecoratedWithKeepaliveAndTimeOutMS() { + when(grpcSourceConfig.getGrpcArgKeepaliveTimeMs()).thenReturn("1000"); + when(grpcSourceConfig.getGrpcArgKeepaliveTimeoutMs()).thenReturn("100"); + when(channelBuilder.keepAliveTime(anyLong(),any())).thenReturn(channelBuilder); + + GrpcClient grpcClient = new GrpcClient(grpcSourceConfig); + grpcClient.decorateManagedChannelBuilder(channelBuilder); + verify(channelBuilder, times(1)).keepAliveTimeout(Long.parseLong("100"),TimeUnit.MILLISECONDS); + verify(channelBuilder, times(1)).keepAliveTime(Long.parseLong("1000"), TimeUnit.MILLISECONDS); + } + + @Test + public void grpcClientCloseShouldWork() { + + GrpcClient grpcClient = new GrpcClient(grpcSourceConfig); grpcClient.addChannel(); @@ -45,7 +86,7 @@ public void grpcClientCloseShouldWork() { assertNotNull(decoratedChannel); grpcClient.close(); - decoratedChannel = grpcClient.getDecoratedChannel(); + decoratedChannel = grpcClient.getDecoratedChannel(); assertNull(decoratedChannel); } diff --git a/docs/docs/advance/post_processor.md b/docs/docs/advance/post_processor.md index 3c1f8aa5c..a5634cfb9 100644 --- a/docs/docs/advance/post_processor.md +++ b/docs/docs/advance/post_processor.md @@ -666,6 +666,24 @@ The timeout value for gRPC client in ms. - Example value: `5000` - Type: `required` +##### `grpc_arg_keepalive_time_ms` + +The keepalive ping is a way to check if a channel is currently working by sending HTTP2 pings over the transport. It is sent periodically, and if the ping is not acknowledged by the peer within a certain timeout period, the transport is disconnected. Other keepalive configurations are described [here](https://github.com/grpc/grpc/blob/master/doc/keepalive.md). + +This channel argument controls the period (in milliseconds) after which a keepalive ping is sent on the transport. If smaller than 10000, 10000 will be used instead. + +- Example value: `60000` +- Type: `optional` +- Default value: `infinite` + +##### `grpc_arg_keepalive_timeout_ms` + +This channel argument controls the amount of time (in milliseconds) the sender of the keepalive ping waits for an acknowledgement. If it does not receive an acknowledgment within this time, it will close the connection. + +- Example value: `5000` +- Type: `optional` +- Default value: `20000` + ##### `fail_on_errors` A flag for deciding whether the job should fail on encountering errors or not. If set false the job won’t fail and enrich with empty fields otherwise the job will fail. From 445d7d858d1dd0baa45c842fc5431ff647da6302 Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Tue, 29 Aug 2023 12:22:11 +0530 Subject: [PATCH 2/7] chore: add keepalive and keepalive timeout for grpc external processor --- config/checkstyle/checkstyle.xml | 2 +- .../external/grpc/GrpcSourceConfig.java | 2 +- .../external/grpc/client/GrpcClient.java | 17 +++++++++++------ .../external/grpc/client/GrpcClientTest.java | 13 +++++-------- 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/config/checkstyle/checkstyle.xml b/config/checkstyle/checkstyle.xml index dad0a53d8..4074eed88 100644 --- a/config/checkstyle/checkstyle.xml +++ b/config/checkstyle/checkstyle.xml @@ -95,7 +95,7 @@ - + diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfig.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfig.java index 2f8a337ff..7633527b4 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfig.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfig.java @@ -85,7 +85,7 @@ public GrpcSourceConfig(String endpoint, int servicePort, String grpcRequestProt * @param metricId the metric id * @param capacity the capacity */ - public GrpcSourceConfig(String endpoint, int servicePort, String grpcRequestProtoSchema, String grpcResponseProtoSchema, String grpcMethodUrl, String grpcArgKeepaliveTimeMs, String grpcArgKeepaliveTimeoutMs , String requestPattern, String requestVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String grpcStencilUrl, String type, boolean retainResponseType, Map headers, Map outputMapping, String metricId, int capacity) { + public GrpcSourceConfig(String endpoint, int servicePort, String grpcRequestProtoSchema, String grpcResponseProtoSchema, String grpcMethodUrl, String grpcArgKeepaliveTimeMs, String grpcArgKeepaliveTimeoutMs, String requestPattern, String requestVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String grpcStencilUrl, String type, boolean retainResponseType, Map headers, Map outputMapping, String metricId, int capacity) { this.endpoint = endpoint; this.servicePort = servicePort; this.grpcRequestProtoSchema = grpcRequestProtoSchema; diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java index 14a2c1f50..9b44a444a 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java @@ -5,7 +5,13 @@ import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.DynamicMessage; -import io.grpc.*; +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.Channel; +import io.grpc.MethodDescriptor; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; import io.grpc.stub.ClientCalls; import io.grpc.stub.MetadataUtils; import io.grpc.stub.StreamObserver; @@ -40,11 +46,11 @@ public void addChannel() { decoratedChannel = channelBuilder.build(); } - protected ManagedChannelBuilder decorateManagedChannelBuilder(ManagedChannelBuilder channelBuilder){ - if(StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeMs())){ + protected ManagedChannelBuilder decorateManagedChannelBuilder(ManagedChannelBuilder channelBuilder) { + if (StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeMs())) { channelBuilder = channelBuilder.keepAliveTime(Long.parseLong(grpcConfig.getGrpcArgKeepaliveTimeMs()), TimeUnit.MILLISECONDS); } - if(StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeoutMs())){ + if (StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeoutMs())) { channelBuilder = channelBuilder.keepAliveTimeout(Long.parseLong(grpcConfig.getGrpcArgKeepaliveTimeoutMs()), TimeUnit.MILLISECONDS); } if (grpcConfig.getHeaders() != null && !grpcConfig.getHeaders().isEmpty()) { @@ -93,8 +99,7 @@ private ClientCall createCall(CallOptions callOp * Close channel. */ public void close() { - if(decoratedChannel !=null && decoratedChannel.isShutdown()) - { + if (decoratedChannel != null && decoratedChannel.isShutdown()) { decoratedChannel.shutdown(); } this.decoratedChannel = null; diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClientTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClientTest.java index 023449073..175ca90cc 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClientTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClientTest.java @@ -1,8 +1,5 @@ package com.gotocompany.dagger.core.processors.external.grpc.client; -import com.gotocompany.dagger.common.exceptions.DescriptorNotFoundException; -import com.gotocompany.dagger.core.processors.common.OutputMapping; -import com.gotocompany.dagger.core.processors.external.grpc.GrpcSourceConfigBuilder; import io.grpc.Channel; import com.gotocompany.dagger.core.processors.external.grpc.GrpcSourceConfig; import io.grpc.ManagedChannelBuilder; @@ -10,7 +7,6 @@ import org.junit.Test; import org.mockito.Mock; -import java.util.HashMap; import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; @@ -23,8 +19,9 @@ public class GrpcClientTest { @Mock private GrpcSourceConfig grpcSourceConfig; + @Mock - ManagedChannelBuilder channelBuilder; + private ManagedChannelBuilder channelBuilder; @Before public void setUp() { @@ -59,7 +56,7 @@ public void channelBuilderShouldBeDecoratedWithKeepaliveMS() { GrpcClient grpcClient = new GrpcClient(grpcSourceConfig); grpcClient.decorateManagedChannelBuilder(channelBuilder); - verify(channelBuilder, times(1)).keepAliveTime(Long.parseLong("1000"),TimeUnit.MILLISECONDS); + verify(channelBuilder, times(1)).keepAliveTime(Long.parseLong("1000"), TimeUnit.MILLISECONDS); verify(channelBuilder, never()).keepAliveTimeout(anyLong(), any()); } @@ -67,11 +64,11 @@ public void channelBuilderShouldBeDecoratedWithKeepaliveMS() { public void channelBuilderShouldBeDecoratedWithKeepaliveAndTimeOutMS() { when(grpcSourceConfig.getGrpcArgKeepaliveTimeMs()).thenReturn("1000"); when(grpcSourceConfig.getGrpcArgKeepaliveTimeoutMs()).thenReturn("100"); - when(channelBuilder.keepAliveTime(anyLong(),any())).thenReturn(channelBuilder); + when(channelBuilder.keepAliveTime(anyLong(), any())).thenReturn(channelBuilder); GrpcClient grpcClient = new GrpcClient(grpcSourceConfig); grpcClient.decorateManagedChannelBuilder(channelBuilder); - verify(channelBuilder, times(1)).keepAliveTimeout(Long.parseLong("100"),TimeUnit.MILLISECONDS); + verify(channelBuilder, times(1)).keepAliveTimeout(Long.parseLong("100"), TimeUnit.MILLISECONDS); verify(channelBuilder, times(1)).keepAliveTime(Long.parseLong("1000"), TimeUnit.MILLISECONDS); } From 09e2551ae0b46aacefc5c2432a862d9a770f4b96 Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Thu, 31 Aug 2023 19:11:28 +0530 Subject: [PATCH 3/7] fix: grpc channel shutdown and reformation --- .../external/grpc/GrpcSourceConfig.java | 38 +++++++++---------- .../grpc/GrpcSourceConfigBuilder.java | 8 ++-- .../external/grpc/client/GrpcClient.java | 2 +- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfig.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfig.java index 7633527b4..3d7e1f1e4 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfig.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfig.java @@ -65,25 +65,25 @@ public GrpcSourceConfig(String endpoint, int servicePort, String grpcRequestProt /** * Instantiates a new Grpc source config with specified grpc stencil url. * - * @param endpoint the endpoint - * @param servicePort the service port - * @param grpcRequestProtoSchema the grpc request proto schema - * @param grpcResponseProtoSchema the grpc response proto schema - * @param grpcMethodUrl the grpc method url - * @param requestPattern the request pattern - * @param grpcArgKeepaliveTimeMs the grpc Keepalive Time ms - * @param grpcArgKeepaliveTimeoutMs the grpc Keepalive Timeout ms - * @param requestVariables the request variables - * @param streamTimeout the stream timeout - * @param connectTimeout the connect timeout - * @param failOnErrors the fail on errors - * @param grpcStencilUrl the grpc stencil url - * @param type the type - * @param retainResponseType the retain response type - * @param headers the headers - * @param outputMapping the output mapping - * @param metricId the metric id - * @param capacity the capacity + * @param endpoint the endpoint + * @param servicePort the service port + * @param grpcRequestProtoSchema the grpc request proto schema + * @param grpcResponseProtoSchema the grpc response proto schema + * @param grpcMethodUrl the grpc method url + * @param requestPattern the request pattern + * @param grpcArgKeepaliveTimeMs the grpc Keepalive Time ms + * @param grpcArgKeepaliveTimeoutMs the grpc Keepalive Timeout ms + * @param requestVariables the request variables + * @param streamTimeout the stream timeout + * @param connectTimeout the connect timeout + * @param failOnErrors the fail on errors + * @param grpcStencilUrl the grpc stencil url + * @param type the type + * @param retainResponseType the retain response type + * @param headers the headers + * @param outputMapping the output mapping + * @param metricId the metric id + * @param capacity the capacity */ public GrpcSourceConfig(String endpoint, int servicePort, String grpcRequestProtoSchema, String grpcResponseProtoSchema, String grpcMethodUrl, String grpcArgKeepaliveTimeMs, String grpcArgKeepaliveTimeoutMs, String requestPattern, String requestVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String grpcStencilUrl, String type, boolean retainResponseType, Map headers, Map outputMapping, String metricId, int capacity) { this.endpoint = endpoint; diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfigBuilder.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfigBuilder.java index fc260b70f..da9393926 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfigBuilder.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfigBuilder.java @@ -11,7 +11,7 @@ public class GrpcSourceConfigBuilder { private String grpcResponseProtoSchema; private String grpcMethodUrl; private String grpcArgKeepaliveTimeMs; - private String grpcArgKeepaliveTimeOutMs; + private String grpcArgKeepaliveTimeoutMs; private String requestPattern; private String requestVariables; private Map outputMapping; @@ -115,13 +115,13 @@ public GrpcSourceConfigBuilder setGrpcArgKeepaliveTimeMs(String grpcArgKeepalive return this; } - public GrpcSourceConfigBuilder setGrpcArgKeepaliveTimeOutMs(String grpcArgKeepaliveTimeOutMs) { - this.grpcArgKeepaliveTimeOutMs = grpcArgKeepaliveTimeOutMs; + public GrpcSourceConfigBuilder setGrpcArgKeepaliveTimeoutMs(String grpcArgKeepaliveTimeoutMs) { + this.grpcArgKeepaliveTimeoutMs = grpcArgKeepaliveTimeoutMs; return this; } public GrpcSourceConfig createGrpcSourceConfig() { - return new GrpcSourceConfig(endpoint, servicePort, grpcRequestProtoSchema, grpcResponseProtoSchema, grpcMethodUrl, grpcArgKeepaliveTimeMs, grpcArgKeepaliveTimeOutMs, requestPattern, requestVariables, + return new GrpcSourceConfig(endpoint, servicePort, grpcRequestProtoSchema, grpcResponseProtoSchema, grpcMethodUrl, grpcArgKeepaliveTimeMs, grpcArgKeepaliveTimeoutMs, requestPattern, requestVariables, streamTimeout, connectTimeout, failOnErrors, grpcStencilUrl, type, retainResponseType, headers, outputMapping, metricId, capacity); } } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java index 9b44a444a..9c89041f8 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java @@ -99,7 +99,7 @@ private ClientCall createCall(CallOptions callOp * Close channel. */ public void close() { - if (decoratedChannel != null && decoratedChannel.isShutdown()) { + if (decoratedChannel != null && !decoratedChannel.isShutdown()) { decoratedChannel.shutdown(); } this.decoratedChannel = null; From d3da38790f60790227d0db50ef95190131392105 Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Thu, 31 Aug 2023 19:19:01 +0530 Subject: [PATCH 4/7] fix: rename grpcArgKeepaliveTimeOutMs to grpcArgKeepaliveTimeoutMs --- .../core/processors/external/grpc/GrpcSourceConfig.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfig.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfig.java index 3d7e1f1e4..2a88745ae 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfig.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/GrpcSourceConfig.java @@ -245,10 +245,10 @@ public void setGrpcArgKeepaliveTimeMs(String grpcArgKeepaliveTimeMs) { /** * Sets grpc arg keepalive timeout ms. * - * @param grpcArgKeepaliveTimeOutMs the grpc arg keepalive timeout ms + * @param grpcArgKeepaliveTimeoutMs the grpc arg keepalive timeout ms */ - public void setGrpcArgKeepaliveTimeOutMs(String grpcArgKeepaliveTimeOutMs) { - this.grpcArgKeepaliveTimeoutMs = grpcArgKeepaliveTimeOutMs; + public void setGrpcArgKeepaliveTimeoutMs(String grpcArgKeepaliveTimeoutMs) { + this.grpcArgKeepaliveTimeoutMs = grpcArgKeepaliveTimeoutMs; } /** From dfc83d146a7d43d1cb3c86d7aea51c9c24eeba33 Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Mon, 4 Dec 2023 12:14:21 +0530 Subject: [PATCH 5/7] fix: added default value for keepalive and keepalivetimeout --- .../external/grpc/client/GrpcClient.java | 16 ++++++++++------ .../external/grpc/client/GrpcClientTest.java | 18 ------------------ 2 files changed, 10 insertions(+), 24 deletions(-) diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java index 9c89041f8..88263dc97 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java @@ -28,6 +28,10 @@ public class GrpcClient { private ManagedChannel decoratedChannel; + private final long defaultKeepAliveTimeout = 20000L; + + private final long defaultKeepAliveInterval = Long.MAX_VALUE; + /** * Instantiates a new Grpc client. * @@ -47,12 +51,12 @@ public void addChannel() { } protected ManagedChannelBuilder decorateManagedChannelBuilder(ManagedChannelBuilder channelBuilder) { - if (StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeMs())) { - channelBuilder = channelBuilder.keepAliveTime(Long.parseLong(grpcConfig.getGrpcArgKeepaliveTimeMs()), TimeUnit.MILLISECONDS); - } - if (StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeoutMs())) { - channelBuilder = channelBuilder.keepAliveTimeout(Long.parseLong(grpcConfig.getGrpcArgKeepaliveTimeoutMs()), TimeUnit.MILLISECONDS); - } + + long keepAliveInterval = StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeMs()) ? Long.parseLong(grpcConfig.getGrpcArgKeepaliveTimeMs()): defaultKeepAliveInterval; + long keepAliveTimeout = StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeoutMs()) ? Long.parseLong(grpcConfig.getGrpcArgKeepaliveTimeoutMs()): defaultKeepAliveTimeout; + + channelBuilder = channelBuilder.keepAliveTime(keepAliveInterval, TimeUnit.MILLISECONDS).keepAliveTimeout(keepAliveTimeout, TimeUnit.MILLISECONDS); + if (grpcConfig.getHeaders() != null && !grpcConfig.getHeaders().isEmpty()) { Metadata metadata = new Metadata(); for (Map.Entry header : grpcConfig.getHeaders().entrySet()) { diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClientTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClientTest.java index 175ca90cc..50295a4bb 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClientTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClientTest.java @@ -42,24 +42,6 @@ public void channelShouldBeAddedForAHostAndPort() { } - @Test - public void channelBuilderShouldNotBeDecoratedWithKeepaliveORTimeoutMS() { - GrpcClient grpcClient = new GrpcClient(grpcSourceConfig); - grpcClient.decorateManagedChannelBuilder(channelBuilder); - verify(channelBuilder, never()).keepAliveTime(anyLong(), any()); - verify(channelBuilder, never()).keepAliveTimeout(anyLong(), any()); - } - - @Test - public void channelBuilderShouldBeDecoratedWithKeepaliveMS() { - when(grpcSourceConfig.getGrpcArgKeepaliveTimeMs()).thenReturn("1000"); - - GrpcClient grpcClient = new GrpcClient(grpcSourceConfig); - grpcClient.decorateManagedChannelBuilder(channelBuilder); - verify(channelBuilder, times(1)).keepAliveTime(Long.parseLong("1000"), TimeUnit.MILLISECONDS); - verify(channelBuilder, never()).keepAliveTimeout(anyLong(), any()); - } - @Test public void channelBuilderShouldBeDecoratedWithKeepaliveAndTimeOutMS() { when(grpcSourceConfig.getGrpcArgKeepaliveTimeMs()).thenReturn("1000"); From 55556972eef69cbe8dba1eeb72a964048fd78251 Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Mon, 4 Dec 2023 12:35:24 +0530 Subject: [PATCH 6/7] fix: checkstyle fix --- .../core/processors/external/grpc/client/GrpcClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java index 88263dc97..8b6b70764 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/external/grpc/client/GrpcClient.java @@ -52,8 +52,8 @@ public void addChannel() { protected ManagedChannelBuilder decorateManagedChannelBuilder(ManagedChannelBuilder channelBuilder) { - long keepAliveInterval = StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeMs()) ? Long.parseLong(grpcConfig.getGrpcArgKeepaliveTimeMs()): defaultKeepAliveInterval; - long keepAliveTimeout = StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeoutMs()) ? Long.parseLong(grpcConfig.getGrpcArgKeepaliveTimeoutMs()): defaultKeepAliveTimeout; + long keepAliveInterval = StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeMs()) ? Long.parseLong(grpcConfig.getGrpcArgKeepaliveTimeMs()) : defaultKeepAliveInterval; + long keepAliveTimeout = StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeoutMs()) ? Long.parseLong(grpcConfig.getGrpcArgKeepaliveTimeoutMs()) : defaultKeepAliveTimeout; channelBuilder = channelBuilder.keepAliveTime(keepAliveInterval, TimeUnit.MILLISECONDS).keepAliveTimeout(keepAliveTimeout, TimeUnit.MILLISECONDS); From 27fe1c5ae0d9692d43d377eb0a6c7e5d2a17307c Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Mon, 15 Apr 2024 10:18:01 +0530 Subject: [PATCH 7/7] fix: bump up version --- version.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.txt b/version.txt index 571215736..5eef0f10e 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.10.1 +0.10.2