Skip to content

Commit

Permalink
* 修复http缓存太小问题
Browse files Browse the repository at this point in the history
* 添加客户端访问池
  • Loading branch information
qiwiwi committed Aug 18, 2021
1 parent 2eb53af commit 3ede36d
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ protected void channelRead0(ChannelHandlerContext channelHandlerContext, NDCMess
/* ================================== CONNECTION_INTERRUPTED ================================== */
if (type == NDCMessageProtocol.CONNECTION_INTERRUPTED) {
//todo CONNECTION_INTERRUPTED 连接由服务端中断
logger.info("interrupt connection " + ndcMessageProtocol);
logger.debug("interrupt connection " + ndcMessageProtocol);
JNDCClientConfigCenter bean = UniqueBeanManage.getBean(JNDCClientConfigCenter.class);
bean.shutDownClientServiceProvider(ndcMessageProtocol);
return;
Expand Down
4 changes: 2 additions & 2 deletions jndc_client/src/main/java/jndc_client/exmaple/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ public class ClientTest {
private static final Logger logger = LoggerFactory.getLogger(ClientTest.class);

public static void main(String[] args) {
File file = new File("D:\\NewWorkSpace\\Tools\\jndc\\jndc_client\\src\\main\\java\\jndc_client\\exmaple\\config_file\\config.yml");
//File file = new File("D:\\JAVA_WORK_SPACE\\jndc\\jndc_client\\src\\main\\java\\jndc_client\\exmaple\\config_file\\config.yml");
//File file = new File("D:\\NewWorkSpace\\Tools\\jndc\\jndc_client\\src\\main\\java\\jndc_client\\exmaple\\config_file\\config.yml");
File file = new File("D:\\JAVA_WORK_SPACE\\jndc\\jndc_client\\src\\main\\java\\jndc_client\\exmaple\\config_file\\config.yml");

YmlParser ymlParser = new YmlParser();
JNDCClientConfig jndcClientConfig = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static void serviceRebind(List<TcpServiceDescriptionOnServer> tcpServiceD
TcpServiceDescriptionOnServer tcpServiceDescription = map.get(s);
if (tcpServiceDescription != null) {
//todo do rebind
logger_static.info("rebind:" + map + "----->" + bindClientId);
logger_static.debug("rebind:" + map + "----->" + bindClientId);


//rebind the port service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.LocalTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
Expand Down Expand Up @@ -67,7 +65,7 @@ public void parseEnableDateRange(String dateRange) {
String endString = split[1];
startDatePoint = LocalTime.parse(startString);
endDatePoint = LocalTime.parse(endString);
logger.info("set enable range between " + startDatePoint + " to " + endDatePoint);
logger.debug("set enable range between " + startDatePoint + " to " + endDatePoint);
} catch (Exception e) {
//todo if get any exception, the port will reject all requests

Expand Down
6 changes: 3 additions & 3 deletions jndc_server/src/main/java/jndc_server/exmaple/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ public class ServerTest {

public static void main(String[] args) {

ServerRuntimeConfig.DEBUG_MODEL=true;
ServerRuntimeConfig.DEBUG_MODEL = true;

File file = new File("D:\\NewWorkSpace\\Tools\\jndc\\jndc_server\\src\\main\\java\\jndc_server\\exmaple\\config_file\\config.yml");
//file=new File("D:\\JAVA_WORK_SPACE\\jndc\\jndc_server\\src\\main\\java\\jndc_server\\exmaple\\config_file\\config.yml");
// File file = new File("D:\\NewWorkSpace\\Tools\\jndc\\jndc_server\\src\\main\\java\\jndc_server\\exmaple\\config_file\\config.yml");
File file = new File("D:\\JAVA_WORK_SPACE\\jndc\\jndc_server\\src\\main\\java\\jndc_server\\exmaple\\config_file\\config.yml");

YmlParser ymlParser = new YmlParser();
JNDCServerConfig jndcServerConfig = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@ protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHtt

LiteHttpProxy liteHttpProxy = LiteHttpProxyPool.getLiteHttpProxy();
fullHttpResponse = liteHttpProxy.forward(httpHostRoute, fullHttpRequest.retain());
liteHttpProxy.release();

if (fullHttpResponse == null) {
//todo 超时的直接断开
channelHandlerContext.close();
return;
}
}

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,23 @@
import jndc.utils.InetUtils;
import jndc_server.web_support.model.data_object.HttpHostRoute;
import jndc_server.web_support.utils.BlockValueFeature;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.UUID;
import java.util.function.Consumer;


@Slf4j
@Data
public class LiteHttpProxy {

private String id = UUID.randomUUID().toString();

private volatile boolean canBeReUse;

private volatile boolean hasBeenPut = false;

//回收操作
private Consumer<LiteHttpProxy> recycleOption;

Expand All @@ -32,11 +41,27 @@ public LiteHttpProxy(Consumer<LiteHttpProxy> recycleOption, boolean canBeReUse)
this.canBeReUse = canBeReUse;
}

public boolean canBePut() {
return hasBeenPut == false;
}

public boolean canBeReuse() {
return canBeReUse;
}

public void takeOption() {
hasBeenPut = false;
}

public void putOption() {
hasBeenPut = true;
}

public void release() {
eventLoopGroup.shutdownGracefully();
eventLoopGroup = NettyComponentConfig.getNioEventLoopGroup();

if (canBeReUse && recycleOption != null) {
if (recycleOption != null) {
//todo 能被继续使用
recycleOption.accept(this);
}
Expand All @@ -52,7 +77,7 @@ protected void initChannel(SocketChannel socketChannel) throws Exception {


pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(2 * 1024 * 1024));//限制缓冲最大值为2mb
pipeline.addLast(new HttpObjectAggregator(20 * 1024 * 1024));//限制缓冲最大值为2mb
pipeline.addLast(new LiteProxyHandle(liteHttpProxy));
}
};
Expand All @@ -72,6 +97,8 @@ protected void initChannel(SocketChannel socketChannel) throws Exception {
return fullHttpResponse;
} catch (Exception e) {
throw new RuntimeException("转发请求异常" + e);
} finally {
release();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,70 @@
@Slf4j
public class LiteHttpProxyPool {

private static final Integer LIMIT = 10;

private static AtomicInteger blockNum = new AtomicInteger(0);
private static final Integer FIX_SIZE = 1;

private static BlockingQueue<LiteHttpProxy> blockingQueue = new LinkedBlockingQueue<>();
private static final Integer INCREASE_STEP = 5;

private static volatile AtomicInteger freeNum = new AtomicInteger(0);

private static BlockingQueue<LiteHttpProxy> blockingQueue = new LinkedBlockingQueue<>();

private static final Consumer<LiteHttpProxy> tConsumer = (x) -> {
try {
blockingQueue.put(x);
} catch (InterruptedException e) {
throw new RuntimeException("获取请求器异常" + e);
}
};

static {
//5个请求器对象
IntStream.generate(() -> 1).limit(10).forEach(x -> {
IntStream.generate(() -> 1).limit(FIX_SIZE).forEach(x -> {
try {
blockingQueue.put(new LiteHttpProxy(tConsumer, true));
LiteHttpProxy liteHttpProxy = new LiteHttpProxy(getConsumer(), true);
liteHttpProxy.setId("INIT_CLIENT");
blockingQueue.put(liteHttpProxy);
log.debug("初始化,可用计数可用:" + freeNum.incrementAndGet() + "实际可用:" + blockingQueue.size());
} catch (InterruptedException e) {
throw new RuntimeException("获取请求器异常" + e);
}
});
}


private static Consumer<LiteHttpProxy> getConsumer() {
Consumer<LiteHttpProxy> tConsumer = (x) -> {
try {
if (x.canBeReuse()) {

//同步
if (x.canBePut()) {
synchronized (x) {
if (x.canBePut()) {
//todo 可重用且未被回收
freeNum.incrementAndGet();
blockingQueue.put(x);
x.putOption();
log.debug("客户端回收,当前可用客户端计数:" + freeNum.get() + "实际可用:" + blockingQueue.size());
}
}
}


}
} catch (InterruptedException e) {
throw new RuntimeException("获取请求器异常" + e);
}
};
return tConsumer;
}

/**
* 获取请求客户端
*
* @return
*/
public static LiteHttpProxy getLiteHttpProxy() {
try {
blockCheck(blockNum.incrementAndGet());
blockCheck(freeNum.get());
LiteHttpProxy take = blockingQueue.take();
blockNum.decrementAndGet();
take.takeOption();
log.debug("客户端使用,当前可用客户端计数:" + freeNum.decrementAndGet() + "实际可用:" + blockingQueue.size());

return take;
} catch (InterruptedException e) {
throw new RuntimeException("获取请求器异常" + e);
Expand All @@ -57,20 +84,20 @@ public static LiteHttpProxy getLiteHttpProxy() {
/**
* 阻塞检查
*
* @param i
* @param free
*/
public static void blockCheck(int i) {
if (i > LIMIT) {
long v = (long) (i * 0.5);
log.info("线程阻塞数量:" + i + ",执行扩容,添加" + v);
public static void blockCheck(int free) {
if (free == 0) {
//todo 增加不可回收工作者
IntStream.generate(() -> 1).limit(v).forEach(x -> {
IntStream.generate(() -> 1).limit(INCREASE_STEP).parallel().forEach(x -> {
try {
blockingQueue.put(new LiteHttpProxy(tConsumer, false));
freeNum.incrementAndGet();
blockingQueue.put(new LiteHttpProxy(getConsumer(), false));
} catch (InterruptedException e) {
throw new RuntimeException("获取请求器异常" + e);
throw new RuntimeException("扩充请求器异常" + e);
}
});
log.info("扩容后,可用客户端计数:" + freeNum.get() + "实际可用:" + blockingQueue.size());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import jndc_server.web_support.utils.HttpResponseBuilder;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand Down Expand Up @@ -34,7 +35,10 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
//todo 异常直接返回异常信息
log.error("proxy handle error " + cause);
FullHttpResponse fullHttpResponse = HttpResponseBuilder.textResponse(cause.toString().getBytes());
liteHttpProxy.writeData(fullHttpResponse);
}

public void release() {
Expand Down

0 comments on commit 3ede36d

Please sign in to comment.