Skip to content

Commit

Permalink
[ntuple] Add RNTupleSingleProcessor
Browse files Browse the repository at this point in the history
The `RNTupleSingleProcessor` processes one single RNTuple, using the same
interfaces as the `RNTupleChainProcessor` and `RNTupleJoinProcessor`.
The point of adding this processor is that this way, the other
processors can use a unified interface for loading entries, without
having to know whether they are dealing with a single RNTuple or a
composed one.
  • Loading branch information
enirolf committed Dec 13, 2024
1 parent 2e64025 commit 6a7034e
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 0 deletions.
2 changes: 2 additions & 0 deletions tree/ntuple/v7/inc/ROOT/REntry.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace ROOT {
namespace Experimental {

class RNTupleProcessor;
class RNTupleSingleProcessor;
class RNTupleChainProcessor;
class RNTupleJoinProcessor;

Expand All @@ -52,6 +53,7 @@ class REntry {
friend class RNTupleReader;
friend class RNTupleFillContext;
friend class RNTupleProcessor;
friend class RNTupleSingleProcessor;
friend class RNTupleChainProcessor;
friend class RNTupleJoinProcessor;

Expand Down
28 changes: 28 additions & 0 deletions tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ protected:
// clang-format on
class RFieldContext {
friend class RNTupleProcessor;
friend class RNTupleSingleProcessor;
friend class RNTupleChainProcessor;
friend class RNTupleJoinProcessor;

Expand Down Expand Up @@ -224,6 +225,9 @@ public:
/// Returns an iterator over this processor, constructed with entry number 0.
RIterator begin() { return RIterator(*this, 0); }
/// Returns the past-the-end iterator, constructed with kInvalidNTupleIndex as its entry number.
RIterator end() { return RIterator(*this, kInvalidNTupleIndex); }

/////////////////////////////////////////////////////////////////////////////
/// \brief Create a new RNTuple processor for a single RNTuple, using a model derived from the RNTuple itself.
///
/// \param[in] ntuple The name and storage location of the RNTuple to process.
///
/// \return A pointer to the newly created RNTupleProcessor.
///
/// The RNTuple is opened and attached to obtain its descriptor, from which the model used by the processor is
/// created. The processor itself is then created as in the overload that takes a model.
static std::unique_ptr<RNTupleProcessor> Create(const RNTupleOpenSpec &ntuple);
/////////////////////////////////////////////////////////////////////////////
/// \brief Create a new RNTuple processor for a single RNTuple, reading the fields of the provided model.
///
/// \param[in] ntuple The name and storage location of the RNTuple to process.
/// \param[in] model The model that specifies which fields should be read by the processor. The model is frozen
/// when the processor is constructed. If the model is not bare, the value pointers of its default entry are used
/// by the entry managed by the processor.
///
/// \return A pointer to the newly created RNTupleProcessor (an RNTupleSingleProcessor).
///
/// \throws RException if the specified RNTuple does not contain any entries.
static std::unique_ptr<RNTupleProcessor> Create(const RNTupleOpenSpec &ntuple, RNTupleModel &model);

/////////////////////////////////////////////////////////////////////////////
/// \brief Create a new RNTuple processor chain for vertical concatenation of RNTuples.
///
Expand Down Expand Up @@ -256,6 +260,30 @@ public:
std::vector<std::unique_ptr<RNTupleModel>> models = {});
};

// clang-format off
/**
\class ROOT::Experimental::RNTupleSingleProcessor
\ingroup NTuple
\brief Processor specialization for processing a single RNTuple.
*/
// clang-format on
class RNTupleSingleProcessor : public RNTupleProcessor {
// The constructor is private; instances are created through the RNTupleProcessor::Create() factory methods,
// which therefore need friend access.
friend class RNTupleProcessor;

private:
/////////////////////////////////////////////////////////////////////////////
/// \brief Constructs a new RNTupleProcessor for processing a single RNTuple.
///
/// \param[in] ntuple The source specification (name and storage location) for the RNTuple to process.
/// \param[in] model The model that specifies which fields should be read by the processor.
///
/// The model is frozen as part of the construction. An RException is thrown if the RNTuple has no entries.
RNTupleSingleProcessor(const RNTupleOpenSpec &ntuple, RNTupleModel &model);

/////////////////////////////////////////////////////////////////////////////
/// \brief Advance the processor to the next entry and load it.
///
/// \return The number of entries processed so far, or kInvalidNTupleIndex once that number has reached the
/// number of entries in the RNTuple (in which case no entry is loaded).
NTupleSize_t Advance() final;

public:
/////////////////////////////////////////////////////////////////////////////
/// \brief Read the entry at the current local entry number into the entry managed by the processor.
void LoadEntry() { fEntry->Read(fLocalEntryNumber); }
};

// clang-format off
/**
\class ROOT::Experimental::RNTupleChainProcessor
Expand Down
63 changes: 63 additions & 0 deletions tree/ntuple/v7/src/RNTupleProcessor.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,21 @@ void EnsureUniqueNTupleNames(const std::vector<RNTupleOpenSpec> &ntuples)
}
} // anonymous namespace

std::unique_ptr<ROOT::Experimental::RNTupleProcessor>
ROOT::Experimental::RNTupleProcessor::Create(const RNTupleOpenSpec &ntuple)
{
   // No model was given: open the RNTuple once to build a model from its descriptor, then hand over to the
   // model-taking overload.
   auto source = Internal::RPageSource::Create(ntuple.fNTupleName, ntuple.fStorage);
   source->Attach();

   auto descriptorModel = source->GetSharedDescriptorGuard()->CreateModel();
   return Create(ntuple, *descriptorModel);
}

std::unique_ptr<ROOT::Experimental::RNTupleProcessor>
ROOT::Experimental::RNTupleProcessor::Create(const RNTupleOpenSpec &ntuple, RNTupleModel &model)
{
   // The RNTupleSingleProcessor constructor is private, so std::make_unique cannot be used here; the object is
   // handed to a smart pointer right away.
   std::unique_ptr<RNTupleSingleProcessor> singleProcessor(new RNTupleSingleProcessor(ntuple, model));
   return singleProcessor;
}

std::unique_ptr<ROOT::Experimental::RNTupleProcessor>
ROOT::Experimental::RNTupleProcessor::CreateChain(const std::vector<RNTupleOpenSpec> &ntuples,
std::unique_ptr<RNTupleModel> model)
Expand Down Expand Up @@ -105,6 +120,54 @@ void ROOT::Experimental::RNTupleProcessor::ConnectField(RFieldContext &fieldCont

//------------------------------------------------------------------------------

ROOT::Experimental::RNTupleSingleProcessor::RNTupleSingleProcessor(const RNTupleOpenSpec &ntuple, RNTupleModel &model)
   : RNTupleProcessor({ntuple})
{
   // Open the RNTuple and refuse to process it when there is nothing to read.
   fPageSource = Internal::RPageSource::Create(ntuple.fNTupleName, ntuple.fStorage);
   fPageSource->Attach();

   const bool hasEntries = fPageSource->GetNEntries() != 0;
   if (!hasEntries) {
      throw RException(R__FAIL("specified RNTuple \"" + ntuple.fNTupleName + "\" at path \"" + ntuple.fStorage +
                               "\" does not contain any entries"));
   }

   // The processor owns an entry created from the (now frozen) model.
   model.Freeze();
   fEntry = model.CreateEntry();

   for (const auto &entryValue : *fEntry) {
      auto &modelField = entryValue.GetField();
      const auto &fieldName = modelField.GetFieldName();
      auto fieldToken = fEntry->GetToken(fieldName);

      // A non-bare model comes with a default entry. Share its value pointers with the processor's entry, so
      // that the pointers obtained from RNTupleModel::MakeField give access to the field values while looping
      // over the processor.
      if (!model.IsBare()) {
         fEntry->BindValue(fieldToken, model.GetDefaultEntry().GetPtr<void>(fieldToken));
      }

      // Register a context for the field and connect it to the page source.
      RFieldContext context(modelField.Clone(fieldName), fieldToken);
      ConnectField(context, *fPageSource, *fEntry);
      fFieldContexts.try_emplace(fieldName, std::move(context));
   }
}

ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleSingleProcessor::Advance()
{
   // The processed-entries counter is incremented on every call, including the one that detects the end.
   ++fNEntriesProcessed;

   const bool hasNextEntry = fNEntriesProcessed < fPageSource->GetNEntries();
   if (hasNextEntry) {
      // Move on to the next entry and read its values.
      ++fLocalEntryNumber;
      LoadEntry();
      return fNEntriesProcessed;
   }

   return kInvalidNTupleIndex;
}

//------------------------------------------------------------------------------

ROOT::Experimental::RNTupleChainProcessor::RNTupleChainProcessor(const std::vector<RNTupleOpenSpec> &ntuples,
std::unique_ptr<RNTupleModel> model)
: RNTupleProcessor(ntuples)
Expand Down
1 change: 1 addition & 0 deletions tree/ntuple/v7/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ ROOT_ADD_GTEST(ntuple_multi_column ntuple_multi_column.cxx LIBRARIES ROOTNTuple)
ROOT_ADD_GTEST(ntuple_packing ntuple_packing.cxx LIBRARIES ROOTNTuple CustomStruct)
ROOT_ADD_GTEST(ntuple_pages ntuple_pages.cxx LIBRARIES ROOTNTuple CustomStruct)
ROOT_ADD_GTEST(ntuple_print ntuple_print.cxx LIBRARIES ROOTNTuple CustomStruct)
ROOT_ADD_GTEST(ntuple_processor ntuple_processor.cxx LIBRARIES ROOTNTuple)
ROOT_ADD_GTEST(ntuple_processor_chain ntuple_processor_chain.cxx LIBRARIES ROOTNTuple)
ROOT_ADD_GTEST(ntuple_processor_join ntuple_processor_join.cxx LIBRARIES ROOTNTuple)
ROOT_ADD_GTEST(ntuple_project ntuple_project.cxx LIBRARIES ROOTNTuple)
Expand Down
98 changes: 98 additions & 0 deletions tree/ntuple/v7/test/ntuple_processor.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#include "ntuple_test.hxx"

#include <ROOT/RNTupleProcessor.hxx>

#include <cstdio>

class RNTupleProcessorTest : public testing::Test {
protected:
const std::string fFileName = "test_ntuple_processor.root";
const std::string fNTupleName = "ntuple";

void SetUp() override
{
auto model = RNTupleModel::Create();
auto fldX = model->MakeField<float>("x");
auto fldY = model->MakeField<std::vector<float>>("y");
auto ntuple = RNTupleWriter::Recreate(std::move(model), fNTupleName, fFileName);

for (unsigned i = 0; i < 5; i++) {
*fldX = static_cast<float>(i);
*fldY = {static_cast<float>(i), static_cast<float>(i * 2)};
ntuple->Fill();
}
}
};

// Process the RNTuple without providing a model: both "x" and "y" are read through the entry.
TEST_F(RNTupleProcessorTest, Base)
{
   RNTupleOpenSpec ntuple{fNTupleName, fFileName};
   auto proc = RNTupleProcessor::Create(ntuple);

   int nEntries = 0;

   for (const auto &entry : *proc) {
      // Entry i was written with x = i and y = {i, 2 * i}.
      auto xPtr = entry.GetPtr<float>("x");
      EXPECT_FLOAT_EQ(static_cast<float>(proc->GetNEntriesProcessed()), *xPtr);

      std::vector<float> yExp;
      yExp.push_back(static_cast<float>(proc->GetNEntriesProcessed()));
      yExp.push_back(static_cast<float>(proc->GetNEntriesProcessed() * 2));
      auto yPtr = entry.GetPtr<std::vector<float>>("y");
      EXPECT_EQ(yExp, *yPtr);

      // With a single RNTuple, the global and the local entry counters coincide.
      EXPECT_EQ(proc->GetNEntriesProcessed(), proc->GetLocalEntryNumber());
      nEntries += 1;
   }

   EXPECT_EQ(nEntries, 5);
   EXPECT_EQ(nEntries, proc->GetNEntriesProcessed());
}

// Process the RNTuple with a (non-bare) model that only contains "x": the value is accessible through the
// pointer returned by MakeField, and "y" must not be readable.
TEST_F(RNTupleProcessorTest, BaseWithModel)
{
   auto model = RNTupleModel::Create();
   auto fldX = model->MakeField<float>("x");

   RNTupleOpenSpec ntuple{fNTupleName, fFileName};
   auto proc = RNTupleProcessor::Create(ntuple, *model);

   int nEntries = 0;

   for (const auto &entry : *proc) {
      EXPECT_FLOAT_EQ(static_cast<float>(proc->GetNEntriesProcessed()), *fldX);
      EXPECT_EQ(proc->GetNEntriesProcessed(), proc->GetLocalEntryNumber());

      // Asking for a field that is not part of the model has to throw.
      try {
         entry.GetPtr<std::vector<float>>("y");
         FAIL() << "fields not present in the model passed to the processor shouldn't be readable";
      } catch (const RException &err) {
         EXPECT_THAT(err.what(), testing::HasSubstr("invalid field name: y"));
      }

      nEntries += 1;
   }

   EXPECT_EQ(nEntries, 5);
   EXPECT_EQ(nEntries, proc->GetNEntriesProcessed());
}

// Process the RNTuple with a bare model that only contains "x": the value is read through the entry, and "y"
// must not be readable.
TEST_F(RNTupleProcessorTest, BaseWithBareModel)
{
   auto model = RNTupleModel::CreateBare();
   model->MakeField<float>("x");

   RNTupleOpenSpec ntuple{fNTupleName, fFileName};
   auto proc = RNTupleProcessor::Create(ntuple, *model);

   int nEntries = 0;

   for (const auto &entry : *proc) {
      auto xPtr = entry.GetPtr<float>("x");
      EXPECT_FLOAT_EQ(static_cast<float>(proc->GetNEntriesProcessed()), *xPtr);
      EXPECT_EQ(proc->GetNEntriesProcessed(), proc->GetLocalEntryNumber());

      // Asking for a field that is not part of the model has to throw.
      try {
         entry.GetPtr<std::vector<float>>("y");
         FAIL() << "fields not present in the model passed to the processor shouldn't be readable";
      } catch (const RException &err) {
         EXPECT_THAT(err.what(), testing::HasSubstr("invalid field name: y"));
      }

      nEntries += 1;
   }

   EXPECT_EQ(nEntries, 5);
   EXPECT_EQ(nEntries, proc->GetNEntriesProcessed());
}

0 comments on commit 6a7034e

Please sign in to comment.