Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
  • Loading branch information
duanlinlin committed Dec 6, 2024
1 parent bc66462 commit b176f06
Showing 1 changed file with 1 addition and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,27 @@

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.processor.QueryAssignmentProcessor;
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.MessageRequestModeSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.timer.TimerMetrics;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -72,17 +62,6 @@ public class SlaveSynchronizeAtomicTest {
@Mock
private TopicConfigManager topicConfigManager;

@Mock
private ConsumerOffsetManager consumerOffsetManager;

@Mock
private MessageStoreConfig messageStoreConfig;

@Mock
private MessageStore messageStore;

@Mock
private ScheduleMessageService scheduleMessageService;

@Mock
private SubscriptionGroupManager subscriptionGroupManager;
Expand All @@ -93,14 +72,6 @@ public class SlaveSynchronizeAtomicTest {
@Mock
private MessageRequestModeManager messageRequestModeManager;

@Mock
private TimerMessageStore timerMessageStore;

@Mock
private TimerMetrics timerMetrics;

@Mock
private TimerCheckpoint timerCheckpoint;

private static final String BROKER_ADDR = "127.0.0.1:10911";
private final SubscriptionGroupWrapper subscriptionGroupWrapper = createSubscriptionGroupWrapper();
Expand All @@ -117,54 +88,14 @@ public void init() {
}
when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
when(brokerController.getScheduleMessageService()).thenReturn(scheduleMessageService);
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
when(brokerController.getQueryAssignmentProcessor()).thenReturn(queryAssignmentProcessor);
when(brokerController.getMessageStore()).thenReturn(messageStore);
when(brokerController.getTimerMessageStore()).thenReturn(timerMessageStore);
when(brokerController.getTimerCheckpoint()).thenReturn(timerCheckpoint);
when(topicConfigManager.getDataVersion()).thenReturn(new DataVersion());
when(topicConfigManager.getTopicConfigTable()).thenReturn(new ConcurrentHashMap<>());
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
when(consumerOffsetManager.getOffsetTable()).thenReturn(new ConcurrentHashMap<>());
when(consumerOffsetManager.getDataVersion()).thenReturn(new DataVersion());
when(subscriptionGroupManager.getDataVersion()).thenReturn(dataVersion);
when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(
subscriptionGroupWrapper.getSubscriptionGroupTable());
when(queryAssignmentProcessor.getMessageRequestModeManager()).thenReturn(messageRequestModeManager);
when(messageRequestModeManager.getMessageRequestModeMap()).thenReturn(
requestModeSerializeWrapper.getMessageRequestModeMap());
when(messageStoreConfig.isTimerWheelEnable()).thenReturn(true);
when(messageStore.getTimerMessageStore()).thenReturn(timerMessageStore);
when(timerMessageStore.isShouldRunningDequeue()).thenReturn(false);
when(timerMessageStore.getTimerMetrics()).thenReturn(timerMetrics);
when(timerMetrics.getDataVersion()).thenReturn(new DataVersion());
when(timerCheckpoint.getDataVersion()).thenReturn(new DataVersion());
slaveSynchronize = new SlaveSynchronize(brokerController);
slaveSynchronize.setMasterAddr(BROKER_ADDR);
}

private TopicConfigAndMappingSerializeWrapper createTopicConfigWrapper(TopicConfig topicConfig) {
TopicConfigAndMappingSerializeWrapper wrapper = new TopicConfigAndMappingSerializeWrapper();
wrapper.setTopicConfigTable(new ConcurrentHashMap<>());
wrapper.getTopicConfigTable().put(topicConfig.getTopicName(), topicConfig);
DataVersion dataVersion = new DataVersion();
dataVersion.setStateVersion(1L);
wrapper.setDataVersion(dataVersion);
wrapper.setMappingDataVersion(dataVersion);
return wrapper;
}

private ConsumerOffsetSerializeWrapper createConsumerOffsetWrapper() {
ConsumerOffsetSerializeWrapper wrapper = new ConsumerOffsetSerializeWrapper();
wrapper.setOffsetTable(new ConcurrentHashMap<>());
DataVersion dataVersion = new DataVersion();
dataVersion.setStateVersion(1L);
wrapper.setDataVersion(dataVersion);
return wrapper;
}

private SubscriptionGroupWrapper createSubscriptionGroupWrapper() {
SubscriptionGroupWrapper wrapper = new SubscriptionGroupWrapper();
wrapper.setSubscriptionGroupTable(new ConcurrentHashMap<>());
Expand All @@ -180,29 +111,12 @@ private MessageRequestModeSerializeWrapper createMessageRequestModeWrapper() {
return wrapper;
}

private TimerMetrics.TimerMetricsSerializeWrapper createTimerMetricsWrapper() {
TimerMetrics.TimerMetricsSerializeWrapper wrapper = new TimerMetrics.TimerMetricsSerializeWrapper();
wrapper.setTimingCount(new ConcurrentHashMap<>());
DataVersion dataVersion = new DataVersion();
dataVersion.setStateVersion(1L);
wrapper.setDataVersion(dataVersion);
return wrapper;
}

@Test
public void testSyncAtomically()
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException, UnsupportedEncodingException {
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(
createTopicConfigWrapper(new TopicConfig("NewTopic")));
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());
when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn("");
InterruptedException {
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(subscriptionGroupWrapper);
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(requestModeSerializeWrapper);
when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper());
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());
when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn("");
when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper());

CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(() -> {
Expand Down

0 comments on commit b176f06

Please sign in to comment.