From 383db7facbe43e69bc5ab99e65177e99db624d47 Mon Sep 17 00:00:00 2001 From: vlad-gogov Date: Fri, 20 Dec 2024 06:10:26 +0000 Subject: [PATCH 01/13] implemented without test --- ydb/core/formats/arrow/serializer/native.cpp | 2 +- ydb/core/formats/arrow/serializer/native.h | 18 +++- ydb/core/formats/arrow/serializer/utils.cpp | 6 +- ydb/core/formats/arrow/serializer/utils.h | 2 +- ydb/core/kqp/host/kqp_gateway_proxy.cpp | 13 ++- ydb/core/kqp/ut/common/columnshard.cpp | 32 +++++-- ydb/core/kqp/ut/common/columnshard.h | 2 +- ydb/core/kqp/ut/olap/sys_view_ut.cpp | 17 ++-- ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 89 ++++++++++++++----- .../olap/column_families/schema.cpp | 35 ++++---- .../olap/column_families/update.cpp | 65 ++++++++------ .../schemeshard/olap/column_families/update.h | 2 +- .../olap/operations/create_table.cpp | 1 - 13 files changed, 195 insertions(+), 89 deletions(-) diff --git a/ydb/core/formats/arrow/serializer/native.cpp b/ydb/core/formats/arrow/serializer/native.cpp index a580ed33414f..cd51b840640e 100644 --- a/ydb/core/formats/arrow/serializer/native.cpp +++ b/ydb/core/formats/arrow/serializer/native.cpp @@ -143,7 +143,7 @@ NKikimr::TConclusionStatus TNativeSerializer::DoDeserializeFromRequest(NYql::TFe level = levelLocal; } } - auto codecPtrStatus = BuildCodec(codec.value_or(Options.codec->compression_type()), level); + auto codecPtrStatus = BuildCodec(codec.value_or(GetDefaultCompressionType()), level); if (!codecPtrStatus) { return codecPtrStatus.GetError(); } diff --git a/ydb/core/formats/arrow/serializer/native.h b/ydb/core/formats/arrow/serializer/native.h index d09241a7799f..23247dc28a31 100644 --- a/ydb/core/formats/arrow/serializer/native.h +++ b/ydb/core/formats/arrow/serializer/native.h @@ -1,7 +1,10 @@ #pragma once #include "abstract.h" +#include "parsing.h" +#include +#include #include #include @@ -50,10 +53,23 @@ class TNativeSerializer: public ISerializer { virtual TConclusionStatus DoDeserializeFromRequest(NYql::TFeaturesExtractor& features) override; + static arrow::Compression::type GetDefaultCompressionType() { + return CompressionFromProto(AppData()->ColumnShardConfig.GetDefaultCompression()).value(); + } + + static std::shared_ptr GetDefaultCodec() { + arrow::Compression::type codec = GetDefaultCompressionType(); + if (codec == arrow::Compression::type::ZSTD) { + i32 codecLevel = AppData()->ColumnShardConfig.GetDefaultCompressionLevel(); + return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec, codecLevel)); + } + return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec)); + } + static arrow::ipc::IpcOptions BuildDefaultOptions() { arrow::ipc::IpcWriteOptions options; options.use_threads = false; - options.codec = *arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME); + options.codec = GetDefaultCodec(); return options; } diff --git a/ydb/core/formats/arrow/serializer/utils.cpp b/ydb/core/formats/arrow/serializer/utils.cpp index 432086605caf..0bcd5ba28770 100644 --- a/ydb/core/formats/arrow/serializer/utils.cpp +++ b/ydb/core/formats/arrow/serializer/utils.cpp @@ -20,6 +20,10 @@ bool SupportsCompressionLevel(const arrow::Compression::type compression, const return true; } +bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression, const i32 compressionLevel) { + return SupportsCompressionLevel(CompressionFromProto(compression).value(), compressionLevel); +} + std::optional MinimumCompressionLevel(const arrow::Compression::type compression) { if (!SupportsCompressionLevel(compression)) { return {}; @@ -32,4 +36,4 @@ std::optional MaximumCompressionLevel(const arrow::Compression::type compre } return NArrow::TStatusValidator::GetValid(arrow::util::Codec::MaximumCompressionLevel(compression)); } -} +} // namespace NKikimr::NArrow diff --git a/ydb/core/formats/arrow/serializer/utils.h b/ydb/core/formats/arrow/serializer/utils.h index 9d0e37378b8a..8e7de11170ed 100644 --- a/ydb/core/formats/arrow/serializer/utils.h +++ b/ydb/core/formats/arrow/serializer/utils.h @@ -10,4 +10,4 @@ bool SupportsCompressionLevel(const arrow::Compression::type compression, const std::optional MinimumCompressionLevel(const arrow::Compression::type compression); std::optional MaximumCompressionLevel(const arrow::Compression::type compression); -} +} // namespace NKikimr::NArrow diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 81ceb584d6ba..1aa312d1c2e5 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -416,12 +416,19 @@ bool FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T& } familyDescription->SetColumnCodec(codec); } else { - code = Ydb::StatusIds::BAD_REQUEST; - error = TStringBuilder() << "Compression is not set for column family'" << family.Name << "'"; - return false; + if (family.Name != "default") { + code = Ydb::StatusIds::BAD_REQUEST; + error = TStringBuilder() << "Compression is not set for non `default` column family '" << family.Name << "'"; + return false; + } } if (family.CompressionLevel.Defined()) { + if (!family.Compression.Defined()) { + code = Ydb::StatusIds::BAD_REQUEST; + error = TStringBuilder() << "Compression is not set for column family '" << family.Name << "', but compression level is set"; + return false; + } familyDescription->SetColumnCodecLevel(family.CompressionLevel.GetRef()); } } diff --git a/ydb/core/kqp/ut/common/columnshard.cpp b/ydb/core/kqp/ut/common/columnshard.cpp index 016440eff634..d1f25cc5bcbd 100644 --- a/ydb/core/kqp/ut/common/columnshard.cpp +++ b/ydb/core/kqp/ut/common/columnshard.cpp @@ -154,7 +154,9 @@ namespace NKqp { TString TTestHelper::TCompression::BuildQuery() const { TStringBuilder str; - str << "COMPRESSION=\"" << NArrow::CompressionToString(CompressionType) << "\""; + if (CompressionType.has_value()) { + str << "COMPRESSION=\"" << NArrow::CompressionToString(CompressionType.value()) << "\""; + } if (CompressionLevel.has_value()) { str << ", COMPRESSION_LEVEL=" << CompressionLevel.value(); } @@ -167,9 +169,16 @@ namespace NKqp { << "` and in right value `" << rhs.GetSerializerClassName() << "`"; return false; } - if (CompressionType != rhs.GetCompressionType()) { - errorMessage = TStringBuilder() << "different compression type: in left value `" << NArrow::CompressionToString(CompressionType) - << "` and in right value `" << NArrow::CompressionToString(rhs.GetCompressionType()) << "`"; + if (CompressionType.has_value() && rhs.GetCompressionType().has_value() && CompressionType.value() != rhs.GetCompressionType().value()) { + errorMessage = TStringBuilder() << "different compression type: in left value `" + << NArrow::CompressionToString(CompressionType.value()) << "` and in right value `" + << NArrow::CompressionToString(rhs.GetCompressionType().value()) << "`"; + return false; + } else if (CompressionType.has_value() && !rhs.GetCompressionType().has_value()) { + errorMessage = TStringBuilder() << "compression type is set in left value, but not set in right value"; + return false; + } else if (!CompressionType.has_value() && rhs.GetCompressionType().has_value()) { + errorMessage = TStringBuilder() << "compression type is not set in left value, but set in right value"; return false; } if (CompressionLevel.has_value() && rhs.GetCompressionLevel().has_value() && @@ -193,12 +202,15 @@ namespace NKqp { } bool TTestHelper::TColumnFamily::DeserializeFromProto(const NKikimrSchemeOp::TFamilyDescription& family) { - if (!family.HasId() || !family.HasName() || !family.HasColumnCodec()) { + if (!family.HasId() || !family.HasName()) { return false; } Id = family.GetId(); FamilyName = family.GetName(); - Compression = TTestHelper::TCompression().SetCompressionType(family.GetColumnCodec()); + Compression = TTestHelper::TCompression(); + if (family.HasColumnCodec()) { + Compression.SetCompressionType(family.GetColumnCodec()); + } if (family.HasColumnCodecLevel()) { Compression.SetCompressionLevel(family.GetColumnCodecLevel()); } @@ -290,9 +302,11 @@ namespace NKqp { TString TTestHelper::TColumnTableBase::BuildAlterCompressionQuery(const TString& columnName, const TCompression& compression) const { auto str = TStringBuilder() << "ALTER OBJECT `" << Name << "` (TYPE " << GetObjectType() << ") SET"; str << " (ACTION=ALTER_COLUMN, NAME=" << columnName << ", `SERIALIZER.CLASS_NAME`=`" << compression.GetSerializerClassName() << "`,"; - auto codec = NArrow::CompressionFromProto(compression.GetCompressionType()); - Y_VERIFY(codec.has_value()); - str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(codec.value()) << "`"; + if (compression.GetCompressionType().has_value()) { + auto codec = NArrow::CompressionFromProto(compression.GetCompressionType().value()); + Y_VERIFY(codec.has_value()); + str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(codec.value()) << "`"; + } if (compression.GetCompressionLevel().has_value()) { str << "`COMPRESSION.LEVEL`=" << compression.GetCompressionLevel().value(); } diff --git a/ydb/core/kqp/ut/common/columnshard.h b/ydb/core/kqp/ut/common/columnshard.h index ba85f6686162..e3e9d676729e 100644 --- a/ydb/core/kqp/ut/common/columnshard.h +++ b/ydb/core/kqp/ut/common/columnshard.h @@ -20,7 +20,7 @@ class TTestHelper { public: class TCompression { YDB_ACCESSOR(TString, SerializerClassName, "ARROW_SERIALIZER"); - YDB_ACCESSOR_DEF(NKikimrSchemeOp::EColumnCodec, CompressionType); + YDB_ACCESSOR_DEF(std::optional, CompressionType); YDB_ACCESSOR_DEF(std::optional, CompressionLevel); public: diff --git a/ydb/core/kqp/ut/olap/sys_view_ut.cpp b/ydb/core/kqp/ut/olap/sys_view_ut.cpp index 9d12b54efab6..223a942f467c 100644 --- a/ydb/core/kqp/ut/olap/sys_view_ut.cpp +++ b/ydb/core/kqp/ut/olap/sys_view_ut.cpp @@ -191,8 +191,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { ui64 rawBytesPK1; ui64 bytesPK1; auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); - auto settings = TKikimrSettings() - .SetWithSampleTables(false); + NKikimrConfig::TAppConfig appConfig; + auto* CSConfig = appConfig.MutableColumnShardConfig(); + CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig); TKikimrRunner kikimr(settings); Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTypedLocalHelper helper("", kikimr, "olapTable", "olapStore"); @@ -259,8 +261,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { ui64 rawBytes1; ui64 bytes1; auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); - auto settings = TKikimrSettings() - .SetWithSampleTables(false); + NKikimrConfig::TAppConfig appConfig; + auto* CSConfig = appConfig.MutableColumnShardConfig(); + CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig); TKikimrRunner kikimr(settings); Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTypedLocalHelper helper("Utf8", kikimr); @@ -298,7 +302,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { ui64 bytes1; auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); csController->SetSmallSizeDetector(Max()); - auto settings = TKikimrSettings().SetWithSampleTables(false); + NKikimrConfig::TAppConfig appConfig; + auto* CSConfig = appConfig.MutableColumnShardConfig(); + CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig); TKikimrRunner kikimr(settings); Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTypedLocalHelper helper("Utf8", kikimr); diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index f9cd4148955a..7243ed178b9e 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -9519,10 +9519,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); - TTestHelper::TCompression plainCompression = - TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); - TTestHelper::TColumnFamily defaultFamily = - TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression); + TTestHelper::TColumnFamily defaultFamily = TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default"); UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), 1); TTestHelper::TColumnFamily defaultFromScheme; @@ -9531,18 +9528,6 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TString errorMessage; UNIT_ASSERT_C(defaultFromScheme.IsEqual(defaultFamily, errorMessage), errorMessage); } - - auto columns = schema.GetColumns(); - for (ui32 i = 0; i < schema.ColumnsSize(); i++) { - auto column = columns[i]; - UNIT_ASSERT(column.HasSerializer()); - UNIT_ASSERT_EQUAL_C( - column.GetColumnFamilyId(), 0, TStringBuilder() << "family for column " << column.GetName() << " is not default"); - TTestHelper::TCompression compression; - UNIT_ASSERT(compression.DeserializeFromProto(schema.GetColumns(i).GetSerializer())); - TString errorMessage; - UNIT_ASSERT_C(compression.IsEqual(defaultFamily.GetCompression(), errorMessage), errorMessage); - } } // Field `Data` is not used in ColumnFamily for ColumnTable @@ -10314,11 +10299,10 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); TString tableName = "/Root/TableWithFamily"; - TTestHelper::TCompression plainCompression = - TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TCompression lz4Compression = TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); TVector families = { - TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(plainCompression), + TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(lz4Compression), }; { @@ -10341,7 +10325,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { testHelper.CreateTable(testTable); } - families.push_back(TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression)); + families.push_back(TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default")); auto& runner = testHelper.GetKikimr(); auto runtime = runner.GetTestServer().GetRuntime(); TActorId sender = runtime->AllocateEdgeActor(); @@ -10686,6 +10670,70 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { testHelper.CreateTable(testTable, EStatus::GENERIC_ERROR); } + Y_UNIT_TEST(CreateTableWithDefaultFamilyWithoutSettings) { + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + TString tableName = "/Root/ColumnTableTest"; + auto session = testHelper.GetSession(); + auto createQuery = TStringBuilder() << R"(CREATE TABLE `)" << tableName << R"(` ( + Key Uint64 NOT NULL, + Value1 String, + Value2 Uint32, + PRIMARY KEY (Key), + FAMILY default ()) + WITH (STORE = COLUMN);)"; + auto result = session.ExecuteSchemeQuery(createQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& runner = testHelper.GetKikimr(); + auto runtime = runner.GetTestServer().GetRuntime(); + TActorId sender = runtime->AllocateEdgeActor(); + + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + TTestHelper::TColumnFamily defaultFamily = TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default"); + + UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), 1); + TTestHelper::TColumnFamily defaultFromScheme; + UNIT_ASSERT(defaultFromScheme.DeserializeFromProto(schema.GetColumnFamilies(0))); + { + TString errorMessage; + UNIT_ASSERT_C(defaultFromScheme.IsEqual(defaultFamily, errorMessage), errorMessage); + } + } + + Y_UNIT_TEST(CreateTableWithFamilyWithOnlyCompressionLevel) { + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + TString tableName = "/Root/ColumnTableTest"; + auto session = testHelper.GetSession(); + auto createQuery = TStringBuilder() << R"(CREATE TABLE `)" << tableName << R"(` ( + Key Uint64 NOT NULL, + Value1 String, + Value2 Uint32, + PRIMARY KEY (Key), + FAMILY family1 ( + COMPRESSION_LEVEL = 2 + )) + WITH (STORE = COLUMN);)"; + auto result = session.ExecuteSchemeQuery(createQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + + Y_UNIT_TEST(CreateTableNonDefaultFamilyWithoutCompression) { + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + TString tableName = "/Root/ColumnTableTest"; + auto session = testHelper.GetSession(); + auto createQuery = TStringBuilder() << R"(CREATE TABLE `)" << tableName << R"(` ( + Key Uint64 NOT NULL, + Value1 String, + Value2 Uint32, + PRIMARY KEY (Key), + FAMILY family1 ( + )) + WITH (STORE = COLUMN);)"; + auto result = session.ExecuteSchemeQuery(createQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + Y_UNIT_TEST(DropColumnAndResetTtl) { TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; @@ -10765,7 +10813,6 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { testHelper.RebootTablets("/Root/ColumnTableTest"); } - } Y_UNIT_TEST_SUITE(KqpOlapTypes) { diff --git a/ydb/core/tx/schemeshard/olap/column_families/schema.cpp b/ydb/core/tx/schemeshard/olap/column_families/schema.cpp index 7667e984d532..875f5678e1e0 100644 --- a/ydb/core/tx/schemeshard/olap/column_families/schema.cpp +++ b/ydb/core/tx/schemeshard/olap/column_families/schema.cpp @@ -119,23 +119,24 @@ bool TOlapColumnFamiliesDescription::Validate(const NKikimrSchemeOp::TColumnTabl } lastColumnFamilyId = familyProto.GetId(); - if (!familyProto.HasColumnCodec()) { - errors.AddError("missing column codec for column family '" + columnFamilyName + "'"); - return false; - } - - auto serializerProto = ConvertFamilyDescriptionToProtoSerializer(familyProto); - if (serializerProto.IsFail()) { - errors.AddError(serializerProto.GetErrorMessage()); - return false; - } - NArrow::NSerialization::TSerializerContainer serializer; - if (!serializer.DeserializeFromProto(serializerProto.GetResult())) { - errors.AddError(TStringBuilder() << "can't deserialize column family `" << columnFamilyName << "` from proto "); - return false; - } - if (!family->GetSerializerContainer().IsEqualTo(serializer)) { - errors.AddError(TStringBuilder() << "compression from column family '" << columnFamilyName << "` is not matching schema preset"); + if (familyProto.HasColumnCodec() && family->GetSerializerContainer().has_value()) { + auto serializerProto = ConvertFamilyDescriptionToProtoSerializer(familyProto); + if (serializerProto.IsFail()) { + errors.AddError(serializerProto.GetErrorMessage()); + return false; + } + NArrow::NSerialization::TSerializerContainer serializer; + if (!serializer.DeserializeFromProto(serializerProto.GetResult())) { + errors.AddError(TStringBuilder() << "can't deserialize column family `" << columnFamilyName << "` from proto "); + return false; + } + if (!family->GetSerializerContainer()->IsEqualTo(serializer)) { + errors.AddError(TStringBuilder() << "compression from column family '" << columnFamilyName << "` is not matching schema preset"); + return false; + } + } else if ((!familyProto.HasColumnCodec() && family->GetSerializerContainer().has_value()) || + (familyProto.HasColumnCodec() && !family->GetSerializerContainer().has_value())) { + errors.AddError(TStringBuilder() << "compression is not matching schema preset in column family `" << columnFamilyName << "`"); return false; } } diff --git a/ydb/core/tx/schemeshard/olap/column_families/update.cpp b/ydb/core/tx/schemeshard/olap/column_families/update.cpp index d365688ca46e..979df0f0701a 100644 --- a/ydb/core/tx/schemeshard/olap/column_families/update.cpp +++ b/ydb/core/tx/schemeshard/olap/column_families/update.cpp @@ -85,55 +85,66 @@ bool TOlapColumnFamlilyAdd::ParseFromRequest(const NKikimrSchemeOp::TFamilyDescr } Name = columnFamily.GetName(); - auto serializer = ConvertFamilyDescriptionToProtoSerializer(columnFamily); - if (serializer.IsFail()) { - errors.AddError(serializer.GetErrorMessage()); - return false; - } - auto resultBuild = NArrow::NSerialization::TSerializerContainer::BuildFromProto(serializer.GetResult()); - if (resultBuild.IsFail()) { - errors.AddError(resultBuild.GetErrorMessage()); - return false; + if (columnFamily.HasColumnCodec()) { + auto serializer = ConvertFamilyDescriptionToProtoSerializer(columnFamily); + if (serializer.IsFail()) { + errors.AddError(serializer.GetErrorMessage()); + return false; + } + auto resultBuild = NArrow::NSerialization::TSerializerContainer::BuildFromProto(serializer.GetResult()); + if (resultBuild.IsFail()) { + errors.AddError(resultBuild.GetErrorMessage()); + return false; + } + SerializerContainer = resultBuild.GetResult(); } - SerializerContainer = resultBuild.GetResult(); return true; } void TOlapColumnFamlilyAdd::ParseFromLocalDB(const NKikimrSchemeOp::TFamilyDescription& columnFamily) { Name = columnFamily.GetName(); - auto serializer = ConvertFamilyDescriptionToProtoSerializer(columnFamily); - Y_VERIFY_S(serializer.IsSuccess(), serializer.GetErrorMessage()); - Y_VERIFY(SerializerContainer.DeserializeFromProto(serializer.GetResult())); + if (columnFamily.HasColumnCodec()) { + auto serializer = ConvertFamilyDescriptionToProtoSerializer(columnFamily); + Y_VERIFY_S(serializer.IsSuccess(), serializer.GetErrorMessage()); + SerializerContainer = NArrow::NSerialization::TSerializerContainer(); + Y_VERIFY(SerializerContainer->DeserializeFromProto(serializer.GetResult())); + } } void TOlapColumnFamlilyAdd::Serialize(NKikimrSchemeOp::TFamilyDescription& columnFamily) const { - auto result = ConvertSerializerContainerToFamilyDescription(SerializerContainer); - Y_VERIFY_S(result.IsSuccess(), result.GetErrorMessage()); columnFamily.SetName(Name); - columnFamily.SetColumnCodec(result->GetColumnCodec()); - if (result->HasColumnCodecLevel()) { - columnFamily.SetColumnCodecLevel(result->GetColumnCodecLevel()); + if (SerializerContainer.has_value()) { + auto result = ConvertSerializerContainerToFamilyDescription(SerializerContainer.value()); + Y_VERIFY_S(result.IsSuccess(), result.GetErrorMessage()); + columnFamily.SetColumnCodec(result->GetColumnCodec()); + if (result->HasColumnCodecLevel()) { + columnFamily.SetColumnCodecLevel(result->GetColumnCodecLevel()); + } } } bool TOlapColumnFamlilyAdd::ApplyDiff(const TOlapColumnFamlilyDiff& diffColumnFamily, IErrorCollector& errors) { Y_ABORT_UNLESS(GetName() == diffColumnFamily.GetName()); - auto newColumnFamily = ConvertSerializerContainerToFamilyDescription(SerializerContainer); - if (newColumnFamily.IsFail()) { - errors.AddError(newColumnFamily.GetErrorMessage()); - return false; + NKikimrSchemeOp::TFamilyDescription newColumnFamily; + if (SerializerContainer.has_value()) { + auto resultConvert = ConvertSerializerContainerToFamilyDescription(SerializerContainer.value()); + if (resultConvert.IsFail()) { + errors.AddError(resultConvert.GetErrorMessage()); + return false; + } + newColumnFamily = resultConvert.GetResult(); } - newColumnFamily->SetName(GetName()); + newColumnFamily.SetName(GetName()); auto codec = diffColumnFamily.GetCodec(); if (codec.has_value()) { - newColumnFamily->SetColumnCodec(codec.value()); - newColumnFamily->ClearColumnCodecLevel(); + newColumnFamily.SetColumnCodec(codec.value()); + newColumnFamily.ClearColumnCodecLevel(); } auto codecLevel = diffColumnFamily.GetCodecLevel(); if (codecLevel.has_value()) { - newColumnFamily->SetColumnCodecLevel(codecLevel.value()); + newColumnFamily.SetColumnCodecLevel(codecLevel.value()); } - auto serializer = ConvertFamilyDescriptionToProtoSerializer(newColumnFamily.GetResult()); + auto serializer = ConvertFamilyDescriptionToProtoSerializer(newColumnFamily); if (serializer.IsFail()) { errors.AddError(serializer.GetErrorMessage()); return false; diff --git a/ydb/core/tx/schemeshard/olap/column_families/update.h b/ydb/core/tx/schemeshard/olap/column_families/update.h index 63c0e53af16d..171567f411f7 100644 --- a/ydb/core/tx/schemeshard/olap/column_families/update.h +++ b/ydb/core/tx/schemeshard/olap/column_families/update.h @@ -23,7 +23,7 @@ class TOlapColumnFamlilyDiff { class TOlapColumnFamlilyAdd { private: YDB_READONLY_DEF(TString, Name); - YDB_READONLY_DEF(NArrow::NSerialization::TSerializerContainer, SerializerContainer); + YDB_READONLY_DEF(std::optional, SerializerContainer); public: bool ParseFromRequest(const NKikimrSchemeOp::TFamilyDescription& columnFamily, IErrorCollector& errors); diff --git a/ydb/core/tx/schemeshard/olap/operations/create_table.cpp b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp index a407a991689c..00e434421e09 100644 --- a/ydb/core/tx/schemeshard/olap/operations/create_table.cpp +++ b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp @@ -571,7 +571,6 @@ class TCreateColumnTable: public TSubOperation { auto defaultFamily = mutableSchema->AddColumnFamilies(); defaultFamily->SetName("default"); defaultFamily->SetId(0); - defaultFamily->SetColumnCodec(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); for (ui32 i = 0; i < schema.ColumnsSize(); i++) { if (!schema.GetColumns(i).HasColumnFamilyName() || !schema.GetColumns(i).HasColumnFamilyId()) { From d6ecc915d0cc7ca14336ded8c45932c6779d3b0f Mon Sep 17 00:00:00 2001 From: vlad-gogov Date: Fri, 20 Dec 2024 10:34:34 +0000 Subject: [PATCH 02/13] fix --- ydb/core/formats/arrow/serializer/native.h | 19 ++++++++++++++----- ydb/core/kqp/ut/olap/sys_view_ut.cpp | 2 +- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/ydb/core/formats/arrow/serializer/native.h b/ydb/core/formats/arrow/serializer/native.h index 23247dc28a31..0bf637704e0d 100644 --- a/ydb/core/formats/arrow/serializer/native.h +++ b/ydb/core/formats/arrow/serializer/native.h @@ -54,14 +54,23 @@ class TNativeSerializer: public ISerializer { virtual TConclusionStatus DoDeserializeFromRequest(NYql::TFeaturesExtractor& features) override; static arrow::Compression::type GetDefaultCompressionType() { - return CompressionFromProto(AppData()->ColumnShardConfig.GetDefaultCompression()).value(); + if (!HasAppData() && !AppDataVerified().ColumnShardConfig.HasDefaultCompression()) { + return arrow::Compression::ZSTD; + } + return CompressionFromProto(AppDataVerified().ColumnShardConfig.GetDefaultCompression()).value(); } static std::shared_ptr GetDefaultCodec() { + if (!HasAppData() || (!AppDataVerified().ColumnShardConfig.HasDefaultCompression() && + !AppDataVerified().ColumnShardConfig.HasDefaultCompressionLevel())) { + return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(arrow::Compression::type::ZSTD, 1)); + } arrow::Compression::type codec = GetDefaultCompressionType(); - if (codec == arrow::Compression::type::ZSTD) { - i32 codecLevel = AppData()->ColumnShardConfig.GetDefaultCompressionLevel(); - return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec, codecLevel)); + if (AppDataVerified().ColumnShardConfig.HasDefaultCompressionLevel()) { + return NArrow::TStatusValidator::GetValid( + arrow::util::Codec::Create(codec, AppDataVerified().ColumnShardConfig.GetDefaultCompressionLevel())); + } else if (codec == arrow::Compression::ZSTD) { + return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec, 1)); } return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec)); } @@ -99,7 +108,7 @@ class TNativeSerializer: public ISerializer { } static arrow::ipc::IpcOptions GetDefaultOptions() { - static arrow::ipc::IpcWriteOptions options = BuildDefaultOptions(); + arrow::ipc::IpcWriteOptions options = BuildDefaultOptions(); return options; } diff --git a/ydb/core/kqp/ut/olap/sys_view_ut.cpp b/ydb/core/kqp/ut/olap/sys_view_ut.cpp index 223a942f467c..0d9108b5d241 100644 --- a/ydb/core/kqp/ut/olap/sys_view_ut.cpp +++ b/ydb/core/kqp/ut/olap/sys_view_ut.cpp @@ -193,7 +193,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); NKikimrConfig::TAppConfig appConfig; auto* CSConfig = appConfig.MutableColumnShardConfig(); - CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig); TKikimrRunner kikimr(settings); Tests::NCommon::TLoggerInit(kikimr).Initialize(); From 3e28787039c0b63a7ff53ae533bf61447aa9e1cf Mon Sep 17 00:00:00 2001 From: vlad-gogov Date: Fri, 20 Dec 2024 12:18:09 +0000 Subject: [PATCH 03/13] fix --- ydb/core/formats/arrow/serializer/native.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ydb/core/formats/arrow/serializer/native.h b/ydb/core/formats/arrow/serializer/native.h index 0bf637704e0d..1e3506d525c3 100644 --- a/ydb/core/formats/arrow/serializer/native.h +++ b/ydb/core/formats/arrow/serializer/native.h @@ -61,14 +61,14 @@ class TNativeSerializer: public ISerializer { } static std::shared_ptr GetDefaultCodec() { - if (!HasAppData() || (!AppDataVerified().ColumnShardConfig.HasDefaultCompression() && - !AppDataVerified().ColumnShardConfig.HasDefaultCompressionLevel())) { + if (!HasAppData() || + (!AppData()->ColumnShardConfig.HasDefaultCompression() && !AppData()->ColumnShardConfig.HasDefaultCompressionLevel())) { return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(arrow::Compression::type::ZSTD, 1)); } arrow::Compression::type codec = GetDefaultCompressionType(); - if (AppDataVerified().ColumnShardConfig.HasDefaultCompressionLevel()) { + if (AppData()->ColumnShardConfig.HasDefaultCompressionLevel()) { return NArrow::TStatusValidator::GetValid( - arrow::util::Codec::Create(codec, AppDataVerified().ColumnShardConfig.GetDefaultCompressionLevel())); + arrow::util::Codec::Create(codec, AppData()->ColumnShardConfig.GetDefaultCompressionLevel())); } else if (codec == arrow::Compression::ZSTD) { return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec, 1)); } From b3a9d1ab23d7af88b51aea23a97ce83322cb4e46 Mon Sep 17 00:00:00 2001 From: vlad-gogov Date: Fri, 20 Dec 2024 18:40:15 +0000 Subject: [PATCH 04/13] test --- ydb/core/kqp/ut/olap/compression_ut.cpp | 90 +++++++++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/ydb/core/kqp/ut/olap/compression_ut.cpp b/ydb/core/kqp/ut/olap/compression_ut.cpp index b325324a1f3d..391f6b688eec 100644 --- a/ydb/core/kqp/ut/olap/compression_ut.cpp +++ b/ydb/core/kqp/ut/olap/compression_ut.cpp @@ -1,7 +1,54 @@ #include +#include +#include +#include + +#include namespace NKikimr::NKqp { +std::pair GetVolumes( + const TKikimrRunner& runner, const TString& tablePath, const std::vector columnNames, const bool verbose = false) { + TString selectQuery = "SELECT * FROM `" + tablePath + "/.sys/primary_index_stats` WHERE Activity == 1"; + if (columnNames.size()) { + selectQuery += " AND EntityName IN ('" + JoinSeq("','", columnNames) + "')"; + } + + auto tableClient = runner.GetTableClient(); + std::optional rawBytesPred; + std::optional bytesPred; + while (true) { + auto rows = ExecuteScanQuery(tableClient, selectQuery, verbose); + ui64 rawBytes = 0; + ui64 bytes = 0; + for (auto&& r : rows) { + if (verbose) { + Cerr << "-------" << Endl; + } + for (auto&& c : r) { + if (c.first == "RawBytes") { + rawBytes += GetUint64(c.second); + } + if (c.first == "BlobRangeSize") { + bytes += GetUint64(c.second); + } + if (verbose) { + Cerr << c.first << ":" << Endl << c.second.GetProto().DebugString() << Endl; + } + } + } + if (rawBytesPred && *rawBytesPred == rawBytes && bytesPred && *bytesPred == bytes) { + break; + } else { + rawBytesPred = rawBytes; + bytesPred = bytes; + Cerr << "Wait changes: " << bytes << "/" << rawBytes << Endl; + Sleep(TDuration::Seconds(5)); + } + } + return { rawBytesPred.value(), bytesPred.value() }; +} + Y_UNIT_TEST_SUITE(KqpOlapCompression) { Y_UNIT_TEST(DisabledAlterCompression) { TKikimrSettings settings = TKikimrSettings().SetWithSampleTables(false).SetEnableOlapCompression(false); @@ -63,5 +110,48 @@ Y_UNIT_TEST_SUITE(KqpOlapCompression) { testHelper.CreateTable(testTable); testHelper.SetCompression(testTable, "pk_int", compression, NYdb::EStatus::SCHEME_ERROR); } + + std::pair GetVolumesColumnWithCompression(const std::optional& CSConfig = {}) { + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); + NKikimrConfig::TAppConfig appConfig; + if (CSConfig.has_value()) { + *appConfig.MutableColumnShardConfig() = CSConfig.value(); + } + Cerr << appConfig.columnshardconfig().HasDefaultCompression() << Endl; + auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig); + TTestHelper testHelper(settings); + Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize(); + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + + TVector schema = { + TTestHelper::TColumnSchema().SetName("pk_int").SetType(NScheme::NTypeIds::Uint64).SetNullable(false) + }; + + TString tableName = "/Root/ColumnTableTest"; + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "pk_int" }).SetSharding({ "pk_int" }).SetSchema(schema); + testHelper.CreateTable(testTable); + + TVector dataBuilders; + dataBuilders.push_back( + NArrow::NConstruction::TSimpleArrayConstructor>::BuildNotNullable( + "pk_int", false)); + auto batch = NArrow::NConstruction::TRecordBatchConstructor(dataBuilders).BuildBatch(1000000); + testHelper.BulkUpsert(testTable, batch); + csController->WaitCompactions(TDuration::Seconds(10)); + return GetVolumes(testHelper.GetKikimr(), tableName, { "pk_int" }, true); + } + + Y_UNIT_TEST(DefaultCompressionViaCSConfig) { + NKikimrConfig::TColumnShardConfig csConfig = NKikimrConfig::TColumnShardConfig(); + csConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + auto [rawBytesPK1, bytesPK1] = GetVolumesColumnWithCompression(csConfig); + auto [rawBytesPK2, bytesPK2] = GetVolumesColumnWithCompression(); // Default compression (ZSTD, 1) + Cerr << rawBytesPK1 << " " << bytesPK1 << Endl; + Cerr << rawBytesPK2 << " " << bytesPK2 << Endl; + AFL_VERIFY(rawBytesPK2 == rawBytesPK1)("pk1", rawBytesPK1)("pk2", rawBytesPK2); + AFL_VERIFY(bytesPK2 < bytesPK1 / 3)("pk1", bytesPK1)("pk2", bytesPK2); + } } } From e62eef1fbdc74e440652c319921781ae2718dded Mon Sep 17 00:00:00 2001 From: vlad-gogov Date: Thu, 9 Jan 2025 07:35:08 +0000 Subject: [PATCH 05/13] fix --- ydb/core/base/appdata_fwd.h | 3 ++- ydb/core/formats/arrow/serializer/native.h | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h index ae2bc9ab9fa5..a93548dc1294 100644 --- a/ydb/core/base/appdata_fwd.h +++ b/ydb/core/base/appdata_fwd.h @@ -297,7 +297,8 @@ inline TAppData* AppData(NActors::TActorSystem* actorSystem) { } inline bool HasAppData() { - return !!NActors::TlsActivationContext; + return !!NActors::TlsActivationContext && NActors::TlsActivationContext->ExecutorThread.ActorSystem && + NActors::TlsActivationContext->ExecutorThread.ActorSystem->AppData(); } inline TAppData& AppDataVerified() { diff --git a/ydb/core/formats/arrow/serializer/native.h b/ydb/core/formats/arrow/serializer/native.h index 1e3506d525c3..b76ad9577d98 100644 --- a/ydb/core/formats/arrow/serializer/native.h +++ b/ydb/core/formats/arrow/serializer/native.h @@ -54,10 +54,10 @@ class TNativeSerializer: public ISerializer { virtual TConclusionStatus DoDeserializeFromRequest(NYql::TFeaturesExtractor& features) override; static arrow::Compression::type GetDefaultCompressionType() { - if (!HasAppData() && !AppDataVerified().ColumnShardConfig.HasDefaultCompression()) { + if (!HasAppData() && !AppData()->ColumnShardConfig.HasDefaultCompression()) { return arrow::Compression::ZSTD; } - return CompressionFromProto(AppDataVerified().ColumnShardConfig.GetDefaultCompression()).value(); + return CompressionFromProto(AppData()->ColumnShardConfig.GetDefaultCompression()).value(); } static std::shared_ptr GetDefaultCodec() { From c00d14485a83cd36a8f60a2def6dd0775910a917 Mon Sep 17 00:00:00 2001 From: vlad-gogov Date: Fri, 10 Jan 2025 06:14:57 +0000 Subject: [PATCH 06/13] fix --- ydb/core/base/appdata_fwd.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h index a93548dc1294..09da416778ab 100644 --- a/ydb/core/base/appdata_fwd.h +++ b/ydb/core/base/appdata_fwd.h @@ -304,9 +304,8 @@ inline bool HasAppData() { inline TAppData& AppDataVerified() { Y_ABORT_UNLESS(HasAppData()); auto& actorSystem = NActors::TlsActivationContext->ExecutorThread.ActorSystem; - Y_ABORT_UNLESS(actorSystem); TAppData* const x = actorSystem->AppData(); - Y_ABORT_UNLESS(x && x->Magic == TAppData::MagicTag); + Y_ABORT_UNLESS(x->Magic == TAppData::MagicTag); return *x; } From db36b0ac562c9aac3590cd37f2bf6fd49dd13d59 Mon Sep 17 00:00:00 2001 From: vlad-gogov Date: Tue, 14 Jan 2025 06:50:01 +0000 Subject: [PATCH 07/13] fix test and delete debug messages --- ydb/core/kqp/ut/olap/compression_ut.cpp | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/ydb/core/kqp/ut/olap/compression_ut.cpp b/ydb/core/kqp/ut/olap/compression_ut.cpp index 391f6b688eec..1ac700490a08 100644 --- a/ydb/core/kqp/ut/olap/compression_ut.cpp +++ b/ydb/core/kqp/ut/olap/compression_ut.cpp @@ -8,23 +8,19 @@ namespace NKikimr::NKqp { std::pair GetVolumes( - const TKikimrRunner& runner, const TString& tablePath, const std::vector columnNames, const bool verbose = false) { + const TKikimrRunner& runner, const TString& tablePath, const std::vector columnNames) { TString selectQuery = "SELECT * FROM `" + tablePath + "/.sys/primary_index_stats` WHERE Activity == 1"; if (columnNames.size()) { selectQuery += " AND EntityName IN ('" + JoinSeq("','", columnNames) + "')"; } - auto tableClient = runner.GetTableClient(); std::optional rawBytesPred; std::optional bytesPred; while (true) { - auto rows = ExecuteScanQuery(tableClient, selectQuery, verbose); + auto rows = ExecuteScanQuery(tableClient, selectQuery, false); ui64 rawBytes = 0; ui64 bytes = 0; for (auto&& r : rows) { - if (verbose) { - Cerr << "-------" << Endl; - } for (auto&& c : r) { if (c.first == "RawBytes") { rawBytes += GetUint64(c.second); @@ -32,9 +28,6 @@ std::pair GetVolumes( if (c.first == "BlobRangeSize") { bytes += GetUint64(c.second); } - if (verbose) { - Cerr << c.first << ":" << Endl << c.second.GetProto().DebugString() << Endl; - } } } if (rawBytesPred && *rawBytesPred == rawBytes && bytesPred && *bytesPred == bytes) { @@ -117,7 +110,6 @@ Y_UNIT_TEST_SUITE(KqpOlapCompression) { if (CSConfig.has_value()) { *appConfig.MutableColumnShardConfig() = CSConfig.value(); } - Cerr << appConfig.columnshardconfig().HasDefaultCompression() << Endl; auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig); TTestHelper testHelper(settings); Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize(); @@ -137,10 +129,10 @@ Y_UNIT_TEST_SUITE(KqpOlapCompression) { dataBuilders.push_back( NArrow::NConstruction::TSimpleArrayConstructor>::BuildNotNullable( "pk_int", false)); - auto batch = NArrow::NConstruction::TRecordBatchConstructor(dataBuilders).BuildBatch(1000000); + auto batch = NArrow::NConstruction::TRecordBatchConstructor(dataBuilders).BuildBatch(100000); testHelper.BulkUpsert(testTable, batch); csController->WaitCompactions(TDuration::Seconds(10)); - return GetVolumes(testHelper.GetKikimr(), tableName, { "pk_int" }, true); + return GetVolumes(testHelper.GetKikimr(), tableName, { "pk_int" }); } Y_UNIT_TEST(DefaultCompressionViaCSConfig) { @@ -148,8 +140,6 @@ Y_UNIT_TEST_SUITE(KqpOlapCompression) { csConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); auto [rawBytesPK1, bytesPK1] = GetVolumesColumnWithCompression(csConfig); auto [rawBytesPK2, bytesPK2] = GetVolumesColumnWithCompression(); // Default compression (ZSTD, 1) - Cerr << rawBytesPK1 << " " << bytesPK1 << Endl; - Cerr << rawBytesPK2 << " " << bytesPK2 << Endl; AFL_VERIFY(rawBytesPK2 == rawBytesPK1)("pk1", rawBytesPK1)("pk2", rawBytesPK2); AFL_VERIFY(bytesPK2 < bytesPK1 / 3)("pk1", bytesPK1)("pk2", bytesPK2); } From dc5c43df16b8d34113e0db7ccdea3f70694ed95a Mon Sep 17 00:00:00 2001 From: vlad-gogov Date: Wed, 15 Jan 2025 08:27:14 +0000 Subject: [PATCH 08/13] revert changes --- ydb/core/base/appdata_fwd.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h index 09da416778ab..ae2bc9ab9fa5 100644 --- a/ydb/core/base/appdata_fwd.h +++ b/ydb/core/base/appdata_fwd.h @@ -297,15 +297,15 @@ inline TAppData* AppData(NActors::TActorSystem* actorSystem) { } inline bool HasAppData() { - return !!NActors::TlsActivationContext && NActors::TlsActivationContext->ExecutorThread.ActorSystem && - NActors::TlsActivationContext->ExecutorThread.ActorSystem->AppData(); + return !!NActors::TlsActivationContext; } inline TAppData& AppDataVerified() { Y_ABORT_UNLESS(HasAppData()); auto& actorSystem = NActors::TlsActivationContext->ExecutorThread.ActorSystem; + Y_ABORT_UNLESS(actorSystem); TAppData* const x = actorSystem->AppData(); - Y_ABORT_UNLESS(x->Magic == TAppData::MagicTag); + Y_ABORT_UNLESS(x && x->Magic == TAppData::MagicTag); return *x; } From f94e573477120d6226a11af667a998a487a7c1a8 Mon Sep 17 00:00:00 2001 From: vlad-gogov Date: Wed, 15 Jan 2025 12:06:05 +0000 Subject: [PATCH 09/13] resolve conversations --- ydb/core/base/appdata_fwd.h | 6 +++--- ydb/core/formats/arrow/serializer/native.h | 8 +++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h index ae2bc9ab9fa5..09da416778ab 100644 --- a/ydb/core/base/appdata_fwd.h +++ b/ydb/core/base/appdata_fwd.h @@ -297,15 +297,15 @@ inline TAppData* AppData(NActors::TActorSystem* actorSystem) { } inline bool HasAppData() { - return !!NActors::TlsActivationContext; + return !!NActors::TlsActivationContext && NActors::TlsActivationContext->ExecutorThread.ActorSystem && + NActors::TlsActivationContext->ExecutorThread.ActorSystem->AppData(); } inline TAppData& AppDataVerified() { Y_ABORT_UNLESS(HasAppData()); auto& actorSystem = NActors::TlsActivationContext->ExecutorThread.ActorSystem; - Y_ABORT_UNLESS(actorSystem); TAppData* const x = actorSystem->AppData(); - Y_ABORT_UNLESS(x && x->Magic == TAppData::MagicTag); + Y_ABORT_UNLESS(x->Magic == TAppData::MagicTag); return *x; } diff --git a/ydb/core/formats/arrow/serializer/native.h b/ydb/core/formats/arrow/serializer/native.h index b76ad9577d98..c3cbad051c39 100644 --- a/ydb/core/formats/arrow/serializer/native.h +++ b/ydb/core/formats/arrow/serializer/native.h @@ -23,6 +23,10 @@ class TNativeSerializer: public ISerializer { private: arrow::ipc::IpcWriteOptions Options; + static inline std::shared_ptr DefaultCodec() { + return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(arrow::Compression::type::ZSTD, 1)); + } + TConclusion> BuildCodec(const arrow::Compression::type& cType, const std::optional level) const; static const inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetClassNameStatic()); protected: @@ -63,14 +67,12 @@ class TNativeSerializer: public ISerializer { static std::shared_ptr GetDefaultCodec() { if (!HasAppData() || (!AppData()->ColumnShardConfig.HasDefaultCompression() && !AppData()->ColumnShardConfig.HasDefaultCompressionLevel())) { - return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(arrow::Compression::type::ZSTD, 1)); + return DefaultCodec(); } arrow::Compression::type codec = GetDefaultCompressionType(); if (AppData()->ColumnShardConfig.HasDefaultCompressionLevel()) { return NArrow::TStatusValidator::GetValid( arrow::util::Codec::Create(codec, AppData()->ColumnShardConfig.GetDefaultCompressionLevel())); - } else if (codec == arrow::Compression::ZSTD) { - return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec, 1)); } return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec)); } From 06990a629036f9fb87113b301b8fddc3b6445a02 Mon Sep 17 00:00:00 2001 From: vlad-gogov Date: Fri, 17 Jan 2025 11:01:01 +0000 Subject: [PATCH 10/13] post-review fixes --- ydb/core/base/appdata_fwd.h | 5 +++-- ydb/core/formats/arrow/serializer/utils.h | 2 +- ydb/core/kqp/ut/common/columnshard.cpp | 12 ++++++------ ydb/core/kqp/ut/common/columnshard.h | 2 +- .../tx/schemeshard/olap/column_families/schema.cpp | 8 ++++---- .../tx/schemeshard/olap/column_families/update.h | 2 +- ydb/core/tx/schemeshard/olap/columns/schema.cpp | 2 +- ydb/core/tx/schemeshard/olap/columns/update.cpp | 4 ++-- 8 files changed, 19 insertions(+), 18 deletions(-) diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h index 09da416778ab..83d75f41941a 100644 --- a/ydb/core/base/appdata_fwd.h +++ b/ydb/core/base/appdata_fwd.h @@ -297,8 +297,9 @@ inline TAppData* AppData(NActors::TActorSystem* actorSystem) { } inline bool HasAppData() { - return !!NActors::TlsActivationContext && NActors::TlsActivationContext->ExecutorThread.ActorSystem && - NActors::TlsActivationContext->ExecutorThread.ActorSystem->AppData(); + return !!NActors::TlsActivationContext + && NActors::TlsActivationContext->ExecutorThread.ActorSystem + && NActors::TlsActivationContext->ExecutorThread.ActorSystem->AppData(); } inline TAppData& AppDataVerified() { diff --git a/ydb/core/formats/arrow/serializer/utils.h b/ydb/core/formats/arrow/serializer/utils.h index 8e7de11170ed..9d0e37378b8a 100644 --- a/ydb/core/formats/arrow/serializer/utils.h +++ b/ydb/core/formats/arrow/serializer/utils.h @@ -10,4 +10,4 @@ bool SupportsCompressionLevel(const arrow::Compression::type compression, const std::optional MinimumCompressionLevel(const arrow::Compression::type compression); std::optional MaximumCompressionLevel(const arrow::Compression::type compression); -} // namespace NKikimr::NArrow +} diff --git a/ydb/core/kqp/ut/common/columnshard.cpp b/ydb/core/kqp/ut/common/columnshard.cpp index d1f25cc5bcbd..d456eba2a292 100644 --- a/ydb/core/kqp/ut/common/columnshard.cpp +++ b/ydb/core/kqp/ut/common/columnshard.cpp @@ -169,15 +169,15 @@ namespace NKqp { << "` and in right value `" << rhs.GetSerializerClassName() << "`"; return false; } - if (CompressionType.has_value() && rhs.GetCompressionType().has_value() && CompressionType.value() != rhs.GetCompressionType().value()) { + if (CompressionType.has_value() && rhs.HasCompressionType() && CompressionType.value() != rhs.GetCompressionTypeUnsafe()) { errorMessage = TStringBuilder() << "different compression type: in left value `" << NArrow::CompressionToString(CompressionType.value()) << "` and in right value `" - << NArrow::CompressionToString(rhs.GetCompressionType().value()) << "`"; + << NArrow::CompressionToString(rhs.GetCompressionTypeUnsafe()) << "`"; return false; - } else if (CompressionType.has_value() && !rhs.GetCompressionType().has_value()) { + } else if (CompressionType.has_value() && !rhs.HasCompressionType()) { errorMessage = TStringBuilder() << "compression type is set in left value, but not set in right value"; return false; - } else if (!CompressionType.has_value() && rhs.GetCompressionType().has_value()) { + } else if (!CompressionType.has_value() && rhs.HasCompressionType()) { errorMessage = TStringBuilder() << "compression type is not set in left value, but set in right value"; return false; } @@ -302,8 +302,8 @@ namespace NKqp { TString TTestHelper::TColumnTableBase::BuildAlterCompressionQuery(const TString& columnName, const TCompression& compression) const { auto str = TStringBuilder() << "ALTER OBJECT `" << Name << "` (TYPE " << GetObjectType() << ") SET"; str << " (ACTION=ALTER_COLUMN, NAME=" << columnName << ", `SERIALIZER.CLASS_NAME`=`" << compression.GetSerializerClassName() << "`,"; - if (compression.GetCompressionType().has_value()) { - auto codec = NArrow::CompressionFromProto(compression.GetCompressionType().value()); + if (compression.HasCompressionType()) { + auto codec = NArrow::CompressionFromProto(compression.GetCompressionTypeUnsafe()); Y_VERIFY(codec.has_value()); str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(codec.value()) << "`"; } diff --git a/ydb/core/kqp/ut/common/columnshard.h b/ydb/core/kqp/ut/common/columnshard.h index e3e9d676729e..8d5bcf0ee518 100644 --- a/ydb/core/kqp/ut/common/columnshard.h +++ b/ydb/core/kqp/ut/common/columnshard.h @@ -20,7 +20,7 @@ class TTestHelper { public: class TCompression { YDB_ACCESSOR(TString, SerializerClassName, "ARROW_SERIALIZER"); - YDB_ACCESSOR_DEF(std::optional, CompressionType); + YDB_OPT(NKikimrSchemeOp::EColumnCodec, CompressionType); YDB_ACCESSOR_DEF(std::optional, CompressionLevel); public: diff --git a/ydb/core/tx/schemeshard/olap/column_families/schema.cpp b/ydb/core/tx/schemeshard/olap/column_families/schema.cpp index 875f5678e1e0..3496937d26ad 100644 --- a/ydb/core/tx/schemeshard/olap/column_families/schema.cpp +++ b/ydb/core/tx/schemeshard/olap/column_families/schema.cpp @@ -119,7 +119,7 @@ bool TOlapColumnFamiliesDescription::Validate(const NKikimrSchemeOp::TColumnTabl } lastColumnFamilyId = familyProto.GetId(); - if (familyProto.HasColumnCodec() && family->GetSerializerContainer().has_value()) { + if (familyProto.HasColumnCodec() && family->HasSerializerContainer()) { auto serializerProto = ConvertFamilyDescriptionToProtoSerializer(familyProto); if (serializerProto.IsFail()) { errors.AddError(serializerProto.GetErrorMessage()); @@ -130,12 +130,12 @@ bool TOlapColumnFamiliesDescription::Validate(const NKikimrSchemeOp::TColumnTabl errors.AddError(TStringBuilder() << "can't deserialize column family `" << columnFamilyName << "` from proto "); return false; } - if (!family->GetSerializerContainer()->IsEqualTo(serializer)) { + if (!family->GetSerializerContainerOptional()->IsEqualTo(serializer)) { errors.AddError(TStringBuilder() << "compression from column family '" << columnFamilyName << "` is not matching schema preset"); return false; } - } else if ((!familyProto.HasColumnCodec() && family->GetSerializerContainer().has_value()) || - (familyProto.HasColumnCodec() && !family->GetSerializerContainer().has_value())) { + } else if ((!familyProto.HasColumnCodec() && family->HasSerializerContainer()) || + (familyProto.HasColumnCodec() && !family->HasSerializerContainer())) { errors.AddError(TStringBuilder() << "compression is not matching schema preset in column family `" << columnFamilyName << "`"); return false; } diff --git a/ydb/core/tx/schemeshard/olap/column_families/update.h b/ydb/core/tx/schemeshard/olap/column_families/update.h index 171567f411f7..37d1b1c661cc 100644 --- a/ydb/core/tx/schemeshard/olap/column_families/update.h +++ b/ydb/core/tx/schemeshard/olap/column_families/update.h @@ -23,7 +23,7 @@ class TOlapColumnFamlilyDiff { class TOlapColumnFamlilyAdd { private: YDB_READONLY_DEF(TString, Name); - YDB_READONLY_DEF(std::optional, SerializerContainer); + YDB_READONLY_OPT(NArrow::NSerialization::TSerializerContainer, SerializerContainer); public: bool ParseFromRequest(const NKikimrSchemeOp::TFamilyDescription& columnFamily, IErrorCollector& errors); diff --git a/ydb/core/tx/schemeshard/olap/columns/schema.cpp b/ydb/core/tx/schemeshard/olap/columns/schema.cpp index 950ba879b133..507a96561f20 100644 --- a/ydb/core/tx/schemeshard/olap/columns/schema.cpp +++ b/ydb/core/tx/schemeshard/olap/columns/schema.cpp @@ -117,7 +117,7 @@ bool TOlapColumnsDescription::ApplyUpdate( } ui32 id = column.GetColumnFamilyId().value(); if (alterColumnFamiliesId.contains(id)) { - column.SetSerializer(columnFamilies.GetByIdVerified(id)->GetSerializerContainer()); + column.SetSerializer(columnFamilies.GetByIdVerified(id)->GetSerializerContainerOptional()); } } } diff --git a/ydb/core/tx/schemeshard/olap/columns/update.cpp b/ydb/core/tx/schemeshard/olap/columns/update.cpp index 30ffe4e3faf3..1d3cccc02bfc 100644 --- a/ydb/core/tx/schemeshard/olap/columns/update.cpp +++ b/ydb/core/tx/schemeshard/olap/columns/update.cpp @@ -198,7 +198,7 @@ void TOlapColumnBase::Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnS bool TOlapColumnBase::ApplySerializerFromColumnFamily(const TOlapColumnFamiliesDescription& columnFamilies, IErrorCollector& errors) { if (GetColumnFamilyId().has_value()) { - SetSerializer(columnFamilies.GetByIdVerified(GetColumnFamilyId().value())->GetSerializerContainer()); + SetSerializer(columnFamilies.GetByIdVerified(GetColumnFamilyId().value())->GetSerializerContainerOptional()); } else { TString familyName = "default"; const TOlapColumnFamily* columnFamily = columnFamilies.GetByName(familyName); @@ -210,7 +210,7 @@ bool TOlapColumnBase::ApplySerializerFromColumnFamily(const TOlapColumnFamiliesD } ColumnFamilyId = columnFamily->GetId(); - SetSerializer(columnFamilies.GetByIdVerified(columnFamily->GetId())->GetSerializerContainer()); + SetSerializer(columnFamilies.GetByIdVerified(columnFamily->GetId())->GetSerializerContainerOptional()); } return true; } From 1cf5f023f6517ef21375cf3c28442bbb4b6ede16 Mon Sep 17 00:00:00 2001 From: vlad-gogov Date: Thu, 23 Jan 2025 09:56:40 +0000 Subject: [PATCH 11/13] post-review fixes --- ydb/core/formats/arrow/serializer/native.cpp | 2 +- ydb/core/formats/arrow/serializer/native.h | 6 +----- ydb/core/formats/arrow/serializer/utils.cpp | 4 ---- 3 files changed, 2 insertions(+), 10 deletions(-) diff --git a/ydb/core/formats/arrow/serializer/native.cpp b/ydb/core/formats/arrow/serializer/native.cpp index cd51b840640e..a580ed33414f 100644 --- a/ydb/core/formats/arrow/serializer/native.cpp +++ b/ydb/core/formats/arrow/serializer/native.cpp @@ -143,7 +143,7 @@ NKikimr::TConclusionStatus TNativeSerializer::DoDeserializeFromRequest(NYql::TFe level = levelLocal; } } - auto codecPtrStatus = BuildCodec(codec.value_or(GetDefaultCompressionType()), level); + auto codecPtrStatus = BuildCodec(codec.value_or(Options.codec->compression_type()), level); if (!codecPtrStatus) { return codecPtrStatus.GetError(); } diff --git a/ydb/core/formats/arrow/serializer/native.h b/ydb/core/formats/arrow/serializer/native.h index c3cbad051c39..372ce988a07f 100644 --- a/ydb/core/formats/arrow/serializer/native.h +++ b/ydb/core/formats/arrow/serializer/native.h @@ -23,10 +23,6 @@ class TNativeSerializer: public ISerializer { private: arrow::ipc::IpcWriteOptions Options; - static inline std::shared_ptr DefaultCodec() { - return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(arrow::Compression::type::ZSTD, 1)); - } - TConclusion> BuildCodec(const arrow::Compression::type& cType, const std::optional level) const; static const inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetClassNameStatic()); protected: @@ -67,7 +63,7 @@ class TNativeSerializer: public ISerializer { static std::shared_ptr GetDefaultCodec() { if (!HasAppData() || (!AppData()->ColumnShardConfig.HasDefaultCompression() && !AppData()->ColumnShardConfig.HasDefaultCompressionLevel())) { - return DefaultCodec(); + return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(arrow::Compression::type::ZSTD, 1)); } arrow::Compression::type codec = GetDefaultCompressionType(); if (AppData()->ColumnShardConfig.HasDefaultCompressionLevel()) { diff --git a/ydb/core/formats/arrow/serializer/utils.cpp b/ydb/core/formats/arrow/serializer/utils.cpp index 0bcd5ba28770..cda91a203b10 100644 --- a/ydb/core/formats/arrow/serializer/utils.cpp +++ b/ydb/core/formats/arrow/serializer/utils.cpp @@ -20,10 +20,6 @@ bool SupportsCompressionLevel(const arrow::Compression::type compression, const return true; } -bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression, const i32 compressionLevel) { - return SupportsCompressionLevel(CompressionFromProto(compression).value(), compressionLevel); -} - std::optional MinimumCompressionLevel(const arrow::Compression::type compression) { if (!SupportsCompressionLevel(compression)) { return {}; From c7afb537cabaeefca685964893a81ce883021860 Mon Sep 17 00:00:00 2001 From: vlad-gogov Date: Thu, 23 Jan 2025 11:33:37 +0000 Subject: [PATCH 12/13] fix --- ydb/core/formats/arrow/serializer/native.h | 41 +++++++++++----------- ydb/core/kqp/ut/olap/compression_ut.cpp | 7 ++-- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/ydb/core/formats/arrow/serializer/native.h b/ydb/core/formats/arrow/serializer/native.h index 372ce988a07f..de33865692b9 100644 --- a/ydb/core/formats/arrow/serializer/native.h +++ b/ydb/core/formats/arrow/serializer/native.h @@ -25,6 +25,11 @@ class TNativeSerializer: public ISerializer { TConclusion> BuildCodec(const arrow::Compression::type& cType, const std::optional level) const; static const inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetClassNameStatic()); + + static std::shared_ptr GetDefaultCodec() { + return *arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME); + } + protected: virtual bool IsCompatibleForExchangeWithSameClass(const ISerializer& /*item*/) const override { return true; @@ -53,30 +58,24 @@ class TNativeSerializer: public ISerializer { virtual TConclusionStatus DoDeserializeFromRequest(NYql::TFeaturesExtractor& features) override; - static arrow::Compression::type GetDefaultCompressionType() { - if (!HasAppData() && !AppData()->ColumnShardConfig.HasDefaultCompression()) { - return arrow::Compression::ZSTD; - } - return CompressionFromProto(AppData()->ColumnShardConfig.GetDefaultCompression()).value(); - } - - static std::shared_ptr GetDefaultCodec() { - if (!HasAppData() || - (!AppData()->ColumnShardConfig.HasDefaultCompression() && !AppData()->ColumnShardConfig.HasDefaultCompressionLevel())) { - return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(arrow::Compression::type::ZSTD, 1)); - } - arrow::Compression::type codec = GetDefaultCompressionType(); - if (AppData()->ColumnShardConfig.HasDefaultCompressionLevel()) { - return NArrow::TStatusValidator::GetValid( - arrow::util::Codec::Create(codec, AppData()->ColumnShardConfig.GetDefaultCompressionLevel())); - } - return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec)); - } - static arrow::ipc::IpcOptions BuildDefaultOptions() { arrow::ipc::IpcWriteOptions options; options.use_threads = false; - options.codec = GetDefaultCodec(); + if (HasAppData()) { + if (AppData()->ColumnShardConfig.HasDefaultCompression()) { + arrow::Compression::type codec = CompressionFromProto(AppData()->ColumnShardConfig.GetDefaultCompression()).value(); + if (AppData()->ColumnShardConfig.HasDefaultCompressionLevel()) { + options.codec = NArrow::TStatusValidator::GetValid( + arrow::util::Codec::Create(codec, AppData()->ColumnShardConfig.GetDefaultCompressionLevel())); + } else { + options.codec = NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec)); + } + } else { + options.codec = GetDefaultCodec(); + } + } else { + options.codec = GetDefaultCodec(); + } return options; } diff --git a/ydb/core/kqp/ut/olap/compression_ut.cpp b/ydb/core/kqp/ut/olap/compression_ut.cpp index 1ac700490a08..454a1ff00170 100644 --- a/ydb/core/kqp/ut/olap/compression_ut.cpp +++ b/ydb/core/kqp/ut/olap/compression_ut.cpp @@ -136,10 +136,11 @@ Y_UNIT_TEST_SUITE(KqpOlapCompression) { } Y_UNIT_TEST(DefaultCompressionViaCSConfig) { + auto [rawBytesPK1, bytesPK1] = GetVolumesColumnWithCompression(); // Default compression LZ4 NKikimrConfig::TColumnShardConfig csConfig = NKikimrConfig::TColumnShardConfig(); - csConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); - auto [rawBytesPK1, bytesPK1] = GetVolumesColumnWithCompression(csConfig); - auto [rawBytesPK2, bytesPK2] = GetVolumesColumnWithCompression(); // Default compression (ZSTD, 1) + csConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD); + csConfig.SetDefaultCompressionLevel(1); + auto [rawBytesPK2, bytesPK2] = GetVolumesColumnWithCompression(csConfig); AFL_VERIFY(rawBytesPK2 == rawBytesPK1)("pk1", rawBytesPK1)("pk2", rawBytesPK2); AFL_VERIFY(bytesPK2 < bytesPK1 / 3)("pk1", bytesPK1)("pk2", bytesPK2); } From 63ab3c06f4b2800e79b1f82d87b68cdc32577803 Mon Sep 17 00:00:00 2001 From: vlad-gogov Date: Fri, 24 Jan 2025 05:19:04 +0000 Subject: [PATCH 13/13] post-review fixes --- .../tx/schemeshard/olap/column_families/schema.cpp | 8 ++++---- .../tx/schemeshard/olap/column_families/update.cpp | 13 ++++++++----- .../tx/schemeshard/olap/column_families/update.h | 2 +- ydb/core/tx/schemeshard/olap/columns/schema.cpp | 4 ++-- ydb/core/tx/schemeshard/olap/columns/update.cpp | 6 +++--- ydb/core/tx/schemeshard/olap/columns/update.h | 2 +- 6 files changed, 19 insertions(+), 16 deletions(-) diff --git a/ydb/core/tx/schemeshard/olap/column_families/schema.cpp b/ydb/core/tx/schemeshard/olap/column_families/schema.cpp index 3496937d26ad..957788be869a 100644 --- a/ydb/core/tx/schemeshard/olap/column_families/schema.cpp +++ b/ydb/core/tx/schemeshard/olap/column_families/schema.cpp @@ -119,7 +119,7 @@ bool TOlapColumnFamiliesDescription::Validate(const NKikimrSchemeOp::TColumnTabl } lastColumnFamilyId = familyProto.GetId(); - if (familyProto.HasColumnCodec() && family->HasSerializerContainer()) { + if (familyProto.HasColumnCodec() && family->GetSerializerContainer().HasObject()) { auto serializerProto = ConvertFamilyDescriptionToProtoSerializer(familyProto); if (serializerProto.IsFail()) { errors.AddError(serializerProto.GetErrorMessage()); @@ -130,12 +130,12 @@ bool TOlapColumnFamiliesDescription::Validate(const NKikimrSchemeOp::TColumnTabl errors.AddError(TStringBuilder() << "can't deserialize column family `" << columnFamilyName << "` from proto "); return false; } - if (!family->GetSerializerContainerOptional()->IsEqualTo(serializer)) { + if (!family->GetSerializerContainer().IsEqualTo(serializer)) { errors.AddError(TStringBuilder() << "compression from column family '" << columnFamilyName << "` is not matching schema preset"); return false; } - } else if ((!familyProto.HasColumnCodec() && family->HasSerializerContainer()) || - (familyProto.HasColumnCodec() && !family->HasSerializerContainer())) { + } else if ((!familyProto.HasColumnCodec() && family->GetSerializerContainer().HasObject()) || + (familyProto.HasColumnCodec() && !family->GetSerializerContainer().HasObject())) { errors.AddError(TStringBuilder() << "compression is not matching schema preset in column family `" << columnFamilyName << "`"); return false; } diff --git a/ydb/core/tx/schemeshard/olap/column_families/update.cpp b/ydb/core/tx/schemeshard/olap/column_families/update.cpp index 979df0f0701a..2234c3e0f36c 100644 --- a/ydb/core/tx/schemeshard/olap/column_families/update.cpp +++ b/ydb/core/tx/schemeshard/olap/column_families/update.cpp @@ -44,6 +44,9 @@ NKikimr::TConclusion ConvertFamilyDes NKikimr::TConclusion ConvertSerializerContainerToFamilyDescription( const NArrow::NSerialization::TSerializerContainer& serializer) { + if (!serializer.HasObject()) { + return NKikimr::TConclusionStatus::Fail("convert TSerializerContainer to TFamilyDescription: container doesn't have object"); + } NKikimrSchemeOp::TFamilyDescription result; if (serializer->GetClassName().empty()) { return NKikimr::TConclusionStatus::Fail("convert TSerializerContainer to TFamilyDescription: field `ClassName` is empty"); @@ -107,14 +110,14 @@ void TOlapColumnFamlilyAdd::ParseFromLocalDB(const NKikimrSchemeOp::TFamilyDescr auto serializer = ConvertFamilyDescriptionToProtoSerializer(columnFamily); Y_VERIFY_S(serializer.IsSuccess(), serializer.GetErrorMessage()); SerializerContainer = NArrow::NSerialization::TSerializerContainer(); - Y_VERIFY(SerializerContainer->DeserializeFromProto(serializer.GetResult())); + Y_VERIFY(SerializerContainer.DeserializeFromProto(serializer.GetResult())); } } void TOlapColumnFamlilyAdd::Serialize(NKikimrSchemeOp::TFamilyDescription& columnFamily) const { columnFamily.SetName(Name); - if (SerializerContainer.has_value()) { - auto result = ConvertSerializerContainerToFamilyDescription(SerializerContainer.value()); + if (SerializerContainer.HasObject()) { + auto result = ConvertSerializerContainerToFamilyDescription(SerializerContainer); Y_VERIFY_S(result.IsSuccess(), result.GetErrorMessage()); columnFamily.SetColumnCodec(result->GetColumnCodec()); if (result->HasColumnCodecLevel()) { @@ -126,8 +129,8 @@ void TOlapColumnFamlilyAdd::Serialize(NKikimrSchemeOp::TFamilyDescription& colum bool TOlapColumnFamlilyAdd::ApplyDiff(const TOlapColumnFamlilyDiff& diffColumnFamily, IErrorCollector& errors) { Y_ABORT_UNLESS(GetName() == diffColumnFamily.GetName()); NKikimrSchemeOp::TFamilyDescription newColumnFamily; - if (SerializerContainer.has_value()) { - auto resultConvert = ConvertSerializerContainerToFamilyDescription(SerializerContainer.value()); + if (SerializerContainer.HasObject()) { + auto resultConvert = ConvertSerializerContainerToFamilyDescription(SerializerContainer); if (resultConvert.IsFail()) { errors.AddError(resultConvert.GetErrorMessage()); return false; diff --git a/ydb/core/tx/schemeshard/olap/column_families/update.h b/ydb/core/tx/schemeshard/olap/column_families/update.h index 37d1b1c661cc..63c0e53af16d 100644 --- a/ydb/core/tx/schemeshard/olap/column_families/update.h +++ b/ydb/core/tx/schemeshard/olap/column_families/update.h @@ -23,7 +23,7 @@ class TOlapColumnFamlilyDiff { class TOlapColumnFamlilyAdd { private: YDB_READONLY_DEF(TString, Name); - YDB_READONLY_OPT(NArrow::NSerialization::TSerializerContainer, SerializerContainer); + YDB_READONLY_DEF(NArrow::NSerialization::TSerializerContainer, SerializerContainer); public: bool ParseFromRequest(const NKikimrSchemeOp::TFamilyDescription& columnFamily, IErrorCollector& errors); diff --git a/ydb/core/tx/schemeshard/olap/columns/schema.cpp b/ydb/core/tx/schemeshard/olap/columns/schema.cpp index 507a96561f20..f533a3476bf9 100644 --- a/ydb/core/tx/schemeshard/olap/columns/schema.cpp +++ b/ydb/core/tx/schemeshard/olap/columns/schema.cpp @@ -56,7 +56,7 @@ bool TOlapColumnsDescription::ApplyUpdate( if (newColumn.GetKeyOrder()) { Y_ABORT_UNLESS(orderedKeyColumnIds.emplace(*newColumn.GetKeyOrder(), newColumn.GetId()).second); } - if (!newColumn.GetSerializer().has_value() && !columnFamilies.GetColumnFamilies().empty() && + if (!newColumn.GetSerializer().HasObject() && !columnFamilies.GetColumnFamilies().empty() && !newColumn.ApplySerializerFromColumnFamily(columnFamilies, errors)) { return false; } @@ -117,7 +117,7 @@ bool TOlapColumnsDescription::ApplyUpdate( } ui32 id = column.GetColumnFamilyId().value(); if (alterColumnFamiliesId.contains(id)) { - column.SetSerializer(columnFamilies.GetByIdVerified(id)->GetSerializerContainerOptional()); + column.SetSerializer(columnFamilies.GetByIdVerified(id)->GetSerializerContainer()); } } } diff --git a/ydb/core/tx/schemeshard/olap/columns/update.cpp b/ydb/core/tx/schemeshard/olap/columns/update.cpp index 1d3cccc02bfc..dff1c82bb8c2 100644 --- a/ydb/core/tx/schemeshard/olap/columns/update.cpp +++ b/ydb/core/tx/schemeshard/olap/columns/update.cpp @@ -180,7 +180,7 @@ void TOlapColumnBase::Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnS columnSchema.SetColumnFamilyId(ColumnFamilyId.value()); } if (Serializer) { - Serializer->SerializeToProto(*columnSchema.MutableSerializer()); + Serializer.SerializeToProto(*columnSchema.MutableSerializer()); } if (AccessorConstructor) { *columnSchema.MutableDataAccessorConstructor() = AccessorConstructor.SerializeToProto(); @@ -198,7 +198,7 @@ void TOlapColumnBase::Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnS bool TOlapColumnBase::ApplySerializerFromColumnFamily(const TOlapColumnFamiliesDescription& columnFamilies, IErrorCollector& errors) { if (GetColumnFamilyId().has_value()) { - SetSerializer(columnFamilies.GetByIdVerified(GetColumnFamilyId().value())->GetSerializerContainerOptional()); + SetSerializer(columnFamilies.GetByIdVerified(GetColumnFamilyId().value())->GetSerializerContainer()); } else { TString familyName = "default"; const TOlapColumnFamily* columnFamily = columnFamilies.GetByName(familyName); @@ -210,7 +210,7 @@ bool TOlapColumnBase::ApplySerializerFromColumnFamily(const TOlapColumnFamiliesD } ColumnFamilyId = columnFamily->GetId(); - SetSerializer(columnFamilies.GetByIdVerified(columnFamily->GetId())->GetSerializerContainerOptional()); + SetSerializer(columnFamilies.GetByIdVerified(columnFamily->GetId())->GetSerializerContainer()); } return true; } diff --git a/ydb/core/tx/schemeshard/olap/columns/update.h b/ydb/core/tx/schemeshard/olap/columns/update.h index 4f87cf014d37..5670ac5f23e9 100644 --- a/ydb/core/tx/schemeshard/olap/columns/update.h +++ b/ydb/core/tx/schemeshard/olap/columns/update.h @@ -35,7 +35,7 @@ class TOlapColumnBase { YDB_READONLY_DEF(NScheme::TTypeInfo, Type); YDB_READONLY_DEF(TString, StorageId); YDB_FLAG_ACCESSOR(NotNull, false); - YDB_ACCESSOR_DEF(std::optional, Serializer); + YDB_ACCESSOR_DEF(NArrow::NSerialization::TSerializerContainer, Serializer); YDB_READONLY_DEF(std::optional, DictionaryEncoding); YDB_READONLY_DEF(NOlap::TColumnDefaultScalarValue, DefaultValue); YDB_READONLY_DEF(NArrow::NAccessor::TConstructorContainer, AccessorConstructor);