-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: retroactively handle strategy #45
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool PR 👌
let processor: ProcessingService; | ||
|
||
const main = async (): Promise<void> => { | ||
processor = await ProcessingService.initialize(environment); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be good to add some way to communicate if it initialized successfully/unsuccessfully
Could either have an initialized log after success
Or could have a failed to initialize X service message in the initialize error handling
process.exit(1); | ||
}); | ||
|
||
process.on("uncaughtException", (error: Error) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Type Error or unknown? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually it's typed as Error
for uncaughtException
but unknown for unhandledRejection
type UncaughtExceptionListener = (error: Error, origin: UncaughtExceptionOrigin) => void;
/**
* Most of the time the unhandledRejection will be an Error, but this should not be relied upon
* as *anything* can be thrown/rejected, it is therefore unsafe to assume that the value is an Error.
*/
type UnhandledRejectionListener = (reason: unknown, promise: Promise<unknown>) => void;
@@ -126,6 +142,12 @@ export class ProcessingService { | |||
} | |||
} | |||
|
|||
async processRetroactiveEvents(): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
docs here?
await new Promise((resolve) => setTimeout(resolve, 100)); | ||
|
||
// Verify both orchestrators are running | ||
// const orchestratorInstances = vi.mocked(Orchestrator).mock.results; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Commented out code?
stringify, | ||
} from "@grants-stack-indexer/shared"; | ||
|
||
import { EventsProcessor } from "./eventsProcessor.js"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if you have an index/internal file where you're exporting the processors? if not it's ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah we have, nice catch
* @param dependencies - Core system dependencies for data access and processing | ||
* @param indexerClient - Client for fetching blockchain events | ||
* @param registries - Event and strategy registries for tracking processing state | ||
* @param fetchLimit - Maximum number of events to fetch in a single batch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe note here that you've set a default
await this.checkpointRepository.deleteCheckpoint(this.chainId, strategyId); | ||
} | ||
|
||
private async updateCheckpoint(strategyId: Hex, currentPointer: EventPointer): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe should add docs for the functions in this class
* from: "0xcBf407C33d68a55CB594Ffc8f4fD1416Bba39DA5", | ||
* }, | ||
*/ | ||
export const createMockEvent = <T extends ContractToEventName<"Strategy">>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you're going to create mock events for other classes it might be good to move this to a shared folder or rename this to be for the specific event
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have but is not in the Shared package, a PR later moving it there and refactoring where needed is the best i think
import { ChainId, Hex } from "@grants-stack-indexer/shared"; | ||
|
||
import { Database } from "../../db/connection.js"; | ||
import { IStrategyProcessingCheckpointRepository } from "../../interfaces/strategyProcessingCheckpointRepository.interface.js"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there's an index.js where this is being exported
strategyId: Hex; | ||
lastProcessedBlockNumber: number; | ||
lastProcessedLogIndex: number; | ||
createdAt?: Date; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Strange that createdAt is optional but I guess not all the created at times are available 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left it optional because I delegated (in this case) the default to sql.now()
instead of doing it in the repository. Aldo at business level is not so critical but is nice to have it there on the DB
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What a PR, nice job!
Just a clarification comment and it's good to go IMO
expect(runSpy.mock.results.every((result) => result.value)).toBeTruthy(); | ||
}); | ||
|
||
it("handles SIGTERM signal", async () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
// Delete checkpoint after processing of all events | ||
await this.checkpointRepository.deleteCheckpoint(this.chainId, strategyId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sure I'm getting this wrong due to not being familiar with the codebase but wouldn't deleteCheckpoint
delete the updated checkpoint on line 214? Or the idea is to use these checkpoints just for this retroactive processing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as you're saying, the checkpoints are specific for this retroactive processing in case smth critical happens (like machine goes down), so we restart and keep processing from the point we were. this won't be used a lot actually and when used is not that we will have tons of new strategies
🤖 Linear
Closes GIT-206 GIT-207 GIT-187
Description
We want to retroactively process events for strategies that were previously unhandleable and now we have the corresponding handler implemented. For this we create the
RetroactiveProcessor
an independent class from the main Orchestrator.Also, we introduce a new table so we checkpoint every event we process and can safely manage a critical error and restart retroactive processing without processing an event twice.
Checklist before requesting a review