Skip to content

Commit

Permalink
Address PR comments - majorly move away from grpc-netty-shaded
Browse files Browse the repository at this point in the history
Signed-off-by: Rishabh Maurya <[email protected]>
  • Loading branch information
rishabhmaurya committed Feb 10, 2025
1 parent 488572c commit 98a5fab
Show file tree
Hide file tree
Showing 34 changed files with 137 additions and 925 deletions.
4 changes: 2 additions & 2 deletions libs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ subprojects {
dependencies.matching { it instanceof ProjectDependency }.all { ProjectDependency dep ->
Project depProject = project.project(dep.path)
if (depProject != null
&& (false == depProject.path.equals(':libs:opensearch-core') && false == depProject.path.equals(':libs:opensearch-arrow-memory-shaded') &&
&& (false == depProject.path.equals(':libs:opensearch-core') &&
false == depProject.path.equals(':libs:opensearch-common'))
&& depProject.path.startsWith(':libs')) {
throw new InvalidUserDataException("projects in :libs "
+ "may not depend on other projects libs except "
+ ":libs:opensearch-core or :libs:opensearch-common or :libs:opensearch-arrow-memory-shaded but "
+ ":libs:opensearch-core or :libs:opensearch-common but "
+ "${project.path} depends on ${depProject.path}")
}
}
Expand Down
83 changes: 42 additions & 41 deletions plugins/arrow-flight-rpc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
description = 'Arrow flight based Stream implementation'
classname = 'org.opensearch.arrow.flight.FlightStreamPlugin'
classname = 'org.opensearch.arrow.flight.bootstrap.FlightStreamPlugin'
}

dependencies {
Expand All @@ -27,18 +27,28 @@ dependencies {

runtimeOnly "org.apache.arrow:arrow-memory-netty:${versions.arrow}"
runtimeOnly "org.apache.arrow:arrow-memory-netty-buffer-patch:${versions.arrow}"
runtimeOnly "io.netty:netty-buffer:${versions.netty}"
runtimeOnly "io.netty:netty-common:${versions.netty}"

implementation "io.netty:netty-buffer:${versions.netty}"
implementation "io.netty:netty-common:${versions.netty}"

implementation "io.netty:netty-codec:${versions.netty}"
implementation "io.netty:netty-codec-http:${versions.netty}"
implementation "io.netty:netty-codec-http2:${versions.netty}"
implementation "io.netty:netty-handler:${versions.netty}"
implementation "io.netty:netty-resolver:${versions.netty}"
implementation "io.netty:netty-transport:${versions.netty}"
implementation "io.netty:netty-transport-native-unix-common:${versions.netty}"
implementation "io.netty:netty-transport-classes-epoll:${versions.netty}"
implementation "io.netty:netty-tcnative-classes:2.0.66.Final"

implementation "org.slf4j:slf4j-api:${versions.slf4j}"
runtimeOnly "com.google.flatbuffers:flatbuffers-java:${versions.flatbuffers}"
runtimeOnly "commons-codec:commons-codec:${versions.commonscodec}"

implementation "io.grpc:grpc-netty-shaded:${versions.grpc}"

implementation "io.grpc:grpc-api:${versions.grpc}"
runtimeOnly "io.grpc:grpc-core:${versions.grpc}"
implementation "io.grpc:grpc-stub:${versions.grpc}"
implementation "io.grpc:grpc-netty:${versions.grpc}"

runtimeOnly group: 'com.google.code.findbugs', name: 'jsr305', version: '3.0.2'
compileOnly 'org.immutables:value:2.10.1'
Expand Down Expand Up @@ -194,16 +204,7 @@ tasks.named('thirdPartyAudit').configure {
'reactor.blockhound.BlockHound$Builder',
'reactor.blockhound.integration.BlockHoundIntegration',

'com.google.protobuf.util.Timestamps',
'io.grpc.netty.GrpcSslContexts',
'io.grpc.netty.NettyChannelBuilder',
'io.grpc.netty.NettyServerBuilder',
'io.netty.channel.EventLoopGroup',
'io.netty.channel.ServerChannel',
'io.netty.handler.ssl.ClientAuth',
'io.netty.handler.ssl.SslContext',
'io.netty.handler.ssl.SslContextBuilder',
'io.netty.handler.ssl.util.InsecureTrustManagerFactory'
'com.google.protobuf.util.Timestamps'
)
ignoreViolations(
// Guava internal classes
Expand All @@ -221,32 +222,32 @@ tasks.named('thirdPartyAudit').configure {
'com.google.common.util.concurrent.AbstractFuture$UnsafeAtomicHelper',
'com.google.common.util.concurrent.AbstractFuture$UnsafeAtomicHelper$1',

'io.grpc.netty.shaded.io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator',
'io.grpc.netty.shaded.io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$1',
'io.grpc.netty.shaded.io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$2',
'io.grpc.netty.shaded.io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$3',
'io.grpc.netty.shaded.io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$4',
'io.grpc.netty.shaded.io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$5',
'io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0',
'io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0$1',
'io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0$2',
'io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0$3',
'io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0$4',
'io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0$6',
'io.grpc.netty.shaded.io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueConsumerNodeRef',
'io.grpc.netty.shaded.io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueProducerNodeRef',
'io.grpc.netty.shaded.io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueColdProducerFields',
'io.grpc.netty.shaded.io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueConsumerFields',
'io.grpc.netty.shaded.io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueProducerFields',
'io.grpc.netty.shaded.io.netty.util.internal.shaded.org.jctools.queues.LinkedQueueNode',
'io.grpc.netty.shaded.io.netty.util.internal.shaded.org.jctools.queues.MpmcArrayQueueConsumerIndexField',
'io.grpc.netty.shaded.io.netty.util.internal.shaded.org.jctools.queues.MpmcArrayQueueProducerIndexField',
'io.grpc.netty.shaded.io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueConsumerIndexField',
'io.grpc.netty.shaded.io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerIndexField',
'io.grpc.netty.shaded.io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerLimitField',
'io.grpc.netty.shaded.io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess',
'io.grpc.netty.shaded.io.netty.util.internal.shaded.org.jctools.util.UnsafeLongArrayAccess',
'io.grpc.netty.shaded.io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$1',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$2',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$3',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$4',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$5',
'io.netty.util.internal.PlatformDependent0',
'io.netty.util.internal.PlatformDependent0$1',
'io.netty.util.internal.PlatformDependent0$2',
'io.netty.util.internal.PlatformDependent0$3',
'io.netty.util.internal.PlatformDependent0$4',
'io.netty.util.internal.PlatformDependent0$6',
'io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueConsumerNodeRef',
'io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueProducerNodeRef',
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueColdProducerFields',
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueConsumerFields',
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueProducerFields',
'io.netty.util.internal.shaded.org.jctools.queues.LinkedQueueNode',
'io.netty.util.internal.shaded.org.jctools.queues.MpmcArrayQueueConsumerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpmcArrayQueueProducerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueConsumerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerLimitField',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeLongArrayAccess',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess',
'io.netty.util.internal.PlatformDependent0',
'io.netty.util.internal.PlatformDependent0$1',
'io.netty.util.internal.PlatformDependent0$2',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3c3279d2e3520195fd26e0c3d9aca2ed1157d8c3

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2831d3431ed93d9c0b64b1c0cce2ced4737539aa
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
5a0f8cd908b8b09b2cd1d39c1d2086a4d12e6029
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
e0a678ac80e00b08a4c0118d496efabc4516ebbf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
db14cd99515f8c98a3f2a347718e59f14d85c503
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
581b37489a03162f473264b65f53d504269a74b0
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9588bd2f891157538a78d86c945aa34bf9308dda
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
f81d72962bd134d8d8e11b514321134fa5fd0ce6
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0ebca585acd227b8682ed5b2aafbb86d07f77848
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
684f2316ff2b2171babbc17c95ac3bd97f5f091e
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.arrow.flight.OSFlightClient;
import org.opensearch.arrow.flight.bootstrap.FlightClientManager;
import org.opensearch.arrow.flight.bootstrap.FlightService;
import org.opensearch.arrow.flight.bootstrap.FlightStreamPlugin;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.plugins.Plugin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,17 @@
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.ServerChannel;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

/**
* Clone of {@link FlightClient} to support setting SslContext directly. It can be discarded once
Expand Down Expand Up @@ -718,7 +718,7 @@ public static Builder builder(BufferAllocator allocator, Location location) {
public static Builder builder(
BufferAllocator allocator,
Location location,
Class<? extends io.grpc.netty.shaded.io.netty.channel.Channel> channelType,
Class<? extends io.netty.channel.Channel> channelType,
ExecutorService executorService,
EventLoopGroup workerELG,
SslContext sslContext
Expand Down Expand Up @@ -747,7 +747,7 @@ public static final class Builder {
private boolean verifyServer = true;
private EventLoopGroup workerELG;
private ExecutorService executorService;
private Class<? extends io.grpc.netty.shaded.io.netty.channel.Channel> channelType;
private Class<? extends io.netty.channel.Channel> channelType;
private SslContext sslContext;

private Builder() {}
Expand Down Expand Up @@ -820,7 +820,7 @@ public Builder executor(ExecutorService executorService) {
return this;
}

public Builder channelType(Class<? extends io.grpc.netty.shaded.io.netty.channel.Channel> channelType) {
public Builder channelType(Class<? extends io.netty.channel.Channel> channelType) {
this.channelType = channelType;
return this;
}
Expand Down Expand Up @@ -860,10 +860,9 @@ public OSFlightClient build() {
try {
// Linux
builder.channelType(
Class.forName("io.grpc.netty.shaded.io.netty.channel.epoll.EpollDomainSocketChannel")
.asSubclass(ServerChannel.class)
Class.forName("io.netty.channel.epoll.EpollDomainSocketChannel").asSubclass(ServerChannel.class)
);
final EventLoopGroup elg = Class.forName("io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup")
final EventLoopGroup elg = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup")
.asSubclass(EventLoopGroup.class)
.getDeclaredConstructor()
.newInstance();
Expand All @@ -872,10 +871,9 @@ public OSFlightClient build() {
// BSD
// this might not work as io.netty.channel.kqueue classes aren't present in grpc-netty-shaded
builder.channelType(
Class.forName("io.grpc.netty.shaded.io.netty.channel.kqueue.KQueueDomainSocketChannel")
.asSubclass(ServerChannel.class)
Class.forName("io.netty.channel.kqueue.KQueueDomainSocketChannel").asSubclass(ServerChannel.class)
);
final EventLoopGroup elg = Class.forName("io.grpc.netty.shaded.io.netty.channel.kqueue.KQueueEventLoopGroup")
final EventLoopGroup elg = Class.forName("io.netty.channel.kqueue.KQueueEventLoopGroup")
.asSubclass(EventLoopGroup.class)
.getDeclaredConstructor()
.newInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@

import io.grpc.Server;
import io.grpc.ServerInterceptors;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.netty.channel.Channel;
import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.ServerChannel;
import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;

/**
* Clone of {@link FlightServer} to support setting SslContext directly. It can be discarded once
Expand Down Expand Up @@ -243,10 +243,9 @@ public OSFlightServer build() {
try {
// Linux
builder.channelType(
Class.forName("io.grpc.netty.shaded.io.netty.channel.epoll.EpollServerDomainSocketChannel")
.asSubclass(ServerChannel.class)
Class.forName("io.netty.channel.epoll.EpollServerDomainSocketChannel").asSubclass(ServerChannel.class)
);
final EventLoopGroup elg = Class.forName("io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup")
final EventLoopGroup elg = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup")
.asSubclass(EventLoopGroup.class)
.getConstructor()
.newInstance();
Expand All @@ -255,10 +254,9 @@ public OSFlightServer build() {
// BSD
// below logic may not work as the kqueue classes aren't present in grpc-netty-shaded
builder.channelType(
Class.forName("io.grpc.netty.shaded.io.netty.channel.kqueue.KQueueServerDomainSocketChannel")
.asSubclass(ServerChannel.class)
Class.forName("io.netty.channel.kqueue.KQueueServerDomainSocketChannel").asSubclass(ServerChannel.class)
);
final EventLoopGroup elg = Class.forName("io.grpc.netty.shaded.io.netty.channel.kqueue.KQueueEventLoopGroup")
final EventLoopGroup elg = Class.forName("io.netty.channel.kqueue.KQueueEventLoopGroup")
.asSubclass(EventLoopGroup.class)
.getConstructor()
.newInstance();
Expand Down
Loading

0 comments on commit 98a5fab

Please sign in to comment.