Skip to content

Commit

Permalink
feat(bonsai-core): candles (#1433)
Browse files Browse the repository at this point in the history
  • Loading branch information
tyleroooo authored Jan 15, 2025
1 parent 5218d74 commit 143c5ab
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 21 deletions.
8 changes: 6 additions & 2 deletions src/abacus-ts/lib/createStoreEffect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ export function createStoreEffect<T>(
lastValue = thisValue;
// NOTE: some cleanups dispatch actions which cause this to happen recursively.
// we must ensure that those actions don't change the state they subscribe to or this will go infinitely
lastCleanup?.();
const lastCleanupCached = lastCleanup;
lastCleanup = undefined;
lastCleanupCached?.();
lastCleanup = handleChange(thisValue);
}
});

return () => {
lastCleanup?.();
const lastCleanupCached = lastCleanup;
lastCleanup = undefined;
lastCleanupCached?.();
removeStoreListener();
};
}
8 changes: 0 additions & 8 deletions src/abacus-ts/types/rawTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
IndexerAssetPositionResponseObject,
IndexerHistoricalBlockTradingReward,
IndexerPerpetualPositionResponseObject,
IndexerTradeResponseObject,
} from '@/types/indexer/indexerApiGen';
import {
IndexerCompositeFillObject,
Expand All @@ -12,8 +11,6 @@ import {
IndexerWsBaseMarketObject,
} from '@/types/indexer/indexerManual';

import { PartialBy } from '@/lib/typeUtils';

export type MarketsData = { [marketId: string]: IndexerWsBaseMarketObject };
export type OrdersData = { [orderId: string]: IndexerCompositeOrderObject };

Expand All @@ -22,11 +19,6 @@ export type OrderbookData = {
asks: { [price: string]: string };
};

export type BaseTrade = PartialBy<IndexerTradeResponseObject, 'createdAtHeight'>;
export type TradesData = {
trades: Array<BaseTrade>;
};

export interface ParentSubaccountData {
address: string;
parentSubaccount: number;
Expand Down
11 changes: 6 additions & 5 deletions src/abacus-ts/types/summaryTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import {
IndexerOrderType,
IndexerPerpetualPositionResponseObject,
} from '@/types/indexer/indexerApiGen';
import { IndexerWsBaseMarketObject } from '@/types/indexer/indexerManual';

import { BaseTrade } from './rawTypes';
import {
IndexerWsBaseMarketObject,
IndexerWsTradeResponseObject,
} from '@/types/indexer/indexerManual';

type ReplaceBigNumberInUnion<T> = T extends string ? BigNumber : T;

Expand Down Expand Up @@ -147,6 +148,8 @@ export type SubaccountOrder = {
marginMode: MarginMode | undefined;
};

export type LiveTrade = IndexerWsTradeResponseObject;

export type PendingIsolatedPosition = {
marketId: string;
displayId: string;
Expand All @@ -155,5 +158,3 @@ export type PendingIsolatedPosition = {
equity: BigNumber;
orders: SubaccountOrder[];
};

export type LiveTrade = BaseTrade;
51 changes: 51 additions & 0 deletions src/abacus-ts/websocket/candles.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { orderBy } from 'lodash';

import { isWsCandlesResponse, isWsCandlesUpdateResponse } from '@/types/indexer/indexerChecks';
import { IndexerWsCandleResponse } from '@/types/indexer/indexerManual';

import { Loadable, loadableLoaded, loadablePending } from '../lib/loadable';
import { logAbacusTsError } from '../logs';
import { makeWsValueManager } from './lib/indexerValueManagerHelpers';
import { IndexerWebsocket } from './lib/indexerWebsocket';
import { WebsocketDerivedValue } from './lib/websocketDerivedValue';

function candlesWebsocketValueCreator(
websocket: IndexerWebsocket,
{ marketIdAndResolution }: { marketIdAndResolution: string }
) {
return new WebsocketDerivedValue<Loadable<IndexerWsCandleResponse>>(
websocket,
{
channel: 'v4_candles',
id: marketIdAndResolution,
handleBaseData: (baseMessage) => {
const message = isWsCandlesResponse(baseMessage);
return loadableLoaded({
candles: orderBy(message.candles, [(a) => a.startedAt], ['asc']),
});
},
handleUpdates: (baseUpdates, value) => {
const updates = isWsCandlesUpdateResponse(baseUpdates);
const startingValue = value.data;
if (startingValue == null) {
logAbacusTsError('CandlesTracker', 'found unexpectedly null base data in update');
return value;
}
if (startingValue.candles.length === 0) {
return loadableLoaded({ candles: updates });
}

const allNewTimes = new Set(updates.map(({ startedAt }) => startedAt));
const newArr = [
...updates,
...startingValue.candles.filter(({ startedAt }) => !allNewTimes.has(startedAt)),
];
const sorted = orderBy(newArr, [(a) => a.startedAt], ['asc']);
return loadableLoaded({ candles: sorted });
},
},
loadablePending()
);
}

export const CandlesValuesManager = makeWsValueManager(candlesWebsocketValueCreator);
100 changes: 100 additions & 0 deletions src/abacus-ts/websocket/candlesForTradingView.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { createStoreEffect } from '@/abacus-ts/lib/createStoreEffect';
import { selectWebsocketUrl } from '@/abacus-ts/socketSelectors';
import { CandlesValuesManager } from '@/abacus-ts/websocket/candles';
import { subscribeToWsValue } from '@/abacus-ts/websocket/lib/indexerValueManagerHelpers';
import type {
LibrarySymbolInfo,
ResolutionString,
SubscribeBarsCallback,
} from 'public/tradingview/charting_library';

import { CandleResolution, RESOLUTION_MAP } from '@/constants/candles';

import { type RootStore } from '@/state/_store';

import { mapCandle } from '../../lib/tradingView/utils';

export const subscriptionsByGuid: {
[guid: string]:
| {
guid: string;
unsub: () => void;
}
| undefined;
} = {};

export const subscribeOnStream = ({
store,
orderbookCandlesToggleOn,
symbolInfo,
resolution,
onRealtimeCallback,
listenerGuid,
onResetCacheNeededCallback,
}: {
store: RootStore;
orderbookCandlesToggleOn: boolean;
symbolInfo: LibrarySymbolInfo;
resolution: ResolutionString;
onRealtimeCallback: SubscribeBarsCallback;
listenerGuid: string;
onResetCacheNeededCallback: Function;
}) => {
if (!symbolInfo.ticker) return;

const channelId = `${symbolInfo.ticker}/${RESOLUTION_MAP[resolution]}`;
let isFirstRun = true;

const tearDown = createStoreEffect(store, selectWebsocketUrl, (wsUrl) => {
// if the websocket url changes, force a refresh
if (isFirstRun) {
isFirstRun = false;
} else {
setTimeout(() => onResetCacheNeededCallback(), 0);
}

let mostRecentFirstPointStartedAt: string | undefined;
const unsub = subscribeToWsValue(
CandlesValuesManager,
{ wsUrl, marketIdAndResolution: channelId },
({ data }) => {
if (data == null || data.candles.length === 0) {
return;
}
// if we've never seen data before, it's either the existing data or the subscribed message
// either way, we take it as the basis and only send further updates
// there is a small race condition where messages could be missed between
// when trandingview does the rest query and when we start receiving updates
if (mostRecentFirstPointStartedAt == null) {
mostRecentFirstPointStartedAt = data.candles.at(-1)?.startedAt;
return;
}
data.candles.forEach((candle) => {
if (candle.startedAt >= mostRecentFirstPointStartedAt!) {
onRealtimeCallback(
mapCandle(orderbookCandlesToggleOn)({
...candle,
resolution: candle.resolution as unknown as CandleResolution,
orderbookMidPriceClose: candle.orderbookMidPriceClose ?? undefined,
orderbookMidPriceOpen: candle.orderbookMidPriceOpen ?? undefined,
})
);
}
});
mostRecentFirstPointStartedAt = data.candles.at(-1)?.startedAt;
}
);

// happens on network change or unsubscribe from stream
return () => {
unsub();
};
});

subscriptionsByGuid[listenerGuid] = { guid: listenerGuid, unsub: tearDown };
};

export const unsubscribeFromStream = (subscriberUID: string) => {
subscriptionsByGuid[subscriberUID]?.unsub();
subscriptionsByGuid[subscriberUID] = undefined;
};
12 changes: 7 additions & 5 deletions src/abacus-ts/websocket/trades.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ import { useEffect, useState } from 'react';
import { orderBy } from 'lodash';

import { isWsTradesResponse, isWsTradesUpdateResponses } from '@/types/indexer/indexerChecks';
import { IndexerWsTradesUpdateObject } from '@/types/indexer/indexerManual';

import { useAppSelector } from '@/state/appTypes';
import { getCurrentMarketIdIfTradeable } from '@/state/perpetualsSelectors';

import { mergeById } from '@/lib/mergeById';

import { Loadable, loadableIdle, loadableLoaded, loadablePending } from '../lib/loadable';
import { logAbacusTsError } from '../logs';
import { selectWebsocketUrl } from '../socketSelectors';
import { TradesData } from '../types/rawTypes';
import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers';
import { IndexerWebsocket } from './lib/indexerWebsocket';
import { WebsocketDerivedValue } from './lib/websocketDerivedValue';
Expand All @@ -22,7 +23,7 @@ function tradesWebsocketValueCreator(
websocket: IndexerWebsocket,
{ marketId }: { marketId: string }
) {
return new WebsocketDerivedValue<Loadable<TradesData>>(
return new WebsocketDerivedValue<Loadable<IndexerWsTradesUpdateObject>>(
websocket,
{
channel: 'v4_trades',
Expand All @@ -37,8 +38,9 @@ function tradesWebsocketValueCreator(
const updates = isWsTradesUpdateResponses(baseUpdates);
const startingValue = value.data;
if (startingValue == null) {
// eslint-disable-next-line no-console
console.log('MarketsTracker found unexpectedly null base data in update', { marketId });
logAbacusTsError('TradesTracker', 'found unexpectedly null base data in update', {
marketId,
});
return value;
}
const allNewTrades = updates.flatMap((u) => u.trades).toReversed();
Expand All @@ -58,7 +60,7 @@ export function useCurrentMarketTradesValue() {
const currentMarketId = useAppSelector(getCurrentMarketIdIfTradeable);

// useSyncExternalStore is better but the API doesn't fit this use case very well
const [trades, setTrades] = useState<Loadable<TradesData>>(loadableIdle());
const [trades, setTrades] = useState<Loadable<IndexerWsTradesUpdateObject>>(loadableIdle());

useEffect(() => {
if (currentMarketId == null) {
Expand Down
2 changes: 2 additions & 0 deletions src/lib/tradingView/dydxfeed/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ export const getDydxDatafeed = (
onResetCacheNeededCallback: Function
) => {
subscribeOnStream({
store,
orderbookCandlesToggleOn,
symbolInfo,
resolution,
onRealtimeCallback: onTick,
Expand Down
4 changes: 4 additions & 0 deletions src/lib/tradingView/dydxfeed/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import type {

import { RESOLUTION_MAP } from '@/constants/candles';

import { type RootStore } from '@/state/_store';

import abacusStateManager from '@/lib/abacus';

import { subscriptionsByChannelId } from './cache';
Expand All @@ -16,6 +18,8 @@ export const subscribeOnStream = ({
onRealtimeCallback,
listenerGuid,
}: {
store: RootStore;
orderbookCandlesToggleOn: boolean;
symbolInfo: LibrarySymbolInfo;
resolution: ResolutionString;
onRealtimeCallback: SubscribeBarsCallback;
Expand Down
4 changes: 4 additions & 0 deletions src/types/indexer/indexerChecks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import {
import {
IndexerCompositeFillResponse,
IndexerCompositeOrderObject,
IndexerWsCandleResponse,
IndexerWsCandleResponseObject,
IndexerWsMarketUpdateResponse,
IndexerWsOrderbookUpdateResponse,
IndexerWsParentSubaccountSubscribedResponse,
Expand All @@ -28,6 +30,8 @@ export const isWsOrderbookUpdateResponses =
typia.createAssert<IndexerWsOrderbookUpdateResponse[]>();
export const isWsTradesResponse = typia.createAssert<IndexerTradeResponse>();
export const isWsTradesUpdateResponses = typia.createAssert<IndexerWsTradesUpdateObject[]>();
export const isWsCandlesResponse = typia.createAssert<IndexerWsCandleResponse>();
export const isWsCandlesUpdateResponse = typia.createAssert<IndexerWsCandleResponseObject[]>();
export const isParentSubaccountFillResponse = typia.createAssert<IndexerCompositeFillResponse>();
export const isParentSubaccountOrders = typia.createAssert<IndexerCompositeOrderObject[]>();
export const isParentSubaccountTransferResponse =
Expand Down
10 changes: 9 additions & 1 deletion src/types/indexer/indexerManual.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
IndexerAPIOrderStatus,
IndexerAPITimeInForce,
IndexerAssetPositionResponseObject,
IndexerCandleResponseObject,
IndexerFillType,
IndexerHistoricalBlockTradingReward,
IndexerIsoString,
Expand Down Expand Up @@ -164,6 +165,13 @@ export interface IndexerWsParentSubaccountUpdateObject {
transfers?: IndexerTransferCommonResponseObject;
}

// hacking around backend types not quite matching what the websocket sends
export type IndexerWsTradeResponseObject = PartialBy<IndexerTradeResponseObject, 'createdAtHeight'>;
export interface IndexerWsTradesUpdateObject {
trades: PartialBy<IndexerTradeResponseObject, 'createdAtHeight'>[];
trades: IndexerWsTradeResponseObject[];
}

export type IndexerWsCandleResponseObject = Omit<IndexerCandleResponseObject, 'id'>;
export interface IndexerWsCandleResponse {
candles: Array<IndexerWsCandleResponseObject>;
}

0 comments on commit 143c5ab

Please sign in to comment.