Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

162-support-spring-cloud-grpc-outbound-grovernace #183

Merged
merged 10 commits into from
Dec 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.core.exception;

public interface WrappedException {
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
*/
package com.jd.live.agent.core.util;

import com.jd.live.agent.core.exception.WrappedException;

import java.lang.reflect.InvocationTargetException;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;

/**
Expand Down Expand Up @@ -81,4 +85,14 @@ public static Set<String> getExceptions(Throwable e, Predicate<Throwable> predic
return names;
}

/**
* Checks if the given throwable is not a wrapped exception.
*
* @param e The throwable to check.
* @return {@code true} if the throwable is not a wrapped exception, {@code false} otherwise.
*/
public static boolean isNoneWrapped(Throwable e) {
return !(e instanceof WrappedException || e instanceof InvocationTargetException || e instanceof ExecutionException);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.function.Function;
import java.util.function.Predicate;

import static com.jd.live.agent.core.util.ExceptionUtils.isNoneWrapped;
import static com.jd.live.agent.governance.exception.ErrorPolicy.containsException;

@AllArgsConstructor
Expand Down Expand Up @@ -109,7 +110,7 @@ public static ErrorCause cause(Throwable throwable,
Set<String> exceptions = new LinkedHashSet<>(8);
Throwable cause = null;
Throwable candiate = null;
Throwable t = throwable;
Throwable t = isNoneWrapped(throwable) ? throwable : throwable.getCause();
ErrorName errorName;
Predicate<Throwable> test = predicate == null ? null : predicate.getPredicate();
while (t != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public GatewayRole getGateway() {
* Resets the state of the instance.
* This method is typically used to restore the instance to its initial state.
*/
public void reset() {
public void resetOnRetry() {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ protected LiveParser createLiveParser() {
}

@Override
public void reset() {
public void resetOnRetry() {
listeners = null;
routeTarget = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.jd.live.agent.governance.invoke.InvocationContext;
import com.jd.live.agent.governance.invoke.OutboundInvocation;
import com.jd.live.agent.governance.policy.service.cluster.ClusterPolicy;
import com.jd.live.agent.governance.request.RoutedRequest;
import com.jd.live.agent.governance.request.ServiceRequest.OutboundRequest;
import com.jd.live.agent.governance.response.ServiceResponse.OutboundResponse;

Expand Down Expand Up @@ -72,17 +73,14 @@ E extends Endpoint> CompletionStage<O> invoke(LiveCluster<R, O, E> cluster, Outb
InvocationContext context = invocation.getContext();
R request = invocation.getRequest();
List<? extends Endpoint> instances = invocation.getInstances();
if (counter > 0) {
invocation.reset();
}
CompletionStage<List<E>> discoveryStage = instances == null || instances.isEmpty() || counter > 0
? cluster.route(request)
: CompletableFuture.completedFuture((List<E>) instances);
discoveryStage.whenComplete((v, t) -> {
if (t == null) {
E endpoint = null;
try {
endpoint = context.route(invocation, v);
endpoint = request instanceof RoutedRequest ? ((RoutedRequest) request).getEndpoint() : context.route(invocation, v);
E instance = endpoint;
onStartRequest(cluster, request, endpoint);
CompletionStage<O> stage = context.outbound(invocation, endpoint, () -> cluster.invoke(request, instance));
Expand Down Expand Up @@ -193,8 +191,8 @@ E extends Endpoint> void onException(LiveCluster<R, O, E> cluster,
try {
invocation.onFailure(endpoint, cause);
if (error == null) {
// Request was handled successfully by degrade
cluster.onSuccess(response, request, endpoint);
// Request was recover successfully by degrade
cluster.onRecover(response, request, endpoint);
} else if (cause instanceof LiveException) {
// Request did not go off box
cluster.onDiscard(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import com.jd.live.agent.bootstrap.exception.RejectException.RejectUnreadyException;
import com.jd.live.agent.bootstrap.exception.Unretryable;
import com.jd.live.agent.bootstrap.logger.Logger;
import com.jd.live.agent.bootstrap.logger.LoggerFactory;
import com.jd.live.agent.core.extension.annotation.Extension;
import com.jd.live.agent.core.inject.annotation.Inject;
import com.jd.live.agent.core.inject.annotation.Injectable;
Expand All @@ -29,7 +27,6 @@
import com.jd.live.agent.governance.exception.ServiceError;
import com.jd.live.agent.governance.instance.Endpoint;
import com.jd.live.agent.governance.invoke.OutboundInvocation;
import com.jd.live.agent.governance.invoke.filter.route.CircuitBreakerFilter;
import com.jd.live.agent.governance.policy.service.ServicePolicy;
import com.jd.live.agent.governance.policy.service.cluster.ClusterPolicy;
import com.jd.live.agent.governance.policy.service.cluster.RetryPolicy;
Expand Down Expand Up @@ -58,8 +55,6 @@
@Extension(value = ClusterInvoker.TYPE_FAILOVER, order = ClusterInvoker.ORDER_FAILOVER)
public class FailoverClusterInvoker extends AbstractClusterInvoker {

private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerFilter.class);

@Inject
private Map<String, CodeParser> codeParsers;

Expand All @@ -78,9 +73,9 @@ E extends Endpoint> CompletionStage<O> execute(LiveCluster<R, O, E> cluster,
request.getAttributeIfAbsent(Request.KEY_ERROR_POLICY, k -> new HashSet<ErrorPolicy>()).add(retryPolicy);
}
RetryContext<R, O, E> retryContext = new RetryContext<>(codeParsers, retryPolicy, cluster);
Supplier<CompletionStage<O>> supplier = () -> invoke(cluster, invocation, retryContext.getCount());
Supplier<CompletionStage<O>> supplier = () -> invoke(cluster, invocation, retryContext.getAndIncrement());
cluster.onStart(request);
return retryContext.execute(request, supplier).exceptionally(e ->
return retryContext.execute(invocation, supplier).exceptionally(e ->
cluster.createResponse(
cluster.createException(e, invocation),
request,
Expand Down Expand Up @@ -149,45 +144,44 @@ private static class RetryContext<R extends OutboundRequest,
* retried until the policy's conditions are no longer met.
* </p>
*
* @param request The request that was being processed.
* @param supplier A supplier providing the operation to be executed as a {@link CompletionStage}.
* @param invocation The {@link OutboundInvocation} representing the specific request and its routing information.
* @param supplier A supplier providing the operation to be executed as a {@link CompletionStage}.
* @return A {@link CompletionStage} representing the eventual completion of the operation,
* either successfully or with an error.
*/
public CompletionStage<O> execute(R request, Supplier<CompletionStage<O>> supplier) {
public CompletionStage<O> execute(OutboundInvocation<R> invocation, Supplier<CompletionStage<O>> supplier) {
CompletableFuture<O> result = new CompletableFuture<>();
doExecute(request, supplier, result);
doExecute(invocation, supplier, result);
return result;

}

/**
* Recursively executes the operation, applying retry logic and completing the future
* based on the outcome of each attempt.
* <p>
* This method is called internally to handle the actual execution and retry logic.
* It uses the {@link RetryPolicy} to determine whether an operation should be retried
* in case of failure or certain response conditions.
* </p>
*
* @param request The request that was being processed.
* @param supplier A supplier providing the operation to be executed.
* @param future The {@link CompletableFuture} to be completed with the operation's result.
* @param invocation The {@link OutboundInvocation} representing the specific request and its routing information.
* @param supplier A supplier providing the operation to be executed.
* @param future The {@link CompletableFuture} to be completed with the operation's result.
*/
private void doExecute(R request, Supplier<CompletionStage<O>> supplier, CompletableFuture<O> future) {
int count = counter.getAndIncrement();
cluster.onRetry(count);
private void doExecute(OutboundInvocation<R> invocation, Supplier<CompletionStage<O>> supplier, CompletableFuture<O> future) {
int count = counter.get();
R request = invocation.getRequest();
if (count > 0) {
invocation.resetOnRetry();
}
cluster.onRetry(request, count);
CompletionStage<O> stage = supplier.get();
stage.whenComplete((v, e) -> {
ServiceError se = v.getError();
ServiceError se = v == null ? null : v.getError();
Throwable throwable = se == null ? e : se.getThrowable();
switch (isRetryable(request, v, e, count)) {
case RETRY:
Throwable unreadyException = cluster.isDestroyed() ? cluster.createException(new RejectUnreadyException(), request) : null;
if (unreadyException != null) {
future.completeExceptionally(unreadyException);
} else {
doExecute(request, supplier, future);
doExecute(invocation, supplier, future);
}
break;
case EXHAUSTED:
Expand All @@ -206,8 +200,8 @@ private void doExecute(R request, Supplier<CompletionStage<O>> supplier, Complet
});
}

private int getCount() {
return counter.get();
private int getAndIncrement() {
return counter.getAndIncrement();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,11 @@ default void onDiscard(R request) {
/**
* Handles retry logic for a process or operation, providing a hook for custom actions on each retry attempt.
*
* @param request The request object that is about to be sent
* @param retries The number of retry attempts that have occurred so far. This count is 1-based, meaning that
* the first retry attempt will pass a value of 1.
*/
default void onRetry(int retries) {
default void onRetry(R request, int retries) {

}

Expand Down Expand Up @@ -236,5 +237,16 @@ default void onSuccess(O response, R request, E endpoint) {

}

/**
* Handles the recovery of a request by delegating to the onSuccess method.
*
* @param response The response object.
* @param request The request object that was recovered.
* @param endpoint The endpoint associated with the request.
*/
default void onRecover(O response, R request, E endpoint) {
onSuccess(response, request, endpoint);
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.governance.request;

import com.jd.live.agent.governance.instance.Endpoint;

/**
* An interface representing a request that has already been routed to an appropriate endpoint.
* Extends the {@link Request} interface.
*/
public interface RoutedRequest extends Request {

/**
* Retrieves the endpoint to which the request has been routed.
*
* @param <E> The type of the endpoint.
* @return The selected endpoint for routing the request.
*/
<E extends Endpoint> E getEndpoint();
}

Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
*/
package com.jd.live.agent.governance.util;

import com.jd.live.agent.core.util.ExceptionUtils;

import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

Expand All @@ -35,8 +35,6 @@ public class ResponseUtils {

public static final int HEADER_SIZE_LIMIT = 1024 * 2;

public static final Predicate<Throwable> NONE_EXECUTION_PREDICATE = e -> !(e instanceof ExecutionException) && !(e instanceof InvocationTargetException);

/**
* Converts a Set of Strings to a String, using the specified delimiter and truncating if the resulting String exceeds the maximum length.
*
Expand Down Expand Up @@ -103,7 +101,7 @@ public static void labelHeaders(Throwable e, BiConsumer<String, String> consumer
* @param consumer a BiConsumer to accept the generated headers
*/
public static void labelHeaders(Throwable e, Predicate<Throwable> predicate, BiConsumer<String, String> consumer) {
describe(e, predicate == null ? NONE_EXECUTION_PREDICATE : predicate, (name, message) -> {
describe(e, predicate == null ? ExceptionUtils::isNoneWrapped : predicate, (name, message) -> {
if (name != null && !name.isEmpty()) {
consumer.accept(EXCEPTION_NAMES_LABEL, name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;

import java.util.concurrent.ThreadLocalRandom;

@GrpcService
public class UserServiceGrpcImpl extends UserServiceGrpc.UserServiceImplBase {

@Override
public void get(UserGetRequest request, StreamObserver<UserGetResponse> responseObserver) {
if (ThreadLocalRandom.current().nextBoolean()) {
responseObserver.onError(new RuntimeException("error"));
return;
}
UserGetResponse.Builder builder = UserGetResponse.newBuilder();
builder.setId(request.getId())
.setName("index :" + request.getId() + " time : " + System.currentTimeMillis())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ spring:
password: ${NACOS_PASSWORD:nacos}
grpc:
server:
port: 9898
port: ${GRPC_SERVER_PORT:${random.int[20000,21000]}}
server:
port: ${SERVER_PORT:9899}
port: ${SERVER_PORT:${random.int[20000,21000]}}
Loading
Loading