diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h b/ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h index d3a07148ccea..04bf62f047ad 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h +++ b/ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h @@ -21,6 +21,7 @@ namespace NYql::NDq { EvReadSplitsPart, EvReadSplitsFinished, EvError, + EvRetry, EvEnd }; @@ -89,6 +90,15 @@ namespace NYql::NDq { NConnector::NApi::TError Error; }; + struct TEvRetry: NActors::TEventLocal { + explicit TEvRetry(ui32 nextRetries) + : NextRetries(nextRetries) + { + } + + ui32 NextRetries; + }; + protected: // TODO move common logic here }; diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp index 379f58edace3..e85e4842c2fd 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp @@ -28,6 +28,7 @@ namespace NYql::NDq { using namespace NActors; namespace { + constexpr ui32 RequestRetriesLimit = 10; // TODO lookup parameters or PRAGMA? const NKikimr::NMiniKQL::TStructType* MergeStructTypes(const NKikimr::NMiniKQL::TTypeEnvironment& env, const NKikimr::NMiniKQL::TStructType* t1, const NKikimr::NMiniKQL::TStructType* t2) { Y_ABORT_UNLESS(t1); @@ -45,7 +46,7 @@ namespace NYql::NDq { template T ExtractFromConstFuture(const NThreading::TFuture& f) { // We want to avoid making a copy of data stored in a future. - // But there is no direct way to extract data from a const future5 + // But there is no direct way to extract data from a const future // So, we make a copy of the future, that is cheap. Then, extract the value from this copy. // It destructs the value in the original future, but this trick is legal and documented here: // https://docs.yandex-team.ru/arcadia-cpp/cookbook/concurrency @@ -155,12 +156,17 @@ namespace NYql::NDq { hFunc(TEvReadSplitsPart, Handle); hFunc(TEvReadSplitsFinished, Handle); hFunc(TEvError, Handle); + hFunc(TEvRetry, Handle); hFunc(NActors::TEvents::TEvPoison, Handle);) void Handle(TEvListSplitsIterator::TPtr ev) { auto& iterator = ev->Get()->Iterator; iterator->ReadNext().Subscribe( - [actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TAsyncResult& asyncResult) { + [ + actorSystem = TActivationContext::ActorSystem(), + selfId = SelfId(), + retriesRemaining = RetriesRemaining + ](const NConnector::TAsyncResult& asyncResult) { YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsResponse from Connector"; auto result = ExtractFromConstFuture(asyncResult); if (result.Status.Ok()) { @@ -168,7 +174,7 @@ namespace NYql::NDq { auto ev = new TEvListSplitsPart(std::move(*result.Response)); actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev)); } else { - SendError(actorSystem, selfId, result.Status); + SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining); } }); } @@ -189,14 +195,18 @@ namespace NYql::NDq { *readRequest.add_splits() = split; readRequest.Setformat(NConnector::NApi::TReadSplitsRequest_EFormat::TReadSplitsRequest_EFormat_ARROW_IPC_STREAMING); readRequest.set_filtering(NConnector::NApi::TReadSplitsRequest::FILTERING_MANDATORY); - Connector->ReadSplits(readRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) { + Connector->ReadSplits(readRequest).Subscribe([ + actorSystem = TActivationContext::ActorSystem(), + selfId = SelfId(), + retriesRemaining = RetriesRemaining + ](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) { YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector"; auto result = ExtractFromConstFuture(asyncResult); if (result.Status.Ok()) { auto ev = new TEvReadSplitsIterator(std::move(result.Iterator)); actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev)); } else { - SendError(actorSystem, selfId, result.Status); + SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining); } }); } @@ -225,6 +235,12 @@ namespace NYql::NDq { actorSystem->Send(new NActors::IEventHandle(ParentId, SelfId(), errEv.release())); } + void Handle(TEvRetry::TPtr ev) { + auto guard = Guard(*Alloc); + RetriesRemaining = ev->Get()->NextRetries; + SendRequest(); + } + void Handle(NActors::TEvents::TEvPoison::TPtr) { PassAway(); } @@ -243,11 +259,9 @@ namespace NYql::NDq { if (!request) { return; } - auto startCycleCount = GetCycleCountFast(); SentTime = TInstant::Now(); YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request->size() << " keys"; Y_ABORT_IF(request->size() == 0 || request->size() > MaxKeysInRequest); - if (Count) { Count->Inc(); InFlight->Inc(); @@ -255,6 +269,12 @@ namespace NYql::NDq { } Request = std::move(request); + RetriesRemaining = RequestRetriesLimit; + SendRequest(); + } + + void SendRequest() { + auto startCycleCount = GetCycleCountFast(); NConnector::NApi::TListSplitsRequest splitRequest; auto error = FillSelect(*splitRequest.add_selects()); @@ -264,7 +284,11 @@ namespace NYql::NDq { }; splitRequest.Setmax_split_count(1); - Connector->ListSplits(splitRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) { + Connector->ListSplits(splitRequest).Subscribe([ + actorSystem = TActivationContext::ActorSystem(), + selfId = SelfId(), + retriesRemaining = RetriesRemaining + ](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) { auto result = ExtractFromConstFuture(asyncResult); if (result.Status.Ok()) { YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsStreamIterator"; @@ -272,7 +296,7 @@ namespace NYql::NDq { auto ev = new TEvListSplitsIterator(std::move(result.Iterator)); actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev)); } else { - SendError(actorSystem, selfId, result.Status); + SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining); } }); if (CpuTime) { @@ -282,12 +306,17 @@ namespace NYql::NDq { void ReadNextData() { ReadSplitsIterator->ReadNext().Subscribe( - [actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TAsyncResult& asyncResult) { + [ + actorSystem = TActivationContext::ActorSystem(), + selfId = SelfId(), + retriesRemaining = RetriesRemaining + ](const NConnector::TAsyncResult& asyncResult) { auto result = ExtractFromConstFuture(asyncResult); if (result.Status.Ok()) { YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got DataChunk"; Y_ABORT_UNLESS(result.Response); auto& response = *result.Response; + // TODO: retry on some YDB errors if (NConnector::IsSuccess(response)) { auto ev = new TEvReadSplitsPart(std::move(response)); actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev)); @@ -299,7 +328,7 @@ namespace NYql::NDq { auto ev = new TEvReadSplitsFinished(std::move(result.Status)); actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev)); } else { - SendError(actorSystem, selfId, result.Status); + SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining); } }); } @@ -365,7 +394,19 @@ namespace NYql::NDq { new TEvError(std::move(error))); } - static void SendError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status) { + static void SendRetryOrError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, ui32 retriesRemaining) { + if (NConnector::GrpcStatusNeedsRetry(status)) { + if (retriesRemaining) { + const auto retry = RequestRetriesLimit - retriesRemaining; + const auto delay = TDuration::MilliSeconds(1u << retry); // Exponential delay from 1ms to ~0.5s + // <<< TODO tune/tweak + YQL_CLOG(WARN, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry " << (retry + 1) << " of " << RequestRetriesLimit << ", scheduled in " << delay; + --retriesRemaining; + actorSystem->Schedule(delay, new IEventHandle(selfId, selfId, new TEvRetry(retriesRemaining))); + return; + } + YQL_CLOG(ERROR, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry count exceed limit " << RequestRetriesLimit; + } SendError(actorSystem, selfId, NConnector::ErrorFromGRPCStatus(status)); } @@ -422,6 +463,8 @@ namespace NYql::NDq { NConnector::NApi::TPredicate_TDisjunction disjunction; for (const auto& [k, _] : *Request) { + // TODO consider skipping already retrieved keys + // ... but careful, can we end up with zero? TODO NConnector::NApi::TPredicate_TConjunction conjunction; for (ui32 c = 0; c != KeyType->GetMembersCount(); ++c) { NConnector::NApi::TPredicate_TComparison eq; @@ -454,6 +497,7 @@ namespace NYql::NDq { std::shared_ptr Request; NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult; + ui32 RetriesRemaining; ::NMonitoring::TDynamicCounters::TCounterPtr Count; ::NMonitoring::TDynamicCounters::TCounterPtr Keys; ::NMonitoring::TDynamicCounters::TCounterPtr ResultRows;