From e122885d71d6c413432e4e7d74312860a440358d Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Mon, 3 Feb 2025 08:31:03 +0300 Subject: [PATCH] YQ-4071 Fix exception handling in topic session (#14055) --- .../fq/libs/row_dispatcher/topic_session.cpp | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 5420ed0de64e..684f54ed5d13 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -288,7 +288,8 @@ class TTopicSession : public TActorBootstrapped { void SubscribeOnNextEvent(); void SendToParsing(const std::vector& messages); void SendData(TClientsInfo& info); - void FatalError(TStatus status); + void FatalError(const TStatus& status); + void ThrowFatalError(const TStatus& status); void SendDataArrived(TClientsInfo& client); void StopReadSession(); TString GetSessionId() const; @@ -332,7 +333,8 @@ class TTopicSession : public TActorBootstrapped { IgnoreFunc(TEvRowDispatcher::TEvGetNextBatch); IgnoreFunc(NFq::TEvRowDispatcher::TEvStartSession); IgnoreFunc(NFq::TEvRowDispatcher::TEvStopSession); - IgnoreFunc(NFq::TEvPrivate::TEvSendStatistic);, + IgnoreFunc(NFq::TEvPrivate::TEvSendStatistic); + IgnoreFunc(NFq::TEvPrivate::TEvReconnectSession);, ExceptionFunc(std::exception, HandleException) ) }; @@ -567,7 +569,7 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TSessionClose const TString message = TStringBuilder() << "Read session to topic \"" << Self.TopicPathPartition << "\" was closed"; LOG_ROW_DISPATCHER_DEBUG(message << ": " << ev.DebugString()); - Self.FatalError(TStatus::Fail( + Self.ThrowFatalError(TStatus::Fail( NYql::NDq::YdbStatusToDqStatus(static_cast(ev.GetStatus())), NYdb::NAdapters::ToYqlIssues(ev.GetIssues()) ).AddParentIssue(message)); @@ -782,7 +784,7 @@ void TTopicSession::RestartSessionIfOldestClient(const TClientsInfo& info) { } } -void TTopicSession::FatalError(TStatus status) { +void TTopicSession::FatalError(const TStatus& status) { LOG_ROW_DISPATCHER_ERROR("FatalError: " << status.GetErrorMessage()); for (auto& [readActorId, info] : Clients) { @@ -791,7 +793,11 @@ void TTopicSession::FatalError(TStatus status) { } StopReadSession(); Become(&TTopicSession::ErrorState); - ythrow yexception() << "FatalError: " << status.GetErrorMessage(); // To exit from current stack and call once PassAway() in HandleException(). +} + +void TTopicSession::ThrowFatalError(const TStatus& status) { + FatalError(status); + ythrow yexception() << "FatalError: " << status.GetErrorMessage(); } void TTopicSession::SendSessionError(TActorId readActorId, TStatus status) {