From 977119d3d70c36573e8f2b7b7b64d40083df5be0 Mon Sep 17 00:00:00 2001 From: knopers8 Date: Mon, 12 Nov 2018 14:36:30 +0100 Subject: [PATCH] Reworked QC infrastructure generation (#87) This commit introduces a main qc factory which can be used to generate a full topology with just one function call. As the TaskRunner can be placed either locally on FLP or EPN machine or remotely on dedicated QC servers, a distinction between this two has been introduced. When a task is 'local', the taskRunners will be generated by generateLocalInfrastructure(), but mergers and checkers by generateRemoteInfrastructe(). If a task is 'remote', both taskRunner and checker will be generated as remote infrastructure. The commit removes also some deprecated config files. --- Framework/CMakeLists.txt | 11 ++- Framework/basic.json | 14 +-- Framework/example-default.ini | 82 ---------------- Framework/example-default.json | 87 ++++++++++------- .../QualityControl/InfrastructureGenerator.h | 57 +++++++++++ .../include/QualityControl/TaskFactory.h | 2 +- Framework/include/QualityControl/TaskRunner.h | 15 ++- .../QualityControl/TaskRunnerFactory.h | 2 +- Framework/qcTaskDplConfig.ini | 79 --------------- Framework/readout.json | 13 +-- Framework/src/Checker.cxx | 3 +- Framework/src/InfrastructureGenerator.cxx | 96 +++++++++++++++++++ Framework/src/TaskRunner.cxx | 45 +++++---- Framework/src/TaskRunnerFactory.cxx | 4 +- Framework/src/runBasic.cxx | 23 ++--- Framework/src/runReadout.cxx | 18 ++-- .../test/testInfrastructureGenerator.cxx | 95 ++++++++++++++++++ Framework/test/testQCFactory.json | 75 +++++++++++++++ 18 files changed, 456 insertions(+), 265 deletions(-) delete mode 100644 Framework/example-default.ini create mode 100644 Framework/include/QualityControl/InfrastructureGenerator.h delete mode 100644 Framework/qcTaskDplConfig.ini create mode 100644 Framework/src/InfrastructureGenerator.cxx create mode 100644 Framework/test/testInfrastructureGenerator.cxx create mode 100644 Framework/test/testQCFactory.json diff --git a/Framework/CMakeLists.txt b/Framework/CMakeLists.txt index 1870e583e3..28d7e1deeb 100644 --- a/Framework/CMakeLists.txt +++ b/Framework/CMakeLists.txt @@ -17,6 +17,7 @@ set( src/TaskInterface.cxx src/RepositoryBenchmark.cxx src/HistoMerger.cxx + src/InfrastructureGenerator.cxx ) set( @@ -31,6 +32,7 @@ set( include/QualityControl/TaskRunner.h include/QualityControl/TaskRunnerFactory.h include/QualityControl/HistoMerger.h + include/QualityControl/InfrastructureGenerator.h ) set( @@ -202,6 +204,7 @@ set( test/testMonitorObject.cxx test/testPublisher.cxx test/testQcInfoLogger.cxx + test/testInfrastructureGenerator.cxx test/testQCTask.cxx test/testQuality.cxx ) @@ -216,6 +219,12 @@ foreach(test ${TEST_SRCS}) set_tests_properties(${test_name} PROPERTIES TIMEOUT 60) endforeach() +install( + FILES + test/testQCFactory.json + DESTINATION test +) + # ---- Install ---- # Build targets with install rpath on Mac to dramatically speed up installation @@ -280,7 +289,6 @@ install( install(PROGRAMS script/qcDatabaseSetup.sh DESTINATION bin) install( FILES - example-default.ini example-default.json alfa.json dataDump.json @@ -288,7 +296,6 @@ install( ) install( FILES - qcTaskDplConfig.ini basic.json readout.json readoutForDataDump.json diff --git a/Framework/basic.json b/Framework/basic.json index 87db31cec1..055e9d7bd1 100644 --- a/Framework/basic.json +++ b/Framework/basic.json @@ -13,18 +13,18 @@ "type": "2" } }, - "tasks_config": { + "tasks": { "QcTask": { - "taskDefinition": "QcTaskDefinition" - }, - "QcTaskDefinition": { + "active": "true", "className": "o2::quality_control_modules::skeleton::SkeletonTask", "moduleName": "QcSkeleton", - "outputDataOrigin": "ITS", "cycleDurationSeconds": "10", "maxNumberCycles": "-1", - "outputDataDescription": "HIST_SKLT_TASK", - "dataSamplingPolicy": "its-raw" + "dataSamplingPolicy": "its-raw", + "taskParameters": { + "nothing": "rien" + }, + "location": "remote" } } }, diff --git a/Framework/example-default.ini b/Framework/example-default.ini deleted file mode 100644 index f76c575431..0000000000 --- a/Framework/example-default.ini +++ /dev/null @@ -1,82 +0,0 @@ - -; DEPRECATED - -;=============================== -; General -;------------------------------- - -[qc/config/DataSampling] -;implementation=FairSampler -implementation=MockSampler - -[qc/config/database] -implementation=MySql -username=qc_user -password=qc_user -host=localhost -name=quality_control -#implementation=CCDB -#host=localhost:8080 - -[qc/config/Activity] # Similar to a "run" -number=42 -type=2 - -;=============================== -; Tasks -;------------------------------- - -[qc/task_config/myTask_1] # Dummy task 1 -taskDefinition=qc/task_config/taskDefinition_1 - -[qc/task_config/myTask_2] # Dummy task 2 -taskDefinition=qc/task_config/taskDefinition_1 - -[qc/task_config/taskDefinition_1] # Actual definition of the dummy tasks -;className=o2::quality_control_modules::skeleton::SkeletonTask -className=o2::quality_control_modules::example::ExampleTask -moduleName=QcExample -cycleDurationSeconds=10 -; set to -1 for no maximum or remove the line -maxNumberCycles=-1 -;exampleTaskOutput=0 - -[qc/task_config/daqTask] -taskDefinition=qc/task_config/daqTaskDefinition - -[qc/task_config/daqTaskDefinition] -className=o2::quality_control_modules::daq::DaqTask -moduleName=QcDaq -maxNumberCycles=-1 - -[qc/task_config/benchmarkTask_0] -taskDefinition=qc/task_config/benchmark -;[qc/benchmarkTask_1] -;taskDefinition=benchmark -;address=tcp://*:5556 - -[qc/task_config/benchmark] # Benchmark tasks definition -className=o2::quality_control_modules::example::BenchmarkTask -moduleName=QcExample -numberHistos=1 -numberChecks=1 -typeOfChecks=o2::quality_control_modules::example::FakeCheck -moduleOfChecks=QcExample -cycleDurationSeconds=1 - -;=============================== -; Checkers -;------------------------------- - -[qc/checkers_config] ; needed for the time being because we don't have an information service -numberCheckers=1 -numberTasks=1 -tasksAddresses=tcp://localhost:5556,tcp://localhost:5557,tcp://localhost:5558,tcp://localhost:5559 - -[qc_checkers_config_checker_0] -broadcast=0 -broadcastAddress=tcp://*:5600 -id=0 - -[qc/checkers_config/Checks] -checkMeanIsAbove/threshold=1 \ No newline at end of file diff --git a/Framework/example-default.json b/Framework/example-default.json index 4c9ada11ca..8043ea4e67 100644 --- a/Framework/example-default.json +++ b/Framework/example-default.json @@ -1,9 +1,6 @@ { "qc": { "config": { - "DataSampling": { - "implementation": "MockSampler" - }, "database": { "username": "qc_user", "password": "qc_user", @@ -16,53 +13,71 @@ "type": "2" } }, - "tasks_config": { + "tasks": { "myTask_1": { - "taskDefinition": "taskDefinition_1" - }, - "myTask_2": { - "taskDefinition": "taskDefinition_1" - }, - "taskDefinition_1": { "className": "o2::quality_control_modules::example::ExampleTask", "moduleName": "QcExample", "cycleDurationSeconds": "10", - "maxNumberCycles": "-1" + "maxNumberCycles": "-1", + "dataSamplingPolicy": "ex1", + "location": "remote" }, "daqTask": { - "taskDefinition": "daqTaskDefinition" - }, - "daqTaskDefinition": { "className": "o2::quality_control_modules::daq::DaqTask", "moduleName": "QcDaq", "maxNumberCycles": "-1", - "cycleDurationSeconds": "10" + "cycleDurationSeconds": "10", + "dataSamplingPolicy": "mftclusters", + "location": "remote" }, "benchmarkTask_0": { - "taskDefinition": "benchmark" - }, - "benchmark": { "className": "o2::quality_control_modules::example::BenchmarkTask", "moduleName": "QcExample", - "numberHistos": "1", - "numberChecks": "1", - "typeOfChecks": "o2::quality_control_modules::example::FakeCheck", - "moduleOfChecks": "QcExample", - "cycleDurationSeconds": "1" + "cycleDurationSeconds": "1", + "dataSamplingPolicy": "ex1", + "location": "local", + "machines": [ + "o2flp1", + "o2flp2" + ] } + } + }, + "dataSamplingPoliciesFile_comment": "In case that policies are stored in different file, specify its path below. When both dataSamplingPolicies and dataSamplingPoliciesFile are specified, the latter has higher priority", + "dataSamplingPoliciesFile": "json:///home/genghiskhan/alice/QualityControl/dataSamplingConfig.json", + "dataSamplingPolicies_comment": "this is ignored when dataSamplingPoliciesFile is specified", + "dataSamplingPolicies": [ + { + "id": "ex1", + "active": "true", + "dataHeaders": [ + { + "binding": "data", + "dataOrigin": "TST", + "dataDescription": "DATA" + } + ], + "subSpec": "0", + "samplingConditions": [] }, - "checkers_config": { - "numberCheckers": "1", - "numberTasks": "1", - "tasksAddresses": "tcp://localhost:5556,tcp://localhost:5557,tcp://localhost:5558,tcp://localhost:5559", - "checker_0": { - "broadcast": "0", - "broadcastAddress": "tcp://*:5600", - "id": "0" - }, - "Checks": { - "checkMeanIsAbove/threshold": "1" - } + { + "id": "mftclusters", + "active": "true", + "dataHeaders": [ + { + "binding": "mft-clusters", + "dataOrigin": "MFT", + "dataDescription": "CLUSTERS" + } + ], + "subSpec": "0", + "samplingConditions": [ + { + "condition": "payloadSize", + "upperLimit": "5000", + "lowerLimit": "1000" + } + ] } - } + ] } \ No newline at end of file diff --git a/Framework/include/QualityControl/InfrastructureGenerator.h b/Framework/include/QualityControl/InfrastructureGenerator.h new file mode 100644 index 0000000000..7285a5fc69 --- /dev/null +++ b/Framework/include/QualityControl/InfrastructureGenerator.h @@ -0,0 +1,57 @@ +/// +/// \file QualityControlFactory.h +/// \author Piotr Konopka +/// + +#ifndef QC_CORE_QUALITYCONTROLFACTORY_H +#define QC_CORE_QUALITYCONTROLFACTORY_H + +#include +#include + +namespace o2 +{ +namespace quality_control +{ +namespace core +{ + +/// \brief A factory class which can generate QC topologies given a configuration file. +/// +/// A factory class which can generate QC topologies given a configuration file (example in Framework/basic.json and +/// Framework/example-default.json). As QC topologies will be spread on both processing chain machines and dedicated +/// QC servers, a _local_ vs. _remote_ distinction was introduced. Tasks which are _local_ should have taskRunners +/// placed on FLP or EPN machines and their results should be merged and checked on QC servers. The 'remote' option +/// means, that full QC chain should be located on remote (QC) machines. For the laptop development, use 'remote' tasks +/// and generateRemoteInfrastructure() to obtain the full topology in one go. +/// +/// \author Piotr Konopka +class InfrastructureGenerator +{ + public: + InfrastructureGenerator() = delete; + + /// \brief Generates the local part of the QC infrastructure for a specified host. + /// + /// Generates the local part of the QC infrastructure for a specified host - taskRunners which are declared in the + /// configuration to be 'local'. + /// + /// \param configurationSource - full path to configuration file, preceded with the backend (f.e. "json://") + /// \param host - name of the machine + /// \return generated local QC workflow + static o2::framework::WorkflowSpec generateLocalInfrastructure(std::string configurationSource, std::string host); + + /// \brief Generates the remote part of the QC infrastructure + /// + /// Generates the remote part of the QC infrastructure - mergers and checkers for 'local' tasks and full QC chain for + /// 'remote' tasks. + /// + /// \param configurationSource - full path to configuration file, preceded with the backend (f.e. "json://") + /// \return generated remote QC workflow + static o2::framework::WorkflowSpec generateRemoteInfrastructure(std::string configurationSource); +}; +} +} +} + +#endif //QC_CORE_QUALITYCONTROLFACTORY_H diff --git a/Framework/include/QualityControl/TaskFactory.h b/Framework/include/QualityControl/TaskFactory.h index c077a30fc2..39e6aeec8a 100644 --- a/Framework/include/QualityControl/TaskFactory.h +++ b/Framework/include/QualityControl/TaskFactory.h @@ -55,7 +55,7 @@ class TaskFactory std::string library = "lib" + taskConfig.moduleName; logger << "Loading library " << library << AliceO2::InfoLogger::InfoLogger::endm; int libLoaded = gSystem->Load(library.c_str(), "", true); - if (libLoaded) { + if (libLoaded < 0) { BOOST_THROW_EXCEPTION(FatalException() << errinfo_details("Failed to load Detector Publisher Library")); } diff --git a/Framework/include/QualityControl/TaskRunner.h b/Framework/include/QualityControl/TaskRunner.h index 48f5cd771f..4119ef0b27 100644 --- a/Framework/include/QualityControl/TaskRunner.h +++ b/Framework/include/QualityControl/TaskRunner.h @@ -65,7 +65,12 @@ using namespace std::chrono; class TaskRunner { public: - TaskRunner(std::string taskName, std::string configurationSource); + /// \brief Constructor + /// + /// \param taskName - name of the task, which exists in tasks list in the configuration file + /// \param configurationSource - absolute path to configuration file, preceded with backend (f.e. "json://") + /// \param id - subSpecification for taskRunner's OutputSpec, useful to avoid outputs collisions one more complex topologies + TaskRunner(std::string taskName, std::string configurationSource, size_t id = 0); ~TaskRunner(); /// \brief To be invoked during initialization of Data Processor @@ -80,8 +85,10 @@ class TaskRunner void setResetAfterPublish(bool); + /// \brief Unified DataOrigin for Quality Control tasks + static header::DataOrigin createTaskDataOrigin(); /// \brief Unified DataDescription naming scheme for all tasks - static o2::header::DataDescription createTaskDataDescription(const std::string taskName); + static header::DataDescription createTaskDataDescription(const std::string& taskName); private: void populateConfig(std::string taskName); @@ -94,8 +101,8 @@ class TaskRunner private: std::string mTaskName; TaskConfig mTaskConfig; - std::shared_ptr mConfigFile; // used in init only - std::shared_ptr mCollector; + std::shared_ptr mConfigFile; // used in init only + std::shared_ptr mCollector; TaskInterface* mTask; bool mResetAfterPublish; std::shared_ptr mObjectsManager; diff --git a/Framework/include/QualityControl/TaskRunnerFactory.h b/Framework/include/QualityControl/TaskRunnerFactory.h index 05764edd4d..c6107c5b11 100644 --- a/Framework/include/QualityControl/TaskRunnerFactory.h +++ b/Framework/include/QualityControl/TaskRunnerFactory.h @@ -22,7 +22,7 @@ class TaskRunnerFactory TaskRunnerFactory(); virtual ~TaskRunnerFactory(); - o2::framework::DataProcessorSpec create(std::string taskName, std::string configurationSource); + o2::framework::DataProcessorSpec create(std::string taskName, std::string configurationSource, size_t id = 0); }; } // namespace core diff --git a/Framework/qcTaskDplConfig.ini b/Framework/qcTaskDplConfig.ini deleted file mode 100644 index 6b008e7549..0000000000 --- a/Framework/qcTaskDplConfig.ini +++ /dev/null @@ -1,79 +0,0 @@ - -; DEPRECATED - -;=============================== -; General -;------------------------------- - -[qc/config/DataSampling] -# list of tasks that need Data Sampling -tasksList=skeletonTask -# Generate time-pipelined dispatchers if data sources are time-pipelined. When disabled, one dispatcher will be generated for the same time-pipelined Data Processors. -enableTimePipeliningDispatchers=0 -# Generate parallel dispatcher for each parallel flow of data (having different subSpecs). When disabled, one dispatcher will be generated for all parallel flows. -enableParallelDispatchers=0 -# Now it does nothing. -enableProxy=0 - -[qc/config/database] -;implementation=MySql -username=qc_user -password=qc_user -;host=localhost -name=quality_control -implementation=CCDB -host=localhost:8080 - -[qc/config/Activity] # Similar to a "run" -number=42 -type=2 - -;=============================== -; Tasks -;------------------------------- -[qc/task_config/skeletonTask] -# Indirection to the actual task definition. -taskDefinition=skeletonTaskDefinition - -[qc/task_config/skeletonTaskDefinition] -# The name of your class, with full namespace. -className=o2::quality_control_modules::skeleton::SkeletonTask -# Which library contains the class. -moduleName=QcSkeleton -# Number of cycles to perform. '-1' for infinite. -maxNumberCycles=-1 -# 'Origin' header field of all objects published by QC task. Max. 4 characters. Might be generated automatically later. -outputDataOrigin=ITS -# 'Description' header field of all objects published by QC task. Max. 16 characters. Might be generated automatically later. -outputDataDescription=HIST_SKLT_TASK -# Inputs list - all indirections to sampled data specifications, comma-separated (e.g. readoutInput,otherInput,andSoOnInput). -inputs=readoutInput -# Fraction of data that should be passed to QC task. The smaller fraction, the lesser CPU/memory/transfer impact. -fraction=0.1 -# Choose specific subSpec of data. Set to -1 to choose all (or if you don't know what that means) -subSpec=-1 - -[qc/task_config/readoutInput] # Sampled data specification, referenced in the 'inputs' field above. -# Binding, that you will use to access data in the QC code. -inputName=readout -# 'Origin' header field of desired data. -dataOrigin=ITS -# 'Description' header field of desired data. -dataDescription=RAWDATA - -;=============================== -; Checkers -;------------------------------- - -[qc/checkers_config] ; needed for the time being because we don't have an information service -numberCheckers=1 -numberTasks=1 -tasksAddresses=tcp://localhost:5556,tcp://localhost:5557,tcp://localhost:5558,tcp://localhost:5559 - -[qc/checkers_config/checker_0] -broadcast=1 -broadcastAddress=tcp://*:5600 -id=0 - -[qc/checkers_config/Checks] -checkMeanIsAbove/threshold=1 \ No newline at end of file diff --git a/Framework/readout.json b/Framework/readout.json index bbac18614c..0e24201c14 100644 --- a/Framework/readout.json +++ b/Framework/readout.json @@ -13,18 +13,15 @@ "type": "2" } }, - "tasks_config": { + "tasks": { "daqTask": { - "taskDefinition": "daqTaskDefinition" - }, - "daqTaskDefinition": { + "active": "true", "className": "o2::quality_control_modules::skeleton::SkeletonTask", "moduleName": "QcSkeleton", - "outputDataOrigin": "ITS", "cycleDurationSeconds": "10", - "maxNumberCycles": "10", - "outputDataDescription": "HIST_SKLT_TASK", - "dataSamplingPolicy": "readout" + "maxNumberCycles": "-1", + "dataSamplingPolicy": "readout", + "location": "remote" } } }, diff --git a/Framework/src/Checker.cxx b/Framework/src/Checker.cxx index 1e4ba2e026..0415c26191 100644 --- a/Framework/src/Checker.cxx +++ b/Framework/src/Checker.cxx @@ -39,7 +39,7 @@ namespace checker Checker::Checker(std::string checkerName, std::string taskName, std::string configurationSource) : mCheckerName(checkerName), mConfigurationSource(configurationSource), - mInputSpec{ "mo", "QC", TaskRunner::createTaskDataDescription(taskName), 0 }, + mInputSpec{ "mo", TaskRunner::createTaskDataOrigin(), TaskRunner::createTaskDataDescription(taskName), 0 }, mOutputSpec{ "QC", Checker::createCheckerDataDescription(taskName), 0 }, mLogger(QcInfoLogger::GetInstance()), startFirstObject{ system_clock::time_point::min() }, @@ -102,7 +102,6 @@ void Checker::run(framework::ProcessingContext& ctx) } for (auto&& input : ctx.inputs()) { - // will that work properly with shmem? std::shared_ptr mo{ std::move(framework::DataRefUtils::as(input)) }; diff --git a/Framework/src/InfrastructureGenerator.cxx b/Framework/src/InfrastructureGenerator.cxx new file mode 100644 index 0000000000..c918dc0f49 --- /dev/null +++ b/Framework/src/InfrastructureGenerator.cxx @@ -0,0 +1,96 @@ +/// +/// \file QualityControlFactory.cxx +/// \author Piotr Konopka +/// + +#include +#include "QualityControl/InfrastructureGenerator.h" +#include "QualityControl/TaskRunnerFactory.h" +#include "QualityControl/CheckerFactory.h" +#include +#include +#include + +using namespace o2::framework; +using namespace o2::configuration; +using namespace o2::quality_control::checker; +using boost::property_tree::ptree; + +namespace o2 +{ +namespace quality_control +{ +namespace core +{ + +WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string configurationSource, std::string host) +{ + WorkflowSpec workflow; + TaskRunnerFactory taskRunnerFactory; + auto config = ConfigurationFactory::getConfiguration(configurationSource); + + for (const auto& task : config->getRecursive("qc.tasks")) { + if (task.second.get("active") && task.second.get("location") == "local") { + // ids are assigned to local tasks in order to distinguish monitor objects outputs from each other and be able to + // merge them. If there is no need to merge (only one qc task), it gets subspec 0. + // todo: use matcher for subspec when available in DPL + size_t id = task.second.get_child("machines").size() > 1 ? 1 : 0; + for (const auto& machine : task.second.get_child("machines")) { + + if (machine.second.get("") == host) { + // todo: optimize it by using the same ptree? + workflow.emplace_back(taskRunnerFactory.create(task.first, configurationSource, id)); + break; + } + id++; + } + } + } + return workflow; +} + +o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructure(std::string configurationSource) +{ + WorkflowSpec workflow; + auto config = ConfigurationFactory::getConfiguration(configurationSource); + + TaskRunnerFactory taskRunnerFactory; + CheckerFactory checkerFactory; + for (const auto& task : config->getRecursive("qc.tasks")) { + // todo sanitize somehow this if-frenzy + if (task.second.get("active", true)) { + if (task.second.get("location") == "local") { + // if tasks are LOCAL, generate mergers + checkers + + //todo use real mergers when they are done + + // generate merger only, when there is a need to merge something + if (task.second.get_child("machines").size() > 1) { + HistoMerger merger(task.first + "-merger", 1); + merger.configureInputsOutputs(TaskRunner::createTaskDataOrigin(), + TaskRunner::createTaskDataDescription(task.first), + { 1, task.second.get_child("machines").size() }); + DataProcessorSpec mergerSpec{ + merger.getName(), + merger.getInputSpecs(), + Outputs{ merger.getOutputSpec() }, + adaptFromTask(std::move(merger)), + }; + + workflow.emplace_back(mergerSpec); + } + + } else if (task.second.get("location") == "remote") { + // -- if tasks are REMOTE, generate tasks + mergers + checkers + + workflow.emplace_back(taskRunnerFactory.create(task.first, configurationSource, 0)); + } + + workflow.emplace_back(checkerFactory.create(task.first + "-checker", task.first, configurationSource)); + } + } + return workflow; +} +} +} +} \ No newline at end of file diff --git a/Framework/src/TaskRunner.cxx b/Framework/src/TaskRunner.cxx index 2e8e2c302e..55542d87ca 100644 --- a/Framework/src/TaskRunner.cxx +++ b/Framework/src/TaskRunner.cxx @@ -39,20 +39,21 @@ using namespace o2::configuration; using namespace o2::monitoring; using namespace std::chrono; -TaskRunner::TaskRunner(std::string taskName, std::string configurationSource) +TaskRunner::TaskRunner(std::string taskName, std::string configurationSource, size_t id) : mTaskName(taskName), mNumberBlocks(0), mTotalNumberObjectsPublished(0), mLastNumberObjects(0), mCycleOn(false), mCycleNumber(0), - mMonitorObjectsSpec("", "", 0), + mMonitorObjectsSpec(createTaskDataOrigin(), createTaskDataDescription(taskName), id), mResetAfterPublish(false) { // setup configuration mConfigFile = ConfigurationFactory::getConfiguration(configurationSource); populateConfig(mTaskName); + // todo: consider moving everything below to init // setup monitoring mCollector = MonitoringFactory::Get("infologger://"); @@ -132,32 +133,40 @@ void TaskRunner::timerCallback(ProcessingContext& pCtx) { finishCycle(pCtx.outpu void TaskRunner::setResetAfterPublish(bool resetAfterPublish) { mResetAfterPublish = resetAfterPublish; } -o2::header::DataDescription TaskRunner::createTaskDataDescription(const std::string taskName) +header::DataOrigin TaskRunner::createTaskDataOrigin() +{ + return header::DataOrigin{ "QC" }; +} + +header::DataDescription TaskRunner::createTaskDataDescription(const std::string& taskName) { o2::header::DataDescription description; - description.runtimeInit(std::string(taskName.substr(0, o2::header::DataDescription::size - 3) + "-mo").c_str()); + description.runtimeInit(std::string(taskName.substr(0, header::DataDescription::size - 3) + "-mo").c_str()); return description; } void TaskRunner::populateConfig(std::string taskName) { try { - std::string prefix = std::string("qc.tasks_config."); - std::string taskDefinitionName = mConfigFile->get(prefix + taskName + ".taskDefinition"); + auto tasksConfigList = mConfigFile->getRecursive("qc.tasks"); + auto taskConfigTree = tasksConfigList.find(taskName); + if (taskConfigTree == tasksConfigList.not_found()) { + throw; + } mTaskConfig.taskName = taskName; - auto taskConfigTree = mConfigFile->getRecursive(prefix + taskDefinitionName); - mTaskConfig.moduleName = taskConfigTree.get("moduleName"); - mTaskConfig.className = taskConfigTree.get("className"); - mTaskConfig.cycleDurationSeconds = taskConfigTree.get("cycleDurationSeconds", 10); - mTaskConfig.maxNumberCycles = taskConfigTree.get("maxNumberCycles", -1); - - mInputSpecs = framework::DataSampling::InputSpecsForPolicy(mConfigFile.get(), taskConfigTree.get("dataSamplingPolicy")); - - mMonitorObjectsSpec.origin.runtimeInit("QC"); - mMonitorObjectsSpec.description = createTaskDataDescription(taskName); - mMonitorObjectsSpec.subSpec = 0; - mMonitorObjectsSpec.lifetime = o2::framework::Lifetime::QA; + mTaskConfig.moduleName = taskConfigTree->second.get("moduleName"); + mTaskConfig.className = taskConfigTree->second.get("className"); + mTaskConfig.cycleDurationSeconds = taskConfigTree->second.get("cycleDurationSeconds", 10); + mTaskConfig.maxNumberCycles = taskConfigTree->second.get("maxNumberCycles", -1); + + std::string policiesFilePath = mConfigFile->get("dataSamplingPolicyFile", ""); + if (policiesFilePath.empty()) { + mInputSpecs = framework::DataSampling::InputSpecsForPolicy(mConfigFile.get(), taskConfigTree->second.get("dataSamplingPolicy")); + } else { + mInputSpecs = framework::DataSampling::InputSpecsForPolicy(policiesFilePath, taskConfigTree->second.get("dataSamplingPolicy")); + } + } catch (...) { // catch already here the configuration exception and print it // because if we are in a constructor, the exception could be lost std::string diagnostic = boost::current_exception_diagnostic_information(); diff --git a/Framework/src/TaskRunnerFactory.cxx b/Framework/src/TaskRunnerFactory.cxx index 86153a9def..ed44f08bfd 100644 --- a/Framework/src/TaskRunnerFactory.cxx +++ b/Framework/src/TaskRunnerFactory.cxx @@ -29,9 +29,9 @@ TaskRunnerFactory::TaskRunnerFactory() {} TaskRunnerFactory::~TaskRunnerFactory() {} -DataProcessorSpec TaskRunnerFactory::create(std::string taskName, std::string configurationSource) +o2::framework::DataProcessorSpec TaskRunnerFactory::create(std::string taskName, std::string configurationSource, size_t id) { - auto qcTask = std::make_shared(taskName, configurationSource); + auto qcTask = std::make_shared(taskName, configurationSource, id); DataProcessorSpec newTask{ taskName, diff --git a/Framework/src/runBasic.cxx b/Framework/src/runBasic.cxx index f2d1af91e3..8c6a256646 100644 --- a/Framework/src/runBasic.cxx +++ b/Framework/src/runBasic.cxx @@ -52,8 +52,7 @@ void customize(std::vector& policies) #include "Framework/runDataProcessing.h" #include "QualityControl/Checker.h" -#include "QualityControl/CheckerFactory.h" -#include "QualityControl/TaskRunnerFactory.h" +#include "QualityControl/InfrastructureGenerator.h" using namespace o2::framework; using namespace o2::quality_control::core; @@ -63,6 +62,7 @@ using namespace std::chrono; WorkflowSpec defineDataProcessing(ConfigContext const&) { WorkflowSpec specs; + // The producer to generate some data in the workflow DataProcessorSpec producer{ "producer", Inputs{}, @@ -87,21 +87,21 @@ WorkflowSpec defineDataProcessing(ConfigContext const&) specs.push_back(producer); - // Exemplary initialization of QC Task: - const std::string qcTaskName = "QcTask"; const std::string qcConfigurationSource = std::string("json://") + getenv("QUALITYCONTROL_ROOT") + "/etc/basic.json"; - TaskRunnerFactory taskFactory; - specs.push_back(taskFactory.create(qcTaskName, qcConfigurationSource)); + LOG(INFO) << "Using config file '" << qcConfigurationSource << "'"; + + // Generation of Data Sampling infrastructure + DataSampling::GenerateInfrastructure(specs, qcConfigurationSource); - // Now the QC Checker - CheckerFactory checkerFactory; - specs.push_back(checkerFactory.create("checker_0", qcTaskName, qcConfigurationSource)); + // Generation of the QC topology (one task, one checker in this case) + auto qcInfrastructure = InfrastructureGenerator::generateRemoteInfrastructure(qcConfigurationSource); + specs.insert(std::end(specs), std::begin(qcInfrastructure), std::end(qcInfrastructure)); // Finally the printer DataProcessorSpec printer{ "printer", Inputs{ - { "checked-mo", "QC", Checker::createCheckerDataDescription(qcTaskName), 0 } + { "checked-mo", "QC", Checker::createCheckerDataDescription("QcTask"), 0 } }, Outputs{}, AlgorithmSpec{ @@ -125,8 +125,5 @@ WorkflowSpec defineDataProcessing(ConfigContext const&) }; specs.push_back(printer); - LOG(INFO) << "Using config file '" << qcConfigurationSource << "'"; - o2::framework::DataSampling::GenerateInfrastructure(specs, qcConfigurationSource); - return specs; } diff --git a/Framework/src/runReadout.cxx b/Framework/src/runReadout.cxx index 5ed868c2fa..b73b74603e 100644 --- a/Framework/src/runReadout.cxx +++ b/Framework/src/runReadout.cxx @@ -46,6 +46,8 @@ void customize(std::vector& policies) #include "Framework/DataSamplingReadoutAdapter.h" #include "Framework/runDataProcessing.h" +#include "QualityControl/InfrastructureGenerator.h" + #include "QualityControl/Checker.h" #include "QualityControl/CheckerFactory.h" #include "QualityControl/TaskRunnerFactory.h" @@ -57,6 +59,7 @@ using namespace o2::quality_control::checker; WorkflowSpec defineDataProcessing(ConfigContext const&) { + // Creating the Readout proxy WorkflowSpec specs{ specifyExternalFairMQDeviceProxy( "readout-proxy", @@ -65,20 +68,15 @@ WorkflowSpec defineDataProcessing(ConfigContext const&) dataSamplingReadoutAdapter({ "ITS", "RAWDATA" })) }; - - // Exemplary initialization of QC Task: - const std::string qcTaskName = "daqTask"; - const std::string qcConfigurationSource = - std::string("json:/") + getenv("QUALITYCONTROL_ROOT") + "/etc/readout.json"; - TaskRunnerFactory qcFactory; - specs.push_back(qcFactory.create(qcTaskName, qcConfigurationSource)); - CheckerFactory checkerFactory; - specs.push_back(checkerFactory.create("checker_0", qcTaskName, qcConfigurationSource)); + // Generation of the QC topology + const std::string qcConfigurationSource = std::string("json:/") + getenv("QUALITYCONTROL_ROOT") + "/etc/readout.json"; + auto qcInfrastructure = InfrastructureGenerator::generateRemoteInfrastructure(qcConfigurationSource); + specs.insert(std::end(specs), std::begin(qcInfrastructure), std::end(qcInfrastructure)); DataProcessorSpec printer{ "printer", Inputs{ - { "checked-mo", "QC", Checker::createCheckerDataDescription(qcTaskName), 0 } + { "checked-mo", "QC", Checker::createCheckerDataDescription("daqTask"), 0 } }, Outputs{}, AlgorithmSpec{ diff --git a/Framework/test/testInfrastructureGenerator.cxx b/Framework/test/testInfrastructureGenerator.cxx new file mode 100644 index 0000000000..8363772c3e --- /dev/null +++ b/Framework/test/testInfrastructureGenerator.cxx @@ -0,0 +1,95 @@ +/// +/// \file testQCFactory.cxx +/// \author Piotr Konopka +/// + +#define BOOST_TEST_MODULE QCFactory test +#define BOOST_TEST_MAIN +#define BOOST_TEST_DYN_LINK + +#include +#include + +#include "QualityControl/InfrastructureGenerator.h" + +using namespace o2::quality_control::core; +using namespace o2::framework; + +BOOST_AUTO_TEST_CASE(qc_factory_local_test) +{ + std::string configFilePath = std::string("json:/") + getenv("QUALITYCONTROL_ROOT") + "/test/testQCFactory.json"; + + { + auto workflow = InfrastructureGenerator::generateLocalInfrastructure(configFilePath, "o2flp1"); + + BOOST_REQUIRE_EQUAL(workflow.size(), 1); + + BOOST_CHECK_EQUAL(workflow[0].name, "skeletonTask"); + BOOST_CHECK_EQUAL(workflow[0].inputs.size(), 1); + BOOST_CHECK_EQUAL(workflow[0].outputs.size(), 1); + BOOST_CHECK_EQUAL(workflow[0].outputs[0].subSpec, 1); + } + + { + auto workflow = InfrastructureGenerator::generateLocalInfrastructure(configFilePath, "o2flp2"); + + BOOST_REQUIRE_EQUAL(workflow.size(), 1); + + BOOST_CHECK_EQUAL(workflow[0].name, "skeletonTask"); + BOOST_CHECK_EQUAL(workflow[0].inputs.size(), 1); + BOOST_CHECK_EQUAL(workflow[0].outputs.size(), 1); + BOOST_CHECK_EQUAL(workflow[0].outputs[0].subSpec, 2); + } + + { + auto workflow = InfrastructureGenerator::generateLocalInfrastructure(configFilePath, "o2flp3"); + + BOOST_REQUIRE_EQUAL(workflow.size(), 0); + } +} + +BOOST_AUTO_TEST_CASE(qc_factory_remote_test) +{ + std::string configFilePath = std::string("json:/") + getenv("QUALITYCONTROL_ROOT") + "/test/testQCFactory.json"; + auto workflow = InfrastructureGenerator::generateRemoteInfrastructure(configFilePath); + + // the infrastructure should consist of a merger and checker for the 'skeletonTask' (its taskRunner is declared to be + // local) and also taskRunner and checker for the 'abcTask'. + BOOST_REQUIRE_EQUAL(workflow.size(), 4); + + auto mergerSkeletonTask = std::find_if( + workflow.begin(), workflow.end(), + [](const DataProcessorSpec& d) { + return d.name == "skeletonTask-merger" && + d.inputs.size() == 2 && d.inputs[0].subSpec == 1 && d.inputs[1].subSpec == 2 && + d.outputs.size() == 1 && d.outputs[0].subSpec == 0; + }); + BOOST_CHECK(mergerSkeletonTask != workflow.end()); + + auto checkerSkeletonTask = std::find_if( + workflow.begin(), workflow.end(), + [](const DataProcessorSpec& d) { + return d.name == "skeletonTask-checker" && + d.inputs.size() == 1 && + d.outputs.size() == 1; + }); + BOOST_CHECK(checkerSkeletonTask != workflow.end()); + + auto taskRunnerAbcTask = std::find_if( + workflow.begin(), workflow.end(), + [](const DataProcessorSpec& d) { + return d.name == "abcTask" && + d.inputs.size() == 1 && + d.outputs.size() == 1; + }); + BOOST_CHECK(taskRunnerAbcTask != workflow.end()); + + auto checkerAbcTask = std::find_if( + workflow.begin(), workflow.end(), + [](const DataProcessorSpec& d) { + return d.name == "abcTask-checker" && + d.inputs.size() == 1 && + d.outputs.size() == 1; + }); + BOOST_CHECK(checkerAbcTask != workflow.end()); +} \ No newline at end of file diff --git a/Framework/test/testQCFactory.json b/Framework/test/testQCFactory.json new file mode 100644 index 0000000000..6a5ea1c4f5 --- /dev/null +++ b/Framework/test/testQCFactory.json @@ -0,0 +1,75 @@ +{ + "qc": { + "config": { + "database": { + "username": "qc_user", + "password": "qc_user", + "name": "quality_control", + "implementation": "MySql", + "host": "localhost:3306" + }, + "Activity": { + "number": "42", + "type": "2" + } + }, + "tasks": { + "skeletonTask": { + "active": true, + "className": "o2::quality_control_modules::skeleton::SkeletonTask", + "moduleName": "QcSkeleton", + "dataSamplingPolicy": "tpcclust", + "cycleDurationSeconds": "10", + "maxNumberCycles": "-1", + "taskParameters": { + "parameter1": 100001, + "parameter2": "qu'est-ce que c'est que ce truc la" + }, + "location": "local", + "machines": [ + "o2flp1", + "o2flp2" + ] + }, + "abcTask": { + "active": true, + "className": "o2::quality_control_modules::skeleton::SkeletonTask", + "moduleName": "QcSkeleton", + "dataSamplingPolicy": "tpcclust", + "cycleDurationSeconds": "10", + "maxNumberCycles": "-1", + "taskParameters": { + "parameter1": 100002, + "parameter2": "c'est quoi" + }, + "location": "remote" + }, + "defTask": { + "active": false + } + } + }, + "dataSamplingPolicies": [ + { + "id": "tpcclust", + "active": "true", + "machines": [], + "dataHeaders": [ + { + "binding": "clusters", + "dataOrigin": "TPC", + "dataDescription": "CLUSTERS" + } + ], + "subSpec": "0", + "samplingConditions": [ + { + "condition": "random", + "fraction": "0.1", + "seed": "1234" + } + ], + "blocking": "false" + } + ] +} \ No newline at end of file