Skip to content

Commit

Permalink
Merge pull request #80 from nayato/cancel-schedule
Browse files Browse the repository at this point in the history
Adds IEventExecutor.Schedule, proper cancellation of scheduled tasks
  • Loading branch information
nayato committed Apr 6, 2016
2 parents ae0798e + d5a4c30 commit 1986553
Show file tree
Hide file tree
Showing 23 changed files with 601 additions and 157 deletions.
2 changes: 1 addition & 1 deletion src/DotNetty.Buffers/ByteBufferUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ public static string DecodeString(IByteBuffer src, int readerIndex, int len, Enc

if (src.HasArray)
{
return encoding.GetString(src.Array, readerIndex, len);
return encoding.GetString(src.Array, src.ArrayOffset + readerIndex, len);
}
else
{
Expand Down
15 changes: 15 additions & 0 deletions src/DotNetty.Common/Concurrency/AbstractEventExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,21 @@ public void Execute(Action action)
this.Execute(new ActionTaskQueueNode(action));
}

public virtual IScheduledTask Schedule(Action action, TimeSpan delay)
{
throw new NotSupportedException();
}

public virtual IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay)
{
throw new NotSupportedException();
}

public virtual IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay)
{
throw new NotSupportedException();
}

public virtual Task ScheduleAsync(Action action, TimeSpan delay)
{
return this.ScheduleAsync(action, delay, CancellationToken.None);
Expand Down
163 changes: 39 additions & 124 deletions src/DotNetty.Common/Concurrency/AbstractScheduledEventExecutor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace DotNetty.Common.Concurrency
Expand All @@ -15,8 +15,6 @@ namespace DotNetty.Common.Concurrency
/// </summary>
public abstract class AbstractScheduledEventExecutor : AbstractEventExecutor
{
static readonly Action<object, object> AddScheduledTaskAction = (e, t) => ((AbstractScheduledEventExecutor)e).ScheduledTaskQueue.Enqueue((IScheduledRunnable)t);

protected readonly PriorityQueue<IScheduledRunnable> ScheduledTaskQueue = new PriorityQueue<IScheduledRunnable>();

// TODO: support for EventExecutorGroup
Expand Down Expand Up @@ -94,172 +92,89 @@ protected bool HasScheduledTasks()
return scheduledTask != null && scheduledTask.Deadline <= PreciseTimeSpan.FromStart;
}

public override Task ScheduleAsync(Action action, TimeSpan delay, CancellationToken cancellationToken)
public override IScheduledTask Schedule(Action action, TimeSpan delay)
{
var scheduledTask = new ActionScheduledTask(action, PreciseTimeSpan.Deadline(delay), cancellationToken);
if (this.InEventLoop)
{
this.ScheduledTaskQueue.Enqueue(scheduledTask);
}
else
{
this.Execute(AddScheduledTaskAction, this, scheduledTask);
}
return scheduledTask.Completion;
}

public override Task ScheduleAsync(Action<object> action, object state, TimeSpan delay, CancellationToken cancellationToken)
{
var scheduledTask = new StateActionScheduledTask(action, state, PreciseTimeSpan.Deadline(delay), cancellationToken);
if (this.InEventLoop)
{
this.ScheduledTaskQueue.Enqueue(scheduledTask);
}
else
{
this.Execute(AddScheduledTaskAction, this, scheduledTask);
}
return scheduledTask.Completion;
return this.Schedule(new ActionScheduledTask(this, action, PreciseTimeSpan.Deadline(delay)));
}

public override Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay, CancellationToken cancellationToken)
public override IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay)
{
var scheduledTask = new StateActionWithContextScheduledTask(action, context, state, PreciseTimeSpan.Deadline(delay), cancellationToken);
if (this.InEventLoop)
{
this.ScheduledTaskQueue.Enqueue(scheduledTask);
}
else
{
this.Execute(AddScheduledTaskAction, this, scheduledTask);
}
return scheduledTask.Completion;
return this.Schedule(new StateActionScheduledTask(this, action, state, PreciseTimeSpan.Deadline(delay)));
}

#region Scheduled task data structures

protected interface IScheduledRunnable : IRunnable, IComparable<IScheduledRunnable>
public override IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay)
{
PreciseTimeSpan Deadline { get; }

bool Cancel();
return this.Schedule(new StateActionWithContextScheduledTask(this, action, context, state, PreciseTimeSpan.Deadline(delay)));
}

protected abstract class ScheduledTaskBase : MpscLinkedQueueNode<IRunnable>, IScheduledRunnable
public override Task ScheduleAsync(Action action, TimeSpan delay, CancellationToken cancellationToken)
{
readonly TaskCompletionSource promise;

protected ScheduledTaskBase(PreciseTimeSpan deadline, TaskCompletionSource promise, CancellationToken cancellationToken)
{
this.promise = promise;
this.Deadline = deadline;
this.CancellationToken = cancellationToken;
}

public PreciseTimeSpan Deadline { get; private set; }

public bool Cancel()
if (cancellationToken.IsCancellationRequested)
{
return this.promise.TrySetCanceled();
return TaskEx.Cancelled;
}

public Task Completion
if (!cancellationToken.CanBeCanceled)
{
get { return this.promise.Task; }
return this.Schedule(action, delay).Completion;
}

public CancellationToken CancellationToken { get; private set; }

int IComparable<IScheduledRunnable>.CompareTo(IScheduledRunnable other)
{
Contract.Requires(other != null);

return this.Deadline.CompareTo(other.Deadline);
}
return this.Schedule(new ActionScheduledAsyncTask(this, action, PreciseTimeSpan.Deadline(delay), cancellationToken)).Completion;
}

public override IRunnable Value
public override Task ScheduleAsync(Action<object> action, object state, TimeSpan delay, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
get { return this; }
return TaskEx.Cancelled;
}

public void Run()
if (!cancellationToken.CanBeCanceled)
{
if (this.CancellationToken.IsCancellationRequested)
{
this.promise.TrySetCanceled();
return;
}
if (this.Completion.IsCanceled)
{
return;
}
try
{
this.Execute();
this.promise.TryComplete();
}
catch (Exception ex)
{
// todo: check for fatal
this.promise.TrySetException(ex);
}
return this.Schedule(action, state, delay).Completion;
}

protected abstract void Execute();
return this.Schedule(new StateActionScheduledAsyncTask(this, action, state, PreciseTimeSpan.Deadline(delay), cancellationToken)).Completion;
}

sealed class ActionScheduledTask : ScheduledTaskBase
public override Task ScheduleAsync(Action<object, object> action, object context, object state, TimeSpan delay, CancellationToken cancellationToken)
{
readonly Action action;

public ActionScheduledTask(Action action, PreciseTimeSpan deadline, CancellationToken cancellationToken)
: base(deadline, new TaskCompletionSource(), cancellationToken)
if (cancellationToken.IsCancellationRequested)
{
this.action = action;
return TaskEx.Cancelled;
}

protected override void Execute()
if (!cancellationToken.CanBeCanceled)
{
this.action();
return this.Schedule(action, context, state, delay).Completion;
}

return this.Schedule(new StateActionWithContextScheduledAsyncTask(this, action, context, state, PreciseTimeSpan.Deadline(delay), cancellationToken)).Completion;
}

sealed class StateActionScheduledTask : ScheduledTaskBase
protected IScheduledRunnable Schedule(IScheduledRunnable task)
{
readonly Action<object> action;

public StateActionScheduledTask(Action<object> action, object state, PreciseTimeSpan deadline,
CancellationToken cancellationToken)
: base(deadline, new TaskCompletionSource(state), cancellationToken)
if (this.InEventLoop)
{
this.action = action;
this.ScheduledTaskQueue.Enqueue(task);
}

protected override void Execute()
else
{
this.action(this.Completion.AsyncState);
this.Execute((e, t) => ((AbstractScheduledEventExecutor)e).ScheduledTaskQueue.Enqueue((IScheduledRunnable)t), this, task);
}
return task;
}

sealed class StateActionWithContextScheduledTask : ScheduledTaskBase
internal void RemoveScheduled(IScheduledRunnable task)
{
readonly Action<object, object> action;
readonly object context;

public StateActionWithContextScheduledTask(Action<object, object> action, object context, object state,
PreciseTimeSpan deadline, CancellationToken cancellationToken)
: base(deadline, new TaskCompletionSource(state), cancellationToken)
if (this.InEventLoop)
{
this.action = action;
this.context = context;
this.ScheduledTaskQueue.Remove(task);
}

protected override void Execute()
else
{
this.action(this.context, this.Completion.AsyncState);
this.Execute((e, t) => ((AbstractScheduledEventExecutor)e).ScheduledTaskQueue.Remove((IScheduledRunnable)t), this, task);
}
}

#endregion
}
}
24 changes: 24 additions & 0 deletions src/DotNetty.Common/Concurrency/ActionScheduledAsyncTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace DotNetty.Common.Concurrency
{
using System;
using System.Threading;

sealed class ActionScheduledAsyncTask : ScheduledAsyncTask
{
readonly Action action;

public ActionScheduledAsyncTask(AbstractScheduledEventExecutor executor, Action action, PreciseTimeSpan deadline, CancellationToken cancellationToken)
: base(executor, deadline, new TaskCompletionSource(), cancellationToken)
{
this.action = action;
}

protected override void Execute()
{
this.action();
}
}
}
23 changes: 23 additions & 0 deletions src/DotNetty.Common/Concurrency/ActionScheduledTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace DotNetty.Common.Concurrency
{
using System;

sealed class ActionScheduledTask : ScheduledTask
{
readonly Action action;

public ActionScheduledTask(AbstractScheduledEventExecutor executor, Action action, PreciseTimeSpan deadline)
: base(executor, deadline, new TaskCompletionSource())
{
this.action = action;
}

protected override void Execute()
{
this.action();
}
}
}
28 changes: 28 additions & 0 deletions src/DotNetty.Common/Concurrency/IEventExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,34 @@ public interface IEventExecutor
/// </remarks>
void Execute(Action<object, object> action, object context, object state);

/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
IScheduledTask Schedule(Action action, TimeSpan delay);

/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="state"/> parameter is useful to when repeated execution of an action against
/// different objects is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
IScheduledTask Schedule(Action<object> action, object state, TimeSpan delay);

/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
/// <remarks>
/// <paramref name="context"/> and <paramref name="state"/> parameters are useful when repeated execution of
/// an action against different objects in different context is needed.
/// <para>Threading specifics are determined by <c>IEventExecutor</c> implementation.</para>
/// </remarks>
IScheduledTask Schedule(Action<object, object> action, object context, object state, TimeSpan delay);

/// <summary>
/// Schedules the given action for execution after the specified delay would pass.
/// </summary>
Expand Down
11 changes: 11 additions & 0 deletions src/DotNetty.Common/Concurrency/IScheduledRunnable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace DotNetty.Common.Concurrency
{
using System;

public interface IScheduledRunnable : IRunnable, IScheduledTask, IComparable<IScheduledRunnable>
{
}
}
19 changes: 19 additions & 0 deletions src/DotNetty.Common/Concurrency/IScheduledTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace DotNetty.Common.Concurrency
{
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

public interface IScheduledTask
{
bool Cancel();

PreciseTimeSpan Deadline { get; }

Task Completion { get; }

TaskAwaiter GetAwaiter();
}
}
Loading

0 comments on commit 1986553

Please sign in to comment.