diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index f5412ba3..b26f2251 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -135,6 +135,12 @@ pub enum SimulationError { RandomActivityError(RandomActivityError), #[error("System Time Error: {0}")] SystemTimeError(#[from] SystemTimeError), + #[error("Missing Node Error: {0}")] + MissingNodeError(String), + #[error("Mpsc Channel Error: {msg}: {error_str}")] + MpscChannelError{msg: String, error_str: String}, + #[error("Payment Generation Error: {0}")] + PaymentGenerationError(PaymentGenerationError) } #[derive(Debug, Error)] @@ -314,7 +320,7 @@ impl Display for Payment { /// SimulationEvent describes the set of actions that the simulator can run on nodes that it has execution permissions /// on. -#[derive(Clone)] +#[derive(Clone, Debug)] enum SimulationEvent { /// Dispatch a payment of the specified amount to the public key provided. /// Results in `SimulationOutput::SendPaymentSuccess` or `SimulationOutput::SendPaymentFailure`. @@ -462,10 +468,7 @@ impl Simulation { } } - log::info!( - "Simulation is running on {}.", - running_network.expect("Invalid network") - ); + log::info!("Simulation is running on {}.", running_network.expect("Invalid network")); Ok(()) } @@ -526,7 +529,7 @@ impl Simulation { }); } - // We always want to wait ofr all threads to exit, so we wait for all of them to exit and track any errors + // We always want to wait for all threads to exit, so we wait for all of them to exit and track any errors // that surface. It's okay if there are multiple and one is overwritten, we just want to know whether we // exited with an error or not. let mut success = true; @@ -558,13 +561,21 @@ impl Simulation { // Create a sender/receiver pair that will be used to report final results of simulation. let (results_sender, results_receiver) = channel(1); - tasks.spawn(produce_simulation_results( - self.nodes.clone(), - output_receiver, - results_sender, - listener.clone(), - shutdown.clone(), - )); + let nodes = self.nodes.clone(); + // psr: produce simulation results + let psr_listener = listener.clone(); + let psr_shutdown = shutdown.clone(); + tasks.spawn(async move { + if let Err(e) = produce_simulation_results( + nodes, + output_receiver, + results_sender, + psr_listener, + ).await { + psr_shutdown.trigger(); + log::error!("Produce simulation results exited with error: {e:?}."); + } + }); let result_logger = Arc::new(Mutex::new(PaymentResultLogger::new())); @@ -574,13 +585,20 @@ impl Simulation { Duration::from_secs(60), )); - tasks.spawn(consume_simulation_results( - result_logger, - results_receiver, - listener, - shutdown, - self.write_results.clone(), - )); + // csr: consume simulation results + let csr_write_results = self.write_results.clone(); + tasks.spawn(async move { + if let Err(e) = consume_simulation_results( + result_logger, + results_receiver, + listener, + csr_write_results, + ).await { + shutdown.trigger(); + log::error!("Consume simulation results exited with error: {e:?}."); + } + }); + log::debug!("Simulator data collection set up."); } @@ -691,13 +709,22 @@ impl Simulation { // Generate a consumer for the receiving end of the channel. It takes the event receiver that it'll pull // events from and the results sender to report the events it has triggered for further monitoring. - tasks.spawn(consume_events( - node.clone(), - receiver, - output_sender.clone(), - self.shutdown_trigger.clone(), - self.shutdown_listener.clone(), - )); + // ce: consume event + let ce_listener = self.shutdown_listener.clone(); + let ce_shutdown = self.shutdown_trigger.clone(); + let ce_output_sender = output_sender.clone(); + let ce_node = node.clone(); + tasks.spawn(async move { + if let Err(e) = consume_events( + ce_node, + receiver, + ce_output_sender, + ce_listener, + ).await { + ce_shutdown.trigger(); + log::debug!("Event consumer exited with error: {e:?}."); + } + }); } channels @@ -719,14 +746,22 @@ impl Simulation { ))), )?; - tasks.spawn(produce_events( - executor.source_info, - executor.network_generator, - executor.payment_generator, - sender.clone(), - self.shutdown_trigger.clone(), - self.shutdown_listener.clone(), - )); + // pe: produce events + let pe_shutdown = self.shutdown_trigger.clone(); + let pe_listener = self.shutdown_listener.clone(); + let pe_sender = sender.clone(); + tasks.spawn(async move { + if let Err(e) = produce_events( + executor.source_info, + executor.network_generator, + executor.payment_generator, + pe_sender, + pe_listener, + ).await { + pe_shutdown.trigger(); + log::debug!("Event producer exited with error {e}."); + } + }); } Ok(()) @@ -734,110 +769,84 @@ impl Simulation { } /// events that are crated for a lightning node that we can execute events on. Any output that is generated from the -/// event being executed is piped into a channel to handle the result of the event. If it exits, it will use the -/// trigger provided to trigger shutdown in other threads. If an error occurs elsewhere, we expect the senders -/// corresponding to our receiver to be dropped, which will cause the receiver to error out and -/// exit. +/// event being executed is piped into a channel to handle the result of the event. async fn consume_events( node: Arc>, mut receiver: Receiver, sender: Sender, - shutdown: Trigger, listener: Listener, -) { +) -> Result<(), SimulationError> { log::debug!("Started consumer for {}.", node.lock().await.get_info()); loop { select! { biased; _ = listener.clone() => { - let node = node.lock().await; - let node_info = node.get_info(); - log::debug!("Simulation event consumer for {node_info} received shutdown signal"); + log::debug!("Simulation event consumer for node {} received shutdown signal", node.lock().await.get_info()); break; }, simulation_event = receiver.recv() => { - match simulation_event { - Some(event) => { - match event { - SimulationEvent::SendPayment(dest, amt_msat) => { - let mut node = node.lock().await; - - let mut payment = Payment { - source: node.get_info().pubkey, - hash: None, - amount_msat: amt_msat, - destination: dest.pubkey, - dispatch_time: SystemTime::now(), - }; - - let outcome = match node.send_payment(dest.pubkey, amt_msat).await { - Ok(payment_hash) => { - log::debug!( - "Send payment: {} -> {}: ({}).", - node.get_info(), - dest, - hex::encode(payment_hash.0) - ); - // We need to track the payment outcome using the payment hash that we have received. - payment.hash = Some(payment_hash); - SimulationOutput::SendPaymentSuccess(payment) - } - Err(e) => { - log::error!( - "Error while sending payment {} -> {}.", - node.get_info(), - dest - ); - - match e { - LightningError::PermanentError(s) => { - log::error!("Simulation terminated with error: {s}."); - shutdown.trigger(); - break; - } - _ => SimulationOutput::SendPaymentFailure( - payment, - PaymentResult::not_dispatched(), - ), - } - } - }; - - select! { - biased; - _ = listener.clone() => { - let sim_output = outcome.clone(); - log::debug!("Simulation output producer for {sim_output:?} received shutdown signal"); - break; - - }, - send_output_result = sender.send(outcome.clone()) => { - match send_output_result { - Ok(_) => {}, - Err(e) => { - log::error!("Error sending simulation output: {:?}.", e); - shutdown.trigger(); - break; - } + if let Some(event) = simulation_event { + match event { + SimulationEvent::SendPayment(dest, amt_msat) => { + let mut node = node.lock().await; + + let mut payment = Payment { + source: node.get_info().pubkey, + hash: None, + amount_msat: amt_msat, + destination: dest.pubkey, + dispatch_time: SystemTime::now(), + }; + + let outcome = match node.send_payment(dest.pubkey, amt_msat).await { + Ok(payment_hash) => { + log::debug!( + "Send payment: {} -> {}: ({}).", + node.get_info(), + dest, + hex::encode(payment_hash.0) + ); + // We need to track the payment outcome using the payment hash that we have received. + payment.hash = Some(payment_hash); + SimulationOutput::SendPaymentSuccess(payment) + } + Err(e) => { + log::error!( + "Error while sending payment {} -> {}.", + node.get_info(), + dest + ); + + match e { + LightningError::PermanentError(s) => { + log::error!("Simulation terminated with error: {s}."); + return Err(SimulationError::LightningError(LightningError::PermanentError(s))); } + _ => SimulationOutput::SendPaymentFailure( + payment, + PaymentResult::not_dispatched(), + ), } } + }; + + if let Err(e) = sender.send(outcome.clone()).await { + log::error!("Error sending simulation output: {:?}.", e); + return Err(SimulationError::MpscChannelError{ msg: format!("Error sending simulation output {outcome:?}."), error_str: e.to_string() }); } - }; - }, - None => { - return - }, + } + } + } else { + return Ok(()) } } } } - log::debug!( - "Stopped event consumer for {}.", - node.lock().await.get_info() + log::debug!("Stopped event consumer for node {}.", node.lock().await.get_info() ); + Ok(()) } /// produce events generates events for the activity description provided. It accepts a shutdown listener so it can @@ -847,9 +856,8 @@ async fn produce_events>, node_generator: Box, sender: Sender, - shutdown: Trigger, listener: Listener, -) { +) -> Result<(), SimulationError> { log::info!("Started activity producer for {source}: {node_generator}."); loop { @@ -880,8 +888,7 @@ async fn produce_events { log::error!("Could not get amount for {source} -> {destination}: {e}. Please report a bug!"); - shutdown.trigger(); - break; + return Err(SimulationError::PaymentGenerationError(e)); }, }; @@ -889,24 +896,11 @@ async fn produce_events { - log::debug!("Simulation event producer for {source} received shutdown signal"); - break; - }, - send_event_result = sender.send(event) => { - match send_event_result { - Ok(_) => {}, - Err(e) => { - log::error!( - "Stopped random producer for {amount}: {source} -> {destination}. Consumer error: {e}.", - ); - shutdown.trigger(); - break; - }, - } - } + if let Err(e) = sender.send(event.clone()).await { + log::debug!( + "Stopped random producer for {amount}: {source} -> {destination}. Consumer error: {e}.", + ); + return Err(SimulationError::MpscChannelError { msg: format!("Error sending simulation event {event:?} to consumer"), error_str: e.to_string() }); } }, @@ -914,36 +908,33 @@ async fn produce_events>, receiver: Receiver<(Payment, PaymentResult)>, listener: Listener, - shutdown: Trigger, write_results: Option, -) { +) -> Result<(), SimulationError> { log::debug!("Simulation results consumer started."); select! { biased; _ = listener.clone() => { - log::debug!("Error writing payment result. Shutdown signal received."); + log::debug!("Consume simulation result received shutdown signal."); }, write_result = write_payment_results(logger, receiver, listener, write_results) => { - match write_result { - Ok(_) => {}, - Err(e) => { - log::error!("Error while reporting payment results: {:?}.", e); - shutdown.trigger(); - return; - } + if let Err(e) = write_result { + log::error!("Error while reporting payment results: {:?}.", e); + return Err(e); } } } log::debug!("Simulation results consumer exiting."); + Ok(()) } async fn write_payment_results( @@ -969,7 +960,7 @@ async fn write_payment_results( select! { biased; _ = listener.clone() => { - log::debug!("Simulation results consumer received shutdown signal."); + log::debug!("Write payment results received shutdown signal."); return writer.map_or(Ok(()), |(ref mut w, _)| w.flush().map_err(|_| SimulationError::FileError)) }, payment_report = receiver.recv() => { @@ -1072,65 +1063,67 @@ async fn produce_simulation_results( mut output_receiver: Receiver, results: Sender<(Payment, PaymentResult)>, listener: Listener, - shutdown: Trigger, -) { +) -> Result<(), SimulationError> { log::debug!("Simulation results producer started."); let mut set = tokio::task::JoinSet::new(); loop { - select! { + tokio::select! { biased; - _ = listener.clone() => + _ = listener.clone() => { + log::debug!("Produce simulation results received shutdown signal"); break - , + }, output = output_receiver.recv() => { - match output{ + match output { Some(simulation_output) => { match simulation_output{ SimulationOutput::SendPaymentSuccess(payment) => { if let Some(source_node) = nodes.get(&payment.source) { set.spawn(track_payment_result( - source_node.clone(), results.clone(), payment, listener.clone(), shutdown.clone() + source_node.clone(), results.clone(), payment, listener.clone() )); + } else { + log::debug!("Can't find source node with public key: {}.", payment.source); + return Err(SimulationError::MissingNodeError(format!("Source node with public key: {} unavailable.", payment.source))); } }, SimulationOutput::SendPaymentFailure(payment, result) => { - select! { - biased; - _ = listener.clone() => { - log::debug!("Handling for failed payment {payment} received a shutdown signal"); - break; - }, - send_payment_result = results.send((payment, result)) => { - match send_payment_result { - Ok(_) => {}, - Err(e) => { - log::error!("Could not send payment result. Send error: {e}"); - shutdown.trigger(); - } - } - } + if let Err(e) = results.send((payment, result.clone())).await { + log::debug!("Could not send payment result."); + return Err(SimulationError::MpscChannelError{ msg: format!("Failed to send payment result {result} for payment {payment}."), error_str: e.to_string() }); } } }; - }, - None => { - return + None => return Ok(()) + } + }, + track_payment = set.join_next() => { + match track_payment { + Some(res) => match res { + Ok(track_payment_res) => { + if let Err(e) = track_payment_res { + log::error!("Track payment result task exited with error: {e}."); + return Err(e); + } + }, + Err(joinset_error) => { + log::error!("Track payment result task did not execute to completion. Error: {joinset_error}."); + return Err(SimulationError::TaskError); + }, } + None => (), } } } } + log::debug!("Simulation results producer exiting."); - while let Some(res) = set.join_next().await { - if let Err(e) = res { - log::error!("Simulation results producer task exited with error: {e}."); - shutdown.trigger(); - } - } + + Ok(()) } async fn track_payment_result( @@ -1138,8 +1131,7 @@ async fn track_payment_result( results: Sender<(Payment, PaymentResult)>, payment: Payment, listener: Listener, - shutdown: Trigger, -) { +) -> Result<(), SimulationError> { log::trace!("Payment result tracker starting."); let mut node = node.lock().await; @@ -1176,18 +1168,17 @@ async fn track_payment_result( select! { biased; _ = listener.clone() => { - log::debug!("Track payment received a shutdown signal") + log::debug!("Track payment result received a shutdown signal."); }, - send_payment_result = results.send((payment, res)) => { - match send_payment_result { - Ok(_) => {}, - Err(e) => { - log::error!("Could not send payment result. Send error: {e}"); - shutdown.trigger(); - }, + send_payment_result = results.send((payment, res.clone())) => { + if let Err(e) = send_payment_result { + log::error!("Could not send payment result. Send error: {e}."); + return Err(SimulationError::MpscChannelError { msg: format!("Failed to send payment result {res} for payment {payment}."), error_str: e.to_string() }) } } } - log::trace!("Payment result tracker exiting."); + log::trace!("Result tracking complete. Payment result tracker exiting."); + + Ok(()) }