Skip to content

Commit

Permalink
Change Parallel.For to ZeroAllocJobScheduler
Browse files Browse the repository at this point in the history
This is more a proof of concept. When box2d v3 comes out having an actual job system will be useful. Also potentially as a replacement for Task.Run?
  • Loading branch information
metalgearsloth committed Nov 20, 2023
1 parent 202182e commit 330518e
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Robust.Client/GameController/GameController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ internal bool StartupContinue(DisplayMode displayMode)
Options.DefaultWindowTitle ?? _resourceManifest!.DefaultWindowTitle ?? "RobustToolbox");

_taskManager.Initialize();
_parallelMgr.Initialize();
_fontManager.SetFontDpi((uint)_configurationManager.GetCVar(CVars.DisplayFontDpi));

// Load optional Robust modules.
Expand Down Expand Up @@ -357,7 +358,6 @@ internal bool StartupSystemSplash(

ProfileOptSetup.Setup(_configurationManager);

_parallelMgr.Initialize();
_prof.Initialize();

_resourceCache.Initialize(Options.LoadConfigAndUserData ? userDataDir : null);
Expand Down
3 changes: 1 addition & 2 deletions Robust.Server/BaseServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,6 @@ public bool Start(ServerOptions options, Func<ILogHandler>? logHandlerFactory =

ProfileOptSetup.Setup(_config);

_parallelMgr.Initialize();

//Sets up Logging
_logHandlerFactory = logHandlerFactory;

Expand Down Expand Up @@ -268,6 +266,7 @@ public bool Start(ServerOptions options, Func<ILogHandler>? logHandlerFactory =

// Has to be done early because this guy's in charge of the main thread Synchronization Context.
_taskManager.Initialize();
_parallelMgr.Initialize();

LoadSettings();

Expand Down
49 changes: 34 additions & 15 deletions Robust.Shared/Physics/Systems/SharedPhysicsSystem.Contacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
using Robust.Shared.Physics.Dynamics.Contacts;
using Robust.Shared.Physics.Events;
using Robust.Shared.Utility;
using Schedulers;

namespace Robust.Shared.Physics.Systems;

Expand Down Expand Up @@ -582,24 +583,19 @@ internal void CollideContacts()

private void BuildManifolds(Contact[] contacts, int count, ContactStatus[] status, Vector2[] worldPoints)
{
var wake = ArrayPool<bool>.Shared.Rent(count);

if (count > ContactsPerThread * 2)
{
var batches = (int) Math.Ceiling((float) count / ContactsPerThread);
if (count == 0)
return;

Parallel.For(0, batches, i =>
{
var start = i * ContactsPerThread;
var end = Math.Min(start + ContactsPerThread, count);
UpdateContacts(contacts, start, end, status, wake, worldPoints);
});
var wake = ArrayPool<bool>.Shared.Rent(count);

}
else
_parallel.ProcessNow(new ManifoldsJob()
{
UpdateContacts(contacts, 0, count, status, wake, worldPoints);
}
Physics = this,
Status = status,
WorldPoints = worldPoints,
Contacts = contacts,
Wake = wake,
}, count);

// Can't do this during UpdateContacts due to IoC threading issues.
for (var i = 0; i < count; i++)
Expand All @@ -620,6 +616,29 @@ private void BuildManifolds(Contact[] contacts, int count, ContactStatus[] statu
ArrayPool<bool>.Shared.Return(wake);
}

private record struct ManifoldsJob : IJobParallelFor
{
public int ThreadCount => 0;
public int BatchSize => ContactsPerThread;

public SharedPhysicsSystem Physics;

public Contact[] Contacts;
public ContactStatus[] Status;
public Vector2[] WorldPoints;
public bool[] Wake;

public void Execute(int index)
{
var end = index + 1;
Physics.UpdateContacts(Contacts, index, end, Status, Wake, WorldPoints);
}

public void Finish()
{
}
}

private void UpdateContacts(Contact[] contacts, int start, int end, ContactStatus[] status, bool[] wake, Vector2[] worldPoints)
{
for (var i = start; i < end; i++)
Expand Down
1 change: 1 addition & 0 deletions Robust.Shared/Robust.Shared.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
<ProjectReference Include="..\NetSerializer\NetSerializer\NetSerializer.csproj" />
<ProjectReference Include="..\Robust.Physics\Robust.Physics.csproj" />
<ProjectReference Include="..\Robust.Shared.Maths\Robust.Shared.Maths.csproj" />
<ProjectReference Include="..\ZeroAllocJobScheduler\JobScheduler\JobScheduler.csproj" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Include="Utility\TypeAbbreviations.yaml">
Expand Down
59 changes: 55 additions & 4 deletions Robust.Shared/Threading/ParallelManager.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Robust.Shared.Configuration;
using Robust.Shared.IoC;
using Robust.Shared.Log;
using Robust.Shared.Utility;
using Schedulers;

namespace Robust.Shared.Threading;

Expand All @@ -18,6 +15,24 @@ public interface IParallelManager
/// Add the delegate to <see cref="ParallelCountChanged"/> and immediately invoke it.
/// </summary>
void AddAndInvokeParallelCountChanged(Action changed);

/// <summary>
/// Takes in a job that gets flushed.
/// </summary>
/// <param name="job"></param>
JobHandle Process(IJob job);

/// <summary>
/// Takes in a parallel job and runs it the specified amount.
/// </summary>
void ProcessNow(IJobParallelFor jobs, int amount);

/// <summary>
/// Takes in a parallel job and runs it without blocking.
/// </summary>
JobHandle Process(IJobParallelFor jobs, int amount);

void Wait(JobHandle handle);
}

internal interface IParallelManagerInternal : IParallelManager
Expand All @@ -29,12 +44,22 @@ internal sealed class ParallelManager : IParallelManagerInternal
{
[Dependency] private readonly IConfigurationManager _cfg = default!;

private JobScheduler _scheduler = default!;

public event Action? ParallelCountChanged;
public int ParallelProcessCount { get; private set; }

public void Initialize()
{
_cfg.OnValueChanged(CVars.ThreadParallelCount, UpdateCVar, true);

_scheduler = new JobScheduler(new JobScheduler.Config()
{
ThreadCount = ParallelProcessCount,
// Keep in mind parallel jobs count as 1.
MaxExpectedConcurrentJobs = Math.Max(32, ParallelProcessCount),
StrictAllocationMode = false,
});
}

public void AddAndInvokeParallelCountChanged(Action changed)
Expand All @@ -43,6 +68,13 @@ public void AddAndInvokeParallelCountChanged(Action changed)
changed();
}

public JobHandle Process(IJob job)
{
var handle = _scheduler.Schedule(job);
_scheduler.Flush();
return handle;
}

private void UpdateCVar(int value)
{
var oldCount = ParallelProcessCount;
Expand All @@ -51,5 +83,24 @@ private void UpdateCVar(int value)
if (oldCount != ParallelProcessCount)
ParallelCountChanged?.Invoke();
}

public void ProcessNow(IJobParallelFor job, int amount)
{
var handle = _scheduler.Schedule(job, amount);
_scheduler.Flush();
handle.Complete();
}

public JobHandle Process(IJobParallelFor job, int amount)
{
var handle = _scheduler.Schedule(job, amount);
_scheduler.Flush();
return handle;
}

public void Wait(JobHandle handle)
{
handle.Complete();
}
}

24 changes: 23 additions & 1 deletion Robust.UnitTesting/TestingParallelManager.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
using System;
using Robust.Shared.Threading;
using Schedulers;

namespace Robust.UnitTesting;

/// <summary>
/// Only allows 1 parallel process for testing purposes.
/// </summary>
/// </summary>j
public sealed class TestingParallelManager : IParallelManager
{
public event Action? ParallelCountChanged;
Expand All @@ -15,4 +16,25 @@ public void AddAndInvokeParallelCountChanged(Action changed)
// Gottem
return;
}

public void ProcessNow(IJobParallelFor job, int amount)
{
for (var i = 0; i < amount; i++)
{
job.Execute(i);
}

job.Finish();
}

public JobHandle Process(IJobParallelFor job, int amount)
{
for (var i = 0; i < amount; i++)
{
job.Execute(i);
}

job.Finish();
return new JobHandle();
}
}

0 comments on commit 330518e

Please sign in to comment.