From bc62961eddb5ddb23be8d16cc24a90fddbe1d4c2 Mon Sep 17 00:00:00 2001 From: Florine de Geus Date: Wed, 18 Dec 2024 11:33:36 +0100 Subject: [PATCH 1/6] [ntuple] Lazily initialize `RNTupleSingleProcessor` Prevents the page sources corresponding to the processor from being opened upon creation. Instead, defer opening them until the first `Advance` call. --- tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx | 8 +++++++- tree/ntuple/v7/src/RNTupleProcessor.cxx | 18 ++++++++++++++++-- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx b/tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx index 7bf820775c22c..5633cf7b9db91 100644 --- a/tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx +++ b/tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx @@ -290,6 +290,12 @@ class RNTupleSingleProcessor : public RNTupleProcessor { friend class RNTupleProcessor; private: + bool fIsConnected = false; + + ///////////////////////////////////////////////////////////////////////////// + /// \brief Connects the page source of the underlying RNTuple. + void Connect(); + ///////////////////////////////////////////////////////////////////////////// /// \brief Constructs a new RNTupleProcessor for processing a single RNTuple.
/// @@ -300,7 +306,7 @@ private: NTupleSize_t Advance() final; public: - void LoadEntry() { fEntry->Read(fLocalEntryNumber); } + void LoadEntry() final { fEntry->Read(fLocalEntryNumber); } }; // clang-format off diff --git a/tree/ntuple/v7/src/RNTupleProcessor.cxx b/tree/ntuple/v7/src/RNTupleProcessor.cxx index 45b2fe17ffc8b..fa097fac80ed5 100644 --- a/tree/ntuple/v7/src/RNTupleProcessor.cxx +++ b/tree/ntuple/v7/src/RNTupleProcessor.cxx @@ -123,7 +123,6 @@ ROOT::Experimental::RNTupleSingleProcessor::RNTupleSingleProcessor(const RNTuple : RNTupleProcessor({ntuple}) { fPageSource = Internal::RPageSource::Create(ntuple.fNTupleName, ntuple.fStorage); - fPageSource->Attach(); model.Freeze(); fEntry = model.CreateEntry(); @@ -141,13 +140,14 @@ ROOT::Experimental::RNTupleSingleProcessor::RNTupleSingleProcessor(const RNTuple } auto fieldContext = RFieldContext(field.Clone(field.GetFieldName()), token); - ConnectField(fieldContext, *fPageSource, *fEntry); fFieldContexts.try_emplace(field.GetFieldName(), std::move(fieldContext)); } } ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleSingleProcessor::Advance() { + Connect(); + if (fLocalEntryNumber == kInvalidNTupleIndex || fLocalEntryNumber >= fPageSource->GetNEntries()) { return kInvalidNTupleIndex; } @@ -158,6 +158,20 @@ ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleSingleProcessor::Adv return fLocalEntryNumber; } +void ROOT::Experimental::RNTupleSingleProcessor::Connect() +{ + if (fIsConnected) + return; + + fPageSource->Attach(); + + for (auto &[_, fieldContext] : fFieldContexts) { + ConnectField(fieldContext, *fPageSource, *fEntry); + } + + fIsConnected = true; +} + //------------------------------------------------------------------------------ ROOT::Experimental::RNTupleChainProcessor::RNTupleChainProcessor(const std::vector &ntuples, From 84e35abf0c1d554e75a2fcf41f9a8dd9c3377c8f Mon Sep 17 00:00:00 2001 From: Florine de Geus Date: Wed, 18 Dec 2024 09:51:32 +0100 Subject: [PATCH 2/6] [ntuple] 
Make `LoadEntry` random access In anticipation of the composition of the different processors (in particular, the join processor), entry loading should offer the possibility of random access. This will not change anything about the way users can interact with the processor, which is exclusively through the (linear) iterator. --- tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx | 70 ++++++++--------- tree/ntuple/v7/src/RNTupleProcessor.cxx | 78 +++++++++++-------- tree/ntuple/v7/test/ntuple_processor.cxx | 6 +- .../ntuple/v7/test/ntuple_processor_chain.cxx | 8 +- tree/ntuple/v7/test/ntuple_processor_join.cxx | 22 +++--- 5 files changed, 95 insertions(+), 89 deletions(-) diff --git a/tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx b/tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx index 5633cf7b9db91..7e5f65ad1cf53 100644 --- a/tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx +++ b/tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx @@ -122,35 +122,22 @@ protected: NTupleSize_t fNEntriesProcessed; //< Total number of entries processed so far std::size_t fCurrentNTupleNumber; //< Index of the currently open RNTuple - NTupleSize_t fLocalEntryNumber; //< Entry number within the current ntuple + NTupleSize_t fCurrentEntryNumber; //< Entry number within the current ntuple ///////////////////////////////////////////////////////////////////////////// /// \brief Creates and connects a concrete field to the current page source, based on its proto field. void ConnectField(RFieldContext &fieldContext, Internal::RPageSource &pageSource, REntry &entry); - ////////////////////////////////////////////////////////////////////////// - /// \brief Advance the processor to the next available entry. - /// - /// \return The number of the entry loaded after advancing, or kInvalidNTupleIndex if there was no entry to advance - /// to. - /// - /// Checks if the end of the currently connected RNTuple is reached. If this is the case, either the next RNTuple - /// is connected or the iterator has reached the end.
- virtual NTupleSize_t Advance() = 0; - - ///////////////////////////////////////////////////////////////////////////// - /// \brief Fill the entry with values belonging to the current entry number. - virtual void LoadEntry() = 0; - ///////////////////////////////////////////////////////////////////////////// - /// \brief Set the local (i.e. relative to the page source currently openend) entry number. Used by - /// `RNTupleProcessor::RIterator`. + /// \brief Load the entry identified by the provided entry number. /// - /// \param[in] entryNumber - void SetLocalEntryNumber(NTupleSize_t entryNumber) { fLocalEntryNumber = entryNumber; } + /// \param[in] entryNumber Entry number to load + /// + /// \return `entryNumber` if the entry was successfully loaded, `kInvalidNTupleIndex` otherwise. + virtual NTupleSize_t LoadEntry(NTupleSize_t entryNumber) = 0; RNTupleProcessor(const std::vector &ntuples) - : fNTuples(ntuples), fNEntriesProcessed(0), fCurrentNTupleNumber(0), fLocalEntryNumber(0) + : fNTuples(ntuples), fNEntriesProcessed(0), fCurrentNTupleNumber(0), fCurrentEntryNumber(0) { } @@ -175,7 +162,7 @@ public: /// \brief Get the entry number local to the RNTuple that is currently being processed. /// /// When only one RNTuple is present in the processor chain, the return value is equal to GetGlobalEntryNumber. - NTupleSize_t GetLocalEntryNumber() const { return fLocalEntryNumber; } + NTupleSize_t GetCurrentEntryNumber() const { return fCurrentEntryNumber; } ///////////////////////////////////////////////////////////////////////////// /// \brief Returns a reference to the entry used by the processor. @@ -209,16 +196,14 @@ public: { // This constructor is called with kInvalidNTupleIndex for RNTupleProcessor::end(). In that case, we already // know there is nothing to advance to. 
- if (fCurrentEntryNumber != kInvalidNTupleIndex) { - fProcessor.SetLocalEntryNumber(fCurrentEntryNumber); - fCurrentEntryNumber = fProcessor.Advance(); + if (processor.GetCurrentEntryNumber() != kInvalidNTupleIndex) { + fCurrentEntryNumber = fProcessor.LoadEntry(fCurrentEntryNumber); } } iterator operator++() { - fProcessor.SetLocalEntryNumber(fCurrentEntryNumber + 1); - fCurrentEntryNumber = fProcessor.Advance(); + fCurrentEntryNumber = fProcessor.LoadEntry(fCurrentEntryNumber + 1); return *this; } @@ -296,17 +281,18 @@ private: /// \brief Connects the page source of the underlying RNTuple. void Connect(); + ///////////////////////////////////////////////////////////////////////////// + /// \brief Load the entry identified by the provided entry number. + /// + /// \sa ROOT::Experimental::RNTupleProcessor::LoadEntry + NTupleSize_t LoadEntry(NTupleSize_t entryNumber) final; + ///////////////////////////////////////////////////////////////////////////// /// \brief Constructs a new RNTupleProcessor for processing a single RNTuple. /// /// \param[in] ntuple The source specification (name and storage location) for the RNTuple to process. /// \param[in] model The model that specifies which fields should be read by the processor. RNTupleSingleProcessor(const RNTupleOpenSpec &ntuple, RNTupleModel &model); - - NTupleSize_t Advance() final; - -public: - void LoadEntry() final { fEntry->Read(fLocalEntryNumber); } }; // clang-format off @@ -320,19 +306,25 @@ class RNTupleChainProcessor : public RNTupleProcessor { friend class RNTupleProcessor; private: - NTupleSize_t Advance() final; - void LoadEntry() final { fEntry->Read(fLocalEntryNumber); } + std::vector fInnerNEntries; + + ///////////////////////////////////////////////////////////////////////////// + /// \brief Load the entry identified by the provided (global) entry number (i.e., considering all RNTuples in this + /// processor). 
+ /// + /// \sa ROOT::Experimental::RNTupleProcessor::LoadEntry + NTupleSize_t LoadEntry(NTupleSize_t entryNumber) final; ///////////////////////////////////////////////////////////////////////////// /// \brief Connect an RNTuple for processing. /// - /// \param[in] ntuple The RNTupleOpenSpec describing the RNTuple to connect. + /// \param[in] ntupleNumber Which RNTuple (according to the initial specified order) to connect. /// /// \return The number of entries in the newly-connected RNTuple. /// /// Creates and attaches new page source for the specified RNTuple, and connects the fields that are known by /// the processor to it. - NTupleSize_t ConnectNTuple(const RNTupleOpenSpec &ntuple); + NTupleSize_t ConnectNTuple(std::size_t ntupleNumber); ///////////////////////////////////////////////////////////////////////////// /// \brief Constructs a new RNTupleChainProcessor. @@ -365,11 +357,11 @@ private: bool IsUsingIndex() const { return fJoinIndices.size() > 0; } - NTupleSize_t Advance() final; - ///////////////////////////////////////////////////////////////////////////// - /// \brief Fill the entry with values belonging to the current entry number of the primary RNTuple. - void LoadEntry() final; + /// \brief Load the entry identified by the provided entry number of the primary RNTuple. + /// + /// \sa ROOT::Experimental::RNTupleProcessor::LoadEntry + NTupleSize_t LoadEntry(NTupleSize_t entryNumber) final; ///////////////////////////////////////////////////////////////////////////// /// \brief Constructs a new RNTupleJoinProcessor. 
diff --git a/tree/ntuple/v7/src/RNTupleProcessor.cxx b/tree/ntuple/v7/src/RNTupleProcessor.cxx index fa097fac80ed5..c2bc4d3d583a3 100644 --- a/tree/ntuple/v7/src/RNTupleProcessor.cxx +++ b/tree/ntuple/v7/src/RNTupleProcessor.cxx @@ -144,18 +144,18 @@ ROOT::Experimental::RNTupleSingleProcessor::RNTupleSingleProcessor(const RNTuple } } -ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleSingleProcessor::Advance() +ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleSingleProcessor::LoadEntry(NTupleSize_t entryNumber) { Connect(); - if (fLocalEntryNumber == kInvalidNTupleIndex || fLocalEntryNumber >= fPageSource->GetNEntries()) { + if (entryNumber >= fPageSource->GetNEntries()) return kInvalidNTupleIndex; - } - LoadEntry(); + fEntry->Read(entryNumber); fNEntriesProcessed++; - return fLocalEntryNumber; + fCurrentEntryNumber = entryNumber; + return entryNumber; } void ROOT::Experimental::RNTupleSingleProcessor::Connect() @@ -181,9 +181,13 @@ ROOT::Experimental::RNTupleChainProcessor::RNTupleChainProcessor(const std::vect if (fNTuples.empty()) throw RException(R__FAIL("at least one RNTuple must be provided")); + fInnerNEntries.assign(ntuples.size(), kInvalidNTupleIndex); + fPageSource = Internal::RPageSource::Create(fNTuples[0].fNTupleName, fNTuples[0].fStorage); fPageSource->Attach(); + fInnerNEntries[0] = fPageSource->GetNEntries(); + if (fPageSource->GetNEntries() == 0) { throw RException(R__FAIL("first RNTuple does not contain any entries")); } @@ -212,8 +216,13 @@ ROOT::Experimental::RNTupleChainProcessor::RNTupleChainProcessor(const std::vect } } -ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleChainProcessor::ConnectNTuple(const RNTupleOpenSpec &ntuple) +ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleChainProcessor::ConnectNTuple(std::size_t ntupleNumber) { + if (ntupleNumber >= fNTuples.size()) + return kInvalidNTupleIndex; + + const auto &ntuple = fNTuples[ntupleNumber]; + // Before destroying the current page source and 
replacing it by the new one, we need to reset the concrete fields // belonging to the current page source. Otherwise, these concrete fields become invalid. for (auto &[_, fieldContext] : fFieldContexts) { @@ -223,6 +232,8 @@ ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleChainProcessor::Conn fPageSource = Internal::RPageSource::Create(ntuple.fNTupleName, ntuple.fStorage); fPageSource->Attach(); + fInnerNEntries[ntupleNumber] = fPageSource->GetNEntries(); + // Now that the new page source has been created and attached, we can create and connect the concrete fields again. for (auto &[_, fieldContext] : fFieldContexts) { ConnectField(fieldContext, *fPageSource, *fEntry); @@ -231,26 +242,33 @@ ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleChainProcessor::Conn return fPageSource->GetNEntries(); } -ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleChainProcessor::Advance() +ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleChainProcessor::LoadEntry(NTupleSize_t entryNumber) { - if (fLocalEntryNumber == kInvalidNTupleIndex) - return kInvalidNTupleIndex; + NTupleSize_t localEntryNumber = entryNumber; + size_t currentNTuple = fCurrentNTupleNumber; - if (fLocalEntryNumber >= fPageSource->GetNEntries()) { - do { - if (++fCurrentNTupleNumber >= fNTuples.size()) { - return kInvalidNTupleIndex; - } - // Skip over empty ntuples we might encounter. 
- } while (ConnectNTuple(fNTuples.at(fCurrentNTupleNumber)) == 0); + while (localEntryNumber >= fInnerNEntries[currentNTuple]) { + localEntryNumber -= fInnerNEntries[currentNTuple]; - fLocalEntryNumber = 0; + if (++currentNTuple >= fNTuples.size()) + return kInvalidNTupleIndex; + + if (fInnerNEntries[currentNTuple] == kInvalidNTupleIndex) { + auto tmpPageSource = + Internal::RPageSource::Create(fNTuples[currentNTuple].fNTupleName, fNTuples[currentNTuple].fStorage); + tmpPageSource->Attach(); + fInnerNEntries[currentNTuple] = tmpPageSource->GetNEntries(); + } } - LoadEntry(); + if (currentNTuple != fCurrentNTupleNumber) { + ConnectNTuple(currentNTuple); + } + fEntry->Read(localEntryNumber); fNEntriesProcessed++; - return fLocalEntryNumber; + fCurrentEntryNumber = entryNumber; + return entryNumber; } //------------------------------------------------------------------------------ @@ -395,32 +413,26 @@ void ROOT::Experimental::RNTupleJoinProcessor::ConnectFields() } } -ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleJoinProcessor::Advance() +ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleJoinProcessor::LoadEntry(NTupleSize_t entryNumber) { - if (fLocalEntryNumber == kInvalidNTupleIndex || fLocalEntryNumber >= fPageSource->GetNEntries()) { + if (entryNumber >= fPageSource->GetNEntries()) return kInvalidNTupleIndex; - } - - LoadEntry(); - - fNEntriesProcessed++; - return fLocalEntryNumber; -} -void ROOT::Experimental::RNTupleJoinProcessor::LoadEntry() -{ // Read the values of the primary ntuple. If no index is used (i.e., the join is aligned), also read the values of // auxiliary ntuples. 
for (const auto &[_, fieldContext] : fFieldContexts) { if (!fieldContext.IsAuxiliary() || !IsUsingIndex()) { auto &value = fEntry->GetValue(fieldContext.fToken); - value.Read(fLocalEntryNumber); + value.Read(entryNumber); } } + fCurrentEntryNumber = entryNumber; + fNEntriesProcessed++; + // If no index is used (i.e., the join is aligned), there's nothing left to do. if (!IsUsingIndex()) - return; + return entryNumber; // Collect the values of the join fields for this entry. std::vector valPtrs; @@ -456,4 +468,6 @@ void ROOT::Experimental::RNTupleJoinProcessor::LoadEntry() value.Read(indexEntryNumbers[fieldContext.fNTupleIdx - 1]); } } + + return entryNumber; } diff --git a/tree/ntuple/v7/test/ntuple_processor.cxx b/tree/ntuple/v7/test/ntuple_processor.cxx index b9462517e3520..16deefbf2a459 100644 --- a/tree/ntuple/v7/test/ntuple_processor.cxx +++ b/tree/ntuple/v7/test/ntuple_processor.cxx @@ -51,7 +51,7 @@ TEST_F(RNTupleProcessorTest, Base) for (const auto &entry : *proc) { EXPECT_EQ(++nEntries, proc->GetNEntriesProcessed()); - EXPECT_EQ(nEntries - 1, proc->GetLocalEntryNumber()); + EXPECT_EQ(nEntries - 1, proc->GetCurrentEntryNumber()); EXPECT_FLOAT_EQ(static_cast(nEntries - 1), *entry.GetPtr("x")); @@ -75,7 +75,7 @@ TEST_F(RNTupleProcessorTest, BaseWithModel) for (const auto &entry : *proc) { EXPECT_EQ(++nEntries, proc->GetNEntriesProcessed()); - EXPECT_EQ(nEntries - 1, proc->GetLocalEntryNumber()); + EXPECT_EQ(nEntries - 1, proc->GetCurrentEntryNumber()); EXPECT_FLOAT_EQ(static_cast(nEntries - 1), *fldX); @@ -103,7 +103,7 @@ TEST_F(RNTupleProcessorTest, BaseWithBareModel) for (const auto &entry : *proc) { EXPECT_EQ(++nEntries, proc->GetNEntriesProcessed()); - EXPECT_EQ(nEntries - 1, proc->GetLocalEntryNumber()); + EXPECT_EQ(nEntries - 1, proc->GetCurrentEntryNumber()); EXPECT_FLOAT_EQ(static_cast(nEntries - 1), *entry.GetPtr("x")); diff --git a/tree/ntuple/v7/test/ntuple_processor_chain.cxx b/tree/ntuple/v7/test/ntuple_processor_chain.cxx index 
4023bc6cd6bc9..d49a5469b4de1 100644 --- a/tree/ntuple/v7/test/ntuple_processor_chain.cxx +++ b/tree/ntuple/v7/test/ntuple_processor_chain.cxx @@ -88,10 +88,10 @@ TEST_F(RNTupleChainProcessorTest, SingleNTuple) auto proc = RNTupleProcessor::CreateChain(ntuples); for (const auto &entry : *proc) { EXPECT_EQ(++nEntries, proc->GetNEntriesProcessed()); - EXPECT_EQ(nEntries - 1, proc->GetLocalEntryNumber()); + EXPECT_EQ(nEntries - 1, proc->GetCurrentEntryNumber()); auto x = entry.GetPtr("x"); - EXPECT_FLOAT_EQ(static_cast(proc->GetLocalEntryNumber()), *x); + EXPECT_FLOAT_EQ(static_cast(proc->GetCurrentEntryNumber()), *x); } EXPECT_EQ(nEntries, 5); EXPECT_EQ(nEntries, proc->GetNEntriesProcessed()); @@ -106,9 +106,9 @@ TEST_F(RNTupleChainProcessorTest, Basic) for (const auto &entry : *proc) { EXPECT_EQ(++nEntries, proc->GetNEntriesProcessed()); if (proc->GetCurrentNTupleNumber() == 0) { - EXPECT_EQ(nEntries - 1, proc->GetLocalEntryNumber()); + EXPECT_EQ(nEntries - 1, proc->GetCurrentEntryNumber()); } else { - EXPECT_EQ(nEntries - 1, proc->GetLocalEntryNumber() + 5); + EXPECT_EQ(nEntries - 1, proc->GetCurrentEntryNumber() + 5); } auto x = entry.GetPtr("x"); diff --git a/tree/ntuple/v7/test/ntuple_processor_join.cxx b/tree/ntuple/v7/test/ntuple_processor_join.cxx index 14436e1612194..5bbd09418a020 100644 --- a/tree/ntuple/v7/test/ntuple_processor_join.cxx +++ b/tree/ntuple/v7/test/ntuple_processor_join.cxx @@ -98,11 +98,11 @@ TEST_F(RNTupleJoinProcessorTest, Basic) int nEntries = 0; for (const auto &entry : *proc) { EXPECT_EQ(++nEntries, proc->GetNEntriesProcessed()); - EXPECT_EQ(nEntries - 1, proc->GetLocalEntryNumber()); + EXPECT_EQ(nEntries - 1, proc->GetCurrentEntryNumber()); ; auto i = entry.GetPtr("i"); - EXPECT_EQ(proc->GetLocalEntryNumber() * 2, *i); + EXPECT_EQ(proc->GetCurrentEntryNumber() * 2, *i); } EXPECT_EQ(5, proc->GetNEntriesProcessed()); @@ -126,7 +126,7 @@ TEST_F(RNTupleJoinProcessorTest, Aligned) std::vector yExpected; for (auto &entry : *proc) { 
EXPECT_EQ(++nEntries, proc->GetNEntriesProcessed()); - EXPECT_EQ(nEntries - 1, proc->GetLocalEntryNumber()); + EXPECT_EQ(nEntries - 1, proc->GetCurrentEntryNumber()); auto i = entry.GetPtr("i"); @@ -166,9 +166,9 @@ TEST_F(RNTupleJoinProcessorTest, UnalignedSingleJoinField) auto y = proc->GetEntry().GetPtr>("ntuple2.y"); std::vector yExpected; for ([[maybe_unused]] auto &entry : *proc) { - EXPECT_EQ(proc->GetLocalEntryNumber(), nEntries++); + EXPECT_EQ(proc->GetCurrentEntryNumber(), nEntries++); - EXPECT_FLOAT_EQ(proc->GetLocalEntryNumber() * 2, *i); + EXPECT_FLOAT_EQ(proc->GetCurrentEntryNumber() * 2, *i); EXPECT_FLOAT_EQ(*i * 0.5f, *x); yExpected = {static_cast(*i * 0.2), 3.14, static_cast(*i * 1.3)}; @@ -211,9 +211,9 @@ TEST_F(RNTupleJoinProcessorTest, UnalignedMultipleJoinFields) auto x = proc->GetEntry().GetPtr("x"); auto a = proc->GetEntry().GetPtr("ntuple4.a"); for ([[maybe_unused]] auto &entry : *proc) { - EXPECT_EQ(proc->GetLocalEntryNumber(), nEntries++); + EXPECT_EQ(proc->GetCurrentEntryNumber(), nEntries++); - EXPECT_FLOAT_EQ(proc->GetLocalEntryNumber() * 2, *i); + EXPECT_FLOAT_EQ(proc->GetCurrentEntryNumber() * 2, *i); EXPECT_FLOAT_EQ(*i * 0.5f, *x); EXPECT_EQ(*i * 0.1f, *a); } @@ -232,9 +232,9 @@ TEST_F(RNTupleJoinProcessorTest, MissingEntries) auto a = proc->GetEntry().GetPtr("ntuple4.a"); std::vector yExpected; for ([[maybe_unused]] auto &entry : *proc) { - EXPECT_EQ(proc->GetLocalEntryNumber(), nEntries++); + EXPECT_EQ(proc->GetCurrentEntryNumber(), nEntries++); - EXPECT_FLOAT_EQ(proc->GetLocalEntryNumber(), *i); + EXPECT_FLOAT_EQ(proc->GetCurrentEntryNumber(), *i); if (*i == 3 || *i == 9) { EXPECT_EQ(0.f, *a) << "entries with i=3 and i=9 are missing from ntuple4, ntuple4.a should have been " @@ -272,9 +272,9 @@ TEST_F(RNTupleJoinProcessorTest, WithModel) int nEntries = 0; std::vector yExpected; for (auto &entry : *proc) { - EXPECT_EQ(proc->GetLocalEntryNumber(), nEntries++); + EXPECT_EQ(proc->GetCurrentEntryNumber(), nEntries++); - 
EXPECT_EQ(proc->GetLocalEntryNumber() * 2, *i); + EXPECT_EQ(proc->GetCurrentEntryNumber() * 2, *i); EXPECT_EQ(*entry.GetPtr("i"), *i); EXPECT_FLOAT_EQ(*i * 0.5f, *x); From 5b31c5d3b605194bada040bc49644bf1e547341d Mon Sep 17 00:00:00 2001 From: Florine de Geus Date: Thu, 9 Jan 2025 10:52:39 +0100 Subject: [PATCH 3/6] [ntuple] Store model in base processor class --- tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx | 29 ++++++--- tree/ntuple/v7/src/RNTupleProcessor.cxx | 65 ++++++++++---------- tree/ntuple/v7/test/ntuple_processor.cxx | 4 +- 3 files changed, 53 insertions(+), 45 deletions(-) diff --git a/tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx b/tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx index 7e5f65ad1cf53..f7caf934de15f 100644 --- a/tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx +++ b/tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx @@ -120,9 +120,11 @@ protected: // Maps the (qualified) field name to its corresponding field context. std::unordered_map fFieldContexts; - NTupleSize_t fNEntriesProcessed; //< Total number of entries processed so far - std::size_t fCurrentNTupleNumber; //< Index of the currently open RNTuple - NTupleSize_t fCurrentEntryNumber; //< Entry number within the current ntuple + std::unique_ptr fModel; + + NTupleSize_t fNEntriesProcessed = 0; //< Total number of entries processed so far + std::size_t fCurrentNTupleNumber = 0; //< Index of the currently open RNTuple + NTupleSize_t fCurrentEntryNumber = 0; //< Entry number within the current ntuple ///////////////////////////////////////////////////////////////////////////// /// \brief Creates and connects a concrete field to the current page source, based on its proto field. @@ -136,8 +138,16 @@ protected: /// \return `entryNumber` if the entry was successfully loaded, `kInvalidNTupleIndex` otherwise. 
virtual NTupleSize_t LoadEntry(NTupleSize_t entryNumber) = 0; - RNTupleProcessor(const std::vector &ntuples) - : fNTuples(ntuples), fNEntriesProcessed(0), fCurrentNTupleNumber(0), fCurrentEntryNumber(0) + ///////////////////////////////////////////////////////////////////////////// + /// \brief Create a new base RNTupleProcessor. + /// + /// \param[in] ntuples The input RNTuples for processing + /// \param[in] model The RNTupleModel representing the entries returned by the processor. + /// + /// \note Before processing, a model *must* exist. However, this is handled downstream by the RNTupleProcessor's + /// factory functions (CreateSingle, CreateChain and CreateJoin) and constructors. + RNTupleProcessor(const std::vector &ntuples, std::unique_ptr model) + : fNTuples(ntuples), fModel(std::move(model)) { } @@ -164,6 +174,8 @@ public: /// When only one RNTuple is present in the processor chain, the return value is equal to GetGlobalEntryNumber. NTupleSize_t GetCurrentEntryNumber() const { return fCurrentEntryNumber; } + const RNTupleModel &GetModel() const { return *fModel; } + ///////////////////////////////////////////////////////////////////////////// /// \brief Returns a reference to the entry used by the processor. /// @@ -229,8 +241,8 @@ public: RIterator begin() { return RIterator(*this, 0); } RIterator end() { return RIterator(*this, kInvalidNTupleIndex); } - static std::unique_ptr Create(const RNTupleOpenSpec &ntuple); - static std::unique_ptr Create(const RNTupleOpenSpec &ntuple, RNTupleModel &model); + static std::unique_ptr + Create(const RNTupleOpenSpec &ntuple, std::unique_ptr model = nullptr); ///////////////////////////////////////////////////////////////////////////// /// \brief Create a new RNTuple processor chain for vertical concatenation of RNTuples. @@ -292,7 +304,7 @@ private: /// /// \param[in] ntuple The source specification (name and storage location) for the RNTuple to process. 
/// \param[in] model The model that specifies which fields should be read by the processor. - RNTupleSingleProcessor(const RNTupleOpenSpec &ntuple, RNTupleModel &model); + RNTupleSingleProcessor(const RNTupleOpenSpec &ntuple, std::unique_ptr model); }; // clang-format off @@ -349,7 +361,6 @@ class RNTupleJoinProcessor : public RNTupleProcessor { friend class RNTupleProcessor; private: - std::unique_ptr fJoinModel; std::vector> fAuxiliaryPageSources; /// Tokens representing the join fields present in the main RNTuple std::vector fJoinFieldTokens; diff --git a/tree/ntuple/v7/src/RNTupleProcessor.cxx b/tree/ntuple/v7/src/RNTupleProcessor.cxx index c2bc4d3d583a3..9c38f9f588dcc 100644 --- a/tree/ntuple/v7/src/RNTupleProcessor.cxx +++ b/tree/ntuple/v7/src/RNTupleProcessor.cxx @@ -32,18 +32,14 @@ void EnsureUniqueNTupleNames(const std::vector &ntuples) } // anonymous namespace std::unique_ptr -ROOT::Experimental::RNTupleProcessor::Create(const RNTupleOpenSpec &ntuple) +ROOT::Experimental::RNTupleProcessor::Create(const RNTupleOpenSpec &ntuple, std::unique_ptr model) { - auto pageSource = Internal::RPageSource::Create(ntuple.fNTupleName, ntuple.fStorage); - pageSource->Attach(); - auto model = pageSource->GetSharedDescriptorGuard()->CreateModel(); - return RNTupleProcessor::Create(ntuple, *model); -} - -std::unique_ptr -ROOT::Experimental::RNTupleProcessor::Create(const RNTupleOpenSpec &ntuple, RNTupleModel &model) -{ - return std::unique_ptr(new RNTupleSingleProcessor(ntuple, model)); + if (!model) { + auto pageSource = Internal::RPageSource::Create(ntuple.fNTupleName, ntuple.fStorage); + pageSource->Attach(); + model = pageSource->GetSharedDescriptorGuard()->CreateModel(); + } + return std::unique_ptr(new RNTupleSingleProcessor(ntuple, std::move(model))); } std::unique_ptr @@ -119,13 +115,14 @@ void ROOT::Experimental::RNTupleProcessor::ConnectField(RFieldContext &fieldCont //------------------------------------------------------------------------------ 
-ROOT::Experimental::RNTupleSingleProcessor::RNTupleSingleProcessor(const RNTupleOpenSpec &ntuple, RNTupleModel &model) - : RNTupleProcessor({ntuple}) +ROOT::Experimental::RNTupleSingleProcessor::RNTupleSingleProcessor(const RNTupleOpenSpec &ntuple, + std::unique_ptr model) + : RNTupleProcessor({ntuple}, std::move(model)) { fPageSource = Internal::RPageSource::Create(ntuple.fNTupleName, ntuple.fStorage); - model.Freeze(); - fEntry = model.CreateEntry(); + fModel->Freeze(); + fEntry = fModel->CreateEntry(); for (const auto &value : *fEntry) { auto &field = value.GetField(); @@ -134,8 +131,8 @@ ROOT::Experimental::RNTupleSingleProcessor::RNTupleSingleProcessor(const RNTuple // If the model has a default entry, use the value pointers from the entry in the entry managed by the // processor. This way, the pointers returned by RNTupleModel::MakeField can be used in the processor loop to // access the corresponding field values. - if (!model.IsBare()) { - auto valuePtr = model.GetDefaultEntry().GetPtr(token); + if (!fModel->IsBare()) { + auto valuePtr = fModel->GetDefaultEntry().GetPtr(token); fEntry->BindValue(token, valuePtr); } @@ -176,7 +173,7 @@ void ROOT::Experimental::RNTupleSingleProcessor::Connect() ROOT::Experimental::RNTupleChainProcessor::RNTupleChainProcessor(const std::vector &ntuples, std::unique_ptr model) - : RNTupleProcessor(ntuples) + : RNTupleProcessor(ntuples, std::move(model)) { if (fNTuples.empty()) throw RException(R__FAIL("at least one RNTuple must be provided")); @@ -192,11 +189,11 @@ ROOT::Experimental::RNTupleChainProcessor::RNTupleChainProcessor(const std::vect throw RException(R__FAIL("first RNTuple does not contain any entries")); } - if (!model) - model = fPageSource->GetSharedDescriptorGuard()->CreateModel(); + if (!fModel) + fModel = fPageSource->GetSharedDescriptorGuard()->CreateModel(); - model->Freeze(); - fEntry = model->CreateEntry(); + fModel->Freeze(); + fEntry = fModel->CreateEntry(); for (const auto &value : *fEntry) { auto 
&field = value.GetField(); @@ -205,8 +202,8 @@ ROOT::Experimental::RNTupleChainProcessor::RNTupleChainProcessor(const std::vect // If the model has a default entry, use the value pointers from the entry in the entry managed by the // processor. This way, the pointers returned by RNTupleModel::MakeField can be used in the processor loop to // access the corresponding field values. - if (!model->IsBare()) { - auto valuePtr = model->GetDefaultEntry().GetPtr(token); + if (!fModel->IsBare()) { + auto valuePtr = fModel->GetDefaultEntry().GetPtr(token); fEntry->BindValue(token, valuePtr); } @@ -275,7 +272,7 @@ ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleChainProcessor::Load ROOT::Experimental::RNTupleJoinProcessor::RNTupleJoinProcessor(const RNTupleOpenSpec &mainNTuple, std::unique_ptr model) - : RNTupleProcessor({mainNTuple}) + : RNTupleProcessor({mainNTuple}, nullptr) { fPageSource = Internal::RPageSource::Create(mainNTuple.fNTupleName, mainNTuple.fStorage); fPageSource->Attach(); @@ -287,9 +284,9 @@ ROOT::Experimental::RNTupleJoinProcessor::RNTupleJoinProcessor(const RNTupleOpen if (!model) model = fPageSource->GetSharedDescriptorGuard()->CreateModel(); - fJoinModel = model->Clone(); - fJoinModel->Freeze(); - fEntry = fJoinModel->CreateEntry(); + fModel = model->Clone(); + fModel->Freeze(); + fEntry = fModel->CreateEntry(); for (const auto &value : *fEntry) { auto &field = value.GetField(); @@ -298,7 +295,7 @@ ROOT::Experimental::RNTupleJoinProcessor::RNTupleJoinProcessor(const RNTupleOpen // If the model has a default entry, use the value pointers from the default entry of the model that was passed to // this constructor. This way, the pointers returned by RNTupleModel::MakeField can be used in the processor loop // to access the corresponding field values. 
- if (!fJoinModel->IsBare()) { + if (!fModel->IsBare()) { auto valuePtr = model->GetDefaultEntry().GetPtr(fieldName); fEntry->BindValue(fieldName, valuePtr); } @@ -331,7 +328,7 @@ void ROOT::Experimental::RNTupleJoinProcessor::AddAuxiliary(const RNTupleOpenSpe auto entry = model->CreateBareEntry(); // Append the auxiliary fields to the join model - fJoinModel->Unfreeze(); + fModel->Unfreeze(); // The fields of the auxiliary ntuple are contained in an anonymous record field and subsequently registered as // subfields to the join model. This way they can be accessed through the processor as `auxNTupleName.fieldName`, @@ -352,16 +349,16 @@ void ROOT::Experimental::RNTupleJoinProcessor::AddAuxiliary(const RNTupleOpenSpe } const auto &subFields = auxParentField->GetSubFields(); - fJoinModel->AddField(std::move(auxParentField)); + fModel->AddField(std::move(auxParentField)); for (const auto &field : subFields) { - fJoinModel->RegisterSubfield(field->GetQualifiedFieldName()); + fModel->RegisterSubfield(field->GetQualifiedFieldName()); } - fJoinModel->Freeze(); + fModel->Freeze(); // After modifying the join model, we need to create a new entry since the old one is invalidated. However, we do // want to carry over the value pointers, so the pointers returned by `MakeField` during the creation of the original // model by the user can be used in the processor loop. 
- auto newEntry = fJoinModel->CreateEntry(); + auto newEntry = fModel->CreateEntry(); for (const auto &value : *newEntry) { const auto &field = value.GetField(); diff --git a/tree/ntuple/v7/test/ntuple_processor.cxx b/tree/ntuple/v7/test/ntuple_processor.cxx index 16deefbf2a459..09d161d83ed37 100644 --- a/tree/ntuple/v7/test/ntuple_processor.cxx +++ b/tree/ntuple/v7/test/ntuple_processor.cxx @@ -69,7 +69,7 @@ TEST_F(RNTupleProcessorTest, BaseWithModel) auto model = RNTupleModel::Create(); auto fldX = model->MakeField("x"); - auto proc = RNTupleProcessor::Create(ntuple, *model); + auto proc = RNTupleProcessor::Create(ntuple, std::move(model)); int nEntries = 0; @@ -97,7 +97,7 @@ TEST_F(RNTupleProcessorTest, BaseWithBareModel) auto model = RNTupleModel::CreateBare(); model->MakeField("x"); - auto proc = RNTupleProcessor::Create(ntuple, *model); + auto proc = RNTupleProcessor::Create(ntuple, std::move(model)); int nEntries = 0; From 8494e6344a594fb1fc1397d18cd6e55c844bbefd Mon Sep 17 00:00:00 2001 From: Florine de Geus Date: Thu, 9 Jan 2025 09:59:07 +0100 Subject: [PATCH 4/6] [ntuple] Make `RNTupleChainProcessor` composable With this change, the `RNTupleChainProcessor` iterates over other `RNTupleProcessor` objects instead of individual ntuples. This allows us to chain, for example, ntuples that have previously been joined using the `RNTupleJoinProcessor`. --- tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx | 92 ++++++++--- tree/ntuple/v7/src/RNTupleProcessor.cxx | 149 +++++++++++------- .../ntuple/v7/test/ntuple_processor_chain.cxx | 23 +-- 3 files changed, 171 insertions(+), 93 deletions(-) diff --git a/tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx b/tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx index f7caf934de15f..1d6b3cf16b0f7 100644 --- a/tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx +++ b/tree/ntuple/v7/inc/ROOT/RNTupleProcessor.hxx @@ -74,6 +74,10 @@ entry. 
Additional bookkeeping information can be obtained through the RNTuplePro */ // clang-format on class RNTupleProcessor { + friend class RNTupleSingleProcessor; + friend class RNTupleChainProcessor; + friend class RNTupleJoinProcessor; + protected: // clang-format off /** @@ -122,9 +126,9 @@ protected: std::unique_ptr fModel; - NTupleSize_t fNEntriesProcessed = 0; //< Total number of entries processed so far - std::size_t fCurrentNTupleNumber = 0; //< Index of the currently open RNTuple - NTupleSize_t fCurrentEntryNumber = 0; //< Entry number within the current ntuple + NTupleSize_t fNEntriesProcessed = 0; //< Total number of entries processed so far + NTupleSize_t fCurrentEntryNumber = 0; //< Current processor entry number + std::size_t fCurrentProcessorNumber = 0; //< Number of the currently open inner processor ///////////////////////////////////////////////////////////////////////////// /// \brief Creates and connects a concrete field to the current page source, based on its proto field. @@ -138,6 +142,16 @@ protected: /// \return `entryNumber` if the entry was successfully loaded, `kInvalidNTupleIndex` otherwise. virtual NTupleSize_t LoadEntry(NTupleSize_t entryNumber) = 0; + ///////////////////////////////////////////////////////////////////////////// + /// \brief Points processor entry's field values to those in the provided entry. + /// + /// \param[in] entry The entry whose field values to use. + virtual void SetEntryPointers(const REntry &entry) = 0; + + ///////////////////////////////////////////////////////////////////////////// + /// \brief Get the total number of entries in this processor + virtual NTupleSize_t GetNEntries() = 0; + ///////////////////////////////////////////////////////////////////////////// /// \brief Create a new base RNTupleProcessor. /// @@ -160,19 +174,17 @@ public: ///////////////////////////////////////////////////////////////////////////// /// \brief Get the total number of entries processed so far. 
- /// - /// When only one RNTuple is present in the processor chain, the return value is equal to GetLocalEntryNumber. NTupleSize_t GetNEntriesProcessed() const { return fNEntriesProcessed; } ///////////////////////////////////////////////////////////////////////////// - /// \brief Get the index to the RNTuple currently being processed, according to the sources specified upon creation. - std::size_t GetCurrentNTupleNumber() const { return fCurrentNTupleNumber; } + /// \brief Get the entry number that is currently being processed. + NTupleSize_t GetCurrentEntryNumber() const { return fCurrentEntryNumber; } ///////////////////////////////////////////////////////////////////////////// - /// \brief Get the entry number local to the RNTuple that is currently being processed. + /// \brief Get the number of the inner processor currently being read. /// - /// When only one RNTuple is present in the processor chain, the return value is equal to GetGlobalEntryNumber. - NTupleSize_t GetCurrentEntryNumber() const { return fCurrentEntryNumber; } + /// This method is only relevant for the RNTupleChainProcessor. For the other processors, 0 is always returned. + std::size_t GetCurrentProcessorNumber() const { return fCurrentProcessorNumber; } const RNTupleModel &GetModel() const { return *fModel; } @@ -180,7 +192,6 @@ public: /// \brief Returns a reference to the entry used by the processor. /// /// \return A reference to the entry used by the processor. - /// const REntry &GetEntry() const { return *fEntry; } // clang-format off @@ -207,8 +218,8 @@ public: : fProcessor(processor), fCurrentEntryNumber(entryNumber) { // This constructor is called with kInvalidNTupleIndex for RNTupleProcessor::end(). In that case, we already - // know there is nothing to advance to. - if (processor.GetCurrentEntryNumber() != kInvalidNTupleIndex) { + // know there is nothing to load. 
+ if (fCurrentEntryNumber != kInvalidNTupleIndex) { fCurrentEntryNumber = fProcessor.LoadEntry(fCurrentEntryNumber); } } @@ -226,7 +237,7 @@ public: return obj; } - reference operator*() { return *fProcessor.fEntry; } + reference operator*() { return fProcessor.GetEntry(); } friend bool operator!=(const iterator &lh, const iterator &rh) { @@ -255,6 +266,17 @@ public: static std::unique_ptr CreateChain(const std::vector &ntuples, std::unique_ptr model = nullptr); + ///////////////////////////////////////////////////////////////////////////// + /// \brief Create a new RNTuple processor chain for vertical concatenation of previously created processors. + /// + /// \param[in] innerProcessors A list with the processors to chain. + /// \param[in] model An RNTupleModel specifying which fields can be read by the processor. If no model is provided, + /// a clone of the first inner processor's model will be used. + /// + /// \return A pointer to the newly created RNTupleProcessor. + static std::unique_ptr CreateChain(std::vector> innerProcessors, + std::unique_ptr model = nullptr); + ///////////////////////////////////////////////////////////////////////////// /// \brief Create a new RNTuple processor for horizontallly concatenated RNTuples. /// @@ -294,11 +316,24 @@ private: void Connect(); ///////////////////////////////////////////////////////////////////////////// - /// \brief Load the entry identified by the provided entry number. + /// \brief Load the entry identified by the provided (global) entry number (i.e., considering all RNTuples in this + /// processor). /// /// \sa ROOT::Experimental::RNTupleProcessor::LoadEntry NTupleSize_t LoadEntry(NTupleSize_t entryNumber) final; + ///////////////////////////////////////////////////////////////////////////// + /// \sa ROOT::Experimental::RNTupleProcessor::SetEntryPointers.
+ void SetEntryPointers(const REntry &entry) final; + + ///////////////////////////////////////////////////////////////////////////// + /// \brief Get the total number of entries in this processor. + NTupleSize_t GetNEntries() final + { + Connect(); + return fPageSource->GetNEntries(); + } + ///////////////////////////////////////////////////////////////////////////// /// \brief Constructs a new RNTupleProcessor for processing a single RNTuple. /// @@ -318,6 +353,7 @@ class RNTupleChainProcessor : public RNTupleProcessor { friend class RNTupleProcessor; private: + std::vector> fInnerProcessors; std::vector fInnerNEntries; ///////////////////////////////////////////////////////////////////////////// @@ -328,15 +364,12 @@ private: NTupleSize_t LoadEntry(NTupleSize_t entryNumber) final; ///////////////////////////////////////////////////////////////////////////// - /// \brief Connect an RNTuple for processing. - /// - /// \param[in] ntupleNumber Which RNTuple (according to the initial specified order) to connect. - /// - /// \return The number of entries in the newly-connected RNTuple. - /// - /// Creates and attaches new page source for the specified RNTuple, and connects the fields that are known by - /// the processor to it. - NTupleSize_t ConnectNTuple(std::size_t ntupleNumber); + /// \sa ROOT::Experimental::RNTupleProcessor::SetEntryPointers. + void SetEntryPointers(const REntry &) final; + + ///////////////////////////////////////////////////////////////////////////// + /// \brief Get the total number of entries in this processor. + NTupleSize_t GetNEntries() final; ///////////////////////////////////////////////////////////////////////////// /// \brief Constructs a new RNTupleChainProcessor. @@ -347,7 +380,8 @@ private: /// specified, it is created from the descriptor of the first RNTuple specified in `ntuples`. /// /// RNTuples are processed in the order in which they are specified. 
- RNTupleChainProcessor(const std::vector &ntuples, std::unique_ptr model = nullptr); + RNTupleChainProcessor(std::vector> processors, + std::unique_ptr model); }; // clang-format off @@ -374,6 +408,14 @@ private: /// \sa ROOT::Experimental::RNTupleProcessor::LoadEntry NTupleSize_t LoadEntry(NTupleSize_t entryNumber) final; + ///////////////////////////////////////////////////////////////////////////// + /// \sa ROOT::Experimental::RNTupleProcessor::SetEntryPointers. + void SetEntryPointers(const REntry &) final; + + ///////////////////////////////////////////////////////////////////////////// + /// \brief Get the total number of entries in this processor. + NTupleSize_t GetNEntries() final { return fPageSource->GetNEntries(); } + ///////////////////////////////////////////////////////////////////////////// /// \brief Constructs a new RNTupleJoinProcessor. /// diff --git a/tree/ntuple/v7/src/RNTupleProcessor.cxx b/tree/ntuple/v7/src/RNTupleProcessor.cxx index 9c38f9f588dcc..f2767e4bda380 100644 --- a/tree/ntuple/v7/src/RNTupleProcessor.cxx +++ b/tree/ntuple/v7/src/RNTupleProcessor.cxx @@ -46,7 +46,40 @@ std::unique_ptr ROOT::Experimental::RNTupleProcessor::CreateChain(const std::vector &ntuples, std::unique_ptr model) { - return std::unique_ptr(new RNTupleChainProcessor(ntuples, std::move(model))); + if (ntuples.empty()) + throw RException(R__FAIL("at least one RNTuple must be provided")); + + std::vector> innerProcessors; + innerProcessors.reserve(ntuples.size()); + + // If no model is provided, infer it from the first ntuple. 
+ if (!model) { + auto firstPageSource = Internal::RPageSource::Create(ntuples[0].fNTupleName, ntuples[0].fStorage); + firstPageSource->Attach(); + model = firstPageSource->GetSharedDescriptorGuard()->CreateModel(); + } + + for (const auto &ntuple : ntuples) { + innerProcessors.emplace_back(Create(ntuple, model->Clone())); + } + + return CreateChain(std::move(innerProcessors), std::move(model)); +} + +std::unique_ptr +ROOT::Experimental::RNTupleProcessor::CreateChain(std::vector> innerProcessors, + std::unique_ptr model) +{ + if (innerProcessors.empty()) + throw RException(R__FAIL("at least one inner processor must be provided")); + + // If no model is provided, infer it from the first inner processor. + if (!model) { + model = innerProcessors[0]->GetModel().Clone(); + } + + return std::unique_ptr( + new RNTupleChainProcessor(std::move(innerProcessors), std::move(model))); } std::unique_ptr @@ -155,6 +188,16 @@ ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleSingleProcessor::Loa return entryNumber; } +void ROOT::Experimental::RNTupleSingleProcessor::SetEntryPointers(const REntry &entry) +{ + for (const auto &value : *fEntry) { + auto &field = value.GetField(); + auto valuePtr = entry.GetPtr(field.GetQualifiedFieldName()); + + fEntry->BindValue(field.GetQualifiedFieldName(), valuePtr); + } +} + void ROOT::Experimental::RNTupleSingleProcessor::Connect() { if (fIsConnected) @@ -171,33 +214,19 @@ void ROOT::Experimental::RNTupleSingleProcessor::Connect() //------------------------------------------------------------------------------ -ROOT::Experimental::RNTupleChainProcessor::RNTupleChainProcessor(const std::vector &ntuples, - std::unique_ptr model) - : RNTupleProcessor(ntuples, std::move(model)) +ROOT::Experimental::RNTupleChainProcessor::RNTupleChainProcessor( + std::vector> processors, std::unique_ptr model) + : RNTupleProcessor({}, std::move(model)), fInnerProcessors(std::move(processors)) { - if (fNTuples.empty()) - throw RException(R__FAIL("at least 
one RNTuple must be provided")); - - fInnerNEntries.assign(ntuples.size(), kInvalidNTupleIndex); - - fPageSource = Internal::RPageSource::Create(fNTuples[0].fNTupleName, fNTuples[0].fStorage); - fPageSource->Attach(); - - fInnerNEntries[0] = fPageSource->GetNEntries(); - - if (fPageSource->GetNEntries() == 0) { - throw RException(R__FAIL("first RNTuple does not contain any entries")); - } - - if (!fModel) - fModel = fPageSource->GetSharedDescriptorGuard()->CreateModel(); + fInnerNEntries.assign(fInnerProcessors.size(), kInvalidNTupleIndex); + fInnerNEntries[0] = fInnerProcessors[0]->GetNEntries(); fModel->Freeze(); fEntry = fModel->CreateEntry(); for (const auto &value : *fEntry) { auto &field = value.GetField(); - auto token = fEntry->GetToken(field.GetFieldName()); + auto token = fEntry->GetToken(field.GetQualifiedFieldName()); // If the model has a default entry, use the value pointers from the entry in the entry managed by the // processor. This way, the pointers returned by RNTupleModel::MakeField can be used in the processor loop to @@ -206,63 +235,65 @@ ROOT::Experimental::RNTupleChainProcessor::RNTupleChainProcessor(const std::vect auto valuePtr = fModel->GetDefaultEntry().GetPtr(token); fEntry->BindValue(token, valuePtr); } + } - const auto &[fieldContext, _] = - fFieldContexts.try_emplace(field.GetFieldName(), field.Clone(field.GetFieldName()), token); - ConnectField(fieldContext->second, *fPageSource, *fEntry); + for (auto &innerProc : fInnerProcessors) { + innerProc->SetEntryPointers(*fEntry); } } -ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleChainProcessor::ConnectNTuple(std::size_t ntupleNumber) +ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleChainProcessor::GetNEntries() { - if (ntupleNumber >= fNTuples.size()) - return kInvalidNTupleIndex; + NTupleSize_t nEntries = 0; - const auto &ntuple = fNTuples[ntupleNumber]; + for (unsigned i = 0; i < fInnerProcessors.size(); ++i) { + if (fInnerNEntries[i] == kInvalidNTupleIndex) { 
+ fInnerNEntries[i] = fInnerProcessors[i]->GetNEntries(); + } - // Before destroying the current page source and replacing it by the new one, we need to reset the concrete fields - // belonging to the current page source. Otherwise, these concrete fields become invalid. - for (auto &[_, fieldContext] : fFieldContexts) { - fieldContext.ResetConcreteField(); + nEntries += fInnerNEntries[i]; } - // Replace the current page source with a new one, belonging to the provided ntuple. - fPageSource = Internal::RPageSource::Create(ntuple.fNTupleName, ntuple.fStorage); - fPageSource->Attach(); - fInnerNEntries[ntupleNumber] = fPageSource->GetNEntries(); + return nEntries; +} - // Now that the new page source has been created and attached, we can create and connect the concrete fields again. - for (auto &[_, fieldContext] : fFieldContexts) { - ConnectField(fieldContext, *fPageSource, *fEntry); +void ROOT::Experimental::RNTupleChainProcessor::SetEntryPointers(const REntry &entry) +{ + for (const auto &value : *fEntry) { + auto &field = value.GetField(); + auto valuePtr = entry.GetPtr(field.GetQualifiedFieldName()); + + fEntry->BindValue(field.GetQualifiedFieldName(), valuePtr); } - return fPageSource->GetNEntries(); + for (auto &innerProc : fInnerProcessors) { + innerProc->SetEntryPointers(*fEntry); + } } ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleChainProcessor::LoadEntry(NTupleSize_t entryNumber) { NTupleSize_t localEntryNumber = entryNumber; - size_t currentNTuple = fCurrentNTupleNumber; + size_t currProcessor = 0; - while (localEntryNumber >= fInnerNEntries[currentNTuple]) { - localEntryNumber -= fInnerNEntries[currentNTuple]; + // Determine to which inner processor and local entry number the provided global entry number belongs.
+ while (localEntryNumber >= fInnerNEntries[currProcessor]) { + localEntryNumber -= fInnerNEntries[currProcessor]; - if (++currentNTuple >= fNTuples.size()) + // The provided global entry number is larger than the number of available entries. + if (++currProcessor >= fInnerProcessors.size()) return kInvalidNTupleIndex; - if (fInnerNEntries[currentNTuple] == kInvalidNTupleIndex) { - auto tmpPageSource = - Internal::RPageSource::Create(fNTuples[currentNTuple].fNTupleName, fNTuples[currentNTuple].fStorage); - tmpPageSource->Attach(); - fInnerNEntries[currentNTuple] = tmpPageSource->GetNEntries(); + if (fInnerNEntries[currProcessor] == kInvalidNTupleIndex) { + fInnerNEntries[currProcessor] = fInnerProcessors[currProcessor]->GetNEntries(); } } - if (currentNTuple != fCurrentNTupleNumber) { - ConnectNTuple(currentNTuple); - } + if (currProcessor != fCurrentProcessorNumber) + fCurrentProcessorNumber = currProcessor; + + fInnerProcessors[fCurrentProcessorNumber]->LoadEntry(localEntryNumber); - fEntry->Read(localEntryNumber); fNEntriesProcessed++; fCurrentEntryNumber = entryNumber; return entryNumber; @@ -410,6 +441,18 @@ void ROOT::Experimental::RNTupleJoinProcessor::ConnectFields() } } +void ROOT::Experimental::RNTupleJoinProcessor::SetEntryPointers(const REntry &entry) +{ + for (const auto &[_, fieldContext] : fFieldContexts) { + auto fieldName = fieldContext.GetProtoField().GetQualifiedFieldName(); + if (fieldContext.IsAuxiliary()) { + fieldName = fNTuples[fieldContext.fNTupleIdx].fNTupleName + "." 
+ fieldName; + } + auto valuePtr = entry.GetPtr(fieldName); + fEntry->BindValue(fieldName, valuePtr); + } +} + ROOT::Experimental::NTupleSize_t ROOT::Experimental::RNTupleJoinProcessor::LoadEntry(NTupleSize_t entryNumber) { if (entryNumber >= fPageSource->GetNEntries()) diff --git a/tree/ntuple/v7/test/ntuple_processor_chain.cxx b/tree/ntuple/v7/test/ntuple_processor_chain.cxx index d49a5469b4de1..b1ce5db25efc7 100644 --- a/tree/ntuple/v7/test/ntuple_processor_chain.cxx +++ b/tree/ntuple/v7/test/ntuple_processor_chain.cxx @@ -103,15 +103,11 @@ TEST_F(RNTupleChainProcessorTest, Basic) std::uint64_t nEntries = 0; auto proc = RNTupleProcessor::CreateChain(ntuples); + auto x = proc->GetEntry().GetPtr("x"); for (const auto &entry : *proc) { EXPECT_EQ(++nEntries, proc->GetNEntriesProcessed()); - if (proc->GetCurrentNTupleNumber() == 0) { - EXPECT_EQ(nEntries - 1, proc->GetCurrentEntryNumber()); - } else { - EXPECT_EQ(nEntries - 1, proc->GetCurrentEntryNumber() + 5); - } + EXPECT_EQ(nEntries - 1, proc->GetCurrentEntryNumber()); - auto x = entry.GetPtr("x"); EXPECT_EQ(static_cast(nEntries - 1), *x); auto y = entry.GetPtr>("y"); @@ -201,17 +197,14 @@ TEST_F(RNTupleChainProcessorTest, EmptyNTuples) std::uint64_t nEntries = 0; - try { - auto proc = RNTupleProcessor::CreateChain(ntuples); - FAIL() << "creating a processor where the first RNTuple does not contain any entries should throw"; - } catch (const ROOT::RException &err) { - EXPECT_THAT(err.what(), testing::HasSubstr("first RNTuple does not contain any entries")); - } - - // Empty ntuples in the middle are just skipped (as long as their model complies) - ntuples = {{fNTupleName, fFileNames[0]}, {fNTupleName, fileGuard.GetPath()}, {fNTupleName, fFileNames[1]}}; + // Empty ntuples are skipped (as long as their model complies) + ntuples = {{fNTupleName, fileGuard.GetPath()}, + {fNTupleName, fFileNames[0]}, + {fNTupleName, fileGuard.GetPath()}, + {fNTupleName, fFileNames[1]}}; auto proc = 
RNTupleProcessor::CreateChain(ntuples); + for (const auto &entry : *proc) { auto x = entry.GetPtr("x"); EXPECT_EQ(static_cast(nEntries), *x); From fdceaaf3ad5792ae57275eaae06899026810ef1e Mon Sep 17 00:00:00 2001 From: Florine de Geus Date: Thu, 9 Jan 2025 11:56:33 +0100 Subject: [PATCH 5/6] [ntuple] Add composed chain tests --- tree/ntuple/v7/test/ntuple_processor.cxx | 92 ++++++++++++++++++++---- 1 file changed, 78 insertions(+), 14 deletions(-) diff --git a/tree/ntuple/v7/test/ntuple_processor.cxx b/tree/ntuple/v7/test/ntuple_processor.cxx index 09d161d83ed37..d4098c2b0c108 100644 --- a/tree/ntuple/v7/test/ntuple_processor.cxx +++ b/tree/ntuple/v7/test/ntuple_processor.cxx @@ -24,27 +24,43 @@ TEST(RNTupleProcessor, EmptyNTuple) class RNTupleProcessorTest : public testing::Test { protected: - const std::string fFileName = "test_ntuple_processor.root"; - const std::string fNTupleName = "ntuple"; + const std::array fFileNames{"test_ntuple_processor1.root", "test_ntuple_processor2.root"}; + const std::array fNTupleNames{"ntuple", "ntuple_aux"}; void SetUp() override { - auto model = RNTupleModel::Create(); - auto fldX = model->MakeField("x"); - auto fldY = model->MakeField>("y"); - auto ntuple = RNTupleWriter::Recreate(std::move(model), fNTupleName, fFileName); - - for (unsigned i = 0; i < 5; i++) { - *fldX = static_cast(i); - *fldY = {static_cast(i), static_cast(i * 2)}; - ntuple->Fill(); + { + auto model = RNTupleModel::Create(); + auto fldI = model->MakeField("i"); + auto fldX = model->MakeField("x"); + auto fldY = model->MakeField>("y"); + auto ntuple = RNTupleWriter::Recreate(std::move(model), fNTupleNames[0], fFileNames[0]); + + for (unsigned i = 0; i < 5; i++) { + *fldI = i; + *fldX = static_cast(i); + *fldY = {static_cast(i), static_cast(i * 2)}; + ntuple->Fill(); + } + } + { + auto model = RNTupleModel::Create(); + auto fldI = model->MakeField("i"); + auto fldZ = model->MakeField("z"); + auto ntuple = RNTupleWriter::Recreate(std::move(model), 
fNTupleNames[1], fFileNames[1]); + + for (unsigned i = 0; i < 5; ++i) { + *fldI = i; + *fldZ = i * 2.f; + ntuple->Fill(); + } } } }; TEST_F(RNTupleProcessorTest, Base) { - RNTupleOpenSpec ntuple{fNTupleName, fFileName}; + RNTupleOpenSpec ntuple{fNTupleNames[0], fFileNames[0]}; auto proc = RNTupleProcessor::Create(ntuple); int nEntries = 0; @@ -64,7 +80,7 @@ TEST_F(RNTupleProcessorTest, Base) TEST_F(RNTupleProcessorTest, BaseWithModel) { - RNTupleOpenSpec ntuple{fNTupleName, fFileName}; + RNTupleOpenSpec ntuple{fNTupleNames[0], fFileNames[0]}; auto model = RNTupleModel::Create(); auto fldX = model->MakeField("x"); @@ -92,7 +108,7 @@ TEST_F(RNTupleProcessorTest, BaseWithModel) TEST_F(RNTupleProcessorTest, BaseWithBareModel) { - RNTupleOpenSpec ntuple{fNTupleName, fFileName}; + RNTupleOpenSpec ntuple{fNTupleNames[0], fFileNames[0]}; auto model = RNTupleModel::CreateBare(); model->MakeField("x"); @@ -117,3 +133,51 @@ TEST_F(RNTupleProcessorTest, BaseWithBareModel) EXPECT_EQ(nEntries, 5); EXPECT_EQ(nEntries, proc->GetNEntriesProcessed()); } + +TEST_F(RNTupleProcessorTest, ChainedChain) +{ + std::vector ntuples{{fNTupleNames[0], fFileNames[0]}, {fNTupleNames[0], fFileNames[0]}}; + + std::vector> innerProcs; + innerProcs.push_back(RNTupleProcessor::CreateChain(ntuples)); + innerProcs.push_back(RNTupleProcessor::Create(ntuples[0])); + + auto proc = RNTupleProcessor::CreateChain(std::move(innerProcs)); + + int nEntries = 0; + + for (const auto &entry [[maybe_unused]] : *proc) { + EXPECT_EQ(++nEntries, proc->GetNEntriesProcessed()); + EXPECT_EQ(nEntries - 1, proc->GetCurrentEntryNumber()); + EXPECT_EQ(*entry.GetPtr("i"), proc->GetCurrentEntryNumber() % 5); + EXPECT_EQ(static_cast(*entry.GetPtr("i")), *entry.GetPtr("x")); + } + EXPECT_EQ(nEntries, 15); + EXPECT_EQ(nEntries, proc->GetNEntriesProcessed()); +} + +TEST_F(RNTupleProcessorTest, ChainedJoin) +{ + std::vector ntuples{{fNTupleNames[0], fFileNames[0]}, {fNTupleNames[1], fFileNames[1]}}; + + std::vector> innerProcs; + 
innerProcs.push_back(RNTupleProcessor::CreateJoin(ntuples, {})); + innerProcs.push_back(RNTupleProcessor::CreateJoin(ntuples, {"i"})); + + auto proc = RNTupleProcessor::CreateChain(std::move(innerProcs)); + + int nEntries = 0; + + auto x = proc->GetEntry().GetPtr("x"); + + for (const auto &entry [[maybe_unused]] : *proc) { + EXPECT_EQ(++nEntries, proc->GetNEntriesProcessed()); + EXPECT_EQ(nEntries - 1, proc->GetCurrentEntryNumber()); + EXPECT_EQ(*entry.GetPtr("i"), proc->GetCurrentEntryNumber() % 5); + + EXPECT_EQ(static_cast(*entry.GetPtr("i")), *x); + EXPECT_EQ(*x * 2, *entry.GetPtr("ntuple_aux.z")); + } + EXPECT_EQ(nEntries, 10); + EXPECT_EQ(nEntries, proc->GetNEntriesProcessed()); +} From b538fccbee8723d26d079803e86fec06aff07214 Mon Sep 17 00:00:00 2001 From: Florine de Geus Date: Thu, 9 Jan 2025 15:39:46 +0100 Subject: [PATCH 6/6] [ntuple] Update chain processing tutorial --- tutorials/io/ntuple/ntpl012_processor_chain.C | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tutorials/io/ntuple/ntpl012_processor_chain.C b/tutorials/io/ntuple/ntpl012_processor_chain.C index 1b019b119f631..983c6b52ed8ec 100644 --- a/tutorials/io/ntuple/ntpl012_processor_chain.C +++ b/tutorials/io/ntuple/ntpl012_processor_chain.C @@ -74,12 +74,14 @@ void Read(const std::vector &ntuples) // Access to the entry values in this case can be achieved through RNTupleProcessor::GetEntry() or through its // iterator. auto processor = RNTupleProcessor::CreateChain(ntuples, std::move(model)); + int prevProcessorNumber{-1}; for (const auto &entry : *processor) { // The RNTupleProcessor provides some additional bookkeeping information. The local entry number is reset each // a new ntuple in the chain is opened for processing. 
- if (processor->GetLocalEntryNumber() == 0) { - std::cout << "Processing " << ntuples.at(processor->GetCurrentNTupleNumber()).fNTupleName << " (" + if (static_cast(processor->GetCurrentProcessorNumber()) > prevProcessorNumber) { + prevProcessorNumber = processor->GetCurrentProcessorNumber(); + std::cout << "Processing " << ntuples.at(prevProcessorNumber).fNTupleName << " (" << processor->GetNEntriesProcessed() << " total entries processed so far)" << std::endl; }