Skip to content

Commit

Permalink
YQ-3871 Restart session on query stopping (if query read historical d…
Browse files Browse the repository at this point in the history
…ata) (#12135)
  • Loading branch information
kardymonds authored Jan 16, 2025
1 parent 4c4b273 commit 3fde25c
Show file tree
Hide file tree
Showing 8 changed files with 494 additions and 130 deletions.
40 changes: 39 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
void SendStatisticToRowDispatcher();
void SendSessionError(TActorId readActorId, TStatus status);
bool CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev);
void RestartSessionIfOldestClient(const TClientsInfo& info);

private:

Expand Down Expand Up @@ -750,10 +751,12 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {

auto it = Clients.find(ev->Sender);
if (it == Clients.end()) {
LOG_ROW_DISPATCHER_DEBUG("Wrong ClientSettings");
LOG_ROW_DISPATCHER_WARN("Ignore TEvStopSession from " << ev->Sender << ", no client");
return;
}
auto& info = *it->second;
RestartSessionIfOldestClient(info);

UnreadBytes -= info.UnreadBytes;
Metrics.UnreadBytes->Sub(info.UnreadBytes);
if (const auto formatIt = FormatHandlers.find(info.HandlerSettings); formatIt != FormatHandlers.end()) {
Expand All @@ -769,6 +772,41 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
SubscribeOnNextEvent();
}

void TTopicSession::RestartSessionIfOldestClient(const TClientsInfo& info) {
// if we read historical data (because of this client), then we restart the session.

if (!ReadSession || !info.NextMessageOffset) {
return;
}
TMaybe<ui64> minMessageOffset;
for (auto& [readActorId, clientPtr] : Clients) {
if (info.ReadActorId == readActorId || !clientPtr->NextMessageOffset) {
continue;
}
if (!minMessageOffset) {
minMessageOffset = clientPtr->NextMessageOffset;
continue;
}
minMessageOffset = std::min(minMessageOffset, clientPtr->NextMessageOffset);
}
if (!minMessageOffset) {
return;
}

if (info.NextMessageOffset >= minMessageOffset) {
return;
}
LOG_ROW_DISPATCHER_INFO("Client (on StopSession) has less offset (" << info.NextMessageOffset << ") than others clients (" << minMessageOffset << "), stop (restart) topic session");
Metrics.RestartSessionByOffsets->Inc();
++RestartSessionByOffsets;
info.RestartSessionByOffsetsByQuery->Inc();
StopReadSession();

if (!ReadSession) {
Schedule(TDuration::Seconds(Config.GetTimeoutBeforeStartSessionSec()), new NFq::TEvPrivate::TEvCreateSession());
}
}

void TTopicSession::FatalError(TStatus status) {
LOG_ROW_DISPATCHER_ERROR("FatalError: " << status.GetErrorMessage());

Expand Down
Loading

0 comments on commit 3fde25c

Please sign in to comment.