From 0b0aa80425d1851996f1268972a593d0b257fcdf Mon Sep 17 00:00:00 2001 From: yash Date: Thu, 30 Nov 2023 22:20:56 +0530 Subject: [PATCH 1/2] [ISSUE 4622] Improve execution efficiency of integration tests, change sleep to Awaitility.await --- .../acl/plain/PlainAccessValidatorTest.java | 192 ++++++++++++------ .../acl/plain/PlainPermissionManagerTest.java | 38 ++-- .../rocketmq/broker/BrokerControllerTest.java | 4 +- .../rocketmq/broker/BrokerOuterAPITest.java | 4 +- .../controller/ReplicasManagerTest.java | 4 +- .../filter/MessageStoreWithFilterTest.java | 7 +- .../broker/latency/BrokerFastFailureTest.java | 5 +- .../processor/PopBufferMergeServiceTest.java | 5 +- .../schedule/ScheduleMessageServiceTest.java | 4 +- .../consumer/DefaultLitePullConsumerTest.java | 5 +- .../consumer/DefaultMQPushConsumerTest.java | 6 +- ...AllocateMessageQueueConsitentHashTest.java | 6 +- ...ConsumeMessageConcurrentlyServiceTest.java | 4 +- .../producer/DefaultMQProducerTest.java | 7 +- .../common/stats/StatsItemSetTest.java | 8 +- .../impl/DLedgerControllerTest.java | 3 +- .../apache/rocketmq/filter/BitsArrayTest.java | 7 +- .../ClusterTestRequestProcessorTest.java | 4 +- .../route/ClusterTopicRouteServiceTest.java | 4 +- .../rocketmq/store/ConsumeQueueExtTest.java | 7 +- .../store/DefaultMessageStoreTest.java | 11 +- .../rocketmq/store/FlushDiskWatcherTest.java | 6 +- .../org/apache/rocketmq/store/HATest.java | 3 +- .../rocketmq/store/MappedFileQueueTest.java | 12 +- .../store/RocksDBMessageStoreTest.java | 11 +- .../apache/rocketmq/store/StoreTestUtil.java | 8 +- .../store/queue/BatchConsumeMessageTest.java | 8 +- .../store/timer/TimerMessageStoreTest.java | 8 +- .../rocketmq/test/util/MQAdminTestUtils.java | 6 +- .../apache/rocketmq/test/util/TestUtil.java | 16 +- .../apache/rocketmq/test/util/TestUtils.java | 18 +- .../AutoSwitchRoleIntegrationTest.java | 17 +- .../ContainerIntegrationTestBase.java | 6 +- .../tieredstore/index/IndexStoreFileTest.java | 7 +- .../index/IndexStoreServiceTest.java | 9 +- .../provider/TieredFileSegmentTest.java | 13 +- 36 files changed, 321 insertions(+), 162 deletions(-) diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java index ef0cffbdcc8..d697ed616cd 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.lang.reflect.Field; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -53,6 +54,7 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData; import org.apache.rocketmq.remoting.protocol.heartbeat.ProducerData; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -88,7 +90,8 @@ public void cleanUp() { public void contentTest() { SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); messageRequestHeader.setTopic("topicA"); - RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, + messageRequestHeader); aclClient.doBeforeRequest("", remotingCommand); ByteBuffer buf = remotingCommand.encodeHeader(); @@ -96,7 +99,8 @@ public void contentTest() { buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "127.0.0.1"); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), "127.0.0.1"); String signature = AclUtils.calSignature(accessResource.getContent(), sessionCredentials.getSecretKey()); Assert.assertEquals(accessResource.getSignature(), signature); @@ -112,7 +116,8 @@ public void contentTest() { public void validateTest() { SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); messageRequestHeader.setTopic("topicB"); - RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, + messageRequestHeader); aclClient.doBeforeRequest("", remotingCommand); ByteBuffer buf = remotingCommand.encodeHeader(); @@ -120,7 +125,8 @@ public void validateTest() { buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), "123.4.5.6"); plainAccessValidator.validate(accessResource); } catch (RemotingCommandException e) { e.printStackTrace(); @@ -134,7 +140,8 @@ public void validateTest() { public void validateSendMessageTest() { SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); messageRequestHeader.setTopic("topicB"); - RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, + messageRequestHeader); aclClient.doBeforeRequest("", remotingCommand); ByteBuffer buf = remotingCommand.encodeHeader(); @@ -142,7 +149,8 @@ public void validateSendMessageTest() { buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), "123.4.5.6"); plainAccessValidator.validate(accessResource); } catch (RemotingCommandException e) { e.printStackTrace(); @@ -155,7 +163,8 @@ public void validateSendMessageTest() { public void validateSendMessageToRetryTopicTest() { SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); messageRequestHeader.setTopic(MixAll.getRetryTopic("groupB")); - RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, + messageRequestHeader); aclClient.doBeforeRequest("", remotingCommand); ByteBuffer buf = remotingCommand.encodeHeader(); @@ -163,7 +172,8 @@ public void validateSendMessageToRetryTopicTest() { buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), "123.4.5.6"); plainAccessValidator.validate(accessResource); } catch (RemotingCommandException e) { e.printStackTrace(); @@ -176,7 +186,8 @@ public void validateSendMessageToRetryTopicTest() { public void validateSendMessageV2Test() { SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); messageRequestHeader.setTopic("topicB"); - RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(messageRequestHeader)); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, + SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(messageRequestHeader)); aclClient.doBeforeRequest("", remotingCommand); ByteBuffer buf = remotingCommand.encodeHeader(); @@ -184,7 +195,8 @@ public void validateSendMessageV2Test() { buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), "123.4.5.6"); plainAccessValidator.validate(accessResource); } catch (RemotingCommandException e) { e.printStackTrace(); @@ -197,7 +209,8 @@ public void validateSendMessageV2Test() { public void validateSendMessageV2ToRetryTopicTest() { SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); messageRequestHeader.setTopic(MixAll.getRetryTopic("groupC")); - RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(messageRequestHeader)); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, + SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(messageRequestHeader)); aclClient.doBeforeRequest("", remotingCommand); ByteBuffer buf = remotingCommand.encodeHeader(); @@ -205,7 +218,8 @@ public void validateSendMessageV2ToRetryTopicTest() { buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6:9876"); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), "123.4.5.6:9876"); plainAccessValidator.validate(accessResource); } catch (RemotingCommandException e) { e.printStackTrace(); @@ -216,16 +230,20 @@ public void validateSendMessageV2ToRetryTopicTest() { @Test public void validateForAdminCommandWithOutAclRPCHook() { - RemotingCommand consumerOffsetAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null); + RemotingCommand consumerOffsetAdminRequest = RemotingCommand + .createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null); plainAccessValidator.parse(consumerOffsetAdminRequest, "192.168.0.1:9876"); - RemotingCommand subscriptionGroupAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); + RemotingCommand subscriptionGroupAdminRequest = RemotingCommand + .createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); plainAccessValidator.parse(subscriptionGroupAdminRequest, "192.168.0.1:9876"); - RemotingCommand delayOffsetAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null); + RemotingCommand delayOffsetAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, + null); plainAccessValidator.parse(delayOffsetAdminRequest, "192.168.0.1:9876"); - RemotingCommand allTopicConfigAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); + RemotingCommand allTopicConfigAdminRequest = RemotingCommand + .createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); plainAccessValidator.parse(allTopicConfigAdminRequest, "192.168.0.1:9876"); } @@ -235,14 +253,16 @@ public void validatePullMessageTest() { PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader(); pullMessageRequestHeader.setTopic("topicC"); pullMessageRequestHeader.setConsumerGroup("groupC"); - RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, + pullMessageRequestHeader); aclClient.doBeforeRequest("", remotingCommand); ByteBuffer buf = remotingCommand.encodeHeader(); buf.getInt(); buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), "123.4.5.6"); plainAccessValidator.validate(accessResource); } catch (RemotingCommandException e) { e.printStackTrace(); @@ -256,14 +276,16 @@ public void validateConsumeMessageBackTest() { ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = new ConsumerSendMsgBackRequestHeader(); consumerSendMsgBackRequestHeader.setOriginTopic("topicC"); consumerSendMsgBackRequestHeader.setGroup("groupC"); - RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, consumerSendMsgBackRequestHeader); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, + consumerSendMsgBackRequestHeader); aclClient.doBeforeRequest("", remotingCommand); ByteBuffer buf = remotingCommand.encodeHeader(); buf.getInt(); buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), "123.4.5.6"); plainAccessValidator.validate(accessResource); } catch (RemotingCommandException e) { e.printStackTrace(); @@ -276,14 +298,16 @@ public void validateConsumeMessageBackTest() { public void validateQueryMessageTest() { QueryMessageRequestHeader queryMessageRequestHeader = new QueryMessageRequestHeader(); queryMessageRequestHeader.setTopic("topicC"); - RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, queryMessageRequestHeader); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, + queryMessageRequestHeader); aclClient.doBeforeRequest("", remotingCommand); ByteBuffer buf = remotingCommand.encodeHeader(); buf.getInt(); buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), "123.4.5.6"); plainAccessValidator.validate(accessResource); } catch (RemotingCommandException e) { e.printStackTrace(); @@ -296,7 +320,8 @@ public void validateQueryMessageTest() { public void validateQueryMessageByKeyTest() { QueryMessageRequestHeader queryMessageRequestHeader = new QueryMessageRequestHeader(); queryMessageRequestHeader.setTopic("topicC"); - RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, queryMessageRequestHeader); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, + queryMessageRequestHeader); aclClient.doBeforeRequest("", remotingCommand); remotingCommand.addExtField(MixAll.UNIQUE_MSG_QUERY_FLAG, "false"); ByteBuffer buf = remotingCommand.encodeHeader(); @@ -304,7 +329,8 @@ public void validateQueryMessageByKeyTest() { buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1:9876"); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), "192.168.1.1:9876"); plainAccessValidator.validate(accessResource); } catch (RemotingCommandException e) { e.printStackTrace(); @@ -339,7 +365,8 @@ public void validateHeartBeatTest() { buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), "123.4.5.6"); plainAccessValidator.validate(accessResource); } catch (RemotingCommandException e) { e.printStackTrace(); @@ -352,14 +379,16 @@ public void validateHeartBeatTest() { public void validateUnRegisterClientTest() { UnregisterClientRequestHeader unregisterClientRequestHeader = new UnregisterClientRequestHeader(); unregisterClientRequestHeader.setConsumerGroup("groupB"); - RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, unregisterClientRequestHeader); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, + unregisterClientRequestHeader); aclClient.doBeforeRequest("", remotingCommand); ByteBuffer buf = remotingCommand.encodeHeader(); buf.getInt(); buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), "123.4.5.6"); plainAccessValidator.validate(accessResource); } catch (RemotingCommandException e) { e.printStackTrace(); @@ -372,14 +401,16 @@ public void validateUnRegisterClientTest() { public void validateGetConsumerListByGroupTest() { GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader = new GetConsumerListByGroupRequestHeader(); getConsumerListByGroupRequestHeader.setConsumerGroup("groupB"); - RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, getConsumerListByGroupRequestHeader); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, + getConsumerListByGroupRequestHeader); aclClient.doBeforeRequest("", remotingCommand); ByteBuffer buf = remotingCommand.encodeHeader(); buf.getInt(); buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), "123.4.5.6"); plainAccessValidator.validate(accessResource); } catch (RemotingCommandException e) { e.printStackTrace(); @@ -392,14 +423,16 @@ public void validateGetConsumerListByGroupTest() { public void validateUpdateConsumerOffSetTest() { UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader = new UpdateConsumerOffsetRequestHeader(); updateConsumerOffsetRequestHeader.setConsumerGroup("groupB"); - RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, updateConsumerOffsetRequestHeader); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, + updateConsumerOffsetRequestHeader); aclClient.doBeforeRequest("", remotingCommand); ByteBuffer buf = remotingCommand.encodeHeader(); buf.getInt(); buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), "123.4.5.6"); plainAccessValidator.validate(accessResource); } catch (RemotingCommandException e) { e.printStackTrace(); @@ -416,7 +449,8 @@ public void validateNullAccessKeyTest() { AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(sessionCredentials); SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); messageRequestHeader.setTopic("topicB"); - RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, + messageRequestHeader); aclClientRPCHook.doBeforeRequest("", remotingCommand); ByteBuffer buf = remotingCommand.encodeHeader(); @@ -424,7 +458,8 @@ public void validateNullAccessKeyTest() { buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1"); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), "192.168.1.1"); plainAccessValidator.validate(accessResource); } catch (RemotingCommandException e) { e.printStackTrace(); @@ -441,7 +476,8 @@ public void validateErrorSecretKeyTest() { AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(sessionCredentials); SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); messageRequestHeader.setTopic("topicB"); - RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, + messageRequestHeader); aclClientRPCHook.doBeforeRequest("", remotingCommand); ByteBuffer buf = remotingCommand.encodeHeader(); @@ -449,7 +485,8 @@ public void validateErrorSecretKeyTest() { buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1"); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), "192.168.1.1"); plainAccessValidator.validate(accessResource); } catch (RemotingCommandException e) { e.printStackTrace(); @@ -468,7 +505,8 @@ public void validateGetAllTopicConfigTest() { buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), whiteRemoteAddress); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), whiteRemoteAddress); plainAccessValidator.validate(accessResource); } catch (RemotingCommandException e) { e.printStackTrace(); @@ -480,7 +518,7 @@ public void validateGetAllTopicConfigTest() { @Test public void addAccessAclYamlConfigTest() throws InterruptedException { String backupFileName = System.getProperty("rocketmq.home.dir") - + File.separator + "conf/plain_acl_bak.yml".replace("/", File.separator); + + File.separator + "conf/plain_acl_bak.yml".replace("/", File.separator); String targetFileName = System.getProperty("rocketmq.home.dir") + File.separator + "conf/plain_acl.yml".replace("/", File.separator); PlainAccessData backUpAclConfigMap = AclUtils.getYamlDataObject(backupFileName, PlainAccessData.class); @@ -503,7 +541,7 @@ public void addAccessAclYamlConfigTest() throws InterruptedException { PlainAccessValidator plainAccessValidator = new PlainAccessValidator(); plainAccessValidator.updateAccessConfig(plainAccessConfig); - Thread.sleep(10000); + Awaitility.await().pollDelay(Duration.ofMillis(10000)).until(() -> true); Map verifyMap = new HashMap<>(); AclConfig aclConfig = plainAccessValidator.getAllAclConfig(); @@ -556,7 +594,7 @@ public void getAccessAclYamlConfigTest() { Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_WHITE_ADDR), "192.168.1.*"); String aclFileName = System.getProperty("rocketmq.home.dir") - + File.separator + "conf/plain_acl.yml".replace("/", File.separator); + + File.separator + "conf/plain_acl.yml".replace("/", File.separator); Map dataVersionMap = plainAccessValidator.getAllAclConfigVersion(); DataVersion dataVersion = dataVersionMap.get(aclFileName); Assert.assertEquals(0, dataVersion.getCounter().get()); @@ -589,7 +627,9 @@ public void updateAccessAclYamlConfigTest() throws InterruptedException { PlainAccessValidator plainAccessValidator = new PlainAccessValidator(); plainAccessValidator.updateAccessConfig(plainAccessConfig); - Thread.sleep(10000); + Awaitility.await() + .pollDelay(Duration.ofMillis(10000)) + .until(() -> true); PlainAccessConfig plainAccessConfig1 = new PlainAccessConfig(); plainAccessConfig1.setAccessKey("rocketmq3"); @@ -608,7 +648,9 @@ public void updateAccessAclYamlConfigTest() throws InterruptedException { plainAccessValidator.updateAccessConfig(plainAccessConfig1); - Thread.sleep(10000); + Awaitility.await() + .pollDelay(Duration.ofMillis(10000)) + .until(() -> true); Map verifyMap = new HashMap<>(); AclConfig aclConfig = plainAccessValidator.getAllAclConfig(); @@ -634,7 +676,7 @@ public void updateAccessAclYamlConfigTest() throws InterruptedException { Assert.assertEquals(((List) verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).size(), 2); String aclFileName = System.getProperty("rocketmq.home.dir") - + File.separator + "conf/plain_acl.yml".replace("/", File.separator); + + File.separator + "conf/plain_acl.yml".replace("/", File.separator); PlainAccessData readableMap = AclUtils.getYamlDataObject(aclFileName, PlainAccessData.class); List dataVersions = readableMap.getDataVersion(); Assert.assertEquals(2L, dataVersions.get(0).getCounter()); @@ -671,7 +713,9 @@ public void deleteAccessAclYamlConfigTest() throws InterruptedException { String accessKey = "rocketmq3"; plainAccessValidator.deleteAccessConfig(accessKey); - Thread.sleep(10000); + Awaitility.await() + .pollDelay(Duration.ofMillis(10000)) + .until(() -> true); Map verifyMap = new HashMap<>(); AclConfig aclConfig = plainAccessValidator.getAllAclConfig(); @@ -709,7 +753,7 @@ public void updateGlobalWhiteRemoteAddressesTest() throws InterruptedException { Assert.assertEquals(plainAccessValidator.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList, null), true); String aclFileName = System.getProperty("rocketmq.home.dir") - + File.separator + "conf/plain_acl.yml".replace("/", File.separator); + + File.separator + "conf/plain_acl.yml".replace("/", File.separator); PlainAccessData readableMap = AclUtils.getYamlDataObject(aclFileName, PlainAccessData.class); List dataVersions = readableMap.getDataVersion(); Assert.assertEquals(1L, dataVersions.get(0).getCounter()); @@ -719,7 +763,7 @@ public void updateGlobalWhiteRemoteAddressesTest() throws InterruptedException { @Test public void addYamlConfigTest() throws IOException, InterruptedException { String fileName = System.getProperty("rocketmq.home.dir") - + File.separator + "conf/acl/plain_acl_test.yml".replace("/", File.separator); + + File.separator + "conf/acl/plain_acl_test.yml".replace("/", File.separator); File transport = new File(fileName); transport.delete(); transport.createNewFile(); @@ -732,7 +776,9 @@ public void addYamlConfigTest() throws IOException, InterruptedException { writer.flush(); writer.close(); - Thread.sleep(1000); + Awaitility.await() + .pollDelay(Duration.ofMillis(1000)) + .until(() -> true); PlainAccessValidator plainAccessValidator = new PlainAccessValidator(); AclConfig aclConfig = plainAccessValidator.getAllAclConfig(); @@ -760,7 +806,7 @@ public void addYamlConfigTest() throws IOException, InterruptedException { @Test public void updateAccessAnotherAclYamlConfigTest() throws IOException, InterruptedException { String fileName = System.getProperty("rocketmq.home.dir") - + File.separator + "conf/acl/plain_acl_test.yml".replace("/", File.separator); + + File.separator + "conf/acl/plain_acl_test.yml".replace("/", File.separator); File transport = new File(fileName); transport.delete(); transport.createNewFile(); @@ -777,7 +823,9 @@ public void updateAccessAnotherAclYamlConfigTest() throws IOException, Interrupt writer.flush(); writer.close(); - Thread.sleep(1000); + Awaitility.await() + .pollDelay(Duration.ofMillis(1000)) + .until(() -> true); PlainAccessValidator plainAccessValidator = new PlainAccessValidator(); @@ -789,7 +837,9 @@ public void updateAccessAnotherAclYamlConfigTest() throws IOException, Interrupt plainAccessValidator.updateAccessConfig(plainAccessConfig); - Thread.sleep(1000); + Awaitility.await() + .pollDelay(Duration.ofMillis(1000)) + .until(() -> true); AclConfig aclConfig = plainAccessValidator.getAllAclConfig(); List plainAccessConfigs = aclConfig.getPlainAccessConfigs(); @@ -851,7 +901,9 @@ public void addAccessDefaultAclYamlConfigTest() throws InterruptedException { plainAccessValidator.updateAccessConfig(plainAccessConfig); - Thread.sleep(10000); + Awaitility.await() + .pollDelay(Duration.ofMillis(10000)) + .until(() -> true); AclConfig aclConfig = plainAccessValidator.getAllAclConfig(); List plainAccessConfigs = aclConfig.getPlainAccessConfigs(); @@ -878,7 +930,7 @@ public void addAccessDefaultAclYamlConfigTest() throws InterruptedException { @Test public void deleteAccessAnotherAclYamlConfigTest() throws IOException, InterruptedException { String fileName = System.getProperty("rocketmq.home.dir") - + File.separator + "conf/acl/plain_acl_test.yml".replace("/", File.separator); + + File.separator + "conf/acl/plain_acl_test.yml".replace("/", File.separator); File transport = new File(fileName); transport.delete(); transport.createNewFile(); @@ -895,11 +947,15 @@ public void deleteAccessAnotherAclYamlConfigTest() throws IOException, Interrupt writer.flush(); writer.close(); - Thread.sleep(1000); + Awaitility.await() + .pollDelay(Duration.ofMillis(10000)) + .until(() -> true); PlainAccessValidator plainAccessValidator = new PlainAccessValidator(); plainAccessValidator.deleteAccessConfig("watchrocketmqx"); - Thread.sleep(10000); + Awaitility.await() + .pollDelay(Duration.ofMillis(10000)) + .until(() -> true); Map verifyMap = new HashMap<>(); AclConfig aclConfig = plainAccessValidator.getAllAclConfig(); @@ -1015,7 +1071,7 @@ public void deleteAccessAclToEmptyTest() { @Test public void testValidateAfterUpdateAccessConfig() throws NoSuchFieldException, IllegalAccessException { String targetFileName = System.getProperty("rocketmq.home.dir") - + File.separator + "conf/update.yml".replace("/", File.separator); + + File.separator + "conf/update.yml".replace("/", File.separator); System.setProperty("rocketmq.acl.plain.file", "conf/update.yml".replace("/", File.separator)); PlainAccessValidator plainAccessValidator = new PlainAccessValidator(); PlainAccessConfig plainAccessConfig = new PlainAccessConfig(); @@ -1037,7 +1093,8 @@ public void testValidateAfterUpdateAccessConfig() throws NoSuchFieldException, I PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader(); pullMessageRequestHeader.setTopic("topicC"); pullMessageRequestHeader.setConsumerGroup("consumerGroupA"); - RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, + pullMessageRequestHeader); AclClientRPCHook aclClient = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)); aclClient.doBeforeRequest("", remotingCommand); @@ -1046,7 +1103,8 @@ public void testValidateAfterUpdateAccessConfig() throws NoSuchFieldException, I buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); buf.position(0); try { - PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "1.1.1.1:9876"); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator + .parse(RemotingCommand.decode(buf), "1.1.1.1:9876"); plainAccessValidator.validate(accessResource); } catch (RemotingCommandException e) { e.printStackTrace(); @@ -1058,6 +1116,7 @@ public void testValidateAfterUpdateAccessConfig() throws NoSuchFieldException, I /** * Fixme: this test case is not thread safe. The design itself is buggy. + * * @throws IOException */ @Test @@ -1067,13 +1126,16 @@ public void testUpdateSpecifiedAclFileGlobalWhiteAddrsConfig() throws IOExceptio System.setProperty("rocketmq.home.dir", home.getAbsolutePath()); System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml".replace("/", File.separator)); - String targetFileName = Joiner.on(File.separator).join(new String[]{home.getAbsolutePath(), "conf", "plain_acl.yml"}); + String targetFileName = Joiner.on(File.separator) + .join(new String[] { home.getAbsolutePath(), "conf", "plain_acl.yml" }); PlainAccessData backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, PlainAccessData.class); - String targetFileName1 = Joiner.on(File.separator).join(new String[]{home.getAbsolutePath(), "conf", "acl", "plain_acl.yml"}); + String targetFileName1 = Joiner.on(File.separator) + .join(new String[] { home.getAbsolutePath(), "conf", "acl", "plain_acl.yml" }); PlainAccessData backUpAclConfigMap1 = AclUtils.getYamlDataObject(targetFileName1, PlainAccessData.class); - String targetFileName2 = Joiner.on(File.separator).join(new String[]{home.getAbsolutePath(), "conf", "acl", "empty.yml"}); + String targetFileName2 = Joiner.on(File.separator) + .join(new String[] { home.getAbsolutePath(), "conf", "acl", "empty.yml" }); PlainAccessData backUpAclConfigMap2 = AclUtils.getYamlDataObject(targetFileName2, PlainAccessData.class); PlainAccessValidator plainAccessValidator = new PlainAccessValidator(); @@ -1084,20 +1146,21 @@ public void testUpdateSpecifiedAclFileGlobalWhiteAddrsConfig() throws IOExceptio List globalWhiteAddrsList3 = new ArrayList<>(); globalWhiteAddrsList3.add("10.10.154.3"); - //Test parameter p is null + // Test parameter p is null plainAccessValidator.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList1, null); String defaultAclFile = targetFileName; PlainAccessData defaultAclFileMap = AclUtils.getYamlDataObject(defaultAclFile, PlainAccessData.class); List defaultAclFileGlobalWhiteAddrList = defaultAclFileMap.getGlobalWhiteRemoteAddresses(); Assert.assertTrue(defaultAclFileGlobalWhiteAddrList.contains("10.10.154.1")); - //Test parameter p is not null + // Test parameter p is not null plainAccessValidator.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList2, targetFileName1); - PlainAccessData aclFileMap1 = AclUtils.getYamlDataObject(targetFileName1, PlainAccessData.class); + PlainAccessData aclFileMap1 = AclUtils.getYamlDataObject(targetFileName1, PlainAccessData.class); List aclFileGlobalWhiteAddrList1 = aclFileMap1.getGlobalWhiteRemoteAddresses(); Assert.assertTrue(aclFileGlobalWhiteAddrList1.contains("10.10.154.2")); - //Test parameter p is not null, but the file does not have globalWhiteRemoteAddresses + // Test parameter p is not null, but the file does not have + // globalWhiteRemoteAddresses plainAccessValidator.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList3, targetFileName2); - PlainAccessData aclFileMap2 = AclUtils.getYamlDataObject(targetFileName2, PlainAccessData.class); + PlainAccessData aclFileMap2 = AclUtils.getYamlDataObject(targetFileName2, PlainAccessData.class); List aclFileGlobalWhiteAddrList2 = aclFileMap2.getGlobalWhiteRemoteAddresses(); Assert.assertTrue(aclFileGlobalWhiteAddrList2.contains("10.10.154.3")); @@ -1108,5 +1171,4 @@ public void testUpdateSpecifiedAclFileGlobalWhiteAddrsConfig() throws IOExceptio AclTestHelper.recursiveDelete(home); } - } diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java index 941d8c77923..2630241a946 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java @@ -27,6 +27,7 @@ import org.apache.rocketmq.remoting.protocol.DataVersion; import org.assertj.core.api.Assertions; import org.assertj.core.util.Lists; +import org.awaitility.Awaitility; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -34,6 +35,7 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -127,9 +129,12 @@ public void buildPlainAccessResourceTest() { Map resourcePermMap = plainAccessResource.getResourcePermMap(); Assert.assertEquals(resourcePermMap.size(), 3); - Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupA")).byteValue(), Permission.DENY); - Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupB")).byteValue(), Permission.PUB | Permission.SUB); - Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupC")).byteValue(), Permission.PUB); + Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupA")).byteValue(), + Permission.DENY); + Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupB")).byteValue(), + Permission.PUB | Permission.SUB); + Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupC")).byteValue(), + Permission.PUB); List topics = new ArrayList<>(); topics.add("topicA=DENY"); @@ -205,11 +210,13 @@ public void passWordThanTest() { @Test public void cleanAuthenticationInfoTest() throws IllegalAccessException { // PlainPermissionManager.addPlainAccessResource(plainAccessResource); - Map> plainAccessResourceMap = (Map>) FieldUtils.readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true); + Map> plainAccessResourceMap = (Map>) FieldUtils + .readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true); Assert.assertFalse(plainAccessResourceMap.isEmpty()); plainPermissionManager.clearPermissionInfo(); - plainAccessResourceMap = (Map>) FieldUtils.readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true); + plainAccessResourceMap = (Map>) FieldUtils + .readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true); Assert.assertTrue(plainAccessResourceMap.isEmpty()); } @@ -222,7 +229,8 @@ public void isWatchStartTest() { @Test public void testWatch() throws IOException, IllegalAccessException, InterruptedException { - String fileName = Joiner.on(File.separator).join(new String[]{System.getProperty("rocketmq.home.dir"), "conf", "acl", "plain_acl_test.yml"}); + String fileName = Joiner.on(File.separator) + .join(new String[] { System.getProperty("rocketmq.home.dir"), "conf", "acl", "plain_acl_test.yml" }); File transport = new File(fileName); transport.delete(); transport.createNewFile(); @@ -235,15 +243,17 @@ public void testWatch() throws IOException, IllegalAccessException, InterruptedE writer.flush(); writer.close(); - Thread.sleep(1000); + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(() -> true); PlainPermissionManager plainPermissionManager = new PlainPermissionManager(); Assert.assertTrue(plainPermissionManager.isWatchStart()); - Map accessKeyTable = (Map) FieldUtils.readDeclaredField(plainPermissionManager, "accessKeyTable", true); + Map accessKeyTable = (Map) FieldUtils.readDeclaredField(plainPermissionManager, + "accessKeyTable", true); String aclFileName = accessKeyTable.get("watchrocketmqx"); { - Map> plainAccessResourceMap = (Map>) FieldUtils.readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true); + Map> plainAccessResourceMap = (Map>) FieldUtils + .readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true); PlainAccessResource accessResource = plainAccessResourceMap.get(aclFileName).get("watchrocketmqx"); Assert.assertNotNull(accessResource); Assert.assertEquals(accessResource.getSecretKey(), "12345678"); @@ -259,9 +269,10 @@ public void testWatch() throws IOException, IllegalAccessException, InterruptedE // Update file and flush to yaml file AclUtils.writeDataObject(fileName, updatedMap); - Thread.sleep(10000); + Awaitility.await().pollDelay(Duration.ofMillis(10000)).until(() -> true); { - Map> plainAccessResourceMap = (Map>) FieldUtils.readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true); + Map> plainAccessResourceMap = (Map>) FieldUtils + .readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true); PlainAccessResource accessResource = plainAccessResourceMap.get(aclFileName).get("watchrocketmq1y"); Assert.assertNotNull(accessResource); Assert.assertEquals(accessResource.getSecretKey(), "88888888"); @@ -337,7 +348,8 @@ public void createAclAccessConfigMapTest() { plainAccessConfig.setTopicPerms(Arrays.asList(DEFAULT_TOPIC + "=" + AclConstants.PUB)); plainAccessConfig.setGroupPerms(Lists.newArrayList("groupA=SUB")); - final PlainAccessConfig map = plainPermissionManager.createAclAccessConfigMap(existedAccountMap, plainAccessConfig); + final PlainAccessConfig map = plainPermissionManager.createAclAccessConfigMap(existedAccountMap, + plainAccessConfig); Assert.assertEquals(AclConstants.SUB_PUB, map.getDefaultGroupPerm()); Assert.assertEquals("groupA=SUB", map.getGroupPerms().get(0)); Assert.assertEquals("12345678", map.getSecretKey()); @@ -362,7 +374,7 @@ public void deleteAccessConfigTest() throws InterruptedException { plainAccessConfig.setGroupPerms(Lists.newArrayList("groupA=SUB")); plainPermissionManager.updateAccessConfig(plainAccessConfig); - //delete existed accessConfig + // delete existed accessConfig final boolean flag2 = plainPermissionManager.deleteAccessConfig("test_delete"); assert flag2; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java index 6035a20acb2..eada122dc19 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.broker; import java.io.File; +import java.time.Duration; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -30,6 +31,7 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -91,7 +93,7 @@ public void run() { queue.add(new FutureTaskExt<>(requestTask, null)); long headSlowTimeMills = 100; - TimeUnit.MILLISECONDS.sleep(headSlowTimeMills); + Awaitility.await().pollDelay(Duration.ofMillis(headSlowTimeMills)).until(()->true); assertThat(brokerController.headSlowTimeMills(queue)).isGreaterThanOrEqualTo(headSlowTimeMills); } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java index 2541e755e39..b92e0bc14bc 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java @@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -48,6 +49,7 @@ import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.awaitility.Awaitility; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -131,7 +133,7 @@ public RemotingCommand answer(InvocationOnMock invocation) throws Throwable { } else if (invocation.getArgument(0) == nameserver2) { return buildResponse(Boolean.FALSE); } else if (invocation.getArgument(0) == nameserver3) { - TimeUnit.MILLISECONDS.sleep(timeOut + 20); + Awaitility.await().pollDelay(Duration.ofMillis(timeOut+20)).until(()->true); return buildResponse(Boolean.TRUE); } return buildResponse(Boolean.TRUE); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java index c863f7ac96c..65c70e93da4 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.controller; import java.io.File; +import java.time.Duration; import java.util.Arrays; import java.util.HashSet; import java.util.UUID; @@ -43,6 +44,7 @@ import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService; import org.assertj.core.api.Assertions; import org.assertj.core.util.Sets; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -171,7 +173,7 @@ public void before() throws Exception { autoSwitchHAService.init(defaultMessageStore); replicasManager.start(); // execute schedulingSyncBrokerMetadata() - TimeUnit.SECONDS.sleep(SCHEDULE_SERVICE_EXEC_PERIOD); + Awaitility.await().pollDelay(Duration.ofSeconds(SCHEDULE_SERVICE_EXEC_PERIOD)).until(()->true); } @After diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java index 84bca916998..25e4f3771f9 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java @@ -23,6 +23,7 @@ import java.net.SocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -47,6 +48,7 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.awaitility.Awaitility; import org.awaitility.core.ThrowingRunnable; import org.junit.After; import org.junit.Assume; @@ -234,7 +236,8 @@ protected List filtered(List msgs, public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception { List msgs = putMsg(master, topicCount, msgPerTopic); - Thread.sleep(200); + Awaitility.await().pollDelay(Duration.ofMillis(200)).until(()->true); + // reset consumer; String topic = "topic" + 0; @@ -296,7 +299,7 @@ public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception public void testGetMessage_withFilterBitMap() throws Exception { List msgs = putMsg(master, topicCount, msgPerTopic); - Thread.sleep(100); + Awaitility.await().pollDelay(Duration.ofMillis(200)).until(()->true); for (int i = 0; i < topicCount; i++) { String realTopic = TOPIC + i; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java index 31b547cf1be..bb98aa99d44 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java @@ -16,11 +16,13 @@ */ package org.apache.rocketmq.broker.latency; +import java.time.Duration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.future.FutureTaskExt; import org.apache.rocketmq.remoting.netty.RequestTask; +import org.awaitility.Awaitility; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -52,7 +54,8 @@ public void run() { //With expired request RequestTask expiredRequest = new RequestTask(runnable, null, null); queue.add(new FutureTaskExt<>(expiredRequest, null)); - TimeUnit.MILLISECONDS.sleep(100); + + Awaitility.await().pollDelay(Duration.ofMillis(100)).until(()->true); RequestTask requestTask = new RequestTask(runnable, null, null); queue.add(new FutureTaskExt<>(requestTask, null)); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java index acc7a3da74a..83a1fc1eb7b 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.pop.AckMsg; import org.apache.rocketmq.store.pop.PopCheckPoint; +import org.awaitility.Awaitility; import org.junit.Assume; import org.junit.Before; import org.junit.Test; @@ -44,6 +45,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import java.time.Duration; + @RunWith(MockitoJUnitRunner.Silent.class) public class PopBufferMergeServiceTest { @Spy @@ -115,7 +118,7 @@ public void testBasic() throws Exception { try { assertThat(popBufferMergeService.addCk(ck, reviveQid, ackOffset, nextBeginOffset)).isTrue(); assertThat(popBufferMergeService.getLatestOffset(topic, group, queueId)).isEqualTo(nextBeginOffset); - Thread.sleep(1000); // wait background threads of PopBufferMergeService run for some time + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); // wait background threads of PopBufferMergeService run for some time assertThat(popBufferMergeService.addAk(reviveQid, ackMsg)).isTrue(); assertThat(popBufferMergeService.getLatestOffset(topic, group, queueId)).isEqualTo(nextBeginOffset); } finally { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java index b90fb2931d5..f1e91a57a94 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java @@ -24,6 +24,7 @@ import java.net.SocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -48,6 +49,7 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -218,7 +220,7 @@ public void testDeliverDelayedMessageTimerTask() throws Exception { // timer run maybe delay, then consumer message again // and wait offsetTable - TimeUnit.SECONDS.sleep(15); + Awaitility.await().pollDelay(Duration.ofSeconds(15)).until(()->true); scheduleMessageService.buildRunningStats(new HashMap<>()); messageResult = getMessage(realQueueId, offset); diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index 24e39f56689..4c61593924f 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -54,6 +54,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -480,7 +481,7 @@ public void onChanged(String topic, Set messageQueues) { Set set = new HashSet<>(); set.add(createMessageQueue()); doReturn(set).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString()); - Thread.sleep(11 * 1000); + Awaitility.await().pollDelay(Duration.ofMillis(11*1000)).until(()->true); assertThat(flag).isTrue(); } finally { litePullConsumer.shutdown(); @@ -644,7 +645,7 @@ public void testConsumerAfterShutdown() throws Exception { new AsyncConsumer().executeAsync(defaultLitePullConsumer); - Thread.sleep(100); + Awaitility.await().pollDelay(Duration.ofMillis(100)).until(()->true); defaultLitePullConsumer.shutdown(); assertThat(defaultLitePullConsumer.isRunning()).isFalse(); } diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index 3943b922899..b626f223d8e 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -19,6 +19,7 @@ import java.io.ByteArrayOutputStream; import java.lang.reflect.Field; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -61,6 +62,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.awaitility.Awaitility; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -312,9 +314,9 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, assertThat(msgs.get(0).getBody()).isEqualTo(msgBody); countDownLatch.countDown(); try { - Thread.sleep(1000); + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); messageConsumedFlag.set(true); - } catch (InterruptedException e) { + } catch (Exception e) { } return null; diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java index 261d6d65a68..7c38abbf95a 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.consumer.rebalance; +import java.time.Duration; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -24,6 +25,7 @@ import java.util.TreeMap; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.common.message.MessageQueue; +import org.awaitility.Awaitility; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -85,8 +87,8 @@ public void testRun100RandomCase() { int queueSize = new Random().nextInt(20) + 1;//1-20 testAllocate(queueSize, consumerSize); try { - Thread.sleep(1); - } catch (InterruptedException e) { + Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true); + } catch (Exception e) { } } } diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java index 749201e3c22..9304473d36d 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java @@ -19,6 +19,7 @@ import java.io.ByteArrayOutputStream; import java.lang.reflect.Field; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -52,6 +53,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.body.ConsumeStatus; import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -181,7 +183,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, pullMessageService.executePullRequestImmediately(createPullRequest()); countDownLatch.await(); - Thread.sleep(1000); + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); ConsumeStatus stats = normalServie.getConsumerStatsManager().consumeStatus(pushConsumer.getDefaultMQPushConsumerImpl().groupName(),topic); diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index d4153c7cd97..0f357edf20c 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.client.producer; import java.lang.reflect.Field; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -50,6 +51,8 @@ import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.QueueData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -406,8 +409,8 @@ public void run() { assertThat(responseMap).isNotNull(); while (!finish.get()) { try { - Thread.sleep(10); - } catch (InterruptedException e) { + Awaitility.await().pollDelay(Duration.ofMillis(10)).until(()->true); + } catch (ConditionTimeoutException e) { } MessageExt responseMsg = new MessageExt(); responseMsg.setTopic(message.getTopic()); diff --git a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java index b02fb60ae49..f996f410ee9 100644 --- a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.common.stats; +import java.time.Duration; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -26,6 +27,7 @@ import java.util.concurrent.atomic.LongAdder; import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Test; @@ -66,7 +68,7 @@ public void run() { if (executor.getCompletedTaskCount() == 10) { break; } - Thread.sleep(1000); + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); } // simulate schedule task execution , tps stat { @@ -113,7 +115,7 @@ public void run() { if (executor.getCompletedTaskCount() == 10) { break; } - Thread.sleep(1000); + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); } return statsItemSet.getStatsItem("topicTest").getValue(); } @@ -134,7 +136,7 @@ public void run() { if (executor.getCompletedTaskCount() == 10) { break; } - Thread.sleep(1000); + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); } return statsItemSet.getAndCreateStatsItem("test").getValue(); } diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java index d6e5449c51b..538fa51b772 100644 --- a/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java +++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java @@ -45,6 +45,7 @@ import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -291,7 +292,7 @@ public void testBrokerLifecycleListener() throws Exception { assertEquals(DEFAULT_BROKER_NAME, brokerName); atomicBoolean.set(true); }); - Thread.sleep(2000); + Awaitility.await().pollDelay(Duration.ofMillis(2000)).until(()->true); assertTrue(atomicBoolean.get()); } diff --git a/filter/src/test/java/org/apache/rocketmq/filter/BitsArrayTest.java b/filter/src/test/java/org/apache/rocketmq/filter/BitsArrayTest.java index 1ba736019d2..75e1fe6e552 100644 --- a/filter/src/test/java/org/apache/rocketmq/filter/BitsArrayTest.java +++ b/filter/src/test/java/org/apache/rocketmq/filter/BitsArrayTest.java @@ -18,8 +18,11 @@ package org.apache.rocketmq.filter; import org.apache.rocketmq.filter.util.BitsArray; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; import org.junit.Test; +import java.time.Duration; import java.util.Random; import static org.assertj.core.api.Assertions.assertThat; @@ -32,8 +35,8 @@ BitsArray gen(int bitCount) { for (int i = 0; i < bitCount / Byte.SIZE; i++) { bitsArray.setByte(i, (byte) (new Random(System.currentTimeMillis())).nextInt(0xff)); try { - Thread.sleep(2); - } catch (InterruptedException e) { + Awaitility.await().pollDelay(Duration.ofMillis(2)).until(()->true); + } catch (ConditionTimeoutException e) { } } diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java index 283f9033021..f2459327f4f 100644 --- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java +++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java @@ -19,6 +19,7 @@ import io.netty.channel.ChannelHandlerContext; import java.lang.reflect.Field; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -43,6 +44,7 @@ import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -167,7 +169,7 @@ public void testNamesrv() throws Exception { RemotingCommand response = clientRequestProcessor.processRequest(mock(ChannelHandlerContext.class), remotingCommand); assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); - TimeUnit.SECONDS.sleep(waitSecondsForService + 1); + Awaitility.await().pollDelay(Duration.ofSeconds(waitSecondsForService+1)).until(()->true); response = clientRequestProcessor.processRequest(mock(ChannelHandlerContext.class), remotingCommand); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java index 15d83483b9d..f0113ad9be5 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java @@ -22,6 +22,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.net.HostAndPort; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.concurrent.ThreadPoolExecutor; @@ -38,6 +39,7 @@ import org.apache.rocketmq.remoting.protocol.route.QueueData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.assertj.core.util.Lists; +import org.awaitility.Awaitility; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Before; @@ -161,7 +163,7 @@ public void testTopicRouteCaffeineCache() throws InterruptedException { } }); assertThat(value).isEqualTo(topicCache.get(key)); - TimeUnit.SECONDS.sleep(5); + Awaitility.await().pollDelay(Duration.ofSeconds(5)).until(()->true); assertThat(value).isEqualTo(topicCache.get(key)); } } diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java index b1ec617ecfc..25402e531f0 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java @@ -18,8 +18,11 @@ package org.apache.rocketmq.store; import java.io.File; +import java.time.Duration; import java.util.Random; import org.apache.rocketmq.common.UtilAll; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; import org.junit.After; import org.junit.Test; @@ -79,8 +82,8 @@ protected void putSth(ConsumeQueueExt consumeQueueExt, boolean getAfterPut, } try { - Thread.sleep(10); - } catch (InterruptedException e) { + Awaitility.await().pollDelay(Duration.ofMillis(10)).until(()->true); + } catch (ConditionTimeoutException e) { e.printStackTrace(); assertThat(false).isTrue(); } diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 1d09ca86ecb..917ebd4edfb 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -30,6 +30,7 @@ import java.nio.channels.FileChannel; import java.nio.channels.OverlappingFileLockException; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -58,6 +59,8 @@ import org.apache.rocketmq.store.queue.CqUnit; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.assertj.core.util.Strings; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -405,9 +408,9 @@ private AppendMessageResult[] putMessages(int totalCount, String topic, int queu assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK); if (interval) { try { - Thread.sleep(10); - } catch (InterruptedException e) { - throw new RuntimeException("Thread sleep ERROR"); + Awaitility.await().pollDelay(Duration.ofMillis(10)).until(()->true); + } catch (ConditionTimeoutException e) { + throw new RuntimeException("Timeout ERROR"); } } } @@ -553,7 +556,7 @@ public void testMaxOffset() throws InterruptedException { } while (messageStore.dispatchBehindBytes() != 0) { - TimeUnit.MILLISECONDS.sleep(1); + Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true); } assertThat(messageStore.getMaxOffsetInQueue(messageTopic, queueId)).isEqualTo(firstBatchMessages); diff --git a/store/src/test/java/org/apache/rocketmq/store/FlushDiskWatcherTest.java b/store/src/test/java/org/apache/rocketmq/store/FlushDiskWatcherTest.java index 97968f5938a..4777fa8272e 100644 --- a/store/src/test/java/org/apache/rocketmq/store/FlushDiskWatcherTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/FlushDiskWatcherTest.java @@ -18,9 +18,11 @@ package org.apache.rocketmq.store; import org.apache.rocketmq.store.CommitLog.GroupCommitRequest; +import org.awaitility.Awaitility; import org.junit.Assert; import org.junit.Test; +import java.time.Duration; import java.util.LinkedList; import java.util.List; @@ -42,7 +44,7 @@ public void testTimeout() throws Exception { requestList.add(groupCommitRequest); flushDiskWatcher.add(groupCommitRequest); } - Thread.sleep(2 * timeoutMill); + Awaitility.await().pollDelay(Duration.ofMillis(2*timeoutMill)).until(()->true); for (GroupCommitRequest request : requestList) { request.wakeupCustomer(PutMessageStatus.PUT_OK); @@ -71,7 +73,7 @@ public void testWatcher() throws Exception { flushDiskWatcher.add(groupCommitRequest); groupCommitRequest.wakeupCustomer(PutMessageStatus.PUT_OK); } - Thread.sleep((timeoutMill << 20) / 1000000); + Awaitility.await().pollDelay(Duration.ofMillis((timeoutMill<<20)/1000000)); for (GroupCommitRequest request : requestList) { Assert.assertTrue(request.future().isDone()); Assert.assertEquals(request.future().get(), PutMessageStatus.PUT_OK); diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java index 38a04358174..a2ec9796f0b 100644 --- a/store/src/test/java/org/apache/rocketmq/store/HATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.ha.HAConnectionState; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assume; import org.junit.Before; @@ -189,7 +190,7 @@ public void testSemiSyncReplicaWhenSlaveActingMaster() throws Exception { messageStore.setAliveReplicaNumInGroup(1); //wait to let master clean the slave's connection - Thread.sleep(masterMessageStoreConfig.getHaHousekeepingInterval() + 500); + Awaitility.await().pollDelay(Duration.ofMillis(masterMessageStoreConfig.getHaHousekeepingInterval()+500)); for (long i = 0; i < totalMsgs; i++) { CompletableFuture putResultFuture = messageStore.asyncPutMessage(buildMessage()); PutMessageResult result = putResultFuture.get(); diff --git a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java index d92b3cbc0d9..8acab853178 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java @@ -25,6 +25,7 @@ import org.apache.rocketmq.store.logfile.DefaultMappedFile; import org.apache.rocketmq.store.logfile.MappedFile; import org.assertj.core.util.Lists; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assume; import org.junit.Test; @@ -33,6 +34,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.concurrent.Executors; @@ -298,7 +300,7 @@ public void testMappedFile_SwapMap() { while (!readOver.get()) { for (MappedFile mappedFile : mappedFileQueue.getMappedFiles()) { mappedFile.swapMap(); - Thread.sleep(10); + Awaitility.await().pollDelay(Duration.ofMillis(10)).until(()->true); mappedFile.cleanSwapedMap(true); } } @@ -318,7 +320,7 @@ public void testMappedFile_SwapMap() { mappedFile = mappedFileQueue.findMappedFileByOffset(i * fixedMsg.getBytes().length); retryTime++; if (mappedFile == null) { - Thread.sleep(1); + Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true); } } assertThat(mappedFile != null).isTrue(); @@ -327,7 +329,7 @@ public void testMappedFile_SwapMap() { while ((pos + fixedMsg.getBytes().length) > mappedFile.getReadPosition() && retryTime < 10000) { retryTime++; if ((pos + fixedMsg.getBytes().length) > mappedFile.getReadPosition()) { - Thread.sleep(1); + Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true); } } assertThat((pos + fixedMsg.getBytes().length) <= mappedFile.getReadPosition()).isTrue(); @@ -468,13 +470,13 @@ public void testMappedFile_Rename() throws IOException, InterruptedException { mappedFile.destroy(1000); }); - TimeUnit.SECONDS.sleep(3); + Awaitility.await().pollDelay(Duration.ofMillis(3)).until(()->true); ses.shutdown(); mappedFileQueue.getMappedFiles().clear(); mappedFileQueue.getMappedFiles().addAll(compactingMappedFileQueue.getMappedFiles()); - TimeUnit.SECONDS.sleep(3); + Awaitility.await().pollDelay(Duration.ofMillis(3)).until(()->true); } @After diff --git a/store/src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest.java index acf5edf5117..3311b1d7317 100644 --- a/store/src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/RocksDBMessageStoreTest.java @@ -29,6 +29,7 @@ import java.nio.channels.FileChannel; import java.nio.channels.OverlappingFileLockException; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -60,6 +61,8 @@ import org.apache.rocketmq.store.queue.CqUnit; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.assertj.core.util.Strings; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -465,9 +468,9 @@ private AppendMessageResult[] putMessages(int totalCount, String topic, int queu assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK); if (interval) { try { - Thread.sleep(10); - } catch (InterruptedException e) { - throw new RuntimeException("Thread sleep ERROR"); + Awaitility.await().pollDelay(Duration.ofMillis(10)).until(()->true); + } catch (ConditionTimeoutException e) { + throw new RuntimeException("Timeout ERROR"); } } } @@ -619,7 +622,7 @@ public void testMaxOffset() throws InterruptedException { } while (messageStore.dispatchBehindBytes() != 0) { - TimeUnit.MILLISECONDS.sleep(1); + Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true); } assertThat(messageStore.getMaxOffsetInQueue(messageTopic, queueId)).isEqualTo(firstBatchMessages); diff --git a/store/src/test/java/org/apache/rocketmq/store/StoreTestUtil.java b/store/src/test/java/org/apache/rocketmq/store/StoreTestUtil.java index 17a2b5e19d7..ffa59874447 100644 --- a/store/src/test/java/org/apache/rocketmq/store/StoreTestUtil.java +++ b/store/src/test/java/org/apache/rocketmq/store/StoreTestUtil.java @@ -25,10 +25,14 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.index.IndexFile; import org.apache.rocketmq.store.index.IndexService; +import org.assertj.core.api.DurationAssert; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.time.Duration; import java.util.ArrayList; @@ -72,8 +76,8 @@ public static void flushConsumeQueue(DefaultMessageStore store) throws Exception public static void waitCommitLogReput(DefaultMessageStore store) { for (int i = 0; i < 500 && isCommitLogAvailable(store); i++) { try { - Thread.sleep(100); - } catch (InterruptedException ignored) { + Awaitility.await().pollDelay(Duration.ofMillis(100)).until(()->true); + } catch (ConditionTimeoutException ignored) { } } diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java index e3ac1b6bdac..d00cdf8a0b4 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java @@ -19,6 +19,7 @@ import java.io.File; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; @@ -47,6 +48,7 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -213,7 +215,7 @@ public void testGetOffsetInQueueByTime() throws Exception { for (int i = 0; i < 19; i++) { PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(topic, batchNum)); Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus()); - Thread.sleep(2); + Awaitility.await().pollDelay(Duration.ofMillis(2)).until(()->true); if (i == 7) timeMid = System.currentTimeMillis(); } @@ -253,7 +255,7 @@ public void testDispatchNormalConsumeQueue() throws Exception { PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner); Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus()); - Thread.sleep(2); + Awaitility.await().pollDelay(Duration.ofMillis(2)).until(()->true); if (i == 0) { timeStart = putMessageResult.getAppendMessageResult().getStoreTimestamp(); } @@ -308,7 +310,7 @@ public void testDispatchBuildBatchConsumeQueue() throws Exception { for (int i = 0; i < 100; i++) { PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(topic, batchNum)); Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus()); - Thread.sleep(2); + Awaitility.await().pollDelay(Duration.ofMillis(2)).until(()->true); if (i == 0) { timeStart = putMessageResult.getAppendMessageResult().getStoreTimestamp(); } diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java index 02ff35681d0..3b1f99e8efc 100644 --- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java @@ -23,6 +23,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -52,6 +53,7 @@ import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; import org.junit.Assume; @@ -250,7 +252,7 @@ public void testTimerFlowControl() throws Exception { } } //wait reput - Thread.sleep(5); + Awaitility.await().pollDelay(Duration.ofMillis(5)).until(()->true); } assertThat(passFlowControlNum).isGreaterThan(0).isLessThan(120); } @@ -407,7 +409,7 @@ public Boolean call() { for (int i = 0; i <= first.getTimerLog().getMappedFileQueue().getMappedFiles().size() + 10; i++) { first.getTimerLog().getMappedFileQueue().flush(0); - Thread.sleep(10); + Awaitility.await().pollDelay(Duration.ofMillis(10)).until(()->true); } // Damage the timer wheel, trigger the check physical pos. @@ -493,7 +495,7 @@ public ByteBuffer getOneMessage(String topic, int queue, long offset, int timeou if (null != getMessageResult && GetMessageStatus.FOUND == getMessageResult.getStatus()) { return getMessageResult.getMessageBufferList().get(0); } - Thread.sleep(100); + Awaitility.await().pollDelay(Duration.ofMillis(100)).until(()->true); } return null; } diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java index d3d5de9e271..9f63bd3e9b7 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.test.util; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -52,6 +53,7 @@ import org.apache.rocketmq.tools.command.CommandUtil; import org.apache.rocketmq.tools.command.topic.RemappingStaticTopicSubCommand; import org.apache.rocketmq.tools.command.topic.UpdateStaticTopicSubCommand; +import org.awaitility.Awaitility; import static org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig; import static org.awaitility.Awaitility.await; @@ -121,7 +123,7 @@ public static boolean createSub(String nameSrvAddr, String clusterName, String c addr)); } catch (Exception e) { e.printStackTrace(); - Thread.sleep(1000 * 1); + Awaitility.await().pollDelay(Duration.ofMillis(1000*1)).until(()->true); } } } catch (Exception e) { @@ -171,7 +173,7 @@ public static boolean awaitStaticTopicMs(long timeMs, String topic, DefaultMQAdm if (checkStaticTopic(topic, defaultMQAdminExt, clientInstance)) { return true; } - Thread.sleep(100); + Awaitility.await().pollDelay(Duration.ofMillis(100)).until(()->true); } return false; } diff --git a/test/src/main/java/org/apache/rocketmq/test/util/TestUtil.java b/test/src/main/java/org/apache/rocketmq/test/util/TestUtil.java index 1013759cd14..c80b71fc0a8 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/TestUtil.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/TestUtil.java @@ -19,12 +19,16 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; + public final class TestUtil { private TestUtil() { @@ -58,24 +62,24 @@ public static String addQuoteToParamater(String param) { public static void waitForMonment(long time) { try { - Thread.sleep(time); - } catch (InterruptedException e) { + Awaitility.await().pollDelay(Duration.ofMillis(time)).until(()->true); + } catch (ConditionTimeoutException e) { e.printStackTrace(); } } public static void waitForSeconds(long time) { try { - TimeUnit.SECONDS.sleep(time); - } catch (InterruptedException e) { + Awaitility.await().pollDelay(Duration.ofSeconds(time)).until(()->true); + } catch (ConditionTimeoutException e) { e.printStackTrace(); } } public static void waitForMinutes(long time) { try { - TimeUnit.MINUTES.sleep(time); - } catch (InterruptedException e) { + Awaitility.await().pollDelay(Duration.ofMinutes(time)).until(()->true); + } catch (ConditionTimeoutException e) { e.printStackTrace(); } } diff --git a/test/src/main/java/org/apache/rocketmq/test/util/TestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/TestUtils.java index 3eb1f7d741b..0847af9ae84 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/TestUtils.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/TestUtils.java @@ -17,13 +17,19 @@ package org.apache.rocketmq.test.util; +import java.time.Duration; import java.util.concurrent.TimeUnit; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; + +import io.opentelemetry.sdk.metrics.internal.state.DebugUtils; + public class TestUtils { public static void waitForMoment(long time) { try { - Thread.sleep(time); - } catch (InterruptedException var3) { + Awaitility.await().pollDelay(Duration.ofMillis(time)).until(()->true); + } catch (ConditionTimeoutException var3) { var3.printStackTrace(); } @@ -31,8 +37,8 @@ public static void waitForMoment(long time) { public static void waitForSeconds(long time) { try { - TimeUnit.SECONDS.sleep(time); - } catch (InterruptedException var3) { + Awaitility.await().pollDelay(Duration.ofMillis(time)).until(()->true); + } catch (ConditionTimeoutException var3) { var3.printStackTrace(); } @@ -40,8 +46,8 @@ public static void waitForSeconds(long time) { public static void waitForMinutes(long time) { try { - TimeUnit.MINUTES.sleep(time); - } catch (InterruptedException var3) { + Awaitility.await().pollDelay(Duration.ofMillis(time)).until(()->true); + } catch (ConditionTimeoutException var3) { var3.printStackTrace(); } diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java index 00263073ebe..b55fc6b1d76 100644 --- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java +++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.test.autoswitchrole; import java.io.File; +import java.time.Duration; import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -43,6 +44,8 @@ import org.apache.rocketmq.store.ha.HAConnectionState; import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService; import org.apache.rocketmq.store.logfile.MappedFile; +import org.assertj.core.api.DurationAssert; +import org.awaitility.Awaitility; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Ignore; @@ -101,7 +104,7 @@ public void initBroker(int mappedFileSize, String brokerName) throws Exception { this.brokerController2 = startBroker(nameserverAddress, controllerAddress, brokerName, 2, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, mappedFileSize); // Wait slave connecting to master assertTrue(waitSlaveReady(this.brokerController2.getMessageStore())); - Thread.sleep(1000); + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); } public void mockData(String topic) throws Exception { @@ -118,7 +121,7 @@ public boolean waitSlaveReady(MessageStore messageStore) throws InterruptedExcep if (haClient != null && haClient.getCurrentState().equals(HAConnectionState.TRANSFER)) { return true; } else { - Thread.sleep(2000); + Awaitility.await().pollDelay(Duration.ofMillis(2000)).until(()->true); tryTimes++; } } @@ -162,7 +165,7 @@ public void testChangeMaster() throws Exception { // Let master shutdown brokerController1.shutdown(); brokerList.remove(this.brokerController1); - Thread.sleep(6000); + Awaitility.await().pollDelay(Duration.ofMillis(6000)).until(()->true); // The slave should change to master assertTrue(brokerController2.getReplicasManager().isMasterState()); @@ -191,19 +194,19 @@ public void testRestartWithChangedAddress() throws Exception { String brokerName = "Broker-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + random.nextInt(65535); int oldPort = nextPort(); this.brokerController1 = startBroker(nameserverAddress, controllerAddress, brokerName, 1, nextPort(), oldPort, oldPort, BrokerRole.SYNC_MASTER, DEFAULT_FILE_SIZE); - Thread.sleep(1000); + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); assertTrue(brokerController1.getReplicasManager().isMasterState()); assertEquals(brokerController1.getReplicasManager().getMasterEpoch(), 1); // Let master shutdown brokerController1.shutdown(); brokerList.remove(this.brokerController1); - Thread.sleep(6000); + Awaitility.await().pollDelay(Duration.ofMillis(6000)).until(()->true); // Restart with changed address int newPort = nextPort(); this.brokerController1 = startBroker(nameserverAddress, controllerAddress, brokerName, 1, nextPort(), newPort, newPort, BrokerRole.SYNC_MASTER, DEFAULT_FILE_SIZE); - Thread.sleep(1000); + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); // Check broker id assertEquals(1, brokerController1.getReplicasManager().getBrokerControllerId().longValue()); @@ -266,7 +269,7 @@ public void testTruncateEpochLogAndChangeMaster() throws Exception { // Step2: shutdown broker1, broker2 as master brokerController1.shutdown(); brokerList.remove(brokerController1); - Thread.sleep(5000); + Awaitility.await().pollDelay(Duration.ofMillis(5000)).until(()->true); assertTrue(brokerController2.getReplicasManager().isMasterState()); assertEquals(brokerController2.getReplicasManager().getMasterEpoch(), 2); diff --git a/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java index 016f9084e30..4622edde1f3 100644 --- a/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java +++ b/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java @@ -67,6 +67,8 @@ import org.apache.rocketmq.store.ha.HAConnection; import org.apache.rocketmq.store.ha.HAConnectionState; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; import org.junit.Assert; import org.junit.BeforeClass; @@ -507,8 +509,8 @@ protected static void awaitUntilSlaveOK() { }); try { - Thread.sleep(2000); - } catch (InterruptedException e) { + Awaitility.await().pollDelay(Duration.ofMillis(2000)).until(()->true); + } catch (ConditionTimeoutException e) { e.printStackTrace(); } } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java index b408a7c3cf6..94a14ebd7eb 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Paths; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Set; @@ -35,6 +36,8 @@ import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; import org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -186,8 +189,8 @@ public void concurrentPutTest() throws InterruptedException { Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey( TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp)); try { - Thread.sleep(100); - } catch (InterruptedException ignored) { + Awaitility.await().pollDelay(Duration.ofMillis(100)).until(()->true); + } catch (ConditionTimeoutException ignored) { } latch.countDown(); }); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java index 20b4acbfa11..41d72c49eb3 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java @@ -43,6 +43,7 @@ import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; import org.apache.rocketmq.tieredstore.file.TieredFileAllocator; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -98,7 +99,7 @@ public void basicServiceTest() throws InterruptedException { for (int i = 0; i < 50; i++) { Assert.assertEquals(AppendResult.SUCCESS, indexService.putKey( TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, i * 100, MESSAGE_SIZE, System.currentTimeMillis())); - TimeUnit.MILLISECONDS.sleep(1); + Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true); } ConcurrentSkipListMap timeStoreTable = indexService.getTimeStoreTable(); Assert.assertEquals(3, timeStoreTable.size()); @@ -212,7 +213,7 @@ public void restartServiceTest() throws InterruptedException { TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(i)), i * 100L, MESSAGE_SIZE, System.currentTimeMillis()); Assert.assertEquals(AppendResult.SUCCESS, result); - TimeUnit.MILLISECONDS.sleep(1); + Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true); } long timestamp = indexService.getTimeStoreTable().firstKey(); indexService.shutdown(); @@ -245,7 +246,7 @@ public void queryFromFileTest() throws InterruptedException, ExecutionException TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(j)), i * 100L + j, MESSAGE_SIZE, System.currentTimeMillis()); Assert.assertEquals(AppendResult.SUCCESS, result); - TimeUnit.MILLISECONDS.sleep(1); + Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true); } } @@ -279,7 +280,7 @@ public void concurrentGetTest() throws InterruptedException { indexService.putKey(TOPIC_NAME, TOPIC_ID, j, Collections.singleton(String.valueOf(i)), i * 100L, i * 100, System.currentTimeMillis()); } - TimeUnit.MILLISECONDS.sleep(1); + Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true); } CountDownLatch latch = new CountDownLatch(fileCount * 3); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java index a655710a500..db1eb7b543c 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.tieredstore.provider; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.tieredstore.common.FileSegmentType; @@ -26,9 +27,13 @@ import org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment; import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; import org.junit.Assert; import org.junit.Test; +import io.opentelemetry.sdk.metrics.internal.state.DebugUtils; + public class TieredFileSegmentTest { public int baseOffset = 1000; @@ -135,8 +140,8 @@ public void testCommitFailedThenSuccess() { segment.blocker = new CompletableFuture<>(); new Thread(() -> { try { - Thread.sleep(1000); - } catch (InterruptedException e) { + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); + } catch (ConditionTimeoutException e) { Assert.fail(e.getMessage()); } ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer(); @@ -191,8 +196,8 @@ public void testCommitFailed3Times() { segment.blocker = new CompletableFuture<>(); new Thread(() -> { try { - Thread.sleep(3000); - } catch (InterruptedException e) { + Awaitility.await().pollDelay(Duration.ofMillis(3000)).until(()->true); + } catch (ConditionTimeoutException e) { Assert.fail(e.getMessage()); } ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer(); From 371235a19ca6a0da73cf6cfc291199d8b400b8be Mon Sep 17 00:00:00 2001 From: yash Date: Thu, 30 Nov 2023 23:29:25 +0530 Subject: [PATCH 2/2] [ISSUE 4622] Checkstyle configured --- .../org/apache/rocketmq/broker/BrokerControllerTest.java | 3 +-- .../org/apache/rocketmq/broker/BrokerOuterAPITest.java | 4 ++-- .../rocketmq/broker/controller/ReplicasManagerTest.java | 4 ++-- .../broker/filter/MessageStoreWithFilterTest.java | 4 ++-- .../rocketmq/broker/latency/BrokerFastFailureTest.java | 3 +-- .../broker/processor/PopBufferMergeServiceTest.java | 2 +- .../broker/schedule/ScheduleMessageServiceTest.java | 3 +-- .../client/consumer/DefaultLitePullConsumerTest.java | 4 ++-- .../client/consumer/DefaultMQPushConsumerTest.java | 2 +- .../rebalance/AllocateMessageQueueConsitentHashTest.java | 2 +- .../consumer/ConsumeMessageConcurrentlyServiceTest.java | 2 +- .../rocketmq/client/producer/DefaultMQProducerTest.java | 2 +- .../apache/rocketmq/common/stats/StatsItemSetTest.java | 6 +++--- .../rocketmq/controller/impl/DLedgerControllerTest.java | 2 +- .../java/org/apache/rocketmq/filter/BitsArrayTest.java | 2 +- .../processor/ClusterTestRequestProcessorTest.java | 3 +-- .../service/route/ClusterTopicRouteServiceTest.java | 2 +- .../org/apache/rocketmq/test/util/MQAdminTestUtils.java | 4 ++-- .../java/org/apache/rocketmq/test/util/TestUtil.java | 8 ++++---- .../java/org/apache/rocketmq/test/util/TestUtils.java | 9 ++++----- .../rocketmq/tieredstore/index/IndexStoreFileTest.java | 2 +- .../tieredstore/index/IndexStoreServiceTest.java | 8 ++++---- .../tieredstore/provider/TieredFileSegmentTest.java | 5 ++--- 23 files changed, 40 insertions(+), 46 deletions(-) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java index eada122dc19..d3f7c8274b1 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java @@ -22,7 +22,6 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; @@ -93,7 +92,7 @@ public void run() { queue.add(new FutureTaskExt<>(requestTask, null)); long headSlowTimeMills = 100; - Awaitility.await().pollDelay(Duration.ofMillis(headSlowTimeMills)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(headSlowTimeMills)).until(() -> true); assertThat(brokerController.headSlowTimeMills(queue)).isGreaterThanOrEqualTo(headSlowTimeMills); } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java index b92e0bc14bc..eaaafecc0f2 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java @@ -27,7 +27,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.broker.out.BrokerOuterAPI; import org.apache.rocketmq.common.BrokerConfig; @@ -133,7 +133,7 @@ public RemotingCommand answer(InvocationOnMock invocation) throws Throwable { } else if (invocation.getArgument(0) == nameserver2) { return buildResponse(Boolean.FALSE); } else if (invocation.getArgument(0) == nameserver3) { - Awaitility.await().pollDelay(Duration.ofMillis(timeOut+20)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(timeOut + 20)).until(() -> true); return buildResponse(Boolean.TRUE); } return buildResponse(Boolean.TRUE); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java index 65c70e93da4..171347da5cf 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java @@ -22,7 +22,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.UUID; -import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.out.BrokerOuterAPI; @@ -173,7 +173,7 @@ public void before() throws Exception { autoSwitchHAService.init(defaultMessageStore); replicasManager.start(); // execute schedulingSyncBrokerMetadata() - Awaitility.await().pollDelay(Duration.ofSeconds(SCHEDULE_SERVICE_EXEC_PERIOD)).until(()->true); + Awaitility.await().pollDelay(Duration.ofSeconds(SCHEDULE_SERVICE_EXEC_PERIOD)).until(() -> true); } @After diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java index 25e4f3771f9..5c190a6605a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java @@ -236,7 +236,7 @@ protected List filtered(List msgs, public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception { List msgs = putMsg(master, topicCount, msgPerTopic); - Awaitility.await().pollDelay(Duration.ofMillis(200)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(200)).until(() -> true); // reset consumer; @@ -299,7 +299,7 @@ public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception public void testGetMessage_withFilterBitMap() throws Exception { List msgs = putMsg(master, topicCount, msgPerTopic); - Awaitility.await().pollDelay(Duration.ofMillis(200)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(200)).until(() -> true); for (int i = 0; i < topicCount; i++) { String realTopic = TOPIC + i; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java index bb98aa99d44..0325d5314dd 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java @@ -19,7 +19,6 @@ import java.time.Duration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.future.FutureTaskExt; import org.apache.rocketmq.remoting.netty.RequestTask; import org.awaitility.Awaitility; @@ -55,7 +54,7 @@ public void run() { RequestTask expiredRequest = new RequestTask(runnable, null, null); queue.add(new FutureTaskExt<>(expiredRequest, null)); - Awaitility.await().pollDelay(Duration.ofMillis(100)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(100)).until(() -> true); RequestTask requestTask = new RequestTask(runnable, null, null); queue.add(new FutureTaskExt<>(requestTask, null)); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java index 83a1fc1eb7b..e10974281a7 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java @@ -118,7 +118,7 @@ public void testBasic() throws Exception { try { assertThat(popBufferMergeService.addCk(ck, reviveQid, ackOffset, nextBeginOffset)).isTrue(); assertThat(popBufferMergeService.getLatestOffset(topic, group, queueId)).isEqualTo(nextBeginOffset); - Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); // wait background threads of PopBufferMergeService run for some time + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(() -> true); // wait background threads of PopBufferMergeService run for some time assertThat(popBufferMergeService.addAk(reviveQid, ackMsg)).isTrue(); assertThat(popBufferMergeService.getLatestOffset(topic, group, queueId)).isEqualTo(nextBeginOffset); } finally { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java index f1e91a57a94..89d488287ad 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java @@ -32,7 +32,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.failover.EscapeBridge; import org.apache.rocketmq.broker.util.HookUtils; @@ -220,7 +219,7 @@ public void testDeliverDelayedMessageTimerTask() throws Exception { // timer run maybe delay, then consumer message again // and wait offsetTable - Awaitility.await().pollDelay(Duration.ofSeconds(15)).until(()->true); + Awaitility.await().pollDelay(Duration.ofSeconds(15)).until(() -> true); scheduleMessageService.buildRunningStats(new HashMap<>()); messageResult = getMessage(realQueueId, offset); diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index 4c61593924f..6aff4d97ab5 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -481,7 +481,7 @@ public void onChanged(String topic, Set messageQueues) { Set set = new HashSet<>(); set.add(createMessageQueue()); doReturn(set).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString()); - Awaitility.await().pollDelay(Duration.ofMillis(11*1000)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(11 * 1000)).until(() -> true); assertThat(flag).isTrue(); } finally { litePullConsumer.shutdown(); @@ -645,7 +645,7 @@ public void testConsumerAfterShutdown() throws Exception { new AsyncConsumer().executeAsync(defaultLitePullConsumer); - Awaitility.await().pollDelay(Duration.ofMillis(100)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(100)).until(() -> true); defaultLitePullConsumer.shutdown(); assertThat(defaultLitePullConsumer.isRunning()).isFalse(); } diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index b626f223d8e..258f35c318d 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -314,7 +314,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, assertThat(msgs.get(0).getBody()).isEqualTo(msgBody); countDownLatch.countDown(); try { - Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(() -> true); messageConsumedFlag.set(true); } catch (Exception e) { } diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java index 7c38abbf95a..b63918fdf7a 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java @@ -87,7 +87,7 @@ public void testRun100RandomCase() { int queueSize = new Random().nextInt(20) + 1;//1-20 testAllocate(queueSize, consumerSize); try { - Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(1)).until(() -> true); } catch (Exception e) { } } diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java index 9304473d36d..0f3ad7e221b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java @@ -183,7 +183,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, pullMessageService.executePullRequestImmediately(createPullRequest()); countDownLatch.await(); - Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(() -> true); ConsumeStatus stats = normalServie.getConsumerStatsManager().consumeStatus(pushConsumer.getDefaultMQPushConsumerImpl().groupName(),topic); diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 0f357edf20c..aab080184e6 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -409,7 +409,7 @@ public void run() { assertThat(responseMap).isNotNull(); while (!finish.get()) { try { - Awaitility.await().pollDelay(Duration.ofMillis(10)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(10)).until(() -> true); } catch (ConditionTimeoutException e) { } MessageExt responseMsg = new MessageExt(); diff --git a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java index f996f410ee9..6d067f593d5 100644 --- a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java @@ -68,7 +68,7 @@ public void run() { if (executor.getCompletedTaskCount() == 10) { break; } - Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(() -> true); } // simulate schedule task execution , tps stat { @@ -115,7 +115,7 @@ public void run() { if (executor.getCompletedTaskCount() == 10) { break; } - Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(() -> true); } return statsItemSet.getStatsItem("topicTest").getValue(); } @@ -136,7 +136,7 @@ public void run() { if (executor.getCompletedTaskCount() == 10) { break; } - Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(() -> true); } return statsItemSet.getAndCreateStatsItem("test").getValue(); } diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java index 538fa51b772..f490b123397 100644 --- a/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java +++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java @@ -292,7 +292,7 @@ public void testBrokerLifecycleListener() throws Exception { assertEquals(DEFAULT_BROKER_NAME, brokerName); atomicBoolean.set(true); }); - Awaitility.await().pollDelay(Duration.ofMillis(2000)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(2000)).until(() -> true); assertTrue(atomicBoolean.get()); } diff --git a/filter/src/test/java/org/apache/rocketmq/filter/BitsArrayTest.java b/filter/src/test/java/org/apache/rocketmq/filter/BitsArrayTest.java index 75e1fe6e552..4014cbe1ddf 100644 --- a/filter/src/test/java/org/apache/rocketmq/filter/BitsArrayTest.java +++ b/filter/src/test/java/org/apache/rocketmq/filter/BitsArrayTest.java @@ -35,7 +35,7 @@ BitsArray gen(int bitCount) { for (int i = 0; i < bitCount / Byte.SIZE; i++) { bitsArray.setByte(i, (byte) (new Random(System.currentTimeMillis())).nextInt(0xff)); try { - Awaitility.await().pollDelay(Duration.ofMillis(2)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(2)).until(() -> true); } catch (ConditionTimeoutException e) { } } diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java index f2459327f4f..3740b0e8aae 100644 --- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java +++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.MQClientAPIImpl; @@ -169,7 +168,7 @@ public void testNamesrv() throws Exception { RemotingCommand response = clientRequestProcessor.processRequest(mock(ChannelHandlerContext.class), remotingCommand); assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); - Awaitility.await().pollDelay(Duration.ofSeconds(waitSecondsForService+1)).until(()->true); + Awaitility.await().pollDelay(Duration.ofSeconds(waitSecondsForService + 1)).until(() -> true); response = clientRequestProcessor.processRequest(mock(ChannelHandlerContext.class), remotingCommand); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java index f0113ad9be5..0ab4fbe4259 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java @@ -163,7 +163,7 @@ public void testTopicRouteCaffeineCache() throws InterruptedException { } }); assertThat(value).isEqualTo(topicCache.get(key)); - Awaitility.await().pollDelay(Duration.ofSeconds(5)).until(()->true); + Awaitility.await().pollDelay(Duration.ofSeconds(5)).until(() -> true); assertThat(value).isEqualTo(topicCache.get(key)); } } diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java index 9f63bd3e9b7..c3daa43757e 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java @@ -123,7 +123,7 @@ public static boolean createSub(String nameSrvAddr, String clusterName, String c addr)); } catch (Exception e) { e.printStackTrace(); - Awaitility.await().pollDelay(Duration.ofMillis(1000*1)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(1000 * 1)).until(() -> true); } } } catch (Exception e) { @@ -173,7 +173,7 @@ public static boolean awaitStaticTopicMs(long timeMs, String topic, DefaultMQAdm if (checkStaticTopic(topic, defaultMQAdminExt, clientInstance)) { return true; } - Awaitility.await().pollDelay(Duration.ofMillis(100)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(100)).until(() -> true); } return false; } diff --git a/test/src/main/java/org/apache/rocketmq/test/util/TestUtil.java b/test/src/main/java/org/apache/rocketmq/test/util/TestUtil.java index c80b71fc0a8..c76c75d5942 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/TestUtil.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/TestUtil.java @@ -24,7 +24,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; + import org.awaitility.Awaitility; import org.awaitility.core.ConditionTimeoutException; @@ -62,7 +62,7 @@ public static String addQuoteToParamater(String param) { public static void waitForMonment(long time) { try { - Awaitility.await().pollDelay(Duration.ofMillis(time)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(time)).until(() -> true); } catch (ConditionTimeoutException e) { e.printStackTrace(); } @@ -70,7 +70,7 @@ public static void waitForMonment(long time) { public static void waitForSeconds(long time) { try { - Awaitility.await().pollDelay(Duration.ofSeconds(time)).until(()->true); + Awaitility.await().pollDelay(Duration.ofSeconds(time)).until(() -> true); } catch (ConditionTimeoutException e) { e.printStackTrace(); } @@ -78,7 +78,7 @@ public static void waitForSeconds(long time) { public static void waitForMinutes(long time) { try { - Awaitility.await().pollDelay(Duration.ofMinutes(time)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMinutes(time)).until(() -> true); } catch (ConditionTimeoutException e) { e.printStackTrace(); } diff --git a/test/src/main/java/org/apache/rocketmq/test/util/TestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/TestUtils.java index 0847af9ae84..6ff765a6cf7 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/TestUtils.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/TestUtils.java @@ -18,17 +18,16 @@ package org.apache.rocketmq.test.util; import java.time.Duration; -import java.util.concurrent.TimeUnit; + import org.awaitility.Awaitility; import org.awaitility.core.ConditionTimeoutException; -import io.opentelemetry.sdk.metrics.internal.state.DebugUtils; public class TestUtils { public static void waitForMoment(long time) { try { - Awaitility.await().pollDelay(Duration.ofMillis(time)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(time)).until(() -> true); } catch (ConditionTimeoutException var3) { var3.printStackTrace(); } @@ -37,7 +36,7 @@ public static void waitForMoment(long time) { public static void waitForSeconds(long time) { try { - Awaitility.await().pollDelay(Duration.ofMillis(time)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(time)).until(() -> true); } catch (ConditionTimeoutException var3) { var3.printStackTrace(); } @@ -46,7 +45,7 @@ public static void waitForSeconds(long time) { public static void waitForMinutes(long time) { try { - Awaitility.await().pollDelay(Duration.ofMillis(time)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(time)).until(() -> true); } catch (ConditionTimeoutException var3) { var3.printStackTrace(); } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java index 94a14ebd7eb..50a6735eece 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java @@ -189,7 +189,7 @@ public void concurrentPutTest() throws InterruptedException { Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey( TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp)); try { - Awaitility.await().pollDelay(Duration.ofMillis(100)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(100)).until(() -> true); } catch (ConditionTimeoutException ignored) { } latch.countDown(); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java index 41d72c49eb3..71c28eb35df 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java @@ -99,7 +99,7 @@ public void basicServiceTest() throws InterruptedException { for (int i = 0; i < 50; i++) { Assert.assertEquals(AppendResult.SUCCESS, indexService.putKey( TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, i * 100, MESSAGE_SIZE, System.currentTimeMillis())); - Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(1)).until(() -> true); } ConcurrentSkipListMap timeStoreTable = indexService.getTimeStoreTable(); Assert.assertEquals(3, timeStoreTable.size()); @@ -213,7 +213,7 @@ public void restartServiceTest() throws InterruptedException { TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(i)), i * 100L, MESSAGE_SIZE, System.currentTimeMillis()); Assert.assertEquals(AppendResult.SUCCESS, result); - Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(1)).until(() -> true); } long timestamp = indexService.getTimeStoreTable().firstKey(); indexService.shutdown(); @@ -246,7 +246,7 @@ public void queryFromFileTest() throws InterruptedException, ExecutionException TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(j)), i * 100L + j, MESSAGE_SIZE, System.currentTimeMillis()); Assert.assertEquals(AppendResult.SUCCESS, result); - Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(1)).until(() -> true); } } @@ -280,7 +280,7 @@ public void concurrentGetTest() throws InterruptedException { indexService.putKey(TOPIC_NAME, TOPIC_ID, j, Collections.singleton(String.valueOf(i)), i * 100L, i * 100, System.currentTimeMillis()); } - Awaitility.await().pollDelay(Duration.ofMillis(1)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(1)).until(() -> true); } CountDownLatch latch = new CountDownLatch(fileCount * 3); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java index db1eb7b543c..af55fa4828b 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java @@ -32,7 +32,6 @@ import org.junit.Assert; import org.junit.Test; -import io.opentelemetry.sdk.metrics.internal.state.DebugUtils; public class TieredFileSegmentTest { @@ -140,7 +139,7 @@ public void testCommitFailedThenSuccess() { segment.blocker = new CompletableFuture<>(); new Thread(() -> { try { - Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(() -> true); } catch (ConditionTimeoutException e) { Assert.fail(e.getMessage()); } @@ -196,7 +195,7 @@ public void testCommitFailed3Times() { segment.blocker = new CompletableFuture<>(); new Thread(() -> { try { - Awaitility.await().pollDelay(Duration.ofMillis(3000)).until(()->true); + Awaitility.await().pollDelay(Duration.ofMillis(3000)).until(() -> true); } catch (ConditionTimeoutException e) { Assert.fail(e.getMessage()); }