New TORCH_UCC_BLOCKING_WAIT env variable (pytorch#81791)
Cherry-pick of facebookresearch/torch_ucc#95.

I recommend waiting until pytorch#81583 is merged first, so that CI can verify that this PR compiles correctly.

Marking this as a draft for now; I will change it to "ready for review" once pytorch#81583 is merged.
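
For reference, a minimal usage sketch of the new variable. Here "python train.py" is a placeholder for any launcher command that ends up creating a UCC process group; the accepted values are exactly the ones documented in ProcessGroupUCC.cpp below:

  # Blocking wait disabled for all operations (the new default):
  TORCH_UCC_BLOCKING_WAIT=none python train.py

  # Blocking wait enabled for all supported operations:
  TORCH_UCC_BLOCKING_WAIT=all python train.py

  # Blocking wait enabled only for the listed operations:
  TORCH_UCC_BLOCKING_WAIT=allreduce,send,recv python train.py
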
Pull Request resolved: pytorch#81791
Approved by: https://github.com/kwen2501
zasdfgbnm authored and pytorchmergebot committed Aug 25, 2022
1 parent 85f82f7 commit a4a55f5
Showing 8 changed files with 153 additions and 46 deletions.
16 changes: 6 additions & 10 deletions .circleci/docker/build.sh
@@ -84,8 +84,8 @@ if [[ "$image" == *xenial* ]] || [[ "$image" == *bionic* ]]; then
fi

TRAVIS_DL_URL_PREFIX="https://s3.amazonaws.com/travis-python-archives/binaries/ubuntu/14.04/x86_64"
-UCX_COMMIT=v1.13.x
-UCC_COMMIT=a7bda274b10f8adf5bb729f01da064f4e735fb23
+_UCX_COMMIT=31e74cac7bee0ef66bef2af72e7d86d9c282e5ab
+_UCC_COMMIT=12944da33f911daf505d9bbc51411233d0ed85e1

# It's annoying to rename jobs every time you want to rewrite a
# configuration, so we hardcode everything here rather than do it
@@ -149,8 +149,8 @@ case "$image" in
DB=yes
VISION=yes
KATEX=yes
-UCX_COMMIT=${UCX_COMMIT}
-UCC_COMMIT=${UCC_COMMIT}
+UCX_COMMIT=${_UCX_COMMIT}
+UCC_COMMIT=${_UCC_COMMIT}
;;
pytorch-linux-bionic-cuda11.7-cudnn8-py3-gcc7)
CUDA_VERSION=11.7.0
@@ -161,8 +161,8 @@ case "$image" in
DB=yes
VISION=yes
KATEX=yes
-UCX_COMMIT=${UCX_COMMIT}
-UCC_COMMIT=${UCC_COMMIT}
+UCX_COMMIT=${_UCX_COMMIT}
+UCC_COMMIT=${_UCC_COMMIT}
;;
pytorch-linux-xenial-py3-clang5-asan)
ANACONDA_PYTHON_VERSION=3.7
@@ -283,8 +283,6 @@ case "$image" in
PROTOBUF=yes
DB=yes
VISION=yes
-UCX_COMMIT=${UCX_COMMIT}
-UCC_COMMIT=${UCC_COMMIT}
;;
pytorch-linux-jammy-cuda11.7-cudnn8-py3.8-clang12)
ANACONDA_PYTHON_VERSION=3.8
@@ -294,8 +292,6 @@ case "$image" in
PROTOBUF=yes
DB=yes
VISION=yes
-UCX_COMMIT=${UCX_COMMIT}
-UCC_COMMIT=${UCC_COMMIT}
;;
*)
# Catch-all for builds that are not hardcoded.
9 changes: 8 additions & 1 deletion .circleci/docker/common/install_ucc.sh
@@ -2,6 +2,12 @@

set -ex

+if [[ -d "/usr/local/cuda/" ]]; then
+  with_cuda=/usr/local/cuda/
+else
+  with_cuda=no
+fi

function install_ucx() {
set -ex
git clone --recursive https://github.com/openucx/ucx.git
@@ -12,6 +18,7 @@ function install_ucx() {
./autogen.sh
./configure --prefix=$UCX_HOME \
--enable-mt \
+    --with-cuda=$with_cuda \
--enable-profiling \
--enable-stats
time make -j
Expand All @@ -29,7 +36,7 @@ function install_ucc() {
git submodule update --init --recursive

./autogen.sh
-./configure --prefix=$UCC_HOME --with-ucx=$UCX_HOME --with-nccl=no
+./configure --prefix=$UCC_HOME --with-ucx=$UCX_HOME --with-nccl=no --with-cuda=$with_cuda
time make -j
sudo make install

4 changes: 3 additions & 1 deletion .jenkins/pytorch/build.sh
@@ -45,7 +45,9 @@ fi
if [[ "$BUILD_ENVIRONMENT" == *cuda11* ]]; then
# enable split torch_cuda build option in CMake
export BUILD_SPLIT_CUDA=ON
if [[ "$BUILD_ENVIRONMENT" != *cuda11.3* ]]; then
if [[ "$BUILD_ENVIRONMENT" != *cuda11.3* && "$BUILD_ENVIRONMENT" != *clang* ]]; then
# TODO: there is a linking issue when building with UCC using clang,
# disable it for now and to be fix later.
export USE_UCC=1
export USE_SYSTEM_UCC=1
fi
13 changes: 13 additions & 0 deletions test/cpp/c10d/CMakeLists.txt
@@ -54,6 +54,19 @@ if(USE_CUDA)
install(TARGETS c10d_cuda_test DESTINATION lib)
endif()
endif()
+if(USE_UCC AND USE_C10D_UCC)
+  # UCC is a private dependency of libtorch, but the tests include some
+  # private headers of libtorch, which in turn include UCC. As a hacky
+  # alternative to making UCC a public dependency of libtorch, we make it
+  # a private dependency of the tests as well.
+  c10d_add_test(
+    ProcessGroupUCCTest.cpp
+    torch_cpu c10d_cuda_test gtest_main __caffe2_ucc)
+  if(INSTALL_TEST)
+    install(TARGETS ProcessGroupUCCTest DESTINATION bin)
+    install(TARGETS c10d_cuda_test DESTINATION lib)
+  endif()
+endif()
else()
if(USE_GLOO AND USE_C10D_GLOO)
c10d_add_test(ProcessGroupGlooTest.cpp torch_cpu gtest_main)
35 changes: 35 additions & 0 deletions test/cpp/c10d/ProcessGroupUCCTest.cpp
@@ -0,0 +1,35 @@
#include <gtest/gtest.h>
#include <torch/csrc/distributed/c10d/UCCUtils.hpp>

#include <utility>
#include <vector>

using namespace c10d;

TEST(ProcessGroupUCCTest, testTrim) {
  std::vector<std::pair<std::string, std::string>> tests = {
      {" allreduce ", "allreduce"},
      {"\tallgather", "allgather"},
      {"send\n", "send"},
  };
  for (auto entry : tests) {
    ASSERT_EQ(trim(entry.first), entry.second);
  }
}

TEST(ProcessGroupUCCTest, testToLower) {
  std::vector<std::pair<std::string, std::string>> tests = {
      {"AllReduce", "allreduce"},
      {"ALLGATHER", "allgather"},
      {"send", "send"},
  };
  for (auto entry : tests) {
    ASSERT_EQ(tolower(entry.first), entry.second);
  }
}

TEST(ProcessGroupUCCTest, testParseList) {
  std::string input = "\tAllReduce, ALLGATHER, send\n";
  std::vector<std::string> expect{"allreduce", "allgather", "send"};
  ASSERT_EQ(parse_list(input), expect);
}
84 changes: 50 additions & 34 deletions torch/csrc/distributed/c10d/ProcessGroupUCC.cpp
@@ -5,6 +5,8 @@
#include <c10d/UCCUtils.hpp>
#include <list>
#include <memory>
+#include <unordered_map>
+#include <unordered_set>

namespace c10d {

@@ -106,19 +108,16 @@ struct torch_ucc_config_t {
bool enable_health_check;
} torch_ucc_config;

-// TODO: support UCC_BLOCKING_WAIT that applies to all collectives.
-std::map<std::string, std::string> torch_ucc_envs_map = {
-    {"TORCH_UCC_ALLGATHER_BLOCKING_WAIT", "0"},
-    {"TORCH_UCC_ALLGATHER_BASE_BLOCKING_WAIT", "0"},
-    {"TORCH_UCC_ALLREDUCE_BLOCKING_WAIT", "0"},
-    {"TORCH_UCC_ALLTOALL_BLOCKING_WAIT", "0"},
-    {"TORCH_UCC_BCAST_BLOCKING_WAIT", "0"},
-    {"TORCH_UCC_GATHER_BLOCKING_WAIT", "0"},
-    {"TORCH_UCC_REDUCE_BLOCKING_WAIT", "0"},
-    {"TORCH_UCC_REDUCE_SCATTER_BLOCKING_WAIT", "0"},
-    {"TORCH_UCC_SCATTER_BLOCKING_WAIT", "0"},
-    {"TORCH_UCC_SEND_BLOCKING_WAIT", "0"},
-    {"TORCH_UCC_RECV_BLOCKING_WAIT", "0"},
+std::unordered_map<std::string, std::string> torch_ucc_envs_map = {
+    // TORCH_UCC_BLOCKING_WAIT allowed syntax:
+    // - TORCH_UCC_BLOCKING_WAIT=none --> blocking wait completely disabled
+    // - TORCH_UCC_BLOCKING_WAIT=all --> blocking wait completely enabled
+    // - TORCH_UCC_BLOCKING_WAIT=allreduce,send,recv --> blocking wait enabled
+    //   on selected operations
+    // Supported operations:
+    // [allgather,allgather_base,allreduce,alltoall,broadcast,
+    //  gather,reduce,reduce_scatter,scatter,send,recv]
+    {"TORCH_UCC_BLOCKING_WAIT", "none"},

{"TORCH_UCC_USE_FUTURE", "1"},
{"TORCH_UCC_PROFILING_ENABLE", "0"},
@@ -128,11 +127,42 @@ std::map<std::string, std::string> torch_ucc_envs_map = {
{"TORCH_UCC_ENABLE_COMMS_LOGGER", "0"},
};

+std::vector<OpType> parse_blocking_wait(std::string op_list_string) {
+  const static std::unordered_map<std::string, OpType> str2op = {
+      {"allgather", OpType::ALLGATHER},
+      {"allgather_base", OpType::_ALLGATHER_BASE},
+      {"allreduce", OpType::ALLREDUCE},
+      {"alltoall_base", OpType::ALLTOALL_BASE},
+      {"broadcast", OpType::BROADCAST},
+      {"gather", OpType::GATHER},
+      {"reduce", OpType::REDUCE},
+      {"reduce_scatter", OpType::REDUCE_SCATTER},
+      {"scatter", OpType::SCATTER},
+      {"send", OpType::SEND},
+      {"recv", OpType::RECV},
+  };
+  auto op_list = parse_list(op_list_string);
+  if (op_list == std::vector<std::string>{"none"}) {
+    return {};
+  }
+  std::vector<OpType> result;
+  if (op_list == std::vector<std::string>{"all"}) {
+    for (auto entry : str2op) {
+      result.push_back(entry.second);
+    }
+  } else {
+    for (auto op_string : op_list) {
+      result.push_back(str2op.at(op_string));
+    }
+  }
+  return result;
+}

} // namespace

-void read_confg() {
+void read_config() {
// default configuration
-torch_ucc_config.blocking_wait.fill(true);
+torch_ucc_config.blocking_wait.fill(false);
torch_ucc_config.enable_profiling = false;
torch_ucc_config.use_future = true;
torch_ucc_config.shared_comm = false;
@@ -149,24 +179,10 @@ void read_confg() {
}
}

-#define BUILD_BLOCKING_CFG(op, str) \
-  (torch_ucc_config.blocking_wait[(std::uint8_t)op] = \
-      std::stoi(torch_ucc_envs_map.at(str)))
-
-BUILD_BLOCKING_CFG(OpType::ALLGATHER, "TORCH_UCC_ALLGATHER_BLOCKING_WAIT");
-BUILD_BLOCKING_CFG(
-    OpType::_ALLGATHER_BASE, "TORCH_UCC_ALLGATHER_BASE_BLOCKING_WAIT");
-BUILD_BLOCKING_CFG(OpType::ALLREDUCE, "TORCH_UCC_ALLREDUCE_BLOCKING_WAIT");
-BUILD_BLOCKING_CFG(OpType::ALLTOALL_BASE, "TORCH_UCC_ALLTOALL_BLOCKING_WAIT");
-BUILD_BLOCKING_CFG(OpType::BROADCAST, "TORCH_UCC_BCAST_BLOCKING_WAIT");
-BUILD_BLOCKING_CFG(OpType::GATHER, "TORCH_UCC_GATHER_BLOCKING_WAIT");
-BUILD_BLOCKING_CFG(OpType::REDUCE, "TORCH_UCC_REDUCE_BLOCKING_WAIT");
-BUILD_BLOCKING_CFG(
-    OpType::REDUCE_SCATTER, "TORCH_UCC_REDUCE_SCATTER_BLOCKING_WAIT");
-BUILD_BLOCKING_CFG(OpType::SCATTER, "TORCH_UCC_SCATTER_BLOCKING_WAIT");
-BUILD_BLOCKING_CFG(OpType::SEND, "TORCH_UCC_SEND_BLOCKING_WAIT");
-BUILD_BLOCKING_CFG(OpType::RECV, "TORCH_UCC_RECV_BLOCKING_WAIT");
-#undef BUILD_BLOCKING_CFG
+auto blocking_wait_str = torch_ucc_envs_map.at("TORCH_UCC_BLOCKING_WAIT");
+for (auto op : parse_blocking_wait(blocking_wait_str)) {
+  torch_ucc_config.blocking_wait[(std::uint8_t)op] = true;
+}

torch_ucc_config.use_future =
std::stoi(torch_ucc_envs_map.at("TORCH_UCC_USE_FUTURE"));
@@ -700,7 +716,7 @@ ProcessGroupUCC::ProcessGroupUCC(
int size,
std::chrono::duration<float> timeout)
: ProcessGroup(rank, size), timeout_(timeout) {
-c10::call_once(torch_ucc_config.flag, read_confg);
+c10::call_once(torch_ucc_config.flag, read_config);
oob = std::make_shared<torch_ucc_oob_coll_info_t>();
oob->rank = rank;
oob->size = size;
5 changes: 5 additions & 0 deletions torch/csrc/distributed/c10d/UCCUtils.cpp
@@ -3,6 +3,11 @@
#include <c10d/UCCTracing.hpp>
#include <c10d/UCCUtils.hpp>

+#include <cctype>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>

namespace c10d {

namespace {
33 changes: 33 additions & 0 deletions torch/csrc/distributed/c10d/UCCUtils.hpp
@@ -188,6 +188,39 @@ ucc_status_t oob_allgather_test(void* req);

ucc_status_t oob_allgather_free(void* req);

+// trim: remove spaces before and after the string view
+// implementation borrowed from https://stackoverflow.com/a/17976541
+inline c10::string_view trim(c10::string_view s) {
+  auto wsfront = std::find_if_not(
+      s.begin(), s.end(), [](int c) { return std::isspace(c); });
+  auto wsback = std::find_if_not(s.rbegin(), s.rend(), [](int c) {
+                  return std::isspace(c);
+                }).base();
+  return (
+      wsback <= wsfront ? "" : s.substr(wsfront - s.begin(), wsback - wsfront));
+}
+
+inline std::string tolower(c10::string_view s) {
+  std::string result;
+  result.reserve(s.size());
+  for (auto c : s) {
+    result.push_back(std::tolower(c));
+  }
+  return result;
+}
+
+inline std::vector<std::string> parse_list(std::string list) {
+  std::vector<std::string> result;
+  list = tolower(trim(list));
+  while (!list.empty()) {
+    const auto end_pos = list.find_first_of(',');
+    const auto token = trim(list.substr(0, end_pos));
+    result.push_back(std::string(token));
+    list = (end_pos != c10::string_view::npos) ? list.substr(end_pos + 1) : "";
+  }
+  return result;
+}

} // namespace c10d

#endif // USE_C10D_UCC
