-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
a5c0615
commit ec2f06a
Showing
4 changed files
with
187 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
import { describe, expect, test } from 'vitest' | ||
import { typedEvents } from './TypedEvents.js' | ||
|
||
describe('TypedEvents', () => { | ||
|
||
test('streams every emitted event', async () => { | ||
const te = typedEvents<string>() | ||
|
||
const { stream } = te.listen() | ||
const events = take(2)(stream()) | ||
|
||
te.emit('hello') | ||
te.emit('world') | ||
|
||
expect(await events).toEqual(['hello', 'world']) | ||
}) | ||
|
||
test('only receives events after stream has been opened', async () => { | ||
const te = typedEvents<string>() | ||
|
||
te.emit('this is not included') | ||
|
||
const { stream } = te.listen() | ||
const events = take(2)(stream()) | ||
|
||
te.emit('hello') | ||
te.emit('world') | ||
|
||
expect(await events).toEqual(['hello', 'world']) | ||
}) | ||
|
||
test('event stream can be aborted', async () => { | ||
const te = typedEvents<string>() | ||
|
||
const { stream, abort } = te.listen() | ||
const events = take(3)(stream()) | ||
|
||
te.emit('hello') | ||
te.emit('world') | ||
abort() | ||
te.emit('this is not included') | ||
|
||
expect(await events).toEqual(['hello', 'world']) | ||
}) | ||
|
||
test('event stream can be filtered', async () => { | ||
const te = typedEvents<{ id: number, message: string }>() | ||
|
||
const { stream } = te.listen({ filter: (e) => e.id === 1 }) | ||
const events = take(2)(stream()) | ||
|
||
te.emit({ id: 1, message: 'hello' }) | ||
te.emit({ id: 2, message: 'hello' }) | ||
te.emit({ id: 1, message: 'world' }) | ||
|
||
expect((await events).map(e => e.message)).toEqual(['hello', 'world']) | ||
}) | ||
|
||
test('event stream can be transformed', async () => { | ||
const te = typedEvents<string>() | ||
|
||
const { stream } = te.listen() | ||
const events = take(2)(stream(e => e.toUpperCase())) | ||
|
||
te.emit('hello') | ||
te.emit('world') | ||
|
||
expect((await events)).toEqual(['HELLO', 'WORLD']) | ||
}) | ||
|
||
test('event stream is empty after an abort', async () => { | ||
const te = typedEvents<string>() | ||
|
||
const { stream, abort } = te.listen() | ||
stream() | ||
abort() | ||
const events = take(2)(stream()) | ||
|
||
te.emit('hello') | ||
te.emit('world') | ||
|
||
expect(await events).toEqual([]) | ||
}) | ||
|
||
test('there can be two listeners for an event stream', async () => { | ||
const te = typedEvents<string>() | ||
|
||
const { stream: stream1, abort: abort1 } = te.listen() | ||
const { stream: stream2 } = te.listen() | ||
const events1 = take(3)(stream1()) | ||
const events2 = take(3)(stream2()) | ||
|
||
te.emit('hello') | ||
te.emit('world') | ||
abort1() | ||
te.emit('!') | ||
|
||
expect(await events1).toEqual(['hello', 'world']) | ||
expect(await events2).toEqual(['hello', 'world', '!']) | ||
}) | ||
|
||
}) | ||
|
||
const take = (count: number) => async <T>(asyncIterable: AsyncIterable<T>): Promise<T[]> => { | ||
if (count <= 0) return [] | ||
|
||
const elements: T[] = [] | ||
let i = 0 | ||
for await (const element of asyncIterable) { | ||
elements.push(element) | ||
i++ | ||
if (i >= count) break | ||
} | ||
return elements | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
import { EventEmitter } from 'node:events' | ||
import * as crypto from 'crypto' | ||
import { on } from 'events' | ||
|
||
export type TypedEvents = ReturnType<typeof typedEvents> | ||
|
||
/** | ||
* A type-safe way to use EventEmitter | ||
*/ | ||
export const typedEvents = < | ||
TEvent extends NonNullable<unknown> | ||
>(options?: { | ||
eventEmitter?: EventEmitter, | ||
eventName?: string | symbol | ||
}) => { | ||
const eventEmitter = options?.eventEmitter ?? new EventEmitter() | ||
const eventName = options?.eventName ?? crypto.randomUUID() | ||
|
||
const emit = (event: TEvent) => eventEmitter.emit(eventName, event) | ||
|
||
const listen = (options?: { | ||
filter?: (e: TEvent) => boolean, | ||
onAbort?: (reason?: string) => void, | ||
onError?: (e: unknown) => void, | ||
}) => { | ||
const ac = new AbortController | ||
|
||
const abort = (reason?: string) => ac.abort(reason) | ||
ac.signal.onabort = () => options?.onAbort?.(typeof ac.signal.reason === 'string' ? ac.signal.reason : undefined) | ||
|
||
const stream = async function* <R = TEvent>(transform?: (e: TEvent) => R): AsyncIterable<R> { | ||
try { | ||
for await (const [event] of on(eventEmitter, eventName.toString(), { signal: ac.signal })) { | ||
const typedEvent = event as unknown as TEvent | ||
if (options?.filter?.(typedEvent) ?? true) { | ||
yield transform?.(typedEvent) ?? typedEvent as unknown as R | ||
} | ||
} | ||
} catch (e: unknown) { | ||
if (!(isAbortError(e))) { | ||
if (typeof options?.onError === 'function') options.onError(e) | ||
else throw e | ||
} | ||
} | ||
} | ||
|
||
return { stream, abort } | ||
} | ||
|
||
return { emit, listen } | ||
} | ||
|
||
const isAbortError = (e: unknown) => | ||
e && typeof e === 'object' && 'name' in e && e.name === 'AbortError' | ||
|
||
/* | ||
* Example: | ||
* const te = typedEvents<{ id: number, message: string }>() | ||
* | ||
* function sse(request: Request, reply: Reply) { | ||
* const { stream, abort } = te.listen({ filter: (e) => e.id === request.params.id }) // listen with filter | ||
* | ||
* request.socket.on('close', () => abort('connection closed')) // abort on connection close | ||
* reply.sse(stream((e) => ({ data: JSON.stringify(e) })) // stream transformed events using SSE | ||
* } | ||
* | ||
* // another part of your system | ||
* te.emit({ id: 17, message: 'Hello' }) | ||
* te.emit({ id: 17, message: 'World' }) | ||
*/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
export * from './TypedEvents.js' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters