Skip to content

Commit

Permalink
New test for gRPC that uses stubs. Fixes a number of checkstyle and c…
Browse files Browse the repository at this point in the history
…opyright issues.
  • Loading branch information
spericas committed Feb 20, 2024
1 parent 9ee5e0c commit 9b7d549
Show file tree
Hide file tree
Showing 19 changed files with 265 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
1 change: 0 additions & 1 deletion examples/webserver/protocols/src/main/proto/strings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ option java_package = "io.helidon.examples.grpc.strings";

service StringService {
rpc Upper (StringMessage) returns (StringMessage) {}
rpc Lower (StringMessage) returns (StringMessage) {}
rpc Split (StringMessage) returns (stream StringMessage) {}
rpc Join (stream StringMessage) returns (StringMessage) {}
rpc Echo (stream StringMessage) returns (stream StringMessage) {}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2022, 2023 Oracle and/or its affiliates.
# Copyright (c) 2022, 2024 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ public void onCompleted() {
void testUnaryUpper() {
Channel channel = new GrpcChannel(grpcClient);
StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(channel);
Strings.StringMessage message = Strings.StringMessage.newBuilder().setText("hello").build();
Strings.StringMessage res = service.upper(newStringMessage("hello"));
assertThat(res.getText(), is("HELLO"));
}
Expand All @@ -160,6 +159,16 @@ void testUnaryUpperAsync() throws ExecutionException, InterruptedException, Time
assertThat(res.getText(), is("HELLO"));
}

@Test
void testServerStreamingSplit() {
Channel channel = new GrpcChannel(grpcClient);
StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(channel);
Iterator<Strings.StringMessage> res = service.split(newStringMessage("hello world"));
assertThat(res.next().getText(), is("hello"));
assertThat(res.next().getText(), is("world"));
assertThat(res.hasNext(), is(false));
}

@Test
void testServerStreamingSplitAsync() throws ExecutionException, InterruptedException, TimeoutException {
Channel channel = new GrpcChannel(grpcClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,25 @@

package io.helidon.webclient.grpc;

import io.helidon.webclient.api.ClientUri;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.MethodDescriptor;
import io.helidon.webclient.api.ClientUri;

/**
* Helidon's implementation of a gRPC {@link Channel}.
*/
public class GrpcChannel extends Channel {

private final GrpcClientImpl grpcClient;

/**
* Creates a new channel from a {@link GrpcClient}.
*
* @param grpcClient the gRPC client
*/
public GrpcChannel(GrpcClient grpcClient) {
this.grpcClient = (GrpcClientImpl) grpcClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

import io.grpc.ClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.helidon.common.buffers.BufferData;
import io.helidon.common.tls.Tls;
import io.helidon.http.Header;
Expand All @@ -54,6 +50,11 @@
import io.helidon.webclient.http2.Http2StreamConfig;
import io.helidon.webclient.http2.StreamTimeoutException;

import io.grpc.ClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;

/**
* A gRPC client call handler. The typical order of calls will be:
*
Expand Down Expand Up @@ -162,6 +163,8 @@ public void request(int numMessages) {
public void cancel(String message, Throwable cause) {
LOGGER.finest(() -> "cancel called " + message);
responseListener.onClose(Status.CANCELLED, new Metadata());
readStreamFuture.cancel(true);
writeStreamFuture.cancel(true);
close();
}

Expand Down Expand Up @@ -262,11 +265,10 @@ private void startStreamingThreads() {

private void close() {
LOGGER.finest("closing client call");
readStreamFuture.cancel(true);
writeStreamFuture.cancel(true);
sendingQueue.clear();
clientStream.cancel();
connection.close();
LOGGER.finest("closing client call ends");
}

private ClientConnection clientConnection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import java.util.Collections;
import java.util.List;

import io.grpc.CallCredentials;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;
import io.helidon.grpc.core.InterceptorPriorities;
import io.helidon.grpc.core.MarshallerSupplier;
import io.helidon.grpc.core.MethodHandler;
import io.helidon.grpc.core.PriorityBag;

import io.grpc.CallCredentials;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;

/**
* Encapsulates all metadata necessary to define a gRPC method. In addition to wrapping
* a {@link io.grpc.MethodDescriptor}, this class also holds the request and response
Expand Down Expand Up @@ -203,6 +204,11 @@ public <ReqT, RespT> MethodDescriptor<ReqT, RespT> descriptor() {
return (MethodDescriptor<ReqT, RespT>) descriptor;
}

/**
* Returns the {@link MethodDescriptor.MethodType} of this method.
*
* @return the method type
*/
public MethodDescriptor.MethodType type() {
return descriptor.getType();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import io.helidon.webclient.http2.Http2ClientConfig;

class GrpcClientProtocol {

private GrpcClientProtocol() {
}

static GrpcClientStream create(SocketContext scoketContext,
Http2Settings serverSettings,
Http2ClientConfig clientConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import io.helidon.webclient.api.WebClient;
import io.helidon.webclient.spi.ClientProtocolProvider;

/**
* Provider for {@link GrpcClient}.
*/
public class GrpcProtocolProvider implements ClientProtocolProvider<GrpcClient, GrpcClientProtocolConfig> {
static final String CONFIG_KEY = "grpc";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,99 @@
* @see io.helidon.webclient.grpc.GrpcClient#serviceClient(io.helidon.webclient.grpc.GrpcServiceDescriptor)
*/
public interface GrpcServiceClient {

/**
* Name of the service this client was created for.
*
* @return service name
*/
String serviceName();

/**
* Blocking gRPC unary call.
*
* @param methodName method name
* @param request the request
* @return the response
* @param <ReqT> type of request
* @param <ResT> type of response
*/
<ReqT, ResT> ResT unary(String methodName, ReqT request);

/**
* Asynchronous gRPC unary call.
*
* @param methodName method name
* @param request the request
* @param response the response observer
* @param <ReqT> type of request
* @param <ResT> type of response
*/
<ReqT, ResT> void unary(String methodName, ReqT request, StreamObserver<ResT> response);

/**
* Blocking gRPC server stream call.
*
* @param methodName method name
* @param request the request
* @return the response iterator
* @param <ReqT> type of request
* @param <ResT> type of response
*/
<ReqT, ResT> Iterator<ResT> serverStream(String methodName, ReqT request);

/**
* Asynchronous gRPC server stream call.
*
* @param methodName method name
* @param request the request
* @param response the response observer
* @param <ReqT> type of request
* @param <ResT> type of response
*/
<ReqT, ResT> void serverStream(String methodName, ReqT request, StreamObserver<ResT> response);

/**
* Blocking gRPC client stream call.
*
* @param methodName method name
* @param request the request iterator
* @return the response
* @param <ReqT> type of request
* @param <ResT> type of response
*/
<ReqT, ResT> ResT clientStream(String methodName, Iterator<ReqT> request);

/**
* Asynchronous gRPC client stream call.
*
* @param methodName method name
* @param response the response observer
* @return the request observer
* @param <ReqT> type of request
* @param <ResT> type of response
*/
<ReqT, ResT> StreamObserver<ReqT> clientStream(String methodName, StreamObserver<ResT> response);

/**
* gRPC bidirectional call using {@link Iterator}.
*
* @param methodName method name
* @param request request iterator
* @return response iterator
* @param <ReqT> type of request
* @param <ResT> type of response
*/
<ReqT, ResT> Iterator<ResT> bidi(String methodName, Iterator<ReqT> request);

/**
* gRPC bidirectional call using {@link StreamObserver}.
*
* @param methodName method name
* @param response the response observer
* @return the request observer
* @param <ReqT> type of request
* @param <ResT> type of response
*/
<ReqT, ResT> StreamObserver<ReqT> bidi(String methodName, StreamObserver<ResT> response);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import java.util.NoSuchElementException;
import java.util.Optional;

import io.grpc.CallCredentials;
import io.grpc.ClientInterceptor;
import io.helidon.builder.api.Option;
import io.helidon.builder.api.Prototype;

import io.grpc.CallCredentials;
import io.grpc.ClientInterceptor;

@Prototype.Blueprint
interface GrpcServiceDescriptorBlueprint {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Helidon WebClient HTTP/1.1 Support.
*/
package io.helidon.webclient.grpc;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,6 +61,9 @@
import static java.lang.System.Logger.Level.TRACE;
import static java.lang.System.Logger.Level.WARNING;

/**
* Represents an HTTP2 connection on the client.
*/
public class Http2ClientConnection {
private static final System.Logger LOGGER = System.getLogger(Http2ClientConnection.class.getName());
private static final int FRAME_HEADER_LENGTH = 9;
Expand Down Expand Up @@ -103,6 +106,14 @@ public class Http2ClientConnection {
this.writer = new Http2ConnectionWriter(connection.helidonSocket(), connection.writer(), List.of());
}

/**
* Creates an HTTP2 client connection.
*
* @param http2Client the HTTP2 client
* @param connection the client connection
* @param sendSettings whether to send the settings or not
* @return an HTTP2 client connection
*/
public static Http2ClientConnection create(Http2ClientImpl http2Client,
ClientConnection connection,
boolean sendSettings) {
Expand Down Expand Up @@ -136,6 +147,11 @@ Http2ClientStream stream(int streamId) {

}

/**
* Stream ID sequence.
*
* @return the ID sequence
*/
public LockingStreamIdSequence streamIdSequence() {
return streamIdSeq;
}
Expand All @@ -151,6 +167,12 @@ Http2ClientStream createStream(Http2StreamConfig config) {
return stream;
}

/**
* Adds a stream to the connection.
*
* @param streamId the stream ID
* @param stream the stream
*/
public void addStream(int streamId, Http2ClientStream stream) {
Lock lock = streamsLock.writeLock();
lock.lock();
Expand All @@ -161,6 +183,11 @@ public void addStream(int streamId, Http2ClientStream stream) {
}
}

/**
* Removes a stream from the connection.
*
* @param streamId the stream ID
*/
public void removeStream(int streamId) {
Lock lock = streamsLock.writeLock();
lock.lock();
Expand Down Expand Up @@ -206,6 +233,9 @@ void updateLastStreamId(int lastStreamId) {
this.lastStreamId = lastStreamId;
}

/**
* Closes this connection.
*/
public void close() {
this.goAway(0, Http2ErrorCode.NO_ERROR, "Closing connection");
if (state.getAndSet(State.CLOSED) != State.CLOSED) {
Expand Down
Loading

0 comments on commit 9b7d549

Please sign in to comment.