Skip to content

Commit

Permalink
First crack at integrating a flag for turning off location existence …
Browse files Browse the repository at this point in the history
…checking in SourcePartitionedTable, as well as adding partitioning column values to the underlying partitioned Table
  • Loading branch information
rcaudy committed Jan 17, 2025
1 parent 47a50c3 commit 9c3ce12
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,52 @@ public class SourcePartitionedTable extends PartitionedTableImpl {
* @param refreshSizes Whether the sizes of the locations found should be refreshed
* @param locationKeyMatcher Function to filter desired location keys
*/
@Deprecated(forRemoval = true)
public SourcePartitionedTable(
@NotNull final TableDefinition constituentDefinition,
@NotNull final UnaryOperator<Table> applyTablePermissions,
@NotNull final TableLocationProvider tableLocationProvider,
final boolean refreshLocations,
final boolean refreshSizes,
@NotNull final Predicate<ImmutableTableLocationKey> locationKeyMatcher) {
// Delegate to the overload that additionally takes preCheckExistence, always passing true, so callers of
// this deprecated signature keep the location existence pre-check enabled.
this(constituentDefinition, applyTablePermissions, tableLocationProvider, refreshLocations, refreshSizes,
locationKeyMatcher, true);
}

/**
* Construct a {@link SourcePartitionedTable} from the supplied parameters.
* <p>
* Note that refreshLocations and refreshSizes are distinct because there are use cases that supply an external
* RowSet and hence don't require size refreshes. Others might care for size refreshes, but only the
* initially-available set of locations.
*
* @param constituentDefinition The {@link TableDefinition} expected of constituent {@link Table tables}
* @param applyTablePermissions Function to apply in order to correctly restrict the visible result rows
* @param tableLocationProvider Source for table locations
* @param refreshLocations Whether the set of locations should be refreshed
* @param refreshSizes Whether the sizes of the locations found should be refreshed
* @param locationKeyMatcher Function to filter desired location keys
* @param preCheckExistence Whether to pre-check the existence (non-null, non-zero size) of locations before
* including them in the result SourcePartitionedTable as constituents. It is recommended to set this to
* {@code false} if you will do subsequent filtering on the result, or if you are confident that all
* locations are valid.
*/
public SourcePartitionedTable(
@NotNull final TableDefinition constituentDefinition,
@NotNull final UnaryOperator<Table> applyTablePermissions,
@NotNull final TableLocationProvider tableLocationProvider,
final boolean refreshLocations,
final boolean refreshSizes,
@NotNull final Predicate<ImmutableTableLocationKey> locationKeyMatcher,
final boolean preCheckExistence) {
super(new UnderlyingTableMaintainer(
constituentDefinition,
applyTablePermissions,
tableLocationProvider,
refreshLocations,
refreshSizes,
locationKeyMatcher).result(),
locationKeyMatcher,
preCheckExistence).result(),
Set.of(KEY_COLUMN_NAME),
true,
CONSTITUENT_COLUMN_NAME,
Expand All @@ -86,6 +118,8 @@ private static final class UnderlyingTableMaintainer extends ReferenceCountedLiv
private final Predicate<ImmutableTableLocationKey> locationKeyMatcher;

private final TrackingWritableRowSet resultRows;
private final String[] partitioningColumnNames;
private final WritableColumnSource<?>[] resultPartitionValues;
private final WritableColumnSource<TableLocationKey> resultTableLocationKeys;
private final WritableColumnSource<Table> resultLocationTables;
private final QueryTable result;
Expand All @@ -106,7 +140,8 @@ private UnderlyingTableMaintainer(
@NotNull final TableLocationProvider tableLocationProvider,
final boolean refreshLocations,
final boolean refreshSizes,
@NotNull final Predicate<ImmutableTableLocationKey> locationKeyMatcher) {
@NotNull final Predicate<ImmutableTableLocationKey> locationKeyMatcher,
final boolean preCheckExistence) {
super(false);

this.constituentDefinition = constituentDefinition;
Expand All @@ -116,10 +151,20 @@ private UnderlyingTableMaintainer(
this.locationKeyMatcher = locationKeyMatcher;

resultRows = RowSetFactory.empty().toTracking();
final List<ColumnDefinition<?>> partitioningColumns = constituentDefinition.getPartitioningColumns();
partitioningColumnNames = partitioningColumns.stream()
.map(ColumnDefinition::getName)
.toArray(String[]::new);
resultPartitionValues = partitioningColumns.stream()
.map(cd -> ArrayBackedColumnSource.getMemoryColumnSource(cd.getDataType(), cd.getComponentType()))
.toArray(WritableColumnSource[]::new);
resultTableLocationKeys = ArrayBackedColumnSource.getMemoryColumnSource(TableLocationKey.class, null);
resultLocationTables = ArrayBackedColumnSource.getMemoryColumnSource(Table.class, null);

final Map<String, ColumnSource<?>> resultSources = new LinkedHashMap<>(2);
final Map<String, ColumnSource<?>> resultSources = new LinkedHashMap<>(partitioningColumns.size() + 2);
for (int pci = 0; pci < partitioningColumns.size(); ++pci) {
resultSources.put(partitioningColumnNames[pci], resultPartitionValues[pci]);
}
resultSources.put(KEY_COLUMN_NAME, resultTableLocationKeys);
resultSources.put(CONSTITUENT_COLUMN_NAME, resultLocationTables);
result = new QueryTable(resultRows, resultSources);
Expand All @@ -135,14 +180,17 @@ private UnderlyingTableMaintainer(
}

if (needToRefreshLocations) {
Arrays.stream(resultPartitionValues).forEach(ColumnSource::startTrackingPrevValues);
resultTableLocationKeys.startTrackingPrevValues();
resultLocationTables.startTrackingPrevValues();

subscriptionBuffer = new TableLocationSubscriptionBuffer(tableLocationProvider);
manage(subscriptionBuffer);

pendingLocationStates = new IntrusiveDoublyLinkedQueue<>(
IntrusiveDoublyLinkedNode.Adapter.<PendingLocationState>getInstance());
pendingLocationStates = preCheckExistence
? new IntrusiveDoublyLinkedQueue<>(
IntrusiveDoublyLinkedNode.Adapter.<PendingLocationState>getInstance())
: null;
readyLocationStates = new IntrusiveDoublyLinkedQueue<>(
IntrusiveDoublyLinkedNode.Adapter.<PendingLocationState>getInstance());
processNewLocationsUpdateRoot = new InstrumentedTableUpdateSource(
Expand Down Expand Up @@ -206,12 +254,17 @@ private RowSet sortAndAddLocations(@NotNull final Stream<TableLocation> location
// Note that makeConstituentTable expects us to subsequently unmanage the TableLocations
unmanage(locations.sorted(Comparator.comparing(TableLocation::getKey)).peek(tl -> {
final long constituentRowKey = lastInsertedRowKey.incrementAndGet();
final Table constituentTable = makeConstituentTable(tl);

for (int pci = 0; pci < resultPartitionValues.length; ++pci) {
addPartitionValue(tl.getKey(), partitioningColumnNames[pci], resultPartitionValues[pci],
constituentRowKey);
}

resultTableLocationKeys.ensureCapacity(constituentRowKey + 1);
resultTableLocationKeys.set(constituentRowKey, tl.getKey());

resultLocationTables.ensureCapacity(constituentRowKey + 1);
final Table constituentTable = makeConstituentTable(tl);
resultLocationTables.set(constituentRowKey, constituentTable);

if (result.isRefreshing()) {
Expand All @@ -223,6 +276,15 @@ private RowSet sortAndAddLocations(@NotNull final Stream<TableLocation> location
: RowSetFactory.fromRange(initialLastRowKey + 1, lastInsertedRowKey.get());
}

/**
* Write the partition value that {@code tableLocationKey} reports for {@code partitioningColumnName} into
* {@code partitionValueColumn} at {@code rowKey}, growing the column's capacity first.
*
* @param tableLocationKey The location key whose {@code getPartitionValue} result is stored
* @param partitioningColumnName The name of the partitioning column to look up on the key
* @param partitionValueColumn The writable column source that receives the value
* @param rowKey The row key to write at
* @param <T> The element type of {@code partitionValueColumn}. NOTE(review): nothing in this method checks that
*        the key's partition value is actually of this type; presumably the caller's column definitions
*        guarantee it - confirm.
*/
private static <T> void addPartitionValue(
@NotNull final TableLocationKey tableLocationKey,
@NotNull final String partitioningColumnName,
@NotNull final WritableColumnSource<T> partitionValueColumn,
final long rowKey) {
// Make room for rowKey (capacity is rowKey + 1) before writing to it.
partitionValueColumn.ensureCapacity(rowKey + 1);
partitionValueColumn.set(rowKey, tableLocationKey.getPartitionValue(partitioningColumnName));
}

private Table makeConstituentTable(@NotNull final TableLocation tableLocation) {
final PartitionAwareSourceTable constituent = new PartitionAwareSourceTable(
constituentDefinition,
Expand Down Expand Up @@ -280,19 +342,24 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
* population in STM ColumnSources.
*/
// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
locationUpdate.getPendingAddedLocationKeys().stream()
final Stream<PendingLocationState> newPendingLocations = locationUpdate.getPendingAddedLocationKeys()
.stream()
.map(LiveSupplier::get)
.filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation)
.peek(this::manage)
.map(PendingLocationState::new)
.forEach(pendingLocationStates::offer);
for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
final PendingLocationState pendingLocationState = iter.next();
if (pendingLocationState.exists()) {
iter.remove();
readyLocationStates.offer(pendingLocationState);
.map(PendingLocationState::new);
if (pendingLocationStates != null) {
newPendingLocations.forEach(pendingLocationStates::offer);
for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
final PendingLocationState pendingLocationState = iter.next();
if (pendingLocationState.exists()) {
iter.remove();
readyLocationStates.offer(pendingLocationState);
}
}
} else {
newPendingLocations.forEach(readyLocationStates::offer);
}

if (readyLocationStates.isEmpty()) {
Expand All @@ -317,22 +384,24 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
}

// Iterate through the pending locations and remove any that are in the removed set.
List<LivenessReferent> toUnmanage = null;
for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
final PendingLocationState pendingLocationState = iter.next();
if (relevantRemovedLocations.contains(pendingLocationState.location.getKey())) {
iter.remove();
// Release the state and plan to unmanage the location
if (toUnmanage == null) {
toUnmanage = new ArrayList<>();
if (pendingLocationStates != null) {
List<LivenessReferent> toUnmanage = null;
for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
final PendingLocationState pendingLocationState = iter.next();
if (relevantRemovedLocations.contains(pendingLocationState.location.getKey())) {
iter.remove();
// Release the state and plan to unmanage the location
if (toUnmanage == null) {
toUnmanage = new ArrayList<>();
}
toUnmanage.add(pendingLocationState.release());
}
toUnmanage.add(pendingLocationState.release());
}
}
if (toUnmanage != null) {
unmanage(toUnmanage.stream());
// noinspection UnusedAssignment
toUnmanage = null;
if (toUnmanage != null) {
unmanage(toUnmanage.stream());
// noinspection UnusedAssignment
toUnmanage = null;
}
}

// At the end of the cycle we need to make sure we unmanage any removed constituents.
Expand Down Expand Up @@ -367,6 +436,7 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
this.removedLocationsCommitter.maybeActivate();

final WritableRowSet deletedRows = deleteBuilder.build();
Arrays.stream(resultPartitionValues).forEach(cs -> cs.setNull(deletedRows));
resultTableLocationKeys.setNull(deletedRows);
resultLocationTables.setNull(deletedRows);
return deletedRows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
/**
* A {@link TableLocationProvider} that provides access to exactly one, previously-known {@link TableLocation}. In
* contrast to {@link AbstractTableLocationProvider}, this class does not manage the liveness of the table location.
* Managment must be done externally (as in {@link io.deephaven.engine.table.impl.SourcePartitionedTable}).
* Management must be done externally (as in {@link io.deephaven.engine.table.impl.SourcePartitionedTable}).
*/
public final class SingleTableLocationProvider implements TableLocationProvider {
private static final String IMPLEMENTATION_NAME = SingleTableLocationProvider.class.getSimpleName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ private SourcePartitionedTable setUpData() {
tlp,
true,
true,
l -> true);
l -> true,
true);
}

private void verifyStringColumnContents(Table table, String columnName, String... expectedValues) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,14 @@ public Table makeTable(@NotNull final TableKeyImpl tableKey, final boolean live)
*
* @param tableKey The table key
* @param live Whether the result should update as new data becomes available
* @param preCheckExistence Whether to include only locations observed to have non-empty data
* @return The {@link PartitionedTable}
*/
@ScriptApi
public PartitionedTable makePartitionedTable(@NotNull final TableKeyImpl tableKey, final boolean live) {
public PartitionedTable makePartitionedTable(
@NotNull final TableKeyImpl tableKey,
final boolean live,
final boolean preCheckExistence) {
final TableLocationProviderImpl tableLocationProvider =
(TableLocationProviderImpl) getTableLocationProvider(tableKey);
return new SourcePartitionedTable(
Expand All @@ -132,7 +136,8 @@ public PartitionedTable makePartitionedTable(@NotNull final TableKeyImpl tableKe
tableLocationProvider,
live,
live,
tlk -> true);
tlk -> true,
preCheckExistence);
}

/**
Expand Down
13 changes: 8 additions & 5 deletions py/server/deephaven/experimental/table_data_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ def make_table(self, table_key: TableKey, *, refreshing: bool) -> Table:
Args:
table_key (TableKey): the table key
refreshing (bool): whether the table is live or static
refreshing (bool): whether the table is live (True) or static (False)
Returns:
Table: a new table
Expand All @@ -274,13 +274,15 @@ def make_table(self, table_key: TableKey, *, refreshing: bool) -> Table:
except Exception as e:
raise DHError(e, message=f"failed to make a table for the key {table_key}") from e

def make_partitioned_table(self, table_key: TableKey, *, refreshing: bool) -> PartitionedTable:
def make_partitioned_table(self, table_key: TableKey, *, refreshing: bool,
pre_check_existence: bool = False) -> PartitionedTable:
""" Creates a PartitionedTable backed by the backend service with the given table key.
Args:
table_key (TableKey): the table key
refreshing (bool): whether the partitioned table is live or static
refreshing (bool): whether the partitioned table is live (True) or static (False)
pre_check_existence (bool): whether the partitioned table should verify that locations exist and are
non-empty before including them in the table
Returns:
PartitionedTable: a new partitioned table
Expand All @@ -289,7 +291,8 @@ def make_partitioned_table(self, table_key: TableKey, *, refreshing: bool) -> Pa
"""
j_table_key = _JTableKeyImpl(table_key)
try:
return PartitionedTable(self._j_tbl_service.makePartitionedTable(j_table_key, refreshing))
return PartitionedTable(
self._j_tbl_service.makePartitionedTable(j_table_key, refreshing, pre_check_existence))
except Exception as e:
raise DHError(e, message=f"failed to make a partitioned table for the key {table_key}") from e

Expand Down

0 comments on commit 9c3ce12

Please sign in to comment.