
fix: replace bulk upserts with inserts/updates if needed #103

Conversation

@amateima (Contributor) commented Nov 15, 2024

  • A new BlockchainEventRepository, intended to be extended by any repository class dealing with event entities.
  • BlockchainEventRepository.saveAndHandleFinalisation() takes event data and performs a save operation driven by the passed uniqueKeys and comparisonKeys. The result of the save can be: (1) Nothing, if the data is identical to what's stored in the database; (2) Inserted, if the unique keys are not present in the DB; (3) Finalised, if the finalised field was the only one that changed; (4) Updated, if any of the data fields changed; (5) UpdatedAndFinalised, if both the finalised field and other fields changed.
  • filterSaveQueryResults takes a list of SaveQueryResult[] and filters the data by a SaveQueryResultType, so, for example, we can know which events were inserted into the DB for the first time.
  • SpokePoolProcessor functions were updated to perform operations only when necessary (mostly on Inserted and Updated entities).
  • TODO comments mark where webhooks should be triggered.
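The save-result classification described above can be sketched as pure decision logic. This is a hypothetical reconstruction for illustration, not the PR's actual implementation; only the names SaveQueryResultType, SaveQueryResult, and filterSaveQueryResults come from the description, and classifySave is an invented helper:

```typescript
// Hypothetical sketch of the save-result classification described above.
// classifySave is an invented name; the real logic lives in the repo.
enum SaveQueryResultType {
  Nothing = "nothing",
  Inserted = "inserted",
  Finalised = "finalised",
  Updated = "updated",
  UpdatedAndFinalised = "updatedAndFinalised",
}

type Row = Record<string, unknown> & { finalised?: boolean };

interface SaveQueryResult<T extends Row> {
  data: T;
  result: SaveQueryResultType;
}

// Decide what kind of save happened by comparing the incoming event data
// with the row currently stored in the database (undefined = no row yet).
function classifySave<T extends Row>(
  incoming: T,
  stored: T | undefined,
  comparisonKeys: (keyof T)[],
): SaveQueryResultType {
  if (!stored) return SaveQueryResultType.Inserted;
  const changed = comparisonKeys.some((k) => incoming[k] !== stored[k]);
  const finalisedChanged = incoming.finalised !== stored.finalised;
  if (changed && finalisedChanged) return SaveQueryResultType.UpdatedAndFinalised;
  if (changed) return SaveQueryResultType.Updated;
  if (finalisedChanged) return SaveQueryResultType.Finalised;
  return SaveQueryResultType.Nothing;
}

// Keep only the entities whose save produced a given result type.
function filterSaveQueryResults<T extends Row>(
  results: SaveQueryResult<T>[],
  type: SaveQueryResultType,
): T[] {
  return results.filter((r) => r.result === type).map((r) => r.data);
}
```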

linear bot commented Nov 15, 2024: Across Indexer (15)

@amateima amateima force-pushed the amatei/acx-3323-replace-bulk-upserts-with-updates-only-if-necessary branch from d4c340c to c0c0348 Compare November 19, 2024 12:18
@daywiss (Contributor) left a comment

looks good, mainly just questions!

);
const dbEntity = await this.postgres
.getRepository(entity)
.findOne({ where });
Contributor:

Why search by a list of unique keys? Isn't there a way to identify this with a single id?

@amateima (author):

Most of the events are identifiable by a pair of fields. For example, in the case of a re-org, the only constants in a deposit event are the depositId and originChainId fields, so you need to query by these two fields.
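The lookup described above can be sketched as building a `where` filter from the unique keys. pickWhere is a hypothetical helper (not from the PR); the depositId/originChainId pair follows the deposit example in the comment:

```typescript
// Build a findOne-style `where` filter from the event's unique keys.
// pickWhere is a hypothetical helper; key names follow the deposit example.
function pickWhere<T extends Record<string, unknown>>(
  data: T,
  uniqueKeys: (keyof T)[],
): Partial<T> {
  return uniqueKeys.reduce((where, key) => {
    where[key] = data[key];
    return where;
  }, {} as Partial<T>);
}

// For a deposit event, query by the fields that survive a re-org:
const deposit = { depositId: 42, originChainId: 1, amount: "100", blockNumber: 123 };
const where = pickWhere(deposit, ["depositId", "originChainId"]);
```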

}

// Check if the any of values of the comparison keys have changed
const isChanged = comparisonKeys.some((key) => data[key] !== dbEntity[key]);
Contributor:

How do you decide which props to compare?

@amateima (author):

This logic will be specific to each event. Basically, for each event you need to identify the fields that stay unique in case of re-orgs and the fields that may change and need to be compared.
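One way to capture this per-event choice is a small config map. The shape below is purely illustrative: only the deposit uniqueKeys pair comes from the comments above, and the other field names are example placeholders, not the repo's real key sets:

```typescript
// Illustrative per-event key configuration: which fields identify a row
// across re-orgs (uniqueKeys) and which fields are compared for changes
// (comparisonKeys). Field names are examples, not the full real sets.
interface EventKeyConfig {
  uniqueKeys: string[];
  comparisonKeys: string[];
}

const eventKeys: Record<string, EventKeyConfig> = {
  deposit: {
    uniqueKeys: ["depositId", "originChainId"],
    comparisonKeys: ["transactionHash", "blockNumber", "logIndex"],
  },
  // hypothetical second event type, for shape only:
  fill: {
    uniqueKeys: ["depositId", "destinationChainId"],
    comparisonKeys: ["transactionHash", "blockNumber", "logIndex"],
  },
};
```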

return {
data: (await this.postgres
.getRepository(entity)
.findOne({ where })) as Entity,
Contributor:

Why requery this? Shouldn't it be the same as "data"?

@amateima (author):

I was thinking this function should return the db entity because it contains the generated fields too, such as the incremental id.
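An alternative to the second query, sketched below on the assumption that the earlier findOne result is still in scope: merge the updated fields into the already-fetched row so DB-generated fields (like the incremental id) are preserved without a round trip. mergeUpdated is a hypothetical simplification, not the PR's code:

```typescript
// Hypothetical: avoid a second findOne by merging the updated fields into
// the row fetched earlier. The stored row keeps DB-generated fields (id,
// timestamps, ...) while the incoming data overwrites the mutable fields.
function mergeUpdated<T extends { id?: number }>(stored: T, incoming: Partial<T>): T {
  return { ...stored, ...incoming };
}

const storedRow = { id: 7, depositId: 42, amount: "100", finalised: false };
const merged = mergeUpdated(storedRow, { amount: "100", finalised: true });
// merged keeps the generated id while reflecting the update
```

The trade-off is that merging trusts the in-memory copy; the re-query in the PR also picks up any column defaults or triggers applied by the database itself.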

@melisaguevara (Contributor) left a comment

looks good! I left some questions

.findOne({ where });

if (!dbEntity) {
await this.postgres.getRepository(entity).insert(data);
Contributor:

nit: should we save the repository in a variable and reuse it in the lines where we call this.postgres.getRepository(entity)?

@amateima (author):

Done 8a16b88

};
}

// Check if the any of values of the comparison keys have changed
Contributor:

Suggested change
// Check if the any of values of the comparison keys have changed
// Check if any of the values of the comparison keys have changed

Comment on lines 92 to 94
data: (await this.postgres
.getRepository(entity)
.findOne({ where })) as Entity,
Contributor:

Is there a way to use the return value of update on line 89 instead of querying the entity again?

storedEvents.deposits,
SaveQueryResultType.Inserted,
);
await this.updateNewDepositsWithIntegratorId(newInsertedDeposits);
Contributor:

OOC: Are we ok with always doing this synchronously?

@amateima (author):

I wanted to give it another try and see how much time it adds in production if we do this synchronously.

@@ -42,14 +48,14 @@ export class SpokePoolRepository extends dbUtils.BaseRepository {
finalised: event.blockNumber <= lastFinalisedBlock,
};
});
const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize);
const chunkedEvents = across.utils.chunk(formattedEvents, 100);
Contributor:

Should we keep this.chunkSize here?

@amateima (author):

Done, but I lowered the default value of chunkSize.
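For reference, across.utils.chunk-style batching can be sketched as a standalone function (hypothetical reimplementation; only the default size of 100 is taken from the diff above):

```typescript
// Split an array into fixed-size batches so bulk inserts stay bounded.
// Standalone sketch of across.utils.chunk; the default of 100 matches
// the literal that appeared in the diff above.
function chunk<T>(items: T[], size: number = 100): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}
```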

Comment on lines 156 to +157
["depositId", "originChainId", "transactionHash", "logIndex"],
lastFinalisedBlock,
["transactionHash"],
Contributor:

Should we use a different set for uniqueKeys here to avoid having transactionHash both in uniqueKeys and comparisonKeys?

@amateima (author):

This is intended here because unfortunately there's no way to uniquely identify a speedup event in case of re-orgs :(

@amateima (author):

The outcome of this is that a re-orged event would be inserted as a new one.

amateima and others added 4 commits November 20, 2024 13:08
…if-necessary' of amateima.github.com:across-protocol/indexer into amatei/acx-3323-replace-bulk-upserts-with-updates-only-if-necessary
@amateima amateima merged commit 2217ea3 into master Nov 20, 2024
3 checks passed
4 participants