From c08e1ca308e95ae124003177d4b0a49f5c0ed76f Mon Sep 17 00:00:00 2001 From: Matias Romeo Date: Tue, 14 Nov 2023 22:33:12 -0300 Subject: [PATCH] eos-evm-ws-proxy blockmonitor: Fix fork handling --- peripherals/eos-evm-ws-proxy/block-monitor.js | 99 ++++++++++++------- peripherals/eos-evm-ws-proxy/config.js | 4 +- peripherals/eos-evm-ws-proxy/main.js | 9 +- .../eos-evm-ws-proxy/subscription-server.js | 30 +++--- peripherals/eos-evm-ws-proxy/utils.js | 21 +++- 5 files changed, 108 insertions(+), 55 deletions(-) diff --git a/peripherals/eos-evm-ws-proxy/block-monitor.js b/peripherals/eos-evm-ws-proxy/block-monitor.js index 6a6e24c..353fe3a 100644 --- a/peripherals/eos-evm-ws-proxy/block-monitor.js +++ b/peripherals/eos-evm-ws-proxy/block-monitor.js @@ -2,43 +2,63 @@ const EventEmitter = require('events'); const axios = require('axios'); const {Web3} = require('web3'); const Deque = require('collections/deque'); -const {num_from_id} = require('./utils'); +const {num_from_id, convert_to_epoch} = require('./utils'); const { clearTimeout } = require('timers'); class BlockMonitor extends EventEmitter { - constructor({ web3_rpc_endpoint, nodeos_rpc_endpoint, poll_interval, logger}) { + constructor({ web3_rpc_endpoint, nodeos_rpc_endpoint, poll_interval, genesis, logger}) { super(); this.web3_rpc_endpoint = web3_rpc_endpoint; this.nodeos_rpc_endpoint = nodeos_rpc_endpoint; this.poll_interval = poll_interval; this.web3 = new Web3(web3_rpc_endpoint); this.logger = logger; + this.genesis = genesis; + + if(this.genesis == undefined || this.genesis.timestamp == undefined) throw ("invalid genesis timestamp"); + this.genesis_timestamp = parseInt(this.genesis.timestamp, 16); + this.logger.debug(`Using genesis timestamp: ${this.genesis_timestamp}`); this.reversible_blocks = new Deque(); this.run = false; this.timer_id = null; } - async get_eos_lib() { - const response = await axios.post(this.nodeos_rpc_endpoint+'/v1/chain/get_info', {}); - return response.data.last_irreversible_block_num; + async get_evm_lib() { + let response = await axios.post(this.nodeos_rpc_endpoint+'/v1/chain/get_info', {}); + response = await axios.post(this.nodeos_rpc_endpoint+'/v1/chain/get_block', {block_num_or_id:response.data.last_irreversible_block_num}); + let lib = this.timestamp_to_evm_block_num(convert_to_epoch(response.data.timestamp)); + this.logger.debug(`BlockMonitor::get_evm_lib ${lib}`); + return lib; } + timestamp_to_evm_block_num(timestamp) { + const block_interval = 1; + if (timestamp < this.genesis_timestamp) { + return 0; + } + return 1 + Math.floor((timestamp - this.genesis_timestamp) / block_interval); + } + + remove_front_block() { const block = this.reversible_blocks.shift(); + this.logger.debug(`BlockMonitor::remove_front_block ${block.number}`); this.emit('block_removed', {block}); } fork_last_block() { const block = this.reversible_blocks.pop(); - this.logger.debug(`FORK_LAST_BLOCK ${block}`); + this.logger.debug(`BlockMonitor::fork_last_block ${block.number}`); this.emit('block_forked', {block}); return this.reversible_blocks.peekBack(); } append_new_block(block) { this.reversible_blocks.add(block); + this.logger.debug(`BlockMonitor::append_new_block ${block.number} ${block.hash}`); this.emit('block_appended', {block}); + return block; } async getBlockWithLogs(number_) { @@ -63,56 +83,62 @@ class BlockMonitor extends EventEmitter { return block; } + async get_next_block(block) { + // need to be conservative, sometimes getLogs return empty result for head block + let head_block = await this.web3.eth.getBlock("latest", true); + if( head_block == null ) return null; + + if(block == null) + return await this.getBlockWithLogs(Number(head_block.number) - 1); + + let next_block_num = Number(block.number) + 1; + + let max_block_num = Number(head_block.number) - 1; + if (next_block_num >= max_block_num) { + return null; + } + + return await this.getBlockWithLogs(next_block_num); + } + async poll() { - let next_block = null; try { - // need to be conservative, sometimes getLogs return empty result for head block - let head_block = await this.web3.eth.getBlock("latest", true); - let max_block_num = Number(head_block.number) - 1; let last = this.reversible_blocks.peekBack(); if( last == undefined ) { - last = await this.getBlockWithLogs(max_block_num); + this.logger.debug("BlockMonitor::poll last not defined"); + last = await this.get_next_block() if (last != null) { + this.logger.debug(`BlockMonitor::poll Obtained ${last.number}`); this.append_new_block(last); + } else { + throw new Error("BlockMonitor::poll Unable to get block"); } } - if (last != null && Number(last.number) + 1 < max_block_num) { - next_block = await this.getBlockWithLogs(Number(last.number) + 1); - } else { - next_block = null; - } - + let next_block = await this.get_next_block(last); let found_next_block = false; - while(last != null && next_block != null) { found_next_block = true; if(next_block.parentHash == last.hash) { - this.append_new_block(next_block); - last = next_block; - - if (Number(last.number) + 1 < max_block_num) { - next_block = await this.getBlockWithLogs(Number(last.number) + 1); - } else { - next_block = null; - } - + last = this.append_new_block(next_block); + next_block = await this.get_next_block(last); } else { + this.logger.debug(`BlockMonitor::poll next: ${next_block.number} ${next_block.parentHash} != last: ${last.number} ${last.hash}`); last = this.fork_last_block(); + next_block = await this.get_next_block(last); } } if( found_next_block == true ) { - const eos_lib = await this.get_eos_lib(); - while(this.reversible_blocks.length > 0 && num_from_id(this.reversible_blocks.peek().mixHash) <= eos_lib) { - this.logger.debug(`eoslib: ${eos_lib} ${num_from_id(this.reversible_blocks.peek().mixHash)} ${this.reversible_blocks.peek().number} ${this.reversible_blocks.peek().mixHash}`); + const evm_lib = await this.get_evm_lib(); + while(this.reversible_blocks.length > 0 && this.reversible_blocks.peek().number < evm_lib) { this.remove_front_block(); } } } catch (error) { - this.logger.error(error.message); + this.logger.error(`BlockMonitor::poll => ERR: ${error.message}`); } if(this.run == true) { @@ -120,21 +146,22 @@ class BlockMonitor extends EventEmitter { this.timer_id = setTimeout(() => this.poll(), this.poll_interval || 5000); } else { this.reversible_blocks.clear(); - this.logger.info("BlockMonitor stopped"); + this.logger.info("BlockMonitor::poll => Stopped"); + if (this.timer_id != null) clearTimeout(this.timer_id); } } start() { - this.logger.info("BlockMonitor start"); + if( this.run == true ) return; + this.logger.info("BlockMonitor::start => BlockMonitor starting"); this.run = true; if (this.timer_id != null) clearTimeout(this.timer_id); this.timer_id = setTimeout(() => this.poll(), 0); } stop() { - this.logger.info("BlockMonitor stopping"); - this.run = false; - // don't clean up timeout. let poll() cleanup reversible_blocks + this.logger.info("BlockMonitor::stop => BlockMonitor stopping"); + this.run = false; } is_running() { diff --git a/peripherals/eos-evm-ws-proxy/config.js b/peripherals/eos-evm-ws-proxy/config.js index 0f373a0..1ef7f72 100644 --- a/peripherals/eos-evm-ws-proxy/config.js +++ b/peripherals/eos-evm-ws-proxy/config.js @@ -7,6 +7,7 @@ const poll_interval = parseInt(process.env.POLL_INTERVAL, 10) || 1000; const max_logs_subs_per_connection = parseInt(process.env.MAX_LOGS_SUBS_PER_CONNECTION, 10) || 1; const max_minedtx_subs_per_connection = parseInt(process.env.MAX_MINEDTX_SUBS_PER_CONNECTION, 10) || 1; const log_level = process.env.LOG_LEVEL || 'info'; +const genesis_json = process.env.GENESIS_JSON || 'eos-evm-genesis.json'; module.exports = { ws_listening_port, @@ -16,5 +17,6 @@ module.exports = { poll_interval, max_logs_subs_per_connection, max_minedtx_subs_per_connection, - log_level + log_level, + genesis_json }; diff --git a/peripherals/eos-evm-ws-proxy/main.js b/peripherals/eos-evm-ws-proxy/main.js index f3f2133..a6632cc 100644 --- a/peripherals/eos-evm-ws-proxy/main.js +++ b/peripherals/eos-evm-ws-proxy/main.js @@ -1,5 +1,10 @@ const config = require('./config'); const {logger} = require('./logger'); const SubscriptionServer = require('./subscription-server'); -const server = new SubscriptionServer({...config, logger}); -server.start(); \ No newline at end of file + +try { + const server = new SubscriptionServer({...config, logger}); + server.start(); +} catch (error) { + logger.error(`main: ${error.message}`); +} \ No newline at end of file diff --git a/peripherals/eos-evm-ws-proxy/subscription-server.js b/peripherals/eos-evm-ws-proxy/subscription-server.js index 1100064..118333b 100644 --- a/peripherals/eos-evm-ws-proxy/subscription-server.js +++ b/peripherals/eos-evm-ws-proxy/subscription-server.js @@ -2,19 +2,21 @@ const EventEmitter = require('events'); const WebSocketHandler = require('./websocket-handler'); const BlockMonitor = require('./block-monitor'); const {Web3} = require('web3'); -const {bigint_replacer} = require('./utils'); - +const {bigint_replacer, load_json_file} = require('./utils'); class SubscriptionServer extends EventEmitter { - constructor({web3_rpc_endpoint, nodeos_rpc_endpoint, ws_listening_host, ws_listening_port, poll_interval, max_logs_subs_per_connection, max_minedtx_subs_per_connection, logger}) { + constructor({web3_rpc_endpoint, nodeos_rpc_endpoint, ws_listening_host, ws_listening_port, poll_interval, max_logs_subs_per_connection, max_minedtx_subs_per_connection, genesis_json, logger}) { super(); - this.block_monitor = new BlockMonitor({web3_rpc_endpoint, nodeos_rpc_endpoint, poll_interval, logger}); + const genesis = load_json_file(genesis_json); + + this.block_monitor = new BlockMonitor({web3_rpc_endpoint, nodeos_rpc_endpoint, poll_interval, genesis, logger}); this.web_socket_handler = new WebSocketHandler({ws_listening_host, ws_listening_port, web3_rpc_endpoint, logger}); this.max_logs_subs_per_connection = max_logs_subs_per_connection; this.max_minedtx_subs_per_connection = max_minedtx_subs_per_connection; this.web3 = new Web3(web3_rpc_endpoint); this.logger = logger + this.genesis = genesis; this.new_head_subs = new Map(); this.logs_subs = new Map(); @@ -55,21 +57,21 @@ class SubscriptionServer extends EventEmitter { } handle_block_removed({block}) { - this.logger.debug(`handle_block_removed: ${block.number}`); + this.logger.debug(`SubscriptionServer::handle_block_removed: ${block.number}`); if(this.logs_sent.has(block.number)) { this.logs_sent.delete(block.number); } } async handle_block_forked({block}) { - this.logger.debug(`handle_block_forked: ${block.number}`); + this.logger.debug(`SubscriptionServer::handle_block_forked: ${block.number}`); const events_sent_at_block = this.logs_sent.get(block.number); if (events_sent_at_block != undefined) { for(const event_sent of events_sent_at_block) { event_sent.msg.params.result.removed = true; event_sent.client.ws.send(JSON.stringify(event_sent.msg, bigint_replacer)); } - this.logger.debug(`REMOVING ${block.number}`); + this.logger.debug(`SubscriptionServer::handle_block_forked: REMOVING ${block.number}`); this.logs_sent.delete(block.number); } await this.process_mined_transactions_subscriptions(block, true); @@ -193,7 +195,7 @@ class SubscriptionServer extends EventEmitter { for (const [_, client] of this.new_head_subs) { if(ws === client.ws) { throw new Error('Already subscribed');} } - this.logger.debug(`Adding newHeads subscription ${subid}`); + this.logger.debug(`SubscriptionServer::handle_new_heads: Adding newHeads subscription ${subid}`); this.new_head_subs.set(subid, {ws}); this.check_start_monitor(); } @@ -206,7 +208,7 @@ class SubscriptionServer extends EventEmitter { if( total_logs_subs >= this.max_logs_subs_per_connection ) { throw new Error('Max logs subscriptions reached'); } - this.logger.debug(`Adding logs subscription ${subid}`); + this.logger.debug(`SubscriptionServer::handle_logs: Adding logs subscription ${subid}`); this.logs_subs.set(subid, {ws, filter}); this.check_start_monitor(); } @@ -219,7 +221,7 @@ class SubscriptionServer extends EventEmitter { if( total_minedtx_subs >= this.max_minedtx_subs_per_connection ) { throw new Error("Max minedTransactions subscriptions reached"); } - this.logger.debug(`Adding minedTransactions subscription ${subid}`); + this.logger.debug(`SubscriptionServer::handle_minedTransactions: Adding minedTransactions subscription ${subid}`); this.mined_transactions_subs.set(subid, {ws, filter}); this.check_start_monitor(); } @@ -238,7 +240,7 @@ class SubscriptionServer extends EventEmitter { throw new Error('Not found'); } - this.logger.debug(`Unsubscribing ${subid}`); + this.logger.debug(`SubscriptionServer::handle_unsubscribe: Unsubscribing ${subid}`); subscription_map.delete(subid); this.check_stop_monitor(); } @@ -247,21 +249,21 @@ class SubscriptionServer extends EventEmitter { for (const [subid, client] of this.new_head_subs) { if(ws === client.ws) { - this.logger.debug(`Removing new_head_subs ${subid}`); + this.logger.debug(`SubscriptionServer::handle_disconnect: Removing new_head_subs ${subid}`); this.new_head_subs.delete(subid); } } for (const [subid, client] of this.logs_subs) { if(ws === client.ws) { - this.logger.debug(`Removing logs_subs ${subid}`); + this.logger.debug(`SubscriptionServer::handle_disconnect: Removing logs_subs ${subid}`); this.logs_subs.delete(subid); } } for (const [subid, client] of this.mined_transactions_subs) { if(ws === client.ws) { - this.logger.debug(`Removing mined_transactions_subs ${subid}`); + this.logger.debug(`SubscriptionServer::handle_disconnect:Removing mined_transactions_subs ${subid}`); this.mined_transactions_subs.delete(subid); } } diff --git a/peripherals/eos-evm-ws-proxy/utils.js b/peripherals/eos-evm-ws-proxy/utils.js index a14953b..fb5242c 100644 --- a/peripherals/eos-evm-ws-proxy/utils.js +++ b/peripherals/eos-evm-ws-proxy/utils.js @@ -1,7 +1,18 @@ +const fs = require('fs'); + function is_plain_object(value) { return Object.prototype.toString.call(value) === '[object Object]'; } +function load_json_file(path) { + try { + const data = fs.readFileSync(path, 'utf8'); + return JSON.parse(data); + } catch (error) { + throw error; + } +} + function num_from_id(id) { if( id.startsWith('0x') ) { id = id.substring(2); @@ -16,9 +27,15 @@ function bigint_replacer(key, value) { return value; } +function convert_to_epoch(dateString) { + return Math.floor(new Date(dateString+"+0000").getTime() / 1000); +} + module.exports = { is_plain_object, num_from_id, - bigint_replacer + bigint_replacer, + convert_to_epoch, + load_json_file }; - \ No newline at end of file +