Skip to content

Commit

Permalink
Merge pull request #44341 from mkouba/scheduler-offloading-invoker-fix
Browse files Browse the repository at this point in the history
Scheduler: fix OffLoadingInvoker
  • Loading branch information
manovotn authored Nov 6, 2024
2 parents 4bb81a2 + 81c9a0f commit c0efe5b
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,7 @@ public CompletionStage<Void> invoke(ScheduledExecution execution) throws Excepti
executor.schedule(new Runnable() {
@Override
public void run() {
try {
delegate.invoke(execution);
ret.complete(null);
} catch (Exception e) {
ret.completeExceptionally(e);
}
invokeComplete(ret, execution);
}
}, delay, TimeUnit.MILLISECONDS);
return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;

import io.quarkus.scheduler.ScheduledExecution;

Expand Down Expand Up @@ -30,4 +31,17 @@ protected CompletionStage<Void> invokeDelegate(ScheduledExecution execution) {
return CompletableFuture.failedStage(e);
}
}

protected void invokeComplete(CompletableFuture<Void> ret, ScheduledExecution execution) {
invokeDelegate(execution).whenComplete(new BiConsumer<>() {
@Override
public void accept(Void r, Throwable t) {
if (t != null) {
ret.completeExceptionally(t);
} else {
ret.complete(null);
}
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.scheduler.common.runtime;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import io.quarkus.scheduler.ScheduledExecution;
Expand Down Expand Up @@ -28,6 +29,7 @@ public OffloadingInvoker(ScheduledInvoker delegate, Vertx vertx) {

@Override
public CompletionStage<Void> invoke(ScheduledExecution execution) throws Exception {
CompletableFuture<Void> ret = new CompletableFuture<>();
Context context = VertxContext.getOrCreateDuplicatedContext(vertx);
VertxContextSafetyToggle.setContextSafe(context, true);
if (delegate.isBlocking()) {
Expand All @@ -40,7 +42,7 @@ public void handle(Void event) {
VirtualThreadsRecorder.getCurrent().execute(new Runnable() {
@Override
public void run() {
doInvoke(execution);
invokeComplete(ret, execution);
}
});
}
Expand All @@ -49,7 +51,7 @@ public void run() {
context.executeBlocking(new Callable<Void>() {
@Override
public Void call() {
doInvoke(execution);
invokeComplete(ret, execution);
return null;
}
}, false);
Expand All @@ -58,19 +60,11 @@ public Void call() {
context.runOnContext(new Handler<Void>() {
@Override
public void handle(Void event) {
doInvoke(execution);
invokeComplete(ret, execution);
}
});
}
return null;
}

void doInvoke(ScheduledExecution execution) {
try {
delegate.invoke(execution);
} catch (Throwable t) {
// already logged by the StatusEmitterInvoker
}
return ret;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public interface ScheduledInvoker {

/**
* @param execution
* @return the result
* @return the result, never {@code null}
* @throws Exception
*/
CompletionStage<Void> invoke(ScheduledExecution execution) throws Exception;
Expand Down

0 comments on commit c0efe5b

Please sign in to comment.