Skip to content

Commit

Permalink
feat: don't fail entire request when one debrid service fails
Browse files Browse the repository at this point in the history
  • Loading branch information
Viren070 committed Jan 30, 2025
1 parent 2ac1248 commit 468b791
Show file tree
Hide file tree
Showing 12 changed files with 199 additions and 69 deletions.
30 changes: 21 additions & 9 deletions packages/addon/src/addon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export class AIOStreams {
];
}
}
const { errorStreams, parsedStreams } =
const { parsedStreams, errorStreams } =
await this.getParsedStreams(streamRequest);

console.log(
Expand Down Expand Up @@ -863,7 +863,7 @@ export class AIOStreams {

private async getParsedStreams(
streamRequest: StreamRequest
): Promise<{ errorStreams: ErrorStream[]; parsedStreams: ParsedStream[] }> {
): Promise<{ parsedStreams: ParsedStream[]; errorStreams: ErrorStream[] }> {
const parsedStreams: ParsedStream[] = [];
const errorStreams: ErrorStream[] = [];
const addonPromises = this.config.addons.map(async (addon) => {
Expand All @@ -875,14 +875,20 @@ export class AIOStreams {
const addonId = `${addon.id}-${JSON.stringify(addon.options)}`;
try {
const startTime = new Date().getTime();
const streams = await this.getStreamsFromAddon(
const { addonStreams, addonErrors } = await this.getStreamsFromAddon(
addon,
addonId,
streamRequest
);
parsedStreams.push(...streams);
parsedStreams.push(...addonStreams);
errorStreams.push(
...[...new Set(addonErrors)].map((error) => ({
error,
addon: { id: addonId, name: addonName },
}))
);
console.log(
`|INF| addon > getParsedStreams: Got ${streams.length} streams from addon ${addonName} in ${getTimeTakenSincePoint(startTime)}`
`|INF| addon > getParsedStreams: Got ${parsedStreams.length} streams from addon ${addonName} in ${getTimeTakenSincePoint(startTime)}`
);
} catch (error: any) {
console.error(
Expand All @@ -899,14 +905,14 @@ export class AIOStreams {
});

await Promise.all(addonPromises);
return { errorStreams, parsedStreams };
return { parsedStreams, errorStreams };
}

private async getStreamsFromAddon(
addon: Config['addons'][0],
addonId: string,
streamRequest: StreamRequest
): Promise<ParsedStream[]> {
): Promise<{ addonStreams: ParsedStream[]; addonErrors: string[] }> {
switch (addon.id) {
case 'torbox': {
return await getTorboxStreams(
Expand Down Expand Up @@ -1009,7 +1015,10 @@ export class AIOStreams {
? parseInt(addon.options.indexerTimeout)
: Settings.DEFAULT_GDRIVE_TIMEOUT
);
return await wrapper.getParsedStreams(streamRequest);
return {
addonStreams: await wrapper.getParsedStreams(streamRequest),
addonErrors: [],
};
}
default: {
if (!addon.options.url) {
Expand All @@ -1026,7 +1035,10 @@ export class AIOStreams {
? parseInt(addon.options.indexerTimeout)
: undefined
);
return await wrapper.getParsedStreams(streamRequest);
return {
addonStreams: await wrapper.getParsedStreams(streamRequest),
addonErrors: [],
};
}
}
}
Expand Down
29 changes: 22 additions & 7 deletions packages/wrappers/src/comet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ export async function getCometStreams(
},
streamRequest: StreamRequest,
addonId: string
): Promise<ParsedStream[]> {
): Promise<{
addonStreams: ParsedStream[];
addonErrors: string[];
}> {
const supportedServices: string[] =
addonDetails.find((addon: AddonDetail) => addon.id === 'comet')
?.supportedServices || [];
Expand All @@ -74,7 +77,10 @@ export async function getCometStreams(
config,
indexerTimeout
);
return comet.getParsedStreams(streamRequest);
return {
addonStreams: await comet.getParsedStreams(streamRequest),
addonErrors: [],
};
}

// find all usable and enabled services
Expand Down Expand Up @@ -128,15 +134,18 @@ export async function getCometStreams(
indexerTimeout
);

return comet.getParsedStreams(streamRequest);
return {
addonStreams: await comet.getParsedStreams(streamRequest),
addonErrors: [],
};
}

// if no prioritised service is provided, create a comet instance for each service
const servicesToUse = usableServices.filter((service) => service.enabled);
if (servicesToUse.length < 1) {
throw new Error('No supported service(s) enabled');
}

const errorMessages: string[] = [];
const streamPromises = servicesToUse.map(async (service) => {
const cometConfig = getCometConfig(service.id, service.credentials.apiKey);
const configString = Buffer.from(JSON.stringify(cometConfig)).toString(
Expand All @@ -153,8 +162,14 @@ export async function getCometStreams(
return comet.getParsedStreams(streamRequest);
});

const streamsArray = await Promise.all(streamPromises);
streamsArray.forEach((streams) => parsedStreams.push(...streams));
const results = await Promise.allSettled(streamPromises);
results.forEach((result) => {
if (result.status === 'fulfilled') {
parsedStreams.push(...result.value);
} else {
errorMessages.push(result.reason.message);
}
});

return parsedStreams;
return { addonStreams: parsedStreams, addonErrors: errorMessages };
}
32 changes: 25 additions & 7 deletions packages/wrappers/src/debridio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,14 @@ export async function getDebridioStreams(
},
streamRequest: StreamRequest,
addonId: string
): Promise<ParsedStream[]> {
): Promise<{
addonStreams: ParsedStream[];
addonErrors: string[];
}> {
const supportedServices: string[] =
addonDetails.find((addon: AddonDetail) => addon.id === 'debridio')
?.supportedServices || [];
const parsedStreams: ParsedStream[] = [];
const addonStreams: ParsedStream[] = [];
const indexerTimeout = debridioOptions.indexerTimeout
? parseInt(debridioOptions.indexerTimeout)
: undefined;
Expand All @@ -81,7 +84,10 @@ export async function getDebridioStreams(
config,
indexerTimeout
);
return debridio.getParsedStreams(streamRequest);
return {
addonStreams: await debridio.getParsedStreams(streamRequest),
addonErrors: [],
};
}

// find all usable and enabled services
Expand Down Expand Up @@ -134,7 +140,10 @@ export async function getDebridioStreams(
indexerTimeout
);

return debridio.getParsedStreams(streamRequest);
return {
addonStreams: await debridio.getParsedStreams(streamRequest),
addonErrors: [],
};
}

// if no prioritised service is provided, create a debridio instance for each service
Expand All @@ -143,6 +152,8 @@ export async function getDebridioStreams(
throw new Error('No supported service(s) enabled');
}

const addonErrors: string[] = [];

const streamPromises = servicesToUse.map(async (service) => {
const debridioConfigString = getDebridioConfigString(
service.id,
Expand All @@ -159,8 +170,15 @@ export async function getDebridioStreams(
return debridio.getParsedStreams(streamRequest);
});

const streamsArray = await Promise.all(streamPromises);
streamsArray.forEach((streams) => parsedStreams.push(...streams));
const streamsArray = await Promise.allSettled(streamPromises);

streamsArray.forEach((result) => {
if (result.status === 'fulfilled') {
addonStreams.push(...result.value);
} else {
addonErrors.push(result.reason.message);
}
});

return parsedStreams;
return { addonStreams, addonErrors };
}
10 changes: 8 additions & 2 deletions packages/wrappers/src/dmmCast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ export async function getDMMCastStreams(
},
streamRequest: StreamRequest,
addonId: string
): Promise<ParsedStream[]> {
): Promise<{
addonStreams: ParsedStream[];
addonErrors: string[];
}> {
if (!dmmCastOptions.installationUrl) {
throw new Error('DMM Cast installation URL is missing');
} else if (
Expand All @@ -85,5 +88,8 @@ export async function getDMMCastStreams(
? parseInt(dmmCastOptions.indexerTimeout)
: undefined
);
return dmmCast.getParsedStreams(streamRequest);
return {
addonStreams: await dmmCast.getParsedStreams(streamRequest),
addonErrors: [],
};
}
10 changes: 8 additions & 2 deletions packages/wrappers/src/easynews.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ export async function getEasynewsStreams(
},
streamRequest: StreamRequest,
addonId: string
): Promise<ParsedStream[]> {
): Promise<{
addonStreams: ParsedStream[];
addonErrors: string[];
}> {
// look for the 'easynews' id in the services array and destructure the username and password
// if we cant find it, throw an error
const easynewsService = serviceDetails.find(
Expand Down Expand Up @@ -81,5 +84,8 @@ export async function getEasynewsStreams(
);

const streams = await easynews.getParsedStreams(streamRequest);
return streams;
return {
addonStreams: streams,
addonErrors: [],
};
}
10 changes: 8 additions & 2 deletions packages/wrappers/src/easynewsPlus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ export async function getEasynewsPlusStreams(
},
streamRequest: StreamRequest,
addonId: string
): Promise<ParsedStream[]> {
): Promise<{
addonStreams: ParsedStream[];
addonErrors: string[];
}> {
// look for the 'easynews' id in the services array and destructure the username and password
// if we cant find it, throw an error
const easynewsService = serviceDetails.find(
Expand Down Expand Up @@ -81,5 +84,8 @@ export async function getEasynewsPlusStreams(
);

const streams = await easynews.getParsedStreams(streamRequest);
return streams;
return {
addonStreams: streams,
addonErrors: [],
};
}
33 changes: 26 additions & 7 deletions packages/wrappers/src/jackettio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ export async function getJackettioStreams(
},
streamRequest: StreamRequest,
addonId: string
): Promise<ParsedStream[]> {
): Promise<{
addonStreams: ParsedStream[];
addonErrors: string[];
}> {
const supportedServices: string[] =
addonDetails.find((addon: AddonDetail) => addon.id === 'jackettio')
?.supportedServices || [];
const parsedStreams: ParsedStream[] = [];
const addonStreams: ParsedStream[] = [];

const indexerTimeout = jackettioOptions.indexerTimeout
? parseInt(jackettioOptions.indexerTimeout)
Expand All @@ -89,7 +92,10 @@ export async function getJackettioStreams(
config,
indexerTimeout
);
return jackettio.getParsedStreams(streamRequest);
return {
addonStreams: await jackettio.getParsedStreams(streamRequest),
addonErrors: [],
};
}

// find all usable and enabled services
Expand Down Expand Up @@ -142,10 +148,14 @@ export async function getJackettioStreams(
indexerTimeout
);

return jackettio.getParsedStreams(streamRequest);
return {
addonStreams: await jackettio.getParsedStreams(streamRequest),
addonErrors: [],
};
}

// if no prioritised service is provided, create a jackettio instance for each service
const addonErrors: string[] = [];
const servicesToUse = usableServices.filter((service) => service.enabled);
if (servicesToUse.length < 1) {
throw new Error('No supported service(s) enabled');
Expand All @@ -167,8 +177,17 @@ export async function getJackettioStreams(
return jackettio.getParsedStreams(streamRequest);
});

const streamsArray = await Promise.all(streamPromises);
streamsArray.forEach((streams) => parsedStreams.push(...streams));
const streamsArray = await Promise.allSettled(streamPromises);
streamsArray.forEach((result) => {
if (result.status === 'fulfilled') {
addonStreams.push(...result.value);
} else {
addonErrors.push(result.reason.message);
}
});

return parsedStreams;
return {
addonStreams,
addonErrors,
};
}
Loading

0 comments on commit 468b791

Please sign in to comment.