diff --git a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs
index 5ef9ed716..895b99872 100644
--- a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs
+++ b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs
@@ -35,6 +35,16 @@ internal class AzureStorageDurabilityProvider : DurabilityProvider
private readonly JObject storageOptionsJson;
private readonly ILogger logger;
+ private readonly object initLock = new object();
+
+#if !FUNCTIONS_V1
+ private DurableTaskScaleMonitor singletonScaleMonitor;
+#endif
+
+#if FUNCTIONS_V3_OR_GREATER
+ private DurableTaskTargetScaler singletonTargetScaler;
+#endif
+
public AzureStorageDurabilityProvider(
AzureStorageOrchestrationService service,
IStorageServiceClientProviderFactory clientProviderFactory,
@@ -226,12 +236,18 @@ internal static OrchestrationInstanceStatusQueryCondition ConvertWebjobsDurableC
#if !FUNCTIONS_V1
internal DurableTaskMetricsProvider GetMetricsProvider(
- string functionName,
- string hubName,
- StorageAccountClientProvider storageAccountClientProvider,
- ILogger logger)
+ string hubName,
+ StorageAccountClientProvider storageAccountClientProvider,
+ ILogger logger)
+ {
+ return new DurableTaskMetricsProvider(hubName, logger, performanceMonitor: null, storageAccountClientProvider);
+ }
+
+ // Common routine for getting the scaler ID. Note that we MUST use the same ID for both the
+ // scale monitor and the target scaler.
+ private static string GetScalerUniqueId(string hubName)
{
- return new DurableTaskMetricsProvider(functionName, hubName, logger, performanceMonitor: null, storageAccountClientProvider);
+ return $"DurableTask-AzureStorage:{hubName ?? "default"}";
}
///
@@ -242,16 +258,28 @@ public override bool TryGetScaleMonitor(
string connectionName,
out IScaleMonitor scaleMonitor)
{
- StorageAccountClientProvider storageAccountClientProvider = this.clientProviderFactory.GetClientProvider(connectionName);
- DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccountClientProvider, this.logger);
- scaleMonitor = new DurableTaskScaleMonitor(
- functionId,
- functionName,
- hubName,
- storageAccountClientProvider,
- this.logger,
- metricsProvider);
- return true;
+ lock (this.initLock)
+ {
+ if (this.singletonScaleMonitor == null)
+ {
+ DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(
+ hubName,
+ this.clientProviderFactory.GetClientProvider(connectionName),
+ this.logger);
+
+ // Scalers in Durable Functions are shared for all functions in the same task hub.
+ // So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
+ string id = GetScalerUniqueId(hubName);
+ this.singletonScaleMonitor = new DurableTaskScaleMonitor(
+ id,
+ hubName,
+ this.logger,
+ metricsProvider);
+ }
+
+ scaleMonitor = this.singletonScaleMonitor;
+ return true;
+ }
}
#endif
@@ -263,11 +291,25 @@ public override bool TryGetTargetScaler(
string connectionName,
out ITargetScaler targetScaler)
{
- // This is only called by the ScaleController, it doesn't run in the Functions Host process.
- StorageAccountClientProvider storageAccountClientProvider = this.clientProviderFactory.GetClientProvider(connectionName);
- DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccountClientProvider, this.logger);
- targetScaler = new DurableTaskTargetScaler(functionId, metricsProvider, this, this.logger);
- return true;
+ lock (this.initLock)
+ {
+ if (this.singletonTargetScaler == null)
+ {
+ // This is only called by the ScaleController, it doesn't run in the Functions Host process.
+ DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(
+ hubName,
+ this.clientProviderFactory.GetClientProvider(connectionName),
+ this.logger);
+
+ // Scalers in Durable Functions are shared for all functions in the same task hub.
+ // So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
+ string id = GetScalerUniqueId(hubName);
+ this.singletonTargetScaler = new DurableTaskTargetScaler(id, metricsProvider, this, this.logger);
+ }
+
+ targetScaler = this.singletonTargetScaler;
+ return true;
+ }
}
#endif
}
diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs
index edfcd4d37..0aff5fc1c 100644
--- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs
+++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs
@@ -14,16 +14,18 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal class DurableTaskMetricsProvider
{
- private readonly string functionName;
private readonly string hubName;
private readonly ILogger logger;
private readonly StorageAccountClientProvider storageAccountClientProvider;
private DisconnectedPerformanceMonitor performanceMonitor;
- public DurableTaskMetricsProvider(string functionName, string hubName, ILogger logger, DisconnectedPerformanceMonitor performanceMonitor, StorageAccountClientProvider storageAccountClientProvider)
+ public DurableTaskMetricsProvider(
+ string hubName,
+ ILogger logger,
+ DisconnectedPerformanceMonitor performanceMonitor,
+ StorageAccountClientProvider storageAccountClientProvider)
{
- this.functionName = functionName;
this.hubName = hubName;
this.logger = logger;
this.performanceMonitor = performanceMonitor;
@@ -43,7 +45,7 @@ public virtual async Task GetMetricsAsync()
}
catch (Exception e) when (e.InnerException is RequestFailedException)
{
- this.logger.LogWarning("{details}. Function: {functionName}. HubName: {hubName}.", e.ToString(), this.functionName, this.hubName);
+ this.logger.LogWarning("{details}. HubName: {hubName}.", e.ToString(), this.hubName);
}
if (heartbeat != null)
diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs
index 9d0155660..ce0da320b 100644
--- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs
+++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs
@@ -6,8 +6,6 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
-using Azure;
-using DurableTask.AzureStorage;
using DurableTask.AzureStorage.Monitoring;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Logging;
@@ -17,42 +15,29 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal sealed class DurableTaskScaleMonitor : IScaleMonitor
{
- private readonly string functionId;
- private readonly string functionName;
private readonly string hubName;
- private readonly StorageAccountClientProvider storageAccountClientProvider;
private readonly ScaleMonitorDescriptor scaleMonitorDescriptor;
private readonly ILogger logger;
private readonly DurableTaskMetricsProvider durableTaskMetricsProvider;
- private DisconnectedPerformanceMonitor performanceMonitor;
-
public DurableTaskScaleMonitor(
- string functionId,
- string functionName,
+ string id,
string hubName,
- StorageAccountClientProvider storageAccountClientProvider,
ILogger logger,
- DurableTaskMetricsProvider durableTaskMetricsProvider,
- DisconnectedPerformanceMonitor performanceMonitor = null)
+ DurableTaskMetricsProvider durableTaskMetricsProvider)
{
- this.functionId = functionId;
- this.functionName = functionName;
this.hubName = hubName;
- this.storageAccountClientProvider = storageAccountClientProvider;
this.logger = logger;
- this.performanceMonitor = performanceMonitor;
this.durableTaskMetricsProvider = durableTaskMetricsProvider;
#if FUNCTIONS_V3_OR_GREATER
- this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower(), this.functionId);
+ // Scalers in Durable Functions are shared for all functions in the same task hub.
+ // So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
+ this.scaleMonitorDescriptor = new ScaleMonitorDescriptor(id: id, functionId: id);
#else
-#pragma warning disable CS0618 // Type or member is obsolete.
-
// We need this because the new ScaleMonitorDescriptor constructor is not compatible with the WebJobs version of Functions V1 and V2.
// Technically, it is also not available in Functions V3, but we don't have a TFM allowing us to differentiate between Functions V3 and V4.
- this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower());
-#pragma warning restore CS0618 // Type or member is obsolete. However, the new interface is not compatible with Functions V2 and V1
+ this.scaleMonitorDescriptor = new ScaleMonitorDescriptor(id);
#endif
}
@@ -151,9 +136,10 @@ private ScaleStatus GetScaleStatusCore(int workerCount, DurableTaskTriggerMetric
if (writeToUserLogs)
{
this.logger.LogInformation(
- $"Durable Functions Trigger Scale Decision: {scaleStatus.Vote.ToString()}, Reason: {scaleRecommendation?.Reason}",
+ "Durable Functions Trigger Scale Decision for {TaskHub}: {Vote}, Reason: {Reason}",
this.hubName,
- this.functionName);
+ scaleStatus.Vote,
+ scaleRecommendation?.Reason);
}
return scaleStatus;
diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs
index c1faa94f4..cd6a58c1e 100644
--- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs
+++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs
@@ -19,14 +19,18 @@ internal class DurableTaskTargetScaler : ITargetScaler
private readonly TargetScalerResult scaleResult;
private readonly DurabilityProvider durabilityProvider;
private readonly ILogger logger;
- private readonly string functionId;
+ private readonly string scaler;
- public DurableTaskTargetScaler(string functionId, DurableTaskMetricsProvider metricsProvider, DurabilityProvider durabilityProvider, ILogger logger)
+ public DurableTaskTargetScaler(
+ string scalerId,
+ DurableTaskMetricsProvider metricsProvider,
+ DurabilityProvider durabilityProvider,
+ ILogger logger)
{
- this.functionId = functionId;
+ this.scaler = scalerId;
this.metricsProvider = metricsProvider;
this.scaleResult = new TargetScalerResult();
- this.TargetScalerDescriptor = new TargetScalerDescriptor(this.functionId);
+ this.TargetScalerDescriptor = new TargetScalerDescriptor(this.scaler);
this.durabilityProvider = durabilityProvider;
this.logger = logger;
}
@@ -68,7 +72,7 @@ public async Task GetScaleResultAsync(TargetScalerContext co
// and the ScaleController is injecting it's own custom ILogger implementation that forwards logs to Kusto.
var metricsLog = $"Metrics: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
- var scaleControllerLog = $"Target worker count for '{this.functionId}' is '{numWorkersToRequest}'. " +
+ var scaleControllerLog = $"Target worker count for '{this.scaler}' is '{numWorkersToRequest}'. " +
metricsLog;
// target worker count should never be negative
@@ -85,7 +89,7 @@ public async Task GetScaleResultAsync(TargetScalerContext co
// We want to augment the exception with metrics information for investigation purposes
var metricsLog = $"Metrics: workItemQueueLength={metrics?.WorkItemQueueLength}. controlQueueLengths={metrics?.ControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
- var errorLog = $"Error: target worker count for '{this.functionId}' resulted in exception. " + metricsLog;
+ var errorLog = $"Error: target worker count for '{this.scaler}' resulted in exception. " + metricsLog;
throw new Exception(errorLog, ex);
}
}
diff --git a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj
index ab88063e1..df864126d 100644
--- a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj
+++ b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj
@@ -8,7 +8,7 @@
0
0
$(MajorVersion).$(MinorVersion).$(PatchVersion)
- rc.2
+ rc.3
$(MajorVersion).$(MinorVersion).$(PatchVersion)
$(MajorVersion).0.0.0
Microsoft Corporation
diff --git a/test/Common/TestEntities.cs b/test/Common/TestEntities.cs
index a1ba7e15b..5bf6ecd0a 100644
--- a/test/Common/TestEntities.cs
+++ b/test/Common/TestEntities.cs
@@ -340,10 +340,14 @@ public static async Task HttpEntity(
private static async Task CallHttpAsync(string requestUri)
{
- using (HttpResponseMessage response = await SharedHttpClient.GetAsync(requestUri))
- {
- return (int)response.StatusCode;
- }
+ ////using (HttpResponseMessage response = await SharedHttpClient.GetAsync(requestUri))
+ ////{
+ //// return (int)response.StatusCode;
+ ////}
+
+ // Making real calls to HTTP endpoints is not reliable in tests.
+ await Task.Delay(100);
+ return 200;
}
//-------------- an entity that uses custom deserialization
diff --git a/test/Common/TestHelpers.cs b/test/Common/TestHelpers.cs
index ac9d7feaa..990ba8515 100644
--- a/test/Common/TestHelpers.cs
+++ b/test/Common/TestHelpers.cs
@@ -16,8 +16,8 @@
using DurableTask.AzureStorage;
using Microsoft.ApplicationInsights.Channel;
#if !FUNCTIONS_V1
-using Microsoft.Extensions.Hosting;
using Microsoft.Azure.WebJobs.Host.Scale;
+using Microsoft.Extensions.Hosting;
#endif
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Storage;
using Microsoft.Azure.WebJobs.Host.TestCommon;
diff --git a/test/FunctionsV2/DurableTaskListenerTests.cs b/test/FunctionsV2/DurableTaskListenerTests.cs
index f77efd768..e79b51586 100644
--- a/test/FunctionsV2/DurableTaskListenerTests.cs
+++ b/test/FunctionsV2/DurableTaskListenerTests.cs
@@ -2,13 +2,10 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.
using System;
-using System.Linq;
-using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
-using Moq;
using Xunit;
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
@@ -40,9 +37,9 @@ public void GetMonitor_ReturnsExpectedValue()
IScaleMonitor scaleMonitor = this.listener.GetMonitor();
Assert.Equal(typeof(DurableTaskScaleMonitor), scaleMonitor.GetType());
- Assert.Equal($"{this.functionId}-DurableTaskTrigger-DurableTaskHub".ToLower(), scaleMonitor.Descriptor.Id);
+ Assert.Equal($"DurableTask-AzureStorage:DurableTaskHub", scaleMonitor.Descriptor.Id);
- var scaleMonitor2 = this.listener.GetMonitor();
+ IScaleMonitor scaleMonitor2 = this.listener.GetMonitor();
Assert.Same(scaleMonitor, scaleMonitor2);
}
diff --git a/test/FunctionsV2/DurableTaskScaleMonitorTests.cs b/test/FunctionsV2/DurableTaskScaleMonitorTests.cs
index ae2dd18a3..3ea4f9297 100644
--- a/test/FunctionsV2/DurableTaskScaleMonitorTests.cs
+++ b/test/FunctionsV2/DurableTaskScaleMonitorTests.cs
@@ -20,8 +20,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
{
public class DurableTaskScaleMonitorTests
{
- private readonly string functionId = "DurableTaskTriggerFunctionId";
- private readonly FunctionName functionName = new FunctionName("DurableTaskTriggerFunctionName");
private readonly string hubName = "DurableTaskTriggerHubName";
private readonly StorageAccountClientProvider clientProvider = new StorageAccountClientProvider(TestHelpers.GetStorageConnectionString());
private readonly ITestOutputHelper output;
@@ -39,32 +37,31 @@ public DurableTaskScaleMonitorTests(ITestOutputHelper output)
this.loggerFactory.AddProvider(this.loggerProvider);
ILogger logger = this.loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("DurableTask"));
this.traceHelper = new EndToEndTraceHelper(logger, false);
- this.performanceMonitor = new Mock(MockBehavior.Strict, new AzureStorageOrchestrationServiceSettings{
+ this.performanceMonitor = new Mock(MockBehavior.Strict, new AzureStorageOrchestrationServiceSettings
+ {
StorageAccountClientProvider = this.clientProvider,
TaskHubName = this.hubName,
});
var metricsProvider = new DurableTaskMetricsProvider(
- this.functionName.Name,
this.hubName,
logger,
this.performanceMonitor.Object,
this.clientProvider);
+ string scalerId = $"DurableTask-AzureStorage:{this.hubName}";
+
this.scaleMonitor = new DurableTaskScaleMonitor(
- this.functionId,
- this.functionName.Name,
+ scalerId,
this.hubName,
- this.clientProvider,
logger,
- metricsProvider,
- this.performanceMonitor.Object);
+ metricsProvider);
}
[Fact]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
public void ScaleMonitorDescriptor_ReturnsExpectedValue()
{
- Assert.Equal($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower(), this.scaleMonitor.Descriptor.Id);
+ Assert.Equal($"DurableTask-AzureStorage:{this.hubName}", this.scaleMonitor.Descriptor.Id);
}
[Fact]
diff --git a/test/FunctionsV2/DurableTaskTargetScalerTests.cs b/test/FunctionsV2/DurableTaskTargetScalerTests.cs
index 7ee91d5b6..ac8115529 100644
--- a/test/FunctionsV2/DurableTaskTargetScalerTests.cs
+++ b/test/FunctionsV2/DurableTaskTargetScalerTests.cs
@@ -47,7 +47,6 @@ public DurableTaskTargetScalerTests(ITestOutputHelper output)
StorageAccountClientProvider storageAccountClientProvider = null;
this.metricsProviderMock = new Mock(
MockBehavior.Strict,
- "FunctionName",
"HubName",
logger,
nullPerformanceMonitorMock,