Skip to content

Commit

Permalink
apacheGH-45591: [C++][Acero] Refine hash join benchmark and remove op…
Browse files Browse the repository at this point in the history
…enmp from the project (apache#45593)

### Rationale for this change

See apache#45591 .

### What changes are included in this PR?

1. Replace the usage of openmp with arrow-native multi-threading primitives;
2. Remove all the occurrences of openmp from the project;
3. Support stats for build side rows in hash join benchmark, and update certain benchmark.

### Are these changes tested?

Manually tested.

### Are there any user-facing changes?

Removed a public CMake option but I think it shouldn't affect the user.

* GitHub Issue: apache#45591

Authored-by: Rossi Sun <[email protected]>
Signed-off-by: Rossi Sun <[email protected]>
  • Loading branch information
zanmato1984 authored and arashandishgar committed Feb 25, 2025
1 parent a8ea219 commit 69bfa48
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 52 deletions.
1 change: 0 additions & 1 deletion ci/scripts/cpp_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ else
-DARROW_BUILD_BENCHMARKS=${ARROW_BUILD_BENCHMARKS:-OFF} \
-DARROW_BUILD_EXAMPLES=${ARROW_BUILD_EXAMPLES:-OFF} \
-DARROW_BUILD_INTEGRATION=${ARROW_BUILD_INTEGRATION:-OFF} \
-DARROW_BUILD_OPENMP_BENCHMARKS=${ARROW_BUILD_OPENMP_BENCHMARKS:-OFF} \
-DARROW_BUILD_SHARED=${ARROW_BUILD_SHARED:-ON} \
-DARROW_BUILD_STATIC=${ARROW_BUILD_STATIC:-ON} \
-DARROW_BUILD_TESTS=${ARROW_BUILD_TESTS:-OFF} \
Expand Down
1 change: 0 additions & 1 deletion cpp/CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
"cacheVariables": {
"ARROW_BUILD_BENCHMARKS": "ON",
"ARROW_BUILD_BENCHMARKS_REFERENCE": "ON",
"ARROW_BUILD_OPENMP_BENCHMARKS": "ON",
"ARROW_BUILD_DETAILED_BENCHMARKS": "OFF",
"CMAKE_BUILD_TYPE": "RelWithDebInfo"
}
Expand Down
3 changes: 0 additions & 3 deletions cpp/cmake_modules/DefineOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,6 @@ takes precedence over ccache if a storage backend is configured" ON)
define_option(ARROW_BUILD_BENCHMARKS_REFERENCE
"Build the Arrow micro reference benchmarks" OFF)

define_option(ARROW_BUILD_OPENMP_BENCHMARKS
"Build the Arrow benchmarks that rely on OpenMP" OFF)

define_option(ARROW_BUILD_DETAILED_BENCHMARKS
"Build benchmarks that do a longer exploration of performance" OFF)

Expand Down
21 changes: 3 additions & 18 deletions cpp/src/arrow/acero/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -221,36 +221,21 @@ if(ARROW_BUILD_BENCHMARKS)

add_arrow_acero_benchmark(aggregate_benchmark SOURCES aggregate_benchmark.cc)

if(ARROW_BUILD_OPENMP_BENCHMARKS)
find_package(OpenMP REQUIRED)
add_arrow_acero_benchmark(hash_join_benchmark
EXTRA_LINK_LIBS
OpenMP::OpenMP_CXX
SOURCES
hash_join_benchmark.cc)
if(MSVC)
target_compile_options(arrow-compute-hash-join-benchmark
PRIVATE "-openmp:experimental -openmp:llvm")
endif()
endif()
add_arrow_acero_benchmark(hash_join_benchmark SOURCES hash_join_benchmark.cc)

if(ARROW_BUILD_STATIC)
target_link_libraries(arrow-acero-expression-benchmark PUBLIC arrow_acero_static)
target_link_libraries(arrow-acero-filter-benchmark PUBLIC arrow_acero_static)
target_link_libraries(arrow-acero-project-benchmark PUBLIC arrow_acero_static)
target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_static)
target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_static)
if(ARROW_BUILD_OPENMP_BENCHMARKS)
target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_static)
endif()
target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_static)
else()
target_link_libraries(arrow-acero-expression-benchmark PUBLIC arrow_acero_shared)
target_link_libraries(arrow-acero-filter-benchmark PUBLIC arrow_acero_shared)
target_link_libraries(arrow-acero-project-benchmark PUBLIC arrow_acero_shared)
target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_shared)
target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_shared)
if(ARROW_BUILD_OPENMP_BENCHMARKS)
target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_shared)
endif()
target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_shared)
endif()
endif()
2 changes: 1 addition & 1 deletion cpp/src/arrow/acero/accumulation_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ using arrow::compute::ExecBatch;

/// \brief A container that accumulates batches until they are ready to
/// be processed.
class AccumulationQueue {
class ARROW_ACERO_EXPORT AccumulationQueue {
public:
AccumulationQueue() : row_count_(0) {}
~AccumulationQueue() = default;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/acero/hash_join.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace acero {

using util::AccumulationQueue;

class HashJoinImpl {
class ARROW_ACERO_EXPORT HashJoinImpl {
public:
using OutputBatchCallback = std::function<Status(int64_t, ExecBatch)>;
using BuildFinishedCallback = std::function<Status(size_t)>;
Expand Down
54 changes: 28 additions & 26 deletions cpp/src/arrow/acero/hash_join_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
#include <cstdio>
#include <memory>

#include <omp.h>

namespace arrow {
namespace acero {
struct BenchmarkSettings {
Expand All @@ -56,6 +54,8 @@ struct BenchmarkSettings {
int var_length_max = 20; // Maximum length of any var length types

Expression residual_filter = literal(true);

bool stats_probe_rows = true;
};

class JoinBenchmark {
Expand Down Expand Up @@ -128,6 +128,7 @@ class JoinBenchmark {
for (ExecBatch& batch : r_batches_with_schema.batches)
r_batches_.InsertBatch(std::move(batch));

stats_.num_build_rows = settings.num_build_batches * settings.batch_size;
stats_.num_probe_rows = settings.num_probe_batches * settings.batch_size;

schema_mgr_ = std::make_unique<HashJoinSchema>();
Expand All @@ -141,14 +142,9 @@ class JoinBenchmark {
join_ = *HashJoinImpl::MakeSwiss();
}

omp_set_num_threads(settings.num_threads);
auto schedule_callback = [](std::function<Status(size_t)> func) -> Status {
#pragma omp task
{ DCHECK_OK(func(omp_get_thread_num())); }
return Status::OK();
};

scheduler_ = TaskScheduler::Make();
thread_pool_ = arrow::internal::GetCpuThreadPool();
DCHECK_OK(thread_pool_->SetCapacity(settings.num_threads));
DCHECK_OK(ctx_.Init(nullptr));

auto register_task_group_callback = [&](std::function<Status(size_t, int64_t)> task,
Expand All @@ -157,15 +153,15 @@ class JoinBenchmark {
};

auto start_task_group_callback = [&](int task_group_id, int64_t num_tasks) {
return scheduler_->StartTaskGroup(omp_get_thread_num(), task_group_id, num_tasks);
return scheduler_->StartTaskGroup(/*thread_id=*/0, task_group_id, num_tasks);
};

DCHECK_OK(join_->Init(
&ctx_, settings.join_type, settings.num_threads, &(schema_mgr_->proj_maps[0]),
&(schema_mgr_->proj_maps[1]), std::move(key_cmp), settings.residual_filter,
std::move(register_task_group_callback), std::move(start_task_group_callback),
[](int64_t, ExecBatch) { return Status::OK(); },
[](int64_t) { return Status::OK(); }));
[&](int64_t) { return Status::OK(); }));

task_group_probe_ = scheduler_->RegisterTaskGroup(
[this](size_t thread_index, int64_t task_id) -> Status {
Expand All @@ -178,25 +174,27 @@ class JoinBenchmark {
scheduler_->RegisterEnd();

DCHECK_OK(scheduler_->StartScheduling(
0 /*thread index*/, std::move(schedule_callback),
static_cast<int>(2 * settings.num_threads) /*concurrent tasks*/,
settings.num_threads == 1));
/*thread_id=*/0,
[&](std::function<Status(size_t)> task) -> Status {
return thread_pool_->Spawn([&, task]() { DCHECK_OK(task(thread_indexer_())); });
},
thread_pool_->GetCapacity(), settings.num_threads == 1));
}

void RunJoin() {
#pragma omp parallel
{
int tid = omp_get_thread_num();
#pragma omp single
DCHECK_OK(
join_->BuildHashTable(tid, std::move(r_batches_), [this](size_t thread_index) {
return scheduler_->StartTaskGroup(thread_index, task_group_probe_,
l_batches_.batch_count());
}));
}
DCHECK_OK(join_->BuildHashTable(
/*thread_id=*/0, std::move(r_batches_), [this](size_t thread_index) {
return scheduler_->StartTaskGroup(thread_index, task_group_probe_,
l_batches_.batch_count());
}));

thread_pool_->WaitForIdle();
}

std::unique_ptr<TaskScheduler> scheduler_;
ThreadIndexer thread_indexer_;
arrow::internal::ThreadPool* thread_pool_;

AccumulationQueue l_batches_;
AccumulationQueue r_batches_;
std::unique_ptr<HashJoinSchema> schema_mgr_;
Expand All @@ -205,6 +203,7 @@ class JoinBenchmark {
int task_group_probe_;

struct {
uint64_t num_build_rows;
uint64_t num_probe_rows;
} stats_;
};
Expand All @@ -219,11 +218,13 @@ static void HashJoinBasicBenchmarkImpl(benchmark::State& st,
st.ResumeTiming();
bm.RunJoin();
st.PauseTiming();
total_rows += bm.stats_.num_probe_rows;
total_rows += (settings.stats_probe_rows ? bm.stats_.num_probe_rows
: bm.stats_.num_build_rows);
}
st.ResumeTiming();
}
st.counters["rows/sec"] = benchmark::Counter(total_rows, benchmark::Counter::kIsRate);
st.counters["rows/sec"] =
benchmark::Counter(static_cast<double>(total_rows), benchmark::Counter::kIsRate);
}

template <typename... Args>
Expand Down Expand Up @@ -302,6 +303,7 @@ static void BM_HashJoinBasic_BuildParallelism(benchmark::State& st) {
settings.num_threads = static_cast<int>(st.range(0));
settings.num_build_batches = static_cast<int>(st.range(1));
settings.num_probe_batches = settings.num_threads;
settings.stats_probe_rows = false;

HashJoinBasicBenchmarkImpl(st, settings);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/acero/swiss_join_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class RowArrayAccessor {
// Read operations (row comparison, column decoding)
// can be called by multiple threads concurrently.
//
struct RowArray {
struct ARROW_ACERO_EXPORT RowArray {
RowArray() : is_initialized_(false), hardware_flags_(0) {}

Status InitIfNeeded(MemoryPool* pool, int64_t hardware_flags, const ExecBatch& batch);
Expand Down

0 comments on commit 69bfa48

Please sign in to comment.