From 6d098b2379fefd23c31476b86138e634e8de3e5f Mon Sep 17 00:00:00 2001 From: Naiyuan Tian <110135109+nytian@users.noreply.github.com> Date: Wed, 4 Dec 2024 16:15:28 -0800 Subject: [PATCH] [Cherry Pick] Make scalers into per-task hub singletons #2967 (#2984) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * cherry pick change for sc * update version to rc.3 * remove warnings * cherry pick Use consistent scaler IDs for target-based and scale monitor implemen… --------- Co-authored-by: Chris Gillum --- .../AzureStorageDurabilityProvider.cs | 82 ++++++++++++++----- .../Listener/DurableTaskMetricsProvider.cs | 10 ++- .../Listener/DurableTaskScaleMonitor.cs | 32 ++------ .../Listener/DurableTaskTargetScaler.cs | 16 ++-- .../WebJobs.Extensions.DurableTask.csproj | 2 +- test/Common/TestEntities.cs | 12 ++- test/Common/TestHelpers.cs | 2 +- test/FunctionsV2/DurableTaskListenerTests.cs | 7 +- .../DurableTaskScaleMonitorTests.cs | 17 ++-- .../DurableTaskTargetScalerTests.cs | 1 - 10 files changed, 106 insertions(+), 75 deletions(-) diff --git a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs index 5ef9ed716..895b99872 100644 --- a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs @@ -35,6 +35,16 @@ internal class AzureStorageDurabilityProvider : DurabilityProvider private readonly JObject storageOptionsJson; private readonly ILogger logger; + private readonly object initLock = new object(); + +#if !FUNCTIONS_V1 + private DurableTaskScaleMonitor singletonScaleMonitor; +#endif + +#if FUNCTIONS_V3_OR_GREATER + private DurableTaskTargetScaler singletonTargetScaler; +#endif + public AzureStorageDurabilityProvider( AzureStorageOrchestrationService service, IStorageServiceClientProviderFactory clientProviderFactory, @@ -226,12 +236,18 @@ internal static OrchestrationInstanceStatusQueryCondition ConvertWebjobsDurableC #if !FUNCTIONS_V1 internal DurableTaskMetricsProvider GetMetricsProvider( - string functionName, - string hubName, - StorageAccountClientProvider storageAccountClientProvider, - ILogger logger) + string hubName, + StorageAccountClientProvider storageAccountClientProvider, + ILogger logger) + { + return new DurableTaskMetricsProvider(hubName, logger, performanceMonitor: null, storageAccountClientProvider); + } + + // Common routine for getting the scaler ID. Note that we MUST use the same ID for both the + // scale monitor and the target scaler. + private static string GetScalerUniqueId(string hubName) { - return new DurableTaskMetricsProvider(functionName, hubName, logger, performanceMonitor: null, storageAccountClientProvider); + return $"DurableTask-AzureStorage:{hubName ?? "default"}"; } /// @@ -242,16 +258,28 @@ public override bool TryGetScaleMonitor( string connectionName, out IScaleMonitor scaleMonitor) { - StorageAccountClientProvider storageAccountClientProvider = this.clientProviderFactory.GetClientProvider(connectionName); - DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccountClientProvider, this.logger); - scaleMonitor = new DurableTaskScaleMonitor( - functionId, - functionName, - hubName, - storageAccountClientProvider, - this.logger, - metricsProvider); - return true; + lock (this.initLock) + { + if (this.singletonScaleMonitor == null) + { + DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider( + hubName, + this.clientProviderFactory.GetClientProvider(connectionName), + this.logger); + + // Scalers in Durable Functions are shared for all functions in the same task hub. + // So instead of using a function ID, we use the task hub name as the basis for the descriptor ID. + string id = GetScalerUniqueId(hubName); + this.singletonScaleMonitor = new DurableTaskScaleMonitor( + id, + hubName, + this.logger, + metricsProvider); + } + + scaleMonitor = this.singletonScaleMonitor; + return true; + } } #endif @@ -263,11 +291,25 @@ public override bool TryGetTargetScaler( string connectionName, out ITargetScaler targetScaler) { - // This is only called by the ScaleController, it doesn't run in the Functions Host process. - StorageAccountClientProvider storageAccountClientProvider = this.clientProviderFactory.GetClientProvider(connectionName); - DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccountClientProvider, this.logger); - targetScaler = new DurableTaskTargetScaler(functionId, metricsProvider, this, this.logger); - return true; + lock (this.initLock) + { + if (this.singletonTargetScaler == null) + { + // This is only called by the ScaleController, it doesn't run in the Functions Host process. + DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider( + hubName, + this.clientProviderFactory.GetClientProvider(connectionName), + this.logger); + + // Scalers in Durable Functions are shared for all functions in the same task hub. + // So instead of using a function ID, we use the task hub name as the basis for the descriptor ID. + string id = GetScalerUniqueId(hubName); + this.singletonTargetScaler = new DurableTaskTargetScaler(id, metricsProvider, this, this.logger); + } + + targetScaler = this.singletonTargetScaler; + return true; + } } #endif } diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs index edfcd4d37..0aff5fc1c 100644 --- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs @@ -14,16 +14,18 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask { internal class DurableTaskMetricsProvider { - private readonly string functionName; private readonly string hubName; private readonly ILogger logger; private readonly StorageAccountClientProvider storageAccountClientProvider; private DisconnectedPerformanceMonitor performanceMonitor; - public DurableTaskMetricsProvider(string functionName, string hubName, ILogger logger, DisconnectedPerformanceMonitor performanceMonitor, StorageAccountClientProvider storageAccountClientProvider) + public DurableTaskMetricsProvider( + string hubName, + ILogger logger, + DisconnectedPerformanceMonitor performanceMonitor, + StorageAccountClientProvider storageAccountClientProvider) { - this.functionName = functionName; this.hubName = hubName; this.logger = logger; this.performanceMonitor = performanceMonitor; @@ -43,7 +45,7 @@ public virtual async Task GetMetricsAsync() } catch (Exception e) when (e.InnerException is RequestFailedException) { - this.logger.LogWarning("{details}. Function: {functionName}. HubName: {hubName}.", e.ToString(), this.functionName, this.hubName); + this.logger.LogWarning("{details}. HubName: {hubName}.", e.ToString(), this.hubName); } if (heartbeat != null) diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs index 9d0155660..ce0da320b 100644 --- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs +++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs @@ -6,8 +6,6 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -using Azure; -using DurableTask.AzureStorage; using DurableTask.AzureStorage.Monitoring; using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Extensions.Logging; @@ -17,42 +15,29 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask { internal sealed class DurableTaskScaleMonitor : IScaleMonitor { - private readonly string functionId; - private readonly string functionName; private readonly string hubName; - private readonly StorageAccountClientProvider storageAccountClientProvider; private readonly ScaleMonitorDescriptor scaleMonitorDescriptor; private readonly ILogger logger; private readonly DurableTaskMetricsProvider durableTaskMetricsProvider; - private DisconnectedPerformanceMonitor performanceMonitor; - public DurableTaskScaleMonitor( - string functionId, - string functionName, + string id, string hubName, - StorageAccountClientProvider storageAccountClientProvider, ILogger logger, - DurableTaskMetricsProvider durableTaskMetricsProvider, - DisconnectedPerformanceMonitor performanceMonitor = null) + DurableTaskMetricsProvider durableTaskMetricsProvider) { - this.functionId = functionId; - this.functionName = functionName; this.hubName = hubName; - this.storageAccountClientProvider = storageAccountClientProvider; this.logger = logger; - this.performanceMonitor = performanceMonitor; this.durableTaskMetricsProvider = durableTaskMetricsProvider; #if FUNCTIONS_V3_OR_GREATER - this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower(), this.functionId); + // Scalers in Durable Functions are shared for all functions in the same task hub. + // So instead of using a function ID, we use the task hub name as the basis for the descriptor ID. + this.scaleMonitorDescriptor = new ScaleMonitorDescriptor(id: id, functionId: id); #else -#pragma warning disable CS0618 // Type or member is obsolete. - // We need this because the new ScaleMonitorDescriptor constructor is not compatible with the WebJobs version of Functions V1 and V2. // Technically, it is also not available in Functions V3, but we don't have a TFM allowing us to differentiate between Functions V3 and V4. - this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower()); -#pragma warning restore CS0618 // Type or member is obsolete. However, the new interface is not compatible with Functions V2 and V1 + this.scaleMonitorDescriptor = new ScaleMonitorDescriptor(id); #endif } @@ -151,9 +136,10 @@ private ScaleStatus GetScaleStatusCore(int workerCount, DurableTaskTriggerMetric if (writeToUserLogs) { this.logger.LogInformation( - $"Durable Functions Trigger Scale Decision: {scaleStatus.Vote.ToString()}, Reason: {scaleRecommendation?.Reason}", + "Durable Functions Trigger Scale Decision for {TaskHub}: {Vote}, Reason: {Reason}", this.hubName, - this.functionName); + scaleStatus.Vote, + scaleRecommendation?.Reason); } return scaleStatus; diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs index c1faa94f4..cd6a58c1e 100644 --- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs +++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs @@ -19,14 +19,18 @@ internal class DurableTaskTargetScaler : ITargetScaler private readonly TargetScalerResult scaleResult; private readonly DurabilityProvider durabilityProvider; private readonly ILogger logger; - private readonly string functionId; + private readonly string scaler; - public DurableTaskTargetScaler(string functionId, DurableTaskMetricsProvider metricsProvider, DurabilityProvider durabilityProvider, ILogger logger) + public DurableTaskTargetScaler( + string scalerId, + DurableTaskMetricsProvider metricsProvider, + DurabilityProvider durabilityProvider, + ILogger logger) { - this.functionId = functionId; + this.scaler = scalerId; this.metricsProvider = metricsProvider; this.scaleResult = new TargetScalerResult(); - this.TargetScalerDescriptor = new TargetScalerDescriptor(this.functionId); + this.TargetScalerDescriptor = new TargetScalerDescriptor(this.scaler); this.durabilityProvider = durabilityProvider; this.logger = logger; } @@ -68,7 +72,7 @@ public async Task GetScaleResultAsync(TargetScalerContext co // and the ScaleController is injecting it's own custom ILogger implementation that forwards logs to Kusto. var metricsLog = $"Metrics: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " + $"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}"; - var scaleControllerLog = $"Target worker count for '{this.functionId}' is '{numWorkersToRequest}'. " + + var scaleControllerLog = $"Target worker count for '{this.scaler}' is '{numWorkersToRequest}'. " + metricsLog; // target worker count should never be negative @@ -85,7 +89,7 @@ public async Task GetScaleResultAsync(TargetScalerContext co // We want to augment the exception with metrics information for investigation purposes var metricsLog = $"Metrics: workItemQueueLength={metrics?.WorkItemQueueLength}. controlQueueLengths={metrics?.ControlQueueLengths}. " + $"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}"; - var errorLog = $"Error: target worker count for '{this.functionId}' resulted in exception. " + metricsLog; + var errorLog = $"Error: target worker count for '{this.scaler}' resulted in exception. " + metricsLog; throw new Exception(errorLog, ex); } } diff --git a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj index ab88063e1..df864126d 100644 --- a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj +++ b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj @@ -8,7 +8,7 @@ 0 0 $(MajorVersion).$(MinorVersion).$(PatchVersion) - rc.2 + rc.3 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).0.0.0 Microsoft Corporation diff --git a/test/Common/TestEntities.cs b/test/Common/TestEntities.cs index a1ba7e15b..5bf6ecd0a 100644 --- a/test/Common/TestEntities.cs +++ b/test/Common/TestEntities.cs @@ -340,10 +340,14 @@ public static async Task HttpEntity( private static async Task CallHttpAsync(string requestUri) { - using (HttpResponseMessage response = await SharedHttpClient.GetAsync(requestUri)) - { - return (int)response.StatusCode; - } + ////using (HttpResponseMessage response = await SharedHttpClient.GetAsync(requestUri)) + ////{ + //// return (int)response.StatusCode; + ////} + + // Making real calls to HTTP endpoints is not reliable in tests. + await Task.Delay(100); + return 200; } //-------------- an entity that uses custom deserialization diff --git a/test/Common/TestHelpers.cs b/test/Common/TestHelpers.cs index ac9d7feaa..990ba8515 100644 --- a/test/Common/TestHelpers.cs +++ b/test/Common/TestHelpers.cs @@ -16,8 +16,8 @@ using DurableTask.AzureStorage; using Microsoft.ApplicationInsights.Channel; #if !FUNCTIONS_V1 -using Microsoft.Extensions.Hosting; using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Hosting; #endif using Microsoft.Azure.WebJobs.Extensions.DurableTask.Storage; using Microsoft.Azure.WebJobs.Host.TestCommon; diff --git a/test/FunctionsV2/DurableTaskListenerTests.cs b/test/FunctionsV2/DurableTaskListenerTests.cs index f77efd768..e79b51586 100644 --- a/test/FunctionsV2/DurableTaskListenerTests.cs +++ b/test/FunctionsV2/DurableTaskListenerTests.cs @@ -2,13 +2,10 @@ // Licensed under the MIT License. See LICENSE in the project root for license information. using System; -using System.Linq; -using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; -using Moq; using Xunit; namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests @@ -40,9 +37,9 @@ public void GetMonitor_ReturnsExpectedValue() IScaleMonitor scaleMonitor = this.listener.GetMonitor(); Assert.Equal(typeof(DurableTaskScaleMonitor), scaleMonitor.GetType()); - Assert.Equal($"{this.functionId}-DurableTaskTrigger-DurableTaskHub".ToLower(), scaleMonitor.Descriptor.Id); + Assert.Equal($"DurableTask-AzureStorage:DurableTaskHub", scaleMonitor.Descriptor.Id); - var scaleMonitor2 = this.listener.GetMonitor(); + IScaleMonitor scaleMonitor2 = this.listener.GetMonitor(); Assert.Same(scaleMonitor, scaleMonitor2); } diff --git a/test/FunctionsV2/DurableTaskScaleMonitorTests.cs b/test/FunctionsV2/DurableTaskScaleMonitorTests.cs index ae2dd18a3..3ea4f9297 100644 --- a/test/FunctionsV2/DurableTaskScaleMonitorTests.cs +++ b/test/FunctionsV2/DurableTaskScaleMonitorTests.cs @@ -20,8 +20,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests { public class DurableTaskScaleMonitorTests { - private readonly string functionId = "DurableTaskTriggerFunctionId"; - private readonly FunctionName functionName = new FunctionName("DurableTaskTriggerFunctionName"); private readonly string hubName = "DurableTaskTriggerHubName"; private readonly StorageAccountClientProvider clientProvider = new StorageAccountClientProvider(TestHelpers.GetStorageConnectionString()); private readonly ITestOutputHelper output; @@ -39,32 +37,31 @@ public DurableTaskScaleMonitorTests(ITestOutputHelper output) this.loggerFactory.AddProvider(this.loggerProvider); ILogger logger = this.loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("DurableTask")); this.traceHelper = new EndToEndTraceHelper(logger, false); - this.performanceMonitor = new Mock(MockBehavior.Strict, new AzureStorageOrchestrationServiceSettings{ + this.performanceMonitor = new Mock(MockBehavior.Strict, new AzureStorageOrchestrationServiceSettings + { StorageAccountClientProvider = this.clientProvider, TaskHubName = this.hubName, }); var metricsProvider = new DurableTaskMetricsProvider( - this.functionName.Name, this.hubName, logger, this.performanceMonitor.Object, this.clientProvider); + string scalerId = $"DurableTask-AzureStorage:{this.hubName}"; + this.scaleMonitor = new DurableTaskScaleMonitor( - this.functionId, - this.functionName.Name, + scalerId, this.hubName, - this.clientProvider, logger, - metricsProvider, - this.performanceMonitor.Object); + metricsProvider); } [Fact] [Trait("Category", PlatformSpecificHelpers.TestCategory)] public void ScaleMonitorDescriptor_ReturnsExpectedValue() { - Assert.Equal($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower(), this.scaleMonitor.Descriptor.Id); + Assert.Equal($"DurableTask-AzureStorage:{this.hubName}", this.scaleMonitor.Descriptor.Id); } [Fact] diff --git a/test/FunctionsV2/DurableTaskTargetScalerTests.cs b/test/FunctionsV2/DurableTaskTargetScalerTests.cs index 7ee91d5b6..ac8115529 100644 --- a/test/FunctionsV2/DurableTaskTargetScalerTests.cs +++ b/test/FunctionsV2/DurableTaskTargetScalerTests.cs @@ -47,7 +47,6 @@ public DurableTaskTargetScalerTests(ITestOutputHelper output) StorageAccountClientProvider storageAccountClientProvider = null; this.metricsProviderMock = new Mock( MockBehavior.Strict, - "FunctionName", "HubName", logger, nullPerformanceMonitorMock,