Skip to content

Commit

Permalink
[Cherry Pick] Make scalers into per-task hub singletons #2967 (#2984)
Browse files Browse the repository at this point in the history
* cherry pick change for sc

* update version to rc.3

* remove warnings

* cherry pick Use consistent scaler IDs for target-based and scale monitor implemen…

---------

Co-authored-by: Chris Gillum <[email protected]>
  • Loading branch information
nytian and cgillum authored Dec 5, 2024
1 parent 67963c0 commit 6d098b2
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ internal class AzureStorageDurabilityProvider : DurabilityProvider
private readonly JObject storageOptionsJson;
private readonly ILogger logger;

private readonly object initLock = new object();

#if !FUNCTIONS_V1
private DurableTaskScaleMonitor singletonScaleMonitor;
#endif

#if FUNCTIONS_V3_OR_GREATER
private DurableTaskTargetScaler singletonTargetScaler;
#endif

public AzureStorageDurabilityProvider(
AzureStorageOrchestrationService service,
IStorageServiceClientProviderFactory clientProviderFactory,
Expand Down Expand Up @@ -226,12 +236,18 @@ internal static OrchestrationInstanceStatusQueryCondition ConvertWebjobsDurableC
#if !FUNCTIONS_V1

internal DurableTaskMetricsProvider GetMetricsProvider(
string functionName,
string hubName,
StorageAccountClientProvider storageAccountClientProvider,
ILogger logger)
string hubName,
StorageAccountClientProvider storageAccountClientProvider,
ILogger logger)
{
return new DurableTaskMetricsProvider(hubName, logger, performanceMonitor: null, storageAccountClientProvider);
}

// Common routine for getting the scaler ID. Note that we MUST use the same ID for both the
// scale monitor and the target scaler.
private static string GetScalerUniqueId(string hubName)
{
return new DurableTaskMetricsProvider(functionName, hubName, logger, performanceMonitor: null, storageAccountClientProvider);
return $"DurableTask-AzureStorage:{hubName ?? "default"}";
}

/// <inheritdoc/>
Expand All @@ -242,16 +258,28 @@ public override bool TryGetScaleMonitor(
string connectionName,
out IScaleMonitor scaleMonitor)
{
StorageAccountClientProvider storageAccountClientProvider = this.clientProviderFactory.GetClientProvider(connectionName);
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccountClientProvider, this.logger);
scaleMonitor = new DurableTaskScaleMonitor(
functionId,
functionName,
hubName,
storageAccountClientProvider,
this.logger,
metricsProvider);
return true;
lock (this.initLock)
{
if (this.singletonScaleMonitor == null)
{
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(
hubName,
this.clientProviderFactory.GetClientProvider(connectionName),
this.logger);

// Scalers in Durable Functions are shared for all functions in the same task hub.
// So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
string id = GetScalerUniqueId(hubName);
this.singletonScaleMonitor = new DurableTaskScaleMonitor(
id,
hubName,
this.logger,
metricsProvider);
}

scaleMonitor = this.singletonScaleMonitor;
return true;
}
}

#endif
Expand All @@ -263,11 +291,25 @@ public override bool TryGetTargetScaler(
string connectionName,
out ITargetScaler targetScaler)
{
// This is only called by the ScaleController, it doesn't run in the Functions Host process.
StorageAccountClientProvider storageAccountClientProvider = this.clientProviderFactory.GetClientProvider(connectionName);
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccountClientProvider, this.logger);
targetScaler = new DurableTaskTargetScaler(functionId, metricsProvider, this, this.logger);
return true;
lock (this.initLock)
{
if (this.singletonTargetScaler == null)
{
// This is only called by the ScaleController, it doesn't run in the Functions Host process.
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(
hubName,
this.clientProviderFactory.GetClientProvider(connectionName),
this.logger);

// Scalers in Durable Functions are shared for all functions in the same task hub.
// So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
string id = GetScalerUniqueId(hubName);
this.singletonTargetScaler = new DurableTaskTargetScaler(id, metricsProvider, this, this.logger);
}

targetScaler = this.singletonTargetScaler;
return true;
}
}
#endif
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal class DurableTaskMetricsProvider
{
private readonly string functionName;
private readonly string hubName;
private readonly ILogger logger;
private readonly StorageAccountClientProvider storageAccountClientProvider;

private DisconnectedPerformanceMonitor performanceMonitor;

public DurableTaskMetricsProvider(string functionName, string hubName, ILogger logger, DisconnectedPerformanceMonitor performanceMonitor, StorageAccountClientProvider storageAccountClientProvider)
public DurableTaskMetricsProvider(
string hubName,
ILogger logger,
DisconnectedPerformanceMonitor performanceMonitor,
StorageAccountClientProvider storageAccountClientProvider)
{
this.functionName = functionName;
this.hubName = hubName;
this.logger = logger;
this.performanceMonitor = performanceMonitor;
Expand All @@ -43,7 +45,7 @@ public virtual async Task<DurableTaskTriggerMetrics> GetMetricsAsync()
}
catch (Exception e) when (e.InnerException is RequestFailedException)
{
this.logger.LogWarning("{details}. Function: {functionName}. HubName: {hubName}.", e.ToString(), this.functionName, this.hubName);
this.logger.LogWarning("{details}. HubName: {hubName}.", e.ToString(), this.hubName);
}

if (heartbeat != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure;
using DurableTask.AzureStorage;
using DurableTask.AzureStorage.Monitoring;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Logging;
Expand All @@ -17,42 +15,29 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal sealed class DurableTaskScaleMonitor : IScaleMonitor<DurableTaskTriggerMetrics>
{
private readonly string functionId;
private readonly string functionName;
private readonly string hubName;
private readonly StorageAccountClientProvider storageAccountClientProvider;
private readonly ScaleMonitorDescriptor scaleMonitorDescriptor;
private readonly ILogger logger;
private readonly DurableTaskMetricsProvider durableTaskMetricsProvider;

private DisconnectedPerformanceMonitor performanceMonitor;

public DurableTaskScaleMonitor(
string functionId,
string functionName,
string id,
string hubName,
StorageAccountClientProvider storageAccountClientProvider,
ILogger logger,
DurableTaskMetricsProvider durableTaskMetricsProvider,
DisconnectedPerformanceMonitor performanceMonitor = null)
DurableTaskMetricsProvider durableTaskMetricsProvider)
{
this.functionId = functionId;
this.functionName = functionName;
this.hubName = hubName;
this.storageAccountClientProvider = storageAccountClientProvider;
this.logger = logger;
this.performanceMonitor = performanceMonitor;
this.durableTaskMetricsProvider = durableTaskMetricsProvider;

#if FUNCTIONS_V3_OR_GREATER
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower(), this.functionId);
// Scalers in Durable Functions are shared for all functions in the same task hub.
// So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor(id: id, functionId: id);
#else
#pragma warning disable CS0618 // Type or member is obsolete.

// We need this because the new ScaleMonitorDescriptor constructor is not compatible with the WebJobs version of Functions V1 and V2.
// Technically, it is also not available in Functions V3, but we don't have a TFM allowing us to differentiate between Functions V3 and V4.
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower());
#pragma warning restore CS0618 // Type or member is obsolete. However, the new interface is not compatible with Functions V2 and V1
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor(id);

Check warning on line 40 in src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs

View workflow job for this annotation

GitHub Actions / build

'ScaleMonitorDescriptor.ScaleMonitorDescriptor(string)' is obsolete: 'This constructor is obsolete. Use the version that takes function id instead.'

Check warning on line 40 in src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs

View workflow job for this annotation

GitHub Actions / build

'ScaleMonitorDescriptor.ScaleMonitorDescriptor(string)' is obsolete: 'This constructor is obsolete. Use the version that takes function id instead.'

Check warning on line 40 in src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs

View workflow job for this annotation

GitHub Actions / build

'ScaleMonitorDescriptor.ScaleMonitorDescriptor(string)' is obsolete: 'This constructor is obsolete. Use the version that takes function id instead.'
#endif
}

Expand Down Expand Up @@ -151,9 +136,10 @@ private ScaleStatus GetScaleStatusCore(int workerCount, DurableTaskTriggerMetric
if (writeToUserLogs)
{
this.logger.LogInformation(
$"Durable Functions Trigger Scale Decision: {scaleStatus.Vote.ToString()}, Reason: {scaleRecommendation?.Reason}",
"Durable Functions Trigger Scale Decision for {TaskHub}: {Vote}, Reason: {Reason}",
this.hubName,
this.functionName);
scaleStatus.Vote,
scaleRecommendation?.Reason);
}

return scaleStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ internal class DurableTaskTargetScaler : ITargetScaler
private readonly TargetScalerResult scaleResult;
private readonly DurabilityProvider durabilityProvider;
private readonly ILogger logger;
private readonly string functionId;
private readonly string scaler;

public DurableTaskTargetScaler(string functionId, DurableTaskMetricsProvider metricsProvider, DurabilityProvider durabilityProvider, ILogger logger)
public DurableTaskTargetScaler(
string scalerId,
DurableTaskMetricsProvider metricsProvider,
DurabilityProvider durabilityProvider,
ILogger logger)
{
this.functionId = functionId;
this.scaler = scalerId;
this.metricsProvider = metricsProvider;
this.scaleResult = new TargetScalerResult();
this.TargetScalerDescriptor = new TargetScalerDescriptor(this.functionId);
this.TargetScalerDescriptor = new TargetScalerDescriptor(this.scaler);
this.durabilityProvider = durabilityProvider;
this.logger = logger;
}
Expand Down Expand Up @@ -68,7 +72,7 @@ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext co
// and the ScaleController is injecting it's own custom ILogger implementation that forwards logs to Kusto.
var metricsLog = $"Metrics: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var scaleControllerLog = $"Target worker count for '{this.functionId}' is '{numWorkersToRequest}'. " +
var scaleControllerLog = $"Target worker count for '{this.scaler}' is '{numWorkersToRequest}'. " +
metricsLog;

// target worker count should never be negative
Expand All @@ -85,7 +89,7 @@ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext co
// We want to augment the exception with metrics information for investigation purposes
var metricsLog = $"Metrics: workItemQueueLength={metrics?.WorkItemQueueLength}. controlQueueLengths={metrics?.ControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var errorLog = $"Error: target worker count for '{this.functionId}' resulted in exception. " + metricsLog;
var errorLog = $"Error: target worker count for '{this.scaler}' resulted in exception. " + metricsLog;
throw new Exception(errorLog, ex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<MinorVersion>0</MinorVersion>
<PatchVersion>0</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix>rc.2</VersionSuffix>
<VersionSuffix>rc.3</VersionSuffix>
<FileVersion>$(MajorVersion).$(MinorVersion).$(PatchVersion)</FileVersion>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>
<Company>Microsoft Corporation</Company>
Expand Down
12 changes: 8 additions & 4 deletions test/Common/TestEntities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,14 @@ public static async Task HttpEntity(

private static async Task<int> CallHttpAsync(string requestUri)
{
using (HttpResponseMessage response = await SharedHttpClient.GetAsync(requestUri))
{
return (int)response.StatusCode;
}
////using (HttpResponseMessage response = await SharedHttpClient.GetAsync(requestUri))
////{
//// return (int)response.StatusCode;
////}

// Making real calls to HTTP endpoints is not reliable in tests.
await Task.Delay(100);
return 200;
}

//-------------- an entity that uses custom deserialization
Expand Down
2 changes: 1 addition & 1 deletion test/Common/TestHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
using DurableTask.AzureStorage;
using Microsoft.ApplicationInsights.Channel;
#if !FUNCTIONS_V1
using Microsoft.Extensions.Hosting;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Hosting;
#endif
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Storage;
using Microsoft.Azure.WebJobs.Host.TestCommon;
Expand Down
7 changes: 2 additions & 5 deletions test/FunctionsV2/DurableTaskListenerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;
using System.Linq;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Moq;
using Xunit;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
Expand Down Expand Up @@ -40,9 +37,9 @@ public void GetMonitor_ReturnsExpectedValue()
IScaleMonitor scaleMonitor = this.listener.GetMonitor();

Assert.Equal(typeof(DurableTaskScaleMonitor), scaleMonitor.GetType());
Assert.Equal($"{this.functionId}-DurableTaskTrigger-DurableTaskHub".ToLower(), scaleMonitor.Descriptor.Id);
Assert.Equal($"DurableTask-AzureStorage:DurableTaskHub", scaleMonitor.Descriptor.Id);

var scaleMonitor2 = this.listener.GetMonitor();
IScaleMonitor scaleMonitor2 = this.listener.GetMonitor();

Assert.Same(scaleMonitor, scaleMonitor2);
}
Expand Down
17 changes: 7 additions & 10 deletions test/FunctionsV2/DurableTaskScaleMonitorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
{
public class DurableTaskScaleMonitorTests
{
private readonly string functionId = "DurableTaskTriggerFunctionId";
private readonly FunctionName functionName = new FunctionName("DurableTaskTriggerFunctionName");
private readonly string hubName = "DurableTaskTriggerHubName";
private readonly StorageAccountClientProvider clientProvider = new StorageAccountClientProvider(TestHelpers.GetStorageConnectionString());
private readonly ITestOutputHelper output;
Expand All @@ -39,32 +37,31 @@ public DurableTaskScaleMonitorTests(ITestOutputHelper output)
this.loggerFactory.AddProvider(this.loggerProvider);
ILogger logger = this.loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("DurableTask"));
this.traceHelper = new EndToEndTraceHelper(logger, false);
this.performanceMonitor = new Mock<DisconnectedPerformanceMonitor>(MockBehavior.Strict, new AzureStorageOrchestrationServiceSettings{
this.performanceMonitor = new Mock<DisconnectedPerformanceMonitor>(MockBehavior.Strict, new AzureStorageOrchestrationServiceSettings
{
StorageAccountClientProvider = this.clientProvider,
TaskHubName = this.hubName,
});
var metricsProvider = new DurableTaskMetricsProvider(
this.functionName.Name,
this.hubName,
logger,
this.performanceMonitor.Object,
this.clientProvider);

string scalerId = $"DurableTask-AzureStorage:{this.hubName}";

this.scaleMonitor = new DurableTaskScaleMonitor(
this.functionId,
this.functionName.Name,
scalerId,
this.hubName,
this.clientProvider,
logger,
metricsProvider,
this.performanceMonitor.Object);
metricsProvider);
}

[Fact]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
public void ScaleMonitorDescriptor_ReturnsExpectedValue()
{
Assert.Equal($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower(), this.scaleMonitor.Descriptor.Id);
Assert.Equal($"DurableTask-AzureStorage:{this.hubName}", this.scaleMonitor.Descriptor.Id);
}

[Fact]
Expand Down
1 change: 0 additions & 1 deletion test/FunctionsV2/DurableTaskTargetScalerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public DurableTaskTargetScalerTests(ITestOutputHelper output)
StorageAccountClientProvider storageAccountClientProvider = null;
this.metricsProviderMock = new Mock<DurableTaskMetricsProvider>(
MockBehavior.Strict,
"FunctionName",
"HubName",
logger,
nullPerformanceMonitorMock,
Expand Down

0 comments on commit 6d098b2

Please sign in to comment.