From f28f1b734b22e9c93b6c993820c165e4cfad9ce0 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Thu, 12 Dec 2024 18:07:26 +0300 Subject: [PATCH 01/20] Add store column support for ydb workload kv --- ydb/library/workload/kv/kv.cpp | 45 +++++++++++++++---- ydb/library/workload/kv/kv.h | 6 +++ .../lib/ydb_cli/commands/ydb_workload.cpp | 7 +++ 3 files changed, 50 insertions(+), 8 deletions(-) diff --git a/ydb/library/workload/kv/kv.cpp b/ydb/library/workload/kv/kv.cpp index 58ba71fc38d5..c696d4b722cd 100644 --- a/ydb/library/workload/kv/kv.cpp +++ b/ydb/library/workload/kv/kv.cpp @@ -151,10 +151,14 @@ std::string TKvWorkloadGenerator::GetDDLQueries() const { for (size_t i = 0; i < Params.ColumnsCnt; ++i) { if (i < Params.IntColumnsCnt) { - ss << "c" << i << " Uint64, "; + ss << "c" << i << " Uint64"; } else { - ss << "c" << i << " String, "; + ss << "c" << i << " String"; } + if (i < Params.KeyColumnsCnt && Params.GetStoreType() == TKvWorkloadParams::EStoreType::Column) { + ss << " NOT NULL"; + } + ss << ", "; } ss << "PRIMARY KEY("; @@ -166,13 +170,23 @@ std::string TKvWorkloadGenerator::GetDDLQueries() const { } ss << ")) WITH ("; - if (Params.PartitionsByLoad) { - ss << "AUTO_PARTITIONING_BY_LOAD = ENABLED, "; + switch (Params.GetStoreType()) { + case TKvWorkloadParams::EStoreType::Row: + ss << "STORE = ROW, "; + if (Params.PartitionsByLoad) { + ss << "AUTO_PARTITIONING_BY_LOAD = ENABLED, "; + } + ss << "UNIFORM_PARTITIONS = " << Params.MinPartitions << ", "; + ss << "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = " << Max(Params.MinPartitions, Params.MaxPartitions) << ", "; + ss << "AUTO_PARTITIONING_PARTITION_SIZE_MB = " << Params.PartitionSizeMb << ", "; + break; + case TKvWorkloadParams::EStoreType::Column: + ss << "STORE = COLUMN, "; + break; + default: + throw yexception() << "Unsupported store type: " << Params.GetStoreType(); } - ss << "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << Params.MinPartitions << ", "; - ss << "AUTO_PARTITIONING_PARTITION_SIZE_MB = " << Params.PartitionSizeMb << ", "; - ss << "UNIFORM_PARTITIONS = " << Params.MinPartitions << ", "; - ss << "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = " << Max(Params.MinPartitions, Params.MaxPartitions) << ")"; + ss << "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << Params.MinPartitions << ")"; return ss.str(); } @@ -449,10 +463,14 @@ TQueryInfoList TKvWorkloadGenerator::Mixed() { TQueryInfoList TKvWorkloadGenerator::GetInitialData() { TQueryInfoList res; + try { for (size_t i = 0; i < Params.InitRowCount; ++i) { auto queryInfos = Upsert(GenerateRandomRows()); res.insert(res.end(), queryInfos.begin(), queryInfos.end()); } + } catch (const std::exception& e) { + Cerr << "Exception in GetInitialData(): " << e.what() << Endl; + } return res; } @@ -517,6 +535,17 @@ void TKvWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandTy .DefaultValue((ui64)KvWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); opts.AddLongOption("rows", "Number of rows") .DefaultValue((ui64)KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); + opts.AddLongOption("store", "Storage type." + " Options: row, column\n" + " row - use row-based storage engine;\n" + " column - use column-based storage engine.") + .DefaultValue(StoreType) + .Handler1T([this](TStringBuf arg) { + const auto l = to_lower(TString(arg)); + if (!TryFromString(arg, StoreType)) { + throw yexception() << "Ivalid store type: " << arg; + } + }); break; case TWorkloadParams::ECommandType::Run: opts.AddLongOption("max-first-key", "Maximum value of a first primary key") diff --git a/ydb/library/workload/kv/kv.h b/ydb/library/workload/kv/kv.h index f7859adf5886..05c4520a8acc 100644 --- a/ydb/library/workload/kv/kv.h +++ b/ydb/library/workload/kv/kv.h @@ -30,6 +30,11 @@ enum KvWorkloadConstants : ui64 { class TKvWorkloadParams : public TWorkloadParams { public: + enum class EStoreType { + Row /* "row" */, + Column /* "column" */, + }; + void ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) override; THolder CreateGenerator() const override; TString GetWorkloadName() const override; @@ -52,6 +57,7 @@ class TKvWorkloadParams : public TWorkloadParams { const std::string TableName = "kv_test"; bool StaleRO = KvWorkloadConstants::STALE_RO; + YDB_READONLY(EStoreType, StoreType, EStoreType::Row); }; class TKvWorkloadGenerator final: public TWorkloadQueryGeneratorBase { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp index aeaa1c0e5596..bb0d9f3c5af3 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp @@ -482,7 +482,9 @@ int TWorkloadCommandInit::DoRun(NYdbWorkload::IWorkloadQueryGenerator& workloadG Cout << Endl; } } else { + try { for (auto queryInfo : queryInfoList) { + Cerr << "<"; auto result = QueryClient->ExecuteQuery( queryInfo.Query.c_str(), NYdb::NQuery::TTxControl::BeginTx(NYdb::NQuery::TTxSettings::SerializableRW()).CommitTx(), @@ -492,6 +494,11 @@ int TWorkloadCommandInit::DoRun(NYdbWorkload::IWorkloadQueryGenerator& workloadG << "Query:\n" << queryInfo.Query << Endl; return EXIT_FAILURE; } + + Cerr << ">"; + } + } catch (const std::exception& e) { + Cerr << "Exception in GetInitialData(): " << e.what() << Endl; } } return EXIT_SUCCESS; From eac2cd85e0887308343b9b76ca2e79aa53769dee Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Fri, 10 Jan 2025 12:41:22 +0300 Subject: [PATCH 02/20] move kv workload to other workload tests --- ydb/tests/functional/ya.make | 1 - .../kv/tests/test_workload.py} | 0 .../{functional/kv_workload => workloads/kv/tests}/ya.make | 0 ydb/tests/workloads/kv/ya.make | 4 ++++ ydb/tests/workloads/ya.make | 1 + 5 files changed, 5 insertions(+), 1 deletion(-) rename ydb/tests/{functional/kv_workload/test_kv_workload.py => workloads/kv/tests/test_workload.py} (100%) rename ydb/tests/{functional/kv_workload => workloads/kv/tests}/ya.make (100%) create mode 100644 ydb/tests/workloads/kv/ya.make diff --git a/ydb/tests/functional/ya.make b/ydb/tests/functional/ya.make index 0b76e65922d7..f551b6239e53 100644 --- a/ydb/tests/functional/ya.make +++ b/ydb/tests/functional/ya.make @@ -13,7 +13,6 @@ RECURSE( encryption hive kqp - kv_workload large_serializable limits postgresql diff --git a/ydb/tests/functional/kv_workload/test_kv_workload.py b/ydb/tests/workloads/kv/tests/test_workload.py similarity index 100% rename from ydb/tests/functional/kv_workload/test_kv_workload.py rename to ydb/tests/workloads/kv/tests/test_workload.py diff --git a/ydb/tests/functional/kv_workload/ya.make b/ydb/tests/workloads/kv/tests/ya.make similarity index 100% rename from ydb/tests/functional/kv_workload/ya.make rename to ydb/tests/workloads/kv/tests/ya.make diff --git a/ydb/tests/workloads/kv/ya.make b/ydb/tests/workloads/kv/ya.make new file mode 100644 index 000000000000..d850faba6b2e --- /dev/null +++ b/ydb/tests/workloads/kv/ya.make @@ -0,0 +1,4 @@ +RECURSE_FOR_TESTS( + tests +) + diff --git a/ydb/tests/workloads/ya.make b/ydb/tests/workloads/ya.make index 4ce41e677354..9966ad9e4c10 100644 --- a/ydb/tests/workloads/ya.make +++ b/ydb/tests/workloads/ya.make @@ -1,4 +1,5 @@ RECURSE( + kv olap_workload simple_queue statistics_workload From f0ae5c1a2c90f6902a777d2a51b2fb21114628f3 Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Fri, 10 Jan 2025 12:47:24 +0300 Subject: [PATCH 03/20] fix --- ydb/tests/workloads/kv/tests/ya.make | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/tests/workloads/kv/tests/ya.make b/ydb/tests/workloads/kv/tests/ya.make index 72ff841b10ef..d99f532e7132 100644 --- a/ydb/tests/workloads/kv/tests/ya.make +++ b/ydb/tests/workloads/kv/tests/ya.make @@ -6,7 +6,7 @@ ENV(YDB_ERASURE=mirror_3_dc) ENV(YDB_USE_IN_MEMORY_PDISKS=true) TEST_SRCS( - test_kv_workload.py + test_workload.py ) IF (SANITIZER_TYPE) From acf965c95070a67f1abea52da28bd055ddcd9c2e Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Fri, 10 Jan 2025 13:40:50 +0300 Subject: [PATCH 04/20] [wip] kv column --- ydb/library/workload/kv/kv.cpp | 4 ---- ydb/public/lib/ydb_cli/commands/ydb_workload.cpp | 7 ------- ydb/tests/workloads/kv/tests/test_workload.py | 3 ++- 3 files changed, 2 insertions(+), 12 deletions(-) diff --git a/ydb/library/workload/kv/kv.cpp b/ydb/library/workload/kv/kv.cpp index c696d4b722cd..89036e2eded1 100644 --- a/ydb/library/workload/kv/kv.cpp +++ b/ydb/library/workload/kv/kv.cpp @@ -463,14 +463,10 @@ TQueryInfoList TKvWorkloadGenerator::Mixed() { TQueryInfoList TKvWorkloadGenerator::GetInitialData() { TQueryInfoList res; - try { for (size_t i = 0; i < Params.InitRowCount; ++i) { auto queryInfos = Upsert(GenerateRandomRows()); res.insert(res.end(), queryInfos.begin(), queryInfos.end()); } - } catch (const std::exception& e) { - Cerr << "Exception in GetInitialData(): " << e.what() << Endl; - } return res; } diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp index 41525c52f436..7b711ff5440b 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp @@ -482,9 +482,7 @@ int TWorkloadCommandInit::DoRun(NYdbWorkload::IWorkloadQueryGenerator& workloadG Cout << Endl; } } else { - try { for (auto queryInfo : queryInfoList) { - Cerr << "<"; auto result = QueryClient->ExecuteQuery( queryInfo.Query.c_str(), NYdb::NQuery::TTxControl::BeginTx(NYdb::NQuery::TTxSettings::SerializableRW()).CommitTx(), @@ -494,11 +492,6 @@ int TWorkloadCommandInit::DoRun(NYdbWorkload::IWorkloadQueryGenerator& workloadG << "Query:\n" << queryInfo.Query << Endl; return EXIT_FAILURE; } - - Cerr << ">"; - } - } catch (const std::exception& e) { - Cerr << "Exception in GetInitialData(): " << e.what() << Endl; } } return EXIT_SUCCESS; diff --git a/ydb/tests/workloads/kv/tests/test_workload.py b/ydb/tests/workloads/kv/tests/test_workload.py index ffb376240b2f..f3d31a832490 100644 --- a/ydb/tests/workloads/kv/tests/test_workload.py +++ b/ydb/tests/workloads/kv/tests/test_workload.py @@ -34,7 +34,8 @@ def test(self): "--init-upserts", "0", "--cols", "5", "--int-cols", "2", - "--key-cols", "3" + "--key-cols", "3", + "--store", "column", ], wait=True ) From d7b36e1dacde45bb72b879bc54162442934666f0 Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Fri, 10 Jan 2025 15:16:22 +0300 Subject: [PATCH 05/20] Add test --- ydb/library/workload/kv/kv.cpp | 8 ++ ydb/library/workload/kv/kv.h | 2 +- ydb/tests/workloads/kv/tests/test_workload.py | 83 ++++++++++--------- 3 files changed, 52 insertions(+), 41 deletions(-) diff --git a/ydb/library/workload/kv/kv.cpp b/ydb/library/workload/kv/kv.cpp index 89036e2eded1..5f844722d05d 100644 --- a/ydb/library/workload/kv/kv.cpp +++ b/ydb/library/workload/kv/kv.cpp @@ -507,6 +507,14 @@ TVector TKvWorkloadGenerator::GenerateRandomRows(bool randomValues) { } void TKvWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) { + opts.AddLongOption('p', "path", "Path where benchmark tables are located") + .Optional() + .DefaultValue(TableName) + .Handler1T([this](TStringBuf arg) { + while(arg.SkipPrefix("/")); + while(arg.ChopSuffix("/")); + TableName = arg; + }); switch (commandType) { case TWorkloadParams::ECommandType::Init: opts.AddLongOption("init-upserts", "count of upserts need to create while table initialization") diff --git a/ydb/library/workload/kv/kv.h b/ydb/library/workload/kv/kv.h index 05c4520a8acc..f270152a8e83 100644 --- a/ydb/library/workload/kv/kv.h +++ b/ydb/library/workload/kv/kv.h @@ -54,7 +54,7 @@ class TKvWorkloadParams : public TWorkloadParams { ui64 MixedDoReadRows = KvWorkloadConstants::MIXED_DO_READ_ROWS; ui64 MixedDoSelect = KvWorkloadConstants::MIXED_DO_SELECT; - const std::string TableName = "kv_test"; + std::string TableName = "kv_test"; bool StaleRO = KvWorkloadConstants::STALE_RO; YDB_READONLY(EStoreType, StoreType, EStoreType::Row); diff --git a/ydb/tests/workloads/kv/tests/test_workload.py b/ydb/tests/workloads/kv/tests/test_workload.py index f3d31a832490..13758d7dddb1 100644 --- a/ydb/tests/workloads/kv/tests/test_workload.py +++ b/ydb/tests/workloads/kv/tests/test_workload.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- import os +import pytest + import yatest from ydb.tests.library.harness.kikimr_runner import KiKiMR @@ -13,48 +15,49 @@ class TestYdbKvWorkload(object): def setup_class(cls): cls.cluster = KiKiMR(KikimrConfigGenerator(erasure=Erasure.MIRROR_3_DC)) cls.cluster.start() + cls.init_command_prefix = [ + yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")), + "--verbose", + "--endpoint", "grpc://localhost:%d" % cls.cluster.nodes[1].grpc_port, + "--database=/Root", + "workload", "kv", "init", + "--min-partitions", "1", + "--partition-size", "10", + "--auto-partition", "0", + "--init-upserts", "0", + "--cols", "5", + "--int-cols", "2", + "--key-cols", "3", + ] + + cls.run_command_prefix = [ + yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")), + "--verbose", + "--endpoint", "grpc://localhost:%d" % cls.cluster.nodes[1].grpc_port, + "--database=/Root", + "workload", "kv", "run", "mixed", + "--seconds", "100", + "--threads", "10", + "--cols", "5", + "--len", "200", + "--int-cols", "2", + "--key-cols", "3" + ] @classmethod def teardown_class(cls): cls.cluster.stop() - def test(self): - yatest.common.execute( - [ - yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")), - "--verbose", - "--endpoint", "grpc://localhost:%d" % self.cluster.nodes[1].grpc_port, - "--database=/Root", - - "workload", "kv", "init", - - "--min-partitions", "1", - "--partition-size", "10", - "--auto-partition", "0", - "--init-upserts", "0", - "--cols", "5", - "--int-cols", "2", - "--key-cols", "3", - "--store", "column", - ], - wait=True - ) - - yatest.common.execute( - [ - yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")), - "--verbose", - "--endpoint", "grpc://localhost:%d" % self.cluster.nodes[1].grpc_port, - "--database=/Root", - - "workload", "kv", "run", "mixed", - "--seconds", "100", - "--threads", "10", - - "--cols", "5", - "--len", "200", - "--int-cols", "2", - "--key-cols", "3" - ], - wait=True - ) + @pytest.mark.parametrize("store_type", ["row", "column"]) + def test(self, store_type): + init_command = self.init_command_prefix + init_command.extend([ + "--path", store_type, + "--store", store_type, + ]) + run_command = self.run_command_prefix + run_command.extend([ + "--path", store_type, + ]) + yatest.common.execute(init_command, wait=True) + yatest.common.execute(run_command, wait=True) From 9ca802592abc0388391c1d918e5b87e5a108c26e Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Fri, 10 Jan 2025 15:21:35 +0300 Subject: [PATCH 06/20] try asan --- main.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 main.py diff --git a/main.py b/main.py new file mode 100644 index 000000000000..b85cde21b8e9 --- /dev/null +++ b/main.py @@ -0,0 +1,31 @@ +t = int(input()) + +for i in range(t): + k, l1, r1, l2, r2 = [int(val) for val in input().split()] + + current_val = 1 + ans = 0 + # print(min_val, max_val) + while current_val <= 2 * (10**9): + n_l1 = l1 * current_val + n_r1 = r1 * current_val + # print("**", current_val) + l = max(n_l1, l2) + r = min(n_r1, r2) + r += 1 + if (l <= r): + if (l % current_val): + l -= l % current_val + l += current_val + + if (r % current_val): + r -= r % current_val + r += current_val + + d = (r-l) // current_val + # print("*", l, r, current_val, d) + ans += d + + current_val *= k + + print(ans) From d89ef87baffc7a727f82d0df432ae487a090af2d Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Fri, 10 Jan 2025 19:20:22 +0300 Subject: [PATCH 07/20] fix --- ydb/tests/workloads/kv/tests/test_workload.py | 2 +- ydb/tests/workloads/kv/tests/ya.make | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/ydb/tests/workloads/kv/tests/test_workload.py b/ydb/tests/workloads/kv/tests/test_workload.py index 13758d7dddb1..7cf2d1c927ac 100644 --- a/ydb/tests/workloads/kv/tests/test_workload.py +++ b/ydb/tests/workloads/kv/tests/test_workload.py @@ -48,7 +48,7 @@ def setup_class(cls): def teardown_class(cls): cls.cluster.stop() - @pytest.mark.parametrize("store_type", ["row", "column"]) + @pytest.mark.parametrize("store_type", ["column"]) def test(self, store_type): init_command = self.init_command_prefix init_command.extend([ diff --git a/ydb/tests/workloads/kv/tests/ya.make b/ydb/tests/workloads/kv/tests/ya.make index d99f532e7132..7dd67a33aae0 100644 --- a/ydb/tests/workloads/kv/tests/ya.make +++ b/ydb/tests/workloads/kv/tests/ya.make @@ -1,4 +1,3 @@ -IF (NOT SANITIZER_TYPE AND NOT WITH_VALGRIND) PY3TEST() ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd") ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb") @@ -10,10 +9,12 @@ TEST_SRCS( ) IF (SANITIZER_TYPE) + SIZE(MEDIUM) REQUIREMENTS(ram:32) +ELSE() + SIZE(MEDIUM) ENDIF() -SIZE(MEDIUM) DEPENDS( ydb/apps/ydbd @@ -26,5 +27,3 @@ PEERDIR( END() - -ENDIF() From e0dea2d90b59475f84a2edfaf7201d99d32a4bc4 Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Fri, 10 Jan 2025 19:21:45 +0300 Subject: [PATCH 08/20] del --- main.py | 31 ------------------------------- 1 file changed, 31 deletions(-) delete mode 100644 main.py diff --git a/main.py b/main.py deleted file mode 100644 index b85cde21b8e9..000000000000 --- a/main.py +++ /dev/null @@ -1,31 +0,0 @@ -t = int(input()) - -for i in range(t): - k, l1, r1, l2, r2 = [int(val) for val in input().split()] - - current_val = 1 - ans = 0 - # print(min_val, max_val) - while current_val <= 2 * (10**9): - n_l1 = l1 * current_val - n_r1 = r1 * current_val - # print("**", current_val) - l = max(n_l1, l2) - r = min(n_r1, r2) - r += 1 - if (l <= r): - if (l % current_val): - l -= l % current_val - l += current_val - - if (r % current_val): - r -= r % current_val - r += current_val - - d = (r-l) // current_val - # print("*", l, r, current_val, d) - ans += d - - current_val *= k - - print(ans) From a0e134c5eb8abe13428c5d3fc5de368aa6501676 Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Tue, 14 Jan 2025 12:37:45 +0300 Subject: [PATCH 09/20] wip log workload --- .../abstract/workload_query_generator.h | 1 + .../workload/log_writer/log_writer.cpp | 390 ++++++++++++++++++ ydb/library/workload/log_writer/log_writer.h | 111 +++++ ydb/library/workload/log_writer/registrar.cpp | 12 + ydb/library/workload/log_writer/ya.make | 14 + ydb/library/workload/ya.make | 2 + .../log_writer/tests/test_workload.py | 60 +++ ydb/tests/workloads/log_writer/tests/ya.make | 29 ++ ydb/tests/workloads/log_writer/ya.make | 4 + ydb/tests/workloads/ya.make | 1 + 10 files changed, 624 insertions(+) create mode 100644 ydb/library/workload/log_writer/log_writer.cpp create mode 100644 ydb/library/workload/log_writer/log_writer.h create mode 100644 ydb/library/workload/log_writer/registrar.cpp create mode 100644 ydb/library/workload/log_writer/ya.make create mode 100644 ydb/tests/workloads/log_writer/tests/test_workload.py create mode 100644 ydb/tests/workloads/log_writer/tests/ya.make create mode 100644 ydb/tests/workloads/log_writer/ya.make diff --git a/ydb/library/workload/abstract/workload_query_generator.h b/ydb/library/workload/abstract/workload_query_generator.h index 7181cf30c15c..4ddc6c8d3dfc 100644 --- a/ydb/library/workload/abstract/workload_query_generator.h +++ b/ydb/library/workload/abstract/workload_query_generator.h @@ -35,6 +35,7 @@ struct TQueryInfo { TString TablePath; std::optional KeyToRead; std::optional AlterTable; + std::function TableOperation; std::optional> ReadRowsResultCallback; std::optional> DataQueryResultCallback; diff --git a/ydb/library/workload/log_writer/log_writer.cpp b/ydb/library/workload/log_writer/log_writer.cpp new file mode 100644 index 000000000000..e9b003bb09ab --- /dev/null +++ b/ydb/library/workload/log_writer/log_writer.cpp @@ -0,0 +1,390 @@ +#include "log_writer.h" +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace NYdbWorkload { + +namespace NLogWriter { + +using TRow = TLogWriterWorkloadGenerator::TRow; + +void Fail() { + // Note: sleep helps to detect more fails + std::this_thread::sleep_for(std::chrono::seconds(3)); + Y_ABORT(); +} + +void AddResultSet(const NYdb::TResultSet& resultSet, TVector& rows) { + NYdb::TResultSetParser parser(resultSet); + while (parser.TryNextRow()) { + TRow row; + + for (size_t col = 0; col < parser.ColumnsCount(); col++) { + auto& valueParser = parser.ColumnParser(col); + bool optional = valueParser.GetKind() == NYdb::TTypeParser::ETypeKind::Optional; + if (optional) { + valueParser.OpenOptional(); + } + if (valueParser.GetPrimitiveType() == NYdb::EPrimitiveType::Uint64) { + row.Ints.push_back(valueParser.GetUint64()); + } else { + row.Strings.push_back(valueParser.GetString()); + } + if (optional) { + valueParser.CloseOptional(); + } + } + + rows.push_back(std::move(row)); + } +} + +void VerifyRows(const TRow& checkRow, const TVector& readRows, TString message) { + if (readRows.empty()) { + Cerr << "Expected to have " << checkRow.ToString() + << " but got empty " + << message + << Endl; + + Fail(); + } + + if (readRows.size() > 1) { + Cerr << "Expected to have " << checkRow.ToString() + << " but got " << readRows.size() << " rows " + << message + << Endl; + + for (auto r : readRows) { + Cerr << r.ToString() << Endl; + } + + Fail(); + } + + if (readRows[0] != checkRow) { + Cerr << "Expected to have " << checkRow.ToString() + << " but got " << readRows[0].ToString() << " " + << message + << Endl; + + Fail(); + } else { + // Cerr << "OK " << checkRow.ToString() << " " << message << Endl; + } +} + + +TLogWriterWorkloadGenerator::TLogWriterWorkloadGenerator(const TLogWriterWorkloadParams* params) + : TBase(params) + , TotalColumnsCnt(1 + Params.IntColumnsCnt + Params.StrColumnsCnt) +{ + Y_ABORT_UNLESS(TotalColumnsCnt >= Params.KeyColumnsCnt); +} + +std::string TLogWriterWorkloadGenerator::GetDDLQueries() const { + std::stringstream ss; + + ss << "--!syntax_v1\n"; + ss << "CREATE TABLE `" << Params.DbPath << "/" << Params.TableName << "`("; + + for (size_t i = 0; i < TotalColumnsCnt; ++i) { + if (i == 0) + { + ss << "ts Timestamp"; + + } + else if (i < Params.IntColumnsCnt + 1) { + ss << "c" << i << " Uint64"; + } + else + { + ss << "c" << i << " String"; + } + if (i < Params.KeyColumnsCnt && Params.GetStoreType() == TLogWriterWorkloadParams::EStoreType::Column) { + ss << " NOT NULL"; + } + ss << ", "; + } + + ss << "PRIMARY KEY("; + ss << "ts, "; + for (size_t i = 1; i < Params.KeyColumnsCnt; ++i) { + ss << "c" << i; + if (i + 1 < Params.KeyColumnsCnt) { + ss << ", "; + } + } + ss << ")) WITH ("; + + ss << "TTL = Interval(\"PT7H\") ON ts, "; + + switch (Params.GetStoreType()) { + + case TLogWriterWorkloadParams::EStoreType::Row: + ss << "STORE = ROW, "; + if (Params.PartitionsByLoad) { + ss << "AUTO_PARTITIONING_BY_LOAD = ENABLED, "; + } + ss << "UNIFORM_PARTITIONS = " << Params.MinPartitions << ", "; + ss << "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = " << Max(Params.MinPartitions, Params.MaxPartitions) << ", "; + ss << "AUTO_PARTITIONING_PARTITION_SIZE_MB = " << Params.PartitionSizeMb << ", "; + break; + case TLogWriterWorkloadParams::EStoreType::Column: + ss << "STORE = COLUMN, "; + break; + default: + throw yexception() << "Unsupported store type: " << Params.GetStoreType(); + } + ss << "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << Params.MinPartitions << ")"; + return ss.str(); +} + +TQueryInfoList TLogWriterWorkloadGenerator::GetWorkload(int type) { + switch (static_cast(type)) { + case EType::Upsert: + return Upsert(GenerateRandomRows()); + case EType::BulkUpsert: + return BulkUpsert(GenerateRandomRows()); + default: + return TQueryInfoList(); + } +} + + +TVector TLogWriterWorkloadGenerator::GetSupportedWorkloadTypes() const { + TVector result; + result.emplace_back(static_cast(EType::Upsert), "upsert", "Upsert random rows into table near current ts"); + result.emplace_back(static_cast(EType::BulkUpsert), "bulk_upsert", "Bulk upsert random rows into table near current ts"); + return result; +} + +TQueryInfoList TLogWriterWorkloadGenerator::WriteRows(TString operation, TVector&& rows) { + std::stringstream ss; + + NYdb::TParamsBuilder paramsBuilder; + + ss << "--!syntax_v1\n"; + + for (size_t row = 0; row < Params.RowsCnt; ++row) { + for (size_t col = 0; col < TotalColumnsCnt; ++col) { + TString cname = "$c" + std::to_string(row) + "_" + std::to_string(col); + if (col == 0) + { + ss << "DECLARE " << cname << " AS Timestamp;\n"; + paramsBuilder.AddParam(cname).Timestamp(rows[row].Ts).Build(); + } + else if (col < Params.IntColumnsCnt + 1) { + ss << "DECLARE " << cname << " AS Uint64;\n"; + paramsBuilder.AddParam(cname).Uint64(rows[row].Ints[col - 1]).Build(); + } else { + ss << "DECLARE " << cname << " AS String;\n"; + paramsBuilder.AddParam(cname).String(rows[row].Strings[col - Params.IntColumnsCnt - 1]).Build(); + } + } + } + + ss << operation << " INTO `" << Params.TableName << "` ("; + + for (size_t col = 0; col < TotalColumnsCnt; ++col) { + if (col != 0) + { + ss << "c" << col; + } + else + { + ss << "ts"; + } + if (col + 1 < TotalColumnsCnt) { + ss << ", "; + } + } + + ss << ") VALUES "; + + for (size_t row = 0; row < Params.RowsCnt; ++row) { + ss << "("; + + for (size_t col = 0; col < TotalColumnsCnt; ++col) { + ss << "$c" << row << "_" << col; + if (col + 1 < TotalColumnsCnt) { + ss << ", "; + } + } + + ss << ")"; + + if (row + 1 < Params.RowsCnt) { + ss << ", "; + } + } + auto params = paramsBuilder.Build(); + + return TQueryInfoList(1, TQueryInfo(ss.str(), std::move(params))); +} + +TQueryInfoList TLogWriterWorkloadGenerator::Upsert(TVector&& rows) { + return WriteRows("UPSERT", std::move(rows)); +} + +TQueryInfoList TLogWriterWorkloadGenerator::BulkUpsert(TVector&& rows) { + NYdb::TValueBuilder valueBuilder; + valueBuilder.BeginList(); + for (const TRow& row : rows) { + auto &listItem = valueBuilder.AddListItem(); + listItem.BeginStruct(); + for (size_t col = 0; col < TotalColumnsCnt; ++col) { + if (col == 0) + { + listItem.AddMember("ts").Timestamp(row.Ts); + } + else if (col < Params.IntColumnsCnt + 1) { + listItem.AddMember(std::format("c{}", col)).Uint64(row.Ints[col-1]); + } else { + listItem.AddMember(std::format("c{}", col)).String(row.Strings[col - Params.IntColumnsCnt - 1]); + } + } + listItem.EndStruct(); + } + valueBuilder.EndList(); + TString table_path = Params.DbPath + "/" + Params.TableName; + NYdb::TValue rowsValue = valueBuilder.Build(); + auto bulkUpsertOperation = [table_path, rowsValue](NYdb::NTable::TTableClient& tableClient) { + auto r = rowsValue; + auto status = tableClient.BulkUpsert(table_path, std::move(r)); + return status.GetValueSync(); + }; + TQueryInfo queryInfo; + queryInfo.TableOperation = bulkUpsertOperation; + return TQueryInfoList(1, std::move(queryInfo)); +} + + +TQueryInfoList TLogWriterWorkloadGenerator::GetInitialData() { + TQueryInfoList res; + for (size_t i = 0; i < Params.InitRowCount; ++i) { + auto queryInfos = Upsert(GenerateRandomRows()); + res.insert(res.end(), queryInfos.begin(), queryInfos.end()); + } + + return res; +} + +TVector TLogWriterWorkloadGenerator::GetCleanPaths() const { + return { Params.TableName }; +} + +TVector TLogWriterWorkloadGenerator::GenerateRandomRows() { + TVector result(Params.RowsCnt); + + for (size_t row = 0; row < Params.RowsCnt; ++row) { + result[row].Ts = TInstant::Now(); + result[row].Ints.resize(Params.IntColumnsCnt); + result[row].Strings.resize(Params.StrColumnsCnt); + + for (size_t col = 0; col < Params.IntColumnsCnt; ++col) { + ui64 val = RandomNumber(); + result[row].Ints[col] = val; + } + + for (size_t col = 0; col < Params.StrColumnsCnt; ++col) { + TString val; + val = TString(Params.StringLen, '_'); + for (size_t i = 0; i < Params.StringLen; i++) { + val[i] = (char)('a' + RandomNumber(26)); + } + result[row].Strings[col] = val; + } + } + + return result; +} + +void TLogWriterWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) { + opts.AddLongOption('p', "path", "Path where benchmark tables are located") + .Optional() + .DefaultValue(TableName) + .Handler1T([this](TStringBuf arg) { + while(arg.SkipPrefix("/")); + while(arg.ChopSuffix("/")); + TableName = arg; + }); + switch (commandType) { + case TWorkloadParams::ECommandType::Init: + opts.AddLongOption("init-upserts", "count of upserts need to create while table initialization") + .DefaultValue((ui64)LogWriterWorkloadConstants::INIT_ROW_COUNT).StoreResult(&InitRowCount); + opts.AddLongOption("min-partitions", "Minimum partitions for tables.") + .DefaultValue((ui64)LogWriterWorkloadConstants::MIN_PARTITIONS).StoreResult(&MinPartitions); + opts.AddLongOption("max-partitions", "Maximum partitions for tables.") + .DefaultValue((ui64)LogWriterWorkloadConstants::MAX_PARTITIONS).StoreResult(&MaxPartitions); + opts.AddLongOption("partition-size", "Maximum partition size in megabytes (AUTO_PARTITIONING_PARTITION_SIZE_MB).") + .DefaultValue((ui64)LogWriterWorkloadConstants::PARTITION_SIZE_MB).StoreResult(&PartitionSizeMb); + opts.AddLongOption("auto-partition", "Enable auto partitioning by load.") + .DefaultValue((ui64)LogWriterWorkloadConstants::PARTITIONS_BY_LOAD).StoreResult(&PartitionsByLoad); + opts.AddLongOption("len", "String len") + .DefaultValue((ui64)LogWriterWorkloadConstants::STRING_LEN).StoreResult(&StringLen); + opts.AddLongOption("int-cols", "Number of int columns") + .DefaultValue((ui64)LogWriterWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); + opts.AddLongOption("str-cols", "Number of columns") + .DefaultValue((ui64)LogWriterWorkloadConstants::STR_COLUMNS_CNT).StoreResult(&StrColumnsCnt); + opts.AddLongOption("key-cols", "Number of key columns") + .DefaultValue((ui64)LogWriterWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); + opts.AddLongOption("rows", "Number of rows") + .DefaultValue((ui64)LogWriterWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); + opts.AddLongOption("store", "Storage type." + " Options: row, column\n" + " row - use row-based storage engine;\n" + " column - use column-based storage engine.") + .DefaultValue(StoreType) + .Handler1T([this](TStringBuf arg) { + const auto l = to_lower(TString(arg)); + if (!TryFromString(arg, StoreType)) { + throw yexception() << "Ivalid store type: " << arg; + } + }); + break; + case TWorkloadParams::ECommandType::Run: + opts.AddLongOption("str-cols", "Number of int columns") + .DefaultValue((ui64)LogWriterWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&StrColumnsCnt); + opts.AddLongOption("int-cols", "Number of int columns") + .DefaultValue((ui64)LogWriterWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); + opts.AddLongOption("key-cols", "Number of key columns") + .DefaultValue((ui64)LogWriterWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); + switch (static_cast(workloadType)) { + case TLogWriterWorkloadGenerator::EType::BulkUpsert: + case TLogWriterWorkloadGenerator::EType::Upsert: + opts.AddLongOption("len", "String len") + .DefaultValue((ui64)LogWriterWorkloadConstants::STRING_LEN).StoreResult(&StringLen); + opts.AddLongOption("rows", "Number of rows to upsert") + .DefaultValue((ui64)LogWriterWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); + break; + } + break; + default: + break; + } +} + +THolder TLogWriterWorkloadParams::CreateGenerator() const { + return MakeHolder(this); +} + +TString TLogWriterWorkloadParams::GetWorkloadName() const { + return "Log Writer"; +} + +} // namespace NYdbWorkload + +} // namespace NLogWriter diff --git a/ydb/library/workload/log_writer/log_writer.h b/ydb/library/workload/log_writer/log_writer.h new file mode 100644 index 000000000000..bd057141130d --- /dev/null +++ b/ydb/library/workload/log_writer/log_writer.h @@ -0,0 +1,111 @@ +#pragma once + +#include + +#include +#include +#include + +namespace NYdbWorkload { + +namespace NLogWriter { + +enum LogWriterWorkloadConstants : ui64 { + MIN_PARTITIONS = 40, + MAX_PARTITIONS = 1000, + PARTITION_SIZE_MB = 2000, + INIT_ROW_COUNT = 1000, + STRING_LEN = 8, + STR_COLUMNS_CNT = 1, + INT_COLUMNS_CNT = 1, + KEY_COLUMNS_CNT = 1, + ROWS_CNT = 1, + PARTITIONS_BY_LOAD = true, + + MIXED_CHANGE_PARTITIONS_SIZE = false, + MIXED_DO_READ_ROWS = false, + MIXED_DO_SELECT = true, + + STALE_RO = false, +}; + +class TLogWriterWorkloadParams : public TWorkloadParams { +public: + enum class EStoreType { + Row /* "row" */, + Column /* "column" */, + }; + + void ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) override; + THolder CreateGenerator() const override; + TString GetWorkloadName() const override; + ui64 MinPartitions = LogWriterWorkloadConstants::MIN_PARTITIONS; + ui64 MaxPartitions = LogWriterWorkloadConstants::MAX_PARTITIONS; + ui64 PartitionSizeMb = LogWriterWorkloadConstants::PARTITION_SIZE_MB; + ui64 InitRowCount = LogWriterWorkloadConstants::INIT_ROW_COUNT; + ui64 StringLen = LogWriterWorkloadConstants::STRING_LEN; + ui64 StrColumnsCnt = LogWriterWorkloadConstants::STR_COLUMNS_CNT; + ui64 IntColumnsCnt = LogWriterWorkloadConstants::INT_COLUMNS_CNT; + ui64 KeyColumnsCnt = LogWriterWorkloadConstants::KEY_COLUMNS_CNT; + ui64 RowsCnt = LogWriterWorkloadConstants::ROWS_CNT; + bool PartitionsByLoad = LogWriterWorkloadConstants::PARTITIONS_BY_LOAD; + + std::string TableName = "log_writer_test"; + + bool StaleRO = LogWriterWorkloadConstants::STALE_RO; + YDB_READONLY(EStoreType, StoreType, EStoreType::Row); +}; + +class TLogWriterWorkloadGenerator final: public TWorkloadQueryGeneratorBase { +public: + using TBase = TWorkloadQueryGeneratorBase; + struct TRow { + TInstant Ts; + TVector Ints; + TVector Strings; + + TString ToString() const { + std::stringstream ss; + ss << "( "; + for (auto i : Ints) { + ss << i << " "; + } + for (auto s : Strings) { + ss << s << " "; + } + ss << ")"; + return ss.str(); + } + + bool operator == (const TRow &other) const { + return Ts == other.Ts && Ints == other.Ints && Strings == other.Strings; + } + }; + TLogWriterWorkloadGenerator(const TLogWriterWorkloadParams* params); + + std::string GetDDLQueries() const override; + + TQueryInfoList GetInitialData() override; + + TVector GetCleanPaths() const override; + + TQueryInfoList GetWorkload(int type) override; + TVector GetSupportedWorkloadTypes() const override; + + enum class EType { + Upsert, + BulkUpsert, + }; + +private: + TQueryInfoList WriteRows(TString operation, TVector&& rows); + TQueryInfoList Upsert(TVector&& rows); + TQueryInfoList BulkUpsert(TVector&& rows); + TVector GenerateRandomRows(); + + const ui64 TotalColumnsCnt; +}; + +} // namespace NYdbWorkload + +} // namespace NLogWriter \ No newline at end of file diff --git a/ydb/library/workload/log_writer/registrar.cpp b/ydb/library/workload/log_writer/registrar.cpp new file mode 100644 index 000000000000..1db206dcfc75 --- /dev/null +++ b/ydb/library/workload/log_writer/registrar.cpp @@ -0,0 +1,12 @@ +#include "log_writer.h" +#include + +namespace NYdbWorkload { + +namespace NLogWriter { + +TWorkloadFactory::TRegistrator Registrar("log_writer"); + +} // namespace NYdbWorkload + +} // namespace NLogWriter \ No newline at end of file diff --git a/ydb/library/workload/log_writer/ya.make b/ydb/library/workload/log_writer/ya.make new file mode 100644 index 000000000000..9f8eb4cb569e --- /dev/null +++ b/ydb/library/workload/log_writer/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + GLOBAL registrar.cpp + log_writer.cpp +) + +PEERDIR( + ydb/library/workload/abstract +) + +GENERATE_ENUM_SERIALIZATION_WITH_HEADER(log_writer.h) + +END() diff --git a/ydb/library/workload/ya.make b/ydb/library/workload/ya.make index 8ebc3191616a..95ffb7bdc865 100644 --- a/ydb/library/workload/ya.make +++ b/ydb/library/workload/ya.make @@ -4,6 +4,7 @@ PEERDIR( ydb/library/workload/abstract ydb/library/workload/clickbench ydb/library/workload/kv + ydb/library/workload/log_writer ydb/library/workload/stock ydb/library/workload/tpcds ydb/library/workload/tpch @@ -15,6 +16,7 @@ RECURSE( abstract benchmark_base clickbench + dev_log kv stock tpc_base diff --git a/ydb/tests/workloads/log_writer/tests/test_workload.py b/ydb/tests/workloads/log_writer/tests/test_workload.py new file mode 100644 index 000000000000..6bd0e09fc56e --- /dev/null +++ b/ydb/tests/workloads/log_writer/tests/test_workload.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +import os + +import pytest + +import yatest + +from ydb.tests.library.harness.kikimr_runner import KiKiMR +from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator +from ydb.tests.library.common.types import Erasure + + +class TestYdbKvWorkload(object): + @classmethod + def setup_class(cls): + cls.cluster = KiKiMR(KikimrConfigGenerator(erasure=Erasure.MIRROR_3_DC)) + cls.cluster.start() + cls.init_command_prefix = [ + yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")), + "--verbose", + "--endpoint", "grpc://localhost:%d" % cls.cluster.nodes[1].grpc_port, + "--database=/Root", + "workload", "log_writer", "init", + "--min-partitions", "100", + "--partition-size", "10", + "--auto-partition", "0", + "--int-cols", "10", + "--key-cols", "10", + ] + + cls.run_command_prefix = [ + yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")), + "--verbose", + "--endpoint", "grpc://localhost:%d" % cls.cluster.nodes[1].grpc_port, + "--database=/Root", + "workload", "log_writer", "run", "bulk_upsert", + "--seconds", "10", + "--threads", "10", + "--len", "200", + "--int-cols", "10", + "--key-cols", "10", + ] + + @classmethod + def teardown_class(cls): + cls.cluster.stop() + + @pytest.mark.parametrize("store_type", ["row", "column"]) + def test(self, store_type): + init_command = self.init_command_prefix + init_command.extend([ + "--path", store_type, + "--store", store_type, + ]) + run_command = self.run_command_prefix + run_command.extend([ + "--path", store_type, + ]) + yatest.common.execute(init_command, wait=True) + yatest.common.execute(run_command, wait=True) diff --git a/ydb/tests/workloads/log_writer/tests/ya.make b/ydb/tests/workloads/log_writer/tests/ya.make new file mode 100644 index 000000000000..7dd67a33aae0 --- /dev/null +++ b/ydb/tests/workloads/log_writer/tests/ya.make @@ -0,0 +1,29 @@ +PY3TEST() +ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd") +ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb") +ENV(YDB_ERASURE=mirror_3_dc) +ENV(YDB_USE_IN_MEMORY_PDISKS=true) + +TEST_SRCS( + test_workload.py +) + +IF (SANITIZER_TYPE) + SIZE(MEDIUM) + REQUIREMENTS(ram:32) +ELSE() + SIZE(MEDIUM) +ENDIF() + + +DEPENDS( + ydb/apps/ydbd + ydb/apps/ydb +) + +PEERDIR( + ydb/tests/library +) + + +END() diff --git a/ydb/tests/workloads/log_writer/ya.make b/ydb/tests/workloads/log_writer/ya.make new file mode 100644 index 000000000000..d850faba6b2e --- /dev/null +++ b/ydb/tests/workloads/log_writer/ya.make @@ -0,0 +1,4 @@ +RECURSE_FOR_TESTS( + tests +) + diff --git a/ydb/tests/workloads/ya.make b/ydb/tests/workloads/ya.make index 9966ad9e4c10..a2783576d52d 100644 --- a/ydb/tests/workloads/ya.make +++ b/ydb/tests/workloads/ya.make @@ -1,5 +1,6 @@ RECURSE( kv + log_writer olap_workload simple_queue statistics_workload From 95c1dd119560fea20cd90bb85b4cbd953391e39d Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Tue, 14 Jan 2025 12:39:33 +0300 Subject: [PATCH 10/20] fixes --- ydb/library/workload/log_writer/log_writer.cpp | 5 ++--- ydb/library/workload/log_writer/log_writer.h | 4 ++-- ydb/library/workload/log_writer/registrar.cpp | 4 ++-- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/ydb/library/workload/log_writer/log_writer.cpp b/ydb/library/workload/log_writer/log_writer.cpp index e9b003bb09ab..c2eac1c0662c 100644 --- a/ydb/library/workload/log_writer/log_writer.cpp +++ b/ydb/library/workload/log_writer/log_writer.cpp @@ -138,7 +138,6 @@ std::string TLogWriterWorkloadGenerator::GetDDLQueries() const { if (Params.PartitionsByLoad) { ss << "AUTO_PARTITIONING_BY_LOAD = ENABLED, "; } - ss << "UNIFORM_PARTITIONS = " << Params.MinPartitions << ", "; ss << "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = " << Max(Params.MinPartitions, Params.MaxPartitions) << ", "; ss << "AUTO_PARTITIONING_PARTITION_SIZE_MB = " << Params.PartitionSizeMb << ", "; break; @@ -385,6 +384,6 @@ TString TLogWriterWorkloadParams::GetWorkloadName() const { return "Log Writer"; } -} // namespace NYdbWorkload - } // namespace NLogWriter + +} // namespace NYdbWorkload diff --git a/ydb/library/workload/log_writer/log_writer.h b/ydb/library/workload/log_writer/log_writer.h index bd057141130d..dc18e955a215 100644 --- a/ydb/library/workload/log_writer/log_writer.h +++ b/ydb/library/workload/log_writer/log_writer.h @@ -106,6 +106,6 @@ class TLogWriterWorkloadGenerator final: public TWorkloadQueryGeneratorBase Registrar("log_writer"); -} // namespace NYdbWorkload +} // namespace NLogWriter -} // namespace NLogWriter \ No newline at end of file +} // namespace NYdbWorkload From bf835d3fce364034204ff7bd262d86bbb616173a Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Tue, 14 Jan 2025 12:39:51 +0300 Subject: [PATCH 11/20] Revert "fix" This reverts commit d89ef87baffc7a727f82d0df432ae487a090af2d. --- ydb/tests/workloads/kv/tests/ya.make | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ydb/tests/workloads/kv/tests/ya.make b/ydb/tests/workloads/kv/tests/ya.make index 7dd67a33aae0..d99f532e7132 100644 --- a/ydb/tests/workloads/kv/tests/ya.make +++ b/ydb/tests/workloads/kv/tests/ya.make @@ -1,3 +1,4 @@ +IF (NOT SANITIZER_TYPE AND NOT WITH_VALGRIND) PY3TEST() ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd") ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb") @@ -9,12 +10,10 @@ TEST_SRCS( ) IF (SANITIZER_TYPE) - SIZE(MEDIUM) REQUIREMENTS(ram:32) -ELSE() - SIZE(MEDIUM) ENDIF() +SIZE(MEDIUM) DEPENDS( ydb/apps/ydbd @@ -27,3 +26,5 @@ PEERDIR( END() + +ENDIF() From 27e6e0201c2c6433e5d5b2b9849033eb96107385 Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Tue, 14 Jan 2025 13:09:17 +0300 Subject: [PATCH 12/20] fix --- ydb/library/workload/ya.make | 2 +- .../lib/ydb_cli/commands/ydb_workload.cpp | 23 +++++++++++-------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/ydb/library/workload/ya.make b/ydb/library/workload/ya.make index 95ffb7bdc865..4a185abab8e3 100644 --- a/ydb/library/workload/ya.make +++ b/ydb/library/workload/ya.make @@ -16,8 +16,8 @@ RECURSE( abstract benchmark_base clickbench - dev_log kv + log_writer stock tpc_base tpcds diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp index 7b711ff5440b..7717c9c6b2f9 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp @@ -123,12 +123,13 @@ void TWorkloadCommand::PrepareForRun(TConfig& config) { driverConfig.UseSecureConnection(config.CaCerts); } Driver = std::make_unique(NYdb::TDriver(driverConfig)); + auto tableClientSettings = NTable::TClientSettings() + .SessionPoolSettings( + NTable::TSessionPoolSettings() + .MaxActiveSessions(10+Threads)); + TableClient = std::make_unique(*Driver, tableClientSettings); if (QueryExecuterType == "data") { - auto tableClientSettings = NTable::TClientSettings() - .SessionPoolSettings( - NTable::TSessionPoolSettings() - .MaxActiveSessions(10+Threads)); - TableClient = std::make_unique(*Driver, tableClientSettings); + // nothing to do } else if (QueryExecuterType == "generic") { auto queryClientSettings = NQuery::TClientSettings() .SessionPoolSettings( @@ -153,7 +154,7 @@ void TWorkloadCommand::WorkerFn(int taskId, NYdbWorkload::IWorkloadQueryGenerato auto runTableClient = [this, &queryInfo, &dataQuerySettings, &retryCount] (NYdb::NTable::TSession session) -> NYdb::TStatus { if (!TableClient) { - Y_FAIL_S("Only data query executer type supported."); + Y_FAIL_S("TableClient is not inited ."); } ++retryCount; if (queryInfo.AlterTable) { @@ -168,6 +169,9 @@ void TWorkloadCommand::WorkerFn(int taskId, NYdbWorkload::IWorkloadQueryGenerato queryInfo.ReadRowsResultCallback.value()(result); } return result; + } else if (queryInfo.TableOperation) { + auto result = queryInfo.TableOperation(*TableClient); + return result; } else { auto mode = queryInfo.UseStaleRO ? NYdb::NTable::TTxSettings::StaleRO() : NYdb::NTable::TTxSettings::SerializableRW(); auto result = session.ExecuteDataQuery(queryInfo.Query.c_str(), @@ -183,7 +187,7 @@ void TWorkloadCommand::WorkerFn(int taskId, NYdbWorkload::IWorkloadQueryGenerato auto runQueryClient = [this, &queryInfo, &genericQuerySettings, &retryCount] (NYdb::NQuery::TSession session) -> NYdb::NQuery::TAsyncExecuteQueryResult { if (!QueryClient) { - Y_FAIL_S("Only generic query executer type supported."); + Y_FAIL_S("QueryClient is not inited."); } ++retryCount; if (queryInfo.AlterTable) { @@ -200,8 +204,7 @@ void TWorkloadCommand::WorkerFn(int taskId, NYdbWorkload::IWorkloadQueryGenerato }; auto runQuery = [this, &runQueryClient, &runTableClient, &queryInfo]() -> NYdb::TStatus { - Y_ENSURE_BT(TableClient || QueryClient); - if (TableClient) { + if (QueryExecuterType == "data") { return TableClient->RetryOperationSync(runTableClient); } else { auto result = QueryClient->RetryQuery(runQueryClient).GetValueSync(); @@ -233,7 +236,7 @@ void TWorkloadCommand::WorkerFn(int taskId, NYdbWorkload::IWorkloadQueryGenerato } queryInfo = *it; - auto status = runQuery(); + auto status = queryInfo.TableOperation ? TableClient->RetryOperationSync(runTableClient) : runQuery(); if (status.IsSuccess()) { TotalQueries++; } else { From 560fea288cc388597c285080788f227aa42c99d8 Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Tue, 14 Jan 2025 15:14:07 +0300 Subject: [PATCH 13/20] fixes --- .../workload/log_writer/log_writer.cpp | 110 +++++------------- ydb/library/workload/log_writer/log_writer.h | 15 ++- 2 files changed, 36 insertions(+), 89 deletions(-) diff --git a/ydb/library/workload/log_writer/log_writer.cpp b/ydb/library/workload/log_writer/log_writer.cpp index c2eac1c0662c..56582eb341f1 100644 --- a/ydb/library/workload/log_writer/log_writer.cpp +++ b/ydb/library/workload/log_writer/log_writer.cpp @@ -20,76 +20,12 @@ namespace NLogWriter { using TRow = TLogWriterWorkloadGenerator::TRow; -void Fail() { - // Note: sleep helps to detect more fails - std::this_thread::sleep_for(std::chrono::seconds(3)); - Y_ABORT(); -} - -void AddResultSet(const NYdb::TResultSet& resultSet, TVector& rows) { - NYdb::TResultSetParser parser(resultSet); - while (parser.TryNextRow()) { - TRow row; - - for (size_t col = 0; col < parser.ColumnsCount(); col++) { - auto& valueParser = parser.ColumnParser(col); - bool optional = valueParser.GetKind() == NYdb::TTypeParser::ETypeKind::Optional; - if (optional) { - valueParser.OpenOptional(); - } - if (valueParser.GetPrimitiveType() == NYdb::EPrimitiveType::Uint64) { - row.Ints.push_back(valueParser.GetUint64()); - } else { - row.Strings.push_back(valueParser.GetString()); - } - if (optional) { - valueParser.CloseOptional(); - } - } - - rows.push_back(std::move(row)); - } -} - -void VerifyRows(const TRow& checkRow, const TVector& readRows, TString message) { - if (readRows.empty()) { - Cerr << "Expected to have " << checkRow.ToString() - << " but got empty " - << message - << Endl; - - Fail(); - } - - if (readRows.size() > 1) { - Cerr << "Expected to have " << checkRow.ToString() - << " but got " << readRows.size() << " rows " - << message - << Endl; - - for (auto r : readRows) { - Cerr << r.ToString() << Endl; - } - - Fail(); - } - - if (readRows[0] != checkRow) { - Cerr << "Expected to have " << checkRow.ToString() - << " but got " << readRows[0].ToString() << " " - << message - << Endl; - - Fail(); - } else { - // Cerr << "OK " << checkRow.ToString() << " " << message << Endl; - } -} - TLogWriterWorkloadGenerator::TLogWriterWorkloadGenerator(const TLogWriterWorkloadParams* params) : TBase(params) , TotalColumnsCnt(1 + Params.IntColumnsCnt + Params.StrColumnsCnt) + , RandomDevice() + , Mt19937(RandomDevice()) { Y_ABORT_UNLESS(TotalColumnsCnt >= Params.KeyColumnsCnt); } @@ -132,14 +68,8 @@ std::string TLogWriterWorkloadGenerator::GetDDLQueries() const { ss << "TTL = Interval(\"PT7H\") ON ts, "; switch (Params.GetStoreType()) { - case TLogWriterWorkloadParams::EStoreType::Row: ss << "STORE = ROW, "; - if (Params.PartitionsByLoad) { - ss << "AUTO_PARTITIONING_BY_LOAD = ENABLED, "; - } - ss << "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = " << Max(Params.MinPartitions, Params.MaxPartitions) << ", "; - ss << "AUTO_PARTITIONING_PARTITION_SIZE_MB = " << Params.PartitionSizeMb << ", "; break; case TLogWriterWorkloadParams::EStoreType::Column: ss << "STORE = COLUMN, "; @@ -147,12 +77,19 @@ std::string TLogWriterWorkloadGenerator::GetDDLQueries() const { default: throw yexception() << "Unsupported store type: " << Params.GetStoreType(); } + if (Params.PartitionsByLoad) { + ss << "AUTO_PARTITIONING_BY_LOAD = ENABLED, "; + } + ss << "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = " << Max(Params.MinPartitions, Params.MaxPartitions) << ", "; + ss << "AUTO_PARTITIONING_PARTITION_SIZE_MB = " << Params.PartitionSizeMb << ", "; ss << "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << Params.MinPartitions << ")"; return ss.str(); } TQueryInfoList TLogWriterWorkloadGenerator::GetWorkload(int type) { switch (static_cast(type)) { + case EType::Insert: + return Insert(GenerateRandomRows()); case EType::Upsert: return Upsert(GenerateRandomRows()); case EType::BulkUpsert: @@ -165,6 +102,7 @@ TQueryInfoList TLogWriterWorkloadGenerator::GetWorkload(int type) { TVector TLogWriterWorkloadGenerator::GetSupportedWorkloadTypes() const { TVector result; + result.emplace_back(static_cast(EType::Upsert), "insert", "Insert random rows into table near current ts"); result.emplace_back(static_cast(EType::Upsert), "upsert", "Upsert random rows into table near current ts"); result.emplace_back(static_cast(EType::BulkUpsert), "bulk_upsert", "Bulk upsert random rows into table near current ts"); return result; @@ -230,10 +168,13 @@ TQueryInfoList TLogWriterWorkloadGenerator::WriteRows(TString operation, TVector } } auto params = paramsBuilder.Build(); - return TQueryInfoList(1, TQueryInfo(ss.str(), std::move(params))); } +TQueryInfoList TLogWriterWorkloadGenerator::Insert(TVector&& rows) { + return WriteRows("INSERT", std::move(rows)); +} + TQueryInfoList TLogWriterWorkloadGenerator::Upsert(TVector&& rows) { return WriteRows("UPSERT", std::move(rows)); } @@ -273,11 +214,6 @@ TQueryInfoList TLogWriterWorkloadGenerator::BulkUpsert(TVector&& rows) { TQueryInfoList TLogWriterWorkloadGenerator::GetInitialData() { TQueryInfoList res; - for (size_t i = 0; i < Params.InitRowCount; ++i) { - auto queryInfos = Upsert(GenerateRandomRows()); - res.insert(res.end(), queryInfos.begin(), queryInfos.end()); - } - return res; } @@ -288,8 +224,19 @@ TVector TLogWriterWorkloadGenerator::GetCleanPaths() const { TVector TLogWriterWorkloadGenerator::GenerateRandomRows() { TVector result(Params.RowsCnt); + std::normal_distribution normal_distribution_generator(0, static_cast(Params.TimestampStandardDeviation)); for (size_t row = 0; row < Params.RowsCnt; ++row) { result[row].Ts = TInstant::Now(); + i64 millisecondsDiff = 60*1000*normal_distribution_generator(Mt19937); + if (millisecondsDiff >= 0) // TDuration::MilliSeconds can't be negative for some reason... + { + result[row].Ts = result[row].Ts + TDuration::MilliSeconds(millisecondsDiff); + } + else + { + result[row].Ts = result[row].Ts - TDuration::MilliSeconds(-millisecondsDiff); + } + result[row].Ints.resize(Params.IntColumnsCnt); result[row].Strings.resize(Params.StrColumnsCnt); @@ -322,8 +269,6 @@ void TLogWriterWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECo }); switch (commandType) { case TWorkloadParams::ECommandType::Init: - opts.AddLongOption("init-upserts", "count of upserts need to create while table initialization") - .DefaultValue((ui64)LogWriterWorkloadConstants::INIT_ROW_COUNT).StoreResult(&InitRowCount); opts.AddLongOption("min-partitions", "Minimum partitions for tables.") .DefaultValue((ui64)LogWriterWorkloadConstants::MIN_PARTITIONS).StoreResult(&MinPartitions); opts.AddLongOption("max-partitions", "Maximum partitions for tables.") @@ -362,12 +307,15 @@ void TLogWriterWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECo opts.AddLongOption("key-cols", "Number of key columns") .DefaultValue((ui64)LogWriterWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); switch (static_cast(workloadType)) { - case TLogWriterWorkloadGenerator::EType::BulkUpsert: + case TLogWriterWorkloadGenerator::EType::Insert: case TLogWriterWorkloadGenerator::EType::Upsert: + case TLogWriterWorkloadGenerator::EType::BulkUpsert: opts.AddLongOption("len", "String len") .DefaultValue((ui64)LogWriterWorkloadConstants::STRING_LEN).StoreResult(&StringLen); opts.AddLongOption("rows", "Number of rows to upsert") .DefaultValue((ui64)LogWriterWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); + opts.AddLongOption("timestamp_deviation_minutes", "Standard deviation. For each timestamp, a random variable with a specified standard deviation in minutes is added.") + .DefaultValue((ui64)LogWriterWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION).StoreResult(&TimestampStandardDeviation); break; } break; diff --git a/ydb/library/workload/log_writer/log_writer.h b/ydb/library/workload/log_writer/log_writer.h index dc18e955a215..016c6bad64f3 100644 --- a/ydb/library/workload/log_writer/log_writer.h +++ b/ydb/library/workload/log_writer/log_writer.h @@ -14,7 +14,6 @@ enum LogWriterWorkloadConstants : ui64 { MIN_PARTITIONS = 40, MAX_PARTITIONS = 1000, PARTITION_SIZE_MB = 2000, - INIT_ROW_COUNT = 1000, STRING_LEN = 8, STR_COLUMNS_CNT = 1, INT_COLUMNS_CNT = 1, @@ -22,11 +21,7 @@ enum LogWriterWorkloadConstants : ui64 { ROWS_CNT = 1, PARTITIONS_BY_LOAD = true, - MIXED_CHANGE_PARTITIONS_SIZE = false, - MIXED_DO_READ_ROWS = false, - MIXED_DO_SELECT = true, - - STALE_RO = false, + TIMESTAMP_STANDARD_DEVIATION = 0, }; class TLogWriterWorkloadParams : public TWorkloadParams { @@ -42,17 +37,16 @@ class TLogWriterWorkloadParams : public TWorkloadParams { ui64 MinPartitions = LogWriterWorkloadConstants::MIN_PARTITIONS; ui64 MaxPartitions = LogWriterWorkloadConstants::MAX_PARTITIONS; ui64 PartitionSizeMb = LogWriterWorkloadConstants::PARTITION_SIZE_MB; - ui64 InitRowCount = LogWriterWorkloadConstants::INIT_ROW_COUNT; ui64 StringLen = LogWriterWorkloadConstants::STRING_LEN; ui64 StrColumnsCnt = LogWriterWorkloadConstants::STR_COLUMNS_CNT; ui64 IntColumnsCnt = LogWriterWorkloadConstants::INT_COLUMNS_CNT; ui64 KeyColumnsCnt = LogWriterWorkloadConstants::KEY_COLUMNS_CNT; + ui64 TimestampStandardDeviation = LogWriterWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION; ui64 RowsCnt = LogWriterWorkloadConstants::ROWS_CNT; bool PartitionsByLoad = LogWriterWorkloadConstants::PARTITIONS_BY_LOAD; std::string TableName = "log_writer_test"; - bool StaleRO = LogWriterWorkloadConstants::STALE_RO; YDB_READONLY(EStoreType, StoreType, EStoreType::Row); }; @@ -93,17 +87,22 @@ class TLogWriterWorkloadGenerator final: public TWorkloadQueryGeneratorBase GetSupportedWorkloadTypes() const override; enum class EType { + Insert, Upsert, BulkUpsert, }; private: TQueryInfoList WriteRows(TString operation, TVector&& rows); + TQueryInfoList Insert(TVector&& rows); TQueryInfoList Upsert(TVector&& rows); TQueryInfoList BulkUpsert(TVector&& rows); TVector GenerateRandomRows(); const ui64 TotalColumnsCnt; + + std::random_device RandomDevice; + std::mt19937 Mt19937; }; } // namespace NLogWriter From 5530d5b129f40e9bcf4f57d8d319f53751bf13e4 Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Tue, 14 Jan 2025 17:04:19 +0300 Subject: [PATCH 14/20] fixes --- ydb/library/workload/log_writer/log_writer.cpp | 8 +++++--- ydb/library/workload/log_writer/log_writer.h | 6 ++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/ydb/library/workload/log_writer/log_writer.cpp b/ydb/library/workload/log_writer/log_writer.cpp index 56582eb341f1..7df105ccfb5c 100644 --- a/ydb/library/workload/log_writer/log_writer.cpp +++ b/ydb/library/workload/log_writer/log_writer.cpp @@ -65,7 +65,7 @@ std::string TLogWriterWorkloadGenerator::GetDDLQueries() const { } ss << ")) WITH ("; - ss << "TTL = Interval(\"PT7H\") ON ts, "; + ss << "TTL = Interval(\"PT" << Params.TimeStampTtlMinutes << "M\") ON ts, "; switch (Params.GetStoreType()) { case TLogWriterWorkloadParams::EStoreType::Row: @@ -224,7 +224,7 @@ TVector TLogWriterWorkloadGenerator::GetCleanPaths() const { TVector TLogWriterWorkloadGenerator::GenerateRandomRows() { TVector result(Params.RowsCnt); - std::normal_distribution normal_distribution_generator(0, static_cast(Params.TimestampStandardDeviation)); + std::normal_distribution normal_distribution_generator(0, static_cast(Params.TimestampStandardDeviationMinutes)); for (size_t row = 0; row < Params.RowsCnt; ++row) { result[row].Ts = TInstant::Now(); i64 millisecondsDiff = 60*1000*normal_distribution_generator(Mt19937); @@ -287,6 +287,8 @@ void TLogWriterWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECo .DefaultValue((ui64)LogWriterWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); opts.AddLongOption("rows", "Number of rows") .DefaultValue((ui64)LogWriterWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); + opts.AddLongOption("ttl_minutes", "TTL for timestamp column") + .DefaultValue((ui64)LogWriterWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES).StoreResult(&TimeStampTtlMinutes); opts.AddLongOption("store", "Storage type." " Options: row, column\n" " row - use row-based storage engine;\n" @@ -315,7 +317,7 @@ void TLogWriterWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECo opts.AddLongOption("rows", "Number of rows to upsert") .DefaultValue((ui64)LogWriterWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); opts.AddLongOption("timestamp_deviation_minutes", "Standard deviation. For each timestamp, a random variable with a specified standard deviation in minutes is added.") - .DefaultValue((ui64)LogWriterWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION).StoreResult(&TimestampStandardDeviation); + .DefaultValue((ui64)LogWriterWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES).StoreResult(&TimestampStandardDeviationMinutes); break; } break; diff --git a/ydb/library/workload/log_writer/log_writer.h b/ydb/library/workload/log_writer/log_writer.h index 016c6bad64f3..69264ae68649 100644 --- a/ydb/library/workload/log_writer/log_writer.h +++ b/ydb/library/workload/log_writer/log_writer.h @@ -21,7 +21,8 @@ enum LogWriterWorkloadConstants : ui64 { ROWS_CNT = 1, PARTITIONS_BY_LOAD = true, - TIMESTAMP_STANDARD_DEVIATION = 0, + TIMESTAMP_STANDARD_DEVIATION_MINUTES = 0, + TIMESTAMP_TTL_MIN = 60, }; class TLogWriterWorkloadParams : public TWorkloadParams { @@ -41,7 +42,8 @@ class TLogWriterWorkloadParams : public TWorkloadParams { ui64 StrColumnsCnt = LogWriterWorkloadConstants::STR_COLUMNS_CNT; ui64 IntColumnsCnt = LogWriterWorkloadConstants::INT_COLUMNS_CNT; ui64 KeyColumnsCnt = LogWriterWorkloadConstants::KEY_COLUMNS_CNT; - ui64 TimestampStandardDeviation = LogWriterWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION; + ui64 TimestampStandardDeviationMinutes = LogWriterWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES; + ui64 TimeStampTtlMinutes = LogWriterWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES; ui64 RowsCnt = LogWriterWorkloadConstants::ROWS_CNT; bool PartitionsByLoad = LogWriterWorkloadConstants::PARTITIONS_BY_LOAD; From 484c124b4445d8eb9e37860b79d5309ae5a2fc5d Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Tue, 14 Jan 2025 17:28:30 +0300 Subject: [PATCH 15/20] fixed --- ydb/library/workload/log_writer/log_writer.cpp | 12 +++++------- ydb/library/workload/log_writer/log_writer.h | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/ydb/library/workload/log_writer/log_writer.cpp b/ydb/library/workload/log_writer/log_writer.cpp index 7df105ccfb5c..2aae2f363814 100644 --- a/ydb/library/workload/log_writer/log_writer.cpp +++ b/ydb/library/workload/log_writer/log_writer.cpp @@ -65,7 +65,7 @@ std::string TLogWriterWorkloadGenerator::GetDDLQueries() const { } ss << ")) WITH ("; - ss << "TTL = Interval(\"PT" << Params.TimeStampTtlMinutes << "M\") ON ts, "; + ss << "TTL = Interval(\"PT" << Params.TimestampTtlMinutes << "M\") ON ts, "; switch (Params.GetStoreType()) { case TLogWriterWorkloadParams::EStoreType::Row: @@ -281,14 +281,12 @@ void TLogWriterWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECo .DefaultValue((ui64)LogWriterWorkloadConstants::STRING_LEN).StoreResult(&StringLen); opts.AddLongOption("int-cols", "Number of int columns") .DefaultValue((ui64)LogWriterWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); - opts.AddLongOption("str-cols", "Number of columns") + opts.AddLongOption("str-cols", "Number of string columns") .DefaultValue((ui64)LogWriterWorkloadConstants::STR_COLUMNS_CNT).StoreResult(&StrColumnsCnt); opts.AddLongOption("key-cols", "Number of key columns") .DefaultValue((ui64)LogWriterWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); - opts.AddLongOption("rows", "Number of rows") - .DefaultValue((ui64)LogWriterWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); opts.AddLongOption("ttl_minutes", "TTL for timestamp column") - .DefaultValue((ui64)LogWriterWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES).StoreResult(&TimeStampTtlMinutes); + .DefaultValue((ui64)LogWriterWorkloadConstants::TIMESTAMP_TTL_MIN).StoreResult(&TimestampTtlMinutes); opts.AddLongOption("store", "Storage type." " Options: row, column\n" " row - use row-based storage engine;\n" @@ -302,10 +300,10 @@ void TLogWriterWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECo }); break; case TWorkloadParams::ECommandType::Run: - opts.AddLongOption("str-cols", "Number of int columns") - .DefaultValue((ui64)LogWriterWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&StrColumnsCnt); opts.AddLongOption("int-cols", "Number of int columns") .DefaultValue((ui64)LogWriterWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); + opts.AddLongOption("str-cols", "Number of string columns") + .DefaultValue((ui64)LogWriterWorkloadConstants::STR_COLUMNS_CNT).StoreResult(&StrColumnsCnt); opts.AddLongOption("key-cols", "Number of key columns") .DefaultValue((ui64)LogWriterWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); switch (static_cast(workloadType)) { diff --git a/ydb/library/workload/log_writer/log_writer.h b/ydb/library/workload/log_writer/log_writer.h index 69264ae68649..582cd0e623a4 100644 --- a/ydb/library/workload/log_writer/log_writer.h +++ b/ydb/library/workload/log_writer/log_writer.h @@ -43,7 +43,7 @@ class TLogWriterWorkloadParams : public TWorkloadParams { ui64 IntColumnsCnt = LogWriterWorkloadConstants::INT_COLUMNS_CNT; ui64 KeyColumnsCnt = LogWriterWorkloadConstants::KEY_COLUMNS_CNT; ui64 TimestampStandardDeviationMinutes = LogWriterWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES; - ui64 TimeStampTtlMinutes = LogWriterWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES; + ui64 TimestampTtlMinutes = LogWriterWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES; ui64 RowsCnt = LogWriterWorkloadConstants::ROWS_CNT; bool PartitionsByLoad = LogWriterWorkloadConstants::PARTITIONS_BY_LOAD; From 19bbe057e913d8e629bfa948ebe0a34447435939 Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Tue, 14 Jan 2025 17:55:03 +0300 Subject: [PATCH 16/20] fixes --- ydb/library/workload/log_writer/log_writer.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/ydb/library/workload/log_writer/log_writer.cpp b/ydb/library/workload/log_writer/log_writer.cpp index 2aae2f363814..3226e90a70e7 100644 --- a/ydb/library/workload/log_writer/log_writer.cpp +++ b/ydb/library/workload/log_writer/log_writer.cpp @@ -3,8 +3,6 @@ #include #include -#include - #include #include #include From 0132fe61ef5fad9fc5071b8044020016dd2712a4 Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Tue, 14 Jan 2025 20:23:34 +0300 Subject: [PATCH 17/20] fixes --- .../log_writer.cpp => log/log.cpp} | 84 +++++++++---------- .../{log_writer/log_writer.h => log/log.h} | 36 ++++---- ydb/library/workload/log/registrar.cpp | 12 +++ .../workload/{log_writer => log}/ya.make | 4 +- ydb/library/workload/log_writer/registrar.cpp | 12 --- ydb/library/workload/ya.make | 4 +- .../tests/test_workload.py | 6 +- .../{log_writer => log}/tests/ya.make | 0 .../workloads/{log_writer => log}/ya.make | 0 ydb/tests/workloads/ya.make | 2 +- 10 files changed, 80 insertions(+), 80 deletions(-) rename ydb/library/workload/{log_writer/log_writer.cpp => log/log.cpp} (73%) rename ydb/library/workload/{log_writer/log_writer.h => log/log.h} (64%) create mode 100644 ydb/library/workload/log/registrar.cpp rename ydb/library/workload/{log_writer => log}/ya.make (57%) delete mode 100644 ydb/library/workload/log_writer/registrar.cpp rename ydb/tests/workloads/{log_writer => log}/tests/test_workload.py (92%) rename ydb/tests/workloads/{log_writer => log}/tests/ya.make (100%) rename ydb/tests/workloads/{log_writer => log}/ya.make (100%) diff --git a/ydb/library/workload/log_writer/log_writer.cpp b/ydb/library/workload/log/log.cpp similarity index 73% rename from ydb/library/workload/log_writer/log_writer.cpp rename to ydb/library/workload/log/log.cpp index 3226e90a70e7..98bd9f02996b 100644 --- a/ydb/library/workload/log_writer/log_writer.cpp +++ b/ydb/library/workload/log/log.cpp @@ -1,4 +1,4 @@ -#include "log_writer.h" +#include "log.h" #include #include #include @@ -14,12 +14,12 @@ namespace NYdbWorkload { -namespace NLogWriter { +namespace NLog { -using TRow = TLogWriterWorkloadGenerator::TRow; +using TRow = TLogGenerator::TRow; -TLogWriterWorkloadGenerator::TLogWriterWorkloadGenerator(const TLogWriterWorkloadParams* params) +TLogGenerator::TLogGenerator(const TLogWorkloadParams* params) : TBase(params) , TotalColumnsCnt(1 + Params.IntColumnsCnt + Params.StrColumnsCnt) , RandomDevice() @@ -28,7 +28,7 @@ TLogWriterWorkloadGenerator::TLogWriterWorkloadGenerator(const TLogWriterWorkloa Y_ABORT_UNLESS(TotalColumnsCnt >= Params.KeyColumnsCnt); } -std::string TLogWriterWorkloadGenerator::GetDDLQueries() const { +std::string TLogGenerator::GetDDLQueries() const { std::stringstream ss; ss << "--!syntax_v1\n"; @@ -47,7 +47,7 @@ std::string TLogWriterWorkloadGenerator::GetDDLQueries() const { { ss << "c" << i << " String"; } - if (i < Params.KeyColumnsCnt && Params.GetStoreType() == TLogWriterWorkloadParams::EStoreType::Column) { + if (i < Params.KeyColumnsCnt && Params.GetStoreType() == TLogWorkloadParams::EStoreType::Column) { ss << " NOT NULL"; } ss << ", "; @@ -66,10 +66,10 @@ std::string TLogWriterWorkloadGenerator::GetDDLQueries() const { ss << "TTL = Interval(\"PT" << Params.TimestampTtlMinutes << "M\") ON ts, "; switch (Params.GetStoreType()) { - case TLogWriterWorkloadParams::EStoreType::Row: + case TLogWorkloadParams::EStoreType::Row: ss << "STORE = ROW, "; break; - case TLogWriterWorkloadParams::EStoreType::Column: + case TLogWorkloadParams::EStoreType::Column: ss << "STORE = COLUMN, "; break; default: @@ -84,7 +84,7 @@ std::string TLogWriterWorkloadGenerator::GetDDLQueries() const { return ss.str(); } -TQueryInfoList TLogWriterWorkloadGenerator::GetWorkload(int type) { +TQueryInfoList TLogGenerator::GetWorkload(int type) { switch (static_cast(type)) { case EType::Insert: return Insert(GenerateRandomRows()); @@ -98,7 +98,7 @@ TQueryInfoList TLogWriterWorkloadGenerator::GetWorkload(int type) { } -TVector TLogWriterWorkloadGenerator::GetSupportedWorkloadTypes() const { +TVector TLogGenerator::GetSupportedWorkloadTypes() const { TVector result; result.emplace_back(static_cast(EType::Upsert), "insert", "Insert random rows into table near current ts"); result.emplace_back(static_cast(EType::Upsert), "upsert", "Upsert random rows into table near current ts"); @@ -106,7 +106,7 @@ TVector TLogWriterWorkloadGenerator::Get return result; } -TQueryInfoList TLogWriterWorkloadGenerator::WriteRows(TString operation, TVector&& rows) { +TQueryInfoList TLogGenerator::WriteRows(TString operation, TVector&& rows) { std::stringstream ss; NYdb::TParamsBuilder paramsBuilder; @@ -169,15 +169,15 @@ TQueryInfoList TLogWriterWorkloadGenerator::WriteRows(TString operation, TVector return TQueryInfoList(1, TQueryInfo(ss.str(), std::move(params))); } -TQueryInfoList TLogWriterWorkloadGenerator::Insert(TVector&& rows) { +TQueryInfoList TLogGenerator::Insert(TVector&& rows) { return WriteRows("INSERT", std::move(rows)); } -TQueryInfoList TLogWriterWorkloadGenerator::Upsert(TVector&& rows) { +TQueryInfoList TLogGenerator::Upsert(TVector&& rows) { return WriteRows("UPSERT", std::move(rows)); } -TQueryInfoList TLogWriterWorkloadGenerator::BulkUpsert(TVector&& rows) { +TQueryInfoList TLogGenerator::BulkUpsert(TVector&& rows) { NYdb::TValueBuilder valueBuilder; valueBuilder.BeginList(); for (const TRow& row : rows) { @@ -210,16 +210,16 @@ TQueryInfoList TLogWriterWorkloadGenerator::BulkUpsert(TVector&& rows) { } -TQueryInfoList TLogWriterWorkloadGenerator::GetInitialData() { +TQueryInfoList TLogGenerator::GetInitialData() { TQueryInfoList res; return res; } -TVector TLogWriterWorkloadGenerator::GetCleanPaths() const { +TVector TLogGenerator::GetCleanPaths() const { return { Params.TableName }; } -TVector TLogWriterWorkloadGenerator::GenerateRandomRows() { +TVector TLogGenerator::GenerateRandomRows() { TVector result(Params.RowsCnt); std::normal_distribution normal_distribution_generator(0, static_cast(Params.TimestampStandardDeviationMinutes)); @@ -256,7 +256,7 @@ TVector TLogWriterWorkloadGenerator::GenerateRandomRows() { return result; } -void TLogWriterWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) { +void TLogWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) { opts.AddLongOption('p', "path", "Path where benchmark tables are located") .Optional() .DefaultValue(TableName) @@ -268,23 +268,23 @@ void TLogWriterWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECo switch (commandType) { case TWorkloadParams::ECommandType::Init: opts.AddLongOption("min-partitions", "Minimum partitions for tables.") - .DefaultValue((ui64)LogWriterWorkloadConstants::MIN_PARTITIONS).StoreResult(&MinPartitions); + .DefaultValue((ui64)LogWorkloadConstants::MIN_PARTITIONS).StoreResult(&MinPartitions); opts.AddLongOption("max-partitions", "Maximum partitions for tables.") - .DefaultValue((ui64)LogWriterWorkloadConstants::MAX_PARTITIONS).StoreResult(&MaxPartitions); + .DefaultValue((ui64)LogWorkloadConstants::MAX_PARTITIONS).StoreResult(&MaxPartitions); opts.AddLongOption("partition-size", "Maximum partition size in megabytes (AUTO_PARTITIONING_PARTITION_SIZE_MB).") - .DefaultValue((ui64)LogWriterWorkloadConstants::PARTITION_SIZE_MB).StoreResult(&PartitionSizeMb); + .DefaultValue((ui64)LogWorkloadConstants::PARTITION_SIZE_MB).StoreResult(&PartitionSizeMb); opts.AddLongOption("auto-partition", "Enable auto partitioning by load.") - .DefaultValue((ui64)LogWriterWorkloadConstants::PARTITIONS_BY_LOAD).StoreResult(&PartitionsByLoad); + .DefaultValue((ui64)LogWorkloadConstants::PARTITIONS_BY_LOAD).StoreResult(&PartitionsByLoad); opts.AddLongOption("len", "String len") - .DefaultValue((ui64)LogWriterWorkloadConstants::STRING_LEN).StoreResult(&StringLen); + .DefaultValue((ui64)LogWorkloadConstants::STRING_LEN).StoreResult(&StringLen); opts.AddLongOption("int-cols", "Number of int columns") - .DefaultValue((ui64)LogWriterWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); + .DefaultValue((ui64)LogWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); opts.AddLongOption("str-cols", "Number of string columns") - .DefaultValue((ui64)LogWriterWorkloadConstants::STR_COLUMNS_CNT).StoreResult(&StrColumnsCnt); + .DefaultValue((ui64)LogWorkloadConstants::STR_COLUMNS_CNT).StoreResult(&StrColumnsCnt); opts.AddLongOption("key-cols", "Number of key columns") - .DefaultValue((ui64)LogWriterWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); + .DefaultValue((ui64)LogWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); opts.AddLongOption("ttl_minutes", "TTL for timestamp column") - .DefaultValue((ui64)LogWriterWorkloadConstants::TIMESTAMP_TTL_MIN).StoreResult(&TimestampTtlMinutes); + .DefaultValue((ui64)LogWorkloadConstants::TIMESTAMP_TTL_MIN).StoreResult(&TimestampTtlMinutes); opts.AddLongOption("store", "Storage type." " Options: row, column\n" " row - use row-based storage engine;\n" @@ -299,21 +299,21 @@ void TLogWriterWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECo break; case TWorkloadParams::ECommandType::Run: opts.AddLongOption("int-cols", "Number of int columns") - .DefaultValue((ui64)LogWriterWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); + .DefaultValue((ui64)LogWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); opts.AddLongOption("str-cols", "Number of string columns") - .DefaultValue((ui64)LogWriterWorkloadConstants::STR_COLUMNS_CNT).StoreResult(&StrColumnsCnt); + .DefaultValue((ui64)LogWorkloadConstants::STR_COLUMNS_CNT).StoreResult(&StrColumnsCnt); opts.AddLongOption("key-cols", "Number of key columns") - .DefaultValue((ui64)LogWriterWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); - switch (static_cast(workloadType)) { - case TLogWriterWorkloadGenerator::EType::Insert: - case TLogWriterWorkloadGenerator::EType::Upsert: - case TLogWriterWorkloadGenerator::EType::BulkUpsert: + .DefaultValue((ui64)LogWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); + switch (static_cast(workloadType)) { + case TLogGenerator::EType::Insert: + case TLogGenerator::EType::Upsert: + case TLogGenerator::EType::BulkUpsert: opts.AddLongOption("len", "String len") - .DefaultValue((ui64)LogWriterWorkloadConstants::STRING_LEN).StoreResult(&StringLen); + .DefaultValue((ui64)LogWorkloadConstants::STRING_LEN).StoreResult(&StringLen); opts.AddLongOption("rows", "Number of rows to upsert") - .DefaultValue((ui64)LogWriterWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); + .DefaultValue((ui64)LogWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); opts.AddLongOption("timestamp_deviation_minutes", "Standard deviation. For each timestamp, a random variable with a specified standard deviation in minutes is added.") - .DefaultValue((ui64)LogWriterWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES).StoreResult(&TimestampStandardDeviationMinutes); + .DefaultValue((ui64)LogWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES).StoreResult(&TimestampStandardDeviationMinutes); break; } break; @@ -322,14 +322,14 @@ void TLogWriterWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECo } } -THolder TLogWriterWorkloadParams::CreateGenerator() const { - return MakeHolder(this); +THolder TLogWorkloadParams::CreateGenerator() const { + return MakeHolder(this); } -TString TLogWriterWorkloadParams::GetWorkloadName() const { - return "Log Writer"; +TString TLogWorkloadParams::GetWorkloadName() const { + return "Log"; } -} // namespace NLogWriter +} // namespace NLog } // namespace NYdbWorkload diff --git a/ydb/library/workload/log_writer/log_writer.h b/ydb/library/workload/log/log.h similarity index 64% rename from ydb/library/workload/log_writer/log_writer.h rename to ydb/library/workload/log/log.h index 582cd0e623a4..04851a2cc797 100644 --- a/ydb/library/workload/log_writer/log_writer.h +++ b/ydb/library/workload/log/log.h @@ -8,9 +8,9 @@ namespace NYdbWorkload { -namespace NLogWriter { +namespace NLog { -enum LogWriterWorkloadConstants : ui64 { +enum LogWorkloadConstants : ui64 { MIN_PARTITIONS = 40, MAX_PARTITIONS = 1000, PARTITION_SIZE_MB = 2000, @@ -25,7 +25,7 @@ enum LogWriterWorkloadConstants : ui64 { TIMESTAMP_TTL_MIN = 60, }; -class TLogWriterWorkloadParams : public TWorkloadParams { +class TLogWorkloadParams : public TWorkloadParams { public: enum class EStoreType { Row /* "row" */, @@ -35,26 +35,26 @@ class TLogWriterWorkloadParams : public TWorkloadParams { void ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) override; THolder CreateGenerator() const override; TString GetWorkloadName() const override; - ui64 MinPartitions = LogWriterWorkloadConstants::MIN_PARTITIONS; - ui64 MaxPartitions = LogWriterWorkloadConstants::MAX_PARTITIONS; - ui64 PartitionSizeMb = LogWriterWorkloadConstants::PARTITION_SIZE_MB; - ui64 StringLen = LogWriterWorkloadConstants::STRING_LEN; - ui64 StrColumnsCnt = LogWriterWorkloadConstants::STR_COLUMNS_CNT; - ui64 IntColumnsCnt = LogWriterWorkloadConstants::INT_COLUMNS_CNT; - ui64 KeyColumnsCnt = LogWriterWorkloadConstants::KEY_COLUMNS_CNT; - ui64 TimestampStandardDeviationMinutes = LogWriterWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES; - ui64 TimestampTtlMinutes = LogWriterWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES; - ui64 RowsCnt = LogWriterWorkloadConstants::ROWS_CNT; - bool PartitionsByLoad = LogWriterWorkloadConstants::PARTITIONS_BY_LOAD; + ui64 MinPartitions = LogWorkloadConstants::MIN_PARTITIONS; + ui64 MaxPartitions = LogWorkloadConstants::MAX_PARTITIONS; + ui64 PartitionSizeMb = LogWorkloadConstants::PARTITION_SIZE_MB; + ui64 StringLen = LogWorkloadConstants::STRING_LEN; + ui64 StrColumnsCnt = LogWorkloadConstants::STR_COLUMNS_CNT; + ui64 IntColumnsCnt = LogWorkloadConstants::INT_COLUMNS_CNT; + ui64 KeyColumnsCnt = LogWorkloadConstants::KEY_COLUMNS_CNT; + ui64 TimestampStandardDeviationMinutes = LogWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES; + ui64 TimestampTtlMinutes = LogWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES; + ui64 RowsCnt = LogWorkloadConstants::ROWS_CNT; + bool PartitionsByLoad = LogWorkloadConstants::PARTITIONS_BY_LOAD; std::string TableName = "log_writer_test"; YDB_READONLY(EStoreType, StoreType, EStoreType::Row); }; -class TLogWriterWorkloadGenerator final: public TWorkloadQueryGeneratorBase { +class TLogGenerator final: public TWorkloadQueryGeneratorBase { public: - using TBase = TWorkloadQueryGeneratorBase; + using TBase = TWorkloadQueryGeneratorBase; struct TRow { TInstant Ts; TVector Ints; @@ -77,7 +77,7 @@ class TLogWriterWorkloadGenerator final: public TWorkloadQueryGeneratorBase + +namespace NYdbWorkload { + +namespace NLog { + +TWorkloadFactory::TRegistrator Registrar("log"); + +} // namespace NLog + +} // namespace NYdbWorkload diff --git a/ydb/library/workload/log_writer/ya.make b/ydb/library/workload/log/ya.make similarity index 57% rename from ydb/library/workload/log_writer/ya.make rename to ydb/library/workload/log/ya.make index 9f8eb4cb569e..fa1dadfca1d1 100644 --- a/ydb/library/workload/log_writer/ya.make +++ b/ydb/library/workload/log/ya.make @@ -2,13 +2,13 @@ LIBRARY() SRCS( GLOBAL registrar.cpp - log_writer.cpp + log.cpp ) PEERDIR( ydb/library/workload/abstract ) -GENERATE_ENUM_SERIALIZATION_WITH_HEADER(log_writer.h) +GENERATE_ENUM_SERIALIZATION_WITH_HEADER(log.h) END() diff --git a/ydb/library/workload/log_writer/registrar.cpp b/ydb/library/workload/log_writer/registrar.cpp deleted file mode 100644 index 561701ad43c0..000000000000 --- a/ydb/library/workload/log_writer/registrar.cpp +++ /dev/null @@ -1,12 +0,0 @@ -#include "log_writer.h" -#include - -namespace NYdbWorkload { - -namespace NLogWriter { - -TWorkloadFactory::TRegistrator Registrar("log_writer"); - -} // namespace NLogWriter - -} // namespace NYdbWorkload diff --git a/ydb/library/workload/ya.make b/ydb/library/workload/ya.make index 4a185abab8e3..7819c15847a4 100644 --- a/ydb/library/workload/ya.make +++ b/ydb/library/workload/ya.make @@ -4,7 +4,7 @@ PEERDIR( ydb/library/workload/abstract ydb/library/workload/clickbench ydb/library/workload/kv - ydb/library/workload/log_writer + ydb/library/workload/log ydb/library/workload/stock ydb/library/workload/tpcds ydb/library/workload/tpch @@ -17,7 +17,7 @@ RECURSE( benchmark_base clickbench kv - log_writer + log stock tpc_base tpcds diff --git a/ydb/tests/workloads/log_writer/tests/test_workload.py b/ydb/tests/workloads/log/tests/test_workload.py similarity index 92% rename from ydb/tests/workloads/log_writer/tests/test_workload.py rename to ydb/tests/workloads/log/tests/test_workload.py index 6bd0e09fc56e..098cdfc70ae1 100644 --- a/ydb/tests/workloads/log_writer/tests/test_workload.py +++ b/ydb/tests/workloads/log/tests/test_workload.py @@ -10,7 +10,7 @@ from ydb.tests.library.common.types import Erasure -class TestYdbKvWorkload(object): +class TestYdbLogWorkload(object): @classmethod def setup_class(cls): cls.cluster = KiKiMR(KikimrConfigGenerator(erasure=Erasure.MIRROR_3_DC)) @@ -20,7 +20,7 @@ def setup_class(cls): "--verbose", "--endpoint", "grpc://localhost:%d" % cls.cluster.nodes[1].grpc_port, "--database=/Root", - "workload", "log_writer", "init", + "workload", "log", "init", "--min-partitions", "100", "--partition-size", "10", "--auto-partition", "0", @@ -33,7 +33,7 @@ def setup_class(cls): "--verbose", "--endpoint", "grpc://localhost:%d" % cls.cluster.nodes[1].grpc_port, "--database=/Root", - "workload", "log_writer", "run", "bulk_upsert", + "workload", "log", "run", "bulk_upsert", "--seconds", "10", "--threads", "10", "--len", "200", diff --git a/ydb/tests/workloads/log_writer/tests/ya.make b/ydb/tests/workloads/log/tests/ya.make similarity index 100% rename from ydb/tests/workloads/log_writer/tests/ya.make rename to ydb/tests/workloads/log/tests/ya.make diff --git a/ydb/tests/workloads/log_writer/ya.make b/ydb/tests/workloads/log/ya.make similarity index 100% rename from ydb/tests/workloads/log_writer/ya.make rename to ydb/tests/workloads/log/ya.make diff --git a/ydb/tests/workloads/ya.make b/ydb/tests/workloads/ya.make index a2783576d52d..d4628eab406f 100644 --- a/ydb/tests/workloads/ya.make +++ b/ydb/tests/workloads/ya.make @@ -1,6 +1,6 @@ RECURSE( kv - log_writer + log olap_workload simple_queue statistics_workload From 72185dfcbca648acbcfb735acdbcdf58743c0f58 Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Tue, 14 Jan 2025 20:28:25 +0300 Subject: [PATCH 18/20] code style --- ydb/library/workload/log/log.cpp | 41 +++++++++++--------------------- 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/ydb/library/workload/log/log.cpp b/ydb/library/workload/log/log.cpp index 98bd9f02996b..f6e740668121 100644 --- a/ydb/library/workload/log/log.cpp +++ b/ydb/library/workload/log/log.cpp @@ -35,18 +35,14 @@ std::string TLogGenerator::GetDDLQueries() const { ss << "CREATE TABLE `" << Params.DbPath << "/" << Params.TableName << "`("; for (size_t i = 0; i < TotalColumnsCnt; ++i) { - if (i == 0) - { + if (i == 0) { ss << "ts Timestamp"; - - } - else if (i < Params.IntColumnsCnt + 1) { + } else if (i < Params.IntColumnsCnt + 1) { ss << "c" << i << " Uint64"; - } - else - { + } else { ss << "c" << i << " String"; } + if (i < Params.KeyColumnsCnt && Params.GetStoreType() == TLogWorkloadParams::EStoreType::Column) { ss << " NOT NULL"; } @@ -116,12 +112,10 @@ TQueryInfoList TLogGenerator::WriteRows(TString operation, TVector&& rows) for (size_t row = 0; row < Params.RowsCnt; ++row) { for (size_t col = 0; col < TotalColumnsCnt; ++col) { TString cname = "$c" + std::to_string(row) + "_" + std::to_string(col); - if (col == 0) - { + if (col == 0) { ss << "DECLARE " << cname << " AS Timestamp;\n"; paramsBuilder.AddParam(cname).Timestamp(rows[row].Ts).Build(); - } - else if (col < Params.IntColumnsCnt + 1) { + } else if (col < Params.IntColumnsCnt + 1) { ss << "DECLARE " << cname << " AS Uint64;\n"; paramsBuilder.AddParam(cname).Uint64(rows[row].Ints[col - 1]).Build(); } else { @@ -134,14 +128,12 @@ TQueryInfoList TLogGenerator::WriteRows(TString operation, TVector&& rows) ss << operation << " INTO `" << Params.TableName << "` ("; for (size_t col = 0; col < TotalColumnsCnt; ++col) { - if (col != 0) - { + if (col != 0) { ss << "c" << col; - } - else - { + } else { ss << "ts"; } + if (col + 1 < TotalColumnsCnt) { ss << ", "; } @@ -184,11 +176,9 @@ TQueryInfoList TLogGenerator::BulkUpsert(TVector&& rows) { auto &listItem = valueBuilder.AddListItem(); listItem.BeginStruct(); for (size_t col = 0; col < TotalColumnsCnt; ++col) { - if (col == 0) - { + if (col == 0) { listItem.AddMember("ts").Timestamp(row.Ts); - } - else if (col < Params.IntColumnsCnt + 1) { + } else if (col < Params.IntColumnsCnt + 1) { listItem.AddMember(std::format("c{}", col)).Uint64(row.Ints[col-1]); } else { listItem.AddMember(std::format("c{}", col)).String(row.Strings[col - Params.IntColumnsCnt - 1]); @@ -226,12 +216,9 @@ TVector TLogGenerator::GenerateRandomRows() { for (size_t row = 0; row < Params.RowsCnt; ++row) { result[row].Ts = TInstant::Now(); i64 millisecondsDiff = 60*1000*normal_distribution_generator(Mt19937); - if (millisecondsDiff >= 0) // TDuration::MilliSeconds can't be negative for some reason... - { + if (millisecondsDiff >= 0) { // TDuration::MilliSeconds can't be negative for some reason... result[row].Ts = result[row].Ts + TDuration::MilliSeconds(millisecondsDiff); - } - else - { + } else { result[row].Ts = result[row].Ts - TDuration::MilliSeconds(-millisecondsDiff); } @@ -283,7 +270,7 @@ void TLogWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandT .DefaultValue((ui64)LogWorkloadConstants::STR_COLUMNS_CNT).StoreResult(&StrColumnsCnt); opts.AddLongOption("key-cols", "Number of key columns") .DefaultValue((ui64)LogWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); - opts.AddLongOption("ttl_minutes", "TTL for timestamp column") + opts.AddLongOption("ttl", "TTL for timestamp column in minutes") .DefaultValue((ui64)LogWorkloadConstants::TIMESTAMP_TTL_MIN).StoreResult(&TimestampTtlMinutes); opts.AddLongOption("store", "Storage type." " Options: row, column\n" From 8d151667685cd6b3f5d91fd01cd4f60bdbc447dd Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Tue, 14 Jan 2025 20:30:42 +0300 Subject: [PATCH 19/20] timestamp_deviation_minutes - _minutes --- ydb/library/workload/log/log.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/library/workload/log/log.cpp b/ydb/library/workload/log/log.cpp index f6e740668121..2852a85afaa1 100644 --- a/ydb/library/workload/log/log.cpp +++ b/ydb/library/workload/log/log.cpp @@ -96,7 +96,7 @@ TQueryInfoList TLogGenerator::GetWorkload(int type) { TVector TLogGenerator::GetSupportedWorkloadTypes() const { TVector result; - result.emplace_back(static_cast(EType::Upsert), "insert", "Insert random rows into table near current ts"); + result.emplace_back(static_cast(EType::Insert), "insert", "Insert random rows into table near current ts"); result.emplace_back(static_cast(EType::Upsert), "upsert", "Upsert random rows into table near current ts"); result.emplace_back(static_cast(EType::BulkUpsert), "bulk_upsert", "Bulk upsert random rows into table near current ts"); return result; @@ -299,7 +299,7 @@ void TLogWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandT .DefaultValue((ui64)LogWorkloadConstants::STRING_LEN).StoreResult(&StringLen); opts.AddLongOption("rows", "Number of rows to upsert") .DefaultValue((ui64)LogWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); - opts.AddLongOption("timestamp_deviation_minutes", "Standard deviation. For each timestamp, a random variable with a specified standard deviation in minutes is added.") + opts.AddLongOption("timestamp_deviation", "Standard deviation. For each timestamp, a random variable with a specified standard deviation in minutes is added.") .DefaultValue((ui64)LogWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES).StoreResult(&TimestampStandardDeviationMinutes); break; } From 5a7305ae2e577b508d301cadf1eafb68ce6e47aa Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Tue, 14 Jan 2025 20:33:58 +0300 Subject: [PATCH 20/20] more review fixes --- ydb/public/lib/ydb_cli/commands/ydb_workload.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp index 7717c9c6b2f9..8cc76d67ac15 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp @@ -154,7 +154,7 @@ void TWorkloadCommand::WorkerFn(int taskId, NYdbWorkload::IWorkloadQueryGenerato auto runTableClient = [this, &queryInfo, &dataQuerySettings, &retryCount] (NYdb::NTable::TSession session) -> NYdb::TStatus { if (!TableClient) { - Y_FAIL_S("TableClient is not inited ."); + Y_FAIL_S("TableClient is not initialized."); } ++retryCount; if (queryInfo.AlterTable) { @@ -187,7 +187,7 @@ void TWorkloadCommand::WorkerFn(int taskId, NYdbWorkload::IWorkloadQueryGenerato auto runQueryClient = [this, &queryInfo, &genericQuerySettings, &retryCount] (NYdb::NQuery::TSession session) -> NYdb::NQuery::TAsyncExecuteQueryResult { if (!QueryClient) { - Y_FAIL_S("QueryClient is not inited."); + Y_FAIL_S("QueryClient is not initialized."); } ++retryCount; if (queryInfo.AlterTable) {