Skip to content

Commit

Permalink
Extended session support (#289)
Browse files Browse the repository at this point in the history
- Added support for extended sessions
- End-to-end test reliability and diagnostic improvements
- Refactored Event Grid tests
- Updated DurableTask.AzureStorage dependency to 1.2.0 (which brings in DurableTask.Core 2.0.0.5)
  • Loading branch information
cgillum authored May 1, 2018
1 parent 8f3ebfe commit 3700d19
Show file tree
Hide file tree
Showing 22 changed files with 977 additions and 606 deletions.
1 change: 1 addition & 0 deletions .stylecop/GlobalSuppressions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// Project-level suppressions either have no target or are given
// a specific target and scoped to a namespace, type, member, etc.

[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.DocumentationRules", "SA1611:Element parameters should be documented", Justification = "Test code does not require detailed documentation")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.DocumentationRules", "SA1623:Property summary documentation should match accessors", Justification = "In some cases we would rather prefix descriptions with 'Optional'")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.DocumentationRules", "SA1642:Constructor summary documentation should begin with standard text", Justification = "Not enforcing")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.LayoutRules", "SA1502:Element should not be on a single line", Justification = "This is more concise")]
Expand Down
24 changes: 12 additions & 12 deletions src/WebJobs.Extensions.DurableTask/DurableOrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public sealed class DurableOrchestrationContext : DurableOrchestrationContextBas
private string serializedInput;
private string serializedOutput;
private string serializedCustomStatus;
private int owningThreadId;

internal DurableOrchestrationContext(
DurableTaskExtension config,
Expand All @@ -48,7 +47,6 @@ internal DurableOrchestrationContext(
this.deferredTasks = new List<Func<Task>>();
this.orchestrationName = functionName;
this.orchestrationVersion = functionVersion;
this.owningThreadId = -1;
}

/// <inheritdoc />
Expand Down Expand Up @@ -77,11 +75,6 @@ internal DurableOrchestrationContext(

internal IList<HistoryEvent> History { get; set; }

internal void AssignToCurrentThread()
{
this.owningThreadId = Thread.CurrentThread.ManagedThreadId;
}

/// <summary>
/// Returns the orchestrator function input as a raw JSON string value.
/// </summary>
Expand Down Expand Up @@ -387,6 +380,16 @@ private async Task<TResult> CallDurableTaskFunctionAsync<TResult>(
e.InnerException?.Message);
throw new FunctionFailedException(message, e.InnerException);
}
catch (SubOrchestrationFailedException e)
{
exception = e;
string message = string.Format(
"The {0} function '{1}' failed: \"{2}\". See the function execution logs for additional details.",
functionType.ToString().ToLowerInvariant(),
functionName,
e.InnerException?.Message);
throw new FunctionFailedException(message, e.InnerException);
}
catch (Exception e)
{
exception = e;
Expand Down Expand Up @@ -457,13 +460,10 @@ private void ThrowIfInvalidAccess()
throw new InvalidOperationException("The inner context has not been initialized.");
}

// TODO: This should be considered best effort because it's possible that async work
// was scheduled and the CLR decided to run it on the same thread. The only guaranteed
// way to detect cross-thread access is to do it in the Durable Task Framework directly.
if (this.owningThreadId != -1 && this.owningThreadId != Thread.CurrentThread.ManagedThreadId)
if (!OrchestrationContext.IsOrchestratorThread)
{
throw new InvalidOperationException(
"Multithreaded execution was detected. This can happen if the orchestrator function code awaits on a task that was not created by a DurableOrchestrationContext method. More details can be found in this article https://docs.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay#orchestrator-code-constraints .");
"Multithreaded execution was detected. This can happen if the orchestrator function code awaits on a task that was not created by a DurableOrchestrationContext method. More details can be found in this article https://docs.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay#orchestrator-code-constraints.");
}
}

Expand Down
48 changes: 47 additions & 1 deletion src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Http;
using System.Reflection;
using System.Text;
Expand Down Expand Up @@ -155,6 +154,36 @@ public class DurableTaskExtension :
/// </summary>
public string EventGridKeySettingName { get; set; }

/// <summary>
/// Gets or sets a flag indicating whether to enable extended sessions.
/// </summary>
/// <remarks>
/// <para>Extended sessions can improve the performance of orchestrator functions by allowing them to skip
/// replays when new messages are received within short periods of time.</para>
/// <para>Note that orchestrator functions which are extended this way will continue to count against the
/// <see cref="MaxConcurrentOrchestratorFunctions"/> limit. To avoid starvation, only half of the maximum
/// number of allowed concurrent orchestrator functions can be concurrently extended at any given time.
/// The <see cref="ExtendedSessionIdleTimeoutInSeconds"/> property can also be used to control how long an idle
/// orchestrator function is allowed to be extended.</para>
/// <para>It is recommended that this property be set to <c>false</c> during development to help
/// ensure that the orchestrator code correctly obeys the idempotency rules.</para>
/// </remarks>
/// <value>
/// <c>true</c> to enable extended sessions; otherwise <c>false</c>.
/// </value>
public bool ExtendedSessionsEnabled { get; set; }

/// <summary>
/// Gets or sets the amount of time in seconds before an idle session times out. The default value is 30 seconds.
/// </summary>
/// <remarks>
/// This setting is applicable when <see cref="ExtendedSessionsEnabled"/> is set to <c>true</c>.
/// </remarks>
/// <value>
/// The number of seconds before an idle session times out.
/// </value>
public int ExtendedSessionIdleTimeoutInSeconds { get; set; } = 30;

internal LifeCycleNotificationHelper LifeCycleNotificationHelper => this.lifeCycleNotificationHelper;

internal EndToEndTraceHelper TraceHelper => this.traceHelper;
Expand Down Expand Up @@ -206,6 +235,17 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context)
this.orchestrationService = new AzureStorageOrchestrationService(settings);
this.taskHubWorker = new TaskHubWorker(this.orchestrationService, this, this);
this.taskHubWorker.AddOrchestrationDispatcherMiddleware(this.OrchestrationMiddleware);

context.Config.AddService<IOrchestrationService>(this.orchestrationService);
}

/// <summary>
/// Deletes all data stored in the current task hub.
/// </summary>
/// <returns>A task representing the async delete operation.</returns>
public Task DeleteTaskHubAsync()
{
return this.orchestrationService.DeleteAsync();
}

/// <summary>
Expand Down Expand Up @@ -374,15 +414,21 @@ internal AzureStorageOrchestrationServiceSettings GetOrchestrationServiceSetting
throw new InvalidOperationException("Unable to find an Azure Storage connection string to use for this binding.");
}

TimeSpan extendedSessionTimeout = TimeSpan.FromSeconds(
Math.Max(this.ExtendedSessionIdleTimeoutInSeconds, 0));

return new AzureStorageOrchestrationServiceSettings
{
StorageConnectionString = resolvedStorageConnectionString,
TaskHubName = taskHubNameOverride ?? this.HubName,
PartitionCount = this.PartitionCount,
ControlQueueBatchSize = this.ControlQueueBatchSize,
ControlQueueVisibilityTimeout = this.ControlQueueVisibilityTimeout,
WorkItemQueueVisibilityTimeout = this.WorkItemQueueVisibilityTimeout,
MaxConcurrentTaskOrchestrationWorkItems = this.MaxConcurrentOrchestratorFunctions,
MaxConcurrentTaskActivityWorkItems = this.MaxConcurrentActivityFunctions,
ExtendedSessionsEnabled = this.ExtendedSessionsEnabled,
ExtendedSessionIdleTimeout = extendedSessionTimeout,
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public EventGridEvent() { }
public string Subject { get; set; }

[JsonProperty(PropertyName = "data")]
public object Data { get; set; }
public EventGridPayload Data { get; set; }

[JsonProperty(PropertyName = "eventType")]
public string EventType { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ private enum AsyncActionType

public void SetFunctionInvocationCallback(Func<Task> callback)
{
if (this.functionInvocationCallback != null)
{
throw new InvalidOperationException($"{nameof(this.SetFunctionInvocationCallback)} must be called only once.");
}

this.functionInvocationCallback = callback ?? throw new ArgumentNullException(nameof(callback));
}

Expand All @@ -62,7 +57,6 @@ public override async Task<string> Execute(OrchestrationContext innerContext, st
throw new InvalidOperationException($"The {nameof(this.functionInvocationCallback)} has not been assigned!");
}

this.context.AssignToCurrentThread();
this.context.SetInnerContext(innerContext);
this.context.SetInput(serializedInput);

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
<AssemblyName>Microsoft.Azure.WebJobs.Extensions.DurableTask</AssemblyName>
<RootNamespace>Microsoft.Azure.WebJobs.Extensions.DurableTask</RootNamespace>
<DocumentationFile>Microsoft.Azure.WebJobs.Extensions.DurableTask.xml</DocumentationFile>
<AssemblyVersion>1.3.1.0</AssemblyVersion>
<FileVersion>1.3.1.0</FileVersion>
<Version>1.3.1-rc</Version>
<AssemblyVersion>1.3.3.0</AssemblyVersion>
<FileVersion>1.3.3.0</FileVersion>
<Version>1.3.3-rc</Version>
<Company>Microsoft Corporation</Company>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="DurableTask.AzureStorage" Version="1.1.7-beta" />
<PackageReference Include="DurableTask.AzureStorage" Version="1.2.0" />
</ItemGroup>

<ItemGroup Condition="'$(Configuration)' == 'Debug'">
Expand Down
12 changes: 6 additions & 6 deletions test/BindingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public BindingTests(ITestOutputHelper output)
[Fact]
public async Task ActivityTriggerAsJObject()
{
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsJObject)))
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsJObject), false))
{
await host.StartAsync();

Expand All @@ -48,7 +48,7 @@ public async Task ActivityTriggerAsJObject()
// The function checks to see if there is a property called "Foo" which is set to a value
// called "Bar" and returns true if this is the case. Otherwise returns false.
Assert.Equal(OrchestrationRuntimeStatus.Completed, status?.RuntimeStatus);
Assert.Equal(true, status?.Output);
Assert.True((bool)status?.Output);

await host.StopAsync();
}
Expand All @@ -57,7 +57,7 @@ public async Task ActivityTriggerAsJObject()
[Fact]
public async Task ActivityTriggerAsPOCO()
{
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsPOCO)))
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsPOCO), false))
{
await host.StartAsync();

Expand All @@ -83,7 +83,7 @@ public async Task ActivityTriggerAsPOCO()
[Fact]
public async Task ActivityTriggerAsNumber()
{
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsNumber)))
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsNumber), false))
{
await host.StartAsync();

Expand All @@ -107,7 +107,7 @@ public async Task ActivityTriggerAsNumber()
[Fact]
public async Task BindToBlobViaParameterName()
{
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsNumber)))
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsNumber), false))
{
await host.StartAsync();

Expand Down Expand Up @@ -157,7 +157,7 @@ public async Task BindToBlobViaParameterName()
[Fact]
public async Task BindToBlobViaPOCO()
{
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsNumber)))
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsNumber), false))
{
await host.StartAsync();

Expand Down
3 changes: 1 addition & 2 deletions test/DurableOrchestrationClientBaseTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
using System;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Azure.WebJobs;
using Moq;
using Xunit;

namespace WebJobs.Extensions.DurableTask.Tests
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
{
public class DurableOrchestrationClientBaseTests
{
Expand Down
4 changes: 1 addition & 3 deletions test/DurableOrchestrationClientMock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
using System;
using System.Threading.Tasks;
using DurableTask.Core;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;

namespace WebJobs.Extensions.DurableTask.Tests
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
{
internal class DurableOrchestrationClientMock : DurableOrchestrationClient
{
Expand Down
3 changes: 1 addition & 2 deletions test/DurableOrchestrationContextBaseTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Azure.WebJobs;
using Moq;
using Xunit;

namespace WebJobs.Extensions.DurableTask.Tests
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
{
public class DurableOrchestrationContextBaseTests
{
Expand Down
Loading

0 comments on commit 3700d19

Please sign in to comment.