Skip to content

Commit

Permalink
ts-uploader: Add request state tracking
Browse files Browse the repository at this point in the history
Add minimal upload request state tracking in order to enable postponed
starts or retries of upload requests.
  • Loading branch information
MythicManiac committed Nov 14, 2023
1 parent a236d77 commit 88743de
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 24 deletions.
43 changes: 29 additions & 14 deletions packages/ts-uploader/src/state/MultipartUpload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ function slicePart(file: File, offset: number, length: number) {
return end < file.size ? file.slice(start, end) : file.slice(start);
}

async function createUploadRequest(
async function createUploadRequest<T>(
part: UploadPart,
onProgress?: UploadRequestConfig["onProgress"]
): Promise<UploadRequest> {
onProgress: UploadRequestConfig<T>["onProgress"],
transformer: UploadRequest<T>["transformer"]
): Promise<UploadRequest<T>> {
// TODO: Async md5 calculation via either wasm or workers
const md5 = await calculateMD5(part.payload);
return new UploadRequest(
Expand All @@ -44,7 +45,8 @@ async function createUploadRequest(
{
url: part.meta.url,
onProgress,
}
},
transformer
);
}

Expand Down Expand Up @@ -84,7 +86,7 @@ class UploadHandle implements IUploadHandle {
readonly parts: UploadPart[];
readonly onProgress = new TypedEventEmitter<UploadProgress>();

private requests?: UploadRequest[];
private requests?: UploadRequest<CompletedPart>[];

constructor(
handle: UserMedia,
Expand All @@ -111,6 +113,20 @@ class UploadHandle implements IUploadHandle {
);
}

private startRequests(count?: number) {
if (this.requests == undefined) {
throw new Error("Upload not yet prepared");
}

const promises = [];
const candidates = this.requests.filter((x) => x.status === "pending");
for (let i = 0; i < (count ?? candidates.length); i++) {
if (i >= candidates.length) break;
promises.push(candidates[i].upload());
}
return promises;
}

async startUpload(onProgress?: (progress: UploadProgress) => any) {
if (this.requests != undefined) {
throw new Error("Upload already initiated!");
Expand All @@ -124,28 +140,27 @@ class UploadHandle implements IUploadHandle {

this.requests = [];

const promises = [];
for (let part of this.parts) {
const request = await createUploadRequest(part, progressCallback);
this.requests.push(request);
promises.push(
request.upload((response) => {
const request = await createUploadRequest<CompletedPart>(
part,
progressCallback,
(response) => {
const etag = response.headers.get("etag");
if (!etag) {
// ETag is filtered out by some browser extensions
// TODO: Handle somehow better
throw new Error("ETag header was missing from the response!");
}
const result: CompletedPart = {
return {
ETag: etag,
PartNumber: part.meta.part_number,
};
return result;
})
}
);
this.requests.push(request);
}

const parts = await Promise.all(promises);
const parts = await Promise.all(this.startRequests());
return UsermediaEndpoints.finish(this.opts.api, {
data: { parts },
uuid: this.handle.uuid,
Expand Down
38 changes: 28 additions & 10 deletions packages/ts-uploader/src/state/UploadRequest.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { fetchWithProgress } from "../client/fetch";
import { CompletedPart } from "../client/types";

export type UploadRequestConfig = {
export type UploadRequestConfig<T> = {
url: string;
onProgress?: (instance: UploadRequest, progress: UploadProgress) => any;
onProgress?: (instance: UploadRequest<T>, progress: UploadProgress) => any;
};
export type UploadProgress = {
total: number;
Expand All @@ -14,27 +13,42 @@ type UploadPayload = {
md5: string;
};

export class UploadRequest {
export type UploadRequestStatus =
| "pending"
| "complete"
| "failed"
| "running"
| "aborted";

export class UploadRequest<T> {
readonly payload: UploadPayload;
readonly config: UploadRequestConfig;
readonly config: UploadRequestConfig<T>;
readonly transformer: (res: Response) => T;

status: UploadRequestStatus;
progress: UploadProgress;
result: T | undefined;

// Only intended to be used for aborting an ongoing upload
private _ongoingRequest?: XMLHttpRequest;

result?: CompletedPart;

constructor(payload: UploadPayload, config: UploadRequestConfig) {
constructor(
payload: UploadPayload,
config: UploadRequestConfig<T>,
transformer: (res: Response) => T
) {
this.payload = payload;
this.config = config;
this.transformer = transformer;
this.progress = { total: payload.data.size, complete: 0 };
this.status = "pending";
}

public abort() {
if (this._ongoingRequest) {
this._ongoingRequest.abort();
this._ongoingRequest = undefined;
this.status = "aborted";
}
}

Expand All @@ -43,7 +57,7 @@ export class UploadRequest {
console.error(e);
}

public async upload<T>(transformResult: (res: Response) => T): Promise<T> {
public async upload(): Promise<T> {
this.progress = { total: this.payload.data.size, complete: 0 };

const fetchArgs = {
Expand All @@ -69,6 +83,7 @@ export class UploadRequest {
let numRetries = 0;
let lastError: unknown;

this.status = "running";
while (numRetries < 3) {
try {
const { request, response } = fetchWithProgress(fetchArgs);
Expand All @@ -79,7 +94,9 @@ export class UploadRequest {
`Upload failed due to non-success status code: ${resp.status}`
);
}
return transformResult(resp);
this.result = this.transformer(resp);
this.status = "complete";
return this.result;
} catch (e) {
numRetries++;
this.onError(e);
Expand All @@ -93,6 +110,7 @@ export class UploadRequest {
// either finished or been aborted externally. That promise should
// not resolve even if we run out of automatic retries or the upload
// is paused for example.
this.status = "failed";
throw lastError;
}
}

0 comments on commit 88743de

Please sign in to comment.