diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index c75ebcf7..db6d6261 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -10,6 +10,7 @@ on: - 'CITATION' - 'book.toml' - 'CONTRIBUTING.md' + - '.github/workflows/exe-release-prometheuspush.yml' pull_request: branches: [ main, dev ] paths-ignore: @@ -18,6 +19,7 @@ on: - 'CHANGELOG.md' - 'CITATION' - 'book.toml' + - '.github/workflows/exe-release-prometheuspush.yml' env: CARGO_TERM_COLOR: always @@ -171,7 +173,7 @@ jobs: rustup toolchain install stable-x86_64-pc-windows-msvc - name: Tests run: | - cargo test --no-default-features --features "prometheus json riemann" + cargo test --no-default-features --features "prometheus prometheuspush json riemann" exporters - name: Build (debug mode) run: | - cargo build --no-default-features --features "prometheus json riemann" + cargo build --no-default-features --features "prometheus prometheuspush json riemann" diff --git a/.github/workflows/exe-release-prometheuspush.yml b/.github/workflows/exe-release-prometheuspush.yml index 2e8963ca..d7ffd4f7 100644 --- a/.github/workflows/exe-release-prometheuspush.yml +++ b/.github/workflows/exe-release-prometheuspush.yml @@ -9,10 +9,10 @@ on: - 'book.toml' - 'CONTRIBUTING.md' tags: [ 'v*.*.*', 'dev*.*.*' ] - branches: [ '311-github-workflow-to-build-and-publish-a-exemsi-file-including-signed-rapl-driver-at-each-tagrelease' ] + branches: [ '336-proper-handling-of-windows-service-management' ] env: - WRD_VERSION: v0.0.2 + WRD_VERSION: v0.0.4 WRD_BASE_URL: https://github.com/hubblo-org/windows-rapl-driver/releases/download jobs: @@ -31,6 +31,7 @@ jobs: & "D:\a\scaphandre\scaphandre\$dest" /verysilent /suppressmsgbox ls "C:\Program Files (x86)\Inno Setup 6\" - name: Get windows-rapl-driver + shell: pwsh run: | $dest = "DriverLoader.exe" $url = "${{ env.WRD_BASE_URL }}/${{ env.WRD_VERSION }}/DriverLoader.exe" @@ -44,16 +45,6 @@ jobs: $dest = "ScaphandreDrv.inf" $url = "${{ env.WRD_BASE_URL }}/${{ env.WRD_VERSION }}/ScaphandreDrv.inf" Invoke-WebRequest -Uri ($url -replace '"', "") -OutFile $dest - $dest = "ScaphandreDrvTest.cer" - $url = "${{ env.WRD_BASE_URL }}/${{ env.WRD_VERSION }}/ScaphandreDrvTest.cer" - Invoke-WebRequest -Uri ($url -replace '"', "") -OutFile $dest - $dest = "devcon.exe" - $url = "${{ env.WRD_BASE_URL }}/${{ env.WRD_VERSION }}/devcon.exe" - Invoke-WebRequest -Uri ($url -replace '"', "") -OutFile $dest - $dest = "certmgr.exe" - $url = "${{ env.WRD_BASE_URL }}/${{ env.WRD_VERSION }}/certmgr.exe" - Invoke-WebRequest -Uri ($url -replace '"', "") -OutFile $dest - ls - name: Install Rustup uses: crazy-max/ghaction-chocolatey@v2 with: @@ -70,14 +61,14 @@ jobs: - name: Upload artifact #Install-PackageProvider -Name NuGet -MinimumVersion 2.8.5.201 -Force run: | Set-PSRepository -Name 'PSGallery' -InstallationPolicy Trusted - Install-Module -Confirm:$False -Name AWS.Tools.Installer + Install-Module -Confirm:$False -Name AWS.Tools.Installer Set-ExecutionPolicy -ExecutionPolicy RemoteSigned -Scope LocalMachine Import-Module AWS.Tools.Installer - Install-AWSToolsModule AWS.Tools.EC2,AWS.Tools.S3 -CleanUp -Confirm:$False + Install-AWSToolsModule AWS.Tools.EC2,AWS.Tools.S3 -CleanUp -Confirm:$False -AllowClobber Set-AWSCredential -AccessKey ${{ secrets.S3_ACCESS_KEY_ID }} -SecretKey ${{ secrets.S3_SECRET_ACCESS_KEY }} -StoreAs default - mv packaging/windows/Output/scaphandre_installer.exe scaphandre_${GITHUB_REF_NAME}_installer.exe + mv packaging/windows/Output/scaphandre_installer.exe scaphandre_${{ github.ref_name }}_installer.exe $clientconfig=@{ SignatureVersion="s3v4" ServiceUrl="https://s3.fr-par.scw.cloud" } - Write-S3Object -EndpointUrl "https://s3.fr-par.scw.cloud" -Region "fr-par" -BucketName "scaphandre" -File scaphandre_${GITHUB_REF_NAME}_installer.exe -key "x86_64/scaphandre_${GITHUB_REF_NAME}_installer.exe" -PublicReadOnly -ClientConfig $clientconfig \ No newline at end of file + Write-S3Object -EndpointUrl "https://s3.fr-par.scw.cloud" -Region "fr-par" -BucketName "scaphandre" -File scaphandre_${{ github.ref_name }}_installer.exe -key "x86_64/scaphandre_${{ github.ref_name }}_installer.exe" -PublicReadOnly -ClientConfig $clientconfig \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 3821d73d..94da456c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,12 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" +[[package]] +name = "bit_field" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc827186963e592360843fb5ba4b973e145841266c1357f7180c43526f2e5b61" + [[package]] name = "bitflags" version = "1.3.2" @@ -261,6 +267,17 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" +[[package]] +name = "core_affinity" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "622892f5635ce1fc38c8f16dfc938553ed64af482edb5e150bf4caedbfcb2304" +dependencies = [ + "libc", + "num_cpus", + "winapi", +] + [[package]] name = "crc32fast" version = "1.3.2" @@ -1303,6 +1320,15 @@ dependencies = [ "rand_core", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags", +] + [[package]] name = "rayon" version = "1.7.0" @@ -1465,6 +1491,7 @@ dependencies = [ "chrono", "clap", "colored", + "core_affinity", "docker-sync", "hostname", "hyper", @@ -1476,6 +1503,7 @@ dependencies = [ "procfs", "protobuf", "rand", + "raw-cpuid", "regex", "riemann_client", "serde", @@ -1486,6 +1514,7 @@ dependencies = [ "warp10", "windows 0.27.0", "windows-service", + "x86", ] [[package]] @@ -2292,6 +2321,17 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" +[[package]] +name = "x86" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2781db97787217ad2a2845c396a5efe286f87467a5810836db6d74926e94a385" +dependencies = [ + "bit_field", + "bitflags", + "raw-cpuid", +] + [[package]] name = "yaml-rust" version = "0.4.5" diff --git a/Cargo.toml b/Cargo.toml index cf0319e9..2074d408 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,8 +38,11 @@ isahc = { version = "1.7.2", optional = true } procfs = { version = "0.15.0" } [target.'cfg(target_os="windows")'.dependencies] -windows = { version = "0.27.0", features = ["alloc","Win32_Storage_FileSystem","Win32_Foundation","Win32_Security","Win32_System_IO","Win32_System_Ioctl"]} +windows = { version = "0.27.0", features = ["alloc","Win32_Storage_FileSystem","Win32_Foundation","Win32_Security","Win32_System_IO","Win32_System_Ioctl","Win32_System_Threading", "Win32_System_SystemInformation"]} windows-service = { version = "0.6.0" } +raw-cpuid = { version = "10.5.0" } +core_affinity = { version = "0.8.1"} +x86 = { version = "0.52.0" } [features] default = ["prometheus", "riemann", "warpten", "json", "containers", "prometheuspush"] diff --git a/packaging/windows/dev_installer.iss b/packaging/windows/dev_installer.iss index fb06f118..710174d8 100644 --- a/packaging/windows/dev_installer.iss +++ b/packaging/windows/dev_installer.iss @@ -40,13 +40,13 @@ Name: "english"; MessagesFile: "compiler:Default.isl" [Files] Source: "{#MyAppSourceFolder}\target\release\{#MyAppExeName}"; DestDir: "{app}"; Flags: ignoreversion Source: "{#RaplDriverSourceFolder}\x64\Release\DriverLoader.exe"; DestDir: "{app}"; Flags: ignoreversion -Source: "{#RaplDriverSourceFolder}\ScaphandreDrv\ScaphandreDrv.inf"; DestDir: "{app}"; Flags: ignoreversion -; Source: "{#RaplDriverSourceFolder}\ScaphandreDrv\ScaphandreDrv.sys"; DestDir: "{#SystemFolder}"; -; Source: "{#RaplDriverSourceFolder}\ScaphandreDrv\ScaphandreDrv.sys"; DestDir: "{#System64Folder}"; -Source: "{#RaplDriverSourceFolder}\ScaphandreDrv\ScaphandreDrv.sys"; DestDir: "{app}"; -Source: "{#RaplDriverSourceFolder}\ScaphandreDrv\ScaphandreDrv.cat"; DestDir: "{app}"; -; Source: "{#RaplDriverSourceFolder}\ScaphandreDrv\ScaphandreDrv.cat"; DestDir: "{#SystemFolder}"; -; Source: "{#RaplDriverSourceFolder}\ScaphandreDrv\ScaphandreDrv.cat"; DestDir: "{#System64Folder}"; +Source: "{#RaplDriverSourceFolder}\x64\Release\ScaphandreDrv\ScaphandreDrv.inf"; DestDir: "{app}"; Flags: ignoreversion +; Source: "{#RaplDriverSourceFolder}\x64\Release\ScaphandreDrv\ScaphandreDrv.sys"; DestDir: "{#SystemFolder}"; +; Source: "{#RaplDriverSourceFolder}\x64\Release\ScaphandreDrv\ScaphandreDrv.sys"; DestDir: "{#System64Folder}"; +Source: "{#RaplDriverSourceFolder}\x64\Release\ScaphandreDrv\ScaphandreDrv.sys"; DestDir: "{app}"; +Source: "{#RaplDriverSourceFolder}\x64\Release\ScaphandreDrv\ScaphandreDrv.cat"; DestDir: "{app}"; +; Source: "{#RaplDriverSourceFolder}\x64\Release\ScaphandreDrv\ScaphandreDrv.cat"; DestDir: "{#SystemFolder}"; +; Source: "{#RaplDriverSourceFolder}\x64\Release\ScaphandreDrv\ScaphandreDrv.cat"; DestDir: "{#System64Folder}"; Source: "C:\Program Files (x86)\Windows Kits\10\Tools\10.0.22621.0\x64\devcon.exe"; DestDir: "{app}"; Flags: ignoreversion Source: "C:\Program Files (x86)\Windows Kits\10\bin\10.0.22621.0\x64\certmgr.exe"; DestDir: "{app}"; Flags: ignoreversion Source: "{#MyAppSourceFolder}\README.md"; DestDir: "{app}"; Flags: ignoreversion diff --git a/packaging/windows/installer.iss b/packaging/windows/installer.iss index bc5287ac..4817f2a9 100644 --- a/packaging/windows/installer.iss +++ b/packaging/windows/installer.iss @@ -45,11 +45,10 @@ Source: "../../ScaphandreDrv.sys"; DestDir: "{app}"; Source: "../../ScaphandreDrv.cat"; DestDir: "{app}"; ; Source: "../../ScaphandreDrv.cat"; DestDir: "{#SystemFolder}"; ; Source: "../../ScaphandreDrv.cat"; DestDir: "{#System64Folder}"; -Source: "../../devcon.exe"; DestDir: "{app}"; Flags: ignoreversion -Source: "../../certmgr.exe"; DestDir: "{app}"; Flags: ignoreversion +Source: "C:\Program Files (x86)\Windows Kits\10\Tools\10.0.22621.0\x64\devcon.exe"; DestDir: "{app}"; Flags: ignoreversion +Source: "C:\Program Files (x86)\Windows Kits\10\bin\10.0.22621.0\x64\certmgr.exe"; DestDir: "{app}"; Flags: ignoreversion Source: "../../README.md"; DestDir: "{app}"; Flags: ignoreversion Source: "../../CHANGELOG.md"; DestDir: "{app}"; Flags: ignoreversion -Source: "../../ScaphandreDrvTest.cer"; DestDir: "{app}"; Flags: ignoreversion ; NOTE: Don't use "Flags: ignoreversion" on any shared system files [Icons] diff --git a/packaging/windows/register_log_source.ps1 b/packaging/windows/register_log_source.ps1 new file mode 100644 index 00000000..e6d7283b --- /dev/null +++ b/packaging/windows/register_log_source.ps1 @@ -0,0 +1,40 @@ +# https://github.com/dansmith +# +$source = "scaphandre" + + +$wid=[System.Security.Principal.WindowsIdentity]::GetCurrent() +$prp=new-object System.Security.Principal.WindowsPrincipal($wid) +$adm=[System.Security.Principal.WindowsBuiltInRole]::Administrator +$IsAdmin=$prp.IsInRole($adm) + +if($IsAdmin -eq $false) +{ + [System.Reflection.Assembly]::LoadWithPartialName(“System.Windows.Forms”) + [Windows.Forms.MessageBox]::Show(“Please run this as an Administrator”, + “Not Administrator”, + [Windows.Forms.MessageBoxButtons]::OK, + [Windows.Forms.MessageBoxIcon]::Information) + exit +} + + +if ([System.Diagnostics.EventLog]::SourceExists($source) -eq $false) +{ + [System.Diagnostics.EventLog]::CreateEventSource($source, "Application") + + [System.Reflection.Assembly]::LoadWithPartialName(“System.Windows.Forms”) + [Windows.Forms.MessageBox]::Show(“Event log created successfully”, + “Complete”, + [Windows.Forms.MessageBoxButtons]::OK, + [Windows.Forms.MessageBoxIcon]::Information) +} +else +{ + [System.Reflection.Assembly]::LoadWithPartialName(“System.Windows.Forms”) + [Windows.Forms.MessageBox]::Show(“Event log already exists”, + “Complete”, + [Windows.Forms.MessageBoxButtons]::OK, + [Windows.Forms.MessageBoxIcon]::Information) + +} \ No newline at end of file diff --git a/src/exporters/mod.rs b/src/exporters/mod.rs index 036165d2..5b3d750c 100644 --- a/src/exporters/mod.rs +++ b/src/exporters/mod.rs @@ -630,23 +630,6 @@ impl MetricGenerator { description: String::from("Total swap space on the host, in bytes."), metric_value: MetricValueType::Text(metric_value.value), }); - - if let Some(psys) = self.topology.get_rapl_psys_energy_microjoules() { - self.data.push(Metric { - name: String::from("scaph_host_rapl_psys_microjoules"), - metric_type: String::from("counter"), - ttl: 60.0, - timestamp: psys.timestamp, - hostname: self.hostname.clone(), - state: String::from("ok"), - tags: vec!["scaphandre".to_string()], - attributes: HashMap::new(), - description: String::from( - "Raw extract of RAPL PSYS domain energy value, in microjoules", - ), - metric_value: MetricValueType::Text(psys.value), - }) - } } /// Generate socket metrics. @@ -887,7 +870,7 @@ impl MetricGenerator { /// Generate process metrics. fn gen_process_metrics(&mut self) { - debug!("In gen_process_metrics."); + trace!("In gen_process_metrics."); #[cfg(feature = "containers")] if self.watch_containers { let now = current_system_time_since_epoch().as_secs().to_string(); @@ -1029,7 +1012,7 @@ impl MetricGenerator { Utc::now().format("%Y-%m-%dT%H:%M:%S") ); self.gen_process_metrics(); - debug!("self_metrics: {:#?}", self.data); + trace!("self_metrics: {:#?}", self.data); } pub fn pop_metrics(&mut self) -> Vec { diff --git a/src/exporters/prometheus.rs b/src/exporters/prometheus.rs index 05065cc2..29d7cd01 100644 --- a/src/exporters/prometheus.rs +++ b/src/exporters/prometheus.rs @@ -5,8 +5,8 @@ //! [scrape](https://prometheus.io/docs/prometheus/latest/getting_started). use super::utils; -use crate::current_system_time_since_epoch; use crate::exporters::{Exporter, MetricGenerator, MetricValueType}; +use crate::sensors::utils::current_system_time_since_epoch; use crate::sensors::{Sensor, Topology}; use chrono::Utc; use hyper::service::{make_service_fn, service_fn}; diff --git a/src/exporters/stdout.rs b/src/exporters/stdout.rs index e3d0717d..ce50c727 100644 --- a/src/exporters/stdout.rs +++ b/src/exporters/stdout.rs @@ -110,8 +110,14 @@ impl StdoutExporter { fn summarized_view(&mut self, metrics: Vec) { let mut metrics_iter = metrics.iter(); let none_value = MetricValueType::Text("0".to_string()); + let mut host_power_source = String::from(""); let host_power = match metrics_iter.find(|x| x.name == "scaph_host_power_microwatts") { - Some(m) => &m.metric_value, + Some(m) => { + if let Some(src) = &m.attributes.get("value_source") { + host_power_source = src.to_string() + } + &m.metric_value + } None => &none_value, }; @@ -121,8 +127,9 @@ impl StdoutExporter { } println!( - "Host:\t{} W", - (format!("{host_power}").parse::().unwrap() / 1000000.0) + "Host:\t{} W from {}", + (format!("{host_power}").parse::().unwrap() / 1000000.0), + host_power_source ); if domain_names.is_some() { @@ -133,6 +140,7 @@ impl StdoutExporter { .iter() .filter(|x| x.name == "scaph_socket_power_microwatts") { + debug!("✅ Found socket power metric !"); let power = format!("{}", s.metric_value).parse::().unwrap() / 1000000.0; let mut power_str = String::from("----"); if power > 0.0 { @@ -176,6 +184,8 @@ impl StdoutExporter { } } println!("{to_print}\n"); + } else { + println!("{to_print} Could'nt get per-domain metrics.\n"); } } diff --git a/src/lib.rs b/src/lib.rs index 60b65c46..af59dc34 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,8 +14,6 @@ use sensors::msr_rapl; #[cfg(target_os = "linux")] use sensors::powercap_rapl; -use std::time::{Duration, SystemTime}; - /// Create a new [`Sensor`] instance with the default sensor available, /// with its default options. pub fn get_default_sensor() -> impl sensors::Sensor { @@ -30,12 +28,6 @@ pub fn get_default_sensor() -> impl sensors::Sensor { return msr_rapl::MsrRAPLSensor::new(); } -fn current_system_time_since_epoch() -> Duration { - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() -} - // Copyright 2020 The scaphandre authors. // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/src/main.rs b/src/main.rs index 83db7998..f1d68881 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,7 +19,7 @@ use windows_service::{ service::ServiceStatus, service::ServiceType, service_control_handler::{self, ServiceControlHandlerResult}, - service_dispatcher, Result, + service_dispatcher, }; #[cfg(target_os = "windows")] @@ -111,19 +111,46 @@ enum ExporterChoice { } #[cfg(target_os = "windows")] -fn my_service_main(arguments: Vec) { - if let Err(_e) = run_service(arguments) { - // Handle errors in some way. - } -} +fn my_service_main(_arguments: Vec) { + use std::thread::JoinHandle; + let graceful_period = 3; + + let start_status = ServiceStatus { + service_type: ServiceType::OWN_PROCESS, // Should match the one from system service registry + current_state: ServiceState::Running, // The new state + controls_accepted: ServiceControlAccept::STOP, // Accept stop events when running + exit_code: ServiceExitCode::Win32(0), // Used to report an error when starting or stopping only, otherwise must be zero + checkpoint: 0, // Only used for pending states, otherwise must be zero + wait_hint: Duration::default(), // Only used for pending states, otherwise must be zero + process_id: None, // Unused for setting status + }; + let stop_status = ServiceStatus { + service_type: ServiceType::OWN_PROCESS, + current_state: ServiceState::Stopped, + controls_accepted: ServiceControlAccept::STOP, + exit_code: ServiceExitCode::Win32(0), + checkpoint: 0, + wait_hint: Duration::default(), + process_id: None, + }; + let stoppending_status = ServiceStatus { + service_type: ServiceType::OWN_PROCESS, + current_state: ServiceState::StopPending, + controls_accepted: ServiceControlAccept::STOP, + exit_code: ServiceExitCode::Win32(0), + checkpoint: 0, + wait_hint: Duration::from_secs(graceful_period), + process_id: None, + }; -#[cfg(target_os = "windows")] -fn run_service(_arguments: Vec) -> Result<()> { - #[cfg(target_os = "windows")] + let thread_handle: Option>; + let mut _stop = false; let event_handler = move |control_event| -> ServiceControlHandlerResult { + println!("Got service control event: {:?}", control_event); match control_event { ServiceControl::Stop => { // Handle stop event and return control back to the system. + _stop = true; ServiceControlHandlerResult::NoError } // All services must accept Interrogate even if it's a no-op. @@ -131,35 +158,62 @@ fn run_service(_arguments: Vec) -> Result<()> { _ => ServiceControlHandlerResult::NotImplemented, } }; - #[cfg(target_os = "windows")] - if let Ok(system_handler) = service_control_handler::register("Scaphandre", event_handler) { - let next_status = ServiceStatus { - // Should match the one from system service registry - service_type: ServiceType::OWN_PROCESS, - // The new state - current_state: ServiceState::Running, - // Accept stop events when running - controls_accepted: ServiceControlAccept::STOP, - // Used to report an error when starting or stopping only, otherwise must be zero - exit_code: ServiceExitCode::Win32(0), - // Only used for pending states, otherwise must be zero - checkpoint: 0, - // Only used for pending states, otherwise must be zero - wait_hint: Duration::default(), - // Unused for setting status - process_id: None, - }; - - // Tell the system that the service is running now - if let Ok(_status_set) = system_handler.set_service_status(next_status) { - parse_cli_and_run_exporter(); - } else { - panic!("Couldn't set Windows service status."); + + if let Ok(system_handler) = service_control_handler::register("scaphandre", event_handler) { + // Tell the system that the service is running now and run it + match system_handler.set_service_status(start_status.clone()) { + Ok(status_set) => { + println!( + "Starting main thread, service status has been set: {:?}", + status_set + ); + thread_handle = Some(std::thread::spawn(move || { + parse_cli_and_run_exporter(); + })); + } + Err(e) => { + panic!("Couldn't set Windows service status. Error: {:?}", e); + } + } + loop { + if _stop { + // Wait for the thread to finnish, then end the current function + match system_handler.set_service_status(stoppending_status.clone()) { + Ok(status_set) => { + println!("Stop status has been set for service: {:?}", status_set); + if let Some(thr) = thread_handle { + if thr.join().is_ok() { + match system_handler.set_service_status(stop_status.clone()) { + Ok(laststatus_set) => { + println!( + "Scaphandre gracefully stopped: {:?}", + laststatus_set + ); + } + Err(e) => { + panic!( + "Could'nt set Stop status on scaphandre service: {:?}", + e + ); + } + } + } else { + panic!("Joining the thread failed."); + } + break; + } else { + panic!("Thread handle was not initialized."); + } + } + Err(e) => { + panic!("Couldn't set Windows service status. Error: {:?}", e); + } + } + } } } else { - panic!("Couldn't get Windows system events handler."); + panic!("Failed getting system_handle."); } - Ok(()) } fn main() { diff --git a/src/sensors/mod.rs b/src/sensors/mod.rs index 42ffcd63..1085f9ea 100644 --- a/src/sensors/mod.rs +++ b/src/sensors/mod.rs @@ -3,8 +3,10 @@ //! `Sensor` is the root for all sensors. It defines the [Sensor] trait //! needed to implement a sensor. -#[cfg(not(target_os = "linux"))] +#[cfg(target_os = "windows")] pub mod msr_rapl; +#[cfg(target_os = "windows")] +use msr_rapl::get_msr_value; #[cfg(target_os = "linux")] pub mod powercap_rapl; pub mod units; @@ -169,6 +171,7 @@ impl Topology { let sysinfo_system = System::new_all(); let sysinfo_cores = sysinfo_system.cpus(); + warn!("Sysinfo sees {}", sysinfo_cores.len()); #[cfg(target_os = "linux")] let cpuinfo = CpuInfo::new().unwrap(); for (id, c) in (0_u16..).zip(sysinfo_cores.iter()) { @@ -198,7 +201,7 @@ impl Topology { counter_uj_path: String, buffer_max_kbytes: u16, sensor_data: HashMap, - ) { + ) -> Option { if !self.sockets.iter().any(|s| s.id == socket_id) { let socket = CPUSocket::new( socket_id, @@ -208,6 +211,16 @@ impl Topology { buffer_max_kbytes, sensor_data, ); + let res = socket.clone(); + self.sockets.push(socket); + Some(res) + } else { + None + } + } + + pub fn safe_insert_socket(&mut self, socket: CPUSocket) { + if !self.sockets.iter().any(|s| s.id == socket.id) { self.sockets.push(socket); } } @@ -240,6 +253,10 @@ impl Topology { self.domains_names = Some(domain_names); } + pub fn set_domains_names(&mut self, names: Vec) { + self.domains_names = Some(names); + } + /// Adds a Domain instance to a given socket, if and only if the domain /// id doesn't exist already for the socket. pub fn safe_add_domain_to_socket( @@ -268,6 +285,7 @@ impl Topology { /// Generates CPUCore instances for the host and adds them /// to appropriate CPUSocket instance from self.sockets + #[cfg(target_os = "linux")] pub fn add_cpu_cores(&mut self) { if let Some(mut cores) = Topology::generate_cpu_cores() { while let Some(c) = cores.pop() { @@ -292,8 +310,31 @@ impl Topology { warn!("coud't not match core to socket - mapping to first socket instead - if you are not using --vm there is something wrong") } } + + //#[cfg(target_os = "windows")] + //{ + //TODO: fix + //let nb_sockets = &self.sockets.len(); + //let mut socket_counter = 0; + //let nb_cores_per_socket = &cores.len() / nb_sockets; + //warn!("nb_cores_per_socket: {} cores_len: {} sockets_len: {}", nb_cores_per_socket, &cores.len(), &self.sockets.len()); + //for s in self.sockets.iter_mut() { + // for c in (socket_counter * nb_cores_per_socket)..((socket_counter+1) * nb_cores_per_socket) { + // match cores.pop() { + // Some(core) => { + // warn!("adding core {} to socket {}", core.id, s.id); + // s.add_cpu_core(core); + // }, + // None => { + // error!("Uneven number of CPU cores !"); + // } + // } + // } + // socket_counter = socket_counter + 1; + //} + //} } else { - warn!("Couldn't retrieve any CPU Core from the topology. (generate_cpu_cores)"); + panic!("Couldn't retrieve any CPU Core from the topology. (generate_cpu_cores)"); } } @@ -409,29 +450,36 @@ impl Topology { .record_buffer .get(self.record_buffer.len() - 2) .unwrap(); - - if let Ok(last_microjoules) = last_record.value.trim().parse::() { - if let Ok(previous_microjoules) = previous_record.value.trim().parse::() { - if previous_microjoules > last_microjoules { - return None; + match previous_record.value.trim().parse::() { + Ok(previous_microjoules) => match last_record.value.trim().parse::() { + Ok(last_microjoules) => { + if previous_microjoules > last_microjoules { + return None; + } + let microjoules = last_microjoules - previous_microjoules; + let time_diff = last_record.timestamp.as_secs_f64() + - previous_record.timestamp.as_secs_f64(); + let microwatts = microjoules as f64 / time_diff; + return Some(Record::new( + last_record.timestamp, + (microwatts as u64).to_string(), + units::Unit::MicroWatt, + )); } - let microjoules = last_microjoules - previous_microjoules; - let time_diff = last_record.timestamp.as_secs_f64() - - previous_record.timestamp.as_secs_f64(); - let microwatts = microjoules as f64 / time_diff; - return Some(Record::new( - last_record.timestamp, - (microwatts as u64).to_string(), - units::Unit::MicroWatt, - )); - } else { + Err(e) => { + warn!( + "Could'nt get previous_microjoules - value : '{}' - error : {:?}", + previous_record.value, e + ); + } + }, + Err(e) => { warn!( - "Could'nt get previous_microjoules: {}", - previous_record.value + "Couldn't parse previous_microjoules - value : '{}' - error : {:?}", + previous_record.value.trim(), + e ); } - } else { - warn!("Could'nt get last_microjoules: {}", last_record.value); } } None @@ -860,6 +908,7 @@ impl Topology { None } + #[cfg(target_os = "linux")] pub fn get_rapl_psys_energy_microjoules(&self) -> Option { if let Some(psys) = self._sensor_data.get("psys") { match &fs::read_to_string(format!("{psys}/energy_uj")) { @@ -875,22 +924,36 @@ impl Topology { warn!("PSYS Error: {:?}", e); } } + } else { + debug!("Asked for PSYS but there is no psys entry in sensor_data."); } None } - //pub fn get_rapl_psys_power_microwatts(&self) -> Option { - // if let Some(psys) = self._sensor_data.get("psys") { - // if let Ok(val) = &fs::read_to_string(format!("{psys}/energy_uj")) { - // return Some(Record::new( - // current_system_time_since_epoch(), - // val.to_string(), - // units::Unit::MicroJoule - // )); - // } - // } - // None - //} + /// # Safety + /// + /// This function is unsafe rust as it calls get_msr_value function from msr_rapl sensor module. + /// It calls the msr_RAPL::MSR_PLATFORM_ENERGY_STATUS MSR address, which has been tested on several Intel x86 processors + /// but might fail on AMD (needs testing). That being said, it returns None if the msr query fails (which means if the Windows + /// driver fails.) and should not prevent from using a value coming from elsewhere, which means from another get_msr_value calls + /// targeting another msr address. + #[cfg(target_os = "windows")] + pub unsafe fn get_rapl_psys_energy_microjoules(&self) -> Option { + let msr_addr = msr_rapl::MSR_PLATFORM_ENERGY_STATUS; + match get_msr_value(0, msr_addr.into(), &self._sensor_data) { + Ok(res) => { + return Some(Record::new( + current_system_time_since_epoch(), + res.value.to_string(), + units::Unit::MicroJoule, + )) + } + Err(e) => { + debug!("get_msr_value returned error : {}", e); + } + } + None + } } // !!!!!!!!!!!!!!!!! CPUSocket !!!!!!!!!!!!!!!!!!!!!!! @@ -1013,6 +1076,10 @@ impl CPUSocket { } } + pub fn set_id(&mut self, id: u16) { + self.id = id + } + /// Adds a new Domain instance to the domains vector if and only if it doesn't exist in the vector already. fn safe_add_domain(&mut self, domain: Domain) { if !self.domains.iter().any(|d| d.id == domain.id) { @@ -1112,16 +1179,17 @@ impl CPUSocket { steal: Some(0), }; for c in &self.cpu_cores { - let c_stats = c.read_stats().unwrap(); - stats.user += c_stats.user; - stats.nice += c_stats.nice; - stats.system += c_stats.system; - stats.idle += c_stats.idle; - stats.iowait = - Some(stats.iowait.unwrap_or_default() + c_stats.iowait.unwrap_or_default()); - stats.irq = Some(stats.irq.unwrap_or_default() + c_stats.irq.unwrap_or_default()); - stats.softirq = - Some(stats.softirq.unwrap_or_default() + c_stats.softirq.unwrap_or_default()); + if let Some(c_stats) = c.read_stats() { + stats.user += c_stats.user; + stats.nice += c_stats.nice; + stats.system += c_stats.system; + stats.idle += c_stats.idle; + stats.iowait = + Some(stats.iowait.unwrap_or_default() + c_stats.iowait.unwrap_or_default()); + stats.irq = Some(stats.irq.unwrap_or_default() + c_stats.irq.unwrap_or_default()); + stats.softirq = + Some(stats.softirq.unwrap_or_default() + c_stats.softirq.unwrap_or_default()); + } } Some(stats) } @@ -1187,9 +1255,9 @@ impl CPUSocket { &last_record.value, &previous_record.value ); let last_rec_val = last_record.value.trim(); - debug!("socket : l1049 : trying to parse {} as u64", last_rec_val); + debug!("socket : l1187 : trying to parse {} as u64", last_rec_val); let prev_rec_val = previous_record.value.trim(); - debug!("socket : l1051 : trying to parse {} as u64", prev_rec_val); + debug!("socket : l1189 : trying to parse {} as u64", prev_rec_val); if let (Ok(last_microjoules), Ok(previous_microjoules)) = (last_rec_val.parse::(), prev_rec_val.parse::()) { @@ -1213,7 +1281,7 @@ impl CPUSocket { )); } } else { - debug!("Not enough records for socket"); + warn!("Not enough records for socket"); } None } diff --git a/src/sensors/msr_rapl.rs b/src/sensors/msr_rapl.rs index b06095f7..36dd1395 100644 --- a/src/sensors/msr_rapl.rs +++ b/src/sensors/msr_rapl.rs @@ -1,29 +1,39 @@ use crate::sensors::utils::current_system_time_since_epoch; -use crate::sensors::{CPUSocket, Domain, Record, RecordReader, Sensor, Topology}; +use crate::sensors::{CPUCore, CPUSocket, Domain, Record, RecordReader, Sensor, Topology}; +use raw_cpuid::{CpuId, TopologyType}; use std::collections::HashMap; use std::error::Error; use std::mem::size_of; -use sysinfo::{System, SystemExt}; +use sysinfo::{CpuExt, System, SystemExt}; use windows::Win32::Foundation::{CloseHandle, GetLastError, HANDLE, INVALID_HANDLE_VALUE}; use windows::Win32::Storage::FileSystem::{ CreateFileW, FILE_FLAG_OVERLAPPED, FILE_GENERIC_READ, FILE_GENERIC_WRITE, FILE_READ_DATA, FILE_SHARE_READ, FILE_SHARE_WRITE, FILE_WRITE_DATA, OPEN_EXISTING, }; use windows::Win32::System::Ioctl::{FILE_DEVICE_UNKNOWN, METHOD_BUFFERED}; +use windows::Win32::System::SystemInformation::GROUP_AFFINITY; +use windows::Win32::System::Threading::{ + GetActiveProcessorGroupCount, GetCurrentProcess, GetCurrentThread, GetProcessGroupAffinity, + GetThreadGroupAffinity, SetThreadGroupAffinity, +}; use windows::Win32::System::IO::DeviceIoControl; -const MSR_RAPL_POWER_UNIT: u16 = 0x606; // - //const MSR_PKG_POWER_LIMIT: u16 = 0x610; // PKG RAPL Power Limit Control (R/W) See Section 14.7.3, Package RAPL Domain. -const MSR_PKG_ENERGY_STATUS: u16 = 0x611; -//const MSR_PKG_POWER_INFO: u16 = 0x614; -//const MSR_DRAM_ENERGY_STATUS: u16 = 0x619; -//const MSR_PP0_ENERGY_STATUS: u16 = 0x639; //PP0 Energy Status (R/O) See Section 14.7.4, PP0/PP1 RAPL Domains. -//const MSR_PP0_PERF_STATUS: u16 = 0x63b; // PP0 Performance Throttling Status (R/O) See Section 14.7.4, PP0/PP1 RAPL Domains. -//const MSR_PP0_POLICY: u16 = 0x63a; //PP0 Balance Policy (R/W) See Section 14.7.4, PP0/PP1 RAPL Domains. -//const MSR_PP0_POWER_LIMIT: u16 = 0x638; // PP0 RAPL Power Limit Control (R/W) See Section 14.7.4, PP0/PP1 RAPL Domains. -//const MSR_PP1_ENERGY_STATUS: u16 = 0x641; // PP1 Energy Status (R/O) See Section 14.7.4, PP0/PP1 RAPL Domains. -//const MSR_PP1_POLICY: u16 = 0x642; // PP1 Balance Policy (R/W) See Section 14.7.4, PP0/PP1 RAPL Domains. -//const MSR_PP1_POWER_LIMIT: u16 = 0x640; // PP1 RAPL Power Limit Control (R/W) See Section 14.7.4, PP0/PP1 RAPL Domains. +use core_affinity::{self, CoreId}; + +pub use x86::cpuid; +// Intel RAPL MSRs +pub use x86::msr::{ + MSR_DRAM_ENERGY_STATUS, MSR_DRAM_PERF_STATUS, MSR_PKG_ENERGY_STATUS, MSR_PKG_POWER_INFO, + MSR_PKG_POWER_LIMIT, MSR_PP0_ENERGY_STATUS, MSR_PP0_PERF_STATUS, MSR_PP1_ENERGY_STATUS, + MSR_RAPL_POWER_UNIT, +}; +pub const MSR_PLATFORM_ENERGY_STATUS: u32 = 0x0000064d; +pub const MSR_PLATFORM_POWER_LIMIT: u32 = 0x0000065c; + +// AMD RAPL MSRs +pub const MSR_AMD_RAPL_POWER_UNIT: u32 = 0xc0010299; +pub const MSR_AMD_CORE_ENERGY_STATUS: u32 = 0xc001029a; +pub const MSR_AMD_PKG_ENERGY_STATUS: u32 = 0xc001029b; unsafe fn ctl_code(device_type: u32, request_code: u32, method: u32, access: u32) -> u32 { ((device_type) << 16) | ((access) << 14) | ((request_code) << 2) | (method) @@ -159,18 +169,48 @@ impl MsrRAPLSensor { impl RecordReader for Topology { fn read_record(&self) -> Result> { - let randval: i32 = rand::random(); - Ok(Record { - timestamp: current_system_time_since_epoch(), - unit: super::units::Unit::MicroJoule, - value: format!("{}", randval), - }) + let record: Option; + unsafe { + record = self.get_rapl_psys_energy_microjoules(); + } + if let Some(psys_record) = record { + Ok(psys_record) + } else { + let mut res: u128 = 0; + debug!("Topology: I have {} sockets", self.sockets.len()); + for s in &self.sockets { + match s.read_record() { + Ok(rec) => { + debug!("rec: {:?}", rec); + res += rec.value.trim().parse::()?; + } + Err(e) => { + warn!("Failed to get socket record : {:?}", e); + } + } + let dram_filter: Vec<&Domain> = s + .get_domains_passive() + .iter() + .filter(|d| d.name == "dram") + .collect(); + if let Some(dram) = dram_filter.first() { + if let Ok(val) = dram.read_record() { + res += val.value.trim().parse::()?; + } + } + } + Ok(Record { + timestamp: current_system_time_since_epoch(), + unit: super::units::Unit::MicroJoule, + value: res.to_string(), + }) + } } } unsafe fn send_request( device: HANDLE, - request_code: u16, + request_code: u32, request: *const u64, request_length: usize, reply: *mut u64, @@ -180,7 +220,7 @@ unsafe fn send_request( let len_ptr: *mut u32 = &mut len; if DeviceIoControl( - device, // envoi 8 octet et je recoi 8 octet + device, // send 8 bytes, receive 8 bytes crate::sensors::msr_rapl::ctl_code( FILE_DEVICE_UNKNOWN, request_code as _, @@ -205,80 +245,114 @@ unsafe fn send_request( info!("Device answered"); Ok(String::from("Device answered !")) } else { - error!("DeviceIoControl failed"); + info!("DeviceIoControl failed"); Err(String::from("DeviceIoControl failed")) } } impl RecordReader for CPUSocket { fn read_record(&self) -> Result> { unsafe { - let driver_name = self.sensor_data.get("DRIVER_NAME").unwrap(); - if let Ok(device) = get_handle(driver_name) { - let mut msr_result: u64 = 0; - let ptr_result = &mut msr_result as *mut u64; - let mut src = MSR_RAPL_POWER_UNIT as u64; - let ptr = &src as *const u64; - - src = MSR_PKG_ENERGY_STATUS as u64; - trace!("src: {:x}", src); - trace!("src: {:b}", src); - - trace!("*ptr: {}", *ptr); - trace!("&request: {:?} ptr (as *const u8): {:?}", &src, ptr); - - if let Ok(res) = send_request( - device, - MSR_PKG_ENERGY_STATUS, - // nouvelle version à integrer : request_code est ignoré et request doit contenir - // request_code sous forme d'un char * - ptr, - 8, - ptr_result, - size_of::(), - ) { - debug!("{}", res); - - close_handle(device); - - let energy_unit = self - .sensor_data - .get("ENERGY_UNIT") - .unwrap() - .parse::() - .unwrap(); - - Ok(Record { - timestamp: current_system_time_since_epoch(), - unit: super::units::Unit::MicroJoule, - value: MsrRAPLSensor::extract_rapl_current_power(msr_result, energy_unit), - }) + let current_thread = GetCurrentThread(); + let processorgroup_id = self + .sensor_data + .get("PROCESSORGROUP_ID") + .unwrap() + .parse::() + .unwrap(); + let mut thread_group_affinity: GROUP_AFFINITY = GROUP_AFFINITY { + Mask: 255, + Group: processorgroup_id, + Reserved: [0, 0, 0], + }; + let thread_affinity = + GetThreadGroupAffinity(current_thread, &mut thread_group_affinity); + if thread_affinity.as_bool() { + debug!("got thead_affinity : {:?}", thread_group_affinity); + let core_id = self.cpu_cores.last().unwrap().id; //(self.cpu_cores.last().unwrap().id + self.id * self.cpu_cores.len() as u16) as usize + let newaffinity = GROUP_AFFINITY { + Mask: self.cpu_cores.len() + self.id as usize * self.cpu_cores.len() - 1, + Group: processorgroup_id, + Reserved: [0, 0, 0], + }; + let res = SetThreadGroupAffinity( + current_thread, + &newaffinity, + &mut thread_group_affinity, + ); + if res.as_bool() { + debug!( + "Asking get_msr_value, from socket, with core_id={}", + core_id + ); + match get_msr_value( + core_id as usize, + MSR_PKG_ENERGY_STATUS as u64, + &self.sensor_data, + ) { + Ok(rec) => Ok(Record { + timestamp: current_system_time_since_epoch(), + value: rec.value, + unit: super::units::Unit::MicroJoule, + }), + Err(e) => { + error!( + "Could'nt get MSR value for {}: {}", + MSR_PKG_ENERGY_STATUS, e + ); + Ok(Record { + timestamp: current_system_time_since_epoch(), + value: String::from("0"), + unit: super::units::Unit::MicroJoule, + }) + } + } } else { - error!("Failed to get data from send_request."); - close_handle(device); - Ok(Record { - timestamp: current_system_time_since_epoch(), - unit: super::units::Unit::MicroJoule, - value: String::from("0"), - }) + panic!("Couldn't set Thread affinity !"); } + //TODO add DRAM domain to result when available } else { - error!("Couldn't get handle."); - Ok(Record { - timestamp: current_system_time_since_epoch(), - unit: super::units::Unit::MicroJoule, - value: String::from("0"), - }) + panic!("Coudld'nt get Thread affinity !"); } } } } impl RecordReader for Domain { fn read_record(&self) -> Result> { - Ok(Record { - timestamp: current_system_time_since_epoch(), - unit: super::units::Unit::MicroJoule, - value: String::from("10"), - }) + if let Some(core_id) = self.sensor_data.get("CORE_ID") { + let usize_coreid = core_id.parse::().unwrap(); + debug!("Reading Domain {} on Core {}", self.name, usize_coreid); + if let Some(msr_addr) = self.sensor_data.get("MSR_ADDR") { + unsafe { + debug!( + "Asking, from Domain, get_msr_value with core_id={}", + usize_coreid + ); + match get_msr_value( + usize_coreid, + msr_addr.parse::().unwrap(), + &self.sensor_data, + ) { + Ok(rec) => Ok(Record { + timestamp: current_system_time_since_epoch(), + unit: super::units::Unit::MicroJoule, + value: rec.value, + }), + Err(e) => { + error!("Could'nt get MSR value for {}: {}", msr_addr, e); + Ok(Record { + timestamp: current_system_time_since_epoch(), + value: String::from("0"), + unit: super::units::Unit::MicroJoule, + }) + } + } + } + } else { + panic!("Couldn't get msr_addr to target for domain {}", self.name); + } + } else { + panic!("Couldn't get core_id to target for domain {}", self.name); + } } } @@ -293,10 +367,344 @@ impl Sensor for MsrRAPLSensor { let mut topology = Topology::new(sensor_data.clone()); let mut sys = System::new_all(); sys.refresh_all(); - let i = 0; - //TODO fix that to actually count the number of sockets - topology.safe_add_socket(i, vec![], vec![], String::from(""), 4, sensor_data.clone()); + unsafe { + let current_thread = GetCurrentThread(); + + let group_count = GetActiveProcessorGroupCount(); + debug!("GROUP COUNT : {}", group_count); + + for group_id in 0..group_count { + //TODO fix that to actually count the number of sockets + let logical_cpus = sys.cpus(); + let cpuid = CpuId::new(); + let mut logical_cpus_from_cpuid = 1; + match cpuid.get_extended_topology_info() { + Some(info) => { + for t in info { + if t.level_type() == TopologyType::Core { + logical_cpus_from_cpuid = t.processors(); + } + } + } + None => { + panic!("Could'nt get cpuid data."); + } + } + let mut i: u16 = 0; + let mut no_more_sockets = false; + debug!("Entering ProcessorGroup {}", group_id); + let newaffinity = GROUP_AFFINITY { + Mask: 255, + Group: group_id, + Reserved: [0, 0, 0], + }; + let mut thread_group_affinity: GROUP_AFFINITY = GROUP_AFFINITY { + Mask: 255, + Group: 0, + Reserved: [0, 0, 0], + }; + let thread_affinity = + GetThreadGroupAffinity(current_thread, &mut thread_group_affinity); + debug!("Thread group affinity result : {:?}", thread_affinity); + if thread_affinity.as_bool() { + debug!("got thead_affinity : {:?}", thread_group_affinity); + let res = SetThreadGroupAffinity( + current_thread, + &newaffinity, + &mut thread_group_affinity, + ); + if res.as_bool() { + debug!("Have set thread affinity: {:?}", newaffinity); + match core_affinity::get_core_ids() { + Some(core_ids) => { + debug!( + "CPU SETUP - Cores from core_affinity, len={} : {:?}", + core_ids.len(), + core_ids + ); + debug!( + "CPU SETUP - Logical CPUs from sysinfo: {}", + logical_cpus.len() + ); + while !no_more_sockets { + let start = i * logical_cpus_from_cpuid; + let stop = (i + 1) * logical_cpus_from_cpuid; + debug!("Looping over {} .. {}", start, stop); + sensor_data.insert( + String::from("PROCESSORGROUP_ID"), + group_id.to_string(), + ); + let mut current_socket = CPUSocket::new( + i, + vec![], + vec![], + String::from(""), + 1, + sensor_data.clone(), + ); + for c in start..stop { + //core_ids { + if core_affinity::set_for_current(CoreId { id: c.into() }) { + match cpuid.get_vendor_info() { + Some(info) => { + debug!("Got CPU {:?}", info); + } + None => { + warn!("Couldn't get cpuinfo"); + } + } + debug!("Set core_affinity to {}", c); + match cpuid.get_extended_topology_info() { + Some(info) => { + debug!("Got CPU topo info {:?}", info); + for t in info { + if t.level_type() == TopologyType::Core { + //logical_cpus_from_cpuid = t.processors() + let x2apic_id = t.x2apic_id(); + let socket_id = (x2apic_id & 240) >> 4; // upper bits of x2apic_id are socket_id, mask them, then bit shift to get socket_id + current_socket.set_id(socket_id as u16); + let core_id = x2apic_id & 15; // 4 last bits of x2apic_id are the core_id (per-socket) + debug!( + "Found socketid={} and coreid={}", + socket_id, core_id + ); + let mut attributes = + HashMap::::new(); + let ref_core = + logical_cpus.first().unwrap(); + attributes.insert( + String::from("frequency"), + ref_core.frequency().to_string(), + ); + attributes.insert( + String::from("name"), + ref_core.name().to_string(), + ); + attributes.insert( + String::from("vendor_id"), + ref_core.vendor_id().to_string(), + ); + attributes.insert( + String::from("brand"), + ref_core.brand().to_string(), + ); + debug!( + "Adding core id {} to socket_id {}", + ((i * (logical_cpus_from_cpuid + - 1)) + + core_id as u16), + current_socket.id + ); + current_socket.add_cpu_core( + CPUCore::new( + (i * (logical_cpus_from_cpuid + - 1)) + + core_id as u16, + attributes, + ), + ); + debug!( + "Reviewing sockets : {:?}", + topology.get_sockets_passive() + ); + } + } + } + None => { + warn!("Couldn't get cpu topo info"); + } + } + } else { + no_more_sockets = true; + debug!( + "There's likely to be no more socket to explore." + ); + break; + } + } + if !no_more_sockets { + debug!("inserting socket {:?}", current_socket); + topology.safe_insert_socket(current_socket); + i += 1; + } + } + } + None => { + panic!("Could'nt get core ids from core_affinity."); + } + } + if let Some(info) = CpuId::new().get_extended_topology_info() { + for c in info { + if c.level_type() == TopologyType::Core { + debug!("CPUID : {:?}", c); + } + } + } + } else { + error!("Could'nt set thread affinity !"); + let last_error = GetLastError(); + panic!("Error was : {:?}", last_error); + } + } else { + error!("Getting thread group affinity failed !"); + let last_error = GetLastError(); + panic!("Error was: {:?}", last_error); // win32 error 122 is insufficient buffer + } + } + //let process_information = GetProcessInformation(current_process, , , ); + } + //let mut core_id_counter = logical_cpus.len(); + + //match cpuid.get_advanced_power_mgmt_info() { + // Some(info) => { + // warn!("Got CPU power mgmt info {:?}", info); + // }, + // None => { + // warn!("Couldn't get cpu power info"); + // } + //} + //match cpuid.get_extended_feature_info() { + // Some(info) => { + // warn!("Got CPU feature info {:?}", info); + // }, + // None => { + // warn!("Couldn't get cpu feature info"); + // } + //} + //match cpuid.get_performance_monitoring_info() { + // Some(info) => { + // warn!("Got CPU perfmonitoring info {:?}", info); + // }, + // None => { + // warn!("Couldn't get cpu perfmonitoring info"); + // } + //} + //match cpuid.get_thermal_power_info() { + // Some(info) => { + // warn!("Got CPU thermal info {:?}", info); + // }, + // None => { + // warn!("Couldn't get cpu thermal info"); + // } + //} + //match cpuid.get_extended_state_info() { + // Some(info) => { + // warn!("Got CPU state info {:?}", info); + // }, + // None => { + // warn!("Couldn't get cpu state info"); + // } + //} + //match cpuid.get_processor_capacity_feature_info() { + // Some(info) => { + // warn!("Got CPU capacity info {:?}", info); + // }, + // None => { + // warn!("Couldn't get cpu capacity info"); + // } + //} + + //topology.add_cpu_cores(); + let mut domains = vec![]; + for s in topology.get_sockets() { + debug!("Inspecting CPUSocket: {:?}", s); + unsafe { + let core_id = + s.get_cores_passive().last().unwrap().id + s.id * s.cpu_cores.len() as u16; + debug!( + "Asking get_msr_value, from generate_tpopo, with core_id={}", + core_id + ); + match get_msr_value( + core_id as usize, + MSR_DRAM_ENERGY_STATUS as u64, + &sensor_data, + ) { + Ok(_rec) => { + debug!("Adding domain Dram !"); + let mut domain_sensor_data = sensor_data.clone(); + domain_sensor_data + .insert(String::from("MSR_ADDR"), MSR_DRAM_ENERGY_STATUS.to_string()); + domain_sensor_data.insert(String::from("CORE_ID"), core_id.to_string()); // nb of cores in a socket * socket_id + local_core_id + domains.push(String::from("dram")); + s.safe_add_domain(Domain::new( + 2, + String::from("dram"), + String::from(""), + 5, + domain_sensor_data, + )) + } + Err(e) => { + warn!("Could'nt add Dram domain: {}", e); + } + } + match get_msr_value(core_id as usize, MSR_PP0_ENERGY_STATUS as u64, &sensor_data) { + Ok(_rec) => { + debug!("Adding domain Core !"); + let mut domain_sensor_data = sensor_data.clone(); + domain_sensor_data + .insert(String::from("MSR_ADDR"), MSR_PP0_ENERGY_STATUS.to_string()); + domain_sensor_data.insert(String::from("CORE_ID"), core_id.to_string()); + domains.push(String::from("core")); + s.safe_add_domain(Domain::new( + 2, + String::from("core"), + String::from(""), + 5, + domain_sensor_data, + )) + } + Err(e) => { + warn!("Could'nt add Core domain: {}", e); + } + } + match get_msr_value(core_id as usize, MSR_PP1_ENERGY_STATUS as u64, &sensor_data) { + Ok(_rec) => { + debug!("Adding domain Uncore !"); + let mut domain_sensor_data = sensor_data.clone(); + domain_sensor_data + .insert(String::from("MSR_ADDR"), MSR_PP1_ENERGY_STATUS.to_string()); + domain_sensor_data.insert(String::from("CORE_ID"), core_id.to_string()); + domains.push(String::from("uncore")); + s.safe_add_domain(Domain::new( + 2, + String::from("uncore"), + String::from(""), + 5, + domain_sensor_data, + )) + } + Err(e) => { + warn!("Could'nt add Uncore domain: {}", e); + } + } + //match get_msr_value(core_id as usize, MSR_PLATFORM_ENERGY_STATUS as u64, &sensor_data) { + // Ok(rec) => { + // }, + // Err(e) => { + // error!("Could'nt find Platform/PSYS domain."); + // } + //} + } + } + + unsafe { + match get_msr_value(0, MSR_PLATFORM_ENERGY_STATUS as u64, &sensor_data) { + Ok(_rec) => { + debug!("Adding domain Platform / PSYS !"); + topology + ._sensor_data + .insert(String::from("psys"), String::from("")); + } + Err(e) => { + warn!("Could'nt add Uncore domain: {}", e); + } + } + } + + topology.set_domains_names(domains); Ok(topology) } @@ -308,3 +716,105 @@ impl Sensor for MsrRAPLSensor { Box::new(topology) } } + +/// # Safety +/// +/// This function should is unsafe rust as it uses send_request, hence calls a DeviceIO Windows driver. +/// The safety burden actuallr resides in the DeviceIO driver that is called. Please refer to the documentation to +/// get the relationship between Scaphandre and its driver for Windows. The driver should exit smoothly if a wrong +/// MSR address is called, then this function should throw an Error. Any improper issue with the operating system would mean +/// there is an issue in the driver used behind the scene, or the way it is configured. +pub unsafe fn get_msr_value( + core_id: usize, + msr_addr: u64, + sensor_data: &HashMap, +) -> Result { + let current_process = GetCurrentProcess(); + let current_thread = GetCurrentThread(); + let mut thread_group_affinity = GROUP_AFFINITY { + Mask: 255, + Group: 9, + Reserved: [0, 0, 0], + }; + let thread_affinity_res = GetThreadGroupAffinity(current_thread, &mut thread_group_affinity); + if thread_affinity_res.as_bool() { + debug!("Thread affinity found : {:?}", thread_group_affinity); + } else { + error!("Could'nt get thread group affinity"); + } + let mut process_group_array: [u16; 8] = [0, 0, 0, 0, 0, 0, 0, 0]; + let mut process_group_array_len = 8; + let process_affinity_res = GetProcessGroupAffinity( + current_process, + &mut process_group_array_len, + process_group_array.as_mut_ptr(), + ); + if process_affinity_res.as_bool() { + debug!("Process affinity found: {:?}", process_group_array); + } else { + error!("Could'nt get process group affinity"); + error!("Error was : {:?}", GetLastError()); + } + debug!("Core ID requested to the driver : {}", core_id); + match sensor_data.get("DRIVER_NAME") { + Some(driver) => { + match get_handle(driver) { + Ok(device) => { + let mut msr_result: u64 = 0; + let ptr_result = &mut msr_result as *mut u64; + debug!("msr_addr: {:b}", msr_addr); + debug!("core_id: {:x} {:b}", (core_id as u64), (core_id as u64)); + debug!("core_id: {:b}", ((core_id as u64) << 32)); + let src = ((core_id as u64) << 32) | msr_addr; //let src = ((core_id as u64) << 32) | msr_addr; + let ptr = &src as *const u64; + + debug!("src: {:x}", src); + debug!("src: {:b}", src); + debug!("*ptr: {:b}", *ptr); + //warn!("*ptr: {}", *ptr); + //warn!("*ptr: {:b}", *ptr); + + match send_request( + device, + MSR_PKG_ENERGY_STATUS, + ptr, + 8, + ptr_result, + size_of::(), + ) { + Ok(_res) => { + close_handle(device); + + let energy_unit = sensor_data + .get("ENERGY_UNIT") + .unwrap() + .parse::() + .unwrap(); + let current_value = + MsrRAPLSensor::extract_rapl_current_power(msr_result, energy_unit); + debug!("current_value: {}", current_value); + + Ok(Record { + timestamp: current_system_time_since_epoch(), + unit: super::units::Unit::MicroJoule, + value: current_value, + }) + } + Err(e) => { + info!("Failed to get data from send_request: {:?}", e); + close_handle(device); + Err(format!("Failed to get data from send_request: {:?}", e)) + } + } + } + Err(e) => { + error!("Couldn't get driver handle : {:?}", e); + Err(format!("Couldn't get driver handle : {:?}", e)) + } + } + } + None => { + panic!("DRIVER_NAME not set."); + } + } +} diff --git a/src/sensors/utils.rs b/src/sensors/utils.rs index ba80b28d..2ba070ab 100644 --- a/src/sensors/utils.rs +++ b/src/sensors/utils.rs @@ -387,7 +387,7 @@ impl ProcessTracker { /// Returns all vectors of process records linked to a running, sleeping, waiting or zombie process. /// (Not terminated) pub fn get_alive_processes(&self) -> Vec<&Vec> { - debug!("In get alive processes."); + trace!("In get alive processes."); let mut res = vec![]; for p in self.procs.iter() { //#[cfg(target_os = "linux")] @@ -412,7 +412,7 @@ impl ProcessTracker { } } } - debug!("End of get alive processes."); + trace!("End of get alive processes."); res } @@ -632,6 +632,7 @@ impl ProcessTracker { if result.next().is_some() { panic!("Found two vectors of processes with the same id, maintainers should fix this."); } + debug!("End of get process name."); process.first().unwrap().process.comm.clone() } @@ -652,11 +653,9 @@ impl ProcessTracker { cmdline.push_str(&cmdline_vec.remove(0)); } } - debug!("End of get process cmdline."); return Some(cmdline); } } - debug!("End of get process cmdline."); None }