Skip to content

Commit

Permalink
RealtimeChannel: refactor to make sendMessage return a promise
Browse files Browse the repository at this point in the history
  • Loading branch information
SimonWoolf committed Jan 23, 2025
1 parent 5880ced commit d96b1a4
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 60 deletions.
74 changes: 36 additions & 38 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@ import ProtocolMessage, { fromValues as protocolMessageFromValues } from '../typ
import EventEmitter from '../util/eventemitter';
import * as Utils from '../util/utils';
import Logger from '../util/logger';
import RealtimePresence from './realtimepresence';
import { EncodingDecodingContext, CipherOptions, populateFieldsFromParent } from '../types/basemessage';
import Message, { WireMessage, getMessagesSize, encodeArray as encodeMessagesArray } from '../types/message';
import Message, { getMessagesSize, encodeArray as encodeMessagesArray } from '../types/message';
import ChannelStateChange from './channelstatechange';
import ErrorInfo, { PartialErrorInfo } from '../types/errorinfo';
import ConnectionErrors from '../transport/connectionerrors';
import * as API from '../../../../ably';
import ConnectionManager from '../transport/connectionmanager';
import ConnectionStateChange from './connectionstatechange';
import { ErrCallback, StandardCallback } from '../../types/utils';
import { StandardCallback } from '../../types/utils';
import BaseRealtime from './baserealtime';
import { ChannelOptions } from '../../types/channel';
import { normaliseChannelOptions } from '../util/defaults';
import { PaginatedResult } from './paginatedresource';
import type { PushChannel } from 'plugins/push';
import type { WirePresenceMessage } from '../types/presencemessage';
import type RealtimePresence from './realtimepresence';

interface RealtimeHistoryParams {
start?: number;
Expand Down Expand Up @@ -218,9 +218,6 @@ class RealtimeChannel extends EventEmitter {
let messages: Message[];
let argCount = args.length;

if (!this.connectionManager.activeState()) {
throw this.connectionManager.getError();
}
if (argCount == 1) {
if (Utils.isObject(args[0])) {
messages = [Message.fromValues(args[0])];
Expand Down Expand Up @@ -252,33 +249,26 @@ class RealtimeChannel extends EventEmitter {
400,
);
}
return new Promise((resolve, reject) => {
this._publish(wireMessages, (err) => (err ? reject(err) : resolve()));
});

this._throwIfUnpublishableState();

Logger.logAction(
this.logger,
Logger.LOG_MICRO,
'RealtimeChannel.publish()',
'sending message; channel state is ' + this.state + ', message count = ' + wireMessages.length,
);

const pm = protocolMessageFromValues({ action: actions.MESSAGE, channel: this.name, messages: wireMessages });
return this.sendMessage(pm);
}

_publish(messages: Array<WireMessage>, callback: ErrCallback) {
Logger.logAction(this.logger, Logger.LOG_MICRO, 'RealtimeChannel.publish()', 'message count = ' + messages.length);
const state = this.state;
switch (state) {
case 'failed':
case 'suspended':
callback(ErrorInfo.fromValues(this.invalidStateError()));
break;
default: {
Logger.logAction(
this.logger,
Logger.LOG_MICRO,
'RealtimeChannel.publish()',
'sending message; channel state is ' + state,
);
const msg = new ProtocolMessage();
msg.action = actions.MESSAGE;
msg.channel = this.name;
msg.messages = messages;
this.sendMessage(msg, callback);
break;
}
_throwIfUnpublishableState(): void {
if (!this.connectionManager.activeState()) {
throw this.connectionManager.getError();
}
if (this.state === 'failed' || this.state === 'suspended') {
throw this.invalidStateError();
}
}

Expand Down Expand Up @@ -369,7 +359,7 @@ class RealtimeChannel extends EventEmitter {
if (this._lastPayload.decodeFailureRecoveryInProgress) {
attachMsg.channelSerial = this._lastPayload.protocolMessageChannelSerial;
}
this.sendMessage(attachMsg, noop);
this.sendMessage(attachMsg).catch(noop);
}

async detach(): Promise<void> {
Expand Down Expand Up @@ -413,10 +403,10 @@ class RealtimeChannel extends EventEmitter {
}
}

detachImpl(callback?: ErrCallback): void {
detachImpl(): void {
Logger.logAction(this.logger, Logger.LOG_MICRO, 'RealtimeChannel.detach()', 'sending DETACH message');
const msg = protocolMessageFromValues({ action: actions.DETACH, channel: this.name });
this.sendMessage(msg, callback || noop);
this.sendMessage(msg).catch(noop);
}

async subscribe(...args: unknown[] /* [event], listener */): Promise<ChannelStateChange | null> {
Expand Down Expand Up @@ -477,17 +467,25 @@ class RealtimeChannel extends EventEmitter {
connectionManager.send(syncMessage);
}

sendMessage(msg: ProtocolMessage, callback?: ErrCallback): void {
this.connectionManager.send(msg, this.client.options.queueMessages, callback);
async sendMessage(msg: ProtocolMessage): Promise<void> {
return new Promise((resolve, reject) => {
this.connectionManager.send(msg, this.client.options.queueMessages, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}

sendPresence(presence: WirePresenceMessage[], callback?: ErrCallback): void {
async sendPresence(presence: WirePresenceMessage[]): Promise<void> {
const msg = protocolMessageFromValues({
action: actions.PRESENCE,
channel: this.name,
presence: presence,
});
this.sendMessage(msg, callback);
return this.sendMessage(msg);
}

// Access to this method is synchronised by ConnectionManager#processChannelMessage, in order to synchronise access to the state stored in _decodingContext.
Expand Down
41 changes: 19 additions & 22 deletions src/common/lib/client/realtimepresence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,7 @@ class RealtimePresence extends EventEmitter {

switch (channel.state) {
case 'attached':
return new Promise((resolve, reject) => {
channel.sendPresence([wirePresMsg], (err) => (err ? reject(err) : resolve()));
});
return channel.sendPresence([wirePresMsg]);
case 'initialized':
case 'detached':
channel.attach();
Expand Down Expand Up @@ -178,29 +176,25 @@ class RealtimePresence extends EventEmitter {
}
const wirePresMsg = await presence.encode(channel.channelOptions as CipherOptions);

return new Promise((resolve, reject) => {
switch (channel.state) {
case 'attached':
channel.sendPresence([wirePresMsg], (err) => (err ? reject(err) : resolve()));
break;
case 'attaching':
switch (channel.state) {
case 'attached':
return channel.sendPresence([wirePresMsg]);
case 'attaching':
return new Promise((resolve, reject) => {
this.pendingPresence.push({
presence: wirePresMsg,
callback: (err) => (err ? reject(err) : resolve()),
});
break;
case 'initialized':
case 'failed': {
/* we're not attached; therefore we let any entered status
* timeout by itself instead of attaching just in order to leave */
const err = new PartialErrorInfo('Unable to leave presence channel (incompatible state)', 90001);
reject(err);
break;
}
default:
reject(channel.invalidStateError());
});
case 'initialized':
case 'failed': {
/* we're not attached; therefore we let any entered status
* timeout by itself instead of attaching just in order to leave */
throw new PartialErrorInfo('Unable to leave presence channel (incompatible state)', 90001);
}
});
default:
throw channel.invalidStateError();
}
}

async get(params?: RealtimePresenceParams): Promise<PresenceMessage[]> {
Expand Down Expand Up @@ -357,7 +351,10 @@ class RealtimePresence extends EventEmitter {
presenceArray.push(event.presence);
multicaster.push(event.callback);
}
this.channel.sendPresence(presenceArray, multicaster);
this.channel
.sendPresence(presenceArray)
.then(() => multicaster())
.catch((err: ErrorInfo) => multicaster(err));
}
}

Expand Down

0 comments on commit d96b1a4

Please sign in to comment.