test: migrate client module api tests to junit 5 #4376

Closed
wants to merge 4 commits into from
@@ -25,9 +25,6 @@
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
@@ -55,6 +52,9 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
@@ -626,7 +626,9 @@ public void process(int journalVersion, long offset, ByteBuffer recBuff) throws
* @throws IOException
*/
private void replay(Journal journal, JournalScanner scanner) throws IOException {
// Replay the journal from the on-disk journal files
final LogMark markedLog = journal.getLastLogMark().getCurMark();
// List all qualifying journal ids; markedLog is the last marked point, so we replay the journal files written after that mark
List<Long> logs = Journal.listJournalIds(journal.getJournalDirectory(), journalId ->
journalId >= markedLog.getLogFileId());
// last log mark may be missed due to no sync up before
@@ -647,10 +649,12 @@ private void replay(Journal journal, JournalScanner scanner) throws IOException
logPosition = markedLog.getLogFileOffset();
}
LOG.info("Replaying journal {} from position {}", id, logPosition);
// Scan the current journal file and obtain the offset it has been written up to
long scanOffset = journal.scanJournal(id, logPosition, scanner, conf.isSkipReplayJournalInvalidRecord());
// Update LastLogMark after completely replaying journal
// scanOffset will point to EOF position
// After LedgerStorage flush, SyncThread should persist this to disk
// Update the latest log mark
journal.setLastLogMark(id, scanOffset);
}
}
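The comments above describe the replay flow: list the journal ids at or after the last log mark, scan each file from the marked offset (or from the start for later files), and advance the mark once a file is fully replayed. A self-contained sketch of that selection logic, with hypothetical names rather than the PR's code:

import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the replay selection logic described above.
class ReplaySelection {
    // Keep only journal files written at or after the last marked file.
    static List<Long> journalsToReplay(List<Long> allJournalIds, long markedFileId) {
        return allJournalIds.stream()
                .filter(id -> id >= markedFileId)
                .sorted()
                .collect(Collectors.toList());
    }

    // Resume from the marked offset only inside the marked file;
    // later journal files are replayed from the beginning.
    static long startOffset(long journalId, long markedFileId, long markedFileOffset) {
        return journalId == markedFileId ? markedFileOffset : 0L;
    }
}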
@@ -670,6 +674,7 @@ public synchronized void start() {

// replay journals
try {
// Replay the journal
readJournal();
} catch (IOException | BookieException ioe) {
LOG.error("Exception while replaying journals, shutting down", ioe);
@@ -679,6 +684,7 @@ public synchronized void start() {

// Do a fully flush after journal replay
try {
// Perform a full flush once the journal has been replayed
syncThread.requestFlush().get();
} catch (InterruptedException e) {
LOG.warn("Interrupting the fully flush after replaying journals : ", e);
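requestFlush().get() blocks until the post-replay flush finishes, and the catch block above only logs an interruption. As a rough sketch of the usual pattern when waiting on an asynchronous flush (the CompletableFuture-based helper below is an assumption, not SyncThread's actual API):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch: wait for an asynchronous flush and keep the interrupt status intact.
class FlushWait {
    static void awaitFlush(CompletableFuture<Void> flushFuture) {
        try {
            flushFuture.get();                      // block until the flush completes
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();     // restore the interrupt flag for callers
        } catch (ExecutionException e) {
            throw new RuntimeException("flush failed", e.getCause());
        }
    }
}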
@@ -717,9 +723,11 @@ public synchronized void start() {
* checkpoint which reduce the chance that we need to replay journals
* again if bookie restarted again before finished journal replays.
*/
// Start the sync thread so checkpoints are taken periodically; otherwise another restart would have to replay the journals all over again
syncThread.start();

// start bookie thread
// Start the bookie thread; this thread does not do much
bookieThread.start();

// After successful bookie startup, register listener for disk
@@ -729,11 +737,13 @@
indexDirsManager.addLedgerDirsListener(getLedgerDirsListener());
}

// Start the ledger storage
ledgerStorage.start();

// check the bookie status to start with, and set running.
// since bookie server use running as a flag to tell bookie server whether it is alive
// if setting it in bookie thread, the watcher might run before bookie thread.
// Initialize the state machine
stateManager.initState();

try {
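Read together, the commented start() path is: replay the journal, force a full flush, start the sync thread for periodic checkpoints, start the bookie thread, start the ledger storage, and finally initialise the state. A compressed outline of that ordering, with placeholder method names rather than the real Bookie internals:

// Hypothetical outline of the startup ordering described by the comments above.
class BookieStartupSketch {
    void start() throws Exception {
        replayJournal();        // recover entries written after the last log mark
        fullFlush();            // persist replayed data before serving traffic
        startSyncThread();      // periodic checkpoints avoid replaying everything on restart
        startBookieThread();    // background thread; does little actual work
        startLedgerStorage();   // storage must be up before requests are accepted
        initState();            // finally mark the bookie as running
    }

    // Placeholders standing in for the real Bookie internals.
    void replayJournal() {}
    void fullFlush() {}
    void startSyncThread() {}
    void startBookieThread() {}
    void startLedgerStorage() {}
    void initState() {}
}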
@@ -21,6 +21,7 @@

/**
* Interface to communicate checkpoint progress.
* Interface for obtaining checkpoint progress.
*/
public interface CheckpointSource {

@@ -44,11 +44,15 @@
* When asked to flush, current EntrySkipList is moved to snapshot and is cleared.
* We continue to serve edits out of new EntrySkipList and backing snapshot until
* flusher reports in that the flush succeeded. At that point we let the snapshot go.
*
* EntryMemTable holds the entries that have not yet been flushed. When a flush is requested, the current EntrySkipList is moved into the snapshot and its space is reclaimed;
* new adds are then served from a fresh EntrySkipList until the flush to disk succeeds, at which point the snapshot is released.
*/
public class EntryMemTable implements AutoCloseable{
private static Logger logger = LoggerFactory.getLogger(EntryMemTable.class);
/**
* Entry skip list.
* EntryMemTable stores its data in a skip list.
*/
static class EntrySkipList extends ConcurrentSkipListMap<EntryKey, EntryKeyValue> {
final Checkpoint cp;
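The class comment describes a double-buffer scheme: writes go to the live skip list, a flush rotates it into the snapshot, and the snapshot is discarded only after the flush succeeds. A stripped-down sketch of that rotation, far simpler than the real EntryMemTable and using hypothetical names:

import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical double-buffer memtable: a live map plus a snapshot awaiting flush.
class MiniMemTable {
    volatile ConcurrentSkipListMap<Long, byte[]> live = new ConcurrentSkipListMap<>();
    volatile ConcurrentSkipListMap<Long, byte[]> snapshot = new ConcurrentSkipListMap<>();

    void put(long key, byte[] value) {
        live.put(key, value);
    }

    // Rotate the live map into the snapshot slot; new writes land in a fresh map.
    synchronized boolean takeSnapshot() {
        if (!snapshot.isEmpty()) {
            return false;                        // previous flush still in progress
        }
        snapshot = live;
        live = new ConcurrentSkipListMap<>();
        return true;
    }

    // Reads consult both the live map and the snapshot until the flush completes.
    byte[] get(long key) {
        byte[] v = live.get(key);
        return v != null ? v : snapshot.get(key);
    }

    // Called by the flusher once the snapshot is safely on disk.
    synchronized void flushComplete() {
        snapshot = new ConcurrentSkipListMap<>();
    }
}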
@@ -85,20 +89,25 @@ public boolean equals(Object o) {
}
}

// Skip list that stores the live data
volatile EntrySkipList kvmap;

// Snapshot of EntryMemTable. Made for flusher.
// Skip list that stores the snapshot
volatile EntrySkipList snapshot;

final ServerConfiguration conf;

final CheckpointSource checkpointSource;

final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

// Used to track own data size
final AtomicLong size;

// Size limit of the skip list
final long skipListSizeLimit;

final Semaphore skipListSemaphore;

SkipListArena allocator;
@@ -159,6 +168,7 @@ Checkpoint snapshot() throws IOException {
* Snapshot current EntryMemTable. if given <i>oldCp</i> is older than current checkpoint,
* we don't do any snapshot. If snapshot happened, we return the checkpoint of the snapshot.
*
* Takes a snapshot of the current EntryMemTable.
* @param oldCp
* checkpoint
* @return checkpoint of the snapshot, null means no snapshot
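The contract above is conditional: a stale checkpoint request takes no new snapshot and returns null. A hedged sketch of that check, using a plain long in place of BookKeeper's Checkpoint type:

// Hypothetical sketch: only snapshot when the requested checkpoint is newer than the last one taken.
class ConditionalSnapshot {
    private long snapshottedCheckpoint = Long.MIN_VALUE;

    // Returns the checkpoint of the new snapshot, or null if nothing was snapshotted.
    synchronized Long snapshotIfNewer(long requestedCheckpoint) {
        if (requestedCheckpoint <= snapshottedCheckpoint) {
            return null;                         // an equal or newer snapshot already exists
        }
        snapshottedCheckpoint = requestedCheckpoint;
        // ... rotate the live skip list into the snapshot here ...
        return requestedCheckpoint;
    }
}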
@@ -300,6 +310,7 @@ public long addEntry(long ledgerId, long entryId, final ByteBuffer entry, final
boolean success = false;
try {
if (isSizeLimitReached() || (!previousFlushSucceeded.get())) {
// If the skip list has reached its size limit, trigger a snapshot to free space for new writes
Checkpoint cp = snapshot();
if ((null != cp) || (!previousFlushSucceeded.get())) {
cb.onSizeLimitReached(cp);
Expand All @@ -317,7 +328,9 @@ public long addEntry(long ledgerId, long entryId, final ByteBuffer entry, final

this.lock.readLock().lock();
try {
// Wrap the data to be written into an EntryKeyValue
EntryKeyValue toAdd = cloneWithAllocator(ledgerId, entryId, entry);
// Perform the write
size = internalAdd(toAdd);
if (size == 0) {
skipListSemaphore.release(len);
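The commented addEntry path combines two mechanisms: the skip list hitting its size limit triggers a snapshot so the flusher can drain it, and a semaphore bounds the bytes that are in flight but not yet flushed. A condensed sketch of that write path (class and method names are hypothetical):

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical write path: trigger a snapshot on size limit, bound un-flushed bytes with a semaphore.
class WritePathSketch {
    private final AtomicLong size = new AtomicLong();
    private final long sizeLimit;
    private final Semaphore inFlightBytes;

    WritePathSketch(long sizeLimit, int maxInFlightBytes) {
        this.sizeLimit = sizeLimit;
        this.inFlightBytes = new Semaphore(maxInFlightBytes);
    }

    long addEntry(byte[] entry, Runnable onSizeLimitReached) throws InterruptedException {
        if (size.get() >= sizeLimit) {
            onSizeLimitReached.run();            // ask the flusher to drain a snapshot
        }
        inFlightBytes.acquire(entry.length);     // backpressure on un-flushed bytes
        return internalAdd(entry);
    }

    // Once flushed data is durable, its bytes no longer count against the in-flight budget.
    void onFlushed(int flushedBytes) {
        inFlightBytes.release(flushedBytes);
    }

    // Stand-in for the real insert; tracks the total retained size.
    private long internalAdd(byte[] entry) {
        size.addAndGet(entry.length);
        return entry.length;
    }
}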
@@ -345,6 +358,7 @@ public long addEntry(long ledgerId, long entryId, final ByteBuffer entry, final
*/
private long internalAdd(final EntryKeyValue toAdd) throws IOException {
long sizeChange = 0;
// Put the newly written entry into the skip list
if (kvmap.putIfAbsent(toAdd, toAdd) == null) {
sizeChange = toAdd.getLength();
size.addAndGet(sizeChange);
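internalAdd relies on ConcurrentSkipListMap.putIfAbsent: only the first insert for a key updates the tracked size, while a duplicate reports a size change of zero. A minimal sketch of the same idiom, with a simplified key type instead of EntryKeyValue:

import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch of putIfAbsent-based insertion with size accounting.
class SkipListAddSketch {
    private final ConcurrentSkipListMap<Long, byte[]> map = new ConcurrentSkipListMap<>();
    private final AtomicLong size = new AtomicLong();

    // Returns the number of bytes added, or 0 if the key was already present.
    long add(long key, byte[] value) {
        if (map.putIfAbsent(key, value) == null) {
            size.addAndGet(value.length);
            return value.length;
        }
        return 0L;
    }
}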