From 6a701e300a1779023d7e6eaee70350212be6838b Mon Sep 17 00:00:00 2001 From: kayan Date: Fri, 10 Nov 2023 17:08:22 +0800 Subject: [PATCH 1/3] websocket: use batch request in getLogs --- peripherals/eos-evm-ws-proxy/block-monitor.js | 74 ++++++++++++++++--- .../eos-evm-ws-proxy/subscription-server.js | 11 ++- 2 files changed, 68 insertions(+), 17 deletions(-) diff --git a/peripherals/eos-evm-ws-proxy/block-monitor.js b/peripherals/eos-evm-ws-proxy/block-monitor.js index 8172366..af0a5fc 100644 --- a/peripherals/eos-evm-ws-proxy/block-monitor.js +++ b/peripherals/eos-evm-ws-proxy/block-monitor.js @@ -3,6 +3,7 @@ const axios = require('axios'); const {Web3} = require('web3'); const Deque = require('collections/deque'); const {num_from_id} = require('./utils'); +const { clearTimeout } = require('timers'); class BlockMonitor extends EventEmitter { constructor({ web3_rpc_endpoint, nodeos_rpc_endpoint, poll_interval, logger}) { @@ -35,27 +36,76 @@ class BlockMonitor extends EventEmitter { return this.reversible_blocks.peekBack(); } - append_new_block(block) { + append_new_block(block, logs) { this.reversible_blocks.add(block); - this.emit('block_appended', {block}); + this.emit('block_appended', {block, logs}); + } + + async getBlockWithLogs(number_) { + let number = Number(number_); + + let id1 = "get_block_" + number; + let id2 = "get_logs_" + number; + let requests = [ + {jsonrpc:"2.0",method:"eth_getBlockByNumber",params:["0x" + number.toString(16), false], id: id1}, + {jsonrpc:"2.0",method:"eth_getLogs",params:[{fromBlock: "0x" + number.toString(16), toBlock: "0x" + number.toString(16)}], id: id2} + ] + const results = await axios.post(this.web3_rpc_endpoint, requests); + + if (!Array.isArray(results.data) || results.data.length != 2) { + throw new Error("invalid RPC response of [getBlock, GetPastLogs] batch request"); + } + const block = results.data[0].result; + const logs = results.data[1].result; + + //console.log("RPC batch result:" + JSON.stringify(block)); + return {block, logs}; } async poll() { + let res = null; + let next_block = null; + let next_logs = null; try { + // need to be conservative, sometimes getLogs return empty result for head block + let head_block = await this.web3.eth.getBlock("latest", true); + let max_block_num = Number(head_block.number) - 1; + let last = this.reversible_blocks.peekBack(); - if( last == undefined ) { - last = await this.web3.eth.getBlock("latest", true); - this.append_new_block(last); + if( last == undefined || last == null) { + res = await this.getBlockWithLogs(max_block_num); + last = res.block; + if (last != null) { + this.append_new_block(last, res.logs); + } + } + + if (last != null && Number(last.number) + 1 < max_block_num) { + res = await this.getBlockWithLogs(Number(last.number) + 1); + next_block = res.block; + next_logs = res.logs; + } else { + next_block = null; + next_logs = null; } - let next_block = await this.web3.eth.getBlock(last.number+BigInt(1), true); let found_next_block = false; + while(last != null && next_block != null) { found_next_block = true; if(next_block.parentHash == last.hash) { - this.append_new_block(next_block); + this.append_new_block(next_block, next_logs); last = next_block; - next_block = await this.web3.eth.getBlock(last.number+BigInt(1), true); + + if (Number(last.number) + 1 < max_block_num) { + res = await this.getBlockWithLogs(Number(last.number) + 1); + next_block = res.block; + next_logs = res.logs; + } else { + next_block = null; + next_logs = null; + } + } else { last = this.fork_last_block(); } @@ -74,6 +124,7 @@ class BlockMonitor extends EventEmitter { } if(this.run == true) { + if (this.timer_id != null) clearTimeout(this.timer_id); this.timer_id = setTimeout(() => this.poll(), this.poll_interval || 5000); } else { this.reversible_blocks.clear(); @@ -84,13 +135,14 @@ class BlockMonitor extends EventEmitter { start() { this.logger.info("BlockMonitor start"); this.run = true; - setTimeout(() => this.poll(), 0); + if (this.timer_id != null) clearTimeout(this.timer_id); + this.timer_id = setTimeout(() => this.poll(), 0); } stop() { - clearTimeout(this.timer_id); this.logger.info("BlockMonitor stopping"); - this.run = false; + this.run = false; + // don't clean up timeout. let poll() cleanup reversible_blocks } is_running() { diff --git a/peripherals/eos-evm-ws-proxy/subscription-server.js b/peripherals/eos-evm-ws-proxy/subscription-server.js index a442112..b8348d8 100644 --- a/peripherals/eos-evm-ws-proxy/subscription-server.js +++ b/peripherals/eos-evm-ws-proxy/subscription-server.js @@ -49,8 +49,8 @@ class SubscriptionServer extends EventEmitter { await this.handle_block_forked({block}); }); - this.block_monitor.on('block_appended', async ({block}) => { - await this.handle_block_appended({block}); + this.block_monitor.on('block_appended', async ({block, logs}) => { + await this.handle_block_appended({block, logs}); }); } @@ -140,11 +140,10 @@ class SubscriptionServer extends EventEmitter { client.ws.send(JSON.stringify(msg, bigint_replacer)); } - async process_logs_subscriptions(block) { + async process_logs_subscriptions(block, logs) { // Process all `logs` subscriptions // Get logs from the recently appended block if(this.logs_subs.size > 0) { - const logs = await this.web3.eth.getPastLogs({fromBlock:block.number, toBlock:block.number}); this.logger.debug("LOG => ", JSON.stringify(logs, bigint_replacer)); for(const log of logs) { for(const [subid, client] of this.logs_subs) { @@ -171,10 +170,10 @@ class SubscriptionServer extends EventEmitter { } } - async handle_block_appended({block}) { + async handle_block_appended({block, logs}) { this.logger.debug(`handle_block_appended: ${block.number}`); await this.process_new_heads_subscriptions(block); - await this.process_logs_subscriptions(block); + await this.process_logs_subscriptions(block, logs); await this.process_mined_transactions_subscriptions(block, false); } From f1c4e5ca3504c1ac25ccbf1821f8c53b6430c744 Mon Sep 17 00:00:00 2001 From: kayan Date: Mon, 13 Nov 2023 16:57:40 +0800 Subject: [PATCH 2/3] fix test error --- peripherals/eos-evm-ws-proxy/block-monitor.js | 2 +- tests/nodeos_eos_evm_ws_test_basic.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/peripherals/eos-evm-ws-proxy/block-monitor.js b/peripherals/eos-evm-ws-proxy/block-monitor.js index af0a5fc..ed7a4cf 100644 --- a/peripherals/eos-evm-ws-proxy/block-monitor.js +++ b/peripherals/eos-evm-ws-proxy/block-monitor.js @@ -47,7 +47,7 @@ class BlockMonitor extends EventEmitter { let id1 = "get_block_" + number; let id2 = "get_logs_" + number; let requests = [ - {jsonrpc:"2.0",method:"eth_getBlockByNumber",params:["0x" + number.toString(16), false], id: id1}, + {jsonrpc:"2.0",method:"eth_getBlockByNumber",params:["0x" + number.toString(16), true], id: id1}, {jsonrpc:"2.0",method:"eth_getLogs",params:[{fromBlock: "0x" + number.toString(16), toBlock: "0x" + number.toString(16)}], id: id2} ] const results = await axios.post(this.web3_rpc_endpoint, requests); diff --git a/tests/nodeos_eos_evm_ws_test_basic.py b/tests/nodeos_eos_evm_ws_test_basic.py index 737c594..5b9a456 100755 --- a/tests/nodeos_eos_evm_ws_test_basic.py +++ b/tests/nodeos_eos_evm_ws_test_basic.py @@ -682,7 +682,7 @@ def makeReservedEvmAddress(account): recevied_msg=ws.recv() res=json.loads(recevied_msg) block_json=res["params"]["result"] - num=(int)(block_json["number"]) + num=block_json["number"] # number can be decimal or hex (with 0x prefix) hash=block_json["hash"] parent_hash=block_json["parentHash"] Utils.Print("received block {0} from websocket, hash={1}..., parent={2}...".format(num, hash[0:8], parent_hash[0:8])) @@ -727,7 +727,8 @@ def makeReservedEvmAddress(account): res=json.loads(recevied_msg) Utils.Print("last ws msg is:" + recevied_msg) if ("method" in res and res["method"] == "eth_subscription"): - assert(res["params"]["result"]["transaction"]["value"] == "93000000000000000") # 0.103 - 0.01(fee)=0.093 + assert(res["params"]["result"]["transaction"]["value"] == "93000000000000000" or \ + res["params"]["result"]["transaction"]["value"] == "0x14a6701dc1c8000") # 0.103 - 0.01(fee)=0.093 break try_count = try_count - 1 if (try_count == 0): From a4db0e132f20f652c165881e5b6146026966c10d Mon Sep 17 00:00:00 2001 From: kayan Date: Wed, 15 Nov 2023 17:47:13 +0800 Subject: [PATCH 3/3] put logs as property of block --- peripherals/eos-evm-ws-proxy/block-monitor.js | 28 +++++++------------ .../eos-evm-ws-proxy/subscription-server.js | 14 +++++----- tests/nodeos_eos_evm_ws_test_basic.py | 2 ++ 3 files changed, 19 insertions(+), 25 deletions(-) diff --git a/peripherals/eos-evm-ws-proxy/block-monitor.js b/peripherals/eos-evm-ws-proxy/block-monitor.js index ed7a4cf..6a6e24c 100644 --- a/peripherals/eos-evm-ws-proxy/block-monitor.js +++ b/peripherals/eos-evm-ws-proxy/block-monitor.js @@ -36,9 +36,9 @@ class BlockMonitor extends EventEmitter { return this.reversible_blocks.peekBack(); } - append_new_block(block, logs) { + append_new_block(block) { this.reversible_blocks.add(block); - this.emit('block_appended', {block, logs}); + this.emit('block_appended', {block}); } async getBlockWithLogs(number_) { @@ -58,35 +58,30 @@ class BlockMonitor extends EventEmitter { const block = results.data[0].result; const logs = results.data[1].result; + block.logs = logs; //console.log("RPC batch result:" + JSON.stringify(block)); - return {block, logs}; + return block; } async poll() { - let res = null; let next_block = null; - let next_logs = null; try { // need to be conservative, sometimes getLogs return empty result for head block let head_block = await this.web3.eth.getBlock("latest", true); let max_block_num = Number(head_block.number) - 1; let last = this.reversible_blocks.peekBack(); - if( last == undefined || last == null) { - res = await this.getBlockWithLogs(max_block_num); - last = res.block; + if( last == undefined ) { + last = await this.getBlockWithLogs(max_block_num); if (last != null) { - this.append_new_block(last, res.logs); + this.append_new_block(last); } } if (last != null && Number(last.number) + 1 < max_block_num) { - res = await this.getBlockWithLogs(Number(last.number) + 1); - next_block = res.block; - next_logs = res.logs; + next_block = await this.getBlockWithLogs(Number(last.number) + 1); } else { next_block = null; - next_logs = null; } let found_next_block = false; @@ -94,16 +89,13 @@ class BlockMonitor extends EventEmitter { while(last != null && next_block != null) { found_next_block = true; if(next_block.parentHash == last.hash) { - this.append_new_block(next_block, next_logs); + this.append_new_block(next_block); last = next_block; if (Number(last.number) + 1 < max_block_num) { - res = await this.getBlockWithLogs(Number(last.number) + 1); - next_block = res.block; - next_logs = res.logs; + next_block = await this.getBlockWithLogs(Number(last.number) + 1); } else { next_block = null; - next_logs = null; } } else { diff --git a/peripherals/eos-evm-ws-proxy/subscription-server.js b/peripherals/eos-evm-ws-proxy/subscription-server.js index b8348d8..1100064 100644 --- a/peripherals/eos-evm-ws-proxy/subscription-server.js +++ b/peripherals/eos-evm-ws-proxy/subscription-server.js @@ -49,8 +49,8 @@ class SubscriptionServer extends EventEmitter { await this.handle_block_forked({block}); }); - this.block_monitor.on('block_appended', async ({block, logs}) => { - await this.handle_block_appended({block, logs}); + this.block_monitor.on('block_appended', async ({block}) => { + await this.handle_block_appended({block}); }); } @@ -140,12 +140,12 @@ class SubscriptionServer extends EventEmitter { client.ws.send(JSON.stringify(msg, bigint_replacer)); } - async process_logs_subscriptions(block, logs) { + async process_logs_subscriptions(block) { // Process all `logs` subscriptions // Get logs from the recently appended block if(this.logs_subs.size > 0) { - this.logger.debug("LOG => ", JSON.stringify(logs, bigint_replacer)); - for(const log of logs) { + this.logger.debug("LOG => ", JSON.stringify(block.logs, bigint_replacer)); + for(const log of block.logs) { for(const [subid, client] of this.logs_subs) { if(this.logs_filter_match(client.filter, log)) { this.send_logs_response_and_save(block, client, subid, log); @@ -170,10 +170,10 @@ class SubscriptionServer extends EventEmitter { } } - async handle_block_appended({block, logs}) { + async handle_block_appended({block}) { this.logger.debug(`handle_block_appended: ${block.number}`); await this.process_new_heads_subscriptions(block); - await this.process_logs_subscriptions(block, logs); + await this.process_logs_subscriptions(block); await this.process_mined_transactions_subscriptions(block, false); } diff --git a/tests/nodeos_eos_evm_ws_test_basic.py b/tests/nodeos_eos_evm_ws_test_basic.py index 5b9a456..cbde096 100755 --- a/tests/nodeos_eos_evm_ws_test_basic.py +++ b/tests/nodeos_eos_evm_ws_test_basic.py @@ -681,6 +681,8 @@ def makeReservedEvmAddress(account): time.sleep(0.5) recevied_msg=ws.recv() res=json.loads(recevied_msg) + if block_count == 0: + Utils.Print("recevied block message from websocket:" + recevied_msg) block_json=res["params"]["result"] num=block_json["number"] # number can be decimal or hex (with 0x prefix) hash=block_json["hash"]