diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java index 77e0c5ca2c4..7864c0dff79 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java @@ -248,6 +248,9 @@ private void processPendingLocations(final boolean notifyListeners) { try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = subscriptionBuffer.processPending()) { + if (locationUpdate == null) { + return; + } removed = processRemovals(locationUpdate); added = processAdditions(locationUpdate); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java index 88dd71fc12f..93e8ebd9bf0 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java @@ -152,8 +152,10 @@ private void initializeAvailableLocations() { manage(locationBuffer); try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = locationBuffer.processPending()) { - maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys()); - maybeAddLocations(locationUpdate.getPendingAddedLocationKeys()); + if (locationUpdate != null) { + maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys()); + maybeAddLocations(locationUpdate.getPendingAddedLocationKeys()); + } } updateSourceRegistrar.addSource(locationChangePoller = new LocationChangePoller(locationBuffer)); } else { @@ -235,16 +237,19 @@ private LocationChangePoller(@NotNull final TableLocationSubscriptionBuffer loca protected void instrumentedRefresh() { try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = locationBuffer.processPending()) { - if (!locationProvider.getUpdateMode().removeAllowed() - && !locationUpdate.getPendingRemovedLocationKeys().isEmpty()) { - // This TLP doesn't support removed locations, we need to throw an exception. - final ImmutableTableLocationKey[] keys = locationUpdate.getPendingRemovedLocationKeys().stream() - .map(LiveSupplier::get).toArray(ImmutableTableLocationKey[]::new); - throw new TableLocationRemovedException("Source table does not support removed locations", keys); - } + if (locationUpdate != null) { + if (!locationProvider.getUpdateMode().removeAllowed() + && !locationUpdate.getPendingRemovedLocationKeys().isEmpty()) { + // This TLP doesn't support removed locations, we need to throw an exception. + final ImmutableTableLocationKey[] keys = locationUpdate.getPendingRemovedLocationKeys().stream() + .map(LiveSupplier::get).toArray(ImmutableTableLocationKey[]::new); + throw new TableLocationRemovedException( + "Source table does not support removed locations", keys); + } - maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys()); - maybeAddLocations(locationUpdate.getPendingAddedLocationKeys()); + maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys()); + maybeAddLocations(locationUpdate.getPendingAddedLocationKeys()); + } } // This class previously had functionality to notify "location listeners", but it was never used. diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/PartitionedTableLocationKey.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/PartitionedTableLocationKey.java index 89325bc1351..b6ec5d69d73 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/PartitionedTableLocationKey.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/PartitionedTableLocationKey.java @@ -27,7 +27,7 @@ public abstract class PartitionedTableLocationKey implements ImmutableTableLocat protected final Map> partitions; - private int cachedHashCode; + protected int cachedHashCode; /** * Construct a new PartitionedTableLocationKey for the supplied {@code partitions}. diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java index 3a59197a265..3fbf3aefc3b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java @@ -21,8 +21,8 @@ public class TableLocationSubscriptionBuffer extends ReferenceCountedLivenessNode implements TableLocationProvider.Listener { - private static final Set> EMPTY_TABLE_LOCATION_KEYS = - Collections.emptySet(); + private static final Map> EMPTY_TABLE_LOCATION_KEYS = + Collections.emptyMap(); private final TableLocationProvider tableLocationProvider; @@ -30,10 +30,7 @@ public class TableLocationSubscriptionBuffer extends ReferenceCountedLivenessNod private final Object updateLock = new Object(); - // These sets represent adds and removes from completed transactions. - private Set> pendingLocationsAdded = EMPTY_TABLE_LOCATION_KEYS; - private Set> pendingLocationsRemoved = EMPTY_TABLE_LOCATION_KEYS; - + private LocationUpdate pendingUpdate = null; private TableDataException pendingException = null; public TableLocationSubscriptionBuffer(@NotNull final TableLocationProvider tableLocationProvider) { @@ -42,28 +39,77 @@ public TableLocationSubscriptionBuffer(@NotNull final TableLocationProvider tabl } public final class LocationUpdate implements SafeCloseable { - private final Collection> pendingAddedLocationKeys; - private final Collection> pendingRemovedLocations; - public LocationUpdate( - @NotNull final Collection> pendingAddedLocationKeys, - @NotNull final Collection> pendingRemovedLocations) { - this.pendingAddedLocationKeys = pendingAddedLocationKeys; - this.pendingRemovedLocations = pendingRemovedLocations; + private final ReferenceCountedLivenessNode livenessNode = new ReferenceCountedLivenessNode(false) {}; + + // These sets represent adds and removes from completed transactions. + private Map> added = + EMPTY_TABLE_LOCATION_KEYS; + private Map> removed = + EMPTY_TABLE_LOCATION_KEYS; + + private LocationUpdate() { + TableLocationSubscriptionBuffer.this.manage(livenessNode); + } + + private void processAdd(@NotNull final LiveSupplier addedKeySupplier) { + final ImmutableTableLocationKey addedKey = addedKeySupplier.get(); + // Need to verify that we don't have stacked adds (without intervening removes). + if (added.containsKey(addedKey)) { + throw new IllegalStateException("TableLocationKey " + addedKey + + " was already added by a previous transaction."); + } + if (added == EMPTY_TABLE_LOCATION_KEYS) { + added = new HashMap<>(); + } + livenessNode.manage(addedKeySupplier); + added.put(addedKey, addedKeySupplier); + } + + private void processRemove(@NotNull final LiveSupplier removedKeySupplier) { + final ImmutableTableLocationKey removedKey = removedKeySupplier.get(); + // If we have a pending add, it is being cancelled by this remove. + if (added.remove(removedKey) != null) { + return; + } + // Verify that we don't have stacked removes (without intervening adds). + if (removed.containsKey(removedKey)) { + throw new IllegalStateException("TableLocationKey " + removedKey + + " was already removed and has not been replaced."); + } + if (removed == EMPTY_TABLE_LOCATION_KEYS) { + removed = new HashMap<>(); + } + livenessNode.manage(removedKeySupplier); + removed.put(removedKey, removedKeySupplier); + } + + private void processTransaction( + @Nullable Collection> addedKeySuppliers, + @Nullable Collection> removedKeySuppliers) { + if (removedKeySuppliers != null) { + for (final LiveSupplier removedKeySupplier : removedKeySuppliers) { + processRemove(removedKeySupplier); + } + } + if (addedKeySuppliers != null) { + for (final LiveSupplier addedKeySupplier : addedKeySuppliers) { + processAdd(addedKeySupplier); + } + } } public Collection> getPendingAddedLocationKeys() { - return pendingAddedLocationKeys; + return added.values(); } public Collection> getPendingRemovedLocationKeys() { - return pendingRemovedLocations; + return removed.values(); } @Override public void close() { - pendingAddedLocationKeys.forEach(TableLocationSubscriptionBuffer.this::unmanage); - pendingRemovedLocations.forEach(TableLocationSubscriptionBuffer.this::unmanage); + TableLocationSubscriptionBuffer.this.unmanage(livenessNode); } } @@ -76,7 +122,6 @@ public void close() { * @return The collection of pending location keys. */ public synchronized LocationUpdate processPending() { - // TODO: Should I change this to instead re-use the collection? if (!subscribed) { if (tableLocationProvider.supportsSubscriptions()) { tableLocationProvider.subscribe(this); @@ -90,23 +135,21 @@ public synchronized LocationUpdate processPending() { } subscribed = true; } - final Collection> resultLocationKeys; - final Collection> resultLocationsRemoved; + final LocationUpdate resultUpdate; final TableDataException resultException; synchronized (updateLock) { - resultLocationKeys = pendingLocationsAdded; - pendingLocationsAdded = EMPTY_TABLE_LOCATION_KEYS; - resultLocationsRemoved = pendingLocationsRemoved; - pendingLocationsRemoved = EMPTY_TABLE_LOCATION_KEYS; + resultUpdate = pendingUpdate; + pendingUpdate = null; resultException = pendingException; pendingException = null; } if (resultException != null) { - throw new TableDataException("Processed pending exception", resultException); + try (final SafeCloseable ignored = resultUpdate) { + throw new TableDataException("Processed pending exception", resultException); + } } - - return new LocationUpdate(resultLocationKeys, resultLocationsRemoved); + return resultUpdate; } /** @@ -119,92 +162,52 @@ public synchronized void reset() { } subscribed = false; } + final LocationUpdate toClose; synchronized (updateLock) { - pendingLocationsAdded.forEach(this::unmanage); - pendingLocationsRemoved.forEach(this::unmanage); - pendingLocationsAdded = EMPTY_TABLE_LOCATION_KEYS; - pendingLocationsRemoved = EMPTY_TABLE_LOCATION_KEYS; + toClose = pendingUpdate; + pendingUpdate = null; pendingException = null; } + if (toClose != null) { + toClose.close(); + } } // ------------------------------------------------------------------------------------------------------------------ // TableLocationProvider.Listener implementation // ------------------------------------------------------------------------------------------------------------------ + private LocationUpdate ensurePendingUpdate() { + if (pendingUpdate == null) { + pendingUpdate = new LocationUpdate(); + } + return pendingUpdate; + } + @Override - public void handleTableLocationKeyAdded(@NotNull final LiveSupplier tableLocationKey) { + public void handleTableLocationKeyAdded(@NotNull final LiveSupplier addedKeySupplier) { synchronized (updateLock) { - // Need to verify that we don't have stacked adds (without intervening removes). - if (pendingLocationsAdded.contains(tableLocationKey)) { - throw new IllegalStateException("TableLocationKey " + tableLocationKey - + " was already added by a previous transaction."); - } - if (pendingLocationsAdded == EMPTY_TABLE_LOCATION_KEYS) { - pendingLocationsAdded = new HashSet<>(); - } - manage(tableLocationKey); - pendingLocationsAdded.add(tableLocationKey); + // noinspection resource + ensurePendingUpdate().processAdd(addedKeySupplier); } } @Override - public void handleTableLocationKeyRemoved(@NotNull final LiveSupplier tableLocationKey) { + public void handleTableLocationKeyRemoved( + @NotNull final LiveSupplier removedKeySupplier) { synchronized (updateLock) { - // If we have a pending add, it is being cancelled by this remove. - if (pendingLocationsAdded.remove(tableLocationKey)) { - return; - } - // Verify that we don't have stacked removes (without intervening adds). - if (pendingLocationsRemoved.contains(tableLocationKey)) { - throw new IllegalStateException("TableLocationKey " + tableLocationKey - + " was already removed and has not been replaced."); - } - if (pendingLocationsRemoved == EMPTY_TABLE_LOCATION_KEYS) { - pendingLocationsRemoved = new HashSet<>(); - } - manage(tableLocationKey); - pendingLocationsRemoved.add(tableLocationKey); + // noinspection resource + ensurePendingUpdate().processRemove(removedKeySupplier); } } @Override public void handleTableLocationKeysUpdate( - @Nullable Collection> addedKeys, - @Nullable Collection> removedKeys) { + @Nullable Collection> addedKeySuppliers, + @Nullable Collection> removedKeySuppliers) { synchronized (updateLock) { - if (removedKeys != null) { - for (final LiveSupplier removedTableLocationKey : removedKeys) { - // If we have a pending add, it is being cancelled by this remove. - if (pendingLocationsAdded.remove(removedTableLocationKey)) { - continue; - } - // Verify that we don't have stacked removes. - if (pendingLocationsRemoved.contains(removedTableLocationKey)) { - throw new IllegalStateException("TableLocationKey " + removedTableLocationKey - + " was already removed and has not been replaced."); - } - if (pendingLocationsRemoved == EMPTY_TABLE_LOCATION_KEYS) { - pendingLocationsRemoved = new HashSet<>(); - } - manage(removedTableLocationKey); - pendingLocationsRemoved.add(removedTableLocationKey); - } - } - if (addedKeys != null) { - for (final LiveSupplier addedTableLocationKey : addedKeys) { - // Need to verify that we don't have stacked adds. - if (pendingLocationsAdded.contains(addedTableLocationKey)) { - throw new IllegalStateException("TableLocationKey " + addedTableLocationKey - + " was already added by a previous transaction."); - } - if (pendingLocationsAdded == EMPTY_TABLE_LOCATION_KEYS) { - pendingLocationsAdded = new HashSet<>(); - } - manage(addedTableLocationKey); - pendingLocationsAdded.add(addedTableLocationKey); - } - } + // noinspection resource + ensurePendingUpdate().processTransaction(addedKeySuppliers, removedKeySuppliers); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/FileTableLocationKey.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/FileTableLocationKey.java index 516542396e4..eae7dbd88cd 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/FileTableLocationKey.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/FileTableLocationKey.java @@ -27,8 +27,6 @@ public class FileTableLocationKey extends PartitionedTableLocationKey { protected final File file; private final int order; - private int cachedHashCode; - /** * Construct a new FileTableLocationKey for the supplied {@code file} and {@code partitions}. * diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URITableLocationKey.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URITableLocationKey.java index 83d4476f762..90fa92f71c4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URITableLocationKey.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URITableLocationKey.java @@ -30,8 +30,6 @@ public class URITableLocationKey extends PartitionedTableLocationKey { protected final URI uri; protected final int order; - private int cachedHashCode; - /** * Construct a new URITableLocationKey for the supplied {@code uri} and {@code partitions}. * diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/ManyLocationsTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/ManyLocationsTest.java new file mode 100644 index 00000000000..8161faf8ae6 --- /dev/null +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/ManyLocationsTest.java @@ -0,0 +1,64 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.engine.table.impl; + +import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.impl.sources.regioned.RegionedTableComponentFactoryImpl; +import io.deephaven.engine.testutil.ControlledUpdateGraph; +import io.deephaven.engine.testutil.locations.TableBackedTableLocationProvider; +import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase; +import io.deephaven.engine.util.TableTools; +import io.deephaven.test.types.OutOfBandTest; +import org.junit.experimental.categories.Category; + +import java.util.Map; + +@Category(OutOfBandTest.class) +public class ManyLocationsTest extends RefreshingTableTestCase { + + private static final boolean DISABLE_PERFORMANCE_TEST = true; + + public void testManyLocationsCoalesce() { + if (DISABLE_PERFORMANCE_TEST) { + return; + } + + final long startConstructionNanos = System.nanoTime(); + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + final TableBackedTableLocationProvider tlp = new TableBackedTableLocationProvider( + updateGraph, + true, + TableUpdateMode.APPEND_ONLY, + TableUpdateMode.APPEND_ONLY); + final Table singleLocationTable = TableTools.emptyTable(1000).update("AA=ii"); + final Table past = new PartitionAwareSourceTable( + TableDefinition.of( + ColumnDefinition.ofInt("PI").withPartitioning(), + ColumnDefinition.ofLong("AA")), + "TestTable", + RegionedTableComponentFactoryImpl.INSTANCE, + tlp, + updateGraph); + final long endConstructionNanos = System.nanoTime(); + System.out.printf("Construction time: %.2fs%n", + (endConstructionNanos - startConstructionNanos) / 1_000_000_000D); + + final long startCreationNanos = System.nanoTime(); + for (int pi = 0; pi < 100_000; ++pi) { + tlp.add(singleLocationTable, Map.of("PI", pi)); + } + final long endCreationNanos = System.nanoTime(); + System.out.printf("Creation time: %.2fs%n", (endCreationNanos - startCreationNanos) / 1_000_000_000D); + + final long startCoalesceNanos = System.nanoTime(); + past.coalesce(); + final long endCoalesceNanos = System.nanoTime(); + System.out.printf("Coalesce time: %.2fs%n", (endCoalesceNanos - startCoalesceNanos) / 1_000_000_000D); + + System.out.printf("Total time: %.2fs%n", (endCoalesceNanos - startConstructionNanos) / 1_000_000_000D); + } +} diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java index af416b73aad..b040d3a7a5f 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java @@ -182,8 +182,7 @@ public void testAddAndRemoveLocations() { // Add a new location (p3) //////////////////////////////////////////// - tlp.addPending(p3); - tlp.refresh(); + tlp.add(p3); updateGraph.getDelegate().startCycleForUnitTests(false); updateGraph.refreshSources(); @@ -210,9 +209,8 @@ public void testAddAndRemoveLocations() { //////////////////////////////////////////// tlks = tlp.getTableLocationKeys().stream().sorted().toArray(ImmutableTableLocationKey[]::new); - tlp.addPending(p4); tlp.removeTableLocationKey(tlks[0]); - tlp.refresh(); + tlp.add(p4); updateGraph.getDelegate().startCycleForUnitTests(false); updateGraph.refreshSources(); @@ -245,8 +243,7 @@ public void testAddAndRemoveLocations() { intCol("intCol", 10000, 20000, 40000, 60000), doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6)); p5.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true); - tlp.addPending(p5); - tlp.refresh(); + tlp.add(p5); updateGraph.getDelegate().startCycleForUnitTests(false); updateGraph.refreshSources(); diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/locations/TableBackedTableLocationKey.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/locations/TableBackedTableLocationKey.java index 97829cfaf9f..b08ab3498bf 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/locations/TableBackedTableLocationKey.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/locations/TableBackedTableLocationKey.java @@ -5,23 +5,26 @@ import io.deephaven.base.log.LogOutput; import io.deephaven.engine.table.impl.QueryTable; -import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey; import io.deephaven.engine.table.impl.locations.TableLocationKey; -import io.deephaven.engine.table.impl.locations.UnknownPartitionKeyException; +import io.deephaven.engine.table.impl.locations.impl.PartitionedTableLocationKey; import io.deephaven.io.log.impl.LogOutputStringImpl; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.util.Collections; -import java.util.Set; +import java.util.Map; -public final class TableBackedTableLocationKey implements ImmutableTableLocationKey { +import static io.deephaven.engine.testutil.locations.TableBackedTableLocationProvider.LOCATION_ID_ATTR; + +public final class TableBackedTableLocationKey extends PartitionedTableLocationKey { private static final String NAME = TableBackedTableLocationKey.class.getSimpleName(); final QueryTable table; - public TableBackedTableLocationKey(@NotNull final QueryTable table) { + public TableBackedTableLocationKey( + @Nullable final Map> partitions, + @NotNull final QueryTable table) { + super(partitions); this.table = table; } @@ -46,38 +49,55 @@ public String toString() { @Override public int compareTo(@NotNull final TableLocationKey other) { + if (this == other) { + return 0; + } if (other instanceof TableBackedTableLocationKey) { final TableBackedTableLocationKey otherTyped = (TableBackedTableLocationKey) other; - // noinspection DataFlowIssue - final int idComparisonResult = - Integer.compare((int) table.getAttribute("ID"), (int) otherTyped.table.getAttribute("ID")); + final int partitionComparisonResult = + PartitionsComparator.INSTANCE.compare(partitions, otherTyped.partitions); + if (partitionComparisonResult != 0) { + return partitionComparisonResult; + } + if (table == otherTyped.table) { + return 0; + } + final int idComparisonResult = Integer.compare(getId(), otherTyped.getId()); if (idComparisonResult != 0) { return idComparisonResult; } + throw new UnsupportedOperationException(getImplementationName() + + " cannot be compared to instances that have different tables but the same \"" + LOCATION_ID_ATTR + + "\" attribute"); } - return ImmutableTableLocationKey.super.compareTo(other); + return super.compareTo(other); + } + + private int getId() { + // noinspection DataFlowIssue + return (int) table.getAttribute(LOCATION_ID_ATTR); } @Override public int hashCode() { - return System.identityHashCode(table); + if (cachedHashCode == 0) { + final int computedHashCode = 31 * partitions.hashCode() + Integer.hashCode(getId()); + // Don't use 0; that's used by StandaloneTableLocationKey, and also our sentinel for the need to compute + if (computedHashCode == 0) { + final int fallbackHashCode = TableBackedTableLocationKey.class.hashCode(); + cachedHashCode = fallbackHashCode == 0 ? 1 : fallbackHashCode; + } else { + cachedHashCode = computedHashCode; + } + } + return cachedHashCode; } @Override public boolean equals(@Nullable final Object other) { return other == this || (other instanceof TableBackedTableLocationKey - && ((TableBackedTableLocationKey) other).table == table); - } - - @Override - public > PARTITION_VALUE_TYPE getPartitionValue( - @NotNull final String partitionKey) { - throw new UnknownPartitionKeyException(partitionKey, this); - } - - @Override - public Set getPartitionKeys() { - return Collections.emptySet(); + && ((TableBackedTableLocationKey) other).table == table + && partitions.equals(((TableBackedTableLocationKey) other).partitions)); } } diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/locations/TableBackedTableLocationProvider.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/locations/TableBackedTableLocationProvider.java index 65e196b6d46..d8afa6c5795 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/locations/TableBackedTableLocationProvider.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/locations/TableBackedTableLocationProvider.java @@ -11,23 +11,25 @@ import io.deephaven.engine.table.impl.locations.TableLocationKey; import io.deephaven.engine.table.impl.locations.impl.AbstractTableLocationProvider; import io.deephaven.engine.table.impl.locations.impl.StandaloneTableKey; +import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.updategraph.UpdateSourceRegistrar; -import io.deephaven.util.mutable.MutableInt; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.stream.Stream; +import java.util.concurrent.atomic.AtomicInteger; public final class TableBackedTableLocationProvider extends AbstractTableLocationProvider { public static final String LOCATION_ID_ATTR = "ID"; private final UpdateSourceRegistrar registrar; - private final List pending = new ArrayList<>(); - private final MutableInt nextId = new MutableInt(); + private final String callSite; + + private final List pending = new ArrayList<>(); + private final AtomicInteger nextId = new AtomicInteger(); public TableBackedTableLocationProvider( @NotNull final UpdateSourceRegistrar registrar, @@ -37,20 +39,82 @@ public TableBackedTableLocationProvider( @NotNull final Table... tables) { super(StandaloneTableKey.getInstance(), supportsSubscriptions, updateMode, locationUpdateMode); this.registrar = registrar; - processPending(Arrays.stream(tables)); + + callSite = QueryPerformanceRecorder.getCallerLine(); + + for (final Table table : tables) { + add(table); + } } - private void processPending(@NotNull final Stream
tableStream) { - tableStream - .map(table -> (QueryTable) table.coalesce() - .withAttributes(Map.of(LOCATION_ID_ATTR, nextId.getAndIncrement()))) - .peek(table -> Assert.assertion(table.isAppendOnly(), "table is append only")) - .map(TableBackedTableLocationKey::new) - .forEach(this::handleTableLocationKeyAdded); + private TableBackedTableLocationKey makeTableLocationKey( + @NotNull final Table table, + @Nullable final Map> partitions) { + final boolean needToClearCallsite = QueryPerformanceRecorder.setCallsite(callSite); + final QueryTable coalesced = (QueryTable) table.coalesce(); + Assert.assertion(coalesced.isAppendOnly(), "table is append only"); + final QueryTable withId = + (QueryTable) coalesced.withAttributes(Map.of(LOCATION_ID_ATTR, nextId.getAndIncrement())); + if (needToClearCallsite) { + QueryPerformanceRecorder.clearCallsite(); + } + return new TableBackedTableLocationKey(partitions, withId); } - public synchronized void addPending(@NotNull final Table toAdd) { - pending.add(toAdd); + /** + * Enqueue a table that belongs to the supplied {@code partitions} to be added upon the next {@link #refresh() + * refresh}. + * + * @param toAdd The {@link Table} to add + * @param partitions The partitions the newly-added table-backed location belongs to + */ + public void addPending( + @NotNull final Table toAdd, + @Nullable final Map> partitions) { + final TableBackedTableLocationKey tlk = makeTableLocationKey(toAdd, partitions); + synchronized (pending) { + pending.add(tlk); + } + } + + /** + * Enqueue a table that belongs to no partitions to be added upon the next {@link #refresh() refresh}. + * + * @param toAdd The {@link Table} to add + */ + public void addPending(@NotNull final Table toAdd) { + addPending(toAdd, null); + } + + private void processPending() { + synchronized (pending) { + if (pending.isEmpty()) { + return; + } + pending.forEach(this::handleTableLocationKeyAdded); + pending.clear(); + } + } + + /** + * Add a table that belongs to the supplied {@code partitions}. + * + * @param toAdd The {@link Table} to add + * @param partitions The partitions the newly-added table-backed location belongs to + */ + public void add( + @NotNull final Table toAdd, + @Nullable final Map> partitions) { + handleTableLocationKeyAdded(makeTableLocationKey(toAdd, partitions)); + } + + /** + * Add a table that belongs to no partitionns. + * + * @param toAdd The {@link Table} to add + */ + public void add(@NotNull final Table toAdd) { + add(toAdd, null); } @Override @@ -68,12 +132,7 @@ protected void deactivateUnderlyingDataSource() {} @Override public void refresh() { - if (pending.isEmpty()) { - return; - } - - processPending(pending.stream()); - pending.clear(); + processPending(); } @Override