Skip to content

Commit

Permalink
feat: improve first sync
Browse files Browse the repository at this point in the history
  • Loading branch information
anxolin committed Jun 18, 2024
1 parent 8a4c044 commit 3e88069
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 57 deletions.
121 changes: 70 additions & 51 deletions src/services/chain.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
Registry,
ReplayPlan,
ConditionalOrderCreatedEvent,
Multicall3,
ComposableCoW,
Expand All @@ -18,6 +17,7 @@ import { addContract } from "../domain/events";
import { checkForAndPlaceOrder } from "../domain/polling";
import { ethers, providers } from "ethers";
import {
LoggerWithMethods,
composableCowContract,
getLogger,
isRunningInKubernetesPod,
Expand Down Expand Up @@ -185,7 +185,6 @@ export class ChainContext {
let currentBlock = await provider.getBlock("latest");

let printSyncInfo = true; // Print sync info only once
let plan: ReplayPlan = {};
let toBlock: "latest" | number = 0;
do {
do {
Expand Down Expand Up @@ -228,13 +227,31 @@ export class ChainContext {
log.debug(`Found ${events.length} events`);
}

// process the events
for (const event of events) {
if (plan[event.blockNumber] === undefined) {
plan[event.blockNumber] = new Set();
// Get relevant block numbers to process (the ones with relevant events)
const eventsByBlock = events.reduce<
Record<number, ConditionalOrderCreatedEvent[]>
>((acc, event) => {
const events = acc[event.blockNumber];
if (events) {
events.push(event);
} else {
acc[event.blockNumber] = [event];
}

plan[event.blockNumber].add(event);
return acc;
}, {});

// Process blocks in order
for (const blockNumberKey of Object.keys(eventsByBlock).sort()) {
const blockNumber = Number(blockNumberKey);
await processBlockAndPersist({
context: this,
blockNumber,
events: eventsByBlock[blockNumber],
currentBlock,
log,
provider,
});
}

// only possible string value for toBlock is 'latest'
Expand All @@ -243,41 +260,9 @@ export class ChainContext {
}
} while (toBlock !== "latest" && toBlock !== currentBlock.number);

// Replay only the blocks that had some events.
for (const [blockNumber, events] of Object.entries(plan)) {
log.debug(`Processing block ${blockNumber}`);
const historicalBlock = await provider.getBlock(Number(blockNumber));
try {
await processBlock(
this,
historicalBlock,
events,
currentBlock.number,
currentBlock.timestamp
);

// Set the last processed block to this iteration's block number
this.registry.lastProcessedBlock =
blockToRegistryBlock(historicalBlock);
await this.registry.write();

// Set the block height metric
metrics.blockHeight
.labels(chainId.toString())
.set(Number(blockNumber));
} catch (err) {
log.error(`Error processing block ${blockNumber}`, err);
}

log.debug(`Block ${blockNumber} has been processed`);
}

// Set the last processed block to the current block number
this.registry.lastProcessedBlock = blockToRegistryBlock(currentBlock);

// Save the registry
await this.registry.write();

// It may have taken some time to process the blocks, so refresh the current block number
// and check if we are in sync
currentBlock = await provider.getBlock("latest");
Expand Down Expand Up @@ -325,6 +310,7 @@ export class ChainContext {
provider.on("block", async (blockNumber: number) => {
try {
const block = await provider.getBlock(blockNumber);

log.debug(`New block ${blockNumber}`);

// Set the block time metric
Expand All @@ -350,18 +336,13 @@ export class ChainContext {
this
);

try {
await processBlock(this, block, events);

// Block height metric
this.registry.lastProcessedBlock = blockToRegistryBlock(block);
this.registry.write();
metrics.blockHeight.labels(chainId.toString()).set(blockNumber);
} catch {
log.error(`Error processing block ${blockNumber}`);
}

log.debug(`Block ${blockNumber} has been processed`);
await processBlockAndPersist({
context: this,
blockNumber,
events,
log,
provider,
});
} catch (error) {
log.error(
`Error in pollContractForEvents for block ${blockNumber}`,
Expand Down Expand Up @@ -505,6 +486,44 @@ async function processBlock(
}
}

async function processBlockAndPersist(params: {
context: ChainContext;
blockNumber: number;
events: ConditionalOrderCreatedEvent[];
currentBlock?: providers.Block;
log: LoggerWithMethods;
provider: ethers.providers.Provider;
}) {
const { context, blockNumber, events, currentBlock, log, provider } = params;
const block = await provider.getBlock(blockNumber);
try {
await processBlock(
context,
block,
events,
currentBlock?.number,
currentBlock?.timestamp
);

// Set the last processed block to this iteration's block number
context.registry.lastProcessedBlock = blockToRegistryBlock(block);
await context.registry.write();

// Set the block height metric
metrics.blockHeight.labels(context.toString()).set(blockNumber);
} catch (err) {
log.error(`Error processing block ${block.number}`, err);
}

// Set the last processed block to the current block number
context.registry.lastProcessedBlock = blockToRegistryBlock(block);

log.debug(`Block ${blockNumber} has been processed`);

// Save the registry
await context.registry.write();
}

async function pollContractForEvents(
fromBlock: number,
toBlock: number | "latest",
Expand Down
6 changes: 0 additions & 6 deletions src/types/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import Slack = require("node-slack");

import { BytesLike, ethers } from "ethers";

import type { ConditionalOrderCreatedEvent } from "./generated/ComposableCoW";
import { ConditionalOrderParams, PollResult } from "@cowprotocol/cow-sdk";
import { DBService } from "../services";
import { metrics } from "../utils";
Expand All @@ -27,11 +26,6 @@ export interface ExecutionContext {
storage: DBService;
}

// Todo: This should also encompass `MerkleRootSet`
export interface ReplayPlan {
[key: number]: Set<ConditionalOrderCreatedEvent>;
}

/**
* A merkle proof is a set of parameters:
* - `merkleRoot`: the merkle root of the conditional order
Expand Down

0 comments on commit 3e88069

Please sign in to comment.