diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java
index 7ef10f4e31a..c39dc289f1b 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java
@@ -54,6 +54,7 @@ public class SourcePartitionedTable extends PartitionedTableImpl {
* @param refreshSizes Whether the locations found should be refreshed
* @param locationKeyMatcher Function to filter desired location keys
*/
+ @Deprecated(forRemoval = true)
public SourcePartitionedTable(
@NotNull final TableDefinition constituentDefinition,
@NotNull final UnaryOperator
applyTablePermissions,
@@ -61,13 +62,44 @@ public SourcePartitionedTable(
final boolean refreshLocations,
final boolean refreshSizes,
@NotNull final Predicate locationKeyMatcher) {
+ this(constituentDefinition, applyTablePermissions, tableLocationProvider, refreshLocations, refreshSizes,
+ locationKeyMatcher, true);
+ }
+
+ /**
+ * Construct a {@link SourcePartitionedTable} from the supplied parameters.
+ *
+ * Note that refreshLocations and refreshSizes are distinct because there are use cases that supply an external
+ * RowSet and hence don't require size refreshes. Others might care for size refreshes, but only the
+ * initially-available set of locations.
+ *
+ * @param constituentDefinition The {@link TableDefinition} expected of constituent {@link Table tables}
+ * @param applyTablePermissions Function to apply in order to correctly restrict the visible result rows
+ * @param tableLocationProvider Source for table locations
+ * @param refreshLocations Whether the set of locations should be refreshed
+ * @param refreshSizes Whether the locations found should be refreshed
+ * @param locationKeyMatcher Function to filter desired location keys
+ * @param preCheckExistence Whether to pre-check the existence (non-null, non-zero size) of locations before
+ * including them in the result SourcePartitionedTable as constituents. It is recommended to set this to
+ * {@code false} if you will do subsequent filtering on the result, or if you are confident that all
+ * locations are valid.
+ */
+ public SourcePartitionedTable(
+ @NotNull final TableDefinition constituentDefinition,
+ @NotNull final UnaryOperator
applyTablePermissions,
+ @NotNull final TableLocationProvider tableLocationProvider,
+ final boolean refreshLocations,
+ final boolean refreshSizes,
+ @NotNull final Predicate locationKeyMatcher,
+ final boolean preCheckExistence) {
super(new UnderlyingTableMaintainer(
constituentDefinition,
applyTablePermissions,
tableLocationProvider,
refreshLocations,
refreshSizes,
- locationKeyMatcher).result(),
+ locationKeyMatcher,
+ preCheckExistence).result(),
Set.of(KEY_COLUMN_NAME),
true,
CONSTITUENT_COLUMN_NAME,
@@ -86,6 +118,8 @@ private static final class UnderlyingTableMaintainer extends ReferenceCountedLiv
private final Predicate locationKeyMatcher;
private final TrackingWritableRowSet resultRows;
+ private final String[] partitioningColumnNames;
+ private final WritableColumnSource>[] resultPartitionValues;
private final WritableColumnSource resultTableLocationKeys;
private final WritableColumnSource resultLocationTables;
private final QueryTable result;
@@ -106,7 +140,8 @@ private UnderlyingTableMaintainer(
@NotNull final TableLocationProvider tableLocationProvider,
final boolean refreshLocations,
final boolean refreshSizes,
- @NotNull final Predicate locationKeyMatcher) {
+ @NotNull final Predicate locationKeyMatcher,
+ final boolean preCheckExistence) {
super(false);
this.constituentDefinition = constituentDefinition;
@@ -116,10 +151,20 @@ private UnderlyingTableMaintainer(
this.locationKeyMatcher = locationKeyMatcher;
resultRows = RowSetFactory.empty().toTracking();
+ final List> partitioningColumns = constituentDefinition.getPartitioningColumns();
+ partitioningColumnNames = partitioningColumns.stream()
+ .map(ColumnDefinition::getName)
+ .toArray(String[]::new);
+ resultPartitionValues = partitioningColumns.stream()
+ .map(cd -> ArrayBackedColumnSource.getMemoryColumnSource(cd.getDataType(), cd.getComponentType()))
+ .toArray(WritableColumnSource[]::new);
resultTableLocationKeys = ArrayBackedColumnSource.getMemoryColumnSource(TableLocationKey.class, null);
resultLocationTables = ArrayBackedColumnSource.getMemoryColumnSource(Table.class, null);
- final Map> resultSources = new LinkedHashMap<>(2);
+ final Map> resultSources = new LinkedHashMap<>(partitioningColumns.size() + 2);
+ for (int pci = 0; pci < partitioningColumns.size(); ++pci) {
+ resultSources.put(partitioningColumnNames[pci], resultPartitionValues[pci]);
+ }
resultSources.put(KEY_COLUMN_NAME, resultTableLocationKeys);
resultSources.put(CONSTITUENT_COLUMN_NAME, resultLocationTables);
result = new QueryTable(resultRows, resultSources);
@@ -135,14 +180,17 @@ private UnderlyingTableMaintainer(
}
if (needToRefreshLocations) {
+ Arrays.stream(resultPartitionValues).forEach(ColumnSource::startTrackingPrevValues);
resultTableLocationKeys.startTrackingPrevValues();
resultLocationTables.startTrackingPrevValues();
subscriptionBuffer = new TableLocationSubscriptionBuffer(tableLocationProvider);
manage(subscriptionBuffer);
- pendingLocationStates = new IntrusiveDoublyLinkedQueue<>(
- IntrusiveDoublyLinkedNode.Adapter.getInstance());
+ pendingLocationStates = preCheckExistence
+ ? new IntrusiveDoublyLinkedQueue<>(
+ IntrusiveDoublyLinkedNode.Adapter.getInstance())
+ : null;
readyLocationStates = new IntrusiveDoublyLinkedQueue<>(
IntrusiveDoublyLinkedNode.Adapter.getInstance());
processNewLocationsUpdateRoot = new InstrumentedTableUpdateSource(
@@ -206,12 +254,17 @@ private RowSet sortAndAddLocations(@NotNull final Stream location
// Note that makeConstituentTable expects us to subsequently unmanage the TableLocations
unmanage(locations.sorted(Comparator.comparing(TableLocation::getKey)).peek(tl -> {
final long constituentRowKey = lastInsertedRowKey.incrementAndGet();
- final Table constituentTable = makeConstituentTable(tl);
+
+ for (int pci = 0; pci < resultPartitionValues.length; ++pci) {
+ addPartitionValue(tl.getKey(), partitioningColumnNames[pci], resultPartitionValues[pci],
+ constituentRowKey);
+ }
resultTableLocationKeys.ensureCapacity(constituentRowKey + 1);
resultTableLocationKeys.set(constituentRowKey, tl.getKey());
resultLocationTables.ensureCapacity(constituentRowKey + 1);
+ final Table constituentTable = makeConstituentTable(tl);
resultLocationTables.set(constituentRowKey, constituentTable);
if (result.isRefreshing()) {
@@ -223,6 +276,15 @@ private RowSet sortAndAddLocations(@NotNull final Stream location
: RowSetFactory.fromRange(initialLastRowKey + 1, lastInsertedRowKey.get());
}
+ private static void addPartitionValue(
+ @NotNull final TableLocationKey tableLocationKey,
+ @NotNull final String partitioningColumnName,
+ @NotNull final WritableColumnSource partitionValueColumn,
+ final long rowKey) {
+ partitionValueColumn.ensureCapacity(rowKey + 1);
+ partitionValueColumn.set(rowKey, tableLocationKey.getPartitionValue(partitioningColumnName));
+ }
+
private Table makeConstituentTable(@NotNull final TableLocation tableLocation) {
final PartitionAwareSourceTable constituent = new PartitionAwareSourceTable(
constituentDefinition,
@@ -280,19 +342,24 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
* population in STM ColumnSources.
*/
// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
- locationUpdate.getPendingAddedLocationKeys().stream()
+ final Stream newPendingLocations = locationUpdate.getPendingAddedLocationKeys()
+ .stream()
.map(LiveSupplier::get)
.filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation)
.peek(this::manage)
- .map(PendingLocationState::new)
- .forEach(pendingLocationStates::offer);
- for (final Iterator iter = pendingLocationStates.iterator(); iter.hasNext();) {
- final PendingLocationState pendingLocationState = iter.next();
- if (pendingLocationState.exists()) {
- iter.remove();
- readyLocationStates.offer(pendingLocationState);
+ .map(PendingLocationState::new);
+ if (pendingLocationStates != null) {
+ newPendingLocations.forEach(pendingLocationStates::offer);
+ for (final Iterator iter = pendingLocationStates.iterator(); iter.hasNext();) {
+ final PendingLocationState pendingLocationState = iter.next();
+ if (pendingLocationState.exists()) {
+ iter.remove();
+ readyLocationStates.offer(pendingLocationState);
+ }
}
+ } else {
+ newPendingLocations.forEach(readyLocationStates::offer);
}
if (readyLocationStates.isEmpty()) {
@@ -317,22 +384,24 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
}
// Iterate through the pending locations and remove any that are in the removed set.
- List toUnmanage = null;
- for (final Iterator iter = pendingLocationStates.iterator(); iter.hasNext();) {
- final PendingLocationState pendingLocationState = iter.next();
- if (relevantRemovedLocations.contains(pendingLocationState.location.getKey())) {
- iter.remove();
- // Release the state and plan to unmanage the location
- if (toUnmanage == null) {
- toUnmanage = new ArrayList<>();
+ if (pendingLocationStates != null) {
+ List toUnmanage = null;
+ for (final Iterator iter = pendingLocationStates.iterator(); iter.hasNext();) {
+ final PendingLocationState pendingLocationState = iter.next();
+ if (relevantRemovedLocations.contains(pendingLocationState.location.getKey())) {
+ iter.remove();
+ // Release the state and plan to unmanage the location
+ if (toUnmanage == null) {
+ toUnmanage = new ArrayList<>();
+ }
+ toUnmanage.add(pendingLocationState.release());
}
- toUnmanage.add(pendingLocationState.release());
}
- }
- if (toUnmanage != null) {
- unmanage(toUnmanage.stream());
- // noinspection UnusedAssignment
- toUnmanage = null;
+ if (toUnmanage != null) {
+ unmanage(toUnmanage.stream());
+ // noinspection UnusedAssignment
+ toUnmanage = null;
+ }
}
// At the end of the cycle we need to make sure we unmanage any removed constituents.
@@ -367,6 +436,7 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
this.removedLocationsCommitter.maybeActivate();
final WritableRowSet deletedRows = deleteBuilder.build();
+ Arrays.stream(resultPartitionValues).forEach(cs -> cs.setNull(deletedRows));
resultTableLocationKeys.setNull(deletedRows);
resultLocationTables.setNull(deletedRows);
return deletedRows;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/SingleTableLocationProvider.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/SingleTableLocationProvider.java
index 977c4d8f0ef..045c6c7a4da 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/SingleTableLocationProvider.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/SingleTableLocationProvider.java
@@ -17,7 +17,7 @@
/**
* A {@link TableLocationProvider} that provides access to exactly one, previously-known {@link TableLocation}. In
* contrast to {@link AbstractTableLocationProvider}, this class does not manage the liveness of the table location.
- * Managment must be done externally (as in {@link io.deephaven.engine.table.impl.SourcePartitionedTable}).
+ * Management must be done externally (as in {@link io.deephaven.engine.table.impl.SourcePartitionedTable}).
*/
public final class SingleTableLocationProvider implements TableLocationProvider {
private static final String IMPLEMENTATION_NAME = SingleTableLocationProvider.class.getSimpleName();
diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java
index b040d3a7a5f..68906192f8c 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java
@@ -104,7 +104,8 @@ private SourcePartitionedTable setUpData() {
tlp,
true,
true,
- l -> true);
+ l -> true,
+ true);
}
private void verifyStringColumnContents(Table table, String columnName, String... expectedValues) {
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java
index 7586ee831fc..e0bc8127aec 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java
@@ -120,10 +120,14 @@ public Table makeTable(@NotNull final TableKeyImpl tableKey, final boolean live)
*
* @param tableKey The table key
* @param live Whether the result should update as new data becomes available
+ * @param preCheckExistence Whether to include only locations observed to have non-empty data
* @return The {@link PartitionedTable}
*/
@ScriptApi
- public PartitionedTable makePartitionedTable(@NotNull final TableKeyImpl tableKey, final boolean live) {
+ public PartitionedTable makePartitionedTable(
+ @NotNull final TableKeyImpl tableKey,
+ final boolean live,
+ final boolean preCheckExistence) {
final TableLocationProviderImpl tableLocationProvider =
(TableLocationProviderImpl) getTableLocationProvider(tableKey);
return new SourcePartitionedTable(
@@ -132,7 +136,8 @@ public PartitionedTable makePartitionedTable(@NotNull final TableKeyImpl tableKe
tableLocationProvider,
live,
live,
- tlk -> true);
+ tlk -> true,
+ preCheckExistence);
}
/**
diff --git a/py/server/deephaven/experimental/table_data_service.py b/py/server/deephaven/experimental/table_data_service.py
index fdf6d677294..9d171b21881 100644
--- a/py/server/deephaven/experimental/table_data_service.py
+++ b/py/server/deephaven/experimental/table_data_service.py
@@ -260,7 +260,7 @@ def make_table(self, table_key: TableKey, *, refreshing: bool) -> Table:
Args:
table_key (TableKey): the table key
- refreshing (bool): whether the table is live or static
+ refreshing (bool): whether the table is live (True) or static (False)
Returns:
Table: a new table
@@ -274,13 +274,15 @@ def make_table(self, table_key: TableKey, *, refreshing: bool) -> Table:
except Exception as e:
raise DHError(e, message=f"failed to make a table for the key {table_key}") from e
- def make_partitioned_table(self, table_key: TableKey, *, refreshing: bool) -> PartitionedTable:
+ def make_partitioned_table(self, table_key: TableKey, *, refreshing: bool,
+ pre_check_existence: bool = False) -> PartitionedTable:
""" Creates a PartitionedTable backed by the backend service with the given table key.
Args:
table_key (TableKey): the table key
- refreshing (bool): whether the partitioned table is live or static
-
+ refreshing (bool): whether the partitioned table is live (True) or static (False)
+ pre_check_existence (bool): whether the partitioned table should verify that locations exist and are
+ non-empty before including them in the table
Returns:
PartitionedTable: a new partitioned table
@@ -289,7 +291,8 @@ def make_partitioned_table(self, table_key: TableKey, *, refreshing: bool) -> Pa
"""
j_table_key = _JTableKeyImpl(table_key)
try:
- return PartitionedTable(self._j_tbl_service.makePartitionedTable(j_table_key, refreshing))
+ return PartitionedTable(
+ self._j_tbl_service.makePartitionedTable(j_table_key, refreshing, pre_check_existence))
except Exception as e:
raise DHError(e, message=f"failed to make a partitioned table for the key {table_key}") from e