From 77e193cb5d3e60ffc7f6f837c68681ec91d387f5 Mon Sep 17 00:00:00 2001 From: Robert Michalski Date: Thu, 20 Aug 2020 12:09:40 +0200 Subject: [PATCH] Rewritten scripts to be more concurrent and for better code reuse. --- .env.example | 1 + clean_completed_jobs.js | 90 ++++++++------------- clean_failed_job_stacks.js | 113 +++++++++++---------------- package.json | 2 +- retry_failed_jobs.js | 110 +++++++++++--------------- utils/getFailedQueues.js | 24 ++++++ utils/getQueues.js | 22 ++++++ utils/init.js | 14 ++++ utils/isNonEmptyString.js | 1 + utils/jobsExceedingAttemptsFilter.js | 1 + slack.js => utils/slack.js | 4 +- utils/sumArray.js | 1 + 12 files changed, 190 insertions(+), 193 deletions(-) create mode 100644 utils/getFailedQueues.js create mode 100644 utils/getQueues.js create mode 100644 utils/init.js create mode 100644 utils/isNonEmptyString.js create mode 100644 utils/jobsExceedingAttemptsFilter.js rename slack.js => utils/slack.js (86%) create mode 100644 utils/sumArray.js diff --git a/.env.example b/.env.example index 54bb9cf..1aecb97 100644 --- a/.env.example +++ b/.env.example @@ -8,6 +8,7 @@ REDIS_DATABASE=0 #REDIS_TLS=true MAX_FAILED_COUNT=100 +CONCURRENCY=25 SLACK_WEBHOOK_URL=https://hooks.slack.com/services/... 
SLACK_CHANNEL="#my-channel" diff --git a/clean_completed_jobs.js b/clean_completed_jobs.js index 161bf11..8b82a6f 100644 --- a/clean_completed_jobs.js +++ b/clean_completed_jobs.js @@ -1,73 +1,47 @@ -const Queue = require('bull') +const queueNames = require('./utils/init') -const result = require('dotenv').config() -if (result.error) { - throw result.error -} +const sendToSlack = require('./utils/slack')(process.env.SLACK_WEBHOOK_URL, process.env.SLACK_CHANNEL) -const { forEach } = require('p-iteration') - -const REDIS_PORT = Number.parseInt(process.env.REDIS_PORT) || 6379 -const REDIS_HOST = process.env.REDIS_HOST || '127.0.0.1' -const REDIS_DATABASE = Number.parseInt(process.env.REDIS_DATABASE) || 0 -const REDIS_PASS = process.env.REDIS_PASS -const REDIS_TLS = process.env.REDIS_TLS && `${process.env.REDIS_TLS}` === 'true' +const sumArray = require('./utils/sumArray') +const getQueues = require('./utils/getQueues') const JOB_AGE = Number.parseInt(process.env.JOB_AGE) || 5000 -const REDIS_CONFIG = { - redis: { - port: REDIS_PORT, - host: REDIS_HOST, - db: REDIS_DATABASE, - password: REDIS_PASS, - tls: REDIS_TLS ? {} : null - } -} - -const args = process.argv.slice(2) - -if (args.length < 1) { - console.error(`${new Date().toISOString()} - No queue name specified, please specify at least one queue name.`) - process.exit(1) -} - -const sendToSlack = require('./slack')(process.env.SLACK_WEBHOOK_URL, process.env.SLACK_CHANNEL) - const startTime = Date.now() -console.log(`${new Date(startTime).toISOString()} - Start cleaning completed jobs in ${args.join(', ')} queues`) - -forEach(args, async (name) => { - console.log(`${Date.now() - startTime} ms - Start cleaning completed jobs in ${name} queue`) - const queue = new Queue(name, REDIS_CONFIG) - - queue.on('cleaned', async (jobs, type) => { - // jobs is an array of cleaned jobs - const text = `Cleaned ${jobs.length} ${type} jobs in ${name}.` - const jobIds = jobs.map(job => job && job === typeof 'object' && job.id ? 
job.id : job) - - console.log(`${Date.now() - startTime} ms - ${text}`, jobIds) - await sendToSlack({ - text, - jobs: jobIds - //type // the type of jobs cleaned }) +console.log(`${new Date().toISOString()} - Checking for completed jobs in ${queueNames.join(', ')} queues`) + +getQueues(queueNames) + .then(async queues => { + const operations = queues.map(queue => { + console.log(`${Date.now() - startTime} ms - Start cleaning completed jobs in ${queue.name} queue`) + + return new Promise((resolve, reject) => { + queue.on('cleaned', async (jobs, type) => { + // jobs is an array of cleaned jobs (job objects or bare ids); normalise to ids + const text = `Cleaned ${jobs.length} ${type} jobs in ${queue.name}.` + const jobIds = jobs.map(job => job && typeof job === 'object' && job.id ? job.id : job) + + console.log(`${Date.now() - startTime} ms - ${text}`) + await Promise.all([ + sendToSlack({ text }), + queue.close() + ]) + resolve(jobIds.length) + }) + queue.clean(JOB_AGE).catch(reject) // start cleaning jobs older than JOB_AGE; a failed clean rejects instead of leaving this promise pending + }) }) - - await queue.close() + return Promise.all(operations) }) - - await queue.clean(JOB_AGE) - - console.log(`${Date.now() - startTime} ms - Finished cleaning completed jobs in ${name} queue`) -}) - .then(() => { - console.log(`${Date.now() - startTime} ms - Finished cleaning completed jobs in queues`) + .then(sumArray) + .then(cleanedJobCount => { + console.log(`${Date.now() - startTime} ms - Finished cleaning ${cleanedJobCount} completed jobs in ${queueNames.join(', ')} queues`) process.exitCode = 0 }) .catch(err => { console.error(err) - console.log(`${Date.now() - startTime} ms - Finished cleaning completed jobs in queues`) - process.exitCode = 2 + console.log(`${Date.now() - startTime} ms - Finished cleaning some completed jobs in ${queueNames.join(', ')} queues`) + process.exitCode = 1 }) diff --git a/clean_failed_job_stacks.js b/clean_failed_job_stacks.js index f7f9bbc..493e601 100644 --- a/clean_failed_job_stacks.js +++ b/clean_failed_job_stacks.js @@ -1,85 +1,64 @@ -const Queue = require('bull') 
-const result = require('dotenv').config() +const queueNames = require('./utils/init') -if (result.error) { - throw result.error -} +const pMap = require('p-map') +const sendToSlack = require('./utils/slack')(process.env.SLACK_WEBHOOK_URL, process.env.SLACK_CHANNEL) -const { forEach, reduce } = require('p-iteration') +const sumArray = require('./utils/sumArray') +const getFailedQueues = require('./utils/getFailedQueues') +const getAttemptedJobs = require('./utils/jobsExceedingAttemptsFilter') -const REDIS_PORT = Number.parseInt(process.env.REDIS_PORT) || 6379 -const REDIS_HOST = process.env.REDIS_HOST || '127.0.0.1' -const REDIS_DATABASE = Number.parseInt(process.env.REDIS_DATABASE) || 0 -const REDIS_PASS = process.env.REDIS_PASS -const REDIS_TLS = process.env.REDIS_TLS && `${process.env.REDIS_TLS}` === 'true' -const MAX_FAILED_COUNT = Number.parseInt(process.env.MAX_FAILED_COUNT) || 100 +console.log(`${new Date().toISOString()} - Checking for failed jobs in ${queueNames.join(', ')} queues`) -const REDIS_CONFIG = { - redis: { - port: REDIS_PORT, - host: REDIS_HOST, - db: REDIS_DATABASE, - password: REDIS_PASS, - tls: REDIS_TLS ? 
{} : null - } -} - -const args = process.argv.slice(2) - -if (args.length < 1) { - console.error(`${new Date().toISOString()} - No queue name specified, please specify at least one queue name.`) - process.exit(1) -} - -const sendToSlack = require('./slack')(process.env.SLACK_WEBHOOK_URL, process.env.SLACK_CHANNEL) +const CONCURRENCY = Number.parseInt(process.env.CONCURRENCY, 10) || 50 const startTime = Date.now() -console.log(`${new Date(startTime).toISOString()} - Checking for failed jobs in ${args.join(', ')} queues`) -forEach(args, async (name, index) => { - console.log(`${Date.now() - startTime} ms - Start cleaning stacks for failed jobs in ${name} queue`) +getFailedQueues(queueNames) + .then(async queues => { + const retriedJobCountsForEachQueue = await Promise.all( + queues.map(async queue => { + const failedJobs = getAttemptedJobs(await queue.getFailed()) - const queue = new Queue(name, REDIS_CONFIG) + if (!failedJobs.length) { + return queue.close().then(() => 0) + } - const failedCount = await queue.getFailedCount() - if (failedCount > 0) { - let text = `Found ${failedCount} failed jobs in ${name} queue.` - console.log(`${Date.now() - startTime} ms - ${text}`) - // await sendToSlack({ text }) + const failedCount = failedJobs.length + const retriedJobs = await pMap( + failedJobs, + async job => { + const { id, data, opts } = job + opts.jobId = id + try { + await job.remove() + await queue.add(data, opts) + return 1 + } catch (e) { + console.error(e) + return 0 + } + }, + { concurrency: CONCURRENCY } + ) + const retriedJobCountInQueue = sumArray(retriedJobs) + const text = `Cleaned stack traces for ${retriedJobCountInQueue} of ${failedCount} failed jobs in ${queue.name} queue` + console.log(`${Date.now() - startTime} ms - ${text}`) - const jobs = await queue.getFailed() - const retriedJobCount = await reduce(jobs, async (count, job) => { - if (!job || typeof job !== 'object' || job.attemptsMade <= job.opts.attempts) { - return count - } - const { 
id, data, opts } = job - opts.jobId = id - try { - await job.remove() - const rescheduled = await queue.add(data, opts) - count++ - } catch (e) { - console.error(e) - } - return count - }, 0) + return Promise.all([ + sendToSlack({ text }), + queue.close() + ]).then(() => retriedJobCountInQueue) + }) + ) - text = `Cleaned stack traces for ${retriedJobCount} of ${failedCount} failed jobs in ${name} queue` - console.log(`${Date.now() - startTime} ms - ${text}`) - await sendToSlack({ text }) - } else { - console.log(`${Date.now() - startTime} ms - No jobs failed jobs in ${name} queue`) - } - - await queue.close() + return sumArray(retriedJobCountsForEachQueue) }) - .then(() => { - console.log(`${Date.now() - startTime} ms - Finished cleaning stacks for failed jobs in queues`) + .then(retriedJobsInAllQueues => { + console.log(`${Date.now() - startTime} ms - Finished cleaning stacks for ${retriedJobsInAllQueues} failed jobs in ${queueNames.join(', ')} queues`) process.exitCode = 0 }) .catch(err => { console.error(err) - console.log(`${Date.now() - startTime} ms - Finished cleaning stacks for failed jobs in queues`) + console.log(`${Date.now() - startTime} ms - Finished cleaning stacks for some failed jobs in ${queueNames.join(', ')} queues`) process.exitCode = 1 }) - diff --git a/package.json b/package.json index 3638d9b..b8112ab 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ "dependencies": { "dotenv": "^8.2.0", "bull": "^3.18.0", - "p-iteration": "^1.1.8", + "p-map": "^4.0.0", "slack-notify": "^0.1.7" }, "devDependencies": { diff --git a/retry_failed_jobs.js b/retry_failed_jobs.js index eb47966..9d40453 100644 --- a/retry_failed_jobs.js +++ b/retry_failed_jobs.js @@ -1,82 +1,64 @@ -const Queue = require('bull') -const result = require('dotenv').config() +const queueNames = require('./utils/init') -if (result.error) { - throw result.error -} +const pMap = require('p-map') +const sendToSlack = require('./utils/slack')(process.env.SLACK_WEBHOOK_URL, 
process.env.SLACK_CHANNEL) -const { forEach, reduce } = require('p-iteration') +const sumArray = require('./utils/sumArray') +const getFailedQueues = require('./utils/getFailedQueues') +// jobsExceedingAttemptsFilter is deliberately not used here: retrying applies to every failed job below MAX_FAILED_COUNT -const REDIS_PORT = Number.parseInt(process.env.REDIS_PORT) || 6379 -const REDIS_HOST = process.env.REDIS_HOST || '127.0.0.1' -const REDIS_DATABASE = Number.parseInt(process.env.REDIS_DATABASE) || 0 -const REDIS_PASS = process.env.REDIS_PASS -const REDIS_TLS = process.env.REDIS_TLS && `${process.env.REDIS_TLS}` === 'true' -const MAX_FAILED_COUNT = Number.parseInt(process.env.MAX_FAILED_COUNT) || 100 - -const REDIS_CONFIG = { - redis: { - port: REDIS_PORT, - host: REDIS_HOST, - db: REDIS_DATABASE, - password: REDIS_PASS, - tls: REDIS_TLS ? {} : null - } -} - -const args = process.argv.slice(2) - -if (args.length < 1) { - console.error(`${new Date().toISOString()} - No queue name specified, please specify at least one queue name.`) - process.exit(1) -} +console.log(`${new Date().toISOString()} - Checking for failed jobs in ${queueNames.join(', ')} queues`) -const sendToSlack = require('./slack')(process.env.SLACK_WEBHOOK_URL, process.env.SLACK_CHANNEL) +const CONCURRENCY = Number.parseInt(process.env.CONCURRENCY, 10) || 50 +const MAX_FAILED_COUNT = Number.parseInt(process.env.MAX_FAILED_COUNT, 10) || 100 const startTime = Date.now() -console.log(`${new Date(startTime).toISOString()} - Checking for failed jobs in ${args.join(', ')} queues`) - -forEach(args, async (name, index) => { - console.log(`${Date.now() - startTime} ms - Start retrying failed jobs in ${name} queue`) - const queue = new Queue(name, REDIS_CONFIG) +getFailedQueues(queueNames) + .then(async queues => { + const retriedJobCountsForEachQueue = await Promise.all( + queues.map(async queue => { + const failedJobs = (await queue.getFailed()) + .filter(job => job && typeof job === 'object' && job.attemptsMade <= MAX_FAILED_COUNT) // same guard as before the rewrite: skip jobs already attempted more than MAX_FAILED_COUNT times - const failedCount = await queue.getFailedCount() - if 
(failedCount > 0) { - let text = `Found ${failedCount} failed jobs in ${name} queue.` - console.log(`${Date.now() - startTime} ms - ${text}`) - // await sendToSlack({ text }) + if (!failedJobs.length) { + return queue.close().then(() => 0) + } - const jobs = await queue.getFailed() - const retriedJobCount = await reduce(jobs, async (count, job) => { - if (!job || typeof job !== 'object' || job.attemptsMade > MAX_FAILED_COUNT) { - return count - } - try { - await job.retry() - count++ - } catch (e) { - console.error(e) - } - return count - }, 0) + const failedCount = failedJobs.length + const retriedJobs = await pMap( + failedJobs, + async job => { + try { + await job.retry() + return 1 + } catch (e) { + console.error(e) + return 0 + } + }, + { concurrency: CONCURRENCY } + ) + const retriedJobCountInQueue = sumArray(retriedJobs) + const text = `Retried ${retriedJobCountInQueue} of ${failedCount} failed jobs in ${queue.name} queue` + console.log(`${Date.now() - startTime} ms - ${text}`) - text = `Retrying ${retriedJobCount} of ${failedCount} failed jobs in ${name} queue` - console.log(`${Date.now() - startTime} ms - ${text}`) - await sendToSlack({ text }) - } else { - console.log(`${Date.now() - startTime} ms - No jobs failed jobs in ${name} queue`) - } + return Promise.all([ + sendToSlack({ text }), + queue.close() + ]).then(() => retriedJobCountInQueue) + }) + ) - await queue.close() -}) - .then(() => { - console.log(`${Date.now() - startTime} ms - Finished retrying failed jobs in queues`) + return sumArray(retriedJobCountsForEachQueue) + }) + .then((retriedJobsInAllQueues) => { + console.log(`${Date.now() - startTime} ms - Finished retrying ${retriedJobsInAllQueues} failed jobs in ${queueNames.join(', ')} queues`) process.exitCode = 0 }) .catch(err => { console.error(err) - console.log(`${Date.now() - startTime} ms - Finished retrying failed jobs in queues`) + console.log(`${Date.now() - startTime} ms - Finished retrying failed jobs in ${queueNames.join(', ')} 
queues`) process.exitCode = 1 }) diff --git a/utils/getFailedQueues.js b/utils/getFailedQueues.js new file mode 100644 index 0000000..eb8fe31 --- /dev/null +++ b/utils/getFailedQueues.js @@ -0,0 +1,24 @@ +const getQueues = require('./getQueues') +//const sendToSlack = require('./slack')(process.env.SLACK_WEBHOOK_URL, process.env.SLACK_CHANNEL) + +module.exports = async queueNames => { + const startTime = Date.now() + + return getQueues( + queueNames, + async queue => { + console.log(`${Date.now() - startTime} ms - Checking for failed jobs in ${queue.name} queue`) + const failedCount = await queue.getFailedCount() + if (!failedCount) { + console.log(`${Date.now() - startTime} ms - No failed jobs in ${queue.name} queue`) + await queue.close() + return null + } + + const text = `Found ${failedCount} failed jobs in ${queue.name} queue.` + console.log(`${Date.now() - startTime} ms - ${text}`) + // await sendToSlack({ text }) + return queue + } + ) +} diff --git a/utils/getQueues.js b/utils/getQueues.js new file mode 100644 index 0000000..11b6d48 --- /dev/null +++ b/utils/getQueues.js @@ -0,0 +1,22 @@ +const REDIS_PORT = Number.parseInt(process.env.REDIS_PORT) || 6379 +const REDIS_HOST = process.env.REDIS_HOST || '127.0.0.1' +const REDIS_DATABASE = Number.parseInt(process.env.REDIS_DATABASE) || 0 +const REDIS_PASS = process.env.REDIS_PASS +const REDIS_TLS = process.env.REDIS_TLS && `${process.env.REDIS_TLS}` === 'true' + +const REDIS_CONFIG = { + redis: { + port: REDIS_PORT, + host: REDIS_HOST, + db: REDIS_DATABASE, + password: REDIS_PASS, + tls: REDIS_TLS ? 
{} : null + } +} + +const Queue = require('bull') + +module.exports = async (queueNames, filterFn = async queue => queue) => { + const queues = await Promise.all(queueNames.map(async name => filterFn(new Queue(name, REDIS_CONFIG)))) + return queues.filter(q => !!q) +} diff --git a/utils/init.js b/utils/init.js new file mode 100644 index 0000000..737e228 --- /dev/null +++ b/utils/init.js @@ -0,0 +1,14 @@ +const result = require('dotenv').config() + +if (result.error) { + throw result.error +} + +const args = process.argv.slice(2) + +if (args.length < 1) { + console.error(`${new Date().toISOString()} - No queue name specified, please specify at least one queue name.`) + process.exit(1) +} + +module.exports = args diff --git a/utils/isNonEmptyString.js b/utils/isNonEmptyString.js new file mode 100644 index 0000000..95e3821 --- /dev/null +++ b/utils/isNonEmptyString.js @@ -0,0 +1 @@ +module.exports = value => typeof value === 'string' && value !== '' diff --git a/utils/jobsExceedingAttemptsFilter.js b/utils/jobsExceedingAttemptsFilter.js new file mode 100644 index 0000000..dcaaa42 --- /dev/null +++ b/utils/jobsExceedingAttemptsFilter.js @@ -0,0 +1 @@ +module.exports = jobs => jobs.filter(job => job && typeof job === 'object' && job.attemptsMade > job.opts.attempts) diff --git a/slack.js b/utils/slack.js similarity index 86% rename from slack.js rename to utils/slack.js index 15a062b..64584f0 100644 --- a/slack.js +++ b/utils/slack.js @@ -1,6 +1,4 @@ -function isNonEmptyString(value) { - return typeof value === 'string' && value !== '' -} +const isNonEmptyString = require('./isNonEmptyString') function sendToSlack(slack_webhook_url, slack_channel) { if (isNonEmptyString(slack_webhook_url) && isNonEmptyString(slack_channel)) { diff --git a/utils/sumArray.js b/utils/sumArray.js new file mode 100644 index 0000000..0e582a8 --- /dev/null +++ b/utils/sumArray.js @@ -0,0 +1 @@ +module.exports = arr => arr.reduce((a,b) => a + b, 0)