Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Ensure Flight SQL acquires shared lock #6462

Merged
merged 11 commits into from
Dec 11, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.backplane.grpc.ExportNotification;
import io.deephaven.proto.util.ByteHelper;
import io.deephaven.qst.table.ParentsVisitor;
import io.deephaven.qst.table.TableSpec;
import io.deephaven.qst.table.TicketTable;
import io.deephaven.server.auth.AuthorizationProvider;
Expand Down Expand Up @@ -644,15 +645,39 @@ interface TicketHandlerReleasable extends TicketHandler {

private Table executeSqlQuery(SessionState session, String sql) {
// See SQLTODO(catalog-reader-implementation)
final QueryScope queryScope = ExecutionContext.getContext().getQueryScope();
// noinspection unchecked,rawtypes
final ExecutionContext executionContext = ExecutionContext.getContext();
final QueryScope queryScope = executionContext.getQueryScope();
final Map<String, Table> queryScopeTables =
(Map<String, Table>) (Map) queryScope.toMap(queryScope::unwrapObject, (n, t) -> t instanceof Table);
queryScope.toMap(o -> queryScopeAuthorizedTableMapper(queryScope, o), (n, t) -> t != null);
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
final TableSpec tableSpec = Sql.parseSql(sql, queryScopeTables, TicketTable::fromQueryScopeField, null);

// We could consider doing finer-grained sharedLock in the future; right now, taking it for the whole operation
// if any of the TicketTable sources are refreshing.
boolean hasRefreshingSource = false;
for (final TableSpec node : ParentsVisitor.reachable(List.of(tableSpec))) {
// Of the source tables, SQL can produce a NewTable or a TicketTable (until we introduce custom functions,
// where we could conceivable have it produce EmptyTable, TimeTable, etc).
if (!(node instanceof TicketTable)) {
continue;
}
// We know that all TicketTables currently produced by Flight SQL will be global scope tickets
final Table sourceTable = scopeTicketResolver
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
.<Table>resolve(session, ByteBuffer.wrap(((TicketTable) node).ticket()), "table")
.get();
if (sourceTable.isRefreshing()) {
hasRefreshingSource = true;
break;
}
}

// Note: this is doing io.deephaven.server.session.TicketResolver.Authorization.transform, but not
// io.deephaven.auth.ServiceAuthWiring
// TODO(deephaven-core#6307): Declarative server-side table execution logic that preserves authorization logic
try (final SafeCloseable ignored = LivenessScopeStack.open()) {
try (
final SafeCloseable ignored = LivenessScopeStack.open();
final SafeCloseable ignored1 = hasRefreshingSource
? executionContext.getUpdateGraph().sharedLock().lockCloseable()
: null) {
final Table table = tableSpec.logic()
.create(new TableCreatorScopeTickets(TableCreatorImpl.INSTANCE, scopeTicketResolver, session));
if (table.isRefreshing()) {
Expand All @@ -662,6 +687,22 @@ private Table executeSqlQuery(SessionState session, String sql) {
}
}

private Table queryScopeTableMapper(QueryScope queryScope, Object object) {
if (object == null) {
return null;
}
object = queryScope.unwrapObject(object);
if (!(object instanceof Table)) {
return null;
}
return (Table) object;
}

private Table queryScopeAuthorizedTableMapper(QueryScope queryScope, Object object) {
final Table table = queryScopeTableMapper(queryScope, object);
return table == null ? null : authorization.transform(table);
}

/**
* This is the base class for "easy" commands; that is, commands that have a fixed schema and are cheap to
* initialize.
Expand Down Expand Up @@ -1319,8 +1360,11 @@ private Table getTablesEmpty(boolean includeSchema, Map<String, Object> attribut
private Table getTables(boolean includeSchema, QueryScope queryScope, Map<String, Object> attributes,
Predicate<String> tableNameFilter) {
Objects.requireNonNull(attributes);
final Map<String, Table> queryScopeTables =
(Map<String, Table>) (Map) queryScope.toMap(queryScope::unwrapObject, (n, t) -> t instanceof Table);
// Note: _not_ using queryScopeAuthorizedTable mapper; we can have a more efficient implementation when
// !includeSchema that only needs to check authorization.isDeniedAccess.
final Map<String, Table> queryScopeTables = queryScope.toMap(
o -> queryScopeTableMapper(queryScope, o),
(tableName, table) -> table != null && tableNameFilter.test(tableName));
final int size = queryScopeTables.size();
final String[] catalogNames = new String[size];
final String[] dbSchemaNames = new String[size];
Expand All @@ -1330,9 +1374,6 @@ private Table getTables(boolean includeSchema, QueryScope queryScope, Map<String
int count = 0;
for (Entry<String, Table> e : queryScopeTables.entrySet()) {
final String tableName = e.getKey();
if (!tableNameFilter.test(tableName)) {
continue;
}
final Schema schema;
if (includeSchema) {
final Table table = authorization.transform(e.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import dagger.Component;
import dagger.Module;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
Expand Down Expand Up @@ -86,6 +87,8 @@
import javax.inject.Singleton;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
Expand Down Expand Up @@ -893,6 +896,55 @@ public void unknownAction() {
actionNoResolver(() -> doAction(action), type);
}

@Test
public void refreshingTableTest() throws Exception {
// Set a start time so we can test we get out the expected number of rows
final Instant startTime = Instant.now().minus(Duration.ofHours(1));
final Table tt1 = TableTools.timeTableBuilder()
.startTime(startTime)
.period(Duration.ofSeconds(1))
.build()
.view("Timestamp1=Timestamp", "Id=ii % 11")
.lastBy("Id");
final Table tt2 = TableTools.timeTableBuilder()
.startTime(startTime)
.period(Duration.ofSeconds(5))
.build()
.view("Timestamp2=Timestamp", "Id=ii % 11")
.lastBy("Id");
final QueryScope queryScope = ExecutionContext.getContext().getQueryScope();
queryScope.putParam("my_table_1", tt1);
queryScope.putParam("my_table_2", tt2);
try {
final String query = "SELECT\n" +
" my_table_1.Id,\n" +
" my_table_1.Timestamp1,\n" +
" my_table_2.Timestamp2\n" +
"FROM\n" +
" my_table_1\n" +
" INNER JOIN my_table_2 ON my_table_1.Id = my_table_2.Id";
{
final FlightInfo info = flightSqlClient.execute(query);
consume(info, 1, 11, false);
}
{
final PreparedStatement prepared = flightSqlClient.prepare(query);
{
final FlightInfo info = prepared.execute();
consume(info, 1, 11, false);
}
{
final FlightInfo info = prepared.execute();
consume(info, 1, 11, false);
}
}
} finally {
queryScope.putParam("my_table_2", null);
queryScope.putParam("my_table_1", null);
}

}

private Result doAction(Action action) {
final Iterator<Result> it = flightClient.doAction(action);
if (!it.hasNext()) {
Expand Down