Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
Signed-off-by: Mieszko Kycermann <[email protected]>
  • Loading branch information
Kycermann committed May 21, 2024
0 parents commit 8ebfcc2
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 0 deletions.
29 changes: 29 additions & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Publish

on:
push:
branches:
- master

jobs:
publish:
runs-on: ubuntu-latest

permissions:
contents: read
id-token: write

steps:
- name: Checkout
uses: actions/checkout@v2

- name: Deno setup
uses: denoland/setup-deno@v1
with:
deno-version: vx.x.x

- name: Check
run: deno task check

- name: Publish
run: deno task publish
49 changes: 49 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Topics for Deno KV Queues

Use Deno KV queues with topics.

This module calls `kv.listenQueue` internally, so you cannot call
`kv.listenQueue` yourself or use any modules that call `kv.listenQueue`. It is
not recommended to use `kv.enqueue` directly because this module will ignore
those items.

## Quick Start

```typescript
import { enqueue, listenQueue } from "https://deno.land/x/topics/mod.ts";

// Call listenQueue on the top level.
listenQueue("my-queue", async (data) => {
console.log(data);
});

enqueue("my-queue", { example: "Hello!" });
```

## Advanced Usage

```typescript
import { enqueue, listenQueue } from "https://deno.land/x/topics/mod.ts";

type MyData = {
example: string;
};

listenQueue<MyData>(["my", "queue"], async (data) => {
console.log(data.example);
});

// You can optionally pass an atomic instance to enqueue.
await using kv = await Deno.openKv();
const atomic = kv.atomic();

// If you call enqueue before listenQueue, an error will be thrown.
enqueue<MyData>(
["my", "queue"],
{ example: "Hello, World!" },
{ backOffSchedule: [5000, 10_000] }, // Optional
atomic, // Optional
);

await atomic.commit();
```
9 changes: 9 additions & 0 deletions deno.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"name": "@mieszko/topics",
"version": "1.0.1",
"exports": "./mod.ts",
"tasks": {
"format": "deno fmt --unstable-kv",
"check": "deno fmt --check && deno lint && deno check **/*.ts"
}
}
81 changes: 81 additions & 0 deletions mod.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
export type QueueKey = string | (string | number | boolean)[];
export type QueueListener<T> = (payload: T) => void | Promise<void>;

type WrappedPayload<T> = {
queueKey: QueueKey;
payload: T;
keysIfUndelivered: Deno.KvKey[];
__mieszko_topics__: 1 | undefined;
};

// deno-lint-ignore no-explicit-any
const listeners: Map<string, QueueListener<any>> = new Map();

function computeQueueId(queueKey: QueueKey) {
return JSON.stringify(queueKey);
}

/**
* Enqueues a payload to the Deno KV queue. You must call `listenQueue` before calling this function.
*/
export async function enqueue<T = unknown>(
queueKey: QueueKey,
payload: T,
options: Parameters<Deno.Kv["enqueue"]>[1] = {},
atomic?: Deno.AtomicOperation,
) {
const queueId = computeQueueId(queueKey);

if (!listeners.has(queueId)) {
throw new Error(`No listener for queue ${queueId}`);
}

const wrappedPayload: WrappedPayload<T> = {
queueKey,
payload,
keysIfUndelivered: options.keysIfUndelivered ?? [],
__mieszko_topics__: 1,
};

if (atomic) {
atomic.enqueue(wrappedPayload, options);
} else {
await using kv = await Deno.openKv();
const { ok } = await kv.enqueue(wrappedPayload, options);

if (!ok) {
console.log("Payload", wrappedPayload);
throw new Error(`Failed to enqueue ${queueId}`);
}
}
}

/**
* Listens to a queue and calls the callback when a payload is enqueued.
*/
export function listenQueue<T>(queueKey: QueueKey, callback: QueueListener<T>) {
const queueId = computeQueueId(queueKey);

listeners.set(queueId, callback);
}

Deno.openKv().then((kv) => {
kv.listenQueue(
async (
{ queueKey, payload, __mieszko_topics__ }: WrappedPayload<unknown>,
) => {
if (__mieszko_topics__ !== 1) {
throw new Error("Unknown queue payload received");
}

const queueId = computeQueueId(queueKey);
const listener = listeners.get(queueId);

if (listener) {
await listener(payload);
} else {
throw new Error(`No listener for queue ${queueId}`);
}
},
);
});

0 comments on commit 8ebfcc2

Please sign in to comment.