Skip to content

Commit

Permalink
UBERF-9075 Support LiveKit recordings in Datalake (#7607)
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Onnikov <[email protected]>
  • Loading branch information
aonnikov authored Jan 8, 2025
1 parent bfe2960 commit f8c27c3
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 45 deletions.
4 changes: 2 additions & 2 deletions common/config/rush/command-line.json
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@
"summary": "Build docker with platform",
"description": "use to build all docker containers required for platform",
"safeForSimultaneousRushProcesses": true,
"shellCommand": "rush docker:build -p 20 --to @hcengineering/pod-server --to @hcengineering/pod-front --to @hcengineering/prod --to @hcengineering/pod-account --to @hcengineering/pod-workspace --to @hcengineering/pod-collaborator --to @hcengineering/tool --to @hcengineering/pod-print --to @hcengineering/pod-sign --to @hcengineering/pod-analytics-collector --to @hcengineering/rekoni-service --to @hcengineering/pod-ai-bot --to @hcengineering/import-tool --to @hcengineering/pod-stats --to @hcengineering/pod-fulltext"
},
"shellCommand": "rush docker:build -p 20 --to @hcengineering/pod-server --to @hcengineering/pod-front --to @hcengineering/prod --to @hcengineering/pod-account --to @hcengineering/pod-workspace --to @hcengineering/pod-collaborator --to @hcengineering/tool --to @hcengineering/pod-print --to @hcengineering/pod-sign --to @hcengineering/pod-analytics-collector --to @hcengineering/rekoni-service --to @hcengineering/pod-ai-bot --to @hcengineering/import-tool --to @hcengineering/pod-stats --to @hcengineering/pod-fulltext --to @hcengineering/pod-love"
},
{
"commandKind": "global",
"name": "docker:up",
Expand Down
38 changes: 38 additions & 0 deletions server/datalake/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ interface MultipartUploadPart {
etag: string
}

export interface R2UploadParams {
location: string
bucket: string
}

/** @public */
export class DatalakeClient {
constructor (private readonly endpoint: string) {}
Expand Down Expand Up @@ -320,6 +325,8 @@ export class DatalakeClient {
return await this.signObjectComplete(ctx, workspace, objectName)
}

// S3

async uploadFromS3 (
ctx: MeasureContext,
workspace: WorkspaceId,
Expand All @@ -342,6 +349,37 @@ export class DatalakeClient {
})
}

// R2

async getR2UploadParams (ctx: MeasureContext, workspace: WorkspaceId): Promise<R2UploadParams> {
const path = `/upload/r2/${workspace.name}`
const url = concatLink(this.endpoint, path)

const response = await fetchSafe(ctx, url)
const json = (await response.json()) as R2UploadParams
return json
}

async uploadFromR2 (
ctx: MeasureContext,
workspace: WorkspaceId,
objectName: string,
params: {
filename: string
}
): Promise<void> {
const path = `/upload/r2/${workspace.name}/${encodeURIComponent(objectName)}`
const url = concatLink(this.endpoint, path)

await fetchSafe(ctx, url, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(params)
})
}

// Signed URL

private async signObjectSign (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<string> {
Expand Down
2 changes: 2 additions & 0 deletions services/love/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
"@hcengineering/platform": "^0.6.11",
"@hcengineering/server-client": "^0.6.0",
"@hcengineering/server-token": "^0.6.11",
"@hcengineering/datalake": "^0.6.0",
"@hcengineering/s3": "^0.6.0",
"livekit-server-sdk": "^2.0.10",
"jwt-simple": "^0.5.6",
"uuid": "^8.3.2",
Expand Down
5 changes: 4 additions & 1 deletion services/love/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ interface Config {

StorageConfig: string
StorageProviderName: string
S3StorageConfig: string
Secret: string

MongoUrl: string
Expand All @@ -39,6 +40,7 @@ const envMap: { [key in keyof Config]: string } = {

StorageConfig: 'STORAGE_CONFIG',
StorageProviderName: 'STORAGE_PROVIDER_NAME',
S3StorageConfig: 'S3_STORAGE_CONFIG',
Secret: 'SECRET',
ServiceID: 'SERVICE_ID',
MongoUrl: 'MONGO_URL'
Expand All @@ -55,12 +57,13 @@ const config: Config = (() => {
ApiSecret: process.env[envMap.ApiSecret],
StorageConfig: process.env[envMap.StorageConfig],
StorageProviderName: process.env[envMap.StorageProviderName] ?? 's3',
S3StorageConfig: process.env[envMap.S3StorageConfig],
Secret: process.env[envMap.Secret],
ServiceID: process.env[envMap.ServiceID] ?? 'love-service',
MongoUrl: process.env[envMap.MongoUrl]
}

const optional = ['StorageConfig']
const optional = ['StorageConfig', 'S3StorageConfig']

const missingEnv = (Object.keys(params) as Array<keyof Config>)
.filter((key) => !optional.includes(key))
Expand Down
61 changes: 19 additions & 42 deletions services/love/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
// limitations under the License.
//

import { Ref, toWorkspaceString, WorkspaceId } from '@hcengineering/core'
import { MeasureContext, Ref, WorkspaceId } from '@hcengineering/core'
import { setMetadata } from '@hcengineering/platform'
import serverClient from '@hcengineering/server-client'
import { initStatisticsContext, StorageConfig, StorageConfiguration } from '@hcengineering/server-core'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import { storageConfigFromEnv } from '@hcengineering/server-storage'
import serverToken, { decodeToken } from '@hcengineering/server-token'
import { RoomMetadata, TranscriptionStatus, MeetingMinutes } from '@hcengineering/love'
import cors from 'cors'
Expand All @@ -32,8 +32,8 @@ import {
S3Upload,
WebhookReceiver
} from 'livekit-server-sdk'
import { v4 as uuid } from 'uuid'
import config from './config'
import { getS3UploadParams, saveFile } from './storage'
import { WorkspaceClient } from './workspaceClient'

const extractToken = (header: IncomingHttpHeaders): any => {
Expand All @@ -50,11 +50,14 @@ export const main = async (): Promise<void> => {
setMetadata(serverToken.metadata.Secret, config.Secret)

const storageConfigs: StorageConfiguration = storageConfigFromEnv()
const s3StorageConfigs: StorageConfiguration | undefined =
config.S3StorageConfig !== undefined ? storageConfigFromEnv(config.S3StorageConfig) : undefined

const ctx = initStatisticsContext('love', {})

const storageConfig = storageConfigs.storages.findLast((p) => p.name === config.StorageProviderName)
const storageAdapter = buildStorageFromConfig(storageConfigs)
const s3storageConfig = s3StorageConfigs?.storages.findLast((p) => p.kind === 's3')

const app = express()
const port = config.Port
app.use(cors())
Expand All @@ -81,13 +84,11 @@ export const main = async (): Promise<void> => {
if (event.event === 'egress_ended' && event.egressInfo !== undefined) {
for (const res of event.egressInfo.fileResults) {
const data = dataByUUID.get(res.filename)
if (data !== undefined) {
const prefix = rootPrefix(storageConfig, data.workspaceId)
const filename = stripPrefix(prefix, res.filename)
const storedBlob = await storageAdapter.stat(ctx, data.workspaceId, filename)
if (data !== undefined && storageConfig !== undefined) {
const storedBlob = await saveFile(ctx, data.workspaceId, storageConfig, s3storageConfig, res.filename)
if (storedBlob !== undefined) {
const client = await WorkspaceClient.create(data.workspace, ctx)
await client.saveFile(filename, data.name, storedBlob, data.meetingMinutes)
await client.saveFile(storedBlob._id, data.name, storedBlob, data.meetingMinutes)
await client.close()
}
dataByUUID.delete(res.filename)
Expand Down Expand Up @@ -135,7 +136,7 @@ export const main = async (): Promise<void> => {
try {
const dateStr = new Date().toISOString().replace('T', '_').slice(0, 19)
const name = `${room}_${dateStr}.mp4`
const id = await startRecord(storageConfig, egressClient, roomClient, roomName, workspace)
const id = await startRecord(ctx, storageConfig, s3storageConfig, egressClient, roomClient, roomName, workspace)
dataByUUID.set(id, { name, workspace: workspace.name, workspaceId: workspace, meetingMinutes })
ctx.info('Start recording', { workspace: workspace.name, roomName, meetingMinutes })
res.send()
Expand Down Expand Up @@ -257,50 +258,26 @@ const checkRecordAvailable = async (storageConfig: StorageConfig | undefined): P
return storageConfig !== undefined
}

function getBucket (storageConfig: any, workspaceId: WorkspaceId): string {
return storageConfig.rootBucket ?? (storageConfig.bucketPrefix ?? '') + toWorkspaceString(workspaceId)
}

function getBucketFolder (workspaceId: WorkspaceId): string {
return toWorkspaceString(workspaceId)
}

function getDocumentKey (storageConfig: any, workspace: WorkspaceId, name: string): string {
return storageConfig.rootBucket === undefined ? name : `${getBucketFolder(workspace)}/${name}`
}

function stripPrefix (prefix: string | undefined, key: string): string {
if (prefix !== undefined && key.startsWith(prefix)) {
return key.slice(prefix.length)
}
return key
}

function rootPrefix (storageConfig: any, workspaceId: WorkspaceId): string | undefined {
return storageConfig.rootBucket !== undefined ? getBucketFolder(workspaceId) + '/' : undefined
}

const startRecord = async (
ctx: MeasureContext,
storageConfig: StorageConfig | undefined,
s3StorageConfig: StorageConfig | undefined,
egressClient: EgressClient,
roomClient: RoomServiceClient,
roomName: string,
workspaceId: WorkspaceId
): Promise<string> => {
if (storageConfig === undefined) {
console.error('please provide s3 storage configuration')
throw new Error('please provide s3 storage configuration')
console.error('please provide storage configuration')
throw new Error('please provide storage configuration')
}
const endpoint = storageConfig.endpoint
const accessKey = (storageConfig as any).accessKey
const secret = (storageConfig as any).secretKey
const region = (storageConfig as any).region ?? 'auto'
const bucket = getBucket(storageConfig, workspaceId)
const name = uuid()
const filepath = getDocumentKey(storageConfig, workspaceId, `${name}.mp4`)
const uploadParams = await getS3UploadParams(ctx, workspaceId, storageConfig, s3StorageConfig)

const { filepath, endpoint, accessKey, secret, region, bucket } = uploadParams
const output = new EncodedFileOutput({
fileType: EncodedFileType.MP4,
filepath,
disableManifest: true,
output: {
case: 's3',
value: new S3Upload({
Expand Down
Loading

0 comments on commit f8c27c3

Please sign in to comment.