Skip to content

Commit

Permalink
[FLINK-36679][runtime] Add a metric to track checkpoint _metadata size
Browse files Browse the repository at this point in the history
  • Loading branch information
liuml07 committed Nov 8, 2024
1 parent 2ceb62d commit 3cd40bb
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 13 deletions.
5 changes: 5 additions & 0 deletions docs/content.zh/docs/ops/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -1317,6 +1317,11 @@ Note that for failed checkpoints, metrics are updated on a best efforts basis an
<td>The full size of the last checkpoint (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>lastCheckpointMetadataSize</td>
<td>上次检查点 _metadata 文件的大小(字节)。</td>
<td>Gauge</td>
</tr>
<tr>
<td>lastCheckpointExternalPath</td>
<td>The path where the last external checkpoint was stored.</td>
Expand Down
5 changes: 5 additions & 0 deletions docs/content/docs/ops/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -1307,6 +1307,11 @@ Note that for failed checkpoints, metrics are updated on a best efforts basis an
<td>The full size of the last checkpoint (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>lastCheckpointMetadataSize</td>
<td>The metadata file size of the last checkpoint (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>lastCheckpointExternalPath</td>
<td>The path where the last external checkpoint was stored.</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ public abstract class AbstractCheckpointStats implements Serializable {
*/
public abstract long getCheckpointedSize();

/**
 * Returns the size of the checkpoint's metadata file.
 *
 * <p>This base implementation always reports {@code 0}, i.e. the size is treated as unknown.
 *
 * @return The metadata file size, 0 if unknown.
 */
public long getMetadataSize() {
return 0;
}

/** @return the total number of processed bytes during the checkpoint. */
public abstract long getProcessedData();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
/** Total checkpoint state size over all subtasks. */
private final long stateSize;

/** The persisted metadata file size. */
private final long metadataSize;

private final long processedData;

private final long persistedData;
Expand All @@ -67,6 +70,7 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
Map<JobVertexID, TaskStateStats> taskStats,
int numAcknowledgedSubtasks,
long stateSize,
long metadataSize,
long processedData,
long persistedData,
boolean unalignedCheckpoint,
Expand All @@ -81,6 +85,7 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
numAcknowledgedSubtasks,
stateSize,
stateSize,
metadataSize,
processedData,
persistedData,
unalignedCheckpoint,
Expand All @@ -100,6 +105,7 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
* @param checkpointedSize Total persisted data size over all subtasks during the sync and async
* phases of this checkpoint.
* @param stateSize Total checkpoint state size over all subtasks.
* @param metadataSize The persisted metadata file size.
* @param processedData Processed data during the checkpoint.
* @param persistedData Persisted data during the checkpoint.
* @param unalignedCheckpoint Whether the checkpoint is unaligned.
Expand All @@ -115,6 +121,7 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
int numAcknowledgedSubtasks,
long checkpointedSize,
long stateSize,
long metadataSize,
long processedData,
long persistedData,
boolean unalignedCheckpoint,
Expand All @@ -128,6 +135,7 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
this.checkpointedSize = checkpointedSize;
checkArgument(stateSize >= 0, "Negative state size");
this.stateSize = stateSize;
this.metadataSize = metadataSize;
this.processedData = processedData;
this.persistedData = persistedData;
this.unalignedCheckpoint = unalignedCheckpoint;
Expand Down Expand Up @@ -155,6 +163,11 @@ public long getCheckpointedSize() {
return checkpointedSize;
}

/** Returns the persisted metadata file size recorded for this completed checkpoint. */
@Override
public long getMetadataSize() {
    return this.metadataSize;
}

@Override
public long getProcessedData() {
return processedData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ private void logCheckpointStatistics(AbstractCheckpointStats checkpointStats) {
.setAttribute("checkpointId", checkpointStats.getCheckpointId())
.setAttribute("fullSize", checkpointStats.getStateSize())
.setAttribute("checkpointedSize", checkpointStats.getCheckpointedSize())
.setAttribute("metadataSize", checkpointStats.getMetadataSize())
.setAttribute("checkpointStatus", checkpointStats.getStatus().name()));
if (LOG.isDebugEnabled()) {
StringWriter sw = new StringWriter();
Expand Down Expand Up @@ -416,6 +417,10 @@ private void setDurationSpanAttribute(
@VisibleForTesting
static final String LATEST_COMPLETED_CHECKPOINT_FULL_SIZE_METRIC = "lastCheckpointFullSize";

/** Name of the gauge that exposes the metadata size of the latest completed checkpoint. */
@VisibleForTesting
static final String LATEST_COMPLETED_CHECKPOINT_METADATA_SIZE_METRIC =
"lastCheckpointMetadataSize";

@VisibleForTesting
static final String LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC = "lastCheckpointDuration";

Expand Down Expand Up @@ -457,6 +462,9 @@ private void registerMetrics(MetricGroup metricGroup) {
metricGroup.gauge(
LATEST_COMPLETED_CHECKPOINT_FULL_SIZE_METRIC,
new LatestCompletedCheckpointFullSizeGauge());
metricGroup.gauge(
LATEST_COMPLETED_CHECKPOINT_METADATA_SIZE_METRIC,
new LatestCompletedCheckpointMetadataSizeGauge());
metricGroup.gauge(
LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC,
new LatestCompletedCheckpointDurationGauge());
Expand Down Expand Up @@ -537,6 +545,18 @@ public Long getValue() {
}
}

/**
 * Gauge reporting the metadata size of the latest completed checkpoint, or {@code -1} while no
 * completed checkpoint is available.
 */
private class LatestCompletedCheckpointMetadataSizeGauge implements Gauge<Long> {
    @Override
    public Long getValue() {
        // Work on a local copy of the field for both the null check and the access.
        final CompletedCheckpointStats latest = latestCompletedCheckpoint;
        if (latest == null) {
            return -1L;
        }
        return latest.getMetadataSize();
    }
}

private class LatestCompletedCheckpointDurationGauge implements Gauge<Long> {
@Override
public Long getValue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,8 @@ private CompletedCheckpointStats toCompletedCheckpointStats(
CompletedCheckpointStorageLocation finalizedLocation) {
return pendingCheckpointStats != null
? pendingCheckpointStats.toCompletedCheckpointStats(
finalizedLocation.getExternalPointer())
finalizedLocation.getExternalPointer(),
finalizedLocation.getMetadataHandle().getStateSize())
: null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ boolean reportSubtaskStats(JobVertexID jobVertexId, SubtaskStateStats subtask) {
}
}

CompletedCheckpointStats toCompletedCheckpointStats(String externalPointer) {
CompletedCheckpointStats toCompletedCheckpointStats(String externalPointer, long metadataSize) {
return new CompletedCheckpointStats(
checkpointId,
triggerTimestamp,
Expand All @@ -231,6 +231,7 @@ CompletedCheckpointStats toCompletedCheckpointStats(String externalPointer) {
currentNumAcknowledgedSubtasks,
currentCheckpointedSize,
currentStateSize,
metadataSize,
currentProcessedData,
currentPersistedData,
unalignedCheckpoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ void testSimpleUpdates() {
long triggerTimestamp = 123123L;
long ackTimestamp = 123123 + 1212312399L;
long stateSize = Integer.MAX_VALUE + 17787L;
long metadataSize = Integer.MAX_VALUE + 1984L;
long processedData = Integer.MAX_VALUE + 123123L;
long persistedData = Integer.MAX_VALUE + 42L;
boolean unalignedCheckpoint = true;
Expand All @@ -59,6 +60,7 @@ void testSimpleUpdates() {
triggerTimestamp,
ackTimestamp + i,
stateSize + i,
metadataSize + i,
processedData + i,
persistedData + i,
unalignedCheckpoint);
Expand Down Expand Up @@ -94,6 +96,7 @@ private CompletedCheckpointStats createCompletedCheckpoint(
long triggerTimestamp,
long ackTimestamp,
long stateSize,
long metadataSize,
long processedData,
long persistedData,
boolean unalignedCheckpoint) {
Expand All @@ -114,6 +117,7 @@ private CompletedCheckpointStats createCompletedCheckpoint(
taskStats,
1,
stateSize,
metadataSize,
processedData,
persistedData,
unalignedCheckpoint,
Expand All @@ -125,6 +129,7 @@ private CompletedCheckpointStats createCompletedCheckpoint(
@Test
void testQuantiles() {
int stateSize = 100;
int metadataSize = 110;
int processedData = 200;
int persistedData = 300;
boolean unalignedCheckpoint = true;
Expand All @@ -141,6 +146,7 @@ void testQuantiles() {
singletonMap(new JobVertexID(), new TaskStateStats(new JobVertexID(), 1)),
1,
stateSize,
metadataSize,
processedData,
persistedData,
unalignedCheckpoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ void testCompletedCheckpointStatsCallbacks() throws Exception {
1,
1,
1,
1,
true,
mock(SubtaskStateStats.class),
null);
Expand Down Expand Up @@ -409,6 +410,7 @@ void testIsJavaSerializable() throws Exception {
taskStats,
1337,
123129837912L,
2222379996L,
42L,
44L,
true,
Expand All @@ -426,6 +428,7 @@ void testIsJavaSerializable() throws Exception {
.isEqualTo(completed.getNumberOfAcknowledgedSubtasks());
assertThat(copy.getEndToEndDuration()).isEqualTo(completed.getEndToEndDuration());
assertThat(copy.getStateSize()).isEqualTo(completed.getStateSize());
assertThat(copy.getMetadataSize()).isEqualTo(completed.getMetadataSize());
assertThat(copy.getProcessedData()).isEqualTo(completed.getProcessedData());
assertThat(copy.getPersistedData()).isEqualTo(completed.getPersistedData());
assertThat(copy.isUnalignedCheckpoint()).isEqualTo(completed.isUnalignedCheckpoint());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void testTrackerWithoutHistory() throws Exception {
pending.reportSubtaskStats(jobVertexID, createSubtaskStats(1));
pending.reportSubtaskStats(jobVertexID, createSubtaskStats(2));

tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null));
tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null, 1984));

CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
// History should be empty
Expand Down Expand Up @@ -135,7 +135,7 @@ void testCheckpointTracking() throws Exception {
completed1.reportSubtaskStats(jobVertexID, createSubtaskStats(1));
completed1.reportSubtaskStats(jobVertexID, createSubtaskStats(2));

tracker.reportCompletedCheckpoint(completed1.toCompletedCheckpointStats(null));
tracker.reportCompletedCheckpoint(completed1.toCompletedCheckpointStats(null, 1984));

// Failed checkpoint
PendingCheckpointStats failed =
Expand All @@ -160,7 +160,7 @@ void testCheckpointTracking() throws Exception {
savepoint.reportSubtaskStats(jobVertexID, createSubtaskStats(1));
savepoint.reportSubtaskStats(jobVertexID, createSubtaskStats(2));

tracker.reportCompletedCheckpoint(savepoint.toCompletedCheckpointStats(null));
tracker.reportCompletedCheckpoint(savepoint.toCompletedCheckpointStats(null, 1984));

// In Progress
PendingCheckpointStats inProgress =
Expand Down Expand Up @@ -247,7 +247,7 @@ void testCheckpointStatsListenerOnCompletedCheckpoint() {
(checkpointStatsTracker, pendingCheckpointStats) ->
checkpointStatsTracker.reportCompletedCheckpoint(
pendingCheckpointStats.toCompletedCheckpointStats(
"random-external-pointer")),
"random-external-pointer", 1984)),
1,
0);
}
Expand Down Expand Up @@ -333,7 +333,7 @@ void testCreateSnapshot() {
assertThat(tracker.createSnapshot()).isEqualTo(snapshot2);

// Complete checkpoint => new snapshot
tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null));
tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null, 1984));

CheckpointStatsSnapshot snapshot3 = tracker.createSnapshot();
assertThat(snapshot3).isNotEqualTo(snapshot2);
Expand Down Expand Up @@ -382,7 +382,7 @@ public void addSpan(SpanBuilder spanBuilder) {
pending.reportSubtaskStats(jobVertexID, createSubtaskStats(0));

// Complete checkpoint => new snapshot
tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null));
tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null, 1984));

assertThat(reportedSpans.size()).isEqualTo(1);
assertThat(
Expand Down Expand Up @@ -524,6 +524,8 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
.LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC,
DefaultCheckpointStatsTracker
.LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC,
DefaultCheckpointStatsTracker
.LATEST_COMPLETED_CHECKPOINT_METADATA_SIZE_METRIC,
DefaultCheckpointStatsTracker
.LATEST_COMPLETED_CHECKPOINT_FULL_SIZE_METRIC,
DefaultCheckpointStatsTracker
Expand All @@ -537,7 +539,7 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
DefaultCheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_ID_METRIC,
DefaultCheckpointStatsTracker
.LATEST_CHECKPOINT_COMPLETED_TIMESTAMP));
assertThat(registeredGaugeNames).hasSize(13);
assertThat(registeredGaugeNames).hasSize(14);
}

/**
Expand Down Expand Up @@ -566,7 +568,7 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
CheckpointStatsTracker stats = new DefaultCheckpointStatsTracker(0, metricGroup);

// Make sure to adjust this test if metrics are added/removed
assertThat(registeredGauges).hasSize(13);
assertThat(registeredGauges).hasSize(14);

// Check initial values
Gauge<Long> numCheckpoints =
Expand Down Expand Up @@ -597,6 +599,11 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
registeredGauges.get(
DefaultCheckpointStatsTracker
.LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC);
Gauge<Long> latestCompletedMetadataSize =
(Gauge<Long>)
registeredGauges.get(
DefaultCheckpointStatsTracker
.LATEST_COMPLETED_CHECKPOINT_METADATA_SIZE_METRIC);
Gauge<Long> latestCompletedFullSize =
(Gauge<Long>)
registeredGauges.get(
Expand Down Expand Up @@ -639,6 +646,7 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
assertThat(numFailedCheckpoints.getValue()).isZero();
assertThat(latestRestoreTimestamp.getValue()).isEqualTo(-1);
assertThat(latestCompletedSize.getValue()).isEqualTo(-1);
assertThat(latestCompletedMetadataSize.getValue()).isEqualTo(-1);
assertThat(latestCompletedFullSize.getValue()).isEqualTo(-1);
assertThat(latestCompletedDuration.getValue()).isEqualTo(-1);
assertThat(latestProcessedData.getValue()).isEqualTo(-1);
Expand All @@ -663,6 +671,7 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {

long ackTimestamp = 11231230L;
long checkpointedSize = 123L;
Long metadataSize = 1984L;
long fullCheckpointSize = 12381238L;
long processedData = 4242L;
long persistedData = 4444L;
Expand All @@ -686,7 +695,8 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {

assertThat(pending.reportSubtaskStats(jobVertexID, subtaskStats)).isTrue();

stats.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(externalPath));
stats.reportCompletedCheckpoint(
pending.toCompletedCheckpointStats(externalPath, metadataSize));

// Verify completed checkpoint updated
assertThat(numCheckpoints.getValue()).isOne();
Expand All @@ -695,6 +705,7 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
assertThat(numFailedCheckpoints.getValue()).isZero();
assertThat(latestRestoreTimestamp.getValue()).isEqualTo(-1);
assertThat(latestCompletedSize.getValue()).isEqualTo(checkpointedSize);
assertThat(latestCompletedMetadataSize.getValue()).isEqualTo(metadataSize);
assertThat(latestCompletedFullSize.getValue()).isEqualTo(fullCheckpointSize);
assertThat(latestProcessedData.getValue()).isEqualTo(processedData);
assertThat(latestPersistedData.getValue()).isEqualTo(persistedData);
Expand Down Expand Up @@ -753,7 +764,7 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
singletonMap(jobVertexID, 1));

thirdPending.reportSubtaskStats(jobVertexID, subtaskStats);
stats.reportCompletedCheckpoint(thirdPending.toCompletedCheckpointStats(null));
stats.reportCompletedCheckpoint(thirdPending.toCompletedCheckpointStats(null, 1984));
assertThat(latestCompletedId.getValue()).isEqualTo(2);

// Verify external path is "n/a", because internal checkpoint won't generate external path.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void testReportCompletedCheckpoint() {
// Report completed
String externalPath = "asdjkasdjkasd";

callback.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(externalPath));
callback.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(externalPath, 1984));

ArgumentCaptor<CompletedCheckpointStats> args =
ArgumentCaptor.forClass(CompletedCheckpointStats.class);
Expand Down

0 comments on commit 3cd40bb

Please sign in to comment.