Skip to content

Commit

Permalink
fix: Ensure Flight SQL acquires shared lock
Browse files Browse the repository at this point in the history
  • Loading branch information
devinrsmith committed Dec 4, 2024
1 parent 3827ba0 commit 7c259c8
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -644,11 +645,16 @@ interface TicketHandlerReleasable extends TicketHandler {

private Table executeSqlQuery(SessionState session, String sql) {
// See SQLTODO(catalog-reader-implementation)
final QueryScope queryScope = ExecutionContext.getContext().getQueryScope();
final ExecutionContext executionContext = ExecutionContext.getContext();
final QueryScope queryScope = executionContext.getQueryScope();
// noinspection unchecked,rawtypes
final Map<String, Table> queryScopeTables =
(Map<String, Table>) (Map) queryScope.toMap(queryScope::unwrapObject, (n, t) -> t instanceof Table);
final TableSpec tableSpec = Sql.parseSql(sql, queryScopeTables, TicketTable::fromQueryScopeField, null);
// Note: we only technically need the lock if any of the source tables are refreshing, but that requires some
// deeper introspection. We will play it safe and take the shared lock regardless.
final Lock sharedLock = executionContext.getUpdateGraph().sharedLock();
sharedLock.lock();
// Note: this is doing io.deephaven.server.session.TicketResolver.Authorization.transform, but not
// io.deephaven.auth.ServiceAuthWiring
// TODO(deephaven-core#6307): Declarative server-side table execution logic that preserves authorization logic
Expand All @@ -659,6 +665,8 @@ private Table executeSqlQuery(SessionState session, String sql) {
table.retainReference();
}
return table;
} finally {
sharedLock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import dagger.Component;
import dagger.Module;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
Expand Down Expand Up @@ -86,6 +87,8 @@
import javax.inject.Singleton;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
Expand Down Expand Up @@ -893,6 +896,55 @@ public void unknownAction() {
actionNoResolver(() -> doAction(action), type);
}

@Test
public void refreshingTableTest() throws Exception {
// Set a start time so we can test we get out the expected number of rows
final Instant startTime = Instant.now().minus(Duration.ofHours(1));
final Table tt1 = TableTools.timeTableBuilder()
.startTime(startTime)
.period(Duration.ofSeconds(1))
.build()
.view("Timestamp1=Timestamp", "Id=ii % 11")
.lastBy("Id");
final Table tt2 = TableTools.timeTableBuilder()
.startTime(startTime)
.period(Duration.ofSeconds(5))
.build()
.view("Timestamp2=Timestamp", "Id=ii % 11")
.lastBy("Id");
final QueryScope queryScope = ExecutionContext.getContext().getQueryScope();
queryScope.putParam("my_table_1", tt1);
queryScope.putParam("my_table_2", tt2);
try {
final String query = "SELECT\n" +
" my_table_1.Id,\n" +
" my_table_1.Timestamp1,\n" +
" my_table_2.Timestamp2\n" +
"FROM\n" +
" my_table_1\n" +
" INNER JOIN my_table_2 ON my_table_1.Id = my_table_2.Id";
{
final FlightInfo info = flightSqlClient.execute(query);
consume(info, 1, 11, false);
}
{
final PreparedStatement prepared = flightSqlClient.prepare(query);
{
final FlightInfo info = prepared.execute();
consume(info, 1, 11, false);
}
{
final FlightInfo info = prepared.execute();
consume(info, 1, 11, false);
}
}
} finally {
queryScope.putParam("my_table_2", null);
queryScope.putParam("my_table_1", null);
}

}

private Result doAction(Action action) {
final Iterator<Result> it = flightClient.doAction(action);
if (!it.hasNext()) {
Expand Down

0 comments on commit 7c259c8

Please sign in to comment.