Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix connection leakage in native Azure filesystem #24773

Merged
merged 2 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 16 additions & 28 deletions lib/trino-filesystem-azure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-http-okhttp</artifactId>
<artifactId>azure-core-http-netty</artifactId>
</dependency>

<dependency>
Expand All @@ -36,10 +36,6 @@
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<exclusions>
<exclusion>
<groupId>com.azure</groupId>
<artifactId>azure-core-http-netty</artifactId>
</exclusion>
<exclusion>
<groupId>com.nimbusds</groupId>
<artifactId>oauth2-oidc-sdk</artifactId>
Expand All @@ -54,34 +50,16 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<exclusions>
<exclusion>
<groupId>com.azure</groupId>
<artifactId>azure-core-http-netty</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-common</artifactId>
<exclusions>
<exclusion>
<groupId>com.azure</groupId>
<artifactId>azure-core-http-netty</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-file-datalake</artifactId>
<exclusions>
<exclusion>
<groupId>com.azure</groupId>
<artifactId>azure-core-http-netty</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand All @@ -94,11 +72,6 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>configuration</artifactId>
Expand All @@ -109,11 +82,26 @@
<artifactId>units</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>

<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-core</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@
package io.trino.filesystem.azure;

import com.azure.core.http.HttpClient;
import com.azure.core.http.okhttp.OkHttpAsyncHttpClientBuilder;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.core.tracing.opentelemetry.OpenTelemetryTracingOptions;
import com.azure.core.util.HttpClientOptions;
import com.azure.core.util.TracingOptions;
import com.google.inject.Inject;
import io.airlift.units.DataSize;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.spi.security.ConnectorIdentity;
import jakarta.annotation.PreDestroy;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;
import reactor.netty.resources.ConnectionProvider;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
Expand All @@ -44,8 +44,9 @@ public class AzureFileSystemFactory
private final int maxWriteConcurrency;
private final DataSize maxSingleUploadSize;
private final TracingOptions tracingOptions;
private final OkHttpClient okHttpClient;
private final HttpClient httpClient;
private final ConnectionProvider connectionProvider;
private final EventLoopGroup eventLoopGroup;

@Inject
public AzureFileSystemFactory(OpenTelemetry openTelemetry, AzureAuth azureAuth, AzureFileSystemConfig config)
Expand Down Expand Up @@ -80,24 +81,32 @@ public AzureFileSystemFactory(
this.maxWriteConcurrency = maxWriteConcurrency;
this.maxSingleUploadSize = requireNonNull(maxSingleUploadSize, "maxSingleUploadSize is null");
this.tracingOptions = new OpenTelemetryTracingOptions().setOpenTelemetry(openTelemetry);

Dispatcher dispatcher = new Dispatcher();
dispatcher.setMaxRequests(maxHttpRequests);
dispatcher.setMaxRequestsPerHost(maxHttpRequests);
okHttpClient = new OkHttpClient.Builder()
.dispatcher(dispatcher)
.build();
this.connectionProvider = ConnectionProvider.create(applicationId, maxHttpRequests);
this.eventLoopGroup = new NioEventLoopGroup(maxHttpRequests);
HttpClientOptions clientOptions = new HttpClientOptions();
clientOptions.setTracingOptions(tracingOptions);
clientOptions.setApplicationId(applicationId);
httpClient = createAzureHttpClient(okHttpClient, clientOptions);
httpClient = createAzureHttpClient(connectionProvider, eventLoopGroup, clientOptions);
}

@PreDestroy
public void destroy()
{
okHttpClient.dispatcher().executorService().shutdownNow();
okHttpClient.connectionPool().evictAll();
if (connectionProvider != null) {
connectionProvider.dispose();
}
if (eventLoopGroup != null) {
try {
eventLoopGroup.shutdownGracefully().get();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
catch (ExecutionException _) {
// ignored: a failure while shutting down the event loop group is not actionable during destroy
}
}
}

@Override
Expand All @@ -106,20 +115,17 @@ public TrinoFileSystem create(ConnectorIdentity identity)
return new AzureFileSystem(httpClient, tracingOptions, auth, endpoint, readBlockSize, writeBlockSize, maxWriteConcurrency, maxSingleUploadSize);
}

public static HttpClient createAzureHttpClient(OkHttpClient okHttpClient, HttpClientOptions clientOptions)
public static HttpClient createAzureHttpClient(ConnectionProvider connectionProvider, EventLoopGroup eventLoopGroup, HttpClientOptions clientOptions)
{
Integer poolSize = clientOptions.getMaximumConnectionPoolSize();
// By default, OkHttp uses a maximum idle connection count of 5.
int maximumConnectionPoolSize = (poolSize != null && poolSize > 0) ? poolSize : 5;

return new OkHttpAsyncHttpClientBuilder(okHttpClient)
return new NettyAsyncHttpClientBuilder()
.proxy(clientOptions.getProxyOptions())
.configuration(clientOptions.getConfiguration())
.connectionTimeout(clientOptions.getConnectTimeout())
.connectTimeout(clientOptions.getConnectTimeout())
.writeTimeout(clientOptions.getWriteTimeout())
.readTimeout(clientOptions.getReadTimeout())
.connectionPool(new ConnectionPool(maximumConnectionPoolSize,
clientOptions.getConnectionIdleTimeout().toMillis(), TimeUnit.MILLISECONDS))
.responseTimeout(clientOptions.getResponseTimeout())
.connectionProvider(connectionProvider)
.eventLoopGroup(eventLoopGroup)
.build();
}
}
22 changes: 18 additions & 4 deletions plugin/trino-delta-lake/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,6 @@
<artifactId>azure-storage-blob</artifactId>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>com.azure</groupId>
<artifactId>azure-core-http-netty</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
Expand All @@ -283,12 +279,30 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-core</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem-azure</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import com.google.common.io.ByteSource;
import com.google.common.io.Resources;
import com.google.common.reflect.ClassPath;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.trino.plugin.hive.containers.HiveHadoop;
import io.trino.testing.QueryRunner;
import okhttp3.OkHttpClient;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.TestInstance;
import reactor.netty.resources.ConnectionProvider;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -77,13 +79,14 @@ protected HiveHadoop createHiveHadoop()
throws Exception
{
String connectionString = format("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.windows.net", account, accessKey);
OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
closeAfterClass(() -> {
okHttpClient.dispatcher().executorService().shutdownNow();
okHttpClient.connectionPool().evictAll();
});
ConnectionProvider provider = ConnectionProvider.create("TestDeltaLakeAdsl");
closeAfterClass(provider::dispose);

EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
closeAfterClass(eventLoopGroup::shutdownGracefully);

BlobServiceClient blobServiceClient = new BlobServiceClientBuilder().connectionString(connectionString)
.httpClient(createAzureHttpClient(okHttpClient, new HttpClientOptions()))
.httpClient(createAzureHttpClient(provider, eventLoopGroup, new HttpClientOptions()))
.buildClient();
this.azureContainerClient = blobServiceClient.getBlobContainerClient(container);

Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,12 @@
<version>3.7.2</version>
</dependency>

<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-core</artifactId>
<version>1.1.21</version>
</dependency>

<!-- io.confluent:kafka-avro-serializer uses multiple versions of this transitive dependency -->
<dependency>
<groupId>io.swagger.core.v3</groupId>
Expand Down
Loading