Skip to content

Commit

Permalink
Update grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
hexiaofeng committed Dec 24, 2024
1 parent 9f85947 commit f62a9f4
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

public class GrpcCluster extends AbstractLiveCluster<GrpcOutboundRequest, GrpcOutboundResponse, GrpcEndpoint> {

private ClientCall<?, ?> clientCall;
private ClientCall clientCall;

public GrpcCluster(ClientCall<?, ?> clientCall) {
public GrpcCluster(ClientCall clientCall) {
this.clientCall = clientCall;
}

Expand All @@ -36,13 +36,7 @@ public CompletionStage<List<GrpcEndpoint>> route(GrpcOutboundRequest request) {
@Override
public CompletionStage<GrpcOutboundResponse> invoke(GrpcOutboundRequest request, GrpcEndpoint endpoint) {
CompletionStage<GrpcOutboundResponse> stage = new CompletableFuture<>();
clientCall.start(new ClientCall.Listener() {
@Override
public void onMessage(Object message) {
super.onMessage(message);
}

}, new Metadata());
clientCall.sendMessage(request.getRequest());
return stage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,12 @@
import com.jd.live.agent.bootstrap.bytekit.context.MethodContext;
import com.jd.live.agent.core.plugin.definition.InterceptorAdaptor;
import com.jd.live.agent.governance.invoke.InvocationContext;
import com.jd.live.agent.plugin.router.gprc.instance.GrpcEndpoint;
import com.jd.live.agent.plugin.router.gprc.loadbalance.LiveDiscovery;
import com.jd.live.agent.plugin.router.gprc.loadbalance.LivePickerAdvice;
import com.jd.live.agent.plugin.router.gprc.loadbalance.LiveSubchannel;
import com.jd.live.agent.plugin.router.gprc.request.GrpcRequest.GrpcOutboundRequest;
import com.jd.live.agent.plugin.router.gprc.request.invoke.GrpcInvocation.GrpcOutboundInvocation;
import com.jd.live.agent.plugin.router.gprc.response.GrpcResponse.GrpcOutboundResponse;
import io.grpc.*;
import io.grpc.MethodDescriptor.MethodType;

import java.util.List;
import java.util.function.Function;

import static com.jd.live.agent.core.util.CollectionUtils.convert;
import java.util.concurrent.CompletableFuture;

/**
* ClientInterceptorsInterceptor
Expand Down Expand Up @@ -65,15 +59,30 @@ private static class LiveChannel extends Channel {

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
LiveRoute<ReqT, RespT> route = new LiveRoute<>(context, method);
LivePickerAdvice advice = new LivePickerAdvice(route);
if (method.getType() != MethodType.UNARY) {
// This is not a Unary RPC method
return channel.newCall(method, callOptions);
}
LivePickerAdvice advice = new LivePickerAdvice(method, context);
callOptions = callOptions.withOption(LivePickerAdvice.KEY_PICKER_ADVICE, advice);
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(method, callOptions)) {
ClientCall<ReqT, RespT> clientCall = channel.newCall(method, callOptions);
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {

private CompletableFuture<GrpcOutboundResponse> future = new CompletableFuture<>();

@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
route.headers = headers;
advice.setHeaders(headers);
// TODO wrap response listener to handle response & error
super.start(responseListener, headers);
}

@Override
public void sendMessage(ReqT message) {
advice.setMessage(message);
// TODO wrap cluster to invoke & handle void response
super.sendMessage(message);
}
};
}

Expand All @@ -82,27 +91,4 @@ public String authority() {
return channel.authority();
}
}

private static class LiveRoute<ReqT, RespT> implements Function<List<LiveSubchannel>, LiveSubchannel> {

private final InvocationContext context;

private final MethodDescriptor<ReqT, RespT> method;

private Metadata headers;

LiveRoute(InvocationContext context, MethodDescriptor<ReqT, RespT> method) {
this.context = context;
this.method = method;
}

@Override
public LiveSubchannel apply(List<LiveSubchannel> subchannels) {
String serviceName = LiveDiscovery.getService(method.getServiceName());
GrpcOutboundRequest request = new GrpcOutboundRequest(headers, serviceName, method);
GrpcOutboundInvocation invocation = new GrpcOutboundInvocation(request, context);
GrpcEndpoint endpoint = context.route(invocation, convert(subchannels, GrpcEndpoint::new));
return endpoint.getSubchannel();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@
*/
package com.jd.live.agent.plugin.router.gprc.loadbalance;

import com.jd.live.agent.governance.invoke.InvocationContext;
import com.jd.live.agent.plugin.router.gprc.instance.GrpcEndpoint;
import com.jd.live.agent.plugin.router.gprc.request.GrpcRequest.GrpcOutboundRequest;
import com.jd.live.agent.plugin.router.gprc.request.invoke.GrpcInvocation.GrpcOutboundInvocation;
import io.grpc.CallOptions;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;

import java.util.List;
import java.util.function.Function;

import static com.jd.live.agent.core.util.CollectionUtils.convert;

/**
* Represents advice for live picker, which includes a subchannel and an election function to determine the active subchannel.
Expand All @@ -27,32 +34,42 @@ public class LivePickerAdvice {

public static final CallOptions.Key<LivePickerAdvice> KEY_PICKER_ADVICE = CallOptions.Key.create("x-picker-advice");

private LiveSubchannel subchannel;
private final MethodDescriptor<?, ?> method;

private final Function<List<LiveSubchannel>, LiveSubchannel> election;
private final InvocationContext context;

public LivePickerAdvice(LiveSubchannel subchannel) {
this(subchannel, null);
}
private Object message;

private Metadata headers;

public LivePickerAdvice(Function<List<LiveSubchannel>, LiveSubchannel> election) {
this(null, election);
private LiveSubchannel subchannel;

public LivePickerAdvice(MethodDescriptor<?, ?> method, InvocationContext context) {
this.method = method;
this.context = context;
}

public LivePickerAdvice(LiveSubchannel subchannel, Function<List<LiveSubchannel>, LiveSubchannel> election) {
this.subchannel = subchannel;
this.election = election;
public void setHeaders(Metadata headers) {
this.headers = headers;
}

public LiveSubchannel getSubchannel() {
return subchannel;
public void setMessage(Object message) {
this.message = message;
}

public void setSubchannel(LiveSubchannel subchannel) {
this.subchannel = subchannel;
}

public Function<List<LiveSubchannel>, LiveSubchannel> getElection() {
return election;
public LiveSubchannel elect(List<LiveSubchannel> subchannels) {
if (subchannel != null) {
return subchannel;
}
String serviceName = LiveDiscovery.getService(method.getServiceName());
GrpcOutboundRequest request = new GrpcOutboundRequest(message, headers, method, serviceName);
GrpcOutboundInvocation invocation = new GrpcOutboundInvocation(request, context);
GrpcEndpoint endpoint = context.route(invocation, convert(subchannels, GrpcEndpoint::new));
return endpoint.getSubchannel();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
* A class that extends the SubchannelPicker class to provide a live subchannel picking strategy.
Expand Down Expand Up @@ -60,15 +59,11 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
LivePickerAdvice advice = args.getCallOptions().getOption(LivePickerAdvice.KEY_PICKER_ADVICE);
LiveSubchannel subchannel = null;
if (advice != null) {
subchannel = advice.getSubchannel();
Function<List<LiveSubchannel>, LiveSubchannel> election = advice.getElection();
if (subchannel == null && election != null) {
try {
subchannel = election.apply(subchannels);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
return PickResult.withError(GrpcStatus.createException(e));
}
try {
subchannel = advice.elect(subchannels);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
return PickResult.withError(GrpcStatus.createException(e));
}
}
if (subchannel != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,25 @@ public boolean isNativeGroup() {
/**
* A nested class representing an outbound gRPC request.
*/
class GrpcOutboundRequest extends AbstractRpcOutboundRequest<Metadata> implements GrpcRequest {
class GrpcOutboundRequest extends AbstractRpcOutboundRequest<Object> implements GrpcRequest {

public GrpcOutboundRequest(Metadata request, String serviceName, MethodDescriptor<?, ?> method) {
super(request);
private final Metadata metadata;

private final MethodDescriptor<?, ?> methodDescriptor;

public GrpcOutboundRequest(Object message, Metadata metadata, MethodDescriptor<?, ?> methodDescriptor, String serviceName) {
super(message);
this.metadata = metadata;
this.methodDescriptor = methodDescriptor;
this.service = serviceName;
this.path = method.getServiceName();
this.method = method.getBareMethodName();
this.path = methodDescriptor.getServiceName();
this.method = methodDescriptor.getBareMethodName();
}

@Override
public void setHeader(String key, String value) {
if (key != null && !key.isEmpty() && value != null && !value.isEmpty()) {
request.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
metadata.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
}
}

Expand Down

0 comments on commit f62a9f4

Please sign in to comment.