Skip to content

Commit

Permalink
Merge pull request #66 from oneteme/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
usfalami authored Oct 14, 2024
2 parents c3dbefc + 76af027 commit 89831d6
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 101 deletions.
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.github.oneteme</groupId>
<artifactId>inspect-core</artifactId>
<version>0.0.4</version>
<version>0.0.5</version>
<packaging>jar</packaging>
<name>inspect-core</name>
<description>INtegrated System Performance Evaluation and Communication Tracking core libray</description>
Expand Down Expand Up @@ -63,9 +63,9 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.ldap</groupId>
<artifactId>spring-ldap-core</artifactId>
<version>3.2.0</version>
<groupId>org.springframework.ldap</groupId>
<artifactId>spring-ldap-core</artifactId>
<version>3.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand Down
1 change: 0 additions & 1 deletion src/main/java/org/usf/inspect/core/DispatchState.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@
public enum DispatchState {

DISABLE, CACHE, DISPACH;

}
18 changes: 12 additions & 6 deletions src/main/java/org/usf/inspect/core/Helper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import static java.lang.Math.min;
import static java.lang.Thread.currentThread;
import static java.util.Collections.synchronizedList;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static java.util.Optional.empty;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

Expand All @@ -31,7 +33,7 @@ public final class Helper {
static {
var p = Helper.class.getPackageName();
ROOT_PACKAGE = p.substring(0, p.lastIndexOf(".")); //root
log = getLogger(ROOT_PACKAGE + ".Collector");
log = getLogger(ROOT_PACKAGE + ".collector");
}

public static String threadName() {
Expand All @@ -52,7 +54,7 @@ public static <T> Optional<T> newInstance(Class<? extends T> clazz) {
try {
return Optional.of(clazz.getDeclaredConstructor().newInstance());
} catch (Exception e) {
log.warn("cannot instantiate class " + clazz.getName(), e);
log.warn("cannot instantiate class '{}', exception={}", clazz.getName(), e.getMessage());
return empty();
}
}
Expand All @@ -64,12 +66,12 @@ public static Optional<StackTraceElement> outerStackTraceElement() {
return i<arr.length ? Optional.of(arr[i]) : empty();
}

public static void warnNoActiveSession() {
log.warn("no active session");
public static void warnStackTrace(String msg) {
log.warn(msg);
var arr = currentThread().getStackTrace();
var i = 1; //skip this method call
while(i<arr.length && arr[i].getClassName().startsWith(ROOT_PACKAGE)) {i++;}
var max = min(arr.length, --i+MAX_STACK); //first JQuery method call
var max = min(arr.length, --i+MAX_STACK); //first inspect method call
while(i<max) {
log.warn("\tat {}", arr[i++]);
}
Expand All @@ -81,7 +83,7 @@ public static void warnNoActiveSession() {
public static String prettyURLFormat(String user, String protocol, String host, int port, String path) {
var s = isNull(user) ? "" : '<' + user + '>';
if(nonNull(protocol)) {
s += protocol + "://";
s+= protocol + "://";
}
if(nonNull(host)) {
s+= host;
Expand All @@ -97,4 +99,8 @@ public static String prettyURLFormat(String user, String protocol, String host,
}
return s;
}

static <T> List<T> synchronizedArrayList() {
return synchronizedList(new ArrayList<>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class InspectConfiguration implements WebMvcConfigurer, ApplicationListener<Spri
// @ConditionalOnExpression("${inspect.track.rest-session:true}!=false")
public void addInterceptors(InterceptorRegistry registry) {
if(nonNull(config.getTrack().getRestSession())) {
registry.addInterceptor(sessionFilter()).order(LOWEST_PRECEDENCE);
registry.addInterceptor(sessionFilter()).order(LOWEST_PRECEDENCE); //after auth.,.. interceptors
// .excludePathPatterns(config.getTrack().getRestSession().excludedPaths())
}
}
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/org/usf/inspect/core/MainSessionAspect.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.Ordered;

import lombok.RequiredArgsConstructor;

Expand All @@ -26,7 +27,7 @@
*/
@Aspect
@RequiredArgsConstructor
public class MainSessionAspect {
public class MainSessionAspect implements Ordered {

@Around("@annotation(TraceableStage)")
Object aroundBatch(ProceedingJoinPoint joinPoint) throws Throwable {
Expand Down Expand Up @@ -64,4 +65,9 @@ static void fill(LocalRequest stg, Instant start, Instant end, ProceedingJoinPoi
.ifPresent(u-> u.update(stg, joinPoint));
}
}

@Override
public int getOrder() { //before @Transactional
return HIGHEST_PRECEDENCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public final class RestClientProperties {
private String host = "http://localhost:9000";
private String instanceApi = "v3/trace/instance"; //[POST] async
private String sessionApi = "v3/trace/instance/{id}/session"; //[PUT] async
private int compressMinSize = 5_000; //in bytes, -1 no compress
private int compressMinSize = 0; //in bytes, 0 no compress

void validate() {
assertMatches(host, HOST_PATTERN);
Expand Down
117 changes: 81 additions & 36 deletions src/main/java/org/usf/inspect/core/ScheduledDispatchHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.usf.inspect.core;

import static java.lang.Thread.currentThread;
import static java.util.Collections.addAll;
import static java.util.Collections.emptyList;
import static java.util.Collections.unmodifiableList;
Expand Down Expand Up @@ -58,17 +59,35 @@ public void handle(T o) {

@SuppressWarnings("unchecked")
public boolean submit(T... arr) {
if(state != DISABLE) { // CACHE | DISPATCH
doSync(q-> addAll(q, arr));
log.trace("{} new items buffered", arr.length);
return true;
var res = state == DISABLE || applySync(q-> { // CACHE | DISPATCH
var size = q.size();
var done = false;
try {
done = addAll(q, arr); //false | OutOfMemoryError
} finally {
if(done) {
log.trace("{} new items buffered", arr.length);
} //addAll or nothing
else if(q.size() > size) { //partial add
q.subList(size, q.size()).clear();
}
}
return done;
});
if(!res) {
log.warn("{} items rejected, dispatcher.state={}", arr.length, state);
}
log.warn("{} items rejected, dispatcher.state={}", arr.length, state);
return false;
return res;
}

public void updateState(DispatchState state) {
this.state = state;
if(this.state != state) {
this.state = state;
}
if(state == DISABLE) {
awaitDispatching(); //wait for last dispatch complete
log.info("dispatcher has been disabled");
}
}

private void tryDispatch() {
Expand All @@ -79,7 +98,7 @@ private void tryDispatch() {
log.warn("dispatcher.state={}", state);
}
if(properties.getBufferMaxSize() > -1 && (state != DISPACH || attempts > 0)) { // !DISPACH | dispatch=fail
doSync(q-> {
doSync(q-> {
if(q.size() > properties.getBufferMaxSize()) {
var diff = q.size() - properties.getBufferMaxSize();
q.subList(properties.getBufferMaxSize(), q.size()).clear(); //remove exceeding cache sessions (LIFO)
Expand All @@ -99,26 +118,40 @@ private void dispatch(boolean complete) {
}
}
catch (Exception e) {// do not throw exception : retry later
log.warn("error while dispatching {} items, attempts={} because : {}",
cs.size(), attempts, e.getMessage()); //do not log exception stack trace
log.warn("error while dispatching {} items, attempts={} because :[{}] {}",
cs.size(), attempts, e.getClass().getSimpleName(), e.getMessage()); //do not log exception stack trace
}
if(attempts > 0) { //exception | !dispatch
doSync(q-> q.addAll(0, cs));
doSync(q-> {
var size = q.size();
var done = false;
try {
done = q.addAll(0, cs); //false | OutOfMemoryError
}
finally {
if(!done) {
log.warn("{} items have been lost from buffer", size + cs.size() - q.size());
}
}
});
}
}
}

public List<T> peek() {
return applySync(q-> {
if(q.isEmpty()) {
return emptyList();
}
var s = q.stream();
if(nonNull(filter)) {
s = s.filter(filter);
}
return s.toList();
});
if(state == DISABLE) {
return applySync(q-> {
if(q.isEmpty()) {
return emptyList();
}
var s = q.stream();
if(nonNull(filter)) {
s = s.filter(filter);
}
return s.toList();
});
}
throw new IllegalStateException("dispatcher.state=" + state);
}

List<T> pop() {
Expand All @@ -139,10 +172,13 @@ List<T> pop() {
it.remove();
}
}
if(q.size() > c.size()) {
log.info("{}/{} sessions are not yet completed", q.size()-c.size(), q.size());
}
return c;
});
}

private void doSync(Consumer<List<T>> cons) {
synchronized(queue){
cons.accept(queue);
Expand All @@ -156,24 +192,33 @@ private <R> R applySync(Function<List<T>, R> fn) {
}

@Override
public void complete() throws InterruptedException {
var stt = this.state;
updateState(DISABLE); //stop add items
public void complete() {
var stt = state;
log.info("shutting down scheduler service");
try {
executor.shutdown(); //cancel future
while(!executor.awaitTermination(10, SECONDS)); //wait for last dispatch complete
}
finally {
if(stt == DISPACH) {
executor.shutdown(); //cancel future
updateState(DISABLE); //stop add items
if(stt == DISPACH) {
try {
dispatch(true); //complete signal
}
if(!queue.isEmpty()) { //!dispatch || dispatch=fail
log.warn("{} items aborted, dispatcher.state={}", queue.size(), stt); // safe queue access
}
}
finally {
if(!queue.isEmpty()) { //!dispatch || dispatch=fail + incomplete session
log.warn("{} items aborted, dispatcher.state={}", queue.size(), stt); // safe queue access
}
}
}
}


void awaitDispatching() {
try {
while(!executor.awaitTermination(10, SECONDS));
}
catch (InterruptedException e) {
log.error("awaitDispatching interrupted", e);
currentThread().interrupt();
}
}

@FunctionalInterface
public interface Dispatcher<T> {

Expand Down
33 changes: 16 additions & 17 deletions src/main/java/org/usf/inspect/core/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ public interface Session extends Metric {

void setId(String id); //used in server side

List<RestRequest> getRestRequests(); // rename to getApiRequests
List<RestRequest> getRestRequests();

List<DatabaseRequest> getDatabaseRequests(); //rename to getDatabaseRequests
List<DatabaseRequest> getDatabaseRequests();

List<LocalRequest> getLocalRequests();

Expand All @@ -38,28 +38,27 @@ public interface Session extends Metric {

AtomicInteger getLock();

default void append(SessionStage stage) {
default boolean append(SessionStage stage) {
if(stage instanceof RestRequest req) {
getRestRequests().add(req);
return getRestRequests().add(req);
}
else if(stage instanceof DatabaseRequest req) {
getDatabaseRequests().add(req);
if(stage instanceof DatabaseRequest req) {
return getDatabaseRequests().add(req);
}
else if(stage instanceof FtpRequest req) {
getFtpRequests().add(req);
if(stage instanceof FtpRequest req) {
return getFtpRequests().add(req);
}
else if(stage instanceof MailRequest req) {
getMailRequests().add(req);
if(stage instanceof MailRequest req) {
return getMailRequests().add(req);
}
else if(stage instanceof NamingRequest req) {
getLdapRequests().add(req);
if(stage instanceof NamingRequest req) {
return getLdapRequests().add(req);
}
else if(stage instanceof LocalRequest req) {
getLocalRequests().add(req);
}
else {
log.warn("unsupported session stage {}", stage);
if(stage instanceof LocalRequest req) {
return getLocalRequests().add(req);
}
log.warn("unsupported session stage {}", stage);
return false;
}

default void lock(){
Expand Down
Loading

0 comments on commit 89831d6

Please sign in to comment.