diff --git a/events/TypedEvents.test.ts b/events/TypedEvents.test.ts new file mode 100644 index 0000000..90ba864 --- /dev/null +++ b/events/TypedEvents.test.ts @@ -0,0 +1,115 @@ +import { describe, expect, test } from 'vitest' +import { typedEvents } from './TypedEvents.js' + +describe('TypedEvents', () => { + + test('streams every emitted event', async () => { + const te = typedEvents() + + const { stream } = te.listen() + const events = take(2)(stream()) + + te.emit('hello') + te.emit('world') + + expect(await events).toEqual(['hello', 'world']) + }) + + test('only receives events after stream has been opened', async () => { + const te = typedEvents() + + te.emit('this is not included') + + const { stream } = te.listen() + const events = take(2)(stream()) + + te.emit('hello') + te.emit('world') + + expect(await events).toEqual(['hello', 'world']) + }) + + test('event stream can be aborted', async () => { + const te = typedEvents() + + const { stream, abort } = te.listen() + const events = take(3)(stream()) + + te.emit('hello') + te.emit('world') + abort() + te.emit('this is not included') + + expect(await events).toEqual(['hello', 'world']) + }) + + test('event stream can be filtered', async () => { + const te = typedEvents<{ id: number, message: string }>() + + const { stream } = te.listen({ filter: (e) => e.id === 1 }) + const events = take(2)(stream()) + + te.emit({ id: 1, message: 'hello' }) + te.emit({ id: 2, message: 'hello' }) + te.emit({ id: 1, message: 'world' }) + + expect((await events).map(e => e.message)).toEqual(['hello', 'world']) + }) + + test('event stream can be transformed', async () => { + const te = typedEvents() + + const { stream } = te.listen() + const events = take(2)(stream(e => e.toUpperCase())) + + te.emit('hello') + te.emit('world') + + expect((await events)).toEqual(['HELLO', 'WORLD']) + }) + + test('event stream is empty after an abort', async () => { + const te = typedEvents() + + const { stream, abort } = te.listen() + stream() + abort() + const events = take(2)(stream()) + + te.emit('hello') + te.emit('world') + + expect(await events).toEqual([]) + }) + + test('there can be two listeners for an event stream', async () => { + const te = typedEvents() + + const { stream: stream1, abort: abort1 } = te.listen() + const { stream: stream2 } = te.listen() + const events1 = take(3)(stream1()) + const events2 = take(3)(stream2()) + + te.emit('hello') + te.emit('world') + abort1() + te.emit('!') + + expect(await events1).toEqual(['hello', 'world']) + expect(await events2).toEqual(['hello', 'world', '!']) + }) + +}) + +const take = (count: number) => async (asyncIterable: AsyncIterable): Promise => { + if (count <= 0) return [] + + const elements: T[] = [] + let i = 0 + for await (const element of asyncIterable) { + elements.push(element) + i++ + if (i >= count) break + } + return elements +} diff --git a/events/TypedEvents.ts b/events/TypedEvents.ts new file mode 100644 index 0000000..d2ed597 --- /dev/null +++ b/events/TypedEvents.ts @@ -0,0 +1,70 @@ +import { EventEmitter } from 'node:events' +import * as crypto from 'crypto' +import { on } from 'events' + +export type TypedEvents = ReturnType + +/** + * A type-safe way to use EventEmitter + */ +export const typedEvents = < + TEvent extends NonNullable +>(options?: { + eventEmitter?: EventEmitter, + eventName?: string | symbol +}) => { + const eventEmitter = options?.eventEmitter ?? new EventEmitter() + const eventName = options?.eventName ?? crypto.randomUUID() + + const emit = (event: TEvent) => eventEmitter.emit(eventName, event) + + const listen = (options?: { + filter?: (e: TEvent) => boolean, + onAbort?: (reason?: string) => void, + onError?: (e: unknown) => void, + }) => { + const ac = new AbortController + + const abort = (reason?: string) => ac.abort(reason) + ac.signal.onabort = () => options?.onAbort?.(typeof ac.signal.reason === 'string' ? ac.signal.reason : undefined) + + const stream = async function* (transform?: (e: TEvent) => R): AsyncIterable { + try { + for await (const [event] of on(eventEmitter, eventName.toString(), { signal: ac.signal })) { + const typedEvent = event as unknown as TEvent + if (options?.filter?.(typedEvent) ?? true) { + yield transform?.(typedEvent) ?? typedEvent as unknown as R + } + } + } catch (e: unknown) { + if (!(isAbortError(e))) { + if (typeof options?.onError === 'function') options.onError(e) + else throw e + } + } + } + + return { stream, abort } + } + + return { emit, listen } +} + +const isAbortError = (e: unknown) => + e && typeof e === 'object' && 'name' in e && e.name === 'AbortError' + +/* + * Example: + * const te = typedEvents<{ id: number, message: string }>() + * + * function sse(request: Request, reply: Reply) { + * const { stream, abort } = te.listen({ filter: (e) => e.id === request.params.id }) // listen with filter + * + * request.socket.on('close', () => abort('connection closed')) // abort on connection close + * reply.sse(stream((e) => ({ data: JSON.stringify(e) })) // stream transformed events using SSE + * } + * + * // another part of your system + * te.emit({ id: 17, message: 'Hello' }) + * te.emit({ id: 17, message: 'World' }) + */ diff --git a/events/index.ts b/events/index.ts new file mode 100644 index 0000000..01fc904 --- /dev/null +++ b/events/index.ts @@ -0,0 +1 @@ +export * from './TypedEvents.js' diff --git a/index.ts b/index.ts index c956aa0..d0bc8ba 100644 --- a/index.ts +++ b/index.ts @@ -1,3 +1,4 @@ +export * from './events/index.js' export * from './json/index.js' export * from './lazy/index.js' export * from './locale/index.js'