Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel][Metrics][PR#7] Support ScanReport to log metrics for a Scan operation #4068

Merged
merged 10 commits into from
Feb 3, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.math.BigDecimal;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
* A literal value.
Expand Down Expand Up @@ -247,4 +248,16 @@ public String toString() {
public List<Expression> getChildren() {
return Collections.emptyList();
}

/**
 * Structural equality for literals: two {@code Literal}s are equal iff they have the same
 * runtime class, an equal {@code dataType}, and an equal {@code value} (null-safe comparison
 * via {@link Objects#equals(Object, Object)}).
 *
 * NOTE(review): no matching {@code hashCode} override is visible in this diff chunk. The
 * Object contract requires equal objects to have equal hash codes — confirm the class also
 * overrides {@code hashCode} (e.g. {@code Objects.hash(dataType, value)}).
 */
@Override
public boolean equals(Object o) {
// Reflexive fast path: an instance is always equal to itself.
if (this == o) {
return true;
}
// getClass() (rather than instanceof) keeps equality symmetric if subclasses exist.
if (o == null || getClass() != o.getClass()) {
return false;
}
Literal other = (Literal) o;
return Objects.equals(dataType, other.dataType) && Objects.equals(value, other.value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.metrics.SnapshotReport;
import io.delta.kernel.types.StructType;
import java.util.Optional;

Expand All @@ -36,6 +37,7 @@ public class ScanBuilderImpl implements ScanBuilder {
private final StructType snapshotSchema;
private final LogReplay logReplay;
private final Engine engine;
private final SnapshotReport snapshotReport;

private StructType readSchema;
private Optional<Predicate> predicate;
Expand All @@ -46,7 +48,8 @@ public ScanBuilderImpl(
Metadata metadata,
StructType snapshotSchema,
LogReplay logReplay,
Engine engine) {
Engine engine,
SnapshotReport snapshotReport) {
this.dataPath = dataPath;
this.protocol = protocol;
this.metadata = metadata;
Expand All @@ -55,6 +58,7 @@ public ScanBuilderImpl(
this.engine = engine;
this.readSchema = snapshotSchema;
this.predicate = Optional.empty();
this.snapshotReport = snapshotReport;
}

@Override
Expand All @@ -76,6 +80,13 @@ public ScanBuilder withReadSchema(Engine engine, StructType readSchema) {
@Override
public Scan build() {
return new ScanImpl(
snapshotSchema, readSchema, protocol, metadata, logReplay, predicate, dataPath);
snapshotSchema,
readSchema,
protocol,
metadata,
logReplay,
predicate,
dataPath,
snapshotReport);
}
}
163 changes: 139 additions & 24 deletions kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.data.ScanStateRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.ScanMetrics;
import io.delta.kernel.internal.metrics.ScanReportImpl;
import io.delta.kernel.internal.metrics.Timer;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.skipping.DataSkippingPredicate;
import io.delta.kernel.internal.skipping.DataSkippingUtils;
import io.delta.kernel.internal.util.*;
import io.delta.kernel.metrics.ScanReport;
import io.delta.kernel.metrics.SnapshotReport;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
Expand All @@ -56,9 +61,12 @@ public class ScanImpl implements Scan {
private final Metadata metadata;
private final LogReplay logReplay;
private final Path dataPath;
private final Optional<Predicate> filter;
private final Optional<Tuple2<Predicate, Predicate>> partitionAndDataFilters;
private final Supplier<Map<String, StructField>> partitionColToStructFieldMap;
private boolean accessedScanFiles;
private final SnapshotReport snapshotReport;
private final ScanMetrics scanMetrics = new ScanMetrics();

public ScanImpl(
StructType snapshotSchema,
Expand All @@ -67,12 +75,14 @@ public ScanImpl(
Metadata metadata,
LogReplay logReplay,
Optional<Predicate> filter,
Path dataPath) {
Path dataPath,
SnapshotReport snapshotReport) {
this.snapshotSchema = snapshotSchema;
this.readSchema = readSchema;
this.protocol = protocol;
this.metadata = metadata;
this.logReplay = logReplay;
this.filter = filter;
this.partitionAndDataFilters = splitFilters(filter);
this.dataPath = dataPath;
this.partitionColToStructFieldMap =
Expand All @@ -82,6 +92,7 @@ public ScanImpl(
.filter(field -> partitionColNames.contains(field.getName().toLowerCase(Locale.ROOT)))
.collect(toMap(field -> field.getName().toLowerCase(Locale.ROOT), identity()));
};
this.snapshotReport = snapshotReport;
}

/**
Expand Down Expand Up @@ -118,30 +129,59 @@ public CloseableIterator<FilteredColumnarBatch> getScanFiles(
boolean hasDataSkippingFilter = dataSkippingFilter.isPresent();
boolean shouldReadStats = hasDataSkippingFilter || includeStats;

// Get active AddFiles via log replay
// If there is a partition predicate, construct a predicate to prune checkpoint files
// while constructing the table state.
CloseableIterator<FilteredColumnarBatch> scanFileIter =
logReplay.getAddFilesAsColumnarBatches(
engine,
shouldReadStats,
getPartitionsFilters()
.map(
predicate ->
rewritePartitionPredicateOnCheckpointFileSchema(
predicate, partitionColToStructFieldMap.get())));

// Apply partition pruning
scanFileIter = applyPartitionPruning(engine, scanFileIter);

// Apply data skipping
if (hasDataSkippingFilter) {
// there was a usable data skipping filter --> apply data skipping
scanFileIter = applyDataSkipping(engine, scanFileIter, dataSkippingFilter.get());
}
Timer.Timed planningDuration = scanMetrics.totalPlanningTimer.start();
// ScanReportReporter stores the current context and can be invoked (in the future) with
// `reportError` or `reportSuccess` to stop the planning duration timer and push a report to
// the engine
ScanReportReporter reportReporter =
allisonport-db marked this conversation as resolved.
Show resolved Hide resolved
exceptionOpt -> {
planningDuration.stop();
ScanReport scanReport =
new ScanReportImpl(
dataPath.toString() /* tablePath */,
logReplay.getVersion() /* table version */,
snapshotSchema,
snapshotReport.getReportUUID(),
filter,
readSchema,
getPartitionsFilters() /* partitionPredicate */,
dataSkippingFilter.map(p -> p),
scanMetrics,
exceptionOpt);
engine.getMetricsReporters().forEach(reporter -> reporter.report(scanReport));
};

try {
// Get active AddFiles via log replay
// If there is a partition predicate, construct a predicate to prune checkpoint files
// while constructing the table state.
CloseableIterator<FilteredColumnarBatch> scanFileIter =
logReplay.getAddFilesAsColumnarBatches(
engine,
shouldReadStats,
getPartitionsFilters()
.map(
predicate ->
rewritePartitionPredicateOnCheckpointFileSchema(
predicate, partitionColToStructFieldMap.get())),
scanMetrics);

// Apply partition pruning
scanFileIter = applyPartitionPruning(engine, scanFileIter);

// Apply data skipping
if (hasDataSkippingFilter) {
// there was a usable data skipping filter --> apply data skipping
scanFileIter = applyDataSkipping(engine, scanFileIter, dataSkippingFilter.get());
}

// TODO when !includeStats drop the stats column if present before returning
return wrapWithMetricsReporting(scanFileIter, reportReporter);

// TODO when !includeStats drop the stats column if present before returning
return scanFileIter;
} catch (Exception e) {
reportReporter.reportError(e);
throw e;
}
}

@Override
Expand Down Expand Up @@ -309,4 +349,79 @@ private CloseableIterator<FilteredColumnarBatch> applyDataSkipping(
filteredScanFileBatch.getData(), Optional.of(newSelectionVector));
});
}

/**
* Wraps a scan file iterator such that we emit {@link ScanReport} to the engine upon success and
* failure. Since most of our scan building code is lazily executed (since it occurs as
* maps/filters over an iterator) potential errors don't occur just within `getScanFile`s
* execution, but rather may occur as the returned iterator is consumed. Similarly, we cannot
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
* report a successful scan until the iterator has been fully consumed and the log read/filtered
* etc. This means we cannot report the successful scan within `getScanFiles` but rather must
* report after the iterator has been consumed.
*
* <p>This method takes a scan file iterator. It wraps the methods `next` and `hasNext` such that
allisonport-db marked this conversation as resolved.
Show resolved Hide resolved
* in the case of an exception originating from there, we will first report a failed ScanReport
* before propagating the exception. It also reports a success ScanReport when the iterator is
* closed, if and only if, all the elements have been consumed.
*/
private CloseableIterator<FilteredColumnarBatch> wrapWithMetricsReporting(
CloseableIterator<FilteredColumnarBatch> scanIter, ScanReportReporter reporter) {
return new CloseableIterator<FilteredColumnarBatch>() {

@Override
public void close() throws IOException {
try {
if (!scanIter.hasNext()) {
// The entire scan file iterator has been successfully consumed, report successful Scan
reporter.reportSuccess();
} else {
// TODO what should we do if the iterator is not fully consumed?
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
// - Do nothing (we don't know why the iterator is being closed early or what is going
// on in the connector)
// - Report a failure report with some sort of placeholder exception
// - Something else?
allisonport-db marked this conversation as resolved.
Show resolved Hide resolved
}
} finally {
scanIter.close();
}
}

@Override
public boolean hasNext() {
return wrapWithErrorReporting(() -> scanIter.hasNext());
}

@Override
public FilteredColumnarBatch next() {
return wrapWithErrorReporting(() -> scanIter.next());
}

private <T> T wrapWithErrorReporting(Supplier<T> s) {
try {
return s.get();
} catch (Exception e) {
reporter.reportError(e);
throw e;
}
}
};
}

/**
 * Callback for emitting a {@link ScanReport} to the engine. A dedicated single-abstract-method
 * interface (rather than an ad-hoc lambda type) avoids ambiguous lambdas/anonymous classes and
 * lets the success/failure convenience defaults be shared.
 */
private interface ScanReportReporter {

  /** Given an optional exception, reports a {@link ScanReport} to the engine */
  void report(Optional<Exception> exceptionOpt);

  /** Convenience overload: report a failed scan caused by {@code error}. */
  default void reportError(Exception error) {
    report(Optional.of(error));
  }

  /** Convenience overload: report a successfully completed scan. */
  default void reportSuccess() {
    report(Optional.empty());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ public StructType getSchema(Engine engine) {

@Override
public ScanBuilder getScanBuilder(Engine engine) {
// TODO when we add ScanReport we will pass the SnapshotReport downstream here
return new ScanBuilderImpl(dataPath, protocol, metadata, getSchema(engine), logReplay, engine);
return new ScanBuilderImpl(
dataPath, protocol, metadata, getSchema(engine), logReplay, engine, snapshotReport);
}

///////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.metrics.MetricsReport;
import io.delta.kernel.metrics.ScanReport;
import io.delta.kernel.metrics.SnapshotReport;
import io.delta.kernel.metrics.TransactionReport;
import io.delta.kernel.types.StructType;

/** Defines JSON serializers for {@link MetricsReport} types */
public final class MetricsReportSerializers {
Expand All @@ -41,6 +44,15 @@ public static String serializeSnapshotReport(SnapshotReport snapshotReport)
return OBJECT_MAPPER.writeValueAsString(snapshotReport);
}

/**
 * Serializes a {@link ScanReport} to a JSON string using the shared mapper. {@code Optional},
 * {@code Exception}, {@code StructType} and {@code Predicate} fields are handled by the
 * modules registered on {@code OBJECT_MAPPER} (Jdk8Module plus toString-based serializers).
 *
 * @param scanReport the scan report to serialize
 * @return the JSON representation of {@code scanReport}
 * @throws JsonProcessingException if Jackson fails to serialize the report
 */
public static String serializeScanReport(ScanReport scanReport) throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsString(scanReport);
}

/**
* Serializes a {@link TransactionReport} to a JSON string
*
Expand All @@ -59,7 +71,11 @@ public static String serializeTransactionReport(TransactionReport transactionRep
new ObjectMapper()
.registerModule(new Jdk8Module()) // To support Optional
.registerModule( // Serialize Exception using toString()
new SimpleModule().addSerializer(Exception.class, new ToStringSerializer()));
new SimpleModule().addSerializer(Exception.class, new ToStringSerializer()))
.registerModule( // Serialize StructType using toString
new SimpleModule().addSerializer(StructType.class, new ToStringSerializer()))
.registerModule( // Serialize Predicate using toString
new SimpleModule().addSerializer(Predicate.class, new ToStringSerializer()));

private MetricsReportSerializers() {}
}
Loading
Loading