diff --git a/src/abacus-ts/websocket/lib/indexerWebsocket.ts b/src/abacus-ts/websocket/lib/indexerWebsocket.ts index 2bef6bcaa..9663c501d 100644 --- a/src/abacus-ts/websocket/lib/indexerWebsocket.ts +++ b/src/abacus-ts/websocket/lib/indexerWebsocket.ts @@ -1,12 +1,15 @@ import { logAbacusTsError } from '@/abacus-ts/logs'; import typia from 'typia'; +import { timeUnits } from '@/constants/time'; + import { assertNever } from '@/lib/assertNever'; import { isTruthy } from '@/lib/isTruthy'; import { ReconnectingWebSocket } from './reconnectingWebsocket'; const NO_ID_SPECIAL_STRING_ID = '______EMPTY_ID______'; +const CHANNEL_RETRY_COOLDOWN_MS = timeUnits.minute; export class IndexerWebsocket { private socket: ReconnectingWebSocket | null = null; @@ -24,6 +27,8 @@ export class IndexerWebsocket { }; } = {}; + private lastRetryTimeMsByChannel: { [channel: string]: number } = {}; + constructor(url: string) { this.socket = new ReconnectingWebSocket({ url, @@ -55,6 +60,25 @@ export class IndexerWebsocket { handleBaseData: (data: any, fullMessage: any) => void; handleUpdates: (data: any[], fullMessage: any) => void; }): () => void { + this._addSub({ channel, id, batched, handleUpdates, handleBaseData }); + return () => { + this._performUnsub({ channel, id }); + }; + } + + private _addSub = ({ + channel, + id, + batched = true, + handleUpdates, + handleBaseData, + }: { + channel: string; + id: string | undefined; + batched?: boolean; + handleBaseData: (data: any, fullMessage: any) => void; + handleUpdates: (data: any[], fullMessage: any) => void; + }) => { this.subscriptions[channel] ??= {}; if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] != null) { logAbacusTsError('IndexerWebsocket', 'this subscription already exists', `${channel}/${id}`); @@ -77,46 +101,81 @@ export class IndexerWebsocket { type: 'subscribe', }); } + }; - return () => { - if (this.subscriptions[channel] == null) { - logAbacusTsError( - 'IndexerWebsocket', - 'unsubbing from nonexistent or already unsubbed channel', - channel - ); - return; - } - if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] == null) { - logAbacusTsError( - 'IndexerWebsocket', - 'unsubbing from nonexistent or already unsubbed channel', - channel, - id - ); + private _performUnsub = ({ channel, id }: { channel: string; id: string | undefined }) => { + if (this.subscriptions[channel] == null) { + logAbacusTsError( + 'IndexerWebsocket', + 'unsubbing from nonexistent or already unsubbed channel', + channel + ); + return; + } + if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] == null) { + logAbacusTsError( + 'IndexerWebsocket', + 'unsubbing from nonexistent or already unsubbed channel', + channel, + id + ); + return; + } + if ( + this.socket != null && + this.socket.isActive() && + this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.sentSubMessage + ) { + this.socket.send({ + channel, + id, + type: 'unsubscribe', + }); + } + delete this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]; + }; + + private _refreshChannelSubs = (channel: string) => { + const allSubs = Object.values(this.subscriptions[channel] ?? {}); + allSubs.forEach((sub) => { + this._performUnsub(sub); + this._addSub(sub); + }); + }; + + // if we get a "could not fetch data" error, we retry once as long as this channel is not on cooldown + // TODO: when backend adds the channel and id to the error message, use that to retry only one subscription + // TODO: remove this entirely when backend is more reliable + private _handleErrorReceived = (message: string) => { + if (message.startsWith('Internal error, could not fetch data for subscription: ')) { + const maybeChannel = message + .trim() + .split(/[\s,.]/) + .at(-2); + if (maybeChannel != null && maybeChannel.startsWith('v4_')) { + const lastRefresh = this.lastRetryTimeMsByChannel[maybeChannel] ?? 0; + if (Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) { + this.lastRetryTimeMsByChannel[maybeChannel] = Date.now(); + this._refreshChannelSubs(maybeChannel); + logAbacusTsError( + 'IndexerWebsocket', + 'error fetching data for channel, refetching', + maybeChannel + ); + return; + } + logAbacusTsError('IndexerWebsocket', 'hit max retries for channel:', maybeChannel); return; } - if ( - this.socket != null && - this.socket.isActive() && - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.sentSubMessage - ) { - this.socket.send({ - channel, - id, - type: 'unsubscribe', - }); - } - delete this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]; - }; - } + } + logAbacusTsError('IndexerWebsocket', 'encountered server side error:', message); + }; private _handleMessage = (messagePre: any) => { try { const message = isWsMessage(messagePre); if (message.type === 'error') { - // todo we should unsub and resub to the connection if we can - logAbacusTsError('IndexerWebsocket', 'encountered server side error:', message.message); + this._handleErrorReceived(message.message); } else if (message.type === 'connected' || message.type === 'unsubscribed') { // do nothing } else if ( @@ -128,6 +187,7 @@ export class IndexerWebsocket { const channel = message.channel; const id = message.id; if (this.subscriptions[channel] == null) { + // hide error for channel we expect to see it on if (channel !== 'v4_orderbook') { logAbacusTsError( 'IndexerWebsocket', @@ -139,6 +199,7 @@ export class IndexerWebsocket { return; } if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] == null) { + // hide error for channel we expect to see it on if (channel !== 'v4_orderbook') { logAbacusTsError( 'IndexerWebsocket', diff --git a/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts b/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts index 5786d4300..4c342f5fc 100644 --- a/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts +++ b/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts @@ -184,12 +184,26 @@ class WebSocketConnection { } }; - this.ws.onerror = (error) => { - logAbacusTsError('WebSocketConnection', `socket ${this.id} error encountered`, error); + this.ws.onerror = () => { this.close(); }; - this.ws.onclose = () => { + this.ws.onclose = (close) => { + const allowedCodes = new Set([ + // normal + 1000, + // going away (nav or graceful server shutdown) + 1001, + // normal but no code + 1005, + ]); + if (!allowedCodes.has(close.code)) { + logAbacusTsError('WebSocketConnection', `socket ${this.id} closed abnormally`, { + code: close.code, + reason: close.reason, + clean: close.wasClean, + }); + } this.close(); }; }