From 1abe6c3a8eab049dc0bf79f02d53a96871f40d6b Mon Sep 17 00:00:00 2001 From: Santiago Pericas-Geertsen Date: Fri, 21 Jun 2024 09:45:27 -0400 Subject: [PATCH] Removes redundant class. Signed-off-by: Santiago Pericas-Geertsen --- .../webserver/grpc/ResponseHelper.java | 485 ------------------ 1 file changed, 485 deletions(-) delete mode 100644 webserver/grpc/src/main/java/io/helidon/webserver/grpc/ResponseHelper.java diff --git a/webserver/grpc/src/main/java/io/helidon/webserver/grpc/ResponseHelper.java b/webserver/grpc/src/main/java/io/helidon/webserver/grpc/ResponseHelper.java deleted file mode 100644 index f555589c705..00000000000 --- a/webserver/grpc/src/main/java/io/helidon/webserver/grpc/ResponseHelper.java +++ /dev/null @@ -1,485 +0,0 @@ -/* - * Copyright (c) 2019, 2023 Oracle and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.helidon.webserver.grpc; - -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.function.Supplier; -import java.util.stream.Stream; - -import io.grpc.stub.StreamObserver; - -/** - * A number of helper methods to handle sending responses to a {@link io.grpc.stub.StreamObserver}. - */ -public final class ResponseHelper { - private ResponseHelper() { - } - - /** - * Complete a gRPC request. - *

- * The request will be completed by calling {@link io.grpc.stub.StreamObserver#onNext(Object)} using the - * specified value then calling {@link io.grpc.stub.StreamObserver#onCompleted()}. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to complete - * @param value the value to use when calling {@link io.grpc.stub.StreamObserver#onNext(Object)} - * @param they type of the request result - */ - public static void complete(StreamObserver observer, T value) { - StreamObserver safe = SafeStreamObserver.ensureSafeObserver(observer); - safe.onNext(value); - safe.onCompleted(); - } - - /** - * Complete a gRPC request based on the result of a {@link java.util.concurrent.CompletionStage}. - *

- * The request will be completed by calling {@link io.grpc.stub.StreamObserver#onNext(Object)} using the - * result obtained on completion of the specified {@link java.util.concurrent.CompletionStage} and then calling - * {@link io.grpc.stub.StreamObserver#onCompleted()}. - *

- * If the {@link java.util.concurrent.CompletionStage} completes with an error then - * {@link io.grpc.stub.StreamObserver#onError(Throwable)} - * will be called. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to complete - * @param future the {@link java.util.concurrent.CompletionStage} to use to obtain the value to use to call - * {@link io.grpc.stub.StreamObserver#onNext(Object)} - * @param they type of the request result - */ - public static void complete(StreamObserver observer, CompletionStage future) { - future.whenComplete(completeWithResult(observer)); - } - - /** - * Asynchronously complete a gRPC request based on the result of a {@link java.util.concurrent.CompletionStage}. - *

- * The request will be completed by calling {@link io.grpc.stub.StreamObserver#onNext(Object)} using the - * result obtained on completion of the specified {@link java.util.concurrent.CompletionStage} and then calling - * {@link io.grpc.stub.StreamObserver#onCompleted()}. - *

- * If the {@link java.util.concurrent.CompletionStage} completes with an error then - * {@link io.grpc.stub.StreamObserver#onError(Throwable)} - * will be called. - *

- * The execution will take place asynchronously on the fork-join thread pool. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to complete - * @param future the {@link java.util.concurrent.CompletionStage} to use to obtain the value to use to call - * {@link io.grpc.stub.StreamObserver#onNext(Object)} - * @param they type of the request result - */ - public static void completeAsync(StreamObserver observer, CompletionStage future) { - future.whenCompleteAsync(completeWithResult(observer)); - } - - /** - * Asynchronously complete a gRPC request based on the result of a {@link java.util.concurrent.CompletionStage}. - *

- * The request will be completed by calling {@link io.grpc.stub.StreamObserver#onNext(Object)} using the - * result obtained on completion of the specified {@link java.util.concurrent.CompletionStage} and then calling - * {@link io.grpc.stub.StreamObserver#onCompleted()}. - *

- * If the {@link java.util.concurrent.CompletionStage} completes with an error then - * {@link io.grpc.stub.StreamObserver#onError(Throwable)} - * will be called. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to complete - * @param future the {@link java.util.concurrent.CompletionStage} to use to obtain the value to use to call - * {@link io.grpc.stub.StreamObserver#onNext(Object)} - * @param executor the {@link java.util.concurrent.Executor} on which to execute the asynchronous - * request completion - * @param they type of the request result - */ - public static void completeAsync(StreamObserver observer, CompletionStage future, Executor executor) { - future.whenCompleteAsync(completeWithResult(observer), executor); - } - - /** - * Complete a gRPC request based on the result of a {@link java.util.concurrent.Callable}. - *

- * The request will be completed by calling {@link io.grpc.stub.StreamObserver#onNext(Object)} using the - * result obtained on completion of the specified {@link java.util.concurrent.Callable} and then calling - * {@link io.grpc.stub.StreamObserver#onCompleted()}. - *

- * If the {@link java.util.concurrent.Callable#call()} method throws an exception then - * {@link io.grpc.stub.StreamObserver#onError(Throwable)} - * will be called. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to complete - * @param callable the {@link java.util.concurrent.Callable} to use to obtain the value to use to call - * {@link io.grpc.stub.StreamObserver#onNext(Object)} - * @param they type of the request result - */ - public static void complete(StreamObserver observer, Callable callable) { - try { - observer.onNext(callable.call()); - observer.onCompleted(); - } catch (Throwable t) { - observer.onError(t); - } - } - - /** - * Asynchronously complete a gRPC request based on the result of a {@link java.util.concurrent.Callable}. - *

- * The request will be completed by calling {@link io.grpc.stub.StreamObserver#onNext(Object)} using the - * result obtained on completion of the specified {@link java.util.concurrent.Callable} and then calling - * {@link io.grpc.stub.StreamObserver#onCompleted()}. - *

- * If the {@link java.util.concurrent.Callable#call()} method throws an exception then - * {@link io.grpc.stub.StreamObserver#onError(Throwable)} - * will be called. - *

- * The execution will take place asynchronously on the fork-join thread pool. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to complete - * @param callable the {@link java.util.concurrent.Callable} to use to obtain the value to use to call - * {@link io.grpc.stub.StreamObserver#onNext(Object)} - * @param they type of the request result - */ - public static void completeAsync(StreamObserver observer, Callable callable) { - completeAsync(observer, CompletableFuture.supplyAsync(createSupplier(callable))); - } - - /** - * Asynchronously complete a gRPC request based on the result of a {@link java.util.concurrent.Callable}. - *

- * The request will be completed by calling {@link io.grpc.stub.StreamObserver#onNext(Object)} using the - * result obtained on completion of the specified {@link java.util.concurrent.Callable} and then calling - * {@link io.grpc.stub.StreamObserver#onCompleted()}. - *

- * If the {@link java.util.concurrent.Callable#call()} method throws an exception then - * {@link io.grpc.stub.StreamObserver#onError(Throwable)} - * will be called. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to complete - * @param callable the {@link java.util.concurrent.Callable} to use to obtain the value to use to call - * {@link io.grpc.stub.StreamObserver#onNext(Object)} - * @param executor the {@link java.util.concurrent.Executor} on which to execute the asynchronous - * request completion - * @param they type of the request result - */ - public static void completeAsync(StreamObserver observer, Callable callable, Executor executor) { - completeAsync(observer, CompletableFuture.supplyAsync(createSupplier(callable), executor)); - } - - /** - * Execute a {@link Runnable} task and on completion of the task complete the gRPC request by - * calling {@link io.grpc.stub.StreamObserver#onNext(Object)} using the specified result and then call - * {@link io.grpc.stub.StreamObserver#onCompleted()}. - *

- * If the {@link Runnable#run()} method throws an exception then {@link io.grpc.stub.StreamObserver#onError(Throwable)} - * will be called. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to complete - * @param task the {@link Runnable} to execute - * @param result the result to pass to {@link io.grpc.stub.StreamObserver#onNext(Object)} - * @param they type of the request result - */ - public static void complete(StreamObserver observer, Runnable task, T result) { - complete(observer, Executors.callable(task, result)); - } - - /** - * Asynchronously execute a {@link Runnable} task and on completion of the task complete the gRPC - * request by calling {@link io.grpc.stub.StreamObserver#onNext(Object)} using the specified result and then - * call {@link io.grpc.stub.StreamObserver#onCompleted()}. - *

- * If the {@link Runnable#run()} method throws an exception then {@link io.grpc.stub.StreamObserver#onError(Throwable)} - * will be called. - *

- * The task and and request completion will be executed on the fork-join thread pool. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to complete - * @param task the {@link Runnable} to execute - * @param result the result to pass to {@link io.grpc.stub.StreamObserver#onNext(Object)} - * @param they type of the request result - */ - public static void completeAsync(StreamObserver observer, Runnable task, T result) { - completeAsync(observer, Executors.callable(task, result)); - } - - /** - * Asynchronously execute a {@link Runnable} task and on completion of the task complete the gRPC - * request by calling {@link io.grpc.stub.StreamObserver#onNext(Object)} using the specified result and then - * call {@link io.grpc.stub.StreamObserver#onCompleted()}. - *

- * If the {@link Runnable#run()} method throws an exception then {@link io.grpc.stub.StreamObserver#onError(Throwable)} - * will be called. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to complete - * @param task the {@link Runnable} to execute - * @param result the result to pass to {@link io.grpc.stub.StreamObserver#onNext(Object)} - * @param executor the {@link java.util.concurrent.Executor} on which to execute the asynchronous - * request completion - * @param they type of the request result - */ - public static void completeAsync(StreamObserver observer, Runnable task, T result, Executor executor) { - completeAsync(observer, Executors.callable(task, result), executor); - } - - /** - * Send the values from a {@link java.util.stream.Stream} to the {@link io.grpc.stub.StreamObserver#onNext(Object)} method - * until the - * {@link java.util.stream.Stream} is exhausted call {@link io.grpc.stub.StreamObserver#onCompleted()}. - *

- * If an error occurs whilst streaming results then {@link io.grpc.stub.StreamObserver#onError(Throwable)} will be called. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to complete - * @param stream the {@link java.util.stream.Stream} of results to send to - * {@link io.grpc.stub.StreamObserver#onNext(Object)} - * @param they type of the request result - */ - public static void stream(StreamObserver observer, Stream stream) { - stream(observer, () -> stream); - } - - /** - * Asynchronously send the values from a {@link java.util.stream.Stream} to the - * {@link io.grpc.stub.StreamObserver#onNext(Object)} method until - * the {@link java.util.stream.Stream} is exhausted call {@link io.grpc.stub.StreamObserver#onCompleted()}. - *

- * If an error occurs whilst streaming results then {@link io.grpc.stub.StreamObserver#onError(Throwable)} will be called. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to complete - * @param stream the {@link java.util.stream.Stream} of results to send to - * {@link io.grpc.stub.StreamObserver#onNext(Object)} - * @param executor the {@link java.util.concurrent.Executor} on which to execute the asynchronous - * request completion - * @param they type of the request result - */ - public static void streamAsync(StreamObserver observer, Stream stream, Executor executor) { - executor.execute(() -> stream(observer, () -> stream)); - } - - /** - * Send the values from a {@link java.util.stream.Stream} to the {@link io.grpc.stub.StreamObserver#onNext(Object)} method - * until the - * {@link java.util.stream.Stream} is exhausted call {@link io.grpc.stub.StreamObserver#onCompleted()}. - *

- * If an error occurs whilst streaming results then {@link io.grpc.stub.StreamObserver#onError(Throwable)} will be called. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to complete - * @param supplier the {@link java.util.function.Supplier} of the {@link java.util.stream.Stream} of results to send to - * {@link io.grpc.stub.StreamObserver#onNext(Object)} - * @param they type of the request result - */ - public static void stream(StreamObserver observer, Supplier> supplier) { - StreamObserver safe = SafeStreamObserver.ensureSafeObserver(observer); - Throwable thrown = null; - - try { - supplier.get().forEach(safe::onNext); - } catch (Throwable t) { - thrown = t; - } - - if (thrown == null) { - safe.onCompleted(); - } else { - safe.onError(thrown); - } - } - - /** - * Asynchronously send the values from a {@link java.util.stream.Stream} to the - * {@link io.grpc.stub.StreamObserver#onNext(Object)} method - * until the {@link java.util.stream.Stream} is exhausted call {@link io.grpc.stub.StreamObserver#onCompleted()}. - *

- * If an error occurs whilst streaming results then {@link io.grpc.stub.StreamObserver#onError(Throwable)} will be called. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to complete - * @param supplier the {@link java.util.function.Supplier} of the {@link java.util.stream.Stream} of results to send to - * {@link io.grpc.stub.StreamObserver#onNext(Object)} - * @param executor the {@link java.util.concurrent.Executor} on which to execute the asynchronous - * request completion - * @param they type of the request result - */ - public static void streamAsync(StreamObserver observer, Supplier> supplier, Executor executor) { - executor.execute(() -> stream(observer, supplier)); - } - - /** - * Obtain a {@link java.util.function.Consumer} that can be used to send values to the - * {@link io.grpc.stub.StreamObserver#onNext(Object)} method until - * the {@link java.util.concurrent.CompletionStage} completes then call {@link io.grpc.stub.StreamObserver#onCompleted()}. - *

- * If the {@link java.util.concurrent.CompletionStage} completes with an error then - * {@link io.grpc.stub.StreamObserver#onError(Throwable)} - * will be called instead of {@link io.grpc.stub.StreamObserver#onCompleted()}. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to send values to and complete when the - * {@link java.util.concurrent.CompletionStage} completes - * @param stage the {@link java.util.concurrent.CompletionStage} to await completion of - * @param they type of the request result - * @return a {@link java.util.function.Consumer} that can be used to send values to the - * {@link io.grpc.stub.StreamObserver#onNext(Object)} method - */ - // todo: a bit of a chicken or egg when used with Coherence streaming methods, isn't it? - public static Consumer stream(StreamObserver observer, CompletionStage stage) { - StreamObserver safe = SafeStreamObserver.ensureSafeObserver(observer); - stage.whenComplete(completeWithoutResult(safe)); - return safe::onNext; - } - - /** - * Obtain a {@link java.util.function.Consumer} that can be used to send values to the - * {@link io.grpc.stub.StreamObserver#onNext(Object)} method until - * the {@link java.util.concurrent.CompletionStage} completes then asynchronously call - * {@link io.grpc.stub.StreamObserver#onCompleted()} using the - * fork-join thread pool. - *

- * If the {@link java.util.concurrent.CompletionStage} completes with an error then - * {@link io.grpc.stub.StreamObserver#onError(Throwable)} - * will be called instead of {@link io.grpc.stub.StreamObserver#onCompleted()}. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to send values to and complete when the - * {@link java.util.concurrent.CompletionStage} completes - * @param stage the {@link java.util.concurrent.CompletionStage} to await completion of - * @param they type of the request result - * @return a {@link java.util.function.Consumer} that can be used to send values to the - * {@link io.grpc.stub.StreamObserver#onNext(Object)} method - */ - public static Consumer streamAsync(StreamObserver observer, CompletionStage stage) { - StreamObserver safe = SafeStreamObserver.ensureSafeObserver(observer); - stage.whenCompleteAsync(completeWithoutResult(safe)); - return value -> CompletableFuture.runAsync(() -> safe.onNext(value)); - } - - /** - * Obtain a {@link java.util.function.Consumer} that can be used to send values to the - * {@link io.grpc.stub.StreamObserver#onNext(Object)} method until - * the {@link java.util.concurrent.CompletionStage} completes then asynchronously call - * {@link io.grpc.stub.StreamObserver#onCompleted()} using the executor - * thread. - *

- * If the {@link java.util.concurrent.CompletionStage} completes with an error then - * {@link io.grpc.stub.StreamObserver#onError(Throwable)} - * will be called instead of {@link io.grpc.stub.StreamObserver#onCompleted()}. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to send values to and complete when the - * {@link java.util.concurrent.CompletionStage} completes - * @param stage the {@link java.util.concurrent.CompletionStage} to await completion of - * @param executor the {@link java.util.concurrent.Executor} on which to execute the asynchronous - * request completion - * @param they type of the request result - * @return a {@link java.util.function.Consumer} that can be used to send values to the - * {@link io.grpc.stub.StreamObserver#onNext(Object)} method - */ - public static Consumer streamAsync(StreamObserver observer, CompletionStage stage, Executor executor) { - StreamObserver safe = SafeStreamObserver.ensureSafeObserver(observer); - stage.whenCompleteAsync(completeWithoutResult(safe), executor); - return value -> CompletableFuture.runAsync(() -> safe.onNext(value), executor); - } - - /** - * Obtain a {@link java.util.function.Consumer} that can be used to send values to the - * {@link io.grpc.stub.StreamObserver#onNext(Object)} method. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to complete - * @param the type of the result - * @param the type of the response - * @return a {@link java.util.function.Consumer} that can be used to send values to the - * {@link io.grpc.stub.StreamObserver#onNext(Object)} method - */ - public static BiConsumer completeWithResult(StreamObserver observer) { - return new CompletionAction<>(observer, true); - } - - /** - * Obtain a {@link java.util.function.Consumer} that can be used to complete a {@link io.grpc.stub.StreamObserver}. - * - * @param observer the {@link io.grpc.stub.StreamObserver} to complete - * @param the type of the response - * @return a {@link java.util.function.Consumer} that can be used to complete a {@link io.grpc.stub.StreamObserver} - */ - public static BiConsumer completeWithoutResult(StreamObserver observer) { - return new CompletionAction<>(observer, false); - } - - /** - * Convert a {@link java.util.concurrent.Callable} to a {@link java.util.function.Supplier}. - * - * @param callable the {@link java.util.concurrent.Callable} to convert - * @param the result returned by the {@link java.util.concurrent.Callable} - * @return a {@link java.util.function.Supplier} that wraps the {@link java.util.concurrent.Callable} - */ - public static Supplier createSupplier(Callable callable) { - return new CallableSupplier<>(callable); - } - - /** - * A {@link java.util.function.BiConsumer} that is used to handle completion of a - * {@link java.util.concurrent.CompletionStage} by forwarding - * the result to a {@link io.grpc.stub.StreamObserver}. - * - * @param the type of the {@link java.util.concurrent.CompletionStage}'s result - * @param the type of result expected by the {@link io.grpc.stub.StreamObserver} - */ - private static class CompletionAction implements BiConsumer { - private StreamObserver observer; - private boolean sendResult; - - CompletionAction(StreamObserver observer, boolean sendResult) { - this.observer = observer; - this.sendResult = sendResult; - } - - @Override - @SuppressWarnings("unchecked") - public void accept(T result, Throwable error) { - if (error != null) { - observer.onError(error); - } else { - if (sendResult) { - observer.onNext((U) result); - } - observer.onCompleted(); - } - } - } - - /** - * A class that converts a {@link java.util.concurrent.Callable} to a {@link java.util.function.Supplier}. - * - * @param the type of result returned from the callable - */ - private static class CallableSupplier implements Supplier { - private Callable callable; - - CallableSupplier(Callable callable) { - this.callable = callable; - } - - @Override - public T get() { - try { - return callable.call(); - } catch (Exception e) { - throw new CompletionException(e.getMessage(), e); - } - } - } -}