diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp index 85cf11c99eeb..270589f0f830 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp @@ -457,6 +457,12 @@ class TTopicFormatHandler : public NActors::TActor, public return statistics; } + void ForceRefresh() override { + if (Parser) { + Parser->Refresh(true); + } + } + protected: NActors::TActorId GetSelfId() const override { return SelfId(); diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h index 6e2fbb82d1a0..8e0efb13dc94 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h @@ -55,6 +55,7 @@ class ITopicFormatHandler : public TNonCopyable { virtual bool HasClients() const = 0; virtual TFormatHandlerStatistic GetStatistics() = 0; + virtual void ForceRefresh() = 0; protected: virtual NActors::TActorId GetSelfId() const = 0; diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 684f54ed5d13..81b45ee550b6 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -311,6 +311,7 @@ class TTopicSession : public TActorBootstrapped { TMaybe GetOffset(const NFq::NRowDispatcherProto::TEvStartSession& settings); void SendSessionError(TActorId readActorId, TStatus status); void RestartSessionIfOldestClient(const TClientsInfo& info); + void RefreshParsers(); private: @@ -501,6 +502,7 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvReconnectSession::TPtr&) { LOG_ROW_DISPATCHER_DEBUG("Reconnect topic session, " << TopicPathPartition << ", StartingMessageTimestamp " << minTime << ", BufferSize " << BufferSize << ", WithoutConsumer " << Config.GetWithoutConsumer()); + RefreshParsers(); StopReadSession(); CreateTopicSession(); Schedule(ReconnectPeriod, new NFq::TEvPrivate::TEvReconnectSession()); @@ -777,6 +779,7 @@ void TTopicSession::RestartSessionIfOldestClient(const TClientsInfo& info) { Metrics.RestartSessionByOffsets->Inc(); ++RestartSessionByOffsets; info.RestartSessionByOffsetsByQuery->Inc(); + RefreshParsers(); StopReadSession(); if (!ReadSession) { @@ -910,6 +913,12 @@ TMaybe TTopicSession::GetOffset(const NFq::NRowDispatcherProto::TEvStartSe return Nothing(); } +void TTopicSession::RefreshParsers() { + for (const auto& [_, formatHandler] : FormatHandlers) { + formatHandler->ForceRefresh(); + } +} + } // anonymous namespace ////////////////////////////////////////////////////////////////////////////////