diff --git a/.gitignore b/.gitignore index a12a87b6..9a64cedc 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,4 @@ node_modules *.json activity-generator/releases/* .DS_Store -/results \ No newline at end of file +/results diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 60681d88..ec487e7c 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -2,23 +2,25 @@ ## Simulation Stages -Simulation activities are broken down into four stages: -1. Simulation Event: an action that the simulator should execute, - produced from the user-provided description of desired activities. +Simulation activities are broken down into four stages: + +1. Simulation Event: an action that the simulator should execute, + produced from the user-provided description of desired activities. 2. Simulation Output: the output of executing an simulation event. Note - that this is not the final outcome of the event, as the event itself + that this is not the final outcome of the event, as the event itself may take a non-trivial amount of time to complete. -3. Result Tracking: per-output tracking of the final outcome of the + simulation event. -4. Simulation Result: recording of the final result as provided by + result tracking, which is a final result of the simulation. ## Activity Generation + The simulator uses tokio's asynchronous multi-producer, single-consumer -channels to implement a consume/producer architecture to produce, -execute and track simulation events. Stages of the simulation -communicate by passing the _receiver_ for the next stage to the -consumer from the previous stage, allowing each stage to pass it's +channels to implement a producer/consumer architecture to produce, +execute and track simulation events. Stages of the simulation +communicate by passing the _receiver_ for the next stage to the +consumer from the previous stage, allowing each stage to pass its output on to the next one. ``` @@ -52,38 +54,25 @@ output on to the next one. ``` ### Shutdown -To ensure that all tasks shutdown on completion (or failure) of the -simulation, we relay on the following: -1. [Triggered](https://docs.rs/triggered/latest/triggered): a `Trigger` - that can be used to inform threads that it's time to shut down, and - a `Listener` that propagates this signal. -2. Channel mechanics: if all of the senders for a channel have been - dropped (in our context, all of the producers have exited) then the - corresponding receiving channel will error out on receiving. Likewise, - if the receiving channel is dropped, the sending channels will error - out. - -All events are handled in a `tokio::select` to allow waiting on -multiple asynchronous tasks at once. These selects should be `biased` -on the exit case (ie, the `Listener` being triggered) so that we -prioritize exit above generating more events. -Practically, this means that we're handling the various shutdown -scenarios in the following way: - -1. Consumer error: - - Receiving channel is dropped on consumer exit. - - Sending channels used by producers will error out. - - Producers will trigger shutdown. - - Listening producers will exit. - -2. Producer error: - - Producers will trigger shutdown. - - Listening producers will exit. - - Once all producers have exited, the consuming channel's receiver - will error out causing it to exit. - -3. Miscellaneous tasks: - - Trigger shutdown on exit.
- - Listen for shutdown from other errors. +To ensure that all tasks shut down on completion (or failure) of the +simulation, we rely on the following: +1. [Triggered](https://docs.rs/triggered/latest/triggered): a `Trigger` + that can be used to inform threads/tasks that it's time to shut down, + and a `Listener` that propagates this signal. +2. The (`Trigger`, `Listener`) pair is used with channels: if a channel + errors out on `send()` or `recv()`, shutdown is triggered. There is + no reliance on channel mechanics, i.e. the errors generated when all senders + and/or the receiver have been dropped. +3. All events are handled in a `tokio::select` to allow waiting on +multiple asynchronous tasks at once. These selects should be `biased` +on the exit case (i.e., the `Listener` being triggered) so that we +prioritize exit above generating more events. +4. Additionally, we `select!` on the shutdown signal and on `send()`/`recv()` +for all channels to guarantee this: + - A task's receiver exiting while one or more corresponding senders + (in different tasks) are actively sending does not result in the + sending tasks erroring due to a channel `SendError`. Any sender's + inability to `send()` due to a dropped receiver triggers a clean + shutdown across all listening tasks. diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index bd1d44f1..3e28fb78 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -10,7 +10,7 @@ use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::marker::Send; use std::path::PathBuf; -use std::time::UNIX_EPOCH; +use std::time::{SystemTimeError, UNIX_EPOCH}; use std::{collections::HashMap, sync::Arc, time::SystemTime}; use thiserror::Error; use tokio::sync::mpsc::{channel, Receiver, Sender}; @@ -50,12 +50,18 @@ impl NodeId { crate::NodeId::PublicKey(pk) => { if pk != node_id { return Err(LightningError::ValidationError(format!( - "the provided node id does not match the one returned by the backend ({} != {}).", pk, node_id))); + "the provided node id does not match the one returned by the backend ({} != {}).", + pk, node_id + ))); } }, crate::NodeId::Alias(a) => { if alias != a { - log::warn!("The provided alias does not match the one returned by the backend ({} != {}).", a, alias) + log::warn!( + "The provided alias does not match the one returned by the backend ({} != {}).", + a, + alias + ) } *alias = a.to_string(); }, @@ -167,6 +173,14 @@ pub enum SimulationError { RandomActivityError(RandomActivityError), #[error("Simulated Network Error: {0}")] SimulatedNetworkError(String), + #[error("System Time Error: {0}")] + SystemTimeError(#[from] SystemTimeError), + #[error("Missing Node Error: {0}")] + MissingNodeError(String), + #[error("Mpsc Channel Error: {0}")] + MpscChannelError(String), + #[error("Payment Generation Error: {0}")] + PaymentGenerationError(PaymentGenerationError), } #[derive(Debug, Error)] @@ -324,11 +338,16 @@ struct Payment { impl Display for Payment { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let dispatch_time = self + .dispatch_time + .duration_since(UNIX_EPOCH) + .expect("Failed to compute duration since unix epoch."); + write!( f, - "Payment {} dispatched at {:?} sending {} msat from {} -> {}", + "Payment {} dispatched at {:?} sending {} msat from {} -> {}.", self.hash.map(|h| hex::encode(h.0)).unwrap_or_default(), - self.dispatch_time.duration_since(UNIX_EPOCH).unwrap(), + dispatch_time, self.amount_msat, self.source, self.destination, @@ -338,7 +357,7 @@ /// SimulationEvent describes the
set of actions that the simulator can run on nodes that it has execution permissions /// on. -#[derive(Clone)] +#[derive(Clone, Debug)] enum SimulationEvent { /// Dispatch a payment of the specified amount to the public key provided. /// Results in `SimulationOutput::SendPaymentSuccess` or `SimulationOutput::SendPaymentFailure`. @@ -346,6 +365,7 @@ enum SimulationEvent { } /// SimulationOutput provides the output of a simulation event. +#[derive(Debug, Clone)] enum SimulationOutput { /// Intermediate output for when simulator has successfully dispatched a payment. /// We need to track the result of the payment to report on it. @@ -429,7 +449,10 @@ impl Simulation { for node in self.nodes.values() { let node = node.lock().await; if !node.get_info().features.supports_keysend() { - return Err(LightningError::ValidationError(format!("All nodes eligible for random activity generation must support keysend, {} does not", node.get_info()))); + return Err(LightningError::ValidationError(format!( + "All nodes eligible for random activity generation must support keysend, {} does not", + node.get_info() + ))); } } } @@ -485,7 +508,10 @@ impl Simulation { } } - log::info!("Simulation is running on {}.", running_network.unwrap()); + log::info!( + "Simulation is running on {}.", + running_network.expect("Network not provided.") + ); Ok(()) } @@ -546,7 +572,7 @@ impl Simulation { }); } - // We always want to wait ofr all threads to exit, so we wait for all of them to exit and track any errors + // We always want to wait for all threads to exit, so we wait for all of them to exit and track any errors // that surface. It's okay if there are multiple and one is overwritten, we just want to know whether we // exited with an error or not. let mut success = true; @@ -572,32 +598,63 @@ impl Simulation { tasks: &mut JoinSet<()>, ) { let listener = self.shutdown_listener.clone(); + let shutdown = self.shutdown_trigger.clone(); log::debug!("Setting up simulator data collection."); // Create a sender/receiver pair that will be used to report final results of simulation. 
let (results_sender, results_receiver) = channel(1); - tasks.spawn(produce_simulation_results( - self.nodes.clone(), - output_receiver, - results_sender, - listener.clone(), - )); + let nodes = self.nodes.clone(); + // psr: produce simulation results + let psr_listener = listener.clone(); + let psr_shutdown = shutdown.clone(); + tasks.spawn(async move { + log::debug!("Starting simulation results producer."); + if let Err(e) = + produce_simulation_results(nodes, output_receiver, results_sender, psr_listener) + .await + { + psr_shutdown.trigger(); + log::error!("Produce simulation results exited with error: {e:?}."); + } else { + log::debug!("Produce simulation results received shutdown signal."); + } + }); let result_logger = Arc::new(Mutex::new(PaymentResultLogger::new())); - tasks.spawn(run_results_logger( - listener.clone(), - result_logger.clone(), - Duration::from_secs(60), - )); + let result_logger_clone = result_logger.clone(); + let result_logger_listener = listener.clone(); + tasks.spawn(async move { + log::debug!("Starting results logger."); + run_results_logger( + result_logger_listener, + result_logger_clone, + Duration::from_secs(60), + ) + .await; + log::debug!("Exiting results logger."); + }); + + // csr: consume simulation results + let csr_write_results = self.write_results.clone(); + tasks.spawn(async move { + log::debug!("Starting simulation results consumer."); + if let Err(e) = consume_simulation_results( + result_logger, + results_receiver, + listener, + csr_write_results, + ) + .await + { + shutdown.trigger(); + log::error!("Consume simulation results exited with error: {e:?}."); + } else { + log::debug!("Consume simulation results received shutdown signal."); + } + }); - tasks.spawn(consume_simulation_results( - result_logger, - results_receiver, - listener, - self.write_results.clone(), - )); log::debug!("Simulator data collection set up."); } @@ -708,12 +765,23 @@ impl Simulation { // Generate a consumer for the receiving end of the channel. It takes the event receiver that it'll pull // events from and the results sender to report the events it has triggered for further monitoring.
- tasks.spawn(consume_events( - node.clone(), - receiver, - output_sender.clone(), - self.shutdown_trigger.clone(), - )); + // ce: consume event + let ce_listener = self.shutdown_listener.clone(); + let ce_shutdown = self.shutdown_trigger.clone(); + let ce_output_sender = output_sender.clone(); + let ce_node = node.clone(); + tasks.spawn(async move { + let node_info = ce_node.lock().await.get_info().clone(); + log::debug!("Starting events consumer for {}.", node_info); + if let Err(e) = + consume_events(ce_node, receiver, ce_output_sender, ce_listener).await + { + ce_shutdown.trigger(); + log::error!("Event consumer exited with error: {e:?}."); + } else { + log::debug!("Event consumer for node {node_info} received shutdown signal."); + } + }); } channels @@ -735,14 +803,34 @@ impl Simulation { ))), )?; - tasks.spawn(produce_events( - executor.source_info, - executor.network_generator, - executor.payment_generator, - sender.clone(), - self.shutdown_trigger.clone(), - self.shutdown_listener.clone(), - )); + // pe: produce events + let pe_shutdown = self.shutdown_trigger.clone(); + let pe_listener = self.shutdown_listener.clone(); + let pe_sender = sender.clone(); + tasks.spawn(async move { + let source = executor.source_info.clone(); + + log::info!( + "Starting activity producer for {}: {}.", + source, + executor.payment_generator + ); + + if let Err(e) = produce_events( + executor.source_info, + executor.network_generator, + executor.payment_generator, + pe_sender, + pe_listener, + ) + .await + { + pe_shutdown.trigger(); + log::error!("Event producer exited with error: {e}."); + } else { + log::debug!("Random activity generator for {source} received shutdown signal."); + } + }); } Ok(()) @@ -750,73 +838,74 @@ impl Simulation { } /// events that are crated for a lightning node that we can execute events on. Any output that is generated from the -/// event being executed is piped into a channel to handle the result of the event. If it exits, it will use the -/// trigger provided to trigger shutdown in other threads. If an error occurs elsewhere, we expect the senders -/// corresponding to our receiver to be dropped, which will cause the receiver to error out and -/// exit. +/// event being executed is piped into a channel to handle the result of the event. async fn consume_events( node: Arc>, mut receiver: Receiver, sender: Sender, - shutdown: Trigger, -) { - log::debug!("Started consumer for {}.", node.lock().await.get_info()); - - while let Some(event) = receiver.recv().await { - match event { - SimulationEvent::SendPayment(dest, amt_msat) => { - let mut node = node.lock().await; - - let mut payment = Payment { - source: node.get_info().pubkey, - hash: None, - amount_msat: amt_msat, - destination: dest.pubkey, - dispatch_time: SystemTime::now(), - }; + listener: Listener, +) -> Result<(), SimulationError> { + loop { + select!
{ + biased; + _ = listener.clone() => { + return Ok(()); + }, + simulation_event = receiver.recv() => { + if let Some(event) = simulation_event { + match event { + SimulationEvent::SendPayment(dest, amt_msat) => { + let mut node = node.lock().await; + + let mut payment = Payment { + source: node.get_info().pubkey, + hash: None, + amount_msat: amt_msat, + destination: dest.pubkey, + dispatch_time: SystemTime::now(), + }; + + let outcome = match node.send_payment(dest.pubkey, amt_msat).await { + Ok(payment_hash) => { + log::debug!( + "Send payment: {} -> {}: ({}).", + node.get_info(), + dest, + hex::encode(payment_hash.0) + ); + // We need to track the payment outcome using the payment hash that we have received. + payment.hash = Some(payment_hash); + SimulationOutput::SendPaymentSuccess(payment) + } + Err(e) => { + log::error!( + "Error while sending payment {} -> {}.", + node.get_info(), + dest + ); + + match e { + LightningError::PermanentError(s) => { + return Err(SimulationError::LightningError(LightningError::PermanentError(s))); + } + _ => SimulationOutput::SendPaymentFailure( + payment, + PaymentResult::not_dispatched(), + ), + } + } + }; - let outcome = match node.send_payment(dest.pubkey, amt_msat).await { - Ok(payment_hash) => { - log::debug!( - "Send payment: {} -> {}: ({}).", - node.get_info(), - dest, - hex::encode(payment_hash.0) - ); - // We need to track the payment outcome using the payment hash that we have received. - payment.hash = Some(payment_hash); - SimulationOutput::SendPaymentSuccess(payment) - }, - Err(e) => { - log::error!( - "Error while sending payment {} -> {}.", - node.get_info(), - dest - ); - - match e { - LightningError::PermanentError(s) => { - log::error!("Simulation terminated with error: {s}."); - shutdown.trigger(); - break; - }, - _ => SimulationOutput::SendPaymentFailure( - payment, - PaymentResult::not_dispatched(), - ), + if sender.send(outcome.clone()).await.is_err() { + return Err(SimulationError::MpscChannelError(format!("Error sending simulation output {outcome:?}."))); + } } - }, - }; - - match sender.send(outcome).await { - Ok(_) => {}, - Err(e) => { - log::error!("Error sending action outcome: {:?}.", e); - break; - }, + } + } else { + return Ok(()) } - }, - }; + } + } } } @@ -827,11 +916,8 @@ async fn produce_events>, node_generator: Box, sender: Sender, - shutdown: Trigger, listener: Listener, -) { - log::info!("Started activity producer for {source}: {node_generator}."); - +) -> Result<(), SimulationError> { loop { let wait = node_generator.next_payment_wait(); log::debug!("Next payment for {source} in {:?} seconds.", wait); @@ -839,8 +925,7 @@ async fn produce_events { - log::debug!("Random activity generator for {source} received signal to shut down."); - break; + return Ok(()); }, // Wait until our time to next payment has elapsed then execute a random amount payment to a random // destination. @@ -859,8 +944,7 @@ async fn produce_events { - log::error!("Could not get amount for {source} -> {destination}: {e}. Please report a bug!"); - break; + return Err(SimulationError::PaymentGenerationError(e)); }, }; @@ -868,63 +952,46 @@ async fn produce_events {destination}. 
Consumer error: {e}.", - ); - break; + if sender.send(event.clone()).await.is_err() { + return Err(SimulationError::MpscChannelError(format!("Stopped random producer for {amount}: {source} -> {destination}."))); } + }, } } - - log::debug!("Stopped random activity producer {source}."); - shutdown.trigger(); } async fn consume_simulation_results( - logger: Arc>, - receiver: Receiver<(Payment, PaymentResult)>, - listener: Listener, - write_results: Option, -) { - log::debug!("Simulation results consumer started."); - - if let Err(e) = write_payment_results(logger, receiver, listener, write_results).await { - log::error!("Error while reporting payment results: {:?}.", e); - } - - log::debug!("Simulation results consumer exiting."); -} - -async fn write_payment_results( logger: Arc>, mut receiver: Receiver<(Payment, PaymentResult)>, listener: Listener, write_results: Option, ) -> Result<(), SimulationError> { - let mut writer = write_results.and_then(|write_result| { - let file = write_result.results_dir.join(format!( - "simulation_{:?}.csv", - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() - )); - let writer = WriterBuilder::new().from_path(file).ok()?; - Some((writer, write_result.batch_size)) - }); + let mut writer = match write_results { + Some(res) => { + let duration = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?; + let file = res + .results_dir + .join(format!("simulation_{:?}.csv", duration)); + let writer = WriterBuilder::new().from_path(file)?; + Some((writer, res.batch_size)) + }, + None => None, + }; let mut counter = 1; + loop { - tokio::select! { + select! { biased; _ = listener.clone() => { - log::debug!("Simulation results consumer received shutdown signal."); - return writer.map_or(Ok(()), |(ref mut w, _)| w.flush().map_err(|_| SimulationError::FileError)) + writer.map_or(Ok(()), |(ref mut w, _)| w.flush().map_err(|_| { + SimulationError::FileError + }))?; + return Ok(()); }, - payment_report = receiver.recv() => { - match payment_report { + payment_result = receiver.recv() => { + match payment_result { Some((details, result)) => { logger.lock().await.report_result(&details, &result); log::trace!("Resolved dispatched payment: {} with: {}.", details, result); @@ -936,7 +1003,9 @@ async fn write_payment_results( })?; counter = counter % batch_size + 1; if batch_size == counter { - w.flush().map_err(|_| SimulationError::FileError)?; + w.flush().map_err(|_| { + SimulationError::FileError + })?; } } }, @@ -985,12 +1054,14 @@ impl Display for PaymentResultLogger { } } +/// Reports a summary of payment results at an interval specified by `interval`. +/// Note that `run_results_logger` does not error in any way, so it takes no +/// shutdown trigger; it only listens for the shutdown signal to ensure a clean exit.
async fn run_results_logger( listener: Listener, logger: Arc>, interval: Duration, ) { - log::debug!("Results logger started."); log::info!("Summary of results will be reported every {:?}.", interval); loop { @@ -1005,8 +1076,6 @@ async fn run_results_logger( } } } - - log::debug!("Results logger stopped.") } /// produce_results is responsible for receiving the outputs of events that the simulator has taken and @@ -1022,56 +1091,63 @@ async fn produce_simulation_results( nodes: HashMap>>, mut output_receiver: Receiver, results: Sender<(Payment, PaymentResult)>, - shutdown: Listener, -) { - log::debug!("Simulation results producer started."); - + listener: Listener, +) -> Result<(), SimulationError> { let mut set = tokio::task::JoinSet::new(); loop { tokio::select! { biased; - _ = shutdown.clone() => break, + _ = listener.clone() => { + return Ok(()) + }, output = output_receiver.recv() => { - match output{ + match output { Some(simulation_output) => { match simulation_output{ SimulationOutput::SendPaymentSuccess(payment) => { - let source_node = nodes.get(&payment.source).unwrap().clone(); - set.spawn(track_payment_result( - source_node, results.clone(), payment, shutdown.clone(), - )); + if let Some(source_node) = nodes.get(&payment.source) { + set.spawn(track_payment_result( + source_node.clone(), results.clone(), payment, listener.clone() + )); + } else { + return Err(SimulationError::MissingNodeError(format!("Source node with public key: {} unavailable.", payment.source))); + } }, SimulationOutput::SendPaymentFailure(payment, result) => { - if results.clone().send((payment, result)).await.is_err() { - log::debug!("Could not send payment result."); + if results.send((payment, result.clone())).await.is_err() { + return Err(SimulationError::MpscChannelError( + format!("Failed to send payment result: {result} for payment {:?} dispatched at {:?}.", payment.hash, payment.dispatch_time), + )); } } }; - }, - None => { - return + None => return Ok(()) + } + }, + track_payment = set.join_next() => { + if let Some(res) = track_payment { + match res { + Ok(track_payment_res) => { + track_payment_res? + }, + Err(_) => { + return Err(SimulationError::TaskError); + }, } } } } } - - log::debug!("Simulation results producer exiting."); - while let Some(res) = set.join_next().await { - if let Err(e) = res { - log::error!("Simulation results producer task exited with error: {e}."); - } - } } async fn track_payment_result( node: Arc>, results: Sender<(Payment, PaymentResult)>, payment: Payment, - shutdown: Listener, -) { + listener: Listener, +) -> Result<(), SimulationError> { log::trace!("Payment result tracker starting."); let mut node = node.lock().await; @@ -1079,7 +1155,7 @@ async fn track_payment_result( let res = match payment.hash { Some(hash) => { log::debug!("Tracking payment outcome for: {}.", hex::encode(hash.0)); - let track_payment = node.track_payment(hash, shutdown.clone()); + let track_payment = node.track_payment(hash, listener.clone()); match track_payment.await { Ok(res) => { @@ -1105,9 +1181,19 @@ async fn track_payment_result( }, }; - if results.clone().send((payment, res)).await.is_err() { - log::debug!("Could not send payment result."); + select! 
{ + biased; + _ = listener.clone() => { + log::debug!("Track payment result received a shutdown signal."); + }, + send_payment_result = results.send((payment, res.clone())) => { + if send_payment_result.is_err() { + return Err(SimulationError::MpscChannelError(format!("Failed to send payment result {res} for payment {payment}."))) + } + } } - log::trace!("Payment result tracker exiting."); + log::trace!("Result tracking complete. Payment result tracker exiting."); + + Ok(()) }