fix: Ensure that DataIndexes produced by a RegionedColumnSourceManager are retained by the DataIndexer #6528

Merged (6 commits) on Jan 8, 2025
Changes from all commits
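
The DataIndexer caches indexes behind weak references. Before this change, that meant an index produced by a RegionedColumnSourceManager could be collected as soon as nothing else reached it strongly, even though it is expensive to rebuild. A minimal JDK-only sketch of that failure mode (the names are illustrative, not Deephaven APIs):

import java.lang.ref.WeakReference;

// Sketch: a cache that holds only a weak reference can lose its entry once
// no strong reference to the referent remains anywhere else.
final class WeakCacheSketch {
    public static void main(final String[] args) {
        final WeakReference<Object> cacheEntry = new WeakReference<>(new Object());
        System.gc(); // a hint only; after collection, get() may return null
        System.out.println("still cached? " + (cacheEntry.get() != null));
    }
}

The changes below let qualifying indexes opt into strong retention in that cache.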
RemappedDataIndex.java
@@ -7,6 +7,7 @@
 import io.deephaven.engine.table.ColumnSource;
 import io.deephaven.engine.table.DataIndex;
 import io.deephaven.engine.table.Table;
+import io.deephaven.engine.table.impl.indexer.DataIndexer;
 import org.jetbrains.annotations.NotNull;
 
 import java.util.*;
@@ -17,7 +18,7 @@
  * A {@link AbstractDataIndex} that remaps the key columns of another {@link AbstractDataIndex}. Used to implement
  * {@link io.deephaven.engine.table.DataIndex#remapKeyColumns(Map)}.
  */
-public class RemappedDataIndex extends AbstractDataIndex {
+public class RemappedDataIndex extends AbstractDataIndex implements DataIndexer.RetainableDataIndex {
 
     private final AbstractDataIndex sourceIndex;
     private final Map<ColumnSource<?>, ColumnSource<?>> oldToNewColumnMap;
@@ -109,4 +110,9 @@ public boolean isRefreshing() {
     public boolean isValid() {
         return sourceIndex.isValid();
     }
+
+    @Override
+    public boolean shouldRetain() {
+        return DataIndexer.RetainableDataIndex.shouldRetain(sourceIndex);
+    }
 }
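
A remapped index answers shouldRetain() by forwarding to its source, so retention survives key-column remapping. A hedged usage sketch under assumed setup ("Sym" and the column-source parameters are hypothetical; getOrCreateDataIndex and remapKeyColumns are the APIs this diff references):

import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.DataIndex;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.indexer.DataIndexer;

import java.util.Map;

final class RemapRetentionSketch {
    static DataIndex remapExample(final Table table,
            final ColumnSource<?> oldKey, final ColumnSource<?> newKey) {
        // Obtain (or build) an index keyed on a hypothetical "Sym" column.
        final DataIndex source = DataIndexer.getOrCreateDataIndex(table, "Sym");
        // The remapped view forwards shouldRetain() to the source, via
        // DataIndexer.RetainableDataIndex.shouldRetain(source).
        return source.remapKeyColumns(Map.of(oldKey, newKey));
    }
}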
DataIndexer.java
@@ -4,6 +4,9 @@
 package io.deephaven.engine.table.impl.indexer;
 
 import com.google.common.collect.Sets;
+import io.deephaven.base.reference.HardSimpleReference;
+import io.deephaven.base.reference.SimpleReference;
+import io.deephaven.base.reference.WeakSimpleReference;
 import io.deephaven.base.verify.Require;
 import io.deephaven.engine.liveness.LivenessScopeStack;
 import io.deephaven.engine.rowset.RowSet;
@@ -20,8 +23,6 @@
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-import java.lang.ref.Reference;
-import java.lang.ref.WeakReference;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Predicate;
@@ -390,6 +391,27 @@ private static DataIndex validateAndManageCachedDataIndex(@Nullable final DataIn
         return dataIndex;
     }
 
+    /**
+     * Interface for {@link DataIndex} implementations that may opt into strong reachability within the DataIndexer's
+     * cache.
+     */
+    public interface RetainableDataIndex extends DataIndex {
+
+        /**
+         * @return Whether {@code this} should be strongly held (if {@link #addDataIndex(DataIndex) added}) to maintain
+         *         reachability
+         */
+        boolean shouldRetain();
+
+        /**
+         * @return Whether {@code dataIndex} should be strongly held (if {@link #addDataIndex(DataIndex) added}) to
+         *         maintain reachability
+         */
+        static boolean shouldRetain(@NotNull final DataIndex dataIndex) {
+            return dataIndex instanceof RetainableDataIndex && ((RetainableDataIndex) dataIndex).shouldRetain();
+        }
+    }
+
     /**
      * Node structure for our multi-level cache of indexes.
      */
@@ -399,14 +421,14 @@ private static class DataIndexCache {
         @SuppressWarnings("rawtypes")
         private static final AtomicReferenceFieldUpdater<DataIndexCache, Map> DESCENDANT_CACHES_UPDATER =
                 AtomicReferenceFieldUpdater.newUpdater(DataIndexCache.class, Map.class, "descendantCaches");
-        private static final Reference<DataIndex> MISSING_INDEX_REFERENCE = new WeakReference<>(null);
+        private static final SimpleReference<DataIndex> MISSING_INDEX_REFERENCE = new WeakSimpleReference<>(null);
 
         /** The sub-indexes below this level. */
         @SuppressWarnings("FieldMayBeFinal")
         private volatile Map<ColumnSource<?>, DataIndexCache> descendantCaches = EMPTY_DESCENDANT_CACHES;
 
         /** A reference to the index at this level. Note that there will never be an index at the "root" level. */
-        private volatile Reference<DataIndex> dataIndexReference = MISSING_INDEX_REFERENCE;
+        private volatile SimpleReference<DataIndex> dataIndexReference = MISSING_INDEX_REFERENCE;
 
         private DataIndexCache() {}
 
@@ -509,7 +531,9 @@ private boolean add(@NotNull final List<ColumnSource<?>> keyColumns, @NotNull fi
             // noinspection SynchronizationOnLocalVariableOrMethodParameter
             synchronized (cache) {
                 if (!isValidAndLive(cache.dataIndexReference.get())) {
-                    cache.dataIndexReference = new WeakReference<>(dataIndex);
+                    cache.dataIndexReference = RetainableDataIndex.shouldRetain(dataIndex)
+                            ? new HardSimpleReference<>(dataIndex)
+                            : new WeakSimpleReference<>(dataIndex);
                     return true;
                 }
             }
@@ -544,7 +568,7 @@ private DataIndex computeIfAbsent(
                             // managed by the appropriate scope for the caller's own use. Further validation is deferred
                             // as in add.
                             dataIndex = dataIndexFactory.get();
-                            cache.dataIndexReference = new WeakReference<>(dataIndex);
+                            cache.dataIndexReference = new WeakSimpleReference<>(dataIndex);
                         }
                     }
                 }
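
The hunks above also swap the java.lang.ref types for Deephaven's SimpleReference hierarchy, which has a hard variant that can be substituted per index. A condensed restatement of the new caching rule as a sketch (the class and method are illustrative, not part of this PR):

import io.deephaven.base.reference.HardSimpleReference;
import io.deephaven.base.reference.SimpleReference;
import io.deephaven.base.reference.WeakSimpleReference;

final class ReferenceChoiceSketch {
    // Indexes that opt in are pinned with a hard reference; all others stay
    // weakly reachable and may be collected, exactly as before this change.
    static <T> SimpleReference<T> referenceFor(final T referent, final boolean retain) {
        return retain
                ? new HardSimpleReference<>(referent)
                : new WeakSimpleReference<>(referent);
    }
}

Note that computeIfAbsent still installs a weak reference: retention is applied only on the add path, where shouldRetain() is consulted.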
MergedDataIndex.java
@@ -19,6 +19,7 @@
 import io.deephaven.engine.table.impl.by.AggregationProcessor;
 import io.deephaven.engine.table.impl.by.AggregationRowLookup;
 import io.deephaven.engine.table.impl.dataindex.AbstractDataIndex;
+import io.deephaven.engine.table.impl.indexer.DataIndexer;
 import io.deephaven.engine.table.impl.locations.TableLocation;
 import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
 import io.deephaven.engine.table.impl.select.FunctionalColumn;
@@ -43,7 +44,7 @@
  * source table".
  */
 @InternalUseOnly
-class MergedDataIndex extends AbstractDataIndex {
+class MergedDataIndex extends AbstractDataIndex implements DataIndexer.RetainableDataIndex {
 
     private static final String LOCATION_DATA_INDEX_TABLE_COLUMN_NAME = "__DataIndexTable";
 
@@ -239,4 +240,9 @@ public boolean isValid() {
         }
         return isValid = true;
     }
+
+    @Override
+    public boolean shouldRetain() {
+        return true;
+    }
 }
PartitioningColumnDataIndex.java
@@ -17,6 +17,7 @@
 import io.deephaven.engine.table.impl.QueryTable;
 import io.deephaven.engine.table.impl.TableUpdateImpl;
 import io.deephaven.engine.table.impl.dataindex.AbstractDataIndex;
+import io.deephaven.engine.table.impl.indexer.DataIndexer;
 import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
 import io.deephaven.engine.table.impl.sources.ObjectArraySource;
 import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
@@ -30,7 +31,7 @@
 /**
  * DataIndex over a partitioning column of a {@link Table} backed by a {@link RegionedColumnSourceManager}.
  */
-class PartitioningColumnDataIndex<KEY_TYPE> extends AbstractDataIndex {
+class PartitioningColumnDataIndex<KEY_TYPE> extends AbstractDataIndex implements DataIndexer.RetainableDataIndex {
 
     private static final int KEY_NOT_FOUND = (int) RowSequence.NULL_ROW_KEY;
 
@@ -318,4 +319,9 @@ public boolean isRefreshing() {
     public boolean isValid() {
         return true;
     }
+
+    @Override
+    public boolean shouldRetain() {
+        return true;
+    }
 }
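
All three implementations in this PR opt in the same way: the two RegionedColumnSourceManager-produced indexes retain unconditionally, and RemappedDataIndex defers to its source. A minimal sketch of the pattern for any other index that should be pinned (the class name is hypothetical, and it is left abstract so the rest of the DataIndex contract stays out of scope; the real implementations extend AbstractDataIndex):

import io.deephaven.engine.table.impl.indexer.DataIndexer;

// Hypothetical opt-in mirroring MergedDataIndex and PartitioningColumnDataIndex.
abstract class RetainedIndexSketch implements DataIndexer.RetainableDataIndex {
    @Override
    public boolean shouldRetain() {
        return true; // ask the DataIndexer to hold this index strongly once added
    }
}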
@@ -964,6 +964,43 @@ public void testStringContainsFilter() {
         }
     }
 
+    public void testIndexRetentionThroughGC() {
+        final Table childTable;
+
+        // We don't need this liveness scope for liveness management, but rather to opt out of the enclosing scope's
+        // enforceStrongReachability
+        try (final SafeCloseable ignored = LivenessScopeStack.open()) {
+            final Map<String, Object> retained = new HashMap<>();
+            final Random random = new Random(0);
+            final int size = 500;
+            final QueryTable parentTable = getTable(false, size, random,
+                    initColumnInfos(new String[] {"S1", "S2"},
+                            new SetGenerator<>("aa", "bb", "cc", "dd", "AA", "BB", "CC", "DD"),
+                            new SetGenerator<>("aaa", "bbb", "ccc", "ddd", "AAA", "BBB", "CCC", "DDD")));
+
+            // Explicitly retain the index references.
+            retained.put("di1", DataIndexer.getOrCreateDataIndex(parentTable, "S1"));
+            retained.put("di2", DataIndexer.getOrCreateDataIndex(parentTable, "S2"));
+            childTable = parentTable.update("isEven = ii % 2 == 0");
+
+            // While retained, the indexes will survive GC
+            System.gc();
+
+            // While the references are held, the parent and child tables should have the indexes.
+            Assert.assertTrue(DataIndexer.hasDataIndex(parentTable, "S1"));
+            Assert.assertTrue(DataIndexer.hasDataIndex(parentTable, "S2"));
+            Assert.assertTrue(DataIndexer.hasDataIndex(childTable, "S1"));
+            Assert.assertTrue(DataIndexer.hasDataIndex(childTable, "S2"));
+
+            // Explicitly release the references.
+            retained.clear();
+        }
+        // After a GC, the child table should not have the indexes.
+        System.gc();
+        Assert.assertFalse(DataIndexer.hasDataIndex(childTable, "S1"));
+        Assert.assertFalse(DataIndexer.hasDataIndex(childTable, "S2"));
+    }
+
     public void testStringMatchFilterIndexed() {
         // MatchFilters (currently) only use indexes on initial creation but this incremental test will recreate
         // index-enabled match filtered tables and compare them against incremental non-indexed filtered tables.
@@ -10,6 +10,7 @@
 import io.deephaven.base.FileUtils;
 import io.deephaven.base.verify.Assert;
 import io.deephaven.engine.context.ExecutionContext;
+import io.deephaven.engine.liveness.LivenessScopeStack;
 import io.deephaven.engine.primitive.function.ByteConsumer;
 import io.deephaven.engine.primitive.function.CharConsumer;
 import io.deephaven.engine.primitive.function.FloatConsumer;
@@ -58,6 +59,7 @@
 import io.deephaven.test.types.OutOfBandTest;
 import io.deephaven.time.DateTimeUtils;
 import io.deephaven.util.QueryConstants;
+import io.deephaven.util.SafeCloseable;
 import io.deephaven.util.codec.SimpleByteArrayCodec;
 import io.deephaven.util.compare.DoubleComparisons;
 import io.deephaven.util.compare.FloatComparisons;
@@ -88,6 +90,7 @@
 import java.math.BigInteger;
 import java.net.URI;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
 import java.time.Instant;
 import java.time.LocalDate;
@@ -337,6 +340,102 @@ public void vectorParquetFormat() {
         groupedTable("largeAggParquet", LARGE_TABLE_SIZE, false);
     }
 
+    @Test
+    public void indexRetentionThroughGC() {
+        final String destPath = Path.of(rootFile.getPath(), "ParquetTest_indexRetention_test").toString();
+        final int tableSize = 10_000;
+        final Table testTable = TableTools.emptyTable(tableSize).update(
+                "symbol = randomInt(0,4)",
+                "price = randomInt(0,10000) * 0.01",
+                "str_id = `str_` + String.format(`%08d`, randomInt(0,1_000_000))",
+                "indexed_val = ii % 10_000");
+        final ParquetInstructions writeInstructions = ParquetInstructions.builder()
+                .setGenerateMetadataFiles(true)
+                .addIndexColumns("indexed_val")
+                .build();
+        final PartitionedTable partitionedTable = testTable.partitionBy("symbol");
+        ParquetTools.writeKeyValuePartitionedTable(partitionedTable, destPath, writeInstructions);
+        final Table child;
+
+        // We don't need this liveness scope for liveness management, but rather to opt out of the enclosing scope's
+        // enforceStrongReachability
+        try (final SafeCloseable ignored = LivenessScopeStack.open()) {
+            // Read from disk and validate the indexes through GC.
+            Table parent = ParquetTools.readTable(destPath);
+            child = parent.update("new_val = indexed_val + 1")
+                    .update("new_val = new_val + 1")
+                    .update("new_val = new_val + 1")
+                    .update("new_val = new_val + 1");
+
+            // These indexes will survive GC because the parent table is holding strong references.
+            System.gc();
+
+            // The parent table should have the indexes.
+            Assert.eqTrue(DataIndexer.hasDataIndex(parent, "symbol"), "hasDataIndex -> symbol");
+            Assert.eqTrue(DataIndexer.hasDataIndex(parent, "indexed_val"), "hasDataIndex -> indexed_val");
+
+            // The child table should have the indexes while the parent is retained.
+            Assert.eqTrue(DataIndexer.hasDataIndex(child, "symbol"), "hasDataIndex -> symbol");
+            Assert.eqTrue(DataIndexer.hasDataIndex(child, "indexed_val"), "hasDataIndex -> indexed_val");
+
+            // Force the parent to null to allow GC to collect it.
+            parent = null;
+        }
+
+        // After a GC, the child table should still have access to the indexes.
+        System.gc();
+        Assert.eqTrue(DataIndexer.hasDataIndex(child, "symbol"), "hasDataIndex -> symbol");
+        Assert.eqTrue(DataIndexer.hasDataIndex(child, "indexed_val"), "hasDataIndex -> indexed_val");
+    }
+
+    @Test
+    public void remappedIndexRetentionThroughGC() {
+        final String destPath =
+                Path.of(rootFile.getPath(), "ParquetTest_remappedIndexRetention_test.parquet").toString();
+        final int tableSize = 10_000;
+        final Table testTable = TableTools.emptyTable(tableSize).update(
+                "symbol = randomInt(0,4)",
+                "price = randomInt(0,10000) * 0.01",
+                "str_id = `str_` + String.format(`%08d`, randomInt(0,1_000_000))",
+                "indexed_val = ii % 10_000");
+        final ParquetInstructions writeInstructions = ParquetInstructions.builder()
+                .setGenerateMetadataFiles(true)
+                .addIndexColumns("symbol")
+                .addIndexColumns("indexed_val")
+                .build();
+        ParquetTools.writeTable(testTable, destPath, writeInstructions);
+        final Table child;
+
+        // We don't need this liveness scope for liveness management, but rather to opt out of the enclosing scope's
+        // enforceStrongReachability
+        try (final SafeCloseable ignored = LivenessScopeStack.open()) {
+            // Read from disk and validate the indexes through GC.
+            Table parent = ParquetTools.readTable(destPath);
+
+            // select() produces in-memory column sources, triggering the remapping of the indexes.
+            child = parent.select();
+
+            // These indexes will survive GC because the parent table is holding strong references.
+            System.gc();
+
+            // The parent table should have the indexes.
+            Assert.eqTrue(DataIndexer.hasDataIndex(parent, "symbol"), "hasDataIndex -> symbol");
+            Assert.eqTrue(DataIndexer.hasDataIndex(parent, "indexed_val"), "hasDataIndex -> indexed_val");
+
+            // The child table should have the indexes while the parent is retained.
+            Assert.eqTrue(DataIndexer.hasDataIndex(child, "symbol"), "hasDataIndex -> symbol");
+            Assert.eqTrue(DataIndexer.hasDataIndex(child, "indexed_val"), "hasDataIndex -> indexed_val");
+
+            // Force the parent to null to allow GC to collect it.
+            parent = null;
+        }
+
+        // After a GC, the child table should still have access to the indexes.
+        System.gc();
+        Assert.eqTrue(DataIndexer.hasDataIndex(child, "symbol"), "hasDataIndex -> symbol");
+        Assert.eqTrue(DataIndexer.hasDataIndex(child, "indexed_val"), "hasDataIndex -> indexed_val");
+    }
+
     @Test
     public void indexByLongKey() {
         final TableDefinition definition = TableDefinition.of(