diff --git a/.eslintrc.json b/.eslintrc.json index 3609ee35d..8945e1780 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -53,10 +53,10 @@ "error", { "patterns": [ - "@/abacus-ts/*", + "@/abacus-ts/*/*", "!@/abacus-ts/ontology", - "!@/abacus-ts/lib", - "!@/abacus-ts/summaryTypes" + "!@/abacus-ts/lib/*", + "!@/abacus-ts/types/summaryTypes" ] } ], diff --git a/src/abacus-ts/calculators/accountActions.ts b/src/abacus-ts/calculators/accountActions.ts index 3bcaa1c58..499bb5e96 100644 --- a/src/abacus-ts/calculators/accountActions.ts +++ b/src/abacus-ts/calculators/accountActions.ts @@ -7,42 +7,131 @@ import { import { MustBigNumber } from '@/lib/numbers'; -import { SubaccountBatchedOperations, SubaccountOperations } from '../types/operationTypes'; -import { ChildSubaccountData, ParentSubaccountData } from '../types/rawTypes'; - -export function createUsdcDepositOperations({ - subaccountNumber, - depositAmount, -}: { - subaccountNumber: number; - depositAmount: string; -}): SubaccountBatchedOperations { +import { freshChildSubaccount, newUsdcAssetPosition } from '../lib/subaccountUtils'; +import { + ModifyUsdcAssetPositionProps, + SubaccountBatchedOperations, + SubaccountOperations, +} from '../types/operationTypes'; +import { ParentSubaccountData } from '../types/rawTypes'; + +function addUsdcAssetPosition( + parentSubaccount: ParentSubaccountData, + payload: Pick +): ParentSubaccountData { + const { side, size, subaccountNumber } = payload; + return produce(parentSubaccount, (draftParentSubaccountData) => { + let childSubaccount = draftParentSubaccountData.childSubaccounts[subaccountNumber]; + + if (childSubaccount == null) { + // Upsert ChildSubaccountData into parentSubaccountData.childSubaccounts + const updatedChildSubaccount = freshChildSubaccount({ + address: draftParentSubaccountData.address, + subaccountNumber, + }); + + childSubaccount = { + ...updatedChildSubaccount, + assetPositions: { + ...updatedChildSubaccount.assetPositions, + USDC: newUsdcAssetPosition({ + side, + size, + subaccountNumber, + }), + }, + }; + } else { + if (childSubaccount.assetPositions.USDC == null) { + // Upsert USDC Asset Position + childSubaccount.assetPositions.USDC = newUsdcAssetPosition({ + side, + size, + subaccountNumber, + }); + } else { + if (childSubaccount.assetPositions.USDC.side !== side) { + const signedSizeBN = MustBigNumber(childSubaccount.assetPositions.USDC.size).minus(size); + + if (signedSizeBN.lte(0)) { + // New size flips the Asset Position Side + childSubaccount.assetPositions.USDC.side = + side === IndexerPositionSide.LONG + ? IndexerPositionSide.SHORT + : IndexerPositionSide.LONG; + childSubaccount.assetPositions.USDC.size = signedSizeBN.abs().toString(); + } else { + // Set the new size of the Asset Position + childSubaccount.assetPositions.USDC.size = signedSizeBN.toString(); + } + } else { + // Side is maintained, add the size to the existing position + childSubaccount.assetPositions.USDC.size = MustBigNumber( + childSubaccount.assetPositions.USDC.size + ) + .plus(size) + .toString(); + } + } + } + }); +} + +export function createUsdcDepositOperations( + parentSubaccount: ParentSubaccountData, + { + subaccountNumber, + depositAmount, + }: { + subaccountNumber: number; + depositAmount: string; + } +): SubaccountBatchedOperations { + const updatedParentSubaccountData = addUsdcAssetPosition(parentSubaccount, { + side: IndexerPositionSide.LONG, + size: depositAmount, + subaccountNumber, + }); + + if (updatedParentSubaccountData.childSubaccounts[subaccountNumber]?.assetPositions.USDC == null) { + throw new Error('USDC Asset Position was improperly modified'); + } + return { operations: [ SubaccountOperations.ModifyUsdcAssetPosition({ subaccountNumber, - changes: { - size: depositAmount, - }, + changes: updatedParentSubaccountData.childSubaccounts[subaccountNumber].assetPositions.USDC, }), ], }; } -export function createUsdcWithdrawalOperations({ - subaccountNumber, - withdrawAmount, -}: { - subaccountNumber: number; - withdrawAmount: string; -}): SubaccountBatchedOperations { +export function createUsdcWithdrawalOperations( + parentSubaccount: ParentSubaccountData, + { + subaccountNumber, + withdrawAmount, + }: { + subaccountNumber: number; + withdrawAmount: string; + } +): SubaccountBatchedOperations { + const updatedParentSubaccountData = addUsdcAssetPosition(parentSubaccount, { + side: IndexerPositionSide.SHORT, + size: withdrawAmount, + subaccountNumber, + }); + + if (updatedParentSubaccountData.childSubaccounts[subaccountNumber]?.assetPositions.USDC == null) { + throw new Error('USDC Asset Position was improperly modified'); + } + return { operations: [ SubaccountOperations.ModifyUsdcAssetPosition({ subaccountNumber, - changes: { - size: MustBigNumber(withdrawAmount).negated().toString(), - }, + changes: updatedParentSubaccountData.childSubaccounts[subaccountNumber].assetPositions.USDC, }), ], }; @@ -50,57 +139,14 @@ export function createUsdcWithdrawalOperations({ function modifyUsdcAssetPosition( parentSubaccountData: ParentSubaccountData, - payload: { - subaccountNumber: number; - changes: Partial>; - } + payload: ModifyUsdcAssetPositionProps ): ParentSubaccountData { const { subaccountNumber, changes } = payload; - if (!changes.size) return parentSubaccountData; return produce(parentSubaccountData, (draftParentSubaccountData) => { - const sizeBN = MustBigNumber(changes.size); - - let childSubaccount: ChildSubaccountData | undefined = - draftParentSubaccountData.childSubaccounts[subaccountNumber]; - - if (childSubaccount != null) { - // Modify childSubaccount - if (childSubaccount.assetPositions.USDC != null) { - const size = MustBigNumber(childSubaccount.assetPositions.USDC.size) - .plus(sizeBN) - .toString(); - - childSubaccount.assetPositions.USDC.size = size; - } else if (sizeBN.gt(0)) { - // Upsert USDC Asset Position - childSubaccount.assetPositions.USDC = { - assetId: '0', - symbol: 'USDC', - size: sizeBN.toString(), - side: IndexerPositionSide.LONG, - subaccountNumber, - } satisfies IndexerAssetPositionResponseObject; - } - } else { - // Upsert ChildSubaccountData into parentSubaccountData.childSubaccounts - childSubaccount = { - address: parentSubaccountData.address, - subaccountNumber, - openPerpetualPositions: {}, - assetPositions: { - USDC: { - assetId: '0', - symbol: 'USDC', - size: sizeBN.toString(), - side: IndexerPositionSide.LONG, - subaccountNumber, - }, - }, - } satisfies ChildSubaccountData; + if (draftParentSubaccountData.childSubaccounts[subaccountNumber]?.assetPositions.USDC != null) { + draftParentSubaccountData.childSubaccounts[subaccountNumber].assetPositions.USDC = changes; } - - draftParentSubaccountData.childSubaccounts[subaccountNumber] = childSubaccount; }); } diff --git a/src/abacus-ts/calculators/subaccount.ts b/src/abacus-ts/calculators/subaccount.ts index bb9532427..eed50b49b 100644 --- a/src/abacus-ts/calculators/subaccount.ts +++ b/src/abacus-ts/calculators/subaccount.ts @@ -1,5 +1,6 @@ import BigNumber from 'bignumber.js'; -import { mapValues, orderBy } from 'lodash'; +import { groupBy, map, mapValues, orderBy, pickBy } from 'lodash'; +import { weakMapMemoize } from 'reselect'; import { NUM_PARENT_SUBACCOUNTS } from '@/constants/account'; import { @@ -9,14 +10,21 @@ import { } from '@/types/indexer/indexerApiGen'; import { IndexerWsBaseMarketObject } from '@/types/indexer/indexerManual'; -import { getAssetFromMarketId } from '@/lib/assetUtils'; +import { + getAssetFromMarketId, + getDisplayableAssetFromBaseAsset, + getDisplayableTickerFromMarket, +} from '@/lib/assetUtils'; import { calc } from '@/lib/do'; +import { isTruthy } from '@/lib/isTruthy'; import { BIG_NUMBERS, MaybeBigNumber, MustBigNumber, ToBigNumber } from '@/lib/numbers'; import { isPresent } from '@/lib/typeUtils'; import { ChildSubaccountData, MarketsData, ParentSubaccountData } from '../types/rawTypes'; import { GroupedSubaccountSummary, + PendingIsolatedPosition, + SubaccountOrder, SubaccountPosition, SubaccountPositionBase, SubaccountPositionDerivedCore, @@ -74,16 +82,15 @@ export function calculateMarketsNeededForSubaccount(parent: Omit { + const core = calculateSubaccountSummaryCore(subaccountData, markets); + return { + ...core, + ...calculateSubaccountSummaryDerived(core), + }; + } +); function calculateSubaccountSummaryCore( subaccountData: ChildSubaccountData, @@ -288,3 +295,56 @@ function calculatePositionDerivedExtra( updatedUnrealizedPnlPercent, }; } + +export function calculateChildSubaccountSummaries( + parent: Omit, + markets: MarketsData +): Record { + return pickBy( + mapValues( + parent.childSubaccounts, + (subaccount) => subaccount && calculateSubaccountSummary(subaccount, markets) + ), + isTruthy + ); +} + +/** + * @returns a list of pending isolated positions + * PendingIsolatedPosition is exists if there are any orders that meet the following criteria: + * - marginMode is ISOLATED + * - no existing position exists + * - childSubaccount has equity + */ +export function calculateUnopenedIsolatedPositions( + childSubaccounts: Record, + orders: SubaccountOrder[], + positions: SubaccountPosition[] +): PendingIsolatedPosition[] { + const setOfOpenPositionMarkets = new Set(positions.map(({ market }) => market)); + + const filteredOrders = orders.filter( + (o) => !setOfOpenPositionMarkets.has(o.marketId) && o.marginMode === 'ISOLATED' + ); + + const filteredOrdersMap = groupBy(filteredOrders, 'marketId'); + const marketIdToSubaccountNumber = mapValues( + filteredOrdersMap, + (filteredOrder) => filteredOrder[0]?.subaccountNumber + ); + + return map(filteredOrdersMap, (orderList, marketId) => { + const subaccountNumber = marketIdToSubaccountNumber[marketId]; + if (subaccountNumber == null) return undefined; + const assetId = getAssetFromMarketId(marketId); + + return { + assetId, + displayableAsset: getDisplayableAssetFromBaseAsset(assetId), + marketId, + displayId: getDisplayableTickerFromMarket(marketId), + equity: childSubaccounts[subaccountNumber]?.equity ?? BIG_NUMBERS.ZERO, + orders: orderList, + }; + }).filter(isTruthy); +} diff --git a/src/abacus-ts/lib/createStoreEffect.ts b/src/abacus-ts/lib/createStoreEffect.ts index 76649d1d4..ada1ddc78 100644 --- a/src/abacus-ts/lib/createStoreEffect.ts +++ b/src/abacus-ts/lib/createStoreEffect.ts @@ -16,13 +16,17 @@ export function createStoreEffect( lastValue = thisValue; // NOTE: some cleanups dispatch actions which cause this to happen recursively. // we must ensure that those actions don't change the state they subscribe to or this will go infinitely - lastCleanup?.(); + const lastCleanupCached = lastCleanup; + lastCleanup = undefined; + lastCleanupCached?.(); lastCleanup = handleChange(thisValue); } }); return () => { - lastCleanup?.(); + const lastCleanupCached = lastCleanup; + lastCleanup = undefined; + lastCleanupCached?.(); removeStoreListener(); }; } diff --git a/src/abacus-ts/lib/subaccountUtils.ts b/src/abacus-ts/lib/subaccountUtils.ts new file mode 100644 index 000000000..868c07604 --- /dev/null +++ b/src/abacus-ts/lib/subaccountUtils.ts @@ -0,0 +1,60 @@ +import { + IndexerPositionSide, + IndexerSubaccountResponseObject, +} from '@/types/indexer/indexerApiGen'; + +import { ChildSubaccountData } from '../types/rawTypes'; + +export function isValidSubaccount(childSubaccount: IndexerSubaccountResponseObject) { + return ( + Object.keys(childSubaccount.assetPositions).length > 0 || + Object.keys(childSubaccount.openPerpetualPositions).length > 0 + ); +} + +export function convertToStoredChildSubaccount({ + address, + subaccountNumber, + assetPositions, + openPerpetualPositions, +}: IndexerSubaccountResponseObject): ChildSubaccountData { + return { + address, + subaccountNumber, + assetPositions, + openPerpetualPositions, + }; +} + +export function freshChildSubaccount({ + address, + subaccountNumber, +}: { + address: string; + subaccountNumber: number; +}): ChildSubaccountData { + return { + address, + subaccountNumber, + assetPositions: {}, + openPerpetualPositions: {}, + }; +} + +export function newUsdcAssetPosition({ + side, + size, + subaccountNumber, +}: { + side: IndexerPositionSide; + size: string; + subaccountNumber: number; +}) { + return { + assetId: '0', + size, + subaccountNumber, + side, + symbol: 'USDC', + }; +} diff --git a/src/abacus-ts/ontology.ts b/src/abacus-ts/ontology.ts index 1e3933d38..5b8bfcfcb 100644 --- a/src/abacus-ts/ontology.ts +++ b/src/abacus-ts/ontology.ts @@ -15,6 +15,7 @@ import { selectParentSubaccountOpenPositionsLoading, selectParentSubaccountSummary, selectParentSubaccountSummaryLoading, + selectUnopenedIsolatedPositions, } from './selectors/account'; import { createSelectParentSubaccountSummaryDeposit, @@ -103,12 +104,13 @@ export const BonsaiHelpers = { }, forms: { deposit: { - parentSubaccountSummary: createSelectParentSubaccountSummaryDeposit, + createSelectParentSubaccountSummary: createSelectParentSubaccountSummaryDeposit, }, withdraw: { - parentSubaccountSummary: createSelectParentSubaccountSummaryWithdrawal, + createSelectParentSubaccountSummary: createSelectParentSubaccountSummaryWithdrawal, }, }, + unopenedIsolatedPositions: selectUnopenedIsolatedPositions, } as const satisfies NestedSelectors; export const BonsaiHooks = { diff --git a/src/abacus-ts/selectors/account.ts b/src/abacus-ts/selectors/account.ts index a3d42465a..7d025b4db 100644 --- a/src/abacus-ts/selectors/account.ts +++ b/src/abacus-ts/selectors/account.ts @@ -14,9 +14,11 @@ import { calculateOrderHistory, } from '../calculators/orders'; import { + calculateChildSubaccountSummaries, calculateMarketsNeededForSubaccount, calculateParentSubaccountPositions, calculateParentSubaccountSummary, + calculateUnopenedIsolatedPositions, } from '../calculators/subaccount'; import { calculateTransfers } from '../calculators/transfers'; import { mergeLoadableStatus } from '../lib/mapLoadable'; @@ -98,6 +100,7 @@ export const selectParentSubaccountOpenPositions = createAppSelector( return positions?.filter((p) => p.status === IndexerPerpetualPositionStatus.OPEN); } ); + export const selectParentSubaccountOpenPositionsLoading = selectParentSubaccountSummaryLoading; export const selectAccountOrders = createAppSelector( @@ -142,6 +145,28 @@ export const selectAccountOrdersLoading = createAppSelector( mergeLoadableStatus ); +export const selectChildSubaccountSummaries = createAppSelector( + [selectRawParentSubaccountData, selectRelevantMarketsData], + (parentSubaccount, marketsData) => { + if (parentSubaccount == null || marketsData == null) { + return undefined; + } + + return calculateChildSubaccountSummaries(parentSubaccount, marketsData); + } +); + +export const selectUnopenedIsolatedPositions = createAppSelector( + [selectChildSubaccountSummaries, selectOpenOrders, selectParentSubaccountPositions], + (childSubaccounts, orders, positions) => { + if (childSubaccounts == null || positions == null) { + return undefined; + } + + return calculateUnopenedIsolatedPositions(childSubaccounts, orders, positions); + } +); + export const selectAccountFills = createAppSelector( [selectRawFillsRestData, selectRawFillsLiveData], (rest, live) => { diff --git a/src/abacus-ts/selectors/accountActions.ts b/src/abacus-ts/selectors/accountActions.ts index 5351cbd6b..5ea144373 100644 --- a/src/abacus-ts/selectors/accountActions.ts +++ b/src/abacus-ts/selectors/accountActions.ts @@ -27,7 +27,7 @@ export const createSelectParentSubaccountSummaryDeposit = () => return undefined; } - const operations = createUsdcDepositOperations(depositInputs); + const operations = createUsdcDepositOperations(parentSubaccount, depositInputs); const modifiedParentSubaccount = applyOperationsToSubaccount(parentSubaccount, operations); const result = calculateParentSubaccountSummary(modifiedParentSubaccount, markets); return result; @@ -52,7 +52,7 @@ export const createSelectParentSubaccountSummaryWithdrawal = () => return undefined; } - const operations = createUsdcWithdrawalOperations(withdrawalInputs); + const operations = createUsdcWithdrawalOperations(parentSubaccount, withdrawalInputs); const modifiedParentSubaccount = applyOperationsToSubaccount(parentSubaccount, operations); const result = calculateParentSubaccountSummary(modifiedParentSubaccount, markets); return result; diff --git a/src/abacus-ts/types/operationTypes.ts b/src/abacus-ts/types/operationTypes.ts index 0d71e275d..f691ea630 100644 --- a/src/abacus-ts/types/operationTypes.ts +++ b/src/abacus-ts/types/operationTypes.ts @@ -5,22 +5,26 @@ import { IndexerPerpetualPositionResponseObject, } from '@/types/indexer/indexerApiGen'; +export type AddPerpetualPositionProps = { + subaccountNumber: number; + market: string; + changes: Omit; +}; + +export type ModifyPerpetualPositionProps = { + changes: Partial>; +}; + +export type ModifyUsdcAssetPositionProps = { + subaccountNumber: number; + changes: IndexerAssetPositionResponseObject; +}; + export const SubaccountOperations = unionize( { - AddPerpetualPosition: ofType<{ - subaccountNumber: number; - market: string; - position: Omit; - }>(), - ModifyPerpetualPosition: ofType<{ - subaccountNumber: number; - market: string; - changes: Partial>; - }>(), - ModifyUsdcAssetPosition: ofType<{ - subaccountNumber: number; - changes: Partial>; - }>(), + AddPerpetualPosition: ofType(), + ModifyPerpetualPosition: ofType(), + ModifyUsdcAssetPosition: ofType(), }, { tag: 'operation' as const, value: 'payload' as const } ); diff --git a/src/abacus-ts/types/rawTypes.ts b/src/abacus-ts/types/rawTypes.ts index cfc903302..98fb90336 100644 --- a/src/abacus-ts/types/rawTypes.ts +++ b/src/abacus-ts/types/rawTypes.ts @@ -3,7 +3,6 @@ import { IndexerAssetPositionResponseObject, IndexerHistoricalBlockTradingReward, IndexerPerpetualPositionResponseObject, - IndexerTradeResponseObject, } from '@/types/indexer/indexerApiGen'; import { IndexerCompositeFillObject, @@ -12,8 +11,6 @@ import { IndexerWsBaseMarketObject, } from '@/types/indexer/indexerManual'; -import { PartialBy } from '@/lib/typeUtils'; - export type MarketsData = { [marketId: string]: IndexerWsBaseMarketObject }; export type OrdersData = { [orderId: string]: IndexerCompositeOrderObject }; @@ -22,11 +19,6 @@ export type OrderbookData = { asks: { [price: string]: string }; }; -export type BaseTrade = PartialBy; -export type TradesData = { - trades: Array; -}; - export interface ParentSubaccountData { address: string; parentSubaccount: number; diff --git a/src/abacus-ts/types/summaryTypes.ts b/src/abacus-ts/types/summaryTypes.ts index 6e80942ca..870eaf7d3 100644 --- a/src/abacus-ts/types/summaryTypes.ts +++ b/src/abacus-ts/types/summaryTypes.ts @@ -6,9 +6,10 @@ import { IndexerOrderType, IndexerPerpetualPositionResponseObject, } from '@/types/indexer/indexerApiGen'; -import { IndexerWsBaseMarketObject } from '@/types/indexer/indexerManual'; - -import { BaseTrade } from './rawTypes'; +import { + IndexerWsBaseMarketObject, + IndexerWsTradeResponseObject, +} from '@/types/indexer/indexerManual'; type ReplaceBigNumberInUnion = T extends string ? BigNumber : T; @@ -147,4 +148,13 @@ export type SubaccountOrder = { marginMode: MarginMode | undefined; }; -export type LiveTrade = BaseTrade; +export type LiveTrade = IndexerWsTradeResponseObject; + +export type PendingIsolatedPosition = { + marketId: string; + displayId: string; + assetId: string; + displayableAsset: string; + equity: BigNumber; + orders: SubaccountOrder[]; +}; diff --git a/src/abacus-ts/websocket/candles.ts b/src/abacus-ts/websocket/candles.ts new file mode 100644 index 000000000..ddfaa5858 --- /dev/null +++ b/src/abacus-ts/websocket/candles.ts @@ -0,0 +1,51 @@ +import { orderBy } from 'lodash'; + +import { isWsCandlesResponse, isWsCandlesUpdateResponse } from '@/types/indexer/indexerChecks'; +import { IndexerWsCandleResponse } from '@/types/indexer/indexerManual'; + +import { Loadable, loadableLoaded, loadablePending } from '../lib/loadable'; +import { logAbacusTsError } from '../logs'; +import { makeWsValueManager } from './lib/indexerValueManagerHelpers'; +import { IndexerWebsocket } from './lib/indexerWebsocket'; +import { WebsocketDerivedValue } from './lib/websocketDerivedValue'; + +function candlesWebsocketValueCreator( + websocket: IndexerWebsocket, + { marketIdAndResolution }: { marketIdAndResolution: string } +) { + return new WebsocketDerivedValue>( + websocket, + { + channel: 'v4_candles', + id: marketIdAndResolution, + handleBaseData: (baseMessage) => { + const message = isWsCandlesResponse(baseMessage); + return loadableLoaded({ + candles: orderBy(message.candles, [(a) => a.startedAt], ['asc']), + }); + }, + handleUpdates: (baseUpdates, value) => { + const updates = isWsCandlesUpdateResponse(baseUpdates); + const startingValue = value.data; + if (startingValue == null) { + logAbacusTsError('CandlesTracker', 'found unexpectedly null base data in update'); + return value; + } + if (startingValue.candles.length === 0) { + return loadableLoaded({ candles: updates }); + } + + const allNewTimes = new Set(updates.map(({ startedAt }) => startedAt)); + const newArr = [ + ...updates, + ...startingValue.candles.filter(({ startedAt }) => !allNewTimes.has(startedAt)), + ]; + const sorted = orderBy(newArr, [(a) => a.startedAt], ['asc']); + return loadableLoaded({ candles: sorted }); + }, + }, + loadablePending() + ); +} + +export const CandlesValuesManager = makeWsValueManager(candlesWebsocketValueCreator); diff --git a/src/abacus-ts/websocket/candlesForTradingView.ts b/src/abacus-ts/websocket/candlesForTradingView.ts new file mode 100644 index 000000000..f5f01b732 --- /dev/null +++ b/src/abacus-ts/websocket/candlesForTradingView.ts @@ -0,0 +1,100 @@ +import { createStoreEffect } from '@/abacus-ts/lib/createStoreEffect'; +import { selectWebsocketUrl } from '@/abacus-ts/socketSelectors'; +import { CandlesValuesManager } from '@/abacus-ts/websocket/candles'; +import { subscribeToWsValue } from '@/abacus-ts/websocket/lib/indexerValueManagerHelpers'; +import type { + LibrarySymbolInfo, + ResolutionString, + SubscribeBarsCallback, +} from 'public/tradingview/charting_library'; + +import { CandleResolution, RESOLUTION_MAP } from '@/constants/candles'; + +import { type RootStore } from '@/state/_store'; + +import { mapCandle } from '../../lib/tradingView/utils'; + +export const subscriptionsByGuid: { + [guid: string]: + | { + guid: string; + unsub: () => void; + } + | undefined; +} = {}; + +export const subscribeOnStream = ({ + store, + orderbookCandlesToggleOn, + symbolInfo, + resolution, + onRealtimeCallback, + listenerGuid, + onResetCacheNeededCallback, +}: { + store: RootStore; + orderbookCandlesToggleOn: boolean; + symbolInfo: LibrarySymbolInfo; + resolution: ResolutionString; + onRealtimeCallback: SubscribeBarsCallback; + listenerGuid: string; + onResetCacheNeededCallback: Function; +}) => { + if (!symbolInfo.ticker) return; + + const channelId = `${symbolInfo.ticker}/${RESOLUTION_MAP[resolution]}`; + let isFirstRun = true; + + const tearDown = createStoreEffect(store, selectWebsocketUrl, (wsUrl) => { + // if the websocket url changes, force a refresh + if (isFirstRun) { + isFirstRun = false; + } else { + setTimeout(() => onResetCacheNeededCallback(), 0); + } + + let mostRecentFirstPointStartedAt: string | undefined; + const unsub = subscribeToWsValue( + CandlesValuesManager, + { wsUrl, marketIdAndResolution: channelId }, + ({ data }) => { + if (data == null || data.candles.length === 0) { + return; + } + // if we've never seen data before, it's either the existing data or the subscribed message + // either way, we take it as the basis and only send further updates + // there is a small race condition where messages could be missed between + // when trandingview does the rest query and when we start receiving updates + if (mostRecentFirstPointStartedAt == null) { + mostRecentFirstPointStartedAt = data.candles.at(-1)?.startedAt; + return; + } + data.candles.forEach((candle) => { + if (candle.startedAt >= mostRecentFirstPointStartedAt!) { + onRealtimeCallback( + mapCandle(orderbookCandlesToggleOn)({ + ...candle, + resolution: candle.resolution as unknown as CandleResolution, + orderbookMidPriceClose: candle.orderbookMidPriceClose ?? undefined, + orderbookMidPriceOpen: candle.orderbookMidPriceOpen ?? undefined, + }) + ); + } + }); + mostRecentFirstPointStartedAt = data.candles.at(-1)?.startedAt; + } + ); + + // happens on network change or unsubscribe from stream + return () => { + unsub(); + }; + }); + + subscriptionsByGuid[listenerGuid] = { guid: listenerGuid, unsub: tearDown }; +}; + +export const unsubscribeFromStream = (subscriberUID: string) => { + subscriptionsByGuid[subscriberUID]?.unsub(); + subscriptionsByGuid[subscriberUID] = undefined; +}; diff --git a/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts b/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts index dba78c1f7..5f1d06948 100644 --- a/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts +++ b/src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts @@ -6,7 +6,7 @@ import { IndexerWebsocketManager } from './indexerWebsocketManager'; import { WebsocketDerivedValue } from './websocketDerivedValue'; // this is set to just above the websocket subscribe timeout because of race conditions in the indexer backend -const DESTROY_DELAY_MS = 21000; +const DESTROY_DELAY_MS = 23000; type WebsocketValueCreator = ( websocket: IndexerWebsocket, diff --git a/src/abacus-ts/websocket/lib/indexerWebsocket.ts b/src/abacus-ts/websocket/lib/indexerWebsocket.ts index 43e36b337..ad2bf35dd 100644 --- a/src/abacus-ts/websocket/lib/indexerWebsocket.ts +++ b/src/abacus-ts/websocket/lib/indexerWebsocket.ts @@ -1,3 +1,4 @@ +/* eslint-disable max-classes-per-file */ import { logAbacusTsError, logAbacusTsInfo } from '@/abacus-ts/logs'; import typia from 'typia'; @@ -9,26 +10,88 @@ import { isTruthy } from '@/lib/isTruthy'; import { ReconnectingWebSocket } from './reconnectingWebsocket'; const NO_ID_SPECIAL_STRING_ID = '______EMPTY_ID______'; -const CHANNEL_ID_SAFE_DIVIDER = '//////////'; const CHANNEL_RETRY_COOLDOWN_MS = timeUnits.minute; +interface SubscriptionHandlerInput { + channel: string; + id: string | undefined; + batched: boolean; + handleBaseData: (data: any, fullMessage: any) => void; + handleUpdates: (updates: any[], fullMessage: any) => void; +} + +type SubscriptionHandlerTrackingMetadata = { + receivedBaseData: boolean; + sentSubMessage: boolean; + // using subs for this data means we lose it when the user fully unsubscribes and thus might actually retry more often than expected + // but this is fine since every consumer has subs wrapped in resource managers that prevent lots of churn + lastRetryBecauseErrorMs: number | undefined; + lastRetryBecauseDuplicateMs: number | undefined; +}; +type SubscriptionHandler = SubscriptionHandlerInput & SubscriptionHandlerTrackingMetadata; + +type SubscriptionMap = { + [channel: string]: { + [idOrSpecialString: string]: SubscriptionHandler; + }; +}; + +class SubscriptionManager { + private subscriptions: SubscriptionMap = {}; + + hasSubscription(channel: string, id?: string): boolean { + const normalizedId = id ?? NO_ID_SPECIAL_STRING_ID; + return Boolean(this.subscriptions[channel]?.[normalizedId]); + } + + getSubscription(channel: string, id?: string): SubscriptionHandler | undefined { + const normalizedId = id ?? NO_ID_SPECIAL_STRING_ID; + if (!this.hasSubscription(channel, normalizedId)) { + return undefined; + } + return this.subscriptions[channel]?.[normalizedId]; + } + + addSubscription(handler: SubscriptionHandler): boolean { + const normalizedId = handler.id ?? NO_ID_SPECIAL_STRING_ID; + const channel = handler.channel; + + if (this.hasSubscription(channel, normalizedId)) { + return false; + } + + this.subscriptions[channel] ??= {}; + this.subscriptions[channel][normalizedId] = handler; + return true; + } + + removeSubscription(channel: string, id: string | undefined): boolean { + const normalizedId = id ?? NO_ID_SPECIAL_STRING_ID; + if (!this.hasSubscription(channel, normalizedId)) { + return false; + } + delete this.subscriptions[channel]![normalizedId]; + return true; + } + + getAllSubscriptions(): SubscriptionHandler[] { + return Object.values(this.subscriptions) + .flatMap((channelSubs) => Object.values(channelSubs)) + .filter(isTruthy); + } + + getChannelSubscriptions(channel: string): SubscriptionHandler[] { + return Object.values(this.subscriptions[channel] ?? {}); + } +} + export class IndexerWebsocket { private socket: ReconnectingWebSocket | null = null; - private subscriptions: { - [channel: string]: { - [id: string]: { - channel: string; - id: string | undefined; - batched: boolean; - handleBaseData: (data: any, fullMessage: any) => void; - handleUpdates: (updates: any[], fullMessage: any) => void; - sentSubMessage: boolean; - }; - }; - } = {}; + // for logging purposes, to differentiate when user has many tabs open + private indexerWsId = crypto.randomUUID(); - private lastRetryTimeMsByChannelAndId: { [channelAndId: string]: number } = {}; + private subscriptions = new SubscriptionManager(); constructor(url: string) { this.socket = new ReconnectingWebSocket({ @@ -54,13 +117,7 @@ export class IndexerWebsocket { batched = true, handleUpdates, handleBaseData, - }: { - channel: string; - id: string | undefined; - batched?: boolean; - handleBaseData: (data: any, fullMessage: any) => void; - handleUpdates: (data: any[], fullMessage: any) => void; - }): () => void { + }: SubscriptionHandlerInput): () => void { this._addSub({ channel, id, batched, handleUpdates, handleBaseData }); return () => { this._performUnsub({ channel, id }); @@ -73,34 +130,42 @@ export class IndexerWebsocket { batched = true, handleUpdates, handleBaseData, - }: { - channel: string; - id: string | undefined; - batched?: boolean; - handleBaseData: (data: any, fullMessage: any) => void; - handleUpdates: (data: any[], fullMessage: any) => void; - }) => { - this.subscriptions[channel] ??= {}; - if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] != null) { - logAbacusTsError('IndexerWebsocket', 'this subscription already exists', `${channel}/${id}`); - throw new Error(`IndexerWebsocket error: this subscription already exists. ${channel}/${id}`); - } - logAbacusTsInfo('IndexerWebsocket', 'adding subscription', { - channel, - id, - socketNonNull: this.socket != null, - socketActive: this.socket?.isActive(), - }); - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] = { + + // below here is any metadata we want to allow maintaining between resubscribes + lastRetryBecauseErrorMs = undefined, + lastRetryBecauseDuplicateMs = undefined, + }: SubscriptionHandlerInput & Partial) => { + const wasSuccessful = this.subscriptions.addSubscription({ channel, id, batched, handleBaseData, handleUpdates, sentSubMessage: false, - }; + receivedBaseData: false, + lastRetryBecauseErrorMs, + lastRetryBecauseDuplicateMs, + }); + + // fails if already exists + if (!wasSuccessful) { + logAbacusTsError('IndexerWebsocket', 'this subscription already exists', { + id: `${channel}/${id}`, + wsId: this.indexerWsId, + }); + return; + } + + logAbacusTsInfo('IndexerWebsocket', 'adding subscription', { + channel, + id, + socketNonNull: this.socket != null, + socketActive: Boolean(this.socket?.isActive()), + wsId: this.indexerWsId, + }); + if (this.socket != null && this.socket.isActive()) { - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.sentSubMessage = true; + this.subscriptions.getSubscription(channel, id)!.sentSubMessage = true; this.socket.send({ batched, channel, @@ -110,46 +175,47 @@ export class IndexerWebsocket { } }; - private _performUnsub = ({ channel, id }: { channel: string; id: string | undefined }) => { - if (this.subscriptions[channel] == null) { + private _performUnsub = ( + { channel, id }: { channel: string; id: string | undefined }, + // if true, don't send the unsub message, just remove from the internal data structure + shouldSuppressWsMessage: boolean = false + ) => { + const sub = this.subscriptions.getSubscription(channel, id); + const wasSuccessful = this.subscriptions.removeSubscription(channel, id); + + // only if doesn't exist + if (!wasSuccessful || sub == null) { logAbacusTsError( 'IndexerWebsocket', 'unsubbing from nonexistent or already unsubbed channel', - channel + { channel, id, wsId: this.indexerWsId } ); return; } - if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] == null) { - logAbacusTsError( - 'IndexerWebsocket', - 'unsubbing from nonexistent or already unsubbed channel', - channel, - id - ); + + if (shouldSuppressWsMessage) { return; } + logAbacusTsInfo('IndexerWebsocket', 'removing subscription', { channel, id, socketNonNull: this.socket != null, - socketActive: this.socket?.isActive(), + socketActive: Boolean(this.socket?.isActive()), + wsId: this.indexerWsId, }); - if ( - this.socket != null && - this.socket.isActive() && - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.sentSubMessage - ) { + + if (this.socket != null && this.socket.isActive() && sub.sentSubMessage) { this.socket.send({ channel, id, type: 'unsubscribe', }); } - delete this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]; }; - private _refreshSub = (channel: string, id: string) => { - const sub = this.subscriptions[channel]?.[id]; + private _refreshSub = (channel: string, id: string | undefined) => { + const sub = this.subscriptions.getSubscription(channel, id); if (sub == null) { return; } @@ -157,31 +223,100 @@ export class IndexerWebsocket { this._addSub(sub); }; - // if we get a "could not fetch data" error, we retry once as long as this channel+id is not on cooldown - // TODO: remove this entirely when backend is more reliable - private _handleErrorReceived = (message: IndexerWebsocketErrorMessage) => { + private _maybeRetryTimeoutError = (message: IndexerWebsocketErrorMessage): boolean => { if (message.message.startsWith('Internal error, could not fetch data for subscription: ')) { const maybeChannel = message.channel; const maybeId = message.id; - if (maybeChannel != null && maybeId != null && maybeChannel.startsWith('v4_')) { - const channelAndId = `${maybeChannel}${CHANNEL_ID_SAFE_DIVIDER}${maybeId}`; - const lastRefresh = this.lastRetryTimeMsByChannelAndId[channelAndId] ?? 0; - if (Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) { - this.lastRetryTimeMsByChannelAndId[channelAndId] = Date.now(); + if ( + maybeChannel != null && + maybeChannel.startsWith('v4_') && + this.subscriptions.hasSubscription(maybeChannel, maybeId) + ) { + const sub = this.subscriptions.getSubscription(maybeChannel, maybeId)!; + const lastRefresh = sub.lastRetryBecauseErrorMs ?? 0; + const hasBaseData = sub.receivedBaseData; + if (!hasBaseData && Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) { + sub.lastRetryBecauseErrorMs = Date.now(); this._refreshSub(maybeChannel, maybeId); - logAbacusTsInfo( - 'IndexerWebsocket', - 'error fetching data for channel, refetching', + logAbacusTsInfo('IndexerWebsocket', 'error fetching subscription, refetching', { maybeChannel, - maybeId - ); - return; + maybeId, + socketNonNull: this.socket != null, + socketActive: Boolean(this.socket?.isActive()), + wsId: this.indexerWsId, + }); + return true; + } + logAbacusTsError('IndexerWebsocket', 'error fetching subscription, not retrying:', { + maybeChannel, + maybeId, + hasBaseData, + elapsedSinceLast: Date.now() - lastRefresh, + wsId: this.indexerWsId, + }); + return true; + } + } + return false; + }; + + private _maybeFixDuplicateSubError = (message: IndexerWebsocketErrorMessage): boolean => { + if (message.message.startsWith('Invalid subscribe message: already subscribed (')) { + // temp until backend adds metadata + const parsedMatches = message.message.match(/\(([\w_]+)-(.+?)\)/); + const maybeChannel = parsedMatches?.[1]; + + let maybeId = parsedMatches?.[2]; + if (maybeId === maybeChannel) { + maybeId = undefined; + } + + if ( + maybeChannel != null && + maybeChannel.startsWith('v4_') && + this.subscriptions.hasSubscription(maybeChannel, maybeId) + ) { + const sub = this.subscriptions.getSubscription(maybeChannel, maybeId)!; + const lastRefresh = sub.lastRetryBecauseDuplicateMs ?? 0; + const hasBaseData = sub.receivedBaseData; + if (!hasBaseData && Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) { + sub.lastRetryBecauseDuplicateMs = Date.now(); + this._refreshSub(maybeChannel, maybeId); + logAbacusTsInfo('IndexerWebsocket', 'error: subscription already exists, refetching', { + maybeChannel, + maybeId, + socketNonNull: this.socket != null, + socketActive: Boolean(this.socket?.isActive()), + wsId: this.indexerWsId, + }); + return true; } - logAbacusTsError('IndexerWebsocket', 'hit max retries for channel:', maybeChannel, maybeId); - return; + logAbacusTsError('IndexerWebsocket', 'error: subscription already exists, not retrying', { + maybeChannel, + maybeId, + hasBaseData, + elapsedSinceLast: Date.now() - lastRefresh, + wsId: this.indexerWsId, + }); + return true; } } - logAbacusTsError('IndexerWebsocket', 'encountered server side error:', message); + return false; + }; + + // if we get a "could not fetch data" error, we retry once as long as this channel+id is not on cooldown + // TODO: remove this entirely when backend is more reliable + private _handleErrorReceived = (message: IndexerWebsocketErrorMessage) => { + let handled = this._maybeRetryTimeoutError(message); + // this ensures this isn't called if we already retried and tracks if we have handled it yet + handled = handled || this._maybeFixDuplicateSubError(message); + if (handled) { + return; + } + logAbacusTsError('IndexerWebsocket', 'encountered server side error:', { + message, + wsId: this.indexerWsId, + }); }; private _handleMessage = (messagePre: any) => { @@ -195,6 +330,7 @@ export class IndexerWebsocket { logAbacusTsInfo('IndexerWebsocket', `unsubscribe confirmed`, { channel: message.channel, id: message.id, + wsId: this.indexerWsId, }); } else if ( message.type === 'subscribed' || @@ -205,27 +341,16 @@ export class IndexerWebsocket { const channel = message.channel; const id = message.id; - if (this.subscriptions[channel] == null) { - // hide error for channel we expect to see it on - if (channel !== 'v4_orderbook') { - logAbacusTsError( - 'IndexerWebsocket', - 'encountered message with unknown target', - channel, - id - ); - } - return; - } - if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] == null) { + const sub = this.subscriptions.getSubscription(channel, id); + if (!this.subscriptions.hasSubscription(channel, id) || sub == null) { // hide error for channel we expect to see it on if (channel !== 'v4_orderbook') { - logAbacusTsError( - 'IndexerWebsocket', - 'encountered message with unknown target', + logAbacusTsInfo('IndexerWebsocket', 'encountered message with unknown target', { channel, - id - ); + id, + type: message.type, + wsId: this.indexerWsId, + }); } return; } @@ -233,22 +358,39 @@ export class IndexerWebsocket { logAbacusTsInfo('IndexerWebsocket', `subscription confirmed`, { channel, id, + wsId: this.indexerWsId, }); - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.handleBaseData( - message.contents, - message - ); + sub.receivedBaseData = true; + sub.handleBaseData(message.contents, message); } else if (message.type === 'channel_data') { - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.handleUpdates( - [message.contents], - message - ); + if (!sub.receivedBaseData) { + logAbacusTsError( + 'IndexerWebsocket', + 'message received before subscription confirmed, hiding', + { + channel, + id, + wsId: this.indexerWsId, + } + ); + return; + } + sub.handleUpdates([message.contents], message); // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition } else if (message.type === 'channel_batch_data') { - this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.handleUpdates( - message.contents, - message - ); + if (!sub.receivedBaseData) { + logAbacusTsError( + 'IndexerWebsocket', + 'message received before subscription confirmed, hiding', + { + channel, + id, + wsId: this.indexerWsId, + } + ); + return; + } + sub.handleUpdates(message.contents, message); } else { assertNever(message); } @@ -256,37 +398,34 @@ export class IndexerWebsocket { assertNever(message); } } catch (e) { - logAbacusTsError('IndexerWebsocket', 'Error handling websocket message', messagePre, e); + logAbacusTsError('IndexerWebsocket', 'Error handling websocket message', { + messagePre, + wsId: this.indexerWsId, + error: e, + }); } }; // when websocket churns, reconnect all known subscribers private _handleFreshConnect = () => { logAbacusTsInfo('IndexerWebsocket', 'freshly connected', { + socketUrl: this.socket?.url, + wsId: this.indexerWsId, socketNonNull: this.socket != null, - socketActive: this.socket?.isActive(), - numSubs: Object.values(this.subscriptions) - .flatMap((o) => Object.values(o)) - .filter(isTruthy).length, + socketActive: Boolean(this.socket?.isActive()), + subs: this.subscriptions.getAllSubscriptions().map((o) => `${o.channel}///${o.id}`), }); if (this.socket != null && this.socket.isActive()) { - Object.values(this.subscriptions) - .filter(isTruthy) - .flatMap((o) => Object.values(o)) - .filter(isTruthy) - .forEach(({ batched, channel, id }) => { - this.subscriptions[channel]![id ?? NO_ID_SPECIAL_STRING_ID]!.sentSubMessage = true; - this.socket!.send({ - batched, - channel, - id, - type: 'subscribe', - }); - }); + this.subscriptions.getAllSubscriptions().forEach(({ channel, id }) => { + const sub = this.subscriptions.getSubscription(channel, id)!; + this._performUnsub(sub, true); + this._addSub(sub); + }); } else { logAbacusTsError( 'IndexerWebsocket', - "handle fresh connect called when websocket isn't ready." + "handle fresh connect called when websocket isn't ready.", + { wsId: this.indexerWsId } ); } }; diff --git a/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts b/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts index 05189eb0e..784e05b99 100644 --- a/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts +++ b/src/abacus-ts/websocket/lib/reconnectingWebsocket.ts @@ -1,5 +1,5 @@ /* eslint-disable max-classes-per-file */ -import { logAbacusTsError } from '@/abacus-ts/logs'; +import { logAbacusTsError, logAbacusTsInfo } from '@/abacus-ts/logs'; interface ReconnectingWebSocketConfig { url: string; @@ -11,7 +11,7 @@ interface ReconnectingWebSocketConfig { } export class ReconnectingWebSocket { - private readonly url: string; + public readonly url: string; private readonly handleMessage: (data: any) => void; @@ -154,7 +154,7 @@ class WebSocketConnection { this.ws = new WebSocket(url); this.setupEventHandlers(); } catch (error) { - logAbacusTsError('WebSocketConnection', 'error connecting', error); + logAbacusTsError('WebSocketConnection', 'error connecting', { error }); this.close(); // we don't rethrow because we instead call the handleClose method } @@ -169,7 +169,7 @@ class WebSocketConnection { const data = JSON.parse(event.data); this.handleMessage(this.id, data); } catch (e) { - logAbacusTsError('WebSocketConnection', 'error in handler', e); + logAbacusTsError('WebSocketConnection', 'error in handler', { data: event.data, error: e }); } }; @@ -180,7 +180,7 @@ class WebSocketConnection { try { this.handleConnected(this.id); } catch (e) { - logAbacusTsError('WebSocketConnection', 'error in handleConnected', e); + logAbacusTsError('WebSocketConnection', 'error in handleConnected', { error: e }); } }; @@ -205,6 +205,12 @@ class WebSocketConnection { reason: close.reason, clean: close.wasClean, }); + } else { + logAbacusTsInfo('WebSocketConnection', `socket ${this.id} closed`, { + code: close.code, + reason: close.reason, + clean: close.wasClean, + }); } this.close(); }; @@ -218,7 +224,7 @@ class WebSocketConnection { this.ws?.close(); this.ws = null; } catch (e) { - logAbacusTsError('WebSocketConnection', 'error closing socket', e); + logAbacusTsError('WebSocketConnection', 'error closing socket', { error: e }); } } @@ -239,7 +245,7 @@ class WebSocketConnection { const message = typeof data === 'string' ? data : JSON.stringify(data); this.ws!.send(message); } catch (e) { - logAbacusTsError('WebSocketConnection', 'error sending data', e, data); + logAbacusTsError('WebSocketConnection', 'error sending data', { error: e, data }); } } } diff --git a/src/abacus-ts/websocket/lib/websocketDerivedValue.ts b/src/abacus-ts/websocket/lib/websocketDerivedValue.ts index 53912a015..8dceb2592 100644 --- a/src/abacus-ts/websocket/lib/websocketDerivedValue.ts +++ b/src/abacus-ts/websocket/lib/websocketDerivedValue.ts @@ -25,6 +25,7 @@ export class WebsocketDerivedValue { this.unsubFromWs = websocket.addChannelSubscription({ channel: sub.channel, id: sub.id, + batched: true, handleBaseData: (data, fullMessage) => this._setValue(sub.handleBaseData(data, this.value, fullMessage)), handleUpdates: (updates, fullMessage) => diff --git a/src/abacus-ts/websocket/orderbook.ts b/src/abacus-ts/websocket/orderbook.ts index be5068198..e15bbf346 100644 --- a/src/abacus-ts/websocket/orderbook.ts +++ b/src/abacus-ts/websocket/orderbook.ts @@ -45,7 +45,9 @@ function orderbookWebsocketValueCreator( const updates = isWsOrderbookUpdateResponses(baseUpdates); let startingValue = value.data; if (startingValue == null) { - logAbacusTsError('OrderbookTracker', 'found unexpectedly null base data in update'); + logAbacusTsError('OrderbookTracker', 'found unexpectedly null base data in update', { + marketId, + }); return value; } startingValue = { asks: { ...startingValue.asks }, bids: { ...startingValue.bids } }; diff --git a/src/abacus-ts/websocket/parentSubaccount.ts b/src/abacus-ts/websocket/parentSubaccount.ts index abe54f743..851279c86 100644 --- a/src/abacus-ts/websocket/parentSubaccount.ts +++ b/src/abacus-ts/websocket/parentSubaccount.ts @@ -5,7 +5,6 @@ import { IndexerAssetPositionResponseObject, IndexerOrderResponseObject, IndexerPerpetualPositionResponseObject, - IndexerSubaccountResponseObject, } from '@/types/indexer/indexerApiGen'; import { isWsParentSubaccountSubscribed, @@ -23,49 +22,18 @@ import { MustBigNumber } from '@/lib/numbers'; import { accountRefreshSignal } from '../accountRefreshSignal'; import { createStoreEffect } from '../lib/createStoreEffect'; import { Loadable, loadableIdle, loadableLoaded, loadablePending } from '../lib/loadable'; +import { + convertToStoredChildSubaccount, + freshChildSubaccount, + isValidSubaccount, +} from '../lib/subaccountUtils'; import { logAbacusTsError } from '../logs'; import { selectParentSubaccountInfo, selectWebsocketUrl } from '../socketSelectors'; -import { ChildSubaccountData, ParentSubaccountData } from '../types/rawTypes'; +import { ParentSubaccountData } from '../types/rawTypes'; import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers'; import { IndexerWebsocket } from './lib/indexerWebsocket'; import { WebsocketDerivedValue } from './lib/websocketDerivedValue'; -function isValidSubaccount(childSubaccount: IndexerSubaccountResponseObject) { - return ( - Object.keys(childSubaccount.assetPositions).length > 0 || - Object.keys(childSubaccount.openPerpetualPositions).length > 0 - ); -} - -function convertToStoredChildSubaccount({ - address, - subaccountNumber, - assetPositions, - openPerpetualPositions, -}: IndexerSubaccountResponseObject): ChildSubaccountData { - return { - address, - subaccountNumber, - assetPositions, - openPerpetualPositions, - }; -} - -function freshChildSubaccount({ - address, - subaccountNumber, -}: { - address: string; - subaccountNumber: number; -}): ChildSubaccountData { - return { - address, - subaccountNumber, - assetPositions: {}, - openPerpetualPositions: {}, - }; -} - interface AccountValueArgsBase { address: string; parentSubaccountNumber: string; @@ -120,7 +88,8 @@ function accountWebsocketValueCreator( if (value.data == null) { logAbacusTsError( 'ParentSubaccountTracker', - 'found unexpectedly null base data in update' + 'found unexpectedly null base data in update', + { address, subaccountNumber } ); return value; } @@ -140,13 +109,13 @@ function accountWebsocketValueCreator( const assetPositions = returnValue.childSubaccounts[positionUpdate.subaccountNumber]!.assetPositions; - if (assetPositions[positionUpdate.assetId] == null) { - assetPositions[positionUpdate.assetId] = + if (assetPositions[positionUpdate.symbol] == null) { + assetPositions[positionUpdate.symbol] = positionUpdate as IndexerAssetPositionResponseObject; } else { - assetPositions[positionUpdate.assetId] = { + assetPositions[positionUpdate.symbol] = { ...(assetPositions[ - positionUpdate.assetId + positionUpdate.symbol ] as IndexerAssetPositionResponseObject), ...positionUpdate, }; diff --git a/src/abacus-ts/websocket/trades.ts b/src/abacus-ts/websocket/trades.ts index b423a48ca..04de9d4d5 100644 --- a/src/abacus-ts/websocket/trades.ts +++ b/src/abacus-ts/websocket/trades.ts @@ -3,6 +3,7 @@ import { useEffect, useState } from 'react'; import { orderBy } from 'lodash'; import { isWsTradesResponse, isWsTradesUpdateResponses } from '@/types/indexer/indexerChecks'; +import { IndexerWsTradesUpdateObject } from '@/types/indexer/indexerManual'; import { useAppSelector } from '@/state/appTypes'; import { getCurrentMarketIdIfTradeable } from '@/state/perpetualsSelectors'; @@ -10,8 +11,8 @@ import { getCurrentMarketIdIfTradeable } from '@/state/perpetualsSelectors'; import { mergeById } from '@/lib/mergeById'; import { Loadable, loadableIdle, loadableLoaded, loadablePending } from '../lib/loadable'; +import { logAbacusTsError } from '../logs'; import { selectWebsocketUrl } from '../socketSelectors'; -import { TradesData } from '../types/rawTypes'; import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers'; import { IndexerWebsocket } from './lib/indexerWebsocket'; import { WebsocketDerivedValue } from './lib/websocketDerivedValue'; @@ -22,7 +23,7 @@ function tradesWebsocketValueCreator( websocket: IndexerWebsocket, { marketId }: { marketId: string } ) { - return new WebsocketDerivedValue>( + return new WebsocketDerivedValue>( websocket, { channel: 'v4_trades', @@ -37,8 +38,9 @@ function tradesWebsocketValueCreator( const updates = isWsTradesUpdateResponses(baseUpdates); const startingValue = value.data; if (startingValue == null) { - // eslint-disable-next-line no-console - console.log('MarketsTracker found unexpectedly null base data in update'); + logAbacusTsError('TradesTracker', 'found unexpectedly null base data in update', { + marketId, + }); return value; } const allNewTrades = updates.flatMap((u) => u.trades).toReversed(); @@ -58,7 +60,7 @@ export function useCurrentMarketTradesValue() { const currentMarketId = useAppSelector(getCurrentMarketIdIfTradeable); // useSyncExternalStore is better but the API doesn't fit this use case very well - const [trades, setTrades] = useState>(loadableIdle()); + const [trades, setTrades] = useState>(loadableIdle()); useEffect(() => { if (currentMarketId == null) { diff --git a/src/constants/layout.ts b/src/constants/layout.ts index d1c286f0a..6b0c1222a 100644 --- a/src/constants/layout.ts +++ b/src/constants/layout.ts @@ -3,5 +3,5 @@ export enum TradeLayouts { Reverse = 'Reverse', } -export const HORIZONTAL_PANEL_MIN_HEIGHT = 250; +export const HORIZONTAL_PANEL_MIN_HEIGHT = 200; export const HORIZONTAL_PANEL_MAX_HEIGHT = 700; diff --git a/src/hooks/useInitializePage.ts b/src/hooks/useInitializePage.ts index 6c9a740ff..6b5523315 100644 --- a/src/hooks/useInitializePage.ts +++ b/src/hooks/useInitializePage.ts @@ -1,5 +1,7 @@ import { useEffect, useRef } from 'react'; +// eslint-disable-next-line no-restricted-imports +import { logAbacusTsInfo } from '@/abacus-ts/logs'; // eslint-disable-next-line no-restricted-imports import { IndexerWebsocketManager } from '@/abacus-ts/websocket/lib/indexerWebsocketManager'; @@ -50,6 +52,7 @@ export const useInitializePage = () => { // reconnect abacus (reestablish connections to indexer, validator etc.) if app was hidden for more than 10 seconds abacusStateManager.restart({ network: localStorageNetwork }); IndexerWebsocketManager.getActiveResources().forEach((r) => r.restart()); + logAbacusTsInfo('useInitializePage', 'restarting because visibility change'); } hiddenTimeRef.current = null; } @@ -66,6 +69,7 @@ export const useInitializePage = () => { const handleOnline = () => { abacusStateManager.restart({ network: localStorageNetwork }); IndexerWebsocketManager.getActiveResources().forEach((r) => r.restart()); + logAbacusTsInfo('useInitializePage', 'restarting because network status change'); }; document.addEventListener('online', handleOnline); return () => { diff --git a/src/lib/analytics/datadog.ts b/src/lib/analytics/datadog.ts index f296d2f4f..6931f41d6 100644 --- a/src/lib/analytics/datadog.ts +++ b/src/lib/analytics/datadog.ts @@ -7,6 +7,7 @@ const PROXY_URL = import.meta.env.VITE_DATADOG_PROXY_URL; const SERVICE_NAME = 'v4-web'; const LOGGER_NAME = 'v4-web'; const SITE_NAME = 'datadoghq.com'; +const instanceId = crypto.randomUUID(); const LOG_ENDPOINT_PATH = (PROXY_URL ?? '').endsWith('/') ? 'api/v2/logs' : '/api/v2/logs'; @@ -19,13 +20,15 @@ if (CLIENT_TOKEN) { sessionSampleRate: 100, env: CURRENT_MODE, proxy: PROXY_URL ? `${PROXY_URL}${LOG_ENDPOINT_PATH}` : undefined, + sendLogsAfterSessionExpiration: true, }); } datadogLogs.createLogger(LOGGER_NAME); -const datadogLogger = datadogLogs.getLogger(LOGGER_NAME)!!; +const datadogLogger = datadogLogs.getLogger(LOGGER_NAME)!; datadogLogger.setContextProperty('dd-client-token', CLIENT_TOKEN); +datadogLogger.setContextProperty('instance-id', instanceId); /** * TODO: make a logger wrapper that enables us also log to the console diff --git a/src/lib/tradingView/dydxfeed/index.ts b/src/lib/tradingView/dydxfeed/index.ts index 4db1c05f0..a8e75c80b 100644 --- a/src/lib/tradingView/dydxfeed/index.ts +++ b/src/lib/tradingView/dydxfeed/index.ts @@ -200,6 +200,8 @@ export const getDydxDatafeed = ( onResetCacheNeededCallback: Function ) => { subscribeOnStream({ + store, + orderbookCandlesToggleOn, symbolInfo, resolution, onRealtimeCallback: onTick, diff --git a/src/lib/tradingView/dydxfeed/streaming.ts b/src/lib/tradingView/dydxfeed/streaming.ts index ae830fea2..cd5415cf0 100644 --- a/src/lib/tradingView/dydxfeed/streaming.ts +++ b/src/lib/tradingView/dydxfeed/streaming.ts @@ -6,6 +6,8 @@ import type { import { RESOLUTION_MAP } from '@/constants/candles'; +import { type RootStore } from '@/state/_store'; + import abacusStateManager from '@/lib/abacus'; import { subscriptionsByChannelId } from './cache'; @@ -16,6 +18,8 @@ export const subscribeOnStream = ({ onRealtimeCallback, listenerGuid, }: { + store: RootStore; + orderbookCandlesToggleOn: boolean; symbolInfo: LibrarySymbolInfo; resolution: ResolutionString; onRealtimeCallback: SubscribeBarsCallback; diff --git a/src/types/indexer/indexerChecks.ts b/src/types/indexer/indexerChecks.ts index 7c646d69e..7ac377105 100644 --- a/src/types/indexer/indexerChecks.ts +++ b/src/types/indexer/indexerChecks.ts @@ -9,6 +9,8 @@ import { import { IndexerCompositeFillResponse, IndexerCompositeOrderObject, + IndexerWsCandleResponse, + IndexerWsCandleResponseObject, IndexerWsMarketUpdateResponse, IndexerWsOrderbookUpdateResponse, IndexerWsParentSubaccountSubscribedResponse, @@ -28,6 +30,8 @@ export const isWsOrderbookUpdateResponses = typia.createAssert(); export const isWsTradesResponse = typia.createAssert(); export const isWsTradesUpdateResponses = typia.createAssert(); +export const isWsCandlesResponse = typia.createAssert(); +export const isWsCandlesUpdateResponse = typia.createAssert(); export const isParentSubaccountFillResponse = typia.createAssert(); export const isParentSubaccountOrders = typia.createAssert(); export const isParentSubaccountTransferResponse = diff --git a/src/types/indexer/indexerManual.ts b/src/types/indexer/indexerManual.ts index de91a38d8..2fc356fa9 100644 --- a/src/types/indexer/indexerManual.ts +++ b/src/types/indexer/indexerManual.ts @@ -4,6 +4,7 @@ import { IndexerAPIOrderStatus, IndexerAPITimeInForce, IndexerAssetPositionResponseObject, + IndexerCandleResponseObject, IndexerFillType, IndexerHistoricalBlockTradingReward, IndexerIsoString, @@ -141,7 +142,7 @@ export interface IndexerWsParentSubaccountSubscribedResponse { export type IndexerWsAssetUpdate = Partial & { subaccountNumber: number; - assetId: string; + symbol: string; }; export type IndexerWsPositionUpdate = Partial & { subaccountNumber: number; @@ -164,6 +165,13 @@ export interface IndexerWsParentSubaccountUpdateObject { transfers?: IndexerTransferCommonResponseObject; } +// hacking around backend types not quite matching what the websocket sends +export type IndexerWsTradeResponseObject = PartialBy; export interface IndexerWsTradesUpdateObject { - trades: PartialBy[]; + trades: IndexerWsTradeResponseObject[]; +} + +export type IndexerWsCandleResponseObject = Omit; +export interface IndexerWsCandleResponse { + candles: Array; } diff --git a/src/views/dialogs/DepositDialog2/queries.ts b/src/views/dialogs/DepositDialog2/queries.ts index 2390707a8..e0e295121 100644 --- a/src/views/dialogs/DepositDialog2/queries.ts +++ b/src/views/dialogs/DepositDialog2/queries.ts @@ -160,7 +160,7 @@ export function useDepositDeltas({ depositAmount }: { depositAmount: string }) { ); const modifiedParentSubaccount = useParameterizedSelector( - BonsaiHelpers.forms.deposit.parentSubaccountSummary, + BonsaiHelpers.forms.deposit.createSelectParentSubaccountSummary, depositInput ); diff --git a/src/views/dialogs/WithdrawDialog2/queries.ts b/src/views/dialogs/WithdrawDialog2/queries.ts index 76e969026..01642df9e 100644 --- a/src/views/dialogs/WithdrawDialog2/queries.ts +++ b/src/views/dialogs/WithdrawDialog2/queries.ts @@ -68,7 +68,7 @@ export function useWithdrawalDeltas({ withdrawAmount }: { withdrawAmount: string ); const modifiedParentSubaccount = useParameterizedSelector( - BonsaiHelpers.forms.withdraw.parentSubaccountSummary, + BonsaiHelpers.forms.withdraw.createSelectParentSubaccountSummary, withdrawInput );