Skip to content

Commit

Permalink
web: very early implementation of a fetch worker
Browse files Browse the repository at this point in the history
  • Loading branch information
wukko committed Jan 29, 2025
1 parent affe494 commit 5d77247
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 9 deletions.
3 changes: 2 additions & 1 deletion web/i18n/en/queue.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@

"state.waiting": "queued",
"state.starting": "starting...",
"state.running.remux": "remuxing"
"state.running.remux": "remuxing",
"state.running.fetch": "downloading"
}
6 changes: 5 additions & 1 deletion web/src/lib/api/saving-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { t } from "$lib/i18n/translations";
import { downloadFile } from "$lib/download";
import { createDialog } from "$lib/state/dialogs";
import { downloadButtonState } from "$lib/state/omnibox";
import { createSavePipeline } from "$lib/queen-bee/queue";

import type { DialogInfo } from "$lib/types/dialog";

Expand Down Expand Up @@ -79,8 +80,11 @@ export const savingHandler = async (link: string) => {
}

if (response.status === "local-processing") {
// TODO: actual implementation
// TODO: remove debug logging
console.log(response);

downloadButtonState.set("done");
return createSavePipeline(response);
}

if (response.status === "picker") {
Expand Down
25 changes: 25 additions & 0 deletions web/src/lib/queen-bee/queue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { addItem } from "$lib/state/queen-bee/queue";
import type { CobaltPipelineItem } from "$lib/types/workers";
import type { CobaltLocalProcessingResponse } from "$lib/types/api";

export const getMediaType = (type: string) => {
const kind = type.split('/')[0];
Expand Down Expand Up @@ -34,3 +35,27 @@ export const createRemuxPipeline = (file: File) => {
})
}
}

export const createSavePipeline = (info: CobaltLocalProcessingResponse) => {
const parentId = crypto.randomUUID();
const pipeline: CobaltPipelineItem[] = [];

for (const tunnel of info.tunnel) {
pipeline.push({
worker: "fetch",
workerId: crypto.randomUUID(),
parentId,
workerArgs: {
url: tunnel,
},
})
}

addItem({
id: parentId,
state: "waiting",
pipeline,
filename: info.filename,
mediaType: "video",
})
}
58 changes: 55 additions & 3 deletions web/src/lib/queen-bee/run-worker.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import RemuxWorker from "$lib/workers/remux?worker";
import FetchWorker from "$lib/workers/fetch?worker";

import { itemDone, itemError, queue } from "$lib/state/queen-bee/queue";
import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks";

import type { CobaltQueue } from "$lib/types/queue";
import type { CobaltPipelineItem } from "$lib/types/workers";

const killWorker = (worker: Worker, unsubscribe: () => void, interval: NodeJS.Timeout) => {
const killWorker = (worker: Worker, unsubscribe: () => void, interval?: NodeJS.Timeout) => {
unsubscribe();
worker.terminate();
clearInterval(interval);
if (interval) clearInterval(interval);
}

export const runRemuxWorker = async (workerId: string, parentId: string, file: File) => {
Expand Down Expand Up @@ -89,10 +90,61 @@ export const runRemuxWorker = async (workerId: string, parentId: string, file: F
};
}

export const runFetchWorker = async (workerId: string, parentId: string, url: string) => {
const worker = new FetchWorker();

const unsubscribe = queue.subscribe((queue: CobaltQueue) => {
if (!queue[parentId]) {
// TODO: remove logging
console.log("worker's parent is gone, so it killed itself");
killWorker(worker, unsubscribe);
}
});

worker.postMessage({
cobaltFetchWorker: {
url
}
});

worker.onmessage = (event) => {
const eventData = event.data.cobaltFetchWorker;
if (!eventData) return;

if (eventData.progress) {
updateWorkerProgress(workerId, {
percentage: eventData.progress,
size: eventData.size,
})
}

if (eventData.file) {
killWorker(worker, unsubscribe);
return itemDone(
parentId,
workerId,
eventData.file,
);
}

if (eventData.error) {
killWorker(worker, unsubscribe);
return itemError(parentId, workerId, eventData.error);
}
}
}

export const startWorker = async ({ worker, workerId, parentId, workerArgs }: CobaltPipelineItem) => {
switch (worker) {
case "remux":
await runRemuxWorker(workerId, parentId, workerArgs.files[0]);
if (workerArgs?.files) {
await runRemuxWorker(workerId, parentId, workerArgs.files[0]);
}
break;
case "fetch":
if (workerArgs?.url) {
await runFetchWorker(workerId, parentId, workerArgs.url)
}
break;
}
}
4 changes: 2 additions & 2 deletions web/src/lib/types/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ type CobaltTunnelResponse = {
status: CobaltResponseType.Tunnel,
} & CobaltPartialURLResponse;

type CobaltLocalProcessingResponse = {
export type CobaltLocalProcessingResponse = {
status: CobaltResponseType.LocalProcessing,
tunnel: string[],

// TODO: proper type for processing types
type: string,
service: string,
filename?: string,
filename: string,

metadata?: {
album?: string,
Expand Down
5 changes: 3 additions & 2 deletions web/src/lib/types/workers.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export const resultFileTypes = ["video", "audio", "image"] as const;

export type CobaltWorkerType = "remux" | "removebg";
export type CobaltWorkerType = "remux" | "fetch";
export type CobaltPipelineResultFileType = typeof resultFileTypes[number];

export type CobaltWorkerProgress = {
Expand All @@ -10,7 +10,8 @@ export type CobaltWorkerProgress = {
}

export type CobaltWorkerArgs = {
files: File[],
files?: File[],
url?: string,
//TODO: args for libav & etc with unique types
}

Expand Down
74 changes: 74 additions & 0 deletions web/src/lib/workers/fetch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
const error = (code: string) => {
// TODO: return proper errors and code here
self.postMessage({
cobaltFetchWorker: {
error: code,
}
})
};

const fetchFile = async (url: string) => {
try {
const response = await fetch(url);

if (!response.ok) {
error("file response wasn't ok");
return self.close();
}

const contentType = response.headers.get('Content-Type') || 'application/octet-stream';
const contentLength = response.headers.get('Content-Length');

const totalBytes = contentLength ? parseInt(contentLength, 10) : null;
const reader = response.body?.getReader();

if (!reader) {
error("no reader");
return self.close();
}

let receivedBytes = 0;
const chunks = [];

while (true) {
const { done, value } = await reader.read();
if (done) break;

receivedBytes += value.length;
chunks.push(value);

if (totalBytes) {
self.postMessage({
cobaltFetchWorker: {
progress: Math.round((receivedBytes / totalBytes) * 100),
size: receivedBytes,
}
});
}
}

if (receivedBytes === 0) {
error("tunnel is broken");
return self.close();
}

const file = new File(chunks, "file", { type: contentType });

self.postMessage({
cobaltFetchWorker: {
file
}
});
} catch (e) {
console.log(e)
error("error when downloading the file");
return self.close();
}
}

self.onmessage = async (event: MessageEvent) => {
if (event.data.cobaltFetchWorker) {
await fetchFile(event.data.cobaltFetchWorker.url);
self.close();
}
}

0 comments on commit 5d77247

Please sign in to comment.