diff --git a/src/abacus-ts/lib/createStoreEffect.ts b/src/abacus-ts/lib/createStoreEffect.ts index 76649d1d4..ada1ddc78 100644 --- a/src/abacus-ts/lib/createStoreEffect.ts +++ b/src/abacus-ts/lib/createStoreEffect.ts @@ -16,13 +16,17 @@ export function createStoreEffect( lastValue = thisValue; // NOTE: some cleanups dispatch actions which cause this to happen recursively. // we must ensure that those actions don't change the state they subscribe to or this will go infinitely - lastCleanup?.(); + const lastCleanupCached = lastCleanup; + lastCleanup = undefined; + lastCleanupCached?.(); lastCleanup = handleChange(thisValue); } }); return () => { - lastCleanup?.(); + const lastCleanupCached = lastCleanup; + lastCleanup = undefined; + lastCleanupCached?.(); removeStoreListener(); }; } diff --git a/src/abacus-ts/types/rawTypes.ts b/src/abacus-ts/types/rawTypes.ts index cfc903302..98fb90336 100644 --- a/src/abacus-ts/types/rawTypes.ts +++ b/src/abacus-ts/types/rawTypes.ts @@ -3,7 +3,6 @@ import { IndexerAssetPositionResponseObject, IndexerHistoricalBlockTradingReward, IndexerPerpetualPositionResponseObject, - IndexerTradeResponseObject, } from '@/types/indexer/indexerApiGen'; import { IndexerCompositeFillObject, @@ -12,8 +11,6 @@ import { IndexerWsBaseMarketObject, } from '@/types/indexer/indexerManual'; -import { PartialBy } from '@/lib/typeUtils'; - export type MarketsData = { [marketId: string]: IndexerWsBaseMarketObject }; export type OrdersData = { [orderId: string]: IndexerCompositeOrderObject }; @@ -22,11 +19,6 @@ export type OrderbookData = { asks: { [price: string]: string }; }; -export type BaseTrade = PartialBy; -export type TradesData = { - trades: Array; -}; - export interface ParentSubaccountData { address: string; parentSubaccount: number; diff --git a/src/abacus-ts/types/summaryTypes.ts b/src/abacus-ts/types/summaryTypes.ts index 607be7b3f..870eaf7d3 100644 --- a/src/abacus-ts/types/summaryTypes.ts +++ b/src/abacus-ts/types/summaryTypes.ts @@ -6,9 +6,10 @@ import { IndexerOrderType, IndexerPerpetualPositionResponseObject, } from '@/types/indexer/indexerApiGen'; -import { IndexerWsBaseMarketObject } from '@/types/indexer/indexerManual'; - -import { BaseTrade } from './rawTypes'; +import { + IndexerWsBaseMarketObject, + IndexerWsTradeResponseObject, +} from '@/types/indexer/indexerManual'; type ReplaceBigNumberInUnion = T extends string ? BigNumber : T; @@ -147,6 +148,8 @@ export type SubaccountOrder = { marginMode: MarginMode | undefined; }; +export type LiveTrade = IndexerWsTradeResponseObject; + export type PendingIsolatedPosition = { marketId: string; displayId: string; @@ -155,5 +158,3 @@ export type PendingIsolatedPosition = { equity: BigNumber; orders: SubaccountOrder[]; }; - -export type LiveTrade = BaseTrade; diff --git a/src/abacus-ts/websocket/candles.ts b/src/abacus-ts/websocket/candles.ts new file mode 100644 index 000000000..ddfaa5858 --- /dev/null +++ b/src/abacus-ts/websocket/candles.ts @@ -0,0 +1,51 @@ +import { orderBy } from 'lodash'; + +import { isWsCandlesResponse, isWsCandlesUpdateResponse } from '@/types/indexer/indexerChecks'; +import { IndexerWsCandleResponse } from '@/types/indexer/indexerManual'; + +import { Loadable, loadableLoaded, loadablePending } from '../lib/loadable'; +import { logAbacusTsError } from '../logs'; +import { makeWsValueManager } from './lib/indexerValueManagerHelpers'; +import { IndexerWebsocket } from './lib/indexerWebsocket'; +import { WebsocketDerivedValue } from './lib/websocketDerivedValue'; + +function candlesWebsocketValueCreator( + websocket: IndexerWebsocket, + { marketIdAndResolution }: { marketIdAndResolution: string } +) { + return new WebsocketDerivedValue>( + websocket, + { + channel: 'v4_candles', + id: marketIdAndResolution, + handleBaseData: (baseMessage) => { + const message = isWsCandlesResponse(baseMessage); + return loadableLoaded({ + candles: orderBy(message.candles, [(a) => a.startedAt], ['asc']), + }); + }, + handleUpdates: (baseUpdates, value) => { + const updates = isWsCandlesUpdateResponse(baseUpdates); + const startingValue = value.data; + if (startingValue == null) { + logAbacusTsError('CandlesTracker', 'found unexpectedly null base data in update'); + return value; + } + if (startingValue.candles.length === 0) { + return loadableLoaded({ candles: updates }); + } + + const allNewTimes = new Set(updates.map(({ startedAt }) => startedAt)); + const newArr = [ + ...updates, + ...startingValue.candles.filter(({ startedAt }) => !allNewTimes.has(startedAt)), + ]; + const sorted = orderBy(newArr, [(a) => a.startedAt], ['asc']); + return loadableLoaded({ candles: sorted }); + }, + }, + loadablePending() + ); +} + +export const CandlesValuesManager = makeWsValueManager(candlesWebsocketValueCreator); diff --git a/src/abacus-ts/websocket/candlesForTradingView.ts b/src/abacus-ts/websocket/candlesForTradingView.ts new file mode 100644 index 000000000..f5f01b732 --- /dev/null +++ b/src/abacus-ts/websocket/candlesForTradingView.ts @@ -0,0 +1,100 @@ +import { createStoreEffect } from '@/abacus-ts/lib/createStoreEffect'; +import { selectWebsocketUrl } from '@/abacus-ts/socketSelectors'; +import { CandlesValuesManager } from '@/abacus-ts/websocket/candles'; +import { subscribeToWsValue } from '@/abacus-ts/websocket/lib/indexerValueManagerHelpers'; +import type { + LibrarySymbolInfo, + ResolutionString, + SubscribeBarsCallback, +} from 'public/tradingview/charting_library'; + +import { CandleResolution, RESOLUTION_MAP } from '@/constants/candles'; + +import { type RootStore } from '@/state/_store'; + +import { mapCandle } from '../../lib/tradingView/utils'; + +export const subscriptionsByGuid: { + [guid: string]: + | { + guid: string; + unsub: () => void; + } + | undefined; +} = {}; + +export const subscribeOnStream = ({ + store, + orderbookCandlesToggleOn, + symbolInfo, + resolution, + onRealtimeCallback, + listenerGuid, + onResetCacheNeededCallback, +}: { + store: RootStore; + orderbookCandlesToggleOn: boolean; + symbolInfo: LibrarySymbolInfo; + resolution: ResolutionString; + onRealtimeCallback: SubscribeBarsCallback; + listenerGuid: string; + onResetCacheNeededCallback: Function; +}) => { + if (!symbolInfo.ticker) return; + + const channelId = `${symbolInfo.ticker}/${RESOLUTION_MAP[resolution]}`; + let isFirstRun = true; + + const tearDown = createStoreEffect(store, selectWebsocketUrl, (wsUrl) => { + // if the websocket url changes, force a refresh + if (isFirstRun) { + isFirstRun = false; + } else { + setTimeout(() => onResetCacheNeededCallback(), 0); + } + + let mostRecentFirstPointStartedAt: string | undefined; + const unsub = subscribeToWsValue( + CandlesValuesManager, + { wsUrl, marketIdAndResolution: channelId }, + ({ data }) => { + if (data == null || data.candles.length === 0) { + return; + } + // if we've never seen data before, it's either the existing data or the subscribed message + // either way, we take it as the basis and only send further updates + // there is a small race condition where messages could be missed between + // when trandingview does the rest query and when we start receiving updates + if (mostRecentFirstPointStartedAt == null) { + mostRecentFirstPointStartedAt = data.candles.at(-1)?.startedAt; + return; + } + data.candles.forEach((candle) => { + if (candle.startedAt >= mostRecentFirstPointStartedAt!) { + onRealtimeCallback( + mapCandle(orderbookCandlesToggleOn)({ + ...candle, + resolution: candle.resolution as unknown as CandleResolution, + orderbookMidPriceClose: candle.orderbookMidPriceClose ?? undefined, + orderbookMidPriceOpen: candle.orderbookMidPriceOpen ?? undefined, + }) + ); + } + }); + mostRecentFirstPointStartedAt = data.candles.at(-1)?.startedAt; + } + ); + + // happens on network change or unsubscribe from stream + return () => { + unsub(); + }; + }); + + subscriptionsByGuid[listenerGuid] = { guid: listenerGuid, unsub: tearDown }; +}; + +export const unsubscribeFromStream = (subscriberUID: string) => { + subscriptionsByGuid[subscriberUID]?.unsub(); + subscriptionsByGuid[subscriberUID] = undefined; +}; diff --git a/src/abacus-ts/websocket/trades.ts b/src/abacus-ts/websocket/trades.ts index 9c17f02a3..04de9d4d5 100644 --- a/src/abacus-ts/websocket/trades.ts +++ b/src/abacus-ts/websocket/trades.ts @@ -3,6 +3,7 @@ import { useEffect, useState } from 'react'; import { orderBy } from 'lodash'; import { isWsTradesResponse, isWsTradesUpdateResponses } from '@/types/indexer/indexerChecks'; +import { IndexerWsTradesUpdateObject } from '@/types/indexer/indexerManual'; import { useAppSelector } from '@/state/appTypes'; import { getCurrentMarketIdIfTradeable } from '@/state/perpetualsSelectors'; @@ -10,8 +11,8 @@ import { getCurrentMarketIdIfTradeable } from '@/state/perpetualsSelectors'; import { mergeById } from '@/lib/mergeById'; import { Loadable, loadableIdle, loadableLoaded, loadablePending } from '../lib/loadable'; +import { logAbacusTsError } from '../logs'; import { selectWebsocketUrl } from '../socketSelectors'; -import { TradesData } from '../types/rawTypes'; import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers'; import { IndexerWebsocket } from './lib/indexerWebsocket'; import { WebsocketDerivedValue } from './lib/websocketDerivedValue'; @@ -22,7 +23,7 @@ function tradesWebsocketValueCreator( websocket: IndexerWebsocket, { marketId }: { marketId: string } ) { - return new WebsocketDerivedValue>( + return new WebsocketDerivedValue>( websocket, { channel: 'v4_trades', @@ -37,8 +38,9 @@ function tradesWebsocketValueCreator( const updates = isWsTradesUpdateResponses(baseUpdates); const startingValue = value.data; if (startingValue == null) { - // eslint-disable-next-line no-console - console.log('MarketsTracker found unexpectedly null base data in update', { marketId }); + logAbacusTsError('TradesTracker', 'found unexpectedly null base data in update', { + marketId, + }); return value; } const allNewTrades = updates.flatMap((u) => u.trades).toReversed(); @@ -58,7 +60,7 @@ export function useCurrentMarketTradesValue() { const currentMarketId = useAppSelector(getCurrentMarketIdIfTradeable); // useSyncExternalStore is better but the API doesn't fit this use case very well - const [trades, setTrades] = useState>(loadableIdle()); + const [trades, setTrades] = useState>(loadableIdle()); useEffect(() => { if (currentMarketId == null) { diff --git a/src/lib/tradingView/dydxfeed/index.ts b/src/lib/tradingView/dydxfeed/index.ts index 4db1c05f0..a8e75c80b 100644 --- a/src/lib/tradingView/dydxfeed/index.ts +++ b/src/lib/tradingView/dydxfeed/index.ts @@ -200,6 +200,8 @@ export const getDydxDatafeed = ( onResetCacheNeededCallback: Function ) => { subscribeOnStream({ + store, + orderbookCandlesToggleOn, symbolInfo, resolution, onRealtimeCallback: onTick, diff --git a/src/lib/tradingView/dydxfeed/streaming.ts b/src/lib/tradingView/dydxfeed/streaming.ts index ae830fea2..cd5415cf0 100644 --- a/src/lib/tradingView/dydxfeed/streaming.ts +++ b/src/lib/tradingView/dydxfeed/streaming.ts @@ -6,6 +6,8 @@ import type { import { RESOLUTION_MAP } from '@/constants/candles'; +import { type RootStore } from '@/state/_store'; + import abacusStateManager from '@/lib/abacus'; import { subscriptionsByChannelId } from './cache'; @@ -16,6 +18,8 @@ export const subscribeOnStream = ({ onRealtimeCallback, listenerGuid, }: { + store: RootStore; + orderbookCandlesToggleOn: boolean; symbolInfo: LibrarySymbolInfo; resolution: ResolutionString; onRealtimeCallback: SubscribeBarsCallback; diff --git a/src/types/indexer/indexerChecks.ts b/src/types/indexer/indexerChecks.ts index 7c646d69e..7ac377105 100644 --- a/src/types/indexer/indexerChecks.ts +++ b/src/types/indexer/indexerChecks.ts @@ -9,6 +9,8 @@ import { import { IndexerCompositeFillResponse, IndexerCompositeOrderObject, + IndexerWsCandleResponse, + IndexerWsCandleResponseObject, IndexerWsMarketUpdateResponse, IndexerWsOrderbookUpdateResponse, IndexerWsParentSubaccountSubscribedResponse, @@ -28,6 +30,8 @@ export const isWsOrderbookUpdateResponses = typia.createAssert(); export const isWsTradesResponse = typia.createAssert(); export const isWsTradesUpdateResponses = typia.createAssert(); +export const isWsCandlesResponse = typia.createAssert(); +export const isWsCandlesUpdateResponse = typia.createAssert(); export const isParentSubaccountFillResponse = typia.createAssert(); export const isParentSubaccountOrders = typia.createAssert(); export const isParentSubaccountTransferResponse = diff --git a/src/types/indexer/indexerManual.ts b/src/types/indexer/indexerManual.ts index 8ec0aca83..2fc356fa9 100644 --- a/src/types/indexer/indexerManual.ts +++ b/src/types/indexer/indexerManual.ts @@ -4,6 +4,7 @@ import { IndexerAPIOrderStatus, IndexerAPITimeInForce, IndexerAssetPositionResponseObject, + IndexerCandleResponseObject, IndexerFillType, IndexerHistoricalBlockTradingReward, IndexerIsoString, @@ -164,6 +165,13 @@ export interface IndexerWsParentSubaccountUpdateObject { transfers?: IndexerTransferCommonResponseObject; } +// hacking around backend types not quite matching what the websocket sends +export type IndexerWsTradeResponseObject = PartialBy; export interface IndexerWsTradesUpdateObject { - trades: PartialBy[]; + trades: IndexerWsTradeResponseObject[]; +} + +export type IndexerWsCandleResponseObject = Omit; +export interface IndexerWsCandleResponse { + candles: Array; }