From 759ff2de1635f4b910c2b4957b071707883ec108 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Wed, 8 Jan 2025 22:26:57 -0300 Subject: [PATCH 1/4] feat: fetch complete block numbers & ignore false positive error --- packages/data-flow/src/eventsFetcher.ts | 26 ++-- .../src/interfaces/eventsFetcher.interface.ts | 15 +- packages/data-flow/src/orchestrator.ts | 64 ++++++-- .../data-flow/test/unit/eventsFetcher.spec.ts | 18 ++- .../data-flow/test/unit/orchestrator.spec.ts | 61 ++++++++ packages/indexer-client/src/external.ts | 2 +- .../src/interfaces/indexerClient.ts | 16 +- .../src/providers/envioIndexerClient.ts | 119 ++++++++++++--- .../test/unit/envioIndexerClient.spec.ts | 138 +++++++++++++----- 9 files changed, 360 insertions(+), 99 deletions(-) diff --git a/packages/data-flow/src/eventsFetcher.ts b/packages/data-flow/src/eventsFetcher.ts index 9848215..fb13565 100644 --- a/packages/data-flow/src/eventsFetcher.ts +++ b/packages/data-flow/src/eventsFetcher.ts @@ -1,23 +1,29 @@ -import { GetEventsFilters, IIndexerClient } from "@grants-stack-indexer/indexer-client"; -import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared"; +import { + GetEventsAfterBlockNumberAndLogIndexParams, + GetEventsFilters, + IIndexerClient, +} from "@grants-stack-indexer/indexer-client"; +import { AnyIndexerFetchedEvent } from "@grants-stack-indexer/shared"; import { IEventsFetcher } from "./interfaces/index.js"; export class EventsFetcher implements IEventsFetcher { constructor(private indexerClient: IIndexerClient) {} /* @inheritdoc */ - async fetchEventsByBlockNumberAndLogIndex( - chainId: ChainId, - blockNumber: number, - logIndex: number, - limit: number = 100, - ): Promise { - return await this.indexerClient.getEventsAfterBlockNumberAndLogIndex( + async fetchEventsByBlockNumberAndLogIndex({ + chainId, + blockNumber, + logIndex, + limit = 100, + lastBlockComplete = false, + }: GetEventsAfterBlockNumberAndLogIndexParams): Promise { + return await this.indexerClient.getEventsAfterBlockNumberAndLogIndex({ chainId, blockNumber, logIndex, limit, - ); + lastBlockComplete, + }); } /** @inheritdoc */ diff --git a/packages/data-flow/src/interfaces/eventsFetcher.interface.ts b/packages/data-flow/src/interfaces/eventsFetcher.interface.ts index 27969fa..1e6af7d 100644 --- a/packages/data-flow/src/interfaces/eventsFetcher.interface.ts +++ b/packages/data-flow/src/interfaces/eventsFetcher.interface.ts @@ -1,5 +1,8 @@ -import { GetEventsFilters } from "@grants-stack-indexer/indexer-client"; -import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared"; +import { + GetEventsAfterBlockNumberAndLogIndexParams, + GetEventsFilters, +} from "@grants-stack-indexer/indexer-client"; +import { AnyIndexerFetchedEvent } from "@grants-stack-indexer/shared"; /** * Interface for the events fetcher @@ -10,13 +13,11 @@ export interface IEventsFetcher { * @param chainId id of the chain * @param blockNumber block number to fetch events from * @param logIndex log index in the block to fetch events from - * @param limit limit of events to fetch + * @param limit limit of events to fetch\ + * @param lastBlockComplete Whether to fetch the last block completely */ fetchEventsByBlockNumberAndLogIndex( - chainId: ChainId, - blockNumber: number, - logIndex: number, - limit?: number, + params: GetEventsAfterBlockNumberAndLogIndexParams, ): Promise; /** diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index 1e732a5..d2e0a69 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -6,9 +6,12 @@ import { UnsupportedEventException, UnsupportedStrategy, } from "@grants-stack-indexer/processors"; +import { RoundNotFound } from "@grants-stack-indexer/repository"; +import { RoundNotFoundForId } from "@grants-stack-indexer/repository/dist/src/exceptions/roundNotFound.exception.js"; import { Address, AnyEvent, + AnyIndexerFetchedEvent, ChainId, ContractName, Hex, @@ -57,6 +60,7 @@ import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from */ export class Orchestrator { private readonly eventsQueue: IQueue>; + private readonly eventsByBlockContext: Map; private readonly eventsFetcher: IEventsFetcher; private readonly eventsProcessor: EventsProcessor; private readonly eventsRegistry: IEventsRegistry; @@ -105,6 +109,7 @@ export class Orchestrator { this.logger, ); this.eventsQueue = new Queue>(fetchLimit); + this.eventsByBlockContext = new Map(); this.retryHandler = new RetryHandler(retryStrategy, this.logger); } @@ -163,11 +168,17 @@ export class Orchestrator { chainId: this.chainId, }); } else if (error instanceof Error || isNativeError(error)) { - this.logger.error(error, { - event, - className: Orchestrator.name, - chainId: this.chainId, - }); + const shouldIgnoreError = this.shouldIgnoreTimestampsUpdatedError( + error, + event!, + ); + if (!shouldIgnoreError) { + this.logger.error(error, { + event, + className: Orchestrator.name, + chainId: this.chainId, + }); + } } else { this.logger.error( new Error(`Error processing event: ${stringify(event)} ${error}`), @@ -187,6 +198,31 @@ export class Orchestrator { }); } + /** + * Sometimes the TimestampsUpdated event is part of the _initialize() function of a strategy. + * In this case, the event is emitted before the PoolCreated event. We can safely ignore the error + * if the PoolCreated event is present in the same block. + */ + private shouldIgnoreTimestampsUpdatedError( + error: Error, + event: ProcessorEvent, + ): boolean { + if ( + (error instanceof RoundNotFound || error instanceof RoundNotFoundForId) && + (event?.eventName === "TimestampsUpdated" || + event?.eventName === "TimestampsUpdatedWithRegistrationAndAllocation") + ) { + const events = this.eventsByBlockContext.get(event.blockNumber); + return ( + events + ?.filter((e) => e.logIndex > event.logIndex) + .some((event) => event.eventName === "PoolCreated") ?? false + ); + } + + return false; + } + /** * Enqueue new events from the events fetcher using the last processed event as a starting point */ @@ -195,12 +231,22 @@ export class Orchestrator { const blockNumber = lastProcessedEvent?.blockNumber ?? 0; const logIndex = lastProcessedEvent?.logIndex ?? 0; - const events = await this.eventsFetcher.fetchEventsByBlockNumberAndLogIndex( - this.chainId, + const events = await this.eventsFetcher.fetchEventsByBlockNumberAndLogIndex({ + chainId: this.chainId, blockNumber, logIndex, - this.fetchLimit, - ); + limit: this.fetchLimit, + lastBlockComplete: true, + }); + + // Clear previous context + this.eventsByBlockContext.clear(); + for (const event of events) { + if (!this.eventsByBlockContext.has(event.blockNumber)) { + this.eventsByBlockContext.set(event.blockNumber, []); + } + this.eventsByBlockContext.get(event.blockNumber)!.push(event); + } this.eventsQueue.push(...events); } diff --git a/packages/data-flow/test/unit/eventsFetcher.spec.ts b/packages/data-flow/test/unit/eventsFetcher.spec.ts index 586eedb..1236478 100644 --- a/packages/data-flow/test/unit/eventsFetcher.spec.ts +++ b/packages/data-flow/test/unit/eventsFetcher.spec.ts @@ -54,22 +54,22 @@ describe("EventsFetcher", () => { const chainId = 1 as ChainId; const blockNumber = 1000; const logIndex = 0; - const limit = 100; indexerClientMock.getEventsAfterBlockNumberAndLogIndex.mockResolvedValue(mockEvents); - const result = await eventsFetcher.fetchEventsByBlockNumberAndLogIndex( + const result = await eventsFetcher.fetchEventsByBlockNumberAndLogIndex({ chainId, blockNumber, logIndex, - ); + }); - expect(indexerClientMock.getEventsAfterBlockNumberAndLogIndex).toHaveBeenCalledWith( + expect(indexerClientMock.getEventsAfterBlockNumberAndLogIndex).toHaveBeenCalledWith({ chainId, blockNumber, logIndex, - limit, - ); + limit: 100, + lastBlockComplete: false, + }); expect(result).toEqual(mockEvents); }); @@ -83,7 +83,11 @@ describe("EventsFetcher", () => { ); await expect( - eventsFetcher.fetchEventsByBlockNumberAndLogIndex(chainId, blockNumber, logIndex), + eventsFetcher.fetchEventsByBlockNumberAndLogIndex({ + chainId, + blockNumber, + logIndex, + }), ).rejects.toThrow("Network error"); }); }); diff --git a/packages/data-flow/test/unit/orchestrator.spec.ts b/packages/data-flow/test/unit/orchestrator.spec.ts index 31a2596..5989b59 100644 --- a/packages/data-flow/test/unit/orchestrator.spec.ts +++ b/packages/data-flow/test/unit/orchestrator.spec.ts @@ -11,7 +11,9 @@ import { IProjectRepository, IRoundRepository, ITransactionManager, + RoundNotFound, } from "@grants-stack-indexer/repository"; +import { RoundNotFoundForId } from "@grants-stack-indexer/repository/dist/src/internal.js"; import { AlloEvent, ChainId, @@ -693,6 +695,65 @@ describe("Orchestrator", { sequential: true }, () => { }); expect(dataLoaderSpy).toHaveBeenCalledTimes(1); }); + + it("ignores TimestampsUpdated errors if PoolCreated is in the same block", async () => { + const strategyAddress = "0x123" as Address; + const strategyId = + "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf" as Hex; + const timestampsUpdatedEvent = createMockEvent("Strategy", "TimestampsUpdated", 1); + timestampsUpdatedEvent.logIndex = 0; + const timestampUpdatedEvent2 = createMockEvent( + "Strategy", + "TimestampsUpdatedWithRegistrationAndAllocation", + 1, + ); + timestampUpdatedEvent2.logIndex = 1; + const poolCreatedEvent = createMockEvent("Allo", "PoolCreated", 1, { + strategy: strategyAddress, + poolId: "1", + profileId: "0x123", + token: "0x123", + amount: "100", + metadata: ["1", "1"], + }); + poolCreatedEvent.logIndex = 3; + + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([ + timestampsUpdatedEvent, + timestampUpdatedEvent2, + poolCreatedEvent, + ]) + .mockResolvedValue([]); + + vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue(undefined); + vi.spyOn(mockEvmProvider, "readContract").mockResolvedValue(strategyId); + + eventsProcessorSpy + .mockRejectedValueOnce(new RoundNotFound(chainId, strategyAddress)) + .mockRejectedValueOnce(new RoundNotFoundForId(chainId, "1")) + .mockResolvedValueOnce([]); + + vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { + return Promise.resolve(); + }); + + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue( + await Promise.resolve(), + ); + + runPromise = orchestrator.run(abortController.signal); + + await vi.waitFor(() => { + if (eventsProcessorSpy.mock.calls.length < 3) throw new Error("Not yet called"); + }); + + expect(orchestrator["eventsProcessor"].processEvent).toHaveBeenCalledTimes(3); + expect(logger.error).not.toHaveBeenCalled(); + }); }); }); diff --git a/packages/indexer-client/src/external.ts b/packages/indexer-client/src/external.ts index b9a9716..27b9591 100644 --- a/packages/indexer-client/src/external.ts +++ b/packages/indexer-client/src/external.ts @@ -2,4 +2,4 @@ export type { IIndexerClient } from "./internal.js"; export { EnvioIndexerClient } from "./internal.js"; -export type { GetEventsFilters } from "./internal.js"; +export type { GetEventsFilters, GetEventsAfterBlockNumberAndLogIndexParams } from "./internal.js"; diff --git a/packages/indexer-client/src/interfaces/indexerClient.ts b/packages/indexer-client/src/interfaces/indexerClient.ts index 7051923..4c10e8e 100644 --- a/packages/indexer-client/src/interfaces/indexerClient.ts +++ b/packages/indexer-client/src/interfaces/indexerClient.ts @@ -2,6 +2,14 @@ import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared"; import { GetEventsFilters } from "../internal.js"; +export type GetEventsAfterBlockNumberAndLogIndexParams = { + chainId: ChainId; + blockNumber: number; + logIndex: number; + limit?: number; + lastBlockComplete?: boolean; +}; + /** * Interface for the indexer client */ @@ -9,15 +17,13 @@ export interface IIndexerClient { /** * Get the events by block number and log index from the indexer service * @param chainId Id of the chain - * @param fromBlock Block number to start fetching events from + * @param blockNumber Block number to start fetching events from * @param logIndex Log index in the block * @param limit Limit of events to fetch + * @param lastBlockComplete Whether to fetch the last block completely */ getEventsAfterBlockNumberAndLogIndex( - chainId: ChainId, - fromBlock: number, - logIndex: number, - limit?: number, + params: GetEventsAfterBlockNumberAndLogIndexParams, ): Promise; /** diff --git a/packages/indexer-client/src/providers/envioIndexerClient.ts b/packages/indexer-client/src/providers/envioIndexerClient.ts index 516d291..d453530 100644 --- a/packages/indexer-client/src/providers/envioIndexerClient.ts +++ b/packages/indexer-client/src/providers/envioIndexerClient.ts @@ -3,7 +3,11 @@ import { gql, GraphQLClient } from "graphql-request"; import { AnyIndexerFetchedEvent, ChainId, stringify } from "@grants-stack-indexer/shared"; import { IndexerClientError, InvalidIndexerResponse } from "../exceptions/index.js"; -import { GetEventsFilters, IIndexerClient } from "../internal.js"; +import { + GetEventsAfterBlockNumberAndLogIndexParams, + GetEventsFilters, + IIndexerClient, +} from "../internal.js"; /** * Indexer client for the Envio indexer service @@ -16,12 +20,13 @@ export class EnvioIndexerClient implements IIndexerClient { this.client.setHeader("x-hasura-admin-secret", secret); } /* @inheritdoc */ - public async getEventsAfterBlockNumberAndLogIndex( - chainId: ChainId, - blockNumber: number, - logIndex: number, - limit: number = 100, - ): Promise { + public async getEventsAfterBlockNumberAndLogIndex({ + chainId, + blockNumber, + logIndex, + limit = 100, + lastBlockComplete = false, + }: GetEventsAfterBlockNumberAndLogIndexParams): Promise { try { const response = (await this.client.request( gql` @@ -61,23 +66,82 @@ export class EnvioIndexerClient implements IIndexerClient { `, { chainId, blockNumber, logIndex, limit }, )) as { raw_events: AnyIndexerFetchedEvent[] }; - if (response?.raw_events) { - return response.raw_events; + const events = response?.raw_events; + + if (events) { + if (!lastBlockComplete || events.length === 0) { + return events; + } else { + const lastBlockNumber = events[events.length - 1]!.blockNumber; + const countLastBlockEvents = events.filter( + (e) => e.blockNumber === lastBlockNumber, + ).length; + const { lastBlockEvents } = await this.getTotalEventsInBlock( + chainId, + lastBlockNumber, + ); + + // If the last event's block has more events than what we fetched, + // we need to exclude that block's events + return lastBlockEvents.aggregate.count > countLastBlockEvents + ? events.filter((e) => e.blockNumber < lastBlockNumber) + : events; + } } else { throw new InvalidIndexerResponse(stringify(response)); } } catch (error) { - if (error instanceof InvalidIndexerResponse) { - throw error; - } - throw new IndexerClientError( - stringify(error, Object.getOwnPropertyNames(error)), - { - className: EnvioIndexerClient.name, - methodName: "getEventsAfterBlockNumberAndLogIndex", - }, - error as Error, - ); + throw this.handleError(error, "getEventsAfterBlockNumberAndLogIndex"); + } + } + + /** + * Get the total number of events in a block + * @param chainId - The chain ID + * @param blockNumber - The block number + * @returns The total number of events in the block + */ + private async getTotalEventsInBlock( + chainId: ChainId, + blockNumber: number, + ): Promise<{ + lastBlockEvents: { + aggregate: { count: number }; + nodes: { block_number: number }[]; + }; + }> { + try { + const response = (await this.client.request( + gql` + query getTotalEventsInBlock($chainId: Int!, $blockNumber: Int!) { + last_block_events: raw_events_aggregate( + where: { + chain_id: { _eq: $chainId } + block_number: { _eq: $blockNumber } + } + ) { + aggregate { + count + } + nodes { + block_number + } + } + } + `, + { chainId, blockNumber }, + )) as { + last_block_events: { + aggregate: { count: number }; + nodes: { block_number: number }[]; + }; + }; + + return { + lastBlockEvents: response.last_block_events, + }; + } catch (error) { + throw this.handleError(error, "getTotalEventsInBlock"); } } @@ -174,10 +238,17 @@ export class EnvioIndexerClient implements IIndexerClient { throw new InvalidIndexerResponse(stringify(response)); } } catch (error) { - if (error instanceof InvalidIndexerResponse) { - throw error; - } - throw new IndexerClientError(stringify(error, Object.getOwnPropertyNames(error))); + throw this.handleError(error, "getEvents"); + } + } + + private handleError(error: unknown, methodName: string): Error { + if (error instanceof InvalidIndexerResponse) { + return error; } + return new IndexerClientError(stringify(error, Object.getOwnPropertyNames(error)), { + className: EnvioIndexerClient.name, + methodName, + }); } } diff --git a/packages/indexer-client/test/unit/envioIndexerClient.spec.ts b/packages/indexer-client/test/unit/envioIndexerClient.spec.ts index 10cd34a..00f7528 100644 --- a/packages/indexer-client/test/unit/envioIndexerClient.spec.ts +++ b/packages/indexer-client/test/unit/envioIndexerClient.spec.ts @@ -148,12 +148,12 @@ describe("EnvioIndexerClient", () => { }); it("returns events after the specified block number", async () => { - const result = await envioIndexerClient.getEventsAfterBlockNumberAndLogIndex( - 1 as ChainId, - 100, - 0, - 100, - ); + const result = await envioIndexerClient.getEventsAfterBlockNumberAndLogIndex({ + chainId: 1 as ChainId, + blockNumber: 100, + logIndex: 0, + limit: 100, + }); expect(result).toHaveLength(3); expect(result).toEqual( @@ -166,12 +166,12 @@ describe("EnvioIndexerClient", () => { }); it("returns only events after the specified log index within the same block", async () => { - const result = await envioIndexerClient.getEventsAfterBlockNumberAndLogIndex( - 1 as ChainId, - 100, - 2, - 100, - ); + const result = await envioIndexerClient.getEventsAfterBlockNumberAndLogIndex({ + chainId: 1 as ChainId, + blockNumber: 100, + logIndex: 2, + limit: 100, + }); expect(result).toHaveLength(2); expect(result).toEqual( @@ -183,23 +183,23 @@ describe("EnvioIndexerClient", () => { }); it("respects the limit parameter", async () => { - const result = await envioIndexerClient.getEventsAfterBlockNumberAndLogIndex( - 1 as ChainId, - 100, - 0, - 2, - ); + const result = await envioIndexerClient.getEventsAfterBlockNumberAndLogIndex({ + chainId: 1 as ChainId, + blockNumber: 100, + logIndex: 0, + limit: 2, + }); expect(result).toHaveLength(2); }); it("returns empty array when no matching events found", async () => { - const result = await envioIndexerClient.getEventsAfterBlockNumberAndLogIndex( - 1 as ChainId, - 102, - 0, - 100, - ); + const result = await envioIndexerClient.getEventsAfterBlockNumberAndLogIndex({ + chainId: 1 as ChainId, + blockNumber: 102, + logIndex: 0, + limit: 100, + }); expect(result).toHaveLength(0); }); @@ -215,7 +215,12 @@ describe("EnvioIndexerClient", () => { graphqlClient.request.mockResolvedValue(mockedResponse); await expect( - envioIndexerClient.getEventsAfterBlockNumberAndLogIndex(1 as ChainId, 12345, 0), + envioIndexerClient.getEventsAfterBlockNumberAndLogIndex({ + chainId: 1 as ChainId, + blockNumber: 12345, + logIndex: 0, + limit: 100, + }), ).rejects.toThrow(InvalidIndexerResponse); }); @@ -224,7 +229,12 @@ describe("EnvioIndexerClient", () => { graphqlClient.request.mockRejectedValue(error); await expect( - envioIndexerClient.getEventsAfterBlockNumberAndLogIndex(1 as ChainId, 12345, 0), + envioIndexerClient.getEventsAfterBlockNumberAndLogIndex({ + chainId: 1 as ChainId, + blockNumber: 12345, + logIndex: 0, + limit: 100, + }), ).rejects.toThrow(IndexerClientError); }); @@ -237,11 +247,11 @@ describe("EnvioIndexerClient", () => { graphqlClient.request.mockResolvedValue(mockedResponse); // Call the method without the limit argument - const result = await envioIndexerClient.getEventsAfterBlockNumberAndLogIndex( - 1 as ChainId, - 12345, - 0, - ); + const result = await envioIndexerClient.getEventsAfterBlockNumberAndLogIndex({ + chainId: 1 as ChainId, + blockNumber: 12345, + logIndex: 0, + }); expect(result).toEqual(testEvents); expect(graphqlClient.request).toHaveBeenCalledWith( @@ -256,13 +266,69 @@ describe("EnvioIndexerClient", () => { }); it("returns an empty array when no events are found", async () => { - const result = await envioIndexerClient.getEventsAfterBlockNumberAndLogIndex( - 1 as ChainId, - 10_000, - 0, - ); + const result = await envioIndexerClient.getEventsAfterBlockNumberAndLogIndex({ + chainId: 1 as ChainId, + blockNumber: 10_000, + logIndex: 0, + }); expect(result).toEqual([]); }); + + it("discards events from the last block if lastBlockComplete is true", async () => { + // Mock the request implementation to simulate database querying + graphqlClient.request + .mockImplementationOnce( + async ( + _document: RequestDocument | RequestOptions, + ...args: object[] + ) => { + const variables = args[0] as { + chainId: ChainId; + blockNumber: number; + logIndex: number; + limit: number; + }; + const { chainId, blockNumber, logIndex, limit } = variables; + + const filteredEvents = testEvents + .filter((event) => { + // Match chainId + if (event.chainId !== chainId) return false; + + // Implement the _or condition from the GraphQL query + return ( + event.blockNumber > blockNumber || + (event.blockNumber === blockNumber && event.logIndex > logIndex) + ); + }) + .slice(0, limit); // Apply limit + + return { raw_events: filteredEvents }; + }, + ) + .mockResolvedValueOnce({ + last_block_events: { + aggregate: { count: 5 }, + nodes: [], + }, + }); + + const result = await envioIndexerClient.getEventsAfterBlockNumberAndLogIndex({ + chainId: 1 as ChainId, + blockNumber: 100, + logIndex: 0, + limit: 3, + lastBlockComplete: true, + }); + + expect(result).toHaveLength(2); + expect(result).toEqual(testEvents.slice(0, 2)); + expect(graphqlClient.request).toHaveBeenCalledTimes(2); + expect(graphqlClient.request).toHaveBeenNthCalledWith(2, expect.any(String), { + chainId: 1, + blockNumber: 101, + }); + }); }); describe("getEvents (old test cases)", () => { From aae2080eeb31f8647186d67f1d87d69de2a32f98 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Thu, 9 Jan 2025 15:27:00 -0300 Subject: [PATCH 2/4] fix: import path --- packages/data-flow/src/orchestrator.ts | 3 +-- packages/repository/src/external.ts | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index d2e0a69..5979b60 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -6,8 +6,7 @@ import { UnsupportedEventException, UnsupportedStrategy, } from "@grants-stack-indexer/processors"; -import { RoundNotFound } from "@grants-stack-indexer/repository"; -import { RoundNotFoundForId } from "@grants-stack-indexer/repository/dist/src/exceptions/roundNotFound.exception.js"; +import { RoundNotFound, RoundNotFoundForId } from "@grants-stack-indexer/repository"; import { Address, AnyEvent, diff --git a/packages/repository/src/external.ts b/packages/repository/src/external.ts index fd9428b..04a2893 100644 --- a/packages/repository/src/external.ts +++ b/packages/repository/src/external.ts @@ -63,6 +63,7 @@ export { export { RoundNotFound, + RoundNotFoundForId, ApplicationNotFound, ProjectNotFound, ProjectByRoleNotFound, From fd9aac9910e65fa3f324a8d3ae505ad1cd1c8d86 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Thu, 9 Jan 2025 15:39:10 -0300 Subject: [PATCH 3/4] style: change variable name for better clarity --- packages/data-flow/src/eventsFetcher.ts | 4 ++-- packages/data-flow/src/interfaces/eventsFetcher.interface.ts | 2 +- packages/data-flow/src/orchestrator.ts | 2 +- packages/data-flow/test/unit/eventsFetcher.spec.ts | 2 +- packages/indexer-client/src/interfaces/indexerClient.ts | 4 ++-- packages/indexer-client/src/providers/envioIndexerClient.ts | 4 ++-- packages/indexer-client/test/unit/envioIndexerClient.spec.ts | 4 ++-- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/data-flow/src/eventsFetcher.ts b/packages/data-flow/src/eventsFetcher.ts index fb13565..9d25533 100644 --- a/packages/data-flow/src/eventsFetcher.ts +++ b/packages/data-flow/src/eventsFetcher.ts @@ -15,14 +15,14 @@ export class EventsFetcher implements IEventsFetcher { blockNumber, logIndex, limit = 100, - lastBlockComplete = false, + allowPartialLastBlock = true, }: GetEventsAfterBlockNumberAndLogIndexParams): Promise { return await this.indexerClient.getEventsAfterBlockNumberAndLogIndex({ chainId, blockNumber, logIndex, limit, - lastBlockComplete, + allowPartialLastBlock, }); } diff --git a/packages/data-flow/src/interfaces/eventsFetcher.interface.ts b/packages/data-flow/src/interfaces/eventsFetcher.interface.ts index 1e6af7d..42abc95 100644 --- a/packages/data-flow/src/interfaces/eventsFetcher.interface.ts +++ b/packages/data-flow/src/interfaces/eventsFetcher.interface.ts @@ -14,7 +14,7 @@ export interface IEventsFetcher { * @param blockNumber block number to fetch events from * @param logIndex log index in the block to fetch events from * @param limit limit of events to fetch\ - * @param lastBlockComplete Whether to fetch the last block completely + * @param allowPartialLastBlock Whether last block is allowed to be partially fetched */ fetchEventsByBlockNumberAndLogIndex( params: GetEventsAfterBlockNumberAndLogIndexParams, diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index 5979b60..2d541a7 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -235,7 +235,7 @@ export class Orchestrator { blockNumber, logIndex, limit: this.fetchLimit, - lastBlockComplete: true, + allowPartialLastBlock: false, }); // Clear previous context diff --git a/packages/data-flow/test/unit/eventsFetcher.spec.ts b/packages/data-flow/test/unit/eventsFetcher.spec.ts index 1236478..2563a55 100644 --- a/packages/data-flow/test/unit/eventsFetcher.spec.ts +++ b/packages/data-flow/test/unit/eventsFetcher.spec.ts @@ -68,7 +68,7 @@ describe("EventsFetcher", () => { blockNumber, logIndex, limit: 100, - lastBlockComplete: false, + allowPartialLastBlock: true, }); expect(result).toEqual(mockEvents); }); diff --git a/packages/indexer-client/src/interfaces/indexerClient.ts b/packages/indexer-client/src/interfaces/indexerClient.ts index 4c10e8e..e2d7edc 100644 --- a/packages/indexer-client/src/interfaces/indexerClient.ts +++ b/packages/indexer-client/src/interfaces/indexerClient.ts @@ -7,7 +7,7 @@ export type GetEventsAfterBlockNumberAndLogIndexParams = { blockNumber: number; logIndex: number; limit?: number; - lastBlockComplete?: boolean; + allowPartialLastBlock?: boolean; }; /** @@ -20,7 +20,7 @@ export interface IIndexerClient { * @param blockNumber Block number to start fetching events from * @param logIndex Log index in the block * @param limit Limit of events to fetch - * @param lastBlockComplete Whether to fetch the last block completely + * @param allowPartialLastBlock Whether last block is allowed to be partially fetched */ getEventsAfterBlockNumberAndLogIndex( params: GetEventsAfterBlockNumberAndLogIndexParams, diff --git a/packages/indexer-client/src/providers/envioIndexerClient.ts b/packages/indexer-client/src/providers/envioIndexerClient.ts index d453530..5156f93 100644 --- a/packages/indexer-client/src/providers/envioIndexerClient.ts +++ b/packages/indexer-client/src/providers/envioIndexerClient.ts @@ -25,7 +25,7 @@ export class EnvioIndexerClient implements IIndexerClient { blockNumber, logIndex, limit = 100, - lastBlockComplete = false, + allowPartialLastBlock = true, }: GetEventsAfterBlockNumberAndLogIndexParams): Promise { try { const response = (await this.client.request( @@ -69,7 +69,7 @@ export class EnvioIndexerClient implements IIndexerClient { const events = response?.raw_events; if (events) { - if (!lastBlockComplete || events.length === 0) { + if (allowPartialLastBlock || events.length === 0) { return events; } else { const lastBlockNumber = events[events.length - 1]!.blockNumber; diff --git a/packages/indexer-client/test/unit/envioIndexerClient.spec.ts b/packages/indexer-client/test/unit/envioIndexerClient.spec.ts index 00f7528..3466851 100644 --- a/packages/indexer-client/test/unit/envioIndexerClient.spec.ts +++ b/packages/indexer-client/test/unit/envioIndexerClient.spec.ts @@ -274,7 +274,7 @@ describe("EnvioIndexerClient", () => { expect(result).toEqual([]); }); - it("discards events from the last block if lastBlockComplete is true", async () => { + it("discards events from the last block if allowPartialLastBlock is false", async () => { // Mock the request implementation to simulate database querying graphqlClient.request .mockImplementationOnce( @@ -318,7 +318,7 @@ describe("EnvioIndexerClient", () => { blockNumber: 100, logIndex: 0, limit: 3, - lastBlockComplete: true, + allowPartialLastBlock: false, }); expect(result).toHaveLength(2); From c23065046701685ab016abbd3b3fb21f4dd124b0 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Thu, 9 Jan 2025 15:41:09 -0300 Subject: [PATCH 4/4] style: extract condition to vars --- packages/data-flow/src/orchestrator.ts | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index 2d541a7..16b6c21 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -206,11 +206,13 @@ export class Orchestrator { error: Error, event: ProcessorEvent, ): boolean { - if ( - (error instanceof RoundNotFound || error instanceof RoundNotFoundForId) && - (event?.eventName === "TimestampsUpdated" || - event?.eventName === "TimestampsUpdatedWithRegistrationAndAllocation") - ) { + const canIgnoreErrorClass = + error instanceof RoundNotFound || error instanceof RoundNotFoundForId; + const canIgnoreEventName = + event?.eventName === "TimestampsUpdated" || + event?.eventName === "TimestampsUpdatedWithRegistrationAndAllocation"; + + if (canIgnoreErrorClass && canIgnoreEventName) { const events = this.eventsByBlockContext.get(event.blockNumber); return ( events