Skip to content

Commit

Permalink
Only call DqOptimizeEquiJoinWithCosts when DQ connections are present (
Browse files Browse the repository at this point in the history
  • Loading branch information
alephonea authored Jan 14, 2025
1 parent 7e240c6 commit f4f16ff
Showing 1 changed file with 45 additions and 39 deletions.
84 changes: 45 additions & 39 deletions ydb/library/yql/providers/dq/opt/logical_optimize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,53 +258,48 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase {
}

TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) {
if (TypesCtx.CostBasedOptimizer != ECostBasedOptimizerType::Disable) {
std::function<void(const TString&)> log = [&](auto str) {
YQL_CLOG(INFO, ProviderDq) << str;
};

auto factory = MakeCBOOptimizerFactory();
std::shared_ptr<IOptimizerNew> opt;
TDqCBOProviderContext pctx(TypesCtx, Config);

switch (TypesCtx.CostBasedOptimizer) {
case ECostBasedOptimizerType::Native:
opt = factory->MakeJoinCostBasedOptimizerNative(pctx, ctx, {.MaxDPhypDPTableSize = 100000});
break;
case ECostBasedOptimizerType::PG:
opt = factory->MakeJoinCostBasedOptimizerPG(pctx, ctx, {.Logger = log});
break;
default:
YQL_ENSURE(false, "Unknown CBO type");
break;
}
std::function<void(TVector<std::shared_ptr<TRelOptimizerNode>>&, TStringBuf, const TExprNode::TPtr, const std::shared_ptr<TOptimizerStatistics>&)> providerCollect = [](auto& rels, auto label, auto node, auto stats) {
Y_UNUSED(node);
auto rel = std::make_shared<TRelOptimizerNode>(TString(label), *stats);
rels.push_back(rel);
};

return DqOptimizeEquiJoinWithCosts(node, ctx, TypesCtx, 2, *opt, providerCollect);
} else {
auto equiJoin = node.Cast<TCoEquiJoin>();
if (!HasDqConnectionsInEquiJoin(equiJoin)) {
return node;
}
if (TypesCtx.CostBasedOptimizer == ECostBasedOptimizerType::Disable) {
return node;
}

std::function<void(const TString&)> log = [&](auto str) {
YQL_CLOG(INFO, ProviderDq) << str;
};

auto factory = MakeCBOOptimizerFactory();
std::shared_ptr<IOptimizerNew> opt;
TDqCBOProviderContext pctx(TypesCtx, Config);

switch (TypesCtx.CostBasedOptimizer) {
case ECostBasedOptimizerType::Native:
opt = factory->MakeJoinCostBasedOptimizerNative(pctx, ctx, {.MaxDPhypDPTableSize = 100000});
break;
case ECostBasedOptimizerType::PG:
opt = factory->MakeJoinCostBasedOptimizerPG(pctx, ctx, {.Logger = log});
break;
case NYql::ECostBasedOptimizerType::Disable:
break;
}
std::function<void(TVector<std::shared_ptr<TRelOptimizerNode>>&, TStringBuf, const TExprNode::TPtr, const std::shared_ptr<TOptimizerStatistics>&)> providerCollect = [](auto& rels, auto label, auto node, auto stats) {
Y_UNUSED(node);
auto rel = std::make_shared<TRelOptimizerNode>(TString(label), *stats);
rels.push_back(rel);
};

return DqOptimizeEquiJoinWithCosts(node, ctx, TypesCtx, 2, *opt, providerCollect);
}

TMaybeNode<TExprBase> RewriteEquiJoin(TExprBase node, TExprContext& ctx) {
auto equiJoin = node.Cast<TCoEquiJoin>();
bool hasDqConnections = false;
for (size_t i = 0; i + 2 < equiJoin.ArgCount(); ++i) {
auto list = equiJoin.Arg(i).Cast<TCoEquiJoinInput>().List();
if (auto maybeExtractMembers = list.Maybe<TCoExtractMembers>()) {
list = maybeExtractMembers.Cast().Input();
}
if (auto maybeFlatMap = list.Maybe<TCoFlatMapBase>()) {
list = maybeFlatMap.Cast().Input();
}
hasDqConnections |= !!list.Maybe<TDqConnection>();
if (!HasDqConnectionsInEquiJoin(equiJoin)) {
return node;
}

return hasDqConnections ? DqRewriteEquiJoin(node, Config->HashJoinMode.Get().GetOrElse(EHashJoinMode::Off), false, ctx, TypesCtx) : node;
return DqRewriteEquiJoin(node, Config->HashJoinMode.Get().GetOrElse(EHashJoinMode::Off), false, ctx, TypesCtx);
}

TMaybeNode<TExprBase> ExpandWindowFunctions(TExprBase node, TExprContext& ctx) {
Expand Down Expand Up @@ -349,6 +344,17 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase {

private:

bool HasDqConnectionsInEquiJoin(const TCoEquiJoin& equiJoin) {
for (size_t i = 0; i + 2 < equiJoin.ArgCount(); ++i) {
const auto& list = SkipCallables(equiJoin.Arg(i).Cast<TCoEquiJoinInput>().List().Ref(),
{"ExtractMembers", "FlatMap", "OrderedFlatMap"});
if (TDqConnection::Match(&list)) {
return true;
}
}
return false;
}

void EnsureNotDistinct(const TCoAggregate& aggregate) {
const auto& aggregateHandlers = aggregate.Handlers();

Expand Down

0 comments on commit f4f16ff

Please sign in to comment.