Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DTP-986] Handle StateObject.tombstone and OBJECT_DELETE messages #1934

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2103,12 +2103,14 @@ export type DefaultRoot =
*/
export declare interface LiveMap<T extends LiveMapType> extends LiveObject<LiveMapUpdate> {
/**
* Returns the value associated with a given key. Returns `undefined` if the key doesn't exist in a map.
* Returns the value associated with a given key. Returns `undefined` if the key doesn't exist in a map or if the associated {@link LiveObject} has been deleted.
*
* Always returns undefined if this map object is deleted.
*
* @param key - The key to retrieve the value for.
* @returns A {@link LiveObject}, a primitive type (string, number, boolean, or binary data) or `undefined` if the key doesn't exist in a map.
* @returns A {@link LiveObject}, a primitive type (string, number, boolean, or binary data) or `undefined` if the key doesn't exist in a map or the associated {@link LiveObject} has been deleted. Always `undefined` if this map object is deleted.
*/
get<TKey extends keyof T & string>(key: TKey): T[TKey];
get<TKey extends keyof T & string>(key: TKey): T[TKey] | undefined;

/**
* Returns the number of key/value pairs in the map.
Expand Down
39 changes: 31 additions & 8 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opSiteCode] = opOriginTimeserial;

if (this.isTombstoned()) {
// this object is tombstoned so the operation cannot be applied
return;
}

let update: LiveCounterUpdate | LiveObjectUpdateNoop;
switch (op.action) {
case StateOperationAction.COUNTER_CREATE:
Expand All @@ -79,6 +84,10 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
}
break;

case StateOperationAction.OBJECT_DELETE:
update = this._applyObjectDelete();
break;

default:
throw new this._client.ErrorInfo(
`Invalid ${op.action} op for LiveCounter objectId=${this.getObjectId()}`,
Expand All @@ -93,7 +102,7 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
/**
* @internal
*/
overrideWithStateObject(stateObject: StateObject): LiveCounterUpdate {
overrideWithStateObject(stateObject: StateObject): LiveCounterUpdate | LiveObjectUpdateNoop {
if (stateObject.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Invalid state object: state object objectId=${stateObject.objectId}; LiveCounter objectId=${this.getObjectId()}`,
Expand Down Expand Up @@ -121,16 +130,30 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
}
}

const previousDataRef = this._dataRef;
// override all relevant data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = { data: stateObject.counter?.count ?? 0 };
// should default to empty map if site timeserials do not exist on the state object, so that any future operation can be applied to this object
// object's site timeserials are still updated even if it is tombstoned, so always use the site timeserials received from the op.
// should default to empty map if site timeserials do not exist on the state object, so that any future operation may be applied to this object.
this._siteTimeserials = stateObject.siteTimeserials ?? {};
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);

if (this.isTombstoned()) {
// this object is tombstoned. this is a terminal state which can't be overriden. skip the rest of state object message processing
return { noop: true };
}

const previousDataRef = this._dataRef;
if (stateObject.tombstone) {
// tombstone this object and ignore the data from the state object message
this.tombstone();
} else {
// override data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = { data: stateObject.counter?.count ?? 0 };
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);
}
}

// if object got tombstoned, the update object will include all data that got cleared.
// otherwise it is a diff between previous value and new value from state object.
return this._updateFromDataDiff(previousDataRef, this._dataRef);
}

Expand Down
85 changes: 69 additions & 16 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,20 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,

/**
* Returns the value associated with the specified key in the underlying Map object.
* If no element is associated with the specified key, undefined is returned.
* If the value that is associated to the provided key is an objectId string of another Live Object,
* then you will get a reference to that Live Object if it exists in the local pool, or undefined otherwise.
* If the value is not an objectId, then you will get that value.
*
* - If this map object is tombstoned (deleted), `undefined` is returned.
* - If no entry is associated with the specified key, `undefined` is returned.
* - If map entry is tombstoned (deleted), `undefined` is returned.
* - If the value associated with the provided key is an objectId string of another Live Object, a reference to that Live Object
* is returned, provided it exists in the local pool and is not tombstoned. Otherwise, `undefined` is returned.
* - If the value is not an objectId, then that value is returned.
*/
// force the key to be of type string as we only allow strings as key in a map
get<TKey extends keyof T & string>(key: TKey): T[TKey] {
get<TKey extends keyof T & string>(key: TKey): T[TKey] | undefined {
if (this.isTombstoned()) {
return undefined as T[TKey];
}

const element = this._dataRef.data.get(key);

if (element === undefined) {
Expand All @@ -94,14 +101,26 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
return undefined as T[TKey];
}

// data exists for non-tombstoned elements
// data always exists for non-tombstoned elements
const data = element.data!;

if ('value' in data) {
// map entry has a primitive type value, just return it as is.
return data.value as T[TKey];
} else {
return this._liveObjects.getPool().get(data.objectId) as T[TKey];
}

// map entry points to another object, get it from the pool
const refObject: LiveObject | undefined = this._liveObjects.getPool().get(data.objectId);
if (!refObject) {
return undefined as T[TKey];
}

if (refObject.isTombstoned()) {
// tombstoned objects must not be surfaced to the end users
return undefined as T[TKey];
}

return refObject as API.LiveObject as T[TKey];
}

size(): number {
Expand All @@ -112,6 +131,17 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
continue;
}

// data always exists for non-tombstoned elements
const data = value.data!;
if ('objectId' in data) {
const refObject = this._liveObjects.getPool().get(data.objectId);

if (refObject?.isTombstoned()) {
// should not count tombstoned objects
continue;
}
}

size++;
}

Expand Down Expand Up @@ -145,6 +175,11 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opSiteCode] = opOriginTimeserial;

if (this.isTombstoned()) {
// this object is tombstoned so the operation cannot be applied
return;
}

let update: LiveMapUpdate | LiveObjectUpdateNoop;
switch (op.action) {
case StateOperationAction.MAP_CREATE:
Expand All @@ -171,6 +206,10 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
}
break;

case StateOperationAction.OBJECT_DELETE:
update = this._applyObjectDelete();
break;

default:
throw new this._client.ErrorInfo(
`Invalid ${op.action} op for LiveMap objectId=${this.getObjectId()}`,
Expand All @@ -185,7 +224,7 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
/**
* @internal
*/
overrideWithStateObject(stateObject: StateObject): LiveMapUpdate {
overrideWithStateObject(stateObject: StateObject): LiveMapUpdate | LiveObjectUpdateNoop {
if (stateObject.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Invalid state object: state object objectId=${stateObject.objectId}; LiveMap objectId=${this.getObjectId()}`,
Expand Down Expand Up @@ -229,16 +268,30 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
}
}

const previousDataRef = this._dataRef;
// override all relevant data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = this._liveMapDataFromMapEntries(stateObject.map?.entries ?? {});
// should default to empty map if site timeserials do not exist on the state object, so that any future operation can be applied to this object
// object's site timeserials are still updated even if it is tombstoned, so always use the site timeserials received from the op.
// should default to empty map if site timeserials do not exist on the state object, so that any future operation may be applied to this object.
this._siteTimeserials = stateObject.siteTimeserials ?? {};
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);

if (this.isTombstoned()) {
// this object is tombstoned. this is a terminal state which can't be overriden. skip the rest of state object message processing
return { noop: true };
}

const previousDataRef = this._dataRef;
if (stateObject.tombstone) {
// tombstone this object and ignore the data from the state object message
this.tombstone();
} else {
// override data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = this._liveMapDataFromMapEntries(stateObject.map?.entries ?? {});
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);
}
}

// if object got tombstoned, the update object will include all data that got cleared.
// otherwise it is a diff between previous value and new value from state object.
return this._updateFromDataDiff(previousDataRef, this._dataRef);
}

Expand Down
32 changes: 29 additions & 3 deletions src/plugins/liveobjects/liveobject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,20 @@ export abstract class LiveObject<
protected _dataRef: TData;
protected _siteTimeserials: Record<string, string>;
protected _createOperationIsMerged: boolean;
private _tombstone: boolean;

protected constructor(
protected _liveObjects: LiveObjects,
objectId: string,
) {
this._client = this._liveObjects.getClient();
this._eventEmitter = new this._client.EventEmitter(this._client.logger);
this._dataRef = this._getZeroValueData();
this._createOperationIsMerged = false;
this._objectId = objectId;
this._dataRef = this._getZeroValueData();
// use empty timeserials vector by default, so any future operation can be applied to this object
this._siteTimeserials = {};
this._createOperationIsMerged = false;
this._tombstone = false;
}

subscribe(listener: (update: TUpdate) => void): SubscribeResponse {
Expand Down Expand Up @@ -99,6 +101,24 @@ export abstract class LiveObject<
this._eventEmitter.emit(LiveObjectEvents.Updated, update);
}

/**
* Clears the object's state, cancels any buffered operations and sets the tombstone flag to `true`.
*
* @internal
*/
tombstone(): void {
this._tombstone = true;
this._dataRef = this._getZeroValueData();
// TODO: emit "deleted" event so that end users get notified about this object getting deleted
}

/**
* @internal
*/
isTombstoned(): boolean {
return this._tombstone;
}

/**
* Returns true if the given origin timeserial indicates that the operation to which it belongs should be applied to the object.
*
Expand All @@ -118,6 +138,12 @@ export abstract class LiveObject<
return !siteTimeserial || opOriginTimeserial > siteTimeserial;
}

protected _applyObjectDelete(): TUpdate {
const previousDataRef = this._dataRef;
this.tombstone();
return this._updateFromDataDiff(previousDataRef, this._dataRef);
}

private _createObjectId(): string {
// TODO: implement object id generation based on live object type and initial value
return Math.random().toString().substring(2);
Expand All @@ -141,7 +167,7 @@ export abstract class LiveObject<
*
* @internal
*/
abstract overrideWithStateObject(stateObject: StateObject): TUpdate;
abstract overrideWithStateObject(stateObject: StateObject): TUpdate | LiveObjectUpdateNoop;
protected abstract _getZeroValueData(): TData;
/**
* Calculate the update object based on the current Live Object data and incoming new data.
Expand Down
52 changes: 47 additions & 5 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import type EventEmitter from 'common/lib/util/eventemitter';
import type * as API from '../../../ably';
import { LiveCounter } from './livecounter';
import { LiveMap } from './livemap';
import { LiveObject, LiveObjectUpdate } from './liveobject';
import { LiveObject, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool';
import { StateMessage } from './statemessage';
import { StateMessage, StateOperationAction } from './statemessage';
import { SyncLiveObjectsDataPool } from './syncliveobjectsdatapool';

enum LiveObjectsEvents {
Expand Down Expand Up @@ -101,7 +101,7 @@ export class LiveObjects {
return;
}

this._liveObjectsPool.applyStateMessages(stateMessages);
this._applyStateMessages(stateMessages);
}

/**
Expand Down Expand Up @@ -159,7 +159,7 @@ export class LiveObjects {
this._applySync();
// should apply buffered state operations after we applied the SYNC data.
// can use regular state messages application logic
this._liveObjectsPool.applyStateMessages(this._bufferedStateOperations);
this._applyStateMessages(this._bufferedStateOperations);

this._bufferedStateOperations = [];
this._syncLiveObjectsDataPool.reset();
Expand Down Expand Up @@ -193,7 +193,7 @@ export class LiveObjects {
}

const receivedObjectIds = new Set<string>();
const existingObjectUpdates: { object: LiveObject; update: LiveObjectUpdate }[] = [];
const existingObjectUpdates: { object: LiveObject; update: LiveObjectUpdate | LiveObjectUpdateNoop }[] = [];

for (const [objectId, entry] of this._syncLiveObjectsDataPool.entries()) {
receivedObjectIds.add(objectId);
Expand Down Expand Up @@ -232,4 +232,46 @@ export class LiveObjects {
// call subscription callbacks for all updated existing objects
existingObjectUpdates.forEach(({ object, update }) => object.notifyUpdated(update));
}

private _applyStateMessages(stateMessages: StateMessage[]): void {
for (const stateMessage of stateMessages) {
if (!stateMessage.operation) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects._applyStateMessages()',
`state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
continue;
}

const stateOperation = stateMessage.operation;

switch (stateOperation.action) {
case StateOperationAction.MAP_CREATE:
case StateOperationAction.COUNTER_CREATE:
case StateOperationAction.MAP_SET:
case StateOperationAction.MAP_REMOVE:
case StateOperationAction.COUNTER_INC:
case StateOperationAction.OBJECT_DELETE:
// we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations,
// we can create a zero-value object for the provided object id and apply the operation to that zero-value object.
// this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves,
// since they need to be able to eventually initialize themselves from that *_CREATE op.
// so to simplify operations handling, we always try to create a zero-value object in the pool first,
// and then we can always apply the operation on the existing object in the pool.
this._liveObjectsPool.createZeroValueObjectIfNotExists(stateOperation.objectId);
this._liveObjectsPool.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage);
break;

default:
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects._applyStateMessages()',
`received unsupported action in state operation message: ${stateOperation.action}, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
}
}
}
}
Loading
Loading