diff --git a/common/config/rush/command-line.json b/common/config/rush/command-line.json index 137cedd174..f43af3a182 100644 --- a/common/config/rush/command-line.json +++ b/common/config/rush/command-line.json @@ -238,8 +238,8 @@ "summary": "Build docker with platform", "description": "use to build all docker containers required for platform", "safeForSimultaneousRushProcesses": true, - "shellCommand": "rush docker:build -p 20 --to @hcengineering/pod-server --to @hcengineering/pod-front --to @hcengineering/prod --to @hcengineering/pod-account --to @hcengineering/pod-workspace --to @hcengineering/pod-collaborator --to @hcengineering/tool --to @hcengineering/pod-print --to @hcengineering/pod-sign --to @hcengineering/pod-analytics-collector --to @hcengineering/rekoni-service --to @hcengineering/pod-ai-bot --to @hcengineering/import-tool --to @hcengineering/pod-stats --to @hcengineering/pod-fulltext" - }, + "shellCommand": "rush docker:build -p 20 --to @hcengineering/pod-server --to @hcengineering/pod-front --to @hcengineering/prod --to @hcengineering/pod-account --to @hcengineering/pod-workspace --to @hcengineering/pod-collaborator --to @hcengineering/tool --to @hcengineering/pod-print --to @hcengineering/pod-sign --to @hcengineering/pod-analytics-collector --to @hcengineering/rekoni-service --to @hcengineering/pod-ai-bot --to @hcengineering/import-tool --to @hcengineering/pod-stats --to @hcengineering/pod-fulltext --to @hcengineering/pod-love" + }, { "commandKind": "global", "name": "docker:up", diff --git a/server/datalake/src/client.ts b/server/datalake/src/client.ts index 8f64dfa372..cca743083f 100644 --- a/server/datalake/src/client.ts +++ b/server/datalake/src/client.ts @@ -73,6 +73,11 @@ interface MultipartUploadPart { etag: string } +export interface R2UploadParams { + location: string + bucket: string +} + /** @public */ export class DatalakeClient { constructor (private readonly endpoint: string) {} @@ -320,6 +325,8 @@ export class DatalakeClient { return await this.signObjectComplete(ctx, workspace, objectName) } + // S3 + async uploadFromS3 ( ctx: MeasureContext, workspace: WorkspaceId, @@ -342,6 +349,37 @@ export class DatalakeClient { }) } + // R2 + + async getR2UploadParams (ctx: MeasureContext, workspace: WorkspaceId): Promise { + const path = `/upload/r2/${workspace.name}` + const url = concatLink(this.endpoint, path) + + const response = await fetchSafe(ctx, url) + const json = (await response.json()) as R2UploadParams + return json + } + + async uploadFromR2 ( + ctx: MeasureContext, + workspace: WorkspaceId, + objectName: string, + params: { + filename: string + } + ): Promise { + const path = `/upload/r2/${workspace.name}/${encodeURIComponent(objectName)}` + const url = concatLink(this.endpoint, path) + + await fetchSafe(ctx, url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify(params) + }) + } + // Signed URL private async signObjectSign (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise { diff --git a/services/love/package.json b/services/love/package.json index e82672d9ad..218bf46189 100644 --- a/services/love/package.json +++ b/services/love/package.json @@ -65,6 +65,8 @@ "@hcengineering/platform": "^0.6.11", "@hcengineering/server-client": "^0.6.0", "@hcengineering/server-token": "^0.6.11", + "@hcengineering/datalake": "^0.6.0", + "@hcengineering/s3": "^0.6.0", "livekit-server-sdk": "^2.0.10", "jwt-simple": "^0.5.6", "uuid": "^8.3.2", diff --git a/services/love/src/config.ts b/services/love/src/config.ts index 7fee523d4a..1aa1b22521 100644 --- a/services/love/src/config.ts +++ b/services/love/src/config.ts @@ -24,6 +24,7 @@ interface Config { StorageConfig: string StorageProviderName: string + S3StorageConfig: string Secret: string MongoUrl: string @@ -39,6 +40,7 @@ const envMap: { [key in keyof Config]: string } = { StorageConfig: 'STORAGE_CONFIG', StorageProviderName: 'STORAGE_PROVIDER_NAME', + S3StorageConfig: 'S3_STORAGE_CONFIG', Secret: 'SECRET', ServiceID: 'SERVICE_ID', MongoUrl: 'MONGO_URL' @@ -55,12 +57,13 @@ const config: Config = (() => { ApiSecret: process.env[envMap.ApiSecret], StorageConfig: process.env[envMap.StorageConfig], StorageProviderName: process.env[envMap.StorageProviderName] ?? 's3', + S3StorageConfig: process.env[envMap.S3StorageConfig], Secret: process.env[envMap.Secret], ServiceID: process.env[envMap.ServiceID] ?? 'love-service', MongoUrl: process.env[envMap.MongoUrl] } - const optional = ['StorageConfig'] + const optional = ['StorageConfig', 'S3StorageConfig'] const missingEnv = (Object.keys(params) as Array) .filter((key) => !optional.includes(key)) diff --git a/services/love/src/main.ts b/services/love/src/main.ts index a2d36dca80..a65054e581 100644 --- a/services/love/src/main.ts +++ b/services/love/src/main.ts @@ -13,11 +13,11 @@ // limitations under the License. // -import { Ref, toWorkspaceString, WorkspaceId } from '@hcengineering/core' +import { MeasureContext, Ref, WorkspaceId } from '@hcengineering/core' import { setMetadata } from '@hcengineering/platform' import serverClient from '@hcengineering/server-client' import { initStatisticsContext, StorageConfig, StorageConfiguration } from '@hcengineering/server-core' -import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage' +import { storageConfigFromEnv } from '@hcengineering/server-storage' import serverToken, { decodeToken } from '@hcengineering/server-token' import { RoomMetadata, TranscriptionStatus, MeetingMinutes } from '@hcengineering/love' import cors from 'cors' @@ -32,8 +32,8 @@ import { S3Upload, WebhookReceiver } from 'livekit-server-sdk' -import { v4 as uuid } from 'uuid' import config from './config' +import { getS3UploadParams, saveFile } from './storage' import { WorkspaceClient } from './workspaceClient' const extractToken = (header: IncomingHttpHeaders): any => { @@ -50,11 +50,14 @@ export const main = async (): Promise => { setMetadata(serverToken.metadata.Secret, config.Secret) const storageConfigs: StorageConfiguration = storageConfigFromEnv() + const s3StorageConfigs: StorageConfiguration | undefined = + config.S3StorageConfig !== undefined ? storageConfigFromEnv(config.S3StorageConfig) : undefined const ctx = initStatisticsContext('love', {}) const storageConfig = storageConfigs.storages.findLast((p) => p.name === config.StorageProviderName) - const storageAdapter = buildStorageFromConfig(storageConfigs) + const s3storageConfig = s3StorageConfigs?.storages.findLast((p) => p.kind === 's3') + const app = express() const port = config.Port app.use(cors()) @@ -81,13 +84,11 @@ export const main = async (): Promise => { if (event.event === 'egress_ended' && event.egressInfo !== undefined) { for (const res of event.egressInfo.fileResults) { const data = dataByUUID.get(res.filename) - if (data !== undefined) { - const prefix = rootPrefix(storageConfig, data.workspaceId) - const filename = stripPrefix(prefix, res.filename) - const storedBlob = await storageAdapter.stat(ctx, data.workspaceId, filename) + if (data !== undefined && storageConfig !== undefined) { + const storedBlob = await saveFile(ctx, data.workspaceId, storageConfig, s3storageConfig, res.filename) if (storedBlob !== undefined) { const client = await WorkspaceClient.create(data.workspace, ctx) - await client.saveFile(filename, data.name, storedBlob, data.meetingMinutes) + await client.saveFile(storedBlob._id, data.name, storedBlob, data.meetingMinutes) await client.close() } dataByUUID.delete(res.filename) @@ -135,7 +136,7 @@ export const main = async (): Promise => { try { const dateStr = new Date().toISOString().replace('T', '_').slice(0, 19) const name = `${room}_${dateStr}.mp4` - const id = await startRecord(storageConfig, egressClient, roomClient, roomName, workspace) + const id = await startRecord(ctx, storageConfig, s3storageConfig, egressClient, roomClient, roomName, workspace) dataByUUID.set(id, { name, workspace: workspace.name, workspaceId: workspace, meetingMinutes }) ctx.info('Start recording', { workspace: workspace.name, roomName, meetingMinutes }) res.send() @@ -257,50 +258,26 @@ const checkRecordAvailable = async (storageConfig: StorageConfig | undefined): P return storageConfig !== undefined } -function getBucket (storageConfig: any, workspaceId: WorkspaceId): string { - return storageConfig.rootBucket ?? (storageConfig.bucketPrefix ?? '') + toWorkspaceString(workspaceId) -} - -function getBucketFolder (workspaceId: WorkspaceId): string { - return toWorkspaceString(workspaceId) -} - -function getDocumentKey (storageConfig: any, workspace: WorkspaceId, name: string): string { - return storageConfig.rootBucket === undefined ? name : `${getBucketFolder(workspace)}/${name}` -} - -function stripPrefix (prefix: string | undefined, key: string): string { - if (prefix !== undefined && key.startsWith(prefix)) { - return key.slice(prefix.length) - } - return key -} - -function rootPrefix (storageConfig: any, workspaceId: WorkspaceId): string | undefined { - return storageConfig.rootBucket !== undefined ? getBucketFolder(workspaceId) + '/' : undefined -} - const startRecord = async ( + ctx: MeasureContext, storageConfig: StorageConfig | undefined, + s3StorageConfig: StorageConfig | undefined, egressClient: EgressClient, roomClient: RoomServiceClient, roomName: string, workspaceId: WorkspaceId ): Promise => { if (storageConfig === undefined) { - console.error('please provide s3 storage configuration') - throw new Error('please provide s3 storage configuration') + console.error('please provide storage configuration') + throw new Error('please provide storage configuration') } - const endpoint = storageConfig.endpoint - const accessKey = (storageConfig as any).accessKey - const secret = (storageConfig as any).secretKey - const region = (storageConfig as any).region ?? 'auto' - const bucket = getBucket(storageConfig, workspaceId) - const name = uuid() - const filepath = getDocumentKey(storageConfig, workspaceId, `${name}.mp4`) + const uploadParams = await getS3UploadParams(ctx, workspaceId, storageConfig, s3StorageConfig) + + const { filepath, endpoint, accessKey, secret, region, bucket } = uploadParams const output = new EncodedFileOutput({ fileType: EncodedFileType.MP4, filepath, + disableManifest: true, output: { case: 's3', value: new S3Upload({ diff --git a/services/love/src/storage.ts b/services/love/src/storage.ts new file mode 100644 index 0000000000..a6631c7a09 --- /dev/null +++ b/services/love/src/storage.ts @@ -0,0 +1,185 @@ +// +// Copyright © 2024 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { Blob, MeasureContext, toWorkspaceString, WorkspaceId } from '@hcengineering/core' +import { DatalakeConfig, DatalakeService, createDatalakeClient } from '@hcengineering/datalake' +import { S3Config, S3Service } from '@hcengineering/s3' +import { StorageConfig } from '@hcengineering/server-core' +import { v4 as uuid } from 'uuid' + +export interface S3UploadParams { + filepath: string + endpoint: string + accessKey: string + region: string + secret: string + bucket: string +} + +export async function getS3UploadParams ( + ctx: MeasureContext, + workspaceId: WorkspaceId, + storageConfig: StorageConfig, + s3StorageConfig: StorageConfig | undefined +): Promise { + if (storageConfig.kind === 's3') { + if (storageConfig.kind !== 's3') { + throw new Error('Please provide S3 storage config') + } + return await getS3UploadParamsS3(ctx, workspaceId, storageConfig as S3Config) + } else if (storageConfig.kind === 'datalake') { + if (s3StorageConfig === undefined || s3StorageConfig.kind !== 's3') { + throw new Error('Please provide S3 storage config') + } + return await getS3UploadParamsDatalake( + ctx, + workspaceId, + storageConfig as DatalakeConfig, + s3StorageConfig as S3Config + ) + } else { + throw new Error('Unknown storage kind: ' + storageConfig.kind) + } +} + +export async function saveFile ( + ctx: MeasureContext, + workspaceId: WorkspaceId, + storageConfig: StorageConfig, + s3StorageConfig: StorageConfig | undefined, + filename: string +): Promise { + if (storageConfig.kind === 's3') { + if (storageConfig.kind !== 's3') { + throw new Error('Please provide S3 storage config') + } + return await saveFileToS3(ctx, workspaceId, storageConfig as S3Config, filename) + } else if (storageConfig.kind === 'datalake') { + if (s3StorageConfig === undefined || s3StorageConfig.kind !== 's3') { + throw new Error('Please provide S3 storage config') + } + return await saveFileToDatalake( + ctx, + workspaceId, + storageConfig as DatalakeConfig, + s3StorageConfig as S3Config, + filename + ) + } else { + throw new Error('Unknown storage kind: ' + storageConfig.kind) + } +} + +async function getS3UploadParamsS3 ( + ctx: MeasureContext, + workspaceId: WorkspaceId, + storageConfig: S3Config +): Promise { + const endpoint = storageConfig.endpoint + const accessKey = storageConfig.accessKey + const secret = storageConfig.secretKey + const region = storageConfig.region ?? 'auto' + const bucket = getBucket(storageConfig, workspaceId) + const name = uuid() + const filepath = getDocumentKey(storageConfig, workspaceId, `${name}.mp4`) + + return { + filepath, + endpoint, + accessKey, + region, + secret, + bucket + } +} + +async function getS3UploadParamsDatalake ( + ctx: MeasureContext, + workspaceId: WorkspaceId, + config: DatalakeConfig, + s3config: S3Config +): Promise { + const client = createDatalakeClient(config) + const { bucket } = await client.getR2UploadParams(ctx, workspaceId) + + const endpoint = s3config.endpoint + const accessKey = s3config.accessKey + const secret = s3config.secretKey + const region = s3config.region ?? 'auto' + const name = uuid() + const filepath = getDocumentKey(s3config, workspaceId, `${name}.mp4`) + + return { + filepath, + endpoint, + accessKey, + region, + secret, + bucket + } +} + +async function saveFileToS3 ( + ctx: MeasureContext, + workspaceId: WorkspaceId, + config: S3Config, + filename: string +): Promise { + const storageAdapter = new S3Service(config) + const prefix = rootPrefix(config, workspaceId) + const uuid = stripPrefix(prefix, filename) + return await storageAdapter.stat(ctx, workspaceId, uuid) +} + +async function saveFileToDatalake ( + ctx: MeasureContext, + workspaceId: WorkspaceId, + config: DatalakeConfig, + s3config: S3Config, + filename: string +): Promise { + const client = createDatalakeClient(config) + const storageAdapter = new DatalakeService(config) + + const prefix = rootPrefix(s3config, workspaceId) + const uuid = stripPrefix(prefix, filename) + + await client.uploadFromR2(ctx, workspaceId, uuid, { filename: uuid }) + + return await storageAdapter.stat(ctx, workspaceId, uuid) +} + +function getBucket (storageConfig: S3Config, workspaceId: WorkspaceId): string { + return storageConfig.rootBucket ?? (storageConfig.bucketPrefix ?? '') + toWorkspaceString(workspaceId) +} + +function getBucketFolder (workspaceId: WorkspaceId): string { + return toWorkspaceString(workspaceId) +} + +function getDocumentKey (storageConfig: any, workspace: WorkspaceId, name: string): string { + return storageConfig.rootBucket === undefined ? name : `${getBucketFolder(workspace)}/${name}` +} + +function stripPrefix (prefix: string | undefined, key: string): string { + if (prefix !== undefined && key.startsWith(prefix)) { + return key.slice(prefix.length) + } + return key +} + +function rootPrefix (storageConfig: S3Config, workspaceId: WorkspaceId): string | undefined { + return storageConfig.rootBucket !== undefined ? getBucketFolder(workspaceId) + '/' : undefined +}