Skip to content

Commit

Permalink
YQ-4071 Fix exception handling in topic session (#14055)
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Feb 3, 2025
1 parent 07ed9e7 commit e122885
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
void SubscribeOnNextEvent();
void SendToParsing(const std::vector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>& messages);
void SendData(TClientsInfo& info);
void FatalError(TStatus status);
void FatalError(const TStatus& status);
void ThrowFatalError(const TStatus& status);
void SendDataArrived(TClientsInfo& client);
void StopReadSession();
TString GetSessionId() const;
Expand Down Expand Up @@ -332,7 +333,8 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
IgnoreFunc(TEvRowDispatcher::TEvGetNextBatch);
IgnoreFunc(NFq::TEvRowDispatcher::TEvStartSession);
IgnoreFunc(NFq::TEvRowDispatcher::TEvStopSession);
IgnoreFunc(NFq::TEvPrivate::TEvSendStatistic);,
IgnoreFunc(NFq::TEvPrivate::TEvSendStatistic);
IgnoreFunc(NFq::TEvPrivate::TEvReconnectSession);,
ExceptionFunc(std::exception, HandleException)
)
};
Expand Down Expand Up @@ -567,7 +569,7 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TSessionClose
const TString message = TStringBuilder() << "Read session to topic \"" << Self.TopicPathPartition << "\" was closed";
LOG_ROW_DISPATCHER_DEBUG(message << ": " << ev.DebugString());

Self.FatalError(TStatus::Fail(
Self.ThrowFatalError(TStatus::Fail(
NYql::NDq::YdbStatusToDqStatus(static_cast<Ydb::StatusIds::StatusCode>(ev.GetStatus())),
NYdb::NAdapters::ToYqlIssues(ev.GetIssues())
).AddParentIssue(message));
Expand Down Expand Up @@ -782,7 +784,7 @@ void TTopicSession::RestartSessionIfOldestClient(const TClientsInfo& info) {
}
}

void TTopicSession::FatalError(TStatus status) {
void TTopicSession::FatalError(const TStatus& status) {
LOG_ROW_DISPATCHER_ERROR("FatalError: " << status.GetErrorMessage());

for (auto& [readActorId, info] : Clients) {
Expand All @@ -791,7 +793,11 @@ void TTopicSession::FatalError(TStatus status) {
}
StopReadSession();
Become(&TTopicSession::ErrorState);
ythrow yexception() << "FatalError: " << status.GetErrorMessage(); // To exit from current stack and call once PassAway() in HandleException().
}

void TTopicSession::ThrowFatalError(const TStatus& status) {
FatalError(status);
ythrow yexception() << "FatalError: " << status.GetErrorMessage();
}

void TTopicSession::SendSessionError(TActorId readActorId, TStatus status) {
Expand Down

0 comments on commit e122885

Please sign in to comment.