Skip to content

Commit

Permalink
Merge pull request #119 from Janekdererste/profile_runtime
Browse files Browse the repository at this point in the history
Profile runtime
  • Loading branch information
paulheinr authored Jan 5, 2024
2 parents e4b0981 + 7d31333 commit 227f8d4
Show file tree
Hide file tree
Showing 14 changed files with 393 additions and 72 deletions.
3 changes: 2 additions & 1 deletion assets/equil/equil-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ modules:
method: Metis
output:
type: Output
output_dir: /Users/janek/Documents/equil-output/output
output_dir: /Users/janek/Documents/rust_q_sim/equil/output/size
profiling: None
routing:
type: Routing
mode: UsePlans
Expand Down
57 changes: 57 additions & 0 deletions src/bin/convert_network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::path::PathBuf;

use clap::Parser;
use tracing::info;

use rust_q_sim::simulation::config::PartitionMethod;
use rust_q_sim::simulation::id;
use rust_q_sim::simulation::logging::init_std_out_logging;
use rust_q_sim::simulation::network::global_network::Network;

fn main() {
init_std_out_logging();
let args = InputArgs::parse();
info!("Starting network conversion with args: {args:?}");

let net_path = PathBuf::from(&args.network);
let id_path = PathBuf::from(&args.id_store);

if is_binary_format(&net_path) {
info!("Converting from binary to xml format. Load id store first.");
id::load_from_file(&id_path);
}

let net = Network::from_file_path(&net_path, 1, PartitionMethod::None);
let out_path = replace_extension(&net_path);
net.to_file(&out_path);

if is_binary_format(&out_path) {
info!("Converting from xml to binary format. Writing ids to store file.");
id::store_to_file(&id_path);
}
}

fn is_binary_format(path: &PathBuf) -> bool {
path.extension()
.expect("Network files must either end with xml, xml.gz, or binpb")
.eq("binpb")
}

fn replace_extension(path: &PathBuf) -> PathBuf {
let new_ext = if is_binary_format(path) {
"xml.gz"
} else {
"binpb"
};
let mut result = PathBuf::from(path);
result.set_extension(new_ext);
result
}

#[derive(Parser, Debug)]
struct InputArgs {
#[arg(short, long)]
pub network: String,
#[arg(short, long)]
pub id_store: String,
}
26 changes: 14 additions & 12 deletions src/bin/proto2xml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use std::io::{Read, Seek};
use std::path::PathBuf;

use clap::Parser;
use rust_q_sim::simulation::id;
use tracing::info;

use rust_q_sim::simulation::id;
use rust_q_sim::simulation::io::proto_events::EventsReader;
use rust_q_sim::simulation::io::xml_events::XmlEventsWriter;
use rust_q_sim::simulation::logging::init_std_out_logging;
use rust_q_sim::simulation::messaging::events::EventsPublisher;
use rust_q_sim::simulation::wire_types::events::Event;

Expand All @@ -27,18 +29,18 @@ impl<R: Read + Seek> StatefulReader<R> {
}

fn main() {
init_std_out_logging();
let args = InputArgs::parse();
info!("Proto2Xml with args: {args:?}");

println!("Proto2Xml with args: {args:?}");

println!("Loading ids.");
id::load_from_file(&PathBuf::from(args.ids_path));
info!("Load Id Store");
id::load_from_file(&PathBuf::from(args.id_store));

let mut readers = Vec::new();
println!("Reading from Files: ");
info!("Reading from Files: ");
for i in 0..args.num_parts {
let file_string = format!("{}events.{i}.pbf", args.path);
println!("\t {}", file_string);
let file_string = format!("{}events.{i}.binpb", args.path);
info!("\t {}", file_string);
let file_path = PathBuf::from(file_string);
let reader = EventsReader::from_file(&file_path);
let wrapper = StatefulReader {
Expand Down Expand Up @@ -71,9 +73,9 @@ fn main() {
};
}

println!("Finished reading proto files. Calling finish on XmlWriter");
info!("Finished reading proto files. Calling finish on XmlWriter");
publisher.finish();
println!("Finished writing to xml-file.")
info!("Finished writing to xml-file.")
}

fn process_events(time: u32, events: &Vec<Event>, publisher: &mut EventsPublisher) {
Expand All @@ -86,8 +88,8 @@ fn process_events(time: u32, events: &Vec<Event>, publisher: &mut EventsPublishe
struct InputArgs {
#[arg(long)]
pub path: String,
#[arg(long)]
pub id_store: String,
#[arg(long, default_value_t = 1)]
pub num_parts: u32,
#[arg(long)]
pub ids_path: String,
}
15 changes: 13 additions & 2 deletions src/simulation/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl Config {
let out_dir = format!("{}-{part_args}", config.output().output_dir);
config.set_output(Output {
output_dir: out_dir,
profiling: config.output().profiling,
});
}
config
Expand Down Expand Up @@ -81,6 +82,7 @@ impl Config {
} else {
let default = Output {
output_dir: "./".to_string(),
profiling: Profiling::None,
};
self.modules
.borrow_mut()
Expand Down Expand Up @@ -150,6 +152,8 @@ pub struct Partitioning {
#[derive(Serialize, Deserialize, Clone)]
pub struct Output {
pub output_dir: String,
#[serde(default)]
pub profiling: Profiling,
}

#[derive(Serialize, Deserialize, Clone)]
Expand Down Expand Up @@ -216,6 +220,13 @@ pub enum PartitionMethod {
None,
}

#[derive(PartialEq, Debug, ValueEnum, Clone, Copy, Serialize, Deserialize, Default)]
pub enum Profiling {
#[default]
None,
CSV,
}

#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct MetisOptions {
pub vertex_weight: Vec<VertexWeight>,
Expand Down Expand Up @@ -340,7 +351,7 @@ mod tests {
#[test]
fn read_none_partitioning() {
let yaml = r#"
modules:
modules:
partitioning:
type: Partitioning
num_parts: 1
Expand All @@ -359,7 +370,7 @@ mod tests {
type: Partitioning
num_parts: 1
method: !Metis
vertex_weight:
vertex_weight:
- InLinkCount
imbalance_factor: 1.1
edge_weight: Capacity
Expand Down
18 changes: 5 additions & 13 deletions src/simulation/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ use crate::simulation::replanning::replanner::{DummyReplanner, ReRouteTripReplan
use crate::simulation::replanning::routing::travel_time_collector::TravelTimeCollector;
use crate::simulation::simulation::Simulation;
use crate::simulation::vehicles::garage::Garage;
use crate::simulation::{id, logging, profiling};
use crate::simulation::{id, logging};

pub fn run_channel() {
let args = CommandLineArgs::parse();
let config = Config::from_file(&args);

let _guards = logging::init_logging(
config.output().output_dir.as_ref(),
&config,
config.partitioning().num_parts.to_string().as_str(),
);

Expand Down Expand Up @@ -65,10 +65,7 @@ pub fn run_mpi() {
let args = CommandLineArgs::parse();
let config = Config::from_file(&args);

let _guards = logging::init_logging(
config.output().output_dir.as_ref(),
comm.rank().to_string().as_str(),
);
let _guards = logging::init_logging(&config, comm.rank().to_string().as_str());

info!(
"Starting MPI Simulation with {} partitions",
Expand Down Expand Up @@ -127,7 +124,7 @@ fn execute_partition<C: SimCommunicator + 'static>(comm: C, args: &CommandLineAr

let mut events = EventsPublisher::new();

let events_file = format!("events.{rank}.pbf");
let events_file = format!("events.{rank}.binpb");
let events_path = output_path.join(events_file);
events.add_subscriber(Box::new(ProtoEventsWriter::new(&events_path)));
let travel_time_collector = Box::new(TravelTimeCollector::new());
Expand All @@ -148,9 +145,6 @@ fn execute_partition<C: SimCommunicator + 'static>(comm: C, args: &CommandLineAr
};
let net_message_broker = NetMessageBroker::new(rc, &network, &network_partition);

let start_time = config.simulation().start_time;
let end_time = config.simulation().end_time;

let mut simulation: Simulation<C> = Simulation::new(
config,
network_partition,
Expand All @@ -161,9 +155,7 @@ fn execute_partition<C: SimCommunicator + 'static>(comm: C, args: &CommandLineAr
replanner,
);

profiling::measure_duration(None, "Overall Execution Time", None, || {
simulation.run(start_time, end_time)
});
simulation.run();
}

/// Have this more complicated join logic, so that threads in the back of the handle vec can also
Expand Down
2 changes: 2 additions & 0 deletions src/simulation/id/id_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ enum IdCompression {
}

fn serialize_to_file(store: &IdStore, file_path: &Path, compression: IdCompression) {
info!("Starting writing IdStore to file {file_path:?}");
let file = File::create(file_path).unwrap();
let mut file_writer = BufWriter::new(file);
serialize(store, &mut file_writer, compression);
info!("Finished writing IdStore to file {file_path:?}");
}

fn serialize<W: Write>(store: &IdStore, writer: &mut W, compression: IdCompression) {
Expand Down
38 changes: 23 additions & 15 deletions src/simulation/logging.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use std::io;
use std::path::Path;
use std::path::PathBuf;

use tracing::level_filters::LevelFilter;
use tracing::Level;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_appender::{non_blocking, rolling};
use tracing_subscriber::fmt;
use tracing_subscriber::fmt::writer::MakeWriterExt;
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Layer;

use crate::simulation::config::{Config, Profiling};
use crate::simulation::profiling::{SpanDurationToCSVLayer, WriterGuard};

pub fn init_std_out_logging() {
let collector = tracing_subscriber::registry().with(
fmt::Layer::new()
Expand All @@ -19,20 +21,31 @@ pub fn init_std_out_logging() {
tracing::subscriber::set_global_default(collector).expect("Unable to set a global collector");
}

pub fn init_logging(dir: &Path, file_discriminant: &str) -> (WorkerGuard, WorkerGuard) {
pub fn init_logging(
config: &Config,
file_discriminant: &str,
) -> (WorkerGuard, Option<WriterGuard>) {
let dir = PathBuf::from(&config.output().output_dir);
let log_file_name = format!("log_process_{file_discriminant}.txt");
let log_file_appender = rolling::never(dir, log_file_name);
let log_file_appender = rolling::never(&dir, log_file_name);
let (log_file, _guard_log) = non_blocking(log_file_appender);

let trace_dir = dir.join("trace");
let trace_file_name = format!("trace_process_{file_discriminant}.txt");
let trace_file_appender = rolling::never(trace_dir, trace_file_name);
let (trace_file, _guard_performance) = non_blocking(trace_file_appender);
let (csv_layer, guard) = if config.output().profiling.eq(&Profiling::CSV) {
let duration_dir = dir.join("instrument");
let duration_file_name = format!("instrument_process_{file_discriminant}.csv");
let duration_path = duration_dir.join(duration_file_name);
let (layer, writer_guard) = SpanDurationToCSVLayer::new(&duration_path);
(Some(layer), Some(writer_guard))
} else {
(None, None)
};

let collector = tracing_subscriber::registry()
.with(csv_layer)
.with(
fmt::Layer::new()
.with_writer(io::stdout)
.with_span_events(FmtSpan::CLOSE)
.with_filter(LevelFilter::INFO),
)
.with(
Expand All @@ -41,12 +54,7 @@ pub fn init_logging(dir: &Path, file_discriminant: &str) -> (WorkerGuard, Worker
.json()
.with_ansi(false)
.with_filter(LevelFilter::DEBUG),
)
.with(
fmt::Layer::new()
.with_writer(trace_file.with_min_level(Level::TRACE))
.json(),
);
tracing::subscriber::set_global_default(collector).expect("Unable to set a global collector");
(_guard_log, _guard_performance)
(_guard_log, guard)
}
2 changes: 2 additions & 0 deletions src/simulation/messaging/communication/communicators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use mpi::datatype::PartitionMut;
use mpi::point_to_point::{Destination, Source};
use mpi::topology::{Communicator, SystemCommunicator};
use mpi::{Count, Rank};
use tracing::instrument;

use crate::simulation::wire_types::messages::{SimMessage, SyncMessage, TravelTimesMessage};

Expand Down Expand Up @@ -167,6 +168,7 @@ pub struct MpiSimCommunicator {
}

impl SimCommunicator for MpiSimCommunicator {
#[instrument(level = "trace", skip_all, fields(rank = self.rank()))]
fn send_receive_vehicles<F>(
&self,
out_messages: HashMap<u32, SyncMessage>,
Expand Down
3 changes: 2 additions & 1 deletion src/simulation/messaging/events.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::any::Any;
use std::collections::HashMap;

use tracing::info;
use tracing::{info, instrument};

use crate::simulation::wire_types::events::event::Type::{
ActEnd, ActStart, Arrival, Departure, Generic, LinkEnter, LinkLeave, PersonEntersVeh,
Expand Down Expand Up @@ -85,6 +85,7 @@ impl EventsPublisher {
}
}

#[instrument(skip_all, level = "trace")]
pub fn finish(&mut self) {
for handler in self.handlers.iter_mut() {
handler.finish();
Expand Down
14 changes: 1 addition & 13 deletions src/simulation/network/global_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,7 @@ impl Network {
}
}
}
PartitionMethod::None => {
// We can have the situation, that someone specified more partitions in the network file than the actual simulation is started with.
// Since the partitioning should normally be precomputed with the same number, it's ok to not check this here.
// But for testing purposes (compare base case with 1 partition and with more) we reset the partition of the nodes in that case.
if num_parts == 1 {
for n in network.nodes.iter_mut() {
n.partition = 0;
}
for l in network.links.iter_mut() {
l.partition = 0;
}
}
}
PartitionMethod::None => {}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/simulation/network/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub fn from_file(path: &Path) -> Network {
pub fn to_file(network: &Network, path: &Path) {
if path.extension().unwrap().eq("binpb") {
write_to_proto(network, path);
} else if path.extension().unwrap().eq("xml") || path.extension().unwrap().eq("xml.gz") {
} else if path.extension().unwrap().eq("xml") || path.extension().unwrap().eq("gz") {
write_to_xml(network, path);
} else {
panic!("Tried to write {path:?} . File format not supported. Either use `.xml`, `.xml.gz`, or `.binpb` as extension");
Expand Down
Loading

0 comments on commit 227f8d4

Please sign in to comment.