diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 955a0d78502754..b89a7abbe8f49d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -696,4 +696,6 @@ default void skipNonRecoverableLedger(long ledgerId){} * Check if managed ledger should cache backlog reads. */ void checkCursorsToCacheEntries(); + + void updateMaxReadPosition(Position position); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index c0992e48dba8ac..30ca0d41324d4f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -154,6 +154,12 @@ public class ManagedCursorImpl implements ManagedCursor { @SuppressWarnings("unused") private volatile OpReadEntry waitingReadOp = null; + protected static final AtomicReferenceFieldUpdater + WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION = AtomicReferenceFieldUpdater + .newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOpByMaxReadPosition"); + @SuppressWarnings("unused") + private volatile OpReadEntry waitingReadOpByMaxReadPosition = null; + public static final int FALSE = 0; public static final int TRUE = 1; private static final AtomicIntegerFieldUpdater RESET_CURSOR_IN_PROGRESS_UPDATER = @@ -942,13 +948,45 @@ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, Re int numberOfEntriesToRead = applyMaxSizeCap(maxEntries, maxSizeBytes); - if (hasMoreEntries()) { + boolean hasMoreEntries = hasMoreEntries(); + boolean hasMoreEntriesByMaxReadPosition = hasMoreEntriesByMaxReadPosition(); + if (hasMoreEntries && hasMoreEntriesByMaxReadPosition) { // If we have available entries, we can read them immediately if (log.isDebugEnabled()) { log.debug("[{}] [{}] Read entries immediately", ledger.getName(), name); } asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx, maxPosition, skipCondition); + } else if (hasMoreEntries) { + // We have available entries, but we can't read them immediately + // because readPosition > maxReadPosition + + // Skip deleted entries. + skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted); + OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, + ctx, maxPosition, skipCondition); + + if (!WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION.compareAndSet(this, null, op)) { + op.recycle(); + callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx); + return; + } + + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Deferring retry of read at readPosition {} maxPosition {}", + ledger.getName(), name, op.readPosition, op.maxPosition); + } + + // Check again for new entries after the configured time, then if still no entries are available register + // to be notified + if (getConfig().getNewEntriesCheckDelayInMillis() > 0) { + ledger.getScheduledExecutor() + .schedule(() -> checkForNewEntriesByMaxReadPosition(op, callback, ctx), + getConfig().getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS); + } else { + // If there's no delay, check directly from the same thread + checkForNewEntriesByMaxReadPosition(op, callback, ctx); + } } else { // Skip deleted entries. skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted); @@ -1033,6 +1071,63 @@ private void checkForNewEntries(OpReadEntry op, ReadEntriesCallback callback, Ob } } + private void checkForNewEntriesByMaxReadPosition(OpReadEntry op, ReadEntriesCallback callback, Object ctx) { + try { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Re-trying the read at readPosition {} maxPosition {}", + ledger.getName(), name, op.readPosition, op.maxPosition); + } + + if (isClosed()) { + callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx); + return; + } + + if (!hasMoreEntriesByMaxReadPosition()) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Still no entries available by maxReadPosition. Register for notification", + ledger.getName(), name); + } + ledger.addWaitingCursorByMaxReadPosition(this); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Skip notification registering since we do have entries " + + "available by maxReadPosition", + ledger.getName(), name); + } + } + + // Check again the entries count, since maxReadPosition may be changed between the time we + // checked and the time we've asked to be notified by managed ledger + if (hasMoreEntriesByMaxReadPosition()) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Found more entries by maxReadPosition", ledger.getName(), name); + } + // Try to cancel the notification request + if (WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION.compareAndSet(this, op, null)) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Cancelled notification by maxReadPosition and scheduled read at {}", + ledger.getName(), name, op.readPosition); + } + PENDING_READ_OPS_UPDATER.incrementAndGet(this); + ledger.asyncReadEntries(op); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] notification by maxReadPosition was already cancelled", + ledger.getName(), name); + } + } + } else if (ledger.isTerminated()) { + // At this point we registered for notification and still there were no more available + // entries by maxReadPosition. + // If the managed ledger was indeed terminated, we need to notify the cursor + callback.readEntriesFailed(new NoMoreEntriesToReadException("Topic was terminated"), ctx); + } + } catch (Throwable t) { + callback.readEntriesFailed(new ManagedLedgerException(t), ctx); + } + } + @Override public boolean isClosed() { return state == State.Closed || state == State.Closing; @@ -1044,14 +1139,20 @@ public boolean cancelPendingReadRequest() { log.debug("[{}] [{}] Cancel pending read request", ledger.getName(), name); } final OpReadEntry op = WAITING_READ_OP_UPDATER.getAndSet(this, null); + final OpReadEntry op_by_max_read_position = + WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION.getAndSet(this, null); if (op != null) { op.recycle(); } - return op != null; + if (op_by_max_read_position != null) { + op_by_max_read_position.recycle(); + } + return op != null || op_by_max_read_position != null; } public boolean hasPendingReadRequest() { - return WAITING_READ_OP_UPDATER.get(this) != null; + return WAITING_READ_OP_UPDATER.get(this) != null || + WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION.get(this) != null; } @Override @@ -1073,6 +1174,22 @@ public boolean hasMoreEntries() { } } + private boolean hasMoreEntriesByMaxReadPosition() { + Position maxReadPosition = ledger.getMaxReadPosition(); + if (maxReadPosition.getEntryId() != -1) { + return readPosition.compareTo(maxReadPosition) <= 0; + } else { + // Fall back to checking the number of entries to ensure we are at the last entry in ledger and no ledgers + // are in the middle + Position maxReadPositionNext = PositionFactory.create(maxReadPosition.getLedgerId(), 0); + if (readPosition.compareTo(maxReadPositionNext) > 0) { + return false; + } else { + return getNumberOfEntries(Range.closedOpen(readPosition, maxReadPositionNext)) > 0; + } + } + } + @Override public long getNumberOfEntries() { if (readPosition.compareTo(ledger.getLastPosition().getNext()) > 0) { @@ -3258,6 +3375,8 @@ void notifyEntriesAvailable() { } PENDING_READ_OPS_UPDATER.incrementAndGet(this); + // 触发监听的时候,就只修改了原来记录的op里面的readPosition + // op的maxReadPosition没有变,那不就寄了?? opReadEntry.readPosition = getReadPosition(); ledger.asyncReadEntries(opReadEntry); } else { @@ -3268,6 +3387,37 @@ void notifyEntriesAvailable() { } } + /** + * + * @return Whether the cursor responded to the notification + */ + void notifyEntriesAvailableByMaxReadPosition() { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Received ml notification by maxReadPosition", ledger.getName(), name); + } + OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER_BY_MAX_READ_POSITION.getAndSet(this, null); + + if (opReadEntry != null) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Received notification of maxReadPosition change, reading at {} -- mPos: {}", + ledger.getName(), name, opReadEntry.readPosition, ledger.maxReadPosition); + log.debug("[{}] Consumer {} cursor notification: other counters: consumed {} mdPos {} rdPos {}", + ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition); + } + + PENDING_READ_OPS_UPDATER.incrementAndGet(this); + opReadEntry.readPosition = getReadPosition(); + opReadEntry.maxPosition = ledger.maxReadPosition; + ledger.asyncReadEntries(opReadEntry); + } else { + // No one is waiting to be notified. Ignore + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Received notification by maxReadPosition but had no pending read operation", + ledger.getName(), name); + } + } + } + void asyncCloseCursorLedger(final AsyncCallbacks.CloseCallback callback, final Object ctx) { LedgerHandle lh = cursorLedger; ledger.mbean.startCursorLedgerCloseOp(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 8d1919dd0529cf..dbf4e85ff02210 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -194,6 +194,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // Cursors that are waiting to be notified when new entries are persisted final ConcurrentLinkedQueue waitingCursors; + // Cursors that are waiting to be notified when new maxReadPosition is updated + final ConcurrentLinkedQueue waitingCursorsByMaxReadPosition; + // Objects that are waiting to be notified when new entries are persisted final ConcurrentLinkedQueue waitingEntryCallBacks; @@ -236,6 +239,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { protected final Supplier> mlOwnershipChecker; volatile Position lastConfirmedEntry; + volatile Position maxReadPosition = PositionFactory.LATEST; protected ManagedLedgerInterceptor managedLedgerInterceptor; @@ -361,6 +365,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper } this.entryCache = factory.getEntryCacheManager().getEntryCache(this); this.waitingCursors = Queues.newConcurrentLinkedQueue(); + this.waitingCursorsByMaxReadPosition = Queues.newConcurrentLinkedQueue(); this.waitingEntryCallBacks = Queues.newConcurrentLinkedQueue(); this.uninitializedCursors = new HashMap(); this.clock = config.getClock(); @@ -2049,7 +2054,6 @@ public void asyncReadEntry(Position position, ReadEntryCallback callback, Object } private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) { - if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) { opReadEntry.checkReadCompletion(); return; @@ -2059,7 +2063,6 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) long lastEntryInLedger; Position lastPosition = lastConfirmedEntry; - if (ledger.getId() == lastPosition.getLedgerId()) { // For the current ledger, we only give read visibility to the last entry we have received a confirmation in // the managed ledger layer @@ -2407,6 +2410,17 @@ Position startReadOperationOnLedger(Position position) { return position; } + void notifyCursorsByMaxReadPositionChanged() { + while (true) { + final ManagedCursorImpl waitingCursor = waitingCursorsByMaxReadPosition.poll(); + if (waitingCursor == null) { + break; + } + + executor.execute(waitingCursor::notifyEntriesAvailableByMaxReadPosition); + } + } + void notifyCursors() { while (true) { final ManagedCursorImpl waitingCursor = waitingCursors.poll(); @@ -3749,6 +3763,10 @@ Position getLastPosition() { return lastConfirmedEntry; } + Position getMaxReadPosition() { + return maxReadPosition; + } + @Override public ManagedCursor getSlowestConsumer() { return cursors.getSlowestReader(); @@ -3823,12 +3841,17 @@ private void deactivateCursorByName(String cursorName) { public void removeWaitingCursor(ManagedCursor cursor) { this.waitingCursors.remove(cursor); + this.waitingCursorsByMaxReadPosition.remove(cursor); } public void addWaitingCursor(ManagedCursorImpl cursor) { this.waitingCursors.add(cursor); } + public void addWaitingCursorByMaxReadPosition(ManagedCursorImpl cursor) { + this.waitingCursorsByMaxReadPosition.add(cursor); + } + public boolean isCursorActive(ManagedCursor cursor) { return activeCursors.get(cursor.getName()) != null; } @@ -3976,7 +3999,7 @@ public long getLastLedgerCreationFailureTimestamp() { } public int getWaitingCursorsCount() { - return waitingCursors.size(); + return waitingCursors.size() + waitingCursorsByMaxReadPosition.size(); } public int getPendingAddEntriesCount() { @@ -4565,4 +4588,13 @@ public Position getTheSlowestNonDurationReadPosition() { } return theSlowestNonDurableReadPosition; } + + public void updateMaxReadPosition(Position position) { + if (position != null) { + this.maxReadPosition = position; + // When maxReadPosition is updated, can notify the cursor + // waiting for maxReadPosition to update which can be read + this.notifyCursorsByMaxReadPositionChanged(); + } + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 8913c4013b4abb..0f40541b21d4a7 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -62,6 +62,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -3898,10 +3899,169 @@ public void testReadEntriesOrWaitWithMaxSize() throws Exception { } @Test - public void testReadEntriesOrWaitWithMaxPosition() throws Exception { + public void testTwoPendingReadRequestExistAndCancel() throws Exception { + ManagedLedger ledger = factory.open("testCancelAllWaitPendingReadRequest"); + ManagedCursor c = ledger.openCursor("c"); + + int sendNumber = 20; + // add waitingReadOp + c.asyncReadEntriesOrWait(sendNumber, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + } + }, null, PositionFactory.EARLIEST); + + // waitingReadOp exist, then readPosition in cursor is changed + // and it is able to add waitingReadOpByMaxReadPosition + Field field1 = ManagedLedgerImpl.class.getDeclaredField("maxReadPosition"); + field1.setAccessible(true); + field1.set(ledger, PositionFactory.create(3, 0)); + + Field field2 = ManagedLedgerImpl.class.getDeclaredField("lastConfirmedEntry"); + field2.setAccessible(true); + field2.set(ledger, PositionFactory.create(3, 10)); + + Field field3 = ManagedCursorImpl.class.getDeclaredField("readPosition"); + field3.setAccessible(true); + field3.set(c, PositionFactory.create(3, 5)); + + c.asyncReadEntriesOrWait(sendNumber, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + } + }, null, PositionFactory.LATEST); + + Field field = ManagedCursorImpl.class.getDeclaredField("waitingReadOp"); + field.setAccessible(true); + assertNotNull(field.get(c)); + + field = ManagedCursorImpl.class.getDeclaredField("waitingReadOpByMaxReadPosition"); + field.setAccessible(true); + assertNotNull(field.get(c)); + + // cancel PendingReadRequest + c.cancelPendingReadRequest(); + field = ManagedCursorImpl.class.getDeclaredField("waitingReadOp"); + field.setAccessible(true); + assertNull(field.get(c)); + + field = ManagedCursorImpl.class.getDeclaredField("waitingReadOpByMaxReadPosition"); + field.setAccessible(true); + assertNull(field.get(c)); + } + + @Test + public void testOnePendingReadRequestExistAndCancel() throws Exception { + ManagedLedger ledger = factory.open("testCancelAllWaitPendingReadRequest"); + ManagedCursor c = ledger.openCursor("c"); + + int sendNumber = 20; + c.asyncReadEntriesOrWait(sendNumber, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + } + }, null, PositionFactory.EARLIEST); + + c.asyncReadEntriesOrWait(sendNumber, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + } + }, null, PositionFactory.LATEST); + + // waitingReadOp is not null, waitingReadOpByMaxReadPosition is null + Field field = ManagedCursorImpl.class.getDeclaredField("waitingReadOp"); + field.setAccessible(true); + assertNotNull(field.get(c)); + + field = ManagedCursorImpl.class.getDeclaredField("waitingReadOpByMaxReadPosition"); + field.setAccessible(true); + assertNull(field.get(c)); + + // cancel PendingReadRequest + c.cancelPendingReadRequest(); + field = ManagedCursorImpl.class.getDeclaredField("waitingReadOp"); + field.setAccessible(true); + assertNull(field.get(c)); + + field = ManagedCursorImpl.class.getDeclaredField("waitingReadOpByMaxReadPosition"); + field.setAccessible(true); + assertNull(field.get(c)); + + // set readPosition=5, maxReadPosition=0, lastConfirmedEntry=10 + // waitingReadOpByMaxReadPosition is not null, waitingReadOp is null, + Field field1 = ManagedLedgerImpl.class.getDeclaredField("maxReadPosition"); + field1.setAccessible(true); + field1.set(ledger, PositionFactory.create(3, 0)); + + Field field2 = ManagedLedgerImpl.class.getDeclaredField("lastConfirmedEntry"); + field2.setAccessible(true); + field2.set(ledger, PositionFactory.create(3, 10)); + + Field field3 = ManagedCursorImpl.class.getDeclaredField("readPosition"); + field3.setAccessible(true); + field3.set(c, PositionFactory.create(3, 5)); + + c.asyncReadEntriesOrWait(sendNumber, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + } + }, null, PositionFactory.EARLIEST); + + c.asyncReadEntriesOrWait(sendNumber, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + } + }, null, PositionFactory.LATEST); + + field = ManagedCursorImpl.class.getDeclaredField("waitingReadOp"); + field.setAccessible(true); + assertNull(field.get(c)); + + field = ManagedCursorImpl.class.getDeclaredField("waitingReadOpByMaxReadPosition"); + field.setAccessible(true); + assertNotNull(field.get(c)); + + // cancel PendingReadRequest + c.cancelPendingReadRequest(); + field = ManagedCursorImpl.class.getDeclaredField("waitingReadOp"); + field.setAccessible(true); + assertNull(field.get(c)); + + field = ManagedCursorImpl.class.getDeclaredField("waitingReadOpByMaxReadPosition"); + field.setAccessible(true); + assertNull(field.get(c)); + } + + @Test + public void testReadEntriesOrWaitWithMaxPositionWithoutEnableTransaction() throws Exception { int readMaxNumber = 10; int sendNumber = 20; - ManagedLedger ledger = factory.open("testReadEntriesOrWaitWithMaxPosition"); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) + factory.open("testReadEntriesOrWaitWithMaxPosition"); ManagedCursor c = ledger.openCursor("c"); Position position = PositionFactory.EARLIEST; Position maxCanReadPosition = PositionFactory.EARLIEST; @@ -3913,8 +4073,14 @@ public void testReadEntriesOrWaitWithMaxPosition() throws Exception { } else { ledger.addEntry(new byte[1024]); } - } + + // when disable transaction, ml.maxReadPosition is always latest, + // readPosition is always <= ml.maxReadPosition, + // so it won't affect the previous readEntry process + Assert.assertEquals(ledger.getLastConfirmedEntry(), maxCanReadPosition); + Assert.assertEquals(ledger.getMaxReadPosition(), PositionFactory.LATEST); + CompletableFuture completableFuture = new CompletableFuture<>(); c.asyncReadEntriesOrWait(sendNumber, new ReadEntriesCallback() { @Override @@ -3947,6 +4113,216 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } + @Test + public void testReadEntriesOrWaitWithMaxPositionWithEnableTransaction() throws Exception { + int readMaxNumber = 10; + int sendNumber = 20; + ManagedLedgerImpl ledger = (ManagedLedgerImpl) + factory.open("testReadEntriesOrWaitWithMaxPosition"); + ManagedCursor c = ledger.openCursor("c"); + Position position = PositionFactory.EARLIEST; + Position lastPosition = PositionFactory.EARLIEST; + for (int i = 0; i < sendNumber; i++) { + if (i == readMaxNumber - 1) { + position = ledger.addEntry(new byte[1024]); + } else if (i == sendNumber - 1) { + lastPosition = ledger.addEntry(new byte[1024]); + } else { + ledger.addEntry(new byte[1024]); + } + } + + // update maxReadPosition to position. + // simulate that 0-position is normal add entry, + // position-maxCanReadPosition is txn add entry, and txn not commit + ledger.updateMaxReadPosition(position); + + Assert.assertEquals(ledger.getLastConfirmedEntry(), lastPosition); + Assert.assertEquals(ledger.getMaxReadPosition(), position); + + CompletableFuture completableFuture = new CompletableFuture<>(); + c.asyncReadEntriesOrWait(sendNumber, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + completableFuture.complete(entries.size()); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + completableFuture.completeExceptionally(exception); + } + }, null, ledger.getMaxReadPosition()); + + int number = completableFuture.get(); + assertEquals(number, readMaxNumber); + + + CompletableFuture completableFuture2 = new CompletableFuture<>(); + c.asyncReadEntriesOrWait(sendNumber, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + completableFuture2.complete(entries.size()); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + completableFuture2.completeExceptionally(exception); + } + }, null, lastPosition); + + try { + number = completableFuture2.get(2, TimeUnit.SECONDS); + fail(); + } catch (TimeoutException e) { + // read op in wait state, so will throw timeout exception + } + + // update maxReadPosition to maxCanReadPosition, notify wait cursor to read + ledger.updateMaxReadPosition(lastPosition); + number = completableFuture2.get(2, TimeUnit.SECONDS); + assertEquals(number, sendNumber - readMaxNumber); + + + // reset cursor to earliest + c.resetCursor(PositionFactory.EARLIEST); + CompletableFuture completableFuture3 = new CompletableFuture<>(); + c.asyncReadEntriesOrWait(sendNumber, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + completableFuture3.complete(entries.size()); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + completableFuture3.completeExceptionally(exception); + } + }, null, lastPosition); + number = completableFuture3.get(2, TimeUnit.SECONDS); + // after reset to earliest, can read from earliest to lastPosition + assertEquals(number, sendNumber); + } + + @DataProvider(name = "updateMaxReadPositionThenAddEntry") + public Object[][] updateMaxReadPositionThenAddEntryProvider() { + return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; + } + + @Test(dataProvider = "updateMaxReadPositionThenAddEntry") + public void testPendingReadOpWithResetCursor(boolean flag) throws Exception { + int readMaxNumber = 10; + int sendNumber = 20; + ManagedLedgerImpl ledger = (ManagedLedgerImpl) + factory.open("testReadEntriesOrWaitWithMaxPosition"); + ManagedCursor c = ledger.openCursor("c"); + Position middlePosition = PositionFactory.EARLIEST; + Position lastPosition = PositionFactory.EARLIEST; + for (int i = 0; i < sendNumber; i++) { + if (i == readMaxNumber - 1) { + middlePosition = ledger.addEntry(new byte[1024]); + } else if (i == sendNumber - 1) { + lastPosition = ledger.addEntry(new byte[1024]); + } else { + ledger.addEntry(new byte[1024]); + } + } + + // update maxReadPosition to earliest. + // simulate that all entries is txn add entry, and txn not commit + ledger.updateMaxReadPosition(PositionFactory.EARLIEST); + + Assert.assertEquals(ledger.getLastConfirmedEntry(), lastPosition); + Assert.assertEquals(ledger.getMaxReadPosition(), PositionFactory.EARLIEST); + + // completableFuture would go into waitingCursorsByMaxReadPosition + CompletableFuture completableFuture = new CompletableFuture<>(); + c.asyncReadEntriesOrWait(sendNumber, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + completableFuture.complete(entries.size()); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + completableFuture.completeExceptionally(exception); + } + }, null, lastPosition); + + int number; + try { + number = completableFuture.get(2, TimeUnit.SECONDS); + fail(); + } catch (TimeoutException e) { + // read op in wait state, so will throw timeout exception + } + + // reset cursor to lastPosition.getNext() + c.resetCursor(PositionFactory.LATEST); + Assert.assertEquals(c.getReadPosition(), lastPosition.getNext()); + + // completableFuture2 would go into waitingCursors + CompletableFuture completableFuture2 = new CompletableFuture<>(); + c.asyncReadEntriesOrWait(sendNumber, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + completableFuture2.complete(entries.size()); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + completableFuture2.completeExceptionally(exception); + } + }, null, lastPosition); + + int number2; + try { + number2 = completableFuture2.get(2, TimeUnit.SECONDS); + fail(); + } catch (TimeoutException e) { + // read op in wait state, so will throw timeout exception + } + + Assert.assertEquals(ledger.waitingCursors.size(), 1); + Assert.assertEquals(ledger.waitingCursorsByMaxReadPosition.size(), 1); + + if (flag) { + // update maxReadPosition to lastPosition, + // notify waitingCursorsByMaxReadPosition to read + ledger.updateMaxReadPosition(lastPosition); + number = completableFuture.get(2, TimeUnit.SECONDS); + assertEquals(number, 0); + try { + number2 = completableFuture2.get(2, TimeUnit.SECONDS); + fail(); + } catch (TimeoutException e) { + // read op in wait state, so will throw timeout exception + } + + // then add entry to move forward lastConfirmedEntry + // notify waitingCursors to read + ledger.addEntry(new byte[1024]); + number2 = completableFuture2.get(2, TimeUnit.SECONDS); + assertEquals(number2, 0); + } else { + // add entry to move forward lastConfirmedEntry + // notify waitingCursors to read + ledger.addEntry(new byte[1024]); + number2 = completableFuture2.get(2, TimeUnit.SECONDS); + assertEquals(number2, 0); + try { + number = completableFuture.get(2, TimeUnit.SECONDS); + fail(); + } catch (TimeoutException e) { + // read op in wait state, so will throw timeout exception + } + + // then update maxReadPosition to lastPosition, + // notify waitingCursorsByMaxReadPosition to read + ledger.updateMaxReadPosition(lastPosition); + number = completableFuture.get(2, TimeUnit.SECONDS); + assertEquals(number, 0); + } + } + @Test public void testFlushCursorAfterInactivity() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); @@ -4348,11 +4724,15 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } Awaitility.await().atMost(Duration.ofSeconds(1)) .untilAsserted(() -> assertEquals(ledger.waitingCursors.size(), 1)); + Awaitility.await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> assertEquals(ledger.waitingCursorsByMaxReadPosition.size(), 0)); assertTrue(cursor.cancelPendingReadRequest()); ledger.addEntry(new byte[1]); Awaitility.await().atMost(Duration.ofSeconds(1)) .untilAsserted(() -> assertTrue(ledger.waitingCursors.isEmpty())); + Awaitility.await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> assertTrue(ledger.waitingCursorsByMaxReadPosition.isEmpty())); assertFalse(readEntriesSuccess.get()); assertEquals(exceptions.size(), numReadRequests - 1); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 711e1d93f742ff..d94b5cc97a493c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -422,6 +422,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS .getTransactionBufferProvider().newTransactionBuffer(this); } else { this.transactionBuffer = new TransactionBufferDisable(this); + // when disable transaction, update ml.maxReadPosition to PositionImpl.LATEST + this.ledger.updateMaxReadPosition(PositionFactory.LATEST); } transactionBuffer.syncMaxReadPositionForNormalPublish(ledger.getLastConfirmedEntry(), true); if (ledger instanceof ShadowManagedLedgerImpl) { @@ -515,6 +517,8 @@ public CompletableFuture initialize() { .getTransactionBufferProvider().newTransactionBuffer(this); } else { this.transactionBuffer = new TransactionBufferDisable(this); + // when disable transaction, update ml.maxReadPosition to PositionImpl.LATEST + this.ledger.updateMaxReadPosition(PositionFactory.LATEST); } shadowSourceTopic = null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index b4662e5fa83ed8..755e3795037821 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -143,6 +143,8 @@ public void recoverComplete() { "Transaction buffer recover failed to change the status to Ready," + "current state is: " + getState())); } else { + // when TB recover success, update ml.maxReadPosition + topic.getManagedLedger().updateMaxReadPosition(maxReadPosition); timer.newTimeout(TopicTransactionBuffer.this, takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); transactionBufferFuture.complete(null); @@ -155,6 +157,8 @@ public void recoverComplete() { public void noNeedToRecover() { synchronized (TopicTransactionBuffer.this) { maxReadPosition = topic.getManagedLedger().getLastConfirmedEntry(); + // when TB no need to recover, update ml.maxReadPosition + topic.getManagedLedger().updateMaxReadPosition(maxReadPosition); if (!changeToNoSnapshotState()) { log.error("[{}]Transaction buffer recover fail", topic.getName()); } else { @@ -487,6 +491,8 @@ void updateMaxReadPosition(Position newPosition, boolean disableCallback) { if (!disableCallback) { maxReadPositionCallBack.maxReadPositionMovedForward(preMaxReadPosition, this.maxReadPosition); } + // When TB.maxReadPosition move forward, need to synchronize to ml + this.topic.getManagedLedger().updateMaxReadPosition(maxReadPosition); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java index 25479e657d4560..1758e6b49a3702 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java @@ -35,6 +35,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; @@ -415,4 +416,126 @@ public void testAckChunkMessage() throws Exception { Assert.assertEquals(admin.topics().getStats(CONSUME_TOPIC).getSubscriptions().get(subName) .getUnackedMessages(), 0); } + + @Test + public void testConsumeWhenUpdateMaxReadPosition() throws Exception { + int messageCntBeforeTxn = 10; + int transactionMessageCnt = 10; + int messageCntAfterTxn = 10; + int commitTime = 0; + int totalMsgCnt = messageCntBeforeTxn + transactionMessageCnt + messageCntAfterTxn; + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(CONSUME_TOPIC) + .create(); + + @Cleanup + Consumer exclusiveConsumer = pulsarClient.newConsumer() + .topic(CONSUME_TOPIC) + .subscriptionName("exclusive-test") + .subscribe(); + + @Cleanup + Consumer sharedConsumer = pulsarClient.newConsumer() + .topic(CONSUME_TOPIC) + .subscriptionName("shared-test") + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + Awaitility.await().until(exclusiveConsumer::isConnected); + Awaitility.await().until(sharedConsumer::isConnected); + + long mostSigBits = 2L; + long leastSigBits = 5L; + TxnID txnID = new TxnID(mostSigBits, leastSigBits); + + PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService() + .getTopic(CONSUME_TOPIC, false).get().get(); + log.info("transactionBuffer init finish."); + + Field field = ManagedLedgerImpl.class.getDeclaredField("maxReadPosition"); + field.setAccessible(true); + + Assert.assertEquals(persistentTopic.getTransactionBuffer().getMaxReadPosition(), + field.get(persistentTopic.getManagedLedger())); + Assert.assertEquals(persistentTopic.getTransactionBuffer().getMaxReadPosition().getEntryId(), -1L); + + // send normal message + // ml.maxReadPosition = tb.maxReadPosition = ml.lastConfirmedEntry + List sendMessageList = new ArrayList<>(); + sendNormalMessages(producer, 0, messageCntBeforeTxn, sendMessageList); + Assert.assertEquals(persistentTopic.getTransactionBuffer().getMaxReadPosition(), + field.get(persistentTopic.getManagedLedger())); + Assert.assertEquals(persistentTopic.getTransactionBuffer().getMaxReadPosition().getEntryId(), + messageCntBeforeTxn - 1); + Assert.assertEquals(persistentTopic.getManagedLedger().getLastConfirmedEntry().getEntryId(), + messageCntBeforeTxn - 1); + + // send txn message and normal message + // ml.maxReadPosition = tb.maxReadPosition < ml.lastConfirmedEntry + appendTransactionMessages(txnID, persistentTopic, transactionMessageCnt, sendMessageList); + sendNormalMessages(producer, 0, messageCntAfterTxn, sendMessageList); + Assert.assertEquals(persistentTopic.getTransactionBuffer().getMaxReadPosition(), + field.get(persistentTopic.getManagedLedger())); + Assert.assertEquals(persistentTopic.getTransactionBuffer().getMaxReadPosition().getEntryId(), + messageCntBeforeTxn - 1); + Assert.assertEquals(persistentTopic.getManagedLedger().getLastConfirmedEntry().getEntryId(), + totalMsgCnt - 1); + + + Message message; + for (int i = 0; i < totalMsgCnt; i++) { + if (i < messageCntBeforeTxn) { + // receive normal messages successfully + message = exclusiveConsumer.receive(2, TimeUnit.SECONDS); + Assert.assertNotNull(message); + log.info("Receive exclusive normal msg: {}" + new String(message.getData(), UTF_8)); + message = sharedConsumer.receive(2, TimeUnit.SECONDS); + Assert.assertNotNull(message); + log.info("Receive shared normal msg: {}" + new String(message.getData(), UTF_8)); + } else { + // can't receive transaction messages before commit + message = exclusiveConsumer.receive(500, TimeUnit.MILLISECONDS); + Assert.assertNull(message); + log.info("exclusive consumer can't receive message before commit."); + message = sharedConsumer.receive(500, TimeUnit.MILLISECONDS); + Assert.assertNull(message); + log.info("shared consumer can't receive message before commit."); + } + } + + // commit txn, ml.maxReadPosition = tb.maxReadPosition = ml.lastConfirmedEntry + // maxReadPosition and lastConfirmedEntry are 30, actually has 30 messages and 1 commitmarker + persistentTopic.endTxn(txnID, TxnAction.COMMIT_VALUE, 0L).get(); + log.info("Commit txn."); + commitTime++; + + Assert.assertEquals(persistentTopic.getTransactionBuffer().getMaxReadPosition(), + field.get(persistentTopic.getManagedLedger())); + Assert.assertEquals(persistentTopic.getTransactionBuffer().getMaxReadPosition().getEntryId(), + totalMsgCnt - 1 + commitTime); + Assert.assertEquals(persistentTopic.getManagedLedger().getLastConfirmedEntry().getEntryId(), + totalMsgCnt - 1 + commitTime); + + // receive transaction messages successfully after commit + for (int i = 0; i < transactionMessageCnt + messageCntAfterTxn; i++) { + message = exclusiveConsumer.receive(5, TimeUnit.SECONDS); + Assert.assertNotNull(message); + log.info("Receive txn exclusive id: {}, msg: {}", message.getMessageId(), new String(message.getData())); + + message = sharedConsumer.receive(5, TimeUnit.SECONDS); + Assert.assertNotNull(message); + log.info("Receive txn shared id: {}, msg: {}", message.getMessageId(), new String(message.getData())); + } + + + message = exclusiveConsumer.receive(500, TimeUnit.MILLISECONDS); + Assert.assertNull(message); + log.info("exclusive consumer can't receive the 31th message."); + + message = sharedConsumer.receive(500, TimeUnit.MILLISECONDS); + Assert.assertNull(message); + log.info("shared consumer can't receive the 31th message."); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java index eb7b24c7326dc7..c6b13564f8ae24 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java @@ -21,14 +21,17 @@ import static org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState.State.NoSnapshot; import static org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState.State.Ready; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNull; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.TransactionTestBase; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; @@ -198,11 +201,20 @@ public void testSyncNormalPositionWhenTBRecover(boolean clientEnableTransaction, PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService() .getTopic(TopicName.get(topicName).toString(), false).get().get(); + ManagedLedger managedLedger = persistentTopic.getManagedLedger(); + Field field1 = ManagedLedgerImpl.class.getDeclaredField("maxReadPosition"); + field1.setAccessible(true); + TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer(); // wait topic transaction buffer recover success checkTopicTransactionBufferState(clientEnableTransaction, topicTransactionBuffer); + // since topic transaction buffer recover success, + // ml.maxReadPosition would be updated to lastConfirmedEntry + Position mlMaxReadPosition1 = (Position) field1.get(managedLedger); + assertEquals(mlMaxReadPosition1, managedLedger.getLastConfirmedEntry()); + Field field = TopicTransactionBufferState.class.getDeclaredField("state"); field.setAccessible(true); field.set(topicTransactionBuffer, state); @@ -216,6 +228,13 @@ public void testSyncNormalPositionWhenTBRecover(boolean clientEnableTransaction, // send normal message can't change MaxReadPosition when state is None or Initializing position = topicTransactionBuffer.getMaxReadPosition(); assertEquals(position, PositionFactory.EARLIEST); + // send normal message can't change ml.maxReadPosition when state is None or Initializing + // but ml.lastConfirmedEntry would be changed + Position mlMaxReadPosition2 = (Position) field1.get(managedLedger); + assertEquals(mlMaxReadPosition2, mlMaxReadPosition1); + assertEquals(managedLedger.getLastConfirmedEntry(), + PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId())); + assertNotEquals(mlMaxReadPosition2, managedLedger.getLastConfirmedEntry()); // change to None state can recover field.set(topicTransactionBuffer, TopicTransactionBufferState.State.None); @@ -231,6 +250,10 @@ public void testSyncNormalPositionWhenTBRecover(boolean clientEnableTransaction, // change MaxReadPosition to normal message position assertEquals(PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId()), topicTransactionBuffer.getMaxReadPosition()); + // change ml.maxReadPosition to normal message position + Position mlMaxReadPosition3 = (Position) field1.get(managedLedger); + assertEquals(mlMaxReadPosition3, + PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId())); } private void checkTopicTransactionBufferState(boolean clientEnableTransaction, diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java index 66ace69d7cda23..757e2ad87be840 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java @@ -392,4 +392,8 @@ public boolean isMigrated() { // no-op return false; } + + @Override + public void updateMaxReadPosition(Position position) { + } }