Skip to content

Commit

Permalink
Encode messages before storing in send buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
cbrewster committed Oct 24, 2024
1 parent 9b4912d commit 577c1d7
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 42 deletions.
15 changes: 11 additions & 4 deletions testUtil/fixtures/cleanup.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { expect, vi } from 'vitest';
import { assert, expect, vi } from 'vitest';
import {
ClientTransport,
Connection,
Expand Down Expand Up @@ -68,9 +68,16 @@ export async function ensureTransportBuffersAreEventuallyEmpty(
[...t.sessions]
.map(([client, sess]) => {
// get all messages that are not heartbeats
const buff = sess.sendBuffer.filter((msg) => {
return !Value.Check(ControlMessageAckSchema, msg.payload);
});
const buff = sess.sendBuffer
.map((encodedMsg) => {
const msg = sess.parseMsg(encodedMsg.data);
assert(msg);

return msg;
})
.filter(
(msg) => !Value.Check(ControlMessageAckSchema, msg.payload),
);

return [client, buff] as [
string,
Expand Down
11 changes: 11 additions & 0 deletions transport/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,17 @@ export function cancelMessage(
export type OpaqueTransportMessage = TransportMessage;
export type TransportClientId = string;

/**
* An encoded message that is ready to be send over the transport.
* The seq number is kept to keep track of which messages have been
* acked by the peer.
*/
export interface EncodedTransportMessage {
id: string;
seq: number;
data: Uint8Array;
}

/**
* Checks if the given control flag (usually found in msg.controlFlag) is an ack message.
* @param controlFlag - The control flag to check.
Expand Down
10 changes: 5 additions & 5 deletions transport/sessionStateMachine/SessionConnected.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ export class SessionConnected<
}

send(msg: PartialTransportMessage): string {
const constructedMsg = this.constructMsg(msg);
this.sendBuffer.push(constructedMsg);
this.conn.send(this.options.codec.toBuffer(constructedMsg));
const encodedMsg = this.encodeMsg(msg);
this.sendBuffer.push(encodedMsg);
this.conn.send(encodedMsg.data);

return constructedMsg.id;
return encodedMsg.id;
}

constructor(props: SessionConnectedProps<ConnType>) {
Expand All @@ -75,7 +75,7 @@ export class SessionConnected<
);

for (const msg of this.sendBuffer) {
this.conn.send(this.options.codec.toBuffer(msg));
this.conn.send(msg.data);
}
}

Expand Down
20 changes: 13 additions & 7 deletions transport/sessionStateMachine/common.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { Logger, MessageMetadata } from '../../logging';
import { TelemetryInfo } from '../../tracing';
import {
EncodedTransportMessage,
OpaqueTransportMessage,
OpaqueTransportMessageSchema,
PartialTransportMessage,
ProtocolVersion,
TransportClientId,
TransportMessage,
} from '../message';
import { Value } from '@sinclair/typebox/value';
import { Codec } from '../../codec';
Expand Down Expand Up @@ -204,7 +204,7 @@ export interface IdentifiedSessionProps extends CommonSessionProps {
to: TransportClientId;
seq: number;
ack: number;
sendBuffer: Array<OpaqueTransportMessage>;
sendBuffer: Array<EncodedTransportMessage>;
telemetry: TelemetryInfo;
protocolVersion: ProtocolVersion;
}
Expand All @@ -224,7 +224,7 @@ export abstract class IdentifiedSession extends CommonSession {
* Number of unique messages we've received this session (excluding handshake)
*/
ack: number;
sendBuffer: Array<OpaqueTransportMessage>;
sendBuffer: Array<EncodedTransportMessage>;

constructor(props: IdentifiedSessionProps) {
const { id, to, seq, ack, sendBuffer, telemetry, log, protocolVersion } =
Expand Down Expand Up @@ -258,9 +258,9 @@ export abstract class IdentifiedSession extends CommonSession {
return metadata;
}

constructMsg<Payload>(
encodeMsg<Payload>(
partialMsg: PartialTransportMessage<Payload>,
): TransportMessage<Payload> {
): EncodedTransportMessage {
const msg = {
...partialMsg,
id: generateId(),
Expand All @@ -270,17 +270,23 @@ export abstract class IdentifiedSession extends CommonSession {
ack: this.ack,
};

const encodedMsg = {
id: msg.id,
seq: msg.seq,
data: this.options.codec.toBuffer(msg),
};

this.seq++;

return msg;
return encodedMsg;
}

nextSeq(): number {
return this.sendBuffer.length > 0 ? this.sendBuffer[0].seq : this.seq;
}

send(msg: PartialTransportMessage): string {
const constructedMsg = this.constructMsg(msg);
const constructedMsg = this.encodeMsg(msg);
this.sendBuffer.push(constructedMsg);

return constructedMsg.id;
Expand Down
36 changes: 16 additions & 20 deletions transport/sessionStateMachine/stateMachine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1891,8 +1891,8 @@ describe('session state machine', () => {
expect(onConnectionClosed).not.toHaveBeenCalled();
expect(onConnectionErrored).not.toHaveBeenCalled();

const msg = session.constructMsg(payloadToTransportMessage('hello'));
session.conn.emitData(session.options.codec.toBuffer(msg));
const msg = session.encodeMsg(payloadToTransportMessage('hello'));
session.conn.emitData(msg.data);

await waitFor(async () => {
expect(onMessage).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -1940,15 +1940,13 @@ describe('session state machine', () => {

// send a heartbeat
conn.emitData(
session.options.codec.toBuffer(
session.constructMsg({
streamId: 'heartbeat',
controlFlags: ControlFlags.AckBit,
payload: {
type: 'ACK',
} satisfies Static<typeof ControlMessageAckSchema>,
}),
),
session.encodeMsg({
streamId: 'heartbeat',
controlFlags: ControlFlags.AckBit,
payload: {
type: 'ACK',
} satisfies Static<typeof ControlMessageAckSchema>,
}).data,
);

// make sure the session acks the heartbeat
Expand All @@ -1962,15 +1960,13 @@ describe('session state machine', () => {

// send a heartbeat
conn.emitData(
session.options.codec.toBuffer(
session.constructMsg({
streamId: 'heartbeat',
controlFlags: ControlFlags.AckBit,
payload: {
type: 'ACK',
} satisfies Static<typeof ControlMessageAckSchema>,
}),
),
session.encodeMsg({
streamId: 'heartbeat',
controlFlags: ControlFlags.AckBit,
payload: {
type: 'ACK',
} satisfies Static<typeof ControlMessageAckSchema>,
}).data,
);

expect(sessionHandle.onMessage).not.toHaveBeenCalled();
Expand Down
9 changes: 6 additions & 3 deletions transport/sessionStateMachine/transitions.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { OpaqueTransportMessage, TransportClientId } from '..';
import {
SessionConnecting,
SessionConnectingListeners,
Expand Down Expand Up @@ -38,7 +37,11 @@ import {
SessionBackingOff,
SessionBackingOffListeners,
} from './SessionBackingOff';
import { ProtocolVersion } from '../message';
import {
EncodedTransportMessage,
ProtocolVersion,
TransportClientId,
} from '../message';

function inheritSharedSession(
session: IdentifiedSession,
Expand Down Expand Up @@ -78,7 +81,7 @@ export const SessionStateGraph = {
) => {
const id = `session-${generateId()}`;
const telemetry = createSessionTelemetryInfo(id, to, from);
const sendBuffer: Array<OpaqueTransportMessage> = [];
const sendBuffer: Array<EncodedTransportMessage> = [];

const session = new SessionNoConnection({
listeners,
Expand Down
4 changes: 1 addition & 3 deletions transport/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ export interface DeleteSessionOptions {
unhealthy: boolean;
}

export type SessionBoundSendFn = (
msg: PartialTransportMessage,
) => string | undefined;
export type SessionBoundSendFn = (msg: PartialTransportMessage) => string;

/**
* Transports manage the lifecycle (creation/deletion) of sessions
Expand Down

0 comments on commit 577c1d7

Please sign in to comment.