diff --git a/src/modules/dataAggregator/services/aggregation.helper.ts b/src/modules/dataAggregator/services/aggregation.helper.ts
index 518fb91..cb34435 100644
--- a/src/modules/dataAggregator/services/aggregation.helper.ts
+++ b/src/modules/dataAggregator/services/aggregation.helper.ts
@@ -215,8 +215,22 @@ export class AggregationHelper {
     const responseBuffer: GetTransfersByAccountQuery['transfers'] = [];
     let index = 1;
 
+    const publicKeyShort = `${inputData.publicKey.substring(
+      0,
+      5,
+    )}..${inputData.publicKey.substring(
+      inputData.publicKey.length - 5,
+      inputData.publicKey.length - 1,
+    )}`;
+
     const runQuery = async (offset: number = 0) => {
       const currentOffset = offset;
+      console.log(
+        `${publicKeyShort} :: query START :: ${inputData.blockchainTag} :: ${
+          inputData.chunkStartBlock
+        }/${inputData.chunkEndBlock} :: offset ${currentOffset}`,
+      );
+
       const resp = await this.dataSourceUtils.getTransfersByAccount({
         limit: pageSize,
         offset: currentOffset,
@@ -225,16 +239,12 @@ export class AggregationHelper {
         blockNumber_lt: inputData.chunkEndBlock,
         queryUrl: inputData.sourceUrl,
       });
+      console.log(
+        `${publicKeyShort} :: query COMPLETED :: ${inputData.blockchainTag} :: ${inputData.chunkStartBlock}/${inputData.chunkEndBlock} `,
+      );
 
       if (resp.transfers.length === 0) return;
       responseBuffer.push(...resp.transfers);
-      console.log(
-        `runQuery :: ${inputData.blockchainTag} :: ${
-          inputData.chunkStartBlock
-        }/${inputData.chunkEndBlock} :: index ${index} :: offset ${
-          currentOffset + pageSize
-        }`,
-      );
       index++;
       await runQuery(currentOffset + pageSize);
     };
diff --git a/src/modules/queueProcessor/queueProcessor.module.ts b/src/modules/queueProcessor/queueProcessor.module.ts
index 749cf3b..c599b61 100644
--- a/src/modules/queueProcessor/queueProcessor.module.ts
+++ b/src/modules/queueProcessor/queueProcessor.module.ts
@@ -47,16 +47,16 @@ import { DatasourceChunkParallelHandlingConsumer } from './services/consumers/da
       },
       {
         name: SubIdAggregatorQueueName.DATASOURCE_CHUNKS_PARALLEL_HANDLING,
-        processors: [
-          {
-            concurrency: 101,
-            name: 'TRANSFER_CHUNK',
-            path: join(
-              __dirname,
-              'services/workers/collectTransfersDataChunk.worker.js',
-            ),
-          },
-        ],
+        // processors: [
+        //   {
+        //     concurrency: 101,
+        //     name: 'TRANSFER_CHUNK',
+        //     path: join(
+        //       __dirname,
+        //       'services/workers/collectTransfersDataChunk.worker.js',
+        //     ),
+        //   },
+        // ],
       },
     ),
     BullBoardModule.forFeature(
diff --git a/src/modules/queueProcessor/services/consumers/datasourceChunkParallelHandling.consumer.ts b/src/modules/queueProcessor/services/consumers/datasourceChunkParallelHandling.consumer.ts
index ee587fa..35a58ea 100644
--- a/src/modules/queueProcessor/services/consumers/datasourceChunkParallelHandling.consumer.ts
+++ b/src/modules/queueProcessor/services/consumers/datasourceChunkParallelHandling.consumer.ts
@@ -10,28 +10,28 @@ import { CollectEventDataChunkFromDataSourceInput } from '../../dto/collectEvent
 export class DatasourceChunkParallelHandlingConsumer {
   constructor(private aggregationHelper: AggregationHelper) {}
 
-  // @Process({
-  //   name: 'TRANSFER_CHUNK',
-  //   concurrency: 200,
-  // })
-  // async collectAccountTransfersChunk(
-  //   job: Job,
-  // ) {
-  //   await job.takeLock();
-  //
-  //   try {
-  //     const result = await this.aggregationHelper.collectTransferEventDataChunk(
-  //       job.data,
-  //     );
-  //
-  //     await job.releaseLock();
-  //     await job.moveToCompleted(JSON.stringify(result), true);
-  //   } catch (e) {
-  //     await job.releaseLock();
-  //     await job.moveToFailed({
-  //       message: (e as Error).message || 'Something went wrong.',
-  //     });
-  //   }
-  //   return {};
-  // }
+  @Process({
+    name: 'TRANSFER_CHUNK',
+    concurrency: 200,
+  })
+  async collectAccountTransfersChunk(
+    job: Job,
+  ) {
+    await job.takeLock();
+
+    try {
+      const result = await this.aggregationHelper.collectTransferEventDataChunk(
+        job.data,
+      );
+
+      await job.releaseLock();
+      await job.moveToCompleted(JSON.stringify(result), true);
+    } catch (e) {
+      await job.releaseLock();
+      await job.moveToFailed({
+        message: (e as Error).message || 'Something went wrong.',
+      });
+    }
+    return {};
+  }
 }
diff --git a/src/modules/queueProcessor/services/consumers/datasourceHandling.consumer.ts b/src/modules/queueProcessor/services/consumers/datasourceHandling.consumer.ts
index 8aac839..d0aa612 100644
--- a/src/modules/queueProcessor/services/consumers/datasourceHandling.consumer.ts
+++ b/src/modules/queueProcessor/services/consumers/datasourceHandling.consumer.ts
@@ -33,30 +33,30 @@ export class DatasourceHandlingConsumer {
     return {};
   }
 
-  @Process({
-    name: 'TRANSFER_CHUNK',
-    concurrency: 200,
-  })
-  async collectAccountTransfersChunk(
-    job: Job,
-  ) {
-    await job.takeLock();
-
-    try {
-      const result = await this.aggregationHelper.collectTransferEventDataChunk(
-        job.data,
-      );
-
-      await job.releaseLock();
-      await job.moveToCompleted(JSON.stringify(result), true);
-    } catch (e) {
-      await job.releaseLock();
-      await job.moveToFailed({
-        message: (e as Error).message || 'Something went wrong.',
-      });
-    }
-    return {};
-  }
+  // @Process({
+  //   name: 'TRANSFER_CHUNK',
+  //   concurrency: 200,
+  // })
+  // async collectAccountTransfersChunk(
+  //   job: Job,
+  // ) {
+  //   await job.takeLock();
+  //
+  //   try {
+  //     const result = await this.aggregationHelper.collectTransferEventDataChunk(
+  //       job.data,
+  //     );
+  //
+  //     await job.releaseLock();
+  //     await job.moveToCompleted(JSON.stringify(result), true);
+  //   } catch (e) {
+  //     await job.releaseLock();
+  //     await job.moveToFailed({
+  //       message: (e as Error).message || 'Something went wrong.',
+  //     });
+  //   }
+  //   return {};
+  // }
 
   @Process({
     name: NativeTransactionKind.VOTE,
diff --git a/src/modules/queueProcessor/services/producers/datasourceChunksParallelHandling.producer.ts b/src/modules/queueProcessor/services/producers/datasourceChunksParallelHandling.producer.ts
index 9a5e0ee..4e80851 100644
--- a/src/modules/queueProcessor/services/producers/datasourceChunksParallelHandling.producer.ts
+++ b/src/modules/queueProcessor/services/producers/datasourceChunksParallelHandling.producer.ts
@@ -25,10 +25,13 @@
       'TRANSFER_CHUNK',
       requestData,
       {
-        attempts: 20,
+        attempts: 5,
+        timeout: 60 * 1000,
         jobId: crypto.randomUUID(),
-        removeOnComplete: false,
+        removeOnComplete: true,
         removeOnFail: false,
+        stackTraceLimit: 100,
+
       },
     );
 
diff --git a/src/modules/queueProcessor/services/workers/collectTransfersDataChunk.worker.ts b/src/modules/queueProcessor/services/workers/collectTransfersDataChunk.worker.ts
index de87824..f63b502 100644
--- a/src/modules/queueProcessor/services/workers/collectTransfersDataChunk.worker.ts
+++ b/src/modules/queueProcessor/services/workers/collectTransfersDataChunk.worker.ts
@@ -19,6 +19,14 @@ export default async function (job: Job, cb: DoneCallback) {
 
   const runQuery = async (offset: number = 0) => {
     const currentOffset = offset;
+    console.log(
+      `runQuery started :: ${inputData.blockchainTag} :: ${
+        inputData.chunkStartBlock
+      }/${inputData.chunkEndBlock} :: index ${index} :: offset ${
+        currentOffset + pageSize
+      }`,
+    );
+
     const resp = await dataSourceUtils.getTransfersByAccount({
       limit: pageSize,
       offset: currentOffset,
@@ -31,7 +39,7 @@ export default async function (job: Job, cb: DoneCallback) {
 
     responseBuffer.push(...resp.transfers);
     console.log(
-      `runQuery :: ${inputData.blockchainTag} :: ${
+      `runQuery completed :: ${inputData.blockchainTag} :: ${
         inputData.chunkStartBlock
       }/${inputData.chunkEndBlock} :: index ${index} :: offset ${
         currentOffset + pageSize
diff --git a/src/modulesConfig/bullModule.forRoot.ts b/src/modulesConfig/bullModule.forRoot.ts
index 1d71d1f..5728a8b 100644
--- a/src/modulesConfig/bullModule.forRoot.ts
+++ b/src/modulesConfig/bullModule.forRoot.ts
@@ -70,7 +70,7 @@
   settings: {
     lockDuration: 20000, // Check for stalled jobs each 2 min
     lockRenewTime: 10000,
-    stalledInterval: 10 * 60 * 1000,
+    stalledInterval: 5 * 60 * 1000,
     maxStalledCount: 1,
   },
 };
diff --git a/src/utils/dataSourceUtils.ts b/src/utils/dataSourceUtils.ts
index 3742bc0..078800a 100644
--- a/src/utils/dataSourceUtils.ts
+++ b/src/utils/dataSourceUtils.ts
@@ -60,9 +60,9 @@ export class DataSourceUtils {
   }
 
   async getTransfersByAccount(data: GetTransfersByAccountArgs) {
-    console.log(
-      `request started :: ${data.blockNumber_gt}/${data.blockNumber_lt}`,
-    );
+    // console.log(
+    //   `request started :: ${data.blockNumber_gt}/${data.blockNumber_lt}`,
+    // );
     const res = await this.requestWithRetry(
       this.squidQueryRequest(
         {