Skip to content

Commit

Permalink
fix: DH-18433: SourcePartitionedTable needs to check the size of pend…
Browse files Browse the repository at this point in the history
…ing locations even if there are no added or removed locations (#6570) (#6571)
  • Loading branch information
rcaudy authored Jan 17, 2025
1 parent 1518ad5 commit d3148f7
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,27 +250,42 @@ private void processPendingLocations(final boolean notifyListeners) {
try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
subscriptionBuffer.processPending()) {
if (locationUpdate == null) {
return;
removed = null;
} else {
removed = processRemovals(locationUpdate);
processAdditions(locationUpdate);
}
removed = processRemovals(locationUpdate);
added = processAdditions(locationUpdate);
added = checkPendingLocations();
}

resultRows.update(added, removed);
if (removed == null) {
if (added == null) {
return;
}
resultRows.insert(added);
} else if (added == null) {
resultRows.remove(removed);
} else {
resultRows.update(added, removed);
}
if (notifyListeners) {
result.notifyListeners(new TableUpdateImpl(
added,
removed,
added == null ? RowSetFactory.empty() : added,
removed == null ? RowSetFactory.empty() : removed,
RowSetFactory.empty(),
RowSetShiftData.EMPTY,
ModifiedColumnSet.EMPTY));
} else {
added.close();
removed.close();
if (added != null) {
added.close();
}
if (removed != null) {
removed.close();
}
}
}

private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) {
private void processAdditions(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) {
/*
* This block of code is unfortunate, because it largely duplicates the intent and effort of similar code in
* RegionedColumnSourceManager. I think that the RegionedColumnSourceManager could be changed to
Expand All @@ -280,13 +295,18 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
* population in STM ColumnSources.
*/
// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
locationUpdate.getPendingAddedLocationKeys().stream()
.map(LiveSupplier::get)
.filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation)
.peek(this::manage)
.map(PendingLocationState::new)
.forEach(pendingLocationStates::offer);
if (locationUpdate != null) {
locationUpdate.getPendingAddedLocationKeys().stream()
.map(LiveSupplier::get)
.filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation)
.peek(this::manage)
.map(PendingLocationState::new)
.forEach(pendingLocationStates::offer);
}
}

private RowSet checkPendingLocations() {
for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
final PendingLocationState pendingLocationState = iter.next();
if (pendingLocationState.exists()) {
Expand All @@ -296,7 +316,7 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
}

if (readyLocationStates.isEmpty()) {
return RowSetFactory.empty();
return null;
}

final RowSet added = sortAndAddLocations(readyLocationStates.stream().map(PendingLocationState::release));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public void close() {
* reset). No order is maintained internally. If a pending exception is thrown, this signals that the subscription
* is no longer valid and no subsequent location keys will be returned.
*
* @return The collection of pending location keys.
* @return A {@link LocationUpdate} collecting pending added and removed location keys, or {@code null} if there are
* none; the caller must {@link LocationUpdate#close() close} the returned object when done.
*/
public synchronized LocationUpdate processPending() {
if (!subscribed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,35 +62,42 @@ public void tearDown() throws Exception {
private QueryTable p2;
private QueryTable p3;
private QueryTable p4;
private QueryTable p5;

private DependentRegistrar registrar;
private TableBackedTableLocationProvider tlp;

private SourcePartitionedTable setUpData() {
p1 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
p1 = testRefreshingTable(ir(0, 3).toTracking(),
stringCol("Sym", "aa", "bb", "aa", "bb"),
intCol("intCol", 10, 20, 40, 60),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
p1.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);

p2 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
p2 = testRefreshingTable(ir(0, 3).toTracking(),
stringCol("Sym", "cc", "dd", "cc", "dd"),
intCol("intCol", 100, 200, 400, 600),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
p2.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);

p3 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
p3 = testRefreshingTable(ir(0, 3).toTracking(),
stringCol("Sym", "ee", "ff", "ee", "ff"),
intCol("intCol", 1000, 2000, 4000, 6000),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
p3.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);

p4 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
p4 = testRefreshingTable(ir(0, 3).toTracking(),
stringCol("Sym", "gg", "hh", "gg", "hh"),
intCol("intCol", 10000, 20000, 40000, 60000),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
p4.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);

p5 = testRefreshingTable(i().toTracking(), // Initially empty
stringCol("Sym"),
intCol("intCol"),
doubleCol("doubleCol"));
p5.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);

registrar = new DependentRegistrar();
tlp = new TableBackedTableLocationProvider(
registrar,
Expand Down Expand Up @@ -238,7 +245,7 @@ public void testAddAndRemoveLocations() {
*/
final TableLocation location5;
try (final SafeCloseable ignored = LivenessScopeStack.open(new LivenessScope(), true)) {
final QueryTable p5 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
final QueryTable p5 = testRefreshingTable(ir(0, 3).toTracking(),
stringCol("Sym", "ii", "jj", "ii", "jj"),
intCol("intCol", 10000, 20000, 40000, 60000),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
Expand Down Expand Up @@ -407,4 +414,27 @@ public void testCantReadPrev() {
TableLocationRemovedException.class).isPresent()));
getUpdateErrors().clear();
}

@Test
public void testInitiallyEmptyLocation() {
final SourcePartitionedTable spt = setUpData();
final Table ptSummary = spt.merge().selectDistinct("Sym");
verifyStringColumnContents(ptSummary, "Sym", "aa", "bb", "cc", "dd");
tlp.add(p5);
updateGraph.getDelegate().runWithinUnitTestCycle(() -> {
updateGraph.refreshSources();
// We refreshed the source first, so it won't see a new size for the location backed by p5 on this cycle.
addToTable(p5, ir(0, 3),
stringCol("Sym", "ii", "jj", "kk", "ll"),
intCol("intCol", 10000, 20000, 40000, 60000),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
}, true);
verifyStringColumnContents(ptSummary, "Sym", "aa", "bb", "cc", "dd");
updateGraph.getDelegate().runWithinUnitTestCycle(() -> {
updateGraph.refreshSources();
// Now the source has been refreshed, so it should see the new size of the location backed by p5, and
// include it in the result.
}, true);
verifyStringColumnContents(ptSummary, "Sym", "aa", "bb", "cc", "dd", "ii", "jj", "kk", "ll");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,17 @@ public static WritableRowSet i(long... keys) {
return RowSetFactory.fromKeys(keys);
}

/**
* A shorthand for {@link RowSetFactory#fromRange(long, long)} for use in unit tests.
*
* @param firstRowKey the first key of the new RowSet
* @param lastRowKey the last key (inclusive) of the new RowSet
* @return a new RowSet with the given key range
*/
public static WritableRowSet ir(final long firstRowKey, final long lastRowKey) {
return RowSetFactory.fromRange(firstRowKey, lastRowKey);
}

public static void addToTable(final Table table, final RowSet rowSet, final ColumnHolder<?>... columnHolders) {
if (rowSet.isEmpty()) {
return;
Expand Down

0 comments on commit d3148f7

Please sign in to comment.