Skip to content

Commit

Permalink
Add object-level write API for LiveMap and LiveCounter to update …
Browse files Browse the repository at this point in the history
…existing objects
  • Loading branch information
VeskeR committed Jan 22, 2025
1 parent 5b365bc commit f2804e6
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 6 deletions.
54 changes: 52 additions & 2 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,56 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
return this._dataRef.data;
}

/**
* Send a COUNTER_INC operation to the realtime system to increment a value on this LiveCounter object.
*
* This does not modify the underlying data of this LiveCounter object. Instead, the change will be applied when
* the published COUNTER_INC operation is echoed back to the client and applied to the object following the regular
* operation application procedure.
*
* @returns A promise which resolves upon receiving the ACK message for the published operation message.
*/
async increment(amount: number): Promise<void> {
const stateMessage = this.createCounterIncMessage(amount);
return this._liveObjects.publish([stateMessage]);
}

/**
* @internal
*/
createCounterIncMessage(amount: number): StateMessage {
if (typeof amount !== 'number' || !isFinite(amount)) {
throw new this._client.ErrorInfo('Counter value increment should be a valid number', 40013, 400);
}

const stateMessage = StateMessage.fromValues(
{
operation: {
action: StateOperationAction.COUNTER_INC,
objectId: this.getObjectId(),
counterOp: { amount },
},
},
this._client.Utils,
this._client.MessageEncoding,
);

return stateMessage;
}

/**
* Alias for calling {@link LiveCounter.increment | LiveCounter.increment(-amount)}
*/
async decrement(amount: number): Promise<void> {
// do an explicit type safety check here before negating the amount value,
// so we don't unintentionally change the type sent by a user
if (typeof amount !== 'number' || !isFinite(amount)) {
throw new this._client.ErrorInfo('Counter value decrement should be a valid number', 40013, 400);
}

return this.increment(-amount);
}

/**
* @internal
*/
Expand All @@ -55,7 +105,7 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveCounter.applyOperation()',
`skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opSiteCode]?.toString()}; objectId=${this._objectId}`,
`skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opSiteCode]?.toString()}; objectId=${this.getObjectId()}`,
);
return;
}
Expand Down Expand Up @@ -202,7 +252,7 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveCounter._applyCounterCreate()',
`skipping applying COUNTER_CREATE op on a counter instance as it was already applied before; objectId=${this._objectId}`,
`skipping applying COUNTER_CREATE op on a counter instance as it was already applied before; objectId=${this.getObjectId()}`,
);
return { noop: true };
}
Expand Down
100 changes: 96 additions & 4 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,98 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
return size;
}

/**
* Send a MAP_SET operation to the realtime system to set a key on this LiveMap object to a specified value.
*
* This does not modify the underlying data of this LiveMap object. Instead, the change will be applied when
* the published MAP_SET operation is echoed back to the client and applied to the object following the regular
* operation application procedure.
*
* @returns A promise which resolves upon receiving the ACK message for the published operation message.
*/
async set<TKey extends keyof T & string>(key: TKey, value: T[TKey]): Promise<void> {
const stateMessage = this.createMapSetMessage(key, value);
return this._liveObjects.publish([stateMessage]);
}

/**
* @internal
*/
createMapSetMessage<TKey extends keyof T & string>(key: TKey, value: T[TKey]): StateMessage {
if (typeof key !== 'string') {
throw new this._client.ErrorInfo('Map key should be string', 40013, 400);
}

if (
typeof value !== 'string' &&
typeof value !== 'number' &&
typeof value !== 'boolean' &&
!this._client.Platform.BufferUtils.isBuffer(value) &&
!(value instanceof LiveObject)
) {
throw new this._client.ErrorInfo('Map value data type is unsupported', 40013, 400);
}

const stateData: StateData =
value instanceof LiveObject
? ({ objectId: value.getObjectId() } as ObjectIdStateData)
: ({ value } as ValueStateData);

const stateMessage = StateMessage.fromValues(
{
operation: {
action: StateOperationAction.MAP_SET,
objectId: this.getObjectId(),
mapOp: {
key,
data: stateData,
},
},
},
this._client.Utils,
this._client.MessageEncoding,
);

return stateMessage;
}

/**
* Send a MAP_REMOVE operation to the realtime system to tombstone a key on this LiveMap object.
*
* This does not modify the underlying data of this LiveMap object. Instead, the change will be applied when
* the published MAP_REMOVE operation is echoed back to the client and applied to the object following the regular
* operation application procedure.
*
* @returns A promise which resolves upon receiving the ACK message for the published operation message.
*/
async remove<TKey extends keyof T & string>(key: TKey): Promise<void> {
const stateMessage = this.createMapRemoveMessage(key);
return this._liveObjects.publish([stateMessage]);
}

/**
* @internal
*/
createMapRemoveMessage<TKey extends keyof T & string>(key: TKey): StateMessage {
if (typeof key !== 'string') {
throw new this._client.ErrorInfo('Map key should be string', 40013, 400);
}

const stateMessage = StateMessage.fromValues(
{
operation: {
action: StateOperationAction.MAP_REMOVE,
objectId: this.getObjectId(),
mapOp: { key },
},
},
this._client.Utils,
this._client.MessageEncoding,
);

return stateMessage;
}

/**
* @internal
*/
Expand All @@ -172,7 +264,7 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveMap.applyOperation()',
`skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opSiteCode]?.toString()}; objectId=${this._objectId}`,
`skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opSiteCode]?.toString()}; objectId=${this.getObjectId()}`,
);
return;
}
Expand Down Expand Up @@ -427,7 +519,7 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveMap._applyMapCreate()',
`skipping applying MAP_CREATE op on a map instance as it was already applied before; objectId=${this._objectId}`,
`skipping applying MAP_CREATE op on a map instance as it was already applied before; objectId=${this.getObjectId()}`,
);
return { noop: true };
}
Expand All @@ -453,7 +545,7 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveMap._applyMapSet()',
`skipping update for key="${op.key}": op timeserial ${opOriginTimeserial?.toString()} <= entry timeserial ${existingEntry.timeserial?.toString()}; objectId=${this._objectId}`,
`skipping update for key="${op.key}": op timeserial ${opOriginTimeserial?.toString()} <= entry timeserial ${existingEntry.timeserial?.toString()}; objectId=${this.getObjectId()}`,
);
return { noop: true };
}
Expand Down Expand Up @@ -506,7 +598,7 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveMap._applyMapRemove()',
`skipping remove for key="${op.key}": op timeserial ${opOriginTimeserial?.toString()} <= entry timeserial ${existingEntry.timeserial?.toString()}; objectId=${this._objectId}`,
`skipping remove for key="${op.key}": op timeserial ${opOriginTimeserial?.toString()} <= entry timeserial ${existingEntry.timeserial?.toString()}; objectId=${this.getObjectId()}`,
);
return { noop: true };
}
Expand Down
17 changes: 17 additions & 0 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,23 @@ export class LiveObjects {
}
}

/**
* @internal
*/
async publish(stateMessages: StateMessage[]): Promise<void> {
if (!this._channel.connectionManager.activeState()) {
throw this._channel.connectionManager.getError();
}

if (this._channel.state === 'failed' || this._channel.state === 'suspended') {
throw this._client.ErrorInfo.fromValues(this._channel.invalidStateError());
}

stateMessages.forEach((x) => StateMessage.encode(x, this._client.MessageEncoding));

return this._channel.sendState(stateMessages);
}

private _startNewSync(syncId?: string, syncCursor?: string): void {
// need to discard all buffered state operation messages on new sync start
this._bufferedStateOperations = [];
Expand Down

0 comments on commit f2804e6

Please sign in to comment.