Skip to content

Commit

Permalink
Merge branch 'main' into node/integ_cyip10_hscan
Browse files Browse the repository at this point in the history
  • Loading branch information
cyip10 authored Aug 8, 2024
2 parents 8ef58d5 + 0425f56 commit 7683c0d
Show file tree
Hide file tree
Showing 10 changed files with 411 additions and 14 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@
* Node: Added PFMERGE command ([#2053](https://github.com/valkey-io/valkey-glide/pull/2053))
* Node: Added WATCH and UNWATCH commands ([#2076](https://github.com/valkey-io/valkey-glide/pull/2076))
* Node: Added ZLEXCOUNT command ([#2022](https://github.com/valkey-io/valkey-glide/pull/2022))
* Node: Added ZREMRANGEBYLEX command ([#2025]((https://github.com/valkey-io/valkey-glide/pull/2025))
* Node: Added ZREMRANGEBYLEX command ([#2025](https://github.com/valkey-io/valkey-glide/pull/2025))
* Node: Added SRANDMEMBER command ([#2067](https://github.com/valkey-io/valkey-glide/pull/2067))
* Node: Added XINFO STREAM command ([#2083](https://github.com/valkey-io/valkey-glide/pull/2083))
* Node: Added ZSCAN command ([#2061](https://github.com/valkey-io/valkey-glide/pull/2061))
* Node: Added SETRANGE command ([#2066](https://github.com/valkey-io/valkey-glide/pull/2066))
* Node: Added APPEND command ([#2095](https://github.com/valkey-io/valkey-glide/pull/2095))
Expand Down
4 changes: 2 additions & 2 deletions java/integTest/src/test/java/glide/TestUtilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
@UtilityClass
public class TestUtilities {
/** Extract integer parameter value from INFO command output */
public static int getValueFromInfo(String data, String value) {
public static long getValueFromInfo(String data, String value) {
for (var line : data.split("\r\n")) {
if (line.contains(value)) {
return Integer.parseInt(line.split(":")[1]);
return Long.parseLong(line.split(":")[1]);
}
}
fail();
Expand Down
4 changes: 2 additions & 2 deletions java/integTest/src/test/java/glide/cluster/CommandTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -416,14 +416,14 @@ public void clientGetName_with_multi_node_route() {
public void config_reset_stat() {
var data = clusterClient.info(InfoOptions.builder().section(STATS).build()).get();
String firstNodeInfo = getFirstEntryFromMultiValue(data);
int value_before = getValueFromInfo(firstNodeInfo, "total_net_input_bytes");
long value_before = getValueFromInfo(firstNodeInfo, "total_net_input_bytes");

var result = clusterClient.configResetStat().get();
assertEquals(OK, result);

data = clusterClient.info(InfoOptions.builder().section(STATS).build()).get();
firstNodeInfo = getFirstEntryFromMultiValue(data);
int value_after = getValueFromInfo(firstNodeInfo, "total_net_input_bytes");
long value_after = getValueFromInfo(firstNodeInfo, "total_net_input_bytes");
assertTrue(value_after < value_before);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,13 @@ public void clientGetName() {
@SneakyThrows
public void config_reset_stat() {
String data = regularClient.info(InfoOptions.builder().section(STATS).build()).get();
int value_before = getValueFromInfo(data, "total_net_input_bytes");
long value_before = getValueFromInfo(data, "total_net_input_bytes");

var result = regularClient.configResetStat().get();
assertEquals(OK, result);

data = regularClient.info(InfoOptions.builder().section(STATS).build()).get();
int value_after = getValueFromInfo(data, "total_net_input_bytes");
long value_after = getValueFromInfo(data, "total_net_input_bytes");
assertTrue(value_after < value_before);
}

Expand Down
4 changes: 4 additions & 0 deletions node/npm/glide/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ function initialize() {
ExecAbortError,
RedisError,
ReturnType,
StreamEntries,
ReturnTypeXinfoStream,
RequestError,
TimeoutError,
ConnectionError,
Expand Down Expand Up @@ -199,6 +201,8 @@ function initialize() {
FunctionStatsResponse,
SlotIdTypes,
SlotKeyTypes,
StreamEntries,
ReturnTypeXinfoStream,
RouteByAddress,
Routes,
SingleNodeRoute,
Expand Down
72 changes: 72 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import {
RangeByIndex,
RangeByLex,
RangeByScore,
ReturnTypeXinfoStream,
ScoreBoundary,
ScoreFilter,
SearchOrigin,
Expand Down Expand Up @@ -164,6 +165,7 @@ import {
createXGroupCreate,
createXGroupDestroy,
createXInfoConsumers,
createXInfoStream,
createXLen,
createXRead,
createXTrim,
Expand Down Expand Up @@ -4076,6 +4078,76 @@ export class BaseClient {
return this.createWritePromise(createXGroupDestroy(key, groupName));
}

/**
* Returns information about the stream stored at `key`.
*
* @param key - The key of the stream.
* @param fullOptions - If `true`, returns verbose information with a limit of the first 10 PEL entries.
* If `number` is specified, returns verbose information limiting the returned PEL entries.
* If `0` is specified, returns verbose information with no limit.
* @returns A {@link ReturnTypeXinfoStream} of detailed stream information for the given `key`. See
* the example for a sample response.
* @example
* ```typescript
* const infoResult = await client.xinfoStream("my_stream");
* console.log(infoResult);
* // Output: {
* // length: 2,
* // 'radix-tree-keys': 1,
* // 'radix-tree-nodes': 2,
* // 'last-generated-id': '1719877599564-1',
* // 'max-deleted-entry-id': '0-0',
* // 'entries-added': 2,
* // 'recorded-first-entry-id': '1719877599564-0',
* // 'first-entry': [ '1719877599564-0', ['some_field", "some_value', ...] ],
* // 'last-entry': [ '1719877599564-0', ['some_field", "some_value', ...] ],
* // groups: 1,
* // }
* ```
*
* @example
* ```typescript
* const infoResult = await client.xinfoStream("my_stream", true); // default limit of 10 entries
* const infoResult = await client.xinfoStream("my_stream", 15); // limit of 15 entries
* console.log(infoResult);
* // Output: {
* // length: 2,
* // 'radix-tree-keys': 1,
* // 'radix-tree-nodes': 2,
* // 'last-generated-id': '1719877599564-1',
* // 'max-deleted-entry-id': '0-0',
* // 'entries-added': 2,
* // 'recorded-first-entry-id': '1719877599564-0',
* // entries: [ [ '1719877599564-0', ['some_field", "some_value', ...] ] ],
* // groups: [ {
* // name: 'group',
* // 'last-delivered-id': '1719877599564-0',
* // 'entries-read': 1,
* // lag: 1,
* // 'pel-count': 1,
* // pending: [ [ '1719877599564-0', 'consumer', 1722624726802, 1 ] ],
* // consumers: [ {
* // name: 'consumer',
* // 'seen-time': 1722624726802,
* // 'active-time': 1722624726802,
* // 'pel-count': 1,
* // pending: [ [ '1719877599564-0', 'consumer', 1722624726802, 1 ] ],
* // }
* // ]
* // }
* // ]
* // }
* ```
*/
public async xinfoStream(
key: string,
fullOptions?: boolean | number,
): Promise<ReturnTypeXinfoStream> {
return this.createWritePromise(
createXInfoStream(key, fullOptions ?? false),
);
}

private readonly MAP_READ_FROM_STRATEGY: Record<
ReadFrom,
connection_request.ReadFrom
Expand Down
35 changes: 35 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2352,6 +2352,41 @@ export function createXRead(
return createCommand(RequestType.XRead, args);
}

/**
* Represents a the return type for XInfo Stream in the response
*/
export type ReturnTypeXinfoStream = {
[key: string]:
| StreamEntries
| Record<string, StreamEntries | Record<string, StreamEntries>[]>[];
};

/**
* Represents an array of Stream Entires in the response
*/
export type StreamEntries = string | number | (string | number | string[])[][];

/**
* @internal
*/
export function createXInfoStream(
key: string,
options: boolean | number,
): command_request.Command {
const args: string[] = [key];

if (options != false) {
args.push("FULL");

if (typeof options === "number") {
args.push("COUNT");
args.push(options.toString());
}
}

return createCommand(RequestType.XInfoStream, args);
}

/**
* @internal
*/
Expand Down
18 changes: 18 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/

import {
BaseClient, // eslint-disable-line @typescript-eslint/no-unused-vars
ReadFrom, // eslint-disable-line @typescript-eslint/no-unused-vars
} from "./BaseClient";

Expand Down Expand Up @@ -42,6 +43,7 @@ import {
RangeByIndex,
RangeByLex,
RangeByScore,
ReturnTypeXinfoStream, // eslint-disable-line @typescript-eslint/no-unused-vars
ScoreBoundary,
ScoreFilter,
SearchOrigin,
Expand Down Expand Up @@ -195,6 +197,7 @@ import {
createXClaim,
createXDel,
createXInfoConsumers,
createXInfoStream,
createXLen,
createXRead,
createXTrim,
Expand Down Expand Up @@ -2239,6 +2242,21 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createXTrim(key, options));
}

/**
* Returns information about the stream stored at `key`.
*
* @param key - The key of the stream.
* @param fullOptions - If `true`, returns verbose information with a limit of the first 10 PEL entries.
* If `number` is specified, returns verbose information limiting the returned PEL entries.
* If `0` is specified, returns verbose information with no limit.
*
* Command Response - A {@link ReturnTypeXinfoStream} of detailed stream information for the given `key`.
* See example of {@link BaseClient.xinfoStream} for more details.
*/
public xinfoStream(key: string, fullOptions?: boolean | number): T {
return this.addAndReturn(createXInfoStream(key, fullOptions ?? false));
}

/** Returns the server time.
* See https://valkey.io/commands/time/ for details.
*
Expand Down
55 changes: 55 additions & 0 deletions node/tests/GlideClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,61 @@ describe("GlideClient", () => {
TIMEOUT,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"xinfo stream transaction test_%p",
async (protocol) => {
const client = await GlideClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);

const key = uuidv4();

const transaction = new Transaction();
transaction.xadd(key, [["field1", "value1"]], { id: "0-1" });
transaction.xinfoStream(key);
transaction.xinfoStream(key, true);
const result = await client.exec(transaction);
expect(result).not.toBeNull();

const versionLessThan7 =
cluster.checkIfServerVersionLessThan("7.0.0");

const expectedXinfoStreamResult = {
length: 1,
"radix-tree-keys": 1,
"radix-tree-nodes": 2,
"last-generated-id": "0-1",
groups: 0,
"first-entry": ["0-1", ["field1", "value1"]],
"last-entry": ["0-1", ["field1", "value1"]],
"max-deleted-entry-id": versionLessThan7 ? undefined : "0-0",
"entries-added": versionLessThan7 ? undefined : 1,
"recorded-first-entry-id": versionLessThan7 ? undefined : "0-1",
};

const expectedXinfoStreamFullResult = {
length: 1,
"radix-tree-keys": 1,
"radix-tree-nodes": 2,
"last-generated-id": "0-1",
entries: [["0-1", ["field1", "value1"]]],
groups: [],
"max-deleted-entry-id": versionLessThan7 ? undefined : "0-0",
"entries-added": versionLessThan7 ? undefined : 1,
"recorded-first-entry-id": versionLessThan7 ? undefined : "0-1",
};

if (result != null) {
expect(result[0]).toEqual("0-1"); // xadd
expect(result[1]).toEqual(expectedXinfoStreamResult);
expect(result[2]).toEqual(expectedXinfoStreamFullResult);
}

client.close();
},
TIMEOUT,
);

runBaseTests<Context>({
init: async (protocol, clientName?) => {
const options = getClientConfigurationOption(
Expand Down
Loading

0 comments on commit 7683c0d

Please sign in to comment.