Skip to content

Commit

Permalink
Support sync enqueue with wait instead of spin
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Jan 23, 2023
1 parent 53297ce commit fe48c0e
Showing 1 changed file with 60 additions and 1 deletion.
61 changes: 60 additions & 1 deletion cs/src/core/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -521,6 +520,66 @@ public unsafe bool TryEnqueue(ReadOnlySpan<byte> entry, out long logicalAddress)
return true;
}

/// <summary>
/// Append a user-defined blittable struct header and two SpanByte entries entries atomically to the log.
/// </summary>
/// <param name="userHeader"></param>
/// <param name="item1"></param>
/// <param name="item2"></param>
/// <param name="logicalAddress">Logical address of added entry</param>
public unsafe void Enqueue<THeader>(THeader userHeader, ref SpanByte item1, ref SpanByte item2, out long logicalAddress)
where THeader : unmanaged
{
logicalAddress = 0;
var length = sizeof(THeader) + item1.TotalSize + item2.TotalSize;
int allocatedLength = headerSize + Align(length);
ValidateAllocatedLength(allocatedLength);

epoch.Resume();

logicalAddress = AllocateBlock(allocatedLength);

var physicalAddress = (byte*)allocator.GetPhysicalAddress(logicalAddress);
*(THeader*)(physicalAddress + headerSize) = userHeader;
item1.CopyTo(physicalAddress + headerSize + sizeof(THeader));
item2.CopyTo(physicalAddress + headerSize + sizeof(THeader) + item1.TotalSize);
SetHeader(length, physicalAddress);
if (AutoRefreshSafeTailAddress) DoAutoRefreshSafeTailAddress();
epoch.Suspend();
if (AutoCommit) Commit();
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private long AllocateBlock(int recordSize)
{
while (true)
{
var flushEvent = allocator.FlushEvent;
var logicalAddress = allocator.TryAllocate(recordSize);
if (logicalAddress > 0)
return logicalAddress;

if (logicalAddress == 0)
{
epoch.Suspend();
if (cannedException != null) throw cannedException;
try
{
flushEvent.Wait();
}
finally
{
epoch.Resume();
}
}

// logicalAddress is < 0 so we do not expect flushEvent to be signaled; refresh the epoch and retry now
allocator.TryComplete();
epoch.ProtectAndDrain();
Thread.Yield();
}
}

/// <summary>
/// Try to append a user-defined blittable struct header and two SpanByte entries entries atomically to the log.
/// If it returns true, we are done. If it returns false, we need to retry.
Expand Down

0 comments on commit fe48c0e

Please sign in to comment.