Skip to content

Commit

Permalink
Allow stopping upload
Browse files Browse the repository at this point in the history
  • Loading branch information
davdiv committed Aug 22, 2023
1 parent 042c9d9 commit 8049e7b
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 12 deletions.
16 changes: 11 additions & 5 deletions src/client/admin/EmitterInfo.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import iconDelete from "bootstrap-icons/icons/trash.svg?raw";
import iconUpload from "bootstrap-icons/icons/upload.svg?raw";
import iconFileCheck from "bootstrap-icons/icons/file-check.svg?raw";
import iconStop from "bootstrap-icons/icons/stop.svg?raw";
import { _ } from "svelte-i18n";
import type { CallMethod } from "../../common/jsonRpc";
import type { EmitterAdminInfo, RpcServerInterface, ServerSentAdminInfo, ServerSentInfo } from "../../common/rpcInterface";
Expand Down Expand Up @@ -101,16 +102,21 @@
<div>
{#each emitter.emitterInfo.files as file}
{@const keyInFiles = `${emitter.emitterShortId}/${file.name}`}
{@const valueInFiles = files?.[keyInFiles]}
{@const serverFileInfo = files?.[keyInFiles]}
<div class="flex">
{@html iconFile}<span>{file.name} (<span title={$_("byte", { values: { size: file.size } })}>{formatSize(file.size, $_)}</span>)</span><button
class="flex"
on:click={() => socketApi("uploadFile", { emitterId, fileName: file.name, startByte: valueInFiles != null && valueInFiles < file.size ? valueInFiles : 0 })}>{@html iconUpload}</button
on:click={() => socketApi("uploadFile", { emitterId, fileName: file.name, startByte: serverFileInfo && serverFileInfo.size < file.size ? serverFileInfo.size : 0 })}
>{@html iconUpload}</button
><button class="flex" on:click={() => socketApi("removeFile", { emitterId, fileName: file.name })}>{@html iconDelete}</button>
{#if valueInFiles === file.size}
{#if serverFileInfo?.size === file.size && !serverFileInfo?.open}
{@html iconFileCheck}
{:else if valueInFiles != null}
<span title={$_("byte", { values: { size: valueInFiles } })}>{formatSize(valueInFiles, $_)}</span><progress value={valueInFiles} max={file.size}></progress>
{:else if serverFileInfo}
<span title={$_("byte", { values: { size: serverFileInfo.size } })}>{formatSize(serverFileInfo.size, $_)}</span>
{#if serverFileInfo.open}
<button class="flex" on:click={() => socketApi("stopUpload", { emitterId, fileName: file.name })}>{@html iconStop}</button>
<progress value={serverFileInfo.size} max={file.size}></progress>
{/if}
{/if}
</div>
{/each}
Expand Down
8 changes: 7 additions & 1 deletion src/common/rpcInterface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@ export interface ServerSentReceiverInfo extends EmitterToReceiverInfo {
transformImage?: TransformImage;
}

export interface ServerFileInfo {
open: boolean;
size: number;
}

export interface ServerSentAdminInfo {
mode: "admin";
mediaConstraints?: MediaStreamConstraints;
emitters: { [emitterId: string]: EmitterAdminInfo };
files: Record<string, number>;
files: Record<string, ServerFileInfo>;
}

export type ClientSentInfo = ClientSentEmitterInfo | ClientSentReceiverInfo | undefined;
Expand Down Expand Up @@ -120,6 +125,7 @@ export interface RpcClientInterface {
export interface RpcServerInterface {
iceCandidate?(arg: { candidate: RTCIceCandidateInit | null }): void;
uploadFile?(arg: { emitterId: string; fileName: string; startByte: number }): void;
stopUpload?(arg: { emitterId: string; fileName: string }): void;
removeFile?(arg: { emitterId: string; fileName: string }): void;
toggleRecording?(arg: { emitterId: string; action: "stop" | "start" | "newFile"; receiver?: boolean; emitter?: boolean }): void;
transformImage?(arg: { emitterId: string; transformImage: TransformImage | undefined }): void;
Expand Down
6 changes: 6 additions & 0 deletions src/server/clientConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,12 @@ export const createClientsManager = (
});
}
},
async stopUpload(arg) {
const emitter = emitterConnections.get(arg.emitterId);
if (emitter) {
uploadManager.stopUpload(`${emitter.shortId}/${arg.fileName}`);
}
},
async removeFile(arg) {
const emitter = emitterConnections.get(arg.emitterId);
if (emitter) {
Expand Down
50 changes: 44 additions & 6 deletions src/server/uploadManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,22 @@ import { createWriteStream } from "fs";
import { pipeline } from "stream/promises";
import { immerWritable } from "../common/immerWritable";
import equal from "fast-deep-equal";
import type { ServerFileInfo } from "../common/rpcInterface";

export interface FileInfo {
emitterShortId: string;
fileName: string;
startByte: number;
fileKey: string;
id: string;
stream?: ReturnType<typeof createWriteStream>;
res?: ServerResponse;
}

export const createUploadManager = (config: Pick<ServerConfig, "recordPrefix" | "recordingsFolder">, configFilePath: string) => {
const receivedFiles$ = immerWritable({} as Record<string, number>, { equal });
const receivedFiles$ = immerWritable({} as Record<string, ServerFileInfo>, { equal });
const recordURLs = new Map<string, FileInfo>();
const openFiles = new Map<string, FileInfo>();
const recordPrefix = config.recordPrefix!;
const extractId = (url: string) => {
if (url.startsWith(recordPrefix)) {
Expand All @@ -25,20 +31,43 @@ export const createUploadManager = (config: Pick<ServerConfig, "recordPrefix" |
return undefined;
};

const stopUpload = (fileKey: string) => {
const existingOpenFile = openFiles.get(fileKey);
if (existingOpenFile) {
recordURLs.delete(existingOpenFile.id);
openFiles.delete(fileKey);
if (existingOpenFile.res) {
existingOpenFile.res.statusCode = 500;
existingOpenFile.res.end(JSON.stringify({}));
}
existingOpenFile.stream?.close();
}
};

const recordingsFolder = resolve(dirname(configFilePath), config.recordingsFolder!);

const handleRequest = async (fileInfo: FileInfo, req: IncomingMessage, res: ServerResponse) => {
try {
const fullFileName = join(recordingsFolder, fileInfo.emitterShortId, fileInfo.fileName);
await mkdir(dirname(fullFileName), { recursive: true });
if (openFiles.get(fileInfo.fileKey) !== fileInfo) return;
const stream = createWriteStream(fullFileName, { start: fileInfo.startByte, flags: fileInfo.startByte === 0 ? "w" : "r+" });
const update = () =>
fileInfo.stream = stream;
fileInfo.res = res;
const update = () => {
const newFile = openFiles.get(fileInfo.fileKey);
if (newFile?.stream && newFile !== fileInfo) return;
receivedFiles$.update((receivedFiles) => {
receivedFiles[`${fileInfo.emitterShortId}/${fileInfo.fileName}`] = fileInfo.startByte + stream.bytesWritten;
receivedFiles[fileInfo.fileKey] = {
size: fileInfo.startByte + stream.bytesWritten,
open: !stream.closed,
};
return receivedFiles;
});
update();
};
req.on("data", update);
stream.on("close", update);
update();
await pipeline(req, stream);
update();
res.statusCode = 200;
Expand All @@ -47,16 +76,25 @@ export const createUploadManager = (config: Pick<ServerConfig, "recordPrefix" |
console.log(error);
res.statusCode = 500;
res.end(JSON.stringify({}));
} finally {
if (openFiles.get(fileInfo.fileKey) === fileInfo) {
openFiles.delete(fileInfo.fileKey);
}
}
};

return {
receivedFiles$,
createUploadURL(info: FileInfo) {
createUploadURL(info: Pick<FileInfo, "emitterShortId" | "startByte" | "fileName">) {
const fileKey = `${info.emitterShortId}/${info.fileName}`;
stopUpload(fileKey);
const id = createId();
recordURLs.set(id, info);
const fileInfo = { ...info, id, stream: undefined, fileKey };
recordURLs.set(id, fileInfo);
openFiles.set(fileKey, fileInfo);
return `${recordPrefix}${id}`;
},
stopUpload,
handleRequest(req: IncomingMessage, res: ServerResponse) {
console.log(req.method, req.url);
if (req.method === "PUT") {
Expand Down

0 comments on commit 8049e7b

Please sign in to comment.