Skip to content

Commit

Permalink
Fix deltalog append logic
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed May 26, 2022
1 parent c1167e2 commit 61e7a3f
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public abstract partial class AllocatorBase<Key, Value> : IDisposable
/// </summary>
protected readonly LightEpoch epoch;
private readonly bool ownedEpoch;

/// <summary>
/// Comparer
/// </summary>
Expand Down Expand Up @@ -226,7 +226,7 @@ public abstract partial class AllocatorBase<Key, Value> : IDisposable
/// Whether to preallocate log on initialization
/// </summary>
private readonly bool PreallocateLog = false;

/// <summary>
/// Error handling
/// </summary>
Expand Down Expand Up @@ -465,6 +465,11 @@ internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long end
deltaLog.Seal(destOffset);
deltaLog.Allocate(out entryLength, out destPhysicalAddress);
destOffset = 0;
if (destOffset + size > entryLength)
{
deltaLog.Seal(0);
deltaLog.Allocate(out entryLength, out destPhysicalAddress);
}
if (destOffset + size > entryLength)
throw new FasterException("Insufficient page size to write delta");
}
Expand Down Expand Up @@ -521,7 +526,7 @@ internal unsafe void ApplyDelta(DeltaLog log, long startPage, long endPage, long
unsafe
{
fixed (byte* m = metadata)
Buffer.MemoryCopy((void*) physicalAddress, m, entryLength, entryLength);
Buffer.MemoryCopy((void*)physicalAddress, m, entryLength, entryLength);
}

HybridLogRecoveryInfo recoveryInfo = new();
Expand All @@ -534,7 +539,7 @@ internal unsafe void ApplyDelta(DeltaLog log, long startPage, long endPage, long
break;
default:
throw new FasterException("Unexpected entry type");

}
}
}
Expand Down Expand Up @@ -1284,7 +1289,7 @@ private void OnPagesClosed(long newSafeHeadAddress)
if (this.NumActiveLockingSessions > 0 && OnLockEvictionObserver is not null)
MemoryPageScan(start, end, OnLockEvictionObserver);

if (OnEvictionObserver is not null)
if (OnEvictionObserver is not null)
MemoryPageScan(start, end, OnEvictionObserver);

int closePage = (int)(closePageAddress >> LogPageSizeBits);
Expand Down Expand Up @@ -1429,7 +1434,7 @@ protected void ShiftFlushedUntilAddress()
}
}
}

if (!errorList.Empty)
{
var info = errorList.GetEarliestError();
Expand Down Expand Up @@ -1952,7 +1957,7 @@ private void AsyncFlushPageCallback(uint errorCode, uint numBytes, object contex
{
// Note down error details and trigger handling only when we are certain this is the earliest
// error among currently issued flushes
errorList.Add(new CommitInfo { FromAddress = result.fromAddress, UntilAddress = result.untilAddress, ErrorCode = errorCode } );
errorList.Add(new CommitInfo { FromAddress = result.fromAddress, UntilAddress = result.untilAddress, ErrorCode = errorCode });
}
else
{
Expand Down Expand Up @@ -1994,7 +1999,7 @@ internal void UnsafeSkipError(CommitInfo info)
catch when (disposed) { }

}

/// <summary>
/// IOCompletion callback for page flush
/// </summary>
Expand Down

0 comments on commit 61e7a3f

Please sign in to comment.