Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Ensure Flight SQL acquires shared lock #6462

Merged
merged 11 commits into from
Dec 11, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.backplane.grpc.ExportNotification;
import io.deephaven.proto.util.ByteHelper;
import io.deephaven.qst.table.NewTable;
import io.deephaven.qst.table.ParentsVisitor;
import io.deephaven.qst.table.TableSpec;
import io.deephaven.qst.table.TicketTable;
import io.deephaven.server.auth.AuthorizationProvider;
Expand Down Expand Up @@ -89,6 +91,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -651,22 +654,39 @@ private Table executeSqlQuery(SessionState session, String sql) {
final Map<String, Table> queryScopeTables =
(Map<String, Table>) (Map) queryScope.toMap(queryScope::unwrapObject, (n, t) -> t instanceof Table);
final TableSpec tableSpec = Sql.parseSql(sql, queryScopeTables, TicketTable::fromQueryScopeField, null);
// Note: we only technically need the lock if any of the source tables are refreshing, but that requires some
// deeper introspection. We will play it safe and take the shared lock regardless.
final Lock sharedLock = executionContext.getUpdateGraph().sharedLock();
sharedLock.lock();

// We could consider doing finer-grained sharedLock in the future; right now, taking it for the whole operation
// if any of the TicketTable sources are refreshing.
boolean hasRefreshingSource = false;
for (final TableSpec node : ParentsVisitor.reachable(List.of(tableSpec))) {
// Of the source tables, SQL can produce a NewTable or a TicketTable (until we introduce custom functions,
// where we could conceivable have it produce EmptyTable, TimeTable, etc).
if (!(node instanceof TicketTable)) {
continue;
}
final String variableName = new String(((TicketTable) node).ticket(), StandardCharsets.UTF_8).substring(2);
final Table sourceTable = queryScopeTables.get(variableName);
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
Assert.neqNull(sourceTable, "sourceTable");
if (sourceTable.isRefreshing()) {
hasRefreshingSource = true;
break;
}
}

// Note: this is doing io.deephaven.server.session.TicketResolver.Authorization.transform, but not
// io.deephaven.auth.ServiceAuthWiring
// TODO(deephaven-core#6307): Declarative server-side table execution logic that preserves authorization logic
try (final SafeCloseable ignored = LivenessScopeStack.open()) {
try (
final SafeCloseable ignored = LivenessScopeStack.open();
final SafeCloseable ignored1 = hasRefreshingSource
? executionContext.getUpdateGraph().sharedLock().lockCloseable()
: null) {
final Table table = tableSpec.logic()
.create(new TableCreatorScopeTickets(TableCreatorImpl.INSTANCE, scopeTicketResolver, session));
if (table.isRefreshing()) {
table.retainReference();
}
return table;
} finally {
sharedLock.unlock();
}
}

Expand Down