diff --git a/packages/ts-uploader/src/state/MultipartUpload.ts b/packages/ts-uploader/src/state/MultipartUpload.ts index f0137b6b9..cd7583893 100644 --- a/packages/ts-uploader/src/state/MultipartUpload.ts +++ b/packages/ts-uploader/src/state/MultipartUpload.ts @@ -30,10 +30,11 @@ function slicePart(file: File, offset: number, length: number) { return end < file.size ? file.slice(start, end) : file.slice(start); } -async function createUploadRequest( +async function createUploadRequest( part: UploadPart, - onProgress?: UploadRequestConfig["onProgress"] -): Promise { + onProgress: UploadRequestConfig["onProgress"], + transformer: UploadRequest["transformer"] +): Promise> { // TODO: Async md5 calculation via either wasm or workers const md5 = await calculateMD5(part.payload); return new UploadRequest( @@ -44,7 +45,8 @@ async function createUploadRequest( { url: part.meta.url, onProgress, - } + }, + transformer ); } @@ -84,7 +86,7 @@ class UploadHandle implements IUploadHandle { readonly parts: UploadPart[]; readonly onProgress = new TypedEventEmitter(); - private requests?: UploadRequest[]; + private requests?: UploadRequest[]; constructor( handle: UserMedia, @@ -111,6 +113,20 @@ class UploadHandle implements IUploadHandle { ); } + private startRequests(count?: number) { + if (this.requests == undefined) { + throw new Error("Upload not yet prepared"); + } + + const promises = []; + const candidates = this.requests.filter((x) => x.status === "pending"); + for (let i = 0; i < (count ?? candidates.length); i++) { + if (i >= candidates.length) break; + promises.push(candidates[i].upload()); + } + return promises; + } + async startUpload(onProgress?: (progress: UploadProgress) => any) { if (this.requests != undefined) { throw new Error("Upload already initiated!"); @@ -124,28 +140,27 @@ class UploadHandle implements IUploadHandle { this.requests = []; - const promises = []; for (let part of this.parts) { - const request = await createUploadRequest(part, progressCallback); - this.requests.push(request); - promises.push( - request.upload((response) => { + const request = await createUploadRequest( + part, + progressCallback, + (response) => { const etag = response.headers.get("etag"); if (!etag) { // ETag is filtered out by some browser extensions // TODO: Handle somehow better throw new Error("ETag header was missing from the response!"); } - const result: CompletedPart = { + return { ETag: etag, PartNumber: part.meta.part_number, }; - return result; - }) + } ); + this.requests.push(request); } - const parts = await Promise.all(promises); + const parts = await Promise.all(this.startRequests()); return UsermediaEndpoints.finish(this.opts.api, { data: { parts }, uuid: this.handle.uuid, diff --git a/packages/ts-uploader/src/state/UploadRequest.ts b/packages/ts-uploader/src/state/UploadRequest.ts index fcecc33cd..a82af7628 100644 --- a/packages/ts-uploader/src/state/UploadRequest.ts +++ b/packages/ts-uploader/src/state/UploadRequest.ts @@ -1,9 +1,8 @@ import { fetchWithProgress } from "../client/fetch"; -import { CompletedPart } from "../client/types"; -export type UploadRequestConfig = { +export type UploadRequestConfig = { url: string; - onProgress?: (instance: UploadRequest, progress: UploadProgress) => any; + onProgress?: (instance: UploadRequest, progress: UploadProgress) => any; }; export type UploadProgress = { total: number; @@ -14,27 +13,42 @@ type UploadPayload = { md5: string; }; -export class UploadRequest { +export type UploadRequestStatus = + | "pending" + | "complete" + | "failed" + | "running" + | "aborted"; + +export class UploadRequest { readonly payload: UploadPayload; - readonly config: UploadRequestConfig; + readonly config: UploadRequestConfig; + readonly transformer: (res: Response) => T; + status: UploadRequestStatus; progress: UploadProgress; + result: T | undefined; // Only intended to be used for aborting an ongoing upload private _ongoingRequest?: XMLHttpRequest; - result?: CompletedPart; - - constructor(payload: UploadPayload, config: UploadRequestConfig) { + constructor( + payload: UploadPayload, + config: UploadRequestConfig, + transformer: (res: Response) => T + ) { this.payload = payload; this.config = config; + this.transformer = transformer; this.progress = { total: payload.data.size, complete: 0 }; + this.status = "pending"; } public abort() { if (this._ongoingRequest) { this._ongoingRequest.abort(); this._ongoingRequest = undefined; + this.status = "aborted"; } } @@ -43,7 +57,7 @@ export class UploadRequest { console.error(e); } - public async upload(transformResult: (res: Response) => T): Promise { + public async upload(): Promise { this.progress = { total: this.payload.data.size, complete: 0 }; const fetchArgs = { @@ -69,6 +83,7 @@ export class UploadRequest { let numRetries = 0; let lastError: unknown; + this.status = "running"; while (numRetries < 3) { try { const { request, response } = fetchWithProgress(fetchArgs); @@ -79,7 +94,9 @@ export class UploadRequest { `Upload failed due to non-success status code: ${resp.status}` ); } - return transformResult(resp); + this.result = this.transformer(resp); + this.status = "complete"; + return this.result; } catch (e) { numRetries++; this.onError(e); @@ -93,6 +110,7 @@ export class UploadRequest { // either finished or been aborted externally. That promise should // not resolve even if we run out of automatic retries or the upload // is paused for example. + this.status = "failed"; throw lastError; } }