Skip to content

Commit

Permalink
Object log recovery fixes (#158)
Browse files Browse the repository at this point in the history
* Fixes to correctly recover in presence of object log
* Use a SafeConcurrentDictionary to store file handles, to avoid spurious handle creation when many threads read in parallel
* Add an option to enable file buffering for the storage device
* Ensure that during recovery, we only load records from HeadAddress onwards into main memory
  • Loading branch information
badrishc authored Jul 23, 2019
1 parent 46fdcdf commit 1205739
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 72 deletions.
2 changes: 1 addition & 1 deletion cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
/// Allocate page
/// </summary>
/// <param name="index"></param>
protected abstract void AllocatePage(int index);
internal abstract void AllocatePage(int index);
/// <summary>
/// Whether page is allocated
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Allocator/BlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public override void Dispose()
/// Allocate memory page, pinned in memory, and in sector aligned form, if possible
/// </summary>
/// <param name="index"></param>
protected override void AllocatePage(int index)
internal override void AllocatePage(int index)
{
var adjustedSize = PageSize + 2 * sectorSize;
byte[] tmp = new byte[adjustedSize];
Expand Down
75 changes: 43 additions & 32 deletions cs/src/core/Allocator/GenericAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ internal override void DeleteFromMemory()
/// Allocate memory page, pinned in memory, and in sector aligned form, if possible
/// </summary>
/// <param name="index"></param>
protected override void AllocatePage(int index)
internal override void AllocatePage(int index)
{
values[index] = AllocatePage();
PageStatusIndicator[index].PageFlushCloseStatus.PageFlushStatus = PMMFlushStatus.Flushed;
Expand Down Expand Up @@ -312,6 +312,8 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres
}
fixed (RecordInfo* pin = &src[0].info)
{
Debug.Assert(buffer.aligned_pointer + numBytesToWrite <= (byte*)buffer.handle.AddrOfPinnedObject() + buffer.buffer.Length);

Buffer.MemoryCopy((void*)((long)Unsafe.AsPointer(ref src[0]) + start), buffer.aligned_pointer + start,
numBytesToWrite - start, numBytesToWrite - start);
}
Expand All @@ -320,6 +322,8 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres
{
fixed (RecordInfo* pin = &src[0].info)
{
Debug.Assert(buffer.aligned_pointer + numBytesToWrite <= (byte*)buffer.handle.AddrOfPinnedObject() + buffer.buffer.Length);

Buffer.MemoryCopy((void*)((long)Unsafe.AsPointer(ref src[0]) + aligned_start), buffer.aligned_pointer + aligned_start,
numBytesToWrite - aligned_start, numBytesToWrite - aligned_start);
}
Expand Down Expand Up @@ -513,26 +517,17 @@ private void AsyncReadPageWithObjectsCallback<TContext>(uint errorCode, uint num

PageAsyncReadResult<TContext> result = (PageAsyncReadResult<TContext>)Overlapped.Unpack(overlap).AsyncResult;

var src = values[result.page % BufferSize];
Record<Key, Value>[] src;

// We are reading into a frame
if (result.frame != null)
{
var frame = (GenericFrame<Key, Value>)result.frame;
src = frame.GetPage(result.page % frame.frameSize);

if (result.freeBuffer2 == null && result.freeBuffer1 != null && result.freeBuffer1.required_bytes > 0)
{
PopulatePageFrame(result.freeBuffer1.GetValidPointer(), (int)result.maxPtr, src);
}
}
else
{
if (result.freeBuffer2 == null && result.freeBuffer1 != null && result.freeBuffer1.required_bytes > 0)
{
PopulatePage(result.freeBuffer1.GetValidPointer(), (int)result.maxPtr, result.page);
}
}
src = values[result.page % BufferSize];


// Deserialize all objects until untilptr
if (result.resumePtr < result.untilPtr)
Expand Down Expand Up @@ -560,19 +555,18 @@ private void AsyncReadPageWithObjectsCallback<TContext>(uint errorCode, uint num
// We will be re-issuing I/O, so free current overlap
Overlapped.Free(overlap);

// Compute new untilPtr
// We will now be able to process all records until (but not including) untilPtr
GetObjectInfo(result.freeBuffer1.GetValidPointer(), ref result.untilPtr, result.maxPtr, ObjectBlockSize, src, out long startptr, out long size);
GetObjectInfo(result.freeBuffer1.GetValidPointer(), ref result.untilPtr, result.maxPtr, ObjectBlockSize, out long startptr, out long size);

// Object log fragment should be aligned by construction
Debug.Assert(startptr % sectorSize == 0);

if (size > int.MaxValue)
throw new Exception("Unable to read object page, total size greater than 2GB: " + size);

var objBuffer = bufferPool.Get((int)size);
result.freeBuffer2 = objBuffer;
var alignedLength = (size + (sectorSize - 1)) & ~(sectorSize - 1);
var objBuffer = bufferPool.Get((int)alignedLength);
result.freeBuffer2 = objBuffer;

// Request objects from objlog
result.objlogDevice.ReadAsync(
Expand Down Expand Up @@ -718,7 +712,10 @@ public void Deserialize(byte *raw, long ptr, long untilptr, Record<Key, Value>[]

while (ptr < untilptr)
{
if (!src[ptr / recordSize].info.Invalid)
ref Record<Key, Value> record = ref Unsafe.AsRef<Record<Key, Value>>(raw + ptr);
src[ptr / recordSize].info = record.info;

if (!record.info.Invalid)
{
if (KeyHasObjects())
{
Expand All @@ -729,21 +726,32 @@ public void Deserialize(byte *raw, long ptr, long untilptr, Record<Key, Value>[]
stream.Seek(streamStartPos + key_addr->Address - start_addr, SeekOrigin.Begin);
}

src[ptr/recordSize].key = new Key();
src[ptr / recordSize].key = new Key();
keySerializer.Deserialize(ref src[ptr/recordSize].key);
}
}
else
{
src[ptr / recordSize].key = record.key;
}

if (ValueHasObjects() && !src[ptr / recordSize].info.Tombstone)
if (!record.info.Tombstone)
{
var value_addr = GetValueAddressInfo((long)raw + ptr);
if (start_addr == -1) start_addr = value_addr->Address;
if (stream.Position != streamStartPos + value_addr->Address - start_addr)
if (ValueHasObjects())
{
stream.Seek(streamStartPos + value_addr->Address - start_addr, SeekOrigin.Begin);
var value_addr = GetValueAddressInfo((long)raw + ptr);
if (start_addr == -1) start_addr = value_addr->Address;
if (stream.Position != streamStartPos + value_addr->Address - start_addr)
{
stream.Seek(streamStartPos + value_addr->Address - start_addr, SeekOrigin.Begin);
}

src[ptr / recordSize].value = new Value();
valueSerializer.Deserialize(ref src[ptr / recordSize].value);
}
else
{
src[ptr / recordSize].value = record.value;
}

src[ptr / recordSize].value = new Value();
valueSerializer.Deserialize(ref src[ptr/recordSize].value);
}
}
ptr += GetRecordSize(ptr);
Expand All @@ -765,17 +773,18 @@ public void Deserialize(byte *raw, long ptr, long untilptr, Record<Key, Value>[]
/// <param name="ptr"></param>
/// <param name="untilptr"></param>
/// <param name="objectBlockSize"></param>
/// <param name="src"></param>
/// <param name="startptr"></param>
/// <param name="size"></param>
public void GetObjectInfo(byte* raw, ref long ptr, long untilptr, int objectBlockSize, Record<Key, Value>[] src, out long startptr, out long size)
public void GetObjectInfo(byte* raw, ref long ptr, long untilptr, int objectBlockSize, out long startptr, out long size)
{
long minObjAddress = long.MaxValue;
long maxObjAddress = long.MinValue;

while (ptr < untilptr)
{
if (!src[ptr/recordSize].info.Invalid)
ref Record<Key, Value> record = ref Unsafe.AsRef<Record<Key, Value>>(raw + ptr);

if (!record.info.Invalid)
{
if (KeyHasObjects())
{
Expand All @@ -794,7 +803,7 @@ public void GetObjectInfo(byte* raw, ref long ptr, long untilptr, int objectBloc
}


if (ValueHasObjects() && !src[ptr / recordSize].info.Tombstone)
if (ValueHasObjects() && !record.info.Tombstone)
{
var value_addr = GetValueAddressInfo((long)raw + ptr);
var addr = value_addr->Address;
Expand Down Expand Up @@ -941,6 +950,8 @@ internal void PopulatePage(byte* src, int required_bytes, ref Record<Key, Value>
{
fixed (RecordInfo* pin = &destinationPage[0].info)
{
Debug.Assert(required_bytes <= recordSize * destinationPage.Length);

Buffer.MemoryCopy(src, Unsafe.AsPointer(ref destinationPage[0]), required_bytes, required_bytes);
}
}
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Allocator/VarLenBlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public override void Dispose()
/// Allocate memory page, pinned in memory, and in sector aligned form, if possible
/// </summary>
/// <param name="index"></param>
protected override void AllocatePage(int index)
internal override void AllocatePage(int index)
{
var adjustedSize = PageSize + 2 * sectorSize;
byte[] tmp = new byte[adjustedSize];
Expand Down
15 changes: 11 additions & 4 deletions cs/src/core/Device/LocalStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,24 @@ public class LocalStorageDevice : StorageDeviceBase
{
private readonly bool preallocateFile;
private readonly bool deleteOnClose;
private readonly ConcurrentDictionary<int, SafeFileHandle> logHandles;
private readonly bool disableFileBuffering;
private readonly SafeConcurrentDictionary<int, SafeFileHandle> logHandles;

/// <summary>
/// Constructor
/// </summary>
/// <param name="filename"></param>
/// <param name="preallocateFile"></param>
/// <param name="deleteOnClose"></param>
public LocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false)
/// <param name="disableFileBuffering"></param>
public LocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, bool disableFileBuffering = true)
: base(filename, GetSectorSize(filename))
{
Native32.EnableProcessPrivileges();
this.preallocateFile = preallocateFile;
this.deleteOnClose = deleteOnClose;
logHandles = new ConcurrentDictionary<int, SafeFileHandle>();
this.disableFileBuffering = disableFileBuffering;
logHandles = new SafeConcurrentDictionary<int, SafeFileHandle>();
}

/// <summary>
Expand Down Expand Up @@ -175,7 +178,11 @@ private SafeFileHandle CreateHandle(int segmentId)
uint fileCreation = unchecked((uint)FileMode.OpenOrCreate);
uint fileFlags = Native32.FILE_FLAG_OVERLAPPED;

fileFlags = fileFlags | Native32.FILE_FLAG_NO_BUFFERING;
if (this.disableFileBuffering)
{
fileFlags = fileFlags | Native32.FILE_FLAG_NO_BUFFERING;
}

if (deleteOnClose)
{
fileFlags = fileFlags | Native32.FILE_FLAG_DELETE_ON_CLOSE;
Expand Down
10 changes: 10 additions & 0 deletions cs/src/core/Index/Common/Contexts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ public struct HybridLogRecoveryInfo
/// </summary>
public long finalLogicalAddress;
/// <summary>
/// Head address
/// </summary>
public long headAddress;
/// <summary>
/// Guid array
/// </summary>
public Guid[] guids;
Expand All @@ -299,6 +303,7 @@ public void Initialize(Guid token, int _version)
flushedLogicalAddress = 0;
startLogicalAddress = 0;
finalLogicalAddress = 0;
headAddress = 0;
guids = new Guid[LightEpoch.kTableSize + 1];
continueTokens = new Dictionary<Guid, long>();
objectLogSegmentOffsets = null;
Expand Down Expand Up @@ -331,6 +336,9 @@ public void Initialize(StreamReader reader)
value = reader.ReadLine();
finalLogicalAddress = long.Parse(value);

value = reader.ReadLine();
headAddress = long.Parse(value);

value = reader.ReadLine();
numThreads = int.Parse(value);

Expand Down Expand Up @@ -421,6 +429,7 @@ public void Write(StreamWriter writer)
writer.WriteLine(flushedLogicalAddress);
writer.WriteLine(startLogicalAddress);
writer.WriteLine(finalLogicalAddress);
writer.WriteLine(headAddress);
writer.WriteLine(numThreads);
for (int i = 0; i < numThreads; i++)
{
Expand Down Expand Up @@ -449,6 +458,7 @@ public void DebugPrint()
Debug.WriteLine("Flushed LogicalAddress: {0}", flushedLogicalAddress);
Debug.WriteLine("Start Logical Address: {0}", startLogicalAddress);
Debug.WriteLine("Final Logical Address: {0}", finalLogicalAddress);
Debug.WriteLine("Head Address: {0}", headAddress);
Debug.WriteLine("Num sessions recovered: {0}", numThreads);
Debug.WriteLine("Recovered sessions: ");
foreach (var sessionInfo in continueTokens)
Expand Down
2 changes: 2 additions & 0 deletions cs/src/core/Index/FASTER/Checkpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta
WriteIndexCheckpointCompleteFile();
}

_hybridLogCheckpoint.info.headAddress = hlog.HeadAddress;

if (FoldOverSnapshot)
{
hlog.ShiftReadOnlyToTail(out long tailAddress);
Expand Down
63 changes: 30 additions & 33 deletions cs/src/core/Index/FASTER/Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private void InternalRecover(Guid indexToken, Guid hybridLogToken)


// Read appropriate hybrid log pages into memory
RestoreHybridLog(recoveredHLCInfo.info.finalLogicalAddress);
RestoreHybridLog(recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.headAddress);

// Recover session information
_recoveredSessions = new SafeConcurrentDictionary<Guid, long>();
Expand All @@ -195,53 +195,50 @@ private void InternalRecover(Guid indexToken, Guid hybridLogToken)
}
}

private void RestoreHybridLog(long untilAddress)
private void RestoreHybridLog(long untilAddress, long headAddress)
{

var tailPage = hlog.GetPage(untilAddress);
var headPage = default(long);
if (untilAddress > hlog.GetStartLogicalAddress(tailPage))
// Special case: we do not load any records into memory
if (headAddress == untilAddress)
{
headPage = (tailPage + 1) - hlog.GetHeadOffsetLagInPages(); ;
hlog.AllocatePage(hlog.GetPageIndexForAddress(headAddress));
}
else
{
headPage = tailPage - hlog.GetHeadOffsetLagInPages();
}
headPage = headPage > 0 ? headPage : 0;
var tailPage = hlog.GetPage(untilAddress);
var headPage = hlog.GetPage(headAddress);

var recoveryStatus = new RecoveryStatus(hlog.GetCapacityNumPages(), headPage, tailPage, untilAddress);
for (int i = 0; i < recoveryStatus.capacity; i++)
{
recoveryStatus.readStatus[i] = ReadStatus.Done;
}
var recoveryStatus = new RecoveryStatus(hlog.GetCapacityNumPages(), headPage, tailPage, untilAddress);
for (int i = 0; i < recoveryStatus.capacity; i++)
{
recoveryStatus.readStatus[i] = ReadStatus.Done;
}

var numPages = 0;
for (var page = headPage; page <= tailPage; page++)
{
var pageIndex = hlog.GetPageIndexForPage(page);
recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
numPages++;
}
var numPages = 0;
for (var page = headPage; page <= tailPage; page++)
{
var pageIndex = hlog.GetPageIndexForPage(page);
recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
numPages++;
}

hlog.AsyncReadPagesFromDevice(headPage, numPages, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus);
hlog.AsyncReadPagesFromDevice(headPage, numPages, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus);

var done = false;
while (!done)
{
done = true;
for (long page = headPage; page <= tailPage; page++)
var done = false;
while (!done)
{
int pageIndex = hlog.GetPageIndexForPage(page);
if (recoveryStatus.readStatus[pageIndex] == ReadStatus.Pending)
done = true;
for (long page = headPage; page <= tailPage; page++)
{
done = false;
break;
int pageIndex = hlog.GetPageIndexForPage(page);
if (recoveryStatus.readStatus[pageIndex] == ReadStatus.Pending)
{
done = false;
break;
}
}
}
}

var headAddress = hlog.GetFirstValidLogicalAddress(headPage);
hlog.RecoveryReset(untilAddress, headAddress);
}

Expand Down
Loading

0 comments on commit 1205739

Please sign in to comment.