From a4a55f5ea6403472a25b12a6e9b8f4f713e664a3 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Thu, 25 Aug 2022 21:33:15 +0000 Subject: [PATCH] New TORCH_UCC_BLOCKING_WAIT env variable (#81791) Cherry-pick of https://github.com/facebookresearch/torch_ucc/pull/95. I recommend waiting until https://github.com/pytorch/pytorch/pull/81583 is merged first, so the CI is checking if this PR compiles correctly. Marking this as a draft for now, will change to "ready for review" once https://github.com/pytorch/pytorch/pull/81583 merged. Pull Request resolved: https://github.com/pytorch/pytorch/pull/81791 Approved by: https://github.com/kwen2501 --- .circleci/docker/build.sh | 16 ++-- .circleci/docker/common/install_ucc.sh | 9 +- .jenkins/pytorch/build.sh | 4 +- test/cpp/c10d/CMakeLists.txt | 13 +++ test/cpp/c10d/ProcessGroupUCCTest.cpp | 35 ++++++++ .../csrc/distributed/c10d/ProcessGroupUCC.cpp | 84 +++++++++++-------- torch/csrc/distributed/c10d/UCCUtils.cpp | 5 ++ torch/csrc/distributed/c10d/UCCUtils.hpp | 33 ++++++++ 8 files changed, 153 insertions(+), 46 deletions(-) create mode 100644 test/cpp/c10d/ProcessGroupUCCTest.cpp diff --git a/.circleci/docker/build.sh b/.circleci/docker/build.sh index 6eeee5f1ebaa8..b7fef829b798e 100755 --- a/.circleci/docker/build.sh +++ b/.circleci/docker/build.sh @@ -84,8 +84,8 @@ if [[ "$image" == *xenial* ]] || [[ "$image" == *bionic* ]]; then fi TRAVIS_DL_URL_PREFIX="https://s3.amazonaws.com/travis-python-archives/binaries/ubuntu/14.04/x86_64" -UCX_COMMIT=v1.13.x -UCC_COMMIT=a7bda274b10f8adf5bb729f01da064f4e735fb23 +_UCX_COMMIT=31e74cac7bee0ef66bef2af72e7d86d9c282e5ab +_UCC_COMMIT=12944da33f911daf505d9bbc51411233d0ed85e1 # It's annoying to rename jobs every time you want to rewrite a # configuration, so we hardcode everything here rather than do it @@ -149,8 +149,8 @@ case "$image" in DB=yes VISION=yes KATEX=yes - UCX_COMMIT=${UCX_COMMIT} - UCC_COMMIT=${UCC_COMMIT} + UCX_COMMIT=${_UCX_COMMIT} + UCC_COMMIT=${_UCC_COMMIT} ;; 
pytorch-linux-bionic-cuda11.7-cudnn8-py3-gcc7) CUDA_VERSION=11.7.0 @@ -161,8 +161,8 @@ case "$image" in DB=yes VISION=yes KATEX=yes - UCX_COMMIT=${UCX_COMMIT} - UCC_COMMIT=${UCC_COMMIT} + UCX_COMMIT=${_UCX_COMMIT} + UCC_COMMIT=${_UCC_COMMIT} ;; pytorch-linux-xenial-py3-clang5-asan) ANACONDA_PYTHON_VERSION=3.7 @@ -283,8 +283,6 @@ case "$image" in PROTOBUF=yes DB=yes VISION=yes - UCX_COMMIT=${UCX_COMMIT} - UCC_COMMIT=${UCC_COMMIT} ;; pytorch-linux-jammy-cuda11.7-cudnn8-py3.8-clang12) ANACONDA_PYTHON_VERSION=3.8 @@ -294,8 +292,6 @@ case "$image" in PROTOBUF=yes DB=yes VISION=yes - UCX_COMMIT=${UCX_COMMIT} - UCC_COMMIT=${UCC_COMMIT} ;; *) # Catch-all for builds that are not hardcoded. diff --git a/.circleci/docker/common/install_ucc.sh b/.circleci/docker/common/install_ucc.sh index a7b90286a0fb6..4d691ebb5e9ed 100755 --- a/.circleci/docker/common/install_ucc.sh +++ b/.circleci/docker/common/install_ucc.sh @@ -2,6 +2,12 @@ set -ex +if [[ -d "/usr/local/cuda/" ]]; then + with_cuda=/usr/local/cuda/ +else + with_cuda=no +fi + function install_ucx() { set -ex git clone --recursive https://github.com/openucx/ucx.git @@ -12,6 +18,7 @@ function install_ucx() { ./autogen.sh ./configure --prefix=$UCX_HOME \ --enable-mt \ + --with-cuda=$with_cuda \ --enable-profiling \ --enable-stats time make -j @@ -29,7 +36,7 @@ function install_ucc() { git submodule update --init --recursive ./autogen.sh - ./configure --prefix=$UCC_HOME --with-ucx=$UCX_HOME --with-nccl=no + ./configure --prefix=$UCC_HOME --with-ucx=$UCX_HOME --with-nccl=no --with-cuda=$with_cuda time make -j sudo make install diff --git a/.jenkins/pytorch/build.sh b/.jenkins/pytorch/build.sh index e258c8e9b6b15..a215459fcc7e1 100755 --- a/.jenkins/pytorch/build.sh +++ b/.jenkins/pytorch/build.sh @@ -45,7 +45,9 @@ fi if [[ "$BUILD_ENVIRONMENT" == *cuda11* ]]; then # enable split torch_cuda build option in CMake export BUILD_SPLIT_CUDA=ON - if [[ "$BUILD_ENVIRONMENT" != *cuda11.3* ]]; then + if [[ "$BUILD_ENVIRONMENT" != 
*cuda11.3* && "$BUILD_ENVIRONMENT" != *clang* ]]; then + # TODO: there is a linking issue when building with UCC using clang, + # disable it for now and to be fixed later. export USE_UCC=1 export USE_SYSTEM_UCC=1 fi diff --git a/test/cpp/c10d/CMakeLists.txt b/test/cpp/c10d/CMakeLists.txt index d50c6c4f8ef41..89c6b9155f5b7 100644 --- a/test/cpp/c10d/CMakeLists.txt +++ b/test/cpp/c10d/CMakeLists.txt @@ -54,6 +54,19 @@ if(USE_CUDA) install(TARGETS c10d_cuda_test DESTINATION lib) endif() endif() + if(USE_UCC AND USE_C10D_UCC) + # UCC is a private dependency of libtorch, but the tests include some + # private headers of libtorch, which in turn include UCC. As a hacky + # alternative to making UCC a public dependency of libtorch, we make it + # a private dependency of the tests as well. + c10d_add_test( + ProcessGroupUCCTest.cpp + torch_cpu c10d_cuda_test gtest_main __caffe2_ucc) + if(INSTALL_TEST) + install(TARGETS ProcessGroupUCCTest DESTINATION bin) + install(TARGETS c10d_cuda_test DESTINATION lib) + endif() + endif() else() if(USE_GLOO AND USE_C10D_GLOO) c10d_add_test(ProcessGroupGlooTest.cpp torch_cpu gtest_main) diff --git a/test/cpp/c10d/ProcessGroupUCCTest.cpp b/test/cpp/c10d/ProcessGroupUCCTest.cpp new file mode 100644 index 0000000000000..a31e990536e10 --- /dev/null +++ b/test/cpp/c10d/ProcessGroupUCCTest.cpp @@ -0,0 +1,35 @@ +#include +#include + +#include +#include + +using namespace c10d; + +TEST(ProcessGroupUCCTest, testTrim) { + std::vector<std::pair<std::string, std::string>> tests = { + {" allreduce ", "allreduce"}, + {"\tallgather", "allgather"}, + {"send\n", "send"}, + }; + for (auto entry : tests) { + ASSERT_EQ(trim(entry.first), entry.second); + } +} + +TEST(ProcessGroupUCCTest, testToLower) { + std::vector<std::pair<std::string, std::string>> tests = { + {"AllReduce", "allreduce"}, + {"ALLGATHER", "allgather"}, + {"send", "send"}, + }; + for (auto entry : tests) { + ASSERT_EQ(tolower(entry.first), entry.second); + } +} + +TEST(ProcessGroupUCCTest, testParseList) { + std::string input = "\tAllReduce, ALLGATHER, send\n"; + 
std::vector<std::string> expect{"allreduce", "allgather", "send"}; + ASSERT_EQ(parse_list(input), expect); +} diff --git a/torch/csrc/distributed/c10d/ProcessGroupUCC.cpp b/torch/csrc/distributed/c10d/ProcessGroupUCC.cpp index 191ba4b2ddd75..fcae4e08e637f 100644 --- a/torch/csrc/distributed/c10d/ProcessGroupUCC.cpp +++ b/torch/csrc/distributed/c10d/ProcessGroupUCC.cpp @@ -5,6 +5,8 @@ #include #include #include +#include +#include namespace c10d { @@ -106,19 +108,16 @@ struct torch_ucc_config_t { bool enable_health_check; } torch_ucc_config; -// TODO: support UCC_BLOCKING_WAIT that applies to all collectives. -std::map<std::string, std::string> torch_ucc_envs_map = { - {"TORCH_UCC_ALLGATHER_BLOCKING_WAIT", "0"}, - {"TORCH_UCC_ALLGATHER_BASE_BLOCKING_WAIT", "0"}, - {"TORCH_UCC_ALLREDUCE_BLOCKING_WAIT", "0"}, - {"TORCH_UCC_ALLTOALL_BLOCKING_WAIT", "0"}, - {"TORCH_UCC_BCAST_BLOCKING_WAIT", "0"}, - {"TORCH_UCC_GATHER_BLOCKING_WAIT", "0"}, - {"TORCH_UCC_REDUCE_BLOCKING_WAIT", "0"}, - {"TORCH_UCC_REDUCE_SCATTER_BLOCKING_WAIT", "0"}, - {"TORCH_UCC_SCATTER_BLOCKING_WAIT", "0"}, - {"TORCH_UCC_SEND_BLOCKING_WAIT", "0"}, - {"TORCH_UCC_RECV_BLOCKING_WAIT", "0"}, +std::unordered_map<std::string, std::string> torch_ucc_envs_map = { + // TORCH_UCC_BLOCKING_WAIT allowed syntax: + // - TORCH_UCC_BLOCKING_WAIT=none --> blocking wait completely disabled + // - TORCH_UCC_BLOCKING_WAIT=all --> blocking wait completely enabled + // - TORCH_UCC_BLOCKING_WAIT=allreduce,send,recv --> blocking wait enabled + // on selected operations + // Supported operations: + // [allgather,allgather_base,allreduce,alltoall,broadcast, + // gather,reduce,reduce_scatter,scatter,send,recv] + {"TORCH_UCC_BLOCKING_WAIT", "none"}, {"TORCH_UCC_USE_FUTURE", "1"}, {"TORCH_UCC_PROFILING_ENABLE", "0"}, @@ -128,11 +127,42 @@ std::map<std::string, std::string> torch_ucc_envs_map = { {"TORCH_UCC_ENABLE_COMMS_LOGGER", "0"}, }; +std::vector<OpType> parse_blocking_wait(std::string op_list_string) { + const static std::unordered_map<std::string, OpType> str2op = { + {"allgather", OpType::ALLGATHER}, + {"allgather_base", 
OpType::_ALLGATHER_BASE}, + {"allreduce", OpType::ALLREDUCE}, + {"alltoall_base", OpType::ALLTOALL_BASE}, + {"broadcast", OpType::BROADCAST}, + {"gather", OpType::GATHER}, + {"reduce", OpType::REDUCE}, + {"reduce_scatter", OpType::REDUCE_SCATTER}, + {"scatter", OpType::SCATTER}, + {"send", OpType::SEND}, + {"recv", OpType::RECV}, + }; + auto op_list = parse_list(op_list_string); + if (op_list == std::vector<std::string>{"none"}) { + return {}; + } + std::vector<OpType> result; + if (op_list == std::vector<std::string>{"all"}) { + for (auto entry : str2op) { + result.push_back(entry.second); + } + } else { + for (auto op_string : op_list) { + result.push_back(str2op.at(op_string)); + } + } + return result; +} + } // namespace -void read_confg() { +void read_config() { // default configuration - torch_ucc_config.blocking_wait.fill(true); + torch_ucc_config.blocking_wait.fill(false); torch_ucc_config.enable_profiling = false; torch_ucc_config.use_future = true; torch_ucc_config.shared_comm = false; @@ -149,24 +179,10 @@ } } -#define BUILD_BLOCKING_CFG(op, str) \ - (torch_ucc_config.blocking_wait[(std::uint8_t)op] = \ - std::stoi(torch_ucc_envs_map.at(str))) - - BUILD_BLOCKING_CFG(OpType::ALLGATHER, "TORCH_UCC_ALLGATHER_BLOCKING_WAIT"); - BUILD_BLOCKING_CFG( - OpType::_ALLGATHER_BASE, "TORCH_UCC_ALLGATHER_BASE_BLOCKING_WAIT"); - BUILD_BLOCKING_CFG(OpType::ALLREDUCE, "TORCH_UCC_ALLREDUCE_BLOCKING_WAIT"); - BUILD_BLOCKING_CFG(OpType::ALLTOALL_BASE, "TORCH_UCC_ALLTOALL_BLOCKING_WAIT"); - BUILD_BLOCKING_CFG(OpType::BROADCAST, "TORCH_UCC_BCAST_BLOCKING_WAIT"); - BUILD_BLOCKING_CFG(OpType::GATHER, "TORCH_UCC_GATHER_BLOCKING_WAIT"); - BUILD_BLOCKING_CFG(OpType::REDUCE, "TORCH_UCC_REDUCE_BLOCKING_WAIT"); - BUILD_BLOCKING_CFG( - OpType::REDUCE_SCATTER, "TORCH_UCC_REDUCE_SCATTER_BLOCKING_WAIT"); - BUILD_BLOCKING_CFG(OpType::SCATTER, "TORCH_UCC_SCATTER_BLOCKING_WAIT"); - BUILD_BLOCKING_CFG(OpType::SEND, "TORCH_UCC_SEND_BLOCKING_WAIT"); - BUILD_BLOCKING_CFG(OpType::RECV, 
"TORCH_UCC_RECV_BLOCKING_WAIT"); -#undef BUILD_BLOCKING_CFG + auto blocking_wait_str = torch_ucc_envs_map.at("TORCH_UCC_BLOCKING_WAIT"); + for (auto op : parse_blocking_wait(blocking_wait_str)) { + torch_ucc_config.blocking_wait[(std::uint8_t)op] = true; + } torch_ucc_config.use_future = std::stoi(torch_ucc_envs_map.at("TORCH_UCC_USE_FUTURE")); @@ -700,7 +716,7 @@ ProcessGroupUCC::ProcessGroupUCC( int size, std::chrono::duration timeout) : ProcessGroup(rank, size), timeout_(timeout) { - c10::call_once(torch_ucc_config.flag, read_confg); + c10::call_once(torch_ucc_config.flag, read_config); oob = std::make_shared(); oob->rank = rank; oob->size = size; diff --git a/torch/csrc/distributed/c10d/UCCUtils.cpp b/torch/csrc/distributed/c10d/UCCUtils.cpp index 37cd829122f97..558568baf7567 100644 --- a/torch/csrc/distributed/c10d/UCCUtils.cpp +++ b/torch/csrc/distributed/c10d/UCCUtils.cpp @@ -3,6 +3,11 @@ #include #include +#include +#include +#include +#include + namespace c10d { namespace { diff --git a/torch/csrc/distributed/c10d/UCCUtils.hpp b/torch/csrc/distributed/c10d/UCCUtils.hpp index 70cef1d7f99a4..bc1bb1dde3a96 100644 --- a/torch/csrc/distributed/c10d/UCCUtils.hpp +++ b/torch/csrc/distributed/c10d/UCCUtils.hpp @@ -188,6 +188,39 @@ ucc_status_t oob_allgather_test(void* req); ucc_status_t oob_allgather_free(void* req); +// trim: remove spaces before and after the string view +// implementation borrowed from https://stackoverflow.com/a/17976541 +inline c10::string_view trim(c10::string_view s) { + auto wsfront = std::find_if_not( + s.begin(), s.end(), [](int c) { return std::isspace(c); }); + auto wsback = std::find_if_not(s.rbegin(), s.rend(), [](int c) { + return std::isspace(c); + }).base(); + return ( + wsback <= wsfront ? 
"" : s.substr(wsfront - s.begin(), wsback - wsfront)); +} + +inline std::string tolower(c10::string_view s) { + std::string result; + result.reserve(s.size()); + for (auto c : s) { + result.push_back(std::tolower(c)); + } + return result; +} + +inline std::vector<std::string> parse_list(std::string list) { + std::vector<std::string> result; + list = tolower(trim(list)); + while (!list.empty()) { + const auto end_pos = list.find_first_of(','); + const auto token = trim(list.substr(0, end_pos)); + result.push_back(std::string(token)); + list = (end_pos != c10::string_view::npos) ? list.substr(end_pos + 1) : ""; + } + return result; +} + } // namespace c10d #endif // USE_C10D_UCC