Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support server-stream #16

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ It's like tRPC but...
## Developing

- `npm i` -- install dependencies
- `npm check` -- lint
- `npm format` -- format
- `npm test` -- run tests
- `npm run check` -- lint
- `npm run format` -- format
- `npm run test` -- run tests
- `npm publish` -- cut a new release (should bump version in package.json first)
21 changes: 21 additions & 0 deletions __tests__/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,27 @@ describe.each(codecs)(
close();
});

test('server-stream', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

// TODO: Support the new interface.
const [input, output, close] = await client.test.events();
await input.push({});
await input.end();

const result = await client.test.add({ n: 3 });
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 3 });
const streamResult = await output.next().then((res) => res.value);
assert(streamResult && streamResult.ok);
expect(streamResult.payload).toStrictEqual({ value: 3 });

close();
});

test('message order is preserved in the face of disconnects', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: OrderingServiceConstructor() };
Expand Down
17 changes: 17 additions & 0 deletions __tests__/fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ import { Type } from '@sinclair/typebox';
import { ServiceBuilder } from '../router/builder';
import { reply } from '../transport/message';
import { Err, Ok } from '../router/result';
import { EventEmitter } from 'node:events';

export const EchoRequest = Type.Object({
msg: Type.String(),
ignore: Type.Boolean(),
});
export const EchoResponse = Type.Object({ response: Type.String() });

export class NumberEmitter extends EventEmitter {}

export const TestServiceConstructor = () =>
ServiceBuilder.create('test')
.initialState({
count: 0,
emitter: new NumberEmitter(),
})
.defineProcedure('add', {
type: 'rpc',
Expand All @@ -22,9 +26,22 @@ export const TestServiceConstructor = () =>
async handler(ctx, msg) {
const { n } = msg.payload;
ctx.state.count += n;
ctx.state.emitter.emit('add', ctx.state.count);
return reply(msg, Ok({ result: ctx.state.count }));
},
})
.defineProcedure('events', {
type: 'server-stream',
input: Type.Object({}),
output: Type.Object({ value: Type.Number() }),
errors: Type.Never(),
async handler(ctx, msg, returnStream) {
// TODO: support stream cancellation.
ctx.state.emitter.on('add', (n) => {
returnStream.push(reply(msg, Ok({ value: n })));
});
},
})
.defineProcedure('echo', {
type: 'stream',
input: EchoRequest,
Expand Down
8 changes: 6 additions & 2 deletions __tests__/handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import {
FallibleServiceConstructor,
STREAM_ERROR,
TestServiceConstructor,
NumberEmitter,
} from './fixtures';
import { UNCAUGHT_ERROR } from '../router/result';

describe('server-side test', () => {
const service = TestServiceConstructor();
const initialState = { count: 0 };
const initialState = { count: 0, emitter: new NumberEmitter() };

test('rpc basic', async () => {
const add = asClientRpc(initialState, service.procedures.add);
Expand All @@ -20,7 +21,10 @@ describe('server-side test', () => {
});

test('rpc initial state', async () => {
const add = asClientRpc({ count: 5 }, service.procedures.add);
const add = asClientRpc(
{ count: 5, emitter: new NumberEmitter() },
service.procedures.add,
);
const result = await add({ n: 6 });
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 11 });
Expand Down
21 changes: 20 additions & 1 deletion __tests__/serialize.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@ import {
BinaryFileServiceConstructor,
FallibleServiceConstructor,
TestServiceConstructor,
NumberEmitter,
} from './fixtures';

describe('serialize service to jsonschema', () => {
test('serialize basic service', () => {
const service = TestServiceConstructor();
expect(serializeService(service)).toStrictEqual({
name: 'test',
state: { count: 0 },
state: {
count: 0,
emitter: new NumberEmitter(),
},
procedures: {
add: {
input: {
Expand All @@ -31,6 +35,21 @@ describe('serialize service to jsonschema', () => {
errors: { not: {} },
type: 'rpc',
},
events: {
input: {
properties: {},
type: 'object',
},
output: {
properties: {
value: { type: 'number' },
},
required: ['value'],
type: 'object',
},
errors: { not: {} },
type: 'server-stream',
},
echo: {
input: {
properties: {
Expand Down
18 changes: 15 additions & 3 deletions router/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Result, RiverError, RiverUncaughtSchema } from './result';
/**
* The valid {@link Procedure} types.
*/
export type ValidProcType = 'stream' | 'rpc';
export type ValidProcType = 'stream' | 'server-stream' | 'rpc';

/**
* A generic procedure listing where the keys are the names of the procedures
Expand Down Expand Up @@ -111,9 +111,9 @@ export type ProcType<
> = S['procedures'][ProcName]['type'];

/**
* Defines a Procedure type that can be either an RPC or a stream procedure.
* Defines a Procedure type that can be either an RPC, server-stream, or a stream procedure.
* @template State - The TypeBox schema of the state object.
* @template Ty - The type of the procedure, either 'rpc' or 'stream'.
* @template Ty - The type of the procedure, either 'rpc', 'server-stream', or 'stream'.
* @template I - The TypeBox schema of the input object.
* @template O - The TypeBox schema of the output object.
*/
Expand All @@ -134,6 +134,18 @@ export type Procedure<
) => Promise<TransportMessage<Result<Static<O>, Static<E>>>>;
type: Ty;
}
: Ty extends 'server-stream'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these strings be enums?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prolly when we add the client-stream

? {
input: I;
output: O;
errors: E;
handler: (
context: ServiceContextWithState<State>,
input: TransportMessage<Static<I>>,
output: Pushable<TransportMessage<Result<Static<O>, Static<E>>>>,
) => Promise<void>;
type: Ty;
}
: {
input: I;
output: O;
Expand Down
16 changes: 16 additions & 0 deletions router/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,21 @@ type ServiceClient<Router extends AnyService> = {
Static<ProcErrors<Router, ProcName>>
>
>
: ProcType<Router, ProcName> extends 'server-stream'
? // server-stream case
// TODO: This should have a single input
() => Promise<
[
Pushable<Static<ProcInput<Router, ProcName>>>, // input
AsyncIter<
Result<
Static<ProcOutput<Router, ProcName>>,
Static<ProcErrors<Router, ProcName>>
>
>, // output
() => void, // close handle
]
>
: // get stream case
() => Promise<
[
Expand Down Expand Up @@ -172,6 +187,7 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(

return [inputStream, outputStream, closeHandler];
} else {
// TODO: how do we support server-stream?
// rpc case
const m = msg(
transport.clientId,
Expand Down
12 changes: 12 additions & 0 deletions router/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,18 @@ export async function createServer<Services extends Record<string, AnyService>>(
.handler(serviceContext, incoming, outgoing)
.catch(errorHandler),
);
} else if (procedure.type === 'server-stream') {
openPromises.push(
(async () => {
for await (const inputMessage of incoming) {
openPromises.push(
procedure
.handler(serviceContext, inputMessage, outgoing)
.catch(errorHandler),
);
}
})(),
);
} else if (procedure.type === 'rpc') {
openPromises.push(
(async () => {
Expand Down