diff --git a/.azure-pipelines/ut.yml b/.azure-pipelines/ut.yml index 5e950606e..aaabc716e 100644 --- a/.azure-pipelines/ut.yml +++ b/.azure-pipelines/ut.yml @@ -241,7 +241,7 @@ jobs: cd /root/mscclpp; \ rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output; \ export PATH=/usr/local/mpi/bin:\$PATH; \ - export NPKIT_DUMP_DIR=./npkit_dump; \ + export MSCCLPP_NPKIT_DUMP_DIR=./npkit_dump; \ export LD_LIBRARY_PATH=/root/mscclpp/build:\$LD_LIBRARY_PATH; \ mpirun --allow-run-as-root -tag-output -np 2 ./build/test/mp_unit_tests --gtest_filter=\"ExecutorTest.TwoNodesAllreduce\"; \ python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output; \ @@ -270,7 +270,7 @@ jobs: cd /root/mscclpp; \ rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output; \ export PATH=/usr/local/mpi/bin:\$PATH; \ - export NPKIT_DUMP_DIR=./npkit_dump; \ + export MSCCLPP_NPKIT_DUMP_DIR=./npkit_dump; \ export LD_LIBRARY_PATH=/root/mscclpp/build:\$LD_LIBRARY_PATH; \ mpirun --allow-run-as-root -tag-output -x MSCCLPP_HOME=/root/mscclpp -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce.json'; \ python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output; \ diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu index 734ec6a04..f91d15e69 100644 --- a/apps/nccl/src/nccl.cu +++ b/apps/nccl/src/nccl.cu @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -414,8 +415,8 @@ NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueI if (mscclppComm->bootstrap()->getNranks() == mscclppComm->bootstrap()->getNranksPerNode()) ncclCommInitRankFallbackSingleNode(commPtr, mscclppComm, rank); - if (getenv("MSCCLPP_EXECUTION_PLAN_DIR")) { - std::string collectiveDir = getenv("MSCCLPP_EXECUTION_PLAN_DIR"); + const std::string& collectiveDir = mscclpp::env()->executionPlanDir; + if (collectiveDir != "") { if (!std::filesystem::is_directory(collectiveDir)) { WARN("The value of the environment variable %s is not a directory", collectiveDir.c_str()); return ncclInvalidArgument; @@ -430,8 +431,7 @@ NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueI *comm = commPtr; #if defined(ENABLE_NPKIT) - const char* npkitDumpDir = getenv("NPKIT_DUMP_DIR"); - if (npkitDumpDir != nullptr) { + if (mscclpp::env()->npkitDumpDir != "") { NpKit::Init(rank); } #endif @@ -455,8 +455,8 @@ NCCL_API ncclResult_t ncclCommDestroy(ncclComm_t comm) { return ncclInvalidArgument; } #if defined(ENABLE_NPKIT) - const char* npkitDumpDir = getenv("NPKIT_DUMP_DIR"); - if (npkitDumpDir != nullptr) { + const std::string& npkitDumpDir = mscclpp::env()->npkitDumpDir; + if (npkitDumpDir != "") { NpKit::Dump(npkitDumpDir); NpKit::Shutdown(); } diff --git a/include/mscclpp/env.hpp b/include/mscclpp/env.hpp new file mode 100644 index 000000000..6708628bd --- /dev/null +++ b/include/mscclpp/env.hpp @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +#ifndef MSCCLPP_ENV_HPP_ +#define MSCCLPP_ENV_HPP_ + +#include +#include + +namespace mscclpp { + +class Env; + +/// Get the MSCCL++ environment. +/// @return A reference to the global environment object. +std::shared_ptr env(); + +/// The MSCCL++ environment. The constructor reads environment variables and sets the corresponding fields. +/// Use the @ref env() function to get the environment object. +class Env { + public: + const std::string debug; + const std::string debugSubsys; + const std::string debugFile; + const std::string hcaDevices; + const std::string hostid; + const std::string socketFamily; + const std::string socketIfname; + const std::string commId; + const std::string executionPlanDir; + const std::string npkitDumpDir; + const bool cudaIpcUseDefaultStream; + + private: + Env(); + + friend std::shared_ptr env(); +}; + +} // namespace mscclpp + +#endif // MSCCLPP_ENV_HPP_ diff --git a/include/mscclpp/gpu_utils.hpp b/include/mscclpp/gpu_utils.hpp index 2392f7484..210999212 100644 --- a/include/mscclpp/gpu_utils.hpp +++ b/include/mscclpp/gpu_utils.hpp @@ -46,8 +46,11 @@ struct AvoidCudaGraphCaptureGuard { /// A RAII wrapper around cudaStream_t that will call cudaStreamDestroy on destruction. struct CudaStreamWithFlags { + CudaStreamWithFlags() : stream_(nullptr) {} CudaStreamWithFlags(unsigned int flags); ~CudaStreamWithFlags(); + void set(unsigned int flags); + bool empty() const; operator cudaStream_t() const { return stream_; } cudaStream_t stream_; }; diff --git a/pyproject.toml b/pyproject.toml index d5e6d3230..59597fbe0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,6 +11,7 @@ version = "0.6.0" [tool.scikit-build] cmake.version = ">=3.25.0" +cmake.build-type = "Release" build-dir = "build/{wheel_tag}" wheel.packages = ["python/mscclpp", "python/mscclpp_benchmark"] wheel.install-dir = "mscclpp" diff --git a/python/mscclpp/__init__.py b/python/mscclpp/__init__.py index c66624cab..678379ac2 100644 --- a/python/mscclpp/__init__.py +++ b/python/mscclpp/__init__.py @@ -4,6 +4,7 @@ import os as _os from ._mscclpp import ( + Env, ErrorCode, BaseError, Error, @@ -32,6 +33,7 @@ ExecutionPlan, PacketType, RawGpuBuffer, + env, version, is_nvls_supported, npkit, diff --git a/python/mscclpp/core_py.cpp b/python/mscclpp/core_py.cpp index 95048f18b..90ee22860 100644 --- a/python/mscclpp/core_py.cpp +++ b/python/mscclpp/core_py.cpp @@ -13,6 +13,7 @@ namespace nb = nanobind; using namespace mscclpp; +extern void register_env(nb::module_& m); extern void register_error(nb::module_& m); extern void register_proxy_channel(nb::module_& m); extern void register_sm_channel(nb::module_& m); @@ -184,6 +185,7 @@ void register_core(nb::module_& m) { } NB_MODULE(_mscclpp, m) { + register_env(m); register_error(m); register_proxy_channel(m); register_sm_channel(m); diff --git a/python/mscclpp/env_py.cpp b/python/mscclpp/env_py.cpp new file mode 100644 index 000000000..a0ba4a4e8 --- /dev/null +++ b/python/mscclpp/env_py.cpp @@ -0,0 +1,28 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +#include +#include +#include + +#include + +namespace nb = nanobind; +using namespace mscclpp; + +void register_env(nb::module_& m) { + nb::class_(m, "Env") + .def_ro("debug", &Env::debug) + .def_ro("debug_subsys", &Env::debugSubsys) + .def_ro("debug_file", &Env::debugFile) + .def_ro("hca_devices", &Env::hcaDevices) + .def_ro("hostid", &Env::hostid) + .def_ro("socket_family", &Env::socketFamily) + .def_ro("socket_ifname", &Env::socketIfname) + .def_ro("comm_id", &Env::commId) + .def_ro("execution_plan_dir", &Env::executionPlanDir) + .def_ro("npkit_dump_dir", &Env::npkitDumpDir) + .def_ro("cuda_ipc_use_default_stream", &Env::cudaIpcUseDefaultStream); + + m.def("env", &env); +} diff --git a/python/test/executor_test.py b/python/test/executor_test.py index c973ae0e9..8553e4651 100644 --- a/python/test/executor_test.py +++ b/python/test/executor_test.py @@ -8,6 +8,7 @@ ExecutionPlan, PacketType, npkit, + env, ) import mscclpp.comm as mscclpp_comm from mscclpp.utils import KernelBuilder, GpuBuffer, pack @@ -171,8 +172,8 @@ def main( mscclpp_group = mscclpp_comm.CommGroup(MPI.COMM_WORLD) cp.cuda.Device(mscclpp_group.my_rank % mscclpp_group.nranks_per_node).use() executor = Executor(mscclpp_group.communicator) - npkit_dump_dir = os.getenv("NPKIT_DUMP_DIR") - if npkit_dump_dir is not None: + npkit_dump_dir = env().npkit_dump_dir + if npkit_dump_dir != "": npkit.init(mscclpp_group.my_rank) execution_plan = ExecutionPlan(execution_plan_path) collective = execution_plan.collective() diff --git a/python/test/test_mscclpp.py b/python/test/test_mscclpp.py index 691a169a9..1a5f99c42 100644 --- a/python/test/test_mscclpp.py +++ b/python/test/test_mscclpp.py @@ -27,6 +27,7 @@ Transport, is_nvls_supported, npkit, + env, ) import mscclpp.comm as mscclpp_comm from mscclpp.utils import KernelBuilder, GpuBuffer, pack @@ -36,6 +37,19 @@ ethernet_interface_name = "eth0" +@parametrize_mpi_groups(1) +def test_env(mpi_group: MpiGroup): + e = env() + assert isinstance(e.debug, str) + with pytest.raises(AttributeError): + # all attributes should be read-only + e.debug = "INFO" + + # should be the same object + e2 = env() + assert e == e2 + + def all_ranks_on_the_same_node(mpi_group: MpiGroup): if (ethernet_interface_name in ni.interfaces()) is False: pytest.skip(f"{ethernet_interface_name} is not an interface to use on this node") @@ -624,8 +638,8 @@ def test_executor(mpi_group: MpiGroup, filename: str): project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) mscclpp_group = mscclpp_comm.CommGroup(mpi_group.comm) executor = Executor(mscclpp_group.communicator) - npkit_dump_dir = os.getenv("NPKIT_DUMP_DIR") - if npkit_dump_dir is not None: + npkit_dump_dir = env().npkit_dump_dir + if npkit_dump_dir != "": npkit.init(mscclpp_group.my_rank) execution_plan = ExecutionPlan(os.path.join(project_dir, "test", "execution-files", filename)) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 45b4075d2..5d5275403 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,6 +1,6 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. -file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS *.cc *.cu) +file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS *.cc *.cpp *.cu) target_sources(mscclpp_obj PRIVATE ${SOURCES}) target_include_directories(mscclpp_obj PRIVATE include) diff --git a/src/bootstrap/socket.cc b/src/bootstrap/socket.cc index b9216e783..8c012c281 100644 --- a/src/bootstrap/socket.cc +++ b/src/bootstrap/socket.cc @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -64,14 +65,12 @@ static uint16_t socketToPort(union SocketAddress* addr) { /* Allow the user to force the IPv4/IPv6 interface selection */ static int envSocketFamily(void) { int family = -1; // Family selection is not forced, will use first one found - char* env = getenv("MSCCLPP_SOCKET_FAMILY"); - if (env == NULL) return family; + const std::string& socketFamily = env()->socketFamily; + if (socketFamily == "") return family; - INFO(MSCCLPP_ENV, "MSCCLPP_SOCKET_FAMILY set by environment to %s", env); - - if (strcmp(env, "AF_INET") == 0) + if (socketFamily == "AF_INET") family = AF_INET; // IPv4 - else if (strcmp(env, "AF_INET6") == 0) + else if (socketFamily == "AF_INET6") family = AF_INET6; // IPv6 return family; } @@ -306,27 +305,25 @@ int FindInterfaces(char* ifNames, union SocketAddress* ifAddrs, int ifNameMaxSiz // Allow user to force the INET socket family selection int sock_family = envSocketFamily(); // User specified interface - char* env = getenv("MSCCLPP_SOCKET_IFNAME"); + const std::string& socketIfname = env()->socketIfname; if (inputIfName) { INFO(MSCCLPP_NET, "using iterface %s", inputIfName); nIfs = findInterfaces(inputIfName, ifNames, ifAddrs, sock_family, ifNameMaxSize, maxIfs); - } else if (env && strlen(env) > 1) { - INFO(MSCCLPP_ENV, "MSCCLPP_SOCKET_IFNAME set by environment to %s", env); + } else if (socketIfname != "") { // Specified by user : find or fail - if (shownIfName++ == 0) INFO(MSCCLPP_NET, "MSCCLPP_SOCKET_IFNAME set to %s", env); - nIfs = findInterfaces(env, ifNames, ifAddrs, sock_family, ifNameMaxSize, maxIfs); + if (shownIfName++ == 0) INFO(MSCCLPP_NET, "MSCCLPP_SOCKET_IFNAME set to %s", socketIfname.c_str()); + nIfs = findInterfaces(socketIfname.c_str(), ifNames, ifAddrs, sock_family, ifNameMaxSize, maxIfs); } else { // Try to automatically pick the right one // Start with IB nIfs = findInterfaces("ib", ifNames, ifAddrs, sock_family, ifNameMaxSize, maxIfs); // else see if we can get some hint from COMM ID if (nIfs == 0) { - char* commId = getenv("MSCCLPP_COMM_ID"); - if (commId && strlen(commId) > 1) { - INFO(MSCCLPP_ENV, "MSCCLPP_COMM_ID set by environment to %s", commId); + const std::string& commId = env()->commId; + if (commId != "") { // Try to find interface that is in the same subnet as the IP in comm id union SocketAddress idAddr; - SocketGetAddrFromString(&idAddr, commId); + SocketGetAddrFromString(&idAddr, commId.c_str()); nIfs = FindInterfaceMatchSubnet(ifNames, ifAddrs, &idAddr, ifNameMaxSize, maxIfs); } } diff --git a/src/connection.cc b/src/connection.cc index fa37c405c..1e8d972e0 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -7,6 +7,7 @@ #include #endif +#include #include #include #include @@ -40,7 +41,8 @@ int Connection::getMaxWriteQueueSize() { return maxWriteQueueSize; } // CudaIpcConnection -CudaIpcConnection::CudaIpcConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, cudaStream_t stream) +CudaIpcConnection::CudaIpcConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, + std::shared_ptr stream) : Connection(localEndpoint.maxWriteQueueSize()), stream_(stream) { if (localEndpoint.transport() != Transport::CudaIpc) { throw mscclpp::Error("Cuda IPC connection can only be made from a Cuda IPC endpoint", ErrorCode::InvalidUsage); @@ -74,7 +76,9 @@ void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, Register char* dstPtr = (char*)dst.data(); char* srcPtr = (char*)src.data(); - MSCCLPP_CUDATHROW(cudaMemcpyAsync(dstPtr + dstOffset, srcPtr + srcOffset, size, cudaMemcpyDeviceToDevice, stream_)); + if (!env()->cudaIpcUseDefaultStream && stream_->empty()) stream_->set(cudaStreamNonBlocking); + + MSCCLPP_CUDATHROW(cudaMemcpyAsync(dstPtr + dstOffset, srcPtr + srcOffset, size, cudaMemcpyDeviceToDevice, *stream_)); INFO(MSCCLPP_P2P, "CudaIpcConnection write: from %p to %p, size %lu", srcPtr + srcOffset, dstPtr + dstOffset, size); #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_WRITE_EXIT) @@ -92,7 +96,9 @@ void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, *src = newValue; uint64_t* dstPtr = reinterpret_cast(reinterpret_cast(dst.data()) + dstOffset); - MSCCLPP_CUDATHROW(cudaMemcpyAsync(dstPtr, src, sizeof(uint64_t), cudaMemcpyHostToDevice, stream_)); + if (!env()->cudaIpcUseDefaultStream && stream_->empty()) stream_->set(cudaStreamNonBlocking); + + MSCCLPP_CUDATHROW(cudaMemcpyAsync(dstPtr, src, sizeof(uint64_t), cudaMemcpyHostToDevice, *stream_)); INFO(MSCCLPP_P2P, "CudaIpcConnection atomic write: from %p to %p, %lu -> %lu", src, dstPtr + dstOffset, oldValue, newValue); @@ -109,8 +115,11 @@ void CudaIpcConnection::flush(int64_t timeoutUsec) { if (timeoutUsec >= 0) { INFO(MSCCLPP_P2P, "CudaIpcConnection flush: timeout is not supported, ignored"); } + + if (!env()->cudaIpcUseDefaultStream && stream_->empty()) stream_->set(cudaStreamNonBlocking); + AvoidCudaGraphCaptureGuard guard; - MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream_)); + MSCCLPP_CUDATHROW(cudaStreamSynchronize(*stream_)); INFO(MSCCLPP_P2P, "CudaIpcConnection flushing connection"); #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_EXIT) diff --git a/src/context.cc b/src/context.cc index 12b52b78a..25c17c6da 100644 --- a/src/context.cc +++ b/src/context.cc @@ -11,7 +11,7 @@ namespace mscclpp { -Context::Impl::Impl() {} +Context::Impl::Impl() : ipcStream_(std::make_shared()) {} IbCtx* Context::Impl::getIbContext(Transport ibTransport) { // Find IB context or create it @@ -43,10 +43,7 @@ MSCCLPP_API_CPP std::shared_ptr Context::connect(Endpoint localEndpo if (remoteEndpoint.transport() != Transport::CudaIpc) { throw mscclpp::Error("Local transport is CudaIpc but remote is not", ErrorCode::InvalidUsage); } - if (!(pimpl_->ipcStream_)) { - pimpl_->ipcStream_ = std::make_shared(cudaStreamNonBlocking); - } - conn = std::make_shared(localEndpoint, remoteEndpoint, cudaStream_t(*(pimpl_->ipcStream_))); + conn = std::make_shared(localEndpoint, remoteEndpoint, pimpl_->ipcStream_); } else if (AllIBTransports.has(localEndpoint.transport())) { if (!AllIBTransports.has(remoteEndpoint.transport())) { throw mscclpp::Error("Local transport is IB but remote is not", ErrorCode::InvalidUsage); diff --git a/src/cuda_utils.cc b/src/cuda_utils.cc deleted file mode 100644 index 2e0d4a1b0..000000000 --- a/src/cuda_utils.cc +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -#include - -namespace mscclpp { - -AvoidCudaGraphCaptureGuard::AvoidCudaGraphCaptureGuard() : mode_(cudaStreamCaptureModeRelaxed) { - MSCCLPP_CUDATHROW(cudaThreadExchangeStreamCaptureMode(&mode_)); -} - -AvoidCudaGraphCaptureGuard::~AvoidCudaGraphCaptureGuard() { (void)cudaThreadExchangeStreamCaptureMode(&mode_); } - -CudaStreamWithFlags::CudaStreamWithFlags(unsigned int flags) { - MSCCLPP_CUDATHROW(cudaStreamCreateWithFlags(&stream_, flags)); -} -CudaStreamWithFlags::~CudaStreamWithFlags() { (void)cudaStreamDestroy(stream_); } - -} // namespace mscclpp diff --git a/src/debug.cc b/src/debug.cc index dea9ee713..a8350fbab 100644 --- a/src/debug.cc +++ b/src/debug.cc @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -34,7 +35,7 @@ void mscclppDebugInit() { pthread_mutex_unlock(&mscclppDebugLock); return; } - const char* mscclpp_debug = getenv("MSCCLPP_DEBUG"); + const char* mscclpp_debug = mscclpp::env()->debug.c_str(); int tempNcclDebugLevel = -1; if (mscclpp_debug == NULL) { tempNcclDebugLevel = MSCCLPP_LOG_NONE; @@ -54,8 +55,9 @@ void mscclppDebugInit() { * This can be a comma separated list such as INIT,COLL * or ^INIT,COLL etc */ - char* mscclppDebugSubsysEnv = getenv("MSCCLPP_DEBUG_SUBSYS"); - if (mscclppDebugSubsysEnv != NULL) { + std::string mscclppDebugSubsysStr = mscclpp::env()->debugSubsys; + const char* mscclppDebugSubsysEnv = mscclppDebugSubsysStr.c_str(); + if (mscclppDebugSubsysStr != "") { int invert = 0; if (mscclppDebugSubsysEnv[0] == '^') { invert = 1; @@ -108,7 +110,7 @@ void mscclppDebugInit() { * then create the debug file. But don't bother unless the * MSCCLPP_DEBUG level is > VERSION */ - const char* mscclppDebugFileEnv = getenv("MSCCLPP_DEBUG_FILE"); + const char* mscclppDebugFileEnv = mscclpp::env()->debugFile.c_str(); if (tempNcclDebugLevel > MSCCLPP_LOG_VERSION && mscclppDebugFileEnv != NULL) { int c = 0; char debugFn[PATH_MAX + 1] = ""; diff --git a/src/env.cpp b/src/env.cpp new file mode 100644 index 000000000..625de0ad3 --- /dev/null +++ b/src/env.cpp @@ -0,0 +1,87 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +#include +#include + +// clang-format off +#include +#include +// clang-format on + +#include "debug.h" + +template +T readEnv(const std::string &envName, const T &defaultValue) { + const char *envCstr = getenv(envName.c_str()); + if (envCstr == nullptr) return defaultValue; + if constexpr (std::is_same_v) { + return atoi(envCstr); + } else if constexpr (std::is_same_v) { + return (std::string(envCstr) != "0"); + } + return T(envCstr); +} + +template +void readAndSetEnv(const std::string &envName, T &env) { + const char *envCstr = getenv(envName.c_str()); + if (envCstr == nullptr) return; + if constexpr (std::is_same_v) { + env = atoi(envCstr); + } else if constexpr (std::is_same_v) { + env = (std::string(envCstr) != "0"); + } else { + env = std::string(envCstr); + } +} + +template +void logEnv(const std::string &envName, const T &env) { + if (!getenv(envName.c_str())) return; + INFO(MSCCLPP_ENV, "%s=%d", envName.c_str(), env); +} + +template <> +void logEnv(const std::string &envName, const std::string &env) { + if (!getenv(envName.c_str())) return; + INFO(MSCCLPP_ENV, "%s=%s", envName.c_str(), env.c_str()); +} + +namespace mscclpp { + +Env::Env() + : debug(readEnv("MSCCLPP_DEBUG", "")), + debugSubsys(readEnv("MSCCLPP_DEBUG_SUBSYS", "")), + debugFile(readEnv("MSCCLPP_DEBUG_FILE", "")), + hcaDevices(readEnv("MSCCLPP_HCA_DEVICES", "")), + hostid(readEnv("MSCCLPP_HOSTID", "")), + socketFamily(readEnv("MSCCLPP_SOCKET_FAMILY", "")), + socketIfname(readEnv("MSCCLPP_SOCKET_IFNAME", "")), + commId(readEnv("MSCCLPP_COMM_ID", "")), + executionPlanDir(readEnv("MSCCLPP_EXECUTION_PLAN_DIR", "")), + npkitDumpDir(readEnv("MSCCLPP_NPKIT_DUMP_DIR", "")), + cudaIpcUseDefaultStream(readEnv("MSCCLPP_CUDAIPC_USE_DEFAULT_STREAM", false)) {} + +std::shared_ptr env() { + static std::shared_ptr globalEnv = std::shared_ptr(new Env()); + static bool logged = false; + if (!logged) { + logged = true; + // cannot log inside the constructor because of circular dependency + logEnv("MSCCLPP_DEBUG", globalEnv->debug); + logEnv("MSCCLPP_DEBUG_SUBSYS", globalEnv->debugSubsys); + logEnv("MSCCLPP_DEBUG_FILE", globalEnv->debugFile); + logEnv("MSCCLPP_HCA_DEVICES", globalEnv->hcaDevices); + logEnv("MSCCLPP_HOSTID", globalEnv->hostid); + logEnv("MSCCLPP_SOCKET_FAMILY", globalEnv->socketFamily); + logEnv("MSCCLPP_SOCKET_IFNAME", globalEnv->socketIfname); + logEnv("MSCCLPP_COMM_ID", globalEnv->commId); + logEnv("MSCCLPP_EXECUTION_PLAN_DIR", globalEnv->executionPlanDir); + logEnv("MSCCLPP_NPKIT_DUMP_DIR", globalEnv->npkitDumpDir); + logEnv("MSCCLPP_CUDAIPC_USE_DEFAULT_STREAM", globalEnv->cudaIpcUseDefaultStream); + } + return globalEnv; +} + +} // namespace mscclpp diff --git a/src/gpu_utils.cc b/src/gpu_utils.cc index c70cdcfa1..aac7a611f 100644 --- a/src/gpu_utils.cc +++ b/src/gpu_utils.cc @@ -7,6 +7,27 @@ namespace mscclpp { +AvoidCudaGraphCaptureGuard::AvoidCudaGraphCaptureGuard() : mode_(cudaStreamCaptureModeRelaxed) { + MSCCLPP_CUDATHROW(cudaThreadExchangeStreamCaptureMode(&mode_)); +} + +AvoidCudaGraphCaptureGuard::~AvoidCudaGraphCaptureGuard() { (void)cudaThreadExchangeStreamCaptureMode(&mode_); } + +CudaStreamWithFlags::CudaStreamWithFlags(unsigned int flags) { + MSCCLPP_CUDATHROW(cudaStreamCreateWithFlags(&stream_, flags)); +} + +CudaStreamWithFlags::~CudaStreamWithFlags() { + if (!empty()) (void)cudaStreamDestroy(stream_); +} + +void CudaStreamWithFlags::set(unsigned int flags) { + if (!empty()) throw Error("CudaStreamWithFlags already set", ErrorCode::InternalError); + MSCCLPP_CUDATHROW(cudaStreamCreateWithFlags(&stream_, flags)); +} + +bool CudaStreamWithFlags::empty() const { return stream_ == nullptr; } + namespace detail { /// set memory access permission to read-write diff --git a/src/ib.cc b/src/ib.cc index 107abc77d..542527e3f 100644 --- a/src/ib.cc +++ b/src/ib.cc @@ -9,6 +9,8 @@ #include #include #include +#include +#include #include #include #include @@ -388,18 +390,16 @@ MSCCLPP_API_CPP int getIBDeviceCount() { } std::string getHcaDevices(int deviceIndex) { - const char* envValue = std::getenv("MSCCLPP_HCA_DEVICES"); - if (envValue) { + std::string envStr = env()->hcaDevices; + if (envStr != "") { std::vector devices; - std::string envStr(envValue); std::stringstream ss(envStr); std::string device; while (std::getline(ss, device, ',')) { devices.push_back(device); } if (deviceIndex >= (int)devices.size()) { - throw std::invalid_argument("Not enough HCA devices are defined with MSCCLPP_HCA_DEVICES: " + - std::string(envValue)); + throw Error("Not enough HCA devices are defined with MSCCLPP_HCA_DEVICES: " + envStr, ErrorCode::InvalidUsage); } return devices[deviceIndex]; } @@ -434,7 +434,7 @@ MSCCLPP_API_CPP std::string getIBDeviceName(Transport ibTransport) { ibTransportIndex = 7; break; default: - throw std::invalid_argument("Not an IB transport"); + throw Error("Not an IB transport", ErrorCode::InvalidUsage); } std::string userHcaDevice = getHcaDevices(ibTransportIndex); if (!userHcaDevice.empty()) { @@ -446,7 +446,7 @@ MSCCLPP_API_CPP std::string getIBDeviceName(Transport ibTransport) { if (ibTransportIndex >= num) { std::stringstream ss; ss << "IB transport out of range: " << ibTransportIndex << " >= " << num; - throw std::out_of_range(ss.str()); + throw Error(ss.str(), ErrorCode::InvalidUsage); } return devices[ibTransportIndex]->name; } @@ -474,11 +474,11 @@ MSCCLPP_API_CPP Transport getIBTransportByDeviceName(const std::string& ibDevice case 7: return Transport::IB7; default: - throw std::out_of_range("IB device index out of range"); + throw Error("IB device index out of range", ErrorCode::InvalidUsage); } } } - throw std::invalid_argument("IB device not found"); + throw Error("IB device not found", ErrorCode::InvalidUsage); } #else // !defined(USE_IBVERBS) diff --git a/src/include/connection.hpp b/src/include/connection.hpp index 283bb8d07..258f9c0f3 100644 --- a/src/include/connection.hpp +++ b/src/include/connection.hpp @@ -5,7 +5,7 @@ #define MSCCLPP_CONNECTION_HPP_ #include -#include +#include #include "communicator.hpp" #include "context.hpp" @@ -16,10 +16,10 @@ namespace mscclpp { class CudaIpcConnection : public Connection { - cudaStream_t stream_; + std::shared_ptr stream_; public: - CudaIpcConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, cudaStream_t stream); + CudaIpcConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, std::shared_ptr stream); Transport transport() override; diff --git a/src/utils_internal.cc b/src/utils_internal.cc index b3f3c28e3..ce0a4cdec 100644 --- a/src/utils_internal.cc +++ b/src/utils_internal.cc @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -77,10 +78,9 @@ uint64_t computeHostHash(void) { std::string hostName = getHostName(hashLen, '\0'); strncpy(hostHash, hostName.c_str(), hostName.size()); - char* hostId; - if ((hostId = getenv("MSCCLPP_HOSTID")) != NULL) { - INFO(MSCCLPP_ENV, "MSCCLPP_HOSTID set by environment to %s", hostId); - strncpy(hostHash, hostId, hashLen); + std::string hostid = env()->hostid; + if (hostid != "") { + strncpy(hostHash, hostid.c_str(), hashLen); } else if (hostName.size() < hashLen) { std::ifstream file(HOSTID_FILE, std::ios::binary); if (file.is_open()) { diff --git a/test/executor_test.cc b/test/executor_test.cc index ff5ad9e36..afab876db 100644 --- a/test/executor_test.cc +++ b/test/executor_test.cc @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -109,7 +110,7 @@ int main(int argc, char* argv[]) { const std::string executionPlanPath = argv[2]; const int niters = std::stoi(argv[3]); const int ngraphIters = std::stoi(argv[4]); - const char* npkitDumpDir = getenv("NPKIT_DUMP_DIR"); + const char* npkitDumpDir = mscclpp::env()->npkitDumpDir.c_str(); mscclpp::PacketType packetType = mscclpp::PacketType::LL16; if (argc == 6) { packetType = parsePacketType(argv[5]); diff --git a/test/mp_unit/executor_tests.cc b/test/mp_unit/executor_tests.cc index d5f87e6d7..7075d1a43 100644 --- a/test/mp_unit/executor_tests.cc +++ b/test/mp_unit/executor_tests.cc @@ -4,6 +4,7 @@ #include #include +#include #include #include "mp_unit_tests.hpp" @@ -31,14 +32,14 @@ void ExecutorTest::SetUp() { bootstrap->initialize(id); std::shared_ptr communicator = std::make_shared(bootstrap); executor = std::make_shared(communicator); - npkitDumpDir = getenv("NPKIT_DUMP_DIR"); - if (npkitDumpDir != nullptr) { + npkitDumpDir = mscclpp::env()->npkitDumpDir; + if (npkitDumpDir != "") { NpKit::Init(gEnv->rank); } } void ExecutorTest::TearDown() { - if (npkitDumpDir != nullptr) { + if (npkitDumpDir != "") { NpKit::Dump(npkitDumpDir); NpKit::Shutdown(); } diff --git a/test/mp_unit/mp_unit_tests.hpp b/test/mp_unit/mp_unit_tests.hpp index e1c9068ef..c00ecb6b6 100644 --- a/test/mp_unit/mp_unit_tests.hpp +++ b/test/mp_unit/mp_unit_tests.hpp @@ -170,6 +170,6 @@ class ExecutorTest : public MultiProcessTest { void TearDown() override; std::shared_ptr executor; - const char* npkitDumpDir; + std::string npkitDumpDir; }; #endif // MSCCLPP_MP_UNIT_TESTS_HPP_ diff --git a/tools/npkit/build_and_run_npkit.sh b/tools/npkit/build_and_run_npkit.sh index c2ccea049..3f1d2a2e8 100644 --- a/tools/npkit/build_and_run_npkit.sh +++ b/tools/npkit/build_and_run_npkit.sh @@ -17,6 +17,6 @@ parallel-ssh -h ${HOSTFILE} "mkdir -p ${NPKIT_RUN_DIR}/npkit_dump" parallel-ssh -h ${HOSTFILE} "mkdir -p ${NPKIT_RUN_DIR}/npkit_trace" # --bind-to numa is required because hardware timer from different cores (or core groups) can be non-synchronized. -mpirun --allow-run-as-root -hostfile ${HOSTFILE} -map-by ppr:8:node --bind-to numa -x LD_PRELOAD=${NPKIT_RUN_DIR}/mscclpp/build/lib/libmscclpp.so -x MSCCLPP_DEBUG=WARN -x NPKIT_DUMP_DIR=${NPKIT_RUN_DIR}/npkit_dump ${NPKIT_RUN_DIR}/mscclpp/build/bin/tests/allgather_test -ip_port ${LEADER_IP_PORT} -kernel 0 +mpirun --allow-run-as-root -hostfile ${HOSTFILE} -map-by ppr:8:node --bind-to numa -x LD_PRELOAD=${NPKIT_RUN_DIR}/mscclpp/build/lib/libmscclpp.so -x MSCCLPP_DEBUG=WARN -x MSCCLPP_NPKIT_DUMP_DIR=${NPKIT_RUN_DIR}/npkit_dump ${NPKIT_RUN_DIR}/mscclpp/build/bin/tests/allgather_test -ip_port ${LEADER_IP_PORT} -kernel 0 parallel-ssh -h ${HOSTFILE} "cd ${NPKIT_RUN_DIR}/mscclpp/tools/npkit && python npkit_trace_generator.py --npkit_dump_dir ${NPKIT_RUN_DIR}/npkit_dump --npkit_event_header_path ${NPKIT_RUN_DIR}/mscclpp/src/include/npkit/npkit_event.h --output_dir ${NPKIT_RUN_DIR}/npkit_trace"