Skip to content

Commit

Permalink
RedisMessageQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
dahlia committed Jun 20, 2024
1 parent be0fe5d commit c7c9dbc
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 2 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
[![GitHub Actions][GitHub Actions badge]][GitHub Actions]

This package provides [Fedify]'s [`KvStore`] and [`MessageQueue`]
implementations for Redis.
implementations for Redis:

- [`RedisKvStore`]
- [`RedisMessageQueue`]

[JSR]: https://jsr.io/@fedify/redis
[JSR badge]: https://jsr.io/badges/@fedify/redis
Expand All @@ -19,6 +22,8 @@ implementations for Redis.
[Fedify]: https://fedify.dev/
[`KvStore`]: https://jsr.io/@fedify/fedify/doc/federation/~/KvStore
[`MessageQueue`]: https://jsr.io/@fedify/fedify/doc/federation/~/MessageQueue
[`RedisKvStore`]: https://jsr.io/@fedify/redis/doc/kv/~/RedisKvStore
[`RedisMessageQueue`]: https://jsr.io/@fedify/redis/doc/kv/~/RedisMessageQueue


Changelog
Expand Down
4 changes: 4 additions & 0 deletions deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@
"@deno/dnt": "jsr:@deno/dnt@^0.41.2",
"@fedify/fedify": "jsr:@fedify/fedify@^0.10.0",
"@std/assert": "jsr:@std/assert@^0.226.0",
"@std/async": "jsr:@std/async@^0.224.2",
"ioredis": "npm:ioredis@^5.4.0"
},
"unstable": [
"temporal"
],
"exclude": [
"npm"
],
Expand Down
5 changes: 5 additions & 0 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 17 additions & 1 deletion dnt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,23 @@ await build({
outDir: "./npm",
entryPoints: ["./mod.ts"],
importMap,
shims: { deno: true },
shims: {
deno: true,
custom: [
{
package: {
name: "@js-temporal/polyfill",
version: "^0.4.4",
},
globalNames: [
{
name: "Temporal",
exportName: "Temporal",
},
],
},
],
},
typeCheck: "both",
declaration: "separate",
declarationMap: true,
Expand Down
63 changes: 63 additions & 0 deletions src/mq.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { assertEquals, assertGreater } from "@std/assert";
import { delay } from "@std/async/delay";
import { Redis } from "ioredis";
import { RedisMessageQueue } from "./mq.ts";

Deno.test("RedisMessageQueue", async (t) => {
const mq = new RedisMessageQueue(() => new Redis(), {
loopInterval: { seconds: 1 },
});
const mq2 = new RedisMessageQueue(() => new Redis(), {
loopInterval: { seconds: 1 },
});

const messages: string[] = [];
mq.listen((message: string) => {
messages.push(message);
});
mq2.listen((message: string) => {
messages.push(message);
});

await t.step("enqueue()", async () => {
await mq.enqueue("Hello, world!");
});

await waitFor(() => messages.length > 0, 15_000);

await t.step("listen()", () => {
assertEquals(messages, ["Hello, world!"]);
});

let started = 0;
await t.step("enqueue() with delay", async () => {
started = Date.now();
await mq.enqueue(
"Delayed message",
{ delay: Temporal.Duration.from({ seconds: 3 }) },
);
});

await waitFor(() => messages.length > 1, 15_000);

await t.step("listen() with delay", () => {
assertEquals(messages, ["Hello, world!", "Delayed message"]);
assertGreater(Date.now() - started, 3_000);
});

mq[Symbol.dispose]();
mq2[Symbol.dispose]();
});

async function waitFor(
predicate: () => boolean,
timeoutMs: number,
): Promise<void> {
const started = Date.now();
while (!predicate()) {
await delay(500);
if (Date.now() - started > timeoutMs) {
throw new Error("Timeout");
}
}
}
141 changes: 141 additions & 0 deletions src/mq.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// deno-lint-ignore-file no-explicit-any
import type { MessageQueue, MessageQueueEnqueueOptions } from "@fedify/fedify";
import type { Redis, RedisKey } from "ioredis";
import { type Codec, JsonCodec } from "./codec.ts";

/**
* Options for {@link RedisMessageQueue} class.
*/
export interface RedisMessageQueueOptions {
/**
* The unique identifier for the worker that is processing messages from the
* queue. If this is not specified, a random identifier will be generated.
* This is used to prevent multiple workers from processing the same message,
* so it should be unique for each worker.
*/
workerId?: string;

/**
* The Pub/Sub channel key to use for the message queue. `"fedify_channel"`
* by default.
*/
channelKey?: RedisKey;

/**
* The Sorted Set key to use for the delayed message queue. `"fedify_queue"`
* by default.
*/
queueKey?: RedisKey;

/**
* The key to use for locking the message queue. `"fedify_lock"` by default.
*/
lockKey?: RedisKey;

/**
* The codec to use for encoding and decoding messages in the key-value store.
* Defaults to {@link JsonCodec}.
*/
codec?: Codec;

/**
* The interval at which to poll the message queue for delayed messages.
* If this interval is too short, it may cause excessive load on the Redis
* server. If it is too long, it may cause messages to be delayed longer
* than expected.
*
* 5 seconds by default.
*/
loopInterval?: Temporal.DurationLike;
}

/**
* A message queue that uses Redis as the underlying storage.
*/
export class RedisMessageQueue implements MessageQueue, Disposable {
#redis: Redis;
#subRedis: Redis;
#workerId: string;
#channelKey: RedisKey;
#queueKey: RedisKey;
#lockKey: RedisKey;
#codec: Codec;
#loopInterval: Temporal.Duration;
#loopHandle?: ReturnType<typeof setInterval>;

/**
* Creates a new Redis message queue.
* @param redis The Redis client factory.
* @param options The options for the message queue.
*/
constructor(redis: () => Redis, options: RedisMessageQueueOptions = {}) {
this.#redis = redis();
this.#subRedis = redis();
this.#workerId = options.workerId ?? crypto.randomUUID();
this.#channelKey = options.channelKey ?? "fedify_channel";
this.#queueKey = options.queueKey ?? "fedify_queue";
this.#lockKey = options.lockKey ?? "fedify_lock";
this.#codec = options.codec ?? new JsonCodec();
this.#loopInterval = Temporal.Duration.from(
options.loopInterval ?? { seconds: 5 },
);
}

async enqueue(
message: any,
options?: MessageQueueEnqueueOptions,
): Promise<void> {
const ts = options?.delay == null
? 0
: Temporal.Now.instant().add(options.delay).epochMilliseconds;
const encodedMessage = this.#codec.encode(message);
await this.#redis.zadd(this.#queueKey, ts, encodedMessage);
if (ts < 1) this.#redis.publish(this.#channelKey, "");
}

async #poll(): Promise<any | undefined> {
const result = await this.#redis.setnx(this.#lockKey, this.#workerId);
if (result < 1) return;
await this.#redis.expire(
this.#lockKey,
this.#loopInterval.total({ unit: "seconds" }) * 2,
);
const messages = await this.#redis.zrangebyscoreBuffer(
this.#queueKey,
0,
Temporal.Now.instant().epochMilliseconds,
);
try {
if (messages.length < 1) return;
const message = messages[0];
await this.#redis.zrem(this.#queueKey, message);
return this.#codec.decode(message);
} finally {
await this.#redis.del(this.#lockKey);
}
}

listen(handler: (message: any) => void | Promise<void>): void {
if (this.#loopHandle != null) {
throw new Error("Already listening");
}
this.#loopHandle = setInterval(async () => {
const message = await this.#poll();
if (message === undefined) return;
await handler(message);
}, this.#loopInterval.total({ unit: "milliseconds" }));
this.#subRedis.subscribe(this.#channelKey, () => {
this.#subRedis.on("message", async () => {
const message = await this.#poll();
if (message === undefined) return;
await handler(message);
});
});
}

[Symbol.dispose](): void {
clearInterval(this.#loopHandle);
this.#redis.disconnect();
this.#subRedis.disconnect();
}
}

0 comments on commit c7c9dbc

Please sign in to comment.