From 08abba8bcc30f1ee00c29a5be804198db8caf220 Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Mon, 30 Oct 2023 17:51:18 -0600 Subject: [PATCH] Static Select: Re-Use Already Flattened Results --- .../select/analyzers/PreserveColumnLayer.java | 14 +++- .../analyzers/SelectAndViewAnalyzer.java | 52 +++++++++++--- .../impl/QueryTableSelectUpdateTest.java | 71 +++++++++++++++++++ 3 files changed, 128 insertions(+), 9 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java index 8fe1841b205..a74e1efdfae 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java @@ -24,11 +24,13 @@ */ final public class PreserveColumnLayer extends DependencyLayerBase { private final BitSet dependencyBitSet; + private final boolean flattenedResult; PreserveColumnLayer(SelectAndViewAnalyzer inner, String name, SelectColumn sc, ColumnSource cs, String[] deps, - ModifiedColumnSet mcsBuilder) { + ModifiedColumnSet mcsBuilder, boolean alreadyFlattenedSources) { super(inner, name, sc, cs, deps, mcsBuilder); this.dependencyBitSet = new BitSet(); + this.flattenedResult = alreadyFlattenedSources; Arrays.stream(deps).mapToInt(inner::getLayerIndexFor).forEach(dependencyBitSet::set); } @@ -75,6 +77,16 @@ public LogOutput append(LogOutput logOutput) { .append("}"); } + @Override + public boolean flattenedResult() { + return flattenedResult; + } + + @Override + public boolean alreadyFlattenedSources() { + // we can only preserve a flat source if it was already made flat + return flattenedResult; + } @Override public boolean allowCrossColumnParallelization() { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java index eb34118632c..55b4beb173a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java @@ -5,6 +5,7 @@ import io.deephaven.base.Pair; import io.deephaven.base.log.LogOutputAppendable; +import io.deephaven.base.verify.Assert; import io.deephaven.datastructures.util.CollectionUtil; import io.deephaven.engine.liveness.LivenessNode; import io.deephaven.engine.rowset.RowSet; @@ -26,6 +27,7 @@ import io.deephaven.engine.table.impl.util.WritableRowRedirection; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.io.log.impl.LogOutputStringImpl; +import io.deephaven.io.streams.ByteBufferStreams; import io.deephaven.util.SafeCloseable; import io.deephaven.util.SafeCloseablePair; import io.deephaven.vector.Vector; @@ -100,6 +102,7 @@ public static SelectAndViewAnalyzerWrapper create( boolean shiftColumnHasPositiveOffset = false; final HashSet resultColumns = flattenedResult ? new HashSet<>() : null; + final HashMap> resultAlias = flattenedResult ? new HashMap<>() : null; for (final SelectColumn sc : selectColumns) { if (remainingCols != null) { remainingCols.add(sc); @@ -156,20 +159,47 @@ public static SelectAndViewAnalyzerWrapper create( continue; } + final SourceColumn realColumn; + if (sc instanceof SourceColumn) { + realColumn = (SourceColumn) sc; + } else if ((sc instanceof SwitchColumn) && ((SwitchColumn) sc).getRealColumn() instanceof SourceColumn) { + realColumn = (SourceColumn) ((SwitchColumn) sc).getRealColumn(); + } else { + realColumn = null; + } + if (shouldPreserve(sc)) { - if (numberOfInternallyFlattenedColumns > 0) { + // this must be a source column to be preserved + Assert.neqNull(realColumn, "realColumn"); + + boolean sourceIsNew = resultColumns != null && resultColumns.contains(realColumn.getSourceName()); + if (!sourceIsNew && numberOfInternallyFlattenedColumns > 0) { // we must preserve this column, but have already created an analyzer for the internally flattened // column, therefore must start over without permitting internal flattening return create(sourceTable, mode, columnSources, originalRowSet, parentMcs, publishTheseSources, - false, selectColumns); + useShiftedColumns, false, selectColumns); + } + + analyzer = analyzer.createLayerForPreserve( + sc.getName(), sc, sc.getDataView(), distinctDeps, mcsBuilder, flatResult && flattenedResult); + + if (!sourceIsNew) { + // we can not flatten future columns because we are preserving this column + flattenedResult = false; } - analyzer = - analyzer.createLayerForPreserve(sc.getName(), sc, sc.getDataView(), distinctDeps, mcsBuilder); - // we can not flatten future columns because we are preserving this column - flattenedResult = false; continue; } + if (flattenedResult && realColumn != null && !resultColumns.contains(realColumn.getSourceName())) { + // this result column a candidate for preservation + final ColumnSource alias = resultAlias.get(realColumn.getSourceName()); + if (alias != null) { + analyzer = analyzer.createLayerForPreserve( + sc.getName(), sc, alias, distinctDeps, mcsBuilder, flatResult); + continue; + } + } + final long targetDestinationCapacity = rowSet.isEmpty() ? 0 : (flattenedResult ? rowSet.size() : rowSet.lastRowKey() + 1); switch (mode) { @@ -189,6 +219,12 @@ public static SelectAndViewAnalyzerWrapper create( final WritableColumnSource scs = flatResult || flattenedResult ? sc.newFlatDestInstance(targetDestinationCapacity) : sc.newDestInstance(targetDestinationCapacity); + + if (flattenedResult && realColumn != null && !resultColumns.contains(realColumn.getSourceName())) { + // this source column a candidate for preservation if more than + resultAlias.put(realColumn.getSourceName(), scs); + } + analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs, null, distinctDeps, mcsBuilder, false, flattenedResult, flatResult && flattenedResult); if (flattenedResult) { @@ -350,8 +386,8 @@ private SelectAndViewAnalyzer createLayerForView(String name, SelectColumn sc, C } private SelectAndViewAnalyzer createLayerForPreserve(String name, SelectColumn sc, ColumnSource cs, - String[] parentColumnDependencies, ModifiedColumnSet mcsBuilder) { - return new PreserveColumnLayer(this, name, sc, cs, parentColumnDependencies, mcsBuilder); + String[] parentColumnDependencies, ModifiedColumnSet mcsBuilder, boolean alreadyFlattened) { + return new PreserveColumnLayer(this, name, sc, cs, parentColumnDependencies, mcsBuilder, alreadyFlattened); } abstract void populateModifiedColumnSetRecurse(ModifiedColumnSet mcsBuilder, Set remainingDepsToSatisfy); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableSelectUpdateTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableSelectUpdateTest.java index 7434df60580..d0271a1174d 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableSelectUpdateTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableSelectUpdateTest.java @@ -1115,4 +1115,75 @@ public void testRegressionGH3562() { assertTableEquals(expected, result); } + + @Test + public void testStaticSelectPreserveAlreadyFlattenedColumns() { + final Table source = emptyTable(10).updateView("I = ii").where("I % 2 == 0"); + final Table result = source.select("Foo = I", "Bar = Foo", "Baz = I"); + + Assert.assertTrue(result.isFlat()); + + final ColumnSource foo = result.getColumnSource("Foo"); + final ColumnSource bar = result.getColumnSource("Bar"); + final ColumnSource baz = result.getColumnSource("Baz"); + result.getRowSet().forAllRowKeys(rowKey -> { + Assert.assertEquals(rowKey * 2, foo.getLong(rowKey)); + Assert.assertEquals(rowKey * 2, bar.getLong(rowKey)); + Assert.assertEquals(rowKey * 2, baz.getLong(rowKey)); + }); + + Assert.assertEquals(foo, bar); + Assert.assertEquals(foo, baz); + } + + @Test + public void testStaticSelectPreserveColumn() { + final Table source = emptyTable(10).select("I = ii").where("I % 2 == 0"); + final Table result = source.select("Foo = I", "Bar = Foo", "Baz = I"); + + Assert.assertFalse(result.isFlat()); + + final ColumnSource orig = source.getColumnSource("I"); + final ColumnSource foo = result.getColumnSource("Foo"); + final ColumnSource bar = result.getColumnSource("Bar"); + final ColumnSource baz = result.getColumnSource("Baz"); + result.getRowSet().forAllRowKeys(rowKey -> { + Assert.assertEquals(rowKey, foo.getLong(rowKey)); + Assert.assertEquals(rowKey, bar.getLong(rowKey)); + Assert.assertEquals(rowKey, baz.getLong(rowKey)); + }); + + // These columns were preserved and no flattening occurred. + Assert.assertEquals(orig, foo); + Assert.assertEquals(orig, bar); + Assert.assertEquals(orig, baz); + } + + @Test + public void testStaticSelectRevertInternalFlatten() { + // there is some special logic that prevents an internal flatten if it also needs to preserve an original column + final Table source = emptyTable(10) + .select("I = ii") + .updateView("J = ii") + .where("I % 2 == 0"); + + // here `Foo` should be flattened, but `Bar` must be preserved; `Baz` is just for fun + final Table result = source.select("Foo = J", "Bar = I", "Baz = Foo"); + + Assert.assertFalse(result.isFlat()); + + final ColumnSource foo = result.getColumnSource("Foo"); + final ColumnSource bar = result.getColumnSource("Bar"); + final ColumnSource baz = result.getColumnSource("Baz"); + result.getRowSet().forAllRowKeys(rowKey -> { + Assert.assertEquals(rowKey, foo.getLong(rowKey)); + Assert.assertEquals(rowKey, bar.getLong(rowKey)); + Assert.assertEquals(rowKey, baz.getLong(rowKey)); + }); + + // Note that Foo is still being "selected" and therefore "brought into memory" + Assert.assertNotEquals(foo, source.getColumnSource("J")); + Assert.assertEquals(bar, source.getColumnSource("I")); + Assert.assertEquals(baz, foo); + } }