Skip to content

Commit

Permalink
Support for sync and async gRPC calls.
Browse files Browse the repository at this point in the history
Signed-off-by: Santiago Pericasgeertsen <[email protected]>
  • Loading branch information
spericas committed Feb 12, 2024
1 parent 1cd3d02 commit 62b4a66
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@

package io.helidon.examples.webserver.protocols;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.protobuf.StringValue;
import io.grpc.stub.StreamObserver;
Expand All @@ -37,11 +42,36 @@

@ServerTest
class GrpcTest {
private static final long TIMEOUT_SECONDS = 10;

private final GrpcClient grpcClient;
private final GrpcServiceDescriptor serviceDescriptor;

private GrpcTest(GrpcClient grpcClient) {
this.grpcClient = grpcClient;
this.serviceDescriptor = GrpcServiceDescriptor.builder()
.serviceName("StringService")
.putMethod("Upper",
GrpcClientMethodDescriptor.unary("StringService", "Upper")
.requestType(StringValue.class)
.responseType(StringValue.class)
.build())
.putMethod("Split",
GrpcClientMethodDescriptor.serverStreaming("StringService", "Split")
.requestType(StringValue.class)
.responseType(StringValue.class)
.build())
.putMethod("Join",
GrpcClientMethodDescriptor.clientStreaming("StringService", "Join")
.requestType(StringValue.class)
.responseType(StringValue.class)
.build())
.putMethod("Echo",
GrpcClientMethodDescriptor.bidirectional("StringService", "Echo")
.requestType(StringValue.class)
.responseType(StringValue.class)
.build())
.build();
}

@SetUpServer
Expand All @@ -65,6 +95,8 @@ public static void setup(WebServerConfig.Builder builder) {
GrpcTest::echo));
}

// -- gRPC server methods --

private static Strings.StringMessage upper(Strings.StringMessage reqT) {
return Strings.StringMessage.newBuilder()
.setText(reqT.getText().toUpperCase(Locale.ROOT))
Expand Down Expand Up @@ -114,8 +146,6 @@ public void onCompleted() {

private static StreamObserver<Strings.StringMessage> echo(StreamObserver<Strings.StringMessage> streamObserver) {
return new StreamObserver<>() {
private StringBuilder builder;

@Override
public void onNext(Strings.StringMessage value) {
streamObserver.onNext(value);
Expand All @@ -133,77 +163,134 @@ public void onCompleted() {
};
}

// -- Tests --

@Test
void testUnaryUpper() {
GrpcServiceDescriptor serviceDescriptor =
GrpcServiceDescriptor.builder()
.serviceName("StringService")
.putMethod("Upper",
GrpcClientMethodDescriptor.unary("StringService", "Upper")
.requestType(StringValue.class)
.responseType(StringValue.class)
.build())
.build();

StringValue r = grpcClient.serviceClient(serviceDescriptor)
StringValue res = grpcClient.serviceClient(serviceDescriptor)
.unary("Upper", StringValue.of("hello"));
assertThat(r.getValue(), is("HELLO"));
assertThat(res.getValue(), is("HELLO"));
}

@Test
void testUnaryUpperAsync() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<StringValue> future = new CompletableFuture<>();
grpcClient.serviceClient(serviceDescriptor)
.unary("Upper",
StringValue.of("hello"),
singleStreamObserver(future));
StringValue res = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(res.getValue(), is("HELLO"));
}

@Test
void testServerStreamingSplit() {
GrpcServiceDescriptor serviceDescriptor =
GrpcServiceDescriptor.builder()
.serviceName("StringService")
.putMethod("Split",
GrpcClientMethodDescriptor.serverStreaming("StringService", "Split")
.requestType(StringValue.class)
.responseType(StringValue.class)
.build())
.build();

Iterator<StringValue> r = grpcClient.serviceClient(serviceDescriptor)
.serverStream("Split", StringValue.of("hello world"));
assertThat(r.next().getValue(), is("hello"));
assertThat(r.next().getValue(), is("world"));
assertThat(r.hasNext(), is(false));
Iterator<StringValue> res = grpcClient.serviceClient(serviceDescriptor)
.serverStream("Split",
StringValue.of("hello world"));
assertThat(res.next().getValue(), is("hello"));
assertThat(res.next().getValue(), is("world"));
assertThat(res.hasNext(), is(false));
}

@Test
void testServerStreamingSplitAsync() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<Iterator<StringValue>> future = new CompletableFuture<>();
grpcClient.serviceClient(serviceDescriptor)
.serverStream("Split",
StringValue.of("hello world"),
multiStreamObserver(future));
Iterator<StringValue> res = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(res.next().getValue(), is("hello"));
assertThat(res.next().getValue(), is("world"));
assertThat(res.hasNext(), is(false));
}

@Test
void testClientStreamingJoin() {
GrpcServiceDescriptor serviceDescriptor =
GrpcServiceDescriptor.builder()
.serviceName("StringService")
.putMethod("Join",
GrpcClientMethodDescriptor.clientStreaming("StringService", "Join")
.requestType(StringValue.class)
.responseType(StringValue.class)
.build())
.build();

StringValue r = grpcClient.serviceClient(serviceDescriptor)
StringValue res = grpcClient.serviceClient(serviceDescriptor)
.clientStream("Join", List.of(StringValue.of("hello"),
StringValue.of("world")).iterator());
assertThat(r.getValue(), is("hello world"));
StringValue.of("world")).iterator());
assertThat(res.getValue(), is("hello world"));
}

@Test
void testClientStreamingJoinAsync() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<StringValue> future = new CompletableFuture<>();
StreamObserver<StringValue> req = grpcClient.serviceClient(serviceDescriptor)
.clientStream("Join", singleStreamObserver(future));
req.onNext(StringValue.of("hello"));
req.onNext(StringValue.of("world"));
req.onCompleted();
StringValue res = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(res.getValue(), is("hello world"));
}

@Test
void testBidirectionalEcho() {
GrpcServiceDescriptor serviceDescriptor =
GrpcServiceDescriptor.builder()
.serviceName("StringService")
.putMethod("Echo",
GrpcClientMethodDescriptor.bidirectional("StringService", "Echo")
.requestType(StringValue.class)
.responseType(StringValue.class)
.build())
.build();

Iterator<StringValue> r = grpcClient.serviceClient(serviceDescriptor)
Iterator<StringValue> res = grpcClient.serviceClient(serviceDescriptor)
.bidi("Echo", List.of(StringValue.of("hello"),
StringValue.of("world")).iterator());
assertThat(r.next().getValue(), is("hello"));
assertThat(r.next().getValue(), is("world"));
assertThat(r.hasNext(), is(false));
StringValue.of("world")).iterator());
assertThat(res.next().getValue(), is("hello"));
assertThat(res.next().getValue(), is("world"));
assertThat(res.hasNext(), is(false));
}

@Test
void testBidirectionalEchoAsync() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<Iterator<StringValue>> future = new CompletableFuture<>();
StreamObserver<StringValue> req = grpcClient.serviceClient(serviceDescriptor)
.bidi("Echo", multiStreamObserver(future));
req.onNext(StringValue.of("hello"));
req.onNext(StringValue.of("world"));
req.onCompleted();
Iterator<StringValue> res = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(res.next().getValue(), is("hello"));
assertThat(res.next().getValue(), is("world"));
assertThat(res.hasNext(), is(false));
}

// -- Utility methods --

private static <Resp> StreamObserver<Resp> singleStreamObserver(CompletableFuture<Resp> future) {
return new StreamObserver<Resp>() {
private Resp value;

@Override
public void onNext(Resp value) {
this.value = value;
}

@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
}

@Override
public void onCompleted() {
future.complete(value);
}
};
}

private static <Resp> StreamObserver<Resp> multiStreamObserver(CompletableFuture<Iterator<Resp>> future) {
return new StreamObserver<Resp>() {
private final List<Resp> value = new ArrayList<>();

@Override
public void onNext(Resp value) {
this.value.add(value);
}

@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
}

@Override
public void onCompleted() {
future.complete(value.iterator());
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface GrpcServiceClient {

<ReqT, RespT> RespT unary(String methodName, ReqT request);

<ReqT, RespT> StreamObserver<ReqT> unary(String methodName, StreamObserver<RespT> response);
<ReqT, RespT> void unary(String methodName, ReqT request, StreamObserver<RespT> response);

<ReqT, RespT> Iterator<RespT> serverStream(String methodName, ReqT request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ public <ReqT, RespT> RespT unary(String methodName, ReqT request) {
}

@Override
public <ReqT, RespT> StreamObserver<ReqT> unary(String methodName, StreamObserver<RespT> response) {
return null;
public <ReqT, RespT> void unary(String methodName, ReqT request, StreamObserver<RespT> response) {
ClientCall<ReqT, RespT> call = ensureMethod(methodName, MethodDescriptor.MethodType.UNARY);
ClientCalls.asyncUnaryCall(call, request, response);
}

@Override
Expand All @@ -59,6 +60,8 @@ public <ReqT, RespT> Iterator<RespT> serverStream(String methodName, ReqT reques

@Override
public <ReqT, RespT> void serverStream(String methodName, ReqT request, StreamObserver<RespT> response) {
ClientCall<ReqT, RespT> call = ensureMethod(methodName, MethodDescriptor.MethodType.SERVER_STREAMING);
ClientCalls.asyncServerStreamingCall(call, request, response);
}

@Override
Expand Down Expand Up @@ -100,7 +103,8 @@ public void onCompleted() {

@Override
public <ReqT, RespT> StreamObserver<ReqT> clientStream(String methodName, StreamObserver<RespT> response) {
return null;
ClientCall<ReqT, RespT> call = ensureMethod(methodName, MethodDescriptor.MethodType.CLIENT_STREAMING);
return ClientCalls.asyncClientStreamingCall(call, response);
}

@Override
Expand Down Expand Up @@ -142,7 +146,8 @@ public void onCompleted() {

@Override
public <ReqT, RespT> StreamObserver<ReqT> bidi(String methodName, StreamObserver<RespT> response) {
return null;
ClientCall<ReqT, RespT> call = ensureMethod(methodName, MethodDescriptor.MethodType.BIDI_STREAMING);
return ClientCalls.asyncBidiStreamingCall(call, response);
}

private <ReqT, RespT> ClientCall<ReqT, RespT> ensureMethod(String methodName, MethodDescriptor.MethodType methodType) {
Expand Down

0 comments on commit 62b4a66

Please sign in to comment.