
Add marl::DAG - a AoT declarative task graph #167

Merged · 1 commit merged into google:main on Jul 10, 2020

Conversation

ben-clayton (Contributor):
No description provided.

Review thread on:

        notify(ctx, node->outs[0]);
        break;
      default: {
        // 2 or more dependees.

benvanik:
Why not notify all here? Wouldn't you want the fan-out to be able to start all dependees immediately?

ben-clayton (Contributor, author):

> why not notify all here

All are being notified, but concurrently.

> wouldn't you want the fan-out to be able to start all dependees immediately

If "immediately" means "on this thread", the reason is simple: notify() may call invoke(), which blocks until the work is finished. We don't want to run all the dependees serially.

benvanik (Jul 8, 2020):

Ahh that makes sense!

Though notify() may also return early in the case of a join, and each dependee's notify() will schedule() a task that pins a fiber waiting on the waitgroup. If the dependee's notify() doesn't return early and instead invokes, then the waitgroup will also block - so if A -> B | C, the waitgroup won't complete until max(B, C) (which, if B and C are of vastly different durations, will be bad).

ben-clayton (author):

Took me a while to understand what you're saying - yes - this is correct. In other words:

The wg.wait() will block until all dependees have entirely finished. There's the potential for a fiber switch for every node that fans out. The implementation - schedule the other dependees, call notify() on this fiber, then wg.wait() - is an attempt to avoid an unnecessary fiber switch. It picks an arbitrary dependee to call notify() on directly; we could be smarter here, say by picking the dependee with the most child nodes, but that ups the complexity a bit.

> then the waitgroup won't complete until max(B, C) (which, if B and C are of vastly different durations will be bad).

Why is it bad? The node has to wait for all dependees to finish, and the call to wait() will either be a no-op if the schedule()s have already finished, or a fiber switch to go and help out with the tasks.
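A minimal sketch of the schedule-then-notify-then-wait pattern being discussed (illustrative only - node->outs, notify(), and the per-node WaitGroup are taken from the snippets in this review, not marl's exact internals):

    // Fan-out with 2+ dependees: schedule all but one, run one inline on
    // this fiber, then wait for the scheduled ones to finish.
    marl::WaitGroup wg(node->outs.size() - 1);
    for (size_t i = 1; i < node->outs.size(); i++) {
      auto out = node->outs[i];
      marl::schedule([=] {
        notify(ctx, out);  // may invoke() the dependee, running it to completion
        wg.done();
      });
    }
    notify(ctx, node->outs[0]);  // arbitrary dependee, run on this fiber
    wg.wait();  // no-op if already done, else fiber-switch to help with the tasks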

ben-clayton (author, Jul 9, 2020):

Thinking about this some more, I think I see what you're getting at. I can do the dependee-count decrement and test before the marl::schedule() call. That way we don't schedule a whole load of tiny tasks that just decrement an atomic, for the fan-in case.
I'll do this tomorrow morning.
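A sketch of that suggested change (hypothetical names - a per-node atomic counter of unfinished dependencies is assumed):

    // Decrement each dependee's remaining-dependency counter *before*
    // scheduling, so only dependees that are actually ready get a task.
    for (auto out : node->outs) {
      // fetch_sub returns the previous value; 1 means this was the last
      // dependency the dependee was waiting on.
      if (out->remaining.fetch_sub(1, std::memory_order_acq_rel) == 1) {
        marl::schedule([=] { invoke(ctx, out); });
      }
      // Otherwise: no task is scheduled; a later-finishing dependency
      // will observe the counter hit zero and kick the node.
    }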

benvanik:

Sorry, it took me a while to formulate even that :P but I think you got it: trying to avoid creating additional no-op work (the schedule()s/done()s of non-ready work) while also trying to minimize the lifetime of all tasks, so that as few fibers as possible are left waiting.

IIUC, I could have a chain of N nodes where, by construction, I end up with N-1 live fibers sitting at wg.wait() as a depth-first style notify-and-invoke-inline chain continues along. Since the contract for the DAG is really just that all dependees are notified and allowed to proceed - not that all dependees need to have completed execution - that feels suboptimal. Mapping this to std::condition_variable, I'd want to perform a notify_all() here and then immediately return, not caring who was woken or how long they took to do their work (or whether they themselves performed additional notifications).

I think your suggestion may solve this (if the wg goes away), and exploits the fact that you know what you are notifying (other DAG nodes within the same DAG) and can use that information freely.

ben-clayton (author):

> IIUC, I could have a chain of N nodes where, by construction, I end up with N-1 live fibers sitting at wg.wait() as a depth-first style notify-and-invoke-inline chain continues along. Since the contract for the DAG is really just that all dependees are notified and allowed to proceed - not that all dependees need to have completed execution - that feels suboptimal.

Right - gotcha. You're spot on. I've added some follow-up changes that should dramatically reduce the number of fiber switches. Please take a look.

> I think your suggestion may solve this (if the wg goes away), and exploits the fact that you know what you are notifying (other DAG nodes within the same DAG) and can use that information freely.

So we still have one WaitGroup, but it is shared by the entire DAG execution.
DAG::run() still needs to wait until all the nodes have been processed, and we definitely want to process fan-outs concurrently, so a synchronisation primitive is required. Fortunately, because each node does its work before kicking its children, and the node doesn't need to wait on the children to finish (with the exception of the root), we can drop the wg.wait() from each of the nodes.
We still need one call to wg.wait() in DAG::run() so we don't return from run() before the graph has fully processed.
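A rough sketch of that shape (illustrative, not the PR's exact code - RunContext and the exact counting scheme are assumptions):

    // One WaitGroup shared by the whole DAG run: nodes add/done as they
    // are kicked and finish; only DAG::run() ever waits.
    void DAG::run(Data& data) {
      marl::WaitGroup wg(1);    // account for the root node
      RunContext ctx{&data, wg};
      invoke(&ctx, root);       // invoke() calls wg.add(1) for each dependee
                                // it schedules, and wg.done() when its own
                                // node's work completes
      wg.wait();                // return only once the graph has fully processed
    }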

This is much nicer. Thank you!

benvanik:

Looks awesome!

Review thread on:

    auto a0 = root.then([](Data& data) { data.push("A0"); });
    auto a1 = root.then([](Data& data) { data.push("A1"); });

    auto b = builder.node([](Data& data) { data.push("B"); }, {a0, a1});

Reviewer:
This is really nice!
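For context, a fuller usage sketch of this builder API. The DAGBuilder, root(), build(), and run() entry points follow this PR's tests, but treat the details (the Data type, the scheduler setup) as illustrative assumptions:

    #include "marl/dag.h"
    #include "marl/defer.h"
    #include "marl/scheduler.h"

    #include <mutex>
    #include <string>
    #include <vector>

    // Stand-in for the tests' Data type: an order log that is safe to
    // push to from concurrently-running nodes.
    struct Data {
      std::mutex mutex;
      std::vector<std::string> order;
      void push(const char* s) {
        std::lock_guard<std::mutex> lock(mutex);
        order.push_back(s);
      }
    };

    int main() {
      marl::Scheduler scheduler(marl::Scheduler::Config::allCores());
      scheduler.bind();
      defer(scheduler.unbind());

      marl::DAGBuilder<Data&> builder;
      auto root = builder.root();
      // A0 and A1 fan out from the root and may run concurrently...
      auto a0 = root.then([](Data& data) { data.push("A0"); });
      auto a1 = root.then([](Data& data) { data.push("A1"); });
      // ...and B fans in: it runs only after both A0 and A1 complete.
      builder.node([](Data& data) { data.push("B"); }, {a0, a1});

      auto dag = builder.build();
      Data data;
      dag->run(data);  // blocks until the whole graph has executed
      // data.order now holds "A0" and "A1" (in either order), then "B".
    }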


Review thread on:

    // Run this node's work.
    if (node->work) {
      ctx->invoke(node->work);

Reviewer:
What if the work enqueues tasks of its own? Would the work just use a waitgroup / parallelize like normal (in which case invoke here wouldn't return until the join completed)? That seems pretty nice.

ben-clayton (author):

Yup. The work function is free to schedule new tasks and wait on them if it so wishes. The contract is simple: once the work function returns, the DAG node is complete and its dependees will execute as soon as possible. If you want to run sub-DAGs from a work function, you can.
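For example, a node's work can fan out its own tasks and join them before returning (a hedged sketch - marl::schedule, marl::WaitGroup, and defer are marl primitives; the node body is illustrative):

    // A DAG node whose work parallelizes internally. The node is only
    // considered complete once this lambda returns, i.e. after wg.wait().
    auto node = root.then([](Data& data) {
      constexpr int numTasks = 8;
      marl::WaitGroup wg(numTasks);
      for (int i = 0; i < numTasks; i++) {
        marl::schedule([=] {
          defer(wg.done());
          // ... do one slice of this node's work ...
        });
      }
      wg.wait();  // dependees are not notified until this returns
      data.push("done");
    });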

@ben-clayton changed the title from "WIP: Add marl::DAG - a AoT declarative task graph" to "Add marl::DAG - a AoT declarative task graph" on Jul 9, 2020
@ben-clayton marked this pull request as ready for review on July 9, 2020 at 11:15
Review thread on:

    @@ -203,7 +226,7 @@ std::shared_ptr<T> Allocator::make_shared(ARGS&&... args) {
       auto alloc = allocate(request);
       new (alloc.ptr) T(std::forward<ARGS>(args)...);
    -  return std::shared_ptr<T>(reinterpret_cast<T*>(alloc.ptr), Deleter{this});
    +  return std::shared_ptr<T>(reinterpret_cast<T*>(alloc.ptr), Deleter{this, 1});

Reviewer:
I had not noticed this before, but: std::shared_ptr<> typically allocates more memory internally (its control block), so you may end up with a heap allocation that is not caught by your allocator. Not sure how much this matters, but it's something I noticed.

ben-clayton (author):

Great catch. Yes - I hadn't thought about this either, but you're right. Unfortunately there's no make_shared that takes an allocator. Grr - looks like I'm going to have to roll my own ref-counted types.

Mentioning #131 so this isn't forgotten.

ben-clayton (author):

@AWoloszyn just pointed me at std::allocate_shared which looks like it might do exactly what we need. Thanks Andrew!
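For reference, std::allocate_shared takes a type satisfying the standard Allocator requirements, so a thin adapter over marl::Allocator would route both the object and the shared_ptr control block through it. A hedged sketch (the adapter is hypothetical; marl::Allocator's allocate()/free() and the Allocation::Request fields are assumed from marl/memory.h):

    #include <cstddef>
    #include <memory>
    #include "marl/memory.h"

    // Hypothetical adapter satisfying the C++ Allocator requirements,
    // routing all allocations through a marl::Allocator.
    template <typename T>
    struct StlAllocator {
      using value_type = T;

      marl::Allocator* allocator;

      explicit StlAllocator(marl::Allocator* a) : allocator(a) {}
      template <typename U>
      StlAllocator(const StlAllocator<U>& o) : allocator(o.allocator) {}

      T* allocate(size_t n) {
        marl::Allocation::Request request;
        request.size = sizeof(T) * n;
        request.alignment = alignof(T);
        return reinterpret_cast<T*>(allocator->allocate(request).ptr);
      }
      void deallocate(T* ptr, size_t n) {
        marl::Allocation allocation;
        allocation.ptr = ptr;
        allocation.request.size = sizeof(T) * n;
        allocation.request.alignment = alignof(T);
        allocator->free(allocation);
      }
    };

    template <typename T, typename U>
    bool operator==(const StlAllocator<T>& a, const StlAllocator<U>& b) {
      return a.allocator == b.allocator;
    }
    template <typename T, typename U>
    bool operator!=(const StlAllocator<T>& a, const StlAllocator<U>& b) {
      return a.allocator != b.allocator;
    }

    // Usage: both T and the control block come from 'allocator'.
    // auto ptr = std::allocate_shared<T>(StlAllocator<T>(allocator), args...);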

@ben-clayton merged commit 3448974 into google:main on Jul 10, 2020
@ben-clayton deleted the dag branch on July 10, 2020 at 19:46