From 93912d6712e4e6413607ad578ae7af6aad7b5c55 Mon Sep 17 00:00:00 2001 From: Adam Hathcock Date: Fri, 24 Jan 2025 11:24:51 +0000 Subject: [PATCH] Fix cancellation in schedular. It's no longer optional (#213) * Fix cancellation in schedular. It's no longer optional * formatting * fix tests because of stacktrace --- .../Serialization/ChannelLoader.cs | 2 +- src/Speckle.Sdk/Api/Operations/Operations.Send.cs | 9 +++++---- .../Serialisation/V2/Send/ObjectSerializer.cs | 8 ++------ .../Serialisation/V2/Send/ObjectSerializerFactory.cs | 2 +- .../Serialisation/V2/Send/PriorityScheduler.cs | 12 +++++------- .../Serialisation/V2/Send/SerializeProcess.cs | 9 ++++++--- .../Serialisation/V2/SerializeProcessFactory.cs | 3 +++ tests/Speckle.Sdk.Serialization.Testing/Program.cs | 3 ++- .../Speckle.Sdk.Serialization.Tests/DetachedTests.cs | 12 ++++++++---- ...xceptionTests.Test_Exceptions_Cache.verified.json | 2 +- ...ceptionTests.Test_Exceptions_Upload.verified.json | 2 +- .../ExceptionTests.cs | 6 ++++-- .../ExplicitInterfaceTests.cs | 3 ++- .../SerializationTests.cs | 3 ++- .../Benchmarks/GeneralSendTest.cs | 2 +- 15 files changed, 44 insertions(+), 34 deletions(-) diff --git a/src/Speckle.Sdk.Dependencies/Serialization/ChannelLoader.cs b/src/Speckle.Sdk.Dependencies/Serialization/ChannelLoader.cs index ccecd183..c8dcd782 100644 --- a/src/Speckle.Sdk.Dependencies/Serialization/ChannelLoader.cs +++ b/src/Speckle.Sdk.Dependencies/Serialization/ChannelLoader.cs @@ -11,7 +11,7 @@ public abstract class ChannelLoader private const int MAX_SAVE_CACHE_BATCH = 500; private const int MAX_SAVE_CACHE_PARALLELISM = 4; - protected async Task GetAndCache(IEnumerable allChildrenIds, CancellationToken cancellationToken = default) => + protected async Task GetAndCache(IEnumerable allChildrenIds, CancellationToken cancellationToken) => await allChildrenIds .ToChannel(cancellationToken: cancellationToken) .Pipe(MAX_READ_CACHE_PARALLELISM, CheckCache, cancellationToken: cancellationToken) diff --git a/src/Speckle.Sdk/Api/Operations/Operations.Send.cs b/src/Speckle.Sdk/Api/Operations/Operations.Send.cs index 16636e70..68ee9438 100644 --- a/src/Speckle.Sdk/Api/Operations/Operations.Send.cs +++ b/src/Speckle.Sdk/Api/Operations/Operations.Send.cs @@ -16,8 +16,8 @@ public async Task Send2( string streamId, string? authorizationToken, Base value, - IProgress? onProgressAction = null, - CancellationToken cancellationToken = default + IProgress? onProgressAction, + CancellationToken cancellationToken ) { using var receiveActivity = activityFactory.Start("Operations.Send"); @@ -29,9 +29,10 @@ public async Task Send2( url, streamId, authorizationToken, - onProgressAction + onProgressAction, + cancellationToken ); - var results = await process.Serialize(value, cancellationToken).ConfigureAwait(false); + var results = await process.Serialize(value).ConfigureAwait(false); receiveActivity?.SetStatus(SdkActivityStatusCode.Ok); return results; diff --git a/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializer.cs b/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializer.cs index 1d72177a..f3e88d44 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializer.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializer.cs @@ -29,7 +29,6 @@ public sealed class ObjectSerializer : IObjectSerializer private readonly IReadOnlyDictionary _childCache; - private readonly bool _trackDetachedChildren; private readonly IBasePropertyGatherer _propertyGatherer; private readonly CancellationToken _cancellationToken; @@ -51,7 +50,6 @@ public sealed class ObjectSerializer : IObjectSerializer /// /// Creates a new Serializer instance. /// - /// Whether to store all detachable objects while serializing. They can be retrieved via post serialization. /// public ObjectSerializer( IBasePropertyGatherer propertyGatherer, @@ -59,8 +57,7 @@ public ObjectSerializer( Pool> chunksPool, Pool> chunks2Pool, Pool> chunks3Pool, - bool trackDetachedChildren = false, - CancellationToken cancellationToken = default + CancellationToken cancellationToken ) { _propertyGatherer = propertyGatherer; @@ -69,7 +66,6 @@ public ObjectSerializer( _chunks2Pool = chunks2Pool; _chunks3Pool = chunks3Pool; _cancellationToken = cancellationToken; - _trackDetachedChildren = trackDetachedChildren; _chunks = chunksPool.Get(); } @@ -311,7 +307,7 @@ private void SerializeProperty(object? obj, JsonWriter writer, PropertyAttribute var json2 = ReferenceGenerator.CreateReference(id); AddClosure(id); // add to obj refs to return - if (baseObj.applicationId != null && _trackDetachedChildren) // && baseObj is not DataChunk && baseObj is not Abstract) // not needed, as data chunks will never have application ids, and abstract objs are not really used. + if (baseObj.applicationId != null) // && baseObj is not DataChunk && baseObj is not Abstract) // not needed, as data chunks will never have application ids, and abstract objs are not really used. { ObjectReferences[new(baseObj.applicationId)] = new ObjectReference() { diff --git a/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializerFactory.cs b/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializerFactory.cs index 9c5bbc8a..e96b98bf 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializerFactory.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializerFactory.cs @@ -13,5 +13,5 @@ public class ObjectSerializerFactory(IBasePropertyGatherer propertyGatherer) : I private readonly Pool> _chunk3Pool = Pools.CreateListPool(); public IObjectSerializer Create(IReadOnlyDictionary baseCache, CancellationToken cancellationToken) => - new ObjectSerializer(propertyGatherer, baseCache, _chunkPool, _chunk2Pool, _chunk3Pool, true, cancellationToken); + new ObjectSerializer(propertyGatherer, baseCache, _chunkPool, _chunk2Pool, _chunk3Pool, cancellationToken); } diff --git a/src/Speckle.Sdk/Serialisation/V2/Send/PriorityScheduler.cs b/src/Speckle.Sdk/Serialisation/V2/Send/PriorityScheduler.cs index 11917284..828f6bc9 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Send/PriorityScheduler.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Send/PriorityScheduler.cs @@ -6,19 +6,17 @@ namespace Speckle.Sdk.Serialisation.V2.Send; public sealed class PriorityScheduler( ILogger logger, ThreadPriority priority, - int maximumConcurrencyLevel + int maximumConcurrencyLevel, + CancellationToken cancellationToken ) : TaskScheduler, IDisposable { - private readonly CancellationTokenSource _cancellationTokenSource = new(); private readonly BlockingCollection _tasks = new(); private Thread[]? _threads; public void Dispose() { _tasks.CompleteAdding(); - _cancellationTokenSource.Cancel(); _tasks.Dispose(); - _cancellationTokenSource.Dispose(); } public override int MaximumConcurrencyLevel => maximumConcurrencyLevel; @@ -38,14 +36,14 @@ protected override void QueueTask(Task task) { try { - foreach (Task t in _tasks.GetConsumingEnumerable(_cancellationTokenSource.Token)) + foreach (Task t in _tasks.GetConsumingEnumerable(cancellationToken)) { - if (_cancellationTokenSource.IsCancellationRequested) + if (cancellationToken.IsCancellationRequested) { break; } TryExecuteTask(t); - if (_cancellationTokenSource.IsCancellationRequested) + if (cancellationToken.IsCancellationRequested) { break; } diff --git a/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs b/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs index c6ee2823..90e1ecaf 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs @@ -33,18 +33,21 @@ public sealed class SerializeProcess( IBaseChildFinder baseChildFinder, IObjectSerializerFactory objectSerializerFactory, ILoggerFactory loggerFactory, + CancellationToken cancellationToken, SerializeProcessOptions? options = null ) : ChannelSaver, ISerializeProcess { private readonly PriorityScheduler _highest = new( loggerFactory.CreateLogger(), ThreadPriority.Highest, - 2 + 2, + cancellationToken ); private readonly PriorityScheduler _belowNormal = new( loggerFactory.CreateLogger(), ThreadPriority.BelowNormal, - Environment.ProcessorCount * 2 + Environment.ProcessorCount * 2, + cancellationToken ); private readonly SerializeProcessOptions _options = options ?? new(false, false, false, false); @@ -74,7 +77,7 @@ public void Dispose() sqLiteJsonCacheManager.Dispose(); } - public async Task Serialize(Base root, CancellationToken cancellationToken) + public async Task Serialize(Base root) { var channelTask = Start(cancellationToken); var findTotalObjectsTask = Task.CompletedTask; diff --git a/src/Speckle.Sdk/Serialisation/V2/SerializeProcessFactory.cs b/src/Speckle.Sdk/Serialisation/V2/SerializeProcessFactory.cs index bc7bfc92..d3004441 100644 --- a/src/Speckle.Sdk/Serialisation/V2/SerializeProcessFactory.cs +++ b/src/Speckle.Sdk/Serialisation/V2/SerializeProcessFactory.cs @@ -13,6 +13,7 @@ ISerializeProcess CreateSerializeProcess( string streamId, string? authorizationToken, IProgress? progress, + CancellationToken cancellationToken, SerializeProcessOptions? options = null ); IDeserializeProcess CreateDeserializeProcess( @@ -38,6 +39,7 @@ public ISerializeProcess CreateSerializeProcess( string streamId, string? authorizationToken, IProgress? progress, + CancellationToken cancellationToken, SerializeProcessOptions? options = null ) { @@ -50,6 +52,7 @@ public ISerializeProcess CreateSerializeProcess( baseChildFinder, objectSerializerFactory, loggerFactory, + cancellationToken, options ); } diff --git a/tests/Speckle.Sdk.Serialization.Testing/Program.cs b/tests/Speckle.Sdk.Serialization.Testing/Program.cs index 27ec68ca..486607eb 100644 --- a/tests/Speckle.Sdk.Serialization.Testing/Program.cs +++ b/tests/Speckle.Sdk.Serialization.Testing/Program.cs @@ -60,9 +60,10 @@ streamId, token, progress, + default, new SerializeProcessOptions(skipCacheSendCheck, skipCacheSendSave, true, true) ); -await process2.Serialize(@base, default).ConfigureAwait(false); +await process2.Serialize(@base).ConfigureAwait(false); Console.WriteLine("Detach"); Console.ReadLine(); #pragma warning restore CA1506 diff --git a/tests/Speckle.Sdk.Serialization.Tests/DetachedTests.cs b/tests/Speckle.Sdk.Serialization.Tests/DetachedTests.cs index a33ac4bd..c058b55c 100644 --- a/tests/Speckle.Sdk.Serialization.Tests/DetachedTests.cs +++ b/tests/Speckle.Sdk.Serialization.Tests/DetachedTests.cs @@ -39,9 +39,10 @@ public async Task CanSerialize_New_Detached() new BaseChildFinder(new BasePropertyGatherer()), new ObjectSerializerFactory(new BasePropertyGatherer()), new NullLoggerFactory(), + default, new SerializeProcessOptions(false, false, true, true) ); - await process2.Serialize(@base, default); + await process2.Serialize(@base); await VerifyJsonDictionary(objects); } @@ -123,9 +124,10 @@ public async Task CanSerialize_New_Detached2() new BaseChildFinder(new BasePropertyGatherer()), new ObjectSerializerFactory(new BasePropertyGatherer()), new NullLoggerFactory(), + default, new SerializeProcessOptions(false, false, true, true) ); - var results = await process2.Serialize(@base, default); + var results = await process2.Serialize(@base); await VerifyJsonDictionary(objects); } @@ -192,9 +194,10 @@ public async Task CanSerialize_New_Detached_With_DataChunks() new BaseChildFinder(new BasePropertyGatherer()), new ObjectSerializerFactory(new BasePropertyGatherer()), new NullLoggerFactory(), + default, new SerializeProcessOptions(false, false, true, true) ); - var results = await process2.Serialize(@base, default); + var results = await process2.Serialize(@base); objects.Count.Should().Be(3); var x = JObject.Parse(objects["efeadaca70a85ae6d3acfc93a8b380db"]); @@ -226,9 +229,10 @@ public async Task CanSerialize_New_Detached_With_DataChunks2() new BaseChildFinder(new BasePropertyGatherer()), new ObjectSerializerFactory(new BasePropertyGatherer()), new NullLoggerFactory(), + default, new SerializeProcessOptions(false, false, true, true) ); - var results = await process2.Serialize(@base, default); + var results = await process2.Serialize(@base); await VerifyJsonDictionary(objects); } } diff --git a/tests/Speckle.Sdk.Serialization.Tests/ExceptionTests.Test_Exceptions_Cache.verified.json b/tests/Speckle.Sdk.Serialization.Tests/ExceptionTests.Test_Exceptions_Cache.verified.json index 50026526..8f35a047 100644 --- a/tests/Speckle.Sdk.Serialization.Tests/ExceptionTests.Test_Exceptions_Cache.verified.json +++ b/tests/Speckle.Sdk.Serialization.Tests/ExceptionTests.Test_Exceptions_Cache.verified.json @@ -6,5 +6,5 @@ "StackTrace": "at Speckle.Sdk.Serialization.Tests.ExceptionSendCacheManager.SaveObjects(IEnumerable`1 items)\nat Speckle.Sdk.Serialisation.V2.Send.SerializeProcess.SaveToCache(List`1 batch)\nat Open.ChannelExtensions.<28d3e838-dde6-44a1-8f5e-d1c739a178d0>Extensions.<>c__DisplayClass98_0`1.b__0(T e)\nat Open.ChannelExtensions.<28d3e838-dde6-44a1-8f5e-d1c739a178d0>Extensions.<>c__DisplayClass92_0`1.b__2(T item, Int64 _)\nat Open.ChannelExtensions.<28d3e838-dde6-44a1-8f5e-d1c739a178d0>Extensions.ReadUntilCancelledAsync[T](<5e816acc-9cf8-4b52-a8da-ceb5bd7eecc7>ChannelReader`1 reader, CancellationToken cancellationToken, Func`3 receiver, Boolean deferredExecution)\n--- End of stack trace from previous location ---", "Type": "NotImplementedException" }, - "StackTrace": "at Speckle.Sdk.Dependencies.Serialization.ChannelSaver`1.DoneSaving()\nat Speckle.Sdk.Serialisation.V2.Send.SerializeProcess.Serialize(Base root, CancellationToken cancellationToken)\n--- End of stack trace from previous location ---\nat Xunit.Assert.RecordExceptionAsync(Func`1 testCode)" + "StackTrace": "at Speckle.Sdk.Dependencies.Serialization.ChannelSaver`1.DoneSaving()\nat Speckle.Sdk.Serialisation.V2.Send.SerializeProcess.Serialize(Base root)\n--- End of stack trace from previous location ---\nat Xunit.Assert.RecordExceptionAsync(Func`1 testCode)" } diff --git a/tests/Speckle.Sdk.Serialization.Tests/ExceptionTests.Test_Exceptions_Upload.verified.json b/tests/Speckle.Sdk.Serialization.Tests/ExceptionTests.Test_Exceptions_Upload.verified.json index cd87f84a..5a2a0d20 100644 --- a/tests/Speckle.Sdk.Serialization.Tests/ExceptionTests.Test_Exceptions_Upload.verified.json +++ b/tests/Speckle.Sdk.Serialization.Tests/ExceptionTests.Test_Exceptions_Upload.verified.json @@ -26,5 +26,5 @@ "Type": "NotImplementedException" } ], - "StackTrace": "at Speckle.Sdk.Dependencies.Serialization.ChannelSaver`1.DoneSaving()\nat Speckle.Sdk.Serialisation.V2.Send.SerializeProcess.Serialize(Base root, CancellationToken cancellationToken)\n--- End of stack trace from previous location ---\nat Xunit.Assert.RecordExceptionAsync(Func`1 testCode)" + "StackTrace": "at Speckle.Sdk.Dependencies.Serialization.ChannelSaver`1.DoneSaving()\nat Speckle.Sdk.Serialisation.V2.Send.SerializeProcess.Serialize(Base root)\n--- End of stack trace from previous location ---\nat Xunit.Assert.RecordExceptionAsync(Func`1 testCode)" } diff --git a/tests/Speckle.Sdk.Serialization.Tests/ExceptionTests.cs b/tests/Speckle.Sdk.Serialization.Tests/ExceptionTests.cs index 256030d1..d4003f6e 100644 --- a/tests/Speckle.Sdk.Serialization.Tests/ExceptionTests.cs +++ b/tests/Speckle.Sdk.Serialization.Tests/ExceptionTests.cs @@ -30,11 +30,12 @@ public async Task Test_Exceptions_Upload() new BaseChildFinder(new BasePropertyGatherer()), new ObjectSerializerFactory(new BasePropertyGatherer()), new NullLoggerFactory(), + default, new SerializeProcessOptions(false, false, false, true) ); //4 exceptions are fine because we use 4 threads for saving cache - var ex = await Assert.ThrowsAsync(async () => await process2.Serialize(testClass, default)); + var ex = await Assert.ThrowsAsync(async () => await process2.Serialize(testClass)); await Verify(ex); } @@ -50,10 +51,11 @@ public async Task Test_Exceptions_Cache() new BaseChildFinder(new BasePropertyGatherer()), new ObjectSerializerFactory(new BasePropertyGatherer()), new NullLoggerFactory(), + default, new SerializeProcessOptions(false, false, false, true) ); - var ex = await Assert.ThrowsAsync(async () => await process2.Serialize(testClass, default)); + var ex = await Assert.ThrowsAsync(async () => await process2.Serialize(testClass)); await Verify(ex); } } diff --git a/tests/Speckle.Sdk.Serialization.Tests/ExplicitInterfaceTests.cs b/tests/Speckle.Sdk.Serialization.Tests/ExplicitInterfaceTests.cs index 446e9388..a5ec2d0c 100644 --- a/tests/Speckle.Sdk.Serialization.Tests/ExplicitInterfaceTests.cs +++ b/tests/Speckle.Sdk.Serialization.Tests/ExplicitInterfaceTests.cs @@ -26,10 +26,11 @@ public async Task Test_Json() new BaseChildFinder(new BasePropertyGatherer()), new ObjectSerializerFactory(new BasePropertyGatherer()), new NullLoggerFactory(), + default, new SerializeProcessOptions(false, false, true, true) ); - await process2.Serialize(testClass, default); + await process2.Serialize(testClass); await VerifyJsonDictionary(objects); } diff --git a/tests/Speckle.Sdk.Serialization.Tests/SerializationTests.cs b/tests/Speckle.Sdk.Serialization.Tests/SerializationTests.cs index d342d8cb..e2061419 100644 --- a/tests/Speckle.Sdk.Serialization.Tests/SerializationTests.cs +++ b/tests/Speckle.Sdk.Serialization.Tests/SerializationTests.cs @@ -267,9 +267,10 @@ public async Task Roundtrip_Test_New(string fileName, string rootId, int oldCoun new BaseChildFinder(new BasePropertyGatherer()), new ObjectSerializerFactory(new BasePropertyGatherer()), new NullLoggerFactory(), + default, new SerializeProcessOptions(true, true, false, true) ); - var (rootId2, _) = await serializeProcess.Serialize(root, default); + var (rootId2, _) = await serializeProcess.Serialize(root); rootId2.Should().Be(root.id); newIdToJson.Count.Should().Be(newCount); diff --git a/tests/Speckle.Sdk.Tests.Performance/Benchmarks/GeneralSendTest.cs b/tests/Speckle.Sdk.Tests.Performance/Benchmarks/GeneralSendTest.cs index 8f3d252c..46747dca 100644 --- a/tests/Speckle.Sdk.Tests.Performance/Benchmarks/GeneralSendTest.cs +++ b/tests/Speckle.Sdk.Tests.Performance/Benchmarks/GeneralSendTest.cs @@ -72,7 +72,7 @@ public async Task Send_old() [Benchmark] public async Task Send_new() { - var res = await _operations.Send2(new(acc.serverInfo.url), _project.id, acc.token, _testData); + var res = await _operations.Send2(new(acc.serverInfo.url), _project.id, acc.token, _testData, null, default); return await TagVersion($"Send_new {Guid.NewGuid()}", res.RootId); }