Skip to content

Commit

Permalink
feat: support server-stream
Browse files Browse the repository at this point in the history
gRPC supports four kinds of RPCs: unary, client-stream, server-stream
and bidirectional. We want to support them all too.

This change adds support for server-stream RPCs (one message from the
client, a stream of messages from the server). This is pending real
support for knowing what the type of each function is, but should
unblock progress for now.
  • Loading branch information
lhchavez committed Dec 6, 2023
1 parent 40827af commit 2400159
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 9 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ It's like tRPC but...
## Developing

- `npm i` -- install dependencies
- `npm check` -- lint
- `npm format` -- format
- `npm test` -- run tests
- `npm run check` -- lint
- `npm run format` -- format
- `npm run test` -- run tests
- `npm publish` -- cut a new release (should bump version in package.json first)
21 changes: 21 additions & 0 deletions __tests__/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,27 @@ describe.each(codecs)(
close();
});

test('server-stream', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

// TODO: Support the new interface.
const [input, output, close] = await client.test.events();
await input.push({});
await input.end();

const result = await client.test.add({ n: 3 });
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 3 });
const streamResult = await output.next().then((res) => res.value);
assert(streamResult && streamResult.ok);
expect(streamResult.payload).toStrictEqual({ value: 3 });

close();
});

test('message order is preserved in the face of disconnects', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: OrderingServiceConstructor() };
Expand Down
17 changes: 17 additions & 0 deletions __tests__/fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ import { Type } from '@sinclair/typebox';
import { ServiceBuilder } from '../router/builder';
import { reply } from '../transport/message';
import { Err, Ok } from '../router/result';
import { EventEmitter } from 'node:events';

export const EchoRequest = Type.Object({
msg: Type.String(),
ignore: Type.Boolean(),
});
export const EchoResponse = Type.Object({ response: Type.String() });

export class NumberEmitter extends EventEmitter {}

export const TestServiceConstructor = () =>
ServiceBuilder.create('test')
.initialState({
count: 0,
emitter: new NumberEmitter(),
})
.defineProcedure('add', {
type: 'rpc',
Expand All @@ -22,9 +26,22 @@ export const TestServiceConstructor = () =>
async handler(ctx, msg) {
const { n } = msg.payload;
ctx.state.count += n;
ctx.state.emitter.emit('add', ctx.state.count);
return reply(msg, Ok({ result: ctx.state.count }));
},
})
.defineProcedure('events', {
type: 'server-stream',
input: Type.Object({}),
output: Type.Object({ value: Type.Number() }),
errors: Type.Never(),
async handler(ctx, msg, returnStream) {
// TODO: support stream cancellation.
ctx.state.emitter.on('add', (n) => {
returnStream.push(reply(msg, Ok({ value: n })));
});
},
})
.defineProcedure('echo', {
type: 'stream',
input: EchoRequest,
Expand Down
8 changes: 6 additions & 2 deletions __tests__/handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import {
FallibleServiceConstructor,
STREAM_ERROR,
TestServiceConstructor,
NumberEmitter,
} from './fixtures';
import { UNCAUGHT_ERROR } from '../router/result';

describe('server-side test', () => {
const service = TestServiceConstructor();
const initialState = { count: 0 };
const initialState = { count: 0, emitter: new NumberEmitter() };

test('rpc basic', async () => {
const add = asClientRpc(initialState, service.procedures.add);
Expand All @@ -20,7 +21,10 @@ describe('server-side test', () => {
});

test('rpc initial state', async () => {
const add = asClientRpc({ count: 5 }, service.procedures.add);
const add = asClientRpc(
{ count: 5, emitter: new NumberEmitter() },
service.procedures.add,
);
const result = await add({ n: 6 });
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 11 });
Expand Down
21 changes: 20 additions & 1 deletion __tests__/serialize.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@ import {
BinaryFileServiceConstructor,
FallibleServiceConstructor,
TestServiceConstructor,
NumberEmitter,
} from './fixtures';

describe('serialize service to jsonschema', () => {
test('serialize basic service', () => {
const service = TestServiceConstructor();
expect(serializeService(service)).toStrictEqual({
name: 'test',
state: { count: 0 },
state: {
count: 0,
emitter: new NumberEmitter(),
},
procedures: {
add: {
input: {
Expand All @@ -31,6 +35,21 @@ describe('serialize service to jsonschema', () => {
errors: { not: {} },
type: 'rpc',
},
events: {
input: {
properties: {},
type: 'object',
},
output: {
properties: {
value: { type: 'number' },
},
required: ['value'],
type: 'object',
},
errors: { not: {} },
type: 'server-stream',
},
echo: {
input: {
properties: {
Expand Down
18 changes: 15 additions & 3 deletions router/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Result, RiverError, RiverUncaughtSchema } from './result';
/**
* The valid {@link Procedure} types.
*/
export type ValidProcType = 'stream' | 'rpc';
export type ValidProcType = 'stream' | 'server-stream' | 'rpc';

/**
* A generic procedure listing where the keys are the names of the procedures
Expand Down Expand Up @@ -111,9 +111,9 @@ export type ProcType<
> = S['procedures'][ProcName]['type'];

/**
* Defines a Procedure type that can be either an RPC or a stream procedure.
* Defines a Procedure type that can be either an RPC, server-stream, or a stream procedure.
* @template State - The TypeBox schema of the state object.
* @template Ty - The type of the procedure, either 'rpc' or 'stream'.
* @template Ty - The type of the procedure, either 'rpc', 'server-stream', or 'stream'.
* @template I - The TypeBox schema of the input object.
* @template O - The TypeBox schema of the output object.
*/
Expand All @@ -134,6 +134,18 @@ export type Procedure<
) => Promise<TransportMessage<Result<Static<O>, Static<E>>>>;
type: Ty;
}
: Ty extends 'server-stream'
? {
input: I;
output: O;
errors: E;
handler: (
context: ServiceContextWithState<State>,
input: TransportMessage<Static<I>>,
output: Pushable<TransportMessage<Result<Static<O>, Static<E>>>>,
) => Promise<void>;
type: Ty;
}
: {
input: I;
output: O;
Expand Down
16 changes: 16 additions & 0 deletions router/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,21 @@ type ServiceClient<Router extends AnyService> = {
Static<ProcErrors<Router, ProcName>>
>
>
: ProcType<Router, ProcName> extends 'server-stream'
? // server-stream case
// TODO: This should have a single input
() => Promise<
[
Pushable<Static<ProcInput<Router, ProcName>>>, // input
AsyncIter<
Result<
Static<ProcOutput<Router, ProcName>>,
Static<ProcErrors<Router, ProcName>>
>
>, // output
() => void, // close handle
]
>
: // get stream case
() => Promise<
[
Expand Down Expand Up @@ -172,6 +187,7 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(

return [inputStream, outputStream, closeHandler];
} else {
// TODO: how do we support server-stream?
// rpc case
const m = msg(
transport.clientId,
Expand Down
12 changes: 12 additions & 0 deletions router/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,18 @@ export async function createServer<Services extends Record<string, AnyService>>(
.handler(serviceContext, incoming, outgoing)
.catch(errorHandler),
);
} else if (procedure.type === 'server-stream') {
openPromises.push(
(async () => {
for await (const inputMessage of incoming) {
openPromises.push(
procedure
.handler(serviceContext, inputMessage, outgoing)
.catch(errorHandler),
);
}
})(),
);
} else if (procedure.type === 'rpc') {
openPromises.push(
(async () => {
Expand Down

0 comments on commit 2400159

Please sign in to comment.