From b5dc8b896fd463ed61aa23325be843c6e50ece50 Mon Sep 17 00:00:00 2001 From: Devadathan M B Date: Sat, 27 Jan 2024 16:37:11 +0530 Subject: [PATCH 01/11] Add bull queue package --- package-lock.json | 270 +++++++++++++++++++++++++++++++++++++++++++++- package.json | 1 + 2 files changed, 269 insertions(+), 2 deletions(-) diff --git a/package-lock.json b/package-lock.json index 2bd3542..8cfe252 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,6 +11,7 @@ "dependencies": { "axios": "^1.5.1", "axios-cache-interceptor": "^1.3.2", + "bull": "^4.12.2", "dotenv": "^16.3.1", "firebase-admin": "^11.11.0", "node-cron": "^3.0.2", @@ -246,6 +247,11 @@ "node": ">=6" } }, + "node_modules/@ioredis/commands": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", + "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==" + }, "node_modules/@jridgewell/resolve-uri": { "version": "3.1.1", "resolved": "https://registry.npmjs.org/@jridgewell/resolve-uri/-/resolve-uri-3.1.1.tgz", @@ -283,6 +289,78 @@ "node": ">=v12.0.0" } }, + "node_modules/@msgpackr-extract/msgpackr-extract-darwin-arm64": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-3.0.2.tgz", + "integrity": "sha512-9bfjwDxIDWmmOKusUcqdS4Rw+SETlp9Dy39Xui9BEGEk19dDwH0jhipwFzEff/pFg95NKymc6TOTbRKcWeRqyQ==", + "cpu": [ + "arm64" + ], + "optional": true, + "os": [ + "darwin" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-darwin-x64": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-darwin-x64/-/msgpackr-extract-darwin-x64-3.0.2.tgz", + "integrity": "sha512-lwriRAHm1Yg4iDf23Oxm9n/t5Zpw1lVnxYU3HnJPTi2lJRkKTrps1KVgvL6m7WvmhYVt/FIsssWay+k45QHeuw==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "darwin" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-arm": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm/-/msgpackr-extract-linux-arm-3.0.2.tgz", + "integrity": "sha512-MOI9Dlfrpi2Cuc7i5dXdxPbFIgbDBGgKR5F2yWEa6FVEtSWncfVNKW5AKjImAQ6CZlBK9tympdsZJ2xThBiWWA==", + "cpu": [ + "arm" + ], + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-arm64": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-arm64/-/msgpackr-extract-linux-arm64-3.0.2.tgz", + "integrity": "sha512-FU20Bo66/f7He9Fp9sP2zaJ1Q8L9uLPZQDub/WlUip78JlPeMbVL8546HbZfcW9LNciEXc8d+tThSJjSC+tmsg==", + "cpu": [ + "arm64" + ], + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-linux-x64": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-linux-x64/-/msgpackr-extract-linux-x64-3.0.2.tgz", + "integrity": "sha512-gsWNDCklNy7Ajk0vBBf9jEx04RUxuDQfBse918Ww+Qb9HCPoGzS+XJTLe96iN3BVK7grnLiYghP/M4L8VsaHeA==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@msgpackr-extract/msgpackr-extract-win32-x64": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/@msgpackr-extract/msgpackr-extract-win32-x64/-/msgpackr-extract-win32-x64-3.0.2.tgz", + "integrity": "sha512-O+6Gs8UeDbyFpbSh2CPEz/UOrrdWPTBYNblZK5CxxLisYt4kGX3Sc+czffFonyjiGSq3jWLwJS/CCJc7tBr4sQ==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "win32" + ] + }, "node_modules/@protobufjs/aspromise": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/@protobufjs/aspromise/-/aspromise-1.1.2.tgz", @@ -823,6 +901,31 @@ "resolved": "https://registry.npmjs.org/buffer-fill/-/buffer-fill-1.0.0.tgz", "integrity": "sha512-T7zexNBwiiaCOGDg9xNX9PBmjrubblRkENuptryuI64URkXDFum9il/JGL8Lm8wYfAXpredVXXZz7eMHilimiQ==" }, + "node_modules/bull": { + "version": "4.12.2", + "resolved": "https://registry.npmjs.org/bull/-/bull-4.12.2.tgz", + "integrity": "sha512-WPuc0VCYx+cIVMiZtPwRpWyyJFBrj4/OgKJ6n9Jf4tIw7rQNV+HAKQv15UDkcTvfpGFehvod7Fd1YztbYSJIDQ==", + "dependencies": { + "cron-parser": "^4.2.1", + "get-port": "^5.1.1", + "ioredis": "^5.3.2", + "lodash": "^4.17.21", + "msgpackr": "^1.10.1", + "semver": "^7.5.2", + "uuid": "^8.3.0" + }, + "engines": { + "node": ">=12" + } + }, + "node_modules/bull/node_modules/uuid": { + "version": "8.3.2", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz", + "integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==", + "bin": { + "uuid": "dist/bin/uuid" + } + }, "node_modules/cache-parser": { "version": "1.2.4", "resolved": "https://registry.npmjs.org/cache-parser/-/cache-parser-1.2.4.tgz", @@ -918,6 +1021,14 @@ "node": ">=12" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/color-convert": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz", @@ -971,6 +1082,17 @@ "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", "dev": true }, + "node_modules/cron-parser": { + "version": "4.9.0", + "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.9.0.tgz", + "integrity": "sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==", + "dependencies": { + "luxon": "^3.2.1" + }, + "engines": { + "node": ">=12.0.0" + } + }, "node_modules/debug": { "version": "3.2.7", "resolved": "https://registry.npmjs.org/debug/-/debug-3.2.7.tgz", @@ -994,6 +1116,14 @@ "node": ">=0.4.0" } }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "engines": { + "node": ">=0.10" + } + }, "node_modules/diff": { "version": "4.0.2", "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz", @@ -1372,6 +1502,17 @@ "node": "6.* || 8.* || >= 10.*" } }, + "node_modules/get-port": { + "version": "5.1.1", + "resolved": "https://registry.npmjs.org/get-port/-/get-port-5.1.1.tgz", + "integrity": "sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==", + "engines": { + "node": ">=8" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/glob": { "version": "8.1.0", "resolved": "https://registry.npmjs.org/glob/-/glob-8.1.0.tgz", @@ -1642,6 +1783,50 @@ "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", "optional": true }, + "node_modules/ioredis": { + "version": "5.3.2", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.3.2.tgz", + "integrity": "sha512-1DKMMzlIHM02eBBVOFQ1+AolGjs6+xEcM4PDL7NqOS6szq7H9jSaEkIUH6/a5Hl241LzW6JLSiAbNvTQjUupUA==", + "dependencies": { + "@ioredis/commands": "^1.1.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, + "node_modules/ioredis/node_modules/debug": { + "version": "4.3.4", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", + "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", + "dependencies": { + "ms": "2.1.2" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + "node_modules/ioredis/node_modules/ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" + }, "node_modules/is-binary-path": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/is-binary-path/-/is-binary-path-2.1.0.tgz", @@ -1903,8 +2088,7 @@ "node_modules/lodash": { "version": "4.17.21", "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz", - "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==", - "optional": true + "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==" }, "node_modules/lodash.camelcase": { "version": "4.3.0", @@ -1917,11 +2101,21 @@ "resolved": "https://registry.npmjs.org/lodash.clonedeep/-/lodash.clonedeep-4.5.0.tgz", "integrity": "sha512-H5ZhCF25riFd9uB5UCkVKo61m3S/xZk1x4wA6yp/L3RFP6Z/eHH1ymQcGLo7J3GMPfm0V/7m1tryHuGVxpqEBQ==" }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==" + }, "node_modules/lodash.includes": { "version": "4.3.0", "resolved": "https://registry.npmjs.org/lodash.includes/-/lodash.includes-4.3.0.tgz", "integrity": "sha512-W3Bx6mdkRTGtlJISOvVD/lbqjTlPPUDTMnlXZFnVwi9NKJ6tiAk6LVdlhZMm17VZisqhKcgzpO5Wz91PCt5b0w==" }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==" + }, "node_modules/lodash.isboolean": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/lodash.isboolean/-/lodash.isboolean-3.0.3.tgz", @@ -1992,6 +2186,14 @@ "resolved": "https://registry.npmjs.org/yallist/-/yallist-2.1.2.tgz", "integrity": "sha512-ncTzHV7NvsQZkYe1DW7cbDLm0YpzHmZF5r/iyP3ZnQtMiJ+pjzisCiMNI+Sj+xQF5pXhSHxSB3uDbsBTzY/c2A==" }, + "node_modules/luxon": { + "version": "3.4.4", + "resolved": "https://registry.npmjs.org/luxon/-/luxon-3.4.4.tgz", + "integrity": "sha512-zobTr7akeGHnv7eBOXcRgMeCP6+uyYsczwmeRCauvpvaAltgNyTbLH/+VaEAPUeWBT+1GuNmz4wC/6jtQzbbVA==", + "engines": { + "node": ">=12" + } + }, "node_modules/make-error": { "version": "1.3.6", "resolved": "https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz", @@ -2119,6 +2321,35 @@ "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==" }, + "node_modules/msgpackr": { + "version": "1.10.1", + "resolved": "https://registry.npmjs.org/msgpackr/-/msgpackr-1.10.1.tgz", + "integrity": "sha512-r5VRLv9qouXuLiIBrLpl2d5ZvPt8svdQTl5/vMvE4nzDMyEX4sgW5yWhuBBj5UmgwOTWj8CIdSXn5sAfsHAWIQ==", + "optionalDependencies": { + "msgpackr-extract": "^3.0.2" + } + }, + "node_modules/msgpackr-extract": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/msgpackr-extract/-/msgpackr-extract-3.0.2.tgz", + "integrity": "sha512-SdzXp4kD/Qf8agZ9+iTu6eql0m3kWm1A2y1hkpTeVNENutaB0BwHlSvAIaMxwntmRUAUjon2V4L8Z/njd0Ct8A==", + "hasInstallScript": true, + "optional": true, + "dependencies": { + "node-gyp-build-optional-packages": "5.0.7" + }, + "bin": { + "download-msgpackr-prebuilds": "bin/download-prebuilds.js" + }, + "optionalDependencies": { + "@msgpackr-extract/msgpackr-extract-darwin-arm64": "3.0.2", + "@msgpackr-extract/msgpackr-extract-darwin-x64": "3.0.2", + "@msgpackr-extract/msgpackr-extract-linux-arm": "3.0.2", + "@msgpackr-extract/msgpackr-extract-linux-arm64": "3.0.2", + "@msgpackr-extract/msgpackr-extract-linux-x64": "3.0.2", + "@msgpackr-extract/msgpackr-extract-win32-x64": "3.0.2" + } + }, "node_modules/node-cron": { "version": "3.0.2", "resolved": "https://registry.npmjs.org/node-cron/-/node-cron-3.0.2.tgz", @@ -2165,6 +2396,17 @@ "node": ">= 6.13.0" } }, + "node_modules/node-gyp-build-optional-packages": { + "version": "5.0.7", + "resolved": "https://registry.npmjs.org/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.0.7.tgz", + "integrity": "sha512-YlCCc6Wffkx0kHkmam79GKvDQ6x+QZkMjFGrIMxgFNILFvGSbCp2fCBC55pGTT9gVaz8Na5CLmxt/urtzRv36w==", + "optional": true, + "bin": { + "node-gyp-build-optional-packages": "bin.js", + "node-gyp-build-optional-packages-optional": "optional.js", + "node-gyp-build-optional-packages-test": "build-test.js" + } + }, "node_modules/nodemon": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/nodemon/-/nodemon-3.0.1.tgz", @@ -2424,6 +2666,25 @@ "node": ">=8.10.0" } }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "engines": { + "node": ">=4" + } + }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/require-directory": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/require-directory/-/require-directory-2.1.1.tgz", @@ -2592,6 +2853,11 @@ "node": ">=0.10.0" } }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==" + }, "node_modules/stream-events": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/stream-events/-/stream-events-1.0.5.tgz", diff --git a/package.json b/package.json index 4febc07..fe2629d 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,7 @@ "dependencies": { "axios": "^1.5.1", "axios-cache-interceptor": "^1.3.2", + "bull": "^4.12.2", "dotenv": "^16.3.1", "firebase-admin": "^11.11.0", "node-cron": "^3.0.2", From e3bffad7d7abb892eb266aba72fdbff885959604 Mon Sep 17 00:00:00 2001 From: Devadathan M B Date: Sat, 27 Jan 2024 16:37:43 +0530 Subject: [PATCH 02/11] feat: Use queue based mechanism for message broadcasting --- src/handlers/broadcast.ts | 64 ++++++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 28 deletions(-) diff --git a/src/handlers/broadcast.ts b/src/handlers/broadcast.ts index 0089bf4..ec00c03 100644 --- a/src/handlers/broadcast.ts +++ b/src/handlers/broadcast.ts @@ -1,8 +1,9 @@ // Handler to broadcast messages to all subscribed users // Only to be used by admin -// This current implementation is blocking the main thread, will need to be fixed in future import { Firestore } from "firebase-admin/firestore"; import { CustomContext } from "../types/customContext.type"; +import Queue = require("bull"); +import { TelegramError } from "telegraf"; async function broadcast(ctx: CustomContext, db: Firestore) { const stickerId = @@ -21,35 +22,42 @@ async function broadcast(ctx: CustomContext, db: Firestore) { const snapshot = await usersRef.get(); const chatIds = snapshot.docs.map((doc) => doc.data().chatId); - // Send message to all users - // Send in batahces of 25 - // Wait one minute after each batch - const batchSize = 25; - const delay = 60000; - - await ctx.reply("Broadcasting message..."); - for (let i = 0; i < chatIds.length; i += batchSize) { - console.log(`⚡ Broadcasting batch ${i / batchSize + 1} at ${new Date()}`); - const batch = chatIds.slice(i, i + batchSize); - let batchPromises: Promise[] = []; - batch.forEach((chatId) => { - batchPromises.push( - ctx.telegram - .sendMessage(chatId, message, { - parse_mode: "HTML", - disable_web_page_preview: true, - }) - .catch((error) => { - console.log(error); - }) - ); + // Create a queue to store all the chatIds + const queue = new Queue("broadcast-queue"); + for (let i = 0; i < chatIds.length; i++) { + await queue.add({ + chatId: chatIds[i], + message: message, }); - - await Promise.all(batchPromises); - await new Promise((resolve) => setTimeout(resolve, delay)); } - console.log(`⚡ Broadcast completed at ${new Date()}`); - await ctx.reply("Broadcast completed"); + + // Consumer + // Send messages to users one by one until telegram start throwing 429 + // Wait for the retry_after time and then add the job back to the queue + // And then continue with the next job + queue.process(async (job) => { + try { + await ctx.telegram.sendMessage(job.data.chatId, job.data.message, { + parse_mode: "HTML", + disable_web_page_preview: true, + }); + await job.remove(); + } catch (error: any) { + if (error instanceof TelegramError) { + if (error.code === 429) { + const retryAfter = error.parameters?.retry_after!; + await new Promise((resolve) => + setTimeout(resolve, retryAfter * 1000 + 2000) + ); + await job.retry(); + } + } + } + }); + + queue.on("completed", (job) => { + console.log(`✅ Message sent to ${job.data.chatId}`); + }); } export default broadcast; From 5808604001497a4c1a387d9fe5103d45068efdb7 Mon Sep 17 00:00:00 2001 From: Devadathan M B Date: Sat, 27 Jan 2024 23:42:21 +0530 Subject: [PATCH 03/11] refactor: Use queue for broadcasting notifications --- src/cron/notifyUserCron.ts | 190 +++++++++++++++---------------------- 1 file changed, 77 insertions(+), 113 deletions(-) diff --git a/src/cron/notifyUserCron.ts b/src/cron/notifyUserCron.ts index 5811029..0e6034f 100644 --- a/src/cron/notifyUserCron.ts +++ b/src/cron/notifyUserCron.ts @@ -7,13 +7,7 @@ import { readFile, writeFile } from "fs"; import fetchAttachment from "../services/fetchAttachment"; import { Announcement, Attachment } from "../types/types"; import findFilters from "../utils/findFilters"; - -// Telegram API only allows 30 messages per second -// So to be safe, we will send 25 messages per second -// And wait 1 minute between batches -// This will not block the bot from receiving messages since everything is asynchronous -const batchSize = 25; -const delayBetweenBatches = 1000 * 60; +import Bull = require("bull"); async function notifyUserCron(db: Firestore, bot: Telegraf) { console.log("Cron job initialized"); @@ -103,118 +97,88 @@ async function notifyUserCron(db: Firestore, bot: Telegraf) { }) ); - // Some notifications do not have attachments - // this is messy code, need to be refactored later - if (attachments.length === 0) { - for (let i = 0; i < chatIds.length; i += batchSize) { - console.log( - `🔴 Sending batch ${i / batchSize + 1} at ${new Date()}` - ); - - // Get the current batch - const batch = chatIds.slice(i, i + batchSize); - let batchPromises: Promise[] = []; - - // Push each sendDocument promise to the batchPromises array - batch.forEach((chatId) => { - batchPromises.push( - bot.telegram - .sendMessage(chatId, captionMsg, { parse_mode: "HTML" }) - .catch(async (err: TelegramError) => { - // If the user has blocked the bot, or the account is deleted - // or the bot was removed from the group - // then remove the chatid from the database - // because this leads to bot slowing down - if (err.code === 403) { - console.log( - `🔴 Telegram Error: 403. Removing chatId ${chatId} from database` - ); - try { - await usersRef.doc(chatId.toString()).delete(); - } catch (error) { - console.log(error); - } - } - }) - ); - }); + // Create queue + const queue = new Bull("notify-user-queue"); - // Wait for the batch to finish sending - await Promise.all(batchPromises); - - // Log the batch stats - console.log( - `🔴 Batch ${batch} finished sending at ${new Date()}` - ); - console.log(`🔴 Successfully sent to ${batch.length} users`); - - // Wait for the delay between batches - await new Promise((resolve) => - setTimeout(resolve, delayBetweenBatches) - ); - } + // Add all the chatIds to the queue + for (let i = 0; i < chatIds.length; i++) { + await queue.add({ + chatId: chatIds[i], + }); } - // For each attachment, fetch the annoucement, send the attachment to each chatIds in batches - attachments.forEach(async (attachment: Attachment) => { - const file = await fetchAttachment(attachment.encryptId); - const fileBuffer = Buffer.from(file, "base64"); - - // Send the attachment in batches - for (let i = 0; i < chatIds.length; i += batchSize) { - console.log( - `🔴 Sending batch ${i / batchSize + 1} at ${new Date()}` - ); - - // Get the current batch - const batch = chatIds.slice(i, i + batchSize); - let batchPromises: Promise[] = []; - - // Push each sendDocument promise to the batchPromises array - batch.forEach((chatId) => { - batchPromises.push( - bot.telegram - .sendDocument( - chatId, - { - source: fileBuffer, - filename: attachment.name, - }, - { caption: captionMsg, parse_mode: "HTML" } - ) - .catch(async (err: TelegramError) => { - // If the user has blocked the bot, or the account is deleted - // or the bot was removed from the group - // then remove the chatid from the database - // because this leads to bot slowing down - if (err.code === 403) { - console.log( - `🔴 Telegram Error: 403. Removing chatId ${chatId} from database` - ); - try { - await usersRef.doc(chatId.toString()).delete(); - } catch (error) { - console.log(error); - } + // Consumer + if (attachments.length === 0) { + queue.process(async (job) => { + const { chatId } = job.data; + try { + await bot.telegram.sendMessage(chatId, captionMsg, { + parse_mode: "HTML", + }); + } catch (error: any) { + if (error instanceof TelegramError) { + console.log(error); + if (error.code === 429) { + const retryAfter = error.parameters?.retry_after!; + await new Promise((resolve) => + setTimeout(resolve, retryAfter * 1000 + 2000) + ); + await job.retry(); + } else if (error.code === 403) { + try { + await usersRef.doc(chatId.toString()).delete(); + await job.remove(); + } catch (error) { + console.log(error); + } + } + } + } + }); + } else { + // fetch attachments + // send each attachment to each chatId + for (let i = 0; i < attachments.length; i++) { + const file = await fetchAttachment(attachments[i].encryptId); + const fileBuffer = Buffer.from(file, "base64"); + queue.process(async (job) => { + const chatId = job.data.chatId; + try { + await bot.telegram.sendDocument( + chatId, + { + source: fileBuffer, + filename: attachments[i].name, + }, + { caption: captionMsg, parse_mode: "HTML" } + ); + } catch (error: any) { + if (error instanceof TelegramError) { + console.log(error); + if (error.code === 429) { + const retryAfter = error.parameters?.retry_after!; + await new Promise((resolve) => + setTimeout(resolve, retryAfter * 1000 + 2000) + ); + await job.retry(); + } else if (error.code === 403) { + try { + await usersRef.doc(chatId.toString()).delete(); + await job.remove(); + } catch (error) { + console.log(error); } - }) - ); + } + } + } }); - - // Wait for the batch to finish sending - await Promise.all(batchPromises); - - // Log the batch stats - console.log( - `🔴 Batch ${batch} finished sending at ${new Date()}` - ); - console.log(`🔴 Successfully sent to ${batch.length} users`); - - // Wait for the delay between batches - await new Promise((resolve) => - setTimeout(resolve, delayBetweenBatches) - ); } + } + + // Job completed event + queue.on("completed", async (job) => { + console.log(`✅ Message sent to ${job.data.chatId}`); + await job.remove(); }); } } From 91038000ba1f61b1bdb2aa351d8360d3bb6f7ade Mon Sep 17 00:00:00 2001 From: Devadathan M B Date: Sat, 27 Jan 2024 23:43:48 +0530 Subject: [PATCH 04/11] cd: Add commands to handle redis queue to build-deploy action --- .github/workflows/build-deploy.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/build-deploy.yml b/.github/workflows/build-deploy.yml index c78548d..9435543 100644 --- a/.github/workflows/build-deploy.yml +++ b/.github/workflows/build-deploy.yml @@ -48,5 +48,8 @@ jobs: docker tag ${{ secrets.DOCKERHUB_USERNAME }}/ktu-bot:${{ github.sha }} ktu-bot:latest docker logs ktu-bot >> ~/bot.log || true docker stop ktu-bot || true + docker stop redis-queue || true docker rm ktu-bot || true + docker rm redis-queue || true docker run -d --restart always -p 5000:5000 -e TZ=Asia/Kolkata -e ENV_TYPE=${{ secrets.ENV_TYPE }} -e ADMIN_ID=${{ secrets.ADMIN_ID }} -e WEBHOOK_DOMAIN=${{ secrets.WEBHOOK_DOMAIN }} -e WEBHOOK_PORT=${{ secrets.WEBHOOK_PORT }} -e BOT_TOKEN=${{ secrets.BOT_TOKEN }} -e FIREBASE_SERVICE_ACCOUNT="${{ secrets.FIREBASE_SERVICE_ACCOUNT }}" --name ktu-bot ktu-bot:latest + docker run -d --restart always -p 6379:6379 --name redis-queue redis From 18fa1928f3ef857326b89a0deb7af2790f7e8c2b Mon Sep 17 00:00:00 2001 From: Devadathan M B Date: Sat, 27 Jan 2024 23:44:23 +0530 Subject: [PATCH 05/11] Add redis queue config to docker-compose.yml --- docker-compose.yml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/docker-compose.yml b/docker-compose.yml index 14487e6..30c2283 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,3 +3,17 @@ services: ktu-bot: build: . restart: always + depends_on: + redis-queue: + condition: service_healthy + + redis-queue: + image: redis + restart: always + ports: + - "6379:6379" + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 5s + retries: 5 From 9962538901e190649dc41a26c0d00d6a9927b842 Mon Sep 17 00:00:00 2001 From: Devadathan M B Date: Sun, 28 Jan 2024 00:03:43 +0530 Subject: [PATCH 06/11] Clear bull queue on bot stop/exit --- src/app.ts | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/app.ts b/src/app.ts index 7e96737..05e9b9e 100644 --- a/src/app.ts +++ b/src/app.ts @@ -1,5 +1,7 @@ import createBot from "./createBot"; import notifyUserCron from "./cron/notifyUserCron"; +import Bull = require("bull"); +const notifyUserQueue = new Bull("notify-user-queue"); // Create the bot and initialize the database const { bot, db } = createBot(); @@ -36,7 +38,13 @@ const launchBot = async () => { }; // Graceful stop -process.once("SIGINT", () => bot.stop("SIGINT")); -process.once("SIGTERM", () => bot.stop("SIGTERM")); +process.once("SIGINT", async () => { + bot.stop("SIGINT"); + await notifyUserQueue.obliterate({ force: true }); +}); +process.once("SIGTERM", async () => { + bot.stop("SIGTERM"); + await notifyUserQueue.obliterate({ force: true }); +}); launchBot(); From fca2180ee216cc50a539b3bdecf60b9cbaefaf15 Mon Sep 17 00:00:00 2001 From: Devadathan M B Date: Sun, 28 Jan 2024 00:04:02 +0530 Subject: [PATCH 07/11] Remove logging statements --- src/cron/notifyUserCron.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/cron/notifyUserCron.ts b/src/cron/notifyUserCron.ts index 0e6034f..c9a9436 100644 --- a/src/cron/notifyUserCron.ts +++ b/src/cron/notifyUserCron.ts @@ -117,7 +117,6 @@ async function notifyUserCron(db: Firestore, bot: Telegraf) { }); } catch (error: any) { if (error instanceof TelegramError) { - console.log(error); if (error.code === 429) { const retryAfter = error.parameters?.retry_after!; await new Promise((resolve) => @@ -154,7 +153,6 @@ async function notifyUserCron(db: Firestore, bot: Telegraf) { ); } catch (error: any) { if (error instanceof TelegramError) { - console.log(error); if (error.code === 429) { const retryAfter = error.parameters?.retry_after!; await new Promise((resolve) => From b875604dcb8d0a91208fb1cb279df8eaa28e20ea Mon Sep 17 00:00:00 2001 From: Devadathan M B Date: Mon, 29 Jan 2024 22:10:52 +0530 Subject: [PATCH 08/11] cd: Create network and attach containers to the network --- .github/workflows/build-deploy.yml | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build-deploy.yml b/.github/workflows/build-deploy.yml index 9435543..609c5d9 100644 --- a/.github/workflows/build-deploy.yml +++ b/.github/workflows/build-deploy.yml @@ -48,8 +48,10 @@ jobs: docker tag ${{ secrets.DOCKERHUB_USERNAME }}/ktu-bot:${{ github.sha }} ktu-bot:latest docker logs ktu-bot >> ~/bot.log || true docker stop ktu-bot || true - docker stop redis-queue || true + docker stop redis-queue-db || true docker rm ktu-bot || true - docker rm redis-queue || true - docker run -d --restart always -p 5000:5000 -e TZ=Asia/Kolkata -e ENV_TYPE=${{ secrets.ENV_TYPE }} -e ADMIN_ID=${{ secrets.ADMIN_ID }} -e WEBHOOK_DOMAIN=${{ secrets.WEBHOOK_DOMAIN }} -e WEBHOOK_PORT=${{ secrets.WEBHOOK_PORT }} -e BOT_TOKEN=${{ secrets.BOT_TOKEN }} -e FIREBASE_SERVICE_ACCOUNT="${{ secrets.FIREBASE_SERVICE_ACCOUNT }}" --name ktu-bot ktu-bot:latest - docker run -d --restart always -p 6379:6379 --name redis-queue redis + docker rm redis-queue-db || true + docker network rm ktu-bot-network || true + docker network create ktu-bot-network || true + docker run -d --restart always -p 6379:6379 --network ktu-bot --network-alias redis-queue-db --name redis-db redis + docker run -d --restart always -p 5000:5000 -e TZ=Asia/Kolkata --network ktu-bot -e ENV_TYPE=${{ secrets.ENV_TYPE }} -e ADMIN_ID=${{ secrets.ADMIN_ID }} -e WEBHOOK_DOMAIN=${{ secrets.WEBHOOK_DOMAIN }} -e WEBHOOK_PORT=${{ secrets.WEBHOOK_PORT }} -e BOT_TOKEN=${{ secrets.BOT_TOKEN }} -e FIREBASE_SERVICE_ACCOUNT="${{ secrets.FIREBASE_SERVICE_ACCOUNT }}" --name ktu-bot ktu-bot:latest From d35315fa5e39d26b12ed8d9af58a38179a58ed42 Mon Sep 17 00:00:00 2001 From: Devadathan M B Date: Mon, 29 Jan 2024 22:21:21 +0530 Subject: [PATCH 09/11] Pass custom hostname to redis-io --- src/cron/notifyUserCron.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/cron/notifyUserCron.ts b/src/cron/notifyUserCron.ts index c9a9436..730a434 100644 --- a/src/cron/notifyUserCron.ts +++ b/src/cron/notifyUserCron.ts @@ -98,7 +98,11 @@ async function notifyUserCron(db: Firestore, bot: Telegraf) { ); // Create queue - const queue = new Bull("notify-user-queue"); + const queue = new Bull("notify-user-queue", { + redis: { + host: "redis-queue-db", + }, + }); // Add all the chatIds to the queue for (let i = 0; i < chatIds.length; i++) { From a965e5bca17f0ab2fc34711133276290c5c35869 Mon Sep 17 00:00:00 2001 From: Devadathan M B Date: Mon, 29 Jan 2024 22:21:44 +0530 Subject: [PATCH 10/11] Change redis service name --- docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 30c2283..e9e09f0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,10 +4,10 @@ services: build: . restart: always depends_on: - redis-queue: + redis-queue-db: condition: service_healthy - redis-queue: + redis-queue-db: image: redis restart: always ports: From 5fdc2b047c36c7811ccdbfe04696cbb48e003712 Mon Sep 17 00:00:00 2001 From: Devadathan M B Date: Mon, 29 Jan 2024 22:22:04 +0530 Subject: [PATCH 11/11] Remove redis port mapping --- docker-compose.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index e9e09f0..2d14356 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,8 +10,6 @@ services: redis-queue-db: image: redis restart: always - ports: - - "6379:6379" healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 5s