Skip to content

Commit

Permalink
test: Add Flight SQL ADBC Java tests (#6432)
Browse files Browse the repository at this point in the history
This is adding Flight SQL Java ADBC tests, mainly in support of a
documentation effort on how to connect different clients to Deephaven
Flight SQL, see
deephaven/deephaven-docs-community#365
  • Loading branch information
devinrsmith authored Jan 13, 2025
1 parent 7a9526c commit b61abcf
Show file tree
Hide file tree
Showing 7 changed files with 392 additions and 1 deletion.
37 changes: 36 additions & 1 deletion extensions/flight-sql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,21 @@ plugins {
description = 'The Deephaven Flight SQL library'

sourceSets {
adbcTest {
compileClasspath += sourceSets.main.output
runtimeClasspath += sourceSets.main.output
}

jdbcTest {
compileClasspath += sourceSets.main.output
runtimeClasspath += sourceSets.main.output
}
}

configurations {
adbcTestImplementation.extendsFrom implementation
adbcTestRuntimeOnly.extendsFrom runtimeOnly

jdbcTestImplementation.extendsFrom implementation
jdbcTestRuntimeOnly.extendsFrom runtimeOnly
}
Expand Down Expand Up @@ -42,8 +50,23 @@ dependencies {
testRuntimeOnly project(':log-to-slf4j')
testRuntimeOnly libs.slf4j.simple

// ADBC testing needs an actually server instance bound to a port because it can only connect over ADBC URIs like
// grpc://localhost:10000
adbcTestImplementation project(':server-jetty')
adbcTestImplementation libs.adbc.flight.sql

adbcTestImplementation project(':server-test-utils')
adbcTestAnnotationProcessor libs.dagger.compiler
adbcTestImplementation libs.assertj
adbcTestImplementation platform(libs.junit.bom)
adbcTestImplementation libs.junit.jupiter
adbcTestRuntimeOnly libs.junit.platform.launcher
adbcTestRuntimeOnly libs.junit.vintage.engine
adbcTestRuntimeOnly project(':log-to-slf4j')
adbcTestRuntimeOnly libs.slf4j.simple

// JDBC testing needs an actually server instance bound to a port because it can only connect over JDBC URIs like
// jdbc:arrow-flight-sql://localhost:1000.
// jdbc:arrow-flight-sql://localhost:10000.
jdbcTestImplementation project(':server-jetty')
jdbcTestRuntimeOnly libs.arrow.flight.sql.jdbc

Expand All @@ -62,6 +85,17 @@ test {
useJUnitPlatform()
}

def adbcTest = tasks.register('adbcTest', Test) {
description = 'Runs ADBC tests.'
group = 'verification'

testClassesDirs = sourceSets.adbcTest.output.classesDirs
classpath = sourceSets.adbcTest.runtimeClasspath
shouldRunAfter test

useJUnitPlatform()
}

def jdbcTest = tasks.register('jdbcTest', Test) {
description = 'Runs JDBC tests.'
group = 'verification'
Expand All @@ -73,6 +107,7 @@ def jdbcTest = tasks.register('jdbcTest', Test) {
useJUnitPlatform()
}

check.dependsOn adbcTest
check.dependsOn jdbcTest

apply plugin: 'io.deephaven.java-open-nio'
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.server;

import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.io.logger.LogBuffer;
import io.deephaven.io.logger.LogBufferGlobal;
import io.deephaven.server.runner.GrpcServer;
import io.deephaven.server.runner.MainHelper;
import io.deephaven.util.SafeCloseable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Timeout;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

@Timeout(30)
public abstract class DeephavenServerTestBase {

public interface TestComponent {

GrpcServer server();

ExecutionContext executionContext();
}

protected TestComponent component;

private LogBuffer logBuffer;
private SafeCloseable executionContext;
private GrpcServer server;
protected int localPort;

protected abstract TestComponent component();

@BeforeAll
static void setupOnce() throws IOException {
MainHelper.bootstrapProjectDirectories();
}

@BeforeEach
void setup() throws IOException {
logBuffer = new LogBuffer(128);
LogBufferGlobal.setInstance(logBuffer);
component = component();
executionContext = component.executionContext().open();
server = component.server();
server.start();
localPort = server.getPort();
}

@AfterEach
void tearDown() throws InterruptedException {
server.stopWithTimeout(10, TimeUnit.SECONDS);
server.join();
executionContext.close();
LogBufferGlobal.clear(logBuffer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.server.flightsql;

import io.deephaven.server.DeephavenServerTestBase;
import org.apache.arrow.adbc.core.AdbcConnection;
import org.apache.arrow.adbc.core.AdbcDatabase;
import org.apache.arrow.adbc.core.AdbcDriver;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatement;
import org.apache.arrow.adbc.driver.flightsql.FlightSqlConnectionProperties;
import org.apache.arrow.adbc.driver.flightsql.FlightSqlDriverFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

public abstract class FlightSqlAdbcTestBase extends DeephavenServerTestBase {

private static final Map<String, String> DEEPHAVEN_INT = Map.of(
"deephaven:isSortable", "true",
"deephaven:isRowStyle", "false",
"deephaven:isPartitioning", "false",
"deephaven:type", "int",
"deephaven:isNumberFormat", "false",
"deephaven:isStyle", "false",
"deephaven:isDateFormat", "false");

BufferAllocator allocator;
AdbcDatabase database;
AdbcConnection connection;

@BeforeEach
void setUp() throws AdbcException {
final Map<String, Object> options = new HashMap<>();
AdbcDriver.PARAM_URI.set(options, String.format("grpc://localhost:%d", localPort));
FlightSqlConnectionProperties.WITH_COOKIE_MIDDLEWARE.set(options, true);
options.put(FlightSqlConnectionProperties.RPC_CALL_HEADER_PREFIX + "Authorization", "Anonymous");
options.put(FlightSqlConnectionProperties.RPC_CALL_HEADER_PREFIX + "x-deephaven-auth-cookie-request", "true");
allocator = new RootAllocator();
database = new FlightSqlDriverFactory().getDriver(allocator).open(options);
connection = database.connect();
}

@AfterEach
void tearDown() throws Exception {
connection.close();
database.close();
allocator.close();
}

@Test
void executeSchema() throws Exception {
final Schema expectedSchema = new Schema(List
.of(new Field("Foo", new FieldType(true, Types.MinorType.INT.getType(), null, DEEPHAVEN_INT), null)));
try (final AdbcStatement statement = connection.createStatement()) {
statement.setSqlQuery("SELECT 42 as Foo");
assertThat(statement.executeSchema()).isEqualTo(expectedSchema);
}
}

@Test
void executeQuery() throws Exception {
final Schema expectedSchema = new Schema(List
.of(new Field("Foo", new FieldType(true, Types.MinorType.INT.getType(), null, DEEPHAVEN_INT), null)));
try (final AdbcStatement statement = connection.createStatement()) {
statement.setSqlQuery("SELECT 42 as Foo");
try (final AdbcStatement.QueryResult result = statement.executeQuery()) {
final ArrowReader reader = result.getReader();
assertThat(reader.loadNextBatch()).isTrue();
final VectorSchemaRoot root = reader.getVectorSchemaRoot();
assertThat(root.getSchema()).isEqualTo(expectedSchema);
final IntVector vector = (IntVector) root.getVector(0);
assertThat(vector.isNull(0)).isFalse();
assertThat(vector.get(0)).isEqualTo(42);
assertThat(reader.loadNextBatch()).isFalse();
}
}
}

@Test
void preparedExecuteQuery() throws Exception {
final Schema expectedSchema = new Schema(List
.of(new Field("Foo", new FieldType(true, Types.MinorType.INT.getType(), null, DEEPHAVEN_INT), null)));
try (final AdbcStatement statement = connection.createStatement()) {
statement.setSqlQuery("SELECT 42 as Foo");
statement.prepare();
try (final AdbcStatement.QueryResult result = statement.executeQuery()) {
final ArrowReader reader = result.getReader();
assertThat(reader.loadNextBatch()).isTrue();
final VectorSchemaRoot root = reader.getVectorSchemaRoot();
assertThat(root.getSchema()).isEqualTo(expectedSchema);
final IntVector vector = (IntVector) root.getVector(0);
assertThat(vector.isNull(0)).isFalse();
assertThat(vector.get(0)).isEqualTo(42);
assertThat(reader.loadNextBatch()).isFalse();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.server.flightsql;

import dagger.Module;
import dagger.Provides;
import dagger.multibindings.IntoSet;
import io.deephaven.base.clock.Clock;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.util.AbstractScriptSession;
import io.deephaven.engine.util.NoLanguageDeephavenSession;
import io.deephaven.engine.util.ScriptSession;
import io.deephaven.server.arrow.ArrowModule;
import io.deephaven.server.auth.AuthorizationProvider;
import io.deephaven.server.config.ConfigServiceModule;
import io.deephaven.server.console.ConsoleModule;
import io.deephaven.server.log.LogModule;
import io.deephaven.server.plugin.PluginsModule;
import io.deephaven.server.session.ExportTicketResolver;
import io.deephaven.server.session.ObfuscatingErrorTransformerModule;
import io.deephaven.server.session.SessionModule;
import io.deephaven.server.session.TicketResolver;
import io.deephaven.server.table.TableModule;
import io.deephaven.server.test.TestAuthModule;
import io.deephaven.server.test.TestAuthorizationProvider;
import io.deephaven.server.util.Scheduler;

import javax.inject.Named;
import javax.inject.Singleton;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

@Module(includes = {
ArrowModule.class,
ConfigServiceModule.class,
ConsoleModule.class,
LogModule.class,
SessionModule.class,
TableModule.class,
TestAuthModule.class,
ObfuscatingErrorTransformerModule.class,
PluginsModule.class,
FlightSqlModule.class
})
public class FlightSqlTestModule {
@IntoSet
@Provides
TicketResolver ticketResolver(ExportTicketResolver resolver) {
return resolver;
}

@Singleton
@Provides
AbstractScriptSession<?> provideAbstractScriptSession(
final UpdateGraph updateGraph,
final OperationInitializer operationInitializer) {
return new NoLanguageDeephavenSession(
updateGraph, operationInitializer, "non-script-session");
}

@Provides
ScriptSession provideScriptSession(AbstractScriptSession<?> scriptSession) {
return scriptSession;
}

@Provides
@Singleton
ScheduledExecutorService provideExecutorService() {
return Executors.newScheduledThreadPool(1);
}

@Provides
Scheduler provideScheduler(ScheduledExecutorService concurrentExecutor) {
return new Scheduler.DelegatingImpl(
Executors.newSingleThreadExecutor(),
concurrentExecutor,
Clock.system());
}

@Provides
@Named("session.tokenExpireMs")
long provideTokenExpireMs() {
return 60_000_000;
}

@Provides
@Named("http.port")
int provideHttpPort() {
return 0;// 'select first available'
}

@Provides
@Named("grpc.maxInboundMessageSize")
int provideMaxInboundMessageSize() {
return 1024 * 1024;
}

@Provides
AuthorizationProvider provideAuthorizationProvider(TestAuthorizationProvider provider) {
return provider;
}

@Provides
@Singleton
TestAuthorizationProvider provideTestAuthorizationProvider() {
return new TestAuthorizationProvider();
}

@Provides
@Singleton
static UpdateGraph provideUpdateGraph() {
return ExecutionContext.getContext().getUpdateGraph();
}

@Provides
@Singleton
static OperationInitializer provideOperationInitializer() {
return ExecutionContext.getContext().getOperationInitializer();
}
}
Loading

0 comments on commit b61abcf

Please sign in to comment.