From 7c259c88a4ffe03599b0fc3cc7a21e8f55520aff Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Wed, 4 Dec 2024 11:34:05 -0800 Subject: [PATCH] fix: Ensure Flight SQL acquires shared lock --- .../server/flightsql/FlightSqlResolver.java | 10 +++- .../server/flightsql/FlightSqlTest.java | 52 +++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java b/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java index 2f103832a0a..d967ac4ade8 100644 --- a/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java +++ b/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java @@ -99,6 +99,7 @@ import java.util.Objects; import java.util.PrimitiveIterator; import java.util.Set; +import java.util.concurrent.locks.Lock; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -644,11 +645,16 @@ interface TicketHandlerReleasable extends TicketHandler { private Table executeSqlQuery(SessionState session, String sql) { // See SQLTODO(catalog-reader-implementation) - final QueryScope queryScope = ExecutionContext.getContext().getQueryScope(); + final ExecutionContext executionContext = ExecutionContext.getContext(); + final QueryScope queryScope = executionContext.getQueryScope(); // noinspection unchecked,rawtypes final Map queryScopeTables = (Map) (Map) queryScope.toMap(queryScope::unwrapObject, (n, t) -> t instanceof Table); final TableSpec tableSpec = Sql.parseSql(sql, queryScopeTables, TicketTable::fromQueryScopeField, null); + // Note: we only technically need the lock if any of the source tables are refreshing, but that requires some + // deeper introspection. We will play it safe and take the shared lock regardless. + final Lock sharedLock = executionContext.getUpdateGraph().sharedLock(); + sharedLock.lock(); // Note: this is doing io.deephaven.server.session.TicketResolver.Authorization.transform, but not // io.deephaven.auth.ServiceAuthWiring // TODO(deephaven-core#6307): Declarative server-side table execution logic that preserves authorization logic @@ -659,6 +665,8 @@ private Table executeSqlQuery(SessionState session, String sql) { table.retainReference(); } return table; + } finally { + sharedLock.unlock(); } } diff --git a/extensions/flight-sql/src/test/java/io/deephaven/server/flightsql/FlightSqlTest.java b/extensions/flight-sql/src/test/java/io/deephaven/server/flightsql/FlightSqlTest.java index d20d8d50c9e..b761c7aa039 100644 --- a/extensions/flight-sql/src/test/java/io/deephaven/server/flightsql/FlightSqlTest.java +++ b/extensions/flight-sql/src/test/java/io/deephaven/server/flightsql/FlightSqlTest.java @@ -11,6 +11,7 @@ import dagger.Component; import dagger.Module; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; @@ -86,6 +87,8 @@ import javax.inject.Singleton; import java.io.PrintStream; import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; @@ -893,6 +896,55 @@ public void unknownAction() { actionNoResolver(() -> doAction(action), type); } + @Test + public void refreshingTableTest() throws Exception { + // Set a start time so we can test we get out the expected number of rows + final Instant startTime = Instant.now().minus(Duration.ofHours(1)); + final Table tt1 = TableTools.timeTableBuilder() + .startTime(startTime) + .period(Duration.ofSeconds(1)) + .build() + .view("Timestamp1=Timestamp", "Id=ii % 11") + .lastBy("Id"); + final Table tt2 = TableTools.timeTableBuilder() + .startTime(startTime) + .period(Duration.ofSeconds(5)) + .build() + .view("Timestamp2=Timestamp", "Id=ii % 11") + .lastBy("Id"); + final QueryScope queryScope = ExecutionContext.getContext().getQueryScope(); + queryScope.putParam("my_table_1", tt1); + queryScope.putParam("my_table_2", tt2); + try { + final String query = "SELECT\n" + + " my_table_1.Id,\n" + + " my_table_1.Timestamp1,\n" + + " my_table_2.Timestamp2\n" + + "FROM\n" + + " my_table_1\n" + + " INNER JOIN my_table_2 ON my_table_1.Id = my_table_2.Id"; + { + final FlightInfo info = flightSqlClient.execute(query); + consume(info, 1, 11, false); + } + { + final PreparedStatement prepared = flightSqlClient.prepare(query); + { + final FlightInfo info = prepared.execute(); + consume(info, 1, 11, false); + } + { + final FlightInfo info = prepared.execute(); + consume(info, 1, 11, false); + } + } + } finally { + queryScope.putParam("my_table_2", null); + queryScope.putParam("my_table_1", null); + } + + } + private Result doAction(Action action) { final Iterator it = flightClient.doAction(action); if (!it.hasNext()) {