diff --git a/cli/src/commands/dcgm.rs b/cli/src/commands/dcgm.rs
new file mode 100644
index 00000000..89fd9ec9
--- /dev/null
+++ b/cli/src/commands/dcgm.rs
@@ -0,0 +1,64 @@
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+//
+// This source code is licensed under the MIT license found in the
+// LICENSE file in the root directory of this source tree.
+
+use std::net::TcpStream;
+
+use anyhow::anyhow;
+use anyhow::Result;
+
+#[path = "utils.rs"]
+mod utils;
+
+// This module contains the handling logic for dcgm
+
+/// Pause dcgm module profiling.
+///
+/// Sends a `dcgmProfPause` request carrying `duration_s` (seconds) to the
+/// service behind `client`, then prints the response.
+///
+/// # Errors
+/// Returns an error if the request cannot be sent or the response cannot be
+/// read.
+pub fn run_dcgm_pause(client: TcpStream, duration_s: i32) -> Result<()> {
+    let request_json = format!(
+        r#"
+{{
+    "fn": "dcgmProfPause",
+    "duration_s": {}
+}}"#,
+        duration_s
+    );
+
+    // Surface I/O failures through the returned Result instead of panicking.
+    utils::send_msg(&client, &request_json)
+        .map_err(|e| anyhow!("Error sending message to service: {:?}", e))?;
+
+    let resp_str = utils::get_resp(&client)
+        .map_err(|e| anyhow!("Unable to decode output bytes: {:?}", e))?;
+
+    println!("response = {}", resp_str);
+
+    Ok(())
+}
+
+/// Resume dcgm module profiling.
+///
+/// Sends a `dcgmProfResume` request to the service behind `client`, then
+/// prints the response.
+///
+/// # Errors
+/// Returns an error if the request cannot be sent or the response cannot be
+/// read.
+pub fn run_dcgm_resume(client: TcpStream) -> Result<()> {
+    utils::send_msg(&client, r#"{"fn":"dcgmProfResume"}"#)
+        .map_err(|e| anyhow!("Error sending message to service: {:?}", e))?;
+
+    let resp_str = utils::get_resp(&client)
+        .map_err(|e| anyhow!("Unable to decode output bytes: {:?}", e))?;
+
+    println!("response = {}", resp_str);
+
+    Ok(())
+}
diff --git a/cli/src/commands/mod.rs b/cli/src/commands/mod.rs
index f1725177..0c72dd1c 100644
--- a/cli/src/commands/mod.rs
+++ b/cli/src/commands/mod.rs
@@ -9,6 +9,7 @@
 // handling code. Additionally, explicitly "exporting" all the command modules here allows
 // us to avoid having to explicitly list all the command modules in main.rs.
 
+pub mod dcgm;
 pub mod gputrace;
 pub mod status;
 // ... add new command modules here
diff --git a/cli/src/main.rs b/cli/src/main.rs
index 1d84e958..4461533c 100644
--- a/cli/src/main.rs
+++ b/cli/src/main.rs
@@ -72,6 +72,14 @@ enum Command {
         #[clap(long, default_value_t = 3)]
         process_limit: u32,
     },
+    /// Pause dcgm profiling. This enables running tools like Nsight compute and avoids conflicts.
+    DcgmPause {
+        /// Duration to pause dcgm profiling in seconds
+        #[clap(long, default_value_t = 300)]
+        duration_s: i32,
+    },
+    /// Resume dcgm profiling
+    DcgmResume,
 }
 
 /// Create a socket connection to dynolog
@@ -116,6 +124,8 @@ fn main() -> Result<()> {
             profile_start_iteration_roundup,
             process_limit,
         ),
+        Command::DcgmPause { duration_s } => dcgm::run_dcgm_pause(dyno_client, duration_s),
+        Command::DcgmResume => dcgm::run_dcgm_resume(dyno_client),
         // ... add new commands here
     }
 }
diff --git a/dynolog/src/Main.cpp b/dynolog/src/Main.cpp
index 45de06f4..ae275ac9 100644
--- a/dynolog/src/Main.cpp
+++ b/dynolog/src/Main.cpp
@@ -125,11 +125,7 @@ auto setup_server(std::shared_ptr handler) {
       handler, FLAGS_port);
 }
 
-void gpu_monitor_loop() {
-  LOG(INFO) << "Setting up DCGM (GPU) monitoring.";
-  std::unique_ptr dcgm = gpumon::DcgmGroupInfo::factory(
-      gpumon::FLAGS_dcgm_fields, FLAGS_dcgm_reporting_interval_s * 1000);
-
+void gpu_monitor_loop(std::shared_ptr dcgm) {
   auto logger = getLogger(FLAGS_scribe_category);
 
   LOG(INFO) << "Running DCGM loop : interval = "
@@ -155,12 +151,7 @@ int main(int argc, char** argv) {
   LOG(INFO) << "Starting dynolog, version = " DYNOLOG_VERSION
             << ", build git-hash = " DYNOLOG_GIT_REV;
 
-  // setup service
-  auto handler = std::make_shared();
-
-  // use simple json RPC server for now
-  auto server = setup_server(handler);
-  server->run();
+  std::shared_ptr dcgm;
 
   std::unique_ptr ipcmon;
   std::unique_ptr ipcmon_thread, gpumon_thread, pm_thread;
@@ -173,13 +164,24 @@ int main(int argc, char** argv) {
   }
 
   if (FLAGS_enable_gpu_monitor) {
-    gpumon_thread = std::make_unique(gpu_monitor_loop);
+    // Created here rather than inside the monitor loop so the same instance
+    // can also be handed to the RPC service handler below.
+    dcgm = gpumon::DcgmGroupInfo::factory(
+        gpumon::FLAGS_dcgm_fields, FLAGS_dcgm_reporting_interval_s * 1000);
+    gpumon_thread = std::make_unique(gpu_monitor_loop, dcgm);
   }
   std::thread km_thread{kernel_monitor_loop};
   if (FLAGS_enable_perf_monitor) {
     pm_thread = std::make_unique(perf_monitor_loop);
   }
 
+  // setup service
+  auto handler = std::make_shared(dcgm);
+
+  // use simple json RPC server for now
+  auto server = setup_server(handler);
+  server->run();
+
   km_thread.join();
   if (pm_thread) {
     pm_thread->join();
diff --git a/dynolog/src/ScubaLogger.cpp b/dynolog/src/ScubaLogger.cpp
index 12e04b70..f35d7e73 100644
--- a/dynolog/src/ScubaLogger.cpp
+++ b/dynolog/src/ScubaLogger.cpp
@@ -17,7 +17,7 @@ namespace dynolog {
 
 constexpr int HOSTNAME_MAX = 50;
 
-constexpr char kScubaUrl[] = "https://graph.facebook.com/scribe_logs";
+constexpr char kScubaUrl[] = "http://graph.facebook.com/v2.2/scribe_logs";
 DEFINE_string(
     scribe_category,
     "perfpipe_fair_cluster_gpu_stats",
diff --git a/dynolog/src/ServiceHandler.cpp b/dynolog/src/ServiceHandler.cpp
index f1f0a0ef..091e78ae 100644
--- a/dynolog/src/ServiceHandler.cpp
+++ b/dynolog/src/ServiceHandler.cpp
@@ -27,4 +27,18 @@ GpuProfilerResult ServiceHandler::setKinetOnDemandRequest(
       limit);
 }
 
+bool ServiceHandler::dcgmProfPause(int duration_s) {
+  if (dcgm_) {
+    return dcgm_->pauseProfiling(duration_s);
+  }
+  return false;
+}
+
+bool ServiceHandler::dcgmProfResume() {
+  if (dcgm_) {
+    return dcgm_->resumeProfiling();
+  }
+  return false;
+}
+
 } // namespace dynolog
diff --git a/dynolog/src/ServiceHandler.h b/dynolog/src/ServiceHandler.h
index e070cfb7..aeca24cd 100644
--- a/dynolog/src/ServiceHandler.h
+++ b/dynolog/src/ServiceHandler.h
@@ -8,6 +8,7 @@
 #include
 #include
 #include "dynolog/src/LibkinetoConfigManager.h"
+#include "dynolog/src/gpumon/DcgmGroupInfo.h"
 
 namespace dynolog {
 
@@ -17,6 +18,8 @@ namespace dynolog {
 
 class ServiceHandler {
  public:
+  explicit ServiceHandler(std::shared_ptr dcgm)
+      : dcgm_(dcgm) {}
   // returns the state of the service
   int getStatus();
 
@@ -26,6 +29,15 @@ class ServiceHandler {
       const std::string& config,
       int limit);
   // ... more to come
+  // Pauses DCGM profiling for duration_s seconds; returns false if no DCGM
+  // instance is attached or the pause fails.
+  bool dcgmProfPause(int duration_s);
+  // Resumes DCGM profiling; returns false if no DCGM instance is attached or
+  // the resume fails.
+  bool dcgmProfResume();
+
+ private:
+  std::shared_ptr dcgm_;
 };
 
 } // namespace dynolog
diff --git a/dynolog/src/gpumon/DcgmGroupInfo.cpp b/dynolog/src/gpumon/DcgmGroupInfo.cpp
index 8f8d8e8c..af4e8dbc 100644
--- a/dynolog/src/gpumon/DcgmGroupInfo.cpp
+++ b/dynolog/src/gpumon/DcgmGroupInfo.cpp
@@ -94,7 +94,7 @@ static inline bool isProfField(unsigned short field_id) {
   return field_id >= DCGM_FI_PROF_GR_ENGINE_ACTIVE;
 }
 
-std::unique_ptr DcgmGroupInfo::factory(
+std::shared_ptr DcgmGroupInfo::factory(
     const std::string& fields_str,
     int updateIntervalMs) {
   std::stringstream field_ss(fields_str);
@@ -124,7 +124,7 @@ std::unique_ptr DcgmGroupInfo::factory(
   }
   LOG(INFO) << ss.str();
 
-  auto dcgmGroupInfo = std::unique_ptr(
+  auto dcgmGroupInfo = std::shared_ptr(
       new DcgmGroupInfo(fields, prof_fields, updateIntervalMs));
   if (dcgmGroupInfo->isFailing()) {
     return nullptr;
@@ -265,9 +265,9 @@ void DcgmGroupInfo::watchProfFields(
       retCode_ != DCGM_ST_OK) {
     errorCode_ = retCode_;
     LOG(ERROR) << "Failed dcgmProfWatchFields() return: " << retCode_;
-    prof_enabled_ = false;
+    profEnabled_ = false;
   } else {
-    prof_enabled_ = true;
+    profEnabled_ = true;
   }
 }
 
@@ -306,7 +306,7 @@ void DcgmGroupInfo::update() {
           LOG(ERROR) << "Field id not supported, got: " << v.fieldId;
         } else {
           // skip prof field reporting if profiling is disabled
-          if (!prof_enabled_ && isProfField(v.fieldId)) {
+          if (!profEnabled_ && isProfField(v.fieldId)) {
             continue;
           }
 
@@ -338,6 +338,17 @@ void DcgmGroupInfo::update() {
       }
     }
   }
+
+  // if profiling disabled, check countdown timer to see if we should
+  // re-enable dcgm profiling
+  if (!profEnabled_) {
+    if (profPauseTimer_.count() <= 0) {
+      resumeProfiling();
+    } else {
+      std::lock_guard wguard(profLock_);
+      profPauseTimer_ -= std::chrono::milliseconds(updateIntervalMs_);
+    }
+  }
 }
 
 void DcgmGroupInfo::log(Logger& logger) {
@@ -362,6 +373,35 @@ void DcgmGroupInfo::log(Logger& logger) {
   }
 }
 
+bool DcgmGroupInfo::pauseProfiling(int duration_s) {
+  std::lock_guard wguard(profLock_);
+  LOG(INFO) << "Pausing dcgm profiling";
+  // update() counts this timer down and resumes profiling once it expires.
+  profPauseTimer_ = std::chrono::seconds(duration_s);
+  profEnabled_ = false;
+
+  if (retCode_ = dcgmProfPause_stub(dcgmHandle_); retCode_ != DCGM_ST_OK) {
+    errorCode_ = retCode_;
+    LOG(ERROR) << "Failed dcgmProfPause() return: " << retCode_;
+    return false;
+  }
+
+  return true;
+}
+
+bool DcgmGroupInfo::resumeProfiling() {
+  std::lock_guard wguard(profLock_);
+  LOG(INFO) << "Resuming dcgm profiling";
+  profEnabled_ = true;
+  if (retCode_ = dcgmProfResume_stub(dcgmHandle_); retCode_ != DCGM_ST_OK) {
+    errorCode_ = retCode_;
+    LOG(ERROR) << "Failed dcgmProfResume() return: " << retCode_;
+    return false;
+  }
+
+  return true;
+}
+
 DcgmGroupInfo::~DcgmGroupInfo() {
   dcgmProfUnwatchFields_t unwatchFields;
   memset(&unwatchFields, 0, sizeof(unwatchFields));
diff --git a/dynolog/src/gpumon/DcgmGroupInfo.h b/dynolog/src/gpumon/DcgmGroupInfo.h
index f514c07e..83453630 100644
--- a/dynolog/src/gpumon/DcgmGroupInfo.h
+++ b/dynolog/src/gpumon/DcgmGroupInfo.h
@@ -7,8 +7,8 @@
 #include
 #include
 #include
+#include
 #include
-#include
 #include
 #include "dynolog/src/Logger.h"
 #include "dynolog/src/gpumon/dcgm_structs.h"
@@ -23,17 +23,23 @@ constexpr char kDcgmDefaultFieldIds[] =
 class DcgmGroupInfo {
  public:
   ~DcgmGroupInfo();
-  static std::unique_ptr factory(
+  static std::shared_ptr factory(
       const std::string& fields_str,
       int updateIntervalMs);
   void update();
-  dcgmStatus_t getDcgmStatus() {
+  dcgmStatus_t getDcgmStatus() const {
     return errorCode_;
   }
-  bool isFailing() {
+  bool isFailing() const {
     return errorCode_ != DCGM_ST_OK;
   }
   void log(Logger& logger);
+  // Pauses DCGM profiling; update() resumes it automatically once duration_s
+  // seconds of update intervals have elapsed. Returns false if the DCGM call
+  // fails.
+  bool pauseProfiling(int duration_s);
+  // Resumes DCGM profiling immediately. Returns false if the DCGM call fails.
+  bool resumeProfiling();
 
  private:
   DcgmGroupInfo(
@@ -48,7 +54,7 @@ class DcgmGroupInfo {
 
   std::vector gpuIdList_;
   int deviceCount_ = 0;
-  bool prof_enabled_ = false;
+  bool profEnabled_ = false;
   int updateIntervalMs_;
   dcgmReturn_t errorCode_{DCGM_ST_OK};
   dcgmReturn_t retCode_{DCGM_ST_OK};
@@ -64,6 +70,10 @@ class DcgmGroupInfo {
   std::unordered_map>
       envMetadataMapString_;
   std::vector fieldGroupIds_;
+  std::mutex profLock_;
+  // Remaining pause time; zero-initialized so update() never reads an
+  // indeterminate value before the first pauseProfiling() call.
+  std::chrono::milliseconds profPauseTimer_{0};
 };
 
 } // namespace gpumon
diff --git a/dynolog/src/rpc/SimpleJsonServerInl.h b/dynolog/src/rpc/SimpleJsonServerInl.h
index 9dff1b2a..56489fa1 100644
--- a/dynolog/src/rpc/SimpleJsonServerInl.h
+++ b/dynolog/src/rpc/SimpleJsonServerInl.h
@@ -100,6 +100,17 @@ std::string SimpleJsonServer::processOneImpl(
             fmt::format("failed with exception = {}", ex.what());
       }
     }
+  } else if (request["fn"] == "dcgmProfPause") {
+    if (!request.contains("duration_s")) {
+      response["status"] = "failed";
+    } else {
+      int duration_s = request.value("duration_s", 300);
+      bool result = handler_->dcgmProfPause(duration_s);
+      response["status"] = result;
+    }
+  } else if (request["fn"] == "dcgmProfResume") {
+    bool result = handler_->dcgmProfResume();
+    response["status"] = result;
   } else {
     LOG(ERROR) << "Unknown RPC call = " << request["fn"];
     return "";
diff --git a/dynolog/tests/rpc/SimpleJsonClientTest.cpp b/dynolog/tests/rpc/SimpleJsonClientTest.cpp
index 7f222df7..7d9d0654 100644
--- a/dynolog/tests/rpc/SimpleJsonClientTest.cpp
+++ b/dynolog/tests/rpc/SimpleJsonClientTest.cpp
@@ -38,6 +38,19 @@ struct MockServiceHandler {
     return result;
   }
 
+  bool dcgmProfPause(int duration_s) {
+    duration_s_ = duration_s;
+    dcgm_prof_enabled_ = false;
+    dcgmProfPauseCalls_++;
+    return true;
+  }
+
+  bool dcgmProfResume() {
+    dcgm_prof_enabled_ = true;
+    dcgmProfResumeCalls_++;
+    return true;
+  }
+
   int status_ = 0;
   int statusCalls_ = 0;
   GpuProfilerResult result;
@@ -47,6 +60,10 @@ struct MockServiceHandler {
   std::set pids_;
   std::string config_;
   int limit_ = -1;
+  int duration_s_ = -1;
+  bool dcgm_prof_enabled_ = true;
+  int dcgmProfPauseCalls_ = 0;
+  int dcgmProfResumeCalls_ = 0;
 };
 
 using TestSimpleJsonServer = SimpleJsonServer;
@@ -202,4 +219,45 @@ TEST_F(SimpleJsonClientTest, SetKinetoOnDemandRequestTest) {
       resp["activityProfilersBusy"], expected_result.activityProfilersBusy);
 }
 
+TEST_F(SimpleJsonClientTest, DcgmTest) {
+  rpc_ready_.notify_one();
+
+  auto client = std::make_unique("::1", port_);
+  ASSERT_TRUE(client->initSuccessful())
+      << "Failed to connect to port " << port_;
+
+  json request_pause{
+      {"fn", "dcgmProfPause"},
+      {"duration_s", 100},
+  };
+
+  auto resp_str = client->invoke_rpc(request_pause.dump());
+  client.reset(); // disconnect
+
+  ASSERT_TRUE(resp_str) << "Null response on dcgmProfPause()";
+  EXPECT_EQ(handler_->dcgmProfPauseCalls_, 1);
+  EXPECT_EQ(handler_->dcgm_prof_enabled_, false);
+  EXPECT_EQ(handler_->duration_s_, 100);
+
+  json resp = json::parse(resp_str.value());
+  EXPECT_EQ(resp["status"], true);
+
+  rpc_ready_.notify_one();
+
+  json request_resume{
+      {"fn", "dcgmProfResume"},
+  };
+  // Create a new connection for new rpc
+  client = std::make_unique("::1", port_);
+  resp_str = client->invoke_rpc(request_resume.dump());
+  client.reset(); // disconnect
+
+  ASSERT_TRUE(resp_str) << "Null response on dcgmProfResume()";
+  EXPECT_EQ(handler_->dcgmProfResumeCalls_, 1);
+
+  resp = json::parse(resp_str.value());
+  EXPECT_EQ(resp["status"], true);
+  EXPECT_EQ(handler_->dcgm_prof_enabled_, true);
+}
+
 } // namespace dynolog