refactor: use listen and trigger universally #164
Conversation
Thanks for the PR! A few structural/stylistic comments on first pass.
One high-level question I'm wondering about is whether we can simplify this by always triggering shutdown whenever we break one of the loop/select patterns in the code. Right now we have to inline the trigger on shutdown, but if the expected behavior is to break the loop on error we could just have a single trigger on break? Perhaps with the exception of listener triggering, because we know we can just return there.
Update: see comment below.
I think that could lead to a simpler understanding of the shutdown logic (which is a nightmare right now), but I haven't looked at whether it works in every instance across the codebase. Interested to hear thoughts on this from others!
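The "single trigger on break" idea can be sketched outside the codebase. This is a minimal std-thread analogue, with a hand-rolled `Shutdown` flag standing in for the triggered crate's `Trigger`/`Listener` pair and `worker`/`fail_at` as hypothetical names; the real code is async tokio, not threads:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Stand-in for the triggered crate's Trigger/Listener pair.
#[derive(Clone)]
struct Shutdown(Arc<AtomicBool>);

impl Shutdown {
    fn new() -> Self {
        Shutdown(Arc::new(AtomicBool::new(false)))
    }
    fn trigger(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    fn triggered(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

// A worker loop: every exit path falls through to a single trigger()
// call on the way out, instead of sprinkling trigger() on each error branch.
fn worker(shutdown: Shutdown, fail_at: u32) {
    let mut i = 0;
    loop {
        if shutdown.triggered() {
            break; // a peer already shut us down
        }
        i += 1;
        if i == fail_at {
            break; // simulated error: break the loop...
        }
        thread::sleep(Duration::from_millis(1));
    }
    shutdown.trigger(); // ...and trigger exactly once, on break
}

fn main() {
    let shutdown = Shutdown::new();
    let s1 = shutdown.clone();
    let s2 = shutdown.clone();
    let t1 = thread::spawn(move || worker(s1, 5)); // this worker "errors"
    let t2 = thread::spawn(move || worker(s2, u32::MAX)); // this one only listens
    t1.join().unwrap();
    t2.join().unwrap();
    assert!(shutdown.triggered());
    println!("all workers exited");
}
```

Because triggering is idempotent, it's harmless that the listening worker also calls `trigger()` when it exits; that's what makes the single-trigger-on-break pattern safe.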
Discussed this PR a little more offline. Another option for a cleaner/more testable solution would be:
This saves us from having to pass shutdown all the way down to every task, and cuts down on the number of places where we need to call it. E.g., for our simulation results task:
Can be rebased on #160 and sincerest apologies in advance for all the rebase conflicts :')
Force-pushed 26a6378 to a86e3f6.
I have refactored the handling of errors across all tasks. This reduces the number of places where we trigger shutdown. In the snippet from sim-lib/lib.rs::run below, we await the completion of all tasks in the join set:
while let Some(res) = tasks.join_next().await {
    if let Err(e) = res {
        log::error!("Task exited with error: {e}.");
        success = false;
        self.shutdown();
        break;
    }
}
This would mean we have to have another loop waiting for all tasks to exit (post shutdown).
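The concern about needing a second drain loop can be illustrated with std threads standing in for the tokio `JoinSet` (`run_all` and the error type are illustrative names, not the crate's API). Here the loop records the first failure and keeps draining rather than breaking, so no task is left detached:

```rust
use std::thread::{self, JoinHandle};

// Await every task; note the first error but keep draining so that a
// single loop suffices and no second "wait for the rest" loop is needed.
fn run_all(handles: Vec<JoinHandle<Result<(), String>>>) -> bool {
    let mut success = true;
    for handle in handles {
        match handle.join() {
            Ok(Ok(())) => {}
            Ok(Err(e)) => {
                eprintln!("Task exited with error: {e}.");
                success = false;
                // in the real code we'd also trigger shutdown here
            }
            Err(_) => success = false, // task panicked
        }
    }
    success
}

fn main() {
    let handles: Vec<JoinHandle<Result<(), String>>> = vec![
        thread::spawn(|| Ok(())),
        thread::spawn(|| Err("boom".to_string())),
        thread::spawn(|| Ok(())),
    ];
    assert!(!run_all(handles));
    println!("all tasks drained");
}
```

The trade-off versus `break` on first error is latency: draining waits for the remaining tasks, which is fine once shutdown has been triggered and they all exit promptly.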
vrrynice. Just some annoying nitpicking about logging and breaking from me, I think this is almost there!
Only major comment is about cleaning up the result writer functions a bit, but that's pre-existing.
docs/ARCHITECTURE.md
1. [Triggered](https://docs.rs/triggered/latest/triggered): a `Trigger` that can be used to inform threads that it's time to shut down, and a `Listener` that propagates this signal.
2. The (`Trigger`, `Listener`) pair are used with channels: if a channel errors out across `send()` or `receive()`, shutdown is triggered. There is no reliance on channel mechanics, i.e. errors generated when all senders and/or the receiver are dropped.
I would clarify this to note that our channels don't have buffers, so we have to select on the shutdown signal on send/receive to make sure that a receiver exiting before a sender doesn't indefinitely hang.
Ie, this scenario:
Task 1: sending into sender. Task 2: receiving on receiver.
- Task 1 sends into sender, unblocks due to the buffer size of 1 that we use everywhere
- Task 2 errors out before consuming from receiver
- Task 1 wants to send into sender again, but can't because the receiver has shut down
If we always select on exit, then we don't run into this.
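The scenario above can be approximated synchronously: std's bounded `sync_channel` plays the role of the tokio channel, and polling a shutdown flag stands in for `select!`-ing on the `Listener` (the real code selects rather than polls). The sender's second send watches the shutdown signal instead of blocking forever:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, TrySendError};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = sync_channel::<u32>(1); // buffer of 1, as in the scenario
    let shutdown = Arc::new(AtomicBool::new(false));

    let shutdown_rx = shutdown.clone();
    let receiver = thread::spawn(move || {
        let _first = rx.recv().unwrap();
        // Task 2 "errors out" before consuming again: trigger shutdown, exit.
        shutdown_rx.store(true, Ordering::SeqCst);
        // rx is dropped here
    });

    // Task 1: the first send fills the buffer and unblocks...
    tx.send(1).unwrap();
    // ...later sends must watch shutdown instead of blocking indefinitely.
    loop {
        if shutdown.load(Ordering::SeqCst) {
            break; // receiver is gone: give up instead of hanging
        }
        match tx.try_send(2) {
            Ok(()) | Err(TrySendError::Disconnected(_)) => break,
            Err(TrySendError::Full(_)) => thread::sleep(Duration::from_millis(1)),
        }
    }
    receiver.join().unwrap();
    println!("sender exited cleanly");
}
```

Whichever branch wins (shutdown observed, send succeeds, or the channel reports disconnection), the sender exits, which is exactly what always selecting on the shutdown signal buys in the async code.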
Force-pushed 452d1da to d8ff458.
Thanks for addressing comments, think this only needs one more round!
Remaining comments are really about logging consistency - if we're logging an exit error at a function's call site, there's no need to also log on error return (we'll double log). Would also like to have all the starting/stopping logs moved into the spawn as well.
set.spawn(track_payment_result(
    source_node.clone(), results.clone(), payment, listener.clone()
));
Should we handle errors returned by track_payment_result and trigger shutdown here?
Yes, we should. I refactored produce_simulation_results to include an additional branch to wait on concurrently. Within this branch, we propagate any track_payment_result error to produce_simulation_results and trigger shutdown at the latter's spawn site.
Nice, realize that we weren't actually waiting on that set at all before this ☠️
This method is an interesting one (/different to our other ones) because it has its own set of tasks that it should wait on. As is, if we get the shutdown listener signal we won't wait for all the spawned payment tracking tasks to complete (which is messy).
Don't need to update in this PR, let's gettit in, but note to self to create an issue/fix this up in future!
I've taken note of this and will create an issue for better handling of exits on all tasks spawned in set.
Force-pushed 9a8a3c8 to b84ce8d.
alrightalrightalright!
Actually, actually last comments:
- We still need to flush our batched writer to disk on shutdown
- Take a look at line wrapping at 120 (I think that a few places are over / feel free to shorten error messages)
Otherwise, you can go ahead and squash the fixups and we'll merge. Nice stuff 🏅
Force-pushed 3ddaef0 to d0be6ec.
Testing this and it looks like … Will take a look at the code, but iirc this is a regression since my last review (I tested last time and this was fine).
That's weird. Taking another look right now.
sim-lib/src/lib.rs
log::error!("Event consumer exited with error: {e:?}.");
},
let consume_event_node = ce_node.clone();
let node_guard = ce_node.lock().await;
This is deadlocking with the lock in consume_events!
lol I hate programming, can fix this with:
tasks.spawn(async move {
let node_info = ce_node.lock().await.get_info().clone();
log::debug!("Starting events consumer for {}.", node_info);
if let Err(e) =
consume_events(ce_node, receiver, ce_output_sender, ce_listener).await
{
ce_shutdown.trigger();
log::error!("Event consumer exited with error: {e:?}.");
} else {
log::debug!("Event consumer for node {node_info} received shutdown signal.");
}
});
Problem is that the node_guard lock doesn't get dropped until after consume_events (because we're still borrowing node_info in the log). Had to crack out le old rust book for that one.
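The guard-lifetime issue isn't specific to tokio's mutex; the same behavior reproduces with a std `Mutex` (names here are illustrative). Holding a guard across later work keeps the lock taken, while chaining `.clone()` in one statement drops the temporary guard immediately, which is what the fixed snippet's `get_info().clone()` relies on:

```rust
use std::sync::Mutex;

fn main() {
    let node = Mutex::new(String::from("alice"));

    // Binding the guard keeps the mutex locked for as long as the borrow lives:
    {
        let guard = node.lock().unwrap();
        let name: &str = &guard; // borrow keeps `guard` alive
        assert!(node.try_lock().is_err()); // lock() here would deadlock
        println!("still locked while borrowing {name}");
    } // guard dropped here

    // Cloning inside one statement drops the temporary guard at the
    // end of that statement, so the lock is free again right after:
    let name = node.lock().unwrap().clone();
    assert!(node.try_lock().is_ok());
    println!("cloned {name}, lock released");
}
```

This is why cloning `node_info` before calling into `consume_events` breaks the deadlock: the lock is released before the long-running call ever starts.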
Force-pushed 581eab9 to 46b4713.
- additionally, remove every `unwrap()` call that could panic, replacing with error propagation and/or context with `expect()`
- return Result<(), SimulationError> for all spawned tasks
- handles triggering shutdown at call site for spawned tasks
- move starting/stopping logs to spawn site
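The `unwrap()` removal described in the commit message amounts to the standard `?` propagation pattern. A generic sketch (the error enum and `parse_amount` are illustrative, not the crate's SimulationError): the fallible call returns an error to the caller, which can then trigger shutdown, instead of panicking inside the task:

```rust
// Illustrative task error type; a From impl lets `?` convert
// the underlying error automatically.
#[derive(Debug)]
enum TaskError {
    Parse(std::num::ParseIntError),
}

impl From<std::num::ParseIntError> for TaskError {
    fn from(e: std::num::ParseIntError) -> Self {
        TaskError::Parse(e)
    }
}

// Instead of input.parse().unwrap(), which panics on bad input,
// propagate the error so the spawn site can decide what to do.
fn parse_amount(input: &str) -> Result<u64, TaskError> {
    let amount: u64 = input.parse()?; // early-returns Err via From on failure
    Ok(amount)
}

fn main() {
    assert_eq!(parse_amount("42").unwrap(), 42);
    assert!(parse_amount("not a number").is_err());
    println!("errors propagated, no panics");
}
```

Returning `Result<(), SimulationError>` from every spawned task gives the join loop a uniform place to observe failures and trigger shutdown, rather than relying on panics bubbling up through the runtime.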
Force-pushed 4b08725 to 5aa5f65.
tACK at 5aa5f65.
Some minor comments that can be addressed in a followup if desired.
}

log::trace!("Payment result tracker exiting.");
log::trace!("Result tracking complete. Payment result tracker exiting.");
nit: logging at call site, not in function as with others.
@@ -985,12 +1045,14 @@ impl Display for PaymentResultLogger {
}
}

/// Reports a summary of payment results at a duration specified by `interval`
/// Note that `run_results_logger` does not error in any way, thus it has no
/// trigger. It listens for triggers to ensure clean exit.
nit: line wrap at 120
writer.map_or(Ok(()), |(ref mut w, _)| w.flush().map_err(|_| {
    SimulationError::FileError
}))?;
return Ok(());
Don't need `?` + `return Ok(())`; can just `return writer.map_or(...)`.
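The suggestion, in isolation: when the final expression already produces the `Result`, return it directly rather than unwrapping with `?` and then re-wrapping with `return Ok(())`. A simplified stand-in (the writer is a `Vec<String>` here, not the PR's actual writer type):

```rust
// map_or yields Ok(()) for None and the closure's result for Some,
// so it can be the function's last expression: no `?`, no `return Ok(())`.
fn flush(writer: Option<&mut Vec<String>>) -> Result<(), String> {
    writer.map_or(Ok(()), |w| {
        w.push("flushed".to_string());
        Ok(())
    })
}

fn main() {
    assert_eq!(flush(None), Ok(()));
    let mut buf = Vec::new();
    assert_eq!(flush(Some(&mut buf)), Ok(()));
    assert_eq!(buf, vec!["flushed".to_string()]);
    println!("ok");
}
```

Besides being shorter, this avoids discarding the error's value and rebuilding an `Ok` that carries no extra information.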
/results
nit: newline still here - might be editor automatically adding it
let source = executor.source_info.clone();

log::info!(
    "Starting activity producer for {}: {}.",
    source,
nit: can just inline + clone because we don't have any locks here
None => return Ok(())
}
},
track_payment = set.join_next() => {
nit: add a TODO explaining that we're not going to wait for all tasks to exit, just so we don't forget