Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
duanlinlin committed Nov 28, 2024
1 parent da0e1c4 commit bc66462
Showing 1 changed file with 227 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.slave;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.processor.QueryAssignmentProcessor;
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.MessageRequestModeSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.timer.TimerMetrics;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class SlaveSynchronizeAtomicTest {
@Spy
private BrokerController brokerController =
new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(),
new MessageStoreConfig());

private SlaveSynchronize slaveSynchronize;

@Mock
private BrokerOuterAPI brokerOuterAPI;

@Mock
private TopicConfigManager topicConfigManager;

@Mock
private ConsumerOffsetManager consumerOffsetManager;

@Mock
private MessageStoreConfig messageStoreConfig;

@Mock
private MessageStore messageStore;

@Mock
private ScheduleMessageService scheduleMessageService;

@Mock
private SubscriptionGroupManager subscriptionGroupManager;

@Mock
private QueryAssignmentProcessor queryAssignmentProcessor;

@Mock
private MessageRequestModeManager messageRequestModeManager;

@Mock
private TimerMessageStore timerMessageStore;

@Mock
private TimerMetrics timerMetrics;

@Mock
private TimerCheckpoint timerCheckpoint;

private static final String BROKER_ADDR = "127.0.0.1:10911";
private final SubscriptionGroupWrapper subscriptionGroupWrapper = createSubscriptionGroupWrapper();
private final MessageRequestModeSerializeWrapper requestModeSerializeWrapper = createMessageRequestModeWrapper();
private final DataVersion dataVersion = new DataVersion();

@Before
public void init() {
for (int i = 0; i < 100000; i++) {
subscriptionGroupWrapper.getSubscriptionGroupTable().put("group" + i, new SubscriptionGroupConfig());
}
for (int i = 0; i < 100000; i++) {
requestModeSerializeWrapper.getMessageRequestModeMap().put("topic" + i, new ConcurrentHashMap<>());
}
when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
when(brokerController.getScheduleMessageService()).thenReturn(scheduleMessageService);
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
when(brokerController.getQueryAssignmentProcessor()).thenReturn(queryAssignmentProcessor);
when(brokerController.getMessageStore()).thenReturn(messageStore);
when(brokerController.getTimerMessageStore()).thenReturn(timerMessageStore);
when(brokerController.getTimerCheckpoint()).thenReturn(timerCheckpoint);
when(topicConfigManager.getDataVersion()).thenReturn(new DataVersion());
when(topicConfigManager.getTopicConfigTable()).thenReturn(new ConcurrentHashMap<>());
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
when(consumerOffsetManager.getOffsetTable()).thenReturn(new ConcurrentHashMap<>());
when(consumerOffsetManager.getDataVersion()).thenReturn(new DataVersion());
when(subscriptionGroupManager.getDataVersion()).thenReturn(dataVersion);
when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(
subscriptionGroupWrapper.getSubscriptionGroupTable());
when(queryAssignmentProcessor.getMessageRequestModeManager()).thenReturn(messageRequestModeManager);
when(messageRequestModeManager.getMessageRequestModeMap()).thenReturn(
requestModeSerializeWrapper.getMessageRequestModeMap());
when(messageStoreConfig.isTimerWheelEnable()).thenReturn(true);
when(messageStore.getTimerMessageStore()).thenReturn(timerMessageStore);
when(timerMessageStore.isShouldRunningDequeue()).thenReturn(false);
when(timerMessageStore.getTimerMetrics()).thenReturn(timerMetrics);
when(timerMetrics.getDataVersion()).thenReturn(new DataVersion());
when(timerCheckpoint.getDataVersion()).thenReturn(new DataVersion());
slaveSynchronize = new SlaveSynchronize(brokerController);
slaveSynchronize.setMasterAddr(BROKER_ADDR);
}

private TopicConfigAndMappingSerializeWrapper createTopicConfigWrapper(TopicConfig topicConfig) {
TopicConfigAndMappingSerializeWrapper wrapper = new TopicConfigAndMappingSerializeWrapper();
wrapper.setTopicConfigTable(new ConcurrentHashMap<>());
wrapper.getTopicConfigTable().put(topicConfig.getTopicName(), topicConfig);
DataVersion dataVersion = new DataVersion();
dataVersion.setStateVersion(1L);
wrapper.setDataVersion(dataVersion);
wrapper.setMappingDataVersion(dataVersion);
return wrapper;
}

private ConsumerOffsetSerializeWrapper createConsumerOffsetWrapper() {
ConsumerOffsetSerializeWrapper wrapper = new ConsumerOffsetSerializeWrapper();
wrapper.setOffsetTable(new ConcurrentHashMap<>());
DataVersion dataVersion = new DataVersion();
dataVersion.setStateVersion(1L);
wrapper.setDataVersion(dataVersion);
return wrapper;
}

private SubscriptionGroupWrapper createSubscriptionGroupWrapper() {
SubscriptionGroupWrapper wrapper = new SubscriptionGroupWrapper();
wrapper.setSubscriptionGroupTable(new ConcurrentHashMap<>());
DataVersion dataVersion = new DataVersion();
dataVersion.setStateVersion(1L);
wrapper.setDataVersion(dataVersion);
return wrapper;
}

private MessageRequestModeSerializeWrapper createMessageRequestModeWrapper() {
MessageRequestModeSerializeWrapper wrapper = new MessageRequestModeSerializeWrapper();
wrapper.setMessageRequestModeMap(new ConcurrentHashMap<>());
return wrapper;
}

private TimerMetrics.TimerMetricsSerializeWrapper createTimerMetricsWrapper() {
TimerMetrics.TimerMetricsSerializeWrapper wrapper = new TimerMetrics.TimerMetricsSerializeWrapper();
wrapper.setTimingCount(new ConcurrentHashMap<>());
DataVersion dataVersion = new DataVersion();
dataVersion.setStateVersion(1L);
wrapper.setDataVersion(dataVersion);
return wrapper;
}

@Test
public void testSyncAtomically()
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException, UnsupportedEncodingException {
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(
createTopicConfigWrapper(new TopicConfig("NewTopic")));
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());
when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn("");
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(subscriptionGroupWrapper);
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(requestModeSerializeWrapper);
when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper());
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());
when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn("");
when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper());

CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(() -> {
while (countDownLatch.getCount() > 0) {
dataVersion.nextVersion();
try {
slaveSynchronize.syncAll();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

for (int i = 0; i < 10000000; i++) {
Assert.assertTrue(subscriptionGroupWrapper.getSubscriptionGroupTable()
.containsKey("group" + ThreadLocalRandom.current().nextInt(0, 100000)));
Assert.assertTrue(requestModeSerializeWrapper.getMessageRequestModeMap()
.containsKey("topic" + ThreadLocalRandom.current().nextInt(0, 100000)));
}
countDownLatch.countDown();
}
}

0 comments on commit bc66462

Please sign in to comment.