Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option --parser-buffer-size for IndexBuilderMain #1698

Merged
merged 6 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/global/Constants.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright 2023, University of Freiburg,
// Copyright 2023 - 2025, University of Freiburg,
// Chair of Algorithms and Data Structures.
//
// Authors: Björn Buchhold <[email protected]>
// Authors: Björn Buchhold <[email protected]> [2014 - 2017]
// Johannes Kalmbach <[email protected]>
// Hannah Bast <[email protected]>

Expand All @@ -22,6 +21,7 @@ using namespace ad_utility::memory_literals;
constexpr inline ad_utility::MemorySize DEFAULT_MEMORY_LIMIT_INDEX_BUILDING =
5_GB;
constexpr inline ad_utility::MemorySize STXXL_DISK_SIZE_INDEX_BUILDER = 1_GB;
constexpr inline ad_utility::MemorySize DEFAULT_PARSER_BUFFER_SIZE = 10_MB;

constexpr inline ad_utility::MemorySize DEFAULT_MEM_FOR_QUERIES = 4_GB;

Expand Down
4 changes: 0 additions & 4 deletions src/index/ConstantsIndexBuilding.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ constexpr inline size_t PARSER_BATCH_SIZE = 1'000'000;
// streams faster.
constexpr inline size_t PARSER_MIN_TRIPLES_AT_ONCE = 10'000;

// When reading from a file, Chunks of this size will
// be fed to the parser at once (10 MiB).
constinit inline std::atomic<size_t> FILE_BUFFER_SIZE = 10 * (1ul << 20);

constinit inline std::atomic<size_t> BUFFER_SIZE_JOIN_PATTERNS_WITH_OSP =
50'000;

Expand Down
18 changes: 14 additions & 4 deletions src/index/Index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,23 @@ ad_utility::MemorySize& Index::memoryLimitIndexBuilding() {
}

// ____________________________________________________________________________
ad_utility::MemorySize& Index::blocksizePermutationsPerColumn() {
return pimpl_->blocksizePermutationPerColumn();
const ad_utility::MemorySize& Index::memoryLimitIndexBuilding() const {
return std::as_const(*pimpl_).memoryLimitIndexBuilding();
}

// ____________________________________________________________________________
const ad_utility::MemorySize& Index::memoryLimitIndexBuilding() const {
return std::as_const(*pimpl_).memoryLimitIndexBuilding();
// Return a mutable reference to the parser buffer size of the underlying
// implementation object (`pimpl_`). Because the reference is mutable, callers
// can both read the value and assign a new one through it.
ad_utility::MemorySize& Index::parserBufferSize() {
return pimpl_->parserBufferSize();
}

// ____________________________________________________________________________
// Read-only overload of `parserBufferSize()`. `std::as_const` is used so that
// the `const` overload on the implementation object is selected.
const ad_utility::MemorySize& Index::parserBufferSize() const {
return std::as_const(*pimpl_).parserBufferSize();
}

// ____________________________________________________________________________
// Return a mutable reference to the per-column permutation block size of the
// underlying implementation object. Note the slightly different spelling of
// the forwarded-to member (`blocksizePermutationPerColumn`, singular).
ad_utility::MemorySize& Index::blocksizePermutationsPerColumn() {
return pimpl_->blocksizePermutationPerColumn();
}

// ____________________________________________________________________________
Expand Down
3 changes: 3 additions & 0 deletions src/index/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ class Index {
ad_utility::MemorySize& memoryLimitIndexBuilding();
const ad_utility::MemorySize& memoryLimitIndexBuilding() const;

ad_utility::MemorySize& parserBufferSize();
const ad_utility::MemorySize& parserBufferSize() const;

ad_utility::MemorySize& blocksizePermutationsPerColumn();

void setOnDiskBase(const std::string& onDiskBase);
Expand Down
15 changes: 11 additions & 4 deletions src/index/IndexBuilderMain.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright 2014, University of Freiburg,
// Copyright 2014 - 2025 University of Freiburg
// Chair of Algorithms and Data Structures.
// Author:
// 2014-2017 Björn Buchhold (buchhold@informatik.uni-freiburg.de)
// 2018- Johannes Kalmbach (kalmbach@informatik.uni-freiburg.de)
// Authors: Björn Buchhold <[email protected]> [2014 - 2017]
// Johannes Kalmbach <kalmbach@cs.uni-freiburg.de>
// Hannah Bast <bast@cs.uni-freiburg.de>

#include <boost/program_options.hpp>
#include <cstdlib>
Expand Down Expand Up @@ -165,6 +165,7 @@ int main(int argc, char** argv) {
bool onlyPsoAndPos = false;
bool addWordsFromLiterals = false;
std::optional<ad_utility::MemorySize> stxxlMemory;
std::optional<ad_utility::MemorySize> parserBufferSize;
optind = 1;

Index index{ad_utility::makeUnlimitedAllocator<Id>()};
Expand Down Expand Up @@ -228,6 +229,9 @@ int main(int argc, char** argv) {
add("stxxl-memory,m", po::value(&stxxlMemory),
"The amount of memory in to use for sorting during the index build. "
"Decrease if the index builder runs out of memory.");
add("parser-buffer-size,b", po::value(&parserBufferSize),
"The size of the buffer used for parsing the input files. This must be "
"large enough to hold a single input triple. Default: 10 MB.");
add("keep-temporary-files,k", po::bool_switch(&keepTemporaryFiles),
"Do not delete temporary files from index creation for debugging.");

Expand All @@ -249,6 +253,9 @@ int main(int argc, char** argv) {
if (stxxlMemory.has_value()) {
index.memoryLimitIndexBuilding() = stxxlMemory.value();
}
if (parserBufferSize.has_value()) {
index.parserBufferSize() = parserBufferSize.value();
}

// If no text index name was specified, take the part of the wordsfile after
// the last slash.
Expand Down
5 changes: 3 additions & 2 deletions src/index/IndexImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ IndexBuilderDataAsFirstPermutationSorter IndexImpl::createIdTriplesAndVocab(
std::unique_ptr<RdfParserBase> IndexImpl::makeRdfParser(
const std::vector<Index::InputFileSpecification>& files) const {
auto makeRdfParserImpl =
[&files]<int useCtre>() -> std::unique_ptr<RdfParserBase> {
[this, &files]<int useCtre>() -> std::unique_ptr<RdfParserBase> {
using TokenizerT =
std::conditional_t<useCtre == 1, TokenizerCtre, Tokenizer>;
return std::make_unique<RdfMultifileParser<TokenizerT>>(files);
return std::make_unique<RdfMultifileParser<TokenizerT>>(
files, this->parserBufferSize());
};

// `callFixedSize` lifts runtime integers to compile time integers. We use it
Expand Down
6 changes: 6 additions & 0 deletions src/index/IndexImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class IndexImpl {
bool keepTempFiles_ = false;
ad_utility::MemorySize memoryLimitIndexBuilding_ =
DEFAULT_MEMORY_LIMIT_INDEX_BUILDING;
ad_utility::MemorySize parserBufferSize_ = DEFAULT_PARSER_BUFFER_SIZE;
ad_utility::MemorySize blocksizePermutationPerColumn_ =
UNCOMPRESSED_BLOCKSIZE_COMPRESSED_METADATA_PER_COLUMN;
json configurationJson_;
Expand Down Expand Up @@ -406,6 +407,11 @@ class IndexImpl {
return memoryLimitIndexBuilding_;
}

// Accessors for `parserBufferSize_` (initialized to
// `DEFAULT_PARSER_BUFFER_SIZE`). The non-const overload returns a mutable
// reference so that the value can be overwritten; the const overload only
// allows reading it.
ad_utility::MemorySize& parserBufferSize() { return parserBufferSize_; }
const ad_utility::MemorySize& parserBufferSize() const {
return parserBufferSize_;
}

ad_utility::MemorySize& blocksizePermutationPerColumn() {
return blocksizePermutationPerColumn_;
}
Expand Down
16 changes: 8 additions & 8 deletions src/index/Vocabulary.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,14 @@ class Vocabulary {
static constexpr bool isCompressed_ =
std::is_same_v<StringType, CompressedString>;

// If a word uses one of these language tags it will be internalized.
vector<std::string> internalizedLangs_{"en"};

// If a word starts with one of those prefixes, it will be externalized When
// a word matched both `externalizedPrefixes_` and `internalizedLangs_`, it
// will be externalized. Qlever-internal prefixes are currently not
// externalized.
vector<std::string> externalizedPrefixes_;
// If a literal uses one of these language tags or starts with one of these
// prefixes, it will be externalized. By default, everything is externalized.
// Both of these settings can be overridden using the `settings.json` file.
//
// NOTE: Qlever-internal prefixes are currently always internalized, no matter
// how `internalizedLangs_` and `externalizedPrefixes_` are set.
vector<std::string> internalizedLangs_;
vector<std::string> externalizedPrefixes_{""};

using UnderlyingVocabulary =
std::conditional_t<isCompressed_,
Expand Down
13 changes: 8 additions & 5 deletions src/parser/ParallelBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,14 @@ ParallelBufferWithEndRegex::getNextBlock() {
if (!endPosition) {
if (rawBuffer_.getNextBlock()) {
throw std::runtime_error(absl::StrCat(
"The regex \"", endRegexAsString_,
"\" which marks the end of a statement was not found at "
"all within a single batch that was not the last one. Please "
"increase the FILE_BUFFER_SIZE "
"or set \"parallel-parsing: false\" in the settings file."));
"The regex ", endRegexAsString_,
" which marks the end of a statement was not found in the current "
"input batch (that was not the last one) of size ",
ad_utility::insertThousandSeparator(std::to_string(rawInput->size()),
','),
"; possible fixes are: "
"use `--parser-buffer-size` to increase the buffer size or "
"use `--parse-parallel false` to disable parallel parsing"));
}
endPosition = rawInput->size();
exhausted_ = true;
Expand Down
3 changes: 3 additions & 0 deletions src/parser/ParallelBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class ParallelBuffer {
*/
virtual std::optional<BufferType> getNextBlock() = 0;

// Get the blocksize of this buffer.
size_t getBlocksize() const { return blocksize_; }

protected:
size_t blocksize_ = 100 * (2 << 20);
};
Expand Down
41 changes: 25 additions & 16 deletions src/parser/RdfParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ template <class T>
bool RdfStreamParser<T>::resetStateAndRead(
RdfStreamParser::TurtleParserBackupState* bPtr) {
auto& b = *bPtr;
AD_CORRECTNESS_CHECK(fileBuffer_);
auto nextBytesOpt = fileBuffer_->getNextBlock();
if (!nextBytesOpt || nextBytesOpt.value().empty()) {
// there are no more decompressed bytes, just continue with what we've got
Expand Down Expand Up @@ -821,7 +822,8 @@ bool RdfStreamParser<T>::resetStateAndRead(
}

template <class T>
void RdfStreamParser<T>::initialize(const string& filename) {
void RdfStreamParser<T>::initialize(const string& filename,
ad_utility::MemorySize bufferSize) {
this->clear();
// Make sure that a block of data ends with a newline. This is important for
// two reasons:
Expand All @@ -834,10 +836,10 @@ void RdfStreamParser<T>::initialize(const string& filename) {
// The reason is that with a `.` at the end, we cannot decide whether we are
// in the middle of a `PN_LOCAL` (that continues in the next buffer) or at the
// end of a statement.
fileBuffer_ =
std::make_unique<ParallelBufferWithEndRegex>(bufferSize_, "([\\r\\n]+)");
fileBuffer_ = std::make_unique<ParallelBufferWithEndRegex>(
bufferSize.getBytes(), "([\\r\\n]+)");
fileBuffer_->open(filename);
byteVec_.resize(bufferSize_);
byteVec_.resize(bufferSize.getBytes());
// decompress the first block and initialize Tokenizer
if (auto res = fileBuffer_->getNextBlock(); res) {
byteVec_ = std::move(res.value());
Expand Down Expand Up @@ -998,7 +1000,7 @@ void RdfParallelParser<Tokenizer_T>::feedBatchesToParser(
inputBatch = std::move(remainingBatchFromInitialization);
first = false;
} else {
auto nextOptional = fileBuffer_.getNextBlock();
auto nextOptional = fileBuffer_->getNextBlock();
if (!nextOptional) {
return;
}
Expand Down Expand Up @@ -1026,10 +1028,13 @@ void RdfParallelParser<Tokenizer_T>::feedBatchesToParser(

// _______________________________________________________________________
template <typename Tokenizer_T>
void RdfParallelParser<Tokenizer_T>::initialize(const string& filename) {
void RdfParallelParser<Tokenizer_T>::initialize(
const string& filename, ad_utility::MemorySize bufferSize) {
fileBuffer_ = std::make_unique<ParallelBufferWithEndRegex>(
bufferSize.getBytes(), "\\.[\\t ]*([\\r\\n]+)");
ParallelBuffer::BufferType remainingBatchFromInitialization;
fileBuffer_.open(filename);
if (auto batch = fileBuffer_.getNextBlock(); !batch) {
fileBuffer_->open(filename);
if (auto batch = fileBuffer_->getNextBlock(); !batch) {
LOG(WARN) << "Empty input to the TURTLE parser, is this what you intended?"
<< std::endl;
} else {
Expand Down Expand Up @@ -1109,7 +1114,8 @@ RdfParallelParser<T>::~RdfParallelParser() {
// file is to be parsed in parallel.
template <typename TokenizerT>
static std::unique_ptr<RdfParserBase> makeSingleRdfParser(
const Index::InputFileSpecification& file) {
const Index::InputFileSpecification& file,
ad_utility::MemorySize bufferSize) {
auto graph = [file]() -> TripleComponent {
if (file.defaultGraph_.has_value()) {
return TripleComponent::Iri::fromIrirefWithoutBrackets(
Expand All @@ -1118,7 +1124,7 @@ static std::unique_ptr<RdfParserBase> makeSingleRdfParser(
return qlever::specialIds().at(DEFAULT_GRAPH_IRI);
}
};
auto makeRdfParserImpl = [&filename = file.filename_,
auto makeRdfParserImpl = [&filename = file.filename_, &bufferSize,
&graph]<int useParallel, int isTurtleInput>()
-> std::unique_ptr<RdfParserBase> {
using InnerParser =
Expand All @@ -1127,7 +1133,7 @@ static std::unique_ptr<RdfParserBase> makeSingleRdfParser(
using Parser =
std::conditional_t<useParallel == 1, RdfParallelParser<InnerParser>,
RdfStreamParser<InnerParser>>;
return std::make_unique<Parser>(filename, graph());
return std::make_unique<Parser>(filename, bufferSize, graph());
};

// The call to `callFixedSize` lifts runtime integers to compile time
Expand All @@ -1142,13 +1148,15 @@ static std::unique_ptr<RdfParserBase> makeSingleRdfParser(
// ______________________________________________________________
template <typename T>
RdfMultifileParser<T>::RdfMultifileParser(
const std::vector<qlever::InputFileSpecification>& files) {
const std::vector<qlever::InputFileSpecification>& files,
ad_utility::MemorySize bufferSize) {
using namespace qlever;
// This lambda parses a single file and pushes the results and all occurring
// exceptions to the `finishedBatchQueue_`.
auto parseFile = [this](const InputFileSpecification& file) {
auto parseFile = [this](const InputFileSpecification& file,
ad_utility::MemorySize bufferSize) {
try {
auto parser = makeSingleRdfParser<Tokenizer>(file);
auto parser = makeSingleRdfParser<Tokenizer>(file, bufferSize);
while (auto batch = parser->getBatch()) {
bool active = finishedBatchQueue_.push(std::move(batch.value()));
if (!active) {
Expand All @@ -1169,10 +1177,11 @@ RdfMultifileParser<T>::RdfMultifileParser(
};

// Feed all the input files to the `parsingQueue_`.
auto makeParsers = [files, this, parseFile]() {
auto makeParsers = [files, bufferSize, this, parseFile]() {
for (const auto& file : files) {
numActiveParsers_++;
bool active = parsingQueue_.push(std::bind_front(parseFile, file));
bool active =
parsingQueue_.push(std::bind_front(parseFile, file, bufferSize));
if (!active) {
// The queue was finished prematurely, stop this thread. This is
// important to avoid deadlocks.
Expand Down
Loading
Loading