From d8812f9fe2730434e59af94b4ad3dda98addb5ca Mon Sep 17 00:00:00 2001 From: Aleksei Borzenkov Date: Fri, 10 Jan 2025 10:03:55 +0300 Subject: [PATCH] Don't migrate uninitialized in-memory vars between datashards (#13235) --- ydb/core/tx/datashard/create_table_unit.cpp | 10 +- ydb/core/tx/datashard/datashard.cpp | 13 ++ ydb/core/tx/datashard/datashard_impl.h | 2 + .../tx/datashard/datashard_ut_snapshot.cpp | 149 ++++++++++++++++++ .../tx/datashard/memory_state_migration.cpp | 2 +- 5 files changed, 167 insertions(+), 9 deletions(-) diff --git a/ydb/core/tx/datashard/create_table_unit.cpp b/ydb/core/tx/datashard/create_table_unit.cpp index 8c8a72888e10..cc7383f6ba63 100644 --- a/ydb/core/tx/datashard/create_table_unit.cpp +++ b/ydb/core/tx/datashard/create_table_unit.cpp @@ -80,14 +80,8 @@ EExecutionStatus TCreateTableUnit::Execute(TOperation::TPtr op, BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); op->Result()->SetStepOrderId(op->GetStepOrder().ToPair()); - if (DataShard.GetState() == TShardState::WaitScheme) { - txc.DB.NoMoreReadsForTx(); - DataShard.SetPersistState(TShardState::Ready, txc); - // We could perform snapshot reads after becoming ready - // Make sure older versions restore mediator state in that case - DataShard.PersistUnprotectedReadsEnabled(txc); - DataShard.SendRegistrationRequestTimeCast(ctx); - } + txc.DB.NoMoreReadsForTx(); + DataShard.OnTableCreated(txc, ctx); return EExecutionStatus::DelayCompleteNoMoreRestarts; } diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 8c9845346bce..04afe57f9479 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -2863,6 +2863,7 @@ void TDataShard::FinishMediatorStateRestore(TTransactionContext& txc, ui64 readS } MediatorStateWaiting = false; + InMemoryVarsRestored = true; // Resend all waiting messages TVector> msgs; @@ -4746,6 +4747,18 @@ void TDataShard::Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev, const T } } +void TDataShard::OnTableCreated(TTransactionContext &txc, const TActorContext &ctx) { + if (GetState() == TShardState::WaitScheme) { + SetPersistState(TShardState::Ready, txc); + // A newly created table doesn't need to wait for mediator state + // restore and initial values can be trusted. + InMemoryVarsRestored = true; + // We could perform snapshot reads after becoming ready + // Make sure older versions restore mediator state in that case + PersistUnprotectedReadsEnabled(txc); + SendRegistrationRequestTimeCast(ctx); + } +} } // NDataShard diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index f7130708990a..56fb693a8f5f 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -2147,6 +2147,8 @@ class TDataShard return LogThrottlers[type]; }; + void OnTableCreated(TTransactionContext& txc, const TActorContext& ctx); + private: /// class TLoanReturnTracker { diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index 6353c7b0c3ee..1ce4b33de7c4 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -4314,9 +4314,16 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Blocked.clear(); } + void Wait() { + Runtime.WaitFor("blocked TEvSplit", [&]{ return Blocked.size() > 0; }); + } + private: void Process(TEvDataShard::TEvSplit::TPtr& ev) { Cerr << "... blocking TEvSplit" << Endl; + for (const auto& dstRange : ev->Get()->Record.GetSplitDescription().GetDestinationRanges()) { + DstShards.push_back(dstRange.GetTabletID()); + } Blocked.emplace_back(ev.Release()); } @@ -4325,6 +4332,9 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { const ui32 Node1; std::vector> Blocked; TTestActorRuntime::TEventObserverHolder Observer; + + public: + std::vector DstShards; }; Y_UNIT_TEST(RepeatableReadAfterSplitRace) { @@ -5709,6 +5719,145 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { "{ items { uint32_value: 2 } items { uint32_value: 22 } }"); } + Y_UNIT_TEST(DelayedWriteReadableAfterSplitAndReboot) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetNodeCount(2) + .SetUseRealThreads(false) + .SetDomainPlanResolution(100); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + // Don't allow granular timecast side-stepping mediator time hacks in this test + TBlockEvents blockGranularUpdate(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_TRACE); + + InitRoot(server, sender); + + TCreateTabletNodePinning createTabletNodePinning(runtime); + + // Create table-1 at node 1 + createTabletNodePinning.SetNodeIndexes({ 0 }); + CreateShardedTable(server, sender, "/Root", "table-1", 1); + auto shards1 = GetTableShards(server, sender, "/Root/table-1"); + + // Create table-2 at node 2 + createTabletNodePinning.SetNodeIndexes({ 1 }); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + // Insert initial values + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20);")); + + TSplitSrcBlocking splitSrcBlocking(runtime); + TMediatorTimeCastBlocking mediatorTimeCastBlocking(runtime); + + // We need to make a snapshot read to force unprotected reads + // This will also ensure both nodes have an updated mediator time cast + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + UNION ALL + SELECT key, value FROM `/Root/table-2` + ORDER BY key; + )")), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 20 } }"); + + // One more upsert to table-2, this will bump mediator time past the snapshot + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (3, 30);")); + + // Wait for the next step, we will expect to read at that step + ui64 step = mediatorTimeCastBlocking.WaitNextStep(); + Cerr << "... expecting next read at step " << step << Endl; + + // We won't allow node 2 to go past that snapshot + mediatorTimeCastBlocking.SetMaxAllowedStep(1, step); + + // Start a snapshot read from table-1 + // This will run at node 1 shard where mediator time is recent + Cerr << "... reading from table-1" << Endl; + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key; + )")), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }"); + + // Start split of table-1 at key 10 + auto splitSender = runtime.AllocateEdgeActor(); + SetSplitMergePartCountLimit(server->GetRuntime(), -1); + Cerr << "... starting split of table-1" << Endl; + ui64 splitTxId = AsyncSplitTable(server, splitSender, "/Root/table-1", shards1.at(0), 10); + + // Perform an immediate write, which will happen after the above snapshot + // We also wait for the result to make sure mediator time advances at node 1 + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (4, 40); + )")), + ""); + + // Reboot destination shards + splitSrcBlocking.Wait(); + for (ui64 dstShard : splitSrcBlocking.DstShards) { + Cerr << "... rebooting shard " << dstShard << Endl; + RebootTablet(runtime, dstShard, sender); + } + + // Unblock split at src + splitSrcBlocking.Unblock(); + + // Wait until split finishes, so we can continue working with the new shards + Cerr << "... waiting for split to finish" << Endl; + WaitTxNotification(server, splitSender, splitTxId); + + // Start an immediate read from the new left shard of table-1 + TString readSessionId = CreateSessionRPC(runtime); + Cerr << "... starting immediate read from table-1" << Endl; + auto readFuture = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key <= 5 + ORDER BY key; + )"), readSessionId, "", /* commit */ true)); + + // We cannot wait for it to finish (bug may be fixed) + // So we sleep for several seconds instead + Cerr << "... sleeping for 2 seconds" << Endl; + runtime.SimulateSleep(TDuration::Seconds(2)); + + // Unblock mediator timecast at node 2 + mediatorTimeCastBlocking.Unblock(1); + + // Wait until read finishes, we must observe previously inserted row + Cerr << "... waiting for table-1 read to finish" << Endl; + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(AwaitResponse(runtime, std::move(readFuture))), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }, " + "{ items { uint32_value: 4 } items { uint32_value: 40 } }"); + + // Repeat read in a previous tx, this read must be repeatable + // In other words we must not observe the new 4/40 row + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleContinue(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + UNION ALL + SELECT key, value FROM `/Root/table-2` + ORDER BY key; + )")), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 20 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 30 } }"); + } + } } // namespace NKikimr diff --git a/ydb/core/tx/datashard/memory_state_migration.cpp b/ydb/core/tx/datashard/memory_state_migration.cpp index 826031acc31e..49ea8f109ae7 100644 --- a/ydb/core/tx/datashard/memory_state_migration.cpp +++ b/ydb/core/tx/datashard/memory_state_migration.cpp @@ -560,7 +560,7 @@ TDataShard::TPreservedInMemoryState TDataShard::PreserveInMemoryState() { }; // Serialize important in-memory vars - { + if (InMemoryVarsRestored) { auto* vars = state->MutableVars(); SnapshotManager.GetImmediateWriteEdge().ToProto(vars->MutableImmediateWriteEdge()); SnapshotManager.GetImmediateWriteEdgeReplied().ToProto(vars->MutableImmediateWriteEdgeReplied());