Skip to content

Commit

Permalink
[C#] Fixes for LockTable and others (#783)
Browse files Browse the repository at this point in the history
* Locking fixes
- CompletePending* must also check to lock non-auxiliary records
- Ephemeral locking must look for lock evictions as well
- InternalLock must check for SpinWaitUntilRecordIsClosed
- Replace oneMiss handling in InternalLock
- EphemeralSUnlock must follow unlock failures
- LockTable should have its own bufferPool
- Ensure OnPagesClosed does eviction scans in address order

* More locking fixes:
- Fix missing unlock of a Tombstoned record in InternalContinuePendingRead
- Add more Asserts where ManualLocking requires a locked record
- Update and remove obsolete comments

* - Remove key-present optimization for SpinWaitUntilRecordIsClosed
- Better checking on following records transferred from LockTable in InternalLock(unlock)
- Add some additional comments

* Fix hash bucket overflow pointer during recovery
Fix debug assert logic

* fix sample

* Fix calls to TraceBack to use returned addresses correctly

* Fix min index size, to allow checkpointing via direct file system call (512 byte sector size).

* - Tighten up tentative LockTable lock acquisition
- Change more RETRY_NOW to RETRY_LATER
- Fix bug in TryLockExclusive; return value after draining readers
- CompleteTwoPhaseAssert fix to not steal LockTable records if recSrc.HasInMemorySrc
- Add comments and asserts

Co-authored-by: Badrish Chandramouli <[email protected]>
  • Loading branch information
TedHartMS and badrishc authored Jan 16, 2023
1 parent 0b877df commit 4ab5597
Show file tree
Hide file tree
Showing 25 changed files with 366 additions and 213 deletions.
2 changes: 1 addition & 1 deletion cs/samples/StoreVarLenTypes/AsciiSumSample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static void Run()
store.Log.FlushAndEvict(true); // Flush and evict all records to disk
var _status = s.RMW(_key, _input); // CopyUpdater to 270 (due to immutable source value on disk)

if (_status.IsPending)
if (!_status.IsPending)
{
Console.WriteLine("Error!");
return;
Expand Down
16 changes: 15 additions & 1 deletion cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ public abstract partial class AllocatorBase<Key, Value> : IDisposable
/// </summary>
public long BeginAddress;

/// <inheritdoc/>
public override string ToString()
=> $"TA {GetTailAddress()}, ROA {ReadOnlyAddress}, SafeROA {SafeReadOnlyAddress}, HA {HeadAddress}, SafeHA {SafeHeadAddress}, CUA {ClosedUntilAddress}, FUA {FlushedUntilAddress}";

#endregion

#region Protected device info
Expand Down Expand Up @@ -1294,13 +1298,23 @@ private void OnPagesClosed(long newSafeHeadAddress)
if (ReadCache)
EvictCallback(oldSafeHeadAddress, newSafeHeadAddress);

                // If we are going to scan, ensure earlier scans are done (we don't want to overwrite a later record with an earlier one)
                if ((OnLockEvictionObserver is not null) || (OnEvictionObserver is not null))
{
while (ClosedUntilAddress < oldSafeHeadAddress)
{
epoch.ProtectAndDrain();
Thread.Yield();
}
}

for (long closePageAddress = oldSafeHeadAddress & ~PageSizeMask; closePageAddress < newSafeHeadAddress; closePageAddress += PageSize)
{
long start = oldSafeHeadAddress > closePageAddress ? oldSafeHeadAddress : closePageAddress;
long end = newSafeHeadAddress < closePageAddress + PageSize ? newSafeHeadAddress : closePageAddress + PageSize;

// If there are no active locking sessions, there should be no locks in the log.
if (this.NumActiveLockingSessions > 0 && OnLockEvictionObserver is not null)
if (OnLockEvictionObserver is not null)
MemoryPageScan(start, end, OnLockEvictionObserver);

if (OnEvictionObserver is not null)
Expand Down
12 changes: 4 additions & 8 deletions cs/src/core/Allocator/LockEvictionObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,10 @@ public void OnNext(IFasterScanIterator<Key, Value> iter)
{
while (iter.GetNext(out RecordInfo info))
{
// If it is not Invalid, we must Seal it so there is no possibility it will be missed while we're in the process
// of transferring it to the Lock Table. Use manualLocking as we want to transfer the locks, not drain them.
if (!info.IsLocked)
continue;

// Now get it into the lock table, so it is ready as soon as the record is removed.
// We do not have to worry about conflicts with other threads, because lock and unlock stop at HeadAddress.
this.store.LockTable.TransferFromLogRecord(ref iter.GetKey(), info);
// Note: we do not have to worry about conflicts with other threads, because other operations
// (data operations and lock and unlock) stop at HeadAddress.
if (info.IsLocked)
this.store.LockTable.TransferFromLogRecord(ref iter.GetKey(), info);
}
}

Expand Down
13 changes: 5 additions & 8 deletions cs/src/core/ClientSession/LockableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@ public unsafe void Lock(ref Key key, LockType lockType)
LockOperation lockOp = new(LockOperationType.Lock, lockType);

OperationStatus status;
bool oneMiss = false;
do
status = clientSession.fht.InternalLock(ref key, lockOp, ref oneMiss, out _);
status = clientSession.fht.InternalLock(ref key, lockOp, out _);
while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession));
Debug.Assert(status == OperationStatus.SUCCESS);

Expand Down Expand Up @@ -80,9 +79,8 @@ public void Unlock(ref Key key, LockType lockType)
LockOperation lockOp = new(LockOperationType.Unlock, lockType);

OperationStatus status;
bool oneMiss = false;
do
status = clientSession.fht.InternalLock(ref key, lockOp, ref oneMiss, out _);
status = clientSession.fht.InternalLock(ref key, lockOp, out _);
while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession));
Debug.Assert(status == OperationStatus.SUCCESS);

Expand Down Expand Up @@ -112,9 +110,8 @@ public void Unlock(ref Key key, LockType lockType)

OperationStatus status;
RecordInfo lockInfo;
bool oneMiss = false;
do
status = clientSession.fht.InternalLock(ref key, lockOp, ref oneMiss, out lockInfo);
status = clientSession.fht.InternalLock(ref key, lockOp, out lockInfo);
while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession));
Debug.Assert(status == OperationStatus.SUCCESS);
return (lockInfo.IsLockedExclusive, lockInfo.NumLockedShared);
Expand Down Expand Up @@ -667,13 +664,13 @@ public void CheckpointCompletionCallback(int sessionID, string sessionName, Comm
#region Ephemeral locking
public bool TryLockEphemeralExclusive(ref RecordInfo recordInfo)
{
Debug.Assert(recordInfo.IsLockedExclusive, "Attempting to use a non-XLocked key in a Lockable context (requesting XLock)");
Debug.Assert(recordInfo.IsLockedExclusive, $"Attempting to use a non-XLocked key in a Lockable context (requesting XLock): XLocked {recordInfo.IsLockedExclusive}, Slocked {recordInfo.NumLockedShared}");
return true;
}

public bool TryLockEphemeralShared(ref RecordInfo recordInfo)
{
Debug.Assert(recordInfo.IsLocked, "Attempting to use a non-Locked (S or X) key in a Lockable context (requesting SLock)");
Debug.Assert(recordInfo.IsLocked, $"Attempting to use a non-Locked (S or X) key in a Lockable context (requesting SLock): XLocked {recordInfo.IsLockedExclusive}, Slocked {recordInfo.NumLockedShared}");
return true;
}

Expand Down
13 changes: 5 additions & 8 deletions cs/src/core/ClientSession/LockableUnsafeContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ public unsafe void Lock(ref Key key, LockType lockType)
LockOperation lockOp = new(LockOperationType.Lock, lockType);

OperationStatus status;
bool oneMiss = false;
do
status = clientSession.fht.InternalLock(ref key, lockOp, ref oneMiss, out _);
status = clientSession.fht.InternalLock(ref key, lockOp, out _);
while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession));
Debug.Assert(status == OperationStatus.SUCCESS);

Expand All @@ -82,9 +81,8 @@ public void Unlock(ref Key key, LockType lockType)
LockOperation lockOp = new(LockOperationType.Unlock, lockType);

OperationStatus status;
bool oneMiss = false;
do
status = clientSession.fht.InternalLock(ref key, lockOp, ref oneMiss, out _);
status = clientSession.fht.InternalLock(ref key, lockOp, out _);
while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession));
Debug.Assert(status == OperationStatus.SUCCESS);

Expand All @@ -107,9 +105,8 @@ public void Unlock(ref Key key, LockType lockType)

OperationStatus status;
RecordInfo lockInfo;
bool oneMiss = false;
do
status = clientSession.fht.InternalLock(ref key, lockOp, ref oneMiss, out lockInfo);
status = clientSession.fht.InternalLock(ref key, lockOp, out lockInfo);
while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession));
Debug.Assert(status == OperationStatus.SUCCESS);
return (lockInfo.IsLockedExclusive, lockInfo.NumLockedShared);
Expand Down Expand Up @@ -576,13 +573,13 @@ public void CheckpointCompletionCallback(int sessionID, string sessionName, Comm
#region Ephemeral locking
public bool TryLockEphemeralExclusive(ref RecordInfo recordInfo)
{
Debug.Assert(recordInfo.IsLockedExclusive, "Attempting to use a non-XLocked key in a Lockable context (requesting XLock)");
Debug.Assert(recordInfo.IsLockedExclusive, $"Attempting to use a non-XLocked key in a Lockable context (requesting XLock): XLocked {recordInfo.IsLockedExclusive}, Slocked {recordInfo.NumLockedShared}");
return true;
}

public bool TryLockEphemeralShared(ref RecordInfo recordInfo)
{
Debug.Assert(recordInfo.IsLocked, "Attempting to use a non-Locked (S or X) key in a Lockable context (requesting SLock)");
Debug.Assert(recordInfo.IsLocked, $"Attempting to use a non-Locked (S or X) key in a Lockable context (requesting SLock): XLocked {recordInfo.IsLockedExclusive}, Slocked {recordInfo.NumLockedShared}");
return true;
}

Expand Down
30 changes: 18 additions & 12 deletions cs/src/core/Index/Common/RecordInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,10 @@ public void UnlockExclusive()
/// </summary>
/// <returns>Whether lock was acquired successfully</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool TryLockExclusive()
public bool TryLockExclusive(bool tentative = false)
{
int spinCount = Constants.kMaxLockSpins;
long tentativeBit = tentative ? kTentativeBitMask : 0;

// Acquire exclusive lock (readers may still be present; we'll drain them later)
while (true)
Expand All @@ -141,7 +142,7 @@ public bool TryLockExclusive()
return false;
if ((expected_word & kExclusiveLockBitMask) == 0)
{
if (expected_word == Interlocked.CompareExchange(ref word, expected_word | kExclusiveLockBitMask, expected_word))
if (expected_word == Interlocked.CompareExchange(ref word, expected_word | kExclusiveLockBitMask | tentativeBit, expected_word))
break;
}
if (spinCount > 0 && --spinCount <= 0)
Expand All @@ -154,8 +155,11 @@ public bool TryLockExclusive()
{
if ((word & kSharedLockMaskInWord) == 0)
{
// Someone else may have transferred/invalidated the record while we were draining reads.
return !IsIntermediateOrInvalidWord(this.word);
// Someone else may have transferred/invalidated the record while we were draining reads. *Don't* check for Tentative here;
// we may have set it above, and no record should be set to tentative after it's been inserted into the hash chain.
if ((this.word & (kSealedBitMask | kValidBitMask)) == kValidBitMask)
return true;
break;
}
Thread.Yield();
}
Expand All @@ -165,7 +169,7 @@ public bool TryLockExclusive()
for (; ; Thread.Yield())
{
long expected_word = word;
if (Interlocked.CompareExchange(ref word, expected_word & ~kExclusiveLockBitMask, expected_word) == expected_word)
if (Interlocked.CompareExchange(ref word, expected_word & ~(kExclusiveLockBitMask | tentativeBit), expected_word) == expected_word)
break;
}
return false;
Expand All @@ -189,7 +193,7 @@ public bool TryUnlockShared()
/// </summary>
/// <returns>Whether lock was acquired successfully</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool TryLockShared()
public bool TryLockShared(bool tentative = false)
{
int spinCount = Constants.kMaxLockSpins;

Expand All @@ -202,7 +206,9 @@ public bool TryLockShared()
if (((expected_word & kExclusiveLockBitMask) == 0) // not exclusively locked
&& (expected_word & kSharedLockMaskInWord) != kSharedLockMaskInWord) // shared lock is not full
{
if (expected_word == Interlocked.CompareExchange(ref word, expected_word + kSharedLockIncrement, expected_word))
// If there are no shared locks, this one will be tentative if requested. Otherwise, do not force existing locks to be tentative.
long tentativeBit = tentative && ((expected_word & kSharedLockMaskInWord) == 0) ? kTentativeBitMask : 0;
if (expected_word == Interlocked.CompareExchange(ref word, (expected_word + kSharedLockIncrement) | tentativeBit, expected_word))
break;
}
if (spinCount > 0 && --spinCount <= 0)
Expand All @@ -214,19 +220,19 @@ public bool TryLockShared()

// For new records, which don't need the Interlocked overhead.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void InitializeLock(LockType lockType)
internal void InitializeLock(LockType lockType, bool tentative)
{
if (lockType == LockType.Shared)
this.InitializeLockShared();
this.InitializeLockShared(tentative);
else
this.InitializeLockExclusive();
this.InitializeLockExclusive(tentative);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void InitializeLockShared() => this.word += kSharedLockIncrement;
internal void InitializeLockShared(bool tentative = false) => this.word += kSharedLockIncrement | (tentative ? kTentativeBitMask : 0);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void InitializeLockExclusive() => this.word |= kExclusiveLockBitMask;
internal void InitializeLockExclusive(bool tentative = false) => this.word |= kExclusiveLockBitMask | (tentative ? kTentativeBitMask : 0);

/// <summary>
/// Try to reset the modified bit of the RecordInfo
Expand Down
10 changes: 5 additions & 5 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public FasterKV(long size, LogSettings logSettings,
sectorSize = (int)logSettings.LogDevice.SectorSize;
Initialize(size, sectorSize);

this.LockTable = new LockTable<Key>(lockTableSize, this.comparer, keyLen, keyLen is null ? null : hlog.bufferPool);
this.LockTable = new LockTable<Key>(lockTableSize, this.comparer, keyLen);

systemState = SystemState.Make(Phase.REST, 1);

Expand Down Expand Up @@ -859,8 +859,8 @@ private unsafe long GetEntryCount()
for (int bucket_entry = 0; bucket_entry < Constants.kOverflowBucketIndex; ++bucket_entry)
if (b.bucket_entries[bucket_entry] >= beginAddress)
++total_entry_count;
if (b.bucket_entries[Constants.kOverflowBucketIndex] == 0) break;
b = *((HashBucket*)overflowBucketsAllocator.GetPhysicalAddress((b.bucket_entries[Constants.kOverflowBucketIndex])));
if ((b.bucket_entries[Constants.kOverflowBucketIndex] & Constants.kAddressMask) == 0) break;
b = *(HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(b.bucket_entries[Constants.kOverflowBucketIndex] & Constants.kAddressMask);
}
}
return total_entry_count;
Expand Down Expand Up @@ -894,8 +894,8 @@ private unsafe string DumpDistributionInternal(int version)
++total_record_count;
}
}
if (b.bucket_entries[Constants.kOverflowBucketIndex] == 0) break;
b = *((HashBucket*)overflowBucketsAllocator.GetPhysicalAddress((b.bucket_entries[Constants.kOverflowBucketIndex])));
if ((b.bucket_entries[Constants.kOverflowBucketIndex] & Constants.kAddressMask) == 0) break;
b = *(HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(b.bucket_entries[Constants.kOverflowBucketIndex] & Constants.kAddressMask);
}

if (!histogram.ContainsKey(cnt)) histogram[cnt] = 0;
Expand Down
Loading

0 comments on commit 4ab5597

Please sign in to comment.