Skip to content

Commit

Permalink
feat: add a flag to supress kv.listenQueue
Browse files Browse the repository at this point in the history
Signed-off-by: Mieszko Kycermann <[email protected]>
  • Loading branch information
Kycermann committed Oct 2, 2024
1 parent ec903ec commit 72f5cc3
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 13 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ import { connectTopicQueue } from "jsr:@mieszko/topics";

// Connect to any KV store by passing a KV connection to connectTopicQueue.
const kv = await Deno.openKv(":memory:");
const { enqueue, listenQueue, close } = await connectTopicQueue(kv);

// If kv.listenQueue freezes your build process, you can disable it here.
const disableListenQueue = Deno.env.has("FRESH_IS_BUILDING");
const { enqueue, listenQueue, close } = await connectTopicQueue(
kv,
disableListenQueue,
);

type MyData = {
example: string;
Expand Down
10 changes: 7 additions & 3 deletions deno.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
{
"name": "@mieszko/topics",
"version": "2.0.1",
"version": "2.1.0",
"exports": "./mod.ts",
"tasks": {
"check": "deno fmt --check && deno lint && deno check **/*.ts",
"test": "deno test --unstable-kv --parallel"
"test": "deno test --unstable-kv"
},
"publish": {
"exclude": ["test.ts", "deno.lock", ".github"]
"exclude": [
"test.ts",
"deno.lock",
".github"
]
}
}
14 changes: 9 additions & 5 deletions mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ function getTopicId(topicKey: Topic) {
* Connects to Deno KV and returns functions to enqueue and listen to a queue with topics.
*
* @param kvInstance Deno KV connection to use. If not provided, a new connection will be created.
* @param disableListenQueue Whether to disable listening to the KV queue. Default is `false`.
*/
export async function connectTopicQueue(
kvInstance?: Deno.Kv,
disableListenQueue = false,
): Promise<TopicQueueConnection> {
const kv = kvInstance ?? await Deno.openKv();
const kv = kvInstance ?? (await Deno.openKv());
const listeners: Map<string, QueueListener<unknown>> = new Map();

/**
Expand Down Expand Up @@ -70,14 +72,16 @@ export async function connectTopicQueue(

listeners.set(queueId, callback as QueueListener<unknown>);

if (listeners.size > 1) {
if (listeners.size > 1 || disableListenQueue) {
return;
}

kv.listenQueue(
async (
{ topicKey, payload, __mieszko_topics__ }: WrappedPayload<unknown>,
) => {
async ({
topicKey,
payload,
__mieszko_topics__,
}: WrappedPayload<unknown>) => {
if (__mieszko_topics__ !== 1) {
throw new Error("Unrecognised payload format");
}
Expand Down
22 changes: 22 additions & 0 deletions test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,25 @@ Deno.test("Should pass options to kv.enqueue", async () => {
// Ensure kv.enqueue was called with the correct options
assertEquals(options, testOptions);
});

Deno.test("Should not call kv.listenQueue if disableListenQueue is true", async () => {
await using kv = await Deno.openKv(":memory:");
await using q = await connectTopicQueue(kv, true);

const testTopic = "testTopic";

let called = false;

// @ts-ignore Mock kv.listenQueue to catch calls
kv.listenQueue = () => {
called = true;
};

// Setup listener and enqueue
q.listenQueue(testTopic, () => {});

// Ensure kv.listenQueue was not called
if (called) {
throw new Error("kv.listenQueue was called");
}
});
5 changes: 1 addition & 4 deletions types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ export type TopicQueueConnection = {
options?: Parameters<Deno.Kv["enqueue"]>[1],
atomic?: Deno.AtomicOperation,
) => Promise<void>;
listenQueue: <T>(
topicKey: Topic,
callback: QueueListener<T>,
) => void;
listenQueue: <T>(topicKey: Topic, callback: QueueListener<T>) => void;
close: () => void;
[Symbol.dispose]: () => void;
};
Expand Down

0 comments on commit 72f5cc3

Please sign in to comment.