Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WaitAsync overloads with timeouts #515

Merged
merged 2 commits into from
Jan 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package/Core/InternalShared/PoolInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ internal static void AssertAllObjectsReleased()
{
// Some objects could be used on a ThreadPool thread that we can't control, like Promise.Delay using system timer.
// Wait a bit of time for it to settle before asserting.
System.Threading.SpinWait.SpinUntil(() => s_inUseObjects.Count == 0, TimeSpan.FromSeconds(Environment.ProcessorCount));
System.Threading.SpinWait.SpinUntil(() => s_inUseObjects.Count == 0, TimeSpan.FromSeconds(1));
lock (s_pooledObjects)
{
if (s_inUseObjects.Count > 0)
Expand Down
120 changes: 99 additions & 21 deletions Package/Core/Promises/Internal/CallbackHelperInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

#pragma warning disable IDE0074 // Use compound assignment

using Proto.Timers;
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;

namespace Proto.Promises
{
Expand Down Expand Up @@ -272,23 +274,61 @@ internal static Promise<TResult> Duplicate(Promise<TResult> _this)

internal static Promise<TResult> WaitAsync(Promise<TResult> _this, CancelationToken cancelationToken)
{
if (_this._ref == null)
if (cancelationToken.IsCancelationRequested)
{
return cancelationToken.IsCancelationRequested
? Promise<TResult>.Canceled()
: _this;
return Canceled(_this._ref, _this._id);
}
PromiseRef<TResult> promise;
if (cancelationToken.CanBeCanceled)
if (_this._ref?.State != Promise.State.Pending || !cancelationToken.CanBeCanceled)
{
var p = PromiseDuplicateCancel<TResult>.GetOrCreate();
promise = _this._ref.HookupCancelablePromise(p, _this._id, cancelationToken, ref p._cancelationHelper);
return Duplicate(_this);
}
else
var promise = WaitAsyncWithCancelationPromise<TResult>.GetOrCreate();
_this._ref.HookupCancelablePromise(promise, _this._id, cancelationToken, ref promise._cancelationHelper);
return new Promise<TResult>(promise, promise.Id, _this._result);
}

internal static Promise<TResult> WaitAsync(Promise<TResult> _this, TimeSpan timeout, TimerFactory timerFactory)
{
if (_this._ref?.State != Promise.State.Pending || timeout == Timeout.InfiniteTimeSpan)
{
promise = _this._ref.GetDuplicateT(_this._id);
return Duplicate(_this);
}
return new Promise<TResult>(promise, promise.Id, _this._result);
if (timeout == TimeSpan.Zero)
{
_this._ref?.MaybeMarkAwaitedAndDispose(_this._id);
return Promise<TResult>.Rejected(new TimeoutException());
}
var promise = WaitAsyncWithTimeoutPromise<TResult>.GetOrCreate(timeout, timerFactory);
_this._ref.HookupNewPromise(_this._id, promise);
return new Promise<TResult>(promise, promise.Id);
}

internal static Promise<TResult> WaitAsync(Promise<TResult> _this, TimeSpan timeout, TimerFactory timerFactory, CancelationToken cancelationToken)
{
if (!cancelationToken.CanBeCanceled)
{
return WaitAsync(_this, timeout, timerFactory);
}
if (timeout == Timeout.InfiniteTimeSpan)
{
return WaitAsync(_this, cancelationToken);
}

if (cancelationToken.IsCancelationRequested)
{
return Canceled(_this._ref, _this._id);
}
if (_this._ref?.State != Promise.State.Pending)
{
return Duplicate(_this);
}
if (timeout == TimeSpan.Zero)
{
_this._ref?.MaybeMarkAwaitedAndDispose(_this._id);
return Promise<TResult>.Rejected(new TimeoutException());
}
var promise = WaitAsyncWithTimeoutAndCancelationPromise<TResult>.GetOrCreateAndHookup(_this._ref, _this._id, timeout, timerFactory, cancelationToken);
return new Promise<TResult>(promise, promise.Id);
}

internal static Promise<TResult> ConfigureContinuation(Promise<TResult> _this, ContinuationOptions continuationOptions)
Expand Down Expand Up @@ -778,22 +818,60 @@ internal static Promise Duplicate(Promise _this)

internal static Promise WaitAsync(Promise _this, CancelationToken cancelationToken)
{
if (_this._ref == null)
if (cancelationToken.IsCancelationRequested)
{
return cancelationToken.IsCancelationRequested
? Promise.Canceled()
: _this;
return Canceled(_this._ref, _this._id);
}
PromiseRefBase promise;
if (cancelationToken.CanBeCanceled)
if (_this._ref?.State != Promise.State.Pending || !cancelationToken.CanBeCanceled)
{
var p = PromiseDuplicateCancel<VoidResult>.GetOrCreate();
promise = _this._ref.HookupCancelablePromise(p, _this._id, cancelationToken, ref p._cancelationHelper);
return Duplicate(_this);
}
else
var promise = WaitAsyncWithCancelationPromise<VoidResult>.GetOrCreate();
_this._ref.HookupCancelablePromise(promise, _this._id, cancelationToken, ref promise._cancelationHelper);
return new Promise(promise, promise.Id);
}

internal static Promise WaitAsync(Promise _this, TimeSpan timeout, TimerFactory timerFactory)
{
if (_this._ref?.State != Promise.State.Pending || timeout == Timeout.InfiniteTimeSpan)
{
return Duplicate(_this);
}
if (timeout == TimeSpan.Zero)
{
_this._ref?.MaybeMarkAwaitedAndDispose(_this._id);
return Promise.Rejected(new TimeoutException());
}
var promise = WaitAsyncWithTimeoutPromise<VoidResult>.GetOrCreate(timeout, timerFactory);
_this._ref.HookupNewPromise(_this._id, promise);
return new Promise(promise, promise.Id);
}

internal static Promise WaitAsync(Promise _this, TimeSpan timeout, TimerFactory timerFactory, CancelationToken cancelationToken)
{
if (!cancelationToken.CanBeCanceled)
{
return WaitAsync(_this, timeout, timerFactory);
}
if (timeout == Timeout.InfiniteTimeSpan)
{
return WaitAsync(_this, cancelationToken);
}

if (cancelationToken.IsCancelationRequested)
{
return Canceled(_this._ref, _this._id);
}
if (_this._ref?.State != Promise.State.Pending)
{
return Duplicate(_this);
}
if (timeout == TimeSpan.Zero)
{
promise = _this._ref.GetDuplicate(_this._id);
_this._ref?.MaybeMarkAwaitedAndDispose(_this._id);
return Promise.Rejected(new TimeoutException());
}
var promise = WaitAsyncWithTimeoutAndCancelationPromise<VoidResult>.GetOrCreateAndHookup(_this._ref, _this._id, timeout, timerFactory, cancelationToken);
return new Promise(promise, promise.Id);
}

Expand Down
62 changes: 0 additions & 62 deletions Package/Core/Promises/Internal/CancelInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,68 +93,6 @@ internal void ReleaseOne()
=> _retainCounter = 1;
}

#if !PROTO_PROMISE_DEVELOPER_MODE
[DebuggerNonUserCode, StackTraceHidden]
#endif
internal sealed partial class PromiseDuplicateCancel<TResult> : PromiseSingleAwait<TResult>, ICancelable
{
private PromiseDuplicateCancel() { }

internal override void MaybeDispose()
{
if (_cancelationHelper.TryRelease())
{
Dispose();
_cancelationHelper = default;
ObjectPool.MaybeRepool(this);
}
}

[MethodImpl(InlineOption)]
private static PromiseDuplicateCancel<TResult> GetOrCreateInstance()
{
var obj = ObjectPool.TryTakeOrInvalid<PromiseDuplicateCancel<TResult>>();
return obj == InvalidAwaitSentinel.s_instance
? new PromiseDuplicateCancel<TResult>()
: obj.UnsafeAs<PromiseDuplicateCancel<TResult>>();
}

[MethodImpl(InlineOption)]
internal static PromiseDuplicateCancel<TResult> GetOrCreate()
{
var promise = GetOrCreateInstance();
promise.Reset();
promise._cancelationHelper.Reset();
return promise;
}

internal override void Handle(PromiseRefBase handler, Promise.State state)
{
ThrowIfInPool(this);
handler.SetCompletionState(state);
if (_cancelationHelper.TrySetCompleted())
{
_cancelationHelper.UnregisterAndWait();
_cancelationHelper.ReleaseOne();
HandleSelf(handler, state);
}
else
{
MaybeDispose();
handler.MaybeReportUnhandledAndDispose(state);
}
}

void ICancelable.Cancel()
{
ThrowIfInPool(this);
if (_cancelationHelper.TrySetCompleted())
{
HandleNextInternal(Promise.State.Canceled);
}
}
}

#if !PROTO_PROMISE_DEVELOPER_MODE
[DebuggerNonUserCode, StackTraceHidden]
#endif
Expand Down
82 changes: 34 additions & 48 deletions Package/Core/Promises/Internal/DelayInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,6 @@ partial class PromiseRefBase
#endif
internal sealed partial class DelayPromise : PromiseSingleAwait<VoidResult>
{
// Use ITimerSource and int directly instead of the Timer struct
// so that the fields can be packed efficiently without extra padding.
private ITimerSource _timerSource;
private int _timerToken;
// The timer callback can be invoked before the fields are actually assigned,
// so we use an Interlocked counter to ensure it is disposed properly.
private int _timerUseCounter;

private void MaybeDisposeTimer()
{
ThrowIfInPool(this);
if (InterlockedAddWithUnsignedOverflowCheck(ref _timerUseCounter, -1) == 0)
{
_timerSource.DisposeAsync(_timerToken).Forget();
}
}

internal override void MaybeDispose()
{
Dispose();
_timerSource = null;
ObjectPool.MaybeRepool(this);
}

[MethodImpl(InlineOption)]
private static DelayPromise GetFromPoolOrCreate()
{
Expand Down Expand Up @@ -73,42 +49,34 @@ internal static DelayPromise GetOrCreate(TimeSpan delay, TimerFactory timerFacto
return promise;
}

private void OnTimerCallback()
{
MaybeDisposeTimer();
HandleNextInternal(Promise.State.Resolved);
}
}

#if !PROTO_PROMISE_DEVELOPER_MODE
[DebuggerNonUserCode, StackTraceHidden]
#endif
internal sealed partial class DelayWithCancelationPromise : PromiseSingleAwait<VoidResult>, ICancelable
{
private Timers.Timer _timer;
// The timer and cancelation callbacks can race on different threads,
// and can be invoked before the fields are actually assigned;
// we use CancelationHelper to make sure they are used and disposed properly.
private CancelationHelper _cancelationHelper;

private void MaybeDisposeFields()
private void MaybeDisposeTimer()
{
ThrowIfInPool(this);
if (_cancelationHelper.TryRelease())
if (InterlockedAddWithUnsignedOverflowCheck(ref _timerUseCounter, -1) == 0)
{
_cancelationHelper.UnregisterAndWait();
_timer.DisposeAsync().Forget();
_timerSource.DisposeAsync(_timerToken).Forget();
}
}

internal override void MaybeDispose()
{
Dispose();
_cancelationHelper = default;
_timer = default;
_timerSource = null;
ObjectPool.MaybeRepool(this);
}

private void OnTimerCallback()
{
MaybeDisposeTimer();
HandleNextInternal(Promise.State.Resolved);
}
}

#if !PROTO_PROMISE_DEVELOPER_MODE
[DebuggerNonUserCode, StackTraceHidden]
#endif
internal sealed partial class DelayWithCancelationPromise : PromiseSingleAwait<VoidResult>, ICancelable
{
[MethodImpl(InlineOption)]
private static DelayWithCancelationPromise GetFromPoolOrCreate()
{
Expand Down Expand Up @@ -138,6 +106,24 @@ internal static DelayWithCancelationPromise GetOrCreate(TimeSpan delay, TimerFac
return promise;
}

private void MaybeDisposeFields()
{
ThrowIfInPool(this);
if (_cancelationHelper.TryRelease())
{
_cancelationHelper.UnregisterAndWait();
_timer.DisposeAsync().Forget();
}
}

internal override void MaybeDispose()
{
Dispose();
_cancelationHelper = default;
_timer = default;
ObjectPool.MaybeRepool(this);
}

private void OnTimerCallback()
{
ThrowIfInPool(this);
Expand Down
Loading
Loading