Skip to content

Commit

Permalink
Apply ConnectionDetails.maxMessageSize limit when publishing state …
Browse files Browse the repository at this point in the history
…messages

Resolves DTP-1118
  • Loading branch information
VeskeR committed Jan 27, 2025
1 parent a0dc0db commit 908ddae
Show file tree
Hide file tree
Showing 5 changed files with 415 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/common/lib/client/defaultrealtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
import { Http } from 'common/types/http';
import Defaults from '../util/defaults';
import Logger from '../util/logger';
import { MessageEncoding } from '../types/message';

/**
`DefaultRealtime` is the class that the non tree-shakable version of the SDK exports as `Realtime`. It ensures that this version of the SDK includes all of the functionality which is optionally available in the tree-shakable version.
Expand Down Expand Up @@ -71,4 +72,5 @@ export class DefaultRealtime extends BaseRealtime {
// Used by tests
static _Http = Http;
static _PresenceMap = PresenceMap;
static _MessageEncoding = MessageEncoding;
}
9 changes: 9 additions & 0 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,15 @@ export class LiveObjects {
}

stateMessages.forEach((x) => StateMessage.encode(x, this._client.MessageEncoding));
const maxMessageSize = this._client.options.maxMessageSize;
const size = stateMessages.reduce((acc, msg) => acc + msg.getMessageSize(), 0);
if (size > maxMessageSize) {
throw new this._client.ErrorInfo(
`Maximum size of state messages that can be published at once exceeded (was ${size} bytes; limit is ${maxMessageSize} bytes)`,
40009,
400,
);
}

return this._channel.sendState(stateMessages);
}
Expand Down
117 changes: 117 additions & 0 deletions src/plugins/liveobjects/statemessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -397,4 +397,121 @@ export class StateMessage {

return result;
}

getMessageSize(): number {
let size = 0;

size += this.clientId?.length ?? 0;
if (this.operation) {
size += this._getStateOperationSize(this.operation);
}
if (this.object) {
size += this._getStateObjectSize(this.object);
}
if (this.extras) {
size += JSON.stringify(this.extras).length;
}

return size;
}

private _getStateOperationSize(operation: StateOperation): number {
let size = 0;

if (operation.mapOp) {
size += this._getStateMapOpSize(operation.mapOp);
}
if (operation.counterOp) {
size += this._getStateCounterOpSize(operation.counterOp);
}
if (operation.map) {
size += this._getStateMapSize(operation.map);
}
if (operation.counter) {
size += this._getStateCounterSize(operation.counter);
}

return size;
}

private _getStateObjectSize(obj: StateObject): number {
let size = 0;

if (obj.map) {
size += this._getStateMapSize(obj.map);
}
if (obj.counter) {
size += this._getStateCounterSize(obj.counter);
}
if (obj.createOp) {
size += this._getStateOperationSize(obj.createOp);
}

return size;
}

private _getStateMapSize(map: StateMap): number {
let size = 0;

Object.entries(map.entries ?? {}).forEach(([key, entry]) => {
size += key?.length ?? 0;
if (entry) {
size += this._getStateMapEntrySize(entry);
}
});

return size;
}

private _getStateCounterSize(counter: StateCounter): number {
if (counter.count == null) {
return 0;
}

return 8;
}

private _getStateMapEntrySize(entry: StateMapEntry): number {
let size = 0;

if (entry.data) {
size += this._getStateDataSize(entry.data);
}

return size;
}

private _getStateMapOpSize(mapOp: StateMapOp): number {
let size = 0;

size += mapOp.key?.length ?? 0;

if (mapOp.data) {
size += this._getStateDataSize(mapOp.data);
}

return size;
}

private _getStateCounterOpSize(operation: StateCounterOp): number {
if (operation.amount == null) {
return 0;
}

return 8;
}

private _getStateDataSize(data: StateData): number {
let size = 0;

if (data.value) {
size += this._getStateValueSize(data.value);
}

return size;
}

private _getStateValueSize(value: StateValue): number {
return this._utils.dataSizeBytes(value);
}
}
6 changes: 5 additions & 1 deletion test/common/modules/private_api_recorder.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'call.Defaults.getPort',
'call.Defaults.normaliseOptions',
'call.EventEmitter.emit',
'call.LiveObject.getObjectId',
'call.LiveObject.isTombstoned',
'call.LiveObjects._liveObjectsPool._onGCInterval',
'call.LiveObjects._liveObjectsPool.get',
Expand All @@ -25,7 +26,11 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'call.Platform.nextTick',
'call.PresenceMessage.fromValues',
'call.ProtocolMessage.setFlag',
'call.StateMessage.encode',
'call.StateMessage.fromValues',
'call.StateMessage.getMessageSize',
'call.Utils.copy',
'call.Utils.dataSizeBytes',
'call.Utils.getRetryTime',
'call.Utils.inspectError',
'call.Utils.keysArray',
Expand All @@ -47,7 +52,6 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'call.http._getHosts',
'call.http.checkConnectivity',
'call.http.doUri',
'call.LiveObject.getObjectId',
'call.msgpack.decode',
'call.msgpack.encode',
'call.presence._myMembers.put',
Expand Down
Loading

0 comments on commit 908ddae

Please sign in to comment.