Skip to content

Commit

Permalink
YDB-2568 Enable match_recognize in ydb (ydb-platform#6807)
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Aug 6, 2024
1 parent 9c9f411 commit 18b7d76
Show file tree
Hide file tree
Showing 29 changed files with 299 additions and 67 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
Config->FeatureFlags = AppData(ctx)->FeatureFlags;

KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver,
FederatedQuerySetup, UserToken, GUCSettings, ApplicationName, AppData(ctx)->FunctionRegistry,
FederatedQuerySetup, UserToken, GUCSettings, QueryServiceConfig, ApplicationName, AppData(ctx)->FunctionRegistry,
false, false, std::move(TempTablesState), nullptr, SplitCtx);

IKqpHost::TPrepareSettings prepareSettings;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <ydb/library/yql/core/services/mounts/yql_mounts.h>

#include <library/cpp/protobuf/util/pb_io.h>
#include <ydb/core/protos/config.pb.h>

namespace NKikimr {
namespace NKqp {
Expand All @@ -28,7 +29,7 @@ NKqpProto::TKqpPhyTx BuildTxPlan(const TString& sql, TIntrusivePtr<IKqpGateway>
IModuleResolver::TPtr moduleResolver;
UNIT_ASSERT(GetYqlDefaultModuleResolver(moduleCtx, moduleResolver));

auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make(), nullptr, nullptr, Nothing(), nullptr, nullptr, false, false, nullptr, actorSystem);
auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make(), nullptr, nullptr, NKikimrConfig::TQueryServiceConfig(), Nothing(), nullptr, nullptr, false, false, nullptr, actorSystem, nullptr);
auto result = qp->SyncPrepareDataQuery(sql, IKqpHost::TPrepareSettings());
result.Issues().PrintTo(Cerr);
UNIT_ASSERT(result.Success());
Expand Down
13 changes: 10 additions & 3 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1033,7 +1033,7 @@ class TKqpHost : public IKqpHost {
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall,
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, NActors::TActorSystem* actorSystem = nullptr,
NYql::TExprContext* ctx = nullptr)
NYql::TExprContext* ctx = nullptr, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig())
: Gateway(gateway)
, Cluster(cluster)
, GUCSettings(gUCSettings)
Expand All @@ -1051,6 +1051,7 @@ class TKqpHost : public IKqpHost {
, FakeWorld(ctx ? nullptr : ExprCtx->NewWorld(TPosition()))
, ExecuteCtx(MakeIntrusive<TExecuteContext>())
, ActorSystem(actorSystem ? actorSystem : NActors::TActivationContext::ActorSystem())
, QueryServiceConfig(queryServiceConfig)
{
if (funcRegistry) {
FuncRegistry = funcRegistry;
Expand Down Expand Up @@ -1825,10 +1826,15 @@ class TKqpHost : public IKqpHost {
|| settingName == "FilterPushdownOverJoinOptionalSide"
|| settingName == "DisableFilterPushdownOverJoinOptionalSide"
|| settingName == "RotateJoinTree"
|| settingName == "TimeOrderRecoverDelay"
|| settingName == "TimeOrderRecoverAhead"
|| settingName == "TimeOrderRecoverRowLimit"
|| settingName == "MatchRecognizeStream"
;
};
auto configProvider = CreateConfigProvider(*TypesCtx, gatewaysConfig, {}, allowSettings);
TypesCtx->AddDataSource(ConfigProviderName, configProvider);
TypesCtx->MatchRecognize = QueryServiceConfig.GetEnableMatchRecognize();

YQL_ENSURE(TypesCtx->Initialize(*ExprCtx));

Expand Down Expand Up @@ -1930,6 +1936,7 @@ class TKqpHost : public IKqpHost {

TKqpTempTablesState::TConstPtr TempTablesState;
NActors::TActorSystem* ActorSystem = nullptr;
NKikimrConfig::TQueryServiceConfig QueryServiceConfig;
};

} // namespace
Expand All @@ -1950,11 +1957,11 @@ Ydb::Table::QueryStatsCollection::Mode GetStatsMode(NYql::EKikimrStatsMode stats
TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TGUCSettings::TPtr& gUCSettings,
const TMaybe<TString>& applicationName, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TMaybe<TString>& applicationName, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges,
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, NYql::TExprContext* ctx)
{
return MakeIntrusive<TKqpHost>(gateway, cluster, database, gUCSettings, applicationName, config, moduleResolver, federatedQuerySetup, userToken, funcRegistry,
keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, ctx);
keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, ctx, queryServiceConfig);
}

} // namespace NKqp
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/host/kqp_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class IKqpHost : public TThrRefBase {
TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, const TString& database, NYql::TKikimrConfiguration::TPtr config, NYql::IModuleResolver::TPtr moduleResolver,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TGUCSettings::TPtr& gUCSettings,
const TMaybe<TString>& applicationName = Nothing(), const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TMaybe<TString>& applicationName = Nothing(), const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/,
NYql::TExprContext* ctx = nullptr);
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h>

#include <ydb/library/yql/core/yql_opt_match_recognize.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/dq/opt/dq_opt_join.h>
#include <ydb/library/yql/dq/opt/dq_opt_log.h>
Expand Down Expand Up @@ -60,6 +61,7 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
AddHandler(0, &TCoNarrowFlatMap::Match, HNDL(DqReadWideWrapFieldSubset));
AddHandler(0, &TCoNarrowMultiMap::Match, HNDL(DqReadWideWrapFieldSubset));
AddHandler(0, &TCoWideMap::Match, HNDL(DqReadWideWrapFieldSubset));
AddHandler(0, &TCoMatchRecognize::Match, HNDL(MatchRecognize));

AddHandler(1, &TCoTop::Match, HNDL(RewriteTopSortOverIndexRead));
AddHandler(1, &TCoTopSort::Match, HNDL(RewriteTopSortOverIndexRead));
Expand Down Expand Up @@ -288,6 +290,14 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

TMaybeNode<TExprBase> MatchRecognize(TExprBase node, TExprContext& ctx) {
auto output = ExpandMatchRecognize(node.Ptr(), ctx, TypesCtx);
if (output) {
DumpAppliedRule("MatchRecognize", node.Ptr(), output, ctx);
}
return output;
}

TMaybeNode<TExprBase> DqReadWrapByProvider(TExprBase node, TExprContext& ctx) {
auto output = NDq::DqReadWrapByProvider(node, ctx, TypesCtx);
if (output) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_worker_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
Config->FeatureFlags = AppData(ctx)->FeatureFlags;

KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, FederatedQuerySetup,
QueryState->RequestEv->GetUserToken(), GUCSettings, Settings.ApplicationName, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false);
QueryState->RequestEv->GetUserToken(), GUCSettings, QueryServiceConfig, Settings.ApplicationName, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false, nullptr, nullptr, nullptr);

auto& queryRequest = QueryState->RequestEv;
QueryState->ProxyRequestId = proxyRequestId;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> ga

auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}, nullptr, nullptr, {}});
return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver,
federatedQuerySetup, nullptr, nullptr, {}, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem);
federatedQuerySetup, nullptr, nullptr, NKikimrConfig::TQueryServiceConfig(), {}, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem, nullptr);
}

NYql::NNodes::TExprBase GetExpr(const TString& ast, NYql::TExprContext& ctx, NYql::IModuleResolver* moduleResolver) {
Expand Down
87 changes: 87 additions & 0 deletions ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,93 @@ Y_UNIT_TEST_SUITE(KqpPragma) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_C(result.GetIssues().Empty(), result.GetIssues().ToString());
}

Y_UNIT_TEST(MatchRecognizeWithTimeOrderRecoverer) {
TKikimrSettings settings;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableQueryServiceConfig()->SetEnableMatchRecognize(true);
settings.SetAppConfig(appConfig);

TKikimrRunner kikimr(settings);
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());

auto result = client.ExecuteYqlScript(R"(
PRAGMA FeatureR010="prototype";

CREATE TABLE `/Root/NewTable` (
dt Uint64,
value String,
PRIMARY KEY (dt)
);
COMMIT;

INSERT INTO `/Root/NewTable` (dt, value) VALUES
(1, 'value1'), (2, 'value2'), (3, 'value3'), (4, 'value4');
COMMIT;

SELECT * FROM (SELECT dt, value FROM `/Root/NewTable`)
MATCH_RECOGNIZE(
ORDER BY CAST(dt as Timestamp)
MEASURES
LAST(V1.dt) as v1,
LAST(V4.dt) as v4
ONE ROW PER MATCH
PATTERN (V1 V* V4)
DEFINE
V1 as V1.value = "value1",
V as True,
V4 as V4.value = "value4"
);
)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
[[1u];[4u]];
])", FormatResultSetYson(result.GetResultSet(0)));
}

Y_UNIT_TEST(MatchRecognizeWithoutTimeOrderRecoverer) {
TKikimrSettings settings;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableQueryServiceConfig()->SetEnableMatchRecognize(true);
settings.SetAppConfig(appConfig);

TKikimrRunner kikimr(settings);
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());

auto result = client.ExecuteYqlScript(R"(
PRAGMA FeatureR010="prototype";
PRAGMA config.flags("MatchRecognizeStream", "disable");

CREATE TABLE `/Root/NewTable` (
dt Uint64,
value String,
PRIMARY KEY (dt)
);
COMMIT;

INSERT INTO `/Root/NewTable` (dt, value) VALUES
(1, 'value1'), (2, 'value2'), (3, 'value3'), (4, 'value4');
COMMIT;

SELECT * FROM (SELECT dt, value FROM `/Root/NewTable`)
MATCH_RECOGNIZE(
ORDER BY CAST(dt as Timestamp)
MEASURES
LAST(V1.dt) as v1,
LAST(V4.dt) as v4
ONE ROW PER MATCH
PATTERN (V1 V* V4)
DEFINE
V1 as V1.value = "value1",
V as True,
V4 as V4.value = "value4"
);
)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
[[1u];[4u]];
])", FormatResultSetYson(result.GetResultSet(0)));
}
}

} // namspace NKqp
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,7 @@ message TQueryServiceConfig {
optional TFinalizeScriptServiceConfig FinalizeScriptServiceConfig = 12;
optional uint64 ProgressStatsPeriodMs = 14 [default = 0]; // 0 = disabled
optional uint32 QueryTimeoutDefaultSeconds = 19 [default = 1800];
optional bool EnableMatchRecognize = 20 [default = false];
}

// Config describes immediate controls and allows
Expand Down
42 changes: 10 additions & 32 deletions ydb/library/yql/core/yql_opt_match_recognize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext&
ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx);
TExprNode::TPtr result;
if (isStreaming) {
YQL_ENSURE(sortOrder->ChildrenSize() == 1, "Expect ORDER BY timestamp for MATCH_RECOGNIZE on streams");
YQL_ENSURE(sortOrder->ChildrenSize() == 1, "Expect ORDER BY timestamp for MATCH_RECOGNIZE");
const auto reordered = ctx.Builder(pos)
.Lambda()
.Param("partition")
Expand Down Expand Up @@ -216,37 +216,15 @@ TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext&
.Seal()
.Build();
} else { //non-streaming
if (partitionColumns->ChildrenSize() != 0) {
result = ctx.Builder(pos)
.Callable("PartitionsByKeys")
.Add(0, input)
.Add(1, partitionKeySelector)
.Add(2, sortOrder)
.Add(3, sortKey)
.Add(4, matchRecognize)
.Seal()
.Build();
} else {
if (sortOrder->IsCallable("Void")) {
result = ctx.Builder(pos)
.Apply(matchRecognize)
.With(0, input)
.Seal()
.Build();;
} else {
result = ctx.Builder(pos)
.Apply(matchRecognize)
.With(0)
.Callable("Sort")
.Add(0, input)
.Add(1, sortOrder)
.Add(2, sortKey)
.Seal()
.Done()
.Seal()
.Build();
}
}
result = ctx.Builder(pos)
.Callable("PartitionsByKeys")
.Add(0, input)
.Add(1, partitionKeySelector)
.Add(2, sortOrder)
.Add(3, sortKey)
.Add(4, matchRecognize)
.Seal()
.Build();
}
YQL_CLOG(INFO, Core) << "Expanded MatchRecognize";
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1892,9 +1892,9 @@
],
"test.test[match_recognize-test_type-default.txt-Debug]": [
{
"checksum": "cb5512aae3f5566055b2388be6d114af",
"size": 3220,
"uri": "https://{canondata_backend}/1937367/518bbcf510ad7a43c5e77746bafd21ed0e3fdc6e/resource.tar.gz#test.test_match_recognize-test_type-default.txt-Debug_/opt.yql_patched"
"checksum": "648119cc488bae598a0936f9d2c82b7e",
"size": 3458,
"uri": "https://{canondata_backend}/1942173/c4d7dbc720e57397caf847cd2616b1362110ddd2/resource.tar.gz#test.test_match_recognize-test_type-default.txt-Debug_/opt.yql_patched"
}
],
"test.test[match_recognize-test_type-default.txt-Plan]": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1717,9 +1717,9 @@
],
"test.test[match_recognize-alerts-default.txt-Debug]": [
{
"checksum": "782bb90b80a43308dfef1dbd81055b12",
"size": 5618,
"uri": "https://{canondata_backend}/1942173/e32f1de19c4f2770a6f215d1dc22bc97e318bf22/resource.tar.gz#test.test_match_recognize-alerts-default.txt-Debug_/opt.yql_patched"
"checksum": "c8b1e13d6da573f8a1afd415db1d00e7",
"size": 5787,
"uri": "https://{canondata_backend}/1917492/86ab0de654a60bf1e3145a3d8e3d7eae4a9f26b8/resource.tar.gz#test.test_match_recognize-alerts-default.txt-Debug_/opt.yql_patched"
}
],
"test.test[match_recognize-alerts-default.txt-Plan]": [
Expand Down
22 changes: 22 additions & 0 deletions ydb/library/yql/tests/sql/dq_file/part19/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -1784,6 +1784,28 @@
}
],
"test.test[limit-empty_input_after_limit-default.txt-Results]": [],
"test.test[match_recognize-alerts_without_order-default.txt-Analyze]": [
{
"checksum": "b4dd508a329723c74293d80f0278c705",
"size": 505,
"uri": "https://{canondata_backend}/1917492/ef839f70e5a2f493427f7f92ed00d26a993f6d4a/resource.tar.gz#test.test_match_recognize-alerts_without_order-default.txt-Analyze_/plan.txt"
}
],
"test.test[match_recognize-alerts_without_order-default.txt-Debug]": [
{
"checksum": "17c5c1f84ac65b6a82234cd0b0a41a68",
"size": 5699,
"uri": "https://{canondata_backend}/1917492/ef839f70e5a2f493427f7f92ed00d26a993f6d4a/resource.tar.gz#test.test_match_recognize-alerts_without_order-default.txt-Debug_/opt.yql_patched"
}
],
"test.test[match_recognize-alerts_without_order-default.txt-Plan]": [
{
"checksum": "b4dd508a329723c74293d80f0278c705",
"size": 505,
"uri": "https://{canondata_backend}/1917492/ef839f70e5a2f493427f7f92ed00d26a993f6d4a/resource.tar.gz#test.test_match_recognize-alerts_without_order-default.txt-Plan_/plan.txt"
}
],
"test.test[match_recognize-alerts_without_order-default.txt-Results]": [],
"test.test[optimizers-unused_columns_group_one_of_multi--Analyze]": [
{
"checksum": "ffcfe803a5b4bbfe9af72cc128197217",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<tmp_path>/program.sql:<main>: Fatal: Optimization

<tmp_path>/program.sql:<main>:8:1: Fatal: ydb/library/yql/core/yql_opt_match_recognize.cpp:xxx ExpandMatchRecognize(): requirement sortOrder->ChildrenSize() == 1 failed, message: Expect ORDER BY timestamp for MATCH_RECOGNIZE
select * from (select * from AS_TABLE($data) MATCH_RECOGNIZE(
^
Original file line number Diff line number Diff line change
Expand Up @@ -1653,9 +1653,9 @@
],
"test.test[match_recognize-alerts-default.txt-Debug]": [
{
"checksum": "900161f08e14b0b4c725130ed055ca73",
"size": 5617,
"uri": "https://{canondata_backend}/1880306/5d2fb97b23cd70975bc5d744391981f9d5595c04/resource.tar.gz#test.test_match_recognize-alerts-default.txt-Debug_/opt.yql_patched"
"checksum": "902f8b167c5875200480d237a6493bb7",
"size": 5786,
"uri": "https://{canondata_backend}/1903885/f00a3197fa44aa3d49bf7fe1bbf0fed52ce265b9/resource.tar.gz#test.test_match_recognize-alerts-default.txt-Debug_/opt.yql_patched"
}
],
"test.test[match_recognize-alerts-default.txt-Plan]": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1737,9 +1737,9 @@
],
"test.test[match_recognize-test_type-default.txt-Debug]": [
{
"checksum": "e0e549a969a71f8fddbd772e08bbeebe",
"size": 3219,
"uri": "https://{canondata_backend}/1937492/7ae37c32b42bb57d4df171a62ced7ab76867a8ea/resource.tar.gz#test.test_match_recognize-test_type-default.txt-Debug_/opt.yql_patched"
"checksum": "367551185530c7b04aa9da2f8afa111f",
"size": 3457,
"uri": "https://{canondata_backend}/1903885/a4d0122d8471ff0ca85352e617bed922d9ad8df1/resource.tar.gz#test.test_match_recognize-test_type-default.txt-Debug_/opt.yql_patched"
}
],
"test.test[match_recognize-test_type-default.txt-Plan]": [
Expand Down
Loading

0 comments on commit 18b7d76

Please sign in to comment.