Skip to content

Commit

Permalink
Merge pull request #204 from yuanjinzhong/feature/issue_163
Browse files Browse the repository at this point in the history
issue_163
  • Loading branch information
hexiaofeng authored Jan 10, 2025
2 parents d70f787 + f91b5af commit e39c9d9
Show file tree
Hide file tree
Showing 8 changed files with 494 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ feign.httpclient.enabled=false
feign.okhttp.enabled=true
#spring.main.web-application-type=none

logging.level.root=${LIVE_LOG_LEVEL:INFO}
logging.level.root=${LIVE_LOG_LEVEL:INFO}

# false?ReactorLoadBalancerExchangeFilterFunction: LoadBalancerExchangeFilterFunction
spring.cloud.loadbalancer.ribbon.enabled=false
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package com.jd.live.agent.plugin.router.springcloud.v2.cluster;

import com.jd.live.agent.core.util.Futures;
import com.jd.live.agent.core.util.type.ClassUtils;
import com.jd.live.agent.core.util.type.FieldDesc;
import com.jd.live.agent.governance.exception.ErrorPredicate;
import com.jd.live.agent.governance.exception.ServiceError;
import com.jd.live.agent.governance.policy.service.circuitbreak.DegradeConfig;
import com.jd.live.agent.plugin.router.springcloud.v2.instance.SpringEndpoint;
import com.jd.live.agent.plugin.router.springcloud.v2.request.ReactiveClusterRequest;
import com.jd.live.agent.plugin.router.springcloud.v2.response.ReactiveClusterResponse;
import com.jd.live.agent.plugin.router.springcloud.v2.util.LoadBalancerUtil;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerClientRequestTransformer;
import org.springframework.cloud.client.loadbalancer.reactive.ReactiveLoadBalancer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpRequest;
import org.springframework.lang.NonNull;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeStrategies;

import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionStage;

import static com.jd.live.agent.core.util.type.ClassUtils.getValue;

/**
* @author: yuanjinzhong
* @date: 2025/1/3 17:43
* @description: a cluster for reactor mode
* @see org.springframework.cloud.client.loadbalancer.reactive.ReactorLoadBalancerExchangeFilterFunction
*/
public class ReactiveCluster extends AbstractClientCluster<ReactiveClusterRequest, ReactiveClusterResponse> {


private static final Set<String> RETRY_EXCEPTIONS = new HashSet<>(Arrays.asList(
"java.io.IOException",
"java.util.concurrent.TimeoutException",
"org.springframework.cloud.client.loadbalancer.reactive.RetryableStatusCodeException"
));

private static final ErrorPredicate RETRY_PREDICATE = new ErrorPredicate.DefaultErrorPredicate(null, RETRY_EXCEPTIONS);


private static final String FIELD_LOAD_BALANCER = "loadBalancerClient";

private static final String FIELD_LOAD_BALANCER_FACTORY = "loadBalancerFactory";

private static final String FIELD_TRANSFORMERS = "transformers";

private final ExchangeFilterFunction filterFunction;

private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory;

private final List<LoadBalancerClientRequestTransformer> transformers;



public ReactiveCluster(ExchangeFilterFunction exchangeFilterFunction) {
LoadBalancerClient client = getValue(exchangeFilterFunction, FIELD_LOAD_BALANCER);
this.filterFunction = exchangeFilterFunction;
/**
* If client is not null, it indicates that the currently intercepted class is LoadBalancerExchangeFilterFunction;
* otherwise, the intercepted class is ReactorLoadBalancerExchangeFilterFunction.
*/
this.loadBalancerFactory = client != null ? LoadBalancerUtil.getFactory(client) : getValue(filterFunction, FIELD_LOAD_BALANCER_FACTORY);
this.transformers = getValue(filterFunction, FIELD_TRANSFORMERS);
}



public ReactiveLoadBalancer.Factory<ServiceInstance> getLoadBalancerFactory() {
return loadBalancerFactory;
}
@Override
public ErrorPredicate getRetryPredicate() {
return RETRY_PREDICATE;
}

@Override
public CompletionStage<ReactiveClusterResponse> invoke(ReactiveClusterRequest request, SpringEndpoint endpoint) {

try {
ClientRequest newRequest = buildRequest(request, endpoint.getInstance());
return request.getNext().exchange(newRequest).map(ReactiveClusterResponse::new).toFuture();
} catch (Throwable e) {
return Futures.future(e);
}

}

/**
* Builds a new {@link ClientRequest} tailored for a specific {@link ServiceInstance}, incorporating sticky session
* configurations and potential transformations.
*
* @param request The original {@link ReactiveClusterRequest} containing the request to be sent and its associated
* load balancer properties.
* @param serviceInstance The {@link ServiceInstance} to which the request should be directed.
* @return A new {@link ClientRequest} instance, modified to target the specified {@link ServiceInstance} and
* potentially transformed by any configured {@link LoadBalancerClientRequestTransformer}s.
*/
private ClientRequest buildRequest(ReactiveClusterRequest request, ServiceInstance serviceInstance) {

ClientRequest clientRequest = request.getRequest();
URI originalUrl = clientRequest.url();
ClientRequest result = ClientRequest
.create(clientRequest.method(), LoadBalancerUriTools.reconstructURI(serviceInstance, originalUrl))
.headers(headers -> headers.addAll(clientRequest.headers()))
.cookies(cookies -> {
cookies.addAll(clientRequest.cookies());
})
.attributes(attributes -> attributes.putAll(clientRequest.attributes()))
.body(clientRequest.body())
.build();
if (transformers != null) {
for (LoadBalancerClientRequestTransformer transformer : transformers) {
result = transformer.transformRequest(result, serviceInstance);
}
}
return result;
}

@Override
protected ReactiveClusterResponse createResponse(ReactiveClusterRequest request, DegradeConfig degradeConfig) {
ExchangeStrategies strategies;
try {
FieldDesc field = ClassUtils.describe(request.getNext().getClass()).getFieldList().getField("strategies");
strategies = field == null ? ExchangeStrategies.withDefaults() : (ExchangeStrategies) field.get(request.getNext());
} catch (Throwable ignored) {
strategies = ExchangeStrategies.withDefaults();
}
return new ReactiveClusterResponse(ClientResponse.create(degradeConfig.getResponseCode(), strategies)
.body(degradeConfig.getResponseBody() == null ? "" : degradeConfig.getResponseBody())
.request(new DegradeHttpRequest(request))
.headers(headers -> {
headers.addAll(request.getRequest().headers());
degradeConfig.foreach(headers::add);
headers.set(HttpHeaders.CONTENT_TYPE, degradeConfig.contentType());
headers.set(HttpHeaders.CONTENT_LENGTH, String.valueOf(degradeConfig.bodyLength()));
}).build());
}

@Override
protected ReactiveClusterResponse createResponse(ServiceError error, ErrorPredicate predicate) {
return new ReactiveClusterResponse(error, predicate);
}

/**
* A class that implements the HttpRequest interface and wrap a ReactiveClusterRequest.
*/
private static class DegradeHttpRequest implements HttpRequest {

private final ReactiveClusterRequest request;

DegradeHttpRequest(ReactiveClusterRequest request) {
this.request = request;
}

@Override
@NonNull
public HttpMethod getMethod() {
return request.getRequest().method();
}

@Override
public String getMethodValue() {
return request.getRequest().method().name();
}

@Override
@NonNull
public URI getURI() {
return request.getRequest().url();
}

@Override
@NonNull
public HttpHeaders getHeaders() {
return request.getRequest().headers();
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.jd.live.agent.plugin.router.springcloud.v2.definition;

import com.jd.live.agent.core.bytekit.matcher.MatcherBuilder;
import com.jd.live.agent.core.extension.annotation.ConditionalOnClass;
import com.jd.live.agent.core.extension.annotation.Extension;
import com.jd.live.agent.core.inject.annotation.Inject;
import com.jd.live.agent.core.inject.annotation.Injectable;
import com.jd.live.agent.core.plugin.definition.InterceptorDefinition;
import com.jd.live.agent.core.plugin.definition.InterceptorDefinitionAdapter;
import com.jd.live.agent.core.plugin.definition.PluginDefinitionAdapter;
import com.jd.live.agent.governance.invoke.InvocationContext;
import com.jd.live.agent.plugin.router.springcloud.v2.condition.ConditionalOnSpringCloud2GovernanceEnabled;
import com.jd.live.agent.plugin.router.springcloud.v2.interceptor.ReactiveClusterInterceptor;

/**
* @author: yuanjinzhong
* @date: 2025/1/2 19:55
* @description: When <code>spring.cloud.loadbalancer.ribbon.enabled=false</code> is configured in the application, ReactorLoadBalancerExchangeFilterFunction is automatically injected;
* otherwise, LoadBalancerExchangeFilterFunction is injected. Note that they have an either-or relationship.
* @see org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerExchangeFilterFunction
*/
@Injectable
@Extension(value = "LoadBalancerExchangeFilterFunction_v2")
@ConditionalOnSpringCloud2GovernanceEnabled
@ConditionalOnClass(LoadBalancerExchangeFilterFunctionDefinition.TYPE_LOADBALANCER_EXCHANGE_FILTER)
public class LoadBalancerExchangeFilterFunctionDefinition extends PluginDefinitionAdapter {

protected static final String TYPE_LOADBALANCER_EXCHANGE_FILTER = "org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerExchangeFilterFunction";


private static final String METHOD_INTERCEPT = "filter";

private static final String[] ARGUMENT_INTERCEPT = new String[]{
"org.springframework.web.reactive.function.client.ClientRequest",
"org.springframework.web.reactive.function.client.ExchangeFunction"
};

@Inject(InvocationContext.COMPONENT_INVOCATION_CONTEXT)
private InvocationContext context;

public LoadBalancerExchangeFilterFunctionDefinition() {

this.matcher = () -> MatcherBuilder.named(TYPE_LOADBALANCER_EXCHANGE_FILTER);
this.interceptors = new InterceptorDefinition[]{
new InterceptorDefinitionAdapter(
MatcherBuilder.named(METHOD_INTERCEPT).
and(MatcherBuilder.arguments(ARGUMENT_INTERCEPT)),
() -> new ReactiveClusterInterceptor(context)
)
};


}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.jd.live.agent.plugin.router.springcloud.v2.definition;

import com.jd.live.agent.core.bytekit.matcher.MatcherBuilder;
import com.jd.live.agent.core.extension.annotation.ConditionalOnClass;
import com.jd.live.agent.core.extension.annotation.Extension;
import com.jd.live.agent.core.inject.annotation.Inject;
import com.jd.live.agent.core.inject.annotation.Injectable;
import com.jd.live.agent.core.plugin.definition.InterceptorDefinition;
import com.jd.live.agent.core.plugin.definition.InterceptorDefinitionAdapter;
import com.jd.live.agent.core.plugin.definition.PluginDefinitionAdapter;
import com.jd.live.agent.governance.invoke.InvocationContext;
import com.jd.live.agent.plugin.router.springcloud.v2.condition.ConditionalOnSpringCloud2GovernanceEnabled;
import com.jd.live.agent.plugin.router.springcloud.v2.interceptor.ReactiveClusterInterceptor;

/**
* @author: yuanjinzhong
* @date: 2025/1/2 19:52
* @description: When <code>spring.cloud.loadbalancer.ribbon.enabled=false </code> is configured in the application, ReactorLoadBalancerExchangeFilterFunction is automatically injected;
* otherwise, LoadBalancerExchangeFilterFunction is injected. Note that they have an either-or relationship.
* @see org.springframework.cloud.client.loadbalancer.reactive.ReactorLoadBalancerExchangeFilterFunction
*/
@Injectable
@Extension(value = "ReactorExchangeFilterFunctionDefinition_v2")
@ConditionalOnSpringCloud2GovernanceEnabled
@ConditionalOnClass(ReactorLoadBalancerExchangeFilterFunctionDefinition.TYPE_REACTOR_LOADBALANCER_EXCHANGE_FILTER)
public class ReactorLoadBalancerExchangeFilterFunctionDefinition extends PluginDefinitionAdapter {

protected static final String TYPE_REACTOR_LOADBALANCER_EXCHANGE_FILTER = "org.springframework.cloud.client.loadbalancer.reactive.ReactorLoadBalancerExchangeFilterFunction";

private static final String METHOD_INTERCEPT = "filter";

private static final String[] ARGUMENT_INTERCEPT = new String[]{
"org.springframework.web.reactive.function.client.ClientRequest",
"org.springframework.web.reactive.function.client.ExchangeFunction"
};

@Inject(InvocationContext.COMPONENT_INVOCATION_CONTEXT)
private InvocationContext context;

public ReactorLoadBalancerExchangeFilterFunctionDefinition() {

this.matcher = () -> MatcherBuilder.named(TYPE_REACTOR_LOADBALANCER_EXCHANGE_FILTER);
this.interceptors = new InterceptorDefinition[]{
new InterceptorDefinitionAdapter(
MatcherBuilder.named(METHOD_INTERCEPT).
and(MatcherBuilder.arguments(ARGUMENT_INTERCEPT)),
() -> new ReactiveClusterInterceptor(context)
)
};

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.jd.live.agent.plugin.router.springcloud.v2.interceptor;

import com.jd.live.agent.bootstrap.bytekit.context.ExecutableContext;
import com.jd.live.agent.bootstrap.bytekit.context.MethodContext;
import com.jd.live.agent.core.plugin.definition.InterceptorAdaptor;
import com.jd.live.agent.governance.context.RequestContext;
import com.jd.live.agent.governance.context.bag.Carrier;
import com.jd.live.agent.governance.invoke.InvocationContext;
import com.jd.live.agent.governance.invoke.OutboundInvocation.HttpOutboundInvocation;
import com.jd.live.agent.plugin.router.springcloud.v2.cluster.ReactiveCluster;
import com.jd.live.agent.plugin.router.springcloud.v2.request.ReactiveClusterRequest;
import com.jd.live.agent.plugin.router.springcloud.v2.response.ReactiveClusterResponse;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import reactor.core.publisher.Mono;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author: yuanjinzhong
* @date: 2025/1/2 20:02
* @description:
*/
public class ReactiveClusterInterceptor extends InterceptorAdaptor {

private final InvocationContext context;

private final Map<ExchangeFilterFunction, ReactiveCluster> clusters = new ConcurrentHashMap<>();

public ReactiveClusterInterceptor(InvocationContext context) {
this.context = context;
}

@Override
public void onEnter(ExecutableContext ctx) {
MethodContext mc = (MethodContext) ctx;
Object[] arguments = ctx.getArguments();
ClientRequest request = (ClientRequest) arguments[0];
if (context.isFlowControlEnabled()) {
ReactiveCluster cluster = clusters.computeIfAbsent((ExchangeFilterFunction) ctx.getTarget(), ReactiveCluster::new);

ReactiveClusterRequest clusterRequest = new ReactiveClusterRequest(request, cluster.getLoadBalancerFactory(), (ExchangeFunction) arguments[1]);
HttpOutboundInvocation<ReactiveClusterRequest> invocation = new HttpOutboundInvocation<>(clusterRequest, context);

CompletionStage<ReactiveClusterResponse> response = cluster.invoke(invocation);

CompletableFuture<ClientResponse> future = response.toCompletableFuture().thenApply(ReactiveClusterResponse::getResponse);
Mono<ClientResponse> mono = Mono.fromFuture(future);
// mono will be consumed later by the processing pipeline in mc
mc.setResult(mono);
mc.setSkip(true);
} else {
// only for live & lane
String serviceName = request.url().getHost();
RequestContext.setAttribute(Carrier.ATTRIBUTE_SERVICE_ID, serviceName);
RequestContext.setAttribute(Carrier.ATTRIBUTE_REQUEST, request);
}



}
}
Loading

0 comments on commit e39c9d9

Please sign in to comment.