diff --git a/Test/DurableTask.AzureStorage.Tests/TestTablePartitionManager.cs b/Test/DurableTask.AzureStorage.Tests/TestTablePartitionManager.cs new file mode 100644 index 000000000..95fbb78c0 --- /dev/null +++ b/Test/DurableTask.AzureStorage.Tests/TestTablePartitionManager.cs @@ -0,0 +1,721 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +#nullable enable +namespace DurableTask.AzureStorage.Tests +{ + using DurableTask.Core; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Linq; + using System.Runtime.Serialization; + using System.Threading; + using System.Threading.Tasks; + + [TestClass] + public class TestTablePartitionManager + { + readonly string connection = TestHelpers.GetTestStorageAccountConnectionString(); + + // Start with one worker and four partitions. + // Test the worker could claim all the partitions in 5 seconds. + [TestMethod] + public async Task TestOneWorkerWithFourPartition() + { + string testName = nameof(TestOneWorkerWithFourPartition); + var settings = new AzureStorageOrchestrationServiceSettings + { + TaskHubName = testName, + StorageConnectionString = this.connection, + WorkerId = "0", + AppName = testName, + UseTablePartitionManagement = true, + }; + var service = new AzureStorageOrchestrationService(settings); + await service.StartAsync(); + + + WaitForCondition( + timeout: TimeSpan.FromSeconds(5), + condition: () => + { + var partitions = service.ListTableLeases(); + Assert.AreEqual(4, partitions.Count()); + return partitions.All(p => p.CurrentOwner == "0"); + }); + + await service.StopAsync(); + await service.DeleteAsync(); + } + + // Starts with two workers and four partitions. + // Test that one worker can acquire two partitions, + // since two workers can't start at the same time, and one will start earlier than the other one. + // There should be a steal process, and test that the lease transfer will take no longer than 30 sec. + [TestMethod] + public async Task TestTwoWorkerWithFourPartitions() + { + var services = new AzureStorageOrchestrationService[2]; + string testName = nameof(TestTwoWorkerWithFourPartitions); + + for (int i = 0; i < 2; i++) + { + var settings = new AzureStorageOrchestrationServiceSettings + { + TaskHubName = testName, + StorageConnectionString = this.connection, + AppName = testName, + UseTablePartitionManagement = true, + WorkerId = i.ToString(), + }; + services[i] = new AzureStorageOrchestrationService(settings); + } + + var startTasks = services.Select(service => service.StartAsync()); + await Task.WhenAll(startTasks); + + WaitForCondition( + timeout: TimeSpan.FromSeconds(30), + condition: () => + { + var partitions = services[0].ListTableLeases(); + Assert.AreEqual(4, partitions.Count()); + return ((partitions.Count(p => p.CurrentOwner == "0") == 2) && + (partitions.Count(p => p.CurrentOwner == "1") == 2)); + }); + + var stopTasks = services.Select(service => service.StopAsync()); + await Task.WhenAll(stopTasks); + await services[0].DeleteAsync(); + } + + // Starts with four workers and four partitions. + // Test that each worker can acquire one partition. + // Since workers can't start at the same time, there should be a steal lease process. + // Test that the lease transfer will take no longer than 30 sec. + [TestMethod] + public async Task TestFourWorkerWithFourPartitions() + { + var services = new AzureStorageOrchestrationService[4]; + string testName = nameof(TestFourWorkerWithFourPartitions); + + for (int i = 0; i < 4; i++) + { + var settings = new AzureStorageOrchestrationServiceSettings + { + TaskHubName = testName, + StorageConnectionString = this.connection, + AppName = testName, + UseTablePartitionManagement = true, + WorkerId = i.ToString(), + }; + services[i] = new AzureStorageOrchestrationService(settings); + } + + var startTasks = services.Select(service => service.StartAsync()); + await Task.WhenAll(startTasks); + + WaitForCondition( + timeout: TimeSpan.FromSeconds(30), + condition: () => + { + var partitions = services[0].ListTableLeases(); + Assert.AreEqual(4, partitions.Count()); + return ((partitions.Count(p => p.CurrentOwner == "0") == 1) && + (partitions.Count(p => p.CurrentOwner == "1") == 1) && + (partitions.Count(p => p.CurrentOwner == "2") == 1) && + (partitions.Count(p => p.CurrentOwner == "3") == 1)); + }); + + + var stopTasks = services.Select(service => service.StopAsync()); + await Task.WhenAll(stopTasks); + await services[0].DeleteAsync(); + } + + // Starts with one worker and four partitions, then add three more workers. + // Test that each worker can acquire one partitions. + // Test that the lease transfer will take no longer than 30 sec. + [TestMethod] + public async Task TestAddThreeWorkersWithOneWorkerAndFourPartitions() + { + var services = new AzureStorageOrchestrationService[4]; + string testName = "TestAddThreeWorkers"; + + for (int i = 0; i < 4; i++) + { + var settings = new AzureStorageOrchestrationServiceSettings + { + TaskHubName = testName, + StorageConnectionString = this.connection, + AppName = testName, + UseTablePartitionManagement = true, + WorkerId = i.ToString(), + }; + services[i] = new AzureStorageOrchestrationService(settings); + } + + await services[0].StartAsync(); + // Wait for worker[0] to acquire all the partitions. Then start the other three workers. + WaitForCondition( + timeout: TimeSpan.FromSeconds(5), + condition: () => + { + var partitions = services[0].ListTableLeases(); + Assert.AreEqual(4, partitions.Count()); + return partitions.All(p => p.CurrentOwner == "0"); + }); + await services[1].StartAsync(); + await services[2].StartAsync(); + await services[3].StartAsync(); + + // Check that each worker has acquired one partition. + WaitForCondition( + timeout: TimeSpan.FromSeconds(30), + condition: () => + { + var partitions = services[0].ListTableLeases(); + Assert.AreEqual(4, partitions.Count()); + return ((partitions.Count(p => p.CurrentOwner == "0") == 1) && + (partitions.Count(p => p.CurrentOwner == "1") == 1) && + (partitions.Count(p => p.CurrentOwner == "2") == 1) && + (partitions.Count(p => p.CurrentOwner == "3") == 1)); + }); + + var stopTasks = services.Select(service => service.StopAsync()); + await Task.WhenAll(stopTasks); + await services[0].DeleteAsync(); + } + + // Starts with four workers and four partitions. And then add four more workers. + // Test that the added workers will do nothing. + [TestMethod] + public async Task TestAddFourWorkersWithFourWorkersAndFourPartitions() + { + TimeSpan timeout = TimeSpan.FromSeconds(15); + Stopwatch stopwatch = new Stopwatch(); + var services = new AzureStorageOrchestrationService[8]; + string testName = "TestAddFourWorkers"; + + for (int i = 0; i < 8; i++) + { + var settings = new AzureStorageOrchestrationServiceSettings + { + TaskHubName = testName, + StorageConnectionString = this.connection, + AppName = testName, + UseTablePartitionManagement = true, + WorkerId = i.ToString(), + }; + services[i] = new AzureStorageOrchestrationService(settings); + } + + for (int i = 0; i < 4; i++) + { + await services[i].StartAsync(); + } + + WaitForCondition( + timeout: TimeSpan.FromSeconds(30), + condition: () => + { + var partitions = services[0].ListTableLeases(); + Assert.AreEqual(4, partitions.Count()); + return + (partitions.Count(p => p.CurrentOwner == "0") == 1) && + (partitions.Count(p => p.CurrentOwner == "1") == 1) && + (partitions.Count(p => p.CurrentOwner == "2") == 1) && + (partitions.Count(p => p.CurrentOwner == "3") == 1); + }); + + for (int i = 4; i < 8; i++) + { + await services[i].StartAsync(); + } + + var oldDistribution = services[0].ListTableLeases(); + stopwatch.Start(); + bool isDistributionChanged = false; + + while (stopwatch.Elapsed < timeout) + { + var newDistribution = services[0].ListTableLeases(); + Assert.AreEqual(oldDistribution.Count(), newDistribution.Count()); + isDistributionChanged = !(oldDistribution.Zip(newDistribution, (p1, p2) => + p1.CurrentOwner == p2.CurrentOwner && p1.NextOwner== p2.NextOwner).All(result => result)); + + Assert.IsFalse(isDistributionChanged); + } + stopwatch.Stop(); + + var stopTasks = services.Select(service => service.StopAsync()); + await Task.WhenAll(stopTasks); + await services[0].DeleteAsync(); + } + + // Start with four workers and four partitions. And then scale down to three workers. + // Test that partitions will be rebalanced between the three workers: one worker will have two, and the other two both have one. + [TestMethod] + public async Task TestScalingDownToThreeWorkers() + { + var services = new AzureStorageOrchestrationService[4]; + string testName = "TestScalingDownToThree"; + + for (int i = 0; i < 4; i++) + { + var settings = new AzureStorageOrchestrationServiceSettings + { + TaskHubName = testName, + StorageConnectionString = this.connection, + AppName = testName, + UseTablePartitionManagement = true, + WorkerId = i.ToString(), + }; + services[i] = new AzureStorageOrchestrationService(settings); + } + + var startTasks = services.Select(service => service.StartAsync()); + await Task.WhenAll(startTasks); + + // wait for the partitions to be distributed equally among four workers. + WaitForCondition( + timeout: TimeSpan.FromSeconds(30), + condition: () => + { + var partitions = services[0].ListTableLeases(); + Assert.AreEqual(4, partitions.Count()); + return ((partitions.Count(p => p.CurrentOwner == "0") == 1) && + (partitions.Count(p => p.CurrentOwner == "1") == 1) && + (partitions.Count(p => p.CurrentOwner == "2") == 1) && + (partitions.Count(p => p.CurrentOwner == "3") == 1)); + }); + await services[3].StopAsync(); + + bool isBalanced = false; + + WaitForCondition( + timeout: TimeSpan.FromSeconds(10), + condition: () => + { + var partitions = services[0].ListTableLeases(); + Assert.AreEqual(4, partitions.Count()); + + // Assert that two partitions have the same CurrentOwner value and the other two have unique CurrentOwner values + int distinctCurrentOwnersCount = partitions.Select(x => x.CurrentOwner).Distinct().Count(); + isBalanced = distinctCurrentOwnersCount == 3; + return isBalanced; + }); + + var stopTasks = services.Select(service => service.StopAsync()); + await Task.WhenAll(stopTasks); + await services[0].DeleteAsync(); + } + + // Start with four workers and four partitions. And then scale down to one worker. + // Test that the left one worker will take the four partitions. + [TestMethod] + public async Task TestScalingDownToOneWorkers() + { + var services = new AzureStorageOrchestrationService[4]; + string testName = "TestScalingDownToOne"; + + for (int i = 0; i < 4; i++) + { + var settings = new AzureStorageOrchestrationServiceSettings + { + TaskHubName = testName, + StorageConnectionString = this.connection, + AppName = testName, + UseTablePartitionManagement = true, + WorkerId = i.ToString(), + }; + services[i] = new AzureStorageOrchestrationService(settings); + } + + var startTasks = services.Select(service => service.StartAsync()); + await Task.WhenAll(startTasks); + + // wait for the partitions to be distributed equally among four workers. + WaitForCondition( + timeout: TimeSpan.FromSeconds(30), + condition: () => + { + var partitions = services[0].ListTableLeases(); + Assert.AreEqual(4, partitions.Count()); + return ((partitions.Count(p => p.CurrentOwner == "0") == 1) && + (partitions.Count(p => p.CurrentOwner == "1") == 1) && + (partitions.Count(p => p.CurrentOwner == "2") == 1) && + (partitions.Count(p => p.CurrentOwner == "3") == 1)); + }); + + IList tasks = new List + { + services[1].StopAsync(), + services[2].StopAsync(), + services[3].StopAsync() + }; + await Task.WhenAll(tasks); + + + WaitForCondition( + timeout: TimeSpan.FromSeconds(10), + condition: () => + { + var partitions = services[0].ListTableLeases(); + Assert.AreEqual(4, partitions.Count()); + return partitions.All(p => p.CurrentOwner == "0"); + }); + + await services[0].StopAsync(); + await services[0].DeleteAsync(); + } + + [TestMethod] + // Start with four workers and four partitions. Then kill one worker. + // Test that the partitions will be rebalanced among the three remaining workers. + public async Task TestKillOneWorker() + { + var services = new AzureStorageOrchestrationService[4]; + string testName = nameof(TestKillOneWorker); + + for (int i = 0; i < 4; i++) + { + var settings = new AzureStorageOrchestrationServiceSettings + { + TaskHubName = testName, + StorageConnectionString = this.connection, + AppName = testName, + UseTablePartitionManagement = true, + WorkerId = i.ToString(), + }; + services[i] = new AzureStorageOrchestrationService(settings); + } + + var startTasks = services.Select(service => service.StartAsync()); + await Task.WhenAll(startTasks); + + // wait for the partitions to be distributed equally among four workers. + WaitForCondition( + timeout: TimeSpan.FromSeconds(30), + condition: () => + { + var partitions = services[0].ListTableLeases(); + Assert.AreEqual(4, partitions.Count()); + return ((partitions.Count(p => p.CurrentOwner == "0") == 1) && + (partitions.Count(p => p.CurrentOwner == "1") == 1) && + (partitions.Count(p => p.CurrentOwner == "2") == 1) && + (partitions.Count(p => p.CurrentOwner == "3") == 1)); + }); + + services[3].KillPartitionManagerLoop(); + + bool isBalanced = false; + + WaitForCondition( + timeout: TimeSpan.FromSeconds(40), + condition: () => + { + var partitions = services[0].ListTableLeases(); + Assert.AreEqual(4, partitions.Count()); + + // Assert that two partitions have the same CurrentOwner value and the other two have unique CurrentOwner values + int distinctCurrentOwnersCount = partitions.Select(x => x.CurrentOwner).Distinct().Count(); + isBalanced = distinctCurrentOwnersCount == 3; + return isBalanced; + }); + + IList tasks = new List(); + tasks.Add(services[0].StopAsync()); + tasks.Add(services[1].StopAsync()); + tasks.Add(services[2].StopAsync()); + await Task.WhenAll(tasks); + await services[0].DeleteAsync(); + } + + // Start with four workers and four partitions. Then kill three workers. + // Test that the remaining worker will take all the partitions. + [TestCategory("DisabledInCI")] + [TestMethod] + public async Task TestKillThreeWorker() + { + var services = new AzureStorageOrchestrationService[4]; + string testName = nameof(TestKillThreeWorker); + + for (int i = 0; i < 4; i++) + { + var settings = new AzureStorageOrchestrationServiceSettings + { + TaskHubName = testName, + StorageConnectionString = this.connection, + AppName = testName, + UseTablePartitionManagement = true, + WorkerId = i.ToString(), + }; + services[i] = new AzureStorageOrchestrationService(settings); + } + + var startTasks = services.Select(service => service.StartAsync()); + await Task.WhenAll(startTasks); + + // wait for the partitions to be distributed equally among four workers. + WaitForCondition( + timeout: TimeSpan.FromSeconds(30), + condition: () => + { + var partitions = services[0].ListTableLeases(); + Assert.AreEqual(4, partitions.Count()); + return ((partitions.Count(p => p.CurrentOwner == "0") == 1) && + (partitions.Count(p => p.CurrentOwner == "1") == 1) && + (partitions.Count(p => p.CurrentOwner == "2") == 1) && + (partitions.Count(p => p.CurrentOwner == "3") == 1)); + }); + + services[3].KillPartitionManagerLoop(); + services[2].KillPartitionManagerLoop(); + services[1].KillPartitionManagerLoop(); + + WaitForCondition( + timeout: TimeSpan.FromSeconds(40), + condition: () => + { + var partitions = services[0].ListTableLeases(); + Assert.AreEqual(4, partitions.Count()); + return partitions.All(p => p.CurrentOwner == "0"); + }); + + + await services[0].StopAsync(); + await services[0].DeleteAsync(); + } + + /// + /// End to end test to simulate two workers with one partition. + /// Simulate that one worker becomes unhealthy, make sure the other worker will take over the partition. + /// After that the unhealthy worker becomes healthy again, make sure it will not take the partition back and also + /// it won't dequeue the control queue of the stolen partition. + /// + [TestMethod] + public async Task TestUnhealthyWorker() + { + const int WorkerCount = 2; + const int InstanceCount = 50; + var services = new AzureStorageOrchestrationService[WorkerCount]; + var taskHubWorkers = new TaskHubWorker[WorkerCount]; + + for (int i = 0; i < WorkerCount; i++) + { + var settings = new AzureStorageOrchestrationServiceSettings() + { + StorageConnectionString = this.connection, + TaskHubName = "TestUnhealthyWorker", + PartitionCount = 1, + WorkerId = i.ToString(), + UseTablePartitionManagement = true, + }; + services[i] = new AzureStorageOrchestrationService(settings); + taskHubWorkers[i] = new TaskHubWorker(services[i]); + taskHubWorkers[i].AddTaskOrchestrations(typeof(LongRunningOrchestrator)); + } + + // Create 50 orchestration instances. + var client = new TaskHubClient(services[0]); + var createInstanceTasks = new Task[InstanceCount]; + for (int i = 0; i < InstanceCount; i++) + { + createInstanceTasks[i] = client.CreateOrchestrationInstanceAsync(typeof(LongRunningOrchestrator), input: null); + } + OrchestrationInstance[] instances = await Task.WhenAll(createInstanceTasks); + + await taskHubWorkers[0].StartAsync(); + // Ensure worker 0 acquired the partition. + WaitForCondition( + timeout: TimeSpan.FromSeconds(2), + condition: () => services[0].ListTableLeases().Single().CurrentOwner == "0"); + await taskHubWorkers[1].StartAsync(); + + using var cts = new CancellationTokenSource(); + { + // Kill worker[0] and start a new worker. + // The new worker will take over the partitions of worker[0]. + services[0].KillPartitionManagerLoop(); + WaitForCondition( + timeout: TimeSpan.FromSeconds(40), + condition: () => services[0].ListTableLeases().Single().CurrentOwner == "1"); + + // After worker[1] takes over the lease, restart the worker[0]. + services[0].SimulateUnhealthyWorker(cts.Token); + + // Wait one second for worker0 to remove the lost partitions' control queue. + await Task.Delay(1000); + Assert.AreEqual(1, services[1].OwnedControlQueues.Count()); + Assert.AreEqual(0, services[0].OwnedControlQueues.Count()); + + // Check all the instances could be processed successfully. + OrchestrationState[] states = await Task.WhenAll( + instances.Select(i => client.WaitForOrchestrationAsync(i, TimeSpan.FromSeconds(30)))); + Assert.IsTrue( + Array.TrueForAll(states, s => s?.OrchestrationStatus == OrchestrationStatus.Completed), + "Not all orchestrations completed successfully!"); + } + await taskHubWorkers[1].StopAsync(); + } + + /// + /// End to End test with four workers and four partitions. + /// Ensure that no worker listens to the same queue at the same time during the balancing process. + /// Ensure that all instances should be processed sucessfully. + /// + /// + [TestCategory("DisabledInCI")] + [TestMethod] + public async Task EnsureOwnedQueueExclusive() + { + const int WorkerCount = 4; + const int InstanceCount = 100; + var services = new AzureStorageOrchestrationService[WorkerCount]; + var taskHubWorkers = new TaskHubWorker[WorkerCount]; + + // Create 4 task hub workers. + for (int i = 0; i < WorkerCount; i++) + { + var settings = new AzureStorageOrchestrationServiceSettings() + { + StorageConnectionString = this.connection, + TaskHubName = nameof(EnsureOwnedQueueExclusive), + PartitionCount = 4, + WorkerId = i.ToString(), + UseTablePartitionManagement = true, + }; + services[i] = new AzureStorageOrchestrationService(settings); + taskHubWorkers[i] = new TaskHubWorker(services[i]); + taskHubWorkers[i].AddTaskOrchestrations(typeof(HelloOrchestrator)); + taskHubWorkers[i].AddTaskActivities(typeof(Hello)); + } + + // Create 100 orchestration instances. + var client = new TaskHubClient(services[0]); + var createInstanceTasks = new Task[InstanceCount]; + for (int i = 0; i < InstanceCount; i++) + { + createInstanceTasks[i] = client.CreateOrchestrationInstanceAsync(typeof(HelloOrchestrator), input: i.ToString()); + } + OrchestrationInstance[] instances = await Task.WhenAll(createInstanceTasks); + + var taskHubWorkerTasks = taskHubWorkers.Select(worker => worker.StartAsync()); + await Task.WhenAll(taskHubWorkerTasks); + + // Check all the workers are not listening to the same queue at the same time during the balancing. + bool isBalanced = false; + var timeout = TimeSpan.FromSeconds(30); + var sw = Stopwatch.StartNew(); + while (!isBalanced) + { + Assert.IsTrue(sw.Elapsed <= timeout, "Timeout expired!"); + var partitions = services[0].ListTableLeases(); + Assert.AreEqual(4, partitions.Count()); + isBalanced = (partitions.Count(p => p.CurrentOwner == "0") == 1) && + (partitions.Count(p => p.CurrentOwner == "1") == 1) && + (partitions.Count(p => p.CurrentOwner == "2") == 1) && + (partitions.Count(p => p.CurrentOwner == "3") == 1); + + string[] array0 = services[0].OwnedControlQueues.Select(i => i.Name).ToArray(); + string[] array1 = services[1].OwnedControlQueues.Select(i => i.Name).ToArray(); + string[] array2 = services[2].OwnedControlQueues.Select(i => i.Name).ToArray(); + string[] array3 = services[3].OwnedControlQueues.Select(i => i.Name).ToArray(); + bool allUnique = CheckAllArraysUnique(array1, array2, array3, array0); + await Task.Delay(1000); + + Assert.IsTrue(allUnique, "Multiple workers lsiten to the same queue at the same time."); + } + + // Check all the instances could be processed successfully. + OrchestrationState[] states = await Task.WhenAll( + instances.Select(i => client.WaitForOrchestrationAsync(i, TimeSpan.FromSeconds(30)))); + Assert.IsTrue( + Array.TrueForAll(states, s => s?.OrchestrationStatus == OrchestrationStatus.Completed), + "Not all orchestrations completed successfully!"); + + var stopServiceTasks = taskHubWorkers.Select(worker => worker.StopAsync()); + await Task.WhenAll(stopServiceTasks); + } + + [KnownType(typeof(Hello))] + internal class HelloOrchestrator : TaskOrchestration + { + public override async Task RunTask(OrchestrationContext context, string input) + { + return await context.ScheduleTask(typeof(Hello), "world"); + } + } + + internal class Hello : TaskActivity + { + protected override string Execute(TaskContext context, string input) + { + if (string.IsNullOrEmpty(input)) + { + throw new ArgumentNullException(nameof(input)); + } + + Console.WriteLine($"Activity: Hello {input}"); + return $"Hello, {input}!"; + } + } + + internal class LongRunningOrchestrator : TaskOrchestration + { + public override Task RunTask(OrchestrationContext context, string input) + { + Thread.Sleep(TimeSpan.FromSeconds(10)); + return Task.FromResult(string.Empty); + } + } + + // Check if all the arrays elements are unique. + static bool CheckAllArraysUnique(params string[][] arrays) + { + int totalArrays = arrays.Length; + + for (int i = 0; i < totalArrays - 1; i++) + { + for (int j = i + 1; j < totalArrays; j++) + { + if (arrays[i].Intersect(arrays[j]).Any()) + { + return false; // Found at least one common element, arrays are not unique + } + } + } + + return true; // No common elements found, arrays are unique + } + + static void WaitForCondition(TimeSpan timeout, Func condition) + { + if (Debugger.IsAttached) + { + // Give more time for debugging so we can step through the code + timeout = TimeSpan.FromMinutes(3); + } + + var sw = Stopwatch.StartNew(); + while (!condition()) + { + Assert.IsTrue(sw.Elapsed < timeout, "Timeout expired"); + Thread.Sleep(TimeSpan.FromSeconds(1)); + } + } + } +} diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 3bdb99d3a..8967e30cd 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -31,7 +31,7 @@ namespace DurableTask.AzureStorage using DurableTask.Core.History; using DurableTask.Core.Query; using Microsoft.WindowsAzure.Storage; - using Newtonsoft.Json; + using Microsoft.WindowsAzure.Storage.Table; /// /// Orchestration service provider for the Durable Task Framework which uses Azure Storage as the durable store. @@ -151,7 +151,17 @@ public AzureStorageOrchestrationService(AzureStorageOrchestrationServiceSettings this.stats, this.trackingStore); - if (this.settings.UseLegacyPartitionManagement) + if (this.settings.UseTablePartitionManagement && this.settings.UseLegacyPartitionManagement) + { + throw new ArgumentException("Cannot use both TablePartitionManagement and LegacyPartitionManagement. For improved reliability, consider using the TablePartitionManager."); + } + else if (this.settings.UseTablePartitionManagement) + { + this.partitionManager = new TablePartitionManager( + this, + this.azureStorageClient); + } + else if (this.settings.UseLegacyPartitionManagement) { this.partitionManager = new LegacyPartitionManager( this, @@ -503,18 +513,62 @@ internal async Task OnOwnershipLeaseAquiredAsync(BlobLease lease) this.allControlQueues[lease.PartitionId] = controlQueue; } + internal void DropLostControlQueue(TableLease partition) + { + // If lease is lost but we're still dequeuing messages, remove the queue + if (this.allControlQueues.TryGetValue(partition.RowKey, out ControlQueue controlQueue) && + this.OwnedControlQueues.Contains(controlQueue) && + partition.CurrentOwner != this.settings.WorkerId) + { + this.orchestrationSessionManager.RemoveQueue(partition.RowKey, CloseReason.LeaseLost, nameof(DropLostControlQueue)); + } + } + internal Task OnOwnershipLeaseReleasedAsync(BlobLease lease, CloseReason reason) { this.orchestrationSessionManager.RemoveQueue(lease.PartitionId, reason, "Ownership LeaseCollectionBalancer"); return Utils.CompletedTask; } + internal async Task OnTableLeaseAcquiredAsync(TableLease lease) + { + var controlQueue = new ControlQueue(this.azureStorageClient, lease.RowKey, this.messageManager); + await controlQueue.CreateIfNotExistsAsync(); + this.orchestrationSessionManager.AddQueue(lease.RowKey, controlQueue, this.shutdownSource.Token); + + this.allControlQueues[lease.RowKey] = controlQueue; + } + + internal async Task DrainTablePartitionAsync(TableLease lease, CloseReason reason) + { + using var cts = new CancellationTokenSource(delay: TimeSpan.FromSeconds(60)); + await this.orchestrationSessionManager.DrainAsync(lease.RowKey, reason, cts.Token, nameof(DrainTablePartitionAsync)); + } + // Used for testing internal Task> ListBlobLeasesAsync() { return this.partitionManager.GetOwnershipBlobLeases(); } + // Used for table partition manager testing + internal IEnumerable ListTableLeases() + { + return ((TablePartitionManager)this.partitionManager).GetTableLeases(); + } + + // Used for table partition manager testing. + internal void SimulateUnhealthyWorker(CancellationToken testToken) + { + ((TablePartitionManager)this.partitionManager).SimulateUnhealthyWorker(testToken); + } + + // Used for table partition manager testing + internal void KillPartitionManagerLoop() + { + ((TablePartitionManager)this.partitionManager).KillLoop(); + } + internal static async Task GetControlQueuesAsync( AzureStorageClient azureStorageClient, int defaultPartitionCount) @@ -526,14 +580,29 @@ internal static async Task GetControlQueuesAsync( string taskHub = azureStorageClient.Settings.TaskHubName; - BlobLeaseManager inactiveLeaseManager = GetBlobLeaseManager(azureStorageClient, "inactive"); + // Need to check for leases in Azure Table Storage. Scale Controller calls into this method. + int partitionCount; + Table partitionTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.PartitionTableName); + + // Check if table partition manager is used. If so, get partition count from table. + // Else, get the partition count from the blobs. + if (await partitionTable.ExistsAsync()) + { + TableEntitiesResponseInfo result = await partitionTable.ExecuteQueryAsync(new TableQuery()); + partitionCount = result.ReturnedEntities.Count; + } + else + { + BlobLeaseManager inactiveLeaseManager = GetBlobLeaseManager(azureStorageClient, "inactive"); - TaskHubInfo hubInfo = await inactiveLeaseManager.GetOrCreateTaskHubInfoAsync( - GetTaskHubInfo(taskHub, defaultPartitionCount), - checkIfStale: false); + TaskHubInfo hubInfo = await inactiveLeaseManager.GetOrCreateTaskHubInfoAsync( + GetTaskHubInfo(taskHub, defaultPartitionCount), + checkIfStale: false); + partitionCount = hubInfo.PartitionCount; + }; - var controlQueues = new Queue[hubInfo.PartitionCount]; - for (int i = 0; i < hubInfo.PartitionCount; i++) + var controlQueues = new Queue[partitionCount]; + for (int i = 0; i < partitionCount; i++) { controlQueues[i] = azureStorageClient.GetQueueReference(GetControlQueueName(taskHub, i)); } diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs index 95c66e51a..8d432b151 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs @@ -14,14 +14,14 @@ namespace DurableTask.AzureStorage { using System; - using DurableTask.AzureStorage.Partitioning; + using System.Runtime.Serialization; + using System.Threading.Tasks; using DurableTask.AzureStorage.Logging; + using DurableTask.AzureStorage.Partitioning; using DurableTask.Core; using Microsoft.Extensions.Logging; using Microsoft.WindowsAzure.Storage.Queue; using Microsoft.WindowsAzure.Storage.Table; - using System.Runtime.Serialization; - using System.Threading.Tasks; /// /// Settings that impact the runtime behavior of the . @@ -160,8 +160,9 @@ public class AzureStorageOrchestrationServiceSettings public TimeSpan LeaseAcquireInterval { get; set; } = TimeSpan.FromSeconds(10); /// - /// Interval for which the lease is taken on Azure Blob representing a task hub partition. If the lease is not renewed within this - /// interval, it will cause it to expire and ownership of the partition will move to another worker instance. + /// Interval for which the lease is taken on Azure Blob representing a task hub partition in partition manager V1 (legacy partition manager) and V2 (safe partition manager). + /// The amount of time that a lease expiration deadline is extended on a renewal in partition manager V3 (table partition manager). + /// If the lease is not renewed within this within this timespan, it will expire and ownership of the partition may move to another worker. /// public TimeSpan LeaseInterval { get; set; } = TimeSpan.FromSeconds(30); @@ -208,6 +209,11 @@ public class AzureStorageOrchestrationServiceSettings /// public bool UseLegacyPartitionManagement { get; set; } = false; + /// + /// Use the newer Azure Tables-based partition manager instead of the older Azure Blobs-based partition manager. The default value is false. + /// + public bool UseTablePartitionManagement { get; set; } = false; + /// /// User serialization that will respect . Default is false. /// @@ -263,6 +269,7 @@ public class AzureStorageOrchestrationServiceSettings internal string HistoryTableName => this.HasTrackingStoreStorageAccount ? $"{this.TrackingStoreNamePrefix}History" : $"{this.TaskHubName}History"; internal string InstanceTableName => this.HasTrackingStoreStorageAccount ? $"{this.TrackingStoreNamePrefix}Instances" : $"{this.TaskHubName}Instances"; + internal string PartitionTableName => $"{this.TaskHubName}Partitions"; /// /// Gets an instance of that can be used for writing structured logs. diff --git a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj index 4cf0ceb2b..827c73752 100644 --- a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj +++ b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj @@ -20,8 +20,8 @@ 1 - 13 - 8 + 14 + 0 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0 @@ -45,6 +45,10 @@ + + + + diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs index dca19c019..6d990c63d 100644 --- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs +++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs @@ -174,6 +174,45 @@ async Task DequeueLoop(string partitionId, ControlQueue controlQueue, Cancellati $"Stopped listening for messages on queue {controlQueue.Name}."); } + /// + /// The drain process occurs when the lease is stolen or the worker is shutting down, + /// prompting the worker to cease listening for new messages and to finish processing all the existing information in memory. + /// + /// The partition that is going to released. + /// Reason to trigger the drain progres. + /// Cancel the drain process if it takes too long in case the worker is unhealthy. + /// The worker that calls this method. + /// + public async Task DrainAsync(string partitionId, CloseReason reason, CancellationToken cancellationToken, string caller) + { + // Start the drain process, mark the queue released to stop listening for new message + this.ReleaseQueue(partitionId, reason, caller); + try + { + // Wait until all messages from this queue have been processed. + while (!cancellationToken.IsCancellationRequested && this.IsControlQueueProcessingMessages(partitionId)) + { + await Task.Delay(500, cancellationToken); + } + } + catch (OperationCanceledException) + { + this.settings.Logger.PartitionManagerError( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partitionId, + $"Timed-out waiting for the partition to finish draining." + ); + } + finally + { + // Remove the partition from memory + this.RemoveQueue(partitionId, reason, caller); + } + } + + /// /// This method enumerates all the provided queue messages looking for ExecutionStarted messages. If any are found, it /// queries table storage to ensure that each message has a matching record in the Instances table. If not, this method diff --git a/src/DurableTask.AzureStorage/Partitioning/LeaseCollectionBalancerOptions.cs b/src/DurableTask.AzureStorage/Partitioning/LeaseCollectionBalancerOptions.cs index e18dbc692..3b214df09 100644 --- a/src/DurableTask.AzureStorage/Partitioning/LeaseCollectionBalancerOptions.cs +++ b/src/DurableTask.AzureStorage/Partitioning/LeaseCollectionBalancerOptions.cs @@ -23,17 +23,27 @@ class LeaseCollectionBalancerOptions /// /// Renew interval for all leases for partitions currently held by this instance. /// + /// + /// The table partition manager does not use this setting. It instead relies on + /// to determine the interval for renewing partition leases. + /// public TimeSpan RenewInterval { get; set; } = TimeSpan.FromSeconds(10); /// /// Interval when this instance kicks off a task to compute if partitions are distributed evenly /// among known host instances. /// + /// + /// When using the table partition manager, this property sets the frequency at which the + /// worker reads and updates the partition table except in the following two scenarios: + /// (1) If the worker fails to update the partition table, then the partitions table is read immediately. + /// (2) If the worker is waiting for a partition to be released or is working on releasing a partition, then the interval becomes 1 second. + /// public TimeSpan AcquireInterval { get; set; } = TimeSpan.FromSeconds(10); /// - /// Interval for which the lease is taken on Azure Blob representing an EventHub partition. If the lease is not renewed within this - /// interval, it will cause it to expire and ownership of the partition will move to another instance. + /// Interval for which the lease is taken. If the lease is not renewed within this + /// interval, the lease will expire and ownership of the partition will move to another instance. /// public TimeSpan LeaseInterval { get; set; } = TimeSpan.FromSeconds(30); diff --git a/src/DurableTask.AzureStorage/Partitioning/TableLease.cs b/src/DurableTask.AzureStorage/Partitioning/TableLease.cs new file mode 100644 index 000000000..7861001c3 --- /dev/null +++ b/src/DurableTask.AzureStorage/Partitioning/TableLease.cs @@ -0,0 +1,80 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +#nullable enable +namespace DurableTask.AzureStorage.Partitioning +{ + using System; + using Azure; + using Azure.Data.Tables; + + /// + /// The partition lease used by the table partition manager. + /// + public class TableLease : ITableEntity + { + // Constant partition key value used for all rows in the partition table. The actual value doesn't matter + // as long as all entries use the same partition key value. + internal const string DefaultPartitionKey = ""; + + /// + /// Required atrribute of Azure.Data.Tables storage entity. It is always set to . + /// + public string PartitionKey { get; set; } = DefaultPartitionKey; + + /// + /// The name of the partition/control queue. + /// + public string? RowKey { get; set; } + + /// + /// The current owner of this lease. + /// + public string? CurrentOwner { get; set; } + + /// + /// The name of the worker stealing this lease. It's null when no worker is actively stealing it. + /// + public string? NextOwner { get; set; } + + /// + /// The timestamp at which the partition was originally acquired by this worker. + /// + public DateTime? OwnedSince { get; set; } + + /// + /// The timestamp at which the partition was last renewed. + /// + public DateTime? LastRenewal { get; set; } + + /// + /// The timestamp at which the partition lease expires. + /// + public DateTime? ExpiresAt { get; set; } + + /// + /// True if the partition is being drained; False otherwise. + /// + public bool IsDraining { get; set; } = false; + + /// + /// Required atrribute of Azure.Data.Tables storage entity. Not used. + /// + public DateTimeOffset? Timestamp { get; set; } + + /// + /// Unique identifier used to version entities and ensure concurrency safety in Azure.Data.Tables. + /// + public ETag ETag { get; set; } + } +} diff --git a/src/DurableTask.AzureStorage/Partitioning/TablePartitionManager.cs b/src/DurableTask.AzureStorage/Partitioning/TablePartitionManager.cs new file mode 100644 index 000000000..fb43c81ec --- /dev/null +++ b/src/DurableTask.AzureStorage/Partitioning/TablePartitionManager.cs @@ -0,0 +1,883 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +#nullable enable +namespace DurableTask.AzureStorage.Partitioning +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using Azure; + using Azure.Data.Tables; + using DurableTask.AzureStorage.Storage; + + /// + /// Partition Manager V3 based on Azure Storage Tables. + /// + /// + /// Previous partition managers were based on Azure Storage blobs, which are more complex to manage + /// and have more expensive per-transaction costs, particularly in Azure Storage V2 accounts. This + /// table storage-based partition manager aims to be both simpler, cheaper, easier to debug, and + /// faster when it comes to rebalancing. + /// + sealed class TablePartitionManager : IPartitionManager, IDisposable + { + // Used for logging purposes only, indicating that a particular parameter (usually a partition ID) doesn't apply. + const string NotApplicable = ""; + + readonly AzureStorageClient azureStorageClient; + readonly AzureStorageOrchestrationService service; + readonly AzureStorageOrchestrationServiceSettings settings; + readonly CancellationTokenSource gracefulShutdownTokenSource; + readonly CancellationTokenSource forcefulShutdownTokenSource; + readonly string storageAccountName; + readonly TableClient partitionTable; + readonly TableLeaseManager tableLeaseManager; + readonly LeaseCollectionBalancerOptions options; + + Task partitionManagerTask; + + /// + /// Constructor to initiate new instances of TablePartitionManager. + /// + /// Client for the storage account. + /// The service responsible for initiating or terminating the partition manager. + public TablePartitionManager( + AzureStorageOrchestrationService service, + AzureStorageClient azureStorageClient) + { + this.azureStorageClient = azureStorageClient; + this.service = service; + this.settings = this.azureStorageClient.Settings; + this.storageAccountName = this.azureStorageClient.TableAccountName; + + string connectionString = this.settings.StorageConnectionString ?? this.settings.StorageAccountDetails.ConnectionString; + if (string.IsNullOrEmpty(connectionString)) + { + throw new InvalidOperationException("A connection string is required to use the table partition manager. Managed identity is not yet supported when using the table partition manager. Please provide a connection string to your storage account in your app settings."); + } + + this.options = new LeaseCollectionBalancerOptions + { + AcquireInterval = this.settings.LeaseAcquireInterval, + LeaseInterval = this.settings.LeaseInterval, + ShouldStealLeases = true + }; + this.gracefulShutdownTokenSource = new CancellationTokenSource(); + this.forcefulShutdownTokenSource = new CancellationTokenSource(); + this.partitionTable = new TableClient(connectionString, this.settings.PartitionTableName); + this.tableLeaseManager = new TableLeaseManager(this.partitionTable, this.service, this.settings, this.storageAccountName, this.options); + this.partitionManagerTask = Task.CompletedTask; + } + + + /// + /// Starts the partition management loop for the current worker. + /// + Task IPartitionManager.StartAsync() + { + // Run the partition manager loop in the background + this.partitionManagerTask = this.PartitionManagerLoop( + this.gracefulShutdownTokenSource.Token, + this.forcefulShutdownTokenSource.Token); + this.settings.Logger.PartitionManagerInfo( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partitionId: NotApplicable, + details: $"Started the background partition manager loop to acquire and balance partitions."); + return Task.CompletedTask; + } + + + /// + /// This loop is the main loop for worker to manage the partition table. + /// Worker will manage the table regularly with default interval. + /// If the worker is waiting for any other worker's partitions or is going to release any owned partitions, the wait time will be 1 second for timely update. + /// If worker failed to update the table or any other exceptions occurred, the worker will re-try immediately. + /// If the failure operations occurred too many times, the wait time will be back to default value to avoid excessive loggings. + /// Loop will end after shutdown is requested and the worker successfully released all ownership leases. + /// + /// Cancellation of this token initiates the graceful shutdown process of the partition manager. + /// Cancellation of this token forcefully aborts the partition manager loop. + async Task PartitionManagerLoop(CancellationToken gracefulShutdownToken, CancellationToken forcefulShutdownToken) + { + const int MaxFailureCount = 10; + + int consecutiveFailureCount = 0; + bool isShuttingDown = gracefulShutdownToken.IsCancellationRequested; + + while (true) + { + TimeSpan timeToSleep = this.options.AcquireInterval; + + try + { + ReadTableReponse response = await this.tableLeaseManager.ReadAndWriteTableAsync(isShuttingDown, forcefulShutdownToken); + + // If shutdown is requested and already released all ownership leases, then break the loop. + if (isShuttingDown && response.ReleasedAllLeases) + { + this.settings.Logger.PartitionManagerInfo( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partitionId: NotApplicable, + "Successfully released all ownership leases for shutdown."); + break; + } + + // Poll more frequently if we are draining a partition or waiting for a partition to be released + // by another worker. This is a temporary state and we want to try and be as responsive to updates + // as possible to minimize the time spent in this state, which is effectively downtime for orchestrations. + if (response.IsDrainingPartition || response.IsWaitingForPartitionRelease) + { + timeToSleep = TimeSpan.FromSeconds(1); + } + + consecutiveFailureCount = 0; + } + // Exception Status 412 represents an out of date ETag. We already logged this. + catch (RequestFailedException ex) when (ex.Status == 412) + { + consecutiveFailureCount++; + } + // Eat any unexpected exceptions. + catch (Exception exception) + { + this.settings.Logger.PartitionManagerError( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partitionId: NotApplicable, + details: $"Unexpected error occurred while trying to manage table partition leases: {exception}"); + + consecutiveFailureCount++; + } + + // If table update failed, we re-read the table immediately to obtain the latest ETag. + // In the case of too many successive failures, we wait before retrying to prevent excessive logs. + if (consecutiveFailureCount > 0 && consecutiveFailureCount < MaxFailureCount) + { + timeToSleep = TimeSpan.FromSeconds(0); + } + + try + { + if (isShuttingDown || forcefulShutdownToken.IsCancellationRequested) + { + // If shutdown is required, we sleep for a short period to ensure a relatively fast shutdown process + await Task.Delay(timeToSleep, forcefulShutdownToken); + } + else + { + // Normal case: the amount of time we sleep varies depending on the situation. + await Task.Delay(timeToSleep, gracefulShutdownToken); + } + } + catch (OperationCanceledException) when (gracefulShutdownToken.IsCancellationRequested) + { + // Shutdown requested, but we still need to release all leases + if (!isShuttingDown) + { + isShuttingDown = true; + this.settings.Logger.PartitionManagerInfo( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partitionId: NotApplicable, + details: $"Requested to cancel partition manager table manage loop. Initiate shutdown process."); + } + } + } + + this.settings.Logger.PartitionManagerInfo( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partitionId: NotApplicable, + "Stopped background table partition manager loop."); + } + + /// + /// Stop the partition manager. It first stops the partition manager loop to prevent stealing partitions during shutdown. + /// Then it starts the Task ShutDown() to release all ownership leases. + /// Worker will retry updating the table if the update failed or if any other exceptions occurred. + /// In the case of too many failed operations, the loop waiting time will be extended to avoid excessive logs. + /// + async Task IPartitionManager.StopAsync() + { + this.gracefulShutdownTokenSource.Cancel(); + this.settings.Logger.PartitionManagerInfo( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partitionId: NotApplicable, + "Started draining the in-memory messages of all owned control queues for shutdown."); + + // Wait 10 minutes for the partition manager to shutdown gracefully. Otherwise force a shutdown. + var timeout = TimeSpan.FromMinutes(10); + var timeoutTask = Task.Delay(Timeout.Infinite, this.forcefulShutdownTokenSource.Token); + this.forcefulShutdownTokenSource.CancelAfter(timeout); + await Task.WhenAny(this.partitionManagerTask, timeoutTask); + + if (timeoutTask.IsCompleted) + { + throw new TimeoutException( + $"Timed-out waiting for the partition manager to shut down. Timeout duration: {timeout}", + timeoutTask.Exception?.InnerException); + } + + // Surface any unhandled exceptions + await this.partitionManagerTask; + + this.settings.Logger.PartitionManagerInfo( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partitionId: NotApplicable, + "Table partition manager stopped successfully."); + } + + async Task IPartitionManager.CreateLeaseStore() + { + await this.partitionTable.CreateIfNotExistsAsync(); + } + + async Task IPartitionManager.CreateLease(string partitionId) + { + try + { + var newPartitionEntry = new TableLease { RowKey = partitionId }; + await this.partitionTable.AddEntityAsync(newPartitionEntry); + + this.settings.Logger.PartitionManagerInfo( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partitionId, + "Successfully added the partition to the partition table."); + } + catch (RequestFailedException e) when (e.Status == 409 /* The specified entity already exists. */) + { + this.settings.Logger.PartitionManagerInfo( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partitionId, + "This partition already exists in the partition table."); + } + } + + Task> IPartitionManager.GetOwnershipBlobLeases() + { + throw new NotImplementedException("This method is not implemented in the TablePartitionManager"); + } + + /// + /// Used for internal testing. + /// + internal IEnumerable GetTableLeases() + { + return this.partitionTable.Query(); + } + + Task IPartitionManager.DeleteLeases() + { + return this.partitionTable.DeleteAsync(); + } + + sealed class TableLeaseManager + { + readonly string workerName; + readonly AzureStorageOrchestrationService service; + readonly AzureStorageOrchestrationServiceSettings settings; + readonly TableClient partitionTable; + readonly string storageAccountName; + readonly Dictionary backgroundDrainTasks; + readonly LeaseCollectionBalancerOptions options; + + public TableLeaseManager( + TableClient table, + AzureStorageOrchestrationService service, + AzureStorageOrchestrationServiceSettings settings, + string storageAccountName, + LeaseCollectionBalancerOptions options) + { + this.partitionTable = table; + this.service = service; + this.settings = settings; + this.storageAccountName = storageAccountName; + this.workerName = this.settings.WorkerId; + this.backgroundDrainTasks = new Dictionary(); + this.options = options; + } + + /// + /// Reads the partition table to determine the tasks the worker should do. Used by the PartitionManagerLoop. + /// During the iteration, the worker will first claim any available partitions with method `TryClaimLease`. + /// Subsequently, if the partition is owned by this worker, it will proceed with method `CheckOwnershipLease`. + /// However, if the partition is owned by other workers, it will utilize method `CheckOtherWorkerLease`. + /// If the shutdown is requested, then stop regular claim and balance process, and call method `TryDrainAndReleaseAllPartitions` to release all ownership leases. + /// + /// Indicates that the partition manager is in the process of shutting down. + /// Cancellation of this token forcefully aborts the partition manager loop. + /// The incidates whether the worker is waiting to claim a stolen lease from other workers or working on releasing any ownership leases. + /// will be thrown if failed to update the partition table. Partition Manager loop will catch it and re-read the table to get the latest information. + public async Task ReadAndWriteTableAsync(bool isShuttingDown, CancellationToken forcefulShutdownToken) + { + var response = new ReadTableReponse(); + + IReadOnlyList partitions = this.partitionTable.Query(cancellationToken: forcefulShutdownToken).ToList(); + var partitionDistribution = new Dictionary>(); + int ownershipLeaseCount = 0; + + foreach (TableLease partition in partitions) + { + // In a worker becomes unhealthy, it may lose a lease without realizing it and continue listening + // for messages. We check for that case here and stop dequeuing messages if we discover that + // another worker currently owns the lease. + this.service.DropLostControlQueue(partition); + + bool claimedLease = false; + bool stoleLease = false; + bool renewedLease = false; + bool drainedLease = false; + bool releasedLease = false; + ETag etag = partition.ETag; + + // String previousOwner is for the steal process logs. Only used for stealing leases of any worker which is in shutdown process in this loop. + string previousOwner = partition.CurrentOwner ?? this.workerName; + + if (!isShuttingDown) + { + claimedLease = this.TryClaimLease(partition); + + this.CheckOtherWorkersLeases( + partition, + partitionDistribution, + response, + ref ownershipLeaseCount, + ref previousOwner, + ref stoleLease); + + this.RenewOrReleaseMyLease( + partition, + response, + ref ownershipLeaseCount, + ref releasedLease, + ref drainedLease, + ref renewedLease); + } + else + { + // If shutdown is requested, we drain and release all ownership partitions. + this.TryDrainAndReleaseAllPartitions( + partition, + response, + ref ownershipLeaseCount, + ref releasedLease, + ref drainedLease, + ref renewedLease); + } + + // Save updates to the partition entity if the lease is claimed, stolen, renewed, drained or released. + if (claimedLease || stoleLease || renewedLease || drainedLease || releasedLease) + { + try + { + await this.partitionTable.UpdateEntityAsync(partition, etag, TableUpdateMode.Replace, forcefulShutdownToken); + } + catch (RequestFailedException ex) when (ex.Status == 412) + { + this.settings.Logger.PartitionManagerInfo( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partition.RowKey, + $"Failed to update table entry due to an Etag mismatch. Failed ETag value: '{etag}'."); + throw; + } + + if (claimedLease) + { + // Notify the orchestration session manager that we acquired a lease for one of the partitions. + // This will cause it to start reading control queue messages for that partition. + await this.service.OnTableLeaseAcquiredAsync(partition); + } + + this.LogHelper(partition, claimedLease, stoleLease, renewedLease, drainedLease, releasedLease, previousOwner); + } + } + + // Separately from lease acquisition/renewal, make sure the partitions are evenly balanced across workers. + await this.BalanceLeasesAsync(partitionDistribution, partitions, ownershipLeaseCount, response, forcefulShutdownToken); + + // If shutdown is requested and the worker releases all ownership leases, then set the response.IsReleasesAllLease to true to notify the partitionManagerLoop to stop. + if (isShuttingDown) + { + response.ReleasedAllLeases = ownershipLeaseCount == 0; + } + + return response; + } + + + /// + /// Checks to see if a lease is available to be claimed by the current worker. + /// + /// The partition to check. + bool TryClaimLease(TableLease partition) + { + //Check if the partition is empty, expired or stolen by the current worker and claim it. + bool isEmptyLease = partition.CurrentOwner == null && partition.NextOwner == null; + bool isExpired = DateTime.UtcNow >= partition.ExpiresAt; + bool isStolenByMe = partition.CurrentOwner == null && partition.NextOwner == this.workerName; + + bool isLeaseAvailable = isEmptyLease || isExpired || isStolenByMe; + if (isLeaseAvailable) + { + this.ClaimLease(partition); + return true; + } + + return false; + } + + // Check ownership lease. + // If the lease is not stolen by others, renew it. + // If the lease is stolen by others, check if starts drainning or if finishes drainning. + void RenewOrReleaseMyLease( + TableLease partition, + ReadTableReponse response, + ref int ownershipLeaseCount, + ref bool releasedLease, + ref bool drainedLease, + ref bool renewedLease) + { + if (partition.CurrentOwner != this.workerName) + { + // We don't own this lease, so nothing for us to do here. + return; + } + + if (partition.NextOwner == null) + { + // We still own the lease and nobody is trying to steal it. + ownershipLeaseCount++; + this.RenewLease(partition); + renewedLease = true; + } + else + { + // Somebody is trying to steal the lease. Start draining so that we can release it. + response.IsDrainingPartition = true; + this.CheckDrainTask( + partition, + ref releasedLease, + ref renewedLease, + ref drainedLease); + } + } + + // If the lease is other worker's lease. Store it to the dictionary for future balance. + // If the other worker is shutting down, steal the lease. + void CheckOtherWorkersLeases( + TableLease partition, + Dictionary> partitionDistribution, + ReadTableReponse response, + ref int ownershipLeaseCount, + ref string previousOwner, + ref bool stoleLease) + { + bool isOtherWorkersLease = partition.CurrentOwner != this.workerName && partition.NextOwner == null && partition.IsDraining == false; + bool isOtherWorkerStealingLease = partition.CurrentOwner != this.workerName && partition.NextOwner != null; + bool isOwnerShuttingDown = partition.CurrentOwner != this.workerName && partition.NextOwner == null && partition.IsDraining == true; + + string owner; + + // If the lease is other worker's current lease, add partition to the dictionary with CurrentOwner as key. + if (isOtherWorkersLease) + { + owner = partition.CurrentOwner!; + AddToDictionary(partition, partitionDistribution, owner); + } + + // If other worker's lease is stolen, assume the lease tranfer will finish successfully and add partition to the dictionary with NextOwner as key. + if (isOtherWorkerStealingLease) + { + owner = partition.NextOwner!; + + // If the lease was stolen by _this_ worker, we increase its currently owned lease count. + if (owner == this.workerName) + { + ownershipLeaseCount++; + response.IsWaitingForPartitionRelease = true; + } + // If the lease is stolen by another worker, keep track of it for rebalancing purposes. + else + { + AddToDictionary(partition, partitionDistribution, owner); + } + } + + // If the lease belongs to a worker that is shutting down, and it has not been stolen yet, steal it. + if (isOwnerShuttingDown) + { + previousOwner = partition.CurrentOwner!; + this.StealLease(partition); + stoleLease = true; + response.IsWaitingForPartitionRelease = true; + } + } + + // Method for draining and releasing all ownership partitions. + // This method will only be called when shutdown is requested. + void TryDrainAndReleaseAllPartitions( + TableLease partition, + ReadTableReponse response, + ref int ownershipLeaseCount, + ref bool releasedLease, + ref bool drainedLease, + ref bool renewedLease) + { + response.IsDrainingPartition = true; + + if (partition.CurrentOwner != this.workerName) + { + // If the lease is not owned by this worker, we don't need to drain it. + return; + } + + ownershipLeaseCount++; + this.CheckDrainTask( + partition, + ref releasedLease, + ref renewedLease, + ref drainedLease); + + if (releasedLease) + { + ownershipLeaseCount--; + } + } + + // This method performs a worker-level partition rebalancing process. + // The idea is to calculate the expected number of leases per worker in a fully balanced scenario + // (a.k.a their partition quota) and for each worker to steal partitions from others + // until they have met their quota. + // A few remarks: + // (1) The quota of partitions per worker is the number of partitions divided by the number of workers. If these two number can not be evenly divided, then the difference between quota of partitions per workers should not exceed one. + // (2) Workers only steal from workers that have exceeded their quota + // An exception will be thrown if the table update fails due to an outdated ETag so that the worker can re-read the table again to get the latest information. + // Any other exceptions will be captured through logs. + async Task BalanceLeasesAsync( + Dictionary> partitionDistribution, + IReadOnlyList partitions, + int ownershipLeaseCount, + ReadTableReponse response, + CancellationToken forceShutdownToken) + { + if (partitionDistribution.Count == 0) + { + // No partitions to be balanced. + return; + } + + int averageLeasesCount = partitions.Count / (partitionDistribution.Count + 1); + if (averageLeasesCount < ownershipLeaseCount) + { + // Already have enough leases. Return since there is no need to steal other workers' partitions. + return; + } + + // If this worker does not own enough partitions, search for leases to steal + foreach (IReadOnlyList ownedPartitions in partitionDistribution.Values) + { + int numLeasesToSteal = averageLeasesCount - ownershipLeaseCount; + if (numLeasesToSteal < 0) + { + // The current worker already has enough partitions. + break; + } + + // Only steal leases from takshub workers that own more leases than average. + // If a given task hub worker's lease count is less or equal to the average, skip it. + int numExcessiveLease = ownedPartitions.Count - averageLeasesCount; + if (numExcessiveLease <= 0) + { + continue; + } + + // The balancing condition requires that the differences in the number of leases assigned to each worker should not exceed 1, if the total number of partitions is not evenly divisible by the number of active workers. + // Thus, the maximum number of leases a worker can own is the average number of leases per worker plus one in this case. + // If a worker has more than one lease difference than average and _this_ worker has not reached the maximum, it should steal an additional lease. + if (numLeasesToSteal == 0 && numExcessiveLease > 1) + { + numLeasesToSteal = 1; + } + + numLeasesToSteal = Math.Min(numLeasesToSteal, numExcessiveLease); + for (int i = 0; i < numLeasesToSteal; i++) + { + ownershipLeaseCount++; + TableLease partition = ownedPartitions[i]; + ETag etag = partition.ETag; + string previousOwner = partition.CurrentOwner!; + this.StealLease(partition); + + try + { + await this.partitionTable.UpdateEntityAsync( + partition, + etag, + TableUpdateMode.Replace, + forceShutdownToken); + + this.settings.Logger.LeaseStealingSucceeded( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + previousOwner, + leaseType: NotApplicable, + partitionId: partition.RowKey); + + response.IsWaitingForPartitionRelease = true; + } + catch (RequestFailedException ex) when (ex.Status == 412 /* ETag conflict */) + { + this.settings.Logger.PartitionManagerInfo( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partition.RowKey, + $"Failed to update table entry due to an Etag mismatch. Failed ETag value: '{etag}'"); + + // Another worker already modified this partition entry. Let the exception bubble up to the main + // loop, which will re-read the table immediately to get the latest updates. + throw; + } + catch (Exception exception) + { + // Eat any exceptions during lease stealing because we want to keep iterating through the partition list. + this.settings.Logger.PartitionManagerWarning( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partition.RowKey, + $"Unexpected error occurred in stealing partition lease: {exception}"); + } + } + } + } + + void ClaimLease(TableLease lease) + { + lease.CurrentOwner = this.workerName; + lease.NextOwner = null; + lease.OwnedSince = DateTime.UtcNow; + lease.LastRenewal = DateTime.UtcNow; + lease.ExpiresAt = DateTime.UtcNow.AddMinutes(1); + lease.IsDraining = false; + } + + void DrainPartition(TableLease lease, CloseReason reason) + { + Task task = this.service.DrainTablePartitionAsync(lease, reason); + string partitionId = lease.RowKey!; + this.backgroundDrainTasks.Add(partitionId, task); + lease.IsDraining = true; + } + + void ReleaseLease(TableLease lease) + { + lease.IsDraining = false; + lease.CurrentOwner = null; + } + + void RenewLease(TableLease lease) + { + lease.LastRenewal = DateTime.UtcNow; + lease.ExpiresAt = DateTime.UtcNow.Add(this.options.LeaseInterval); + } + + void StealLease(TableLease lease) + { + lease.NextOwner = this.workerName; + } + + // Log operations on partition table. + void LogHelper( + TableLease partition, + bool claimedLease, + bool stoleLease, + bool renewedLease, + bool drainedLease, + bool releasedLease, + string previousOwner) + { + if (claimedLease) + { + this.settings.Logger.LeaseAcquisitionSucceeded( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partitionId: partition.RowKey, + leaseType: NotApplicable); + } + if (stoleLease) + { + this.settings.Logger.LeaseStealingSucceeded( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + previousOwner, + leaseType: NotApplicable, + partitionId: partition.RowKey); + } + if (releasedLease) + { + this.settings.Logger.LeaseRemoved( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partition.RowKey, + token: NotApplicable, + leaseType: NotApplicable); + } + if (drainedLease) + { + this.settings.Logger.PartitionManagerInfo( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partitionId: partition.RowKey, + details: "Draining partition"); + } + if (renewedLease) + { + this.settings.Logger.LeaseRenewalResult( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partitionId: partition.RowKey, + success: true, + token: NotApplicable, + leaseType: NotApplicable, + details: "Successfully renewed partition lease"); + } + } + + // Track partition distribution in the partitionDistribution dictionary. We use this information when balancing partitions. + static void AddToDictionary(TableLease partition, Dictionary> partitionDistribution, string owner) + { + if (partitionDistribution.TryGetValue(owner, out List ownedPartitions)) + { + ownedPartitions.Add(partition); + } + else + { + partitionDistribution.Add(owner, new List { partition }); + } + } + + // Method for checking status of draining process. + // In case operations on dictionary and isDraining are out of sync, always check first if we have any drain tasks in the dictionary. + void CheckDrainTask( + TableLease partition, + ref bool releasedLease, + ref bool renewedLease, + ref bool drainedLease) + { + // Check if drain process has started. + if (this.backgroundDrainTasks.TryGetValue(partition.RowKey!, out Task? drainTask)) + { + // Check if draining process has finished. If so, release the lease. + if (drainTask.IsCompleted) + { + this.settings.Logger.PartitionManagerInfo( + this.storageAccountName, + this.settings.TaskHubName, + this.settings.WorkerId, + partition.RowKey, + details: "Successfully drained partition. Lease will be released."); + + this.backgroundDrainTasks.Remove(partition.RowKey!); + this.ReleaseLease(partition); + releasedLease = true; + drainTask.GetAwaiter().GetResult(); // Surface any exceptions from the drain process. + return; + } + else// If draining process is stillongoing, we keep renewing the lease to prevent it from expiring + { + this.RenewLease(partition); + renewedLease = true; + } + } + else// If drain task hasn't been started yet, start it and keep renewing the lease to prevent it from expiring. + { + this.DrainPartition(partition, CloseReason.Shutdown); + this.RenewLease(partition); + renewedLease = true; + drainedLease = true; + } + } + } + + /// + /// The Response class describes the behavior of the TableLeaseManager's ReadAndWrite method. + /// If the worker is draining (i.e working to release its leases), the method sets the IsDrainingPartition flag to true. + /// If the worker is going to acquire another lease from another worker, it sets the WaitForPartition flag to true. + /// When either of these flags is true, the sleep time of the worker changes to 1 second. + /// + class ReadTableReponse + { + /// + /// True if the worker is working on release lease. + /// + public bool IsDrainingPartition { get; set; } = false; + + /// + /// True if the worker is waiting for a lease to be released. + /// + public bool IsWaitingForPartitionRelease { get; set; } = false; + + /// + /// True if the worker successfully released all ownership leases for shutdown. + /// + public bool ReleasedAllLeases { get; set; } = false; + } + + // used for internal testing + internal void SimulateUnhealthyWorker(CancellationToken testToken) + { + _ = this.PartitionManagerLoop( + gracefulShutdownToken: testToken, + forcefulShutdownToken: CancellationToken.None); + } + + // used for internal testing + internal void KillLoop() + { + this.forcefulShutdownTokenSource.Cancel(); + } + + public void Dispose() + { + this.gracefulShutdownTokenSource.Dispose(); + this.forcefulShutdownTokenSource.Dispose(); + } + } +} diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs index dcc729fc7..5bedfdd51 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs @@ -38,6 +38,32 @@ namespace DurableTask.AzureStorage.Tests [TestClass] public class AzureStorageScaleTests { + public enum PartitionManagerType + { + V1Legacy, + V2Safe, + V3Table + } + + void SetPartitionManagerType(AzureStorageOrchestrationServiceSettings settings, PartitionManagerType partitionManagerType) + { + switch(partitionManagerType) + { + case PartitionManagerType.V1Legacy: + settings.UseTablePartitionManagement = false; + settings.UseLegacyPartitionManagement = true; + break; + case PartitionManagerType.V2Safe: + settings.UseTablePartitionManagement = false; + settings.UseLegacyPartitionManagement = false; + break; + case PartitionManagerType.V3Table: + settings.UseTablePartitionManagement = true; + settings.UseLegacyPartitionManagement = false; + break; + } + } + /// /// Basic validation of task hub creation. /// @@ -61,7 +87,7 @@ async Task EnsureTaskHubAsync( bool testDeletion, bool deleteBeforeCreate = true, string workerId = "test", - bool useLegacyPartitionManagement = false) + PartitionManagerType partitionManagerType = PartitionManagerType.V2Safe) { string storageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(); var storageAccount = CloudStorageAccount.Parse(storageConnectionString); @@ -72,9 +98,10 @@ async Task EnsureTaskHubAsync( TaskHubName = taskHubName, StorageConnectionString = storageConnectionString, WorkerId = workerId, - AppName = testName, - UseLegacyPartitionManagement = useLegacyPartitionManagement, + AppName = testName }; + this.SetPartitionManagerType(settings, partitionManagerType); + Trace.TraceInformation($"Task Hub name: {taskHubName}"); @@ -205,9 +232,9 @@ public async Task> ListBlobsAsync(CloudBlobDirectory client) /// REQUIREMENT: No two workers will ever process the same control queue. /// [TestMethod] - [DataRow(true, 30)] - [DataRow(false, 180)] - public async Task MultiWorkerLeaseMovement(bool useLegacyPartitionManagement, int timeoutInSeconds) + [DataRow(PartitionManagerType.V1Legacy, 30)] + [DataRow(PartitionManagerType.V2Safe, 180)] + public async Task MultiWorkerLeaseMovement(PartitionManagerType partitionManagerType, int timeoutInSeconds) { const int MaxWorkerCount = 4; @@ -228,7 +255,9 @@ public async Task MultiWorkerLeaseMovement(bool useLegacyPartitionManagement, in testDeletion: false, deleteBeforeCreate: i == 0, workerId: workerIds[i], - useLegacyPartitionManagement: useLegacyPartitionManagement); + partitionManagerType: partitionManagerType + ); + await services[i].StartAsync(); currentWorkerCount++; } @@ -416,6 +445,7 @@ public async Task PartitionLost_AbandonPrefetchedSession() StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(), ControlQueueBufferThreshold = 100, }; + this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe); // STEP 1: Start up the service and queue up a large number of messages var service = new AzureStorageOrchestrationService(settings);