Skip to content

Commit

Permalink
SQS: Fix TimeoutCookie_ leak (#11762)
Browse files Browse the repository at this point in the history
  • Loading branch information
qyryq authored Nov 20, 2024
1 parent 8bc06e6 commit 0cc149b
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 9 deletions.
15 changes: 8 additions & 7 deletions ydb/core/ymq/actor/action.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,15 @@ class TActionActor
DoBootstrap();
}

void Bootstrap(const NActors::TActorContext&) {
void Bootstrap(const NActors::TActorContext&) {
#define SQS_REQUEST_CASE(action) \
const auto& request = SourceSqsRequest_.Y_CAT(Get, action)(); \
auto response = Response_.Y_CAT(Mutable, action)(); \
FillAuthInformation(request); \
response->SetRequestId(RequestId_);

SQS_SWITCH_REQUEST_CUSTOM(SourceSqsRequest_, ENUMERATE_ALL_ACTIONS, Y_ABORT_UNLESS(false));
#undef SQS_REQUEST_CASE
#undef SQS_REQUEST_CASE

RLOG_SQS_DEBUG("Request started. Actor: " << this->SelfId()); // log new request id
StartTs_ = TActivationContext::Now();
Expand All @@ -130,6 +130,7 @@ class TActionActor

// Set timeout
if (cfg.GetRequestTimeoutMs()) {
TimeoutCookie_.Reset(ISchedulerCookie::Make2Way());
this->Schedule(TDuration::MilliSeconds(cfg.GetRequestTimeoutMs()), new TEvWakeup(REQUEST_TIMEOUT_WAKEUP_TAG), TimeoutCookie_.Get());
}

Expand Down Expand Up @@ -349,7 +350,7 @@ class TActionActor
RESPONSE_BATCH_CASE(SendMessageBatch)
RESPONSE_CASE(SetQueueAttributes)
RESPONSE_CASE(ListDeadLetterSourceQueues)
RESPONSE_CASE(CountQueues)
RESPONSE_CASE(CountQueues)
case NKikimrClient::TSqsResponse::kDeleteQueueBatch:
case NKikimrClient::TSqsResponse::kGetQueueAttributesBatch:
case NKikimrClient::TSqsResponse::kPurgeQueueBatch:
Expand Down Expand Up @@ -382,7 +383,7 @@ class TActionActor
);
}

protected:
protected:
template <class TResponse>
void AuditLogEntry(const TResponse& response, const TString& requestId, const TError* error = nullptr) {
if (!error && response.HasError()) {
Expand Down Expand Up @@ -555,7 +556,7 @@ class TActionActor
UserName_ = request.GetAuth().GetUserName();
FolderId_ = request.GetAuth().GetFolderId();
UserSID_ = request.GetAuth().GetUserSID();

if (IsCloud() && !FolderId_) {
auto items = ParseCloudSecurityToken(SecurityToken_);
UserName_ = std::get<0>(items);
Expand Down Expand Up @@ -885,7 +886,7 @@ class TActionActor
TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions> QuoterResources_;
bool NeedReportSqsActionInflyCounter = false;
bool NeedReportYmqActionInflyCounter = false;
TSchedulerCookieHolder TimeoutCookie_ = ISchedulerCookie::Make2Way();
TSchedulerCookieHolder TimeoutCookie_;
NKikimrClient::TSqsRequest SourceSqsRequest_;
};

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/ymq/actor/proxy_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ TString SecurityPrint(const NKikimrClient::TSqsResponse& resp) {
case NKikimrClient::TSqsResponse::kReceiveMessage: {
NKikimrClient::TSqsResponse respCopy = resp;
for (auto& msg : *respCopy.MutableReceiveMessage()->MutableMessages()) {
msg.SetData(TStringBuilder() << "[...user_data_" << msg.GetData().size() << "bytes" << "...]");
msg.SetData(TStringBuilder() << "[...user_data_" << msg.GetData().size() << "bytes" << "...]");
}
return TStringBuilder() << respCopy;
}
Expand Down Expand Up @@ -82,6 +82,7 @@ void TProxyActor::Bootstrap() {

const auto& cfg = Cfg();
if (cfg.GetRequestTimeoutMs()) {
TimeoutCookie_.Reset(ISchedulerCookie::Make2Way());
this->Schedule(TDuration::MilliSeconds(cfg.GetRequestTimeoutMs()), new TEvWakeup(), TimeoutCookie_.Get());
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/ymq/actor/proxy_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class TProxyActor
THolder<IReplyCallback> Cb_;
bool ErrorResponse_ = false;
TInstant StartTs_;
TSchedulerCookieHolder TimeoutCookie_ = ISchedulerCookie::Make2Way();
TSchedulerCookieHolder TimeoutCookie_;

TIntrusivePtr<TUserCounters> UserCounters_;
TIntrusivePtr<TQueueCounters> QueueCounters_;
Expand Down

0 comments on commit 0cc149b

Please sign in to comment.