Fix the allocator for Flow Graph critical task creation #1596

Open · wants to merge 3 commits into master
5 changes: 3 additions & 2 deletions include/oneapi/tbb/detail/_flow_graph_impl.h
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2024 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -526,7 +526,8 @@ inline graph_task* prioritize_task(graph& g, graph_task& gt) {
//! priority queue, and a new critical task is created to take and execute a work item with
//! the highest known priority. The reference counting responsibility is transferred to
//! the new task.
d1::task* critical_task = gt.my_allocator.new_object<priority_task_selector>(g.my_priority_queue, gt.my_allocator);
d1::small_object_allocator allocator;
d1::task* critical_task = allocator.new_object<priority_task_selector>(g.my_priority_queue, allocator);
Comment on lines +529 to +530
Contributor

At the very least I would add a comment on why this is necessary, explaining why graph_task's allocator cannot be used here. After all, its purpose is to cache allocations. BTW, if it cannot be used, do we need to have it in graph_task at all?

Contributor Author

Sure, I agree that the comment is required.
Regarding the necessity of the allocator inside graph_task: it is required not only for allocating critical tasks, but also for deallocating the graph_task itself after execution (even when no critical tasks are involved).
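To illustrate the split being described, here is a minimal self-contained model in plain C++ (not the oneTBB sources; `ThreadPool`, `TaskModel`, `make_wrapper`, and `this_thread_pool` are invented names): each thread owns its own small-object pool, only the owning thread may allocate from it, and a task keeps a pointer to the pool it came from so it can release itself after execution. The wrapper therefore takes a pool bound to the thread creating it rather than reusing the wrapped task's pool.

```cpp
// Minimal model of the allocator ownership discussed above; illustrative only, not oneTBB code.
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <new>
#include <thread>

struct ThreadPool {                           // stand-in for a per-thread small object pool
    std::thread::id owner = std::this_thread::get_id();

    void* allocate(std::size_t bytes) {
        // Mirrors the new assertion in small_object_pool.cpp:
        // only the owning thread may allocate from this pool.
        assert(owner == std::this_thread::get_id() &&
               "attempt to allocate from another thread's pool");
        return ::operator new(bytes);
    }
    void deallocate(void* p) { ::operator delete(p); }   // cross-thread frees are allowed here
};

thread_local ThreadPool this_thread_pool;     // every thread gets its own pool

struct TaskModel {
    ThreadPool* my_pool;                      // analogous role to graph_task::my_allocator:
                                              // kept so the task can release itself after it runs
    explicit TaskModel(ThreadPool* pool) : my_pool(pool) {}
    void finish() {
        ThreadPool* pool = my_pool;
        this->~TaskModel();
        pool->deallocate(this);
    }
};

// The point of the fix: the wrapper is allocated from the *current* thread's pool,
// not from the wrapped task's pool, which may belong to a different thread.
TaskModel* make_wrapper(TaskModel& /*wrapped*/) {
    void* mem = this_thread_pool.allocate(sizeof(TaskModel));
    return new (mem) TaskModel(&this_thread_pool);
}

int main() {
    // The "graph task" is allocated by the main thread from the main thread's pool.
    void* mem = this_thread_pool.allocate(sizeof(TaskModel));
    TaskModel* graph_task = new (mem) TaskModel(&this_thread_pool);

    std::thread worker([&] {
        // Another thread creates the critical-task wrapper; reusing graph_task->my_pool
        // here would trip the assertion, so the wrapper takes the worker's own pool.
        TaskModel* wrapper = make_wrapper(*graph_task);
        wrapper->finish();                    // the wrapper frees itself through its own pool
    });
    worker.join();

    graph_task->finish();                     // the graph task still frees itself via its pool
    std::puts("ok");
    return 0;
}
```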

__TBB_ASSERT( critical_task, "bad_alloc?" );
g.my_priority_queue.push(&gt);
using tbb::detail::d1::submit;
4 changes: 3 additions & 1 deletion src/tbb/small_object_pool.cpp
@@ -1,5 +1,5 @@
/*
Copyright (c) 2020-2021 Intel Corporation
Copyright (c) 2020-2025 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -40,6 +40,8 @@ void* __TBB_EXPORTED_FUNC allocate(d1::small_object_pool*& allocator, std::size_
// TODO: optimize if the allocator contains a valid pool.
auto tls = governor::get_thread_data();
auto pool = tls->my_small_object_pool;
__TBB_ASSERT(allocator == nullptr || pool == allocator,
"An attempt was made to allocate using another thread's small memory pool");
return pool->allocate_impl(allocator, number_of_bytes);
}

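A small sketch of the invariant this assertion encodes (illustrative standard C++, not the oneTBB implementation; `thread_pool`, `my_thread_pool`, and the free-standing `allocate` are stand-ins): the handle passed by reference is lazily bound to the calling thread's pool on first use, so a non-null handle that does not match the current thread's pool means the allocation is being requested on behalf of another thread.

```cpp
// Illustration of the invariant behind the assert; not the oneTBB implementation.
#include <cassert>
#include <cstddef>
#include <new>

struct thread_pool { /* stand-in for a per-thread small object pool */ };
thread_local thread_pool my_thread_pool;

// 'handle' plays the role of the d1::small_object_pool*& parameter: it caches
// the pool that served the first allocation made through it.
void* allocate(thread_pool*& handle, std::size_t bytes) {
    thread_pool* pool = &my_thread_pool;      // the calling thread's pool
    assert((handle == nullptr || handle == pool) &&
           "An attempt was made to allocate using another thread's small memory pool");
    handle = pool;                            // lazily bind the handle on first use
    return ::operator new(bytes);             // the real code allocates from the pool
}

int main() {
    thread_pool* handle = nullptr;
    void* a = allocate(handle, 64);           // binds 'handle' to this thread's pool
    void* b = allocate(handle, 64);           // same thread again: the assertion holds
    ::operator delete(a);
    ::operator delete(b);
    // Calling allocate(handle, ...) from a different thread now would trip the assert,
    // which is what happened when the wrapper was created through another task's allocator.
    return 0;
}
```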
24 changes: 23 additions & 1 deletion test/tbb/test_function_node.cpp
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2024 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -806,3 +806,25 @@ TEST_CASE("test function_node try_put_and_wait") {
test_try_put_and_wait();
}
#endif

// There was an issue when the critical task wrapper was allocated using the small object pool
// of the task being wrapped. Since the original task is created under the aggregator, there is no
// guarantee that the thread that requested the task creation is the same thread that actually created it.
// A mismatch between memory pools caused an internal assertion failure while deallocating the task.
//! \brief \ref regression
TEST_CASE("test critical tasks memory pool correctness") {
using node_type = tbb::flow::function_node<int, tbb::flow::continue_msg>;
constexpr int num_iterations = 10000;
int num_calls = 0;
auto node_body = [&](int) { ++num_calls; };

tbb::flow::graph g;
node_type node(g, tbb::flow::serial, node_body, tbb::flow::node_priority_t{1});

for (int i = 0; i < num_iterations; ++i) {
node.try_put(i);
}

g.wait_for_all();
REQUIRE_MESSAGE(num_calls == num_iterations, "Incorrect number of body executions");
}
Comment on lines +815 to +830
Contributor

I wonder if we can make the test more lightweight.

How reproducible is this? From what I've understood, the task should go into the node's queue in order to reproduce the behavior. Maybe we can decrease num_iterations at the expense of increasing the amount of compute done in the node's body, using some useless-work pattern such as putting the thread to sleep or introducing an empty loop that is not elided by the compiler. IMHO, that would put less pressure on the system. But will it still be reproducible enough?

Contributor Author

I will investigate whether it is possible to make the test more lightweight.
As far as I understand, the root cause of the issue is the aggregator of function_node, and we need to maximize the pressure on the aggregator to increase the chance that one thread requests creation of the task, another thread creates it under the aggregator, and control then returns to the first thread. So I am not sure that adding work to the node body or blocking it would help.
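For reference, a minimal sketch of the lighter-weight variant discussed above, written as a standalone program instead of a doctest case (the flow-graph API mirrors the test in the diff; the reduced iteration count and the spin loop are illustrative assumptions, not a verified reproducer):

```cpp
// Sketch of the suggested lighter variant: fewer puts, more work per body call.
// Whether this still reproduces the race reliably is exactly the open question.
#include <cstdio>
#include <oneapi/tbb/flow_graph.h>

int main() {
    using node_type = tbb::flow::function_node<int, tbb::flow::continue_msg>;
    constexpr int num_iterations = 1000;       // reduced from 10000 in the PR's test
    int num_calls = 0;

    auto node_body = [&](int) {
        volatile int sink = 0;                 // busy work the compiler cannot elide
        for (int i = 0; i < 1000; ++i) sink = sink + i;
        ++num_calls;
    };

    tbb::flow::graph g;
    node_type node(g, tbb::flow::serial, node_body, tbb::flow::node_priority_t{1});

    for (int i = 0; i < num_iterations; ++i) {
        node.try_put(i);
    }
    g.wait_for_all();

    std::printf("calls: %d (expected %d)\n", num_calls, num_iterations);
    return 0;
}
```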
