Skip to content

Commit

Permalink
fix tiered store ConstructIndexFile ConcurrentModificationException,a…
Browse files Browse the repository at this point in the history
…dd test
  • Loading branch information
wangshaojie4039 committed Dec 24, 2024
1 parent 6fd33fb commit b675339
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.tieredstore.core;

import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.time.Duration;
Expand Down Expand Up @@ -91,6 +92,11 @@ public String getServiceName() {
return MessageStoreDispatcher.class.getSimpleName();
}

@VisibleForTesting
public Map<FlatFileInterface, GroupCommitContext> getFailedGroupCommitMap() {
return failedGroupCommitMap;
}

public void dispatchWithSemaphore(FlatFileInterface flatFile) {
try {
if (stopped) {
Expand Down Expand Up @@ -162,7 +168,7 @@ public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile,
if (commitOffset < currentOffset) {
this.commitAsync(flatFile).whenComplete((result, throwable) -> {
if (throwable != null) {
log.error("topic: {}, queueId: {} flat file flush cache failed more than twice.", topic, queueId, throwable);
log.error("MessageDispatcher#flatFile commitOffset less than currentOffset, commitAsync again failed. topic: {}, queueId: {} ", topic, queueId, throwable);
}
});
return CompletableFuture.completedFuture(false);
Expand Down Expand Up @@ -302,7 +308,7 @@ public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile,
//next commit async,execute constructIndexFile.
GroupCommitContext oldCommit = failedGroupCommitMap.put(flatFile, groupCommitContext);
if (oldCommit != null) {
log.warn("MessageDispatcher#dispatch, topic={}, queueId={} old failed commit context not release", topic, queueId);
log.warn("MessageDispatcher#commitAsync failed,flatFile old failed commit context not release, topic={}, queueId={} ", topic, queueId);
oldCommit.release();
}
}
Expand All @@ -326,7 +332,7 @@ public CompletableFuture<Boolean> commitAsync(FlatFileInterface flatFile) {
return flatFile.commitAsync();
}

private void constructIndexFile(long topicId, GroupCommitContext groupCommitContext) {
public void constructIndexFile(long topicId, GroupCommitContext groupCommitContext) {
MessageStoreExecutor.getInstance().bufferCommitExecutor.submit(() -> {
if (storeConfig.isMessageIndexEnable()) {
try {
Expand Down Expand Up @@ -355,7 +361,7 @@ public void constructIndexFile0(long topicId, DispatchRequest request) {
request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
}

private void releaseClosedPendingGroupCommit() {
public void releaseClosedPendingGroupCommit() {
Iterator<Map.Entry<FlatFileInterface, GroupCommitContext>> iterator = failedGroupCommitMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<FlatFileInterface, GroupCommitContext> entry = iterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.rocketmq.tieredstore.file;

import com.alibaba.fastjson.JSON;
import com.google.common.annotations.VisibleForTesting;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -125,6 +126,11 @@ public Lock getFileLock() {
return this.fileLock;
}

@VisibleForTesting
public Semaphore getCommitLock() {
return commitLock;
}

@Override
public boolean rollingFile(long interval) {
return this.commitLog.tryRollingFile(interval);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tieredstore.common;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.junit.Assert;
import org.junit.Test;

public class GroupCommitContextTest {

@Test
public void groupCommitContextTest() {
GroupCommitContext releaseGroupCommitContext = new GroupCommitContext();
releaseGroupCommitContext.release();

long endOffset = 1000;
List<DispatchRequest> dispatchRequestList = new ArrayList<>();
dispatchRequestList.add(new DispatchRequest(1000));
List<SelectMappedBufferResult> selectMappedBufferResultList = new ArrayList<>();
selectMappedBufferResultList.add(new SelectMappedBufferResult(100, ByteBuffer.allocate(10), 1000, null));
GroupCommitContext groupCommitContext = new GroupCommitContext();
groupCommitContext.setEndOffset(endOffset);
groupCommitContext.setBufferList(selectMappedBufferResultList);
groupCommitContext.setDispatchRequests(dispatchRequestList);

Assert.assertTrue(groupCommitContext.getEndOffset() == endOffset);
Assert.assertTrue(groupCommitContext.getBufferList().equals(selectMappedBufferResultList));
Assert.assertTrue(groupCommitContext.getDispatchRequests().equals(dispatchRequestList));
groupCommitContext.release();
Assert.assertTrue(groupCommitContext.getDispatchRequests() == null);
Assert.assertTrue(groupCommitContext.getBufferList() == null);
Assert.assertTrue(dispatchRequestList.isEmpty());
Assert.assertTrue(selectMappedBufferResultList.isEmpty());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.TieredMessageStore;
import org.apache.rocketmq.tieredstore.common.GroupCommitContext;
import org.apache.rocketmq.tieredstore.file.FlatFileFactory;
import org.apache.rocketmq.tieredstore.file.FlatFileStore;
import org.apache.rocketmq.tieredstore.file.FlatMessageFile;
Expand Down Expand Up @@ -157,6 +158,130 @@ public void dispatchFromCommitLogTest() throws Exception {
Assert.assertEquals(200L, flatFile.getConsumeQueueCommitOffset());
}

@Test
public void dispatchCommitFailedTest() throws Exception {
MessageStore defaultStore = Mockito.mock(MessageStore.class);
Mockito.when(defaultStore.getMinOffsetInQueue(anyString(), anyInt())).thenReturn(100L);
Mockito.when(defaultStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(200L);

messageStore = Mockito.mock(TieredMessageStore.class);
IndexService indexService =
new IndexStoreService(new FlatFileFactory(metadataStore, storeConfig), storePath);
indexService.start();
Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore);
Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor);
Mockito.when(messageStore.getFlatFileStore()).thenReturn(fileStore);
Mockito.when(messageStore.getIndexService()).thenReturn(indexService);

// mock message
ByteBuffer buffer = MessageFormatUtilTest.buildMockedMessageBuffer();
MessageExt messageExt = MessageDecoder.decode(buffer);
messageExt.setKeys("Key");
MessageAccessor.putProperty(
messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "uk");
messageExt.setBody(new byte[10]);
messageExt.setStoreSize(0);
buffer = ByteBuffer.wrap(MessageDecoder.encode(messageExt, false));
buffer.putInt(0, buffer.remaining());

DispatchRequest request = new DispatchRequest(mq.getTopic(), mq.getQueueId(),
MessageFormatUtil.getCommitLogOffset(buffer), buffer.remaining(), 0L,
MessageFormatUtil.getStoreTimeStamp(buffer), 0L,
"", "", 0, 0L, new HashMap<>());

// construct flat file
MessageStoreDispatcher dispatcher = new MessageStoreDispatcherImpl(messageStore);
dispatcher.dispatch(request);
FlatMessageFile flatFile = fileStore.getFlatFile(mq);
Assert.assertNotNull(flatFile);

// init offset
dispatcher.doScheduleDispatch(flatFile, true).join();
Assert.assertEquals(100L, flatFile.getConsumeQueueMinOffset());
Assert.assertEquals(100L, flatFile.getConsumeQueueMaxOffset());
Assert.assertEquals(100L, flatFile.getConsumeQueueCommitOffset());

ConsumeQueueInterface cq = Mockito.mock(ConsumeQueueInterface.class);
Mockito.when(defaultStore.getConsumeQueue(anyString(), anyInt())).thenReturn(cq);
Mockito.when(cq.get(anyLong())).thenReturn(
new CqUnit(100, 1000, buffer.remaining(), 0L));
Mockito.when(defaultStore.selectOneMessageByOffset(anyLong(), anyInt())).thenReturn(
new SelectMappedBufferResult(0L, buffer.asReadOnlyBuffer(), buffer.remaining(), null));
flatFile.getCommitLock().drainPermits();
dispatcher.doScheduleDispatch(flatFile, true).join();
GroupCommitContext groupCommitContext = ((MessageStoreDispatcherImpl)dispatcher).getFailedGroupCommitMap().get(flatFile);
Assert.assertTrue(groupCommitContext != null);
Assert.assertTrue(groupCommitContext.getEndOffset() == 200);
flatFile.getCommitLock().release();
flatFile.commitAsync().join();
dispatcher.doScheduleDispatch(flatFile, true).join();
Assert.assertTrue(((MessageStoreDispatcherImpl)dispatcher).getFailedGroupCommitMap().get(flatFile) == null);
((MessageStoreDispatcherImpl)dispatcher).flatFileStore.destroyFile(mq);
((MessageStoreDispatcherImpl)dispatcher).releaseClosedPendingGroupCommit();

}

@Test
public void dispatchFailedGroupCommitMapReleaseTest() throws Exception {
MessageStore defaultStore = Mockito.mock(MessageStore.class);
Mockito.when(defaultStore.getMinOffsetInQueue(anyString(), anyInt())).thenReturn(100L);
Mockito.when(defaultStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(200L);

messageStore = Mockito.mock(TieredMessageStore.class);
IndexService indexService =
new IndexStoreService(new FlatFileFactory(metadataStore, storeConfig), storePath);
indexService.start();
Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore);
Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor);
Mockito.when(messageStore.getFlatFileStore()).thenReturn(fileStore);
Mockito.when(messageStore.getIndexService()).thenReturn(indexService);

// mock message
ByteBuffer buffer = MessageFormatUtilTest.buildMockedMessageBuffer();
MessageExt messageExt = MessageDecoder.decode(buffer);
messageExt.setKeys("Key");
MessageAccessor.putProperty(
messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "uk");
messageExt.setBody(new byte[10]);
messageExt.setStoreSize(0);
buffer = ByteBuffer.wrap(MessageDecoder.encode(messageExt, false));
buffer.putInt(0, buffer.remaining());

DispatchRequest request = new DispatchRequest(mq.getTopic(), mq.getQueueId(),
MessageFormatUtil.getCommitLogOffset(buffer), buffer.remaining(), 0L,
MessageFormatUtil.getStoreTimeStamp(buffer), 0L,
"", "", 0, 0L, new HashMap<>());

// construct flat file
MessageStoreDispatcher dispatcher = new MessageStoreDispatcherImpl(messageStore);
dispatcher.dispatch(request);
FlatMessageFile flatFile = fileStore.getFlatFile(mq);
Assert.assertNotNull(flatFile);

// init offset
dispatcher.doScheduleDispatch(flatFile, true).join();
Assert.assertEquals(100L, flatFile.getConsumeQueueMinOffset());
Assert.assertEquals(100L, flatFile.getConsumeQueueMaxOffset());
Assert.assertEquals(100L, flatFile.getConsumeQueueCommitOffset());

ConsumeQueueInterface cq = Mockito.mock(ConsumeQueueInterface.class);
Mockito.when(defaultStore.getConsumeQueue(anyString(), anyInt())).thenReturn(cq);
Mockito.when(cq.get(anyLong())).thenReturn(
new CqUnit(100, 1000, buffer.remaining(), 0L));
Mockito.when(defaultStore.selectOneMessageByOffset(anyLong(), anyInt())).thenReturn(
new SelectMappedBufferResult(0L, buffer.asReadOnlyBuffer(), buffer.remaining(), null));
flatFile.getCommitLock().drainPermits();
dispatcher.doScheduleDispatch(flatFile, true).join();
GroupCommitContext groupCommitContext = ((MessageStoreDispatcherImpl)dispatcher).getFailedGroupCommitMap().get(flatFile);
Assert.assertTrue(groupCommitContext != null);
((MessageStoreDispatcherImpl)dispatcher).flatFileStore.destroyFile(mq);
((MessageStoreDispatcherImpl)dispatcher).releaseClosedPendingGroupCommit();
Assert.assertTrue(((MessageStoreDispatcherImpl)dispatcher).getFailedGroupCommitMap().get(flatFile) == null);

}

@Test
public void dispatchServiceTest() {
MessageStore defaultStore = Mockito.mock(MessageStore.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,12 @@ public void testBinarySearchInQueueByTime() {

flatFile.destroy();
}

@Test
public void testCommitLock() {
String topic = "CommitLogTest";
FlatMessageFile flatFile = new FlatMessageFile(flatFileFactory, topic, 0);
flatFile.getCommitLock().drainPermits();
Assert.assertFalse(flatFile.commitAsync().join());
}
}

0 comments on commit b675339

Please sign in to comment.