Skip to content

Commit

Permalink
cleanup checks
Browse files Browse the repository at this point in the history
  • Loading branch information
jackyzha0 committed Dec 8, 2023
1 parent cf4d598 commit faa9089
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 40 deletions.
75 changes: 60 additions & 15 deletions __tests__/e2e.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
import { afterAll, assert, describe, expect, test } from 'vitest';
import {
afterAll,
afterEach,
assert,
beforeEach,
describe,
expect,
test,
} from 'vitest';
import {
createLocalWebSocketClient,
createWebSocketServer,
Expand All @@ -22,35 +30,53 @@ import { UNCAUGHT_ERROR } from '../router/result';
import { codecs } from '../codec/codec.test';
import { WebSocketClientTransport } from '../transport/impls/ws/client';
import { WebSocketServerTransport } from '../transport/impls/ws/server';
import {
ensureServerIsClean,
ensureTransportIsClean,
} from './fixtures/cleanup';

describe.each(codecs)(
'client <-> server integration test ($name codec)',
async ({ codec }) => {
const httpServer = http.createServer();
const port = await onServerReady(httpServer);
const webSocketServer = await createWebSocketServer(httpServer);
const getTransports = () =>
createWsTransports(port, webSocketServer, codec);

let clientTransport: WebSocketClientTransport;
let serverTransport: WebSocketServerTransport;

beforeEach(() => {
[clientTransport, serverTransport] = createWsTransports(
port,
webSocketServer,
codec,
);
});

afterEach(async () => {
await clientTransport.close();
await serverTransport.close();
ensureTransportIsClean(clientTransport);
ensureTransportIsClean(serverTransport);
});

afterAll(() => {
webSocketServer.clients.forEach((socket) => {
socket.close();
});
webSocketServer.close();
httpServer.close();
});

test('rpc', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);
const result = await client.test.add.rpc({ n: 3 });
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 3 });
await server.close();
ensureServerIsClean(server);
});

test('fallible rpc', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: FallibleServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);
Expand All @@ -66,10 +92,11 @@ describe.each(codecs)(
test: 'abc',
},
});
await server.close();
ensureServerIsClean(server);
});

test('rpc with binary (uint8array)', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: BinaryFileServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);
Expand All @@ -79,10 +106,11 @@ describe.each(codecs)(
expect(new TextDecoder().decode(result.payload.contents)).toStrictEqual(
'contents for file test.py',
);
await server.close();
ensureServerIsClean(server);
});

test('stream', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);
Expand All @@ -106,14 +134,16 @@ describe.each(codecs)(
assert(result3.ok);
expect(result3.payload).toStrictEqual({ response: 'end' });

// after the server stream is ended, the client stream should be ended too
const result4 = await output.next();
assert(result4.done);

close();
await server.close();
ensureServerIsClean(server);
});

test('fallible stream', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: FallibleServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);
Expand All @@ -137,6 +167,8 @@ describe.each(codecs)(
message: 'some message',
});
close();
await server.close();
ensureServerIsClean(server);
});

test('subscription', async () => {
Expand Down Expand Up @@ -195,10 +227,18 @@ describe.each(codecs)(

close1();
close2();

await client1Transport.close();
await client2Transport.close();
await serverTransport.close();
ensureTransportIsClean(client1Transport);
ensureTransportIsClean(client2Transport);
ensureTransportIsClean(serverTransport);
await server.close();
ensureServerIsClean(server);
});

test('message order is preserved in the face of disconnects', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: OrderingServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);
Expand All @@ -222,12 +262,13 @@ describe.each(codecs)(

const res = await client.test.getAll.rpc({});
assert(res.ok);
return expect(res.payload.msgs).toStrictEqual(expected);
expect(res.payload.msgs).toStrictEqual(expected);
await server.close();
ensureServerIsClean(server);
});

const CONCURRENCY = 10;
test('concurrent rpcs', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: OrderingServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);
Expand All @@ -242,10 +283,11 @@ describe.each(codecs)(
assert(result.ok);
expect(result.payload).toStrictEqual({ n: i });
}
await server.close();
ensureServerIsClean(server);
});

test('concurrent streams', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);
Expand Down Expand Up @@ -275,6 +317,9 @@ describe.each(codecs)(
const [_input, _output, close] = openStreams[i];
close();
}

await server.close();
ensureServerIsClean(server);
});
},
);
30 changes: 30 additions & 0 deletions __tests__/fixtures/cleanup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { expect } from 'vitest';
import { Connection, Transport } from '../../transport';
import { Server } from '../../router';

export function ensureTransportIsClean(t: Transport<Connection>) {
expect(t.state, 'transport should be closed after the test').to.not.equal(
'open',
);
expect(
t.connections,
'transport should not have open connections after the test',
).toStrictEqual(new Map());
expect(
t.messageHandlers,
'transport should not have open message handlers after the test',
).toStrictEqual(new Set());
expect(
t.sendQueue,
'transport should not have any messages its waiting to send after the test',
).toStrictEqual(new Map());
return true;
}

export function ensureServerIsClean(s: Server<unknown>) {
expect(
s.streams,
'server should not have any open streams after the test',
).toStrictEqual(new Map());
return true;
}
File renamed without changes.
38 changes: 27 additions & 11 deletions router/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
*/
export interface Server<Services> {
services: Services;
streams: Map<string, ProcStream>;
close(): Promise<void>;
}

Expand Down Expand Up @@ -114,15 +115,20 @@ export async function createServer<Services extends Record<string, AnyService>>(
}

// we ended, send a close bit back to the client
transport.send(
closeStream(
transport.clientId,
message.from,
message.serviceName,
message.procedureName,
message.streamId,
),
);
if (
procedure.type === 'subscription' ||
procedure.type === 'stream'
) {
transport.send(
closeStream(
transport.clientId,
message.from,
message.serviceName,
message.procedureName,
message.streamId,
),
);
}
})(),
];

Expand Down Expand Up @@ -216,22 +222,32 @@ export async function createServer<Services extends Record<string, AnyService>>(
}

if (isStreamClose(message.controlFlags)) {
console.log('closing stream');
procStream.incoming.end();
await Promise.all(procStream.openPromises);
// await Promise.all(procStream.openPromises);
await procStream.openPromises[1];
procStream.outgoing.end();
await procStream.openPromises[0];
console.log('stream closed');
streamMap.delete(streamIdx);
}
};

transport.addMessageListener(handler);
return {
services,
streams: streamMap,
async close() {
console.log(`closing server, ${streamMap.size} streams open`);
transport.removeMessageListener(handler);
for (const [_, stream] of streamMap) {
stream.incoming.end();
await Promise.all(stream.openPromises);
// await Promise.all(stream.openPromises);
await stream.openPromises[1];
stream.outgoing.end();
await stream.openPromises[0];
}
console.log('closed server');
},
};
}
5 changes: 5 additions & 0 deletions transport/impls/stdio/stdio.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import stream from 'node:stream';
import { StdioTransport } from './stdio';
import { waitForMessage } from '../..';
import { payloadToTransportMessage } from '../../../util/testHelpers';
import { ensureTransportIsClean } from '../../../__tests__/fixtures/cleanup';

describe('sending and receiving across node streams works', () => {
test('basic send/receive', async () => {
Expand Down Expand Up @@ -35,5 +36,9 @@ describe('sending and receiving across node streams works', () => {
);

await expect(p).resolves.toStrictEqual(msg);
await clientTransport.close();
await serverTransport.close();
ensureTransportIsClean(clientTransport);
ensureTransportIsClean(serverTransport);
});
});
1 change: 0 additions & 1 deletion transport/impls/stdio/stdio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ export class StdioConnection extends Connection {
}

async close() {
this.transport.onDisconnect(this);
this.output.end();
}
}
Expand Down
Loading

0 comments on commit faa9089

Please sign in to comment.