From 31ad327d86b4624ad40dd0a71212dd1653eb86cf Mon Sep 17 00:00:00 2001 From: tyleroooo Date: Thu, 9 Jan 2025 17:35:43 -0500 Subject: [PATCH] fix(bonsai-core): work around indexer race condition by managing websocket value lifecycles more clearly (#1420) --- src/abacus-ts/lib/resourceCacheManager.ts | 20 ++++++-- .../lib/indexerValueManagerHelpers.ts | 48 +++++++++++++++++++ .../websocket/lib/reconnectingWebsocket.ts | 2 + .../websocket/lib/websocketDerivedValue.ts | 7 +-- src/abacus-ts/websocket/markets.ts | 24 +++++----- src/abacus-ts/websocket/orderbook.ts | 25 +++++----- src/abacus-ts/websocket/parentSubaccount.ts | 29 ++++++----- src/abacus-ts/websocket/trades.ts | 42 ++++++---------- 8 files changed, 121 insertions(+), 76 deletions(-) create mode 100644 src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts diff --git a/src/abacus-ts/lib/resourceCacheManager.ts b/src/abacus-ts/lib/resourceCacheManager.ts index 918976ecd..c2734d037 100644 --- a/src/abacus-ts/lib/resourceCacheManager.ts +++ b/src/abacus-ts/lib/resourceCacheManager.ts @@ -1,3 +1,5 @@ +import { logAbacusTsError } from '../logs'; + type CacheEntry = { resource: T; count: number; @@ -14,7 +16,7 @@ export class ResourceCacheManager { constructor( private options: { constructor: (key: U) => T; - destroyer: (resource: NoInfer) => void; + destroyer: (resource: NoInfer, key: NoInfer) => void; keySerializer: (key: NoInfer) => string; destroyDelayMs?: number; } @@ -42,7 +44,14 @@ export class ResourceCacheManager { markDone(key: U): void { const serializedKey = this.options.keySerializer(key); const entry = this.cache[serializedKey]; - if (!entry) return; + if (!entry) { + logAbacusTsError('ResourceCacheManager', 'tried to mark done unknown key', key); + return; + } + if (entry.count <= 0) { + logAbacusTsError('ResourceCacheManager', 'tried to mark done key with no subscribers', key); + entry.count = 1; + } entry.count -= 1; @@ -55,9 +64,12 @@ export class ResourceCacheManager { const delay = this.options.destroyDelayMs ?? 1000; entry.destroyTimeout = setTimeout(() => { const latestVal = this.cache[serializedKey]; - if (!latestVal) return; + if (!latestVal) { + logAbacusTsError('ResourceCacheManager', 'could not find resource to destroy', key); + return; + } - this.options.destroyer(latestVal.resource); + this.options.destroyer(latestVal.resource, key); delete this.cache[serializedKey]; }, delay); } diff --git a/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts b/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts new file mode 100644 index 000000000..dba78c1f7 --- /dev/null +++ b/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts @@ -0,0 +1,48 @@ +import { ResourceCacheManager } from '@/abacus-ts/lib/resourceCacheManager'; +import stableStringify from 'fast-json-stable-stringify'; + +import { IndexerWebsocket } from './indexerWebsocket'; +import { IndexerWebsocketManager } from './indexerWebsocketManager'; +import { WebsocketDerivedValue } from './websocketDerivedValue'; + +// this is set to just above the websocket subscribe timeout because of race conditions in the indexer backend +const DESTROY_DELAY_MS = 21000; + +type WebsocketValueCreator = ( + websocket: IndexerWebsocket, + args: Args +) => WebsocketDerivedValue; + +export function makeWsValueManager( + creator: WebsocketValueCreator +): ResourceCacheManager, Args & { wsUrl: string }> { + return new ResourceCacheManager({ + constructor: (allArgs: Args & { wsUrl: string }) => + creator(IndexerWebsocketManager.use(allArgs.wsUrl), allArgs), + + destroyer: (instance, { wsUrl }) => { + instance.teardown(); + IndexerWebsocketManager.markDone(wsUrl); + }, + + // take care - extra properties on the key will cause divergent behavior + // (cache misses, unexpected new object creation, marking incorrect objects as done, etc) + // only ever pass the exact key type for correct behavior + keySerializer: (allArgs) => stableStringify(allArgs), + + destroyDelayMs: DESTROY_DELAY_MS, + }); +} + +export function subscribeToWsValue( + manager: ResourceCacheManager, Args & { wsUrl: string }>, + args: NoInfer & { wsUrl: string }, + handleChange: (val: NoInfer) => void +): () => void { + const value = manager.use(args); + const unsub = value.subscribe(handleChange); + return () => { + unsub(); + manager.markDone(args); + }; +} diff --git a/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts b/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts index 4c342f5fc..05189eb0e 100644 --- a/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts +++ b/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts @@ -196,6 +196,8 @@ class WebSocketConnection { 1001, // normal but no code 1005, + // supposedly abnormal tcp failure but super super common + 1006, ]); if (!allowedCodes.has(close.code)) { logAbacusTsError('WebSocketConnection', `socket ${this.id} closed abnormally`, { diff --git a/src/abacus-ts/websocket/lib/websocketDerivedValue.ts b/src/abacus-ts/websocket/lib/websocketDerivedValue.ts index 2868aff02..53912a015 100644 --- a/src/abacus-ts/websocket/lib/websocketDerivedValue.ts +++ b/src/abacus-ts/websocket/lib/websocketDerivedValue.ts @@ -18,15 +18,10 @@ export class WebsocketDerivedValue { handleBaseData: (data: any, value: T, fullMessage: any) => T; handleUpdates: (updates: any[], value: T, fullMessage: any) => T; }, - value: T, - changeHandler: ((val: T) => void) | undefined + value: T ) { this.value = value; - if (changeHandler) { - this.subscribe(changeHandler); - } - this.unsubFromWs = websocket.addChannelSubscription({ channel: sub.channel, id: sub.id, diff --git a/src/abacus-ts/websocket/markets.ts b/src/abacus-ts/websocket/markets.ts index 370d9f093..0e32bf265 100644 --- a/src/abacus-ts/websocket/markets.ts +++ b/src/abacus-ts/websocket/markets.ts @@ -11,16 +11,14 @@ import { setAllMarketsRaw } from '@/state/raw'; import { createStoreEffect } from '../lib/createStoreEffect'; import { Loadable, loadableLoaded, loadablePending } from '../lib/loadable'; +import { logAbacusTsError } from '../logs'; import { MarketsData } from '../rawTypes'; import { selectWebsocketUrl } from '../socketSelectors'; +import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers'; import { IndexerWebsocket } from './lib/indexerWebsocket'; -import { IndexerWebsocketManager } from './lib/indexerWebsocketManager'; import { WebsocketDerivedValue } from './lib/websocketDerivedValue'; -function marketsWebsocketValue( - websocket: IndexerWebsocket, - onChange: (val: Loadable) => void -) { +function marketsWebsocketValueCreator(websocket: IndexerWebsocket) { return new WebsocketDerivedValue>( websocket, { @@ -34,8 +32,7 @@ function marketsWebsocketValue( const updates = isWsMarketUpdateResponses(baseUpdates); let startingValue = value.data; if (startingValue == null) { - // eslint-disable-next-line no-console - console.log('MarketsTracker found unexpectedly null base data in update'); + logAbacusTsError('MarketsTracker', 'found unexpectedly null base data in update'); return value; } startingValue = { ...startingValue }; @@ -65,24 +62,25 @@ function marketsWebsocketValue( return loadableLoaded(startingValue); }, }, - loadablePending(), - onChange + loadablePending() ); } +const MarketsValueManager = makeWsValueManager(marketsWebsocketValueCreator); + export function setUpMarkets(store: RootStore) { const throttledSetMarkets = throttle((val: Loadable) => { store.dispatch(setAllMarketsRaw(val)); }, 2 * timeUnits.second); return createStoreEffect(store, selectWebsocketUrl, (wsUrl) => { - const thisTracker = marketsWebsocketValue(IndexerWebsocketManager.use(wsUrl), (val) => + const unsub = subscribeToWsValue(MarketsValueManager, { wsUrl }, (val) => throttledSetMarkets(val) ); - return () => { - thisTracker.teardown(); - IndexerWebsocketManager.markDone(wsUrl); + unsub(); + throttledSetMarkets.cancel(); + store.dispatch(setAllMarketsRaw(loadablePending())); }; }); } diff --git a/src/abacus-ts/websocket/orderbook.ts b/src/abacus-ts/websocket/orderbook.ts index 95ee0e06a..49f56a3af 100644 --- a/src/abacus-ts/websocket/orderbook.ts +++ b/src/abacus-ts/websocket/orderbook.ts @@ -12,16 +12,16 @@ import { isTruthy } from '@/lib/isTruthy'; import { createStoreEffect } from '../lib/createStoreEffect'; import { Loadable, loadableIdle, loadableLoaded, loadablePending } from '../lib/loadable'; +import { logAbacusTsError } from '../logs'; import { OrderbookData } from '../rawTypes'; import { selectWebsocketUrl } from '../socketSelectors'; +import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers'; import { IndexerWebsocket } from './lib/indexerWebsocket'; -import { IndexerWebsocketManager } from './lib/indexerWebsocketManager'; import { WebsocketDerivedValue } from './lib/websocketDerivedValue'; -function orderbookWebsocketValue( +function orderbookWebsocketValueCreator( websocket: IndexerWebsocket, - marketId: string, - onChange: (val: Loadable) => void + { marketId }: { marketId: string } ) { return new WebsocketDerivedValue>( websocket, @@ -45,8 +45,7 @@ function orderbookWebsocketValue( const updates = isWsOrderbookUpdateResponses(baseUpdates); let startingValue = value.data; if (startingValue == null) { - // eslint-disable-next-line no-console - console.log('MarketsTracker found unexpectedly null base data in update'); + logAbacusTsError('OrderbookTracker', 'found unexpectedly null base data in update'); return value; } startingValue = { asks: { ...startingValue.asks }, bids: { ...startingValue.bids } }; @@ -61,11 +60,12 @@ function orderbookWebsocketValue( return loadableLoaded(startingValue); }, }, - loadablePending(), - onChange + loadablePending() ); } +const OrderbookValueManager = makeWsValueManager(orderbookWebsocketValueCreator); + const selectMarketAndWsInfo = createAppSelector( [selectWebsocketUrl, getCurrentMarketIdIfTradeable], (wsUrl, currentMarketId) => ({ wsUrl, currentMarketId }) @@ -80,15 +80,14 @@ export function setUpOrderbook(store: RootStore) { store.dispatch(setOrderbookRaw({ marketId: currentMarketId, data })); }, timeUnits.second / 2); - const thisTracker = orderbookWebsocketValue( - IndexerWebsocketManager.use(wsUrl), - currentMarketId, + const unsub = subscribeToWsValue( + OrderbookValueManager, + { wsUrl, marketId: currentMarketId }, (data) => throttledSetOrderbook(data) ); return () => { - thisTracker.teardown(); - IndexerWebsocketManager.markDone(wsUrl); + unsub(); throttledSetOrderbook.cancel(); store.dispatch(setOrderbookRaw({ marketId: currentMarketId, data: loadableIdle() })); }; diff --git a/src/abacus-ts/websocket/parentSubaccount.ts b/src/abacus-ts/websocket/parentSubaccount.ts index 81698d3bf..faf721956 100644 --- a/src/abacus-ts/websocket/parentSubaccount.ts +++ b/src/abacus-ts/websocket/parentSubaccount.ts @@ -24,8 +24,8 @@ import { createStoreEffect } from '../lib/createStoreEffect'; import { Loadable, loadableIdle, loadableLoaded, loadablePending } from '../lib/loadable'; import { ChildSubaccountData, ParentSubaccountData } from '../rawTypes'; import { selectParentSubaccountInfo, selectWebsocketUrl } from '../socketSelectors'; +import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers'; import { IndexerWebsocket } from './lib/indexerWebsocket'; -import { IndexerWebsocketManager } from './lib/indexerWebsocketManager'; import { WebsocketDerivedValue } from './lib/websocketDerivedValue'; function isValidSubaccount(childSubaccount: IndexerSubaccountResponseObject) { @@ -64,11 +64,14 @@ function freshChildSubaccount({ }; } -function accountWebsocketValue( +interface AccountValueArgsBase { + address: string; + parentSubaccountNumber: string; +} + +function accountWebsocketValueCreator( websocket: IndexerWebsocket, - address: string, - parentSubaccountNumber: string, - onChange: (val: Loadable) => void + { address, parentSubaccountNumber }: AccountValueArgsBase ) { return new WebsocketDerivedValue>( websocket, @@ -214,11 +217,12 @@ function accountWebsocketValue( return { ...value, data: resultData }; }, }, - loadablePending(), - onChange + loadablePending() ); } +const AccountValueManager = makeWsValueManager(accountWebsocketValueCreator); + const selectParentSubaccount = createAppSelector( [selectWebsocketUrl, selectParentSubaccountInfo], (wsUrl, { wallet, subaccount }) => ({ wsUrl, wallet, subaccount }) @@ -229,16 +233,15 @@ export function setUpParentSubaccount(store: RootStore) { if (!isTruthy(wallet) || subaccount == null) { return undefined; } - const thisTracker = accountWebsocketValue( - IndexerWebsocketManager.use(wsUrl), - wallet, - subaccount.toString(), + + const unsub = subscribeToWsValue( + AccountValueManager, + { wsUrl, address: wallet, parentSubaccountNumber: subaccount.toString() }, (val) => store.dispatch(setParentSubaccountRaw(val)) ); return () => { - thisTracker.teardown(); - IndexerWebsocketManager.markDone(wsUrl); + unsub(); store.dispatch(setParentSubaccountRaw(loadableIdle())); }; }); diff --git a/src/abacus-ts/websocket/trades.ts b/src/abacus-ts/websocket/trades.ts index 1262c1eaa..23237d8ee 100644 --- a/src/abacus-ts/websocket/trades.ts +++ b/src/abacus-ts/websocket/trades.ts @@ -3,28 +3,23 @@ import { useEffect, useState } from 'react'; import { isWsTradesResponse, isWsTradesUpdateResponses } from '@/types/indexer/indexerChecks'; import { orderBy } from 'lodash'; -import { DydxNetwork } from '@/constants/networks'; - -import { getSelectedNetwork } from '@/state/appSelectors'; import { useAppSelector } from '@/state/appTypes'; import { getCurrentMarketIdIfTradeable } from '@/state/perpetualsSelectors'; import { mergeById } from '@/lib/mergeById'; import { Loadable, loadableIdle, loadableLoaded, loadablePending } from '../lib/loadable'; -import { ResourceCacheManager } from '../lib/resourceCacheManager'; import { TradesData } from '../rawTypes'; -import { getWebsocketUrlForNetwork } from '../socketSelectors'; +import { selectWebsocketUrl } from '../socketSelectors'; +import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers'; import { IndexerWebsocket } from './lib/indexerWebsocket'; -import { IndexerWebsocketManager } from './lib/indexerWebsocketManager'; import { WebsocketDerivedValue } from './lib/websocketDerivedValue'; const POST_LIMIT = 250; -function tradesWebsocketValue( +function tradesWebsocketValueCreator( websocket: IndexerWebsocket, - marketId: string, - onChange?: (val: Loadable) => void + { marketId }: { marketId: string } ) { return new WebsocketDerivedValue>( websocket, @@ -51,23 +46,16 @@ function tradesWebsocketValue( return loadableLoaded({ trades: sortedMerged.slice(0, POST_LIMIT) }); }, }, - loadablePending(), - onChange + loadablePending() ); } -export const TradeValuesManager = new ResourceCacheManager({ - constructor: ({ marketId, network }: { network: DydxNetwork; marketId: string }) => - tradesWebsocketValue(IndexerWebsocketManager.use(getWebsocketUrlForNetwork(network)), marketId), - destroyer: (instance) => { - instance.teardown(); - }, - keySerializer: ({ network, marketId }) => `${network}//////${marketId}`, -}); +export const TradeValuesManager = makeWsValueManager(tradesWebsocketValueCreator); export function useCurrentMarketTradesValue() { - const selectedNetwork = useAppSelector(getSelectedNetwork); + const wsUrl = useAppSelector(selectWebsocketUrl); const currentMarketId = useAppSelector(getCurrentMarketIdIfTradeable); + // useSyncExternalStore is better but the API doesn't fit this use case very well const [trades, setTrades] = useState>(loadableIdle()); @@ -75,17 +63,17 @@ export function useCurrentMarketTradesValue() { if (currentMarketId == null) { return () => null; } - const tradesManager = TradeValuesManager.use({ - marketId: currentMarketId, - network: selectedNetwork, - }); - const unsubListener = tradesManager.subscribe((val) => setTrades(val)); + + const unsubListener = subscribeToWsValue( + TradeValuesManager, + { wsUrl, marketId: currentMarketId }, + (val) => setTrades(val) + ); return () => { setTrades(loadableIdle()); unsubListener(); - TradeValuesManager.markDone({ marketId: currentMarketId, network: selectedNetwork }); }; - }, [selectedNetwork, currentMarketId]); + }, [currentMarketId, wsUrl]); return trades; }