Skip to content

Commit

Permalink
Use explicit ExecutorService for Java scenario event listening
Browse files Browse the repository at this point in the history
Avoid exhausting the ForkJoin.commonPool() in constrained environments, which can cause deadlocks.

Signed-off-by: Mark S. Lewis <[email protected]>
  • Loading branch information
bestbeforetoday committed Dec 2, 2023
1 parent 633e073 commit 41aa93a
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions java/src/test/java/scenario/BasicEventListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,33 @@
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

import org.hyperledger.fabric.client.CloseableIterator;

public final class BasicEventListener<T> implements EventListener<T> {
private final BlockingQueue<T> eventQueue = new SynchronousQueue<>();
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final Runnable close;

public BasicEventListener(final CloseableIterator<T> iterator) {
close = iterator::close;

// Start reading events immediately as Java gRPC implementation may not invoke the gRPC service until the first
// read attempt occurs.
CompletableFuture.runAsync(() -> iterator.forEachRemaining(event -> {
try {
eventQueue.put(event);
} catch (InterruptedException e) {
iterator.close();
}
}));
CompletableFuture.runAsync(
() -> iterator.forEachRemaining(event -> {
try {
eventQueue.put(event);
} catch (InterruptedException e) {
iterator.close();
}
}),
executor
);
}

public T next() throws InterruptedException {
Expand All @@ -40,5 +46,6 @@ public T next() throws InterruptedException {

public void close() {
close.run();
executor.shutdownNow();
}
}

0 comments on commit 41aa93a

Please sign in to comment.