Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev support graph 2d parallel on npu #10566

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmake/oneflow.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ if(BUILD_PYTHON)
PATTERN "oneflow/core/register/register_manager.h"
PATTERN "oneflow/core/register/runtime_register_desc.h"
PATTERN "oneflow/core/register/tensor_slice_view.h"
PATTERN "oneflow/core/register/tensor_slice_copier.h"
PATTERN "oneflow/core/ndarray/xpu_util.h"
PATTERN "oneflow/core/rpc/include/base.h"
PATTERN "oneflow/core/rpc/include/ctrl.h"
Expand All @@ -558,6 +559,7 @@ if(BUILD_PYTHON)
PATTERN "oneflow/core/operator/operator.h"
PATTERN "oneflow/core/operator/operator_util.h"
PATTERN "oneflow/core/operator/op_conf_util.h"
PATTERN "oneflow/core/operator/nccl_send_recv_boxing_op_util.h"
PATTERN "oneflow/core/graph/compute_task_node.h"
PATTERN "oneflow/core/graph/copy_task_node.h"
PATTERN "oneflow/core/graph/exec_graph.h"
Expand Down
21 changes: 18 additions & 3 deletions oneflow/core/graph/task_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -880,14 +880,16 @@ DEFINE_BLD_SUB_TASK_GRAPH_METHOD(BldSubTskGphByBoxing) {
if (device_type != DeviceType::kCPU
&& device_type2sub_tsk_gph_builder_.find(device_type)
!= device_type2sub_tsk_gph_builder_.end()) {
status = CHECK_JUST( // NOLINT
auto maybe_status = // NOLINT
device_type2sub_tsk_gph_builder_ // NOLINT
.at(device_type) // NOLINT
->Build(sub_tsk_gph_builder_ctx_.get(), in_nodes, &out_nodes, // NOLINT
&sorted_ctrl_tasks, src_parallel_desc, dst_parallel_desc, lbi, // NOLINT
blob_desc, src_nd_sbp, dst_nd_sbp, // NOLINT
*(CHECK_JUST(src_op_node->op().GetOpTimeShape()).get()))); // NOLINT
} else {
*(CHECK_JUST(src_op_node->op().GetOpTimeShape()).get())); // NOLINT
if (maybe_status.IsOk()) { status = CHECK_JUST(maybe_status); }
}
if (!status) {
status = CHECK_JUST(hierarchical_sub_tsk_gph_builder_->Build(
sub_tsk_gph_builder_ctx_.get(), in_nodes, &out_nodes, &sorted_ctrl_tasks,
src_parallel_desc, dst_parallel_desc, lbi, blob_desc, src_nd_sbp, dst_nd_sbp,
Expand Down Expand Up @@ -1052,6 +1054,12 @@ Maybe<void> GlobalTaskGraph::Init() {
OpGraph* op_graph = Singleton<OpGraph>::Get();
sub_tsk_gph_builder_ctx_.reset(new SubTskGphBuilderCtx(this));
boxing_logger_ = CreateBoxingLogger();
// Register the corresponding task graph builder based on the device type and store them to map
const auto* global_device_type_create_sub_tsk_gph_builder_fn =
GlobalDeviceType2CreateSubTskGphBuilderFn();
for (const auto& pair : *global_device_type_create_sub_tsk_gph_builder_fn) {
device_type2sub_tsk_gph_builder_.emplace(pair.first, pair.second());
}
hierarchical_sub_tsk_gph_builder_.reset(new DispatchHierarchicalSubTskGphBuilder());
HashMap<const OpNode*, std::vector<CompTaskNode*>> op_node2sorted_comp_tasks;

Expand Down Expand Up @@ -1088,6 +1096,13 @@ Maybe<void> BoxingTaskGraph::Init(
OpGraph* op_graph = Singleton<OpGraph>::Get();
sub_tsk_gph_builder_ctx_.reset(new SubTskGphBuilderCtx(this));
boxing_logger_ = CreateBoxingLogger();
// Register the corresponding task graph builder based on the device type and store them to map
const auto* global_device_type_create_sub_tsk_gph_builder_fn =
GlobalDeviceType2CreateSubTskGphBuilderFn();
for (const auto& pair : *global_device_type_create_sub_tsk_gph_builder_fn) {
device_type2sub_tsk_gph_builder_.emplace(pair.first, pair.second());
}

hierarchical_sub_tsk_gph_builder_.reset(new DispatchHierarchicalSubTskGphBuilder());

const auto& TryCreateSortedCompTaskNodes = [&](const OpNode* op_node) {
Expand Down
11 changes: 6 additions & 5 deletions oneflow/core/job/resource_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,12 @@ CollectiveBoxingConf ResourceDesc::collective_boxing_conf() const {
}

bool ResourceDesc::nccl_use_compute_stream() const {
  // Returns whether collective-communication ops (NCCL and, with this PR,
  // the NPU equivalent) should be inserted on the compute stream instead of
  // dedicated boxing streams.
  //
  // NOTE(review): the previous implementation gated this on
  // `defined(WITH_CUDA) && NCCL_VERSION_CODE > 2700` and returned false
  // otherwise. Replacing the whole body with a hardcoded `return true;`
  // (a) leaves commented-out dead code behind, (b) force-enables the feature
  // for every build — including CPU-only builds that cannot honor it — and
  // (c) ignores the user-configured `resource_.nccl_use_compute_stream()`
  // flag. Keep the build-time guard, extend it to the NPU backend, and
  // honor the configured value.
#if (defined(WITH_CUDA) && NCCL_VERSION_CODE > 2700) || defined(WITH_NPU)
  // TODO(review): confirm the NPU toolchain defines WITH_NPU (or whichever
  // macro this backend actually uses) so the NPU build takes this branch.
  return resource_.nccl_use_compute_stream();
#else
  return false;
#endif
}

void ResourceDesc::DumpCudnnConf(const JobConfigProto& job_conf) {
Expand Down
10 changes: 4 additions & 6 deletions oneflow/core/job_rewriter/insert_nccl_logical_op_pass.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ limitations under the License.
#include "oneflow/core/auto_parallel/auto_memory.h"
#include "oneflow/core/common/util.h"
#include "oneflow/core/job/nd_sbp_util.h"
#ifdef WITH_CUDA
// #ifdef WITH_CUDA
#include "oneflow/core/framework/framework.h"
#include "oneflow/core/framework/nd_sbp.h"
#include "oneflow/core/framework/instructions_builder.h"
Expand Down Expand Up @@ -146,7 +146,8 @@ void FindAllConnectedSubgraphForGpuExecOrder(std::vector<HashSet<const OpNode*>>
CHECK(visited.insert(seed_node).second);
const ParallelDesc& seed_parallel_desc = seed_node->parallel_desc();
// NOTE(chengcheng): ONLY consider GPU op and parallel num > 1.
if (seed_parallel_desc.device_type() != DeviceType::kCUDA) { continue; }
// if (seed_parallel_desc.device_type() != DeviceType::kCUDA) { continue; }
if (seed_parallel_desc.device_type() != DeviceType::kNPU) { continue; }
if (seed_parallel_desc.parallel_num() <= 1) { continue; }
// NOTE(chengcheng): using fastest time shape for merge acc into bw subgraph.
if (!SharedPtrShapeEqual(GetOpNodeFastestTimeShape(seed_node), seed_time_shape)) { continue; }
Expand Down Expand Up @@ -486,7 +487,6 @@ bool TryBuildNcclLogicalOpConf(OperatorConf* ret, const OpNode* src_node, const

int64_t scope_symbol_id = CHECK_JUST(BuildScopeWithReducedParallelDesc(
src_node->op().op_conf().scope_symbol_id(), *src_reduced_parallel_desc));

if (src_reduced_hierarchy->NumAxes() == 1 && dst_reduced_hierarchy->NumAxes() == 1) {
return TryBuildNcclBy1DHierarchy(ret, src_reduced_nd_sbp->sbp_parallel(0),
dst_reduced_nd_sbp->sbp_parallel(0), lbn, scope_symbol_id,
Expand Down Expand Up @@ -786,7 +786,6 @@ Maybe<void> InsertNcclLogicalOpPass::Apply(const OpGraph& op_graph, JobBuilder*
} else {
auto_parallel::StraightenOpGraph(op_graph, &ordered_op_nodes);
}

HashMap<const OpNode*, int64_t> op_node2global_order;
for (int32_t global_order = 0; global_order < ordered_op_nodes.size(); global_order++) {
op_node2global_order.emplace(ordered_op_nodes[global_order], global_order);
Expand Down Expand Up @@ -844,7 +843,6 @@ Maybe<void> InsertNcclLogicalOpPass::Apply(const OpGraph& op_graph, JobBuilder*

for (auto& pair : placement2subgraphs) {
PlacementNcclSubGraghsInfo& info = pair.second;

// NOTE(chengcheng): insert nccl ops for each subgraph
int64_t stream_offset = 0;
int64_t total_op_num = 0;
Expand Down Expand Up @@ -883,4 +881,4 @@ REGISTER_JOB_PASS("InsertNcclLogicalOpPass", InsertNcclLogicalOpPass);

} // namespace oneflow

#endif // WITH_CUDA
// #endif // WITH_CUDA
4 changes: 2 additions & 2 deletions oneflow/core/job_rewriter/job_completer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ Maybe<void> JobCompleter::Complete(Job* job) {
compile_tc->Count("[GraphCompile]" + job_name + " SystemOpFillJobNamePass", 1, true);
JUST(JobPass4Name("DumpBlobParallelConfPass")(job, &job_pass_ctx));
compile_tc->Count("[GraphCompile]" + job_name + " DumpBlobParallelConfPass", 1, true);
#ifdef WITH_CUDA
// #ifdef WITH_CUDA
if (Singleton<ResourceDesc, ForSession>::Get()->nccl_use_compute_stream()) {
// NOTE(chengcheng): this pass need as last pass for insert correct op with nccl boxing.
JUST(JobPass4Name("InsertNcclLogicalOpPass")(job, &job_pass_ctx));
Expand All @@ -169,7 +169,7 @@ Maybe<void> JobCompleter::Complete(Job* job) {
JUST(JobPass4Name("DumpBlobParallelConfPass")(job, &job_pass_ctx));
compile_tc->Count("[GraphCompile]" + job_name + " DumpBlobParallelConfPass", 1, true);
}
#endif // WITH_CUDA
// #endif // WITH_CUDA
JUST(JobPass4Name("LogicalChainPass")(job, &job_pass_ctx));
JUST(JobPass4Name("DumpBlobParallelConfPass")(job, &job_pass_ctx));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifdef WITH_CUDA
// #ifdef WITH_CUDA
#include "oneflow/core/auto_parallel/auto_memory.h"
#include "oneflow/core/job/nd_sbp_util.h"
#include "oneflow/core/framework/framework.h"
Expand Down Expand Up @@ -210,4 +210,4 @@ REGISTER_JOB_PASS("NcclLogicalChainStrictOrderPass", NcclLogicalChainStrictOrder

} // namespace oneflow

#endif // WITH_CUDA
// #endif // WITH_CUDA
4 changes: 2 additions & 2 deletions oneflow/core/job_rewriter/nccl_logical_op_fusion_pass.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifdef WITH_CUDA
// #ifdef WITH_CUDA
#include "oneflow/core/auto_parallel/auto_memory.h"
#include "oneflow/core/job/nd_sbp_util.h"
#include "oneflow/core/framework/framework.h"
Expand Down Expand Up @@ -293,4 +293,4 @@ REGISTER_JOB_PASS("NcclLogicalOpFusionPass", NcclLogicalOpFusionPass);

} // namespace oneflow

#endif // WITH_CUDA
// #endif // WITH_CUDA