diff --git a/src/common/lib/client/baseclient.ts b/src/common/lib/client/baseclient.ts index be2cde205..c79677b65 100644 --- a/src/common/lib/client/baseclient.ts +++ b/src/common/lib/client/baseclient.ts @@ -18,6 +18,7 @@ import { HTTPRequestImplementations } from 'platform/web/lib/http/http'; import { FilteredSubscriptions } from './filteredsubscriptions'; import type { LocalDevice } from 'plugins/push/pushactivation'; import EventEmitter from '../util/eventemitter'; +import { MessageEncoding } from '../types/message'; type BatchResult = API.BatchResult; type BatchPublishSpec = API.BatchPublishSpec; @@ -181,6 +182,7 @@ class BaseClient { Defaults = Defaults; Utils = Utils; EventEmitter = EventEmitter; + MessageEncoding = MessageEncoding; } export default BaseClient; diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 427b6398e..1208453ae 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -12,10 +12,10 @@ import Message, { fromValuesArray as messagesFromValuesArray, encodeArray as encodeMessagesArray, decode as decodeMessage, - decodeData, getMessagesSize, CipherOptions, EncodingDecodingContext, + MessageEncoding, } from '../types/message'; import ChannelStateChange from './channelstatechange'; import ErrorInfo, { PartialErrorInfo } from '../types/errorinfo'; @@ -511,6 +511,17 @@ class RealtimeChannel extends EventEmitter { this.sendMessage(msg, callback); } + sendState(state: StateMessage[]): Promise { + return new Promise((resolve, reject) => { + const msg = protocolMessageFromValues({ + action: actions.STATE, + channel: this.name, + state, + }); + this.sendMessage(msg, (err) => (err ? reject(err) : resolve())); + }); + } + // Access to this method is synchronised by ConnectionManager#processChannelMessage, in order to synchronise access to the state stored in _decodingContext. async processMessage(message: ProtocolMessage): Promise { if ( @@ -615,7 +626,7 @@ class RealtimeChannel extends EventEmitter { const options = this.channelOptions; await this._decodeAndPrepareMessages(message, stateMessages, (msg) => this.client._LiveObjectsPlugin - ? this.client._LiveObjectsPlugin.StateMessage.decode(msg, options, decodeData) + ? this.client._LiveObjectsPlugin.StateMessage.decode(msg, options, MessageEncoding) : Utils.throwMissingPluginError('LiveObjects'), ); diff --git a/src/common/lib/transport/protocol.ts b/src/common/lib/transport/protocol.ts index 88a5947e4..47a71171f 100644 --- a/src/common/lib/transport/protocol.ts +++ b/src/common/lib/transport/protocol.ts @@ -20,7 +20,8 @@ export class PendingMessage { this.merged = false; const action = message.action; this.sendAttempted = false; - this.ackRequired = action == actions.MESSAGE || action == actions.PRESENCE; + this.ackRequired = + typeof action === 'number' && [actions.MESSAGE, actions.PRESENCE, actions.STATE].includes(action); } } diff --git a/src/common/lib/types/message.ts b/src/common/lib/types/message.ts index a1a449942..3abad6b2d 100644 --- a/src/common/lib/types/message.ts +++ b/src/common/lib/types/message.ts @@ -131,41 +131,120 @@ export async function fromEncodedArray( ); } -async function encrypt(msg: T, options: CipherOptions): Promise { - let data = msg.data, - encoding = msg.encoding, - cipher = options.channelCipher; - - encoding = encoding ? encoding + '/' : ''; - if (!Platform.BufferUtils.isBuffer(data)) { - data = Platform.BufferUtils.utf8Encode(String(data)); - encoding = encoding + 'utf-8/'; - } - const ciphertext = await cipher.encrypt(data); - msg.data = ciphertext; - msg.encoding = encoding + 'cipher+' + cipher.algorithm; +async function encrypt(msg: T, cipherOptions: CipherOptions): Promise { + const { data, encoding } = await encryptData(msg.data, msg.encoding, cipherOptions); + msg.data = data; + msg.encoding = encoding; return msg; } -export async function encode(msg: T, options: CipherOptions): Promise { - const data = msg.data; +export async function encryptData( + data: any, + encoding: string | null | undefined, + cipherOptions: CipherOptions, +): Promise<{ data: any; encoding: string | null | undefined }> { + let cipher = cipherOptions.channelCipher; + let dataToEncrypt = data; + let finalEncoding = encoding ? encoding + '/' : ''; + + if (!Platform.BufferUtils.isBuffer(dataToEncrypt)) { + dataToEncrypt = Platform.BufferUtils.utf8Encode(String(dataToEncrypt)); + finalEncoding = finalEncoding + 'utf-8/'; + } + + const ciphertext = await cipher.encrypt(dataToEncrypt); + finalEncoding = finalEncoding + 'cipher+' + cipher.algorithm; + + return { + data: ciphertext, + encoding: finalEncoding, + }; +} + +/** + * Protocol agnostic encoding and encryption of the message's payload. Mutates the message. + * Implements RSL4 (only parts that are common for all protocols), and RSL5. + * + * Since this encoding function is protocol agnostic, it won't apply the final encodings + * required by the protocol used by the client (like encoding binary data to the appropriate representation). + */ +export async function encode(msg: T, cipherOptions: CipherOptions): Promise { + const { data, encoding } = encodeData(msg.data, msg.encoding); + msg.data = data; + msg.encoding = encoding; + + if (cipherOptions != null && cipherOptions.cipher) { + return encrypt(msg, cipherOptions); + } else { + return msg; + } +} + +/** + * Protocol agnostic encoding of the provided payload data. Implements RSL4 (only parts that are common for all protocols). + */ +export function encodeData( + data: any, + encoding: string | null | undefined, +): { data: any; encoding: string | null | undefined } { + // RSL4a, supported types const nativeDataType = typeof data == 'string' || Platform.BufferUtils.isBuffer(data) || data === null || data === undefined; - if (!nativeDataType) { - if (Utils.isObject(data) || Array.isArray(data)) { - msg.data = JSON.stringify(data); - msg.encoding = msg.encoding ? msg.encoding + '/json' : 'json'; - } else { - throw new ErrorInfo('Data type is unsupported', 40013, 400); - } + if (nativeDataType) { + // nothing to do with the native data types at this point + return { + data, + encoding, + }; } - if (options != null && options.cipher) { - return encrypt(msg, options); - } else { - return msg; + if (Utils.isObject(data) || Array.isArray(data)) { + // RSL4c3 and RSL4d3, encode objects and arrays as strings + return { + data: JSON.stringify(data), + encoding: encoding ? encoding + '/json' : 'json', + }; } + + // RSL4a, throw an error for unsupported types + throw new ErrorInfo('Data type is unsupported', 40013, 400); +} + +/** + * Prepares the payload data to be transmitted over the wire to Ably. + * Encodes the data depending on the selected protocol format. + * + * Implements RSL4c1 and RSL4d1 + */ +export function encodeDataForWireProtocol( + data: any, + encoding: string | null | undefined, + format: Utils.Format, +): { data: any; encoding: string | null | undefined } { + if (!data || !Platform.BufferUtils.isBuffer(data)) { + // no transformation required for non-buffer payloads + return { + data, + encoding, + }; + } + + if (format === Utils.Format.msgpack) { + // RSL4c1 + // BufferUtils.toBuffer returns a datatype understandable by that platform's msgpack implementation: + // Buffer in node, Uint8Array in browsers + return { + data: Platform.BufferUtils.toBuffer(data), + encoding, + }; + } + + // RSL4d1, encode binary payload as base64 string + return { + data: Platform.BufferUtils.base64Encode(data), + encoding: encoding ? encoding + '/base64' : 'base64', + }; } export async function encodeArray(messages: Array, options: CipherOptions): Promise> { @@ -178,6 +257,8 @@ export async function decode( message: Message | PresenceMessage, inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions, ): Promise { + // data can be decoded partially and throw an error on a later decoding step. + // so we need to reassign the data and encoding values we got, and only then throw an error if there is one const { data, encoding, error } = await decodeData(message.data, message.encoding, inputContext); message.data = data; message.encoding = encoding; @@ -187,6 +268,9 @@ export async function decode( } } +/** + * Implements RSL6 + */ export async function decodeData( data: any, encoding: string | null | undefined, @@ -199,8 +283,8 @@ export async function decodeData( const context = normaliseContext(inputContext); let lastPayload = data; let decodedData = data; - let finalEncoding: string | null | undefined = encoding; - let decodingError: ErrorInfo | undefined = undefined; + let finalEncoding = encoding; + let decodingError: ErrorInfo | undefined; if (encoding) { const xforms = encoding.split('/'); @@ -358,6 +442,13 @@ export function getMessagesSize(messages: Message[]): number { return total; } +export const MessageEncoding = { + encryptData, + encodeData, + encodeDataForWireProtocol, + decodeData, +}; + class Message { name?: string; id?: string; @@ -378,27 +469,19 @@ class Message { operation?: API.Operation; /** - * Overload toJSON() to intercept JSON.stringify() - * @return {*} + * Overload toJSON() to intercept JSON.stringify(). + * + * This will prepare the message to be transmitted over the wire to Ably. + * It will encode the data payload according to the wire protocol used on the client. + * It will transform any client-side enum string representations into their corresponding numbers, if needed (like "action" fields). */ toJSON() { - /* encode data to base64 if present and we're returning real JSON; - * although msgpack calls toJSON(), we know it is a stringify() - * call if it has a non-empty arguments list */ - let encoding = this.encoding; - let data = this.data; - if (data && Platform.BufferUtils.isBuffer(data)) { - if (arguments.length > 0) { - /* stringify call */ - encoding = encoding ? encoding + '/base64' : 'base64'; - data = Platform.BufferUtils.base64Encode(data); - } else { - /* Called by msgpack. toBuffer returns a datatype understandable by - * that platform's msgpack implementation (Buffer in node, Uint8Array - * in browsers) */ - data = Platform.BufferUtils.toBuffer(data); - } - } + // we can infer the format used by client by inspecting with what arguments this method was called. + // if JSON protocol is being used, the JSON.stringify() will be called and this toJSON() method will have a non-empty arguments list. + // MSGPack protocol implementation also calls toJSON(), but with an empty arguments list. + const format = arguments.length > 0 ? Utils.Format.json : Utils.Format.msgpack; + const { data, encoding } = encodeDataForWireProtocol(this.data, this.encoding, format); + return { name: this.name, id: this.id, diff --git a/src/common/lib/types/presencemessage.ts b/src/common/lib/types/presencemessage.ts index 34e0d2d06..144ceb3d2 100644 --- a/src/common/lib/types/presencemessage.ts +++ b/src/common/lib/types/presencemessage.ts @@ -1,6 +1,12 @@ import Logger from '../util/logger'; import Platform from 'common/platform'; -import { encode as encodeMessage, decode as decodeMessage, getMessagesSize, CipherOptions } from './message'; +import { + encode as encodeMessage, + decode as decodeMessage, + getMessagesSize, + CipherOptions, + encodeDataForWireProtocol, +} from './message'; import * as Utils from '../util/utils'; import * as API from '../../../../ably'; import { MsgPack } from 'common/types/msgpack'; @@ -128,8 +134,11 @@ class PresenceMessage { } /** - * Overload toJSON() to intercept JSON.stringify() - * @return {*} + * Overload toJSON() to intercept JSON.stringify(). + * + * This will prepare the message to be transmitted over the wire to Ably. + * It will encode the data payload according to the wire protocol used on the client. + * It will transform any client-side enum string representations into their corresponding numbers, if needed (like "action" fields). */ toJSON(): { id?: string; @@ -139,30 +148,19 @@ class PresenceMessage { encoding?: string; extras?: any; } { - /* encode data to base64 if present and we're returning real JSON; - * although msgpack calls toJSON(), we know it is a stringify() - * call if it has a non-empty arguments list */ - let data = this.data as string | Buffer | Uint8Array; - let encoding = this.encoding; - if (data && Platform.BufferUtils.isBuffer(data)) { - if (arguments.length > 0) { - /* stringify call */ - encoding = encoding ? encoding + '/base64' : 'base64'; - data = Platform.BufferUtils.base64Encode(data); - } else { - /* Called by msgpack. toBuffer returns a datatype understandable by - * that platform's msgpack implementation (Buffer in node, Uint8Array - * in browsers) */ - data = Platform.BufferUtils.toBuffer(data); - } - } + // we can infer the format used by client by inspecting with what arguments this method was called. + // if JSON protocol is being used, the JSON.stringify() will be called and this toJSON() method will have a non-empty arguments list. + // MSGPack protocol implementation also calls toJSON(), but with an empty arguments list. + const format = arguments.length > 0 ? Utils.Format.json : Utils.Format.msgpack; + const { data, encoding } = encodeDataForWireProtocol(this.data, this.encoding, format); + return { id: this.id, clientId: this.clientId, /* Convert presence action back to an int for sending to Ably */ action: toActionValue(this.action as string), data: data, - encoding: encoding, + encoding: encoding!, extras: this.extras, }; } diff --git a/src/common/lib/types/protocolmessage.ts b/src/common/lib/types/protocolmessage.ts index b0617cdf8..fb473177a 100644 --- a/src/common/lib/types/protocolmessage.ts +++ b/src/common/lib/types/protocolmessage.ts @@ -3,13 +3,16 @@ import * as API from '../../../../ably'; import { PresenceMessagePlugin } from '../client/modularplugins'; import * as Utils from '../util/utils'; import ErrorInfo from './errorinfo'; -import Message, { fromValues as messageFromValues, fromValuesArray as messagesFromValuesArray } from './message'; +import Message, { + fromValues as messageFromValues, + fromValuesArray as messagesFromValuesArray, + MessageEncoding, +} from './message'; import PresenceMessage, { fromValues as presenceMessageFromValues, fromValuesArray as presenceMessagesFromValuesArray, } from './presencemessage'; import type * as LiveObjectsPlugin from 'plugins/liveobjects'; -import Platform from '../../platform'; export const actions = { HEARTBEAT: 0, @@ -128,7 +131,7 @@ export function fromDeserialized( state = deserialized.state as LiveObjectsPlugin.StateMessage[]; if (state) { for (let i = 0; i < state.length; i++) { - state[i] = liveObjectsPlugin.StateMessage.fromValues(state[i], Platform); + state[i] = liveObjectsPlugin.StateMessage.fromValues(state[i], Utils, MessageEncoding); } } } @@ -177,7 +180,8 @@ export function stringify( if (msg.presence && presenceMessagePlugin) result += '; presence=' + toStringArray(presenceMessagePlugin.presenceMessagesFromValuesArray(msg.presence)); if (msg.state && liveObjectsPlugin) { - result += '; state=' + toStringArray(liveObjectsPlugin.StateMessage.fromValuesArray(msg.state, Platform)); + result += + '; state=' + toStringArray(liveObjectsPlugin.StateMessage.fromValuesArray(msg.state, Utils, MessageEncoding)); } if (msg.error) result += '; error=' + ErrorInfo.fromValues(msg.error).toString(); if (msg.auth && msg.auth.accessToken) result += '; token=' + msg.auth.accessToken; diff --git a/src/plugins/liveobjects/livecounter.ts b/src/plugins/liveobjects/livecounter.ts index 95b8dbe52..10542f067 100644 --- a/src/plugins/liveobjects/livecounter.ts +++ b/src/plugins/liveobjects/livecounter.ts @@ -36,6 +36,56 @@ export class LiveCounter extends LiveObject return this._dataRef.data; } + /** + * Send a COUNTER_INC operation to the realtime system to increment a value on this LiveCounter object. + * + * This does not modify the underlying data of this LiveCounter object. Instead, the change will be applied when + * the published COUNTER_INC operation is echoed back to the client and applied to the object following the regular + * operation application procedure. + * + * @returns A promise which resolves upon receiving the ACK message for the published operation message. + */ + async increment(amount: number): Promise { + const stateMessage = this.createCounterIncMessage(amount); + return this._liveObjects.publish([stateMessage]); + } + + /** + * @internal + */ + createCounterIncMessage(amount: number): StateMessage { + if (typeof amount !== 'number' || !isFinite(amount)) { + throw new this._client.ErrorInfo('Counter value increment should be a valid number', 40013, 400); + } + + const stateMessage = StateMessage.fromValues( + { + operation: { + action: StateOperationAction.COUNTER_INC, + objectId: this.getObjectId(), + counterOp: { amount }, + }, + }, + this._client.Utils, + this._client.MessageEncoding, + ); + + return stateMessage; + } + + /** + * Alias for calling {@link LiveCounter.increment | LiveCounter.increment(-amount)} + */ + async decrement(amount: number): Promise { + // do an explicit type safety check here before negating the amount value, + // so we don't unintentionally change the type sent by a user + if (typeof amount !== 'number' || !isFinite(amount)) { + throw new this._client.ErrorInfo('Counter value decrement should be a valid number', 40013, 400); + } + + return this.increment(-amount); + } + /** * @internal */ @@ -55,7 +105,7 @@ export class LiveCounter extends LiveObject this._client.logger, this._client.Logger.LOG_MICRO, 'LiveCounter.applyOperation()', - `skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opSiteCode]?.toString()}; objectId=${this._objectId}`, + `skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opSiteCode]?.toString()}; objectId=${this.getObjectId()}`, ); return; } @@ -202,7 +252,7 @@ export class LiveCounter extends LiveObject this._client.logger, this._client.Logger.LOG_MICRO, 'LiveCounter._applyCounterCreate()', - `skipping applying COUNTER_CREATE op on a counter instance as it was already applied before; objectId=${this._objectId}`, + `skipping applying COUNTER_CREATE op on a counter instance as it was already applied before; objectId=${this.getObjectId()}`, ); return { noop: true }; } diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index 378ef3b58..79b291d3d 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -153,6 +153,98 @@ export class LiveMap extends LiveObject(key: TKey, value: T[TKey]): Promise { + const stateMessage = this.createMapSetMessage(key, value); + return this._liveObjects.publish([stateMessage]); + } + + /** + * @internal + */ + createMapSetMessage(key: TKey, value: T[TKey]): StateMessage { + if (typeof key !== 'string') { + throw new this._client.ErrorInfo('Map key should be string', 40013, 400); + } + + if ( + typeof value !== 'string' && + typeof value !== 'number' && + typeof value !== 'boolean' && + !this._client.Platform.BufferUtils.isBuffer(value) && + !(value instanceof LiveObject) + ) { + throw new this._client.ErrorInfo('Map value data type is unsupported', 40013, 400); + } + + const stateData: StateData = + value instanceof LiveObject + ? ({ objectId: value.getObjectId() } as ObjectIdStateData) + : ({ value } as ValueStateData); + + const stateMessage = StateMessage.fromValues( + { + operation: { + action: StateOperationAction.MAP_SET, + objectId: this.getObjectId(), + mapOp: { + key, + data: stateData, + }, + }, + }, + this._client.Utils, + this._client.MessageEncoding, + ); + + return stateMessage; + } + + /** + * Send a MAP_REMOVE operation to the realtime system to tombstone a key on this LiveMap object. + * + * This does not modify the underlying data of this LiveMap object. Instead, the change will be applied when + * the published MAP_REMOVE operation is echoed back to the client and applied to the object following the regular + * operation application procedure. + * + * @returns A promise which resolves upon receiving the ACK message for the published operation message. + */ + async remove(key: TKey): Promise { + const stateMessage = this.createMapRemoveMessage(key); + return this._liveObjects.publish([stateMessage]); + } + + /** + * @internal + */ + createMapRemoveMessage(key: TKey): StateMessage { + if (typeof key !== 'string') { + throw new this._client.ErrorInfo('Map key should be string', 40013, 400); + } + + const stateMessage = StateMessage.fromValues( + { + operation: { + action: StateOperationAction.MAP_REMOVE, + objectId: this.getObjectId(), + mapOp: { key }, + }, + }, + this._client.Utils, + this._client.MessageEncoding, + ); + + return stateMessage; + } + /** * @internal */ @@ -172,7 +264,7 @@ export class LiveMap extends LiveObject extends LiveObject extends LiveObject extends LiveObject { + if (!this._channel.connectionManager.activeState()) { + throw this._channel.connectionManager.getError(); + } + + if (this._channel.state === 'failed' || this._channel.state === 'suspended') { + throw this._client.ErrorInfo.fromValues(this._channel.invalidStateError()); + } + + stateMessages.forEach((x) => StateMessage.encode(x, this._client.MessageEncoding)); + + return this._channel.sendState(stateMessages); + } + private _startNewSync(syncId?: string, syncCursor?: string): void { // need to discard all buffered state operation messages on new sync start this._bufferedStateOperations = []; diff --git a/src/plugins/liveobjects/statemessage.ts b/src/plugins/liveobjects/statemessage.ts index 5dd409811..429256ee3 100644 --- a/src/plugins/liveobjects/statemessage.ts +++ b/src/plugins/liveobjects/statemessage.ts @@ -1,7 +1,12 @@ -import type { decodeData } from 'common/lib/types/message'; -import type Platform from 'common/platform'; +import type { MessageEncoding } from 'common/lib/types/message'; +import type * as Utils from 'common/lib/util/utils'; import type { ChannelOptions } from 'common/types/channel'; +export type StateDataEncodeFunction = ( + value: StateValue | undefined, + encoding: string | undefined, +) => { value: StateValue | undefined; encoding: string | undefined }; + export enum StateOperationAction { MAP_CREATE = 0, MAP_SET = 1, @@ -146,46 +151,82 @@ export class StateMessage { /** Site code corresponding to this message's timeserial */ siteCode?: string; - constructor(private _platform: typeof Platform) {} + constructor( + private _utils: typeof Utils, + private _messageEncoding: typeof MessageEncoding, + ) {} + + /** + * Protocol agnostic encoding of the state message's data entries. + * Mutates the provided StateMessage. + * + * Uses encoding functions from regular `Message` processing. + */ + static async encode(message: StateMessage, messageEncoding: typeof MessageEncoding): Promise { + const encodeFn: StateDataEncodeFunction = (value, encoding) => { + const { data: newValue, encoding: newEncoding } = messageEncoding.encodeData(value, encoding); + + return { + value: newValue, + encoding: newEncoding!, + }; + }; + + message.operation = message.operation ? StateMessage._encodeStateOperation(message.operation, encodeFn) : undefined; + message.object = message.object ? StateMessage._encodeStateObject(message.object, encodeFn) : undefined; + + return message; + } + /** + * Mutates the provided StateMessage and decodes all data entries in the message + */ static async decode( message: StateMessage, inputContext: ChannelOptions, - decodeDataFn: typeof decodeData, + messageEncoding: typeof MessageEncoding, ): Promise { // TODO: decide how to handle individual errors from decoding values. currently we throw first ever error we get if (message.object?.map?.entries) { - await StateMessage._decodeMapEntries(message.object.map.entries, inputContext, decodeDataFn); + await StateMessage._decodeMapEntries(message.object.map.entries, inputContext, messageEncoding); } if (message.object?.createOp?.map?.entries) { - await StateMessage._decodeMapEntries(message.object.createOp.map.entries, inputContext, decodeDataFn); + await StateMessage._decodeMapEntries(message.object.createOp.map.entries, inputContext, messageEncoding); } if (message.object?.createOp?.mapOp?.data && 'value' in message.object.createOp.mapOp.data) { - await StateMessage._decodeStateData(message.object.createOp.mapOp.data, inputContext, decodeDataFn); + await StateMessage._decodeStateData(message.object.createOp.mapOp.data, inputContext, messageEncoding); } if (message.operation?.map?.entries) { - await StateMessage._decodeMapEntries(message.operation.map.entries, inputContext, decodeDataFn); + await StateMessage._decodeMapEntries(message.operation.map.entries, inputContext, messageEncoding); } if (message.operation?.mapOp?.data && 'value' in message.operation.mapOp.data) { - await StateMessage._decodeStateData(message.operation.mapOp.data, inputContext, decodeDataFn); + await StateMessage._decodeStateData(message.operation.mapOp.data, inputContext, messageEncoding); } } - static fromValues(values: StateMessage | Record, platform: typeof Platform): StateMessage { - return Object.assign(new StateMessage(platform), values); + static fromValues( + values: StateMessage | Record, + utils: typeof Utils, + messageEncoding: typeof MessageEncoding, + ): StateMessage { + return Object.assign(new StateMessage(utils, messageEncoding), values); } - static fromValuesArray(values: unknown[], platform: typeof Platform): StateMessage[] { + static fromValuesArray( + values: (StateMessage | Record)[], + utils: typeof Utils, + messageEncoding: typeof MessageEncoding, + ): StateMessage[] { const count = values.length; const result = new Array(count); for (let i = 0; i < count; i++) { - result[i] = StateMessage.fromValues(values[i] as Record, platform); + result[i] = StateMessage.fromValues(values[i], utils, messageEncoding); } return result; @@ -194,19 +235,23 @@ export class StateMessage { private static async _decodeMapEntries( mapEntries: Record, inputContext: ChannelOptions, - decodeDataFn: typeof decodeData, + messageEncoding: typeof MessageEncoding, ): Promise { for (const entry of Object.values(mapEntries)) { - await StateMessage._decodeStateData(entry.data, inputContext, decodeDataFn); + await StateMessage._decodeStateData(entry.data, inputContext, messageEncoding); } } private static async _decodeStateData( stateData: StateData, inputContext: ChannelOptions, - decodeDataFn: typeof decodeData, + messageEncoding: typeof MessageEncoding, ): Promise { - const { data, encoding, error } = await decodeDataFn(stateData.value, stateData.encoding, inputContext); + const { data, encoding, error } = await messageEncoding.decodeData( + stateData.value, + stateData.encoding, + inputContext, + ); stateData.value = data; stateData.encoding = encoding ?? undefined; @@ -216,9 +261,8 @@ export class StateMessage { } private static _encodeStateOperation( - platform: typeof Platform, stateOperation: StateOperation, - withBase64Encoding: boolean, + encodeFn: StateDataEncodeFunction, ): StateOperation { // deep copy "stateOperation" object so we can modify the copy here. // buffer values won't be correctly copied, so we will need to set them again explictly. @@ -226,32 +270,20 @@ export class StateMessage { if (stateOperationCopy.mapOp?.data && 'value' in stateOperationCopy.mapOp.data) { // use original "stateOperation" object when encoding values, so we have access to the original buffer values. - stateOperationCopy.mapOp.data = StateMessage._encodeStateData( - platform, - stateOperation.mapOp?.data!, - withBase64Encoding, - ); + stateOperationCopy.mapOp.data = StateMessage._encodeStateData(stateOperation.mapOp?.data!, encodeFn); } if (stateOperationCopy.map?.entries) { Object.entries(stateOperationCopy.map.entries).forEach(([key, entry]) => { // use original "stateOperation" object when encoding values, so we have access to original buffer values. - entry.data = StateMessage._encodeStateData( - platform, - stateOperation?.map?.entries?.[key].data!, - withBase64Encoding, - ); + entry.data = StateMessage._encodeStateData(stateOperation?.map?.entries?.[key].data!, encodeFn); }); } return stateOperationCopy; } - private static _encodeStateObject( - platform: typeof Platform, - stateObject: StateObject, - withBase64Encoding: boolean, - ): StateObject { + private static _encodeStateObject(stateObject: StateObject, encodeFn: StateDataEncodeFunction): StateObject { // deep copy "stateObject" object so we can modify the copy here. // buffer values won't be correctly copied, so we will need to set them again explictly. const stateObjectCopy = JSON.parse(JSON.stringify(stateObject)) as StateObject; @@ -259,71 +291,34 @@ export class StateMessage { if (stateObjectCopy.map?.entries) { Object.entries(stateObjectCopy.map.entries).forEach(([key, entry]) => { // use original "stateObject" object when encoding values, so we have access to original buffer values. - entry.data = StateMessage._encodeStateData( - platform, - stateObject?.map?.entries?.[key].data!, - withBase64Encoding, - ); + entry.data = StateMessage._encodeStateData(stateObject?.map?.entries?.[key].data!, encodeFn); }); } if (stateObjectCopy.createOp) { // use original "stateObject" object when encoding values, so we have access to original buffer values. - stateObjectCopy.createOp = StateMessage._encodeStateOperation( - platform, - stateObject.createOp!, - withBase64Encoding, - ); + stateObjectCopy.createOp = StateMessage._encodeStateOperation(stateObject.createOp!, encodeFn); } return stateObjectCopy; } - private static _encodeStateData(platform: typeof Platform, data: StateData, withBase64Encoding: boolean): StateData { - const { value, encoding } = StateMessage._encodeStateValue( - platform, - data?.value, - data?.encoding, - withBase64Encoding, - ); - return { - ...data, - value, - encoding, - }; - } - - private static _encodeStateValue( - platform: typeof Platform, - value: StateValue | undefined, - encoding: string | undefined, - withBase64Encoding: boolean, - ): { - value: StateValue | undefined; - encoding: string | undefined; - } { - if (!value || !platform.BufferUtils.isBuffer(value)) { - return { value, encoding }; - } - - if (withBase64Encoding) { - return { - value: platform.BufferUtils.base64Encode(value), - encoding: encoding ? encoding + '/base64' : 'base64', - }; - } + private static _encodeStateData(data: StateData, encodeFn: StateDataEncodeFunction): StateData { + const { value: newValue, encoding: newEncoding } = encodeFn(data?.value, data?.encoding); - // toBuffer returns a datatype understandable by - // that platform's msgpack implementation (Buffer in node, Uint8Array in browsers) return { - value: platform.BufferUtils.toBuffer(value), - encoding, + ...data, + value: newValue, + encoding: newEncoding!, }; } /** - * Overload toJSON() to intercept JSON.stringify() - * @return {*} + * Overload toJSON() to intercept JSON.stringify(). + * + * This will prepare the message to be transmitted over the wire to Ably. + * It will encode the data payload according to the wire protocol used on the client. + * It will transform any client-side enum string representations into their corresponding numbers, if needed (like "action" fields). */ toJSON(): { id?: string; @@ -332,19 +327,24 @@ export class StateMessage { object?: StateObject; extras?: any; } { - // need to encode buffer data to base64 if present and if we're returning a real JSON. - // although msgpack also calls toJSON() directly, - // we know it is a JSON.stringify() call if we have a non-empty arguments list. - // if withBase64Encoding = true - JSON.stringify() call - // if withBase64Encoding = false - we were called by msgpack - const withBase64Encoding = arguments.length > 0; - - const encodedOperation = this.operation - ? StateMessage._encodeStateOperation(this._platform, this.operation, withBase64Encoding) - : undefined; - const encodedObject = this.object - ? StateMessage._encodeStateObject(this._platform, this.object, withBase64Encoding) - : undefined; + // we can infer the format used by client by inspecting with what arguments this method was called. + // if JSON protocol is being used, the JSON.stringify() will be called and this toJSON() method will have a non-empty arguments list. + // MSGPack protocol implementation also calls toJSON(), but with an empty arguments list. + const format = arguments.length > 0 ? this._utils.Format.json : this._utils.Format.msgpack; + const encodeFn: StateDataEncodeFunction = (value, encoding) => { + const { data: newValue, encoding: newEncoding } = this._messageEncoding.encodeDataForWireProtocol( + value, + encoding, + format, + ); + return { + value: newValue, + encoding: newEncoding!, + }; + }; + + const encodedOperation = this.operation ? StateMessage._encodeStateOperation(this.operation, encodeFn) : undefined; + const encodedObject = this.object ? StateMessage._encodeStateObject(this.object, encodeFn) : undefined; return { id: this.id, diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js index d1b3551ff..0d60a6c19 100644 --- a/test/realtime/live_objects.test.js +++ b/test/realtime/live_objects.test.js @@ -57,6 +57,17 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], return `${paddedTimestamp}-${paddedCounter}@${seriesId}` + (paddedIndex ? `:${paddedIndex}` : ''); } + async function expectRejectedWith(fn, errorStr) { + let verifiedError = false; + try { + await fn(); + } catch (error) { + expect(error.message).to.have.string(errorStr); + verifiedError = true; + } + expect(verifiedError, 'Expected async function to throw an error').to.be.true; + } + describe('realtime/live_objects', function () { this.timeout(60 * 1000); @@ -518,7 +529,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], { name: 'negativeMaxSafeIntegerCounter', count: -Number.MAX_SAFE_INTEGER }, ]; - const stateSyncSequenceScanarios = [ + const stateSyncSequenceScenarios = [ { description: 'STATE_SYNC sequence with state object "tombstone" property creates tombstoned object', action: async (ctx) => { @@ -2060,9 +2071,370 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, ]; + const writeApiScenarios = [ + { + description: 'LiveCounter.increment sends COUNTER_INC operation', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName } = ctx; + + await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'counter', + createOp: liveObjectsHelper.counterCreateOp(), + }); + + const counter = root.get('counter'); + const increments = [ + 1, // value=1 + 10, // value=11 + -11, // value=0 + -1, // value=-1 + -10, // value=-11 + 11, // value=0 + Number.MAX_SAFE_INTEGER, // value=9007199254740991 + -Number.MAX_SAFE_INTEGER, // value=0 + -Number.MAX_SAFE_INTEGER, // value=-9007199254740991 + ]; + let expectedCounterValue = 0; + + for (let i = 0; i < increments.length; i++) { + const increment = increments[i]; + expectedCounterValue += increment; + await counter.increment(increment); + + expect(counter.value()).to.equal( + expectedCounterValue, + `Check counter has correct value after ${i + 1} LiveCounter.increment calls`, + ); + } + }, + }, + + { + description: 'LiveCounter.increment throws on invalid input', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName } = ctx; + + await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'counter', + createOp: liveObjectsHelper.counterCreateOp(), + }); + + const counter = root.get('counter'); + + await expectRejectedWith( + async () => counter.increment(), + 'Counter value increment should be a valid number', + ); + await expectRejectedWith( + async () => counter.increment(null), + 'Counter value increment should be a valid number', + ); + await expectRejectedWith( + async () => counter.increment(Number.NaN), + 'Counter value increment should be a valid number', + ); + await expectRejectedWith( + async () => counter.increment(Number.POSITIVE_INFINITY), + 'Counter value increment should be a valid number', + ); + await expectRejectedWith( + async () => counter.increment(Number.NEGATIVE_INFINITY), + 'Counter value increment should be a valid number', + ); + await expectRejectedWith( + async () => counter.increment('foo'), + 'Counter value increment should be a valid number', + ); + await expectRejectedWith( + async () => counter.increment(BigInt(1)), + 'Counter value increment should be a valid number', + ); + await expectRejectedWith( + async () => counter.increment(true), + 'Counter value increment should be a valid number', + ); + await expectRejectedWith( + async () => counter.increment(Symbol()), + 'Counter value increment should be a valid number', + ); + await expectRejectedWith( + async () => counter.increment({}), + 'Counter value increment should be a valid number', + ); + await expectRejectedWith( + async () => counter.increment([]), + 'Counter value increment should be a valid number', + ); + await expectRejectedWith( + async () => counter.increment(counter), + 'Counter value increment should be a valid number', + ); + }, + }, + + { + description: 'LiveCounter.decrement sends COUNTER_INC operation', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName } = ctx; + + await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'counter', + createOp: liveObjectsHelper.counterCreateOp(), + }); + + const counter = root.get('counter'); + const decrements = [ + 1, // value=-1 + 10, // value=-11 + -11, // value=0 + -1, // value=1 + -10, // value=11 + 11, // value=0 + Number.MAX_SAFE_INTEGER, // value=-9007199254740991 + -Number.MAX_SAFE_INTEGER, // value=0 + -Number.MAX_SAFE_INTEGER, // value=9007199254740991 + ]; + let expectedCounterValue = 0; + + for (let i = 0; i < decrements.length; i++) { + const decrement = decrements[i]; + expectedCounterValue -= decrement; + await counter.decrement(decrement); + + expect(counter.value()).to.equal( + expectedCounterValue, + `Check counter has correct value after ${i + 1} LiveCounter.decrement calls`, + ); + } + }, + }, + + { + description: 'LiveCounter.decrement throws on invalid input', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName } = ctx; + + await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'counter', + createOp: liveObjectsHelper.counterCreateOp(), + }); + + const counter = root.get('counter'); + + await expectRejectedWith( + async () => counter.decrement(), + 'Counter value decrement should be a valid number', + ); + await expectRejectedWith( + async () => counter.decrement(null), + 'Counter value decrement should be a valid number', + ); + await expectRejectedWith( + async () => counter.decrement(Number.NaN), + 'Counter value decrement should be a valid number', + ); + await expectRejectedWith( + async () => counter.decrement(Number.POSITIVE_INFINITY), + 'Counter value decrement should be a valid number', + ); + await expectRejectedWith( + async () => counter.decrement(Number.NEGATIVE_INFINITY), + 'Counter value decrement should be a valid number', + ); + await expectRejectedWith( + async () => counter.decrement('foo'), + 'Counter value decrement should be a valid number', + ); + await expectRejectedWith( + async () => counter.decrement(BigInt(1)), + 'Counter value decrement should be a valid number', + ); + await expectRejectedWith( + async () => counter.decrement(true), + 'Counter value decrement should be a valid number', + ); + await expectRejectedWith( + async () => counter.decrement(Symbol()), + 'Counter value decrement should be a valid number', + ); + await expectRejectedWith( + async () => counter.decrement({}), + 'Counter value decrement should be a valid number', + ); + await expectRejectedWith( + async () => counter.decrement([]), + 'Counter value decrement should be a valid number', + ); + await expectRejectedWith( + async () => counter.decrement(counter), + 'Counter value decrement should be a valid number', + ); + }, + }, + + { + description: 'LiveMap.set sends MAP_SET operation with primitive values', + action: async (ctx) => { + const { root } = ctx; + + await Promise.all( + primitiveKeyData.map(async (keyData) => { + const value = keyData.data.encoding ? BufferUtils.base64Decode(keyData.data.value) : keyData.data.value; + await root.set(keyData.key, value); + }), + ); + + // check everything is applied correctly + primitiveKeyData.forEach((keyData) => { + if (keyData.data.encoding) { + expect( + BufferUtils.areBuffersEqual(root.get(keyData.key), BufferUtils.base64Decode(keyData.data.value)), + `Check root has correct value for "${keyData.key}" key after LiveMap.set call`, + ).to.be.true; + } else { + expect(root.get(keyData.key)).to.equal( + keyData.data.value, + `Check root has correct value for "${keyData.key}" key after LiveMap.set call`, + ); + } + }); + }, + }, + + { + description: 'LiveMap.set sends MAP_SET operation with reference to another LiveObject', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName } = ctx; + + await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'counter', + createOp: liveObjectsHelper.counterCreateOp(), + }); + await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'map', + createOp: liveObjectsHelper.mapCreateOp(), + }); + + const counter = root.get('counter'); + const map = root.get('map'); + + await root.set('counter2', counter); + await root.set('map2', map); + + expect(root.get('counter2')).to.equal( + counter, + 'Check can set a reference to a LiveCounter object on a root via a LiveMap.set call', + ); + expect(root.get('map2')).to.equal( + map, + 'Check can set a reference to a LiveMap object on a root via a LiveMap.set call', + ); + }, + }, + + { + description: 'LiveMap.set throws on invalid input', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName } = ctx; + + await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'map', + createOp: liveObjectsHelper.mapCreateOp(), + }); + + const map = root.get('map'); + + await expectRejectedWith(async () => map.set(), 'Map key should be string'); + await expectRejectedWith(async () => map.set(null), 'Map key should be string'); + await expectRejectedWith(async () => map.set(1), 'Map key should be string'); + await expectRejectedWith(async () => map.set(BigInt(1)), 'Map key should be string'); + await expectRejectedWith(async () => map.set(true), 'Map key should be string'); + await expectRejectedWith(async () => map.set(Symbol()), 'Map key should be string'); + await expectRejectedWith(async () => map.set({}), 'Map key should be string'); + await expectRejectedWith(async () => map.set([]), 'Map key should be string'); + await expectRejectedWith(async () => map.set(map), 'Map key should be string'); + + await expectRejectedWith(async () => map.set('key'), 'Map value data type is unsupported'); + await expectRejectedWith(async () => map.set('key', null), 'Map value data type is unsupported'); + await expectRejectedWith(async () => map.set('key', BigInt(1)), 'Map value data type is unsupported'); + await expectRejectedWith(async () => map.set('key', Symbol()), 'Map value data type is unsupported'); + await expectRejectedWith(async () => map.set('key', {}), 'Map value data type is unsupported'); + await expectRejectedWith(async () => map.set('key', []), 'Map value data type is unsupported'); + }, + }, + + { + description: 'LiveMap.remove sends MAP_REMOVE operation', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName } = ctx; + + await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'map', + createOp: liveObjectsHelper.mapCreateOp({ + entries: { + foo: { data: { value: 1 } }, + bar: { data: { value: 1 } }, + baz: { data: { value: 1 } }, + }, + }), + }); + + const map = root.get('map'); + + await map.remove('foo'); + await map.remove('bar'); + + expect(map.get('foo'), 'Check can remove a key from a root via a LiveMap.remove call').to.not.exist; + expect(map.get('bar'), 'Check can remove a key from a root via a LiveMap.remove call').to.not.exist; + expect( + map.get('baz'), + 'Check non-removed keys are still present on a root after LiveMap.remove call for another keys', + ).to.equal(1); + }, + }, + + { + description: 'LiveMap.remove throws on invalid input', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName } = ctx; + + await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'map', + createOp: liveObjectsHelper.mapCreateOp(), + }); + + const map = root.get('map'); + + await expectRejectedWith(async () => map.remove(), 'Map key should be string'); + await expectRejectedWith(async () => map.remove(null), 'Map key should be string'); + await expectRejectedWith(async () => map.remove(1), 'Map key should be string'); + await expectRejectedWith(async () => map.remove(BigInt(1)), 'Map key should be string'); + await expectRejectedWith(async () => map.remove(true), 'Map key should be string'); + await expectRejectedWith(async () => map.remove(Symbol()), 'Map key should be string'); + await expectRejectedWith(async () => map.remove({}), 'Map key should be string'); + await expectRejectedWith(async () => map.remove([]), 'Map key should be string'); + await expectRejectedWith(async () => map.remove(map), 'Map key should be string'); + }, + }, + ]; + /** @nospec */ forScenarios( - [...stateSyncSequenceScanarios, ...applyOperationsScenarios, ...applyOperationsDuringSyncScenarios], + [ + ...stateSyncSequenceScenarios, + ...applyOperationsScenarios, + ...applyOperationsDuringSyncScenarios, + ...writeApiScenarios, + ], async function (helper, scenario) { const liveObjectsHelper = new LiveObjectsHelper(helper); const client = RealtimeWithLiveObjects(helper);