diff --git a/apps/processing/README.md b/apps/processing/README.md index 96f7b4f..03e5392 100644 --- a/apps/processing/README.md +++ b/apps/processing/README.md @@ -55,6 +55,7 @@ Available scripts that can be run using `pnpm`: | `start` | Run the compiled app from dist folder | | `test` | Run tests using vitest | | `test:cov` | Run tests with coverage report | +| `retroactive` | Run retroactive processing for all chains | TODO: e2e tests TODO: Docker image diff --git a/apps/processing/src/services/processing.service.ts b/apps/processing/src/services/processing.service.ts index 5e31d13..9a87df2 100644 --- a/apps/processing/src/services/processing.service.ts +++ b/apps/processing/src/services/processing.service.ts @@ -44,7 +44,11 @@ export class ProcessingService { const sharedDependencies = await SharedDependenciesService.initialize(env); const { CHAINS: chains } = env; const { core, registriesRepositories, indexerClient, kyselyDatabase } = sharedDependencies; - const { eventRegistryRepository, strategyRegistryRepository } = registriesRepositories; + const { + eventRegistryRepository, + strategyRegistryRepository, + strategyProcessingCheckpointRepository, + } = registriesRepositories; const orchestrators: Map = new Map(); const strategyRegistry = await InMemoryCachedStrategyRegistry.initialize( @@ -90,6 +94,7 @@ export class ProcessingService { { eventsRegistry: cachedEventsRegistry, strategyRegistry, + checkpointRepository: strategyProcessingCheckpointRepository, }, chain.fetchLimit, chainLogger, diff --git a/apps/processing/src/services/sharedDependencies.service.ts b/apps/processing/src/services/sharedDependencies.service.ts index 3fb29fe..df70dc1 100644 --- a/apps/processing/src/services/sharedDependencies.service.ts +++ b/apps/processing/src/services/sharedDependencies.service.ts @@ -5,6 +5,7 @@ import { PricingProviderFactory } from "@grants-stack-indexer/pricing"; import { createKyselyDatabase, IEventRegistryRepository, + IStrategyProcessingCheckpointRepository, IStrategyRegistryRepository, KyselyApplicationPayoutRepository, KyselyApplicationRepository, @@ -12,6 +13,7 @@ import { KyselyEventRegistryRepository, KyselyProjectRepository, KyselyRoundRepository, + KyselyStrategyProcessingCheckpointRepository, KyselyStrategyRegistryRepository, } from "@grants-stack-indexer/repository"; import { Logger } from "@grants-stack-indexer/shared"; @@ -23,6 +25,7 @@ export type SharedDependencies = { registriesRepositories: { eventRegistryRepository: IEventRegistryRepository; strategyRegistryRepository: IStrategyRegistryRepository; + strategyProcessingCheckpointRepository: IStrategyProcessingCheckpointRepository; }; indexerClient: EnvioIndexerClient; kyselyDatabase: ReturnType; @@ -73,6 +76,9 @@ export class SharedDependenciesService { env.DATABASE_SCHEMA, ); + const strategyProcessingCheckpointRepository = + new KyselyStrategyProcessingCheckpointRepository(kyselyDatabase, env.DATABASE_SCHEMA); + // Initialize indexer client const indexerClient = new EnvioIndexerClient( env.INDEXER_GRAPHQL_URL, @@ -92,6 +98,7 @@ export class SharedDependenciesService { registriesRepositories: { eventRegistryRepository, strategyRegistryRepository, + strategyProcessingCheckpointRepository, }, indexerClient, kyselyDatabase, diff --git a/apps/processing/test/unit/processing.service.spec.ts b/apps/processing/test/unit/processing.service.spec.ts index 15b59ff..ce71a27 100644 --- a/apps/processing/test/unit/processing.service.spec.ts +++ b/apps/processing/test/unit/processing.service.spec.ts @@ -7,6 +7,7 @@ import { InMemoryCachedEventRegistry, InMemoryCachedStrategyRegistry, Orchestrator, + RetroactiveProcessor, } from "@grants-stack-indexer/data-flow"; import type { Environment } from "../../src/config/env.js"; @@ -67,6 +68,11 @@ vi.spyOn(Orchestrator.prototype, "run").mockImplementation(async function (signa await new Promise((resolve) => setTimeout(resolve, 100)); } }); +vi.spyOn(RetroactiveProcessor.prototype, "processRetroactiveStrategies").mockImplementation( + async () => { + await new Promise((resolve) => setTimeout(resolve, 100)); + }, +); describe("ProcessingService", () => { let processingService: ProcessingService; @@ -100,96 +106,109 @@ describe("ProcessingService", () => { vi.clearAllMocks(); }); - it("initializes multiple orchestrators correctly", () => { - expect(InMemoryCachedStrategyRegistry.initialize).toHaveBeenCalledTimes(1); - expect(DatabaseStrategyRegistry).toHaveBeenCalledTimes(1); - expect(DatabaseEventRegistry).toHaveBeenCalledTimes(1); - expect(EvmProvider).toHaveBeenCalledTimes(2); - expect(InMemoryCachedEventRegistry.initialize).toHaveBeenCalledTimes(2); - - // Verify orchestrators were created with correct parameters - expect(processingService["orchestrators"].size).toBe(2); - - // Verify first chain initialization - expect(EvmProvider).toHaveBeenNthCalledWith( - 1, - ["http://localhost:8545"], - expect.any(Object), - expect.any(Object), - ); - - // Verify second chain initialization - expect(EvmProvider).toHaveBeenNthCalledWith( - 2, - ["http://localhost:8546"], - expect.any(Object), - expect.any(Object), - ); - }); - - it("starts all orchestrators and handles shutdown signals", async () => { - const abortSpy = vi.spyOn(AbortController.prototype, "abort"); - const runSpy = vi.mocked(Orchestrator.prototype.run); - const logSpy = vi.spyOn(processingService["logger"], "info"); - - const startPromise = processingService.start(); - - // Wait for orchestrators to start - await new Promise((resolve) => setTimeout(resolve, 100)); - - // Verify both orchestrators are running - // const orchestratorInstances = vi.mocked(Orchestrator).mock.results; - // Verify both orchestrators are running - expect(runSpy).toHaveBeenCalledTimes(2); - expect(runSpy.mock.calls.map((call) => call[0])).toEqual([ - expect.any(AbortSignal), - expect.any(AbortSignal), - ]); - expect(logSpy).toHaveBeenNthCalledWith(2, "Starting orchestrator for chain 1..."); - expect(logSpy).toHaveBeenNthCalledWith(3, "Starting orchestrator for chain 2..."); - - // Simulate SIGINT - process.emit("SIGINT"); - expect(abortSpy).toHaveBeenCalled(); - - // Wait for orchestrators to shut down - await startPromise; - - // Verify all orchestrators were properly shut down - expect(runSpy.mock.results.every((result) => result.value)).toBeTruthy(); - }); - - it("handles SIGTERM signal", async () => { - const abortSpy = vi.spyOn(AbortController.prototype, "abort"); - const startPromise = processingService.start(); - const runSpy = vi.mocked(Orchestrator.prototype.run); - - // Wait for orchestrators to start - await new Promise((resolve) => setTimeout(resolve, 100)); - - // Simulate SIGTERM - process.emit("SIGTERM"); - expect(abortSpy).toHaveBeenCalled(); - - await startPromise; - - // Verify all orchestrators were properly shut down - expect(runSpy.mock.results.every((result) => result.value)).toBeTruthy(); - }); - - it("releases resources correctly", async () => { - await processingService.releaseResources(); - - expect(processingService["kyselyDatabase"].destroy).toHaveBeenCalled(); + describe("start", () => { + it("initializes multiple orchestrators correctly", () => { + expect(InMemoryCachedStrategyRegistry.initialize).toHaveBeenCalledTimes(1); + expect(DatabaseStrategyRegistry).toHaveBeenCalledTimes(1); + expect(DatabaseEventRegistry).toHaveBeenCalledTimes(1); + expect(EvmProvider).toHaveBeenCalledTimes(2); + expect(InMemoryCachedEventRegistry.initialize).toHaveBeenCalledTimes(2); + + // Verify orchestrators were created with correct parameters + expect(processingService["orchestrators"].size).toBe(2); + + // Verify first chain initialization + expect(EvmProvider).toHaveBeenNthCalledWith( + 1, + ["http://localhost:8545"], + expect.any(Object), + expect.any(Object), + ); + + // Verify second chain initialization + expect(EvmProvider).toHaveBeenNthCalledWith( + 2, + ["http://localhost:8546"], + expect.any(Object), + expect.any(Object), + ); + }); + + it("starts all orchestrators and handles shutdown signals", async () => { + const abortSpy = vi.spyOn(AbortController.prototype, "abort"); + const runSpy = vi.mocked(Orchestrator.prototype.run); + const logSpy = vi.spyOn(processingService["logger"], "info"); + + const startPromise = processingService.start(); + + // Wait for orchestrators to start + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Verify both orchestrators are running + // const orchestratorInstances = vi.mocked(Orchestrator).mock.results; + // Verify both orchestrators are running + expect(runSpy).toHaveBeenCalledTimes(2); + expect(runSpy.mock.calls.map((call) => call[0])).toEqual([ + expect.any(AbortSignal), + expect.any(AbortSignal), + ]); + expect(logSpy).toHaveBeenNthCalledWith(2, "Starting orchestrator for chain 1..."); + expect(logSpy).toHaveBeenNthCalledWith(3, "Starting orchestrator for chain 2..."); + + // Simulate SIGINT + process.emit("SIGINT"); + expect(abortSpy).toHaveBeenCalled(); + + // Wait for orchestrators to shut down + await startPromise; + + // Verify all orchestrators were properly shut down + expect(runSpy.mock.results.every((result) => result.value)).toBeTruthy(); + }); + + it("handles SIGTERM signal", async () => { + const abortSpy = vi.spyOn(AbortController.prototype, "abort"); + const startPromise = processingService.start(); + const runSpy = vi.mocked(Orchestrator.prototype.run); + + // Wait for orchestrators to start + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Simulate SIGTERM + process.emit("SIGTERM"); + expect(abortSpy).toHaveBeenCalled(); + + await startPromise; + + // Verify all orchestrators were properly shut down + expect(runSpy.mock.results.every((result) => result.value)).toBeTruthy(); + }); + + it("releases resources correctly", async () => { + await processingService.releaseResources(); + + expect(processingService["kyselyDatabase"].destroy).toHaveBeenCalled(); + }); + + it("logs error during resource release", async () => { + const mockError = new Error("Database error"); + const logSpy = vi.spyOn(processingService["logger"], "error"); + vi.mocked(processingService["kyselyDatabase"].destroy).mockRejectedValueOnce(mockError); + + await processingService.releaseResources(); + + expect(logSpy).toHaveBeenCalledWith(`Error releasing resources: ${mockError}`); + }); }); - it("logs error during resource release", async () => { - const mockError = new Error("Database error"); - const logSpy = vi.spyOn(processingService["logger"], "error"); - vi.mocked(processingService["kyselyDatabase"].destroy).mockRejectedValueOnce(mockError); + describe("retroactiveProcessing", () => { + it("processes retroactive strategies", async () => { + const runSpy = vi.mocked(RetroactiveProcessor.prototype.processRetroactiveStrategies); - await processingService.releaseResources(); + await processingService.processRetroactiveEvents(); - expect(logSpy).toHaveBeenCalledWith(`Error releasing resources: ${mockError}`); + // Verify both retroactive processors were run + expect(runSpy).toHaveBeenCalledTimes(2); + }); }); }); diff --git a/apps/processing/test/unit/sharedDependencies.service.spec.ts b/apps/processing/test/unit/sharedDependencies.service.spec.ts index bff277e..97b722b 100644 --- a/apps/processing/test/unit/sharedDependencies.service.spec.ts +++ b/apps/processing/test/unit/sharedDependencies.service.spec.ts @@ -23,6 +23,7 @@ vi.mock("@grants-stack-indexer/repository", () => ({ saveStrategyId: vi.fn(), })), KyselyEventRegistryRepository: vi.fn(), + KyselyStrategyProcessingCheckpointRepository: vi.fn(), })); vi.mock("@grants-stack-indexer/pricing", () => ({ @@ -126,5 +127,8 @@ describe("SharedDependenciesService", () => { // Verify registries expect(dependencies.registriesRepositories).toHaveProperty("eventRegistryRepository"); expect(dependencies.registriesRepositories).toHaveProperty("strategyRegistryRepository"); + expect(dependencies.registriesRepositories).toHaveProperty( + "strategyProcessingCheckpointRepository", + ); }); }); diff --git a/packages/data-flow/README.md b/packages/data-flow/README.md index d6ab601..9fc328a 100644 --- a/packages/data-flow/README.md +++ b/packages/data-flow/README.md @@ -101,3 +101,7 @@ There are 3 implementations: ### [DataLoader](./src/data-loader/dataLoader.ts) The `DataLoader` is responsible for applying changesets to the database. + +### [RetroactiveProcessor](./src/retroactiveProcessor.ts) + +The `RetroactiveProcessor` is an independent runner class for retroactively processing strategies from strategies that were previously unsupported. diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index 0a2f53e..1a02aee 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -48,7 +48,6 @@ import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from * - Registry tracking of supported/unsupported strategies and events * * TODO: Enhance the error handling/retries, logging and observability - * TODO: Handle unhandled strategies appropriately */ export class Orchestrator { private readonly eventsQueue: IQueue>; diff --git a/packages/data-flow/src/retroactiveProcessor.ts b/packages/data-flow/src/retroactiveProcessor.ts index 5456cb1..d9fbd80 100644 --- a/packages/data-flow/src/retroactiveProcessor.ts +++ b/packages/data-flow/src/retroactiveProcessor.ts @@ -1,5 +1,6 @@ import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; import { existsHandler, UnsupportedEventException } from "@grants-stack-indexer/processors"; +import { IStrategyProcessingCheckpointRepository } from "@grants-stack-indexer/repository"; import { Address, AnyEvent, @@ -39,10 +40,20 @@ type EventPointer = { * catch up on missed events and maintain data consistency. * * Key responsibilities: - * 1. Identify newly handleable strategies - * 2. Fetch historical events for these strategies - * 3. Process events through the appropriate handlers + * 1. Identify newly handleable strategies that were previously unsupported + * 2. Fetch historical events for these strategies from the Indexer client + * 3. Process events through the appropriate handlers to update system state * 4. Update strategy registry with processed status + * 5. Track processing progress via checkpoints to enable resumability + * + * The checkpoint registry maintains processing state for each strategy, storing: + * - Last processed block number and log index + * + * This enables the processor to: + * - Resume processing from last checkpoint after interruption + * - Track multiple strategies independently + * - Provide processing status visibility + * - Ensure exactly-once processing semantics */ export class RetroactiveProcessor { private readonly eventsFetcher: IEventsFetcher; @@ -50,6 +61,7 @@ export class RetroactiveProcessor { private readonly eventsRegistry: IEventsRegistry; private readonly strategyRegistry: IStrategyRegistry; private readonly dataLoader: DataLoader; + private readonly checkpointRepository: IStrategyProcessingCheckpointRepository; /** * Creates a new instance of RetroactiveProcessor @@ -67,6 +79,7 @@ export class RetroactiveProcessor { private registries: { eventsRegistry: IEventsRegistry; strategyRegistry: IStrategyRegistry; + checkpointRepository: IStrategyProcessingCheckpointRepository; }, private fetchLimit: number = 1000, private logger: ILogger, @@ -78,6 +91,7 @@ export class RetroactiveProcessor { }); this.eventsRegistry = registries.eventsRegistry; this.strategyRegistry = registries.strategyRegistry; + this.checkpointRepository = registries.checkpointRepository; this.dataLoader = new DataLoader( { project: this.dependencies.projectRepository, @@ -146,7 +160,16 @@ export class RetroactiveProcessor { strategyAddresses: Set
, lastEventPointer: Readonly, ): Promise { - const currentPointer: EventPointer = { blockNumber: 0, logIndex: 0 }; + // Check if we have a checkpoint for this strategy + const checkpoint = await this.checkpointRepository.getCheckpoint(this.chainId, strategyId); + + const currentPointer: EventPointer = checkpoint + ? { + blockNumber: checkpoint.lastProcessedBlockNumber, + logIndex: checkpoint.lastProcessedLogIndex, + } + : { blockNumber: 0, logIndex: 0 }; + const events = new Queue & { strategyId?: Hex }>(); let event: (ProcessorEvent & { strategyId?: Hex }) | undefined; @@ -176,8 +199,6 @@ export class RetroactiveProcessor { `Failed to apply changesets. ${executionResult.errors.join("\n")} Event: ${stringify(event)}`, ); } - - // Update pointer } catch (error) { if (error instanceof InvalidEvent || error instanceof UnsupportedEventException) { // Expected errors that we can safely ignore @@ -186,9 +207,25 @@ export class RetroactiveProcessor { this.logger.error(`Error processing event: ${stringify(event)} ${error}`); } } + + // Update checkpoint after processing + await this.updateCheckpoint(strategyId, currentPointer); } await this.markStrategyAsHandled(strategyId, strategyAddresses); + // Delete checkpoint after processing of all events + await this.checkpointRepository.deleteCheckpoint(this.chainId, strategyId); + } + + private async updateCheckpoint(strategyId: Hex, currentPointer: EventPointer): Promise { + const checkpointData = { + chainId: this.chainId, + strategyId, + lastProcessedBlockNumber: currentPointer.blockNumber, + lastProcessedLogIndex: currentPointer.logIndex, + }; + + await this.checkpointRepository.upsertCheckpoint(checkpointData); } private async enqueueEventsIfEmpty( diff --git a/packages/data-flow/test/unit/retroactiveProcessor.spec.ts b/packages/data-flow/test/unit/retroactiveProcessor.spec.ts index 69f5add..d4d3a85 100644 --- a/packages/data-flow/test/unit/retroactiveProcessor.spec.ts +++ b/packages/data-flow/test/unit/retroactiveProcessor.spec.ts @@ -8,6 +8,7 @@ import { IDonationRepository, IProjectRepository, IRoundRepository, + IStrategyProcessingCheckpointRepository, Strategy, } from "@grants-stack-indexer/repository"; import { @@ -64,6 +65,7 @@ describe("RetroactiveProcessor", () => { let mockEventsRegistry: IEventsRegistry; let mockStrategyRegistry: IStrategyRegistry; let mockEvmProvider: EvmProvider; + let mockCheckpointRepository: IStrategyProcessingCheckpointRepository; let mockLogger: ILogger; let mockEventsProcessor: EventsProcessor; let mockDataLoader: DataLoader; @@ -123,6 +125,12 @@ describe("RetroactiveProcessor", () => { warn: vi.fn(), }; + mockCheckpointRepository = { + upsertCheckpoint: vi.fn(), + deleteCheckpoint: vi.fn(), + getCheckpoint: vi.fn(), + }; + const dependencies: CoreDependencies = { evmProvider: mockEvmProvider, projectRepository: {} as IProjectRepository, @@ -145,6 +153,7 @@ describe("RetroactiveProcessor", () => { { eventsRegistry: mockEventsRegistry, strategyRegistry: mockStrategyRegistry, + checkpointRepository: mockCheckpointRepository, }, mockFetchLimit, mockLogger, @@ -217,6 +226,8 @@ describe("RetroactiveProcessor", () => { ); expect(mockEventsProcessor.processEvent).toHaveBeenCalledTimes(1); expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledTimes(2); + expect(mockCheckpointRepository.upsertCheckpoint).toHaveBeenCalledTimes(1); + expect(mockCheckpointRepository.deleteCheckpoint).toHaveBeenCalledTimes(1); }); it("process multiple new handleable strategies", async () => { @@ -272,6 +283,59 @@ describe("RetroactiveProcessor", () => { "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf", true, ); + expect(mockCheckpointRepository.upsertCheckpoint).toHaveBeenCalledTimes(2); + expect(mockCheckpointRepository.deleteCheckpoint).toHaveBeenCalledTimes(2); + }); + + it("starts from checkpoint if exists", async () => { + const mockEvent = createMockEvent(eventName, defaultParams, existentStrategyId, { + blockNumber: 95, + logIndex: 4, + }); + + vi.spyOn(mockStrategyRegistry, "getStrategies").mockResolvedValue(mockValidStrategies); + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue({ + blockNumber: 100, + logIndex: 1, + chainId, + blockTimestamp: 1234567890, + }); + vi.spyOn(mockCheckpointRepository, "getCheckpoint").mockResolvedValue({ + chainId, + strategyId: existentStrategyId, + lastProcessedBlockNumber: 90, + lastProcessedLogIndex: 0, + }); + + vi.spyOn(mockEventsFetcher, "fetchEvents") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + vi.spyOn(mockEventsProcessor, "processEvent").mockResolvedValue([]); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: [], + numExecuted: 1, + numSuccessful: 1, + }); + vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockResolvedValue(); + + await processor.processRetroactiveStrategies(); + + expect(mockLogger.info).toHaveBeenCalledWith( + "Retroactive processing complete. Succeeded: 1, Failed: 0", + ); + + expect(mockEventsProcessor.processEvent).toHaveBeenCalledTimes(1); + expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledTimes(2); + expect(mockEventsFetcher.fetchEvents).not.toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + from: { blockNumber: 0, logIndex: 0 }, + }), + ); + expect(mockCheckpointRepository.upsertCheckpoint).toHaveBeenCalledTimes(1); + expect(mockCheckpointRepository.deleteCheckpoint).toHaveBeenCalledTimes(1); }); it("breaks loop if event is older than last processed", async () => { @@ -432,7 +496,6 @@ describe("RetroactiveProcessor", () => { }); }); }); - /** * Creates a mock event for testing. * diff --git a/packages/repository/src/db/connection.ts b/packages/repository/src/db/connection.ts index d7398a4..f243832 100644 --- a/packages/repository/src/db/connection.ts +++ b/packages/repository/src/db/connection.ts @@ -21,6 +21,7 @@ import { Round, RoundRole as RoundRoleTable, StatusSnapshot, + StrategyProcessingCheckpoint as StrategyProcessingCheckpointTable, Strategy as StrategyRegistryTable, } from "../internal.js"; @@ -63,6 +64,7 @@ export interface Database { applicationsPayouts: ApplicationPayoutTable; strategiesRegistry: StrategyRegistryTable; eventsRegistry: EventRegistryTable; + strategyProcessingCheckpoints: StrategyProcessingCheckpointTable; } /** diff --git a/packages/repository/src/external.ts b/packages/repository/src/external.ts index 08222ec..9966150 100644 --- a/packages/repository/src/external.ts +++ b/packages/repository/src/external.ts @@ -10,6 +10,7 @@ export type { IApplicationPayoutRepository, IStrategyRegistryRepository, IEventRegistryRepository, + IStrategyProcessingCheckpointRepository, DatabaseConfig, } from "./internal.js"; @@ -57,6 +58,7 @@ export { KyselyApplicationPayoutRepository, KyselyStrategyRegistryRepository, KyselyEventRegistryRepository, + KyselyStrategyProcessingCheckpointRepository, } from "./repositories/kysely/index.js"; export { @@ -66,4 +68,6 @@ export { ProjectByRoleNotFound, } from "./internal.js"; +export type { StrategyProcessingCheckpoint, NewStrategyProcessingCheckpoint } from "./internal.js"; + export { createKyselyPostgresDb as createKyselyDatabase } from "./internal.js"; diff --git a/packages/repository/src/interfaces/index.ts b/packages/repository/src/interfaces/index.ts index f1a27be..f8818a5 100644 --- a/packages/repository/src/interfaces/index.ts +++ b/packages/repository/src/interfaces/index.ts @@ -5,3 +5,4 @@ export * from "./donationRepository.interface.js"; export * from "./applicationPayoutRepository.interface.js"; export * from "./strategyRepository.interface.js"; export * from "./eventsRepository.interface.js"; +export * from "./strategyProcessingCheckpointRepository.interface.js"; diff --git a/packages/repository/src/interfaces/strategyProcessingCheckpointRepository.interface.ts b/packages/repository/src/interfaces/strategyProcessingCheckpointRepository.interface.ts new file mode 100644 index 0000000..2ad8d7f --- /dev/null +++ b/packages/repository/src/interfaces/strategyProcessingCheckpointRepository.interface.ts @@ -0,0 +1,28 @@ +import { ChainId, Hex } from "@grants-stack-indexer/shared"; + +import { NewStrategyProcessingCheckpoint, StrategyProcessingCheckpoint } from "../internal.js"; + +export interface IStrategyProcessingCheckpointRepository { + /** + * Get the latest checkpoint for a strategy + * @param chainId - The chain ID + * @param strategyId - The strategy ID + */ + getCheckpoint( + chainId: ChainId, + strategyId: Hex, + ): Promise; + + /** + * Upsert a checkpoint for a strategy + * @param checkpoint - The checkpoint data to upsert + */ + upsertCheckpoint(checkpoint: NewStrategyProcessingCheckpoint): Promise; + + /** + * Delete the checkpoint for a strategy + * @param chainId - The chain ID + * @param strategyId - The strategy ID + */ + deleteCheckpoint(chainId: ChainId, strategyId: Hex): Promise; +} diff --git a/packages/repository/src/repositories/kysely/index.ts b/packages/repository/src/repositories/kysely/index.ts index 66febbe..fcdd4e3 100644 --- a/packages/repository/src/repositories/kysely/index.ts +++ b/packages/repository/src/repositories/kysely/index.ts @@ -5,3 +5,4 @@ export * from "./donation.repository.js"; export * from "./applicationPayout.repository.js"; export * from "./strategyRegistry.repository.js"; export * from "./eventRegistry.repository.js"; +export * from "./strategyProcessingCheckpoint.repository.js"; diff --git a/packages/repository/src/repositories/kysely/strategyProcessingCheckpoint.repository.ts b/packages/repository/src/repositories/kysely/strategyProcessingCheckpoint.repository.ts new file mode 100644 index 0000000..80c6e34 --- /dev/null +++ b/packages/repository/src/repositories/kysely/strategyProcessingCheckpoint.repository.ts @@ -0,0 +1,60 @@ +import { Kysely } from "kysely"; + +import { ChainId, Hex } from "@grants-stack-indexer/shared"; + +import { Database } from "../../db/connection.js"; +import { IStrategyProcessingCheckpointRepository } from "../../interfaces/strategyProcessingCheckpointRepository.interface.js"; +import { NewStrategyProcessingCheckpoint, StrategyProcessingCheckpoint } from "../../internal.js"; + +export class KyselyStrategyProcessingCheckpointRepository + implements IStrategyProcessingCheckpointRepository +{ + constructor( + private readonly db: Kysely, + private readonly schemaName: string, + ) {} + + /** @inheritdoc */ + async getCheckpoint( + chainId: ChainId, + strategyId: Hex, + ): Promise { + return this.db + .withSchema(this.schemaName) + .selectFrom("strategyProcessingCheckpoints") + .where("chainId", "=", chainId) + .where("strategyId", "=", strategyId) + .selectAll() + .executeTakeFirst(); + } + + /** @inheritdoc */ + async upsertCheckpoint(checkpoint: NewStrategyProcessingCheckpoint): Promise { + await this.db + .withSchema(this.schemaName) + .insertInto("strategyProcessingCheckpoints") + .values({ + ...checkpoint, + createdAt: new Date(), + updatedAt: new Date(), + }) + .onConflict((oc) => + oc.columns(["chainId", "strategyId"]).doUpdateSet({ + lastProcessedBlockNumber: checkpoint.lastProcessedBlockNumber, + lastProcessedLogIndex: checkpoint.lastProcessedLogIndex, + updatedAt: new Date(), + }), + ) + .execute(); + } + + /** @inheritdoc */ + async deleteCheckpoint(chainId: ChainId, strategyId: Hex): Promise { + await this.db + .withSchema(this.schemaName) + .deleteFrom("strategyProcessingCheckpoints") + .where("chainId", "=", chainId) + .where("strategyId", "=", strategyId) + .execute(); + } +} diff --git a/packages/repository/src/types/index.ts b/packages/repository/src/types/index.ts index 9832417..64da8f9 100644 --- a/packages/repository/src/types/index.ts +++ b/packages/repository/src/types/index.ts @@ -6,3 +6,4 @@ export * from "./donation.types.js"; export * from "./applicationPayout.types.js"; export * from "./strategy.types.js"; export * from "./event.types.js"; +export * from "./strategyProcessingCheckpoint.types.js"; diff --git a/packages/repository/src/types/strategyProcessingCheckpoint.types.ts b/packages/repository/src/types/strategyProcessingCheckpoint.types.ts new file mode 100644 index 0000000..1d88cad --- /dev/null +++ b/packages/repository/src/types/strategyProcessingCheckpoint.types.ts @@ -0,0 +1,15 @@ +import { ChainId, Hex } from "@grants-stack-indexer/shared"; + +export type StrategyProcessingCheckpoint = { + chainId: ChainId; + strategyId: Hex; + lastProcessedBlockNumber: number; + lastProcessedLogIndex: number; + createdAt?: Date; + updatedAt?: Date; +}; + +export type NewStrategyProcessingCheckpoint = Omit< + StrategyProcessingCheckpoint, + "createdAt" | "updatedAt" +>; diff --git a/scripts/migrations/src/migrations/20241223T000000_add_strategy_processing_checkpoints.ts b/scripts/migrations/src/migrations/20241223T000000_add_strategy_processing_checkpoints.ts new file mode 100644 index 0000000..6a633c1 --- /dev/null +++ b/scripts/migrations/src/migrations/20241223T000000_add_strategy_processing_checkpoints.ts @@ -0,0 +1,27 @@ +import { Kysely, sql } from "kysely"; + +/** + * The up function is called when you update your database schema to the next version and down when you go back to previous version. + * The only argument for the functions is an instance of Kysely. It's important to use Kysely and not Kysely. + * ref: https://kysely.dev/docs/migrations#migration-files + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export async function up(db: Kysely): Promise { + const CHAIN_ID_TYPE = "integer"; + + await db.schema + .createTable("strategy_processing_checkpoints") + .addColumn("chainId", CHAIN_ID_TYPE) + .addColumn("strategyId", "text") + .addColumn("lastProcessedBlockNumber", "integer") + .addColumn("lastProcessedLogIndex", "integer") + .addColumn("createdAt", "timestamptz", (col) => col.defaultTo(sql`now()`)) + .addColumn("updatedAt", "timestamptz", (col) => col.defaultTo(sql`now()`)) + .addPrimaryKeyConstraint("strategy_processing_checkpoints_pkey", ["chainId", "strategyId"]) + .execute(); +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export async function down(db: Kysely): Promise { + await db.schema.dropTable("strategy_processing_checkpoints").execute(); +}