
refactor: use listen and trigger universally #164

Merged
2 changes: 1 addition & 1 deletion .gitignore
@@ -3,4 +3,4 @@ node_modules
*.json
activity-generator/releases/*
.DS_Store
/results
/results
Contributor (resolved): nit: newline still here - might be editor automatically adding it

75 changes: 32 additions & 43 deletions docs/ARCHITECTURE.md
@@ -2,23 +2,25 @@

## Simulation Stages

Simulation activities are broken down into four stages:

1. Simulation Event: an action that the simulator should execute,
produced from the user-provided description of desired activities.
2. Simulation Output: the output of executing a simulation event. Note
that this is not the final outcome of the event, as the event itself
may take a non-trivial amount of time to complete.
3. Result Tracking: per-output tracking of the final outcome of the
simulation event.
4. Simulation Result: recording of the final result as provided by
result tracking, which is a final result of the simulation.

## Activity Generation

The simulator uses tokio's asynchronous multi-producer, single-consumer
channels to implement a producer/consumer architecture to produce,
execute and track simulation events. Stages of the simulation
communicate by passing the _receiver_ for the next stage to the
consumer from the previous stage, allowing each stage to pass its
output on to the next one.

```
@@ -52,38 +54,25 @@ output on to the next one.
```

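As a rough illustration of this wiring (with placeholder `Event`/`Output` types rather than the crate's actual ones), the sketch below connects a producer, an executor and a result-tracking stage with tokio mpsc channels, each stage forwarding its output to the next:

```rust
use tokio::sync::mpsc::{channel, Receiver, Sender};

// Placeholder types standing in for simulation events and outputs.
struct Event(u64);
struct Output(u64);

// One stage of the pipeline: consume events, pass outputs on to the next stage.
async fn execute_stage(mut events: Receiver<Event>, outputs: Sender<Output>) {
    while let Some(Event(n)) = events.recv().await {
        // "Execute" the event and hand its output to the next stage.
        if outputs.send(Output(n)).await.is_err() {
            break; // the next stage's receiver was dropped
        }
    }
}

#[tokio::main]
async fn main() {
    let (event_tx, event_rx) = channel::<Event>(100);
    let (output_tx, mut output_rx) = channel::<Output>(100);

    // Producer stage: generates events from the activity description.
    tokio::spawn(async move {
        for i in 0..3 {
            let _ = event_tx.send(Event(i)).await;
        }
    });

    // Executor stage: wired to the tracking stage by handing it output_tx.
    tokio::spawn(execute_stage(event_rx, output_tx));

    // Result-tracking stage: consumes the executor's outputs.
    while let Some(Output(n)) = output_rx.recv().await {
        println!("tracked output {n}");
    }
}
```
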
### Shutdown
To ensure that all tasks shut down on completion (or failure) of the
simulation, we rely on the following:
1. [Triggered](https://docs.rs/triggered/latest/triggered): a `Trigger`
that can be used to inform threads that it's time to shut down, and
a `Listener` that propagates this signal.
2. Channel mechanics: if all of the senders for a channel have been
dropped (in our context, all of the producers have exited) then the
corresponding receiving channel will error out on receiving. Likewise,
if the receiving channel is dropped, the sending channels will error
out.

All events are handled in a `tokio::select` to allow waiting on
multiple asynchronous tasks at once. These selects should be `biased`
on the exit case (i.e., the `Listener` being triggered) so that we
prioritize exit above generating more events.

Practically, this means that we're handling the various shutdown
scenarios in the following way:

1. Consumer error:
- Receiving channel is dropped on consumer exit.
- Sending channels used by producers will error out.
- Producers will trigger shutdown.
- Listening producers will exit.

2. Producer error:
- Producers will trigger shutdown.
- Listening producers will exit.
- Once all producers have exited, the consuming channel's receiver
will error out causing it to exit.

3. Miscellaneous tasks:
- Trigger shutdown on exit.
- Listen for shutdown from other errors.
To ensure that all tasks shut down on completion (or failure) of the
simulation, we rely on the following:

1. [Triggered](https://docs.rs/triggered/latest/triggered): a `Trigger`
that can be used to inform threads/tasks that it's time to shut down,
and a `Listener` that propagates this signal.
2. The (`Trigger`, `Listener`) pair is used with channels: if a channel
errors out across `send()` or `recv()`, shutdown is triggered. There is
no reliance on channel mechanics, i.e. the errors generated when all
senders and/or the receiver have been dropped.
3. All events are handled in a `tokio::select` to allow waiting on
multiple asynchronous tasks at once. These selects should be `biased`
on the exit case (i.e., the `Listener` being triggered) so that we
prioritize exit above generating more events.
4. Additionally, we `select!` on the shutdown signal around `send()`/`recv()`
for all channels to guarantee the following (see the sketch after this list):
    - A task's receiver exiting while one or more corresponding senders
    (in different tasks) are actively sending does not result in the
    sending tasks erroring out with a channel `SendError`. Any sender's
    inability to `send()` due to a dropped receiver triggers a clean
    shutdown across all listening tasks.
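
The following is a minimal, self-contained sketch of the pattern described in points 3 and 4 (hypothetical task and message types, not the simulator's own): the producer prefers the shutdown `Listener` over doing more work, and wraps its `send()` in the same `select!` so that a dropped receiver leads to a clean, triggered shutdown rather than a hard error.

```rust
use tokio::sync::mpsc::{channel, Sender};
use triggered::{Listener, Trigger};

// Hypothetical producer task illustrating the shutdown pattern only.
// `biased` makes the listener branch win whenever both branches are ready.
async fn produce(sender: Sender<u64>, listener: Listener, shutdown: Trigger) {
    let mut i = 0;
    loop {
        tokio::select! {
            biased;
            // Exit takes priority over producing more events.
            _ = listener.clone() => break,
            res = sender.send(i) => {
                if res.is_err() {
                    // The receiver was dropped: trigger a clean shutdown
                    // instead of erroring out on SendError.
                    shutdown.trigger();
                    break;
                }
                i += 1;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (shutdown, listener) = triggered::trigger();
    let (tx, mut rx) = channel(1);
    let producer = tokio::spawn(produce(tx, listener.clone(), shutdown.clone()));

    // Consume a few events, then signal shutdown to all listening tasks.
    for _ in 0..3 {
        let _ = rx.recv().await;
    }
    shutdown.trigger();
    producer.await.unwrap();
}
```
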
444 changes: 265 additions & 179 deletions sim-lib/src/lib.rs
@@ -10,7 +10,7 @@ use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::marker::Send;
use std::path::PathBuf;
use std::time::UNIX_EPOCH;
use std::time::{SystemTimeError, UNIX_EPOCH};
use std::{collections::HashMap, sync::Arc, time::SystemTime};
use thiserror::Error;
use tokio::sync::mpsc::{channel, Receiver, Sender};
@@ -50,12 +50,18 @@ impl NodeId {
crate::NodeId::PublicKey(pk) => {
if pk != node_id {
return Err(LightningError::ValidationError(format!(
"the provided node id does not match the one returned by the backend ({} != {}).", pk, node_id)));
"the provided node id does not match the one returned by the backend ({} != {}).",
pk, node_id
)));
}
},
crate::NodeId::Alias(a) => {
if alias != a {
log::warn!("The provided alias does not match the one returned by the backend ({} != {}).", a, alias)
log::warn!(
"The provided alias does not match the one returned by the backend ({} != {}).",
a,
alias
)
}
*alias = a.to_string();
},
@@ -167,6 +173,14 @@ pub enum SimulationError {
RandomActivityError(RandomActivityError),
#[error("Simulated Network Error: {0}")]
SimulatedNetworkError(String),
#[error("System Time Error: {0}")]
SystemTimeError(#[from] SystemTimeError),
#[error("Missing Node Error: {0}")]
MissingNodeError(String),
#[error("Mpsc Channel Error: {0}")]
MpscChannelError(String),
#[error("Payment Generation Error: {0}")]
PaymentGenerationError(PaymentGenerationError),
}

#[derive(Debug, Error)]
@@ -324,11 +338,16 @@ struct Payment {

impl Display for Payment {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let dispatch_time = self
.dispatch_time
.duration_since(UNIX_EPOCH)
.expect("Failed to compute duration since unix epoch.");

write!(
f,
"Payment {} dispatched at {:?} sending {} msat from {} -> {}",
"Payment {} dispatched at {:?} sending {} msat from {} -> {}.",
self.hash.map(|h| hex::encode(h.0)).unwrap_or_default(),
self.dispatch_time.duration_since(UNIX_EPOCH).unwrap(),
dispatch_time,
self.amount_msat,
self.source,
self.destination,
@@ -338,14 +357,15 @@ impl Display for Payment {

/// SimulationEvent describes the set of actions that the simulator can run on nodes that it has execution permissions
/// on.
#[derive(Clone)]
#[derive(Clone, Debug)]
enum SimulationEvent {
/// Dispatch a payment of the specified amount to the public key provided.
/// Results in `SimulationOutput::SendPaymentSuccess` or `SimulationOutput::SendPaymentFailure`.
SendPayment(NodeInfo, u64),
}

/// SimulationOutput provides the output of a simulation event.
#[derive(Debug, Clone)]
enum SimulationOutput {
/// Intermediate output for when simulator has successfully dispatched a payment.
/// We need to track the result of the payment to report on it.
@@ -429,7 +449,10 @@ impl Simulation {
for node in self.nodes.values() {
let node = node.lock().await;
if !node.get_info().features.supports_keysend() {
return Err(LightningError::ValidationError(format!("All nodes eligible for random activity generation must support keysend, {} does not", node.get_info())));
return Err(LightningError::ValidationError(format!(
"All nodes eligible for random activity generation must support keysend, {} does not",
node.get_info()
)));
}
}
}
@@ -485,7 +508,10 @@ impl Simulation {
}
}

log::info!("Simulation is running on {}.", running_network.unwrap());
log::info!(
"Simulation is running on {}.",
running_network.expect("Network not provided.")
);

Ok(())
}
@@ -546,7 +572,7 @@ impl Simulation {
});
}

// We always want to wait ofr all threads to exit, so we wait for all of them to exit and track any errors
// We always want to wait for all threads to exit, so we wait for all of them to exit and track any errors
// that surface. It's okay if there are multiple and one is overwritten, we just want to know whether we
// exited with an error or not.
let mut success = true;
@@ -572,32 +598,63 @@ impl Simulation {
tasks: &mut JoinSet<()>,
) {
let listener = self.shutdown_listener.clone();
let shutdown = self.shutdown_trigger.clone();
log::debug!("Setting up simulator data collection.");

// Create a sender/receiver pair that will be used to report final results of simulation.
let (results_sender, results_receiver) = channel(1);

tasks.spawn(produce_simulation_results(
self.nodes.clone(),
output_receiver,
results_sender,
listener.clone(),
));
let nodes = self.nodes.clone();
// psr: produce simulation results
let psr_listener = listener.clone();
let psr_shutdown = shutdown.clone();
tasks.spawn(async move {
log::debug!("Starting simulation results producer.");
if let Err(e) =
produce_simulation_results(nodes, output_receiver, results_sender, psr_listener)
.await
{
psr_shutdown.trigger();
log::error!("Produce simulation results exited with error: {e:?}.");
} else {
log::debug!("Produce simulation results received shutdown signal.");
}
});

let result_logger = Arc::new(Mutex::new(PaymentResultLogger::new()));

tasks.spawn(run_results_logger(
listener.clone(),
result_logger.clone(),
Duration::from_secs(60),
));
let result_logger_clone = result_logger.clone();
let result_logger_listener = listener.clone();
tasks.spawn(async move {
log::debug!("Starting results logger.");
run_results_logger(
result_logger_listener,
result_logger_clone,
Duration::from_secs(60),
)
.await;
log::debug!("Exiting results logger.");
});

// csr: consume simulation results
let csr_write_results = self.write_results.clone();
tasks.spawn(async move {
log::debug!("Starting simulation results consumer.");
if let Err(e) = consume_simulation_results(
result_logger,
results_receiver,
listener,
csr_write_results,
)
.await
{
shutdown.trigger();
log::error!("Consume simulation results exited with error: {e:?}.");
} else {
log::debug!("Consume simulation result received shutdown signal.");
}
});

tasks.spawn(consume_simulation_results(
result_logger,
results_receiver,
listener,
self.write_results.clone(),
));
log::debug!("Simulator data collection set up.");
}

@@ -708,12 +765,23 @@ impl Simulation {

// Generate a consumer for the receiving end of the channel. It takes the event receiver that it'll pull
// events from and the results sender to report the events it has triggered for further monitoring.
tasks.spawn(consume_events(
node.clone(),
receiver,
output_sender.clone(),
self.shutdown_trigger.clone(),
));
// ce: consume event
let ce_listener = self.shutdown_listener.clone();
let ce_shutdown = self.shutdown_trigger.clone();
let ce_output_sender = output_sender.clone();
let ce_node = node.clone();
tasks.spawn(async move {
let node_info = ce_node.lock().await.get_info().clone();
log::debug!("Starting events consumer for {}.", node_info);
if let Err(e) =
consume_events(ce_node, receiver, ce_output_sender, ce_listener).await
{
ce_shutdown.trigger();
log::error!("Event consumer exited with error: {e:?}.");
} else {
log::debug!("Event consumer for node {node_info} received shutdown signal.");
}
});
}

channels
@@ -735,88 +803,109 @@ impl Simulation {
))),
)?;

tasks.spawn(produce_events(
executor.source_info,
executor.network_generator,
executor.payment_generator,
sender.clone(),
self.shutdown_trigger.clone(),
self.shutdown_listener.clone(),
));
// pe: produce events
let pe_shutdown = self.shutdown_trigger.clone();
let pe_listener = self.shutdown_listener.clone();
let pe_sender = sender.clone();
tasks.spawn(async move {
let source = executor.source_info.clone();

log::info!(
"Starting activity producer for {}: {}.",
source,
Comment on lines +811 to +815 (Contributor): nit: can just inline + clone because we don't have any locks here

executor.payment_generator
);

if let Err(e) = produce_events(
executor.source_info,
executor.network_generator,
executor.payment_generator,
pe_sender,
pe_listener,
)
.await
{
pe_shutdown.trigger();
log::debug!("Event producer exited with error {e}.");
} else {
log::debug!("Random activity generator for {source} received shutdown signal.");
}
});
}

Ok(())
}
}

/// events that are created for a lightning node that we can execute events on. Any output that is generated from the
/// event being executed is piped into a channel to handle the result of the event. If it exits, it will use the
/// trigger provided to trigger shutdown in other threads. If an error occurs elsewhere, we expect the senders
/// corresponding to our receiver to be dropped, which will cause the receiver to error out and
/// exit.
/// event being executed is piped into a channel to handle the result of the event.
async fn consume_events(
node: Arc<Mutex<dyn LightningNode>>,
mut receiver: Receiver<SimulationEvent>,
sender: Sender<SimulationOutput>,
shutdown: Trigger,
) {
log::debug!("Started consumer for {}.", node.lock().await.get_info());

while let Some(event) = receiver.recv().await {
match event {
SimulationEvent::SendPayment(dest, amt_msat) => {
let mut node = node.lock().await;

let mut payment = Payment {
source: node.get_info().pubkey,
hash: None,
amount_msat: amt_msat,
destination: dest.pubkey,
dispatch_time: SystemTime::now(),
};
listener: Listener,
) -> Result<(), SimulationError> {
loop {
select! {
biased;
_ = listener.clone() => {
return Ok(());
},
simulation_event = receiver.recv() => {
if let Some(event) = simulation_event {
match event {
SimulationEvent::SendPayment(dest, amt_msat) => {
let mut node = node.lock().await;

let mut payment = Payment {
source: node.get_info().pubkey,
hash: None,
amount_msat: amt_msat,
destination: dest.pubkey,
dispatch_time: SystemTime::now(),
};

let outcome = match node.send_payment(dest.pubkey, amt_msat).await {
Ok(payment_hash) => {
log::debug!(
"Send payment: {} -> {}: ({}).",
node.get_info(),
dest,
hex::encode(payment_hash.0)
);
// We need to track the payment outcome using the payment hash that we have received.
payment.hash = Some(payment_hash);
SimulationOutput::SendPaymentSuccess(payment)
}
Err(e) => {
log::error!(
"Error while sending payment {} -> {}.",
node.get_info(),
dest
);

match e {
LightningError::PermanentError(s) => {
return Err(SimulationError::LightningError(LightningError::PermanentError(s)));
}
_ => SimulationOutput::SendPaymentFailure(
payment,
PaymentResult::not_dispatched(),
),
}
}
};

let outcome = match node.send_payment(dest.pubkey, amt_msat).await {
Ok(payment_hash) => {
log::debug!(
"Send payment: {} -> {}: ({}).",
node.get_info(),
dest,
hex::encode(payment_hash.0)
);
// We need to track the payment outcome using the payment hash that we have received.
payment.hash = Some(payment_hash);
SimulationOutput::SendPaymentSuccess(payment)
},
Err(e) => {
log::error!(
"Error while sending payment {} -> {}.",
node.get_info(),
dest
);

match e {
LightningError::PermanentError(s) => {
log::error!("Simulation terminated with error: {s}.");
shutdown.trigger();
break;
},
_ => SimulationOutput::SendPaymentFailure(
payment,
PaymentResult::not_dispatched(),
),
if sender.send(outcome.clone()).await.is_err() {
return Err(SimulationError::MpscChannelError(format!("Error sending simulation output {outcome:?}.")));
}
}
},
};

match sender.send(outcome).await {
Ok(_) => {},
Err(e) => {
log::error!("Error sending action outcome: {:?}.", e);
break;
},
}
} else {
return Ok(())
}
},
};
}
}
}
}

@@ -827,20 +916,16 @@ async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator +
network_generator: Arc<Mutex<N>>,
node_generator: Box<A>,
sender: Sender<SimulationEvent>,
shutdown: Trigger,
listener: Listener,
) {
log::info!("Started activity producer for {source}: {node_generator}.");

) -> Result<(), SimulationError> {
loop {
let wait = node_generator.next_payment_wait();
log::debug!("Next payment for {source} in {:?} seconds.", wait);

select! {
biased;
_ = listener.clone() => {
log::debug!("Random activity generator for {source} received signal to shut down.");
break;
return Ok(());
},
// Wait until our time to next payment has elapsed then execute a random amount payment to a random
// destination.
@@ -859,72 +944,54 @@ async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator +
amt
},
Err(e) => {
log::error!("Could not get amount for {source} -> {destination}: {e}. Please report a bug!");
break;
return Err(SimulationError::PaymentGenerationError(e));
},
};

log::debug!("Generated random payment: {source} -> {}: {amount} msat.", destination);

// Send the payment, exiting if we can no longer send to the consumer.
let event = SimulationEvent::SendPayment(destination.clone(), amount);
if let Err(e) = sender.send(event).await {
log::debug!(
"Stopped random producer for {amount}: {source} -> {destination}. Consumer error: {e}.",
);
break;
if sender.send(event.clone()).await.is_err() {
return Err(SimulationError::MpscChannelError (format!("Stopped random producer for {amount}: {source} -> {destination}.")));
}

},
}
}

log::debug!("Stopped random activity producer {source}.");
shutdown.trigger();
}

async fn consume_simulation_results(
logger: Arc<Mutex<PaymentResultLogger>>,
receiver: Receiver<(Payment, PaymentResult)>,
listener: Listener,
write_results: Option<WriteResults>,
) {
log::debug!("Simulation results consumer started.");

if let Err(e) = write_payment_results(logger, receiver, listener, write_results).await {
log::error!("Error while reporting payment results: {:?}.", e);
}

log::debug!("Simulation results consumer exiting.");
}

async fn write_payment_results(
logger: Arc<Mutex<PaymentResultLogger>>,
mut receiver: Receiver<(Payment, PaymentResult)>,
listener: Listener,
write_results: Option<WriteResults>,
) -> Result<(), SimulationError> {
let mut writer = write_results.and_then(|write_result| {
let file = write_result.results_dir.join(format!(
"simulation_{:?}.csv",
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
));
let writer = WriterBuilder::new().from_path(file).ok()?;
Some((writer, write_result.batch_size))
});
let mut writer = match write_results {
Some(res) => {
let duration = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?;
let file = res
.results_dir
.join(format!("simulation_{:?}.csv", duration));
let writer = WriterBuilder::new().from_path(file)?;
Some((writer, res.batch_size))
},
None => None,
};

let mut counter = 1;

loop {
tokio::select! {
select! {
biased;
_ = listener.clone() => {
log::debug!("Simulation results consumer received shutdown signal.");
return writer.map_or(Ok(()), |(ref mut w, _)| w.flush().map_err(|_| SimulationError::FileError))
writer.map_or(Ok(()), |(ref mut w, _)| w.flush().map_err(|_| {
SimulationError::FileError
}))?;
return Ok(());
Comment on lines +988 to +991 (Contributor, resolved): Don't need ? + return Ok(()) -> can just return writer.map_or...

},
payment_report = receiver.recv() => {
match payment_report {
payment_result = receiver.recv() => {
match payment_result {
Some((details, result)) => {
logger.lock().await.report_result(&details, &result);
log::trace!("Resolved dispatched payment: {} with: {}.", details, result);
@@ -936,7 +1003,9 @@ async fn write_payment_results(
})?;
counter = counter % batch_size + 1;
if batch_size == counter {
w.flush().map_err(|_| SimulationError::FileError)?;
w.flush().map_err(|_| {
SimulationError::FileError
})?;
}
}
},
@@ -985,12 +1054,14 @@ impl Display for PaymentResultLogger {
}
}

/// Reports a summary of payment results at a duration specified by `interval`
/// Note that `run_results_logger` does not error in any way, thus it has no
/// trigger. It listens for triggers to ensure clean exit.
Contributor: nit: line wrap at 120

async fn run_results_logger(
listener: Listener,
logger: Arc<Mutex<PaymentResultLogger>>,
interval: Duration,
) {
log::debug!("Results logger started.");
log::info!("Summary of results will be reported every {:?}.", interval);

loop {
@@ -1005,8 +1076,6 @@ async fn run_results_logger(
}
}
}

log::debug!("Results logger stopped.")
}

/// produce_results is responsible for receiving the outputs of events that the simulator has taken and
@@ -1022,64 +1091,71 @@ async fn produce_simulation_results(
nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
mut output_receiver: Receiver<SimulationOutput>,
results: Sender<(Payment, PaymentResult)>,
shutdown: Listener,
) {
log::debug!("Simulation results producer started.");

listener: Listener,
) -> Result<(), SimulationError> {
let mut set = tokio::task::JoinSet::new();

loop {
tokio::select! {
biased;
_ = shutdown.clone() => break,
_ = listener.clone() => {
return Ok(())
},
output = output_receiver.recv() => {
match output{
match output {
Some(simulation_output) => {
match simulation_output{
SimulationOutput::SendPaymentSuccess(payment) => {
let source_node = nodes.get(&payment.source).unwrap().clone();
set.spawn(track_payment_result(
source_node, results.clone(), payment, shutdown.clone(),
));
if let Some(source_node) = nodes.get(&payment.source) {
set.spawn(track_payment_result(
source_node.clone(), results.clone(), payment, listener.clone()
));
Comment on lines +1110 to +1112 (Contributor, resolved): Should we handle errors returned by track_payment_result and trigger shutdown here?

Contributor Author: Yes, we should. I refactored produce_simulation_results to include an additional branch to wait on concurrently. Within this branch, we propagate any track_payment_result error to produce_simulation_results and trigger shutdown at the latter's spawn site.

Contributor: Nice, realize that we weren't actually waiting on that set at all before this ☠️

This method is an interesting one (different to our other ones) because it has its own set of tasks that it should wait on. As is, if we get the shutdown listener signal we won't wait for all the spawned payment tracking tasks to complete (which is messy).

Don't need to update in this PR, let's get it in, but note to self to create an issue/fix this up in future!

Contributor Author: I've taken note of this and will create an issue for better handling of exits on all tasks spawned in set.

} else {
return Err(SimulationError::MissingNodeError(format!("Source node with public key: {} unavailable.", payment.source)));
}
},
SimulationOutput::SendPaymentFailure(payment, result) => {
if results.clone().send((payment, result)).await.is_err() {
log::debug!("Could not send payment result.");
if results.send((payment, result.clone())).await.is_err() {
return Err(SimulationError::MpscChannelError(
format!("Failed to send payment result: {result} for payment {:?} dispatched at {:?}.", payment.hash, payment.dispatch_time),
));
}
}
};

},
None => {
return
None => return Ok(())
}
},
track_payment = set.join_next() => {
Contributor: nit: add a TODO explaining that we're not going to wait for all tasks to exit, just so we don't forget

if let Some(res) = track_payment {
match res {
Ok(track_payment_res) => {
track_payment_res?
},
Err(_) => {
return Err(SimulationError::TaskError);
},
}
}
}
}
}

log::debug!("Simulation results producer exiting.");
while let Some(res) = set.join_next().await {
if let Err(e) = res {
log::error!("Simulation results producer task exited with error: {e}.");
}
}
}

async fn track_payment_result(
node: Arc<Mutex<dyn LightningNode>>,
results: Sender<(Payment, PaymentResult)>,
payment: Payment,
shutdown: Listener,
) {
listener: Listener,
) -> Result<(), SimulationError> {
log::trace!("Payment result tracker starting.");

let mut node = node.lock().await;

let res = match payment.hash {
Some(hash) => {
log::debug!("Tracking payment outcome for: {}.", hex::encode(hash.0));
let track_payment = node.track_payment(hash, shutdown.clone());
let track_payment = node.track_payment(hash, listener.clone());

match track_payment.await {
Ok(res) => {
@@ -1105,9 +1181,19 @@ async fn track_payment_result(
},
};

if results.clone().send((payment, res)).await.is_err() {
log::debug!("Could not send payment result.");
select! {
biased;
_ = listener.clone() => {
log::debug!("Track payment result received a shutdown signal.");
},
send_payment_result = results.send((payment, res.clone())) => {
if send_payment_result.is_err() {
return Err(SimulationError::MpscChannelError(format!("Failed to send payment result {res} for payment {payment}.")))
}
}
}

log::trace!("Payment result tracker exiting.");
log::trace!("Result tracking complete. Payment result tracker exiting.");
Contributor: nit: logging at call site, not in function as with others.


Ok(())
}