Skip to content

Commit

Permalink
feat: add event processing checkpoint on retroactive processing
Browse files Browse the repository at this point in the history
  • Loading branch information
0xnigir1 committed Dec 24, 2024
1 parent f94c48d commit 1ccb408
Show file tree
Hide file tree
Showing 18 changed files with 374 additions and 96 deletions.
1 change: 1 addition & 0 deletions apps/processing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Available scripts that can be run using `pnpm`:
| `start` | Run the compiled app from dist folder |
| `test` | Run tests using vitest |
| `test:cov` | Run tests with coverage report |
| `retroactive` | Run retroactive processing for all chains |

TODO: e2e tests
TODO: Docker image
7 changes: 6 additions & 1 deletion apps/processing/src/services/processing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ export class ProcessingService {
const sharedDependencies = await SharedDependenciesService.initialize(env);
const { CHAINS: chains } = env;
const { core, registriesRepositories, indexerClient, kyselyDatabase } = sharedDependencies;
const { eventRegistryRepository, strategyRegistryRepository } = registriesRepositories;
const {
eventRegistryRepository,
strategyRegistryRepository,
strategyProcessingCheckpointRepository,
} = registriesRepositories;
const orchestrators: Map<ChainId, [Orchestrator, RetroactiveProcessor]> = new Map();

const strategyRegistry = await InMemoryCachedStrategyRegistry.initialize(
Expand Down Expand Up @@ -90,6 +94,7 @@ export class ProcessingService {
{
eventsRegistry: cachedEventsRegistry,
strategyRegistry,
checkpointRepository: strategyProcessingCheckpointRepository,
},
chain.fetchLimit,
chainLogger,
Expand Down
7 changes: 7 additions & 0 deletions apps/processing/src/services/sharedDependencies.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import { PricingProviderFactory } from "@grants-stack-indexer/pricing";
import {
createKyselyDatabase,
IEventRegistryRepository,
IStrategyProcessingCheckpointRepository,
IStrategyRegistryRepository,
KyselyApplicationPayoutRepository,
KyselyApplicationRepository,
KyselyDonationRepository,
KyselyEventRegistryRepository,
KyselyProjectRepository,
KyselyRoundRepository,
KyselyStrategyProcessingCheckpointRepository,
KyselyStrategyRegistryRepository,
} from "@grants-stack-indexer/repository";
import { Logger } from "@grants-stack-indexer/shared";
Expand All @@ -23,6 +25,7 @@ export type SharedDependencies = {
registriesRepositories: {
eventRegistryRepository: IEventRegistryRepository;
strategyRegistryRepository: IStrategyRegistryRepository;
strategyProcessingCheckpointRepository: IStrategyProcessingCheckpointRepository;
};
indexerClient: EnvioIndexerClient;
kyselyDatabase: ReturnType<typeof createKyselyDatabase>;
Expand Down Expand Up @@ -73,6 +76,9 @@ export class SharedDependenciesService {
env.DATABASE_SCHEMA,
);

const strategyProcessingCheckpointRepository =
new KyselyStrategyProcessingCheckpointRepository(kyselyDatabase, env.DATABASE_SCHEMA);

// Initialize indexer client
const indexerClient = new EnvioIndexerClient(
env.INDEXER_GRAPHQL_URL,
Expand All @@ -92,6 +98,7 @@ export class SharedDependenciesService {
registriesRepositories: {
eventRegistryRepository,
strategyRegistryRepository,
strategyProcessingCheckpointRepository,
},
indexerClient,
kyselyDatabase,
Expand Down
193 changes: 106 additions & 87 deletions apps/processing/test/unit/processing.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
InMemoryCachedEventRegistry,
InMemoryCachedStrategyRegistry,
Orchestrator,
RetroactiveProcessor,
} from "@grants-stack-indexer/data-flow";

import type { Environment } from "../../src/config/env.js";
Expand Down Expand Up @@ -67,6 +68,11 @@ vi.spyOn(Orchestrator.prototype, "run").mockImplementation(async function (signa
await new Promise((resolve) => setTimeout(resolve, 100));
}
});
vi.spyOn(RetroactiveProcessor.prototype, "processRetroactiveStrategies").mockImplementation(
async () => {
await new Promise((resolve) => setTimeout(resolve, 100));
},
);

describe("ProcessingService", () => {
let processingService: ProcessingService;
Expand Down Expand Up @@ -100,96 +106,109 @@ describe("ProcessingService", () => {
vi.clearAllMocks();
});

it("initializes multiple orchestrators correctly", () => {
expect(InMemoryCachedStrategyRegistry.initialize).toHaveBeenCalledTimes(1);
expect(DatabaseStrategyRegistry).toHaveBeenCalledTimes(1);
expect(DatabaseEventRegistry).toHaveBeenCalledTimes(1);
expect(EvmProvider).toHaveBeenCalledTimes(2);
expect(InMemoryCachedEventRegistry.initialize).toHaveBeenCalledTimes(2);

// Verify orchestrators were created with correct parameters
expect(processingService["orchestrators"].size).toBe(2);

// Verify first chain initialization
expect(EvmProvider).toHaveBeenNthCalledWith(
1,
["http://localhost:8545"],
expect.any(Object),
expect.any(Object),
);

// Verify second chain initialization
expect(EvmProvider).toHaveBeenNthCalledWith(
2,
["http://localhost:8546"],
expect.any(Object),
expect.any(Object),
);
});

it("starts all orchestrators and handles shutdown signals", async () => {
const abortSpy = vi.spyOn(AbortController.prototype, "abort");
const runSpy = vi.mocked(Orchestrator.prototype.run);
const logSpy = vi.spyOn(processingService["logger"], "info");

const startPromise = processingService.start();

// Wait for orchestrators to start
await new Promise((resolve) => setTimeout(resolve, 100));

// Verify both orchestrators are running
// const orchestratorInstances = vi.mocked(Orchestrator).mock.results;
// Verify both orchestrators are running
expect(runSpy).toHaveBeenCalledTimes(2);
expect(runSpy.mock.calls.map((call) => call[0])).toEqual([
expect.any(AbortSignal),
expect.any(AbortSignal),
]);
expect(logSpy).toHaveBeenNthCalledWith(2, "Starting orchestrator for chain 1...");
expect(logSpy).toHaveBeenNthCalledWith(3, "Starting orchestrator for chain 2...");

// Simulate SIGINT
process.emit("SIGINT");
expect(abortSpy).toHaveBeenCalled();

// Wait for orchestrators to shut down
await startPromise;

// Verify all orchestrators were properly shut down
expect(runSpy.mock.results.every((result) => result.value)).toBeTruthy();
});

it("handles SIGTERM signal", async () => {
const abortSpy = vi.spyOn(AbortController.prototype, "abort");
const startPromise = processingService.start();
const runSpy = vi.mocked(Orchestrator.prototype.run);

// Wait for orchestrators to start
await new Promise((resolve) => setTimeout(resolve, 100));

// Simulate SIGTERM
process.emit("SIGTERM");
expect(abortSpy).toHaveBeenCalled();

await startPromise;

// Verify all orchestrators were properly shut down
expect(runSpy.mock.results.every((result) => result.value)).toBeTruthy();
});

it("releases resources correctly", async () => {
await processingService.releaseResources();

expect(processingService["kyselyDatabase"].destroy).toHaveBeenCalled();
describe("start", () => {
it("initializes multiple orchestrators correctly", () => {
expect(InMemoryCachedStrategyRegistry.initialize).toHaveBeenCalledTimes(1);
expect(DatabaseStrategyRegistry).toHaveBeenCalledTimes(1);
expect(DatabaseEventRegistry).toHaveBeenCalledTimes(1);
expect(EvmProvider).toHaveBeenCalledTimes(2);
expect(InMemoryCachedEventRegistry.initialize).toHaveBeenCalledTimes(2);

// Verify orchestrators were created with correct parameters
expect(processingService["orchestrators"].size).toBe(2);

// Verify first chain initialization
expect(EvmProvider).toHaveBeenNthCalledWith(
1,
["http://localhost:8545"],
expect.any(Object),
expect.any(Object),
);

// Verify second chain initialization
expect(EvmProvider).toHaveBeenNthCalledWith(
2,
["http://localhost:8546"],
expect.any(Object),
expect.any(Object),
);
});

it("starts all orchestrators and handles shutdown signals", async () => {
const abortSpy = vi.spyOn(AbortController.prototype, "abort");
const runSpy = vi.mocked(Orchestrator.prototype.run);
const logSpy = vi.spyOn(processingService["logger"], "info");

const startPromise = processingService.start();

// Wait for orchestrators to start
await new Promise((resolve) => setTimeout(resolve, 100));

// Verify both orchestrators are running
// const orchestratorInstances = vi.mocked(Orchestrator).mock.results;
// Verify both orchestrators are running
expect(runSpy).toHaveBeenCalledTimes(2);
expect(runSpy.mock.calls.map((call) => call[0])).toEqual([
expect.any(AbortSignal),
expect.any(AbortSignal),
]);
expect(logSpy).toHaveBeenNthCalledWith(2, "Starting orchestrator for chain 1...");
expect(logSpy).toHaveBeenNthCalledWith(3, "Starting orchestrator for chain 2...");

// Simulate SIGINT
process.emit("SIGINT");
expect(abortSpy).toHaveBeenCalled();

// Wait for orchestrators to shut down
await startPromise;

// Verify all orchestrators were properly shut down
expect(runSpy.mock.results.every((result) => result.value)).toBeTruthy();
});

it("handles SIGTERM signal", async () => {
const abortSpy = vi.spyOn(AbortController.prototype, "abort");
const startPromise = processingService.start();
const runSpy = vi.mocked(Orchestrator.prototype.run);

// Wait for orchestrators to start
await new Promise((resolve) => setTimeout(resolve, 100));

// Simulate SIGTERM
process.emit("SIGTERM");
expect(abortSpy).toHaveBeenCalled();

await startPromise;

// Verify all orchestrators were properly shut down
expect(runSpy.mock.results.every((result) => result.value)).toBeTruthy();
});

it("releases resources correctly", async () => {
await processingService.releaseResources();

expect(processingService["kyselyDatabase"].destroy).toHaveBeenCalled();
});

it("logs error during resource release", async () => {
const mockError = new Error("Database error");
const logSpy = vi.spyOn(processingService["logger"], "error");
vi.mocked(processingService["kyselyDatabase"].destroy).mockRejectedValueOnce(mockError);

await processingService.releaseResources();

expect(logSpy).toHaveBeenCalledWith(`Error releasing resources: ${mockError}`);
});
});

it("logs error during resource release", async () => {
const mockError = new Error("Database error");
const logSpy = vi.spyOn(processingService["logger"], "error");
vi.mocked(processingService["kyselyDatabase"].destroy).mockRejectedValueOnce(mockError);
describe("retroactiveProcessing", () => {
it("processes retroactive strategies", async () => {
const runSpy = vi.mocked(RetroactiveProcessor.prototype.processRetroactiveStrategies);

await processingService.releaseResources();
await processingService.processRetroactiveEvents();

expect(logSpy).toHaveBeenCalledWith(`Error releasing resources: ${mockError}`);
// Verify both retroactive processors were run
expect(runSpy).toHaveBeenCalledTimes(2);
});
});
});
4 changes: 4 additions & 0 deletions apps/processing/test/unit/sharedDependencies.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ vi.mock("@grants-stack-indexer/repository", () => ({
saveStrategyId: vi.fn(),
})),
KyselyEventRegistryRepository: vi.fn(),
KyselyStrategyProcessingCheckpointRepository: vi.fn(),
}));

vi.mock("@grants-stack-indexer/pricing", () => ({
Expand Down Expand Up @@ -126,5 +127,8 @@ describe("SharedDependenciesService", () => {
// Verify registries
expect(dependencies.registriesRepositories).toHaveProperty("eventRegistryRepository");
expect(dependencies.registriesRepositories).toHaveProperty("strategyRegistryRepository");
expect(dependencies.registriesRepositories).toHaveProperty(
"strategyProcessingCheckpointRepository",
);
});
});
4 changes: 4 additions & 0 deletions packages/data-flow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,7 @@ There are 3 implementations:
### [DataLoader](./src/data-loader/dataLoader.ts)

The `DataLoader` is responsible for applying changesets to the database.

### [RetroactiveProcessor](./src/retroactiveProcessor.ts)

The `RetroactiveProcessor` is an independent runner class for retroactively processing strategies from strategies that were previously unsupported.
1 change: 0 additions & 1 deletion packages/data-flow/src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from
* - Registry tracking of supported/unsupported strategies and events
*
* TODO: Enhance the error handling/retries, logging and observability
* TODO: Handle unhandled strategies appropriately
*/
export class Orchestrator {
private readonly eventsQueue: IQueue<ProcessorEvent<ContractName, AnyEvent>>;
Expand Down
Loading

0 comments on commit 1ccb408

Please sign in to comment.