diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index bea04fa84..1d11c04ff 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -3,22 +3,22 @@ import ProtocolMessage, { fromValues as protocolMessageFromValues } from '../typ import EventEmitter from '../util/eventemitter'; import * as Utils from '../util/utils'; import Logger from '../util/logger'; -import RealtimePresence from './realtimepresence'; import { EncodingDecodingContext, CipherOptions, populateFieldsFromParent } from '../types/basemessage'; -import Message, { WireMessage, getMessagesSize, encodeArray as encodeMessagesArray } from '../types/message'; +import Message, { getMessagesSize, encodeArray as encodeMessagesArray } from '../types/message'; import ChannelStateChange from './channelstatechange'; import ErrorInfo, { PartialErrorInfo } from '../types/errorinfo'; import ConnectionErrors from '../transport/connectionerrors'; import * as API from '../../../../ably'; import ConnectionManager from '../transport/connectionmanager'; import ConnectionStateChange from './connectionstatechange'; -import { ErrCallback, StandardCallback } from '../../types/utils'; +import { StandardCallback } from '../../types/utils'; import BaseRealtime from './baserealtime'; import { ChannelOptions } from '../../types/channel'; import { normaliseChannelOptions } from '../util/defaults'; import { PaginatedResult } from './paginatedresource'; import type { PushChannel } from 'plugins/push'; import type { WirePresenceMessage } from '../types/presencemessage'; +import type RealtimePresence from './realtimepresence'; interface RealtimeHistoryParams { start?: number; @@ -218,9 +218,6 @@ class RealtimeChannel extends EventEmitter { let messages: Message[]; let argCount = args.length; - if (!this.connectionManager.activeState()) { - throw this.connectionManager.getError(); - } if (argCount == 1) { if (Utils.isObject(args[0])) { messages = [Message.fromValues(args[0])]; @@ -252,33 +249,26 @@ class RealtimeChannel extends EventEmitter { 400, ); } - return new Promise((resolve, reject) => { - this._publish(wireMessages, (err) => (err ? reject(err) : resolve())); - }); + + this._throwIfUnpublishableState(); + + Logger.logAction( + this.logger, + Logger.LOG_MICRO, + 'RealtimeChannel.publish()', + 'sending message; channel state is ' + this.state + ', message count = ' + wireMessages.length, + ); + + const pm = protocolMessageFromValues({ action: actions.MESSAGE, channel: this.name, messages: wireMessages }); + return this.sendMessage(pm); } - _publish(messages: Array, callback: ErrCallback) { - Logger.logAction(this.logger, Logger.LOG_MICRO, 'RealtimeChannel.publish()', 'message count = ' + messages.length); - const state = this.state; - switch (state) { - case 'failed': - case 'suspended': - callback(ErrorInfo.fromValues(this.invalidStateError())); - break; - default: { - Logger.logAction( - this.logger, - Logger.LOG_MICRO, - 'RealtimeChannel.publish()', - 'sending message; channel state is ' + state, - ); - const msg = new ProtocolMessage(); - msg.action = actions.MESSAGE; - msg.channel = this.name; - msg.messages = messages; - this.sendMessage(msg, callback); - break; - } + _throwIfUnpublishableState(): void { + if (!this.connectionManager.activeState()) { + throw this.connectionManager.getError(); + } + if (this.state === 'failed' || this.state === 'suspended') { + throw this.invalidStateError(); } } @@ -369,7 +359,7 @@ class RealtimeChannel extends EventEmitter { if (this._lastPayload.decodeFailureRecoveryInProgress) { attachMsg.channelSerial = this._lastPayload.protocolMessageChannelSerial; } - this.sendMessage(attachMsg, noop); + this.sendMessage(attachMsg).catch(noop); } async detach(): Promise { @@ -413,10 +403,10 @@ class RealtimeChannel extends EventEmitter { } } - detachImpl(callback?: ErrCallback): void { + detachImpl(): void { Logger.logAction(this.logger, Logger.LOG_MICRO, 'RealtimeChannel.detach()', 'sending DETACH message'); const msg = protocolMessageFromValues({ action: actions.DETACH, channel: this.name }); - this.sendMessage(msg, callback || noop); + this.sendMessage(msg).catch(noop); } async subscribe(...args: unknown[] /* [event], listener */): Promise { @@ -477,17 +467,25 @@ class RealtimeChannel extends EventEmitter { connectionManager.send(syncMessage); } - sendMessage(msg: ProtocolMessage, callback?: ErrCallback): void { - this.connectionManager.send(msg, this.client.options.queueMessages, callback); + async sendMessage(msg: ProtocolMessage): Promise { + return new Promise((resolve, reject) => { + this.connectionManager.send(msg, this.client.options.queueMessages, (err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); } - sendPresence(presence: WirePresenceMessage[], callback?: ErrCallback): void { + async sendPresence(presence: WirePresenceMessage[]): Promise { const msg = protocolMessageFromValues({ action: actions.PRESENCE, channel: this.name, presence: presence, }); - this.sendMessage(msg, callback); + return this.sendMessage(msg); } // Access to this method is synchronised by ConnectionManager#processChannelMessage, in order to synchronise access to the state stored in _decodingContext. diff --git a/src/common/lib/client/realtimepresence.ts b/src/common/lib/client/realtimepresence.ts index 63b1d4b82..195108376 100644 --- a/src/common/lib/client/realtimepresence.ts +++ b/src/common/lib/client/realtimepresence.ts @@ -127,9 +127,7 @@ class RealtimePresence extends EventEmitter { switch (channel.state) { case 'attached': - return new Promise((resolve, reject) => { - channel.sendPresence([wirePresMsg], (err) => (err ? reject(err) : resolve())); - }); + return channel.sendPresence([wirePresMsg]); case 'initialized': case 'detached': channel.attach(); @@ -178,29 +176,25 @@ class RealtimePresence extends EventEmitter { } const wirePresMsg = await presence.encode(channel.channelOptions as CipherOptions); - return new Promise((resolve, reject) => { - switch (channel.state) { - case 'attached': - channel.sendPresence([wirePresMsg], (err) => (err ? reject(err) : resolve())); - break; - case 'attaching': + switch (channel.state) { + case 'attached': + return channel.sendPresence([wirePresMsg]); + case 'attaching': + return new Promise((resolve, reject) => { this.pendingPresence.push({ presence: wirePresMsg, callback: (err) => (err ? reject(err) : resolve()), }); - break; - case 'initialized': - case 'failed': { - /* we're not attached; therefore we let any entered status - * timeout by itself instead of attaching just in order to leave */ - const err = new PartialErrorInfo('Unable to leave presence channel (incompatible state)', 90001); - reject(err); - break; - } - default: - reject(channel.invalidStateError()); + }); + case 'initialized': + case 'failed': { + /* we're not attached; therefore we let any entered status + * timeout by itself instead of attaching just in order to leave */ + throw new PartialErrorInfo('Unable to leave presence channel (incompatible state)', 90001); } - }); + default: + throw channel.invalidStateError(); + } } async get(params?: RealtimePresenceParams): Promise { @@ -357,7 +351,10 @@ class RealtimePresence extends EventEmitter { presenceArray.push(event.presence); multicaster.push(event.callback); } - this.channel.sendPresence(presenceArray, multicaster); + this.channel + .sendPresence(presenceArray) + .then(() => multicaster()) + .catch((err: ErrorInfo) => multicaster(err)); } }