diff --git a/CHANGELOG.md b/CHANGELOG.md index b933d0b9d9..e09c609094 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,8 +53,9 @@ * Node: Added PFMERGE command ([#2053](https://github.com/valkey-io/valkey-glide/pull/2053)) * Node: Added WATCH and UNWATCH commands ([#2076](https://github.com/valkey-io/valkey-glide/pull/2076)) * Node: Added ZLEXCOUNT command ([#2022](https://github.com/valkey-io/valkey-glide/pull/2022)) -* Node: Added ZREMRANGEBYLEX command ([#2025]((https://github.com/valkey-io/valkey-glide/pull/2025)) +* Node: Added ZREMRANGEBYLEX command ([#2025](https://github.com/valkey-io/valkey-glide/pull/2025)) * Node: Added SRANDMEMBER command ([#2067](https://github.com/valkey-io/valkey-glide/pull/2067)) +* Node: Added XINFO STREAM command ([#2083](https://github.com/valkey-io/valkey-glide/pull/2083)) * Node: Added ZSCAN command ([#2061](https://github.com/valkey-io/valkey-glide/pull/2061)) * Node: Added SETRANGE command ([#2066](https://github.com/valkey-io/valkey-glide/pull/2066)) * Node: Added APPEND command ([#2095](https://github.com/valkey-io/valkey-glide/pull/2095)) diff --git a/java/integTest/src/test/java/glide/TestUtilities.java b/java/integTest/src/test/java/glide/TestUtilities.java index 55d5a69d55..9fc3a2931b 100644 --- a/java/integTest/src/test/java/glide/TestUtilities.java +++ b/java/integTest/src/test/java/glide/TestUtilities.java @@ -30,10 +30,10 @@ @UtilityClass public class TestUtilities { /** Extract integer parameter value from INFO command output */ - public static int getValueFromInfo(String data, String value) { + public static long getValueFromInfo(String data, String value) { for (var line : data.split("\r\n")) { if (line.contains(value)) { - return Integer.parseInt(line.split(":")[1]); + return Long.parseLong(line.split(":")[1]); } } fail(); diff --git a/java/integTest/src/test/java/glide/cluster/CommandTests.java b/java/integTest/src/test/java/glide/cluster/CommandTests.java index 89ca922122..0ea506226e 100644 --- a/java/integTest/src/test/java/glide/cluster/CommandTests.java +++ b/java/integTest/src/test/java/glide/cluster/CommandTests.java @@ -416,14 +416,14 @@ public void clientGetName_with_multi_node_route() { public void config_reset_stat() { var data = clusterClient.info(InfoOptions.builder().section(STATS).build()).get(); String firstNodeInfo = getFirstEntryFromMultiValue(data); - int value_before = getValueFromInfo(firstNodeInfo, "total_net_input_bytes"); + long value_before = getValueFromInfo(firstNodeInfo, "total_net_input_bytes"); var result = clusterClient.configResetStat().get(); assertEquals(OK, result); data = clusterClient.info(InfoOptions.builder().section(STATS).build()).get(); firstNodeInfo = getFirstEntryFromMultiValue(data); - int value_after = getValueFromInfo(firstNodeInfo, "total_net_input_bytes"); + long value_after = getValueFromInfo(firstNodeInfo, "total_net_input_bytes"); assertTrue(value_after < value_before); } diff --git a/java/integTest/src/test/java/glide/standalone/CommandTests.java b/java/integTest/src/test/java/glide/standalone/CommandTests.java index 0a422e52dc..f518766b4c 100644 --- a/java/integTest/src/test/java/glide/standalone/CommandTests.java +++ b/java/integTest/src/test/java/glide/standalone/CommandTests.java @@ -278,13 +278,13 @@ public void clientGetName() { @SneakyThrows public void config_reset_stat() { String data = regularClient.info(InfoOptions.builder().section(STATS).build()).get(); - int value_before = getValueFromInfo(data, "total_net_input_bytes"); + long value_before = getValueFromInfo(data, "total_net_input_bytes"); var result = regularClient.configResetStat().get(); assertEquals(OK, result); data = regularClient.info(InfoOptions.builder().section(STATS).build()).get(); - int value_after = getValueFromInfo(data, "total_net_input_bytes"); + long value_after = getValueFromInfo(data, "total_net_input_bytes"); assertTrue(value_after < value_before); } diff --git a/node/npm/glide/index.ts b/node/npm/glide/index.ts index 992bc06190..a78c0bdd0c 100644 --- a/node/npm/glide/index.ts +++ b/node/npm/glide/index.ts @@ -146,6 +146,8 @@ function initialize() { ExecAbortError, RedisError, ReturnType, + StreamEntries, + ReturnTypeXinfoStream, RequestError, TimeoutError, ConnectionError, @@ -199,6 +201,8 @@ function initialize() { FunctionStatsResponse, SlotIdTypes, SlotKeyTypes, + StreamEntries, + ReturnTypeXinfoStream, RouteByAddress, Routes, SingleNodeRoute, diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index b24b7b43b7..12ac3de8f9 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -40,6 +40,7 @@ import { RangeByIndex, RangeByLex, RangeByScore, + ReturnTypeXinfoStream, ScoreBoundary, ScoreFilter, SearchOrigin, @@ -164,6 +165,7 @@ import { createXGroupCreate, createXGroupDestroy, createXInfoConsumers, + createXInfoStream, createXLen, createXRead, createXTrim, @@ -4076,6 +4078,76 @@ export class BaseClient { return this.createWritePromise(createXGroupDestroy(key, groupName)); } + /** + * Returns information about the stream stored at `key`. + * + * @param key - The key of the stream. + * @param fullOptions - If `true`, returns verbose information with a limit of the first 10 PEL entries. + * If `number` is specified, returns verbose information limiting the returned PEL entries. + * If `0` is specified, returns verbose information with no limit. + * @returns A {@link ReturnTypeXinfoStream} of detailed stream information for the given `key`. See + * the example for a sample response. + * @example + * ```typescript + * const infoResult = await client.xinfoStream("my_stream"); + * console.log(infoResult); + * // Output: { + * // length: 2, + * // 'radix-tree-keys': 1, + * // 'radix-tree-nodes': 2, + * // 'last-generated-id': '1719877599564-1', + * // 'max-deleted-entry-id': '0-0', + * // 'entries-added': 2, + * // 'recorded-first-entry-id': '1719877599564-0', + * // 'first-entry': [ '1719877599564-0', ['some_field", "some_value', ...] ], + * // 'last-entry': [ '1719877599564-0', ['some_field", "some_value', ...] ], + * // groups: 1, + * // } + * ``` + * + * @example + * ```typescript + * const infoResult = await client.xinfoStream("my_stream", true); // default limit of 10 entries + * const infoResult = await client.xinfoStream("my_stream", 15); // limit of 15 entries + * console.log(infoResult); + * // Output: { + * // length: 2, + * // 'radix-tree-keys': 1, + * // 'radix-tree-nodes': 2, + * // 'last-generated-id': '1719877599564-1', + * // 'max-deleted-entry-id': '0-0', + * // 'entries-added': 2, + * // 'recorded-first-entry-id': '1719877599564-0', + * // entries: [ [ '1719877599564-0', ['some_field", "some_value', ...] ] ], + * // groups: [ { + * // name: 'group', + * // 'last-delivered-id': '1719877599564-0', + * // 'entries-read': 1, + * // lag: 1, + * // 'pel-count': 1, + * // pending: [ [ '1719877599564-0', 'consumer', 1722624726802, 1 ] ], + * // consumers: [ { + * // name: 'consumer', + * // 'seen-time': 1722624726802, + * // 'active-time': 1722624726802, + * // 'pel-count': 1, + * // pending: [ [ '1719877599564-0', 'consumer', 1722624726802, 1 ] ], + * // } + * // ] + * // } + * // ] + * // } + * ``` + */ + public async xinfoStream( + key: string, + fullOptions?: boolean | number, + ): Promise { + return this.createWritePromise( + createXInfoStream(key, fullOptions ?? false), + ); + } + private readonly MAP_READ_FROM_STRATEGY: Record< ReadFrom, connection_request.ReadFrom diff --git a/node/src/Commands.ts b/node/src/Commands.ts index 25de2e0cf8..10035334be 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -2352,6 +2352,41 @@ export function createXRead( return createCommand(RequestType.XRead, args); } +/** + * Represents a the return type for XInfo Stream in the response + */ +export type ReturnTypeXinfoStream = { + [key: string]: + | StreamEntries + | Record[]>[]; +}; + +/** + * Represents an array of Stream Entires in the response + */ +export type StreamEntries = string | number | (string | number | string[])[][]; + +/** + * @internal + */ +export function createXInfoStream( + key: string, + options: boolean | number, +): command_request.Command { + const args: string[] = [key]; + + if (options != false) { + args.push("FULL"); + + if (typeof options === "number") { + args.push("COUNT"); + args.push(options.toString()); + } + } + + return createCommand(RequestType.XInfoStream, args); +} + /** * @internal */ diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index bbfc58971e..614577ea0a 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -3,6 +3,7 @@ */ import { + BaseClient, // eslint-disable-line @typescript-eslint/no-unused-vars ReadFrom, // eslint-disable-line @typescript-eslint/no-unused-vars } from "./BaseClient"; @@ -42,6 +43,7 @@ import { RangeByIndex, RangeByLex, RangeByScore, + ReturnTypeXinfoStream, // eslint-disable-line @typescript-eslint/no-unused-vars ScoreBoundary, ScoreFilter, SearchOrigin, @@ -195,6 +197,7 @@ import { createXClaim, createXDel, createXInfoConsumers, + createXInfoStream, createXLen, createXRead, createXTrim, @@ -2239,6 +2242,21 @@ export class BaseTransaction> { return this.addAndReturn(createXTrim(key, options)); } + /** + * Returns information about the stream stored at `key`. + * + * @param key - The key of the stream. + * @param fullOptions - If `true`, returns verbose information with a limit of the first 10 PEL entries. + * If `number` is specified, returns verbose information limiting the returned PEL entries. + * If `0` is specified, returns verbose information with no limit. + * + * Command Response - A {@link ReturnTypeXinfoStream} of detailed stream information for the given `key`. + * See example of {@link BaseClient.xinfoStream} for more details. + */ + public xinfoStream(key: string, fullOptions?: boolean | number): T { + return this.addAndReturn(createXInfoStream(key, fullOptions ?? false)); + } + /** Returns the server time. * See https://valkey.io/commands/time/ for details. * diff --git a/node/tests/GlideClient.test.ts b/node/tests/GlideClient.test.ts index eb4f0d793d..d74776ced0 100644 --- a/node/tests/GlideClient.test.ts +++ b/node/tests/GlideClient.test.ts @@ -1053,6 +1053,61 @@ describe("GlideClient", () => { TIMEOUT, ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "xinfo stream transaction test_%p", + async (protocol) => { + const client = await GlideClient.createClient( + getClientConfigurationOption(cluster.getAddresses(), protocol), + ); + + const key = uuidv4(); + + const transaction = new Transaction(); + transaction.xadd(key, [["field1", "value1"]], { id: "0-1" }); + transaction.xinfoStream(key); + transaction.xinfoStream(key, true); + const result = await client.exec(transaction); + expect(result).not.toBeNull(); + + const versionLessThan7 = + cluster.checkIfServerVersionLessThan("7.0.0"); + + const expectedXinfoStreamResult = { + length: 1, + "radix-tree-keys": 1, + "radix-tree-nodes": 2, + "last-generated-id": "0-1", + groups: 0, + "first-entry": ["0-1", ["field1", "value1"]], + "last-entry": ["0-1", ["field1", "value1"]], + "max-deleted-entry-id": versionLessThan7 ? undefined : "0-0", + "entries-added": versionLessThan7 ? undefined : 1, + "recorded-first-entry-id": versionLessThan7 ? undefined : "0-1", + }; + + const expectedXinfoStreamFullResult = { + length: 1, + "radix-tree-keys": 1, + "radix-tree-nodes": 2, + "last-generated-id": "0-1", + entries: [["0-1", ["field1", "value1"]]], + groups: [], + "max-deleted-entry-id": versionLessThan7 ? undefined : "0-0", + "entries-added": versionLessThan7 ? undefined : 1, + "recorded-first-entry-id": versionLessThan7 ? undefined : "0-1", + }; + + if (result != null) { + expect(result[0]).toEqual("0-1"); // xadd + expect(result[1]).toEqual(expectedXinfoStreamResult); + expect(result[2]).toEqual(expectedXinfoStreamFullResult); + } + + client.close(); + }, + TIMEOUT, + ); + runBaseTests({ init: async (protocol, clientName?) => { const options = getClientConfigurationOption( diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts index 766ec0ade2..485d3b329a 100644 --- a/node/tests/SharedTests.ts +++ b/node/tests/SharedTests.ts @@ -4072,13 +4072,7 @@ export function runBaseTests(config: { expect(await client.type(key)).toEqual("hash"); expect(await client.del([key])).toEqual(1); - await client.customCommand([ - "XADD", - key, - "*", - "field", - "value", - ]); + await client.xadd(key, [["field", "value"]]); expect(await client.type(key)).toEqual("stream"); expect(await client.del([key])).toEqual(1); expect(await client.type(key)).toEqual("none"); @@ -4931,6 +4925,224 @@ export function runBaseTests(config: { config.timeout, ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + `xinfo stream test_%p`, + async (protocol) => { + await runTest(async (client: BaseClient) => { + const key = uuidv4(); + const groupName = `group-${uuidv4()}`; + const consumerName = `consumer-${uuidv4()}`; + const streamId0_0 = "0-0"; + const streamId1_0 = "1-0"; + const streamId1_1 = "1-1"; + + // Setup: add stream entry, create consumer group and consumer, read from stream with consumer + expect( + await client.xadd( + key, + [ + ["a", "b"], + ["c", "d"], + ], + { id: streamId1_0 }, + ), + ).toEqual(streamId1_0); + + // TODO: uncomment when XGROUP CREATE is implemented + // expect(await client.xgroupCreate(key, groupName, streamId0_0)).toEqual("Ok"); + expect( + await client.customCommand([ + "XGROUP", + "CREATE", + key, + groupName, + streamId0_0, + ]), + ).toEqual("OK"); + + // TODO: uncomment when XREADGROUP is implemented + // const xreadgroupResult = await client.xreadgroup([[key, ">"]], groupName, consumerName); + await client.customCommand([ + "XREADGROUP", + "GROUP", + groupName, + consumerName, + "STREAMS", + key, + ">", + ]); + + // test xinfoStream base (non-full) case: + const result = (await client.xinfoStream(key)) as { + length: number; + "radix-tree-keys": number; + "radix-tree-nodes": number; + "last-generated-id": string; + "max-deleted-entry-id": string; + "entries-added": number; + "recorded-first-entry-id": string; + "first-entry": (string | number | string[])[]; + "last-entry": (string | number | string[])[]; + groups: number; + }; + console.log(result); + + // verify result: + expect(result.length).toEqual(1); + const expectedFirstEntry = ["1-0", ["a", "b", "c", "d"]]; + expect(result["first-entry"]).toEqual(expectedFirstEntry); + expect(result["last-entry"]).toEqual(expectedFirstEntry); + expect(result.groups).toEqual(1); + + // Add one more entry + expect( + await client.xadd(key, [["foo", "bar"]], { + id: streamId1_1, + }), + ).toEqual(streamId1_1); + const fullResult = (await client.xinfoStream(key, 1)) as { + length: number; + "radix-tree-keys": number; + "radix-tree-nodes": number; + "last-generated-id": string; + "max-deleted-entry-id": string; + "entries-added": number; + "recorded-first-entry-id": string; + entries: (string | number | string[])[][]; + groups: [ + { + name: string; + "last-delivered-id": string; + "entries-read": number; + lag: number; + "pel-count": number; + pending: (string | number)[][]; + consumers: [ + { + name: string; + "seen-time": number; + "active-time": number; + "pel-count": number; + pending: (string | number)[][]; + }, + ]; + }, + ]; + }; + + // verify full result like: + // { + // length: 2, + // 'radix-tree-keys': 1, + // 'radix-tree-nodes': 2, + // 'last-generated-id': '1-1', + // 'max-deleted-entry-id': '0-0', + // 'entries-added': 2, + // 'recorded-first-entry-id': '1-0', + // entries: [ [ '1-0', ['a', 'b', ...] ] ], + // groups: [ { + // name: 'group', + // 'last-delivered-id': '1-0', + // 'entries-read': 1, + // lag: 1, + // 'pel-count': 1, + // pending: [ [ '1-0', 'consumer', 1722624726802, 1 ] ], + // consumers: [ { + // name: 'consumer', + // 'seen-time': 1722624726802, + // 'active-time': 1722624726802, + // 'pel-count': 1, + // pending: [ [ '1-0', 'consumer', 1722624726802, 1 ] ], + // } + // ] + // } + // ] + // } + expect(fullResult.length).toEqual(2); + expect(fullResult["recorded-first-entry-id"]).toEqual( + streamId1_0, + ); + + // Only the first entry will be returned since we passed count: 1 + expect(fullResult.entries).toEqual([expectedFirstEntry]); + + // compare groupName, consumerName, and pending messages from the full info result: + const fullResultGroups = fullResult.groups; + expect(fullResultGroups.length).toEqual(1); + expect(fullResultGroups[0]["name"]).toEqual(groupName); + + const pendingResult = fullResultGroups[0]["pending"]; + expect(pendingResult.length).toEqual(1); + expect(pendingResult[0][0]).toEqual(streamId1_0); + expect(pendingResult[0][1]).toEqual(consumerName); + + const consumersResult = fullResultGroups[0]["consumers"]; + expect(consumersResult.length).toEqual(1); + expect(consumersResult[0]["name"]).toEqual(consumerName); + + const consumerPendingResult = fullResultGroups[0]["pending"]; + expect(consumerPendingResult.length).toEqual(1); + expect(consumerPendingResult[0][0]).toEqual(streamId1_0); + expect(consumerPendingResult[0][1]).toEqual(consumerName); + }, protocol); + }, + config.timeout, + ); + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + `xinfo stream edge cases and failures test_%p`, + async (protocol) => { + await runTest(async (client: BaseClient) => { + const key = `{key}-1-${uuidv4()}`; + const stringKey = `{key}-2-${uuidv4()}`; + const nonExistentKey = `{key}-3-${uuidv4()}`; + const streamId1_0 = "1-0"; + + // Setup: create empty stream + expect( + await client.xadd(key, [["field", "value"]], { + id: streamId1_0, + }), + ).toEqual(streamId1_0); + expect(await client.xdel(key, [streamId1_0])).toEqual(1); + + // XINFO STREAM called against empty stream + const result = await client.xinfoStream(key); + expect(result["length"]).toEqual(0); + expect(result["first-entry"]).toEqual(null); + expect(result["last-entry"]).toEqual(null); + + // XINFO STREAM FULL called against empty stream. Negative count values are ignored. + const fullResult = await client.xinfoStream(key, -3); + expect(fullResult["length"]).toEqual(0); + expect(fullResult["entries"]).toEqual([]); + expect(fullResult["groups"]).toEqual([]); + + // Calling XINFO STREAM with a non-existing key raises an error + await expect( + client.xinfoStream(nonExistentKey), + ).rejects.toThrow(); + await expect( + client.xinfoStream(nonExistentKey, true), + ).rejects.toThrow(); + await expect( + client.xinfoStream(nonExistentKey, 2), + ).rejects.toThrow(); + + // Key exists, but it is not a stream + await client.set(stringKey, "boofar"); + await expect(client.xinfoStream(stringKey)).rejects.toThrow(); + await expect( + client.xinfoStream(stringKey, true), + ).rejects.toThrow(); + await expect( + client.xinfoStream(stringKey, 2), + ).rejects.toThrow(); + }, protocol); + }, + config.timeout, + ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( "rename test_%p", async (protocol) => {