From d3ea2902d6cac5a19ac4a75732a494f54eb7f1d9 Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Tue, 29 Oct 2024 11:07:48 -0700 Subject: [PATCH 1/4] implement detection of repeated recovery failures, and add triggers to boost tracing and disable prefetch during replay --- .../NetheriteOrchestrationServiceSettings.cs | 5 ++ .../Faster/AzureBlobs/BlobManager.cs | 74 ++++++++++++++----- .../Faster/AzureBlobs/CheckpointInfo.cs | 25 ++++++- .../StorageLayer/Faster/FasterKV.cs | 4 +- .../StorageLayer/Faster/FasterTraceHelper.cs | 6 +- .../StorageLayer/Faster/LogWorker.cs | 8 +- .../StorageLayer/Faster/PartitionStorage.cs | 10 ++- .../StorageLayer/Faster/StoreWorker.cs | 6 +- 8 files changed, 106 insertions(+), 32 deletions(-) diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs index e96747d3..92e340e6 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs @@ -206,6 +206,11 @@ public class NetheriteOrchestrationServiceSettings /// public int PartitionStartupTimeoutMinutes { get; set; } = 15; + /// + /// If true, disables the prefetching during replay. + /// + public bool DisablePrefetchDuringReplay { get; set; } = false; + /// /// Allows attaching additional checkers and debuggers during testing. /// diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs index d2871f17..d01589a1 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs @@ -42,6 +42,8 @@ partial class BlobManager : ICheckpointManager, ILogCommitManager BlobUtilsV12.BlockBlobClients eventLogCommitBlob; BlobLeaseClient leaseClient; + BlobUtilsV12.BlockBlobClients checkpointCompletedBlob; + BlobUtilsV12.BlobDirectory pageBlobPartitionDirectory; BlobUtilsV12.BlobDirectory blockBlobPartitionDirectory; @@ -419,6 +421,7 @@ public async Task StartAsync() this.eventLogCommitBlob = this.blockBlobPartitionDirectory.GetBlockBlobClient(CommitBlobName); this.leaseClient = this.eventLogCommitBlob.WithRetries.GetBlobLeaseClient(); + this.checkpointCompletedBlob = this.blockBlobPartitionDirectory.GetBlockBlobClient(this.GetCheckpointCompletedBlobName()); AzureStorageDevice createDevice(string name) => new AzureStorageDevice(name, this.blockBlobPartitionDirectory.GetSubDirectory(name), this.pageBlobPartitionDirectory.GetSubDirectory(name), this, true); @@ -1057,10 +1060,11 @@ IEnumerable ICheckpointManager.GetLogCheckpointTokens() internal async Task FindCheckpointsAsync(bool logIsEmpty) { - BlobUtilsV12.BlockBlobClients checkpointCompletedBlob = default; + string jsonString = null; + DateTimeOffset lastModified = default; + try { - string jsonString = null; if (this.UseLocalFiles) { @@ -1076,7 +1080,6 @@ internal async Task FindCheckpointsAsync(bool logIsEmpty) else { var partDir = this.blockBlobPartitionDirectory; - checkpointCompletedBlob = partDir.GetBlockBlobClient(this.GetCheckpointCompletedBlobName()); await this.PerformWithRetriesAsync( semaphore: null, @@ -1084,7 +1087,7 @@ await this.PerformWithRetriesAsync( "BlockBlobClient.DownloadContentAsync", "FindCheckpointsAsync", "", - checkpointCompletedBlob.Name, + this.checkpointCompletedBlob.Name, 1000, 
true, failIfReadonly: false, @@ -1092,8 +1095,9 @@ await this.PerformWithRetriesAsync( { try { - Azure.Response downloadResult = await checkpointCompletedBlob.WithRetries.DownloadContentAsync(); + Azure.Response downloadResult = await this.checkpointCompletedBlob.WithRetries.DownloadContentAsync(); jsonString = downloadResult.Value.Content.ToString(); + lastModified = downloadResult.Value.Details.LastModified; this.CheckpointInfoETag = downloadResult.Value.Details.ETag; return 1; } @@ -1105,22 +1109,58 @@ await this.PerformWithRetriesAsync( }); } - if (jsonString == null) - { - return false; - } - else - { - // read the fields from the json to update the checkpoint info - JsonConvert.PopulateObject(jsonString, this.CheckpointInfo); - return true; - } } catch (Exception e) { - this.HandleStorageError(nameof(FindCheckpointsAsync), "could not find any checkpoint", checkpointCompletedBlob.Name, e, true, this.PartitionErrorHandler.IsTerminated); + this.HandleStorageError(nameof(FindCheckpointsAsync), "could not find any checkpoint", this.checkpointCompletedBlob.Name, e, true, this.PartitionErrorHandler.IsTerminated); + throw; + } + + if (jsonString == null) + { + return false; + } + + try + { + // read the fields from the json to update the checkpoint info + JsonConvert.PopulateObject(jsonString, this.CheckpointInfo); + } + catch (JsonException e) + { + this.HandleStorageError(nameof(FindCheckpointsAsync), "could not parse json file describing last checkpoint", this.checkpointCompletedBlob.Name, e, true, false); throw; } + + if (this.CheckpointInfo.RecoveryAttempts > 0 || DateTimeOffset.UtcNow - lastModified > TimeSpan.FromMinutes(5)) + { + this.CheckpointInfo.RecoveryAttempts++; + + this.TraceHelper.FasterProgress($"Incremented recovery attempt counter to {this.CheckpointInfo.RecoveryAttempts} in {this.checkpointCompletedBlob.Name}."); + + await this.WriteCheckpointMetadataAsync(); + + if (this.CheckpointInfo.RecoveryAttempts > 3 && this.CheckpointInfo.RecoveryAttempts < 30) + { + this.TraceHelper.BoostTracing = true; + } + } + + return true; + } + + public async Task ClearRecoveryAttempts() + { + if (this.CheckpointInfo.RecoveryAttempts > 0) + { + this.CheckpointInfo.RecoveryAttempts = 0; + + this.TraceHelper.BoostTracing = false; + + await this.WriteCheckpointMetadataAsync(); + + this.TraceHelper.FasterProgress($"Cleared recovery attempt counter in {this.checkpointCompletedBlob.Name}."); + } } void ICheckpointManager.CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata) @@ -1436,7 +1476,7 @@ await this.PerformWithRetriesAsync( } } - internal async Task FinalizeCheckpointCompletedAsync() + internal async Task WriteCheckpointMetadataAsync() { var jsonText = JsonConvert.SerializeObject(this.CheckpointInfo, Formatting.Indented); if (this.UseLocalFiles) diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/CheckpointInfo.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/CheckpointInfo.cs index 40f8d5da..30048422 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/CheckpointInfo.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/CheckpointInfo.cs @@ -10,25 +10,46 @@ namespace DurableTask.Netherite.Faster [JsonObject] class CheckpointInfo { + /// + /// The FasterKV token for the last index checkpoint taken before this checkpoint. + /// [JsonProperty] public Guid IndexToken { get; set; } + /// + /// The FasterKV token for this checkpoint. 
+ /// [JsonProperty] public Guid LogToken { get; set; } + /// + /// The FasterLog position for this checkpoint. + /// [JsonProperty] public long CommitLogPosition { get; set; } + /// + /// The input queue (event hubs) position for this checkpoint. + /// [JsonProperty] public long InputQueuePosition { get; set; } + /// + /// If the input queue position is a batch, the position within the batch. + /// [JsonProperty] public int InputQueueBatchPosition { get; set; } + /// + /// The input queue fingerprint for this checkpoint. + /// [JsonProperty] public string InputQueueFingerprint { get; set; } - [JsonProperty] - public long NumberInstances { get; set; } + /// + /// The number of recovery attempts that have been made for this checkpoint. + /// + //[JsonProperty] + public int RecoveryAttempts { get; set; } } } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs index a7b323c0..3c152c85 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs @@ -380,7 +380,7 @@ void RunTask() { public override async Task FinalizeCheckpointCompletedAsync(Guid guid) { - await this.blobManager.FinalizeCheckpointCompletedAsync(); + await this.blobManager.WriteCheckpointMetadataAsync(); if (this.cacheDebugger == null) { @@ -739,7 +739,7 @@ public override async Task RunPrefetchSession(IAsyncEnumerable long lastReport = 0; void ReportProgress(int elapsedMillisecondsThreshold) { - if (stopwatch.ElapsedMilliseconds - lastReport >= elapsedMillisecondsThreshold) + if (stopwatch.ElapsedMilliseconds - lastReport >= elapsedMillisecondsThreshold || this.TraceHelper.BoostTracing) { this.blobManager.TraceHelper.FasterProgress( $"FasterKV PrefetchSession {sessionId} elapsed={stopwatch.Elapsed.TotalSeconds:F2}s issued={numberIssued} pending={maxConcurrency - prefetchSemaphore.CurrentCount} hits={numberHits} misses={numberMisses}"); diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs index e42650f9..382948ab 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs @@ -25,7 +25,9 @@ public FasterTraceHelper(ILogger logger, LogLevel logLevelLimit, ILogger perform this.partitionId = (int) partitionId; } - public bool IsTracingAtMostDetailedLevel => this.logLevelLimit == LogLevel.Trace; + public bool IsTracingAtMostDetailedLevel => this.logLevelLimit == LogLevel.Trace || this.BoostTracing; + + public bool BoostTracing { get; set; } // ----- faster storage layer events @@ -139,7 +141,7 @@ public void FasterProgress(Func constructString) public void FasterStorageProgress(string details) { - if (this.logLevelLimit <= LogLevel.Trace) + if (this.logLevelLimit <= LogLevel.Trace || this.BoostTracing) { this.logger.LogTrace("Part{partition:D2} {details}", this.partitionId, details); EtwSource.Log.FasterStorageProgress(this.account, this.taskHub, this.partitionId, details, TraceUtils.AppName, TraceUtils.ExtensionVersion); diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs b/src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs index 16101ace..c187c79c 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs @@ -207,7 +207,7 @@ protected override async Task Process(IList batch) } } - public 
async Task ReplayCommitLog(long from, StoreWorker worker) + public async Task ReplayCommitLog(long from, StoreWorker worker, bool enablePrefetch) { // this procedure is called by StoreWorker during recovery. It replays all the events // that were committed to the log but are not reflected in the loaded store checkpoint. @@ -220,7 +220,11 @@ public async Task ReplayCommitLog(long from, StoreWorker worker) var fetchTask = this.FetchEvents(from, replayChannel.Writer, prefetchChannel.Writer); var replayTask = Task.Run(() => this.ReplayEvents(replayChannel.Reader, worker)); - var prefetchTask = Task.Run(() => worker.RunPrefetchSession(prefetchChannel.Reader.ReadAllAsync(this.cancellationToken))); + + if (enablePrefetch) + { + var prefetchTask = Task.Run(() => worker.RunPrefetchSession(prefetchChannel.Reader.ReadAllAsync(this.cancellationToken))); + } await fetchTask; await replayTask; diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs index f534db80..5eb3ae57 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs @@ -20,9 +20,6 @@ class PartitionStorage : IPartitionState readonly ILogger logger; readonly ILogger performanceLogger; readonly MemoryTracker memoryTracker; - //readonly CloudStorageAccount storageAccount; - //readonly string localFileDirectory; - //readonly CloudStorageAccount pageBlobStorageAccount; Partition partition; BlobManager blobManager; @@ -193,7 +190,10 @@ async Task TerminationWrapper(Task what) if (this.log.TailAddress > (long)this.storeWorker.CommitLogPosition) { // replay log as the store checkpoint lags behind the log - await this.TerminationWrapper(this.storeWorker.ReplayCommitLog(this.logWorker)); + + bool disablePrefetch = this.blobManager.CheckpointInfo.RecoveryAttempts > 6 || this.settings.DisablePrefetchDuringReplay; + + await this.TerminationWrapper(this.storeWorker.ReplayCommitLog(this.logWorker, prefetch: !disablePrefetch)); } } catch (OperationCanceledException) when (this.partition.ErrorHandler.IsTerminated) @@ -215,6 +215,8 @@ async Task TerminationWrapper(Task what) } this.TraceHelper.FasterProgress("Recovery complete"); + + await this.blobManager.ClearRecoveryAttempts(); } this.blobManager.FaultInjector?.Started(this.blobManager); return this.storeWorker.InputQueuePosition; diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs index 4b0238b9..79c2ef62 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs @@ -681,16 +681,16 @@ protected override async Task Process(IList batch) return target; } - public async Task ReplayCommitLog(LogWorker logWorker) + public async Task ReplayCommitLog(LogWorker logWorker, bool prefetch) { var startPosition = this.CommitLogPosition; - this.traceHelper.FasterProgress($"Replaying log from {startPosition}"); + this.traceHelper.FasterProgress($"Replaying log from {startPosition} prefetch={prefetch}"); var stopwatch = new System.Diagnostics.Stopwatch(); stopwatch.Start(); this.effectTracker.IsReplaying = true; - await logWorker.ReplayCommitLog(startPosition, this); + await logWorker.ReplayCommitLog(startPosition, this, prefetch); stopwatch.Stop(); this.effectTracker.IsReplaying = false; From b97eb92ec7668e4b90c59c4148166917a6f61af4 Mon Sep 17 00:00:00 2001 From: 
sebastianburckhardt Date: Tue, 29 Oct 2024 11:58:48 -0700 Subject: [PATCH 2/4] add comments as per PR feedback --- .../StorageLayer/Faster/AzureBlobs/BlobManager.cs | 3 +++ .../StorageLayer/Faster/PartitionStorage.cs | 2 ++ 2 files changed, 5 insertions(+) diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs index d01589a1..e37a0ee0 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs @@ -1140,6 +1140,9 @@ await this.PerformWithRetriesAsync( await this.WriteCheckpointMetadataAsync(); + // we boost the tracing after three failed attempts. This boosting applies to the recovery part only. + // After thirty attempts, we stop boosting since it seems unlikely + // that there is any more information after that that cannot be found in the logs for the first 30 attempts. if (this.CheckpointInfo.RecoveryAttempts > 3 && this.CheckpointInfo.RecoveryAttempts < 30) { this.TraceHelper.BoostTracing = true; diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs index 5eb3ae57..c17b663f 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs @@ -191,6 +191,8 @@ async Task TerminationWrapper(Task what) { // replay log as the store checkpoint lags behind the log + // disable prefetch if we have had many unsuccessful recovery attempts, or if the settings say so + // We choose 6 as the threshold since the tracing gets boosted after 3 attempts, and we want to see 3 attempts with boosted tracing before we disable prefetch bool disablePrefetch = this.blobManager.CheckpointInfo.RecoveryAttempts > 6 || this.settings.DisablePrefetchDuringReplay; await this.TerminationWrapper(this.storeWorker.ReplayCommitLog(this.logWorker, prefetch: !disablePrefetch)); From d89c992c71652fcf9cab1a639ce3f6b7e798dd22 Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Wed, 30 Oct 2024 08:53:19 -0700 Subject: [PATCH 3/4] fix the disabling of prefetch, make more readable, improve tracing --- .../Faster/AzureBlobs/BlobManager.cs | 12 +++++++---- .../StorageLayer/Faster/LogWorker.cs | 20 ++++++++++++++----- .../StorageLayer/Faster/StoreWorker.cs | 2 +- 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs index e37a0ee0..6af3503a 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs @@ -1140,10 +1140,14 @@ await this.PerformWithRetriesAsync( await this.WriteCheckpointMetadataAsync(); - // we boost the tracing after three failed attempts. This boosting applies to the recovery part only. - // After thirty attempts, we stop boosting since it seems unlikely - // that there is any more information after that that cannot be found in the logs for the first 30 attempts. - if (this.CheckpointInfo.RecoveryAttempts > 3 && this.CheckpointInfo.RecoveryAttempts < 30) + // we start to boost the tracing after three failed attempts. This boosting applies to the recovery part only. 
+ int StartBoostingAfter = 3; + + // After some number of boosted attempts, we stop boosting since it seems unlikely that we will find new information. + int BoostFor = 10; + + if (this.CheckpointInfo.RecoveryAttempts > StartBoostingAfter + && this.CheckpointInfo.RecoveryAttempts <= StartBoostingAfter + BoostFor) { this.TraceHelper.BoostTracing = true; } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs b/src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs index c187c79c..601f9fcb 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs @@ -215,10 +215,10 @@ public async Task ReplayCommitLog(long from, StoreWorker worker, bool enablePref { // we create a pipeline where the fetch task obtains a stream of events and then duplicates the // stream, so it can get replayed and prefetched in parallel. - var prefetchChannel = Channel.CreateBounded(1000); + var prefetchChannel = enablePrefetch ? Channel.CreateBounded(1000) : null; var replayChannel = Channel.CreateBounded(1000); - var fetchTask = this.FetchEvents(from, replayChannel.Writer, prefetchChannel.Writer); + var fetchTask = this.FetchEvents(from, replayChannel.Writer, prefetchChannel?.Writer); var replayTask = Task.Run(() => this.ReplayEvents(replayChannel.Reader, worker)); if (enablePrefetch) @@ -245,23 +245,33 @@ async Task FetchEvents(long from, ChannelWriter replayChan await replayChannelWriter.WriteAsync(partitionEvent); - if (partitionEvent is IRequiresPrefetch evt) + if (prefetchChannelWriter != null && partitionEvent is IRequiresPrefetch evt) { foreach (var key in evt.KeysToPrefetch) { + if (this.traceHelper.BoostTracing) + { + this.traceHelper.FasterProgress($"Replay Prefetches {key}"); + } + await prefetchChannelWriter.WriteAsync(key); } } } replayChannelWriter.Complete(); - prefetchChannelWriter.Complete(); + prefetchChannelWriter?.Complete(); } async Task ReplayEvents(ChannelReader reader, StoreWorker worker) { await foreach (var partitionEvent in reader.ReadAllAsync(this.cancellationToken)) { + if (this.traceHelper.BoostTracing) + { + this.traceHelper.FasterProgress($"Replaying PartitionEvent {partitionEvent.NextCommitLogPosition}"); + } + await worker.ReplayUpdate(partitionEvent); } } @@ -289,7 +299,7 @@ async IAsyncEnumerable EventsToReplay(long from) await iter.WaitAsync(this.cancellationToken); } - if (this.traceLogDetails) + if (this.traceHelper.IsTracingAtMostDetailedLevel) { this.TraceLogDetail("Read", iter.NextAddress, new ReadOnlySpan(result, 0, entryLength)); } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs index 79c2ef62..95ad1488 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs @@ -684,7 +684,7 @@ protected override async Task Process(IList batch) public async Task ReplayCommitLog(LogWorker logWorker, bool prefetch) { var startPosition = this.CommitLogPosition; - this.traceHelper.FasterProgress($"Replaying log from {startPosition} prefetch={prefetch}"); + this.traceHelper.FasterProgress($"Replaying log from {startPosition} prefetch={prefetch} boostTracing={this.traceHelper.BoostTracing}"); var stopwatch = new System.Diagnostics.Stopwatch(); stopwatch.Start(); From 8c6cb3ce47990a0afac98739020e124a293cdd89 Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Wed, 30 Oct 2024 14:22:05 -0700 Subject: [PATCH 4/4] disable prefetch 
on every other attempt only, to make sure we are not permanently breaking anything --- .../StorageLayer/Faster/PartitionStorage.cs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs index c17b663f..6434dcb8 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs @@ -191,9 +191,11 @@ async Task TerminationWrapper(Task what) { // replay log as the store checkpoint lags behind the log - // disable prefetch if we have had many unsuccessful recovery attempts, or if the settings say so - // We choose 6 as the threshold since the tracing gets boosted after 3 attempts, and we want to see 3 attempts with boosted tracing before we disable prefetch - bool disablePrefetch = this.blobManager.CheckpointInfo.RecoveryAttempts > 6 || this.settings.DisablePrefetchDuringReplay; + // after six unsuccessful attempts, we start disabling prefetch on every other attempt, to see if this can remedy the problem + int startDisablingPrefetchAfter = 6; + + bool disablePrefetch = this.settings.DisablePrefetchDuringReplay + || (this.blobManager.CheckpointInfo.RecoveryAttempts > startDisablingPrefetchAfter && (this.blobManager.CheckpointInfo.RecoveryAttempts - startDisablingPrefetchAfter) % 2 == 1); await this.TerminationWrapper(this.storeWorker.ReplayCommitLog(this.logWorker, prefetch: !disablePrefetch)); }
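
The sketch below is illustrative and not part of the patch series: it distills the escalation policy that patches 3 and 4 converge on into a standalone helper, so the thresholds are visible at a glance. The class name RecoveryEscalation, the method names, the default parameter values, and the console driver are assumptions made for this sketch; only the thresholds (boost tracing after 3 failed attempts for a bounded window of 10, consider disabling prefetch after 6) and the every-other-attempt parity check are taken from the diffs above. RecoveryAttempts corresponds to the counter persisted in the checkpoint metadata, incremented in FindCheckpointsAsync and cleared by ClearRecoveryAttempts once recovery completes.

using System;

// Standalone sketch of the recovery escalation policy described by this patch series.
static class RecoveryEscalation
{
    // Tracing is boosted only inside a bounded window of attempts: boosting starts after
    // startBoostingAfter failures and stops again after boostFor boosted attempts, since
    // later attempts are unlikely to add information beyond the first boosted ones.
    public static bool ShouldBoostTracing(int recoveryAttempts, int startBoostingAfter = 3, int boostFor = 10)
    {
        return recoveryAttempts > startBoostingAfter
            && recoveryAttempts <= startBoostingAfter + boostFor;
    }

    // Prefetch during replay is disabled either explicitly by the DisablePrefetchDuringReplay
    // setting, or on every other attempt once the counter exceeds the threshold, so that the
    // automatic trigger never turns prefetch off permanently.
    public static bool ShouldDisablePrefetch(int recoveryAttempts, bool disablePrefetchSetting, int startDisablingPrefetchAfter = 6)
    {
        return disablePrefetchSetting
            || (recoveryAttempts > startDisablingPrefetchAfter
                && (recoveryAttempts - startDisablingPrefetchAfter) % 2 == 1);
    }

    static void Main()
    {
        // For example, attempt 7 runs with boosted tracing and prefetch disabled,
        // while attempt 8 runs with boosted tracing and prefetch enabled again.
        for (int attempt = 1; attempt <= 15; attempt++)
        {
            Console.WriteLine(
                $"attempt={attempt} boostTracing={ShouldBoostTracing(attempt)} " +
                $"disablePrefetch={ShouldDisablePrefetch(attempt, disablePrefetchSetting: false)}");
        }
    }
}

Keeping the two triggers separate, as the patches do, means a stuck partition first produces more detailed traces for a few attempts before the replay behavior itself is changed, and the alternating prefetch disable ensures both configurations keep being exercised.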