From 8155ba42a7a469155e7f06ff0708861ddc61a9e9 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Wed, 18 Jan 2023 19:16:51 -0800 Subject: [PATCH] Minor issues in session mgmt, truncate, etc (#785) * Ensure active sessions count is never stuck at non-zero. * Fix callback invocation * Fix logic * fix --- .../FASTER.server/Servers/FasterServerBase.cs | 38 +++-- cs/src/core/Device/StorageDeviceBase.cs | 4 +- .../FASTER/Implementation/InternalRMW.cs | 135 +++++++++--------- .../FASTER/Implementation/InternalUpsert.cs | 116 +++++++-------- 4 files changed, 153 insertions(+), 140 deletions(-) diff --git a/cs/remote/src/FASTER.server/Servers/FasterServerBase.cs b/cs/remote/src/FASTER.server/Servers/FasterServerBase.cs index d1d4af835..da6c172dd 100644 --- a/cs/remote/src/FASTER.server/Servers/FasterServerBase.cs +++ b/cs/remote/src/FASTER.server/Servers/FasterServerBase.cs @@ -83,13 +83,23 @@ public void Unregister(WireFormat wireFormat, out ISessionProvider provider) [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool AddSession(WireFormat protocol, ref ISessionProvider provider, INetworkSender networkSender, out IMessageConsumer session) { + session = null; + if (Interlocked.Increment(ref activeSessionCount) <= 0) - { - session = null; return false; + + bool retVal = false; + try + { + session = provider.GetSession(protocol, networkSender); + retVal = activeSessions.TryAdd(session, default); + if (!retVal) Interlocked.Decrement(ref activeSessionCount); + } + catch + { + Interlocked.Decrement(ref activeSessionCount); } - session = provider.GetSession(protocol, networkSender); - return activeSessions.TryAdd(session, default); + return retVal; } /// @@ -116,8 +126,14 @@ internal void DisposeActiveSessions() { if (activeSessions.TryRemove(_session, out _)) { - _session.Dispose(); - Interlocked.Decrement(ref activeSessionCount); + try + { + _session.Dispose(); + } + finally + { + Interlocked.Decrement(ref activeSessionCount); + } } } } @@ -138,8 +154,14 @@ public void DisposeSession(IMessageConsumer _session) { if (activeSessions.TryRemove(_session, out _)) { - _session.Dispose(); - Interlocked.Decrement(ref activeSessionCount); + try + { + _session.Dispose(); + } + finally + { + Interlocked.Decrement(ref activeSessionCount); + } } } } diff --git a/cs/src/core/Device/StorageDeviceBase.cs b/cs/src/core/Device/StorageDeviceBase.cs index 7148acdd1..de7f8d2ec 100644 --- a/cs/src/core/Device/StorageDeviceBase.cs +++ b/cs/src/core/Device/StorageDeviceBase.cs @@ -230,8 +230,10 @@ public void TruncateUntilSegment(int toSegment) public virtual void TruncateUntilAddressAsync(long toAddress, AsyncCallback callback, IAsyncResult result) { if ((int)(toAddress >> segmentSizeBits) <= startSegment) + { + callback(result); return; - + } // Truncate only up to segment boundary if address is not aligned TruncateUntilSegmentAsync((int)(toAddress >> segmentSizeBits), callback, result); } diff --git a/cs/src/core/Index/FASTER/Implementation/InternalRMW.cs b/cs/src/core/Index/FASTER/Implementation/InternalRMW.cs index 6c695cf0e..a84d7bd59 100644 --- a/cs/src/core/Index/FASTER/Implementation/InternalRMW.cs +++ b/cs/src/core/Index/FASTER/Implementation/InternalRMW.cs @@ -154,96 +154,91 @@ internal OperationStatus InternalRMW( if (sessionCtx.phase != Phase.REST) { latchDestination = AcquireLatchRMW(pendingContext, sessionCtx, ref stackCtx.hei, ref status, ref latchOperation, stackCtx.recSrc.LogicalAddress); + if (latchDestination == LatchDestination.Retry) + goto LatchRelease; } #endregion Entry latch operation if necessary // We must use try/finally to ensure unlocking even in the presence of exceptions. try { - #region Normal processing + #region Address and source record checks - if (latchDestination == LatchDestination.NormalProcessing) + if (stackCtx.recSrc.HasReadCacheSrc) { - if (stackCtx.recSrc.HasReadCacheSrc) - { - // Use the readcache record as the CopyUpdater source. - goto LockSourceRecord; - } - else if (stackCtx.recSrc.LogicalAddress >= hlog.ReadOnlyAddress) - { - // Mutable Region: Update the record in-place - srcRecordInfo = ref hlog.GetInfo(stackCtx.recSrc.PhysicalAddress); - if (!TryEphemeralXLock(fasterSession, ref stackCtx.recSrc, ref srcRecordInfo, out status)) - goto LatchRelease; - - if (srcRecordInfo.Tombstone) - goto CreateNewRecord; - - if (!srcRecordInfo.IsValidUpdateOrLockSource) - { - EphemeralXUnlockAndAbandonUpdate(fasterSession, ref stackCtx.recSrc, ref srcRecordInfo); - srcRecordInfo = ref dummyRecordInfo; - goto CreateNewRecord; - } + // Use the readcache record as the CopyUpdater source. + goto LockSourceRecord; + } + else if (stackCtx.recSrc.LogicalAddress >= hlog.ReadOnlyAddress && latchDestination == LatchDestination.NormalProcessing) + { + // Mutable Region: Update the record in-place + srcRecordInfo = ref hlog.GetInfo(stackCtx.recSrc.PhysicalAddress); + if (!TryEphemeralXLock(fasterSession, ref stackCtx.recSrc, ref srcRecordInfo, out status)) + goto LatchRelease; - rmwInfo.RecordInfo = srcRecordInfo; - if (fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(stackCtx.recSrc.PhysicalAddress), ref output, ref srcRecordInfo, ref rmwInfo, out status) - || (rmwInfo.Action == RMWAction.ExpireAndStop)) - { - this.MarkPage(stackCtx.recSrc.LogicalAddress, sessionCtx); - - // ExpireAndStop means to override default Delete handling (which is to go to InitialUpdater) by leaving the tombstoned record as current. - // Our IFasterSession.InPlaceUpdater implementation has already reinitialized-in-place or set Tombstone as appropriate (inside the ephemeral lock) - // and marked the record. - pendingContext.recordInfo = srcRecordInfo; - pendingContext.logicalAddress = stackCtx.recSrc.LogicalAddress; - goto LatchRelease; - } - if (OperationStatusUtils.BasicOpCode(status) != OperationStatus.SUCCESS) - goto LatchRelease; + if (srcRecordInfo.Tombstone) + goto CreateNewRecord; - // InPlaceUpdater failed (e.g. insufficient space, another thread set Tombstone, etc). Use this record as the CopyUpdater source. - stackCtx.recSrc.HasMainLogSrc = true; + if (!srcRecordInfo.IsValidUpdateOrLockSource) + { + EphemeralXUnlockAndAbandonUpdate(fasterSession, ref stackCtx.recSrc, ref srcRecordInfo); + srcRecordInfo = ref dummyRecordInfo; goto CreateNewRecord; } - else if (stackCtx.recSrc.LogicalAddress >= hlog.SafeReadOnlyAddress && !hlog.GetInfo(stackCtx.recSrc.PhysicalAddress).Tombstone) + + rmwInfo.RecordInfo = srcRecordInfo; + if (fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(stackCtx.recSrc.PhysicalAddress), ref output, ref srcRecordInfo, ref rmwInfo, out status) + || (rmwInfo.Action == RMWAction.ExpireAndStop)) { - // Fuzzy Region: Must retry after epoch refresh, due to lost-update anomaly - status = OperationStatus.RETRY_LATER; + this.MarkPage(stackCtx.recSrc.LogicalAddress, sessionCtx); + + // ExpireAndStop means to override default Delete handling (which is to go to InitialUpdater) by leaving the tombstoned record as current. + // Our IFasterSession.InPlaceUpdater implementation has already reinitialized-in-place or set Tombstone as appropriate (inside the ephemeral lock) + // and marked the record. + pendingContext.recordInfo = srcRecordInfo; + pendingContext.logicalAddress = stackCtx.recSrc.LogicalAddress; goto LatchRelease; } - else if (stackCtx.recSrc.LogicalAddress >= hlog.HeadAddress) - { - // Safe Read-Only Region: CopyUpdate to create a record in the mutable region - stackCtx.recSrc.HasMainLogSrc = true; - goto LockSourceRecord; - } - else if (stackCtx.recSrc.LogicalAddress >= hlog.BeginAddress) - { - // Disk Region: Need to issue async io requests. Locking will be check on pending completion. - status = OperationStatus.RECORD_ON_DISK; - latchDestination = LatchDestination.CreatePendingContext; - goto CreatePendingContext; - } - else - { - // No record exists - check for lock before creating new record. - Debug.Assert(!fasterSession.IsManualLocking || LockTable.IsLockedExclusive(ref key, stackCtx.hei.hash), "A Lockable-session RMW() of an on-disk or non-existent key requires a LockTable lock"); - if (LockTable.IsActive && !fasterSession.DisableEphemeralLocking - && !LockTable.TryLockEphemeral(ref key, stackCtx.hei.hash, LockType.Exclusive, out stackCtx.recSrc.HasLockTableLock)) - { - status = OperationStatus.RETRY_LATER; - goto LatchRelease; - } - goto CreateNewRecord; - } + if (OperationStatusUtils.BasicOpCode(status) != OperationStatus.SUCCESS) + goto LatchRelease; + + // InPlaceUpdater failed (e.g. insufficient space, another thread set Tombstone, etc). Use this record as the CopyUpdater source. + stackCtx.recSrc.HasMainLogSrc = true; + goto CreateNewRecord; } - else if (latchDestination == LatchDestination.Retry) + else if (stackCtx.recSrc.LogicalAddress >= hlog.SafeReadOnlyAddress && !hlog.GetInfo(stackCtx.recSrc.PhysicalAddress).Tombstone && latchDestination == LatchDestination.NormalProcessing) { + // Fuzzy Region: Must retry after epoch refresh, due to lost-update anomaly + status = OperationStatus.RETRY_LATER; goto LatchRelease; } + else if (stackCtx.recSrc.LogicalAddress >= hlog.HeadAddress) + { + // Safe Read-Only Region: CopyUpdate to create a record in the mutable region + stackCtx.recSrc.HasMainLogSrc = true; + goto LockSourceRecord; + } + else if (stackCtx.recSrc.LogicalAddress >= hlog.BeginAddress) + { + // Disk Region: Need to issue async io requests. Locking will be check on pending completion. + status = OperationStatus.RECORD_ON_DISK; + latchDestination = LatchDestination.CreatePendingContext; + goto CreatePendingContext; + } + else + { + // No record exists - check for lock before creating new record. + Debug.Assert(!fasterSession.IsManualLocking || LockTable.IsLockedExclusive(ref key, stackCtx.hei.hash), "A Lockable-session RMW() of an on-disk or non-existent key requires a LockTable lock"); + if (LockTable.IsActive && !fasterSession.DisableEphemeralLocking + && !LockTable.TryLockEphemeral(ref key, stackCtx.hei.hash, LockType.Exclusive, out stackCtx.recSrc.HasLockTableLock)) + { + status = OperationStatus.RETRY_LATER; + goto LatchRelease; + } + goto CreateNewRecord; + } - #endregion Normal processing + #endregion Address and source record checks #region Lock source record LockSourceRecord: diff --git a/cs/src/core/Index/FASTER/Implementation/InternalUpsert.cs b/cs/src/core/Index/FASTER/Implementation/InternalUpsert.cs index 4816f7ba1..8cefa6a93 100644 --- a/cs/src/core/Index/FASTER/Implementation/InternalUpsert.cs +++ b/cs/src/core/Index/FASTER/Implementation/InternalUpsert.cs @@ -83,86 +83,80 @@ internal OperationStatus InternalUpsert( if (sessionCtx.phase != Phase.REST) { latchDestination = AcquireLatchUpsert(sessionCtx, ref stackCtx.hei, ref status, ref latchOperation, stackCtx.recSrc.LogicalAddress); + if (latchDestination == LatchDestination.Retry) + goto LatchRelease; } #endregion // We must use try/finally to ensure unlocking even in the presence of exceptions. try { - #region Normal processing + #region Address and source record checks - if (latchDestination == LatchDestination.NormalProcessing) + if (stackCtx.recSrc.HasReadCacheSrc) { - if (stackCtx.recSrc.HasReadCacheSrc) - { - // Use the readcache record as the CopyUpdater source. - goto LockSourceRecord; - } - else if (stackCtx.recSrc.LogicalAddress >= hlog.ReadOnlyAddress) - { - // Mutable Region: Update the record in-place - srcRecordInfo = ref hlog.GetInfo(stackCtx.recSrc.PhysicalAddress); - if (!TryEphemeralXLock(fasterSession, ref stackCtx.recSrc, ref srcRecordInfo, out status)) - goto LatchRelease; - - if (srcRecordInfo.Tombstone) - goto CreateNewRecord; - - if (!srcRecordInfo.IsValidUpdateOrLockSource) - { - EphemeralXUnlockAndAbandonUpdate(fasterSession, ref stackCtx.recSrc, ref srcRecordInfo); - srcRecordInfo = ref dummyRecordInfo; - goto CreateNewRecord; - } + // Use the readcache record as the CopyUpdater source. + goto LockSourceRecord; + } + else if (stackCtx.recSrc.LogicalAddress >= hlog.ReadOnlyAddress && latchDestination == LatchDestination.NormalProcessing) + { + // Mutable Region: Update the record in-place + // We perform mutable updates only if we are in normal processing phase of checkpointing + srcRecordInfo = ref hlog.GetInfo(stackCtx.recSrc.PhysicalAddress); + if (!TryEphemeralXLock(fasterSession, ref stackCtx.recSrc, ref srcRecordInfo, out status)) + goto LatchRelease; - upsertInfo.RecordInfo = srcRecordInfo; - ref Value recordValue = ref hlog.GetValue(stackCtx.recSrc.PhysicalAddress); - if (fasterSession.ConcurrentWriter(ref key, ref input, ref value, ref recordValue, ref output, ref srcRecordInfo, ref upsertInfo)) - { - this.MarkPage(stackCtx.recSrc.LogicalAddress, sessionCtx); - pendingContext.recordInfo = srcRecordInfo; - pendingContext.logicalAddress = stackCtx.recSrc.LogicalAddress; - status = OperationStatusUtils.AdvancedOpCode(OperationStatus.SUCCESS, StatusCode.InPlaceUpdatedRecord); - goto LatchRelease; - } - if (upsertInfo.Action == UpsertAction.CancelOperation) - { - status = OperationStatus.CANCELED; - goto LatchRelease; - } + if (srcRecordInfo.Tombstone) + goto CreateNewRecord; - // ConcurrentWriter failed (e.g. insufficient space, another thread set Tombstone, etc). Write a new record, but track that we have to seal and unlock this one. - stackCtx.recSrc.HasMainLogSrc = true; + if (!srcRecordInfo.IsValidUpdateOrLockSource) + { + EphemeralXUnlockAndAbandonUpdate(fasterSession, ref stackCtx.recSrc, ref srcRecordInfo); + srcRecordInfo = ref dummyRecordInfo; goto CreateNewRecord; } - else if (stackCtx.recSrc.LogicalAddress >= hlog.HeadAddress) + + upsertInfo.RecordInfo = srcRecordInfo; + ref Value recordValue = ref hlog.GetValue(stackCtx.recSrc.PhysicalAddress); + if (fasterSession.ConcurrentWriter(ref key, ref input, ref value, ref recordValue, ref output, ref srcRecordInfo, ref upsertInfo)) { - // Only need to go below ReadOnly for locking and Sealing. - stackCtx.recSrc.HasMainLogSrc = true; - goto LockSourceRecord; + this.MarkPage(stackCtx.recSrc.LogicalAddress, sessionCtx); + pendingContext.recordInfo = srcRecordInfo; + pendingContext.logicalAddress = stackCtx.recSrc.LogicalAddress; + status = OperationStatusUtils.AdvancedOpCode(OperationStatus.SUCCESS, StatusCode.InPlaceUpdatedRecord); + goto LatchRelease; } - else + if (upsertInfo.Action == UpsertAction.CancelOperation) { - // Either on-disk or no record exists - check for lock before creating new record. First ensure any record lock has transitioned to the LockTable. - if (stackCtx.recSrc.LogicalAddress >= hlog.BeginAddress) - SpinWaitUntilRecordIsClosed(ref key, stackCtx.hei.hash, stackCtx.recSrc.LogicalAddress, hlog); - Debug.Assert(!fasterSession.IsManualLocking || LockTable.IsLockedExclusive(ref key, stackCtx.hei.hash), "A Lockable-session Upsert() of an on-disk or non-existent key requires a LockTable lock"); - if (LockTable.IsActive && !fasterSession.DisableEphemeralLocking - && !LockTable.TryLockEphemeral(ref key, stackCtx.hei.hash, LockType.Exclusive, out stackCtx.recSrc.HasLockTableLock)) - { - status = OperationStatus.RETRY_LATER; - goto LatchRelease; - } - goto CreateNewRecord; + status = OperationStatus.CANCELED; + goto LatchRelease; } + + // ConcurrentWriter failed (e.g. insufficient space, another thread set Tombstone, etc). Write a new record, but track that we have to seal and unlock this one. + stackCtx.recSrc.HasMainLogSrc = true; + goto CreateNewRecord; } - else if (latchDestination == LatchDestination.Retry) + else if (stackCtx.recSrc.LogicalAddress >= hlog.HeadAddress) { - goto LatchRelease; + // Only need to go below ReadOnly for locking and Sealing. + stackCtx.recSrc.HasMainLogSrc = true; + goto LockSourceRecord; } - - // All other regions: Create a record in the mutable region - #endregion Normal processing + else + { + // Either on-disk or no record exists - check for lock before creating new record. First ensure any record lock has transitioned to the LockTable. + if (stackCtx.recSrc.LogicalAddress >= hlog.BeginAddress) + SpinWaitUntilRecordIsClosed(ref key, stackCtx.hei.hash, stackCtx.recSrc.LogicalAddress, hlog); + Debug.Assert(!fasterSession.IsManualLocking || LockTable.IsLockedExclusive(ref key, stackCtx.hei.hash), "A Lockable-session Upsert() of an on-disk or non-existent key requires a LockTable lock"); + if (LockTable.IsActive && !fasterSession.DisableEphemeralLocking + && !LockTable.TryLockEphemeral(ref key, stackCtx.hei.hash, LockType.Exclusive, out stackCtx.recSrc.HasLockTableLock)) + { + status = OperationStatus.RETRY_LATER; + goto LatchRelease; + } + goto CreateNewRecord; + } + #endregion Address and source record checks #region Lock source record LockSourceRecord: