From 36cdbd4b07d781987af17822e623067d28d41500 Mon Sep 17 00:00:00 2001 From: Hannah Bast Date: Fri, 21 Feb 2025 03:18:10 +0100 Subject: [PATCH] Revert "Better error message on parallel turtle parsing ... (#1807)" (#1827) This reverts commit 8678731edf09615c066f682b53d9660784388529, which breaks the index build, see https://github.com/ad-freiburg/qlever-control/issues/139 --- src/engine/GraphStoreProtocol.cpp | 1 - src/index/IndexImpl.cpp | 2 - src/parser/RdfEscaping.h | 3 ++ src/parser/RdfParser.cpp | 62 +++++++++---------------------- src/parser/RdfParser.h | 57 ++++++++++++++-------------- src/parser/Tokenizer.h | 7 ++-- src/parser/TokenizerCtre.h | 2 - src/util/TaskQueue.h | 5 --- test/DeltaTriplesTest.cpp | 3 +- test/IndexTest.cpp | 2 +- test/RdfParserTest.cpp | 45 +--------------------- 11 files changed, 55 insertions(+), 134 deletions(-) diff --git a/src/engine/GraphStoreProtocol.cpp b/src/engine/GraphStoreProtocol.cpp index a708f5e948..fc46ec6fa0 100644 --- a/src/engine/GraphStoreProtocol.cpp +++ b/src/engine/GraphStoreProtocol.cpp @@ -4,7 +4,6 @@ #include "engine/GraphStoreProtocol.h" -#include "parser/Tokenizer.h" #include "util/http/beast.h" // ____________________________________________________________________________ diff --git a/src/index/IndexImpl.cpp b/src/index/IndexImpl.cpp index 3b2cc58d1a..1b0c762256 100644 --- a/src/index/IndexImpl.cpp +++ b/src/index/IndexImpl.cpp @@ -21,8 +21,6 @@ #include "index/IndexFormatVersion.h" #include "index/VocabularyMerger.h" #include "parser/ParallelParseBuffer.h" -#include "parser/Tokenizer.h" -#include "parser/TokenizerCtre.h" #include "util/BatchedPipeline.h" #include "util/CachingMemoryResource.h" #include "util/HashMap.h" diff --git a/src/parser/RdfEscaping.h b/src/parser/RdfEscaping.h index 29d69aa2b4..36bbcd74f7 100644 --- a/src/parser/RdfEscaping.h +++ b/src/parser/RdfEscaping.h @@ -5,12 +5,15 @@ #ifndef QLEVER_RDFESCAPING_H #define QLEVER_RDFESCAPING_H +#include + #include #include #include "global/TypedIndex.h" #include "parser/NormalizedString.h" #include "util/Exception.h" +#include "util/HashSet.h" #include "util/StringUtils.h" namespace RdfEscaping { diff --git a/src/parser/RdfParser.cpp b/src/parser/RdfParser.cpp index fdca43e3e1..c100a4b1dc 100644 --- a/src/parser/RdfParser.cpp +++ b/src/parser/RdfParser.cpp @@ -15,11 +15,10 @@ #include "global/Constants.h" #include "parser/GeoPoint.h" #include "parser/NormalizedString.h" -#include "parser/Tokenizer.h" -#include "parser/TokenizerCtre.h" +#include "parser/RdfEscaping.h" +#include "util/Conversions.h" #include "util/DateYearDuration.h" #include "util/OnDestructionDontThrowDuringStackUnwinding.h" -#include "util/TransparentFunctors.h" using namespace std::chrono_literals; // _______________________________________________________________ @@ -32,17 +31,7 @@ bool TurtleParser::statement() { // ______________________________________________________________ template bool TurtleParser::directive() { - bool successfulParse = prefixID() || base() || sparqlPrefix() || sparqlBase(); - if (successfulParse && prefixAndBaseDisabled_) { - raise( - "@prefix or @base directives need to be at the beginning of the file " - "when using the parallel parser. Use '--parse-parallel false' if you " - "can't guarantee this. If the reason for this error is that the input " - "is a concatenation of Turtle files, each of which has the prefixes at " - "the beginning, you should feed the files to QLever separately instead " - "of concatenated"); - } - return successfulParse; + return prefixID() || base() || sparqlPrefix() || sparqlBase(); } // ________________________________________________________________ @@ -641,7 +630,7 @@ bool TurtleParser::iri() { // _____________________________________________________________________ template bool TurtleParser::prefixedName() { - if constexpr (T::UseRelaxedParsing) { + if constexpr (UseRelaxedParsing) { if (!(pnameLnRelaxed() || pnameNS())) { return false; } @@ -756,7 +745,7 @@ bool TurtleParser::iriref() { // In relaxed mode, that is all we check. Otherwise, we check if the IRI is // standard-compliant. If not, we output a warning and try to parse it in a // more relaxed way. - if constexpr (T::UseRelaxedParsing) { + if constexpr (UseRelaxedParsing) { tok_.remove_prefix(endPos + 1); lastParseResult_ = TripleComponent::Iri::fromIrirefConsiderBase( view.substr(0, endPos + 1), baseForRelativeIri(), baseForAbsoluteIri()); @@ -959,20 +948,20 @@ bool RdfStreamParser::getLineImpl(TurtleTriple* triple) { // `parallelParser_` have been fully processed. After the last batch we will // push another call to this lambda to the `parallelParser_` which will then // finish the `tripleCollector_` as soon as all batches have been computed. -template -void RdfParallelParser::finishTripleCollectorIfLastBatch() { +template +void RdfParallelParser::finishTripleCollectorIfLastBatch() { if (batchIdx_.fetch_add(1) == numBatchesTotal_) { tripleCollector_.finish(); } } // __________________________________________________________________________________ -template -void RdfParallelParser::parseBatch(size_t parsePosition, auto batch) { +template +void RdfParallelParser::parseBatch(size_t parsePosition, + auto batch) { try { - RdfStringParser parser{defaultGraphIri_}; + RdfStringParser parser{defaultGraphIri_}; parser.prefixMap_ = this->prefixMap_; - parser.disablePrefixParsing(); parser.setPositionOffset(parsePosition); parser.setInputStream(std::move(batch)); // TODO: raise error message if a prefix parsing fails; @@ -983,15 +972,14 @@ void RdfParallelParser::parseBatch(size_t parsePosition, auto batch) { }); finishTripleCollectorIfLastBatch(); } catch (std::exception& e) { - errorMessages_.wlock()->emplace_back(parsePosition, e.what()); tripleCollector_.pushException(std::current_exception()); parallelParser_.finish(); } }; // _______________________________________________________________________ -template -void RdfParallelParser::feedBatchesToParser( +template +void RdfParallelParser::feedBatchesToParser( auto remainingBatchFromInitialization) { bool first = true; size_t parsePosition = 0; @@ -1031,15 +1019,14 @@ void RdfParallelParser::feedBatchesToParser( } } } catch (std::exception& e) { - errorMessages_.wlock()->emplace_back(parsePosition, e.what()); tripleCollector_.pushException(std::current_exception()); } }; // _______________________________________________________________________ -template -void RdfParallelParser::initialize(const string& filename, - ad_utility::MemorySize bufferSize) { +template +void RdfParallelParser::initialize( + const string& filename, ad_utility::MemorySize bufferSize) { fileBuffer_ = std::make_unique( bufferSize.getBytes(), "\\.[\\t ]*([\\r\\n]+)"); ParallelBuffer::BufferType remainingBatchFromInitialization; @@ -1048,7 +1035,7 @@ void RdfParallelParser::initialize(const string& filename, LOG(WARN) << "Empty input to the TURTLE parser, is this what you intended?" << std::endl; } else { - RdfStringParser declarationParser{}; + RdfStringParser declarationParser{}; declarationParser.setInputStream(std::move(batch.value())); while (declarationParser.parseDirectiveManually()) { } @@ -1075,20 +1062,7 @@ bool RdfParallelParser::getLineImpl(TurtleTriple* triple) { // contains no triples. (Theoretically this might happen, and it is safer this // way) while (triples_.empty()) { - auto optionalTripleTask = [&]() { - try { - return tripleCollector_.pop(); - } catch (const std::exception&) { - // In case of multiple errors in parallel batches, we always report the - // first error. - parallelParser_.waitUntilFinished(); - auto errors = std::move(*errorMessages_.wlock()); - const auto& firstError = - ql::ranges::min_element(errors, {}, ad_utility::first); - AD_CORRECTNESS_CHECK(firstError != errors.end()); - throw std::runtime_error{firstError->second}; - } - }(); + auto optionalTripleTask = tripleCollector_.pop(); if (!optionalTripleTask) { // Everything has been parsed return false; diff --git a/src/parser/RdfParser.h b/src/parser/RdfParser.h index d8aced692e..1fb36871c9 100644 --- a/src/parser/RdfParser.h +++ b/src/parser/RdfParser.h @@ -4,29 +4,35 @@ #pragma once -#include #include +#include +#include +#include #include #include -#include #include +#include "absl/strings/str_cat.h" #include "global/Constants.h" #include "global/SpecialIds.h" #include "index/ConstantsIndexBuilding.h" #include "index/InputFileSpecification.h" #include "parser/ParallelBuffer.h" +#include "parser/Tokenizer.h" +#include "parser/TokenizerCtre.h" #include "parser/TripleComponent.h" -#include "parser/TurtleTokenId.h" #include "parser/data/BlankNode.h" #include "util/Exception.h" +#include "util/File.h" #include "util/HashMap.h" #include "util/Log.h" #include "util/ParseException.h" #include "util/TaskQueue.h" #include "util/ThreadSafeQueue.h" +using std::string; + enum class TurtleParserIntegerOverflowBehavior { Error, OverflowingToDouble, @@ -120,6 +126,10 @@ class TurtleParser : public RdfParserBase { public: using ParseException = ::ParseException; + // The CTRE Tokenizer implies relaxed parsing. + static constexpr bool UseRelaxedParsing = + std::is_same_v; + // Get the result of the single rule that was parsed most recently. Used for // testing. const TripleComponent& getLastParseResult() const { return lastParseResult_; } @@ -194,10 +204,10 @@ class TurtleParser : public RdfParserBase { // Getters for the two base prefixes. Without BASE declaration, these will // both return the empty IRI. - const TripleComponent::Iri& baseForRelativeIri() const { + const TripleComponent::Iri& baseForRelativeIri() { return prefixMap_.at(baseForRelativeIriKey_); } - const TripleComponent::Iri& baseForAbsoluteIri() const { + const TripleComponent::Iri& baseForAbsoluteIri() { return prefixMap_.at(baseForAbsoluteIriKey_); } @@ -216,8 +226,6 @@ class TurtleParser : public RdfParserBase { static inline std::atomic numParsers_ = 0; size_t blankNodePrefix_ = numParsers_.fetch_add(1); - bool prefixAndBaseDisabled_ = false; - public: TurtleParser() = default; explicit TurtleParser(TripleComponent defaultGraphIri) @@ -392,7 +400,7 @@ class TurtleParser : public RdfParserBase { } // create a new, unused, unique blank node string - std::string createAnonNode() { + string createAnonNode() { return BlankNode{true, absl::StrCat(blankNodePrefix_, "_", numBlankNodes_++)} .toSparql(); @@ -471,7 +479,9 @@ CPP_template(typename Parser)( return positionOffset_ + tmpToParse_.size() - this->tok_.data().size(); } - void initialize(const std::string&, ad_utility::MemorySize) { + void initialize(const string& filename, ad_utility::MemorySize bufferSize) { + (void)filename; + (void)bufferSize; throw std::runtime_error( "RdfStringParser doesn't support calls to initialize. Only use " "parseUtf8String() for unit tests\n"); @@ -524,7 +534,7 @@ CPP_template(typename Parser)( // testing interface for reusing a parser // only specifies the tokenizers input stream. // Does not alter the tokenizers state - void setInputStream(const std::string& toParse) { + void setInputStream(const string& toParse) { tmpToParse_.clear(); tmpToParse_.reserve(toParse.size()); tmpToParse_.insert(tmpToParse_.end(), toParse.begin(), toParse.end()); @@ -545,9 +555,6 @@ CPP_template(typename Parser)( // as expected size_t getPosition() const { return this->tok_.begin() - tmpToParse_.data(); } - // Disable prefix parsing for turtle parsers during parallel parsing. - void disablePrefixParsing() { this->prefixAndBaseDisabled_ = true; } - FRIEND_TEST(RdfParserTest, prefixedName); FRIEND_TEST(RdfParserTest, prefixID); FRIEND_TEST(RdfParserTest, stringParse); @@ -583,7 +590,7 @@ class RdfStreamParser : public Parser { // Default construction needed for tests RdfStreamParser() = default; explicit RdfStreamParser( - const std::string& filename, + const string& filename, ad_utility::MemorySize bufferSize = DEFAULT_PARSER_BUFFER_SIZE, TripleComponent defaultGraphIri = qlever::specialIds().at(DEFAULT_GRAPH_IRI)) @@ -595,8 +602,7 @@ class RdfStreamParser : public Parser { bool getLineImpl(TurtleTriple* triple) override; - void initialize(const std::string& filename, - ad_utility::MemorySize bufferSize); + void initialize(const string& filename, ad_utility::MemorySize bufferSize); size_t getParsePosition() const override { return numBytesBeforeCurrentBatch_ + (tok_.data().data() - byteVec_.data()); @@ -638,7 +644,7 @@ class RdfStreamParser : public Parser { template class RdfParallelParser : public Parser { public: - using Triple = std::array; + using Triple = std::array; // Default construction needed for tests RdfParallelParser() = default; @@ -646,7 +652,7 @@ class RdfParallelParser : public Parser { // parser will sleep for the specified time before parsing each batch s.t. // certain corner cases can be tested. explicit RdfParallelParser( - const std::string& filename, + const string& filename, ad_utility::MemorySize bufferSize = DEFAULT_PARSER_BUFFER_SIZE, std::chrono::milliseconds sleepTimeForTesting = std::chrono::milliseconds{0}) @@ -659,8 +665,7 @@ class RdfParallelParser : public Parser { } // Construct a parser from a file and a given default graph iri. - RdfParallelParser(const std::string& filename, - ad_utility::MemorySize bufferSize, + RdfParallelParser(const string& filename, ad_utility::MemorySize bufferSize, const TripleComponent& defaultGraphIri) : Parser{defaultGraphIri}, defaultGraphIri_{defaultGraphIri} { initialize(filename, bufferSize); @@ -678,8 +683,7 @@ class RdfParallelParser : public Parser { parallelParser_.resetTimers(); } - void initialize(const std::string& filename, - ad_utility::MemorySize bufferSize); + void initialize(const string& filename, ad_utility::MemorySize bufferSize); size_t getParsePosition() const override { // TODO: can we really define this position here? @@ -716,12 +720,6 @@ class RdfParallelParser : public Parser { QUEUE_SIZE_BEFORE_PARALLEL_PARSING, NUM_PARALLEL_PARSER_THREADS, "parallel parser"}; std::future parseFuture_; - - // Collect error messages in case of multiple failures. The `size_t` is the - // start position of the corresponding batch, used to order the errors in case - // the batches are finished out of order. - ad_utility::Synchronized>> - errorMessages_; // The parallel parsers need to know when the last batch has been parsed, s.t. // the parser threads can be destroyed. The following two members are needed // for keeping track of this condition. @@ -781,8 +779,7 @@ class RdfMultifileParser : public RdfParserBase { // `parsingQueue_` is declared *after* the `finishedBatchQueue_`, s.t. when // destroying the parser, the threads from the `parsingQueue_` are all joined // before the `finishedBatchQueue_` (which they are using!) is destroyed. - ad_utility::TaskQueue parsingQueue_{QUEUE_SIZE_BEFORE_PARALLEL_PARSING, - NUM_PARALLEL_PARSER_THREADS}; + ad_utility::TaskQueue parsingQueue_{10, NUM_PARALLEL_PARSER_THREADS}; // The number of parsers that have started, but not yet finished. This is // needed to detect the complete parsing. diff --git a/src/parser/Tokenizer.h b/src/parser/Tokenizer.h index 6d971ccba8..a8dc50d0ac 100644 --- a/src/parser/Tokenizer.h +++ b/src/parser/Tokenizer.h @@ -7,7 +7,10 @@ #include #include +#include + #include "parser/TurtleTokenId.h" +#include "util/Exception.h" #include "util/Log.h" using re2::RE2; @@ -237,7 +240,7 @@ struct SkipWhitespaceAndCommentsMixin { auto v = self().view(); if (v.starts_with('#')) { auto pos = v.find('\n'); - if (pos == std::string::npos) { + if (pos == string::npos) { // TODO: This should rather yield an error. LOG(INFO) << "Warning, unfinished comment found while parsing" << std::endl; @@ -270,8 +273,6 @@ class Tokenizer : public SkipWhitespaceAndCommentsMixin { Tokenizer(std::string_view input) : _tokens(), _data(input.data(), input.size()) {} - static constexpr bool UseRelaxedParsing = false; - // if a prefix of the input stream matches the regex argument, // return true and that prefix and move the input stream forward // by the length of the match. If no match is found, diff --git a/src/parser/TokenizerCtre.h b/src/parser/TokenizerCtre.h index cd1f81cbe5..28c2e48731 100644 --- a/src/parser/TokenizerCtre.h +++ b/src/parser/TokenizerCtre.h @@ -154,8 +154,6 @@ class TokenizerCtre : public SkipWhitespaceAndCommentsMixin { */ explicit TokenizerCtre(std::string_view data) : _data(data) {} - static constexpr bool UseRelaxedParsing = true; - /// iterator to the next character that we have not yet consumed [[nodiscard]] auto begin() const { return _data.begin(); } diff --git a/src/util/TaskQueue.h b/src/util/TaskQueue.h index caf1147649..c18b0a4e8d 100644 --- a/src/util/TaskQueue.h +++ b/src/util/TaskQueue.h @@ -120,11 +120,6 @@ class TaskQueue { std::to_string(popTime_) + "ms (pop)"; } - // Block the current thread until `finish()` on the queue has been called and - // successfully completed. This function may NOT be called from inside a queue - // thread, otherwise there will be a deadlock. - void waitUntilFinished() const { finishedFinishing_.wait(false); } - ~TaskQueue() { if (startedFinishing_.test_and_set()) { // Someone has already called `finish`, we have to wait for the finishing diff --git a/test/DeltaTriplesTest.cpp b/test/DeltaTriplesTest.cpp index eb906be6d8..3b4564f483 100644 --- a/test/DeltaTriplesTest.cpp +++ b/test/DeltaTriplesTest.cpp @@ -4,18 +4,17 @@ // 2023 Hannah Bast // 2024 Julian Mundhahs -#include #include #include "./DeltaTriplesTestHelpers.h" #include "./util/GTestHelpers.h" #include "./util/IndexTestHelpers.h" +#include "absl/strings/str_split.h" #include "engine/ExportQueryExecutionTrees.h" #include "index/DeltaTriples.h" #include "index/IndexImpl.h" #include "index/Permutation.h" #include "parser/RdfParser.h" -#include "parser/Tokenizer.h" using namespace deltaTriplesTestHelpers; diff --git a/test/IndexTest.cpp b/test/IndexTest.cpp index 3c81f18cf3..68cee86b02 100644 --- a/test/IndexTest.cpp +++ b/test/IndexTest.cpp @@ -14,12 +14,12 @@ #include "./util/IdTableHelpers.h" #include "./util/IdTestHelpers.h" #include "./util/TripleComponentTestHelpers.h" +#include "global/Pattern.h" #include "index/Index.h" #include "index/IndexImpl.h" #include "util/IndexTestHelpers.h" using namespace ad_utility::testing; -using namespace std::string_literals; using ::testing::UnorderedElementsAre; diff --git a/test/RdfParserTest.cpp b/test/RdfParserTest.cpp index 7323deef9d..780952acfe 100644 --- a/test/RdfParserTest.cpp +++ b/test/RdfParserTest.cpp @@ -13,9 +13,8 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" #include "parser/RdfParser.h" -#include "parser/Tokenizer.h" -#include "parser/TokenizerCtre.h" #include "parser/TripleComponent.h" +#include "util/Conversions.h" #include "util/MemorySize/MemorySize.h" using std::string; @@ -1003,48 +1002,6 @@ TEST(RdfParserTest, exceptionPropagationFileBufferReading) { forAllParallelParsers(testWithParser, 40_B, inputWithLongTriple); } -// Test that in parallel parsing scattered prefixes or base declarations lead to -// an exception -TEST(RdfParserTest, exceptionOnScatteredPrefixOrBaseInParallelParser) { - std::string filename{"turtleParserExceptionPropagationFileBufferReading.dat"}; - auto testWithParser = [&](bool useBatchInterface, - ad_utility::MemorySize bufferSize, - std::string_view input) { - { - auto of = ad_utility::makeOfstream(filename); - of << input; - } - AD_EXPECT_THROW_WITH_MESSAGE( - (parseFromFile(filename, useBatchInterface, bufferSize)), - ::testing::HasSubstr("'--parse-parallel false'")); - ad_utility::deleteFile(filename); - }; - std::string inputWithScatteredPrefix = - "@prefix ex: . \n" - " . \n " - " . \n" - "@prefix ex: . \n"; - forAllParallelParsers(testWithParser, 40_B, inputWithScatteredPrefix); - std::string inputWithScatteredSparqlPrefix = - "PREFIX ex: . \n" - " . \n " - " . \n" - "PREFIX ex: . \n"; - forAllParallelParsers(testWithParser, 40_B, inputWithScatteredPrefix); - std::string inputWithScatteredBase = - "@base . \n" - " . \n " - " . \n" - "@base . \n"; - forAllParallelParsers(testWithParser, 40_B, inputWithScatteredPrefix); - std::string inputWithScatteredSparqlBase = - "BASE . \n" - " . \n " - " . \n" - "BASE . \n"; - forAllParallelParsers(testWithParser, 40_B, inputWithScatteredPrefix); -} - // Test that the parallel parser's destructor can be run quickly and without // blocking, even when there are still lots of blocks in the pipeline that are // currently being parsed.