-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathheartbeats.rs
82 lines (70 loc) · 3.11 KB
/
heartbeats.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
use std::collections::HashMap;
use std::time::Duration;
use hydroflow_plus::*;
use stageleft::*;
use super::heartbeats_protocol::Message;
/// Builds a heartbeat-based failure detector on a cluster.
///
/// Every 100ms each member broadcasts a `Message::Heartbeat` to the cluster. Receivers reply
/// to the sender with a `Message::HeartbeatAck` (most of which are deliberately dropped for
/// testing). Each member keeps a map from peer id to the number of heartbeats sent since the
/// last ack received from that peer. Once per second, every peer whose count has reached 3 is
/// printed as dead.
///
/// * `flow` - the flow builder the dataflow is added to.
/// * `cluster_spec` - the spec used to instantiate the cluster.
///
/// Returns the cluster handle created from `cluster_spec`.
pub fn heartbeats<'a, D: Deploy<'a>>(
    flow: &'a FlowBuilder<'a, D>,
    cluster_spec: &impl ClusterSpec<'a, D>,
) -> D::Cluster {
    let cluster = flow.cluster(cluster_spec);

    // members: a persistent hf+ collection of the cluster ids
    let members = cluster.source_iter(cluster.ids()).all_ticks(); // persistent state across ticks

    // generate a heartbeat every 100ms
    let hbs = cluster
        .source_interval(q!(Duration::from_millis(100)))
        .map(q!(|_| Message::Heartbeat))
        .tick_batch(); // transient state per tick

    // pair each heartbeat with every recipient id: (recipient_id, Message::Heartbeat).
    // these pairs are what the fold below counts as "sent but not yet acked".
    let node_ack_pairs = hbs
        .cross_product(&members.cloned())
        .map(q!(|(msg, id)| (id, msg)));

    // scatter heartbeats, gather acks
    let acks = hbs
        .broadcast_bincode_tagged(&cluster) // broadcast to cluster, tagged with sender id
        // at each cluster member
        .inspect(q!(|n| println!("Received {:?}", n))) // debugging info
        .map(q!(|(id, _m)| (id, Message::HeartbeatAck))) // generate an Ack
        // artificial drop for testing: only acks with a random draw below 0.3 are kept,
        // i.e. roughly 30% of acks are returned and the rest are discarded
        .filter(q!(|_m| rand::random::<f32>() < 0.3))
        .demux_bincode_tagged(&cluster) // return Ack to sender
        // back at sender
        .inspect(q!(|n| println!("Got Ack {:?}", n)))
        .tick_batch();

    // track each node's sent heartbeats and reset the unacked count to 0 when an ack comes in
    // this state is persistent across ticks (all_ticks())
    let unacked_hbs = node_ack_pairs.union(&acks).all_ticks().fold(
        q!(HashMap::<u32, usize>::new), // state is a hashmap from node_id => count
        q!(|accum, (id, msg)| {
            match msg {
                Message::Heartbeat => {
                    // increment for each Heartbeat
                    *accum.entry(id).or_insert(0) += 1
                }
                Message::HeartbeatAck => {
                    // reset to 0 on any HeartbeatAck
                    *accum.entry(id).or_insert(0) = 0
                }
            }
        }),
    );

    // every second, check which nodes have missed their last 3 heartbeats or more
    cluster
        .source_interval(q!(Duration::from_millis(1000)))
        .tick_batch() // this "pulse" is transient state
        .cross_product(&unacked_hbs) // attach a handle to the unacked_hbs state
        .map(q!(|(_, unacked_hbs)| unacked_hbs))
        .flat_map(q!(|h| h.into_iter())) // go through the entries of unacked_hbs
        // a node is reported once 3 or more heartbeats in a row have gone unacked
        // (`>=`, not `>`, so the threshold really is 3 as documented above)
        .filter(q!(|(_key, value)| *value >= 3))
        .for_each(q!(|n| println!("---------\ndead_list: {:?}\n---------", n))); // debugging

    cluster
}
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};
// Staged entry point for the heartbeat dataflow when run under the Hydro CLI runtime.
// Adds the `heartbeats` dataflow to `flow`, using `cli` as the cluster spec, and then
// builds the flow for the subgraph id read from `cli.meta.subgraph_id`.
#[stageleft::entry]
pub fn heartbeats_runtime<'a>(
flow: &'a FlowBuilder<'a, CLIRuntime>,
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
// the returned cluster handle is not needed here; the call is made for the operators
// it registers on `flow`
let _ = heartbeats(flow, &cli);
flow.build(q!(cli.meta.subgraph_id))
}