Skip to content

Commit

Permalink
Merge branch 'main' into jv/subaccount-operations-2
Browse files Browse the repository at this point in the history
  • Loading branch information
jaredvu committed Jan 14, 2025
2 parents c389a28 + 025de60 commit 61d9204
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 33 deletions.
6 changes: 5 additions & 1 deletion src/abacus-ts/logs.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { log } from '@/lib/telemetry';
import { log, logInfo } from '@/lib/telemetry';

export function logAbacusTsError(source: string, message: string, ...args: any[]) {
log(`bonsai: ${source}: ${message}`, undefined, { context: args });
}

export function logAbacusTsInfo(source: string, message: string, ...args: any[]) {
logInfo(`bonsai: ${source}: ${message}`, { context: args });
}
88 changes: 62 additions & 26 deletions src/abacus-ts/websocket/lib/indexerWebsocket.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { logAbacusTsError } from '@/abacus-ts/logs';
import { logAbacusTsError, logAbacusTsInfo } from '@/abacus-ts/logs';
import typia from 'typia';

import { timeUnits } from '@/constants/time';
Expand All @@ -9,6 +9,7 @@ import { isTruthy } from '@/lib/isTruthy';
import { ReconnectingWebSocket } from './reconnectingWebsocket';

const NO_ID_SPECIAL_STRING_ID = '______EMPTY_ID______';
const CHANNEL_ID_SAFE_DIVIDER = '//////////';
const CHANNEL_RETRY_COOLDOWN_MS = timeUnits.minute;

export class IndexerWebsocket {
Expand All @@ -27,7 +28,7 @@ export class IndexerWebsocket {
};
} = {};

private lastRetryTimeMsByChannel: { [channel: string]: number } = {};
private lastRetryTimeMsByChannelAndId: { [channelAndId: string]: number } = {};

constructor(url: string) {
this.socket = new ReconnectingWebSocket({
Expand Down Expand Up @@ -84,6 +85,12 @@ export class IndexerWebsocket {
logAbacusTsError('IndexerWebsocket', 'this subscription already exists', `${channel}/${id}`);
throw new Error(`IndexerWebsocket error: this subscription already exists. ${channel}/${id}`);
}
logAbacusTsInfo('IndexerWebsocket', 'adding subscription', {
channel,
id,
socketNonNull: this.socket != null,
socketActive: this.socket?.isActive(),
});
this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] = {
channel,
id,
Expand Down Expand Up @@ -121,6 +128,12 @@ export class IndexerWebsocket {
);
return;
}
logAbacusTsInfo('IndexerWebsocket', 'removing subscription', {
channel,
id,
socketNonNull: this.socket != null,
socketActive: this.socket?.isActive(),
});
if (
this.socket != null &&
this.socket.isActive() &&
Expand All @@ -135,36 +148,36 @@ export class IndexerWebsocket {
delete this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID];
};

private _refreshChannelSubs = (channel: string) => {
const allSubs = Object.values(this.subscriptions[channel] ?? {});
allSubs.forEach((sub) => {
this._performUnsub(sub);
this._addSub(sub);
});
private _refreshSub = (channel: string, id: string) => {
const sub = this.subscriptions[channel]?.[id];
if (sub == null) {
return;
}
this._performUnsub(sub);
this._addSub(sub);
};

// if we get a "could not fetch data" error, we retry once as long as this channel is not on cooldown
// TODO: when backend adds the channel and id to the error message, use that to retry only one subscription
// if we get a "could not fetch data" error, we retry once as long as this channel+id is not on cooldown
// TODO: remove this entirely when backend is more reliable
private _handleErrorReceived = (message: string) => {
if (message.startsWith('Internal error, could not fetch data for subscription: ')) {
const maybeChannel = message
.trim()
.split(/[\s,.]/)
.at(-2);
if (maybeChannel != null && maybeChannel.startsWith('v4_')) {
const lastRefresh = this.lastRetryTimeMsByChannel[maybeChannel] ?? 0;
private _handleErrorReceived = (message: IndexerWebsocketErrorMessage) => {
if (message.message.startsWith('Internal error, could not fetch data for subscription: ')) {
const maybeChannel = message.channel;
const maybeId = message.id;
if (maybeChannel != null && maybeId != null && maybeChannel.startsWith('v4_')) {
const channelAndId = `${maybeChannel}${CHANNEL_ID_SAFE_DIVIDER}${maybeId}`;
const lastRefresh = this.lastRetryTimeMsByChannelAndId[channelAndId] ?? 0;
if (Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) {
this.lastRetryTimeMsByChannel[maybeChannel] = Date.now();
this._refreshChannelSubs(maybeChannel);
logAbacusTsError(
this.lastRetryTimeMsByChannelAndId[channelAndId] = Date.now();
this._refreshSub(maybeChannel, maybeId);
logAbacusTsInfo(
'IndexerWebsocket',
'error fetching data for channel, refetching',
maybeChannel
maybeChannel,
maybeId
);
return;
}
logAbacusTsError('IndexerWebsocket', 'hit max retries for channel:', maybeChannel);
logAbacusTsError('IndexerWebsocket', 'hit max retries for channel:', maybeChannel, maybeId);
return;
}
}
Expand All @@ -175,9 +188,14 @@ export class IndexerWebsocket {
try {
const message = isWsMessage(messagePre);
if (message.type === 'error') {
this._handleErrorReceived(message.message);
} else if (message.type === 'connected' || message.type === 'unsubscribed') {
this._handleErrorReceived(message);
} else if (message.type === 'connected') {
// do nothing
} else if (message.type === 'unsubscribed') {
logAbacusTsInfo('IndexerWebsocket', `unsubscribe confirmed`, {
channel: message.channel,
id: message.id,
});
} else if (
message.type === 'subscribed' ||
message.type === 'channel_batch_data' ||
Expand All @@ -186,6 +204,7 @@ export class IndexerWebsocket {
) {
const channel = message.channel;
const id = message.id;

if (this.subscriptions[channel] == null) {
// hide error for channel we expect to see it on
if (channel !== 'v4_orderbook') {
Expand All @@ -211,6 +230,10 @@ export class IndexerWebsocket {
return;
}
if (message.type === 'subscribed') {
logAbacusTsInfo('IndexerWebsocket', `subscription confirmed`, {
channel,
id,
});
this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.handleBaseData(
message.contents,
message
Expand Down Expand Up @@ -239,6 +262,13 @@ export class IndexerWebsocket {

// when websocket churns, reconnect all known subscribers
private _handleFreshConnect = () => {
logAbacusTsInfo('IndexerWebsocket', 'freshly connected', {
socketNonNull: this.socket != null,
socketActive: this.socket?.isActive(),
numSubs: Object.values(this.subscriptions)
.flatMap((o) => Object.values(o))
.filter(isTruthy).length,
});
if (this.socket != null && this.socket.isActive()) {
Object.values(this.subscriptions)
.filter(isTruthy)
Expand All @@ -262,8 +292,14 @@ export class IndexerWebsocket {
};
}

type IndexerWebsocketErrorMessage = {
type: 'error';
message: string;
channel?: string;
id?: string;
};
type IndexerWebsocketMessageType =
| { type: 'error'; message: string }
| IndexerWebsocketErrorMessage
| { type: 'connected' }
| {
type: 'channel_batch_data';
Expand Down
16 changes: 10 additions & 6 deletions src/hooks/useDydxClient.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ export const DydxProvider = ({ ...props }) => (

export const useDydxClient = () => useContext(DydxContext);

const DEFAULT_PAGE_SIZE_TARGET = 1000;
// parallel requests should be limited to prevent hitting 429 errors and failing the whole operation
const DEFAULT_MAX_REQUESTS = 20;

const useDydxClientContext = () => {
// ------ Network ------ //

Expand Down Expand Up @@ -256,7 +260,7 @@ const useDydxClientContext = () => {
subaccountNumber,
undefined,
undefined,
100,
DEFAULT_PAGE_SIZE_TARGET,
undefined,
undefined,
1
Expand All @@ -268,7 +272,7 @@ const useDydxClientContext = () => {
length: Math.ceil(totalResults / pageSize) - 1,
},
(_, index) => index + 2
);
).slice(0, DEFAULT_MAX_REQUESTS);

const results = await Promise.all(
pages.map((page) =>
Expand All @@ -277,7 +281,7 @@ const useDydxClientContext = () => {
subaccountNumber,
undefined,
undefined,
100,
pageSize,
undefined,
undefined,
page
Expand Down Expand Up @@ -306,7 +310,7 @@ const useDydxClientContext = () => {
} = await indexerClient.account.getParentSubaccountNumberTransfers(
address,
subaccountNumber,
100,
DEFAULT_PAGE_SIZE_TARGET,
undefined,
undefined,
1
Expand All @@ -318,14 +322,14 @@ const useDydxClientContext = () => {
length: Math.ceil(totalResults / pageSize) - 1,
},
(_, index) => index + 2
);
).slice(0, DEFAULT_MAX_REQUESTS);

const results = await Promise.all(
pages.map((page) =>
indexerClient.account.getParentSubaccountNumberTransfers(
address,
subaccountNumber,
100,
pageSize,
undefined,
undefined,
page
Expand Down
9 changes: 9 additions & 0 deletions src/lib/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ export const log = (location: string, error?: Error, metadata?: object) => {
globalThis.dispatchEvent(customEvent);
};

export const logInfo = (location: string, metadata?: object) => {
if (isDev) {
// eslint-disable-next-line no-console
console.log('telemetry/logInfo:', { location, metadata });
}

dd.info(`[Info] ${location}`, metadata);
};

// Log rejected Promises without a .catch() handler
globalThis.addEventListener('unhandledrejection', (event) => {
event.preventDefault();
Expand Down

0 comments on commit 61d9204

Please sign in to comment.