Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] add mitigation for misrouted messages #1068

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -318,5 +318,12 @@ internal LogHelper Logger
/// Consumers that require separate dispatch (such as the new out-of-proc v2 SDKs) must set this to true.
/// </summary>
public bool UseSeparateQueueForEntityWorkItems { get; set; } = false;

/// <summary>
/// Gets or sets whether or not misrouted queue message should be corrected, according to the instanceID hashing function.
/// This defaults to false, to avoid possible "infinite re-routing" of messages. Users may set this to true to try to recover
/// from accidentally changing the partitionCount of an existing TaskHub, which can lead to misrouted messages.
/// </summary>
public bool CorrectMisourtedMessages { get; set; } = false;
}
}
32 changes: 32 additions & 0 deletions src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,32 @@ public ControlQueue(

protected override TimeSpan MessageVisibilityTimeout => this.settings.ControlQueueVisibilityTimeout;

private async Task correctMessagesIfMisourted(MessageData messageData)
{
// validate that the message came from the expected queue
string instanceId = messageData.TaskMessage.OrchestrationInstance.InstanceId;
uint partitionIndex = Fnv1aHashHelper.ComputeHash(instanceId) % (uint)this.settings.PartitionCount;
string expectedQueueOfOrigin = AzureStorageOrchestrationService.GetControlQueueName(this.settings.TaskHubName, (int)partitionIndex);

// route to the right queue if the user opted in to the mitigation.
// This assumes that all workers already have correct partitionCount configuration
// RISK - if different workers have different partitionCount values,
// this could lead to infinite re-routing of orchestrator messages.
if (expectedQueueOfOrigin != this.Name)
{
// obtain reference to expected queue (should have been created already)
var expectedQueue = this.azureStorageClient.GetQueueReference(expectedQueueOfOrigin);
await expectedQueue.CreateIfNotExistsAsync();

// place on correct queue
var originalMessage = messageData.OriginalQueueMessage;
await expectedQueue.AddMessageAsync(originalMessage, TimeSpan.FromMinutes(1));

// delete message from current queue
await this.storageQueue.DeleteMessageAsync(originalMessage);
}
}

public async Task<IReadOnlyList<MessageData>> GetMessagesAsync(CancellationToken cancellationToken)
{
using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(this.releaseCancellationToken, cancellationToken))
Expand Down Expand Up @@ -108,6 +134,12 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage)
messageData = await this.messageManager.DeserializeQueueMessageAsync(
queueMessage,
this.storageQueue.Name);

if (this.settings.CorrectMisourtedMessages)
{
await correctMessagesIfMisourted(messageData);
return; // skip further processing of this message
}
}
catch (Exception e)
{
Expand Down