Skip to content

Commit

Permalink
gergo/eventBus (#2498)
Browse files Browse the repository at this point in the history
* feat(eventBus): WIP event bus typescript wizardy

* feat(eventBus): final eventbus setup with all the typescript foo

* fix(workspaces): fix workspace core imports

* test(workspaces): fix expected events name

* test(workspaces): fix tests
  • Loading branch information
gjedlicska authored Jul 12, 2024
1 parent 4da196e commit ee6e5e2
Show file tree
Hide file tree
Showing 14 changed files with 355 additions and 32 deletions.
126 changes: 126 additions & 0 deletions packages/server/modules/shared/services/eventBus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import {
WorkspaceEventsPayloads,
workspaceEventNamespace
} from '@/modules/workspacesCore/domain/events'
import { MaybeAsync } from '@speckle/shared'
import { UnionToIntersection } from 'type-fest'

import EventEmitter from 'eventemitter2'

type EventWildcard = '*'

type TestEvents = {
['test.string']: string
['test.number']: number
}

// we should only ever extend this type, other helper types will be derived from this
type EventsByNamespace = {
test: TestEvents
[workspaceEventNamespace]: WorkspaceEventsPayloads
}

type EventTypes = UnionToIntersection<EventsByNamespace[keyof EventsByNamespace]>

// generated union to collect all event
type EventNamesByNamespace = {
[Namespace in keyof EventsByNamespace]: keyof EventsByNamespace[Namespace]
}

// generated type for a top level wildcard one level nested wildcards per namespace and each possible event
type EventSubscriptionKey =
| EventWildcard
| `${keyof EventNamesByNamespace}.${EventWildcard}`
| {
[Namespace in keyof EventNamesByNamespace]: EventNamesByNamespace[Namespace]
}[keyof EventNamesByNamespace]

// generated flatten of each specific event name with the emitted event type
type EventPayloadsMap = UnionToIntersection<
EventPayloadsByNamespaceMap[keyof EventPayloadsByNamespaceMap]
>

type EventNames = keyof EventPayloadsMap

type EventPayloadsByNamespaceMap = {
// for each event namespace
[Key in keyof EventsByNamespace]: {
// for each event
[EventName in keyof EventsByNamespace[Key]]: {
// create a type with they original event as the payload, and the eventName
eventName: EventName
payload: EventsByNamespace[Key][EventName]
}
}
}

type EventPayload<T extends EventSubscriptionKey> = T extends EventWildcard
? // if event key is "*", get all events from the flat object
EventPayloadsMap[keyof EventPayloadsMap]
: // else if, the key is a "namespace.*" wildcard
T extends `${infer Namespace}.${EventWildcard}`
? // the Namespace needs to extend the keys of the type, otherwise we never
Namespace extends keyof EventPayloadsByNamespaceMap
? // get the union type of all possible events in a namespace
EventPayloadsByNamespaceMap[Namespace][keyof EventPayloadsByNamespaceMap[Namespace]]
: never
: // else if, the key is a "namespace.event" concrete key
T extends keyof EventPayloadsMap
? EventPayloadsMap[T]
: never

export function initializeEventBus() {
const emitter = new EventEmitter({ wildcard: true })

return {
/**
* Emit a module event. This function must be awaited to ensure all listeners
* execute. Any errors thrown in the listeners will bubble up and throw from
* the part of code that triggers this emit() call.
*/
emit: async <EventName extends EventNames>(args: {
eventName: EventName
payload: EventTypes[EventName]
}): Promise<unknown[]> => {
// curate the proper payload here and eventName object here, before emitting
return emitter.emitAsync(args.eventName, args)
},

/**
* Listen for module events. Any errors thrown here will bubble out of where
* emit() was invoked.
*
* @returns Callback for stopping listening
*/
listen: <K extends EventSubscriptionKey>(
eventName: K,
// we should add some error type object here with a type discriminator
handler: (event: EventPayload<K>) => MaybeAsync<unknown>
) => {
emitter.on(eventName, handler, {
async: true,
promisify: true
})

return () => {
emitter.removeListener(eventName, handler)
}
},

/**
* Destroy event emitter
*/
destroy() {
emitter.removeAllListeners()
}
}
}

type EventBus = ReturnType<typeof initializeEventBus>

let eventBus: EventBus

export function getEventBus(): EventBus {
if (!eventBus) eventBus = initializeEventBus()
return eventBus
}
1 change: 0 additions & 1 deletion packages/server/modules/shared/test/authz.e2e.spec.js

This file was deleted.

188 changes: 188 additions & 0 deletions packages/server/modules/shared/test/unit/eventBus.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
import { getEventBus, initializeEventBus } from '@/modules/shared/services/eventBus'
import { WorkspaceEvents } from '@/modules/workspacesCore/domain/events'
import { Workspace } from '@/modules/workspacesCore/domain/types'
import { Roles } from '@speckle/shared'
import { expect } from 'chai'
import cryptoRandomString from 'crypto-random-string'

const createFakeWorkspace = (): Workspace => {
return {
id: cryptoRandomString({ length: 10 }),
description: cryptoRandomString({ length: 10 }),
logoUrl: null,
name: cryptoRandomString({ length: 10 }),
updatedAt: new Date(),
createdAt: new Date()
}
}

describe('Event Bus', () => {
describe('initializeEventBus creates an event bus instance, that', () => {
it('calls back all the listeners', async () => {
const testEventBus = initializeEventBus()
const eventNames: string[] = []
testEventBus.listen('test.string', ({ eventName }) => {
eventNames.push(eventName)
})

testEventBus.listen('test.string', ({ eventName }) => {
eventNames.push(eventName)
})

await testEventBus.emit({ eventName: 'test.number', payload: 1 })
expect(eventNames.length).to.equal(0)

const eventName = 'test.string' as const
await testEventBus.emit({ eventName, payload: 'fake event' })

expect(eventNames.length).to.equal(2)
expect(eventNames).to.deep.equal([eventName, eventName])
})
it('can removes listeners from itself', async () => {
const testEventBus = initializeEventBus()
const eventNumbers: number[] = []
testEventBus.listen('test.string', () => {
eventNumbers.push(1)
})

const listenerOff = testEventBus.listen('test.string', () => {
eventNumbers.push(2)
})

await testEventBus.emit({ eventName: 'test.string', payload: 'fake event' })
expect(eventNumbers.sort((a, b) => a - b)).to.deep.equal([1, 2])

listenerOff()

await testEventBus.emit({ eventName: 'test.string', payload: 'fake event' })
expect(eventNumbers.sort((a, b) => a - b)).to.deep.equal([1, 1, 2])
})
it('returns results from listeners to the emitter', async () => {
const testEventBus = initializeEventBus()

testEventBus.listen('test.string', ({ payload }) => ({
outcome: payload
}))

const lookWhatHappened = 'echo this back to me'
const results = await testEventBus.emit({
eventName: 'test.string',
payload: lookWhatHappened
})

expect(results.length).to.equal(1)
expect(results[0]).to.deep.equal({ outcome: lookWhatHappened })
})
it('bubbles up listener exceptions to emitter', async () => {
const testEventBus = initializeEventBus()

testEventBus.listen('test.string', ({ payload }) => {
throw new Error(payload)
})

const lookWhatHappened = 'kabumm'
try {
await testEventBus.emit({ eventName: 'test.string', payload: lookWhatHappened })
throw new Error('this should have thrown by now')
} catch (error) {
if (error instanceof Error) {
expect(error.message).to.equal(lookWhatHappened)
} else {
throw error
}
}
})
it('can be destroyed, removing all listeners', async () => {
const testEventBus = initializeEventBus()
const eventNumbers: number[] = []
testEventBus.listen('test.string', () => {
eventNumbers.push(1)
})

testEventBus.listen('test.string', () => {
eventNumbers.push(2)
})

await testEventBus.emit({ eventName: 'test.string', payload: 'test' })
expect(eventNumbers.sort((a, b) => a - b)).to.deep.equal([1, 2])

testEventBus.destroy()

await testEventBus.emit({ eventName: 'test.string', payload: 'test' })
expect(eventNumbers.sort((a, b) => a - b)).to.deep.equal([1, 2])
})
})
describe('getEventBus', () => {
it('returns a unified event bus instance', async () => {
const bus1 = getEventBus()
const bus2 = getEventBus()

const workspaces: Workspace[] = []

bus1.listen(WorkspaceEvents.Created, ({ payload }) => {
workspaces.push(payload)
})

bus2.listen(WorkspaceEvents.Created, ({ payload }) => {
workspaces.push(payload)
})

const workspacePayload = {
...createFakeWorkspace(),
createdByUserId: cryptoRandomString({ length: 10 }),
eventName: WorkspaceEvents.Created
}

await bus1.emit({ eventName: WorkspaceEvents.Created, payload: workspacePayload })

expect(workspaces.length).to.equal(2)
expect(workspaces).to.deep.equal([workspacePayload, workspacePayload])
})
it('allows to subscribe to wildcard events', async () => {
const eventBus = getEventBus()

const events: string[] = []

eventBus.listen('workspace.*', ({ payload, eventName }) => {
switch (eventName) {
case 'workspace.created':
events.push(payload.id)
break
case 'workspace.role-deleted':
events.push(payload.userId)
break
default:
events.push('default')
}
})

const workspace = createFakeWorkspace()

await eventBus.emit({
eventName: WorkspaceEvents.Created,
payload: {
...workspace,
createdByUserId: cryptoRandomString({ length: 10 })
}
})

const workspaceAcl = {
userId: cryptoRandomString({ length: 10 }),
workspaceId: cryptoRandomString({ length: 10 }),
role: Roles.Workspace.Member
}

await eventBus.emit({
eventName: WorkspaceEvents.RoleDeleted,
payload: workspaceAcl
})

await eventBus.emit({
eventName: WorkspaceEvents.RoleUpdated,
payload: workspaceAcl
})

expect([workspace.id, workspaceAcl.userId, 'default']).to.deep.equal(events)
})
})
})
4 changes: 2 additions & 2 deletions packages/server/modules/workspaces/domain/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {
WorkspaceEvents,
WorkspaceEventsPayloads
} from '@/modules/workspacesCore/domain/events'
import { Workspace, WorkspaceAcl } from '@/modules/workspaces/domain/types'
import { Workspace, WorkspaceAcl } from '@/modules/workspacesCore/domain/types'

/** Workspace */

Expand Down Expand Up @@ -70,6 +70,6 @@ export type StoreBlob = (args: string) => Promise<string>
/** Events */

export type EmitWorkspaceEvent = <TEvent extends WorkspaceEvents>(args: {
event: TEvent
eventName: TEvent
payload: WorkspaceEventsPayloads[TEvent]
}) => Promise<unknown[]>
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Workspace, WorkspaceAcl } from '@/modules/workspaces/domain/types'
import { Workspace, WorkspaceAcl } from '@/modules/workspacesCore/domain/types'
import {
DeleteWorkspaceRole,
GetWorkspace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
UpsertWorkspace,
UpsertWorkspaceRole
} from '@/modules/workspaces/domain/operations'
import { Workspace } from '@/modules/workspaces/domain/types'
import { Workspace } from '@/modules/workspacesCore/domain/types'
import { Roles } from '@speckle/shared'
import cryptoRandomString from 'crypto-random-string'

Expand Down Expand Up @@ -47,7 +47,10 @@ export const createWorkspaceFactory =
workspaceId: workspace.id
})

await emitWorkspaceEvent({ event: WorkspaceEvents.Created, payload: workspace })
await emitWorkspaceEvent({
eventName: WorkspaceEvents.Created,
payload: { ...workspace, createdByUserId: userId }
})
// emit a workspace created event

return workspace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
GetWorkspaceRoles,
UpsertWorkspaceRole
} from '@/modules/workspaces/domain/operations'
import { WorkspaceAcl } from '@/modules/workspaces/domain/types'
import { WorkspaceAcl } from '@/modules/workspacesCore/domain/types'
import { WorkspaceAdminRequiredError } from '@/modules/workspaces/errors/workspace'
import { isUserLastWorkspaceAdmin } from '@/modules/workspaces/utils/isUserLastWorkspaceAdmin'
import { WorkspaceEvents } from '@/modules/workspacesCore/domain/events'
Expand Down Expand Up @@ -41,7 +41,7 @@ export const deleteWorkspaceRoleFactory =
return null
}

emitWorkspaceEvent({ event: WorkspaceEvents.RoleDeleted, payload: deletedRole })
emitWorkspaceEvent({ eventName: WorkspaceEvents.RoleDeleted, payload: deletedRole })

return deletedRole
}
Expand Down Expand Up @@ -83,7 +83,7 @@ export const setWorkspaceRoleFactory =
await upsertWorkspaceRole({ userId, workspaceId, role })

await emitWorkspaceEvent({
event: WorkspaceEvents.RoleUpdated,
eventName: WorkspaceEvents.RoleUpdated,
payload: { userId, workspaceId, role }
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
import db from '@/db/knex'
import cryptoRandomString from 'crypto-random-string'
import { expect } from 'chai'
import { Workspace, WorkspaceAcl } from '@/modules/workspaces/domain/types'
import { Workspace, WorkspaceAcl } from '@/modules/workspacesCore/domain/types'
import { expectToThrow } from '@/test/assertionHelper'
import { BasicTestUser, createTestUser } from '@/test/authHelper'

Expand Down
Loading

0 comments on commit ee6e5e2

Please sign in to comment.