Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE 4622] Improve execution efficiency of integration tests, chang… #7597

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Lists;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -127,9 +129,12 @@ public void buildPlainAccessResourceTest() {
Map<String, Byte> resourcePermMap = plainAccessResource.getResourcePermMap();
Assert.assertEquals(resourcePermMap.size(), 3);

Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupA")).byteValue(), Permission.DENY);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupB")).byteValue(), Permission.PUB | Permission.SUB);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupC")).byteValue(), Permission.PUB);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupA")).byteValue(),
Permission.DENY);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupB")).byteValue(),
Permission.PUB | Permission.SUB);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupC")).byteValue(),
Permission.PUB);

List<String> topics = new ArrayList<>();
topics.add("topicA=DENY");
Expand Down Expand Up @@ -205,11 +210,13 @@ public void passWordThanTest() {
@Test
public void cleanAuthenticationInfoTest() throws IllegalAccessException {
// PlainPermissionManager.addPlainAccessResource(plainAccessResource);
Map<String, Map<String, PlainAccessResource>> plainAccessResourceMap = (Map<String, Map<String, PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true);
Map<String, Map<String, PlainAccessResource>> plainAccessResourceMap = (Map<String, Map<String, PlainAccessResource>>) FieldUtils
.readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true);
Assert.assertFalse(plainAccessResourceMap.isEmpty());

plainPermissionManager.clearPermissionInfo();
plainAccessResourceMap = (Map<String, Map<String, PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true);
plainAccessResourceMap = (Map<String, Map<String, PlainAccessResource>>) FieldUtils
.readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true);
Assert.assertTrue(plainAccessResourceMap.isEmpty());
}

Expand All @@ -222,7 +229,8 @@ public void isWatchStartTest() {

@Test
public void testWatch() throws IOException, IllegalAccessException, InterruptedException {
String fileName = Joiner.on(File.separator).join(new String[]{System.getProperty("rocketmq.home.dir"), "conf", "acl", "plain_acl_test.yml"});
String fileName = Joiner.on(File.separator)
.join(new String[] { System.getProperty("rocketmq.home.dir"), "conf", "acl", "plain_acl_test.yml" });
File transport = new File(fileName);
transport.delete();
transport.createNewFile();
Expand All @@ -235,15 +243,17 @@ public void testWatch() throws IOException, IllegalAccessException, InterruptedE
writer.flush();
writer.close();

Thread.sleep(1000);
Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(() -> true);

PlainPermissionManager plainPermissionManager = new PlainPermissionManager();
Assert.assertTrue(plainPermissionManager.isWatchStart());

Map<String, String> accessKeyTable = (Map<String, String>) FieldUtils.readDeclaredField(plainPermissionManager, "accessKeyTable", true);
Map<String, String> accessKeyTable = (Map<String, String>) FieldUtils.readDeclaredField(plainPermissionManager,
"accessKeyTable", true);
String aclFileName = accessKeyTable.get("watchrocketmqx");
{
Map<String, Map<String, PlainAccessResource>> plainAccessResourceMap = (Map<String, Map<String, PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true);
Map<String, Map<String, PlainAccessResource>> plainAccessResourceMap = (Map<String, Map<String, PlainAccessResource>>) FieldUtils
.readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true);
PlainAccessResource accessResource = plainAccessResourceMap.get(aclFileName).get("watchrocketmqx");
Assert.assertNotNull(accessResource);
Assert.assertEquals(accessResource.getSecretKey(), "12345678");
Expand All @@ -259,9 +269,10 @@ public void testWatch() throws IOException, IllegalAccessException, InterruptedE
// Update file and flush to yaml file
AclUtils.writeDataObject(fileName, updatedMap);

Thread.sleep(10000);
Awaitility.await().pollDelay(Duration.ofMillis(10000)).until(() -> true);
{
Map<String, Map<String, PlainAccessResource>> plainAccessResourceMap = (Map<String, Map<String, PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true);
Map<String, Map<String, PlainAccessResource>> plainAccessResourceMap = (Map<String, Map<String, PlainAccessResource>>) FieldUtils
.readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true);
PlainAccessResource accessResource = plainAccessResourceMap.get(aclFileName).get("watchrocketmq1y");
Assert.assertNotNull(accessResource);
Assert.assertEquals(accessResource.getSecretKey(), "88888888");
Expand Down Expand Up @@ -337,7 +348,8 @@ public void createAclAccessConfigMapTest() {
plainAccessConfig.setTopicPerms(Arrays.asList(DEFAULT_TOPIC + "=" + AclConstants.PUB));
plainAccessConfig.setGroupPerms(Lists.newArrayList("groupA=SUB"));

final PlainAccessConfig map = plainPermissionManager.createAclAccessConfigMap(existedAccountMap, plainAccessConfig);
final PlainAccessConfig map = plainPermissionManager.createAclAccessConfigMap(existedAccountMap,
plainAccessConfig);
Assert.assertEquals(AclConstants.SUB_PUB, map.getDefaultGroupPerm());
Assert.assertEquals("groupA=SUB", map.getGroupPerms().get(0));
Assert.assertEquals("12345678", map.getSecretKey());
Expand All @@ -362,7 +374,7 @@ public void deleteAccessConfigTest() throws InterruptedException {
plainAccessConfig.setGroupPerms(Lists.newArrayList("groupA=SUB"));
plainPermissionManager.updateAccessConfig(plainAccessConfig);

//delete existed accessConfig
// delete existed accessConfig
final boolean flag2 = plainPermissionManager.deleteAccessConfig("test_delete");
assert flag2;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package org.apache.rocketmq.broker;

import java.io.File;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
Expand All @@ -30,6 +30,7 @@
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -91,7 +92,7 @@ public void run() {
queue.add(new FutureTaskExt<>(requestTask, null));

long headSlowTimeMills = 100;
TimeUnit.MILLISECONDS.sleep(headSlowTimeMills);
Awaitility.await().pollDelay(Duration.ofMillis(headSlowTimeMills)).until(() -> true);
assertThat(brokerController.headSlowTimeMills(queue)).isGreaterThanOrEqualTo(headSlowTimeMills);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.common.BrokerConfig;
Expand All @@ -48,6 +49,7 @@
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -131,7 +133,7 @@ public RemotingCommand answer(InvocationOnMock invocation) throws Throwable {
} else if (invocation.getArgument(0) == nameserver2) {
return buildResponse(Boolean.FALSE);
} else if (invocation.getArgument(0) == nameserver3) {
TimeUnit.MILLISECONDS.sleep(timeOut + 20);
Awaitility.await().pollDelay(Duration.ofMillis(timeOut + 20)).until(() -> true);
return buildResponse(Boolean.TRUE);
}
return buildResponse(Boolean.TRUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
package org.apache.rocketmq.broker.controller;

import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;


import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
Expand All @@ -43,6 +44,7 @@
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Sets;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -171,7 +173,7 @@ public void before() throws Exception {
autoSwitchHAService.init(defaultMessageStore);
replicasManager.start();
// execute schedulingSyncBrokerMetadata()
TimeUnit.SECONDS.sleep(SCHEDULE_SERVICE_EXEC_PERIOD);
Awaitility.await().pollDelay(Duration.ofSeconds(SCHEDULE_SERVICE_EXEC_PERIOD)).until(() -> true);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -47,6 +48,7 @@
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.awaitility.Awaitility;
import org.awaitility.core.ThrowingRunnable;
import org.junit.After;
import org.junit.Assume;
Expand Down Expand Up @@ -234,7 +236,8 @@ protected List<MessageExtBrokerInner> filtered(List<MessageExtBrokerInner> msgs,
public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception {
List<MessageExtBrokerInner> msgs = putMsg(master, topicCount, msgPerTopic);

Thread.sleep(200);
Awaitility.await().pollDelay(Duration.ofMillis(200)).until(() -> true);


// reset consumer;
String topic = "topic" + 0;
Expand Down Expand Up @@ -296,7 +299,7 @@ public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception
public void testGetMessage_withFilterBitMap() throws Exception {
List<MessageExtBrokerInner> msgs = putMsg(master, topicCount, msgPerTopic);

Thread.sleep(100);
Awaitility.await().pollDelay(Duration.ofMillis(200)).until(() -> true);

for (int i = 0; i < topicCount; i++) {
String realTopic = TOPIC + i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
*/
package org.apache.rocketmq.broker.latency;

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.awaitility.Awaitility;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -52,7 +53,8 @@ public void run() {
//With expired request
RequestTask expiredRequest = new RequestTask(runnable, null, null);
queue.add(new FutureTaskExt<>(expiredRequest, null));
TimeUnit.MILLISECONDS.sleep(100);

Awaitility.await().pollDelay(Duration.ofMillis(100)).until(() -> true);

RequestTask requestTask = new RequestTask(runnable, null, null);
queue.add(new FutureTaskExt<>(requestTask, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
import org.awaitility.Awaitility;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -44,6 +45,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

import java.time.Duration;

@RunWith(MockitoJUnitRunner.Silent.class)
public class PopBufferMergeServiceTest {
@Spy
Expand Down Expand Up @@ -115,7 +118,7 @@ public void testBasic() throws Exception {
try {
assertThat(popBufferMergeService.addCk(ck, reviveQid, ackOffset, nextBeginOffset)).isTrue();
assertThat(popBufferMergeService.getLatestOffset(topic, group, queueId)).isEqualTo(nextBeginOffset);
Thread.sleep(1000); // wait background threads of PopBufferMergeService run for some time
Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(() -> true); // wait background threads of PopBufferMergeService run for some time
assertThat(popBufferMergeService.addAk(reviveQid, ackMsg)).isTrue();
assertThat(popBufferMergeService.getLatestOffset(topic, group, queueId)).isEqualTo(nextBeginOffset);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.failover.EscapeBridge;
import org.apache.rocketmq.broker.util.HookUtils;
Expand All @@ -48,6 +48,7 @@
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -218,7 +219,7 @@ public void testDeliverDelayedMessageTimerTask() throws Exception {

// timer run maybe delay, then consumer message again
// and wait offsetTable
TimeUnit.SECONDS.sleep(15);
Awaitility.await().pollDelay(Duration.ofSeconds(15)).until(() -> true);
scheduleMessageService.buildRunningStats(new HashMap<>());

messageResult = getMessage(realQueueId, offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -480,7 +481,7 @@ public void onChanged(String topic, Set<MessageQueue> messageQueues) {
Set<MessageQueue> set = new HashSet<>();
set.add(createMessageQueue());
doReturn(set).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString());
Thread.sleep(11 * 1000);
Awaitility.await().pollDelay(Duration.ofMillis(11 * 1000)).until(() -> true);
assertThat(flag).isTrue();
} finally {
litePullConsumer.shutdown();
Expand Down Expand Up @@ -644,7 +645,7 @@ public void testConsumerAfterShutdown() throws Exception {

new AsyncConsumer().executeAsync(defaultLitePullConsumer);

Thread.sleep(100);
Awaitility.await().pollDelay(Duration.ofMillis(100)).until(() -> true);
defaultLitePullConsumer.shutdown();
assertThat(defaultLitePullConsumer.isRunning()).isFalse();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -61,6 +62,7 @@
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -312,9 +314,9 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
assertThat(msgs.get(0).getBody()).isEqualTo(msgBody);
countDownLatch.countDown();
try {
Thread.sleep(1000);
Awaitility.await().pollDelay(Duration.ofMillis(1000)).until(() -> true);
messageConsumedFlag.set(true);
} catch (InterruptedException e) {
} catch (Exception e) {
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.consumer.rebalance;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand All @@ -24,6 +25,7 @@
import java.util.TreeMap;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -85,8 +87,8 @@ public void testRun100RandomCase() {
int queueSize = new Random().nextInt(20) + 1;//1-20
testAllocate(queueSize, consumerSize);
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Awaitility.await().pollDelay(Duration.ofMillis(1)).until(() -> true);
} catch (Exception e) {
}
}
}
Expand Down
Loading
Loading