From 0bdd95163c4769f4e3ff84f57b02a867306d2a54 Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Wed, 21 Feb 2024 10:32:28 -0500 Subject: [PATCH] Use WebClient to create GrpcClient(s). --- .../webserver/protocols/GrpcStubTest.java | 33 +++++++++---------- .../io/helidon/webclient/grpc/GrpcClient.java | 10 ++++++ .../webclient/grpc/GrpcClientImpl.java | 6 ++++ .../webclient/grpc/GrpcProtocolProvider.java | 5 +-- 4 files changed, 35 insertions(+), 19 deletions(-) diff --git a/examples/webserver/protocols/src/test/java/io/helidon/examples/webserver/protocols/GrpcStubTest.java b/examples/webserver/protocols/src/test/java/io/helidon/examples/webserver/protocols/GrpcStubTest.java index 3706d4e9f1f..2f6a14c1f82 100644 --- a/examples/webserver/protocols/src/test/java/io/helidon/examples/webserver/protocols/GrpcStubTest.java +++ b/examples/webserver/protocols/src/test/java/io/helidon/examples/webserver/protocols/GrpcStubTest.java @@ -25,11 +25,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import io.grpc.Channel; import io.grpc.stub.StreamObserver; import io.helidon.examples.grpc.strings.StringServiceGrpc; import io.helidon.examples.grpc.strings.Strings; -import io.helidon.webclient.grpc.GrpcChannel; +import io.helidon.webclient.api.WebClient; import io.helidon.webclient.grpc.GrpcClient; import io.helidon.webserver.WebServerConfig; import io.helidon.webserver.grpc.GrpcRouting; @@ -44,10 +43,10 @@ class GrpcStubTest { private static final long TIMEOUT_SECONDS = 10; - private final GrpcClient grpcClient; + private final WebClient webClient; - private GrpcStubTest(GrpcClient grpcClient) { - this.grpcClient = grpcClient; + private GrpcStubTest(WebClient webClient) { + this.webClient = webClient; } @SetUpServer @@ -143,16 +142,16 @@ public void onCompleted() { // @Test -- blocks indefinitely void testUnaryUpper() { - Channel channel = new GrpcChannel(grpcClient); - StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(channel); + GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL); + StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(grpcClient.channel()); Strings.StringMessage res = service.upper(newStringMessage("hello")); assertThat(res.getText(), is("HELLO")); } @Test void testUnaryUpperAsync() throws ExecutionException, InterruptedException, TimeoutException { - Channel channel = new GrpcChannel(grpcClient); - StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(channel); + GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL); + StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(grpcClient.channel()); CompletableFuture future = new CompletableFuture<>(); service.upper(newStringMessage("hello"), singleStreamObserver(future)); Strings.StringMessage res = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); @@ -161,8 +160,8 @@ void testUnaryUpperAsync() throws ExecutionException, InterruptedException, Time @Test void testServerStreamingSplit() { - Channel channel = new GrpcChannel(grpcClient); - StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(channel); + GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL); + StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(grpcClient.channel()); Iterator res = service.split(newStringMessage("hello world")); assertThat(res.next().getText(), is("hello")); assertThat(res.next().getText(), is("world")); @@ -171,8 +170,8 @@ void testServerStreamingSplit() { @Test void testServerStreamingSplitAsync() throws ExecutionException, InterruptedException, TimeoutException { - Channel channel = new GrpcChannel(grpcClient); - StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(channel); + GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL); + StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(grpcClient.channel()); CompletableFuture> future = new CompletableFuture<>(); service.split(newStringMessage("hello world"), multiStreamObserver(future)); Iterator res = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); @@ -183,8 +182,8 @@ void testServerStreamingSplitAsync() throws ExecutionException, InterruptedExcep @Test void testClientStreamingJoinAsync() throws ExecutionException, InterruptedException, TimeoutException { - Channel channel = new GrpcChannel(grpcClient); - StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(channel); + GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL); + StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(grpcClient.channel()); CompletableFuture future = new CompletableFuture<>(); StreamObserver req = service.join(singleStreamObserver(future)); req.onNext(newStringMessage("hello")); @@ -196,8 +195,8 @@ void testClientStreamingJoinAsync() throws ExecutionException, InterruptedExcept @Test void testBidirectionalEchoAsync() throws ExecutionException, InterruptedException, TimeoutException { - Channel channel = new GrpcChannel(grpcClient); - StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(channel); + GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL); + StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(grpcClient.channel()); CompletableFuture> future = new CompletableFuture<>(); StreamObserver req = service.echo(multiStreamObserver(future)); req.onNext(newStringMessage("hello")); diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClient.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClient.java index 9eb07db371c..d86950b215d 100644 --- a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClient.java +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClient.java @@ -18,6 +18,7 @@ import java.util.function.Consumer; +import io.grpc.Channel; import io.helidon.builder.api.RuntimeType; import io.helidon.webclient.api.WebClient; import io.helidon.webclient.spi.Protocol; @@ -27,6 +28,8 @@ */ @RuntimeType.PrototypedBy(GrpcClientConfig.class) public interface GrpcClient extends RuntimeType.Api { + String PROTOCOL_ID = "grpc"; + /** * Protocol to use to obtain an instance of gRPC specific client from * {@link io.helidon.webclient.api.WebClient#client(io.helidon.webclient.spi.Protocol)}. @@ -80,4 +83,11 @@ static GrpcClient create() { * @return client for the provided descriptor */ GrpcServiceClient serviceClient(GrpcServiceDescriptor descriptor); + + /** + * Create a gRPC channel for this client that can be used to create stubs. + * + * @return a new gRPC channel + */ + Channel channel(); } diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientImpl.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientImpl.java index bf0934074d6..903dcfb2657 100644 --- a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientImpl.java +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientImpl.java @@ -16,6 +16,7 @@ package io.helidon.webclient.grpc; +import io.grpc.Channel; import io.helidon.webclient.api.WebClient; import io.helidon.webclient.http2.Http2Client; @@ -47,4 +48,9 @@ public GrpcClientConfig prototype() { public GrpcServiceClient serviceClient(GrpcServiceDescriptor descriptor) { return new GrpcServiceClientImpl(descriptor, this); } + + @Override + public Channel channel() { + return new GrpcChannel(this); + } } diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcProtocolProvider.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcProtocolProvider.java index 7ea3122e521..43b10f17f40 100644 --- a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcProtocolProvider.java +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcProtocolProvider.java @@ -33,7 +33,7 @@ public GrpcProtocolProvider() { @Override public String protocolId() { - return CONFIG_KEY; + return GrpcClient.PROTOCOL_ID; } @Override @@ -49,7 +49,8 @@ public GrpcClientProtocolConfig defaultConfig() { @Override public GrpcClient protocol(WebClient client, GrpcClientProtocolConfig config) { return new GrpcClientImpl(client, - GrpcClientConfig.builder().from(client.prototype()) + GrpcClientConfig.builder() + .from(client.prototype()) .protocolConfig(config) .buildPrototype()); }