Skip to content

Commit

Permalink
fix: correct OuterJoinTools.leftOuterJoin/fullOuterJoin() output wh…
Browse files Browse the repository at this point in the history
…en RHS initally empty (#6548)
  • Loading branch information
lbooker42 authored Jan 10, 2025
1 parent 3b3661a commit 7a9526c
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.deephaven.engine.table.impl.sources.BitMaskingColumnSource;
import io.deephaven.engine.table.impl.sources.BitShiftingColumnSource;
import io.deephaven.engine.table.impl.sources.CrossJoinRightColumnSource;
import io.deephaven.engine.table.impl.sources.NullValueColumnSource;
import io.deephaven.util.SafeCloseableList;
import io.deephaven.util.mutable.MutableInt;
import io.deephaven.util.mutable.MutableLong;
Expand Down Expand Up @@ -1418,7 +1419,11 @@ private static <T extends ColumnSource<?>> QueryTable makeResult(
}

for (MatchPair mp : columnsToAdd) {
final T wrappedSource = newRightColumnSource.apply(rightTable.getColumnSource(mp.rightColumn()));
// If rhs is empty and static, can substitute with a NullValueColumnSource
final ColumnSource<?> rcs = rightTable.getColumnSource(mp.rightColumn());
final ColumnSource<?> wrappedSource = rightTable.isEmpty() && !rightTable.isRefreshing()
? NullValueColumnSource.getInstance(rcs.getType(), rcs.getComponentType())
: newRightColumnSource.apply(rcs);
columnSourceMap.put(mp.leftColumn(), wrappedSource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ public class BitMaskingColumnSource<T> extends AbstractColumnSource<T> implement
public static <T> ColumnSource<T> maybeWrap(
final ZeroKeyCrossJoinShiftState shiftState,
@NotNull final ColumnSource<T> innerSource) {
if (innerSource instanceof RowKeyAgnosticChunkSource) {
if (innerSource instanceof NullValueColumnSource) {
return innerSource;
}
// We must wrap all other sources to leverage shiftState.rightEmpty() and shiftState.rightEmptyPrev()
// before calling the inner source.
return new BitMaskingColumnSource<>(shiftState, innerSource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,22 @@
//
package io.deephaven.engine.util;

import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.vectors.ColumnVectors;
import io.deephaven.engine.testutil.ControlledUpdateGraph;
import io.deephaven.engine.testutil.TstUtils;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import org.junit.Rule;
import org.junit.Test;

import static io.deephaven.engine.testutil.TstUtils.testRefreshingTable;
import static io.deephaven.engine.testutil.TstUtils.testTable;
import static io.deephaven.engine.util.TableTools.col;
import static io.deephaven.engine.util.TableTools.intCol;
import java.util.Collections;

import static io.deephaven.engine.testutil.TstUtils.*;
import static io.deephaven.engine.util.TableTools.*;
import static io.deephaven.util.QueryConstants.NULL_INT;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -253,4 +259,218 @@ public void testFullOuterJoinIdentityMatchWithAddColumn() {
assertArrayEquals(new int[] {0, 2, 4, 6}, ColumnVectors.ofInt(result, "b").toArray());
assertArrayEquals(new int[] {0, 3, 6, 9}, ColumnVectors.ofInt(result, "c").toArray());
}

@Test
public void testStaticEmptyRightZeroKey() {
final Table lhs = TableTools.emptyTable(5).update("I=i");
final Table rhs = TableTools.emptyTable(0).update("J=`asdf`");

Table result = OuterJoinTools.leftOuterJoin(lhs, rhs, Collections.emptyList());
Table expected = lhs.update("J=(String)(null)");
assertTableEquals(expected, result);

result = OuterJoinTools.fullOuterJoin(lhs, rhs, Collections.emptyList());
expected = lhs.update("J=(String)(null)");
assertTableEquals(expected, result);
}

@Test
public void testDynamicRightLeftOuterJoinZeroKey() {
final Table lhs = testRefreshingTable(intCol("I", 0, 1));
final QueryTable rhsSource = testRefreshingTable(stringCol("J"));

// This update creates a SingleValueColumnSource for the RHS table, which causes problems if we query the
// column source for a value that doesn't exist in the table.
final Table rhs = rhsSource.update("J = `asdf`");

final Table result = OuterJoinTools.leftOuterJoin(lhs, rhs, Collections.emptyList());

assertEquals(2, result.numColumns());
assertEquals("I", result.getDefinition().getColumnsArray()[0].getName());
assertEquals("J", result.getDefinition().getColumnsArray()[1].getName());

assertEquals(2, result.size());
assertArrayEquals(new int[] {0, 1}, ColumnVectors.ofInt(result, "I").toArray());
assertArrayEquals(new String[] {null, null}, ColumnVectors.ofObject(result, "J", String.class).toArray());

ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();

// Add some matching and non-matching rows to RHS
updateGraph.runWithinUnitTestCycle(() -> {
final RowSet newKeys = i(10);
TstUtils.addToTable(rhsSource, newKeys, stringCol("J", "anything"));
rhsSource.notifyListeners(newKeys, RowSetFactory.empty(), RowSetFactory.empty());
});

assertEquals(2, result.size());
assertArrayEquals(new int[] {0, 1}, ColumnVectors.ofInt(result, "I").toArray());
assertArrayEquals(new String[] {"asdf", "asdf"}, ColumnVectors.ofObject(result, "J", String.class).toArray());

// Remove all rows from RHS
updateGraph.runWithinUnitTestCycle(() -> {
final RowSet removedKeys = i(10);
TstUtils.removeRows(rhsSource, removedKeys);
rhsSource.notifyListeners(RowSetFactory.empty(), removedKeys, RowSetFactory.empty());
});

assertEquals(2, result.size());
assertArrayEquals(new int[] {0, 1}, ColumnVectors.ofInt(result, "I").toArray());
assertArrayEquals(new String[] {null, null}, ColumnVectors.ofObject(result, "J", String.class).toArray());
}

@Test
public void testDynamicRightFullOuterJoinZeroKey() {
final Table lhs = testRefreshingTable(intCol("I", 0, 1));
final QueryTable rhsSource = testRefreshingTable(stringCol("J"));

// This update creates a SingleValueColumnSource for the RHS table, which causes problems if we query the
// column source for a value that doesn't exist in the table.
final Table rhs = rhsSource.update("J = `asdf`");

final Table result = OuterJoinTools.fullOuterJoin(lhs, rhs, Collections.emptyList());

assertEquals(2, result.numColumns());
assertEquals("I", result.getDefinition().getColumnsArray()[0].getName());
assertEquals("J", result.getDefinition().getColumnsArray()[1].getName());

assertEquals(2, result.size());
assertArrayEquals(new int[] {0, 1}, ColumnVectors.ofInt(result, "I").toArray());
assertArrayEquals(new String[] {null, null}, ColumnVectors.ofObject(result, "J", String.class).toArray());

ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();

// Add some matching and non-matching rows to RHS
updateGraph.runWithinUnitTestCycle(() -> {
final RowSet newKeys = i(10);
TstUtils.addToTable(rhsSource, newKeys, stringCol("J", "anything"));
rhsSource.notifyListeners(newKeys, RowSetFactory.empty(), RowSetFactory.empty());
});

assertEquals(2, result.size());
assertArrayEquals(new int[] {0, 1}, ColumnVectors.ofInt(result, "I").toArray());
assertArrayEquals(new String[] {"asdf", "asdf"}, ColumnVectors.ofObject(result, "J", String.class).toArray());

// Remove all rows from RHS
updateGraph.runWithinUnitTestCycle(() -> {
final RowSet removedKeys = i(10);
TstUtils.removeRows(rhsSource, removedKeys);
rhsSource.notifyListeners(RowSetFactory.empty(), removedKeys, RowSetFactory.empty());
});

assertEquals(2, result.size());
assertArrayEquals(new int[] {0, 1}, ColumnVectors.ofInt(result, "I").toArray());
assertArrayEquals(new String[] {null, null}, ColumnVectors.ofObject(result, "J", String.class).toArray());
}

@Test
public void testStaticEmptyRight() {
final Table lhs = TableTools.emptyTable(5).update("I=i");
final Table rhs = TableTools.emptyTable(0).update("J=i", "K=`asdf`");

Table result = OuterJoinTools.leftOuterJoin(lhs, rhs, "I=J");
Table expected = lhs.update("J=(int)(null)", "K=(String)(null)");
assertTableEquals(expected, result);

result = OuterJoinTools.fullOuterJoin(lhs, rhs, "I=J");
expected = lhs.update("J=(int)(null)", "K=(String)(null)");
assertTableEquals(expected, result);
}

@Test
public void testDynamicRightLeftOuterJoin() {
final Table lhs = testRefreshingTable(intCol("I", 0, 1));
final QueryTable rhsSource = testRefreshingTable(shortCol("J"));

// This update creates a SingleValueColumnSource for the RHS table, which causes problems if we query the
// column source for a value that doesn't exist in the table.
final Table rhs = rhsSource.update("J=(int)1", "K=`asdf`");

final Table result = OuterJoinTools.leftOuterJoin(lhs, rhs, "I=J");

assertEquals(3, result.numColumns());
assertEquals("I", result.getDefinition().getColumnsArray()[0].getName());
assertEquals("J", result.getDefinition().getColumnsArray()[1].getName());
assertEquals("K", result.getDefinition().getColumnsArray()[2].getName());

assertEquals(2, result.size());
assertArrayEquals(new int[] {0, 1}, ColumnVectors.ofInt(result, "I").toArray());
assertArrayEquals(new int[] {NULL_INT, NULL_INT}, ColumnVectors.ofInt(result, "J").toArray());
assertArrayEquals(new String[] {null, null}, ColumnVectors.ofObject(result, "K", String.class).toArray());

ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();

// Add some rows to RHS
updateGraph.runWithinUnitTestCycle(() -> {
final RowSet newKeys = i(10, 11);
TstUtils.addToTable(rhsSource, newKeys, shortCol("J", (short) 100, (short) 200));
rhsSource.notifyListeners(newKeys, RowSetFactory.empty(), RowSetFactory.empty());
});

assertEquals(3, result.size());
assertArrayEquals(new int[] {0, 1, 1}, ColumnVectors.ofInt(result, "I").toArray());
assertArrayEquals(new int[] {NULL_INT, 1, 1}, ColumnVectors.ofInt(result, "J").toArray());
assertArrayEquals(new String[] {null, "asdf", "asdf"},
ColumnVectors.ofObject(result, "K", String.class).toArray());

// Remove all rows from RHS
updateGraph.runWithinUnitTestCycle(() -> {
final RowSet removedKeys = i(10, 11);
TstUtils.removeRows(rhsSource, removedKeys);
rhsSource.notifyListeners(RowSetFactory.empty(), removedKeys, RowSetFactory.empty());
});

assertEquals(2, result.size());
assertArrayEquals(new int[] {0, 1}, ColumnVectors.ofInt(result, "I").toArray());
assertArrayEquals(new int[] {NULL_INT, NULL_INT}, ColumnVectors.ofInt(result, "J").toArray());
assertArrayEquals(new String[] {null, null}, ColumnVectors.ofObject(result, "K", String.class).toArray());
}

@Test
public void testDynamicRightFullOuterJoin() {
final Table lhs = testRefreshingTable(intCol("I", 0, 1));
final QueryTable rhsSource = testRefreshingTable(shortCol("J"));

// This update creates a SingleValueColumnSource for the RHS table, which causes problems if we query the
// column source for a value that doesn't exist in the table.
final Table rhs = rhsSource.update("J=(int)1", "K=`asdf`");

final Table result = OuterJoinTools.fullOuterJoin(lhs, rhs, "I=J");

assertEquals(3, result.numColumns());
assertEquals("I", result.getDefinition().getColumnsArray()[0].getName());
assertEquals("J", result.getDefinition().getColumnsArray()[1].getName());
assertEquals("K", result.getDefinition().getColumnsArray()[2].getName());

assertEquals(2, result.size());
assertArrayEquals(new int[] {0, 1}, ColumnVectors.ofInt(result, "I").toArray());
assertArrayEquals(new int[] {NULL_INT, NULL_INT}, ColumnVectors.ofInt(result, "J").toArray());
assertArrayEquals(new String[] {null, null}, ColumnVectors.ofObject(result, "K", String.class).toArray());

ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();

// Add some rows to RHS
updateGraph.runWithinUnitTestCycle(() -> {
final RowSet newKeys = i(10, 11);
TstUtils.addToTable(rhsSource, newKeys, shortCol("J", (short) 100, (short) 200));
rhsSource.notifyListeners(newKeys, RowSetFactory.empty(), RowSetFactory.empty());
});

assertEquals(3, result.size());
assertArrayEquals(new int[] {0, 1, 1}, ColumnVectors.ofInt(result, "I").toArray());
assertArrayEquals(new int[] {NULL_INT, 1, 1}, ColumnVectors.ofInt(result, "J").toArray());
assertArrayEquals(new String[] {null, "asdf", "asdf"},
ColumnVectors.ofObject(result, "K", String.class).toArray());

// Remove all rows from RHS
updateGraph.runWithinUnitTestCycle(() -> {
final RowSet removedKeys = i(10, 11);
TstUtils.removeRows(rhsSource, removedKeys);
rhsSource.notifyListeners(RowSetFactory.empty(), removedKeys, RowSetFactory.empty());
});

assertEquals(2, result.size());
assertArrayEquals(new int[] {0, 1}, ColumnVectors.ofInt(result, "I").toArray());
assertArrayEquals(new int[] {NULL_INT, NULL_INT}, ColumnVectors.ofInt(result, "J").toArray());
assertArrayEquals(new String[] {null, null}, ColumnVectors.ofObject(result, "K", String.class).toArray());
}
}

0 comments on commit 7a9526c

Please sign in to comment.