Skip to content

Commit

Permalink
generic lookup: handle retrievable errors in grpc (#13119)
Browse files Browse the repository at this point in the history
  • Loading branch information
yumkam authored Jan 16, 2025
1 parent 0fd4bda commit 27eb560
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 12 deletions.
10 changes: 10 additions & 0 deletions ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace NYql::NDq {
EvReadSplitsPart,
EvReadSplitsFinished,
EvError,
EvRetry,
EvEnd
};

Expand Down Expand Up @@ -89,6 +90,15 @@ namespace NYql::NDq {
NConnector::NApi::TError Error;
};

// Local event that asks the actor to repeat a previously failed request.
struct TEvRetry: NActors::TEventLocal<TEvRetry, EvRetry> {
    // Retry budget the receiver adopts when it handles this event.
    ui32 NextRetries;

    explicit TEvRetry(ui32 nextRetries)
        : NextRetries{nextRetries}
    {
    }
};

protected: // TODO move common logic here
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace NYql::NDq {
using namespace NActors;

namespace {
// Retry budget granted to every lookup request: RetriesRemaining is reset to
// this value when a new request is accepted, and SendRetryOrError derives the
// attempt number (and thus the back-off delay) from it.
constexpr ui32 RequestRetriesLimit = 10; // TODO lookup parameters or PRAGMA?

const NKikimr::NMiniKQL::TStructType* MergeStructTypes(const NKikimr::NMiniKQL::TTypeEnvironment& env, const NKikimr::NMiniKQL::TStructType* t1, const NKikimr::NMiniKQL::TStructType* t2) {
Y_ABORT_UNLESS(t1);
Expand All @@ -45,7 +46,7 @@ namespace NYql::NDq {
template <typename T>
T ExtractFromConstFuture(const NThreading::TFuture<T>& f) {
// We want to avoid making a copy of data stored in a future.
// But there is no direct way to extract data from a const future5
// But there is no direct way to extract data from a const future
// So, we make a copy of the future, that is cheap. Then, extract the value from this copy.
// It destructs the value in the original future, but this trick is legal and documented here:
// https://docs.yandex-team.ru/arcadia-cpp/cookbook/concurrency
Expand Down Expand Up @@ -155,20 +156,25 @@ namespace NYql::NDq {
hFunc(TEvReadSplitsPart, Handle);
hFunc(TEvReadSplitsFinished, Handle);
hFunc(TEvError, Handle);
hFunc(TEvRetry, Handle);
hFunc(NActors::TEvents::TEvPoison, Handle);)

// Handles the split-list iterator received via TEvListSplitsIterator: asks it
// for the next portion and subscribes a continuation to the resulting future.
// The continuation captures the actor system, this actor's id and the current
// retry budget by value and reports back only by sending events to selfId:
//  - OK status: the response is moved into a TEvListSplitsPart event;
//  - failed status: handed to the error helper in the else-branch.
// NOTE(review): this view appears to contain both sides of a diff hunk - the
// single-line lambda header and the SendError call look like the pre-change
// lines, the multi-line capture list and SendRetryOrError their replacements.
// Confirm against the real file before relying on either.
void Handle(TEvListSplitsIterator::TPtr ev) {
auto& iterator = ev->Get()->Iterator;
iterator->ReadNext().Subscribe(
[actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
[
actorSystem = TActivationContext::ActorSystem(),
selfId = SelfId(),
retriesRemaining = RetriesRemaining
](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsResponse from Connector";
// Take the value out of the const future without copying it (see ExtractFromConstFuture).
auto result = ExtractFromConstFuture(asyncResult);
if (result.Status.Ok()) {
// A successful status must always come with a response.
Y_ABORT_UNLESS(result.Response);
auto ev = new TEvListSplitsPart(std::move(*result.Response));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
} else {
SendError(actorSystem, selfId, result.Status);
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
}
});
}
Expand All @@ -189,14 +195,18 @@ namespace NYql::NDq {
*readRequest.add_splits() = split;
readRequest.Setformat(NConnector::NApi::TReadSplitsRequest_EFormat::TReadSplitsRequest_EFormat_ARROW_IPC_STREAMING);
readRequest.set_filtering(NConnector::NApi::TReadSplitsRequest::FILTERING_MANDATORY);
Connector->ReadSplits(readRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
Connector->ReadSplits(readRequest).Subscribe([
actorSystem = TActivationContext::ActorSystem(),
selfId = SelfId(),
retriesRemaining = RetriesRemaining
](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector";
auto result = ExtractFromConstFuture(asyncResult);
if (result.Status.Ok()) {
auto ev = new TEvReadSplitsIterator(std::move(result.Iterator));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
} else {
SendError(actorSystem, selfId, result.Status);
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
}
});
}
Expand Down Expand Up @@ -225,6 +235,12 @@ namespace NYql::NDq {
actorSystem->Send(new NActors::IEventHandle(ParentId, SelfId(), errEv.release()));
}

// Handles a scheduled TEvRetry: adopts the retry budget carried by the event
// and issues the request again through SendRequest().
void Handle(TEvRetry::TPtr ev) {
// The guard on Alloc is taken first and stays held until the function returns.
// NOTE(review): presumably needed because SendRequest works with allocator-owned values - confirm.
auto guard = Guard(*Alloc);
// The sender already decremented the budget before scheduling this event.
RetriesRemaining = ev->Get()->NextRetries;
SendRequest();
}

// Handles TEvPoison: the event payload is ignored and the actor is shut down via PassAway().
void Handle(NActors::TEvents::TEvPoison::TPtr) {
PassAway();
}
Expand All @@ -243,18 +259,22 @@ namespace NYql::NDq {
if (!request) {
return;
}
auto startCycleCount = GetCycleCountFast();
SentTime = TInstant::Now();
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request->size() << " keys";
Y_ABORT_IF(request->size() == 0 || request->size() > MaxKeysInRequest);

if (Count) {
Count->Inc();
InFlight->Inc();
Keys->Add(request->size());
}

Request = std::move(request);
RetriesRemaining = RequestRetriesLimit;
SendRequest();
}

void SendRequest() {
auto startCycleCount = GetCycleCountFast();
NConnector::NApi::TListSplitsRequest splitRequest;

auto error = FillSelect(*splitRequest.add_selects());
Expand All @@ -264,15 +284,19 @@ namespace NYql::NDq {
};

splitRequest.Setmax_split_count(1);
Connector->ListSplits(splitRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
Connector->ListSplits(splitRequest).Subscribe([
actorSystem = TActivationContext::ActorSystem(),
selfId = SelfId(),
retriesRemaining = RetriesRemaining
](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
auto result = ExtractFromConstFuture(asyncResult);
if (result.Status.Ok()) {
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsStreamIterator";
Y_ABORT_UNLESS(result.Iterator, "Uninitialized iterator");
auto ev = new TEvListSplitsIterator(std::move(result.Iterator));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
} else {
SendError(actorSystem, selfId, result.Status);
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
}
});
if (CpuTime) {
Expand All @@ -282,12 +306,17 @@ namespace NYql::NDq {

void ReadNextData() {
ReadSplitsIterator->ReadNext().Subscribe(
[actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
[
actorSystem = TActivationContext::ActorSystem(),
selfId = SelfId(),
retriesRemaining = RetriesRemaining
](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
auto result = ExtractFromConstFuture(asyncResult);
if (result.Status.Ok()) {
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got DataChunk";
Y_ABORT_UNLESS(result.Response);
auto& response = *result.Response;
// TODO: retry on some YDB errors
if (NConnector::IsSuccess(response)) {
auto ev = new TEvReadSplitsPart(std::move(response));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
Expand All @@ -299,7 +328,7 @@ namespace NYql::NDq {
auto ev = new TEvReadSplitsFinished(std::move(result.Status));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
} else {
SendError(actorSystem, selfId, result.Status);
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
}
});
}
Expand Down Expand Up @@ -365,7 +394,19 @@ namespace NYql::NDq {
new TEvError(std::move(error)));
}

static void SendError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status) {
// Decides what to do with a failed gRPC status reported by the Connector.
//
// When NConnector::GrpcStatusNeedsRetry(status) holds and retriesRemaining is
// non-zero, a TEvRetry carrying the decremented budget is scheduled to selfId
// after an exponentially growing delay (1ms, 2ms, 4ms, ... capped at 512ms).
// Otherwise (status is not retriable, or the budget is exhausted) the status is
// converted with NConnector::ErrorFromGRPCStatus and forwarded through SendError.
//
// actorSystem      - actor system used to schedule the retry event
// selfId           - actor id used as both sender and recipient of TEvRetry
// status           - failed gRPC status received from the Connector
// retriesRemaining - retries still available for the current request
static void SendRetryOrError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, ui32 retriesRemaining) {
    // Largest back-off exponent: 1ms << 9 == 512ms. Bounding the shift keeps it
    // well-defined even if RequestRetriesLimit is ever raised to 32 or more.
    constexpr ui32 maxDelayShift = 9;

    if (NConnector::GrpcStatusNeedsRetry(status)) {
        if (retriesRemaining) {
            // Zero-based index of the attempt being scheduled. The comparison guards
            // against unsigned wrap-around should the budget exceed the limit.
            const ui32 retry = retriesRemaining < RequestRetriesLimit ? RequestRetriesLimit - retriesRemaining : 0;
            const ui32 delayShift = retry < maxDelayShift ? retry : maxDelayShift;
            const auto delay = TDuration::MilliSeconds(1u << delayShift); // Exponential delay from 1ms to ~0.5s
            // <<< TODO tune/tweak
            YQL_CLOG(WARN, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry " << (retry + 1) << " of " << RequestRetriesLimit << ", scheduled in " << delay;
            --retriesRemaining;
            // The event carries the already decremented budget; the actor adopts it when handling TEvRetry.
            actorSystem->Schedule(delay, new IEventHandle(selfId, selfId, new TEvRetry(retriesRemaining)));
            return;
        }
        // Retriable in principle, but the budget is used up: fall through and report the error.
        YQL_CLOG(ERROR, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry count exceed limit " << RequestRetriesLimit;
    }
    SendError(actorSystem, selfId, NConnector::ErrorFromGRPCStatus(status));
}

Expand Down Expand Up @@ -422,6 +463,8 @@ namespace NYql::NDq {

NConnector::NApi::TPredicate_TDisjunction disjunction;
for (const auto& [k, _] : *Request) {
// TODO consider skipping already retrieved keys
// ... but careful, can we end up with zero? TODO
NConnector::NApi::TPredicate_TConjunction conjunction;
for (ui32 c = 0; c != KeyType->GetMembersCount(); ++c) {
NConnector::NApi::TPredicate_TComparison eq;
Expand Down Expand Up @@ -454,6 +497,7 @@ namespace NYql::NDq {
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
ui32 RetriesRemaining;
::NMonitoring::TDynamicCounters::TCounterPtr Count;
::NMonitoring::TDynamicCounters::TCounterPtr Keys;
::NMonitoring::TDynamicCounters::TCounterPtr ResultRows;
Expand Down

0 comments on commit 27eb560

Please sign in to comment.