diff --git a/lib/trino-filesystem-azure/pom.xml b/lib/trino-filesystem-azure/pom.xml
index e67100cd4db4..e69bb372af1b 100644
--- a/lib/trino-filesystem-azure/pom.xml
+++ b/lib/trino-filesystem-azure/pom.xml
@@ -24,7 +24,7 @@
com.azure
- azure-core-http-okhttp
+ azure-core-http-netty
@@ -36,10 +36,6 @@
com.azure
azure-identity
-
- com.azure
- azure-core-http-netty
-
com.nimbusds
oauth2-oidc-sdk
@@ -54,34 +50,16 @@
com.azure
azure-storage-blob
-
-
- com.azure
- azure-core-http-netty
-
-
com.azure
azure-storage-common
-
-
- com.azure
- azure-core-http-netty
-
-
com.azure
azure-storage-file-datalake
-
-
- com.azure
- azure-core-http-netty
-
-
@@ -94,11 +72,6 @@
guice
-
- com.squareup.okhttp3
- okhttp
-
-
io.airlift
configuration
@@ -114,6 +87,11 @@
opentelemetry-api
+
+ io.projectreactor.netty
+ reactor-netty-core
+
+
io.trino
trino-filesystem
diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemFactory.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemFactory.java
index 95f7a5cb1d06..989c7f45aee5 100644
--- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemFactory.java
+++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemFactory.java
@@ -14,7 +14,7 @@
package io.trino.filesystem.azure;
import com.azure.core.http.HttpClient;
-import com.azure.core.http.okhttp.OkHttpAsyncHttpClientBuilder;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.core.tracing.opentelemetry.OpenTelemetryTracingOptions;
import com.azure.core.util.HttpClientOptions;
import com.azure.core.util.TracingOptions;
@@ -25,11 +25,7 @@
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.spi.security.ConnectorIdentity;
import jakarta.annotation.PreDestroy;
-import okhttp3.ConnectionPool;
-import okhttp3.Dispatcher;
-import okhttp3.OkHttpClient;
-
-import java.util.concurrent.TimeUnit;
+import reactor.netty.resources.ConnectionProvider;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
@@ -44,8 +40,8 @@ public class AzureFileSystemFactory
private final int maxWriteConcurrency;
private final DataSize maxSingleUploadSize;
private final TracingOptions tracingOptions;
- private final OkHttpClient okHttpClient;
private final HttpClient httpClient;
+ private final ConnectionProvider connectionProvider;
@Inject
public AzureFileSystemFactory(OpenTelemetry openTelemetry, AzureAuth azureAuth, AzureFileSystemConfig config)
@@ -80,24 +76,19 @@ public AzureFileSystemFactory(
this.maxWriteConcurrency = maxWriteConcurrency;
this.maxSingleUploadSize = requireNonNull(maxSingleUploadSize, "maxSingleUploadSize is null");
this.tracingOptions = new OpenTelemetryTracingOptions().setOpenTelemetry(openTelemetry);
-
- Dispatcher dispatcher = new Dispatcher();
- dispatcher.setMaxRequests(maxHttpRequests);
- dispatcher.setMaxRequestsPerHost(maxHttpRequests);
- okHttpClient = new OkHttpClient.Builder()
- .dispatcher(dispatcher)
- .build();
+ this.connectionProvider = ConnectionProvider.create(applicationId, maxHttpRequests);
HttpClientOptions clientOptions = new HttpClientOptions();
clientOptions.setTracingOptions(tracingOptions);
clientOptions.setApplicationId(applicationId);
- httpClient = createAzureHttpClient(okHttpClient, clientOptions);
+ httpClient = createAzureHttpClient(connectionProvider, clientOptions);
}
@PreDestroy
public void destroy()
{
- okHttpClient.dispatcher().executorService().shutdownNow();
- okHttpClient.connectionPool().evictAll();
+ if (connectionProvider != null) {
+ connectionProvider.dispose();
+ }
}
@Override
@@ -106,20 +97,19 @@ public TrinoFileSystem create(ConnectorIdentity identity)
return new AzureFileSystem(httpClient, tracingOptions, auth, endpoint, readBlockSize, writeBlockSize, maxWriteConcurrency, maxSingleUploadSize);
}
- public static HttpClient createAzureHttpClient(OkHttpClient okHttpClient, HttpClientOptions clientOptions)
+ public static HttpClient createAzureHttpClient(ConnectionProvider connectionProvider, HttpClientOptions clientOptions)
{
Integer poolSize = clientOptions.getMaximumConnectionPoolSize();
- // By default, OkHttp uses a maximum idle connection count of 5.
int maximumConnectionPoolSize = (poolSize != null && poolSize > 0) ? poolSize : 5;
+ clientOptions.setMaximumConnectionPoolSize(maximumConnectionPoolSize);
- return new OkHttpAsyncHttpClientBuilder(okHttpClient)
+ return new NettyAsyncHttpClientBuilder()
.proxy(clientOptions.getProxyOptions())
.configuration(clientOptions.getConfiguration())
- .connectionTimeout(clientOptions.getConnectTimeout())
+ .connectTimeout(clientOptions.getConnectTimeout())
.writeTimeout(clientOptions.getWriteTimeout())
.readTimeout(clientOptions.getReadTimeout())
- .connectionPool(new ConnectionPool(maximumConnectionPoolSize,
- clientOptions.getConnectionIdleTimeout().toMillis(), TimeUnit.MILLISECONDS))
+ .connectionProvider(connectionProvider)
.build();
}
}
diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml
index efc785b7db2e..7fad2e4291e0 100644
--- a/plugin/trino-delta-lake/pom.xml
+++ b/plugin/trino-delta-lake/pom.xml
@@ -260,10 +260,6 @@
azure-storage-blob
runtime
-
- com.azure
- azure-core-http-netty
-
com.fasterxml.jackson.dataformat
jackson-dataformat-xml
@@ -289,6 +285,12 @@
runtime
+
+ io.projectreactor.netty
+ reactor-netty-core
+ runtime
+
+
io.trino
trino-filesystem-azure
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAdlsConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAdlsConnectorSmokeTest.java
index 8c1e94a1addb..1ef73182529a 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAdlsConnectorSmokeTest.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAdlsConnectorSmokeTest.java
@@ -25,9 +25,9 @@
import com.google.common.reflect.ClassPath;
import io.trino.plugin.hive.containers.HiveHadoop;
import io.trino.testing.QueryRunner;
-import okhttp3.OkHttpClient;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.TestInstance;
+import reactor.netty.resources.ConnectionProvider;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -77,13 +77,10 @@ protected HiveHadoop createHiveHadoop()
throws Exception
{
String connectionString = format("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.windows.net", account, accessKey);
- OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
- closeAfterClass(() -> {
- okHttpClient.dispatcher().executorService().shutdownNow();
- okHttpClient.connectionPool().evictAll();
- });
+ ConnectionProvider provider = ConnectionProvider.create("TestDeltaLakeAdsl");
+ closeAfterClass(provider::dispose);
BlobServiceClient blobServiceClient = new BlobServiceClientBuilder().connectionString(connectionString)
- .httpClient(createAzureHttpClient(okHttpClient, new HttpClientOptions()))
+ .httpClient(createAzureHttpClient(provider, new HttpClientOptions()))
.buildClient();
this.azureContainerClient = blobServiceClient.getBlobContainerClient(container);
diff --git a/pom.xml b/pom.xml
index f92830c9b173..5791fcd2814b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -945,6 +945,12 @@
3.7.2
+
+ io.projectreactor.netty
+ reactor-netty-core
+ 1.1.21
+
+
io.swagger.core.v3