Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK-3733] Make realtime transports tree-shakable #1432

Merged
merged 13 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion scripts/moduleReport.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
const esbuild = require('esbuild');

// List of all modules accepted in ModulesMap
const moduleNames = ['Rest', 'Crypto', 'MsgPack', 'RealtimePresence'];
const moduleNames = [
'Rest',
'Crypto',
'MsgPack',
'RealtimePresence',
'XHRPolling',
'XHRStreaming',
'WebSocketTransport',
];

// List of all free-standing functions exported by the library along with the
// ModulesMap entries that we expect them to transitively import
Expand Down
14 changes: 14 additions & 0 deletions src/common/constants/TransportName.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
export namespace TransportNames {
export const WebSocket = 'web_socket' as const;
export const Comet = 'comet' as const;
export const XhrStreaming = 'xhr_streaming' as const;
export const XhrPolling = 'xhr_polling' as const;
}

type TransportName =
| typeof TransportNames.WebSocket
| typeof TransportNames.Comet
| typeof TransportNames.XhrStreaming
| typeof TransportNames.XhrPolling;

export default TransportName;
8 changes: 0 additions & 8 deletions src/common/constants/TransportNames.ts

This file was deleted.

21 changes: 21 additions & 0 deletions src/common/lib/client/baserealtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,45 @@ import ClientOptions from '../../types/ClientOptions';
import * as API from '../../../../ably';
import { ModulesMap } from './modulesmap';
import RealtimePresence from './realtimepresence';
import { TransportNames } from 'common/constants/TransportName';
import { TransportImplementations } from 'common/platform';

/**
`BaseRealtime` is an export of the tree-shakable version of the SDK, and acts as the base class for the `DefaultRealtime` class exported by the non tree-shakable version.
*/
class BaseRealtime extends BaseClient {
readonly _RealtimePresence: typeof RealtimePresence | null;
// Extra transport implementations available to this client, in addition to those in Platform.Transports.bundledImplementations
readonly _additionalTransportImplementations: TransportImplementations;
_channels: any;
connection: Connection;

constructor(options: ClientOptions, modules: ModulesMap) {
super(options, modules);
Logger.logAction(Logger.LOG_MINOR, 'Realtime()', '');
this._additionalTransportImplementations = BaseRealtime.transportImplementationsFromModules(modules);
this._RealtimePresence = modules.RealtimePresence ?? null;
this.connection = new Connection(this, this.options);
this._channels = new Channels(this);
if (options.autoConnect !== false) this.connect();
}

private static transportImplementationsFromModules(modules: ModulesMap) {
owenpearson marked this conversation as resolved.
Show resolved Hide resolved
const transports: TransportImplementations = {};

if (modules.WebSocketTransport) {
transports[TransportNames.WebSocket] = modules.WebSocketTransport;
}
if (modules.XHRStreaming) {
transports[TransportNames.XhrStreaming] = modules.XHRStreaming;
}
if (modules.XHRPolling) {
transports[TransportNames.XhrPolling] = modules.XHRPolling;
}

return transports;
}

get channels() {
return this._channels;
}
Expand Down
9 changes: 8 additions & 1 deletion src/common/lib/client/defaultrealtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { DefaultMessage } from '../types/defaultmessage';
import { MsgPack } from 'common/types/msgpack';
import RealtimePresence from './realtimepresence';
import { DefaultPresenceMessage } from '../types/defaultpresencemessage';
import initialiseWebSocketTransport from '../transport/websockettransport';

/**
`DefaultRealtime` is the class that the non tree-shakable version of the SDK exports as `Realtime`. It ensures that this version of the SDK includes all of the functionality which is optionally available in the tree-shakable version.
Expand All @@ -20,7 +21,13 @@ export class DefaultRealtime extends BaseRealtime {
throw new Error('Expected DefaultRealtime._MsgPack to have been set');
}

super(options, { ...allCommonModules, Crypto: DefaultRealtime.Crypto ?? undefined, MsgPack, RealtimePresence });
super(options, {
...allCommonModules,
Crypto: DefaultRealtime.Crypto ?? undefined,
MsgPack,
RealtimePresence,
WebSocketTransport: initialiseWebSocketTransport,
});
}

static Utils = Utils;
Expand Down
4 changes: 4 additions & 0 deletions src/common/lib/client/modulesmap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ import { Rest } from './rest';
import { IUntypedCryptoStatic } from '../../types/ICryptoStatic';
import { MsgPack } from 'common/types/msgpack';
import RealtimePresence from './realtimepresence';
import { TransportInitialiser } from '../transport/connectionmanager';

export interface ModulesMap {
Rest?: typeof Rest;
Crypto?: IUntypedCryptoStatic;
MsgPack?: MsgPack;
RealtimePresence?: typeof RealtimePresence;
WebSocketTransport?: TransportInitialiser;
XHRPolling?: TransportInitialiser;
XHRStreaming?: TransportInitialiser;
}

export const allCommonModules: ModulesMap = { Rest };
89 changes: 53 additions & 36 deletions src/common/lib/transport/connectionmanager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import ProtocolMessage from 'common/lib/types/protocolmessage';
import * as Utils from 'common/lib/util/utils';
import Protocol, { PendingMessage } from './protocol';
import Defaults, { getAgentString } from 'common/lib/util/defaults';
import Platform from 'common/platform';
import Platform, { TransportImplementations } from 'common/platform';
import EventEmitter from '../util/eventemitter';
import MessageQueue from './messagequeue';
import Logger from '../util/logger';
Expand All @@ -12,14 +12,13 @@ import ErrorInfo, { IPartialErrorInfo, PartialErrorInfo } from 'common/lib/types
import Auth from 'common/lib/client/auth';
import Message from 'common/lib/types/message';
import Multicaster, { MulticasterInstance } from 'common/lib/util/multicaster';
import WebSocketTransport from './websockettransport';
import Transport, { TransportCtor } from './transport';
import * as API from '../../../../ably';
import { ErrCallback } from 'common/types/utils';
import HttpStatusCodes from 'common/constants/HttpStatusCodes';

type Realtime = any;
type ClientOptions = any;
import BaseRealtime from '../client/baserealtime';
import { NormalisedClientOptions } from 'common/types/ClientOptions';
import TransportName, { TransportNames } from 'common/constants/TransportName';

let globalObject = typeof global !== 'undefined' ? global : typeof window !== 'undefined' ? window : self;

Expand Down Expand Up @@ -91,26 +90,24 @@ type RecoveryContext = {
channelSerials: { [name: string]: string };
};

function decodeRecoveryKey(recoveryKey: string): RecoveryContext | null {
function decodeRecoveryKey(recoveryKey: NormalisedClientOptions['recover']): RecoveryContext | null {
try {
return JSON.parse(recoveryKey);
return JSON.parse(recoveryKey as string);
} catch (e) {
return null;
}
}

const supportedTransports: Record<string, TransportCtor> = {};

export class TransportParams {
options: ClientOptions;
options: NormalisedClientOptions;
host: string | null;
mode: string;
format?: Utils.Format;
connectionKey?: string;
stream?: any;
heartbeats?: boolean;

constructor(options: ClientOptions, host: string | null, mode: string, connectionKey?: string) {
constructor(options: NormalisedClientOptions, host: string | null, mode: string, connectionKey?: string) {
this.options = options;
this.host = host;
this.mode = mode;
Expand Down Expand Up @@ -190,8 +187,9 @@ type ConnectionState = {
};

class ConnectionManager extends EventEmitter {
realtime: Realtime;
options: ClientOptions;
supportedTransports: Partial<Record<TransportName, TransportCtor>> = {};
realtime: BaseRealtime;
options: NormalisedClientOptions;
states: Record<string, ConnectionState>;
state: ConnectionState;
errorReason: IPartialErrorInfo | string | null;
Expand All @@ -202,9 +200,9 @@ class ConnectionManager extends EventEmitter {
connectionKey?: string;
connectionStateTtl: number;
maxIdleInterval: number | null;
transports: string[];
baseTransport: string;
upgradeTransports: string[];
transports: TransportName[];
baseTransport: TransportName;
upgradeTransports: TransportName[];
transportPreference: string | null;
httpHosts: string[];
activeProtocol: null | Protocol;
Expand All @@ -226,10 +224,10 @@ class ConnectionManager extends EventEmitter {
queue: { message: ProtocolMessage; transport: Transport }[];
} = { isProcessing: false, queue: [] };

constructor(realtime: Realtime, options: ClientOptions) {
constructor(realtime: BaseRealtime, options: NormalisedClientOptions) {
super();
ConnectionManager.initTransports();
this.realtime = realtime;
this.initTransports();
this.options = options;
const timeouts = options.timeouts;
/* connectingTimeout: leave preferenceConnectTimeout (~6s) to try the
Expand Down Expand Up @@ -305,10 +303,7 @@ class ConnectionManager extends EventEmitter {
this.connectionStateTtl = timeouts.connectionStateTtl;
this.maxIdleInterval = null;

this.transports = Utils.intersect(
options.transports || Defaults.defaultTransports,
ConnectionManager.supportedTransports
);
this.transports = Utils.intersect(options.transports || Defaults.defaultTransports, this.supportedTransports);
/* baseTransports selects the leftmost transport in the Defaults.baseTransportOrder list
* that's both requested and supported. */
this.baseTransport = Utils.intersect(Defaults.baseTransportOrder, this.transports)[0];
Expand Down Expand Up @@ -404,17 +399,32 @@ class ConnectionManager extends EventEmitter {
* transport management
*********************/

static get supportedTransports() {
return supportedTransports;
// Used by tests
static supportedTransports(additionalImplementations: TransportImplementations) {
const storage: TransportStorage = { supportedTransports: {} };
this.initTransports(additionalImplementations, storage);
return storage.supportedTransports;
}

static initTransports() {
WebSocketTransport(ConnectionManager);
Utils.arrForEach(Platform.Transports, function (initFn) {
initFn(ConnectionManager);
private static initTransports(additionalImplementations: TransportImplementations, storage: TransportStorage) {
const implementations = { ...Platform.Transports.bundledImplementations, ...additionalImplementations };

const initialiseWebSocketTransport = implementations[TransportNames.WebSocket];
if (initialiseWebSocketTransport) {
initialiseWebSocketTransport(storage);
}
Utils.arrForEach(Platform.Transports.order, function (transportName) {
const initFn = implementations[transportName];
if (initFn) {
initFn(storage);
}
});
}

initTransports() {
ConnectionManager.initTransports(this.realtime._additionalTransportImplementations, this);
}

createTransportParams(host: string | null, mode: string): TransportParams {
return new TransportParams(this.options, host, mode, this.connectionKey);
}
Expand Down Expand Up @@ -481,11 +491,11 @@ class ConnectionManager extends EventEmitter {
* @param candidate, the transport to try
* @param callback
*/
tryATransport(transportParams: TransportParams, candidate: string, callback: Function): void {
tryATransport(transportParams: TransportParams, candidate: TransportName, callback: Function): void {
Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.tryATransport()', 'trying ' + candidate);

Transport.tryConnect(
ConnectionManager.supportedTransports[candidate],
this.supportedTransports[candidate]!,
this,
this.realtime.auth,
transportParams,
Expand Down Expand Up @@ -600,7 +610,7 @@ class ConnectionManager extends EventEmitter {
if (mode === 'recover' && this.options.recover) {
/* After a successful recovery, we unpersist, as a recovery key cannot
* be used more than once */
this.options.recover = null;
delete this.options.recover;
this.unpersistConnection();
}
});
Expand Down Expand Up @@ -1191,7 +1201,8 @@ class ConnectionManager extends EventEmitter {
const newState = (this.state = this.states[stateChange.current as string]);
if (stateChange.reason) {
this.errorReason = stateChange.reason;
this.realtime.connection.errorReason = stateChange.reason;
// TODO remove this type assertion after fixing https://github.com/ably/ably-js/issues/1405
this.realtime.connection.errorReason = stateChange.reason as ErrorInfo;
}
if (newState.terminal || newState.state === 'suspended') {
/* suspended is nonterminal, but once in the suspended state, realtime
Expand Down Expand Up @@ -1673,13 +1684,13 @@ class ConnectionManager extends EventEmitter {
this.tryATransport(transportParams, this.baseTransport, hostAttemptCb);
}

getUpgradePossibilities(): string[] {
getUpgradePossibilities(): TransportName[] {
/* returns the subset of upgradeTransports to the right of the current
* transport in upgradeTransports (if it's in there - if not, currentSerial
* will be -1, so return upgradeTransports.slice(0) == upgradeTransports */
const current = (this.activeProtocol as Protocol).getTransport().shortName;
const currentSerial = Utils.arrIndexOf(this.upgradeTransports, current);
return this.upgradeTransports.slice(currentSerial + 1) as string[];
return this.upgradeTransports.slice(currentSerial + 1);
}

upgradeIfNeeded(transportParams: Record<string, any>): void {
Expand All @@ -1694,7 +1705,7 @@ class ConnectionManager extends EventEmitter {
return;
}

Utils.arrForEach(upgradePossibilities, (upgradeTransport: string) => {
Utils.arrForEach(upgradePossibilities, (upgradeTransport: TransportName) => {
/* Note: the transport may mutate the params, so give each transport a fresh one */
const upgradeTransportParams = this.createTransportParams(transportParams.host, 'upgrade');
this.tryATransport(upgradeTransportParams, upgradeTransport, noop);
Expand Down Expand Up @@ -2097,7 +2108,7 @@ class ConnectionManager extends EventEmitter {
this.proposedTransports.push(transport);
}

getTransportPreference(): string {
getTransportPreference(): TransportName {
return this.transportPreference || (haveWebStorage() && Platform.WebStorage?.get?.(transportPreferenceName));
}

Expand Down Expand Up @@ -2166,3 +2177,9 @@ class ConnectionManager extends EventEmitter {
}

export default ConnectionManager;

export interface TransportStorage {
supportedTransports: Partial<Record<TransportName, TransportCtor>>;
}

export type TransportInitialiser = (transportStorage: TransportStorage) => typeof Transport;
9 changes: 5 additions & 4 deletions src/common/lib/transport/websockettransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import Logger from '../util/logger';
import ProtocolMessage from '../types/protocolmessage';
import ErrorInfo from '../types/errorinfo';
import NodeWebSocket from 'ws';
import ConnectionManager, { TransportParams } from './connectionmanager';
import ConnectionManager, { TransportParams, TransportStorage } from './connectionmanager';
import Auth from '../client/auth';
import { TransportNames } from 'common/constants/TransportName';

const shortName = 'web_socket';
const shortName = TransportNames.WebSocket;

function isNodeWebSocket(ws: WebSocket | NodeWebSocket): ws is NodeWebSocket {
return !!(ws as NodeWebSocket).on;
Expand Down Expand Up @@ -195,8 +196,8 @@ class WebSocketTransport extends Transport {
}
}

function initialiseTransport(connectionManager: any): typeof WebSocketTransport {
if (WebSocketTransport.isAvailable()) connectionManager.supportedTransports[shortName] = WebSocketTransport;
function initialiseTransport(transportStorage: TransportStorage): typeof WebSocketTransport {
if (WebSocketTransport.isAvailable()) transportStorage.supportedTransports[shortName] = WebSocketTransport;

return WebSocketTransport;
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/lib/types/protocolmessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class ProtocolMessage {

static serialize = Utils.encodeBody;

static deserialize = function (serialized: unknown, MsgPack: MsgPack, format?: Utils.Format): ProtocolMessage {
static deserialize = function (serialized: unknown, MsgPack: MsgPack | null, format?: Utils.Format): ProtocolMessage {
const deserialized = Utils.decodeBody<Record<string, unknown>>(serialized, MsgPack, format);
return ProtocolMessage.fromDeserialized(deserialized);
};
Expand Down
Loading
Loading