From 7927b24bdd70b3a910ff9fffbd5aeb6167c136bc Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Wed, 4 Dec 2024 05:43:32 +0000 Subject: [PATCH] Move `applyStateMessages` to LiveObjects class --- src/plugins/liveobjects/liveobjects.ts | 47 ++++++++++++++++++++-- src/plugins/liveobjects/liveobjectspool.ts | 45 --------------------- 2 files changed, 44 insertions(+), 48 deletions(-) diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index 75b743d5b..aa064de78 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -6,7 +6,7 @@ import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; import { LiveObject, LiveObjectUpdate } from './liveobject'; import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool'; -import { StateMessage } from './statemessage'; +import { StateMessage, StateOperationAction } from './statemessage'; import { SyncLiveObjectsDataPool } from './syncliveobjectsdatapool'; enum LiveObjectsEvents { @@ -101,7 +101,7 @@ export class LiveObjects { return; } - this._liveObjectsPool.applyStateMessages(stateMessages); + this._applyStateMessages(stateMessages); } /** @@ -159,7 +159,7 @@ export class LiveObjects { this._applySync(); // should apply buffered state operations after we applied the SYNC data. // can use regular state messages application logic - this._liveObjectsPool.applyStateMessages(this._bufferedStateOperations); + this._applyStateMessages(this._bufferedStateOperations); this._bufferedStateOperations = []; this._syncLiveObjectsDataPool.reset(); @@ -232,4 +232,45 @@ export class LiveObjects { // call subscription callbacks for all updated existing objects existingObjectUpdates.forEach(({ object, update }) => object.notifyUpdated(update)); } + + private _applyStateMessages(stateMessages: StateMessage[]): void { + for (const stateMessage of stateMessages) { + if (!stateMessage.operation) { + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MAJOR, + 'LiveObjects._applyStateMessages()', + `state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, + ); + continue; + } + + const stateOperation = stateMessage.operation; + + switch (stateOperation.action) { + case StateOperationAction.MAP_CREATE: + case StateOperationAction.COUNTER_CREATE: + case StateOperationAction.MAP_SET: + case StateOperationAction.MAP_REMOVE: + case StateOperationAction.COUNTER_INC: + // we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations, + // we can create a zero-value object for the provided object id and apply the operation to that zero-value object. + // this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves, + // since they need to be able to eventually initialize themselves from that *_CREATE op. + // so to simplify operations handling, we always try to create a zero-value object in the pool first, + // and then we can always apply the operation on the existing object in the pool. + this._liveObjectsPool.createZeroValueObjectIfNotExists(stateOperation.objectId); + this._liveObjectsPool.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage); + break; + + default: + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MAJOR, + 'LiveObjects._applyStateMessages()', + `received unsupported action in state operation message: ${stateOperation.action}, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, + ); + } + } + } } diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index 2c57f1084..eb42d47b4 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -1,11 +1,9 @@ import type BaseClient from 'common/lib/client/baseclient'; -import type RealtimeChannel from 'common/lib/client/realtimechannel'; import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; import { LiveObject } from './liveobject'; import { LiveObjects } from './liveobjects'; import { ObjectId } from './objectid'; -import { StateMessage, StateOperationAction } from './statemessage'; export const ROOT_OBJECT_ID = 'root'; @@ -14,12 +12,10 @@ export const ROOT_OBJECT_ID = 'root'; */ export class LiveObjectsPool { private _client: BaseClient; - private _channel: RealtimeChannel; private _pool: Map; constructor(private _liveObjects: LiveObjects) { this._client = this._liveObjects.getClient(); - this._channel = this._liveObjects.getChannel(); this._pool = this._getInitialPool(); } @@ -66,47 +62,6 @@ export class LiveObjectsPool { this.set(objectId, zeroValueObject); } - applyStateMessages(stateMessages: StateMessage[]): void { - for (const stateMessage of stateMessages) { - if (!stateMessage.operation) { - this._client.Logger.logAction( - this._client.logger, - this._client.Logger.LOG_MAJOR, - 'LiveObjects.LiveObjectsPool.applyStateMessages()', - `state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, - ); - continue; - } - - const stateOperation = stateMessage.operation; - - switch (stateOperation.action) { - case StateOperationAction.MAP_CREATE: - case StateOperationAction.COUNTER_CREATE: - case StateOperationAction.MAP_SET: - case StateOperationAction.MAP_REMOVE: - case StateOperationAction.COUNTER_INC: - // we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations, - // we can create a zero-value object for the provided object id and apply the operation to that zero-value object. - // this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves, - // since they need to be able to eventually initialize themselves from that *_CREATE op. - // so to simplify operations handling, we always try to create a zero-value object in the pool first, - // and then we can always apply the operation on the existing object in the pool. - this.createZeroValueObjectIfNotExists(stateOperation.objectId); - this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage); - break; - - default: - this._client.Logger.logAction( - this._client.logger, - this._client.Logger.LOG_MAJOR, - 'LiveObjects.LiveObjectsPool.applyStateMessages()', - `received unsupported action in state operation message: ${stateOperation.action}, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, - ); - } - } - } - private _getInitialPool(): Map { const pool = new Map(); const root = LiveMap.zeroValue(this._liveObjects, ROOT_OBJECT_ID);