From d9f47ea311493e47a8fc604488df8cf38c6915ac Mon Sep 17 00:00:00 2001 From: Lyudmil Danailov Date: Tue, 23 Jan 2024 13:22:56 +0200 Subject: [PATCH] Adding way to stop exporter after error, clear queue after it --- blockchains/eth/eth_worker.js | 4 +++- index.js | 12 +++++++++--- lib/task_manager.js | 6 ++++++ 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/blockchains/eth/eth_worker.js b/blockchains/eth/eth_worker.js index da6dce6b..e8312936 100644 --- a/blockchains/eth/eth_worker.js +++ b/blockchains/eth/eth_worker.js @@ -40,7 +40,9 @@ class ETHWorker extends BaseWorker { return this.ethClient.request('trace_filter', [{ fromBlock: this.web3Wrapper.parseNumberToHex(fromBlock), toBlock: this.web3Wrapper.parseNumberToHex(toBlock) - }]).then((data) => this.parseEthInternalTrx(data['result'])); + }]) + .then((data) => this.parseEthInternalTrx(data['result'])) + .catch((err) => { throw err; }); } async fetchBlocks(fromBlock, toBlock) { diff --git a/index.js b/index.js index 486686d0..75f47792 100644 --- a/index.js +++ b/index.js @@ -30,13 +30,12 @@ class Main { .catch((err) => { throw new Error(`${INIT_EXPORTER_ERR_MSG}${err.message}`); }); } - async handleInitPosition() { const lastRecoveredPosition = await this.exporter.getLastPosition(); this.lastProcessedPosition = this.worker.initPosition(lastRecoveredPosition); await this.exporter.savePosition(this.lastProcessedPosition); } - + async #initTaskManager(blockNumber) { this.taskManager = new TaskManager(blockNumber, constants.BLOCK_INTERVAL); await this.taskManager.initQueue(MAX_CONCURRENT_REQUESTS); @@ -90,7 +89,10 @@ class Main { async workLoop() { while (this.shouldWork) { if (this.taskManager.queue.size < constantsBase.PQUEUE_MAX_SIZE) { - this.taskManager.pushToQueue(() => this.worker.work()); + this.taskManager.pushToQueue(() => this.worker.work().catch(err => { + logger.error(err.stack); + this.shouldWork = false; + })); } this.worker.lastRequestStartTime = new Date(); this.worker.lastExportTime = Date.now(); @@ -112,6 +114,10 @@ class Main { async disconnect() { // This call should be refactored to work with async/await this.exporter.disconnect(); + + logger.info('Clearing queue...'); + this.taskManager.clearQueue(); + await this.microServer.close(); } diff --git a/lib/task_manager.js b/lib/task_manager.js index 6b186672..e2ad9a13 100644 --- a/lib/task_manager.js +++ b/lib/task_manager.js @@ -10,6 +10,12 @@ class TaskManager { const PQueue = (await import('p-queue')).default; this.queue = new PQueue({ concurrency: maxConcurrentRequests }); this.queue.on('completed', (data) => this.handleNewData(data)); + // this.queue.on('error', err => { return Promise.reject(err.toString()); } ); + } + + clearQueue() { + this.taskManager.queue.pause(); + this.taskManager.queue.clear(); } #generateDefensiveCopy() {