Skip to content

Commit

Permalink
[ISSUE #9106] Fix revive backoff retry not effective in Pop Consumpti…
Browse files Browse the repository at this point in the history
…on based on rocksdb (#9107)
  • Loading branch information
redlsz authored Jan 6, 2025
1 parent a3afb05 commit 2538c34
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -496,10 +496,13 @@ public long revive(long currentTime, int maxCount) {
if (record.getAttemptTimes() < brokerConfig.getPopReviveMaxAttemptTimes()) {
long backoffInterval = 1000L * REWRITE_INTERVALS_IN_SECONDS[
Math.min(REWRITE_INTERVALS_IN_SECONDS.length, record.getAttemptTimes())];
record.setInvisibleTime(record.getInvisibleTime() + backoffInterval);
record.setAttemptTimes(record.getAttemptTimes() + 1);
failureList.add(record);
log.warn("PopConsumerService revive backoff retry, record={}", record);
long nextInvisibleTime = record.getInvisibleTime() + backoffInterval;
PopConsumerRecord retryRecord = new PopConsumerRecord(record.getPopTime(), record.getGroupId(),
record.getTopicId(), record.getQueueId(), record.getRetryFlag(), nextInvisibleTime,
record.getOffset(), record.getAttemptId());
retryRecord.setAttemptTimes(record.getAttemptTimes() + 1);
failureList.add(retryRecord);
log.warn("PopConsumerService revive backoff retry, record={}", retryRecord);
} else {
log.error("PopConsumerService drop record, message may be lost, record={}", record);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,42 @@ public void reviveRetryTest() {
consumerService.shutdown();
}

@Test
public void reviveBackoffRetryTest() {
Mockito.when(brokerController.getEscapeBridge()).thenReturn(Mockito.mock(EscapeBridge.class));
PopConsumerService consumerServiceSpy = Mockito.spy(consumerService);

consumerService.getPopConsumerStore().start();

long popTime = 1000000000L;
long invisibleTime = 60 * 1000L;
PopConsumerRecord record = new PopConsumerRecord();
record.setPopTime(popTime);
record.setInvisibleTime(invisibleTime);
record.setTopicId("topic");
record.setGroupId("group");
record.setQueueId(0);
record.setOffset(0);
consumerService.getPopConsumerStore().writeRecords(Collections.singletonList(record));

Mockito.doReturn(CompletableFuture.completedFuture(Triple.of(Mockito.mock(MessageExt.class), "", false)))
.when(consumerServiceSpy).getMessageAsync(any(PopConsumerRecord.class));
Mockito.when(brokerController.getEscapeBridge().putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(
new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))
);

long visibleTimestamp = popTime + invisibleTime;

// revive fails
Assert.assertEquals(1, consumerServiceSpy.revive(visibleTimestamp, 1));
// should be invisible now
Assert.assertEquals(0, consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp, 1).size());
// will be visible again in 10 seconds
Assert.assertEquals(1, consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp + 10 * 1000, 1).size());

consumerService.shutdown();
}

@Test
public void transferToFsStoreTest() {
Assert.assertNotNull(consumerService.getServiceName());
Expand Down

0 comments on commit 2538c34

Please sign in to comment.