Skip to content

Commit

Permalink
Refactor Common.Concurrency.*
Browse files Browse the repository at this point in the history
    - Update SingleThreadEventExecutor
    - Fixes SingleThreadEventExecutor::ShutdownGracefullyAsync
    - Add AbstractScheduledEventExecutor::ScheduleAtFixedRate
    - Add AbstractScheduledEventExecutor::ScheduleWithFixedDelay
    - Add DefaultEventExecutorChooserFactory
    - Rename PriorityQueue to DefaultPriorityQueue

* Refactor `Deque`
  • Loading branch information
cuteant committed Jul 26, 2020
1 parent e91da9b commit cdef32c
Show file tree
Hide file tree
Showing 146 changed files with 7,096 additions and 2,398 deletions.
19 changes: 19 additions & 0 deletions DotNetty.Netstandard.sln
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetty.Codecs.Protobuf.Ne
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetty.Codecs.Protobuf.Tests", "test\DotNetty.Codecs.Protobuf.Tests.Netstandard\DotNetty.Codecs.Protobuf.Tests.csproj", "{EE14EB67-04A4-45AE-91F0-0A0DB36D7C0B}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetty.TestSuite", "test\DotNetty.TestSuite.Netstandard\DotNetty.TestSuite.csproj", "{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -457,6 +459,22 @@ Global
{EE14EB67-04A4-45AE-91F0-0A0DB36D7C0B}.Release|x64.Build.0 = Release|Any CPU
{EE14EB67-04A4-45AE-91F0-0A0DB36D7C0B}.Release|x86.ActiveCfg = Release|Any CPU
{EE14EB67-04A4-45AE-91F0-0A0DB36D7C0B}.Release|x86.Build.0 = Release|Any CPU
{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F}.Debug|ARM.ActiveCfg = Debug|Any CPU
{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F}.Debug|ARM.Build.0 = Debug|Any CPU
{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F}.Debug|x64.ActiveCfg = Debug|Any CPU
{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F}.Debug|x64.Build.0 = Debug|Any CPU
{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F}.Debug|x86.ActiveCfg = Debug|Any CPU
{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F}.Debug|x86.Build.0 = Debug|Any CPU
{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F}.Release|Any CPU.Build.0 = Release|Any CPU
{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F}.Release|ARM.ActiveCfg = Release|Any CPU
{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F}.Release|ARM.Build.0 = Release|Any CPU
{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F}.Release|x64.ActiveCfg = Release|Any CPU
{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F}.Release|x64.Build.0 = Release|Any CPU
{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F}.Release|x86.ActiveCfg = Release|Any CPU
{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -486,6 +504,7 @@ Global
{DB184943-0CE5-4D89-B808-9B6DFFAFE720} = {2CCCD679-102A-4422-97D8-DA1A55DAFCA5}
{F1355115-E7A6-439B-AFD8-521D543F458B} = {3D04C4DC-6F8E-4326-9569-92F3E26C6EEB}
{EE14EB67-04A4-45AE-91F0-0A0DB36D7C0B} = {2CCCD679-102A-4422-97D8-DA1A55DAFCA5}
{C79616CE-4FDF-4128-ABF9-D1DEFCD5DE1F} = {2CCCD679-102A-4422-97D8-DA1A55DAFCA5}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A659CEFB-DDB3-49BE-AEDD-FF2F1B3297DB}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace DotNetty.Microbench.Concurrency
using BenchmarkDotNet.Jobs;
using DotNetty.Common.Concurrency;
using DotNetty.Common.Internal;
using DotNetty.Transport.Channels;

[SimpleJob(RuntimeMoniker.Net48)]
[SimpleJob(RuntimeMoniker.NetCoreApp31)]
Expand All @@ -17,30 +18,36 @@ namespace DotNetty.Microbench.Concurrency
public class SingleThreadEventExecutorBenchmark
{
const int Iterations = 10 * 1000 * 1000;
TestExecutor concurrentQueueExecutor;
TestExecutor fixedMpscQueueExecutor;
ITestExecutor _singleThreadEventLoop;
ITestExecutor _concurrentQueueExecutor;
ITestExecutor _fixedMpscQueueExecutor;

[GlobalSetup]
public void GlobalSetup()
{
this.concurrentQueueExecutor = new TestExecutor("CompatibleConcurrentQueue", TimeSpan.FromSeconds(1), new CompatibleConcurrentQueue<IRunnable>());
this.fixedMpscQueueExecutor = new TestExecutor("FixedMpscQueue", TimeSpan.FromSeconds(1), PlatformDependent.NewFixedMpscQueue<IRunnable>(1 * 1000 * 1000));
_singleThreadEventLoop = new NewTestExecutor("SingleThreadEventLoop", TimeSpan.FromSeconds(1));
_concurrentQueueExecutor = new TestExecutor("CompatibleConcurrentQueue", TimeSpan.FromSeconds(1), new CompatibleConcurrentQueue<IRunnable>());
_fixedMpscQueueExecutor = new TestExecutor("FixedMpscQueue", TimeSpan.FromSeconds(1), PlatformDependent.NewFixedMpscQueue<IRunnable>(1 * 1000 * 1000));
}

[GlobalCleanup]
public void GlobalCleanup()
{
this.concurrentQueueExecutor?.ShutdownGracefullyAsync();
this.fixedMpscQueueExecutor?.ShutdownGracefullyAsync();
_singleThreadEventLoop?.ShutdownGracefullyAsync();
_concurrentQueueExecutor?.ShutdownGracefullyAsync();
_fixedMpscQueueExecutor?.ShutdownGracefullyAsync();
}

[Benchmark(Baseline = true)]
public void LoopConcurrentQueue() => Run(_singleThreadEventLoop);

[Benchmark]
public void ConcurrentQueue() => Run(this.concurrentQueueExecutor);
public void ConcurrentQueue() => Run(_concurrentQueueExecutor);

[Benchmark]
public void FixedMpscQueue() => Run(this.fixedMpscQueueExecutor);
public void FixedMpscQueue() => Run(_fixedMpscQueueExecutor);

static void Run(TestExecutor executor)
static void Run(ITestExecutor executor)
{
var mre = new ManualResetEvent(false);
var actionIn = new BenchActionIn(executor, mre);
Expand All @@ -67,55 +74,71 @@ static void Run(TestExecutor executor)
sealed class BenchActionIn : IRunnable
{
int value;
readonly IEventExecutor executor;
readonly ManualResetEvent evt;
readonly IEventExecutor _executor;
readonly ManualResetEvent _evt;

public BenchActionIn(IEventExecutor executor, ManualResetEvent evt)
{
this.executor = executor;
this.evt = evt;
_executor = executor;
_evt = evt;
}

public void Run()
{
if (++this.value < Iterations)
if (++value < Iterations)
{
this.executor.Execute(this);
_executor.Execute(this);
}
else
{
this.evt.Set();
_evt.Set();
}
}
}

sealed class BenchActionOut : IRunnable
{
int value;
readonly ManualResetEvent evt;
int _value;
readonly ManualResetEvent _evt;

public BenchActionOut(ManualResetEvent evt)
{
this.evt = evt;
_evt = evt;
}

public void Run()
{
if (++this.value >= Iterations)
if (++_value >= Iterations)
{
this.evt.Set();
_evt.Set();
}
}
}

sealed class TestExecutor : SingleThreadEventExecutor
interface ITestExecutor : IEventExecutor
{
string Name { get; }
}

sealed class NewTestExecutor : SingleThreadEventLoop, ITestExecutor
{
public NewTestExecutor(string threadName, TimeSpan breakoutInterval)
: base(null, breakoutInterval)
{
Name = threadName;
}

public string Name { get; }
}

sealed class TestExecutor : SingleThreadEventExecutorOld, ITestExecutor
{
public string Name { get; }

public TestExecutor(string threadName, TimeSpan breakoutInterval, IQueue<IRunnable> queue)
: base(threadName, breakoutInterval, queue)
{
this.Name = threadName;
Name = threadName;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/DotNetty.Buffers/PoolArena.cs
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ public unsafe OwnedPinnedBlock(byte[] array, void* origin, int offset, int lengt
protected override bool TryGetArray(out ArraySegment<byte> segment)
{
segment = new ArraySegment<byte>(_array, _offset, _length);
return true; ;
return true;
}

public unsafe override Span<byte> GetSpan()
Expand Down
4 changes: 2 additions & 2 deletions src/DotNetty.Codecs.Http/HttpClientCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ protected override void Encode(IChannelHandlerContext context, object message, L

if (message is IHttpRequest request)
{
_clientCodec._queue.AddToBack(request.Method);
_clientCodec._queue.AddLast(request.Method);
}

base.Encode(context, message, output);
Expand Down Expand Up @@ -186,7 +186,7 @@ protected override bool IsContentAlwaysEmpty(IHttpMessage msg)
//
// Even if we do not use the method to compare we still need to poll it to ensure we keep
// request / response pairs in sync.
_ = _clientCodec._queue.TryRemoveFromFront(out HttpMethod method);
_ = _clientCodec._queue.TryRemoveFirst(out HttpMethod method);

int statusCode = ((IHttpResponse)msg).Status.Code;
if (statusCode >= 100 && statusCode < 200)
Expand Down
4 changes: 2 additions & 2 deletions src/DotNetty.Codecs.Http/HttpContentEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected override void Decode(IChannelHandlerContext ctx, IHttpRequest msg, Lis
acceptEncoding = ZeroLengthConnect;
}

_acceptEncodingQueue.AddToBack(acceptEncoding);
_acceptEncodingQueue.AddLast(acceptEncoding);
output.Add(ReferenceCountUtil.Retain(msg));
}

Expand Down Expand Up @@ -92,7 +92,7 @@ protected override void Encode(IChannelHandlerContext ctx, IHttpObject msg, List
else
{
// Get the list of encodings accepted by the peer.
if (!_acceptEncodingQueue.TryRemoveFromFront(out acceptEncoding))
if (!_acceptEncodingQueue.TryRemoveFirst(out acceptEncoding))
{
ThrowHelper.ThrowInvalidOperationException_CannotSendMore();
}
Expand Down
4 changes: 2 additions & 2 deletions src/DotNetty.Codecs.Http/HttpServerCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ protected override void Decode(IChannelHandlerContext context, IByteBuffer buffe
{
if (output[i] is IHttpRequest request)
{
this.serverCodec.queue.AddToBack(request.Method);
this.serverCodec.queue.AddLast(request.Method);
}
}
}
Expand Down Expand Up @@ -135,7 +135,7 @@ protected override void SanitizeHeadersBeforeEncode(IHttpResponse msg, bool isAl

protected override bool IsContentAlwaysEmpty(IHttpResponse msg)
{
_ = this.serverCodec.queue.TryRemoveFromFront(out this.method);
_ = this.serverCodec.queue.TryRemoveFirst(out this.method);
return HttpMethod.Head.Equals(this.method) || base.IsContentAlwaysEmpty(msg);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void Close(IPromise promise)
var inboundBuffer = ch._inboundBuffer;
if (inboundBuffer is object)
{
while (inboundBuffer.TryRemoveFromFront(out var msg))
while (inboundBuffer.TryRemoveFirst(out var msg))
{
_ = ReferenceCountUtil.Release(msg);
}
Expand Down Expand Up @@ -239,7 +239,7 @@ internal void DoBeginRead()
while (ch._readStatus != ReadStatus.Idle)
{
var inboundBuffer = ch._inboundBuffer;
if (inboundBuffer is null || (!inboundBuffer.TryRemoveFromFront(out var message)))
if (inboundBuffer is null || (!inboundBuffer.TryRemoveFirst(out var message)))
{
if (ReadEOS)
{
Expand All @@ -258,7 +258,7 @@ internal void DoBeginRead()
{
DoRead0((IHttp2Frame)message, allocHandle);
} while ((ReadEOS || (continueReading = allocHandle.ContinueReading())) &&
inboundBuffer.TryRemoveFromFront(out message));
inboundBuffer.TryRemoveFirst(out message));

if (continueReading && ch.IsParentReadInProgress && !ReadEOS)
{
Expand Down
2 changes: 1 addition & 1 deletion src/DotNetty.Codecs.Http2/AbstractHttp2StreamChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ internal void FireChildRead(IHttp2Frame frame)
{
_inboundBuffer = new Deque<object>(4);
}
_inboundBuffer.AddToBack(frame);
_inboundBuffer.AddLast(frame);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/DotNetty.Codecs.Http2/DefaultHttp2Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,7 @@ public void Activate(DefaultHttp2Stream stream)
}
else
{
_pendingEvents.AddToBack(() => AddToActiveStreams(stream));
_pendingEvents.AddLast(() => AddToActiveStreams(stream));
}
}

Expand All @@ -987,7 +987,7 @@ internal void Deactivate(DefaultHttp2Stream stream, bool force)
}
else
{
_pendingEvents.AddToBack(() => RemoveFromActiveStreams(stream));
_pendingEvents.AddLast(() => RemoveFromActiveStreams(stream));
}
}

Expand Down Expand Up @@ -1081,7 +1081,7 @@ internal void DecrementPendingIterations()
--_pendingIterations;
if (AllowModifications())
{
while (_pendingEvents.TryRemoveFromFront(out Action evt))
while (_pendingEvents.TryRemoveFirst(out Action evt))
{
try
{
Expand Down
8 changes: 4 additions & 4 deletions src/DotNetty.Codecs.Http2/DefaultHttp2ConnectionEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void LifecycleManager(IHttp2LifecycleManager lifecycleManager)

public IHttp2RemoteFlowController FlowController => _connection.Remote.FlowController;

public Http2Settings PollSentSettings => _outstandingLocalSettingsQueue.RemoveFromFront();
public Http2Settings PollSentSettings => _outstandingLocalSettingsQueue.RemoveFirst();

public virtual void RemoteSettings(Http2Settings settings)
{
Expand Down Expand Up @@ -299,7 +299,7 @@ public virtual Task WriteRstStreamAsync(IChannelHandlerContext ctx, int streamId

public virtual Task WriteSettingsAsync(IChannelHandlerContext ctx, Http2Settings settings, IPromise promise)
{
_outstandingLocalSettingsQueue.AddToBack(settings);
_outstandingLocalSettingsQueue.AddLast(settings);
try
{
var pushEnabled = settings.PushEnabled();
Expand All @@ -323,7 +323,7 @@ public virtual Task WriteSettingsAckAsync(IChannelHandlerContext ctx, IPromise p
{
return _frameWriter.WriteSettingsAckAsync(ctx, promise);
}
Http2Settings settings = _outstandingRemoteSettingsQueue.RemoveFromFront();
Http2Settings settings = _outstandingRemoteSettingsQueue.RemoveFirst();
if (settings is null)
{
_ = promise.TrySetException(ThrowHelper.GetConnectionError_attempted_to_write_a_SETTINGS_ACK_with_no_pending_SETTINGS());
Expand Down Expand Up @@ -451,7 +451,7 @@ public void ConsumeReceivedSettings(Http2Settings settings)
{
_outstandingRemoteSettingsQueue = new Deque<Http2Settings>(2);
}
_outstandingRemoteSettingsQueue.AddToBack(settings);
_outstandingRemoteSettingsQueue.AddLast(settings);
}

/// <summary>
Expand Down
8 changes: 4 additions & 4 deletions src/DotNetty.Codecs.Http2/DefaultHttp2RemoteFlowController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ internal int WriteAllocatedBytes(int allocated)
// This frame has been fully written, remove this frame and notify it.
// Since we remove this frame first, we're guaranteed that its error
// method will not be called when we call cancel.
_ = _pendingWriteQueue.TryRemoveFromFront(out _);
_ = _pendingWriteQueue.TryRemoveFirst(out _);
frame.WriteComplete();
}
}
Expand Down Expand Up @@ -455,7 +455,7 @@ internal void EnqueueFrame(IHttp2RemoteFlowControlled frame)

void EnqueueFrameWithoutMerge(IHttp2RemoteFlowControlled frame)
{
_pendingWriteQueue.AddToBack(frame);
_pendingWriteQueue.AddLast(frame);
// This must be called after adding to the queue in order so that hasFrame() is
// updated before updating the stream state.
IncrementPendingBytes(frame.Size, true);
Expand Down Expand Up @@ -483,15 +483,15 @@ internal void Cancel(Http2Error error, Exception cause = null)
// Ensure that the queue can't be modified while we are writing.
if (_writing) { return; }

if (_pendingWriteQueue.TryRemoveFromFront(out IHttp2RemoteFlowControlled frame))
if (_pendingWriteQueue.TryRemoveFirst(out IHttp2RemoteFlowControlled frame))
{
// Only create exception once and reuse to reduce overhead of filling in the stacktrace.
Http2Exception exception = ThrowHelper.GetStreamError_StreamClosedBeforeWriteCouldTakePlace(
_stream.Id, error, cause);
do
{
WriteError(frame, exception);
} while (_pendingWriteQueue.TryRemoveFromFront(out frame));
} while (_pendingWriteQueue.TryRemoveFirst(out frame));
}

_controller._streamByteDistributor.UpdateStreamableBytes(this);
Expand Down
4 changes: 2 additions & 2 deletions src/DotNetty.Codecs.Http2/MaxCapacityQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ public bool TryEnqueue(T item)
var queue = _queue;
if ((uint)_maxCapacity > (uint)queue.Count)
{
queue.AddToBack(item);
queue.AddLast(item);
return true;
}
return false;
}

public bool TryDequeue(out T result) => _queue.TryRemoveFromFront(out result);
public bool TryDequeue(out T result) => _queue.TryRemoveFirst(out result);

public void Clear() => _queue.Clear();
}
Expand Down
Loading

0 comments on commit cdef32c

Please sign in to comment.