diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java index 7ef10f4e31a..c1636aad52e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java @@ -250,27 +250,42 @@ private void processPendingLocations(final boolean notifyListeners) { try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = subscriptionBuffer.processPending()) { if (locationUpdate == null) { - return; + removed = null; + } else { + removed = processRemovals(locationUpdate); + processAdditions(locationUpdate); } - removed = processRemovals(locationUpdate); - added = processAdditions(locationUpdate); + added = checkPendingLocations(); } - resultRows.update(added, removed); + if (removed == null) { + if (added == null) { + return; + } + resultRows.insert(added); + } else if (added == null) { + resultRows.remove(removed); + } else { + resultRows.update(added, removed); + } if (notifyListeners) { result.notifyListeners(new TableUpdateImpl( - added, - removed, + added == null ? RowSetFactory.empty() : added, + removed == null ? RowSetFactory.empty() : removed, RowSetFactory.empty(), RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY)); } else { - added.close(); - removed.close(); + if (added != null) { + added.close(); + } + if (removed != null) { + removed.close(); + } } } - private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) { + private void processAdditions(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) { /* * This block of code is unfortunate, because it largely duplicates the intent and effort of similar code in * RegionedColumnSourceManager. I think that the RegionedColumnSourceManager could be changed to @@ -280,13 +295,18 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp * population in STM ColumnSources. */ // TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table - locationUpdate.getPendingAddedLocationKeys().stream() - .map(LiveSupplier::get) - .filter(locationKeyMatcher) - .map(tableLocationProvider::getTableLocation) - .peek(this::manage) - .map(PendingLocationState::new) - .forEach(pendingLocationStates::offer); + if (locationUpdate != null) { + locationUpdate.getPendingAddedLocationKeys().stream() + .map(LiveSupplier::get) + .filter(locationKeyMatcher) + .map(tableLocationProvider::getTableLocation) + .peek(this::manage) + .map(PendingLocationState::new) + .forEach(pendingLocationStates::offer); + } + } + + private RowSet checkPendingLocations() { for (final Iterator iter = pendingLocationStates.iterator(); iter.hasNext();) { final PendingLocationState pendingLocationState = iter.next(); if (pendingLocationState.exists()) { @@ -296,7 +316,7 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp } if (readyLocationStates.isEmpty()) { - return RowSetFactory.empty(); + return null; } final RowSet added = sortAndAddLocations(readyLocationStates.stream().map(PendingLocationState::release)); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java index 9dcd940d496..97b6d3d825e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java @@ -129,7 +129,8 @@ public void close() { * reset). No order is maintained internally. If a pending exception is thrown, this signals that the subscription * is no longer valid and no subsequent location keys will be returned. * - * @return The collection of pending location keys. + * @return A {@link LocationUpdate} collecting pending added and removed location keys, or {@code null} if there are + * none; the caller must {@link LocationUpdate#close() close} the returned object when done. */ public synchronized LocationUpdate processPending() { if (!subscribed) { diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java index b040d3a7a5f..9d995790e0f 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java @@ -62,35 +62,42 @@ public void tearDown() throws Exception { private QueryTable p2; private QueryTable p3; private QueryTable p4; + private QueryTable p5; private DependentRegistrar registrar; private TableBackedTableLocationProvider tlp; private SourcePartitionedTable setUpData() { - p1 = testRefreshingTable(i(0, 1, 2, 3).toTracking(), + p1 = testRefreshingTable(ir(0, 3).toTracking(), stringCol("Sym", "aa", "bb", "aa", "bb"), intCol("intCol", 10, 20, 40, 60), doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6)); p1.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true); - p2 = testRefreshingTable(i(0, 1, 2, 3).toTracking(), + p2 = testRefreshingTable(ir(0, 3).toTracking(), stringCol("Sym", "cc", "dd", "cc", "dd"), intCol("intCol", 100, 200, 400, 600), doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6)); p2.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true); - p3 = testRefreshingTable(i(0, 1, 2, 3).toTracking(), + p3 = testRefreshingTable(ir(0, 3).toTracking(), stringCol("Sym", "ee", "ff", "ee", "ff"), intCol("intCol", 1000, 2000, 4000, 6000), doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6)); p3.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true); - p4 = testRefreshingTable(i(0, 1, 2, 3).toTracking(), + p4 = testRefreshingTable(ir(0, 3).toTracking(), stringCol("Sym", "gg", "hh", "gg", "hh"), intCol("intCol", 10000, 20000, 40000, 60000), doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6)); p4.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true); + p5 = testRefreshingTable(i().toTracking(), // Initially empty + stringCol("Sym"), + intCol("intCol"), + doubleCol("doubleCol")); + p5.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true); + registrar = new DependentRegistrar(); tlp = new TableBackedTableLocationProvider( registrar, @@ -238,7 +245,7 @@ public void testAddAndRemoveLocations() { */ final TableLocation location5; try (final SafeCloseable ignored = LivenessScopeStack.open(new LivenessScope(), true)) { - final QueryTable p5 = testRefreshingTable(i(0, 1, 2, 3).toTracking(), + final QueryTable p5 = testRefreshingTable(ir(0, 3).toTracking(), stringCol("Sym", "ii", "jj", "ii", "jj"), intCol("intCol", 10000, 20000, 40000, 60000), doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6)); @@ -407,4 +414,27 @@ public void testCantReadPrev() { TableLocationRemovedException.class).isPresent())); getUpdateErrors().clear(); } + + @Test + public void testInitiallyEmptyLocation() { + final SourcePartitionedTable spt = setUpData(); + final Table ptSummary = spt.merge().selectDistinct("Sym"); + verifyStringColumnContents(ptSummary, "Sym", "aa", "bb", "cc", "dd"); + tlp.add(p5); + updateGraph.getDelegate().runWithinUnitTestCycle(() -> { + updateGraph.refreshSources(); + // We refreshed the source first, so it won't see a new size for the location backed by p5 on this cycle. + addToTable(p5, ir(0, 3), + stringCol("Sym", "ii", "jj", "kk", "ll"), + intCol("intCol", 10000, 20000, 40000, 60000), + doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6)); + }, true); + verifyStringColumnContents(ptSummary, "Sym", "aa", "bb", "cc", "dd"); + updateGraph.getDelegate().runWithinUnitTestCycle(() -> { + updateGraph.refreshSources(); + // Now the source has been refreshed, so it should see the new size of the location backed by p5, and + // include it in the result. + }, true); + verifyStringColumnContents(ptSummary, "Sym", "aa", "bb", "cc", "dd", "ii", "jj", "kk", "ll"); + } } diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/TstUtils.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/TstUtils.java index 9518badf6af..d94439da608 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/TstUtils.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/TstUtils.java @@ -136,6 +136,17 @@ public static WritableRowSet i(long... keys) { return RowSetFactory.fromKeys(keys); } + /** + * A shorthand for {@link RowSetFactory#fromRange(long, long)} for use in unit tests. + * + * @param firstRowKey the first key of the new RowSet + * @param lastRowKey the last key (inclusive) of the new RowSet + * @return a new RowSet with the given key range + */ + public static WritableRowSet ir(final long firstRowKey, final long lastRowKey) { + return RowSetFactory.fromRange(firstRowKey, lastRowKey); + } + public static void addToTable(final Table table, final RowSet rowSet, final ColumnHolder... columnHolders) { if (rowSet.isEmpty()) { return;