From 12057399cfbff103e93b1ef508871b2f2354b4bc Mon Sep 17 00:00:00 2001
From: Badrish Chandramouli
Date: Tue, 23 Jul 2019 09:55:51 -0700
Subject: [PATCH] Object log recovery fixes (#158)

* Fixes to correctly recover in presence of object log
* Use a SafeConcurrentDictionary to store file handles, to avoid spurious handle creation when many threads read in parallel
* Add an option to enable file buffering for the storage device
* Ensure that during recovery, we only load records from HeadAddress onwards, into main memory
---
 cs/src/core/Allocator/AllocatorBase.cs          |  2 +-
 cs/src/core/Allocator/BlittableAllocator.cs     |  2 +-
 cs/src/core/Allocator/GenericAllocator.cs       | 75 +++++++++++--------
 .../Allocator/VarLenBlittableAllocator.cs       |  2 +-
 cs/src/core/Device/LocalStorageDevice.cs        | 15 +++-
 cs/src/core/Index/Common/Contexts.cs            | 10 +++
 cs/src/core/Index/FASTER/Checkpoint.cs          |  2 +
 cs/src/core/Index/FASTER/Recovery.cs            | 63 ++++++++--------
 cs/src/core/Utilities/BufferPool.cs             |  5 ++
 9 files changed, 104 insertions(+), 72 deletions(-)
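Usage sketch (not part of the diff): the LocalStorageDevice constructor below gains a
disableFileBuffering parameter that defaults to true, so existing callers keep the
previous unbuffered (FILE_FLAG_NO_BUFFERING) behavior, while passing false opts back
into OS file buffering. The constructor signature comes from the change to
LocalStorageDevice.cs in this patch; the wrapper class, file names, and option values
are illustrative only.

    using FASTER.core;

    class DeviceExample
    {
        static void Main()
        {
            // Default arguments: unbuffered I/O, same behavior as before this patch.
            var unbuffered = new LocalStorageDevice("hlog.log");

            // New option: keep OS file buffering enabled for this device.
            var buffered = new LocalStorageDevice("hlog.obj.log",
                                                  preallocateFile: false,
                                                  deleteOnClose: true,
                                                  disableFileBuffering: false);
        }
    }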
diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs
index 558ac6f52..f69fdec03 100644
--- a/cs/src/core/Allocator/AllocatorBase.cs
+++ b/cs/src/core/Allocator/AllocatorBase.cs
@@ -329,7 +329,7 @@ public unsafe abstract class AllocatorBase : IDisposable
         /// Allocate page
         ///
         ///
-        protected abstract void AllocatePage(int index);
+        internal abstract void AllocatePage(int index);
         ///
         /// Whether page is allocated
         ///
diff --git a/cs/src/core/Allocator/BlittableAllocator.cs b/cs/src/core/Allocator/BlittableAllocator.cs
index 85736465b..ef1a9a773 100644
--- a/cs/src/core/Allocator/BlittableAllocator.cs
+++ b/cs/src/core/Allocator/BlittableAllocator.cs
@@ -121,7 +121,7 @@ public override void Dispose()
         /// Allocate memory page, pinned in memory, and in sector aligned form, if possible
         ///
         ///
-        protected override void AllocatePage(int index)
+        internal override void AllocatePage(int index)
         {
             var adjustedSize = PageSize + 2 * sectorSize;
             byte[] tmp = new byte[adjustedSize];
diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs
index 3e8ac26c0..2aec04422 100644
--- a/cs/src/core/Allocator/GenericAllocator.cs
+++ b/cs/src/core/Allocator/GenericAllocator.cs
@@ -197,7 +197,7 @@ internal override void DeleteFromMemory()
         /// Allocate memory page, pinned in memory, and in sector aligned form, if possible
         ///
         ///
-        protected override void AllocatePage(int index)
+        internal override void AllocatePage(int index)
         {
             values[index] = AllocatePage();
             PageStatusIndicator[index].PageFlushCloseStatus.PageFlushStatus = PMMFlushStatus.Flushed;
@@ -312,6 +312,8 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres
                 }
                 fixed (RecordInfo* pin = &src[0].info)
                 {
+                    Debug.Assert(buffer.aligned_pointer + numBytesToWrite <= (byte*)buffer.handle.AddrOfPinnedObject() + buffer.buffer.Length);
+
                     Buffer.MemoryCopy((void*)((long)Unsafe.AsPointer(ref src[0]) + start),
                         buffer.aligned_pointer + start, numBytesToWrite - start, numBytesToWrite - start);
                 }
@@ -320,6 +322,8 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres
             {
                 fixed (RecordInfo* pin = &src[0].info)
                 {
+                    Debug.Assert(buffer.aligned_pointer + numBytesToWrite <= (byte*)buffer.handle.AddrOfPinnedObject() + buffer.buffer.Length);
+
                     Buffer.MemoryCopy((void*)((long)Unsafe.AsPointer(ref src[0]) + aligned_start),
                         buffer.aligned_pointer + aligned_start, numBytesToWrite - aligned_start, numBytesToWrite - aligned_start);
                 }
@@ -513,26 +517, @@ private void AsyncReadPageWithObjectsCallback(uint errorCode, uint num
             PageAsyncReadResult<TContext> result = (PageAsyncReadResult<TContext>)Overlapped.Unpack(overlap).AsyncResult;

-            var src = values[result.page % BufferSize];
+            Record<Key, Value>[] src;

             // We are reading into a frame
             if (result.frame != null)
             {
                 var frame = (GenericFrame<Key, Value>)result.frame;
                 src = frame.GetPage(result.page % frame.frameSize);
-
-                if (result.freeBuffer2 == null && result.freeBuffer1 != null && result.freeBuffer1.required_bytes > 0)
-                {
-                    PopulatePageFrame(result.freeBuffer1.GetValidPointer(), (int)result.maxPtr, src);
-                }
             }
             else
-            {
-                if (result.freeBuffer2 == null && result.freeBuffer1 != null && result.freeBuffer1.required_bytes > 0)
-                {
-                    PopulatePage(result.freeBuffer1.GetValidPointer(), (int)result.maxPtr, result.page);
-                }
-            }
+                src = values[result.page % BufferSize];
+
             // Deserialize all objects until untilptr
             if (result.resumePtr < result.untilPtr)
@@ -560,9 +555,8 @@ private void AsyncReadPageWithObjectsCallback(uint errorCode, uint num
                 // We will be re-issuing I/O, so free current overlap
                 Overlapped.Free(overlap);

-                // Compute new untilPtr
                 // We will now be able to process all records until (but not including) untilPtr
-                GetObjectInfo(result.freeBuffer1.GetValidPointer(), ref result.untilPtr, result.maxPtr, ObjectBlockSize, src, out long startptr, out long size);
+                GetObjectInfo(result.freeBuffer1.GetValidPointer(), ref result.untilPtr, result.maxPtr, ObjectBlockSize, out long startptr, out long size);

                 // Object log fragment should be aligned by construction
                 Debug.Assert(startptr % sectorSize == 0);
@@ -570,9 +564,9 @@ private void AsyncReadPageWithObjectsCallback(uint errorCode, uint num
                 if (size > int.MaxValue)
                     throw new Exception("Unable to read object page, total size greater than 2GB: " + size);

-                var objBuffer = bufferPool.Get((int)size);
-                result.freeBuffer2 = objBuffer;
                 var alignedLength = (size + (sectorSize - 1)) & ~(sectorSize - 1);
+                var objBuffer = bufferPool.Get((int)alignedLength);
+                result.freeBuffer2 = objBuffer;

                 // Request objects from objlog
                 result.objlogDevice.ReadAsync(
@@ -718,7 +712,10 @@ public void Deserialize(byte *raw, long ptr, long untilptr, Record[]

             while (ptr < untilptr)
             {
-                if (!src[ptr / recordSize].info.Invalid)
+                ref Record<Key, Value> record = ref Unsafe.AsRef<Record<Key, Value>>(raw + ptr);
+                src[ptr / recordSize].info = record.info;
+
+                if (!record.info.Invalid)
                 {
                     if (KeyHasObjects())
                     {
@@ -729,21 +726,32 @@ public void Deserialize(byte *raw, long ptr, long untilptr, Record[]
                             stream.Seek(streamStartPos + key_addr->Address - start_addr, SeekOrigin.Begin);
                         }

-                        src[ptr/recordSize].key = new Key();
+                        src[ptr / recordSize].key = new Key();
                         keySerializer.Deserialize(ref src[ptr/recordSize].key);
-                    }
+                    }
+                    else
+                    {
+                        src[ptr / recordSize].key = record.key;
+                    }

-                    if (ValueHasObjects() && !src[ptr / recordSize].info.Tombstone)
+                    if (!record.info.Tombstone)
                     {
-                        var value_addr = GetValueAddressInfo((long)raw + ptr);
-                        if (start_addr == -1) start_addr = value_addr->Address;
-                        if (stream.Position != streamStartPos + value_addr->Address - start_addr)
+                        if (ValueHasObjects())
                         {
-                            stream.Seek(streamStartPos + value_addr->Address - start_addr, SeekOrigin.Begin);
+                            var value_addr = GetValueAddressInfo((long)raw + ptr);
+                            if (start_addr == -1) start_addr = value_addr->Address;
+                            if (stream.Position != streamStartPos + value_addr->Address - start_addr)
+                            {
+                                stream.Seek(streamStartPos + value_addr->Address - start_addr, SeekOrigin.Begin);
+                            }
+
+                            src[ptr / recordSize].value = new Value();
+                            valueSerializer.Deserialize(ref src[ptr / recordSize].value);
+                        }
+                        else
+                        {
+                            src[ptr / recordSize].value = record.value;
                         }
-
-                        src[ptr / recordSize].value = new Value();
-                        valueSerializer.Deserialize(ref src[ptr/recordSize].value);
                     }
                 }
                 ptr += GetRecordSize(ptr);
@@ -765,17 +773,18 @@ public void Deserialize(byte *raw, long ptr, long untilptr, Record[]
         ///
         ///
         ///
-        ///
         ///
         ///
-        public void GetObjectInfo(byte* raw, ref long ptr, long untilptr, int objectBlockSize, Record<Key, Value>[] src, out long startptr, out long size)
+        public void GetObjectInfo(byte* raw, ref long ptr, long untilptr, int objectBlockSize, out long startptr, out long size)
         {
             long minObjAddress = long.MaxValue;
             long maxObjAddress = long.MinValue;

             while (ptr < untilptr)
             {
-                if (!src[ptr/recordSize].info.Invalid)
+                ref Record<Key, Value> record = ref Unsafe.AsRef<Record<Key, Value>>(raw + ptr);
+
+                if (!record.info.Invalid)
                 {
                     if (KeyHasObjects())
                     {
@@ -794,7 +803,7 @@ public void GetObjectInfo(byte* raw, ref long ptr, long untilptr, int objectBloc
                     }

-                    if (ValueHasObjects() && !src[ptr / recordSize].info.Tombstone)
+                    if (ValueHasObjects() && !record.info.Tombstone)
                     {
                         var value_addr = GetValueAddressInfo((long)raw + ptr);
                         var addr = value_addr->Address;
@@ -941,6 +950,8 @@ internal void PopulatePage(byte* src, int required_bytes, ref Record
         {
             fixed (RecordInfo* pin = &destinationPage[0].info)
             {
+                Debug.Assert(required_bytes <= recordSize * destinationPage.Length);
+
                 Buffer.MemoryCopy(src, Unsafe.AsPointer(ref destinationPage[0]), required_bytes, required_bytes);
             }
         }
diff --git a/cs/src/core/Allocator/VarLenBlittableAllocator.cs b/cs/src/core/Allocator/VarLenBlittableAllocator.cs
index 353409c27..3abd21c70 100644
--- a/cs/src/core/Allocator/VarLenBlittableAllocator.cs
+++ b/cs/src/core/Allocator/VarLenBlittableAllocator.cs
@@ -193,7 +193,7 @@ public override void Dispose()
         /// Allocate memory page, pinned in memory, and in sector aligned form, if possible
         ///
         ///
-        protected override void AllocatePage(int index)
+        internal override void AllocatePage(int index)
         {
             var adjustedSize = PageSize + 2 * sectorSize;
             byte[] tmp = new byte[adjustedSize];
diff --git a/cs/src/core/Device/LocalStorageDevice.cs b/cs/src/core/Device/LocalStorageDevice.cs
index dcbde31fc..27517b8d7 100644
--- a/cs/src/core/Device/LocalStorageDevice.cs
+++ b/cs/src/core/Device/LocalStorageDevice.cs
@@ -18,7 +18,8 @@ public class LocalStorageDevice : StorageDeviceBase
     {
         private readonly bool preallocateFile;
        private readonly bool deleteOnClose;
-        private readonly ConcurrentDictionary<int, SafeFileHandle> logHandles;
+        private readonly bool disableFileBuffering;
+        private readonly SafeConcurrentDictionary<int, SafeFileHandle> logHandles;

         ///
         /// Constructor
@@ -26,13 +27,15 @@ public class LocalStorageDevice : StorageDeviceBase
         ///
         ///
         ///
-        public LocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false)
+        ///
+        public LocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, bool disableFileBuffering = true)
             : base(filename, GetSectorSize(filename))
         {
             Native32.EnableProcessPrivileges();
             this.preallocateFile = preallocateFile;
             this.deleteOnClose = deleteOnClose;
-            logHandles = new ConcurrentDictionary<int, SafeFileHandle>();
+            this.disableFileBuffering = disableFileBuffering;
+            logHandles = new SafeConcurrentDictionary<int, SafeFileHandle>();
         }

         ///
@@ -175,7 +178,11 @@ private SafeFileHandle CreateHandle(int segmentId)
             uint fileCreation = unchecked((uint)FileMode.OpenOrCreate);
             uint fileFlags = Native32.FILE_FLAG_OVERLAPPED;

-            fileFlags = fileFlags | Native32.FILE_FLAG_NO_BUFFERING;
+            if (this.disableFileBuffering)
+            {
+                fileFlags = fileFlags | Native32.FILE_FLAG_NO_BUFFERING;
+            }
+
             if (deleteOnClose)
             {
                 fileFlags = fileFlags | Native32.FILE_FLAG_DELETE_ON_CLOSE;
diff --git a/cs/src/core/Index/Common/Contexts.cs b/cs/src/core/Index/Common/Contexts.cs
index 5d69fbdc7..16c289b0b 100644
--- a/cs/src/core/Index/Common/Contexts.cs
+++ b/cs/src/core/Index/Common/Contexts.cs
@@ -273,6 +273,10 @@ public struct HybridLogRecoveryInfo
         ///
         public long finalLogicalAddress;
         ///
+        /// Head address
+        ///
+        public long headAddress;
+        ///
         /// Guid array
         ///
         public Guid[] guids;
@@ -299,6 +303,7 @@ public void Initialize(Guid token, int _version)
             flushedLogicalAddress = 0;
             startLogicalAddress = 0;
             finalLogicalAddress = 0;
+            headAddress = 0;
             guids = new Guid[LightEpoch.kTableSize + 1];
             continueTokens = new Dictionary();
             objectLogSegmentOffsets = null;
@@ -331,6 +336,9 @@ public void Initialize(StreamReader reader)
             value = reader.ReadLine();
             finalLogicalAddress = long.Parse(value);

+            value = reader.ReadLine();
+            headAddress = long.Parse(value);
+
             value = reader.ReadLine();
             numThreads = int.Parse(value);
@@ -421,6 +429,7 @@ public void Write(StreamWriter writer)
             writer.WriteLine(flushedLogicalAddress);
             writer.WriteLine(startLogicalAddress);
             writer.WriteLine(finalLogicalAddress);
+            writer.WriteLine(headAddress);
             writer.WriteLine(numThreads);
             for (int i = 0; i < numThreads; i++)
             {
@@ -449,6 +458,7 @@ public void DebugPrint()
             Debug.WriteLine("Flushed LogicalAddress: {0}", flushedLogicalAddress);
             Debug.WriteLine("Start Logical Address: {0}", startLogicalAddress);
             Debug.WriteLine("Final Logical Address: {0}", finalLogicalAddress);
+            Debug.WriteLine("Head Address: {0}", headAddress);
             Debug.WriteLine("Num sessions recovered: {0}", numThreads);
             Debug.WriteLine("Recovered sessions: ");
             foreach (var sessionInfo in continueTokens)
diff --git a/cs/src/core/Index/FASTER/Checkpoint.cs b/cs/src/core/Index/FASTER/Checkpoint.cs
index bf26ea730..4d27c1ae8 100644
--- a/cs/src/core/Index/FASTER/Checkpoint.cs
+++ b/cs/src/core/Index/FASTER/Checkpoint.cs
@@ -251,6 +251,8 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta
                         WriteIndexCheckpointCompleteFile();
                     }

+                    _hybridLogCheckpoint.info.headAddress = hlog.HeadAddress;
+
                     if (FoldOverSnapshot)
                     {
                         hlog.ShiftReadOnlyToTail(out long tailAddress);
diff --git a/cs/src/core/Index/FASTER/Recovery.cs b/cs/src/core/Index/FASTER/Recovery.cs
index 5ac34edc3..72cf67e53 100644
--- a/cs/src/core/Index/FASTER/Recovery.cs
+++ b/cs/src/core/Index/FASTER/Recovery.cs
@@ -184,7 +184,7 @@ private void InternalRecover(Guid indexToken, Guid hybridLogToken)

             // Read appropriate hybrid log pages into memory
-            RestoreHybridLog(recoveredHLCInfo.info.finalLogicalAddress);
+            RestoreHybridLog(recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.headAddress);

             // Recover session information
             _recoveredSessions = new SafeConcurrentDictionary();
@@ -195,53 +195,50 @@ private void InternalRecover(Guid indexToken, Guid hybridLogToken)
             }
         }

-        private void RestoreHybridLog(long untilAddress)
+        private void RestoreHybridLog(long untilAddress, long headAddress)
         {
-
-            var tailPage = hlog.GetPage(untilAddress);
-            var headPage = default(long);
-            if (untilAddress > hlog.GetStartLogicalAddress(tailPage))
+            // Special case: we do not load any records into memory
+            if (headAddress == untilAddress)
             {
-                headPage = (tailPage + 1) - hlog.GetHeadOffsetLagInPages(); ;
+                hlog.AllocatePage(hlog.GetPageIndexForAddress(headAddress));
             }
             else
             {
-                headPage = tailPage - hlog.GetHeadOffsetLagInPages();
-            }
-            headPage = headPage > 0 ? headPage : 0;
+                var tailPage = hlog.GetPage(untilAddress);
+                var headPage = hlog.GetPage(headAddress);

-            var recoveryStatus = new RecoveryStatus(hlog.GetCapacityNumPages(), headPage, tailPage, untilAddress);
-            for (int i = 0; i < recoveryStatus.capacity; i++)
-            {
-                recoveryStatus.readStatus[i] = ReadStatus.Done;
-            }
+                var recoveryStatus = new RecoveryStatus(hlog.GetCapacityNumPages(), headPage, tailPage, untilAddress);
+                for (int i = 0; i < recoveryStatus.capacity; i++)
+                {
+                    recoveryStatus.readStatus[i] = ReadStatus.Done;
+                }

-            var numPages = 0;
-            for (var page = headPage; page <= tailPage; page++)
-            {
-                var pageIndex = hlog.GetPageIndexForPage(page);
-                recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
-                numPages++;
-            }
+                var numPages = 0;
+                for (var page = headPage; page <= tailPage; page++)
+                {
+                    var pageIndex = hlog.GetPageIndexForPage(page);
+                    recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
+                    numPages++;
+                }

-            hlog.AsyncReadPagesFromDevice(headPage, numPages, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus);
+                hlog.AsyncReadPagesFromDevice(headPage, numPages, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus);

-            var done = false;
-            while (!done)
-            {
-                done = true;
-                for (long page = headPage; page <= tailPage; page++)
+                var done = false;
+                while (!done)
                 {
-                    int pageIndex = hlog.GetPageIndexForPage(page);
-                    if (recoveryStatus.readStatus[pageIndex] == ReadStatus.Pending)
+                    done = true;
+                    for (long page = headPage; page <= tailPage; page++)
                     {
-                        done = false;
-                        break;
+                        int pageIndex = hlog.GetPageIndexForPage(page);
+                        if (recoveryStatus.readStatus[pageIndex] == ReadStatus.Pending)
+                        {
+                            done = false;
+                            break;
+                        }
                     }
                 }
             }

-            var headAddress = hlog.GetFirstValidLogicalAddress(headPage);
             hlog.RecoveryReset(untilAddress, headAddress);
         }
diff --git a/cs/src/core/Utilities/BufferPool.cs b/cs/src/core/Utilities/BufferPool.cs
index 872eb8d4e..cfcb4eb77 100644
--- a/cs/src/core/Utilities/BufferPool.cs
+++ b/cs/src/core/Utilities/BufferPool.cs
@@ -133,6 +133,11 @@ public void Return(SectorAlignedMemory page)
             Array.Clear(page.buffer, 0, page.buffer.Length);
             if (!Disabled)
                 queue[page.level].Enqueue(page);
+            else
+            {
+                page.handle.Free();
+                page.buffer = null;
+            }
         }

         [MethodImpl(MethodImplOptions.AggressiveInlining)]
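Background note (not part of the diff), motivating the SafeConcurrentDictionary change
above: ConcurrentDictionary.GetOrAdd with a value factory may invoke the factory more
than once when several threads race on the same key, so parallel readers hitting the
same log segment could each open a file handle and leak all but one. The sketch below
only illustrates that hazard and one standard way to guarantee a single creation per
key (wrapping the factory in Lazy<T>); it is not the implementation of FASTER's
SafeConcurrentDictionary, whose internals are not shown in this patch, and the
CreateHandle placeholder is hypothetical.

    using System;
    using System.Collections.Concurrent;

    class HandleCacheSketch
    {
        // Plain GetOrAdd may run the value factory several times under contention;
        // every extra CreateHandle call would then leak an OS handle.
        static readonly ConcurrentDictionary<int, Lazy<IntPtr>> handles =
            new ConcurrentDictionary<int, Lazy<IntPtr>>();

        static IntPtr GetOrAddHandle(int segmentId)
        {
            // Lazy<T> (ExecutionAndPublication by default) ensures CreateHandle runs
            // at most once per segment, even if threads race on the same key.
            return handles.GetOrAdd(segmentId,
                id => new Lazy<IntPtr>(() => CreateHandle(id))).Value;
        }

        static IntPtr CreateHandle(int segmentId)
        {
            // Placeholder for the expensive, side-effecting handle creation.
            return new IntPtr(segmentId);
        }
    }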