-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support 1:n transports #17
Conversation
transport/types.ts
Outdated
@@ -92,6 +92,7 @@ export abstract class Transport { | |||
} | |||
|
|||
// handle actual message | |||
cb?.(parsedMsg); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a load-bearing callback that appears extremely innocuous. Related to my above comment, I wonder if there's
- A way to establish a list of client connections during connection, rather than having this mechanism that updates during any given message.
- A transport-agnostic way to handle 1:1 vs. 1:n rather than having logic within a given transport.
} received invalid payload: ${JSON.stringify(msg.payload)}`, | ||
); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we still want to respect StreamClose
flag even if the payload is malformed
@@ -108,7 +110,8 @@ function _createRecursiveProxy( | |||
* @returns The client for the server. | |||
*/ | |||
export const createClient = <Srv extends Server<Record<string, AnyService>>>( | |||
transport: Transport, | |||
transport: Transport<Connection>, | |||
serverId: TransportClientId = 'SERVER', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice.
@@ -48,7 +49,7 @@ interface ProcStream { | |||
* @returns A promise that resolves to a server instance with the registered services. | |||
*/ | |||
export async function createServer<Services extends Record<string, AnyService>>( | |||
transport: Transport, | |||
transport: Transport<Connection>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still feels a little opaque that a client gets a default server ID of "SERVER", but the developer must know to set their server's ID to "SERVER" in their transport. I wouldn't worry about figuring out a better way now though, maybe just something to keep in mind for later
@@ -179,8 +172,17 @@ export async function createServer<Services extends Record<string, AnyService>>( | |||
return; | |||
} | |||
|
|||
procStream.incoming.push(inputMessage); | |||
if (isStreamClose(inputMessage.controlFlags)) { | |||
if (Value.Check(procedure.input, msg.payload)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did check move down here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the old implementation was wrong where it would respect the stream open bit for malformed payloads (because it happens before the payload check) but not the stream close bit check (because its after the early return). this just moves it and removes the early return so that we respect both bits
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Any River transport methods need to implement this interface. | ||
* @abstract | ||
*/ | ||
export abstract class Transport<ConnType extends Connection> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My main comment to Jacky (in slack) was:
"if transport impl and connection impls are always 1:1 (like, what is a WebSocketServerTransport w/ a StdioConnection ??), then using a generic here feels like it complicates things without it being necessary"
However, I didn't have a better implementation at the ready, so i think we movefast & update later if necessary.
* @param msg The message to parse. | ||
* @returns The parsed message, or null if the message is malformed or invalid. | ||
*/ | ||
protected parseMsg(msg: Uint8Array): OpaqueTransportMessage | null { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know a lot of this is just copied from types.ts
Currently, the isomorphic
WebSocketTransport
assumes a 1:1 connection. This isn't always true (especially on the server)This PR refactors
Transport
and introducesConnection
so that there is a 1:n relationship betweenTransport
s andConnection
s.A valid transport is expected to define its
Connection
type. This also adds aconnections
map to the transport so that we can extend and add connection update listeners to the client.