Implement detection and potential mitigation of recovery failure cycles #435

Merged · 4 commits · Oct 31, 2024
@@ -206,6 +206,11 @@ public class NetheriteOrchestrationServiceSettings
/// </summary>
public int PartitionStartupTimeoutMinutes { get; set; } = 15;

/// <summary>
/// If true, disables prefetching during replay.
/// </summary>
public bool DisablePrefetchDuringReplay { get; set; } = false;

/// <summary>
/// Allows attaching additional checkers and debuggers during testing.
/// </summary>
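In practice, a host that wants to rule prefetching out as a cause of recovery failures can set the new flag when constructing its settings. A minimal sketch, assuming the settings object is built in host configuration code (not part of this diff):

var settings = new NetheriteOrchestrationServiceSettings
{
    // opt out of prefetching tracked objects during log replay;
    // the default of false preserves the existing behavior
    DisablePrefetchDuringReplay = true,
};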
@@ -42,6 +42,8 @@ partial class BlobManager : ICheckpointManager, ILogCommitManager
BlobUtilsV12.BlockBlobClients eventLogCommitBlob;
BlobLeaseClient leaseClient;

BlobUtilsV12.BlockBlobClients checkpointCompletedBlob;

BlobUtilsV12.BlobDirectory pageBlobPartitionDirectory;
BlobUtilsV12.BlobDirectory blockBlobPartitionDirectory;

@@ -419,6 +421,7 @@ public async Task StartAsync()

this.eventLogCommitBlob = this.blockBlobPartitionDirectory.GetBlockBlobClient(CommitBlobName);
this.leaseClient = this.eventLogCommitBlob.WithRetries.GetBlobLeaseClient();
this.checkpointCompletedBlob = this.blockBlobPartitionDirectory.GetBlockBlobClient(this.GetCheckpointCompletedBlobName());

AzureStorageDevice createDevice(string name) =>
new AzureStorageDevice(name, this.blockBlobPartitionDirectory.GetSubDirectory(name), this.pageBlobPartitionDirectory.GetSubDirectory(name), this, true);
@@ -1057,10 +1060,11 @@ IEnumerable<Guid> ICheckpointManager.GetLogCheckpointTokens()

internal async Task<bool> FindCheckpointsAsync(bool logIsEmpty)
{
BlobUtilsV12.BlockBlobClients checkpointCompletedBlob = default;
string jsonString = null;
DateTimeOffset lastModified = default;

try
{
string jsonString = null;

if (this.UseLocalFiles)
{
@@ -1076,24 +1080,24 @@ internal async Task<bool> FindCheckpointsAsync(bool logIsEmpty)
else
{
var partDir = this.blockBlobPartitionDirectory;
checkpointCompletedBlob = partDir.GetBlockBlobClient(this.GetCheckpointCompletedBlobName());

await this.PerformWithRetriesAsync(
semaphore: null,
requireLease: true,
"BlockBlobClient.DownloadContentAsync",
"FindCheckpointsAsync",
"",
checkpointCompletedBlob.Name,
this.checkpointCompletedBlob.Name,
1000,
true,
failIfReadonly: false,
async (numAttempts) =>
{
try
{
Azure.Response<BlobDownloadResult> downloadResult = await checkpointCompletedBlob.WithRetries.DownloadContentAsync();
Azure.Response<BlobDownloadResult> downloadResult = await this.checkpointCompletedBlob.WithRetries.DownloadContentAsync();
jsonString = downloadResult.Value.Content.ToString();
lastModified = downloadResult.Value.Details.LastModified;
this.CheckpointInfoETag = downloadResult.Value.Details.ETag;
return 1;
}
@@ -1105,22 +1109,65 @@ await this.PerformWithRetriesAsync(
});
}

if (jsonString == null)
{
return false;
}
else
{
// read the fields from the json to update the checkpoint info
JsonConvert.PopulateObject(jsonString, this.CheckpointInfo);
return true;
}
}
catch (Exception e)
{
this.HandleStorageError(nameof(FindCheckpointsAsync), "could not find any checkpoint", checkpointCompletedBlob.Name, e, true, this.PartitionErrorHandler.IsTerminated);
this.HandleStorageError(nameof(FindCheckpointsAsync), "could not find any checkpoint", this.checkpointCompletedBlob.Name, e, true, this.PartitionErrorHandler.IsTerminated);
throw;
}

if (jsonString == null)
{
return false;
}

try
{
// read the fields from the json to update the checkpoint info
JsonConvert.PopulateObject(jsonString, this.CheckpointInfo);
}
catch (JsonException e)
{
this.HandleStorageError(nameof(FindCheckpointsAsync), "could not parse json file describing last checkpoint", this.checkpointCompletedBlob.Name, e, true, false);
throw;
}

if (this.CheckpointInfo.RecoveryAttempts > 0 || DateTimeOffset.UtcNow - lastModified > TimeSpan.FromMinutes(5))
{
this.CheckpointInfo.RecoveryAttempts++;

this.TraceHelper.FasterProgress($"Incremented recovery attempt counter to {this.CheckpointInfo.RecoveryAttempts} in {this.checkpointCompletedBlob.Name}.");

await this.WriteCheckpointMetadataAsync();

// we start boosting the tracing after three failed attempts; the boosting applies only to the recovery path.
int StartBoostingAfter = 3;

// After some number of boosted attempts, we stop boosting since it seems unlikely that we will find new information.
int BoostFor = 10;

if (this.CheckpointInfo.RecoveryAttempts > StartBoostingAfter
&& this.CheckpointInfo.RecoveryAttempts <= StartBoostingAfter + BoostFor)
{
this.TraceHelper.BoostTracing = true;
}
}

return true;
}

public async Task ClearRecoveryAttempts()
{
if (this.CheckpointInfo.RecoveryAttempts > 0)
{
this.CheckpointInfo.RecoveryAttempts = 0;

this.TraceHelper.BoostTracing = false;

await this.WriteCheckpointMetadataAsync();

this.TraceHelper.FasterProgress($"Cleared recovery attempt counter in {this.checkpointCompletedBlob.Name}.");
}
}

void ICheckpointManager.CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata)
@@ -1436,7 +1483,7 @@ await this.PerformWithRetriesAsync(
}
}

internal async Task FinalizeCheckpointCompletedAsync()
internal async Task WriteCheckpointMetadataAsync()
{
var jsonText = JsonConvert.SerializeObject(this.CheckpointInfo, Formatting.Indented);
if (this.UseLocalFiles)
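Taken together, FindCheckpointsAsync now treats a nonzero counter, or a checkpoint-completed blob last modified more than five minutes ago, as evidence that a previous recovery attempt did not complete; it increments and persists the counter before recovery proceeds, and ClearRecoveryAttempts resets it once recovery succeeds. The boost window therefore covers recovery attempts 4 through 13. A condensed sketch of just that decision, under a hypothetical standalone name:

static bool ShouldBoostTracing(int recoveryAttempts)
{
    const int StartBoostingAfter = 3; // boost only after several failed attempts
    const int BoostFor = 10;          // give up boosting once it stops yielding new information
    return recoveryAttempts > StartBoostingAfter
        && recoveryAttempts <= StartBoostingAfter + BoostFor;
}

// ShouldBoostTracing(3) == false, ShouldBoostTracing(4) == true,
// ShouldBoostTracing(13) == true, ShouldBoostTracing(14) == false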
@@ -10,25 +10,46 @@ namespace DurableTask.Netherite.Faster
[JsonObject]
class CheckpointInfo
{
/// <summary>
/// The FasterKV token for the last index checkpoint taken before this checkpoint.
/// </summary>
[JsonProperty]
public Guid IndexToken { get; set; }

/// <summary>
/// The FasterKV token for this checkpoint.
/// </summary>
[JsonProperty]
public Guid LogToken { get; set; }

/// <summary>
/// The FasterLog position for this checkpoint.
/// </summary>
[JsonProperty]
public long CommitLogPosition { get; set; }

/// <summary>
/// The input queue (event hubs) position for this checkpoint.
/// </summary>
[JsonProperty]
public long InputQueuePosition { get; set; }

/// <summary>
/// If the input queue position is a batch, the position within the batch.
/// </summary>
[JsonProperty]
public int InputQueueBatchPosition { get; set; }

/// <summary>
/// The input queue fingerprint for this checkpoint.
/// </summary>
[JsonProperty]
public string InputQueueFingerprint { get; set; }

[JsonProperty]
public long NumberInstances { get; set; }
/// <summary>
/// The number of recovery attempts that have been made for this checkpoint.
/// </summary>
//[JsonProperty]
public int RecoveryAttempts { get; set; }
}
}
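With the new field, the checkpoint-completed metadata blob serializes along these lines (illustrative values only). Note that RecoveryAttempts is serialized even with its [JsonProperty] attribute commented out, because [JsonObject] defaults to opt-out member serialization in Json.NET:

{
  "IndexToken": "00000000-0000-0000-0000-000000000000",
  "LogToken": "00000000-0000-0000-0000-000000000000",
  "CommitLogPosition": 1048576,
  "InputQueuePosition": 42000,
  "InputQueueBatchPosition": 0,
  "InputQueueFingerprint": "example-fingerprint",
  "NumberInstances": 17,
  "RecoveryAttempts": 4
}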
4 changes: 2 additions & 2 deletions src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
@@ -380,7 +380,7 @@ void RunTask() {

public override async Task FinalizeCheckpointCompletedAsync(Guid guid)
{
await this.blobManager.FinalizeCheckpointCompletedAsync();
await this.blobManager.WriteCheckpointMetadataAsync();

if (this.cacheDebugger == null)
{
@@ -739,7 +739,7 @@ public override async Task RunPrefetchSession(IAsyncEnumerable<TrackedObjectKey>
long lastReport = 0;
void ReportProgress(int elapsedMillisecondsThreshold)
{
if (stopwatch.ElapsedMilliseconds - lastReport >= elapsedMillisecondsThreshold)
if (stopwatch.ElapsedMilliseconds - lastReport >= elapsedMillisecondsThreshold || this.TraceHelper.BoostTracing)
{
this.blobManager.TraceHelper.FasterProgress(
$"FasterKV PrefetchSession {sessionId} elapsed={stopwatch.Elapsed.TotalSeconds:F2}s issued={numberIssued} pending={maxConcurrency - prefetchSemaphore.CurrentCount} hits={numberHits} misses={numberMisses}");
@@ -25,7 +25,9 @@ public FasterTraceHelper(ILogger logger, LogLevel logLevelLimit, ILogger perform
this.partitionId = (int) partitionId;
}

public bool IsTracingAtMostDetailedLevel => this.logLevelLimit == LogLevel.Trace;
public bool IsTracingAtMostDetailedLevel => this.logLevelLimit == LogLevel.Trace || this.BoostTracing;

public bool BoostTracing { get; set; }

// ----- faster storage layer events

@@ -139,7 +141,7 @@ public void FasterProgress(Func<string> constructString)

public void FasterStorageProgress(string details)
{
if (this.logLevelLimit <= LogLevel.Trace)
if (this.logLevelLimit <= LogLevel.Trace || this.BoostTracing)
{
this.logger.LogTrace("Part{partition:D2} {details}", this.partitionId, details);
EtwSource.Log.FasterStorageProgress(this.account, this.taskHub, this.partitionId, details, TraceUtils.AppName, TraceUtils.ExtensionVersion);
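The BoostTracing flag thus widens both gates in this class: detailed traces flow either when the configured level is Trace or while a boosted recovery is underway. The rule in isolation, as a minimal sketch:

using Microsoft.Extensions.Logging;

// detailed storage traces are emitted when either condition holds
static bool ShouldTraceDetail(LogLevel logLevelLimit, bool boostTracing)
    => logLevelLimit <= LogLevel.Trace || boostTracing;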
28 changes: 21 additions & 7 deletions src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs
@@ -207,20 +207,24 @@ protected override async Task Process(IList<PartitionUpdateEvent> batch)
}
}

public async Task ReplayCommitLog(long from, StoreWorker worker)
public async Task ReplayCommitLog(long from, StoreWorker worker, bool enablePrefetch)
{
// this procedure is called by StoreWorker during recovery. It replays all the events
// that were committed to the log but are not reflected in the loaded store checkpoint.
try
{
// we create a pipeline where the fetch task obtains a stream of events and then duplicates the
// stream, so it can get replayed and prefetched in parallel.
var prefetchChannel = Channel.CreateBounded<TrackedObjectKey>(1000);
var prefetchChannel = enablePrefetch ? Channel.CreateBounded<TrackedObjectKey>(1000) : null;
var replayChannel = Channel.CreateBounded<PartitionUpdateEvent>(1000);

var fetchTask = this.FetchEvents(from, replayChannel.Writer, prefetchChannel.Writer);
var fetchTask = this.FetchEvents(from, replayChannel.Writer, prefetchChannel?.Writer);
var replayTask = Task.Run(() => this.ReplayEvents(replayChannel.Reader, worker));
var prefetchTask = Task.Run(() => worker.RunPrefetchSession(prefetchChannel.Reader.ReadAllAsync(this.cancellationToken)));

if (enablePrefetch)
{
var prefetchTask = Task.Run(() => worker.RunPrefetchSession(prefetchChannel.Reader.ReadAllAsync(this.cancellationToken)));
}

await fetchTask;
await replayTask;
@@ -241,23 +245,33 @@ async Task FetchEvents(long from, ChannelWriter<PartitionUpdateEvent> replayChan

await replayChannelWriter.WriteAsync(partitionEvent);

if (partitionEvent is IRequiresPrefetch evt)
if (prefetchChannelWriter != null && partitionEvent is IRequiresPrefetch evt)
{
foreach (var key in evt.KeysToPrefetch)
{
if (this.traceHelper.BoostTracing)
{
this.traceHelper.FasterProgress($"Replay Prefetches {key}");
}

await prefetchChannelWriter.WriteAsync(key);
}
}
}

replayChannelWriter.Complete();
prefetchChannelWriter.Complete();
prefetchChannelWriter?.Complete();
}

async Task ReplayEvents(ChannelReader<PartitionUpdateEvent> reader, StoreWorker worker)
{
await foreach (var partitionEvent in reader.ReadAllAsync(this.cancellationToken))
{
if (this.traceHelper.BoostTracing)
{
this.traceHelper.FasterProgress($"Replaying PartitionEvent {partitionEvent.NextCommitLogPosition}");
}

await worker.ReplayUpdate(partitionEvent);
}
}
@@ -285,7 +299,7 @@ async IAsyncEnumerable<PartitionUpdateEvent> EventsToReplay(long from)
await iter.WaitAsync(this.cancellationToken);
}

if (this.traceLogDetails)
if (this.traceHelper.IsTracingAtMostDetailedLevel)
{
this.TraceLogDetail("Read", iter.NextAddress, new ReadOnlySpan<byte>(result, 0, entryLength));
}
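The pipeline shape in ReplayCommitLog and FetchEvents — one bounded channel that is always drained, plus an optional second channel guarded by null checks — can be reproduced in isolation. A simplified sketch with hypothetical int items (the real code streams PartitionUpdateEvent and TrackedObjectKey):

using System.Threading.Channels;
using System.Threading.Tasks;

static async Task ReplayAsync(bool enablePrefetch)
{
    var replayChannel = Channel.CreateBounded<int>(1000);
    var prefetchChannel = enablePrefetch ? Channel.CreateBounded<int>(1000) : null;

    var fetchTask = Task.Run(async () =>
    {
        for (int i = 0; i < 100; i++)
        {
            await replayChannel.Writer.WriteAsync(i);       // always replayed
            if (prefetchChannel != null)
            {
                await prefetchChannel.Writer.WriteAsync(i); // duplicated only when prefetch is on
            }
        }
        replayChannel.Writer.Complete();
        prefetchChannel?.Writer.Complete(); // mirrors prefetchChannelWriter?.Complete()
    });

    var replayTask = Task.Run(async () =>
    {
        await foreach (var item in replayChannel.Reader.ReadAllAsync())
        {
            // apply the update to the store here
        }
    });

    if (prefetchChannel != null)
    {
        // drain the prefetch stream concurrently, as RunPrefetchSession does
        _ = Task.Run(async () =>
        {
            await foreach (var key in prefetchChannel.Reader.ReadAllAsync()) { }
        });
    }

    await fetchTask;
    await replayTask;
}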
14 changes: 10 additions & 4 deletions src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs
@@ -20,9 +20,6 @@ class PartitionStorage : IPartitionState
readonly ILogger logger;
readonly ILogger performanceLogger;
readonly MemoryTracker memoryTracker;
//readonly CloudStorageAccount storageAccount;
//readonly string localFileDirectory;
//readonly CloudStorageAccount pageBlobStorageAccount;

Partition partition;
BlobManager blobManager;
@@ -193,7 +190,14 @@ async Task TerminationWrapper(Task what)
if (this.log.TailAddress > (long)this.storeWorker.CommitLogPosition)
{
// replay log as the store checkpoint lags behind the log
await this.TerminationWrapper(this.storeWorker.ReplayCommitLog(this.logWorker));

// after six unsuccessful attempts, we start disabling prefetch on every other attempt, to see whether that remedies the problem
int startDisablingPrefetchAfter = 6;

bool disablePrefetch = this.settings.DisablePrefetchDuringReplay
|| (this.blobManager.CheckpointInfo.RecoveryAttempts > startDisablingPrefetchAfter && (this.blobManager.CheckpointInfo.RecoveryAttempts - startDisablingPrefetchAfter) % 2 == 1);

await this.TerminationWrapper(this.storeWorker.ReplayCommitLog(this.logWorker, prefetch: !disablePrefetch));
}
}
catch (OperationCanceledException) when (this.partition.ErrorHandler.IsTerminated)
@@ -215,6 +219,8 @@ async Task TerminationWrapper(Task what)
}

this.TraceHelper.FasterProgress("Recovery complete");

await this.blobManager.ClearRecoveryAttempts();
}
this.blobManager.FaultInjector?.Started(this.blobManager);
return this.storeWorker.InputQueuePosition;
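Because startDisablingPrefetchAfter is 6, the modulo expression alternates modes once the threshold is passed: prefetch is disabled on attempts 7, 9, 11, ... and kept on for attempts 8, 10, 12, ..., so both configurations get exercised (the DisablePrefetchDuringReplay setting forces it off unconditionally). A quick standalone check of the condition:

const int startDisablingPrefetchAfter = 6;

for (int attempts = 5; attempts <= 10; attempts++)
{
    bool disablePrefetch = attempts > startDisablingPrefetchAfter
        && (attempts - startDisablingPrefetchAfter) % 2 == 1;
    System.Console.WriteLine($"attempts={attempts} disablePrefetch={disablePrefetch}");
}
// prints false for attempts 5, 6, 8, 10 and true for attempts 7, 9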
6 changes: 3 additions & 3 deletions src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs
@@ -681,16 +681,16 @@ protected override async Task Process(IList<PartitionEvent> batch)
return target;
}

public async Task ReplayCommitLog(LogWorker logWorker)
public async Task ReplayCommitLog(LogWorker logWorker, bool prefetch)
{
var startPosition = this.CommitLogPosition;
this.traceHelper.FasterProgress($"Replaying log from {startPosition}");
this.traceHelper.FasterProgress($"Replaying log from {startPosition} prefetch={prefetch} boostTracing={this.traceHelper.BoostTracing}");

var stopwatch = new System.Diagnostics.Stopwatch();
stopwatch.Start();

this.effectTracker.IsReplaying = true;
await logWorker.ReplayCommitLog(startPosition, this);
await logWorker.ReplayCommitLog(startPosition, this, prefetch);
stopwatch.Stop();
this.effectTracker.IsReplaying = false;
