Skip to content

Commit

Permalink
fix: fix non-retryable error being retried on successive retry attempt (
Browse files Browse the repository at this point in the history
#64)

* fix: fix non-retryable error being retried on successive retry attempt

* test: add unit tests for bug fix in retry mechanism

* test: add unit tests for bug fix in retry mechanism

* chore: bump up firehose version to 0.11.6

* fix: fix typo in test method name
  • Loading branch information
sumitaich1998 authored Jan 20, 2025
1 parent 1f732cc commit 5641228
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 15 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ lombok {
}

group 'com.gotocompany'
version '0.11.5'
version '0.11.6'

def projName = "firehose"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private List<Message> doRetry(List<Message> messages) throws IOException {
firehoseInstrumentation.incrementCounter(RETRY_ATTEMPTS_TOTAL);
firehoseInstrumentation.logInfo("Retrying messages attempt count: {}, Number of messages: {}", attemptCount, messages.size());
logDebug(retryMessages);
retryMessages = super.pushMessage(retryMessages);
retryMessages = errorHandler.split(super.pushMessage(retryMessages), ErrorScope.RETRY).get(Boolean.TRUE);
backOff(retryMessages, attemptCount);
attemptCount++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@
import org.mockito.Mockito;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.*;

import static com.gotocompany.firehose.metrics.Metrics.RETRY_ATTEMPTS_TOTAL;
import static com.gotocompany.firehose.metrics.Metrics.RETRY_MESSAGES_TOTAL;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;
Expand Down Expand Up @@ -189,9 +186,9 @@ public void shouldAddInstrumentationForRetry() throws Exception {
List<Message> messageList = sinkWithRetry.pushMessage(Collections.singletonList(message));
assertTrue(messageList.isEmpty());
verify(firehoseInstrumentation, times(1)).logInfo("Maximum retry attempts: {}", 3);
verify(firehoseInstrumentation, times(3)).captureMessageMetrics(Metrics.RETRY_MESSAGES_TOTAL, Metrics.MessageType.TOTAL, ErrorType.DESERIALIZATION_ERROR, 1);
verify(firehoseInstrumentation, times(2)).incrementCounter(Metrics.RETRY_ATTEMPTS_TOTAL);
verify(firehoseInstrumentation, times(1)).captureMessageMetrics(Metrics.RETRY_MESSAGES_TOTAL, Metrics.MessageType.SUCCESS, 3);
verify(firehoseInstrumentation, times(3)).captureMessageMetrics(RETRY_MESSAGES_TOTAL, Metrics.MessageType.TOTAL, ErrorType.DESERIALIZATION_ERROR, 1);
verify(firehoseInstrumentation, times(2)).incrementCounter(RETRY_ATTEMPTS_TOTAL);
verify(firehoseInstrumentation, times(1)).captureMessageMetrics(RETRY_MESSAGES_TOTAL, Metrics.MessageType.SUCCESS, 3);
}

@Test
Expand All @@ -207,10 +204,10 @@ public void shouldAddInstrumentationForRetryFailures() throws Exception {
List<Message> messageList = sinkWithRetry.pushMessage(Collections.singletonList(message));
assertFalse(messageList.isEmpty());
verify(firehoseInstrumentation, times(1)).logInfo("Maximum retry attempts: {}", 1);
verify(firehoseInstrumentation, times(3)).captureMessageMetrics(Metrics.RETRY_MESSAGES_TOTAL, Metrics.MessageType.TOTAL, ErrorType.DESERIALIZATION_ERROR, 1);
verify(firehoseInstrumentation, times(1)).incrementCounter(Metrics.RETRY_ATTEMPTS_TOTAL);
verify(firehoseInstrumentation, times(1)).captureMessageMetrics(Metrics.RETRY_MESSAGES_TOTAL, Metrics.MessageType.SUCCESS, 0);
verify(firehoseInstrumentation, times(3)).captureMessageMetrics(Metrics.RETRY_MESSAGES_TOTAL, Metrics.MessageType.FAILURE, ErrorType.DESERIALIZATION_ERROR, 1);
verify(firehoseInstrumentation, times(3)).captureMessageMetrics(RETRY_MESSAGES_TOTAL, Metrics.MessageType.TOTAL, ErrorType.DESERIALIZATION_ERROR, 1);
verify(firehoseInstrumentation, times(1)).incrementCounter(RETRY_ATTEMPTS_TOTAL);
verify(firehoseInstrumentation, times(1)).captureMessageMetrics(RETRY_MESSAGES_TOTAL, Metrics.MessageType.SUCCESS, 0);
verify(firehoseInstrumentation, times(3)).captureMessageMetrics(RETRY_MESSAGES_TOTAL, Metrics.MessageType.FAILURE, ErrorType.DESERIALIZATION_ERROR, 1);
}

@Test(expected = IOException.class)
Expand Down Expand Up @@ -253,4 +250,73 @@ public void shouldRetryMessagesWhenErrorTypesConfigured() throws IOException {
assertEquals(1, args.get(1).size());
assertEquals(messageWithError, args.get(1).get(0));
}

@Test
public void shouldFilterOutNonRetryableErrorsFromRetryAttempts() throws IOException, DeserializerException {
Message retryableMessage = new Message("key1".getBytes(), "value1".getBytes(), "topic", 1, 1,
null, 0, 0, new ErrorInfo(null, ErrorType.DESERIALIZATION_ERROR));
Message nonRetryableMessage = new Message("key2".getBytes(), "value2".getBytes(), "topic", 1, 2,
null, 0, 0, new ErrorInfo(null, ErrorType.SINK_UNKNOWN_ERROR));

ArrayList<Message> initialFailedMessages = new ArrayList<>();
initialFailedMessages.add(retryableMessage);
initialFailedMessages.add(nonRetryableMessage);

when(sinkDecorator.pushMessage(anyList()))
.thenReturn(initialFailedMessages)
.thenReturn(new ArrayList<>());

SinkWithRetry sinkWithRetry = new SinkWithRetry(sinkDecorator, backOffProvider,
firehoseInstrumentation, appConfig, parser, errorHandler);

List<Message> messageList = sinkWithRetry.pushMessage(initialFailedMessages);

assertEquals(1, messageList.size());
assertEquals(ErrorType.SINK_UNKNOWN_ERROR, messageList.get(0).getErrorInfo().getErrorType());

ArgumentCaptor<List> argumentCaptor = ArgumentCaptor.forClass(List.class);
verify(sinkDecorator, times(2)).pushMessage(argumentCaptor.capture());
List<List> capturedArgs = argumentCaptor.getAllValues();

assertEquals(2, capturedArgs.get(0).size());
assertEquals(1, capturedArgs.get(1).size());
assertEquals(ErrorType.DESERIALIZATION_ERROR,
((Message) capturedArgs.get(1).get(0)).getErrorInfo().getErrorType());
}

@Test
public void shouldStopRetryingWhenNonRetryableErrorOccurs() throws IOException, DeserializerException {
errorHandler = new ErrorHandler(ConfigFactory.create(ErrorConfig.class, new HashMap<String, String>() {{
put("ERROR_TYPES_FOR_RETRY", ErrorType.DESERIALIZATION_ERROR.name() + "," + ErrorType.SINK_RETRYABLE_ERROR.name());
}}));

Message message1 = new Message("key1".getBytes(), "value1".getBytes(), "topic", 1, 1);
Message message2 = new Message("key2".getBytes(), "value2".getBytes(), "topic", 1, 2);
ArrayList<Message> messages = new ArrayList<>();
messages.add(message1);
messages.add(message2);

ArrayList<Message> firstAttemptMessages = new ArrayList<>();
firstAttemptMessages.add(new Message(message1, new ErrorInfo(new IOException(), ErrorType.SINK_RETRYABLE_ERROR)));
firstAttemptMessages.add(new Message(message2, new ErrorInfo(new IOException(), ErrorType.SINK_RETRYABLE_ERROR)));

ArrayList<Message> secondAttemptMessages = new ArrayList<>();
secondAttemptMessages.add(new Message(message1, new ErrorInfo(new IOException(), ErrorType.SINK_4XX_ERROR)));
secondAttemptMessages.add(new Message(message2, new ErrorInfo(new IOException(), ErrorType.SINK_4XX_ERROR)));

when(sinkDecorator.pushMessage(anyList()))
.thenReturn(firstAttemptMessages)
.thenReturn(secondAttemptMessages);

SinkWithRetry sinkWithRetry = new SinkWithRetry(sinkDecorator, backOffProvider,
firehoseInstrumentation, appConfig, parser, errorHandler);

List<Message> finalFailedMessages = sinkWithRetry.pushMessage(messages);

assertEquals(0, finalFailedMessages.size());
verify(sinkDecorator, times(2)).pushMessage(anyList());
verify(firehoseInstrumentation, times(1)).incrementCounter(RETRY_ATTEMPTS_TOTAL);
verify(firehoseInstrumentation, times(1)).logInfo("Retrying messages attempt count: {}, Number of messages: {}", 1, 2);
}

}

0 comments on commit 5641228

Please sign in to comment.