Skip to content

Commit

Permalink
Static Select: Re-Use Already Flattened Results
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Oct 30, 2023
1 parent 6f8108a commit 08abba8
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
*/
final public class PreserveColumnLayer extends DependencyLayerBase {
private final BitSet dependencyBitSet;
private final boolean flattenedResult;

PreserveColumnLayer(SelectAndViewAnalyzer inner, String name, SelectColumn sc, ColumnSource<?> cs, String[] deps,
ModifiedColumnSet mcsBuilder) {
ModifiedColumnSet mcsBuilder, boolean alreadyFlattenedSources) {
super(inner, name, sc, cs, deps, mcsBuilder);
this.dependencyBitSet = new BitSet();
this.flattenedResult = alreadyFlattenedSources;
Arrays.stream(deps).mapToInt(inner::getLayerIndexFor).forEach(dependencyBitSet::set);
}

Expand Down Expand Up @@ -75,6 +77,16 @@ public LogOutput append(LogOutput logOutput) {
.append("}");
}

@Override
public boolean flattenedResult() {
return flattenedResult;
}

@Override
public boolean alreadyFlattenedSources() {
// we can only preserve a flat source if it was already made flat
return flattenedResult;
}

@Override
public boolean allowCrossColumnParallelization() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import io.deephaven.base.Pair;
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.base.verify.Assert;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.liveness.LivenessNode;
import io.deephaven.engine.rowset.RowSet;
Expand All @@ -26,6 +27,7 @@
import io.deephaven.engine.table.impl.util.WritableRowRedirection;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.io.log.impl.LogOutputStringImpl;
import io.deephaven.io.streams.ByteBufferStreams;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.SafeCloseablePair;
import io.deephaven.vector.Vector;
Expand Down Expand Up @@ -100,6 +102,7 @@ public static SelectAndViewAnalyzerWrapper create(
boolean shiftColumnHasPositiveOffset = false;

final HashSet<String> resultColumns = flattenedResult ? new HashSet<>() : null;
final HashMap<String, ColumnSource<?>> resultAlias = flattenedResult ? new HashMap<>() : null;
for (final SelectColumn sc : selectColumns) {
if (remainingCols != null) {
remainingCols.add(sc);
Expand Down Expand Up @@ -156,20 +159,47 @@ public static SelectAndViewAnalyzerWrapper create(
continue;
}

final SourceColumn realColumn;
if (sc instanceof SourceColumn) {
realColumn = (SourceColumn) sc;
} else if ((sc instanceof SwitchColumn) && ((SwitchColumn) sc).getRealColumn() instanceof SourceColumn) {
realColumn = (SourceColumn) ((SwitchColumn) sc).getRealColumn();
} else {
realColumn = null;
}

if (shouldPreserve(sc)) {
if (numberOfInternallyFlattenedColumns > 0) {
// this must be a source column to be preserved
Assert.neqNull(realColumn, "realColumn");

boolean sourceIsNew = resultColumns != null && resultColumns.contains(realColumn.getSourceName());
if (!sourceIsNew && numberOfInternallyFlattenedColumns > 0) {
// we must preserve this column, but have already created an analyzer for the internally flattened
// column, therefore must start over without permitting internal flattening
return create(sourceTable, mode, columnSources, originalRowSet, parentMcs, publishTheseSources,
false, selectColumns);
useShiftedColumns, false, selectColumns);
}

analyzer = analyzer.createLayerForPreserve(
sc.getName(), sc, sc.getDataView(), distinctDeps, mcsBuilder, flatResult && flattenedResult);

if (!sourceIsNew) {
// we can not flatten future columns because we are preserving this column
flattenedResult = false;
}
analyzer =
analyzer.createLayerForPreserve(sc.getName(), sc, sc.getDataView(), distinctDeps, mcsBuilder);
// we can not flatten future columns because we are preserving this column
flattenedResult = false;
continue;
}

if (flattenedResult && realColumn != null && !resultColumns.contains(realColumn.getSourceName())) {
// this result column a candidate for preservation
final ColumnSource<?> alias = resultAlias.get(realColumn.getSourceName());
if (alias != null) {
analyzer = analyzer.createLayerForPreserve(
sc.getName(), sc, alias, distinctDeps, mcsBuilder, flatResult);
continue;
}
}

final long targetDestinationCapacity =
rowSet.isEmpty() ? 0 : (flattenedResult ? rowSet.size() : rowSet.lastRowKey() + 1);
switch (mode) {
Expand All @@ -189,6 +219,12 @@ public static SelectAndViewAnalyzerWrapper create(
final WritableColumnSource<?> scs =
flatResult || flattenedResult ? sc.newFlatDestInstance(targetDestinationCapacity)
: sc.newDestInstance(targetDestinationCapacity);

if (flattenedResult && realColumn != null && !resultColumns.contains(realColumn.getSourceName())) {
// this source column a candidate for preservation if more than
resultAlias.put(realColumn.getSourceName(), scs);
}

analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs, null,
distinctDeps, mcsBuilder, false, flattenedResult, flatResult && flattenedResult);
if (flattenedResult) {
Expand Down Expand Up @@ -350,8 +386,8 @@ private SelectAndViewAnalyzer createLayerForView(String name, SelectColumn sc, C
}

private SelectAndViewAnalyzer createLayerForPreserve(String name, SelectColumn sc, ColumnSource<?> cs,
String[] parentColumnDependencies, ModifiedColumnSet mcsBuilder) {
return new PreserveColumnLayer(this, name, sc, cs, parentColumnDependencies, mcsBuilder);
String[] parentColumnDependencies, ModifiedColumnSet mcsBuilder, boolean alreadyFlattened) {
return new PreserveColumnLayer(this, name, sc, cs, parentColumnDependencies, mcsBuilder, alreadyFlattened);
}

abstract void populateModifiedColumnSetRecurse(ModifiedColumnSet mcsBuilder, Set<String> remainingDepsToSatisfy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1115,4 +1115,75 @@ public void testRegressionGH3562() {

assertTableEquals(expected, result);
}

@Test
public void testStaticSelectPreserveAlreadyFlattenedColumns() {
final Table source = emptyTable(10).updateView("I = ii").where("I % 2 == 0");
final Table result = source.select("Foo = I", "Bar = Foo", "Baz = I");

Assert.assertTrue(result.isFlat());

final ColumnSource<?> foo = result.getColumnSource("Foo");
final ColumnSource<?> bar = result.getColumnSource("Bar");
final ColumnSource<?> baz = result.getColumnSource("Baz");
result.getRowSet().forAllRowKeys(rowKey -> {
Assert.assertEquals(rowKey * 2, foo.getLong(rowKey));
Assert.assertEquals(rowKey * 2, bar.getLong(rowKey));
Assert.assertEquals(rowKey * 2, baz.getLong(rowKey));
});

Assert.assertEquals(foo, bar);
Assert.assertEquals(foo, baz);
}

@Test
public void testStaticSelectPreserveColumn() {
final Table source = emptyTable(10).select("I = ii").where("I % 2 == 0");
final Table result = source.select("Foo = I", "Bar = Foo", "Baz = I");

Assert.assertFalse(result.isFlat());

final ColumnSource<?> orig = source.getColumnSource("I");
final ColumnSource<?> foo = result.getColumnSource("Foo");
final ColumnSource<?> bar = result.getColumnSource("Bar");
final ColumnSource<?> baz = result.getColumnSource("Baz");
result.getRowSet().forAllRowKeys(rowKey -> {
Assert.assertEquals(rowKey, foo.getLong(rowKey));
Assert.assertEquals(rowKey, bar.getLong(rowKey));
Assert.assertEquals(rowKey, baz.getLong(rowKey));
});

// These columns were preserved and no flattening occurred.
Assert.assertEquals(orig, foo);
Assert.assertEquals(orig, bar);
Assert.assertEquals(orig, baz);
}

@Test
public void testStaticSelectRevertInternalFlatten() {
// there is some special logic that prevents an internal flatten if it also needs to preserve an original column
final Table source = emptyTable(10)
.select("I = ii")
.updateView("J = ii")
.where("I % 2 == 0");

// here `Foo` should be flattened, but `Bar` must be preserved; `Baz` is just for fun
final Table result = source.select("Foo = J", "Bar = I", "Baz = Foo");

Assert.assertFalse(result.isFlat());

final ColumnSource<?> foo = result.getColumnSource("Foo");
final ColumnSource<?> bar = result.getColumnSource("Bar");
final ColumnSource<?> baz = result.getColumnSource("Baz");
result.getRowSet().forAllRowKeys(rowKey -> {
Assert.assertEquals(rowKey, foo.getLong(rowKey));
Assert.assertEquals(rowKey, bar.getLong(rowKey));
Assert.assertEquals(rowKey, baz.getLong(rowKey));
});

// Note that Foo is still being "selected" and therefore "brought into memory"
Assert.assertNotEquals(foo, source.getColumnSource("J"));
Assert.assertEquals(bar, source.getColumnSource("I"));
Assert.assertEquals(baz, foo);
}
}

0 comments on commit 08abba8

Please sign in to comment.