Skip to content

Commit

Permalink
Fix flush with generic allocator so that we do not write invalid data…
Browse files Browse the repository at this point in the history
… to disk in rare cases (record was invalid when making initial image, but (#916)
  • Loading branch information
badrishc authored May 7, 2024
1 parent e9e56ed commit e25782f
Showing 1 changed file with 24 additions and 8 deletions.
32 changes: 24 additions & 8 deletions cs/src/core/Allocator/GenericAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -452,29 +452,46 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres
}

long endPosition = 0;
for (int i=start/recordSize; i<end/recordSize; i++)
for (int i = start / recordSize; i < end / recordSize; i++)
{
if (!src[i].info.Invalid)
byte* recordPtr = buffer.aligned_pointer + i * recordSize;

// Retrieve reference to record struct
ref var record = ref Unsafe.AsRef<Record<Key, Value>>(recordPtr);
AddressInfo* key_address = null, value_address = null;

// Zero out object reference addresses (AddressInfo) in the planned disk image
if (KeyHasObjects())
{
key_address = GetKeyAddressInfo((long)recordPtr);
*key_address = default;
}
if (ValueHasObjects())
{
value_address = GetValueAddressInfo((long)recordPtr);
*value_address = default;
}

// Now fill in AddressInfo data for the valid records
if (!record.info.Invalid)
{
var address = (flushPage << LogPageSizeBits) + i * recordSize;
if (address < fuzzyStartLogicalAddress || !src[i].info.IsInNewVersion)
if (address < fuzzyStartLogicalAddress || !record.info.IsInNewVersion)
{
if (KeyHasObjects())
{
long pos = ms.Position;
keySerializer.Serialize(ref src[i].key);
var key_address = GetKeyAddressInfo((long)(buffer.aligned_pointer + i * recordSize));
key_address->Address = pos;
key_address->Size = (int)(ms.Position - pos);
addr.Add((long)key_address);
endPosition = pos + key_address->Size;
}

if (ValueHasObjects() && !src[i].info.Tombstone)
if (ValueHasObjects() && !record.info.Tombstone)
{
long pos = ms.Position;
valueSerializer.Serialize(ref src[i].value);
var value_address = GetValueAddressInfo((long)(buffer.aligned_pointer + i * recordSize));
value_address->Address = pos;
value_address->Size = (int)(ms.Position - pos);
addr.Add((long)value_address);
Expand All @@ -484,7 +501,6 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres
else
{
// Mark v+1 records as invalid to avoid deserializing them on recovery
ref var record = ref Unsafe.AsRef<Record<Key, Value>>(buffer.aligned_pointer + i * recordSize);
record.info.SetInvalid();
}
}
Expand Down Expand Up @@ -846,7 +862,7 @@ public void Deserialize(byte *raw, long ptr, long untilptr, Record<Key, Value>[]
stream.Seek(streamStartPos + key_addr->Address - start_addr, SeekOrigin.Begin);
}

keySerializer.Deserialize(out src[ptr/recordSize].key);
keySerializer.Deserialize(out src[ptr / recordSize].key);
}
else
{
Expand Down

0 comments on commit e25782f

Please sign in to comment.