Skip to content

Commit

Permalink
Add offload ledger info for admin topics stats (apache#11465)
Browse files Browse the repository at this point in the history
### Motivation
Currently, we don't have offload stats when getting the topic stats bin/pulsar-admin topics stats.
We should add metrics in topic stats on last offloaded ledger id, last successful offloaded timestamp, and last offload failure timestamp. 

### Modifications
Add lastOffloadedLedgerId, lastOffloadSuccessTimestamp, lastOffloadFailureTimestamp for ManagedLedgerImpl and TopicStatsImpl.
  • Loading branch information
frankxieke authored Aug 3, 2021
1 parent 03aedc7 commit f1f0add
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,27 @@ public interface ManagedLedger {
*/
long getOffloadedSize();

/**
* Get last offloaded ledgerId. If no offloaded yet, it returns 0.
*
* @return last offloaded ledgerId
*/
long getLastOffloadedLedgerId();

/**
* Get last suceessful offloaded timestamp. If no successful offload, it returns 0.
*
* @return last successful offloaded timestamp
*/
long getLastOffloadedSuccessTimestamp();

/**
* Get last failed offloaded timestamp. If no failed offload, it returns 0.
*
* @return last failed offloaded timestamp
*/
long getLastOffloadedFailureTimestamp();

void asyncTerminate(TerminateCallback callback, Object ctx);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private long lastLedgerCreationFailureTimestamp = 0;
private long lastLedgerCreationInitiationTimestamp = 0;

private long lastOffloadLedgerId = 0;
private long lastOffloadSuccessTimestamp = 0;
private long lastOffloadFailureTimestamp = 0;

private static final Random random = new Random(System.currentTimeMillis());
private long maximumRolloverTimeMs;
protected final Supplier<Boolean> mlOwnershipChecker;
Expand Down Expand Up @@ -2873,8 +2877,8 @@ private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerIn
})
.whenComplete((ignore, exception) -> {
if (exception != null) {
log.warn("[{}] Exception occurred during offload", name, exception);

lastOffloadFailureTimestamp = System.currentTimeMillis();
log.warn("[{}] Exception occurred for ledgerId {} timestamp {} during offload", name, ledgerId, lastOffloadFailureTimestamp, exception);
PositionImpl newFirstUnoffloaded = PositionImpl.get(ledgerId, 0);
if (newFirstUnoffloaded.compareTo(firstUnoffloaded) > 0) {
newFirstUnoffloaded = firstUnoffloaded;
Expand All @@ -2891,6 +2895,9 @@ private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerIn
newFirstUnoffloaded,
errorToReport);
} else {
lastOffloadSuccessTimestamp = System.currentTimeMillis();
log.info("[{}] offload for ledgerId {} timestamp {} succeed", name, ledgerId, lastOffloadSuccessTimestamp);
lastOffloadLedgerId = ledgerId;
invalidateReadHandle(ledgerId);
offloadLoop(promise, ledgersToOffload, firstUnoffloaded, firstError);
}
Expand Down Expand Up @@ -3711,6 +3718,21 @@ public long getOffloadedSize() {
return offloadedSize;
}

@Override
public long getLastOffloadedLedgerId() {
return lastOffloadLedgerId;
}

@Override
public long getLastOffloadedSuccessTimestamp() {
return lastOffloadSuccessTimestamp;
}

@Override
public long getLastOffloadedFailureTimestamp() {
return lastOffloadFailureTimestamp;
}

@Override
public Map<String, String> getProperties() {
return propertiesMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1799,6 +1799,9 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa
stats.deduplicationStatus = messageDeduplication.getStatus().toString();
stats.topicEpoch = topicEpoch.orElse(null);
stats.offloadedStorageSize = ledger.getOffloadedSize();
stats.lastOffloadLedgerId = ledger.getLastOffloadedLedgerId();
stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp();
stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp();
return stats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ public class TopicStatsImpl implements TopicStats {
/** Space used to store the offloaded messages for the topic/. */
public long offloadedStorageSize;

/** record last successful offloaded ledgerId. If no offload ledger, the value should be 0 */
public long lastOffloadLedgerId;

/** record last successful offloaded timestamp. If no successful offload, the value should be 0 */
public long lastOffloadSuccessTimeStamp;

/** record last failed offloaded timestamp. If no failed offload, the value should be 0 */
public long lastOffloadFailureTimeStamp;

/** List of connected publishers on this topic w/ their stats. */
@Getter(AccessLevel.NONE)
public List<PublisherStatsImpl> publishers;
Expand Down Expand Up @@ -145,6 +154,9 @@ public void reset() {
this.nonContiguousDeletedMessagesRanges = 0;
this.nonContiguousDeletedMessagesRangesSerializedSize = 0;
this.offloadedStorageSize = 0;
this.lastOffloadLedgerId = 0;
this.lastOffloadFailureTimeStamp = 0;
this.lastOffloadSuccessTimeStamp = 0;
}

// if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public void testPersistentTopicStats() {
assertEquals(topicStats.msgThroughputOut, 1.0);
assertEquals(topicStats.averageMsgSize, 1.0);
assertEquals(topicStats.offloadedStorageSize, 1);
assertEquals(topicStats.lastOffloadLedgerId, 0);
assertEquals(topicStats.lastOffloadSuccessTimeStamp, 0);
assertEquals(topicStats.lastOffloadFailureTimeStamp, 0);
assertEquals(topicStats.storageSize, 1);
assertEquals(topicStats.publishers.size(), 1);
assertEquals(topicStats.subscriptions.size(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,21 @@ public long getOffloadedSize() {
return 0;
}

@Override
public long getLastOffloadedLedgerId() {
return 0;
}

@Override
public long getLastOffloadedSuccessTimestamp() {
return 0;
}

@Override
public long getLastOffloadedFailureTimestamp() {
return 0;
}

@Override
public void asyncTerminate(AsyncCallbacks.TerminateCallback callback, Object ctx) {

Expand Down

0 comments on commit f1f0add

Please sign in to comment.