Skip to content

Commit

Permalink
fix: Fix duplicate handler attachment for new cron jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
devadathanmb committed Feb 23, 2024
1 parent a379426 commit efed838
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 87 deletions.
6 changes: 4 additions & 2 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import createBot from "./createBot";
import notifyUserCron from "./cron/notifyUserCron";
import Bull = require("bull");
const notifyUserQueue = new Bull("notify-user-queue");
import createJobQueue from "./cron/queue";

// Create the bot and initialize the database
const { bot, db } = createBot();
const queue = createJobQueue(bot);

// Launch the bot
const launchBot = async () => {
Expand All @@ -16,7 +18,7 @@ const launchBot = async () => {
console.log(
`Bot started in polling mode. Available at https://t.me/${res.username}`
);
notifyUserCron(db, bot);
notifyUserCron(db, queue);
});
}
// Launch in webhook mode if in production
Expand All @@ -32,7 +34,7 @@ const launchBot = async () => {
console.log(
`Bot started in webhook mode. Available at https://t.me/${res.username}`
);
notifyUserCron(db, bot);
notifyUserCron(db, queue);
});
}
};
Expand Down
112 changes: 27 additions & 85 deletions src/cron/notifyUserCron.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,14 @@
import * as cron from "node-cron";
import { Firestore } from "firebase-admin/firestore";
import { Telegraf, TelegramError } from "telegraf";
import { CustomContext } from "../types/customContext.type";
import fetchAnnouncements from "../services/fetchAnnouncements";
import { readFile, writeFile } from "fs";
import fetchAttachment from "../services/fetchAttachment";
import { Announcement, Attachment } from "../types/types";
import findFilters from "../utils/findFilters";
import getCaptionMsg from "../utils/getCaptionMsg";
import Bull = require("bull");

// Create queue
const queue = new Bull("notify-user-queue", {
redis: {
host: "redis-queue-db",
},
});

async function notifyUserCron(db: Firestore, bot: Telegraf<CustomContext>) {
async function notifyUserCron(db: Firestore, queue: Bull.Queue) {
console.log("Cron job initialized");
cron.schedule("*/10 * * * *", async () => {
const startTime = new Date().toString();
Expand Down Expand Up @@ -89,14 +81,8 @@ async function notifyUserCron(db: Firestore, bot: Telegraf<CustomContext>) {
continue;
}

const captionMsg = `
<b>Subject:</b> ${announcement.subject ? announcement.subject : "N/A"}
<b>Date:</b> ${announcement.date ? announcement.date : "N/A"}
<b>Message:</b> ${announcement.message ? announcement.message : "N/A"}
`;
// Get the caption message
const captionMsg = getCaptionMsg(announcement);

// Get the data to fetch the attachments
const attachments = announcement.attachments.map(
Expand All @@ -106,84 +92,40 @@ async function notifyUserCron(db: Firestore, bot: Telegraf<CustomContext>) {
})
);

// Add all the chatIds to the queue
for (let i = 0; i < chatIds.length; i++) {
await queue.add({
chatId: chatIds[i],
});
}

// Consumer
// Add stuff to the queue
// Pass fileBuffer as null since there are no attachments
if (attachments.length === 0) {
queue.process(async (job) => {
const { chatId } = job.data;
for (let i = 0; i < chatIds.length; i++) {
try {
await bot.telegram.sendMessage(chatId, captionMsg, {
parse_mode: "HTML",
await queue.add({
chatId: chatIds[i],
fileBuffer: null,
captionMsg: captionMsg,
fileName: null,
});
} catch (error: any) {
if (error instanceof TelegramError) {
if (error.code === 429) {
const retryAfter = error.parameters?.retry_after!;
await new Promise((resolve) =>
setTimeout(resolve, retryAfter * 1000 + 2000)
);
await job.retry();
} else if (error.code === 403) {
try {
await usersRef.doc(chatId.toString()).delete();
await job.remove();
} catch (error) {
console.log(error);
}
}
}
} catch (error) {
console.log(error);
}
});
}
} else {
// fetch attachments
// send each attachment to each chatId
// Loop through each attachment and add to the queue
for (let i = 0; i < attachments.length; i++) {
const file = await fetchAttachment(attachments[i].encryptId);
const fileBuffer = Buffer.from(file, "base64");
queue.process(async (job) => {
const chatId = job.data.chatId;

for (let j = 0; j < chatIds.length; j++) {
try {
await bot.telegram.sendDocument(
chatId,
{
source: fileBuffer,
filename: attachments[i].name,
},
{ caption: captionMsg, parse_mode: "HTML" }
);
} catch (error: any) {
if (error instanceof TelegramError) {
if (error.code === 429) {
const retryAfter = error.parameters?.retry_after!;
await new Promise((resolve) =>
setTimeout(resolve, retryAfter * 1000 + 2000)
);
await job.retry();
} else if (error.code === 403) {
try {
await usersRef.doc(chatId.toString()).delete();
await job.remove();
} catch (error) {
console.log(error);
}
}
}
await queue.add({
chatId: chatIds[i],
file: file,
captionMsg: captionMsg,
fileName: attachments[i].name,
});
} catch (error) {
console.log(error);
}
});
}
}
}

// Job completed event
queue.on("completed", async (job) => {
console.log(`✅ Message sent to ${job.data.chatId}`);
await job.remove();
});
}
}
} catch (error) {
Expand Down
60 changes: 60 additions & 0 deletions src/cron/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { Telegraf, TelegramError } from "telegraf";
import { CustomContext } from "../types/customContext.type";
import Bull = require("bull");
import { JobData } from "../types/types";

function createJobQueue(bot: Telegraf<CustomContext>) {
// Create queue
const queue = new Bull<JobData>("notify-user-queue");

// Job completed event
queue.on("completed", async (job) => {
console.log(`✅ Message sent to ${job.data.chatId}`);
await job.remove();
});

// Consumer
queue.process(async (job) => {
const { chatId, file, captionMsg, fileName } = job.data;

try {
if (!file || !fileName) {
await bot.telegram.sendMessage(chatId, captionMsg, {
parse_mode: "HTML",
});
} else {
const fileBuffer = Buffer.from(file as string, "base64");
await bot.telegram.sendDocument(
chatId,
{
source: fileBuffer,
filename: fileName,
},
{ caption: captionMsg, parse_mode: "HTML" }
);
}
} catch (error: any) {
console.log(error);
if (error instanceof TelegramError) {
if (error.code === 429) {
const retryAfter = error.parameters?.retry_after!;
await new Promise((resolve) =>
setTimeout(resolve, retryAfter * 1000 + 2000)
);
await job.retry();
} else if (error.code === 403) {
try {
/* await usersRef.doc(chatId.toString()).delete(); */
await job.remove();
} catch (error) {
console.log(error);
}
}
}
}
});

return queue;
}

export default createJobQueue;
8 changes: 8 additions & 0 deletions src/types/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ interface Command {
description: string;
}

interface JobData {
chatId: number;
file: string | null;
captionMsg: string;
fileName: string | null;
}

export {
ResultDetails,
ResultSummary,
Expand All @@ -73,4 +80,5 @@ export {
AcademicCalendar,
Timetable,
Command,
JobData,
};
17 changes: 17 additions & 0 deletions src/utils/getCaptionMsg.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Utility function to get the caption message from the message object
import { Announcement } from "../types/types";

function getCaptionMsg(announcement: Announcement) {
const captionMsg = `
<b>Subject:</b> ${announcement.subject ? announcement.subject : "N/A"}
<b>Date:</b> ${announcement.date ? announcement.date : "N/A"}
<b>Message:</b> ${announcement.message ? announcement.message : "N/A"}
`;

return captionMsg;
}

export default getCaptionMsg;

0 comments on commit efed838

Please sign in to comment.