From 6caa6bba4967ba706f13db1ab555d28c7794dd9b Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Thu, 5 Dec 2024 12:44:47 -0800 Subject: [PATCH] handle Ryan's feedback --- .../server/flightsql/FlightSqlResolver.java | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java b/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java index d967ac4ade8..784d01190cc 100644 --- a/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java +++ b/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java @@ -33,6 +33,8 @@ import io.deephaven.io.logger.Logger; import io.deephaven.proto.backplane.grpc.ExportNotification; import io.deephaven.proto.util.ByteHelper; +import io.deephaven.qst.table.NewTable; +import io.deephaven.qst.table.ParentsVisitor; import io.deephaven.qst.table.TableSpec; import io.deephaven.qst.table.TicketTable; import io.deephaven.server.auth.AuthorizationProvider; @@ -89,6 +91,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.security.SecureRandom; import java.time.Duration; import java.time.Instant; @@ -651,22 +654,39 @@ private Table executeSqlQuery(SessionState session, String sql) { final Map queryScopeTables = (Map) (Map) queryScope.toMap(queryScope::unwrapObject, (n, t) -> t instanceof Table); final TableSpec tableSpec = Sql.parseSql(sql, queryScopeTables, TicketTable::fromQueryScopeField, null); - // Note: we only technically need the lock if any of the source tables are refreshing, but that requires some - // deeper introspection. We will play it safe and take the shared lock regardless. - final Lock sharedLock = executionContext.getUpdateGraph().sharedLock(); - sharedLock.lock(); + + // We could consider doing finer-grained sharedLock in the future; right now, taking it for the whole operation + // if any of the TicketTable sources are refreshing. + boolean hasRefreshingSource = false; + for (final TableSpec node : ParentsVisitor.reachable(List.of(tableSpec))) { + // Of the source tables, SQL can produce a NewTable or a TicketTable (until we introduce custom functions, + // where we could conceivable have it produce EmptyTable, TimeTable, etc). + if (!(node instanceof TicketTable)) { + continue; + } + final String variableName = new String(((TicketTable) node).ticket(), StandardCharsets.UTF_8).substring(2); + final Table sourceTable = queryScopeTables.get(variableName); + Assert.neqNull(sourceTable, "sourceTable"); + if (sourceTable.isRefreshing()) { + hasRefreshingSource = true; + break; + } + } + // Note: this is doing io.deephaven.server.session.TicketResolver.Authorization.transform, but not // io.deephaven.auth.ServiceAuthWiring // TODO(deephaven-core#6307): Declarative server-side table execution logic that preserves authorization logic - try (final SafeCloseable ignored = LivenessScopeStack.open()) { + try ( + final SafeCloseable ignored = LivenessScopeStack.open(); + final SafeCloseable ignored1 = hasRefreshingSource + ? executionContext.getUpdateGraph().sharedLock().lockCloseable() + : null) { final Table table = tableSpec.logic() .create(new TableCreatorScopeTickets(TableCreatorImpl.INSTANCE, scopeTicketResolver, session)); if (table.isRefreshing()) { table.retainReference(); } return table; - } finally { - sharedLock.unlock(); } }