diff --git a/all/pom.xml b/all/pom.xml index 938982eb245..07c051e5277 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -970,6 +970,10 @@ io.helidon.webclient helidon-webclient-websocket + + io.helidon.webclient + helidon-webclient-grpc + io.helidon.webclient helidon-webclient-sse diff --git a/bom/pom.xml b/bom/pom.xml index 88a10fcf973..5b34b22c3a4 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -94,6 +94,12 @@ helidon-microprofile-graphql-server ${helidon.version} + + + io.helidon.grpc + helidon-grpc-core + ${helidon.version} + io.helidon.integrations.micronaut @@ -1247,6 +1253,11 @@ helidon-webserver-testing-junit5-websocket ${helidon.version} + + io.helidon.webserver.testing.junit5 + helidon-webserver-testing-junit5-grpc + ${helidon.version} + io.helidon.webclient helidon-webclient-api @@ -1272,6 +1283,11 @@ helidon-webclient-websocket ${helidon.version} + + io.helidon.webclient + helidon-webclient-grpc + ${helidon.version} + io.helidon.webclient helidon-webclient-sse diff --git a/examples/webserver/protocols/pom.xml b/examples/webserver/protocols/pom.xml index 33192caa6c9..25204c3e339 100644 --- a/examples/webserver/protocols/pom.xml +++ b/examples/webserver/protocols/pom.xml @@ -44,6 +44,14 @@ io.helidon.webserver helidon-webserver + + io.helidon.webserver + helidon-webserver-http2 + + + io.helidon.webserver + helidon-webserver-websocket + io.helidon.webserver helidon-webserver-grpc @@ -53,12 +61,12 @@ helidon-webclient-http2 - io.helidon.webserver - helidon-webserver-websocket + io.helidon.webclient + helidon-webclient-websocket - io.helidon.webserver - helidon-webserver-http2 + io.helidon.webclient + helidon-webclient-grpc org.junit.jupiter @@ -70,6 +78,26 @@ hamcrest-all test + + io.helidon.webserver.testing.junit5 + helidon-webserver-testing-junit5 + test + + + io.helidon.webserver.testing.junit5 + helidon-webserver-testing-junit5-http2 + test + + + io.helidon.webserver.testing.junit5 + helidon-webserver-testing-junit5-websocket + test + + + io.helidon.webserver.testing.junit5 + helidon-webserver-testing-junit5-grpc + test + diff --git a/examples/webserver/protocols/src/main/java/io/helidon/examples/webserver/protocols/ProtocolsMain.java b/examples/webserver/protocols/src/main/java/io/helidon/examples/webserver/protocols/ProtocolsMain.java index e8010bacc8f..0a1a9fb69c2 100644 --- a/examples/webserver/protocols/src/main/java/io/helidon/examples/webserver/protocols/ProtocolsMain.java +++ b/examples/webserver/protocols/src/main/java/io/helidon/examples/webserver/protocols/ProtocolsMain.java @@ -79,13 +79,23 @@ public static void main(String[] args) { .unary(Strings.getDescriptor(), "StringService", "Upper", - ProtocolsMain::grpcUpper)) + ProtocolsMain::grpcUpper) + .unary(Strings.getDescriptor(), + "StringService", + "Upper", + ProtocolsMain::blockingGrpcUpper)) .addRouting(WsRouting.builder() .endpoint("/tyrus/echo", ProtocolsMain::wsEcho)) .build() .start(); } + private static Strings.StringMessage blockingGrpcUpper(Strings.StringMessage reqT) { + return Strings.StringMessage.newBuilder() + .setText(reqT.getText().toUpperCase(Locale.ROOT)) + .build(); + } + private static void grpcUpper(Strings.StringMessage request, StreamObserver observer) { String requestText = request.getText(); System.out.println("grpc request: " + requestText); diff --git a/examples/webserver/protocols/src/test/java/io/helidon/examples/webserver/protocols/ProtocolsTest.java b/examples/webserver/protocols/src/test/java/io/helidon/examples/webserver/protocols/ProtocolsTest.java new file mode 100644 index 00000000000..1ca63a5a0e5 --- /dev/null +++ b/examples/webserver/protocols/src/test/java/io/helidon/examples/webserver/protocols/ProtocolsTest.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.examples.webserver.protocols; + +import io.helidon.webserver.testing.junit5.ServerTest; + +@ServerTest +class ProtocolsTest { +} diff --git a/grpc/core/pom.xml b/grpc/core/pom.xml new file mode 100644 index 00000000000..def59c84858 --- /dev/null +++ b/grpc/core/pom.xml @@ -0,0 +1,85 @@ + + + + + + helidon-grpc-project + io.helidon.grpc + 4.0.0-SNAPSHOT + + 4.0.0 + + helidon-grpc-core + Helidon gRPC related modules + + + + io.helidon.common + helidon-common-context + + + io.helidon.common + helidon-common-config + + + io.helidon.tracing + helidon-tracing + + + io.grpc + grpc-api + + + io.grpc + grpc-core + + + io.grpc + grpc-protobuf + + + io.grpc + grpc-stub + + + com.google.j2objc + j2objc-annotations + + + + + jakarta.inject + jakarta.inject-api + + + jakarta.annotation + jakarta.annotation-api + + + org.junit.jupiter + junit-jupiter-api + test + + + org.hamcrest + hamcrest-all + test + + + diff --git a/grpc/core/src/main/java/io/helidon/grpc/core/ContextKeys.java b/grpc/core/src/main/java/io/helidon/grpc/core/ContextKeys.java new file mode 100644 index 00000000000..f51358e14e4 --- /dev/null +++ b/grpc/core/src/main/java/io/helidon/grpc/core/ContextKeys.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2019, 2021 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.grpc.core; + +import java.lang.reflect.Method; + +import io.grpc.Context; +import io.grpc.Metadata; + +/** + * A collection of common gRPC {@link Context.Key} and + * {@link Metadata.Key} instances. + */ +public final class ContextKeys { + /** + * The {@link Metadata.Key} to use to obtain the authorization data. + */ + public static final Metadata.Key AUTHORIZATION = + Metadata.Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER); + + /** + * The gRPC context key to use to obtain the Helidon {@link io.helidon.common.context.Context} + * from the gRPC {@link Context}. + */ + public static final Context.Key HELIDON_CONTEXT = + Context.key(io.helidon.common.context.Context.class.getCanonicalName()); + + /** + * The {@link Context.Key} to use to obtain the actual underlying rpc {@link Method}. + */ + public static final Context.Key SERVICE_METHOD = Context.key(Method.class.getName()); + + /** + * Private constructor for utility class. + */ + private ContextKeys() { + } +} diff --git a/grpc/core/src/main/java/io/helidon/grpc/core/GrpcTracingContext.java b/grpc/core/src/main/java/io/helidon/grpc/core/GrpcTracingContext.java new file mode 100644 index 00000000000..e310ed43937 --- /dev/null +++ b/grpc/core/src/main/java/io/helidon/grpc/core/GrpcTracingContext.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.helidon.grpc.core; + +import java.util.Optional; + +import io.grpc.Context; +import io.helidon.tracing.Span; + +/** + * Contextual information related to Tracing. + */ +public final class GrpcTracingContext { + private static final String SPAN_KEY_NAME = "io.helidon.tracing.active-span"; + + /** + * Context key for Span instance. + */ + public static final Context.Key SPAN_KEY = Context.key(SPAN_KEY_NAME); + + /** + * Get the current active span associated with the context. + * + * @return span if one is in current context + */ + public static Optional activeSpan() { + return Optional.ofNullable(SPAN_KEY.get()); + } + + private GrpcTracingContext() { + } +} diff --git a/grpc/core/src/main/java/io/helidon/grpc/core/GrpcTracingName.java b/grpc/core/src/main/java/io/helidon/grpc/core/GrpcTracingName.java new file mode 100644 index 00000000000..1a48b0f2f8a --- /dev/null +++ b/grpc/core/src/main/java/io/helidon/grpc/core/GrpcTracingName.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.helidon.grpc.core; + +import io.grpc.MethodDescriptor; + +/** + * Name generator for span operation name. + */ +@FunctionalInterface +public interface GrpcTracingName { + /** + * Constructs a span's operation name from the gRPC method. + * + * @param method method to extract a name from + * @return operation name + */ + String name(MethodDescriptor method); +} diff --git a/grpc/core/src/main/java/io/helidon/grpc/core/InterceptorPriorities.java b/grpc/core/src/main/java/io/helidon/grpc/core/InterceptorPriorities.java new file mode 100644 index 00000000000..d4da4008e66 --- /dev/null +++ b/grpc/core/src/main/java/io/helidon/grpc/core/InterceptorPriorities.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2019, 2021 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.grpc.core; + +/** + * Constants that represent a priority ordering that interceptors registered with + * a gRPC service or method will be applied. + */ +public class InterceptorPriorities { + /** + * Context priority. + *

+ * Interceptors with this priority typically only perform tasks + * such as adding state to the call {@link io.grpc.Context}. + */ + public static final int CONTEXT = 1000; + + /** + * Tracing priority. + *

+ * Tracing and metrics interceptors are typically applied after any context + * interceptors so that they can trace and gather metrics on the whole call + * stack of remaining interceptors. + */ + public static final int TRACING = CONTEXT + 1; + + /** + * Security authentication priority. + */ + public static final int AUTHENTICATION = 2000; + + /** + * Security authorization priority. + */ + public static final int AUTHORIZATION = 2000; + + /** + * User-level priority. + * + * This value is also used as a default priority for application-supplied interceptors. + */ + public static final int USER = 5000; + + /** + * Cannot create instances. + */ + private InterceptorPriorities() { + } +} diff --git a/grpc/core/src/main/java/io/helidon/grpc/core/MarshallerSupplier.java b/grpc/core/src/main/java/io/helidon/grpc/core/MarshallerSupplier.java new file mode 100644 index 00000000000..47e14278f76 --- /dev/null +++ b/grpc/core/src/main/java/io/helidon/grpc/core/MarshallerSupplier.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2019, 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.grpc.core; + +import com.google.protobuf.MessageLite; +import io.grpc.MethodDescriptor; +import io.grpc.protobuf.lite.ProtoLiteUtils; +import jakarta.inject.Named; + +/** + * A supplier of {@link MethodDescriptor.Marshaller} instances for specific + * classes. + */ +@FunctionalInterface +public interface MarshallerSupplier { + + /** + * The name of the Protocol Buffer marshaller supplier. + */ + String PROTO = "proto"; + + /** + * The name to use to specify the default marshaller supplier. + */ + String DEFAULT = "default"; + + /** + * Obtain a {@link MethodDescriptor.Marshaller} for a type. + * + * @param clazz the {@link Class} of the type to obtain the {@link MethodDescriptor.Marshaller} for + * @param the type to be marshalled + * + * @return a {@link MethodDescriptor.Marshaller} for a type + */ + MethodDescriptor.Marshaller get(Class clazz); + + /** + * Obtain the default marshaller. + * + * @return the default marshaller + */ + static MarshallerSupplier defaultInstance() { + return new DefaultMarshallerSupplier(); + } + + /** + * The default {@link MarshallerSupplier}. + */ + @Named(MarshallerSupplier.DEFAULT) + class DefaultMarshallerSupplier + implements MarshallerSupplier { + + private final ProtoMarshallerSupplier proto = new ProtoMarshallerSupplier(); + + @Override + public MethodDescriptor.Marshaller get(Class clazz) { + if (MessageLite.class.isAssignableFrom(clazz)) { + return proto.get(clazz); + } + String msg = String.format( + "Class %s must be a valid ProtoBuf message, or a custom marshaller for it must be specified explicitly", + clazz.getName()); + throw new IllegalArgumentException(msg); + } + } + + /** + * A {@link MarshallerSupplier} implementation that + * supplies Protocol Buffer marshaller instances. + */ + @Named(PROTO) + class ProtoMarshallerSupplier + implements MarshallerSupplier { + + @Override + @SuppressWarnings("unchecked") + public MethodDescriptor.Marshaller get(Class clazz) { + try { + java.lang.reflect.Method getDefaultInstance = clazz.getDeclaredMethod("getDefaultInstance"); + MessageLite instance = (MessageLite) getDefaultInstance.invoke(clazz); + + return (MethodDescriptor.Marshaller) ProtoLiteUtils.marshaller(instance); + } catch (Exception e) { + String msg = String.format( + "Attempting to use class %s, which is not a valid Protocol buffer message, with a default marshaller", + clazz.getName()); + throw new IllegalArgumentException(msg); + } + } + } +} diff --git a/grpc/core/src/main/java/io/helidon/grpc/core/MethodHandler.java b/grpc/core/src/main/java/io/helidon/grpc/core/MethodHandler.java new file mode 100644 index 00000000000..205152293f5 --- /dev/null +++ b/grpc/core/src/main/java/io/helidon/grpc/core/MethodHandler.java @@ -0,0 +1,191 @@ +/* + * Copyright (c) 2019, 2021 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.helidon.grpc.core; + +import java.util.concurrent.CompletionStage; + +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.stub.ServerCalls; +import io.grpc.stub.StreamObserver; + +/** + * A gRPC method call handler. + * + * @param the request type + * @param the response type + */ +public interface MethodHandler + extends ServerCalls.UnaryMethod, + ServerCalls.ClientStreamingMethod, + ServerCalls.ServerStreamingMethod, + ServerCalls.BidiStreamingMethod { + /** + * Obtain the {@link MethodDescriptor.MethodType gRPC method tyoe} that + * this {@link MethodHandler} handles. + * + * @return the {@link MethodDescriptor.MethodType gRPC method type} that + * this {@link MethodHandler} handles + */ + MethodDescriptor.MethodType type(); + + /** + * Obtain the request type. + * @return the request type + */ + Class getRequestType(); + + /** + * Obtain the response type. + * @return the response type + */ + Class getResponseType(); + + /** + * Obtain the name of the underlying Java method that this handler maps to. + * + * @return the name of the underlying Java method that this handler maps to + */ + String javaMethodName(); + + /** + * Determine whether this is a client side only handler. + * + * @return {@code true} if this handler can only be used on the client + */ + default boolean clientOnly() { + return false; + } + + @Override + default void invoke(ReqT request, StreamObserver observer) { + observer.onError(Status.UNIMPLEMENTED.asException()); + } + + @Override + default StreamObserver invoke(StreamObserver observer) { + observer.onError(Status.UNIMPLEMENTED.asException()); + return null; + } + + /** + * Handle a bi-directional client call. + * + * @param args the call arguments. + * @param client the {@link BidirectionalClient} instance to forward the call to. + * @return the call result + */ + default Object bidirectional(Object[] args, BidirectionalClient client) { + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + /** + * Handle a bi-directional client call. + * + * @param args the call arguments. + * @param client the {@link ClientStreaming} instance to forward the call to. + * @return the call result + */ + default Object clientStreaming(Object[] args, ClientStreaming client) { + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + /** + * Handle a bi-directional client call. + * + * @param args the call arguments. + * @param client the {@link ServerStreamingClient} instance to forward the call to. + * @return the call result + */ + default Object serverStreaming(Object[] args, ServerStreamingClient client) { + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + /** + * Handle a bi-directional client call. + * + * @param args the call arguments. + * @param client the {@link UnaryClient} instance to forward the call to. + * @return the call result + */ + default Object unary(Object[] args, UnaryClient client) { + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + /** + * A bidirectional client call handler. + */ + interface BidirectionalClient { + /** + * Perform a bidirectional client call. + * + * @param methodName the name of the gRPC method + * @param observer the {@link StreamObserver} that will receive the responses + * @param the request type + * @param the response type + * @return a {@link StreamObserver} to use to send requests + */ + StreamObserver bidiStreaming(String methodName, StreamObserver observer); + } + + /** + * A client streaming client call handler. + */ + interface ClientStreaming { + /** + * Perform a client streaming client call. + * + * @param methodName the name of the gRPC method + * @param observer the {@link StreamObserver} that will receive the responses + * @param the request type + * @param the response type + * @return a {@link StreamObserver} to use to send requests + */ + StreamObserver clientStreaming(String methodName, StreamObserver observer); + } + + /** + * A server streaming client call handler. + */ + interface ServerStreamingClient { + /** + * Perform a server streaming client call. + * + * @param methodName the name of the gRPC method + * @param request the request message + * @param observer the {@link StreamObserver} that will receive the responses + * @param the request type + * @param the response type + */ + void serverStreaming(String methodName, ReqT request, StreamObserver observer); + } + + /** + * A unary client call handler. + */ + interface UnaryClient { + /** + * Perform a unary client call. + * + * @param methodName the name of the gRPC method + * @param request the request message + * @param the request type + * @param the response type + * @return a {@link java.util.concurrent.CompletableFuture} that completes when the call completes + */ + CompletionStage unary(String methodName, ReqT request); + } +} diff --git a/grpc/core/src/main/java/io/helidon/grpc/core/PriorityBag.java b/grpc/core/src/main/java/io/helidon/grpc/core/PriorityBag.java new file mode 100644 index 00000000000..7114abb16d2 --- /dev/null +++ b/grpc/core/src/main/java/io/helidon/grpc/core/PriorityBag.java @@ -0,0 +1,214 @@ +/* + * Copyright (c) 2019, 2021 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.grpc.core; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Stream; + +// import io.helidon.common.Prioritized; +import jakarta.annotation.Priority; + +/** + * A bag of values ordered by priority. + *

+ * An element with lower priority number is more significant than an element + * with a higher priority number. + *

+ * For cases where priority is the same, elements are ordered in the order that + * they were added to the bag. + *

+ * Elements added with negative priorities are assumed to have no priority and + * will be least significant in order. + * + * @param the type of elements in the bag + */ +public class PriorityBag implements Iterable { + + private final Map> contents; + + private final List noPriorityList; + + private final int defaultPriority; + + private PriorityBag(Map> contents, List noPriorityList, int defaultPriority) { + this.contents = contents; + this.noPriorityList = noPriorityList; + this.defaultPriority = defaultPriority; + } + + /** + * Create a new {@link PriorityBag} where elements + * added with no priority will be last in the order. + * + * @param the type of elements in the bag + * @return a new {@link PriorityBag} where elements + * dded with no priority will be last in the + * order + */ + public static PriorityBag create() { + return new PriorityBag<>(new TreeMap<>(), new ArrayList<>(), -1); + } + + /** + * Create a new {@link PriorityBag} where elements + * added with no priority will be be given a default + * priority value. + * + * @param priority the default priority value to assign + * to elements added with no priority + * @param the type of elements in the bag + * + * @return a new {@link PriorityBag} where elements + * added with no priority will be be given + * a default priority value + */ + public static PriorityBag withDefaultPriority(int priority) { + return new PriorityBag<>(new TreeMap<>(), new ArrayList<>(), priority); + } + + + /** + * Obtain a copy of this {@link PriorityBag}. + * + * @return a copy of this {@link PriorityBag} + */ + public PriorityBag copyMe() { + PriorityBag copy = PriorityBag.create(); + copy.merge(this); + return copy; + } + + /** + * Obtain an immutable copy of this {@link PriorityBag}. + * + * @return an immutable copy of this {@link PriorityBag} + */ + public PriorityBag readOnly() { + return new PriorityBag<>(Collections.unmodifiableMap(contents), + Collections.unmodifiableList(noPriorityList), + defaultPriority); + } + + /** + * Merge a {@link PriorityBag} into this {@link PriorityBag}. + * + * @param bag the bag to merge + */ + public void merge(PriorityBag bag) { + bag.contents.forEach((priority, value) -> addAll(value, priority)); + this.noPriorityList.addAll(bag.noPriorityList); + } + + /** + * Add elements to the bag. + *

+ * If the element's class is annotated with the {@link jakarta.annotation.Priority} + * annotation then that value will be used to determine priority otherwise the + * default priority value will be used. + * + * @param values the elements to add + */ + public void addAll(Iterable values) { + for (T value : values) { + add(value); + } + } + + /** + * Add elements to the bag. + * + * @param values the elements to add + * @param priority the priority to assign to the elements + */ + public void addAll(Iterable values, int priority) { + for (T value : values) { + add(value, priority); + } + } + + /** + * Add an element to the bag. + *

+ * If the element's class is annotated with the {@link jakarta.annotation.Priority} + * annotation then that value will be used to determine priority otherwise the + * default priority value will be used. + * + * @param value the element to add + */ + public void add(T value) { + if (value != null) { + int priority; + // if (value instanceof Prioritized) { + // priority = ((Prioritized) value).priority(); + // } else { + Priority annotation = value.getClass().getAnnotation(Priority.class); + priority = annotation == null ? defaultPriority : annotation.value(); + // } + add(value, priority); + } + } + + /** + * Add an element to the bag with a specific priority. + *

+ * + * @param value the element to add + * @param priority the priority of the element + */ + public void add(T value, int priority) { + if (value != null) { + if (priority < 0) { + noPriorityList.add(value); + } else { + contents.compute(priority, (key, list) -> combine(list, value)); + } + } + } + + /** + * Obtain the contents of this {@link PriorityBag} as + * an ordered {@link Stream}. + * + * @return the contents of this {@link PriorityBag} as + * an ordered {@link Stream} + */ + public Stream stream() { + Stream stream = contents.entrySet() + .stream() + .flatMap(e -> e.getValue().stream()); + + return Stream.concat(stream, noPriorityList.stream()); + } + + @Override + public Iterator iterator() { + return stream().iterator(); + } + + private List combine(List list, T value) { + if (list == null) { + list = new ArrayList<>(); + } + list.add(value); + return list; + } +} diff --git a/grpc/core/src/main/java/io/helidon/grpc/core/ResponseHelper.java b/grpc/core/src/main/java/io/helidon/grpc/core/ResponseHelper.java new file mode 100644 index 00000000000..3878c748664 --- /dev/null +++ b/grpc/core/src/main/java/io/helidon/grpc/core/ResponseHelper.java @@ -0,0 +1,455 @@ +/* + * Copyright (c) 2019, 2021 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.grpc.core; + +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import io.grpc.stub.StreamObserver; + +/** + * A number of helper methods to handle sending responses to a {@link StreamObserver}. + */ +public final class ResponseHelper { + private ResponseHelper() { + } + + /** + * Complete a gRPC request. + *

+ * The request will be completed by calling {@link StreamObserver#onNext(Object)} using the + * specified value then calling {@link StreamObserver#onCompleted()}. + * + * @param observer the {@link StreamObserver} to complete + * @param value the value to use when calling {@link StreamObserver#onNext(Object)} + * @param they type of the request result + */ + public static void complete(StreamObserver observer, T value) { + StreamObserver safe = SafeStreamObserver.ensureSafeObserver(observer); + safe.onNext(value); + safe.onCompleted(); + } + + /** + * Complete a gRPC request based on the result of a {@link CompletionStage}. + *

+ * The request will be completed by calling {@link StreamObserver#onNext(Object)} using the + * result obtained on completion of the specified {@link CompletionStage} and then calling + * {@link StreamObserver#onCompleted()}. + *

+ * If the {@link CompletionStage} completes with an error then {@link StreamObserver#onError(Throwable)} + * will be called. + * + * @param observer the {@link StreamObserver} to complete + * @param future the {@link CompletionStage} to use to obtain the value to use to call + * {@link StreamObserver#onNext(Object)} + * @param they type of the request result + */ + public static void complete(StreamObserver observer, CompletionStage future) { + future.whenComplete(completeWithResult(observer)); + } + + /** + * Asynchronously complete a gRPC request based on the result of a {@link CompletionStage}. + *

+ * The request will be completed by calling {@link StreamObserver#onNext(Object)} using the + * result obtained on completion of the specified {@link CompletionStage} and then calling + * {@link StreamObserver#onCompleted()}. + *

+ * If the {@link CompletionStage} completes with an error then {@link StreamObserver#onError(Throwable)} + * will be called. + *

+ * The execution will take place asynchronously on the fork-join thread pool. + * + * @param observer the {@link StreamObserver} to complete + * @param future the {@link CompletionStage} to use to obtain the value to use to call + * {@link StreamObserver#onNext(Object)} + * @param they type of the request result + */ + public static void completeAsync(StreamObserver observer, CompletionStage future) { + future.whenCompleteAsync(completeWithResult(observer)); + } + + /** + * Asynchronously complete a gRPC request based on the result of a {@link CompletionStage}. + *

+ * The request will be completed by calling {@link StreamObserver#onNext(Object)} using the + * result obtained on completion of the specified {@link CompletionStage} and then calling + * {@link StreamObserver#onCompleted()}. + *

+ * If the {@link CompletionStage} completes with an error then {@link StreamObserver#onError(Throwable)} + * will be called. + * + * @param observer the {@link StreamObserver} to complete + * @param future the {@link CompletionStage} to use to obtain the value to use to call + * {@link StreamObserver#onNext(Object)} + * @param executor the {@link Executor} on which to execute the asynchronous + * request completion + * @param they type of the request result + */ + public static void completeAsync(StreamObserver observer, CompletionStage future, Executor executor) { + future.whenCompleteAsync(completeWithResult(observer), executor); + } + + /** + * Complete a gRPC request based on the result of a {@link Callable}. + *

+ * The request will be completed by calling {@link StreamObserver#onNext(Object)} using the + * result obtained on completion of the specified {@link Callable} and then calling + * {@link StreamObserver#onCompleted()}. + *

+ * If the {@link Callable#call()} method throws an exception then {@link StreamObserver#onError(Throwable)} + * will be called. + * + * @param observer the {@link StreamObserver} to complete + * @param callable the {@link Callable} to use to obtain the value to use to call + * {@link StreamObserver#onNext(Object)} + * @param they type of the request result + */ + public static void complete(StreamObserver observer, Callable callable) { + try { + observer.onNext(callable.call()); + observer.onCompleted(); + } catch (Throwable t) { + observer.onError(t); + } + } + + /** + * Asynchronously complete a gRPC request based on the result of a {@link Callable}. + *

+ * The request will be completed by calling {@link StreamObserver#onNext(Object)} using the + * result obtained on completion of the specified {@link Callable} and then calling + * {@link StreamObserver#onCompleted()}. + *

+ * If the {@link Callable#call()} method throws an exception then {@link StreamObserver#onError(Throwable)} + * will be called. + *

+ * The execution will take place asynchronously on the fork-join thread pool. + * + * @param observer the {@link StreamObserver} to complete + * @param callable the {@link Callable} to use to obtain the value to use to call + * {@link StreamObserver#onNext(Object)} + * @param they type of the request result + */ + public static void completeAsync(StreamObserver observer, Callable callable) { + completeAsync(observer, CompletableFuture.supplyAsync(createSupplier(callable))); + } + + /** + * Asynchronously complete a gRPC request based on the result of a {@link Callable}. + *

+ * The request will be completed by calling {@link StreamObserver#onNext(Object)} using the + * result obtained on completion of the specified {@link Callable} and then calling + * {@link StreamObserver#onCompleted()}. + *

+ * If the {@link Callable#call()} method throws an exception then {@link StreamObserver#onError(Throwable)} + * will be called. + * + * @param observer the {@link StreamObserver} to complete + * @param callable the {@link Callable} to use to obtain the value to use to call + * {@link StreamObserver#onNext(Object)} + * @param executor the {@link Executor} on which to execute the asynchronous + * request completion + * @param they type of the request result + */ + public static void completeAsync(StreamObserver observer, Callable callable, Executor executor) { + completeAsync(observer, CompletableFuture.supplyAsync(createSupplier(callable), executor)); + } + + /** + * Execute a {@link Runnable} task and on completion of the task complete the gRPC request by + * calling {@link StreamObserver#onNext(Object)} using the specified result and then call + * {@link StreamObserver#onCompleted()}. + *

+ * If the {@link Runnable#run()} method throws an exception then {@link StreamObserver#onError(Throwable)} + * will be called. + * + * @param observer the {@link StreamObserver} to complete + * @param task the {@link Runnable} to execute + * @param result the result to pass to {@link StreamObserver#onNext(Object)} + * @param they type of the request result + */ + public static void complete(StreamObserver observer, Runnable task, T result) { + complete(observer, Executors.callable(task, result)); + } + + /** + * Asynchronously execute a {@link Runnable} task and on completion of the task complete the gRPC + * request by calling {@link StreamObserver#onNext(Object)} using the specified result and then + * call {@link StreamObserver#onCompleted()}. + *

+ * If the {@link Runnable#run()} method throws an exception then {@link StreamObserver#onError(Throwable)} + * will be called. + *

+ * The task and and request completion will be executed on the fork-join thread pool. + * + * @param observer the {@link StreamObserver} to complete + * @param task the {@link Runnable} to execute + * @param result the result to pass to {@link StreamObserver#onNext(Object)} + * @param they type of the request result + */ + public static void completeAsync(StreamObserver observer, Runnable task, T result) { + completeAsync(observer, Executors.callable(task, result)); + } + + /** + * Asynchronously execute a {@link Runnable} task and on completion of the task complete the gRPC + * request by calling {@link StreamObserver#onNext(Object)} using the specified result and then + * call {@link StreamObserver#onCompleted()}. + *

+ * If the {@link Runnable#run()} method throws an exception then {@link StreamObserver#onError(Throwable)} + * will be called. + * + * @param observer the {@link StreamObserver} to complete + * @param task the {@link Runnable} to execute + * @param result the result to pass to {@link StreamObserver#onNext(Object)} + * @param executor the {@link Executor} on which to execute the asynchronous + * request completion + * @param they type of the request result + */ + public static void completeAsync(StreamObserver observer, Runnable task, T result, Executor executor) { + completeAsync(observer, Executors.callable(task, result), executor); + } + + /** + * Send the values from a {@link Stream} to the {@link StreamObserver#onNext(Object)} method until the + * {@link Stream} is exhausted call {@link StreamObserver#onCompleted()}. + *

+ * If an error occurs whilst streaming results then {@link StreamObserver#onError(Throwable)} will be called. + * + * @param observer the {@link StreamObserver} to complete + * @param stream the {@link Stream} of results to send to {@link StreamObserver#onNext(Object)} + * @param they type of the request result + */ + public static void stream(StreamObserver observer, Stream stream) { + stream(observer, () -> stream); + } + + /** + * Asynchronously send the values from a {@link Stream} to the {@link StreamObserver#onNext(Object)} method until + * the {@link Stream} is exhausted call {@link StreamObserver#onCompleted()}. + *

+ * If an error occurs whilst streaming results then {@link StreamObserver#onError(Throwable)} will be called. + * + * @param observer the {@link StreamObserver} to complete + * @param stream the {@link Stream} of results to send to {@link StreamObserver#onNext(Object)} + * @param executor the {@link Executor} on which to execute the asynchronous + * request completion + * @param they type of the request result + */ + public static void streamAsync(StreamObserver observer, Stream stream, Executor executor) { + executor.execute(() -> stream(observer, () -> stream)); + } + + /** + * Send the values from a {@link Stream} to the {@link StreamObserver#onNext(Object)} method until the + * {@link Stream} is exhausted call {@link StreamObserver#onCompleted()}. + *

+ * If an error occurs whilst streaming results then {@link StreamObserver#onError(Throwable)} will be called. + * + * @param observer the {@link StreamObserver} to complete + * @param supplier the {@link Supplier} of the {@link Stream} of results to send to {@link StreamObserver#onNext(Object)} + * @param they type of the request result + */ + public static void stream(StreamObserver observer, Supplier> supplier) { + StreamObserver safe = SafeStreamObserver.ensureSafeObserver(observer); + Throwable thrown = null; + + try { + supplier.get().forEach(safe::onNext); + } catch (Throwable t) { + thrown = t; + } + + if (thrown == null) { + safe.onCompleted(); + } else { + safe.onError(thrown); + } + } + + /** + * Asynchronously send the values from a {@link Stream} to the {@link StreamObserver#onNext(Object)} method + * until the {@link Stream} is exhausted call {@link StreamObserver#onCompleted()}. + *

+ * If an error occurs whilst streaming results then {@link StreamObserver#onError(Throwable)} will be called. + * + * @param observer the {@link StreamObserver} to complete + * @param supplier the {@link Supplier} of the {@link Stream} of results to send to {@link StreamObserver#onNext(Object)} + * @param executor the {@link Executor} on which to execute the asynchronous + * request completion + * @param they type of the request result + */ + public static void streamAsync(StreamObserver observer, Supplier> supplier, Executor executor) { + executor.execute(() -> stream(observer, supplier)); + } + + + /** + * Obtain a {@link Consumer} that can be used to send values to the {@link StreamObserver#onNext(Object)} method until + * the {@link CompletionStage} completes then call {@link StreamObserver#onCompleted()}. + *

+ * If the {@link CompletionStage} completes with an error then {@link StreamObserver#onError(Throwable)} + * will be called instead of {@link StreamObserver#onCompleted()}. + * + * @param observer the {@link StreamObserver} to send values to and complete when the {@link CompletionStage} completes + * @param stage the {@link CompletionStage} to await completion of + * @param they type of the request result + * + * @return a {@link Consumer} that can be used to send values to the {@link StreamObserver#onNext(Object)} method + */ + // todo: a bit of a chicken or egg when used with Coherence streaming methods, isn't it? + public static Consumer stream(StreamObserver observer, CompletionStage stage) { + StreamObserver safe = SafeStreamObserver.ensureSafeObserver(observer); + stage.whenComplete(completeWithoutResult(safe)); + return safe::onNext; + } + + /** + * Obtain a {@link Consumer} that can be used to send values to the {@link StreamObserver#onNext(Object)} method until + * the {@link CompletionStage} completes then asynchronously call {@link StreamObserver#onCompleted()} using the + * fork-join thread pool. + *

+ * If the {@link CompletionStage} completes with an error then {@link StreamObserver#onError(Throwable)} + * will be called instead of {@link StreamObserver#onCompleted()}. + * + * @param observer the {@link StreamObserver} to send values to and complete when the {@link CompletionStage} completes + * @param stage the {@link CompletionStage} to await completion of + * @param they type of the request result + * + * @return a {@link Consumer} that can be used to send values to the {@link StreamObserver#onNext(Object)} method + */ + public static Consumer streamAsync(StreamObserver observer, CompletionStage stage) { + StreamObserver safe = SafeStreamObserver.ensureSafeObserver(observer); + stage.whenCompleteAsync(completeWithoutResult(safe)); + return value -> CompletableFuture.runAsync(() -> safe.onNext(value)); + } + + /** + * Obtain a {@link Consumer} that can be used to send values to the {@link StreamObserver#onNext(Object)} method until + * the {@link CompletionStage} completes then asynchronously call {@link StreamObserver#onCompleted()} using the executor + * thread. + *

+ * If the {@link CompletionStage} completes with an error then {@link StreamObserver#onError(Throwable)} + * will be called instead of {@link StreamObserver#onCompleted()}. + * + * @param observer the {@link StreamObserver} to send values to and complete when the {@link CompletionStage} completes + * @param stage the {@link CompletionStage} to await completion of + * @param executor the {@link Executor} on which to execute the asynchronous + * request completion + * @param they type of the request result + * + * @return a {@link Consumer} that can be used to send values to the {@link StreamObserver#onNext(Object)} method + */ + public static Consumer streamAsync(StreamObserver observer, CompletionStage stage, Executor executor) { + StreamObserver safe = SafeStreamObserver.ensureSafeObserver(observer); + stage.whenCompleteAsync(completeWithoutResult(safe), executor); + return value -> CompletableFuture.runAsync(() -> safe.onNext(value), executor); + } + + /** + * Obtain a {@link Consumer} that can be used to send values to the {@link StreamObserver#onNext(Object)} method. + * @param observer the {@link StreamObserver} to complete + * @param the type of the result + * @param the type of the response + * @return a {@link Consumer} that can be used to send values to the {@link StreamObserver#onNext(Object)} method + */ + public static BiConsumer completeWithResult(StreamObserver observer) { + return new CompletionAction<>(observer, true); + } + + /** + * Obtain a {@link Consumer} that can be used to complete a {@link StreamObserver}. + * @param observer the {@link StreamObserver} to complete + * @param the type of the response + * @return a {@link Consumer} that can be used to complete a {@link StreamObserver} + */ + public static BiConsumer completeWithoutResult(StreamObserver observer) { + return new CompletionAction<>(observer, false); + } + + /** + * Convert a {@link Callable} to a {@link Supplier}. + * @param callable the {@link Callable} to convert + * @param the result returned by the {@link Callable} + * @return a {@link Supplier} that wraps the {@link Callable} + */ + public static Supplier createSupplier(Callable callable) { + return new CallableSupplier<>(callable); + } + + /** + * A {@link BiConsumer} that is used to handle completion of a + * {@link CompletionStage} by forwarding + * the result to a {@link io.grpc.stub.StreamObserver}. + * + * @param the type of the {@link CompletionStage}'s result + * @param the type of result expected by the {@link io.grpc.stub.StreamObserver} + */ + private static class CompletionAction implements BiConsumer { + private StreamObserver observer; + private boolean sendResult; + + CompletionAction(StreamObserver observer, boolean sendResult) { + this.observer = observer; + this.sendResult = sendResult; + } + + @Override + @SuppressWarnings("unchecked") + public void accept(T result, Throwable error) { + if (error != null) { + observer.onError(error); + } else { + if (sendResult) { + observer.onNext((U) result); + } + observer.onCompleted(); + } + } + } + + /** + * A class that converts a {@link Callable} to a {@link Supplier}. + * @param the type of result returned from the callable + */ + private static class CallableSupplier implements Supplier { + private Callable callable; + + CallableSupplier(Callable callable) { + this.callable = callable; + } + + @Override + public T get() { + try { + return callable.call(); + } catch (Exception e) { + throw new CompletionException(e.getMessage(), e); + } + } + } +} diff --git a/grpc/core/src/main/java/io/helidon/grpc/core/SafeStreamObserver.java b/grpc/core/src/main/java/io/helidon/grpc/core/SafeStreamObserver.java new file mode 100644 index 00000000000..9661a45be3f --- /dev/null +++ b/grpc/core/src/main/java/io/helidon/grpc/core/SafeStreamObserver.java @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2019, 2021 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.grpc.core; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import io.grpc.Status; +import io.grpc.stub.StreamObserver; + +/** + * A {@link io.grpc.stub.StreamObserver} that handles exceptions correctly. + * + * @param the type of response expected + */ +public class SafeStreamObserver + implements StreamObserver { + + /** + * Create a {@link SafeStreamObserver} that wraps + * another {@link io.grpc.stub.StreamObserver}. + * + * @param streamObserver the {@link io.grpc.stub.StreamObserver} to wrap + */ + private SafeStreamObserver(StreamObserver streamObserver) { + delegate = streamObserver; + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + + if (t == null) { + onError(Status.INVALID_ARGUMENT + .withDescription("onNext called with null. Null values are generally not allowed.") + .asRuntimeException()); + } else { + try { + delegate.onNext(t); + } catch (Throwable thrown) { + throwIfFatal(thrown); + onError(thrown); + } + } + } + + @Override + public void onError(Throwable thrown) { + try { + if (done) { + LOGGER.log(Level.SEVERE, checkNotNull(thrown), () -> "OnError called after StreamObserver was closed"); + } else { + done = true; + delegate.onError(checkNotNull(thrown)); + } + } catch (Throwable t) { + throwIfFatal(t); + LOGGER.log(Level.SEVERE, t, () -> "Caught exception handling onError"); + } + } + + @Override + public void onCompleted() { + if (done) { + LOGGER.log(Level.WARNING, "onComplete called after StreamObserver was closed"); + } else { + try { + done = true; + delegate.onCompleted(); + } catch (Throwable thrown) { + throwIfFatal(thrown); + LOGGER.log(Level.SEVERE, thrown, () -> "Caught exception handling onComplete"); + } + } + } + + /** + * Obtain the wrapped {@link StreamObserver}. + * @return the wrapped {@link StreamObserver} + */ + public StreamObserver delegate() { + return delegate; + } + + private Throwable checkNotNull(Throwable thrown) { + if (thrown == null) { + thrown = Status.INVALID_ARGUMENT + .withDescription("onError called with null Throwable. Null exceptions are generally not allowed.") + .asRuntimeException(); + } + + return thrown; + } + + /** + * Throws a particular {@code Throwable} only if it belongs to a set of "fatal" error varieties. These varieties are + * as follows: + *

    + *
  • {@code VirtualMachineError}
  • + *
  • {@code ThreadDeath}
  • + *
  • {@code LinkageError}
  • + *
+ * + * @param thrown the {@code Throwable} to test and perhaps throw + */ + private static void throwIfFatal(Throwable thrown) { + if (thrown instanceof VirtualMachineError) { + throw (VirtualMachineError) thrown; + } else if (thrown instanceof ThreadDeath) { + throw (ThreadDeath) thrown; + } else if (thrown instanceof LinkageError) { + throw (LinkageError) thrown; + } + } + + /** + * Ensure that the specified {@link StreamObserver} is a safe observer. + *

+ * If the specified observer is not an instance of {@link SafeStreamObserver} then wrap + * it in a {@link SafeStreamObserver}. + * + * @param observer the {@link StreamObserver} to test + * @param the response type expected by the observer + * + * @return a safe {@link StreamObserver} + */ + public static StreamObserver ensureSafeObserver(StreamObserver observer) { + if (observer instanceof SafeStreamObserver) { + return observer; + } + + return new SafeStreamObserver<>(observer); + } + + // ----- constants ------------------------------------------------------ + + /** + * The {2link Logger} to use. + */ + private static final Logger LOGGER = Logger.getLogger(SafeStreamObserver.class.getName()); + + // ----- data members --------------------------------------------------- + + /** + * The actual StreamObserver. + */ + private StreamObserver delegate; + + /** + * Indicates a terminal state. + */ + private boolean done; +} diff --git a/grpc/core/src/main/java/io/helidon/grpc/core/package-info.java b/grpc/core/src/main/java/io/helidon/grpc/core/package-info.java new file mode 100644 index 00000000000..019189e5db5 --- /dev/null +++ b/grpc/core/src/main/java/io/helidon/grpc/core/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2019, 2021 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Core classes used by both the reactive gRPC server API and gRPC client API. + */ +package io.helidon.grpc.core; diff --git a/grpc/core/src/main/java/module-info.java b/grpc/core/src/main/java/module-info.java new file mode 100644 index 00000000000..a46489ed6ad --- /dev/null +++ b/grpc/core/src/main/java/module-info.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Helidon GRPC core package. + */ +module io.helidon.grpc.core { + + requires transitive io.helidon.common.context; + requires transitive io.helidon.common.config; + requires transitive io.grpc; + requires transitive io.grpc.stub; + requires transitive com.google.protobuf; + requires transitive io.grpc.protobuf; + requires transitive io.grpc.protobuf.lite; + + requires java.logging; + requires io.helidon.tracing; + requires jakarta.inject; + requires jakarta.annotation; + + exports io.helidon.grpc.core; + +} diff --git a/grpc/pom.xml b/grpc/pom.xml new file mode 100644 index 00000000000..64b1c84c1a2 --- /dev/null +++ b/grpc/pom.xml @@ -0,0 +1,38 @@ + + + + + 4.0.0 + + io.helidon + helidon-project + 4.0.0-SNAPSHOT + + pom + + io.helidon.grpc + helidon-grpc-project + Helidon gRPC Project + + gRPC support for Helidon + + + core + + diff --git a/pom.xml b/pom.xml index 3ec957e4fce..43a19128fbb 100644 --- a/pom.xml +++ b/pom.xml @@ -193,6 +193,7 @@ dependencies fault-tolerance graphql + grpc health helidon http diff --git a/webclient/grpc/pom.xml b/webclient/grpc/pom.xml new file mode 100644 index 00000000000..8ddcaf6fccf --- /dev/null +++ b/webclient/grpc/pom.xml @@ -0,0 +1,146 @@ + + + + 4.0.0 + + io.helidon.webclient + helidon-webclient-project + 4.0.0-SNAPSHOT + + + helidon-webclient-grpc + Helidon WebClient gRPC + + + + io.grpc + grpc-core + + + io.grpc + grpc-stub + + + io.helidon.grpc + helidon-grpc-core + + + io.helidon.http + helidon-http-http2 + + + io.helidon.http.encoding + helidon-http-encoding + + + io.helidon.webclient + helidon-webclient + + + io.helidon.webclient + helidon-webclient-http2 + + + io.helidon.common.features + helidon-common-features-api + true + + + org.junit.jupiter + junit-jupiter-api + test + + + org.hamcrest + hamcrest-all + test + + + io.helidon.webserver + helidon-webserver-grpc + test + + + io.helidon.webserver.testing.junit5 + helidon-webserver-testing-junit5 + test + + + org.junit.jupiter + junit-jupiter-params + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + io.helidon.common.features + helidon-common-features-processor + ${helidon.version} + + + io.helidon.config + helidon-config-metadata-processor + ${helidon.version} + + + io.helidon.builder + helidon-builder-processor + ${helidon.version} + + + io.helidon.common.processor + helidon-common-processor-helidon-copyright + ${helidon.version} + + + + + + io.helidon.common.features + helidon-common-features-processor + ${helidon.version} + + + io.helidon.config + helidon-config-metadata-processor + ${helidon.version} + + + io.helidon.builder + helidon-builder-processor + ${helidon.version} + + + io.helidon.common.processor + helidon-common-processor-helidon-copyright + ${helidon.version} + + + + + + + diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/ClientMethodDescriptor.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/ClientMethodDescriptor.java new file mode 100644 index 00000000000..71982384ce2 --- /dev/null +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/ClientMethodDescriptor.java @@ -0,0 +1,419 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webclient.grpc; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import io.grpc.CallCredentials; +import io.grpc.ClientInterceptor; +import io.grpc.MethodDescriptor; +import io.helidon.grpc.core.InterceptorPriorities; +import io.helidon.grpc.core.MarshallerSupplier; +import io.helidon.grpc.core.MethodHandler; +import io.helidon.grpc.core.PriorityBag; + +/** + * Encapsulates all metadata necessary to define a gRPC method. In addition to wrapping a {@link io.grpc.MethodDescriptor}, + * this class also holds the request and response types of the gRPC method. A + * {@link io.helidon.webclient.grpc.GrpcServiceDescriptor} can contain zero or more {@link io.grpc.MethodDescriptor}. + *

+ * An instance of ClientMethodDescriptor can be created either from an existing {@link io.grpc.MethodDescriptor} or + * from one of the factory methods {@link #bidirectional(String, String)}, {@link #clientStreaming(String, String)}, + * {@link #serverStreaming(String, String)} or {@link #unary(String, String)}. + */ +public final class ClientMethodDescriptor { + + /** + * The simple name of the method. + */ + private final String name; + + /** + * The {@link io.grpc.MethodDescriptor} for this method. This is usually obtained from protocol buffer + * method getDescriptor (from service getDescriptor). + */ + private final MethodDescriptor descriptor; + + /** + * The list of client interceptors for this method. + */ + private final List interceptors; + + /** + * The {@link io.grpc.CallCredentials} for this method. + */ + private final CallCredentials callCredentials; + + /** + * The method handler for this method. + */ + private final MethodHandler methodHandler; + + private ClientMethodDescriptor(String name, + MethodDescriptor descriptor, + List interceptors, + CallCredentials callCredentials, + MethodHandler methodHandler) { + this.name = name; + this.descriptor = descriptor; + this.interceptors = interceptors; + this.callCredentials = callCredentials; + this.methodHandler = methodHandler; + } + + /** + * Creates a new {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Builder} with the specified name and {@link io.grpc.MethodDescriptor}. + * + * @param serviceName the name of the owning gRPC service + * @param name the simple method name + * @param descriptor the underlying gRPC {@link io.grpc.MethodDescriptor.Builder} + * @return A new instance of a {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Builder} + */ + static Builder builder(String serviceName, + String name, + MethodDescriptor.Builder descriptor) { + return new Builder(serviceName, name, descriptor); + } + + /** + * Creates a new {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Builder} with the specified + * name and {@link io.grpc.MethodDescriptor}. + * + * @param serviceName the name of the owning gRPC service + * @param name the simple method name + * @param descriptor the underlying gRPC {@link io.grpc.MethodDescriptor.Builder} + * @return a new instance of a {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Builder} + */ + static ClientMethodDescriptor create(String serviceName, + String name, + MethodDescriptor.Builder descriptor) { + return builder(serviceName, name, descriptor).build(); + } + + /** + * Creates a new unary {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Builder} with + * the specified name. + * + * @param serviceName the name of the owning gRPC service + * @param name the method name + * @return a new instance of a {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Builder} + */ + static Builder unary(String serviceName, String name) { + return builder(serviceName, name, MethodDescriptor.MethodType.UNARY); + } + + /** + * Creates a new client Streaming {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Builder} with + * the specified name. + * + * @param serviceName the name of the owning gRPC service + * @param name the method name + * @return a new instance of a {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Builder} + */ + static Builder clientStreaming(String serviceName, String name) { + return builder(serviceName, name, MethodDescriptor.MethodType.CLIENT_STREAMING); + } + + /** + * Creates a new server streaming {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Builder} with + * the specified name. + * + * @param serviceName the name of the owning gRPC service + * @param name the method name + * @return a new instance of a {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Builder} + */ + static Builder serverStreaming(String serviceName, String name) { + return builder(serviceName, name, MethodDescriptor.MethodType.SERVER_STREAMING); + } + + /** + * Creates a new bidirectional {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Builder} with + * the specified name. + * + * @param serviceName the name of the owning gRPC service + * @param name the method name + * @return a new instance of a {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Builder} + */ + static Builder bidirectional(String serviceName, String name) { + return builder(serviceName, name, MethodDescriptor.MethodType.BIDI_STREAMING); + } + + /** + * Return the {@link io.grpc.CallCredentials} set on this service. + * + * @return the {@link io.grpc.CallCredentials} set on this service + */ + public CallCredentials callCredentials() { + return this.callCredentials; + } + + /** + * Creates a new {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Builder} with the specified name. + * + * @param serviceName the name of the owning gRPC service + * @param name the method name + * @param methodType the gRPC method type + * @return a new instance of a {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Builder} + */ + static Builder builder(String serviceName, + String name, + MethodDescriptor.MethodType methodType) { + + MethodDescriptor.Builder builder = MethodDescriptor.newBuilder() + .setFullMethodName(serviceName + "/" + name) + .setType(methodType); + + return new Builder(serviceName, name, builder) + .requestType(Object.class) + .responseType(Object.class); + } + + /** + * Returns the simple name of the method. + * + * @return The simple name of the method. + */ + public String name() { + return name; + } + + /** + * Returns the {@link io.grpc.MethodDescriptor} of this method. + * + * @param the request type + * @param the response type + * @return The {@link io.grpc.MethodDescriptor} of this method. + */ + @SuppressWarnings("unchecked") + public MethodDescriptor descriptor() { + return descriptor; + } + + public MethodDescriptor.MethodType type() { + return descriptor.getType(); + } + + /** + * Obtain the {@link io.grpc.ClientInterceptor}s to use for this method. + * + * @return the {@link io.grpc.ClientInterceptor}s to use for this method + */ + List interceptors() { + return Collections.unmodifiableList(interceptors); + } + + /** + * Obtain the {@link MethodHandler} to use to make client calls. + * + * @return the {@link MethodHandler} to use to make client calls + */ + public MethodHandler methodHandler() { + return methodHandler; + } + + /** + * ClientMethod configuration API. + */ + public interface Rules { + + /** + * Sets the type of parameter of this method. + * + * @param type The type of parameter of this method. + * @return this {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Rules} instance for fluent call chaining + */ + Rules requestType(Class type); + + /** + * Sets the type of parameter of this method. + * + * @param type The type of parameter of this method. + * @return this {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Rules} instance for fluent call chaining + */ + Rules responseType(Class type); + + /** + * Register one or more {@link io.grpc.ClientInterceptor interceptors} for the method. + * + * @param interceptors the interceptor(s) to register + * @return this {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Rules} instance for fluent call chaining + */ + Rules intercept(ClientInterceptor... interceptors); + + /** + * Register one or more {@link io.grpc.ClientInterceptor interceptors} for the method. + *

+ * The added interceptors will be applied using the specified priority. + * + * @param priority the priority to assign to the interceptors + * @param interceptors one or more {@link io.grpc.ClientInterceptor}s to register + * @return this {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Rules} to allow fluent method chaining + */ + Rules intercept(int priority, ClientInterceptor... interceptors); + + /** + * Register the {@link MarshallerSupplier} for the method. + *

+ * If not set the default {@link MarshallerSupplier} from the service will be used. + * + * @param marshallerSupplier the {@link MarshallerSupplier} for the service + * @return this {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Rules} instance for fluent call chaining + */ + Rules marshallerSupplier(MarshallerSupplier marshallerSupplier); + + /** + * Register the specified {@link io.grpc.CallCredentials} to be used for this method. This overrides + * any {@link io.grpc.CallCredentials} set on the {@link io.helidon.webclient.grpc.ClientMethodDescriptor}. + * + * @param callCredentials the {@link io.grpc.CallCredentials} to set. + * @return this {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Rules} instance for fluent call chaining + */ + Rules callCredentials(CallCredentials callCredentials); + + /** + * Set the {@link MethodHandler} that can be used to invoke the method. + * + * @param methodHandler the {2link MethodHandler} to use + * @return this {@link io.helidon.webclient.grpc.ClientMethodDescriptor.Rules} instance for fluent call chaining + */ + Rules methodHandler(MethodHandler methodHandler); + } + + /** + * {@link io.grpc.MethodDescriptor} builder implementation. + */ + public static class Builder + implements Rules, io.helidon.common.Builder { + + private String name; + private MethodDescriptor.Builder descriptor; + private Class requestType; + private Class responseType; + private PriorityBag interceptors = PriorityBag.withDefaultPriority(InterceptorPriorities.USER); + private MarshallerSupplier defaultMarshallerSupplier = MarshallerSupplier.defaultInstance(); + private MarshallerSupplier marshallerSupplier; + private CallCredentials callCredentials; + private MethodHandler methodHandler; + + /** + * Constructs a new Builder instance. + * + * @param serviceName The name of the service ths method belongs to + * @param name the name of this method + * @param descriptor The gRPC method descriptor builder + */ + Builder(String serviceName, String name, MethodDescriptor.Builder descriptor) { + this.name = name; + this.descriptor = descriptor.setFullMethodName(serviceName + "/" + name); + } + + @Override + public Builder requestType(Class type) { + this.requestType = type; + return this; + } + + @Override + public Builder responseType(Class type) { + this.responseType = type; + return this; + } + + @Override + public Builder intercept(ClientInterceptor... interceptors) { + this.interceptors.addAll(Arrays.asList(interceptors)); + return this; + } + + @Override + public Builder intercept(int priority, ClientInterceptor... interceptors) { + this.interceptors.addAll(Arrays.asList(interceptors), priority); + return this; + } + + @Override + public Builder marshallerSupplier(MarshallerSupplier supplier) { + this.marshallerSupplier = supplier; + return this; + } + + Builder defaultMarshallerSupplier(MarshallerSupplier supplier) { + if (supplier == null) { + this.defaultMarshallerSupplier = MarshallerSupplier.defaultInstance(); + } else { + this.defaultMarshallerSupplier = supplier; + } + return this; + } + + @Override + public Builder methodHandler(MethodHandler methodHandler) { + this.methodHandler = methodHandler; + return this; + } + + /** + * Sets the full name of this Method. + * + * @param fullName the full name of the method + * @return this builder instance for fluent API + */ + Builder fullName(String fullName) { + descriptor.setFullMethodName(fullName); + this.name = fullName.substring(fullName.lastIndexOf('/') + 1); + return this; + } + + @Override + public Rules callCredentials(CallCredentials callCredentials) { + this.callCredentials = callCredentials; + return this; + } + + /** + * Builds and returns a new instance of {@link io.helidon.webclient.grpc.ClientMethodDescriptor}. + * + * @return a new instance of {@link io.helidon.webclient.grpc.ClientMethodDescriptor} + */ + @Override + @SuppressWarnings("unchecked") + public ClientMethodDescriptor build() { + MarshallerSupplier supplier = this.marshallerSupplier; + + if (supplier == null) { + supplier = defaultMarshallerSupplier; + } + + if (requestType != null) { + descriptor.setRequestMarshaller(supplier.get(requestType)); + } + + if (responseType != null) { + descriptor.setResponseMarshaller(supplier.get(responseType)); + } + + return new ClientMethodDescriptor(name, + descriptor.build(), + interceptors.stream().toList(), + callCredentials, + methodHandler); + } + + } +} diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClient.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClient.java new file mode 100644 index 00000000000..9eb07db371c --- /dev/null +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClient.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webclient.grpc; + +import java.util.function.Consumer; + +import io.helidon.builder.api.RuntimeType; +import io.helidon.webclient.api.WebClient; +import io.helidon.webclient.spi.Protocol; + +/** + * gRPC client. + */ +@RuntimeType.PrototypedBy(GrpcClientConfig.class) +public interface GrpcClient extends RuntimeType.Api { + /** + * Protocol to use to obtain an instance of gRPC specific client from + * {@link io.helidon.webclient.api.WebClient#client(io.helidon.webclient.spi.Protocol)}. + */ + Protocol PROTOCOL = GrpcProtocolProvider::new; + + /** + * A new fluent API builder to customize client setup. + * + * @return a new builder + */ + static GrpcClientConfig.Builder builder() { + return GrpcClientConfig.builder(); + } + + /** + * Create a new instance with custom configuration. + * + * @param clientConfig HTTP/2 client configuration + * @return a new HTTP/2 client + */ + static GrpcClient create(GrpcClientConfig clientConfig) { + return new GrpcClientImpl(WebClient.create(it -> it.from(clientConfig)), clientConfig); + } + + /** + * Create a new instance customizing its configuration. + * + * @param consumer HTTP/2 client configuration + * @return a new HTTP/2 client + */ + static GrpcClient create(Consumer consumer) { + return create(GrpcClientConfig.builder() + .update(consumer) + .buildPrototype()); + } + + /** + * Create a new instance with default configuration. + * + * @return a new HTTP/2 client + */ + static GrpcClient create() { + return create(GrpcClientConfig.create()); + } + + /** + * Create a client for a specific service. The client will be backed by the same HTTP/2 client. + * + * @param descriptor descriptor to use + * @return client for the provided descriptor + */ + GrpcServiceClient serviceClient(GrpcServiceDescriptor descriptor); +} diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java new file mode 100644 index 00000000000..3d4506dcb02 --- /dev/null +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webclient.grpc; + +import java.util.concurrent.atomic.AtomicReference; + +import io.helidon.http.Header; +import io.helidon.http.HeaderNames; +import io.helidon.http.HeaderValues; +import io.helidon.webclient.http2.Http2Client; + +import io.grpc.ClientCall; +import io.grpc.Metadata; + +class GrpcClientCall extends ClientCall { + private static final Header GRPC_CONTENT_TYPE = HeaderValues.create(HeaderNames.CONTENT_TYPE, "application/grpc"); + + private final AtomicReference> responseListener = new AtomicReference<>(); + private final Http2Client http2Client; + private final ClientMethodDescriptor method; + + GrpcClientCall(Http2Client http2Client, ClientMethodDescriptor method) { + this.http2Client = http2Client; + this.method = method; + } + + @Override + public void start(Listener responseListener, Metadata headers) { + // connect + // send headers + if (this.responseListener.compareAndSet(null, responseListener)) { + /* + Http2ClientConnection http2ClientRequest = http2Client + .post("") // must be post + .header(GRPC_CONTENT_TYPE) + .connect(); + */ + + } else { + throw new IllegalStateException("Response listener was already set"); + } + } + + @Override + public void request(int numMessages) { + + } + + @Override + public void cancel(String message, Throwable cause) { + + } + + @Override + public void halfClose() { + + } + + @Override + public void sendMessage(ReqT message) { + + } +} diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientConfigBlueprint.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientConfigBlueprint.java new file mode 100644 index 00000000000..8d1821acfea --- /dev/null +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientConfigBlueprint.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webclient.grpc; + +import io.helidon.builder.api.Option; +import io.helidon.builder.api.Prototype; +import io.helidon.webclient.api.HttpClientConfig; + +/** + * Configuration of a grpc client. + */ +@Prototype.Blueprint +@Prototype.Configured +interface GrpcClientConfigBlueprint extends HttpClientConfig, Prototype.Factory { + /** + * WebSocket specific configuration. + * + * @return protocol specific configuration + */ + @Option.Default("create()") + @Option.Configured + GrpcClientProtocolConfig protocolConfig(); +} diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientImpl.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientImpl.java new file mode 100644 index 00000000000..6eadc8044c3 --- /dev/null +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientImpl.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webclient.grpc; + +import io.helidon.webclient.api.WebClient; +import io.helidon.webclient.http2.Http2Client; + +class GrpcClientImpl implements GrpcClient { + private final WebClient webClient; + private final Http2Client http2Client; + private final GrpcClientConfig clientConfig; + + GrpcClientImpl(WebClient webClient, GrpcClientConfig clientConfig) { + this.webClient = webClient; + this.http2Client = webClient.client(Http2Client.PROTOCOL); + this.clientConfig = clientConfig; + } + + @Override + public GrpcClientConfig prototype() { + return clientConfig; + } + + @Override + public GrpcServiceClient serviceClient(GrpcServiceDescriptor descriptor) { + throw new UnsupportedOperationException("Not implemented"); + } +} diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientProtocol.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientProtocol.java new file mode 100644 index 00000000000..f3248d2ba88 --- /dev/null +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientProtocol.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webclient.grpc; + +import io.helidon.common.socket.SocketContext; +import io.helidon.http.http2.Http2Settings; +import io.helidon.http.http2.Http2StreamState; +import io.helidon.http.http2.StreamFlowControl; +import io.helidon.webclient.http2.Http2ClientConfig; + +class GrpcClientProtocol { + static GrpcClientStream create(SocketContext scoketContext, + Http2Settings serverSettings, + Http2ClientConfig clientConfig, + int streamId, + StreamFlowControl flowControl, + Http2StreamState streamState) { + return new GrpcClientStream(); + } +} diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientProtocolConfigBlueprint.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientProtocolConfigBlueprint.java new file mode 100644 index 00000000000..938277994e3 --- /dev/null +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientProtocolConfigBlueprint.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webclient.grpc; + +import io.helidon.builder.api.Option; +import io.helidon.builder.api.Prototype; +import io.helidon.webclient.spi.ProtocolConfig; + +/** + * Configuration of an HTTP/1.1 client. + */ +@Prototype.Blueprint +@Prototype.Configured +interface GrpcClientProtocolConfigBlueprint extends ProtocolConfig { + @Override + default String type() { + return GrpcProtocolProvider.CONFIG_KEY; + } + + @Option.Configured + @Option.Default(GrpcProtocolProvider.CONFIG_KEY) + @Override + String name(); + +} diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientStream.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientStream.java new file mode 100644 index 00000000000..1bac6c80f36 --- /dev/null +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientStream.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webclient.grpc; + +import io.helidon.common.buffers.BufferData; +import io.helidon.http.http2.Http2FrameHeader; +import io.helidon.http.http2.Http2Headers; +import io.helidon.http.http2.Http2Priority; +import io.helidon.http.http2.Http2RstStream; +import io.helidon.http.http2.Http2Stream; +import io.helidon.http.http2.Http2StreamState; +import io.helidon.http.http2.Http2WindowUpdate; +import io.helidon.http.http2.StreamFlowControl; +import io.helidon.webclient.api.ReleasableResource; + +class GrpcClientStream implements Http2Stream, ReleasableResource { + @Override + public boolean rstStream(Http2RstStream rstStream) { + return false; + } + + @Override + public void windowUpdate(Http2WindowUpdate windowUpdate) { + + } + + @Override + public void headers(Http2Headers headers, boolean endOfStream) { + + } + + @Override + public void data(Http2FrameHeader header, BufferData data, boolean endOfStream) { + + } + + @Override + public void priority(Http2Priority http2Priority) { + + } + + @Override + public int streamId() { + return 0; + } + + @Override + public Http2StreamState streamState() { + return null; + } + + @Override + public StreamFlowControl flowControl() { + return null; + } + + @Override + public void closeResource() { + + } +} diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcProtocolConfigProvider.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcProtocolConfigProvider.java new file mode 100644 index 00000000000..a6b43a5ba63 --- /dev/null +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcProtocolConfigProvider.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webclient.grpc; + +import io.helidon.common.config.Config; +import io.helidon.webclient.spi.ProtocolConfigProvider; + +/** + * Implementation of protocol config provider for gRPC. + */ +public class GrpcProtocolConfigProvider implements ProtocolConfigProvider { + /** + * Required to be used by {@link java.util.ServiceLoader}. + * @deprecated do not use directly, use Http1ClientProtocol + */ + public GrpcProtocolConfigProvider() { + } + + @Override + public String configKey() { + return GrpcProtocolProvider.CONFIG_KEY; + } + + @Override + public GrpcClientProtocolConfig create(Config config, String name) { + return GrpcClientProtocolConfig.builder() + .config(config) + .name(name) + .build(); + } +} diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcProtocolProvider.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcProtocolProvider.java new file mode 100644 index 00000000000..b0555ac0c49 --- /dev/null +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcProtocolProvider.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webclient.grpc; + +import io.helidon.webclient.api.WebClient; +import io.helidon.webclient.spi.ClientProtocolProvider; + +public class GrpcProtocolProvider implements ClientProtocolProvider { + static final String CONFIG_KEY = "grpc"; + + /** + * Public constructor required by {@link java.util.ServiceLoader}. + */ + public GrpcProtocolProvider() { + } + + @Override + public String protocolId() { + return "grpc"; + } + + @Override + public Class configType() { + return GrpcClientProtocolConfig.class; + } + + @Override + public GrpcClientProtocolConfig defaultConfig() { + return GrpcClientProtocolConfig.create(); + } + + @Override + public GrpcClient protocol(WebClient client, GrpcClientProtocolConfig config) { + return new GrpcClientImpl(client, + GrpcClientConfig.builder().from(client.prototype()) + .protocolConfig(config) + .buildPrototype()); + } +} diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcServiceClient.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcServiceClient.java new file mode 100644 index 00000000000..0a578892e78 --- /dev/null +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcServiceClient.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webclient.grpc; + +import java.util.Collection; + +import io.grpc.stub.StreamObserver; + +/** + * Client for a single service. + * + * @see io.helidon.webclient.grpc.GrpcClient#serviceClient(io.helidon.webclient.grpc.GrpcServiceDescriptor) + */ +public interface GrpcServiceClient { + /** + * Name of the service this client was created for. + * + * @return service name + */ + String serviceName(); + + RespT unary(String methodName, ReqT request); + + StreamObserver unary(String methodName, StreamObserver responseObserver); + + Collection serverStream(String methodName, ReqT request); + + void serverStream(String methodName, ReqT request, StreamObserver responseObserver); + + RespT clientStream(String methodName, Collection request); + + StreamObserver clientStream(String methodName, StreamObserver responseObserver); + + Collection bidi(String methodName, Collection responseObserver); + + StreamObserver bidi(String methodName, StreamObserver responseObserver); +} diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcServiceClientImpl.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcServiceClientImpl.java new file mode 100644 index 00000000000..7a9ba194c27 --- /dev/null +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcServiceClientImpl.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webclient.grpc; + +import java.util.Collection; + +import io.grpc.stub.StreamObserver; +import io.helidon.webclient.http2.Http2Client; + +import io.grpc.ClientCall; +import io.grpc.MethodDescriptor; +import io.grpc.stub.ClientCalls; + +class GrpcServiceClientImpl implements GrpcServiceClient { + private final GrpcServiceDescriptor descriptor; + private final Http2Client http2Client; + + GrpcServiceClientImpl(GrpcServiceDescriptor descriptor, Http2Client http2Client) { + this.descriptor = descriptor; + this.http2Client = http2Client; + } + + @Override + public String serviceName() { + return null; + } + + @Override + public RespT unary(String methodName, ReqT request) { + ClientCall call = ensureMethod(methodName, MethodDescriptor.MethodType.UNARY); + return ClientCalls.blockingUnaryCall(call, request); + } + + @Override + public StreamObserver unary(String methodName, StreamObserver responseObserver) { + return null; + } + + @Override + public Collection serverStream(String methodName, ReqT request) { + return null; + } + + @Override + public void serverStream(String methodName, ReqT request, StreamObserver responseObserver) { + + } + + @Override + public RespT clientStream(String methodName, Collection request) { + return null; + } + + @Override + public StreamObserver clientStream(String methodName, StreamObserver responseObserver) { + return null; + } + + @Override + public Collection bidi(String methodName, Collection responseObserver) { + return null; + } + + @Override + public StreamObserver bidi(String methodName, StreamObserver responseObserver) { + return null; + } + + private ClientCall ensureMethod(String methodName, MethodDescriptor.MethodType methodType) { + ClientMethodDescriptor method = descriptor.method(methodName); + + if (!method.type().equals(methodType)) { + throw new IllegalArgumentException("Method " + methodName + " is of type " + method.type() + ", yet " + methodType + " was requested."); + } + + return createClientCall(method); + } + + private ClientCall createClientCall(ClientMethodDescriptor method) { + + return new GrpcClientCall<>(http2Client, method); + } +} diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcServiceDescriptor.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcServiceDescriptor.java new file mode 100644 index 00000000000..f96b71a766a --- /dev/null +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcServiceDescriptor.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webclient.grpc; + +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +import io.grpc.CallCredentials; +import io.grpc.ClientInterceptor; + +/** + * All required meta-data about a client side gRPC service. + */ +public class GrpcServiceDescriptor { + private String serviceName; + private Map methods; + private List interceptors; + private CallCredentials callCredentials; + + ClientMethodDescriptor method(String name) { + ClientMethodDescriptor clientMethodDescriptor = methods.get(name); + if (clientMethodDescriptor == null) { + throw new NoSuchElementException("There is no method " + name + " defined for service " + this); + } + return clientMethodDescriptor; + } +} diff --git a/webclient/grpc/src/main/java/module-info.java b/webclient/grpc/src/main/java/module-info.java new file mode 100644 index 00000000000..a13e5880898 --- /dev/null +++ b/webclient/grpc/src/main/java/module-info.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import io.helidon.common.features.api.Feature; +import io.helidon.common.features.api.HelidonFlavor; + +/** + * Helidon WebClient gRPC Support. + */ +@Feature(value = "gRPC", + description = "WebClient gRPC support", + in = HelidonFlavor.SE, + path = {"WebClient", "gRPC"} +) +module io.helidon.webclient.grpc { + + requires static io.helidon.common.features.api; + + requires transitive io.grpc; + requires transitive io.grpc.stub; + requires transitive io.helidon.builder.api; + requires transitive io.helidon.common.pki; + requires transitive io.helidon.webclient.http2; + requires transitive io.helidon.webclient; + + requires io.helidon.grpc.core; + + exports io.helidon.webclient.grpc; + + provides io.helidon.webclient.spi.ClientProtocolProvider + with io.helidon.webclient.grpc.GrpcProtocolProvider; + provides io.helidon.webclient.spi.ProtocolConfigProvider + with io.helidon.webclient.grpc.GrpcProtocolConfigProvider; +} \ No newline at end of file diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2Client.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2Client.java index c1d35f6c7a3..286731ba1d9 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2Client.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2Client.java @@ -22,6 +22,7 @@ import io.helidon.common.config.Config; import io.helidon.webclient.api.HttpClient; import io.helidon.webclient.api.WebClient; +import io.helidon.webclient.spi.Protocol; /** * HTTP2 client. @@ -32,6 +33,11 @@ public interface Http2Client extends HttpClient, RuntimeType * HTTP/2 protocol ID, as used by ALPN. */ String PROTOCOL_ID = "h2"; + /** + * Protocol to use to obtain an instance of http/2 specific client from + * {@link io.helidon.webclient.api.WebClient#client(io.helidon.webclient.spi.Protocol)}. + */ + Protocol PROTOCOL = Http2ProtocolProvider::new; /** * A new fluent API builder to customize client setup. diff --git a/webclient/pom.xml b/webclient/pom.xml index 818b8621299..e2ee8899811 100644 --- a/webclient/pom.xml +++ b/webclient/pom.xml @@ -41,6 +41,7 @@ tracing webclient websocket + grpc diff --git a/webclient/websocket/src/main/java/io/helidon/webclient/websocket/WsClient.java b/webclient/websocket/src/main/java/io/helidon/webclient/websocket/WsClient.java index 1070e29ba50..007dd200af1 100644 --- a/webclient/websocket/src/main/java/io/helidon/webclient/websocket/WsClient.java +++ b/webclient/websocket/src/main/java/io/helidon/webclient/websocket/WsClient.java @@ -31,7 +31,7 @@ @RuntimeType.PrototypedBy(WsClientConfig.class) public interface WsClient extends RuntimeType.Api { /** - * Protocol to use to obtain an instance of WebSocket specific clietn from + * Protocol to use to obtain an instance of WebSocket specific client from * {@link io.helidon.webclient.api.WebClient#client(io.helidon.webclient.spi.Protocol)}. */ Protocol PROTOCOL = WsProtocolProvider::new; diff --git a/webserver/grpc/src/main/java/io/helidon/webserver/grpc/Grpc.java b/webserver/grpc/src/main/java/io/helidon/webserver/grpc/Grpc.java index 0dcc498eaea..f9ff3d2eb4a 100644 --- a/webserver/grpc/src/main/java/io/helidon/webserver/grpc/Grpc.java +++ b/webserver/grpc/src/main/java/io/helidon/webserver/grpc/Grpc.java @@ -54,6 +54,15 @@ static Grpc unary(Descriptors.FileDescriptor proto, return grpc(proto, serviceName, methodName, ServerCalls.asyncUnaryCall(method)); } + static Grpc unary(Descriptors.FileDescriptor proto, + String serviceName, + String methodName, + GrpcServerCalls.Unary method) { + + return grpc(proto, serviceName, methodName, GrpcServerCalls.unaryCall(method)); + } + + static Grpc bidi(Descriptors.FileDescriptor proto, String serviceName, String methodName, @@ -136,17 +145,17 @@ private static Grpc grpc(Descriptors.FileDescriptor pro - to invoke a static method on it */ Class requestType = load(getClassName(mtd.getInputType())); - Class responsetype = load(getClassName(mtd.getOutputType())); + Class responseType = load(getClassName(mtd.getOutputType())); MethodDescriptor.Marshaller reqMarshaller = ProtoMarshaller.get(requestType); - MethodDescriptor.Marshaller resMarshaller = ProtoMarshaller.get(responsetype); + MethodDescriptor.Marshaller resMarshaller = ProtoMarshaller.get(responseType); io.grpc.MethodDescriptor.Builder grpcDesc = io.grpc.MethodDescriptor.newBuilder() .setFullMethodName(io.grpc.MethodDescriptor.generateFullMethodName(serviceName, methodName)) .setType(getMethodType(mtd)).setFullMethodName(path).setRequestMarshaller(reqMarshaller) .setResponseMarshaller(resMarshaller).setSampledToLocalTracing(true); - return new Grpc<>(grpcDesc.build(), PathMatchers.exact(path), requestType, responsetype, callHandler); + return new Grpc<>(grpcDesc.build(), PathMatchers.exact(path), requestType, responseType, callHandler); } diff --git a/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcProtocolHandler.java b/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcProtocolHandler.java index 8cc8b28f183..98aec247264 100644 --- a/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcProtocolHandler.java +++ b/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcProtocolHandler.java @@ -160,8 +160,8 @@ public void request(int numMessages) { @Override public void sendHeaders(Metadata headers) { - // todo ignoring headers, just sending required response headers WritableHeaders writable = WritableHeaders.create(); + writable.set(GRPC_CONTENT_TYPE); writable.set(GRPC_ENCODING_IDENTITY); diff --git a/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcProtocolSelector.java b/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcProtocolSelector.java index 3c7d108a0ad..c625af21e65 100644 --- a/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcProtocolSelector.java +++ b/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcProtocolSelector.java @@ -65,7 +65,7 @@ public SubProtocolResult subProtocol(ConnectionContext ctx, Headers httpHeaders = headers.httpHeaders(); if (httpHeaders.contains(HeaderNames.CONTENT_TYPE)) { - String contentType = httpHeaders.get(HeaderNames.CONTENT_TYPE).value(); + String contentType = httpHeaders.get(HeaderNames.CONTENT_TYPE).get(); if (contentType.startsWith("application/grpc")) { GrpcRouting routing = router.routing(GrpcRouting.class, GrpcRouting.empty()); diff --git a/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcRouting.java b/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcRouting.java index ee42c284e4c..43011ed6854 100644 --- a/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcRouting.java +++ b/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcRouting.java @@ -128,6 +128,25 @@ public Builder unary(Descriptors.FileDescriptor proto, return route(Grpc.unary(proto, serviceName, methodName, method)); } + /** + * Unary route. + * + * @param proto proto descriptor + * @param serviceName service name + * @param methodName method name + * @param method method to handle this route + * @param request type + * @param response type + * @return updated builder + */ + public Builder unary(Descriptors.FileDescriptor proto, + String serviceName, + String methodName, + GrpcServerCalls.Unary method) { + + return route(Grpc.unary(proto, serviceName, methodName, method)); + } + /** * Bidirectional route. * diff --git a/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcServerCalls.java b/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcServerCalls.java new file mode 100644 index 00000000000..c5e72a39b54 --- /dev/null +++ b/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcServerCalls.java @@ -0,0 +1,137 @@ +package io.helidon.webserver.grpc; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import io.grpc.ServerCallHandler; +import io.grpc.stub.ServerCalls; +import io.grpc.stub.StreamObserver; + +public final class GrpcServerCalls { + private GrpcServerCalls() { + } + + static ServerCallHandler unaryCall(Unary method) { + return ServerCalls.asyncUnaryCall((request, responseObserver) -> { + try { + ResT response = method.invoke(request); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Exception e) { + responseObserver.onError(e); + } + }); + } + + static ServerCallHandler clientStream(ClientStream method, Duration timeout) { + return ServerCalls.asyncClientStreamingCall(responseObserver -> { + CompletableFuture> future = new CompletableFuture<>(); + + future.orTimeout(timeout.getNano(), TimeUnit.NANOSECONDS) + .thenAccept(requests -> { + responseObserver.onNext(method.invoke(requests)); + responseObserver.onCompleted(); + }) + .exceptionally(throwable -> { + responseObserver.onError(throwable); + return null; + }); + + return new CollectingObserver<>(future); + }); + } + + static ServerCallHandler serverStream(ServerStream method) { + return ServerCalls.asyncServerStreamingCall((request, responseObserver) -> { + try { + Collection response = method.invoke(request); + for (ResT resT : response) { + responseObserver.onNext(resT); + } + responseObserver.onCompleted(); + } catch (Exception e) { + responseObserver.onError(e); + } + }); + } + + static ServerCallHandler bidi(Bidi method, Duration timeout) { + return ServerCalls.asyncBidiStreamingCall(responseObserver -> { + CompletableFuture> future = new CompletableFuture<>(); + + future.orTimeout(timeout.getNano(), TimeUnit.NANOSECONDS) + .thenAccept(requests -> { + Collection response = method.invoke(requests); + response.forEach(responseObserver::onNext); + responseObserver.onCompleted(); + }) + .exceptionally(throwable -> { + responseObserver.onError(throwable); + return null; + }); + + return new CollectingObserver<>(future); + }); + } + + public interface Unary { + RespT invoke(ReqT request); + } + + public interface ServerStream { + Collection invoke(ReqT request); + } + + public interface ClientStream { + RespT invoke(Collection requests); + } + + /** + * Bidirectional streaming is by its design created for asynchronous communication. + * This interface should be used only when you have a guarantee that the client sends all of its messages + * and DOES NOT WAIT for the responses on each of them. + *

+ * In case you need true asynchronous communication (e.g. clients sends a message, waits for server response, + * send another one), + * please use {@link io.grpc.stub.ServerCalls#asyncBidiStreamingCall(io.grpc.stub.ServerCalls.BidiStreamingMethod)}. + * + * @param request type + * @param response type + */ + public interface Bidi { + Collection invoke(Collection requests); + } + + /** + * Collects all elements (and possible exception) and completes the completable future when finished collecting. + * + * @param + */ + private static class CollectingObserver implements StreamObserver { + private final List collectedValues = new ArrayList<>(); + private final CompletableFuture> future; + + private CollectingObserver(CompletableFuture> future) { + this.future = future; + } + + @Override + public void onNext(T value) { + collectedValues.add(value); + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void onCompleted() { + future.complete(collectedValues); + } + } +} diff --git a/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcService.java b/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcService.java index 59581ae076b..a41f661a3a9 100644 --- a/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcService.java +++ b/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcService.java @@ -60,6 +60,7 @@ interface Routing { * @return updated routing */ Routing unary(String methodName, ServerCalls.UnaryMethod method); + Routing unary(String methodName, GrpcServerCalls.Unary method); /** * Bidirectional route. @@ -71,6 +72,7 @@ interface Routing { * @return updated routing */ Routing bidi(String methodName, ServerCalls.BidiStreamingMethod method); + Routing bidi(String methodName, GrpcServerCalls.Bidi method); /** * Server streaming route. @@ -82,6 +84,7 @@ interface Routing { * @return updated routing */ Routing serverStream(String methodName, ServerCalls.ServerStreamingMethod method); + Routing serverStream(String methodName, GrpcServerCalls.ServerStream method); /** * Client streaming route. @@ -93,5 +96,6 @@ interface Routing { * @return updated routing */ Routing clientStream(String methodName, ServerCalls.ClientStreamingMethod method); + Routing clientStream(String methodName, GrpcServerCalls.ClientStream method); } } diff --git a/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcServiceRoute.java b/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcServiceRoute.java index f90927009fd..5c553223872 100644 --- a/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcServiceRoute.java +++ b/webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcServiceRoute.java @@ -78,12 +78,22 @@ public GrpcService.Routing unary(String methodName, ServerCalls.Una return this; } + @Override + public GrpcService.Routing unary(String methodName, GrpcServerCalls.Unary method) { + return null; + } + @Override public GrpcService.Routing bidi(String methodName, ServerCalls.BidiStreamingMethod method) { routes.add(Grpc.bidi(proto, serviceName, methodName, method)); return this; } + @Override + public GrpcService.Routing bidi(String methodName, GrpcServerCalls.Bidi method) { + return null; + } + @Override public GrpcService.Routing serverStream(String methodName, ServerCalls.ServerStreamingMethod method) { @@ -91,6 +101,11 @@ public GrpcService.Routing serverStream(String methodName, return this; } + @Override + public GrpcService.Routing serverStream(String methodName, GrpcServerCalls.ServerStream method) { + return null; + } + @Override public GrpcService.Routing clientStream(String methodName, ServerCalls.ClientStreamingMethod method) { @@ -98,6 +113,11 @@ public GrpcService.Routing clientStream(String methodName, return this; } + @Override + public GrpcService.Routing clientStream(String methodName, GrpcServerCalls.ClientStream method) { + return null; + } + public GrpcServiceRoute build() { return new GrpcServiceRoute(serviceName, List.copyOf(routes)); } diff --git a/webserver/testing/junit5/grpc/pom.xml b/webserver/testing/junit5/grpc/pom.xml new file mode 100644 index 00000000000..92a63e7f6c8 --- /dev/null +++ b/webserver/testing/junit5/grpc/pom.xml @@ -0,0 +1,57 @@ + + + + + 4.0.0 + + io.helidon.webserver.testing.junit5 + helidon-webserver-testing-junit5-project + 4.0.0-SNAPSHOT + + + helidon-webserver-testing-junit5-grpc + Helidon WebServer Testing JUnit5 gRPC + + + + io.helidon.webserver.testing.junit5 + helidon-webserver-testing-junit5 + + + io.helidon.webclient + helidon-webclient-grpc + + + io.helidon.webserver + helidon-webserver-grpc + + + org.junit.jupiter + junit-jupiter-api + provided + + + org.hamcrest + hamcrest-all + provided + + + diff --git a/webserver/testing/junit5/grpc/src/main/java/io/helidon/webserver/testing/junit5/grpc/GrpcServerExtension.java b/webserver/testing/junit5/grpc/src/main/java/io/helidon/webserver/testing/junit5/grpc/GrpcServerExtension.java new file mode 100644 index 00000000000..90a44068812 --- /dev/null +++ b/webserver/testing/junit5/grpc/src/main/java/io/helidon/webserver/testing/junit5/grpc/GrpcServerExtension.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.testing.junit5.grpc; + +import io.helidon.webclient.grpc.GrpcClient; +import io.helidon.webserver.WebServer; +import io.helidon.webserver.testing.junit5.Junit5Util; +import io.helidon.webserver.testing.junit5.spi.ServerJunitExtension; + +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; + +/** + * A {@link java.util.ServiceLoader} provider implementation that adds support for injection of gRPC related + * artifacts, such as {@link io.helidon.webclient.grpc.GrpcClient} in Helidon integration tests. + */ +public class GrpcServerExtension implements ServerJunitExtension { + /** + * Required constructor for {@link java.util.ServiceLoader}. + */ + public GrpcServerExtension() { + } + + @Override + public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) + throws ParameterResolutionException { + return GrpcClient.class.equals(parameterContext.getParameter().getType()); + } + + @Override + public Object resolveParameter(ParameterContext parameterContext, + ExtensionContext extensionContext, + Class parameterType, + WebServer server) { + String socketName = Junit5Util.socketName(parameterContext.getParameter()); + + if (GrpcClient.class.equals(parameterType)) { + return GrpcClient.builder() + .baseUri("http://localhost:" + server.port(socketName)) + .build(); + } + throw new ParameterResolutionException("gRPC extension only supports GrpcClient parameter type"); + } +} diff --git a/webserver/testing/junit5/grpc/src/main/java/module-info.java b/webserver/testing/junit5/grpc/src/main/java/module-info.java new file mode 100644 index 00000000000..81f02a3db8a --- /dev/null +++ b/webserver/testing/junit5/grpc/src/main/java/module-info.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Helidon WebServer Testing JUnit 5 Support for gRPC. + */ +module io.helidon.webserver.testing.junit5.grpc { + + requires io.helidon.webclient.grpc; + requires io.helidon.webserver.grpc; + + requires transitive io.helidon.webserver.testing.junit5; + + exports io.helidon.webserver.testing.junit5.grpc; + + provides io.helidon.webserver.testing.junit5.spi.ServerJunitExtension + with io.helidon.webserver.testing.junit5.grpc.GrpcServerExtension; + +} \ No newline at end of file diff --git a/webserver/testing/junit5/pom.xml b/webserver/testing/junit5/pom.xml index 00e10496c9f..0994ebe57cb 100644 --- a/webserver/testing/junit5/pom.xml +++ b/webserver/testing/junit5/pom.xml @@ -37,6 +37,7 @@ junit5 websocket http2 + grpc