Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for SharedWorker #458

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 54 additions & 13 deletions src/master/implementation.browser.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// tslint:disable max-classes-per-file

import { ImplementationExport, ThreadsWorkerOptions } from "../types/master"
import { ImplementationExport, ThreadsWorkerOptions, TransferList } from "../types/master"
import { getBundleURL } from "./get-bundle-url.browser"

export const defaultPoolSize = typeof navigator !== "undefined" && navigator.hardwareConcurrency
Expand All @@ -18,16 +18,6 @@ function createSourceBlobURL(code: string): string {
}

function selectWorkerImplementation(): ImplementationExport {
if (typeof Worker === "undefined") {
// Might happen on Safari, for instance
// The idea is to only fail if the constructor is actually used
return class NoWebWorker {
constructor() {
throw Error("No web worker implementation available. You might have tried to spawn a worker within a worker in a browser that doesn't support workers in workers.")
}
} as any
}

class WebWorker extends Worker {
constructor(url: string | URL, options?: ThreadsWorkerOptions) {
if (typeof url === "string" && options && options._baseURL) {
Expand All @@ -49,6 +39,20 @@ function selectWorkerImplementation(): ImplementationExport {
}
}

function getWorkerImpl() {
if (typeof Worker === "undefined") {
// Might happen on Safari, for instance
// The idea is to only fail if the constructor is actually used
return class NoWebWorker {
constructor() {
throw Error("No web worker implementation available. You might have tried to spawn a worker within a worker in a browser that doesn't support workers in workers.")
}
} as any
}

return WebWorker
}

class BlobWorker extends WebWorker {
constructor(blob: Blob, options?: ThreadsWorkerOptions) {
const url = window.URL.createObjectURL(blob)
Expand All @@ -61,9 +65,46 @@ function selectWorkerImplementation(): ImplementationExport {
}
}

class SharedWebWorker extends SharedWorker {
constructor(url: string | URL, options?: ThreadsWorkerOptions) {
if (typeof url === "string" && options && options._baseURL) {
url = new URL(url, options._baseURL)
} else if (typeof url === "string" && !isAbsoluteURL(url) && getBundleURL().match(/^file:\/\//i)) {
url = new URL(url, getBundleURL().replace(/\/[^\/]+$/, "/"))
if (options?.CORSWorkaround ?? true) {
url = createSourceBlobURL(`importScripts(${JSON.stringify(url)});`)
}
}
if (typeof url === "string" && isAbsoluteURL(url)) {
// Create source code blob loading JS file via `importScripts()`
// to circumvent worker CORS restrictions
if (options?.CORSWorkaround ?? true) {
url = createSourceBlobURL(`importScripts(${JSON.stringify(url)});`)
}
}
super(url.toString(), options)
this.port.start()
}
}

function getSharedWorkerImpl() {
if (typeof SharedWorker === "undefined") {
// Might happen on Safari, for instance
// The idea is to only fail if the constructor is actually used
return class NoSharedWebWorker {
constructor() {
throw Error("No shared web worker implementation available. Maybe your browser doesn't support shared web workers.")
}
} as any
}

return SharedWebWorker
}

return {
blob: BlobWorker,
default: WebWorker
default: getWorkerImpl(),
shared: getSharedWorkerImpl()
}
}

Expand All @@ -78,5 +119,5 @@ export function getWorkerImplementation(): ImplementationExport {

export function isWorkerRuntime() {
const isWindowContext = typeof self !== "undefined" && typeof Window !== "undefined" && self instanceof Window
return typeof self !== "undefined" && self.postMessage && !isWindowContext ? true : false
return typeof self !== "undefined" && typeof self.postMessage === "function" && !isWindowContext ? true : false
}
14 changes: 11 additions & 3 deletions src/master/implementation.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ function rebaseScriptPath(scriptPath: string, ignoreRegex: RegExp) {
return rebasedScriptPath
}

class NoSharedWebWorker {
constructor() {
throw Error("Shared workers are not supported in node.")
}
}

function resolveScriptPath(scriptPath: string, baseURL?: string | undefined) {
const makeRelative = (filePath: string) => {
// eval() hack is also webpack-related
Expand Down Expand Up @@ -164,7 +170,8 @@ function initWorkerThreadsWorker(): ImplementationExport {

return {
blob: BlobWorker as any,
default: Worker as any
default: Worker as any,
shared: NoSharedWebWorker as any
}
}

Expand Down Expand Up @@ -245,7 +252,8 @@ function initTinyWorker(): ImplementationExport {

return {
blob: BlobWorker as any,
default: Worker as any
default: Worker as any,
shared: NoSharedWebWorker as any
}
}

Expand Down Expand Up @@ -273,7 +281,7 @@ export function getWorkerImplementation(): ImplementationExport {

export function isWorkerRuntime() {
if (isTinyWorker) {
return typeof self !== "undefined" && self.postMessage ? true : false
return typeof self !== "undefined" && typeof self.postMessage === "function" ? true : false
} else {
// Webpack hack
const isMainThread = typeof __non_webpack_require__ === "function"
Expand Down
2 changes: 1 addition & 1 deletion src/master/implementation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import * as BrowserImplementation from "./implementation.browser"
import * as NodeImplementation from "./implementation.node"

const runningInNode = typeof process !== 'undefined' && process.arch !== 'browser' && 'pid' in process
const runningInNode = typeof process !== 'undefined' && 'pid' in process
const implementation = runningInNode ? NodeImplementation : BrowserImplementation

/** Default size of pools. Depending on the platform the value might vary from device to device. */
Expand Down
6 changes: 5 additions & 1 deletion src/master/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// tslint:disable no-duplicate-imports
import type { BlobWorker as BlobWorkerClass } from "../types/master"
import { Worker as WorkerType } from "../types/master"
import { Worker as WorkerType, SharedWorker as SharedWorkerType } from "../types/master"
import { getWorkerImplementation, isWorkerRuntime } from "./implementation"

export { FunctionThread, ModuleThread } from "../types/master"
Expand All @@ -11,9 +11,13 @@ export { isWorkerRuntime }

export type BlobWorker = typeof BlobWorkerClass
export type Worker = WorkerType
export type SharedWorker = SharedWorkerType

/** Separate class to spawn workers from source code blobs or strings. */
export const BlobWorker = getWorkerImplementation().blob

/** Worker implementation. Either web worker or a node.js Worker class. */
export const Worker = getWorkerImplementation().default

/** SharedWorker implementation. Can only be a shared web worker class. */
export const SharedWorker = getWorkerImplementation().shared
16 changes: 10 additions & 6 deletions src/master/invocation-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import {
ModuleMethods,
ModuleProxy,
ProxyableFunction,
Worker as WorkerType
Worker as WorkerType,
SharedWorker as SharedWorkerType
} from "../types/master"
import {
MasterJobCancelMessage,
Expand All @@ -26,6 +27,7 @@ import {
WorkerMessageType
} from "../types/messages"

type TWorker = WorkerType | SharedWorkerType
const debugMessages = DebugLogger("threads:master:messages")

let nextJobUID = 1
Expand All @@ -36,7 +38,7 @@ const isJobErrorMessage = (data: any): data is WorkerJobErrorMessage => data &&
const isJobResultMessage = (data: any): data is WorkerJobResultMessage => data && data.type === WorkerMessageType.result
const isJobStartMessage = (data: any): data is WorkerJobStartMessage => data && data.type === WorkerMessageType.running

function createObservableForJob<ResultType>(worker: WorkerType, jobUID: number): Observable<ResultType> {
function createObservableForJob<ResultType>(worker: WorkerType | SharedWorkerType, jobUID: number): Observable<ResultType> {
return new Observable(observer => {
let asyncType: "observable" | "promise" | undefined

Expand Down Expand Up @@ -81,7 +83,8 @@ function createObservableForJob<ResultType>(worker: WorkerType, jobUID: number):
type: MasterMessageType.cancel,
uid: jobUID
}
worker.postMessage(cancelMessage)
if('port' in worker) worker.port.postMessage(cancelMessage);
else worker.postMessage(cancelMessage)
}
worker.removeEventListener("message", messageHandler)
}
Expand Down Expand Up @@ -115,7 +118,7 @@ function prepareArguments(rawArgs: any[]): { args: any[], transferables: Transfe
}
}

export function createProxyFunction<Args extends any[], ReturnType>(worker: WorkerType, method?: string) {
export function createProxyFunction<Args extends any[], ReturnType>(worker: TWorker, method?: string) {
return ((...rawArgs: Args) => {
const uid = nextJobUID++
const { args, transferables } = prepareArguments(rawArgs)
Expand All @@ -129,7 +132,8 @@ export function createProxyFunction<Args extends any[], ReturnType>(worker: Work
debugMessages("Sending command to run function to worker:", runMessage)

try {
worker.postMessage(runMessage, transferables)
if('port' in worker) worker.port.postMessage(runMessage, transferables)
else worker.postMessage(runMessage, transferables)
} catch (error) {
return ObservablePromise.from(Promise.reject(error))
}
Expand All @@ -139,7 +143,7 @@ export function createProxyFunction<Args extends any[], ReturnType>(worker: Work
}

export function createProxyModule<Methods extends ModuleMethods>(
worker: WorkerType,
worker: TWorker,
methodNames: string[]
): ModuleProxy<Methods> {
const proxy: any = {}
Expand Down
8 changes: 5 additions & 3 deletions src/master/register.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { Worker as WorkerImplementation } from "./index"
import { Worker as WorkerImplementation, SharedWorker as SharedWorkerImplementation } from "./index"

declare const window: any

if (typeof global !== "undefined") {
(global as any).Worker = WorkerImplementation
(global as any).Worker = WorkerImplementation;
(global as any).SharedWorker = SharedWorkerImplementation;
} else if (typeof window !== "undefined") {
(window as any).Worker = WorkerImplementation
(window as any).Worker = WorkerImplementation;
(window as any).SharedWorker = SharedWorkerImplementation;
}
20 changes: 13 additions & 7 deletions src/master/spawn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
PrivateThreadProps,
StripAsync,
Worker as WorkerType,
SharedWorker as SharedWorkerType,
WorkerEvent,
WorkerEventType,
WorkerInternalErrorEvent,
Expand All @@ -19,6 +20,7 @@ import { WorkerInitMessage, WorkerUncaughtErrorMessage } from "../types/messages
import { WorkerFunction, WorkerModule } from "../types/worker"
import { createProxyFunction, createProxyModule } from "./invocation-proxy"

type TWorker = WorkerType | SharedWorkerType
type ArbitraryWorkerInterface = WorkerFunction & WorkerModule<string> & { somekeythatisneverusedinproductioncode123: "magicmarker123" }
type ArbitraryThreadType = FunctionThread<any, any> & ModuleThread<any>

Expand Down Expand Up @@ -58,7 +60,7 @@ async function withTimeout<T>(promise: Promise<T>, timeoutInMs: number, errorMes
return result
}

function receiveInitMessage(worker: WorkerType): Promise<WorkerInitMessage> {
function receiveInitMessage(worker: TWorker): Promise<WorkerInitMessage> {
return new Promise((resolve, reject) => {
const messageHandler = ((event: MessageEvent) => {
debugMessages("Message from worker before finishing initialization:", event.data)
Expand All @@ -74,7 +76,7 @@ function receiveInitMessage(worker: WorkerType): Promise<WorkerInitMessage> {
})
}

function createEventObservable(worker: WorkerType, workerTermination: Promise<any>): Observable<WorkerEvent> {
function createEventObservable(worker: TWorker, workerTermination: Promise<any>): Observable<WorkerEvent> {
return new Observable<WorkerEvent>(observer => {
const messageHandler = ((messageEvent: MessageEvent) => {
const workerEvent: WorkerMessageEvent<any> = {
Expand Down Expand Up @@ -106,23 +108,27 @@ function createEventObservable(worker: WorkerType, workerTermination: Promise<an
})
}

function createTerminator(worker: WorkerType): { termination: Promise<void>, terminate: () => Promise<void> } {
function createTerminator(worker: TWorker): { termination: Promise<void>, terminate: () => Promise<void> } {
const [termination, resolver] = createPromiseWithResolver<void>()
const terminate = async () => {
debugThreadUtils("Terminating worker")
// Newer versions of worker_threads workers return a promise
await worker.terminate()
if('port' in worker) {
// TODO: send termination message to shared worker.
worker.port.close()
}
else await worker.terminate()
resolver()
}
return { terminate, termination }
}

function setPrivateThreadProps<T>(raw: T, worker: WorkerType, workerEvents: Observable<WorkerEvent>, terminate: () => Promise<void>): T & PrivateThreadProps {
function setPrivateThreadProps<T extends object>(raw: T, worker: TWorker, workerEvents: Observable<WorkerEvent>, terminate: () => Promise<void>): T & PrivateThreadProps {
const workerErrors = workerEvents
.filter(event => event.type === WorkerEventType.internalError)
.map(errorEvent => (errorEvent as WorkerInternalErrorEvent).error)

// tslint:disable-next-line prefer-object-spread
// tslint:disable-next-line:prefer-object-spread
return Object.assign(raw, {
[$errors]: workerErrors,
[$events]: workerEvents,
Expand All @@ -141,7 +147,7 @@ function setPrivateThreadProps<T>(raw: T, worker: WorkerType, workerEvents: Obse
* @param [options.timeout] Init message timeout. Default: 10000 or set by environment variable.
*/
export async function spawn<Exposed extends WorkerFunction | WorkerModule<any> = ArbitraryWorkerInterface>(
worker: WorkerType,
worker: TWorker,
options?: { timeout?: number }
): Promise<ExposedToThreadType<Exposed>> {
debugSpawn("Initializing new thread")
Expand Down
4 changes: 3 additions & 1 deletion src/observable-promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ export class ObservablePromise<T> extends Observable<T> implements Promise<T> {
private rejection: Error | undefined
private state: "fulfilled" | "pending" | "rejected" = "pending"

public readonly [Symbol.toStringTag]: "[object ObservablePromise]"
get [Symbol.toStringTag]() {
return "[object ObservablePromise]";
}

constructor(init: Initializer<T>) {
super((originalObserver: SubscriptionObserver<T>) => {
Expand Down
14 changes: 13 additions & 1 deletion src/types/master.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export interface PrivateThreadProps {
[$errors]: Observable<Error>
[$events]: Observable<WorkerEvent>
[$terminate]: () => Promise<void>
[$worker]: Worker
[$worker]: Worker | SharedWorker
}

export type FunctionThread<Args extends any[] = any[], ReturnType = any> = ProxyableFunction<Args, ReturnType> & PrivateThreadProps
Expand Down Expand Up @@ -79,6 +79,12 @@ export interface Worker extends EventTarget {
/** In nodejs 10+ return type is Promise while with tiny-worker and in browser return type is void */
terminate(callback?: (error?: Error, exitCode?: number) => void): void | Promise<number>
}

/** SharedWorker instance. Either a web worker or a node.js Worker provided by `worker_threads` or `tiny-worker`. */
export interface SharedWorker extends EventTarget {
port: MessagePort
}

export interface ThreadsWorkerOptions extends WorkerOptions {
/** Prefix for the path passed to the Worker constructor. Web worker only. */
_baseURL?: string
Expand Down Expand Up @@ -111,9 +117,15 @@ export declare class BlobWorker extends WorkerImplementation {
public static fromText(source: string, options?: ThreadsWorkerOptions): WorkerImplementation
}

export declare class SharedWorkerImplementation extends EventTarget implements SharedWorker {
constructor(path: string, options?: ThreadsWorkerOptions)
port: MessagePort
}

export interface ImplementationExport {
blob: typeof BlobWorker
default: typeof WorkerImplementation
shared: typeof SharedWorkerImplementation
}

/** Event as emitted by worker thread. Subscribe to using `Thread.events(thread)`. */
Expand Down
4 changes: 2 additions & 2 deletions src/types/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ type UnsubscribeFn = () => void

export interface AbstractedWorkerAPI {
isWorkerRuntime(): boolean
postMessageToMaster(message: any, transferList?: Transferable[]): void
subscribeToMasterMessages(onMessage: (data: any) => void): UnsubscribeFn
postMessageToMaster(context: any, message: any, transferList?: Transferable[]): void
subscribeToMasterMessages(context: any, onMessage: (context: any, data: any) => void): UnsubscribeFn
}

export type WorkerFunction = ((...args: any[]) => any) | (() => any)
Expand Down
Loading