diff --git a/peripherals/eos-evm-ws-proxy/block-monitor.js b/peripherals/eos-evm-ws-proxy/block-monitor.js index 8172366..6a6e24c 100644 --- a/peripherals/eos-evm-ws-proxy/block-monitor.js +++ b/peripherals/eos-evm-ws-proxy/block-monitor.js @@ -3,6 +3,7 @@ const axios = require('axios'); const {Web3} = require('web3'); const Deque = require('collections/deque'); const {num_from_id} = require('./utils'); +const { clearTimeout } = require('timers'); class BlockMonitor extends EventEmitter { constructor({ web3_rpc_endpoint, nodeos_rpc_endpoint, poll_interval, logger}) { @@ -40,22 +41,63 @@ class BlockMonitor extends EventEmitter { this.emit('block_appended', {block}); } + async getBlockWithLogs(number_) { + let number = Number(number_); + + let id1 = "get_block_" + number; + let id2 = "get_logs_" + number; + let requests = [ + {jsonrpc:"2.0",method:"eth_getBlockByNumber",params:["0x" + number.toString(16), true], id: id1}, + {jsonrpc:"2.0",method:"eth_getLogs",params:[{fromBlock: "0x" + number.toString(16), toBlock: "0x" + number.toString(16)}], id: id2} + ] + const results = await axios.post(this.web3_rpc_endpoint, requests); + + if (!Array.isArray(results.data) || results.data.length != 2) { + throw new Error("invalid RPC response of [getBlock, GetPastLogs] batch request"); + } + const block = results.data[0].result; + const logs = results.data[1].result; + + block.logs = logs; + //console.log("RPC batch result:" + JSON.stringify(block)); + return block; + } + async poll() { + let next_block = null; try { + // need to be conservative, sometimes getLogs return empty result for head block + let head_block = await this.web3.eth.getBlock("latest", true); + let max_block_num = Number(head_block.number) - 1; + let last = this.reversible_blocks.peekBack(); if( last == undefined ) { - last = await this.web3.eth.getBlock("latest", true); - this.append_new_block(last); + last = await this.getBlockWithLogs(max_block_num); + if (last != null) { + this.append_new_block(last); + } + } + + if (last != null && Number(last.number) + 1 < max_block_num) { + next_block = await this.getBlockWithLogs(Number(last.number) + 1); + } else { + next_block = null; } - let next_block = await this.web3.eth.getBlock(last.number+BigInt(1), true); let found_next_block = false; + while(last != null && next_block != null) { found_next_block = true; if(next_block.parentHash == last.hash) { this.append_new_block(next_block); last = next_block; - next_block = await this.web3.eth.getBlock(last.number+BigInt(1), true); + + if (Number(last.number) + 1 < max_block_num) { + next_block = await this.getBlockWithLogs(Number(last.number) + 1); + } else { + next_block = null; + } + } else { last = this.fork_last_block(); } @@ -74,6 +116,7 @@ class BlockMonitor extends EventEmitter { } if(this.run == true) { + if (this.timer_id != null) clearTimeout(this.timer_id); this.timer_id = setTimeout(() => this.poll(), this.poll_interval || 5000); } else { this.reversible_blocks.clear(); @@ -84,13 +127,14 @@ class BlockMonitor extends EventEmitter { start() { this.logger.info("BlockMonitor start"); this.run = true; - setTimeout(() => this.poll(), 0); + if (this.timer_id != null) clearTimeout(this.timer_id); + this.timer_id = setTimeout(() => this.poll(), 0); } stop() { - clearTimeout(this.timer_id); this.logger.info("BlockMonitor stopping"); - this.run = false; + this.run = false; + // don't clean up timeout. let poll() cleanup reversible_blocks } is_running() { diff --git a/peripherals/eos-evm-ws-proxy/subscription-server.js b/peripherals/eos-evm-ws-proxy/subscription-server.js index a442112..1100064 100644 --- a/peripherals/eos-evm-ws-proxy/subscription-server.js +++ b/peripherals/eos-evm-ws-proxy/subscription-server.js @@ -144,9 +144,8 @@ class SubscriptionServer extends EventEmitter { // Process all `logs` subscriptions // Get logs from the recently appended block if(this.logs_subs.size > 0) { - const logs = await this.web3.eth.getPastLogs({fromBlock:block.number, toBlock:block.number}); - this.logger.debug("LOG => ", JSON.stringify(logs, bigint_replacer)); - for(const log of logs) { + this.logger.debug("LOG => ", JSON.stringify(block.logs, bigint_replacer)); + for(const log of block.logs) { for(const [subid, client] of this.logs_subs) { if(this.logs_filter_match(client.filter, log)) { this.send_logs_response_and_save(block, client, subid, log); diff --git a/tests/nodeos_eos_evm_ws_test_basic.py b/tests/nodeos_eos_evm_ws_test_basic.py index 737c594..cbde096 100755 --- a/tests/nodeos_eos_evm_ws_test_basic.py +++ b/tests/nodeos_eos_evm_ws_test_basic.py @@ -681,8 +681,10 @@ def makeReservedEvmAddress(account): time.sleep(0.5) recevied_msg=ws.recv() res=json.loads(recevied_msg) + if block_count == 0: + Utils.Print("recevied block message from websocket:" + recevied_msg) block_json=res["params"]["result"] - num=(int)(block_json["number"]) + num=block_json["number"] # number can be decimal or hex (with 0x prefix) hash=block_json["hash"] parent_hash=block_json["parentHash"] Utils.Print("received block {0} from websocket, hash={1}..., parent={2}...".format(num, hash[0:8], parent_hash[0:8])) @@ -727,7 +729,8 @@ def makeReservedEvmAddress(account): res=json.loads(recevied_msg) Utils.Print("last ws msg is:" + recevied_msg) if ("method" in res and res["method"] == "eth_subscription"): - assert(res["params"]["result"]["transaction"]["value"] == "93000000000000000") # 0.103 - 0.01(fee)=0.093 + assert(res["params"]["result"]["transaction"]["value"] == "93000000000000000" or \ + res["params"]["result"]["transaction"]["value"] == "0x14a6701dc1c8000") # 0.103 - 0.01(fee)=0.093 break try_count = try_count - 1 if (try_count == 0):