From c7c9dbcd29a850d4230d772071296d67be2a095f Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Thu, 20 Jun 2024 15:51:00 +0900 Subject: [PATCH] RedisMessageQueue --- README.md | 7 ++- deno.json | 4 ++ deno.lock | 5 ++ dnt.ts | 18 ++++++- src/mq.test.ts | 63 ++++++++++++++++++++++ src/mq.ts | 141 +++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 236 insertions(+), 2 deletions(-) create mode 100644 src/mq.test.ts create mode 100644 src/mq.ts diff --git a/README.md b/README.md index b971f0d..a43ca51 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,10 @@ [![GitHub Actions][GitHub Actions badge]][GitHub Actions] This package provides [Fedify]'s [`KvStore`] and [`MessageQueue`] -implementations for Redis. +implementations for Redis: + + - [`RedisKvStore`] + - [`RedisMessageQueue`] [JSR]: https://jsr.io/@fedify/redis [JSR badge]: https://jsr.io/badges/@fedify/redis @@ -19,6 +22,8 @@ implementations for Redis. [Fedify]: https://fedify.dev/ [`KvStore`]: https://jsr.io/@fedify/fedify/doc/federation/~/KvStore [`MessageQueue`]: https://jsr.io/@fedify/fedify/doc/federation/~/MessageQueue +[`RedisKvStore`]: https://jsr.io/@fedify/redis/doc/kv/~/RedisKvStore +[`RedisMessageQueue`]: https://jsr.io/@fedify/redis/doc/kv/~/RedisMessageQueue Changelog diff --git a/deno.json b/deno.json index 73032fe..e506f42 100644 --- a/deno.json +++ b/deno.json @@ -10,8 +10,12 @@ "@deno/dnt": "jsr:@deno/dnt@^0.41.2", "@fedify/fedify": "jsr:@fedify/fedify@^0.10.0", "@std/assert": "jsr:@std/assert@^0.226.0", + "@std/async": "jsr:@std/async@^0.224.2", "ioredis": "npm:ioredis@^5.4.0" }, + "unstable": [ + "temporal" + ], "exclude": [ "npm" ], diff --git a/deno.lock b/deno.lock index 9f96ec9..5eac546 100644 --- a/deno.lock +++ b/deno.lock @@ -8,6 +8,7 @@ "jsr:@logtape/logtape@^0.4.0": "jsr:@logtape/logtape@0.4.0", "jsr:@std/assert@^0.218.2": "jsr:@std/assert@0.218.2", "jsr:@std/assert@^0.226.0": "jsr:@std/assert@0.226.0", + "jsr:@std/async@^0.224.2": "jsr:@std/async@0.224.2", "jsr:@std/bytes@^0.218.2": "jsr:@std/bytes@0.218.2", "jsr:@std/bytes@^1.0.0": "jsr:@std/bytes@1.0.0", "jsr:@std/encoding@^0.224.3": "jsr:@std/encoding@0.224.3", @@ -83,6 +84,9 @@ "jsr:@std/internal@^1.0.0" ] }, + "@std/async@0.224.2": { + "integrity": "4d277d6e165df43d5e061ba0ef3edfddb8e8d558f5b920e3e6b1d2614b44d074" + }, "@std/bytes@0.218.2": { "integrity": "91fe54b232dcca73856b79a817247f4a651dbb60d51baafafb6408c137241670" }, @@ -549,6 +553,7 @@ "jsr:@deno/dnt@^0.41.2", "jsr:@fedify/fedify@^0.10.0", "jsr:@std/assert@^0.226.0", + "jsr:@std/async@^0.224.2", "npm:ioredis@^5.4.0" ] } diff --git a/dnt.ts b/dnt.ts index f845de0..a0c7b3f 100644 --- a/dnt.ts +++ b/dnt.ts @@ -40,7 +40,23 @@ await build({ outDir: "./npm", entryPoints: ["./mod.ts"], importMap, - shims: { deno: true }, + shims: { + deno: true, + custom: [ + { + package: { + name: "@js-temporal/polyfill", + version: "^0.4.4", + }, + globalNames: [ + { + name: "Temporal", + exportName: "Temporal", + }, + ], + }, + ], + }, typeCheck: "both", declaration: "separate", declarationMap: true, diff --git a/src/mq.test.ts b/src/mq.test.ts new file mode 100644 index 0000000..ac3072a --- /dev/null +++ b/src/mq.test.ts @@ -0,0 +1,63 @@ +import { assertEquals, assertGreater } from "@std/assert"; +import { delay } from "@std/async/delay"; +import { Redis } from "ioredis"; +import { RedisMessageQueue } from "./mq.ts"; + +Deno.test("RedisMessageQueue", async (t) => { + const mq = new RedisMessageQueue(() => new Redis(), { + loopInterval: { seconds: 1 }, + }); + const mq2 = new RedisMessageQueue(() => new Redis(), { + loopInterval: { seconds: 1 }, + }); + + const messages: string[] = []; + mq.listen((message: string) => { + messages.push(message); + }); + mq2.listen((message: string) => { + messages.push(message); + }); + + await t.step("enqueue()", async () => { + await mq.enqueue("Hello, world!"); + }); + + await waitFor(() => messages.length > 0, 15_000); + + await t.step("listen()", () => { + assertEquals(messages, ["Hello, world!"]); + }); + + let started = 0; + await t.step("enqueue() with delay", async () => { + started = Date.now(); + await mq.enqueue( + "Delayed message", + { delay: Temporal.Duration.from({ seconds: 3 }) }, + ); + }); + + await waitFor(() => messages.length > 1, 15_000); + + await t.step("listen() with delay", () => { + assertEquals(messages, ["Hello, world!", "Delayed message"]); + assertGreater(Date.now() - started, 3_000); + }); + + mq[Symbol.dispose](); + mq2[Symbol.dispose](); +}); + +async function waitFor( + predicate: () => boolean, + timeoutMs: number, +): Promise { + const started = Date.now(); + while (!predicate()) { + await delay(500); + if (Date.now() - started > timeoutMs) { + throw new Error("Timeout"); + } + } +} diff --git a/src/mq.ts b/src/mq.ts new file mode 100644 index 0000000..a99f789 --- /dev/null +++ b/src/mq.ts @@ -0,0 +1,141 @@ +// deno-lint-ignore-file no-explicit-any +import type { MessageQueue, MessageQueueEnqueueOptions } from "@fedify/fedify"; +import type { Redis, RedisKey } from "ioredis"; +import { type Codec, JsonCodec } from "./codec.ts"; + +/** + * Options for {@link RedisMessageQueue} class. + */ +export interface RedisMessageQueueOptions { + /** + * The unique identifier for the worker that is processing messages from the + * queue. If this is not specified, a random identifier will be generated. + * This is used to prevent multiple workers from processing the same message, + * so it should be unique for each worker. + */ + workerId?: string; + + /** + * The Pub/Sub channel key to use for the message queue. `"fedify_channel"` + * by default. + */ + channelKey?: RedisKey; + + /** + * The Sorted Set key to use for the delayed message queue. `"fedify_queue"` + * by default. + */ + queueKey?: RedisKey; + + /** + * The key to use for locking the message queue. `"fedify_lock"` by default. + */ + lockKey?: RedisKey; + + /** + * The codec to use for encoding and decoding messages in the key-value store. + * Defaults to {@link JsonCodec}. + */ + codec?: Codec; + + /** + * The interval at which to poll the message queue for delayed messages. + * If this interval is too short, it may cause excessive load on the Redis + * server. If it is too long, it may cause messages to be delayed longer + * than expected. + * + * 5 seconds by default. + */ + loopInterval?: Temporal.DurationLike; +} + +/** + * A message queue that uses Redis as the underlying storage. + */ +export class RedisMessageQueue implements MessageQueue, Disposable { + #redis: Redis; + #subRedis: Redis; + #workerId: string; + #channelKey: RedisKey; + #queueKey: RedisKey; + #lockKey: RedisKey; + #codec: Codec; + #loopInterval: Temporal.Duration; + #loopHandle?: ReturnType; + + /** + * Creates a new Redis message queue. + * @param redis The Redis client factory. + * @param options The options for the message queue. + */ + constructor(redis: () => Redis, options: RedisMessageQueueOptions = {}) { + this.#redis = redis(); + this.#subRedis = redis(); + this.#workerId = options.workerId ?? crypto.randomUUID(); + this.#channelKey = options.channelKey ?? "fedify_channel"; + this.#queueKey = options.queueKey ?? "fedify_queue"; + this.#lockKey = options.lockKey ?? "fedify_lock"; + this.#codec = options.codec ?? new JsonCodec(); + this.#loopInterval = Temporal.Duration.from( + options.loopInterval ?? { seconds: 5 }, + ); + } + + async enqueue( + message: any, + options?: MessageQueueEnqueueOptions, + ): Promise { + const ts = options?.delay == null + ? 0 + : Temporal.Now.instant().add(options.delay).epochMilliseconds; + const encodedMessage = this.#codec.encode(message); + await this.#redis.zadd(this.#queueKey, ts, encodedMessage); + if (ts < 1) this.#redis.publish(this.#channelKey, ""); + } + + async #poll(): Promise { + const result = await this.#redis.setnx(this.#lockKey, this.#workerId); + if (result < 1) return; + await this.#redis.expire( + this.#lockKey, + this.#loopInterval.total({ unit: "seconds" }) * 2, + ); + const messages = await this.#redis.zrangebyscoreBuffer( + this.#queueKey, + 0, + Temporal.Now.instant().epochMilliseconds, + ); + try { + if (messages.length < 1) return; + const message = messages[0]; + await this.#redis.zrem(this.#queueKey, message); + return this.#codec.decode(message); + } finally { + await this.#redis.del(this.#lockKey); + } + } + + listen(handler: (message: any) => void | Promise): void { + if (this.#loopHandle != null) { + throw new Error("Already listening"); + } + this.#loopHandle = setInterval(async () => { + const message = await this.#poll(); + if (message === undefined) return; + await handler(message); + }, this.#loopInterval.total({ unit: "milliseconds" })); + this.#subRedis.subscribe(this.#channelKey, () => { + this.#subRedis.on("message", async () => { + const message = await this.#poll(); + if (message === undefined) return; + await handler(message); + }); + }); + } + + [Symbol.dispose](): void { + clearInterval(this.#loopHandle); + this.#redis.disconnect(); + this.#subRedis.disconnect(); + } +}