diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkExecutorCompletionService.java b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkExecutorCompletionService.java new file mode 100644 index 00000000000..01a6ef7357c --- /dev/null +++ b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkExecutorCompletionService.java @@ -0,0 +1,126 @@ +// Copyright 2023 The Terasology Foundation +// SPDX-License-Identifier: Apache-2.0 + +package org.terasology.engine.world.chunks.pipeline; + +import org.joml.Vector3i; +import org.joml.Vector3ic; +import org.terasology.engine.world.chunks.Chunk; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * A specialised alternative to {@link java.util.concurrent.ExecutorCompletionService}, + * used for submitting chunk tasks and queuing their results. + * + * Whilst this class adheres to the {@link CompletionService} interface, use of the class's + * {@link #submit(Callable, Vector3ic)} overload is preferred over those inherited from the interface. + */ +public class ChunkExecutorCompletionService implements CompletionService { + private static final Vector3ic EMPTY_VECTOR3I = new Vector3i(); + private final ThreadPoolExecutor threadPoolExecutor; + private final BlockingQueue> completionQueue; + + private final class ChunkFutureWithCompletion extends PositionFuture { + ChunkFutureWithCompletion(Callable callable, Vector3ic position) { + super(callable, position); + } + + ChunkFutureWithCompletion(Runnable runnable, Chunk result, Vector3ic position) { + super(runnable, result, position); + } + + @Override + protected void done() { + super.done(); + try { + completionQueue.put(this); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + public ChunkExecutorCompletionService(ThreadPoolExecutor threadPoolExecutor, BlockingQueue> completionQueue) { + this.threadPoolExecutor = threadPoolExecutor; + this.completionQueue = completionQueue; + } + + /** + * Submits a task to be executed. + * @param callable the task to submit + * + * @deprecated Use {@link #submit(Callable, Vector3ic)} instead + */ + @Override + @Deprecated + public Future submit(Callable callable) { + RunnableFuture task = new ChunkFutureWithCompletion(callable, EMPTY_VECTOR3I); + threadPoolExecutor.execute(task); + return task; + } + + /** + * Submits a chunk task to be executed. + * @param callable the chunk task to execute. + * @param position the position of the chunk. + * @return the submitted task. + */ + public Future submit(Callable callable, Vector3ic position) { + RunnableFuture task = new ChunkFutureWithCompletion(callable, position); + threadPoolExecutor.execute(task); + return task; + } + + /** + * Submits a task to be executed. + * @param runnable the task to run. + * @param value the value to return upon task completion. + * + * @deprecated Use {@link #submit(Callable, Vector3ic)} instead + */ + @Override + @Deprecated + public Future submit(Runnable runnable, Chunk value) { + RunnableFuture task = new ChunkFutureWithCompletion(runnable, value, EMPTY_VECTOR3I); + threadPoolExecutor.execute(task); + return task; + } + + /** + * Retrieves a completed task from the queue. + * @return a completed task. + * @throws InterruptedException if interrupted whilst waiting on the queue. + */ + @Override + public Future take() throws InterruptedException { + return completionQueue.take(); + } + + /** + * Retrieves a completed task from the queue if not empty. + * @return a completed task, or null if there are no tasks in the queue. + */ + @Override + public Future poll() { + return completionQueue.poll(); + } + + /** + * Retrieves a completed task from the queue if not empty. + * @param l the timeout duration before returning null. + * @param timeUnit the time units of the timeout duration. + * + * @return a completed task, or null if there are no tasks in the queue. + */ + @Override + public Future poll(long l, TimeUnit timeUnit) throws InterruptedException { + return completionQueue.poll(l, timeUnit); + } +} diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipeline.java b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipeline.java index 5323ec245c5..15fce52938f 100644 --- a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipeline.java +++ b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipeline.java @@ -13,7 +13,6 @@ import org.slf4j.LoggerFactory; import org.terasology.engine.monitoring.ThreadActivity; import org.terasology.engine.monitoring.ThreadMonitor; -import org.terasology.engine.utilities.ReflectionUtil; import org.terasology.engine.world.chunks.Chunk; import org.terasology.engine.world.chunks.pipeline.stages.ChunkTask; import org.terasology.engine.world.chunks.pipeline.stages.ChunkTaskProvider; @@ -21,14 +20,10 @@ import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -50,7 +45,7 @@ public class ChunkProcessingPipeline { private final List stages = Lists.newArrayList(); private final Thread reactor; - private final CompletionService chunkProcessor; + private final ChunkExecutorCompletionService chunkProcessor; private final ThreadPoolExecutor executor; private final Function chunkProvider; private final Map chunkProcessingInfoMap = Maps.newConcurrentMap(); @@ -66,17 +61,11 @@ public ChunkProcessingPipeline(Function chunkProvider, Compara NUM_TASK_THREADS, NUM_TASK_THREADS, 0L, TimeUnit.MILLISECONDS, - new PriorityBlockingQueue(800, unwrappingComporator(comparable)), + new PriorityBlockingQueue(800, comparable), this::threadFactory, - this::rejectQueueHandler) { - @Override - protected RunnableFuture newTaskFor(Callable callable) { - RunnableFuture newTaskFor = super.newTaskFor(callable); - return new PositionFuture<>(newTaskFor, ((PositionalCallable) callable).getPosition()); - } - }; + this::rejectQueueHandler); logger.debug("allocated {} threads", NUM_TASK_THREADS); - chunkProcessor = new ExecutorCompletionService<>(executor, + chunkProcessor = new ChunkExecutorCompletionService(executor, new PriorityBlockingQueue<>(800, comparable)); reactor = new Thread(this::chunkTaskHandler); reactor.setDaemon(true); @@ -84,18 +73,6 @@ protected RunnableFuture newTaskFor(Callable callable) { reactor.start(); } - /** - * BlackMagic method: {@link ExecutorCompletionService} wraps task with QueueingFuture (private access) - * there takes wrapped task for comparing in {@link ThreadPoolExecutor} - */ - private Comparator unwrappingComporator(Comparator> comparable) { - return (o1, o2) -> { - Object unwrapped1 = ReflectionUtil.readField(o1, "task"); - Object unwrapped2 = ReflectionUtil.readField(o2, "task"); - return comparable.compare((Future) unwrapped1, (Future) unwrapped2); - }; - } - /** * Reactor thread. Handles all ChunkTask dependency logic and running. */ @@ -190,11 +167,11 @@ private Chunk getChunkBy(ChunkTaskProvider requiredStage, Vector3ic position) { } private Future runTask(ChunkTask task, List chunks) { - return chunkProcessor.submit(new PositionalCallable(() -> { + return chunkProcessor.submit(() -> { try (ThreadActivity ignored = ThreadMonitor.startThreadActivity(task.getName())) { return task.apply(chunks); } - }, task.getPosition())); + }, task.getPosition()); } private Thread threadFactory(Runnable runnable) { @@ -236,8 +213,7 @@ public ListenableFuture invokeGeneratorTask(Vector3ic position, Supplier< SettableFuture exitFuture = SettableFuture.create(); chunkProcessingInfo = new ChunkProcessingInfo(position, exitFuture); chunkProcessingInfoMap.put(position, chunkProcessingInfo); - chunkProcessingInfo.setCurrentFuture(chunkProcessor.submit(new PositionalCallable(generatorTask::get, - position))); + chunkProcessingInfo.setCurrentFuture(chunkProcessor.submit(generatorTask::get, position)); return exitFuture; } } @@ -317,26 +293,4 @@ public boolean isPositionProcessing(Vector3ic pos) { public Iterable getProcessingPosition() { return chunkProcessingInfoMap.keySet(); } - - /** - * Dummy callable for passthru position for {@link java.util.concurrent.ThreadPoolExecutor}#newTaskFor - */ - private static final class PositionalCallable implements Callable { - private final Callable callable; - private final Vector3ic position; - - private PositionalCallable(Callable callable, Vector3ic position) { - this.callable = callable; - this.position = position; - } - - public Vector3ic getPosition() { - return position; - } - - @Override - public Chunk call() throws Exception { - return callable.call(); - } - } } diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/PositionFuture.java b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/PositionFuture.java index 5ce0e3452a4..51791bc741c 100644 --- a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/PositionFuture.java +++ b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/PositionFuture.java @@ -5,53 +5,23 @@ import org.joml.Vector3ic; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.RunnableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; -public class PositionFuture implements RunnableFuture { - - private final RunnableFuture delegate; +public class PositionFuture extends FutureTask { private final Vector3ic position; - public PositionFuture(RunnableFuture delegate, Vector3ic position) { - this.delegate = delegate; + public PositionFuture(Callable callable, Vector3ic position) { + super(callable); this.position = position; } - public Vector3ic getPosition() { - return position; - } - - @Override - public void run() { - delegate.run(); - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return delegate.cancel(mayInterruptIfRunning); - } - - @Override - public boolean isCancelled() { - return delegate.isCancelled(); - } - - @Override - public boolean isDone() { - return delegate.isDone(); - } - - @Override - public T get() throws InterruptedException, ExecutionException { - return delegate.get(); + public PositionFuture(Runnable runnable, T result, Vector3ic position) { + super(runnable, result); + this.position = position; } - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, - TimeoutException { - return delegate.get(timeout, unit); + public Vector3ic getPosition() { + return position; } }