Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #9112] Speedup revive scan in Pop Consumption and support server side reset offset #9113

Merged
merged 2 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ public interface PopConsumerKVStore {
void deleteRecords(List<PopConsumerRecord> consumerRecordList);

/**
* Scans and returns a list of expired consumer records before the current time.
* @param currentTime The current revive checkpoint timestamp.
* Scans and returns a list of expired consumer records within the specified time range.
* @param lowerTime The start time (inclusive) of the time range to search, in milliseconds.
* @param upperTime The end time (exclusive) of the time range to search, in milliseconds.
* @param maxCount The maximum number of records to return.
* @return A list of expired consumer records.
* Even if more records match the criteria, only this many will be returned.
* @return A list of expired consumer records within the specified time range.
* If no matching records are found, an empty list is returned.
*/
List<PopConsumerRecord> scanExpiredRecords(long currentTime, int maxCount);
List<PopConsumerRecord> scanExpiredRecords(long lowerTime, long upperTime, int maxCount);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactRangeOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
Expand All @@ -43,7 +45,7 @@ public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements P

private WriteOptions writeOptions;
private WriteOptions deleteOptions;
private ColumnFamilyHandle columnFamilyHandle;
protected ColumnFamilyHandle columnFamilyHandle;

public PopConsumerRocksdbStore(String filePath) {
super(filePath);
Expand All @@ -60,8 +62,7 @@ protected void initOptions() {
this.writeOptions.setNoSlowdown(false);

this.deleteOptions = new WriteOptions();
this.deleteOptions.setSync(false);
this.deleteOptions.setLowPri(true);
this.deleteOptions.setSync(true);
this.deleteOptions.setDisableWAL(false);
this.deleteOptions.setNoSlowdown(false);

Expand Down Expand Up @@ -135,18 +136,19 @@ public void deleteRecords(List<PopConsumerRecord> consumerRecordList) {
}

@Override
public List<PopConsumerRecord> scanExpiredRecords(long currentTime, int maxCount) {
// https://github.com/facebook/rocksdb/issues/10300
public List<PopConsumerRecord> scanExpiredRecords(long lower, long upper, int maxCount) {
// In RocksDB, we can use SstPartitionerFixedPrefixFactory in cfOptions
// and new ColumnFamilyOptions().useFixedLengthPrefixExtractor() to
// configure prefix indexing to improve the performance of scans.
// However, in the current implementation, this is not the bottleneck.
List<PopConsumerRecord> consumerRecordList = new ArrayList<>();
try (RocksIterator iterator = db.newIterator(this.columnFamilyHandle)) {
iterator.seekToFirst();
try (ReadOptions scanOptions = new ReadOptions()
.setIterateLowerBound(new Slice(ByteBuffer.allocate(Long.BYTES).putLong(lower).array()))
.setIterateUpperBound(new Slice(ByteBuffer.allocate(Long.BYTES).putLong(upper).array()));
RocksIterator iterator = db.newIterator(this.columnFamilyHandle, scanOptions)) {
iterator.seek(ByteBuffer.allocate(Long.BYTES).putLong(lower).array());
while (iterator.isValid() && consumerRecordList.size() < maxCount) {
if (ByteBuffer.wrap(iterator.key()).getLong() > currentTime) {
break;
}
consumerRecordList.add(PopConsumerRecord.decode(iterator.value()));
iterator.next();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class PopConsumerService extends ServiceThread {
private final AtomicBoolean consumerRunning;
private final BrokerConfig brokerConfig;
private final BrokerController brokerController;
private final AtomicLong currentTime;
private final AtomicLong lastCleanupLockTime;
private final PopConsumerCache popConsumerCache;
private final PopConsumerKVStore popConsumerStore;
Expand All @@ -88,6 +89,7 @@ public PopConsumerService(BrokerController brokerController) {

this.consumerRunning = new AtomicBoolean(false);
this.requestCountTable = new ConcurrentHashMap<>();
this.currentTime = new AtomicLong(TimeUnit.SECONDS.toMillis(3));
this.lastCleanupLockTime = new AtomicLong(System.currentTimeMillis());
this.consumerLockService = new PopConsumerLockService(TimeUnit.MINUTES.toMillis(2));
this.popConsumerStore = new PopConsumerRocksdbStore(Paths.get(
Expand Down Expand Up @@ -195,12 +197,27 @@ public PopConsumerContext addGetMessageResult(PopConsumerContext context, GetMes
return context;
}

public Long getPopOffset(String groupId, String topicId, int queueId) {
Long resetOffset =
this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topicId, groupId, queueId);
if (resetOffset != null) {
this.clearCache(groupId, topicId, queueId);
this.brokerController.getConsumerOrderInfoManager().clearBlock(topicId, groupId, queueId);
this.brokerController.getConsumerOffsetManager()
.commitOffset("ResetPopOffset", groupId, topicId, queueId, resetOffset);
}
return resetOffset;
}

public CompletableFuture<GetMessageResult> getMessageAsync(String clientHost,
String groupId, String topicId, int queueId, long offset, int batchSize, MessageFilter filter) {

log.debug("PopConsumerService getMessageAsync, groupId={}, topicId={}, queueId={}, offset={}, batchSize={}, filter={}",
groupId, topicId, offset, queueId, batchSize, filter != null);

Long resetOffset = this.getPopOffset(groupId, topicId, queueId);
final long currentOffset = resetOffset != null ? resetOffset : offset;

CompletableFuture<GetMessageResult> getMessageFuture =
brokerController.getMessageStore().getMessageAsync(groupId, topicId, queueId, offset, batchSize, filter);

Expand All @@ -223,7 +240,7 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String clientHost,

log.warn("PopConsumerService getMessageAsync, initial offset because store is no correct, " +
"groupId={}, topicId={}, queueId={}, batchSize={}, offset={}->{}",
groupId, topicId, queueId, batchSize, offset, result.getNextBeginOffset());
groupId, topicId, queueId, batchSize, currentOffset, result.getNextBeginOffset());

return brokerController.getMessageStore().getMessageAsync(
groupId, topicId, queueId, result.getNextBeginOffset(), batchSize, filter);
Expand Down Expand Up @@ -482,10 +499,12 @@ public void clearCache(String groupId, String topicId, int queueId) {
}
}

public long revive(long currentTime, int maxCount) {
public long revive(AtomicLong currentTime, int maxCount) {
Stopwatch stopwatch = Stopwatch.createStarted();
List<PopConsumerRecord> consumerRecords =
this.popConsumerStore.scanExpiredRecords(currentTime, maxCount);
long upperTime = System.currentTimeMillis() - 50L;
List<PopConsumerRecord> consumerRecords = this.popConsumerStore.scanExpiredRecords(
currentTime.get() - TimeUnit.SECONDS.toMillis(3), upperTime, maxCount);
long scanCostTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
Queue<PopConsumerRecord> failureList = new LinkedBlockingQueue<>();
List<CompletableFuture<?>> futureList = new ArrayList<>(consumerRecords.size());

Expand All @@ -497,9 +516,9 @@ public long revive(long currentTime, int maxCount) {
long backoffInterval = 1000L * REWRITE_INTERVALS_IN_SECONDS[
Math.min(REWRITE_INTERVALS_IN_SECONDS.length, record.getAttemptTimes())];
long nextInvisibleTime = record.getInvisibleTime() + backoffInterval;
PopConsumerRecord retryRecord = new PopConsumerRecord(record.getPopTime(), record.getGroupId(),
record.getTopicId(), record.getQueueId(), record.getRetryFlag(), nextInvisibleTime,
record.getOffset(), record.getAttemptId());
PopConsumerRecord retryRecord = new PopConsumerRecord(System.currentTimeMillis(),
record.getGroupId(), record.getTopicId(), record.getQueueId(),
record.getRetryFlag(), nextInvisibleTime, record.getOffset(), record.getAttemptId());
retryRecord.setAttemptTimes(record.getAttemptTimes() + 1);
failureList.add(retryRecord);
log.warn("PopConsumerService revive backoff retry, record={}", retryRecord);
Expand All @@ -513,14 +532,20 @@ public long revive(long currentTime, int maxCount) {
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
this.popConsumerStore.writeRecords(new ArrayList<>(failureList));
this.popConsumerStore.deleteRecords(consumerRecords);
currentTime.set(consumerRecords.isEmpty() ?
upperTime : consumerRecords.get(consumerRecords.size() - 1).getVisibilityTimeout());

if (brokerConfig.isEnablePopBufferMerge()) {
log.info("PopConsumerService, key size={}, cache size={}, revive count={}, failure count={}, cost={}ms",
popConsumerCache.getCacheKeySize(), popConsumerCache.getCacheSize(), consumerRecords.size(),
failureList.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
log.info("PopConsumerService, key size={}, cache size={}, revive count={}, failure count={}, " +
"behindInMillis={}, scanInMillis={}, costInMillis={}",
popConsumerCache.getCacheKeySize(), popConsumerCache.getCacheSize(),
consumerRecords.size(), failureList.size(), upperTime - currentTime.get(),
scanCostTime, stopwatch.elapsed(TimeUnit.MILLISECONDS));
} else {
log.info("PopConsumerService, revive count={}, failure count={}, cost={}ms",
consumerRecords.size(), failureList.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
log.info("PopConsumerService, revive count={}, failure count={}, " +
"behindInMillis={}, scanInMillis={}, costInMillis={}",
consumerRecords.size(), failureList.size(), upperTime - currentTime.get(),
scanCostTime, stopwatch.elapsed(TimeUnit.MILLISECONDS));
}

return consumerRecords.size();
Expand Down Expand Up @@ -588,11 +613,6 @@ public boolean reviveRetry(PopConsumerRecord record, MessageExt messageExt) {
PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);

if (brokerConfig.isEnablePopLog()) {
log.debug("PopConsumerService revive retry msg, put status={}, ck={}, delay={}ms",
putMessageResult, JSON.toJSONString(record), System.currentTimeMillis() - record.getVisibilityTimeout());
}

if (putMessageResult.getAppendMessageResult() == null ||
putMessageResult.getAppendMessageResult().getStatus() != AppendMessageStatus.PUT_OK) {
log.error("PopConsumerService revive retry msg error, put status={}, ck={}, delay={}ms",
Expand All @@ -616,7 +636,7 @@ public synchronized void transferToFsStore() {
while (true) {
try {
List<PopConsumerRecord> consumerRecords = this.popConsumerStore.scanExpiredRecords(
Long.MAX_VALUE, brokerConfig.getPopReviveMaxReturnSizePerRead());
0, Long.MAX_VALUE, brokerConfig.getPopReviveMaxReturnSizePerRead());
if (consumerRecords == null || consumerRecords.isEmpty()) {
break;
}
Expand Down Expand Up @@ -695,7 +715,7 @@ public void run() {
while (!isStopped()) {
try {
// to prevent concurrency issues during read and write operations
long reviveCount = this.revive(System.currentTimeMillis() - 50L,
long reviveCount = this.revive(this.currentTime,
brokerConfig.getPopReviveMaxReturnSizePerRead());

long current = System.currentTimeMillis();
Expand All @@ -704,7 +724,7 @@ public void run() {
this.lastCleanupLockTime.set(current);
}

if (reviveCount == 0) {
if (reviveCount < brokerConfig.getPopReviveMaxReturnSizePerRead()) {
this.waitForRunning(500);
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,26 @@
*/
package org.apache.rocketmq.broker.pop;

import com.google.common.base.Stopwatch;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -85,18 +93,101 @@ public void rocksdbStoreWriteDeleteTest() {
.collect(Collectors.toList()));

List<PopConsumerRecord> consumerRecords =
consumerStore.scanExpiredRecords(20002, 2);
consumerStore.scanExpiredRecords(0, 20002, 2);
Assert.assertEquals(2, consumerRecords.size());
consumerStore.deleteRecords(consumerRecords);

consumerRecords = consumerStore.scanExpiredRecords(20002, 2);
consumerRecords = consumerStore.scanExpiredRecords(0, 20003, 2);
Assert.assertEquals(1, consumerRecords.size());
consumerStore.deleteRecords(consumerRecords);

consumerRecords = consumerStore.scanExpiredRecords(20004, 3);
consumerRecords = consumerStore.scanExpiredRecords(0, 20005, 3);
Assert.assertEquals(2, consumerRecords.size());

consumerStore.shutdown();
deleteStoreDirectory(filePath);
}

private long getDirectorySizeRecursive(File directory) {
long size = 0;
File[] files = directory.listFiles();
if (files != null) {
for (File file : files) {
if (file.isFile()) {
size += file.length();
} else if (file.isDirectory()) {
size += getDirectorySizeRecursive(file);
}
}
}
return size;
}

@Test
@Ignore
@SuppressWarnings("ConstantValue")
public void tombstoneDeletionTest() throws IllegalAccessException, NoSuchFieldException {
PopConsumerRocksdbStore rocksdbStore = new PopConsumerRocksdbStore(getRandomStorePath());
rocksdbStore.start();

int iterCount = 1000 * 1000;
boolean useSeekFirstDelete = false;
Field dbField = AbstractRocksDBStorage.class.getDeclaredField("db");
dbField.setAccessible(true);
RocksDB rocksDB = (RocksDB) dbField.get(rocksdbStore);

long currentTime = 0L;
Stopwatch stopwatch = Stopwatch.createStarted();
for (int i = 0; i < iterCount; i++) {
List<PopConsumerRecord> records = new ArrayList<>();
for (int j = 0; j < 1000; j++) {
PopConsumerRecord record = getConsumerRecord();
record.setPopTime((long) i * iterCount + j);
record.setGroupId("GroupTest");
record.setTopicId("TopicTest");
record.setQueueId(i % 10);
record.setRetryFlag(0);
record.setInvisibleTime(TimeUnit.SECONDS.toMillis(30));
record.setOffset(i);
records.add(record);
}
rocksdbStore.writeRecords(records);

long start = stopwatch.elapsed(TimeUnit.MILLISECONDS);
List<PopConsumerRecord> deleteList = new ArrayList<>();
if (useSeekFirstDelete) {
try (RocksIterator iterator = rocksDB.newIterator(rocksdbStore.columnFamilyHandle)) {
iterator.seekToFirst();
if (i % 10 == 0) {
long fileSize = getDirectorySizeRecursive(new File(rocksdbStore.getFilePath()));
log.info("DirectorySize={}, Cost={}ms",
MessageStoreUtil.toHumanReadable(fileSize), stopwatch.elapsed(TimeUnit.MILLISECONDS) - start);
}
while (iterator.isValid() && deleteList.size() < 1024) {
deleteList.add(PopConsumerRecord.decode(iterator.value()));
iterator.next();
}
}
} else {
long upper = System.currentTimeMillis();
deleteList = rocksdbStore.scanExpiredRecords(currentTime, upper, 800);
if (!deleteList.isEmpty()) {
currentTime = deleteList.get(deleteList.size() - 1).getVisibilityTimeout();
}
long scanCost = stopwatch.elapsed(TimeUnit.MILLISECONDS) - start;
if (i % 100 == 0) {
long fileSize = getDirectorySizeRecursive(new File(rocksdbStore.getFilePath()));
long seekTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
try (RocksIterator iterator = rocksDB.newIterator(rocksdbStore.columnFamilyHandle)) {
iterator.seekToFirst();
}
log.info("DirectorySize={}, Cost={}ms, SeekFirstCost={}ms", MessageStoreUtil.toHumanReadable(fileSize),
scanCost, stopwatch.elapsed(TimeUnit.MILLISECONDS) - seekTime);
}
}
rocksdbStore.deleteRecords(deleteList);
}
rocksdbStore.shutdown();
deleteStoreDirectory(rocksdbStore.getFilePath());
}
}
Loading
Loading