Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8976] Modify file segment construct method #8977

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.rocketmq.tieredstore.file;

import com.google.common.annotations.VisibleForTesting;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
import org.apache.rocketmq.tieredstore.provider.FileSegmentFactory;
Expand All @@ -28,10 +30,19 @@ public class FlatFileFactory {
private final MessageStoreConfig storeConfig;
private final FileSegmentFactory fileSegmentFactory;

@VisibleForTesting
public FlatFileFactory(MetadataStore metadataStore, MessageStoreConfig storeConfig) {
this.metadataStore = metadataStore;
this.storeConfig = storeConfig;
this.fileSegmentFactory = new FileSegmentFactory(metadataStore, storeConfig);
this.fileSegmentFactory = new FileSegmentFactory(metadataStore, storeConfig, new MessageStoreExecutor());
}

public FlatFileFactory(MetadataStore metadataStore,
MessageStoreConfig storeConfig, MessageStoreExecutor executor) {

this.metadataStore = metadataStore;
this.storeConfig = storeConfig;
this.fileSegmentFactory = new FileSegmentFactory(metadataStore, storeConfig, executor);
}

public MessageStoreConfig getStoreConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public FlatFileStore(MessageStoreConfig storeConfig, MetadataStore metadataStore
this.storeConfig = storeConfig;
this.metadataStore = metadataStore;
this.executor = executor;
this.flatFileFactory = new FlatFileFactory(metadataStore, storeConfig);
this.flatFileFactory = new FlatFileFactory(metadataStore, storeConfig, executor);
this.flatFileConcurrentMap = new ConcurrentHashMap<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
Expand All @@ -45,6 +46,7 @@ public abstract class FileSegment implements Comparable<FileSegment>, FileSegmen
protected final MessageStoreConfig storeConfig;

protected final long maxSize;
protected final MessageStoreExecutor executor;
protected final ReentrantLock fileLock = new ReentrantLock();
protected final Semaphore commitLock = new Semaphore(1);

Expand All @@ -58,13 +60,14 @@ public abstract class FileSegment implements Comparable<FileSegment>, FileSegmen
protected volatile FileSegmentInputStream fileSegmentInputStream;
protected volatile CompletableFuture<Boolean> flightCommitRequest;

public FileSegment(MessageStoreConfig storeConfig,
FileSegmentType fileType, String filePath, long baseOffset) {
public FileSegment(MessageStoreConfig storeConfig, FileSegmentType fileType,
String filePath, long baseOffset, MessageStoreExecutor executor) {

this.storeConfig = storeConfig;
this.fileType = fileType;
this.filePath = filePath;
this.baseOffset = baseOffset;
this.executor = executor;
this.maxSize = this.getMaxSizeByFileType();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,28 @@

import java.lang.reflect.Constructor;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;

public class FileSegmentFactory {

private final MetadataStore metadataStore;
private final MessageStoreConfig storeConfig;
private final MessageStoreExecutor executor;
private final Constructor<? extends FileSegment> fileSegmentConstructor;

public FileSegmentFactory(MetadataStore metadataStore, MessageStoreConfig storeConfig) {
public FileSegmentFactory(MetadataStore metadataStore,
MessageStoreConfig storeConfig, MessageStoreExecutor executor) {

try {
this.storeConfig = storeConfig;
this.metadataStore = metadataStore;
this.executor = executor;
Class<? extends FileSegment> clazz =
Class.forName(storeConfig.getTieredBackendServiceProvider()).asSubclass(FileSegment.class);
fileSegmentConstructor = clazz.getConstructor(
MessageStoreConfig.class, FileSegmentType.class, String.class, Long.TYPE);
MessageStoreConfig.class, FileSegmentType.class, String.class, Long.TYPE, MessageStoreExecutor.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -51,7 +56,7 @@ public MessageStoreConfig getStoreConfig() {

public FileSegment createSegment(FileSegmentType fileType, String filePath, long baseOffset) {
try {
return fileSegmentConstructor.newInstance(this.storeConfig, fileType, filePath, baseOffset);
return fileSegmentConstructor.newInstance(this.storeConfig, fileType, filePath, baseOffset, executor);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.stream.FileSegmentInputStream;
import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
Expand All @@ -35,9 +36,9 @@ public class MemoryFileSegment extends FileSegment {
protected boolean checkSize = true;

public MemoryFileSegment(MessageStoreConfig storeConfig,
FileSegmentType fileType, String filePath, long baseOffset) {
FileSegmentType fileType, String filePath, long baseOffset, MessageStoreExecutor executor) {

super(storeConfig, fileType, filePath, baseOffset);
super(storeConfig, fileType, filePath, baseOffset, executor);
memStore = ByteBuffer.allocate(10000);
memStore.position((int) getSize());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.rocketmq.tieredstore.provider;

import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.io.ByteStreams;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
Expand All @@ -30,6 +31,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
import org.apache.rocketmq.tieredstore.stream.FileSegmentInputStream;
Expand Down Expand Up @@ -58,9 +60,9 @@ public class PosixFileSegment extends FileSegment {
private volatile FileChannel writeFileChannel;

public PosixFileSegment(MessageStoreConfig storeConfig,
FileSegmentType fileType, String filePath, long baseOffset) {
FileSegmentType fileType, String filePath, long baseOffset, MessageStoreExecutor executor) {

super(storeConfig, fileType, filePath, baseOffset);
super(storeConfig, fileType, filePath, baseOffset, executor);

// basePath
String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilePath(),
Expand Down Expand Up @@ -168,32 +170,30 @@ public CompletableFuture<ByteBuffer> read0(long position, int length) {
AttributesBuilder attributesBuilder = newAttributesBuilder()
.put(LABEL_OPERATION, OPERATION_POSIX_READ);

CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
ByteBuffer byteBuffer = ByteBuffer.allocate(length);
try {
readFileChannel.position(position);
readFileChannel.read(byteBuffer);
byteBuffer.flip();
byteBuffer.limit(length);

attributesBuilder.put(LABEL_SUCCESS, true);
long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
TieredStoreMetricsManager.providerRpcLatency.record(costTime, attributesBuilder.build());

Attributes metricsAttributes = newAttributesBuilder()
.put(LABEL_OPERATION, OPERATION_POSIX_READ)
.build();
int downloadedBytes = byteBuffer.remaining();
TieredStoreMetricsManager.downloadBytes.record(downloadedBytes, metricsAttributes);

future.complete(byteBuffer);
} catch (IOException e) {
long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
attributesBuilder.put(LABEL_SUCCESS, false);
TieredStoreMetricsManager.providerRpcLatency.record(costTime, attributesBuilder.build());
future.completeExceptionally(e);
}
return future;
return CompletableFuture.supplyAsync((Supplier<ByteBuffer>) () -> {
ByteBuffer byteBuffer = ByteBuffer.allocate(length);
try {
readFileChannel.position(position);
readFileChannel.read(byteBuffer);
byteBuffer.flip();
byteBuffer.limit(length);

attributesBuilder.put(LABEL_SUCCESS, true);
long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
TieredStoreMetricsManager.providerRpcLatency.record(costTime, attributesBuilder.build());

Attributes metricsAttributes = newAttributesBuilder()
.put(LABEL_OPERATION, OPERATION_POSIX_READ)
.build();
int downloadedBytes = byteBuffer.remaining();
TieredStoreMetricsManager.downloadBytes.record(downloadedBytes, metricsAttributes);
} catch (IOException e) {
long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
attributesBuilder.put(LABEL_SUCCESS, false);
TieredStoreMetricsManager.providerRpcLatency.record(costTime, attributesBuilder.build());
}
return byteBuffer;
}, executor.bufferFetchExecutor);
}

@Override
Expand Down Expand Up @@ -229,6 +229,6 @@ public CompletableFuture<Boolean> commit0(
return false;
}
return true;
});
}, executor.bufferCommitExecutor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.Executors;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.provider.FileSegment;
Expand Down Expand Up @@ -219,7 +220,7 @@ public void doCompactionTest() {

ByteBuffer byteBuffer = indexStoreFile.doCompaction();
FileSegment fileSegment = new PosixFileSegment(
storeConfig, FileSegmentType.INDEX, filePath, 0L);
storeConfig, FileSegmentType.INDEX, filePath, 0L, new MessageStoreExecutor());
fileSegment.append(byteBuffer, timestamp);
fileSegment.commitAsync().join();
Assert.assertEquals(byteBuffer.limit(), fileSegment.getSize());
Expand Down Expand Up @@ -252,7 +253,7 @@ public void queryAsyncFromSegmentFileTest() throws ExecutionException, Interrupt

ByteBuffer byteBuffer = indexStoreFile.doCompaction();
FileSegment fileSegment = new PosixFileSegment(
storeConfig, FileSegmentType.INDEX, filePath, 0L);
storeConfig, FileSegmentType.INDEX, filePath, 0L, new MessageStoreExecutor());
fileSegment.append(byteBuffer, timestamp);
fileSegment.commitAsync().join();
Assert.assertEquals(byteBuffer.limit(), fileSegment.getSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.rocketmq.tieredstore.provider;

import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore;
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
Expand All @@ -34,9 +35,10 @@ public void fileSegmentInstanceTest() throws ClassNotFoundException, NoSuchMetho
MessageStoreConfig storeConfig = new MessageStoreConfig();
storeConfig.setTieredStoreCommitLogMaxSize(1024);
storeConfig.setTieredStoreFilePath(storePath);
MessageStoreExecutor executor = new MessageStoreExecutor();

MetadataStore metadataStore = new DefaultMetadataStore(storeConfig);
FileSegmentFactory factory = new FileSegmentFactory(metadataStore, storeConfig);
FileSegmentFactory factory = new FileSegmentFactory(metadataStore, storeConfig, executor);

Assert.assertEquals(metadataStore, factory.getMetadataStore());
Assert.assertEquals(storeConfig, factory.getStoreConfig());
Expand All @@ -60,7 +62,9 @@ public void fileSegmentInstanceTest() throws ClassNotFoundException, NoSuchMetho
() -> factory.createSegment(null, null, 0L));
storeConfig.setTieredBackendServiceProvider(null);
Assert.assertThrows(RuntimeException.class,
() -> new FileSegmentFactory(metadataStore, storeConfig));
() -> new FileSegmentFactory(metadataStore, storeConfig, executor));

executor.shutdown();
MessageStoreUtilTest.deleteStoreDirectory(storePath);
}
}
Loading
Loading