Skip to content

Commit

Permalink
refactor cursor read entry process to fix dead loop in read entry pro…
Browse files Browse the repository at this point in the history
…cess
  • Loading branch information
fanjianye committed Jun 19, 2024
1 parent fa74538 commit 2760f43
Show file tree
Hide file tree
Showing 9 changed files with 733 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -696,4 +696,6 @@ default void skipNonRecoverableLedger(long ledgerId){}
* Check if managed ledger should cache backlog reads.
*/
void checkCursorsToCacheEntries();

void updateMaxReadPosition(Position position);
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ public class ManagedCursorImpl implements ManagedCursor {
@SuppressWarnings("unused")
private volatile OpReadEntry waitingReadOp = null;

protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, OpReadEntry>
WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION = AtomicReferenceFieldUpdater
.newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOpByMaxReadPosition");
@SuppressWarnings("unused")
private volatile OpReadEntry waitingReadOpByMaxReadPosition = null;

public static final int FALSE = 0;
public static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> RESET_CURSOR_IN_PROGRESS_UPDATER =
Expand Down Expand Up @@ -942,13 +948,45 @@ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, Re

int numberOfEntriesToRead = applyMaxSizeCap(maxEntries, maxSizeBytes);

if (hasMoreEntries()) {
boolean hasMoreEntries = hasMoreEntries();
boolean hasMoreEntriesByMaxReadPosition = hasMoreEntriesByMaxReadPosition();
if (hasMoreEntries && hasMoreEntriesByMaxReadPosition) {
// If we have available entries, we can read them immediately
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Read entries immediately", ledger.getName(), name);
}
asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx,
maxPosition, skipCondition);
} else if (hasMoreEntries) {
// We have available entries, but we can't read them immediately
// because readPosition > maxReadPosition

// Skip deleted entries.
skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
ctx, maxPosition, skipCondition);

if (!WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION.compareAndSet(this, null, op)) {
op.recycle();
callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Deferring retry of read at readPosition {} maxPosition {}",
ledger.getName(), name, op.readPosition, op.maxPosition);
}

// Check again for new entries after the configured time, then if still no entries are available register
// to be notified
if (getConfig().getNewEntriesCheckDelayInMillis() > 0) {
ledger.getScheduledExecutor()
.schedule(() -> checkForNewEntriesByMaxReadPosition(op, callback, ctx),
getConfig().getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);
} else {
// If there's no delay, check directly from the same thread
checkForNewEntriesByMaxReadPosition(op, callback, ctx);
}
} else {
// Skip deleted entries.
skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
Expand Down Expand Up @@ -1033,6 +1071,63 @@ private void checkForNewEntries(OpReadEntry op, ReadEntriesCallback callback, Ob
}
}

private void checkForNewEntriesByMaxReadPosition(OpReadEntry op, ReadEntriesCallback callback, Object ctx) {
try {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Re-trying the read at readPosition {} maxPosition {}",
ledger.getName(), name, op.readPosition, op.maxPosition);
}

if (isClosed()) {
callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
return;
}

if (!hasMoreEntriesByMaxReadPosition()) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Still no entries available by maxReadPosition. Register for notification",
ledger.getName(), name);
}
ledger.addWaitingCursorByMaxReadPosition(this);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skip notification registering since we do have entries "
+ "available by maxReadPosition",
ledger.getName(), name);
}
}

// Check again the entries count, since maxReadPosition may be changed between the time we
// checked and the time we've asked to be notified by managed ledger
if (hasMoreEntriesByMaxReadPosition()) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Found more entries by maxReadPosition", ledger.getName(), name);
}
// Try to cancel the notification request
if (WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION.compareAndSet(this, op, null)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Cancelled notification by maxReadPosition and scheduled read at {}",
ledger.getName(), name, op.readPosition);
}
PENDING_READ_OPS_UPDATER.incrementAndGet(this);
ledger.asyncReadEntries(op);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] notification by maxReadPosition was already cancelled",
ledger.getName(), name);
}
}
} else if (ledger.isTerminated()) {
// At this point we registered for notification and still there were no more available
// entries by maxReadPosition.
// If the managed ledger was indeed terminated, we need to notify the cursor
callback.readEntriesFailed(new NoMoreEntriesToReadException("Topic was terminated"), ctx);
}
} catch (Throwable t) {
callback.readEntriesFailed(new ManagedLedgerException(t), ctx);
}
}

@Override
public boolean isClosed() {
return state == State.Closed || state == State.Closing;
Expand All @@ -1044,14 +1139,20 @@ public boolean cancelPendingReadRequest() {
log.debug("[{}] [{}] Cancel pending read request", ledger.getName(), name);
}
final OpReadEntry op = WAITING_READ_OP_UPDATER.getAndSet(this, null);
final OpReadEntry op_by_max_read_position =
WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION.getAndSet(this, null);
if (op != null) {
op.recycle();
}
return op != null;
if (op_by_max_read_position != null) {
op_by_max_read_position.recycle();
}
return op != null || op_by_max_read_position != null;
}

public boolean hasPendingReadRequest() {
return WAITING_READ_OP_UPDATER.get(this) != null;
return WAITING_READ_OP_UPDATER.get(this) != null ||
WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION.get(this) != null;
}

@Override
Expand All @@ -1073,6 +1174,22 @@ public boolean hasMoreEntries() {
}
}

private boolean hasMoreEntriesByMaxReadPosition() {
Position maxReadPosition = ledger.getMaxReadPosition();
if (maxReadPosition.getEntryId() != -1) {
return readPosition.compareTo(maxReadPosition) <= 0;
} else {
// Fall back to checking the number of entries to ensure we are at the last entry in ledger and no ledgers
// are in the middle
Position maxReadPositionNext = PositionFactory.create(maxReadPosition.getLedgerId(), 0);
if (readPosition.compareTo(maxReadPositionNext) > 0) {
return false;
} else {
return getNumberOfEntries(Range.closedOpen(readPosition, maxReadPositionNext)) > 0;
}
}
}

@Override
public long getNumberOfEntries() {
if (readPosition.compareTo(ledger.getLastPosition().getNext()) > 0) {
Expand Down Expand Up @@ -3258,6 +3375,8 @@ void notifyEntriesAvailable() {
}

PENDING_READ_OPS_UPDATER.incrementAndGet(this);
// 触发监听的时候,就只修改了原来记录的op里面的readPosition
// op的maxReadPosition没有变,那不就寄了??
opReadEntry.readPosition = getReadPosition();
ledger.asyncReadEntries(opReadEntry);
} else {
Expand All @@ -3268,6 +3387,37 @@ void notifyEntriesAvailable() {
}
}

/**
*
* @return Whether the cursor responded to the notification
*/
void notifyEntriesAvailableByMaxReadPosition() {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received ml notification by maxReadPosition", ledger.getName(), name);
}
OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION.getAndSet(this, null);

if (opReadEntry != null) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received notification of maxReadPosition change, reading at {} -- mPos: {}",
ledger.getName(), name, opReadEntry.readPosition, ledger.maxReadPosition);
log.debug("[{}] Consumer {} cursor notification: other counters: consumed {} mdPos {} rdPos {}",
ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
}

PENDING_READ_OPS_UPDATER.incrementAndGet(this);
opReadEntry.readPosition = getReadPosition();
opReadEntry.maxPosition = ledger.maxReadPosition;
ledger.asyncReadEntries(opReadEntry);
} else {
// No one is waiting to be notified. Ignore
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received notification by maxReadPosition but had no pending read operation",
ledger.getName(), name);
}
}
}

void asyncCloseCursorLedger(final AsyncCallbacks.CloseCallback callback, final Object ctx) {
LedgerHandle lh = cursorLedger;
ledger.mbean.startCursorLedgerCloseOp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// Cursors that are waiting to be notified when new entries are persisted
final ConcurrentLinkedQueue<ManagedCursorImpl> waitingCursors;

// Cursors that are waiting to be notified when new maxReadPosition is updated
final ConcurrentLinkedQueue<ManagedCursorImpl> waitingCursorsByMaxReadPosition;

// Objects that are waiting to be notified when new entries are persisted
final ConcurrentLinkedQueue<WaitingEntryCallBack> waitingEntryCallBacks;

Expand Down Expand Up @@ -236,6 +239,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
protected final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker;

volatile Position lastConfirmedEntry;
volatile Position maxReadPosition = PositionFactory.LATEST;

protected ManagedLedgerInterceptor managedLedgerInterceptor;

Expand Down Expand Up @@ -361,6 +365,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
}
this.entryCache = factory.getEntryCacheManager().getEntryCache(this);
this.waitingCursors = Queues.newConcurrentLinkedQueue();
this.waitingCursorsByMaxReadPosition = Queues.newConcurrentLinkedQueue();
this.waitingEntryCallBacks = Queues.newConcurrentLinkedQueue();
this.uninitializedCursors = new HashMap();
this.clock = config.getClock();
Expand Down Expand Up @@ -2049,7 +2054,6 @@ public void asyncReadEntry(Position position, ReadEntryCallback callback, Object
}

private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) {

if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) {
opReadEntry.checkReadCompletion();
return;
Expand All @@ -2059,7 +2063,6 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
long lastEntryInLedger;

Position lastPosition = lastConfirmedEntry;

if (ledger.getId() == lastPosition.getLedgerId()) {
// For the current ledger, we only give read visibility to the last entry we have received a confirmation in
// the managed ledger layer
Expand Down Expand Up @@ -2407,6 +2410,17 @@ Position startReadOperationOnLedger(Position position) {
return position;
}

void notifyCursorsByMaxReadPositionChanged() {
while (true) {
final ManagedCursorImpl waitingCursor = waitingCursorsByMaxReadPosition.poll();
if (waitingCursor == null) {
break;
}

executor.execute(waitingCursor::notifyEntriesAvailableByMaxReadPosition);
}
}

void notifyCursors() {
while (true) {
final ManagedCursorImpl waitingCursor = waitingCursors.poll();
Expand Down Expand Up @@ -3749,6 +3763,10 @@ Position getLastPosition() {
return lastConfirmedEntry;
}

Position getMaxReadPosition() {
return maxReadPosition;
}

@Override
public ManagedCursor getSlowestConsumer() {
return cursors.getSlowestReader();
Expand Down Expand Up @@ -3823,12 +3841,17 @@ private void deactivateCursorByName(String cursorName) {

public void removeWaitingCursor(ManagedCursor cursor) {
this.waitingCursors.remove(cursor);
this.waitingCursorsByMaxReadPosition.remove(cursor);
}

public void addWaitingCursor(ManagedCursorImpl cursor) {
this.waitingCursors.add(cursor);
}

public void addWaitingCursorByMaxReadPosition(ManagedCursorImpl cursor) {
this.waitingCursorsByMaxReadPosition.add(cursor);
}

public boolean isCursorActive(ManagedCursor cursor) {
return activeCursors.get(cursor.getName()) != null;
}
Expand Down Expand Up @@ -3976,7 +3999,7 @@ public long getLastLedgerCreationFailureTimestamp() {
}

public int getWaitingCursorsCount() {
return waitingCursors.size();
return waitingCursors.size() + waitingCursorsByMaxReadPosition.size();
}

public int getPendingAddEntriesCount() {
Expand Down Expand Up @@ -4565,4 +4588,13 @@ public Position getTheSlowestNonDurationReadPosition() {
}
return theSlowestNonDurableReadPosition;
}

public void updateMaxReadPosition(Position position) {
if (position != null) {
this.maxReadPosition = position;
// When maxReadPosition is updated, can notify the cursor
// waiting for maxReadPosition to update which can be read
this.notifyCursorsByMaxReadPositionChanged();
}
}
}
Loading

0 comments on commit 2760f43

Please sign in to comment.