From b3adb465d3bb135d94cf1dfca8ce889e38cbbcd7 Mon Sep 17 00:00:00 2001 From: Guillaume Chau Date: Fri, 17 Feb 2023 01:22:44 +0100 Subject: [PATCH 1/2] fix: only handle specific message --- src/common.ts | 1 + src/index.ts | 1 + src/worker.ts | 1 + 3 files changed, 3 insertions(+) diff --git a/src/common.ts b/src/common.ts index fcf9c09..b4109db 100644 --- a/src/common.ts +++ b/src/common.ts @@ -1,6 +1,7 @@ import type { MessagePort } from 'worker_threads' export interface StartupMessage { + tinypoolStartupMessage: true filename: string | null name: string port: MessagePort diff --git a/src/index.ts b/src/index.ts index 2dc5b62..e0f84d2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -688,6 +688,7 @@ class ThreadPool { } const message: StartupMessage = { + tinypoolStartupMessage: true, filename: this.options.filename, name: this.options.name, port: port2, diff --git a/src/worker.ts b/src/worker.ts index ad8d747..c14073b 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -101,6 +101,7 @@ async function getHandler( parentPort!.on('message', (message: StartupMessage) => { useAtomics = process.env.PISCINA_DISABLE_ATOMICS === '1' ? false : message.useAtomics + if (!message?.tinypoolStartupMessage) return const { port, sharedBuffer, filename, name } = message ;(async function () { if (filename !== null) { From 82aa1142013b4eea97087ff0396a6c5b832234c0 Mon Sep 17 00:00:00 2001 From: Guillaume Chau Date: Fri, 17 Feb 2023 01:26:33 +0100 Subject: [PATCH 2/2] feat: broadcastMessage --- src/index.ts | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/index.ts b/src/index.ts index e0f84d2..3f03cc8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,6 +3,7 @@ import { MessageChannel, MessagePort, receiveMessageOnPort, + parentPort, } from 'worker_threads' import { once } from 'events' import EventEmitterAsyncResource from './EventEmitterAsyncResource' @@ -1025,6 +1026,12 @@ class Tinypool extends EventEmitterAsyncResource { return this.#pool.runTask(task, { transferList, filename, name, signal }) } + broadcastMessage(message: any) { + for (const workerInfo of this.#pool.workers) { + workerInfo.worker.postMessage(message) + } + } + destroy() { return this.#pool.destroy() } @@ -1103,6 +1110,14 @@ class Tinypool extends EventEmitterAsyncResource { } } +export function onBroadcastedMessage(handler: (message: any) => void) { + if (parentPort) { + parentPort.on('message', handler) + } else { + throw new Error('onBroadcastedMessage can only be used in worker threads') + } +} + const _workerId = process.__tinypool_state__?.workerId export * from './common'