From ca8b4bb886825718498d6f614125532988725285 Mon Sep 17 00:00:00 2001 From: u$f Date: Fri, 11 Oct 2024 22:24:50 +0200 Subject: [PATCH] edit --- .../usf/inspect/core/MainSessionAspect.java | 8 +++- .../inspect/core/RestClientProperties.java | 2 +- .../core/ScheduledDispatchHandler.java | 42 +++++++++++++++---- .../usf/inspect/rest/RestSessionFilter.java | 2 +- 4 files changed, 43 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/usf/inspect/core/MainSessionAspect.java b/src/main/java/org/usf/inspect/core/MainSessionAspect.java index 2947402..9940251 100644 --- a/src/main/java/org/usf/inspect/core/MainSessionAspect.java +++ b/src/main/java/org/usf/inspect/core/MainSessionAspect.java @@ -16,6 +16,7 @@ import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; +import org.springframework.core.Ordered; import lombok.RequiredArgsConstructor; @@ -26,7 +27,7 @@ */ @Aspect @RequiredArgsConstructor -public class MainSessionAspect { +public class MainSessionAspect implements Ordered { @Around("@annotation(TraceableStage)") Object aroundBatch(ProceedingJoinPoint joinPoint) throws Throwable { @@ -64,4 +65,9 @@ static void fill(LocalRequest stg, Instant start, Instant end, ProceedingJoinPoi .ifPresent(u-> u.update(stg, joinPoint)); } } + + @Override + public int getOrder() { //before @Transactional + return HIGHEST_PRECEDENCE; + } } diff --git a/src/main/java/org/usf/inspect/core/RestClientProperties.java b/src/main/java/org/usf/inspect/core/RestClientProperties.java index bb61c2e..fcad9d5 100644 --- a/src/main/java/org/usf/inspect/core/RestClientProperties.java +++ b/src/main/java/org/usf/inspect/core/RestClientProperties.java @@ -24,7 +24,7 @@ public final class RestClientProperties { private String host = "http://localhost:9000"; private String instanceApi = "v3/trace/instance"; //[POST] async private String sessionApi = "v3/trace/instance/{id}/session"; //[PUT] async - private int compressMinSize = 5_000; //in bytes, -1 no compress + private int compressMinSize = 0; //in bytes, 0 no compress void validate() { assertMatches(host, HOST_PATTERN); diff --git a/src/main/java/org/usf/inspect/core/ScheduledDispatchHandler.java b/src/main/java/org/usf/inspect/core/ScheduledDispatchHandler.java index c5b519d..f61de20 100644 --- a/src/main/java/org/usf/inspect/core/ScheduledDispatchHandler.java +++ b/src/main/java/org/usf/inspect/core/ScheduledDispatchHandler.java @@ -58,13 +58,25 @@ public void handle(T o) { @SuppressWarnings("unchecked") public boolean submit(T... arr) { - if(state != DISABLE) { // CACHE | DISPATCH - doSync(q-> addAll(q, arr)); - log.trace("{} new items buffered", arr.length); - return true; + var res = state == DISABLE || applySync(q-> { // CACHE | DISPATCH + var size = q.size(); + var done = false; + try { + done = addAll(q, arr); //false | OutOfMemoryError + } finally { + if(done) { + log.trace("{} new items buffered", arr.length); + } //addAll or nothing + else if(q.size() > size) { //partial add + q.subList(size, q.size()).clear(); + } + } + return done; + }); + if(!res) { + log.warn("{} items rejected, dispatcher.state={}", arr.length, state); } - log.warn("{} items rejected, dispatcher.state={}", arr.length, state); - return false; + return res; } public void updateState(DispatchState state) { @@ -103,7 +115,18 @@ private void dispatch(boolean complete) { cs.size(), attempts, e.getClass().getSimpleName(), e.getMessage()); //do not log exception stack trace } if(attempts > 0) { //exception | !dispatch - doSync(q-> q.addAll(0, cs)); + doSync(q-> { + var size = q.size(); + var done = false; + try { + done = q.addAll(0, cs); //false | OutOfMemoryError + } + finally { + if(!done) { + log.warn("{} items have been lost from buffer", size + cs.size() - q.size()); + } + } + }); } } } @@ -139,10 +162,13 @@ List pop() { it.remove(); } } + if(q.size() > c.size()) { + log.info("{}/{} sessions are not yet completed", q.size()-c.size(), q.size()); + } return c; }); } - + private void doSync(Consumer> cons) { synchronized(queue){ cons.accept(queue); diff --git a/src/main/java/org/usf/inspect/rest/RestSessionFilter.java b/src/main/java/org/usf/inspect/rest/RestSessionFilter.java index 0475573..5307add 100644 --- a/src/main/java/org/usf/inspect/rest/RestSessionFilter.java +++ b/src/main/java/org/usf/inspect/rest/RestSessionFilter.java @@ -127,7 +127,7 @@ protected boolean shouldNotFilter(HttpServletRequest request) throws ServletExce @Override //Session stage !? public void afterCompletion(HttpServletRequest req, HttpServletResponse res, Object handler, Exception ex) throws Exception { var hm = (handler instanceof HandlerMethod o) ? o : null; - if(!shouldNotFilter(req) && (isNull(hm) || BasicErrorController.class != hm.getBean().getClass())) { + if(!shouldNotFilter(req) && (isNull(hm) || BasicErrorController.class != hm.getBean().getClass())) { //exclude spring controller, called twice : after throwing exception var ses = requireCurrentSession(RestSession.class); if(nonNull(ses)) { ses.setName(defaultEndpointName(req));