Skip to content

Commit

Permalink
Move applyStateMessages to LiveObjects class
Browse files Browse the repository at this point in the history
  • Loading branch information
VeskeR committed Dec 11, 2024
1 parent f08222d commit 7927b24
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 48 deletions.
47 changes: 44 additions & 3 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { LiveCounter } from './livecounter';
import { LiveMap } from './livemap';
import { LiveObject, LiveObjectUpdate } from './liveobject';
import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool';
import { StateMessage } from './statemessage';
import { StateMessage, StateOperationAction } from './statemessage';
import { SyncLiveObjectsDataPool } from './syncliveobjectsdatapool';

enum LiveObjectsEvents {
Expand Down Expand Up @@ -101,7 +101,7 @@ export class LiveObjects {
return;
}

this._liveObjectsPool.applyStateMessages(stateMessages);
this._applyStateMessages(stateMessages);
}

/**
Expand Down Expand Up @@ -159,7 +159,7 @@ export class LiveObjects {
this._applySync();
// should apply buffered state operations after we applied the SYNC data.
// can use regular state messages application logic
this._liveObjectsPool.applyStateMessages(this._bufferedStateOperations);
this._applyStateMessages(this._bufferedStateOperations);

this._bufferedStateOperations = [];
this._syncLiveObjectsDataPool.reset();
Expand Down Expand Up @@ -232,4 +232,45 @@ export class LiveObjects {
// call subscription callbacks for all updated existing objects
existingObjectUpdates.forEach(({ object, update }) => object.notifyUpdated(update));
}

private _applyStateMessages(stateMessages: StateMessage[]): void {
for (const stateMessage of stateMessages) {
if (!stateMessage.operation) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects._applyStateMessages()',
`state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
continue;
}

const stateOperation = stateMessage.operation;

switch (stateOperation.action) {
case StateOperationAction.MAP_CREATE:
case StateOperationAction.COUNTER_CREATE:
case StateOperationAction.MAP_SET:
case StateOperationAction.MAP_REMOVE:
case StateOperationAction.COUNTER_INC:
// we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations,
// we can create a zero-value object for the provided object id and apply the operation to that zero-value object.
// this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves,
// since they need to be able to eventually initialize themselves from that *_CREATE op.
// so to simplify operations handling, we always try to create a zero-value object in the pool first,
// and then we can always apply the operation on the existing object in the pool.
this._liveObjectsPool.createZeroValueObjectIfNotExists(stateOperation.objectId);
this._liveObjectsPool.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage);
break;

default:
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects._applyStateMessages()',
`received unsupported action in state operation message: ${stateOperation.action}, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
}
}
}
}
45 changes: 0 additions & 45 deletions src/plugins/liveobjects/liveobjectspool.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import type BaseClient from 'common/lib/client/baseclient';
import type RealtimeChannel from 'common/lib/client/realtimechannel';
import { LiveCounter } from './livecounter';
import { LiveMap } from './livemap';
import { LiveObject } from './liveobject';
import { LiveObjects } from './liveobjects';
import { ObjectId } from './objectid';
import { StateMessage, StateOperationAction } from './statemessage';

export const ROOT_OBJECT_ID = 'root';

Expand All @@ -14,12 +12,10 @@ export const ROOT_OBJECT_ID = 'root';
*/
export class LiveObjectsPool {
private _client: BaseClient;
private _channel: RealtimeChannel;
private _pool: Map<string, LiveObject>;

constructor(private _liveObjects: LiveObjects) {
this._client = this._liveObjects.getClient();
this._channel = this._liveObjects.getChannel();
this._pool = this._getInitialPool();
}

Expand Down Expand Up @@ -66,47 +62,6 @@ export class LiveObjectsPool {
this.set(objectId, zeroValueObject);
}

applyStateMessages(stateMessages: StateMessage[]): void {
for (const stateMessage of stateMessages) {
if (!stateMessage.operation) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects.LiveObjectsPool.applyStateMessages()',
`state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
continue;
}

const stateOperation = stateMessage.operation;

switch (stateOperation.action) {
case StateOperationAction.MAP_CREATE:
case StateOperationAction.COUNTER_CREATE:
case StateOperationAction.MAP_SET:
case StateOperationAction.MAP_REMOVE:
case StateOperationAction.COUNTER_INC:
// we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations,
// we can create a zero-value object for the provided object id and apply the operation to that zero-value object.
// this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves,
// since they need to be able to eventually initialize themselves from that *_CREATE op.
// so to simplify operations handling, we always try to create a zero-value object in the pool first,
// and then we can always apply the operation on the existing object in the pool.
this.createZeroValueObjectIfNotExists(stateOperation.objectId);
this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage);
break;

default:
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects.LiveObjectsPool.applyStateMessages()',
`received unsupported action in state operation message: ${stateOperation.action}, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
}
}
}

private _getInitialPool(): Map<string, LiveObject> {
const pool = new Map<string, LiveObject>();
const root = LiveMap.zeroValue(this._liveObjects, ROOT_OBJECT_ID);
Expand Down

0 comments on commit 7927b24

Please sign in to comment.