Skip to content

Commit

Permalink
fix: use batches for queue
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Apr 13, 2023
1 parent 0bbaac2 commit 129938e
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 38 deletions.
15 changes: 15 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"@tus/server": "https://gitpkg.now.sh/supabase/tus-node-server/packages/server/dist?build",
"agentkeepalive": "^4.2.1",
"axios": "^0.27.2",
"axios-rate-limit": "^1.3.0",
"axios-retry": "^3.3.1",
"connection-string": "^4.3.6",
"conventional-changelog-conventionalcommits": "^5.0.0",
Expand Down
2 changes: 2 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type StorageConfigType = {
webhookURL?: string
webhookApiKey?: string
webhookQueuePullInterval?: number
webhookQueueBatchSize: number
enableImageTransformation: boolean
imgProxyURL?: string
imgProxyRequestTimeout: number
Expand Down Expand Up @@ -143,6 +144,7 @@ export function getConfig(): StorageConfigType {
webhookQueuePullInterval: parseInt(
getOptionalConfigFromEnv('WEBHOOK_QUEUE_PULL_INTERVAL') || '700'
),
webhookQueueBatchSize: parseInt(getOptionalConfigFromEnv('WEBHOOK_QUEUE_BATCH_SIZE') || '100'),
enableImageTransformation: getOptionalConfigFromEnv('ENABLE_IMAGE_TRANSFORMATION') === 'true',
imgProxyRequestTimeout: parseInt(
getOptionalConfigFromEnv('IMGPROXY_REQUEST_TIMEOUT') || '15',
Expand Down
31 changes: 30 additions & 1 deletion src/queue/events/base-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {
return {}
}

static ack(job: Job<BaseEvent<any>['payload']> | Job<BaseEvent<any>['payload']>[]) {
if (enableQueueEvents) {
const jobs = Array.isArray(job) ? job : [job]
return Promise.all(jobs.map((job) => Queue.getInstance().complete(job.id)))
}
}

static fail(job: Job<BaseEvent<any>['payload']> | Job<BaseEvent<any>['payload']>[]) {
if (enableQueueEvents) {
const jobs = Array.isArray(job) ? job : [job]
return Promise.all(jobs.map((job) => Queue.getInstance().fail(job.id)))
}
}

static send<T extends BaseEvent<any>>(
this: StaticThis<T>,
payload: Omit<T['payload'], '$version'>
Expand Down Expand Up @@ -75,7 +89,7 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {
})
}

static handle(job: Job<BaseEvent<any>['payload']>) {
static handle(job: Job<BaseEvent<any>['payload']> | Job<BaseEvent<any>['payload']>[]) {
throw new Error('not implemented')
}

Expand All @@ -101,6 +115,21 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {
const constructor = this.constructor as typeof BaseEvent

if (!enableQueueEvents) {
const options = constructor.getWorkerOptions()

if (options.batchSize) {
return constructor.handle([
{
id: '',
name: constructor.getQueueName(),
data: {
...this.payload,
$version: constructor.version,
},
},
])
}

return constructor.handle({
id: '',
name: constructor.getQueueName(),
Expand Down
8 changes: 6 additions & 2 deletions src/queue/events/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ import { Job, WorkOptions } from 'pg-boss'
import axios from 'axios'
import { getConfig } from '../../config'
import { logger } from '../../monitoring'
import rateLimit from 'axios-rate-limit'

const { webhookURL, webhookApiKey, webhookQueuePullInterval } = getConfig()
const { webhookURL, webhookApiKey, webhookQueuePullInterval, webhookQueueBatchSize } = getConfig()

const httpClient = rateLimit(axios.create(), { maxRPS: 100 })

interface WebhookEvent {
event: {
Expand All @@ -26,6 +29,7 @@ export class Webhook extends BaseEvent<WebhookEvent> {
static getWorkerOptions(): WorkOptions {
return {
newJobCheckInterval: webhookQueuePullInterval,
batchSize: webhookQueueBatchSize,
}
}

Expand All @@ -38,7 +42,7 @@ export class Webhook extends BaseEvent<WebhookEvent> {
logger.info({ job }, 'handling webhook')

try {
await axios.post(
await httpClient.post(
webhookURL,
{
type: 'Webhook',
Expand Down
88 changes: 53 additions & 35 deletions src/queue/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,50 +71,68 @@ export abstract class Queue {
const workers: Promise<string>[] = []

Queue.events.forEach((event) => {
const options = event.getWorkerOptions()

workers.push(
Queue.getInstance().work(
event.getQueueName(),
event.getWorkerOptions(),
async (job: Job<BasePayload>) => {
try {
const res = await event.handle(job)

QueueJobCompleted.inc({
tenant_id: job.data.tenant.ref,
name: event.getQueueName(),
})
options,
async (job: Job<BasePayload> | Job<BasePayload>[]) => {
if (!Array.isArray(job)) {
job = [job]
}

return res
} catch (e) {
QueueJobRetryFailed.inc({
tenant_id: job.data.tenant.ref,
name: event.getQueueName(),
})
await Promise.all(
job.map(async (j) => {
try {
const res = await event.handle(j)

Queue.getInstance()
.getJobById(job.id)
.then((dbJob) => {
if (!dbJob) {
return
if (options.batchSize) {
await event.ack(j)
}
if (dbJob.retrycount === dbJob.retrylimit) {
QueueJobError.inc({
tenant_id: job.data.tenant.ref,
name: event.getQueueName(),

QueueJobCompleted.inc({
tenant_id: j.data.tenant.ref,
name: event.getQueueName(),
})

return res
} catch (e) {
QueueJobRetryFailed.inc({
tenant_id: j.data.tenant.ref,
name: event.getQueueName(),
})

Queue.getInstance()
.getJobById(j.id)
.then((dbJob) => {
if (!dbJob) {
return
}
if (dbJob.retrycount === dbJob.retrylimit) {
QueueJobError.inc({
tenant_id: j.data.tenant.ref,
name: event.getQueueName(),
})
}
})
}
})

logger.error(
{
job: JSON.stringify(job),
rawError: normalizeRawError(e),
},
'Error while processing job'
)
logger.error(
{
job: JSON.stringify(j),
rawError: normalizeRawError(e),
},
'Error while processing job'
)

throw e
}
if (!options.batchSize) {
throw e
}

await event.fail(j)
}
})
)
}
)
)
Expand Down

0 comments on commit 129938e

Please sign in to comment.