Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bonsai-core): candles #1433

Merged
merged 7 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/abacus-ts/lib/createStoreEffect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ export function createStoreEffect<T>(
lastValue = thisValue;
// NOTE: some cleanups dispatch actions which cause this to happen recursively.
// we must ensure that those actions don't change the state they subscribe to or this will go infinitely
lastCleanup?.();
const lastCleanupCached = lastCleanup;
lastCleanup = undefined;
lastCleanupCached?.();
lastCleanup = handleChange(thisValue);
}
});

return () => {
lastCleanup?.();
const lastCleanupCached = lastCleanup;
lastCleanup = undefined;
lastCleanupCached?.();
removeStoreListener();
};
}
8 changes: 0 additions & 8 deletions src/abacus-ts/types/rawTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
IndexerAssetPositionResponseObject,
IndexerHistoricalBlockTradingReward,
IndexerPerpetualPositionResponseObject,
IndexerTradeResponseObject,
} from '@/types/indexer/indexerApiGen';
import {
IndexerCompositeFillObject,
Expand All @@ -12,8 +11,6 @@ import {
IndexerWsBaseMarketObject,
} from '@/types/indexer/indexerManual';

import { PartialBy } from '@/lib/typeUtils';

export type MarketsData = { [marketId: string]: IndexerWsBaseMarketObject };
export type OrdersData = { [orderId: string]: IndexerCompositeOrderObject };

Expand All @@ -22,11 +19,6 @@ export type OrderbookData = {
asks: { [price: string]: string };
};

export type BaseTrade = PartialBy<IndexerTradeResponseObject, 'createdAtHeight'>;
export type TradesData = {
trades: Array<BaseTrade>;
};

export interface ParentSubaccountData {
address: string;
parentSubaccount: number;
Expand Down
9 changes: 5 additions & 4 deletions src/abacus-ts/types/summaryTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import {
IndexerOrderType,
IndexerPerpetualPositionResponseObject,
} from '@/types/indexer/indexerApiGen';
import { IndexerWsBaseMarketObject } from '@/types/indexer/indexerManual';

import { BaseTrade } from './rawTypes';
import {
IndexerWsBaseMarketObject,
IndexerWsTradeResponseObject,
} from '@/types/indexer/indexerManual';

type ReplaceBigNumberInUnion<T> = T extends string ? BigNumber : T;

Expand Down Expand Up @@ -147,4 +148,4 @@ export type SubaccountOrder = {
marginMode: MarginMode | undefined;
};

export type LiveTrade = BaseTrade;
export type LiveTrade = IndexerWsTradeResponseObject;
51 changes: 51 additions & 0 deletions src/abacus-ts/websocket/candles.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { orderBy } from 'lodash';

import { isWsCandlesResponse, isWsCandlesUpdateResponse } from '@/types/indexer/indexerChecks';
import { IndexerWsCandleResponse } from '@/types/indexer/indexerManual';

import { Loadable, loadableLoaded, loadablePending } from '../lib/loadable';
import { logAbacusTsError } from '../logs';
import { makeWsValueManager } from './lib/indexerValueManagerHelpers';
import { IndexerWebsocket } from './lib/indexerWebsocket';
import { WebsocketDerivedValue } from './lib/websocketDerivedValue';

function candlesWebsocketValueCreator(
websocket: IndexerWebsocket,
{ marketIdAndResolution }: { marketIdAndResolution: string }
) {
return new WebsocketDerivedValue<Loadable<IndexerWsCandleResponse>>(
websocket,
{
channel: 'v4_candles',
id: marketIdAndResolution,
handleBaseData: (baseMessage) => {
const message = isWsCandlesResponse(baseMessage);
return loadableLoaded({
candles: orderBy(message.candles, [(a) => a.startedAt], ['asc']),
});
},
handleUpdates: (baseUpdates, value) => {
const updates = isWsCandlesUpdateResponse(baseUpdates);
const startingValue = value.data;
if (startingValue == null) {
logAbacusTsError('CandlesTracker', 'found unexpectedly null base data in update');
return value;
}
if (startingValue.candles.length === 0) {
return loadableLoaded({ candles: updates });
}

const allNewTimes = new Set(updates.map(({ startedAt }) => startedAt));
const newArr = [
...updates,
...startingValue.candles.filter(({ startedAt }) => !allNewTimes.has(startedAt)),
];
const sorted = orderBy(newArr, [(a) => a.startedAt], ['asc']);
return loadableLoaded({ candles: sorted });
},
},
loadablePending()
);
}

export const CandlesValuesManager = makeWsValueManager(candlesWebsocketValueCreator);
100 changes: 100 additions & 0 deletions src/abacus-ts/websocket/candlesForTradingView.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { createStoreEffect } from '@/abacus-ts/lib/createStoreEffect';
import { selectWebsocketUrl } from '@/abacus-ts/socketSelectors';
import { CandlesValuesManager } from '@/abacus-ts/websocket/candles';
import { subscribeToWsValue } from '@/abacus-ts/websocket/lib/indexerValueManagerHelpers';
import type {
LibrarySymbolInfo,
ResolutionString,
SubscribeBarsCallback,
} from 'public/tradingview/charting_library';

import { CandleResolution, RESOLUTION_MAP } from '@/constants/candles';

import { type RootStore } from '@/state/_store';

import { mapCandle } from '../../lib/tradingView/utils';

export const subscriptionsByGuid: {
[guid: string]:
| {
guid: string;
unsub: () => void;
}
| undefined;
} = {};

export const subscribeOnStream = ({
store,
orderbookCandlesToggleOn,
symbolInfo,
resolution,
onRealtimeCallback,
listenerGuid,
onResetCacheNeededCallback,
}: {
store: RootStore;
orderbookCandlesToggleOn: boolean;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this value trigger an immediate refresh of the current candle?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a statsig feature flag now so can't change during runtime (I hope).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by gawd you're right. i remember we used to expose a toggle

symbolInfo: LibrarySymbolInfo;
resolution: ResolutionString;
onRealtimeCallback: SubscribeBarsCallback;
listenerGuid: string;
onResetCacheNeededCallback: Function;
}) => {
if (!symbolInfo.ticker) return;

const channelId = `${symbolInfo.ticker}/${RESOLUTION_MAP[resolution]}`;
let isFirstRun = true;

const tearDown = createStoreEffect(store, selectWebsocketUrl, (wsUrl) => {
// if the websocket url changes, force a refresh
if (isFirstRun) {
isFirstRun = false;
} else {
setTimeout(() => onResetCacheNeededCallback(), 0);
}

let mostRecentFirstPointStartedAt: string | undefined;
const unsub = subscribeToWsValue(
CandlesValuesManager,
{ wsUrl, marketIdAndResolution: channelId },
({ data }) => {
if (data == null || data.candles.length === 0) {
return;
}
// if we've never seen data before, it's either the existing data or the subscribed message
// either way, we take it as the basis and only send further updates
// there is a small race condition where messages could be missed between
// when trandingview does the rest query and when we start receiving updates
if (mostRecentFirstPointStartedAt == null) {
mostRecentFirstPointStartedAt = data.candles.at(-1)?.startedAt;
return;
}
data.candles.forEach((candle) => {
if (candle.startedAt >= mostRecentFirstPointStartedAt!) {
onRealtimeCallback(
mapCandle(orderbookCandlesToggleOn)({
...candle,
resolution: candle.resolution as unknown as CandleResolution,
orderbookMidPriceClose: candle.orderbookMidPriceClose ?? undefined,
orderbookMidPriceOpen: candle.orderbookMidPriceOpen ?? undefined,
})
);
}
});
mostRecentFirstPointStartedAt = data.candles.at(-1)?.startedAt;
}
);

// happens on network change or unsubscribe from stream
return () => {
unsub();
};
});

subscriptionsByGuid[listenerGuid] = { guid: listenerGuid, unsub: tearDown };
};

export const unsubscribeFromStream = (subscriberUID: string) => {
subscriptionsByGuid[subscriberUID]?.unsub();
subscriptionsByGuid[subscriberUID] = undefined;
};
10 changes: 5 additions & 5 deletions src/abacus-ts/websocket/trades.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ import { useEffect, useState } from 'react';
import { orderBy } from 'lodash';

import { isWsTradesResponse, isWsTradesUpdateResponses } from '@/types/indexer/indexerChecks';
import { IndexerWsTradesUpdateObject } from '@/types/indexer/indexerManual';

import { useAppSelector } from '@/state/appTypes';
import { getCurrentMarketIdIfTradeable } from '@/state/perpetualsSelectors';

import { mergeById } from '@/lib/mergeById';

import { Loadable, loadableIdle, loadableLoaded, loadablePending } from '../lib/loadable';
import { logAbacusTsError } from '../logs';
import { selectWebsocketUrl } from '../socketSelectors';
import { TradesData } from '../types/rawTypes';
import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers';
import { IndexerWebsocket } from './lib/indexerWebsocket';
import { WebsocketDerivedValue } from './lib/websocketDerivedValue';
Expand All @@ -22,7 +23,7 @@ function tradesWebsocketValueCreator(
websocket: IndexerWebsocket,
{ marketId }: { marketId: string }
) {
return new WebsocketDerivedValue<Loadable<TradesData>>(
return new WebsocketDerivedValue<Loadable<IndexerWsTradesUpdateObject>>(
websocket,
{
channel: 'v4_trades',
Expand All @@ -37,8 +38,7 @@ function tradesWebsocketValueCreator(
const updates = isWsTradesUpdateResponses(baseUpdates);
const startingValue = value.data;
if (startingValue == null) {
// eslint-disable-next-line no-console
console.log('MarketsTracker found unexpectedly null base data in update');
logAbacusTsError('TradesTracker', 'found unexpectedly null base data in update');
return value;
}
const allNewTrades = updates.flatMap((u) => u.trades).toReversed();
Expand All @@ -58,7 +58,7 @@ export function useCurrentMarketTradesValue() {
const currentMarketId = useAppSelector(getCurrentMarketIdIfTradeable);

// useSyncExternalStore is better but the API doesn't fit this use case very well
const [trades, setTrades] = useState<Loadable<TradesData>>(loadableIdle());
const [trades, setTrades] = useState<Loadable<IndexerWsTradesUpdateObject>>(loadableIdle());

useEffect(() => {
if (currentMarketId == null) {
Expand Down
2 changes: 2 additions & 0 deletions src/lib/tradingView/dydxfeed/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ export const getDydxDatafeed = (
onResetCacheNeededCallback: Function
) => {
subscribeOnStream({
store,
orderbookCandlesToggleOn,
symbolInfo,
resolution,
onRealtimeCallback: onTick,
Expand Down
4 changes: 4 additions & 0 deletions src/lib/tradingView/dydxfeed/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import type {

import { RESOLUTION_MAP } from '@/constants/candles';

import { type RootStore } from '@/state/_store';

import abacusStateManager from '@/lib/abacus';

import { subscriptionsByChannelId } from './cache';
Expand All @@ -16,6 +18,8 @@ export const subscribeOnStream = ({
onRealtimeCallback,
listenerGuid,
}: {
store: RootStore;
orderbookCandlesToggleOn: boolean;
symbolInfo: LibrarySymbolInfo;
resolution: ResolutionString;
onRealtimeCallback: SubscribeBarsCallback;
Expand Down
4 changes: 4 additions & 0 deletions src/types/indexer/indexerChecks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import {
import {
IndexerCompositeFillResponse,
IndexerCompositeOrderObject,
IndexerWsCandleResponse,
IndexerWsCandleResponseObject,
IndexerWsMarketUpdateResponse,
IndexerWsOrderbookUpdateResponse,
IndexerWsParentSubaccountSubscribedResponse,
Expand All @@ -28,6 +30,8 @@ export const isWsOrderbookUpdateResponses =
typia.createAssert<IndexerWsOrderbookUpdateResponse[]>();
export const isWsTradesResponse = typia.createAssert<IndexerTradeResponse>();
export const isWsTradesUpdateResponses = typia.createAssert<IndexerWsTradesUpdateObject[]>();
export const isWsCandlesResponse = typia.createAssert<IndexerWsCandleResponse>();
export const isWsCandlesUpdateResponse = typia.createAssert<IndexerWsCandleResponseObject[]>();
export const isParentSubaccountFillResponse = typia.createAssert<IndexerCompositeFillResponse>();
export const isParentSubaccountOrders = typia.createAssert<IndexerCompositeOrderObject[]>();
export const isParentSubaccountTransferResponse =
Expand Down
10 changes: 9 additions & 1 deletion src/types/indexer/indexerManual.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
IndexerAPIOrderStatus,
IndexerAPITimeInForce,
IndexerAssetPositionResponseObject,
IndexerCandleResponseObject,
IndexerFillType,
IndexerHistoricalBlockTradingReward,
IndexerIsoString,
Expand Down Expand Up @@ -164,6 +165,13 @@ export interface IndexerWsParentSubaccountUpdateObject {
transfers?: IndexerTransferCommonResponseObject;
}

// hacking around backend types not quite matching what the websocket sends
export type IndexerWsTradeResponseObject = PartialBy<IndexerTradeResponseObject, 'createdAtHeight'>;
export interface IndexerWsTradesUpdateObject {
trades: PartialBy<IndexerTradeResponseObject, 'createdAtHeight'>[];
trades: IndexerWsTradeResponseObject[];
}

export type IndexerWsCandleResponseObject = Omit<IndexerCandleResponseObject, 'id'>;
export interface IndexerWsCandleResponse {
candles: Array<IndexerWsCandleResponseObject>;
}
Loading