From bbbfbeca58804a818cbb490a5c373a43a89ac1ae Mon Sep 17 00:00:00 2001 From: janekdererste Date: Wed, 13 Dec 2023 14:50:58 +0100 Subject: [PATCH 01/11] Add instrumentation to simulation and events publisher --- src/simulation/controller.rs | 6 ++---- src/simulation/logging.rs | 3 +++ src/simulation/messaging/events.rs | 3 ++- src/simulation/simulation.rs | 16 ++++++++++++++-- 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/simulation/controller.rs b/src/simulation/controller.rs index 8b98e090..ef9b6b01 100644 --- a/src/simulation/controller.rs +++ b/src/simulation/controller.rs @@ -24,7 +24,7 @@ use crate::simulation::replanning::replanner::{DummyReplanner, ReRouteTripReplan use crate::simulation::replanning::routing::travel_time_collector::TravelTimeCollector; use crate::simulation::simulation::Simulation; use crate::simulation::vehicles::garage::Garage; -use crate::simulation::{id, logging, profiling}; +use crate::simulation::{id, logging}; pub fn run_channel() { let args = CommandLineArgs::parse(); @@ -161,9 +161,7 @@ fn execute_partition(comm: C, args: &CommandLineAr replanner, ); - profiling::measure_duration(None, "Overall Execution Time", None, || { - simulation.run(start_time, end_time) - }); + simulation.run(start_time, end_time); } /// Have this more complicated join logic, so that threads in the back of the handle vec can also diff --git a/src/simulation/logging.rs b/src/simulation/logging.rs index 829463a5..ba69ac2d 100644 --- a/src/simulation/logging.rs +++ b/src/simulation/logging.rs @@ -6,6 +6,7 @@ use tracing::Level; use tracing_appender::non_blocking::WorkerGuard; use tracing_appender::{non_blocking, rolling}; use tracing_subscriber::fmt; +use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::fmt::writer::MakeWriterExt; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::Layer; @@ -32,6 +33,7 @@ pub fn init_logging(dir: &Path, file_discriminant: &str) -> (WorkerGuard, Worker let collector = tracing_subscriber::registry() .with( fmt::Layer::new() + .with_span_events(FmtSpan::CLOSE) .with_writer(io::stdout) .with_filter(LevelFilter::INFO), ) @@ -45,6 +47,7 @@ pub fn init_logging(dir: &Path, file_discriminant: &str) -> (WorkerGuard, Worker .with( fmt::Layer::new() .with_writer(trace_file.with_min_level(Level::TRACE)) + .with_span_events(FmtSpan::CLOSE) .json(), ); tracing::subscriber::set_global_default(collector).expect("Unable to set a global collector"); diff --git a/src/simulation/messaging/events.rs b/src/simulation/messaging/events.rs index 10ce0de2..3f0f828b 100644 --- a/src/simulation/messaging/events.rs +++ b/src/simulation/messaging/events.rs @@ -1,7 +1,7 @@ use std::any::Any; use std::collections::HashMap; -use tracing::info; +use tracing::{info, instrument}; use crate::simulation::wire_types::events::event::Type::{ ActEnd, ActStart, Arrival, Departure, Generic, LinkEnter, LinkLeave, PersonEntersVeh, @@ -85,6 +85,7 @@ impl EventsPublisher { } } + #[instrument(skip_all, level = "trace")] pub fn finish(&mut self) { for handler in self.handlers.iter_mut() { handler.finish(); diff --git a/src/simulation/simulation.rs b/src/simulation/simulation.rs index a6a30fe7..a61a957c 100644 --- a/src/simulation/simulation.rs +++ b/src/simulation/simulation.rs @@ -1,3 +1,6 @@ +use std::fmt::Debug; +use std::fmt::Formatter; + use tracing::info; use crate::simulation::config::Config; @@ -62,6 +65,7 @@ where } } + #[tracing::instrument(skip_all, level = "trace")] pub fn run(&mut self, start_time: u32, end_time: u32) { // use fixed start and end times let mut now = start_time; @@ -73,8 +77,6 @@ where while now <= end_time { if self.net_message_broker.rank() == 0 && now % 1800 == 0 { - //if now % 600 == 0 { - //if now % 800 == 0 { let _hour = now / 3600; let _min = (now % 3600) / 60; info!( @@ -253,6 +255,16 @@ where } } +impl Debug for Simulation { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Simulation with Rank #{}", + self.net_message_broker.rank() + ) + } +} + #[cfg(test)] mod tests { /* From a6b3a4db8582537d51909e5cfbdadbe9688e7b63 Mon Sep 17 00:00:00 2001 From: janekdererste Date: Thu, 14 Dec 2023 17:09:04 +0100 Subject: [PATCH 02/11] Improve logging Add conversion for networks Don't override partitions if num_parts == 1 because that is surprising --- src/bin/convert_network.rs | 57 ++++++++++++++++++++++++ src/bin/proto2xml.rs | 22 ++++++--- src/simulation/controller.rs | 2 +- src/simulation/id/id_store.rs | 2 + src/simulation/network/global_network.rs | 14 +----- src/simulation/network/io.rs | 2 +- 6 files changed, 77 insertions(+), 22 deletions(-) create mode 100644 src/bin/convert_network.rs diff --git a/src/bin/convert_network.rs b/src/bin/convert_network.rs new file mode 100644 index 00000000..8602b00d --- /dev/null +++ b/src/bin/convert_network.rs @@ -0,0 +1,57 @@ +use std::path::PathBuf; + +use clap::Parser; +use tracing::info; + +use rust_q_sim::simulation::config::PartitionMethod; +use rust_q_sim::simulation::id; +use rust_q_sim::simulation::logging::init_std_out_logging; +use rust_q_sim::simulation::network::global_network::Network; + +fn main() { + init_std_out_logging(); + let args = InputArgs::parse(); + info!("Starting network conversion with args: {args:?}"); + + let net_path = PathBuf::from(&args.network); + let id_path = PathBuf::from(&args.id_store); + + if is_binary_format(&net_path) { + info!("Converting from binary to xml format. Load id store first."); + id::load_from_file(&id_path); + } + + let net = Network::from_file_path(&net_path, 1, PartitionMethod::None); + let out_path = replace_extension(&net_path); + net.to_file(&out_path); + + if is_binary_format(&net_path) { + info!("Converting from xml to binary format. Writing ids to store file."); + id::store_to_file(&id_path); + } +} + +fn is_binary_format(path: &PathBuf) -> bool { + path.extension() + .expect("Network files must either end with xml, xml.gz, or binpb") + .eq("binpb") +} + +fn replace_extension(path: &PathBuf) -> PathBuf { + let new_ext = if is_binary_format(path) { + "xml.gz" + } else { + "binpb" + }; + let mut result = PathBuf::from(path); + result.set_extension(new_ext); + result +} + +#[derive(Parser, Debug)] +struct InputArgs { + #[arg(short, long)] + pub network: String, + #[arg(short, long)] + pub id_store: String, +} diff --git a/src/bin/proto2xml.rs b/src/bin/proto2xml.rs index 3e16c592..180390e7 100644 --- a/src/bin/proto2xml.rs +++ b/src/bin/proto2xml.rs @@ -2,9 +2,12 @@ use std::io::{Read, Seek}; use std::path::PathBuf; use clap::Parser; +use tracing::info; +use rust_q_sim::simulation::id; use rust_q_sim::simulation::io::proto_events::EventsReader; use rust_q_sim::simulation::io::xml_events::XmlEventsWriter; +use rust_q_sim::simulation::logging::init_std_out_logging; use rust_q_sim::simulation::messaging::events::EventsPublisher; use rust_q_sim::simulation::wire_types::events::Event; @@ -26,15 +29,18 @@ impl StatefulReader { } fn main() { + init_std_out_logging(); let args = InputArgs::parse(); + info!("Proto2Xml with args: {args:?}"); - println!("Proto2Xml with args: {args:?}"); + info!("Load Id Store"); + id::load_from_file(&PathBuf::from(args.id_store)); let mut readers = Vec::new(); - println!("Reading from Files: "); + info!("Reading from Files: "); for i in 0..args.num_parts { - let file_string = format!("{}events.{i}.pbf", args.path); - println!("\t {}", file_string); + let file_string = format!("{}events.{i}.binpb", args.path); + info!("\t {}", file_string); let file_path = PathBuf::from(file_string); let reader = EventsReader::from_file(&file_path); let wrapper = StatefulReader { @@ -43,7 +49,7 @@ fn main() { }; readers.push(wrapper); } - let output_file_string = format!("{}.xml", args.path); + let output_file_string = format!("{}events.xml", args.path); let output_file_path = PathBuf::from(output_file_string); let mut publisher = EventsPublisher::new(); publisher.add_subscriber(Box::new(XmlEventsWriter::new(&output_file_path))); @@ -67,9 +73,9 @@ fn main() { }; } - println!("Finished reading proto files. Calling finish on XmlWriter"); + info!("Finished reading proto files. Calling finish on XmlWriter"); publisher.finish(); - println!("Finished writing to xml-file.") + info!("Finished writing to xml-file.") } fn process_events(time: u32, events: &Vec, publisher: &mut EventsPublisher) { @@ -82,6 +88,8 @@ fn process_events(time: u32, events: &Vec, publisher: &mut EventsPublishe struct InputArgs { #[arg(long)] pub path: String, + #[arg(long)] + pub id_store: String, #[arg(long, default_value_t = 1)] pub num_parts: u32, } diff --git a/src/simulation/controller.rs b/src/simulation/controller.rs index ef9b6b01..062a63d3 100644 --- a/src/simulation/controller.rs +++ b/src/simulation/controller.rs @@ -127,7 +127,7 @@ fn execute_partition(comm: C, args: &CommandLineAr let mut events = EventsPublisher::new(); - let events_file = format!("events.{rank}.pbf"); + let events_file = format!("events.{rank}.binpb"); let events_path = output_path.join(events_file); events.add_subscriber(Box::new(ProtoEventsWriter::new(&events_path))); let travel_time_collector = Box::new(TravelTimeCollector::new()); diff --git a/src/simulation/id/id_store.rs b/src/simulation/id/id_store.rs index d9499925..7f46f5d6 100644 --- a/src/simulation/id/id_store.rs +++ b/src/simulation/id/id_store.rs @@ -24,9 +24,11 @@ enum IdCompression { } fn serialize_to_file(store: &IdStore, file_path: &Path, compression: IdCompression) { + info!("Starting writing IdStore to file {file_path:?}"); let file = File::create(file_path).unwrap(); let mut file_writer = BufWriter::new(file); serialize(store, &mut file_writer, compression); + info!("Finished writing IdStore to file {file_path:?}"); } fn serialize(store: &IdStore, writer: &mut W, compression: IdCompression) { diff --git a/src/simulation/network/global_network.rs b/src/simulation/network/global_network.rs index 0bc4a2af..e5114a50 100644 --- a/src/simulation/network/global_network.rs +++ b/src/simulation/network/global_network.rs @@ -139,19 +139,7 @@ impl Network { } } } - PartitionMethod::None => { - // We can have the situation, that someone specified more partitions in the network file than the actual simulation is started with. - // Since the partitioning should normally be precomputed with the same number, it's ok to not check this here. - // But for testing purposes (compare base case with 1 partition and with more) we reset the partition of the nodes in that case. - if num_parts == 1 { - for n in network.nodes.iter_mut() { - n.partition = 0; - } - for l in network.links.iter_mut() { - l.partition = 0; - } - } - } + PartitionMethod::None => {} } } diff --git a/src/simulation/network/io.rs b/src/simulation/network/io.rs index 0ffc0487..b6eada4f 100644 --- a/src/simulation/network/io.rs +++ b/src/simulation/network/io.rs @@ -25,7 +25,7 @@ pub fn from_file(path: &Path) -> Network { pub fn to_file(network: &Network, path: &Path) { if path.extension().unwrap().eq("binpb") { write_to_proto(network, path); - } else if path.extension().unwrap().eq("xml") || path.extension().unwrap().eq("xml.gz") { + } else if path.extension().unwrap().eq("xml") || path.extension().unwrap().eq("gz") { write_to_xml(network, path); } else { panic!("Tried to write {path:?} . File format not supported. Either use `.xml`, `.xml.gz`, or `.binpb` as extension"); From abc41070d777dcb22f816831934eeb9780412119 Mon Sep 17 00:00:00 2001 From: janekdererste Date: Thu, 14 Dec 2023 18:37:02 +0100 Subject: [PATCH 03/11] improve naming in convert_network --- src/bin/convert_network.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bin/convert_network.rs b/src/bin/convert_network.rs index 8602b00d..20f1f1e7 100644 --- a/src/bin/convert_network.rs +++ b/src/bin/convert_network.rs @@ -25,7 +25,7 @@ fn main() { let out_path = replace_extension(&net_path); net.to_file(&out_path); - if is_binary_format(&net_path) { + if is_binary_format(&out_path) { info!("Converting from xml to binary format. Writing ids to store file."); id::store_to_file(&id_path); } From cb779de833c0963e63d6d639edd1c0618728ad81 Mon Sep 17 00:00:00 2001 From: janekdererste Date: Wed, 3 Jan 2024 19:20:42 +0100 Subject: [PATCH 04/11] tinkering with logging and tracing --- Cargo.lock | 34 +- Cargo.toml | 2 +- assets/equil/equil-config.yml | 2 +- src/simulation/controller.rs | 5 +- src/simulation/logging.rs | 8 +- .../messaging/communication/communicators.rs | 2 + src/simulation/profiling/mod.rs | 333 +++++++++++++++++- src/simulation/simulation.rs | 23 +- 8 files changed, 387 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a1d2667d..e48c3c0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -794,6 +794,15 @@ dependencies = [ "twox-hash", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "memchr" version = "2.6.4" @@ -1161,8 +1170,17 @@ checksum = "ebee201405406dbf528b8b672104ae6d6d63e6d118cb10e4d51abbc7b58044ff" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.3.9", + "regex-syntax 0.7.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -1173,9 +1191,15 @@ checksum = "59b23e92ee4318893fa3fe3e6fb365258efbfe6ac6ab30f090cdcbb7aa37efa9" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.7.5", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.7.5" @@ -1583,12 +1607,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "serde", "serde_json", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", "tracing-serde", diff --git a/Cargo.toml b/Cargo.toml index ff048a7f..f984f9f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ bytes = "1.3.0" serial_test = "1.0.0" wait-timeout = "0.2.0" tracing = "0.1" -tracing-subscriber = { version = "0.3.0", features = ["json", "fmt", "std", "registry"] } +tracing-subscriber = { version = "0.3.0", features = ["json", "fmt", "std", "registry", "env-filter"] } tracing-appender = "0.2" itertools = "0.10.5" assert_approx_eq = "1.1.0" diff --git a/assets/equil/equil-config.yml b/assets/equil/equil-config.yml index d68fbf7f..1f27f0a4 100644 --- a/assets/equil/equil-config.yml +++ b/assets/equil/equil-config.yml @@ -11,7 +11,7 @@ modules: method: Metis output: type: Output - output_dir: /Users/janek/Documents/equil-output/output + output_dir: /Users/janek/Documents/rust_q_sim/equil/output/size routing: type: Routing mode: UsePlans diff --git a/src/simulation/controller.rs b/src/simulation/controller.rs index 062a63d3..a2ee7f04 100644 --- a/src/simulation/controller.rs +++ b/src/simulation/controller.rs @@ -148,9 +148,6 @@ fn execute_partition(comm: C, args: &CommandLineAr }; let net_message_broker = NetMessageBroker::new(rc, &network, &network_partition); - let start_time = config.simulation().start_time; - let end_time = config.simulation().end_time; - let mut simulation: Simulation = Simulation::new( config, network_partition, @@ -161,7 +158,7 @@ fn execute_partition(comm: C, args: &CommandLineAr replanner, ); - simulation.run(start_time, end_time); + simulation.run(); } /// Have this more complicated join logic, so that threads in the back of the handle vec can also diff --git a/src/simulation/logging.rs b/src/simulation/logging.rs index ba69ac2d..fcbd25dc 100644 --- a/src/simulation/logging.rs +++ b/src/simulation/logging.rs @@ -2,12 +2,10 @@ use std::io; use std::path::Path; use tracing::level_filters::LevelFilter; -use tracing::Level; use tracing_appender::non_blocking::WorkerGuard; use tracing_appender::{non_blocking, rolling}; use tracing_subscriber::fmt; use tracing_subscriber::fmt::format::FmtSpan; -use tracing_subscriber::fmt::writer::MakeWriterExt; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::Layer; @@ -46,9 +44,11 @@ pub fn init_logging(dir: &Path, file_discriminant: &str) -> (WorkerGuard, Worker ) .with( fmt::Layer::new() - .with_writer(trace_file.with_min_level(Level::TRACE)) + .with_writer(trace_file) .with_span_events(FmtSpan::CLOSE) - .json(), + .with_ansi(false) + .json() + .with_filter(LevelFilter::TRACE), ); tracing::subscriber::set_global_default(collector).expect("Unable to set a global collector"); (_guard_log, _guard_performance) diff --git a/src/simulation/messaging/communication/communicators.rs b/src/simulation/messaging/communication/communicators.rs index 5ed50d94..079c51b9 100644 --- a/src/simulation/messaging/communication/communicators.rs +++ b/src/simulation/messaging/communication/communicators.rs @@ -6,6 +6,7 @@ use mpi::datatype::PartitionMut; use mpi::point_to_point::{Destination, Source}; use mpi::topology::{Communicator, SystemCommunicator}; use mpi::{Count, Rank}; +use tracing::instrument; use crate::simulation::wire_types::messages::{SimMessage, SyncMessage, TravelTimesMessage}; @@ -167,6 +168,7 @@ pub struct MpiSimCommunicator { } impl SimCommunicator for MpiSimCommunicator { + #[instrument(level = "trace", skip_all, fields(rank = self.rank()))] fn send_receive_vehicles( &self, out_messages: HashMap, diff --git a/src/simulation/profiling/mod.rs b/src/simulation/profiling/mod.rs index 4dcc7186..11ce468c 100644 --- a/src/simulation/profiling/mod.rs +++ b/src/simulation/profiling/mod.rs @@ -1,8 +1,20 @@ -use std::env; -use std::time::Instant; +use std::fmt::Debug; +use std::fmt::Write; +use std::fs::File; +use std::io::BufWriter; +use std::path::Path; +use std::sync::{Arc, Mutex, MutexGuard}; +use std::time::{Instant, SystemTime}; +use std::{env, fs}; use serde_json::{json, Value}; -use tracing::trace; +use tracing::field::{Field, FieldSet, Visit}; +use tracing::span::Attributes; +use tracing::{field, trace, Event, Id, Subscriber}; +use tracing_subscriber::fmt::{format, FmtContext, FormatEvent, FormatFields, MakeWriter}; +use tracing_subscriber::layer::Context; +use tracing_subscriber::registry::LookupSpan; +use tracing_subscriber::Layer; const DEFAULT_PERFORMANCE_INTERVAL: u32 = 900; @@ -35,3 +47,318 @@ pub fn measure_duration Out>( } res } + +struct CustomLayer { + writer: Arc>>, +} + +impl CustomLayer { + fn new(path: &Path) -> Self { + let header = "timestamp,target,func_name,elapsed_time,\n".to_string(); + let file = + File::create(path).unwrap_or_else(|_e| panic!("Unable to create file at {:?}", path)); + let mut writer = BufWriter::new(file); + std::io::Write::write(&mut writer, header.as_bytes()).expect("Failed to write header."); + Self { + writer: Arc::new(Mutex::new(writer)), + } + } + + fn write_metadata(e: &Event) -> String { + format!( + "{},{},{},", + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos(), + e.metadata().target(), + e.metadata().name() + ) + } +} + +impl Layer for CustomLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + let span = ctx.span(id).expect("should exist"); + let mut extensions = span.extensions_mut(); + + println!("{attrs:?}"); + extensions.insert(CustomTimings::new()); + } + + fn on_event(&self, event: &Event, _ctx: Context<'_, S>) { + let result = Self::write_metadata(event); + let mut visitor = CustomVisitor { result }; + event.record(&mut visitor as &mut dyn Visit); + visitor.result.push('\n'); + println!("Writing: {}", visitor.result); + + let mut bla = self.writer.lock().unwrap(); + std::io::Write::write(&mut *bla, visitor.result.as_bytes()).expect(""); + // bla.write(visitor.result.as_bytes()).expect("TODO: panic message"); + } + + fn on_enter(&self, id: &Id, ctx: Context<'_, S>) { + let span = ctx.span(id).expect("Should exist"); + let mut extensions = span.extensions_mut(); + + if let Some(timing) = extensions.get_mut::() { + timing.last = Instant::now(); + } + } + + fn on_exit(&self, id: &Id, ctx: Context<'_, S>) { + let span = ctx.span(id).expect("Span should be there"); + let mut extensions = span.extensions_mut(); + + if let Some(timing) = extensions.get_mut::() { + let now = Instant::now(); + timing.busy += (now - timing.last).as_nanos() as u64; + } + } + + fn on_close(&self, id: Id, ctx: Context<'_, S>) { + println!("Start close"); + let span = ctx.span(&id).expect("Span should be there!"); + let extensions = span.extensions(); + let callsite = span.metadata().callsite(); + let meta = span.metadata(); + let fs = FieldSet::new(&["time", "custom close"], callsite); + + let time = extensions.get::().unwrap(); + let v = [ + ( + &fs.field("time").unwrap(), + Some(&time.busy as &dyn field::Value), + ), + ( + &fs.field("custom close").unwrap(), + Some(&"custom close message" as &dyn field::Value), + ), + ]; + let value_set = fs.value_set(&v); + Event::child_of(id, meta, &value_set); + + drop(extensions); + drop(span); + println!("End close"); + } +} + +impl Drop for CustomLayer { + fn drop(&mut self) { + println!("Custom layer gets dropped."); + let mut writer = self.writer.lock().unwrap(); + std::io::Write::flush(&mut *writer).expect("TODO: panic message"); + } +} + +struct CustomTimings { + busy: u64, + last: Instant, +} + +impl CustomTimings { + fn new() -> Self { + Self { + busy: 0, + last: Instant::now(), + } + } +} + +struct CustomVisitor { + result: String, +} + +impl Visit for CustomVisitor { + fn record_debug(&mut self, field: &Field, value: &dyn Debug) { + if field.name().eq("time") { + write!(&mut self.result, "{:?},", value).unwrap(); + } + } +} + +struct CustomWriter { + inner: Arc>>, +} + +struct CustomWriterGuard { + writer_ref: Arc>>, +} + +impl Drop for CustomWriterGuard { + fn drop(&mut self) { + println!("Starting drop of Custom Writer Guard"); + let mut locked = self.writer_ref.lock().unwrap(); + std::io::Write::flush(&mut *locked) + .unwrap_or_else(|e| panic!("failed to flush writer on drop")); + } +} + +impl CustomWriter { + fn new(path: &Path) -> (Self, CustomWriterGuard) { + // create necessary file path and corresponding file wrapped in buffered writer + let prefix = path.parent().unwrap(); + fs::create_dir_all(prefix).unwrap(); + let file = + File::create(path).unwrap_or_else(|e| panic!("Failed to open file at: {path:?}")); + let mut writer = BufWriter::new(file); + + // write header for csv file + std::io::Write::write( + &mut writer, + "timestamp,target,func_name,duration\n".as_bytes(), + ) + .unwrap_or_else(|e| panic!("Failed to write header.")); + + // wire up stuff, so that we can have a guard and so that we are sync and send, because that + // is required by the tracing_subcriber crate + let inner = Arc::new(Mutex::new(writer)); + let result = Self { + inner: inner.clone(), + }; + let guard = CustomWriterGuard { + writer_ref: result.inner.clone(), + }; + (result, guard) + } +} + +struct MakeWriteResult<'a>(MutexGuard<'a, BufWriter>); + +impl<'a> MakeWriter<'a> for CustomWriter { + type Writer = MakeWriteResult<'a>; + + fn make_writer(&'a self) -> Self::Writer { + let locked = self.inner.lock().unwrap(); + MakeWriteResult(locked) + } +} + +impl std::io::Write for MakeWriteResult<'_> { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.0.flush() + } +} + +struct CustomFormatter; + +struct CustomTimeVisitor<'a> { + writer: &'a mut dyn Write, +} + +impl<'a> CustomTimeVisitor<'a> { + fn new(writer: &'a mut dyn Write) -> Self { + Self { writer } + } +} + +impl<'a> Visit for CustomTimeVisitor<'a> { + fn record_u64(&mut self, field: &Field, value: u64) { + println!("Visitor record u64"); + if field.name().eq("time.busy") { + write!(&mut self.writer, "{value}") + .unwrap_or_else(|e| panic!("Failed to write field time.")); + } + } + + fn record_debug(&mut self, field: &Field, value: &dyn Debug) { + println!("Visitor record debug"); + if field.name().eq("time.busy") { + write!(&mut self.writer, "{value:?}") + .unwrap_or_else(|e| panic!("Failed to write field time.")); + } + } +} + +impl FormatEvent for CustomFormatter +where + S: Subscriber + for<'a> LookupSpan<'a>, + N: for<'a> FormatFields<'a> + 'static, +{ + fn format_event( + &self, + _ctx: &FmtContext<'_, S, N>, + mut writer: format::Writer<'_>, + e: &Event<'_>, + ) -> std::fmt::Result { + write!( + &mut writer, + "{},{},{},", + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos(), + e.metadata().target(), + e.metadata().name() + ) + .unwrap_or_else(|e| panic!("OH no! Can't write.")); + + let mut visitor = CustomTimeVisitor::new(&mut writer); + e.record(&mut visitor); + writeln!(&mut writer) + } +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + use std::thread::sleep; + use std::time::Duration; + + use tracing::{info, instrument, Level}; + use tracing_subscriber::filter::FilterExt; + use tracing_subscriber::fmt::format::FmtSpan; + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::{fmt, EnvFilter, Layer}; + + use crate::simulation::profiling::{CustomFormatter, CustomWriter}; + + #[test] + fn test_events() { + let path = PathBuf::from("./test_output/simulation/profiling/test_events"); + let (writer, guard) = CustomWriter::new(&path); + let custom_formatter = CustomFormatter {}; + let custom_filter = EnvFilter::from_default_env().add_directive(Level::TRACE.into()); + let custom_filter_2 = EnvFilter::from_default_env() + .add_directive(Level::INFO.into()) + .not(); + + // let custom_layer = CustomLayer::new(&path); + // let writer_ref = custom_layer.writer.clone(); + let layers = tracing_subscriber::registry().with( + fmt::Layer::new() + .with_span_events(FmtSpan::CLOSE) + .with_writer(writer) + .event_format(custom_formatter) + .with_filter(custom_filter) + .with_filter(custom_filter_2), + ); + tracing::subscriber::set_global_default(layers).expect("TODO: panic message"); + + info!("Before func"); + some_function(); + info!("After func"); + + some_other_function(42, std::f32::consts::PI); + } + + #[instrument] + fn some_function() { + info!("Inside some function.") + } + + #[instrument(level = "trace")] + fn some_other_function(a: u32, b: f32) { + info!("Inside some other function"); + sleep(Duration::from_nanos(10)); + } +} diff --git a/src/simulation/simulation.rs b/src/simulation/simulation.rs index a61a957c..19e03b71 100644 --- a/src/simulation/simulation.rs +++ b/src/simulation/simulation.rs @@ -1,7 +1,7 @@ use std::fmt::Debug; use std::fmt::Formatter; -use tracing::info; +use tracing::{info, instrument}; use crate::simulation::config::Config; use crate::simulation::id::Id; @@ -29,6 +29,8 @@ where net_message_broker: NetMessageBroker, events: EventsPublisher, replanner: Box, + start_time: u32, + end_time: u32, } impl Simulation @@ -62,20 +64,24 @@ where net_message_broker, events, replanner, + start_time: config.simulation().start_time, + end_time: config.simulation().end_time, } } - #[tracing::instrument(skip_all, level = "trace")] - pub fn run(&mut self, start_time: u32, end_time: u32) { + #[tracing::instrument(level = "trace", skip(self), fields(rank = self.net_message_broker.rank()))] + pub fn run(&mut self) { // use fixed start and end times - let mut now = start_time; + let mut now = self.start_time; info!( - "Starting #{}. Network neighbors: {:?}, Start time {start_time}, End time {end_time}", + "Starting #{}. Network neighbors: {:?}, Start time {}, End time {}", self.net_message_broker.rank(), self.network.neighbors(), + self.start_time, + self.end_time, ); - while now <= end_time { + while now <= self.end_time { if self.net_message_broker.rank() == 0 && now % 1800 == 0 { let _hour = now / 3600; let _min = (now % 3600) / 60; @@ -98,6 +104,7 @@ where self.events.finish(); } + #[tracing::instrument(level = "trace", skip(self), fields(rank = self.net_message_broker.rank()))] fn wakeup(&mut self, now: u32) { let agents = self.activity_q.pop(now); @@ -138,6 +145,7 @@ where } } + #[instrument(level = "trace", skip(self, agent), fields(rank = self.net_message_broker.rank()))] fn departure(&mut self, mut agent: Person, now: u32) -> Vehicle { //here, current element counter is going to be increased agent.advance_plan(); @@ -160,6 +168,7 @@ where self.replanner.replan(now, agent, &self.garage) } + #[instrument(level = "trace", skip(self), fields(rank = self.net_message_broker.rank()))] fn terminate_teleportation(&mut self, now: u32) { let teleportation_vehicles = self.teleportation_q.pop(now); for vehicle in teleportation_vehicles { @@ -195,6 +204,7 @@ where } } + #[instrument(level = "trace", skip(self), fields(rank = self.net_message_broker.rank()))] fn move_nodes(&mut self, now: u32) { let exited_vehicles = self.network.move_nodes(&mut self.events, now); @@ -220,6 +230,7 @@ where } } + #[instrument(level = "trace", skip(self), fields(rank = self.net_message_broker.rank()))] fn move_links(&mut self, now: u32) { let (vehicles, storage_cap) = self.network.move_links(now); From b452f57aa9bce0cdeec77668984cb546954b85d1 Mon Sep 17 00:00:00 2001 From: janekdererste Date: Thu, 4 Jan 2024 11:27:14 +0100 Subject: [PATCH 05/11] Cleanup monitoring code and use it in loggin.rs --- Cargo.lock | 34 +--- Cargo.toml | 2 +- src/simulation/logging.rs | 39 ++-- src/simulation/profiling/mod.rs | 304 ++++++++------------------------ 4 files changed, 105 insertions(+), 274 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e48c3c0b..a1d2667d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -794,15 +794,6 @@ dependencies = [ "twox-hash", ] -[[package]] -name = "matchers" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" -dependencies = [ - "regex-automata 0.1.10", -] - [[package]] name = "memchr" version = "2.6.4" @@ -1170,17 +1161,8 @@ checksum = "ebee201405406dbf528b8b672104ae6d6d63e6d118cb10e4d51abbc7b58044ff" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.9", - "regex-syntax 0.7.5", -] - -[[package]] -name = "regex-automata" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" -dependencies = [ - "regex-syntax 0.6.29", + "regex-automata", + "regex-syntax", ] [[package]] @@ -1191,15 +1173,9 @@ checksum = "59b23e92ee4318893fa3fe3e6fb365258efbfe6ac6ab30f090cdcbb7aa37efa9" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.7.5", + "regex-syntax", ] -[[package]] -name = "regex-syntax" -version = "0.6.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" - [[package]] name = "regex-syntax" version = "0.7.5" @@ -1607,16 +1583,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" dependencies = [ - "matchers", "nu-ansi-term", - "once_cell", - "regex", "serde", "serde_json", "sharded-slab", "smallvec", "thread_local", - "tracing", "tracing-core", "tracing-log", "tracing-serde", diff --git a/Cargo.toml b/Cargo.toml index f984f9f4..ff048a7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ bytes = "1.3.0" serial_test = "1.0.0" wait-timeout = "0.2.0" tracing = "0.1" -tracing-subscriber = { version = "0.3.0", features = ["json", "fmt", "std", "registry", "env-filter"] } +tracing-subscriber = { version = "0.3.0", features = ["json", "fmt", "std", "registry"] } tracing-appender = "0.2" itertools = "0.10.5" assert_approx_eq = "1.1.0" diff --git a/src/simulation/logging.rs b/src/simulation/logging.rs index fcbd25dc..0afbb7bd 100644 --- a/src/simulation/logging.rs +++ b/src/simulation/logging.rs @@ -5,10 +5,11 @@ use tracing::level_filters::LevelFilter; use tracing_appender::non_blocking::WorkerGuard; use tracing_appender::{non_blocking, rolling}; use tracing_subscriber::fmt; -use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::Layer; +use crate::simulation::profiling::{SpanDurationToCSVLayer, WriterGuard}; + pub fn init_std_out_logging() { let collector = tracing_subscriber::registry().with( fmt::Layer::new() @@ -18,21 +19,29 @@ pub fn init_std_out_logging() { tracing::subscriber::set_global_default(collector).expect("Unable to set a global collector"); } -pub fn init_logging(dir: &Path, file_discriminant: &str) -> (WorkerGuard, WorkerGuard) { +pub fn init_logging(dir: &Path, file_discriminant: &str) -> (WorkerGuard, WriterGuard) { let log_file_name = format!("log_process_{file_discriminant}.txt"); - let log_file_appender = rolling::never(dir, &log_file_name); + let log_file_appender = rolling::never(dir, log_file_name); let (log_file, _guard_log) = non_blocking(log_file_appender); - let trace_dir = dir.join("trace"); + /*let trace_dir = dir.join("trace"); let trace_file_name = format!("trace_process_{file_discriminant}.txt"); let trace_file_appender = rolling::never(&trace_dir, &trace_file_name); let (trace_file, _guard_performance) = non_blocking(trace_file_appender); + */ + + let duration_dir = dir.join("instrument"); + let duration_file_name = format!("instrument_process_{file_discriminant}.csv"); + let duration_path = duration_dir.join(duration_file_name); + let (csv_layer, _guard) = SpanDurationToCSVLayer::new(&duration_path); + let collector = tracing_subscriber::registry() + .with(csv_layer) .with( fmt::Layer::new() - .with_span_events(FmtSpan::CLOSE) .with_writer(io::stdout) + .pretty() .with_filter(LevelFilter::INFO), ) .with( @@ -41,15 +50,17 @@ pub fn init_logging(dir: &Path, file_discriminant: &str) -> (WorkerGuard, Worker .json() .with_ansi(false) .with_filter(LevelFilter::DEBUG), - ) - .with( - fmt::Layer::new() - .with_writer(trace_file) - .with_span_events(FmtSpan::CLOSE) - .with_ansi(false) - .json() - .with_filter(LevelFilter::TRACE), ); + /*.with( + fmt::Layer::new() + .with_writer(trace_file) + .with_span_events(FmtSpan::CLOSE) + .with_ansi(false) + .json() + .with_filter(LevelFilter::TRACE), + ); + + */ tracing::subscriber::set_global_default(collector).expect("Unable to set a global collector"); - (_guard_log, _guard_performance) + (_guard_log, _guard) } diff --git a/src/simulation/profiling/mod.rs b/src/simulation/profiling/mod.rs index 11ce468c..592d6a37 100644 --- a/src/simulation/profiling/mod.rs +++ b/src/simulation/profiling/mod.rs @@ -1,17 +1,13 @@ -use std::fmt::Debug; -use std::fmt::Write; use std::fs::File; use std::io::BufWriter; use std::path::Path; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::{Arc, Mutex}; use std::time::{Instant, SystemTime}; use std::{env, fs}; use serde_json::{json, Value}; -use tracing::field::{Field, FieldSet, Visit}; use tracing::span::Attributes; -use tracing::{field, trace, Event, Id, Subscriber}; -use tracing_subscriber::fmt::{format, FmtContext, FormatEvent, FormatFields, MakeWriter}; +use tracing::{trace, Id, Subscriber}; use tracing_subscriber::layer::Context; use tracing_subscriber::registry::LookupSpan; use tracing_subscriber::Layer; @@ -48,64 +44,81 @@ pub fn measure_duration Out>( res } -struct CustomLayer { +pub struct SpanDurationToCSVLayer { writer: Arc>>, } -impl CustomLayer { - fn new(path: &Path) -> Self { - let header = "timestamp,target,func_name,elapsed_time,\n".to_string(); +pub struct WriterGuard { + writer_ref: Arc>>, +} + +struct SpanDuration { + elapsed: u64, + last: Instant, +} + +impl SpanDurationToCSVLayer { + pub fn new(path: &Path) -> (Self, WriterGuard) { + // create necessary file path and corresponding file wrapped in buffered writer + let prefix = path.parent().unwrap(); + fs::create_dir_all(prefix).unwrap(); let file = - File::create(path).unwrap_or_else(|_e| panic!("Unable to create file at {:?}", path)); + File::create(path).unwrap_or_else(|_e| panic!("Failed to open file at: {path:?}")); let mut writer = BufWriter::new(file); - std::io::Write::write(&mut writer, header.as_bytes()).expect("Failed to write header."); - Self { - writer: Arc::new(Mutex::new(writer)), - } + + // write header for csv file + std::io::Write::write( + &mut writer, + "timestamp,target,func_name,duration\n".as_bytes(), + ) + .unwrap_or_else(|_e| panic!("Failed to write header.")); + + // wrap the writer into an arc> so that we can keep a reference which gets dropped + // at the end of the scope calling this method. The mutex is necessary, because the Layer + // must be Sync + Send for the tracing_subscriber subscriber + let writer_ref = Arc::new(Mutex::new(writer)); + let new_self = Self { + writer: writer_ref.clone(), + }; + let guard = WriterGuard { writer_ref }; + (new_self, guard) } - fn write_metadata(e: &Event) -> String { - format!( + fn write_metadata(writer: &mut BufWriter, m: &tracing::Metadata) { + // import Write here, to avoid conflicts with std::fmt::Write + use std::io::Write; + + write!( + writer, "{},{},{},", SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_nanos(), - e.metadata().target(), - e.metadata().name() + m.target(), + m.name(), ) + .unwrap(); } } -impl Layer for CustomLayer +/// Simple Layer implementation, which records the time elapsed between a a span being opened and being +/// closed again. Once a span is closed, it writes the elapsed time into a csv journal +impl Layer for SpanDurationToCSVLayer where S: Subscriber + for<'a> LookupSpan<'a>, { - fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + fn on_new_span(&self, _attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { let span = ctx.span(id).expect("should exist"); let mut extensions = span.extensions_mut(); - - println!("{attrs:?}"); - extensions.insert(CustomTimings::new()); - } - - fn on_event(&self, event: &Event, _ctx: Context<'_, S>) { - let result = Self::write_metadata(event); - let mut visitor = CustomVisitor { result }; - event.record(&mut visitor as &mut dyn Visit); - visitor.result.push('\n'); - println!("Writing: {}", visitor.result); - - let mut bla = self.writer.lock().unwrap(); - std::io::Write::write(&mut *bla, visitor.result.as_bytes()).expect(""); - // bla.write(visitor.result.as_bytes()).expect("TODO: panic message"); + extensions.insert(SpanDuration::new()); } fn on_enter(&self, id: &Id, ctx: Context<'_, S>) { let span = ctx.span(id).expect("Should exist"); let mut extensions = span.extensions_mut(); - if let Some(timing) = extensions.get_mut::() { + if let Some(timing) = extensions.get_mut::() { timing.last = Instant::now(); } } @@ -114,234 +127,69 @@ where let span = ctx.span(id).expect("Span should be there"); let mut extensions = span.extensions_mut(); - if let Some(timing) = extensions.get_mut::() { + if let Some(timing) = extensions.get_mut::() { let now = Instant::now(); - timing.busy += (now - timing.last).as_nanos() as u64; + timing.elapsed += (now - timing.last).as_nanos() as u64; } } fn on_close(&self, id: Id, ctx: Context<'_, S>) { - println!("Start close"); + use std::io::Write; + let span = ctx.span(&id).expect("Span should be there!"); let extensions = span.extensions(); - let callsite = span.metadata().callsite(); let meta = span.metadata(); - let fs = FieldSet::new(&["time", "custom close"], callsite); - - let time = extensions.get::().unwrap(); - let v = [ - ( - &fs.field("time").unwrap(), - Some(&time.busy as &dyn field::Value), - ), - ( - &fs.field("custom close").unwrap(), - Some(&"custom close message" as &dyn field::Value), - ), - ]; - let value_set = fs.value_set(&v); - Event::child_of(id, meta, &value_set); + let writer = &mut *self.writer.lock().unwrap(); + Self::write_metadata(writer, meta); + + let span_duration = extensions.get::().unwrap(); + write!(writer, "{},", span_duration.elapsed).unwrap(); + writeln!(writer).unwrap(); + + // extensions and span must be dropped explicitly, says the tracing documentation drop(extensions); drop(span); - println!("End close"); } } -impl Drop for CustomLayer { +impl Drop for WriterGuard { fn drop(&mut self) { - println!("Custom layer gets dropped."); - let mut writer = self.writer.lock().unwrap(); + println!("Writer guard gets dropped."); + let mut writer = self.writer_ref.lock().unwrap(); std::io::Write::flush(&mut *writer).expect("TODO: panic message"); } } -struct CustomTimings { - busy: u64, - last: Instant, -} - -impl CustomTimings { +impl SpanDuration { fn new() -> Self { Self { - busy: 0, + elapsed: 0, last: Instant::now(), } } } -struct CustomVisitor { - result: String, -} - -impl Visit for CustomVisitor { - fn record_debug(&mut self, field: &Field, value: &dyn Debug) { - if field.name().eq("time") { - write!(&mut self.result, "{:?},", value).unwrap(); - } - } -} - -struct CustomWriter { - inner: Arc>>, -} - -struct CustomWriterGuard { - writer_ref: Arc>>, -} - -impl Drop for CustomWriterGuard { - fn drop(&mut self) { - println!("Starting drop of Custom Writer Guard"); - let mut locked = self.writer_ref.lock().unwrap(); - std::io::Write::flush(&mut *locked) - .unwrap_or_else(|e| panic!("failed to flush writer on drop")); - } -} - -impl CustomWriter { - fn new(path: &Path) -> (Self, CustomWriterGuard) { - // create necessary file path and corresponding file wrapped in buffered writer - let prefix = path.parent().unwrap(); - fs::create_dir_all(prefix).unwrap(); - let file = - File::create(path).unwrap_or_else(|e| panic!("Failed to open file at: {path:?}")); - let mut writer = BufWriter::new(file); - - // write header for csv file - std::io::Write::write( - &mut writer, - "timestamp,target,func_name,duration\n".as_bytes(), - ) - .unwrap_or_else(|e| panic!("Failed to write header.")); - - // wire up stuff, so that we can have a guard and so that we are sync and send, because that - // is required by the tracing_subcriber crate - let inner = Arc::new(Mutex::new(writer)); - let result = Self { - inner: inner.clone(), - }; - let guard = CustomWriterGuard { - writer_ref: result.inner.clone(), - }; - (result, guard) - } -} - -struct MakeWriteResult<'a>(MutexGuard<'a, BufWriter>); - -impl<'a> MakeWriter<'a> for CustomWriter { - type Writer = MakeWriteResult<'a>; - - fn make_writer(&'a self) -> Self::Writer { - let locked = self.inner.lock().unwrap(); - MakeWriteResult(locked) - } -} - -impl std::io::Write for MakeWriteResult<'_> { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.0.write(buf) - } - - fn flush(&mut self) -> std::io::Result<()> { - self.0.flush() - } -} - -struct CustomFormatter; - -struct CustomTimeVisitor<'a> { - writer: &'a mut dyn Write, -} - -impl<'a> CustomTimeVisitor<'a> { - fn new(writer: &'a mut dyn Write) -> Self { - Self { writer } - } -} - -impl<'a> Visit for CustomTimeVisitor<'a> { - fn record_u64(&mut self, field: &Field, value: u64) { - println!("Visitor record u64"); - if field.name().eq("time.busy") { - write!(&mut self.writer, "{value}") - .unwrap_or_else(|e| panic!("Failed to write field time.")); - } - } - - fn record_debug(&mut self, field: &Field, value: &dyn Debug) { - println!("Visitor record debug"); - if field.name().eq("time.busy") { - write!(&mut self.writer, "{value:?}") - .unwrap_or_else(|e| panic!("Failed to write field time.")); - } - } -} - -impl FormatEvent for CustomFormatter -where - S: Subscriber + for<'a> LookupSpan<'a>, - N: for<'a> FormatFields<'a> + 'static, -{ - fn format_event( - &self, - _ctx: &FmtContext<'_, S, N>, - mut writer: format::Writer<'_>, - e: &Event<'_>, - ) -> std::fmt::Result { - write!( - &mut writer, - "{},{},{},", - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_nanos(), - e.metadata().target(), - e.metadata().name() - ) - .unwrap_or_else(|e| panic!("OH no! Can't write.")); - - let mut visitor = CustomTimeVisitor::new(&mut writer); - e.record(&mut visitor); - writeln!(&mut writer) - } -} - #[cfg(test)] mod tests { use std::path::PathBuf; use std::thread::sleep; use std::time::Duration; - use tracing::{info, instrument, Level}; - use tracing_subscriber::filter::FilterExt; - use tracing_subscriber::fmt::format::FmtSpan; + use tracing::{info, instrument}; + use tracing_subscriber::fmt::Layer; use tracing_subscriber::layer::SubscriberExt; - use tracing_subscriber::{fmt, EnvFilter, Layer}; - use crate::simulation::profiling::{CustomFormatter, CustomWriter}; + use crate::simulation::profiling::SpanDurationToCSVLayer; #[test] fn test_events() { - let path = PathBuf::from("./test_output/simulation/profiling/test_events"); - let (writer, guard) = CustomWriter::new(&path); - let custom_formatter = CustomFormatter {}; - let custom_filter = EnvFilter::from_default_env().add_directive(Level::TRACE.into()); - let custom_filter_2 = EnvFilter::from_default_env() - .add_directive(Level::INFO.into()) - .not(); - - // let custom_layer = CustomLayer::new(&path); - // let writer_ref = custom_layer.writer.clone(); - let layers = tracing_subscriber::registry().with( - fmt::Layer::new() - .with_span_events(FmtSpan::CLOSE) - .with_writer(writer) - .event_format(custom_formatter) - .with_filter(custom_filter) - .with_filter(custom_filter_2), - ); + let path = PathBuf::from("./test_output/simulation/profiling/test_events.csv"); + + let (csv_layer, _guard) = SpanDurationToCSVLayer::new(&path); + let layers = tracing_subscriber::registry() + .with(csv_layer) + .with(Layer::new().pretty()); tracing::subscriber::set_global_default(layers).expect("TODO: panic message"); info!("Before func"); From 711132a65b214a968f1af82ca96da4d9a0c332fd Mon Sep 17 00:00:00 2001 From: janekdererste Date: Thu, 4 Jan 2024 14:40:16 +0100 Subject: [PATCH 06/11] Add rank to csv output Remove trailing comma on csv line, so that tidyverse doesn't get confused --- src/simulation/logging.rs | 20 ++---------- src/simulation/profiling/mod.rs | 57 +++++++++++++++++++++++++++++---- src/simulation/simulation.rs | 2 +- 3 files changed, 54 insertions(+), 25 deletions(-) diff --git a/src/simulation/logging.rs b/src/simulation/logging.rs index 0afbb7bd..c4d50994 100644 --- a/src/simulation/logging.rs +++ b/src/simulation/logging.rs @@ -5,6 +5,7 @@ use tracing::level_filters::LevelFilter; use tracing_appender::non_blocking::WorkerGuard; use tracing_appender::{non_blocking, rolling}; use tracing_subscriber::fmt; +use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::Layer; @@ -24,13 +25,6 @@ pub fn init_logging(dir: &Path, file_discriminant: &str) -> (WorkerGuard, Writer let log_file_appender = rolling::never(dir, log_file_name); let (log_file, _guard_log) = non_blocking(log_file_appender); - /*let trace_dir = dir.join("trace"); - let trace_file_name = format!("trace_process_{file_discriminant}.txt"); - let trace_file_appender = rolling::never(&trace_dir, &trace_file_name); - let (trace_file, _guard_performance) = non_blocking(trace_file_appender); - - */ - let duration_dir = dir.join("instrument"); let duration_file_name = format!("instrument_process_{file_discriminant}.csv"); let duration_path = duration_dir.join(duration_file_name); @@ -41,7 +35,7 @@ pub fn init_logging(dir: &Path, file_discriminant: &str) -> (WorkerGuard, Writer .with( fmt::Layer::new() .with_writer(io::stdout) - .pretty() + .with_span_events(FmtSpan::CLOSE) .with_filter(LevelFilter::INFO), ) .with( @@ -51,16 +45,6 @@ pub fn init_logging(dir: &Path, file_discriminant: &str) -> (WorkerGuard, Writer .with_ansi(false) .with_filter(LevelFilter::DEBUG), ); - /*.with( - fmt::Layer::new() - .with_writer(trace_file) - .with_span_events(FmtSpan::CLOSE) - .with_ansi(false) - .json() - .with_filter(LevelFilter::TRACE), - ); - - */ tracing::subscriber::set_global_default(collector).expect("Unable to set a global collector"); (_guard_log, _guard) } diff --git a/src/simulation/profiling/mod.rs b/src/simulation/profiling/mod.rs index 592d6a37..0264b784 100644 --- a/src/simulation/profiling/mod.rs +++ b/src/simulation/profiling/mod.rs @@ -1,3 +1,4 @@ +use std::fmt::Debug; use std::fs::File; use std::io::BufWriter; use std::path::Path; @@ -6,8 +7,10 @@ use std::time::{Instant, SystemTime}; use std::{env, fs}; use serde_json::{json, Value}; +use tracing::field::Field; use tracing::span::Attributes; use tracing::{trace, Id, Subscriber}; +use tracing_subscriber::field::Visit; use tracing_subscriber::layer::Context; use tracing_subscriber::registry::LookupSpan; use tracing_subscriber::Layer; @@ -57,6 +60,30 @@ struct SpanDuration { last: Instant, } +struct Rank(u64); + +struct RankVisitor { + rank: Option, +} + +impl RankVisitor { + fn new() -> Self { + RankVisitor { rank: None } + } +} + +impl Visit for RankVisitor { + fn record_u64(&mut self, field: &Field, value: u64) { + if field.name().eq("rank") { + self.rank = Some(value); + } + } + + fn record_debug(&mut self, _field: &Field, _value: &dyn Debug) { + panic!("record_debug not implemented for RankVisitor. This is on purpose, becuase we always expect a rank to be a number") + } +} + impl SpanDurationToCSVLayer { pub fn new(path: &Path) -> (Self, WriterGuard) { // create necessary file path and corresponding file wrapped in buffered writer @@ -69,7 +96,7 @@ impl SpanDurationToCSVLayer { // write header for csv file std::io::Write::write( &mut writer, - "timestamp,target,func_name,duration\n".as_bytes(), + "timestamp,target,func_name,duration,rank\n".as_bytes(), ) .unwrap_or_else(|_e| panic!("Failed to write header.")); @@ -112,6 +139,12 @@ where let span = ctx.span(id).expect("should exist"); let mut extensions = span.extensions_mut(); extensions.insert(SpanDuration::new()); + + let mut visitor = RankVisitor::new(); + _attrs.record(&mut visitor as &mut dyn Visit); + if let Some(rank) = visitor.rank { + extensions.insert(Rank(rank)); + } } fn on_enter(&self, id: &Id, ctx: Context<'_, S>) { @@ -145,6 +178,14 @@ where let span_duration = extensions.get::().unwrap(); write!(writer, "{},", span_duration.elapsed).unwrap(); + + // if a method has supplied a rank, write it into the csv file + let rank = if let Some(rank) = extensions.get::() { + rank.0 as i64 + } else { + -1 + }; + write!(writer, "{rank}").unwrap(); writeln!(writer).unwrap(); // extensions and span must be dropped explicitly, says the tracing documentation @@ -155,7 +196,6 @@ where impl Drop for WriterGuard { fn drop(&mut self) { - println!("Writer guard gets dropped."); let mut writer = self.writer_ref.lock().unwrap(); std::io::Write::flush(&mut *writer).expect("TODO: panic message"); } @@ -176,9 +216,12 @@ mod tests { use std::thread::sleep; use std::time::Duration; + use tracing::level_filters::LevelFilter; use tracing::{info, instrument}; + use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::fmt::Layer; use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::Layer as OtherLayer; use crate::simulation::profiling::SpanDurationToCSVLayer; @@ -187,9 +230,11 @@ mod tests { let path = PathBuf::from("./test_output/simulation/profiling/test_events.csv"); let (csv_layer, _guard) = SpanDurationToCSVLayer::new(&path); - let layers = tracing_subscriber::registry() - .with(csv_layer) - .with(Layer::new().pretty()); + let layers = tracing_subscriber::registry().with(csv_layer).with( + Layer::new() + .with_span_events(FmtSpan::CLOSE) + .with_filter(LevelFilter::TRACE), + ); tracing::subscriber::set_global_default(layers).expect("TODO: panic message"); info!("Before func"); @@ -204,7 +249,7 @@ mod tests { info!("Inside some function.") } - #[instrument(level = "trace")] + #[instrument(level = "trace", fields(rank = 42u32))] fn some_other_function(a: u32, b: f32) { info!("Inside some other function"); sleep(Duration::from_nanos(10)); diff --git a/src/simulation/simulation.rs b/src/simulation/simulation.rs index 19e03b71..323ca2d0 100644 --- a/src/simulation/simulation.rs +++ b/src/simulation/simulation.rs @@ -69,7 +69,7 @@ where } } - #[tracing::instrument(level = "trace", skip(self), fields(rank = self.net_message_broker.rank()))] + #[tracing::instrument(level = "info", skip(self), fields(rank = self.net_message_broker.rank()))] pub fn run(&mut self) { // use fixed start and end times let mut now = self.start_time; From 564dd540c693bc25596733e0738fca12e6bf1282 Mon Sep 17 00:00:00 2001 From: Paul Heinrich Date: Fri, 5 Jan 2024 12:05:27 +0100 Subject: [PATCH 07/11] fixed test --- src/simulation/profiling/mod.rs | 2 +- tests/test_simulation.rs | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/simulation/profiling/mod.rs b/src/simulation/profiling/mod.rs index 0264b784..34b72ce4 100644 --- a/src/simulation/profiling/mod.rs +++ b/src/simulation/profiling/mod.rs @@ -80,7 +80,7 @@ impl Visit for RankVisitor { } fn record_debug(&mut self, _field: &Field, _value: &dyn Debug) { - panic!("record_debug not implemented for RankVisitor. This is on purpose, becuase we always expect a rank to be a number") + panic!("record_debug not implemented for RankVisitor. This is on purpose, because we always expect a rank to be a number") } } diff --git a/tests/test_simulation.rs b/tests/test_simulation.rs index 1fc971a1..8029de76 100644 --- a/tests/test_simulation.rs +++ b/tests/test_simulation.rs @@ -119,14 +119,11 @@ pub fn execute_sim( Box::new(DummyReplanner {}) }; - let start_time = config.simulation().start_time; - let end_time = config.simulation().end_time; - let mut sim = Simulation::new( config, sim_net, garage, population, broker, events, replanner, ); - sim.run(start_time, end_time); + sim.run(); } fn all_temp_files_created(temp_network_file: &PathBuf, temp_population_file: &PathBuf) -> bool { From ee21ac320f03a3f332e9427d5dbad46139512e9d Mon Sep 17 00:00:00 2001 From: Paul Heinrich Date: Fri, 5 Jan 2024 14:45:26 +0100 Subject: [PATCH 08/11] fixed test --- src/simulation/profiling/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/simulation/profiling/mod.rs b/src/simulation/profiling/mod.rs index 34b72ce4..cd82fb52 100644 --- a/src/simulation/profiling/mod.rs +++ b/src/simulation/profiling/mod.rs @@ -249,7 +249,7 @@ mod tests { info!("Inside some function.") } - #[instrument(level = "trace", fields(rank = 42u32))] + #[instrument(level = "trace", skip(a, b), fields(rank = 42u32))] fn some_other_function(a: u32, b: f32) { info!("Inside some other function"); sleep(Duration::from_nanos(10)); From 9a671cc1131068330a4df80e5c2b48f43e867f8d Mon Sep 17 00:00:00 2001 From: janekdererste Date: Fri, 5 Jan 2024 15:00:36 +0100 Subject: [PATCH 09/11] Make profiling optional --- assets/equil/equil-config.yml | 1 + src/simulation/config.rs | 11 +++++++++++ src/simulation/controller.rs | 7 ++----- src/simulation/logging.rs | 26 ++++++++++++++++++-------- 4 files changed, 32 insertions(+), 13 deletions(-) diff --git a/assets/equil/equil-config.yml b/assets/equil/equil-config.yml index 1f27f0a4..9ff0501a 100644 --- a/assets/equil/equil-config.yml +++ b/assets/equil/equil-config.yml @@ -12,6 +12,7 @@ modules: output: type: Output output_dir: /Users/janek/Documents/rust_q_sim/equil/output/size + profiling: None routing: type: Routing mode: UsePlans diff --git a/src/simulation/config.rs b/src/simulation/config.rs index 7e86fb20..ad0da73e 100644 --- a/src/simulation/config.rs +++ b/src/simulation/config.rs @@ -35,6 +35,7 @@ impl Config { let out_dir = format!("{}-{part_args}", config.output().output_dir); config.set_output(Output { output_dir: out_dir, + profiling: config.output().profiling, }); } config @@ -81,6 +82,7 @@ impl Config { } else { let default = Output { output_dir: "./".to_string(), + profiling: Profiling::None, }; self.modules .borrow_mut() @@ -150,6 +152,8 @@ pub struct Partitioning { #[derive(Serialize, Deserialize, Clone)] pub struct Output { pub output_dir: String, + #[serde(default)] + pub profiling: Profiling, } #[derive(Serialize, Deserialize, Clone)] @@ -216,6 +220,13 @@ pub enum PartitionMethod { None, } +#[derive(PartialEq, Debug, ValueEnum, Clone, Copy, Serialize, Deserialize, Default)] +pub enum Profiling { + #[default] + None, + CSV, +} + #[cfg(test)] mod tests { use crate::simulation::config::{Config, PartitionMethod, Partitioning}; diff --git a/src/simulation/controller.rs b/src/simulation/controller.rs index a2ee7f04..734377c2 100644 --- a/src/simulation/controller.rs +++ b/src/simulation/controller.rs @@ -31,7 +31,7 @@ pub fn run_channel() { let config = Config::from_file(&args); let _guards = logging::init_logging( - config.output().output_dir.as_ref(), + &config, config.partitioning().num_parts.to_string().as_str(), ); @@ -65,10 +65,7 @@ pub fn run_mpi() { let args = CommandLineArgs::parse(); let config = Config::from_file(&args); - let _guards = logging::init_logging( - config.output().output_dir.as_ref(), - comm.rank().to_string().as_str(), - ); + let _guards = logging::init_logging(&config, comm.rank().to_string().as_str()); info!( "Starting MPI Simulation with {} partitions", diff --git a/src/simulation/logging.rs b/src/simulation/logging.rs index c4d50994..0a11571a 100644 --- a/src/simulation/logging.rs +++ b/src/simulation/logging.rs @@ -1,5 +1,5 @@ use std::io; -use std::path::Path; +use std::path::PathBuf; use tracing::level_filters::LevelFilter; use tracing_appender::non_blocking::WorkerGuard; @@ -9,6 +9,7 @@ use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::Layer; +use crate::simulation::config::{Config, Profiling}; use crate::simulation::profiling::{SpanDurationToCSVLayer, WriterGuard}; pub fn init_std_out_logging() { @@ -20,15 +21,24 @@ pub fn init_std_out_logging() { tracing::subscriber::set_global_default(collector).expect("Unable to set a global collector"); } -pub fn init_logging(dir: &Path, file_discriminant: &str) -> (WorkerGuard, WriterGuard) { +pub fn init_logging( + config: &Config, + file_discriminant: &str, +) -> (WorkerGuard, Option) { + let dir = PathBuf::from(&config.output().output_dir); let log_file_name = format!("log_process_{file_discriminant}.txt"); - let log_file_appender = rolling::never(dir, log_file_name); + let log_file_appender = rolling::never(&dir, log_file_name); let (log_file, _guard_log) = non_blocking(log_file_appender); - let duration_dir = dir.join("instrument"); - let duration_file_name = format!("instrument_process_{file_discriminant}.csv"); - let duration_path = duration_dir.join(duration_file_name); - let (csv_layer, _guard) = SpanDurationToCSVLayer::new(&duration_path); + let (csv_layer, guard) = if config.output().profiling.eq(&Profiling::CSV) { + let duration_dir = dir.join("instrument"); + let duration_file_name = format!("instrument_process_{file_discriminant}.csv"); + let duration_path = duration_dir.join(duration_file_name); + let (layer, writer_guard) = SpanDurationToCSVLayer::new(&duration_path); + (Some(layer), Some(writer_guard)) + } else { + (None, None) + }; let collector = tracing_subscriber::registry() .with(csv_layer) @@ -46,5 +56,5 @@ pub fn init_logging(dir: &Path, file_discriminant: &str) -> (WorkerGuard, Writer .with_filter(LevelFilter::DEBUG), ); tracing::subscriber::set_global_default(collector).expect("Unable to set a global collector"); - (_guard_log, _guard) + (_guard_log, guard) } From 75686420895a7438a458142dbacc49a245b1548a Mon Sep 17 00:00:00 2001 From: janekdererste Date: Fri, 5 Jan 2024 15:33:29 +0100 Subject: [PATCH 10/11] Fix test and merge errors --- src/simulation/config.rs | 7 ------- src/simulation/profiling/mod.rs | 2 +- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/src/simulation/config.rs b/src/simulation/config.rs index 9f85a08a..6d371da2 100644 --- a/src/simulation/config.rs +++ b/src/simulation/config.rs @@ -304,13 +304,6 @@ fn u32_value_100() -> u32 { 100 } -#[derive(PartialEq, Debug, ValueEnum, Clone, Copy, Serialize, Deserialize, Default)] -pub enum Profiling { - #[default] - None, - CSV, -} - #[cfg(test)] mod tests { use crate::simulation::config::{ diff --git a/src/simulation/profiling/mod.rs b/src/simulation/profiling/mod.rs index cd82fb52..d5a43173 100644 --- a/src/simulation/profiling/mod.rs +++ b/src/simulation/profiling/mod.rs @@ -80,7 +80,7 @@ impl Visit for RankVisitor { } fn record_debug(&mut self, _field: &Field, _value: &dyn Debug) { - panic!("record_debug not implemented for RankVisitor. This is on purpose, because we always expect a rank to be a number") + //nothing to do here } } From 7d31333c96608d342d700e85b4ffaf4a17bff40f Mon Sep 17 00:00:00 2001 From: Paul Heinrich Date: Fri, 5 Jan 2024 16:08:12 +0100 Subject: [PATCH 11/11] add now to trace output --- src/simulation/profiling/mod.rs | 47 ++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/src/simulation/profiling/mod.rs b/src/simulation/profiling/mod.rs index d5a43173..5fffb227 100644 --- a/src/simulation/profiling/mod.rs +++ b/src/simulation/profiling/mod.rs @@ -62,21 +62,33 @@ struct SpanDuration { struct Rank(u64); -struct RankVisitor { +struct SimTime(u64); + +struct MetadataVisitor { rank: Option, + sim_time: Option, } -impl RankVisitor { +impl MetadataVisitor { fn new() -> Self { - RankVisitor { rank: None } + MetadataVisitor { + rank: None, + sim_time: None, + } } } -impl Visit for RankVisitor { +impl Visit for MetadataVisitor { fn record_u64(&mut self, field: &Field, value: u64) { + //fetch rank if field.name().eq("rank") { self.rank = Some(value); } + + //fetch now (in some cases, the field name is "now" and in others "_now") + if field.name().contains("now") { + self.sim_time = Some(value); + } } fn record_debug(&mut self, _field: &Field, _value: &dyn Debug) { @@ -96,7 +108,7 @@ impl SpanDurationToCSVLayer { // write header for csv file std::io::Write::write( &mut writer, - "timestamp,target,func_name,duration,rank\n".as_bytes(), + "timestamp,target,func_name,duration,sim_time,rank\n".as_bytes(), ) .unwrap_or_else(|_e| panic!("Failed to write header.")); @@ -140,11 +152,15 @@ where let mut extensions = span.extensions_mut(); extensions.insert(SpanDuration::new()); - let mut visitor = RankVisitor::new(); + let mut visitor = MetadataVisitor::new(); _attrs.record(&mut visitor as &mut dyn Visit); if let Some(rank) = visitor.rank { extensions.insert(Rank(rank)); } + + if let Some(sim_time) = visitor.sim_time { + extensions.insert(SimTime(sim_time)); + } } fn on_enter(&self, id: &Id, ctx: Context<'_, S>) { @@ -179,13 +195,14 @@ where let span_duration = extensions.get::().unwrap(); write!(writer, "{},", span_duration.elapsed).unwrap(); - // if a method has supplied a rank, write it into the csv file - let rank = if let Some(rank) = extensions.get::() { - rank.0 as i64 - } else { - -1 - }; + let sim_time = extensions + .get::() + .map_or(-1, |sim_time| sim_time.0 as i64); + write!(writer, "{sim_time},").unwrap(); + + let rank = extensions.get::().map_or(-1, |rank| rank.0 as i64); write!(writer, "{rank}").unwrap(); + writeln!(writer).unwrap(); // extensions and span must be dropped explicitly, says the tracing documentation @@ -241,7 +258,7 @@ mod tests { some_function(); info!("After func"); - some_other_function(42, std::f32::consts::PI); + some_other_function(7, std::f32::consts::PI); } #[instrument] @@ -249,8 +266,8 @@ mod tests { info!("Inside some function.") } - #[instrument(level = "trace", skip(a, b), fields(rank = 42u32))] - fn some_other_function(a: u32, b: f32) { + #[instrument(level = "trace", fields(rank = 42u32))] + fn some_other_function(_now: u32, b: f32) { info!("Inside some other function"); sleep(Duration::from_nanos(10)); }