Skip to content

Commit

Permalink
add typed events
Browse files Browse the repository at this point in the history
  • Loading branch information
andrej-dyck committed Jan 1, 2024
1 parent 60d4229 commit d88a594
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 0 deletions.
115 changes: 115 additions & 0 deletions events/TypedEvents.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { describe, expect, test } from 'vitest'
import { typedEvents } from './TypedEvents.js'

describe('TypedEvents', () => {

test('streams every emitted event', async () => {
const te = typedEvents<string>()

const { stream } = te.listen()
const events = take(2)(stream())

te.emit('hello')
te.emit('world')

expect(await events).toEqual(['hello', 'world'])
})

test('only receives events after stream has been opened', async () => {
const te = typedEvents<string>()

te.emit('this is not included')

const { stream } = te.listen()
const events = take(2)(stream())

te.emit('hello')
te.emit('world')

expect(await events).toEqual(['hello', 'world'])
})

test('event stream can be aborted', async () => {
const te = typedEvents<string>()

const { stream, abort } = te.listen()
const events = take(3)(stream())

te.emit('hello')
te.emit('world')
abort()
te.emit('this is not included')

expect(await events).toEqual(['hello', 'world'])
})

test('event stream can be filtered', async () => {
const te = typedEvents<{ id: number, message: string }>()

const { stream } = te.listen({ filter: (e) => e.id === 1 })
const events = take(2)(stream())

te.emit({ id: 1, message: 'hello' })
te.emit({ id: 2, message: 'hello' })
te.emit({ id: 1, message: 'world' })

expect((await events).map(e => e.message)).toEqual(['hello', 'world'])
})

test('event stream can be transformed', async () => {
const te = typedEvents<string>()

const { stream } = te.listen()
const events = take(2)(stream(e => e.toUpperCase()))

te.emit('hello')
te.emit('world')

expect((await events)).toEqual(['HELLO', 'WORLD'])
})

test('event stream is empty after an abort', async () => {
const te = typedEvents<string>()

const { stream, abort } = te.listen()
stream()
abort()
const events = take(2)(stream())

te.emit('hello')
te.emit('world')

expect(await events).toEqual([])
})

test('there can be two listeners for an event stream', async () => {
const te = typedEvents<string>()

const { stream: stream1, abort: abort1 } = te.listen()
const { stream: stream2 } = te.listen()
const events1 = take(3)(stream1())
const events2 = take(3)(stream2())

te.emit('hello')
te.emit('world')
abort1()
te.emit('!')

expect(await events1).toEqual(['hello', 'world'])
expect(await events2).toEqual(['hello', 'world', '!'])
})

})

const take = (count: number) => async <T>(asyncIterable: AsyncIterable<T>): Promise<T[]> => {
if (count <= 0) return []

const elements: T[] = []
let i = 0
for await (const element of asyncIterable) {
elements.push(element)
i++
if (i >= count) break
}
return elements
}
70 changes: 70 additions & 0 deletions events/TypedEvents.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { EventEmitter } from 'node:events'
import * as crypto from 'crypto'
import { on } from 'events'

export type TypedEvents = ReturnType<typeof typedEvents>

/**
* A type-safe way to use EventEmitter
*/
export const typedEvents = <
TEvent extends NonNullable<unknown>
>(options?: {
eventEmitter?: EventEmitter,
eventName?: string | symbol
}) => {
const eventEmitter = options?.eventEmitter ?? new EventEmitter()
const eventName = options?.eventName ?? crypto.randomUUID()

const emit = (event: TEvent) => eventEmitter.emit(eventName, event)

const listen = (options?: {
filter?: (e: TEvent) => boolean,
onAbort?: (reason?: string) => void,
onError?: (e: unknown) => void,
}) => {
const ac = new AbortController

const abort = (reason?: string) => ac.abort(reason)
ac.signal.onabort = () => options?.onAbort?.(typeof ac.signal.reason === 'string' ? ac.signal.reason : undefined)

const stream = async function* <R = TEvent>(transform?: (e: TEvent) => R): AsyncIterable<R> {
try {
for await (const [event] of on(eventEmitter, eventName.toString(), { signal: ac.signal })) {
const typedEvent = event as unknown as TEvent
if (options?.filter?.(typedEvent) ?? true) {
yield transform?.(typedEvent) ?? typedEvent as unknown as R
}
}
} catch (e: unknown) {
if (!(isAbortError(e))) {
if (typeof options?.onError === 'function') options.onError(e)
else throw e
}
}
}

return { stream, abort }
}

return { emit, listen }
}

const isAbortError = (e: unknown) =>
e && typeof e === 'object' && 'name' in e && e.name === 'AbortError'

/*
* Example:
* const te = typedEvents<{ id: number, message: string }>()
*
* function sse(request: Request, reply: Reply) {
* const { stream, abort } = te.listen({ filter: (e) => e.id === request.params.id }) // listen with filter
*
* request.socket.on('close', () => abort('connection closed')) // abort on connection close
* reply.sse(stream((e) => ({ data: JSON.stringify(e) })) // stream transformed events using SSE
* }
*
* // another part of your system
* te.emit({ id: 17, message: 'Hello' })
* te.emit({ id: 17, message: 'World' })
*/
1 change: 1 addition & 0 deletions events/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './TypedEvents.js'
1 change: 1 addition & 0 deletions index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './events/index.js'
export * from './json/index.js'
export * from './lazy/index.js'
export * from './locale/index.js'
Expand Down

0 comments on commit d88a594

Please sign in to comment.