Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explicitly named query #1739

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion benchmark/GroupByHashMapBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
#include <random>

#include "../benchmark/infrastructure/Benchmark.h"
#include "../test/engine/ValuesForTesting.h"
#include "../test/util/IdTableHelpers.h"
#include "../test/util/IndexTestHelpers.h"
#include "engine/GroupBy.h"
#include "engine/Sort.h"
#include "engine/Values.h"
#include "engine/ValuesForTesting.h"
#include "engine/sparqlExpressions/AggregateExpression.h"
#include "engine/sparqlExpressions/GroupConcatExpression.h"
#include "engine/sparqlExpressions/LiteralExpression.h"
Expand Down
2 changes: 1 addition & 1 deletion src/engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ add_library(engine
TextLimit.cpp LazyGroupBy.cpp GroupByHashMapOptimization.cpp SpatialJoin.cpp
CountConnectedSubgraphs.cpp SpatialJoinAlgorithms.cpp PathSearch.cpp ExecuteUpdate.cpp
Describe.cpp GraphStoreProtocol.cpp
QueryExecutionContext.cpp ExistsJoin.cpp)
QueryExecutionContext.cpp ExistsJoin.cpp NamedQueryCache.cpp)
qlever_target_link_libraries(engine util index parser sparqlExpressions http SortPerformanceEstimator Boost::iostreams s2)
8 changes: 5 additions & 3 deletions src/engine/CheckUsePatternTrick.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ bool isVariableContainedInGraphPatternOperation(
} else if constexpr (std::is_same_v<T, p::Service>) {
return ad_utility::contains(arg.visibleVariables_, variable);
} else {
static_assert(
std::is_same_v<T, p::TransPath> || std::is_same_v<T, p::PathQuery> ||
std::is_same_v<T, p::Describe> || std::is_same_v<T, p::SpatialQuery>);
static_assert(std::is_same_v<T, p::TransPath> ||
std::is_same_v<T, p::PathQuery> ||
std::is_same_v<T, p::Describe> ||
std::is_same_v<T, p::SpatialQuery> ||
std::is_same_v<T, p::NamedCachedQuery>);
// The `TransPath` is set up later in the query planning, when this
// function should not be called anymore.
AD_FAIL();
Expand Down
2 changes: 1 addition & 1 deletion src/engine/Describe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

#include "engine/Describe.h"

#include "../../test/engine/ValuesForTesting.h"
#include "engine/IndexScan.h"
#include "engine/Join.h"
#include "engine/ValuesForTesting.h"

// _____________________________________________________________________________
Describe::Describe(QueryExecutionContext* qec,
Expand Down
40 changes: 40 additions & 0 deletions src/engine/NamedQueryCache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2025, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author: Johannes Kalmbach <[email protected]>

#include "engine/NamedQueryCache.h"

// _____________________________________________________________________________
std::shared_ptr<ValuesForTesting> NamedQueryCache ::getOperation(
const Key& key, QueryExecutionContext* ctx) {
const auto& ptr = get(key);
const auto& [table, map, sortedOn] = *ptr;
// TODO<joka921> Add a local vocab, and consider also passing a shared_ptr for
// the local vocab.
return std::make_shared<ValuesForTesting>(ctx, table, map, sortedOn);
}

// _____________________________________________________________________________
auto NamedQueryCache::get(const Key& key) -> std::shared_ptr<const Value> {
  // The existence check and the lookup are performed under the same lock.
  // NOTE(review): a write lock is taken although this is a lookup; presumably
  // `operator[]` of the underlying LRU cache is non-const because it updates
  // the usage order -- confirm against `util/Cache.h`.
  auto lockedCache = cache_.wlock();
  const bool isStored = lockedCache->contains(key);
  if (isStored) {
    return (*lockedCache)[key];
  }
  // Unknown names are reported to the caller via an exception.
  throw std::runtime_error{
      absl::StrCat("The named query with the name \"", key,
                   "\" was not pinned to the named query cache")};
}

// _____________________________________________________________________________
void NamedQueryCache::store(const Key& key, Value value) {
  // Store `value` under `key`, replacing any value that was previously stored
  // under the same `key`.
  //
  // The underlying cache's `insert` does not overwrite: it rejects keys that
  // are already present. To get the documented overwrite semantics we first
  // `erase` the key (which is a no-op if the key is absent) and then insert.
  // Both steps happen under a single write lock, so other threads never
  // observe the intermediate state where the key is missing.
  auto lockedCache = cache_.wlock();
  lockedCache->erase(key);
  lockedCache->insert(key, std::move(value));
}

// _____________________________________________________________________________
void NamedQueryCache::clear() {
  // Remove all entries while holding the write lock.
  auto lockedCache = cache_.wlock();
  lockedCache->clearAll();
}

// _____________________________________________________________________________
size_t NamedQueryCache::numEntries() const {
  // Only the non-pinned entries of the underlying cache are counted. `store`
  // adds its entries via the cache's plain `insert`.
  auto lockedCache = cache_.rlock();
  return lockedCache->numNonPinnedEntries();
}
55 changes: 55 additions & 0 deletions src/engine/NamedQueryCache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2025, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author: Johannes Kalmbach <[email protected]>
#pragma once

#include "engine/ValuesForTesting.h"
#include "util/Cache.h"
#include "util/Synchronized.h"

// A cache that maps explicit, user-chosen names to query results. All accesses
// to the underlying LRU cache go through an `ad_utility::Synchronized` wrapper.
class NamedQueryCache {
 public:
  // What is stored per name: the result table together with the metadata
  // (variable-to-column mapping and sort order) that is needed to turn it back
  // into an operation inside a `QueryExecutionTree`.
  // TODO<joka921> We definitely need the local vocab here...
  struct Value {
    std::shared_ptr<const IdTable> result_;
    VariableToColumnMap varToColMap_;
    std::vector<ColumnIndex> resultSortedOn_;
  };

  // Size function for the underlying cache. Currently every entry is
  // accounted for with a size of one byte, independent of its actual size.
  // TODO<joka921> Use a better size getter for better statistics.
  struct ValueSizeGetter {
    ad_utility::MemorySize operator()(const Value&) {
      return ad_utility::MemorySize::bytes(1);
    }
  };

  // The name of a query is a plain string.
  using Key = std::string;
  using Cache = ad_utility::LRUCache<Key, Value, ValueSizeGetter>;

  // Associate `value` with `key`. The intended semantics are that a `value`
  // that was previously stored under the same `key` is overwritten.
  void store(const Key& key, Value value);

  // Remove all entries from the cache.
  void clear();

  // Return the number of queries that are currently stored.
  size_t numEntries() const;

  // Return the value stored under `key`. Throws an exception if there is no
  // such `key`.
  std::shared_ptr<const Value> get(const Key& key);

  // Like `get`, but wrap the stored result into a `ValuesForTesting` operation
  // (bound to `ctx`) that can directly be used inside a `QueryExecutionTree`.
  std::shared_ptr<ValuesForTesting> getOperation(const Key& key,
                                                 QueryExecutionContext* ctx);

 private:
  // The actual storage, guarded by a lock.
  ad_utility::Synchronized<Cache> cache_;
};
24 changes: 22 additions & 2 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

#include "engine/Operation.h"

#include <absl/cleanup/cleanup.h>

#include "engine/NamedQueryCache.h"
#include "engine/QueryExecutionTree.h"
#include "global/RuntimeParameters.h"
#include "util/OnDestructionDontThrowDuringStackUnwinding.h"
Expand Down Expand Up @@ -293,6 +292,12 @@ std::shared_ptr<const Result> Operation::getResult(
_executionContext->_pinResult && isRoot;
const bool pinResult =
_executionContext->_pinSubtrees || pinFinalResultButNotSubtrees;
const bool pinWithName =
_executionContext->pinWithExplicitName().has_value() && isRoot;

if (pinWithName) {
computationMode = ComputationMode::FULLY_MATERIALIZED;
}

try {
// In case of an exception, create the correct runtime info, no matter which
Expand Down Expand Up @@ -339,6 +344,21 @@ std::shared_ptr<const Result> Operation::getResult(
updateRuntimeInformationOnSuccess(result, timer.msecs());
}

if (pinWithName) {
const auto& name = _executionContext->pinWithExplicitName().value();
// The query is to be pinned in the named cache. In this case we don't
// return the result, but only pin it.
const auto& actualResult = result._resultPointer->resultTable();
AD_CORRECTNESS_CHECK(actualResult.isFullyMaterialized());
// TODO<joka921> probably we don't need to `clone()` the IdTable here.
auto t = NamedQueryCache::Value(
std::make_shared<const IdTable>(actualResult.idTable().clone()),
getExternallyVisibleVariableColumns(), actualResult.sortedBy());
_executionContext->namedQueryCache().store(name, std::move(t));

runtimeInfo().addDetail("pinned-with-explicit-name", name);
}

return result._resultPointer->resultTablePtr();
} catch (ad_utility::CancellationException& e) {
e.setOperation(getDescriptor());
Expand Down
19 changes: 18 additions & 1 deletion src/engine/QueryExecutionContext.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2025, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author: Robin Textor-Falconi <[email protected]>
// Authors: Robin Textor-Falconi <[email protected]>
// Johannes Kalmbach <[email protected]>

#include "engine/QueryExecutionContext.h"

Expand All @@ -9,3 +10,19 @@
bool QueryExecutionContext::areWebSocketUpdatesEnabled() {
return RuntimeParameters().get<"websocket-updates-enabled">();
}
// _____________________________________________________________________________
// Construct an execution context from the index, the cache for query
// (sub)results, the memory-limited allocator, the sort performance estimator,
// and the cache for explicitly named queries. `updateCallback` is moved into
// the context; `pinSubtrees` and `pinResult` initialize the respective
// pinning flags.
// NOTE(review): `cache` and `namedCache` are stored as raw pointers and
// `index` as a reference, so presumably all three have to outlive this
// context -- confirm with the callers.
QueryExecutionContext::QueryExecutionContext(
const Index& index, QueryResultCache* const cache,
ad_utility::AllocatorWithLimit<Id> allocator,
SortPerformanceEstimator sortPerformanceEstimator,
NamedQueryCache* namedCache,
std::function<void(std::string)> updateCallback, const bool pinSubtrees,
const bool pinResult)
: _pinSubtrees(pinSubtrees),
_pinResult(pinResult),
_index(index),
_subtreeCache(cache),
_allocator(std::move(allocator)),
_sortPerformanceEstimator(sortPerformanceEstimator),
updateCallback_(std::move(updateCallback)),
// May be `nullptr`; the accessor `namedQueryCache()` checks for this before
// dereferencing.
namedQueryCache_{namedCache} {}
31 changes: 20 additions & 11 deletions src/engine/QueryExecutionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class CacheValue {
};
};

// Forward declaration because of cyclic dependencies
class NamedQueryCache;

// The key for the `QueryResultCache` below. It consists of a `string` (the
// actual cache key of a `QueryExecutionTree` and the index of the
// `LocatedTriplesSnapshot` that was used to create the corresponding value.
Expand All @@ -86,6 +89,9 @@ struct QueryCacheKey {
using QueryResultCache = ad_utility::ConcurrentCache<
ad_utility::LRUCache<QueryCacheKey, CacheValue, CacheValue::SizeGetter>>;

// Forward declaration because of cyclic dependency
class NamedQueryCache;

// Execution context for queries.
// Holds references to index and engine, implements caching.
class QueryExecutionContext {
Expand All @@ -94,17 +100,10 @@ class QueryExecutionContext {
const Index& index, QueryResultCache* const cache,
ad_utility::AllocatorWithLimit<Id> allocator,
SortPerformanceEstimator sortPerformanceEstimator,
NamedQueryCache* namedCache,
std::function<void(std::string)> updateCallback =
[](std::string) { /* No-op by default for testing */ },
const bool pinSubtrees = false, const bool pinResult = false)
: _pinSubtrees(pinSubtrees),
_pinResult(pinResult),
_index(index),
_subtreeCache(cache),
_allocator(std::move(allocator)),
_costFactors(),
_sortPerformanceEstimator(sortPerformanceEstimator),
updateCallback_(std::move(updateCallback)) {}
bool pinSubtrees = false, bool pinResult = false);

QueryResultCache& getQueryTreeCache() { return *_subtreeCache; }

Expand Down Expand Up @@ -148,10 +147,16 @@ class QueryExecutionContext {
return areWebsocketUpdatesEnabled_;
}

private:
static bool areWebSocketUpdatesEnabled();
NamedQueryCache& namedQueryCache() {
AD_CORRECTNESS_CHECK(namedQueryCache_ != nullptr);
return *namedQueryCache_;
}

auto& pinWithExplicitName() { return pinWithExplicitName_; }
const auto& pinWithExplicitName() const { return pinWithExplicitName_; }

private:
static bool areWebSocketUpdatesEnabled();
const Index& _index;

// When the `QueryExecutionContext` is constructed, get a stable read-only
Expand All @@ -169,4 +174,8 @@ class QueryExecutionContext {
// Cache the state of that runtime parameter to reduce the contention of the
// mutex.
bool areWebsocketUpdatesEnabled_ = areWebSocketUpdatesEnabled();

NamedQueryCache* namedQueryCache_ = nullptr;

std::optional<std::string> pinWithExplicitName_ = std::nullopt;
};
12 changes: 12 additions & 0 deletions src/engine/QueryPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <type_traits>
#include <variant>

#include "NamedQueryCache.h"
#include "backports/algorithm.h"
#include "engine/Bind.h"
#include "engine/CartesianProductJoin.h"
Expand Down Expand Up @@ -2436,6 +2437,8 @@ void QueryPlanner::GraphPatternPlanner::graphPatternOperationVisitor(Arg& arg) {
visitDescribe(arg);
} else if constexpr (std::is_same_v<T, p::SpatialQuery>) {
visitSpatialSearch(arg);
} else if constexpr (std::is_same_v<T, p::NamedCachedQuery>) {
visitNamedCachedQuery(arg);
} else {
static_assert(std::is_same_v<T, p::BasicGraphPattern>);
visitBasicGraphPattern(arg);
Expand Down Expand Up @@ -2609,6 +2612,15 @@ void QueryPlanner::GraphPatternPlanner::visitSpatialSearch(
visitGroupOptionalOrMinus(std::move(candidatesOut));
}

// _____________________________________________________________________________
void QueryPlanner::GraphPatternPlanner::visitNamedCachedQuery(
    parsedQuery::NamedCachedQuery& arg) {
  auto* qec = planner_._qec;
  // Look up the result that was stored under the requested name and wrap it
  // into an operation. The lookup throws if the name is unknown to the cache.
  auto cachedOperation =
      qec->namedQueryCache().getOperation(arg.validateAndGetIdentifier(), qec);
  // The cached result is the only candidate plan for this graph pattern
  // operation.
  std::vector<SubtreePlan> candidates;
  candidates.push_back(SubtreePlan{qec, std::move(cachedOperation)});
  visitGroupOptionalOrMinus(std::move(candidates));
}

// _______________________________________________________________
void QueryPlanner::GraphPatternPlanner::visitUnion(parsedQuery::Union& arg) {
// TODO<joka921> here we could keep all the candidates, and create a
Expand Down
1 change: 1 addition & 0 deletions src/engine/QueryPlanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,7 @@ class QueryPlanner {
void visitTransitivePath(parsedQuery::TransPath& transitivePath);
void visitPathSearch(parsedQuery::PathQuery& config);
void visitSpatialSearch(parsedQuery::SpatialQuery& config);
void visitNamedCachedQuery(parsedQuery::NamedCachedQuery& config);
void visitUnion(parsedQuery::Union& un);
void visitSubquery(parsedQuery::Subquery& subquery);
void visitDescribe(parsedQuery::Describe& describe);
Expand Down
38 changes: 32 additions & 6 deletions src/engine/Result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ Result::Result(IdTable idTable, std::vector<ColumnIndex> sortedBy,
assertSortOrderIsRespected(this->idTable(), sortedBy_);
}

// _____________________________________________________________________________
// Construct a fully materialized `Result` from an `IdTable` that is held via
// a `shared_ptr<const IdTable>`. The `localVocab` is moved into a newly
// created `shared_ptr<const LocalVocab>`.
// `idTablePtr` must not be null, because the sort order check below accesses
// the table via `idTable()`, which dereferences the stored pointer.
Result::Result(std::shared_ptr<const IdTable> idTablePtr,
std::vector<ColumnIndex> sortedBy, LocalVocab&& localVocab)
: data_{IdTableSharedLocalVocabPair{
std::move(idTablePtr),
std::make_shared<const LocalVocab>(std::move(localVocab))}},
sortedBy_{std::move(sortedBy)} {
// The local vocab pointer was just created via `std::make_shared` above;
// this check documents the invariant that it is never null.
AD_CONTRACT_CHECK(std::get<IdTableSharedLocalVocabPair>(data_).localVocab_ !=
nullptr);
// Verify that the table is indeed sorted by the columns in `sortedBy_`.
assertSortOrderIsRespected(this->idTable(), sortedBy_);
}

// _____________________________________________________________________________
Result::Result(IdTable idTable, std::vector<ColumnIndex> sortedBy,
LocalVocab&& localVocab)
Expand Down Expand Up @@ -124,8 +136,13 @@ void Result::applyLimitOffset(
}
if (isFullyMaterialized()) {
ad_utility::timer::Timer limitTimer{ad_utility::timer::Timer::Started};
resizeIdTable(std::get<IdTableSharedLocalVocabPair>(data_).idTable_,
limitOffset);

auto& tableOrPtr = std::get<IdTableSharedLocalVocabPair>(data_).idTable_;
if (auto sharedTable =
std::get_if<std::shared_ptr<const IdTable>>(&tableOrPtr)) {
tableOrPtr = (**sharedTable).clone();
}
resizeIdTable(std::get<IdTable>(tableOrPtr), limitOffset);
limitTimeCallback(limitTimer.msecs(), idTable());
} else {
auto generator = [](LazyResult original, LimitOffsetClause limitOffset,
Expand Down Expand Up @@ -181,7 +198,7 @@ void Result::assertThatLimitWasRespected(const LimitOffsetClause& limitOffset) {

// _____________________________________________________________________________
void Result::checkDefinedness(const VariableToColumnMap& varColMap) {
auto performCheck = [](const auto& map, IdTable& idTable) {
auto performCheck = [](const auto& map, const IdTable& idTable) {
return ql::ranges::all_of(map, [&](const auto& varAndCol) {
const auto& [columnIndex, mightContainUndef] = varAndCol.second;
if (mightContainUndef == ColumnIndexAndTypeInfo::AlwaysDefined) {
Expand All @@ -193,8 +210,7 @@ void Result::checkDefinedness(const VariableToColumnMap& varColMap) {
});
};
if (isFullyMaterialized()) {
AD_EXPENSIVE_CHECK(performCheck(
varColMap, std::get<IdTableSharedLocalVocabPair>(data_).idTable_));
AD_EXPENSIVE_CHECK(performCheck(varColMap, idTable()));
} else {
auto generator = [](LazyResult original,
[[maybe_unused]] VariableToColumnMap varColMap,
Expand Down Expand Up @@ -254,7 +270,17 @@ void Result::assertSortOrderIsRespected(
// _____________________________________________________________________________
// Return the `IdTable` of a fully materialized result. The table is stored
// either directly by value or via a `shared_ptr<const IdTable>`; both cases
// are handled transparently. Must only be called if the result is fully
// materialized.
const IdTable& Result::idTable() const {
  AD_CONTRACT_CHECK(isFullyMaterialized());
  // `idTable_` is a variant, so it cannot be returned directly as a
  // `const IdTable&`; dispatch on the alternative that is currently active.
  auto visitor = []<typename T>(const T& arg) -> const IdTable& {
    if constexpr (std::is_same_v<T, IdTable>) {
      // The table is stored by value.
      return arg;
    } else {
      // The table is shared; the pointer must never be null.
      static_assert(std::is_same_v<T, std::shared_ptr<const IdTable>>);
      AD_CORRECTNESS_CHECK(arg != nullptr);
      return *arg;
    }
  };
  return std::visit(visitor,
                    std::get<IdTableSharedLocalVocabPair>(data_).idTable_);
}

// _____________________________________________________________________________
Expand Down
Loading
Loading