Revise compaction state machine #410

Open · wants to merge 1 commit into base: main

39 changes: 25 additions & 14 deletions src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
@@ -79,6 +79,8 @@ readonly ClientSession<Key, Value, EffectTracker, Output, object, IFunctions<Key
const int ReadRetryAfter = 20000;
EffectTracker effectTracker;

DateTime lastTraceForSkippedCompaction = DateTime.MinValue;

public FasterTraceHelper TraceHelper => this.blobManager.TraceHelper;

public int PageSizeBits => this.storelogsettings.PageSizeBits;
@@ -492,33 +494,42 @@ long MinimalLogSize
}
}

public override long? GetCompactionTarget()
public override bool CompactionIsDue(out long target)
{
// TODO: empirical validation of the heuristics

var stats = (StatsState) this.singletons[(int)TrackedObjectKey.Stats.ObjectType];
long actualLogSize = this.fht.Log.TailAddress - this.fht.Log.BeginAddress;
long minimalLogSize = this.MinimalLogSize;
long compactionAreaSize = Math.Min(50000, this.fht.Log.SafeReadOnlyAddress - this.fht.Log.BeginAddress);
long compactionAreaSize = Math.Min(200000, this.fht.Log.SafeReadOnlyAddress - this.fht.Log.BeginAddress);

if (actualLogSize > 2 * minimalLogSize // there must be significant bloat
&& compactionAreaSize >= 5000) // and enough compaction area to justify the overhead
&& compactionAreaSize >= 10000) // and enough compaction area to justify the overhead
{
return this.fht.Log.BeginAddress + compactionAreaSize;
target = this.fht.Log.BeginAddress + compactionAreaSize;
return true;
}
else
{
this.TraceHelper.FasterCompactionProgress(
FasterTraceHelper.CompactionProgress.Skipped,
"",
this.Log.BeginAddress,
this.Log.SafeReadOnlyAddress,
this.Log.TailAddress,
minimalLogSize,
compactionAreaSize,
this.GetElapsedCompactionMilliseconds());

return null;
// trace the skipped compaction; since this method is called quite frequently, limit these traces to once per minute
if (this.lastTraceForSkippedCompaction + TimeSpan.FromMinutes(1) < DateTime.UtcNow)
{
this.lastTraceForSkippedCompaction = DateTime.UtcNow;
this.TraceHelper.FasterCompactionProgress(
FasterTraceHelper.CompactionProgress.Skipped,
"",
this.Log.BeginAddress,
this.Log.SafeReadOnlyAddress,
this.Log.TailAddress,
minimalLogSize,
compactionAreaSize,
this.GetElapsedCompactionMilliseconds());
}

target = 0;
return false;
}
}

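Note on the signature change above: the store API moves from a nullable return value (GetCompactionTarget) to a try-style method with an out parameter (CompactionIsDue). The following is a minimal sketch of how a call site migrates between the two shapes; IStore and ScheduleCompaction are hypothetical stand-ins used only for illustration and are not part of this PR.

// Hypothetical sketch only: IStore and ScheduleCompaction stand in for
// TrackedObjectStore and the StoreWorker scheduling logic to show the call shape.
interface IStore
{
    long? GetCompactionTarget();             // old shape: nullable return value
    bool CompactionIsDue(out long target);   // new shape: try-pattern with an out parameter
}

static class CompactionCallerSketch
{
    static void CheckOldShape(IStore store)
    {
        long? target = store.GetCompactionTarget();
        if (target.HasValue)
        {
            ScheduleCompaction(target.Value);
        }
    }

    static void CheckNewShape(IStore store)
    {
        // the try-pattern avoids the nullable temporary and reads naturally in an if / else-if chain
        if (store.CompactionIsDue(out long compactUntil))
        {
            ScheduleCompaction(compactUntil);
        }
    }

    static void ScheduleCompaction(long compactUntil)
    {
        // placeholder: in the real code, StoreWorker sets the Compaction checkpoint trigger
    }
}

This is the same shape that CheckpointDue in StoreWorker.cs below now uses, where CompactionIsDue slots directly into the else-if chain of checkpoint triggers.
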
59 changes: 42 additions & 17 deletions src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs
@@ -6,6 +6,7 @@ namespace DurableTask.Netherite.Faster
using DurableTask.Netherite.Scaling;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

@@ -362,23 +363,23 @@ bool CheckpointDue(out CheckpointTrigger trigger, out long? compactUntil)
{
trigger = CheckpointTrigger.EventCount;
}
else if (this.store.CompactionIsDue(out long compactionTarget))
{
compactUntil = compactionTarget;
trigger = CheckpointTrigger.Compaction;
}
else if (this.loadInfo.IsBusy() == null && DateTime.UtcNow > this.timeOfNextIdleCheckpoint)
{
// we have reached an idle point.
this.ScheduleNextIdleCheckpointTime();

compactUntil = this.store.GetCompactionTarget();
if (compactUntil.HasValue)
{
trigger = CheckpointTrigger.Compaction;
}
else if (this.numberEventsSinceLastCheckpoint > 0 || inputQueuePositionLag > 0)

if (this.numberEventsSinceLastCheckpoint > 0 || inputQueuePositionLag > 0)
{
// we checkpoint even though not much has happened
trigger = CheckpointTrigger.Idle;
}
}

return trigger != CheckpointTrigger.None;
}

@@ -431,7 +432,7 @@ void StartCheckpointOrFailOnTimeout(Func<bool> checkpointRoutine, string message
}
}

async ValueTask RunCheckpointingStateMachine()
async ValueTask<bool> RunCheckpointingStateMachine()
{
// handle progression of the checkpointing state machine: none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none
if (this.pendingStoreCheckpoint != null)
@@ -513,6 +514,13 @@ async ValueTask RunCheckpointingStateMachine()

this.pendingCompaction = this.RunCompactionAsync(compactUntil);
}
else
{
// there are no checkpoint or compaction operations in progress or due
return false;
}

return true; // there is a checkpoint or compaction operation in progress.
}

protected override async Task Process(IList<PartitionEvent> batch)
Expand Down Expand Up @@ -573,12 +581,6 @@ protected override async Task Process(IList<PartitionEvent> batch)
markPartitionAsActive = markPartitionAsActive || partitionEvent.CountsAsPartitionActivity;
}

// if we are processing events that count as activity, our latency category is at least "low"
if (markPartitionAsActive)
{
this.loadInfo.MarkActive();
}

Comment on lines -576 to -581
Member:

After this deleted piece of code, this method may return if it is shutting down. Could we be losing information by no longer recording the latency right before a shutdown?

Member (Author):

I think that would be OK, since once we shut down we are no longer reporting the partition load anyway. The partition will be started somewhere else and start reporting load from there.

if (this.isShuttingDown || this.cancellationToken.IsCancellationRequested)
{
return;
@@ -587,8 +589,17 @@ protected override async Task Process(IList<PartitionEvent> batch)
this.store.AdjustCacheSize();

// handle progression of the checkpointing state machine: none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none
await this.RunCheckpointingStateMachine();

bool checkpointOrCompactionInProgress = await this.RunCheckpointingStateMachine();

// if a checkpoint or compaction is in progress, our latency category is at least "low"
markPartitionAsActive = markPartitionAsActive || checkpointOrCompactionInProgress;

if (markPartitionAsActive)
{
// mark this partition as having activity; this shows up in the partition table and influences the scale controller
this.loadInfo.MarkActive();
}

// periodically publish the partition load information and the send/receive positions
// also report checkpointing stats
if (this.lastPublished + PublishInterval < DateTime.UtcNow)
Expand Down Expand Up @@ -671,8 +682,22 @@ protected override async Task Process(IList<PartitionEvent> batch)
{
if (target.HasValue)
{
Stopwatch stopWatch = Stopwatch.StartNew();

target = await this.store.RunCompactionAsync(target.Value);

this.partition.Settings.TestHooks?.CheckpointInjector?.CompactionComplete(this.partition.ErrorHandler);

// we mark the latency of the compaction in the partition table so that the scale controller
// will scale out if needed to handle the compaction load
if (stopWatch.Elapsed.TotalSeconds > 5)
{
this.loadInfo.MarkHighLatency();
}
else if (stopWatch.Elapsed.TotalSeconds > 1)
{
this.loadInfo.MarkMediumLatency();
}
}

this.Notify();
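
For readers unfamiliar with the state machine referenced in the comments above (none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none), here is a hedged, self-contained sketch of that progression and of the boolean that RunCheckpointingStateMachine now returns. The explicit enum is illustrative only; the actual StoreWorker tracks these phases through its pendingCompaction, pendingIndexCheckpoint, and pendingStoreCheckpoint fields.

// Illustrative sketch, not the real implementation: it only makes the progression
// none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none
// concrete, together with the "is anything in progress?" result added in this PR.
enum CheckpointPhase
{
    None,
    PendingCompaction,
    PendingIndexCheckpoint,
    PendingStoreCheckpoint,
}

static class CheckpointingStateMachineSketch
{
    // Advances the phase by one step once the current step has completed, and returns
    // true while any checkpoint or compaction work is in progress or due.
    public static bool Step(ref CheckpointPhase phase, bool workIsDue)
    {
        switch (phase)
        {
            case CheckpointPhase.None:
                if (!workIsDue)
                {
                    return false; // nothing in progress or due
                }
                phase = CheckpointPhase.PendingCompaction;
                return true;

            case CheckpointPhase.PendingCompaction:
                phase = CheckpointPhase.PendingIndexCheckpoint;
                return true;

            case CheckpointPhase.PendingIndexCheckpoint:
                phase = CheckpointPhase.PendingStoreCheckpoint;
                return true;

            default: // PendingStoreCheckpoint
                phase = CheckpointPhase.None;
                return true;
        }
    }
}

In the actual change, that boolean feeds markPartitionAsActive, so a partition that is checkpointing or compacting is reported as active even if it processed no events in the current batch.
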
@@ -37,7 +37,7 @@ abstract class TrackedObjectStore

public abstract Task FinalizeCheckpointCompletedAsync(Guid guid);

public abstract long? GetCompactionTarget();
public abstract bool CompactionIsDue(out long target);

public abstract Task<long> RunCompactionAsync(long target);

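Finally, the compaction path in StoreWorker.cs now classifies its own duration so that the scale controller sees the compaction load. The sketch below restates that threshold logic in isolation; the Classify helper and its string labels are hypothetical, while the 1-second and 5-second thresholds and the MarkMediumLatency/MarkHighLatency calls they map to come from the diff above.

using System;
using System.Diagnostics;

static class CompactionLatencySketch
{
    // Classifies a compaction run by its duration; the 1 s and 5 s thresholds mirror the diff,
    // where they map to loadInfo.MarkMediumLatency() and loadInfo.MarkHighLatency().
    public static string Classify(TimeSpan elapsed)
    {
        if (elapsed.TotalSeconds > 5)
        {
            return "high";
        }
        else if (elapsed.TotalSeconds > 1)
        {
            return "medium";
        }
        return "low";
    }

    public static void Example()
    {
        Stopwatch stopwatch = Stopwatch.StartNew();
        // ... a compaction run would execute here ...
        stopwatch.Stop();
        Console.WriteLine($"compaction latency category: {Classify(stopwatch.Elapsed)}");
    }
}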