diff --git a/CMakeLists.txt b/CMakeLists.txt index 4894b1f..d0a3063 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -162,17 +162,18 @@ function(marl_set_target_options target) # Enable all warnings if(MSVC) - target_compile_options(${target} PRIVATE - "-W4" - "/wd4127" # conditional expression is constant - ) + target_compile_options(${target} PRIVATE "-W4") else() target_compile_options(${target} PRIVATE "-Wall") endif() # Disable specific, pedantic warnings if(MSVC) - target_compile_options(${target} PRIVATE "-D_CRT_SECURE_NO_WARNINGS") + target_compile_options(${target} PRIVATE + "-D_CRT_SECURE_NO_WARNINGS" + "/wd4127" # conditional expression is constant + "/wd4324" # structure was padded due to alignment specifier + ) endif() # Treat all warnings as errors @@ -276,6 +277,7 @@ if(MARL_BUILD_TESTS) ${MARL_SRC_DIR}/blockingcall_test.cpp ${MARL_SRC_DIR}/conditionvariable_test.cpp ${MARL_SRC_DIR}/containers_test.cpp + ${MARL_SRC_DIR}/dag_test.cpp ${MARL_SRC_DIR}/defer_test.cpp ${MARL_SRC_DIR}/event_test.cpp ${MARL_SRC_DIR}/marl_test.cpp @@ -289,12 +291,14 @@ if(MARL_BUILD_TESTS) ${MARL_SRC_DIR}/ticket_test.cpp ${MARL_SRC_DIR}/waitgroup_test.cpp ${MARL_GOOGLETEST_DIR}/googletest/src/gtest-all.cc + ${MARL_GOOGLETEST_DIR}/googlemock/src/gmock-all.cc ) set(MARL_TEST_INCLUDE_DIR ${MARL_GOOGLETEST_DIR}/googletest/include/ ${MARL_GOOGLETEST_DIR}/googlemock/include/ ${MARL_GOOGLETEST_DIR}/googletest/ + ${MARL_GOOGLETEST_DIR}/googlemock/ ) add_executable(marl-unittests ${MARL_TEST_LIST}) diff --git a/include/marl/dag.h b/include/marl/dag.h new file mode 100644 index 0000000..d7a4c23 --- /dev/null +++ b/include/marl/dag.h @@ -0,0 +1,390 @@ +// Copyright 2020 The Marl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// marl::DAG<> provides an ahead of time, declarative, directed acyclic +// task graph. + +#ifndef marl_dag_h +#define marl_dag_h + +#include "containers.h" +#include "export.h" +#include "memory.h" +#include "scheduler.h" +#include "waitgroup.h" + +namespace marl { +namespace detail { +using DAGCounter = std::atomic; +template +struct DAGRunContext { + T data; + Allocator::unique_ptr counters; + + template + MARL_NO_EXPORT inline void invoke(F&& f) { + f(data); + } +}; +template <> +struct DAGRunContext { + Allocator::unique_ptr counters; + + template + MARL_NO_EXPORT inline void invoke(F&& f) { + f(); + } +}; +template +struct DAGWork { + using type = std::function; +}; +template <> +struct DAGWork { + using type = std::function; +}; +} // namespace detail + +/////////////////////////////////////////////////////////////////////////////// +// Forward declarations +/////////////////////////////////////////////////////////////////////////////// +template +class DAG; + +template +class DAGBuilder; + +template +class DAGNodeBuilder; + +/////////////////////////////////////////////////////////////////////////////// +// DAGBase +/////////////////////////////////////////////////////////////////////////////// + +// DAGBase is derived by DAG<T> and DAG<void>. It has no public API. 
+template +class DAGBase { + protected: + friend DAGBuilder; + friend DAGNodeBuilder; + + using RunContext = detail::DAGRunContext; + using Counter = detail::DAGCounter; + using NodeIndex = size_t; + using Work = typename detail::DAGWork::type; + static const constexpr size_t NumReservedNodes = 32; + static const constexpr size_t NumReservedNumOuts = 4; + static const constexpr size_t InvalidCounterIndex = ~static_cast(0); + static const constexpr NodeIndex RootIndex = 0; + + // DAG work node. + struct Node { + MARL_NO_EXPORT inline Node() = default; + MARL_NO_EXPORT inline Node(Work&& work); + + // The work to perform for this node in the graph. + Work work; + + // counterIndex if valid, is the index of the counter in the RunContext for + // this node. The counter is decremented for each completed dependency task + // (ins), and once it reaches 0, this node will be invoked. + size_t counterIndex = InvalidCounterIndex; + + // Indices for all downstream nodes. + containers::vector outs; + }; + + // initCounters() allocates and initializes the ctx->counters from + // initialCounters. + MARL_NO_EXPORT inline void initCounters(RunContext* ctx, + Allocator* allocator); + + // notify() is called whenever a dependency task (ins) has completed. + // If all dependency tasks have completed (or this is the root node) then the + // node is run, and then notify() is called on all dependee nodes (outs). + MARL_NO_EXPORT inline void notify(RunContext*, NodeIndex); + + // nodes is the full list of the nodes in the graph. + // nodes[0] is always the root node, which has no dependencies (ins). + containers::vector nodes; + + // initialCounters is a list of initial counter values to be copied to + // RunContext::counters on DAG<>::run(). + // initialCounters is indexed by Node::counterIndex, and only contains counts + // for nodes that have at least 2 dependencies (ins) - because of this the + // number of entries in initialCounters may be fewer than nodes. 
+ containers::vector initialCounters; +}; + +template +DAGBase::Node::Node(Work&& work) : work(std::move(work)) {} + +template +void DAGBase::initCounters(RunContext* ctx, Allocator* allocator) { + auto numCounters = initialCounters.size(); + ctx->counters = allocator->make_unique_n(numCounters); + for (size_t i = 0; i < numCounters; i++) { + ctx->counters.get()[i] = {initialCounters[i]}; + } +} + +template +void DAGBase::notify(RunContext* ctx, NodeIndex nodeIdx) { + Node* node = &nodes[nodeIdx]; + + // If we have multiple dependencies, decrement the counter and check whether + // we've reached 0. + if (node->counterIndex != InvalidCounterIndex) { + auto counters = ctx->counters.get(); + auto counter = --counters[node->counterIndex]; + if (counter > 0) { + // Still waiting on more dependencies to finish. + return; + } + } + + // Run this node's work. + if (node->work) { + ctx->invoke(node->work); + } + + // Then call notify() all dependees (outs). + size_t numOuts = node->outs.size(); + switch (numOuts) { + case 0: + // No dependees. + break; + case 1: + // Single dependee, just call notify directly. + notify(ctx, node->outs[0]); + break; + default: { + // 2 or more dependees. + // Schedule notify() calls for 1..n, directly call notify() for the first + // dependee, then wait on all the scheduled calls to complete. + marl::WaitGroup wg(static_cast(numOuts - 1)); + for (NodeIndex i = 1; i < numOuts; i++) { + marl::schedule([=] { + notify(ctx, node->outs[i]); + wg.done(); + }); + } + notify(ctx, node->outs[0]); + wg.wait(); + } + } +} + +/////////////////////////////////////////////////////////////////////////////// +// DAGNodeBuilder +/////////////////////////////////////////////////////////////////////////////// + +// DAGNodeBuilder is the builder interface for a DAG node. 
+template +class DAGNodeBuilder { + using NodeIndex = typename DAGBase::NodeIndex; + + public: + // then() builds and returns a new DAG node that will be invoked after this + // node has completed. + // + // F is a function that will be called when the new DAG node is invoked, with + // the signature: + // void(T) when T is not void + // or + // void() when T is void + template + MARL_NO_EXPORT inline DAGNodeBuilder then(F&&); + + private: + friend DAGBuilder; + MARL_NO_EXPORT inline DAGNodeBuilder(DAGBuilder*, NodeIndex); + DAGBuilder* builder; + NodeIndex index; +}; + +template +DAGNodeBuilder::DAGNodeBuilder(DAGBuilder* builder, NodeIndex index) + : builder(builder), index(index) {} + +template +template +DAGNodeBuilder DAGNodeBuilder::then(F&& work) { + auto node = builder->node(std::move(work)); + builder->addDependency(*this, node); + return node; +} + +/////////////////////////////////////////////////////////////////////////////// +// DAGBuilder +/////////////////////////////////////////////////////////////////////////////// +template +class DAGBuilder { + public: + // DAGBuilder constructor + MARL_NO_EXPORT inline DAGBuilder(Allocator* allocator = Allocator::Default); + + // root() returns the root DAG node. + MARL_NO_EXPORT inline DAGNodeBuilder root(); + + // node() builds and returns a new DAG node with no initial dependencies. + // The returned node must be attached to the graph in order to invoke F or any + // of the dependees of this returned node. + // + // F is a function that will be called when the new DAG node is invoked, with + // the signature: + // void(T) when T is not void + // or + // void() when T is void + template + MARL_NO_EXPORT inline DAGNodeBuilder node(F&& work); + + // node() builds and returns a new DAG node that depends on all the tasks in + // after to be completed before invoking F. 
+ // + // F is a function that will be called when the new DAG node is invoked, with + // the signature: + // void(T) when T is not void + // or + // void() when T is void + template + MARL_NO_EXPORT inline DAGNodeBuilder node( + F&& work, + std::initializer_list> after); + + // addDependency() adds parent as dependency on child. All dependencies of + // child must have completed before child is invoked. + MARL_NO_EXPORT inline void addDependency(DAGNodeBuilder parent, + DAGNodeBuilder child); + + // build() constructs and returns the DAG. No other methods of this class may + // be called after calling build(). + MARL_NO_EXPORT inline Allocator::unique_ptr> build(); + + private: + static const constexpr size_t NumReservedNumIns = 4; + using Node = typename DAG::Node; + + // The DAG being built. + Allocator::unique_ptr> dag; + + // Number of dependencies (ins) for each node in dag->nodes. + containers::vector numIns; +}; + +template +DAGBuilder::DAGBuilder(Allocator* allocator /* = Allocator::Default */) + : dag(allocator->make_unique>()), numIns(allocator) { + // Add root + dag->nodes.emplace_back(Node{}); + numIns.emplace_back(0); +} + +template +DAGNodeBuilder DAGBuilder::root() { + return DAGNodeBuilder{this, DAGBase::RootIndex}; +} + +template +template +DAGNodeBuilder DAGBuilder::node(F&& work) { + return node(std::forward(work), {}); +} + +template +template +DAGNodeBuilder DAGBuilder::node( + F&& work, + std::initializer_list> after) { + MARL_ASSERT(numIns.size() == dag->nodes.size(), + "NodeBuilder vectors out of sync"); + auto index = dag->nodes.size(); + numIns.emplace_back(0); + dag->nodes.emplace_back(Node{std::move(work)}); + auto node = DAGNodeBuilder{this, index}; + for (auto in : after) { + addDependency(in, node); + } + return node; +} + +template +void DAGBuilder::addDependency(DAGNodeBuilder parent, + DAGNodeBuilder child) { + numIns[child.index]++; + dag->nodes[parent.index].outs.push_back(child.index); +} + +template +Allocator::unique_ptr> 
DAGBuilder::build() { + auto numNodes = dag->nodes.size(); + MARL_ASSERT(numIns.size() == dag->nodes.size(), + "NodeBuilder vectors out of sync"); + for (size_t i = 0; i < numNodes; i++) { + if (numIns[i] > 1) { + auto& node = dag->nodes[i]; + node.counterIndex = dag->initialCounters.size(); + dag->initialCounters.push_back(numIns[i]); + } + } + return std::move(dag); +} + +/////////////////////////////////////////////////////////////////////////////// +// DAG<T> +/////////////////////////////////////////////////////////////////////////////// +template +class DAG : public DAGBase { + public: + using Builder = DAGBuilder; + using NodeBuilder = DAGNodeBuilder; + + // run() invokes the function of each node in the graph of the DAG, passing + // data to each, starting with the root node. All dependencies need to have + // completed their function before dependees will be invoked. + MARL_NO_EXPORT inline void run(T& data, + Allocator* allocator = Allocator::Default); +}; + +template +void DAG::run(T& arg, Allocator* allocator /* = Allocator::Default */) { + typename DAGBase::RunContext ctx{arg}; + this->initCounters(&ctx, allocator); + this->notify(&ctx, this->RootIndex); +} + +/////////////////////////////////////////////////////////////////////////////// +// DAG<void> +/////////////////////////////////////////////////////////////////////////////// +template <> +class DAG : public DAGBase { + public: + using Builder = DAGBuilder; + using NodeBuilder = DAGNodeBuilder; + + // run() invokes the function of each node in the graph of the DAG, starting + // with the root node. All dependencies need to have completed their function + // before dependees will be invoked. 
+ MARL_NO_EXPORT inline void run(Allocator* allocator = Allocator::Default); +}; + +void DAG::run(Allocator* allocator /* = Allocator::Default */) { + typename DAGBase::RunContext ctx{}; + this->initCounters(&ctx, allocator); + this->notify(&ctx, this->RootIndex); +} + +} // namespace marl + +#endif // marl_dag_h diff --git a/include/marl/memory.h b/include/marl/memory.h index a8dba4b..d80e592 100644 --- a/include/marl/memory.h +++ b/include/marl/memory.h @@ -96,12 +96,13 @@ class Allocator { // pointers returned by make_shared() and make_unique(). struct MARL_EXPORT Deleter { MARL_NO_EXPORT inline Deleter(); - MARL_NO_EXPORT inline Deleter(Allocator* allocator); + MARL_NO_EXPORT inline Deleter(Allocator* allocator, size_t count); template MARL_NO_EXPORT inline void operator()(T* object); Allocator* allocator = nullptr; + size_t count = 0; }; // unique_ptr is an alias to std::unique_ptr. @@ -134,6 +135,12 @@ class Allocator { template inline unique_ptr make_unique(ARGS&&... args); + // make_unique_n() returns an array of n new objects allocated from the + // allocator wrapped in a unique_ptr that respects the alignment of the + // type. + template + inline unique_ptr make_unique_n(size_t n, ARGS&&... args); + // make_shared() returns a new object allocated from the allocator // wrapped in a std::shared_ptr that respects the alignemnt of the type. 
template @@ -143,8 +150,12 @@ class Allocator { Allocator() = default; }; +/////////////////////////////////////////////////////////////////////////////// +// Allocator::Deleter +/////////////////////////////////////////////////////////////////////////////// Allocator::Deleter::Deleter() : allocator(nullptr) {} -Allocator::Deleter::Deleter(Allocator* allocator) : allocator(allocator) {} +Allocator::Deleter::Deleter(Allocator* allocator, size_t count) + : allocator(allocator), count(count) {} template void Allocator::Deleter::operator()(T* object) { @@ -152,12 +163,15 @@ void Allocator::Deleter::operator()(T* object) { Allocation allocation; allocation.ptr = object; - allocation.request.size = sizeof(T); + allocation.request.size = sizeof(T) * count; allocation.request.alignment = alignof(T); allocation.request.usage = Allocation::Usage::Create; allocator->free(allocation); } +/////////////////////////////////////////////////////////////////////////////// +// Allocator +/////////////////////////////////////////////////////////////////////////////// template T* Allocator::create(ARGS&&... args) { Allocation::Request request; @@ -184,14 +198,23 @@ void Allocator::destroy(T* object) { template Allocator::unique_ptr Allocator::make_unique(ARGS&&... args) { + return make_unique_n(1, std::forward(args)...); +} + +template +Allocator::unique_ptr Allocator::make_unique_n(size_t n, ARGS&&... args) { + if (n == 0) { + return nullptr; + } + Allocation::Request request; - request.size = sizeof(T); + request.size = sizeof(T) * n; request.alignment = alignof(T); request.usage = Allocation::Usage::Create; auto alloc = allocate(request); new (alloc.ptr) T(std::forward(args)...); - return unique_ptr(reinterpret_cast(alloc.ptr), Deleter{this}); + return unique_ptr(reinterpret_cast(alloc.ptr), Deleter{this, n}); } template @@ -203,7 +226,7 @@ std::shared_ptr Allocator::make_shared(ARGS&&... 
args) { auto alloc = allocate(request); new (alloc.ptr) T(std::forward(args)...); - return std::shared_ptr(reinterpret_cast(alloc.ptr), Deleter{this}); + return std::shared_ptr(reinterpret_cast(alloc.ptr), Deleter{this, 1}); } /////////////////////////////////////////////////////////////////////////////// diff --git a/src/dag_test.cpp b/src/dag_test.cpp new file mode 100644 index 0000000..f4a3742 --- /dev/null +++ b/src/dag_test.cpp @@ -0,0 +1,155 @@ +// Copyright 2020 The Marl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "marl/dag.h" + +#include "marl_test.h" + +using namespace testing; + +namespace { + +struct Data { + std::mutex mutex; + std::vector order; + + void push(std::string&& s) { + std::unique_lock lock(mutex); + order.emplace_back(std::move(s)); + } +}; + +template +std::vector slice(const std::vector& in, size_t from, size_t to) { + return {in.begin() + from, in.begin() + to}; +} + +} // namespace + +// [A] --> [B] --> [C] | +TEST_P(WithBoundScheduler, DAGChainNoArg) { + marl::DAG<>::Builder builder; + + Data data; + builder.root() + .then([&] { data.push("A"); }) + .then([&] { data.push("B"); }) + .then([&] { data.push("C"); }); + + auto dag = builder.build(); + dag->run(); + + ASSERT_THAT(data.order, ElementsAre("A", "B", "C")); +} + +// [A] --> [B] --> [C] | +TEST_P(WithBoundScheduler, DAGChain) { + marl::DAG::Builder builder; + + builder.root() + .then([](Data& data) { data.push("A"); }) + .then([](Data& data) { data.push("B"); }) + .then([](Data& data) { data.push("C"); }); + + auto dag = builder.build(); + + Data data; + dag->run(data); + + ASSERT_THAT(data.order, ElementsAre("A", "B", "C")); +} + +// /--> [A] | +// [root] --|--> [B] | +// \--> [C] | +TEST_P(WithBoundScheduler, DAGFanOutFromRoot) { + marl::DAG::Builder builder; + + auto root = builder.root(); + root.then([](Data& data) { data.push("A"); }); + root.then([](Data& data) { data.push("B"); }); + root.then([](Data& data) { data.push("C"); }); + + auto dag = builder.build(); + + Data data; + dag->run(data); + + ASSERT_THAT(data.order, UnorderedElementsAre("A", "B", "C")); +} + +// /--> [A] | +// [root] -->[N]--|--> [B] | +// \--> [C] | +TEST_P(WithBoundScheduler, DAGFanOutFromNonRoot) { + marl::DAG::Builder builder; + + auto root = builder.root(); + auto node = root.then([](Data& data) { data.push("N"); }); + node.then([](Data& data) { data.push("A"); }); + node.then([](Data& data) { data.push("B"); }); + node.then([](Data& data) { data.push("C"); }); + + auto dag = builder.build(); + + Data 
data; + dag->run(data); + + ASSERT_THAT(data.order, UnorderedElementsAre("N", "A", "B", "C")); + ASSERT_EQ(data.order[0], "N"); + ASSERT_THAT(slice(data.order, 1, 4), UnorderedElementsAre("A", "B", "C")); +} + +// /--> [A0] --\ /--> [C0] --\ /--> [E0] --\ | +// [root] --|--> [A1] --|-->[B]--|--> [C1] --|-->[D]--|--> [E1] --|-->[F] | +// \--> [C2] --/ |--> [E2] --| | +// \--> [E3] --/ | +TEST_P(WithBoundScheduler, DAGFanOutFanIn) { + marl::DAG::Builder builder; + + auto root = builder.root(); + auto a0 = root.then([](Data& data) { data.push("A0"); }); + auto a1 = root.then([](Data& data) { data.push("A1"); }); + + auto b = builder.node([](Data& data) { data.push("B"); }, {a0, a1}); + + auto c0 = b.then([](Data& data) { data.push("C0"); }); + auto c1 = b.then([](Data& data) { data.push("C1"); }); + auto c2 = b.then([](Data& data) { data.push("C2"); }); + + auto d = builder.node([](Data& data) { data.push("D"); }, {c0, c1, c2}); + + auto e0 = d.then([](Data& data) { data.push("E0"); }); + auto e1 = d.then([](Data& data) { data.push("E1"); }); + auto e2 = d.then([](Data& data) { data.push("E2"); }); + auto e3 = d.then([](Data& data) { data.push("E3"); }); + + builder.node([](Data& data) { data.push("F"); }, {e0, e1, e2, e3}); + + auto dag = builder.build(); + + Data data; + dag->run(data); + + ASSERT_THAT(data.order, + UnorderedElementsAre("A0", "A1", "B", "C0", "C1", "C2", "D", "E0", + "E1", "E2", "E3", "F")); + ASSERT_THAT(slice(data.order, 0, 2), UnorderedElementsAre("A0", "A1")); + ASSERT_THAT(data.order[2], "B"); + ASSERT_THAT(slice(data.order, 3, 6), UnorderedElementsAre("C0", "C1", "C2")); + ASSERT_THAT(data.order[6], "D"); + ASSERT_THAT(slice(data.order, 7, 11), + UnorderedElementsAre("E0", "E1", "E2", "E3")); + ASSERT_THAT(data.order[11], "F"); +}