Skip to content

Commit

Permalink
An advanced example of QC topology (#88)
Browse files Browse the repository at this point in the history
  • Loading branch information
knopers8 authored Nov 15, 2018
1 parent c7bcb35 commit 6d07be6
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 5 deletions.
3 changes: 3 additions & 0 deletions Framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ set(
src/runInformationService.cxx
src/runInformationServiceDump.cxx
src/runBasic.cxx
src/runAdvanced.cxx
src/runReadout.cxx
src/runMergerTest.cxx
src/runReadoutForDataDump.cxx
Expand All @@ -151,6 +152,7 @@ set(
qcInfoService
qcInfoServiceDump
qcRunBasic
qcRunAdvanced
qcRunReadout
runMergerTest
qcRunReadoutForDataDump
Expand Down Expand Up @@ -297,6 +299,7 @@ install(
install(
FILES
basic.json
advanced.json
readout.json
readoutForDataDump.json
DESTINATION etc
Expand Down
91 changes: 91 additions & 0 deletions Framework/advanced.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
{
"qc": {
"config": {
"database": {
"implementation": "CCDB",
"host": "ccdb-test.cern.ch:8080",
"username": "not_applicable",
"password": "not_applicable",
"name": "not_applicable"
},
"Activity": {
"number": "42",
"type": "2"
}
},
"tasks": {
"dataSizeTask": {
"active": "true",
"className": "o2::quality_control_modules::skeleton::SkeletonTask",
"moduleName": "QcSkeleton",
"cycleDurationSeconds": "10",
"maxNumberCycles": "-1",
"dataSamplingPolicy": "tst2",
"location": "local",
"machines": [
"o2flptst1",
"o2flptst2",
"o2flptst3"
]
},
"someNumbersTask": {
"active": "true",
"className": "o2::quality_control_modules::skeleton::SkeletonTask",
"moduleName": "QcSkeleton",
"cycleDurationSeconds": "10",
"maxNumberCycles": "-1",
"dataSamplingPolicy": "tst1",
"location": "remote"
}
}
},
"dataSamplingPolicies": [
{
"id": "tst1",
"active": "true",
"machines": [],
"dataHeaders": [
{
"binding": "sum",
"dataOrigin": "TST",
"dataDescription": "SUM"
},
{
"binding": "param",
"dataOrigin": "TST",
"dataDescription": "PARAM"
}
],
"subSpec": "2",
"samplingConditions": [
{
"condition": "random",
"fraction": "0.1",
"seed": "32112332123"
}
],
"blocking": "false"
},
{
"id": "tst2",
"active": "true",
"machines": [],
"dataHeaders": [
{
"binding": "data",
"dataOrigin": "TST",
"dataDescription": "DATA"
}
],
"subSpec": "*",
"samplingConditions": [
{
"condition": "payloadSize",
"lowerLimit": "8000",
"upperLimit": "10000"
}
],
"blocking": "false"
}
]
}
9 changes: 8 additions & 1 deletion Framework/include/QualityControl/TaskRunnerFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@ class TaskRunnerFactory
TaskRunnerFactory();
virtual ~TaskRunnerFactory();

o2::framework::DataProcessorSpec create(std::string taskName, std::string configurationSource, size_t id = 0);
/// \brief Creator of tasks
///
/// \param taskName - name of the task, which exists in tasks list in the configuration file
/// \param configurationSource - absolute path to configuration file, preceded with backend (f.e. "json://")
/// \param id - subSpecification for taskRunner's OutputSpec, useful to avoid output collisions in more complex topologies
/// \param resetAfterPublish - should taskRunner reset the user's task after each MO publication
o2::framework::DataProcessorSpec
create(std::string taskName, std::string configurationSource, size_t id = 0, bool resetAfterPublish = false);
};

} // namespace core
Expand Down
3 changes: 2 additions & 1 deletion Framework/src/HistoMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ void HistoMerger::init(framework::InitContext& ctx) { mMonitorObject.reset(); }
void HistoMerger::run(framework::ProcessingContext& ctx)
{
for (const auto& input : ctx.inputs()) {
if (input.header != nullptr && input.spec != nullptr) {
if (input.header != nullptr && input.spec != nullptr &&
std::strstr(DataRefUtils::as<MonitorObject>(input)->getObject()->ClassName(), "TH1") != nullptr) {

if (!mMonitorObject) {
mMonitorObject.reset(DataRefUtils::as<MonitorObject>(input).release());
Expand Down
2 changes: 1 addition & 1 deletion Framework/src/InfrastructureGenerator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string co

if (machine.second.get<std::string>("") == host) {
// todo: optimize it by using the same ptree?
workflow.emplace_back(taskRunnerFactory.create(task.first, configurationSource, id));
workflow.emplace_back(taskRunnerFactory.create(task.first, configurationSource, id, true));
break;
}
id++;
Expand Down
4 changes: 3 additions & 1 deletion Framework/src/TaskRunnerFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ TaskRunnerFactory::TaskRunnerFactory() {}

TaskRunnerFactory::~TaskRunnerFactory() {}

o2::framework::DataProcessorSpec TaskRunnerFactory::create(std::string taskName, std::string configurationSource, size_t id)
o2::framework::DataProcessorSpec
TaskRunnerFactory::create(std::string taskName, std::string configurationSource, size_t id, bool resetAfterPublish)
{
auto qcTask = std::make_shared<TaskRunner>(taskName, configurationSource, id);
qcTask->setResetAfterPublish(resetAfterPublish);

DataProcessorSpec newTask{
taskName,
Expand Down
155 changes: 155 additions & 0 deletions Framework/src/runAdvanced.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright CERN and copyright holders of ALICE O2. This software is
// distributed under the terms of the GNU General Public License v3 (GPL
// Version 3), copied verbatim in the file "COPYING".
//
// See http://alice-o2.web.cern.ch/license for full licensing information.
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

///
/// \file runAdvanced.cxx
/// \author Piotr Konopka
///
/// \brief This is an executable showing a more complicated QC topology.
///
/// This is an executable showing a more complicated QC topology. It pretends to spawn 4 separate topologies - 3 of them
/// consist of some dummy processing chain, a dispatcher and a local QC task. The last one represents the remote
/// QC servers topology, which has a merger (joining the results from local QC tasks), a checker (checks the result of
/// the previous) and a different, remote QC task with associated checker. Here they are joined into one big topology
/// just to present the concept.
/// \image html qcRunAdvanced.png
///
/// To launch it, build the project, load the environment and run the executable:
/// \code{.sh}
/// > aliBuild build QualityControl --defaults o2
/// > alienv enter QualityControl/latest
/// > qcRunAdvanced
/// \endcode
/// If you have glfw installed, you should see a window with the workflow visualization and sub-windows for each Data
/// Processor where their logs can be seen. The processing will continue until the main window is closed. Regardless
/// of glfw being installed or not, in the terminal all the logs will be shown as well.

#include <Framework/CompletionPolicyHelpers.h>
#include <Framework/DataSampling.h>

using namespace o2::framework;

// Additional configuration of the topology, which is done by implementing `customize` functions and placing them
// before `runDataProcessing.h` header. In this case, both Dispatcher and Merger are configured to accept incoming
// messages without waiting for the rest of inputs.
void customize(std::vector<CompletionPolicy>& policies)
{
DataSampling::CustomizeInfrastructure(policies);

CompletionPolicy mergerConsumesASAP{
"mergers-always-consume",
[](DeviceSpec const& device) {
return device.name.find("merger") != std::string::npos;
},
[](gsl::span<PartRef const> const& inputs) {
return CompletionPolicy::CompletionOp::Consume;
}
};
policies.push_back(mergerConsumesASAP);
}

/// \brief Customization hook for the channel configuration policies of this topology.
///
/// Nothing is added here directly - the list is only handed over to the Data Sampling.
///
/// \param policies - list of channel configuration policies, passed on by reference
void customize(std::vector<ChannelConfigurationPolicy>& policies)
{
// the only customization is whatever DataSampling::CustomizeInfrastructure does with the list
DataSampling::CustomizeInfrastructure(policies);
}

#include "QualityControl/InfrastructureGenerator.h"
#include <Framework/runDataProcessing.h>
#include <cstdlib>
#include <random>
#include <stdexcept>
#include <string>

using namespace o2::header;
using namespace o2::quality_control::core;
using SubSpecificationType = o2::header::DataHeader::SubSpecificationType;

// clang-format off
/// \brief Builds a dummy processing chain consisting of three data processors: source, step and sink.
///
/// - "source-<subspec>" publishes TST/DATA (a block of random integers) and TST/PARAM (a single double),
/// - "step-<subspec>" reads TST/DATA and publishes the sum of its elements as TST/SUM,
/// - "sink-<subspec>" reads TST/SUM and TST/PARAM and logs both values.
///
/// \param subspec - subSpecification used for every input and output of the chain, also appended to device names
/// \return workflow containing the three data processors, in the order: source, step, sink
WorkflowSpec processingTopology(SubSpecificationType subspec)
{
  using Engine = std::default_random_engine;

  DataProcessorSpec source{
    "source-" + std::to_string(subspec),
    Inputs{},
    Outputs{{ "TST", "DATA", subspec },
            { "TST", "PARAM", subspec }},
    AlgorithmSpec{
      static_cast<AlgorithmSpec::ProcessCallback>(
        // The engine is seeded once, when this spec is built. The explicit cast is needed, because converting
        // time_t to the unsigned seed type inside braces is a narrowing conversion.
        [generator = Engine{ static_cast<Engine::result_type>(time(nullptr)) }, subspec](ProcessingContext& ctx) mutable {
          // wait 200 ms on each invocation
          usleep(200000);
          // a block of random length (0 - 9999 elements), filled with random integers
          auto data = ctx.outputs().make<int>(Output{ "TST", "DATA", subspec }, generator() % 10000);
          for (auto&& item : data) {
            item = static_cast<int>(generator());
          }
          // 1 + generator() keeps the denominator non-zero
          ctx.outputs().make<double>(Output{ "TST", "PARAM", subspec }, 1)[0] = 1 / static_cast<double>(1 + generator());
        })
    }
  };

  DataProcessorSpec step{
    "step-" + std::to_string(subspec),
    Inputs{{ "data", "TST", "DATA", subspec }},
    Outputs{{ "TST", "SUM", subspec }},
    AlgorithmSpec{
      static_cast<AlgorithmSpec::ProcessCallback>(
        [subspec](ProcessingContext& ctx) {
          auto data = DataRefUtils::as<int>(ctx.inputs().get("data"));
          // accumulate in long long, the same type the sink reads back
          long long sum = 0;
          for (auto d : data) {
            sum += d;
          }
          ctx.outputs().snapshot(Output{ "TST", "SUM", subspec }, sum);
        })
    }
  };

  DataProcessorSpec sink{
    "sink-" + std::to_string(subspec),
    Inputs{{ "sum", "TST", "SUM", subspec },
           { "param", "TST", "PARAM", subspec }},
    Outputs{},
    AlgorithmSpec{
      static_cast<AlgorithmSpec::ProcessCallback>(
        [](ProcessingContext& ctx) {
          LOG(INFO) << "Sum is: " << DataRefUtils::as<long long>(ctx.inputs().get("sum"))[0];
          LOG(INFO) << "Param is: " << DataRefUtils::as<double>(ctx.inputs().get("param"))[0];
        })
    }
  };

  return { source, step, sink };
}
// clang-format on

/// \brief Assembles the whole example workflow: three pretended local topologies and the remote QC topology.
///
/// The configuration is read from "$QUALITYCONTROL_ROOT/etc/advanced.json". For each of the hosts "o2flptst1",
/// "o2flptst2" and "o2flptst3" a dummy processing chain is created, extended by the Data Sampling and followed by
/// the local QC infrastructure generated for that host. The remote QC infrastructure is appended at the end.
///
/// \return complete workflow specification
/// \throws std::runtime_error if the environment variable QUALITYCONTROL_ROOT is not set
WorkflowSpec defineDataProcessing(ConfigContext const&)
{
  // Appending a null pointer to std::string is undefined behaviour, so report a missing environment explicitly.
  const char* qcRoot = std::getenv("QUALITYCONTROL_ROOT");
  if (qcRoot == nullptr) {
    throw std::runtime_error(
      "Environment variable QUALITYCONTROL_ROOT is not set - load the QualityControl environment first");
  }
  const std::string qcConfigurationSource = std::string("json://") + qcRoot + "/etc/advanced.json";
  LOG(INFO) << "Using config file '" << qcConfigurationSource << "'";

  WorkflowSpec specs;
  // here we pretend to spawn topologies on three processing machines
  for (int i = 1; i < 4; i++) {
    auto localTopology = processingTopology(i);

    DataSampling::GenerateInfrastructure(localTopology, qcConfigurationSource);
    // a fix to make the topologies work when merged together
    // (the renamed device is the last one in the list after the Data Sampling has extended it)
    localTopology.back().name += std::to_string(i);

    const std::string host = "o2flptst" + std::to_string(i);
    auto qcInfrastructure = InfrastructureGenerator::generateLocalInfrastructure(qcConfigurationSource, host);
    // the generated list may be empty if no local task is configured for this host - nothing to patch then
    if (!qcInfrastructure.empty()) {
      auto& localQcTask = qcInfrastructure.back();
      // a fix to make the topologies work when merged together
      localQcTask.name += std::to_string(i);
      // temporary fix, which shouldn't be necessary when data sampling uses matchers
      if (!localQcTask.inputs.empty()) {
        localQcTask.inputs[0].subSpec = i;
      }
    }

    // the resulting order is unchanged: processing chain and sampling devices first, then the local QC devices
    specs.insert(std::end(specs), std::begin(localTopology), std::end(localTopology));
    specs.insert(std::end(specs), std::begin(qcInfrastructure), std::end(qcInfrastructure));
  }

  // Generation of the remote QC topology (for the QC servers)
  auto remoteInfrastructure = InfrastructureGenerator::generateRemoteInfrastructure(qcConfigurationSource);
  specs.insert(std::end(specs), std::begin(remoteInfrastructure), std::end(remoteInfrastructure));

  return specs;
}
2 changes: 1 addition & 1 deletion doc/doxyfile.in
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ EXAMPLE_RECURSIVE = NO
# directories that contain image that are included in the documentation (see
# the \image command).

IMAGE_PATH =
IMAGE_PATH = images

# The INPUT_FILTER tag can be used to specify a program that doxygen should
# invoke to filter for each input file. Doxygen will invoke the filter program
Expand Down
Binary file added doc/images/qcRunAdvanced.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 6d07be6

Please sign in to comment.