Skip to content

Commit

Permalink
Review
Browse files Browse the repository at this point in the history
  • Loading branch information
hexiaofeng committed Jan 10, 2025
1 parent e39c9d9 commit c8fa646
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.plugin.router.springcloud.v2.cluster;

import com.jd.live.agent.core.util.Futures;
Expand Down Expand Up @@ -50,7 +65,6 @@ public class ReactiveCluster extends AbstractClientCluster<ReactiveClusterReques

private static final ErrorPredicate RETRY_PREDICATE = new ErrorPredicate.DefaultErrorPredicate(null, RETRY_EXCEPTIONS);


private static final String FIELD_LOAD_BALANCER = "loadBalancerClient";

private static final String FIELD_LOAD_BALANCER_FACTORY = "loadBalancerFactory";
Expand All @@ -63,39 +77,34 @@ public class ReactiveCluster extends AbstractClientCluster<ReactiveClusterReques

private final List<LoadBalancerClientRequestTransformer> transformers;



public ReactiveCluster(ExchangeFilterFunction exchangeFilterFunction) {
LoadBalancerClient client = getValue(exchangeFilterFunction, FIELD_LOAD_BALANCER);
this.filterFunction = exchangeFilterFunction;
/**
* If client is not null, it indicates that the currently intercepted class is LoadBalancerExchangeFilterFunction;
* otherwise, the intercepted class is ReactorLoadBalancerExchangeFilterFunction.
/*
If client is not null, it indicates that the currently intercepted class is LoadBalancerExchangeFilterFunction;
otherwise, the intercepted class is ReactorLoadBalancerExchangeFilterFunction.
*/
this.loadBalancerFactory = client != null ? LoadBalancerUtil.getFactory(client) : getValue(filterFunction, FIELD_LOAD_BALANCER_FACTORY);
this.transformers = getValue(filterFunction, FIELD_TRANSFORMERS);
}



public ReactiveLoadBalancer.Factory<ServiceInstance> getLoadBalancerFactory() {
return loadBalancerFactory;
}

@Override
public ErrorPredicate getRetryPredicate() {
return RETRY_PREDICATE;
}

@Override
public CompletionStage<ReactiveClusterResponse> invoke(ReactiveClusterRequest request, SpringEndpoint endpoint) {

try {
ClientRequest newRequest = buildRequest(request, endpoint.getInstance());
return request.getNext().exchange(newRequest).map(ReactiveClusterResponse::new).toFuture();
} catch (Throwable e) {
return Futures.future(e);
}

}

/**
Expand All @@ -109,7 +118,6 @@ public CompletionStage<ReactiveClusterResponse> invoke(ReactiveClusterRequest re
* potentially transformed by any configured {@link LoadBalancerClientRequestTransformer}s.
*/
private ClientRequest buildRequest(ReactiveClusterRequest request, ServiceInstance serviceInstance) {

ClientRequest clientRequest = request.getRequest();
URI originalUrl = clientRequest.url();
ClientRequest result = ClientRequest
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.plugin.router.springcloud.v2.definition;

import com.jd.live.agent.core.bytekit.matcher.MatcherBuilder;
Expand Down Expand Up @@ -27,7 +42,6 @@ public class LoadBalancerExchangeFilterFunctionDefinition extends PluginDefinit

protected static final String TYPE_LOADBALANCER_EXCHANGE_FILTER = "org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerExchangeFilterFunction";


private static final String METHOD_INTERCEPT = "filter";

private static final String[] ARGUMENT_INTERCEPT = new String[]{
Expand All @@ -39,7 +53,6 @@ public class LoadBalancerExchangeFilterFunctionDefinition extends PluginDefinit
private InvocationContext context;

public LoadBalancerExchangeFilterFunctionDefinition() {

this.matcher = () -> MatcherBuilder.named(TYPE_LOADBALANCER_EXCHANGE_FILTER);
this.interceptors = new InterceptorDefinition[]{
new InterceptorDefinitionAdapter(
Expand All @@ -48,7 +61,5 @@ public LoadBalancerExchangeFilterFunctionDefinition() {
() -> new ReactiveClusterInterceptor(context)
)
};


}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.plugin.router.springcloud.v2.definition;

import com.jd.live.agent.core.bytekit.matcher.MatcherBuilder;
Expand Down Expand Up @@ -38,7 +53,6 @@ public class ReactorLoadBalancerExchangeFilterFunctionDefinition extends PluginD
private InvocationContext context;

public ReactorLoadBalancerExchangeFilterFunctionDefinition() {

this.matcher = () -> MatcherBuilder.named(TYPE_REACTOR_LOADBALANCER_EXCHANGE_FILTER);
this.interceptors = new InterceptorDefinition[]{
new InterceptorDefinitionAdapter(
Expand All @@ -47,6 +61,5 @@ public ReactorLoadBalancerExchangeFilterFunctionDefinition() {
() -> new ReactiveClusterInterceptor(context)
)
};

}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.plugin.router.springcloud.v2.interceptor;

import com.jd.live.agent.bootstrap.bytekit.context.ExecutableContext;
Expand Down Expand Up @@ -52,16 +67,12 @@ public void onEnter(ExecutableContext ctx) {
CompletableFuture<ClientResponse> future = response.toCompletableFuture().thenApply(ReactiveClusterResponse::getResponse);
Mono<ClientResponse> mono = Mono.fromFuture(future);
// mono will be consumed later by the processing pipeline in mc
mc.setResult(mono);
mc.setSkip(true);
mc.skipWithResult(mono);
} else {
// only for live & lane
String serviceName = request.url().getHost();
RequestContext.setAttribute(Carrier.ATTRIBUTE_SERVICE_ID, serviceName);
RequestContext.setAttribute(Carrier.ATTRIBUTE_REQUEST, request);
}



}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.plugin.router.springcloud.v2.request;

import com.jd.live.agent.core.util.cache.UnsafeLazyObject;
Expand Down Expand Up @@ -66,5 +81,4 @@ public ExchangeFunction getNext() {
return next;
}


}
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.plugin.router.springcloud.v2.response;

import com.jd.live.agent.core.util.cache.UnsafeLazyObject;
Expand All @@ -16,7 +31,6 @@
*/
public class ReactiveClusterResponse extends AbstractHttpOutboundResponse<ClientResponse> {


private String body;

public ReactiveClusterResponse(ClientResponse response) {
Expand Down Expand Up @@ -48,6 +62,4 @@ public Object getResult() {
return body;
}



}

0 comments on commit c8fa646

Please sign in to comment.