Skip to content

Commit

Permalink
Refactor how it polls
Browse files Browse the repository at this point in the history
  • Loading branch information
dahlia committed Oct 3, 2024
1 parent c45179d commit 383bc78
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 94 deletions.
10 changes: 5 additions & 5 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
{
"cSpell.enabled": false,
"deno.enable": true,
"deno.unstable": true,
"deno.unstable": [
"temporal"
],
"files.eol": "\n",
"files.insertFinalNewline": true,
"files.trimFinalNewlines": true,
Expand All @@ -18,8 +21,5 @@
"editor.codeActionsOnSave": {
"source.sortImports": "always"
}
},
"cSpell.words": [
"fedify"
]
}
}
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ Changelog

To be released.

- Polling is now more efficient.
- Renamed `RedisMessageQueueOptions.loopInterval` option to `pollInterval`
option.

### Version 0.2.0

Released on September 26, 2024.
Expand Down
7 changes: 4 additions & 3 deletions deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
"imports": {
"@deno/dnt": "jsr:@deno/dnt@^0.41.3",
"@fedify/fedify": "jsr:@fedify/fedify@^1.0.0",
"@std/assert": "jsr:@std/assert@^0.226.0",
"@std/async": "jsr:@std/async@^0.224.2",
"ioredis": "npm:ioredis@^5.4.0"
"@logtape/logtape": "jsr:@logtape/logtape@^0.6.3",
"@std/assert": "jsr:@std/assert@^1.0.6",
"@std/async": "jsr:@std/async@^1.0.5",
"ioredis": "npm:ioredis@^5.4.1"
},
"unstable": [
"temporal"
Expand Down
101 changes: 57 additions & 44 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions dnt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ await Deno.writeTextFile(
"@fedify/fedify": metadata.imports["@fedify/fedify"]
.replace(/^jsr:/, "npm:")
.replace(/\+.+$/, ""),
"@logtape/logtape": metadata.imports["@logtape/logtape"]
.replace(/^jsr:/, "npm:")
.replace(/\+.+$/, ""),
},
}),
);
Expand Down
4 changes: 2 additions & 2 deletions src/codec.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { assertEquals } from "@std/assert/assert-equals";
import { assertThrows } from "@std/assert/assert-throws";
import { assertEquals } from "@std/assert/equals";
import { assertThrows } from "@std/assert/throws";
import { Buffer } from "node:buffer";
import { DecodingError, EncodingError, JsonCodec } from "./codec.ts";

Expand Down
2 changes: 1 addition & 1 deletion src/kv.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { assertEquals } from "@std/assert/assert-equals";
import { assertEquals } from "@std/assert/equals";
import { Redis } from "ioredis";
import { RedisKvStore } from "./kv.ts";

Expand Down
22 changes: 17 additions & 5 deletions src/mq.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,24 @@ Deno.test("RedisMessageQueue", async (t) => {
const queueKey = `fedify_test_queue_${crypto.randomUUID()}`;
const lockKey = `fedify_test_lock_${crypto.randomUUID()}`;
const mq = new RedisMessageQueue(() => new Redis(), {
loopInterval: { seconds: 1 },
pollInterval: { seconds: 1 },
channelKey,
queueKey,
lockKey,
});
const mq2 = new RedisMessageQueue(() => new Redis(), {
loopInterval: { seconds: 1 },
pollInterval: { seconds: 1 },
channelKey,
queueKey,
lockKey,
});

const messages: string[] = [];
const messages: (string | number)[] = [];
const controller = new AbortController();
const listening = mq.listen((message: string) => {
const listening = mq.listen((message: string | number) => {
messages.push(message);
}, controller);
const listening2 = mq2.listen((message: string) => {
const listening2 = mq2.listen((message: string | number) => {
messages.push(message);
}, controller);

Expand Down Expand Up @@ -55,6 +55,18 @@ Deno.test("RedisMessageQueue", async (t) => {
assertGreater(Date.now() - started, 3_000);
});

await t.step("enqueue() [bulk]", async () => {
for (let i = 0; i < 1_000; i++) await mq.enqueue(i);
});

await waitFor(() => messages.length > 1_001, 30_000);

await t.step("listen() [bulk]", () => {
const numbers: Set<number> = new Set();
for (let i = 0; i < 1_000; i++) numbers.add(i);
assertEquals(new Set(messages.slice(2)), numbers);
});

controller.abort();
await listening;
await listening2;
Expand Down
Loading

0 comments on commit 383bc78

Please sign in to comment.