Skip to content

Commit

Permalink
add pause resume for dcgm (#100)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #100

DCGM conflicts with other NV tools and libraries, adding pause and resume functionality for oss dynolog

Reviewed By: bigzachattack

Differential Revision:
D42753238

Privacy Context Container: L1137347

fbshipit-source-id: 48fb22a6e32e06f41a018191c1b0654e1dd03df7
  • Loading branch information
Hao Wang authored and facebook-github-bot committed Jan 31, 2023
1 parent 7d04a00 commit 740b9c8
Show file tree
Hide file tree
Showing 11 changed files with 213 additions and 23 deletions.
45 changes: 45 additions & 0 deletions cli/src/commands/dcgm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.

use std::net::TcpStream;

use anyhow::Result;

#[path = "utils.rs"]
mod utils;

// This module contains the handling logic for dcgm

/// Pause dcgm module profiling
pub fn run_dcgm_pause(client: TcpStream, duration_s: i32) -> Result<()> {
let request_json = format!(
r#"
{{
"fn": "dcgmProfPause",
"duration_s": {}
}}"#,
duration_s
);

utils::send_msg(&client, &request_json).expect("Error sending message to service");

let resp_str = utils::get_resp(&client).expect("Unable to decode output bytes");

println!("response = {}", resp_str);

Ok(())
}

/// Resume dcgm module profiling
pub fn run_dcgm_resume(client: TcpStream) -> Result<()> {
utils::send_msg(&client, r#"{"fn":"dcgmProfResume"}"#)
.expect("Error sending message to service");

let resp_str = utils::get_resp(&client).expect("Unable to decode output bytes");

println!("response = {}", resp_str);

Ok(())
}
1 change: 1 addition & 0 deletions cli/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// handling code. Additionally, explicitly "exporting" all the command modules here allows
// us to avoid having to explicitly list all the command modules in main.rs.

pub mod dcgm;
pub mod gputrace;
pub mod status;
// ... add new command modules here
10 changes: 10 additions & 0 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ enum Command {
#[clap(long, default_value_t = 3)]
process_limit: u32,
},
/// Pause dcgm profiling. This enables running tools like Nsight compute and avoids conflicts.
DcgmPause {
/// Duration to pause dcgm profiling in seconds
#[clap(long, default_value_t = 300)]
duration_s: i32,
},
/// Resume dcgm profiling
DcgmResume,
}

/// Create a socket connection to dynolog
Expand Down Expand Up @@ -116,6 +124,8 @@ fn main() -> Result<()> {
profile_start_iteration_roundup,
process_limit,
),
Command::DcgmPause { duration_s } => dcgm::run_dcgm_pause(dyno_client, duration_s),
Command::DcgmResume => dcgm::run_dcgm_resume(dyno_client),
// ... add new commands here
}
}
24 changes: 12 additions & 12 deletions dynolog/src/Main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,7 @@ auto setup_server(std::shared_ptr<ServiceHandler> handler) {
handler, FLAGS_port);
}

void gpu_monitor_loop() {
LOG(INFO) << "Setting up DCGM (GPU) monitoring.";
std::unique_ptr<gpumon::DcgmGroupInfo> dcgm = gpumon::DcgmGroupInfo::factory(
gpumon::FLAGS_dcgm_fields, FLAGS_dcgm_reporting_interval_s * 1000);

void gpu_monitor_loop(std::shared_ptr<gpumon::DcgmGroupInfo> dcgm) {
auto logger = getLogger(FLAGS_scribe_category);

LOG(INFO) << "Running DCGM loop : interval = "
Expand All @@ -155,12 +151,7 @@ int main(int argc, char** argv) {
LOG(INFO) << "Starting dynolog, version = " DYNOLOG_VERSION
<< ", build git-hash = " DYNOLOG_GIT_REV;

// setup service
auto handler = std::make_shared<ServiceHandler>();

// use simple json RPC server for now
auto server = setup_server(handler);
server->run();
std::shared_ptr<gpumon::DcgmGroupInfo> dcgm;

std::unique_ptr<tracing::IPCMonitor> ipcmon;
std::unique_ptr<std::thread> ipcmon_thread, gpumon_thread, pm_thread;
Expand All @@ -173,13 +164,22 @@ int main(int argc, char** argv) {
}

if (FLAGS_enable_gpu_monitor) {
gpumon_thread = std::make_unique<std::thread>(gpu_monitor_loop);
dcgm = gpumon::DcgmGroupInfo::factory(
gpumon::FLAGS_dcgm_fields, FLAGS_dcgm_reporting_interval_s * 1000);
gpumon_thread = std::make_unique<std::thread>(gpu_monitor_loop, dcgm);
}
std::thread km_thread{kernel_monitor_loop};
if (FLAGS_enable_perf_monitor) {
pm_thread = std::make_unique<std::thread>(perf_monitor_loop);
}

// setup service
auto handler = std::make_shared<ServiceHandler>(dcgm);

// use simple json RPC server for now
auto server = setup_server(handler);
server->run();

km_thread.join();
if (pm_thread) {
pm_thread->join();
Expand Down
2 changes: 1 addition & 1 deletion dynolog/src/ScubaLogger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

namespace dynolog {
constexpr int HOSTNAME_MAX = 50;
constexpr char kScubaUrl[] = "https://graph.facebook.com/scribe_logs";
constexpr char kScubaUrl[] = "http://graph.facebook.com/v2.2/scribe_logs";
DEFINE_string(
scribe_category,
"perfpipe_fair_cluster_gpu_stats",
Expand Down
14 changes: 14 additions & 0 deletions dynolog/src/ServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,18 @@ GpuProfilerResult ServiceHandler::setKinetOnDemandRequest(
limit);
}

bool ServiceHandler::dcgmProfPause(int duration_s) {
if (dcgm_) {
return dcgm_->pauseProfiling(duration_s);
}
return false;
}

bool ServiceHandler::dcgmProfResume() {
if (dcgm_) {
return dcgm_->resumeProfiling();
}
return false;
}

} // namespace dynolog
8 changes: 8 additions & 0 deletions dynolog/src/ServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <set>
#include <string>
#include "dynolog/src/LibkinetoConfigManager.h"
#include "dynolog/src/gpumon/DcgmGroupInfo.h"

namespace dynolog {

Expand All @@ -17,6 +18,8 @@ namespace dynolog {

class ServiceHandler {
public:
explicit ServiceHandler(std::shared_ptr<gpumon::DcgmGroupInfo> dcgm)
: dcgm_(dcgm) {}
// returns the state of the service
int getStatus();

Expand All @@ -26,6 +29,11 @@ class ServiceHandler {
const std::string& config,
int limit);
// ... more to come
bool dcgmProfPause(int duration_s);
bool dcgmProfResume();

private:
std::shared_ptr<gpumon::DcgmGroupInfo> dcgm_;
};

} // namespace dynolog
49 changes: 44 additions & 5 deletions dynolog/src/gpumon/DcgmGroupInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ static inline bool isProfField(unsigned short field_id) {
return field_id >= DCGM_FI_PROF_GR_ENGINE_ACTIVE;
}

std::unique_ptr<DcgmGroupInfo> DcgmGroupInfo::factory(
std::shared_ptr<DcgmGroupInfo> DcgmGroupInfo::factory(
const std::string& fields_str,
int updateIntervalMs) {
std::stringstream field_ss(fields_str);
Expand Down Expand Up @@ -124,7 +124,7 @@ std::unique_ptr<DcgmGroupInfo> DcgmGroupInfo::factory(
}
LOG(INFO) << ss.str();

auto dcgmGroupInfo = std::unique_ptr<DcgmGroupInfo>(
auto dcgmGroupInfo = std::shared_ptr<DcgmGroupInfo>(
new DcgmGroupInfo(fields, prof_fields, updateIntervalMs));
if (dcgmGroupInfo->isFailing()) {
return nullptr;
Expand Down Expand Up @@ -265,9 +265,9 @@ void DcgmGroupInfo::watchProfFields(
retCode_ != DCGM_ST_OK) {
errorCode_ = retCode_;
LOG(ERROR) << "Failed dcgmProfWatchFields() return: " << retCode_;
prof_enabled_ = false;
profEnabled_ = false;
} else {
prof_enabled_ = true;
profEnabled_ = true;
}
}

Expand Down Expand Up @@ -306,7 +306,7 @@ void DcgmGroupInfo::update() {
LOG(ERROR) << "Field id not supported, got: " << v.fieldId;
} else {
// skip prof field reporting if profiling is disabled
if (!prof_enabled_ && isProfField(v.fieldId)) {
if (profEnabled_ && isProfField(v.fieldId)) {
continue;
}

Expand Down Expand Up @@ -338,6 +338,17 @@ void DcgmGroupInfo::update() {
}
}
}

// if profiling disabled, check countdown timer to see if we should
// re-enable dcgm profiling
if (!profEnabled_) {
if (profPauseTimer_.count() <= 0) {
resumeProfiling();
} else {
std::lock_guard<std::mutex> wguard(profLock_);
profPauseTimer_ -= std::chrono::seconds(updateIntervalMs_ / 1000);
}
}
}

void DcgmGroupInfo::log(Logger& logger) {
Expand All @@ -362,6 +373,34 @@ void DcgmGroupInfo::log(Logger& logger) {
}
}

bool DcgmGroupInfo::pauseProfiling(int duration) {
std::lock_guard<std::mutex> wguard(profLock_);
LOG(INFO) << "Pausing dcgm profiling";
profPauseTimer_ = std::chrono::seconds(duration);
profEnabled_ = false;

if (retCode_ = dcgmProfPause_stub(dcgmHandle_); retCode_ != DCGM_ST_OK) {
errorCode_ = retCode_;
LOG(ERROR) << "Failed dcgmProfPause() return: " << retCode_;
return false;
}

return true;
}

bool DcgmGroupInfo::resumeProfiling() {
std::lock_guard<std::mutex> wguard(profLock_);
LOG(INFO) << "Resuming dcgm profiling";
profEnabled_ = true;
if (retCode_ = dcgmProfResume_stub(dcgmHandle_); retCode_ != DCGM_ST_OK) {
errorCode_ = retCode_;
LOG(ERROR) << "Failed dcgmProfResume() return: " << retCode_;
return false;
}

return true;
}

DcgmGroupInfo::~DcgmGroupInfo() {
dcgmProfUnwatchFields_t unwatchFields;
memset(&unwatchFields, 0, sizeof(unwatchFields));
Expand Down
14 changes: 9 additions & 5 deletions dynolog/src/gpumon/DcgmGroupInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
#include <string.h>
#include <cstdint>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <variant>
#include <vector>
#include "dynolog/src/Logger.h"
#include "dynolog/src/gpumon/dcgm_structs.h"
Expand All @@ -23,17 +23,19 @@ constexpr char kDcgmDefaultFieldIds[] =
class DcgmGroupInfo {
public:
~DcgmGroupInfo();
static std::unique_ptr<DcgmGroupInfo> factory(
static std::shared_ptr<DcgmGroupInfo> factory(
const std::string& fields_str,
int updateIntervalMs);
void update();
dcgmStatus_t getDcgmStatus() {
dcgmStatus_t getDcgmStatus() const {
return errorCode_;
}
bool isFailing() {
bool isFailing() const {
return errorCode_ != DCGM_ST_OK;
}
void log(Logger& logger);
bool pauseProfiling(int duration_s);
bool resumeProfiling();

private:
DcgmGroupInfo(
Expand All @@ -48,7 +50,7 @@ class DcgmGroupInfo {

std::vector<unsigned int> gpuIdList_;
int deviceCount_ = 0;
bool prof_enabled_ = false;
bool profEnabled_ = false;
int updateIntervalMs_;
dcgmReturn_t errorCode_{DCGM_ST_OK};
dcgmReturn_t retCode_{DCGM_ST_OK};
Expand All @@ -64,6 +66,8 @@ class DcgmGroupInfo {
std::unordered_map<int, std::unordered_map<std::string, std::string>>
envMetadataMapString_;
std::vector<dcgmFieldGrp_t> fieldGroupIds_;
std::mutex profLock_;
std::chrono::seconds profPauseTimer_;
};

} // namespace gpumon
Expand Down
11 changes: 11 additions & 0 deletions dynolog/src/rpc/SimpleJsonServerInl.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ std::string SimpleJsonServer<TServiceHandler>::processOneImpl(
fmt::format("failed with exception = {}", ex.what());
}
}
} else if (request["fn"] == "dcgmProfPause") {
if (!request.contains("duration_s")) {
response["status"] = "failed";
} else {
int duration_s = request.value("duration_s", 300);
bool result = handler_->dcgmProfPause(duration_s);
response["status"] = result;
}
} else if (request["fn"] == "dcgmProfResume") {
bool result = handler_->dcgmProfResume();
response["status"] = result;
} else {
LOG(ERROR) << "Unknown RPC call = " << request["fn"];
return "";
Expand Down
Loading

0 comments on commit 740b9c8

Please sign in to comment.