diff --git a/README.md b/README.md index 489ee6f..92bee1b 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,8 @@ TYPEORM_ENTITIES=dist/orm/*Entity.js # entity path | L1_RPC_URI | L1 node RPC URI | | | L2_LCD_URI | L2 node LCD URI | | | L2_RPC_URI | L2 node RPC URI | | -| L2_GAS_PRICES | Gas prices for L2 chain | '0.15umin' | +| L1_GAS_PRICES | Gas prices for L1 chain | '0.15uinit' | +| L2_GAS_PRICES | Gas prices for L2 chain | '0umin' | | BRIDGE_ID | Bridge ID | '' | | EXECUTOR_PORT | Executor port | 5000 | | EXECUTOR_MNEMONIC | Mnemonic seed for executor | '' | @@ -98,6 +99,7 @@ TYPEORM_ENTITIES=dist/orm/*Entity.js # entity path | ------------------------- | ------------------------------------------ | ------------------------ | | L1_LCD_URI | L1 node LCD URI | | | L1_RPC_URI | L1 node RPC URI | | +| L1_GAS_PRICES | Gas prices for L1 chain | '0.15uinit' | | BRIDGE_ID | Bridge ID | '' | | OUTPUT_SUBMITTER_MNEMONIC | Mnemonic seed for output submitter | '' | | SLACK_WEB_HOOK | Slack web hook for notification (optional) | '' | @@ -128,7 +130,7 @@ TYPEORM_ENTITIES=dist/orm/*Entity.js # entity path | ENABLE_API_ONLY | Enable API only mode (optional) | false | | BATCH_METRICS_PORT | Batch metrics port | 8082 | | PROMETHEUS_METRICS_MODE | Prometheus metrics mode ['pull', 'push'] | 'pull' | -| PROMETHEUS_GATEWAY_URI | Prometheus push gateway URI | 'http://127.0.0.1:9091' | +| PROMETHEUS_GATEWAY_URI | Prometheus push gateway URI | 'http://127.0.0.1:9091' | | PROMETHEUS_TIME_OUT | Prometheus push time out (unit: ms) | 5000 | - `.env.challenger` @@ -139,6 +141,7 @@ TYPEORM_ENTITIES=dist/orm/*Entity.js # entity path | L1_RPC_URI | L1 node RPC URI | | | L2_LCD_URI | L2 node LCD URI | | | L2_RPC_URI | L2 node RPC URI | | +| L1_GAS_PRICES | Gas prices for L1 chain | '0.15uinit' | | BRIDGE_ID | Bridge ID | '' | | CHALLENGER_MNEMONIC | Mnemonic seed for challenger | '' | | DELETE_OUTPUT_PROPOSAL | Delete output proposal | false | diff --git a/src/config.ts b/src/config.ts index f346c3f..1a87d24 100644 --- a/src/config.ts +++ b/src/config.ts @@ -39,6 +39,7 @@ const { BATCH_SUBMITTER_MNEMONIC, CHALLENGER_MNEMONIC, USE_LOG_FILE, + L1_GAS_PRICES, L2_GAS_PRICES, L1_CHAIN_ID, L2_CHAIN_ID, @@ -150,7 +151,7 @@ export const config = { l1lcd: new LCDClientL1( getUri(L1_LCD_URI), { - gasPrices: '0.15uinit', + gasPrices: L1_GAS_PRICES || '0.15uinit', gasAdjustment: '2', chainId: L1_CHAIN_ID }, @@ -225,3 +226,5 @@ validateCelestiaConfig() export const INTERVAL_BATCH = 100_000 export const INTERVAL_MONITOR = 100 export const INTERVAL_OUTPUT = 10_000 + +export const SECOND = 1_000 diff --git a/src/worker/batchSubmitter/batchSubmitter.ts b/src/worker/batchSubmitter/batchSubmitter.ts index 4009641..7b476b3 100644 --- a/src/worker/batchSubmitter/batchSubmitter.ts +++ b/src/worker/batchSubmitter/batchSubmitter.ts @@ -14,7 +14,7 @@ import { import { delay } from 'bluebird' import { INTERVAL_BATCH } from '../../config' import { config } from '../../config' -import MonitorHelper from '../../lib/monitor/helper' +import MonitorHelper from '../bridgeExecutor/monitor/helper' import { createBlob, getCelestiaFeeGasLimit } from '../../celestia/utils' import { bech32 } from 'bech32' import { TxWalletL1 } from '../../lib/walletL1' diff --git a/src/worker/bridgeExecutor/Resurrector.ts b/src/worker/bridgeExecutor/Resurrector.ts deleted file mode 100644 index b24062e..0000000 --- a/src/worker/bridgeExecutor/Resurrector.ts +++ /dev/null @@ -1,124 +0,0 @@ -import { getDB } from './db' -import UnconfirmedTxEntity from '../../orm/executor/UnconfirmedTxEntity' -import { Coin, MsgFinalizeTokenDeposit } from 'initia-l2' -import { INTERVAL_MONITOR, config } from '../../config' -import { DataSource } from 'typeorm' -import Bluebird from 'bluebird' -import winston from 'winston' -import { - TxWalletL2, - WalletType, - getWallet, - initWallet -} from '../../lib/walletL2' -import { - buildFailedTxNotification, - buildResolveErrorNotification, - notifySlack -} from '../../lib/slack' - -export class Resurrector { - private db: DataSource - isRunning = true - executorL2: TxWalletL2 - errorCounter = 0 - - constructor(public logger: winston.Logger) { - [this.db] = getDB() - initWallet(WalletType.Executor, config.l2lcd) - this.executorL2 = getWallet(WalletType.Executor) - } - - async updateProcessed(unconfirmedTx: UnconfirmedTxEntity): Promise { - await this.db.getRepository(UnconfirmedTxEntity).update( - { - bridgeId: unconfirmedTx.bridgeId, - sequence: unconfirmedTx.sequence, - processed: false - }, - { processed: true } - ) - - this.logger.info( - `Resurrected failed tx: ${unconfirmedTx.bridgeId} ${unconfirmedTx.sequence}` - ) - } - - async resubmitFailedDepositTx( - unconfirmedTx: UnconfirmedTxEntity - ): Promise { - const txKey = `${unconfirmedTx.sender}-${unconfirmedTx.receiver}-${unconfirmedTx.amount}` - const msg = new MsgFinalizeTokenDeposit( - this.executorL2.key.accAddress, - unconfirmedTx.sender, - unconfirmedTx.receiver, - new Coin(unconfirmedTx.l2Denom, unconfirmedTx.amount), - parseInt(unconfirmedTx.sequence), - unconfirmedTx.l1Height, - unconfirmedTx.l1Denom, - Buffer.from(unconfirmedTx.data, 'hex').toString('base64') - ) - try { - await this.executorL2.transaction([msg]) - await this.updateProcessed(unconfirmedTx) - await notifySlack( - txKey, - buildResolveErrorNotification( - `[INFO] Transaction successfully resubmitted and processed for ${unconfirmedTx.sender} to ${unconfirmedTx.receiver} of amount ${unconfirmedTx.amount}.` - ), - false - ) - } catch (err) { - if (this.errorCounter++ < 20) { - await Bluebird.delay(5 * 1000) - return - } - this.errorCounter = 0 - await notifySlack(txKey, buildFailedTxNotification(unconfirmedTx)) - this.logger.error( - `Failed to resubmit tx: bridge id ${unconfirmedTx.bridgeId} sequence ${unconfirmedTx.sequence}`, - err - ) - } - } - - async getunconfirmedTxs(): Promise { - return await this.db.getRepository(UnconfirmedTxEntity).find({ - where: { - processed: false - } - }) - } - - public async ressurect(): Promise { - const unconfirmedTxs = await this.getunconfirmedTxs() - - for (const unconfirmedTx of unconfirmedTxs) { - const error = unconfirmedTx.error - - // Check x/opchild/errors.go - if (error.includes('deposit already finalized')) { - await this.updateProcessed(unconfirmedTx) - continue - } - await this.resubmitFailedDepositTx(unconfirmedTx) - } - } - - stop(): void { - this.isRunning = false - } - - public async run() { - while (this.isRunning) { - try { - await this.ressurect() - } catch (err) { - this.stop() - throw new Error(err) - } finally { - await Bluebird.delay(INTERVAL_MONITOR) - } - } - } -} diff --git a/src/worker/bridgeExecutor/index.ts b/src/worker/bridgeExecutor/index.ts index 63d0f20..a4bfb24 100644 --- a/src/worker/bridgeExecutor/index.ts +++ b/src/worker/bridgeExecutor/index.ts @@ -1,6 +1,6 @@ import { RPCClient } from '../../lib/rpc' -import { L1Monitor } from '../../lib/monitor/l1' -import { L2Monitor } from '../../lib/monitor/l2' +import { L1Monitor } from './monitor/l1' +import { L2Monitor } from './monitor/l2' import { executorController, metricsController } from '../../controller' import { executorLogger as logger } from '../../lib/logger' @@ -8,15 +8,13 @@ import { initORM, finalizeORM } from './db' import { initServer, finalizeServer, initMetricsServer } from '../../loader' import { once } from 'lodash' import { config } from '../../config' -import { Resurrector } from './Resurrector' let monitors async function runBot(): Promise { monitors = [ new L1Monitor(new RPCClient(config.L1_RPC_URI, logger), logger), - new L2Monitor(new RPCClient(config.L2_RPC_URI, logger), logger), - new Resurrector(logger) + new L2Monitor(new RPCClient(config.L2_RPC_URI, logger), logger) ] try { await Promise.all( diff --git a/src/lib/monitor/helper.ts b/src/worker/bridgeExecutor/monitor/helper.ts similarity index 94% rename from src/lib/monitor/helper.ts rename to src/worker/bridgeExecutor/monitor/helper.ts index 7fd6d1c..b8d49b3 100644 --- a/src/lib/monitor/helper.ts +++ b/src/worker/bridgeExecutor/monitor/helper.ts @@ -4,13 +4,16 @@ import { TxSearchOptions, TxSearchResult } from 'initia-l2' -import { getLatestOutputFromExecutor, getOutputFromExecutor } from '../query' -import { WithdrawStorage } from '../storage' -import { WithdrawalTx } from '../types' -import { sha3_256 } from '../util' -import OutputEntity from '../../orm/executor/OutputEntity' +import { + getLatestOutputFromExecutor, + getOutputFromExecutor +} from '../../../lib/query' +import { WithdrawStorage } from '../../../lib/storage' +import { WithdrawalTx } from '../../../lib/types' +import { sha3_256 } from '../../../lib/util' +import OutputEntity from '../../../orm/executor/OutputEntity' import { EntityManager, EntityTarget, ObjectLiteral } from 'typeorm' -import { Block, BlockResults, RPCClient } from '../../lib/rpc' +import { Block, BlockResults, RPCClient } from '../../../lib/rpc' class MonitorHelper { /// @@ -202,8 +205,7 @@ class MonitorHelper { let attempt = 0 while (!blockResults && attempt < maxRetry) { try { - blockResults = await rpcClient - .getBlockResults(minHeight + i) + blockResults = await rpcClient.getBlockResults(minHeight + i) } catch { if (attempt === maxRetry) { throw new Error('Failed to feed block results') diff --git a/src/lib/monitor/index.ts b/src/worker/bridgeExecutor/monitor/index.ts similarity index 100% rename from src/lib/monitor/index.ts rename to src/worker/bridgeExecutor/monitor/index.ts diff --git a/src/lib/monitor/l1.ts b/src/worker/bridgeExecutor/monitor/l1.ts similarity index 90% rename from src/lib/monitor/l1.ts rename to src/worker/bridgeExecutor/monitor/l1.ts index fb061bd..4d5f754 100644 --- a/src/lib/monitor/l1.ts +++ b/src/worker/bridgeExecutor/monitor/l1.ts @@ -12,17 +12,24 @@ import { ExecutorUnconfirmedTxEntity, ExecutorOutputEntity, StateEntity -} from '../../orm' +} from '../../../orm' import { EntityManager } from 'typeorm' -import { RPCClient } from '../rpc' -import { getDB } from '../../worker/bridgeExecutor/db' +import { RPCClient } from '../../../lib/rpc' +import { getDB } from '../db' import winston from 'winston' -import { config } from '../../config' -import { TxWalletL2, WalletType, getWallet, initWallet } from '../walletL2' +import { config } from '../../../config' +import { + TxWalletL2, + WalletType, + getWallet, + initWallet +} from '../../../lib/walletL2' +import { Resurrector } from './resurrector' export class L1Monitor extends Monitor { executorL2: TxWalletL2 oracleHeight: number + resurrector: Resurrector constructor( public rpcClient: RPCClient, @@ -32,7 +39,7 @@ export class L1Monitor extends Monitor { [this.db] = getDB() initWallet(WalletType.Executor, config.l2lcd) this.executorL2 = getWallet(WalletType.Executor) - + this.resurrector = new Resurrector(logger, this.executorL2) this.oracleHeight = 0 } @@ -100,18 +107,22 @@ export class L1Monitor extends Monitor { } public async handleNewBlock(): Promise { + await this.resurrector.ressurect() + if (!config.ENABLE_ORACLE) return - if (!this.latestHeight) return + if (!this.latestHeight || this.oracleHeight == this.latestHeight) { + this.logger.info( + `[handleNewBlock - ${this.name()}] No new block to update oracle tx` + ) + return + } const latestTx0 = this.getBlockByHeight(this.latestHeight)?.block.data .txs[0] - if ( - !latestTx0 || - this.oracleHeight == this.latestHeight - ) { + if (!latestTx0) { this.logger.info( - `[handleNewBlock - ${this.name()}] No new block to update oracle tx` + `[handleNewBlock - ${this.name()}] No txs in height: ${this.latestHeight}` ) return } @@ -190,7 +201,7 @@ export class L1Monitor extends Monitor { } public async handleEvents(manager: EntityManager): Promise { - const blockResults = this.getBlockResultsByHeight(this.currentHeight) + const blockResults = await this.getBlockResultsByHeight(this.currentHeight) const [isEmpty, events] = await this.helper.fetchAllEvents(blockResults) if (isEmpty) { this.logger.info( diff --git a/src/lib/monitor/l2.ts b/src/worker/bridgeExecutor/monitor/l2.ts similarity index 93% rename from src/lib/monitor/l2.ts rename to src/worker/bridgeExecutor/monitor/l2.ts index 4b2b772..419b646 100644 --- a/src/lib/monitor/l2.ts +++ b/src/worker/bridgeExecutor/monitor/l2.ts @@ -1,13 +1,18 @@ -import { ExecutorOutputEntity, ExecutorWithdrawalTxEntity } from '../../orm' +import { ExecutorOutputEntity, ExecutorWithdrawalTxEntity } from '../../../orm' import { Monitor } from './monitor' import { EntityManager } from 'typeorm' import { BlockInfo } from 'initia-l2' -import { getDB } from '../../worker/bridgeExecutor/db' -import { RPCClient } from '../rpc' +import { getDB } from '../db' +import { RPCClient } from '../../../lib/rpc' import winston from 'winston' -import { config } from '../../config' -import { getBridgeInfo, getLastOutputInfo } from '../query' -import { TxWalletL2, WalletType, getWallet, initWallet } from '../walletL2' +import { config } from '../../../config' +import { getBridgeInfo, getLastOutputInfo } from '../../../lib/query' +import { + TxWalletL2, + WalletType, + getWallet, + initWallet +} from '../../../lib/walletL2' export class L2Monitor extends Monitor { executorL2: TxWalletL2 @@ -89,7 +94,7 @@ export class L2Monitor extends Monitor { } public async handleEvents(manager: EntityManager): Promise { - const blockResults = this.getBlockResultsByHeight(this.currentHeight) + const blockResults = await this.getBlockResultsByHeight(this.currentHeight) const [isEmpty, events] = await this.helper.fetchAllEvents(blockResults) if (isEmpty) { this.logger.info( diff --git a/src/lib/monitor/monitor.ts b/src/worker/bridgeExecutor/monitor/monitor.ts similarity index 63% rename from src/lib/monitor/monitor.ts rename to src/worker/bridgeExecutor/monitor/monitor.ts index 4ad4940..cba6f5a 100644 --- a/src/lib/monitor/monitor.ts +++ b/src/worker/bridgeExecutor/monitor/monitor.ts @@ -1,19 +1,22 @@ import Bluebird from 'bluebird' -import { Block, BlockResults, RPCClient } from '../rpc' -import { StateEntity } from '../../orm' +import { Block, BlockResults, RPCClient } from '../../../lib/rpc' +import { StateEntity } from '../../../orm' import { DataSource, EntityManager } from 'typeorm' import MonitorHelper from './helper' import winston from 'winston' -import { INTERVAL_MONITOR, config } from '../../config' -import { updateExecutorUsageMetrics } from '../../lib/metrics' +import { INTERVAL_MONITOR, SECOND, config } from '../../../config' +import { updateExecutorUsageMetrics } from '../../../lib/metrics' const MAX_BLOCKS = 20 // DO NOT CHANGE THIS, hard limit is 20 in cometbft. +const MAX_QUEUE_SIZE = 1000 const MAX_RETRY_INTERVAL = 30_000 export abstract class Monitor { public syncedHeight: number public currentHeight: number public latestHeight: number + + public isFirstRun = true public blockQueue: [number, Block][] = [] public blockResultsQueue: [number, BlockResults][] = [] @@ -30,18 +33,84 @@ export abstract class Monitor { this.bridgeId = config.BRIDGE_ID } + public async feedQueue(): Promise { + if (!this.isFirstRun) throw new Error('not first run') + this.isFirstRun = false + for (let i = 0; ; i++) { + try { + this.blockQueue = this.blockQueue.filter( + // eslint-disable-next-line + ([height, _]) => height > this.syncedHeight + ) + this.blockResultsQueue = this.blockResultsQueue.filter( + // eslint-disable-next-line + ([height, _]) => height > this.syncedHeight + ) + + if (this.blockQueue.length < MAX_QUEUE_SIZE) { + const feedStartHeight = + this.blockQueue.length > 0 + ? this.blockQueue[this.blockQueue.length - 1][0] + 1 + : this.syncedHeight + 1 + const feedEndHeight = Math.min( + this.latestHeight, + feedStartHeight + MAX_BLOCKS + ) + + const newBlocks = await this.helper.feedBlock( + this.rpcClient, + feedStartHeight, + feedEndHeight + ) + this.blockQueue = this.blockQueue.concat(newBlocks) + } + + if (this.blockResultsQueue.length < MAX_QUEUE_SIZE) { + const feedStartHeight = + this.blockResultsQueue.length > 0 + ? this.blockResultsQueue[this.blockResultsQueue.length - 1][0] + 1 + : this.syncedHeight + 1 + const feedEndHeight = Math.min( + this.latestHeight, + feedStartHeight + MAX_BLOCKS + ) + const newBlockResults = await this.helper.feedBlockResults( + this.rpcClient, + feedStartHeight, + feedEndHeight + ) + this.blockResultsQueue = + this.blockResultsQueue.concat(newBlockResults) + } + } catch (e) { + this.logger.error(`Error in feedQueue: `, e) + } finally { + await Bluebird.delay(SECOND) + } + } + } + public getBlockByHeight(height: number): Block | null { const block = this.blockQueue.find((block) => block[0] === height) if (!block) return null return block[1] } - public getBlockResultsByHeight(height: number): BlockResults { + public async getBlockResultsByHeight(height: number): Promise { const blockResult = this.blockResultsQueue.find( (blockResults) => blockResults[0] === height ) - if (!blockResult) - throw new Error(`block result not found for height ${height}`) + if (!blockResult) { + this.logger.info( + `${this.name()} fetching block results for height ${height}...` + ) + const res = await this.helper.feedBlockResults( + this.rpcClient, + height, + height + ) + return res[0][1] + } return blockResult[1] } @@ -77,7 +146,9 @@ export abstract class Monitor { async handleBlockWithStateUpdate(manager: EntityManager): Promise { await this.handleBlock(manager) if (this.syncedHeight % 10 === 0) { - this.logger.info(`${this.name()} height ${this.syncedHeight}`) + this.logger.info( + `${this.name()} syncedHeight ${this.syncedHeight}, blockQueue ${this.blockQueue.length}, blockResultsQueue ${this.blockResultsQueue.length}` + ) } this.syncedHeight++ await manager @@ -88,6 +159,7 @@ export abstract class Monitor { public async monitor(): Promise { await this.prepareMonitor() + this.feedQueue() while (this.isRunning) { try { this.latestHeight = await this.rpcClient.getLatestBlockHeight() @@ -107,18 +179,6 @@ export abstract class Monitor { ) if (blockchainData === null) continue - this.blockQueue = await this.helper.feedBlock( - this.rpcClient, - this.syncedHeight + 1, - maxHeight - ) - - this.blockResultsQueue = await this.helper.feedBlockResults( - this.rpcClient, - this.syncedHeight + 1, - maxHeight - ) - await this.handleNewBlock() await this.db.transaction(async (manager: EntityManager) => { diff --git a/src/worker/bridgeExecutor/monitor/resurrector.ts b/src/worker/bridgeExecutor/monitor/resurrector.ts new file mode 100644 index 0000000..94d8e36 --- /dev/null +++ b/src/worker/bridgeExecutor/monitor/resurrector.ts @@ -0,0 +1,135 @@ +import { getDB } from '../db' +import UnconfirmedTxEntity from '../../../orm/executor/UnconfirmedTxEntity' +import { Coin, Msg, MsgFinalizeTokenDeposit } from 'initia-l2' +import { SECOND, config } from '../../../config' +import { DataSource } from 'typeorm' +import Bluebird from 'bluebird' +import winston from 'winston' +import { TxWalletL2, WalletType, initWallet } from '../../../lib/walletL2' +import { + buildFailedTxNotification, + buildResolveErrorNotification, + notifySlack +} from '../../../lib/slack' + +const MAX_RESURRECT_SIZE = 100 + +export class Resurrector { + private db: DataSource + isRunning = true + errorCounter = 0 + + constructor( + public logger: winston.Logger, + public executorL2: TxWalletL2 + ) { + [this.db] = getDB() + initWallet(WalletType.Executor, config.l2lcd) + } + + public name(): string { + return 'resurrector' + } + + async updateProcessed(unconfirmedTx: UnconfirmedTxEntity): Promise { + await this.db.getRepository(UnconfirmedTxEntity).update( + { + bridgeId: unconfirmedTx.bridgeId, + sequence: unconfirmedTx.sequence, + processed: false + }, + { processed: true } + ) + + this.logger.info( + `[updateProcessed - ${this.name()}] Resurrected failed tx sequence ${unconfirmedTx.sequence}` + ) + } + + createMsg(unconfirmedTx: UnconfirmedTxEntity): Msg { + const msg = new MsgFinalizeTokenDeposit( + this.executorL2.key.accAddress, + unconfirmedTx.sender, + unconfirmedTx.receiver, + new Coin(unconfirmedTx.l2Denom, unconfirmedTx.amount), + parseInt(unconfirmedTx.sequence), + unconfirmedTx.l1Height, + unconfirmedTx.l1Denom, + Buffer.from(unconfirmedTx.data, 'hex').toString('base64') + ) + return msg + } + + createTxKey(unconfirmedTxs: UnconfirmedTxEntity[]): string { + return `${unconfirmedTxs[0].sender}-${unconfirmedTxs[0].receiver}-${unconfirmedTxs[0].amount}` + } + + async resubmitFailedDepositTxs( + unconfirmedTxs: UnconfirmedTxEntity[] + ): Promise { + const msgs = unconfirmedTxs.map((unconfirmedTx) => + this.createMsg(unconfirmedTx) + ) + const txKey = this.createTxKey(unconfirmedTxs) + try { + await this.executorL2.transaction(msgs) + await Promise.all(unconfirmedTxs.map((tx) => this.updateProcessed(tx))) + await notifySlack( + txKey, + buildResolveErrorNotification( + `[INFO] Transaction successfully resubmitted and processed from ${unconfirmedTxs[0].sequence} to ${unconfirmedTxs[unconfirmedTxs.length - 1].sequence} sequence.` + ), + false + ) + } catch (err) { + if (this.errorCounter++ < 30) { + await Bluebird.delay(SECOND) + return + } + this.errorCounter = 0 + await notifySlack(txKey, buildFailedTxNotification(unconfirmedTxs[0])) + this.logger.error( + `[resubmitFailedDepositTxs - ${this.name()}] Failed to resubmit txs: bridge id ${unconfirmedTxs[0].bridgeId} sequence ${unconfirmedTxs[0].sequence}`, + err + ) + } + } + + async getUnconfirmedTxs(): Promise { + return await this.db.getRepository(UnconfirmedTxEntity).find({ + where: { + processed: false + } + }) + } + + public async ressurect(): Promise { + const unconfirmedTxs = await this.getUnconfirmedTxs() + + if (unconfirmedTxs.length === 0) { + this.logger.info(`[ressurect - ${this.name()}] No unconfirmed txs found`) + return + } + + this.logger.info( + `[ressurect - ${this.name()}] Found ${unconfirmedTxs.length} unconfirmed txs` + ) + + const unconfirmedTxsChunks: UnconfirmedTxEntity[] = [] + for (const unconfirmedTx of unconfirmedTxs) { + // Check x/opchild/errors.go + if (unconfirmedTx.error.includes('deposit already finalized')) { + await this.updateProcessed(unconfirmedTx) + continue + } + unconfirmedTxsChunks.push(unconfirmedTx) + if (unconfirmedTxsChunks.length === MAX_RESURRECT_SIZE) { + await this.resubmitFailedDepositTxs(unconfirmedTxsChunks) + unconfirmedTxsChunks.length = 0 + } + } + if (unconfirmedTxsChunks.length > 0) { + await this.resubmitFailedDepositTxs(unconfirmedTxsChunks) + } + } +} diff --git a/src/worker/challenger/challenger.ts b/src/worker/challenger/challenger.ts index 0001d56..3877426 100644 --- a/src/worker/challenger/challenger.ts +++ b/src/worker/challenger/challenger.ts @@ -18,7 +18,7 @@ import { getOutputInfoByIndex, getBridgeInfo } from '../../lib/query' -import MonitorHelper from '../../lib/monitor/helper' +import MonitorHelper from '../bridgeExecutor/monitor/helper' import winston from 'winston' import { TxWalletL1, diff --git a/src/worker/challenger/index.ts b/src/worker/challenger/index.ts index bfdc46f..063e6b9 100644 --- a/src/worker/challenger/index.ts +++ b/src/worker/challenger/index.ts @@ -1,5 +1,5 @@ import { RPCClient } from '../../lib/rpc' -import { Monitor } from '../../lib/monitor' +import { Monitor } from '../bridgeExecutor/monitor' import { Challenger } from './challenger' import { initORM, finalizeORM } from './db' import { challengerLogger as logger } from '../../lib/logger' diff --git a/src/worker/challenger/monitor_l1.ts b/src/worker/challenger/monitor_l1.ts index 0fa06ef..7ca93ed 100644 --- a/src/worker/challenger/monitor_l1.ts +++ b/src/worker/challenger/monitor_l1.ts @@ -1,4 +1,4 @@ -import { Monitor } from '../../lib/monitor' +import { Monitor } from '../bridgeExecutor/monitor' import { ChallengerDepositTxEntity, ChallengerFinalizeWithdrawalTxEntity @@ -58,7 +58,7 @@ export class L1Monitor extends Monitor { } public async handleEvents(manager: EntityManager): Promise { - const blockResults = this.getBlockResultsByHeight(this.currentHeight) + const blockResults = await this.getBlockResultsByHeight(this.currentHeight) const [isEmpty, events] = await this.helper.fetchAllEvents(blockResults) if (isEmpty) return false diff --git a/src/worker/challenger/monitor_l2.ts b/src/worker/challenger/monitor_l2.ts index 2fd7bd8..84fe043 100644 --- a/src/worker/challenger/monitor_l2.ts +++ b/src/worker/challenger/monitor_l2.ts @@ -4,7 +4,7 @@ import { ChallengerWithdrawalTxEntity } from '../../orm' import { OutputInfo } from 'initia-l2' -import { Monitor } from '../../lib/monitor' +import { Monitor } from '../bridgeExecutor/monitor' import { EntityManager } from 'typeorm' import { RPCClient } from '../../lib/rpc' import winston from 'winston' @@ -87,7 +87,7 @@ export class L2Monitor extends Monitor { } public async handleEvents(manager: EntityManager): Promise { - const blockResults = this.getBlockResultsByHeight(this.currentHeight) + const blockResults = await this.getBlockResultsByHeight(this.currentHeight) const [isEmpty, events] = await this.helper.fetchAllEvents(blockResults) if (isEmpty) return false diff --git a/src/worker/outputSubmitter/outputSubmitter.ts b/src/worker/outputSubmitter/outputSubmitter.ts index 5ed87de..522f92c 100644 --- a/src/worker/outputSubmitter/outputSubmitter.ts +++ b/src/worker/outputSubmitter/outputSubmitter.ts @@ -6,7 +6,7 @@ import { outputLogger as logger } from '../../lib/logger' import { ErrorTypes } from '../../lib/error' import { config } from '../../config' import { getLastOutputInfo } from '../../lib/query' -import MonitorHelper from '../../lib/monitor/helper' +import MonitorHelper from '../bridgeExecutor/monitor/helper' import { DataSource, EntityManager } from 'typeorm' import { getDB } from './db' import {