Skip to content

Commit

Permalink
Adding way to stop exporter after error, clear queue after it
Browse files Browse the repository at this point in the history
  • Loading branch information
Lyudmil Danailov authored and Lyudmil Danailov committed Jan 23, 2024
1 parent 50bafc6 commit d9f47ea
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
4 changes: 3 additions & 1 deletion blockchains/eth/eth_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ class ETHWorker extends BaseWorker {
return this.ethClient.request('trace_filter', [{
fromBlock: this.web3Wrapper.parseNumberToHex(fromBlock),
toBlock: this.web3Wrapper.parseNumberToHex(toBlock)
}]).then((data) => this.parseEthInternalTrx(data['result']));
}])
.then((data) => this.parseEthInternalTrx(data['result']))
.catch((err) => { throw err; });
}

async fetchBlocks(fromBlock, toBlock) {
Expand Down
12 changes: 9 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ class Main {
.catch((err) => { throw new Error(`${INIT_EXPORTER_ERR_MSG}${err.message}`); });
}


async handleInitPosition() {
const lastRecoveredPosition = await this.exporter.getLastPosition();
this.lastProcessedPosition = this.worker.initPosition(lastRecoveredPosition);
await this.exporter.savePosition(this.lastProcessedPosition);
}

async #initTaskManager(blockNumber) {
this.taskManager = new TaskManager(blockNumber, constants.BLOCK_INTERVAL);
await this.taskManager.initQueue(MAX_CONCURRENT_REQUESTS);
Expand Down Expand Up @@ -90,7 +89,10 @@ class Main {
async workLoop() {
while (this.shouldWork) {
if (this.taskManager.queue.size < constantsBase.PQUEUE_MAX_SIZE) {
this.taskManager.pushToQueue(() => this.worker.work());
this.taskManager.pushToQueue(() => this.worker.work().catch(err => {
logger.error(err.stack);
this.shouldWork = false;
}));
}
this.worker.lastRequestStartTime = new Date();
this.worker.lastExportTime = Date.now();
Expand All @@ -112,6 +114,10 @@ class Main {
async disconnect() {
// This call should be refactored to work with async/await
this.exporter.disconnect();

logger.info('Clearing queue...');
this.taskManager.clearQueue();

await this.microServer.close();
}

Expand Down
6 changes: 6 additions & 0 deletions lib/task_manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ class TaskManager {
const PQueue = (await import('p-queue')).default;
this.queue = new PQueue({ concurrency: maxConcurrentRequests });
this.queue.on('completed', (data) => this.handleNewData(data));
// this.queue.on('error', err => { return Promise.reject(err.toString()); } );
}

clearQueue() {
this.taskManager.queue.pause();
this.taskManager.queue.clear();
}

#generateDefensiveCopy() {
Expand Down

0 comments on commit d9f47ea

Please sign in to comment.