Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

572 join error #670

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Bob versions changelog
- Add different logs for different branches in error on vdisk search (#808)

#### Fixed
- Error propagation for JoinHandle errors (#572)
- Ensure correct working when node contains multiple replicas of single vdisk (#654)
- Fix memory leak due to prometheus lib (#788)
- Fix for grinder delete metrics not being initialized (#824)
Expand Down
45 changes: 21 additions & 24 deletions bob/src/cluster/operations.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::link_manager::LinkManager;
use crate::prelude::*;
use super::support_types::{RemoteDeleteError, RemotePutResponse, RemotePutError};
use super::support_types::{RemoteDeleteError, RemotePutResponse, RemotePutError, NodeOutputJoinHandle};

pub(crate) type Tasks<Res, Err> = FuturesUnordered<JoinHandle<Result<NodeOutput<Res>, NodeOutput<Err>>>>;
pub(crate) type Tasks<Res, Err> = FuturesUnordered<NodeOutputJoinHandle<Res, Err>>;

// ======================= Helpers =================

Expand All @@ -23,26 +23,21 @@ impl AffectedReplicasProvider for RemotePutResponse {
}

fn process_result<TRes: AffectedReplicasProvider, TErr: Debug>(
join_res: Result<Result<NodeOutput<TRes>, NodeOutput<TErr>>, JoinError>,
join_res: Result<NodeOutput<TRes>, NodeOutput<TErr>>,
oks: &mut Vec<NodeOutput<TRes>>,
errors: &mut Vec<NodeOutput<TErr>>
) -> usize {
debug!("handle returned");
match join_res {
Ok(res) => match res {
Ok(r) =>
{
let affected_replicas = r.inner().get_affected_replicas_count();
oks.push(r);
return affected_replicas;
}
Err(e) => {
debug!("{:?}", e);
errors.push(e);
}
},
Ok(r) =>
{
let affected_replicas = r.inner().get_affected_replicas_count();
oks.push(r);
return affected_replicas;
}
Err(e) => {
error!("{:?}", e);
debug!("{:?}", e);
errors.push(e);
}
}
0
Expand All @@ -69,8 +64,8 @@ pub(crate) async fn finish_at_least_handles<TRes: AffectedReplicasProvider, TErr
async fn call_at_least<TOp, TRes: AffectedReplicasProvider, TErr: Debug>(
target_nodes: impl Iterator<Item = TOp>,
at_least: usize,
f: impl Fn(TOp) -> JoinHandle<Result<NodeOutput<TRes>, NodeOutput<TErr>>>
) -> (FuturesUnordered<JoinHandle<Result<NodeOutput<TRes>, NodeOutput<TErr>>>>, Vec<NodeOutput<TRes>>, Vec<NodeOutput<TErr>>) {
f: impl Fn(TOp) -> NodeOutputJoinHandle<TRes, TErr>
) -> (FuturesUnordered<NodeOutputJoinHandle<TRes, TErr>>, Vec<NodeOutput<TRes>>, Vec<NodeOutput<TErr>>) {
let mut handles: FuturesUnordered<_> = target_nodes.map(|op| f(op)).collect();
trace!("total handles count: {}", handles.len());
let (oks, errors) = finish_at_least_handles(&mut handles, at_least).await;
Expand All @@ -80,7 +75,7 @@ async fn call_at_least<TOp, TRes: AffectedReplicasProvider, TErr: Debug>(


async fn finish_all_handles<TErr: Debug>(
handles: &mut FuturesUnordered<JoinHandle<Result<NodeOutput<()>, NodeOutput<TErr>>>>
handles: &mut FuturesUnordered<NodeOutputJoinHandle<(), TErr>>
) -> Vec<NodeOutput<TErr>> {
let mut ok_count = 0;
let mut total_count = 0;
Expand All @@ -96,7 +91,7 @@ async fn finish_all_handles<TErr: Debug>(

async fn call_all<TOp, TErr: Debug>(
operations: impl Iterator<Item = TOp>,
f: impl Fn(TOp) -> JoinHandle<Result<NodeOutput<()>, NodeOutput<TErr>>>,
f: impl Fn(TOp) -> NodeOutputJoinHandle<(), TErr>,
) -> (usize, Vec<NodeOutput<TErr>>) {
let mut handles: FuturesUnordered<_> = operations.map(|op| f(op)).collect();
let handles_len = handles.len();
Expand Down Expand Up @@ -211,16 +206,17 @@ fn call_node_put(
node: Node,
options: BobPutOptions,
affected_replicas: usize
) -> JoinHandle<Result<NodeOutput<RemotePutResponse>, NodeOutput<RemotePutError>>> {
) -> NodeOutputJoinHandle<RemotePutResponse, RemotePutError> {
debug!("PUT[{}] put to {}", key, node.name());
let name = node.name().clone();
let task = async move {
let grpc_options = options.to_grpc();
let call_result = LinkManager::call_node(&node, |conn| conn.put(key, data, grpc_options).boxed()).await;
call_result
.map(|o| o.map(|_| RemotePutResponse::new(affected_replicas)))
.map_err(|o| o.map(|e| RemotePutError::new(affected_replicas, e)))
};
tokio::spawn(task)
NodeOutputJoinHandle::new(tokio::spawn(task), name)
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
}


Expand Down Expand Up @@ -418,15 +414,16 @@ fn call_node_delete(
meta: BobMeta,
options: BobDeleteOptions,
node: Node,
) -> JoinHandle<Result<NodeOutput<()>, NodeOutput<RemoteDeleteError>>> {
) -> NodeOutputJoinHandle<(), RemoteDeleteError> {
trace!("DELETE[{}] delete to {}", key, node.name());
let name = node.name().clone();
let task = async move {
let force_alien_nodes_copy = options.force_delete_nodes().iter().cloned().collect();
let grpc_options = options.to_grpc();
let call_result = LinkManager::call_node(&node, |conn| conn.delete(key, meta, grpc_options).boxed()).await;
call_result.map_err(|err| err.map(|inner| RemoteDeleteError::new(force_alien_nodes_copy, inner)))
};
tokio::spawn(task)
NodeOutputJoinHandle::new(tokio::spawn(task), name)
}


Expand Down
5 changes: 2 additions & 3 deletions bob/src/cluster/quorum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,15 @@ impl Quorum {
debug!("PUT[{}] ~~~BACKGROUND PUT TO REMOTE NODES~~~", key);
while let Some(join_res) = rest_tasks.next().await {
match join_res {
Ok(Ok(output)) => debug!(
Ok(output) => debug!(
"PUT[{}] successful background put to: {}",
key,
output.node_name()
),
Ok(Err(e)) => {
Err(e) => {
error!("{:?}", e);
failed_nodes.push(e.node_name().clone());
}
Err(e) => error!("{:?}", e),
}
}
debug!("PUT[{}] ~~~PUT TO REMOTE NODES ALIEN~~~", key);
Expand Down
29 changes: 29 additions & 0 deletions bob/src/cluster/support_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,32 @@ impl IndexMap {
});
}
}

pub(crate) struct NodeOutputJoinHandle<Res, Err>(JoinHandle<Result<NodeOutput<Res>, NodeOutput<Err>>>, NodeName);

impl<Res, Err> NodeOutputJoinHandle<Res, Err> {
pub(super) fn new(handle: JoinHandle<Result<NodeOutput<Res>, NodeOutput<Err>>>, node_name: NodeName) -> Self {
Self(handle, node_name)
}
}

impl<Res, Err> Future for NodeOutputJoinHandle<Res, Err> {
type Output = Result<NodeOutput<Res>, NodeOutput<Err>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
use std::task::Poll;

match self.0.poll_unpin(cx) {
Poll::Ready(Ok(r)) => Poll::Ready(r),
Poll::Ready(Err(e)) => {
error!("{:?}", e);
if e.is_panic() {
panic!("panic in thread");
}
panic!("cancellation is not supported")
},
Poll::Pending => std::task::Poll::Pending
}
}

}
Loading