Skip to content

Commit

Permalink
fix: spokepool processor batch queries (#73)
Browse files Browse the repository at this point in the history
Co-authored-by: Alexandru Matei <[email protected]>
  • Loading branch information
amateima and alexandrumatei36 authored Oct 15, 2024
1 parent 5732d63 commit 106430c
Showing 1 changed file with 33 additions and 25 deletions.
58 changes: 33 additions & 25 deletions packages/indexer/src/services/spokePoolProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { utils } from "@across-protocol/sdk";
import { DataSource, entities } from "@repo/indexer-database";
import winston from "winston";
import { RelayStatus } from "../../../indexer-database/dist/src/entities";
Expand All @@ -9,6 +10,8 @@ enum SpokePoolEvents {
}

export class SpokePoolProcessor {
private queryBatchSize = 100;

constructor(
private readonly postgres: DataSource,
private readonly logger: winston.Logger,
Expand Down Expand Up @@ -61,35 +64,40 @@ export class SpokePoolProcessor {
[SpokePoolEvents.FilledV3Relay]: "fillEventId",
[SpokePoolEvents.RequestedV3SlowFill]: "requestSlowFillEventId",
};

const updatedRelays = await relayHashInfoRepository.upsert(
events.map((event) => {
const eventField = eventTypeToField[eventType];
return {
relayHash: event.relayHash,
depositId: event.depositId,
originChainId: event.originChainId,
destinationChainId: event.destinationChainId,
fillDeadline: event.fillDeadline,
[eventField]: event.id,
...(eventType === SpokePoolEvents.V3FundsDeposited && {
depositTxHash: event.transactionHash,
}),
...(eventType === SpokePoolEvents.FilledV3Relay && {
status: RelayStatus.Filled,
fillTxHash: event.transactionHash,
}),
...(eventType === SpokePoolEvents.RequestedV3SlowFill && {
status: RelayStatus.SlowFillRequested,
}),
};
}),
["relayHash"],
const data = events.map((event) => {
const eventField = eventTypeToField[eventType];
return {
relayHash: event.relayHash,
depositId: event.depositId,
originChainId: event.originChainId,
destinationChainId: event.destinationChainId,
fillDeadline: event.fillDeadline,
[eventField]: event.id,
...(eventType === SpokePoolEvents.V3FundsDeposited && {
depositTxHash: event.transactionHash,
}),
...(eventType === SpokePoolEvents.FilledV3Relay && {
status: RelayStatus.Filled,
fillTxHash: event.transactionHash,
}),
...(eventType === SpokePoolEvents.RequestedV3SlowFill && {
status: RelayStatus.SlowFillRequested,
}),
};
});
const chunkedData = utils.chunk(data, this.queryBatchSize);
const upsertResult = await Promise.all(
chunkedData.map((chunk) =>
relayHashInfoRepository.upsert(chunk, ["relayHash"]),
),
);
this.logger.info({
at: "SpokePoolProcessor#assignSpokeEventsToRelayHashInfo",
message: `${eventType} events associated with RelayHashInfo`,
updatedRelayHashInfoRows: updatedRelays.generatedMaps.length,
updatedRelayHashInfoRows: upsertResult.reduce(
(acc, res) => acc + res.generatedMaps.length,
0,
),
});
}

Expand Down

0 comments on commit 106430c

Please sign in to comment.