diff --git a/src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs b/src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs index e47fc77ee..7699b6cc9 100644 --- a/src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs +++ b/src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs @@ -27,7 +27,6 @@ public class SingleThreadEventExecutor : AbstractScheduledEventExecutor readonly MpscLinkedQueue taskQueue = new MpscLinkedQueue(); Thread thread; volatile int executionState = ST_NOT_STARTED; - readonly TimeSpan breakoutInterval; readonly PreciseTimeSpan preciseBreakoutInterval; PreciseTimeSpan lastExecutionTime; readonly ManualResetEventSlim emptyEvent = new ManualResetEventSlim(); @@ -41,7 +40,6 @@ public class SingleThreadEventExecutor : AbstractScheduledEventExecutor public SingleThreadEventExecutor(string threadName, TimeSpan breakoutInterval) { this.terminationCompletionSource = new TaskCompletionSource(); - this.breakoutInterval = breakoutInterval; this.preciseBreakoutInterval = PreciseTimeSpan.FromTimeSpan(breakoutInterval); this.scheduler = new ExecutorTaskScheduler(this); this.thread = new Thread(this.Loop) @@ -400,10 +398,23 @@ IRunnable PollTask() if (task == null) { this.emptyEvent.Reset(); - if ((task = this.taskQueue.Dequeue()) == null // revisit queue as producer might have put a task in meanwhile - && this.emptyEvent.Wait(this.breakoutInterval)) + if ((task = this.taskQueue.Dequeue()) == null) // revisit queue as producer might have put a task in meanwhile { - task = this.taskQueue.Dequeue(); + IScheduledRunnable nextScheduledTask = this.ScheduledTaskQueue.Peek(); + if (nextScheduledTask != null) + { + TimeSpan wakeUpTimeout = (nextScheduledTask.Deadline - PreciseTimeSpan.FromStart).ToTimeSpan(); + if (this.emptyEvent.Wait(wakeUpTimeout)) + { + // woken up before the next scheduled task was due + task = this.taskQueue.Dequeue(); + } + } + else + { + this.emptyEvent.Wait(); + task = this.taskQueue.Dequeue(); + } } } diff --git a/src/DotNetty.Common/DotNetty.Common.csproj b/src/DotNetty.Common/DotNetty.Common.csproj index 17e424f97..4e8ed3589 100644 --- a/src/DotNetty.Common/DotNetty.Common.csproj +++ b/src/DotNetty.Common/DotNetty.Common.csproj @@ -163,7 +163,7 @@ - + diff --git a/src/DotNetty.Common/Timestamp.cs b/src/DotNetty.Common/PreciseTimeSpan.cs similarity index 100% rename from src/DotNetty.Common/Timestamp.cs rename to src/DotNetty.Common/PreciseTimeSpan.cs diff --git a/src/DotNetty.Transport/Channels/SingleThreadEventLoop.cs b/src/DotNetty.Transport/Channels/SingleThreadEventLoop.cs index 25f9eac5a..5faea72c6 100644 --- a/src/DotNetty.Transport/Channels/SingleThreadEventLoop.cs +++ b/src/DotNetty.Transport/Channels/SingleThreadEventLoop.cs @@ -9,7 +9,7 @@ namespace DotNetty.Transport.Channels public class SingleThreadEventLoop : SingleThreadEventExecutor, IEventLoop { - static readonly TimeSpan DefaultBreakoutInterval = TimeSpan.FromSeconds(5); + static readonly TimeSpan DefaultBreakoutInterval = TimeSpan.FromMilliseconds(100); public SingleThreadEventLoop() : this(null, DefaultBreakoutInterval) diff --git a/test/DotNetty.Common.Tests/Concurrency/SingleThreadEventExecutorTests.cs b/test/DotNetty.Common.Tests/Concurrency/SingleThreadEventExecutorTests.cs index 3a32dba6d..0e14a9e1b 100644 --- a/test/DotNetty.Common.Tests/Concurrency/SingleThreadEventExecutorTests.cs +++ b/test/DotNetty.Common.Tests/Concurrency/SingleThreadEventExecutorTests.cs @@ -85,7 +85,40 @@ public void FuzzyScheduling(int producerCount, bool perCpu, int taskPerProducer) Assert.True(mre.WaitOne(TimeSpan.FromSeconds(5))); } - public class Container + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task ScheduledTaskFiresOnTime(bool scheduleFromExecutor) + { + var scheduler = new SingleThreadEventExecutor(null, TimeSpan.FromMinutes(1)); + var promise = new TaskCompletionSource(); + Func scheduleFunc = () => scheduler.ScheduleAsync(() => promise.Complete(), TimeSpan.FromMilliseconds(100)); + Task task = scheduleFromExecutor ? await scheduler.SubmitAsync(scheduleFunc) : scheduleFunc(); + await Task.WhenAny(task, Task.Delay(TimeSpan.FromMilliseconds(300))); + Assert.True(task.IsCompleted); + } + + [Fact] + public async Task ScheduledTaskFiresOnTimeWhileBusy() + { + var scheduler = new SingleThreadEventExecutor(null, TimeSpan.FromMilliseconds(10)); + var promise = new TaskCompletionSource(); + Action selfQueueAction = null; + selfQueueAction = () => + { + if (!promise.Task.IsCompleted) + { + scheduler.Execute(selfQueueAction); + } + }; + + scheduler.Execute(selfQueueAction); + Task task = scheduler.ScheduleAsync(() => promise.Complete(), TimeSpan.FromMilliseconds(100)); + await Task.WhenAny(task, Task.Delay(TimeSpan.FromMilliseconds(300))); + Assert.True(task.IsCompleted); + } + + class Container { public T Value; }