diff --git a/cs/samples/StoreVarLenTypes/AsciiSumSample.cs b/cs/samples/StoreVarLenTypes/AsciiSumSample.cs index 1311c066e..31b3551c0 100644 --- a/cs/samples/StoreVarLenTypes/AsciiSumSample.cs +++ b/cs/samples/StoreVarLenTypes/AsciiSumSample.cs @@ -59,7 +59,7 @@ public static void Run() store.Log.FlushAndEvict(true); // Flush and evict all records to disk var _status = s.RMW(_key, _input); // CopyUpdater to 270 (due to immutable source value on disk) - if (_status.IsPending) + if (!_status.IsPending) { Console.WriteLine("Error!"); return; diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index 4546f4328..863b70cd2 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -166,6 +166,10 @@ public abstract partial class AllocatorBase : IDisposable /// public long BeginAddress; + /// + public override string ToString() + => $"TA {GetTailAddress()}, ROA {ReadOnlyAddress}, SafeROA {SafeReadOnlyAddress}, HA {HeadAddress}, SafeHA {SafeHeadAddress}, CUA {ClosedUntilAddress}, FUA {FlushedUntilAddress}"; + #endregion #region Protected device info @@ -1294,13 +1298,23 @@ private void OnPagesClosed(long newSafeHeadAddress) if (ReadCache) EvictCallback(oldSafeHeadAddress, newSafeHeadAddress); +                // If we are going to scan, ensure earlier scans are done (we don't want to overwrite a later record with an earlier one) +                if ((OnLockEvictionObserver is not null) || (OnEvictionObserver is not null)) + { + while (ClosedUntilAddress < oldSafeHeadAddress) + { + epoch.ProtectAndDrain(); + Thread.Yield(); + } + } + for (long closePageAddress = oldSafeHeadAddress & ~PageSizeMask; closePageAddress < newSafeHeadAddress; closePageAddress += PageSize) { long start = oldSafeHeadAddress > closePageAddress ? oldSafeHeadAddress : closePageAddress; long end = newSafeHeadAddress < closePageAddress + PageSize ? newSafeHeadAddress : closePageAddress + PageSize; // If there are no active locking sessions, there should be no locks in the log. - if (this.NumActiveLockingSessions > 0 && OnLockEvictionObserver is not null) + if (OnLockEvictionObserver is not null) MemoryPageScan(start, end, OnLockEvictionObserver); if (OnEvictionObserver is not null) diff --git a/cs/src/core/Allocator/LockEvictionObserver.cs b/cs/src/core/Allocator/LockEvictionObserver.cs index 8f4b2f943..e50008533 100644 --- a/cs/src/core/Allocator/LockEvictionObserver.cs +++ b/cs/src/core/Allocator/LockEvictionObserver.cs @@ -26,14 +26,10 @@ public void OnNext(IFasterScanIterator iter) { while (iter.GetNext(out RecordInfo info)) { - // If it is not Invalid, we must Seal it so there is no possibility it will be missed while we're in the process - // of transferring it to the Lock Table. Use manualLocking as we want to transfer the locks, not drain them. - if (!info.IsLocked) - continue; - - // Now get it into the lock table, so it is ready as soon as the record is removed. - // We do not have to worry about conflicts with other threads, because lock and unlock stop at HeadAddress. - this.store.LockTable.TransferFromLogRecord(ref iter.GetKey(), info); + // Note: we do not have to worry about conflicts with other threads, because other operations + // (data operations and lock and unlock) stop at HeadAddress. 
+ if (info.IsLocked) + this.store.LockTable.TransferFromLogRecord(ref iter.GetKey(), info); } } diff --git a/cs/src/core/ClientSession/LockableContext.cs b/cs/src/core/ClientSession/LockableContext.cs index 0fb94d4ba..89351cec5 100644 --- a/cs/src/core/ClientSession/LockableContext.cs +++ b/cs/src/core/ClientSession/LockableContext.cs @@ -49,9 +49,8 @@ public unsafe void Lock(ref Key key, LockType lockType) LockOperation lockOp = new(LockOperationType.Lock, lockType); OperationStatus status; - bool oneMiss = false; do - status = clientSession.fht.InternalLock(ref key, lockOp, ref oneMiss, out _); + status = clientSession.fht.InternalLock(ref key, lockOp, out _); while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession)); Debug.Assert(status == OperationStatus.SUCCESS); @@ -80,9 +79,8 @@ public void Unlock(ref Key key, LockType lockType) LockOperation lockOp = new(LockOperationType.Unlock, lockType); OperationStatus status; - bool oneMiss = false; do - status = clientSession.fht.InternalLock(ref key, lockOp, ref oneMiss, out _); + status = clientSession.fht.InternalLock(ref key, lockOp, out _); while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession)); Debug.Assert(status == OperationStatus.SUCCESS); @@ -112,9 +110,8 @@ public void Unlock(ref Key key, LockType lockType) OperationStatus status; RecordInfo lockInfo; - bool oneMiss = false; do - status = clientSession.fht.InternalLock(ref key, lockOp, ref oneMiss, out lockInfo); + status = clientSession.fht.InternalLock(ref key, lockOp, out lockInfo); while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession)); Debug.Assert(status == OperationStatus.SUCCESS); return (lockInfo.IsLockedExclusive, lockInfo.NumLockedShared); @@ -667,13 +664,13 @@ public void CheckpointCompletionCallback(int sessionID, string sessionName, Comm #region Ephemeral locking public bool TryLockEphemeralExclusive(ref RecordInfo recordInfo) { - Debug.Assert(recordInfo.IsLockedExclusive, "Attempting to use a non-XLocked key in a Lockable context (requesting XLock)"); + Debug.Assert(recordInfo.IsLockedExclusive, $"Attempting to use a non-XLocked key in a Lockable context (requesting XLock): XLocked {recordInfo.IsLockedExclusive}, Slocked {recordInfo.NumLockedShared}"); return true; } public bool TryLockEphemeralShared(ref RecordInfo recordInfo) { - Debug.Assert(recordInfo.IsLocked, "Attempting to use a non-Locked (S or X) key in a Lockable context (requesting SLock)"); + Debug.Assert(recordInfo.IsLocked, $"Attempting to use a non-Locked (S or X) key in a Lockable context (requesting SLock): XLocked {recordInfo.IsLockedExclusive}, Slocked {recordInfo.NumLockedShared}"); return true; } diff --git a/cs/src/core/ClientSession/LockableUnsafeContext.cs b/cs/src/core/ClientSession/LockableUnsafeContext.cs index 7d89abc9a..85bb87cdf 100644 --- a/cs/src/core/ClientSession/LockableUnsafeContext.cs +++ b/cs/src/core/ClientSession/LockableUnsafeContext.cs @@ -58,9 +58,8 @@ public unsafe void Lock(ref Key key, LockType lockType) LockOperation lockOp = new(LockOperationType.Lock, lockType); OperationStatus status; - bool oneMiss = false; do - status = clientSession.fht.InternalLock(ref key, lockOp, ref oneMiss, out _); + status = clientSession.fht.InternalLock(ref key, lockOp, out _); while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession)); Debug.Assert(status == OperationStatus.SUCCESS); @@ -82,9 +81,8 
@@ public void Unlock(ref Key key, LockType lockType) LockOperation lockOp = new(LockOperationType.Unlock, lockType); OperationStatus status; - bool oneMiss = false; do - status = clientSession.fht.InternalLock(ref key, lockOp, ref oneMiss, out _); + status = clientSession.fht.InternalLock(ref key, lockOp, out _); while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession)); Debug.Assert(status == OperationStatus.SUCCESS); @@ -107,9 +105,8 @@ public void Unlock(ref Key key, LockType lockType) OperationStatus status; RecordInfo lockInfo; - bool oneMiss = false; do - status = clientSession.fht.InternalLock(ref key, lockOp, ref oneMiss, out lockInfo); + status = clientSession.fht.InternalLock(ref key, lockOp, out lockInfo); while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession)); Debug.Assert(status == OperationStatus.SUCCESS); return (lockInfo.IsLockedExclusive, lockInfo.NumLockedShared); @@ -576,13 +573,13 @@ public void CheckpointCompletionCallback(int sessionID, string sessionName, Comm #region Ephemeral locking public bool TryLockEphemeralExclusive(ref RecordInfo recordInfo) { - Debug.Assert(recordInfo.IsLockedExclusive, "Attempting to use a non-XLocked key in a Lockable context (requesting XLock)"); + Debug.Assert(recordInfo.IsLockedExclusive, $"Attempting to use a non-XLocked key in a Lockable context (requesting XLock): XLocked {recordInfo.IsLockedExclusive}, Slocked {recordInfo.NumLockedShared}"); return true; } public bool TryLockEphemeralShared(ref RecordInfo recordInfo) { - Debug.Assert(recordInfo.IsLocked, "Attempting to use a non-Locked (S or X) key in a Lockable context (requesting SLock)"); + Debug.Assert(recordInfo.IsLocked, $"Attempting to use a non-Locked (S or X) key in a Lockable context (requesting SLock): XLocked {recordInfo.IsLockedExclusive}, Slocked {recordInfo.NumLockedShared}"); return true; } diff --git a/cs/src/core/Index/Common/RecordInfo.cs b/cs/src/core/Index/Common/RecordInfo.cs index b6881c488..0990de1f3 100644 --- a/cs/src/core/Index/Common/RecordInfo.cs +++ b/cs/src/core/Index/Common/RecordInfo.cs @@ -129,9 +129,10 @@ public void UnlockExclusive() /// /// Whether lock was acquired successfully [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool TryLockExclusive() + public bool TryLockExclusive(bool tentative = false) { int spinCount = Constants.kMaxLockSpins; + long tentativeBit = tentative ? kTentativeBitMask : 0; // Acquire exclusive lock (readers may still be present; we'll drain them later) while (true) @@ -141,7 +142,7 @@ public bool TryLockExclusive() return false; if ((expected_word & kExclusiveLockBitMask) == 0) { - if (expected_word == Interlocked.CompareExchange(ref word, expected_word | kExclusiveLockBitMask, expected_word)) + if (expected_word == Interlocked.CompareExchange(ref word, expected_word | kExclusiveLockBitMask | tentativeBit, expected_word)) break; } if (spinCount > 0 && --spinCount <= 0) @@ -154,8 +155,11 @@ public bool TryLockExclusive() { if ((word & kSharedLockMaskInWord) == 0) { - // Someone else may have transferred/invalidated the record while we were draining reads. - return !IsIntermediateOrInvalidWord(this.word); + // Someone else may have transferred/invalidated the record while we were draining reads. *Don't* check for Tentative here; + // we may have set it above, and no record should be set to tentative after it's been inserted into the hash chain. 
+ if ((this.word & (kSealedBitMask | kValidBitMask)) == kValidBitMask) + return true; + break; } Thread.Yield(); } @@ -165,7 +169,7 @@ public bool TryLockExclusive() for (; ; Thread.Yield()) { long expected_word = word; - if (Interlocked.CompareExchange(ref word, expected_word & ~kExclusiveLockBitMask, expected_word) == expected_word) + if (Interlocked.CompareExchange(ref word, expected_word & ~(kExclusiveLockBitMask | tentativeBit), expected_word) == expected_word) break; } return false; @@ -189,7 +193,7 @@ public bool TryUnlockShared() /// /// Whether lock was acquired successfully [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool TryLockShared() + public bool TryLockShared(bool tentative = false) { int spinCount = Constants.kMaxLockSpins; @@ -202,7 +206,9 @@ public bool TryLockShared() if (((expected_word & kExclusiveLockBitMask) == 0) // not exclusively locked && (expected_word & kSharedLockMaskInWord) != kSharedLockMaskInWord) // shared lock is not full { - if (expected_word == Interlocked.CompareExchange(ref word, expected_word + kSharedLockIncrement, expected_word)) + // If there are no shared locks, this one will be tentative if requested. Otherwise, do not force existing locks to be tentative. + long tentativeBit = tentative && ((expected_word & kSharedLockMaskInWord) == 0) ? kTentativeBitMask : 0; + if (expected_word == Interlocked.CompareExchange(ref word, (expected_word + kSharedLockIncrement) | tentativeBit, expected_word)) break; } if (spinCount > 0 && --spinCount <= 0) @@ -214,19 +220,19 @@ public bool TryLockShared() // For new records, which don't need the Interlocked overhead. [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal void InitializeLock(LockType lockType) + internal void InitializeLock(LockType lockType, bool tentative) { if (lockType == LockType.Shared) - this.InitializeLockShared(); + this.InitializeLockShared(tentative); else - this.InitializeLockExclusive(); + this.InitializeLockExclusive(tentative); } [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal void InitializeLockShared() => this.word += kSharedLockIncrement; + internal void InitializeLockShared(bool tentative = false) => this.word += kSharedLockIncrement | (tentative ? kTentativeBitMask : 0); [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal void InitializeLockExclusive() => this.word |= kExclusiveLockBitMask; + internal void InitializeLockExclusive(bool tentative = false) => this.word |= kExclusiveLockBitMask | (tentative ? kTentativeBitMask : 0); /// /// Try to reset the modified bit of the RecordInfo diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index 6222f4562..b465def3e 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -223,7 +223,7 @@ public FasterKV(long size, LogSettings logSettings, sectorSize = (int)logSettings.LogDevice.SectorSize; Initialize(size, sectorSize); - this.LockTable = new LockTable(lockTableSize, this.comparer, keyLen, keyLen is null ? 
null : hlog.bufferPool); + this.LockTable = new LockTable(lockTableSize, this.comparer, keyLen); systemState = SystemState.Make(Phase.REST, 1); @@ -859,8 +859,8 @@ private unsafe long GetEntryCount() for (int bucket_entry = 0; bucket_entry < Constants.kOverflowBucketIndex; ++bucket_entry) if (b.bucket_entries[bucket_entry] >= beginAddress) ++total_entry_count; - if (b.bucket_entries[Constants.kOverflowBucketIndex] == 0) break; - b = *((HashBucket*)overflowBucketsAllocator.GetPhysicalAddress((b.bucket_entries[Constants.kOverflowBucketIndex]))); + if ((b.bucket_entries[Constants.kOverflowBucketIndex] & Constants.kAddressMask) == 0) break; + b = *(HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(b.bucket_entries[Constants.kOverflowBucketIndex] & Constants.kAddressMask); } } return total_entry_count; @@ -894,8 +894,8 @@ private unsafe string DumpDistributionInternal(int version) ++total_record_count; } } - if (b.bucket_entries[Constants.kOverflowBucketIndex] == 0) break; - b = *((HashBucket*)overflowBucketsAllocator.GetPhysicalAddress((b.bucket_entries[Constants.kOverflowBucketIndex]))); + if ((b.bucket_entries[Constants.kOverflowBucketIndex] & Constants.kAddressMask) == 0) break; + b = *(HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(b.bucket_entries[Constants.kOverflowBucketIndex] & Constants.kAddressMask); } if (!histogram.ContainsKey(cnt)) histogram[cnt] = 0; diff --git a/cs/src/core/Index/FASTER/Implementation/ContinuePending.cs b/cs/src/core/Index/FASTER/Implementation/ContinuePending.cs index 0d32d42d1..19c584cca 100644 --- a/cs/src/core/Index/FASTER/Implementation/ContinuePending.cs +++ b/cs/src/core/Index/FASTER/Implementation/ContinuePending.cs @@ -37,22 +37,13 @@ internal OperationStatus InternalContinuePendingRead currentCtx) where FasterSession : IFasterSession { - ref RecordInfo recordInfo = ref hlog.GetInfoFromBytePointer(request.record.GetValidPointer()); + ref RecordInfo srcRecordInfo = ref hlog.GetInfoFromBytePointer(request.record.GetValidPointer()); + Debug.Assert(!srcRecordInfo.IsIntermediate, "Should always retrieve a non-Tentative, non-Sealed record from disk"); if (request.logicalAddress >= hlog.BeginAddress) { SpinWaitUntilClosed(request.logicalAddress); - // This should never be Tentative, so it doesn't matter it's the request record's RecordInfo. - if (recordInfo.IsIntermediate(out var status)) - { - Debug.Assert(!recordInfo.Tentative && recordInfo.Sealed, "Should have non-Tentative Sealed record here"); - if (!HandleImmediateRetryStatus(status, currentCtx, currentCtx, fasterSession, ref pendingContext)) - return status; - } - if (recordInfo.Tombstone) - goto NotFound; - // If NoKey, we do not have the key in the initial call and must use the key from the satisfied request. ref Key key = ref pendingContext.NoKey ? ref hlog.GetContextRecordKey(ref request) : ref pendingContext.key.Get(); OperationStackContext stackCtx = new(comparer.GetHashCode64(ref key)); @@ -64,37 +55,38 @@ internal OperationStatus InternalContinuePendingRead(fasterSession, ref key, ref stackCtx, LockType.Shared, pendingContext.PrevHighestKeyHashAddress); + // During the pending operation, the record may have been added to any of the possible locations. 
+ var status = TryFindAndEphemeralLockRecord(fasterSession, ref key, ref stackCtx, LockType.Shared, pendingContext.PrevHighestKeyHashAddress); if (status != OperationStatus.SUCCESS) { if (HandleImmediateRetryStatus(status, currentCtx, currentCtx, fasterSession, ref pendingContext)) continue; return status; } - if (stackCtx.recSrc.HasReadCacheSrc) + if (stackCtx.recSrc.HasInMemorySrc) srcRecordInfo = ref stackCtx.recSrc.GetSrcRecordInfo(); try { + // Wait until after locking to check this. + if (srcRecordInfo.Tombstone) + goto NotFound; + ReadInfo readInfo = new() { SessionType = fasterSession.SessionType, Version = ctx.version, Address = request.logicalAddress, - RecordInfo = recordInfo + RecordInfo = srcRecordInfo }; ref var value = ref hlog.GetContextRecordValue(ref request); var expired = false; - if (!fasterSession.SingleReader(ref key, ref pendingContext.input.Get(), ref value, ref pendingContext.output, ref recordInfo, ref readInfo)) + if (!fasterSession.SingleReader(ref key, ref pendingContext.input.Get(), ref value, ref pendingContext.output, ref srcRecordInfo, ref readInfo)) { if (readInfo.Action == ReadAction.CancelOperation) { - pendingContext.recordInfo = recordInfo; + pendingContext.recordInfo = srcRecordInfo; return OperationStatus.CANCELED; } if (readInfo.Action != ReadAction.Expire) @@ -120,20 +112,20 @@ internal OperationStatus InternalContinuePendingRead(fasterSession, ref key, ref stackCtx, ref srcRecordInfo); + EphemeralSUnlockAfterPendingIO(fasterSession, ctx, ref pendingContext, ref key, ref stackCtx, ref srcRecordInfo); } } // end while (true) } NotFound: - pendingContext.recordInfo = recordInfo; + pendingContext.recordInfo = srcRecordInfo; return OperationStatus.NOTFOUND; } @@ -179,16 +171,10 @@ internal OperationStatus InternalContinuePendingRMW stackCtx = new(comparer.GetHashCode64(ref key)); + OperationStatus status; while (true) { @@ -199,14 +185,15 @@ internal OperationStatus InternalContinuePendingRMW(fasterSession, ref key, ref stackCtx, LockType.Exclusive, pendingContext.PrevHighestKeyHashAddress); + // During the pending operation, the record may have been added to any of the possible locations. 
+ status = TryFindAndEphemeralLockRecord(fasterSession, ref key, ref stackCtx, LockType.Exclusive, pendingContext.PrevHighestKeyHashAddress); if (status != OperationStatus.SUCCESS) { if (HandleImmediateRetryStatus(status, sessionCtx, sessionCtx, fasterSession, ref pendingContext)) continue; return status; } - if (stackCtx.recSrc.HasReadCacheSrc) + if (stackCtx.recSrc.HasInMemorySrc) srcRecordInfo = ref stackCtx.recSrc.GetSrcRecordInfo(); try @@ -311,7 +298,7 @@ internal OperationStatus InternalCopyToTailForCompaction untilAddress) // Same check ITCTT does status = OperationStatus.NOTFOUND; @@ -341,7 +328,7 @@ internal OperationStatus InternalCopyToTailForCompaction(fasterSession, ref key, ref stackCtx, ref srcRecordInfo); + EphemeralSUnlockAfterPendingIO(fasterSession, currentCtx, ref pendingContext, ref key, ref stackCtx, ref srcRecordInfo); } } } while (HandleImmediateRetryStatus(status, currentCtx, currentCtx, fasterSession, ref pendingContext)); @@ -463,7 +450,7 @@ internal OperationStatus InternalTryCopyToTail(fasterSession, ref stackCtx, lockType); + + if (LockTable.IsActive && !fasterSession.DisableEphemeralLocking && !LockTable.TryLockEphemeral(ref key, stackCtx.hei.hash, lockType, out stackCtx.recSrc.HasLockTableLock)) + return OperationStatus.RETRY_LATER; + return OperationStatus.SUCCESS; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private OperationStatus TryFindAndEphemeralLockRecord( + FasterSession fasterSession, ref Key key, ref OperationStackContext stackCtx, + LockType lockType, long prevHighestKeyHashAddress = Constants.kInvalidAddress) + where FasterSession : IFasterSession + { + var internalStatus = TryFindAndEphemeralLockAuxiliaryRecord(fasterSession, ref key, ref stackCtx, lockType, prevHighestKeyHashAddress); + if (stackCtx.recSrc.HasSrc) + return internalStatus; + + if (!TryFindRecordInMainLog(ref key, ref stackCtx, minOffset: hlog.HeadAddress, waitForTentative: true)) + return OperationStatus.SUCCESS; + return TryLockInMemoryRecord(fasterSession, ref stackCtx, lockType); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static OperationStatus TryLockInMemoryRecord(FasterSession fasterSession, ref OperationStackContext stackCtx, LockType lockType) where FasterSession : IFasterSession + { + ref var recordInfo = ref stackCtx.recSrc.GetSrcRecordInfo(); + var ok = lockType == LockType.Shared + ? fasterSession.TryLockEphemeralShared(ref recordInfo) + : fasterSession.TryLockEphemeralExclusive(ref recordInfo); + if (!ok) + return OperationStatus.RETRY_LATER; + stackCtx.recSrc.HasInMemoryLock = !fasterSession.DisableEphemeralLocking; return OperationStatus.SUCCESS; } @@ -39,7 +57,10 @@ static bool TryEphemeralXLock(FasterSessi { status = OperationStatus.SUCCESS; if (fasterSession.DisableEphemeralLocking) + { + Debug.Assert(!fasterSession.IsManualLocking || recordInfo.IsLockedExclusive, $"Attempting to use a non-XLocked key in a Manual Locking context (requesting XLock): XLocked {recordInfo.IsLockedExclusive}, Slocked {recordInfo.NumLockedShared}"); return true; + } // A failed lockOp means this is an intermediate record, e.g. Tentative or Sealed, or we exhausted the spin count. All these must RETRY_LATER. 
if (!fasterSession.TryLockEphemeralExclusive(ref recordInfo)) @@ -57,7 +78,10 @@ static bool TryEphemeralSLock(FasterSessi { status = OperationStatus.SUCCESS; if (fasterSession.DisableEphemeralLocking) + { + Debug.Assert(!fasterSession.IsManualLocking || recordInfo.IsLocked, $"Attempting to use a non-Locked (S or X) key in a Manual Locking context (requesting SLock): XLocked {recordInfo.IsLockedExclusive}, Slocked {recordInfo.NumLockedShared}"); return true; + } // A failed lockOp means this is an intermediate record, e.g. Tentative or Sealed, or we exhausted the spin count. All these must RETRY_LATER. if (!fasterSession.TryLockEphemeralShared(ref recordInfo)) @@ -70,7 +94,10 @@ static bool TryEphemeralSLock(FasterSessi } [MethodImpl(MethodImplOptions.AggressiveInlining)] - void EphemeralSUnlock(ref Key key, ref OperationStackContext stackCtx, ref RecordInfo recordInfo) + void EphemeralSUnlock(FasterSession fasterSession, FasterExecutionContext currentCtx, + ref PendingContext pendingContext, + ref Key key, ref OperationStackContext stackCtx, ref RecordInfo recordInfo) + where FasterSession : IFasterSession { if (!stackCtx.recSrc.HasInMemoryLock) return; @@ -79,18 +106,16 @@ void EphemeralSUnlock(ref Key key, ref OperationStackContext stackCt // be transferred from the readcache to the main log (or even to the LockTable, if the record was in the (SafeHeadAddress, ClosedUntilAddress) // interval when a Read started). - // If the record dived below HeadAddress, we must wait for it to enter the lock table before unlocking. - if (stackCtx.recSrc.LogicalAddress < stackCtx.recSrc.Log.HeadAddress) + // If the record dived below HeadAddress, we must wait for it to enter the lock table before unlocking; InternalLock does this (and starts + // by searching the in-memory space first, which is good because the record may have been transferred). + // If RecordInfo unlock fails, the locks were transferred to another recordInfo; do InternalLock to chase the key through the full process. + OperationStatus status; + do { - SpinWaitUntilRecordIsClosed(ref key, stackCtx.hei.hash, stackCtx.recSrc.LogicalAddress, stackCtx.recSrc.Log); - LockTable.Unlock(ref key, stackCtx.hei.hash, LockType.Shared); - } - else if (!recordInfo.TryUnlockShared()) - { - // Normal unlock failed, so the locks were transferred to another recordInfo; do a standard unlock to chase the key through the full process. - bool oneMiss = false; - InternalLock(ref key, new(LockOperationType.Unlock, LockType.Shared), ref oneMiss, out _); - } + if (stackCtx.recSrc.LogicalAddress >= stackCtx.recSrc.Log.HeadAddress && recordInfo.TryUnlockShared()) + break; + status = InternalLock(ref key, new(LockOperationType.Unlock, LockType.Shared), out _); + } while (HandleImmediateRetryStatus(status, currentCtx, currentCtx, fasterSession, ref pendingContext)); stackCtx.recSrc.HasInMemoryLock = false; } @@ -105,7 +130,11 @@ private void EphemeralXUnlockAfterUpdate( return; } - // Unlock exclusive locks, if any. + // Unlock exclusive locks, if any. Exclusive locks are different from shared locks, in that Shared locks can be transferred + // (due to CopyToTail or ReadCache) while the lock is held. Exclusive locks pin the lock in place: + // - The owning thread ensures that no epoch refresh is done, so there is no eviction if it is in memory + // - Other threads will attempt (and fail) to lock it in memory or in the locktable, until we release it. 
+ // - This means there can be no transfer *from* the locktable while the XLock is held if (stackCtx.recSrc.HasInMemoryLock) { // This unlocks the source (old) record; the new record may already be operated on by other threads, which is fine. @@ -142,8 +171,9 @@ private void EphemeralXUnlockAndAbandonUpdate(FasterSession fasterSession, ref Key key, - ref OperationStackContext stackCtx, ref RecordInfo srcRecordInfo) + private void EphemeralSUnlockAfterPendingIO(FasterSession fasterSession, + FasterExecutionContext currentCtx, ref PendingContext pendingContext, + ref Key key, ref OperationStackContext stackCtx, ref RecordInfo srcRecordInfo) where FasterSession : IFasterSession { if (fasterSession.DisableEphemeralLocking) @@ -156,7 +186,7 @@ private void EphemeralSUnlockAfterPendingIO(Faste ref RecordInfo srcRecordInfo, ref RecordInfo newRecordInfo, out OperationStatus status) where FasterSession : IFasterSession { - // We don't check for ephemeral xlocking here; we know we had that lock, but we don't need to actually lock the new record because + // We don't check for ephem eral xlocking here; we know we had that lock, but we don't need to actually lock the new record because // we know this is the last step and we are going to unlock it immediately; it is protected until we remove the Tentative bit. if (fasterSession.IsManualLocking) { - // For manual locking, we should already have made sure there is an XLock for this. Preserve it on the new record. - // If there is a LockTable entry, transfer from it (which will remove it from the LockTable); otherwise just set the bit directly. - if (!LockTable.IsActive || !LockTable.TransferToLogRecord(ref key, stackCtx.hei.hash, ref newRecordInfo)) + // For manual locking, we should already have made sure there is an XLock for this, and must preserve it on the new record. + // If we do not have an in-memory source there should be a LockTable entry; transfer from it (which will remove it from the LockTable). + // Otherwise (we do have an in-memory source) just set the bit directly; we'll mark and clear the IM source below. + bool transferred = false; + if (!stackCtx.recSrc.HasInMemorySrc) + { + bool found = this.LockTable.TryGet(ref key, stackCtx.hei.hash, out var ltriLT); + Debug.Assert(found && ltriLT.IsLocked && !ltriLT.Tentative, "TODO remove: Error--non-InMemorySrc expected to find a non-tentative locked locktable entry"); + + transferred = LockTable.IsActive && LockTable.TransferToLogRecord(ref key, stackCtx.hei.hash, ref newRecordInfo); + Debug.Assert(transferred, "ManualLocking Non-InMemory source should find a LockTable entry to transfer locks from"); + } + if (this.LockTable.TryGet(ref key, stackCtx.hei.hash, out var ltri)) + Debug.Assert(!ltri.IsLocked || ltri.Tentative, "TODO remove: Error--existing non-tentative lock in LT after CompleteTwoPhaseUpdate transfer"); + if (!transferred) newRecordInfo.InitializeLockExclusive(); } else if ((LockTable.IsActive && !LockTable.CompleteTwoPhaseUpdate(ref key, stackCtx.hei.hash)) @@ -224,13 +266,13 @@ private bool CompleteTwoPhaseCopyToTail(F { if (fasterSession.IsManualLocking) { - // For manual locking, we should already have made sure there is at least an SLock for this; with no HasInMemorySrc, it is in the Lock Table. + // For manual locking, we should already have made sure there is at least an SLock for this; since there is no HasInMemorySrc, it is in the Lock Table. 
if (LockTable.IsActive) LockTable.TransferToLogRecord(ref key, stackCtx.hei.hash, ref newRecordInfo); } else { - // XLocks are not allowed, because another thread owns them. + // XLocks are not allowed here in the ephemeral section, because another thread owns them (ephemeral locking only takes a read lock for operations that end up here). success = (!LockTable.IsActive || LockTable.CompleteTwoPhaseCopyToTail(ref key, stackCtx.hei.hash, ref newRecordInfo, allowXLock: fasterSession.IsManualLocking, removeEphemeralLock: stackCtx.recSrc.HasLockTableLock)) && diff --git a/cs/src/core/Index/FASTER/Implementation/EpochOperations.cs b/cs/src/core/Index/FASTER/Implementation/EpochOperations.cs index c01a506e1..d30873157 100644 --- a/cs/src/core/Index/FASTER/Implementation/EpochOperations.cs +++ b/cs/src/core/Index/FASTER/Implementation/EpochOperations.cs @@ -81,10 +81,7 @@ void SpinWaitUntilRecordIsClosed(ref Key key, long keyHash, long logicalAddress, if (logicalAddress < log.ClosedUntilAddress) break; - // If the Lock Table contains the key, then the record was transferred but ClosedUntilAddress has not yet been updated - // because OnPagesClosed is waiting for the full page to be done. - if (LockTable.ContainsKey(ref key, keyHash)) - break; + // Note: We cannot jump out here if the Lock Table contains the key, because it may be an older version of the record. } } diff --git a/cs/src/core/Index/FASTER/Implementation/FindRecord.cs b/cs/src/core/Index/FASTER/Implementation/FindRecord.cs index 4b0585c81..05fb0b90a 100644 --- a/cs/src/core/Index/FASTER/Implementation/FindRecord.cs +++ b/cs/src/core/Index/FASTER/Implementation/FindRecord.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT license. +using System.Diagnostics; using System.Runtime.CompilerServices; using static FASTER.core.LockUtility; @@ -9,15 +10,22 @@ namespace FASTER.core public unsafe partial class FasterKV : FasterBase, IFasterKV { [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool FindRecordInMemory(ref Key key, ref OperationStackContext stackCtx, long minOffset, bool waitForTentative = true) + private bool TryFindRecordInMemory(ref Key key, ref OperationStackContext stackCtx, long minOffset, bool waitForTentative = true) { if (!UseReadCache || !FindInReadCache(ref key, ref stackCtx, untilAddress: Constants.kInvalidAddress)) { - if (stackCtx.recSrc.LogicalAddress >= hlog.HeadAddress) - { - stackCtx.recSrc.PhysicalAddress = hlog.GetPhysicalAddress(stackCtx.recSrc.LogicalAddress); - TraceBackForKeyMatch(ref key, ref stackCtx.recSrc, minOffset, waitForTentative); - } + TryFindRecordInMainLog(ref key, ref stackCtx, minOffset, waitForTentative); + } + return stackCtx.recSrc.HasInMemorySrc; + } + + private bool TryFindRecordInMainLog(ref Key key, ref OperationStackContext stackCtx, long minOffset, bool waitForTentative) + { + Debug.Assert(!stackCtx.recSrc.HasInMemorySrc, "Should not have found record before this call"); + if (stackCtx.recSrc.LogicalAddress >= hlog.HeadAddress) + { + stackCtx.recSrc.PhysicalAddress = hlog.GetPhysicalAddress(stackCtx.recSrc.LogicalAddress); + TraceBackForKeyMatch(ref key, ref stackCtx.recSrc, minOffset, waitForTentative); } return stackCtx.recSrc.HasInMemorySrc; } diff --git a/cs/src/core/Index/FASTER/Implementation/HashEntryInfo.cs b/cs/src/core/Index/FASTER/Implementation/HashEntryInfo.cs index c773e73c9..6c650374d 100644 --- a/cs/src/core/Index/FASTER/Implementation/HashEntryInfo.cs +++ 
b/cs/src/core/Index/FASTER/Implementation/HashEntryInfo.cs @@ -46,6 +46,11 @@ internal HashEntryInfo(long hash) internal long CurrentAddress => new HashBucketEntry() { word = this.bucket->bucket_entries[this.slot] }.Address; internal long AbsoluteCurrentAddress => Utility.AbsoluteAddress(this.CurrentAddress); + /// + /// Return whether the has been updated + /// + internal bool IsNotCurrent => this.CurrentAddress != this.Address; + /// /// Whether the original address for this hash entry (at the time of FindTag, etc.) is a readcache address. /// diff --git a/cs/src/core/Index/FASTER/Implementation/Helpers.cs b/cs/src/core/Index/FASTER/Implementation/Helpers.cs index b4b18bffc..99f5e7814 100644 --- a/cs/src/core/Index/FASTER/Implementation/Helpers.cs +++ b/cs/src/core/Index/FASTER/Implementation/Helpers.cs @@ -123,7 +123,7 @@ static bool IsRecordValid(RecordInfo recordInfo, out OperationStatus status) internal void SetRecordInvalid(long logicalAddress) { // This is called on exception recovery for a tentative record. - var localLog = UseReadCache && IsReadCache(logicalAddress) ? readcache : hlog; + var localLog = IsReadCache(logicalAddress) ? readcache : hlog; ref var recordInfo = ref localLog.GetInfo(localLog.GetPhysicalAddress(AbsoluteAddress(logicalAddress))); Debug.Assert(recordInfo.Tentative, "Expected tentative record in SetRecordInvalid"); recordInfo.SetInvalid(); diff --git a/cs/src/core/Index/FASTER/Implementation/InternalDelete.cs b/cs/src/core/Index/FASTER/Implementation/InternalDelete.cs index 1b389bdf2..4d2092812 100644 --- a/cs/src/core/Index/FASTER/Implementation/InternalDelete.cs +++ b/cs/src/core/Index/FASTER/Implementation/InternalDelete.cs @@ -70,7 +70,7 @@ internal OperationStatus InternalDelete( stackCtx.SetRecordSourceToHashEntry(hlog); // We must always scan to HeadAddress; a Lockable*Context could be activated and lock the record in the immutable region while we're scanning. - FindRecordInMemory(ref key, ref stackCtx, hlog.HeadAddress); + TryFindRecordInMemory(ref key, ref stackCtx, hlog.HeadAddress); DeleteInfo deleteInfo = new() { @@ -351,7 +351,7 @@ private OperationStatus CreateNewRecordDelete : FasterBase, IFasterKV /// key of the record. /// Lock operation being done. - /// Indicates whether we had a missing record once before. We go around to the top to retry once if an expected LockTable record does not exist; - /// this handles the race where we try to unlock as lock records are transferred out of the lock table, but the caller got in before we inserted the Tentative record. 
/// Receives the recordInfo of the record being locked [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal OperationStatus InternalLock(ref Key key, LockOperation lockOp, ref bool oneMiss, out RecordInfo lockInfo) + internal OperationStatus InternalLock(ref Key key, LockOperation lockOp, out RecordInfo lockInfo) { - Debug.Assert(epoch.ThisInstanceProtected(), "InternalLock must have epoch protected"); + Debug.Assert(epoch.ThisInstanceProtected(), "InternalLock must have protected epoch"); - OperationStackContext stackCtx = new (comparer.GetHashCode64(ref key)); + OperationStackContext stackCtx = new(comparer.GetHashCode64(ref key)); FindTag(ref stackCtx.hei); stackCtx.SetRecordSourceToHashEntry(hlog); - lockInfo = default; - if (FindRecordInMemory(ref key, ref stackCtx, minOffset: hlog.HeadAddress)) - { - ref RecordInfo recordInfo = ref stackCtx.recSrc.GetSrcRecordInfo(); - if (!recordInfo.IsIntermediate(out OperationStatus status)) - { - if (lockOp.LockOperationType == LockOperationType.IsLocked) - status = OperationStatus.SUCCESS; - else if (!recordInfo.TryLockOperation(lockOp)) - return OperationStatus.RETRY_LATER; - // TODO: Consider eliding the record (as in InternalRMW) from the hash table if we are X-unlocking a Tombstoned record. - } - if (lockOp.LockOperationType == LockOperationType.IsLocked) - lockInfo = recordInfo; - return status; - } + // If the record is in memory, then there can't be a LockTable lock. + if (TryFindAndLockRecordInMemory(ref key, lockOp, out lockInfo, ref stackCtx, out OperationStatus lockStatus)) + return lockStatus; - // Not in memory. Do LockTable operations + // Not in memory. First make sure the record has been transferred to the lock table if we did not find it because it was in the eviction region. + var prevLogHA = hlog.HeadAddress; + var prevReadCacheHA = UseReadCache ? readcache.HeadAddress : 0; + if (stackCtx.recSrc.LogicalAddress >= stackCtx.recSrc.Log.BeginAddress) + SpinWaitUntilRecordIsClosed(ref key, stackCtx.hei.hash, stackCtx.recSrc.LogicalAddress, stackCtx.recSrc.Log); + + // Do LockTable operations if (lockOp.LockOperationType == LockOperationType.IsLocked) return (!this.LockTable.IsActive || this.LockTable.TryGet(ref key, stackCtx.hei.hash, out lockInfo)) ? OperationStatus.SUCCESS : OperationStatus.RETRY_LATER; @@ -50,16 +41,43 @@ internal OperationStatus InternalLock(ref Key key, LockOperation lockOp, ref boo { if (this.LockTable.Unlock(ref key, stackCtx.hei.hash, lockOp.LockType)) return OperationStatus.SUCCESS; - if (oneMiss) + + // We may need to recheck in-memory, due to a race where, when T1 started this InternalLock call, the key was not in the hash table + // (it was a nonexistent key) but was in the LockTable: + // T1 did TryFindAndUnlockRecordInMemory above, and did not find the key in the hash table + // T2 did an Upsert of the key, which inserted a tentative entry into the log, then transferred the lock from the LockTable to that log record + // Or, T2 completed a pending Read and did CopyToTail or CopyToReadCache + // T1 would fail LockTable.Unlock and leave a locked record in the log + // If the address in the HashEntryInfo has changed, or if hei has a readcache address and either we can't navigate from the lowest readcache + // address (due to it being below HeadAddress) or its previous address does not point to the same address as when we started (which means a + // new log entry was spliced in, then we retry in-memory. 
+ if (stackCtx.hei.IsNotCurrent || + (stackCtx.hei.IsReadCache + && (stackCtx.recSrc.LowestReadCachePhysicalAddress < readcache.HeadAddress + || readcache.GetInfo(stackCtx.recSrc.LowestReadCachePhysicalAddress).PreviousAddress != stackCtx.recSrc.LatestLogicalAddress))) { - Debug.Fail("Trying to unlock a nonexistent key"); - return OperationStatus.SUCCESS; // SUCCEED so we don't continue the loop; TODO change to OperationStatus.NOTFOUND and return false from Lock API + stackCtx.hei.SetToCurrent(); + stackCtx.SetRecordSourceToHashEntry(hlog); + if (TryFindAndLockRecordInMemory(ref key, lockOp, out lockInfo, ref stackCtx, out lockStatus)) + return lockStatus; + + // If the HeadAddresses have changed, then the key may have dropped below it and was/will be evicted back to the LockTable. + if (hlog.HeadAddress != prevLogHA || (UseReadCache && readcache.HeadAddress != prevReadCacheHA)) + return OperationStatus.RETRY_LATER; } - oneMiss = true; - return OperationStatus.RETRY_NOW; // oneMiss does not need an epoch refresh as there should be a (possibly tentative) record inserted at tail + + Debug.Fail("Trying to unlock a nonexistent key"); + return OperationStatus.SUCCESS; // SUCCEED so we don't continue the loop; TODO change to OperationStatus.NOTFOUND and return false from Lock API } - // Try to lock + // Try to lock. One of the following things can happen here: + // - We find a record in the LockTable and: + // - It is tentative; we fail the lock and return RETRY_LATER + // - It is not tentative; we either: + // - Succeed with the lock (probably an additional S lock) and return SUCCESS + // - Fail the lock and return RETRY_LATER + // - The LockTable failed to insert a record + // - We did not find a record so we added one, so proceed with two-phase insert protocol below. if (!this.LockTable.TryLockManual(ref key, stackCtx.hei.hash, lockOp.LockType, out bool tentativeLock)) return OperationStatus.RETRY_LATER; @@ -71,19 +89,26 @@ internal OperationStatus InternalLock(ref Key key, LockOperation lockOp, ref boo // First look in the readcache, then in memory. If there's any record there, Tentative or not, we back off this lock and retry. // The way two-phase insertion to the log (or readcache) works, the inserters will see our LockTable record and wait for it to become - // non-tentative, then see if the lock was permanent. If so, we won the race here, and it must be assumed our caller proceeded under + // non-tentative, which means the lock is permanent. If so, we won the race here, and it must be assumed our caller proceeded under // the assumption they had the lock. (Otherwise, we remove the lock table entry here, and the other thread proceeds). That means we // can't wait for tentative records here; that would deadlock (we wait for them to become non-tentative and they wait for us to become // non-tentative). So we must bring the records back here even if they are tentative, then bail on them. + // Note: We don't use TryFindRecordInMemory here because we only want to scan the tail portion of the hash chain; we've already searched + // below that, with the TryFindAndLockRecordInMemory call above. var found = false; - if (UseReadCache && stackCtx2.hei.IsReadCache && (!stackCtx.hei.IsReadCache || stackCtx2.hei.Address > stackCtx.hei.Address)) + if (stackCtx2.hei.IsReadCache && (!stackCtx.hei.IsReadCache || stackCtx2.hei.Address > stackCtx.hei.Address)) { + // stackCtx2 has readcache records. 
If stackCtx.hei is a readcache record, then we just have to search down to that record; + // otherwise we search the entire readcache. We only need to find the latest logical address if stackCtx.hei is *not* a readcache record. var untilAddress = stackCtx.hei.IsReadCache ? stackCtx.hei.Address : Constants.kInvalidAddress; found = FindInReadCache(ref key, ref stackCtx2, untilAddress, alwaysFindLatestLA: !stackCtx.hei.IsReadCache, waitForTentative: false); } if (!found) { + // Search the main log. Since we did not find the key in the readcache, we have either: + // - stackCtx.hei is not a readcache record: we have the most current LowestReadCache info in stackCtx2 (which may be none, if there are no readcache records) + // - stackCtx.hei is a readcache record: stackCtx2 stopped searching before that, so stackCtx1 has the most recent readcache info var lowestRcPhysicalAddress = stackCtx.hei.IsReadCache ? stackCtx.recSrc.LowestReadCachePhysicalAddress : stackCtx2.recSrc.LowestReadCachePhysicalAddress; var latestlogicalAddress = lowestRcPhysicalAddress != 0 ? readcache.GetInfo(lowestRcPhysicalAddress).PreviousAddress : stackCtx2.hei.Address; if (latestlogicalAddress > stackCtx.recSrc.LatestLogicalAddress) @@ -101,15 +126,36 @@ internal OperationStatus InternalLock(ref Key key, LockOperation lockOp, ref boo } // Success - if (tentativeLock) - { - if (this.LockTable.ClearTentativeBit(ref key, stackCtx.hei.hash)) - return OperationStatus.SUCCESS; + if (tentativeLock && !this.LockTable.ClearTentativeBit(ref key, stackCtx.hei.hash)) + return OperationStatus.RETRY_LATER; // The tentative record was not found, so the lock has not been done; retry + return OperationStatus.SUCCESS; + } - Debug.Fail("Should have found our tentative record"); - return OperationStatus.RETRY_NOW; // The tentative record was not there, so someone else removed it; retry does not need epoch refresh + /// Locks the record if it can find it in memory. + /// True if the key was found in memory, else false. 'lockStatus' returns the lock status, if found, else should be ignored. + private bool TryFindAndLockRecordInMemory(ref Key key, LockOperation lockOp, out RecordInfo lockInfo, ref OperationStackContext stackCtx, out OperationStatus lockStatus) + { + lockInfo = default; + if (TryFindRecordInMemory(ref key, ref stackCtx, minOffset: hlog.HeadAddress)) + { + ref RecordInfo recordInfo = ref stackCtx.recSrc.GetSrcRecordInfo(); + if (!recordInfo.IsIntermediate(out lockStatus)) + { + if (lockOp.LockOperationType == LockOperationType.IsLocked) + lockStatus = OperationStatus.SUCCESS; + else if (!recordInfo.TryLockOperation(lockOp)) + { + // TODO: Consider eliding the record (as in InternalRMW) from the hash table if we are X-unlocking a Tombstoned record. + lockStatus = OperationStatus.RETRY_LATER; + return true; + } + } + if (lockOp.LockOperationType == LockOperationType.IsLocked) + lockInfo = recordInfo; + return true; } - return OperationStatus.SUCCESS; + lockStatus = OperationStatus.SUCCESS; + return false; } } } diff --git a/cs/src/core/Index/FASTER/Implementation/InternalRMW.cs b/cs/src/core/Index/FASTER/Implementation/InternalRMW.cs index ec0216624..6c695cf0e 100644 --- a/cs/src/core/Index/FASTER/Implementation/InternalRMW.cs +++ b/cs/src/core/Index/FASTER/Implementation/InternalRMW.cs @@ -139,7 +139,7 @@ internal OperationStatus InternalRMW( // InternalContinuePendingRMW can stop comparing keys immediately above this address. 
long prevHighestKeyHashAddress = stackCtx.hei.Address; - FindRecordInMemory(ref key, ref stackCtx, hlog.HeadAddress); + TryFindRecordInMemory(ref key, ref stackCtx, hlog.HeadAddress); RMWInfo rmwInfo = new() { @@ -494,7 +494,7 @@ private OperationStatus CreateNewRecordRMW( var useStartAddress = startAddress != Constants.kInvalidAddress && !pendingContext.HasMinAddress; if (!useStartAddress) { - if (!FindTag(ref stackCtx.hei) || ((!UseReadCache || !stackCtx.hei.IsReadCache) && stackCtx.hei.Address < pendingContext.minAddress)) + if (!FindTag(ref stackCtx.hei) || (!stackCtx.hei.IsReadCache && stackCtx.hei.Address < pendingContext.minAddress)) return OperationStatus.NOTFOUND; prevHighestKeyHashAddress = stackCtx.hei.Address; } @@ -140,8 +140,10 @@ internal OperationStatus InternalRead( // On-Disk Region else if (stackCtx.recSrc.LogicalAddress >= hlog.BeginAddress) { +#if DEBUG + SpinWaitUntilAddressIsClosed(stackCtx.recSrc.LogicalAddress, hlog); Debug.Assert(!fasterSession.IsManualLocking || LockTable.IsLocked(ref key, stackCtx.hei.hash), "A Lockable-session Read() of an on-disk key requires a LockTable lock"); - +#endif // Note: we do not lock here; we wait until reading from disk, then lock in the InternalContinuePendingRead chain. if (hlog.IsNullDevice) return OperationStatus.NOTFOUND; @@ -233,7 +235,7 @@ private bool ReadFromCache(ref Key key, r } finally { - EphemeralSUnlock(ref key, ref stackCtx, ref srcRecordInfo); + EphemeralSUnlock(fasterSession, sessionCtx, ref pendingContext, ref key, ref stackCtx, ref srcRecordInfo); } } return false; @@ -282,7 +284,7 @@ private OperationStatus ReadFromMutableRegion( stackCtx.SetRecordSourceToHashEntry(hlog); // We must always scan to HeadAddress; a Lockable*Context could be activated and lock the record in the immutable region while we're scanning. - FindRecordInMemory(ref key, ref stackCtx, hlog.HeadAddress); + TryFindRecordInMemory(ref key, ref stackCtx, hlog.HeadAddress); UpsertInfo upsertInfo = new() { @@ -343,7 +343,7 @@ private OperationStatus CreateNewRecordUpsert HasInMemorySrc || HasLockTableLock; internal bool HasInMemorySrc => HasMainLogSrc || HasReadCacheSrc; + internal bool HasLock => HasInMemoryLock || HasLockTableLock; /// /// Initialize to the latest logical address from the caller. @@ -134,6 +135,7 @@ internal void MarkSourceRecordAfterSuccessfulCopyUpdate THeapKey CreateHeapKey(ref TKey key); + ref TKey GetHeapKeyRef(THeapKey heapKey); + bool IsActive(ref TValue value); bool Equals(ref TKey key, THeapKey heapKey); @@ -754,7 +756,8 @@ private void AddEntry(ref TKey key, ref TFuncs funcs, ref InMemKVBucket< where TFuncs : IAddEntryFunctions { // At this point we know we will add an entry, so pre-increment the active-bucket count to avoid a race where we add the entry but - // another thread does not see it immediately because this will become the first active bucket. + // another thread does not see it immediately because this will become the first active bucket. The XLock on the bucket during membership + // changes means this will only be done by the correct thread. 
if (!wasActive) IncrementActiveBuckets(); diff --git a/cs/src/core/Utilities/LockTable.cs b/cs/src/core/Utilities/LockTable.cs index 7c86ee94b..741fe92eb 100644 --- a/cs/src/core/Utilities/LockTable.cs +++ b/cs/src/core/Utilities/LockTable.cs @@ -11,25 +11,26 @@ namespace FASTER.core { internal class LockTable : IDisposable { - private const int kLockSpinCount = 10; - #region IInMemKVUserFunctions implementation - internal class LockTableFunctions : IInMemKVUserFunctions, RecordInfo> + internal class LockTableFunctions : IInMemKVUserFunctions, RecordInfo>, IDisposable { private readonly IFasterEqualityComparer keyComparer; private readonly IVariableLengthStruct keyLen; private readonly SectorAlignedBufferPool bufferPool; - internal LockTableFunctions(IFasterEqualityComparer keyComparer, IVariableLengthStruct keyLen, SectorAlignedBufferPool bufferPool) + internal LockTableFunctions(IFasterEqualityComparer keyComparer, IVariableLengthStruct keyLen) { this.keyComparer = keyComparer; this.keyLen = keyLen; - this.bufferPool = bufferPool; + if (keyLen is not null) + this.bufferPool = new SectorAlignedBufferPool(1, 1); } public IHeapContainer CreateHeapKey(ref TKey key) => bufferPool is null ? new StandardHeapContainer(ref key) : new VarLenHeapContainer(ref key, keyLen, bufferPool); + public ref TKey GetHeapKeyRef(IHeapContainer heapKey) => ref heapKey.Get(); + public bool Equals(ref TKey key, IHeapContainer heapKey) => keyComparer.Equals(ref key, ref heapKey.Get()); public long GetHashCode64(ref TKey key) => keyComparer.GetHashCode64(ref key); @@ -42,15 +43,20 @@ public void Dispose(ref IHeapContainer key, ref RecordInfo recordInfo) key = default; recordInfo = default; } + + public void Dispose() + { + this.bufferPool?.Free(); + } } #endregion IInMemKVUserFunctions implementation internal readonly InMemKV, RecordInfo, LockTableFunctions> kv; internal readonly LockTableFunctions functions; - internal LockTable(int numBuckets, IFasterEqualityComparer keyComparer, IVariableLengthStruct keyLen, SectorAlignedBufferPool bufferPool) + internal LockTable(int numBuckets, IFasterEqualityComparer keyComparer, IVariableLengthStruct keyLen) { - this.functions = new(keyComparer, keyLen, bufferPool); + this.functions = new(keyComparer, keyLen); this.kv = new InMemKV, RecordInfo, LockTableFunctions>(numBuckets, numBuckets >> 4, this.functions); } @@ -122,16 +128,18 @@ internal FindOrAddEntryFunctions_ManualLock(LockType lockType, bool isTentative) public void FoundEntry(ref TKey key, ref RecordInfo recordInfo) { - // If the entry is already there, we lock it non-tentatively - success = recordInfo.TryLock(lockType); - isTentative = false; + // If the entry is already there, we lock it, tentatively if specified + success = (lockType == LockType.Shared) + ? 
recordInfo.TryLockShared(isTentative) // This will only be tentative if there are no other locks at the time we finally acquire the lock + : recordInfo.TryLockExclusive(isTentative); + isTentative = recordInfo.Tentative; } public void AddedEntry(ref TKey key, ref RecordInfo recordInfo) { - recordInfo.InitializeLock(lockType); + recordInfo.InitializeLock(lockType, isTentative); + isTentative = recordInfo.Tentative; recordInfo.Valid = true; - recordInfo.Tentative = isTentative; success = true; } } @@ -169,8 +177,11 @@ internal FindEntryFunctions_Unlock(LockType lockType, bool wasTentative = false) public void FoundEntry(ref TKey key, ref RecordInfo recordInfo) { - Debug.Assert(wasTentative == recordInfo.Tentative, "entry.recordInfo.Tentative was not as expected"); - Debug.Assert(!recordInfo.Tentative || recordInfo.IsLockedExclusive, "A Tentative entry should be X locked"); + // If the record is xlocked and tentative, it means RecordInfo.TryLockExclusive is spinning in its "drain the readers" loop. +#if DEBUG + var ri = recordInfo; // Need a local for atomic comparisons; "ref recordInfo" state can change between the "&&" + Debug.Assert(wasTentative == ri.Tentative || lockType == LockType.Shared && ri.Tentative && ri.IsLockedExclusive, "Entry.recordInfo.Tentative was not as expected"); +#endif success = recordInfo.TryUnlock(lockType); } } @@ -202,13 +213,15 @@ public void FoundEntry(ref TKey key, ref RecordInfo recordInfo) } /// - /// Transfer locks from the record into the lock table. + /// Transfer locks from the Log record into the lock table. Called on eviction from main log or readcache. /// /// The key to unlock /// The log record to copy from [MethodImpl(MethodImplOptions.AggressiveInlining)] public void TransferFromLogRecord(ref TKey key, RecordInfo logRecordInfo) { + Debug.Assert(!logRecordInfo.IsIntermediate, "Should not have a transfer from an intermediate log record"); + // This is called from record eviction, which doesn't have a hashcode available, so we have to calculate it here. long hash = functions.GetHashCode64(ref key); var funcs = new AddEntryFunctions_TransferFromLogRecord(logRecordInfo); @@ -229,7 +242,7 @@ public void AddedEntry(ref TKey key, ref RecordInfo recordInfo) } /// - /// Transfer locks from the record into the lock table. + /// Transfer locks from the lock table into the Log record. 
/// /// The key to unlock /// The hash code of the key to lock, to avoid recalculating @@ -238,7 +251,7 @@ public void AddedEntry(ref TKey key, ref RecordInfo recordInfo) [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool TransferToLogRecord(ref TKey key, long hash, ref RecordInfo logRecordInfo) { - Debug.Assert(logRecordInfo.Tentative, "Must retain tentative flag until locks are transferred"); + Debug.Assert(logRecordInfo.Tentative, "Caller must retain tentative flag in log record until locks are transferred"); var funcs = new FindEntryFunctions_TransferToLogRecord(logRecordInfo); kv.FindEntry(ref key, hash, ref funcs); logRecordInfo = funcs.toRecordInfo; @@ -258,9 +271,10 @@ internal FindEntryFunctions_TransferToLogRecord(RecordInfo toRI) public void NotFound(ref TKey key) { } - public void FoundEntry(ref TKey key, ref RecordInfo logRecordInfo) + public void FoundEntry(ref TKey key, ref RecordInfo recordInfo) { - toRecordInfo.TransferLocksFrom(ref logRecordInfo); + Debug.Assert(!recordInfo.Tentative, "Should not transfer from a tentative LT record"); + toRecordInfo.TransferLocksFrom(ref recordInfo); wasFound = true; } } @@ -275,6 +289,7 @@ public bool ClearTentativeBit(ref TKey key, long hash) { var funcs = new FindEntryFunctions_ClearTentative(); kv.FindEntry(ref key, hash, ref funcs); + Debug.Assert(funcs.success, "ClearTentativeBit should always find the entry"); return funcs.success; } @@ -355,7 +370,7 @@ public bool CompleteTwoPhaseCopyToTail(ref TKey key, long hash, ref RecordInfo l internal struct FindEntryFunctions_CompleteTwoPhaseCopyToTail : InMemKV, RecordInfo, LockTableFunctions>.IFindEntryFunctions { - internal RecordInfo toRecordInfo; // TODO: C# 11 will let this be a ref field + internal RecordInfo toRecordInfo; private readonly bool allowXLock, removeEphemeralLock; internal bool success; @@ -369,8 +384,8 @@ internal FindEntryFunctions_CompleteTwoPhaseCopyToTail(RecordInfo toRecordInfo, public void NotFound(ref TKey key) => success = true; - public void FoundEntry(ref TKey key, ref RecordInfo logRecordInfo) - => success = toRecordInfo.TransferReadLocksFromAndMarkSourceAtomic(ref logRecordInfo, allowXLock, seal: false, this.removeEphemeralLock); + public void FoundEntry(ref TKey key, ref RecordInfo recordInfo) + => success = toRecordInfo.TransferReadLocksFromAndMarkSourceAtomic(ref recordInfo, allowXLock, seal: false, this.removeEphemeralLock); } /// @@ -382,7 +397,11 @@ public void FoundEntry(ref TKey key, ref RecordInfo logRecordInfo) [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool ContainsKey(ref TKey key, long hash) => kv.ContainsKey(ref key, hash); - public void Dispose() => kv.Dispose(); + public void Dispose() + { + kv.Dispose(); + functions.Dispose(); + } #region Internal methods for Test internal bool HasEntries(ref TKey key) => kv.HasEntries(ref key); diff --git a/cs/test/LockTableTests.cs b/cs/test/LockTableTests.cs index 7ae36c229..51ac2aa63 100644 --- a/cs/test/LockTableTests.cs +++ b/cs/test/LockTableTests.cs @@ -31,7 +31,7 @@ internal class LockTableTests public void Setup() { DeleteDirectory(MethodTestDir); - lockTable = new(Constants.kDefaultLockTableSize, new SingleBucketComparer(), bufferPool: null, keyLen: null); + lockTable = new(Constants.kDefaultLockTableSize, new SingleBucketComparer(), keyLen: null); } [TearDown] diff --git a/cs/test/MemoryLogCompactionTests.cs b/cs/test/MemoryLogCompactionTests.cs index a0be7f289..991f951fa 100644 --- a/cs/test/MemoryLogCompactionTests.cs +++ b/cs/test/MemoryLogCompactionTests.cs @@ -39,6 
+39,7 @@ public void TearDown()
         [Test]
         [Category("FasterKV")]
         [Category("Compaction")]
+        [Ignore("Fix required in ReadOnlyMemory AsRef")]
         public void MemoryLogCompactionTest1([Values] TestUtils.DeviceType deviceType, [Values] CompactionType compactionType)
         {
diff --git a/docs/_docs/30-fasterkv-record-locking.md b/docs/_docs/30-fasterkv-record-locking.md
index 8f9ae122c..d7777d0f3 100644
--- a/docs/_docs/30-fasterkv-record-locking.md
+++ b/docs/_docs/30-fasterkv-record-locking.md
@@ -29,6 +29,10 @@ Here are some Manual-locking use cases:
 - Lock key1, do a bunch of operations on other keys, then unlock key1. As long as the set of keys for this operation are partitioned by the choice for key1 and all updates on those keys are done only when the lock for key1 is held, this ensures the consistency of those keys' values.
 - Executing transactions with a mix of shared and exclusive operations on any number of keys as an atomic operation.
 
+It is important not to mix operations between the different context types:
+- If you issue a `BeginUnsafe()` from one of the `*UnsafeContext`s and then make a call on a `BasicContext` (or `ClientSession`) or `LockableContext`, the latter will try to acquire the epoch which is held by the `*UnsafeContext`.
+- If you acquire an exclusive lock on a key via one of the `Lockable*Context`s and then make an update call with a non-`Lockable` context on the same key, the latter will try to acquire the exclusive lock.
+
 ### Considerations
 
 All manual locking of keys must lock the keys in a deterministic order, and unlock in the reverse order, to avoid deadlocks.
@@ -232,11 +236,21 @@ When locking for `Read()` operations, we never take an exclusive lock. This requ
 #### Read Locking for InMemory Records
 Read-locks records as soon as they are found, calls the caller's `IFunctions` callback, unlocks, and returns. The sequence is:
 
-If the record is in the readcache, readlock it, call `IFasterSession.SingleReader`, unlock it, and return.
+If the record is in the readcache:
+- readlock it
+- call `IFasterSession.SingleReader`
+- unlock it
 
-If the record is in the mutable region, readlock it, call `IFasterSession.ConcurrentReader`, unlock it, and return.
+If the record is in the mutable region:
+- readlock it
+- call `IFasterSession.ConcurrentReader`
+- unlock it
 
-If the record is in the immutable region, readlock it, call `IFasterSession.SingleReader`, unlock it, and return. The caller may have directed that these records be copied to tail; if so, we have the same issue of lock transfer to the new main-log record as in [Read Locking for OnDisk Records](#read-locking-for-ondisk-records). The unlocking utility function make this transparent to `InternalRead()` (or in this case, `ReadFromImmutableRegion()`).
+If the record is in the immutable region:
+- readlock it
+- call `IFasterSession.SingleReader`
+- The caller may have directed that records read from the immutable region be copied to tail; if so, we have the same issue of lock transfer to the new main-log record as in [Read Locking for OnDisk Records](#read-locking-for-ondisk-records). The unlocking utility function makes this transparent to `InternalRead()` (or in this case, `ReadFromImmutableRegion()`).
+- unlock it
 
 #### Read Locking for OnDisk Records
 This goes through the pending-read processing starting with `InternalContinuePendingRead`.
@@ -270,13 +284,29 @@ After this we know HeadAddress will not change.
 Initialize the new record from t
 Call `CompleteTwoPhaseCopyToTail` (like `InternalTryCopyToTail`, this includes copying to ReadCache despite the name). Since we have a readlock, not an exclusive lock, we must transfer all read locks, either from an in-memory source record or from a LockTable entry.
 
-### Update Locking Conceptual Flow
-TODO
+### Upsert Locking Conceptual Flow
+Because this is a blind update, it operates differently depending on whether, and where, a source record is found.
+
+If the record is in the readcache:
+- exclusive lock it
+- add a new record TODO complete details, e.g. call `IFasterSession.SingleWriter`, ...
+- unlock it
+
+If the record is in the mutable region:
+- exclusive lock it
+- call `IFasterSession.ConcurrentWriter`
+- unlock it
+
+If the record is in the immutable region:
+- exclusive lock it
+- add a new record at the tail of the log TODO complete details, e.g. call `IFasterSession.SingleWriter`, ...
+- any locks on the source record must be preserved on the new record
+- unlock it
 
-#### Update Locking for InMemory Records
+#### RMW Locking for InMemory Records
 TODO
 
-#### Update Locking for OnDisk Records
+#### Delete Locking for OnDisk Records
 TODO
 
 ## Implementation
@@ -498,11 +528,11 @@ When the `ReadCache` is enabled, "records" from the `ReadCache` (actually simply
 
 - When there are no `ReadCache` entries in a hash chain, it looks like: `HashTable` -> m4000 -> m3000 -> m...
 - When there are `ReadCache` entries in a hash chain, it looks like: `HashTable` -> r8000 -> r7000 -> m4000 -> m3000 -> m...
 
-As a terminology note, the sub-chain of r#### records is referred to as the `ReadCache` prefix of that hash chain.
+As a terminology note, the sub-chain of r#### records is referred to as the `ReadCache` prefix chain of that hash chain.
 
-In FASTER v1, updates involving `ReadCache` records strip the entire `ReadCache` prefix from the chain. Additionally, the `ReadCache` prefix is stripped from the hash chain when a `ReadCache` page with that hashcode is evicted due to memory limits. In FASTER v2, because `ReadCache` records may be locked, we must not lose those locks. This is resolved in two ways:
-- On record updates, `ReadCache` prefixes are preserved except for the specific record being updated, which is spliced out and transferred to a `CopyToTail` on the main log, including any locks.
-- When `ReadCache` pages are evicted, their records are removed from the `ReadCache` prefix, and any with locks are transferred to the `LockTable`.
+In FASTER v1, updates involving `ReadCache` records stripped the entire `ReadCache` prefix from the chain. Additionally, the `ReadCache` prefix is stripped from the hash chain when a `ReadCache` page with that hashcode is evicted due to memory limits. In FASTER v2, because `ReadCache` records may be locked, we must not lose those locks. This is resolved in two ways:
+- On record updates, `ReadCache` prefix chains are preserved except for the specific record being updated, which is spliced out and transferred to a `CopyToTail` on the main log, including any locks.
+- When `ReadCache` pages are evicted, their records are removed from the `ReadCache` prefix chain, and any with locks are transferred to the `LockTable`.
 
 ### Record Transfers
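As a companion to the deterministic lock-ordering guidance added in `docs/_docs/30-fasterkv-record-locking.md` above, the following is a minimal illustrative sketch (not part of this diff) of acquiring manual locks in sorted order and releasing them in reverse. It assumes `long` key/value types, that the session exposes a `LockableUnsafeContext` property, and that the context provides `BeginUnsafe`/`EndUnsafe` and `BeginLockable`/`EndLockable` alongside the `Lock(ref key, LockType)`/`Unlock(ref key, LockType)` methods shown in the changes above; adjust names to the actual API surface.

```csharp
using System;
using FASTER.core;

static class LockOrderingSketch
{
    // Acquire exclusive locks on all keys in a deterministic (sorted) order, run the transactional body,
    // then unlock in reverse order -- the deadlock-avoidance pattern described in the locking docs.
    public static void WithExclusiveLocks<TFunctions>(
        ClientSession<long, long, long, long, Empty, TFunctions> session, long[] keys, Action body)
        where TFunctions : IFunctions<long, long, long, long, Empty>
    {
        var ordered = (long[])keys.Clone();
        Array.Sort(ordered);                                // deterministic order across all callers

        var luContext = session.LockableUnsafeContext;      // assumed property name
        luContext.BeginUnsafe();                            // epoch protection (see BeginUnsafe() note above)
        luContext.BeginLockable();                          // assumed method name
        try
        {
            for (var i = 0; i < ordered.Length; ++i)
            {
                var key = ordered[i];
                luContext.Lock(ref key, LockType.Exclusive);
            }

            body();                                         // operate on the locked keys here
        }
        finally
        {
            for (var i = ordered.Length - 1; i >= 0; --i)   // release in reverse order
            {
                var key = ordered[i];
                luContext.Unlock(ref key, LockType.Exclusive);
            }
            luContext.EndLockable();
            luContext.EndUnsafe();
        }
    }
}
```

Sorting gives every transaction the same global acquisition order, so two transactions can never each hold a lock the other is waiting for.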