Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add keepalive time and keepalive timeout for grpc external processor #23

Merged
merged 7 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
<!-- See http://checkstyle.sf.net/config_sizes.html -->
<module name="MethodLength"/>
<module name="ParameterNumber">
<property name="max" value="17"/>
<property name="max" value="19"/>
</module>

<!-- Checks for whitespace -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class GrpcSourceConfig implements Serializable, SourceConfig {
private String grpcMethodUrl;
private String requestPattern;
private String requestVariables;
private String grpcArgKeepaliveTimeMs;
private String grpcArgKeepaliveTimeoutMs;
mayankrai09 marked this conversation as resolved.
Show resolved Hide resolved
private String streamTimeout;
private String connectTimeout;
private boolean failOnErrors;
Expand Down Expand Up @@ -63,30 +65,34 @@ public GrpcSourceConfig(String endpoint, int servicePort, String grpcRequestProt
/**
* Instantiates a new Grpc source config with specified grpc stencil url.
*
* @param endpoint the endpoint
* @param servicePort the service port
* @param grpcRequestProtoSchema the grpc request proto schema
* @param grpcResponseProtoSchema the grpc response proto schema
* @param grpcMethodUrl the grpc method url
* @param requestPattern the request pattern
* @param requestVariables the request variables
* @param streamTimeout the stream timeout
* @param connectTimeout the connect timeout
* @param failOnErrors the fail on errors
* @param grpcStencilUrl the grpc stencil url
* @param type the type
* @param retainResponseType the retain response type
* @param headers the headers
* @param outputMapping the output mapping
* @param metricId the metric id
* @param capacity the capacity
* @param endpoint the endpoint
* @param servicePort the service port
* @param grpcRequestProtoSchema the grpc request proto schema
* @param grpcResponseProtoSchema the grpc response proto schema
* @param grpcMethodUrl the grpc method url
* @param requestPattern the request pattern
* @param grpcArgKeepaliveTimeMs the grpc Keepalive Time ms
* @param grpcArgKeepaliveTimeoutMs the grpc Keepalive Timeout ms
* @param requestVariables the request variables
* @param streamTimeout the stream timeout
* @param connectTimeout the connect timeout
* @param failOnErrors the fail on errors
* @param grpcStencilUrl the grpc stencil url
* @param type the type
* @param retainResponseType the retain response type
* @param headers the headers
* @param outputMapping the output mapping
* @param metricId the metric id
* @param capacity the capacity
*/
public GrpcSourceConfig(String endpoint, int servicePort, String grpcRequestProtoSchema, String grpcResponseProtoSchema, String grpcMethodUrl, String requestPattern, String requestVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String grpcStencilUrl, String type, boolean retainResponseType, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, int capacity) {
public GrpcSourceConfig(String endpoint, int servicePort, String grpcRequestProtoSchema, String grpcResponseProtoSchema, String grpcMethodUrl, String grpcArgKeepaliveTimeMs, String grpcArgKeepaliveTimeoutMs, String requestPattern, String requestVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String grpcStencilUrl, String type, boolean retainResponseType, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, int capacity) {
this.endpoint = endpoint;
this.servicePort = servicePort;
this.grpcRequestProtoSchema = grpcRequestProtoSchema;
this.grpcResponseProtoSchema = grpcResponseProtoSchema;
this.grpcMethodUrl = grpcMethodUrl;
this.grpcArgKeepaliveTimeMs = grpcArgKeepaliveTimeMs;
this.grpcArgKeepaliveTimeoutMs = grpcArgKeepaliveTimeoutMs;
this.requestPattern = requestPattern;
this.requestVariables = requestVariables;
this.streamTimeout = streamTimeout;
Expand Down Expand Up @@ -209,6 +215,42 @@ public String getGrpcMethodUrl() {
return grpcMethodUrl;
}

/**
* Gets grpc arg keepalive time ms.
*
* @return grpc arg keepalive time ms
*/
public String getGrpcArgKeepaliveTimeMs() {
return grpcArgKeepaliveTimeMs;
}

/**
* Gets grpc arg keepalive timeout ms.
*
* @return grpc arg keepalive timeout ms
*/
public String getGrpcArgKeepaliveTimeoutMs() {
return grpcArgKeepaliveTimeoutMs;
}

/**
* Sets grpc arg keepalive time ms.
*
* @param grpcArgKeepaliveTimeMs the grpc arg keepalive time ms
*/
public void setGrpcArgKeepaliveTimeMs(String grpcArgKeepaliveTimeMs) {
this.grpcArgKeepaliveTimeMs = grpcArgKeepaliveTimeMs;
}

/**
* Sets grpc arg keepalive timeout ms.
*
* @param grpcArgKeepaliveTimeoutMs the grpc arg keepalive timeout ms
*/
public void setGrpcArgKeepaliveTimeoutMs(String grpcArgKeepaliveTimeoutMs) {
this.grpcArgKeepaliveTimeoutMs = grpcArgKeepaliveTimeoutMs;
}

/**
* Gets service port.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public class GrpcSourceConfigBuilder {
private String grpcRequestProtoSchema;
private String grpcResponseProtoSchema;
private String grpcMethodUrl;
private String grpcArgKeepaliveTimeMs;
private String grpcArgKeepaliveTimeoutMs;
private String requestPattern;
private String requestVariables;
private Map<String, OutputMapping> outputMapping;
Expand Down Expand Up @@ -108,8 +110,18 @@ public GrpcSourceConfigBuilder setCapacity(int capacity) {
return this;
}

public GrpcSourceConfigBuilder setGrpcArgKeepaliveTimeMs(String grpcArgKeepaliveTimeMs) {
this.grpcArgKeepaliveTimeMs = grpcArgKeepaliveTimeMs;
return this;
}

public GrpcSourceConfigBuilder setGrpcArgKeepaliveTimeoutMs(String grpcArgKeepaliveTimeoutMs) {
this.grpcArgKeepaliveTimeoutMs = grpcArgKeepaliveTimeoutMs;
return this;
}

public GrpcSourceConfig createGrpcSourceConfig() {
return new GrpcSourceConfig(endpoint, servicePort, grpcRequestProtoSchema, grpcResponseProtoSchema, grpcMethodUrl, requestPattern, requestVariables,
return new GrpcSourceConfig(endpoint, servicePort, grpcRequestProtoSchema, grpcResponseProtoSchema, grpcMethodUrl, grpcArgKeepaliveTimeMs, grpcArgKeepaliveTimeoutMs, requestPattern, requestVariables,
streamTimeout, connectTimeout, failOnErrors, grpcStencilUrl, type, retainResponseType, headers, outputMapping, metricId, capacity);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,32 @@
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;

import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.ClientInterceptors;
import io.grpc.Metadata;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Channel;
import io.grpc.MethodDescriptor;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import org.apache.commons.lang3.StringUtils;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* The Grpc client.
*/
public class GrpcClient {
private final GrpcSourceConfig grpcConfig;

private Channel decoratedChannel;
private ManagedChannel decoratedChannel;

private final long defaultKeepAliveTimeout = 20000L;

private final long defaultKeepAliveInterval = Long.MAX_VALUE;

/**
* Instantiates a new Grpc client.
Expand All @@ -39,19 +45,27 @@ public GrpcClient(GrpcSourceConfig grpcConfig) {
* Add channel.
*/
public void addChannel() {
Channel channel = ManagedChannelBuilder.forAddress(grpcConfig.getEndpoint(), grpcConfig.getServicePort()).usePlaintext().build();
ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(grpcConfig.getEndpoint(), grpcConfig.getServicePort()).usePlaintext();
channelBuilder = decorateManagedChannelBuilder(channelBuilder);
decoratedChannel = channelBuilder.build();
}

protected ManagedChannelBuilder<?> decorateManagedChannelBuilder(ManagedChannelBuilder<?> channelBuilder) {

Metadata metadata = new Metadata();
long keepAliveInterval = StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeMs()) ? Long.parseLong(grpcConfig.getGrpcArgKeepaliveTimeMs()) : defaultKeepAliveInterval;
long keepAliveTimeout = StringUtils.isNotEmpty(grpcConfig.getGrpcArgKeepaliveTimeoutMs()) ? Long.parseLong(grpcConfig.getGrpcArgKeepaliveTimeoutMs()) : defaultKeepAliveTimeout;

channelBuilder = channelBuilder.keepAliveTime(keepAliveInterval, TimeUnit.MILLISECONDS).keepAliveTimeout(keepAliveTimeout, TimeUnit.MILLISECONDS);

if (grpcConfig.getHeaders() != null && !grpcConfig.getHeaders().isEmpty()) {
Metadata metadata = new Metadata();
for (Map.Entry<String, String> header : grpcConfig.getHeaders().entrySet()) {
metadata.put(Metadata.Key.of(header.getKey(), Metadata.ASCII_STRING_MARSHALLER), header.getValue());
}
channelBuilder.intercept(MetadataUtils.newAttachHeadersInterceptor(metadata));
}
decoratedChannel = ClientInterceptors.intercept(channel,
MetadataUtils.newAttachHeadersInterceptor(metadata));


return channelBuilder;
}

/**
Expand Down Expand Up @@ -89,6 +103,9 @@ private ClientCall<DynamicMessage, DynamicMessage> createCall(CallOptions callOp
* Close channel.
*/
public void close() {
if (decoratedChannel != null && !decoratedChannel.isShutdown()) {
decoratedChannel.shutdown();
}
this.decoratedChannel = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,39 @@

import io.grpc.Channel;
import com.gotocompany.dagger.core.processors.external.grpc.GrpcSourceConfig;
import io.grpc.ManagedChannelBuilder;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.times;
import static org.mockito.MockitoAnnotations.initMocks;

public class GrpcClientTest {

@Mock
private GrpcSourceConfig grpcSourceConfig;

@Mock
private ManagedChannelBuilder channelBuilder;

@Before
public void setUp() {
initMocks(this);
when(grpcSourceConfig.getEndpoint()).thenReturn("localhost");
when(grpcSourceConfig.getServicePort()).thenReturn(8080);
}

@Test
public void channelShouldBeAddedForAHostAndPort() {

grpcSourceConfig = mock(GrpcSourceConfig.class);

GrpcClient grpcClient = new GrpcClient(grpcSourceConfig);

when(grpcSourceConfig.getEndpoint()).thenReturn("localhost");
when(grpcSourceConfig.getServicePort()).thenReturn(8080);

grpcClient.addChannel();

Channel decoratedChannel = grpcClient.getDecoratedChannel();
Expand All @@ -30,22 +43,29 @@ public void channelShouldBeAddedForAHostAndPort() {
}

@Test
public void grpcClientCloseShouldWork() {

grpcSourceConfig = mock(GrpcSourceConfig.class);
public void channelBuilderShouldBeDecoratedWithKeepaliveAndTimeOutMS() {
when(grpcSourceConfig.getGrpcArgKeepaliveTimeMs()).thenReturn("1000");
when(grpcSourceConfig.getGrpcArgKeepaliveTimeoutMs()).thenReturn("100");
when(channelBuilder.keepAliveTime(anyLong(), any())).thenReturn(channelBuilder);

GrpcClient grpcClient = new GrpcClient(grpcSourceConfig);
grpcClient.decorateManagedChannelBuilder(channelBuilder);
verify(channelBuilder, times(1)).keepAliveTimeout(Long.parseLong("100"), TimeUnit.MILLISECONDS);
verify(channelBuilder, times(1)).keepAliveTime(Long.parseLong("1000"), TimeUnit.MILLISECONDS);
}

when(grpcSourceConfig.getEndpoint()).thenReturn("localhost");
when(grpcSourceConfig.getServicePort()).thenReturn(8080);
@Test
public void grpcClientCloseShouldWork() {

GrpcClient grpcClient = new GrpcClient(grpcSourceConfig);

grpcClient.addChannel();

Channel decoratedChannel = grpcClient.getDecoratedChannel();
assertNotNull(decoratedChannel);

grpcClient.close();
decoratedChannel = grpcClient.getDecoratedChannel();
decoratedChannel = grpcClient.getDecoratedChannel();
assertNull(decoratedChannel);

}
Expand Down
18 changes: 18 additions & 0 deletions docs/docs/advance/post_processor.md
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,24 @@ The timeout value for gRPC client in ms.
- Example value: `5000`
- Type: `required`

##### `grpc_arg_keepalive_time_ms`

The keepalive ping is a way to check if a channel is currently working by sending HTTP2 pings over the transport. It is sent periodically, and if the ping is not acknowledged by the peer within a certain timeout period, the transport is disconnected. Other keepalive configurations are described [here](https://github.com/grpc/grpc/blob/master/doc/keepalive.md).

This channel argument controls the period (in milliseconds) after which a keepalive ping is sent on the transport. If smaller than 10000, 10000 will be used instead.

- Example value: `60000`
- Type: `optional`
- Default value: `infinite`

##### `grpc_arg_keepalive_timeout_ms`

This channel argument controls the amount of time (in milliseconds) the sender of the keepalive ping waits for an acknowledgement. If it does not receive an acknowledgment within this time, it will close the connection.

- Example value: `5000`
- Type: `optional`
- Default value: `20000`

##### `fail_on_errors`

A flag for deciding whether the job should fail on encountering errors or not. If set false the job won’t fail and enrich with empty fields otherwise the job will fail.
Expand Down
2 changes: 1 addition & 1 deletion version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.10.1
0.10.2
Loading