Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

24-3-13-hotfix: Fix uncommitted changes leak and clean them up on startup #13487

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ void TDataShard::SwitchToWork(const TActorContext &ctx) {
NotifySchemeshard(ctx);
CheckInitiateBorrowedPartsReturn(ctx);
CheckStateChange(ctx);
CleanupUncommitted(ctx);
}

void TDataShard::SyncConfig() {
Expand Down
88 changes: 88 additions & 0 deletions ydb/core/tx/datashard/datashard__cleanup_uncommitted.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#include "datashard_impl.h"

namespace NKikimr::NDataShard {

using namespace NTabletFlatExecutor;

class TDataShard::TTxCleanupUncommitted : public TTransactionBase<TDataShard> {
public:
TTxCleanupUncommitted(TDataShard* self)
: TTransactionBase(self)
{}

bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
if (Self->State != TShardState::Ready) {
// We need to be very careful about cleaning up uncommitted changes
// Avoid mistakes by waiting until shard restarts in a Ready state
return true;
}

size_t removed = 0;
for (const auto& pr : Self->TableInfos) {
if (pr.second->IsReplicated()) {
// Replicated tables use uncommitted changes for replication
// Since we don't track them we cannot know whether they leaked or not
continue;
}

auto localTid = pr.second->LocalTid;
if (!txc.DB.GetScheme().GetTableInfo(localTid)) {
// Note: this check is likely not needed, since all user tables
// must be present in the Ready state, but make sure we don't
// trip since this code always runs at startup.
continue;
}

auto openTxs = txc.DB.GetOpenTxs(localTid);
for (ui64 txId : openTxs) {
if (Self->SysLocksTable().GetLocks().contains(txId)) {
// Changes are associated with a known lock
continue;
}
if (Self->GetVolatileTxManager().FindByCommitTxId(txId)) {
// Changes are associated with a known volatile tx
continue;
}

// Changes are neither committed nor removed and are not tracked
if (removed >= 1000) {
// Avoid removing more than 1k transactions per transaction
Reschedule = true;
break;
}

// Remove otherwise untracked changes
txc.DB.RemoveTx(localTid, txId);
++removed;
}

if (Reschedule) {
break;
}
}

if (removed > 0) {
LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD,
"DataShard " << Self->TabletID() << " removed " << removed << " untracked uncommitted changes");
}

return true;
}

void Complete(const TActorContext& ctx) override {
if (Reschedule) {
Self->CleanupUncommitted(ctx);
}
}

private:
bool Reschedule = false;
};

void TDataShard::CleanupUncommitted(const TActorContext& ctx) {
if (State == TShardState::Ready) {
Execute(new TTxCleanupUncommitted(this), ctx);
}
}

} // namespace NKikimr::NDataShard
4 changes: 4 additions & 0 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ class TDataShard
class TTxCdcStreamEmitHeartbeats;
class TTxUpdateFollowerReadEdge;
class TTxRemoveSchemaSnapshots;
class TTxCleanupUncommitted;

template <typename T> friend class TTxDirectBase;
class TTxUploadRows;
Expand Down Expand Up @@ -1422,6 +1423,9 @@ class TDataShard
void SwitchToWork(const TActorContext &ctx);
void SyncConfig();

// Cleanup for bug https://github.com/ydb-platform/ydb/issues/13387
void CleanupUncommitted(const TActorContext &ctx);

TMaybe<TInstant> GetTxPlanStartTimeAndCleanup(ui64 step);

struct TPersistentTablet;
Expand Down
54 changes: 54 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5149,6 +5149,60 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
}
}

Y_UNIT_TEST(BrokenLockChangesDontLeak) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetDomainPlanResolution(100);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);

InitRoot(server, sender);

TDisableDataShardLogBatching disableDataShardLogBatching;

UNIT_ASSERT_VALUES_EQUAL(
KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table` (key Uint32, value Uint32, PRIMARY KEY (key));
)"),
"SUCCESS");

ExecSQL(server, sender, "UPSERT INTO `/Root/table` (key, value) VALUES (1, 11);");

TString sessionId, txId;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleBegin(runtime, sessionId, txId, R"(
SELECT key, value FROM `/Root/table`
ORDER BY key;
)"),
"{ items { uint32_value: 1 } items { uint32_value: 11 } }");

ExecSQL(server, sender, "UPSERT INTO `/Root/table` (key, value) VALUES (2, 22);");

UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleContinue(runtime, sessionId, txId, R"(
UPSERT INTO `/Root/table` (key, value) VALUES (3, 33);
SELECT key, value FROM `/Root/table` ORDER BY key;
)"),
"ERROR: ABORTED");

const auto shards = GetTableShards(server, sender, "/Root/table");
const auto tableId = ResolveTableId(server, sender, "/Root/table");
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);

// Check shard doesn't have open transactions
{
runtime.SendToPipe(shards.at(0), sender, new TEvDataShard::TEvGetOpenTxs(tableId.PathId));
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(sender);
UNIT_ASSERT_C(ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0));
}
}

}

} // namespace NKikimr
30 changes: 19 additions & 11 deletions ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,34 +146,42 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
}

if (guardLocks.LockTxId) {
auto abortLock = [&]() {
LOG_T("Operation " << *op << " (execute_kqp_data_tx) at " << tabletId
<< " aborting because it cannot acquire locks");

op->SetAbortedFlag();
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN);
return EExecutionStatus::Executed;
};

switch (DataShard.SysLocksTable().EnsureCurrentLock()) {
case EEnsureCurrentLock::Success:
// Lock is valid, we may continue with reads and side-effects
break;

case EEnsureCurrentLock::Broken:
// Lock is valid, but broken, we could abort early in some
// cases, but it doesn't affect correctness.
// cases, but it doesn't affect correctness. For write
// transactions we need to abort, since we may otherwise
// perform writes that are not attached to any lock.
if (!op->IsReadOnly()) {
return abortLock();
}
break;

case EEnsureCurrentLock::TooMany:
// Lock cannot be created, it's not necessarily a problem
// for read-only transactions, for non-readonly we need to
// abort;
if (op->IsReadOnly()) {
break;
if (!op->IsReadOnly()) {
return abortLock();
}

[[fallthrough]];
break;

case EEnsureCurrentLock::Abort:
// Lock cannot be created and we must abort
LOG_T("Operation " << *op << " (execute_kqp_data_tx) at " << tabletId
<< " aborting because it cannot acquire locks");

op->SetAbortedFlag();
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN);
return EExecutionStatus::Executed;
return abortLock();
}
}

Expand Down
20 changes: 13 additions & 7 deletions ydb/core/tx/datashard/execute_write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ class TExecuteWriteUnit : public TExecutionUnit {
}

if (guardLocks.LockTxId) {
auto abortLock = [&]() {
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Operation " << *op << " at " << tabletId << " aborting because it cannot acquire locks");
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN, "Operation is aborting because it cannot acquire locks");
return EExecutionStatus::Executed;
};

switch (DataShard.SysLocksTable().EnsureCurrentLock()) {
case EEnsureCurrentLock::Success:
// Lock is valid, we may continue with reads and side-effects
Expand All @@ -294,23 +300,23 @@ class TExecuteWriteUnit : public TExecutionUnit {
case EEnsureCurrentLock::Broken:
// Lock is valid, but broken, we could abort early in some
// cases, but it doesn't affect correctness.
if (!op->IsReadOnly()) {
return abortLock();
}
break;

case EEnsureCurrentLock::TooMany:
// Lock cannot be created, it's not necessarily a problem
// for read-only transactions, for non-readonly we need to
// abort;
if (op->IsReadOnly()) {
break;
if (!op->IsReadOnly()) {
return abortLock();
}

[[fallthrough]];
break;

case EEnsureCurrentLock::Abort:
// Lock cannot be created and we must abort
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Operation " << *op << " at " << tabletId << " aborting because it cannot acquire locks");
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN, "Operation is aborting because it cannot acquire locks");
return EExecutionStatus::Executed;
return abortLock();
}
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ SRCS(
datashard__cleanup_borrowed.cpp
datashard__cleanup_in_rs.cpp
datashard__cleanup_tx.cpp
datashard__cleanup_uncommitted.cpp
datashard__conditional_erase_rows.cpp
datashard__engine_host.cpp
datashard__engine_host.h
Expand Down
Loading