Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fetch complete block numbers & ignore false positive error #52

Merged
merged 5 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions packages/data-flow/src/eventsFetcher.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
import { GetEventsFilters, IIndexerClient } from "@grants-stack-indexer/indexer-client";
import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared";
import {
GetEventsAfterBlockNumberAndLogIndexParams,
GetEventsFilters,
IIndexerClient,
} from "@grants-stack-indexer/indexer-client";
import { AnyIndexerFetchedEvent } from "@grants-stack-indexer/shared";

import { IEventsFetcher } from "./interfaces/index.js";

export class EventsFetcher implements IEventsFetcher {
constructor(private indexerClient: IIndexerClient) {}
/* @inheritdoc */
async fetchEventsByBlockNumberAndLogIndex(
chainId: ChainId,
blockNumber: number,
logIndex: number,
limit: number = 100,
): Promise<AnyIndexerFetchedEvent[]> {
return await this.indexerClient.getEventsAfterBlockNumberAndLogIndex(
async fetchEventsByBlockNumberAndLogIndex({
chainId,
blockNumber,
logIndex,
limit = 100,
lastBlockComplete = false,
}: GetEventsAfterBlockNumberAndLogIndexParams): Promise<AnyIndexerFetchedEvent[]> {
return await this.indexerClient.getEventsAfterBlockNumberAndLogIndex({
chainId,
blockNumber,
logIndex,
limit,
);
lastBlockComplete,
});
}

/** @inheritdoc */
Expand Down
15 changes: 8 additions & 7 deletions packages/data-flow/src/interfaces/eventsFetcher.interface.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { GetEventsFilters } from "@grants-stack-indexer/indexer-client";
import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared";
import {
GetEventsAfterBlockNumberAndLogIndexParams,
GetEventsFilters,
} from "@grants-stack-indexer/indexer-client";
import { AnyIndexerFetchedEvent } from "@grants-stack-indexer/shared";

/**
* Interface for the events fetcher
Expand All @@ -10,13 +13,11 @@ export interface IEventsFetcher {
* @param chainId id of the chain
* @param blockNumber block number to fetch events from
* @param logIndex log index in the block to fetch events from
* @param limit limit of events to fetch
* @param limit limit of events to fetch\
* @param lastBlockComplete Whether to fetch the last block completely
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hanging backslash

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will fix in another PR the typo

*/
fetchEventsByBlockNumberAndLogIndex(
chainId: ChainId,
blockNumber: number,
logIndex: number,
limit?: number,
params: GetEventsAfterBlockNumberAndLogIndexParams,
): Promise<AnyIndexerFetchedEvent[]>;

/**
Expand Down
64 changes: 55 additions & 9 deletions packages/data-flow/src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import {
UnsupportedEventException,
UnsupportedStrategy,
} from "@grants-stack-indexer/processors";
import { RoundNotFound } from "@grants-stack-indexer/repository";
import { RoundNotFoundForId } from "@grants-stack-indexer/repository/dist/src/exceptions/roundNotFound.exception.js";
0xyaco marked this conversation as resolved.
Show resolved Hide resolved
import {
Address,
AnyEvent,
AnyIndexerFetchedEvent,
ChainId,
ContractName,
Hex,
Expand Down Expand Up @@ -57,6 +60,7 @@ import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from
*/
export class Orchestrator {
private readonly eventsQueue: IQueue<ProcessorEvent<ContractName, AnyEvent>>;
private readonly eventsByBlockContext: Map<number, AnyIndexerFetchedEvent[]>;
private readonly eventsFetcher: IEventsFetcher;
private readonly eventsProcessor: EventsProcessor;
private readonly eventsRegistry: IEventsRegistry;
Expand Down Expand Up @@ -105,6 +109,7 @@ export class Orchestrator {
this.logger,
);
this.eventsQueue = new Queue<ProcessorEvent<ContractName, AnyEvent>>(fetchLimit);
this.eventsByBlockContext = new Map<number, AnyIndexerFetchedEvent[]>();
this.retryHandler = new RetryHandler(retryStrategy, this.logger);
}

Expand Down Expand Up @@ -163,11 +168,17 @@ export class Orchestrator {
chainId: this.chainId,
});
} else if (error instanceof Error || isNativeError(error)) {
this.logger.error(error, {
event,
className: Orchestrator.name,
chainId: this.chainId,
});
const shouldIgnoreError = this.shouldIgnoreTimestampsUpdatedError(
error,
event!,
);
if (!shouldIgnoreError) {
this.logger.error(error, {
event,
className: Orchestrator.name,
chainId: this.chainId,
});
}
} else {
this.logger.error(
new Error(`Error processing event: ${stringify(event)} ${error}`),
Expand All @@ -187,6 +198,31 @@ export class Orchestrator {
});
}

/**
* Sometimes the TimestampsUpdated event is part of the _initialize() function of a strategy.
* In this case, the event is emitted before the PoolCreated event. We can safely ignore the error
* if the PoolCreated event is present in the same block.
*/
private shouldIgnoreTimestampsUpdatedError(
error: Error,
event: ProcessorEvent<ContractName, AnyEvent>,
): boolean {
if (
(error instanceof RoundNotFound || error instanceof RoundNotFoundForId) &&
(event?.eventName === "TimestampsUpdated" ||
event?.eventName === "TimestampsUpdatedWithRegistrationAndAllocation")
) {
0xyaco marked this conversation as resolved.
Show resolved Hide resolved
const events = this.eventsByBlockContext.get(event.blockNumber);
return (
events
?.filter((e) => e.logIndex > event.logIndex)
.some((event) => event.eventName === "PoolCreated") ?? false
);
}

return false;
}

/**
* Enqueue new events from the events fetcher using the last processed event as a starting point
*/
Expand All @@ -195,12 +231,22 @@ export class Orchestrator {
const blockNumber = lastProcessedEvent?.blockNumber ?? 0;
const logIndex = lastProcessedEvent?.logIndex ?? 0;

const events = await this.eventsFetcher.fetchEventsByBlockNumberAndLogIndex(
this.chainId,
const events = await this.eventsFetcher.fetchEventsByBlockNumberAndLogIndex({
chainId: this.chainId,
blockNumber,
logIndex,
this.fetchLimit,
);
limit: this.fetchLimit,
lastBlockComplete: true,
});

// Clear previous context
this.eventsByBlockContext.clear();
for (const event of events) {
if (!this.eventsByBlockContext.has(event.blockNumber)) {
this.eventsByBlockContext.set(event.blockNumber, []);
}
this.eventsByBlockContext.get(event.blockNumber)!.push(event);
}

this.eventsQueue.push(...events);
}
Expand Down
18 changes: 11 additions & 7 deletions packages/data-flow/test/unit/eventsFetcher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,22 @@ describe("EventsFetcher", () => {
const chainId = 1 as ChainId;
const blockNumber = 1000;
const logIndex = 0;
const limit = 100;

indexerClientMock.getEventsAfterBlockNumberAndLogIndex.mockResolvedValue(mockEvents);

const result = await eventsFetcher.fetchEventsByBlockNumberAndLogIndex(
const result = await eventsFetcher.fetchEventsByBlockNumberAndLogIndex({
chainId,
blockNumber,
logIndex,
);
});

expect(indexerClientMock.getEventsAfterBlockNumberAndLogIndex).toHaveBeenCalledWith(
expect(indexerClientMock.getEventsAfterBlockNumberAndLogIndex).toHaveBeenCalledWith({
chainId,
blockNumber,
logIndex,
limit,
);
limit: 100,
lastBlockComplete: false,
});
expect(result).toEqual(mockEvents);
});

Expand All @@ -83,7 +83,11 @@ describe("EventsFetcher", () => {
);

await expect(
eventsFetcher.fetchEventsByBlockNumberAndLogIndex(chainId, blockNumber, logIndex),
eventsFetcher.fetchEventsByBlockNumberAndLogIndex({
chainId,
blockNumber,
logIndex,
}),
).rejects.toThrow("Network error");
});
});
61 changes: 61 additions & 0 deletions packages/data-flow/test/unit/orchestrator.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import {
IProjectRepository,
IRoundRepository,
ITransactionManager,
RoundNotFound,
} from "@grants-stack-indexer/repository";
import { RoundNotFoundForId } from "@grants-stack-indexer/repository/dist/src/internal.js";
import {
AlloEvent,
ChainId,
Expand Down Expand Up @@ -693,6 +695,65 @@ describe("Orchestrator", { sequential: true }, () => {
});
expect(dataLoaderSpy).toHaveBeenCalledTimes(1);
});

it("ignores TimestampsUpdated errors if PoolCreated is in the same block", async () => {
const strategyAddress = "0x123" as Address;
const strategyId =
"0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf" as Hex;
const timestampsUpdatedEvent = createMockEvent("Strategy", "TimestampsUpdated", 1);
timestampsUpdatedEvent.logIndex = 0;
const timestampUpdatedEvent2 = createMockEvent(
"Strategy",
"TimestampsUpdatedWithRegistrationAndAllocation",
1,
);
timestampUpdatedEvent2.logIndex = 1;
const poolCreatedEvent = createMockEvent("Allo", "PoolCreated", 1, {
strategy: strategyAddress,
poolId: "1",
profileId: "0x123",
token: "0x123",
amount: "100",
metadata: ["1", "1"],
});
poolCreatedEvent.logIndex = 3;

const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent");

vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined);
vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex")
.mockResolvedValueOnce([
timestampsUpdatedEvent,
timestampUpdatedEvent2,
poolCreatedEvent,
])
.mockResolvedValue([]);

vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue(undefined);
vi.spyOn(mockEvmProvider, "readContract").mockResolvedValue(strategyId);

eventsProcessorSpy
.mockRejectedValueOnce(new RoundNotFound(chainId, strategyAddress))
.mockRejectedValueOnce(new RoundNotFoundForId(chainId, "1"))
.mockResolvedValueOnce([]);

vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => {
return Promise.resolve();
});

vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue(
await Promise.resolve(),
);

runPromise = orchestrator.run(abortController.signal);

await vi.waitFor(() => {
if (eventsProcessorSpy.mock.calls.length < 3) throw new Error("Not yet called");
});

expect(orchestrator["eventsProcessor"].processEvent).toHaveBeenCalledTimes(3);
expect(logger.error).not.toHaveBeenCalled();
});
});
});

Expand Down
2 changes: 1 addition & 1 deletion packages/indexer-client/src/external.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ export type { IIndexerClient } from "./internal.js";

export { EnvioIndexerClient } from "./internal.js";

export type { GetEventsFilters } from "./internal.js";
export type { GetEventsFilters, GetEventsAfterBlockNumberAndLogIndexParams } from "./internal.js";
16 changes: 11 additions & 5 deletions packages/indexer-client/src/interfaces/indexerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,28 @@ import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared";

import { GetEventsFilters } from "../internal.js";

export type GetEventsAfterBlockNumberAndLogIndexParams = {
chainId: ChainId;
blockNumber: number;
logIndex: number;
limit?: number;
lastBlockComplete?: boolean;
};

/**
* Interface for the indexer client
*/
export interface IIndexerClient {
/**
* Get the events by block number and log index from the indexer service
* @param chainId Id of the chain
* @param fromBlock Block number to start fetching events from
* @param blockNumber Block number to start fetching events from
* @param logIndex Log index in the block
* @param limit Limit of events to fetch
* @param lastBlockComplete Whether to fetch the last block completely
*/
getEventsAfterBlockNumberAndLogIndex(
chainId: ChainId,
fromBlock: number,
logIndex: number,
limit?: number,
params: GetEventsAfterBlockNumberAndLogIndexParams,
): Promise<AnyIndexerFetchedEvent[]>;

/**
Expand Down
Loading
Loading