From 6d10a7acc298fc9476fb2bc81b456d29795ad6eb Mon Sep 17 00:00:00 2001 From: Aleksei Borzenkov Date: Fri, 17 Jan 2025 17:12:27 +0300 Subject: [PATCH] 24-3-13-hotfix: Fix uncommitted changes leak and clean them up on startup (#13487) --- ydb/core/tx/datashard/datashard.cpp | 1 + .../datashard__cleanup_uncommitted.cpp | 88 +++++++++++++++++++ ydb/core/tx/datashard/datashard_impl.h | 4 + .../tx/datashard/datashard_ut_snapshot.cpp | 54 ++++++++++++ .../tx/datashard/execute_kqp_data_tx_unit.cpp | 30 ++++--- ydb/core/tx/datashard/execute_write_unit.cpp | 20 +++-- ydb/core/tx/datashard/ya.make | 1 + 7 files changed, 180 insertions(+), 18 deletions(-) create mode 100644 ydb/core/tx/datashard/datashard__cleanup_uncommitted.cpp diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 905dcc978a89..f918481494c1 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -409,6 +409,7 @@ void TDataShard::SwitchToWork(const TActorContext &ctx) { NotifySchemeshard(ctx); CheckInitiateBorrowedPartsReturn(ctx); CheckStateChange(ctx); + CleanupUncommitted(ctx); } void TDataShard::SyncConfig() { diff --git a/ydb/core/tx/datashard/datashard__cleanup_uncommitted.cpp b/ydb/core/tx/datashard/datashard__cleanup_uncommitted.cpp new file mode 100644 index 000000000000..04ca649579c4 --- /dev/null +++ b/ydb/core/tx/datashard/datashard__cleanup_uncommitted.cpp @@ -0,0 +1,88 @@ +#include "datashard_impl.h" + +namespace NKikimr::NDataShard { + +using namespace NTabletFlatExecutor; + +class TDataShard::TTxCleanupUncommitted : public TTransactionBase { +public: + TTxCleanupUncommitted(TDataShard* self) + : TTransactionBase(self) + {} + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + if (Self->State != TShardState::Ready) { + // We need to be very careful about cleaning up uncommitted changes + // Avoid mistakes by waiting until shard restarts in a Ready state + return true; + } + + size_t removed = 0; + for (const auto& pr : Self->TableInfos) { + if (pr.second->IsReplicated()) { + // Replicated tables use uncommitted changes for replication + // Since we don't track them we cannot know whether they leaked or not + continue; + } + + auto localTid = pr.second->LocalTid; + if (!txc.DB.GetScheme().GetTableInfo(localTid)) { + // Note: this check is likely not needed, since all user tables + // must be present in the Ready state, but make sure we don't + // trip since this code always runs at startup. + continue; + } + + auto openTxs = txc.DB.GetOpenTxs(localTid); + for (ui64 txId : openTxs) { + if (Self->SysLocksTable().GetLocks().contains(txId)) { + // Changes are associated with a known lock + continue; + } + if (Self->GetVolatileTxManager().FindByCommitTxId(txId)) { + // Changes are associated with a known volatile tx + continue; + } + + // Changes are neither committed nor removed and are not tracked + if (removed >= 1000) { + // Avoid removing more than 1k transactions per transaction + Reschedule = true; + break; + } + + // Remove otherwise untracked changes + txc.DB.RemoveTx(localTid, txId); + ++removed; + } + + if (Reschedule) { + break; + } + } + + if (removed > 0) { + LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, + "DataShard " << Self->TabletID() << " removed " << removed << " untracked uncommitted changes"); + } + + return true; + } + + void Complete(const TActorContext& ctx) override { + if (Reschedule) { + Self->CleanupUncommitted(ctx); + } + } + +private: + bool Reschedule = false; +}; + +void TDataShard::CleanupUncommitted(const TActorContext& ctx) { + if (State == TShardState::Ready) { + Execute(new TTxCleanupUncommitted(this), ctx); + } +} + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index dd92f5f31874..84e031086e11 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -242,6 +242,7 @@ class TDataShard class TTxCdcStreamEmitHeartbeats; class TTxUpdateFollowerReadEdge; class TTxRemoveSchemaSnapshots; + class TTxCleanupUncommitted; template friend class TTxDirectBase; class TTxUploadRows; @@ -1422,6 +1423,9 @@ class TDataShard void SwitchToWork(const TActorContext &ctx); void SyncConfig(); + // Cleanup for bug https://github.com/ydb-platform/ydb/issues/13387 + void CleanupUncommitted(const TActorContext &ctx); + TMaybe GetTxPlanStartTimeAndCleanup(ui64 step); struct TPersistentTablet; diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index a72385be1ea1..7e45dfcfee38 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -5149,6 +5149,60 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { } } + Y_UNIT_TEST(BrokenLockChangesDontLeak) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(100); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + + UNIT_ASSERT_VALUES_EQUAL( + KqpSchemeExec(runtime, R"( + CREATE TABLE `/Root/table` (key Uint32, value Uint32, PRIMARY KEY (key)); + )"), + "SUCCESS"); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table` (key, value) VALUES (1, 11);"); + + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, R"( + SELECT key, value FROM `/Root/table` + ORDER BY key; + )"), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }"); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table` (key, value) VALUES (2, 22);"); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleContinue(runtime, sessionId, txId, R"( + UPSERT INTO `/Root/table` (key, value) VALUES (3, 33); + SELECT key, value FROM `/Root/table` ORDER BY key; + )"), + "ERROR: ABORTED"); + + const auto shards = GetTableShards(server, sender, "/Root/table"); + const auto tableId = ResolveTableId(server, sender, "/Root/table"); + UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u); + + // Check shard doesn't have open transactions + { + runtime.SendToPipe(shards.at(0), sender, new TEvDataShard::TEvGetOpenTxs(tableId.PathId)); + auto ev = runtime.GrabEdgeEventRethrow(sender); + UNIT_ASSERT_C(ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0)); + } + } + } } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index b61b78d7f180..8c667b4266b7 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -146,6 +146,15 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio } if (guardLocks.LockTxId) { + auto abortLock = [&]() { + LOG_T("Operation " << *op << " (execute_kqp_data_tx) at " << tabletId + << " aborting because it cannot acquire locks"); + + op->SetAbortedFlag(); + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN); + return EExecutionStatus::Executed; + }; + switch (DataShard.SysLocksTable().EnsureCurrentLock()) { case EEnsureCurrentLock::Success: // Lock is valid, we may continue with reads and side-effects @@ -153,27 +162,26 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio case EEnsureCurrentLock::Broken: // Lock is valid, but broken, we could abort early in some - // cases, but it doesn't affect correctness. + // cases, but it doesn't affect correctness. For write + // transactions we need to abort, since we may otherwise + // perform writes that are not attached to any lock. + if (!op->IsReadOnly()) { + return abortLock(); + } break; case EEnsureCurrentLock::TooMany: // Lock cannot be created, it's not necessarily a problem // for read-only transactions, for non-readonly we need to // abort; - if (op->IsReadOnly()) { - break; + if (!op->IsReadOnly()) { + return abortLock(); } - - [[fallthrough]]; + break; case EEnsureCurrentLock::Abort: // Lock cannot be created and we must abort - LOG_T("Operation " << *op << " (execute_kqp_data_tx) at " << tabletId - << " aborting because it cannot acquire locks"); - - op->SetAbortedFlag(); - BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN); - return EExecutionStatus::Executed; + return abortLock(); } } diff --git a/ydb/core/tx/datashard/execute_write_unit.cpp b/ydb/core/tx/datashard/execute_write_unit.cpp index b5ba1432cecb..173649c10bcc 100644 --- a/ydb/core/tx/datashard/execute_write_unit.cpp +++ b/ydb/core/tx/datashard/execute_write_unit.cpp @@ -286,6 +286,12 @@ class TExecuteWriteUnit : public TExecutionUnit { } if (guardLocks.LockTxId) { + auto abortLock = [&]() { + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Operation " << *op << " at " << tabletId << " aborting because it cannot acquire locks"); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN, "Operation is aborting because it cannot acquire locks"); + return EExecutionStatus::Executed; + }; + switch (DataShard.SysLocksTable().EnsureCurrentLock()) { case EEnsureCurrentLock::Success: // Lock is valid, we may continue with reads and side-effects @@ -294,23 +300,23 @@ class TExecuteWriteUnit : public TExecutionUnit { case EEnsureCurrentLock::Broken: // Lock is valid, but broken, we could abort early in some // cases, but it doesn't affect correctness. + if (!op->IsReadOnly()) { + return abortLock(); + } break; case EEnsureCurrentLock::TooMany: // Lock cannot be created, it's not necessarily a problem // for read-only transactions, for non-readonly we need to // abort; - if (op->IsReadOnly()) { - break; + if (!op->IsReadOnly()) { + return abortLock(); } - - [[fallthrough]]; + break; case EEnsureCurrentLock::Abort: // Lock cannot be created and we must abort - LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Operation " << *op << " at " << tabletId << " aborting because it cannot acquire locks"); - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN, "Operation is aborting because it cannot acquire locks"); - return EExecutionStatus::Executed; + return abortLock(); } } diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index 6cfc69901adb..2d403ad6da3d 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -49,6 +49,7 @@ SRCS( datashard__cleanup_borrowed.cpp datashard__cleanup_in_rs.cpp datashard__cleanup_tx.cpp + datashard__cleanup_uncommitted.cpp datashard__conditional_erase_rows.cpp datashard__engine_host.cpp datashard__engine_host.h