diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..84fa29d --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,29 @@ +name: Publish + +on: + push: + branches: + - master + +jobs: + publish: + runs-on: ubuntu-latest + + permissions: + contents: read + id-token: write + + steps: + - name: Checkout + uses: actions/checkout@v2 + + - name: Deno setup + uses: denoland/setup-deno@v1 + with: + deno-version: vx.x.x + + - name: Check + run: deno task check + + - name: Publish + run: deno task publish diff --git a/README.md b/README.md new file mode 100644 index 0000000..f22d752 --- /dev/null +++ b/README.md @@ -0,0 +1,49 @@ +# Topics for Deno KV Queues + +Use Deno KV queues with topics. + +This module calls `kv.listenQueue` internally, so you cannot call +`kv.listenQueue` yourself or use any modules that call `kv.listenQueue`. It is +not recommended to use `kv.enqueue` directly because this module will ignore +those items. + +## Quick Start + +```typescript +import { enqueue, listenQueue } from "https://deno.land/x/topics/mod.ts"; + +// Call listenQueue on the top level. +listenQueue("my-queue", async (data) => { + console.log(data); +}); + +enqueue("my-queue", { example: "Hello!" }); +``` + +## Advanced Usage + +```typescript +import { enqueue, listenQueue } from "https://deno.land/x/topics/mod.ts"; + +type MyData = { + example: string; +}; + +listenQueue(["my", "queue"], async (data) => { + console.log(data.example); +}); + +// You can optionally pass an atomic instance to enqueue. +await using kv = await Deno.openKv(); +const atomic = kv.atomic(); + +// If you call enqueue before listenQueue, an error will be thrown. +enqueue( + ["my", "queue"], + { example: "Hello, World!" }, + { backOffSchedule: [5000, 10_000] }, // Optional + atomic, // Optional +); + +await atomic.commit(); +``` diff --git a/deno.json b/deno.json new file mode 100644 index 0000000..9ce8833 --- /dev/null +++ b/deno.json @@ -0,0 +1,9 @@ +{ + "name": "@mieszko/topics", + "version": "1.0.1", + "exports": "./mod.ts", + "tasks": { + "format": "deno fmt --unstable-kv", + "check": "deno fmt --check && deno lint && deno check **/*.ts" + } +} diff --git a/mod.ts b/mod.ts new file mode 100644 index 0000000..3112690 --- /dev/null +++ b/mod.ts @@ -0,0 +1,81 @@ +export type QueueKey = string | (string | number | boolean)[]; +export type QueueListener = (payload: T) => void | Promise; + +type WrappedPayload = { + queueKey: QueueKey; + payload: T; + keysIfUndelivered: Deno.KvKey[]; + __mieszko_topics__: 1 | undefined; +}; + +// deno-lint-ignore no-explicit-any +const listeners: Map> = new Map(); + +function computeQueueId(queueKey: QueueKey) { + return JSON.stringify(queueKey); +} + +/** + * Enqueues a payload to the Deno KV queue. You must call `listenQueue` before calling this function. + */ +export async function enqueue( + queueKey: QueueKey, + payload: T, + options: Parameters[1] = {}, + atomic?: Deno.AtomicOperation, +) { + const queueId = computeQueueId(queueKey); + + if (!listeners.has(queueId)) { + throw new Error(`No listener for queue ${queueId}`); + } + + const wrappedPayload: WrappedPayload = { + queueKey, + payload, + keysIfUndelivered: options.keysIfUndelivered ?? [], + __mieszko_topics__: 1, + }; + + if (atomic) { + atomic.enqueue(wrappedPayload, options); + } else { + await using kv = await Deno.openKv(); + const { ok } = await kv.enqueue(wrappedPayload, options); + + if (!ok) { + console.log("Payload", wrappedPayload); + throw new Error(`Failed to enqueue ${queueId}`); + } + } +} + +/** + * Listens to a queue and calls the callback when a payload is enqueued. + */ +export function listenQueue(queueKey: QueueKey, callback: QueueListener) { + const queueId = computeQueueId(queueKey); + + listeners.set(queueId, callback); +} + +Deno.openKv().then((kv) => { + kv.listenQueue( + async ( + { queueKey, payload, __mieszko_topics__ }: WrappedPayload, + ) => { + if (__mieszko_topics__ !== 1) { + throw new Error("Unknown queue payload received"); + } + + const queueId = computeQueueId(queueKey); + const listener = listeners.get(queueId); + + if (listener) { + await listener(payload); + } else { + throw new Error(`No listener for queue ${queueId}`); + } + }, + ); +});