Skip to content

Commit

Permalink
Minor issues in session mgmt, truncate, etc (#785)
Browse files Browse the repository at this point in the history
* Ensure active sessions count is never stuck at non-zero.

* Fix callback invocation

* Fix logic

* fix
  • Loading branch information
badrishc authored Jan 19, 2023
1 parent 4ab5597 commit 8155ba4
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 140 deletions.
38 changes: 30 additions & 8 deletions cs/remote/src/FASTER.server/Servers/FasterServerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,23 @@ public void Unregister(WireFormat wireFormat, out ISessionProvider provider)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool AddSession(WireFormat protocol, ref ISessionProvider provider, INetworkSender networkSender, out IMessageConsumer session)
{
session = null;

if (Interlocked.Increment(ref activeSessionCount) <= 0)
{
session = null;
return false;

bool retVal = false;
try
{
session = provider.GetSession(protocol, networkSender);
retVal = activeSessions.TryAdd(session, default);
if (!retVal) Interlocked.Decrement(ref activeSessionCount);
}
catch
{
Interlocked.Decrement(ref activeSessionCount);
}
session = provider.GetSession(protocol, networkSender);
return activeSessions.TryAdd(session, default);
return retVal;
}

/// <inheritdoc />
Expand All @@ -116,8 +126,14 @@ internal void DisposeActiveSessions()
{
if (activeSessions.TryRemove(_session, out _))
{
_session.Dispose();
Interlocked.Decrement(ref activeSessionCount);
try
{
_session.Dispose();
}
finally
{
Interlocked.Decrement(ref activeSessionCount);
}
}
}
}
Expand All @@ -138,8 +154,14 @@ public void DisposeSession(IMessageConsumer _session)
{
if (activeSessions.TryRemove(_session, out _))
{
_session.Dispose();
Interlocked.Decrement(ref activeSessionCount);
try
{
_session.Dispose();
}
finally
{
Interlocked.Decrement(ref activeSessionCount);
}
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion cs/src/core/Device/StorageDeviceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,10 @@ public void TruncateUntilSegment(int toSegment)
public virtual void TruncateUntilAddressAsync(long toAddress, AsyncCallback callback, IAsyncResult result)
{
if ((int)(toAddress >> segmentSizeBits) <= startSegment)
{
callback(result);
return;

}
// Truncate only up to segment boundary if address is not aligned
TruncateUntilSegmentAsync((int)(toAddress >> segmentSizeBits), callback, result);
}
Expand Down
135 changes: 65 additions & 70 deletions cs/src/core/Index/FASTER/Implementation/InternalRMW.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,96 +154,91 @@ internal OperationStatus InternalRMW<Input, Output, Context, FasterSession>(
if (sessionCtx.phase != Phase.REST)
{
latchDestination = AcquireLatchRMW(pendingContext, sessionCtx, ref stackCtx.hei, ref status, ref latchOperation, stackCtx.recSrc.LogicalAddress);
if (latchDestination == LatchDestination.Retry)
goto LatchRelease;
}
#endregion Entry latch operation if necessary

// We must use try/finally to ensure unlocking even in the presence of exceptions.
try
{
#region Normal processing
#region Address and source record checks

if (latchDestination == LatchDestination.NormalProcessing)
if (stackCtx.recSrc.HasReadCacheSrc)
{
if (stackCtx.recSrc.HasReadCacheSrc)
{
// Use the readcache record as the CopyUpdater source.
goto LockSourceRecord;
}
else if (stackCtx.recSrc.LogicalAddress >= hlog.ReadOnlyAddress)
{
// Mutable Region: Update the record in-place
srcRecordInfo = ref hlog.GetInfo(stackCtx.recSrc.PhysicalAddress);
if (!TryEphemeralXLock<Input, Output, Context, FasterSession>(fasterSession, ref stackCtx.recSrc, ref srcRecordInfo, out status))
goto LatchRelease;

if (srcRecordInfo.Tombstone)
goto CreateNewRecord;

if (!srcRecordInfo.IsValidUpdateOrLockSource)
{
EphemeralXUnlockAndAbandonUpdate<Input, Output, Context, FasterSession>(fasterSession, ref stackCtx.recSrc, ref srcRecordInfo);
srcRecordInfo = ref dummyRecordInfo;
goto CreateNewRecord;
}
// Use the readcache record as the CopyUpdater source.
goto LockSourceRecord;
}
else if (stackCtx.recSrc.LogicalAddress >= hlog.ReadOnlyAddress && latchDestination == LatchDestination.NormalProcessing)
{
// Mutable Region: Update the record in-place
srcRecordInfo = ref hlog.GetInfo(stackCtx.recSrc.PhysicalAddress);
if (!TryEphemeralXLock<Input, Output, Context, FasterSession>(fasterSession, ref stackCtx.recSrc, ref srcRecordInfo, out status))
goto LatchRelease;

rmwInfo.RecordInfo = srcRecordInfo;
if (fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(stackCtx.recSrc.PhysicalAddress), ref output, ref srcRecordInfo, ref rmwInfo, out status)
|| (rmwInfo.Action == RMWAction.ExpireAndStop))
{
this.MarkPage(stackCtx.recSrc.LogicalAddress, sessionCtx);

// ExpireAndStop means to override default Delete handling (which is to go to InitialUpdater) by leaving the tombstoned record as current.
// Our IFasterSession.InPlaceUpdater implementation has already reinitialized-in-place or set Tombstone as appropriate (inside the ephemeral lock)
// and marked the record.
pendingContext.recordInfo = srcRecordInfo;
pendingContext.logicalAddress = stackCtx.recSrc.LogicalAddress;
goto LatchRelease;
}
if (OperationStatusUtils.BasicOpCode(status) != OperationStatus.SUCCESS)
goto LatchRelease;
if (srcRecordInfo.Tombstone)
goto CreateNewRecord;

// InPlaceUpdater failed (e.g. insufficient space, another thread set Tombstone, etc). Use this record as the CopyUpdater source.
stackCtx.recSrc.HasMainLogSrc = true;
if (!srcRecordInfo.IsValidUpdateOrLockSource)
{
EphemeralXUnlockAndAbandonUpdate<Input, Output, Context, FasterSession>(fasterSession, ref stackCtx.recSrc, ref srcRecordInfo);
srcRecordInfo = ref dummyRecordInfo;
goto CreateNewRecord;
}
else if (stackCtx.recSrc.LogicalAddress >= hlog.SafeReadOnlyAddress && !hlog.GetInfo(stackCtx.recSrc.PhysicalAddress).Tombstone)

rmwInfo.RecordInfo = srcRecordInfo;
if (fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(stackCtx.recSrc.PhysicalAddress), ref output, ref srcRecordInfo, ref rmwInfo, out status)
|| (rmwInfo.Action == RMWAction.ExpireAndStop))
{
// Fuzzy Region: Must retry after epoch refresh, due to lost-update anomaly
status = OperationStatus.RETRY_LATER;
this.MarkPage(stackCtx.recSrc.LogicalAddress, sessionCtx);

// ExpireAndStop means to override default Delete handling (which is to go to InitialUpdater) by leaving the tombstoned record as current.
// Our IFasterSession.InPlaceUpdater implementation has already reinitialized-in-place or set Tombstone as appropriate (inside the ephemeral lock)
// and marked the record.
pendingContext.recordInfo = srcRecordInfo;
pendingContext.logicalAddress = stackCtx.recSrc.LogicalAddress;
goto LatchRelease;
}
else if (stackCtx.recSrc.LogicalAddress >= hlog.HeadAddress)
{
// Safe Read-Only Region: CopyUpdate to create a record in the mutable region
stackCtx.recSrc.HasMainLogSrc = true;
goto LockSourceRecord;
}
else if (stackCtx.recSrc.LogicalAddress >= hlog.BeginAddress)
{
// Disk Region: Need to issue async io requests. Locking will be check on pending completion.
status = OperationStatus.RECORD_ON_DISK;
latchDestination = LatchDestination.CreatePendingContext;
goto CreatePendingContext;
}
else
{
// No record exists - check for lock before creating new record.
Debug.Assert(!fasterSession.IsManualLocking || LockTable.IsLockedExclusive(ref key, stackCtx.hei.hash), "A Lockable-session RMW() of an on-disk or non-existent key requires a LockTable lock");
if (LockTable.IsActive && !fasterSession.DisableEphemeralLocking
&& !LockTable.TryLockEphemeral(ref key, stackCtx.hei.hash, LockType.Exclusive, out stackCtx.recSrc.HasLockTableLock))
{
status = OperationStatus.RETRY_LATER;
goto LatchRelease;
}
goto CreateNewRecord;
}
if (OperationStatusUtils.BasicOpCode(status) != OperationStatus.SUCCESS)
goto LatchRelease;

// InPlaceUpdater failed (e.g. insufficient space, another thread set Tombstone, etc). Use this record as the CopyUpdater source.
stackCtx.recSrc.HasMainLogSrc = true;
goto CreateNewRecord;
}
else if (latchDestination == LatchDestination.Retry)
else if (stackCtx.recSrc.LogicalAddress >= hlog.SafeReadOnlyAddress && !hlog.GetInfo(stackCtx.recSrc.PhysicalAddress).Tombstone && latchDestination == LatchDestination.NormalProcessing)
{
// Fuzzy Region: Must retry after epoch refresh, due to lost-update anomaly
status = OperationStatus.RETRY_LATER;
goto LatchRelease;
}
else if (stackCtx.recSrc.LogicalAddress >= hlog.HeadAddress)
{
// Safe Read-Only Region: CopyUpdate to create a record in the mutable region
stackCtx.recSrc.HasMainLogSrc = true;
goto LockSourceRecord;
}
else if (stackCtx.recSrc.LogicalAddress >= hlog.BeginAddress)
{
// Disk Region: Need to issue async io requests. Locking will be check on pending completion.
status = OperationStatus.RECORD_ON_DISK;
latchDestination = LatchDestination.CreatePendingContext;
goto CreatePendingContext;
}
else
{
// No record exists - check for lock before creating new record.
Debug.Assert(!fasterSession.IsManualLocking || LockTable.IsLockedExclusive(ref key, stackCtx.hei.hash), "A Lockable-session RMW() of an on-disk or non-existent key requires a LockTable lock");
if (LockTable.IsActive && !fasterSession.DisableEphemeralLocking
&& !LockTable.TryLockEphemeral(ref key, stackCtx.hei.hash, LockType.Exclusive, out stackCtx.recSrc.HasLockTableLock))
{
status = OperationStatus.RETRY_LATER;
goto LatchRelease;
}
goto CreateNewRecord;
}

#endregion Normal processing
#endregion Address and source record checks

#region Lock source record
LockSourceRecord:
Expand Down
Loading

0 comments on commit 8155ba4

Please sign in to comment.