Skip to content

Commit

Permalink
DPL Analysis: move ownership of payloads to the fragment (#13931)
Browse files Browse the repository at this point in the history
This makes sure the FileFragment is the entity which owns the
TTree / RNTuple, so that its caching and memory management have the
correct lifecycle and we do not end up with memory churn or have
to reconfigure the caches.
  • Loading branch information
ktf authored Feb 3, 2025
1 parent 9dcdaae commit ff8ba81
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 131 deletions.
60 changes: 35 additions & 25 deletions Framework/AnalysisSupport/src/RNTuplePlugin.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <ROOT/RFieldVisitor.hxx>
#include <ROOT/RNTupleInspector.hxx>
#include <ROOT/RVec.hxx>
#include <memory>
#include <TBufferFile.h>

#include <TDirectory.h>
Expand Down Expand Up @@ -51,10 +52,6 @@ class RNTupleFileSystem : public VirtualRootFileSystemBase
public:
~RNTupleFileSystem() override;

std::shared_ptr<VirtualRootFileSystemBase> GetSubFilesystem(arrow::dataset::FileSource source) override
{
return std::dynamic_pointer_cast<VirtualRootFileSystemBase>(shared_from_this());
};
virtual ROOT::Experimental::RNTuple* GetRNTuple(arrow::dataset::FileSource source) = 0;
};

Expand Down Expand Up @@ -100,9 +97,28 @@ class RNTupleFileFragment : public arrow::dataset::FileFragment
std::shared_ptr<arrow::dataset::FileFormat> format,
arrow::compute::Expression partition_expression,
std::shared_ptr<arrow::Schema> physical_schema)
: FileFragment(std::move(source), std::move(format), std::move(partition_expression), std::move(physical_schema))
: FileFragment(source, format, partition_expression, physical_schema)
{
auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
if (!fs.get()) {
throw runtime_error_f("Do not know how to extract %s from %s", source.path().c_str(), fs->type_name().c_str());
}
auto handler = fs->GetObjectHandler(source);
if (!handler->format->Equals(*format)) {
throw runtime_error_f("Format for %s does not match. Found %s, expected %s.", source.path().c_str(),
handler->format->type_name().c_str(),
format->type_name().c_str());
}
mNTuple = handler->GetObjectAsOwner<ROOT::Experimental::RNTuple>();
}

/// Non-owning access to the RNTuple held by this fragment.
///
/// The fragment keeps ownership through mNTuple (a std::unique_ptr), so the
/// caller must not delete the returned pointer, and it stays valid only as
/// long as this fragment is alive. Returns nullptr when mNTuple is empty.
///
/// Declared const because reading the pointer does not modify the fragment.
ROOT::Experimental::RNTuple* GetRNTuple() const
{
  return mNTuple.get();
}

private:
std::unique_ptr<ROOT::Experimental::RNTuple> mNTuple;
};

class RNTupleFileFormat : public arrow::dataset::FileFormat
Expand Down Expand Up @@ -133,11 +149,10 @@ class RNTupleFileFormat : public arrow::dataset::FileFormat
arrow::Result<bool> IsSupported(const arrow::dataset::FileSource& source) const override
{
auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
auto subFs = fs->GetSubFilesystem(source);
if (std::dynamic_pointer_cast<RNTupleFileSystem>(subFs)) {
return true;
if (!fs) {
return false;
}
return false;
return fs->CheckSupport(source);
}

arrow::Result<std::shared_ptr<arrow::Schema>> Inspect(const arrow::dataset::FileSource& source) const override;
Expand Down Expand Up @@ -493,11 +508,12 @@ arrow::Result<std::shared_ptr<arrow::Schema>> RNTupleFileFormat::Inspect(const a

auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
// Actually get the RNTuple from the ROOT file.
auto ntupleFs = std::dynamic_pointer_cast<RNTupleFileSystem>(fs->GetSubFilesystem(source));
if (!ntupleFs.get()) {
throw runtime_error_f("Unknown filesystem %s\n", source.filesystem()->type_name().c_str());
auto objectHandler = fs->GetObjectHandler(source);
if (objectHandler->format->type_name() != this->type_name()) {
throw runtime_error_f("Unexpected kind of filesystem %s to handle payload %s.\n", source.filesystem()->type_name().c_str(), source.path().c_str());
}
ROOT::Experimental::RNTuple* rntuple = ntupleFs->GetRNTuple(source);
// We know this is a RNTuple, so we can continue with the inspection.
auto rntuple = objectHandler->GetObjectAsOwner<ROOT::Experimental::RNTuple>().release();

auto inspector = ROOT::Experimental::RNTupleInspector::Create(rntuple);

Expand Down Expand Up @@ -526,11 +542,8 @@ arrow::Result<arrow::RecordBatchGenerator> RNTupleFileFormat::ScanBatchesAsync(
std::vector<std::shared_ptr<arrow::Array>> columns;
std::vector<std::shared_ptr<arrow::Field>> fields = dataset_schema->fields();

auto containerFS = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(ntupleFragment->source().filesystem());
auto fs = std::dynamic_pointer_cast<RNTupleFileSystem>(containerFS->GetSubFilesystem(ntupleFragment->source()));

int64_t rows = -1;
ROOT::Experimental::RNTuple* rntuple = fs->GetRNTuple(ntupleFragment->source());
ROOT::Experimental::RNTuple* rntuple = ntupleFragment->GetRNTuple();
auto reader = ROOT::Experimental::RNTupleReader::Open(rntuple);
auto& model = reader->GetModel();
for (auto& physicalField : fields) {
Expand Down Expand Up @@ -670,7 +683,7 @@ arrow::Result<arrow::RecordBatchGenerator> RNTupleFileFormat::ScanBatchesAsync(
if (!result.ok()) {
throw runtime_error("Cannot allocate offset buffer");
}
arrowOffsetBuffer = std::move(result).ValueUnsafe();
arrowOffsetBuffer = result.MoveValueUnsafe();

// Offset bulk
auto offsetBulk = model.CreateBulk(physicalField->name());
Expand All @@ -692,7 +705,7 @@ arrow::Result<arrow::RecordBatchGenerator> RNTupleFileFormat::ScanBatchesAsync(
if (!result.ok()) {
throw runtime_error("Cannot allocate values buffer");
}
arrowValuesBuffer = std::move(result).ValueUnsafe();
arrowValuesBuffer = result.MoveValueUnsafe();
ptr = (uint8_t*)(arrowValuesBuffer->mutable_data());
// Calculate the size of the buffer here.
for (size_t i = 0; i < total; i++) {
Expand Down Expand Up @@ -811,9 +824,9 @@ arrow::Result<std::shared_ptr<arrow::dataset::FileFragment>> RNTupleFileFormat::
{
std::shared_ptr<arrow::dataset::FileFormat> format = std::make_shared<RNTupleFileFormat>(mTotCompressedSize, mTotUncompressedSize);

auto fragment = std::make_shared<RNTupleFileFragment>(std::move(source), std::move(format),
std::move(partition_expression),
std::move(physical_schema));
auto fragment = std::make_shared<RNTupleFileFragment>(source, format,
partition_expression,
physical_schema);
return std::dynamic_pointer_cast<arrow::dataset::FileFragment>(fragment);
}

Expand All @@ -839,9 +852,6 @@ struct RNTupleObjectReadingImplementation : public RootArrowFactoryPlugin {
return new RootArrowFactory{
.options = [context]() { return context->format->DefaultWriteOptions(); },
.format = [context]() { return context->format; },
.getSubFilesystem = [](void* handle) {
auto rntuple = (ROOT::Experimental::RNTuple*)handle;
return std::shared_ptr<VirtualRootFileSystemBase>(new SingleRNTupleFileSystem(rntuple)); },
};
}
};
Expand Down
Loading

0 comments on commit ff8ba81

Please sign in to comment.