Skip to content

Commit

Permalink
Restructure and clean up message and presencemessage types
Browse files Browse the repository at this point in the history
- extract common functionality to a new BaseMessage file
- introduce proper WireMessage and WirePresenceMessage classes, such that:
  - you just call Message.encode() to get a WireMessage, and
    WireMessage.decode() to get a Message, and the type system will tell
    you which one you have
  - the toJSON override hack only applies to internal wire-protocol uses
    and doesn't interfere with the normal public Message/PresenceMessage
    classes
  - all necessary transformations between userfriendly message and wire
    protocol messages (encoding the data and the action)
    are now done by decode() and encode(), rather than (as before) the
    encoding happening in-place and the action encoding happening only
    on json-stringification
  - ProtocolMessage is now always a wire-protocol message and always
    holds WireMessages and WirePresence; all decoding is done by the
    channel
  - The PresencePlugin interface is now just the PresenceMessage and
    WirePresenceMessage types, rather than ad-hoc individual functions
  - The fromWireProtocol, encode, and decode functions in the
    DefaultMessage interface are no longer needed and can be removed
- Cleaned up and simplified the message decoding step in the channel, which
  was unnecessarily complicated
  • Loading branch information
SimonWoolf committed Jan 8, 2025
1 parent cd1fc99 commit e84f548
Show file tree
Hide file tree
Showing 28 changed files with 997 additions and 1,075 deletions.
2 changes: 1 addition & 1 deletion ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ export type ChannelMode =
| ChannelModes.PUBLISH
| ChannelModes.SUBSCRIBE
| ChannelModes.PRESENCE
| ChannelModes.PRESENCE_SUBSCRIBE
| ChannelModes.PRESENCE_SUBSCRIBE;

/**
* Passes additional properties to a {@link Channel} or {@link RealtimeChannel} object, such as encryption, {@link ChannelMode} and channel parameters.
Expand Down
11 changes: 3 additions & 8 deletions src/common/lib/client/defaultrealtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@ import { DefaultPresenceMessage } from '../types/defaultpresencemessage';
import WebSocketTransport from '../transport/websockettransport';
import { FilteredSubscriptions } from './filteredsubscriptions';
import { PresenceMap } from './presencemap';
import {
fromValues as presenceMessageFromValues,
fromValuesArray as presenceMessagesFromValuesArray,
fromWireProtocol as presenceMessageFromWireProtocol,
} from '../types/presencemessage';
import PresenceMessage, { WirePresenceMessage } from '../types/presencemessage';
import { Http } from 'common/types/http';
import Defaults from '../util/defaults';
import Logger from '../util/logger';
Expand All @@ -39,9 +35,8 @@ export class DefaultRealtime extends BaseRealtime {
MsgPack,
RealtimePresence: {
RealtimePresence,
presenceMessageFromValues,
presenceMessagesFromValuesArray,
presenceMessageFromWireProtocol,
PresenceMessage,
WirePresenceMessage,
},
WebSocketTransport,
MessageInteractions: FilteredSubscriptions,
Expand Down
11 changes: 3 additions & 8 deletions src/common/lib/client/modularplugins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,13 @@ import RealtimePresence from './realtimepresence';
import XHRRequest from 'platform/web/lib/http/request/xhrrequest';
import fetchRequest from 'platform/web/lib/http/request/fetchrequest';
import { FilteredSubscriptions } from './filteredsubscriptions';
import {
fromValues as presenceMessageFromValues,
fromValuesArray as presenceMessagesFromValuesArray,
fromWireProtocol as presenceMessageFromWireProtocol,
} from '../types/presencemessage';
import PresenceMessage, { WirePresenceMessage } from '../types/presencemessage';
import { TransportCtor } from '../transport/transport';
import * as PushPlugin from 'plugins/push';

export interface PresenceMessagePlugin {
presenceMessageFromValues: typeof presenceMessageFromValues;
presenceMessagesFromValuesArray: typeof presenceMessagesFromValuesArray;
presenceMessageFromWireProtocol: typeof presenceMessageFromWireProtocol;
PresenceMessage: typeof PresenceMessage;
WirePresenceMessage: typeof WirePresenceMessage;
}

export type RealtimePresencePlugin = PresenceMessagePlugin & {
Expand Down
6 changes: 3 additions & 3 deletions src/common/lib/client/presencemap.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as Utils from '../util/utils';
import EventEmitter from '../util/eventemitter';
import Logger from '../util/logger';
import PresenceMessage, { fromValues as presenceMessageFromValues } from '../types/presencemessage';
import PresenceMessage from '../types/presencemessage';

import type RealtimePresence from './realtimepresence';

Expand Down Expand Up @@ -80,7 +80,7 @@ export class PresenceMap extends EventEmitter {

put(item: PresenceMessage) {
if (item.action === 'enter' || item.action === 'update') {
item = presenceMessageFromValues(item);
item = PresenceMessage.fromValues(item);
item.action = 'present';
}
const map = this.map,
Expand Down Expand Up @@ -118,7 +118,7 @@ export class PresenceMap extends EventEmitter {

/* RTP2f */
if (this.syncInProgress) {
item = presenceMessageFromValues(item);
item = PresenceMessage.fromValues(item);
item.action = 'absent';
map[key] = item;
} else {
Expand Down
148 changes: 45 additions & 103 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,14 @@
import ProtocolMessage, {
actions,
channelModes,
fromValues as protocolMessageFromValues,
} from '../types/protocolmessage';
import { actions, channelModes } from '../types/protocolmessagecommon';
import ProtocolMessage, { fromValues as protocolMessageFromValues } from '../types/protocolmessage';
import EventEmitter from '../util/eventemitter';
import * as Utils from '../util/utils';
import Logger from '../util/logger';
import RealtimePresence from './realtimepresence';
import Message, {
fromValues as messageFromValues,
fromValuesArray as messagesFromValuesArray,
encodeArray as encodeMessagesArray,
decode as decodeMessage,
getMessagesSize,
CipherOptions,
EncodingDecodingContext,
} from '../types/message';
import { EncodingDecodingContext, CipherOptions, populateFieldsFromParent } from '../types/basemessage';
import Message, { WireMessage, getMessagesSize, encodeArray as encodeMessagesArray } from '../types/message';
import ChannelStateChange from './channelstatechange';
import ErrorInfo, { PartialErrorInfo } from '../types/errorinfo';
import PresenceMessage, { decode as decodePresenceMessage } from '../types/presencemessage';
import { WirePresenceMessage } from '../types/presencemessage';
import ConnectionErrors from '../transport/connectionerrors';
import * as API from '../../../../ably';
import ConnectionManager from '../transport/connectionmanager';
Expand Down Expand Up @@ -225,28 +215,32 @@ class RealtimeChannel extends EventEmitter {
}

async publish(...args: any[]): Promise<void> {
let messages = args[0];
let messages: Message[];
let argCount = args.length;

if (!this.connectionManager.activeState()) {
throw this.connectionManager.getError();
}
if (argCount == 1) {
if (Utils.isObject(messages)) messages = [messageFromValues(messages)];
else if (Array.isArray(messages)) messages = messagesFromValuesArray(messages);
else
if (Utils.isObject(args[0])) {
messages = [Message.fromValues(args[0])];
} else if (Array.isArray(args[0])) {
messages = Message.fromValuesArray(args[0]);
} else {
throw new ErrorInfo(
'The single-argument form of publish() expects a message object or an array of message objects',
40013,
400,
);
}
} else {
messages = [messageFromValues({ name: args[0], data: args[1] })];
messages = [Message.fromValues({ name: args[0], data: args[1] })];
}
const maxMessageSize = this.client.options.maxMessageSize;
await encodeMessagesArray(messages, this.channelOptions as CipherOptions);
// TODO get rid of CipherOptions type assertion, indicates channeloptions types are broken
const wireMessages = await encodeMessagesArray(messages, this.channelOptions as CipherOptions);
/* RSL1i */
const size = getMessagesSize(messages);
const size = getMessagesSize(wireMessages);
if (size > maxMessageSize) {
throw new ErrorInfo(
'Maximum size of messages that can be published at once exceeded ( was ' +
Expand All @@ -259,11 +253,11 @@ class RealtimeChannel extends EventEmitter {
);
}
return new Promise((resolve, reject) => {
this._publish(messages, (err) => (err ? reject(err) : resolve()));
this._publish(wireMessages, (err) => (err ? reject(err) : resolve()));
});
}

_publish(messages: Array<Message>, callback: ErrCallback) {
_publish(messages: Array<WireMessage>, callback: ErrCallback) {
Logger.logAction(this.logger, Logger.LOG_MICRO, 'RealtimeChannel.publish()', 'message count = ' + messages.length);
const state = this.state;
switch (state) {
Expand Down Expand Up @@ -482,13 +476,11 @@ class RealtimeChannel extends EventEmitter {
this.connectionManager.send(msg, this.client.options.queueMessages, callback);
}

sendPresence(presence: PresenceMessage | PresenceMessage[], callback?: ErrCallback): void {
sendPresence(presence: WirePresenceMessage[], callback?: ErrCallback): void {
const msg = protocolMessageFromValues({
action: actions.PRESENCE,
channel: this.name,
presence: Array.isArray(presence)
? this.client._RealtimePresence!.presenceMessagesFromValuesArray(presence)
: [this.client._RealtimePresence!.presenceMessageFromValues(presence)],
presence: presence,
});
this.sendMessage(msg, callback);
}
Expand Down Expand Up @@ -565,14 +557,17 @@ class RealtimeChannel extends EventEmitter {
if (!message.presence) break;
// eslint-disable-next-line no-fallthrough
case actions.PRESENCE: {
const presenceMessages = message.presence;

if (!presenceMessages) {
if (!message.presence) {
break;
}

populateFieldsFromParent(message);
const options = this.channelOptions;
await this._decodeAndPrepareMessages(message, presenceMessages, (msg) => decodePresenceMessage(msg, options));
const presenceMessages = await Promise.all(
message.presence.map((wpm) => {
return wpm.decode(options, this.logger);
}),
);

if (this._presence) {
this._presence.setPresence(presenceMessages, isSync, syncChannelSerial as any);
Expand All @@ -597,9 +592,11 @@ class RealtimeChannel extends EventEmitter {
return;
}

const messages = message.messages as Array<Message>,
firstMessage = messages[0],
lastMessage = messages[messages.length - 1],
populateFieldsFromParent(message);

const encoded = message.messages!,
firstMessage = encoded[0],
lastMessage = encoded[encoded.length - 1],
channelSerial = message.channelSerial;

if (
Expand All @@ -618,44 +615,34 @@ class RealtimeChannel extends EventEmitter {
break;
}

const { unrecoverableError } = await this._decodeAndPrepareMessages(
message,
messages,
(msg) => decodeMessage(msg, this._decodingContext),
(e) => {
/* decrypt failed .. the most likely cause is that we have the wrong key */
const errorInfo = e as ErrorInfo;
let messages: Message[] = [];
for (let i = 0; i < encoded.length; i++) {
const { decoded, err } = await encoded[i].decodeWithErr(this._decodingContext, this.logger);
messages[i] = decoded;

switch (errorInfo.code) {
if (err) {
switch (err.code) {
case 40018:
/* decode failure */
this._startDecodeFailureRecovery(errorInfo);
return { unrecoverableError: true };
this._startDecodeFailureRecovery(err);
return;

case 40019:
/* No vcdiff plugin passed in - no point recovering, give up */
// eslint-disable-next-line no-fallthrough
case 40019: /* No vcdiff plugin passed in - no point recovering, give up */
case 40021:
/* Browser does not support deltas, similarly no point recovering */
this.notifyState('failed', errorInfo);
return { unrecoverableError: true };
this.notifyState('failed', err);
return;

default:
return { unrecoverableError: false };
// do nothing, continue decoding
}
},
);
if (unrecoverableError) {
return;
}
}

for (let i = 0; i < messages.length; i++) {
const msg = messages[i];
if (channelSerial && !msg.version) {
msg.version = channelSerial + ':' + i.toString().padStart(3, '0');
// already done in fromWireProtocol -- but for realtime messages the source
// fields might be copied from the protocolmessage, so need to do it again
msg.expandFields();
}
}

Expand Down Expand Up @@ -688,51 +675,6 @@ class RealtimeChannel extends EventEmitter {
}
}

/**
* Mutates provided messages by adding `connectionId`, `timestamp` and `id` fields, and decoding message data.
*
* @returns `unrecoverableError` flag. If `true` indicates that unrecoverable error was encountered during message decoding
* and any further message processing should be stopped. Always equals to `false` if `decodeErrorRecoveryHandler` was not provided
*/
private async _decodeAndPrepareMessages<T extends Message | PresenceMessage>(
protocolMessage: ProtocolMessage,
messages: T[],
decodeFn: (msg: T) => Promise<void>,
decodeErrorRecoveryHandler?: (e: Error) => { unrecoverableError: boolean },
): Promise<{ unrecoverableError: boolean }> {
const { id, connectionId, timestamp } = protocolMessage;

for (let i = 0; i < messages.length; i++) {
const msg = messages[i];

try {
// decode underlying data for a message
await decodeFn(msg);
} catch (e) {
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.decodeAndPrepareMessages()',
(e as Error).toString(),
);

if (decodeErrorRecoveryHandler) {
const { unrecoverableError } = decodeErrorRecoveryHandler(e as Error);
if (unrecoverableError) {
// break out of for loop by returning
return { unrecoverableError: true };
}
}
}

if (!msg.connectionId) msg.connectionId = connectionId;
if (!msg.timestamp) msg.timestamp = timestamp;
if (id && !msg.id) msg.id = id + ':' + i;
}

return { unrecoverableError: false };
}

_startDecodeFailureRecovery(reason: ErrorInfo): void {
if (!this._lastPayload.decodeFailureRecoveryInProgress) {
Logger.logAction(
Expand Down
Loading

0 comments on commit e84f548

Please sign in to comment.