Skip to content

Commit

Permalink
Merge pull request #1934 from ably/DTP-986/handle-tombstone-and-objec…
Browse files Browse the repository at this point in the history
…t-delete

[DTP-986] Handle `StateObject.tombstone` and `OBJECT_DELETE` messages
  • Loading branch information
VeskeR authored Jan 9, 2025
2 parents f08222d + a7df3b6 commit 2fac0e0
Show file tree
Hide file tree
Showing 11 changed files with 709 additions and 118 deletions.
8 changes: 5 additions & 3 deletions ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2103,12 +2103,14 @@ export type DefaultRoot =
*/
export declare interface LiveMap<T extends LiveMapType> extends LiveObject<LiveMapUpdate> {
/**
* Returns the value associated with a given key. Returns `undefined` if the key doesn't exist in a map.
* Returns the value associated with a given key. Returns `undefined` if the key doesn't exist in a map or if the associated {@link LiveObject} has been deleted.
*
* Always returns undefined if this map object is deleted.
*
* @param key - The key to retrieve the value for.
* @returns A {@link LiveObject}, a primitive type (string, number, boolean, or binary data) or `undefined` if the key doesn't exist in a map.
* @returns A {@link LiveObject}, a primitive type (string, number, boolean, or binary data) or `undefined` if the key doesn't exist in a map or the associated {@link LiveObject} has been deleted. Always `undefined` if this map object is deleted.
*/
get<TKey extends keyof T & string>(key: TKey): T[TKey];
get<TKey extends keyof T & string>(key: TKey): T[TKey] | undefined;

/**
* Returns the number of key/value pairs in the map.
Expand Down
39 changes: 31 additions & 8 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opSiteCode] = opOriginTimeserial;

if (this.isTombstoned()) {
// this object is tombstoned so the operation cannot be applied
return;
}

let update: LiveCounterUpdate | LiveObjectUpdateNoop;
switch (op.action) {
case StateOperationAction.COUNTER_CREATE:
Expand All @@ -79,6 +84,10 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
}
break;

case StateOperationAction.OBJECT_DELETE:
update = this._applyObjectDelete();
break;

default:
throw new this._client.ErrorInfo(
`Invalid ${op.action} op for LiveCounter objectId=${this.getObjectId()}`,
Expand All @@ -93,7 +102,7 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
/**
* @internal
*/
overrideWithStateObject(stateObject: StateObject): LiveCounterUpdate {
overrideWithStateObject(stateObject: StateObject): LiveCounterUpdate | LiveObjectUpdateNoop {
if (stateObject.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Invalid state object: state object objectId=${stateObject.objectId}; LiveCounter objectId=${this.getObjectId()}`,
Expand Down Expand Up @@ -121,16 +130,30 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
}
}

const previousDataRef = this._dataRef;
// override all relevant data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = { data: stateObject.counter?.count ?? 0 };
// should default to empty map if site timeserials do not exist on the state object, so that any future operation can be applied to this object
// object's site timeserials are still updated even if it is tombstoned, so always use the site timeserials received from the op.
// should default to empty map if site timeserials do not exist on the state object, so that any future operation may be applied to this object.
this._siteTimeserials = stateObject.siteTimeserials ?? {};
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);

if (this.isTombstoned()) {
// this object is tombstoned. this is a terminal state which can't be overriden. skip the rest of state object message processing
return { noop: true };
}

const previousDataRef = this._dataRef;
if (stateObject.tombstone) {
// tombstone this object and ignore the data from the state object message
this.tombstone();
} else {
// override data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = { data: stateObject.counter?.count ?? 0 };
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);
}
}

// if object got tombstoned, the update object will include all data that got cleared.
// otherwise it is a diff between previous value and new value from state object.
return this._updateFromDataDiff(previousDataRef, this._dataRef);
}

Expand Down
85 changes: 69 additions & 16 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,20 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,

/**
* Returns the value associated with the specified key in the underlying Map object.
* If no element is associated with the specified key, undefined is returned.
* If the value that is associated to the provided key is an objectId string of another Live Object,
* then you will get a reference to that Live Object if it exists in the local pool, or undefined otherwise.
* If the value is not an objectId, then you will get that value.
*
* - If this map object is tombstoned (deleted), `undefined` is returned.
* - If no entry is associated with the specified key, `undefined` is returned.
* - If map entry is tombstoned (deleted), `undefined` is returned.
* - If the value associated with the provided key is an objectId string of another Live Object, a reference to that Live Object
* is returned, provided it exists in the local pool and is not tombstoned. Otherwise, `undefined` is returned.
* - If the value is not an objectId, then that value is returned.
*/
// force the key to be of type string as we only allow strings as key in a map
get<TKey extends keyof T & string>(key: TKey): T[TKey] {
get<TKey extends keyof T & string>(key: TKey): T[TKey] | undefined {
if (this.isTombstoned()) {
return undefined as T[TKey];
}

const element = this._dataRef.data.get(key);

if (element === undefined) {
Expand All @@ -94,14 +101,26 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
return undefined as T[TKey];
}

// data exists for non-tombstoned elements
// data always exists for non-tombstoned elements
const data = element.data!;

if ('value' in data) {
// map entry has a primitive type value, just return it as is.
return data.value as T[TKey];
} else {
return this._liveObjects.getPool().get(data.objectId) as T[TKey];
}

// map entry points to another object, get it from the pool
const refObject: LiveObject | undefined = this._liveObjects.getPool().get(data.objectId);
if (!refObject) {
return undefined as T[TKey];
}

if (refObject.isTombstoned()) {
// tombstoned objects must not be surfaced to the end users
return undefined as T[TKey];
}

return refObject as API.LiveObject as T[TKey];
}

size(): number {
Expand All @@ -112,6 +131,17 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
continue;
}

// data always exists for non-tombstoned elements
const data = value.data!;
if ('objectId' in data) {
const refObject = this._liveObjects.getPool().get(data.objectId);

if (refObject?.isTombstoned()) {
// should not count tombstoned objects
continue;
}
}

size++;
}

Expand Down Expand Up @@ -145,6 +175,11 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opSiteCode] = opOriginTimeserial;

if (this.isTombstoned()) {
// this object is tombstoned so the operation cannot be applied
return;
}

let update: LiveMapUpdate | LiveObjectUpdateNoop;
switch (op.action) {
case StateOperationAction.MAP_CREATE:
Expand All @@ -171,6 +206,10 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
}
break;

case StateOperationAction.OBJECT_DELETE:
update = this._applyObjectDelete();
break;

default:
throw new this._client.ErrorInfo(
`Invalid ${op.action} op for LiveMap objectId=${this.getObjectId()}`,
Expand All @@ -185,7 +224,7 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
/**
* @internal
*/
overrideWithStateObject(stateObject: StateObject): LiveMapUpdate {
overrideWithStateObject(stateObject: StateObject): LiveMapUpdate | LiveObjectUpdateNoop {
if (stateObject.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Invalid state object: state object objectId=${stateObject.objectId}; LiveMap objectId=${this.getObjectId()}`,
Expand Down Expand Up @@ -229,16 +268,30 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
}
}

const previousDataRef = this._dataRef;
// override all relevant data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = this._liveMapDataFromMapEntries(stateObject.map?.entries ?? {});
// should default to empty map if site timeserials do not exist on the state object, so that any future operation can be applied to this object
// object's site timeserials are still updated even if it is tombstoned, so always use the site timeserials received from the op.
// should default to empty map if site timeserials do not exist on the state object, so that any future operation may be applied to this object.
this._siteTimeserials = stateObject.siteTimeserials ?? {};
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);

if (this.isTombstoned()) {
// this object is tombstoned. this is a terminal state which can't be overriden. skip the rest of state object message processing
return { noop: true };
}

const previousDataRef = this._dataRef;
if (stateObject.tombstone) {
// tombstone this object and ignore the data from the state object message
this.tombstone();
} else {
// override data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = this._liveMapDataFromMapEntries(stateObject.map?.entries ?? {});
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);
}
}

// if object got tombstoned, the update object will include all data that got cleared.
// otherwise it is a diff between previous value and new value from state object.
return this._updateFromDataDiff(previousDataRef, this._dataRef);
}

Expand Down
32 changes: 29 additions & 3 deletions src/plugins/liveobjects/liveobject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,20 @@ export abstract class LiveObject<
protected _dataRef: TData;
protected _siteTimeserials: Record<string, string>;
protected _createOperationIsMerged: boolean;
private _tombstone: boolean;

protected constructor(
protected _liveObjects: LiveObjects,
objectId: string,
) {
this._client = this._liveObjects.getClient();
this._eventEmitter = new this._client.EventEmitter(this._client.logger);
this._dataRef = this._getZeroValueData();
this._createOperationIsMerged = false;
this._objectId = objectId;
this._dataRef = this._getZeroValueData();
// use empty timeserials vector by default, so any future operation can be applied to this object
this._siteTimeserials = {};
this._createOperationIsMerged = false;
this._tombstone = false;
}

subscribe(listener: (update: TUpdate) => void): SubscribeResponse {
Expand Down Expand Up @@ -99,6 +101,24 @@ export abstract class LiveObject<
this._eventEmitter.emit(LiveObjectEvents.Updated, update);
}

/**
* Clears the object's state, cancels any buffered operations and sets the tombstone flag to `true`.
*
* @internal
*/
tombstone(): void {
this._tombstone = true;
this._dataRef = this._getZeroValueData();
// TODO: emit "deleted" event so that end users get notified about this object getting deleted
}

/**
* @internal
*/
isTombstoned(): boolean {
return this._tombstone;
}

/**
* Returns true if the given origin timeserial indicates that the operation to which it belongs should be applied to the object.
*
Expand All @@ -118,6 +138,12 @@ export abstract class LiveObject<
return !siteTimeserial || opOriginTimeserial > siteTimeserial;
}

protected _applyObjectDelete(): TUpdate {
const previousDataRef = this._dataRef;
this.tombstone();
return this._updateFromDataDiff(previousDataRef, this._dataRef);
}

private _createObjectId(): string {
// TODO: implement object id generation based on live object type and initial value
return Math.random().toString().substring(2);
Expand All @@ -141,7 +167,7 @@ export abstract class LiveObject<
*
* @internal
*/
abstract overrideWithStateObject(stateObject: StateObject): TUpdate;
abstract overrideWithStateObject(stateObject: StateObject): TUpdate | LiveObjectUpdateNoop;
protected abstract _getZeroValueData(): TData;
/**
* Calculate the update object based on the current Live Object data and incoming new data.
Expand Down
52 changes: 47 additions & 5 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import type EventEmitter from 'common/lib/util/eventemitter';
import type * as API from '../../../ably';
import { LiveCounter } from './livecounter';
import { LiveMap } from './livemap';
import { LiveObject, LiveObjectUpdate } from './liveobject';
import { LiveObject, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool';
import { StateMessage } from './statemessage';
import { StateMessage, StateOperationAction } from './statemessage';
import { SyncLiveObjectsDataPool } from './syncliveobjectsdatapool';

enum LiveObjectsEvents {
Expand Down Expand Up @@ -101,7 +101,7 @@ export class LiveObjects {
return;
}

this._liveObjectsPool.applyStateMessages(stateMessages);
this._applyStateMessages(stateMessages);
}

/**
Expand Down Expand Up @@ -159,7 +159,7 @@ export class LiveObjects {
this._applySync();
// should apply buffered state operations after we applied the SYNC data.
// can use regular state messages application logic
this._liveObjectsPool.applyStateMessages(this._bufferedStateOperations);
this._applyStateMessages(this._bufferedStateOperations);

this._bufferedStateOperations = [];
this._syncLiveObjectsDataPool.reset();
Expand Down Expand Up @@ -193,7 +193,7 @@ export class LiveObjects {
}

const receivedObjectIds = new Set<string>();
const existingObjectUpdates: { object: LiveObject; update: LiveObjectUpdate }[] = [];
const existingObjectUpdates: { object: LiveObject; update: LiveObjectUpdate | LiveObjectUpdateNoop }[] = [];

for (const [objectId, entry] of this._syncLiveObjectsDataPool.entries()) {
receivedObjectIds.add(objectId);
Expand Down Expand Up @@ -232,4 +232,46 @@ export class LiveObjects {
// call subscription callbacks for all updated existing objects
existingObjectUpdates.forEach(({ object, update }) => object.notifyUpdated(update));
}

private _applyStateMessages(stateMessages: StateMessage[]): void {
for (const stateMessage of stateMessages) {
if (!stateMessage.operation) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects._applyStateMessages()',
`state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
continue;
}

const stateOperation = stateMessage.operation;

switch (stateOperation.action) {
case StateOperationAction.MAP_CREATE:
case StateOperationAction.COUNTER_CREATE:
case StateOperationAction.MAP_SET:
case StateOperationAction.MAP_REMOVE:
case StateOperationAction.COUNTER_INC:
case StateOperationAction.OBJECT_DELETE:
// we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations,
// we can create a zero-value object for the provided object id and apply the operation to that zero-value object.
// this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves,
// since they need to be able to eventually initialize themselves from that *_CREATE op.
// so to simplify operations handling, we always try to create a zero-value object in the pool first,
// and then we can always apply the operation on the existing object in the pool.
this._liveObjectsPool.createZeroValueObjectIfNotExists(stateOperation.objectId);
this._liveObjectsPool.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage);
break;

default:
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects._applyStateMessages()',
`received unsupported action in state operation message: ${stateOperation.action}, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
}
}
}
}
Loading

0 comments on commit 2fac0e0

Please sign in to comment.