diff --git a/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts b/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts index 72d39fe71..befc99a81 100644 --- a/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts +++ b/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts @@ -63,17 +63,10 @@ export const integrityCheckProgramAccounts = async ({ } const performIntegrityCheck = async () => { + const startTime = new Date(); const t = await sequelize.transaction({ isolationLevel: Transaction.ISOLATION_LEVELS.READ_COMMITTED, }); - const txIdsByAccountId: { [key: string]: string[] } = {}; - const corrections: { - type: string; - accountId: string; - txSignatures: string[]; - currentValues: null | { [key: string]: any }; - newValues: { [key: string]: any }; - }[] = []; try { const program = new anchor.Program(idl, programId, provider); @@ -85,6 +78,15 @@ export const integrityCheckProgramAccounts = async ({ provider, }); + const txIdsByAccountId: { [key: string]: string[] } = {}; + const corrections: { + type: string; + accountId: string; + txSignatures: string[]; + currentValues: null | { [key: string]: any }; + newValues: { [key: string]: any }; + }[] = []; + if (!blockTime24HoursAgo) { throw new Error("Unable to get blocktime from 24 hours ago"); } @@ -158,62 +160,90 @@ export const integrityCheckProgramAccounts = async ({ return acc; }, {} as Record); + const discriminatorsByType = new Map( + accounts.map(({ type }) => [ + type, + anchor.BorshAccountsCoder.accountDiscriminator(type), + ]) + ); + await Promise.all( chunks(accountInfosWithPk, 1000).map(async (chunk) => { - for (const c of chunk) { - const accName = accounts.find(({ type }) => { - return ( - c.data && - anchor.BorshAccountsCoder.accountDiscriminator(type).equals( - c.data.subarray(0, 8) - ) - ); - })?.type; + const accountsByType: Record = {}; + chunk.forEach((accountInfo) => { + const accName = accounts.find( + ({ type }) => + accountInfo.data && + discriminatorsByType + .get(type) + ?.equals(accountInfo.data.subarray(0, 8)) + )?.type; - if (!accName) { - continue; + if (accName) { + accountsByType[accName] = accountsByType[accName] || []; + accountsByType[accName].push(accountInfo); } + }); - const decodedAcc = program.coder.accounts.decode( - accName!, - c.data as Buffer - ); - - if (accName) { + await Promise.all( + Object.entries(accountsByType).map(async ([accName, accounts]) => { const model = sequelize.models[accName]; - const existing = await model.findByPk(c.pubkey, { - transaction: t, - }); + await Promise.all( + accounts.map(async (c) => { + const decodedAcc = program.coder.accounts.decode( + accName, + c.data as Buffer + ); - let sanitized = { - refreshed_at: new Date().toISOString(), - address: c.pubkey, - ...sanitizeAccount(decodedAcc), - }; + let sanitized = { + refreshed_at: new Date().toISOString(), + address: c.pubkey, + ...sanitizeAccount(decodedAcc), + }; - for (const plugin of pluginsByAccountType[accName]) { - if (plugin?.processAccount) { - sanitized = await plugin.processAccount(sanitized); - } - } + if (pluginsByAccountType[accName]?.length > 0) { + const pluginResults = await Promise.all( + pluginsByAccountType[accName].map((plugin) => + plugin?.processAccount + ? plugin.processAccount(sanitized) + : sanitized + ) + ); + sanitized = pluginResults.reduce( + (acc, curr) => ({ ...acc, ...curr }), + sanitized + ); + } - const shouldUpdate = !deepEqual( - _omit(sanitized, OMIT_KEYS), - _omit(existing?.dataValues, OMIT_KEYS) - ); + const existing = await model.findByPk(c.pubkey, { + transaction: t, + }); - if (shouldUpdate) { - corrections.push({ - type: accName, - accountId: c.pubkey, - txSignatures: txIdsByAccountId[c.pubkey], - currentValues: existing ? existing.dataValues : null, - newValues: sanitized, - }); - await model.upsert({ ...sanitized }, { transaction: t }); - } - } - } + const shouldUpdate = + !deepEqual( + _omit(sanitized, OMIT_KEYS), + _omit(existing?.dataValues, OMIT_KEYS) + ) && + !( + existing?.dataValues.refreshed_at && + new Date(existing.dataValues.refreshed_at) >= startTime && + new Date(existing.dataValues.refreshed_at) <= new Date() + ); + + if (shouldUpdate) { + corrections.push({ + type: accName, + accountId: c.pubkey, + txSignatures: txIdsByAccountId[c.pubkey], + currentValues: existing ? existing.dataValues : null, + newValues: sanitized, + }); + await model.upsert({ ...sanitized }, { transaction: t }); + } + }) + ); + }) + ); }) );