diff --git a/ably.d.ts b/ably.d.ts index 5d1aa1f82..eaccb3ccb 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -2103,12 +2103,14 @@ export type DefaultRoot = */ export declare interface LiveMap extends LiveObject { /** - * Returns the value associated with a given key. Returns `undefined` if the key doesn't exist in a map. + * Returns the value associated with a given key. Returns `undefined` if the key doesn't exist in a map or if the associated {@link LiveObject} has been deleted. + * + * Always returns undefined if this map object is deleted. * * @param key - The key to retrieve the value for. - * @returns A {@link LiveObject}, a primitive type (string, number, boolean, or binary data) or `undefined` if the key doesn't exist in a map. + * @returns A {@link LiveObject}, a primitive type (string, number, boolean, or binary data) or `undefined` if the key doesn't exist in a map or the associated {@link LiveObject} has been deleted. Always `undefined` if this map object is deleted. */ - get(key: TKey): T[TKey]; + get(key: TKey): T[TKey] | undefined; /** * Returns the number of key/value pairs in the map. diff --git a/src/plugins/liveobjects/livecounter.ts b/src/plugins/liveobjects/livecounter.ts index dc144c325..c96d59afe 100644 --- a/src/plugins/liveobjects/livecounter.ts +++ b/src/plugins/liveobjects/livecounter.ts @@ -63,6 +63,11 @@ export class LiveCounter extends LiveObject // as it's important to mark that the op was processed by the object this._siteTimeserials[opSiteCode] = opOriginTimeserial; + if (this.isTombstoned()) { + // this object is tombstoned so the operation cannot be applied + return; + } + let update: LiveCounterUpdate | LiveObjectUpdateNoop; switch (op.action) { case StateOperationAction.COUNTER_CREATE: @@ -79,6 +84,10 @@ export class LiveCounter extends LiveObject } break; + case StateOperationAction.OBJECT_DELETE: + update = this._applyObjectDelete(); + break; + default: throw new this._client.ErrorInfo( `Invalid ${op.action} op for LiveCounter objectId=${this.getObjectId()}`, @@ -93,7 +102,7 @@ export class LiveCounter extends LiveObject /** * @internal */ - overrideWithStateObject(stateObject: StateObject): LiveCounterUpdate { + overrideWithStateObject(stateObject: StateObject): LiveCounterUpdate | LiveObjectUpdateNoop { if (stateObject.objectId !== this.getObjectId()) { throw new this._client.ErrorInfo( `Invalid state object: state object objectId=${stateObject.objectId}; LiveCounter objectId=${this.getObjectId()}`, @@ -121,16 +130,30 @@ export class LiveCounter extends LiveObject } } - const previousDataRef = this._dataRef; - // override all relevant data for this object with data from the state object - this._createOperationIsMerged = false; - this._dataRef = { data: stateObject.counter?.count ?? 0 }; - // should default to empty map if site timeserials do not exist on the state object, so that any future operation can be applied to this object + // object's site timeserials are still updated even if it is tombstoned, so always use the site timeserials received from the op. + // should default to empty map if site timeserials do not exist on the state object, so that any future operation may be applied to this object. this._siteTimeserials = stateObject.siteTimeserials ?? {}; - if (!this._client.Utils.isNil(stateObject.createOp)) { - this._mergeInitialDataFromCreateOperation(stateObject.createOp); + + if (this.isTombstoned()) { + // this object is tombstoned. this is a terminal state which can't be overriden. skip the rest of state object message processing + return { noop: true }; + } + + const previousDataRef = this._dataRef; + if (stateObject.tombstone) { + // tombstone this object and ignore the data from the state object message + this.tombstone(); + } else { + // override data for this object with data from the state object + this._createOperationIsMerged = false; + this._dataRef = { data: stateObject.counter?.count ?? 0 }; + if (!this._client.Utils.isNil(stateObject.createOp)) { + this._mergeInitialDataFromCreateOperation(stateObject.createOp); + } } + // if object got tombstoned, the update object will include all data that got cleared. + // otherwise it is a diff between previous value and new value from state object. return this._updateFromDataDiff(previousDataRef, this._dataRef); } diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index 1f3729c5f..b6899c2b7 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -77,13 +77,20 @@ export class LiveMap extends LiveObject(key: TKey): T[TKey] { + get(key: TKey): T[TKey] | undefined { + if (this.isTombstoned()) { + return undefined as T[TKey]; + } + const element = this._dataRef.data.get(key); if (element === undefined) { @@ -94,14 +101,26 @@ export class LiveMap extends LiveObject extends LiveObject extends LiveObject extends LiveObject extends LiveObject extends LiveObject; protected _createOperationIsMerged: boolean; + private _tombstone: boolean; protected constructor( protected _liveObjects: LiveObjects, @@ -46,11 +47,12 @@ export abstract class LiveObject< ) { this._client = this._liveObjects.getClient(); this._eventEmitter = new this._client.EventEmitter(this._client.logger); - this._dataRef = this._getZeroValueData(); - this._createOperationIsMerged = false; this._objectId = objectId; + this._dataRef = this._getZeroValueData(); // use empty timeserials vector by default, so any future operation can be applied to this object this._siteTimeserials = {}; + this._createOperationIsMerged = false; + this._tombstone = false; } subscribe(listener: (update: TUpdate) => void): SubscribeResponse { @@ -99,6 +101,24 @@ export abstract class LiveObject< this._eventEmitter.emit(LiveObjectEvents.Updated, update); } + /** + * Clears the object's state, cancels any buffered operations and sets the tombstone flag to `true`. + * + * @internal + */ + tombstone(): void { + this._tombstone = true; + this._dataRef = this._getZeroValueData(); + // TODO: emit "deleted" event so that end users get notified about this object getting deleted + } + + /** + * @internal + */ + isTombstoned(): boolean { + return this._tombstone; + } + /** * Returns true if the given origin timeserial indicates that the operation to which it belongs should be applied to the object. * @@ -118,6 +138,12 @@ export abstract class LiveObject< return !siteTimeserial || opOriginTimeserial > siteTimeserial; } + protected _applyObjectDelete(): TUpdate { + const previousDataRef = this._dataRef; + this.tombstone(); + return this._updateFromDataDiff(previousDataRef, this._dataRef); + } + private _createObjectId(): string { // TODO: implement object id generation based on live object type and initial value return Math.random().toString().substring(2); @@ -141,7 +167,7 @@ export abstract class LiveObject< * * @internal */ - abstract overrideWithStateObject(stateObject: StateObject): TUpdate; + abstract overrideWithStateObject(stateObject: StateObject): TUpdate | LiveObjectUpdateNoop; protected abstract _getZeroValueData(): TData; /** * Calculate the update object based on the current Live Object data and incoming new data. diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index 75b743d5b..6ba3a0f2c 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -4,9 +4,9 @@ import type EventEmitter from 'common/lib/util/eventemitter'; import type * as API from '../../../ably'; import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; -import { LiveObject, LiveObjectUpdate } from './liveobject'; +import { LiveObject, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject'; import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool'; -import { StateMessage } from './statemessage'; +import { StateMessage, StateOperationAction } from './statemessage'; import { SyncLiveObjectsDataPool } from './syncliveobjectsdatapool'; enum LiveObjectsEvents { @@ -101,7 +101,7 @@ export class LiveObjects { return; } - this._liveObjectsPool.applyStateMessages(stateMessages); + this._applyStateMessages(stateMessages); } /** @@ -159,7 +159,7 @@ export class LiveObjects { this._applySync(); // should apply buffered state operations after we applied the SYNC data. // can use regular state messages application logic - this._liveObjectsPool.applyStateMessages(this._bufferedStateOperations); + this._applyStateMessages(this._bufferedStateOperations); this._bufferedStateOperations = []; this._syncLiveObjectsDataPool.reset(); @@ -193,7 +193,7 @@ export class LiveObjects { } const receivedObjectIds = new Set(); - const existingObjectUpdates: { object: LiveObject; update: LiveObjectUpdate }[] = []; + const existingObjectUpdates: { object: LiveObject; update: LiveObjectUpdate | LiveObjectUpdateNoop }[] = []; for (const [objectId, entry] of this._syncLiveObjectsDataPool.entries()) { receivedObjectIds.add(objectId); @@ -232,4 +232,46 @@ export class LiveObjects { // call subscription callbacks for all updated existing objects existingObjectUpdates.forEach(({ object, update }) => object.notifyUpdated(update)); } + + private _applyStateMessages(stateMessages: StateMessage[]): void { + for (const stateMessage of stateMessages) { + if (!stateMessage.operation) { + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MAJOR, + 'LiveObjects._applyStateMessages()', + `state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, + ); + continue; + } + + const stateOperation = stateMessage.operation; + + switch (stateOperation.action) { + case StateOperationAction.MAP_CREATE: + case StateOperationAction.COUNTER_CREATE: + case StateOperationAction.MAP_SET: + case StateOperationAction.MAP_REMOVE: + case StateOperationAction.COUNTER_INC: + case StateOperationAction.OBJECT_DELETE: + // we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations, + // we can create a zero-value object for the provided object id and apply the operation to that zero-value object. + // this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves, + // since they need to be able to eventually initialize themselves from that *_CREATE op. + // so to simplify operations handling, we always try to create a zero-value object in the pool first, + // and then we can always apply the operation on the existing object in the pool. + this._liveObjectsPool.createZeroValueObjectIfNotExists(stateOperation.objectId); + this._liveObjectsPool.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage); + break; + + default: + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MAJOR, + 'LiveObjects._applyStateMessages()', + `received unsupported action in state operation message: ${stateOperation.action}, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, + ); + } + } + } } diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index 2c57f1084..eb42d47b4 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -1,11 +1,9 @@ import type BaseClient from 'common/lib/client/baseclient'; -import type RealtimeChannel from 'common/lib/client/realtimechannel'; import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; import { LiveObject } from './liveobject'; import { LiveObjects } from './liveobjects'; import { ObjectId } from './objectid'; -import { StateMessage, StateOperationAction } from './statemessage'; export const ROOT_OBJECT_ID = 'root'; @@ -14,12 +12,10 @@ export const ROOT_OBJECT_ID = 'root'; */ export class LiveObjectsPool { private _client: BaseClient; - private _channel: RealtimeChannel; private _pool: Map; constructor(private _liveObjects: LiveObjects) { this._client = this._liveObjects.getClient(); - this._channel = this._liveObjects.getChannel(); this._pool = this._getInitialPool(); } @@ -66,47 +62,6 @@ export class LiveObjectsPool { this.set(objectId, zeroValueObject); } - applyStateMessages(stateMessages: StateMessage[]): void { - for (const stateMessage of stateMessages) { - if (!stateMessage.operation) { - this._client.Logger.logAction( - this._client.logger, - this._client.Logger.LOG_MAJOR, - 'LiveObjects.LiveObjectsPool.applyStateMessages()', - `state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, - ); - continue; - } - - const stateOperation = stateMessage.operation; - - switch (stateOperation.action) { - case StateOperationAction.MAP_CREATE: - case StateOperationAction.COUNTER_CREATE: - case StateOperationAction.MAP_SET: - case StateOperationAction.MAP_REMOVE: - case StateOperationAction.COUNTER_INC: - // we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations, - // we can create a zero-value object for the provided object id and apply the operation to that zero-value object. - // this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves, - // since they need to be able to eventually initialize themselves from that *_CREATE op. - // so to simplify operations handling, we always try to create a zero-value object in the pool first, - // and then we can always apply the operation on the existing object in the pool. - this.createZeroValueObjectIfNotExists(stateOperation.objectId); - this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage); - break; - - default: - this._client.Logger.logAction( - this._client.logger, - this._client.Logger.LOG_MAJOR, - 'LiveObjects.LiveObjectsPool.applyStateMessages()', - `received unsupported action in state operation message: ${stateOperation.action}, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, - ); - } - } - } - private _getInitialPool(): Map { const pool = new Map(); const root = LiveMap.zeroValue(this._liveObjects, ROOT_OBJECT_ID); diff --git a/src/plugins/liveobjects/statemessage.ts b/src/plugins/liveobjects/statemessage.ts index 74b385630..5dd409811 100644 --- a/src/plugins/liveobjects/statemessage.ts +++ b/src/plugins/liveobjects/statemessage.ts @@ -8,6 +8,7 @@ export enum StateOperationAction { MAP_REMOVE = 2, COUNTER_CREATE = 3, COUNTER_INC = 4, + OBJECT_DELETE = 5, } export enum MapSemantics { @@ -106,6 +107,8 @@ export interface StateObject { objectId: string; /** A vector of origin timeserials keyed by site code of the last operation that was applied to this state object. */ siteTimeserials: Record; + /** True if the object has been tombstoned. */ + tombstone: boolean; /** * The operation that created the state object. * diff --git a/test/common/modules/live_objects_helper.js b/test/common/modules/live_objects_helper.js index a5c7a10f3..b5e42f79d 100644 --- a/test/common/modules/live_objects_helper.js +++ b/test/common/modules/live_objects_helper.js @@ -12,6 +12,7 @@ define(['ably', 'shared_helper', 'live_objects'], function (Ably, Helper, LiveOb MAP_REMOVE: 2, COUNTER_CREATE: 3, COUNTER_INC: 4, + OBJECT_DELETE: 5, }; function nonce() { @@ -180,12 +181,25 @@ define(['ably', 'shared_helper', 'live_objects'], function (Ably, Helper, LiveOb return op; } + objectDeleteOp(opts) { + const { objectId } = opts ?? {}; + const op = { + operation: { + action: ACTIONS.OBJECT_DELETE, + objectId, + }, + }; + + return op; + } + mapObject(opts) { - const { objectId, siteTimeserials, initialEntries, materialisedEntries } = opts; + const { objectId, siteTimeserials, initialEntries, materialisedEntries, tombstone } = opts; const obj = { object: { objectId, siteTimeserials, + tombstone: tombstone === true, map: { semantics: 0, entries: materialisedEntries, @@ -201,11 +215,12 @@ define(['ably', 'shared_helper', 'live_objects'], function (Ably, Helper, LiveOb } counterObject(opts) { - const { objectId, siteTimeserials, initialCount, materialisedCount } = opts; + const { objectId, siteTimeserials, initialCount, materialisedCount, tombstone } = opts; const obj = { object: { objectId, siteTimeserials, + tombstone: tombstone === true, counter: { count: materialisedCount, }, diff --git a/test/package/browser/template/src/ably.config.d.ts b/test/package/browser/template/src/ably.config.d.ts index e5bca7718..3b3c69ddb 100644 --- a/test/package/browser/template/src/ably.config.d.ts +++ b/test/package/browser/template/src/ably.config.d.ts @@ -5,13 +5,13 @@ type CustomRoot = { stringKey: string; booleanKey: boolean; couldBeUndefined?: string; - mapKey?: LiveMap<{ + mapKey: LiveMap<{ foo: 'bar'; nestedMap?: LiveMap<{ baz: 'qux'; }>; }>; - counterKey?: LiveCounter; + counterKey: LiveCounter; }; declare global { diff --git a/test/package/browser/template/src/index-liveobjects.ts b/test/package/browser/template/src/index-liveobjects.ts index 1cd27b021..4059cc01d 100644 --- a/test/package/browser/template/src/index-liveobjects.ts +++ b/test/package/browser/template/src/index-liveobjects.ts @@ -24,14 +24,16 @@ globalThis.testAblyPackage = async function () { const size: number = root.size(); // check custom user provided typings via LiveObjectsTypes are working: + // any LiveMap.get() call can return undefined, as the LiveMap itself can be tombstoned (has empty state), + // or referenced object is tombstoned. // keys on a root: - const aNumber: number = root.get('numberKey'); - const aString: string = root.get('stringKey'); - const aBoolean: boolean = root.get('booleanKey'); - const couldBeUndefined: string | undefined = root.get('couldBeUndefined'); + const aNumber: number | undefined = root.get('numberKey'); + const aString: string | undefined = root.get('stringKey'); + const aBoolean: boolean | undefined = root.get('booleanKey'); + const userProvidedUndefined: string | undefined = root.get('couldBeUndefined'); // live objects on a root: const counter: Ably.LiveCounter | undefined = root.get('counterKey'); - const map: LiveObjectsTypes['root']['mapKey'] = root.get('mapKey'); + const map: LiveObjectsTypes['root']['mapKey'] | undefined = root.get('mapKey'); // check string literal types works // need to use nullish coalescing as we didn't actually create any data on the root, // so the next calls would fail. we only need to check that TypeScript types work @@ -61,5 +63,5 @@ globalThis.testAblyPackage = async function () { // check can provide custom types for the getRoot method, ignoring global LiveObjectsTypes interface const explicitRoot: Ably.LiveMap = await liveObjects.getRoot(); - const someOtherKey: string = explicitRoot.get('someOtherKey'); + const someOtherKey: string | undefined = explicitRoot.get('someOtherKey'); }; diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js index 01eb376fc..22164ef79 100644 --- a/test/realtime/live_objects.test.js +++ b/test/realtime/live_objects.test.js @@ -515,6 +515,162 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], { name: 'maxSafeIntegerCounter', count: Number.MAX_SAFE_INTEGER }, { name: 'negativeMaxSafeIntegerCounter', count: -Number.MAX_SAFE_INTEGER }, ]; + + const stateSyncSequenceScanarios = [ + { + description: 'STATE_SYNC sequence with state object "tombstone" property creates tombstoned object', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + const mapId = liveObjectsHelper.fakeMapObjectId(); + const counterId = liveObjectsHelper.fakeCounterObjectId(); + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:', // empty serial so STATE_SYNC ends immediately + // add state objects with tombstone=true + state: [ + liveObjectsHelper.mapObject({ + objectId: mapId, + siteTimeserials: { + aaa: lexicoTimeserial('aaa', 0, 0), + }, + tombstone: true, + initialEntries: {}, + }), + liveObjectsHelper.counterObject({ + objectId: counterId, + siteTimeserials: { + aaa: lexicoTimeserial('aaa', 0, 0), + }, + tombstone: true, + initialCount: 1, + }), + liveObjectsHelper.mapObject({ + objectId: 'root', + siteTimeserials: { aaa: lexicoTimeserial('aaa', 0, 0) }, + initialEntries: { + map: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { objectId: mapId } }, + counter: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { objectId: counterId } }, + foo: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { value: 'bar' } }, + }, + }), + ], + }); + + expect( + root.get('map'), + 'Check map does not exist on root after STATE_SYNC with "tombstone=true" for a map object', + ).to.not.exist; + expect( + root.get('counter'), + 'Check counter does not exist on root after STATE_SYNC with "tombstone=true" for a counter object', + ).to.not.exist; + // control check that STATE_SYNC was applied at all + expect(root.get('foo'), 'Check property exists on root after STATE_SYNC').to.exist; + }, + }, + + { + description: 'STATE_SYNC sequence with state object "tombstone" property deletes existing object', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, channel } = ctx; + + const { objectId: counterId } = await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'counter', + createOp: liveObjectsHelper.counterCreateOp({ count: 1 }), + }); + + expect(root.get('counter'), 'Check counter exists on root before STATE_SYNC sequence with "tombstone=true"') + .to.exist; + + // inject a STATE_SYNC sequence where a counter is now tombstoned + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:', // empty serial so STATE_SYNC ends immediately + state: [ + liveObjectsHelper.counterObject({ + objectId: counterId, + siteTimeserials: { + aaa: lexicoTimeserial('aaa', 0, 0), + }, + tombstone: true, + initialCount: 1, + }), + liveObjectsHelper.mapObject({ + objectId: 'root', + siteTimeserials: { aaa: lexicoTimeserial('aaa', 0, 0) }, + initialEntries: { + counter: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { objectId: counterId } }, + foo: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { value: 'bar' } }, + }, + }), + ], + }); + + expect( + root.get('counter'), + 'Check counter does not exist on root after STATE_SYNC with "tombstone=true" for an existing counter object', + ).to.not.exist; + // control check that STATE_SYNC was applied at all + expect(root.get('foo'), 'Check property exists on root after STATE_SYNC').to.exist; + }, + }, + + { + description: + 'STATE_SYNC sequence with state object "tombstone" property triggers subscription callback for existing object', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, channel } = ctx; + + const { objectId: counterId } = await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'counter', + createOp: liveObjectsHelper.counterCreateOp({ count: 1 }), + }); + + const counterSubPromise = new Promise((resolve, reject) => + root.get('counter').subscribe((update) => { + try { + expect(update).to.deep.equal( + { update: { inc: -1 } }, + 'Check counter subscription callback is called with an expected update object after STATE_SYNC sequence with "tombstone=true"', + ); + resolve(); + } catch (error) { + reject(error); + } + }), + ); + + // inject a STATE_SYNC sequence where a counter is now tombstoned + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:', // empty serial so STATE_SYNC ends immediately + state: [ + liveObjectsHelper.counterObject({ + objectId: counterId, + siteTimeserials: { + aaa: lexicoTimeserial('aaa', 0, 0), + }, + tombstone: true, + initialCount: 1, + }), + liveObjectsHelper.mapObject({ + objectId: 'root', + siteTimeserials: { aaa: lexicoTimeserial('aaa', 0, 0) }, + initialEntries: { + counter: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { objectId: counterId } }, + }, + }), + ], + }); + + await counterSubPromise; + }, + }, + ]; + const applyOperationsScenarios = [ { description: 'can apply MAP_CREATE with primitives state operation messages', @@ -1249,24 +1405,337 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], ); }, }, - ]; - /** @nospec */ - forScenarios(applyOperationsScenarios, async function (helper, scenario) { - const liveObjectsHelper = new LiveObjectsHelper(helper); - const client = RealtimeWithLiveObjects(helper); + { + description: 'can apply OBJECT_DELETE state operation messages', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, channel } = ctx; - await helper.monitorConnectionThenCloseAndFinish(async () => { - const channelName = scenario.description; - const channel = client.channels.get(channelName, channelOptionsWithLiveObjects()); - const liveObjects = channel.liveObjects; + // create initial objects and set on root + const { objectId: mapObjectId } = await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'map', + createOp: liveObjectsHelper.mapCreateOp(), + }); + const { objectId: counterObjectId } = await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'counter', + createOp: liveObjectsHelper.counterCreateOp(), + }); - await channel.attach(); - const root = await liveObjects.getRoot(); + expect(root.get('map'), 'Check map exists on root before OBJECT_DELETE').to.exist; + expect(root.get('counter'), 'Check counter exists on root before OBJECT_DELETE').to.exist; - await scenario.action({ root, liveObjectsHelper, channelName, channel }); - }, client); - }); + // inject OBJECT_DELETE + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.objectDeleteOp({ objectId: mapObjectId })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 1, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.objectDeleteOp({ objectId: counterObjectId })], + }); + + expect(root.get('map'), 'Check map is not accessible on root after OBJECT_DELETE').to.not.exist; + expect(root.get('counter'), 'Check counter is not accessible on root after OBJECT_DELETE').to.not.exist; + }, + }, + + { + description: 'OBJECT_DELETE for unknown object id creates zero-value tombstoned object', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + const counterId = liveObjectsHelper.fakeCounterObjectId(); + // inject OBJECT_DELETE. should create a zero-value tombstoned object which can't be modified + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.objectDeleteOp({ objectId: counterId })], + }); + + // try to create and set tombstoned object on root + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('bbb', 0, 0), + siteCode: 'bbb', + state: [liveObjectsHelper.counterCreateOp({ objectId: counterId })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('bbb', 1, 0), + siteCode: 'bbb', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'counter', data: { objectId: counterId } })], + }); + + expect(root.get('counter'), 'Check counter is not accessible on root').to.not.exist; + }, + }, + + { + description: + 'OBJECT_DELETE state operation messages are applied based on the site timeserials vector of the object', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + // need to use multiple objects as OBJECT_DELETE op can only be applied once to an object + const counterIds = [ + liveObjectsHelper.fakeCounterObjectId(), + liveObjectsHelper.fakeCounterObjectId(), + liveObjectsHelper.fakeCounterObjectId(), + liveObjectsHelper.fakeCounterObjectId(), + liveObjectsHelper.fakeCounterObjectId(), + ]; + await Promise.all( + counterIds.map(async (counterId, i) => { + // create objects and set them on root with forged timeserials + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('bbb', 1, 0), + siteCode: 'bbb', + state: [liveObjectsHelper.counterCreateOp({ objectId: counterId })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', i, 0), + siteCode: 'aaa', + state: [ + liveObjectsHelper.mapSetOp({ objectId: 'root', key: counterId, data: { objectId: counterId } }), + ], + }); + }), + ); + + // inject OBJECT_DELETE operations with various timeserial values + for (const [i, { serial, siteCode }] of [ + { serial: lexicoTimeserial('bbb', 0, 0), siteCode: 'bbb' }, // existing site, earlier CGO, not applied + { serial: lexicoTimeserial('bbb', 1, 0), siteCode: 'bbb' }, // existing site, same CGO, not applied + { serial: lexicoTimeserial('bbb', 2, 0), siteCode: 'bbb' }, // existing site, later CGO, applied + { serial: lexicoTimeserial('aaa', 0, 0), siteCode: 'aaa' }, // different site, earlier CGO, applied + { serial: lexicoTimeserial('ccc', 9, 0), siteCode: 'ccc' }, // different site, later CGO, applied + ].entries()) { + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial, + siteCode, + state: [liveObjectsHelper.objectDeleteOp({ objectId: counterIds[i] })], + }); + } + + // check only operations with correct timeserials were applied + const expectedCounters = [ + { exists: true }, + { exists: true }, + { exists: false }, // OBJECT_DELETE applied + { exists: false }, // OBJECT_DELETE applied + { exists: false }, // OBJECT_DELETE applied + ]; + + for (const [i, counterId] of counterIds.entries()) { + const { exists } = expectedCounters[i]; + + if (exists) { + expect( + root.get(counterId), + `Check counter #${i + 1} exists on root as OBJECT_DELETE op was not applied`, + ).to.exist; + } else { + expect( + root.get(counterId), + `Check counter #${i + 1} does not exist on root as OBJECT_DELETE op was applied`, + ).to.not.exist; + } + } + }, + }, + + { + description: 'OBJECT_DELETE triggers subscription callback with deleted data', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, channel } = ctx; + + // create initial objects and set on root + const { objectId: mapObjectId } = await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'map', + createOp: liveObjectsHelper.mapCreateOp({ + entries: { + foo: { data: { value: 'bar' } }, + baz: { data: { value: 1 } }, + }, + }), + }); + const { objectId: counterObjectId } = await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'counter', + createOp: liveObjectsHelper.counterCreateOp({ count: 1 }), + }); + + const mapSubPromise = new Promise((resolve, reject) => + root.get('map').subscribe((update) => { + try { + expect(update).to.deep.equal( + { update: { foo: 'removed', baz: 'removed' } }, + 'Check map subscription callback is called with an expected update object after OBJECT_DELETE operation', + ); + resolve(); + } catch (error) { + reject(error); + } + }), + ); + const counterSubPromise = new Promise((resolve, reject) => + root.get('counter').subscribe((update) => { + try { + expect(update).to.deep.equal( + { update: { inc: -1 } }, + 'Check counter subscription callback is called with an expected update object after OBJECT_DELETE operation', + ); + resolve(); + } catch (error) { + reject(error); + } + }), + ); + + // inject OBJECT_DELETE + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.objectDeleteOp({ objectId: mapObjectId })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 1, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.objectDeleteOp({ objectId: counterObjectId })], + }); + + await Promise.all([mapSubPromise, counterSubPromise]); + }, + }, + + { + description: 'MAP_SET with reference to a tombstoned object results in undefined value on key', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, channel } = ctx; + + // create initial objects and set on root + const { objectId: counterObjectId } = await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'foo', + createOp: liveObjectsHelper.counterCreateOp(), + }); + + expect(root.get('foo'), 'Check counter exists on root before OBJECT_DELETE').to.exist; + + // inject OBJECT_DELETE + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.objectDeleteOp({ objectId: counterObjectId })], + }); + + // set tombstoned counter to another key on root + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', + state: [ + liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'bar', data: { objectId: counterObjectId } }), + ], + }); + + expect(root.get('bar'), 'Check counter is not accessible on new key in root after OBJECT_DELETE').to.not + .exist; + }, + }, + + { + description: 'state operation message on a tombstoned object does not revive it', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, channel } = ctx; + + // create initial objects and set on root + const { objectId: mapId1 } = await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'map1', + createOp: liveObjectsHelper.mapCreateOp(), + }); + const { objectId: mapId2 } = await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'map2', + createOp: liveObjectsHelper.mapCreateOp({ entries: { foo: { data: { value: 'bar' } } } }), + }); + const { objectId: counterId1 } = await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'counter1', + createOp: liveObjectsHelper.counterCreateOp(), + }); + + expect(root.get('map1'), 'Check map1 exists on root before OBJECT_DELETE').to.exist; + expect(root.get('map2'), 'Check map2 exists on root before OBJECT_DELETE').to.exist; + expect(root.get('counter1'), 'Check counter1 exists on root before OBJECT_DELETE').to.exist; + + // inject OBJECT_DELETE + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.objectDeleteOp({ objectId: mapId1 })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 1, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.objectDeleteOp({ objectId: mapId2 })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 2, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.objectDeleteOp({ objectId: counterId1 })], + }); + + // inject state ops on tombstoned objects + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 3, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapSetOp({ objectId: mapId1, key: 'baz', data: { value: 'qux' } })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 4, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapRemoveOp({ objectId: mapId2, key: 'foo' })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 5, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.counterIncOp({ objectId: counterId1, amount: 1 })], + }); + + // objects should still be deleted + expect(root.get('map1'), 'Check map1 does not exist on root after OBJECT_DELETE and another state op').to + .not.exist; + expect(root.get('map2'), 'Check map2 does not exist on root after OBJECT_DELETE and another state op').to + .not.exist; + expect( + root.get('counter1'), + 'Check counter1 does not exist on root after OBJECT_DELETE and another state op', + ).to.not.exist; + }, + }, + ]; const applyOperationsDuringSyncScenarios = [ { @@ -1590,23 +2059,24 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], ]; /** @nospec */ - forScenarios(applyOperationsDuringSyncScenarios, async function (helper, scenario) { - const liveObjectsHelper = new LiveObjectsHelper(helper); - const client = RealtimeWithLiveObjects(helper); + forScenarios( + [...stateSyncSequenceScanarios, ...applyOperationsScenarios, ...applyOperationsDuringSyncScenarios], + async function (helper, scenario) { + const liveObjectsHelper = new LiveObjectsHelper(helper); + const client = RealtimeWithLiveObjects(helper); - await helper.monitorConnectionThenCloseAndFinish(async () => { - const channelName = scenario.description; - const channel = client.channels.get(channelName, channelOptionsWithLiveObjects()); - const liveObjects = channel.liveObjects; + await helper.monitorConnectionThenCloseAndFinish(async () => { + const channelName = scenario.description; + const channel = client.channels.get(channelName, channelOptionsWithLiveObjects()); + const liveObjects = channel.liveObjects; - await channel.attach(); - // wait for getRoot() to resolve so the initial SYNC sequence is completed, - // as we're going to initiate a new one to test applying operations during SYNC sequence. - const root = await liveObjects.getRoot(); + await channel.attach(); + const root = await liveObjects.getRoot(); - await scenario.action({ root, liveObjectsHelper, channelName, channel }); - }, client); - }); + await scenario.action({ root, liveObjectsHelper, channelName, channel }); + }, client); + }, + ); const subscriptionCallbacksScenarios = [ {