Skip to content

Commit

Permalink
Don't migrate uninitialized in-memory vars between datashards (#13235)
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Jan 10, 2025
1 parent b4275e1 commit d8812f9
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 9 deletions.
10 changes: 2 additions & 8 deletions ydb/core/tx/datashard/create_table_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,8 @@ EExecutionStatus TCreateTableUnit::Execute(TOperation::TPtr op,
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
op->Result()->SetStepOrderId(op->GetStepOrder().ToPair());

if (DataShard.GetState() == TShardState::WaitScheme) {
txc.DB.NoMoreReadsForTx();
DataShard.SetPersistState(TShardState::Ready, txc);
// We could perform snapshot reads after becoming ready
// Make sure older versions restore mediator state in that case
DataShard.PersistUnprotectedReadsEnabled(txc);
DataShard.SendRegistrationRequestTimeCast(ctx);
}
txc.DB.NoMoreReadsForTx();
DataShard.OnTableCreated(txc, ctx);

return EExecutionStatus::DelayCompleteNoMoreRestarts;
}
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2863,6 +2863,7 @@ void TDataShard::FinishMediatorStateRestore(TTransactionContext& txc, ui64 readS
}

MediatorStateWaiting = false;
InMemoryVarsRestored = true;

// Resend all waiting messages
TVector<THolder<IEventHandle>> msgs;
Expand Down Expand Up @@ -4746,6 +4747,18 @@ void TDataShard::Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev, const T
}
}

void TDataShard::OnTableCreated(TTransactionContext &txc, const TActorContext &ctx) {
if (GetState() == TShardState::WaitScheme) {
SetPersistState(TShardState::Ready, txc);
// A newly created table doesn't need to wait for mediator state
// restore and initial values can be trusted.
InMemoryVarsRestored = true;
// We could perform snapshot reads after becoming ready
// Make sure older versions restore mediator state in that case
PersistUnprotectedReadsEnabled(txc);
SendRegistrationRequestTimeCast(ctx);
}
}

} // NDataShard

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2147,6 +2147,8 @@ class TDataShard
return LogThrottlers[type];
};

void OnTableCreated(TTransactionContext& txc, const TActorContext& ctx);

private:
///
class TLoanReturnTracker {
Expand Down
149 changes: 149 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4314,9 +4314,16 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
Blocked.clear();
}

void Wait() {
Runtime.WaitFor("blocked TEvSplit", [&]{ return Blocked.size() > 0; });
}

private:
void Process(TEvDataShard::TEvSplit::TPtr& ev) {
Cerr << "... blocking TEvSplit" << Endl;
for (const auto& dstRange : ev->Get()->Record.GetSplitDescription().GetDestinationRanges()) {
DstShards.push_back(dstRange.GetTabletID());
}
Blocked.emplace_back(ev.Release());
}

Expand All @@ -4325,6 +4332,9 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
const ui32 Node1;
std::vector<std::unique_ptr<IEventHandle>> Blocked;
TTestActorRuntime::TEventObserverHolder Observer;

public:
std::vector<ui64> DstShards;
};

Y_UNIT_TEST(RepeatableReadAfterSplitRace) {
Expand Down Expand Up @@ -5709,6 +5719,145 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"{ items { uint32_value: 2 } items { uint32_value: 22 } }");
}

Y_UNIT_TEST(DelayedWriteReadableAfterSplitAndReboot) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetNodeCount(2)
.SetUseRealThreads(false)
.SetDomainPlanResolution(100);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

// Don't allow granular timecast side-stepping mediator time hacks in this test
TBlockEvents<TEvMediatorTimecast::TEvGranularUpdate> blockGranularUpdate(runtime);

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_TRACE);

InitRoot(server, sender);

TCreateTabletNodePinning createTabletNodePinning(runtime);

// Create table-1 at node 1
createTabletNodePinning.SetNodeIndexes({ 0 });
CreateShardedTable(server, sender, "/Root", "table-1", 1);
auto shards1 = GetTableShards(server, sender, "/Root/table-1");

// Create table-2 at node 2
createTabletNodePinning.SetNodeIndexes({ 1 });
CreateShardedTable(server, sender, "/Root", "table-2", 1);

// Insert initial values
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20);"));

TSplitSrcBlocking splitSrcBlocking(runtime);
TMediatorTimeCastBlocking mediatorTimeCastBlocking(runtime);

// We need to make a snapshot read to force unprotected reads
// This will also ensure both nodes have an updated mediator time cast
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, Q_(R"(
SELECT key, value FROM `/Root/table-1`
UNION ALL
SELECT key, value FROM `/Root/table-2`
ORDER BY key;
)")),
"{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
"{ items { uint32_value: 2 } items { uint32_value: 20 } }");

// One more upsert to table-2, this will bump mediator time past the snapshot
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (3, 30);"));

// Wait for the next step, we will expect to read at that step
ui64 step = mediatorTimeCastBlocking.WaitNextStep();
Cerr << "... expecting next read at step " << step << Endl;

// We won't allow node 2 to go past that snapshot
mediatorTimeCastBlocking.SetMaxAllowedStep(1, step);

// Start a snapshot read from table-1
// This will run at node 1 shard where mediator time is recent
Cerr << "... reading from table-1" << Endl;
TString sessionId, txId;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
SELECT key, value FROM `/Root/table-1`
ORDER BY key;
)")),
"{ items { uint32_value: 1 } items { uint32_value: 10 } }");

// Start split of table-1 at key 10
auto splitSender = runtime.AllocateEdgeActor();
SetSplitMergePartCountLimit(server->GetRuntime(), -1);
Cerr << "... starting split of table-1" << Endl;
ui64 splitTxId = AsyncSplitTable(server, splitSender, "/Root/table-1", shards1.at(0), 10);

// Perform an immediate write, which will happen after the above snapshot
// We also wait for the result to make sure mediator time advances at node 1
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (4, 40);
)")),
"<empty>");

// Reboot destination shards
splitSrcBlocking.Wait();
for (ui64 dstShard : splitSrcBlocking.DstShards) {
Cerr << "... rebooting shard " << dstShard << Endl;
RebootTablet(runtime, dstShard, sender);
}

// Unblock split at src
splitSrcBlocking.Unblock();

// Wait until split finishes, so we can continue working with the new shards
Cerr << "... waiting for split to finish" << Endl;
WaitTxNotification(server, splitSender, splitTxId);

// Start an immediate read from the new left shard of table-1
TString readSessionId = CreateSessionRPC(runtime);
Cerr << "... starting immediate read from table-1" << Endl;
auto readFuture = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
SELECT key, value FROM `/Root/table-1`
WHERE key <= 5
ORDER BY key;
)"), readSessionId, "", /* commit */ true));

// We cannot wait for it to finish (bug may be fixed)
// So we sleep for several seconds instead
Cerr << "... sleeping for 2 seconds" << Endl;
runtime.SimulateSleep(TDuration::Seconds(2));

// Unblock mediator timecast at node 2
mediatorTimeCastBlocking.Unblock(1);

// Wait until read finishes, we must observe previously inserted row
Cerr << "... waiting for table-1 read to finish" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
FormatResult(AwaitResponse(runtime, std::move(readFuture))),
"{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
"{ items { uint32_value: 4 } items { uint32_value: 40 } }");

// Repeat read in a previous tx, this read must be repeatable
// In other words we must not observe the new 4/40 row
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleContinue(runtime, sessionId, txId, Q_(R"(
SELECT key, value FROM `/Root/table-1`
UNION ALL
SELECT key, value FROM `/Root/table-2`
ORDER BY key;
)")),
"{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
"{ items { uint32_value: 2 } items { uint32_value: 20 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 30 } }");
}

}

} // namespace NKikimr
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/memory_state_migration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ TDataShard::TPreservedInMemoryState TDataShard::PreserveInMemoryState() {
};

// Serialize important in-memory vars
{
if (InMemoryVarsRestored) {
auto* vars = state->MutableVars();
SnapshotManager.GetImmediateWriteEdge().ToProto(vars->MutableImmediateWriteEdge());
SnapshotManager.GetImmediateWriteEdgeReplied().ToProto(vars->MutableImmediateWriteEdgeReplied());
Expand Down

0 comments on commit d8812f9

Please sign in to comment.