Skip to content

Commit

Permalink
YQ: Added force refresh by reconnects in topic_session (#14058)
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Feb 3, 2025
1 parent e122885 commit 9d9a33e
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,12 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
return statistics;
}

void ForceRefresh() override {
if (Parser) {
Parser->Refresh(true);
}
}

protected:
NActors::TActorId GetSelfId() const override {
return SelfId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class ITopicFormatHandler : public TNonCopyable {
virtual bool HasClients() const = 0;

virtual TFormatHandlerStatistic GetStatistics() = 0;
virtual void ForceRefresh() = 0;

protected:
virtual NActors::TActorId GetSelfId() const = 0;
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
TMaybe<ui64> GetOffset(const NFq::NRowDispatcherProto::TEvStartSession& settings);
void SendSessionError(TActorId readActorId, TStatus status);
void RestartSessionIfOldestClient(const TClientsInfo& info);
void RefreshParsers();

private:

Expand Down Expand Up @@ -501,6 +502,7 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvReconnectSession::TPtr&) {
LOG_ROW_DISPATCHER_DEBUG("Reconnect topic session, " << TopicPathPartition
<< ", StartingMessageTimestamp " << minTime
<< ", BufferSize " << BufferSize << ", WithoutConsumer " << Config.GetWithoutConsumer());
RefreshParsers();
StopReadSession();
CreateTopicSession();
Schedule(ReconnectPeriod, new NFq::TEvPrivate::TEvReconnectSession());
Expand Down Expand Up @@ -777,6 +779,7 @@ void TTopicSession::RestartSessionIfOldestClient(const TClientsInfo& info) {
Metrics.RestartSessionByOffsets->Inc();
++RestartSessionByOffsets;
info.RestartSessionByOffsetsByQuery->Inc();
RefreshParsers();
StopReadSession();

if (!ReadSession) {
Expand Down Expand Up @@ -910,6 +913,12 @@ TMaybe<ui64> TTopicSession::GetOffset(const NFq::NRowDispatcherProto::TEvStartSe
return Nothing();
}

void TTopicSession::RefreshParsers() {
for (const auto& [_, formatHandler] : FormatHandlers) {
formatHandler->ForceRefresh();
}
}

} // anonymous namespace

////////////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit 9d9a33e

Please sign in to comment.