Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DTP-955] Buffer and flush state operations during a STATE_SYNC sequence #1909

Merged
merged 4 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ class RealtimeChannel extends EventEmitter {
}
}

this._liveObjects.handleStateMessages(stateMessages);
this._liveObjects.handleStateMessages(stateMessages, message.channelSerial);

break;
}
Expand Down
19 changes: 14 additions & 5 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { LiveObject, LiveObjectData } from './liveobject';
import { LiveObjects } from './liveobjects';
import { StateCounter, StateCounterOp, StateOperation, StateOperationAction } from './statemessage';
import { StateCounter, StateCounterOp, StateMessage, StateOperation, StateOperationAction } from './statemessage';
import { Timeserial } from './timeserial';

export interface LiveCounterData extends LiveObjectData {
data: number;
Expand All @@ -12,17 +13,23 @@ export class LiveCounter extends LiveObject<LiveCounterData> {
private _created: boolean,
initialData?: LiveCounterData | null,
objectId?: string,
regionalTimeserial?: Timeserial,
) {
super(liveObjects, initialData, objectId);
super(liveObjects, initialData, objectId, regionalTimeserial);
}

/**
* Returns a {@link LiveCounter} instance with a 0 value.
*
* @internal
*/
static zeroValue(liveobjects: LiveObjects, isCreated: boolean, objectId?: string): LiveCounter {
return new LiveCounter(liveobjects, isCreated, null, objectId);
static zeroValue(
liveobjects: LiveObjects,
isCreated: boolean,
objectId?: string,
regionalTimeserial?: Timeserial,
): LiveCounter {
return new LiveCounter(liveobjects, isCreated, null, objectId, regionalTimeserial);
}

value(): number {
Expand All @@ -46,7 +53,7 @@ export class LiveCounter extends LiveObject<LiveCounterData> {
/**
* @internal
*/
applyOperation(op: StateOperation): void {
applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void {
if (op.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Cannot apply state operation with objectId=${op.objectId}, to this LiveCounter with objectId=${this.getObjectId()}`,
Expand Down Expand Up @@ -75,6 +82,8 @@ export class LiveCounter extends LiveObject<LiveCounterData> {
500,
);
}

this.setRegionalTimeserial(opRegionalTimeserial);
VeskeR marked this conversation as resolved.
Show resolved Hide resolved
}

protected _getZeroValueData(): LiveCounterData {
Expand Down
11 changes: 7 additions & 4 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,18 @@ export class LiveMap extends LiveObject<LiveMapData> {
private _semantics: MapSemantics,
initialData?: LiveMapData | null,
objectId?: string,
regionalTimeserial?: Timeserial,
) {
super(liveObjects, initialData, objectId);
super(liveObjects, initialData, objectId, regionalTimeserial);
}

/**
* Returns a {@link LiveMap} instance with an empty map data.
*
* @internal
*/
static zeroValue(liveobjects: LiveObjects, objectId?: string): LiveMap {
return new LiveMap(liveobjects, MapSemantics.LWW, null, objectId);
static zeroValue(liveobjects: LiveObjects, objectId?: string, regionalTimeserial?: Timeserial): LiveMap {
return new LiveMap(liveobjects, MapSemantics.LWW, null, objectId, regionalTimeserial);
}

static liveMapDataFromMapEntries(client: BaseClient, entries: Record<string, StateMapEntry>): LiveMapData {
Expand Down Expand Up @@ -134,7 +135,7 @@ export class LiveMap extends LiveObject<LiveMapData> {
/**
* @internal
*/
applyOperation(op: StateOperation, msg: StateMessage): void {
applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void {
if (op.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Cannot apply state operation with objectId=${op.objectId}, to this LiveMap with objectId=${this.getObjectId()}`,
Expand Down Expand Up @@ -171,6 +172,8 @@ export class LiveMap extends LiveObject<LiveMapData> {
500,
);
}

this.setRegionalTimeserial(opRegionalTimeserial);
}

protected _getZeroValueData(): LiveMapData {
Expand Down
12 changes: 8 additions & 4 deletions src/plugins/liveobjects/liveobject.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type BaseClient from 'common/lib/client/baseclient';
import { LiveObjects } from './liveobjects';
import { StateMessage, StateOperation } from './statemessage';
import { DefaultTimeserial, Timeserial } from './timeserial';

export interface LiveObjectData {
data: any;
Expand All @@ -10,16 +11,19 @@ export abstract class LiveObject<T extends LiveObjectData = LiveObjectData> {
protected _client: BaseClient;
protected _dataRef: T;
protected _objectId: string;
protected _regionalTimeserial?: string;
protected _regionalTimeserial: Timeserial;

constructor(
protected _liveObjects: LiveObjects,
initialData?: T | null,
objectId?: string,
regionalTimeserial?: Timeserial,
) {
this._client = this._liveObjects.getClient();
this._dataRef = initialData ?? this._getZeroValueData();
this._objectId = objectId ?? this._createObjectId();
// use zero value timeserial by default, so any future operation can be applied for this object
this._regionalTimeserial = regionalTimeserial ?? DefaultTimeserial.zeroValueTimeserial(this._client);
}

/**
Expand All @@ -32,7 +36,7 @@ export abstract class LiveObject<T extends LiveObjectData = LiveObjectData> {
/**
* @internal
*/
getRegionalTimeserial(): string | undefined {
getRegionalTimeserial(): Timeserial {
return this._regionalTimeserial;
}

Expand All @@ -46,7 +50,7 @@ export abstract class LiveObject<T extends LiveObjectData = LiveObjectData> {
/**
* @internal
*/
setRegionalTimeserial(regionalTimeserial: string): void {
setRegionalTimeserial(regionalTimeserial: Timeserial): void {
this._regionalTimeserial = regionalTimeserial;
}

Expand All @@ -58,6 +62,6 @@ export abstract class LiveObject<T extends LiveObjectData = LiveObjectData> {
/**
* @internal
*/
abstract applyOperation(op: StateOperation, msg: StateMessage): void;
abstract applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void;
protected abstract _getZeroValueData(): T;
}
43 changes: 33 additions & 10 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@ import { LiveObject } from './liveobject';
import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool';
import { StateMessage } from './statemessage';
import { LiveCounterDataEntry, SyncLiveObjectsDataPool } from './syncliveobjectsdatapool';
import { DefaultTimeserial, Timeserial } from './timeserial';

enum LiveObjectsEvents {
SyncCompleted = 'SyncCompleted',
}

export interface BufferedStateMessage {
stateMessage: StateMessage;
regionalTimeserial: Timeserial;
}

export class LiveObjects {
private _client: BaseClient;
private _channel: RealtimeChannel;
Expand All @@ -24,6 +30,7 @@ export class LiveObjects {
private _syncInProgress: boolean;
private _currentSyncId: string | undefined;
private _currentSyncCursor: string | undefined;
private _bufferedStateOperations: BufferedStateMessage[];

constructor(channel: RealtimeChannel) {
this._channel = channel;
Expand All @@ -32,6 +39,7 @@ export class LiveObjects {
this._liveObjectsPool = new LiveObjectsPool(this);
this._syncLiveObjectsDataPool = new SyncLiveObjectsDataPool(this);
this._syncInProgress = true;
this._bufferedStateOperations = [];
}

async getRoot(): Promise<LiveMap> {
Expand Down Expand Up @@ -84,12 +92,23 @@ export class LiveObjects {
/**
* @internal
*/
handleStateMessages(stateMessages: StateMessage[]): void {
handleStateMessages(stateMessages: StateMessage[], msgRegionalTimeserial: string | null | undefined): void {
const timeserial = DefaultTimeserial.calculateTimeserial(this._client, msgRegionalTimeserial);

if (this._syncInProgress) {
// TODO: handle buffering of state messages during SYNC
// The client receives state messages in realtime over the channel concurrently with the SYNC sequence.
// Some of the incoming state messages may have already been applied to the state objects described in
// the SYNC sequence, but others may not; therefore we must buffer these messages so that we can apply
// them to the state objects once the SYNC is complete. To avoid double-counting, the buffered operations
// are applied according to the state object's regional timeserial, which reflects the regional timeserial
// of the state message that was last applied to that state object.
stateMessages.forEach((x) =>
this._bufferedStateOperations.push({ stateMessage: x, regionalTimeserial: timeserial }),
);
return;
}

this._liveObjectsPool.applyStateMessages(stateMessages);
this._liveObjectsPool.applyStateMessages(stateMessages, timeserial);
}

/**
Expand All @@ -100,7 +119,7 @@ export class LiveObjects {
this._client.logger,
this._client.Logger.LOG_MINOR,
'LiveObjects.onAttached()',
'channel = ' + this._channel.name + ', hasState = ' + hasState,
`channel=${this._channel.name}, hasState=${hasState}`,
);

if (hasState) {
Expand Down Expand Up @@ -135,16 +154,20 @@ export class LiveObjects {
}

private _startNewSync(syncId?: string, syncCursor?: string): void {
// need to discard all buffered state operation messages on new sync start
this._bufferedStateOperations = [];
this._syncLiveObjectsDataPool.reset();
this._currentSyncId = syncId;
this._currentSyncCursor = syncCursor;
this._syncInProgress = true;
}

private _endSync(): void {
// TODO: handle applying buffered state messages when SYNC is finished

this._applySync();
// should apply buffered state operations after we applied the SYNC data
this._liveObjectsPool.applyBufferedStateMessages(this._bufferedStateOperations);

this._bufferedStateOperations = [];
this._syncLiveObjectsDataPool.reset();
this._currentSyncId = undefined;
this._currentSyncCursor = undefined;
Expand Down Expand Up @@ -180,10 +203,11 @@ export class LiveObjects {
for (const [objectId, entry] of this._syncLiveObjectsDataPool.entries()) {
receivedObjectIds.add(objectId);
const existingObject = this._liveObjectsPool.get(objectId);
const regionalTimeserialObj = DefaultTimeserial.calculateTimeserial(this._client, entry.regionalTimeserial);

if (existingObject) {
existingObject.setData(entry.objectData);
existingObject.setRegionalTimeserial(entry.regionalTimeserial);
existingObject.setRegionalTimeserial(regionalTimeserialObj);
if (existingObject instanceof LiveCounter) {
existingObject.setCreated((entry as LiveCounterDataEntry).created);
}
Expand All @@ -195,17 +219,16 @@ export class LiveObjects {
const objectType = entry.objectType;
switch (objectType) {
case 'LiveCounter':
newObject = new LiveCounter(this, entry.created, entry.objectData, objectId);
newObject = new LiveCounter(this, entry.created, entry.objectData, objectId, regionalTimeserialObj);
break;

case 'LiveMap':
newObject = new LiveMap(this, entry.semantics, entry.objectData, objectId);
newObject = new LiveMap(this, entry.semantics, entry.objectData, objectId, regionalTimeserialObj);
break;

default:
throw new this._client.ErrorInfo(`Unknown live object type: ${objectType}`, 50000, 500);
}
newObject.setRegionalTimeserial(entry.regionalTimeserial);

this._liveObjectsPool.set(objectId, newObject);
}
Expand Down
Loading
Loading