Skip to content

Commit

Permalink
feat: Add support for idempotent enqueues
Browse files Browse the repository at this point in the history
  • Loading branch information
MohamedBassem committed Nov 3, 2024
1 parent a96c9b5 commit 3687c0c
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 350 deletions.
6 changes: 3 additions & 3 deletions src/drizzle.config.ts → drizzle.config.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import type { Config } from "drizzle-kit";

export default {
schema: "./schema.ts",
out: "./drizzle",
driver: "better-sqlite",
schema: "./src/schema.ts",
out: "./src/drizzle",
dialect: "sqlite",
dbCredentials: {
url: "data.db",
},
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"name": "liteque",
"description": "A sqlite-based job queue for Node.js",
"author": "Mohamed Bassem <[email protected]>",
"version": "0.1.3",
"version": "0.2.0",
"type": "module",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down Expand Up @@ -32,7 +32,7 @@
"devDependencies": {
"@tsconfig/node21": "^21.0.3",
"@types/better-sqlite3": "^7.6.11",
"drizzle-kit": "^0.20.14",
"drizzle-kit": "^0.24.02",
"typescript": "^5.6.3",
"vitest": "^1.3.1"
},
Expand Down
343 changes: 11 additions & 332 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/drizzle/0001_wandering_giant_man.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE `tasks` ADD `idempotencyKey` text;--> statement-breakpoint
CREATE UNIQUE INDEX `tasks_queue_idempotencyKey_unique` ON `tasks` (`queue`,`idempotencyKey`);
9 changes: 4 additions & 5 deletions src/drizzle/meta/0000_snapshot.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
{
"version": "5",
"version": "6",
"dialect": "sqlite",
"id": "3094773c-0138-46b2-b617-4b10093b0f53",
"prevId": "00000000-0000-0000-0000-000000000000",
"tables": {
"tasks": {
"name": "tasks",
Expand Down Expand Up @@ -123,8 +121,9 @@
},
"enums": {},
"_meta": {
"schemas": {},
"tables": {},
"columns": {}
}
},
"id": "3094773c-0138-46b2-b617-4b10093b0f53",
"prevId": "00000000-0000-0000-0000-000000000000"
}
148 changes: 148 additions & 0 deletions src/drizzle/meta/0001_snapshot.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
{
"version": "6",
"dialect": "sqlite",
"id": "0f918c72-5b27-4e4c-9027-631ead290bf2",
"prevId": "3094773c-0138-46b2-b617-4b10093b0f53",
"tables": {
"tasks": {
"name": "tasks",
"columns": {
"id": {
"name": "id",
"type": "integer",
"primaryKey": true,
"notNull": true,
"autoincrement": true
},
"queue": {
"name": "queue",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"payload": {
"name": "payload",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"createdAt": {
"name": "createdAt",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"status": {
"name": "status",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false,
"default": "'pending'"
},
"expireAt": {
"name": "expireAt",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"allocationId": {
"name": "allocationId",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"numRunsLeft": {
"name": "numRunsLeft",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"maxNumRuns": {
"name": "maxNumRuns",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"idempotencyKey": {
"name": "idempotencyKey",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
}
},
"indexes": {
"tasks_queue_idx": {
"name": "tasks_queue_idx",
"columns": [
"queue"
],
"isUnique": false
},
"tasks_status_idx": {
"name": "tasks_status_idx",
"columns": [
"status"
],
"isUnique": false
},
"tasks_expire_at_idx": {
"name": "tasks_expire_at_idx",
"columns": [
"expireAt"
],
"isUnique": false
},
"tasks_num_runs_left_idx": {
"name": "tasks_num_runs_left_idx",
"columns": [
"numRunsLeft"
],
"isUnique": false
},
"tasks_max_num_runs_idx": {
"name": "tasks_max_num_runs_idx",
"columns": [
"maxNumRuns"
],
"isUnique": false
},
"tasks_allocation_id_idx": {
"name": "tasks_allocation_id_idx",
"columns": [
"allocationId"
],
"isUnique": false
},
"tasks_queue_idempotencyKey_unique": {
"name": "tasks_queue_idempotencyKey_unique",
"columns": [
"queue",
"idempotencyKey"
],
"isUnique": true
}
},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {}
}
},
"enums": {},
"_meta": {
"schemas": {},
"tables": {},
"columns": {}
},
"internal": {
"indexes": {}
}
}
7 changes: 7 additions & 0 deletions src/drizzle/meta/_journal.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
"when": 1720992922192,
"tag": "0000_wonderful_talisman",
"breakpoints": true
},
{
"idx": 1,
"version": "6",
"when": 1730656128884,
"tag": "0001_wandering_giant_man",
"breakpoints": true
}
]
}
5 changes: 5 additions & 0 deletions src/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ export interface SqliteQueueOptions {
};
}

export interface EnqueueOptions {
numRetries?: number;
idempotencyKey?: string;
}

export interface RunnerFuncs<T> {
run: (job: DequeuedJob<T>) => Promise<void>;
onComplete?: (job: DequeuedJob<T>) => Promise<void>;
Expand Down
20 changes: 14 additions & 6 deletions src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import assert from "node:assert";
import { and, asc, count, eq, gt, lt, or } from "drizzle-orm";

import { buildDBClient } from "./db";
import { SqliteQueueOptions } from "./options";
import { EnqueueOptions, SqliteQueueOptions } from "./options";
import { Job, tasksTable } from "./schema";

// generate random id
Expand All @@ -29,19 +29,27 @@ export class SqliteQueue<T> {
return this.queueName;
}

async enqueue(payload: T): Promise<Job> {
const job = await this.db
/**
* Enqueue a job into the queue.
* If a job with the same idempotency key is already enqueued, it will be ignored and undefined will be returned.
*/
async enqueue(payload: T, options?: EnqueueOptions): Promise<Job | undefined> {
const opts = options ?? {};
const numRetries = opts.numRetries ?? this.options.defaultJobArgs.numRetries;
const [job] = await this.db
.insert(tasksTable)
.values({
queue: this.queueName,
payload: JSON.stringify(payload),
numRunsLeft: this.options.defaultJobArgs.numRetries + 1,
maxNumRuns: this.options.defaultJobArgs.numRetries + 1,
numRunsLeft: numRetries + 1,
maxNumRuns: numRetries + 1,
allocationId: generateAllocationId(),
idempotencyKey: opts.idempotencyKey,
})
.onConflictDoNothing({target: [tasksTable.queue, tasksTable.idempotencyKey]})
.returning();

return job[0];
return job;
}

async stats() {
Expand Down
28 changes: 27 additions & 1 deletion src/runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ describe("SqiteQueueRunner", () => {
await queue.db
.update(tasksTable)
.set({ payload: "{}" })
.where(eq(tasksTable.id, job.id));
.where(eq(tasksTable.id, job!.id));

const barrier = new Barrier(1);
barrier.allowParticipantsToProceed();
Expand Down Expand Up @@ -437,4 +437,30 @@ describe("SqiteQueueRunner", () => {
expect(results.numCompleted).toEqual(1000);
expect(results.numFailed).toEqual(0);
});

test("idempotency keys", async () => {
const queue = new SqliteQueue<Work>(
"queue1",
buildDBClient(":memory:", true),
{
defaultJobArgs: {
numRetries: 0,
},
},
);

await queue.enqueue({ increment: 1 });
await queue.enqueue({ increment: 2 }, { idempotencyKey: "2" });
await queue.enqueue({ increment: 2 }, { idempotencyKey: "2" });
await queue.enqueue({ increment: 2 }, { idempotencyKey: "2" });
await queue.enqueue({ increment: 3 }, { idempotencyKey: "3" });

expect(await queue.stats()).toEqual({
pending: 3,
running: 0,
pending_retry: 0,
failed: 0,
});

});
});
4 changes: 3 additions & 1 deletion src/schema.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { index, integer, sqliteTable, text } from "drizzle-orm/sqlite-core";
import { index, integer, sqliteTable, text, unique } from "drizzle-orm/sqlite-core";

function createdAtField() {
return integer("createdAt", { mode: "timestamp" })
Expand All @@ -22,6 +22,7 @@ export const tasksTable = sqliteTable(
allocationId: text("allocationId").notNull(),
numRunsLeft: integer("numRunsLeft").notNull(),
maxNumRuns: integer("maxNumRuns").notNull(),
idempotencyKey: text("idempotencyKey"),
},
(tasks) => ({
queueIdx: index("tasks_queue_idx").on(tasks.queue),
Expand All @@ -30,6 +31,7 @@ export const tasksTable = sqliteTable(
numRunsLeftIdx: index("tasks_num_runs_left_idx").on(tasks.numRunsLeft),
maxNumRunsIdx: index("tasks_max_num_runs_idx").on(tasks.maxNumRuns),
allocationIdIdx: index("tasks_allocation_id_idx").on(tasks.allocationId),
idempotencyKeyIdx: unique().on(tasks.queue, tasks.idempotencyKey),
}),
);

Expand Down

0 comments on commit 3687c0c

Please sign in to comment.