Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DTP-1118] Apply ConnectionDetails.maxMessageSize limit to state messages #1963

Open
wants to merge 4 commits into
base: integration/liveobjects
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/common/lib/client/defaultrealtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
import { Http } from 'common/types/http';
import Defaults from '../util/defaults';
import Logger from '../util/logger';
import { MessageEncoding } from '../types/message';

/**
`DefaultRealtime` is the class that the non tree-shakable version of the SDK exports as `Realtime`. It ensures that this version of the SDK includes all of the functionality which is optionally available in the tree-shakable version.
Expand Down Expand Up @@ -71,4 +72,5 @@ export class DefaultRealtime extends BaseRealtime {
// Used by tests
static _Http = Http;
static _PresenceMap = PresenceMap;
static _MessageEncoding = MessageEncoding;
}
6 changes: 1 addition & 5 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,7 @@ class RealtimeChannel extends EventEmitter {
const size = getMessagesSize(messages);
if (size > maxMessageSize) {
throw new ErrorInfo(
'Maximum size of messages that can be published at once exceeded ( was ' +
size +
' bytes; limit is ' +
maxMessageSize +
' bytes)',
`Maximum size of messages that can be published at once exceeded (was ${size} bytes; limit is ${maxMessageSize} bytes)`,
40009,
400,
);
Expand Down
6 changes: 1 addition & 5 deletions src/common/lib/client/restchannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,7 @@ class RestChannel {
maxMessageSize = options.maxMessageSize;
if (size > maxMessageSize) {
throw new ErrorInfo(
'Maximum size of messages that can be published at once exceeded ( was ' +
size +
' bytes; limit is ' +
maxMessageSize +
' bytes)',
`Maximum size of messages that can be published at once exceeded (was ${size} bytes; limit is ${maxMessageSize} bytes)`,
40009,
400,
);
Expand Down
16 changes: 12 additions & 4 deletions src/common/lib/util/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Platform from 'common/platform';
import Platform, { Bufferlike } from 'common/platform';
import ErrorInfo, { PartialErrorInfo } from 'common/lib/types/errorinfo';
import { ModularPlugins } from '../client/modularplugins';
import { MsgPack } from 'common/types/msgpack';
Expand Down Expand Up @@ -279,15 +279,23 @@ export function inspectBody(body: unknown): string {
}
}

/* Data is assumed to be either a string or a buffer. */
export function dataSizeBytes(data: string | Buffer): number {
/* Data is assumed to be either a string, a number, a boolean or a buffer. */
export function dataSizeBytes(data: string | number | boolean | Bufferlike): number {
if (Platform.BufferUtils.isBuffer(data)) {
return Platform.BufferUtils.byteLength(data);
}
if (typeof data === 'string') {
return Platform.Config.stringByteSize(data);
}
throw new Error('Expected input of Utils.dataSizeBytes to be a buffer or string, but was: ' + typeof data);
if (typeof data === 'number') {
return 8;
}
if (typeof data === 'boolean') {
return 1;
}
throw new Error(
`Expected input of Utils.dataSizeBytes to be a string, a number, a boolean or a buffer, but was: ${typeof data}`,
);
}

export function cheapRandStr(): string {
Expand Down
9 changes: 9 additions & 0 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,15 @@ export class LiveObjects {
}

stateMessages.forEach((x) => StateMessage.encode(x, this._client.MessageEncoding));
const maxMessageSize = this._client.options.maxMessageSize;
const size = stateMessages.reduce((acc, msg) => acc + msg.getMessageSize(), 0);
if (size > maxMessageSize) {
throw new this._client.ErrorInfo(
`Maximum size of state messages that can be published at once exceeded (was ${size} bytes; limit is ${maxMessageSize} bytes)`,
40009,
400,
);
}

return this._channel.sendState(stateMessages);
}
Expand Down
117 changes: 117 additions & 0 deletions src/plugins/liveobjects/statemessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -445,4 +445,121 @@ export class StateMessage {

return result;
}

getMessageSize(): number {
let size = 0;

size += this.clientId?.length ?? 0;
if (this.operation) {
size += this._getStateOperationSize(this.operation);
}
if (this.object) {
size += this._getStateObjectSize(this.object);
}
if (this.extras) {
size += JSON.stringify(this.extras).length;
}

return size;
}

private _getStateOperationSize(operation: StateOperation): number {
let size = 0;

if (operation.mapOp) {
size += this._getStateMapOpSize(operation.mapOp);
}
if (operation.counterOp) {
size += this._getStateCounterOpSize(operation.counterOp);
}
if (operation.map) {
size += this._getStateMapSize(operation.map);
}
if (operation.counter) {
size += this._getStateCounterSize(operation.counter);
}

return size;
}

private _getStateObjectSize(obj: StateObject): number {
let size = 0;

if (obj.map) {
size += this._getStateMapSize(obj.map);
}
if (obj.counter) {
size += this._getStateCounterSize(obj.counter);
}
if (obj.createOp) {
size += this._getStateOperationSize(obj.createOp);
}
Comment on lines +494 to +496
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the createOp exists on the StateObject for the correct operation of the CRDT semantics and is not something that I'd expect the dev/customer to be setting.

This getMessageSize() is used on publish to liveObjects. But I can't think of the case where we'd be publishing a StateObject rather than a StateOperation (?) so perhaps it's neither here-nor-there.

Copy link
Contributor Author

@VeskeR VeskeR Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I can't think of the case where we'd be publishing a StateObject rather than a StateOperation

Yeah, the client currently never publishes StateObject messages. However, we did discuss the possibility of this happening in the future with Yjs integration, where documents may need to be synced two-way on attachment using STATE_SYNC with StateObject messages. But for now it's very much up in the air.

Even though the client doesn’t publish StateObject at the moment, the additional complexity of handling the calculation is minimal. It’s just a single method, which already reuses other methods required to calculate StateOperation anyway, and one this._getStateObjectSize(this.object) call just a few lines above.

In return, we get parity with the realtime size calculation implementation - even the tests can be shared.

I think the createOp exists on the StateObject for the correct operation of the CRDT semantics and is not something that I'd expect the dev/customer to be setting.

If StateObject size calculation is to satay, then we should definitely include createOp, as it carries user-provided data.


return size;
}

private _getStateMapSize(map: StateMap): number {
let size = 0;

Object.entries(map.entries ?? {}).forEach(([key, entry]) => {
size += key?.length ?? 0;
if (entry) {
size += this._getStateMapEntrySize(entry);
}
});

return size;
}

private _getStateCounterSize(counter: StateCounter): number {
if (counter.count == null) {
return 0;
}

return 8;
}

private _getStateMapEntrySize(entry: StateMapEntry): number {
let size = 0;

if (entry.data) {
size += this._getStateDataSize(entry.data);
}

return size;
}

private _getStateMapOpSize(mapOp: StateMapOp): number {
let size = 0;

size += mapOp.key?.length ?? 0;

if (mapOp.data) {
size += this._getStateDataSize(mapOp.data);
}

return size;
}

private _getStateCounterOpSize(operation: StateCounterOp): number {
if (operation.amount == null) {
return 0;
}

return 8;
}

private _getStateDataSize(data: StateData): number {
let size = 0;

if (data.value) {
size += this._getStateValueSize(data.value);
}

return size;
}

private _getStateValueSize(value: StateValue): number {
return this._utils.dataSizeBytes(value);
}
}
6 changes: 5 additions & 1 deletion test/common/modules/private_api_recorder.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'call.Defaults.getPort',
'call.Defaults.normaliseOptions',
'call.EventEmitter.emit',
'call.LiveObject.getObjectId',
'call.LiveObject.isTombstoned',
'call.LiveObjects._liveObjectsPool._onGCInterval',
'call.LiveObjects._liveObjectsPool.get',
Expand All @@ -25,7 +26,11 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'call.Platform.nextTick',
'call.PresenceMessage.fromValues',
'call.ProtocolMessage.setFlag',
'call.StateMessage.encode',
'call.StateMessage.fromValues',
'call.StateMessage.getMessageSize',
'call.Utils.copy',
'call.Utils.dataSizeBytes',
'call.Utils.getRetryTime',
'call.Utils.inspectError',
'call.Utils.keysArray',
Expand All @@ -47,7 +52,6 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'call.http._getHosts',
'call.http.checkConnectivity',
'call.http.doUri',
'call.LiveObject.getObjectId',
'call.msgpack.decode',
'call.msgpack.encode',
'call.presence._myMembers.put',
Expand Down
Loading
Loading