From 687633985754fe7dfdafb897a756a78f56adfdd0 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Fri, 25 Oct 2024 11:08:35 +0100 Subject: [PATCH] Use common `_decodeAndPrepareMessages` for processing of `STATE` and `STATE_SYNC` messages This implements the proposal from one of the earlier PRs [1] [1] https://github.com/ably/ably-js/pull/1897#discussion_r1814781743 --- src/common/lib/client/realtimechannel.ts | 70 +++++------------------- 1 file changed, 14 insertions(+), 56 deletions(-) diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 1b75fc06e..427b6398e 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -30,7 +30,7 @@ import { ChannelOptions } from '../../types/channel'; import { normaliseChannelOptions } from '../util/defaults'; import { PaginatedResult } from './paginatedresource'; import type { PushChannel } from 'plugins/push'; -import type { LiveObjects } from 'plugins/liveobjects'; +import type { LiveObjects, StateMessage } from 'plugins/liveobjects'; interface RealtimeHistoryParams { start?: number; @@ -604,69 +604,27 @@ class RealtimeChannel extends EventEmitter { break; } - case actions.STATE: { - if (!this._liveObjects) { - return; - } - - const { id, connectionId, timestamp } = message; - const options = this.channelOptions; - - const stateMessages = message.state ?? []; - for (let i = 0; i < stateMessages.length; i++) { - try { - const stateMessage = stateMessages[i]; - - await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData); - - if (!stateMessage.connectionId) stateMessage.connectionId = connectionId; - if (!stateMessage.timestamp) stateMessage.timestamp = timestamp; - if (!stateMessage.id) stateMessage.id = id + ':' + i; - } catch (e) { - Logger.logAction( - this.logger, - Logger.LOG_ERROR, - 'RealtimeChannel.processMessage()', - (e as Error).toString(), - ); - } - } - - this._liveObjects.handleStateMessages(stateMessages); - - break; - } - + // STATE and STATE_SYNC message processing share most of the logic, so group them together + case actions.STATE: case actions.STATE_SYNC: { if (!this._liveObjects) { return; } - const { id, connectionId, timestamp } = message; + const stateMessages = message.state ?? []; const options = this.channelOptions; + await this._decodeAndPrepareMessages(message, stateMessages, (msg) => + this.client._LiveObjectsPlugin + ? this.client._LiveObjectsPlugin.StateMessage.decode(msg, options, decodeData) + : Utils.throwMissingPluginError('LiveObjects'), + ); - const stateMessages = message.state ?? []; - for (let i = 0; i < stateMessages.length; i++) { - try { - const stateMessage = stateMessages[i]; - - await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData); - - if (!stateMessage.connectionId) stateMessage.connectionId = connectionId; - if (!stateMessage.timestamp) stateMessage.timestamp = timestamp; - if (!stateMessage.id) stateMessage.id = id + ':' + i; - } catch (e) { - Logger.logAction( - this.logger, - Logger.LOG_ERROR, - 'RealtimeChannel.processMessage()', - (e as Error).toString(), - ); - } + if (message.action === actions.STATE) { + this._liveObjects.handleStateMessages(stateMessages); + } else { + this._liveObjects.handleStateSyncMessages(stateMessages, message.channelSerial); } - this._liveObjects.handleStateSyncMessages(stateMessages, message.channelSerial); - break; } @@ -774,7 +732,7 @@ class RealtimeChannel extends EventEmitter { * @returns `unrecoverableError` flag. If `true` indicates that unrecoverable error was encountered during message decoding * and any further message processing should be stopped. Always equals to `false` if `decodeErrorRecoveryHandler` was not provided */ - private async _decodeAndPrepareMessages( + private async _decodeAndPrepareMessages( protocolMessage: ProtocolMessage, messages: T[], decodeFn: (msg: T) => Promise,