diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java index 38a04358174..5623adb64fa 100644 --- a/store/src/test/java/org/apache/rocketmq/store/HATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java @@ -213,12 +213,15 @@ public void testSemiSyncReplicaWhenAdaptiveDegradation() throws Exception { assertEquals(PutMessageStatus.PUT_OK, result.getPutMessageStatus()); //message has been replicated to slave's commitLog, but maybe not dispatch to ConsumeQueue yet //so direct read from commitLog by physical offset - MessageExt slaveMsg = slaveMessageStore.lookMessageByOffset(result.getAppendMessageResult().getWroteOffset()); - assertNotNull(slaveMsg); - assertArrayEquals(msg.getBody(), slaveMsg.getBody()); - assertEquals(msg.getTopic(), slaveMsg.getTopic()); - assertEquals(msg.getTags(), slaveMsg.getTags()); - assertEquals(msg.getKeys(), slaveMsg.getKeys()); + final MessageExt[] slaveMsg = {null}; + await().atMost(Duration.ofSeconds(3)).until(() -> { + slaveMsg[0] = slaveMessageStore.lookMessageByOffset(result.getAppendMessageResult().getWroteOffset()); + return slaveMsg[0] != null; + }); + assertArrayEquals(msg.getBody(), slaveMsg[0].getBody()); + assertEquals(msg.getTopic(), slaveMsg[0].getTopic()); + assertEquals(msg.getTags(), slaveMsg[0].getTags()); + assertEquals(msg.getKeys(), slaveMsg[0].getKeys()); } //shutdown slave, putMessage should return IN_SYNC_REPLICAS_NOT_ENOUGH