Skip to content

Commit

Permalink
Handle StateObject.tombstone and OBJECT_DELETE messages
Browse files Browse the repository at this point in the history
Tombstoned objects clear their underlying data by setting it to a zero
value: 0 for a counter object, empty map for a map.
No state operations can be applied on a tombstoned object.
Tombstoned objects are not surfaced to the end users.
When deleted, object triggers a subscription callback with cleared data.

Resolves DTP-986
  • Loading branch information
VeskeR committed Dec 11, 2024
1 parent edf1a7e commit a44b392
Show file tree
Hide file tree
Showing 10 changed files with 653 additions and 65 deletions.
6 changes: 3 additions & 3 deletions ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2103,12 +2103,12 @@ export type DefaultRoot =
*/
export declare interface LiveMap<T extends LiveMapType> extends LiveObject<LiveMapUpdate> {
/**
* Returns the value associated with a given key. Returns `undefined` if the key doesn't exist in a map.
* Returns the value associated with a given key. Returns `undefined` if the key doesn't exist in a map or if the associated {@link LiveObject} has been deleted.
*
* @param key - The key to retrieve the value for.
* @returns A {@link LiveObject}, a primitive type (string, number, boolean, or binary data) or `undefined` if the key doesn't exist in a map.
* @returns A {@link LiveObject}, a primitive type (string, number, boolean, or binary data) or `undefined` if the key doesn't exist in a map or the associated {@link LiveObject} has been deleted.
*/
get<TKey extends keyof T & string>(key: TKey): T[TKey];
get<TKey extends keyof T & string>(key: TKey): T[TKey] extends StateValue ? T[TKey] : T[TKey] | undefined;

/**
* Returns the number of key/value pairs in the map.
Expand Down
39 changes: 31 additions & 8 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opSiteCode] = opOriginTimeserial;

if (this.isTombstoned()) {
// this object is tombstoned so the operation cannot be applied
return;
}

let update: LiveCounterUpdate | LiveObjectUpdateNoop;
switch (op.action) {
case StateOperationAction.COUNTER_CREATE:
Expand All @@ -79,6 +84,10 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
}
break;

case StateOperationAction.OBJECT_DELETE:
update = this._applyObjectDelete();
break;

default:
throw new this._client.ErrorInfo(
`Invalid ${op.action} op for LiveCounter objectId=${this.getObjectId()}`,
Expand All @@ -93,7 +102,7 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
/**
* @internal
*/
overrideWithStateObject(stateObject: StateObject): LiveCounterUpdate {
overrideWithStateObject(stateObject: StateObject): LiveCounterUpdate | LiveObjectUpdateNoop {
if (stateObject.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Invalid state object: state object objectId=${stateObject.objectId}; LiveCounter objectId=${this.getObjectId()}`,
Expand Down Expand Up @@ -121,16 +130,30 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
}
}

const previousDataRef = this._dataRef;
// override all relevant data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = { data: stateObject.counter?.count ?? 0 };
// should default to empty map if site timeserials do not exist on the state object, so that any future operation can be applied to this object
// object's site timeserials are still updated even if it is tombstoned, so always use the site timeserials received from the op.
// should default to empty map if site timeserials do not exist on the state object, so that any future operation may be applied to this object.
this._siteTimeserials = stateObject.siteTimeserials ?? {};
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);

if (this.isTombstoned()) {
// this object is tombstoned. this is a terminal state which can't be overriden. skip the rest of state object message processing
return { noop: true };
}

const previousDataRef = this._dataRef;
if (stateObject.tombstone) {
// tombstone this object and ignore the data from the state object message
this.tombstone();
} else {
// override data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = { data: stateObject.counter?.count ?? 0 };
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);
}
}

// if object got tombstoned, the update object will include all data that got cleared.
// otherwise it is a diff between previous value and new value from state object.
return this._updateFromDataDiff(previousDataRef, this._dataRef);
}

Expand Down
80 changes: 64 additions & 16 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,15 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,

/**
* Returns the value associated with the specified key in the underlying Map object.
* If no element is associated with the specified key, undefined is returned.
* If the value that is associated to the provided key is an objectId string of another Live Object,
* then you will get a reference to that Live Object if it exists in the local pool, or undefined otherwise.
* If the value is not an objectId, then you will get that value.
*
* - If no entry is associated with the specified key, `undefined` is returned.
* - If map entry is tombstoned (deleted), `undefined` is returned.
* - If the value associated with the provided key is an objectId string of another Live Object, a reference to that Live Object
* is returned, provided it exists in the local pool and is not tombstoned. Otherwise, `undefined` is returned.
* - If the value is not an objectId, then that value is returned.
*/
// force the key to be of type string as we only allow strings as key in a map
get<TKey extends keyof T & string>(key: TKey): T[TKey] {
get<TKey extends keyof T & string>(key: TKey): T[TKey] extends StateValue ? T[TKey] : T[TKey] | undefined {
const element = this._dataRef.data.get(key);

if (element === undefined) {
Expand All @@ -94,14 +96,26 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
return undefined as T[TKey];
}

// data exists for non-tombstoned elements
// data always exists for non-tombstoned elements
const data = element.data!;

if ('value' in data) {
// map entry has a primitive type value, just return it as is.
return data.value as T[TKey];
} else {
return this._liveObjects.getPool().get(data.objectId) as T[TKey];
}

// map entry points to another object, get it from the pool
const refObject: LiveObject | undefined = this._liveObjects.getPool().get(data.objectId);
if (!refObject) {
return undefined as T[TKey];
}

if (refObject.isTombstoned()) {
// tombstoned objects must not be surfaced to the end users
return undefined as T[TKey];
}

return refObject as API.LiveObject as T[TKey];
}

size(): number {
Expand All @@ -112,6 +126,17 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
continue;
}

// data always exists for non-tombstoned elements
const data = value.data!;
if ('objectId' in data) {
const refObject = this._liveObjects.getPool().get(data.objectId);

if (refObject?.isTombstoned()) {
// should not count tombstoned objects
continue;
}
}

size++;
}

Expand Down Expand Up @@ -145,6 +170,11 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opSiteCode] = opOriginTimeserial;

if (this.isTombstoned()) {
// this object is tombstoned so the operation cannot be applied
return;
}

let update: LiveMapUpdate | LiveObjectUpdateNoop;
switch (op.action) {
case StateOperationAction.MAP_CREATE:
Expand All @@ -171,6 +201,10 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
}
break;

case StateOperationAction.OBJECT_DELETE:
update = this._applyObjectDelete();
break;

default:
throw new this._client.ErrorInfo(
`Invalid ${op.action} op for LiveMap objectId=${this.getObjectId()}`,
Expand All @@ -185,7 +219,7 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
/**
* @internal
*/
overrideWithStateObject(stateObject: StateObject): LiveMapUpdate {
overrideWithStateObject(stateObject: StateObject): LiveMapUpdate | LiveObjectUpdateNoop {
if (stateObject.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Invalid state object: state object objectId=${stateObject.objectId}; LiveMap objectId=${this.getObjectId()}`,
Expand Down Expand Up @@ -229,16 +263,30 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
}
}

const previousDataRef = this._dataRef;
// override all relevant data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = this._liveMapDataFromMapEntries(stateObject.map?.entries ?? {});
// should default to empty map if site timeserials do not exist on the state object, so that any future operation can be applied to this object
// object's site timeserials are still updated even if it is tombstoned, so always use the site timeserials received from the op.
// should default to empty map if site timeserials do not exist on the state object, so that any future operation may be applied to this object.
this._siteTimeserials = stateObject.siteTimeserials ?? {};
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);

if (this.isTombstoned()) {
// this object is tombstoned. this is a terminal state which can't be overriden. skip the rest of state object message processing
return { noop: true };
}

const previousDataRef = this._dataRef;
if (stateObject.tombstone) {
// tombstone this object and ignore the data from the state object message
this.tombstone();
} else {
// override data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = this._liveMapDataFromMapEntries(stateObject.map?.entries ?? {});
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);
}
}

// if object got tombstoned, the update object will include all data that got cleared.
// otherwise it is a diff between previous value and new value from state object.
return this._updateFromDataDiff(previousDataRef, this._dataRef);
}

Expand Down
32 changes: 29 additions & 3 deletions src/plugins/liveobjects/liveobject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,20 @@ export abstract class LiveObject<
protected _dataRef: TData;
protected _siteTimeserials: Record<string, string>;
protected _createOperationIsMerged: boolean;
private _tombstone: boolean;

protected constructor(
protected _liveObjects: LiveObjects,
objectId: string,
) {
this._client = this._liveObjects.getClient();
this._eventEmitter = new this._client.EventEmitter(this._client.logger);
this._dataRef = this._getZeroValueData();
this._createOperationIsMerged = false;
this._objectId = objectId;
this._dataRef = this._getZeroValueData();
// use empty timeserials vector by default, so any future operation can be applied to this object
this._siteTimeserials = {};
this._createOperationIsMerged = false;
this._tombstone = false;
}

subscribe(listener: (update: TUpdate) => void): SubscribeResponse {
Expand Down Expand Up @@ -99,6 +101,24 @@ export abstract class LiveObject<
this._eventEmitter.emit(LiveObjectEvents.Updated, update);
}

/**
* Clears the object's state, cancels any buffered operations and sets the tombstone flag to `true`.
*
* @internal
*/
tombstone(): void {
this._tombstone = true;
this._dataRef = this._getZeroValueData();
// TODO: emit "deleted" event so that end users get notified about this object getting deleted
}

/**
* @internal
*/
isTombstoned(): boolean {
return this._tombstone;
}

/**
* Returns true if the given origin timeserial indicates that the operation to which it belongs should be applied to the object.
*
Expand All @@ -118,6 +138,12 @@ export abstract class LiveObject<
return !siteTimeserial || opOriginTimeserial > siteTimeserial;
}

protected _applyObjectDelete(): TUpdate {
const previousDataRef = this._dataRef;
this.tombstone();
return this._updateFromDataDiff(previousDataRef, this._dataRef);
}

private _createObjectId(): string {
// TODO: implement object id generation based on live object type and initial value
return Math.random().toString().substring(2);
Expand All @@ -141,7 +167,7 @@ export abstract class LiveObject<
*
* @internal
*/
abstract overrideWithStateObject(stateObject: StateObject): TUpdate;
abstract overrideWithStateObject(stateObject: StateObject): TUpdate | LiveObjectUpdateNoop;
protected abstract _getZeroValueData(): TData;
/**
* Calculate the update object based on the current Live Object data and incoming new data.
Expand Down
5 changes: 3 additions & 2 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type EventEmitter from 'common/lib/util/eventemitter';
import type * as API from '../../../ably';
import { LiveCounter } from './livecounter';
import { LiveMap } from './livemap';
import { LiveObject, LiveObjectUpdate } from './liveobject';
import { LiveObject, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool';
import { StateMessage, StateOperationAction } from './statemessage';
import { SyncLiveObjectsDataPool } from './syncliveobjectsdatapool';
Expand Down Expand Up @@ -193,7 +193,7 @@ export class LiveObjects {
}

const receivedObjectIds = new Set<string>();
const existingObjectUpdates: { object: LiveObject; update: LiveObjectUpdate }[] = [];
const existingObjectUpdates: { object: LiveObject; update: LiveObjectUpdate | LiveObjectUpdateNoop }[] = [];

for (const [objectId, entry] of this._syncLiveObjectsDataPool.entries()) {
receivedObjectIds.add(objectId);
Expand Down Expand Up @@ -253,6 +253,7 @@ export class LiveObjects {
case StateOperationAction.MAP_SET:
case StateOperationAction.MAP_REMOVE:
case StateOperationAction.COUNTER_INC:
case StateOperationAction.OBJECT_DELETE:
// we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations,
// we can create a zero-value object for the provided object id and apply the operation to that zero-value object.
// this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves,
Expand Down
3 changes: 3 additions & 0 deletions src/plugins/liveobjects/statemessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export enum StateOperationAction {
MAP_REMOVE = 2,
COUNTER_CREATE = 3,
COUNTER_INC = 4,
OBJECT_DELETE = 5,
}

export enum MapSemantics {
Expand Down Expand Up @@ -106,6 +107,8 @@ export interface StateObject {
objectId: string;
/** A vector of origin timeserials keyed by site code of the last operation that was applied to this state object. */
siteTimeserials: Record<string, string>;
/** True if the object has been tombstoned. */
tombstone: boolean;
/**
* The operation that created the state object.
*
Expand Down
19 changes: 17 additions & 2 deletions test/common/modules/live_objects_helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ define(['ably', 'shared_helper', 'live_objects'], function (Ably, Helper, LiveOb
MAP_REMOVE: 2,
COUNTER_CREATE: 3,
COUNTER_INC: 4,
OBJECT_DELETE: 5,
};

function nonce() {
Expand Down Expand Up @@ -180,12 +181,25 @@ define(['ably', 'shared_helper', 'live_objects'], function (Ably, Helper, LiveOb
return op;
}

objectDeleteOp(opts) {
const { objectId } = opts ?? {};
const op = {
operation: {
action: ACTIONS.OBJECT_DELETE,
objectId,
},
};

return op;
}

mapObject(opts) {
const { objectId, siteTimeserials, initialEntries, materialisedEntries } = opts;
const { objectId, siteTimeserials, initialEntries, materialisedEntries, tombstone } = opts;
const obj = {
object: {
objectId,
siteTimeserials,
tombstone: tombstone === true,
map: {
semantics: 0,
entries: materialisedEntries,
Expand All @@ -201,11 +215,12 @@ define(['ably', 'shared_helper', 'live_objects'], function (Ably, Helper, LiveOb
}

counterObject(opts) {
const { objectId, siteTimeserials, initialCount, materialisedCount } = opts;
const { objectId, siteTimeserials, initialCount, materialisedCount, tombstone } = opts;
const obj = {
object: {
objectId,
siteTimeserials,
tombstone: tombstone === true,
counter: {
count: materialisedCount,
},
Expand Down
Loading

0 comments on commit a44b392

Please sign in to comment.