Skip to content

Commit

Permalink
24-3-13-hotfix: Fix uncommitted changes leak and clean them up on sta…
Browse files Browse the repository at this point in the history
…rtup (#13487)
  • Loading branch information
snaury authored Jan 17, 2025
1 parent 7ef8e47 commit 6d10a7a
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 18 deletions.
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ void TDataShard::SwitchToWork(const TActorContext &ctx) {
NotifySchemeshard(ctx);
CheckInitiateBorrowedPartsReturn(ctx);
CheckStateChange(ctx);
CleanupUncommitted(ctx);
}

void TDataShard::SyncConfig() {
Expand Down
88 changes: 88 additions & 0 deletions ydb/core/tx/datashard/datashard__cleanup_uncommitted.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#include "datashard_impl.h"

namespace NKikimr::NDataShard {

using namespace NTabletFlatExecutor;

/**
 * Local transaction that drops uncommitted changes (open txs) which are not
 * referenced by any known lock or volatile transaction.
 *
 * Work is bounded per run: once the cap is hit the transaction asks to be
 * scheduled again from Complete().
 */
class TDataShard::TTxCleanupUncommitted : public TTransactionBase<TDataShard> {
public:
    TTxCleanupUncommitted(TDataShard* self)
        : TTransactionBase(self)
    {}

    bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
        // Cleanup is only attempted for a shard that is in the Ready state;
        // in any other state this transaction is a no-op.
        if (Self->State != TShardState::Ready) {
            return true;
        }

        size_t removedCount = 0;
        for (const auto& entry : Self->TableInfos) {
            const auto& tableInfo = entry.second;

            // Replicated tables legitimately keep uncommitted changes that
            // are not tracked here, so leaks cannot be told apart: skip them.
            if (tableInfo->IsReplicated()) {
                continue;
            }

            // Defensive: skip user tables missing from the local db scheme,
            // since this code runs on every startup and must not trip.
            const auto localTid = tableInfo->LocalTid;
            if (!txc.DB.GetScheme().GetTableInfo(localTid)) {
                continue;
            }

            auto candidates = txc.DB.GetOpenTxs(localTid);
            for (const ui64 txId : candidates) {
                if (IsTracked(txId)) {
                    continue;
                }

                // Untracked changes found, but the per-run budget is spent:
                // stop here and continue in a follow-up transaction.
                if (removedCount >= MaxRemovedPerRun) {
                    Reschedule = true;
                    break;
                }

                txc.DB.RemoveTx(localTid, txId);
                ++removedCount;
            }

            if (Reschedule) {
                break;
            }
        }

        if (removedCount > 0) {
            LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD,
                "DataShard " << Self->TabletID() << " removed " << removedCount << " untracked uncommitted changes");
        }

        return true;
    }

    void Complete(const TActorContext& ctx) override {
        // Continue with the next batch when the previous run hit the cap
        if (Reschedule) {
            Self->CleanupUncommitted(ctx);
        }
    }

private:
    // Returns true when txId belongs to a known lock or a known volatile tx
    bool IsTracked(ui64 txId) {
        return Self->SysLocksTable().GetLocks().contains(txId)
            || Self->GetVolatileTxManager().FindByCommitTxId(txId);
    }

private:
    // Upper bound on the number of open txs removed by a single run
    static constexpr size_t MaxRemovedPerRun = 1000;

    // Set when the cap was reached and more untracked changes remain
    bool Reschedule = false;
};

// Schedules a TTxCleanupUncommitted run. The transaction itself only acts on
// a shard in the Ready state, so nothing is scheduled in any other state.
void TDataShard::CleanupUncommitted(const TActorContext& ctx) {
    if (State != TShardState::Ready) {
        return;
    }

    Execute(new TTxCleanupUncommitted(this), ctx);
}

} // namespace NKikimr::NDataShard
4 changes: 4 additions & 0 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ class TDataShard
class TTxCdcStreamEmitHeartbeats;
class TTxUpdateFollowerReadEdge;
class TTxRemoveSchemaSnapshots;
class TTxCleanupUncommitted;

template <typename T> friend class TTxDirectBase;
class TTxUploadRows;
Expand Down Expand Up @@ -1422,6 +1423,9 @@ class TDataShard
void SwitchToWork(const TActorContext &ctx);
void SyncConfig();

// Cleanup for bug https://github.com/ydb-platform/ydb/issues/13387
void CleanupUncommitted(const TActorContext &ctx);

TMaybe<TInstant> GetTxPlanStartTimeAndCleanup(ui64 step);

struct TPersistentTablet;
Expand Down
54 changes: 54 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5149,6 +5149,60 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
}
}

// Regression test: a write attempted inside a transaction that ends up
// ABORTED must not leave open (uncommitted) transactions at the shard.
Y_UNIT_TEST(BrokenLockChangesDontLeak) {
// Bring up a single-domain test server without real threads
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetDomainPlanResolution(100);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);

InitRoot(server, sender);

TDisableDataShardLogBatching disableDataShardLogBatching;

UNIT_ASSERT_VALUES_EQUAL(
KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table` (key Uint32, value Uint32, PRIMARY KEY (key));
)"),
"SUCCESS");

// Seed the table with a single row
ExecSQL(server, sender, "UPSERT INTO `/Root/table` (key, value) VALUES (1, 11);");

// Begin an interactive transaction that reads the whole table
TString sessionId, txId;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleBegin(runtime, sessionId, txId, R"(
SELECT key, value FROM `/Root/table`
ORDER BY key;
)"),
"{ items { uint32_value: 1 } items { uint32_value: 11 } }");

// Write a new row outside of the open transaction
// NOTE(review): presumably this is what breaks the lock taken by the read
// above (per the test name) - confirm
ExecSQL(server, sender, "UPSERT INTO `/Root/table` (key, value) VALUES (2, 22);");

// Continuing the transaction with a write is expected to fail with ABORTED
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleContinue(runtime, sessionId, txId, R"(
UPSERT INTO `/Root/table` (key, value) VALUES (3, 33);
SELECT key, value FROM `/Root/table` ORDER BY key;
)"),
"ERROR: ABORTED");

// The table is expected to live in exactly one shard
const auto shards = GetTableShards(server, sender, "/Root/table");
const auto tableId = ResolveTableId(server, sender, "/Root/table");
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);

// Check shard doesn't have open transactions
{
runtime.SendToPipe(shards.at(0), sender, new TEvDataShard::TEvGetOpenTxs(tableId.PathId));
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(sender);
UNIT_ASSERT_C(ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0));
}
}

}

} // namespace NKikimr
30 changes: 19 additions & 11 deletions ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,34 +146,42 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
}

if (guardLocks.LockTxId) {
auto abortLock = [&]() {
LOG_T("Operation " << *op << " (execute_kqp_data_tx) at " << tabletId
<< " aborting because it cannot acquire locks");

op->SetAbortedFlag();
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN);
return EExecutionStatus::Executed;
};

switch (DataShard.SysLocksTable().EnsureCurrentLock()) {
case EEnsureCurrentLock::Success:
// Lock is valid, we may continue with reads and side-effects
break;

case EEnsureCurrentLock::Broken:
// Lock is valid, but broken, we could abort early in some
// cases, but it doesn't affect correctness.
// cases, but it doesn't affect correctness. For write
// transactions we need to abort, since we may otherwise
// perform writes that are not attached to any lock.
if (!op->IsReadOnly()) {
return abortLock();
}
break;

case EEnsureCurrentLock::TooMany:
// Lock cannot be created, it's not necessarily a problem
// for read-only transactions, for non-readonly we need to
// abort;
if (op->IsReadOnly()) {
break;
if (!op->IsReadOnly()) {
return abortLock();
}

[[fallthrough]];
break;

case EEnsureCurrentLock::Abort:
// Lock cannot be created and we must abort
LOG_T("Operation " << *op << " (execute_kqp_data_tx) at " << tabletId
<< " aborting because it cannot acquire locks");

op->SetAbortedFlag();
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN);
return EExecutionStatus::Executed;
return abortLock();
}
}

Expand Down
20 changes: 13 additions & 7 deletions ydb/core/tx/datashard/execute_write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ class TExecuteWriteUnit : public TExecutionUnit {
}

if (guardLocks.LockTxId) {
auto abortLock = [&]() {
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Operation " << *op << " at " << tabletId << " aborting because it cannot acquire locks");
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN, "Operation is aborting because it cannot acquire locks");
return EExecutionStatus::Executed;
};

switch (DataShard.SysLocksTable().EnsureCurrentLock()) {
case EEnsureCurrentLock::Success:
// Lock is valid, we may continue with reads and side-effects
Expand All @@ -294,23 +300,23 @@ class TExecuteWriteUnit : public TExecutionUnit {
case EEnsureCurrentLock::Broken:
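// Lock is valid, but broken, we could abort early in some
// cases, but it doesn't affect correctness. For write
// transactions we need to abort, since we may otherwise
// perform writes that are not attached to any lock.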
if (!op->IsReadOnly()) {
return abortLock();
}
break;

case EEnsureCurrentLock::TooMany:
// Lock cannot be created, it's not necessarily a problem
// for read-only transactions, for non-readonly we need to
// abort;
if (op->IsReadOnly()) {
break;
if (!op->IsReadOnly()) {
return abortLock();
}

[[fallthrough]];
break;

case EEnsureCurrentLock::Abort:
// Lock cannot be created and we must abort
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Operation " << *op << " at " << tabletId << " aborting because it cannot acquire locks");
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN, "Operation is aborting because it cannot acquire locks");
return EExecutionStatus::Executed;
return abortLock();
}
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ SRCS(
datashard__cleanup_borrowed.cpp
datashard__cleanup_in_rs.cpp
datashard__cleanup_tx.cpp
datashard__cleanup_uncommitted.cpp
datashard__conditional_erase_rows.cpp
datashard__engine_host.cpp
datashard__engine_host.h
Expand Down

0 comments on commit 6d10a7a

Please sign in to comment.