From 45c65dc268d3faa3b36cae3b1c7b7aab280b5e80 Mon Sep 17 00:00:00 2001 From: Tianyu Li Date: Mon, 4 Dec 2023 19:33:45 +0000 Subject: [PATCH 1/4] Revise BumpEpoch to accommodate calling from both protected and unprotected contexts. Revise FasterLog completion logic to avoid epoch double-protect. --- cs/src/core/Epochs/LightEpoch.cs | 49 ++++++++++++++++----- cs/src/core/FasterLog/FasterLog.cs | 70 ++++++++++++++---------------- 2 files changed, 70 insertions(+), 49 deletions(-) diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs index 01ca58c06..3b903f72c 100644 --- a/cs/src/core/Epochs/LightEpoch.cs +++ b/cs/src/core/Epochs/LightEpoch.cs @@ -181,7 +181,7 @@ public bool ThisInstanceProtected() } /// - /// Enter the thread into the protected code region + /// Enter the thread into the protected code region. /// /// Current epoch [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -225,7 +225,6 @@ public void Resume() /// long BumpCurrentEpoch() { - Debug.Assert(this.ThisInstanceProtected(), "BumpCurrentEpoch must be called on a protected thread"); long nextEpoch = Interlocked.Increment(ref CurrentEpoch); if (drainCount > 0) @@ -235,14 +234,16 @@ long BumpCurrentEpoch() } /// - /// Increment current epoch and associate trigger action - /// with the prior epoch + /// Increment current epoch and associate trigger action with the prior epoch. The trigger action will execute + /// on a protected thread only after the prior epoch is safe (i.e., after all active threads have advanced past it) /// /// Trigger action - /// public void BumpCurrentEpoch(Action onDrain) { - long PriorEpoch = BumpCurrentEpoch() - 1; + long priorEpoch = BumpCurrentEpoch() - 1; + // track whether we acquired protection when calling from unprotected thread, so we restore the thread to + // its pre-call protection status after we are done + var acquiredProtection = false; int i = 0; while (true) @@ -253,7 +254,7 @@ public void BumpCurrentEpoch(Action onDrain) if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, long.MaxValue) == long.MaxValue) { drainList[i].action = onDrain; - drainList[i].epoch = PriorEpoch; + drainList[i].epoch = priorEpoch; Interlocked.Increment(ref drainCount); break; } @@ -264,12 +265,18 @@ public void BumpCurrentEpoch(Action onDrain) if (triggerEpoch <= SafeToReclaimEpoch) { + // Protection is required whenever we may execute a trigger action + if (!acquiredProtection && !ThisInstanceProtected()) + { + acquiredProtection = true; + Resume(); + } // This was a slot with an epoch that was safe to reclaim. If it still is, execute its trigger, then assign this action/epoch to the slot. if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, triggerEpoch) == triggerEpoch) { var triggerAction = drainList[i].action; drainList[i].action = onDrain; - drainList[i].epoch = PriorEpoch; + drainList[i].epoch = priorEpoch; triggerAction(); break; } @@ -279,14 +286,34 @@ public void BumpCurrentEpoch(Action onDrain) if (++i == kDrainListSize) { // We are at the end of the drain list and found no empty or reclaimable slot. ProtectAndDrain, which should clear one or more slots. - ProtectAndDrain(); + if (!acquiredProtection && !ThisInstanceProtected()) + { + acquiredProtection = true; + Resume(); + } + else + { + ProtectAndDrain(); + } + i = 0; Thread.Yield(); } } - // Now ProtectAndDrain, which may execute the action we just added. - ProtectAndDrain(); + if (!acquiredProtection && !ThisInstanceProtected()) + { + acquiredProtection = true; + Resume(); + } + else + { + // Now ProtectAndDrain, which may execute the action we just added. + ProtectAndDrain(); + } + + if (acquiredProtection) + Release(); } /// diff --git a/cs/src/core/FasterLog/FasterLog.cs b/cs/src/core/FasterLog/FasterLog.cs index 82f444149..b29cccafc 100644 --- a/cs/src/core/FasterLog/FasterLog.cs +++ b/cs/src/core/FasterLog/FasterLog.cs @@ -42,6 +42,7 @@ public sealed class FasterLog : IDisposable readonly Queue<(long, FasterLogRecoveryInfo)> ongoingCommitRequests; readonly List coveredCommits = new(); long commitNum, commitCoveredAddress; + private bool logClosing = false; readonly LogCommitPolicy commitPolicy; @@ -74,6 +75,7 @@ public sealed class FasterLog : IDisposable /// public long SafeTailAddress; + /// /// Dictionary of recovered iterators and their committed until addresses /// @@ -231,6 +233,7 @@ public void Reset() CommittedUntilAddress = beginAddress; CommittedBeginAddress = beginAddress; SafeTailAddress = beginAddress; + logClosing = false; commitNum = 0; this.beginAddress = beginAddress; @@ -268,6 +271,8 @@ public void Initialize(long beginAddress, long committedUntilAddress, long lastC commitNum = lastCommitNum; this.beginAddress = beginAddress; + logClosing = false; + if (lastCommitNum > 0) logCommitManager.OnRecovery(lastCommitNum); } @@ -322,24 +327,15 @@ public void Dispose() /// whether to spin until log completion becomes committed public void CompleteLog(bool spinWait = false) { - // Ensure progress even if there is no thread in epoch table. Also, BumpCurrentEpoch must be done on a protected thread. - bool isProtected = epoch.ThisInstanceProtected(); - if (!isProtected) - epoch.Resume(); - try - { - // Ensure all currently started entries will enqueue before we declare log closed - epoch.BumpCurrentEpoch(() => - { - CommitInternal(out _, out _, false, Array.Empty(), long.MaxValue, null); - }); - } - finally + // Use this to signal to future enqueue calls that they should stop as we are closing the log + logClosing = true; + // use a bump to ensure that any concurrent enqueues that have marched passed the check will finish before + // we close the log + epoch.BumpCurrentEpoch(() => { - if (!isProtected) - epoch.Suspend(); - } - + CommitInternal(out _, out _, false, Array.Empty(), long.MaxValue, null); + }); + if (spinWait) WaitForCommit(TailAddress, long.MaxValue); } @@ -480,6 +476,7 @@ public long Enqueue(IEnumerable entries) where T : ILogEnqueueEntry #endregion #region TryEnqueue + /// /// Try to enqueue entry to log (in memory). If it returns true, we are /// done. If it returns false, we need to retry. @@ -497,7 +494,7 @@ public unsafe bool TryEnqueue(T entry, out long logicalAddress) where T : ILo epoch.Resume(); - if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) @@ -538,7 +535,7 @@ public unsafe bool TryEnqueue(IEnumerable entries, out long logicalAddress ValidateAllocatedLength(allocatedLength); epoch.Resume(); - if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); @@ -578,8 +575,7 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress) ValidateAllocatedLength(allocatedLength); epoch.Resume(); - - if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) @@ -619,8 +615,7 @@ public unsafe bool UnsafeTryEnqueueRaw(ReadOnlySpan entryBytes, bool noCom ValidateAllocatedLength(allocatedLength); epoch.Resume(); - - if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) @@ -654,8 +649,7 @@ public unsafe bool TryEnqueue(ReadOnlySpan entry, out long logicalAddress) ValidateAllocatedLength(allocatedLength); epoch.Resume(); - - if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) @@ -689,6 +683,7 @@ public unsafe void Enqueue(THeader userHeader, out long logicalAddress) ValidateAllocatedLength(allocatedLength); epoch.Resume(); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = AllocateBlock(allocatedLength); @@ -715,6 +710,7 @@ public unsafe void Enqueue(THeader userHeader, ref SpanByte item, out l ValidateAllocatedLength(allocatedLength); epoch.Resume(); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = AllocateBlock(allocatedLength); @@ -743,6 +739,7 @@ public unsafe void Enqueue(THeader userHeader, ref SpanByte item1, ref ValidateAllocatedLength(allocatedLength); epoch.Resume(); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = AllocateBlock(allocatedLength); @@ -772,8 +769,9 @@ public unsafe void Enqueue(THeader userHeader, ref SpanByte item1, ref int allocatedLength = headerSize + Align(length); ValidateAllocatedLength(allocatedLength); - epoch.Resume(); - + epoch.Resume(); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + logicalAddress = AllocateBlock(allocatedLength); var physicalAddress = (byte*)allocator.GetPhysicalAddress(logicalAddress); @@ -801,6 +799,7 @@ public unsafe void Enqueue(byte userHeader, ref SpanByte item, out long logicalA ValidateAllocatedLength(allocatedLength); epoch.Resume(); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = AllocateBlock(allocatedLength); @@ -862,6 +861,7 @@ public unsafe bool TryEnqueue(THeader userHeader, ref SpanByte item1, r ValidateAllocatedLength(allocatedLength); epoch.Resume(); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) @@ -901,6 +901,7 @@ public unsafe bool TryEnqueue(THeader userHeader, ref SpanByte item1, r ValidateAllocatedLength(allocatedLength); epoch.Resume(); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) @@ -938,6 +939,7 @@ public unsafe bool TryEnqueue(byte userHeader, ref SpanByte item, out long logic ValidateAllocatedLength(allocatedLength); epoch.Resume(); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) @@ -2533,7 +2535,7 @@ private unsafe bool TryAppend(IReadOnlySpanBatch readOnlySpanBatch, out long log ValidateAllocatedLength(allocatedLength); epoch.Resume(); - if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); @@ -2779,16 +2781,8 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool } // Otherwise, move to set read-only tail and flush - try - { - epoch.Resume(); - if (!allocator.ShiftReadOnlyToTail(out _, out _)) - CommitMetadataOnly(ref info); - } - finally - { - epoch.Suspend(); - } + if (!allocator.ShiftReadOnlyToTail(out _, out _)) + CommitMetadataOnly(ref info); return true; } From ae756ace85d754b68b3199ca8d70718b28173ab1 Mon Sep 17 00:00:00 2001 From: Tianyu Li Date: Mon, 4 Dec 2023 22:46:53 +0000 Subject: [PATCH 2/4] API change for BCE to add long return value and a default parameter variant --- cs/src/core/Epochs/LightEpoch.cs | 108 +++++++++++++++---------------- 1 file changed, 53 insertions(+), 55 deletions(-) diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs index 3b903f72c..2d75e4328 100644 --- a/cs/src/core/Epochs/LightEpoch.cs +++ b/cs/src/core/Epochs/LightEpoch.cs @@ -219,88 +219,84 @@ public void Resume() ProtectAndDrain(); } - /// - /// Increment global current epoch - /// - /// - long BumpCurrentEpoch() - { - long nextEpoch = Interlocked.Increment(ref CurrentEpoch); - - if (drainCount > 0) - Drain(nextEpoch); - - return nextEpoch; - } - /// /// Increment current epoch and associate trigger action with the prior epoch. The trigger action will execute /// on a protected thread only after the prior epoch is safe (i.e., after all active threads have advanced past it) /// - /// Trigger action - public void BumpCurrentEpoch(Action onDrain) + /// Trigger action, or null if none is necessary + /// new epoch of the system + public long BumpCurrentEpoch(Action onDrain = null) { - long priorEpoch = BumpCurrentEpoch() - 1; + var nextEpoch = Interlocked.Increment(ref CurrentEpoch); + var priorEpoch = nextEpoch - 1; // track whether we acquired protection when calling from unprotected thread, so we restore the thread to // its pre-call protection status after we are done var acquiredProtection = false; - int i = 0; - while (true) + if (onDrain != null) { - if (drainList[i].epoch == long.MaxValue) + for (int i = 0;;) { - // This was an empty slot. If it still is, assign this action/epoch to the slot. - if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, long.MaxValue) == long.MaxValue) + if (drainList[i].epoch == long.MaxValue) { - drainList[i].action = onDrain; - drainList[i].epoch = priorEpoch; - Interlocked.Increment(ref drainCount); - break; + // This was an empty slot. If it still is, assign this action/epoch to the slot. + if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, long.MaxValue) == + long.MaxValue) + { + drainList[i].action = onDrain; + drainList[i].epoch = priorEpoch; + Interlocked.Increment(ref drainCount); + break; + } } - } - else - { - var triggerEpoch = drainList[i].epoch; + else + { + var triggerEpoch = drainList[i].epoch; - if (triggerEpoch <= SafeToReclaimEpoch) + if (triggerEpoch <= SafeToReclaimEpoch) + { + // Protection is required whenever we may execute a trigger action + if (!acquiredProtection && !ThisInstanceProtected()) + { + acquiredProtection = true; + Resume(); + } + + // This was a slot with an epoch that was safe to reclaim. If it still is, execute its trigger, then assign this action/epoch to the slot. + if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, triggerEpoch) == + triggerEpoch) + { + var triggerAction = drainList[i].action; + drainList[i].action = onDrain; + drainList[i].epoch = priorEpoch; + triggerAction(); + break; + } + } + } + + if (++i == kDrainListSize) { - // Protection is required whenever we may execute a trigger action + // We are at the end of the drain list and found no empty or reclaimable slot. ProtectAndDrain, which should clear one or more slots. if (!acquiredProtection && !ThisInstanceProtected()) { acquiredProtection = true; Resume(); } - // This was a slot with an epoch that was safe to reclaim. If it still is, execute its trigger, then assign this action/epoch to the slot. - if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, triggerEpoch) == triggerEpoch) + else { - var triggerAction = drainList[i].action; - drainList[i].action = onDrain; - drainList[i].epoch = priorEpoch; - triggerAction(); - break; + ProtectAndDrain(); } - } - } - if (++i == kDrainListSize) - { - // We are at the end of the drain list and found no empty or reclaimable slot. ProtectAndDrain, which should clear one or more slots. - if (!acquiredProtection && !ThisInstanceProtected()) - { - acquiredProtection = true; - Resume(); + i = 0; + Thread.Yield(); } - else - { - ProtectAndDrain(); - } - - i = 0; - Thread.Yield(); } } + // If just bumping epoch without adding any actions, there is no need to drain + if (drainCount == 0) return nextEpoch; + if (!acquiredProtection && !ThisInstanceProtected()) { acquiredProtection = true; @@ -314,6 +310,8 @@ public void BumpCurrentEpoch(Action onDrain) if (acquiredProtection) Release(); + + return nextEpoch; } /// From d9127b7b80db0ef6b42fb66d64e6206359259bdd Mon Sep 17 00:00:00 2001 From: Tianyu Li Date: Tue, 5 Dec 2023 16:31:44 +0000 Subject: [PATCH 3/4] add else for drain count check --- cs/src/core/Epochs/LightEpoch.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs index 2d75e4328..bf61d6cef 100644 --- a/cs/src/core/Epochs/LightEpoch.cs +++ b/cs/src/core/Epochs/LightEpoch.cs @@ -293,9 +293,11 @@ public long BumpCurrentEpoch(Action onDrain = null) } } } - // If just bumping epoch without adding any actions, there is no need to drain - if (drainCount == 0) return nextEpoch; + else if (drainCount == 0) + { + return nextEpoch; + } if (!acquiredProtection && !ThisInstanceProtected()) { From ca3c097165627a0b1e95e5ecda82dc2704f5b788 Mon Sep 17 00:00:00 2001 From: Tianyu Li Date: Tue, 5 Dec 2023 19:03:52 +0000 Subject: [PATCH 4/4] Split BCE into two variants --- cs/src/core/Epochs/LightEpoch.cs | 124 ++++++++++++++++++------------- 1 file changed, 72 insertions(+), 52 deletions(-) diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs index bf61d6cef..989650d84 100644 --- a/cs/src/core/Epochs/LightEpoch.cs +++ b/cs/src/core/Epochs/LightEpoch.cs @@ -218,87 +218,107 @@ public void Resume() Acquire(); ProtectAndDrain(); } + + /// + /// Increment current epoch. + /// + /// new epoch of the system + public long BumpCurrentEpoch() + { + var nextEpoch = Interlocked.Increment(ref CurrentEpoch); + + if (drainCount > 0) + { + // track whether we acquired protection when calling from unprotected thread, so we restore the thread to + // its pre-call protection status after we are done + if (!ThisInstanceProtected()) + { + Resume(); + Release(); + } + else + { + ProtectAndDrain(); + } + } + + return nextEpoch; + } /// /// Increment current epoch and associate trigger action with the prior epoch. The trigger action will execute /// on a protected thread only after the prior epoch is safe (i.e., after all active threads have advanced past it) /// - /// Trigger action, or null if none is necessary + /// Trigger action /// new epoch of the system - public long BumpCurrentEpoch(Action onDrain = null) + public long BumpCurrentEpoch(Action onDrain) { + Debug.Assert(onDrain != null); + var nextEpoch = Interlocked.Increment(ref CurrentEpoch); var priorEpoch = nextEpoch - 1; // track whether we acquired protection when calling from unprotected thread, so we restore the thread to // its pre-call protection status after we are done var acquiredProtection = false; - - if (onDrain != null) + + for (int i = 0;;) { - for (int i = 0;;) + if (drainList[i].epoch == long.MaxValue) { - if (drainList[i].epoch == long.MaxValue) - { - // This was an empty slot. If it still is, assign this action/epoch to the slot. - if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, long.MaxValue) == - long.MaxValue) - { - drainList[i].action = onDrain; - drainList[i].epoch = priorEpoch; - Interlocked.Increment(ref drainCount); - break; - } - } - else + // This was an empty slot. If it still is, assign this action/epoch to the slot. + if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, long.MaxValue) == + long.MaxValue) { - var triggerEpoch = drainList[i].epoch; - - if (triggerEpoch <= SafeToReclaimEpoch) - { - // Protection is required whenever we may execute a trigger action - if (!acquiredProtection && !ThisInstanceProtected()) - { - acquiredProtection = true; - Resume(); - } - - // This was a slot with an epoch that was safe to reclaim. If it still is, execute its trigger, then assign this action/epoch to the slot. - if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, triggerEpoch) == - triggerEpoch) - { - var triggerAction = drainList[i].action; - drainList[i].action = onDrain; - drainList[i].epoch = priorEpoch; - triggerAction(); - break; - } - } + drainList[i].action = onDrain; + drainList[i].epoch = priorEpoch; + Interlocked.Increment(ref drainCount); + break; } + } + else + { + var triggerEpoch = drainList[i].epoch; - if (++i == kDrainListSize) + if (triggerEpoch <= SafeToReclaimEpoch) { - // We are at the end of the drain list and found no empty or reclaimable slot. ProtectAndDrain, which should clear one or more slots. + // Protection is required whenever we may execute a trigger action if (!acquiredProtection && !ThisInstanceProtected()) { acquiredProtection = true; Resume(); } - else + + // This was a slot with an epoch that was safe to reclaim. If it still is, execute its trigger, then assign this action/epoch to the slot. + if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, triggerEpoch) == + triggerEpoch) { - ProtectAndDrain(); + var triggerAction = drainList[i].action; + drainList[i].action = onDrain; + drainList[i].epoch = priorEpoch; + triggerAction(); + break; } + } + } - i = 0; - Thread.Yield(); + if (++i == kDrainListSize) + { + // We are at the end of the drain list and found no empty or reclaimable slot. ProtectAndDrain, which should clear one or more slots. + if (!acquiredProtection && !ThisInstanceProtected()) + { + acquiredProtection = true; + Resume(); + } + else + { + ProtectAndDrain(); } + i = 0; + Thread.Yield(); } } - // If just bumping epoch without adding any actions, there is no need to drain - else if (drainCount == 0) - { - return nextEpoch; - } - + + if (!acquiredProtection && !ThisInstanceProtected()) { acquiredProtection = true;