diff --git a/Framework/CMakeLists.txt b/Framework/CMakeLists.txt index 28d7e1deeb..ccf2fe252f 100644 --- a/Framework/CMakeLists.txt +++ b/Framework/CMakeLists.txt @@ -140,6 +140,7 @@ set( src/runInformationService.cxx src/runInformationServiceDump.cxx src/runBasic.cxx + src/runAdvanced.cxx src/runReadout.cxx src/runMergerTest.cxx src/runReadoutForDataDump.cxx @@ -151,6 +152,7 @@ set( qcInfoService qcInfoServiceDump qcRunBasic + qcRunAdvanced qcRunReadout runMergerTest qcRunReadoutForDataDump @@ -297,6 +299,7 @@ install( install( FILES basic.json + advanced.json readout.json readoutForDataDump.json DESTINATION etc diff --git a/Framework/advanced.json b/Framework/advanced.json new file mode 100644 index 0000000000..1834836038 --- /dev/null +++ b/Framework/advanced.json @@ -0,0 +1,91 @@ +{ + "qc": { + "config": { + "database": { + "implementation": "CCDB", + "host": "ccdb-test.cern.ch:8080", + "username": "not_applicable", + "password": "not_applicable", + "name": "not_applicable" + }, + "Activity": { + "number": "42", + "type": "2" + } + }, + "tasks": { + "dataSizeTask": { + "active": "true", + "className": "o2::quality_control_modules::skeleton::SkeletonTask", + "moduleName": "QcSkeleton", + "cycleDurationSeconds": "10", + "maxNumberCycles": "-1", + "dataSamplingPolicy": "tst2", + "location": "local", + "machines": [ + "o2flptst1", + "o2flptst2", + "o2flptst3" + ] + }, + "someNumbersTask": { + "active": "true", + "className": "o2::quality_control_modules::skeleton::SkeletonTask", + "moduleName": "QcSkeleton", + "cycleDurationSeconds": "10", + "maxNumberCycles": "-1", + "dataSamplingPolicy": "tst1", + "location": "remote" + } + } + }, + "dataSamplingPolicies": [ + { + "id": "tst1", + "active": "true", + "machines": [], + "dataHeaders": [ + { + "binding": "sum", + "dataOrigin": "TST", + "dataDescription": "SUM" + }, + { + "binding": "param", + "dataOrigin": "TST", + "dataDescription": "PARAM" + } + ], + "subSpec": "2", + "samplingConditions": [ + { + "condition": "random", + "fraction": "0.1", + "seed": "32112332123" + } + ], + "blocking": "false" + }, + { + "id": "tst2", + "active": "true", + "machines": [], + "dataHeaders": [ + { + "binding": "data", + "dataOrigin": "TST", + "dataDescription": "DATA" + } + ], + "subSpec": "*", + "samplingConditions": [ + { + "condition": "payloadSize", + "lowerLimit": "8000", + "upperLimit": "10000" + } + ], + "blocking": "false" + } + ] +} \ No newline at end of file diff --git a/Framework/include/QualityControl/TaskRunnerFactory.h b/Framework/include/QualityControl/TaskRunnerFactory.h index c6107c5b11..9461033392 100644 --- a/Framework/include/QualityControl/TaskRunnerFactory.h +++ b/Framework/include/QualityControl/TaskRunnerFactory.h @@ -22,7 +22,14 @@ class TaskRunnerFactory TaskRunnerFactory(); virtual ~TaskRunnerFactory(); - o2::framework::DataProcessorSpec create(std::string taskName, std::string configurationSource, size_t id = 0); + /// \brief Creator of tasks + /// + /// \param taskName - name of the task, which exists in tasks list in the configuration file + /// \param configurationSource - absolute path to configuration file, preceded with backend (f.e. "json://") + /// \param id - subSpecification for taskRunner's OutputSpec, useful to avoid outputs collisions one more complex topologies + /// \param resetAfterPublish - should taskRunner reset the user's task after each MO publication + o2::framework::DataProcessorSpec + create(std::string taskName, std::string configurationSource, size_t id = 0, bool resetAfterPublish = false); }; } // namespace core diff --git a/Framework/src/HistoMerger.cxx b/Framework/src/HistoMerger.cxx index a8a945da60..db5d8d103e 100644 --- a/Framework/src/HistoMerger.cxx +++ b/Framework/src/HistoMerger.cxx @@ -31,7 +31,8 @@ void HistoMerger::init(framework::InitContext& ctx) { mMonitorObject.reset(); } void HistoMerger::run(framework::ProcessingContext& ctx) { for (const auto& input : ctx.inputs()) { - if (input.header != nullptr && input.spec != nullptr) { + if (input.header != nullptr && input.spec != nullptr && + std::strstr(DataRefUtils::as(input)->getObject()->ClassName(), "TH1") != nullptr) { if (!mMonitorObject) { mMonitorObject.reset(DataRefUtils::as(input).release()); diff --git a/Framework/src/InfrastructureGenerator.cxx b/Framework/src/InfrastructureGenerator.cxx index c918dc0f49..38d4ff2abd 100644 --- a/Framework/src/InfrastructureGenerator.cxx +++ b/Framework/src/InfrastructureGenerator.cxx @@ -39,7 +39,7 @@ WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string co if (machine.second.get("") == host) { // todo: optimize it by using the same ptree? - workflow.emplace_back(taskRunnerFactory.create(task.first, configurationSource, id)); + workflow.emplace_back(taskRunnerFactory.create(task.first, configurationSource, id, true)); break; } id++; diff --git a/Framework/src/TaskRunnerFactory.cxx b/Framework/src/TaskRunnerFactory.cxx index ed44f08bfd..38d5cb4772 100644 --- a/Framework/src/TaskRunnerFactory.cxx +++ b/Framework/src/TaskRunnerFactory.cxx @@ -29,9 +29,11 @@ TaskRunnerFactory::TaskRunnerFactory() {} TaskRunnerFactory::~TaskRunnerFactory() {} -o2::framework::DataProcessorSpec TaskRunnerFactory::create(std::string taskName, std::string configurationSource, size_t id) +o2::framework::DataProcessorSpec +TaskRunnerFactory::create(std::string taskName, std::string configurationSource, size_t id, bool resetAfterPublish) { auto qcTask = std::make_shared(taskName, configurationSource, id); + qcTask->setResetAfterPublish(resetAfterPublish); DataProcessorSpec newTask{ taskName, diff --git a/Framework/src/runAdvanced.cxx b/Framework/src/runAdvanced.cxx new file mode 100644 index 0000000000..1e659cf3d0 --- /dev/null +++ b/Framework/src/runAdvanced.cxx @@ -0,0 +1,155 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// +/// \file runAdvanced.cxx +/// \author Piotr Konopka +/// +/// \brief This is an executable showing a more complicated QC topology. +/// +/// This is an executable showing a more complicated QC topology. It pretends to spawn 4 separate topologies - 3 of them +/// consist of some dummy processing chain, a dispatcher and a local QC task. The last one represents the remote +/// QC servers topology, which has a merger (joining the results from local QC tasks), a checker (checks the result of +/// the previous) and a different, remote QC task with associated checker. Here they are joined into one big topology +/// just to present the concept. +/// \image html qcRunAdvanced.png +/// +/// To launch it, build the project, load the environment and run the executable: +/// \code{.sh} +/// > aliBuild build QualityControl --defaults o2 +/// > alienv enter QualityControl/latest +/// > qcRunAdvanced +/// \endcode +/// If you have glfw installed, you should see a window with the workflow visualization and sub-windows for each Data +/// Processor where their logs can be seen. The processing will continue until the main window it is closed. Regardless +/// of glfw being installed or not, in the terminal all the logs will be shown as well. + +#include +#include + +using namespace o2::framework; + +// Additional configuration of the topology, which is done by implementing `customize` functions and placing them +// before `runDataProcessing.h` header. In this case, both Dispatcher and Merger are configured to accept incoming +// messages without waiting for the rest of inputs. +void customize(std::vector& policies) +{ + DataSampling::CustomizeInfrastructure(policies); + + CompletionPolicy mergerConsumesASAP{ + "mergers-always-consume", + [](DeviceSpec const& device) { + return device.name.find("merger") != std::string::npos; + }, + [](gsl::span const& inputs) { + return CompletionPolicy::CompletionOp::Consume; + } + }; + policies.push_back(mergerConsumesASAP); +} + +void customize(std::vector& policies) +{ + DataSampling::CustomizeInfrastructure(policies); +} + +#include "QualityControl/InfrastructureGenerator.h" +#include +#include + +using namespace o2::header; +using namespace o2::quality_control::core; +using SubSpecificationType = o2::header::DataHeader::SubSpecificationType; + +// clang-format off +WorkflowSpec processingTopology(SubSpecificationType subspec) +{ + DataProcessorSpec source{ + "source-" + std::to_string(subspec), + Inputs{}, + Outputs{{ "TST", "DATA", subspec }, + { "TST", "PARAM", subspec }}, + AlgorithmSpec{ + (AlgorithmSpec::ProcessCallback) + [generator = std::default_random_engine{ time(nullptr) }, subspec](ProcessingContext & ctx) mutable { + usleep(200000); + auto data = ctx.outputs().make(Output{ "TST", "DATA", subspec }, generator() % 10000); + for (auto&& item : data) { + item = static_cast(generator()); + } + ctx.outputs().make(Output{ "TST", "PARAM", subspec }, 1)[0] = 1 / static_cast(1 + generator()); + } + } + }; + + DataProcessorSpec step{ + "step-" + std::to_string(subspec), + Inputs{{ "data", "TST", "DATA", subspec }}, + Outputs{{ "TST", "SUM", subspec }}, + AlgorithmSpec{ + (AlgorithmSpec::ProcessCallback)[subspec](ProcessingContext & ctx) { + const auto* header = get(ctx.inputs().get("data").header); + auto data = DataRefUtils::as(ctx.inputs().get("data")); + long long sum = 0; + for (auto d : data) { sum += d; } + ctx.outputs().snapshot(Output{ "TST", "SUM", subspec }, sum); + } + } + }; + + DataProcessorSpec sink{ + "sink-" + std::to_string(subspec), + Inputs{{ "sum", "TST", "SUM", subspec }, + { "param", "TST", "PARAM", subspec }}, + Outputs{}, + AlgorithmSpec{ + (AlgorithmSpec::ProcessCallback)[](ProcessingContext & ctx) { + LOG(INFO) << "Sum is: " << DataRefUtils::as(ctx.inputs().get("sum"))[0]; + LOG(INFO) << "Param is: " << DataRefUtils::as(ctx.inputs().get("param"))[0]; + } + } + }; + + return { source, step, sink }; +} +// clang-format on + +WorkflowSpec defineDataProcessing(ConfigContext const&) +{ + const std::string qcConfigurationSource = + std::string("json://") + getenv("QUALITYCONTROL_ROOT") + "/etc/advanced.json"; + LOG(INFO) << "Using config file '" << qcConfigurationSource << "'"; + + WorkflowSpec specs; + // here we pretend to spawn topologies on three processing machines + for (int i = 1; i < 4; i++) { + auto localTopology = processingTopology(i); + + DataSampling::GenerateInfrastructure(localTopology, qcConfigurationSource); + // a fix to make the topologies work when merged together + localTopology.back().name += std::to_string(i); + + std::string host = "o2flptst" + std::to_string(i); + auto qcInfrastructure = InfrastructureGenerator::generateLocalInfrastructure(qcConfigurationSource, host); + // a fix to make the topologies work when merged together + qcInfrastructure.back().name += std::to_string(i); + // temporary fix, which shouldn't be necessary when data sampling uses matchers + qcInfrastructure.back().inputs[0].subSpec = i; + localTopology.insert(localTopology.end(), qcInfrastructure.begin(), qcInfrastructure.end()); + + specs.insert(std::end(specs), std::begin(localTopology), std::end(localTopology)); + } + + // Generation of the remote QC topology (for the QC servers) + auto qcInfrastructure = InfrastructureGenerator::generateRemoteInfrastructure(qcConfigurationSource); + specs.insert(std::end(specs), std::begin(qcInfrastructure), std::end(qcInfrastructure)); + + return specs; +} \ No newline at end of file diff --git a/doc/doxyfile.in b/doc/doxyfile.in index 2619582476..ba997c8056 100644 --- a/doc/doxyfile.in +++ b/doc/doxyfile.in @@ -654,7 +654,7 @@ EXAMPLE_RECURSIVE = NO # directories that contain image that are included in the documentation (see # the \image command). -IMAGE_PATH = +IMAGE_PATH = images # The INPUT_FILTER tag can be used to specify a program that doxygen should # invoke to filter for each input file. Doxygen will invoke the filter program diff --git a/doc/images/qcRunAdvanced.png b/doc/images/qcRunAdvanced.png new file mode 100644 index 0000000000..902d1c1bed Binary files /dev/null and b/doc/images/qcRunAdvanced.png differ