diff --git a/src/client/admin/EmitterInfo.svelte b/src/client/admin/EmitterInfo.svelte index 7b38403..70cdedd 100644 --- a/src/client/admin/EmitterInfo.svelte +++ b/src/client/admin/EmitterInfo.svelte @@ -6,6 +6,7 @@ import iconDelete from "bootstrap-icons/icons/trash.svg?raw"; import iconUpload from "bootstrap-icons/icons/upload.svg?raw"; import iconFileCheck from "bootstrap-icons/icons/file-check.svg?raw"; + import iconStop from "bootstrap-icons/icons/stop.svg?raw"; import { _ } from "svelte-i18n"; import type { CallMethod } from "../../common/jsonRpc"; import type { EmitterAdminInfo, RpcServerInterface, ServerSentAdminInfo, ServerSentInfo } from "../../common/rpcInterface"; @@ -101,16 +102,21 @@
{#each emitter.emitterInfo.files as file} {@const keyInFiles = `${emitter.emitterShortId}/${file.name}`} - {@const valueInFiles = files?.[keyInFiles]} + {@const serverFileInfo = files?.[keyInFiles]}
{@html iconFile}{file.name} ({formatSize(file.size, $_)}) socketApi("uploadFile", { emitterId, fileName: file.name, startByte: serverFileInfo && serverFileInfo.size < file.size ? serverFileInfo.size : 0 })} + >{@html iconUpload} - {#if valueInFiles === file.size} + {#if serverFileInfo?.size === file.size && !serverFileInfo?.open} {@html iconFileCheck} - {:else if valueInFiles != null} - {formatSize(valueInFiles, $_)} + {:else if serverFileInfo} + {formatSize(serverFileInfo.size, $_)} + {#if serverFileInfo.open} + + + {/if} {/if}
{/each} diff --git a/src/common/rpcInterface.ts b/src/common/rpcInterface.ts index 1f36faa..e285651 100644 --- a/src/common/rpcInterface.ts +++ b/src/common/rpcInterface.ts @@ -34,11 +34,16 @@ export interface ServerSentReceiverInfo extends EmitterToReceiverInfo { transformImage?: TransformImage; } +export interface ServerFileInfo { + open: boolean; + size: number; +} + export interface ServerSentAdminInfo { mode: "admin"; mediaConstraints?: MediaStreamConstraints; emitters: { [emitterId: string]: EmitterAdminInfo }; - files: Record; + files: Record; } export type ClientSentInfo = ClientSentEmitterInfo | ClientSentReceiverInfo | undefined; @@ -120,6 +125,7 @@ export interface RpcClientInterface { export interface RpcServerInterface { iceCandidate?(arg: { candidate: RTCIceCandidateInit | null }): void; uploadFile?(arg: { emitterId: string; fileName: string; startByte: number }): void; + stopUpload?(arg: { emitterId: string; fileName: string }): void; removeFile?(arg: { emitterId: string; fileName: string }): void; toggleRecording?(arg: { emitterId: string; action: "stop" | "start" | "newFile"; receiver?: boolean; emitter?: boolean }): void; transformImage?(arg: { emitterId: string; transformImage: TransformImage | undefined }): void; diff --git a/src/server/clientConnection.ts b/src/server/clientConnection.ts index c84c10b..dcd5df6 100644 --- a/src/server/clientConnection.ts +++ b/src/server/clientConnection.ts @@ -323,6 +323,12 @@ export const createClientsManager = ( }); } }, + async stopUpload(arg) { + const emitter = emitterConnections.get(arg.emitterId); + if (emitter) { + uploadManager.stopUpload(`${emitter.shortId}/${arg.fileName}`); + } + }, async removeFile(arg) { const emitter = emitterConnections.get(arg.emitterId); if (emitter) { diff --git a/src/server/uploadManager.ts b/src/server/uploadManager.ts index e165270..f1c7290 100644 --- a/src/server/uploadManager.ts +++ b/src/server/uploadManager.ts @@ -7,16 +7,22 @@ import { createWriteStream } from "fs"; import { pipeline } from "stream/promises"; import { immerWritable } from "../common/immerWritable"; import equal from "fast-deep-equal"; +import type { ServerFileInfo } from "../common/rpcInterface"; export interface FileInfo { emitterShortId: string; fileName: string; startByte: number; + fileKey: string; + id: string; + stream?: ReturnType; + res?: ServerResponse; } export const createUploadManager = (config: Pick, configFilePath: string) => { - const receivedFiles$ = immerWritable({} as Record, { equal }); + const receivedFiles$ = immerWritable({} as Record, { equal }); const recordURLs = new Map(); + const openFiles = new Map(); const recordPrefix = config.recordPrefix!; const extractId = (url: string) => { if (url.startsWith(recordPrefix)) { @@ -25,20 +31,43 @@ export const createUploadManager = (config: Pick { + const existingOpenFile = openFiles.get(fileKey); + if (existingOpenFile) { + recordURLs.delete(existingOpenFile.id); + openFiles.delete(fileKey); + if (existingOpenFile.res) { + existingOpenFile.res.statusCode = 500; + existingOpenFile.res.end(JSON.stringify({})); + } + existingOpenFile.stream?.close(); + } + }; + const recordingsFolder = resolve(dirname(configFilePath), config.recordingsFolder!); const handleRequest = async (fileInfo: FileInfo, req: IncomingMessage, res: ServerResponse) => { try { const fullFileName = join(recordingsFolder, fileInfo.emitterShortId, fileInfo.fileName); await mkdir(dirname(fullFileName), { recursive: true }); + if (openFiles.get(fileInfo.fileKey) !== fileInfo) return; const stream = createWriteStream(fullFileName, { start: fileInfo.startByte, flags: fileInfo.startByte === 0 ? "w" : "r+" }); - const update = () => + fileInfo.stream = stream; + fileInfo.res = res; + const update = () => { + const newFile = openFiles.get(fileInfo.fileKey); + if (newFile?.stream && newFile !== fileInfo) return; receivedFiles$.update((receivedFiles) => { - receivedFiles[`${fileInfo.emitterShortId}/${fileInfo.fileName}`] = fileInfo.startByte + stream.bytesWritten; + receivedFiles[fileInfo.fileKey] = { + size: fileInfo.startByte + stream.bytesWritten, + open: !stream.closed, + }; return receivedFiles; }); - update(); + }; req.on("data", update); + stream.on("close", update); + update(); await pipeline(req, stream); update(); res.statusCode = 200; @@ -47,16 +76,25 @@ export const createUploadManager = (config: Pick) { + const fileKey = `${info.emitterShortId}/${info.fileName}`; + stopUpload(fileKey); const id = createId(); - recordURLs.set(id, info); + const fileInfo = { ...info, id, stream: undefined, fileKey }; + recordURLs.set(id, fileInfo); + openFiles.set(fileKey, fileInfo); return `${recordPrefix}${id}`; }, + stopUpload, handleRequest(req: IncomingMessage, res: ServerResponse) { console.log(req.method, req.url); if (req.method === "PUT") {