diff --git a/src/main/kotlin/net/ccbluex/liquidbounce/utils/block/ChunkScanner.kt b/src/main/kotlin/net/ccbluex/liquidbounce/utils/block/ChunkScanner.kt index 3185032794b..9aae6092a5a 100644 --- a/src/main/kotlin/net/ccbluex/liquidbounce/utils/block/ChunkScanner.kt +++ b/src/main/kotlin/net/ccbluex/liquidbounce/utils/block/ChunkScanner.kt @@ -32,19 +32,17 @@ import net.minecraft.util.math.BlockPos import net.minecraft.util.math.ChunkPos import net.minecraft.world.chunk.WorldChunk import java.util.concurrent.CopyOnWriteArrayList -import kotlin.system.measureNanoTime object ChunkScanner : EventListener, MinecraftShortcuts { + init { + ChunkScannerThread + } + private val subscribers = CopyOnWriteArrayList() private val loadedChunks = LongOpenHashSet() - private fun clearAllChunks() { - subscribers.forEach(BlockChangeSubscriber::clearAllChunks) - loadedChunks.clear() - } - @Suppress("unused") private val chunkLoadHandler = handler { event -> val chunk = world.getChunk(event.x, event.z) @@ -79,12 +77,9 @@ object ChunkScanner : EventListener, MinecraftShortcuts { @Suppress("unused") private val worldChangeHandler = handler { - clearAllChunks() - } - - @Suppress("unused") - private val disconnectHandler = handler { - clearAllChunks() + ChunkScannerThread.cancelCurrentJobs() + subscribers.forEach(BlockChangeSubscriber::clearAllChunks) + loadedChunks.clear() } fun subscribe(newSubscriber: BlockChangeSubscriber) { @@ -128,38 +123,44 @@ object ChunkScanner : EventListener, MinecraftShortcuts { private val dispatcher = Dispatchers.Default .limitedParallelism((Runtime.getRuntime().availableProcessors() / 2).coerceAtLeast(2)) - private val scope = CoroutineScope(dispatcher + SupervisorJob()) + /** + * The parent job for the current client world. + * All children will be cancelled on [WorldChangeEvent]. + */ + private val worldJob = SupervisorJob() + + private val scope = CoroutineScope(dispatcher + worldJob) + + private val eventFlow = MutableSharedFlow() /** - * Shared cache for CoroutineScope + * Shared cache for [scope] */ private val mutable by ThreadLocal.withInitial(BlockPos::Mutable) - private val eventFlow = MutableSharedFlow() + /** + * A standalone [Job] to dispatch all [UpdateRequest] from [eventFlow] + */ + private val collectorJob = scope.launch(Job()) { + eventFlow.collect { chunkUpdate -> + // Discard current request when world is null + if (mc.world == null) { + delay(50L) + return@collect + } - init { - // cyclic job, used to process tasks concurrently - scope.launch { - eventFlow.collect { chunkUpdate -> + // Process the update request + launch { try { - // stop current when world is null - if (mc.world == null) { - delay(50L) - return@collect - } - - // process the update request - launch { - when (chunkUpdate) { - is UpdateRequest.ChunkUpdateRequest -> scanChunk(chunkUpdate) + when (chunkUpdate) { + is UpdateRequest.ChunkUpdateRequest -> scanChunk(chunkUpdate) - is UpdateRequest.ChunkUnloadRequest -> subscribers.forEach { - it.clearChunk(chunkUpdate.x, chunkUpdate.z) - } + is UpdateRequest.ChunkUnloadRequest -> subscribers.forEach { + it.clearChunk(chunkUpdate.x, chunkUpdate.z) + } - is UpdateRequest.BlockUpdateEvent -> subscribers.forEach { - it.recordBlock(chunkUpdate.blockPos, chunkUpdate.newState, cleared = false) - } + is UpdateRequest.BlockUpdateEvent -> subscribers.forEach { + it.recordBlock(chunkUpdate.blockPos, chunkUpdate.newState, cleared = false) } } } catch (e: Throwable) { @@ -175,6 +176,13 @@ object ChunkScanner : EventListener, MinecraftShortcuts { } } + /** + * Cancel all existing enqueue(emit) jobs and scanner jobs + */ + fun cancelCurrentJobs() { + worldJob.cancelChildren() + } + /** * Scans the chunks for a block */ @@ -189,12 +197,12 @@ object ChunkScanner : EventListener, MinecraftShortcuts { val currentSubscriber = request.singleSubscriber?.let { listOf(it) } ?: subscribers - if (currentSubscriber.size > 1) { - currentSubscriber.map { + when (currentSubscriber.size) { + 0 -> return + 1 -> currentSubscriber.first().chunkUpdate(chunk.pos.x, chunk.pos.z) + else -> currentSubscriber.map { scope.launch { it.chunkUpdate(chunk.pos.x, chunk.pos.z) } }.joinAll() - } else { - currentSubscriber.first().chunkUpdate(chunk.pos.x, chunk.pos.z) } // Contains all subscriber that want recordBlock called on a chunk update @@ -234,19 +242,17 @@ object ChunkScanner : EventListener, MinecraftShortcuts { fun stopThread() { scope.cancel() + collectorJob.cancel() logger.info("Stopped Chunk Scanner Thread!") } sealed interface UpdateRequest { - @JvmRecord - data class ChunkUpdateRequest(val chunk: WorldChunk, val singleSubscriber: BlockChangeSubscriber? = null) : + class ChunkUpdateRequest(val chunk: WorldChunk, val singleSubscriber: BlockChangeSubscriber? = null) : UpdateRequest - @JvmRecord - data class ChunkUnloadRequest(val x: Int, val z: Int) : UpdateRequest + class ChunkUnloadRequest(val x: Int, val z: Int) : UpdateRequest - @JvmRecord - data class BlockUpdateEvent(val blockPos: BlockPos, val newState: BlockState) : UpdateRequest + class BlockUpdateEvent(val blockPos: BlockPos, val newState: BlockState) : UpdateRequest } }