Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

websocket: use batch request in getLogs #107

Merged
merged 4 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 63 additions & 11 deletions peripherals/eos-evm-ws-proxy/block-monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const axios = require('axios');
const {Web3} = require('web3');
const Deque = require('collections/deque');
const {num_from_id} = require('./utils');
const { clearTimeout } = require('timers');
class BlockMonitor extends EventEmitter {

constructor({ web3_rpc_endpoint, nodeos_rpc_endpoint, poll_interval, logger}) {
Expand Down Expand Up @@ -35,27 +36,76 @@ class BlockMonitor extends EventEmitter {
return this.reversible_blocks.peekBack();
}

append_new_block(block) {
append_new_block(block, logs) {
this.reversible_blocks.add(block);
this.emit('block_appended', {block});
this.emit('block_appended', {block, logs});
}

async getBlockWithLogs(number_) {
let number = Number(number_);

let id1 = "get_block_" + number;
let id2 = "get_logs_" + number;
let requests = [
{jsonrpc:"2.0",method:"eth_getBlockByNumber",params:["0x" + number.toString(16), true], id: id1},
{jsonrpc:"2.0",method:"eth_getLogs",params:[{fromBlock: "0x" + number.toString(16), toBlock: "0x" + number.toString(16)}], id: id2}
]
const results = await axios.post(this.web3_rpc_endpoint, requests);

if (!Array.isArray(results.data) || results.data.length != 2) {
throw new Error("invalid RPC response of [getBlock, GetPastLogs] batch request");
}
const block = results.data[0].result;
const logs = results.data[1].result;

//console.log("RPC batch result:" + JSON.stringify(block));
return {block, logs};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put logs as a property of blocks?

blocks.logs = logs
return block

}

async poll() {
let res = null;
let next_block = null;
let next_logs = null;
try {
// need to be conservative, sometimes getLogs return empty result for head block
let head_block = await this.web3.eth.getBlock("latest", true);
let max_block_num = Number(head_block.number) - 1;

let last = this.reversible_blocks.peekBack();
if( last == undefined ) {
last = await this.web3.eth.getBlock("latest", true);
this.append_new_block(last);
if( last == undefined || last == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this redundant?

res = await this.getBlockWithLogs(max_block_num);
last = res.block;
if (last != null) {
this.append_new_block(last, res.logs);
}
}

if (last != null && Number(last.number) + 1 < max_block_num) {
res = await this.getBlockWithLogs(Number(last.number) + 1);
next_block = res.block;
next_logs = res.logs;
} else {
next_block = null;
next_logs = null;
}

let next_block = await this.web3.eth.getBlock(last.number+BigInt(1), true);
let found_next_block = false;

while(last != null && next_block != null) {
found_next_block = true;
if(next_block.parentHash == last.hash) {
this.append_new_block(next_block);
this.append_new_block(next_block, next_logs);
last = next_block;
next_block = await this.web3.eth.getBlock(last.number+BigInt(1), true);

if (Number(last.number) + 1 < max_block_num) {
res = await this.getBlockWithLogs(Number(last.number) + 1);
next_block = res.block;
next_logs = res.logs;
} else {
next_block = null;
next_logs = null;
}

} else {
last = this.fork_last_block();
}
Expand All @@ -74,6 +124,7 @@ class BlockMonitor extends EventEmitter {
}

if(this.run == true) {
if (this.timer_id != null) clearTimeout(this.timer_id);
this.timer_id = setTimeout(() => this.poll(), this.poll_interval || 5000);
} else {
this.reversible_blocks.clear();
Expand All @@ -84,13 +135,14 @@ class BlockMonitor extends EventEmitter {
start() {
this.logger.info("BlockMonitor start");
this.run = true;
setTimeout(() => this.poll(), 0);
if (this.timer_id != null) clearTimeout(this.timer_id);
this.timer_id = setTimeout(() => this.poll(), 0);
}

stop() {
clearTimeout(this.timer_id);
this.logger.info("BlockMonitor stopping");
this.run = false;
this.run = false;
// don't clean up timeout. let poll() cleanup reversible_blocks
}

is_running() {
Expand Down
11 changes: 5 additions & 6 deletions peripherals/eos-evm-ws-proxy/subscription-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ class SubscriptionServer extends EventEmitter {
await this.handle_block_forked({block});
});

this.block_monitor.on('block_appended', async ({block}) => {
await this.handle_block_appended({block});
this.block_monitor.on('block_appended', async ({block, logs}) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use logs as a property of block we should revert some modifications here

await this.handle_block_appended({block, logs});
});
}

Expand Down Expand Up @@ -140,11 +140,10 @@ class SubscriptionServer extends EventEmitter {
client.ws.send(JSON.stringify(msg, bigint_replacer));
}

async process_logs_subscriptions(block) {
async process_logs_subscriptions(block, logs) {
// Process all `logs` subscriptions
// Get logs from the recently appended block
if(this.logs_subs.size > 0) {
const logs = await this.web3.eth.getPastLogs({fromBlock:block.number, toBlock:block.number});
this.logger.debug("LOG => ", JSON.stringify(logs, bigint_replacer));
for(const log of logs) {
for(const [subid, client] of this.logs_subs) {
Expand All @@ -171,10 +170,10 @@ class SubscriptionServer extends EventEmitter {
}
}

async handle_block_appended({block}) {
async handle_block_appended({block, logs}) {
this.logger.debug(`handle_block_appended: ${block.number}`);
await this.process_new_heads_subscriptions(block);
await this.process_logs_subscriptions(block);
await this.process_logs_subscriptions(block, logs);
await this.process_mined_transactions_subscriptions(block, false);
}

Expand Down
5 changes: 3 additions & 2 deletions tests/nodeos_eos_evm_ws_test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ def makeReservedEvmAddress(account):
recevied_msg=ws.recv()
res=json.loads(recevied_msg)
block_json=res["params"]["result"]
num=(int)(block_json["number"])
num=block_json["number"] # number can be decimal or hex (with 0x prefix)
hash=block_json["hash"]
parent_hash=block_json["parentHash"]
Utils.Print("received block {0} from websocket, hash={1}..., parent={2}...".format(num, hash[0:8], parent_hash[0:8]))
Expand Down Expand Up @@ -727,7 +727,8 @@ def makeReservedEvmAddress(account):
res=json.loads(recevied_msg)
Utils.Print("last ws msg is:" + recevied_msg)
if ("method" in res and res["method"] == "eth_subscription"):
assert(res["params"]["result"]["transaction"]["value"] == "93000000000000000") # 0.103 - 0.01(fee)=0.093
assert(res["params"]["result"]["transaction"]["value"] == "93000000000000000" or \
res["params"]["result"]["transaction"]["value"] == "0x14a6701dc1c8000") # 0.103 - 0.01(fee)=0.093
break
try_count = try_count - 1
if (try_count == 0):
Expand Down