Skip to content

Commit

Permalink
Check existing record against update window (#787)
Browse files Browse the repository at this point in the history
* Check to see if existing record was updated during integrity check window

* Fetch existing closer to upsert
  • Loading branch information
bryzettler authored Jan 29, 2025
1 parent 342df5d commit 79e6321
Showing 1 changed file with 84 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,10 @@ export const integrityCheckProgramAccounts = async ({
}

const performIntegrityCheck = async () => {
const startTime = new Date();
const t = await sequelize.transaction({
isolationLevel: Transaction.ISOLATION_LEVELS.READ_COMMITTED,
});
const txIdsByAccountId: { [key: string]: string[] } = {};
const corrections: {
type: string;
accountId: string;
txSignatures: string[];
currentValues: null | { [key: string]: any };
newValues: { [key: string]: any };
}[] = [];

try {
const program = new anchor.Program(idl, programId, provider);
Expand All @@ -85,6 +78,15 @@ export const integrityCheckProgramAccounts = async ({
provider,
});

const txIdsByAccountId: { [key: string]: string[] } = {};
const corrections: {
type: string;
accountId: string;
txSignatures: string[];
currentValues: null | { [key: string]: any };
newValues: { [key: string]: any };
}[] = [];

if (!blockTime24HoursAgo) {
throw new Error("Unable to get blocktime from 24 hours ago");
}
Expand Down Expand Up @@ -158,62 +160,90 @@ export const integrityCheckProgramAccounts = async ({
return acc;
}, {} as Record<string, IInitedPlugin[]>);

const discriminatorsByType = new Map(
accounts.map(({ type }) => [
type,
anchor.BorshAccountsCoder.accountDiscriminator(type),
])
);

await Promise.all(
chunks(accountInfosWithPk, 1000).map(async (chunk) => {
for (const c of chunk) {
const accName = accounts.find(({ type }) => {
return (
c.data &&
anchor.BorshAccountsCoder.accountDiscriminator(type).equals(
c.data.subarray(0, 8)
)
);
})?.type;
const accountsByType: Record<string, typeof accountInfosWithPk> = {};
chunk.forEach((accountInfo) => {
const accName = accounts.find(
({ type }) =>
accountInfo.data &&
discriminatorsByType
.get(type)
?.equals(accountInfo.data.subarray(0, 8))
)?.type;

if (!accName) {
continue;
if (accName) {
accountsByType[accName] = accountsByType[accName] || [];
accountsByType[accName].push(accountInfo);
}
});

const decodedAcc = program.coder.accounts.decode(
accName!,
c.data as Buffer
);

if (accName) {
await Promise.all(
Object.entries(accountsByType).map(async ([accName, accounts]) => {
const model = sequelize.models[accName];
const existing = await model.findByPk(c.pubkey, {
transaction: t,
});
await Promise.all(
accounts.map(async (c) => {
const decodedAcc = program.coder.accounts.decode(
accName,
c.data as Buffer
);

let sanitized = {
refreshed_at: new Date().toISOString(),
address: c.pubkey,
...sanitizeAccount(decodedAcc),
};
let sanitized = {
refreshed_at: new Date().toISOString(),
address: c.pubkey,
...sanitizeAccount(decodedAcc),
};

for (const plugin of pluginsByAccountType[accName]) {
if (plugin?.processAccount) {
sanitized = await plugin.processAccount(sanitized);
}
}
if (pluginsByAccountType[accName]?.length > 0) {
const pluginResults = await Promise.all(
pluginsByAccountType[accName].map((plugin) =>
plugin?.processAccount
? plugin.processAccount(sanitized)
: sanitized
)
);
sanitized = pluginResults.reduce(
(acc, curr) => ({ ...acc, ...curr }),
sanitized
);
}

const shouldUpdate = !deepEqual(
_omit(sanitized, OMIT_KEYS),
_omit(existing?.dataValues, OMIT_KEYS)
);
const existing = await model.findByPk(c.pubkey, {
transaction: t,
});

if (shouldUpdate) {
corrections.push({
type: accName,
accountId: c.pubkey,
txSignatures: txIdsByAccountId[c.pubkey],
currentValues: existing ? existing.dataValues : null,
newValues: sanitized,
});
await model.upsert({ ...sanitized }, { transaction: t });
}
}
}
const shouldUpdate =
!deepEqual(
_omit(sanitized, OMIT_KEYS),
_omit(existing?.dataValues, OMIT_KEYS)
) &&
!(
existing?.dataValues.refreshed_at &&
new Date(existing.dataValues.refreshed_at) >= startTime &&
new Date(existing.dataValues.refreshed_at) <= new Date()
);

if (shouldUpdate) {
corrections.push({
type: accName,
accountId: c.pubkey,
txSignatures: txIdsByAccountId[c.pubkey],
currentValues: existing ? existing.dataValues : null,
newValues: sanitized,
});
await model.upsert({ ...sanitized }, { transaction: t });
}
})
);
})
);
})
);

Expand Down

0 comments on commit 79e6321

Please sign in to comment.