diff --git a/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/exception/WrappedException.java b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/exception/WrappedException.java new file mode 100644 index 000000000..0401e8565 --- /dev/null +++ b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/exception/WrappedException.java @@ -0,0 +1,19 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.core.exception; + +public interface WrappedException { +} diff --git a/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/ExceptionUtils.java b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/ExceptionUtils.java index 83e10ce3c..82fbd799f 100644 --- a/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/ExceptionUtils.java +++ b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/ExceptionUtils.java @@ -15,9 +15,13 @@ */ package com.jd.live.agent.core.util; +import com.jd.live.agent.core.exception.WrappedException; + +import java.lang.reflect.InvocationTargetException; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.function.Predicate; /** @@ -81,4 +85,14 @@ public static Set getExceptions(Throwable e, Predicate predic return names; } + /** + * Checks if the given throwable is not a wrapped exception. + * + * @param e The throwable to check. + * @return {@code true} if the throwable is not a wrapped exception, {@code false} otherwise. + */ + public static boolean isNoneWrapped(Throwable e) { + return !(e instanceof WrappedException || e instanceof InvocationTargetException || e instanceof ExecutionException); + } + } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/exception/ErrorCause.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/exception/ErrorCause.java index 7dd144aff..4f4768deb 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/exception/ErrorCause.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/exception/ErrorCause.java @@ -23,6 +23,7 @@ import java.util.function.Function; import java.util.function.Predicate; +import static com.jd.live.agent.core.util.ExceptionUtils.isNoneWrapped; import static com.jd.live.agent.governance.exception.ErrorPolicy.containsException; @AllArgsConstructor @@ -109,7 +110,7 @@ public static ErrorCause cause(Throwable throwable, Set exceptions = new LinkedHashSet<>(8); Throwable cause = null; Throwable candiate = null; - Throwable t = throwable; + Throwable t = isNoneWrapped(throwable) ? throwable : throwable.getCause(); ErrorName errorName; Predicate test = predicate == null ? null : predicate.getPredicate(); while (t != null) { diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/Invocation.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/Invocation.java index fd2e2b4f4..7e7199b01 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/Invocation.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/Invocation.java @@ -172,7 +172,7 @@ public GatewayRole getGateway() { * Resets the state of the instance. * This method is typically used to restore the instance to its initial state. */ - public void reset() { + public void resetOnRetry() { } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/OutboundInvocation.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/OutboundInvocation.java index 8ac3a770a..59085f120 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/OutboundInvocation.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/OutboundInvocation.java @@ -99,7 +99,7 @@ protected LiveParser createLiveParser() { } @Override - public void reset() { + public void resetOnRetry() { listeners = null; routeTarget = null; } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/cluster/AbstractClusterInvoker.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/cluster/AbstractClusterInvoker.java index e8a455de0..954ca02ea 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/cluster/AbstractClusterInvoker.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/cluster/AbstractClusterInvoker.java @@ -23,6 +23,7 @@ import com.jd.live.agent.governance.invoke.InvocationContext; import com.jd.live.agent.governance.invoke.OutboundInvocation; import com.jd.live.agent.governance.policy.service.cluster.ClusterPolicy; +import com.jd.live.agent.governance.request.RoutedRequest; import com.jd.live.agent.governance.request.ServiceRequest.OutboundRequest; import com.jd.live.agent.governance.response.ServiceResponse.OutboundResponse; @@ -72,9 +73,6 @@ E extends Endpoint> CompletionStage invoke(LiveCluster cluster, Outb InvocationContext context = invocation.getContext(); R request = invocation.getRequest(); List instances = invocation.getInstances(); - if (counter > 0) { - invocation.reset(); - } CompletionStage> discoveryStage = instances == null || instances.isEmpty() || counter > 0 ? cluster.route(request) : CompletableFuture.completedFuture((List) instances); @@ -82,7 +80,7 @@ E extends Endpoint> CompletionStage invoke(LiveCluster cluster, Outb if (t == null) { E endpoint = null; try { - endpoint = context.route(invocation, v); + endpoint = request instanceof RoutedRequest ? ((RoutedRequest) request).getEndpoint() : context.route(invocation, v); E instance = endpoint; onStartRequest(cluster, request, endpoint); CompletionStage stage = context.outbound(invocation, endpoint, () -> cluster.invoke(request, instance)); @@ -193,8 +191,8 @@ E extends Endpoint> void onException(LiveCluster cluster, try { invocation.onFailure(endpoint, cause); if (error == null) { - // Request was handled successfully by degrade - cluster.onSuccess(response, request, endpoint); + // Request was recover successfully by degrade + cluster.onRecover(response, request, endpoint); } else if (cause instanceof LiveException) { // Request did not go off box cluster.onDiscard(request); diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/cluster/FailoverClusterInvoker.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/cluster/FailoverClusterInvoker.java index 07c72fe44..a71cd0509 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/cluster/FailoverClusterInvoker.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/cluster/FailoverClusterInvoker.java @@ -17,8 +17,6 @@ import com.jd.live.agent.bootstrap.exception.RejectException.RejectUnreadyException; import com.jd.live.agent.bootstrap.exception.Unretryable; -import com.jd.live.agent.bootstrap.logger.Logger; -import com.jd.live.agent.bootstrap.logger.LoggerFactory; import com.jd.live.agent.core.extension.annotation.Extension; import com.jd.live.agent.core.inject.annotation.Inject; import com.jd.live.agent.core.inject.annotation.Injectable; @@ -29,7 +27,6 @@ import com.jd.live.agent.governance.exception.ServiceError; import com.jd.live.agent.governance.instance.Endpoint; import com.jd.live.agent.governance.invoke.OutboundInvocation; -import com.jd.live.agent.governance.invoke.filter.route.CircuitBreakerFilter; import com.jd.live.agent.governance.policy.service.ServicePolicy; import com.jd.live.agent.governance.policy.service.cluster.ClusterPolicy; import com.jd.live.agent.governance.policy.service.cluster.RetryPolicy; @@ -58,8 +55,6 @@ @Extension(value = ClusterInvoker.TYPE_FAILOVER, order = ClusterInvoker.ORDER_FAILOVER) public class FailoverClusterInvoker extends AbstractClusterInvoker { - private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerFilter.class); - @Inject private Map codeParsers; @@ -78,9 +73,9 @@ E extends Endpoint> CompletionStage execute(LiveCluster cluster, request.getAttributeIfAbsent(Request.KEY_ERROR_POLICY, k -> new HashSet()).add(retryPolicy); } RetryContext retryContext = new RetryContext<>(codeParsers, retryPolicy, cluster); - Supplier> supplier = () -> invoke(cluster, invocation, retryContext.getCount()); + Supplier> supplier = () -> invoke(cluster, invocation, retryContext.getAndIncrement()); cluster.onStart(request); - return retryContext.execute(request, supplier).exceptionally(e -> + return retryContext.execute(invocation, supplier).exceptionally(e -> cluster.createResponse( cluster.createException(e, invocation), request, @@ -149,14 +144,14 @@ private static class RetryContext * - * @param request The request that was being processed. - * @param supplier A supplier providing the operation to be executed as a {@link CompletionStage}. + * @param invocation The {@link OutboundInvocation} representing the specific request and its routing information. + * @param supplier A supplier providing the operation to be executed as a {@link CompletionStage}. * @return A {@link CompletionStage} representing the eventual completion of the operation, * either successfully or with an error. */ - public CompletionStage execute(R request, Supplier> supplier) { + public CompletionStage execute(OutboundInvocation invocation, Supplier> supplier) { CompletableFuture result = new CompletableFuture<>(); - doExecute(request, supplier, result); + doExecute(invocation, supplier, result); return result; } @@ -164,22 +159,21 @@ public CompletionStage execute(R request, Supplier> suppli /** * Recursively executes the operation, applying retry logic and completing the future * based on the outcome of each attempt. - *

- * This method is called internally to handle the actual execution and retry logic. - * It uses the {@link RetryPolicy} to determine whether an operation should be retried - * in case of failure or certain response conditions. - *

* - * @param request The request that was being processed. - * @param supplier A supplier providing the operation to be executed. - * @param future The {@link CompletableFuture} to be completed with the operation's result. + * @param invocation The {@link OutboundInvocation} representing the specific request and its routing information. + * @param supplier A supplier providing the operation to be executed. + * @param future The {@link CompletableFuture} to be completed with the operation's result. */ - private void doExecute(R request, Supplier> supplier, CompletableFuture future) { - int count = counter.getAndIncrement(); - cluster.onRetry(count); + private void doExecute(OutboundInvocation invocation, Supplier> supplier, CompletableFuture future) { + int count = counter.get(); + R request = invocation.getRequest(); + if (count > 0) { + invocation.resetOnRetry(); + } + cluster.onRetry(request, count); CompletionStage stage = supplier.get(); stage.whenComplete((v, e) -> { - ServiceError se = v.getError(); + ServiceError se = v == null ? null : v.getError(); Throwable throwable = se == null ? e : se.getThrowable(); switch (isRetryable(request, v, e, count)) { case RETRY: @@ -187,7 +181,7 @@ private void doExecute(R request, Supplier> supplier, Complet if (unreadyException != null) { future.completeExceptionally(unreadyException); } else { - doExecute(request, supplier, future); + doExecute(invocation, supplier, future); } break; case EXHAUSTED: @@ -206,8 +200,8 @@ private void doExecute(R request, Supplier> supplier, Complet }); } - private int getCount() { - return counter.get(); + private int getAndIncrement() { + return counter.getAndIncrement(); } /** diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/cluster/LiveCluster.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/cluster/LiveCluster.java index c3b0ebbd2..f3a3cb0d2 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/cluster/LiveCluster.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/cluster/LiveCluster.java @@ -193,10 +193,11 @@ default void onDiscard(R request) { /** * Handles retry logic for a process or operation, providing a hook for custom actions on each retry attempt. * + * @param request The request object that is about to be sent * @param retries The number of retry attempts that have occurred so far. This count is 1-based, meaning that * the first retry attempt will pass a value of 1. */ - default void onRetry(int retries) { + default void onRetry(R request, int retries) { } @@ -236,5 +237,16 @@ default void onSuccess(O response, R request, E endpoint) { } + /** + * Handles the recovery of a request by delegating to the onSuccess method. + * + * @param response The response object. + * @param request The request object that was recovered. + * @param endpoint The endpoint associated with the request. + */ + default void onRecover(O response, R request, E endpoint) { + onSuccess(response, request, endpoint); + } + } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/request/RoutedRequest.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/request/RoutedRequest.java new file mode 100644 index 000000000..ef310ca6e --- /dev/null +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/request/RoutedRequest.java @@ -0,0 +1,34 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.governance.request; + +import com.jd.live.agent.governance.instance.Endpoint; + +/** + * An interface representing a request that has already been routed to an appropriate endpoint. + * Extends the {@link Request} interface. + */ +public interface RoutedRequest extends Request { + + /** + * Retrieves the endpoint to which the request has been routed. + * + * @param The type of the endpoint. + * @return The selected endpoint for routing the request. + */ + E getEndpoint(); +} + diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/util/ResponseUtils.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/util/ResponseUtils.java index a9861d65e..6051f7084 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/util/ResponseUtils.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/util/ResponseUtils.java @@ -15,12 +15,12 @@ */ package com.jd.live.agent.governance.util; +import com.jd.live.agent.core.util.ExceptionUtils; + import java.io.UnsupportedEncodingException; -import java.lang.reflect.InvocationTargetException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.function.BiConsumer; import java.util.function.Predicate; @@ -35,8 +35,6 @@ public class ResponseUtils { public static final int HEADER_SIZE_LIMIT = 1024 * 2; - public static final Predicate NONE_EXECUTION_PREDICATE = e -> !(e instanceof ExecutionException) && !(e instanceof InvocationTargetException); - /** * Converts a Set of Strings to a String, using the specified delimiter and truncating if the resulting String exceeds the maximum length. * @@ -103,7 +101,7 @@ public static void labelHeaders(Throwable e, BiConsumer consumer * @param consumer a BiConsumer to accept the generated headers */ public static void labelHeaders(Throwable e, Predicate predicate, BiConsumer consumer) { - describe(e, predicate == null ? NONE_EXECUTION_PREDICATE : predicate, (name, message) -> { + describe(e, predicate == null ? ExceptionUtils::isNoneWrapped : predicate, (name, message) -> { if (name != null && !name.isEmpty()) { consumer.accept(EXCEPTION_NAMES_LABEL, name); } diff --git a/joylive-demo/joylive-demo-grpc/joylive-demo-grpc-provider/src/main/java/com/jd/live/agent/demo/grpc/provider/service/UserServiceGrpcImpl.java b/joylive-demo/joylive-demo-grpc/joylive-demo-grpc-provider/src/main/java/com/jd/live/agent/demo/grpc/provider/service/UserServiceGrpcImpl.java index 0ae2a4705..26f6fb721 100644 --- a/joylive-demo/joylive-demo-grpc/joylive-demo-grpc-provider/src/main/java/com/jd/live/agent/demo/grpc/provider/service/UserServiceGrpcImpl.java +++ b/joylive-demo/joylive-demo-grpc/joylive-demo-grpc-provider/src/main/java/com/jd/live/agent/demo/grpc/provider/service/UserServiceGrpcImpl.java @@ -19,11 +19,17 @@ import io.grpc.stub.StreamObserver; import net.devh.boot.grpc.server.service.GrpcService; +import java.util.concurrent.ThreadLocalRandom; + @GrpcService public class UserServiceGrpcImpl extends UserServiceGrpc.UserServiceImplBase { @Override public void get(UserGetRequest request, StreamObserver responseObserver) { + if (ThreadLocalRandom.current().nextBoolean()) { + responseObserver.onError(new RuntimeException("error")); + return; + } UserGetResponse.Builder builder = UserGetResponse.newBuilder(); builder.setId(request.getId()) .setName("index :" + request.getId() + " time : " + System.currentTimeMillis()) diff --git a/joylive-demo/joylive-demo-grpc/joylive-demo-grpc-provider/src/main/resources/application.yml b/joylive-demo/joylive-demo-grpc/joylive-demo-grpc-provider/src/main/resources/application.yml index 48d13f100..27379b8f6 100644 --- a/joylive-demo/joylive-demo-grpc/joylive-demo-grpc-provider/src/main/resources/application.yml +++ b/joylive-demo/joylive-demo-grpc/joylive-demo-grpc-provider/src/main/resources/application.yml @@ -10,6 +10,6 @@ spring: password: ${NACOS_PASSWORD:nacos} grpc: server: - port: 9898 + port: ${GRPC_SERVER_PORT:${random.int[20000,21000]}} server: - port: ${SERVER_PORT:9899} + port: ${SERVER_PORT:${random.int[20000,21000]}} diff --git a/joylive-package/src/main/assembly/config/microservice.json b/joylive-package/src/main/assembly/config/microservice.json index 3c9bea070..40d12f4e8 100644 --- a/joylive-package/src/main/assembly/config/microservice.json +++ b/joylive-package/src/main/assembly/config/microservice.json @@ -604,5 +604,48 @@ ] } ] + }, + { + "name": "grpc-provider", + "serviceType": "RPC_APP", + "version": 0, + "groups": [ + { + "name": "default", + "defaultGroup": true, + "servicePolicy": { + "livePolicy": { + "unitPolicy": "UNIT", + "writeProtect": false, + "cellPolicy": "PREFER_LOCAL_CELL" + } + }, + "paths": [ + { + "path": "com.jd.live.agent.demo.grpc.service.api.UserService", + "matchType": "EQUAL", + "methods": [ + { + "name": "get", + "servicePolicy": { + "clusterPolicy": { + "type": "failover", + "retryPolicy": { + "retry": 2, + "interval": 1000, + "timeout": 5000, + "exceptions": [ + "java.lang.RuntimeException" + ], + "version": 1704038400000 + } + } + } + } + ] + } + ] + } + ] } ] \ No newline at end of file diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/pom.xml b/joylive-plugin/joylive-router/joylive-router-grpc/pom.xml index ddcfa8156..0ceb1e5b4 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/pom.xml +++ b/joylive-plugin/joylive-router/joylive-router-grpc/pom.xml @@ -13,6 +13,7 @@ 1.63.0 3.1.8 2.15.0.RELEASE + 4.29.2 @@ -28,6 +29,12 @@ ${spring.cloud.version} provided + + com.google.protobuf + protobuf-java-util + ${protobuf.version} + provided + net.devh grpc-client-spring-boot-autoconfigure diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/cluster/GrpcCluster.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/cluster/GrpcCluster.java index 1740659c6..1f0d93625 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/cluster/GrpcCluster.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/cluster/GrpcCluster.java @@ -1,6 +1,5 @@ package com.jd.live.agent.plugin.router.gprc.cluster; -import com.jd.live.agent.core.util.CollectionUtils; import com.jd.live.agent.governance.exception.ErrorPredicate; import com.jd.live.agent.governance.exception.ServiceError; import com.jd.live.agent.governance.invoke.OutboundInvocation; @@ -8,11 +7,10 @@ import com.jd.live.agent.governance.policy.service.circuitbreak.DegradeConfig; import com.jd.live.agent.plugin.router.gprc.instance.GrpcEndpoint; import com.jd.live.agent.plugin.router.gprc.loadbalance.LiveDiscovery; -import com.jd.live.agent.plugin.router.gprc.loadbalance.LivePickerAdvice; -import com.jd.live.agent.plugin.router.gprc.loadbalance.LiveSubchannel; +import com.jd.live.agent.plugin.router.gprc.loadbalance.LiveRequest; import com.jd.live.agent.plugin.router.gprc.request.GrpcRequest.GrpcOutboundRequest; import com.jd.live.agent.plugin.router.gprc.response.GrpcResponse.GrpcOutboundResponse; -import io.grpc.ClientCall; +import io.grpc.LoadBalancer.SubchannelPicker; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -22,44 +20,59 @@ public class GrpcCluster extends AbstractLiveCluster { - private ClientCall clientCall; + public static final GrpcCluster INSTANCE = new GrpcCluster(); - private LivePickerAdvice advice; - - private CompletionStage stage; + @Override + public CompletionStage> route(GrpcOutboundRequest request) { + if (!request.hasEndpoint()) { + // the endpoint maybe null in initialization + // wait for picker + SubchannelPicker picker = LiveDiscovery.getSubchannelPicker(request.getService()); + if (picker != null) { + picker.pickSubchannel(request.getRequest()); + } + } + + // request is already routed + return CompletableFuture.completedFuture(null); + } - public GrpcCluster(ClientCall clientCall, LivePickerAdvice advice, CompletionStage stage) { - this.clientCall = clientCall; - this.advice = advice; - this.stage = stage; + @Override + public CompletionStage invoke(GrpcOutboundRequest request, GrpcEndpoint endpoint) { + LiveRequest req = request.getRequest(); + return req.sendMessage(); } @Override - public CompletionStage> route(GrpcOutboundRequest request) { - List subchannels = LiveDiscovery.getSubchannel(request.getService()); - return CompletableFuture.completedFuture(CollectionUtils.convert(subchannels, GrpcEndpoint::new)); + public void onRetry(GrpcOutboundRequest request, int retries) { + if (retries > 0) { + request.getRequest().onRetry(); + } } @Override - public CompletionStage invoke(GrpcOutboundRequest request, GrpcEndpoint endpoint) { - advice.setSubchannel(endpoint.getSubchannel()); - clientCall.sendMessage(request.getRequest()); - return stage; + public void onRecover(GrpcOutboundResponse response, GrpcOutboundRequest request, GrpcEndpoint endpoint) { + request.getRequest().onRecover(); } @Override protected GrpcOutboundResponse createResponse(GrpcOutboundRequest request) { - return null; + return new GrpcOutboundResponse(null); } @Override protected GrpcOutboundResponse createResponse(GrpcOutboundRequest request, DegradeConfig degradeConfig) { - return null; + try { + Object response = request.getRequest().parse(degradeConfig.getResponseBody()); + return new GrpcOutboundResponse(response); + } catch (Throwable e) { + return createResponse(new ServiceError(createException(e, request), false), getRetryPredicate()); + } } @Override protected GrpcOutboundResponse createResponse(ServiceError error, ErrorPredicate predicate) { - return null; + return new GrpcOutboundResponse(error, predicate); } @Override @@ -76,4 +89,6 @@ public Throwable createException(Throwable throwable, GrpcOutboundRequest reques public Throwable createException(Throwable throwable, OutboundInvocation invocation) { return THROWER.createException(throwable, invocation); } + + } diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/definition/ClientInterceptorsDefinition.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/definition/ChannelFactoryDefinition.java similarity index 74% rename from joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/definition/ClientInterceptorsDefinition.java rename to joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/definition/ChannelFactoryDefinition.java index c6f8b9085..2ac11a5c9 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/definition/ClientInterceptorsDefinition.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/definition/ChannelFactoryDefinition.java @@ -26,32 +26,33 @@ import com.jd.live.agent.core.plugin.definition.PluginDefinitionAdapter; import com.jd.live.agent.governance.invoke.InvocationContext; import com.jd.live.agent.plugin.router.gprc.condition.ConditionalOnGrpcGovernanceEnabled; -import com.jd.live.agent.plugin.router.gprc.interceptor.ClientInterceptorsInterceptor; +import com.jd.live.agent.plugin.router.gprc.interceptor.ChannelFactoryInterceptor; @Injectable -@Extension(value = "ClientInterceptorsDefinition", order = PluginDefinition.ORDER_ROUTER) +@Extension(value = "ChannelFactoryDefinition", order = PluginDefinition.ORDER_ROUTER) @ConditionalOnGrpcGovernanceEnabled -@ConditionalOnClass(ClientInterceptorsDefinition.TYPE) -public class ClientInterceptorsDefinition extends PluginDefinitionAdapter { +@ConditionalOnClass(ChannelFactoryDefinition.TYPE) +public class ChannelFactoryDefinition extends PluginDefinitionAdapter { - public static final String TYPE = "io.grpc.ClientInterceptors"; + public static final String TYPE = "net.devh.boot.grpc.client.channelfactory.AbstractChannelFactory"; - private static final String METHOD = "intercept"; + private static final String METHOD = "createChannel"; private static final String[] ARGUMENTS = new String[]{ - "io.grpc.Channel", - "java.util.List" + "java.lang.String", + "java.util.List", + "boolean" }; @Inject(InvocationContext.COMPONENT_INVOCATION_CONTEXT) private InvocationContext context; - public ClientInterceptorsDefinition() { + public ChannelFactoryDefinition() { this.matcher = () -> MatcherBuilder.named(TYPE); this.interceptors = new InterceptorDefinition[]{ new InterceptorDefinitionAdapter( MatcherBuilder.named(METHOD).and(MatcherBuilder.arguments(ARGUMENTS)), - () -> new ClientInterceptorsInterceptor(context)) + () -> new ChannelFactoryInterceptor(context)) }; } diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/definition/LoadbalancerDefinition.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/definition/LoadbalancerDefinition.java index 65d4367b4..474b118bb 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/definition/LoadbalancerDefinition.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/definition/LoadbalancerDefinition.java @@ -18,13 +18,11 @@ import com.jd.live.agent.core.bytekit.matcher.MatcherBuilder; import com.jd.live.agent.core.extension.annotation.ConditionalOnClass; import com.jd.live.agent.core.extension.annotation.Extension; -import com.jd.live.agent.core.inject.annotation.Inject; import com.jd.live.agent.core.inject.annotation.Injectable; import com.jd.live.agent.core.plugin.definition.InterceptorDefinition; import com.jd.live.agent.core.plugin.definition.InterceptorDefinitionAdapter; import com.jd.live.agent.core.plugin.definition.PluginDefinition; import com.jd.live.agent.core.plugin.definition.PluginDefinitionAdapter; -import com.jd.live.agent.core.util.time.Timer; import com.jd.live.agent.plugin.router.gprc.condition.ConditionalOnGrpcGovernanceEnabled; import com.jd.live.agent.plugin.router.gprc.interceptor.LoadbalancerInterceptor; @@ -42,15 +40,12 @@ public class LoadbalancerDefinition extends PluginDefinitionAdapter { "java.lang.String" }; - @Inject(Timer.COMPONENT_TIMER) - private Timer timer; - public LoadbalancerDefinition() { this.matcher = () -> MatcherBuilder.named(TYPE); this.interceptors = new InterceptorDefinition[]{ new InterceptorDefinitionAdapter( MatcherBuilder.named(METHOD).and(MatcherBuilder.arguments(ARGUMENTS)), - () -> new LoadbalancerInterceptor(timer)) + LoadbalancerInterceptor::new) }; } diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/exception/GrpcException.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/exception/GrpcException.java new file mode 100644 index 000000000..89911b175 --- /dev/null +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/exception/GrpcException.java @@ -0,0 +1,69 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.plugin.router.gprc.exception; + +import com.jd.live.agent.core.exception.WrappedException; +import io.grpc.Metadata; +import io.grpc.Status; + +/** + * GrpcException + * + * @see GrpcException + */ +public abstract class GrpcException extends RuntimeException implements WrappedException { + + public GrpcException(Throwable cause) { + super(cause.getMessage(), cause, false, false); + } + + /** + * GrpcClientException + */ + public static class GrpcClientException extends GrpcException { + + public GrpcClientException(Throwable cause) { + super(cause); + } + + } + + /** + * GrpcException + * + * @see GrpcServerException + */ + public static class GrpcServerException extends GrpcException { + + private final Status status; + + private final Metadata trailers; + + public GrpcServerException(Throwable cause, Status status, Metadata trailers) { + super(cause); + this.status = status; + this.trailers = trailers; + } + + public Status getStatus() { + return status; + } + + public Metadata getTrailers() { + return trailers; + } + } +} diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/exception/GrpcOutboundThrower.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/exception/GrpcOutboundThrower.java index 6c5599700..feb95ca2b 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/exception/GrpcOutboundThrower.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/exception/GrpcOutboundThrower.java @@ -39,6 +39,14 @@ public class GrpcOutboundThrower extends AbstractOutboundThrower> KEYS = new ConcurrentHashMap<>(); - private final LiveSubchannel subchannel; + private final Subchannel subchannel; - private final InetSocketAddress address; + private InetSocketAddress socketAddress; - public GrpcEndpoint(LiveSubchannel subchannel) { + public GrpcEndpoint(Subchannel subchannel) { this.subchannel = subchannel; - this.address = subchannel.getAddress(); } @Override public String getHost() { - if (address != null) { - return address.getHostString(); - } - return null; + return socketAddress == null ? null : socketAddress.getHostString(); } @Override public int getPort() { - if (address != null) { - return address.getPort(); - } - return 0; + return socketAddress == null ? 0 : socketAddress.getPort(); } @Override @@ -51,8 +51,54 @@ public EndpointState getState() { return EndpointState.HEALTHY; } - public LiveSubchannel getSubchannel() { + public Subchannel getSubchannel() { return subchannel; } + public void requestConnection() { + subchannel.requestConnection(); + } + + public void shutdown() { + subchannel.shutdown(); + } + + public void start(LoadBalancer.SubchannelStateListener listener) { + subchannel.start(listener); + socketAddress = getInetSocketAddress(subchannel); + } + + /** + * Gets the current ConnectivityState. + * + * @return the current ConnectivityState, or IDLE if no state is set + */ + public ConnectivityState getConnectivityState() { + LiveRef ref = subchannel.getAttributes().get(LiveRef.KEY_STATE); + return ref == null ? IDLE : ref.getState(); + } + + /** + * Sets the ConnectivityState to the specified newState. + * + * @param newState the new ConnectivityState to set + */ + public void setConnectivityState(ConnectivityState newState) { + LiveRef ref = subchannel.getAttributes().get(LiveRef.KEY_STATE); + if (ref != null) { + ref.setState(newState); + } + } + + private static InetSocketAddress getInetSocketAddress(Subchannel subchannel) { + + List addresses = subchannel.getAllAddresses().get(0).getAddresses(); + for (SocketAddress addr : addresses) { + if (addr instanceof InetSocketAddress) { + return (InetSocketAddress) addr; + } + } + return null; + } + } diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/interceptor/ChannelFactoryInterceptor.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/interceptor/ChannelFactoryInterceptor.java new file mode 100644 index 000000000..cbcc20266 --- /dev/null +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/interceptor/ChannelFactoryInterceptor.java @@ -0,0 +1,147 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.plugin.router.gprc.interceptor; + +import com.jd.live.agent.bootstrap.bytekit.context.ExecutableContext; +import com.jd.live.agent.bootstrap.bytekit.context.MethodContext; +import com.jd.live.agent.core.plugin.definition.InterceptorAdaptor; +import com.jd.live.agent.governance.invoke.InvocationContext; +import com.jd.live.agent.plugin.router.gprc.loadbalance.LiveRequest; +import io.grpc.*; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.MethodDescriptor.MethodType; + +/** + * ClientInterceptorsInterceptor + */ +public class ChannelFactoryInterceptor extends InterceptorAdaptor { + + private final InvocationContext context; + + public ChannelFactoryInterceptor(InvocationContext context) { + this.context = context; + } + + @Override + public void onSuccess(ExecutableContext ctx) { + MethodContext mc = (MethodContext) ctx; + mc.setResult(new LiveChannel(context, mc.getResult())); + } + + /** + * A private static class that extends the Channel class to provide a live channel implementation. + */ + private static class LiveChannel extends Channel { + + private final InvocationContext context; + + private final Channel channel; + + LiveChannel(InvocationContext context, Channel channel) { + this.context = context; + this.channel = channel; + } + + @Override + public ClientCall newCall(MethodDescriptor method, CallOptions callOptions) { + if (method.getType() != MethodType.UNARY) { + // This is not a Unary RPC method + return channel.newCall(method, callOptions); + } + LiveRequest request = new LiveRequest<>(method, context); + CallOptions options = callOptions.withOption(LiveRequest.KEY_LIVE_REQUEST, request); + request.setCallOptions(options); + request.setClientCall(channel.newCall(method, options)); + request.setCallSupplier(() -> channel.newCall(method, options)); + return context.isFlowControlEnabled() ? new FlowControlClientCall<>(request) : new LiveClientCall<>(request); + } + + @Override + public String authority() { + return channel.authority(); + } + } + + /** + * A custom client call that extends SimpleForwardingClientCall to provide additional functionality + * such as handling headers, messages, and responses using LivePickerAdvice. + * + * @param The type of the request message. + * @param The type of the response message. + */ + private static class LiveClientCall extends SimpleForwardingClientCall { + + private final LiveRequest request; + + LiveClientCall(LiveRequest request) { + super(request.getClientCall()); + this.request = request; + } + + @Override + public void start(Listener responseListener, Metadata headers) { + request.setHeaders(headers); + super.start(responseListener, headers); + } + + @Override + public void sendMessage(ReqT message) { + request.setMessage(message); + super.sendMessage(message); + } + } + + /** + * A custom client call that extends SimpleForwardingClientCall to provide additional functionality + * such as handling headers, messages, and responses using LivePickerAdvice. + * + * @param The type of the request message. + * @param The type of the response message. + */ + private static class FlowControlClientCall extends SimpleForwardingClientCall { + + private final LiveRequest request; + + FlowControlClientCall(LiveRequest request) { + super(request.getClientCall()); + this.request = request; + } + + @Override + public void start(Listener responseListener, Metadata headers) { + request.setResponseListener(responseListener); + request.setHeaders(headers); + request.start(); + } + + @Override + public void setMessageCompression(boolean enabled) { + request.setMessageCompression(enabled); + super.setMessageCompression(enabled); + } + + @Override + public void request(int numMessages) { + request.setNumMessages(numMessages); + super.request(numMessages); + } + + @Override + public void sendMessage(ReqT message) { + request.sendMessage(message); + } + } +} diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/interceptor/ClientInterceptorsInterceptor.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/interceptor/ClientInterceptorsInterceptor.java deleted file mode 100644 index fcb2641be..000000000 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/interceptor/ClientInterceptorsInterceptor.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright © ${year} ${owner} (${email}) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jd.live.agent.plugin.router.gprc.interceptor; - -import com.jd.live.agent.bootstrap.bytekit.context.ExecutableContext; -import com.jd.live.agent.bootstrap.bytekit.context.MethodContext; -import com.jd.live.agent.core.plugin.definition.InterceptorAdaptor; -import com.jd.live.agent.governance.invoke.InvocationContext; -import com.jd.live.agent.plugin.router.gprc.loadbalance.LivePickerAdvice; -import com.jd.live.agent.plugin.router.gprc.response.GrpcResponse.GrpcOutboundResponse; -import io.grpc.*; -import io.grpc.MethodDescriptor.MethodType; - -import java.util.concurrent.CompletableFuture; - -/** - * ClientInterceptorsInterceptor - */ -public class ClientInterceptorsInterceptor extends InterceptorAdaptor { - - private final InvocationContext context; - - public ClientInterceptorsInterceptor(InvocationContext context) { - this.context = context; - } - - @Override - public void onSuccess(ExecutableContext ctx) { - MethodContext mc = (MethodContext) ctx; - mc.setResult(new LiveChannel(context, mc.getResult())); - } - - /** - * A private static class that extends the Channel class to provide a live channel implementation. - */ - private static class LiveChannel extends Channel { - - private final InvocationContext context; - - private final Channel channel; - - LiveChannel(InvocationContext context, Channel channel) { - this.context = context; - this.channel = channel; - } - - @Override - public ClientCall newCall(MethodDescriptor method, CallOptions callOptions) { - if (method.getType() != MethodType.UNARY) { - // This is not a Unary RPC method - return channel.newCall(method, callOptions); - } - LivePickerAdvice advice = new LivePickerAdvice(method, context); - callOptions = callOptions.withOption(LivePickerAdvice.KEY_PICKER_ADVICE, advice); - ClientCall clientCall = channel.newCall(method, callOptions); - return new ForwardingClientCall.SimpleForwardingClientCall(clientCall) { - - private CompletableFuture future = new CompletableFuture<>(); - - @Override - public void start(Listener responseListener, Metadata headers) { - advice.setHeaders(headers); - // TODO wrap response listener to handle response & error - super.start(responseListener, headers); - } - - @Override - public void sendMessage(ReqT message) { - advice.setMessage(message); - // TODO wrap cluster to invoke & handle void response - super.sendMessage(message); - } - }; - } - - @Override - public String authority() { - return channel.authority(); - } - } -} diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/interceptor/GrpcServerInterceptor.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/interceptor/GrpcServerInterceptor.java index 3cdb9d6d4..fb76331d0 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/interceptor/GrpcServerInterceptor.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/interceptor/GrpcServerInterceptor.java @@ -20,6 +20,7 @@ import com.jd.live.agent.governance.invoke.InvocationContext; import com.jd.live.agent.plugin.router.gprc.request.GrpcRequest.GrpcInboundRequest; import com.jd.live.agent.plugin.router.gprc.request.invoke.GrpcInvocation.GrpcInboundInvocation; +import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener; import io.grpc.*; import java.util.concurrent.Callable; @@ -48,43 +49,60 @@ public GrpcServerInterceptor(InvocationContext context) { @Override public void onSuccess(ExecutableContext ctx) { ServerBuilder target = (ServerBuilder) ctx.getTarget(); - target.intercept(new ServerInterceptor() { - @SuppressWarnings("unchecked") - @Override - public ServerCall.Listener interceptCall( - ServerCall call, - Metadata headers, - ServerCallHandler next) { + target.intercept(new LiveServerInterceptor(context)); + } + + /** + * An interceptor for handling server calls in a live server environment. + * + * This interceptor uses an {@link InvocationContext} to manage the invocation lifecycle. + * It intercepts server calls, processes incoming requests, and handles exceptions. + */ + private static class LiveServerInterceptor implements ServerInterceptor { + + private final InvocationContext context; + + LiveServerInterceptor(InvocationContext context) { + this.context = context; + } - Callable callable = () -> next.startCall(new ForwardingServerCall() { - @Override - protected ServerCall delegate() { - return call; - } + @SuppressWarnings("unchecked") + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next) { - @Override - public MethodDescriptor getMethodDescriptor() { - return call.getMethodDescriptor(); - } - }, headers); + Callable callable = () -> new LiveServerCallListener(next.startCall(call, headers)); - GrpcInboundRequest request = new GrpcInboundRequest(call, headers); - try { - Object result = !request.isSystem() - ? context.inward(new GrpcInboundInvocation(request, context), callable) - : callable.call(); - return (ServerCall.Listener) result; - } catch (Throwable e) { - Throwable throwable = THROWER.createException(e, request); - StatusRuntimeException t = throwable instanceof StatusRuntimeException - ? (StatusRuntimeException) throwable - : Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException(new Metadata()); - call.close(t.getStatus(), t.getTrailers()); - return new ServerCall.Listener() { - // no-op - }; - } + GrpcInboundRequest request = new GrpcInboundRequest(call, headers); + try { + Object result = !request.isSystem() + ? context.inward(new GrpcInboundInvocation(request, context), callable) + : callable.call(); + return (ServerCall.Listener) result; + } catch (Throwable e) { + Throwable throwable = THROWER.createException(e, request); + StatusRuntimeException t = throwable instanceof StatusRuntimeException + ? (StatusRuntimeException) throwable + : Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException(new Metadata()); + call.close(t.getStatus(), t.getTrailers()); + return new ServerCall.Listener() { + // no-op + }; } - }); + } + } + + /** + * A listener for handling server calls that extends {@link SimpleForwardingServerCallListener}. + * + * This listener is used to forward server call events to the delegate listener. + */ + private static class LiveServerCallListener extends SimpleForwardingServerCallListener { + + LiveServerCallListener(ServerCall.Listener delegate) { + super(delegate); + } } } diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/interceptor/LoadbalancerInterceptor.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/interceptor/LoadbalancerInterceptor.java index fb15883d0..cec20eec5 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/interceptor/LoadbalancerInterceptor.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/interceptor/LoadbalancerInterceptor.java @@ -18,7 +18,6 @@ import com.jd.live.agent.bootstrap.bytekit.context.ExecutableContext; import com.jd.live.agent.bootstrap.bytekit.context.MethodContext; import com.jd.live.agent.core.plugin.definition.InterceptorAdaptor; -import com.jd.live.agent.core.util.time.Timer; import com.jd.live.agent.plugin.router.gprc.loadbalance.LiveLoadBalancerProvider; /** @@ -26,15 +25,9 @@ */ public class LoadbalancerInterceptor extends InterceptorAdaptor { - private final Timer timer; - - public LoadbalancerInterceptor(Timer timer) { - this.timer = timer; - } - @Override public void onEnter(ExecutableContext ctx) { MethodContext mc = (MethodContext) ctx; - mc.skipWithResult(new LiveLoadBalancerProvider(timer)); + mc.skipWithResult(new LiveLoadBalancerProvider()); } } diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveDiscovery.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveDiscovery.java index 70e1be0a0..d420ac578 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveDiscovery.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveDiscovery.java @@ -15,50 +15,117 @@ */ package com.jd.live.agent.plugin.router.gprc.loadbalance; -import java.util.List; +import io.grpc.LoadBalancer.SubchannelPicker; + import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** - * A class that provides methods for discovering and managing subchannels for different services. + * A class that provides methods for discovering and managing picker for different services. */ public class LiveDiscovery { - /** - * A map of subchannels, where the key is the service name and the value is a list of subchannels for that service. - */ - private static final Map> SUB_CHANNELS = new ConcurrentHashMap<>(); + private static final Map PICKERS = new ConcurrentHashMap<>(); private static final Map SERVICES = new ConcurrentHashMap<>(); + private static final int INITIALIZE_TIMEOUT_MILLIS = 10000; + /** - * Returns the list of subchannels for the specified service. + * Retrieves the SubchannelPicker for the specified service. * - * @param service The name of the service for which to retrieve the subchannels. - * @return The list of subchannels for the specified service, or null if no subchannels are found. + * @param service The name of the service. + * @return The SubchannelPicker associated with the specified service. */ - public static List getSubchannel(String service) { - return SUB_CHANNELS.get(service); + public static SubchannelPicker getSubchannelPicker(String service) { + return getPicker(service).getPicker(); } /** - * Adds or updates the list of subchannels for the specified service. + * Sets the SubchannelPicker for the specified service. * - * @param service The name of the service for which to add or update the subchannels. - * @param subchannels The list of subchannels to add or update for the specified service. + * @param service The name of the service. + * @param picker The SubchannelPicker to be set for the specified service. */ - public static void putSubchannel(String service, List subchannels) { - SUB_CHANNELS.put(service, subchannels); + public static void setSubchannelPicker(String service, SubchannelPicker picker) { + getPicker(service).setPicker(picker); } + /** + * Retrieves the service name associated with the given interface name. + * + * @param interfaceName The name of the interface. + * @return The service name associated with the interface name, or the interface name itself if no service is found. + */ public static String getService(String interfaceName) { return interfaceName == null || interfaceName.isEmpty() ? null : SERVICES.getOrDefault(interfaceName, interfaceName); } + /** + * Associates a service name with the given interface name. + * + * @param interfaceName The name of the interface. + * @param service The name of the service to be associated with the interface name. + */ public static void putService(String interfaceName, String service) { if (interfaceName != null && !interfaceName.isEmpty() && service != null && !service.isEmpty()) { SERVICES.put(interfaceName, service); } } + private static Picker getPicker(String service) { + return PICKERS.computeIfAbsent(service, Picker::new); + } + + /** + * A class that manages a gRPC SubchannelPicker with thread-safe updates and retrieval. + *

+ * This class is responsible for maintaining a service identifier, an update time, and a SubchannelPicker. + * It provides methods to set and get the SubchannelPicker, ensuring thread safety and proper initialization. + */ + private static class Picker { + + private final String service; + + private volatile long updateTime; + + private volatile SubchannelPicker picker; + + private final Object mutex = new Object(); + + Picker(String service) { + this.service = service; + } + + public String getService() { + return service; + } + + public void setPicker(SubchannelPicker picker) { + this.picker = picker; + long lastUpdateTime = updateTime; + this.updateTime = System.currentTimeMillis(); + if (lastUpdateTime == 0) { + synchronized (mutex) { + mutex.notifyAll(); + } + } + } + + public SubchannelPicker getPicker() { + if (updateTime == 0) { + // wait for initialization + synchronized (mutex) { + if (updateTime == 0) { + try { + mutex.wait(INITIALIZE_TIMEOUT_MILLIS); + } catch (InterruptedException ignored) { + } + } + } + } + return picker; + } + } + } diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveLoadBalancer.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveLoadBalancer.java index 48ff40533..3e4838892 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveLoadBalancer.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveLoadBalancer.java @@ -15,15 +15,12 @@ */ package com.jd.live.agent.plugin.router.gprc.loadbalance; -import com.jd.live.agent.core.util.time.Timer; +import com.jd.live.agent.plugin.router.gprc.instance.GrpcEndpoint; import io.grpc.*; import java.lang.reflect.Field; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import static io.grpc.ConnectivityState.*; @@ -38,19 +35,10 @@ public class LiveLoadBalancer extends LoadBalancer { private final String serviceName; - private final Timer timer; + private volatile Map endpoints = new ConcurrentHashMap<>(); - private volatile Map subchannels = new ConcurrentHashMap<>(); - - private final AtomicLong versions = new AtomicLong(0); - - private final AtomicBoolean taskAdded = new AtomicBoolean(false); - - private final AtomicBoolean initialized = new AtomicBoolean(false); - - public LiveLoadBalancer(Helper helper, Timer timer) { + public LiveLoadBalancer(Helper helper) { this.helper = helper; - this.timer = timer; this.serviceName = getServiceName(helper); } @@ -58,37 +46,36 @@ public LiveLoadBalancer(Helper helper, Timer timer) { public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { Set latestAddresses = deDup(resolvedAddresses); - List removed = new ArrayList<>(); - List added = new ArrayList<>(); + List removed = new ArrayList<>(); + List added = new ArrayList<>(); - Map oldSubchannels = subchannels; - Map newSubchannels = new ConcurrentHashMap<>(); + Map olds = endpoints; + Map news = new ConcurrentHashMap<>(); latestAddresses.forEach(addressGroup -> { - LiveSubchannel subchannel = oldSubchannels.get(addressGroup); - if (subchannel == null) { + GrpcEndpoint endpoint = olds.get(addressGroup); + if (endpoint == null) { // create new connection - subchannel = createSubchannel(addressGroup); - added.add(subchannel); + endpoint = createEndpoint(addressGroup); + added.add(endpoint); } - newSubchannels.put(addressGroup, subchannel); + news.put(addressGroup, endpoint); }); - subchannels = newSubchannels; - oldSubchannels.forEach((addressGroup, subchannel) -> { + endpoints = news; + olds.forEach((addressGroup, endpoint) -> { if (!latestAddresses.contains(addressGroup)) { - removed.remove(subchannel); + removed.remove(endpoint); } }); // close not exists if (!removed.isEmpty()) { - addTask(); - removed.forEach(subchannel -> { - subchannel.setConnectivityState(SHUTDOWN); - subchannel.shutdown(); + removed.forEach(endpoint -> { + endpoint.setConnectivityState(SHUTDOWN); + endpoint.shutdown(); }); } // create new connection if (!added.isEmpty()) { - added.forEach(LiveSubchannel::requestConnection); + added.forEach(GrpcEndpoint::requestConnection); } } @@ -99,7 +86,7 @@ public void handleNameResolutionError(Status error) { @Override public void shutdown() { - subchannels.values().forEach(LiveSubchannel::shutdown); + endpoints.values().forEach(GrpcEndpoint::shutdown); } /** @@ -143,86 +130,61 @@ private Set deDup(ResolvedAddresses resolvedAddresses) { } /** - * Creates a new Subchannel for the given EquivalentAddressGroup. + * Creates a new GrpcEndpoint for the given EquivalentAddressGroup. * - * @param addressGroup the EquivalentAddressGroup to create the Subchannel for - * @return the newly created Subchannel + * @param addressGroup the EquivalentAddressGroup to create the GrpcEndpoint for + * @return the newly created GrpcEndpoint */ - private LiveSubchannel createSubchannel(EquivalentAddressGroup addressGroup) { - Attributes attributes = addressGroup.getAttributes().toBuilder().set(LiveRef.KEY_STATE, new LiveRef<>(IDLE)).build(); + private GrpcEndpoint createEndpoint(EquivalentAddressGroup addressGroup) { + LiveRef ref = new LiveRef(); + Attributes attributes = addressGroup.getAttributes().toBuilder().set(LiveRef.KEY_STATE, ref).build(); CreateSubchannelArgs args = CreateSubchannelArgs.newBuilder().setAddresses(addressGroup).setAttributes(attributes).build(); - LiveSubchannel subchannel = new LiveSubchannel(helper.createSubchannel(args)); - subchannel.start(new LiveSubchannelStateListener(subchannel)); - return subchannel; - } - - /** - * Adds a task to update the ready instances and schedules it for execution. - */ - private void addTask() { - versions.incrementAndGet(); - submitTask(); + GrpcEndpoint endpoint = new GrpcEndpoint(helper.createSubchannel(args)); + ref.setState(IDLE); + ref.setEndpoint(endpoint); + endpoint.start(new LiveStateListener(endpoint)); + return endpoint; } /** - * Submits a task to update the ready instances and schedules it for execution. - * - * @see #addTask() - */ - private void submitTask() { - if (taskAdded.compareAndSet(false, true)) { - String name = "update-ready-instance-" + serviceName; - int interval = 1000 + ThreadLocalRandom.current().nextInt(2000); - timer.add(name, interval, () -> { - long version = versions.get(); - pickReady(); - taskAdded.set(false); - if (versions.get() != version) { - submitTask(); - } - }); - } - } - - /** - * Picks the ready Subchannels and updates the balancing state accordingly. + * Picks the ready GrpcEndpoints and updates the balancing state accordingly. */ private void pickReady() { - List readies = new ArrayList<>(); - subchannels.values().forEach(subchannel -> { - if (subchannel.getConnectivityState() == ConnectivityState.READY) { - readies.add(subchannel); + List readies = new ArrayList<>(); + endpoints.values().forEach(endpoint -> { + if (endpoint.getConnectivityState() == ConnectivityState.READY) { + readies.add(endpoint); } }); - LiveDiscovery.putSubchannel(serviceName, readies); if (readies.isEmpty()) { - helper.updateBalancingState(ConnectivityState.CONNECTING, new LiveSubchannelPicker(PickResult.withNoResult())); + LiveSubchannelPicker picker = new LiveSubchannelPicker(PickResult.withNoResult()); + helper.updateBalancingState(ConnectivityState.CONNECTING, picker); + LiveDiscovery.setSubchannelPicker(serviceName, picker); } else { - helper.updateBalancingState(ConnectivityState.READY, new LiveSubchannelPicker(readies)); + LiveSubchannelPicker picker = new LiveSubchannelPicker(readies); + helper.updateBalancingState(ConnectivityState.READY, picker); + LiveDiscovery.setSubchannelPicker(serviceName, picker); } } - private class LiveSubchannelStateListener implements SubchannelStateListener { + private class LiveStateListener implements SubchannelStateListener { - private final LiveSubchannel subchannel; + private final GrpcEndpoint endpoint; - LiveSubchannelStateListener(LiveSubchannel subchannel) { - this.subchannel = subchannel; + LiveStateListener(GrpcEndpoint endpoint) { + this.endpoint = endpoint; } @Override public void onSubchannelState(ConnectivityStateInfo stateInfo) { - ConnectivityState currentState = subchannel.getConnectivityState(); + ConnectivityState currentState = endpoint.getConnectivityState(); ConnectivityState newState = stateInfo.getState(); - subchannel.setConnectivityState(newState); + endpoint.setConnectivityState(newState); if (currentState == READY && newState != READY || currentState != READY && newState == READY) { - if (initialized.compareAndSet(false, true)) { - pickReady(); - } else { - // Asynchronous processing, preventing massive up and down - addTask(); - } + // call helper.updateBalancingState in this thread to avoid exception + // syncContext.throwIfNotInThisSynchronizationContext() + pickReady(); } } } diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveLoadBalancerProvider.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveLoadBalancerProvider.java index 7a10d6ddc..64f763f50 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveLoadBalancerProvider.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveLoadBalancerProvider.java @@ -15,7 +15,6 @@ */ package com.jd.live.agent.plugin.router.gprc.loadbalance; -import com.jd.live.agent.core.util.time.Timer; import io.grpc.LoadBalancer; import io.grpc.LoadBalancerProvider; @@ -24,12 +23,6 @@ */ public class LiveLoadBalancerProvider extends LoadBalancerProvider { - private final Timer timer; - - public LiveLoadBalancerProvider(Timer timer) { - this.timer = timer; - } - @Override public boolean isAvailable() { return true; @@ -47,6 +40,6 @@ public String getPolicyName() { @Override public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) { - return new LiveLoadBalancer(helper, timer); + return new LiveLoadBalancer(helper); } } diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LivePickerAdvice.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LivePickerAdvice.java deleted file mode 100644 index fa50c9614..000000000 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LivePickerAdvice.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright © ${year} ${owner} (${email}) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jd.live.agent.plugin.router.gprc.loadbalance; - -import com.jd.live.agent.governance.invoke.InvocationContext; -import com.jd.live.agent.plugin.router.gprc.instance.GrpcEndpoint; -import com.jd.live.agent.plugin.router.gprc.request.GrpcRequest.GrpcOutboundRequest; -import com.jd.live.agent.plugin.router.gprc.request.invoke.GrpcInvocation.GrpcOutboundInvocation; -import io.grpc.CallOptions; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; - -import java.util.List; - -import static com.jd.live.agent.core.util.CollectionUtils.convert; - -/** - * Represents advice for live picker, which includes a subchannel and an election function to determine the active subchannel. - */ -public class LivePickerAdvice { - - public static final CallOptions.Key KEY_PICKER_ADVICE = CallOptions.Key.create("x-picker-advice"); - - private final MethodDescriptor method; - - private final InvocationContext context; - - private Object message; - - private Metadata headers; - - private LiveSubchannel subchannel; - - public LivePickerAdvice(MethodDescriptor method, InvocationContext context) { - this.method = method; - this.context = context; - } - - public void setHeaders(Metadata headers) { - this.headers = headers; - } - - public void setMessage(Object message) { - this.message = message; - } - - public void setSubchannel(LiveSubchannel subchannel) { - this.subchannel = subchannel; - } - - public LiveSubchannel elect(List subchannels) { - if (subchannel != null) { - return subchannel; - } - String serviceName = LiveDiscovery.getService(method.getServiceName()); - GrpcOutboundRequest request = new GrpcOutboundRequest(message, headers, method, serviceName); - GrpcOutboundInvocation invocation = new GrpcOutboundInvocation(request, context); - GrpcEndpoint endpoint = context.route(invocation, convert(subchannels, GrpcEndpoint::new)); - return endpoint.getSubchannel(); - } - -} diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRef.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRef.java index de2ee09b3..dbb37bc6c 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRef.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRef.java @@ -15,24 +15,37 @@ */ package com.jd.live.agent.plugin.router.gprc.loadbalance; +import com.jd.live.agent.plugin.router.gprc.instance.GrpcEndpoint; import io.grpc.Attributes.Key; import io.grpc.ConnectivityState; -final class LiveRef { +/** + * Represents a live reference to a gRPC endpoint with its connectivity state. + * + * This class contains a gRPC endpoint and its current connectivity state, providing methods to get and set these values. + */ +public final class LiveRef { + + public static final Key KEY_STATE = Key.create("x-state"); - public static final Key> KEY_STATE = Key.create("x-state"); + private GrpcEndpoint endpoint; - T value; + private ConnectivityState state; - LiveRef(T value) { - this.value = value; + public GrpcEndpoint getEndpoint() { + return endpoint; } - public void setValue(T value) { - this.value = value; + public void setEndpoint(GrpcEndpoint endpoint) { + this.endpoint = endpoint; } - public T getValue() { - return value; + public ConnectivityState getState() { + return state; } + + public void setState(ConnectivityState state) { + this.state = state; + } + } diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRequest.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRequest.java new file mode 100644 index 000000000..e7683112a --- /dev/null +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRequest.java @@ -0,0 +1,455 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.plugin.router.gprc.loadbalance; + +import com.google.protobuf.Message; +import com.google.protobuf.util.JsonFormat; +import com.jd.live.agent.bootstrap.exception.RejectException.RejectNoProviderException; +import com.jd.live.agent.governance.exception.ServiceError; +import com.jd.live.agent.governance.invoke.InvocationContext; +import com.jd.live.agent.plugin.router.gprc.cluster.GrpcCluster; +import com.jd.live.agent.plugin.router.gprc.exception.GrpcException.GrpcServerException; +import com.jd.live.agent.plugin.router.gprc.instance.GrpcEndpoint; +import com.jd.live.agent.plugin.router.gprc.request.GrpcRequest.GrpcOutboundRequest; +import com.jd.live.agent.plugin.router.gprc.request.invoke.GrpcInvocation.GrpcOutboundInvocation; +import com.jd.live.agent.plugin.router.gprc.response.GrpcResponse.GrpcOutboundResponse; +import io.grpc.*; +import io.grpc.LoadBalancer.PickResult; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.LoadBalancer.Subchannel; + +import java.io.StringReader; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiConsumer; +import java.util.function.Supplier; + +/** + * Live request. + */ +public class LiveRequest extends PickSubchannelArgs { + + public static final CallOptions.Key> KEY_LIVE_REQUEST = CallOptions.Key.create("x-live-request"); + + private static final Map, Optional> METHODS = new ConcurrentHashMap<>(); + + private static final String METHOD_NEW_BUILDER = "newBuilder"; + + private final MethodDescriptor methodDescriptor; + + private final InvocationContext context; + + private CallOptions callOptions; + + private Supplier> callSupplier; + + private Metadata headers; + + private ClientCall.Listener responseListener; + + private int numMessages; + + private Boolean messageCompression; + + private ReqT message; + + private ClientCall clientCall; + + private CompletableFuture future; + + private final GrpcOutboundRequest request; + + private final GrpcOutboundInvocation invocation; + + private LiveRouteResult routeResult; + + private int counter; + + public LiveRequest(MethodDescriptor methodDescriptor, InvocationContext context) { + this.methodDescriptor = methodDescriptor; + this.context = context; + this.request = new GrpcOutboundRequest(this); + this.invocation = new GrpcOutboundInvocation(request, context); + this.future = new CompletableFuture<>(); + } + + public InvocationContext getContext() { + return context; + } + + public MethodDescriptor getMethodDescriptor() { + return methodDescriptor; + } + + public void setCallOptions(CallOptions callOptions) { + this.callOptions = callOptions; + } + + public void setCallSupplier(Supplier> callSupplier) { + this.callSupplier = callSupplier; + } + + public CallOptions getCallOptions() { + return callOptions; + } + + public String getMethodName() { + return methodDescriptor.getBareMethodName(); + } + + public Metadata getHeaders() { + return headers; + } + + public void setHeaders(Metadata headers) { + this.headers = headers; + } + + public void setResponseListener(ClientCall.Listener responseListener) { + this.responseListener = responseListener; + } + + public void setNumMessages(int numMessages) { + this.numMessages = numMessages; + } + + public void setMessageCompression(boolean messageCompression) { + this.messageCompression = messageCompression; + } + + public void setMessage(ReqT message) { + this.message = message; + } + + public String getPath() { + return methodDescriptor.getServiceName(); + } + + public ClientCall getClientCall() { + return clientCall; + } + + public void setClientCall(ClientCall clientCall) { + this.clientCall = clientCall; + } + + public LiveRouteResult getRouteResult() { + return routeResult; + } + + /** + * Resets the state for a retry attempt. + */ + public void onRetry() { + this.routeResult = null; + // cancel the last stream + clientCall.cancel("retry", null); + // recreate client call for retry + counter++; + future = new CompletableFuture<>(); + clientCall = callSupplier.get(); + if (messageCompression != null) { + clientCall.setMessageCompression(messageCompression); + } + start(); + clientCall.request(numMessages); + } + + /** + * Starts the client call with the specified response listener and metadata headers. + */ + public void start() { + clientCall.start(new LiveCallListener<>(responseListener, future), headers); + } + + /** + * Sends a message and handles the response or any exceptions that may occur. + * + * @param message The message to be sent. + */ + public void sendMessage(ReqT message) { + this.message = message; + GrpcCluster.INSTANCE.invoke(invocation).whenComplete(new LiveCompletion<>(clientCall, responseListener)); + } + + /** + * Sends a message using the client call and returns a CompletionStage for the response. + * + * @return A CompletionStage that will be completed with the GrpcOutboundResponse when the message is processed. + */ + public CompletionStage sendMessage() { + CompletableFuture result = future; + if (counter == 0) { + sendMessage(clientCall, result); + } else { + sendMessageOnRetry(clientCall, result); + } + return result; + } + + /** + * Sends a message using the specified client call and future. + * + * @param call The client call to use for sending the message. + * @param future The CompletableFuture to complete with the GrpcOutboundResponse. + */ + private void sendMessage(ClientCall call, CompletableFuture future) { + try { + call.sendMessage(message); + } catch (Throwable e) { + future.completeExceptionally(e); + } + } + + /** + * Sends a message using the specified client call and future. + * + * @param call The client call to use for sending the message. + * @param future The CompletableFuture to complete with the GrpcOutboundResponse. + */ + private void sendMessageOnRetry(ClientCall call, CompletableFuture future) { + try { + call.sendMessage(message); + call.halfClose(); + } catch (Throwable e) { + call.cancel(e.getMessage(), e); + future.completeExceptionally(e); + } + } + + /** + * Sets a header with the specified key and value. + * + * @param key The key of the header to be set. + * @param value The value of the header to be set. + */ + public void setHeader(String key, String value) { + if (headers != null && key != null && !key.isEmpty() && value != null && !value.isEmpty()) { + headers.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value); + } + } + + /** + * Routes the request to an appropriate endpoint from the provided list of endpoints. + * + * @param endpoints The list of available endpoint. + */ + public void route(List endpoints) { + try { + GrpcEndpoint endpoint = context.route(invocation, endpoints); + routeResult = new LiveRouteResult(endpoint); + } catch (Throwable e) { + routeResult = new LiveRouteResult(e); + } + } + + /** + * Routes the request based on the provided pick result. + * + * @param pickResult The pick result containing the status and subchannel information. + */ + public void route(PickResult pickResult) { + Status status = pickResult.getStatus(); + if (status.isOk()) { + Subchannel subchannel = pickResult.getSubchannel(); + if (subchannel != null) { + LiveRef ref = subchannel.getAttributes().get(LiveRef.KEY_STATE); + routeResult = new LiveRouteResult(ref != null ? ref.getEndpoint() : new GrpcEndpoint(subchannel)); + } else { + routeResult = new LiveRouteResult(new RejectNoProviderException("There is no provider for invocation " + request.getService())); + } + } else { + routeResult = new LiveRouteResult(status.asRuntimeException()); + } + } + + /** + * Recovers from a degraded state by closing the client call. + * + * If the client call is not null, this method will half-close the client call. + */ + public void onRecover() { + // recover from degrade + clientCall.halfClose(); + } + + /** + * Parses a JSON string into a gRPC message. + *

+ * This method uses the 'newBuilder' method to create a new builder for the message type, then merges + * the provided JSON string into the builder using the {@link JsonFormat} parser. The resulting message + * is returned. + * + * @param json The JSON string to parse. + * @return The parsed gRPC message. + * @throws Throwable If any other exception occurs during the parsing process. + */ + public Object parse(String json) throws Throwable { + Method method = getNewBuilder(); + if (method == null) { + throw new NoSuchMethodException("method 'newBuilder' is not found in " + request.getPath()); + } else if (method.getReturnType() == Void.class) { + return numMessages; + } + try { + Message.Builder builder = (Message.Builder) method.invoke(null); + JsonFormat.parser().merge(new StringReader(json), builder); + return builder.build(); + } catch (InvocationTargetException e) { + throw e.getCause() != null ? e.getCause() : e; + } + } + + /** + * Retrieves the 'newBuilder' method. + * If the method is not found or an exception occurs, it returns null. + * + * @return The 'newBuilder' method if found, otherwise null. + */ + private Method getNewBuilder() { + Class type = methodDescriptor.getResponseMarshaller().getClass(); + Optional optional = METHODS.computeIfAbsent(type, k -> { + try { + String name = k.getName(); + int pos = name.lastIndexOf('$'); + name = name.substring(0, pos); + k = k.getClassLoader().loadClass(name); + Method method = k.getMethod(METHOD_NEW_BUILDER); + method.setAccessible(true); + return Optional.of(method); + } catch (Throwable e) { + return Optional.empty(); + } + }); + return optional.orElse(null); + } + + /** + * A listener for handling gRPC client call events. + * + * @param The type of the response message. + */ + private static class LiveCallListener extends ClientCall.Listener { + + private final ClientCall.Listener listener; + + private final CompletableFuture future; + + LiveCallListener(ClientCall.Listener listener, CompletableFuture future) { + this.listener = listener; + this.future = future; + } + + @Override + public void onMessage(RespT message) { + future.complete(new GrpcOutboundResponse(message)); + listener.onMessage(message); + } + + @Override + public void onHeaders(Metadata headers) { + // called before onMessage + listener.onHeaders(headers); + } + + @Override + public void onReady() { + listener.onReady(); + } + + @Override + public void onClose(Status status, Metadata trailers) { + if (!status.isOk()) { + // retry + GrpcServerException exception = new GrpcServerException(status.asRuntimeException(trailers), status, trailers); + GrpcOutboundResponse response = new GrpcOutboundResponse(new ServiceError(exception, true), null); + future.complete(response); + } else { + // Close when successful. + listener.onClose(status, trailers); + } + } + + } + + /** + * A BiConsumer for handling the completion of a gRPC outbound response. + * + * @param The type of the request message. + * @param The type of the response message. + */ + private static class LiveCompletion implements BiConsumer { + + private final ClientCall clientCall; + + private final ClientCall.Listener listener; + + LiveCompletion(ClientCall clientCall, ClientCall.Listener listener) { + this.clientCall = clientCall; + this.listener = listener; + } + + @Override + public void accept(GrpcOutboundResponse response, Throwable throwable) { + // Just handle the exceptions, the success response has already been handled in the listener. + ServiceError error = response == null ? null : response.getError(); + if (throwable != null) { + onException(throwable); + } else if (error != null) { + if (error.hasException()) { + onException(error.getThrowable()); + } else { + onException(error.getError()); + } + } + } + + /** + * Handles an exception by notifying the listener with the appropriate status and trailers. + * + * @param throwable The throwable that occurred during the gRPC call. + */ + private void onException(Throwable throwable) { + if (throwable instanceof GrpcServerException) { + GrpcServerException gse = (GrpcServerException) throwable; + listener.onClose(gse.getStatus(), gse.getTrailers()); + } else { + clientCall.cancel(throwable.getMessage(), throwable); + if (throwable instanceof StatusRuntimeException) { + StatusRuntimeException sre = (StatusRuntimeException) throwable; + listener.onClose(sre.getStatus(), sre.getTrailers()); + } else { + listener.onClose(Status.UNKNOWN.withDescription(throwable.getMessage()).withCause(throwable), null); + } + } + } + + /** + * Handles an exception by notifying the listener with a generic UNKNOWN status and the provided error description. + * + * @param error The error description. + */ + private void onException(String error) { + listener.onClose(Status.UNKNOWN.withDescription(error), null); + } + } +} diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRouteResult.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRouteResult.java new file mode 100644 index 000000000..821057c13 --- /dev/null +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRouteResult.java @@ -0,0 +1,56 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.plugin.router.gprc.loadbalance; + +import com.jd.live.agent.governance.instance.Endpoint; +import com.jd.live.agent.plugin.router.gprc.instance.GrpcEndpoint; + +/** + * Represents the result of a routing operation. + *

+ * This class encapsulates either an {@link Endpoint} or a {@link Throwable} (or both) to indicate the outcome of a routing operation. + */ +public class LiveRouteResult { + + private final GrpcEndpoint endpoint; + + private final Throwable throwable; + + public LiveRouteResult(GrpcEndpoint endpoint) { + this(endpoint, null); + } + + public LiveRouteResult(Throwable throwable) { + this(null, throwable); + } + + public LiveRouteResult(GrpcEndpoint endpoint, Throwable throwable) { + this.endpoint = endpoint; + this.throwable = throwable; + } + + public GrpcEndpoint getEndpoint() { + return endpoint; + } + + public Throwable getThrowable() { + return throwable; + } + + public boolean isSuccess() { + return throwable == null; + } +} diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveSubchannel.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveSubchannel.java deleted file mode 100644 index 1db0e60cb..000000000 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveSubchannel.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright © ${year} ${owner} (${email}) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jd.live.agent.plugin.router.gprc.loadbalance; - -import io.grpc.Attributes; -import io.grpc.ConnectivityState; -import io.grpc.LoadBalancer.Subchannel; -import io.grpc.LoadBalancer.SubchannelStateListener; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.List; - -import static io.grpc.ConnectivityState.IDLE; - -/** - * A class that wrap subchannel. - */ -public class LiveSubchannel { - - private final Subchannel subchannel; - - private InetSocketAddress address; - - public LiveSubchannel(Subchannel subchannel) { - this.subchannel = subchannel; - } - - public Subchannel getSubchannel() { - return subchannel; - } - - public InetSocketAddress getAddress() { - return address; - } - - public Attributes getAttributes() { - return subchannel.getAttributes(); - } - - public void requestConnection() { - subchannel.requestConnection(); - } - - public void shutdown() { - subchannel.shutdown(); - } - - public void start(SubchannelStateListener listener) { - subchannel.start(listener); - address = getInetSocketAddress(subchannel); - } - - /** - * Gets the current ConnectivityState. - * - * @return the current ConnectivityState, or IDLE if no state is set - */ - public ConnectivityState getConnectivityState() { - LiveRef ref = subchannel.getAttributes().get(LiveRef.KEY_STATE); - return ref == null ? IDLE : ref.getValue(); - } - - /** - * Sets the ConnectivityState to the specified newState. - * - * @param newState the new ConnectivityState to set - */ - public void setConnectivityState(ConnectivityState newState) { - LiveRef ref = subchannel.getAttributes().get(LiveRef.KEY_STATE); - if (ref != null) { - ref.setValue(newState); - } - } - - private static InetSocketAddress getInetSocketAddress(Subchannel subchannel) { - - List addresses = subchannel.getAllAddresses().get(0).getAddresses(); - for (SocketAddress addr : addresses) { - if (addr instanceof InetSocketAddress) { - return (InetSocketAddress) addr; - } - } - return null; - } -} diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveSubchannelPicker.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveSubchannelPicker.java index 530bee91e..20f4d193b 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveSubchannelPicker.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveSubchannelPicker.java @@ -15,9 +15,8 @@ */ package com.jd.live.agent.plugin.router.gprc.loadbalance; -import com.jd.live.agent.bootstrap.logger.Logger; -import com.jd.live.agent.bootstrap.logger.LoggerFactory; import com.jd.live.agent.plugin.router.gprc.exception.GrpcStatus; +import com.jd.live.agent.plugin.router.gprc.instance.GrpcEndpoint; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.SubchannelPicker; @@ -30,11 +29,9 @@ */ public class LiveSubchannelPicker extends SubchannelPicker { - private static final Logger logger = LoggerFactory.getLogger(LiveSubchannelPicker.class); - private final PickResult pickResult; - private final List subchannels; + private final List endpoints; private final AtomicLong counter = new AtomicLong(); @@ -42,42 +39,43 @@ public LiveSubchannelPicker(PickResult pickResult) { this(pickResult, null); } - public LiveSubchannelPicker(List subchannels) { - this(null, subchannels); + public LiveSubchannelPicker(List endpoints) { + this(null, endpoints); } - public LiveSubchannelPicker(PickResult pickResult, List subchannels) { + public LiveSubchannelPicker(PickResult pickResult, List endpoints) { this.pickResult = pickResult; - this.subchannels = subchannels; + this.endpoints = endpoints; } @Override public PickResult pickSubchannel(PickSubchannelArgs args) { - if (pickResult != null) { + LiveRequest request = args.getCallOptions().getOption(LiveRequest.KEY_LIVE_REQUEST); + if (pickResult != null && request != null) { + request.route(pickResult); return pickResult; - } else { - LivePickerAdvice advice = args.getCallOptions().getOption(LivePickerAdvice.KEY_PICKER_ADVICE); - LiveSubchannel subchannel = null; - if (advice != null) { - try { - subchannel = advice.elect(subchannels); - } catch (Throwable e) { - logger.error(e.getMessage(), e); - return PickResult.withError(GrpcStatus.createException(e)); - } - } - if (subchannel != null) { - return PickResult.withSubchannel(subchannel.getSubchannel()); + } else if (pickResult != null) { + return pickResult; + } else if (request != null) { + request.route(endpoints); + LiveRouteResult result = request.getRouteResult(); + if (result.isSuccess()) { + GrpcEndpoint endpoint = result.getEndpoint(); + return endpoint == null + ? PickResult.withNoResult() + : PickResult.withSubchannel(endpoint.getSubchannel()); } else { - long v = counter.getAndIncrement(); - if (v < 0) { - counter.set(0); - v = counter.getAndIncrement(); - } - int index = (int) (v % subchannels.size()); - subchannel = subchannels.get(index); - return PickResult.withSubchannel(subchannel.getSubchannel()); + return PickResult.withError(GrpcStatus.createException(result.getThrowable())); } + } else { + long v = counter.getAndIncrement(); + if (v < 0) { + counter.set(0); + v = counter.getAndIncrement(); + } + int index = (int) (v % endpoints.size()); + return PickResult.withSubchannel(endpoints.get(index).getSubchannel()); } + } } diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/request/GrpcRequest.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/request/GrpcRequest.java index 9e391353b..6e3776e72 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/request/GrpcRequest.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/request/GrpcRequest.java @@ -15,8 +15,15 @@ */ package com.jd.live.agent.plugin.router.gprc.request; +import com.jd.live.agent.bootstrap.exception.RejectException.RejectNoProviderException; +import com.jd.live.agent.governance.instance.Endpoint; import com.jd.live.agent.governance.request.AbstractRpcRequest.AbstractRpcInboundRequest; import com.jd.live.agent.governance.request.AbstractRpcRequest.AbstractRpcOutboundRequest; +import com.jd.live.agent.governance.request.RoutedRequest; +import com.jd.live.agent.plugin.router.gprc.exception.GrpcException.GrpcClientException; +import com.jd.live.agent.plugin.router.gprc.loadbalance.LiveDiscovery; +import com.jd.live.agent.plugin.router.gprc.loadbalance.LiveRequest; +import com.jd.live.agent.plugin.router.gprc.loadbalance.LiveRouteResult; import io.grpc.Grpc; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -60,27 +67,38 @@ public boolean isNativeGroup() { /** * A nested class representing an outbound gRPC request. */ - class GrpcOutboundRequest extends AbstractRpcOutboundRequest implements GrpcRequest { + class GrpcOutboundRequest extends AbstractRpcOutboundRequest> implements GrpcRequest, RoutedRequest { - private final Metadata metadata; - - private final MethodDescriptor methodDescriptor; - - public GrpcOutboundRequest(Object message, Metadata metadata, MethodDescriptor methodDescriptor, String serviceName) { - super(message); - this.metadata = metadata; - this.methodDescriptor = methodDescriptor; - this.service = serviceName; - this.path = methodDescriptor.getServiceName(); - this.method = methodDescriptor.getBareMethodName(); + public GrpcOutboundRequest(LiveRequest request) { + super(request); + this.service = LiveDiscovery.getService(request.getPath()); + this.path = request.getPath(); + this.method = request.getMethodName(); } @Override public void setHeader(String key, String value) { - if (key != null && !key.isEmpty() && value != null && !value.isEmpty()) { - metadata.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value); + request.setHeader(key, value); + } + + @SuppressWarnings("unchecked") + @Override + public E getEndpoint() { + LiveRouteResult result = request.getRouteResult(); + if (result == null) { + throw new RejectNoProviderException("There is no provider for invocation " + service); + } else if (result.isSuccess()) { + return (E) result.getEndpoint(); + } else if (result.getThrowable() instanceof RuntimeException) { + throw (RuntimeException) result.getThrowable(); + } else { + throw new GrpcClientException(result.getThrowable()); } } + public boolean hasEndpoint() { + LiveRouteResult result = request.getRouteResult(); + return result != null && result.isSuccess(); + } } } diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/response/GrpcResponse.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/response/GrpcResponse.java index 45bf8ae63..8977ed2d9 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/response/GrpcResponse.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/response/GrpcResponse.java @@ -26,14 +26,14 @@ */ public interface GrpcResponse { - class GrpcOutboundResponse extends AbstractRpcOutboundResponse implements GrpcResponse { + class GrpcOutboundResponse extends AbstractRpcOutboundResponse implements GrpcResponse { - public GrpcOutboundResponse(T response, ServiceError error) { - super(response, error); + public GrpcOutboundResponse(Object response) { + super(response, null); } - public GrpcOutboundResponse(T response, ServiceError error, ErrorPredicate retryPredicate) { - super(response, error, retryPredicate); + public GrpcOutboundResponse(ServiceError error, ErrorPredicate retryPredicate) { + super(null, error, retryPredicate); } } } diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/resources/META-INF/services/com.jd.live.agent.core.plugin.definition.PluginDefinition b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/resources/META-INF/services/com.jd.live.agent.core.plugin.definition.PluginDefinition index ed0b54e70..f454aa3ea 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/resources/META-INF/services/com.jd.live.agent.core.plugin.definition.PluginDefinition +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/resources/META-INF/services/com.jd.live.agent.core.plugin.definition.PluginDefinition @@ -1,5 +1,5 @@ com.jd.live.agent.plugin.router.gprc.definition.GrpcServerDefinition com.jd.live.agent.plugin.router.gprc.definition.LoadbalancerDefinition com.jd.live.agent.plugin.router.gprc.definition.NameResolverDefinition -com.jd.live.agent.plugin.router.gprc.definition.ClientInterceptorsDefinition +com.jd.live.agent.plugin.router.gprc.definition.ChannelFactoryDefinition com.jd.live.agent.plugin.router.gprc.definition.GrpcClientBeanDefinition \ No newline at end of file diff --git a/joylive-plugin/joylive-router/joylive-router-sofarpc/src/main/java/com/jd/live/agent/plugin/router/sofarpc/cluster/SofaRpcCluster.java b/joylive-plugin/joylive-router/joylive-router-sofarpc/src/main/java/com/jd/live/agent/plugin/router/sofarpc/cluster/SofaRpcCluster.java index bd9d59dc0..e8f2868cd 100644 --- a/joylive-plugin/joylive-router/joylive-router-sofarpc/src/main/java/com/jd/live/agent/plugin/router/sofarpc/cluster/SofaRpcCluster.java +++ b/joylive-plugin/joylive-router/joylive-router-sofarpc/src/main/java/com/jd/live/agent/plugin/router/sofarpc/cluster/SofaRpcCluster.java @@ -31,6 +31,8 @@ import com.jd.live.agent.core.util.type.ClassDesc; import com.jd.live.agent.core.util.type.ClassUtils; import com.jd.live.agent.core.util.type.FieldDesc; +import com.jd.live.agent.governance.exception.ErrorPredicate; +import com.jd.live.agent.governance.exception.ServiceError; import com.jd.live.agent.governance.invoke.OutboundInvocation; import com.jd.live.agent.governance.invoke.cluster.AbstractLiveCluster; import com.jd.live.agent.governance.invoke.cluster.ClusterInvoker; @@ -38,8 +40,6 @@ import com.jd.live.agent.governance.policy.service.circuitbreak.DegradeConfig; import com.jd.live.agent.governance.policy.service.cluster.ClusterPolicy; import com.jd.live.agent.governance.policy.service.cluster.RetryPolicy; -import com.jd.live.agent.governance.exception.ErrorPredicate; -import com.jd.live.agent.governance.exception.ServiceError; import com.jd.live.agent.plugin.router.sofarpc.exception.SofaRpcOutboundThrower; import com.jd.live.agent.plugin.router.sofarpc.instance.SofaRpcEndpoint; import com.jd.live.agent.plugin.router.sofarpc.request.SofaRpcRequest.GenericType; @@ -193,7 +193,7 @@ public Throwable createException(Throwable throwable, OutboundInvocation