Skip to content

Commit

Permalink
eos-evm-ws-proxy blockmonitor: Fix fork handling
Browse files Browse the repository at this point in the history
  • Loading branch information
elmato committed Nov 16, 2023
1 parent f75ce6a commit c08e1ca
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 55 deletions.
99 changes: 63 additions & 36 deletions peripherals/eos-evm-ws-proxy/block-monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,63 @@ const EventEmitter = require('events');
const axios = require('axios');
const {Web3} = require('web3');
const Deque = require('collections/deque');
const {num_from_id} = require('./utils');
const {num_from_id, convert_to_epoch} = require('./utils');
const { clearTimeout } = require('timers');
class BlockMonitor extends EventEmitter {

constructor({ web3_rpc_endpoint, nodeos_rpc_endpoint, poll_interval, logger}) {
constructor({ web3_rpc_endpoint, nodeos_rpc_endpoint, poll_interval, genesis, logger}) {
super();
this.web3_rpc_endpoint = web3_rpc_endpoint;
this.nodeos_rpc_endpoint = nodeos_rpc_endpoint;
this.poll_interval = poll_interval;
this.web3 = new Web3(web3_rpc_endpoint);
this.logger = logger;
this.genesis = genesis;

if(this.genesis == undefined || this.genesis.timestamp == undefined) throw ("invalid genesis timestamp");
this.genesis_timestamp = parseInt(this.genesis.timestamp, 16);
this.logger.debug(`Using genesis timestamp: ${this.genesis_timestamp}`);

this.reversible_blocks = new Deque();
this.run = false;
this.timer_id = null;
}

async get_eos_lib() {
const response = await axios.post(this.nodeos_rpc_endpoint+'/v1/chain/get_info', {});
return response.data.last_irreversible_block_num;
async get_evm_lib() {
let response = await axios.post(this.nodeos_rpc_endpoint+'/v1/chain/get_info', {});
response = await axios.post(this.nodeos_rpc_endpoint+'/v1/chain/get_block', {block_num_or_id:response.data.last_irreversible_block_num});
let lib = this.timestamp_to_evm_block_num(convert_to_epoch(response.data.timestamp));
this.logger.debug(`BlockMonitor::get_evm_lib ${lib}`);
return lib;
}

timestamp_to_evm_block_num(timestamp) {
const block_interval = 1;
if (timestamp < this.genesis_timestamp) {
return 0;
}
return 1 + Math.floor((timestamp - this.genesis_timestamp) / block_interval);
}


remove_front_block() {
const block = this.reversible_blocks.shift();
this.logger.debug(`BlockMonitor::remove_front_block ${block.number}`);
this.emit('block_removed', {block});
}

fork_last_block() {
const block = this.reversible_blocks.pop();
this.logger.debug(`FORK_LAST_BLOCK ${block}`);
this.logger.debug(`BlockMonitor::fork_last_block ${block.number}`);
this.emit('block_forked', {block});
return this.reversible_blocks.peekBack();
}

append_new_block(block) {
this.reversible_blocks.add(block);
this.logger.debug(`BlockMonitor::append_new_block ${block.number} ${block.hash}`);
this.emit('block_appended', {block});
return block;
}

async getBlockWithLogs(number_) {
Expand All @@ -63,78 +83,85 @@ class BlockMonitor extends EventEmitter {
return block;
}

async get_next_block(block) {
// need to be conservative, sometimes getLogs return empty result for head block
let head_block = await this.web3.eth.getBlock("latest", true);
if( head_block == null ) return null;

if(block == null)
return await this.getBlockWithLogs(Number(head_block.number) - 1);

let next_block_num = Number(block.number) + 1;

let max_block_num = Number(head_block.number) - 1;
if (next_block_num >= max_block_num) {
return null;
}

return await this.getBlockWithLogs(next_block_num);
}

async poll() {
let next_block = null;
try {
// need to be conservative, sometimes getLogs return empty result for head block
let head_block = await this.web3.eth.getBlock("latest", true);
let max_block_num = Number(head_block.number) - 1;

let last = this.reversible_blocks.peekBack();
if( last == undefined ) {
last = await this.getBlockWithLogs(max_block_num);
this.logger.debug("BlockMonitor::poll last not defined");
last = await this.get_next_block()
if (last != null) {
this.logger.debug(`BlockMonitor::poll Obtained ${last.number}`);
this.append_new_block(last);
} else {
throw new Error("BlockMonitor::poll Unable to get block");
}
}

if (last != null && Number(last.number) + 1 < max_block_num) {
next_block = await this.getBlockWithLogs(Number(last.number) + 1);
} else {
next_block = null;
}

let next_block = await this.get_next_block(last);
let found_next_block = false;

while(last != null && next_block != null) {
found_next_block = true;
if(next_block.parentHash == last.hash) {
this.append_new_block(next_block);
last = next_block;

if (Number(last.number) + 1 < max_block_num) {
next_block = await this.getBlockWithLogs(Number(last.number) + 1);
} else {
next_block = null;
}

last = this.append_new_block(next_block);
next_block = await this.get_next_block(last);
} else {
this.logger.debug(`BlockMonitor::poll next: ${next_block.number} ${next_block.parentHash} != last: ${last.number} ${last.hash}`);
last = this.fork_last_block();
next_block = await this.get_next_block(last);
}
}

if( found_next_block == true ) {
const eos_lib = await this.get_eos_lib();
while(this.reversible_blocks.length > 0 && num_from_id(this.reversible_blocks.peek().mixHash) <= eos_lib) {
this.logger.debug(`eoslib: ${eos_lib} ${num_from_id(this.reversible_blocks.peek().mixHash)} ${this.reversible_blocks.peek().number} ${this.reversible_blocks.peek().mixHash}`);
const evm_lib = await this.get_evm_lib();
while(this.reversible_blocks.length > 0 && this.reversible_blocks.peek().number < evm_lib) {
this.remove_front_block();
}
}

} catch (error) {
this.logger.error(error.message);
this.logger.error(`BlockMonitor::poll => ERR: ${error.message}`);
}

if(this.run == true) {
if (this.timer_id != null) clearTimeout(this.timer_id);
this.timer_id = setTimeout(() => this.poll(), this.poll_interval || 5000);
} else {
this.reversible_blocks.clear();
this.logger.info("BlockMonitor stopped");
this.logger.info("BlockMonitor::poll => Stopped");
if (this.timer_id != null) clearTimeout(this.timer_id);
}
}

start() {
this.logger.info("BlockMonitor start");
if( this.run == true ) return;
this.logger.info("BlockMonitor::start => BlockMonitor starting");
this.run = true;
if (this.timer_id != null) clearTimeout(this.timer_id);
this.timer_id = setTimeout(() => this.poll(), 0);
}

stop() {
this.logger.info("BlockMonitor stopping");
this.run = false;
// don't clean up timeout. let poll() cleanup reversible_blocks
this.logger.info("BlockMonitor::stop => BlockMonitor stopping");
this.run = false;
}

is_running() {
Expand Down
4 changes: 3 additions & 1 deletion peripherals/eos-evm-ws-proxy/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const poll_interval = parseInt(process.env.POLL_INTERVAL, 10) || 1000;
const max_logs_subs_per_connection = parseInt(process.env.MAX_LOGS_SUBS_PER_CONNECTION, 10) || 1;
const max_minedtx_subs_per_connection = parseInt(process.env.MAX_MINEDTX_SUBS_PER_CONNECTION, 10) || 1;
const log_level = process.env.LOG_LEVEL || 'info';
const genesis_json = process.env.GENESIS_JSON || 'eos-evm-genesis.json';

module.exports = {
ws_listening_port,
Expand All @@ -16,5 +17,6 @@ module.exports = {
poll_interval,
max_logs_subs_per_connection,
max_minedtx_subs_per_connection,
log_level
log_level,
genesis_json
};
9 changes: 7 additions & 2 deletions peripherals/eos-evm-ws-proxy/main.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
const config = require('./config');
const {logger} = require('./logger');
const SubscriptionServer = require('./subscription-server');
const server = new SubscriptionServer({...config, logger});
server.start();

try {
const server = new SubscriptionServer({...config, logger});
server.start();
} catch (error) {
logger.error(`main: ${error.message}`);
}
30 changes: 16 additions & 14 deletions peripherals/eos-evm-ws-proxy/subscription-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@ const EventEmitter = require('events');
const WebSocketHandler = require('./websocket-handler');
const BlockMonitor = require('./block-monitor');
const {Web3} = require('web3');
const {bigint_replacer} = require('./utils');

const {bigint_replacer, load_json_file} = require('./utils');
class SubscriptionServer extends EventEmitter {

constructor({web3_rpc_endpoint, nodeos_rpc_endpoint, ws_listening_host, ws_listening_port, poll_interval, max_logs_subs_per_connection, max_minedtx_subs_per_connection, logger}) {
constructor({web3_rpc_endpoint, nodeos_rpc_endpoint, ws_listening_host, ws_listening_port, poll_interval, max_logs_subs_per_connection, max_minedtx_subs_per_connection, genesis_json, logger}) {
super();

this.block_monitor = new BlockMonitor({web3_rpc_endpoint, nodeos_rpc_endpoint, poll_interval, logger});
const genesis = load_json_file(genesis_json);

this.block_monitor = new BlockMonitor({web3_rpc_endpoint, nodeos_rpc_endpoint, poll_interval, genesis, logger});
this.web_socket_handler = new WebSocketHandler({ws_listening_host, ws_listening_port, web3_rpc_endpoint, logger});
this.max_logs_subs_per_connection = max_logs_subs_per_connection;
this.max_minedtx_subs_per_connection = max_minedtx_subs_per_connection;
this.web3 = new Web3(web3_rpc_endpoint);
this.logger = logger
this.genesis = genesis;

this.new_head_subs = new Map();
this.logs_subs = new Map();
Expand Down Expand Up @@ -55,21 +57,21 @@ class SubscriptionServer extends EventEmitter {
}

handle_block_removed({block}) {
this.logger.debug(`handle_block_removed: ${block.number}`);
this.logger.debug(`SubscriptionServer::handle_block_removed: ${block.number}`);
if(this.logs_sent.has(block.number)) {
this.logs_sent.delete(block.number);
}
}

async handle_block_forked({block}) {
this.logger.debug(`handle_block_forked: ${block.number}`);
this.logger.debug(`SubscriptionServer::handle_block_forked: ${block.number}`);
const events_sent_at_block = this.logs_sent.get(block.number);
if (events_sent_at_block != undefined) {
for(const event_sent of events_sent_at_block) {
event_sent.msg.params.result.removed = true;
event_sent.client.ws.send(JSON.stringify(event_sent.msg, bigint_replacer));
}
this.logger.debug(`REMOVING ${block.number}`);
this.logger.debug(`SubscriptionServer::handle_block_forked: REMOVING ${block.number}`);
this.logs_sent.delete(block.number);
}
await this.process_mined_transactions_subscriptions(block, true);
Expand Down Expand Up @@ -193,7 +195,7 @@ class SubscriptionServer extends EventEmitter {
for (const [_, client] of this.new_head_subs) {
if(ws === client.ws) { throw new Error('Already subscribed');}
}
this.logger.debug(`Adding newHeads subscription ${subid}`);
this.logger.debug(`SubscriptionServer::handle_new_heads: Adding newHeads subscription ${subid}`);
this.new_head_subs.set(subid, {ws});
this.check_start_monitor();
}
Expand All @@ -206,7 +208,7 @@ class SubscriptionServer extends EventEmitter {
if( total_logs_subs >= this.max_logs_subs_per_connection ) {
throw new Error('Max logs subscriptions reached');
}
this.logger.debug(`Adding logs subscription ${subid}`);
this.logger.debug(`SubscriptionServer::handle_logs: Adding logs subscription ${subid}`);
this.logs_subs.set(subid, {ws, filter});
this.check_start_monitor();
}
Expand All @@ -219,7 +221,7 @@ class SubscriptionServer extends EventEmitter {
if( total_minedtx_subs >= this.max_minedtx_subs_per_connection ) {
throw new Error("Max minedTransactions subscriptions reached");
}
this.logger.debug(`Adding minedTransactions subscription ${subid}`);
this.logger.debug(`SubscriptionServer::handle_minedTransactions: Adding minedTransactions subscription ${subid}`);
this.mined_transactions_subs.set(subid, {ws, filter});
this.check_start_monitor();
}
Expand All @@ -238,7 +240,7 @@ class SubscriptionServer extends EventEmitter {
throw new Error('Not found');
}

this.logger.debug(`Unsubscribing ${subid}`);
this.logger.debug(`SubscriptionServer::handle_unsubscribe: Unsubscribing ${subid}`);
subscription_map.delete(subid);
this.check_stop_monitor();
}
Expand All @@ -247,21 +249,21 @@ class SubscriptionServer extends EventEmitter {

for (const [subid, client] of this.new_head_subs) {
if(ws === client.ws) {
this.logger.debug(`Removing new_head_subs ${subid}`);
this.logger.debug(`SubscriptionServer::handle_disconnect: Removing new_head_subs ${subid}`);
this.new_head_subs.delete(subid);
}
}

for (const [subid, client] of this.logs_subs) {
if(ws === client.ws) {
this.logger.debug(`Removing logs_subs ${subid}`);
this.logger.debug(`SubscriptionServer::handle_disconnect: Removing logs_subs ${subid}`);
this.logs_subs.delete(subid);
}
}

for (const [subid, client] of this.mined_transactions_subs) {
if(ws === client.ws) {
this.logger.debug(`Removing mined_transactions_subs ${subid}`);
this.logger.debug(`SubscriptionServer::handle_disconnect:Removing mined_transactions_subs ${subid}`);
this.mined_transactions_subs.delete(subid);
}
}
Expand Down
21 changes: 19 additions & 2 deletions peripherals/eos-evm-ws-proxy/utils.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
const fs = require('fs');

function is_plain_object(value) {
return Object.prototype.toString.call(value) === '[object Object]';
}

function load_json_file(path) {
try {
const data = fs.readFileSync(path, 'utf8');
return JSON.parse(data);
} catch (error) {
throw error;
}
}

function num_from_id(id) {
if( id.startsWith('0x') ) {
id = id.substring(2);
Expand All @@ -16,9 +27,15 @@ function bigint_replacer(key, value) {
return value;
}

function convert_to_epoch(dateString) {
return Math.floor(new Date(dateString+"+0000").getTime() / 1000);
}

module.exports = {
is_plain_object,
num_from_id,
bigint_replacer
bigint_replacer,
convert_to_epoch,
load_json_file
};


0 comments on commit c08e1ca

Please sign in to comment.