Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change default compression via CSConfig #12542

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ydb/core/base/appdata_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,15 @@ inline TAppData* AppData(NActors::TActorSystem* actorSystem) {
}

inline bool HasAppData() {
return !!NActors::TlsActivationContext;
return !!NActors::TlsActivationContext && NActors::TlsActivationContext->ExecutorThread.ActorSystem &&
NActors::TlsActivationContext->ExecutorThread.ActorSystem->AppData<TAppData>();
}

inline TAppData& AppDataVerified() {
Y_ABORT_UNLESS(HasAppData());
auto& actorSystem = NActors::TlsActivationContext->ExecutorThread.ActorSystem;
Y_ABORT_UNLESS(actorSystem);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Зачем уносить проверки в HasAppData? Может быть так, что TlsActivationContext есть, но в нём нет AppData?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

У меня получилась ситуация, что HasAppData возвращает true, но AppData в нем нет, поэтому я не мог обратиться к конфигурации CS, падало на

Y_ABORT_UNLESS(x && x->Magic == TAppData::MagicTag);
(переменная x была пустая), поэтому я подумал, что проверка HasAppData должна проверять наличие AppData

TAppData* const x = actorSystem->AppData<TAppData>();
Y_ABORT_UNLESS(x && x->Magic == TAppData::MagicTag);
Y_ABORT_UNLESS(x->Magic == TAppData::MagicTag);
return *x;
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/serializer/native.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ NKikimr::TConclusionStatus TNativeSerializer::DoDeserializeFromRequest(NYql::TFe
level = levelLocal;
}
}
auto codecPtrStatus = BuildCodec(codec.value_or(Options.codec->compression_type()), level);
auto codecPtrStatus = BuildCodec(codec.value_or(GetDefaultCompressionType()), level);
if (!codecPtrStatus) {
return codecPtrStatus.GetError();
}
Expand Down
29 changes: 27 additions & 2 deletions ydb/core/formats/arrow/serializer/native.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#pragma once

#include "abstract.h"
#include "parsing.h"

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>

#include <ydb/library/accessor/accessor.h>
Expand Down Expand Up @@ -50,10 +53,32 @@ class TNativeSerializer: public ISerializer {

virtual TConclusionStatus DoDeserializeFromRequest(NYql::TFeaturesExtractor& features) override;

// Returns the compression type used when a request does not name a codec.
//
// The value is taken from ColumnShardConfig.DefaultCompression when AppData is
// reachable from the current thread and the field is set; otherwise ZSTD is used.
static arrow::Compression::type GetDefaultCompressionType() {
    // The two checks must be joined with `||`: AppData() may only be touched
    // when HasAppData() is true, and the configured value may only be read
    // when it is actually present. With `&&` the fallback was skipped in both
    // of those situations (missing AppData was dereferenced, and an unset
    // field was read as if it had been configured).
    if (!HasAppData() || !AppData()->ColumnShardConfig.HasDefaultCompression()) {
        return arrow::Compression::ZSTD;
    }
    return CompressionFromProto(AppData()->ColumnShardConfig.GetDefaultCompression()).value();
}

static std::shared_ptr<arrow::util::Codec> GetDefaultCodec() {
if (!HasAppData() ||
(!AppData()->ColumnShardConfig.HasDefaultCompression() && !AppData()->ColumnShardConfig.HasDefaultCompressionLevel())) {
return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(arrow::Compression::type::ZSTD, 1));
}
arrow::Compression::type codec = GetDefaultCompressionType();
if (AppData()->ColumnShardConfig.HasDefaultCompressionLevel()) {
return NArrow::TStatusValidator::GetValid(
arrow::util::Codec::Create(codec, AppData()->ColumnShardConfig.GetDefaultCompressionLevel()));
} else if (codec == arrow::Compression::ZSTD) {
return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec, 1));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Зачем хардкодить дефолтный compression level для ZSTD? Create(codec) не выставит нужное значение по умолчанию?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create(codec) по умолчанию создает ZSTD с compression level равным 8

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А как ты это понял? Просто в коде arrow значением по умолчанию для ZSTD стоит 1: https://github.com/ydb-platform/ydb/blob/main/contrib/libs/apache/arrow/cpp/src/arrow/util/compression_internal.h#L73

}
return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec));
}

static arrow::ipc::IpcOptions BuildDefaultOptions() {
arrow::ipc::IpcWriteOptions options;
options.use_threads = false;
options.codec = *arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME);
options.codec = GetDefaultCodec();
return options;
}

Expand Down Expand Up @@ -83,7 +108,7 @@ class TNativeSerializer: public ISerializer {
}

static arrow::ipc::IpcOptions GetDefaultOptions() {
static arrow::ipc::IpcWriteOptions options = BuildDefaultOptions();
arrow::ipc::IpcWriteOptions options = BuildDefaultOptions();
return options;
}

Expand Down
6 changes: 5 additions & 1 deletion ydb/core/formats/arrow/serializer/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ bool SupportsCompressionLevel(const arrow::Compression::type compression, const
return true;
}

// Overload for the protobuf codec enum: converts it with CompressionFromProto
// and forwards to the arrow::Compression::type overload.
// The conversion result is accessed with value(), i.e. the codec is expected
// to be convertible.
bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression, const i32 compressionLevel) {
    const auto arrowCompression = CompressionFromProto(compression);
    return SupportsCompressionLevel(arrowCompression.value(), compressionLevel);
}

std::optional<int> MinimumCompressionLevel(const arrow::Compression::type compression) {
if (!SupportsCompressionLevel(compression)) {
return {};
Expand All @@ -32,4 +36,4 @@ std::optional<int> MaximumCompressionLevel(const arrow::Compression::type compre
}
return NArrow::TStatusValidator::GetValid(arrow::util::Codec::MaximumCompressionLevel(compression));
}
}
} // namespace NKikimr::NArrow
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/serializer/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ bool SupportsCompressionLevel(const arrow::Compression::type compression, const

std::optional<int> MinimumCompressionLevel(const arrow::Compression::type compression);
std::optional<int> MaximumCompressionLevel(const arrow::Compression::type compression);
}
} // namespace NKikimr::NArrow
13 changes: 10 additions & 3 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,12 +416,19 @@ bool FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T&
}
familyDescription->SetColumnCodec(codec);
} else {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Compression is not set for column family'" << family.Name << "'";
return false;
if (family.Name != "default") {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Compression is not set for non `default` column family '" << family.Name << "'";
return false;
}
}

if (family.CompressionLevel.Defined()) {
if (!family.Compression.Defined()) {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Compression is not set for column family '" << family.Name << "', but compression level is set";
return false;
}
familyDescription->SetColumnCodecLevel(family.CompressionLevel.GetRef());
}
}
Expand Down
32 changes: 23 additions & 9 deletions ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ namespace NKqp {

TString TTestHelper::TCompression::BuildQuery() const {
TStringBuilder str;
str << "COMPRESSION=\"" << NArrow::CompressionToString(CompressionType) << "\"";
if (CompressionType.has_value()) {
str << "COMPRESSION=\"" << NArrow::CompressionToString(CompressionType.value()) << "\"";
}
if (CompressionLevel.has_value()) {
str << ", COMPRESSION_LEVEL=" << CompressionLevel.value();
}
Expand All @@ -167,9 +169,16 @@ namespace NKqp {
<< "` and in right value `" << rhs.GetSerializerClassName() << "`";
return false;
}
if (CompressionType != rhs.GetCompressionType()) {
errorMessage = TStringBuilder() << "different compression type: in left value `" << NArrow::CompressionToString(CompressionType)
<< "` and in right value `" << NArrow::CompressionToString(rhs.GetCompressionType()) << "`";
if (CompressionType.has_value() && rhs.GetCompressionType().has_value() && CompressionType.value() != rhs.GetCompressionType().value()) {
errorMessage = TStringBuilder() << "different compression type: in left value `"
<< NArrow::CompressionToString(CompressionType.value()) << "` and in right value `"
<< NArrow::CompressionToString(rhs.GetCompressionType().value()) << "`";
return false;
} else if (CompressionType.has_value() && !rhs.GetCompressionType().has_value()) {
errorMessage = TStringBuilder() << "compression type is set in left value, but not set in right value";
return false;
} else if (!CompressionType.has_value() && rhs.GetCompressionType().has_value()) {
errorMessage = TStringBuilder() << "compression type is not set in left value, but set in right value";
return false;
}
if (CompressionLevel.has_value() && rhs.GetCompressionLevel().has_value() &&
Expand All @@ -193,12 +202,15 @@ namespace NKqp {
}

bool TTestHelper::TColumnFamily::DeserializeFromProto(const NKikimrSchemeOp::TFamilyDescription& family) {
if (!family.HasId() || !family.HasName() || !family.HasColumnCodec()) {
if (!family.HasId() || !family.HasName()) {
return false;
}
Id = family.GetId();
FamilyName = family.GetName();
Compression = TTestHelper::TCompression().SetCompressionType(family.GetColumnCodec());
Compression = TTestHelper::TCompression();
if (family.HasColumnCodec()) {
Compression.SetCompressionType(family.GetColumnCodec());
}
if (family.HasColumnCodecLevel()) {
Compression.SetCompressionLevel(family.GetColumnCodecLevel());
}
Expand Down Expand Up @@ -290,9 +302,11 @@ namespace NKqp {
TString TTestHelper::TColumnTableBase::BuildAlterCompressionQuery(const TString& columnName, const TCompression& compression) const {
auto str = TStringBuilder() << "ALTER OBJECT `" << Name << "` (TYPE " << GetObjectType() << ") SET";
str << " (ACTION=ALTER_COLUMN, NAME=" << columnName << ", `SERIALIZER.CLASS_NAME`=`" << compression.GetSerializerClassName() << "`,";
auto codec = NArrow::CompressionFromProto(compression.GetCompressionType());
Y_VERIFY(codec.has_value());
str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(codec.value()) << "`";
if (compression.GetCompressionType().has_value()) {
auto codec = NArrow::CompressionFromProto(compression.GetCompressionType().value());
Y_VERIFY(codec.has_value());
str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(codec.value()) << "`";
}
if (compression.GetCompressionLevel().has_value()) {
str << "`COMPRESSION.LEVEL`=" << compression.GetCompressionLevel().value();
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/common/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class TTestHelper {
public:
class TCompression {
YDB_ACCESSOR(TString, SerializerClassName, "ARROW_SERIALIZER");
YDB_ACCESSOR_DEF(NKikimrSchemeOp::EColumnCodec, CompressionType);
YDB_ACCESSOR_DEF(std::optional<NKikimrSchemeOp::EColumnCodec>, CompressionType);
YDB_ACCESSOR_DEF(std::optional<i32>, CompressionLevel);

public:
Expand Down
80 changes: 80 additions & 0 deletions ydb/core/kqp/ut/olap/compression_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,47 @@
#include <ydb/core/kqp/ut/common/columnshard.h>
#include <ydb/core/kqp/ut/olap/helpers/get_value.h>
#include <ydb/core/kqp/ut/olap/helpers/query_executor.h>
#include <ydb/core/tx/columnshard/test_helper/controllers.h>

#include <ut/olap/helpers/typed_local.h>

namespace NKikimr::NKqp {

std::pair<ui64, ui64> GetVolumes(
const TKikimrRunner& runner, const TString& tablePath, const std::vector<TString> columnNames) {
TString selectQuery = "SELECT * FROM `" + tablePath + "/.sys/primary_index_stats` WHERE Activity == 1";
if (columnNames.size()) {
selectQuery += " AND EntityName IN ('" + JoinSeq("','", columnNames) + "')";
}
auto tableClient = runner.GetTableClient();
std::optional<ui64> rawBytesPred;
std::optional<ui64> bytesPred;
while (true) {
auto rows = ExecuteScanQuery(tableClient, selectQuery, false);
ui64 rawBytes = 0;
ui64 bytes = 0;
for (auto&& r : rows) {
for (auto&& c : r) {
if (c.first == "RawBytes") {
rawBytes += GetUint64(c.second);
}
if (c.first == "BlobRangeSize") {
bytes += GetUint64(c.second);
}
}
}
if (rawBytesPred && *rawBytesPred == rawBytes && bytesPred && *bytesPred == bytes) {
break;
} else {
rawBytesPred = rawBytes;
bytesPred = bytes;
Cerr << "Wait changes: " << bytes << "/" << rawBytes << Endl;
Sleep(TDuration::Seconds(5));
}
}
return { rawBytesPred.value(), bytesPred.value() };
}

Y_UNIT_TEST_SUITE(KqpOlapCompression) {
Y_UNIT_TEST(DisabledAlterCompression) {
TKikimrSettings settings = TKikimrSettings().SetWithSampleTables(false).SetEnableOlapCompression(false);
Expand Down Expand Up @@ -63,5 +103,45 @@ Y_UNIT_TEST_SUITE(KqpOlapCompression) {
testHelper.CreateTable(testTable);
testHelper.SetCompression(testTable, "pk_int", compression, NYdb::EStatus::SCHEME_ERROR);
}

// Starts a test cluster (optionally with the given ColumnShardConfig placed
// into the app config), creates the column table /Root/ColumnTableTest with a
// single non-nullable Uint64 key column "pk_int", bulk-upserts 100000
// generated rows, waits for compactions and returns the volumes reported by
// GetVolumes for "pk_int".
//
// No per-column compression is set on the table, so the stored size depends
// on the default compression in effect.
std::pair<ui64, ui64> GetVolumesColumnWithCompression(const std::optional<NKikimrConfig::TColumnShardConfig>& CSConfig = {}) {
    auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
    NKikimrConfig::TAppConfig appConfig;
    if (CSConfig.has_value()) {
        *appConfig.MutableColumnShardConfig() = CSConfig.value();
    }
    auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig);
    TTestHelper testHelper(settings);
    Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize();

    TVector<TTestHelper::TColumnSchema> schema = {
        TTestHelper::TColumnSchema().SetName("pk_int").SetType(NScheme::NTypeIds::Uint64).SetNullable(false)
    };

    TString tableName = "/Root/ColumnTableTest";
    TTestHelper::TColumnTable testTable;
    testTable.SetName(tableName).SetPrimaryKey({ "pk_int" }).SetSharding({ "pk_int" }).SetSchema(schema);
    testHelper.CreateTable(testTable);

    TVector<NArrow::NConstruction::IArrayBuilder::TPtr> dataBuilders;
    dataBuilders.push_back(
        NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::UInt64Type>>::BuildNotNullable(
            "pk_int", false));
    auto batch = NArrow::NConstruction::TRecordBatchConstructor(dataBuilders).BuildBatch(100000);
    testHelper.BulkUpsert(testTable, batch);
    csController->WaitCompactions(TDuration::Seconds(10));
    return GetVolumes(testHelper.GetKikimr(), tableName, { "pk_int" });
}

// Checks that ColumnShardConfig.DefaultCompression affects how data is stored:
// the same data set is written once with the default codec configured as
// ColumnCodecPlain and once without any ColumnShardConfig override.
Y_UNIT_TEST(DefaultCompressionViaCSConfig) {
NKikimrConfig::TColumnShardConfig csConfig = NKikimrConfig::TColumnShardConfig();
csConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain);
// PK1: volumes with the plain codec configured as the default.
auto [rawBytesPK1, bytesPK1] = GetVolumesColumnWithCompression(csConfig);
auto [rawBytesPK2, bytesPK2] = GetVolumesColumnWithCompression(); // Default compression (ZSTD, 1)
// The raw sizes must be equal in both runs.
AFL_VERIFY(rawBytesPK2 == rawBytesPK1)("pk1", rawBytesPK1)("pk2", rawBytesPK2);
// The stored size with the built-in default must be less than a third of
// the stored size with the plain codec.
AFL_VERIFY(bytesPK2 < bytesPK1 / 3)("pk1", bytesPK1)("pk2", bytesPK2);
}
}
}
17 changes: 12 additions & 5 deletions ydb/core/kqp/ut/olap/sys_view_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
ui64 rawBytesPK1;
ui64 bytesPK1;
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
NKikimrConfig::TAppConfig appConfig;
auto* CSConfig = appConfig.MutableColumnShardConfig();
CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4);
auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTypedLocalHelper helper("", kikimr, "olapTable", "olapStore");
Expand Down Expand Up @@ -259,8 +261,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
ui64 rawBytes1;
ui64 bytes1;
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
NKikimrConfig::TAppConfig appConfig;
auto* CSConfig = appConfig.MutableColumnShardConfig();
CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4);
auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTypedLocalHelper helper("Utf8", kikimr);
Expand Down Expand Up @@ -298,7 +302,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
ui64 bytes1;
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->SetSmallSizeDetector(Max<ui32>());
auto settings = TKikimrSettings().SetWithSampleTables(false);
NKikimrConfig::TAppConfig appConfig;
auto* CSConfig = appConfig.MutableColumnShardConfig();
CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4);
auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTypedLocalHelper helper("Utf8", kikimr);
Expand Down
Loading
Loading