Skip to content

Commit

Permalink
fix(core): delete reputation manager in process_block to avoid forks
Browse files Browse the repository at this point in the history
  • Loading branch information
lrubiorod committed Feb 13, 2019
1 parent a3d79c5 commit 9a7774f
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 114 deletions.
1 change: 0 additions & 1 deletion core/src/actors/chain_manager/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,6 @@ impl Handler<PeerLastEpoch> for ChainManager {
.map(|x| x.highest_block_checkpoint.checkpoint)
.unwrap_or(0)
{
info!("Blockchain synced");
self.synced = true;
} else {
warn!("Mine flag disabled");
Expand Down
187 changes: 80 additions & 107 deletions core/src/actors/chain_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use std::collections::HashMap;
use std::time::Duration;

use actix::{
ActorFuture, AsyncContext, Context, ContextFutureSpawner, MailboxError, Supervised, System,
SystemService, WrapFuture,
ActorFuture, AsyncContext, Context, ContextFutureSpawner, Supervised, System, SystemService,
WrapFuture,
};
use log::{debug, error, info, warn};

Expand All @@ -44,7 +44,6 @@ use witnet_util::error::WitnetError;

use crate::actors::{
inventory_manager::{messages::AddItem, InventoryManager},
reputation_manager::{messages::ValidatePoE, ReputationManager},
session::messages::{InventoryExchange, SendInventoryItem},
sessions_manager::{
messages::{Anycast, Broadcast},
Expand Down Expand Up @@ -286,9 +285,6 @@ impl ChainManager {
}

fn process_block(&mut self, ctx: &mut Context<Self>, block: Block) {
// Block verification process
let reputation_manager_addr = System::current().registry().get::<ReputationManager>();

let block_epoch = block.block_header.beacon.checkpoint;
let hash_prev_block = block.block_header.beacon.hash_prev_block;

Expand All @@ -305,6 +301,7 @@ impl ChainManager {
// Check beforehand if a previous block candidate exists to validate
if let Some(candidate_to_validate) = self.candidate_to_validate.take() {
if candidate_to_validate.hash() == hash_prev_block {
debug!("Processing block in memory: {}", hash_prev_block);
self.process_block(ctx, candidate_to_validate);
} else {
self.candidate_to_validate = Some(candidate_to_validate);
Expand Down Expand Up @@ -359,6 +356,7 @@ impl ChainManager {
{
if current_epoch == block_epoch && self.synced {
// Keep possible block_candidate
debug!("Block to memory: {}", block.hash());
self.candidate_to_validate = Some(block);
self.request_block(InventoryEntry::Block(hash_prev_block));
debug!("Requesting previous block: {}", hash_prev_block)
Expand All @@ -382,19 +380,12 @@ impl ChainManager {
current_epoch, block_epoch
);
}
// Request proof of eligibility validation to ReputationManager
reputation_manager_addr
.send(ValidatePoE {
beacon: block.block_header.beacon,
proof: block.proof.clone(),
})
.into_actor(self)
.then(|res, act, ctx| {
act.process_poe_validation_response(res, ctx, block);

actix::fut::ok(())
})
.wait(ctx);

// TODO: Use a real PoE
let poe = true;
if poe {
self.process_poe_validation_response(ctx, block);
}
}
})
.unwrap_or_else(|| {
Expand All @@ -408,98 +399,80 @@ impl ChainManager {
}
}

fn process_poe_validation_response(
&mut self,
res: Result<bool, MailboxError>,
ctx: &mut Context<Self>,
block: Block,
) {
match res {
Err(e) => {
// Error when sending message
error!("Unsuccessful communication with reputation manager: {}", e);
}
Ok(false) => {
warn!("Block PoE not valid");
}
Ok(true) => {
let mut utxo_set = self.chain_state.unspent_outputs_pool.clone();
let mut data_request_pool = self.data_request_pool.clone();

if validate_transactions(
&mut utxo_set,
&self.transactions_pool,
&mut data_request_pool,
&block,
) {
let block_hash: Hash = block.hash();
let block_epoch = block.block_header.beacon.checkpoint;
if Some(block_epoch) == self.current_epoch {
// Update block candidate
self.best_candidate = Some(Candidate {
block: block.clone(),
utxo_set,
data_request_pool,
});
//Broadcast blocks in current epoch
self.broadcast_item(InventoryItem::Block(block));
} else {
//TODO: Now we assume there are no forked older blocks

// Persist block item
self.persist_item(ctx, InventoryItem::Block(block.clone()));

// Update utxo set with block's transactions
self.chain_state.unspent_outputs_pool =
self.chain_state.generate_unspent_outputs_pool(&block);
self.update_transaction_pool(block.txns.as_ref());

// FIXME: Revisit for potential refactor
// Update data requests pool
self.data_request_pool = data_request_pool;
self.data_request_pool.update_data_request_stages();
// Persist finished data requests into storage
let to_be_stored = self.data_request_pool.finished_data_requests();
to_be_stored.into_iter().for_each(|dr| {
self.persist_data_request(ctx, &dr);
});
// Store active data requests
self.chain_state.data_request_pool = ActiveDataRequestPool {
waiting_for_reveal: self.data_request_pool.waiting_for_reveal.clone(),
data_requests_by_epoch: self
.data_request_pool
.data_requests_by_epoch
.clone(),
data_request_pool: self.data_request_pool.data_request_pool.clone(),
to_be_stored: self.data_request_pool.to_be_stored.clone(),
dr_pointer_cache: self.data_request_pool.dr_pointer_cache.clone(),
fn process_poe_validation_response(&mut self, ctx: &mut Context<Self>, block: Block) {
let mut utxo_set = self.chain_state.unspent_outputs_pool.clone();
let mut data_request_pool = self.data_request_pool.clone();

if validate_transactions(
&mut utxo_set,
&self.transactions_pool,
&mut data_request_pool,
&block,
) {
let block_hash: Hash = block.hash();
let block_epoch = block.block_header.beacon.checkpoint;
if Some(block_epoch) == self.current_epoch {
// Update block candidate
self.best_candidate = Some(Candidate {
block: block.clone(),
utxo_set,
data_request_pool,
});
//Broadcast blocks in current epoch
self.broadcast_item(InventoryItem::Block(block));
} else {
//TODO: Now we assume there are no forked older blocks

// Persist block item
self.persist_item(ctx, InventoryItem::Block(block.clone()));

// Update utxo set with block's transactions
self.chain_state.unspent_outputs_pool =
self.chain_state.generate_unspent_outputs_pool(&block);
self.update_transaction_pool(block.txns.as_ref());

// FIXME: Revisit for potential refactor
// Update data requests pool
self.data_request_pool = data_request_pool;
self.data_request_pool.update_data_request_stages();
// Persist finished data requests into storage
let to_be_stored = self.data_request_pool.finished_data_requests();
to_be_stored.into_iter().for_each(|dr| {
self.persist_data_request(ctx, &dr);
});
// Store active data requests
self.chain_state.data_request_pool = ActiveDataRequestPool {
waiting_for_reveal: self.data_request_pool.waiting_for_reveal.clone(),
data_requests_by_epoch: self.data_request_pool.data_requests_by_epoch.clone(),
data_request_pool: self.data_request_pool.data_request_pool.clone(),
to_be_stored: self.data_request_pool.to_be_stored.clone(),
dr_pointer_cache: self.data_request_pool.dr_pointer_cache.clone(),
};
// End of FIX ME

// Update chain_info
match self.chain_state.chain_info.as_mut() {
Some(chain_info) => {
let beacon = CheckpointBeacon {
checkpoint: block_epoch,
hash_prev_block: block_hash,
};
// End of FIX ME

// Update chain_info
match self.chain_state.chain_info.as_mut() {
Some(chain_info) => {
let beacon = CheckpointBeacon {
checkpoint: block_epoch,
hash_prev_block: block_hash,
};

chain_info.highest_block_checkpoint = beacon;

// Insert candidate block into `block_chain`
self.chain_state.block_chain.insert(block_epoch, block_hash);
}

None => {
error!("No ChainInfo loaded in ChainManager");
}
}

chain_info.highest_block_checkpoint = beacon;

// Insert candidate block into `block_chain`
self.chain_state.block_chain.insert(block_epoch, block_hash);
debug!("Chain Info updated");
}

None => {
error!("No ChainInfo loaded in ChainManager");
}
} else {
warn!("Transactions not valid")
}
}
};
} else {
warn!("Transactions not valid")
}
}

/// Method to periodically synchronize inventory items with our peers
Expand Down
6 changes: 0 additions & 6 deletions core/src/actors/sessions_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use actix::{

use ansi_term::Color::Cyan;

use crate::actors::chain_manager::messages::PeerLastEpoch;
use crate::actors::{
chain_manager::{messages::SetNetworkReady, ChainManager},
connections_manager::{messages::OutboundTcpConnect, ConnectionsManager},
Expand Down Expand Up @@ -54,11 +53,6 @@ impl SessionsManager {

// Check if bootstrap is needed
if act.sessions.is_outbound_bootstrap_needed() {
// TODO: Use a properly synchronization process
// Disable mining if we lost the minimum outbounds required
act.network_ready = false;
ChainManager::from_registry().do_send(PeerLastEpoch { epoch: 0 });

// Get peers manager address
let peers_manager_addr = System::current().registry().get::<PeersManager>();

Expand Down

0 comments on commit 9a7774f

Please sign in to comment.