Skip to content

Commit

Permalink
client-mini模块
Browse files Browse the repository at this point in the history
  • Loading branch information
Liubsyy committed Jan 20, 2024
1 parent 79a273e commit 9643a91
Show file tree
Hide file tree
Showing 32 changed files with 1,441 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
public enum SerializerEnum {
KRYO("KRYO","kryo序列化"),
PROTOBUF("PROTOBUF","protobuf序列化"),
JAVA_SERIALISE("JAVA_SERIALIZE","java原生序列化"),

;

Expand All @@ -27,4 +28,7 @@ public static SerializerEnum findByType(String serializeType) {
return null;
}

public String getSerializeType() {
return serializeType;
}
}
16 changes: 16 additions & 0 deletions client-mini/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.github.liubsyy</groupId>
<artifactId>shadowrpc</artifactId>
<version>1.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>shadowrpc-client-mini</artifactId>
<version>1.0.0</version>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.liubs.shadowrpc.base.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* @author Liubsyy
* @date 2023/12/8 22:18 PM
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ShadowInterface {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package com.liubs.shadowrpc.clientmini.connection;

import com.liubs.shadowrpc.base.annotation.ShadowInterface;
import com.liubs.shadowrpc.clientmini.handler.ReceiveHolder;
import com.liubs.shadowrpc.clientmini.logger.Logger;
import com.liubs.shadowrpc.protocol.entity.JavaSerializeRPCRequest;
import com.liubs.shadowrpc.protocol.entity.JavaSerializeRPCResponse;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* @author Liubsyy
* @date 2023/12/31
**/
public class RemoteProxy implements InvocationHandler {
private static final Logger logger = Logger.getLogger(RemoteProxy.class);


private ShadowClient clientConnection;

/**
* 远程接口stub
*/
private Class<?> serviceStub;

/**
* 服务名
*/
private String serviceName;

public RemoteProxy(ShadowClient client, Class<?> serviceStub, String serviceName) {
this.clientConnection = client;
this.serviceStub = serviceStub;
this.serviceName = serviceName;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

try{
JavaSerializeRPCRequest requestModel = new JavaSerializeRPCRequest();
String traceId = UUID.randomUUID().toString();
requestModel.setTraceId(traceId);
requestModel.setServiceName(serviceName);
requestModel.setMethodName(method.getName());
requestModel.setParamTypes(method.getParameterTypes());
requestModel.setParams(args);


Future<?> future = ReceiveHolder.getInstance().initFuture(traceId);


try{
clientConnection.sendMessage(clientConnection.getRequestHandler().handleMessage(requestModel));
}catch (Exception e) {
logger.error("发送请求{}失败",traceId);
return null;
}

JavaSerializeRPCResponse responseModel = (JavaSerializeRPCResponse)future.get(3, TimeUnit.SECONDS);
if(responseModel != null) {
return responseModel.getResult();
}else {
ReceiveHolder.getInstance().deleteWait(traceId);
logger.error("超时请求,抛弃消息{}",traceId);
return null;
}

}catch (Throwable e) {
logger.error("invoke err",e);
}

return null;
}

public static <T> T create(ShadowClient connection, Class<T> serviceStub, String serviceName) {

ShadowInterface shadowInterface = serviceStub.getAnnotation(ShadowInterface.class);
if(null == shadowInterface) {
throw new RuntimeException("服务未找到 @shadowInterface注解");
}

Object proxyInstance = Proxy.newProxyInstance(
serviceStub.getClassLoader(),
new Class<?>[]{serviceStub},
new RemoteProxy(connection,serviceStub,serviceName)
);

return (T)proxyInstance;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.liubs.shadowrpc.clientmini.connection;

import com.liubs.shadowrpc.clientmini.handler.RequestHandler;
import com.liubs.shadowrpc.clientmini.handler.ResponseHandler;
import com.liubs.shadowrpc.clientmini.nio.NIOClient;
import com.liubs.shadowrpc.clientmini.seriallize.ISerializer;
import com.liubs.shadowrpc.clientmini.seriallize.JavaSerializer;

import java.io.IOException;

/**
* @author Liubsyy
* @date 2024/1/20
**/
public class ShadowClient {

private ISerializer serializer = new JavaSerializer();

private String host;
private int port;

private NIOClient nioClient;
private RequestHandler requestHandler;
private ResponseHandler responseHandler;



public ShadowClient(String host, int port) {
this.host = host;
this.port = port;
this.requestHandler = new RequestHandler(serializer);
this.responseHandler = new ResponseHandler(serializer);
this.nioClient = new NIOClient(host,port,responseHandler);
}

public void connect() throws IOException, InterruptedException {
nioClient.connect();
}


public <T> T createRemoteProxy(Class<T> serviceStub, String serviceName) {
return RemoteProxy.create(this,serviceStub,serviceName);
}

public RequestHandler getRequestHandler() {
return requestHandler;
}

public ResponseHandler getResponseHandler() {
return responseHandler;
}

public void sendMessage(byte[] bytes) {
nioClient.sendMessage(bytes);
}

public void close(){
nioClient.close();
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.liubs.shadowrpc.clientmini.handler;

import com.liubs.shadowrpc.protocol.entity.JavaSerializeRPCResponse;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

/**
* @author Liubsyy
* @date 2023/12/3 11:32 PM
**/
public class ReceiveHolder {
public Map<String, CompletableFuture<Object>> futureMap = new ConcurrentHashMap<>();

private static ReceiveHolder instance = new ReceiveHolder();
public static ReceiveHolder getInstance() {
return instance;
}


public Future<?> initFuture(String uuid){
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
futureMap.put(uuid,completableFuture);
return completableFuture;
}

public void deleteWait(String uuid) {
futureMap.remove(uuid);
}


public void receiveData(JavaSerializeRPCResponse responseModel){
CompletableFuture<Object> future = futureMap.remove(responseModel.getTraceId());
if(null != future) {
future.complete(responseModel);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.liubs.shadowrpc.clientmini.handler;

import com.liubs.shadowrpc.clientmini.seriallize.ISerializer;

/**
* @author Liubsyy
* @date 2024/1/20
**/
public class RequestHandler {
private ISerializer serializer;

public RequestHandler(ISerializer serializer) {
this.serializer = serializer;
}

public byte[] handleMessage(Object obj) {
return serializer.serialize(obj);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.liubs.shadowrpc.clientmini.handler;

import com.liubs.shadowrpc.clientmini.logger.Logger;
import com.liubs.shadowrpc.clientmini.nio.IMessageListener;
import com.liubs.shadowrpc.clientmini.seriallize.ISerializer;
import com.liubs.shadowrpc.protocol.entity.JavaSerializeRPCResponse;

/**
* @author Liubsyy
* @date 2024/1/20
**/

public class ResponseHandler implements IMessageListener {

private static final int SUCCESS = 10;

private static Logger logger = Logger.getLogger(ResponseHandler.class);

private ISerializer serializer;

public ResponseHandler(ISerializer serializer) {
this.serializer = serializer;
}

@Override
public void handleMessage(byte[] bytes) {
JavaSerializeRPCResponse response = serializer.deserialize(bytes, JavaSerializeRPCResponse.class);
if(response == null) {
return;
}
//接收消息
ReceiveHolder.getInstance().receiveData(response);
}
}
Loading

0 comments on commit 9643a91

Please sign in to comment.