From cbdf20484fd13d61968347dc882bde996ca96070 Mon Sep 17 00:00:00 2001
From: Thegaram <th307q@gmail.com>
Date: Fri, 14 Aug 2020 15:17:03 +0200
Subject: [PATCH] propagate light node sync errors to RPC caller

---
 core/src/light_protocol/common/ledger_info.rs |  67 ++++--
 core/src/light_protocol/error.rs              | 192 +++++++++-------
 core/src/light_protocol/handler/mod.rs        |  95 ++++----
 .../light_protocol/handler/sync/block_txs.rs  |  66 ++++--
 .../src/light_protocol/handler/sync/blooms.rs |  69 +++---
 .../handler/sync/common/future_item.rs        | 128 +++++++++--
 .../handler/sync/common/ledger_proof.rs       |  28 +--
 .../handler/sync/common/sync_manager.rs       |  11 +-
 .../light_protocol/handler/sync/receipts.rs   |  65 ++++--
 .../handler/sync/state_entries.rs             |  66 ++++--
 .../handler/sync/state_roots.rs               |  90 +++++---
 .../handler/sync/storage_roots.rs             |  65 ++++--
 .../light_protocol/handler/sync/tx_infos.rs   | 132 +++++++----
 core/src/light_protocol/handler/sync/txs.rs   |  47 +++-
 .../light_protocol/handler/sync/witnesses.rs  |   5 +-
 core/src/light_protocol/provider.rs           | 206 +++++++++---------
 core/src/light_protocol/query_service.rs      |   8 +-
 core/src/sync/node_type.rs                    |   2 +-
 18 files changed, 843 insertions(+), 499 deletions(-)

diff --git a/core/src/light_protocol/common/ledger_info.rs b/core/src/light_protocol/common/ledger_info.rs
index 86df8a034b..80daf16439 100644
--- a/core/src/light_protocol/common/ledger_info.rs
+++ b/core/src/light_protocol/common/ledger_info.rs
@@ -42,8 +42,10 @@ impl LedgerInfo {
             .get_data_manager()
             .block_by_hash(&hash, false /* update_cache */)
             .map(|b| (*b).clone())
-            // FIXME: what's this internal error?
-            .ok_or(ErrorKind::InternalError.into())
+            .ok_or_else(|| {
+                ErrorKind::InternalError(format!("Block {:?} not found", hash))
+                    .into()
+            })
     }
 
     /// Get header `hash`, if it exists.
@@ -53,12 +55,15 @@ impl LedgerInfo {
             .get_data_manager()
             .block_header_by_hash(&hash)
             .map(|h| (*h).clone())
-            .ok_or(ErrorKind::InternalError.into())
+            .ok_or_else(|| {
+                ErrorKind::InternalError(format!("Header {:?} not found", hash))
+                    .into()
+            })
     }
 
     /// Get hash of block at `height` on the pivot chain, if it exists.
     #[inline]
-    pub fn pivot_hash_of(&self, height: u64) -> Result<H256, Error> {
+    fn pivot_hash_of(&self, height: u64) -> Result<H256, Error> {
         let epoch = EpochNumber::Number(height);
         Ok(self.consensus.get_hash_from_epoch_number(epoch)?)
     }
@@ -81,7 +86,7 @@ impl LedgerInfo {
     /// Get the correct deferred state root of the block at `height` on the
     /// pivot chain based on local execution information.
     #[inline]
-    pub fn correct_deferred_state_root_hash_of(
+    fn correct_deferred_state_root_hash_of(
         &self, height: u64,
     ) -> Result<H256, Error> {
         let epoch = height.saturating_sub(DEFERRED_STATE_EPOCH_COUNT);
@@ -92,7 +97,7 @@ impl LedgerInfo {
     /// Get the correct deferred receipts root of the block at `height` on the
     /// pivot chain based on local execution information.
     #[inline]
-    pub fn correct_deferred_receipts_root_hash_of(
+    fn correct_deferred_receipts_root_hash_of(
         &self, height: u64,
     ) -> Result<H256, Error> {
         let epoch = height.saturating_sub(DEFERRED_STATE_EPOCH_COUNT);
@@ -102,13 +107,19 @@ impl LedgerInfo {
             .get_data_manager()
             .get_epoch_execution_commitment(&pivot)
             .map(|c| c.receipts_root)
-            .ok_or(ErrorKind::InternalError.into())
+            .ok_or_else(|| {
+                ErrorKind::InternalError(format!(
+                    "Execution commitments for {:?} not found",
+                    pivot
+                ))
+                .into()
+            })
     }
 
     /// Get the correct deferred logs bloom root of the block at `height` on the
     /// pivot chain based on local execution information.
     #[inline]
-    pub fn correct_deferred_logs_root_hash_of(
+    fn correct_deferred_logs_root_hash_of(
         &self, height: u64,
     ) -> Result<H256, Error> {
         let epoch = height.saturating_sub(DEFERRED_STATE_EPOCH_COUNT);
@@ -118,7 +129,13 @@ impl LedgerInfo {
             .get_data_manager()
             .get_epoch_execution_commitment(&pivot)
             .map(|c| c.logs_bloom_hash)
-            .ok_or(ErrorKind::InternalError.into())
+            .ok_or_else(|| {
+                ErrorKind::InternalError(format!(
+                    "Execution commitments for {:?} not found",
+                    pivot
+                ))
+                .into()
+            })
     }
 
     /// Get the number of epochs per snapshot period.
@@ -129,7 +146,7 @@ impl LedgerInfo {
 
     /// Get the state trie corresponding to the execution of `epoch`.
     #[inline]
-    pub fn state_of(&self, epoch: u64) -> Result<State, Error> {
+    fn state_of(&self, epoch: u64) -> Result<State, Error> {
         let pivot = self.pivot_hash_of(epoch)?;
 
         let maybe_state_index = self
@@ -145,7 +162,12 @@ impl LedgerInfo {
 
         match state {
             Some(Ok(Some(state))) => Ok(state),
-            _ => Err(ErrorKind::InternalError.into()),
+            _ => {
+                bail!(ErrorKind::InternalError(format!(
+                    "State of epoch {} not found",
+                    epoch
+                )));
+            }
         }
     }
 
@@ -156,7 +178,12 @@ impl LedgerInfo {
     ) -> Result<StateRootWithAuxInfo, Error> {
         match self.state_of(epoch)?.get_state_root() {
             Ok(root) => Ok(root),
-            _ => Err(ErrorKind::InternalError.into()),
+            Err(e) => {
+                bail!(ErrorKind::InternalError(format!(
+                    "State root of epoch {} not found: {:?}",
+                    epoch, e
+                )));
+            }
         }
     }
 
@@ -207,7 +234,13 @@ impl LedgerInfo {
                         false, /* update_cache */
                     )
                     .map(|res| (*res.block_receipts).clone())
-                    .ok_or(ErrorKind::InternalError.into())
+                    .ok_or_else(|| {
+                        ErrorKind::InternalError(format!(
+                            "Receipts of epoch {} not found",
+                            epoch
+                        ))
+                        .into()
+                    })
             })
             .collect()
     }
@@ -232,7 +265,13 @@ impl LedgerInfo {
                         false, /* update_cache */
                     )
                     .map(|res| res.bloom)
-                    .ok_or(ErrorKind::InternalError.into())
+                    .ok_or_else(|| {
+                        ErrorKind::InternalError(format!(
+                            "Logs bloom of epoch {} not found",
+                            epoch
+                        ))
+                        .into()
+                    })
             })
             .collect::<Result<Vec<Bloom>, Error>>()?;
 
diff --git a/core/src/light_protocol/error.rs b/core/src/light_protocol/error.rs
index 53ae7ded87..913c22cf39 100644
--- a/core/src/light_protocol/error.rs
+++ b/core/src/light_protocol/error.rs
@@ -3,15 +3,18 @@
 // See http://www.gnu.org/licenses/
 
 use crate::{
-    message::{Message, MsgId},
+    message::{Message, MsgId, RequestId},
     network::{self, NetworkContext, UpdateNodeOperation},
     statedb,
-    sync::message::Throttled,
+    sync::{message::Throttled, node_type::NodeType},
 };
+use cfx_types::{H160, H256};
 use error_chain::ChainedError;
 use network::node_table::NodeId;
-use primitives::{filter::FilterError, ChainIdParams};
+use parking_lot::Mutex;
+use primitives::{filter::FilterError, ChainIdParams, StateRoot};
 use rlp::DecoderError;
+use std::sync::Arc;
 
 error_chain! {
     links {
@@ -30,29 +33,34 @@ error_chain! {
             display("packet already throttled: {:?}", msg_name),
         }
 
-        ChainIdMismatch{ours: ChainIdParams, theirs: ChainIdParams} {
+        ChainIdMismatch{ ours: ChainIdParams, theirs: ChainIdParams } {
             description("ChainId mismatch"),
-            display("ChainId mismatch, ours {:?}, theirs {:?}.", ours, theirs),
+            display("ChainId mismatch, ours={:?}, theirs={:?}.", ours, theirs),
         }
 
-        GenesisMismatch {
+        ClonableErrorWrapper(error: ClonableError) {
+            description("Clonable error"),
+            display("{}", error.0.lock().to_string()),
+        }
+
+        GenesisMismatch{ ours: H256, theirs: H256 } {
             description("Genesis mismatch"),
-            display("Genesis mismatch"),
+            display("Genesis mismatch, ours={:?}, theirs={:?}.", ours, theirs),
         }
 
-        InternalError {
+        InternalError(details: String) {
             description("Internal error"),
-            display("Internal error"),
+            display("Internal error: {:?}", details),
         }
 
-        InvalidBloom {
-            description("Invalid bloom"),
-            display("Invalid bloom"),
+        InvalidBloom{ epoch: u64, expected: H256, received: H256 } {
+            description("Logs bloom hash validation failed"),
+            display("Logs bloom hash validation for epoch {} failed, expected={:?}, received={:?}", epoch, expected, received),
         }
 
-        InvalidLedgerProof {
-            description("Invalid ledger proof"),
-            display("Invalid ledger proof"),
+        InvalidLedgerProofSize{ hash: H256, expected: u64, received: u64 } {
+            description("Invalid ledger proof size"),
+            display("Invalid ledger proof size for header {:?}: expected={}, received={}", hash, expected, received),
         }
 
         InvalidMessageFormat {
@@ -60,44 +68,54 @@ error_chain! {
             display("Invalid message format"),
         }
 
-        InvalidReceipts {
-            description("Invalid receipts"),
-            display("Invalid receipts"),
+        InvalidPreviousStateRoot{ current_epoch: u64, snapshot_epoch_count: u64, root: Option<StateRoot> } {
+            description("Invalid previous state root"),
+            display("Invalid previous state root for epoch {} with snapshot epoch count {}: {:?}", current_epoch, snapshot_epoch_count, root),
         }
 
-        InvalidStateProof(reason: &'static str) {
+        InvalidReceipts{ epoch: u64, expected: H256, received: H256 } {
+            description("Receipts root validation failed"),
+            display("Receipts root validation for epoch {} failed, expected={:?}, received={:?}", epoch, expected, received),
+        }
+
+        InvalidStateProof{ epoch: u64, key: Vec<u8>, value: Option<Vec<u8>>, reason: &'static str } {
             description("Invalid state proof"),
-            display("Invalid state proof: {}", reason),
+            display("Invalid state proof for key {:?} and value {:?} in epoch {}: {:?}", key, value, epoch, reason),
         }
 
-        InvalidStateRoot {
-            description("Invalid state root"),
-            display("Invalid state root"),
+        InvalidStateRoot{ epoch: u64, expected: H256, received: H256 } {
+            description("State root validation failed"),
+            display("State root validation for epoch {} failed, expected={:?}, received={:?}", epoch, expected, received),
         }
 
-        InvalidStorageRootProof(reason: &'static str) {
+        InvalidStorageRootProof{ epoch: u64, address: H160, reason: &'static str } {
             description("Invalid storage root proof"),
-            display("Invalid storage root proof: {}", reason),
+            display("Invalid storage root proof for address {:?} in epoch {}: {}", address, epoch, reason),
         }
 
-        InvalidTxInfo {
+        InvalidTxInfo{ reason: String } {
             description("Invalid tx info"),
-            display("Invalid tx info"),
+            display("Invalid tx info: {:?}", reason),
         }
 
-        InvalidTxRoot {
-            description("Invalid tx root"),
-            display("Invalid tx root"),
+        InvalidTxRoot{ hash: H256, expected: H256, received: H256 } {
+            description("Transaction root validation failed"),
+            display("Transaction root validation for block {:?} failed, expected={:?}, received={:?}", hash, expected, received),
         }
 
-        InvalidTxSignature {
+        InvalidTxSignature{ hash: H256 } {
             description("Invalid tx signature"),
-            display("Invalid tx signature"),
+            display("Invalid signature for transaction {:?}", hash),
+        }
+
+        InvalidWitnessRoot{ hash: H256, expected: H256, received: H256 } {
+            description("Witness root validation failed"),
+            display("Witness root validation for header {:?} failed, expected={:?}, received={:?}", hash, expected, received),
         }
 
-        SendStatusFailed {
+        SendStatusFailed{ peer: NodeId } {
             description("Send status failed"),
-            display("Send status failed"),
+            display("Failed to send status to peer {:?}", peer),
         }
 
         Timeout(details: String) {
@@ -110,49 +128,41 @@ error_chain! {
             display("packet {:?} throttled: {:?}", msg_name, response),
         }
 
-        UnableToProduceProof {
-            description("Unable to produce proof"),
-            display("Unable to produce proof"),
-        }
-
-        UnableToProduceTxInfo(details: String) {
+        UnableToProduceTxInfo{ reason: String } {
             description("Unable to produce tx info"),
-            display("Unable to produce tx info: {:?}", details),
+            display("Unable to produce tx info: {:?}", reason),
         }
 
-        UnexpectedMessage {
+        UnexpectedMessage{ expected: Vec<MsgId>, received: MsgId } {
             description("Unexpected message"),
-            display("Unexpected message"),
+            display("Unexpected message id={:?}, expected one of {:?}", received, expected),
         }
 
-        UnexpectedPeerType {
+        UnexpectedPeerType{ node_type: NodeType } {
             description("Unexpected peer type"),
-            display("Unexpected peer type"),
-        }
-
-        UnexpectedRequestId {
-            description("Unexpected request id"),
-            display("Unexpected request id"),
+            display("Unexpected peer type: {:?}", node_type),
         }
 
-        UnexpectedResponse {
+        UnexpectedResponse{ expected: Option<RequestId>, received: RequestId } {
             description("Unexpected response"),
-            display("Unexpected response"),
+            display("Unexpected response id; expected = {:?}, received = {:?}", expected, received),
         }
 
-        UnknownMessage {
+        UnknownMessage{ id: MsgId } {
             description("Unknown message"),
-            display("Unknown message"),
+            display("Unknown message: {:?}", id),
         }
 
-        UnknownPeer {
-            description("Unknown peer"),
-            display("Unknown peer"),
+        WitnessUnavailable{ epoch: u64 } {
+            description("Witness unavailable"),
+            display("Witness for epoch {} is not available", epoch),
         }
     }
 }
 
-pub fn handle(io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, e: Error) {
+pub fn handle(
+    io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, e: &Error,
+) {
     warn!(
         "Error while handling message, peer={}, msg_id={:?}, error={}",
         peer,
@@ -166,9 +176,14 @@ pub fn handle(io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, e: Error) {
 
     // NOTE: do not use wildcard; this way, the compiler
     // will help covering all the cases.
-    match e.0 {
+    match &e.0 {
+        // for wrapped errors, handle based on the inner error
+        ErrorKind::ClonableErrorWrapper(e) => {
+            handle(io, peer, msg_id, &*e.0.lock())
+        }
+
         ErrorKind::Filter(_)
-        | ErrorKind::InternalError
+        | ErrorKind::InternalError(_)
 
         // NOTE: we should be tolerant of non-critical errors,
         // e.g. do not disconnect on requesting non-existing epoch
@@ -176,44 +191,44 @@ pub fn handle(io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, e: Error) {
 
         // NOTE: in order to let other protocols run,
         // we should not disconnect on protocol failure
-        | ErrorKind::SendStatusFailed
+        | ErrorKind::SendStatusFailed{..}
 
         | ErrorKind::Timeout(_)
 
-        // NOTE: if we do not have a confirmed (non-blamed) block
-        // with the info needed to produce a state root proof, we
-        // should not disconnect the peer
-        | ErrorKind::UnableToProduceProof
-
         // if the tx requested has been removed locally,
         // we should not disconnect the peer
-        | ErrorKind::UnableToProduceTxInfo(_)
+        | ErrorKind::UnableToProduceTxInfo{..}
+
+        // if the witness is not available, it is probably
+        // due to the local witness sync process
+        | ErrorKind::WitnessUnavailable{..}
 
         // NOTE: to help with backward-compatibility, we
         // should not disconnect on `UnknownMessage`
-        | ErrorKind::UnknownMessage => disconnect = false,
+        | ErrorKind::UnknownMessage{..} => disconnect = false,
 
 
-        ErrorKind::GenesisMismatch
+        ErrorKind::GenesisMismatch{..}
         | ErrorKind::ChainIdMismatch{..}
-        | ErrorKind::UnexpectedMessage
-        | ErrorKind::UnexpectedPeerType
-        | ErrorKind::UnknownPeer => op = Some(UpdateNodeOperation::Failure),
+        | ErrorKind::UnexpectedMessage{..}
+        | ErrorKind::UnexpectedPeerType{..} => op = Some(UpdateNodeOperation::Failure),
 
-        ErrorKind::UnexpectedRequestId | ErrorKind::UnexpectedResponse => {
+        ErrorKind::UnexpectedResponse{..} => {
             op = Some(UpdateNodeOperation::Demotion)
         }
 
-        ErrorKind::InvalidBloom
-        | ErrorKind::InvalidLedgerProof
+        ErrorKind::InvalidBloom{..}
+        | ErrorKind::InvalidLedgerProofSize{..}
         | ErrorKind::InvalidMessageFormat
-        | ErrorKind::InvalidReceipts
-        | ErrorKind::InvalidStateProof(_)
-        | ErrorKind::InvalidStateRoot
-        | ErrorKind::InvalidStorageRootProof(_)
-        | ErrorKind::InvalidTxInfo
-        | ErrorKind::InvalidTxRoot
-        | ErrorKind::InvalidTxSignature
+        | ErrorKind::InvalidPreviousStateRoot{..}
+        | ErrorKind::InvalidReceipts{..}
+        | ErrorKind::InvalidStateProof{..}
+        | ErrorKind::InvalidStateRoot{..}
+        | ErrorKind::InvalidStorageRootProof{..}
+        | ErrorKind::InvalidTxInfo{..}
+        | ErrorKind::InvalidTxRoot{..}
+        | ErrorKind::InvalidTxSignature{..}
+        | ErrorKind::InvalidWitnessRoot{..}
         | ErrorKind::AlreadyThrottled(_)
         | ErrorKind::Decoder(_) => op = Some(UpdateNodeOperation::Remove),
 
@@ -274,3 +289,16 @@ pub fn handle(io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, e: Error) {
         io.disconnect_peer(peer, op, reason.as_str());
     }
 }
+
+#[derive(Clone, Debug)]
+pub struct ClonableError(Arc<Mutex<Error>>);
+
+impl Into<Error> for ClonableError {
+    fn into(self) -> Error { ErrorKind::ClonableErrorWrapper(self).into() }
+}
+
+impl From<Error> for ClonableError {
+    fn from(e: Error) -> ClonableError {
+        ClonableError(Arc::new(Mutex::new(e)))
+    }
+}
diff --git a/core/src/light_protocol/handler/mod.rs b/core/src/light_protocol/handler/mod.rs
index be248b50d9..d641b7f61d 100644
--- a/core/src/light_protocol/handler/mod.rs
+++ b/core/src/light_protocol/handler/mod.rs
@@ -8,6 +8,7 @@ use crate::{
     consensus::SharedConsensusGraph,
     light_protocol::{
         common::{validate_chain_id, FullPeerState, Peers},
+        error::*,
         handle_error,
         message::{
             msgid, BlockHashes as GetBlockHashesResponse,
@@ -21,8 +22,8 @@ use crate::{
             TxInfos as GetTxInfosResponse, Txs as GetTxsResponse,
             WitnessInfo as GetWitnessInfoResponse,
         },
-        Error, ErrorKind, LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT,
-        LIGHT_PROTOCOL_VERSION, LIGHT_PROTO_V1,
+        LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT, LIGHT_PROTOCOL_VERSION,
+        LIGHT_PROTO_V1,
     },
     message::{decode_msg, decode_rlp_and_check_deprecation, Message, MsgId},
     network::{NetworkContext, NetworkProtocolHandler},
@@ -211,28 +212,28 @@ impl Handler {
     #[inline]
     fn get_existing_peer_state(
         &self, peer: &NodeId,
-    ) -> Result<Arc<RwLock<FullPeerState>>, Error> {
+    ) -> Result<Arc<RwLock<FullPeerState>>> {
         match self.peers.get(&peer) {
             Some(state) => Ok(state),
             None => {
                 // NOTE: this should not happen as we register
                 // all peers in `on_peer_connected`
-                error!("Received message from unknown peer={:?}", peer);
-                Err(ErrorKind::InternalError.into())
+                bail!(ErrorKind::InternalError(format!(
+                    "Received message from unknown peer={:?}",
+                    peer
+                )));
             }
         }
     }
 
     #[allow(unused)]
     #[inline]
-    fn peer_version(&self, peer: &NodeId) -> Result<ProtocolVersion, Error> {
+    fn peer_version(&self, peer: &NodeId) -> Result<ProtocolVersion> {
         Ok(self.get_existing_peer_state(peer)?.read().protocol_version)
     }
 
     #[inline]
-    fn validate_peer_state(
-        &self, peer: &NodeId, msg_id: MsgId,
-    ) -> Result<(), Error> {
+    fn validate_peer_state(&self, peer: &NodeId, msg_id: MsgId) -> Result<()> {
         let state = self.get_existing_peer_state(&peer)?;
 
         if msg_id != msgid::STATUS_PONG_DEPRECATED
@@ -240,39 +241,43 @@ impl Handler {
             && !state.read().handshake_completed
         {
             warn!("Received msg={:?} from handshaking peer={:?}", msg_id, peer);
-            return Err(ErrorKind::UnexpectedMessage.into());
+            bail!(ErrorKind::UnexpectedMessage {
+                expected: vec![
+                    msgid::STATUS_PING_DEPRECATED,
+                    msgid::STATUS_PING_V2
+                ],
+                received: msg_id,
+            });
         }
 
         Ok(())
     }
 
     #[inline]
-    fn validate_peer_type(&self, node_type: &NodeType) -> Result<(), Error> {
+    fn validate_peer_type(&self, node_type: NodeType) -> Result<()> {
         match node_type {
             NodeType::Archive => Ok(()),
             NodeType::Full => Ok(()),
-            _ => Err(ErrorKind::UnexpectedPeerType.into()),
+            _ => bail!(ErrorKind::UnexpectedPeerType { node_type }),
         }
     }
 
     #[inline]
-    fn validate_genesis_hash(&self, genesis: H256) -> Result<(), Error> {
-        match self.consensus.get_data_manager().true_genesis.hash() {
-            h if h == genesis => Ok(()),
-            h => {
-                debug!(
-                    "Genesis mismatch (ours: {:?}, theirs: {:?})",
-                    h, genesis
-                );
-                Err(ErrorKind::GenesisMismatch.into())
-            }
+    fn validate_genesis_hash(&self, genesis: H256) -> Result<()> {
+        let ours = self.consensus.get_data_manager().true_genesis.hash();
+        let theirs = genesis;
+
+        if ours != theirs {
+            bail!(ErrorKind::GenesisMismatch { ours, theirs });
         }
+
+        Ok(())
     }
 
     #[rustfmt::skip]
     fn dispatch_message(
         &self, io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, rlp: Rlp,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         trace!("Dispatching message: peer={:?}, msg_id={:?}", peer, msg_id);
         self.validate_peer_state(peer, msg_id)?;
         let min_supported_ver = self.minimum_supported_version();
@@ -300,7 +305,7 @@ impl Handler {
             // request was throttled by service provider
             msgid::THROTTLED => self.on_throttled(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
 
-            _ => Err(ErrorKind::UnknownMessage.into()),
+            _ => bail!(ErrorKind::UnknownMessage{id: msg_id}),
         }
     }
 
@@ -356,7 +361,7 @@ impl Handler {
     fn send_status(
         &self, io: &dyn NetworkContext, peer: &NodeId,
         peer_protocol_version: ProtocolVersion,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         let msg: Box<dyn Message>;
 
@@ -389,7 +394,7 @@ impl Handler {
     #[inline]
     pub fn send_raw_tx(
         &self, io: &dyn NetworkContext, peer: &NodeId, raw: Vec<u8>,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         let msg: Box<dyn Message> = Box::new(SendRawTx { raw });
         msg.send(io, peer)?;
         Ok(())
@@ -397,10 +402,10 @@ impl Handler {
 
     fn on_status_v2(
         &self, io: &dyn NetworkContext, peer: &NodeId, status: StatusPongV2,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         info!("on_status peer={:?} status={:?}", peer, status);
 
-        self.validate_peer_type(&status.node_type)?;
+        self.validate_peer_type(status.node_type)?;
         self.validate_genesis_hash(status.genesis_hash)?;
         validate_chain_id(
             &self.consensus.get_config().chain_id,
@@ -424,7 +429,7 @@ impl Handler {
     fn on_status_deprecated(
         &self, io: &dyn NetworkContext, peer: &NodeId,
         status: StatusPongDeprecatedV1,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         info!("on_status peer={:?} status={:?}", peer, status);
 
@@ -444,7 +449,7 @@ impl Handler {
     fn on_block_hashes(
         &self, io: &dyn NetworkContext, _peer: &NodeId,
         resp: GetBlockHashesResponse,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         debug!("on_block_hashes resp={:?}", resp);
 
@@ -460,7 +465,7 @@ impl Handler {
     fn on_block_headers(
         &self, io: &dyn NetworkContext, peer: &NodeId,
         resp: GetBlockHeadersResponse,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         debug!("on_block_headers resp={:?}", resp);
 
@@ -477,7 +482,7 @@ impl Handler {
     fn on_block_txs(
         &self, io: &dyn NetworkContext, peer: &NodeId,
         resp: GetBlockTxsResponse,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         debug!("on_block_txs resp={:?}", resp);
 
@@ -493,7 +498,7 @@ impl Handler {
 
     fn on_blooms(
         &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetBloomsResponse,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         debug!("on_blooms resp={:?}", resp);
 
         self.blooms
@@ -505,7 +510,7 @@ impl Handler {
 
     fn on_new_block_hashes(
         &self, io: &dyn NetworkContext, peer: &NodeId, msg: NewBlockHashes,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         debug!("on_new_block_hashes msg={:?}", msg);
 
         if self.catch_up_mode() {
@@ -530,7 +535,7 @@ impl Handler {
     fn on_receipts(
         &self, io: &dyn NetworkContext, peer: &NodeId,
         resp: GetReceiptsResponse,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         debug!("on_receipts resp={:?}", resp);
 
@@ -547,7 +552,7 @@ impl Handler {
     fn on_state_entries(
         &self, io: &dyn NetworkContext, peer: &NodeId,
         resp: GetStateEntriesResponse,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         debug!("on_state_entries resp={:?}", resp);
 
@@ -564,7 +569,7 @@ impl Handler {
     fn on_state_roots(
         &self, io: &dyn NetworkContext, peer: &NodeId,
         resp: GetStateRootsResponse,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         debug!("on_state_roots resp={:?}", resp);
 
@@ -581,7 +586,7 @@ impl Handler {
     fn on_storage_roots(
         &self, io: &dyn NetworkContext, peer: &NodeId,
         resp: GetStorageRootsResponse,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         debug!("on_storage_roots resp={:?}", resp);
 
@@ -597,7 +602,7 @@ impl Handler {
 
     fn on_txs(
         &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetTxsResponse,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         debug!("on_txs resp={:?}", resp);
 
         self.txs
@@ -609,7 +614,7 @@ impl Handler {
 
     fn on_tx_infos(
         &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetTxInfosResponse,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         debug!("on_tx_infos resp={:?}", resp);
 
         self.tx_infos
@@ -622,7 +627,7 @@ impl Handler {
     fn on_witness_info(
         &self, io: &dyn NetworkContext, peer: &NodeId,
         resp: GetWitnessInfoResponse,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         debug!("on_witness_info resp={:?}", resp);
 
@@ -678,7 +683,7 @@ impl Handler {
 
     fn on_throttled(
         &self, _io: &dyn NetworkContext, peer: &NodeId, resp: Throttled,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         debug!("on_throttled resp={:?}", resp);
 
         let peer = self.get_existing_peer_state(peer)?;
@@ -729,7 +734,7 @@ impl NetworkProtocolHandler for Handler {
                     io,
                     peer,
                     msgid::INVALID,
-                    ErrorKind::InvalidMessageFormat.into(),
+                    &ErrorKind::InvalidMessageFormat.into(),
                 )
             }
         };
@@ -737,7 +742,7 @@ impl NetworkProtocolHandler for Handler {
         debug!("on_message: peer={:?}, msgid={:?}", peer, msg_id);
 
         if let Err(e) = self.dispatch_message(io, peer, msg_id.into(), rlp) {
-            handle_error(io, peer, msg_id.into(), e);
+            handle_error(io, peer, msg_id.into(), &e);
         }
     }
 
@@ -770,7 +775,7 @@ impl NetworkProtocolHandler for Handler {
                     io,
                     peer,
                     msgid::INVALID,
-                    ErrorKind::SendStatusFailed.into(),
+                    &ErrorKind::SendStatusFailed { peer: *peer }.into(),
                 );
             }
         }
diff --git a/core/src/light_protocol/handler/sync/block_txs.rs b/core/src/light_protocol/handler/sync/block_txs.rs
index c746b878b2..b0b7b2ef8b 100644
--- a/core/src/light_protocol/handler/sync/block_txs.rs
+++ b/core/src/light_protocol/handler/sync/block_txs.rs
@@ -12,8 +12,8 @@ use crate::{
     consensus::SharedConsensusGraph,
     light_protocol::{
         common::{FullPeerState, LedgerInfo, Peers},
+        error::*,
         message::{msgid, BlockTxsWithHash, GetBlockTxs},
-        Error, ErrorKind,
     },
     message::{Message, RequestId},
     network::NetworkContext,
@@ -25,6 +25,7 @@ use crate::{
     UniqueId,
 };
 use cfx_types::H256;
+use futures::future::FutureExt;
 use lru_time_cache::LruCache;
 use network::node_table::NodeId;
 use parking_lot::RwLock;
@@ -41,6 +42,8 @@ struct Statistics {
 // prioritize earlier requests
 type MissingBlockTxs = TimeOrdered<H256>;
 
+type PendingBlockTxs = PendingItem<Vec<SignedTransaction>, ClonableError>;
+
 pub struct BlockTxs {
     // helper API for retrieving ledger information
     ledger: LedgerInfo,
@@ -55,7 +58,7 @@ pub struct BlockTxs {
     txs: Arc<Txs>,
 
     // block txs received from full node
-    verified: Arc<RwLock<LruCache<H256, PendingItem<Vec<SignedTransaction>>>>>,
+    verified: Arc<RwLock<LruCache<H256, PendingBlockTxs>>>,
 }
 
 impl BlockTxs {
@@ -92,20 +95,28 @@ impl BlockTxs {
     #[inline]
     pub fn request(
         &self, hash: H256,
-    ) -> impl Future<Output = Vec<SignedTransaction>> {
-        if !self.verified.read().contains_key(&hash) {
+    ) -> impl Future<Output = Result<Vec<SignedTransaction>>> {
+        let mut verified = self.verified.write();
+
+        if !verified.contains_key(&hash) {
             let missing = MissingBlockTxs::new(hash);
             self.sync_manager.insert_waiting(std::iter::once(missing));
         }
 
+        verified
+            .entry(hash)
+            .or_insert(PendingItem::pending())
+            .clear_error();
+
         FutureItem::new(hash, self.verified.clone())
+            .map(|res| res.map_err(|e| e.into()))
     }
 
     #[inline]
     pub fn receive(
         &self, peer: &NodeId, id: RequestId,
         block_txs: impl Iterator<Item = BlockTxsWithHash>,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         for BlockTxsWithHash { hash, block_txs } in block_txs {
             debug!("Validating block_txs {:?} with hash {}", block_txs, hash);
@@ -122,14 +133,21 @@ impl BlockTxs {
     #[inline]
     pub fn validate_and_store(
         &self, hash: H256, block_txs: Vec<SignedTransaction>,
-    ) -> Result<(), Error> {
-        // validate and store each transaction
-        for tx in &block_txs {
-            self.txs.validate_and_store(tx.clone())?;
-        }
-
+    ) -> Result<()> {
         // validate block txs
-        self.validate_block_txs(hash, &block_txs)?;
+        if let Err(e) = self.validate_block_txs(hash, &block_txs) {
+            // the error is forwarded to both the RPC caller(s) and the
+            // sync handler, so it needs to be clonable
+            let e = ClonableError::from(e);
+
+            self.verified
+                .write()
+                .entry(hash)
+                .or_insert(PendingItem::pending())
+                .set_error(e.clone());
+
+            bail!(e);
+        }
 
         // store block bodies by block hash
         self.verified
@@ -156,7 +174,7 @@ impl BlockTxs {
     #[inline]
     fn send_request(
         &self, io: &dyn NetworkContext, peer: &NodeId, hashes: Vec<H256>,
-    ) -> Result<Option<RequestId>, Error> {
+    ) -> Result<Option<RequestId>> {
         debug!("send_request peer={:?} hashes={:?}", peer, hashes);
 
         if hashes.is_empty() {
@@ -185,20 +203,22 @@ impl BlockTxs {
     #[inline]
     pub fn validate_block_txs(
         &self, hash: H256, txs: &Vec<SignedTransaction>,
-    ) -> Result<(), Error> {
-        // NOTE: tx signatures have been validated previously
-
-        let local = *self.ledger.header(hash)?.transactions_root();
+    ) -> Result<()> {
+        // validate each transaction first
+        for tx in txs {
+            self.txs.validate_tx(&tx)?;
+        }
 
+        let expected = *self.ledger.header(hash)?.transactions_root();
         let txs: Vec<_> = txs.iter().map(|tx| Arc::new(tx.clone())).collect();
         let received = compute_transaction_root(&txs);
 
-        if received != local {
-            warn!(
-                "Tx root validation failed, received={:?}, local={:?}",
-                received, local
-            );
-            return Err(ErrorKind::InvalidTxRoot.into());
+        if received != expected {
+            bail!(ErrorKind::InvalidTxRoot {
+                hash,
+                expected,
+                received,
+            });
         }
 
         Ok(())
diff --git a/core/src/light_protocol/handler/sync/blooms.rs b/core/src/light_protocol/handler/sync/blooms.rs
index eb57cf1ac3..43a03b931c 100644
--- a/core/src/light_protocol/handler/sync/blooms.rs
+++ b/core/src/light_protocol/handler/sync/blooms.rs
@@ -13,8 +13,8 @@ use crate::{
     hash::keccak,
     light_protocol::{
         common::{FullPeerState, Peers},
+        error::*,
         message::{msgid, BloomWithEpoch, GetBlooms},
-        Error, ErrorKind,
     },
     message::{Message, RequestId},
     network::NetworkContext,
@@ -29,6 +29,7 @@ use super::{
     common::{FutureItem, KeyOrdered, PendingItem, SyncManager},
     witnesses::Witnesses,
 };
+use futures::future::FutureExt;
 use network::node_table::NodeId;
 
 #[derive(Debug)]
@@ -41,6 +42,8 @@ struct Statistics {
 // prioritize higher epochs
 type MissingBloom = KeyOrdered<u64>;
 
+type PendingBloom = PendingItem<Bloom, ClonableError>;
+
 pub struct Blooms {
     // series of unique request ids
     request_id_allocator: Arc<UniqueId>,
@@ -49,7 +52,7 @@ pub struct Blooms {
     sync_manager: SyncManager<u64, MissingBloom>,
 
     // bloom filters received from full node
-    verified: Arc<RwLock<LruCache<u64, PendingItem<Bloom>>>>,
+    verified: Arc<RwLock<LruCache<u64, PendingBloom>>>,
 
     // witness sync manager
     witnesses: Arc<Witnesses>,
@@ -84,26 +87,32 @@ impl Blooms {
     }
 
     #[inline]
-    pub fn request(&self, epoch: u64) -> impl Future<Output = Bloom> {
+    pub fn request(&self, epoch: u64) -> impl Future<Output = Result<Bloom>> {
+        let mut verified = self.verified.write();
+
         if epoch == 0 {
-            self.verified
-                .write()
-                .insert(0, PendingItem::ready(Bloom::zero()));
+            verified.insert(0, PendingItem::ready(Bloom::zero()));
         }
 
-        if !self.verified.read().contains_key(&epoch) {
+        if !verified.contains_key(&epoch) {
             let missing = MissingBloom::new(epoch);
             self.sync_manager.insert_waiting(std::iter::once(missing));
         }
 
+        verified
+            .entry(epoch)
+            .or_insert(PendingItem::pending())
+            .clear_error();
+
         FutureItem::new(epoch, self.verified.clone())
+            .map(|res| res.map_err(|e| e.into()))
     }
 
     #[inline]
     pub fn receive(
         &self, peer: &NodeId, id: RequestId,
         blooms: impl Iterator<Item = BloomWithEpoch>,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         for BloomWithEpoch { epoch, bloom } in blooms {
             debug!("Validating bloom {:?} with epoch {}", bloom, epoch);
@@ -118,11 +127,21 @@ impl Blooms {
     }
 
     #[inline]
-    pub fn validate_and_store(
-        &self, epoch: u64, bloom: Bloom,
-    ) -> Result<(), Error> {
+    pub fn validate_and_store(&self, epoch: u64, bloom: Bloom) -> Result<()> {
         // validate bloom
-        self.validate_bloom(epoch, bloom)?;
+        if let Err(e) = self.validate_bloom(epoch, bloom) {
+            // the error is forwarded to both the RPC caller(s) and the
+            // sync handler, so it needs to be clonable
+            let e = ClonableError::from(e);
+
+            self.verified
+                .write()
+                .entry(epoch)
+                .or_insert(PendingItem::pending())
+                .set_error(e.clone());
+
+            bail!(e);
+        }
 
         // store bloom by epoch
         self.verified
@@ -149,7 +168,7 @@ impl Blooms {
     #[inline]
     fn send_request(
         &self, io: &dyn NetworkContext, peer: &NodeId, epochs: Vec<u64>,
-    ) -> Result<Option<RequestId>, Error> {
+    ) -> Result<Option<RequestId>> {
         debug!("send_request peer={:?} epochs={:?}", peer, epochs);
 
         if epochs.is_empty() {
@@ -175,29 +194,23 @@ impl Blooms {
     }
 
     #[inline]
-    fn validate_bloom(&self, epoch: u64, bloom: Bloom) -> Result<(), Error> {
+    fn validate_bloom(&self, epoch: u64, bloom: Bloom) -> Result<()> {
         // calculate received bloom hash
         let received = keccak(bloom);
 
         // retrieve local bloom hash
-        let local = match self.witnesses.root_hashes_of(epoch) {
+        let expected = match self.witnesses.root_hashes_of(epoch) {
             Some((_, _, bloom_hash)) => bloom_hash,
-            None => {
-                warn!(
-                    "Bloom hash not found, epoch={}, bloom={:?}",
-                    epoch, bloom
-                );
-                return Err(ErrorKind::InternalError.into());
-            }
+            None => bail!(ErrorKind::WitnessUnavailable { epoch }),
         };
 
         // check
-        if received != local {
-            warn!(
-                "Bloom validation failed, received={:?}, local={:?}",
-                received, local
-            );
-            return Err(ErrorKind::InvalidBloom.into());
+        if received != expected {
+            bail!(ErrorKind::InvalidBloom {
+                epoch,
+                expected,
+                received,
+            });
         }
 
         Ok(())
diff --git a/core/src/light_protocol/handler/sync/common/future_item.rs b/core/src/light_protocol/handler/sync/common/future_item.rs
index 97e4a32321..a1daef824f 100644
--- a/core/src/light_protocol/handler/sync/common/future_item.rs
+++ b/core/src/light_protocol/handler/sync/common/future_item.rs
@@ -14,20 +14,25 @@ use std::{
     task::{Context, Poll, Waker},
 };
 
-pub enum PendingItem<T> {
-    Ready(T),
+pub enum PendingItem<Item, Err> {
+    Ready(Item),
     Pending(Vec<Waker>),
+    Error(Err),
 }
 
-impl<T> PendingItem<T> {
-    pub fn pending() -> PendingItem<T> { Self::Pending(vec![]) }
+impl<Item, Err> PendingItem<Item, Err> {
+    pub fn pending() -> Self { Self::Pending(vec![]) }
 
-    pub fn ready(item: T) -> PendingItem<T> { Self::Ready(item) }
-}
+    pub fn ready(item: Item) -> Self { Self::Ready(item) }
+
+    pub fn clear_error(&mut self) {
+        if let Self::Error(_) = self {
+            *self = Self::pending();
+        }
+    }
 
-impl<T> PendingItem<T> {
     // NOTE: `set` has to be called in a thread-safe environment
-    pub fn set(&mut self, item: T) {
+    pub fn set(&mut self, item: Item) {
         match self {
             Self::Ready(_old) => {
                 // FIXME: we might want to check if old == item and raise an
@@ -46,43 +51,77 @@ impl<T> PendingItem<T> {
                     w.wake();
                 }
             }
+            Self::Error(_) => {
+                // if we managed to verify the item, we do not care about the
+                // error anymore. wakers must have been notified when `self` was
+                // set to `Error`, so they either received an error or haven't
+                // polled yet.
+                *self = Self::Ready(item);
+            }
+        }
+    }
+
+    // NOTE: `set_error` has to be called in a thread-safe environment
+    pub fn set_error(&mut self, err: Err) {
+        match self {
+            Self::Ready(_) => {
+                // if we already have a verified value, we do not care about
+                // errors anymore
+            }
+            Self::Pending(ws) => {
+                // move `ws` out
+                let ws = std::mem::replace(ws, Vec::<Waker>::new());
+
+                // transform `self`
+                *self = Self::Error(err);
+
+                // notify waiting futures
+                for w in ws {
+                    w.wake();
+                }
+            }
+            Self::Error(_) => {
+                *self = Self::Error(err);
+            }
         }
     }
 }
 
-impl<T: Clone> PendingItem<T> {
+impl<Item: Clone, Err: Clone> PendingItem<Item, Err> {
     // NOTE: `poll` has to be called in a thread-safe environment
-    fn poll(&mut self, ctx: &mut Context) -> Poll<T> {
+    fn poll(&mut self, ctx: &mut Context) -> Poll<Result<Item, Err>> {
         match self {
-            Self::Ready(item) => Poll::Ready(item.clone()),
+            Self::Ready(item) => Poll::Ready(Ok(item.clone())),
             Self::Pending(ws) => {
                 // FIXME: is it safe to keep old wakers?
                 ws.push(ctx.waker().clone());
                 Poll::Pending
             }
+            Self::Error(e) => Poll::Ready(Err(e.clone())),
         }
     }
 }
 
-pub struct FutureItem<K, V> {
+pub struct FutureItem<K, V, E> {
     key: K,
-    verified: Arc<RwLock<LruCache<K, PendingItem<V>>>>,
+    verified: Arc<RwLock<LruCache<K, PendingItem<V, E>>>>,
 }
 
-impl<K, V> FutureItem<K, V> {
+impl<K, V, E> FutureItem<K, V, E> {
     pub fn new(
-        key: K, verified: Arc<RwLock<LruCache<K, PendingItem<V>>>>,
-    ) -> FutureItem<K, V> {
+        key: K, verified: Arc<RwLock<LruCache<K, PendingItem<V, E>>>>,
+    ) -> FutureItem<K, V, E> {
         FutureItem { key, verified }
     }
 }
 
-impl<K, V> Future for FutureItem<K, V>
+impl<K, V, E> Future for FutureItem<K, V, E>
 where
     K: Clone + Eq + Hash + Ord,
     V: Clone,
+    E: Clone,
 {
-    type Output = V;
+    type Output = Result<V, E>;
 
     fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
         self.verified
@@ -102,13 +141,58 @@ mod tests {
     use std::{sync::Arc, time::Duration};
     use tokio::{runtime::Runtime, time::delay_for};
 
+    #[test]
+    fn test_set() {
+        const KEY: u64 = 1;
+        const VALUE: u64 = 2;
+        const ERROR: u64 = 3;
+
+        let cache = LruCache::<u64, PendingItem<u64, u64>>::with_capacity(1);
+        let verified = Arc::new(RwLock::new(cache));
+
+        let mut runtime = Runtime::new().expect("Unable to create a runtime");
+
+        // set error
+        verified
+            .write()
+            .entry(KEY)
+            .or_insert(PendingItem::pending())
+            .set_error(ERROR);
+
+        // caller should get the error
+        let res = runtime.block_on(FutureItem::new(KEY, verified.clone()));
+        assert_eq!(res, Err(ERROR));
+
+        // set value
+        verified
+            .write()
+            .entry(KEY)
+            .or_insert(PendingItem::pending())
+            .set(VALUE);
+
+        // caller should get the value
+        let res = runtime.block_on(FutureItem::new(KEY, verified.clone()));
+        assert_eq!(res, Ok(VALUE));
+
+        // set error again
+        verified
+            .write()
+            .entry(KEY)
+            .or_insert(PendingItem::pending())
+            .set_error(ERROR);
+
+        // result is not overwritten by error
+        let res = runtime.block_on(FutureItem::new(KEY, verified.clone()));
+        assert_eq!(res, Ok(VALUE));
+    }
+
     #[test]
     fn test_concurrent_access() {
         const KEY: u64 = 1;
         const VALUE: u64 = 2;
         const DELAY: u64 = 10;
 
-        let cache = LruCache::<u64, PendingItem<u64>>::with_capacity(1);
+        let cache = LruCache::<u64, PendingItem<u64, ()>>::with_capacity(1);
         let verified = Arc::new(RwLock::new(cache));
 
         // we will simulate 3 concurrent accesses to the same item
@@ -141,8 +225,8 @@ mod tests {
         let mut runtime = Runtime::new().expect("Unable to create a runtime");
         let (res1, (res2, res3), _) = runtime.block_on(join3(fut1, fut2, fut3));
 
-        assert_eq!(res1, VALUE);
-        assert_eq!(res2, VALUE);
-        assert_eq!(res3, VALUE);
+        assert_eq!(res1, Ok(VALUE));
+        assert_eq!(res2, Ok(VALUE));
+        assert_eq!(res3, Ok(VALUE));
     }
 }
diff --git a/core/src/light_protocol/handler/sync/common/ledger_proof.rs b/core/src/light_protocol/handler/sync/common/ledger_proof.rs
index c3ba3b2184..2181653e0e 100644
--- a/core/src/light_protocol/handler/sync/common/ledger_proof.rs
+++ b/core/src/light_protocol/handler/sync/common/ledger_proof.rs
@@ -31,7 +31,7 @@ impl Index<usize> for LedgerProof {
 impl LedgerProof {
     pub fn validate(&self, witness: &BlockHeader) -> Result<(), Error> {
         // extract proof hashes and corresponding local root hash
-        let (hashes, local_root_hash) = match self {
+        let (hashes, expected) = match self {
             LedgerProof::StateRoot(hashes) => {
                 (hashes, *witness.deferred_state_root())
             }
@@ -44,19 +44,19 @@ impl LedgerProof {
         };
 
         // validate the number of hashes provided against local witness blame
+        let hash = witness.hash();
         let blame = witness.blame() as u64;
 
         if hashes.len() as u64 != blame + 1 {
-            info!(
-                "Invalid number of hashes provided: expected={}, received={}",
-                blame + 1,
-                hashes.len()
-            );
-            return Err(ErrorKind::InvalidLedgerProof.into());
+            bail!(ErrorKind::InvalidLedgerProofSize {
+                hash,
+                expected: blame + 1,
+                received: hashes.len() as u64
+            });
         }
 
         // compute witness deferred root hash from the hashes provided
-        let received_root_hash = match blame {
+        let received = match blame {
             0 => hashes[0],
             _ => {
                 let hashes = hashes.clone();
@@ -65,12 +65,12 @@ impl LedgerProof {
         };
 
         // validate against local witness deferred state root hash
-        if received_root_hash != local_root_hash {
-            info!(
-                "Witness root hash mismatch: local={:?}, received={:?}",
-                local_root_hash, received_root_hash
-            );
-            return Err(ErrorKind::InvalidLedgerProof.into());
+        if received != expected {
+            bail!(ErrorKind::InvalidWitnessRoot {
+                hash,
+                expected,
+                received,
+            });
         }
 
         Ok(())
diff --git a/core/src/light_protocol/handler/sync/common/sync_manager.rs b/core/src/light_protocol/handler/sync/common/sync_manager.rs
index e545835b07..f8eb9a4ca9 100644
--- a/core/src/light_protocol/handler/sync/common/sync_manager.rs
+++ b/core/src/light_protocol/handler/sync/common/sync_manager.rs
@@ -93,8 +93,10 @@ where
         match self.peers.get(peer) {
             Some(state) => Ok(state),
             None => {
-                error!("Received message from unknown peer={:?}", peer);
-                Err(ErrorKind::InternalError.into())
+                bail!(ErrorKind::InternalError(format!(
+                    "Received message from unknown peer={:?}",
+                    peer
+                )));
             }
         }
     }
@@ -123,7 +125,10 @@ where
             ThrottleResult::Success => Ok(id),
             ThrottleResult::Throttled(_) => Ok(id),
             ThrottleResult::AlreadyThrottled => {
-                Err(ErrorKind::UnexpectedResponse.into())
+                bail!(ErrorKind::UnexpectedResponse {
+                    expected: id,
+                    received: request_id,
+                });
             }
         }
     }
diff --git a/core/src/light_protocol/handler/sync/receipts.rs b/core/src/light_protocol/handler/sync/receipts.rs
index 2631ac10b8..0d1ccf0968 100644
--- a/core/src/light_protocol/handler/sync/receipts.rs
+++ b/core/src/light_protocol/handler/sync/receipts.rs
@@ -11,8 +11,8 @@ use std::{future::Future, sync::Arc};
 use crate::{
     light_protocol::{
         common::{FullPeerState, Peers},
+        error::*,
         message::{msgid, GetReceipts, ReceiptsWithEpoch},
-        Error, ErrorKind,
     },
     message::{Message, RequestId},
     network::NetworkContext,
@@ -29,6 +29,7 @@ use super::{
     witnesses::Witnesses,
 };
 use crate::verification::compute_receipts_root;
+use futures::future::FutureExt;
 use network::node_table::NodeId;
 
 #[derive(Debug)]
@@ -41,6 +42,8 @@ struct Statistics {
 // prioritize higher epochs
 type MissingReceipts = KeyOrdered<u64>;
 
+type PendingReceipts = PendingItem<Vec<BlockReceipts>, ClonableError>;
+
 pub struct Receipts {
     // series of unique request ids
     request_id_allocator: Arc<UniqueId>,
@@ -49,7 +52,7 @@ pub struct Receipts {
     sync_manager: SyncManager<u64, MissingReceipts>,
 
     // epoch receipts received from full node
-    verified: Arc<RwLock<LruCache<u64, PendingItem<Vec<BlockReceipts>>>>>,
+    verified: Arc<RwLock<LruCache<u64, PendingReceipts>>>,
 
     // witness sync manager
     witnesses: Arc<Witnesses>,
@@ -86,24 +89,32 @@ impl Receipts {
     #[inline]
     pub fn request(
         &self, epoch: u64,
-    ) -> impl Future<Output = Vec<BlockReceipts>> {
+    ) -> impl Future<Output = Result<Vec<BlockReceipts>>> {
+        let mut verified = self.verified.write();
+
         if epoch == 0 {
-            self.verified.write().insert(0, PendingItem::ready(vec![]));
+            verified.insert(0, PendingItem::ready(vec![]));
         }
 
-        if !self.verified.read().contains_key(&epoch) {
+        if !verified.contains_key(&epoch) {
             let missing = MissingReceipts::new(epoch);
             self.sync_manager.insert_waiting(std::iter::once(missing));
         }
 
+        verified
+            .entry(epoch)
+            .or_insert(PendingItem::pending())
+            .clear_error();
+
         FutureItem::new(epoch, self.verified.clone())
+            .map(|res| res.map_err(|e| e.into()))
     }
 
     #[inline]
     pub fn receive(
         &self, peer: &NodeId, id: RequestId,
         receipts: impl Iterator<Item = ReceiptsWithEpoch>,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         for ReceiptsWithEpoch {
             epoch,
@@ -127,9 +138,21 @@ impl Receipts {
     #[inline]
     pub fn validate_and_store(
         &self, epoch: u64, receipts: Vec<BlockReceipts>,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         // validate receipts
-        self.validate_receipts(epoch, &receipts)?;
+        if let Err(e) = self.validate_receipts(epoch, &receipts) {
+            // the error is forwarded to both the RPC caller(s) and the
+            // sync handler, so it needs to be clonable
+            let e = ClonableError::from(e);
+
+            self.verified
+                .write()
+                .entry(epoch)
+                .or_insert(PendingItem::pending())
+                .set_error(e.clone());
+
+            bail!(e);
+        }
 
         // store receipts by epoch
         self.verified
@@ -156,7 +179,7 @@ impl Receipts {
     #[inline]
     fn send_request(
         &self, io: &dyn NetworkContext, peer: &NodeId, epochs: Vec<u64>,
-    ) -> Result<Option<RequestId>, Error> {
+    ) -> Result<Option<RequestId>> {
         debug!("send_request peer={:?} epochs={:?}", peer, epochs);
 
         if epochs.is_empty() {
@@ -185,7 +208,7 @@ impl Receipts {
     #[inline]
     fn validate_receipts(
         &self, epoch: u64, receipts: &Vec<BlockReceipts>,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         // calculate received receipts root
         // convert Vec<Vec<Receipt>> -> Vec<Arc<Vec<Receipt>>>
         // for API compatibility
@@ -198,24 +221,18 @@ impl Receipts {
         let received = compute_receipts_root(&rs);
 
         // retrieve local receipts root
-        let local = match self.witnesses.root_hashes_of(epoch) {
+        let expected = match self.witnesses.root_hashes_of(epoch) {
             Some((_, receipts_root, _)) => receipts_root,
-            None => {
-                warn!(
-                    "Receipt root not found, epoch={}, receipts={:?}",
-                    epoch, receipts
-                );
-                return Err(ErrorKind::InternalError.into());
-            }
+            None => bail!(ErrorKind::WitnessUnavailable { epoch }),
         };
 
         // check
-        if received != local {
-            warn!(
-                "Receipt validation failed, received={:?}, local={:?}",
-                received, local
-            );
-            return Err(ErrorKind::InvalidBloom.into());
+        if received != expected {
+            bail!(ErrorKind::InvalidReceipts {
+                epoch,
+                expected,
+                received,
+            });
         }
 
         Ok(())
diff --git a/core/src/light_protocol/handler/sync/state_entries.rs b/core/src/light_protocol/handler/sync/state_entries.rs
index 9951c95470..ac00f55dca 100644
--- a/core/src/light_protocol/handler/sync/state_entries.rs
+++ b/core/src/light_protocol/handler/sync/state_entries.rs
@@ -30,6 +30,7 @@ use super::{
     common::{FutureItem, PendingItem, SyncManager, TimeOrdered},
     state_roots::StateRoots,
 };
+use futures::future::FutureExt;
 use network::node_table::NodeId;
 use primitives::StorageKey;
 
@@ -44,6 +45,8 @@ struct Statistics {
 
 type MissingStateEntry = TimeOrdered<StateKey>;
 
+type PendingStateEntry = PendingItem<StateEntry, ClonableError>;
+
 pub struct StateEntries {
     // series of unique request ids
     request_id_allocator: Arc<UniqueId>,
@@ -55,7 +58,7 @@ pub struct StateEntries {
     sync_manager: SyncManager<StateKey, MissingStateEntry>,
 
     // state entries received from full node
-    verified: Arc<RwLock<LruCache<StateKey, PendingItem<StateEntry>>>>,
+    verified: Arc<RwLock<LruCache<StateKey, PendingStateEntry>>>,
 }
 
 impl StateEntries {
@@ -90,10 +93,11 @@ impl StateEntries {
     #[inline]
     pub fn request_now(
         &self, io: &dyn NetworkContext, epoch: u64, key: Vec<u8>,
-    ) -> impl Future<Output = StateEntry> {
+    ) -> impl Future<Output = Result<StateEntry>> {
+        let mut verified = self.verified.write();
         let key = StateKey { epoch, key };
 
-        if !self.verified.read().contains_key(&key) {
+        if !verified.contains_key(&key) {
             let missing = std::iter::once(MissingStateEntry::new(key.clone()));
 
             self.sync_manager.request_now(missing, |peer, keys| {
@@ -101,7 +105,13 @@ impl StateEntries {
             });
         }
 
+        verified
+            .entry(key.clone())
+            .or_insert(PendingItem::pending())
+            .clear_error();
+
         FutureItem::new(key, self.verified.clone())
+            .map(|res| res.map_err(|e| e.into()))
     }
 
     #[inline]
@@ -111,7 +121,10 @@ impl StateEntries {
     ) -> Result<()>
     {
         for StateEntryWithKey { key, entry, proof } in entries {
-            debug!("Validating state entry {:?} with key {:?}", entry, key);
+            debug!(
+                "Validating state entry {:?} with key {:?} and proof {:?}",
+                entry, key, proof
+            );
 
             match self.sync_manager.check_if_requested(peer, id, &key)? {
                 None => continue,
@@ -127,7 +140,21 @@ impl StateEntries {
         &self, key: StateKey, entry: Option<Vec<u8>>, proof: StateEntryProof,
     ) -> Result<()> {
         // validate state entry
-        self.validate_state_entry(key.epoch, &key.key, &entry, proof)?;
+        if let Err(e) =
+            self.validate_state_entry(key.epoch, &key.key, &entry, proof)
+        {
+            // the error is forwarded to both the RPC caller(s) and the
+            // sync handler, so it needs to be clonable
+            let e = ClonableError::from(e);
+
+            self.verified
+                .write()
+                .entry(key.clone())
+                .or_insert(PendingItem::pending())
+                .set_error(e.clone());
+
+            bail!(e);
+        }
 
         // store state entry by state key
         self.verified
@@ -192,10 +219,11 @@ impl StateEntries {
 
         self.state_roots
             .validate_state_root(epoch, &state_root)
-            .chain_err(|| {
-                ErrorKind::InvalidStateProof(
-                    "Validation of current state root failed",
-                )
+            .chain_err(|| ErrorKind::InvalidStateProof {
+                epoch,
+                key: key.clone(),
+                value: value.clone(),
+                reason: "Validation of current state root failed",
             })?;
 
         // validate previous state root
@@ -203,10 +231,11 @@ impl StateEntries {
 
         self.state_roots
             .validate_prev_snapshot_state_root(epoch, &maybe_prev_root)
-            .chain_err(|| {
-                ErrorKind::InvalidStateProof(
-                    "Validation of previous state root failed",
-                )
+            .chain_err(|| ErrorKind::InvalidStateProof {
+                epoch,
+                key: key.clone(),
+                value: value.clone(),
+                reason: "Validation of previous state root failed",
             })?;
 
         // construct padding
@@ -224,11 +253,12 @@ impl StateEntries {
             state_root,
             maybe_intermediate_padding,
         ) {
-            warn!("Invalid state proof for {:?} under key {:?}", value, key);
-            return Err(ErrorKind::InvalidStateProof(
-                "State proof validation failed",
-            )
-            .into());
+            bail!(ErrorKind::InvalidStateProof {
+                epoch,
+                key: key.clone(),
+                value: value.clone(),
+                reason: "Validation of merkle proof failed",
+            });
         }
 
         Ok(())
diff --git a/core/src/light_protocol/handler/sync/state_roots.rs b/core/src/light_protocol/handler/sync/state_roots.rs
index a46d1fdc7c..0373a5ed5f 100644
--- a/core/src/light_protocol/handler/sync/state_roots.rs
+++ b/core/src/light_protocol/handler/sync/state_roots.rs
@@ -12,8 +12,8 @@ use std::{future::Future, sync::Arc};
 use crate::{
     light_protocol::{
         common::{FullPeerState, Peers},
+        error::*,
         message::{msgid, GetStateRoots, StateRootWithEpoch},
-        Error, ErrorKind,
     },
     message::{Message, RequestId},
     network::NetworkContext,
@@ -28,6 +28,7 @@ use super::{
     common::{FutureItem, PendingItem, SyncManager, TimeOrdered},
     witnesses::Witnesses,
 };
+use futures::future::FutureExt;
 use network::node_table::NodeId;
 
 #[derive(Debug)]
@@ -39,6 +40,8 @@ struct Statistics {
 
 type MissingStateRoot = TimeOrdered<u64>;
 
+type PendingStateRoot = PendingItem<StateRoot, ClonableError>;
+
 pub struct StateRoots {
     // series of unique request ids
     request_id_allocator: Arc<UniqueId>,
@@ -50,7 +53,7 @@ pub struct StateRoots {
     sync_manager: SyncManager<u64, MissingStateRoot>,
 
     // bloom filters received from full node
-    verified: Arc<RwLock<LruCache<u64, PendingItem<StateRoot>>>>,
+    verified: Arc<RwLock<LruCache<u64, PendingStateRoot>>>,
 
     // witness sync manager
     witnesses: Arc<Witnesses>,
@@ -98,8 +101,10 @@ impl StateRoots {
     #[inline]
     pub fn request_now(
         &self, io: &dyn NetworkContext, epoch: u64,
-    ) -> impl Future<Output = StateRoot> {
-        if !self.verified.read().contains_key(&epoch) {
+    ) -> impl Future<Output = Result<StateRoot>> {
+        let mut verified = self.verified.write();
+
+        if !verified.contains_key(&epoch) {
             let missing = std::iter::once(MissingStateRoot::new(epoch));
 
             self.sync_manager.request_now(missing, |peer, epochs| {
@@ -107,14 +112,20 @@ impl StateRoots {
             });
         }
 
+        verified
+            .entry(epoch)
+            .or_insert(PendingItem::pending())
+            .clear_error();
+
         FutureItem::new(epoch, self.verified.clone())
+            .map(|res| res.map_err(|e| e.into()))
     }
 
     #[inline]
     pub fn receive(
         &self, peer: &NodeId, id: RequestId,
         state_roots: impl Iterator<Item = StateRootWithEpoch>,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         for StateRootWithEpoch { epoch, state_root } in state_roots {
             debug!(
@@ -134,9 +145,21 @@ impl StateRoots {
     #[inline]
     pub fn validate_and_store(
         &self, epoch: u64, state_root: StateRoot,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         // validate state root
-        self.validate_state_root(epoch, &state_root)?;
+        if let Err(e) = self.validate_state_root(epoch, &state_root) {
+            // the error is forwarded to both the RPC caller(s) and the
+            // sync handler, so it needs to be clonable
+            let e = ClonableError::from(e);
+
+            self.verified
+                .write()
+                .entry(epoch)
+                .or_insert(PendingItem::pending())
+                .set_error(e.clone());
+
+            bail!(e);
+        }
 
         // store state root by epoch
         self.verified
@@ -163,7 +186,7 @@ impl StateRoots {
     #[inline]
     fn send_request(
         &self, io: &dyn NetworkContext, peer: &NodeId, epochs: Vec<u64>,
-    ) -> Result<Option<RequestId>, Error> {
+    ) -> Result<Option<RequestId>> {
         debug!("send_request peer={:?} epochs={:?}", peer, epochs);
 
         if epochs.is_empty() {
@@ -192,29 +215,23 @@ impl StateRoots {
     #[inline]
     pub fn validate_state_root(
         &self, epoch: u64, state_root: &StateRoot,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         // calculate received state root hash
         let received = state_root.compute_state_root_hash();
 
         // retrieve local state root hash
-        let local = match self.witnesses.root_hashes_of(epoch) {
+        let expected = match self.witnesses.root_hashes_of(epoch) {
             Some((state_root, _, _)) => state_root,
-            None => {
-                warn!(
-                    "State root hash not found, epoch={}, state_root={:?}",
-                    epoch, state_root
-                );
-                return Err(ErrorKind::InternalError.into());
-            }
+            None => bail!(ErrorKind::WitnessUnavailable { epoch }),
         };
 
         // check
-        if received != local {
-            warn!(
-                "State root validation failed, received={:?}, local={:?}",
-                received, local
-            );
-            return Err(ErrorKind::InvalidStateRoot.into());
+        if received != expected {
+            bail!(ErrorKind::InvalidStateRoot {
+                epoch,
+                expected,
+                received,
+            });
         }
 
         Ok(())
@@ -224,25 +241,38 @@ impl StateRoots {
     pub fn validate_prev_snapshot_state_root(
         &self, current_epoch: u64,
         maybe_prev_snapshot_state_root: &Option<StateRoot>,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
+        let snapshot_epoch_count = self.snapshot_epoch_count;
+
         match maybe_prev_snapshot_state_root {
             Some(ref root) => {
                 // root provided for non-existent epoch
-                if current_epoch <= self.snapshot_epoch_count {
-                    return Err(ErrorKind::InvalidStateRoot.into());
+                if current_epoch <= snapshot_epoch_count {
+                    // previous root should not have been provided
+                    // for the first snapshot period
+                    bail!(ErrorKind::InvalidPreviousStateRoot {
+                        current_epoch,
+                        snapshot_epoch_count,
+                        root: maybe_prev_snapshot_state_root.clone()
+                    });
                 }
 
                 // root provided for previous snapshot
                 self.validate_state_root(
-                    current_epoch - self.snapshot_epoch_count,
+                    current_epoch - snapshot_epoch_count,
                     &root,
                 )?;
             }
             None => {
-                // root not provided even though previous snapshot exists
-                if current_epoch > self.snapshot_epoch_count {
-                    return Err(ErrorKind::InvalidStateRoot.into());
+                if current_epoch > snapshot_epoch_count {
+                    // previous root should have been provided
+                    // for subsequent snapshot periods
+                    bail!(ErrorKind::InvalidPreviousStateRoot {
+                        current_epoch,
+                        snapshot_epoch_count,
+                        root: maybe_prev_snapshot_state_root.clone()
+                    });
                 }
             }
         }
diff --git a/core/src/light_protocol/handler/sync/storage_roots.rs b/core/src/light_protocol/handler/sync/storage_roots.rs
index 7ca84c7828..145e06fb6b 100644
--- a/core/src/light_protocol/handler/sync/storage_roots.rs
+++ b/core/src/light_protocol/handler/sync/storage_roots.rs
@@ -31,9 +31,9 @@ use super::{
     state_roots::StateRoots,
 };
 use cfx_types::H160;
+use futures::future::FutureExt;
 use network::node_table::NodeId;
 use primitives::{StorageKey, StorageRoot};
-
 #[derive(Debug)]
 struct Statistics {
     cached: usize,
@@ -43,6 +43,8 @@ struct Statistics {
 
 type MissingStorageRoot = TimeOrdered<StorageRootKey>;
 
+type PendingStorageRoot = PendingItem<Option<StorageRoot>, ClonableError>;
+
 pub struct StorageRoots {
     // series of unique request ids
     request_id_allocator: Arc<UniqueId>,
@@ -54,8 +56,7 @@ pub struct StorageRoots {
     sync_manager: SyncManager<StorageRootKey, MissingStorageRoot>,
 
     // state entries received from full node
-    verified:
-        Arc<RwLock<LruCache<StorageRootKey, PendingItem<Option<StorageRoot>>>>>,
+    verified: Arc<RwLock<LruCache<StorageRootKey, PendingStorageRoot>>>,
 }
 
 impl StorageRoots {
@@ -90,10 +91,11 @@ impl StorageRoots {
     #[inline]
     pub fn request_now(
         &self, io: &dyn NetworkContext, epoch: u64, address: H160,
-    ) -> impl Future<Output = Option<StorageRoot>> {
+    ) -> impl Future<Output = Result<Option<StorageRoot>>> {
+        let mut verified = self.verified.write();
         let key = StorageRootKey { epoch, address };
 
-        if !self.verified.read().contains_key(&key) {
+        if !verified.contains_key(&key) {
             let missing = std::iter::once(MissingStorageRoot::new(key.clone()));
 
             self.sync_manager.request_now(missing, |peer, keys| {
@@ -101,7 +103,13 @@ impl StorageRoots {
             });
         }
 
+        verified
+            .entry(key.clone())
+            .or_insert(PendingItem::pending())
+            .clear_error();
+
         FutureItem::new(key, self.verified.clone())
+            .map(|res| res.map_err(|e| e.into()))
     }
 
     #[inline]
@@ -129,7 +137,21 @@ impl StorageRoots {
     ) -> Result<()>
     {
         // validate storage root
-        self.validate_storage_root(key.epoch, &key.address, &root, proof)?;
+        if let Err(e) =
+            self.validate_storage_root(key.epoch, key.address, &root, proof)
+        {
+            // forward error to both rpc caller(s) and sync handler
+            // so we need to make it clonable
+            let e = ClonableError::from(e);
+
+            self.verified
+                .write()
+                .entry(key.clone())
+                .or_insert(PendingItem::pending())
+                .set_error(e.clone());
+
+            bail!(e);
+        }
 
         // store storage root by storage root key
         self.verified
@@ -187,7 +209,7 @@ impl StorageRoots {
 
     #[inline]
     fn validate_storage_root(
-        &self, epoch: u64, address: &H160, storage_root: &Option<StorageRoot>,
+        &self, epoch: u64, address: H160, storage_root: &Option<StorageRoot>,
         proof: StorageRootProof,
     ) -> Result<()>
     {
@@ -196,10 +218,10 @@ impl StorageRoots {
 
         self.state_roots
             .validate_state_root(epoch, &state_root)
-            .chain_err(|| {
-                ErrorKind::InvalidStorageRootProof(
-                    "Validation of current state root failed",
-                )
+            .chain_err(|| ErrorKind::InvalidStorageRootProof {
+                epoch,
+                address,
+                reason: "Validation of current state root failed",
             })?;
 
         // validate previous state root
@@ -207,10 +229,10 @@ impl StorageRoots {
 
         self.state_roots
             .validate_prev_snapshot_state_root(epoch, &maybe_prev_root)
-            .chain_err(|| {
-                ErrorKind::InvalidStorageRootProof(
-                    "Validation of previous state root failed",
-                )
+            .chain_err(|| ErrorKind::InvalidStorageRootProof {
+                epoch,
+                address,
+                reason: "Validation of previous state root failed",
             })?;
 
         // construct padding
@@ -230,14 +252,11 @@ impl StorageRoots {
             state_root,
             maybe_intermediate_padding,
         ) {
-            warn!(
-                "Invalid storage root proof for {:?} under key {:?}",
-                storage_root, key
-            );
-            return Err(ErrorKind::InvalidStorageRootProof(
-                "Validation of merkle proof failed",
-            )
-            .into());
+            bail!(ErrorKind::InvalidStorageRootProof {
+                epoch,
+                address,
+                reason: "Validation of merkle proof failed",
+            });
         }
 
         Ok(())
diff --git a/core/src/light_protocol/handler/sync/tx_infos.rs b/core/src/light_protocol/handler/sync/tx_infos.rs
index 5b92480801..82e6d2f8cf 100644
--- a/core/src/light_protocol/handler/sync/tx_infos.rs
+++ b/core/src/light_protocol/handler/sync/tx_infos.rs
@@ -18,8 +18,8 @@ use crate::{
     consensus::SharedConsensusGraph,
     light_protocol::{
         common::{FullPeerState, LedgerInfo, Peers},
+        error::*,
         message::{msgid, GetTxInfos, TxInfo},
-        Error, ErrorKind,
     },
     message::{Message, RequestId},
     network::NetworkContext,
@@ -32,6 +32,7 @@ use crate::{
     },
     UniqueId,
 };
+use futures::future::FutureExt;
 use network::node_table::NodeId;
 
 #[derive(Debug)]
@@ -47,6 +48,8 @@ type MissingTxInfo = TimeOrdered<H256>;
 // FIXME: struct
 pub type TxInfoValidated = (SignedTransaction, Receipt, TransactionIndex, U256);
 
+type PendingTxInfo = PendingItem<TxInfoValidated, ClonableError>;
+
 pub struct TxInfos {
     // helper API for retrieving ledger information
     ledger: LedgerInfo,
@@ -58,7 +61,7 @@ pub struct TxInfos {
     sync_manager: SyncManager<H256, MissingTxInfo>,
 
     // block txs received from full node
-    verified: Arc<RwLock<LruCache<H256, PendingItem<TxInfoValidated>>>>,
+    verified: Arc<RwLock<LruCache<H256, PendingTxInfo>>>,
 
     // witness sync manager
     pub witnesses: Arc<Witnesses>,
@@ -97,8 +100,10 @@ impl TxInfos {
     #[inline]
     pub fn request_now(
         &self, io: &dyn NetworkContext, hash: H256,
-    ) -> impl Future<Output = TxInfoValidated> {
-        if !self.verified.read().contains_key(&hash) {
+    ) -> impl Future<Output = Result<TxInfoValidated>> {
+        let mut verified = self.verified.write();
+
+        if !verified.contains_key(&hash) {
             let missing = std::iter::once(MissingTxInfo::new(hash));
 
             self.sync_manager.request_now(missing, |peer, hashes| {
@@ -106,14 +111,20 @@ impl TxInfos {
             });
         }
 
+        verified
+            .entry(hash)
+            .or_insert(PendingItem::pending())
+            .clear_error();
+
         FutureItem::new(hash, self.verified.clone())
+            .map(|res| res.map_err(|e| e.into()))
     }
 
     #[inline]
     pub fn receive(
         &self, peer: &NodeId, id: RequestId,
         infos: impl Iterator<Item = TxInfo>,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         for info in infos {
             debug!("Validating tx_info {:?}", info);
@@ -132,7 +143,29 @@ impl TxInfos {
     }
 
     #[inline]
-    pub fn validate_and_store(&self, info: TxInfo) -> Result<(), Error> {
+    fn validate_and_store(&self, info: TxInfo) -> Result<()> {
+        let tx_hash = info.tx.hash();
+
+        // validate tx info
+        if let Err(e) = self.validate_and_store_tx_info(info) {
+            // forward error to both rpc caller(s) and sync handler
+            // so we need to make it clonable
+            let e = ClonableError::from(e);
+
+            self.verified
+                .write()
+                .entry(tx_hash)
+                .or_insert(PendingItem::pending())
+                .set_error(e.clone());
+
+            bail!(e);
+        }
+
+        Ok(())
+    }
+
+    #[inline]
+    fn validate_and_store_tx_info(&self, info: TxInfo) -> Result<()> {
         let TxInfo {
             epoch,
 
@@ -156,35 +189,41 @@ impl TxInfos {
 
         // quick check for well-formedness
         if block_index_in_epoch >= num_blocks_in_epoch {
-            debug!(
-                "Inconsisent block index: {} >= {}",
-                block_index_in_epoch, num_blocks_in_epoch
-            );
-            return Err(ErrorKind::InvalidTxInfo.into());
+            bail!(ErrorKind::InvalidTxInfo {
+                reason: format!(
+                    "Inconsistent block index: {} >= {}",
+                    block_index_in_epoch, num_blocks_in_epoch
+                )
+            });
         }
 
         if tx_index_in_block >= num_txs_in_block {
-            debug!(
-                "Inconsisent tx index: {} >= {}",
-                tx_index_in_block, num_txs_in_block
-            );
-            return Err(ErrorKind::InvalidTxInfo.into());
+            bail!(ErrorKind::InvalidTxInfo {
+                reason: format!(
+                    "Inconsistent tx index: {} >= {}",
+                    tx_index_in_block, num_txs_in_block
+                )
+            });
         }
 
         // only executed instances of the transaction are acceptable;
         // receipts belonging to non-executed instances should not be sent
         if receipt.outcome_status != 0 && receipt.outcome_status != 1 {
-            debug!(
-                "Unexpected outcome status in tx info: {}",
-                receipt.outcome_status
-            );
-            return Err(ErrorKind::InvalidTxInfo.into());
+            bail!(ErrorKind::InvalidTxInfo {
+                reason: format!(
+                    "Unexpected outcome status in tx info: {}",
+                    receipt.outcome_status
+                )
+            });
         }
 
         let block_hash = match self.ledger.block_hashes_in(epoch)? {
             hs if hs.len() != num_blocks_in_epoch => {
-                debug!("Number of blocks in epoch mismatch: local = {}, received = {}", hs.len(), num_blocks_in_epoch);
-                return Err(ErrorKind::InvalidTxInfo.into());
+                bail!(ErrorKind::InvalidTxInfo {
+                    reason: format!(
+                        "Number of blocks in epoch mismatch: local = {}, received = {}",
+                        hs.len(), num_blocks_in_epoch),
+                });
             }
             hs => hs[block_index_in_epoch],
         };
@@ -215,20 +254,16 @@ impl TxInfos {
             tx_hash,
             &tx_proof,
         ) {
-            debug!("Transaction proof verification failed");
-            return Err(ErrorKind::InvalidTxInfo.into());
+            bail!(ErrorKind::InvalidTxInfo {
+                reason: "Transaction proof verification failed".to_owned()
+            });
         }
 
         // verify receipt proof
         let verified_epoch_receipts_root =
             match self.witnesses.root_hashes_of(epoch) {
                 Some((_, receipts_root, _)) => receipts_root,
-                None => {
-                    // TODO(thegaram): signal to RPC layer that the
-                    // corresponding roots are not available yet
-                    warn!("Receipt root not found, epoch={}", epoch,);
-                    return Err(ErrorKind::InternalError.into());
-                }
+                None => bail!(ErrorKind::WitnessUnavailable { epoch }),
             };
 
         trace!(
@@ -261,8 +296,9 @@ impl TxInfos {
             &receipt,
             &receipt_proof,
         ) {
-            debug!("Receipt proof verification failed");
-            return Err(ErrorKind::InvalidTxInfo.into());
+            bail!(ErrorKind::InvalidTxInfo {
+                reason: "Receipt proof verification failed".to_owned()
+            });
         }
 
         // find prior gas used
@@ -288,8 +324,10 @@ impl TxInfos {
                     &prev_receipt,
                     &prev_receipt_proof,
                 ) {
-                    debug!("Receipt proof verification failed");
-                    return Err(ErrorKind::InvalidTxInfo.into());
+                    bail!(ErrorKind::InvalidTxInfo {
+                        reason: "Previous receipt proof verification failed"
+                            .to_owned()
+                    });
                 }
 
                 prev_receipt.accumulated_gas_used
@@ -297,17 +335,17 @@ impl TxInfos {
 
             // not the first receipt but no previous receipt was provided
             (_, maybe_prev_receipt, maybe_prev_receipt_proof) => {
-                debug!(
-                    "Expected two receipts; received one.
-                    tx_index_in_block = {:?},
-                    maybe_prev_receipt = {:?},
-                    maybe_prev_receipt_proof = {:?}",
-                    tx_index_in_block,
-                    maybe_prev_receipt,
-                    maybe_prev_receipt_proof
-                );
-
-                return Err(ErrorKind::InvalidTxInfo.into());
+                bail!(ErrorKind::InvalidTxInfo {
+                    reason: format!(
+                        "Expected two receipts; received one.
+                        tx_index_in_block = {:?},
+                        maybe_prev_receipt = {:?},
+                        maybe_prev_receipt_proof = {:?}",
+                        tx_index_in_block,
+                        maybe_prev_receipt,
+                        maybe_prev_receipt_proof
+                    )
+                });
             }
         };
 
@@ -336,7 +374,7 @@ impl TxInfos {
     #[inline]
     fn send_request(
         &self, io: &dyn NetworkContext, peer: &NodeId, hashes: Vec<H256>,
-    ) -> Result<Option<RequestId>, Error> {
+    ) -> Result<Option<RequestId>> {
         debug!("send_request peer={:?} hashes={:?}", peer, hashes);
 
         if hashes.is_empty() {
diff --git a/core/src/light_protocol/handler/sync/txs.rs b/core/src/light_protocol/handler/sync/txs.rs
index 33d655492f..606f4ab918 100644
--- a/core/src/light_protocol/handler/sync/txs.rs
+++ b/core/src/light_protocol/handler/sync/txs.rs
@@ -13,8 +13,8 @@ use std::{future::Future, sync::Arc};
 use crate::{
     light_protocol::{
         common::{FullPeerState, Peers},
+        error::*,
         message::{msgid, GetTxs},
-        Error, ErrorKind,
     },
     message::{Message, RequestId},
     network::NetworkContext,
@@ -26,6 +26,7 @@ use crate::{
 };
 
 use super::common::{FutureItem, PendingItem, SyncManager, TimeOrdered};
+use futures::future::FutureExt;
 use network::node_table::NodeId;
 
 #[derive(Debug)]
@@ -38,6 +39,8 @@ struct Statistics {
 // prioritize earlier requests
 type MissingTx = TimeOrdered<H256>;
 
+type PendingTx = PendingItem<SignedTransaction, ClonableError>;
+
 pub struct Txs {
     // series of unique request ids
     request_id_allocator: Arc<UniqueId>,
@@ -46,7 +49,7 @@ pub struct Txs {
     sync_manager: SyncManager<H256, MissingTx>,
 
     // txs received from full node
-    verified: Arc<RwLock<LruCache<H256, PendingItem<SignedTransaction>>>>,
+    verified: Arc<RwLock<LruCache<H256, PendingTx>>>,
 }
 
 impl Txs {
@@ -77,8 +80,10 @@ impl Txs {
     #[inline]
     pub fn request_now(
         &self, io: &dyn NetworkContext, hash: H256,
-    ) -> impl Future<Output = SignedTransaction> {
-        if !self.verified.read().contains_key(&hash) {
+    ) -> impl Future<Output = Result<SignedTransaction>> {
+        let mut verified = self.verified.write();
+
+        if !verified.contains_key(&hash) {
             let missing = std::iter::once(MissingTx::new(hash));
 
             self.sync_manager.request_now(missing, |peer, hashes| {
@@ -86,14 +91,20 @@ impl Txs {
             });
         }
 
+        verified
+            .entry(hash)
+            .or_insert(PendingItem::pending())
+            .clear_error();
+
         FutureItem::new(hash, self.verified.clone())
+            .map(|res| res.map_err(|e| e.into()))
     }
 
     #[inline]
     pub fn receive(
         &self, peer: &NodeId, id: RequestId,
         txs: impl Iterator<Item = SignedTransaction>,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         for tx in txs {
             let hash = tx.hash();
@@ -109,11 +120,23 @@ impl Txs {
     }
 
     #[inline]
-    pub fn validate_and_store(
-        &self, tx: SignedTransaction,
-    ) -> Result<(), Error> {
+    fn validate_and_store(&self, tx: SignedTransaction) -> Result<()> {
         let hash = tx.hash();
-        self.validate_tx(&tx)?;
+
+        // validate tx
+        if let Err(e) = self.validate_tx(&tx) {
+            // forward error to both rpc caller(s) and sync handler
+            // so we need to make it clonable
+            let e = ClonableError::from(e);
+
+            self.verified
+                .write()
+                .entry(hash)
+                .or_insert(PendingItem::pending())
+                .set_error(e.clone());
+
+            bail!(e);
+        }
 
         self.verified
             .write()
@@ -139,7 +162,7 @@ impl Txs {
     #[inline]
     fn send_request(
         &self, io: &dyn NetworkContext, peer: &NodeId, hashes: Vec<H256>,
-    ) -> Result<Option<RequestId>, Error> {
+    ) -> Result<Option<RequestId>> {
         debug!("send_request peer={:?} hashes={:?}", peer, hashes);
 
         if hashes.is_empty() {
@@ -165,12 +188,12 @@ impl Txs {
     }
 
     #[inline]
-    fn validate_tx(&self, tx: &SignedTransaction) -> Result<(), Error> {
+    pub fn validate_tx(&self, tx: &SignedTransaction) -> Result<()> {
         match tx.verify_public(false /* skip */) {
             Ok(true) => {}
             _ => {
                 warn!("Tx signature verification failed for {:?}", tx);
-                return Err(ErrorKind::InvalidTxSignature.into());
+                bail!(ErrorKind::InvalidTxSignature { hash: tx.hash() });
             }
         }
 
diff --git a/core/src/light_protocol/handler/sync/witnesses.rs b/core/src/light_protocol/handler/sync/witnesses.rs
index c518860952..b97709ee12 100644
--- a/core/src/light_protocol/handler/sync/witnesses.rs
+++ b/core/src/light_protocol/handler/sync/witnesses.rs
@@ -303,7 +303,10 @@ impl Witnesses {
                 Some(w) => w,
                 None => {
                     warn!("Unable to get witness!");
-                    return Err(ErrorKind::InternalError.into());
+                    bail!(ErrorKind::InternalError(format!(
+                        "Witness at height {} is not available",
+                        height
+                    )));
                 }
             };
 
diff --git a/core/src/light_protocol/provider.rs b/core/src/light_protocol/provider.rs
index 47b2a2353f..70ee80c3ac 100644
--- a/core/src/light_protocol/provider.rs
+++ b/core/src/light_protocol/provider.rs
@@ -17,6 +17,7 @@ use crate::{
             partition_results, validate_chain_id, LedgerInfo, LightPeerState,
             Peers,
         },
+        error::*,
         handle_error,
         message::{
             msgid, BlockHashes as GetBlockHashesResponse,
@@ -36,9 +37,8 @@ use crate::{
             TxInfos as GetTxInfosResponse, Txs as GetTxsResponse,
             WitnessInfo as GetWitnessInfoResponse,
         },
-        Error, ErrorKind, LIGHT_PROTOCOL_ID,
-        LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT, LIGHT_PROTOCOL_VERSION,
-        LIGHT_PROTO_V1,
+        LIGHT_PROTOCOL_ID, LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT,
+        LIGHT_PROTOCOL_VERSION, LIGHT_PROTO_V1,
     },
     message::{decode_msg, decode_rlp_and_check_deprecation, Message, MsgId},
     network::{
@@ -121,7 +121,7 @@ impl Provider {
 
     pub fn register(
         self: &Arc<Self>, network: Arc<NetworkService>,
-    ) -> Result<(), String> {
+    ) -> std::result::Result<(), String> {
         network
             .register_protocol(
                 self.clone(),
@@ -136,27 +136,27 @@ impl Provider {
     #[inline]
     fn get_existing_peer_state(
         &self, peer: &NodeId,
-    ) -> Result<Arc<RwLock<LightPeerState>>, Error> {
+    ) -> Result<Arc<RwLock<LightPeerState>>> {
         match self.peers.get(peer) {
             Some(state) => Ok(state),
             None => {
                 // NOTE: this should not happen as we register
                 // all peers in `on_peer_connected`
-                error!("Received message from unknown peer={:?}", peer);
-                bail!(ErrorKind::InternalError)
+                bail!(ErrorKind::InternalError(format!(
+                    "Received message from unknown peer={:?}",
+                    peer
+                )))
             }
         }
     }
 
     #[inline]
-    fn peer_version(&self, peer: &NodeId) -> Result<ProtocolVersion, Error> {
+    fn peer_version(&self, peer: &NodeId) -> Result<ProtocolVersion> {
         Ok(self.get_existing_peer_state(peer)?.read().protocol_version)
     }
 
     #[inline]
-    fn validate_peer_state(
-        &self, peer: &NodeId, msg_id: MsgId,
-    ) -> Result<(), Error> {
+    fn validate_peer_state(&self, peer: &NodeId, msg_id: MsgId) -> Result<()> {
         let state = self.get_existing_peer_state(&peer)?;
 
         if msg_id != msgid::STATUS_PING_DEPRECATED
@@ -164,7 +164,13 @@ impl Provider {
             && !state.read().handshake_completed
         {
             warn!("Received msg={:?} from handshaking peer={:?}", msg_id, peer);
-            bail!(ErrorKind::UnexpectedMessage);
+            bail!(ErrorKind::UnexpectedMessage {
+                expected: vec![
+                    msgid::STATUS_PING_DEPRECATED,
+                    msgid::STATUS_PING_V2
+                ],
+                received: msg_id,
+            });
         }
 
         Ok(())
@@ -173,7 +179,7 @@ impl Provider {
     #[rustfmt::skip]
     fn dispatch_message(
         &self, io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, rlp: Rlp,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         trace!("Dispatching message: peer={:?}, msg_id={:?}", peer, msg_id);
         self.validate_peer_state(peer, msg_id)?;
         let min_supported_ver = self.minimum_supported_version();
@@ -194,7 +200,7 @@ impl Provider {
             msgid::GET_BLOCK_TXS => self.on_get_block_txs(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
             msgid::GET_TX_INFOS => self.on_get_tx_infos(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
             msgid::GET_STORAGE_ROOTS => self.on_get_storage_roots(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?),
-            _ => bail!(ErrorKind::UnknownMessage),
+            _ => bail!(ErrorKind::UnknownMessage { id: msg_id }),
         }
     }
 
@@ -218,20 +224,18 @@ impl Provider {
     }
 
     #[inline]
-    fn tx_info_by_hash(&self, hash: H256) -> Result<TxInfo, Error> {
+    fn tx_info_by_hash(&self, hash: H256) -> Result<TxInfo> {
         let (tx, tx_index, receipt) =
             match self.consensus.get_transaction_info_by_hash(&hash) {
                 None => {
-                    bail!(ErrorKind::UnableToProduceTxInfo(format!(
-                        "Unable to get tx info for {:?}",
-                        hash
-                    )));
+                    bail!(ErrorKind::UnableToProduceTxInfo {
+                        reason: format!("Unable to get tx info for {:?}", hash)
+                    });
                 }
                 Some((_, _, None)) => {
-                    bail!(ErrorKind::UnableToProduceTxInfo(format!(
-                        "Unable to get receipt for {:?}",
-                        hash
-                    )));
+                    bail!(ErrorKind::UnableToProduceTxInfo {
+                        reason: format!("Unable to get receipt for {:?}", hash)
+                    });
                 }
                 Some((tx, tx_index, Some((receipt, _)))) => {
                     assert_eq!(tx.hash(), hash); // sanity check
@@ -240,17 +244,7 @@ impl Provider {
             };
 
         let block_hash = tx_index.block_hash;
-
-        let block = match self.ledger.block(block_hash) {
-            Ok(b) => b,
-            Err(e) => {
-                bail!(ErrorKind::UnableToProduceTxInfo(format!(
-                    "Unable to retrieve block {:?}: {}",
-                    block_hash, e
-                )));
-            }
-        };
-
+        let block = self.ledger.block(block_hash)?;
         let tx_index_in_block = tx_index.index;
         let num_txs_in_block = block.transactions.len();
 
@@ -260,20 +254,24 @@ impl Provider {
         let epoch = match self.consensus.get_block_epoch_number(&block_hash) {
             Some(epoch) => epoch,
             None => {
-                bail!(ErrorKind::UnableToProduceTxInfo(format!(
-                    "Unable to get epoch number for block {:?}",
-                    block_hash
-                )));
+                bail!(ErrorKind::UnableToProduceTxInfo {
+                    reason: format!(
+                        "Unable to get epoch number for block {:?}",
+                        block_hash
+                    )
+                });
             }
         };
 
         let epoch_hashes = match self.ledger.block_hashes_in(epoch) {
             Ok(hs) => hs,
             Err(e) => {
-                bail!(ErrorKind::UnableToProduceTxInfo(format!(
-                    "Unable to find epoch hashes for {}: {}",
-                    epoch, e
-                )));
+                bail!(ErrorKind::UnableToProduceTxInfo {
+                    reason: format!(
+                        "Unable to find epoch hashes for {}: {}",
+                        epoch, e
+                    )
+                });
             }
         };
 
@@ -283,22 +281,22 @@ impl Provider {
             match epoch_hashes.iter().position(|h| *h == block_hash) {
                 Some(id) => id,
                 None => {
-                    bail!(ErrorKind::UnableToProduceTxInfo(format!(
-                        "Unable to find {:?} in epoch {}",
-                        block_hash, epoch
-                    )));
+                    bail!(ErrorKind::UnableToProduceTxInfo {
+                        reason: format!(
+                            "Unable to find {:?} in epoch {}",
+                            block_hash, epoch
+                        )
+                    });
                 }
             };
 
-        let epoch_receipts = match self.ledger.receipts_of(epoch) {
-            Ok(rs) => rs.iter().cloned().map(Arc::new).collect::<Vec<_>>(),
-            Err(e) => {
-                bail!(ErrorKind::UnableToProduceTxInfo(format!(
-                    "Unable to retrieve receipts for {}: {}",
-                    epoch, e
-                )));
-            }
-        };
+        let epoch_receipts = self
+            .ledger
+            .receipts_of(epoch)?
+            .iter()
+            .cloned()
+            .map(Arc::new)
+            .collect::<Vec<_>>();
 
         let epoch_receipt_proof = compute_epoch_receipt_proof(
             &epoch_receipts,
@@ -348,7 +346,7 @@ impl Provider {
 
     fn send_status(
         &self, io: &dyn NetworkContext, peer: &NodeId,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         let best_info = self.consensus.best_info();
         let genesis_hash = self.graph.data_man.true_genesis.hash();
 
@@ -378,44 +376,40 @@ impl Provider {
     }
 
     #[inline]
-    fn validate_peer_type(&self, node_type: &NodeType) -> Result<(), Error> {
+    fn validate_peer_type(&self, node_type: NodeType) -> Result<()> {
         match node_type {
             NodeType::Light => Ok(()),
-            _ => bail!(ErrorKind::UnexpectedPeerType),
+            _ => bail!(ErrorKind::UnexpectedPeerType { node_type }),
         }
     }
 
     #[inline]
-    fn validate_genesis_hash(&self, genesis: H256) -> Result<(), Error> {
-        match self.graph.data_man.true_genesis.hash() {
-            h if h == genesis => Ok(()),
-            h => {
-                debug!(
-                    "Genesis mismatch (ours: {:?}, theirs: {:?})",
-                    h, genesis
-                );
-                bail!(ErrorKind::GenesisMismatch)
-            }
+    fn validate_genesis_hash(&self, genesis: H256) -> Result<()> {
+        let ours = self.graph.data_man.true_genesis.hash();
+        let theirs = genesis;
+
+        if ours != theirs {
+            bail!(ErrorKind::GenesisMismatch { ours, theirs });
         }
+
+        Ok(())
     }
 
     fn on_status_v2(
         &self, io: &dyn NetworkContext, peer: &NodeId, status: StatusPingV2,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         info!("on_status peer={:?} status={:?}", peer, status);
         self.throttle(peer, &status)?;
 
-        self.validate_peer_type(&status.node_type)?;
+        self.validate_peer_type(status.node_type)?;
         self.validate_genesis_hash(status.genesis_hash)?;
         validate_chain_id(
             &self.consensus.get_config().chain_id,
             &status.chain_id,
         )?;
 
-        if let Err(e) = self.send_status(io, peer) {
-            warn!("Failed to send status to peer={:?}: {:?}", peer, e);
-            bail!(ErrorKind::SendStatusFailed);
-        };
+        self.send_status(io, peer)
+            .chain_err(|| ErrorKind::SendStatusFailed { peer: *peer })?;
 
         let state = self.get_existing_peer_state(peer)?;
         let mut state = state.write();
@@ -426,7 +420,7 @@ impl Provider {
     fn on_status_deprecated(
         &self, io: &dyn NetworkContext, peer: &NodeId,
         status: StatusPingDeprecatedV1,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         self.on_status_v2(
             io,
@@ -441,7 +435,7 @@ impl Provider {
 
     fn on_get_state_roots(
         &self, io: &dyn NetworkContext, peer: &NodeId, req: GetStateRoots,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         debug!("on_get_state_roots req={:?}", req);
         self.throttle(peer, &req)?;
         let request_id = req.request_id;
@@ -450,7 +444,7 @@ impl Provider {
             .epochs
             .into_iter()
             .take(MAX_ITEMS_TO_SEND)
-            .map::<Result<_, Error>, _>(|epoch| {
+            .map::<Result<_>, _>(|epoch| {
                 let state_root = self.ledger.state_root_of(epoch)?.state_root;
                 Ok(StateRootWithEpoch { epoch, state_root })
             });
@@ -470,7 +464,7 @@ impl Provider {
         Ok(())
     }
 
-    fn state_entry(&self, key: StateKey) -> Result<StateEntryWithKey, Error> {
+    fn state_entry(&self, key: StateKey) -> Result<StateEntryWithKey> {
         let snapshot_epoch_count = self.ledger.snapshot_epoch_count() as u64;
 
         // state root in current snapshot period
@@ -501,7 +495,7 @@ impl Provider {
 
     fn on_get_state_entries(
         &self, io: &dyn NetworkContext, peer: &NodeId, req: GetStateEntries,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         debug!("on_get_state_entries req={:?}", req);
         self.throttle(peer, &req)?;
         let request_id = req.request_id;
@@ -533,7 +527,7 @@ impl Provider {
     fn on_get_block_hashes_by_epoch(
         &self, io: &dyn NetworkContext, peer: &NodeId,
         req: GetBlockHashesByEpoch,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         debug!("on_get_block_hashes_by_epoch req={:?}", req);
         self.throttle(peer, &req)?;
@@ -565,7 +559,7 @@ impl Provider {
 
     fn on_get_block_headers(
         &self, io: &dyn NetworkContext, peer: &NodeId, req: GetBlockHeaders,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         debug!("on_get_block_headers req={:?}", req);
         self.throttle(peer, &req)?;
         let request_id = req.request_id;
@@ -574,7 +568,7 @@ impl Provider {
             .hashes
             .iter()
             .take(MAX_HEADERS_TO_SEND)
-            .map::<Result<_, Error>, _>(|h| {
+            .map::<Result<_>, _>(|h| {
                 self.graph
                     .data_man
                     .block_header_by_hash(&h)
@@ -605,7 +599,7 @@ impl Provider {
 
     fn on_send_raw_tx(
         &self, _io: &dyn NetworkContext, peer: &NodeId, req: SendRawTx,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         debug!("on_send_raw_tx req={:?}", req);
         self.throttle(peer, &req)?;
         let tx: TransactionWithSignature = rlp::decode(&req.raw)?;
@@ -629,18 +623,17 @@ impl Provider {
             }
             _ => {
                 // NOTE: this should not happen
-                error!(
+                bail!(ErrorKind::InternalError(format!(
                     "insert_new_transactions failed: {:?}, {:?}",
                     passed, failed
-                );
-                bail!(ErrorKind::InternalError)
+                )))
             }
         }
     }
 
     fn on_get_receipts(
         &self, io: &dyn NetworkContext, peer: &NodeId, req: GetReceipts,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         debug!("on_get_receipts req={:?}", req);
         self.throttle(peer, &req)?;
         let request_id = req.request_id;
@@ -671,7 +664,7 @@ impl Provider {
 
     fn on_get_txs(
         &self, io: &dyn NetworkContext, peer: &NodeId, req: GetTxs,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         debug!("on_get_txs req={:?}", req);
         self.throttle(peer, &req)?;
         let request_id = req.request_id;
@@ -680,7 +673,7 @@ impl Provider {
             .hashes
             .into_iter()
             .take(MAX_TXS_TO_SEND)
-            .map::<Result<_, Error>, _>(|h| {
+            .map::<Result<_>, _>(|h| {
                 self.tx_by_hash(h).ok_or_else(|| {
                     ErrorKind::Msg(format!("Tx {:?} not found", h)).into()
                 })
@@ -701,7 +694,7 @@ impl Provider {
 
     fn on_get_witness_info(
         &self, io: &dyn NetworkContext, peer: &NodeId, req: GetWitnessInfo,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         debug!("on_get_witness_info req={:?}", req);
         self.throttle(peer, &req)?;
         let request_id = req.request_id;
@@ -727,7 +720,7 @@ impl Provider {
 
     fn on_get_blooms(
         &self, io: &dyn NetworkContext, peer: &NodeId, req: GetBlooms,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         debug!("on_get_blooms req={:?}", req);
         self.throttle(peer, &req)?;
         let request_id = req.request_id;
@@ -753,7 +746,7 @@ impl Provider {
 
     fn on_get_block_txs(
         &self, io: &dyn NetworkContext, peer: &NodeId, req: GetBlockTxs,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         debug!("on_get_block_txs req={:?}", req);
         self.throttle(peer, &req)?;
         let request_id = req.request_id;
@@ -762,7 +755,7 @@ impl Provider {
             .hashes
             .into_iter()
             .take(MAX_ITEMS_TO_SEND)
-            .map::<Result<_, Error>, _>(|h| {
+            .map::<Result<_>, _>(|h| {
                 let block = self.ledger.block(h)?;
 
                 let block_txs = block
@@ -795,7 +788,7 @@ impl Provider {
 
     fn on_get_tx_infos(
         &self, io: &dyn NetworkContext, peer: &NodeId, req: GetTxInfos,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         debug!("on_get_tx_infos req={:?}", req);
         self.throttle(peer, &req)?;
         let request_id = req.request_id;
@@ -819,9 +812,7 @@ impl Provider {
         Ok(())
     }
 
-    fn storage_root(
-        &self, key: StorageRootKey,
-    ) -> Result<StorageRootWithKey, Error> {
+    fn storage_root(&self, key: StorageRootKey) -> Result<StorageRootWithKey> {
         let snapshot_epoch_count = self.ledger.snapshot_epoch_count() as u64;
 
         // state root in current snapshot period
@@ -852,7 +843,7 @@ impl Provider {
 
     fn on_get_storage_roots(
         &self, io: &dyn NetworkContext, peer: &NodeId, req: GetStorageRoots,
-    ) -> Result<(), Error> {
+    ) -> Result<()> {
         debug!("on_get_storage_roots req={:?}", req);
         self.throttle(peer, &req)?;
         let request_id = req.request_id;
@@ -882,7 +873,7 @@ impl Provider {
     fn broadcast(
         &self, io: &dyn NetworkContext, mut peers: Vec<NodeId>,
         msg: &dyn Message,
-    ) -> Result<(), Error>
+    ) -> Result<()>
     {
         debug!("broadcast peers={:?}", peers);
 
@@ -906,9 +897,7 @@ impl Provider {
         Ok(())
     }
 
-    pub fn relay_hashes(
-        self: &Arc<Self>, hashes: Vec<H256>,
-    ) -> Result<(), Error> {
+    pub fn relay_hashes(self: &Arc<Self>, hashes: Vec<H256>) -> Result<()> {
         debug!("relay_hashes hashes={:?}", hashes);
 
         if hashes.is_empty() {
@@ -919,8 +908,9 @@ impl Provider {
         let network = match self.network.upgrade() {
             Some(network) => network,
             None => {
-                error!("Network unavailable, not relaying hashes");
-                bail!(ErrorKind::InternalError);
+                bail!(ErrorKind::InternalError(
+                    "Network unavailable, not relaying hashes".to_owned()
+                ));
             }
         };
 
@@ -937,9 +927,7 @@ impl Provider {
         Ok(())
     }
 
-    fn throttle<T: Message>(
-        &self, peer: &NodeId, msg: &T,
-    ) -> Result<(), Error> {
+    fn throttle<T: Message>(&self, peer: &NodeId, msg: &T) -> Result<()> {
         let peer = self.get_existing_peer_state(peer)?;
 
         let bucket_name = msg.msg_name().to_string();
@@ -990,7 +978,7 @@ impl NetworkProtocolHandler for Provider {
                     io,
                     peer,
                     msgid::INVALID,
-                    ErrorKind::InvalidMessageFormat.into(),
+                    &ErrorKind::InvalidMessageFormat.into(),
                 )
             }
         };
@@ -998,7 +986,7 @@ impl NetworkProtocolHandler for Provider {
         debug!("on_message: peer={:?}, msgid={:?}", peer, msg_id);
 
         if let Err(e) = self.dispatch_message(io, peer, msg_id.into(), rlp) {
-            handle_error(io, peer, msg_id.into(), e);
+            handle_error(io, peer, msg_id.into(), &e);
         }
     }
 
diff --git a/core/src/light_protocol/query_service.rs b/core/src/light_protocol/query_service.rs
index 7b95385f17..db30e9b189 100644
--- a/core/src/light_protocol/query_service.rs
+++ b/core/src/light_protocol/query_service.rs
@@ -48,8 +48,10 @@ type TxInfo = (
 // Because of this, our RPC runtime cannot handle tokio@0.2 timing primitives.
 // As a temporary workaround, we use the old `tokio_timer::Timeout` instead.
 async fn with_timeout<T>(
-    d: Duration, msg: String, fut: impl Future<Output = T> + Send + Sync,
-) -> Result<T, Error> {
+    d: Duration, msg: String,
+    fut: impl Future<Output = Result<T, Error>> + Send + Sync,
+) -> Result<T, Error>
+{
     // convert `fut` into futures@0.1
     let fut = fut.unit_error().boxed().compat();
 
@@ -63,7 +65,7 @@ async fn with_timeout<T>(
     // set error message
     with_timeout
         .await
-        .map_err(|_| ErrorKind::Timeout(msg).into())
+        .map_err(|_| Error::from(ErrorKind::Timeout(msg)))?
 }
 
 pub struct QueryService {
diff --git a/core/src/sync/node_type.rs b/core/src/sync/node_type.rs
index 61fa213b32..871f007d44 100644
--- a/core/src/sync/node_type.rs
+++ b/core/src/sync/node_type.rs
@@ -5,7 +5,7 @@
 use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
 use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
 
-#[derive(Clone, Debug, PartialEq, DeriveMallocSizeOf)]
+#[derive(Clone, Copy, Debug, PartialEq, DeriveMallocSizeOf)]
 #[repr(u8)]
 pub enum NodeType {
     Archive,