Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Retryable Configuration for GRPC Sink (using CEL) #44

Merged
merged 39 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
a186c50
[feat] Add CEL evaluator and add it to grpc sink
ekawinataa Jul 4, 2024
c9a6ac0
Remove blank check
ekawinataa Jul 4, 2024
0cbe4fa
remove unintended change
ekawinataa Jul 4, 2024
2b46bbb
[feat] add success field checking
ekawinataa Jul 5, 2024
7cf9caf
[feat] add evaluator method
ekawinataa Jul 5, 2024
75a5b4a
[feat] handle descriptor update
ekawinataa Jul 5, 2024
a0a9e7b
Add test for evaluator
ekawinataa Jul 5, 2024
9c0eafa
Update test
ekawinataa Jul 5, 2024
9c56c64
Update SINK_GRPC_RESPONSE_RETRY_CEL_EXPRESSION default value
ekawinataa Jul 5, 2024
764d0dd
Add test for GrpcSink
ekawinataa Jul 7, 2024
91ebabc
Rename descriptor to payloadDescriptor
ekawinataa Jul 7, 2024
2b4c984
Checkstyle update
ekawinataa Jul 7, 2024
927ccc3
Refactor instantiation logic to separate method
ekawinataa Jul 8, 2024
b350110
Remove schema refresh
ekawinataa Jul 8, 2024
8485e9d
Remove projectnessie and use implementation from cel-java
ekawinataa Jul 8, 2024
949dab9
update checkstyle
ekawinataa Jul 8, 2024
e4f4080
Add comment
ekawinataa Jul 8, 2024
70d3a5e
Update docs
ekawinataa Jul 8, 2024
44e3ee6
Move the evaluator instantiation to factory method
ekawinataa Jul 8, 2024
188d258
Remove unused sink config
ekawinataa Jul 9, 2024
3c33f10
Add more testcases
ekawinataa Jul 9, 2024
9f994e7
revert protoc version
ekawinataa Jul 9, 2024
61bc693
Add more test cases
ekawinataa Jul 9, 2024
cea966e
Add more comprehensive documentation
ekawinataa Jul 9, 2024
08adb94
Rename default class and update docs
ekawinataa Jul 9, 2024
a22d7ab
Refactor typical cel functionality to util class
ekawinataa Jul 12, 2024
e963eba
Add checking for expression result
ekawinataa Jul 15, 2024
93590c8
Use built in UnsupportedOperationException
ekawinataa Jul 18, 2024
b7f8cec
Update build-info-extractor
ekawinataa Jul 18, 2024
6283b9c
Update to classpath("org.jfrog.buildinfo:build-info-extractor-gradle:…
ekawinataa Jul 18, 2024
3caa4c0
Remove OperationNotSupportedException.java
ekawinataa Jul 18, 2024
7927c58
Remove jfrog build info on dependencies
ekawinataa Jul 18, 2024
02d39be
- Tidy up tests
ekawinataa Jul 19, 2024
312cc13
Bump version
ekawinataa Jul 19, 2024
6a706cb
Makes error type for retryable error configurable through env
ekawinataa Jul 19, 2024
e3a4c3f
Add 1 more test case
ekawinataa Jul 19, 2024
2faa366
Update default value
ekawinataa Jul 19, 2024
1631890
Resolve conflict
ekawinataa Jul 22, 2024
8f2d33d
Use default value of true on CEL Expression config to retry on defaul…
ekawinataa Jul 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ dependencies {
implementation 'org.apache.logging.log4j:log4j-core:2.20.0'
implementation group: 'com.gotocompany', name: 'depot', version: '0.9.1'
implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'
implementation(enforcedPlatform("org.projectnessie.cel:cel-bom:0.4.4"))
implementation("org.projectnessie.cel:cel-tools")

testImplementation group: 'junit', name: 'junit', version: '4.11'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
Expand All @@ -117,11 +119,11 @@ dependencies {
protobuf {
generatedFilesBaseDir = "$projectDir/src/generated"
protoc {
artifact = "com.google.protobuf:protoc:3.1.0"
artifact = "com.google.protobuf:protoc:3.17.3"
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:1.0.3"
artifact = "io.grpc:protoc-gen-grpc-java:1.60.1"
}
}
generateProtoTasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public interface GrpcSinkConfig extends AppConfig {
@Config.Key("SINK_GRPC_ARG_DEADLINE_MS")
Long getSinkGrpcArgDeadlineMS();

@Config.Key("SINK_GRPC_RESPONSE_RETRY_CEL_EXPRESSION")
@DefaultValue("")
String getSinkGrpcResponseRetryCELExpression();

@Key("SINK_GRPC_METADATA")
@DefaultValue("")
@ConverterClass(GrpcMetadataConverter.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.gotocompany.firehose.evaluator;

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import lombok.extern.slf4j.Slf4j;
import org.projectnessie.cel.checker.Decls;
import org.projectnessie.cel.tools.Script;
import org.projectnessie.cel.tools.ScriptCreateException;
import org.projectnessie.cel.tools.ScriptException;
import org.projectnessie.cel.tools.ScriptHost;

import java.util.HashMap;
import java.util.Map;

@Slf4j
public class GrpcResponseCELPayloadEvaluator implements PayloadEvaluator<Message> {

private final String celExpression;
private Script script;
private Descriptors.Descriptor descriptor;

public GrpcResponseCELPayloadEvaluator(Descriptors.Descriptor descriptor, String celExpression) {
this.celExpression = celExpression;
this.descriptor = descriptor;
this.script = buildScript(descriptor);
}

@Override
public boolean evaluate(Message payload) {
try {
Map<String, Object> arguments = new HashMap<>();
arguments.put(payload.getDescriptorForType().getFullName(), payload);
return getScript(payload.getDescriptorForType()).execute(Boolean.class, arguments);
} catch (ScriptException e) {
throw new IllegalArgumentException(
"Failed to evaluate payload with CEL Expression with reason: " + e.getMessage(), e);
}
}

private Script getScript(Descriptors.Descriptor payloadDescriptor) throws ScriptCreateException {
if (!payloadDescriptor.equals(this.descriptor)) {
synchronized (this) {
if (!payloadDescriptor.equals(this.descriptor)) {
this.script = buildScript(payloadDescriptor);
this.descriptor = payloadDescriptor;
}
}
}
return this.script;
}

private Script buildScript(Descriptors.Descriptor payloadDescriptor) {
try {
log.info("Building new CEL Script");
return ScriptHost.newBuilder()
ekawinataa marked this conversation as resolved.
Show resolved Hide resolved
ekawinataa marked this conversation as resolved.
Show resolved Hide resolved
.build()
.buildScript(this.celExpression)
.withDeclarations(Decls.newVar(payloadDescriptor.getFullName(), Decls.newObjectType(payloadDescriptor.getFullName())))
.withTypes(DynamicMessage.newBuilder(payloadDescriptor).getDefaultInstanceForType())
.build();
} catch (ScriptCreateException e) {
throw new IllegalArgumentException("Failed to build CEL Script due to : " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.gotocompany.firehose.evaluator;

public interface PayloadEvaluator<T> {
boolean evaluate(T payload);
}
34 changes: 31 additions & 3 deletions src/main/java/com/gotocompany/firehose/sink/grpc/GrpcSink.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package com.gotocompany.firehose.sink.grpc;



import com.gotocompany.depot.error.ErrorInfo;
import com.gotocompany.depot.error.ErrorType;
import com.gotocompany.firehose.config.GrpcSinkConfig;
import com.gotocompany.firehose.evaluator.GrpcResponseCELPayloadEvaluator;
import com.gotocompany.firehose.exception.DefaultException;
import com.gotocompany.firehose.exception.DeserializerException;
import com.gotocompany.firehose.message.Message;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
import com.gotocompany.firehose.sink.AbstractSink;
import com.gotocompany.firehose.sink.grpc.client.GrpcClient;
import com.google.protobuf.DynamicMessage;
import com.gotocompany.stencil.client.StencilClient;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -21,13 +26,24 @@
public class GrpcSink extends AbstractSink {

private final GrpcClient grpcClient;
private final StencilClient stencilClient;
private final GrpcSinkConfig grpcSinkConfig;
private List<Message> messages;
private StencilClient stencilClient;
private GrpcResponseCELPayloadEvaluator retryEvaluator;

public GrpcSink(FirehoseInstrumentation firehoseInstrumentation, GrpcClient grpcClient, StencilClient stencilClient) {
public GrpcSink(FirehoseInstrumentation firehoseInstrumentation,
GrpcClient grpcClient,
StencilClient stencilClient,
GrpcSinkConfig grpcSinkConfig) {
super(firehoseInstrumentation, "grpc");
this.grpcClient = grpcClient;
this.stencilClient = stencilClient;
this.grpcSinkConfig = grpcSinkConfig;
if (StringUtils.isNotBlank(grpcSinkConfig.getSinkGrpcResponseRetryCELExpression())) {
this.retryEvaluator = new GrpcResponseCELPayloadEvaluator(
stencilClient.get(grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass()),
ekawinataa marked this conversation as resolved.
Show resolved Hide resolved
grpcSinkConfig.getSinkGrpcResponseRetryCELExpression());
}
}

@Override
Expand All @@ -44,6 +60,9 @@ protected List<Message> execute() throws Exception {
getFirehoseInstrumentation().logWarn("Grpc Service returned error");
failedMessages.add(message);
}
if (StringUtils.isNotBlank(grpcSinkConfig.getSinkGrpcResponseRetryCELExpression())) {
setRetryEvaluatorErrorInfo(message, response);
}
}
getFirehoseInstrumentation().logDebug("Failed messages count: {}", failedMessages.size());
return failedMessages;
Expand All @@ -60,4 +79,13 @@ public void close() throws IOException {
this.messages = new ArrayList<>();
stencilClient.close();
}

private void setRetryEvaluatorErrorInfo(Message message, DynamicMessage dynamicMessage) {
boolean eligibleToRetry = retryEvaluator.evaluate(dynamicMessage);
if (eligibleToRetry) {
message.setErrorInfo(new ErrorInfo(new DefaultException("Retryable gRPC Error"), ErrorType.SINK_RETRYABLE_ERROR));
return;
}
message.setErrorInfo(new ErrorInfo(new DefaultException("Non Retryable gRPC Error"), ErrorType.SINK_NON_RETRYABLE_ERROR));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static AbstractSink create(Map<String, String> configuration, StatsDRepor
GrpcClient grpcClient = new GrpcClient(new FirehoseInstrumentation(statsDReporter, GrpcClient.class), grpcConfig, managedChannel, stencilClient);
firehoseInstrumentation.logInfo("GRPC connection established");

return new GrpcSink(new FirehoseInstrumentation(statsDReporter, GrpcSink.class), grpcClient, stencilClient);
return new GrpcSink(new FirehoseInstrumentation(statsDReporter, GrpcSink.class), grpcClient, stencilClient, grpcConfig);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.gotocompany.firehose.evaluator;

import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import com.gotocompany.firehose.consumer.GenericError;
import com.gotocompany.firehose.consumer.GenericResponse;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

public class GrpcResponseCELPayloadEvaluatorTest {

private static final String CEL_EXPRESSION = "GenericResponse.success == false && GenericResponse.errors.exists(e, e.code == \"400\")";
private PayloadEvaluator<Message> grpcPayloadEvaluator;

@Before
public void setup() {
this.grpcPayloadEvaluator = new GrpcResponseCELPayloadEvaluator(GenericResponse.getDescriptor(), CEL_EXPRESSION);
}

@Test
public void shouldEvaluateResponseToTrueWhenCELExpressionMatchesPayload() {
GenericResponse genericResponse = GenericResponse.newBuilder()
.setSuccess(false)
.setDetail("Detail Message")
.addErrors(GenericError.newBuilder()
.setCode("400")
.setEntity("GoFin")
.setCause("Unknown")
.build())
.build();

boolean result = grpcPayloadEvaluator.evaluate(genericResponse);

Assertions.assertTrue(result);
}

@Test
public void shouldEvaluateResponseToFalseWhenCELExpressionDoesntMatchPayload() {
GenericResponse genericResponse = GenericResponse.newBuilder()
.setSuccess(false)
.setDetail("Detail Message")
.addErrors(GenericError.newBuilder()
.setCode("50000")
.setEntity("GoFin")
.setCause("Unknown")
.build())
.build();

boolean result = grpcPayloadEvaluator.evaluate(genericResponse);

Assertions.assertFalse(result);
}

@Test
public void shouldEvaluateResponseWhenDescriptorUpdated() throws Descriptors.DescriptorValidationException {
Descriptors.Descriptor baseDescriptor = GenericResponse.getDescriptor();
DescriptorProtos.FieldDescriptorProto newFieldProto = DescriptorProtos.FieldDescriptorProto.newBuilder()
.setName("new_field")
.setNumber(4)
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
.build();
DescriptorProtos.DescriptorProto newDescriptorProto = baseDescriptor.toProto().toBuilder()
.addField(newFieldProto)
.build();
Descriptors.FileDescriptor newFileDescriptor = Descriptors.FileDescriptor.buildFrom(DescriptorProtos.FileDescriptorProto
.newBuilder()
.setName("new.proto")
.addMessageType(newDescriptorProto)
.addMessageType(GenericError.getDescriptor().toProto())
.build(), new Descriptors.FileDescriptor[]{});
Descriptors.Descriptor genericResponseDescriptor = newFileDescriptor.findMessageTypeByName("GenericResponse");
DynamicMessage dynamicMessage = DynamicMessage.newBuilder(genericResponseDescriptor)
.setField(genericResponseDescriptor.findFieldByName("success"), false)
.setField(genericResponseDescriptor.findFieldByName("new_field"), "new_field")
.build();

boolean result = grpcPayloadEvaluator.evaluate(dynamicMessage);

Assertions.assertFalse(result);
}

@Test
public void shouldThrowIllegalArgumentExceptionWhenPayloadNotMatchingDescriptor() {
Assertions.assertThrows(IllegalArgumentException.class,
() -> grpcPayloadEvaluator.evaluate(GenericError.newBuilder()
.setCause("Unknown")
.setCode("500")
.setEntity("GoFin")
.build()));
}
}
74 changes: 71 additions & 3 deletions src/test/java/com/gotocompany/firehose/sink/grpc/GrpcSinkTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.gotocompany.firehose.sink.grpc;

import com.google.protobuf.InvalidProtocolBufferException;
import com.gotocompany.firehose.config.GrpcSinkConfig;
import com.gotocompany.firehose.consumer.GenericError;
import com.gotocompany.firehose.consumer.GenericResponse;
import com.gotocompany.firehose.exception.DeserializerException;
import com.gotocompany.firehose.message.Message;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
Expand All @@ -12,6 +16,7 @@
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.mockito.Mock;

import java.io.IOException;
Expand Down Expand Up @@ -40,10 +45,13 @@ public class GrpcSinkTest {
@Mock
private FirehoseInstrumentation firehoseInstrumentation;

@Mock
private GrpcSinkConfig grpcSinkConfig;

@Before
public void setUp() {
initMocks(this);
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient);
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig);
}

@Test
Expand Down Expand Up @@ -85,17 +93,77 @@ public void shouldReturnBackListOfFailedMessages() throws IOException, Deseriali

@Test
public void shouldCloseStencilClient() throws IOException {
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient);
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig);

sink.close();
verify(stencilClient, times(1)).close();
}

@Test
public void shouldLogWhenClosingConnection() throws IOException {
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient);
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig);

sink.close();
verify(firehoseInstrumentation, times(1)).logInfo("GRPC connection closing");
}

@Test
public void shouldReturnFailedMessagesWithRetryableErrorsWhenCELExpressionMatches() throws InvalidProtocolBufferException {
Message payload = new Message(new byte[]{}, new byte[]{}, "topic", 0, 1);
GenericResponse response = GenericResponse.newBuilder()
.setSuccess(false)
.setDetail("detail")
.addErrors(GenericError.newBuilder()
.setCode("4000")
.setCause("cause")
.setEntity("gtf")
.build())
.build();
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(
response.getDescriptorForType(),
response.toByteArray()
);
when(grpcSinkConfig.getSinkGrpcResponseRetryCELExpression())
.thenReturn("GenericResponse.success == false && GenericResponse.errors.exists(e, e.code == \"4000\")");
when(grpcClient.execute(any(), any()))
.thenReturn(dynamicMessage);
when(stencilClient.get(any()))
.thenReturn(GenericResponse.getDescriptor());
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig);

List<Message> result = sink.pushMessage(Collections.singletonList(payload));

Assertions.assertEquals(1, result.size());
Assertions.assertEquals(result.get(0).getErrorInfo().getErrorType(), ErrorType.SINK_RETRYABLE_ERROR);
}

@Test
public void shouldReturnFailedMessagesWithNonRetryableErrorsWhenCELExpressionDoesntMatch() throws InvalidProtocolBufferException {
Message payload = new Message(new byte[]{}, new byte[]{}, "topic", 0, 1);
GenericResponse response = GenericResponse.newBuilder()
.setSuccess(false)
.setDetail("detail")
.addErrors(GenericError.newBuilder()
.setCode("4000")
.setCause("cause")
.setEntity("gtf")
.build())
.build();
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(
response.getDescriptorForType(),
response.toByteArray()
);
when(grpcSinkConfig.getSinkGrpcResponseRetryCELExpression())
.thenReturn("GenericResponse.success == false && GenericResponse.errors.exists(e, e.code == \"5000\")");
when(grpcClient.execute(any(), any()))
.thenReturn(dynamicMessage);
when(stencilClient.get(any()))
.thenReturn(GenericResponse.getDescriptor());
sink = new GrpcSink(firehoseInstrumentation, grpcClient, stencilClient, grpcSinkConfig);

List<Message> result = sink.pushMessage(Collections.singletonList(payload));

Assertions.assertEquals(1, result.size());
Assertions.assertEquals(result.get(0).getErrorInfo().getErrorType(), ErrorType.SINK_NON_RETRYABLE_ERROR);
}
}
Loading
Loading