From e08069a1c78f3c75d568cf7dc4eb4533f83bd6cb Mon Sep 17 00:00:00 2001 From: Tyler Date: Tue, 14 Jan 2025 15:54:48 -0500 Subject: [PATCH 1/6] candles --- src/abacus-ts/types/rawTypes.ts | 8 ----- src/abacus-ts/websocket/candles.ts | 52 ++++++++++++++++++++++++++++++ src/abacus-ts/websocket/trades.ts | 10 +++--- src/types/indexer/indexerChecks.ts | 4 +++ src/types/indexer/indexerManual.ts | 10 +++++- 5 files changed, 70 insertions(+), 14 deletions(-) create mode 100644 src/abacus-ts/websocket/candles.ts diff --git a/src/abacus-ts/types/rawTypes.ts b/src/abacus-ts/types/rawTypes.ts index cfc903302..98fb90336 100644 --- a/src/abacus-ts/types/rawTypes.ts +++ b/src/abacus-ts/types/rawTypes.ts @@ -3,7 +3,6 @@ import { IndexerAssetPositionResponseObject, IndexerHistoricalBlockTradingReward, IndexerPerpetualPositionResponseObject, - IndexerTradeResponseObject, } from '@/types/indexer/indexerApiGen'; import { IndexerCompositeFillObject, @@ -12,8 +11,6 @@ import { IndexerWsBaseMarketObject, } from '@/types/indexer/indexerManual'; -import { PartialBy } from '@/lib/typeUtils'; - export type MarketsData = { [marketId: string]: IndexerWsBaseMarketObject }; export type OrdersData = { [orderId: string]: IndexerCompositeOrderObject }; @@ -22,11 +19,6 @@ export type OrderbookData = { asks: { [price: string]: string }; }; -export type BaseTrade = PartialBy; -export type TradesData = { - trades: Array; -}; - export interface ParentSubaccountData { address: string; parentSubaccount: number; diff --git a/src/abacus-ts/websocket/candles.ts b/src/abacus-ts/websocket/candles.ts new file mode 100644 index 000000000..f07c246e5 --- /dev/null +++ b/src/abacus-ts/websocket/candles.ts @@ -0,0 +1,52 @@ +import { orderBy } from 'lodash'; + +import { isWsCandlesResponse, isWsCandlesUpdateResponse } from '@/types/indexer/indexerChecks'; +import { IndexerWsCandleResponse } from '@/types/indexer/indexerManual'; + +import { Loadable, loadableLoaded, loadablePending } from '../lib/loadable'; +import { logAbacusTsError } from '../logs'; +import { makeWsValueManager } from './lib/indexerValueManagerHelpers'; +import { IndexerWebsocket } from './lib/indexerWebsocket'; +import { WebsocketDerivedValue } from './lib/websocketDerivedValue'; + +function candlesWebsocketValueCreator( + websocket: IndexerWebsocket, + { marketId }: { marketId: string } +) { + return new WebsocketDerivedValue>( + websocket, + { + channel: 'v4_candles', + id: marketId, + handleBaseData: (baseMessage) => { + const message = isWsCandlesResponse(baseMessage); + return loadableLoaded({ + // subscribed message is in descending order + candles: message.candles.toReversed(), + }); + }, + handleUpdates: (baseUpdates, value) => { + const updates = isWsCandlesUpdateResponse(baseUpdates); + const startingValue = value.data; + if (startingValue == null) { + logAbacusTsError('CandlesTracker', 'found unexpectedly null base data in update'); + return value; + } + if (startingValue.candles.length === 0) { + return loadableLoaded({ candles: updates }); + } + + const allNewTimes = new Set(updates.map(({ startedAt }) => startedAt)); + const newArr = [ + ...updates, + ...startingValue.candles.filter(({ startedAt }) => !allNewTimes.has(startedAt)), + ]; + const sorted = orderBy(newArr, [(a) => a.startedAt], ['desc']); + return loadableLoaded({ candles: sorted }); + }, + }, + loadablePending() + ); +} + +export const CandlesValuesManager = makeWsValueManager(candlesWebsocketValueCreator); diff --git a/src/abacus-ts/websocket/trades.ts b/src/abacus-ts/websocket/trades.ts index b423a48ca..25db6e732 100644 --- a/src/abacus-ts/websocket/trades.ts +++ b/src/abacus-ts/websocket/trades.ts @@ -3,6 +3,7 @@ import { useEffect, useState } from 'react'; import { orderBy } from 'lodash'; import { isWsTradesResponse, isWsTradesUpdateResponses } from '@/types/indexer/indexerChecks'; +import { IndexerWsTradesUpdateObject } from '@/types/indexer/indexerManual'; import { useAppSelector } from '@/state/appTypes'; import { getCurrentMarketIdIfTradeable } from '@/state/perpetualsSelectors'; @@ -10,8 +11,8 @@ import { getCurrentMarketIdIfTradeable } from '@/state/perpetualsSelectors'; import { mergeById } from '@/lib/mergeById'; import { Loadable, loadableIdle, loadableLoaded, loadablePending } from '../lib/loadable'; +import { logAbacusTsError } from '../logs'; import { selectWebsocketUrl } from '../socketSelectors'; -import { TradesData } from '../types/rawTypes'; import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers'; import { IndexerWebsocket } from './lib/indexerWebsocket'; import { WebsocketDerivedValue } from './lib/websocketDerivedValue'; @@ -22,7 +23,7 @@ function tradesWebsocketValueCreator( websocket: IndexerWebsocket, { marketId }: { marketId: string } ) { - return new WebsocketDerivedValue>( + return new WebsocketDerivedValue>( websocket, { channel: 'v4_trades', @@ -37,8 +38,7 @@ function tradesWebsocketValueCreator( const updates = isWsTradesUpdateResponses(baseUpdates); const startingValue = value.data; if (startingValue == null) { - // eslint-disable-next-line no-console - console.log('MarketsTracker found unexpectedly null base data in update'); + logAbacusTsError('TradesTracker', 'found unexpectedly null base data in update'); return value; } const allNewTrades = updates.flatMap((u) => u.trades).toReversed(); @@ -58,7 +58,7 @@ export function useCurrentMarketTradesValue() { const currentMarketId = useAppSelector(getCurrentMarketIdIfTradeable); // useSyncExternalStore is better but the API doesn't fit this use case very well - const [trades, setTrades] = useState>(loadableIdle()); + const [trades, setTrades] = useState>(loadableIdle()); useEffect(() => { if (currentMarketId == null) { diff --git a/src/types/indexer/indexerChecks.ts b/src/types/indexer/indexerChecks.ts index 7c646d69e..7ac377105 100644 --- a/src/types/indexer/indexerChecks.ts +++ b/src/types/indexer/indexerChecks.ts @@ -9,6 +9,8 @@ import { import { IndexerCompositeFillResponse, IndexerCompositeOrderObject, + IndexerWsCandleResponse, + IndexerWsCandleResponseObject, IndexerWsMarketUpdateResponse, IndexerWsOrderbookUpdateResponse, IndexerWsParentSubaccountSubscribedResponse, @@ -28,6 +30,8 @@ export const isWsOrderbookUpdateResponses = typia.createAssert(); export const isWsTradesResponse = typia.createAssert(); export const isWsTradesUpdateResponses = typia.createAssert(); +export const isWsCandlesResponse = typia.createAssert(); +export const isWsCandlesUpdateResponse = typia.createAssert(); export const isParentSubaccountFillResponse = typia.createAssert(); export const isParentSubaccountOrders = typia.createAssert(); export const isParentSubaccountTransferResponse = diff --git a/src/types/indexer/indexerManual.ts b/src/types/indexer/indexerManual.ts index de91a38d8..85241fc10 100644 --- a/src/types/indexer/indexerManual.ts +++ b/src/types/indexer/indexerManual.ts @@ -4,6 +4,7 @@ import { IndexerAPIOrderStatus, IndexerAPITimeInForce, IndexerAssetPositionResponseObject, + IndexerCandleResponseObject, IndexerFillType, IndexerHistoricalBlockTradingReward, IndexerIsoString, @@ -164,6 +165,13 @@ export interface IndexerWsParentSubaccountUpdateObject { transfers?: IndexerTransferCommonResponseObject; } +// hacking around backend types not quite matching what the websocket sends +export type IndexerWsTradeResponseObject = PartialBy; export interface IndexerWsTradesUpdateObject { - trades: PartialBy[]; + trades: IndexerWsTradeResponseObject[]; +} + +export type IndexerWsCandleResponseObject = Omit; +export interface IndexerWsCandleResponse { + candles: Array; } From 82885bd79f563b26af7be77ab2848ed54678d67e Mon Sep 17 00:00:00 2001 From: Tyler Date: Tue, 14 Jan 2025 15:56:55 -0500 Subject: [PATCH 2/6] fix --- src/abacus-ts/websocket/candles.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/abacus-ts/websocket/candles.ts b/src/abacus-ts/websocket/candles.ts index f07c246e5..4fc961a47 100644 --- a/src/abacus-ts/websocket/candles.ts +++ b/src/abacus-ts/websocket/candles.ts @@ -11,13 +11,13 @@ import { WebsocketDerivedValue } from './lib/websocketDerivedValue'; function candlesWebsocketValueCreator( websocket: IndexerWebsocket, - { marketId }: { marketId: string } + { marketIdAndResolution }: { marketIdAndResolution: string } ) { return new WebsocketDerivedValue>( websocket, { channel: 'v4_candles', - id: marketId, + id: marketIdAndResolution, handleBaseData: (baseMessage) => { const message = isWsCandlesResponse(baseMessage); return loadableLoaded({ From 3a548d242044c0d15aae89fc0ab1b2b53df38da8 Mon Sep 17 00:00:00 2001 From: Tyler Date: Tue, 14 Jan 2025 17:38:19 -0500 Subject: [PATCH 3/6] fix --- src/abacus-ts/websocket/candles.ts | 5 ++--- src/lib/tradingView/dydxfeed/index.ts | 2 ++ src/lib/tradingView/dydxfeed/streaming.ts | 4 ++++ 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/abacus-ts/websocket/candles.ts b/src/abacus-ts/websocket/candles.ts index 4fc961a47..ddfaa5858 100644 --- a/src/abacus-ts/websocket/candles.ts +++ b/src/abacus-ts/websocket/candles.ts @@ -21,8 +21,7 @@ function candlesWebsocketValueCreator( handleBaseData: (baseMessage) => { const message = isWsCandlesResponse(baseMessage); return loadableLoaded({ - // subscribed message is in descending order - candles: message.candles.toReversed(), + candles: orderBy(message.candles, [(a) => a.startedAt], ['asc']), }); }, handleUpdates: (baseUpdates, value) => { @@ -41,7 +40,7 @@ function candlesWebsocketValueCreator( ...updates, ...startingValue.candles.filter(({ startedAt }) => !allNewTimes.has(startedAt)), ]; - const sorted = orderBy(newArr, [(a) => a.startedAt], ['desc']); + const sorted = orderBy(newArr, [(a) => a.startedAt], ['asc']); return loadableLoaded({ candles: sorted }); }, }, diff --git a/src/lib/tradingView/dydxfeed/index.ts b/src/lib/tradingView/dydxfeed/index.ts index 4db1c05f0..a8e75c80b 100644 --- a/src/lib/tradingView/dydxfeed/index.ts +++ b/src/lib/tradingView/dydxfeed/index.ts @@ -200,6 +200,8 @@ export const getDydxDatafeed = ( onResetCacheNeededCallback: Function ) => { subscribeOnStream({ + store, + orderbookCandlesToggleOn, symbolInfo, resolution, onRealtimeCallback: onTick, diff --git a/src/lib/tradingView/dydxfeed/streaming.ts b/src/lib/tradingView/dydxfeed/streaming.ts index ae830fea2..cd5415cf0 100644 --- a/src/lib/tradingView/dydxfeed/streaming.ts +++ b/src/lib/tradingView/dydxfeed/streaming.ts @@ -6,6 +6,8 @@ import type { import { RESOLUTION_MAP } from '@/constants/candles'; +import { type RootStore } from '@/state/_store'; + import abacusStateManager from '@/lib/abacus'; import { subscriptionsByChannelId } from './cache'; @@ -16,6 +18,8 @@ export const subscribeOnStream = ({ onRealtimeCallback, listenerGuid, }: { + store: RootStore; + orderbookCandlesToggleOn: boolean; symbolInfo: LibrarySymbolInfo; resolution: ResolutionString; onRealtimeCallback: SubscribeBarsCallback; From 7f144e8d00987f3030a01031499196aa1dcae36b Mon Sep 17 00:00:00 2001 From: Tyler Date: Tue, 14 Jan 2025 17:40:15 -0500 Subject: [PATCH 4/6] fix --- .../websocket/candlesForTradingView.ts | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 src/abacus-ts/websocket/candlesForTradingView.ts diff --git a/src/abacus-ts/websocket/candlesForTradingView.ts b/src/abacus-ts/websocket/candlesForTradingView.ts new file mode 100644 index 000000000..0b1653878 --- /dev/null +++ b/src/abacus-ts/websocket/candlesForTradingView.ts @@ -0,0 +1,97 @@ +import { createStoreEffect } from '@/abacus-ts/lib/createStoreEffect'; +import { selectWebsocketUrl } from '@/abacus-ts/socketSelectors'; +import { CandlesValuesManager } from '@/abacus-ts/websocket/candles'; +import { subscribeToWsValue } from '@/abacus-ts/websocket/lib/indexerValueManagerHelpers'; +import type { + LibrarySymbolInfo, + ResolutionString, + SubscribeBarsCallback, +} from 'public/tradingview/charting_library'; + +import { CandleResolution, RESOLUTION_MAP } from '@/constants/candles'; + +import { type RootStore } from '@/state/_store'; + +import { mapCandle } from '../../lib/tradingView/utils'; + +export const subscriptionsByGuid: { + [guid: string]: { + guid: string; + unsub: () => void; + }; +} = {}; + +export const subscribeOnStream = ({ + store, + orderbookCandlesToggleOn, + symbolInfo, + resolution, + onRealtimeCallback, + listenerGuid, + onResetCacheNeededCallback, +}: { + store: RootStore; + orderbookCandlesToggleOn: boolean; + symbolInfo: LibrarySymbolInfo; + resolution: ResolutionString; + onRealtimeCallback: SubscribeBarsCallback; + listenerGuid: string; + onResetCacheNeededCallback: Function; +}) => { + if (!symbolInfo.ticker) return; + + const channelId = `${symbolInfo.ticker}/${RESOLUTION_MAP[resolution]}`; + let isFirstRun = true; + + const tearDown = createStoreEffect(store, selectWebsocketUrl, (wsUrl) => { + // if the websocket url changes, force a refresh + if (isFirstRun) { + isFirstRun = false; + } else { + onResetCacheNeededCallback(); + } + + let mostRecentFirstPointStartedAt: string | undefined; + const unsub = subscribeToWsValue( + CandlesValuesManager, + { wsUrl, marketIdAndResolution: channelId }, + ({ data }) => { + if (data == null || data.candles.length === 0) { + return; + } + // if we've never seen data before, it's either the existing data or the subscribed message + // either way, we take it as the basis and only send further updates + // there is a small race condition where messages could be missed between + // when trandingview does the rest query and when we start receiving updates + if (mostRecentFirstPointStartedAt == null) { + mostRecentFirstPointStartedAt = data.candles.at(-1)?.startedAt; + return; + } + data.candles.forEach((candle) => { + if (candle.startedAt >= mostRecentFirstPointStartedAt!) { + onRealtimeCallback( + mapCandle(orderbookCandlesToggleOn)({ + ...candle, + resolution: candle.resolution as unknown as CandleResolution, + orderbookMidPriceClose: candle.orderbookMidPriceClose ?? undefined, + orderbookMidPriceOpen: candle.orderbookMidPriceOpen ?? undefined, + }) + ); + } + }); + mostRecentFirstPointStartedAt = data.candles.at(-1)?.startedAt; + } + ); + + // happens on network change or unsubscribe from stream + return () => { + unsub(); + }; + }); + + subscriptionsByGuid[listenerGuid] = { guid: listenerGuid, unsub: tearDown }; +}; + +export const unsubscribeFromStream = (subscriberUID: string) => { + subscriptionsByGuid[subscriberUID]?.unsub(); +}; From 91d6b1be93dfe28b0ba20030cade57d9d1221770 Mon Sep 17 00:00:00 2001 From: Tyler Date: Tue, 14 Jan 2025 17:41:18 -0500 Subject: [PATCH 5/6] fix --- src/abacus-ts/types/summaryTypes.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/abacus-ts/types/summaryTypes.ts b/src/abacus-ts/types/summaryTypes.ts index 6e80942ca..5a4eed586 100644 --- a/src/abacus-ts/types/summaryTypes.ts +++ b/src/abacus-ts/types/summaryTypes.ts @@ -6,9 +6,10 @@ import { IndexerOrderType, IndexerPerpetualPositionResponseObject, } from '@/types/indexer/indexerApiGen'; -import { IndexerWsBaseMarketObject } from '@/types/indexer/indexerManual'; - -import { BaseTrade } from './rawTypes'; +import { + IndexerWsBaseMarketObject, + IndexerWsTradeResponseObject, +} from '@/types/indexer/indexerManual'; type ReplaceBigNumberInUnion = T extends string ? BigNumber : T; @@ -147,4 +148,4 @@ export type SubaccountOrder = { marginMode: MarginMode | undefined; }; -export type LiveTrade = BaseTrade; +export type LiveTrade = IndexerWsTradeResponseObject; From 980c942b43016f88ab56d1546fdaf1d7bf806f85 Mon Sep 17 00:00:00 2001 From: Tyler Date: Tue, 14 Jan 2025 18:16:16 -0500 Subject: [PATCH 6/6] fix --- src/abacus-ts/lib/createStoreEffect.ts | 8 ++++++-- src/abacus-ts/websocket/candlesForTradingView.ts | 13 ++++++++----- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/abacus-ts/lib/createStoreEffect.ts b/src/abacus-ts/lib/createStoreEffect.ts index 76649d1d4..ada1ddc78 100644 --- a/src/abacus-ts/lib/createStoreEffect.ts +++ b/src/abacus-ts/lib/createStoreEffect.ts @@ -16,13 +16,17 @@ export function createStoreEffect( lastValue = thisValue; // NOTE: some cleanups dispatch actions which cause this to happen recursively. // we must ensure that those actions don't change the state they subscribe to or this will go infinitely - lastCleanup?.(); + const lastCleanupCached = lastCleanup; + lastCleanup = undefined; + lastCleanupCached?.(); lastCleanup = handleChange(thisValue); } }); return () => { - lastCleanup?.(); + const lastCleanupCached = lastCleanup; + lastCleanup = undefined; + lastCleanupCached?.(); removeStoreListener(); }; } diff --git a/src/abacus-ts/websocket/candlesForTradingView.ts b/src/abacus-ts/websocket/candlesForTradingView.ts index 0b1653878..f5f01b732 100644 --- a/src/abacus-ts/websocket/candlesForTradingView.ts +++ b/src/abacus-ts/websocket/candlesForTradingView.ts @@ -15,10 +15,12 @@ import { type RootStore } from '@/state/_store'; import { mapCandle } from '../../lib/tradingView/utils'; export const subscriptionsByGuid: { - [guid: string]: { - guid: string; - unsub: () => void; - }; + [guid: string]: + | { + guid: string; + unsub: () => void; + } + | undefined; } = {}; export const subscribeOnStream = ({ @@ -48,7 +50,7 @@ export const subscribeOnStream = ({ if (isFirstRun) { isFirstRun = false; } else { - onResetCacheNeededCallback(); + setTimeout(() => onResetCacheNeededCallback(), 0); } let mostRecentFirstPointStartedAt: string | undefined; @@ -94,4 +96,5 @@ export const subscribeOnStream = ({ export const unsubscribeFromStream = (subscriberUID: string) => { subscriptionsByGuid[subscriberUID]?.unsub(); + subscriptionsByGuid[subscriberUID] = undefined; };