From 30cb3128a51c654389c2e5dc7e94f161f0fb2018 Mon Sep 17 00:00:00 2001 From: tyleroooo Date: Tue, 14 Jan 2025 13:22:36 -0500 Subject: [PATCH 1/9] fix(bonsai-core): id can be null (#1430) --- .../websocket/lib/indexerWebsocket.ts | 79 ++++++++++++------- .../websocket/lib/reconnectingWebsocket.ts | 10 ++- src/hooks/useInitializePage.ts | 4 + src/lib/analytics/datadog.ts | 5 +- 4 files changed, 66 insertions(+), 32 deletions(-) diff --git a/src/abacus-ts/websocket/lib/indexerWebsocket.ts b/src/abacus-ts/websocket/lib/indexerWebsocket.ts index 43e36b337..3de72adbb 100644 --- a/src/abacus-ts/websocket/lib/indexerWebsocket.ts +++ b/src/abacus-ts/websocket/lib/indexerWebsocket.ts @@ -15,6 +15,9 @@ const CHANNEL_RETRY_COOLDOWN_MS = timeUnits.minute; export class IndexerWebsocket { private socket: ReconnectingWebSocket | null = null; + // for logging purposes, to differentiate when user has many tabs open + private indexerWsId = crypto.randomUUID(); + private subscriptions: { [channel: string]: { [id: string]: { @@ -82,7 +85,10 @@ export class IndexerWebsocket { }) => { this.subscriptions[channel] ??= {}; if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] != null) { - logAbacusTsError('IndexerWebsocket', 'this subscription already exists', `${channel}/${id}`); + logAbacusTsError('IndexerWebsocket', 'this subscription already exists', { + id: `${channel}/${id}`, + wsId: this.indexerWsId, + }); throw new Error(`IndexerWebsocket error: this subscription already exists. ${channel}/${id}`); } logAbacusTsInfo('IndexerWebsocket', 'adding subscription', { @@ -90,6 +96,7 @@ export class IndexerWebsocket { id, socketNonNull: this.socket != null, socketActive: this.socket?.isActive(), + wsId: this.indexerWsId, }); this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] = { channel, @@ -115,7 +122,7 @@ export class IndexerWebsocket { logAbacusTsError( 'IndexerWebsocket', 'unsubbing from nonexistent or already unsubbed channel', - channel + { channel, wsId: this.indexerWsId } ); return; } @@ -123,8 +130,7 @@ export class IndexerWebsocket { logAbacusTsError( 'IndexerWebsocket', 'unsubbing from nonexistent or already unsubbed channel', - channel, - id + { channel, id, wsId: this.indexerWsId } ); return; } @@ -133,6 +139,7 @@ export class IndexerWebsocket { id, socketNonNull: this.socket != null, socketActive: this.socket?.isActive(), + wsId: this.indexerWsId, }); if ( this.socket != null && @@ -148,8 +155,8 @@ export class IndexerWebsocket { delete this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]; }; - private _refreshSub = (channel: string, id: string) => { - const sub = this.subscriptions[channel]?.[id]; + private _refreshSub = (channel: string, id: string | undefined) => { + const sub = this.subscriptions[channel]?.[id ?? NO_ID_SPECIAL_STRING_ID]; if (sub == null) { return; } @@ -163,25 +170,31 @@ export class IndexerWebsocket { if (message.message.startsWith('Internal error, could not fetch data for subscription: ')) { const maybeChannel = message.channel; const maybeId = message.id; - if (maybeChannel != null && maybeId != null && maybeChannel.startsWith('v4_')) { - const channelAndId = `${maybeChannel}${CHANNEL_ID_SAFE_DIVIDER}${maybeId}`; + if (maybeChannel != null && maybeChannel.startsWith('v4_')) { + const channelAndId = `${maybeChannel}${CHANNEL_ID_SAFE_DIVIDER}${maybeId ?? NO_ID_SPECIAL_STRING_ID}`; const lastRefresh = this.lastRetryTimeMsByChannelAndId[channelAndId] ?? 0; if (Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) { this.lastRetryTimeMsByChannelAndId[channelAndId] = Date.now(); this._refreshSub(maybeChannel, maybeId); - logAbacusTsInfo( - 'IndexerWebsocket', - 'error fetching data for channel, refetching', + logAbacusTsInfo('IndexerWebsocket', 'error fetching data for channel, refetching', { maybeChannel, - maybeId - ); + maybeId, + wsId: this.indexerWsId, + }); return; } - logAbacusTsError('IndexerWebsocket', 'hit max retries for channel:', maybeChannel, maybeId); + logAbacusTsError('IndexerWebsocket', 'hit max retries for channel:', { + maybeChannel, + maybeId, + wsId: this.indexerWsId, + }); return; } } - logAbacusTsError('IndexerWebsocket', 'encountered server side error:', message); + logAbacusTsError('IndexerWebsocket', 'encountered server side error:', { + message, + wsId: this.indexerWsId, + }); }; private _handleMessage = (messagePre: any) => { @@ -195,6 +208,7 @@ export class IndexerWebsocket { logAbacusTsInfo('IndexerWebsocket', `unsubscribe confirmed`, { channel: message.channel, id: message.id, + wsId: this.indexerWsId, }); } else if ( message.type === 'subscribed' || @@ -208,24 +222,22 @@ export class IndexerWebsocket { if (this.subscriptions[channel] == null) { // hide error for channel we expect to see it on if (channel !== 'v4_orderbook') { - logAbacusTsError( - 'IndexerWebsocket', - 'encountered message with unknown target', + logAbacusTsError('IndexerWebsocket', 'encountered message with unknown target', { channel, - id - ); + id, + wsId: this.indexerWsId, + }); } return; } if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] == null) { // hide error for channel we expect to see it on if (channel !== 'v4_orderbook') { - logAbacusTsError( - 'IndexerWebsocket', - 'encountered message with unknown target', + logAbacusTsError('IndexerWebsocket', 'encountered message with unknown target', { channel, - id - ); + id, + wsId: this.indexerWsId, + }); } return; } @@ -233,6 +245,7 @@ export class IndexerWebsocket { logAbacusTsInfo('IndexerWebsocket', `subscription confirmed`, { channel, id, + wsId: this.indexerWsId, }); this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.handleBaseData( message.contents, @@ -256,18 +269,25 @@ export class IndexerWebsocket { assertNever(message); } } catch (e) { - logAbacusTsError('IndexerWebsocket', 'Error handling websocket message', messagePre, e); + logAbacusTsError('IndexerWebsocket', 'Error handling websocket message', { + messagePre, + wsId: this.indexerWsId, + error: e, + }); } }; // when websocket churns, reconnect all known subscribers private _handleFreshConnect = () => { logAbacusTsInfo('IndexerWebsocket', 'freshly connected', { + socketUrl: this.socket?.url, + wsId: this.indexerWsId, socketNonNull: this.socket != null, socketActive: this.socket?.isActive(), - numSubs: Object.values(this.subscriptions) + subs: Object.values(this.subscriptions) .flatMap((o) => Object.values(o)) - .filter(isTruthy).length, + .filter(isTruthy) + .map((o) => `${o.channel}///${o.id}`), }); if (this.socket != null && this.socket.isActive()) { Object.values(this.subscriptions) @@ -286,7 +306,8 @@ export class IndexerWebsocket { } else { logAbacusTsError( 'IndexerWebsocket', - "handle fresh connect called when websocket isn't ready." + "handle fresh connect called when websocket isn't ready.", + { wsId: this.indexerWsId } ); } }; diff --git a/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts b/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts index 05189eb0e..73e80d342 100644 --- a/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts +++ b/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts @@ -1,5 +1,5 @@ /* eslint-disable max-classes-per-file */ -import { logAbacusTsError } from '@/abacus-ts/logs'; +import { logAbacusTsError, logAbacusTsInfo } from '@/abacus-ts/logs'; interface ReconnectingWebSocketConfig { url: string; @@ -11,7 +11,7 @@ interface ReconnectingWebSocketConfig { } export class ReconnectingWebSocket { - private readonly url: string; + public readonly url: string; private readonly handleMessage: (data: any) => void; @@ -205,6 +205,12 @@ class WebSocketConnection { reason: close.reason, clean: close.wasClean, }); + } else { + logAbacusTsInfo('WebSocketConnection', `socket ${this.id} closed`, { + code: close.code, + reason: close.reason, + clean: close.wasClean, + }); } this.close(); }; diff --git a/src/hooks/useInitializePage.ts b/src/hooks/useInitializePage.ts index 6c9a740ff..6b5523315 100644 --- a/src/hooks/useInitializePage.ts +++ b/src/hooks/useInitializePage.ts @@ -1,5 +1,7 @@ import { useEffect, useRef } from 'react'; +// eslint-disable-next-line no-restricted-imports +import { logAbacusTsInfo } from '@/abacus-ts/logs'; // eslint-disable-next-line no-restricted-imports import { IndexerWebsocketManager } from '@/abacus-ts/websocket/lib/indexerWebsocketManager'; @@ -50,6 +52,7 @@ export const useInitializePage = () => { // reconnect abacus (reestablish connections to indexer, validator etc.) if app was hidden for more than 10 seconds abacusStateManager.restart({ network: localStorageNetwork }); IndexerWebsocketManager.getActiveResources().forEach((r) => r.restart()); + logAbacusTsInfo('useInitializePage', 'restarting because visibility change'); } hiddenTimeRef.current = null; } @@ -66,6 +69,7 @@ export const useInitializePage = () => { const handleOnline = () => { abacusStateManager.restart({ network: localStorageNetwork }); IndexerWebsocketManager.getActiveResources().forEach((r) => r.restart()); + logAbacusTsInfo('useInitializePage', 'restarting because network status change'); }; document.addEventListener('online', handleOnline); return () => { diff --git a/src/lib/analytics/datadog.ts b/src/lib/analytics/datadog.ts index f296d2f4f..6931f41d6 100644 --- a/src/lib/analytics/datadog.ts +++ b/src/lib/analytics/datadog.ts @@ -7,6 +7,7 @@ const PROXY_URL = import.meta.env.VITE_DATADOG_PROXY_URL; const SERVICE_NAME = 'v4-web'; const LOGGER_NAME = 'v4-web'; const SITE_NAME = 'datadoghq.com'; +const instanceId = crypto.randomUUID(); const LOG_ENDPOINT_PATH = (PROXY_URL ?? '').endsWith('/') ? 'api/v2/logs' : '/api/v2/logs'; @@ -19,13 +20,15 @@ if (CLIENT_TOKEN) { sessionSampleRate: 100, env: CURRENT_MODE, proxy: PROXY_URL ? `${PROXY_URL}${LOG_ENDPOINT_PATH}` : undefined, + sendLogsAfterSessionExpiration: true, }); } datadogLogs.createLogger(LOGGER_NAME); -const datadogLogger = datadogLogs.getLogger(LOGGER_NAME)!!; +const datadogLogger = datadogLogs.getLogger(LOGGER_NAME)!; datadogLogger.setContextProperty('dd-client-token', CLIENT_TOKEN); +datadogLogger.setContextProperty('instance-id', instanceId); /** * TODO: make a logger wrapper that enables us also log to the console From 1af9fb7260768e01866947d7596e2c1d044b7285 Mon Sep 17 00:00:00 2001 From: tyleroooo Date: Tue, 14 Jan 2025 13:52:14 -0500 Subject: [PATCH 2/9] fix: allow smaller bottom panel because it seems nice (#1431) --- src/constants/layout.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/constants/layout.ts b/src/constants/layout.ts index d1c286f0a..6b0c1222a 100644 --- a/src/constants/layout.ts +++ b/src/constants/layout.ts @@ -3,5 +3,5 @@ export enum TradeLayouts { Reverse = 'Reverse', } -export const HORIZONTAL_PANEL_MIN_HEIGHT = 250; +export const HORIZONTAL_PANEL_MIN_HEIGHT = 200; export const HORIZONTAL_PANEL_MAX_HEIGHT = 700; From 1d699462d5d888709c03907fe7d6be96c643b9c5 Mon Sep 17 00:00:00 2001 From: Jared Vu Date: Tue, 14 Jan 2025 12:30:01 -0800 Subject: [PATCH 3/9] feat(bonsai-core): unopened isolated position calculators (#1432) --- src/abacus-ts/calculators/subaccount.ts | 41 +++++++++++++++++++++++++ src/abacus-ts/ontology.ts | 2 ++ src/abacus-ts/selectors/account.ts | 25 +++++++++++++++ 3 files changed, 68 insertions(+) diff --git a/src/abacus-ts/calculators/subaccount.ts b/src/abacus-ts/calculators/subaccount.ts index bb9532427..a661ef1b1 100644 --- a/src/abacus-ts/calculators/subaccount.ts +++ b/src/abacus-ts/calculators/subaccount.ts @@ -11,12 +11,14 @@ import { IndexerWsBaseMarketObject } from '@/types/indexer/indexerManual'; import { getAssetFromMarketId } from '@/lib/assetUtils'; import { calc } from '@/lib/do'; +import { isTruthy } from '@/lib/isTruthy'; import { BIG_NUMBERS, MaybeBigNumber, MustBigNumber, ToBigNumber } from '@/lib/numbers'; import { isPresent } from '@/lib/typeUtils'; import { ChildSubaccountData, MarketsData, ParentSubaccountData } from '../types/rawTypes'; import { GroupedSubaccountSummary, + SubaccountOrder, SubaccountPosition, SubaccountPositionBase, SubaccountPositionDerivedCore, @@ -288,3 +290,42 @@ function calculatePositionDerivedExtra( updatedUnrealizedPnlPercent, }; } + +export function calculateChildSubaccountSummaries( + parent: Omit, + markets: MarketsData +): Record { + return Object.fromEntries( + Object.values(parent.childSubaccounts) + .map((subaccount) => { + if (!subaccount) return undefined; + const summary = calculateSubaccountSummary(subaccount, markets); + return [subaccount.subaccountNumber, summary]; + }) + .filter(isTruthy) + ); +} + +/** + * + * UnopenedIsolatedPosition is a modified SubaccountOrder that meets the following criterias: + * - marginMode is ISOLATED + * - no existing position exists + * - equity of the childSubaccount is returned with the order + */ +export function calculateUnopenedIsolatedPositions( + childSubaccounts: Record, + orders: SubaccountOrder[], + positions: SubaccountPosition[] +): Array> { + const setOfOpenPositionMarkets = new Set(positions.map(({ market }) => market)); + + const filteredOrders = orders.filter( + (o) => !setOfOpenPositionMarkets.has(o.marketId) && o.marginMode === 'ISOLATED' + ); + + return filteredOrders.map((o) => ({ + ...o, + equity: childSubaccounts[o.subaccountNumber]?.equity ?? BIG_NUMBERS.ZERO, + })); +} diff --git a/src/abacus-ts/ontology.ts b/src/abacus-ts/ontology.ts index 667de779c..ba00a732a 100644 --- a/src/abacus-ts/ontology.ts +++ b/src/abacus-ts/ontology.ts @@ -15,6 +15,7 @@ import { selectParentSubaccountOpenPositionsLoading, selectParentSubaccountSummary, selectParentSubaccountSummaryLoading, + selectUnopenedIsolatedPositions, } from './selectors/account'; import { selectAllAssetsInfo, @@ -97,6 +98,7 @@ export const BonsaiHelpers = { fills: getCurrentMarketAccountFills, }, }, + unopenedIsolatedPositions: selectUnopenedIsolatedPositions, } as const satisfies NestedSelectors; export const BonsaiHooks = { diff --git a/src/abacus-ts/selectors/account.ts b/src/abacus-ts/selectors/account.ts index ad248ad02..b3196519c 100644 --- a/src/abacus-ts/selectors/account.ts +++ b/src/abacus-ts/selectors/account.ts @@ -14,9 +14,11 @@ import { calculateOrderHistory, } from '../calculators/orders'; import { + calculateChildSubaccountSummaries, calculateMarketsNeededForSubaccount, calculateParentSubaccountPositions, calculateParentSubaccountSummary, + calculateUnopenedIsolatedPositions, } from '../calculators/subaccount'; import { calculateTransfers } from '../calculators/transfers'; import { mergeLoadableStatus } from '../lib/mapLoadable'; @@ -98,6 +100,7 @@ export const selectParentSubaccountOpenPositions = createAppSelector( return positions?.filter((p) => p.status === IndexerPerpetualPositionStatus.OPEN); } ); + export const selectParentSubaccountOpenPositionsLoading = selectParentSubaccountSummaryLoading; export const selectAccountOrders = createAppSelector( @@ -142,6 +145,28 @@ export const selectAccountOrdersLoading = createAppSelector( mergeLoadableStatus ); +export const selectChildSubaccountSummaries = createAppSelector( + [selectRawParentSubaccountData, selectRelevantMarketsData], + (parentSubaccount, marketsData) => { + if (parentSubaccount == null || marketsData == null) { + return undefined; + } + + return calculateChildSubaccountSummaries(parentSubaccount, marketsData); + } +); + +export const selectUnopenedIsolatedPositions = createAppSelector( + [selectChildSubaccountSummaries, selectOpenOrders, selectParentSubaccountPositions], + (childSubaccounts, orders, positions) => { + if (childSubaccounts == null || positions == null) { + return undefined; + } + + return calculateUnopenedIsolatedPositions(childSubaccounts, orders, positions); + } +); + export const selectAccountFills = createAppSelector( [selectRawFillsRestData, selectRawFillsLiveData], (rest, live) => { From 93517b8c07f0132acf295cf88f2bfb880e90a6ac Mon Sep 17 00:00:00 2001 From: Jared Vu Date: Tue, 14 Jan 2025 16:13:55 -0800 Subject: [PATCH 4/9] fix(bonsai-core): asset position update (#1434) --- src/abacus-ts/websocket/parentSubaccount.ts | 8 ++++---- src/types/indexer/indexerManual.ts | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/abacus-ts/websocket/parentSubaccount.ts b/src/abacus-ts/websocket/parentSubaccount.ts index abe54f743..76b2530f0 100644 --- a/src/abacus-ts/websocket/parentSubaccount.ts +++ b/src/abacus-ts/websocket/parentSubaccount.ts @@ -140,13 +140,13 @@ function accountWebsocketValueCreator( const assetPositions = returnValue.childSubaccounts[positionUpdate.subaccountNumber]!.assetPositions; - if (assetPositions[positionUpdate.assetId] == null) { - assetPositions[positionUpdate.assetId] = + if (assetPositions[positionUpdate.symbol] == null) { + assetPositions[positionUpdate.symbol] = positionUpdate as IndexerAssetPositionResponseObject; } else { - assetPositions[positionUpdate.assetId] = { + assetPositions[positionUpdate.symbol] = { ...(assetPositions[ - positionUpdate.assetId + positionUpdate.symbol ] as IndexerAssetPositionResponseObject), ...positionUpdate, }; diff --git a/src/types/indexer/indexerManual.ts b/src/types/indexer/indexerManual.ts index de91a38d8..8ec0aca83 100644 --- a/src/types/indexer/indexerManual.ts +++ b/src/types/indexer/indexerManual.ts @@ -141,7 +141,7 @@ export interface IndexerWsParentSubaccountSubscribedResponse { export type IndexerWsAssetUpdate = Partial & { subaccountNumber: number; - assetId: string; + symbol: string; }; export type IndexerWsPositionUpdate = Partial & { subaccountNumber: number; From 95fc2ff514c15a09b2371090c7959c156de676cf Mon Sep 17 00:00:00 2001 From: tyleroooo Date: Tue, 14 Jan 2025 20:45:54 -0500 Subject: [PATCH 5/9] fix(bonsai-core): one more tiny log detail (#1435) --- src/abacus-ts/websocket/orderbook.ts | 4 +++- src/abacus-ts/websocket/parentSubaccount.ts | 3 ++- src/abacus-ts/websocket/trades.ts | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/abacus-ts/websocket/orderbook.ts b/src/abacus-ts/websocket/orderbook.ts index be5068198..e15bbf346 100644 --- a/src/abacus-ts/websocket/orderbook.ts +++ b/src/abacus-ts/websocket/orderbook.ts @@ -45,7 +45,9 @@ function orderbookWebsocketValueCreator( const updates = isWsOrderbookUpdateResponses(baseUpdates); let startingValue = value.data; if (startingValue == null) { - logAbacusTsError('OrderbookTracker', 'found unexpectedly null base data in update'); + logAbacusTsError('OrderbookTracker', 'found unexpectedly null base data in update', { + marketId, + }); return value; } startingValue = { asks: { ...startingValue.asks }, bids: { ...startingValue.bids } }; diff --git a/src/abacus-ts/websocket/parentSubaccount.ts b/src/abacus-ts/websocket/parentSubaccount.ts index 76b2530f0..d19eb9a25 100644 --- a/src/abacus-ts/websocket/parentSubaccount.ts +++ b/src/abacus-ts/websocket/parentSubaccount.ts @@ -120,7 +120,8 @@ function accountWebsocketValueCreator( if (value.data == null) { logAbacusTsError( 'ParentSubaccountTracker', - 'found unexpectedly null base data in update' + 'found unexpectedly null base data in update', + { address, subaccountNumber } ); return value; } diff --git a/src/abacus-ts/websocket/trades.ts b/src/abacus-ts/websocket/trades.ts index b423a48ca..9c17f02a3 100644 --- a/src/abacus-ts/websocket/trades.ts +++ b/src/abacus-ts/websocket/trades.ts @@ -38,7 +38,7 @@ function tradesWebsocketValueCreator( const startingValue = value.data; if (startingValue == null) { // eslint-disable-next-line no-console - console.log('MarketsTracker found unexpectedly null base data in update'); + console.log('MarketsTracker found unexpectedly null base data in update', { marketId }); return value; } const allNewTrades = updates.flatMap((u) => u.trades).toReversed(); From 800dd223ce9ac212e4a970b022764a8d290f68b3 Mon Sep 17 00:00:00 2001 From: Jared Vu Date: Wed, 15 Jan 2025 09:29:07 -0800 Subject: [PATCH 6/9] fix(bonsai-core): unopened isolated position struct change (#1436) --- .eslintrc.json | 6 +- src/abacus-ts/calculators/subaccount.ts | 75 ++++++++++++++++--------- src/abacus-ts/types/summaryTypes.ts | 9 +++ 3 files changed, 59 insertions(+), 31 deletions(-) diff --git a/.eslintrc.json b/.eslintrc.json index 3609ee35d..8945e1780 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -53,10 +53,10 @@ "error", { "patterns": [ - "@/abacus-ts/*", + "@/abacus-ts/*/*", "!@/abacus-ts/ontology", - "!@/abacus-ts/lib", - "!@/abacus-ts/summaryTypes" + "!@/abacus-ts/lib/*", + "!@/abacus-ts/types/summaryTypes" ] } ], diff --git a/src/abacus-ts/calculators/subaccount.ts b/src/abacus-ts/calculators/subaccount.ts index a661ef1b1..eed50b49b 100644 --- a/src/abacus-ts/calculators/subaccount.ts +++ b/src/abacus-ts/calculators/subaccount.ts @@ -1,5 +1,6 @@ import BigNumber from 'bignumber.js'; -import { mapValues, orderBy } from 'lodash'; +import { groupBy, map, mapValues, orderBy, pickBy } from 'lodash'; +import { weakMapMemoize } from 'reselect'; import { NUM_PARENT_SUBACCOUNTS } from '@/constants/account'; import { @@ -9,7 +10,11 @@ import { } from '@/types/indexer/indexerApiGen'; import { IndexerWsBaseMarketObject } from '@/types/indexer/indexerManual'; -import { getAssetFromMarketId } from '@/lib/assetUtils'; +import { + getAssetFromMarketId, + getDisplayableAssetFromBaseAsset, + getDisplayableTickerFromMarket, +} from '@/lib/assetUtils'; import { calc } from '@/lib/do'; import { isTruthy } from '@/lib/isTruthy'; import { BIG_NUMBERS, MaybeBigNumber, MustBigNumber, ToBigNumber } from '@/lib/numbers'; @@ -18,6 +23,7 @@ import { isPresent } from '@/lib/typeUtils'; import { ChildSubaccountData, MarketsData, ParentSubaccountData } from '../types/rawTypes'; import { GroupedSubaccountSummary, + PendingIsolatedPosition, SubaccountOrder, SubaccountPosition, SubaccountPositionBase, @@ -76,16 +82,15 @@ export function calculateMarketsNeededForSubaccount(parent: Omit { + const core = calculateSubaccountSummaryCore(subaccountData, markets); + return { + ...core, + ...calculateSubaccountSummaryDerived(core), + }; + } +); function calculateSubaccountSummaryCore( subaccountData: ChildSubaccountData, @@ -295,37 +300,51 @@ export function calculateChildSubaccountSummaries( parent: Omit, markets: MarketsData ): Record { - return Object.fromEntries( - Object.values(parent.childSubaccounts) - .map((subaccount) => { - if (!subaccount) return undefined; - const summary = calculateSubaccountSummary(subaccount, markets); - return [subaccount.subaccountNumber, summary]; - }) - .filter(isTruthy) + return pickBy( + mapValues( + parent.childSubaccounts, + (subaccount) => subaccount && calculateSubaccountSummary(subaccount, markets) + ), + isTruthy ); } /** - * - * UnopenedIsolatedPosition is a modified SubaccountOrder that meets the following criterias: + * @returns a list of pending isolated positions + * PendingIsolatedPosition is exists if there are any orders that meet the following criteria: * - marginMode is ISOLATED * - no existing position exists - * - equity of the childSubaccount is returned with the order + * - childSubaccount has equity */ export function calculateUnopenedIsolatedPositions( childSubaccounts: Record, orders: SubaccountOrder[], positions: SubaccountPosition[] -): Array> { +): PendingIsolatedPosition[] { const setOfOpenPositionMarkets = new Set(positions.map(({ market }) => market)); const filteredOrders = orders.filter( (o) => !setOfOpenPositionMarkets.has(o.marketId) && o.marginMode === 'ISOLATED' ); - return filteredOrders.map((o) => ({ - ...o, - equity: childSubaccounts[o.subaccountNumber]?.equity ?? BIG_NUMBERS.ZERO, - })); + const filteredOrdersMap = groupBy(filteredOrders, 'marketId'); + const marketIdToSubaccountNumber = mapValues( + filteredOrdersMap, + (filteredOrder) => filteredOrder[0]?.subaccountNumber + ); + + return map(filteredOrdersMap, (orderList, marketId) => { + const subaccountNumber = marketIdToSubaccountNumber[marketId]; + if (subaccountNumber == null) return undefined; + const assetId = getAssetFromMarketId(marketId); + + return { + assetId, + displayableAsset: getDisplayableAssetFromBaseAsset(assetId), + marketId, + displayId: getDisplayableTickerFromMarket(marketId), + equity: childSubaccounts[subaccountNumber]?.equity ?? BIG_NUMBERS.ZERO, + orders: orderList, + }; + }).filter(isTruthy); } diff --git a/src/abacus-ts/types/summaryTypes.ts b/src/abacus-ts/types/summaryTypes.ts index 6e80942ca..607be7b3f 100644 --- a/src/abacus-ts/types/summaryTypes.ts +++ b/src/abacus-ts/types/summaryTypes.ts @@ -147,4 +147,13 @@ export type SubaccountOrder = { marginMode: MarginMode | undefined; }; +export type PendingIsolatedPosition = { + marketId: string; + displayId: string; + assetId: string; + displayableAsset: string; + equity: BigNumber; + orders: SubaccountOrder[]; +}; + export type LiveTrade = BaseTrade; From 5218d74104ee56f8b7a991980a2b5f27e86dcf7f Mon Sep 17 00:00:00 2001 From: Jared Vu Date: Wed, 15 Jan 2025 13:58:49 -0800 Subject: [PATCH 7/9] feat(bonsai-core): modifyUsdcAssetPosition (#1423) --- src/abacus-ts/calculators/accountActions.ts | 174 +++++++++++++++++++ src/abacus-ts/lib/subaccountUtils.ts | 60 +++++++ src/abacus-ts/ontology.ts | 12 ++ src/abacus-ts/selectors/account.ts | 2 +- src/abacus-ts/selectors/accountActions.ts | 60 +++++++ src/abacus-ts/types/operationTypes.ts | 32 ++-- src/abacus-ts/websocket/parentSubaccount.ts | 44 +---- src/views/dialogs/DepositDialog2/queries.ts | 19 ++ src/views/dialogs/WithdrawDialog2/queries.ts | 21 +++ 9 files changed, 371 insertions(+), 53 deletions(-) create mode 100644 src/abacus-ts/calculators/accountActions.ts create mode 100644 src/abacus-ts/lib/subaccountUtils.ts create mode 100644 src/abacus-ts/selectors/accountActions.ts diff --git a/src/abacus-ts/calculators/accountActions.ts b/src/abacus-ts/calculators/accountActions.ts new file mode 100644 index 000000000..499bb5e96 --- /dev/null +++ b/src/abacus-ts/calculators/accountActions.ts @@ -0,0 +1,174 @@ +import { produce } from 'immer'; + +import { + IndexerAssetPositionResponseObject, + IndexerPositionSide, +} from '@/types/indexer/indexerApiGen'; + +import { MustBigNumber } from '@/lib/numbers'; + +import { freshChildSubaccount, newUsdcAssetPosition } from '../lib/subaccountUtils'; +import { + ModifyUsdcAssetPositionProps, + SubaccountBatchedOperations, + SubaccountOperations, +} from '../types/operationTypes'; +import { ParentSubaccountData } from '../types/rawTypes'; + +function addUsdcAssetPosition( + parentSubaccount: ParentSubaccountData, + payload: Pick +): ParentSubaccountData { + const { side, size, subaccountNumber } = payload; + return produce(parentSubaccount, (draftParentSubaccountData) => { + let childSubaccount = draftParentSubaccountData.childSubaccounts[subaccountNumber]; + + if (childSubaccount == null) { + // Upsert ChildSubaccountData into parentSubaccountData.childSubaccounts + const updatedChildSubaccount = freshChildSubaccount({ + address: draftParentSubaccountData.address, + subaccountNumber, + }); + + childSubaccount = { + ...updatedChildSubaccount, + assetPositions: { + ...updatedChildSubaccount.assetPositions, + USDC: newUsdcAssetPosition({ + side, + size, + subaccountNumber, + }), + }, + }; + } else { + if (childSubaccount.assetPositions.USDC == null) { + // Upsert USDC Asset Position + childSubaccount.assetPositions.USDC = newUsdcAssetPosition({ + side, + size, + subaccountNumber, + }); + } else { + if (childSubaccount.assetPositions.USDC.side !== side) { + const signedSizeBN = MustBigNumber(childSubaccount.assetPositions.USDC.size).minus(size); + + if (signedSizeBN.lte(0)) { + // New size flips the Asset Position Side + childSubaccount.assetPositions.USDC.side = + side === IndexerPositionSide.LONG + ? IndexerPositionSide.SHORT + : IndexerPositionSide.LONG; + childSubaccount.assetPositions.USDC.size = signedSizeBN.abs().toString(); + } else { + // Set the new size of the Asset Position + childSubaccount.assetPositions.USDC.size = signedSizeBN.toString(); + } + } else { + // Side is maintained, add the size to the existing position + childSubaccount.assetPositions.USDC.size = MustBigNumber( + childSubaccount.assetPositions.USDC.size + ) + .plus(size) + .toString(); + } + } + } + }); +} + +export function createUsdcDepositOperations( + parentSubaccount: ParentSubaccountData, + { + subaccountNumber, + depositAmount, + }: { + subaccountNumber: number; + depositAmount: string; + } +): SubaccountBatchedOperations { + const updatedParentSubaccountData = addUsdcAssetPosition(parentSubaccount, { + side: IndexerPositionSide.LONG, + size: depositAmount, + subaccountNumber, + }); + + if (updatedParentSubaccountData.childSubaccounts[subaccountNumber]?.assetPositions.USDC == null) { + throw new Error('USDC Asset Position was improperly modified'); + } + + return { + operations: [ + SubaccountOperations.ModifyUsdcAssetPosition({ + subaccountNumber, + changes: updatedParentSubaccountData.childSubaccounts[subaccountNumber].assetPositions.USDC, + }), + ], + }; +} + +export function createUsdcWithdrawalOperations( + parentSubaccount: ParentSubaccountData, + { + subaccountNumber, + withdrawAmount, + }: { + subaccountNumber: number; + withdrawAmount: string; + } +): SubaccountBatchedOperations { + const updatedParentSubaccountData = addUsdcAssetPosition(parentSubaccount, { + side: IndexerPositionSide.SHORT, + size: withdrawAmount, + subaccountNumber, + }); + + if (updatedParentSubaccountData.childSubaccounts[subaccountNumber]?.assetPositions.USDC == null) { + throw new Error('USDC Asset Position was improperly modified'); + } + + return { + operations: [ + SubaccountOperations.ModifyUsdcAssetPosition({ + subaccountNumber, + changes: updatedParentSubaccountData.childSubaccounts[subaccountNumber].assetPositions.USDC, + }), + ], + }; +} + +function modifyUsdcAssetPosition( + parentSubaccountData: ParentSubaccountData, + payload: ModifyUsdcAssetPositionProps +): ParentSubaccountData { + const { subaccountNumber, changes } = payload; + + return produce(parentSubaccountData, (draftParentSubaccountData) => { + if (draftParentSubaccountData.childSubaccounts[subaccountNumber]?.assetPositions.USDC != null) { + draftParentSubaccountData.childSubaccounts[subaccountNumber].assetPositions.USDC = changes; + } + }); +} + +export function applyOperationsToSubaccount( + parentSubaccount: ParentSubaccountData, + batchedOperations: SubaccountBatchedOperations +): ParentSubaccountData { + let parentSubaccountData: ParentSubaccountData = parentSubaccount; + + batchedOperations.operations.forEach((op) => { + SubaccountOperations.match(op, { + AddPerpetualPosition: () => { + // TODO: Implement addPerpetualPosition + }, + ModifyPerpetualPosition: () => { + // TODO: Implement modifyPerpetualPosition + }, + ModifyUsdcAssetPosition: (args) => { + parentSubaccountData = modifyUsdcAssetPosition(parentSubaccountData, args); + }, + }); + }); + + return parentSubaccountData; +} diff --git a/src/abacus-ts/lib/subaccountUtils.ts b/src/abacus-ts/lib/subaccountUtils.ts new file mode 100644 index 000000000..868c07604 --- /dev/null +++ b/src/abacus-ts/lib/subaccountUtils.ts @@ -0,0 +1,60 @@ +import { + IndexerPositionSide, + IndexerSubaccountResponseObject, +} from '@/types/indexer/indexerApiGen'; + +import { ChildSubaccountData } from '../types/rawTypes'; + +export function isValidSubaccount(childSubaccount: IndexerSubaccountResponseObject) { + return ( + Object.keys(childSubaccount.assetPositions).length > 0 || + Object.keys(childSubaccount.openPerpetualPositions).length > 0 + ); +} + +export function convertToStoredChildSubaccount({ + address, + subaccountNumber, + assetPositions, + openPerpetualPositions, +}: IndexerSubaccountResponseObject): ChildSubaccountData { + return { + address, + subaccountNumber, + assetPositions, + openPerpetualPositions, + }; +} + +export function freshChildSubaccount({ + address, + subaccountNumber, +}: { + address: string; + subaccountNumber: number; +}): ChildSubaccountData { + return { + address, + subaccountNumber, + assetPositions: {}, + openPerpetualPositions: {}, + }; +} + +export function newUsdcAssetPosition({ + side, + size, + subaccountNumber, +}: { + side: IndexerPositionSide; + size: string; + subaccountNumber: number; +}) { + return { + assetId: '0', + size, + subaccountNumber, + side, + symbol: 'USDC', + }; +} diff --git a/src/abacus-ts/ontology.ts b/src/abacus-ts/ontology.ts index ba00a732a..5b8bfcfcb 100644 --- a/src/abacus-ts/ontology.ts +++ b/src/abacus-ts/ontology.ts @@ -17,6 +17,10 @@ import { selectParentSubaccountSummaryLoading, selectUnopenedIsolatedPositions, } from './selectors/account'; +import { + createSelectParentSubaccountSummaryDeposit, + createSelectParentSubaccountSummaryWithdrawal, +} from './selectors/accountActions'; import { selectAllAssetsInfo, selectAllAssetsInfoLoading, @@ -98,6 +102,14 @@ export const BonsaiHelpers = { fills: getCurrentMarketAccountFills, }, }, + forms: { + deposit: { + createSelectParentSubaccountSummary: createSelectParentSubaccountSummaryDeposit, + }, + withdraw: { + createSelectParentSubaccountSummary: createSelectParentSubaccountSummaryWithdrawal, + }, + }, unopenedIsolatedPositions: selectUnopenedIsolatedPositions, } as const satisfies NestedSelectors; diff --git a/src/abacus-ts/selectors/account.ts b/src/abacus-ts/selectors/account.ts index b3196519c..7d025b4db 100644 --- a/src/abacus-ts/selectors/account.ts +++ b/src/abacus-ts/selectors/account.ts @@ -54,7 +54,7 @@ const selectRelevantMarketsList = createAppSelector( } ); -const selectRelevantMarketsData = createAppSelector( +export const selectRelevantMarketsData = createAppSelector( [selectRelevantMarketsList, selectRawMarketsData], (marketIds, markets) => { if (markets == null || marketIds == null) { diff --git a/src/abacus-ts/selectors/accountActions.ts b/src/abacus-ts/selectors/accountActions.ts new file mode 100644 index 000000000..5ea144373 --- /dev/null +++ b/src/abacus-ts/selectors/accountActions.ts @@ -0,0 +1,60 @@ +import { createAppSelector } from '@/state/appTypes'; + +import { + applyOperationsToSubaccount, + createUsdcDepositOperations, + createUsdcWithdrawalOperations, +} from '../calculators/accountActions'; +import { calculateParentSubaccountSummary } from '../calculators/subaccount'; +import { selectRelevantMarketsData } from './account'; +import { selectRawParentSubaccountData } from './base'; + +export const createSelectParentSubaccountSummaryDeposit = () => + createAppSelector( + [ + selectRawParentSubaccountData, + selectRelevantMarketsData, + ( + _s, + input: { + subaccountNumber: number; + depositAmount: string; + } + ) => input, + ], + (parentSubaccount, markets, depositInputs) => { + if (parentSubaccount == null || markets == null) { + return undefined; + } + + const operations = createUsdcDepositOperations(parentSubaccount, depositInputs); + const modifiedParentSubaccount = applyOperationsToSubaccount(parentSubaccount, operations); + const result = calculateParentSubaccountSummary(modifiedParentSubaccount, markets); + return result; + } + ); + +export const createSelectParentSubaccountSummaryWithdrawal = () => + createAppSelector( + [ + selectRawParentSubaccountData, + selectRelevantMarketsData, + ( + _s, + input: { + subaccountNumber: number; + withdrawAmount: string; + } + ) => input, + ], + (parentSubaccount, markets, withdrawalInputs) => { + if (parentSubaccount == null || markets == null) { + return undefined; + } + + const operations = createUsdcWithdrawalOperations(parentSubaccount, withdrawalInputs); + const modifiedParentSubaccount = applyOperationsToSubaccount(parentSubaccount, operations); + const result = calculateParentSubaccountSummary(modifiedParentSubaccount, markets); + return result; + } + ); diff --git a/src/abacus-ts/types/operationTypes.ts b/src/abacus-ts/types/operationTypes.ts index c8ef0a190..f691ea630 100644 --- a/src/abacus-ts/types/operationTypes.ts +++ b/src/abacus-ts/types/operationTypes.ts @@ -5,22 +5,26 @@ import { IndexerPerpetualPositionResponseObject, } from '@/types/indexer/indexerApiGen'; +export type AddPerpetualPositionProps = { + subaccountNumber: number; + market: string; + changes: Omit; +}; + +export type ModifyPerpetualPositionProps = { + changes: Partial>; +}; + +export type ModifyUsdcAssetPositionProps = { + subaccountNumber: number; + changes: IndexerAssetPositionResponseObject; +}; + export const SubaccountOperations = unionize( { - AddPerpetualPosition: ofType<{ - subaccountNumber: string; - market: string; - position: Omit; - }>(), - ModifyPerpetualPosition: ofType<{ - subaccountNumber: string; - market: string; - changes: Partial>; - }>(), - ModifyUsdcAssetPosition: ofType<{ - subaccountNumber: string; - changes: Partial>; - }>(), + AddPerpetualPosition: ofType(), + ModifyPerpetualPosition: ofType(), + ModifyUsdcAssetPosition: ofType(), }, { tag: 'operation' as const, value: 'payload' as const } ); diff --git a/src/abacus-ts/websocket/parentSubaccount.ts b/src/abacus-ts/websocket/parentSubaccount.ts index d19eb9a25..851279c86 100644 --- a/src/abacus-ts/websocket/parentSubaccount.ts +++ b/src/abacus-ts/websocket/parentSubaccount.ts @@ -5,7 +5,6 @@ import { IndexerAssetPositionResponseObject, IndexerOrderResponseObject, IndexerPerpetualPositionResponseObject, - IndexerSubaccountResponseObject, } from '@/types/indexer/indexerApiGen'; import { isWsParentSubaccountSubscribed, @@ -23,49 +22,18 @@ import { MustBigNumber } from '@/lib/numbers'; import { accountRefreshSignal } from '../accountRefreshSignal'; import { createStoreEffect } from '../lib/createStoreEffect'; import { Loadable, loadableIdle, loadableLoaded, loadablePending } from '../lib/loadable'; +import { + convertToStoredChildSubaccount, + freshChildSubaccount, + isValidSubaccount, +} from '../lib/subaccountUtils'; import { logAbacusTsError } from '../logs'; import { selectParentSubaccountInfo, selectWebsocketUrl } from '../socketSelectors'; -import { ChildSubaccountData, ParentSubaccountData } from '../types/rawTypes'; +import { ParentSubaccountData } from '../types/rawTypes'; import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers'; import { IndexerWebsocket } from './lib/indexerWebsocket'; import { WebsocketDerivedValue } from './lib/websocketDerivedValue'; -function isValidSubaccount(childSubaccount: IndexerSubaccountResponseObject) { - return ( - Object.keys(childSubaccount.assetPositions).length > 0 || - Object.keys(childSubaccount.openPerpetualPositions).length > 0 - ); -} - -function convertToStoredChildSubaccount({ - address, - subaccountNumber, - assetPositions, - openPerpetualPositions, -}: IndexerSubaccountResponseObject): ChildSubaccountData { - return { - address, - subaccountNumber, - assetPositions, - openPerpetualPositions, - }; -} - -function freshChildSubaccount({ - address, - subaccountNumber, -}: { - address: string; - subaccountNumber: number; -}): ChildSubaccountData { - return { - address, - subaccountNumber, - assetPositions: {}, - openPerpetualPositions: {}, - }; -} - interface AccountValueArgsBase { address: string; parentSubaccountNumber: string; diff --git a/src/views/dialogs/DepositDialog2/queries.ts b/src/views/dialogs/DepositDialog2/queries.ts index 6d906a985..e0e295121 100644 --- a/src/views/dialogs/DepositDialog2/queries.ts +++ b/src/views/dialogs/DepositDialog2/queries.ts @@ -1,5 +1,6 @@ import { useMemo } from 'react'; +import { BonsaiHelpers } from '@/abacus-ts/ontology'; import { BalanceRequest, RouteRequest, SkipClient } from '@skip-go/client'; import { useQuery } from '@tanstack/react-query'; import { Chain, parseUnits } from 'viem'; @@ -14,6 +15,7 @@ import { WalletNetworkType } from '@/constants/wallets'; import { useSkipClient } from '@/hooks/transfers/skipClient'; import { useAccounts } from '@/hooks/useAccounts'; +import { useParameterizedSelector } from '@/hooks/useParameterizedSelector'; import { SourceAccount } from '@/state/wallet'; @@ -147,3 +149,20 @@ export function useDepositRoutes(token: TokenForTransfer, amount: string) { placeholderData: (prev) => prev, }); } + +export function useDepositDeltas({ depositAmount }: { depositAmount: string }) { + const depositInput = useMemo( + () => ({ + subaccountNumber: 0, + depositAmount, + }), + [depositAmount] + ); + + const modifiedParentSubaccount = useParameterizedSelector( + BonsaiHelpers.forms.deposit.createSelectParentSubaccountSummary, + depositInput + ); + + return modifiedParentSubaccount; +} diff --git a/src/views/dialogs/WithdrawDialog2/queries.ts b/src/views/dialogs/WithdrawDialog2/queries.ts index 6e7fce80e..01642df9e 100644 --- a/src/views/dialogs/WithdrawDialog2/queries.ts +++ b/src/views/dialogs/WithdrawDialog2/queries.ts @@ -1,3 +1,6 @@ +import { useMemo } from 'react'; + +import { BonsaiHelpers } from '@/abacus-ts/ontology'; import { RouteRequest, SkipClient } from '@skip-go/client'; import { useQuery } from '@tanstack/react-query'; import { parseUnits } from 'viem'; @@ -7,6 +10,7 @@ import { timeUnits } from '@/constants/time'; import { DYDX_CHAIN_USDC_DENOM, TokenForTransfer } from '@/constants/tokens'; import { useSkipClient } from '@/hooks/transfers/skipClient'; +import { useParameterizedSelector } from '@/hooks/useParameterizedSelector'; async function getSkipWithdrawalRoutes( skipClient: SkipClient, @@ -53,3 +57,20 @@ export function useWithdrawalRoutes({ retry: false, }); } + +export function useWithdrawalDeltas({ withdrawAmount }: { withdrawAmount: string }) { + const withdrawInput = useMemo( + () => ({ + subaccountNumber: 0, + withdrawAmount, + }), + [withdrawAmount] + ); + + const modifiedParentSubaccount = useParameterizedSelector( + BonsaiHelpers.forms.withdraw.createSelectParentSubaccountSummary, + withdrawInput + ); + + return modifiedParentSubaccount; +} From 143c5abba01181ff86e2afe001971b40eb8fa190 Mon Sep 17 00:00:00 2001 From: tyleroooo Date: Wed, 15 Jan 2025 17:21:30 -0500 Subject: [PATCH 8/9] feat(bonsai-core): candles (#1433) --- src/abacus-ts/lib/createStoreEffect.ts | 8 +- src/abacus-ts/types/rawTypes.ts | 8 -- src/abacus-ts/types/summaryTypes.ts | 11 +- src/abacus-ts/websocket/candles.ts | 51 +++++++++ .../websocket/candlesForTradingView.ts | 100 ++++++++++++++++++ src/abacus-ts/websocket/trades.ts | 12 ++- src/lib/tradingView/dydxfeed/index.ts | 2 + src/lib/tradingView/dydxfeed/streaming.ts | 4 + src/types/indexer/indexerChecks.ts | 4 + src/types/indexer/indexerManual.ts | 10 +- 10 files changed, 189 insertions(+), 21 deletions(-) create mode 100644 src/abacus-ts/websocket/candles.ts create mode 100644 src/abacus-ts/websocket/candlesForTradingView.ts diff --git a/src/abacus-ts/lib/createStoreEffect.ts b/src/abacus-ts/lib/createStoreEffect.ts index 76649d1d4..ada1ddc78 100644 --- a/src/abacus-ts/lib/createStoreEffect.ts +++ b/src/abacus-ts/lib/createStoreEffect.ts @@ -16,13 +16,17 @@ export function createStoreEffect( lastValue = thisValue; // NOTE: some cleanups dispatch actions which cause this to happen recursively. // we must ensure that those actions don't change the state they subscribe to or this will go infinitely - lastCleanup?.(); + const lastCleanupCached = lastCleanup; + lastCleanup = undefined; + lastCleanupCached?.(); lastCleanup = handleChange(thisValue); } }); return () => { - lastCleanup?.(); + const lastCleanupCached = lastCleanup; + lastCleanup = undefined; + lastCleanupCached?.(); removeStoreListener(); }; } diff --git a/src/abacus-ts/types/rawTypes.ts b/src/abacus-ts/types/rawTypes.ts index cfc903302..98fb90336 100644 --- a/src/abacus-ts/types/rawTypes.ts +++ b/src/abacus-ts/types/rawTypes.ts @@ -3,7 +3,6 @@ import { IndexerAssetPositionResponseObject, IndexerHistoricalBlockTradingReward, IndexerPerpetualPositionResponseObject, - IndexerTradeResponseObject, } from '@/types/indexer/indexerApiGen'; import { IndexerCompositeFillObject, @@ -12,8 +11,6 @@ import { IndexerWsBaseMarketObject, } from '@/types/indexer/indexerManual'; -import { PartialBy } from '@/lib/typeUtils'; - export type MarketsData = { [marketId: string]: IndexerWsBaseMarketObject }; export type OrdersData = { [orderId: string]: IndexerCompositeOrderObject }; @@ -22,11 +19,6 @@ export type OrderbookData = { asks: { [price: string]: string }; }; -export type BaseTrade = PartialBy; -export type TradesData = { - trades: Array; -}; - export interface ParentSubaccountData { address: string; parentSubaccount: number; diff --git a/src/abacus-ts/types/summaryTypes.ts b/src/abacus-ts/types/summaryTypes.ts index 607be7b3f..870eaf7d3 100644 --- a/src/abacus-ts/types/summaryTypes.ts +++ b/src/abacus-ts/types/summaryTypes.ts @@ -6,9 +6,10 @@ import { IndexerOrderType, IndexerPerpetualPositionResponseObject, } from '@/types/indexer/indexerApiGen'; -import { IndexerWsBaseMarketObject } from '@/types/indexer/indexerManual'; - -import { BaseTrade } from './rawTypes'; +import { + IndexerWsBaseMarketObject, + IndexerWsTradeResponseObject, +} from '@/types/indexer/indexerManual'; type ReplaceBigNumberInUnion = T extends string ? BigNumber : T; @@ -147,6 +148,8 @@ export type SubaccountOrder = { marginMode: MarginMode | undefined; }; +export type LiveTrade = IndexerWsTradeResponseObject; + export type PendingIsolatedPosition = { marketId: string; displayId: string; @@ -155,5 +158,3 @@ export type PendingIsolatedPosition = { equity: BigNumber; orders: SubaccountOrder[]; }; - -export type LiveTrade = BaseTrade; diff --git a/src/abacus-ts/websocket/candles.ts b/src/abacus-ts/websocket/candles.ts new file mode 100644 index 000000000..ddfaa5858 --- /dev/null +++ b/src/abacus-ts/websocket/candles.ts @@ -0,0 +1,51 @@ +import { orderBy } from 'lodash'; + +import { isWsCandlesResponse, isWsCandlesUpdateResponse } from '@/types/indexer/indexerChecks'; +import { IndexerWsCandleResponse } from '@/types/indexer/indexerManual'; + +import { Loadable, loadableLoaded, loadablePending } from '../lib/loadable'; +import { logAbacusTsError } from '../logs'; +import { makeWsValueManager } from './lib/indexerValueManagerHelpers'; +import { IndexerWebsocket } from './lib/indexerWebsocket'; +import { WebsocketDerivedValue } from './lib/websocketDerivedValue'; + +function candlesWebsocketValueCreator( + websocket: IndexerWebsocket, + { marketIdAndResolution }: { marketIdAndResolution: string } +) { + return new WebsocketDerivedValue>( + websocket, + { + channel: 'v4_candles', + id: marketIdAndResolution, + handleBaseData: (baseMessage) => { + const message = isWsCandlesResponse(baseMessage); + return loadableLoaded({ + candles: orderBy(message.candles, [(a) => a.startedAt], ['asc']), + }); + }, + handleUpdates: (baseUpdates, value) => { + const updates = isWsCandlesUpdateResponse(baseUpdates); + const startingValue = value.data; + if (startingValue == null) { + logAbacusTsError('CandlesTracker', 'found unexpectedly null base data in update'); + return value; + } + if (startingValue.candles.length === 0) { + return loadableLoaded({ candles: updates }); + } + + const allNewTimes = new Set(updates.map(({ startedAt }) => startedAt)); + const newArr = [ + ...updates, + ...startingValue.candles.filter(({ startedAt }) => !allNewTimes.has(startedAt)), + ]; + const sorted = orderBy(newArr, [(a) => a.startedAt], ['asc']); + return loadableLoaded({ candles: sorted }); + }, + }, + loadablePending() + ); +} + +export const CandlesValuesManager = makeWsValueManager(candlesWebsocketValueCreator); diff --git a/src/abacus-ts/websocket/candlesForTradingView.ts b/src/abacus-ts/websocket/candlesForTradingView.ts new file mode 100644 index 000000000..f5f01b732 --- /dev/null +++ b/src/abacus-ts/websocket/candlesForTradingView.ts @@ -0,0 +1,100 @@ +import { createStoreEffect } from '@/abacus-ts/lib/createStoreEffect'; +import { selectWebsocketUrl } from '@/abacus-ts/socketSelectors'; +import { CandlesValuesManager } from '@/abacus-ts/websocket/candles'; +import { subscribeToWsValue } from '@/abacus-ts/websocket/lib/indexerValueManagerHelpers'; +import type { + LibrarySymbolInfo, + ResolutionString, + SubscribeBarsCallback, +} from 'public/tradingview/charting_library'; + +import { CandleResolution, RESOLUTION_MAP } from '@/constants/candles'; + +import { type RootStore } from '@/state/_store'; + +import { mapCandle } from '../../lib/tradingView/utils'; + +export const subscriptionsByGuid: { + [guid: string]: + | { + guid: string; + unsub: () => void; + } + | undefined; +} = {}; + +export const subscribeOnStream = ({ + store, + orderbookCandlesToggleOn, + symbolInfo, + resolution, + onRealtimeCallback, + listenerGuid, + onResetCacheNeededCallback, +}: { + store: RootStore; + orderbookCandlesToggleOn: boolean; + symbolInfo: LibrarySymbolInfo; + resolution: ResolutionString; + onRealtimeCallback: SubscribeBarsCallback; + listenerGuid: string; + onResetCacheNeededCallback: Function; +}) => { + if (!symbolInfo.ticker) return; + + const channelId = `${symbolInfo.ticker}/${RESOLUTION_MAP[resolution]}`; + let isFirstRun = true; + + const tearDown = createStoreEffect(store, selectWebsocketUrl, (wsUrl) => { + // if the websocket url changes, force a refresh + if (isFirstRun) { + isFirstRun = false; + } else { + setTimeout(() => onResetCacheNeededCallback(), 0); + } + + let mostRecentFirstPointStartedAt: string | undefined; + const unsub = subscribeToWsValue( + CandlesValuesManager, + { wsUrl, marketIdAndResolution: channelId }, + ({ data }) => { + if (data == null || data.candles.length === 0) { + return; + } + // if we've never seen data before, it's either the existing data or the subscribed message + // either way, we take it as the basis and only send further updates + // there is a small race condition where messages could be missed between + // when trandingview does the rest query and when we start receiving updates + if (mostRecentFirstPointStartedAt == null) { + mostRecentFirstPointStartedAt = data.candles.at(-1)?.startedAt; + return; + } + data.candles.forEach((candle) => { + if (candle.startedAt >= mostRecentFirstPointStartedAt!) { + onRealtimeCallback( + mapCandle(orderbookCandlesToggleOn)({ + ...candle, + resolution: candle.resolution as unknown as CandleResolution, + orderbookMidPriceClose: candle.orderbookMidPriceClose ?? undefined, + orderbookMidPriceOpen: candle.orderbookMidPriceOpen ?? undefined, + }) + ); + } + }); + mostRecentFirstPointStartedAt = data.candles.at(-1)?.startedAt; + } + ); + + // happens on network change or unsubscribe from stream + return () => { + unsub(); + }; + }); + + subscriptionsByGuid[listenerGuid] = { guid: listenerGuid, unsub: tearDown }; +}; + +export const unsubscribeFromStream = (subscriberUID: string) => { + subscriptionsByGuid[subscriberUID]?.unsub(); + subscriptionsByGuid[subscriberUID] = undefined; +}; diff --git a/src/abacus-ts/websocket/trades.ts b/src/abacus-ts/websocket/trades.ts index 9c17f02a3..04de9d4d5 100644 --- a/src/abacus-ts/websocket/trades.ts +++ b/src/abacus-ts/websocket/trades.ts @@ -3,6 +3,7 @@ import { useEffect, useState } from 'react'; import { orderBy } from 'lodash'; import { isWsTradesResponse, isWsTradesUpdateResponses } from '@/types/indexer/indexerChecks'; +import { IndexerWsTradesUpdateObject } from '@/types/indexer/indexerManual'; import { useAppSelector } from '@/state/appTypes'; import { getCurrentMarketIdIfTradeable } from '@/state/perpetualsSelectors'; @@ -10,8 +11,8 @@ import { getCurrentMarketIdIfTradeable } from '@/state/perpetualsSelectors'; import { mergeById } from '@/lib/mergeById'; import { Loadable, loadableIdle, loadableLoaded, loadablePending } from '../lib/loadable'; +import { logAbacusTsError } from '../logs'; import { selectWebsocketUrl } from '../socketSelectors'; -import { TradesData } from '../types/rawTypes'; import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers'; import { IndexerWebsocket } from './lib/indexerWebsocket'; import { WebsocketDerivedValue } from './lib/websocketDerivedValue'; @@ -22,7 +23,7 @@ function tradesWebsocketValueCreator( websocket: IndexerWebsocket, { marketId }: { marketId: string } ) { - return new WebsocketDerivedValue>( + return new WebsocketDerivedValue>( websocket, { channel: 'v4_trades', @@ -37,8 +38,9 @@ function tradesWebsocketValueCreator( const updates = isWsTradesUpdateResponses(baseUpdates); const startingValue = value.data; if (startingValue == null) { - // eslint-disable-next-line no-console - console.log('MarketsTracker found unexpectedly null base data in update', { marketId }); + logAbacusTsError('TradesTracker', 'found unexpectedly null base data in update', { + marketId, + }); return value; } const allNewTrades = updates.flatMap((u) => u.trades).toReversed(); @@ -58,7 +60,7 @@ export function useCurrentMarketTradesValue() { const currentMarketId = useAppSelector(getCurrentMarketIdIfTradeable); // useSyncExternalStore is better but the API doesn't fit this use case very well - const [trades, setTrades] = useState>(loadableIdle()); + const [trades, setTrades] = useState>(loadableIdle()); useEffect(() => { if (currentMarketId == null) { diff --git a/src/lib/tradingView/dydxfeed/index.ts b/src/lib/tradingView/dydxfeed/index.ts index 4db1c05f0..a8e75c80b 100644 --- a/src/lib/tradingView/dydxfeed/index.ts +++ b/src/lib/tradingView/dydxfeed/index.ts @@ -200,6 +200,8 @@ export const getDydxDatafeed = ( onResetCacheNeededCallback: Function ) => { subscribeOnStream({ + store, + orderbookCandlesToggleOn, symbolInfo, resolution, onRealtimeCallback: onTick, diff --git a/src/lib/tradingView/dydxfeed/streaming.ts b/src/lib/tradingView/dydxfeed/streaming.ts index ae830fea2..cd5415cf0 100644 --- a/src/lib/tradingView/dydxfeed/streaming.ts +++ b/src/lib/tradingView/dydxfeed/streaming.ts @@ -6,6 +6,8 @@ import type { import { RESOLUTION_MAP } from '@/constants/candles'; +import { type RootStore } from '@/state/_store'; + import abacusStateManager from '@/lib/abacus'; import { subscriptionsByChannelId } from './cache'; @@ -16,6 +18,8 @@ export const subscribeOnStream = ({ onRealtimeCallback, listenerGuid, }: { + store: RootStore; + orderbookCandlesToggleOn: boolean; symbolInfo: LibrarySymbolInfo; resolution: ResolutionString; onRealtimeCallback: SubscribeBarsCallback; diff --git a/src/types/indexer/indexerChecks.ts b/src/types/indexer/indexerChecks.ts index 7c646d69e..7ac377105 100644 --- a/src/types/indexer/indexerChecks.ts +++ b/src/types/indexer/indexerChecks.ts @@ -9,6 +9,8 @@ import { import { IndexerCompositeFillResponse, IndexerCompositeOrderObject, + IndexerWsCandleResponse, + IndexerWsCandleResponseObject, IndexerWsMarketUpdateResponse, IndexerWsOrderbookUpdateResponse, IndexerWsParentSubaccountSubscribedResponse, @@ -28,6 +30,8 @@ export const isWsOrderbookUpdateResponses = typia.createAssert(); export const isWsTradesResponse = typia.createAssert(); export const isWsTradesUpdateResponses = typia.createAssert(); +export const isWsCandlesResponse = typia.createAssert(); +export const isWsCandlesUpdateResponse = typia.createAssert(); export const isParentSubaccountFillResponse = typia.createAssert(); export const isParentSubaccountOrders = typia.createAssert(); export const isParentSubaccountTransferResponse = diff --git a/src/types/indexer/indexerManual.ts b/src/types/indexer/indexerManual.ts index 8ec0aca83..2fc356fa9 100644 --- a/src/types/indexer/indexerManual.ts +++ b/src/types/indexer/indexerManual.ts @@ -4,6 +4,7 @@ import { IndexerAPIOrderStatus, IndexerAPITimeInForce, IndexerAssetPositionResponseObject, + IndexerCandleResponseObject, IndexerFillType, IndexerHistoricalBlockTradingReward, IndexerIsoString, @@ -164,6 +165,13 @@ export interface IndexerWsParentSubaccountUpdateObject { transfers?: IndexerTransferCommonResponseObject; } +// hacking around backend types not quite matching what the websocket sends +export type IndexerWsTradeResponseObject = PartialBy; export interface IndexerWsTradesUpdateObject { - trades: PartialBy[]; + trades: IndexerWsTradeResponseObject[]; +} + +export type IndexerWsCandleResponseObject = Omit; +export interface IndexerWsCandleResponse { + candles: Array; } From 16e64c9f7fbe718b84d29146a049dea3695ea6b5 Mon Sep 17 00:00:00 2001 From: tyleroooo Date: Wed, 15 Jan 2025 17:32:50 -0500 Subject: [PATCH 9/9] fix(bonsai-core): indexer websocket cleanup and refactor, handle another error type (#1438) --- .../lib/indexerValueManagerHelpers.ts | 2 +- .../websocket/lib/indexerWebsocket.ts | 346 ++++++++++++------ .../websocket/lib/reconnectingWebsocket.ts | 10 +- .../websocket/lib/websocketDerivedValue.ts | 1 + 4 files changed, 239 insertions(+), 120 deletions(-) diff --git a/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts b/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts index dba78c1f7..5f1d06948 100644 --- a/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts +++ b/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts @@ -6,7 +6,7 @@ import { IndexerWebsocketManager } from './indexerWebsocketManager'; import { WebsocketDerivedValue } from './websocketDerivedValue'; // this is set to just above the websocket subscribe timeout because of race conditions in the indexer backend -const DESTROY_DELAY_MS = 21000; +const DESTROY_DELAY_MS = 23000; type WebsocketValueCreator = ( websocket: IndexerWebsocket, diff --git a/src/abacus-ts/websocket/lib/indexerWebsocket.ts b/src/abacus-ts/websocket/lib/indexerWebsocket.ts index 3de72adbb..ad2bf35dd 100644 --- a/src/abacus-ts/websocket/lib/indexerWebsocket.ts +++ b/src/abacus-ts/websocket/lib/indexerWebsocket.ts @@ -1,3 +1,4 @@ +/* eslint-disable max-classes-per-file */ import { logAbacusTsError, logAbacusTsInfo } from '@/abacus-ts/logs'; import typia from 'typia'; @@ -9,29 +10,88 @@ import { isTruthy } from '@/lib/isTruthy'; import { ReconnectingWebSocket } from './reconnectingWebsocket'; const NO_ID_SPECIAL_STRING_ID = '______EMPTY_ID______'; -const CHANNEL_ID_SAFE_DIVIDER = '//////////'; const CHANNEL_RETRY_COOLDOWN_MS = timeUnits.minute; +interface SubscriptionHandlerInput { + channel: string; + id: string | undefined; + batched: boolean; + handleBaseData: (data: any, fullMessage: any) => void; + handleUpdates: (updates: any[], fullMessage: any) => void; +} + +type SubscriptionHandlerTrackingMetadata = { + receivedBaseData: boolean; + sentSubMessage: boolean; + // using subs for this data means we lose it when the user fully unsubscribes and thus might actually retry more often than expected + // but this is fine since every consumer has subs wrapped in resource managers that prevent lots of churn + lastRetryBecauseErrorMs: number | undefined; + lastRetryBecauseDuplicateMs: number | undefined; +}; +type SubscriptionHandler = SubscriptionHandlerInput & SubscriptionHandlerTrackingMetadata; + +type SubscriptionMap = { + [channel: string]: { + [idOrSpecialString: string]: SubscriptionHandler; + }; +}; + +class SubscriptionManager { + private subscriptions: SubscriptionMap = {}; + + hasSubscription(channel: string, id?: string): boolean { + const normalizedId = id ?? NO_ID_SPECIAL_STRING_ID; + return Boolean(this.subscriptions[channel]?.[normalizedId]); + } + + getSubscription(channel: string, id?: string): SubscriptionHandler | undefined { + const normalizedId = id ?? NO_ID_SPECIAL_STRING_ID; + if (!this.hasSubscription(channel, normalizedId)) { + return undefined; + } + return this.subscriptions[channel]?.[normalizedId]; + } + + addSubscription(handler: SubscriptionHandler): boolean { + const normalizedId = handler.id ?? NO_ID_SPECIAL_STRING_ID; + const channel = handler.channel; + + if (this.hasSubscription(channel, normalizedId)) { + return false; + } + + this.subscriptions[channel] ??= {}; + this.subscriptions[channel][normalizedId] = handler; + return true; + } + + removeSubscription(channel: string, id: string | undefined): boolean { + const normalizedId = id ?? NO_ID_SPECIAL_STRING_ID; + if (!this.hasSubscription(channel, normalizedId)) { + return false; + } + delete this.subscriptions[channel]![normalizedId]; + return true; + } + + getAllSubscriptions(): SubscriptionHandler[] { + return Object.values(this.subscriptions) + .flatMap((channelSubs) => Object.values(channelSubs)) + .filter(isTruthy); + } + + getChannelSubscriptions(channel: string): SubscriptionHandler[] { + return Object.values(this.subscriptions[channel] ?? {}); + } +} + export class IndexerWebsocket { private socket: ReconnectingWebSocket | null = null; // for logging purposes, to differentiate when user has many tabs open private indexerWsId = crypto.randomUUID(); - private subscriptions: { - [channel: string]: { - [id: string]: { - channel: string; - id: string | undefined; - batched: boolean; - handleBaseData: (data: any, fullMessage: any) => void; - handleUpdates: (updates: any[], fullMessage: any) => void; - sentSubMessage: boolean; - }; - }; - } = {}; - - private lastRetryTimeMsByChannelAndId: { [channelAndId: string]: number } = {}; + private subscriptions = new SubscriptionManager(); constructor(url: string) { this.socket = new ReconnectingWebSocket({ @@ -57,13 +117,7 @@ export class IndexerWebsocket { batched = true, handleUpdates, handleBaseData, - }: { - channel: string; - id: string | undefined; - batched?: boolean; - handleBaseData: (data: any, fullMessage: any) => void; - handleUpdates: (data: any[], fullMessage: any) => void; - }): () => void { + }: SubscriptionHandlerInput): () => void { this._addSub({ channel, id, batched, handleUpdates, handleBaseData }); return () => { this._performUnsub({ channel, id }); @@ -76,38 +130,42 @@ export class IndexerWebsocket { batched = true, handleUpdates, handleBaseData, - }: { - channel: string; - id: string | undefined; - batched?: boolean; - handleBaseData: (data: any, fullMessage: any) => void; - handleUpdates: (data: any[], fullMessage: any) => void; - }) => { - this.subscriptions[channel] ??= {}; - if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] != null) { + + // below here is any metadata we want to allow maintaining between resubscribes + lastRetryBecauseErrorMs = undefined, + lastRetryBecauseDuplicateMs = undefined, + }: SubscriptionHandlerInput & Partial) => { + const wasSuccessful = this.subscriptions.addSubscription({ + channel, + id, + batched, + handleBaseData, + handleUpdates, + sentSubMessage: false, + receivedBaseData: false, + lastRetryBecauseErrorMs, + lastRetryBecauseDuplicateMs, + }); + + // fails if already exists + if (!wasSuccessful) { logAbacusTsError('IndexerWebsocket', 'this subscription already exists', { id: `${channel}/${id}`, wsId: this.indexerWsId, }); - throw new Error(`IndexerWebsocket error: this subscription already exists. ${channel}/${id}`); + return; } + logAbacusTsInfo('IndexerWebsocket', 'adding subscription', { channel, id, socketNonNull: this.socket != null, - socketActive: this.socket?.isActive(), + socketActive: Boolean(this.socket?.isActive()), wsId: this.indexerWsId, }); - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] = { - channel, - id, - batched, - handleBaseData, - handleUpdates, - sentSubMessage: false, - }; + if (this.socket != null && this.socket.isActive()) { - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.sentSubMessage = true; + this.subscriptions.getSubscription(channel, id)!.sentSubMessage = true; this.socket.send({ batched, channel, @@ -117,46 +175,47 @@ export class IndexerWebsocket { } }; - private _performUnsub = ({ channel, id }: { channel: string; id: string | undefined }) => { - if (this.subscriptions[channel] == null) { + private _performUnsub = ( + { channel, id }: { channel: string; id: string | undefined }, + // if true, don't send the unsub message, just remove from the internal data structure + shouldSuppressWsMessage: boolean = false + ) => { + const sub = this.subscriptions.getSubscription(channel, id); + const wasSuccessful = this.subscriptions.removeSubscription(channel, id); + + // only if doesn't exist + if (!wasSuccessful || sub == null) { logAbacusTsError( 'IndexerWebsocket', 'unsubbing from nonexistent or already unsubbed channel', - { channel, wsId: this.indexerWsId } + { channel, id, wsId: this.indexerWsId } ); return; } - if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] == null) { - logAbacusTsError( - 'IndexerWebsocket', - 'unsubbing from nonexistent or already unsubbed channel', - { channel, id, wsId: this.indexerWsId } - ); + + if (shouldSuppressWsMessage) { return; } + logAbacusTsInfo('IndexerWebsocket', 'removing subscription', { channel, id, socketNonNull: this.socket != null, - socketActive: this.socket?.isActive(), + socketActive: Boolean(this.socket?.isActive()), wsId: this.indexerWsId, }); - if ( - this.socket != null && - this.socket.isActive() && - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.sentSubMessage - ) { + + if (this.socket != null && this.socket.isActive() && sub.sentSubMessage) { this.socket.send({ channel, id, type: 'unsubscribe', }); } - delete this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]; }; private _refreshSub = (channel: string, id: string | undefined) => { - const sub = this.subscriptions[channel]?.[id ?? NO_ID_SPECIAL_STRING_ID]; + const sub = this.subscriptions.getSubscription(channel, id); if (sub == null) { return; } @@ -164,33 +223,96 @@ export class IndexerWebsocket { this._addSub(sub); }; - // if we get a "could not fetch data" error, we retry once as long as this channel+id is not on cooldown - // TODO: remove this entirely when backend is more reliable - private _handleErrorReceived = (message: IndexerWebsocketErrorMessage) => { + private _maybeRetryTimeoutError = (message: IndexerWebsocketErrorMessage): boolean => { if (message.message.startsWith('Internal error, could not fetch data for subscription: ')) { const maybeChannel = message.channel; const maybeId = message.id; - if (maybeChannel != null && maybeChannel.startsWith('v4_')) { - const channelAndId = `${maybeChannel}${CHANNEL_ID_SAFE_DIVIDER}${maybeId ?? NO_ID_SPECIAL_STRING_ID}`; - const lastRefresh = this.lastRetryTimeMsByChannelAndId[channelAndId] ?? 0; - if (Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) { - this.lastRetryTimeMsByChannelAndId[channelAndId] = Date.now(); + if ( + maybeChannel != null && + maybeChannel.startsWith('v4_') && + this.subscriptions.hasSubscription(maybeChannel, maybeId) + ) { + const sub = this.subscriptions.getSubscription(maybeChannel, maybeId)!; + const lastRefresh = sub.lastRetryBecauseErrorMs ?? 0; + const hasBaseData = sub.receivedBaseData; + if (!hasBaseData && Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) { + sub.lastRetryBecauseErrorMs = Date.now(); this._refreshSub(maybeChannel, maybeId); - logAbacusTsInfo('IndexerWebsocket', 'error fetching data for channel, refetching', { + logAbacusTsInfo('IndexerWebsocket', 'error fetching subscription, refetching', { maybeChannel, maybeId, + socketNonNull: this.socket != null, + socketActive: Boolean(this.socket?.isActive()), wsId: this.indexerWsId, }); - return; + return true; + } + logAbacusTsError('IndexerWebsocket', 'error fetching subscription, not retrying:', { + maybeChannel, + maybeId, + hasBaseData, + elapsedSinceLast: Date.now() - lastRefresh, + wsId: this.indexerWsId, + }); + return true; + } + } + return false; + }; + + private _maybeFixDuplicateSubError = (message: IndexerWebsocketErrorMessage): boolean => { + if (message.message.startsWith('Invalid subscribe message: already subscribed (')) { + // temp until backend adds metadata + const parsedMatches = message.message.match(/\(([\w_]+)-(.+?)\)/); + const maybeChannel = parsedMatches?.[1]; + + let maybeId = parsedMatches?.[2]; + if (maybeId === maybeChannel) { + maybeId = undefined; + } + + if ( + maybeChannel != null && + maybeChannel.startsWith('v4_') && + this.subscriptions.hasSubscription(maybeChannel, maybeId) + ) { + const sub = this.subscriptions.getSubscription(maybeChannel, maybeId)!; + const lastRefresh = sub.lastRetryBecauseDuplicateMs ?? 0; + const hasBaseData = sub.receivedBaseData; + if (!hasBaseData && Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) { + sub.lastRetryBecauseDuplicateMs = Date.now(); + this._refreshSub(maybeChannel, maybeId); + logAbacusTsInfo('IndexerWebsocket', 'error: subscription already exists, refetching', { + maybeChannel, + maybeId, + socketNonNull: this.socket != null, + socketActive: Boolean(this.socket?.isActive()), + wsId: this.indexerWsId, + }); + return true; } - logAbacusTsError('IndexerWebsocket', 'hit max retries for channel:', { + logAbacusTsError('IndexerWebsocket', 'error: subscription already exists, not retrying', { maybeChannel, maybeId, + hasBaseData, + elapsedSinceLast: Date.now() - lastRefresh, wsId: this.indexerWsId, }); - return; + return true; } } + return false; + }; + + // if we get a "could not fetch data" error, we retry once as long as this channel+id is not on cooldown + // TODO: remove this entirely when backend is more reliable + private _handleErrorReceived = (message: IndexerWebsocketErrorMessage) => { + let handled = this._maybeRetryTimeoutError(message); + // this ensures this isn't called if we already retried and tracks if we have handled it yet + handled = handled || this._maybeFixDuplicateSubError(message); + if (handled) { + return; + } logAbacusTsError('IndexerWebsocket', 'encountered server side error:', { message, wsId: this.indexerWsId, @@ -219,23 +341,14 @@ export class IndexerWebsocket { const channel = message.channel; const id = message.id; - if (this.subscriptions[channel] == null) { - // hide error for channel we expect to see it on - if (channel !== 'v4_orderbook') { - logAbacusTsError('IndexerWebsocket', 'encountered message with unknown target', { - channel, - id, - wsId: this.indexerWsId, - }); - } - return; - } - if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] == null) { + const sub = this.subscriptions.getSubscription(channel, id); + if (!this.subscriptions.hasSubscription(channel, id) || sub == null) { // hide error for channel we expect to see it on if (channel !== 'v4_orderbook') { - logAbacusTsError('IndexerWebsocket', 'encountered message with unknown target', { + logAbacusTsInfo('IndexerWebsocket', 'encountered message with unknown target', { channel, id, + type: message.type, wsId: this.indexerWsId, }); } @@ -247,21 +360,37 @@ export class IndexerWebsocket { id, wsId: this.indexerWsId, }); - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.handleBaseData( - message.contents, - message - ); + sub.receivedBaseData = true; + sub.handleBaseData(message.contents, message); } else if (message.type === 'channel_data') { - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.handleUpdates( - [message.contents], - message - ); + if (!sub.receivedBaseData) { + logAbacusTsError( + 'IndexerWebsocket', + 'message received before subscription confirmed, hiding', + { + channel, + id, + wsId: this.indexerWsId, + } + ); + return; + } + sub.handleUpdates([message.contents], message); // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition } else if (message.type === 'channel_batch_data') { - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.handleUpdates( - message.contents, - message - ); + if (!sub.receivedBaseData) { + logAbacusTsError( + 'IndexerWebsocket', + 'message received before subscription confirmed, hiding', + { + channel, + id, + wsId: this.indexerWsId, + } + ); + return; + } + sub.handleUpdates(message.contents, message); } else { assertNever(message); } @@ -283,26 +412,15 @@ export class IndexerWebsocket { socketUrl: this.socket?.url, wsId: this.indexerWsId, socketNonNull: this.socket != null, - socketActive: this.socket?.isActive(), - subs: Object.values(this.subscriptions) - .flatMap((o) => Object.values(o)) - .filter(isTruthy) - .map((o) => `${o.channel}///${o.id}`), + socketActive: Boolean(this.socket?.isActive()), + subs: this.subscriptions.getAllSubscriptions().map((o) => `${o.channel}///${o.id}`), }); if (this.socket != null && this.socket.isActive()) { - Object.values(this.subscriptions) - .filter(isTruthy) - .flatMap((o) => Object.values(o)) - .filter(isTruthy) - .forEach(({ batched, channel, id }) => { - this.subscriptions[channel]![id ?? NO_ID_SPECIAL_STRING_ID]!.sentSubMessage = true; - this.socket!.send({ - batched, - channel, - id, - type: 'subscribe', - }); - }); + this.subscriptions.getAllSubscriptions().forEach(({ channel, id }) => { + const sub = this.subscriptions.getSubscription(channel, id)!; + this._performUnsub(sub, true); + this._addSub(sub); + }); } else { logAbacusTsError( 'IndexerWebsocket', diff --git a/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts b/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts index 73e80d342..784e05b99 100644 --- a/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts +++ b/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts @@ -154,7 +154,7 @@ class WebSocketConnection { this.ws = new WebSocket(url); this.setupEventHandlers(); } catch (error) { - logAbacusTsError('WebSocketConnection', 'error connecting', error); + logAbacusTsError('WebSocketConnection', 'error connecting', { error }); this.close(); // we don't rethrow because we instead call the handleClose method } @@ -169,7 +169,7 @@ class WebSocketConnection { const data = JSON.parse(event.data); this.handleMessage(this.id, data); } catch (e) { - logAbacusTsError('WebSocketConnection', 'error in handler', e); + logAbacusTsError('WebSocketConnection', 'error in handler', { data: event.data, error: e }); } }; @@ -180,7 +180,7 @@ class WebSocketConnection { try { this.handleConnected(this.id); } catch (e) { - logAbacusTsError('WebSocketConnection', 'error in handleConnected', e); + logAbacusTsError('WebSocketConnection', 'error in handleConnected', { error: e }); } }; @@ -224,7 +224,7 @@ class WebSocketConnection { this.ws?.close(); this.ws = null; } catch (e) { - logAbacusTsError('WebSocketConnection', 'error closing socket', e); + logAbacusTsError('WebSocketConnection', 'error closing socket', { error: e }); } } @@ -245,7 +245,7 @@ class WebSocketConnection { const message = typeof data === 'string' ? data : JSON.stringify(data); this.ws!.send(message); } catch (e) { - logAbacusTsError('WebSocketConnection', 'error sending data', e, data); + logAbacusTsError('WebSocketConnection', 'error sending data', { error: e, data }); } } } diff --git a/src/abacus-ts/websocket/lib/websocketDerivedValue.ts b/src/abacus-ts/websocket/lib/websocketDerivedValue.ts index 53912a015..8dceb2592 100644 --- a/src/abacus-ts/websocket/lib/websocketDerivedValue.ts +++ b/src/abacus-ts/websocket/lib/websocketDerivedValue.ts @@ -25,6 +25,7 @@ export class WebsocketDerivedValue { this.unsubFromWs = websocket.addChannelSubscription({ channel: sub.channel, id: sub.id, + batched: true, handleBaseData: (data, fullMessage) => this._setValue(sub.handleBaseData(data, this.value, fullMessage)), handleUpdates: (updates, fullMessage) =>