diff --git a/CHANGELOG.md b/CHANGELOG.md index e0001e562..bd40fed92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ Bob versions changelog - Add different logs for different branches in error on vdisk search (#808) #### Fixed +- Error propagation for JoinHandle errors (#572) - Ensure correct working when node contains multiple replicas of single vdisk (#654) - Fix memory leak due to prometheus lib (#788) - Fix for grinder delete metrics not being initialized (#824) diff --git a/bob/src/cluster/operations.rs b/bob/src/cluster/operations.rs index f2789e6d7..4acb1c585 100644 --- a/bob/src/cluster/operations.rs +++ b/bob/src/cluster/operations.rs @@ -1,8 +1,8 @@ use crate::link_manager::LinkManager; use crate::prelude::*; -use super::support_types::{RemoteDeleteError, RemotePutResponse, RemotePutError}; +use super::support_types::{RemoteDeleteError, RemotePutResponse, RemotePutError, NodeOutputJoinHandle}; -pub(crate) type Tasks = FuturesUnordered, NodeOutput>>>; +pub(crate) type Tasks = FuturesUnordered>; // ======================= Helpers ================= @@ -23,26 +23,21 @@ impl AffectedReplicasProvider for RemotePutResponse { } fn process_result( - join_res: Result, NodeOutput>, JoinError>, + join_res: Result, NodeOutput>, oks: &mut Vec>, errors: &mut Vec> ) -> usize { debug!("handle returned"); match join_res { - Ok(res) => match res { - Ok(r) => - { - let affected_replicas = r.inner().get_affected_replicas_count(); - oks.push(r); - return affected_replicas; - } - Err(e) => { - debug!("{:?}", e); - errors.push(e); - } - }, + Ok(r) => + { + let affected_replicas = r.inner().get_affected_replicas_count(); + oks.push(r); + return affected_replicas; + } Err(e) => { - error!("{:?}", e); + debug!("{:?}", e); + errors.push(e); } } 0 @@ -69,8 +64,8 @@ pub(crate) async fn finish_at_least_handles( target_nodes: impl Iterator, at_least: usize, - f: impl Fn(TOp) -> JoinHandle, NodeOutput>> -) -> (FuturesUnordered, NodeOutput>>>, Vec>, Vec>) { + f: impl Fn(TOp) -> NodeOutputJoinHandle +) -> (FuturesUnordered>, Vec>, Vec>) { let mut handles: FuturesUnordered<_> = target_nodes.map(|op| f(op)).collect(); trace!("total handles count: {}", handles.len()); let (oks, errors) = finish_at_least_handles(&mut handles, at_least).await; @@ -80,7 +75,7 @@ async fn call_at_least( async fn finish_all_handles( - handles: &mut FuturesUnordered, NodeOutput>>> + handles: &mut FuturesUnordered> ) -> Vec> { let mut ok_count = 0; let mut total_count = 0; @@ -96,7 +91,7 @@ async fn finish_all_handles( async fn call_all( operations: impl Iterator, - f: impl Fn(TOp) -> JoinHandle, NodeOutput>>, + f: impl Fn(TOp) -> NodeOutputJoinHandle<(), TErr>, ) -> (usize, Vec>) { let mut handles: FuturesUnordered<_> = operations.map(|op| f(op)).collect(); let handles_len = handles.len(); @@ -211,8 +206,9 @@ fn call_node_put( node: Node, options: BobPutOptions, affected_replicas: usize -) -> JoinHandle, NodeOutput>> { +) -> NodeOutputJoinHandle { debug!("PUT[{}] put to {}", key, node.name()); + let name = node.name().clone(); let task = async move { let grpc_options = options.to_grpc(); let call_result = LinkManager::call_node(&node, |conn| conn.put(key, data, grpc_options).boxed()).await; @@ -220,7 +216,7 @@ fn call_node_put( .map(|o| o.map(|_| RemotePutResponse::new(affected_replicas))) .map_err(|o| o.map(|e| RemotePutError::new(affected_replicas, e))) }; - tokio::spawn(task) + NodeOutputJoinHandle::new(tokio::spawn(task), name) } @@ -418,15 +414,16 @@ fn call_node_delete( meta: BobMeta, options: BobDeleteOptions, node: Node, -) -> JoinHandle, NodeOutput>> { +) -> NodeOutputJoinHandle<(), RemoteDeleteError> { trace!("DELETE[{}] delete to {}", key, node.name()); + let name = node.name().clone(); let task = async move { let force_alien_nodes_copy = options.force_delete_nodes().iter().cloned().collect(); let grpc_options = options.to_grpc(); let call_result = LinkManager::call_node(&node, |conn| conn.delete(key, meta, grpc_options).boxed()).await; call_result.map_err(|err| err.map(|inner| RemoteDeleteError::new(force_alien_nodes_copy, inner))) }; - tokio::spawn(task) + NodeOutputJoinHandle::new(tokio::spawn(task), name) } diff --git a/bob/src/cluster/quorum.rs b/bob/src/cluster/quorum.rs index 4667fc631..9b370a788 100644 --- a/bob/src/cluster/quorum.rs +++ b/bob/src/cluster/quorum.rs @@ -97,16 +97,15 @@ impl Quorum { debug!("PUT[{}] ~~~BACKGROUND PUT TO REMOTE NODES~~~", key); while let Some(join_res) = rest_tasks.next().await { match join_res { - Ok(Ok(output)) => debug!( + Ok(output) => debug!( "PUT[{}] successful background put to: {}", key, output.node_name() ), - Ok(Err(e)) => { + Err(e) => { error!("{:?}", e); failed_nodes.push(e.node_name().clone()); } - Err(e) => error!("{:?}", e), } } debug!("PUT[{}] ~~~PUT TO REMOTE NODES ALIEN~~~", key); diff --git a/bob/src/cluster/support_types.rs b/bob/src/cluster/support_types.rs index d1eafd5f5..5b27d215a 100644 --- a/bob/src/cluster/support_types.rs +++ b/bob/src/cluster/support_types.rs @@ -157,3 +157,32 @@ impl IndexMap { }); } } + +pub(crate) struct NodeOutputJoinHandle(JoinHandle, NodeOutput>>, NodeName); + +impl NodeOutputJoinHandle { + pub(super) fn new(handle: JoinHandle, NodeOutput>>, node_name: NodeName) -> Self { + Self(handle, node_name) + } +} + +impl Future for NodeOutputJoinHandle { + type Output = Result, NodeOutput>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { + use std::task::Poll; + + match self.0.poll_unpin(cx) { + Poll::Ready(Ok(r)) => Poll::Ready(r), + Poll::Ready(Err(e)) => { + error!("{:?}", e); + if e.is_panic() { + panic!("panic in thread"); + } + panic!("cancellation is not supported") + }, + Poll::Pending => std::task::Poll::Pending + } + } + +}