From c1a2dc690d764bcb7b475c5254745fea5065f70d Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Thu, 24 Nov 2022 15:15:20 +0300 Subject: [PATCH 01/11] save joinhandle errors as made by unknown node --- CHANGELOG.md | 1 + bob/src/cluster/operations.rs | 9 +++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f349b8eb0..afe6f0f0e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Bob versions changelog #### Fixed +- Error propagation for JoinHandle errors (#572) #### Updated diff --git a/bob/src/cluster/operations.rs b/bob/src/cluster/operations.rs index bec341f3c..55d434257 100644 --- a/bob/src/cluster/operations.rs +++ b/bob/src/cluster/operations.rs @@ -57,6 +57,10 @@ fn is_result_successful( }, Err(e) => { error!("{:?}", e); + errors.push(NodeOutput::new( + "unknown".to_string(), + Error::failed(e.to_string()), + )) } } 0 @@ -278,10 +282,7 @@ pub(crate) async fn put_local_node( backend.put_local(key, data, op).await } -pub(crate) async fn delete_at_local_node( - backend: &Backend, - key: BobKey, -) -> Result<(), Error> { +pub(crate) async fn delete_at_local_node(backend: &Backend, key: BobKey) -> Result<(), Error> { debug!("local node has vdisk replica, put local"); backend.delete(key).await?; Ok(()) From e50f171bfa250251f603bf46be0e99655e9e7ef9 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Fri, 11 Aug 2023 16:31:11 +0300 Subject: [PATCH 02/11] revert style change --- bob/src/cluster/operations.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bob/src/cluster/operations.rs b/bob/src/cluster/operations.rs index 55d434257..e287926c8 100644 --- a/bob/src/cluster/operations.rs +++ b/bob/src/cluster/operations.rs @@ -282,7 +282,10 @@ pub(crate) async fn put_local_node( backend.put_local(key, data, op).await } -pub(crate) async fn delete_at_local_node(backend: &Backend, key: BobKey) -> Result<(), Error> { +pub(crate) async fn delete_at_local_node( + backend: &Backend, + key: BobKey +) -> Result<(), Error> { debug!("local node has vdisk replica, put local"); backend.delete(key).await?; Ok(()) From 54bf9a58bd6d4d46a3ce49d79ca4646113f9db3f Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Fri, 11 Aug 2023 16:31:38 +0300 Subject: [PATCH 03/11] add missing comma --- bob/src/cluster/operations.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bob/src/cluster/operations.rs b/bob/src/cluster/operations.rs index e287926c8..a924dd86e 100644 --- a/bob/src/cluster/operations.rs +++ b/bob/src/cluster/operations.rs @@ -284,7 +284,7 @@ pub(crate) async fn put_local_node( pub(crate) async fn delete_at_local_node( backend: &Backend, - key: BobKey + key: BobKey, ) -> Result<(), Error> { debug!("local node has vdisk replica, put local"); backend.delete(key).await?; From c5680044ee4f7c84774d151ffac3767c69fc2203 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Mon, 14 Aug 2023 17:21:52 +0300 Subject: [PATCH 04/11] move joinhandle error processing --- bob/src/cluster/operations.rs | 50 ++++++++++++++++++----------------- bob/src/cluster/quorum.rs | 7 +++-- 2 files changed, 29 insertions(+), 28 deletions(-) diff --git a/bob/src/cluster/operations.rs b/bob/src/cluster/operations.rs index a924dd86e..a6c3993a6 100644 --- a/bob/src/cluster/operations.rs +++ b/bob/src/cluster/operations.rs @@ -1,7 +1,7 @@ use crate::link_manager::LinkManager; use crate::prelude::*; -pub(crate) type Tasks = FuturesUnordered, NodeOutput>>>; +pub(crate) type Tasks = FuturesUnordered, NodeOutput>> + Send>>>; pub(crate) async fn get_any( key: BobKey, @@ -22,54 +22,54 @@ fn call_node_put( data: BobData, node: Node, options: PutOptions, -) -> JoinHandle, NodeOutput>> { +) -> impl Future, NodeOutput>> { debug!("PUT[{}] put to {}", key, node.name()); + let name = node.name().to_string(); let task = async move { LinkManager::call_node(&node, |conn| conn.put(key, data, options).boxed()).await }; - tokio::spawn(task) + tokio::spawn(task).map(|r| match r { + Ok(r) => r, + Err(e) => Err(NodeOutput::new(name, Error::failed(e.to_string()))), + }) } fn call_node_delete( key: BobKey, options: DeleteOptions, node: Node, -) -> JoinHandle, NodeOutput>> { +) -> impl Future, NodeOutput>> { debug!("DELETE[{}] delete to {}", key, node.name()); + let name = node.name().to_string(); let task = async move { LinkManager::call_node(&node, |conn| conn.delete(key, options).boxed()).await }; - tokio::spawn(task) + tokio::spawn(task).map(|r| match r { + Ok(r) => r, + Err(e) => Err(NodeOutput::new(name, Error::failed(e.to_string()))), + }) } fn is_result_successful( - join_res: Result, NodeOutput>, JoinError>, + join_res: Result, NodeOutput>, errors: &mut Vec>, ) -> usize { debug!("handle returned"); match join_res { - Ok(res) => match res { - Ok(_) => return 1, - Err(e) => { - error!("{:?}", e); - errors.push(e); - } - }, + Ok(_) => return 1, Err(e) => { error!("{:?}", e); - errors.push(NodeOutput::new( - "unknown".to_string(), - Error::failed(e.to_string()), - )) + errors.push(e); } } 0 } -async fn finish_at_least_handles( - handles: &mut FuturesUnordered, NodeOutput>>>, +async fn finish_at_least_handles( + handles: &mut FuturesUnordered, at_least: usize, -) -> Vec> { +) -> Vec> +where R: Future, NodeOutput>>{ let mut ok_count = 0; let mut errors = Vec::new(); while ok_count < at_least { @@ -110,12 +110,14 @@ pub(crate) async fn delete_at_nodes( errors } -async fn call_at_least( +async fn call_at_least( target_nodes: impl Iterator, at_least: usize, - f: impl Fn(Node) -> JoinHandle, NodeOutput>>, -) -> (Tasks, Vec>) { - let mut handles: FuturesUnordered<_> = target_nodes.cloned().map(|node| f(node)).collect(); + f: F, +) -> (Tasks, Vec>) +where F: Fn(Node) -> R, + R: Future, NodeOutput>> + Send + 'static { + let mut handles: Tasks = target_nodes.cloned().map(|node| f(node).boxed()).collect(); debug!("total handles count: {}", handles.len()); let errors = finish_at_least_handles(&mut handles, at_least).await; debug!("remains: {}, errors: {}", handles.len(), errors.len()); diff --git a/bob/src/cluster/quorum.rs b/bob/src/cluster/quorum.rs index 76e637925..9e22a112d 100644 --- a/bob/src/cluster/quorum.rs +++ b/bob/src/cluster/quorum.rs @@ -120,16 +120,15 @@ impl Quorum { debug!("PUT[{}] ~~~BACKGROUND PUT TO REMOTE NODES~~~", key); while let Some(join_res) = rest_tasks.next().await { match join_res { - Ok(Ok(output)) => debug!( + Ok(output) => debug!( "PUT[{}] successful background put to: {}", key, output.node_name() ), - Ok(Err(e)) => { + Err(e) => { error!("{:?}", e); failed_nodes.push(e.node_name().to_string()); } - Err(e) => error!("{:?}", e), } } debug!("PUT[{}] ~~~PUT TO REMOTE NODES ALIEN~~~", key); @@ -153,7 +152,7 @@ impl Quorum { key, target_nodes.len(), ); - let target_nodes = target_nodes.iter().filter(|node| node.name() != local_node); + let target_nodes = target_nodes.iter().filter(move |node| node.name() != local_node); put_at_least(key, data, target_nodes, at_least, PutOptions::new_local()).await } From 932a37d589397a19538e236273ad3e7b25308069 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Thu, 17 Aug 2023 16:36:31 +0300 Subject: [PATCH 05/11] Revert "move joinhandle error processing" This reverts commit c5680044ee4f7c84774d151ffac3767c69fc2203. --- bob/src/cluster/operations.rs | 50 +++++++++++++++++------------------ bob/src/cluster/quorum.rs | 7 ++--- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/bob/src/cluster/operations.rs b/bob/src/cluster/operations.rs index a6c3993a6..a924dd86e 100644 --- a/bob/src/cluster/operations.rs +++ b/bob/src/cluster/operations.rs @@ -1,7 +1,7 @@ use crate::link_manager::LinkManager; use crate::prelude::*; -pub(crate) type Tasks = FuturesUnordered, NodeOutput>> + Send>>>; +pub(crate) type Tasks = FuturesUnordered, NodeOutput>>>; pub(crate) async fn get_any( key: BobKey, @@ -22,54 +22,54 @@ fn call_node_put( data: BobData, node: Node, options: PutOptions, -) -> impl Future, NodeOutput>> { +) -> JoinHandle, NodeOutput>> { debug!("PUT[{}] put to {}", key, node.name()); - let name = node.name().to_string(); let task = async move { LinkManager::call_node(&node, |conn| conn.put(key, data, options).boxed()).await }; - tokio::spawn(task).map(|r| match r { - Ok(r) => r, - Err(e) => Err(NodeOutput::new(name, Error::failed(e.to_string()))), - }) + tokio::spawn(task) } fn call_node_delete( key: BobKey, options: DeleteOptions, node: Node, -) -> impl Future, NodeOutput>> { +) -> JoinHandle, NodeOutput>> { debug!("DELETE[{}] delete to {}", key, node.name()); - let name = node.name().to_string(); let task = async move { LinkManager::call_node(&node, |conn| conn.delete(key, options).boxed()).await }; - tokio::spawn(task).map(|r| match r { - Ok(r) => r, - Err(e) => Err(NodeOutput::new(name, Error::failed(e.to_string()))), - }) + tokio::spawn(task) } fn is_result_successful( - join_res: Result, NodeOutput>, + join_res: Result, NodeOutput>, JoinError>, errors: &mut Vec>, ) -> usize { debug!("handle returned"); match join_res { - Ok(_) => return 1, + Ok(res) => match res { + Ok(_) => return 1, + Err(e) => { + error!("{:?}", e); + errors.push(e); + } + }, Err(e) => { error!("{:?}", e); - errors.push(e); + errors.push(NodeOutput::new( + "unknown".to_string(), + Error::failed(e.to_string()), + )) } } 0 } -async fn finish_at_least_handles( - handles: &mut FuturesUnordered, +async fn finish_at_least_handles( + handles: &mut FuturesUnordered, NodeOutput>>>, at_least: usize, -) -> Vec> -where R: Future, NodeOutput>>{ +) -> Vec> { let mut ok_count = 0; let mut errors = Vec::new(); while ok_count < at_least { @@ -110,14 +110,12 @@ pub(crate) async fn delete_at_nodes( errors } -async fn call_at_least( +async fn call_at_least( target_nodes: impl Iterator, at_least: usize, - f: F, -) -> (Tasks, Vec>) -where F: Fn(Node) -> R, - R: Future, NodeOutput>> + Send + 'static { - let mut handles: Tasks = target_nodes.cloned().map(|node| f(node).boxed()).collect(); + f: impl Fn(Node) -> JoinHandle, NodeOutput>>, +) -> (Tasks, Vec>) { + let mut handles: FuturesUnordered<_> = target_nodes.cloned().map(|node| f(node)).collect(); debug!("total handles count: {}", handles.len()); let errors = finish_at_least_handles(&mut handles, at_least).await; debug!("remains: {}, errors: {}", handles.len(), errors.len()); diff --git a/bob/src/cluster/quorum.rs b/bob/src/cluster/quorum.rs index 9e22a112d..76e637925 100644 --- a/bob/src/cluster/quorum.rs +++ b/bob/src/cluster/quorum.rs @@ -120,15 +120,16 @@ impl Quorum { debug!("PUT[{}] ~~~BACKGROUND PUT TO REMOTE NODES~~~", key); while let Some(join_res) = rest_tasks.next().await { match join_res { - Ok(output) => debug!( + Ok(Ok(output)) => debug!( "PUT[{}] successful background put to: {}", key, output.node_name() ), - Err(e) => { + Ok(Err(e)) => { error!("{:?}", e); failed_nodes.push(e.node_name().to_string()); } + Err(e) => error!("{:?}", e), } } debug!("PUT[{}] ~~~PUT TO REMOTE NODES ALIEN~~~", key); @@ -152,7 +153,7 @@ impl Quorum { key, target_nodes.len(), ); - let target_nodes = target_nodes.iter().filter(move |node| node.name() != local_node); + let target_nodes = target_nodes.iter().filter(|node| node.name() != local_node); put_at_least(key, data, target_nodes, at_least, PutOptions::new_local()).await } From 7178710e5df3de3c841a6911124e0b852b4fdd62 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Thu, 17 Aug 2023 16:37:18 +0300 Subject: [PATCH 06/11] add panic --- bob/src/cluster/operations.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/bob/src/cluster/operations.rs b/bob/src/cluster/operations.rs index a924dd86e..5e85fd2ec 100644 --- a/bob/src/cluster/operations.rs +++ b/bob/src/cluster/operations.rs @@ -57,10 +57,7 @@ fn is_result_successful( }, Err(e) => { error!("{:?}", e); - errors.push(NodeOutput::new( - "unknown".to_string(), - Error::failed(e.to_string()), - )) + panic!("thread panicked"); } } 0 From 223b88f14c563c29ca3135e0ed38cc482b270b4b Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Thu, 17 Aug 2023 16:41:13 +0300 Subject: [PATCH 07/11] add missing panic --- bob/src/cluster/operations.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/bob/src/cluster/operations.rs b/bob/src/cluster/operations.rs index 5e495f99e..76023d071 100644 --- a/bob/src/cluster/operations.rs +++ b/bob/src/cluster/operations.rs @@ -21,6 +21,7 @@ fn is_result_successful( }, Err(e) => { error!("{:?}", e); + panic!("panic in thread"); } } 0 From 8bcc6a62b0ade5ff63725f60399547b0ff7f8dad Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Thu, 31 Aug 2023 15:25:38 +0300 Subject: [PATCH 08/11] fix discussion --- bob-common/src/node.rs | 9 ++++++++- bob/src/cluster/operations.rs | 17 ++++++++++------- bob/src/cluster/support_types.rs | 24 ++++++++++++++++++++++++ 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/bob-common/src/node.rs b/bob-common/src/node.rs index 05645e612..f871e4281 100644 --- a/bob-common/src/node.rs +++ b/bob-common/src/node.rs @@ -176,10 +176,17 @@ impl Output { inner: map_fn(self.inner), } } + + pub fn with_node_name(self, node_name: NodeName) -> Output { + Self { + node_name, + inner: self.inner + } + } } impl Output { pub fn timestamp(&self) -> u64 { self.inner.meta().timestamp() } -} \ No newline at end of file +} diff --git a/bob/src/cluster/operations.rs b/bob/src/cluster/operations.rs index 76023d071..010da585f 100644 --- a/bob/src/cluster/operations.rs +++ b/bob/src/cluster/operations.rs @@ -1,8 +1,8 @@ use crate::link_manager::LinkManager; use crate::prelude::*; -use super::support_types::RemoteDeleteError; +use super::support_types::{RemoteDeleteError, NodeOutputJoinHandle}; -pub(crate) type Tasks = FuturesUnordered, NodeOutput>>>; +pub(crate) type Tasks = FuturesUnordered>; // ======================= Helpers ================= @@ -21,7 +21,9 @@ fn is_result_successful( }, Err(e) => { error!("{:?}", e); - panic!("panic in thread"); + if e.is_panic() { + panic!("panic in thread"); + } } } 0 @@ -47,8 +49,8 @@ pub(crate) async fn finish_at_least_handles( async fn call_at_least( target_nodes: impl Iterator, at_least: usize, - f: impl Fn(TOp) -> JoinHandle, NodeOutput>>, -) -> (FuturesUnordered, NodeOutput>>>, Vec>) { + f: impl Fn(TOp) -> NodeOutputJoinHandle, +) -> (FuturesUnordered>, Vec>) { let mut handles: FuturesUnordered<_> = target_nodes.map(|op| f(op)).collect(); trace!("total handles count: {}", handles.len()); let errors = finish_at_least_handles(&mut handles, at_least).await; @@ -189,13 +191,14 @@ fn call_node_put( data: BobData, node: Node, options: BobPutOptions, -) -> JoinHandle, NodeOutput>> { +) -> NodeOutputJoinHandle { debug!("PUT[{}] put to {}", key, node.name()); + let name = node.name().clone(); let task = async move { let grpc_options = options.to_grpc(); LinkManager::call_node(&node, |conn| conn.put(key, data, grpc_options).boxed()).await }; - tokio::spawn(task) + NodeOutputJoinHandle::new(tokio::spawn(task), name) } diff --git a/bob/src/cluster/support_types.rs b/bob/src/cluster/support_types.rs index 7e9bafb82..219007125 100644 --- a/bob/src/cluster/support_types.rs +++ b/bob/src/cluster/support_types.rs @@ -128,3 +128,27 @@ impl IndexMap { }); } } + +pub(crate) struct NodeOutputJoinHandle(JoinHandle, NodeOutput>>, NodeName); + +impl NodeOutputJoinHandle { + pub(super) fn new(handle: JoinHandle, NodeOutput>>, node_name: NodeName) -> Self { + Self(handle, node_name) + } +} + +impl Future for NodeOutputJoinHandle { + type Output = Result, NodeOutput>, JoinError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { + use std::task::Poll; + + match self.0.poll_unpin(cx) { + Poll::Ready(r) => Poll::Ready(r.map(|r| r + .map(|o| o.with_node_name(self.1.clone())) + .map_err(|o| o.with_node_name(self.1.clone())))), + Poll::Pending => std::task::Poll::Pending + } + } + +} From 5b40f4fba9657f3b4aedc70b0e995335370f3ff9 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Wed, 6 Sep 2023 16:14:02 +0300 Subject: [PATCH 09/11] fix bob/832 --- bob-backend/src/pearl/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bob-backend/src/pearl/utils.rs b/bob-backend/src/pearl/utils.rs index 72b1569c3..98b8f1d59 100755 --- a/bob-backend/src/pearl/utils.rs +++ b/bob-backend/src/pearl/utils.rs @@ -93,7 +93,7 @@ impl Utils { Error::failed(format!("smth wrong with time: {:?}, error: {}", period, e)) }) .map(|period| { - let time = DateTime::from_utc( + let time = DateTime::from_naive_utc_and_offset( NaiveDateTime::from_timestamp_opt(time.try_into().unwrap(), 0).expect("time out of range"), Utc, ); From 6b1729c9dc4466c44f2f431d65bd8f1a9a5fc47c Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Wed, 29 Nov 2023 17:13:29 +0300 Subject: [PATCH 10/11] fix misuse of new type --- bob/src/cluster/operations.rs | 25 +++++++++---------------- bob/src/cluster/quorum.rs | 5 ++--- bob/src/cluster/support_types.rs | 13 +++++++++---- 3 files changed, 20 insertions(+), 23 deletions(-) diff --git a/bob/src/cluster/operations.rs b/bob/src/cluster/operations.rs index 010da585f..534f7a99c 100644 --- a/bob/src/cluster/operations.rs +++ b/bob/src/cluster/operations.rs @@ -7,23 +7,15 @@ pub(crate) type Tasks = FuturesUnordered>; // ======================= Helpers ================= fn is_result_successful( - join_res: Result, NodeOutput>, JoinError>, + join_res: Result, NodeOutput>, errors: &mut Vec>, ) -> usize { debug!("handle returned"); match join_res { - Ok(res) => match res { - Ok(_) => return 1, - Err(e) => { - debug!("{:?}", e); - errors.push(e); - } - }, + Ok(_) => return 1, Err(e) => { - error!("{:?}", e); - if e.is_panic() { - panic!("panic in thread"); - } + debug!("{:?}", e); + errors.push(e); } } 0 @@ -60,7 +52,7 @@ async fn call_at_least( async fn finish_all_handles( - handles: &mut FuturesUnordered, NodeOutput>>> + handles: &mut FuturesUnordered> ) -> Vec> { let mut ok_count = 0; let mut total_count = 0; @@ -75,7 +67,7 @@ async fn finish_all_handles( async fn call_all( operations: impl Iterator, - f: impl Fn(TOp) -> JoinHandle, NodeOutput>>, + f: impl Fn(TOp) -> NodeOutputJoinHandle, ) -> (usize, Vec>) { let mut handles: FuturesUnordered<_> = operations.map(|op| f(op)).collect(); let handles_len = handles.len(); @@ -375,15 +367,16 @@ fn call_node_delete( meta: BobMeta, options: BobDeleteOptions, node: Node, -) -> JoinHandle, NodeOutput>> { +) -> NodeOutputJoinHandle { trace!("DELETE[{}] delete to {}", key, node.name()); + let name = node.name().clone(); let task = async move { let force_alien_nodes_copy = options.force_delete_nodes().iter().cloned().collect(); let grpc_options = options.to_grpc(); let call_result = LinkManager::call_node(&node, |conn| conn.delete(key, meta, grpc_options).boxed()).await; call_result.map_err(|err| err.map(|inner| RemoteDeleteError::new(force_alien_nodes_copy, inner))) }; - tokio::spawn(task) + NodeOutputJoinHandle::new(tokio::spawn(task), name) } diff --git a/bob/src/cluster/quorum.rs b/bob/src/cluster/quorum.rs index a1a8cdef5..42657d5d9 100644 --- a/bob/src/cluster/quorum.rs +++ b/bob/src/cluster/quorum.rs @@ -100,16 +100,15 @@ impl Quorum { debug!("PUT[{}] ~~~BACKGROUND PUT TO REMOTE NODES~~~", key); while let Some(join_res) = rest_tasks.next().await { match join_res { - Ok(Ok(output)) => debug!( + Ok(output) => debug!( "PUT[{}] successful background put to: {}", key, output.node_name() ), - Ok(Err(e)) => { + Err(e) => { error!("{:?}", e); failed_nodes.push(e.node_name().clone()); } - Err(e) => error!("{:?}", e), } } debug!("PUT[{}] ~~~PUT TO REMOTE NODES ALIEN~~~", key); diff --git a/bob/src/cluster/support_types.rs b/bob/src/cluster/support_types.rs index 219007125..0d5256ba7 100644 --- a/bob/src/cluster/support_types.rs +++ b/bob/src/cluster/support_types.rs @@ -138,15 +138,20 @@ impl NodeOutputJoinHandle { } impl Future for NodeOutputJoinHandle { - type Output = Result, NodeOutput>, JoinError>; + type Output = Result, NodeOutput>; fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { use std::task::Poll; match self.0.poll_unpin(cx) { - Poll::Ready(r) => Poll::Ready(r.map(|r| r - .map(|o| o.with_node_name(self.1.clone())) - .map_err(|o| o.with_node_name(self.1.clone())))), + Poll::Ready(Ok(r)) => Poll::Ready(r), + Poll::Ready(Err(e)) => { + error!("{:?}", e); + if e.is_panic() { + panic!("panic in thread"); + } + panic!("cancellation is not supported") + }, Poll::Pending => std::task::Poll::Pending } } From 2e20b2ab2af773b98ff15f7d48c7fbbcf9bfb6c5 Mon Sep 17 00:00:00 2001 From: Ivan Druzhitskiy Date: Wed, 29 Nov 2023 17:27:54 +0300 Subject: [PATCH 11/11] remove unnecessary change --- bob-common/src/node.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/bob-common/src/node.rs b/bob-common/src/node.rs index f871e4281..01614cff5 100644 --- a/bob-common/src/node.rs +++ b/bob-common/src/node.rs @@ -176,13 +176,6 @@ impl Output { inner: map_fn(self.inner), } } - - pub fn with_node_name(self, node_name: NodeName) -> Output { - Self { - node_name, - inner: self.inner - } - } } impl Output {