Skip to content

Commit

Permalink
Extract CompletePending fix from Ted's PR (#722)
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc authored Jul 7, 2022
1 parent ad10d50 commit 61b7ec8
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cs/src/core/Async/CompletePendingAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ internal async ValueTask CompletePendingAsync<Input, Output, Context>(IFasterSes
try
{
InternalCompletePendingRequests(currentCtx, currentCtx, fasterSession, completedOutputs);
InternalCompleteRetryRequests(currentCtx, currentCtx, fasterSession);
InternalCompleteRetryRequests(currentCtx, currentCtx, fasterSession, completedOutputs);
}
finally
{
Expand Down
50 changes: 29 additions & 21 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,15 @@ internal void CopyContext<Input, Output, Context>(FasterExecutionContext<Input,
}

internal bool InternalCompletePending<Input, Output, Context, FasterSession>(
FasterExecutionContext<Input, Output, Context> ctx,
FasterSession fasterSession,
FasterExecutionContext<Input, Output, Context> ctx,
FasterSession fasterSession,
bool wait = false, CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs = null)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
while (true)
{
InternalCompletePendingRequests(ctx, ctx, fasterSession, completedOutputs);
InternalCompleteRetryRequests(ctx, ctx, fasterSession);
InternalCompleteRetryRequests(ctx, ctx, fasterSession, completedOutputs);
if (wait) ctx.WaitPending(epoch);

if (ctx.HasNoPendingRequests) return true;
Expand All @@ -143,9 +143,9 @@ internal bool InternalCompletePending<Input, Output, Context, FasterSession>(

#region Complete Retry Requests
internal void InternalCompleteRetryRequests<Input, Output, Context, FasterSession>(
FasterExecutionContext<Input, Output, Context> opCtx,
FasterExecutionContext<Input, Output, Context> currentCtx,
FasterSession fasterSession)
FasterExecutionContext<Input, Output, Context> opCtx,
FasterExecutionContext<Input, Output, Context> currentCtx,
FasterSession fasterSession, CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
int count = opCtx.retryRequests.Count;
Expand All @@ -155,14 +155,21 @@ internal void InternalCompleteRetryRequests<Input, Output, Context, FasterSessio
for (int i = 0; i < count; i++)
{
var pendingContext = opCtx.retryRequests.Dequeue();
InternalCompleteRetryRequest(opCtx, currentCtx, pendingContext, fasterSession);
var status = InternalCompleteRetryRequest(opCtx, currentCtx, pendingContext, fasterSession);
if (completedOutputs is not null && status.IsCompletedSuccessfully)
{
// Transfer things to outputs from pendingContext before we dispose it.
completedOutputs.TransferTo(ref pendingContext, status);
}
if (!status.IsPending)
pendingContext.Dispose();
}
}

internal void InternalCompleteRetryRequest<Input, Output, Context, FasterSession>(
FasterExecutionContext<Input, Output, Context> opCtx,
FasterExecutionContext<Input, Output, Context> currentCtx,
PendingContext<Input, Output, Context> pendingContext,
internal Status InternalCompleteRetryRequest<Input, Output, Context, FasterSession>(
FasterExecutionContext<Input, Output, Context> opCtx,
FasterExecutionContext<Input, Output, Context> currentCtx,
PendingContext<Input, Output, Context> pendingContext,
FasterSession fasterSession)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
Expand Down Expand Up @@ -204,16 +211,17 @@ ref pendingContext.input.Get(),
new RecordMetadata(pendingContext.recordInfo, pendingContext.logicalAddress));
break;
default:
throw new FasterException("Operation type not allowed for retry");
break;// throw new FasterException("Operation type not allowed for retry");
}
}
return status;
}
#endregion

#region Complete Pending Requests
internal void InternalCompletePendingRequests<Input, Output, Context, FasterSession>(
FasterExecutionContext<Input, Output, Context> opCtx,
FasterExecutionContext<Input, Output, Context> currentCtx,
FasterExecutionContext<Input, Output, Context> opCtx,
FasterExecutionContext<Input, Output, Context> currentCtx,
FasterSession fasterSession, CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
Expand All @@ -228,9 +236,9 @@ internal void InternalCompletePendingRequests<Input, Output, Context, FasterSess
}

internal async ValueTask InternalCompletePendingRequestsAsync<Input, Output, Context, FasterSession>(
FasterExecutionContext<Input, Output, Context> opCtx,
FasterExecutionContext<Input, Output, Context> currentCtx,
FasterSession fasterSession,
FasterExecutionContext<Input, Output, Context> opCtx,
FasterExecutionContext<Input, Output, Context> currentCtx,
FasterSession fasterSession,
CancellationToken token,
CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
Expand Down Expand Up @@ -261,9 +269,9 @@ internal async ValueTask InternalCompletePendingRequestsAsync<Input, Output, Con
}

internal void InternalCompletePendingRequest<Input, Output, Context, FasterSession>(
FasterExecutionContext<Input, Output, Context> opCtx,
FasterExecutionContext<Input, Output, Context> currentCtx,
FasterSession fasterSession,
FasterExecutionContext<Input, Output, Context> opCtx,
FasterExecutionContext<Input, Output, Context> currentCtx,
FasterSession fasterSession,
AsyncIOContext<Key, Value> request, CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
Expand Down Expand Up @@ -349,4 +357,4 @@ ref pendingContext.input.Get(),
}
#endregion
}
}
}

0 comments on commit 61b7ec8

Please sign in to comment.