Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
yx9o committed Jan 4, 2025
1 parent c1bad83 commit 54929bb
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,17 +290,15 @@ public void testSendHeartbeatToBrokerV1() {

@Test
public void testSendHeartbeatToBrokerV2() throws MQBrokerException, RemotingException, InterruptedException {
if (!MixAll.isJdk8()) {
return;
}
consumerTable.put(group, createMQConsumerInner());
when(clientConfig.isUseHeartbeatV2()).thenReturn(true);
boolean isJdk8 = System.getProperty("java.version").startsWith("1.8.");
if (isJdk8) {
HeartbeatV2Result heartbeatV2Result = mock(HeartbeatV2Result.class);
when(heartbeatV2Result.isSupportV2()).thenReturn(true);
when(mQClientAPIImpl.sendHeartbeatV2(any(), any(HeartbeatData.class), anyLong())).thenReturn(heartbeatV2Result);
assertTrue(mqClientInstance.sendHeartbeatToBroker(0L, defaultBroker, defaultBrokerAddr));
} else {
assertFalse(mqClientInstance.sendHeartbeatToBroker(0L, defaultBroker, defaultBrokerAddr));
}
HeartbeatV2Result heartbeatV2Result = mock(HeartbeatV2Result.class);
when(heartbeatV2Result.isSupportV2()).thenReturn(true);
when(mQClientAPIImpl.sendHeartbeatV2(any(), any(HeartbeatData.class), anyLong())).thenReturn(heartbeatV2Result);
assertTrue(mqClientInstance.sendHeartbeatToBroker(0L, defaultBroker, defaultBrokerAddr));
}

@Test
Expand All @@ -312,15 +310,13 @@ public void testSendHeartbeatToAllBrokerWithLockV1() {

@Test
public void testSendHeartbeatToAllBrokerWithLockV2() {
if (!MixAll.isJdk8()) {
return;
}
brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
consumerTable.put(group, createMQConsumerInner());
when(clientConfig.isUseHeartbeatV2()).thenReturn(true);
boolean isJdk8 = System.getProperty("java.version").startsWith("1.8.");
if (isJdk8) {
assertTrue(mqClientInstance.sendHeartbeatToAllBrokerWithLock());
} else {
assertFalse(mqClientInstance.sendHeartbeatToAllBrokerWithLock());
}
assertTrue(mqClientInstance.sendHeartbeatToAllBrokerWithLock());
}

@Test
Expand Down
4 changes: 4 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/MixAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ public static boolean isMac() {
return OS.contains("mac");
}

public static boolean isJdk8() {
return System.getProperty("java.version").startsWith("1.8.");
}

public static boolean isUnix() {
return OS.contains("nix")
|| OS.contains("nux")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
*/
package org.apache.rocketmq.tools.command.connection;

import java.util.HashSet;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.srvutil.ServerUtil;
Expand All @@ -30,6 +30,8 @@
import org.junit.Before;
import org.junit.Test;

import java.util.HashSet;

import static org.mockito.Mockito.mock;

public class ConsumerConnectionSubCommandTest {
Expand All @@ -39,18 +41,28 @@ public class ConsumerConnectionSubCommandTest {

@Before
public void before() {
if (!MixAll.isJdk8()) {
return;
}
brokerMocker = startOneBroker();
nameServerMocker = NameServerMocker.startByDefaultConf(brokerMocker.listenPort());
}

@After
public void after() {
brokerMocker.shutdown();
nameServerMocker.shutdown();
if (null != brokerMocker) {
brokerMocker.shutdown();
}
if (null != nameServerMocker) {
nameServerMocker.shutdown();
}
}

@Test
public void testExecute() throws SubCommandException {
if (!MixAll.isJdk8()) {
return;
}
ConsumerConnectionSubCommand cmd = new ConsumerConnectionSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g default-consumer-group", "-b localhost:" + brokerMocker.listenPort()};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.srvutil.ServerUtil;
Expand All @@ -40,18 +41,28 @@ public class ProducerConnectionSubCommandTest {

@Before
public void before() {
if (!MixAll.isJdk8()) {
return;
}
brokerMocker = startOneBroker();
nameServerMocker = NameServerMocker.startByDefaultConf(brokerMocker.listenPort());
}

@After
public void after() {
brokerMocker.shutdown();
nameServerMocker.shutdown();
if (null != brokerMocker) {
brokerMocker.shutdown();
}
if (null != nameServerMocker) {
nameServerMocker.shutdown();
}
}

@Test
public void testExecute() throws SubCommandException {
if (!MixAll.isJdk8()) {
return;
}
ProducerConnectionSubCommand cmd = new ProducerConnectionSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g default-producer-group", "-t unit-test", String.format("-n localhost:%d", nameServerMocker.listenPort())};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
Expand All @@ -40,19 +41,29 @@ public class ConsumerProgressSubCommandTest {

@Before
public void before() {
if (!MixAll.isJdk8()) {
return;
}
brokerMocker = startOneBroker();
nameServerMocker = NameServerMocker.startByDefaultConf(brokerMocker.listenPort());
}

@After
public void after() {
brokerMocker.shutdown();
nameServerMocker.shutdown();
if (null != brokerMocker) {
brokerMocker.shutdown();
}
if (null != nameServerMocker) {
nameServerMocker.shutdown();
}
}

@Ignore
@Test
public void testExecute() throws SubCommandException {
if (!MixAll.isJdk8()) {
return;
}
ConsumerProgressSubCommand cmd = new ConsumerProgressSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g default-group",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.srvutil.ServerUtil;
Expand All @@ -40,11 +41,11 @@ public class ConsumerStatusSubCommandTest {

@Before
public void before() {
boolean isJdk8 = System.getProperty("java.version").startsWith("1.8.");
if (isJdk8) {
brokerMocker = startOneBroker();
nameServerMocker = NameServerMocker.startByDefaultConf(brokerMocker.listenPort());
if (!MixAll.isJdk8()) {
return;
}
brokerMocker = startOneBroker();
nameServerMocker = NameServerMocker.startByDefaultConf(brokerMocker.listenPort());
}

@After
Expand All @@ -59,8 +60,7 @@ public void after() {

@Test
public void testExecute() throws SubCommandException {
boolean isJdk8 = System.getProperty("java.version").startsWith("1.8.");
if (!isJdk8) {
if (!MixAll.isJdk8()) {
return;
}
ConsumerStatusSubCommand cmd = new ConsumerStatusSubCommand();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
Expand All @@ -43,18 +44,28 @@ public class GetConsumerConfigSubCommandTest {

@Before
public void before() {
if (!MixAll.isJdk8()) {
return;
}
brokerMocker = startOneBroker();
nameServerMocker = startNameServer();
}

@After
public void after() {
brokerMocker.shutdown();
nameServerMocker.shutdown();
if (null != brokerMocker) {
brokerMocker.shutdown();
}
if (null != nameServerMocker) {
nameServerMocker.shutdown();
}
}

@Test
public void testExecute() throws SubCommandException {
if (!MixAll.isJdk8()) {
return;
}
GetConsumerConfigSubCommand cmd = new GetConsumerConfigSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g group_test", String.format("-n localhost:%d", nameServerMocker.listenPort())};
Expand Down

0 comments on commit 54929bb

Please sign in to comment.