From 50e239d290bb696661eeead14c788bc5e3228a05 Mon Sep 17 00:00:00 2001 From: David A <4429761+daywiss@users.noreply.github.com> Date: Wed, 30 Oct 2024 13:43:51 -0400 Subject: [PATCH] fix(indexer): allow clean exit by stopping worker (#90) Signed-off-by: david --- packages/indexer/src/data-indexing/service/Indexer.ts | 2 +- packages/indexer/src/generics/BaseIndexer.ts | 2 +- packages/indexer/src/main.ts | 11 +++++++++-- packages/indexer/src/messaging/IntegratorIdWorker.ts | 4 +++- 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/packages/indexer/src/data-indexing/service/Indexer.ts b/packages/indexer/src/data-indexing/service/Indexer.ts index 34f59cc0..8043baa0 100644 --- a/packages/indexer/src/data-indexing/service/Indexer.ts +++ b/packages/indexer/src/data-indexing/service/Indexer.ts @@ -74,7 +74,7 @@ export class Indexer { public stopGracefully() { this.logger.info({ at: "Indexer::stopGracefully", - message: `Requesting indexer ${this.dataHandler.getDataIdentifier()} to be stoped`, + message: `Requesting indexer ${this.dataHandler.getDataIdentifier()} to be stopped`, }); this.stopRequested = true; } diff --git a/packages/indexer/src/generics/BaseIndexer.ts b/packages/indexer/src/generics/BaseIndexer.ts index 7f88305f..4edf4563 100644 --- a/packages/indexer/src/generics/BaseIndexer.ts +++ b/packages/indexer/src/generics/BaseIndexer.ts @@ -61,7 +61,7 @@ export abstract class BaseIndexer { public stop(): void { this.logger.info({ at: "BaseIndexer#stop", - message: `Requesting indexer ${this.name} to be stoped`, + message: `Requesting indexer ${this.name} to be stopped`, }); this.stopRequested = true; } diff --git a/packages/indexer/src/main.ts b/packages/indexer/src/main.ts index c98b08dc..8bae7459 100644 --- a/packages/indexer/src/main.ts +++ b/packages/indexer/src/main.ts @@ -91,7 +91,12 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { const indexerQueuesService = new IndexerQueuesService(redis); // Set up message workers - new IntegratorIdWorker(redis, postgres, logger, retryProvidersFactory); + const integratorIdWorker = new IntegratorIdWorker( + redis, + postgres, + logger, + retryProvidersFactory, + ); const spokePoolIndexers = spokePoolChainsEnabled.map((chainId) => { const spokePoolIndexerDataHandler = new SpokePoolIndexerDataHandler( @@ -156,11 +161,13 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { at: "Indexer#Main", message: "Wait for shutdown, or press Ctrl+C again to forcefully exit.", }); + integratorIdWorker.close(); spokePoolIndexers.map((s) => s.stopGracefully()); hubPoolIndexer.stopGracefully(); bundleProcessor.stop(); bundleBuilderProcessor.stop(); } else { + integratorIdWorker.close(); logger.info({ at: "Indexer#Main", message: "Forcing exit..." }); redis?.quit(); postgres?.destroy(); @@ -195,7 +202,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { bundleBuilderResult.status === "fulfilled", }, }); - + await integratorIdWorker.close(); redis?.quit(); postgres?.destroy(); logger.info({ at: "Indexer#Main", message: "Exiting indexer" }); diff --git a/packages/indexer/src/messaging/IntegratorIdWorker.ts b/packages/indexer/src/messaging/IntegratorIdWorker.ts index 103db334..9093dbac 100644 --- a/packages/indexer/src/messaging/IntegratorIdWorker.ts +++ b/packages/indexer/src/messaging/IntegratorIdWorker.ts @@ -47,7 +47,6 @@ export class IntegratorIdWorker { { connection: this.redis, concurrency: 10 }, ); } - private async run(relayHash: string) { const repository = this.postgres.getRepository(entities.V3FundsDeposited); const deposit = await repository.findOne({ @@ -80,4 +79,7 @@ export class IntegratorIdWorker { } return; } + public async close() { + return this.worker.close(); + } }