Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RESP improvements #493

Merged
merged 25 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e6b43b5
feat(json-pack): 🎸 add utility to encode commands
streamich Dec 10, 2023
bb29891
feat(json-pack): 🎸 separate fast and slow command encoding options
streamich Dec 10, 2023
b61232f
feat: 🎸 implement ReconnectingSocket
streamich Dec 11, 2023
d820d34
feat: 🎸 start RedisSocket implementation
streamich Dec 11, 2023
8b17af4
feat: 🎸 track socket connection time
streamich Dec 11, 2023
5553348
feat: 🎸 add jitter to reconnection time calculation
streamich Dec 11, 2023
24c2e63
feat: 🎸 add RedisClient, buffer decoding calls
streamich Dec 11, 2023
23d9adb
feat: 🎸 buffer requests
streamich Dec 11, 2023
aa27e29
feat: 🎸 add possibility for fire-and-forget requests
streamich Dec 11, 2023
e779db7
feat(json-pack): 🎸 add support for null string and array decoding
streamich Dec 11, 2023
41c6f3c
feat(json-pack): 🎸 decode RESP object key blobs as ASCII strings
streamich Dec 11, 2023
ed49b04
feat: 🎸 start cluster client
streamich Dec 11, 2023
70b7f1a
feat(util): 🎸 add isUtf8() method to validate for UTF8 sequence validity
streamich Dec 12, 2023
7a549af
feat(json-pack): 🎸 add ability to try to decode blobs as UTF8
streamich Dec 12, 2023
c08abc5
fix(util): 🐛 correctly verify UTF8
streamich Dec 12, 2023
1540916
feat(util): 🎸 improve cluster client
streamich Dec 12, 2023
1ea2e3d
feat: 🎸 improve hello call
streamich Dec 12, 2023
b4cf8cb
feat: 🎸 add supporting cluster classes
streamich Dec 12, 2023
8091d1f
feat: 🎸 create cluster router
streamich Dec 12, 2023
6c6a71f
feat: 🎸 improve router refresh operation
streamich Dec 12, 2023
71f86ec
feat: 🎸 improve initial router table building
streamich Dec 12, 2023
5a7ff3d
feat: 🎸 add backoff for initial table builds
streamich Dec 12, 2023
6842d38
feat: 🎸 add methods for client picking
streamich Dec 12, 2023
54688bd
chore: 🤖 remoe redis code
streamich Dec 12, 2023
b098fe8
style: 💄 run Prettier
streamich Dec 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 46 additions & 14 deletions src/json-pack/resp/RespDecoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,21 @@ import {RESP} from './constants';
import {RespAttributes, RespPush} from './extensions';
import type {IReader, IReaderResettable} from '../../util/buffers';
import type {BinaryJsonDecoder, PackValue} from '../types';
import {isUtf8} from '../../util/buffers/utf8/isUtf8';

export class RespDecoder<R extends IReader & IReaderResettable = IReader & IReaderResettable>
implements BinaryJsonDecoder
{
/**
* When set to true, the decoder will attempt to decode RESP Bulk strings
* (which are binary strings, i.e. Uint8Array) as UTF-8 strings. If the
* string is not valid UTF-8, it will be returned as a Uint8Array.
*
* You can toggle this setting at any time, before each call to `decode()`
* or `read()`, or other methods.
*/
public tryUtf8 = false;

public constructor(public reader: R = new Reader() as any) {}

public read(uint8: Uint8Array): PackValue {
Expand All @@ -32,20 +43,20 @@ export class RespDecoder<R extends IReader & IReaderResettable = IReader & IRead
return this.readFloat();
case RESP.STR_SIMPLE:
return this.readStrSimple();
case RESP.STR_VERBATIM:
return this.readStrVerbatim();
case RESP.STR_BULK:
return this.readStrBulk();
case RESP.BOOL:
return this.readBool();
case RESP.NULL:
return reader.skip(2), null;
case RESP.STR_BULK:
return this.readStrBulk();
case RESP.OBJ:
return this.readObj();
case RESP.ARR:
return this.readArr();
case RESP.STR_VERBATIM:
return this.readStrVerbatim();
case RESP.PUSH:
return new RespPush(this.readArr());
return new RespPush(this.readArr() || []);
case RESP.BIG:
return this.readBigint();
case RESP.SET:
Expand Down Expand Up @@ -158,6 +169,29 @@ export class RespDecoder<R extends IReader & IReaderResettable = IReader & IRead
}
}

public readStrBulk(): Uint8Array | string | null {
const reader = this.reader;
if (reader.peak() === RESP.MINUS) {
reader.skip(4); // Skip "-1\r\n".
return null;
}
const length = this.readLength();
let res: Uint8Array | string;
if (this.tryUtf8 && isUtf8(reader.uint8, reader.x, length)) res = reader.utf8(length);
else res = reader.buf(length);
reader.skip(2); // Skip "\r\n".
return res;
}

public readAsciiAsStrBulk(): string {
const reader = this.reader;
reader.skip(1); // Skip "$".
const length = this.readLength();
const buf = reader.ascii(length);
reader.skip(2); // Skip "\r\n".
return buf;
}

public readStrVerbatim(): string | Uint8Array {
const reader = this.reader;
const length = this.readLength();
Expand All @@ -173,14 +207,6 @@ export class RespDecoder<R extends IReader & IReaderResettable = IReader & IRead
return buf;
}

public readStrBulk(): Uint8Array {
const reader = this.reader;
const length = this.readLength();
const buf = reader.buf(length);
reader.skip(2); // Skip "\r\n".
return buf;
}

// ------------------------------------------------------------ Error reading

public readErrSimple(): Error {
Expand All @@ -207,7 +233,13 @@ export class RespDecoder<R extends IReader & IReaderResettable = IReader & IRead

// ------------------------------------------------------------ Array reading

public readArr(): unknown[] {
public readArr(): unknown[] | null {
const reader = this.reader;
const c = reader.peak();
if (c === RESP.MINUS) {
reader.skip(4); // Skip "-1\r\n".
return null;
}
const length = this.readLength();
const arr: unknown[] = [];
for (let i = 0; i < length; i++) arr.push(this.val());
Expand Down
45 changes: 44 additions & 1 deletion src/json-pack/resp/RespEncoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,38 @@ export class RespEncoder<W extends IWriter & IWriterGrowable = IWriter & IWriter
}
}

public encodeCmd(args: unknown[]): Uint8Array {
this.writeCmd(args);
return this.writer.flush();
}

public writeCmd(args: unknown[]): void {
const length = args.length;
this.writeArrHdr(length);
for (let i = 0; i < length; i++) this.writeArg(args[i]);
}

public writeArg(arg: unknown): void {
if (arg instanceof Uint8Array) return this.writeBin(arg);
else this.writeBulkStrAscii(arg + '');
}

public encodeCmdUtf8(args: unknown[]): Uint8Array {
this.writeCmdUtf8(args);
return this.writer.flush();
}

public writeCmdUtf8(args: unknown[]): void {
const length = args.length;
this.writeArrHdr(length);
for (let i = 0; i < length; i++) this.writeArgUtf8(args[i]);
}

public writeArgUtf8(arg: unknown): void {
if (arg instanceof Uint8Array) return this.writeBin(arg);
else this.writeBulkStr(arg + '');
}

public writeNull(): void {
this.writer.u8u16(
RESP.NULL, // _
Expand Down Expand Up @@ -179,6 +211,17 @@ export class RespEncoder<W extends IWriter & IWriterGrowable = IWriter & IWriter
}

public writeBulkStr(str: string): void {
const writer = this.writer;
const size = utf8Size(str);
writer.u8(RESP.STR_BULK); // $
writer.ascii(size + '');
writer.u16(RESP.RN); // \r\n
writer.ensureCapacity(size);
writer.utf8(str);
writer.u16(RESP.RN); // \r\n
}

public writeBulkStrAscii(str: string): void {
const writer = this.writer;
writer.u8(RESP.STR_BULK); // $
writer.ascii(str.length + '');
Expand All @@ -190,7 +233,7 @@ export class RespEncoder<W extends IWriter & IWriterGrowable = IWriter & IWriter
public writeAsciiStr(str: string): void {
const isSimple = !REG_RN.test(str);
if (isSimple) this.writeSimpleStr(str);
else this.writeBulkStr(str);
else this.writeBulkStrAscii(str);
}

public writeVerbatimStr(encoding: string, str: string): void {
Expand Down
12 changes: 12 additions & 0 deletions src/json-pack/resp/RespStreamingDecoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ export class RespStreamingDecoder {
protected readonly reader = new StreamingReader();
protected readonly decoder = new RespDecoder(this.reader);

/**
* When set to true, the decoder will attempt to decode RESP Bulk strings
* (which are binary strings, i.e. Uint8Array) as UTF-8 strings. If the
* string is not valid UTF-8, it will be returned as a Uint8Array.
*/
public get tryUtf8(): boolean {
return this.decoder.tryUtf8;
}
public set tryUtf8(value: boolean) {
this.decoder.tryUtf8 = value;
}

/**
* Add a chunk of data to be decoded.
* @param uint8 `Uint8Array` chunk of data to be decoded.
Expand Down
31 changes: 31 additions & 0 deletions src/json-pack/resp/__tests__/RespDecoder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {RespDecoder} from '../RespDecoder';
import {bufferToUint8Array} from '../../../util/buffers/bufferToUint8Array';
import {RespAttributes, RespPush} from '../extensions';
import {Writer} from '../../../util/buffers/Writer';
import {Uint} from '@automerge/automerge';

const decode = (encoded: string | Uint8Array): unknown => {
const decoder = new RespDecoder();
Expand Down Expand Up @@ -171,8 +172,38 @@ const maps: [string, Record<string, unknown>][] = [

describe('objects', () => {
for (const [name, value] of maps) test(name, () => assertCodec(value));

describe('when .tryUtf8 = true', () => {
test('parses bulk strings as UTF8 strings', () => {
const encoded = '%1\r\n$3\r\nfoo\r\n$3\r\nbar\r\n';
const decoder = new RespDecoder();
decoder.tryUtf8 = true;
const decoded = decoder.read(Buffer.from(encoded));
expect(decoded).toStrictEqual({foo: 'bar'});
});

test('parses invalid UTF8 as Uint8Array', () => {
const encoded = encoder.encode({foo: new Uint8Array([0xc3, 0x28])});
const decoder = new RespDecoder();
decoder.tryUtf8 = true;
const decoded = decoder.read(encoded);
expect(decoded).toStrictEqual({foo: new Uint8Array([0xc3, 0x28])});
});
});
});

describe('attributes', () => {
for (const [name, value] of maps) test(name, () => assertCodec(new RespAttributes(value)));
});

describe('nulls', () => {
test('can decode string null', () => {
const decoded = decode('$-1\r\n');
expect(decoded).toBe(null);
});

test('can decode array null', () => {
const decoded = decode('*-1\r\n');
expect(decoded).toBe(null);
});
});
33 changes: 33 additions & 0 deletions src/json-pack/resp/__tests__/RespEncoder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,36 @@ describe('streaming data', () => {
});
});
});

describe('commands', () => {
describe('.writeCmd()', () => {
test('can encode a simple command', () => {
const encoder = new RespEncoder();
encoder.writeCmd(['SET', 'foo', 'bar']);
const encoded = encoder.writer.flush();
expect(toStr(encoded)).toBe('*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n');
});

test('casts numbers to strings', () => {
const encoder = new RespEncoder();
encoder.writeCmd(['SET', 'foo', 123]);
const encoded = encoder.writer.flush();
expect(toStr(encoded)).toBe('*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n');
});

test('can encode Uint8Array', () => {
const encoder = new RespEncoder();
const encoded = encoder.encodeCmd([Buffer.from('SET'), 'foo', 123]);
expect(toStr(encoded)).toBe('*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n123\r\n');
});
});

describe('.can encode emojis()', () => {
test('can encode a simple command', () => {
const encoder = new RespEncoder();
encoder.writeCmdUtf8(['SET', 'foo 👍', 'bar']);
const encoded = encoder.writer.flush();
expect(toStr(encoded)).toBe('*3\r\n$3\r\nSET\r\n$8\r\nfoo 👍\r\n$3\r\nbar\r\n');
});
});
});
54 changes: 54 additions & 0 deletions src/util/buffers/utf8/__tests__/isUtf8.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import {isUtf8} from '../isUtf8';

describe('returns true for valid UTF8', () => {
const strings = [
'',
'hello',
'hello world',
'emoji: 🤔',
'russian: Привет',
'chinese: 你好',
'japanese: こんにちは',
'korean: 안녕하세요',
'arabic: مرحبا',
'hebrew: שלום',
'greek: γεια σας',
'bulgarian: Здравейте',
'hindi: नमस्ते',
'thai: สวัสดี',
'special chars: !@#$%^&*()_+{}|:"<>?`-=[]\\;\',./',
];
for (const str of strings) {
test(str, () => {
const buf = Buffer.from(str);
expect(isUtf8(buf, 0, buf.length)).toBe(true);
});
}
});

describe('returns false for non-UTF8 sequences', () => {
const strings: [name: string, Uint8Array][] = [
['two octets', Buffer.from([0xc3, 0x28])],
['three octets', Buffer.from([0xe2, 0x82, 0x28])],
['four octets', Buffer.from([0xf0, 0x90, 0x82, 0x28])],
['five octets', Buffer.from([0xf8, 0x88, 0x82, 0x82, 0x28])],
['six octets', Buffer.from([0xfc, 0x84, 0x82, 0x82, 0x82, 0x28])],
];
for (const [name, str] of strings) {
test(name, () => {
expect(isUtf8(str, 0, str.length)).toBe(false);
});
}
});

describe('returns true for valid non-UTF8 sequences in the middle of buffer', () => {
const strings: [name: string, str: Uint8Array, from: number, length: number][] = [
['mid char valid', Buffer.from([0xc3, 0x28, 64, 0xc3, 0x28]), 2, 1],
];
for (const [name, str, from, length] of strings) {
test(name, () => {
expect(isUtf8(str, from, length)).toBe(true);
expect(isUtf8(str, 0, from + length)).toBe(false);
});
}
});
45 changes: 45 additions & 0 deletions src/util/buffers/utf8/isUtf8.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Validates that the given data is valid UTF-8 text.
* @param buf Data to check.
* @returns True if the data is valid UTF-8.
*/
export const isUtf8 = (buf: Uint8Array, from: number, length: number): boolean => {
const to = from + length;
while (from < to) {
const c = buf[from];
if (c <= 0x7f) {
from++;
continue;
}
if (c >= 0xc2 && c <= 0xdf) {
if (buf[from + 1] >> 6 === 2) {
from += 2;
continue;
} else return false;
}
const c1 = buf[from + 1];
if (
((c === 0xe0 && c1 >= 0xa0 && c1 <= 0xbf) || (c === 0xed && c1 >= 0x80 && c1 <= 0x9f)) &&
buf[from + 2] >> 6 === 2
) {
from += 3;
continue;
}
if (((c >= 0xe1 && c <= 0xec) || (c >= 0xee && c <= 0xef)) && c1 >> 6 === 2 && buf[from + 2] >> 6 === 2) {
from += 3;
continue;
}
if (
((c === 0xf0 && c1 >= 0x90 && c1 <= 0xbf) ||
(c >= 0xf1 && c <= 0xf3 && c1 >> 6 === 2) ||
(c === 0xf4 && c1 >= 0x80 && c1 <= 0x8f)) &&
buf[from + 2] >> 6 === 2 &&
buf[from + 3] >> 6 === 2
) {
from += 4;
continue;
}
return false;
}
return true;
};
8 changes: 8 additions & 0 deletions src/util/trees/avl/AvlMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ export class AvlMap<K, V> implements Printable {
this.root = remove(this.root, node as IAvlTreeNode<K, V>);
}

public clear(): void {
this.root = undefined;
}

public has(k: K): boolean {
return !!this.find(k);
}
Expand All @@ -81,6 +85,10 @@ export class AvlMap<K, V> implements Printable {
return size;
}

public isEmpty(): boolean {
return !this.root;
}

public getOrNextLower(k: K): AvlNode<K, V> | undefined {
return (findOrNextLower(this.root, k, this.comparator) as AvlNode<K, V>) || undefined;
}
Expand Down