From 9c7bddeeba6a3bd3f172fafddf5bc6ee84530627 Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Wed, 7 Feb 2024 11:39:19 -0500 Subject: [PATCH] Simple unary method call without using stubs. Signed-off-by: Santiago Pericasgeertsen --- .../webserver/protocols/GrpcTest.java | 3 +- .../webclient/grpc/GrpcClientCall.java | 79 ++++++++++++++++--- .../webclient/grpc/GrpcClientStream.java | 23 ++++++ .../http2/Http2ClientConnection.java | 2 +- .../webclient/http2/Http2ClientStream.java | 14 ++-- 5 files changed, 100 insertions(+), 21 deletions(-) diff --git a/examples/webserver/protocols/src/test/java/io/helidon/examples/webserver/protocols/GrpcTest.java b/examples/webserver/protocols/src/test/java/io/helidon/examples/webserver/protocols/GrpcTest.java index 56e05ca422a..eaeff53afdb 100644 --- a/examples/webserver/protocols/src/test/java/io/helidon/examples/webserver/protocols/GrpcTest.java +++ b/examples/webserver/protocols/src/test/java/io/helidon/examples/webserver/protocols/GrpcTest.java @@ -65,7 +65,8 @@ void testSimpleCall() { .build()) .build(); - String r = grpcClient.serviceClient(serviceDescriptor) + StringValue r = grpcClient.serviceClient(serviceDescriptor) .unary("Upper", StringValue.of("hello")); + System.out.println("### " + r); } } diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java index 8e4cc327c4e..15bcd07a3f2 100644 --- a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java @@ -16,22 +16,31 @@ package io.helidon.webclient.grpc; +import java.io.InputStream; import java.time.Duration; import java.util.Collections; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import io.grpc.ClientCall; import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.helidon.common.buffers.BufferData; +import io.helidon.common.tls.Tls; import io.helidon.http.Header; import io.helidon.http.HeaderNames; import io.helidon.http.HeaderValues; +import io.helidon.http.WritableHeaders; +import io.helidon.http.http2.Http2Headers; import io.helidon.http.http2.Http2Settings; import io.helidon.webclient.api.ClientConnection; import io.helidon.webclient.api.ClientUri; import io.helidon.webclient.api.ConnectionKey; import io.helidon.webclient.api.DefaultDnsResolver; import io.helidon.webclient.api.DnsAddressLookup; +import io.helidon.webclient.api.Proxy; import io.helidon.webclient.api.TcpClientConnection; import io.helidon.webclient.api.WebClient; import io.helidon.webclient.http2.Http2ClientConnection; @@ -41,12 +50,13 @@ /** * A gRPC client call handler. The typical order of calls will be: * - * start request* sendMessage* halfClose + * start request* sendMessage* halfClose * * @param * @param */ class GrpcClientCall extends ClientCall { + private static final Header GRPC_ACCEPT_ENCODING = HeaderValues.create(HeaderNames.ACCEPT_ENCODING, "gzip"); private static final Header GRPC_CONTENT_TYPE = HeaderValues.create(HeaderNames.CONTENT_TYPE, "application/grpc"); private final AtomicReference> responseListener = new AtomicReference<>(); @@ -54,20 +64,28 @@ class GrpcClientCall extends ClientCall { private final GrpcClientMethodDescriptor method; private final AtomicInteger messages = new AtomicInteger(); + private final MethodDescriptor.Marshaller requestMarshaller; + private final MethodDescriptor.Marshaller responseMarshaller; + + private Http2ClientConnection connection; + private GrpcClientStream clientStream; + GrpcClientCall(GrpcClientImpl grpcClient, GrpcClientMethodDescriptor method) { this.grpcClient = grpcClient; this.method = method; + this.requestMarshaller = (MethodDescriptor.Marshaller) method.descriptor().getRequestMarshaller(); + this.responseMarshaller = (MethodDescriptor.Marshaller) method.descriptor().getResponseMarshaller(); } @Override - public void start(Listener responseListener, Metadata headers) { + public void start(Listener responseListener, Metadata metadata) { if (this.responseListener.compareAndSet(null, responseListener)) { // obtain HTTP2 connection - Http2ClientConnection connection = Http2ClientConnection.create( - (Http2ClientImpl) grpcClient.http2Client(), clientConnection(), true); + connection = Http2ClientConnection.create((Http2ClientImpl) grpcClient.http2Client(), + clientConnection(), true); // create HTTP2 stream from connection - GrpcClientStream clientStream = new GrpcClientStream( + clientStream = new GrpcClientStream( connection, Http2Settings.create(), // Http2Settings null, // SocketContext @@ -84,13 +102,22 @@ public int priority() { @Override public Duration readTimeout() { - return grpcClient.prototype().readTimeout().orElse(null); + return grpcClient.prototype().readTimeout().orElse(Duration.ofSeconds(60)); } }, null, // Http2ClientConfig connection.streamIdSequence()); // send HEADERS frame + ClientUri clientUri = grpcClient.prototype().baseUri().orElseThrow(); + WritableHeaders headers = WritableHeaders.create(); + headers.add(Http2Headers.AUTHORITY_NAME, clientUri.authority()); + headers.add(Http2Headers.METHOD_NAME, "POST"); + headers.add(Http2Headers.PATH_NAME, "/" + method.descriptor().getFullMethodName()); + headers.add(Http2Headers.SCHEME_NAME, "http"); + headers.add(GRPC_CONTENT_TYPE); + headers.add(GRPC_ACCEPT_ENCODING); + clientStream.writeHeaders(Http2Headers.create(headers), false); } else { throw new IllegalStateException("Response listener was already set"); } @@ -99,23 +126,49 @@ public Duration readTimeout() { @Override public void request(int numMessages) { messages.addAndGet(numMessages); + + ExecutorService executor = grpcClient.webClient().executor(); + executor.submit(() -> { + clientStream.readHeaders(); + while (messages.decrementAndGet() > 0) { + BufferData bufferData = clientStream.read(); + bufferData.read(); // compression + bufferData.readUnsignedInt32(); // length prefixed + ResT res = responseMarshaller.parse(new InputStream() { + @Override + public int read() { + return bufferData.available() > 0 ? bufferData.read() : -1; + } + }); + responseListener.get().onMessage(res); + } + responseListener.get().onClose(Status.OK, new Metadata()); + clientStream.close(); + }); } @Override public void cancel(String message, Throwable cause) { // close the stream/connection via RST_STREAM - // can be closed even if halfClosed + clientStream.cancel(); } @Override public void halfClose() { // close the stream/connection - // GOAWAY frame + // clientStream.close(); + // connection.close(); } @Override public void sendMessage(ReqT message) { // send a DATA frame + BufferData messageData = BufferData.growing(512); + messageData.readFrom(requestMarshaller.stream(message)); + BufferData headerData = BufferData.create(5); + headerData.writeInt8(0); // no compression + headerData.writeUnsignedInt32(messageData.available()); // length prefixed + clientStream.writeData(BufferData.create(headerData, messageData), true); // todo EOS } private ClientConnection clientConnection() { @@ -123,20 +176,22 @@ private ClientConnection clientConnection() { ClientUri clientUri = clientConfig.baseUri().orElseThrow(); WebClient webClient = grpcClient.webClient(); + Tls tls = Tls.builder().enabled(false).build(); ConnectionKey connectionKey = new ConnectionKey( clientUri.scheme(), clientUri.host(), clientUri.port(), - clientConfig.readTimeout().orElse(null), - null, + clientConfig.readTimeout().orElse(Duration.ZERO), + tls, DefaultDnsResolver.create(), DnsAddressLookup.defaultLookup(), - null); + Proxy.noProxy()); return TcpClientConnection.create(webClient, connectionKey, Collections.emptyList(), connection -> false, - connection -> {}).connect(); + connection -> { + }).connect(); } } diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientStream.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientStream.java index 0da4f3171f3..d4a4d60e09e 100644 --- a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientStream.java +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientStream.java @@ -16,7 +16,10 @@ package io.helidon.webclient.grpc; +import io.helidon.common.buffers.BufferData; import io.helidon.common.socket.SocketContext; +import io.helidon.http.http2.Http2FrameHeader; +import io.helidon.http.http2.Http2Headers; import io.helidon.http.http2.Http2Settings; import io.helidon.webclient.http2.Http2ClientConfig; import io.helidon.webclient.http2.Http2ClientConnection; @@ -34,4 +37,24 @@ class GrpcClientStream extends Http2ClientStream { LockingStreamIdSequence streamIdSeq) { super(connection, serverSettings, ctx, http2StreamConfig, http2ClientConfig, streamIdSeq); } + + @Override + public void headers(Http2Headers headers, boolean endOfStream) { + super.headers(headers, endOfStream); + } + + @Override + public void data(Http2FrameHeader header, BufferData data, boolean endOfStream) { + super.data(header, data, endOfStream); + } + + @Override + public void cancel() { + super.cancel(); + } + + @Override + public void close() { + super.close(); + } } diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnection.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnection.java index ff114a795ac..32cb9b903f9 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnection.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnection.java @@ -206,7 +206,7 @@ void updateLastStreamId(int lastStreamId) { this.lastStreamId = lastStreamId; } - void close() { + public void close() { this.goAway(0, Http2ErrorCode.NO_ERROR, "Closing connection"); if (state.getAndSet(State.CLOSED) != State.CLOSED) { try { diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientStream.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientStream.java index 9d78605ef4b..9834b5ee8f8 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientStream.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientStream.java @@ -190,7 +190,7 @@ boolean hasEntity() { return hasEntity; } - void cancel() { + public void cancel() { if (NON_CANCELABLE.contains(state)) { return; } @@ -206,7 +206,7 @@ void cancel() { } } - void close() { + public void close() { connection.removeStream(streamId); } @@ -227,7 +227,7 @@ BufferData read(int i) { return read(); } - BufferData read() { + public BufferData read() { while (state == Http2StreamState.HALF_CLOSED_LOCAL && readState != ReadState.END && hasEntity) { Http2FrameData frameData = readOne(timeout); if (frameData != null) { @@ -258,7 +258,7 @@ Status waitFor100Continue() { return null; } - void writeHeaders(Http2Headers http2Headers, boolean endOfStream) { + public void writeHeaders(Http2Headers http2Headers, boolean endOfStream) { this.state = Http2StreamState.checkAndGetState(this.state, Http2FrameType.HEADERS, true, endOfStream, true); this.readState = readState.check(http2Headers.httpHeaders().contains(HeaderValues.EXPECT_100) ? ReadState.CONTINUE_100_HEADERS @@ -294,7 +294,7 @@ void writeHeaders(Http2Headers http2Headers, boolean endOfStream) { } } - void writeData(BufferData entityBytes, boolean endOfStream) { + public void writeData(BufferData entityBytes, boolean endOfStream) { Http2FrameHeader frameHeader = Http2FrameHeader.create(entityBytes.available(), Http2FrameTypes.DATA, Http2Flag.DataFlags.create(endOfStream @@ -305,7 +305,7 @@ void writeData(BufferData entityBytes, boolean endOfStream) { splitAndWrite(frameData); } - Http2Headers readHeaders() { + public Http2Headers readHeaders() { while (readState == ReadState.HEADERS) { Http2FrameData frameData = readOne(timeout); if (frameData != null) { @@ -315,7 +315,7 @@ Http2Headers readHeaders() { return currentHeaders; } - SocketContext ctx() { + public SocketContext ctx() { return ctx; }