From e84f5483da2f1e289085c24e99297a8e934889e2 Mon Sep 17 00:00:00 2001 From: Simon Woolf Date: Wed, 8 Jan 2025 00:05:15 +0000 Subject: [PATCH] Restructure and clean up message and presencemessage types - extract common functionality to a new BaseMessage file - introduce proper WireMessage and WirePresenceMessage classes, such that: - you just call Message.encode() to get a WireMessage, and WireMessage.decode() to get a Message, and the type system will tell you which one you have - the toJSON override hack only applies to internal wire-protocol uses and doesn't interfere with the normal public Message/PresenceMessage classes - all necessary transformations between userfriendly message and wire protocol messages (encoding the data and the action) are now done by decode() and encode(), rather than (as before) the encoding happening in-place and the action encoding happening only on json-stringification - ProtocolMessage is now always a wire-protocol message and always holds WireMessages and WirePresence; all decoding is done by the channel - The PresencePlugin interface is now just the PresenceMessage and WirePresenceMessage types, rather than ad-hoc individual functions - The fromWireProtocol, encode, and decode functions in the DefaultMessage interface are no longer needed and can be removed - Cleaned up and simplified the message decoding step in the channel, which was unnecessarily complicated --- ably.d.ts | 2 +- src/common/lib/client/defaultrealtime.ts | 11 +- src/common/lib/client/modularplugins.ts | 11 +- src/common/lib/client/presencemap.ts | 6 +- src/common/lib/client/realtimechannel.ts | 148 ++--- src/common/lib/client/realtimepresence.ts | 32 +- src/common/lib/client/restchannel.ts | 18 +- src/common/lib/client/restchannelmixin.ts | 8 +- src/common/lib/client/restpresence.ts | 8 +- src/common/lib/client/restpresencemixin.ts | 8 +- src/common/lib/transport/comettransport.ts | 2 +- src/common/lib/transport/connectionmanager.ts | 2 +- src/common/lib/transport/protocol.ts | 3 +- src/common/lib/transport/transport.ts | 2 +- src/common/lib/types/basemessage.ts | 270 ++++++++ src/common/lib/types/defaultmessage.ts | 39 +- .../lib/types/defaultpresencemessage.ts | 18 +- src/common/lib/types/message.ts | 394 +++-------- src/common/lib/types/presencemessage.ts | 181 ++---- src/common/lib/types/protocolmessage.ts | 94 +-- src/common/lib/types/protocolmessagecommon.ts | 49 ++ src/platform/web/modular/presencemessage.ts | 4 +- src/platform/web/modular/realtimepresence.ts | 11 +- test/realtime/crypto.test.js | 18 +- test/realtime/message.test.js | 31 +- test/realtime/presence.test.js | 84 +-- test/realtime/sync.test.js | 614 ++++++++++-------- test/rest/presence.test.js | 4 +- 28 files changed, 997 insertions(+), 1075 deletions(-) create mode 100644 src/common/lib/types/basemessage.ts create mode 100644 src/common/lib/types/protocolmessagecommon.ts diff --git a/ably.d.ts b/ably.d.ts index 77ed8b98e..5028351ea 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -875,7 +875,7 @@ export type ChannelMode = | ChannelModes.PUBLISH | ChannelModes.SUBSCRIBE | ChannelModes.PRESENCE - | ChannelModes.PRESENCE_SUBSCRIBE + | ChannelModes.PRESENCE_SUBSCRIBE; /** * Passes additional properties to a {@link Channel} or {@link RealtimeChannel} object, such as encryption, {@link ChannelMode} and channel parameters. diff --git a/src/common/lib/client/defaultrealtime.ts b/src/common/lib/client/defaultrealtime.ts index da67d5d4a..447b82ee6 100644 --- a/src/common/lib/client/defaultrealtime.ts +++ b/src/common/lib/client/defaultrealtime.ts @@ -12,11 +12,7 @@ import { DefaultPresenceMessage } from '../types/defaultpresencemessage'; import WebSocketTransport from '../transport/websockettransport'; import { FilteredSubscriptions } from './filteredsubscriptions'; import { PresenceMap } from './presencemap'; -import { - fromValues as presenceMessageFromValues, - fromValuesArray as presenceMessagesFromValuesArray, - fromWireProtocol as presenceMessageFromWireProtocol, -} from '../types/presencemessage'; +import PresenceMessage, { WirePresenceMessage } from '../types/presencemessage'; import { Http } from 'common/types/http'; import Defaults from '../util/defaults'; import Logger from '../util/logger'; @@ -39,9 +35,8 @@ export class DefaultRealtime extends BaseRealtime { MsgPack, RealtimePresence: { RealtimePresence, - presenceMessageFromValues, - presenceMessagesFromValuesArray, - presenceMessageFromWireProtocol, + PresenceMessage, + WirePresenceMessage, }, WebSocketTransport, MessageInteractions: FilteredSubscriptions, diff --git a/src/common/lib/client/modularplugins.ts b/src/common/lib/client/modularplugins.ts index c3bdf1ea0..1729d59f9 100644 --- a/src/common/lib/client/modularplugins.ts +++ b/src/common/lib/client/modularplugins.ts @@ -5,18 +5,13 @@ import RealtimePresence from './realtimepresence'; import XHRRequest from 'platform/web/lib/http/request/xhrrequest'; import fetchRequest from 'platform/web/lib/http/request/fetchrequest'; import { FilteredSubscriptions } from './filteredsubscriptions'; -import { - fromValues as presenceMessageFromValues, - fromValuesArray as presenceMessagesFromValuesArray, - fromWireProtocol as presenceMessageFromWireProtocol, -} from '../types/presencemessage'; +import PresenceMessage, { WirePresenceMessage } from '../types/presencemessage'; import { TransportCtor } from '../transport/transport'; import * as PushPlugin from 'plugins/push'; export interface PresenceMessagePlugin { - presenceMessageFromValues: typeof presenceMessageFromValues; - presenceMessagesFromValuesArray: typeof presenceMessagesFromValuesArray; - presenceMessageFromWireProtocol: typeof presenceMessageFromWireProtocol; + PresenceMessage: typeof PresenceMessage; + WirePresenceMessage: typeof WirePresenceMessage; } export type RealtimePresencePlugin = PresenceMessagePlugin & { diff --git a/src/common/lib/client/presencemap.ts b/src/common/lib/client/presencemap.ts index 793fde54b..0cace6c59 100644 --- a/src/common/lib/client/presencemap.ts +++ b/src/common/lib/client/presencemap.ts @@ -1,7 +1,7 @@ import * as Utils from '../util/utils'; import EventEmitter from '../util/eventemitter'; import Logger from '../util/logger'; -import PresenceMessage, { fromValues as presenceMessageFromValues } from '../types/presencemessage'; +import PresenceMessage from '../types/presencemessage'; import type RealtimePresence from './realtimepresence'; @@ -80,7 +80,7 @@ export class PresenceMap extends EventEmitter { put(item: PresenceMessage) { if (item.action === 'enter' || item.action === 'update') { - item = presenceMessageFromValues(item); + item = PresenceMessage.fromValues(item); item.action = 'present'; } const map = this.map, @@ -118,7 +118,7 @@ export class PresenceMap extends EventEmitter { /* RTP2f */ if (this.syncInProgress) { - item = presenceMessageFromValues(item); + item = PresenceMessage.fromValues(item); item.action = 'absent'; map[key] = item; } else { diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 988d7828f..685ff88e2 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -1,24 +1,14 @@ -import ProtocolMessage, { - actions, - channelModes, - fromValues as protocolMessageFromValues, -} from '../types/protocolmessage'; +import { actions, channelModes } from '../types/protocolmessagecommon'; +import ProtocolMessage, { fromValues as protocolMessageFromValues } from '../types/protocolmessage'; import EventEmitter from '../util/eventemitter'; import * as Utils from '../util/utils'; import Logger from '../util/logger'; import RealtimePresence from './realtimepresence'; -import Message, { - fromValues as messageFromValues, - fromValuesArray as messagesFromValuesArray, - encodeArray as encodeMessagesArray, - decode as decodeMessage, - getMessagesSize, - CipherOptions, - EncodingDecodingContext, -} from '../types/message'; +import { EncodingDecodingContext, CipherOptions, populateFieldsFromParent } from '../types/basemessage'; +import Message, { WireMessage, getMessagesSize, encodeArray as encodeMessagesArray } from '../types/message'; import ChannelStateChange from './channelstatechange'; import ErrorInfo, { PartialErrorInfo } from '../types/errorinfo'; -import PresenceMessage, { decode as decodePresenceMessage } from '../types/presencemessage'; +import { WirePresenceMessage } from '../types/presencemessage'; import ConnectionErrors from '../transport/connectionerrors'; import * as API from '../../../../ably'; import ConnectionManager from '../transport/connectionmanager'; @@ -225,28 +215,32 @@ class RealtimeChannel extends EventEmitter { } async publish(...args: any[]): Promise { - let messages = args[0]; + let messages: Message[]; let argCount = args.length; if (!this.connectionManager.activeState()) { throw this.connectionManager.getError(); } if (argCount == 1) { - if (Utils.isObject(messages)) messages = [messageFromValues(messages)]; - else if (Array.isArray(messages)) messages = messagesFromValuesArray(messages); - else + if (Utils.isObject(args[0])) { + messages = [Message.fromValues(args[0])]; + } else if (Array.isArray(args[0])) { + messages = Message.fromValuesArray(args[0]); + } else { throw new ErrorInfo( 'The single-argument form of publish() expects a message object or an array of message objects', 40013, 400, ); + } } else { - messages = [messageFromValues({ name: args[0], data: args[1] })]; + messages = [Message.fromValues({ name: args[0], data: args[1] })]; } const maxMessageSize = this.client.options.maxMessageSize; - await encodeMessagesArray(messages, this.channelOptions as CipherOptions); + // TODO get rid of CipherOptions type assertion, indicates channeloptions types are broken + const wireMessages = await encodeMessagesArray(messages, this.channelOptions as CipherOptions); /* RSL1i */ - const size = getMessagesSize(messages); + const size = getMessagesSize(wireMessages); if (size > maxMessageSize) { throw new ErrorInfo( 'Maximum size of messages that can be published at once exceeded ( was ' + @@ -259,11 +253,11 @@ class RealtimeChannel extends EventEmitter { ); } return new Promise((resolve, reject) => { - this._publish(messages, (err) => (err ? reject(err) : resolve())); + this._publish(wireMessages, (err) => (err ? reject(err) : resolve())); }); } - _publish(messages: Array, callback: ErrCallback) { + _publish(messages: Array, callback: ErrCallback) { Logger.logAction(this.logger, Logger.LOG_MICRO, 'RealtimeChannel.publish()', 'message count = ' + messages.length); const state = this.state; switch (state) { @@ -482,13 +476,11 @@ class RealtimeChannel extends EventEmitter { this.connectionManager.send(msg, this.client.options.queueMessages, callback); } - sendPresence(presence: PresenceMessage | PresenceMessage[], callback?: ErrCallback): void { + sendPresence(presence: WirePresenceMessage[], callback?: ErrCallback): void { const msg = protocolMessageFromValues({ action: actions.PRESENCE, channel: this.name, - presence: Array.isArray(presence) - ? this.client._RealtimePresence!.presenceMessagesFromValuesArray(presence) - : [this.client._RealtimePresence!.presenceMessageFromValues(presence)], + presence: presence, }); this.sendMessage(msg, callback); } @@ -565,14 +557,17 @@ class RealtimeChannel extends EventEmitter { if (!message.presence) break; // eslint-disable-next-line no-fallthrough case actions.PRESENCE: { - const presenceMessages = message.presence; - - if (!presenceMessages) { + if (!message.presence) { break; } + populateFieldsFromParent(message); const options = this.channelOptions; - await this._decodeAndPrepareMessages(message, presenceMessages, (msg) => decodePresenceMessage(msg, options)); + const presenceMessages = await Promise.all( + message.presence.map((wpm) => { + return wpm.decode(options, this.logger); + }), + ); if (this._presence) { this._presence.setPresence(presenceMessages, isSync, syncChannelSerial as any); @@ -597,9 +592,11 @@ class RealtimeChannel extends EventEmitter { return; } - const messages = message.messages as Array, - firstMessage = messages[0], - lastMessage = messages[messages.length - 1], + populateFieldsFromParent(message); + + const encoded = message.messages!, + firstMessage = encoded[0], + lastMessage = encoded[encoded.length - 1], channelSerial = message.channelSerial; if ( @@ -618,44 +615,34 @@ class RealtimeChannel extends EventEmitter { break; } - const { unrecoverableError } = await this._decodeAndPrepareMessages( - message, - messages, - (msg) => decodeMessage(msg, this._decodingContext), - (e) => { - /* decrypt failed .. the most likely cause is that we have the wrong key */ - const errorInfo = e as ErrorInfo; + let messages: Message[] = []; + for (let i = 0; i < encoded.length; i++) { + const { decoded, err } = await encoded[i].decodeWithErr(this._decodingContext, this.logger); + messages[i] = decoded; - switch (errorInfo.code) { + if (err) { + switch (err.code) { case 40018: /* decode failure */ - this._startDecodeFailureRecovery(errorInfo); - return { unrecoverableError: true }; + this._startDecodeFailureRecovery(err); + return; - case 40019: - /* No vcdiff plugin passed in - no point recovering, give up */ - // eslint-disable-next-line no-fallthrough + case 40019: /* No vcdiff plugin passed in - no point recovering, give up */ case 40021: /* Browser does not support deltas, similarly no point recovering */ - this.notifyState('failed', errorInfo); - return { unrecoverableError: true }; + this.notifyState('failed', err); + return; default: - return { unrecoverableError: false }; + // do nothing, continue decoding } - }, - ); - if (unrecoverableError) { - return; + } } for (let i = 0; i < messages.length; i++) { const msg = messages[i]; if (channelSerial && !msg.version) { msg.version = channelSerial + ':' + i.toString().padStart(3, '0'); - // already done in fromWireProtocol -- but for realtime messages the source - // fields might be copied from the protocolmessage, so need to do it again - msg.expandFields(); } } @@ -688,51 +675,6 @@ class RealtimeChannel extends EventEmitter { } } - /** - * Mutates provided messages by adding `connectionId`, `timestamp` and `id` fields, and decoding message data. - * - * @returns `unrecoverableError` flag. If `true` indicates that unrecoverable error was encountered during message decoding - * and any further message processing should be stopped. Always equals to `false` if `decodeErrorRecoveryHandler` was not provided - */ - private async _decodeAndPrepareMessages( - protocolMessage: ProtocolMessage, - messages: T[], - decodeFn: (msg: T) => Promise, - decodeErrorRecoveryHandler?: (e: Error) => { unrecoverableError: boolean }, - ): Promise<{ unrecoverableError: boolean }> { - const { id, connectionId, timestamp } = protocolMessage; - - for (let i = 0; i < messages.length; i++) { - const msg = messages[i]; - - try { - // decode underlying data for a message - await decodeFn(msg); - } catch (e) { - Logger.logAction( - this.logger, - Logger.LOG_ERROR, - 'RealtimeChannel.decodeAndPrepareMessages()', - (e as Error).toString(), - ); - - if (decodeErrorRecoveryHandler) { - const { unrecoverableError } = decodeErrorRecoveryHandler(e as Error); - if (unrecoverableError) { - // break out of for loop by returning - return { unrecoverableError: true }; - } - } - } - - if (!msg.connectionId) msg.connectionId = connectionId; - if (!msg.timestamp) msg.timestamp = timestamp; - if (id && !msg.id) msg.id = id + ':' + i; - } - - return { unrecoverableError: false }; - } - _startDecodeFailureRecovery(reason: ErrorInfo): void { if (!this._lastPayload.decodeFailureRecoveryInProgress) { Logger.logAction( diff --git a/src/common/lib/client/realtimepresence.ts b/src/common/lib/client/realtimepresence.ts index 64ad344f6..7f1ce2d9d 100644 --- a/src/common/lib/client/realtimepresence.ts +++ b/src/common/lib/client/realtimepresence.ts @@ -1,16 +1,12 @@ import * as Utils from '../util/utils'; import EventEmitter from '../util/eventemitter'; import Logger from '../util/logger'; -import PresenceMessage, { - fromValues as presenceMessageFromValues, - fromData as presenceMessageFromData, - encode as encodePresenceMessage, -} from '../types/presencemessage'; +import PresenceMessage, { WirePresenceMessage } from '../types/presencemessage'; +import type { CipherOptions } from '../types/basemessage'; import ErrorInfo, { PartialErrorInfo } from '../types/errorinfo'; import RealtimeChannel from './realtimechannel'; import Multicaster from '../util/multicaster'; import ChannelStateChange from './channelstatechange'; -import { CipherOptions } from '../types/message'; import { ErrCallback } from '../../types/utils'; import { PaginatedResult } from './paginatedresource'; import { PresenceMap, RealtimePresenceParams } from './presencemap'; @@ -61,7 +57,7 @@ function waitAttached(channel: RealtimeChannel, callback: ErrCallback, action: ( class RealtimePresence extends EventEmitter { channel: RealtimeChannel; - pendingPresence: { presence: PresenceMessage; callback: ErrCallback }[]; + pendingPresence: { presence: WirePresenceMessage; callback: ErrCallback }[]; syncComplete: boolean; members: PresenceMap; _myMembers: PresenceMap; @@ -119,7 +115,7 @@ class RealtimePresence extends EventEmitter { 'channel = ' + channel.name + ', id = ' + id + ', client = ' + (clientId || '(implicit) ' + getClientId(this)), ); - const presence = presenceMessageFromData(data); + const presence = PresenceMessage.fromData(data); presence.action = action; if (id) { presence.id = id; @@ -127,12 +123,12 @@ class RealtimePresence extends EventEmitter { if (clientId) { presence.clientId = clientId; } + const wirePresMsg = await presence.encode(channel.channelOptions as CipherOptions); - await encodePresenceMessage(presence, channel.channelOptions as CipherOptions); switch (channel.state) { case 'attached': return new Promise((resolve, reject) => { - channel.sendPresence(presence, (err) => (err ? reject(err) : resolve())); + channel.sendPresence([wirePresMsg], (err) => (err ? reject(err) : resolve())); }); case 'initialized': case 'detached': @@ -141,7 +137,7 @@ class RealtimePresence extends EventEmitter { case 'attaching': return new Promise((resolve, reject) => { this.pendingPresence.push({ - presence: presence, + presence: wirePresMsg, callback: (err) => (err ? reject(err) : resolve()), }); }); @@ -175,20 +171,21 @@ class RealtimePresence extends EventEmitter { 'RealtimePresence.leaveClient()', 'leaving; channel = ' + this.channel.name + ', client = ' + clientId, ); - const presence = presenceMessageFromData(data); + const presence = PresenceMessage.fromData(data); presence.action = 'leave'; if (clientId) { presence.clientId = clientId; } + const wirePresMsg = await presence.encode(channel.channelOptions as CipherOptions); return new Promise((resolve, reject) => { switch (channel.state) { case 'attached': - channel.sendPresence(presence, (err) => (err ? reject(err) : resolve())); + channel.sendPresence([wirePresMsg], (err) => (err ? reject(err) : resolve())); break; case 'attaching': this.pendingPresence.push({ - presence: presence, + presence: wirePresMsg, callback: (err) => (err ? reject(err) : resolve()), }); break; @@ -288,8 +285,7 @@ class RealtimePresence extends EventEmitter { } } - for (let i = 0; i < presenceSet.length; i++) { - const presence = presenceMessageFromValues(presenceSet[i]); + for (let presence of presenceSet) { switch (presence.action) { case 'leave': if (members.remove(presence)) { @@ -320,7 +316,7 @@ class RealtimePresence extends EventEmitter { /* broadcast to listeners */ for (let i = 0; i < broadcastMessages.length; i++) { const presence = broadcastMessages[i]; - this.subscriptions.emit(presence.action as string, presence); + this.subscriptions.emit(presence.action!, presence); } } @@ -435,7 +431,7 @@ class RealtimePresence extends EventEmitter { _synthesizeLeaves(items: PresenceMessage[]): void { const subscriptions = this.subscriptions; items.forEach(function (item) { - const presence = presenceMessageFromValues({ + const presence = PresenceMessage.fromValues({ action: 'leave', connectionId: item.connectionId, clientId: item.clientId, diff --git a/src/common/lib/client/restchannel.ts b/src/common/lib/client/restchannel.ts index e27133c6e..0c14d120a 100644 --- a/src/common/lib/client/restchannel.ts +++ b/src/common/lib/client/restchannel.ts @@ -2,13 +2,11 @@ import * as Utils from '../util/utils'; import Logger from '../util/logger'; import RestPresence from './restpresence'; import Message, { - encodeArray as encodeMessagesArray, serialize as serializeMessage, getMessagesSize, - CipherOptions, - fromValues as messageFromValues, - fromValuesArray as messagesFromValuesArray, + encodeArray as encodeMessagesArray, } from '../types/message'; +import { CipherOptions } from '../types/basemessage'; import ErrorInfo from '../types/errorinfo'; import { PaginatedResult } from './paginatedresource'; import Resource from './resource'; @@ -74,13 +72,13 @@ class RestChannel { if (typeof first === 'string' || first === null) { /* (name, data, ...) */ - messages = [messageFromValues({ name: first, data: second })]; + messages = [Message.fromValues({ name: first, data: second })]; params = args[2]; } else if (Utils.isObject(first)) { - messages = [messageFromValues(first)]; + messages = [Message.fromValues(first)]; params = args[1]; } else if (Array.isArray(first)) { - messages = messagesFromValuesArray(first); + messages = Message.fromValuesArray(first); params = args[1]; } else { throw new ErrorInfo( @@ -110,10 +108,10 @@ class RestChannel { }); } - await encodeMessagesArray(messages, this.channelOptions as CipherOptions); + const wireMessages = await encodeMessagesArray(messages, this.channelOptions as CipherOptions); /* RSL1i */ - const size = getMessagesSize(messages), + const size = getMessagesSize(wireMessages), maxMessageSize = options.maxMessageSize; if (size > maxMessageSize) { throw new ErrorInfo( @@ -127,7 +125,7 @@ class RestChannel { ); } - await this._publish(serializeMessage(messages, client._MsgPack, format), headers, params); + await this._publish(serializeMessage(wireMessages, client._MsgPack, format), headers, params); } async _publish(requestBody: RequestBody | null, headers: Record, params: any): Promise { diff --git a/src/common/lib/client/restchannelmixin.ts b/src/common/lib/client/restchannelmixin.ts index 67fd701f3..b24ccb6bd 100644 --- a/src/common/lib/client/restchannelmixin.ts +++ b/src/common/lib/client/restchannelmixin.ts @@ -2,7 +2,7 @@ import * as API from '../../../../ably'; import RestChannel from './restchannel'; import RealtimeChannel from './realtimechannel'; import * as Utils from '../util/utils'; -import Message, { WireProtocolMessage, _fromEncodedArray } from '../types/message'; +import Message, { WireMessage, _fromEncodedArray } from '../types/message'; import Defaults from '../util/defaults'; import PaginatedResource, { PaginatedResult } from './paginatedresource'; import Resource from './resource'; @@ -35,9 +35,9 @@ export class RestChannelMixin { headers, unpacked, ) { - const decoded: WireProtocolMessage[] = unpacked - ? (body as WireProtocolMessage[]) - : Utils.decodeBody(body, client._MsgPack, format); + const decoded = ( + unpacked ? body : Utils.decodeBody(body, client._MsgPack, format) + ) as Utils.Properties[]; return _fromEncodedArray(decoded, channel); }).get(params as Record); diff --git a/src/common/lib/client/restpresence.ts b/src/common/lib/client/restpresence.ts index 6bf932746..0ac42a0c5 100644 --- a/src/common/lib/client/restpresence.ts +++ b/src/common/lib/client/restpresence.ts @@ -1,7 +1,7 @@ import * as Utils from '../util/utils'; import Logger from '../util/logger'; import PaginatedResource, { PaginatedResult } from './paginatedresource'; -import PresenceMessage, { WireProtocolPresenceMessage, _fromEncodedArray } from '../types/presencemessage'; +import PresenceMessage, { WirePresenceMessage, _fromEncodedArray } from '../types/presencemessage'; import RestChannel from './restchannel'; import Defaults from '../util/defaults'; @@ -31,9 +31,9 @@ class RestPresence { headers, envelope, async (body, headers, unpacked) => { - const decoded: WireProtocolPresenceMessage[] = unpacked - ? (body as WireProtocolPresenceMessage[]) - : Utils.decodeBody(body, client._MsgPack, format); + const decoded = ( + unpacked ? body : Utils.decodeBody(body, client._MsgPack, format) + ) as Utils.Properties[]; return _fromEncodedArray(decoded, this.channel); }, diff --git a/src/common/lib/client/restpresencemixin.ts b/src/common/lib/client/restpresencemixin.ts index e9d562120..5ca340fb2 100644 --- a/src/common/lib/client/restpresencemixin.ts +++ b/src/common/lib/client/restpresencemixin.ts @@ -3,7 +3,7 @@ import RealtimePresence from './realtimepresence'; import * as Utils from '../util/utils'; import Defaults from '../util/defaults'; import PaginatedResource, { PaginatedResult } from './paginatedresource'; -import PresenceMessage, { WireProtocolPresenceMessage, _fromEncodedArray } from '../types/presencemessage'; +import PresenceMessage, { WirePresenceMessage, _fromEncodedArray } from '../types/presencemessage'; import { RestChannelMixin } from './restchannelmixin'; export class RestPresenceMixin { @@ -28,9 +28,9 @@ export class RestPresenceMixin { headers, envelope, async (body, headers, unpacked) => { - const decoded: WireProtocolPresenceMessage[] = unpacked - ? (body as WireProtocolPresenceMessage[]) - : Utils.decodeBody(body, client._MsgPack, format); + const decoded = ( + unpacked ? body : Utils.decodeBody(body, client._MsgPack, format) + ) as Utils.Properties[]; return _fromEncodedArray(decoded, presence.channel); }, diff --git a/src/common/lib/transport/comettransport.ts b/src/common/lib/transport/comettransport.ts index ab468b6cc..77f0a75f6 100644 --- a/src/common/lib/transport/comettransport.ts +++ b/src/common/lib/transport/comettransport.ts @@ -1,6 +1,6 @@ import * as Utils from '../util/utils'; +import { actions } from '../types/protocolmessagecommon'; import ProtocolMessage, { - actions, fromValues as protocolMessageFromValues, fromDeserialized as protocolMessageFromDeserialized, } from '../types/protocolmessage'; diff --git a/src/common/lib/transport/connectionmanager.ts b/src/common/lib/transport/connectionmanager.ts index 8ef56dcc2..73e948379 100644 --- a/src/common/lib/transport/connectionmanager.ts +++ b/src/common/lib/transport/connectionmanager.ts @@ -1,5 +1,5 @@ +import { actions } from '../types/protocolmessagecommon'; import ProtocolMessage, { - actions, stringify as stringifyProtocolMessage, fromValues as protocolMessageFromValues, } from 'common/lib/types/protocolmessage'; diff --git a/src/common/lib/transport/protocol.ts b/src/common/lib/transport/protocol.ts index cb91a1db2..5f792b1a7 100644 --- a/src/common/lib/transport/protocol.ts +++ b/src/common/lib/transport/protocol.ts @@ -1,4 +1,5 @@ -import ProtocolMessage, { actions, stringify as stringifyProtocolMessage } from '../types/protocolmessage'; +import { actions } from '../types/protocolmessagecommon'; +import ProtocolMessage, { stringify as stringifyProtocolMessage } from '../types/protocolmessage'; import * as Utils from '../util/utils'; import EventEmitter from '../util/eventemitter'; import Logger from '../util/logger'; diff --git a/src/common/lib/transport/transport.ts b/src/common/lib/transport/transport.ts index 23ad1ec05..5172bd6d6 100644 --- a/src/common/lib/transport/transport.ts +++ b/src/common/lib/transport/transport.ts @@ -1,5 +1,5 @@ +import { actions } from '../types/protocolmessagecommon'; import ProtocolMessage, { - actions, fromValues as protocolMessageFromValues, stringify as stringifyProtocolMessage, } from '../types/protocolmessage'; diff --git a/src/common/lib/types/basemessage.ts b/src/common/lib/types/basemessage.ts new file mode 100644 index 000000000..e2a86e95c --- /dev/null +++ b/src/common/lib/types/basemessage.ts @@ -0,0 +1,270 @@ +import Platform from 'common/platform'; +import Logger from '../util/logger'; +import ErrorInfo from './errorinfo'; +import * as Utils from '../util/utils'; +import { Bufferlike as BrowserBufferlike } from '../../../platform/web/lib/util/bufferutils'; +import * as API from '../../../../ably'; +import { actions } from './protocolmessagecommon'; + +import type { IUntypedCryptoStatic } from 'common/types/ICryptoStatic'; +import type { ChannelOptions } from '../../types/channel'; +import type ProtocolMessage from './protocolmessage'; + +export type CipherOptions = { + channelCipher: { + encrypt: Function; + algorithm: 'aes'; + }; + cipher?: { + channelCipher: { + encrypt: Function; + algorithm: 'aes'; + }; + }; +}; + +export type EncodingDecodingContext = { + channelOptions: ChannelOptions; + plugins: { + vcdiff?: { + decode: (delta: Uint8Array, source: Uint8Array) => Uint8Array; + }; + }; + baseEncodedPreviousPayload?: Buffer | BrowserBufferlike; +}; + +function normaliseContext(context: CipherOptions | EncodingDecodingContext | ChannelOptions): EncodingDecodingContext { + if (!context || !(context as EncodingDecodingContext).channelOptions) { + return { + channelOptions: context as ChannelOptions, + plugins: {}, + baseEncodedPreviousPayload: undefined, + }; + } + return context as EncodingDecodingContext; +} + +export function normalizeCipherOptions( + Crypto: IUntypedCryptoStatic | null, + logger: Logger, + options: API.ChannelOptions | null, +): ChannelOptions { + if (options && options.cipher) { + if (!Crypto) Utils.throwMissingPluginError('Crypto'); + const cipher = Crypto.getCipher(options.cipher, logger); + return { + cipher: cipher.cipherParams, + channelCipher: cipher.cipher, + }; + } + return options ?? {}; +} + +async function encrypt(msg: T, options: CipherOptions): Promise { + let data = msg.data, + encoding = msg.encoding, + cipher = options.channelCipher; + + encoding = encoding ? encoding + '/' : ''; + if (!Platform.BufferUtils.isBuffer(data)) { + data = Platform.BufferUtils.utf8Encode(String(data)); + encoding = encoding + 'utf-8/'; + } + const ciphertext = await cipher.encrypt(data); + msg.data = ciphertext; + msg.encoding = encoding + 'cipher+' + cipher.algorithm; + return msg; +} + +export async function encode(msg: T, options: CipherOptions): Promise { + const data = msg.data; + const nativeDataType = + typeof data == 'string' || Platform.BufferUtils.isBuffer(data) || data === null || data === undefined; + + if (!nativeDataType) { + if (Utils.isObject(data) || Array.isArray(data)) { + msg.data = JSON.stringify(data); + msg.encoding = msg.encoding ? msg.encoding + '/json' : 'json'; + } else { + throw new ErrorInfo('Data type is unsupported', 40013, 400); + } + } + + if (options != null && options.cipher) { + return encrypt(msg, options); + } else { + return msg; + } +} + +export async function decode( + message: T, + inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions, +): Promise { + const context = normaliseContext(inputContext); + + let lastPayload = message.data; + const encoding = message.encoding; + if (encoding) { + const xforms = encoding.split('/'); + let lastProcessedEncodingIndex, + encodingsToProcess = xforms.length, + data = message.data; + + let xform = ''; + try { + while ((lastProcessedEncodingIndex = encodingsToProcess) > 0) { + // eslint-disable-next-line security/detect-unsafe-regex + const match = xforms[--encodingsToProcess].match(/([-\w]+)(\+([\w-]+))?/); + if (!match) break; + xform = match[1]; + switch (xform) { + case 'base64': + data = Platform.BufferUtils.base64Decode(String(data)); + if (lastProcessedEncodingIndex == xforms.length) { + lastPayload = data; + } + continue; + case 'utf-8': + data = Platform.BufferUtils.utf8Decode(data); + continue; + case 'json': + data = JSON.parse(data); + continue; + case 'cipher': + if ( + context.channelOptions != null && + context.channelOptions.cipher && + context.channelOptions.channelCipher + ) { + const xformAlgorithm = match[3], + cipher = context.channelOptions.channelCipher; + /* don't attempt to decrypt unless the cipher params are compatible */ + if (xformAlgorithm != cipher.algorithm) { + throw new Error('Unable to decrypt message with given cipher; incompatible cipher params'); + } + data = await cipher.decrypt(data); + continue; + } else { + throw new Error('Unable to decrypt message; not an encrypted channel'); + } + case 'vcdiff': + if (!context.plugins || !context.plugins.vcdiff) { + throw new ErrorInfo('Missing Vcdiff decoder (https://github.com/ably-forks/vcdiff-decoder)', 40019, 400); + } + if (typeof Uint8Array === 'undefined') { + throw new ErrorInfo( + 'Delta decoding not supported on this browser (need ArrayBuffer & Uint8Array)', + 40020, + 400, + ); + } + try { + let deltaBase = context.baseEncodedPreviousPayload; + if (typeof deltaBase === 'string') { + deltaBase = Platform.BufferUtils.utf8Encode(deltaBase); + } + + // vcdiff expects Uint8Arrays, can't copy with ArrayBuffers. + const deltaBaseBuffer = Platform.BufferUtils.toBuffer(deltaBase as Buffer); + data = Platform.BufferUtils.toBuffer(data); + + data = Platform.BufferUtils.arrayBufferViewToBuffer(context.plugins.vcdiff.decode(data, deltaBaseBuffer)); + lastPayload = data; + } catch (e) { + throw new ErrorInfo('Vcdiff delta decode failed with ' + e, 40018, 400); + } + continue; + default: + throw new Error('Unknown encoding'); + } + } + } catch (e) { + const err = e as ErrorInfo; + throw new ErrorInfo( + 'Error processing the ' + xform + ' encoding, decoder returned ‘' + err.message + '’', + err.code || 40013, + 400, + ); + } finally { + message.encoding = + (lastProcessedEncodingIndex as number) <= 0 ? null : xforms.slice(0, lastProcessedEncodingIndex).join('/'); + message.data = data; + } + } + context.baseEncodedPreviousPayload = lastPayload; +} + +export function wireToJSON(this: BaseMessage, ...args: any[]): any { + /* encode data to base64 if present and we're returning real JSON; + * although msgpack calls toJSON(), we know it is a stringify() + * call if it has a non-empty arguments list */ + let encoding = this.encoding; + let data = this.data; + if (data && Platform.BufferUtils.isBuffer(data)) { + if (args.length > 0) { + /* stringify call */ + encoding = encoding ? encoding + '/base64' : 'base64'; + data = Platform.BufferUtils.base64Encode(data); + } else { + /* Called by msgpack. toBuffer returns a datatype understandable by + * that platform's msgpack implementation (Buffer in node, Uint8Array + * in browsers) */ + data = Platform.BufferUtils.toBuffer(data); + } + } + return Object.assign({}, this, { encoding, data }); +} + +// in-place, generally called on the protocol message before decoding +export function populateFieldsFromParent(parent: ProtocolMessage) { + let msgs: BaseMessage[]; + switch (parent.action) { + case actions.MESSAGE: + msgs = parent.messages!; + break; + case actions.PRESENCE: + case actions.SYNC: + msgs = parent.presence!; + break; + default: + throw new ErrorInfo('Unexpected action ' + parent.action, 40000, 400); + } + + const { id, connectionId, timestamp } = parent; + for (let i = 0; i < msgs.length; i++) { + const msg = msgs[i]; + if (!msg.connectionId) msg.connectionId = connectionId; + if (!msg.timestamp) msg.timestamp = timestamp; + if (id && !msg.id) msg.id = id + ':' + i; + } +} + +export abstract class BaseMessage { + id?: string; + timestamp?: number; + clientId?: string; + connectionId?: string; + data?: any; + encoding?: string | null; + extras?: any; + size?: number; + + toString(): string { + let result = ''; + if (this.id) result += '; id=' + this.id; + if (this.timestamp) result += '; timestamp=' + this.timestamp; + if (this.clientId) result += '; clientId=' + this.clientId; + if (this.connectionId) result += '; connectionId=' + this.connectionId; + if (this.encoding) result += '; encoding=' + this.encoding; + if (this.extras) result += '; extras =' + JSON.stringify(this.extras); + if (this.data) { + if (typeof this.data == 'string') result += '; data=' + this.data; + else if (Platform.BufferUtils.isBuffer(this.data)) + result += '; data (buffer)=' + Platform.BufferUtils.base64Encode(this.data); + else result += '; data (json)=' + JSON.stringify(this.data); + } + if (this.extras) result += '; extras=' + JSON.stringify(this.extras); + return result; + } +} diff --git a/src/common/lib/types/defaultmessage.ts b/src/common/lib/types/defaultmessage.ts index 1161e909c..41221f088 100644 --- a/src/common/lib/types/defaultmessage.ts +++ b/src/common/lib/types/defaultmessage.ts @@ -1,18 +1,6 @@ -import Message, { - WireProtocolMessage, - CipherOptions, - decode, - encode, - EncodingDecodingContext, - fromEncoded, - fromEncodedArray, - fromValues, - fromWireProtocol, -} from './message'; +import Message, { WireMessage, fromEncoded, fromEncodedArray } from './message'; import * as API from '../../../../ably'; import Platform from 'common/platform'; -import PresenceMessage from './presencemessage'; -import { ChannelOptions } from 'common/types/channel'; import Logger from '../util/logger'; import type { Properties } from '../util/utils'; @@ -21,33 +9,14 @@ import type { Properties } from '../util/utils'; */ export class DefaultMessage extends Message { static async fromEncoded(encoded: unknown, inputOptions?: API.ChannelOptions): Promise { - return fromEncoded(Logger.defaultLogger, Platform.Crypto, encoded as WireProtocolMessage, inputOptions); + return fromEncoded(Logger.defaultLogger, Platform.Crypto, encoded as WireMessage, inputOptions); } static async fromEncodedArray(encodedArray: Array, options?: API.ChannelOptions): Promise { - return fromEncodedArray(Logger.defaultLogger, Platform.Crypto, encodedArray as WireProtocolMessage[], options); + return fromEncodedArray(Logger.defaultLogger, Platform.Crypto, encodedArray as WireMessage[], options); } - // Used by tests static fromValues(values: Properties): Message { - return fromValues(values); - } - - // Used by tests - static fromWireProtocol(values: WireProtocolMessage): Message { - return fromWireProtocol(values); - } - - // Used by tests - static async encode(msg: T, options: CipherOptions): Promise { - return encode(msg, options); - } - - // Used by tests - static async decode( - message: Message | PresenceMessage, - inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions, - ): Promise { - return decode(message, inputContext); + return Message.fromValues(values); } } diff --git a/src/common/lib/types/defaultpresencemessage.ts b/src/common/lib/types/defaultpresencemessage.ts index a5ef3877d..cc5a64424 100644 --- a/src/common/lib/types/defaultpresencemessage.ts +++ b/src/common/lib/types/defaultpresencemessage.ts @@ -1,11 +1,6 @@ import * as API from '../../../../ably'; import Logger from '../util/logger'; -import PresenceMessage, { - fromEncoded, - fromEncodedArray, - fromValues, - WireProtocolPresenceMessage, -} from './presencemessage'; +import PresenceMessage, { fromEncoded, fromEncodedArray, WirePresenceMessage } from './presencemessage'; import Platform from 'common/platform'; import type { Properties } from '../util/utils'; @@ -14,22 +9,17 @@ import type { Properties } from '../util/utils'; */ export class DefaultPresenceMessage extends PresenceMessage { static async fromEncoded(encoded: unknown, inputOptions?: API.ChannelOptions): Promise { - return fromEncoded(Logger.defaultLogger, Platform.Crypto, encoded as WireProtocolPresenceMessage, inputOptions); + return fromEncoded(Logger.defaultLogger, Platform.Crypto, encoded as WirePresenceMessage, inputOptions); } static async fromEncodedArray( encodedArray: Array, options?: API.ChannelOptions, ): Promise { - return fromEncodedArray( - Logger.defaultLogger, - Platform.Crypto, - encodedArray as WireProtocolPresenceMessage[], - options, - ); + return fromEncodedArray(Logger.defaultLogger, Platform.Crypto, encodedArray as WirePresenceMessage[], options); } static fromValues(values: Properties): PresenceMessage { - return fromValues(values); + return PresenceMessage.fromValues(values); } } diff --git a/src/common/lib/types/message.ts b/src/common/lib/types/message.ts index 4946f34d8..73f6e4f2f 100644 --- a/src/common/lib/types/message.ts +++ b/src/common/lib/types/message.ts @@ -1,9 +1,14 @@ -import Platform from 'common/platform'; import Logger from '../util/logger'; -import ErrorInfo from './errorinfo'; -import PresenceMessage from './presencemessage'; +import { + BaseMessage, + encode, + decode, + wireToJSON, + normalizeCipherOptions, + EncodingDecodingContext, + CipherOptions, +} from './basemessage'; import * as Utils from '../util/utils'; -import { Bufferlike as BrowserBufferlike } from '../../../platform/web/lib/util/bufferutils'; import * as API from '../../../../ably'; import type { IUntypedCryptoStatic } from 'common/types/ICryptoStatic'; @@ -11,80 +16,19 @@ import type { ChannelOptions } from '../../types/channel'; import type { Properties } from '../util/utils'; import type RestChannel from '../client/restchannel'; import type RealtimeChannel from '../client/realtimechannel'; +import type ErrorInfo from './errorinfo'; type Channel = RestChannel | RealtimeChannel; -const MessageActionArray: API.MessageAction[] = [ +const actions: API.MessageAction[] = [ 'message.create', 'message.update', 'message.delete', 'meta.occupancy', - 'message.summary' + 'message.summary', ]; -const MessageActionMap = new Map(MessageActionArray.map((action, index) => [action, index])); - -const ReverseMessageActionMap = new Map( - MessageActionArray.map((action, index) => [index, action]), -); - -function toMessageActionString(actionNumber: number): API.MessageAction | undefined { - return ReverseMessageActionMap.get(actionNumber); -} - -function toMessageActionNumber(messageAction?: API.MessageAction): number | undefined { - return messageAction ? MessageActionMap.get(messageAction) : undefined; -} - -export type CipherOptions = { - channelCipher: { - encrypt: Function; - algorithm: 'aes'; - }; - cipher?: { - channelCipher: { - encrypt: Function; - algorithm: 'aes'; - }; - }; -}; - -export type EncodingDecodingContext = { - channelOptions: ChannelOptions; - plugins: { - vcdiff?: { - decode: (delta: Uint8Array, source: Uint8Array) => Uint8Array; - }; - }; - baseEncodedPreviousPayload?: Buffer | BrowserBufferlike; -}; - -export type WireProtocolMessage = Omit & { action: number }; - -function normaliseContext(context: CipherOptions | EncodingDecodingContext | ChannelOptions): EncodingDecodingContext { - if (!context || !(context as EncodingDecodingContext).channelOptions) { - return { - channelOptions: context as ChannelOptions, - plugins: {}, - baseEncodedPreviousPayload: undefined, - }; - } - return context as EncodingDecodingContext; -} - -export function normalizeCipherOptions( - Crypto: IUntypedCryptoStatic | null, - logger: Logger, - options: API.ChannelOptions | null, -): ChannelOptions { - if (options && options.cipher) { - if (!Crypto) Utils.throwMissingPluginError('Crypto'); - const cipher = Crypto.getCipher(options.cipher, logger); - return { - cipher: cipher.cipherParams, - channelCipher: cipher.cipher, - }; - } - return options ?? {}; +function stringifyAction(action: number): string { + return actions[action || 0] || 'unknown'; } function getMessageSize(msg: Message) { @@ -107,25 +51,18 @@ function getMessageSize(msg: Message) { export async function fromEncoded( logger: Logger, Crypto: IUntypedCryptoStatic | null, - encoded: WireProtocolMessage, + encoded: Properties, inputOptions?: API.ChannelOptions, ): Promise { - const msg = fromWireProtocol(encoded); const options = normalizeCipherOptions(Crypto, logger, inputOptions ?? null); - /* if decoding fails at any point, catch and return the message decoded to - * the fullest extent possible */ - try { - await decode(msg, options); - } catch (e) { - Logger.logAction(logger, Logger.LOG_ERROR, 'Message.fromEncoded()', (e as Error).toString()); - } - return msg; + const wm = WireMessage.fromValues(encoded); + return wm.decode(options, logger); } export async function fromEncodedArray( logger: Logger, Crypto: IUntypedCryptoStatic | null, - encodedArray: Array, + encodedArray: Array, options?: API.ChannelOptions, ): Promise { return Promise.all( @@ -137,17 +74,12 @@ export async function fromEncodedArray( // these forms of the functions are used internally when we have a channel instance // already, so don't need to normalise channel options -export async function _fromEncoded(encoded: WireProtocolMessage, channel: Channel): Promise { - const msg = fromWireProtocol(encoded); - try { - await decode(msg, channel.channelOptions); - } catch (e) { - Logger.logAction(channel.logger, Logger.LOG_ERROR, 'Message._fromEncoded()', (e as Error).toString()); - } - return msg; +export async function _fromEncoded(encoded: Properties, channel: Channel): Promise { + const wm = WireMessage.fromValues(encoded); + return wm.decode(channel.channelOptions, channel.logger); } -export async function _fromEncodedArray(encodedArray: WireProtocolMessage[], channel: Channel): Promise { +export async function _fromEncodedArray(encodedArray: Properties[], channel: Channel): Promise { return Promise.all( encodedArray.map(function (encoded) { return _fromEncoded(encoded, channel); @@ -155,162 +87,12 @@ export async function _fromEncodedArray(encodedArray: WireProtocolMessage[], cha ); } -async function encrypt(msg: T, options: CipherOptions): Promise { - let data = msg.data, - encoding = msg.encoding, - cipher = options.channelCipher; - - encoding = encoding ? encoding + '/' : ''; - if (!Platform.BufferUtils.isBuffer(data)) { - data = Platform.BufferUtils.utf8Encode(String(data)); - encoding = encoding + 'utf-8/'; - } - const ciphertext = await cipher.encrypt(data); - msg.data = ciphertext; - msg.encoding = encoding + 'cipher+' + cipher.algorithm; - return msg; -} - -export async function encode(msg: T, options: CipherOptions): Promise { - const data = msg.data; - const nativeDataType = - typeof data == 'string' || Platform.BufferUtils.isBuffer(data) || data === null || data === undefined; - - if (!nativeDataType) { - if (Utils.isObject(data) || Array.isArray(data)) { - msg.data = JSON.stringify(data); - msg.encoding = msg.encoding ? msg.encoding + '/json' : 'json'; - } else { - throw new ErrorInfo('Data type is unsupported', 40013, 400); - } - } - - if (options != null && options.cipher) { - return encrypt(msg, options); - } else { - return msg; - } -} - -export async function encodeArray(messages: Array, options: CipherOptions): Promise> { - return Promise.all(messages.map((message) => encode(message, options))); +export async function encodeArray(messages: Array, options: CipherOptions): Promise> { + return Promise.all(messages.map((message) => message.encode(options))); } export const serialize = Utils.encodeBody; -export async function decode( - message: Message | PresenceMessage, - inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions, -): Promise { - const context = normaliseContext(inputContext); - - let lastPayload = message.data; - const encoding = message.encoding; - if (encoding) { - const xforms = encoding.split('/'); - let lastProcessedEncodingIndex, - encodingsToProcess = xforms.length, - data = message.data; - - let xform = ''; - try { - while ((lastProcessedEncodingIndex = encodingsToProcess) > 0) { - // eslint-disable-next-line security/detect-unsafe-regex - const match = xforms[--encodingsToProcess].match(/([-\w]+)(\+([\w-]+))?/); - if (!match) break; - xform = match[1]; - switch (xform) { - case 'base64': - data = Platform.BufferUtils.base64Decode(String(data)); - if (lastProcessedEncodingIndex == xforms.length) { - lastPayload = data; - } - continue; - case 'utf-8': - data = Platform.BufferUtils.utf8Decode(data); - continue; - case 'json': - data = JSON.parse(data); - continue; - case 'cipher': - if ( - context.channelOptions != null && - context.channelOptions.cipher && - context.channelOptions.channelCipher - ) { - const xformAlgorithm = match[3], - cipher = context.channelOptions.channelCipher; - /* don't attempt to decrypt unless the cipher params are compatible */ - if (xformAlgorithm != cipher.algorithm) { - throw new Error('Unable to decrypt message with given cipher; incompatible cipher params'); - } - data = await cipher.decrypt(data); - continue; - } else { - throw new Error('Unable to decrypt message; not an encrypted channel'); - } - case 'vcdiff': - if (!context.plugins || !context.plugins.vcdiff) { - throw new ErrorInfo('Missing Vcdiff decoder (https://github.com/ably-forks/vcdiff-decoder)', 40019, 400); - } - if (typeof Uint8Array === 'undefined') { - throw new ErrorInfo( - 'Delta decoding not supported on this browser (need ArrayBuffer & Uint8Array)', - 40020, - 400, - ); - } - try { - let deltaBase = context.baseEncodedPreviousPayload; - if (typeof deltaBase === 'string') { - deltaBase = Platform.BufferUtils.utf8Encode(deltaBase); - } - - // vcdiff expects Uint8Arrays, can't copy with ArrayBuffers. - const deltaBaseBuffer = Platform.BufferUtils.toBuffer(deltaBase as Buffer); - data = Platform.BufferUtils.toBuffer(data); - - data = Platform.BufferUtils.arrayBufferViewToBuffer(context.plugins.vcdiff.decode(data, deltaBaseBuffer)); - lastPayload = data; - } catch (e) { - throw new ErrorInfo('Vcdiff delta decode failed with ' + e, 40018, 400); - } - continue; - default: - throw new Error('Unknown encoding'); - } - } - } catch (e) { - const err = e as ErrorInfo; - throw new ErrorInfo( - 'Error processing the ' + xform + ' encoding, decoder returned ‘' + err.message + '’', - err.code || 40013, - 400, - ); - } finally { - message.encoding = - (lastProcessedEncodingIndex as number) <= 0 ? null : xforms.slice(0, lastProcessedEncodingIndex).join('/'); - message.data = data; - } - } - context.baseEncodedPreviousPayload = lastPayload; -} - -export function fromValues(values: Properties): Message { - return Object.assign(new Message(), values); -} - -export function fromWireProtocol(values: WireProtocolMessage): Message { - const action = toMessageActionString(values.action as number) || values.action; - const res = Object.assign(new Message(), { ...values, action }); - res.expandFields(); - return res; -} - -export function fromValuesArray(values: Properties[]): Message[] { - return values.map(fromValues); -} - /* This should be called on encode()d (and encrypt()d) Messages (as it * assumes the data is a string or buffer) */ export function getMessagesSize(messages: Message[]): number { @@ -323,17 +105,9 @@ export function getMessagesSize(messages: Message[]): number { return total; } -class Message { +class Message extends BaseMessage { name?: string; - id?: string; - timestamp?: number; - clientId?: string; - connectionId?: string; connectionKey?: string; - data?: any; - encoding?: string | null; - extras?: any; - size?: number; action?: API.MessageAction; serial?: string; refSerial?: string; @@ -342,47 +116,6 @@ class Message { version?: string; operation?: API.Operation; - /** - * Overload toJSON() to intercept JSON.stringify() - * @return {*} - */ - toJSON() { - /* encode data to base64 if present and we're returning real JSON; - * although msgpack calls toJSON(), we know it is a stringify() - * call if it has a non-empty arguments list */ - let encoding = this.encoding; - let data = this.data; - if (data && Platform.BufferUtils.isBuffer(data)) { - if (arguments.length > 0) { - /* stringify call */ - encoding = encoding ? encoding + '/base64' : 'base64'; - data = Platform.BufferUtils.base64Encode(data); - } else { - /* Called by msgpack. toBuffer returns a datatype understandable by - * that platform's msgpack implementation (Buffer in node, Uint8Array - * in browsers) */ - data = Platform.BufferUtils.toBuffer(data); - } - } - return { - name: this.name, - id: this.id, - clientId: this.clientId, - connectionId: this.connectionId, - connectionKey: this.connectionKey, - extras: this.extras, - serial: this.serial, - action: toMessageActionNumber(this.action as API.MessageAction) || this.action, - refSerial: this.refSerial, - refType: this.refType, - createdAt: this.createdAt, - version: this.version, - operation: this.operation, - encoding, - data, - }; - } - expandFields() { if (this.action === 'message.create') { // TM2k @@ -396,24 +129,18 @@ class Message { } } + async encode(options: CipherOptions): Promise { + const res = Object.assign(new WireMessage(), this, { + action: actions.indexOf(this.action || 'message.create'), + }); + return encode(res, options); + } + toString(): string { let result = '[Message'; - if (this.name) result += '; name=' + this.name; - if (this.id) result += '; id=' + this.id; - if (this.timestamp) result += '; timestamp=' + this.timestamp; - if (this.clientId) result += '; clientId=' + this.clientId; - if (this.connectionId) result += '; connectionId=' + this.connectionId; - if (this.encoding) result += '; encoding=' + this.encoding; - if (this.extras) result += '; extras =' + JSON.stringify(this.extras); - if (this.data) { - if (typeof this.data == 'string') result += '; data=' + this.data; - else if (Platform.BufferUtils.isBuffer(this.data)) - result += '; data (buffer)=' + Platform.BufferUtils.base64Encode(this.data); - else result += '; data (json)=' + JSON.stringify(this.data); - } - if (this.extras) result += '; extras=' + JSON.stringify(this.extras); - if (this.action) result += '; action=' + this.action; + if (this.name) result += '; name=' + this.name; + result += super.toString(); if (this.serial) result += '; serial=' + this.serial; if (this.version) result += '; version=' + this.version; if (this.refSerial) result += '; refSerial=' + this.refSerial; @@ -423,6 +150,59 @@ class Message { result += ']'; return result; } + + static fromValues(values: Properties): Message { + return Object.assign(new Message(), values); + } + + static fromValuesArray(values: Properties[]): Message[] { + return values.map(Message.fromValues); + } +} + +export class WireMessage extends Message { + action!: any; + + // Overload toJSON() to intercept JSON.stringify() + toJSON(...args: any[]) { + return wireToJSON.call(this, ...args); + } + + static fromValues(values: Properties): WireMessage { + return Object.assign(new WireMessage(), values); + } + + static fromValuesArray(values: Properties[]): WireMessage[] { + return values.map(WireMessage.fromValues); + } + + // for contexts where some decoding errors need to be handled specially by the caller + async decodeWithErr( + inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions, + logger: Logger, + ): Promise<{ decoded: Message; err: ErrorInfo | undefined }> { + const res = Object.assign(new Message(), { + ...this, + action: stringifyAction(this.action), + }); + let err: ErrorInfo | undefined; + try { + await decode(res, inputContext); + } catch (e) { + Logger.logAction(logger, Logger.LOG_ERROR, 'WireMessage.decode()', Utils.inspectError(e)); + err = e as ErrorInfo; + } + res.expandFields(); + return { decoded: res, err: err }; + } + + async decode( + inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions, + logger: Logger, + ): Promise { + const { decoded } = await this.decodeWithErr(inputContext, logger); + return decoded; + } } export default Message; diff --git a/src/common/lib/types/presencemessage.ts b/src/common/lib/types/presencemessage.ts index 62ac6c073..a3229cdd2 100644 --- a/src/common/lib/types/presencemessage.ts +++ b/src/common/lib/types/presencemessage.ts @@ -1,44 +1,32 @@ import Logger from '../util/logger'; -import Platform from 'common/platform'; -import { normalizeCipherOptions, encode as encodeMessage, decode as decodeMessage, getMessagesSize } from './message'; +import { BaseMessage, encode, decode, wireToJSON, normalizeCipherOptions, CipherOptions } from './basemessage'; import * as API from '../../../../ably'; +import * as Utils from '../util/utils'; import type { IUntypedCryptoStatic } from 'common/types/ICryptoStatic'; import type { Properties } from '../util/utils'; import type RestChannel from '../client/restchannel'; import type RealtimeChannel from '../client/realtimechannel'; +import type { ChannelOptions } from '../../types/channel'; type Channel = RestChannel | RealtimeChannel; const actions = ['absent', 'present', 'enter', 'leave', 'update']; -export type WireProtocolPresenceMessage = Omit & { action: number }; - -function toActionValue(actionString: string) { - return actions.indexOf(actionString); -} - export async function fromEncoded( logger: Logger, Crypto: IUntypedCryptoStatic | null, - encoded: WireProtocolPresenceMessage, + encoded: WirePresenceMessage, inputOptions?: API.ChannelOptions, ): Promise { - const msg = fromWireProtocol(encoded); const options = normalizeCipherOptions(Crypto, logger, inputOptions ?? null); - /* if decoding fails at any point, catch and return the message decoded to - * the fullest extent possible */ - try { - await decode(msg, options ?? {}); - } catch (e) { - Logger.logAction(logger, Logger.LOG_ERROR, 'PresenceMessage.fromEncoded()', (e as Error).toString()); - } - return msg; + const wpm = WirePresenceMessage.fromValues(encoded); + return wpm.decode(options, logger); } export async function fromEncodedArray( logger: Logger, Crypto: IUntypedCryptoStatic | null, - encodedArray: WireProtocolPresenceMessage[], + encodedArray: WirePresenceMessage[], options?: API.ChannelOptions, ): Promise { return Promise.all( @@ -50,18 +38,15 @@ export async function fromEncodedArray( // these forms of the functions are used internally when we have a channel instance // already, so don't need to normalise channel options -export async function _fromEncoded(encoded: WireProtocolPresenceMessage, channel: Channel): Promise { - const msg = fromWireProtocol(encoded); - try { - await decode(msg, channel.channelOptions); - } catch (e) { - Logger.logAction(channel.logger, Logger.LOG_ERROR, 'PresenceMessage._fromEncoded()', (e as Error).toString()); - } - return msg; +export async function _fromEncoded( + encoded: Properties, + channel: Channel, +): Promise { + return WirePresenceMessage.fromValues(encoded).decode(channel.channelOptions, channel.logger); } export async function _fromEncodedArray( - encodedArray: WireProtocolPresenceMessage[], + encodedArray: Properties[], channel: Channel, ): Promise { return Promise.all( @@ -71,43 +56,8 @@ export async function _fromEncodedArray( ); } -export function fromValues(values: Properties): PresenceMessage { - return Object.assign(new PresenceMessage(), values); -} - -export function fromWireProtocol(values: WireProtocolPresenceMessage): PresenceMessage { - const action = actions[values.action]; - return Object.assign(new PresenceMessage(), { ...values, action }); -} - -export { encodeMessage as encode }; -export const decode = decodeMessage; - -export function fromValuesArray(values: Properties[]): PresenceMessage[] { - return values.map(fromValues); -} - -export function fromData(data: any): PresenceMessage { - if (data instanceof PresenceMessage) { - return data; - } - return fromValues({ - data, - }); -} - -export { getMessagesSize }; - -class PresenceMessage { +class PresenceMessage extends BaseMessage { action?: string; - id?: string; - timestamp?: number; - clientId?: string; - connectionId?: string; - data?: string | Buffer | Uint8Array; - encoding?: string; - extras?: any; - size?: number; /* Returns whether this presenceMessage is synthesized, i.e. was not actually * sent by the connection (usually means a leave event sent 15s after a @@ -132,66 +82,67 @@ class PresenceMessage { }; } - /** - * Overload toJSON() to intercept JSON.stringify() - * @return {*} - */ - toJSON(): { - id?: string; - clientId?: string; - action: number; - data: string | Buffer | Uint8Array; - encoding?: string; - extras?: any; - } { - /* encode data to base64 if present and we're returning real JSON; - * although msgpack calls toJSON(), we know it is a stringify() - * call if it has a non-empty arguments list */ - let data = this.data as string | Buffer | Uint8Array; - let encoding = this.encoding; - if (data && Platform.BufferUtils.isBuffer(data)) { - if (arguments.length > 0) { - /* stringify call */ - encoding = encoding ? encoding + '/base64' : 'base64'; - data = Platform.BufferUtils.base64Encode(data); - } else { - /* Called by msgpack. toBuffer returns a datatype understandable by - * that platform's msgpack implementation (Buffer in node, Uint8Array - * in browsers) */ - data = Platform.BufferUtils.toBuffer(data); - } - } - return { - id: this.id, - clientId: this.clientId, - /* Convert presence action back to an int for sending to Ably */ - action: toActionValue(this.action as string), - data: data, - encoding: encoding, - extras: this.extras, - }; + async encode(options: CipherOptions): Promise { + const res = Object.assign(new WirePresenceMessage(), this, { + action: actions.indexOf(this.action || 'present'), + }); + return encode(res, options); } toString(): string { let result = '[PresenceMessage'; result += '; action=' + this.action; if (this.id) result += '; id=' + this.id; - if (this.timestamp) result += '; timestamp=' + this.timestamp; - if (this.clientId) result += '; clientId=' + this.clientId; - if (this.connectionId) result += '; connectionId=' + this.connectionId; - if (this.encoding) result += '; encoding=' + this.encoding; - if (this.data) { - if (typeof this.data == 'string') result += '; data=' + this.data; - else if (Platform.BufferUtils.isBuffer(this.data)) - result += '; data (buffer)=' + Platform.BufferUtils.base64Encode(this.data); - else result += '; data (json)=' + JSON.stringify(this.data); - } - if (this.extras) { - result += '; extras=' + JSON.stringify(this.extras); - } + result += super.toString(); result += ']'; return result; } + + static fromValues(values: Properties): PresenceMessage { + return Object.assign(new PresenceMessage(), values); + } + + static fromValuesArray(values: Properties[]): PresenceMessage[] { + return values.map(PresenceMessage.fromValues); + } + + static fromData(data: any): PresenceMessage { + if (data instanceof PresenceMessage) { + return data; + } + return PresenceMessage.fromValues({ + data, + }); + } +} + +export class WirePresenceMessage extends PresenceMessage { + action: any; + + toJSON(...args: any[]) { + return wireToJSON.call(this, ...args); + } + + static fromValues(values: Properties): WirePresenceMessage { + return Object.assign(new WirePresenceMessage(), values); + } + + static fromValuesArray(values: Properties[]): WirePresenceMessage[] { + return values.map(WirePresenceMessage.fromValues); + } + + async decode(channelOptions: ChannelOptions, logger: Logger): Promise { + const res = Object.assign(new PresenceMessage(), { + ...this, + action: actions[this.action], + }); + try { + await decode(res, channelOptions); + } catch (e) { + Logger.logAction(logger, Logger.LOG_ERROR, 'WirePresenceMessage.decode()', Utils.inspectError(e)); + } + return res; + } } export default PresenceMessage; diff --git a/src/common/lib/types/protocolmessage.ts b/src/common/lib/types/protocolmessage.ts index 1912d2002..9b1218470 100644 --- a/src/common/lib/types/protocolmessage.ts +++ b/src/common/lib/types/protocolmessage.ts @@ -3,60 +3,12 @@ import * as API from '../../../../ably'; import { PresenceMessagePlugin } from '../client/modularplugins'; import * as Utils from '../util/utils'; import ErrorInfo from './errorinfo'; -import Message, { - fromWireProtocol as messageFromWireProtocol, - fromValuesArray as messagesFromValuesArray, - WireProtocolMessage, -} from './message'; -import PresenceMessage, { - fromWireProtocol as presenceMessageFromWireProtocol, - fromValues as presenceMessageFromValues, - fromValuesArray as presenceMessagesFromValuesArray, - WireProtocolPresenceMessage, -} from './presencemessage'; - -export const actions = { - HEARTBEAT: 0, - ACK: 1, - NACK: 2, - CONNECT: 3, - CONNECTED: 4, - DISCONNECT: 5, - DISCONNECTED: 6, - CLOSE: 7, - CLOSED: 8, - ERROR: 9, - ATTACH: 10, - ATTACHED: 11, - DETACH: 12, - DETACHED: 13, - PRESENCE: 14, - MESSAGE: 15, - SYNC: 16, - AUTH: 17, - ACTIVATE: 18, -}; - -export const ActionName: string[] = []; -Object.keys(actions).forEach(function (name) { - ActionName[(actions as { [key: string]: number })[name]] = name; -}); - -const flags: { [key: string]: number } = { - /* Channel attach state flags */ - HAS_PRESENCE: 1 << 0, - HAS_BACKLOG: 1 << 1, - RESUMED: 1 << 2, - TRANSIENT: 1 << 4, - ATTACH_RESUME: 1 << 5, - /* Channel mode flags */ - PRESENCE: 1 << 16, - PUBLISH: 1 << 17, - SUBSCRIBE: 1 << 18, - PRESENCE_SUBSCRIBE: 1 << 19, -}; -const flagNames = Object.keys(flags); -flags.MODE_ALL = flags.PRESENCE | flags.PUBLISH | flags.SUBSCRIBE | flags.PRESENCE_SUBSCRIBE; +import Message, { WireMessage } from './message'; +import PresenceMessage, { WirePresenceMessage } from './presencemessage'; +import { flags, flagNames, channelModes, ActionName } from './protocolmessagecommon'; +import type { Properties } from '../util/utils'; + +export const serialize = Utils.encodeBody; function toStringArray(array?: any[]): string { const result = []; @@ -68,10 +20,6 @@ function toStringArray(array?: any[]): string { return '[ ' + result.join(', ') + ' ]'; } -export const channelModes = ['PRESENCE', 'PUBLISH', 'SUBSCRIBE', 'PRESENCE_SUBSCRIBE']; - -export const serialize = Utils.encodeBody; - export function deserialize( serialized: unknown, MsgPack: MsgPack | null, @@ -86,35 +34,31 @@ export function fromDeserialized( deserialized: Record, presenceMessagePlugin: PresenceMessagePlugin | null, ): ProtocolMessage { - const error = deserialized.error; - if (error) { - deserialized.error = ErrorInfo.fromValues(error as ErrorInfo); + let error: ErrorInfo | undefined; + if (deserialized.error) { + error = ErrorInfo.fromValues(deserialized.error as ErrorInfo); } let messages: Message[] | undefined; if (deserialized.messages) { - const dm = deserialized.messages as WireProtocolMessage[]; - messages = dm.map((m) => messageFromWireProtocol(m)); + messages = WireMessage.fromValuesArray(deserialized.messages as Array>); } let presence: PresenceMessage[] | undefined; if (presenceMessagePlugin && deserialized.presence) { - const dp = deserialized.presence as WireProtocolPresenceMessage[]; - presence = dp.map((pm) => presenceMessagePlugin.presenceMessageFromWireProtocol(pm)); + presence = presenceMessagePlugin.WirePresenceMessage.fromValuesArray( + deserialized.presence as Array>, + ); } - return Object.assign(new ProtocolMessage(), { ...deserialized, presence, messages }); + return Object.assign(new ProtocolMessage(), { ...deserialized, presence, messages, error }); } /** * Used by the tests. */ export function fromDeserializedIncludingDependencies(deserialized: Record): ProtocolMessage { - return fromDeserialized(deserialized, { - presenceMessageFromValues, - presenceMessagesFromValuesArray, - presenceMessageFromWireProtocol, - }); + return fromDeserialized(deserialized, { PresenceMessage, WirePresenceMessage }); } export function fromValues(values: unknown): ProtocolMessage { @@ -132,9 +76,9 @@ export function stringify(msg: any, presenceMessagePlugin: PresenceMessagePlugin if (msg[attribute] !== undefined) result += '; ' + attribute + '=' + msg[attribute]; } - if (msg.messages) result += '; messages=' + toStringArray(messagesFromValuesArray(msg.messages)); + if (msg.messages) result += '; messages=' + toStringArray(WireMessage.fromValuesArray(msg.messages)); if (msg.presence && presenceMessagePlugin) - result += '; presence=' + toStringArray(presenceMessagePlugin.presenceMessagesFromValuesArray(msg.presence)); + result += '; presence=' + toStringArray(presenceMessagePlugin.WirePresenceMessage.fromValuesArray(msg.presence)); if (msg.error) result += '; error=' + ErrorInfo.fromValues(msg.error).toString(); if (msg.auth && msg.auth.accessToken) result += '; token=' + msg.auth.accessToken; if (msg.flags) result += '; flags=' + flagNames.filter(msg.hasFlag).join(','); @@ -165,9 +109,9 @@ class ProtocolMessage { channel?: string; channelSerial?: string | null; msgSerial?: number; - messages?: Message[]; + messages?: WireMessage[]; // This will be undefined if we skipped decoding this property due to user not requesting presence functionality — see `fromDeserialized` - presence?: PresenceMessage[]; + presence?: WirePresenceMessage[]; auth?: unknown; connectionDetails?: Record; diff --git a/src/common/lib/types/protocolmessagecommon.ts b/src/common/lib/types/protocolmessagecommon.ts new file mode 100644 index 000000000..d6131d8eb --- /dev/null +++ b/src/common/lib/types/protocolmessagecommon.ts @@ -0,0 +1,49 @@ +// constant definitions that can be imported by anyone without worrying about circular +// deps + +export const actions = { + HEARTBEAT: 0, + ACK: 1, + NACK: 2, + CONNECT: 3, + CONNECTED: 4, + DISCONNECT: 5, + DISCONNECTED: 6, + CLOSE: 7, + CLOSED: 8, + ERROR: 9, + ATTACH: 10, + ATTACHED: 11, + DETACH: 12, + DETACHED: 13, + PRESENCE: 14, + MESSAGE: 15, + SYNC: 16, + AUTH: 17, + ACTIVATE: 18, +}; + +export const ActionName: string[] = []; +Object.keys(actions).forEach(function (name) { + ActionName[(actions as { [key: string]: number })[name]] = name; +}); + +export const flags: { [key: string]: number } = { + /* Channel attach state flags */ + HAS_PRESENCE: 1 << 0, + HAS_BACKLOG: 1 << 1, + RESUMED: 1 << 2, + TRANSIENT: 1 << 4, + ATTACH_RESUME: 1 << 5, + /* Channel mode flags */ + PRESENCE: 1 << 16, + PUBLISH: 1 << 17, + SUBSCRIBE: 1 << 18, + PRESENCE_SUBSCRIBE: 1 << 19, +}; + +export const flagNames = Object.keys(flags); + +flags.MODE_ALL = flags.PRESENCE | flags.PUBLISH | flags.SUBSCRIBE | flags.PRESENCE_SUBSCRIBE; + +export const channelModes = ['PRESENCE', 'PUBLISH', 'SUBSCRIBE', 'PRESENCE_SUBSCRIBE']; diff --git a/src/platform/web/modular/presencemessage.ts b/src/platform/web/modular/presencemessage.ts index 40e90830e..1c1cdaeb5 100644 --- a/src/platform/web/modular/presencemessage.ts +++ b/src/platform/web/modular/presencemessage.ts @@ -1,5 +1,5 @@ import * as API from '../../../../ably'; -import { fromEncoded, fromEncodedArray, fromValues } from '../../../common/lib/types/presencemessage'; +import PresenceMessage, { fromEncoded, fromEncodedArray } from '../../../common/lib/types/presencemessage'; import { Crypto } from './crypto'; import Logger from '../../../common/lib/util/logger'; @@ -21,4 +21,4 @@ export const decodeEncryptedPresenceMessages = ((obj, options) => { return fromEncodedArray(Logger.defaultLogger, Crypto, obj, options); }) as API.PresenceMessageStatic['fromEncodedArray']; -export const constructPresenceMessage = fromValues as API.PresenceMessageStatic['fromValues']; +export const constructPresenceMessage = PresenceMessage.fromValues as API.PresenceMessageStatic['fromValues']; diff --git a/src/platform/web/modular/realtimepresence.ts b/src/platform/web/modular/realtimepresence.ts index 4d42fd932..1d3335ce3 100644 --- a/src/platform/web/modular/realtimepresence.ts +++ b/src/platform/web/modular/realtimepresence.ts @@ -1,16 +1,11 @@ import { RealtimePresencePlugin } from 'common/lib/client/modularplugins'; import { default as realtimePresenceClass } from '../../../common/lib/client/realtimepresence'; -import { - fromValues as presenceMessageFromValues, - fromValuesArray as presenceMessagesFromValuesArray, - fromWireProtocol as presenceMessageFromWireProtocol, -} from '../../../common/lib/types/presencemessage'; +import PresenceMessage, { WirePresenceMessage } from '../../../common/lib/types/presencemessage'; const RealtimePresence: RealtimePresencePlugin = { RealtimePresence: realtimePresenceClass, - presenceMessageFromValues, - presenceMessagesFromValuesArray, - presenceMessageFromWireProtocol, + PresenceMessage, + WirePresenceMessage, }; export { RealtimePresence }; diff --git a/test/realtime/crypto.test.js b/test/realtime/crypto.test.js index 9eeca0eef..a5bfea646 100644 --- a/test/realtime/crypto.test.js +++ b/test/realtime/crypto.test.js @@ -264,9 +264,8 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async function (channelOpts, testMessage, encryptedMessage) { /* encrypt plaintext message; encode() also to handle data that is not already string or buffer */ helper.recordPrivateApi('call.Message.encode'); - Helper.whenPromiseSettles(Message.encode(testMessage, channelOpts), function () { - /* compare */ - testMessageEquality(done, helper, testMessage, encryptedMessage); + Helper.whenPromiseSettles(testMessage.encode(channelOpts), function (_, encrypted) { + testMessageEquality(done, helper, encrypted, encryptedMessage); }); }, ); @@ -288,9 +287,8 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async function (channelOpts, testMessage, encryptedMessage) { /* encrypt plaintext message; encode() also to handle data that is not already string or buffer */ helper.recordPrivateApi('call.Message.encode'); - Helper.whenPromiseSettles(Message.encode(testMessage, channelOpts), function () { - /* compare */ - testMessageEquality(done, helper, testMessage, encryptedMessage); + Helper.whenPromiseSettles(testMessage.encode(channelOpts), function (_, encrypted) { + testMessageEquality(done, helper, encrypted, encryptedMessage); }); }, ); @@ -387,9 +385,9 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async false, function (channelOpts, testMessage, encryptedMessage, msgpackEncodedMessage) { helper.recordPrivateApi('call.Message.encode'); - Helper.whenPromiseSettles(Message.encode(testMessage, channelOpts), function () { + Helper.whenPromiseSettles(testMessage.encode(channelOpts), function (_, encrypted) { helper.recordPrivateApi('call.msgpack.encode'); - var msgpackFromEncoded = msgpack.encode(testMessage); + var msgpackFromEncoded = msgpack.encode(encrypted); var msgpackFromEncrypted = msgpack.encode(encryptedMessage); helper.recordPrivateApi('call.BufferUtils.base64Decode'); helper.recordPrivateApi('call.msgpack.decode'); @@ -431,9 +429,9 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async 2, false, function (channelOpts, testMessage, encryptedMessage, msgpackEncodedMessage) { - Helper.whenPromiseSettles(Message.encode(testMessage, channelOpts), function () { + Helper.whenPromiseSettles(testMessage.encode(channelOpts), function (_, encrypted) { helper.recordPrivateApi('call.msgpack.encode'); - var msgpackFromEncoded = msgpack.encode(testMessage); + var msgpackFromEncoded = msgpack.encode(encrypted); var msgpackFromEncrypted = msgpack.encode(encryptedMessage); helper.recordPrivateApi('call.BufferUtils.base64Decode'); helper.recordPrivateApi('call.msgpack.decode'); diff --git a/test/realtime/message.test.js b/test/realtime/message.test.js index 9e57327e5..12728b3da 100644 --- a/test/realtime/message.test.js +++ b/test/realtime/message.test.js @@ -1275,7 +1275,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async /** * @spec TM2j */ - describe('DefaultMessage.fromWireProtocol', function () { + describe('DefaultMessage.fromEncoded', function () { const testCases = [ { description: 'should stringify the numeric action', @@ -1284,30 +1284,31 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async expectedJSON: { action: 0 }, }, { - description: 'should accept an already stringified action', - action: 'message.update', + description: 'should stringify the numeric action', + action: 1, expectedString: '[Message; action=message.update]', expectedJSON: { action: 1 }, }, { description: 'should handle no action provided', action: undefined, - expectedString: '[Message]', - expectedJSON: { action: undefined }, + expectedString: '[Message; action=message.create]', + expectedJSON: { action: 0 }, }, { description: 'should handle unknown action provided', action: 10, - expectedString: '[Message; action=10]', - expectedJSON: { action: 10 }, + expectedString: '[Message; action=unknown]', }, ]; testCases.forEach(({ description, action, options, expectedString, expectedJSON }) => { - it(description, function () { + it(description, async function () { const values = { action }; - const message = Message.fromWireProtocol(values); + const message = await Message.fromEncoded(values, {}); expect(message.toString()).to.equal(expectedString); - expect(message.toJSON()).to.deep.contains(expectedJSON); + if (expectedJSON) { + expect((await message.encode({})).toJSON()).to.deep.contains(expectedJSON); + } }); }); @@ -1315,17 +1316,17 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async * @spec TM2k * @spec TM2o */ - it('create message should fill out serial and createdAt from version/timestamp', function () { - const values = { action: 1, timestamp: 12345, version: 'foo' }; - const message = Message.fromWireProtocol(values); + it('create message should fill out serial and createdAt from version/timestamp', async function () { + const values = { action: 0, timestamp: 12345, version: 'foo' }; + const message = await Message.fromEncoded(values); expect(message.timestamp).to.equal(12345); expect(message.createdAt).to.equal(12345); expect(message.version).to.equal('foo'); expect(message.serial).to.equal('foo'); // should only apply to creates - const update = { action: 2, timestamp: 12345, version: 'foo' }; - const updateMessage = Message.fromWireProtocol(update); + const update = { action: 1, timestamp: 12345, version: 'foo' }; + const updateMessage = await Message.fromEncoded(update); expect(updateMessage.createdAt).to.equal(undefined); expect(updateMessage.serial).to.equal(undefined); }); diff --git a/test/realtime/presence.test.js b/test/realtime/presence.test.js index 231219ea3..4b4696bdb 100644 --- a/test/realtime/presence.test.js +++ b/test/realtime/presence.test.js @@ -89,7 +89,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async }; describe('realtime/presence', function () { - this.timeout(60 * 1000); + this.timeout(20 * 1000); before(function (done) { const helper = Helper.forHook(this); helper.setupApp(function (err) { @@ -1633,13 +1633,15 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async const connId = realtime.connection.connectionManager.connectionId; helper.recordPrivateApi('call.presence._myMembers.put'); - channel.presence._myMembers.put({ - action: 'enter', - clientId: 'two', - connectionId: connId, - id: connId + ':0:0', - data: 'twodata', - }); + channel.presence._myMembers.put( + PresenceMessage.fromValues({ + action: 'present', + clientId: 'two', + connectionId: connId, + id: connId + ':0:0', + data: 'twodata', + }), + ); await helper.becomeSuspended(realtime); @@ -1787,12 +1789,14 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async helper.recordPrivateApi('read.connectionManager.connectionId'); var connId = realtime.connection.connectionManager.connectionId; helper.recordPrivateApi('call.presence._myMembers.put'); - channel.presence._myMembers.put({ - action: 'enter', - clientId: 'me', - connectionId: connId, - id: connId + ':0:0', - }); + channel.presence._myMembers.put( + PresenceMessage.fromValues({ + action: 2, + clientId: 'me', + connectionId: connId, + id: connId + ':0:0', + }), + ); helper.becomeSuspended(realtime, cb); }, function (cb) { @@ -1948,18 +1952,20 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async /* Inject an additional member locally */ helper.recordPrivateApi('call.channel.processMessage'); channel - .processMessage({ - action: 14, - id: 'messageid:0', - connectionId: 'connid', - timestamp: Date.now(), - presence: [ - { - clientId: goneClientId, - action: 'enter', - }, - ], - }) + .processMessage( + createPM({ + action: 14, + id: 'messageid:0', + connectionId: 'connid', + timestamp: Date.now(), + presence: [ + { + clientId: goneClientId, + action: 2, + }, + ], + }), + ) .then(function () { cb(null); }) @@ -2043,18 +2049,20 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async /* Inject a member locally */ helper.recordPrivateApi('call.channel.processMessage'); channel - .processMessage({ - action: 14, - id: 'messageid:0', - connectionId: 'connid', - timestamp: Date.now(), - presence: [ - { - clientId: fakeClientId, - action: 'enter', - }, - ], - }) + .processMessage( + createPM({ + action: 14, + id: 'messageid:0', + connectionId: 'connid', + timestamp: Date.now(), + presence: [ + { + clientId: fakeClientId, + action: 2, + }, + ], + }), + ) .then(function () { cb(); }) diff --git a/test/realtime/sync.test.js b/test/realtime/sync.test.js index dccbdeff5..40437ed84 100644 --- a/test/realtime/sync.test.js +++ b/test/realtime/sync.test.js @@ -70,26 +70,28 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async function (cb) { helper.recordPrivateApi('call.channel.processMessage'); channel - .processMessage({ - action: 16, - channel: channelName, - presence: [ - { - action: 'present', - clientId: 'one', - connectionId: 'one_connid', - id: 'one_connid:0:0', - timestamp: 1e12, - }, - { - action: 'present', - clientId: 'two', - connectionId: 'two_connid', - id: 'two_connid:0:0', - timestamp: 1e12, - }, - ], - }) + .processMessage( + createPM({ + action: 16, + channel: channelName, + presence: [ + { + action: 1, + clientId: 'one', + connectionId: 'one_connid', + id: 'one_connid:0:0', + timestamp: 1e12, + }, + { + action: 1, + clientId: 'two', + connectionId: 'two_connid', + id: 'two_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ) .then(function () { cb(); }) @@ -114,26 +116,28 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async /* Trigger another sync. Two has gone without so much as a `leave` message! */ helper.recordPrivateApi('call.channel.processMessage'); channel - .processMessage({ - action: 16, - channel: channelName, - presence: [ - { - action: 'present', - clientId: 'one', - connectionId: 'one_connid', - id: 'one_connid:0:0', - timestamp: 1e12, - }, - { - action: 'present', - clientId: 'three', - connectionId: 'three_connid', - id: 'three_connid:0:0', - timestamp: 1e12, - }, - ], - }) + .processMessage( + createPM({ + action: 16, + channel: channelName, + presence: [ + { + action: 1, + clientId: 'one', + connectionId: 'one_connid', + id: 'one_connid:0:0', + timestamp: 1e12, + }, + { + action: 1, + clientId: 'three', + connectionId: 'three_connid', + id: 'three_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ) .then(function () { cb(); }) @@ -191,67 +195,75 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async /* First sync */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 16, - channel: channelName, - presence: [ - { - action: 'present', - clientId: 'one', - connectionId: 'one_connid', - id: 'one_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 16, + channel: channelName, + presence: [ + { + action: 1, + clientId: 'one', + connectionId: 'one_connid', + id: 'one_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); /* A second sync, this time in multiple parts, with a presence message in the middle */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 16, - channel: channelName, - channelSerial: 'serial:cursor', - presence: [ - { - action: 'present', - clientId: 'two', - connectionId: 'two_connid', - id: 'two_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 16, + channel: channelName, + channelSerial: 'serial:cursor', + presence: [ + { + action: 1, + clientId: 'two', + connectionId: 'two_connid', + id: 'two_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - presence: [ - { - action: 'enter', - clientId: 'three', - connectionId: 'three_connid', - id: 'three_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + presence: [ + { + action: 2, + clientId: 'three', + connectionId: 'three_connid', + id: 'three_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 16, - channel: channelName, - channelSerial: 'serial:', - presence: [ - { - action: 'present', - clientId: 'four', - connectionId: 'four_connid', - id: 'four_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 16, + channel: channelName, + channelSerial: 'serial:', + presence: [ + { + action: 1, + clientId: 'four', + connectionId: 'four_connid', + id: 'four_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); await new Promise(function (resolve, reject) { var done = function (err) { @@ -302,51 +314,57 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async ); helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 16, - channel: channelName, - channelSerial: 'serial:cursor', - presence: [ - { - action: 'present', - clientId: 'one', - connectionId: 'one_connid', - id: 'one_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 16, + channel: channelName, + channelSerial: 'serial:cursor', + presence: [ + { + action: 1, + clientId: 'one', + connectionId: 'one_connid', + id: 'one_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - presence: [ - { - action: 'enter', - clientId: 'one', - connectionId: 'one_connid', - id: 'one_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + presence: [ + { + action: 2, + clientId: 'one', + connectionId: 'one_connid', + id: 'one_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 16, - channel: channelName, - channelSerial: 'serial:', - presence: [ - { - action: 'present', - clientId: 'two', - connectionId: 'two_connid', - id: 'two_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 16, + channel: channelName, + channelSerial: 'serial:', + presence: [ + { + action: 1, + clientId: 'two', + connectionId: 'two_connid', + id: 'two_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); await new Promise(function (resolve, reject) { var done = function (err) { @@ -394,51 +412,57 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async ); helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 16, - channel: channelName, - channelSerial: 'serial:cursor', - presence: [ - { - action: 'present', - clientId: 'one', - connectionId: 'one_connid', - id: 'one_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 16, + channel: channelName, + channelSerial: 'serial:cursor', + presence: [ + { + action: 1, + clientId: 'one', + connectionId: 'one_connid', + id: 'one_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - presence: [ - { - action: 'enter', - clientId: 'two', - connectionId: 'two_connid', - id: 'two_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + presence: [ + { + action: 2, + clientId: 'two', + connectionId: 'two_connid', + id: 'two_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 16, - channel: channelName, - channelSerial: 'serial:', - presence: [ - { - action: 'present', - clientId: 'two', - connectionId: 'two_connid', - id: 'two_connid:0:0', - timestamp: 1e12, - }, - ], - }); + await channel.processMessage( + createPM({ + action: 16, + channel: channelName, + channelSerial: 'serial:', + presence: [ + { + action: 1, + clientId: 'two', + connectionId: 'two_connid', + id: 'two_connid:0:0', + timestamp: 1e12, + }, + ], + }), + ); await new Promise(function (resolve, reject) { var done = function (err) { @@ -490,124 +514,138 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async /* One enters */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - id: 'one_connid:1', - connectionId: 'one_connid', - timestamp: 1e12, - presence: [ - { - action: 'enter', - clientId: 'one', - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + id: 'one_connid:1', + connectionId: 'one_connid', + timestamp: 1e12, + presence: [ + { + action: 2, + clientId: 'one', + }, + ], + }), + ); /* An earlier leave from one (should be ignored) */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - connectionId: 'one_connid', - id: 'one_connid:0', - timestamp: 1e12, - presence: [ - { - action: 'leave', - clientId: 'one', - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + connectionId: 'one_connid', + id: 'one_connid:0', + timestamp: 1e12, + presence: [ + { + action: 3, + clientId: 'one', + }, + ], + }), + ); /* One adds some data in a newer msgSerial */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - connectionId: 'one_connid', - id: 'one_connid:2', - timestamp: 1e12, - presence: [ - { - action: 'update', - clientId: 'one', - data: 'onedata', - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + connectionId: 'one_connid', + id: 'one_connid:2', + timestamp: 1e12, + presence: [ + { + action: 4, + clientId: 'one', + data: 'onedata', + }, + ], + }), + ); /* Two enters */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - connectionId: 'two_connid', - id: 'two_connid:0', - timestamp: 1e12, - presence: [ - { - action: 'enter', - clientId: 'two', - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + connectionId: 'two_connid', + id: 'two_connid:0', + timestamp: 1e12, + presence: [ + { + action: 2, + clientId: 'two', + }, + ], + }), + ); /* Two updates twice in the same message */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - connectionId: 'two_connid', - id: 'two_connid:0', - timestamp: 1e12, - presence: [ - { - action: 'update', - clientId: 'two', - data: 'twowrongdata', - }, - { - action: 'update', - clientId: 'two', - data: 'twodata', - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + connectionId: 'two_connid', + id: 'two_connid:0', + timestamp: 1e12, + presence: [ + { + action: 4, + clientId: 'two', + data: 'twowrongdata', + }, + { + action: 4, + clientId: 'two', + data: 'twodata', + }, + ], + }), + ); /* Three enters */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - connectionId: 'three_connid', - id: 'three_connid:99', - timestamp: 1e12, - presence: [ - { - action: 'enter', - clientId: 'three', - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + connectionId: 'three_connid', + id: 'three_connid:99', + timestamp: 1e12, + presence: [ + { + action: 2, + clientId: 'three', + }, + ], + }), + ); /* Synthesized leave for three (with earlier msgSerial, incompatible id, * and later timestamp) */ helper.recordPrivateApi('call.channel.processMessage'); - await channel.processMessage({ - action: 14, - channel: channelName, - connectionId: 'synthesized', - id: 'synthesized:0', - timestamp: 1e12 + 1, - presence: [ - { - action: 'leave', - clientId: 'three', - connectionId: 'three_connid', - }, - ], - }); + await channel.processMessage( + createPM({ + action: 14, + channel: channelName, + connectionId: 'synthesized', + id: 'synthesized:0', + timestamp: 1e12 + 1, + presence: [ + { + action: 3, + clientId: 'three', + connectionId: 'three_connid', + }, + ], + }), + ); await new Promise(function (resolve, reject) { var done = function (err) { @@ -690,18 +728,20 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async helper.recordPrivateApi('replace.channel.processMessage'); syncerChannel.processMessage = originalProcessMessage; helper.recordPrivateApi('call.channel.processMessage'); - await syncerChannel.processMessage({ - action: 14, - id: 'messageid:0', - connectionId: 'connid', - timestamp: 2000000000000, - presence: [ - { - clientId: interrupterClientId, - action: 'enter', - }, - ], - }); + await syncerChannel.processMessage( + createPM({ + action: 14, + id: 'messageid:0', + connectionId: 'connid', + timestamp: 2000000000000, + presence: [ + { + clientId: interrupterClientId, + action: 2, + }, + ], + }), + ); } }; Helper.whenPromiseSettles(syncerChannel.attach(), cb); diff --git a/test/rest/presence.test.js b/test/rest/presence.test.js index 4f2bfae46..8381fe3c5 100644 --- a/test/rest/presence.test.js +++ b/test/rest/presence.test.js @@ -89,9 +89,9 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async var presenceBool = presenceMessages.find(function (msg) { return msg.clientId == 'client_bool'; }); - expect(JSON.parse(JSON.stringify(presenceBool)).action).to.equal(1); // present + expect(JSON.parse(JSON.stringify(await presenceBool.encode({}))).action).to.equal(1); // present presenceBool.action = 'leave'; - expect(JSON.parse(JSON.stringify(presenceBool)).action).to.equal(3); // leave + expect(JSON.parse(JSON.stringify(await presenceBool.encode({}))).action).to.equal(3); // leave }); /**