From b7773e8321bace95f754bfe7d4a0325a8b28c3b0 Mon Sep 17 00:00:00 2001 From: Tyler Date: Wed, 15 Jan 2025 16:48:57 -0500 Subject: [PATCH 1/5] fix-one-more-error --- .../lib/indexerValueManagerHelpers.ts | 2 +- .../websocket/lib/indexerWebsocket.ts | 339 ++++++++++++------ .../websocket/lib/reconnectingWebsocket.ts | 10 +- 3 files changed, 234 insertions(+), 117 deletions(-) diff --git a/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts b/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts index dba78c1f7..40d5ca6c9 100644 --- a/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts +++ b/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts @@ -6,7 +6,7 @@ import { IndexerWebsocketManager } from './indexerWebsocketManager'; import { WebsocketDerivedValue } from './websocketDerivedValue'; // this is set to just above the websocket subscribe timeout because of race conditions in the indexer backend -const DESTROY_DELAY_MS = 21000; +const DESTROY_DELAY_MS = 22000; type WebsocketValueCreator = ( websocket: IndexerWebsocket, diff --git a/src/abacus-ts/websocket/lib/indexerWebsocket.ts b/src/abacus-ts/websocket/lib/indexerWebsocket.ts index 3de72adbb..7257ca75a 100644 --- a/src/abacus-ts/websocket/lib/indexerWebsocket.ts +++ b/src/abacus-ts/websocket/lib/indexerWebsocket.ts @@ -1,3 +1,4 @@ +/* eslint-disable max-classes-per-file */ import { logAbacusTsError, logAbacusTsInfo } from '@/abacus-ts/logs'; import typia from 'typia'; @@ -9,29 +10,87 @@ import { isTruthy } from '@/lib/isTruthy'; import { ReconnectingWebSocket } from './reconnectingWebsocket'; const NO_ID_SPECIAL_STRING_ID = '______EMPTY_ID______'; -const CHANNEL_ID_SAFE_DIVIDER = '//////////'; const CHANNEL_RETRY_COOLDOWN_MS = timeUnits.minute; +interface SubscriptionHandlerInput { + channel: string; + id: string | undefined; + batched: boolean; + handleBaseData: (data: any, fullMessage: any) => void; + handleUpdates: (updates: any[], fullMessage: any) => void; +} + +type SubscriptionHandlerTrackingMetadata = { + receivedBaseData: boolean; + sentSubMessage: boolean; + lastRetryBecauseErrorMs: number | undefined; + lastRetryBecauseDuplicateMs: number | undefined; +}; +type SubscriptionHandler = SubscriptionHandlerInput & SubscriptionHandlerTrackingMetadata; + +type SubscriptionMap = { + [channel: string]: { + [idOrSpecialString: string]: SubscriptionHandler; + }; +}; + +class SubscriptionManager { + private subscriptions: SubscriptionMap = {}; + + hasSubscription(channel: string, id?: string): boolean { + const normalizedId = id ?? NO_ID_SPECIAL_STRING_ID; + return Boolean(this.subscriptions[channel]?.[normalizedId]); + } + + getSubscription(channel: string, id?: string): SubscriptionHandler | undefined { + const normalizedId = id ?? NO_ID_SPECIAL_STRING_ID; + if (!this.hasSubscription(channel, normalizedId)) { + return undefined; + } + return this.subscriptions[channel]?.[normalizedId]; + } + + addSubscription(handler: SubscriptionHandler): boolean { + const normalizedId = handler.id ?? NO_ID_SPECIAL_STRING_ID; + const channel = handler.channel; + + if (this.hasSubscription(channel, normalizedId)) { + return false; + } + + this.subscriptions[channel] ??= {}; + this.subscriptions[channel][normalizedId] = handler; + return true; + } + + removeSubscription(channel: string, id: string | undefined): boolean { + const normalizedId = id ?? NO_ID_SPECIAL_STRING_ID; + if (!this.hasSubscription(channel, normalizedId)) { + return false; + } + delete this.subscriptions[channel]![normalizedId]; + return true; + } + + getAllSubscriptions(): SubscriptionHandler[] { + return Object.values(this.subscriptions) + .filter(isTruthy) + .flatMap((channelSubs) => Object.values(channelSubs)) + .filter(isTruthy); + } + + getChannelSubscriptions(channel: string): SubscriptionHandler[] { + return Object.values(this.subscriptions[channel] ?? {}); + } +} + export class IndexerWebsocket { private socket: ReconnectingWebSocket | null = null; // for logging purposes, to differentiate when user has many tabs open private indexerWsId = crypto.randomUUID(); - private subscriptions: { - [channel: string]: { - [id: string]: { - channel: string; - id: string | undefined; - batched: boolean; - handleBaseData: (data: any, fullMessage: any) => void; - handleUpdates: (updates: any[], fullMessage: any) => void; - sentSubMessage: boolean; - }; - }; - } = {}; - - private lastRetryTimeMsByChannelAndId: { [channelAndId: string]: number } = {}; + private subscriptions = new SubscriptionManager(); constructor(url: string) { this.socket = new ReconnectingWebSocket({ @@ -57,13 +116,7 @@ export class IndexerWebsocket { batched = true, handleUpdates, handleBaseData, - }: { - channel: string; - id: string | undefined; - batched?: boolean; - handleBaseData: (data: any, fullMessage: any) => void; - handleUpdates: (data: any[], fullMessage: any) => void; - }): () => void { + }: SubscriptionHandlerInput): () => void { this._addSub({ channel, id, batched, handleUpdates, handleBaseData }); return () => { this._performUnsub({ channel, id }); @@ -76,21 +129,32 @@ export class IndexerWebsocket { batched = true, handleUpdates, handleBaseData, - }: { - channel: string; - id: string | undefined; - batched?: boolean; - handleBaseData: (data: any, fullMessage: any) => void; - handleUpdates: (data: any[], fullMessage: any) => void; - }) => { - this.subscriptions[channel] ??= {}; - if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] != null) { + + // below here is any metadata we want to allow maintaining between resubscribes + lastRetryBecauseErrorMs = undefined, + lastRetryBecauseDuplicateMs = undefined, + }: SubscriptionHandlerInput & Partial) => { + const success = this.subscriptions.addSubscription({ + channel, + id, + batched, + handleBaseData, + handleUpdates, + sentSubMessage: false, + receivedBaseData: false, + lastRetryBecauseErrorMs, + lastRetryBecauseDuplicateMs, + }); + + // fails if already exists + if (!success) { logAbacusTsError('IndexerWebsocket', 'this subscription already exists', { id: `${channel}/${id}`, wsId: this.indexerWsId, }); - throw new Error(`IndexerWebsocket error: this subscription already exists. ${channel}/${id}`); + return; } + logAbacusTsInfo('IndexerWebsocket', 'adding subscription', { channel, id, @@ -98,16 +162,9 @@ export class IndexerWebsocket { socketActive: this.socket?.isActive(), wsId: this.indexerWsId, }); - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] = { - channel, - id, - batched, - handleBaseData, - handleUpdates, - sentSubMessage: false, - }; + if (this.socket != null && this.socket.isActive()) { - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.sentSubMessage = true; + this.subscriptions.getSubscription(channel, id)!.sentSubMessage = true; this.socket.send({ batched, channel, @@ -117,23 +174,28 @@ export class IndexerWebsocket { } }; - private _performUnsub = ({ channel, id }: { channel: string; id: string | undefined }) => { - if (this.subscriptions[channel] == null) { + private _performUnsub = ( + { channel, id }: { channel: string; id: string | undefined }, + // if true, don't send the unsub message, just remove from the internal data structure + forceSuppressWsMessage: boolean = false + ) => { + const sub = this.subscriptions.getSubscription(channel, id); + const success = this.subscriptions.removeSubscription(channel, id); + + // only if doesn't exist + if (!success || sub == null) { logAbacusTsError( 'IndexerWebsocket', 'unsubbing from nonexistent or already unsubbed channel', - { channel, wsId: this.indexerWsId } + { channel, id, wsId: this.indexerWsId } ); return; } - if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] == null) { - logAbacusTsError( - 'IndexerWebsocket', - 'unsubbing from nonexistent or already unsubbed channel', - { channel, id, wsId: this.indexerWsId } - ); + + if (forceSuppressWsMessage) { return; } + logAbacusTsInfo('IndexerWebsocket', 'removing subscription', { channel, id, @@ -141,22 +203,18 @@ export class IndexerWebsocket { socketActive: this.socket?.isActive(), wsId: this.indexerWsId, }); - if ( - this.socket != null && - this.socket.isActive() && - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.sentSubMessage - ) { + + if (this.socket != null && this.socket.isActive() && sub.sentSubMessage) { this.socket.send({ channel, id, type: 'unsubscribe', }); } - delete this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]; }; private _refreshSub = (channel: string, id: string | undefined) => { - const sub = this.subscriptions[channel]?.[id ?? NO_ID_SPECIAL_STRING_ID]; + const sub = this.subscriptions.getSubscription(channel, id); if (sub == null) { return; } @@ -164,33 +222,96 @@ export class IndexerWebsocket { this._addSub(sub); }; - // if we get a "could not fetch data" error, we retry once as long as this channel+id is not on cooldown - // TODO: remove this entirely when backend is more reliable - private _handleErrorReceived = (message: IndexerWebsocketErrorMessage) => { + private _maybeRetryTimeoutError = (message: IndexerWebsocketErrorMessage): boolean => { if (message.message.startsWith('Internal error, could not fetch data for subscription: ')) { const maybeChannel = message.channel; const maybeId = message.id; - if (maybeChannel != null && maybeChannel.startsWith('v4_')) { - const channelAndId = `${maybeChannel}${CHANNEL_ID_SAFE_DIVIDER}${maybeId ?? NO_ID_SPECIAL_STRING_ID}`; - const lastRefresh = this.lastRetryTimeMsByChannelAndId[channelAndId] ?? 0; - if (Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) { - this.lastRetryTimeMsByChannelAndId[channelAndId] = Date.now(); + if ( + maybeChannel != null && + maybeChannel.startsWith('v4_') && + this.subscriptions.hasSubscription(maybeChannel, maybeId) + ) { + const sub = this.subscriptions.getSubscription(maybeChannel, maybeId)!; + const lastRefresh = sub.lastRetryBecauseErrorMs ?? 0; + const hasBaseData = sub.receivedBaseData; + if (!hasBaseData && Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) { + sub.lastRetryBecauseErrorMs = Date.now(); this._refreshSub(maybeChannel, maybeId); - logAbacusTsInfo('IndexerWebsocket', 'error fetching data for channel, refetching', { + logAbacusTsInfo('IndexerWebsocket', 'error fetching subscription, refetching', { maybeChannel, maybeId, + socketNonNull: this.socket != null, + socketActive: this.socket?.isActive(), wsId: this.indexerWsId, }); - return; + return true; + } + logAbacusTsError('IndexerWebsocket', 'error fetching subscription, not retrying:', { + maybeChannel, + maybeId, + hasBaseData, + elapsedSinceLast: Date.now() - lastRefresh, + wsId: this.indexerWsId, + }); + return true; + } + } + return false; + }; + + private _maybeFixDuplicateSubError = (message: IndexerWebsocketErrorMessage): boolean => { + if (message.message.startsWith('Invalid subscribe message: already subscribed (')) { + // temp until backend adds metadata + const parsedMatches = message.message.match(/\(([\w_]+)-(.+?)\)/); + const maybeChannel = parsedMatches?.[1]; + + let maybeId = parsedMatches?.[2]; + if (maybeId === maybeChannel) { + maybeId = undefined; + } + + if ( + maybeChannel != null && + maybeChannel.startsWith('v4_') && + this.subscriptions.hasSubscription(maybeChannel, maybeId) + ) { + const sub = this.subscriptions.getSubscription(maybeChannel, maybeId)!; + const lastRefresh = sub.lastRetryBecauseDuplicateMs ?? 0; + const hasBaseData = sub.receivedBaseData; + if (!hasBaseData && Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) { + sub.lastRetryBecauseDuplicateMs = Date.now(); + this._refreshSub(maybeChannel, maybeId); + logAbacusTsInfo('IndexerWebsocket', 'error: subscription already exists, refetching', { + maybeChannel, + maybeId, + socketNonNull: this.socket != null, + socketActive: this.socket?.isActive(), + wsId: this.indexerWsId, + }); + return true; } - logAbacusTsError('IndexerWebsocket', 'hit max retries for channel:', { + logAbacusTsError('IndexerWebsocket', 'error: subscription already exists, not retrying', { maybeChannel, maybeId, + hasBaseData, + elapsedSinceLast: Date.now() - lastRefresh, wsId: this.indexerWsId, }); - return; + return true; } } + return false; + }; + + // if we get a "could not fetch data" error, we retry once as long as this channel+id is not on cooldown + // TODO: remove this entirely when backend is more reliable + private _handleErrorReceived = (message: IndexerWebsocketErrorMessage) => { + let handled = this._maybeRetryTimeoutError(message); + // this ensures this isn't called if we already retried and tracks if we have handled it yet + handled = handled || this._maybeFixDuplicateSubError(message); + if (handled) { + return; + } logAbacusTsError('IndexerWebsocket', 'encountered server side error:', { message, wsId: this.indexerWsId, @@ -219,23 +340,14 @@ export class IndexerWebsocket { const channel = message.channel; const id = message.id; - if (this.subscriptions[channel] == null) { - // hide error for channel we expect to see it on - if (channel !== 'v4_orderbook') { - logAbacusTsError('IndexerWebsocket', 'encountered message with unknown target', { - channel, - id, - wsId: this.indexerWsId, - }); - } - return; - } - if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] == null) { + const sub = this.subscriptions.getSubscription(channel, id); + if (!this.subscriptions.hasSubscription(channel, id) || sub == null) { // hide error for channel we expect to see it on if (channel !== 'v4_orderbook') { - logAbacusTsError('IndexerWebsocket', 'encountered message with unknown target', { + logAbacusTsInfo('IndexerWebsocket', 'encountered message with unknown target', { channel, id, + type: message.type, wsId: this.indexerWsId, }); } @@ -247,21 +359,37 @@ export class IndexerWebsocket { id, wsId: this.indexerWsId, }); - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.handleBaseData( - message.contents, - message - ); + sub.receivedBaseData = true; + sub.handleBaseData(message.contents, message); } else if (message.type === 'channel_data') { - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.handleUpdates( - [message.contents], - message - ); + if (!sub.receivedBaseData) { + logAbacusTsError( + 'IndexerWebsocket', + 'message received before subscription confirmed, hiding', + { + channel, + id, + wsId: this.indexerWsId, + } + ); + return; + } + sub.handleUpdates([message.contents], message); // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition } else if (message.type === 'channel_batch_data') { - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.handleUpdates( - message.contents, - message - ); + if (!sub.receivedBaseData) { + logAbacusTsError( + 'IndexerWebsocket', + 'message received before subscription confirmed, hiding', + { + channel, + id, + wsId: this.indexerWsId, + } + ); + return; + } + sub.handleUpdates(message.contents, message); } else { assertNever(message); } @@ -284,25 +412,14 @@ export class IndexerWebsocket { wsId: this.indexerWsId, socketNonNull: this.socket != null, socketActive: this.socket?.isActive(), - subs: Object.values(this.subscriptions) - .flatMap((o) => Object.values(o)) - .filter(isTruthy) - .map((o) => `${o.channel}///${o.id}`), + subs: this.subscriptions.getAllSubscriptions().map((o) => `${o.channel}///${o.id}`), }); if (this.socket != null && this.socket.isActive()) { - Object.values(this.subscriptions) - .filter(isTruthy) - .flatMap((o) => Object.values(o)) - .filter(isTruthy) - .forEach(({ batched, channel, id }) => { - this.subscriptions[channel]![id ?? NO_ID_SPECIAL_STRING_ID]!.sentSubMessage = true; - this.socket!.send({ - batched, - channel, - id, - type: 'subscribe', - }); - }); + this.subscriptions.getAllSubscriptions().forEach(({ channel, id }) => { + const sub = this.subscriptions.getSubscription(channel, id)!; + this._performUnsub(sub, true); + this._addSub(sub); + }); } else { logAbacusTsError( 'IndexerWebsocket', diff --git a/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts b/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts index 73e80d342..784e05b99 100644 --- a/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts +++ b/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts @@ -154,7 +154,7 @@ class WebSocketConnection { this.ws = new WebSocket(url); this.setupEventHandlers(); } catch (error) { - logAbacusTsError('WebSocketConnection', 'error connecting', error); + logAbacusTsError('WebSocketConnection', 'error connecting', { error }); this.close(); // we don't rethrow because we instead call the handleClose method } @@ -169,7 +169,7 @@ class WebSocketConnection { const data = JSON.parse(event.data); this.handleMessage(this.id, data); } catch (e) { - logAbacusTsError('WebSocketConnection', 'error in handler', e); + logAbacusTsError('WebSocketConnection', 'error in handler', { data: event.data, error: e }); } }; @@ -180,7 +180,7 @@ class WebSocketConnection { try { this.handleConnected(this.id); } catch (e) { - logAbacusTsError('WebSocketConnection', 'error in handleConnected', e); + logAbacusTsError('WebSocketConnection', 'error in handleConnected', { error: e }); } }; @@ -224,7 +224,7 @@ class WebSocketConnection { this.ws?.close(); this.ws = null; } catch (e) { - logAbacusTsError('WebSocketConnection', 'error closing socket', e); + logAbacusTsError('WebSocketConnection', 'error closing socket', { error: e }); } } @@ -245,7 +245,7 @@ class WebSocketConnection { const message = typeof data === 'string' ? data : JSON.stringify(data); this.ws!.send(message); } catch (e) { - logAbacusTsError('WebSocketConnection', 'error sending data', e, data); + logAbacusTsError('WebSocketConnection', 'error sending data', { error: e, data }); } } } From cc383ab9d8d9521e198729c7e5441d7d7af787eb Mon Sep 17 00:00:00 2001 From: Tyler Date: Wed, 15 Jan 2025 16:54:43 -0500 Subject: [PATCH 2/5] fix --- src/abacus-ts/websocket/lib/websocketDerivedValue.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/abacus-ts/websocket/lib/websocketDerivedValue.ts b/src/abacus-ts/websocket/lib/websocketDerivedValue.ts index 53912a015..8dceb2592 100644 --- a/src/abacus-ts/websocket/lib/websocketDerivedValue.ts +++ b/src/abacus-ts/websocket/lib/websocketDerivedValue.ts @@ -25,6 +25,7 @@ export class WebsocketDerivedValue { this.unsubFromWs = websocket.addChannelSubscription({ channel: sub.channel, id: sub.id, + batched: true, handleBaseData: (data, fullMessage) => this._setValue(sub.handleBaseData(data, this.value, fullMessage)), handleUpdates: (updates, fullMessage) => From 52edbb567e3b7d4818afff11bb23af6517f9bb58 Mon Sep 17 00:00:00 2001 From: Tyler Date: Wed, 15 Jan 2025 16:56:00 -0500 Subject: [PATCH 3/5] fix --- src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts b/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts index 40d5ca6c9..5f1d06948 100644 --- a/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts +++ b/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts @@ -6,7 +6,7 @@ import { IndexerWebsocketManager } from './indexerWebsocketManager'; import { WebsocketDerivedValue } from './websocketDerivedValue'; // this is set to just above the websocket subscribe timeout because of race conditions in the indexer backend -const DESTROY_DELAY_MS = 22000; +const DESTROY_DELAY_MS = 23000; type WebsocketValueCreator = ( websocket: IndexerWebsocket, From 710160e32842821e1ff5c7971218da4e299c040c Mon Sep 17 00:00:00 2001 From: Tyler Date: Wed, 15 Jan 2025 17:07:10 -0500 Subject: [PATCH 4/5] fix --- src/abacus-ts/websocket/lib/indexerWebsocket.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/abacus-ts/websocket/lib/indexerWebsocket.ts b/src/abacus-ts/websocket/lib/indexerWebsocket.ts index 7257ca75a..a92ff9934 100644 --- a/src/abacus-ts/websocket/lib/indexerWebsocket.ts +++ b/src/abacus-ts/websocket/lib/indexerWebsocket.ts @@ -23,6 +23,8 @@ interface SubscriptionHandlerInput { type SubscriptionHandlerTrackingMetadata = { receivedBaseData: boolean; sentSubMessage: boolean; + // using subs for this data means we lose it when the user fully unsubscribes and thus might actually retry more often than expected + // but this is fine since every consumer has subs wrapped in resource managers that prevent lots of churn lastRetryBecauseErrorMs: number | undefined; lastRetryBecauseDuplicateMs: number | undefined; }; From 7eece14cb5e1eb81b23d61df7fb5e93153298305 Mon Sep 17 00:00:00 2001 From: Tyler Date: Wed, 15 Jan 2025 17:28:02 -0500 Subject: [PATCH 5/5] fix --- .../websocket/lib/indexerWebsocket.ts | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/abacus-ts/websocket/lib/indexerWebsocket.ts b/src/abacus-ts/websocket/lib/indexerWebsocket.ts index a92ff9934..ad2bf35dd 100644 --- a/src/abacus-ts/websocket/lib/indexerWebsocket.ts +++ b/src/abacus-ts/websocket/lib/indexerWebsocket.ts @@ -76,7 +76,6 @@ class SubscriptionManager { getAllSubscriptions(): SubscriptionHandler[] { return Object.values(this.subscriptions) - .filter(isTruthy) .flatMap((channelSubs) => Object.values(channelSubs)) .filter(isTruthy); } @@ -136,7 +135,7 @@ export class IndexerWebsocket { lastRetryBecauseErrorMs = undefined, lastRetryBecauseDuplicateMs = undefined, }: SubscriptionHandlerInput & Partial) => { - const success = this.subscriptions.addSubscription({ + const wasSuccessful = this.subscriptions.addSubscription({ channel, id, batched, @@ -149,7 +148,7 @@ export class IndexerWebsocket { }); // fails if already exists - if (!success) { + if (!wasSuccessful) { logAbacusTsError('IndexerWebsocket', 'this subscription already exists', { id: `${channel}/${id}`, wsId: this.indexerWsId, @@ -161,7 +160,7 @@ export class IndexerWebsocket { channel, id, socketNonNull: this.socket != null, - socketActive: this.socket?.isActive(), + socketActive: Boolean(this.socket?.isActive()), wsId: this.indexerWsId, }); @@ -179,13 +178,13 @@ export class IndexerWebsocket { private _performUnsub = ( { channel, id }: { channel: string; id: string | undefined }, // if true, don't send the unsub message, just remove from the internal data structure - forceSuppressWsMessage: boolean = false + shouldSuppressWsMessage: boolean = false ) => { const sub = this.subscriptions.getSubscription(channel, id); - const success = this.subscriptions.removeSubscription(channel, id); + const wasSuccessful = this.subscriptions.removeSubscription(channel, id); // only if doesn't exist - if (!success || sub == null) { + if (!wasSuccessful || sub == null) { logAbacusTsError( 'IndexerWebsocket', 'unsubbing from nonexistent or already unsubbed channel', @@ -194,7 +193,7 @@ export class IndexerWebsocket { return; } - if (forceSuppressWsMessage) { + if (shouldSuppressWsMessage) { return; } @@ -202,7 +201,7 @@ export class IndexerWebsocket { channel, id, socketNonNull: this.socket != null, - socketActive: this.socket?.isActive(), + socketActive: Boolean(this.socket?.isActive()), wsId: this.indexerWsId, }); @@ -243,7 +242,7 @@ export class IndexerWebsocket { maybeChannel, maybeId, socketNonNull: this.socket != null, - socketActive: this.socket?.isActive(), + socketActive: Boolean(this.socket?.isActive()), wsId: this.indexerWsId, }); return true; @@ -287,7 +286,7 @@ export class IndexerWebsocket { maybeChannel, maybeId, socketNonNull: this.socket != null, - socketActive: this.socket?.isActive(), + socketActive: Boolean(this.socket?.isActive()), wsId: this.indexerWsId, }); return true; @@ -413,7 +412,7 @@ export class IndexerWebsocket { socketUrl: this.socket?.url, wsId: this.indexerWsId, socketNonNull: this.socket != null, - socketActive: this.socket?.isActive(), + socketActive: Boolean(this.socket?.isActive()), subs: this.subscriptions.getAllSubscriptions().map((o) => `${o.channel}///${o.id}`), }); if (this.socket != null && this.socket.isActive()) {