Skip to content

Commit

Permalink
edit
Browse files Browse the repository at this point in the history
  • Loading branch information
usfalami committed Oct 11, 2024
1 parent af7da58 commit ca8b4bb
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 11 deletions.
8 changes: 7 additions & 1 deletion src/main/java/org/usf/inspect/core/MainSessionAspect.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.Ordered;

import lombok.RequiredArgsConstructor;

Expand All @@ -26,7 +27,7 @@
*/
@Aspect
@RequiredArgsConstructor
public class MainSessionAspect {
public class MainSessionAspect implements Ordered {

@Around("@annotation(TraceableStage)")
Object aroundBatch(ProceedingJoinPoint joinPoint) throws Throwable {
Expand Down Expand Up @@ -64,4 +65,9 @@ static void fill(LocalRequest stg, Instant start, Instant end, ProceedingJoinPoi
.ifPresent(u-> u.update(stg, joinPoint));
}
}

@Override
public int getOrder() { //before @Transactional
return HIGHEST_PRECEDENCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public final class RestClientProperties {
private String host = "http://localhost:9000";
private String instanceApi = "v3/trace/instance"; //[POST] async
private String sessionApi = "v3/trace/instance/{id}/session"; //[PUT] async
private int compressMinSize = 5_000; //in bytes, -1 no compress
private int compressMinSize = 0; //in bytes, 0 no compress

void validate() {
assertMatches(host, HOST_PATTERN);
Expand Down
42 changes: 34 additions & 8 deletions src/main/java/org/usf/inspect/core/ScheduledDispatchHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,25 @@ public void handle(T o) {

@SuppressWarnings("unchecked")
public boolean submit(T... arr) {
if(state != DISABLE) { // CACHE | DISPATCH
doSync(q-> addAll(q, arr));
log.trace("{} new items buffered", arr.length);
return true;
var res = state == DISABLE || applySync(q-> { // CACHE | DISPATCH
var size = q.size();
var done = false;
try {
done = addAll(q, arr); //false | OutOfMemoryError
} finally {
if(done) {
log.trace("{} new items buffered", arr.length);
} //addAll or nothing
else if(q.size() > size) { //partial add
q.subList(size, q.size()).clear();
}
}
return done;
});
if(!res) {
log.warn("{} items rejected, dispatcher.state={}", arr.length, state);
}
log.warn("{} items rejected, dispatcher.state={}", arr.length, state);
return false;
return res;
}

public void updateState(DispatchState state) {
Expand Down Expand Up @@ -103,7 +115,18 @@ private void dispatch(boolean complete) {
cs.size(), attempts, e.getClass().getSimpleName(), e.getMessage()); //do not log exception stack trace
}
if(attempts > 0) { //exception | !dispatch
doSync(q-> q.addAll(0, cs));
doSync(q-> {
var size = q.size();
var done = false;
try {
done = q.addAll(0, cs); //false | OutOfMemoryError
}
finally {
if(!done) {
log.warn("{} items have been lost from buffer", size + cs.size() - q.size());
}
}
});
}
}
}
Expand Down Expand Up @@ -139,10 +162,13 @@ List<T> pop() {
it.remove();
}
}
if(q.size() > c.size()) {
log.info("{}/{} sessions are not yet completed", q.size()-c.size(), q.size());
}
return c;
});
}

private void doSync(Consumer<List<T>> cons) {
synchronized(queue){
cons.accept(queue);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/usf/inspect/rest/RestSessionFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ protected boolean shouldNotFilter(HttpServletRequest request) throws ServletExce
@Override //Session stage !?
public void afterCompletion(HttpServletRequest req, HttpServletResponse res, Object handler, Exception ex) throws Exception {
var hm = (handler instanceof HandlerMethod o) ? o : null;
if(!shouldNotFilter(req) && (isNull(hm) || BasicErrorController.class != hm.getBean().getClass())) {
if(!shouldNotFilter(req) && (isNull(hm) || BasicErrorController.class != hm.getBean().getClass())) { //exclude spring controller, called twice : after throwing exception
var ses = requireCurrentSession(RestSession.class);
if(nonNull(ses)) {
ses.setName(defaultEndpointName(req));
Expand Down

0 comments on commit ca8b4bb

Please sign in to comment.