diff --git a/cs/samples/MemOnlyCache/Program.cs b/cs/samples/MemOnlyCache/Program.cs
index c75c68a8a..8a93c1c35 100644
--- a/cs/samples/MemOnlyCache/Program.cs
+++ b/cs/samples/MemOnlyCache/Program.cs
@@ -16,32 +16,50 @@ class Program
///
/// Total database size
///
- const int DbSize = 10_000_000;
+ static int DbSize = 10_000_000;
+ const string DbSizeArg = "--dbsize";
///
/// Max key size; we choose actual size randomly
///
- const int MaxKeySize = 100;
+ static int MaxKeySize = 100;
+ const string MaxKeySizeArg = "--keysize";
///
/// Max value size; we choose actual size randomly
///
- const int MaxValueSize = 1000;
+ static int MaxValueSize = 1000;
+ const string MaxValueSizeArg = "--valuesize";
///
/// Number of threads accessing FASTER instances
///
- const int kNumThreads = 1;
+ static int NumThreads = 1;
+ const string NumThreadsArg = "-t";
///
/// Percentage of writes in incoming workload requests (remaining are reads)
///
- const int WritePercent = 0;
+ static int WritePercent = 0;
+ const string WritePercentArg = "-w";
///
/// Uniform random distribution (true) or Zipf distribution (false) of requests
///
- const bool UseUniform = false;
+ static bool UseUniform = false;
+ const string UseUniformArg = "-u";
+
+ ///
+ /// Whether to copy reads from the immutable region of the log to the tail (true) or not (false)
+ ///
+ static bool UseReadCTT = true;
+ const string NoReadCTTArg = "--noreadctt";
+
+ ///
+ /// Whether to use the read cache (true) or not (false)
+ ///
+ static bool UseReadCache = false;
+ const string UseReadCacheArg = "--readcache";
///
/// Skew factor (theta) of Zipf distribution
@@ -62,9 +80,102 @@ class Program
static long statusNotFound = 0;
static long statusFound = 0;
- static void Main()
+ const string HelpArg1 = "-?";
+ const string HelpArg2 = "/?";
+ const string HelpArg3 = "--help";
+ static bool IsHelpArg(string arg) => arg == HelpArg1 || arg == HelpArg2 || arg == HelpArg3;
+
+ private static bool Usage()
+ {
+ Console.WriteLine("Reads 'linked lists' of records for each key by backing up the previous-address chain, including showing record versions");
+ Console.WriteLine("Usage:");
+ Console.WriteLine($" {DbSizeArg}: Total database size. Default = {DbSize}");
+ Console.WriteLine($" {MaxKeySizeArg}: Max key size; we choose actual size randomly. Default = {MaxKeySize}");
+ Console.WriteLine($" {MaxValueSizeArg}: Max value size; we choose actual size randomly. Default = {MaxValueSize}");
+ Console.WriteLine($" {NumThreadsArg}: Number of threads accessing FASTER instances. Default = {NumThreads}");
+ Console.WriteLine($" {WritePercentArg}: Percentage of writes in incoming workload requests (remaining are reads). Default = {WritePercent}");
+ Console.WriteLine($" {UseUniformArg}: Uniform random distribution (true) or Zipf distribution (false) of requests. Default = {UseUniform}");
+ Console.WriteLine($" {NoReadCTTArg}: Copy Reads from Immutable region to tail of log. Default = {!UseReadCTT}");
+ Console.WriteLine($" {UseReadCacheArg}: Use the ReadCache. Default = {UseReadCache}");
+ Console.WriteLine($" {HelpArg1}, {HelpArg2}, {HelpArg3}: This screen.");
+ return false;
+ }
+
+ static bool GetArgs(string[] args)
+ {
+ for (var ii = 0; ii < args.Length; ++ii)
+ {
+ var arg = args[ii].ToLower();
+ var val = "n/a";
+ try
+ {
+ if (IsHelpArg(arg))
+ return Usage();
+
+ // Flag args (no value)
+ if (arg == UseUniformArg)
+ {
+ UseUniform = true;
+ continue;
+ }
+
+ if (arg == NoReadCTTArg)
+ {
+ UseReadCTT = false;
+ continue;
+ }
+
+ if (arg == UseReadCacheArg)
+ {
+ UseReadCache = true;
+ continue;
+ }
+
+ // Args taking a value
+ if (ii >= args.Length - 1)
+ {
+ Console.WriteLine($"Error: End of arg list encountered while processing arg {arg}; expected argument");
+ return false;
+ }
+ val = args[++ii];
+ if (arg == DbSizeArg)
+ {
+ DbSize = int.Parse(val);
+ continue;
+ }
+ if (arg == MaxKeySizeArg)
+ {
+ MaxKeySize = int.Parse(val);
+ continue;
+ }
+ if (arg == MaxValueSizeArg)
+ {
+ MaxValueSize = int.Parse(val);
+ continue;
+ }
+ if (arg == NumThreadsArg)
+ {
+ NumThreads = int.Parse(val);
+ continue;
+ }
+ if (arg == WritePercentArg)
+ {
+ WritePercent = int.Parse(val);
+ continue;
+ }
+
+ Console.WriteLine($"Unknown option: {arg}");
+ return Usage();
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"Error: Arg {arg}, value {val} encountered exception: {ex.Message}");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ static void Main(string[] args)
{
// This sample shows the use of FASTER as a concurrent pure in-memory cache
+ if (!GetArgs(args))
+ return;
var log = new NullDevice(); // no storage involved
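Note: with the argument parsing above, a typical invocation of the sample might look like the following (hypothetical values, for illustration only):

```
dotnet run -c Release -- --dbsize 20000000 --keysize 64 -t 4 -w 10 --readcache
```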
@@ -73,11 +184,14 @@ static void Main()
{
LogDevice = log, ObjectLogDevice = log,
MutableFraction = 0.9, // 10% of memory log is "read-only region"
- ReadFlags = ReadFlags.CopyReadsToTail, // reads in read-only region are copied to tail
+ ReadFlags = UseReadCTT ? ReadFlags.CopyReadsToTail : ReadFlags.None, // if enabled, reads in the read-only region are copied to tail
PageSizeBits = 14, // Each page is sized at 2^14 bytes
MemorySizeBits = 25, // (2^25 / 24) = ~1.39M key-value pairs (log uses 24 bytes per KV pair)
};
+ if (UseReadCache)
+ logSettings.ReadCacheSettings = new() { MemorySizeBits = logSettings.MemorySizeBits, PageSizeBits = logSettings.PageSizeBits };
+
// Number of records in memory, assuming class keys and values and x64 platform
// (8-byte key + 8-byte value + 8-byte header = 24 bytes per record)
int numRecords = (int)(Math.Pow(2, logSettings.MemorySizeBits) / 24);
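As a sanity check on the sizing comments above, a minimal self-contained sketch of the record-count arithmetic, assuming the stated 24-byte record layout:

```csharp
using System;

// 2^25 bytes of memory log, 24 bytes per record (8-byte key + 8-byte value + 8-byte header).
const int memorySizeBits = 25;
const int bytesPerRecord = 24;
long numRecords = (1L << memorySizeBits) / bytesPerRecord;
Console.WriteLine(numRecords); // 1398101, i.e. ~1.39M key-value pairs
```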
@@ -118,13 +232,13 @@ private static void PopulateStore(int count)
private static void ContinuousRandomWorkload()
{
- var threads = new Thread[kNumThreads];
- for (int i = 0; i < kNumThreads; i++)
+ var threads = new Thread[NumThreads];
+ for (int i = 0; i < NumThreads; i++)
{
var x = i;
threads[i] = new Thread(() => RandomWorkload(x));
}
- for (int i = 0; i < kNumThreads; i++)
+ for (int i = 0; i < NumThreads; i++)
threads[i].Start();
Stopwatch sw = new();
@@ -207,6 +321,8 @@ private static void RandomWorkload(int threadid)
if (!status.Found)
{
+ if (status.IsFaulted)
+ throw new Exception($"Unexpected status: {status}");
localStatusNotFound++;
if (UpsertOnCacheMiss)
{
@@ -214,14 +330,12 @@ private static void RandomWorkload(int threadid)
session.Upsert(ref key, ref value);
}
}
- else if (status.Found)
+ else
{
localStatusFound++;
if (output.value[0] != (byte)key.key)
throw new Exception("Read error!");
}
- else
- throw new Exception("Error!");
}
i++;
}
diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs
index 863b70cd2..6b575256c 100644
--- a/cs/src/core/Allocator/AllocatorBase.cs
+++ b/cs/src/core/Allocator/AllocatorBase.cs
@@ -21,8 +21,6 @@ internal struct FullPageStatus
[FieldOffset(0)]
public long LastFlushedUntilAddress;
[FieldOffset(8)]
- public long LastClosedUntilAddress;
- [FieldOffset(16)]
public long Dirty;
}
@@ -166,9 +164,15 @@ public abstract partial class AllocatorBase : IDisposable
///
public long BeginAddress;
+ ///
+ /// Address until which we are currently closing. Used to coordinate linear closing of pages.
+ /// Only one thread will be closing pages at a time.
+ ///
+ long OngoingCloseUntilAddress;
+
///
public override string ToString()
- => $"TA {GetTailAddress()}, ROA {ReadOnlyAddress}, SafeROA {SafeReadOnlyAddress}, HA {HeadAddress}, SafeHA {SafeHeadAddress}, CUA {ClosedUntilAddress}, FUA {FlushedUntilAddress}";
+ => $"TA {GetTailAddress()}, ROA {ReadOnlyAddress}, SafeROA {SafeReadOnlyAddress}, HA {HeadAddress}, SafeHA {SafeHeadAddress}, CUA {ClosedUntilAddress}, FUA {FlushedUntilAddress}, BA {BeginAddress}";
#endregion
@@ -933,9 +937,7 @@ public int EmptyPageCount
// Force eviction now if empty page count has increased
if (value >= oldEPC)
{
- bool prot = true;
- if (!epoch.ThisInstanceProtected())
- prot = false;
+ bool prot = epoch.ThisInstanceProtected();
if (!prot) epoch.Resume();
try
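The simplified code above is an instance of the conditional epoch-protection idiom; a minimal sketch, assuming LightEpoch's public Resume/Suspend/ThisInstanceProtected surface:

```csharp
using FASTER.core;

// Enter the epoch only if this thread is not already protected,
// and restore the caller's protection state afterwards.
var epoch = new LightEpoch();
bool prot = epoch.ThisInstanceProtected();
if (!prot) epoch.Resume();
try
{
    // ... work that must run under epoch protection ...
}
finally
{
    if (!prot) epoch.Suspend();
}
```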
@@ -1282,69 +1284,78 @@ private void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress)
}
}
- ///
+ ///
/// Action to be performed for when all threads have
/// agreed that a page range is closed.
///
///
private void OnPagesClosed(long newSafeHeadAddress)
{
+ Debug.Assert(newSafeHeadAddress > 0);
if (Utility.MonotonicUpdate(ref SafeHeadAddress, newSafeHeadAddress, out long oldSafeHeadAddress))
{
- // Also shift begin address if we are using a null storage device
- if (IsNullDevice)
- Utility.MonotonicUpdate(ref BeginAddress, newSafeHeadAddress, out _);
+ // This thread is responsible for [oldSafeHeadAddress -> newSafeHeadAddress]
+ for (; ; Thread.Yield())
+ {
+ long _ongoingCloseUntilAddress = OngoingCloseUntilAddress;
- if (ReadCache)
- EvictCallback(oldSafeHeadAddress, newSafeHeadAddress);
+ // If another thread's ongoing close operation already covers the new safe head address, there is nothing to do here.
+ if (_ongoingCloseUntilAddress >= newSafeHeadAddress)
+ break;
- // If we are going to scan, ensure earlier scans are done (we don't want to overwrite a later record with an earlier one)
- if ((OnLockEvictionObserver is not null) || (OnEvictionObserver is not null))
- {
- while (ClosedUntilAddress < oldSafeHeadAddress)
+ // If the CAS fails, another thread changed OngoingCloseUntilAddress (extended the range or reset it to zero); retry.
+ if (Interlocked.CompareExchange(ref OngoingCloseUntilAddress, newSafeHeadAddress, _ongoingCloseUntilAddress) == _ongoingCloseUntilAddress)
{
- epoch.ProtectAndDrain();
- Thread.Yield();
+ if (_ongoingCloseUntilAddress == 0)
+ {
+ // There was no other thread running the OPCWorker loop, so this thread is responsible for closing [ClosedUntilAddress -> newSafeHeadAddress]
+ OnPagesClosedWorker();
+ }
+ else
+ {
+ // There was another thread running the OPCWorker loop, and its ongoing close operation was successfully extended to include the new safe
+ // head address; we have no further work here.
+ }
+ return;
}
}
+ }
+ }
+
+ private void OnPagesClosedWorker()
+ {
+ for (; ; Thread.Yield())
+ {
+ long closeStartAddress = ClosedUntilAddress;
+ long closeEndAddress = OngoingCloseUntilAddress;
- for (long closePageAddress = oldSafeHeadAddress & ~PageSizeMask; closePageAddress < newSafeHeadAddress; closePageAddress += PageSize)
+ // If we are using a null storage device, we must also shift BeginAddress
+ if (IsNullDevice)
+ Utility.MonotonicUpdate(ref BeginAddress, closeEndAddress, out _);
+
+ if (ReadCache)
+ EvictCallback(closeStartAddress, closeEndAddress);
+
+ for (long closePageAddress = closeStartAddress & ~PageSizeMask; closePageAddress < closeEndAddress; closePageAddress += PageSize)
{
- long start = oldSafeHeadAddress > closePageAddress ? oldSafeHeadAddress : closePageAddress;
- long end = newSafeHeadAddress < closePageAddress + PageSize ? newSafeHeadAddress : closePageAddress + PageSize;
+ long start = closeStartAddress > closePageAddress ? closeStartAddress : closePageAddress;
+ long end = closeEndAddress < closePageAddress + PageSize ? closeEndAddress : closePageAddress + PageSize;
- // If there are no active locking sessions, there should be no locks in the log.
if (OnLockEvictionObserver is not null)
MemoryPageScan(start, end, OnLockEvictionObserver);
-
if (OnEvictionObserver is not null)
MemoryPageScan(start, end, OnEvictionObserver);
- int closePage = (int)(closePageAddress >> LogPageSizeBits);
- int closePageIndex = closePage % BufferSize;
-
// If the end of the closing range is at the end of the page, free the page
if (end == closePageAddress + PageSize)
- {
- // If the start of the closing range is not at the beginning of this page, spin-wait until the adjacent earlier ranges on this page are closed
- if ((start & PageSizeMask) > 0)
- {
- while (ClosedUntilAddress < start)
- {
- epoch.ProtectAndDrain();
- Thread.Yield();
- }
- }
- FreePage(closePage);
- }
+ FreePage((int)(closePageAddress >> LogPageSizeBits));
- Utility.MonotonicUpdate(ref PageStatusIndicator[closePageIndex].LastClosedUntilAddress, end, out _);
- ShiftClosedUntilAddress();
- if (ClosedUntilAddress > FlushedUntilAddress)
- {
- throw new FasterException($"Closed address {ClosedUntilAddress} exceeds flushed address {FlushedUntilAddress}");
- }
+ Utility.MonotonicUpdate(ref ClosedUntilAddress, end, out _);
}
+
+ // Exit the loop if no other thread extended the close range while we were working
+ if (Interlocked.CompareExchange(ref OngoingCloseUntilAddress, 0, closeEndAddress) == closeEndAddress)
+ break;
}
}
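The two methods above form a small cooperative protocol: the first thread to raise OngoingCloseUntilAddress from zero becomes the sole closer, while later threads merely extend its target range and return. A self-contained sketch of that CAS pattern, with hypothetical simplified names (the real worker also scans pages, frees them, and updates ClosedUntilAddress monotonically):

```csharp
using System.Threading;

class CooperativeCloser
{
    long closedUntil;   // analogous to ClosedUntilAddress
    long ongoingUntil;  // analogous to OngoingCloseUntilAddress; 0 means no active worker

    public void Close(long until)
    {
        for (; ; Thread.Yield())
        {
            long ongoing = Volatile.Read(ref ongoingUntil);
            if (ongoing >= until)
                return; // an active worker already covers this range
            if (Interlocked.CompareExchange(ref ongoingUntil, until, ongoing) == ongoing)
            {
                if (ongoing == 0)
                    Worker(); // we raised it from zero, so we became the worker
                return;       // otherwise we extended the active worker's range
            }
        }
    }

    void Worker()
    {
        for (; ; Thread.Yield())
        {
            long end = Volatile.Read(ref ongoingUntil);
            // ... close pages in [closedUntil, end) here ...
            Volatile.Write(ref closedUntil, end);
            // Exit only if no thread extended the range while we were closing.
            if (Interlocked.CompareExchange(ref ongoingUntil, 0, end) == end)
                return;
        }
    }
}
```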
@@ -1474,30 +1485,6 @@ protected void ShiftFlushedUntilAddress()
}
}
- ///
- /// Shift ClosedUntil address
- ///
- protected void ShiftClosedUntilAddress()
- {
- long currentClosedUntilAddress = ClosedUntilAddress;
- long page = GetPage(currentClosedUntilAddress);
-
- bool update = false;
- long pageLastClosedAddress = PageStatusIndicator[page % BufferSize].LastClosedUntilAddress;
- while (pageLastClosedAddress >= currentClosedUntilAddress && currentClosedUntilAddress >= (page << LogPageSizeBits))
- {
- currentClosedUntilAddress = pageLastClosedAddress;
- update = true;
- page++;
- pageLastClosedAddress = PageStatusIndicator[(int)(page % BufferSize)].LastClosedUntilAddress;
- }
-
- if (update)
- {
- Utility.MonotonicUpdate(ref ClosedUntilAddress, currentClosedUntilAddress, out _);
- }
- }
-
///
/// Address for notification of flushed-until
///
diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs
index 4d2754779..66d752caa 100644
--- a/cs/src/core/Epochs/LightEpoch.cs
+++ b/cs/src/core/Epochs/LightEpoch.cs
@@ -219,6 +219,20 @@ public void Resume()
ProtectAndDrain();
}
+ ///
+ /// Increment global current epoch
+ ///
+ ///
+ long BumpCurrentEpoch()
+ {
+ long nextEpoch = Interlocked.Increment(ref CurrentEpoch);
+
+ if (drainCount > 0)
+ Drain(nextEpoch);
+
+ return nextEpoch;
+ }
+
///
/// Increment current epoch and associate trigger action
/// with the prior epoch
@@ -316,20 +330,6 @@ public bool CheckIsComplete(int markerIdx, long version)
return true;
}
- ///
- /// Increment global current epoch
- ///
- ///
- long BumpCurrentEpoch()
- {
- long nextEpoch = Interlocked.Increment(ref CurrentEpoch);
-
- if (drainCount > 0)
- Drain(nextEpoch);
-
- return nextEpoch;
- }
-
///
/// Looks at all threads and return the latest safe epoch
///
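For context on the relocated BumpCurrentEpoch, the bump-with-trigger overload (documented in the first hunk above) is typically used for deferred reclamation; a hypothetical usage sketch, assuming LightEpoch's public API:

```csharp
using System;
using FASTER.core;

// BumpCurrentEpoch(Action) registers an action to run once no thread
// can still be operating in the prior epoch.
var epoch = new LightEpoch();
epoch.Resume(); // enter the epoch on this thread
epoch.BumpCurrentEpoch(() => Console.WriteLine("prior epoch is safe; reclaiming"));
epoch.Suspend(); // leave the epoch
epoch.Resume();  // ProtectAndDrain() on re-entry runs any drainable actions
epoch.Suspend();
```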
diff --git a/cs/src/core/Index/FASTER/Implementation/ContinuePending.cs b/cs/src/core/Index/FASTER/Implementation/ContinuePending.cs
index b0083c12b..d2c2fd72a 100644
--- a/cs/src/core/Index/FASTER/Implementation/ContinuePending.cs
+++ b/cs/src/core/Index/FASTER/Implementation/ContinuePending.cs
@@ -514,7 +514,7 @@ internal OperationStatus InternalTryCopyToTail(Faste
if (!stackCtx.recSrc.HasInMemorySrc)
{
bool found = this.LockTable.TryGet(ref key, stackCtx.hei.hash, out var ltriLT);
- Debug.Assert(found && ltriLT.IsLocked && !ltriLT.Tentative, "TODO remove: Error--non-InMemorySrc expected to find a non-tentative locked locktable entry");
+ Debug.Assert(found && ltriLT.IsLocked && !ltriLT.Tentative, "Error--non-InMemorySrc expected to find a non-tentative locked locktable entry");
transferred = LockTable.IsActive && LockTable.TransferToLogRecord(ref key, stackCtx.hei.hash, ref newRecordInfo);
Debug.Assert(transferred, "ManualLocking Non-InMemory source should find a LockTable entry to transfer locks from");
}
+#if DEBUG
if (this.LockTable.TryGet(ref key, stackCtx.hei.hash, out var ltri))
- Debug.Assert(!ltri.IsLocked || ltri.Tentative, "TODO remove: Error--existing non-tentative lock in LT after CompleteTwoPhaseUpdate transfer");
+ {
+ // If !recSrc.HasInMemorySrc, then we just did a LockTable transfer to an existing tentative log record, and that tentative record should have
+ // prevented anyone from making a non-tentative LockTable entry. If recSrc.HasInMemorySrc, there should never be a LockTable entry.
+ Debug.Assert(!ltri.IsLocked || ltri.Tentative, $"Error--existing non-tentative LT entry in CompleteTwoPhaseUpdate transfer; HasInMemSrc = {stackCtx.recSrc.HasInMemorySrc}");
+ }
+#endif
if (!transferred)
newRecordInfo.InitializeLockExclusive();
}
@@ -257,6 +263,8 @@ private bool CompleteTwoPhaseCopyToTail(F
else
{
SpinWaitUntilRecordIsClosed(ref key, stackCtx.hei.hash, stackCtx.recSrc.LogicalAddress, stackCtx.recSrc.Log);
+ if (!VerifyInMemoryAddresses(ref stackCtx))
+ return false;
success = !LockTable.IsActive || LockTable.CompleteTwoPhaseCopyToTail(ref key, stackCtx.hei.hash, ref newRecordInfo,
allowXLock: fasterSession.IsManualLocking, removeEphemeralLock: stackCtx.recSrc.HasInMemoryLock); // we acquired the lock via HasInMemoryLock
}
diff --git a/cs/src/core/Index/FASTER/Implementation/Helpers.cs b/cs/src/core/Index/FASTER/Implementation/Helpers.cs
index 0a7398c83..31e8e4416 100644
--- a/cs/src/core/Index/FASTER/Implementation/Helpers.cs
+++ b/cs/src/core/Index/FASTER/Implementation/Helpers.cs
@@ -92,7 +92,7 @@ static bool IsRecordValid(RecordInfo recordInfo, out OperationStatus status)
}
Debug.Assert(!recordInfo.Tentative, "Tentative bit should have been removed when record was invalidated");
- status = OperationStatus.RETRY_NOW; // An invalid record in the hash chain means the previous lock owner abandoned its operation and we can RETRY_NOW.
+ status = OperationStatus.RETRY_LATER; // Invalid record in the hash chain; retry after an epoch refresh
return false;
}
@@ -118,8 +118,8 @@ private bool CASRecordIntoChain(ref OperationStackContext stackCtx,
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool VerifyInMemoryAddresses(ref OperationStackContext stackCtx, long skipReadCacheStartAddress = Constants.kInvalidAddress)
{
- // If we have an in-memory source that was evicted, return false and the caller will RETRY.
- if (stackCtx.recSrc.InMemorySourceWasEvicted())
+ // If we have an in-memory source that is pending eviction, return false and the caller will RETRY.
+ if (stackCtx.recSrc.InMemorySourceIsBelowHeadAddress())
return false;
// If we're not using readcache or the splice point is still above readcache.HeadAddress, we're good.
@@ -163,8 +163,8 @@ private bool VerifyInMemoryAddresses(ref OperationStackContext stack
// the readcache but is < readcache.HeadAddress, so wait until it is evicted.
SpinWaitUntilAddressIsClosed(stackCtx.recSrc.LatestLogicalAddress, readcache);
- // If we have an in-memory source that was evicted, return false and the caller will RETRY.
- if (stackCtx.recSrc.InMemorySourceWasEvicted())
+ // If we have an in-memory source that is pending eviction, return false and the caller will RETRY.
+ if (stackCtx.recSrc.InMemorySourceIsBelowHeadAddress())
return false;
}
}
diff --git a/cs/src/core/Index/FASTER/Implementation/InternalDelete.cs b/cs/src/core/Index/FASTER/Implementation/InternalDelete.cs
index 94071ffe1..93002b586 100644
--- a/cs/src/core/Index/FASTER/Implementation/InternalDelete.cs
+++ b/cs/src/core/Index/FASTER/Implementation/InternalDelete.cs
@@ -161,8 +161,7 @@ internal OperationStatus InternalDelete(
else
{
// Either on-disk or no record exists - check for lock before creating new record. First ensure any record lock has transitioned to the LockTable.
- if (stackCtx.recSrc.LogicalAddress >= hlog.BeginAddress)
- SpinWaitUntilRecordIsClosed(ref key, stackCtx.hei.hash, stackCtx.recSrc.LogicalAddress, hlog);
+ SpinWaitUntilRecordIsClosed(ref key, stackCtx.hei.hash, stackCtx.recSrc.LogicalAddress, hlog);
Debug.Assert(!fasterSession.IsManualLocking || LockTable.IsLockedExclusive(ref key, stackCtx.hei.hash), "A Lockable-session Delete() of an on-disk or non-existent key requires a LockTable lock");
if (LockTable.IsActive && !fasterSession.DisableEphemeralLocking && !LockTable.TryLockEphemeral(ref key, stackCtx.hei.hash, LockType.Exclusive, out stackCtx.recSrc.HasLockTableLock))
{
diff --git a/cs/src/core/Index/FASTER/Implementation/InternalLock.cs b/cs/src/core/Index/FASTER/Implementation/InternalLock.cs
index a70a87328..5b25ddac9 100644
--- a/cs/src/core/Index/FASTER/Implementation/InternalLock.cs
+++ b/cs/src/core/Index/FASTER/Implementation/InternalLock.cs
@@ -30,8 +30,8 @@ internal OperationStatus InternalLock(ref Key key, LockOperation lockOp, out Rec
// Not in memory. First make sure the record has been transferred to the lock table if we did not find it because it was in the eviction region.
var prevLogHA = hlog.HeadAddress;
var prevReadCacheHA = UseReadCache ? readcache.HeadAddress : 0;
- if (stackCtx.recSrc.LogicalAddress >= stackCtx.recSrc.Log.BeginAddress)
- SpinWaitUntilRecordIsClosed(ref key, stackCtx.hei.hash, stackCtx.recSrc.LogicalAddress, stackCtx.recSrc.Log);
+ Debug.Assert(stackCtx.recSrc.LogicalAddress < stackCtx.recSrc.Log.HeadAddress, "Expected record to be below HeadAddress as we did not find it in-memory");
+ SpinWaitUntilRecordIsClosed(ref key, stackCtx.hei.hash, stackCtx.recSrc.LogicalAddress, stackCtx.recSrc.Log);
// Do LockTable operations
if (lockOp.LockOperationType == LockOperationType.IsLocked)
diff --git a/cs/src/core/Index/FASTER/Implementation/InternalRMW.cs b/cs/src/core/Index/FASTER/Implementation/InternalRMW.cs
index 5b8b446bb..91669b4d8 100644
--- a/cs/src/core/Index/FASTER/Implementation/InternalRMW.cs
+++ b/cs/src/core/Index/FASTER/Implementation/InternalRMW.cs
@@ -220,14 +220,15 @@ internal OperationStatus InternalRMW(
}
else if (stackCtx.recSrc.LogicalAddress >= hlog.BeginAddress)
{
- // Disk Region: Need to issue async io requests. Locking will be check on pending completion.
+ // Disk Region: Need to issue async io requests. Locking will be checked on pending completion.
status = OperationStatus.RECORD_ON_DISK;
latchDestination = LatchDestination.CreatePendingContext;
goto CreatePendingContext;
}
else
{
- // No record exists - check for lock before creating new record.
+ // No record exists - check for lock before creating new record. First ensure any record lock has transitioned to the LockTable.
+ SpinWaitUntilRecordIsClosed(ref key, stackCtx.hei.hash, stackCtx.recSrc.LogicalAddress, hlog);
Debug.Assert(!fasterSession.IsManualLocking || LockTable.IsLockedExclusive(ref key, stackCtx.hei.hash), "A Lockable-session RMW() of an on-disk or non-existent key requires a LockTable lock");
if (LockTable.IsActive && !fasterSession.DisableEphemeralLocking
&& !LockTable.TryLockEphemeral(ref key, stackCtx.hei.hash, LockType.Exclusive, out stackCtx.recSrc.HasLockTableLock))
diff --git a/cs/src/core/Index/FASTER/Implementation/InternalRead.cs b/cs/src/core/Index/FASTER/Implementation/InternalRead.cs
index 156510ef7..c3a8cfb8a 100644
--- a/cs/src/core/Index/FASTER/Implementation/InternalRead.cs
+++ b/cs/src/core/Index/FASTER/Implementation/InternalRead.cs
@@ -122,6 +122,8 @@ internal OperationStatus InternalRead(
#region Normal processing
+ var prevHA = hlog.HeadAddress;
+
// Mutable region (even fuzzy region is included here)
if (stackCtx.recSrc.LogicalAddress >= hlog.SafeReadOnlyAddress)
{
@@ -131,44 +133,46 @@ internal OperationStatus InternalRead(
// Immutable region
else if (stackCtx.recSrc.LogicalAddress >= hlog.HeadAddress)
{
- status = ReadFromImmutableRegion(ref key, ref input, ref output, ref stackCtx, ref pendingContext, fasterSession, sessionCtx);
+ status = ReadFromImmutableRegion(ref key, ref input, ref output, useStartAddress, ref stackCtx, ref pendingContext, fasterSession, sessionCtx);
if (status == OperationStatus.ALLOCATE_FAILED && pendingContext.IsAsync) // May happen due to CopyToTailFromReadOnly
goto CreatePendingContext;
return status;
}
// On-Disk Region
- else if (stackCtx.recSrc.LogicalAddress >= hlog.BeginAddress)
+ else
{
-#if DEBUG
SpinWaitUntilAddressIsClosed(stackCtx.recSrc.LogicalAddress, hlog);
- Debug.Assert(!fasterSession.IsManualLocking || LockTable.IsLocked(ref key, stackCtx.hei.hash), "A Lockable-session Read() of an on-disk key requires a LockTable lock");
-#endif
- // Note: we do not lock here; we wait until reading from disk, then lock in the InternalContinuePendingRead chain.
- if (hlog.IsNullDevice)
- return OperationStatus.NOTFOUND;
- status = OperationStatus.RECORD_ON_DISK;
- if (sessionCtx.phase == Phase.PREPARE)
+ if (stackCtx.recSrc.LogicalAddress >= hlog.BeginAddress)
{
- if (!useStartAddress)
+ Debug.Assert(!fasterSession.IsManualLocking || LockTable.IsLocked(ref key, stackCtx.hei.hash), "A Lockable-session Read() of an on-disk key requires a LockTable lock");
+
+ // Note: we do not lock here; we wait until reading from disk, then lock in the InternalContinuePendingRead chain.
+ if (hlog.IsNullDevice)
+ return OperationStatus.NOTFOUND;
+
+ status = OperationStatus.RECORD_ON_DISK;
+ if (sessionCtx.phase == Phase.PREPARE)
{
- // Failure to latch indicates CPR_SHIFT, but don't hold on to shared latch during IO
- if (HashBucket.TryAcquireSharedLatch(ref stackCtx.hei))
- HashBucket.ReleaseSharedLatch(ref stackCtx.hei);
- else
- return OperationStatus.CPR_SHIFT_DETECTED;
+ if (!useStartAddress)
+ {
+ // Failure to latch indicates CPR_SHIFT, but don't hold on to shared latch during IO
+ if (HashBucket.TryAcquireSharedLatch(ref stackCtx.hei))
+ HashBucket.ReleaseSharedLatch(ref stackCtx.hei);
+ else
+ return OperationStatus.CPR_SHIFT_DETECTED;
+ }
}
- }
-
- goto CreatePendingContext;
- }
- // No record found
- else
- {
- Debug.Assert(!fasterSession.IsManualLocking || LockTable.IsLocked(ref key, stackCtx.hei.hash), "A Lockable-session Read() of a non-existent key requires a LockTable lock");
- return OperationStatus.NOTFOUND;
+ goto CreatePendingContext;
+ }
+ else
+ {
+ // No record found
+ Debug.Assert(!fasterSession.IsManualLocking || LockTable.IsLocked(ref key, stackCtx.hei.hash), "A Lockable-session Read() of a non-existent key requires a LockTable lock");
+ return OperationStatus.NOTFOUND;
+ }
}
#endregion
@@ -289,7 +293,7 @@ private OperationStatus ReadFromMutableRegion(ref Key key, ref Input input, ref Output output,
- ref OperationStackContext stackCtx,
+ bool useStartAddress, ref OperationStackContext stackCtx,
ref PendingContext pendingContext, FasterSession fasterSession,
FasterExecutionContext sessionCtx)
where FasterSession : IFasterSession
@@ -307,7 +311,10 @@ private OperationStatus ReadFromImmutableRegion(
- if (!TryEphemeralSLock(fasterSession, ref stackCtx.recSrc, ref srcRecordInfo, out var status))
+ // If we are starting from a specified address in the immutable region, we may have a Sealed record from a previous RCW.
+ // In that case, do not try to lock; EphemeralSUnlock will see that we do not hold a lock and will not try to update it.
+ OperationStatus status = OperationStatus.SUCCESS;
+ if (!useStartAddress && !TryEphemeralSLock(fasterSession, ref stackCtx.recSrc, ref srcRecordInfo, out status))
return status;
try
diff --git a/cs/src/core/Index/FASTER/Implementation/InternalUpsert.cs b/cs/src/core/Index/FASTER/Implementation/InternalUpsert.cs
--- a/cs/src/core/Index/FASTER/Implementation/InternalUpsert.cs
+++ b/cs/src/core/Index/FASTER/Implementation/InternalUpsert.cs
@@ -323,14 +330,10 @@ internal OperationStatus InternalUpsert(
else
{
// Either on-disk or no record exists - check for lock before creating new record. First ensure any record lock has transitioned to the LockTable.
- if (stackCtx.recSrc.LogicalAddress >= hlog.BeginAddress)
- SpinWaitUntilRecordIsClosed(ref key, stackCtx.hei.hash, stackCtx.recSrc.LogicalAddress, hlog);
+ SpinWaitUntilRecordIsClosed(ref key, stackCtx.hei.hash, stackCtx.recSrc.LogicalAddress, hlog);
Debug.Assert(!fasterSession.IsManualLocking || LockTable.IsLockedExclusive(ref key, stackCtx.hei.hash), "A Lockable-session Upsert() of an on-disk or non-existent key requires a LockTable lock");
if (LockTable.IsActive && !fasterSession.DisableEphemeralLocking
&& !LockTable.TryLockEphemeral(ref key, stackCtx.hei.hash, LockType.Exclusive, out stackCtx.recSrc.HasLockTableLock))
diff --git a/cs/src/core/Index/FASTER/Implementation/ReadCache.cs b/cs/src/core/Index/FASTER/Implementation/ReadCache.cs
index 58ed6ed14..05660ced2 100644
--- a/cs/src/core/Index/FASTER/Implementation/ReadCache.cs
+++ b/cs/src/core/Index/FASTER/Implementation/ReadCache.cs
@@ -335,8 +335,13 @@ internal void ReadCacheEvict(long rcLogicalAddress, long rcToLogicalAddress)
var pa = readcache.GetPhysicalAddress(la);
ref RecordInfo ri = ref readcache.GetInfo(pa);
- // Due to collisions, we can compare the hash code but not the key
- Debug.Assert(comparer.GetHashCode64(ref readcache.GetKey(pa)) == hei.hash, "The keyHash of the hash-chain ReadCache entry does not match the one obtained from the initial readcache address");
+#if DEBUG
+ // Due to collisions, we can compare the hash code *mask* (i.e. the hash bucket index), not the key
+ var mask = state[resizeInfo.version].size_mask;
+ var rc_mask = hei.hash & mask;
+ var pa_mask = comparer.GetHashCode64(ref readcache.GetKey(pa)) & mask;
+ Debug.Assert(rc_mask == pa_mask, "The keyHash mask of the hash-chain ReadCache entry does not match the one obtained from the initial readcache address");
+#endif
// If the record's address is above the eviction range, leave it there and track nextPhysicalAddress.
if (la >= rcToLogicalAddress)
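A small self-contained illustration of the masked-hash comparison above, with made-up values: two distinct 64-bit hash codes can collide in the same bucket, so only the bucket index is comparable, not the full hash.

```csharp
using System;

long sizeMask = (1L << 20) - 1;      // e.g., a table of 2^20 buckets
long hashA = 0x1234_5678_9ABC_DEF0;
long hashB = hashA + (sizeMask + 1); // differs only above the mask bits
Console.WriteLine(hashA == hashB);                           // False: full hashes differ
Console.WriteLine((hashA & sizeMask) == (hashB & sizeMask)); // True: same bucket index
```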
diff --git a/cs/src/core/Index/FASTER/Implementation/RecordSource.cs b/cs/src/core/Index/FASTER/Implementation/RecordSource.cs
index f2b90c240..0e54310cc 100644
--- a/cs/src/core/Index/FASTER/Implementation/RecordSource.cs
+++ b/cs/src/core/Index/FASTER/Implementation/RecordSource.cs
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
+using System.Diagnostics;
using System.Runtime.CompilerServices;
using static FASTER.core.Utility;
@@ -117,7 +118,7 @@ internal void MarkSourceRecordAfterSuccessfulCopyUpdate
- internal bool InMemorySourceWasEvicted() => this.HasInMemorySrc && this.LogicalAddress < this.Log.HeadAddress;
+ internal bool InMemorySourceIsBelowHeadAddress() => this.HasInMemorySrc && this.LogicalAddress < this.Log.HeadAddress;
+
+ [Conditional("DEBUG")]
+ internal void AssertInMemorySourceWasNotEvicted()
+ {
+ if (this.HasInMemorySrc)
+ {
+ // We should have called VerifyInMemoryAddresses when starting this operation to verify we were above HeadAddress.
+ // After that, HeadAddress may be increased by another session, but we should always hold the epoch here and thus
+ // OnPagesClosed (which does the actual eviction) cannot be called.
+
+ // This should not be called on failure/retry, or it will fire spuriously. For example:
+ // - Lock a record that is on the next page to be evicted
+ // - Call BlockAllocate, which evicts that page
+ // - This will then fail the subsequent VerifyInMemoryAddresses call, because the record is now below HeadAddress
+ // In this case, the record has been legitimately evicted.
+ Debug.Assert(this.LogicalAddress >= this.Log.ClosedUntilAddress, "Record should always be in memory at this point, regardless of HeadAddress");
+ }
+ }
public override string ToString()
{
diff --git a/cs/src/core/Utilities/LockUtility.cs b/cs/src/core/Utilities/LockUtility.cs
index aa361312a..b190fb8aa 100644
--- a/cs/src/core/Utilities/LockUtility.cs
+++ b/cs/src/core/Utilities/LockUtility.cs
@@ -31,7 +31,7 @@ internal static bool HandleIntermediate(this ref RecordInfo recordInfo, out Oper
Thread.Yield();
// A record is only Sealed or Invalidated in the hash chain after the new record has been successfully inserted.
- internalStatus = OperationStatus.RETRY_NOW; // New record was inserted, so no epoch refresh is needed here.
+ internalStatus = OperationStatus.RETRY_LATER; // New record was inserted; retry after an epoch refresh
return true;
}
internalStatus = OperationStatus.SUCCESS;