-
Notifications
You must be signed in to change notification settings - Fork 13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: worker pooling #962
base: main
Are you sure you want to change the base?
feat: worker pooling #962
Changes from 5 commits
cfa9b01
3d77943
7e4462e
9be7ee4
ad8e667
7838471
6487117
94e9926
d57b4ea
2c4e0e2
4389e57
4f120b4
b548328
ebf7188
49c5d1f
577ed2a
348c359
16a91dc
4457e63
10f46df
c9f7284
74f97b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,12 @@ | |
// SPDX-License-Identifier: MPL-2.0 | ||
|
||
import { getLogger } from "../../../log.ts"; | ||
import { | ||
createSimpleWaitQueue, | ||
PoolConfig, | ||
WaitQueue, | ||
WaitQueueWithTimeout, | ||
} from "./pooling.ts"; | ||
import { BaseMessage, EventHandler, TaskId } from "./types.ts"; | ||
|
||
const logger = getLogger(import.meta, "WARN"); | ||
|
@@ -17,25 +23,53 @@ export abstract class BaseWorker<M extends BaseMessage, E extends BaseMessage> { | |
abstract get id(): TaskId; | ||
} | ||
|
||
type DeallocateOptions = { | ||
destroy?: boolean; | ||
/// defaults to `true` | ||
/// recreate workers to replace destroyed ones if `.destroy` is `true`. | ||
/// Set to `false` for deinit. | ||
ensureMinWorkers?: boolean; | ||
}; | ||
|
||
export class BaseWorkerManager< | ||
T, | ||
M extends BaseMessage, | ||
E extends BaseMessage, | ||
> { | ||
#name: string; | ||
#activeTasks: Map<TaskId, { | ||
worker: BaseWorker<M, E>; | ||
taskSpec: T; | ||
}> = new Map(); | ||
#tasksByName: Map<string, Set<TaskId>> = new Map(); | ||
#startedAt: Map<TaskId, Date> = new Map(); | ||
#poolConfig: PoolConfig; | ||
// TODO auto-remove idle workers after a certain time | ||
#idleWorkers: BaseWorker<M, E>[] = []; | ||
#waitQueue: WaitQueue<BaseWorker<M, E>>; | ||
#nextWorkerId = 1; | ||
|
||
#workerFactory: () => BaseWorker<M, E>; | ||
|
||
#workerFactory: (taskId: TaskId) => BaseWorker<M, E>; | ||
protected constructor(workerFactory: (taskId: TaskId) => BaseWorker<M, E>) { | ||
this.#workerFactory = workerFactory; | ||
get #workerCount() { | ||
return this.#idleWorkers.length + this.#activeTasks.size; | ||
} | ||
|
||
get workerFactory() { | ||
return this.#workerFactory; | ||
protected constructor( | ||
name: string, | ||
workerFactory: (taskId: TaskId) => BaseWorker<M, E>, | ||
config: PoolConfig = {}, | ||
) { | ||
this.#name = name; | ||
this.#workerFactory = () => | ||
workerFactory(`${this.#name} worker #${this.#nextWorkerId++}`); | ||
this.#poolConfig = config; | ||
|
||
if (config.waitTimeoutMs == null) { // no timeout | ||
this.#waitQueue = createSimpleWaitQueue(); | ||
} else { | ||
this.#waitQueue = new WaitQueueWithTimeout(config.waitTimeoutMs ?? 30000); | ||
} | ||
} | ||
|
||
protected getActiveTaskNames() { | ||
|
@@ -68,49 +102,85 @@ export class BaseWorkerManager< | |
return startedAt; | ||
} | ||
|
||
// allocate worker? | ||
protected createWorker(name: string, taskId: TaskId, taskSpec: T) { | ||
const worker = this.#workerFactory(taskId); | ||
// TODO inline | ||
this.addWorker(name, taskId, worker, taskSpec, new Date()); | ||
#nextWorker() { | ||
const idleWorker = this.#idleWorkers.shift(); | ||
if (idleWorker) { | ||
return Promise.resolve(idleWorker); | ||
} | ||
if ( | ||
this.#poolConfig.maxWorkers == null || | ||
this.#activeTasks.size < this.#poolConfig.maxWorkers | ||
) { | ||
return Promise.resolve(this.#workerFactory()); | ||
} | ||
return this.#waitForWorker(); | ||
} | ||
|
||
protected addWorker( | ||
#waitForWorker() { | ||
return new Promise<BaseWorker<M, E>>((resolve, reject) => { | ||
this.#waitQueue.push( | ||
(worker) => resolve(worker), | ||
() => | ||
reject( | ||
new Error("timeout while waiting for a worker to be available"), | ||
), | ||
); | ||
}); | ||
} | ||
|
||
protected async delegateTask( | ||
name: string, | ||
taskId: TaskId, | ||
worker: BaseWorker<M, E>, | ||
taskSpec: T, | ||
startedAt: Date, | ||
) { | ||
): Promise<void> { | ||
const worker = await this.#nextWorker(); | ||
|
||
if (!this.#tasksByName.has(name)) { | ||
this.#tasksByName.set(name, new Set()); | ||
} | ||
|
||
this.#tasksByName.get(name)!.add(taskId); | ||
this.#activeTasks.set(taskId, { worker, taskSpec }); | ||
if (!this.#startedAt.has(taskId)) { | ||
this.#startedAt.set(taskId, startedAt); | ||
this.#startedAt.set(taskId, new Date()); | ||
} | ||
} | ||
|
||
protected destroyAllWorkers() { | ||
for (const name of this.getActiveTaskNames()) { | ||
this.destroyWorkersByName(name); | ||
protected deallocateAllWorkers(options: DeallocateOptions = {}) { | ||
const activeTaskNames = this.getActiveTaskNames(); | ||
if (activeTaskNames.length > 0) { | ||
if (options.destroy) { | ||
logger.warn( | ||
`destroying workers for tasks ${ | ||
activeTaskNames.map((w) => `"${w}"`).join(", ") | ||
}`, | ||
); | ||
} | ||
for (const name of activeTaskNames) { | ||
this.deallocateWorkersByName(name, options); | ||
} | ||
} | ||
} | ||
|
||
protected destroyWorkersByName(name: string) { | ||
protected deallocateWorkersByName( | ||
name: string, | ||
options: DeallocateOptions = {}, | ||
) { | ||
const taskIds = this.#tasksByName.get(name); | ||
if (taskIds) { | ||
for (const taskId of taskIds) { | ||
this.destroyWorker(name, taskId); | ||
this.deallocateWorker(name, taskId, options); | ||
} | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
protected destroyWorker(name: string, taskId: TaskId) { | ||
deallocateWorker( | ||
name: string, | ||
taskId: TaskId, | ||
{ destroy = false, ensureMinWorkers = true }: DeallocateOptions = {}, | ||
) { | ||
const task = this.#activeTasks.get(taskId); | ||
if (this.#tasksByName.has(name)) { | ||
if (!task) { | ||
|
@@ -120,14 +190,41 @@ export class BaseWorkerManager< | |
return false; | ||
} | ||
|
||
task.worker.destroy(); | ||
this.#activeTasks.delete(taskId); | ||
this.#tasksByName.get(name)!.delete(taskId); | ||
// startedAt records are not deleted | ||
|
||
if (destroy) { | ||
task.worker.destroy(); | ||
|
||
const taskAdded = this.#waitQueue.shift(() => this.#workerFactory()); | ||
if (!taskAdded) { // no task from the queue | ||
if (ensureMinWorkers) { | ||
const { minWorkers } = this.#poolConfig; | ||
if (minWorkers != null && this.#workerCount < minWorkers) { | ||
this.#idleWorkers.push(this.#workerFactory()); | ||
} | ||
} | ||
} | ||
} else { | ||
const taskAdded = this.#waitQueue.shift(() => task.worker); | ||
if (!taskAdded) { // worker has not been reassigned | ||
const { maxWorkers } = this.#poolConfig; | ||
// how?? xD | ||
// We might add "urgent" tasks in the future; | ||
// in this case the worker count might exceed `maxWorkers`. | ||
if (maxWorkers != null && this.#workerCount >= maxWorkers) { | ||
task.worker.destroy(); | ||
} else { | ||
this.#idleWorkers.push(task.worker); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel like we should never exceed max but instead ensure it is big enough at start. Not sure how that would fit here but a design idea is to allocate as much as needed with a small room for newer tasks: allocate |
||
} | ||
} | ||
} | ||
|
||
return true; | ||
} | ||
|
||
logger.warn(`Task with name "${name}" does not exist`); | ||
return false; | ||
} | ||
|
||
|
@@ -140,6 +237,22 @@ export class BaseWorkerManager< | |
worker.send(msg); | ||
this.logMessage(taskId, msg); | ||
} | ||
|
||
deinit() { | ||
this.deallocateAllWorkers({ destroy: true, ensureMinWorkers: false }); | ||
if (this.#idleWorkers.length > 0) { | ||
logger.warn( | ||
`destroying idle workers: ${ | ||
this.#idleWorkers.map((w) => `"${w.id}"`).join(", ") | ||
}`, | ||
); | ||
for (const worker of this.#idleWorkers) { | ||
worker.destroy(); | ||
} | ||
this.#idleWorkers = []; | ||
} | ||
return Promise.resolve(); | ||
} | ||
} | ||
|
||
export function createTaskId(name: string) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. | ||
// SPDX-License-Identifier: MPL-2.0 | ||
|
||
export type PoolConfig = { | ||
maxWorkers?: number | null; | ||
minWorkers?: number | null; | ||
waitTimeoutMs?: number | null; | ||
}; | ||
|
||
export type Consumer<T> = (x: T) => void; | ||
|
||
export interface WaitQueue<W> { | ||
push(consumer: Consumer<W>, onCancel: () => void): void; | ||
shift(produce: () => W): boolean; | ||
} | ||
|
||
export function createSimpleWaitQueue<W>(): WaitQueue<W> { | ||
const queue: Array<Consumer<W>> = []; | ||
return { | ||
push(consumer, _onCancel) { | ||
queue.push(consumer); | ||
}, | ||
shift(produce) { | ||
const consumer = queue.shift(); | ||
if (consumer) { | ||
consumer(produce()); | ||
return true; | ||
} | ||
return false; | ||
}, | ||
}; | ||
} | ||
|
||
export class WaitQueueWithTimeout<W> implements WaitQueue<W> { | ||
#queue: Array<{ | ||
consumer: Consumer<W>; | ||
cancellationHandler: () => void; | ||
addedAt: number; // timestamp | ||
}> = []; | ||
Comment on lines
+46
to
+48
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addedAt + Millis/Ms? |
||
#timerId: number | null = null; | ||
#waitTimeoutMs: number; | ||
|
||
constructor(timeoutMs: number) { | ||
this.#waitTimeoutMs = timeoutMs; | ||
} | ||
|
||
push(consumer: Consumer<W>, onCancel: () => void) { | ||
this.#queue.push({ | ||
consumer, | ||
cancellationHandler: onCancel, | ||
addedAt: Date.now(), | ||
}); | ||
if (this.#timerId == null) { | ||
if (this.#queue.length !== 1) { | ||
throw new Error("unreachable: inconsistent state: no active timer"); | ||
} | ||
this.#updateTimer(); | ||
} | ||
} | ||
|
||
shift(produce: () => W) { | ||
const entry = this.#queue.shift(); | ||
if (entry) { | ||
entry.consumer(produce()); | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
#timeoutHandler() { | ||
this.#cancelNextEntry(); | ||
this.#updateTimer(); | ||
} | ||
|
||
#updateTimer() { | ||
if (this.#queue.length > 0) { | ||
const timeoutMs = this.#queue[0].addedAt + this.#waitTimeoutMs - | ||
Date.now(); | ||
if (timeoutMs <= 0) { | ||
this.#cancelNextEntry(); | ||
this.#updateTimer(); | ||
return; | ||
} | ||
Comment on lines
+88
to
+92
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't this be an infinite loop if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. That is the goal. It always checks the front of the queue. If it has not changed, we are sure that |
||
this.#timerId = setTimeout( | ||
this.#timeoutHandler.bind(this), | ||
timeoutMs, | ||
); | ||
} else { | ||
this.#timerId = null; | ||
} | ||
} | ||
|
||
#cancelNextEntry() { | ||
this.#queue.shift()!.cancellationHandler(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider awaiting worker manager deinitialization.
Not awaiting
workerManager.deinit()
could lead to resource leaks or race conditions during shutdown. Consider maintaining the async await to ensure proper cleanup.Apply this diff to ensure proper resource cleanup: