Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client transport .hardDisconnect #273

Merged
merged 5 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@replit/river",
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
"version": "0.202.0",
"version": "0.203.0",
"type": "module",
"exports": {
".": {
Expand Down
11 changes: 11 additions & 0 deletions transport/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,17 @@ export abstract class ClientTransport<
this.updateSession(backingOffSession);
}

/**
* Manually kills all sessions to the server (including all pending state).
* This is useful for when you want to close all connections to a server
* and don't want to wait for the grace period to elapse.
*/
hardDisconnect() {
for (const session of this.sessions.values()) {
this.deleteSession(session);
}
}

protected onBackoffFinished(session: SessionBackingOff) {
const connPromise = tracer.startActiveSpan('connect', async (span) => {
try {
Expand Down
8 changes: 8 additions & 0 deletions transport/sessionStateMachine/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ abstract class StateMachineState {
// by consumers, the proxy will call this when .close is closed
abstract _handleClose(): void;

/**
* Cleanup this state machine state and mark it as consumed.
* After calling close, it is an error to access any properties on the state.
* You should never need to call this as a consumer.
*
* If you're looking to close the session from the client,
* use `.hardDisconnect` on the client transport.
*/
close(): void {
this._handleClose();
}
Expand Down
37 changes: 37 additions & 0 deletions transport/transport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,43 @@ describe.each(testMatrix())(
});
});

test('client transport calling .hardDisconnect() immediately kills the session and updates bookkeeping', async () => {
const clientTransport = testHelpers.getClientTransport('client');
const serverTransport = testHelpers.getServerTransport();
clientTransport.connect(serverTransport.clientId);

addPostTestCleanup(async () => {
await cleanupTransports([clientTransport, serverTransport]);
});

await waitFor(() => {
expect(numberOfConnections(clientTransport)).toBe(1);
expect(numberOfConnections(serverTransport)).toBe(1);
});

const oldClientSessionId = serverTransport.sessions.get('client')?.id;
const oldServerSessionId = clientTransport.sessions.get('SERVER')?.id;
expect(oldClientSessionId).not.toBeUndefined();
expect(oldServerSessionId).not.toBeUndefined();

clientTransport.hardDisconnect();

expect(numberOfConnections(clientTransport)).toBe(0);
expect(clientTransport.sessions.size).toBe(0);

await advanceFakeTimersByDisconnectGrace();
await advanceFakeTimersBySessionGrace();
await waitFor(() => {
expect(numberOfConnections(serverTransport)).toBe(0);
expect(serverTransport.sessions.size).toBe(0);
});

await testFinishesCleanly({
clientTransports: [clientTransport],
serverTransport,
});
});

// make a custom auth thing that rejects all connections
// session grace should elapse at some point despite retry loop
test('session grace elapses during long reconnect loop', async () => {
Expand Down
Loading