Skip to content

Commit

Permalink
Respect snapshotTimeMillis when doing optimized count.
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Powell committed Oct 17, 2024
1 parent 14fa9b8 commit d7497b8
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -598,25 +598,35 @@ public static String fullTableName(TableId tableId) {
}
}

public long calculateTableSize(TableId tableId, Optional<String> filter) {
return calculateTableSize(getTable(tableId), filter);
public long calculateTableSize(
TableId tableId, Optional<String> filter, OptionalLong snapshotTimeMillis) {
return calculateTableSize(getTable(tableId), filter, snapshotTimeMillis);
}

public long calculateTableSize(TableInfo tableInfo, Optional<String> filter) {
public long calculateTableSize(
TableInfo tableInfo, Optional<String> filter, OptionalLong snapshotTimeMillis) {
TableDefinition.Type type = tableInfo.getDefinition().getType();
if (type == TableDefinition.Type.TABLE && !filter.isPresent()) {
if (type == TableDefinition.Type.TABLE && filter.isEmpty() && snapshotTimeMillis.isEmpty()) {
return tableInfo.getNumRows().longValue();
} else if (type == TableDefinition.Type.EXTERNAL && !filter.isPresent()) {
} else if (type == TableDefinition.Type.EXTERNAL
&& filter.isEmpty()
&& snapshotTimeMillis.isEmpty()) {
String table = fullTableName(tableInfo.getTableId());
return getNumberOfRows(String.format("SELECT COUNT(*) from `%s`", table));
} else if (type == TableDefinition.Type.VIEW
|| type == TableDefinition.Type.MATERIALIZED_VIEW
|| ((type == TableDefinition.Type.TABLE || type == TableDefinition.Type.EXTERNAL)
&& filter.isPresent())) {
&& (filter.isPresent() || snapshotTimeMillis.isPresent()))) {
// run a query
String table = fullTableName(tableInfo.getTableId());
String timeTravelClause =
snapshotTimeMillis.isPresent()
? String.format(
"FOR SYSTEM TIME AS OF TIMESTAMP_MILLIS(%d)", snapshotTimeMillis.getAsLong())
: "";
String whereClause = filter.map(f -> "WHERE " + f).orElse("");
return getNumberOfRows(String.format("SELECT COUNT(*) from `%s` %s", table, whereClause));
return getNumberOfRows(
String.format("SELECT COUNT(*) from `%s` %s %s", table, timeTravelClause, whereClause));
} else {
throw new IllegalArgumentException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.slf4j.Logger;
Expand Down Expand Up @@ -98,14 +99,16 @@ public ReadSessionResponse create(
log.info(
"|creation a read session for table {}, parameters: "
+ "|selectedFields=[{}],"
+ "|filter=[{}]"
+ "|snapshotTimeMillis[{}]",
+ "|filter=[{}],"
+ "|snapshotTimeMillis[{}],"
+ "|view=[{}]",
actualTable.getFriendlyName(),
String.join(",", selectedFields),
filter.orElse("None"),
config.getSnapshotTimeMillis().isPresent()
? String.valueOf(config.getSnapshotTimeMillis().getAsLong())
: "None");
: "None",
isInputTableAView(tableDetails));

String tablePath = toTablePath(actualTable.getTableId());
CreateReadSessionRequest request =
Expand Down Expand Up @@ -264,7 +267,7 @@ TableInfo getActualTable(
// get it from the view
String querySql =
bigQueryClient.createSql(
table.getTableId(), requiredColumns, filters, config.getSnapshotTimeMillis());
table.getTableId(), requiredColumns, filters, OptionalLong.empty());
log.debug("querySql is {}", querySql);
return bigQueryClient.materializeViewToTable(
querySql, table.getTableId(), config.getMaterializationExpirationTimeInMinutes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
Expand Down Expand Up @@ -130,7 +131,11 @@ public RDD<Row> buildScan(String[] requiredColumns, Filter[] filters) {
BigQueryUtil.emptyIfNeeded(compiledFilter));
return (RDD<Row>)
generateEmptyRowRDD(
actualTable, readSessionCreator.isInputTableAView(table) ? "" : compiledFilter);
actualTable,
readSessionCreator.isInputTableAView(table) ? "" : compiledFilter,
readSessionCreator.isInputTableAView(table)
? OptionalLong.empty()
: options.getSnapshotTimeMillis());
} else if (requiredColumns.length == 0) {
log.debug("Not using optimized empty projection");
}
Expand Down Expand Up @@ -182,11 +187,13 @@ String getCompiledFilter(Filter[] filters) {
}
}

private RDD<?> generateEmptyRowRDD(TableInfo tableInfo, String filter) {
private RDD<?> generateEmptyRowRDD(
TableInfo tableInfo, String filter, OptionalLong snapshotTimeMillis) {
emptyRowRDDsCreated += 1;
Optional<String> optionalFilter =
(filter.length() == 0) ? Optional.empty() : Optional.of(filter);
long numberOfRows = bigQueryClient.calculateTableSize(tableInfo, optionalFilter);
long numberOfRows =
bigQueryClient.calculateTableSize(tableInfo, optionalFilter, snapshotTimeMillis);

Function1<Object, InternalRow> objectToInternalRowConverter =
new ObjectToInternalRowConverter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ private ReadSessionResponse createReadSession() {

Stream<InputPartitionContext<InternalRow>> createEmptyProjectionPartitions() {
Optional<String> filter = getCombinedFilter();
long rowCount = bigQueryClient.calculateTableSize(tableId, filter);
long rowCount = bigQueryClient.calculateTableSize(tableId, filter, readSessionCreatorConfig.getSnapshotTimeMillis());
logger.info("Used optimized BQ count(*) path. Count: " + rowCount);
int partitionsCount = readSessionCreatorConfig.getDefaultParallelism();
int partitionSize = (int) (rowCount / partitionsCount);
Expand Down

0 comments on commit d7497b8

Please sign in to comment.