Skip to content

Commit

Permalink
Use WebClient to create GrpcClient(s).
Browse files Browse the repository at this point in the history
  • Loading branch information
spericas committed Feb 21, 2024
1 parent 01cc067 commit 0bdd951
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import io.helidon.examples.grpc.strings.StringServiceGrpc;
import io.helidon.examples.grpc.strings.Strings;
import io.helidon.webclient.grpc.GrpcChannel;
import io.helidon.webclient.api.WebClient;
import io.helidon.webclient.grpc.GrpcClient;
import io.helidon.webserver.WebServerConfig;
import io.helidon.webserver.grpc.GrpcRouting;
Expand All @@ -44,10 +43,10 @@
class GrpcStubTest {
private static final long TIMEOUT_SECONDS = 10;

private final GrpcClient grpcClient;
private final WebClient webClient;

private GrpcStubTest(GrpcClient grpcClient) {
this.grpcClient = grpcClient;
private GrpcStubTest(WebClient webClient) {
this.webClient = webClient;
}

@SetUpServer
Expand Down Expand Up @@ -143,16 +142,16 @@ public void onCompleted() {

// @Test -- blocks indefinitely
void testUnaryUpper() {
Channel channel = new GrpcChannel(grpcClient);
StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(channel);
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(grpcClient.channel());
Strings.StringMessage res = service.upper(newStringMessage("hello"));
assertThat(res.getText(), is("HELLO"));
}

@Test
void testUnaryUpperAsync() throws ExecutionException, InterruptedException, TimeoutException {
Channel channel = new GrpcChannel(grpcClient);
StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(channel);
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(grpcClient.channel());
CompletableFuture<Strings.StringMessage> future = new CompletableFuture<>();
service.upper(newStringMessage("hello"), singleStreamObserver(future));
Strings.StringMessage res = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
Expand All @@ -161,8 +160,8 @@ void testUnaryUpperAsync() throws ExecutionException, InterruptedException, Time

@Test
void testServerStreamingSplit() {
Channel channel = new GrpcChannel(grpcClient);
StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(channel);
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(grpcClient.channel());
Iterator<Strings.StringMessage> res = service.split(newStringMessage("hello world"));
assertThat(res.next().getText(), is("hello"));
assertThat(res.next().getText(), is("world"));
Expand All @@ -171,8 +170,8 @@ void testServerStreamingSplit() {

@Test
void testServerStreamingSplitAsync() throws ExecutionException, InterruptedException, TimeoutException {
Channel channel = new GrpcChannel(grpcClient);
StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(channel);
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(grpcClient.channel());
CompletableFuture<Iterator<Strings.StringMessage>> future = new CompletableFuture<>();
service.split(newStringMessage("hello world"), multiStreamObserver(future));
Iterator<Strings.StringMessage> res = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
Expand All @@ -183,8 +182,8 @@ void testServerStreamingSplitAsync() throws ExecutionException, InterruptedExcep

@Test
void testClientStreamingJoinAsync() throws ExecutionException, InterruptedException, TimeoutException {
Channel channel = new GrpcChannel(grpcClient);
StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(channel);
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(grpcClient.channel());
CompletableFuture<Strings.StringMessage> future = new CompletableFuture<>();
StreamObserver<Strings.StringMessage> req = service.join(singleStreamObserver(future));
req.onNext(newStringMessage("hello"));
Expand All @@ -196,8 +195,8 @@ void testClientStreamingJoinAsync() throws ExecutionException, InterruptedExcept

@Test
void testBidirectionalEchoAsync() throws ExecutionException, InterruptedException, TimeoutException {
Channel channel = new GrpcChannel(grpcClient);
StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(channel);
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(grpcClient.channel());
CompletableFuture<Iterator<Strings.StringMessage>> future = new CompletableFuture<>();
StreamObserver<Strings.StringMessage> req = service.echo(multiStreamObserver(future));
req.onNext(newStringMessage("hello"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.function.Consumer;

import io.grpc.Channel;
import io.helidon.builder.api.RuntimeType;
import io.helidon.webclient.api.WebClient;
import io.helidon.webclient.spi.Protocol;
Expand All @@ -27,6 +28,8 @@
*/
@RuntimeType.PrototypedBy(GrpcClientConfig.class)
public interface GrpcClient extends RuntimeType.Api<GrpcClientConfig> {
String PROTOCOL_ID = "grpc";

/**
* Protocol to use to obtain an instance of gRPC specific client from
* {@link io.helidon.webclient.api.WebClient#client(io.helidon.webclient.spi.Protocol)}.
Expand Down Expand Up @@ -80,4 +83,11 @@ static GrpcClient create() {
* @return client for the provided descriptor
*/
GrpcServiceClient serviceClient(GrpcServiceDescriptor descriptor);

/**
* Create a gRPC channel for this client that can be used to create stubs.
*
* @return a new gRPC channel
*/
Channel channel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.helidon.webclient.grpc;

import io.grpc.Channel;
import io.helidon.webclient.api.WebClient;
import io.helidon.webclient.http2.Http2Client;

Expand Down Expand Up @@ -47,4 +48,9 @@ public GrpcClientConfig prototype() {
public GrpcServiceClient serviceClient(GrpcServiceDescriptor descriptor) {
return new GrpcServiceClientImpl(descriptor, this);
}

@Override
public Channel channel() {
return new GrpcChannel(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public GrpcProtocolProvider() {

@Override
public String protocolId() {
return CONFIG_KEY;
return GrpcClient.PROTOCOL_ID;
}

@Override
Expand All @@ -49,7 +49,8 @@ public GrpcClientProtocolConfig defaultConfig() {
@Override
public GrpcClient protocol(WebClient client, GrpcClientProtocolConfig config) {
return new GrpcClientImpl(client,
GrpcClientConfig.builder().from(client.prototype())
GrpcClientConfig.builder()
.from(client.prototype())
.protocolConfig(config)
.buildPrototype());
}
Expand Down

0 comments on commit 0bdd951

Please sign in to comment.