Skip to content

Commit

Permalink
Merge branch 'main' into chhwang/macro
Browse files Browse the repository at this point in the history
  • Loading branch information
chhwang authored Jan 18, 2025
2 parents 59f727c + 4ee15b7 commit ac5f17d
Show file tree
Hide file tree
Showing 25 changed files with 271 additions and 82 deletions.
4 changes: 2 additions & 2 deletions .azure-pipelines/ut.yml
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ jobs:
cd /root/mscclpp; \
rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output; \
export PATH=/usr/local/mpi/bin:\$PATH; \
export NPKIT_DUMP_DIR=./npkit_dump; \
export MSCCLPP_NPKIT_DUMP_DIR=./npkit_dump; \
export LD_LIBRARY_PATH=/root/mscclpp/build:\$LD_LIBRARY_PATH; \
mpirun --allow-run-as-root -tag-output -np 2 ./build/test/mp_unit_tests --gtest_filter=\"ExecutorTest.TwoNodesAllreduce\"; \
python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output; \
Expand Down Expand Up @@ -270,7 +270,7 @@ jobs:
cd /root/mscclpp; \
rm -rf ./npkit_dump && mkdir ./npkit_dump && rm -rf ./npkit_output && mkdir ./npkit_output; \
export PATH=/usr/local/mpi/bin:\$PATH; \
export NPKIT_DUMP_DIR=./npkit_dump; \
export MSCCLPP_NPKIT_DUMP_DIR=./npkit_dump; \
export LD_LIBRARY_PATH=/root/mscclpp/build:\$LD_LIBRARY_PATH; \
mpirun --allow-run-as-root -tag-output -x MSCCLPP_HOME=/root/mscclpp -np 8 python3 -m pytest ./python/test/test_mscclpp.py -x -k 'test_executor[allreduce.json'; \
python3 ./tools/npkit/npkit_trace_generator.py --npkit_dump_dir=./npkit_dump --npkit_event_header_path=./include/mscclpp/npkit/npkit_event.hpp --output_dir=./npkit_output; \
Expand Down
12 changes: 6 additions & 6 deletions apps/nccl/src/nccl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <filesystem>
#include <mscclpp/concurrency_device.hpp>
#include <mscclpp/core.hpp>
#include <mscclpp/env.hpp>
#include <mscclpp/executor.hpp>
#include <mscclpp/sm_channel.hpp>
#include <mscclpp/sm_channel_device.hpp>
Expand Down Expand Up @@ -414,8 +415,8 @@ NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueI
if (mscclppComm->bootstrap()->getNranks() == mscclppComm->bootstrap()->getNranksPerNode())
ncclCommInitRankFallbackSingleNode(commPtr, mscclppComm, rank);

if (getenv("MSCCLPP_EXECUTION_PLAN_DIR")) {
std::string collectiveDir = getenv("MSCCLPP_EXECUTION_PLAN_DIR");
const std::string& collectiveDir = mscclpp::env()->executionPlanDir;
if (collectiveDir != "") {
if (!std::filesystem::is_directory(collectiveDir)) {
WARN("The value of the environment variable %s is not a directory", collectiveDir.c_str());
return ncclInvalidArgument;
Expand All @@ -430,8 +431,7 @@ NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueI

*comm = commPtr;
#if defined(ENABLE_NPKIT)
const char* npkitDumpDir = getenv("NPKIT_DUMP_DIR");
if (npkitDumpDir != nullptr) {
if (mscclpp::env()->npkitDumpDir != "") {
NpKit::Init(rank);
}
#endif
Expand All @@ -455,8 +455,8 @@ NCCL_API ncclResult_t ncclCommDestroy(ncclComm_t comm) {
return ncclInvalidArgument;
}
#if defined(ENABLE_NPKIT)
const char* npkitDumpDir = getenv("NPKIT_DUMP_DIR");
if (npkitDumpDir != nullptr) {
const std::string& npkitDumpDir = mscclpp::env()->npkitDumpDir;
if (npkitDumpDir != "") {
NpKit::Dump(npkitDumpDir);
NpKit::Shutdown();
}
Expand Down
42 changes: 42 additions & 0 deletions include/mscclpp/env.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#ifndef MSCCLPP_ENV_HPP_
#define MSCCLPP_ENV_HPP_

#include <memory>
#include <string>

namespace mscclpp {

class Env;

/// Get the MSCCL++ environment.
/// @return A reference to the global environment object.
std::shared_ptr<Env> env();

/// The MSCCL++ environment. The constructor reads environment variables and sets the corresponding fields.
/// Use the @ref env() function to get the environment object.
class Env {
public:
const std::string debug;
const std::string debugSubsys;
const std::string debugFile;
const std::string hcaDevices;
const std::string hostid;
const std::string socketFamily;
const std::string socketIfname;
const std::string commId;
const std::string executionPlanDir;
const std::string npkitDumpDir;
const bool cudaIpcUseDefaultStream;

private:
Env();

friend std::shared_ptr<Env> env();
};

} // namespace mscclpp

#endif // MSCCLPP_ENV_HPP_
3 changes: 3 additions & 0 deletions include/mscclpp/gpu_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@ struct AvoidCudaGraphCaptureGuard {

/// A RAII wrapper around cudaStream_t that will call cudaStreamDestroy on destruction.
struct CudaStreamWithFlags {
CudaStreamWithFlags() : stream_(nullptr) {}
CudaStreamWithFlags(unsigned int flags);
~CudaStreamWithFlags();
void set(unsigned int flags);
bool empty() const;
operator cudaStream_t() const { return stream_; }
cudaStream_t stream_;
};
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ version = "0.6.0"

[tool.scikit-build]
cmake.version = ">=3.25.0"
cmake.build-type = "Release"
build-dir = "build/{wheel_tag}"
wheel.packages = ["python/mscclpp", "python/mscclpp_benchmark"]
wheel.install-dir = "mscclpp"
Expand Down
2 changes: 2 additions & 0 deletions python/mscclpp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os as _os

from ._mscclpp import (
Env,
ErrorCode,
BaseError,
Error,
Expand Down Expand Up @@ -32,6 +33,7 @@
ExecutionPlan,
PacketType,
RawGpuBuffer,
env,
version,
is_nvls_supported,
npkit,
Expand Down
2 changes: 2 additions & 0 deletions python/mscclpp/core_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
namespace nb = nanobind;
using namespace mscclpp;

extern void register_env(nb::module_& m);
extern void register_error(nb::module_& m);
extern void register_proxy_channel(nb::module_& m);
extern void register_sm_channel(nb::module_& m);
Expand Down Expand Up @@ -184,6 +185,7 @@ void register_core(nb::module_& m) {
}

NB_MODULE(_mscclpp, m) {
register_env(m);
register_error(m);
register_proxy_channel(m);
register_sm_channel(m);
Expand Down
28 changes: 28 additions & 0 deletions python/mscclpp/env_py.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#include <nanobind/nanobind.h>
#include <nanobind/stl/shared_ptr.h>
#include <nanobind/stl/string.h>

#include <mscclpp/env.hpp>

namespace nb = nanobind;
using namespace mscclpp;

void register_env(nb::module_& m) {
nb::class_<Env>(m, "Env")
.def_ro("debug", &Env::debug)
.def_ro("debug_subsys", &Env::debugSubsys)
.def_ro("debug_file", &Env::debugFile)
.def_ro("hca_devices", &Env::hcaDevices)
.def_ro("hostid", &Env::hostid)
.def_ro("socket_family", &Env::socketFamily)
.def_ro("socket_ifname", &Env::socketIfname)
.def_ro("comm_id", &Env::commId)
.def_ro("execution_plan_dir", &Env::executionPlanDir)
.def_ro("npkit_dump_dir", &Env::npkitDumpDir)
.def_ro("cuda_ipc_use_default_stream", &Env::cudaIpcUseDefaultStream);

m.def("env", &env);
}
5 changes: 3 additions & 2 deletions python/test/executor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
ExecutionPlan,
PacketType,
npkit,
env,
)
import mscclpp.comm as mscclpp_comm
from mscclpp.utils import KernelBuilder, GpuBuffer, pack
Expand Down Expand Up @@ -171,8 +172,8 @@ def main(
mscclpp_group = mscclpp_comm.CommGroup(MPI.COMM_WORLD)
cp.cuda.Device(mscclpp_group.my_rank % mscclpp_group.nranks_per_node).use()
executor = Executor(mscclpp_group.communicator)
npkit_dump_dir = os.getenv("NPKIT_DUMP_DIR")
if npkit_dump_dir is not None:
npkit_dump_dir = env().npkit_dump_dir
if npkit_dump_dir != "":
npkit.init(mscclpp_group.my_rank)
execution_plan = ExecutionPlan(execution_plan_path)
collective = execution_plan.collective()
Expand Down
18 changes: 16 additions & 2 deletions python/test/test_mscclpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
Transport,
is_nvls_supported,
npkit,
env,
)
import mscclpp.comm as mscclpp_comm
from mscclpp.utils import KernelBuilder, GpuBuffer, pack
Expand All @@ -36,6 +37,19 @@
ethernet_interface_name = "eth0"


@parametrize_mpi_groups(1)
def test_env(mpi_group: MpiGroup):
e = env()
assert isinstance(e.debug, str)
with pytest.raises(AttributeError):
# all attributes should be read-only
e.debug = "INFO"

# should be the same object
e2 = env()
assert e == e2


def all_ranks_on_the_same_node(mpi_group: MpiGroup):
if (ethernet_interface_name in ni.interfaces()) is False:
pytest.skip(f"{ethernet_interface_name} is not an interface to use on this node")
Expand Down Expand Up @@ -624,8 +638,8 @@ def test_executor(mpi_group: MpiGroup, filename: str):
project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
mscclpp_group = mscclpp_comm.CommGroup(mpi_group.comm)
executor = Executor(mscclpp_group.communicator)
npkit_dump_dir = os.getenv("NPKIT_DUMP_DIR")
if npkit_dump_dir is not None:
npkit_dump_dir = env().npkit_dump_dir
if npkit_dump_dir != "":
npkit.init(mscclpp_group.my_rank)
execution_plan = ExecutionPlan(os.path.join(project_dir, "test", "execution-files", filename))

Expand Down
2 changes: 1 addition & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS *.cc *.cu)
file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS *.cc *.cpp *.cu)
target_sources(mscclpp_obj PRIVATE ${SOURCES})
target_include_directories(mscclpp_obj PRIVATE include)
27 changes: 12 additions & 15 deletions src/bootstrap/socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <unistd.h>

#include <fstream>
#include <mscclpp/env.hpp>
#include <mscclpp/errors.hpp>
#include <mscclpp/utils.hpp>
#include <sstream>
Expand Down Expand Up @@ -64,14 +65,12 @@ static uint16_t socketToPort(union SocketAddress* addr) {
/* Allow the user to force the IPv4/IPv6 interface selection */
static int envSocketFamily(void) {
int family = -1; // Family selection is not forced, will use first one found
char* env = getenv("MSCCLPP_SOCKET_FAMILY");
if (env == NULL) return family;
const std::string& socketFamily = env()->socketFamily;
if (socketFamily == "") return family;

INFO(MSCCLPP_ENV, "MSCCLPP_SOCKET_FAMILY set by environment to %s", env);

if (strcmp(env, "AF_INET") == 0)
if (socketFamily == "AF_INET")
family = AF_INET; // IPv4
else if (strcmp(env, "AF_INET6") == 0)
else if (socketFamily == "AF_INET6")
family = AF_INET6; // IPv6
return family;
}
Expand Down Expand Up @@ -306,27 +305,25 @@ int FindInterfaces(char* ifNames, union SocketAddress* ifAddrs, int ifNameMaxSiz
// Allow user to force the INET socket family selection
int sock_family = envSocketFamily();
// User specified interface
char* env = getenv("MSCCLPP_SOCKET_IFNAME");
const std::string& socketIfname = env()->socketIfname;
if (inputIfName) {
INFO(MSCCLPP_NET, "using iterface %s", inputIfName);
nIfs = findInterfaces(inputIfName, ifNames, ifAddrs, sock_family, ifNameMaxSize, maxIfs);
} else if (env && strlen(env) > 1) {
INFO(MSCCLPP_ENV, "MSCCLPP_SOCKET_IFNAME set by environment to %s", env);
} else if (socketIfname != "") {
// Specified by user : find or fail
if (shownIfName++ == 0) INFO(MSCCLPP_NET, "MSCCLPP_SOCKET_IFNAME set to %s", env);
nIfs = findInterfaces(env, ifNames, ifAddrs, sock_family, ifNameMaxSize, maxIfs);
if (shownIfName++ == 0) INFO(MSCCLPP_NET, "MSCCLPP_SOCKET_IFNAME set to %s", socketIfname.c_str());
nIfs = findInterfaces(socketIfname.c_str(), ifNames, ifAddrs, sock_family, ifNameMaxSize, maxIfs);
} else {
// Try to automatically pick the right one
// Start with IB
nIfs = findInterfaces("ib", ifNames, ifAddrs, sock_family, ifNameMaxSize, maxIfs);
// else see if we can get some hint from COMM ID
if (nIfs == 0) {
char* commId = getenv("MSCCLPP_COMM_ID");
if (commId && strlen(commId) > 1) {
INFO(MSCCLPP_ENV, "MSCCLPP_COMM_ID set by environment to %s", commId);
const std::string& commId = env()->commId;
if (commId != "") {
// Try to find interface that is in the same subnet as the IP in comm id
union SocketAddress idAddr;
SocketGetAddrFromString(&idAddr, commId);
SocketGetAddrFromString(&idAddr, commId.c_str());
nIfs = FindInterfaceMatchSubnet(ifNames, ifAddrs, &idAddr, ifNameMaxSize, maxIfs);
}
}
Expand Down
17 changes: 13 additions & 4 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <mscclpp/npkit/npkit.hpp>
#endif

#include <mscclpp/env.hpp>
#include <mscclpp/utils.hpp>
#include <sstream>
#include <thread>
Expand Down Expand Up @@ -40,7 +41,8 @@ int Connection::getMaxWriteQueueSize() { return maxWriteQueueSize; }

// CudaIpcConnection

CudaIpcConnection::CudaIpcConnection(Endpoint localEndpoint, Endpoint remoteEndpoint, cudaStream_t stream)
CudaIpcConnection::CudaIpcConnection(Endpoint localEndpoint, Endpoint remoteEndpoint,
std::shared_ptr<CudaStreamWithFlags> stream)
: Connection(localEndpoint.maxWriteQueueSize()), stream_(stream) {
if (localEndpoint.transport() != Transport::CudaIpc) {
throw mscclpp::Error("Cuda IPC connection can only be made from a Cuda IPC endpoint", ErrorCode::InvalidUsage);
Expand Down Expand Up @@ -74,7 +76,9 @@ void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, Register
char* dstPtr = (char*)dst.data();
char* srcPtr = (char*)src.data();

MSCCLPP_CUDATHROW(cudaMemcpyAsync(dstPtr + dstOffset, srcPtr + srcOffset, size, cudaMemcpyDeviceToDevice, stream_));
if (!env()->cudaIpcUseDefaultStream && stream_->empty()) stream_->set(cudaStreamNonBlocking);

MSCCLPP_CUDATHROW(cudaMemcpyAsync(dstPtr + dstOffset, srcPtr + srcOffset, size, cudaMemcpyDeviceToDevice, *stream_));
INFO(MSCCLPP_P2P, "CudaIpcConnection write: from %p to %p, size %lu", srcPtr + srcOffset, dstPtr + dstOffset, size);

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_WRITE_EXIT)
Expand All @@ -92,7 +96,9 @@ void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset,
*src = newValue;
uint64_t* dstPtr = reinterpret_cast<uint64_t*>(reinterpret_cast<char*>(dst.data()) + dstOffset);

MSCCLPP_CUDATHROW(cudaMemcpyAsync(dstPtr, src, sizeof(uint64_t), cudaMemcpyHostToDevice, stream_));
if (!env()->cudaIpcUseDefaultStream && stream_->empty()) stream_->set(cudaStreamNonBlocking);

MSCCLPP_CUDATHROW(cudaMemcpyAsync(dstPtr, src, sizeof(uint64_t), cudaMemcpyHostToDevice, *stream_));
INFO(MSCCLPP_P2P, "CudaIpcConnection atomic write: from %p to %p, %lu -> %lu", src, dstPtr + dstOffset, oldValue,
newValue);

Expand All @@ -109,8 +115,11 @@ void CudaIpcConnection::flush(int64_t timeoutUsec) {
if (timeoutUsec >= 0) {
INFO(MSCCLPP_P2P, "CudaIpcConnection flush: timeout is not supported, ignored");
}

if (!env()->cudaIpcUseDefaultStream && stream_->empty()) stream_->set(cudaStreamNonBlocking);

AvoidCudaGraphCaptureGuard guard;
MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream_));
MSCCLPP_CUDATHROW(cudaStreamSynchronize(*stream_));
INFO(MSCCLPP_P2P, "CudaIpcConnection flushing connection");

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_EXIT)
Expand Down
7 changes: 2 additions & 5 deletions src/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace mscclpp {

Context::Impl::Impl() {}
Context::Impl::Impl() : ipcStream_(std::make_shared<CudaStreamWithFlags>()) {}

IbCtx* Context::Impl::getIbContext(Transport ibTransport) {
// Find IB context or create it
Expand Down Expand Up @@ -43,10 +43,7 @@ MSCCLPP_API_CPP std::shared_ptr<Connection> Context::connect(Endpoint localEndpo
if (remoteEndpoint.transport() != Transport::CudaIpc) {
throw mscclpp::Error("Local transport is CudaIpc but remote is not", ErrorCode::InvalidUsage);
}
if (!(pimpl_->ipcStream_)) {
pimpl_->ipcStream_ = std::make_shared<CudaStreamWithFlags>(cudaStreamNonBlocking);
}
conn = std::make_shared<CudaIpcConnection>(localEndpoint, remoteEndpoint, cudaStream_t(*(pimpl_->ipcStream_)));
conn = std::make_shared<CudaIpcConnection>(localEndpoint, remoteEndpoint, pimpl_->ipcStream_);
} else if (AllIBTransports.has(localEndpoint.transport())) {
if (!AllIBTransports.has(remoteEndpoint.transport())) {
throw mscclpp::Error("Local transport is IB but remote is not", ErrorCode::InvalidUsage);
Expand Down
Loading

0 comments on commit ac5f17d

Please sign in to comment.