Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the allocator for Flow Graph critical tasks creating #1596

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

kboyarinov
Copy link
Contributor

@kboyarinov kboyarinov commented Jan 8, 2025

Description

There is a bug in the current implementation of Flow Graph while using functional node concurrency limits together with node priorities.
Current algorithm for creating tasks in function node is the following:

  • If the concurrency limit is not reached, two tasks are created - graph_task for executing the body and priority_task_selector task for prioritization. Both tasks are created by the same thread.
  • If the concurrency limit is reached, the postponed items are stored in the queue.
  • Once the resources of the node are available, the thread X tries to get the postponed item from the queue using the aggregator.
  • Since the aggregator is used, another thread Y can take the "aggregated" work and hence, create the graph_task using its small object pool.
  • Once the control goes back to thread X, it creates a priority_task_selector wrapper. The bug in the current implementation is that the small object pool, associated with the underlying task (created by thread Y) is used instead of small object pool of thread X.

Also this patch adds extra assert for small object pool to ensure that the pool, for which the allocation is requested is the same as the TLS pool.

Fixes #1595

Type of change

Choose one or multiple, leave empty if none of the other choices apply

Add a respective label(s) to PR if you have permissions

  • bug fix - change that fixes an issue
  • new feature - change that adds functionality
  • tests - change in tests
  • infrastructure - change in infrastructure and CI
  • documentation - documentation update

Tests

  • added - required for new features and some bug fixes
  • not needed

Documentation

  • updated in # - add PR number
  • needs to be updated
  • not needed

Breaks backward compatibility

  • Yes
  • No
  • Unknown

Notify the following users

List users with @ to send notifications

Other information

src/tbb/small_object_pool.cpp Outdated Show resolved Hide resolved
Comment on lines +815 to +830
TEST_CASE("test critical tasks memory pool correctness") {
using node_type = tbb::flow::function_node<int, tbb::flow::continue_msg>;
constexpr int num_iterations = 10000;
int num_calls = 0;
auto node_body = [&](int) { ++num_calls; };

tbb::flow::graph g;
node_type node(g, tbb::flow::serial, node_body, tbb::flow::node_priority_t{1});

for (int i = 0; i < num_iterations; ++i) {
node.try_put(i);
}

g.wait_for_all();
REQUIRE_MESSAGE(num_calls == num_iterations, "Incorrect number of body executions");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can make the test more lightweight.

How reproducible is this? From what I've understood the task should go into the node's queue in order to reproduce the behavior. Maybe, we can decrease the num_iterations at the expense of increasing the amount of compute done in node's body using some useless work pattern such as putting thread to sleep or introducing empty loop which is not elided by the compiler. IMHO, that would induce less pressure on the system. But will it still be reproducible enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will investigate if it is possible to change the test to make it more lightweight.
As far as I understand, the root cause of the issue is the aggregator of function_node and we need to maximize the pressure on the aggregator to increase the chance that one thread will request creating of the task, another thread will create it under the aggregator and return the control to the first thread. So I am not sure that increasing or blocking the node body would help.

Comment on lines +529 to +530
d1::small_object_allocator allocator;
d1::task* critical_task = allocator.new_object<priority_task_selector>(g.my_priority_queue, allocator);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the very least I would add a comment why it is necessary, explaining why graph_task's allocator cannot be used here. After all, its purpose is to cache allocations. BTW, if it cannot be used, do we need to have it in graph_task at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I agree that the comment is required.
Regarding the necessity of the allocator inside of the graph_task, it is required not only for allocating the critical tasks, but also for deallocation of the graph_task itself after the execution (even in the case without critical tasks).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

flow_graph: Assertion m_private_counter >= 0 failed
2 participants