Skip to content

Commit

Permalink
Fix query compaction topic offset
Browse files Browse the repository at this point in the history
  • Loading branch information
redlsz committed Nov 6, 2023
1 parent 00965d8 commit 7501bb5
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ public static String buildPollingKey(String topic, String cid, int queueId) {
public static String buildPollingNotificationKey(String topic, int queueId) {
return topic + PopAckConstants.SPLIT + queueId;
}

public static String buildCompactionLogKey(String topic, int queueId) {
return topic + "_" + queueId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,12 @@ public long getMaxOffsetInQueue(String topic, int queueId) {
@Override
public long getMaxOffsetInQueue(String topic, int queueId, boolean committed) {
if (committed) {
Optional<TopicConfig> topicConfig = getTopicConfig(topic);
CleanupPolicy policy = CleanupPolicyUtils.getDeletePolicy(topicConfig);
// Query compaction topic offset from compactionStore
if (Objects.equals(policy, CleanupPolicy.COMPACTION) && messageStoreConfig.isEnableCompaction()) {
return compactionStore.getMaxOffsetInQueue(topic, queueId);
}
ConsumeQueueInterface logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
return logic.getMaxOffsetInQueue();
Expand All @@ -999,6 +1005,12 @@ public long getMaxOffsetInQueue(String topic, int queueId, boolean committed) {

@Override
public long getMinOffsetInQueue(String topic, int queueId) {
Optional<TopicConfig> topicConfig = getTopicConfig(topic);
CleanupPolicy policy = CleanupPolicyUtils.getDeletePolicy(topicConfig);
// Query compaction topic offset from compactionStore
if (Objects.equals(policy, CleanupPolicy.COMPACTION) && messageStoreConfig.isEnableCompaction()) {
return compactionStore.getMinOffsetInQueue(topic, queueId);
}
try {
return this.consumeQueueStore.getMinOffsetInQueue(topic, queueId);
} catch (RocksDBException e) {
Expand Down Expand Up @@ -1036,6 +1048,12 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {

@Override
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, BoundaryType boundaryType) {
Optional<TopicConfig> topicConfig = getTopicConfig(topic);
CleanupPolicy policy = CleanupPolicyUtils.getDeletePolicy(topicConfig);
// Query compaction topic offset from compactionStore
if (Objects.equals(policy, CleanupPolicy.COMPACTION) && messageStoreConfig.isEnableCompaction()) {
return compactionStore.getOffsetInQueueByTime(topic, queueId, timestamp, boundaryType);
}
try {
return this.consumeQueueStore.getOffsetInQueueByTime(topic, queueId, timestamp, boundaryType);
} catch (RocksDBException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.rocketmq.store.kv;

import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;

import java.io.File;
Expand All @@ -42,11 +43,11 @@ public CompactionPositionMgr(final String compactionPath) {
}

public void setOffset(String topic, int queueId, final long offset) {
queueOffsetMap.put(topic + "_" + queueId, offset);
queueOffsetMap.put(KeyBuilder.buildCompactionLogKey(topic, queueId), offset);
}

public long getOffset(String topic, int queueId) {
return queueOffsetMap.getOrDefault(topic + "_" + queueId, -1L);
return queueOffsetMap.getOrDefault(KeyBuilder.buildCompactionLogKey(topic, queueId), -1L);
}

public boolean isEmpty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.attribute.CleanupPolicy;
Expand Down Expand Up @@ -144,7 +146,7 @@ private void scanAllTopicConfig() {
}

private CompactionLog loadAndGetClog(String topic, int queueId) {
CompactionLog clog = compactionLogTable.compute(topic + "_" + queueId, (k, v) -> {
CompactionLog clog = compactionLogTable.compute(KeyBuilder.buildCompactionLogKey(topic, queueId), (k, v) -> {
if (v == null) {
try {
v = new CompactionLog(defaultMessageStore, this, topic, queueId);
Expand Down Expand Up @@ -179,7 +181,7 @@ public void doDispatch(DispatchRequest dispatchRequest, SelectMappedBufferResult

public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums, final int maxTotalMsgSize) {
CompactionLog log = compactionLogTable.get(topic + "_" + queueId);
CompactionLog log = compactionLogTable.get(KeyBuilder.buildCompactionLogKey(topic, queueId));
if (log == null) {
return GetMessageResult.NO_MATCH_LOGIC_QUEUE;
} else {
Expand All @@ -188,6 +190,31 @@ public GetMessageResult getMessage(final String group, final String topic, final

}

public long getMaxOffsetInQueue(final String topic, final int queueId) {
CompactionLog log = compactionLogTable.get(KeyBuilder.buildCompactionLogKey(topic, queueId));
if (log == null) {
return 0;
}
return log.getCQ().getMaxOffsetInQueue();
}

public long getMinOffsetInQueue(final String topic, final int queueId) {
CompactionLog log = compactionLogTable.get(KeyBuilder.buildCompactionLogKey(topic, queueId));
if (log == null) {
return -1;
}
return log.getCQ().getMinOffsetInQueue();
}

public long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp,
BoundaryType boundaryType) {
CompactionLog log = compactionLogTable.get(KeyBuilder.buildCompactionLogKey(topic, queueId));
if (log == null) {
return 0;
}
return log.getCQ().getOffsetInQueueByTime(timestamp, boundaryType);
}

public void flush(int flushLeastPages) {
compactionLogTable.values().forEach(log -> log.flush(flushLeastPages));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -43,7 +44,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.CleanupPolicy;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
Expand All @@ -65,6 +68,7 @@
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

import static org.apache.rocketmq.common.TopicAttributes.CLEANUP_POLICY_ATTRIBUTE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand All @@ -73,6 +77,7 @@
public class DefaultMessageStoreTest {
private final String storeMessage = "Once, there was a chance for me!";
private final String messageTopic = "FooBar";
private final String compactionTopic = "CT";
private int queueTotal = 100;
private AtomicInteger queueId = new AtomicInteger(0);
private SocketAddress bornHost;
Expand Down Expand Up @@ -145,10 +150,17 @@ private MessageStore buildMessageStore(String storePathRootDir) throws Exception
storePathRootDir = System.getProperty("java.io.tmpdir") + File.separator + "store-" + uuid.toString();
}
messageStoreConfig.setStorePathRootDir(storePathRootDir);
// build compaction topic config
Map<String, String> attributes = new HashMap<>();
attributes.put(CLEANUP_POLICY_ATTRIBUTE.getName(), CleanupPolicy.COMPACTION.name());
TopicConfig topicConfig = new TopicConfig(compactionTopic);
topicConfig.setAttributes(attributes);
ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
topicConfigTable.put(compactionTopic, topicConfig);
return new DefaultMessageStore(messageStoreConfig,
new BrokerStatsManager("simpleTest", true),
new MyMessageArrivingListener(),
new BrokerConfig(), new ConcurrentHashMap<>());
new BrokerConfig(), topicConfigTable);
}

@Test
Expand Down Expand Up @@ -538,16 +550,53 @@ public void testGroupCommit() throws Exception {
verifyThatMasterIsFunctional(totalMsgs, messageStore);
}

@Test
public void testMinOffset() throws InterruptedException {
testMinOffset(messageTopic);
}

@Test
public void testCompactionTopicMinOffset() throws InterruptedException {
testMinOffset(compactionTopic);
}

private void testMinOffset(String topic) throws InterruptedException {
int batchMessages = 2;
int queueId = 0;
messageBody = storeMessage.getBytes();

for (int i = 0; i < batchMessages; i++) {
final MessageExtBrokerInner msg = buildMessage(messageBody, topic);
msg.setQueueId(queueId);
messageStore.putMessage(msg);
}

while (messageStore.dispatchBehindBytes() != 0) {
TimeUnit.MILLISECONDS.sleep(1);
}

assertThat(messageStore.getMinOffsetInQueue(topic, queueId)).isEqualTo(0);
}

@Test
public void testMaxOffset() throws InterruptedException {
testMaxOffset(messageTopic);
}

@Test
public void testCompactionTopicMaxOffset() throws InterruptedException {
testMaxOffset(compactionTopic);
}

private void testMaxOffset(String topic) throws InterruptedException {
int firstBatchMessages = 3;
int queueId = 0;
messageBody = storeMessage.getBytes();

assertThat(messageStore.getMaxOffsetInQueue(messageTopic, queueId)).isEqualTo(0);
assertThat(messageStore.getMaxOffsetInQueue(topic, queueId)).isEqualTo(0);

for (int i = 0; i < firstBatchMessages; i++) {
final MessageExtBrokerInner msg = buildMessage();
final MessageExtBrokerInner msg = buildMessage(messageBody, topic);
msg.setQueueId(queueId);
messageStore.putMessage(msg);
}
Expand All @@ -556,22 +605,52 @@ public void testMaxOffset() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(1);
}

assertThat(messageStore.getMaxOffsetInQueue(messageTopic, queueId)).isEqualTo(firstBatchMessages);
assertThat(messageStore.getMaxOffsetInQueue(topic, queueId)).isEqualTo(firstBatchMessages);

// Disable the dispatcher
messageStore.getDispatcherList().clear();

int secondBatchMessages = 2;

for (int i = 0; i < secondBatchMessages; i++) {
final MessageExtBrokerInner msg = buildMessage();
final MessageExtBrokerInner msg = buildMessage(messageBody, topic);
msg.setQueueId(queueId);
messageStore.putMessage(msg);
}

assertThat(messageStore.getMaxOffsetInQueue(messageTopic, queueId)).isEqualTo(firstBatchMessages);
assertThat(messageStore.getMaxOffsetInQueue(messageTopic, queueId, true)).isEqualTo(firstBatchMessages);
assertThat(messageStore.getMaxOffsetInQueue(messageTopic, queueId, false)).isEqualTo(firstBatchMessages + secondBatchMessages);
assertThat(messageStore.getMaxOffsetInQueue(topic, queueId)).isEqualTo(firstBatchMessages);
assertThat(messageStore.getMaxOffsetInQueue(topic, queueId, true)).isEqualTo(firstBatchMessages);
assertThat(messageStore.getMaxOffsetInQueue(topic, queueId, false)).isEqualTo(firstBatchMessages + secondBatchMessages);
}

@Test
public void testOffsetByTime() throws InterruptedException {
testOffsetByTime(messageTopic);
}

@Test
public void testCompactionTopicOffsetByTime() throws InterruptedException {
testOffsetByTime(compactionTopic);
}

private void testOffsetByTime(String topic) throws InterruptedException {
int batchMessages = 3;
int queueId = 0;
messageBody = storeMessage.getBytes();

long timestamp = System.currentTimeMillis();
for (int i = 0; i < batchMessages; i++) {
final MessageExtBrokerInner msg = buildMessage(messageBody, topic);
msg.setQueueId(queueId);
messageStore.putMessage(msg);
TimeUnit.SECONDS.sleep(1);
}

while (messageStore.dispatchBehindBytes() != 0) {
TimeUnit.MILLISECONDS.sleep(1);
}

assertThat(messageStore.getOffsetInQueueByTime(topic, queueId, timestamp + 1000L)).isEqualTo(1);
}

private MessageExtBrokerInner buildIPv6HostMessage() {
Expand Down

0 comments on commit 7501bb5

Please sign in to comment.