Skip to content

Commit

Permalink
Rewritten scripts to be more concurrent and for better code reuse.
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Michalski committed Aug 20, 2020
1 parent d4a63f9 commit 77e193c
Show file tree
Hide file tree
Showing 12 changed files with 190 additions and 193 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ REDIS_DATABASE=0
#REDIS_TLS=true

MAX_FAILED_COUNT=100
CONCURRENCY=25

SLACK_WEBHOOK_URL=https://hooks.slack.com/services/...
SLACK_CHANNEL="#my-channel"
90 changes: 32 additions & 58 deletions clean_completed_jobs.js
Original file line number Diff line number Diff line change
@@ -1,73 +1,47 @@
const Queue = require('bull')
const queueNames = require('./utils/init')

const result = require('dotenv').config()
if (result.error) {
throw result.error
}
const sendToSlack = require('./utils/slack')(process.env.SLACK_WEBHOOK_URL, process.env.SLACK_CHANNEL)

const { forEach } = require('p-iteration')

const REDIS_PORT = Number.parseInt(process.env.REDIS_PORT) || 6379
const REDIS_HOST = process.env.REDIS_HOST || '127.0.0.1'
const REDIS_DATABASE = Number.parseInt(process.env.REDIS_DATABASE) || 0
const REDIS_PASS = process.env.REDIS_PASS
const REDIS_TLS = process.env.REDIS_TLS && `${process.env.REDIS_TLS}` === 'true'
const sumArray = require('./utils/sumArray')
const getQueues = require('./utils/getQueues')

const JOB_AGE = Number.parseInt(process.env.JOB_AGE) || 5000

const REDIS_CONFIG = {
redis: {
port: REDIS_PORT,
host: REDIS_HOST,
db: REDIS_DATABASE,
password: REDIS_PASS,
tls: REDIS_TLS ? {} : null
}
}

const args = process.argv.slice(2)

if (args.length < 1) {
console.error(`${new Date().toISOString()} - No queue name specified, please specify at least one queue name.`)
process.exit(1)
}

const sendToSlack = require('./slack')(process.env.SLACK_WEBHOOK_URL, process.env.SLACK_CHANNEL)

const startTime = Date.now()
console.log(`${new Date(startTime).toISOString()} - Start cleaning completed jobs in ${args.join(', ')} queues`)

forEach(args, async (name) => {
console.log(`${Date.now() - startTime} ms - Start cleaning completed jobs in ${name} queue`)

const queue = new Queue(name, REDIS_CONFIG)

queue.on('cleaned', async (jobs, type) => {
// jobs is an array of cleaned jobs
const text = `Cleaned ${jobs.length} ${type} jobs in ${name}.`
const jobIds = jobs.map(job => job && job === typeof 'object' && job.id ? job.id : job)

console.log(`${Date.now() - startTime} ms - ${text}`, jobIds)
await sendToSlack({
text,
jobs: jobIds
//type // the type of jobs cleaned })
console.log(`${new Date().toISOString()} - Checking for completed jobs in ${queueNames.join(', ')} queues`)

getQueues(queueNames)
.then(async queues => {
const operations = queues.map(queue => {
console.log(`${Date.now() - startTime} ms - Start cleaning completed jobs in ${queue.name} queue`)

return new Promise(resolve => {
queue.on('cleaned', async (jobs, type) => {
// jobs is an array of cleaned jobs
const text = `Cleaned ${jobs.length} ${type} jobs in ${queue.name}.`
const jobIds = jobs.map(job => job && job === typeof 'object' && job.id ? job.id : job)

console.log(`${Date.now() - startTime} ms - ${text}`)
await Promise.all([
sendToSlack({ text }),
queue.close()
])
resolve(jobIds.length)
})
queue.clean(JOB_AGE) // start cleaning jobs older than JOB_AGE
})
})

await queue.close()
return Promise.all(operations)
})

await queue.clean(JOB_AGE)

console.log(`${Date.now() - startTime} ms - Finished cleaning completed jobs in ${name} queue`)
})
.then(() => {
console.log(`${Date.now() - startTime} ms - Finished cleaning completed jobs in queues`)
.then(sumArray)
.then(cleanedJobCount => {
console.log(`${Date.now() - startTime} ms - Finished cleaning ${cleanedJobCount} completed jobs in ${queueNames.join(', ')} queues`)
process.exitCode = 0
})
.catch(err => {
console.error(err)
console.log(`${Date.now() - startTime} ms - Finished cleaning completed jobs in queues`)
process.exitCode = 2
console.log(`${Date.now() - startTime} ms - Finished cleaning some completed jobs in ${queueNames.join(', ')} queues`)
process.exitCode = 1
})

113 changes: 46 additions & 67 deletions clean_failed_job_stacks.js
Original file line number Diff line number Diff line change
@@ -1,85 +1,64 @@
const Queue = require('bull')
const result = require('dotenv').config()
const queueNames = require('./utils/init')

if (result.error) {
throw result.error
}
const pMap = require('p-map')
const sendToSlack = require('./utils/slack')(process.env.SLACK_WEBHOOK_URL, process.env.SLACK_CHANNEL)

const { forEach, reduce } = require('p-iteration')
const sumArray = require('./utils/sumArray')
const getFailedQueues = require('./utils/getFailedQueues')
const getAttemptedJobs = require('./utils/jobsExceedingAttemptsFilter')

const REDIS_PORT = Number.parseInt(process.env.REDIS_PORT) || 6379
const REDIS_HOST = process.env.REDIS_HOST || '127.0.0.1'
const REDIS_DATABASE = Number.parseInt(process.env.REDIS_DATABASE) || 0
const REDIS_PASS = process.env.REDIS_PASS
const REDIS_TLS = process.env.REDIS_TLS && `${process.env.REDIS_TLS}` === 'true'
const MAX_FAILED_COUNT = Number.parseInt(process.env.MAX_FAILED_COUNT) || 100
console.log(`${new Date().toISOString()} - Checking for failed jobs in ${queueNames.join(', ')} queues`)

const REDIS_CONFIG = {
redis: {
port: REDIS_PORT,
host: REDIS_HOST,
db: REDIS_DATABASE,
password: REDIS_PASS,
tls: REDIS_TLS ? {} : null
}
}

const args = process.argv.slice(2)

if (args.length < 1) {
console.error(`${new Date().toISOString()} - No queue name specified, please specify at least one queue name.`)
process.exit(1)
}

const sendToSlack = require('./slack')(process.env.SLACK_WEBHOOK_URL, process.env.SLACK_CHANNEL)
const CONCURRENCY = parseInt(process.env.CONCURRENCY) || 50

const startTime = Date.now()
console.log(`${new Date(startTime).toISOString()} - Checking for failed jobs in ${args.join(', ')} queues`)

forEach(args, async (name, index) => {
console.log(`${Date.now() - startTime} ms - Start cleaning stacks for failed jobs in ${name} queue`)
getFailedQueues(queueNames)
.then(async queues => {
const retriedJobCountsForEachQueue = await Promise.all(
queues.map(async queue => {
const failedJobs = getAttemptedJobs(await queue.getFailed())

const queue = new Queue(name, REDIS_CONFIG)
if (!failedJobs.length) {
return queue.close().then(() => 0)
}

const failedCount = await queue.getFailedCount()
if (failedCount > 0) {
let text = `Found ${failedCount} failed jobs in ${name} queue.`
console.log(`${Date.now() - startTime} ms - ${text}`)
// await sendToSlack({ text })
const failedCount = failedJobs.length
const retriedJobs = await pMap(
failedJobs,
async job => {
const { id, data, opts } = job
opts.jobId = id
try {
await job.remove()
const rescheduled = await queue.add(data, opts)
return 1
} catch (e) {
console.error(e)
return 0
}
},
{ concurrency: CONCURRENCY }
)
const retriedJobCountInQueue = sumArray(retriedJobs)
const text = `Cleaned stack traces for ${retriedJobCountInQueue} of ${failedCount} failed jobs in ${queue.name} queue`
console.log(`${Date.now() - startTime} ms - ${text}`)

const jobs = await queue.getFailed()
const retriedJobCount = await reduce(jobs, async (count, job) => {
if (!job || typeof job !== 'object' || job.attemptsMade <= job.opts.attempts) {
return count
}
const { id, data, opts } = job
opts.jobId = id
try {
await job.remove()
const rescheduled = await queue.add(data, opts)
count++
} catch (e) {
console.error(e)
}
return count
}, 0)
return Promise.all([
sendToSlack({ text }),
queue.close()
]).then(() => retriedJobCountInQueue)
})
)

text = `Cleaned stack traces for ${retriedJobCount} of ${failedCount} failed jobs in ${name} queue`
console.log(`${Date.now() - startTime} ms - ${text}`)
await sendToSlack({ text })
} else {
console.log(`${Date.now() - startTime} ms - No jobs failed jobs in ${name} queue`)
}

await queue.close()
return sumArray(retriedJobCountsForEachQueue)
})
.then(() => {
console.log(`${Date.now() - startTime} ms - Finished cleaning stacks for failed jobs in queues`)
.then(retriedJobsInAllQueues => {
console.log(`${Date.now() - startTime} ms - Finished cleaning stacks for ${retriedJobsInAllQueues} failed jobs in ${queueNames.join(', ')} queues`)
process.exitCode = 0
})
.catch(err => {
console.error(err)
console.log(`${Date.now() - startTime} ms - Finished cleaning stacks for failed jobs in queues`)
console.log(`${Date.now() - startTime} ms - Finished cleaning stacks for some failed jobs in ${queueNames.join(', ')} queues`)
process.exitCode = 1
})

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"dependencies": {
"dotenv": "^8.2.0",
"bull": "^3.18.0",
"p-iteration": "^1.1.8",
"p-map": "^4.0.0",
"slack-notify": "^0.1.7"
},
"devDependencies": {
Expand Down
110 changes: 46 additions & 64 deletions retry_failed_jobs.js
Original file line number Diff line number Diff line change
@@ -1,82 +1,64 @@
const Queue = require('bull')
const result = require('dotenv').config()
const queueNames = require('./utils/init')

if (result.error) {
throw result.error
}
const pMap = require('p-map')
const sendToSlack = require('./utils/slack')(process.env.SLACK_WEBHOOK_URL, process.env.SLACK_CHANNEL)

const { forEach, reduce } = require('p-iteration')
const sumArray = require('./utils/sumArray')
const getFailedQueues = require('./utils/getFailedQueues')
const getAttemptedJobs = require('./utils/jobsExceedingAttemptsFilter')

const REDIS_PORT = Number.parseInt(process.env.REDIS_PORT) || 6379
const REDIS_HOST = process.env.REDIS_HOST || '127.0.0.1'
const REDIS_DATABASE = Number.parseInt(process.env.REDIS_DATABASE) || 0
const REDIS_PASS = process.env.REDIS_PASS
const REDIS_TLS = process.env.REDIS_TLS && `${process.env.REDIS_TLS}` === 'true'
const MAX_FAILED_COUNT = Number.parseInt(process.env.MAX_FAILED_COUNT) || 100

const REDIS_CONFIG = {
redis: {
port: REDIS_PORT,
host: REDIS_HOST,
db: REDIS_DATABASE,
password: REDIS_PASS,
tls: REDIS_TLS ? {} : null
}
}

const args = process.argv.slice(2)

if (args.length < 1) {
console.error(`${new Date().toISOString()} - No queue name specified, please specify at least one queue name.`)
process.exit(1)
}
console.log(`${new Date().toISOString()} - Checking for failed jobs in ${queueNames.join(', ')} queues`)

const sendToSlack = require('./slack')(process.env.SLACK_WEBHOOK_URL, process.env.SLACK_CHANNEL)
const CONCURRENCY = parseInt(process.env.CONCURRENCY) || 50
const MAX_FAILED_COUNT = Number.parseInt(process.env.MAX_FAILED_COUNT) || 100

const startTime = Date.now()
console.log(`${new Date(startTime).toISOString()} - Checking for failed jobs in ${args.join(', ')} queues`)

forEach(args, async (name, index) => {
console.log(`${Date.now() - startTime} ms - Start retrying failed jobs in ${name} queue`)

const queue = new Queue(name, REDIS_CONFIG)
getFailedQueues(queueNames)
.then(async queues => {
const retriedJobCountsForEachQueue = await Promise.all(
queues.map(async queue => {
const failedJobs = getAttemptedJobs(await queue.getFailed())
.filter(job => job.attemptsMade > MAX_FAILED_COUNT)

const failedCount = await queue.getFailedCount()
if (failedCount > 0) {
let text = `Found ${failedCount} failed jobs in ${name} queue.`
console.log(`${Date.now() - startTime} ms - ${text}`)
// await sendToSlack({ text })
if (!failedJobs.length) {
return queue.close().then(() => 0)
}

const jobs = await queue.getFailed()
const retriedJobCount = await reduce(jobs, async (count, job) => {
if (!job || typeof job !== 'object' || job.attemptsMade > MAX_FAILED_COUNT) {
return count
}
try {
await job.retry()
count++
} catch (e) {
console.error(e)
}
return count
}, 0)
const failedCount = failedJobs.length
const retriedJobs = await pMap(
failedJobs,
async job => {
try {
await job.retry()
return 1
} catch (e) {
console.error(e)
return 0
}
},
{ concurrency: CONCURRENCY }
)
const retriedJobCountInQueue = sumArray(retriedJobs)
const text = `Retried ${retriedJobCountInQueue} of ${failedCount} failed jobs in ${queue.name} queue`
console.log(`${Date.now() - startTime} ms - ${text}`)

text = `Retrying ${retriedJobCount} of ${failedCount} failed jobs in ${name} queue`
console.log(`${Date.now() - startTime} ms - ${text}`)
await sendToSlack({ text })
} else {
console.log(`${Date.now() - startTime} ms - No jobs failed jobs in ${name} queue`)
}
return Promise.all([
sendToSlack({ text }),
queue.close()
]).then(() => retriedJobCountInQueue)
})
)

await queue.close()
})
.then(() => {
console.log(`${Date.now() - startTime} ms - Finished retrying failed jobs in queues`)
return sumArray(retriedJobCountsForEachQueue)
})
.then((retriedJobsInAllQueues) => {
console.log(`${Date.now() - startTime} ms - Finished retrying ${retriedJobsInAllQueues} failed jobs in ${queueNames.join(', ')} queues`)
process.exitCode = 0
})
.catch(err => {
console.error(err)
console.log(`${Date.now() - startTime} ms - Finished retrying failed jobs in queues`)
console.log(`${Date.now() - startTime} ms - Finished retrying failed jobs in ${queueNames.join(', ')} queues`)
process.exitCode = 1
})

Loading

0 comments on commit 77e193c

Please sign in to comment.