Skip to content

Commit

Permalink
Merge pull request #1458 from drift-labs/wphan/user_stats_map_sync
Browse files Browse the repository at this point in the history
sdk: add paginated sync for UserStatsMap
  • Loading branch information
wphan authored Feb 3, 2025
2 parents f60cf2f + 8f0145b commit 3cdf54e
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 1 deletion.
10 changes: 10 additions & 0 deletions sdk/src/userMap/userMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,11 @@ export class UserMap implements UserMapInterface {
}
}

/**
* Syncs the UserMap using the default sync method (single getProgramAccounts call with filters).
* This method may fail when drift has too many users. (nodejs response size limits)
* @returns
*/
private async defaultSync() {
if (this.syncPromise) {
return this.syncPromise;
Expand Down Expand Up @@ -487,6 +492,11 @@ export class UserMap implements UserMapInterface {
}
}

/**
* Syncs the UserMap using the paginated sync method (multiple getMultipleAccounts calls with filters).
* This method is more reliable when drift has many users.
* @returns
*/
private async paginatedSync() {
if (this.syncPromise) {
return this.syncPromise;
Expand Down
149 changes: 148 additions & 1 deletion sdk/src/userMap/userStatsMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import {
InsuranceFundStakeRecord,
BulkAccountLoader,
PollingUserStatsAccountSubscriber,
SyncConfig,
getUserStatsFilter,
} from '..';
import { PublicKey } from '@solana/web3.js';

Expand All @@ -27,14 +29,23 @@ export class UserStatsMap {
private userStatsMap = new Map<string, UserStats>();
private driftClient: DriftClient;
private bulkAccountLoader: BulkAccountLoader;
private decode;
private syncConfig: SyncConfig;

private syncPromise?: Promise<void>;
private syncPromiseResolver: () => void;

/**
* Creates a new UserStatsMap instance.
*
* @param {DriftClient} driftClient - The DriftClient instance.
* @param {BulkAccountLoader} [bulkAccountLoader] - If not provided, a new BulkAccountLoader with polling disabled will be created.
*/
constructor(driftClient: DriftClient, bulkAccountLoader?: BulkAccountLoader) {
constructor(
driftClient: DriftClient,
bulkAccountLoader?: BulkAccountLoader,
syncConfig?: SyncConfig
) {
this.driftClient = driftClient;
if (!bulkAccountLoader) {
bulkAccountLoader = new BulkAccountLoader(
Expand All @@ -44,6 +55,15 @@ export class UserStatsMap {
);
}
this.bulkAccountLoader = bulkAccountLoader;

this.syncConfig = syncConfig ?? {
type: 'default',
};

this.decode =
this.driftClient.program.account.userStats.coder.accounts.decodeUnchecked.bind(
this.driftClient.program.account.userStats.coder.accounts
);
}

public async subscribe(authorities: PublicKey[]) {
Expand Down Expand Up @@ -201,6 +221,19 @@ export class UserStatsMap {
* You may want to get this list from UserMap in order to filter out idle users
*/
public async sync(authorities: PublicKey[]) {
if (this.syncConfig.type === 'default') {
return this.defaultSync(authorities);
} else {
return this.paginatedSync(authorities);
}
}

/**
* Sync the UserStatsMap using the default sync method, which loads individual users into the bulkAccountLoader and
* loads them. (bulkAccountLoader uses batch getMultipleAccounts)
* @param authorities
*/
private async defaultSync(authorities: PublicKey[]) {
await Promise.all(
authorities.map((authority) =>
this.addUserStat(authority, undefined, true)
Expand All @@ -209,6 +242,120 @@ export class UserStatsMap {
await this.bulkAccountLoader.load();
}

/**
* Sync the UserStatsMap using the paginated sync method, which uses multiple getMultipleAccounts calls (without RPC batching), and limits concurrency.
* @param authorities
*/
private async paginatedSync(authorities: PublicKey[]) {
if (this.syncPromise) {
return this.syncPromise;
}

this.syncPromise = new Promise<void>((resolve) => {
this.syncPromiseResolver = resolve;
});

try {
let accountsToLoad = authorities;
if (authorities.length === 0) {
const accountsPrefetch =
await this.driftClient.connection.getProgramAccounts(
this.driftClient.program.programId,
{
dataSlice: { offset: 0, length: 0 },
filters: [getUserStatsFilter()],
}
);
accountsToLoad = accountsPrefetch.map((account) => account.pubkey);
}

const limitConcurrency = async (tasks, limit) => {
const executing = [];
const results = [];

for (let i = 0; i < tasks.length; i++) {
const executor = Promise.resolve().then(tasks[i]);
results.push(executor);

if (executing.length < limit) {
executing.push(executor);
executor.finally(() => {
const index = executing.indexOf(executor);
if (index > -1) {
executing.splice(index, 1);
}
});
} else {
await Promise.race(executing);
}
}

return Promise.all(results);
};

const programAccountBufferMap = new Set<string>();

// @ts-ignore
const chunkSize = this.syncConfig.chunkSize ?? 100;
const tasks = [];
for (let i = 0; i < accountsToLoad.length; i += chunkSize) {
const chunk = accountsToLoad.slice(i, i + chunkSize);
tasks.push(async () => {
const accountInfos =
await this.driftClient.connection.getMultipleAccountsInfoAndContext(
chunk,
{
commitment: this.driftClient.opts.commitment,
}
);

for (let j = 0; j < accountInfos.value.length; j += 1) {
const accountInfo = accountInfos.value[j];
if (accountInfo === null) continue;

const publicKeyString = chunk[j].toString();
if (!this.has(publicKeyString)) {
const buffer = Buffer.from(accountInfo.data);
const decodedUserStats = this.decode(
'UserStats',
buffer
) as UserStatsAccount;
programAccountBufferMap.add(
decodedUserStats.authority.toBase58()
);
this.addUserStat(
decodedUserStats.authority,
decodedUserStats,
false
);
}
}
});
}

// @ts-ignore
const concurrencyLimit = this.syncConfig.concurrencyLimit ?? 10;
await limitConcurrency(tasks, concurrencyLimit);

for (const [key] of this.userStatsMap.entries()) {
if (!programAccountBufferMap.has(key)) {
const user = this.get(key);
if (user) {
await user.unsubscribe();
this.userStatsMap.delete(key);
}
}
}
} catch (err) {
console.error(`Error in UserStatsMap.paginatedSync():`, err);
} finally {
if (this.syncPromiseResolver) {
this.syncPromiseResolver();
}
this.syncPromise = undefined;
}
}

public async unsubscribe() {
for (const [key, userStats] of this.userStatsMap.entries()) {
await userStats.unsubscribe();
Expand Down

0 comments on commit 3cdf54e

Please sign in to comment.