Skip to content

Commit

Permalink
Allow durability provider to disable status checks on RaiseEventAsync (
Browse files Browse the repository at this point in the history
…#2509)

* add a boolean flag to the durability provider to determine whether the status should be checked before raising the event.

* address PR feedback.

* modify the default behavior so that only the AzureStorageProvider is checking the status by default.

* fix the unit tests so they take into account the CheckStatusBeforeRaiseEvent configuration
  • Loading branch information
sebastianburckhardt authored Aug 9, 2023
1 parent 802f003 commit 0dff548
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 38 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,4 @@ functions-extensions/

.vscode/
.ionide/
/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public AzureStorageDurabilityProvider(

public override bool SupportsEntities => true;

public override bool CheckStatusBeforeRaiseEvent => true;

/// <summary>
/// The app setting containing the Azure Storage connection string.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ internal DurableClient(

string IDurableEntityClient.TaskHubName => this.TaskHubName;

private bool CheckStatusBeforeRaiseEvent
=> this.durableTaskOptions.ThrowStatusExceptionsOnRaiseEvent ?? this.durabilityProvider.CheckStatusBeforeRaiseEvent;

private IDurableClient GetDurableClient(string taskHubName, string connectionName)
{
if (string.Equals(this.TaskHubName, taskHubName, StringComparison.OrdinalIgnoreCase)
Expand Down Expand Up @@ -231,7 +234,7 @@ Task IDurableOrchestrationClient.RaiseEventAsync(string instanceId, string event
throw new ArgumentNullException(nameof(eventName));
}

return this.RaiseEventInternalAsync(this.client, this.TaskHubName, instanceId, eventName, eventData);
return this.RaiseEventInternalAsync(this.client, this.TaskHubName, instanceId, eventName, eventData, this.CheckStatusBeforeRaiseEvent);
}

/// <inheritdoc />
Expand All @@ -256,7 +259,7 @@ Task IDurableOrchestrationClient.RaiseEventAsync(string taskHubName, string inst
DurableClient durableClient = (DurableClient)this.GetDurableClient(taskHubName, connectionName);
TaskHubClient taskHubClient = durableClient.client;

return this.RaiseEventInternalAsync(taskHubClient, taskHubName, instanceId, eventName, eventData);
return this.RaiseEventInternalAsync(taskHubClient, taskHubName, instanceId, eventName, eventData, this.CheckStatusBeforeRaiseEvent);
}

/// <inheritdoc />
Expand Down Expand Up @@ -690,7 +693,7 @@ async Task CheckForOrphanedLockAndFixIt(DurableOrchestrationStatus status, strin
ParentInstanceId = lockOwner,
LockRequestId = "fix-orphaned-lock", // we don't know the original id but it does not matter
};
await this.RaiseEventInternalAsync(this.client, this.TaskHubName, status.InstanceId, EntityMessageEventNames.ReleaseMessageEventName, message);
await this.RaiseEventInternalAsync(this.client, this.TaskHubName, status.InstanceId, EntityMessageEventNames.ReleaseMessageEventName, message, checkStatusFirst: false);
Interlocked.Increment(ref finalResult.NumberOfOrphanedLocksRemoved);
}
}
Expand Down Expand Up @@ -729,38 +732,48 @@ private async Task RaiseEventInternalAsync(
string taskHubName,
string instanceId,
string eventName,
object eventData)
object eventData,
bool checkStatusFirst)
{
OrchestrationState status = await GetOrchestrationInstanceStateAsync(taskHubClient, instanceId);
if (status == null)
{
return;
}

if (IsOrchestrationRunning(status))
if (checkStatusFirst)
{
// External events are not supposed to target any particular execution ID.
// We need to clear it to avoid sending messages to an expired ContinueAsNew instance.
status.OrchestrationInstance.ExecutionId = null;

await taskHubClient.RaiseEventAsync(status.OrchestrationInstance, eventName, eventData);
OrchestrationState status = await GetOrchestrationInstanceStateAsync(taskHubClient, instanceId);
if (status == null)
{
return;
}

this.traceHelper.FunctionScheduled(
taskHubName,
status.Name,
instanceId,
reason: "RaiseEvent:" + eventName,
functionType: FunctionType.Orchestrator,
isReplay: false);
if (IsOrchestrationRunning(status))
{
// External events are not supposed to target any particular execution ID.
// We need to clear it to avoid sending messages to an expired ContinueAsNew instance.
status.OrchestrationInstance.ExecutionId = null;

await taskHubClient.RaiseEventAsync(status.OrchestrationInstance, eventName, eventData);

this.traceHelper.FunctionScheduled(
taskHubName,
status.Name,
instanceId,
reason: "RaiseEvent:" + eventName,
functionType: FunctionType.Orchestrator,
isReplay: false);
}
else
{
this.traceHelper.ExtensionWarningEvent(
hubName: taskHubName,
functionName: status.Name,
instanceId: instanceId,
message: $"Cannot raise event for instance in {status.OrchestrationStatus} state");
throw new InvalidOperationException($"Cannot raise event {eventName} for orchestration instance {instanceId} because instance is in {status.OrchestrationStatus} state");
}
}
else
{
this.traceHelper.ExtensionWarningEvent(
hubName: taskHubName,
functionName: status.Name,
instanceId: instanceId,
message: $"Cannot raise event for instance in {status.OrchestrationStatus} state");
throw new InvalidOperationException($"Cannot raise event {eventName} for orchestration instance {instanceId} because instance is in {status.OrchestrationStatus} state");
// fast path: always raise the event (it will be silently discarded
// if the instance does not exist or is not running)
await taskHubClient.RaiseEventAsync(new OrchestrationInstance() { InstanceId = instanceId }, eventName, eventData);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ Task<string> StartNewAsync<T>(
/// event named <paramref name="eventName"/> using the
/// <see cref="IDurableOrchestrationContext.WaitForExternalEvent{T}(string)"/> API.
/// </para><para>
/// If the specified instance is not found or not running, this operation will throw an exception.
/// If the specified instance is not found or not running, an exception may be thrown. This behavior depends on the selected storage provider
/// and the configuration setting <see cref="DurableTaskOptions.ThrowStatusExceptionsOnRaiseEvent"/>.
/// </para>
/// </remarks>
/// <exception cref="ArgumentException">The instance id does not corespond to a valid orchestration instance.</exception>
Expand Down
5 changes: 5 additions & 0 deletions src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv
/// </summary>
public virtual bool SupportsImplicitEntityDeletion => false;

/// <summary>
/// Whether or not to check the instance status before raising an event.
/// </summary>
public virtual bool CheckStatusBeforeRaiseEvent => false;

/// <summary>
/// JSON representation of configuration to emit in telemetry.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,15 @@ public string HubName
/// </remarks>
public bool RollbackEntityOperationsOnExceptions { get; set; } = true;

/// <summary>
/// Controls the behavior of <see cref="IDurableOrchestrationClient.RaiseEventAsync(string,string,object)"/> in situations where the specified orchestration
/// does not exist, or is not in a running state. If set to true, an exception is thrown. If set to false, the event is silently discarded.
/// </summary>
/// <remarks>
/// The default behavior depends on the selected storage provider.
/// </remarks>
public bool? ThrowStatusExceptionsOnRaiseEvent { get; set; } = null;

/// <summary>
/// If true, takes a lease on the task hub container, allowing for only one app to process messages in a task hub at a time.
/// </summary>
Expand Down
52 changes: 44 additions & 8 deletions test/Common/DurableClientBaseTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,32 +67,54 @@ public async Task SignalEntity_InvalidEntityKey_ThrowsException(string entityKey
await Assert.ThrowsAnyAsync<ArgumentException>(async () => await durableClient.SignalEntityAsync(entityId, "test"));
}

[Fact]
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
public async Task RaiseEventAsync_InvalidInstanceId_ThrowsException()
[InlineData(false)]
[InlineData(true)]
public async Task RaiseEventAsync_InvalidInstanceId_ThrowsException(bool checkStatusBeforeRaiseEvent)
{
var instanceId = Guid.NewGuid().ToString();
var orchestrationServiceClientMock = new Mock<IOrchestrationServiceClient>();
orchestrationServiceClientMock.Setup(x => x.GetOrchestrationStateAsync(It.IsAny<string>(), It.IsAny<bool>())).ReturnsAsync(GetInvalidInstanceState());
var storageProvider = new DurabilityProvider("test", new Mock<IOrchestrationService>().Object, orchestrationServiceClientMock.Object, "test");
var storageProvider = new DurabilityProvider("test", new Mock<IOrchestrationService>().Object, orchestrationServiceClientMock.Object, "test", checkStatusBeforeRaiseEvent);
var durableExtension = GetDurableTaskConfig();
var durableOrchestrationClient = (IDurableOrchestrationClient)new DurableClient(storageProvider, durableExtension, durableExtension.HttpApiHandler, new DurableClientAttribute { });
await Assert.ThrowsAnyAsync<ArgumentException>(async () => await durableOrchestrationClient.RaiseEventAsync("invalid_instance_id", "anyEvent", new { message = "any message" }));
Task RaiseEvent() => durableOrchestrationClient.RaiseEventAsync("invalid_instance_id", "anyEvent", new { message = "any message" });

if (checkStatusBeforeRaiseEvent)
{
await Assert.ThrowsAnyAsync<ArgumentException>(RaiseEvent);
}
else
{
await RaiseEvent(); // no exception
}
}

[Fact]
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
public async Task RaiseEventAsync_NonRunningFunction_ThrowsException()
[InlineData(false)]
[InlineData(true)]
public async Task RaiseEventAsync_NonRunningFunction_ThrowsException(bool checkStatusBeforeRaiseEvent)
{
var instanceId = Guid.NewGuid().ToString();
var orchestrationServiceClientMock = new Mock<IOrchestrationServiceClient>();
orchestrationServiceClientMock.Setup(x => x.GetOrchestrationStateAsync(It.IsAny<string>(), It.IsAny<bool>()))
.ReturnsAsync(GetInstanceState(OrchestrationStatus.Completed));
var storageProvider = new DurabilityProvider("test", new Mock<IOrchestrationService>().Object, orchestrationServiceClientMock.Object, "test");
var storageProvider = new DurabilityProvider("test", new Mock<IOrchestrationService>().Object, orchestrationServiceClientMock.Object, "test", checkStatusBeforeRaiseEvent);
var durableExtension = GetDurableTaskConfig();
var durableOrchestrationClient = (IDurableOrchestrationClient)new DurableClient(storageProvider, durableExtension, durableExtension.HttpApiHandler, new DurableClientAttribute { });

await Assert.ThrowsAnyAsync<InvalidOperationException>(async () => await durableOrchestrationClient.RaiseEventAsync("valid_instance_id", "anyEvent", new { message = "any message" }));
Task RaiseEvent() => durableOrchestrationClient.RaiseEventAsync("valid_instance_id", "anyEvent", new { message = "any message" });

if (checkStatusBeforeRaiseEvent)
{
await Assert.ThrowsAnyAsync<InvalidOperationException>(RaiseEvent);
}
else
{
await RaiseEvent(); // no exception
}
}

[Fact]
Expand Down Expand Up @@ -342,5 +364,19 @@ private static DurableTaskExtension GetDurableTaskConfig()
new DurableHttpMessageHandlerFactory(),
platformInformationService: platformInformationService);
}

// wraps the durability provider class so we can configure the CheckStatusBeforeRaiseEvent property
private class DurabilityProvider : Microsoft.Azure.WebJobs.Extensions.DurableTask.DurabilityProvider
{
private readonly bool checkStatusBeforeRaiseEvent;

public DurabilityProvider(string storageProviderName, IOrchestrationService service, IOrchestrationServiceClient serviceClient, string connectionName, bool checkStatusBeforeRaiseEvent = true)
: base(storageProviderName, service, serviceClient, connectionName)
{
this.checkStatusBeforeRaiseEvent = checkStatusBeforeRaiseEvent;
}

public override bool CheckStatusBeforeRaiseEvent => this.checkStatusBeforeRaiseEvent;
}
}
}

0 comments on commit 0dff548

Please sign in to comment.