From 5d7724762d96363943b5a9b0858edd09e02bf1fc Mon Sep 17 00:00:00 2001 From: wukko Date: Thu, 30 Jan 2025 01:04:33 +0600 Subject: [PATCH] web: very early implementation of a fetch worker --- web/i18n/en/queue.json | 3 +- web/src/lib/api/saving-handler.ts | 6 ++- web/src/lib/queen-bee/queue.ts | 25 ++++++++++ web/src/lib/queen-bee/run-worker.ts | 58 ++++++++++++++++++++-- web/src/lib/types/api.ts | 4 +- web/src/lib/types/workers.ts | 5 +- web/src/lib/workers/fetch.ts | 74 +++++++++++++++++++++++++++++ 7 files changed, 166 insertions(+), 9 deletions(-) create mode 100644 web/src/lib/workers/fetch.ts diff --git a/web/i18n/en/queue.json b/web/i18n/en/queue.json index 508ff2576..d7ae7f57b 100644 --- a/web/i18n/en/queue.json +++ b/web/i18n/en/queue.json @@ -6,5 +6,6 @@ "state.waiting": "queued", "state.starting": "starting...", - "state.running.remux": "remuxing" + "state.running.remux": "remuxing", + "state.running.fetch": "downloading" } diff --git a/web/src/lib/api/saving-handler.ts b/web/src/lib/api/saving-handler.ts index a7f8f8846..50d8b043b 100644 --- a/web/src/lib/api/saving-handler.ts +++ b/web/src/lib/api/saving-handler.ts @@ -5,6 +5,7 @@ import { t } from "$lib/i18n/translations"; import { downloadFile } from "$lib/download"; import { createDialog } from "$lib/state/dialogs"; import { downloadButtonState } from "$lib/state/omnibox"; +import { createSavePipeline } from "$lib/queen-bee/queue"; import type { DialogInfo } from "$lib/types/dialog"; @@ -79,8 +80,11 @@ export const savingHandler = async (link: string) => { } if (response.status === "local-processing") { - // TODO: actual implementation + // TODO: remove debug logging console.log(response); + + downloadButtonState.set("done"); + return createSavePipeline(response); } if (response.status === "picker") { diff --git a/web/src/lib/queen-bee/queue.ts b/web/src/lib/queen-bee/queue.ts index 8e2b3b904..8d9d517b0 100644 --- a/web/src/lib/queen-bee/queue.ts +++ b/web/src/lib/queen-bee/queue.ts @@ -1,5 +1,6 @@ import { addItem } from "$lib/state/queen-bee/queue"; import type { CobaltPipelineItem } from "$lib/types/workers"; +import type { CobaltLocalProcessingResponse } from "$lib/types/api"; export const getMediaType = (type: string) => { const kind = type.split('/')[0]; @@ -34,3 +35,27 @@ export const createRemuxPipeline = (file: File) => { }) } } + +export const createSavePipeline = (info: CobaltLocalProcessingResponse) => { + const parentId = crypto.randomUUID(); + const pipeline: CobaltPipelineItem[] = []; + + for (const tunnel of info.tunnel) { + pipeline.push({ + worker: "fetch", + workerId: crypto.randomUUID(), + parentId, + workerArgs: { + url: tunnel, + }, + }) + } + + addItem({ + id: parentId, + state: "waiting", + pipeline, + filename: info.filename, + mediaType: "video", + }) +} diff --git a/web/src/lib/queen-bee/run-worker.ts b/web/src/lib/queen-bee/run-worker.ts index d3a83c779..f6fc276b0 100644 --- a/web/src/lib/queen-bee/run-worker.ts +++ b/web/src/lib/queen-bee/run-worker.ts @@ -1,4 +1,5 @@ import RemuxWorker from "$lib/workers/remux?worker"; +import FetchWorker from "$lib/workers/fetch?worker"; import { itemDone, itemError, queue } from "$lib/state/queen-bee/queue"; import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks"; @@ -6,10 +7,10 @@ import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks"; import type { CobaltQueue } from "$lib/types/queue"; import type { CobaltPipelineItem } from "$lib/types/workers"; -const killWorker = (worker: Worker, unsubscribe: () => void, interval: NodeJS.Timeout) => { +const killWorker = (worker: Worker, unsubscribe: () => void, interval?: NodeJS.Timeout) => { unsubscribe(); worker.terminate(); - clearInterval(interval); + if (interval) clearInterval(interval); } export const runRemuxWorker = async (workerId: string, parentId: string, file: File) => { @@ -89,10 +90,61 @@ export const runRemuxWorker = async (workerId: string, parentId: string, file: F }; } +export const runFetchWorker = async (workerId: string, parentId: string, url: string) => { + const worker = new FetchWorker(); + + const unsubscribe = queue.subscribe((queue: CobaltQueue) => { + if (!queue[parentId]) { + // TODO: remove logging + console.log("worker's parent is gone, so it killed itself"); + killWorker(worker, unsubscribe); + } + }); + + worker.postMessage({ + cobaltFetchWorker: { + url + } + }); + + worker.onmessage = (event) => { + const eventData = event.data.cobaltFetchWorker; + if (!eventData) return; + + if (eventData.progress) { + updateWorkerProgress(workerId, { + percentage: eventData.progress, + size: eventData.size, + }) + } + + if (eventData.file) { + killWorker(worker, unsubscribe); + return itemDone( + parentId, + workerId, + eventData.file, + ); + } + + if (eventData.error) { + killWorker(worker, unsubscribe); + return itemError(parentId, workerId, eventData.error); + } + } +} + export const startWorker = async ({ worker, workerId, parentId, workerArgs }: CobaltPipelineItem) => { switch (worker) { case "remux": - await runRemuxWorker(workerId, parentId, workerArgs.files[0]); + if (workerArgs?.files) { + await runRemuxWorker(workerId, parentId, workerArgs.files[0]); + } + break; + case "fetch": + if (workerArgs?.url) { + await runFetchWorker(workerId, parentId, workerArgs.url) + } break; } } diff --git a/web/src/lib/types/api.ts b/web/src/lib/types/api.ts index cd9aa44c0..d91f71300 100644 --- a/web/src/lib/types/api.ts +++ b/web/src/lib/types/api.ts @@ -41,14 +41,14 @@ type CobaltTunnelResponse = { status: CobaltResponseType.Tunnel, } & CobaltPartialURLResponse; -type CobaltLocalProcessingResponse = { +export type CobaltLocalProcessingResponse = { status: CobaltResponseType.LocalProcessing, tunnel: string[], // TODO: proper type for processing types type: string, service: string, - filename?: string, + filename: string, metadata?: { album?: string, diff --git a/web/src/lib/types/workers.ts b/web/src/lib/types/workers.ts index 0dabeb1cd..abd560e1d 100644 --- a/web/src/lib/types/workers.ts +++ b/web/src/lib/types/workers.ts @@ -1,6 +1,6 @@ export const resultFileTypes = ["video", "audio", "image"] as const; -export type CobaltWorkerType = "remux" | "removebg"; +export type CobaltWorkerType = "remux" | "fetch"; export type CobaltPipelineResultFileType = typeof resultFileTypes[number]; export type CobaltWorkerProgress = { @@ -10,7 +10,8 @@ export type CobaltWorkerProgress = { } export type CobaltWorkerArgs = { - files: File[], + files?: File[], + url?: string, //TODO: args for libav & etc with unique types } diff --git a/web/src/lib/workers/fetch.ts b/web/src/lib/workers/fetch.ts new file mode 100644 index 000000000..0965c51d3 --- /dev/null +++ b/web/src/lib/workers/fetch.ts @@ -0,0 +1,74 @@ +const error = (code: string) => { + // TODO: return proper errors and code here + self.postMessage({ + cobaltFetchWorker: { + error: code, + } + }) +}; + +const fetchFile = async (url: string) => { + try { + const response = await fetch(url); + + if (!response.ok) { + error("file response wasn't ok"); + return self.close(); + } + + const contentType = response.headers.get('Content-Type') || 'application/octet-stream'; + const contentLength = response.headers.get('Content-Length'); + + const totalBytes = contentLength ? parseInt(contentLength, 10) : null; + const reader = response.body?.getReader(); + + if (!reader) { + error("no reader"); + return self.close(); + } + + let receivedBytes = 0; + const chunks = []; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + receivedBytes += value.length; + chunks.push(value); + + if (totalBytes) { + self.postMessage({ + cobaltFetchWorker: { + progress: Math.round((receivedBytes / totalBytes) * 100), + size: receivedBytes, + } + }); + } + } + + if (receivedBytes === 0) { + error("tunnel is broken"); + return self.close(); + } + + const file = new File(chunks, "file", { type: contentType }); + + self.postMessage({ + cobaltFetchWorker: { + file + } + }); + } catch (e) { + console.log(e) + error("error when downloading the file"); + return self.close(); + } +} + +self.onmessage = async (event: MessageEvent) => { + if (event.data.cobaltFetchWorker) { + await fetchFile(event.data.cobaltFetchWorker.url); + self.close(); + } +}