Skip to content

Commit

Permalink
fix(indexer): allow clean exit by stopping worker (#90)
Browse files Browse the repository at this point in the history
Signed-off-by: david <[email protected]>
  • Loading branch information
daywiss authored Oct 30, 2024
1 parent 62e02d1 commit 50e239d
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 5 deletions.
2 changes: 1 addition & 1 deletion packages/indexer/src/data-indexing/service/Indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class Indexer {
public stopGracefully() {
this.logger.info({
at: "Indexer::stopGracefully",
message: `Requesting indexer ${this.dataHandler.getDataIdentifier()} to be stoped`,
message: `Requesting indexer ${this.dataHandler.getDataIdentifier()} to be stopped`,
});
this.stopRequested = true;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/indexer/src/generics/BaseIndexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export abstract class BaseIndexer {
public stop(): void {
this.logger.info({
at: "BaseIndexer#stop",
message: `Requesting indexer ${this.name} to be stoped`,
message: `Requesting indexer ${this.name} to be stopped`,
});
this.stopRequested = true;
}
Expand Down
11 changes: 9 additions & 2 deletions packages/indexer/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,12 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {

const indexerQueuesService = new IndexerQueuesService(redis);
// Set up message workers
new IntegratorIdWorker(redis, postgres, logger, retryProvidersFactory);
const integratorIdWorker = new IntegratorIdWorker(
redis,
postgres,
logger,
retryProvidersFactory,
);

const spokePoolIndexers = spokePoolChainsEnabled.map((chainId) => {
const spokePoolIndexerDataHandler = new SpokePoolIndexerDataHandler(
Expand Down Expand Up @@ -156,11 +161,13 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
at: "Indexer#Main",
message: "Wait for shutdown, or press Ctrl+C again to forcefully exit.",
});
integratorIdWorker.close();
spokePoolIndexers.map((s) => s.stopGracefully());
hubPoolIndexer.stopGracefully();
bundleProcessor.stop();
bundleBuilderProcessor.stop();
} else {
integratorIdWorker.close();
logger.info({ at: "Indexer#Main", message: "Forcing exit..." });
redis?.quit();
postgres?.destroy();
Expand Down Expand Up @@ -195,7 +202,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
bundleBuilderResult.status === "fulfilled",
},
});

await integratorIdWorker.close();
redis?.quit();
postgres?.destroy();
logger.info({ at: "Indexer#Main", message: "Exiting indexer" });
Expand Down
4 changes: 3 additions & 1 deletion packages/indexer/src/messaging/IntegratorIdWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ export class IntegratorIdWorker {
{ connection: this.redis, concurrency: 10 },
);
}

private async run(relayHash: string) {
const repository = this.postgres.getRepository(entities.V3FundsDeposited);
const deposit = await repository.findOne({
Expand Down Expand Up @@ -80,4 +79,7 @@ export class IntegratorIdWorker {
}
return;
}
public async close() {
return this.worker.close();
}
}

0 comments on commit 50e239d

Please sign in to comment.