Skip to content

Commit

Permalink
DPL: get rid of simplified CompletionPolicy (AliceO2Group#12450)
Browse files Browse the repository at this point in the history
The full blown version has been there for a while now, and in any case
it's required if one wants to access the oldest possible timeframe.
  • Loading branch information
ktf authored Dec 18, 2023
1 parent 7fe5ab6 commit 08e4115
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class TPCSectorCompletionPolicy
return std::regex_match(device.name.begin(), device.name.end(), std::regex(expression.c_str()));
};

auto callback = [bRequireAll = mRequireAll, inputMatchers = mInputMatchers, externalInputMatchers = mExternalInputMatchers, pTpcSectorMask = mTpcSectorMask, orderCheck = mOrderCheck](framework::InputSpan const& inputs) -> framework::CompletionPolicy::CompletionOp {
auto callback = [bRequireAll = mRequireAll, inputMatchers = mInputMatchers, externalInputMatchers = mExternalInputMatchers, pTpcSectorMask = mTpcSectorMask, orderCheck = mOrderCheck](framework::InputSpan const& inputs, auto const&, auto&) -> framework::CompletionPolicy::CompletionOp {
unsigned long tpcSectorMask = pTpcSectorMask ? *pTpcSectorMask : 0xFFFFFFFFF;
std::bitset<NSectors> validSectors = 0;
bool haveMatchedInput = false;
Expand Down
9 changes: 2 additions & 7 deletions Framework/Core/include/Framework/CompletionPolicy.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,21 @@ struct CompletionPolicy {

using Matcher = std::function<bool(DeviceSpec const& device)>;
using InputSetElement = DataRef;
using Callback = std::function<CompletionOp(InputSpan const&)>;
using CallbackFull = std::function<CompletionOp(InputSpan const&, std::vector<InputSpec> const&, ServiceRegistryRef&)>;
using CallbackConfigureRelayer = std::function<void(DataRelayer&)>;

/// Constructor
CompletionPolicy()
: name{}, matcher{}, callback{} {}
: name{}, matcher{}, callbackFull{} {}
/// Constructor for emplace_back
CompletionPolicy(std::string _name, Matcher _matcher, Callback _callback, bool _balanceChannels = true)
: name(std::move(_name)), matcher(std::move(_matcher)), callback(std::move(_callback)), callbackFull{nullptr}, balanceChannels{_balanceChannels} {}
CompletionPolicy(std::string _name, Matcher _matcher, CallbackFull _callback, bool _balanceChannels = true)
: name(std::move(_name)), matcher(std::move(_matcher)), callback(nullptr), callbackFull{std::move(_callback)}, balanceChannels{_balanceChannels} {}
: name(std::move(_name)), matcher(std::move(_matcher)), callbackFull{std::move(_callback)}, balanceChannels{_balanceChannels} {}

/// Name of the policy itself.
std::string name = "";
/// Callback to be used to understand if the policy should apply
/// to the given device.
Matcher matcher = nullptr;
/// Actual policy which decides what to do with a partial InputRecord.
Callback callback = nullptr;
/// Actual policy which decides what to do with a partial InputRecord, extended version
CallbackFull callbackFull = nullptr;
/// A callback which allows you to configure the behavior of the data relayer associated
Expand Down
17 changes: 7 additions & 10 deletions Framework/Core/src/CompletionPolicyHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
#include <cassert>
#include <regex>

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpedantic"

namespace o2::framework
{

Expand All @@ -35,7 +32,7 @@ CompletionPolicy CompletionPolicyHelpers::defineByNameOrigin(std::string const&

auto originReceived = std::make_shared<std::vector<uint64_t>>();

auto callback = [originReceived, origin, op](InputSpan const& inputRefs) -> CompletionPolicy::CompletionOp {
auto callback = [originReceived, origin, op](InputSpan const& inputRefs, std::vector<InputSpec> const&, ServiceRegistryRef&) -> CompletionPolicy::CompletionOp {
// update list of the start times of inputs with origin @origin
for (auto& ref : inputRefs) {
if (ref.header != nullptr) {
Expand Down Expand Up @@ -77,7 +74,7 @@ CompletionPolicy CompletionPolicyHelpers::defineByName(std::string const& name,
auto matcher = [name](DeviceSpec const& device) -> bool {
return std::regex_match(device.name.begin(), device.name.end(), std::regex(name));
};
auto callback = [op](InputSpan const&) -> CompletionPolicy::CompletionOp {
auto callback = [op](InputSpan const&, std::vector<InputSpec> const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
return op;
};
switch (op) {
Expand Down Expand Up @@ -108,7 +105,8 @@ CompletionPolicy CompletionPolicyHelpers::defineByName(std::string const& name,

CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, CompletionPolicy::Matcher matcher)
{
auto callback = [](InputSpan const& inputs) -> CompletionPolicy::CompletionOp {
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
assert(inputs.size() == specs.size());
for (auto& input : inputs) {
if (input.header == nullptr) {
return CompletionPolicy::CompletionOp::Wait;
Expand All @@ -123,7 +121,7 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAllOrdered(const char* name
{
auto callbackFull = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
auto& decongestionService = ref.get<DecongestionService>();
decongestionService.orderedCompletionPolicyActive = 1;
decongestionService.orderedCompletionPolicyActive = true;
for (auto& input : inputs) {
if (input.header == nullptr) {
return CompletionPolicy::CompletionOp::Wait;
Expand Down Expand Up @@ -199,7 +197,7 @@ CompletionPolicy CompletionPolicyHelpers::consumeExistingWhenAny(const char* nam

CompletionPolicy CompletionPolicyHelpers::consumeWhenAny(const char* name, CompletionPolicy::Matcher matcher)
{
auto callback = [](InputSpan const& inputs) -> CompletionPolicy::CompletionOp {
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
for (auto& input : inputs) {
if (input.header != nullptr) {
return CompletionPolicy::CompletionOp::Consume;
Expand Down Expand Up @@ -289,7 +287,7 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAnyWithAllConditions(std::s

CompletionPolicy CompletionPolicyHelpers::processWhenAny(const char* name, CompletionPolicy::Matcher matcher)
{
auto callback = [](InputSpan const& inputs) -> CompletionPolicy::CompletionOp {
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
size_t present = 0;
for (auto& input : inputs) {
if (input.header != nullptr) {
Expand All @@ -307,4 +305,3 @@ CompletionPolicy CompletionPolicyHelpers::processWhenAny(const char* name, Compl
}

} // namespace o2::framework
#pragma GCC diagnostic pop
13 changes: 5 additions & 8 deletions Framework/Core/src/DataRelayer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,9 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
notDirty++;
continue;
}
if (!mCompletionPolicy.callbackFull) {
throw runtime_error_f("Completion police %s has no callback set", mCompletionPolicy.name.c_str());
}
auto partial = getPartialRecord(li);
// TODO: get the data ref from message model
auto getter = [&partial](size_t idx, size_t part) {
Expand All @@ -692,14 +695,8 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
return partial[idx].size();
};
InputSpan span{getter, nPartsGetter, static_cast<size_t>(partial.size())};
CompletionPolicy::CompletionOp action;
if (mCompletionPolicy.callback) {
action = mCompletionPolicy.callback(span);
} else if (mCompletionPolicy.callbackFull) {
action = mCompletionPolicy.callbackFull(span, mInputs, mContext);
} else {
throw runtime_error_f("Completion police %s has no callback set", mCompletionPolicy.name.c_str());
}
CompletionPolicy::CompletionOp action = mCompletionPolicy.callbackFull(span, mInputs, mContext);

auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
auto timeslice = std::get_if<uint64_t>(&variables.get(0));
switch (action) {
Expand Down
9 changes: 7 additions & 2 deletions Framework/Core/test/test_CompletionPolicy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <catch_amalgamated.hpp>
#include "Framework/CompletionPolicy.h"
#include "Framework/CompletionPolicyHelpers.h"
#include "Framework/ServiceRegistry.h"
#include "Headers/DataHeader.h"
#include "Headers/NameHeader.h"
#include "Framework/CompletionPolicy.h"
Expand Down Expand Up @@ -39,7 +40,9 @@ TEST_CASE("TestCompletionPolicy_callback")
return true;
};

auto callback = [&stack](InputSpan const& inputRefs) {
ServiceRegistry services;

auto callback = [&stack](InputSpan const& inputRefs, std::vector<InputSpec> const&, ServiceRegistryRef&) {
for (auto const& ref : inputRefs) {
auto const* header = CompletionPolicyHelpers::getHeader<o2::header::DataHeader>(ref);
REQUIRE(header == reinterpret_cast<o2::header::DataHeader*>(stack.data()));
Expand All @@ -53,7 +56,9 @@ TEST_CASE("TestCompletionPolicy_callback")
{"test", matcher, callback}};
CompletionPolicy::InputSetElement ref{nullptr, reinterpret_cast<const char*>(stack.data()), nullptr};
InputSpan const& inputs{[&ref](size_t) { return ref; }, 1};
std::vector<InputSpec> specs;
ServiceRegistryRef servicesRef{services};
for (auto& policy : policies) {
policy.callback(inputs);
policy.callbackFull(inputs, specs, servicesRef);
}
}
2 changes: 1 addition & 1 deletion Framework/Core/test/test_StaggeringWorkflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void customize(std::vector<o2::framework::CompletionPolicy>& policies)
// search for spec names starting with "processor"
return spec.name.find("processor") == 0;
},
[](auto const&) { return o2::framework::CompletionPolicy::CompletionOp::Consume; }});
[](auto const&, auto const&, auto &) { return o2::framework::CompletionPolicy::CompletionOp::Consume; }});
}

#include "Framework/runDataProcessing.h"
Expand Down

0 comments on commit 08e4115

Please sign in to comment.