Skip to content

Commit

Permalink
Fixed problem with EOS and improved logging. All basic tests are pass…
Browse files Browse the repository at this point in the history
…ing.
  • Loading branch information
spericas committed Feb 16, 2024
1 parent e0d67f3 commit a4065a8
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 54 deletions.
8 changes: 8 additions & 0 deletions examples/webserver/protocols/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@
<groupId>io.helidon.webclient</groupId>
<artifactId>helidon-webclient-grpc</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.logging</groupId>
<artifactId>helidon-logging-common</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.logging</groupId>
<artifactId>helidon-logging-jul</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

import io.grpc.ClientCall;
import io.grpc.Metadata;
Expand Down Expand Up @@ -53,8 +54,6 @@
import io.helidon.webclient.http2.Http2StreamConfig;
import io.helidon.webclient.http2.StreamTimeoutException;

import static java.lang.System.Logger.Level.DEBUG;

/**
* A gRPC client call handler. The typical order of calls will be:
*
Expand All @@ -64,7 +63,7 @@
* @param <ResT>
*/
class GrpcClientCall<ReqT, ResT> extends ClientCall<ReqT, ResT> {
private static final System.Logger LOGGER = System.getLogger(GrpcClientCall.class.getName());
private static final Logger LOGGER = Logger.getLogger(GrpcClientCall.class.getName());

private static final Header GRPC_ACCEPT_ENCODING = HeaderValues.create(HeaderNames.ACCEPT_ENCODING, "gzip");
private static final Header GRPC_CONTENT_TYPE = HeaderValues.create(HeaderNames.CONTENT_TYPE, "application/grpc");
Expand Down Expand Up @@ -105,17 +104,20 @@ class GrpcClientCall<ReqT, ResT> extends ClientCall<ReqT, ResT> {

@Override
public void start(Listener<ResT> responseListener, Metadata metadata) {
LOGGER.finest("start called");

this.responseListener = responseListener;

// obtain HTTP2 connection
ClientConnection clientConnection = clientConnection();
connection = Http2ClientConnection.create((Http2ClientImpl) grpcClient.http2Client(),
clientConnection(), true);
clientConnection, true);

// create HTTP2 stream from connection
clientStream = new GrpcClientStream(
connection,
Http2Settings.create(), // Http2Settings
null, // SocketContext
Http2Settings.create(), // Http2Settings
clientConnection.helidonSocket(), // SocketContext
new Http2StreamConfig() {
@Override
public boolean priorKnowledge() {
Expand All @@ -129,7 +131,7 @@ public int priority() {

@Override
public Duration readTimeout() {
return grpcClient.prototype().readTimeout().orElse(Duration.ofSeconds(60));
return grpcClient.prototype().readTimeout().orElse(Duration.ofSeconds(10));
}
},
null, // Http2ClientConfig
Expand All @@ -152,25 +154,27 @@ public Duration readTimeout() {

@Override
public void request(int numMessages) {
LOGGER.finest(() -> "request called " + numMessages);
messageRequest.addAndGet(numMessages);
LOGGER.log(DEBUG, () -> "Messages requested " + numMessages);
startReadBarrier.countDown();
}

@Override
public void cancel(String message, Throwable cause) {
LOGGER.finest(() -> "cancel called " + message);
responseListener.onClose(Status.CANCELLED, new Metadata());
close();
}

@Override
public void halfClose() {
LOGGER.finest("halfClose called");
sendingQueue.add(EMPTY_BUFFER_DATA); // end marker
}

@Override
public void sendMessage(ReqT message) {
// queue a message
LOGGER.finest("sendMessage called");
BufferData messageData = BufferData.growing(512);
messageData.readFrom(requestMarshaller.stream(message));
BufferData headerData = BufferData.create(5);
Expand All @@ -185,70 +189,80 @@ private void startStreamingThreads() {
writeStreamFuture = executor.submit(() -> {
try {
startWriteBarrier.await();
LOGGER.log(DEBUG, "[Writing thread] started");
LOGGER.fine("[Writing thread] started");

boolean endOfStream = false;
while (isRemoteOpen()) {
LOGGER.log(DEBUG, "[Writing thread] polling sending queue");
LOGGER.finest("[Writing thread] polling sending queue");
BufferData bufferData = sendingQueue.poll(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS);
if (bufferData != null) {
if (bufferData == EMPTY_BUFFER_DATA) { // end marker
LOGGER.log(DEBUG, "[Writing thread] sending queue end marker found");
LOGGER.finest("[Writing thread] sending queue end marker found");
if (!endOfStream) {
LOGGER.finest("[Writing thread] sending empty buffer to end stream");
clientStream.writeData(EMPTY_BUFFER_DATA, true);
}
break;
}
boolean endOfStream = (sendingQueue.peek() == EMPTY_BUFFER_DATA);
LOGGER.log(DEBUG, () -> "[Writing thread] writing bufferData " + endOfStream);
endOfStream = (sendingQueue.peek() == EMPTY_BUFFER_DATA);
boolean lastEndOfStream = endOfStream;
LOGGER.finest(() -> "[Writing thread] writing bufferData " + lastEndOfStream);
clientStream.writeData(bufferData, endOfStream);
}
}
} catch (InterruptedException e) {
// falls through
} catch (Throwable e) {
LOGGER.finest(e.getMessage());
}
LOGGER.log(DEBUG, "[Writing thread] exiting");
LOGGER.fine("[Writing thread] exiting");
});

// read streaming thread
readStreamFuture = executor.submit(() -> {
try {
startReadBarrier.await();
LOGGER.log(DEBUG, "[Reading thread] started");
LOGGER.fine("[Reading thread] started");

// read response headers
clientStream.readHeaders();

while (isRemoteOpen()) {
// attempt to send queued messages
// drain queue
drainReceivingQueue();

// trailers received?
if (clientStream.trailers().isDone()) {
LOGGER.finest("[Reading thread] trailers received");
break;
}

// attempt to read and queue
Http2FrameData frameData;
try {
frameData = clientStream.readOne(WAIT_TIME_MILLIS_DURATION);
} catch (StreamTimeoutException e) {
LOGGER.log(DEBUG, "[Reading thread] read timeout");
LOGGER.fine("[Reading thread] read timeout");
continue;
}
if (frameData != null) {
receivingQueue.add(frameData.data());
LOGGER.log(DEBUG, "[Reading thread] adding bufferData to receiving queue");
}

// trailers received?
if (clientStream.trailers().isDone()) {
drainReceivingQueue(); // one more attempt
break;
LOGGER.finest("[Reading thread] adding bufferData to receiving queue");
}
}

LOGGER.finest("[Reading thread] closing listener");
responseListener.onClose(Status.OK, new Metadata());
} catch (Throwable e) {
LOGGER.finest(e.getMessage());
responseListener.onClose(Status.UNKNOWN, new Metadata());
} finally {
close();
} catch (InterruptedException e) {
// falls through
}
LOGGER.log(DEBUG, "[Reading thread] exiting");
LOGGER.fine("[Reading thread] exiting");
});
}

private void close() {
LOGGER.finest("closing client call");
readStreamFuture.cancel(true);
writeStreamFuture.cancel(true);
sendingQueue.clear();
Expand Down Expand Up @@ -297,10 +311,11 @@ public int read() {
}

private void drainReceivingQueue() {
LOGGER.finest("[Reading thread] draining receiving queue");
while (messageRequest.get() > 0 && !receivingQueue.isEmpty()) {
messageRequest.getAndDecrement();
ResT res = toResponse(receivingQueue.remove());
LOGGER.log(DEBUG, "[Reading thread] sending response to listener");
LOGGER.finest("[Reading thread] sending response to listener");
responseListener.onMessage(res);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@

package io.helidon.webclient.grpc;

import io.helidon.common.buffers.BufferData;
import io.helidon.common.socket.SocketContext;
import io.helidon.http.http2.Http2FrameHeader;
import io.helidon.http.http2.Http2Headers;
import io.helidon.http.http2.Http2Settings;
import io.helidon.webclient.http2.Http2ClientConfig;
import io.helidon.webclient.http2.Http2ClientConnection;
Expand All @@ -37,24 +34,4 @@ class GrpcClientStream extends Http2ClientStream {
LockingStreamIdSequence streamIdSeq) {
super(connection, serverSettings, ctx, http2StreamConfig, http2ClientConfig, streamIdSeq);
}

@Override
public void headers(Http2Headers headers, boolean endOfStream) {
super.headers(headers, endOfStream);
}

@Override
public void data(Http2FrameHeader header, BufferData data, boolean endOfStream) {
super.data(header, data, endOfStream);
}

@Override
public void cancel() {
super.cancel();
}

@Override
public void close() {
super.close();
}
}
1 change: 1 addition & 0 deletions webclient/grpc/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
requires transitive io.helidon.webclient;

requires io.helidon.grpc.core;
requires java.logging;

exports io.helidon.webclient.grpc;

Expand Down

0 comments on commit a4065a8

Please sign in to comment.