diff --git a/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts b/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts index dba78c1f7..40d5ca6c9 100644 --- a/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts +++ b/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts @@ -6,7 +6,7 @@ import { IndexerWebsocketManager } from './indexerWebsocketManager'; import { WebsocketDerivedValue } from './websocketDerivedValue'; // this is set to just above the websocket subscribe timeout because of race conditions in the indexer backend -const DESTROY_DELAY_MS = 21000; +const DESTROY_DELAY_MS = 22000; type WebsocketValueCreator = ( websocket: IndexerWebsocket, diff --git a/src/abacus-ts/websocket/lib/indexerWebsocket.ts b/src/abacus-ts/websocket/lib/indexerWebsocket.ts index 3de72adbb..7257ca75a 100644 --- a/src/abacus-ts/websocket/lib/indexerWebsocket.ts +++ b/src/abacus-ts/websocket/lib/indexerWebsocket.ts @@ -1,3 +1,4 @@ +/* eslint-disable max-classes-per-file */ import { logAbacusTsError, logAbacusTsInfo } from '@/abacus-ts/logs'; import typia from 'typia'; @@ -9,29 +10,87 @@ import { isTruthy } from '@/lib/isTruthy'; import { ReconnectingWebSocket } from './reconnectingWebsocket'; const NO_ID_SPECIAL_STRING_ID = '______EMPTY_ID______'; -const CHANNEL_ID_SAFE_DIVIDER = '//////////'; const CHANNEL_RETRY_COOLDOWN_MS = timeUnits.minute; +interface SubscriptionHandlerInput { + channel: string; + id: string | undefined; + batched: boolean; + handleBaseData: (data: any, fullMessage: any) => void; + handleUpdates: (updates: any[], fullMessage: any) => void; +} + +type SubscriptionHandlerTrackingMetadata = { + receivedBaseData: boolean; + sentSubMessage: boolean; + lastRetryBecauseErrorMs: number | undefined; + lastRetryBecauseDuplicateMs: number | undefined; +}; +type SubscriptionHandler = SubscriptionHandlerInput & SubscriptionHandlerTrackingMetadata; + +type SubscriptionMap = { + [channel: string]: { + [idOrSpecialString: string]: SubscriptionHandler; + }; +}; + +class SubscriptionManager { + private subscriptions: SubscriptionMap = {}; + + hasSubscription(channel: string, id?: string): boolean { + const normalizedId = id ?? NO_ID_SPECIAL_STRING_ID; + return Boolean(this.subscriptions[channel]?.[normalizedId]); + } + + getSubscription(channel: string, id?: string): SubscriptionHandler | undefined { + const normalizedId = id ?? NO_ID_SPECIAL_STRING_ID; + if (!this.hasSubscription(channel, normalizedId)) { + return undefined; + } + return this.subscriptions[channel]?.[normalizedId]; + } + + addSubscription(handler: SubscriptionHandler): boolean { + const normalizedId = handler.id ?? NO_ID_SPECIAL_STRING_ID; + const channel = handler.channel; + + if (this.hasSubscription(channel, normalizedId)) { + return false; + } + + this.subscriptions[channel] ??= {}; + this.subscriptions[channel][normalizedId] = handler; + return true; + } + + removeSubscription(channel: string, id: string | undefined): boolean { + const normalizedId = id ?? NO_ID_SPECIAL_STRING_ID; + if (!this.hasSubscription(channel, normalizedId)) { + return false; + } + delete this.subscriptions[channel]![normalizedId]; + return true; + } + + getAllSubscriptions(): SubscriptionHandler[] { + return Object.values(this.subscriptions) + .filter(isTruthy) + .flatMap((channelSubs) => Object.values(channelSubs)) + .filter(isTruthy); + } + + getChannelSubscriptions(channel: string): SubscriptionHandler[] { + return Object.values(this.subscriptions[channel] ?? {}); + } +} + export class IndexerWebsocket { private socket: ReconnectingWebSocket | null = null; // for logging purposes, to differentiate when user has many tabs open private indexerWsId = crypto.randomUUID(); - private subscriptions: { - [channel: string]: { - [id: string]: { - channel: string; - id: string | undefined; - batched: boolean; - handleBaseData: (data: any, fullMessage: any) => void; - handleUpdates: (updates: any[], fullMessage: any) => void; - sentSubMessage: boolean; - }; - }; - } = {}; - - private lastRetryTimeMsByChannelAndId: { [channelAndId: string]: number } = {}; + private subscriptions = new SubscriptionManager(); constructor(url: string) { this.socket = new ReconnectingWebSocket({ @@ -57,13 +116,7 @@ export class IndexerWebsocket { batched = true, handleUpdates, handleBaseData, - }: { - channel: string; - id: string | undefined; - batched?: boolean; - handleBaseData: (data: any, fullMessage: any) => void; - handleUpdates: (data: any[], fullMessage: any) => void; - }): () => void { + }: SubscriptionHandlerInput): () => void { this._addSub({ channel, id, batched, handleUpdates, handleBaseData }); return () => { this._performUnsub({ channel, id }); @@ -76,21 +129,32 @@ export class IndexerWebsocket { batched = true, handleUpdates, handleBaseData, - }: { - channel: string; - id: string | undefined; - batched?: boolean; - handleBaseData: (data: any, fullMessage: any) => void; - handleUpdates: (data: any[], fullMessage: any) => void; - }) => { - this.subscriptions[channel] ??= {}; - if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] != null) { + + // below here is any metadata we want to allow maintaining between resubscribes + lastRetryBecauseErrorMs = undefined, + lastRetryBecauseDuplicateMs = undefined, + }: SubscriptionHandlerInput & Partial) => { + const success = this.subscriptions.addSubscription({ + channel, + id, + batched, + handleBaseData, + handleUpdates, + sentSubMessage: false, + receivedBaseData: false, + lastRetryBecauseErrorMs, + lastRetryBecauseDuplicateMs, + }); + + // fails if already exists + if (!success) { logAbacusTsError('IndexerWebsocket', 'this subscription already exists', { id: `${channel}/${id}`, wsId: this.indexerWsId, }); - throw new Error(`IndexerWebsocket error: this subscription already exists. ${channel}/${id}`); + return; } + logAbacusTsInfo('IndexerWebsocket', 'adding subscription', { channel, id, @@ -98,16 +162,9 @@ export class IndexerWebsocket { socketActive: this.socket?.isActive(), wsId: this.indexerWsId, }); - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] = { - channel, - id, - batched, - handleBaseData, - handleUpdates, - sentSubMessage: false, - }; + if (this.socket != null && this.socket.isActive()) { - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.sentSubMessage = true; + this.subscriptions.getSubscription(channel, id)!.sentSubMessage = true; this.socket.send({ batched, channel, @@ -117,23 +174,28 @@ export class IndexerWebsocket { } }; - private _performUnsub = ({ channel, id }: { channel: string; id: string | undefined }) => { - if (this.subscriptions[channel] == null) { + private _performUnsub = ( + { channel, id }: { channel: string; id: string | undefined }, + // if true, don't send the unsub message, just remove from the internal data structure + forceSuppressWsMessage: boolean = false + ) => { + const sub = this.subscriptions.getSubscription(channel, id); + const success = this.subscriptions.removeSubscription(channel, id); + + // only if doesn't exist + if (!success || sub == null) { logAbacusTsError( 'IndexerWebsocket', 'unsubbing from nonexistent or already unsubbed channel', - { channel, wsId: this.indexerWsId } + { channel, id, wsId: this.indexerWsId } ); return; } - if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] == null) { - logAbacusTsError( - 'IndexerWebsocket', - 'unsubbing from nonexistent or already unsubbed channel', - { channel, id, wsId: this.indexerWsId } - ); + + if (forceSuppressWsMessage) { return; } + logAbacusTsInfo('IndexerWebsocket', 'removing subscription', { channel, id, @@ -141,22 +203,18 @@ export class IndexerWebsocket { socketActive: this.socket?.isActive(), wsId: this.indexerWsId, }); - if ( - this.socket != null && - this.socket.isActive() && - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.sentSubMessage - ) { + + if (this.socket != null && this.socket.isActive() && sub.sentSubMessage) { this.socket.send({ channel, id, type: 'unsubscribe', }); } - delete this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]; }; private _refreshSub = (channel: string, id: string | undefined) => { - const sub = this.subscriptions[channel]?.[id ?? NO_ID_SPECIAL_STRING_ID]; + const sub = this.subscriptions.getSubscription(channel, id); if (sub == null) { return; } @@ -164,33 +222,96 @@ export class IndexerWebsocket { this._addSub(sub); }; - // if we get a "could not fetch data" error, we retry once as long as this channel+id is not on cooldown - // TODO: remove this entirely when backend is more reliable - private _handleErrorReceived = (message: IndexerWebsocketErrorMessage) => { + private _maybeRetryTimeoutError = (message: IndexerWebsocketErrorMessage): boolean => { if (message.message.startsWith('Internal error, could not fetch data for subscription: ')) { const maybeChannel = message.channel; const maybeId = message.id; - if (maybeChannel != null && maybeChannel.startsWith('v4_')) { - const channelAndId = `${maybeChannel}${CHANNEL_ID_SAFE_DIVIDER}${maybeId ?? NO_ID_SPECIAL_STRING_ID}`; - const lastRefresh = this.lastRetryTimeMsByChannelAndId[channelAndId] ?? 0; - if (Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) { - this.lastRetryTimeMsByChannelAndId[channelAndId] = Date.now(); + if ( + maybeChannel != null && + maybeChannel.startsWith('v4_') && + this.subscriptions.hasSubscription(maybeChannel, maybeId) + ) { + const sub = this.subscriptions.getSubscription(maybeChannel, maybeId)!; + const lastRefresh = sub.lastRetryBecauseErrorMs ?? 0; + const hasBaseData = sub.receivedBaseData; + if (!hasBaseData && Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) { + sub.lastRetryBecauseErrorMs = Date.now(); this._refreshSub(maybeChannel, maybeId); - logAbacusTsInfo('IndexerWebsocket', 'error fetching data for channel, refetching', { + logAbacusTsInfo('IndexerWebsocket', 'error fetching subscription, refetching', { maybeChannel, maybeId, + socketNonNull: this.socket != null, + socketActive: this.socket?.isActive(), wsId: this.indexerWsId, }); - return; + return true; + } + logAbacusTsError('IndexerWebsocket', 'error fetching subscription, not retrying:', { + maybeChannel, + maybeId, + hasBaseData, + elapsedSinceLast: Date.now() - lastRefresh, + wsId: this.indexerWsId, + }); + return true; + } + } + return false; + }; + + private _maybeFixDuplicateSubError = (message: IndexerWebsocketErrorMessage): boolean => { + if (message.message.startsWith('Invalid subscribe message: already subscribed (')) { + // temp until backend adds metadata + const parsedMatches = message.message.match(/\(([\w_]+)-(.+?)\)/); + const maybeChannel = parsedMatches?.[1]; + + let maybeId = parsedMatches?.[2]; + if (maybeId === maybeChannel) { + maybeId = undefined; + } + + if ( + maybeChannel != null && + maybeChannel.startsWith('v4_') && + this.subscriptions.hasSubscription(maybeChannel, maybeId) + ) { + const sub = this.subscriptions.getSubscription(maybeChannel, maybeId)!; + const lastRefresh = sub.lastRetryBecauseDuplicateMs ?? 0; + const hasBaseData = sub.receivedBaseData; + if (!hasBaseData && Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) { + sub.lastRetryBecauseDuplicateMs = Date.now(); + this._refreshSub(maybeChannel, maybeId); + logAbacusTsInfo('IndexerWebsocket', 'error: subscription already exists, refetching', { + maybeChannel, + maybeId, + socketNonNull: this.socket != null, + socketActive: this.socket?.isActive(), + wsId: this.indexerWsId, + }); + return true; } - logAbacusTsError('IndexerWebsocket', 'hit max retries for channel:', { + logAbacusTsError('IndexerWebsocket', 'error: subscription already exists, not retrying', { maybeChannel, maybeId, + hasBaseData, + elapsedSinceLast: Date.now() - lastRefresh, wsId: this.indexerWsId, }); - return; + return true; } } + return false; + }; + + // if we get a "could not fetch data" error, we retry once as long as this channel+id is not on cooldown + // TODO: remove this entirely when backend is more reliable + private _handleErrorReceived = (message: IndexerWebsocketErrorMessage) => { + let handled = this._maybeRetryTimeoutError(message); + // this ensures this isn't called if we already retried and tracks if we have handled it yet + handled = handled || this._maybeFixDuplicateSubError(message); + if (handled) { + return; + } logAbacusTsError('IndexerWebsocket', 'encountered server side error:', { message, wsId: this.indexerWsId, @@ -219,23 +340,14 @@ export class IndexerWebsocket { const channel = message.channel; const id = message.id; - if (this.subscriptions[channel] == null) { - // hide error for channel we expect to see it on - if (channel !== 'v4_orderbook') { - logAbacusTsError('IndexerWebsocket', 'encountered message with unknown target', { - channel, - id, - wsId: this.indexerWsId, - }); - } - return; - } - if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] == null) { + const sub = this.subscriptions.getSubscription(channel, id); + if (!this.subscriptions.hasSubscription(channel, id) || sub == null) { // hide error for channel we expect to see it on if (channel !== 'v4_orderbook') { - logAbacusTsError('IndexerWebsocket', 'encountered message with unknown target', { + logAbacusTsInfo('IndexerWebsocket', 'encountered message with unknown target', { channel, id, + type: message.type, wsId: this.indexerWsId, }); } @@ -247,21 +359,37 @@ export class IndexerWebsocket { id, wsId: this.indexerWsId, }); - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.handleBaseData( - message.contents, - message - ); + sub.receivedBaseData = true; + sub.handleBaseData(message.contents, message); } else if (message.type === 'channel_data') { - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.handleUpdates( - [message.contents], - message - ); + if (!sub.receivedBaseData) { + logAbacusTsError( + 'IndexerWebsocket', + 'message received before subscription confirmed, hiding', + { + channel, + id, + wsId: this.indexerWsId, + } + ); + return; + } + sub.handleUpdates([message.contents], message); // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition } else if (message.type === 'channel_batch_data') { - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.handleUpdates( - message.contents, - message - ); + if (!sub.receivedBaseData) { + logAbacusTsError( + 'IndexerWebsocket', + 'message received before subscription confirmed, hiding', + { + channel, + id, + wsId: this.indexerWsId, + } + ); + return; + } + sub.handleUpdates(message.contents, message); } else { assertNever(message); } @@ -284,25 +412,14 @@ export class IndexerWebsocket { wsId: this.indexerWsId, socketNonNull: this.socket != null, socketActive: this.socket?.isActive(), - subs: Object.values(this.subscriptions) - .flatMap((o) => Object.values(o)) - .filter(isTruthy) - .map((o) => `${o.channel}///${o.id}`), + subs: this.subscriptions.getAllSubscriptions().map((o) => `${o.channel}///${o.id}`), }); if (this.socket != null && this.socket.isActive()) { - Object.values(this.subscriptions) - .filter(isTruthy) - .flatMap((o) => Object.values(o)) - .filter(isTruthy) - .forEach(({ batched, channel, id }) => { - this.subscriptions[channel]![id ?? NO_ID_SPECIAL_STRING_ID]!.sentSubMessage = true; - this.socket!.send({ - batched, - channel, - id, - type: 'subscribe', - }); - }); + this.subscriptions.getAllSubscriptions().forEach(({ channel, id }) => { + const sub = this.subscriptions.getSubscription(channel, id)!; + this._performUnsub(sub, true); + this._addSub(sub); + }); } else { logAbacusTsError( 'IndexerWebsocket', diff --git a/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts b/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts index 73e80d342..784e05b99 100644 --- a/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts +++ b/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts @@ -154,7 +154,7 @@ class WebSocketConnection { this.ws = new WebSocket(url); this.setupEventHandlers(); } catch (error) { - logAbacusTsError('WebSocketConnection', 'error connecting', error); + logAbacusTsError('WebSocketConnection', 'error connecting', { error }); this.close(); // we don't rethrow because we instead call the handleClose method } @@ -169,7 +169,7 @@ class WebSocketConnection { const data = JSON.parse(event.data); this.handleMessage(this.id, data); } catch (e) { - logAbacusTsError('WebSocketConnection', 'error in handler', e); + logAbacusTsError('WebSocketConnection', 'error in handler', { data: event.data, error: e }); } }; @@ -180,7 +180,7 @@ class WebSocketConnection { try { this.handleConnected(this.id); } catch (e) { - logAbacusTsError('WebSocketConnection', 'error in handleConnected', e); + logAbacusTsError('WebSocketConnection', 'error in handleConnected', { error: e }); } }; @@ -224,7 +224,7 @@ class WebSocketConnection { this.ws?.close(); this.ws = null; } catch (e) { - logAbacusTsError('WebSocketConnection', 'error closing socket', e); + logAbacusTsError('WebSocketConnection', 'error closing socket', { error: e }); } } @@ -245,7 +245,7 @@ class WebSocketConnection { const message = typeof data === 'string' ? data : JSON.stringify(data); this.ws!.send(message); } catch (e) { - logAbacusTsError('WebSocketConnection', 'error sending data', e, data); + logAbacusTsError('WebSocketConnection', 'error sending data', { error: e, data }); } } }