Skip to content

Commit

Permalink
Fix cancellation in schedular. It's no longer optional (#213)
Browse files Browse the repository at this point in the history
* Fix cancellation in schedular.  It's no longer optional

* formatting

* fix tests because of stacktrace
  • Loading branch information
adamhathcock authored Jan 24, 2025
1 parent f81fc97 commit 93912d6
Show file tree
Hide file tree
Showing 15 changed files with 44 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public abstract class ChannelLoader<T>
private const int MAX_SAVE_CACHE_BATCH = 500;
private const int MAX_SAVE_CACHE_PARALLELISM = 4;

protected async Task GetAndCache(IEnumerable<string> allChildrenIds, CancellationToken cancellationToken = default) =>
protected async Task GetAndCache(IEnumerable<string> allChildrenIds, CancellationToken cancellationToken) =>
await allChildrenIds
.ToChannel(cancellationToken: cancellationToken)
.Pipe(MAX_READ_CACHE_PARALLELISM, CheckCache, cancellationToken: cancellationToken)
Expand Down
9 changes: 5 additions & 4 deletions src/Speckle.Sdk/Api/Operations/Operations.Send.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ public async Task<SerializeProcessResults> Send2(
string streamId,
string? authorizationToken,
Base value,
IProgress<ProgressArgs>? onProgressAction = null,
CancellationToken cancellationToken = default
IProgress<ProgressArgs>? onProgressAction,
CancellationToken cancellationToken
)
{
using var receiveActivity = activityFactory.Start("Operations.Send");
Expand All @@ -29,9 +29,10 @@ public async Task<SerializeProcessResults> Send2(
url,
streamId,
authorizationToken,
onProgressAction
onProgressAction,
cancellationToken
);
var results = await process.Serialize(value, cancellationToken).ConfigureAwait(false);
var results = await process.Serialize(value).ConfigureAwait(false);

receiveActivity?.SetStatus(SdkActivityStatusCode.Ok);
return results;
Expand Down
8 changes: 2 additions & 6 deletions src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public sealed class ObjectSerializer : IObjectSerializer

private readonly IReadOnlyDictionary<Id, NodeInfo> _childCache;

private readonly bool _trackDetachedChildren;
private readonly IBasePropertyGatherer _propertyGatherer;
private readonly CancellationToken _cancellationToken;

Expand All @@ -51,16 +50,14 @@ public sealed class ObjectSerializer : IObjectSerializer
/// <summary>
/// Creates a new Serializer instance.
/// </summary>
/// <param name="trackDetachedChildren">Whether to store all detachable objects while serializing. They can be retrieved via <see cref="ObjectReferences"/> post serialization.</param>
/// <param name="cancellationToken"></param>
public ObjectSerializer(
IBasePropertyGatherer propertyGatherer,
IReadOnlyDictionary<Id, NodeInfo> childCache,
Pool<List<(Id, Json, Closures)>> chunksPool,
Pool<List<DataChunk>> chunks2Pool,
Pool<List<object?>> chunks3Pool,
bool trackDetachedChildren = false,
CancellationToken cancellationToken = default
CancellationToken cancellationToken
)
{
_propertyGatherer = propertyGatherer;
Expand All @@ -69,7 +66,6 @@ public ObjectSerializer(
_chunks2Pool = chunks2Pool;
_chunks3Pool = chunks3Pool;
_cancellationToken = cancellationToken;
_trackDetachedChildren = trackDetachedChildren;
_chunks = chunksPool.Get();
}

Expand Down Expand Up @@ -311,7 +307,7 @@ private void SerializeProperty(object? obj, JsonWriter writer, PropertyAttribute
var json2 = ReferenceGenerator.CreateReference(id);
AddClosure(id);
// add to obj refs to return
if (baseObj.applicationId != null && _trackDetachedChildren) // && baseObj is not DataChunk && baseObj is not Abstract) // not needed, as data chunks will never have application ids, and abstract objs are not really used.
if (baseObj.applicationId != null) // && baseObj is not DataChunk && baseObj is not Abstract) // not needed, as data chunks will never have application ids, and abstract objs are not really used.
{
ObjectReferences[new(baseObj.applicationId)] = new ObjectReference()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ public class ObjectSerializerFactory(IBasePropertyGatherer propertyGatherer) : I
private readonly Pool<List<object?>> _chunk3Pool = Pools.CreateListPool<object?>();

public IObjectSerializer Create(IReadOnlyDictionary<Id, NodeInfo> baseCache, CancellationToken cancellationToken) =>
new ObjectSerializer(propertyGatherer, baseCache, _chunkPool, _chunk2Pool, _chunk3Pool, true, cancellationToken);
new ObjectSerializer(propertyGatherer, baseCache, _chunkPool, _chunk2Pool, _chunk3Pool, cancellationToken);
}
12 changes: 5 additions & 7 deletions src/Speckle.Sdk/Serialisation/V2/Send/PriorityScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,17 @@ namespace Speckle.Sdk.Serialisation.V2.Send;
public sealed class PriorityScheduler(
ILogger<PriorityScheduler> logger,
ThreadPriority priority,
int maximumConcurrencyLevel
int maximumConcurrencyLevel,
CancellationToken cancellationToken
) : TaskScheduler, IDisposable
{
private readonly CancellationTokenSource _cancellationTokenSource = new();
private readonly BlockingCollection<Task> _tasks = new();
private Thread[]? _threads;

public void Dispose()
{
_tasks.CompleteAdding();
_cancellationTokenSource.Cancel();
_tasks.Dispose();
_cancellationTokenSource.Dispose();
}

public override int MaximumConcurrencyLevel => maximumConcurrencyLevel;
Expand All @@ -38,14 +36,14 @@ protected override void QueueTask(Task task)
{
try
{
foreach (Task t in _tasks.GetConsumingEnumerable(_cancellationTokenSource.Token))
foreach (Task t in _tasks.GetConsumingEnumerable(cancellationToken))
{
if (_cancellationTokenSource.IsCancellationRequested)
if (cancellationToken.IsCancellationRequested)
{
break;
}
TryExecuteTask(t);
if (_cancellationTokenSource.IsCancellationRequested)
if (cancellationToken.IsCancellationRequested)
{
break;
}
Expand Down
9 changes: 6 additions & 3 deletions src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,21 @@ public sealed class SerializeProcess(
IBaseChildFinder baseChildFinder,
IObjectSerializerFactory objectSerializerFactory,
ILoggerFactory loggerFactory,
CancellationToken cancellationToken,
SerializeProcessOptions? options = null
) : ChannelSaver<BaseItem>, ISerializeProcess
{
private readonly PriorityScheduler _highest = new(
loggerFactory.CreateLogger<PriorityScheduler>(),
ThreadPriority.Highest,
2
2,
cancellationToken
);
private readonly PriorityScheduler _belowNormal = new(
loggerFactory.CreateLogger<PriorityScheduler>(),
ThreadPriority.BelowNormal,
Environment.ProcessorCount * 2
Environment.ProcessorCount * 2,
cancellationToken
);

private readonly SerializeProcessOptions _options = options ?? new(false, false, false, false);
Expand Down Expand Up @@ -74,7 +77,7 @@ public void Dispose()
sqLiteJsonCacheManager.Dispose();
}

public async Task<SerializeProcessResults> Serialize(Base root, CancellationToken cancellationToken)
public async Task<SerializeProcessResults> Serialize(Base root)
{
var channelTask = Start(cancellationToken);
var findTotalObjectsTask = Task.CompletedTask;
Expand Down
3 changes: 3 additions & 0 deletions src/Speckle.Sdk/Serialisation/V2/SerializeProcessFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ ISerializeProcess CreateSerializeProcess(
string streamId,
string? authorizationToken,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken,
SerializeProcessOptions? options = null
);
IDeserializeProcess CreateDeserializeProcess(
Expand All @@ -38,6 +39,7 @@ public ISerializeProcess CreateSerializeProcess(
string streamId,
string? authorizationToken,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken,
SerializeProcessOptions? options = null
)
{
Expand All @@ -50,6 +52,7 @@ public ISerializeProcess CreateSerializeProcess(
baseChildFinder,
objectSerializerFactory,
loggerFactory,
cancellationToken,
options
);
}
Expand Down
3 changes: 2 additions & 1 deletion tests/Speckle.Sdk.Serialization.Testing/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@
streamId,
token,
progress,
default,
new SerializeProcessOptions(skipCacheSendCheck, skipCacheSendSave, true, true)
);
await process2.Serialize(@base, default).ConfigureAwait(false);
await process2.Serialize(@base).ConfigureAwait(false);
Console.WriteLine("Detach");
Console.ReadLine();
#pragma warning restore CA1506
12 changes: 8 additions & 4 deletions tests/Speckle.Sdk.Serialization.Tests/DetachedTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ public async Task CanSerialize_New_Detached()
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new NullLoggerFactory(),
default,
new SerializeProcessOptions(false, false, true, true)
);
await process2.Serialize(@base, default);
await process2.Serialize(@base);

await VerifyJsonDictionary(objects);
}
Expand Down Expand Up @@ -123,9 +124,10 @@ public async Task CanSerialize_New_Detached2()
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new NullLoggerFactory(),
default,
new SerializeProcessOptions(false, false, true, true)
);
var results = await process2.Serialize(@base, default);
var results = await process2.Serialize(@base);

await VerifyJsonDictionary(objects);
}
Expand Down Expand Up @@ -192,9 +194,10 @@ public async Task CanSerialize_New_Detached_With_DataChunks()
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new NullLoggerFactory(),
default,
new SerializeProcessOptions(false, false, true, true)
);
var results = await process2.Serialize(@base, default);
var results = await process2.Serialize(@base);

objects.Count.Should().Be(3);
var x = JObject.Parse(objects["efeadaca70a85ae6d3acfc93a8b380db"]);
Expand Down Expand Up @@ -226,9 +229,10 @@ public async Task CanSerialize_New_Detached_With_DataChunks2()
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new NullLoggerFactory(),
default,
new SerializeProcessOptions(false, false, true, true)
);
var results = await process2.Serialize(@base, default);
var results = await process2.Serialize(@base);
await VerifyJsonDictionary(objects);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
"StackTrace": "at Speckle.Sdk.Serialization.Tests.ExceptionSendCacheManager.SaveObjects(IEnumerable`1 items)\nat Speckle.Sdk.Serialisation.V2.Send.SerializeProcess.SaveToCache(List`1 batch)\nat Open.ChannelExtensions.<28d3e838-dde6-44a1-8f5e-d1c739a178d0>Extensions.<>c__DisplayClass98_0`1.<ReadAllConcurrently>b__0(T e)\nat Open.ChannelExtensions.<28d3e838-dde6-44a1-8f5e-d1c739a178d0>Extensions.<>c__DisplayClass92_0`1.<ReadAllConcurrentlyAsync>b__2(T item, Int64 _)\nat Open.ChannelExtensions.<28d3e838-dde6-44a1-8f5e-d1c739a178d0>Extensions.ReadUntilCancelledAsync[T](<5e816acc-9cf8-4b52-a8da-ceb5bd7eecc7>ChannelReader`1 reader, CancellationToken cancellationToken, Func`3 receiver, Boolean deferredExecution)\n--- End of stack trace from previous location ---",
"Type": "NotImplementedException"
},
"StackTrace": "at Speckle.Sdk.Dependencies.Serialization.ChannelSaver`1.DoneSaving()\nat Speckle.Sdk.Serialisation.V2.Send.SerializeProcess.Serialize(Base root, CancellationToken cancellationToken)\n--- End of stack trace from previous location ---\nat Xunit.Assert.RecordExceptionAsync(Func`1 testCode)"
"StackTrace": "at Speckle.Sdk.Dependencies.Serialization.ChannelSaver`1.DoneSaving()\nat Speckle.Sdk.Serialisation.V2.Send.SerializeProcess.Serialize(Base root)\n--- End of stack trace from previous location ---\nat Xunit.Assert.RecordExceptionAsync(Func`1 testCode)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@
"Type": "NotImplementedException"
}
],
"StackTrace": "at Speckle.Sdk.Dependencies.Serialization.ChannelSaver`1.DoneSaving()\nat Speckle.Sdk.Serialisation.V2.Send.SerializeProcess.Serialize(Base root, CancellationToken cancellationToken)\n--- End of stack trace from previous location ---\nat Xunit.Assert.RecordExceptionAsync(Func`1 testCode)"
"StackTrace": "at Speckle.Sdk.Dependencies.Serialization.ChannelSaver`1.DoneSaving()\nat Speckle.Sdk.Serialisation.V2.Send.SerializeProcess.Serialize(Base root)\n--- End of stack trace from previous location ---\nat Xunit.Assert.RecordExceptionAsync(Func`1 testCode)"
}
6 changes: 4 additions & 2 deletions tests/Speckle.Sdk.Serialization.Tests/ExceptionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ public async Task Test_Exceptions_Upload()
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new NullLoggerFactory(),
default,
new SerializeProcessOptions(false, false, false, true)
);

//4 exceptions are fine because we use 4 threads for saving cache
var ex = await Assert.ThrowsAsync<AggregateException>(async () => await process2.Serialize(testClass, default));
var ex = await Assert.ThrowsAsync<AggregateException>(async () => await process2.Serialize(testClass));
await Verify(ex);
}

Expand All @@ -50,10 +51,11 @@ public async Task Test_Exceptions_Cache()
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new NullLoggerFactory(),
default,
new SerializeProcessOptions(false, false, false, true)
);

var ex = await Assert.ThrowsAsync<AggregateException>(async () => await process2.Serialize(testClass, default));
var ex = await Assert.ThrowsAsync<AggregateException>(async () => await process2.Serialize(testClass));
await Verify(ex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ public async Task Test_Json()
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new NullLoggerFactory(),
default,
new SerializeProcessOptions(false, false, true, true)
);

await process2.Serialize(testClass, default);
await process2.Serialize(testClass);

await VerifyJsonDictionary(objects);
}
Expand Down
3 changes: 2 additions & 1 deletion tests/Speckle.Sdk.Serialization.Tests/SerializationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,10 @@ public async Task Roundtrip_Test_New(string fileName, string rootId, int oldCoun
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new NullLoggerFactory(),
default,
new SerializeProcessOptions(true, true, false, true)
);
var (rootId2, _) = await serializeProcess.Serialize(root, default);
var (rootId2, _) = await serializeProcess.Serialize(root);

rootId2.Should().Be(root.id);
newIdToJson.Count.Should().Be(newCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public async Task<string> Send_old()
[Benchmark]
public async Task<string> Send_new()
{
var res = await _operations.Send2(new(acc.serverInfo.url), _project.id, acc.token, _testData);
var res = await _operations.Send2(new(acc.serverInfo.url), _project.id, acc.token, _testData, null, default);
return await TagVersion($"Send_new {Guid.NewGuid()}", res.RootId);
}

Expand Down

0 comments on commit 93912d6

Please sign in to comment.