Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sim-all: Allow running the simulator without it writing on disk #146

Merged
merged 1 commit into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sim-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ struct Cli {
/// Multiplier of the overall network capacity used by the random activity generator
#[clap(long, short, default_value_t = ACTIVITY_MULTIPLIER)]
capacity_multiplier: f64,
/// Do not create an output file containing the simulations results
#[clap(long, default_value_t = false)]
no_results: bool,
}

#[tokio::main]
Expand Down Expand Up @@ -164,6 +167,7 @@ async fn main() -> anyhow::Result<()> {
cli.print_batch_size,
cli.expected_pmt_amt,
cli.capacity_multiplier,
cli.no_results,
);
let sim2 = sim.clone();

Expand Down
58 changes: 32 additions & 26 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ pub struct Simulation {
/// The number of times that the network sends its total capacity in a month of operation when generating random
/// activity.
activity_multiplier: f64,
/// Whether we want the simulation not to produce and result file. Useful for developing, defaults to false.
no_results: bool,
}

impl Simulation {
Expand All @@ -338,6 +340,7 @@ impl Simulation {
print_batch_size: u32,
expected_payment_msat: u64,
activity_multiplier: f64,
no_results: bool,
) -> Self {
let (shutdown_trigger, shutdown_listener) = triggered::trigger();
Self {
Expand All @@ -349,6 +352,7 @@ impl Simulation {
print_batch_size,
expected_payment_msat,
activity_multiplier,
no_results,
}
}

Expand Down Expand Up @@ -528,7 +532,6 @@ impl Simulation {
tasks: &mut JoinSet<()>,
) {
let listener = self.shutdown_listener.clone();
let print_batch_size = self.print_batch_size;
log::debug!("Setting up simulator data collection.");

// Create a sender/receiver pair that will be used to report final results of simulation.
Expand All @@ -544,7 +547,8 @@ impl Simulation {
tasks.spawn(consume_simulation_results(
results_receiver,
listener,
print_batch_size,
self.print_batch_size,
self.no_results,
));
log::debug!("Simulator data collection set up.");
}
Expand Down Expand Up @@ -864,10 +868,11 @@ async fn consume_simulation_results(
receiver: Receiver<(Payment, PaymentResult)>,
listener: Listener,
print_batch_size: u32,
no_results: bool,
) {
log::debug!("Simulation results consumer started.");

if let Err(e) = write_payment_results(receiver, listener, print_batch_size).await {
if let Err(e) = write_payment_results(receiver, listener, print_batch_size, no_results).await {
log::error!("Error while reporting payment results: {:?}.", e);
}

Expand All @@ -878,47 +883,48 @@ async fn write_payment_results(
mut receiver: Receiver<(Payment, PaymentResult)>,
listener: Listener,
print_batch_size: u32,
no_results: bool,
) -> Result<(), SimulationError> {
let mut writer = WriterBuilder::new().from_path(format!(
"simulation_{:?}.csv",
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
))?;
let mut writer = if !no_results {
Some(WriterBuilder::new().from_path(format!(
"simulation_{:?}.csv",
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
))?)
} else {
None
};

let mut result_logger = PaymentResultLogger::new();

let mut counter = 1;
let mut counter = 0;
loop {
tokio::select! {
biased;
_ = listener.clone() => {
log::debug!("Simulation results consumer received shutdown signal.");
break writer.flush().map_err(|_| SimulationError::FileError)
return writer.map_or(Ok(()), |ref mut w| w.flush().map_err(|_| SimulationError::FileError))
},
payment_report = receiver.recv() => {
match payment_report {
Some((details, result)) => {
result_logger.report_result(&details, &result);
log::trace!("Resolved dispatched payment: {} with: {}.", details, result);

writer.serialize((details, result)).map_err(|e| {
let _ = writer.flush();
SimulationError::CsvError(e)
})?;
if let Some(ref mut w) = writer {
w.serialize((details, result)).map_err(|e| {
let _ = w.flush();
SimulationError::CsvError(e)
})?;

if print_batch_size == counter {
writer.flush().map_err(|_| SimulationError::FileError)?;
counter = 1;
} else {
counter += 1;
counter = counter % print_batch_size + 1;
sr-gi marked this conversation as resolved.
Show resolved Hide resolved
if print_batch_size == counter {
w.flush().map_err(|_| SimulationError::FileError)?;
}
}
continue;
sr-gi marked this conversation as resolved.
Show resolved Hide resolved
},
None => {
break writer.flush().map_err(|_| SimulationError::FileError)
}
None => return writer.map(|ref mut w| w.flush().map_err(|_| SimulationError::FileError)).unwrap_or(Ok(())),
}
}
}
Expand Down