From 7b537974c920bcfd5c8d3a42585c39f477c4926f Mon Sep 17 00:00:00 2001 From: Tim Date: Sun, 5 Nov 2023 16:55:49 -0500 Subject: [PATCH 1/3] Fix `AsyncLocal` in `Promise.Parallel*` body. --- .../Promises/Internal/ParallelInternal.cs | 38 +++++++++++++-- .../Tests/CoreTests/APIs/ParallelForTests.cs | 48 +++++++++++++++++++ 2 files changed, 82 insertions(+), 4 deletions(-) diff --git a/Package/Core/Promises/Internal/ParallelInternal.cs b/Package/Core/Promises/Internal/ParallelInternal.cs index ca4715b8..fdba85ed 100644 --- a/Package/Core/Promises/Internal/ParallelInternal.cs +++ b/Package/Core/Promises/Internal/ParallelInternal.cs @@ -230,6 +230,7 @@ internal sealed partial class PromiseParallelForEach _exceptions; @@ -257,6 +258,10 @@ internal static PromiseParallelForEach GetO promise._completionState = Promise.State.Resolved; promise._cancelationRef = CancelationRef.GetOrCreate(); cancelationToken.TryRegister(promise, out promise._externalCancelationRegistration); + if (Promise.Config.AsyncFlowExecutionContextEnabled) + { + promise._executionContext = ExecutionContext.Capture(); + } return promise; } @@ -265,6 +270,7 @@ internal override void MaybeDispose() Dispose(); _body = default(TParallelBody); _synchronizationContext = null; + _executionContext = null; ObjectPool.MaybeRepool(this); } @@ -283,12 +289,36 @@ internal void MaybeLaunchWorker(bool launchWorker) InterlockedAddWithUnsignedOverflowCheck(ref _waitCounter, 1); ScheduleContextCallback(_synchronizationContext, this, - obj => obj.UnsafeAs>().ExecuteWorker(true), - obj => obj.UnsafeAs>().ExecuteWorker(true) + obj => obj.UnsafeAs>().ExecuteWorkerAndLaunchNext(), + obj => obj.UnsafeAs>().ExecuteWorkerAndLaunchNext() ); } } + private void ExecuteWorkerAndLaunchNext() + { + if (_executionContext == null) + { + ExecuteWorker(true); + } + else + { + ExecutionContext.Run(_executionContext, obj => obj.UnsafeAs>().ExecuteWorker(true), this); + } + } + + private void ExecuteWorkerWithoutLaunchNext() + { + if (_executionContext == null) + { + ExecuteWorker(false); + } + else + { + ExecutionContext.Run(_executionContext, obj => obj.UnsafeAs>().ExecuteWorker(false), this); + } + } + private void ExecuteWorker(bool launchNext) { var currentContext = ts_currentContext; @@ -362,8 +392,8 @@ internal override void Handle(PromiseRefBase handler, object rejectContainer, Pr { // Schedule the worker body to run again on the context, but without launching another worker. ScheduleContextCallback(_synchronizationContext, this, - obj => obj.UnsafeAs>().ExecuteWorker(false), - obj => obj.UnsafeAs>().ExecuteWorker(false) + obj => obj.UnsafeAs>().ExecuteWorkerWithoutLaunchNext(), + obj => obj.UnsafeAs>().ExecuteWorkerWithoutLaunchNext() ); } else if (state == Promise.State.Canceled) diff --git a/Package/Tests/CoreTests/APIs/ParallelForTests.cs b/Package/Tests/CoreTests/APIs/ParallelForTests.cs index 4932a22e..27c933e1 100644 --- a/Package/Tests/CoreTests/APIs/ParallelForTests.cs +++ b/Package/Tests/CoreTests/APIs/ParallelForTests.cs @@ -529,6 +529,54 @@ public void ParallelFor_AllIndicesEnumeratedOnce_WithCaptureValue_Sync( Assert.True(set.Contains(i)); } } + +#if CSHARP_7_3_OR_NEWER + [Test] + public void ParallelFor_ExecutionContextFlowsToWorkerBodies( + [Values] bool foregroundContext) + { + Promise.Config.AsyncFlowExecutionContextEnabled = true; + var context = foregroundContext + ? (SynchronizationContext) TestHelper._foregroundContext + : TestHelper._backgroundContext; + + var al = new AsyncLocal(); + al.Value = 42; + Promise.ParallelFor(0, 100, async (item, cancellationToken) => + { + await Promise.SwitchToForegroundAwait(forceAsync: true); + Assert.AreEqual(42, al.Value); + }) + .WaitWithTimeoutWhileExecutingForegroundContext(TimeSpan.FromSeconds(Environment.ProcessorCount)); + } + + private static IEnumerable Iterate100() + { + for (int i = 0; i < 100; i++) + { + yield return i; + } + } + + [Test] + public void ParallelForEach_ExecutionContextFlowsToWorkerBodies( + [Values] bool foregroundContext) + { + Promise.Config.AsyncFlowExecutionContextEnabled = true; + var context = foregroundContext + ? (SynchronizationContext) TestHelper._foregroundContext + : TestHelper._backgroundContext; + + var al = new AsyncLocal(); + al.Value = 42; + Promise.ParallelForEach(Iterate100(), async (item, cancellationToken) => + { + await Promise.SwitchToForegroundAwait(forceAsync: true); + Assert.AreEqual(42, al.Value); + }) + .WaitWithTimeoutWhileExecutingForegroundContext(TimeSpan.FromSeconds(Environment.ProcessorCount)); + } +#endif // CSHARP_7_3_OR_NEWER } #endif // !UNITY_WEBGL } \ No newline at end of file From 966b309f4939ec537c533327f823f93391aa619f Mon Sep 17 00:00:00 2001 From: Tim Date: Sun, 5 Nov 2023 19:41:32 -0500 Subject: [PATCH 2/3] Fix AsyncLocal in Framework. --- Package/Core/Promises/Internal/ParallelInternal.cs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Package/Core/Promises/Internal/ParallelInternal.cs b/Package/Core/Promises/Internal/ParallelInternal.cs index fdba85ed..eacff30d 100644 --- a/Package/Core/Promises/Internal/ParallelInternal.cs +++ b/Package/Core/Promises/Internal/ParallelInternal.cs @@ -303,7 +303,9 @@ private void ExecuteWorkerAndLaunchNext() } else { - ExecutionContext.Run(_executionContext, obj => obj.UnsafeAs>().ExecuteWorker(true), this); + // .Net Framework doesn't allow us to re-use a captured context, so we have to copy it for each invocation. + // .Net Core's implementation of CreateCopy returns itself, so this is always as efficient as it can be. + ExecutionContext.Run(_executionContext.CreateCopy(), obj => obj.UnsafeAs>().ExecuteWorker(true), this); } } @@ -315,7 +317,9 @@ private void ExecuteWorkerWithoutLaunchNext() } else { - ExecutionContext.Run(_executionContext, obj => obj.UnsafeAs>().ExecuteWorker(false), this); + // .Net Framework doesn't allow us to re-use a captured context, so we have to copy it for each invocation. + // .Net Core's implementation of CreateCopy returns itself, so this is always as efficient as it can be. + ExecutionContext.Run(_executionContext.CreateCopy(), obj => obj.UnsafeAs>().ExecuteWorker(false), this); } } From e923270607331da3481cab0c30bd6cef8d0ae9ec Mon Sep 17 00:00:00 2001 From: Tim Date: Sun, 5 Nov 2023 21:15:41 -0500 Subject: [PATCH 3/3] Fixed tests. --- .../Tests/CoreTests/APIs/ParallelForTests.cs | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/Package/Tests/CoreTests/APIs/ParallelForTests.cs b/Package/Tests/CoreTests/APIs/ParallelForTests.cs index 27c933e1..7b95ffab 100644 --- a/Package/Tests/CoreTests/APIs/ParallelForTests.cs +++ b/Package/Tests/CoreTests/APIs/ParallelForTests.cs @@ -211,32 +211,35 @@ public void AllItemsEnumeratedOnce_WithCaptureValue_Sync( } } - private IEnumerable IterateAndAssertContext(SynchronizationContext context) + private IEnumerable IterateAndAssertContext(SynchronizationType expectedContext, Thread mainThread) { - Assert.AreEqual(context, SynchronizationContext.Current); + TestHelper.AssertCallbackContext(expectedContext, expectedContext, mainThread); for (int i = 1; i <= 100; i++) { yield return i; - Assert.AreEqual(context, SynchronizationContext.Current); + TestHelper.AssertCallbackContext(expectedContext, expectedContext, mainThread); } } [Test] public void SynchronizationContext_AllCodeExecutedOnCorrectContext_Sync( - [Values] bool foregroundContext) + [Values(SynchronizationType.Foreground, SynchronizationType.Background)] SynchronizationType syncContext) { - SynchronizationContext context = foregroundContext ? - TestHelper._foregroundContext : - (SynchronizationContext) TestHelper._backgroundContext; + var mainThread = Thread.CurrentThread; + SynchronizationContext context = syncContext == SynchronizationType.Foreground + ? TestHelper._foregroundContext + : (SynchronizationContext) TestHelper._backgroundContext; - var otherContext = new PromiseSynchronizationContext(); + var otherContext = syncContext == SynchronizationType.Foreground + ? (SynchronizationContext) TestHelper._backgroundContext + : TestHelper._foregroundContext; var cq = new Queue(); bool isComplete = false; - Promise.ParallelForEach(IterateAndAssertContext(context), (item, cancelationToken) => + Promise.ParallelForEach(IterateAndAssertContext(syncContext, mainThread), (item, cancelationToken) => { - Assert.AreEqual(context, SynchronizationContext.Current); + TestHelper.AssertCallbackContext(syncContext, syncContext, mainThread); return Promise.SwitchToContext(context) .Then(() => { @@ -257,7 +260,6 @@ public void SynchronizationContext_AllCodeExecutedOnCorrectContext_Sync( if (!SpinWait.SpinUntil(() => { TestHelper.ExecuteForegroundCallbacks(); - otherContext.Execute(); return isComplete; }, TimeSpan.FromSeconds(1))) {