Skip to content

Commit

Permalink
Azure Storage Partition Manager V3 (#899)
Browse files Browse the repository at this point in the history
* update

* remove unnecessary pkg

* update test

* update drain process && test storage connection

* update commit

* update scaletests pm setting

* update commit

* update commit

* Update AzureStorageScaleTests.cs

* test UseTablePartitionManager setting

* remove typo

* change UseTablePartitionManager = true

* update null reference

* add control queue in AzureOrchestratorService

* update commit

* Update AzureStorageScaleTests.cs

* commit KillThreeWorker tests

* update GetControlQueueAsync

* update partition table name setting

* update log && commits && add new method to check alignment with owned queues and ownership lease

* remove typo and sort using

* update commit

* add tests

* update according to commits

* change test connection setting

* update according to reviews

* mark one test disabled for now cause it's flaky

* Update DurableTask.AzureStorage.csproj

* Update DurableTask.AzureStorage.csproj

* reconstruct ReadAndWriteTable method to be more readable

* update connection string setting

* update connection string setting

* update test according to commits and update connection string setting

* update connection string setting

* update logs

* update typo

* update tests

* update catch exceptions and logs

* remove typo

* update lease balancer and log

* remove unused package

* update commits

* Update AzureStorageScaleTests.cs for commits

* update commit

* update commits: drain process

* update checkownershiplease from void to task

* mark test as DisabledInCI

* update according to commit

* remove test version

* Suggested edits for table partition manager (#921)

* update shutdown process to avoid race condition

* update shutdown process into one loop to avoid race condition

* add commit

* More suggested edits (#922)

* More suggested edits

* Sync'd with latest and added additional refactoring

* Adding a few corrections to my previous commit

* add new method CheckDrainTask to avoid redundant code

* update commit

* update shutdown wait time && KillLoop

* update commit

* Update AppLeaseManager.cs

Remove unrelated file using change.

* update AzureStorage version

* update AzureStorage version

* update commit for AcquireInterval

* update commit for AcquireInterval

* update commit

---------

Co-authored-by: Chris Gillum <[email protected]>
  • Loading branch information
nytian and cgillum authored Jul 10, 2023
1 parent 5fbfbfb commit 1bb5c8f
Show file tree
Hide file tree
Showing 9 changed files with 1,867 additions and 24 deletions.
721 changes: 721 additions & 0 deletions Test/DurableTask.AzureStorage.Tests/TestTablePartitionManager.cs

Large diffs are not rendered by default.

85 changes: 77 additions & 8 deletions src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace DurableTask.AzureStorage
using DurableTask.Core.History;
using DurableTask.Core.Query;
using Microsoft.WindowsAzure.Storage;
using Newtonsoft.Json;
using Microsoft.WindowsAzure.Storage.Table;

/// <summary>
/// Orchestration service provider for the Durable Task Framework which uses Azure Storage as the durable store.
Expand Down Expand Up @@ -151,7 +151,17 @@ public AzureStorageOrchestrationService(AzureStorageOrchestrationServiceSettings
this.stats,
this.trackingStore);

if (this.settings.UseLegacyPartitionManagement)
if (this.settings.UseTablePartitionManagement && this.settings.UseLegacyPartitionManagement)
{
throw new ArgumentException("Cannot use both TablePartitionManagement and LegacyPartitionManagement. For improved reliability, consider using the TablePartitionManager.");
}
else if (this.settings.UseTablePartitionManagement)
{
this.partitionManager = new TablePartitionManager(
this,
this.azureStorageClient);
}
else if (this.settings.UseLegacyPartitionManagement)
{
this.partitionManager = new LegacyPartitionManager(
this,
Expand Down Expand Up @@ -503,18 +513,62 @@ internal async Task OnOwnershipLeaseAquiredAsync(BlobLease lease)
this.allControlQueues[lease.PartitionId] = controlQueue;
}

/// <summary>
/// Stops processing a control queue whose table lease now belongs to a different worker.
/// </summary>
/// <param name="partition">The table lease to check; its row key identifies the control queue.</param>
internal void DropLostControlQueue(TableLease partition)
{
    // Nothing to drop if this service has never registered a queue for the partition.
    if (!this.allControlQueues.TryGetValue(partition.RowKey, out ControlQueue queue))
    {
        return;
    }

    // Nothing to drop if we are no longer dequeuing messages from that queue.
    if (!this.OwnedControlQueues.Contains(queue))
    {
        return;
    }

    // The lease still belongs to this worker, so the queue must be kept.
    if (partition.CurrentOwner == this.settings.WorkerId)
    {
        return;
    }

    // The lease was lost while the queue is still being read: remove the queue.
    this.orchestrationSessionManager.RemoveQueue(partition.RowKey, CloseReason.LeaseLost, nameof(DropLostControlQueue));
}

/// <summary>
/// Removes the control queue associated with a released ownership blob lease.
/// </summary>
/// <param name="lease">The blob lease that was released; its partition ID identifies the queue.</param>
/// <param name="reason">The reason the lease was released, forwarded to the session manager.</param>
/// <returns>An already-completed task; the removal itself is performed synchronously.</returns>
internal Task OnOwnershipLeaseReleasedAsync(BlobLease lease, CloseReason reason)
{
    const string Caller = "Ownership LeaseCollectionBalancer";
    string partitionId = lease.PartitionId;

    this.orchestrationSessionManager.RemoveQueue(partitionId, reason, Caller);
    return Utils.CompletedTask;
}

/// <summary>
/// Starts listening on the control queue of a partition whose table lease was just acquired.
/// </summary>
/// <param name="lease">The acquired table lease; its row key is used as the control queue name.</param>
/// <returns>A task that completes once the queue exists and has been registered.</returns>
internal async Task OnTableLeaseAcquiredAsync(TableLease lease)
{
    string partitionId = lease.RowKey;

    // Make sure the backing queue exists before handing it to the session manager.
    ControlQueue acquiredQueue = new ControlQueue(this.azureStorageClient, partitionId, this.messageManager);
    await acquiredQueue.CreateIfNotExistsAsync();

    this.orchestrationSessionManager.AddQueue(partitionId, acquiredQueue, this.shutdownSource.Token);

    // Record the queue so it can later be looked up by partition name.
    this.allControlQueues[partitionId] = acquiredQueue;
}

/// <summary>
/// Drains the partition identified by the given table lease, giving up after 60 seconds.
/// </summary>
/// <param name="lease">The table lease whose row key identifies the partition to drain.</param>
/// <param name="reason">The reason the partition is being closed, forwarded to the session manager.</param>
/// <returns>A task that completes when the session manager's drain operation returns.</returns>
internal async Task DrainTablePartitionAsync(TableLease lease, CloseReason reason)
{
    // Upper bound on how long the drain is allowed to run before its token is cancelled.
    TimeSpan drainTimeout = TimeSpan.FromSeconds(60);

    using (var timeoutSource = new CancellationTokenSource(drainTimeout))
    {
        await this.orchestrationSessionManager.DrainAsync(
            lease.RowKey,
            reason,
            timeoutSource.Token,
            nameof(DrainTablePartitionAsync));
    }
}

// Test hook: returns the ownership blob leases reported by the current partition manager.
internal Task<IEnumerable<BlobLease>> ListBlobLeasesAsync() =>
    this.partitionManager.GetOwnershipBlobLeases();

// Test hook for the table partition manager: returns its table leases.
// The direct cast means this throws InvalidCastException when another partition manager is in use.
internal IEnumerable<TableLease> ListTableLeases()
{
    var tableManager = (TablePartitionManager)this.partitionManager;
    return tableManager.GetTableLeases();
}

// Test hook for the table partition manager: forwards the token to its SimulateUnhealthyWorker method.
// The direct cast means this throws InvalidCastException when another partition manager is in use.
internal void SimulateUnhealthyWorker(CancellationToken testToken)
{
    var tableManager = (TablePartitionManager)this.partitionManager;
    tableManager.SimulateUnhealthyWorker(testToken);
}

// Test hook for the table partition manager: asks it to kill its loop.
// The direct cast means this throws InvalidCastException when another partition manager is in use.
internal void KillPartitionManagerLoop()
{
    var tableManager = (TablePartitionManager)this.partitionManager;
    tableManager.KillLoop();
}

internal static async Task<Queue[]> GetControlQueuesAsync(
AzureStorageClient azureStorageClient,
int defaultPartitionCount)
Expand All @@ -526,14 +580,29 @@ internal static async Task<Queue[]> GetControlQueuesAsync(

string taskHub = azureStorageClient.Settings.TaskHubName;

BlobLeaseManager inactiveLeaseManager = GetBlobLeaseManager(azureStorageClient, "inactive");
// Need to check for leases in Azure Table Storage. Scale Controller calls into this method.
int partitionCount;
Table partitionTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.PartitionTableName);

// Check if table partition manager is used. If so, get partition count from table.
// Else, get the partition count from the blobs.
if (await partitionTable.ExistsAsync())
{
TableEntitiesResponseInfo<DynamicTableEntity> result = await partitionTable.ExecuteQueryAsync(new TableQuery<DynamicTableEntity>());
partitionCount = result.ReturnedEntities.Count;
}
else
{
BlobLeaseManager inactiveLeaseManager = GetBlobLeaseManager(azureStorageClient, "inactive");

TaskHubInfo hubInfo = await inactiveLeaseManager.GetOrCreateTaskHubInfoAsync(
GetTaskHubInfo(taskHub, defaultPartitionCount),
checkIfStale: false);
TaskHubInfo hubInfo = await inactiveLeaseManager.GetOrCreateTaskHubInfoAsync(
GetTaskHubInfo(taskHub, defaultPartitionCount),
checkIfStale: false);
partitionCount = hubInfo.PartitionCount;
};

var controlQueues = new Queue[hubInfo.PartitionCount];
for (int i = 0; i < hubInfo.PartitionCount; i++)
var controlQueues = new Queue[partitionCount];
for (int i = 0; i < partitionCount; i++)
{
controlQueues[i] = azureStorageClient.GetQueueReference(GetControlQueueName(taskHub, i));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
namespace DurableTask.AzureStorage
{
using System;
using DurableTask.AzureStorage.Partitioning;
using System.Runtime.Serialization;
using System.Threading.Tasks;
using DurableTask.AzureStorage.Logging;
using DurableTask.AzureStorage.Partitioning;
using DurableTask.Core;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage.Queue;
using Microsoft.WindowsAzure.Storage.Table;
using System.Runtime.Serialization;
using System.Threading.Tasks;

/// <summary>
/// Settings that impact the runtime behavior of the <see cref="AzureStorageOrchestrationService"/>.
Expand Down Expand Up @@ -160,8 +160,9 @@ public class AzureStorageOrchestrationServiceSettings
public TimeSpan LeaseAcquireInterval { get; set; } = TimeSpan.FromSeconds(10);

/// <summary>
/// Interval for which the lease is taken on Azure Blob representing a task hub partition. If the lease is not renewed within this
/// interval, it will cause it to expire and ownership of the partition will move to another worker instance.
/// Interval for which the lease is taken on Azure Blob representing a task hub partition in partition manager V1 (legacy partition manager) and V2 (safe partition manager).
/// The amount of time that a lease expiration deadline is extended on a renewal in partition manager V3 (table partition manager).
/// If the lease is not renewed within this timespan, it will expire and ownership of the partition may move to another worker.
/// </summary>
public TimeSpan LeaseInterval { get; set; } = TimeSpan.FromSeconds(30);

Expand Down Expand Up @@ -208,6 +209,11 @@ public class AzureStorageOrchestrationServiceSettings
/// </summary>
public bool UseLegacyPartitionManagement { get; set; } = false;

/// <summary>
/// Use the newer Azure Tables-based partition manager instead of the older Azure Blobs-based partition manager. The default value is <c>false</c>.
/// </summary>
public bool UseTablePartitionManagement { get; set; } = false;

/// <summary>
/// User serialization that will respect <see cref="IExtensibleDataObject"/>. Default is false.
/// </summary>
Expand Down Expand Up @@ -263,6 +269,7 @@ public class AzureStorageOrchestrationServiceSettings
internal string HistoryTableName => this.HasTrackingStoreStorageAccount ? $"{this.TrackingStoreNamePrefix}History" : $"{this.TaskHubName}History";

internal string InstanceTableName => this.HasTrackingStoreStorageAccount ? $"{this.TrackingStoreNamePrefix}Instances" : $"{this.TaskHubName}Instances";
internal string PartitionTableName => $"{this.TaskHubName}Partitions";

/// <summary>
/// Gets an instance of <see cref="LogHelper"/> that can be used for writing structured logs.
Expand Down
8 changes: 6 additions & 2 deletions src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
<!-- Version Info -->
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>13</MinorVersion>
<PatchVersion>8</PatchVersion>
<MinorVersion>14</MinorVersion>
<PatchVersion>0</PatchVersion>

<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<FileVersion>$(VersionPrefix).0</FileVersion>
Expand All @@ -45,6 +45,10 @@
<PackageReference Include="WindowsAzure.Storage" version="9.3.1" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Data.Tables" Version="12.8.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\DurableTask.Core\DurableTask.Core.csproj" />
</ItemGroup>
Expand Down
39 changes: 39 additions & 0 deletions src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,45 @@ async Task DequeueLoop(string partitionId, ControlQueue controlQueue, Cancellati
$"Stopped listening for messages on queue {controlQueue.Name}.");
}

/// <summary>
/// Drains a partition. Draining happens when the lease is stolen or the worker is shutting down:
/// the worker stops listening for new messages and finishes processing what it already holds in memory.
/// </summary>
/// <param name="partitionId">The partition that is going to be released.</param>
/// <param name="reason">The reason the drain process was triggered.</param>
/// <param name="cancellationToken">Cancels the wait if draining takes too long, e.g. because the worker is unhealthy.</param>
/// <param name="caller">Name of the caller, passed through to the release and remove operations.</param>
/// <returns>A task that completes after the partition's queue has been removed from memory.</returns>
public async Task DrainAsync(string partitionId, CloseReason reason, CancellationToken cancellationToken, string caller)
{
    const int PollIntervalInMilliseconds = 500;

    // Begin draining: mark the queue as released so it stops listening for new messages.
    this.ReleaseQueue(partitionId, reason, caller);

    try
    {
        // Poll until every message from this queue has been processed, or until cancellation is requested.
        while (true)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                break;
            }

            if (!this.IsControlQueueProcessingMessages(partitionId))
            {
                break;
            }

            await Task.Delay(PollIntervalInMilliseconds, cancellationToken);
        }
    }
    catch (OperationCanceledException)
    {
        // The delay was cancelled before the partition finished draining.
        this.settings.Logger.PartitionManagerError(
            this.storageAccountName,
            this.settings.TaskHubName,
            this.settings.WorkerId,
            partitionId,
            "Timed-out waiting for the partition to finish draining.");
    }
    finally
    {
        // Whether or not draining finished, drop the partition from memory.
        this.RemoveQueue(partitionId, reason, caller);
    }
}


/// <summary>
/// This method enumerates all the provided queue messages looking for ExecutionStarted messages. If any are found, it
/// queries table storage to ensure that each message has a matching record in the Instances table. If not, this method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,27 @@ class LeaseCollectionBalancerOptions
/// <summary>
/// Renew interval for all leases for partitions currently held by this instance.
/// </summary>
/// <remarks>
/// The table partition manager does not use this setting. It instead relies on <see cref="AcquireInterval"/>
/// to determine the interval for renewing partition leases.
/// </remarks>
public TimeSpan RenewInterval { get; set; } = TimeSpan.FromSeconds(10);

/// <summary>
/// Interval when this instance kicks off a task to compute if partitions are distributed evenly
/// among known host instances.
/// </summary>
/// <remarks>
/// When using the table partition manager, this property sets the frequency at which the
/// worker reads and updates the partition table except in the following two scenarios:
/// (1) If the worker fails to update the partition table, then the partition table is read immediately.
/// (2) If the worker is waiting for a partition to be released or is working on releasing a partition, then the interval becomes 1 second.
/// </remarks>
public TimeSpan AcquireInterval { get; set; } = TimeSpan.FromSeconds(10);

/// <summary>
/// Interval for which the lease is taken on Azure Blob representing an EventHub partition. If the lease is not renewed within this
/// interval, it will cause it to expire and ownership of the partition will move to another instance.
/// Interval for which the lease is taken. If the lease is not renewed within this
/// interval, the lease will expire and ownership of the partition will move to another instance.
/// </summary>
public TimeSpan LeaseInterval { get; set; } = TimeSpan.FromSeconds(30);

Expand Down
80 changes: 80 additions & 0 deletions src/DurableTask.AzureStorage/Partitioning/TableLease.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

#nullable enable
namespace DurableTask.AzureStorage.Partitioning
{
using System;
using Azure;
using Azure.Data.Tables;

/// <summary>
/// Table entity representing the lease on a single partition, as used by the table partition manager.
/// </summary>
public class TableLease : ITableEntity
{
    // Every row in the partition table shares this partition key. The specific value is irrelevant;
    // it only has to be the same for all entries.
    internal const string DefaultPartitionKey = "";

    /// <summary>
    /// Required attribute of an Azure.Data.Tables storage entity. Initialized to <see cref="DefaultPartitionKey"/>.
    /// </summary>
    public string PartitionKey { get; set; } = DefaultPartitionKey;

    /// <summary>
    /// Name of the partition, which is also the name of its control queue.
    /// </summary>
    public string? RowKey { get; set; }

    /// <summary>
    /// The worker that currently owns this lease.
    /// </summary>
    public string? CurrentOwner { get; set; }

    /// <summary>
    /// The worker that is stealing this lease, or null when no worker is actively stealing it.
    /// </summary>
    public string? NextOwner { get; set; }

    /// <summary>
    /// When the current owner originally acquired the partition.
    /// </summary>
    public DateTime? OwnedSince { get; set; }

    /// <summary>
    /// When the partition lease was last renewed.
    /// </summary>
    public DateTime? LastRenewal { get; set; }

    /// <summary>
    /// When the partition lease expires.
    /// </summary>
    public DateTime? ExpiresAt { get; set; }

    /// <summary>
    /// True while the partition is being drained; false otherwise.
    /// </summary>
    public bool IsDraining { get; set; }

    /// <summary>
    /// Required attribute of an Azure.Data.Tables storage entity. Not used.
    /// </summary>
    public DateTimeOffset? Timestamp { get; set; }

    /// <summary>
    /// Unique identifier used to version entities and ensure concurrency safety in Azure.Data.Tables.
    /// </summary>
    public ETag ETag { get; set; }
}
}
Loading

0 comments on commit 1bb5c8f

Please sign in to comment.