Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RESP improvements #493

Merged
merged 25 commits into from
Dec 12, 2023
Merged
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e6b43b5
feat(json-pack): 🎸 add utility to encode commands
streamich Dec 10, 2023
bb29891
feat(json-pack): 🎸 separate fast and slow command encoding options
streamich Dec 10, 2023
b61232f
feat: 🎸 implement ReconnectingSocket
streamich Dec 11, 2023
d820d34
feat: 🎸 start RedisSocket implementation
streamich Dec 11, 2023
8b17af4
feat: 🎸 track socket connection time
streamich Dec 11, 2023
5553348
feat: 🎸 add jitter to reconnection time calculation
streamich Dec 11, 2023
24c2e63
feat: 🎸 add RedisClient, buffer decoding calls
streamich Dec 11, 2023
23d9adb
feat: 🎸 buffer requests
streamich Dec 11, 2023
aa27e29
feat: 🎸 add possibility for fire-and-forget requests
streamich Dec 11, 2023
e779db7
feat(json-pack): 🎸 add support for null string and array decoding
streamich Dec 11, 2023
41c6f3c
feat(json-pack): 🎸 decode RESP object key blobs as ASCII strings
streamich Dec 11, 2023
ed49b04
feat: 🎸 start cluster client
streamich Dec 11, 2023
70b7f1a
feat(util): 🎸 add isUtf8() method to validate for UTF8 sequence validity
streamich Dec 12, 2023
7a549af
feat(json-pack): 🎸 add ability to try to decode blobs as UTF8
streamich Dec 12, 2023
c08abc5
fix(util): 🐛 correctly verify UTF8
streamich Dec 12, 2023
1540916
feat(util): 🎸 improve cluster client
streamich Dec 12, 2023
1ea2e3d
feat: 🎸 improve hello call
streamich Dec 12, 2023
b4cf8cb
feat: 🎸 add supporting cluster classes
streamich Dec 12, 2023
8091d1f
feat: 🎸 create cluster router
streamich Dec 12, 2023
6c6a71f
feat: 🎸 improve router refresh operation
streamich Dec 12, 2023
71f86ec
feat: 🎸 improve initial router table building
streamich Dec 12, 2023
5a7ff3d
feat: 🎸 add backoff for initial table builds
streamich Dec 12, 2023
6842d38
feat: 🎸 add methods for client picking
streamich Dec 12, 2023
54688bd
chore: 🤖 remoe redis code
streamich Dec 12, 2023
b098fe8
style: 💄 run Prettier
streamich Dec 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 35 additions & 23 deletions src/redis-client/RedisClient.ts
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ export class RedisClient {
protected readonly encoder: RespEncoder;
protected readonly decoder: RespStreamingDecoder;
protected readonly requests: unknown[][] = [];
protected readonly responses: Defer<unknown>[] = [];
protected readonly responses: Array<null | Defer<unknown>> = [];
protected encodingTimer?: NodeJS.Immediate = undefined;
protected decodingTimer?: NodeJS.Immediate = undefined;

@@ -26,19 +26,19 @@ export class RedisClient {
const decoder = this.decoder = opts.decoder;
socket.onData.listen((data) => {
decoder.push(data);
this.scheduleDecoding();
this.scheduleRead();
});
socket.onError.listen((err: Error) => {
console.log('err', err);
});
}

protected scheduleEncoding() {
protected scheduleWrite() {
if (this.encodingTimer) return;
this.encodingTimer = setImmediate(this.handleEncoding);
this.encodingTimer = setImmediate(this.handleWrite);
}

private readonly handleEncoding = () => {
private readonly handleWrite = () => {
try {
this.encodingTimer = undefined;
const requests = this.requests;
@@ -58,28 +58,34 @@ export class RedisClient {
}
};

protected scheduleDecoding() {
protected scheduleRead() {
if (this.decodingTimer) return;
this.decodingTimer = setImmediate(this.handleDecoding);
this.decodingTimer = setImmediate(this.handleRead);
}

private readonly handleDecoding = () => {
this.decodingTimer = undefined;
const decoder = this.decoder;
let msg;
let i = 0;
const responses = this.responses;
while ((msg = decoder.read()) !== undefined) {
const defer = responses[i++];
if (!defer) {
this.onProtocolError.reject(new Error('UNEXPECTED_RESPONSE'));
// TODO: reconnect socket ...
// TODO: clear client state ...
return;
private readonly handleRead = () => {
try {
this.decodingTimer = undefined;
const decoder = this.decoder;
const responses = this.responses;
const length = responses.length;
let i = 0;
for (; i < length; i++) {
const defer = responses[i];
if (defer instanceof Defer) {
const msg = decoder.read();
if (msg === undefined) break;
if (msg instanceof Error) defer.reject(msg); else defer.resolve(msg);
} else {
// const length = decoder.skip();
// if (!length) break;
}
}
if (msg instanceof Error) defer.reject(msg); else defer.resolve(msg);
if (i > 0) responses.splice(0, i);
} catch (error) {
// this.onProtocolError.reject(error);
// TODO: Re-establish socket ...
}
if (i > 0) responses.splice(0, i);
};

public start() {
@@ -94,10 +100,16 @@ export class RedisClient {
const defer = new Defer<unknown>();
this.requests.push(args);
this.responses.push(defer);
this.scheduleEncoding();
this.scheduleWrite();
return defer.promise;
}

public cmdSync(args: unknown[]): void {
this.requests.push(args);
this.responses.push(null);
this.scheduleWrite();
}

public async cmdUtf8(args: unknown[]): Promise<unknown> {
throw new Error('TODO');
}