Skip to content

Commit

Permalink
Implement support for suspend/resume (#195)
Browse files Browse the repository at this point in the history
Also: Fix minor issue with DateTimeKind
  • Loading branch information
cgillum authored Nov 3, 2023
1 parent 8d26b5e commit ceddafa
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 27 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@

(Add new notes here)

## v1.2.1

### New

* Support suspend/resume of orchestrations

### Updates

* SqlOrchestrationService.WaitForInstanceAsync no longer throws `TimeoutException` - only `OperationCanceledException` (previously could be either, depending on timing)
* Fix default DateTime values to have DateTimeKind of UTC (instead of Unspecified)

## v1.2.0

### New
Expand Down
1 change: 0 additions & 1 deletion src/DurableTask.SqlServer/Scripts/logic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,6 @@ BEGIN
E.[InstanceID] = I.[InstanceID]
WHERE
I.TaskHub = @TaskHub AND
I.[RuntimeStatus] NOT IN ('Suspended') AND
(I.[LockExpiration] IS NULL OR I.[LockExpiration] < @now) AND
(E.[VisibleTime] IS NULL OR E.[VisibleTime] < @now)

Expand Down
15 changes: 2 additions & 13 deletions src/DurableTask.SqlServer/SqlOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ public override Task DeleteAsync(bool deleteInstanceStore)
currentStatus = SqlUtils.GetRuntimeStatus(reader);
isRunning =
currentStatus == OrchestrationStatus.Running ||
currentStatus == OrchestrationStatus.Suspended ||
currentStatus == OrchestrationStatus.Pending;
}
else
Expand Down Expand Up @@ -570,19 +571,7 @@ public override async Task<OrchestrationState> WaitForOrchestrationAsync(
return state;
}

try
{
await Task.Delay(TimeSpan.FromSeconds(1), combinedCts.Token);
}
catch (TaskCanceledException)
{
if (timeoutCts.Token.IsCancellationRequested)
{
throw new TimeoutException($"A caller-specified timeout of {timeout} has expired, but instance '{instanceId}' is still in an {state?.OrchestrationStatus.ToString() ?? "unknown"} state.");
}

throw;
}
await Task.Delay(TimeSpan.FromSeconds(1), combinedCts.Token);
}
}

Expand Down
29 changes: 20 additions & 9 deletions src/DurableTask.SqlServer/SqlUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ public static HistoryEvent GetHistoryEvent(this DbDataReader reader, bool isOrch
TimerId = GetTaskId(reader),
};
break;
case EventType.ExecutionSuspended:
historyEvent = new ExecutionSuspendedEvent(eventId, GetPayloadText(reader));
break;
case EventType.ExecutionResumed:
historyEvent = new ExecutionResumedEvent(eventId, GetPayloadText(reader));
break;
default:
throw new InvalidOperationException($"Don't know how to interpret '{eventType}'.");
}
Expand Down Expand Up @@ -247,10 +253,10 @@ public static OrchestrationState GetOrchestrationState(this DbDataReader reader)

var state = new OrchestrationState
{
CompletedTime = reader.GetUtcDateTimeOrNull(reader.GetOrdinal("CompletedTime")) ?? default,
CreatedTime = reader.GetUtcDateTimeOrNull(reader.GetOrdinal("CreatedTime")) ?? default,
CompletedTime = GetUtcDateTime(reader, "CompletedTime"),
CreatedTime = GetUtcDateTime(reader, "CreatedTime"),
Input = reader.GetStringOrNull(reader.GetOrdinal("InputText")),
LastUpdatedTime = reader.GetUtcDateTimeOrNull(reader.GetOrdinal("LastUpdatedTime")) ?? default,
LastUpdatedTime = GetUtcDateTime(reader, "LastUpdatedTime"),
Name = GetName(reader),
Version = GetVersion(reader),
OrchestrationInstance = new OrchestrationInstance
Expand Down Expand Up @@ -411,23 +417,28 @@ internal static string GetInstanceId(DbDataReader reader)

static DateTime GetVisibleTime(DbDataReader reader)
{
int ordinal = reader.GetOrdinal("VisibleTime");
return GetUtcDateTime(reader, ordinal);
return GetUtcDateTime(reader, "VisibleTime");
}

static DateTime GetTimestamp(DbDataReader reader)
{
int ordinal = reader.GetOrdinal("Timestamp");
return GetUtcDateTime(reader, ordinal);
return GetUtcDateTime(reader, "Timestamp");
}

static DateTime? GetUtcDateTimeOrNull(this DbDataReader reader, int columnIndex)
static DateTime GetUtcDateTime(DbDataReader reader, string columnName)
{
return reader.IsDBNull(columnIndex) ? (DateTime?)null : GetUtcDateTime(reader, columnIndex);
int ordinal = reader.GetOrdinal(columnName);
return GetUtcDateTime(reader, ordinal);
}

static DateTime GetUtcDateTime(DbDataReader reader, int ordinal)
{
if (reader.IsDBNull(ordinal))
{
// Note that some serializers (like protobuf) won't accept non-UTC DateTime objects.
return DateTime.SpecifyKind(default, DateTimeKind.Utc);
}

// The SQL client always assumes DateTimeKind.Unspecified. We need to modify the result so that it knows it is UTC.
return DateTime.SpecifyKind(reader.GetDateTime(ordinal), DateTimeKind.Utc);
}
Expand Down
2 changes: 1 addition & 1 deletion src/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>2</MinorVersion>
<PatchVersion>0</PatchVersion>
<PatchVersion>1</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).$(MinorVersion).0.0</AssemblyVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ async Task ValidateDatabaseSchemaAsync(TestDatabase database, string schemaName
schemaName);
Assert.Equal(1, currentSchemaVersion.Major);
Assert.Equal(2, currentSchemaVersion.Minor);
Assert.Equal(0, currentSchemaVersion.Patch);
Assert.Equal(1, currentSchemaVersion.Patch);
}

sealed class TestDatabase : IDisposable
Expand Down
57 changes: 57 additions & 0 deletions test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -768,5 +768,62 @@ public async Task TraceContextFlowCorrectly()
Assert.True(activitySpan.Duration > delay);
Assert.True(activitySpan.Duration < delay * 2);
}

[Fact]
public async Task SuspendAndResumeInstance()
{
TaskCompletionSource<int> tcs = null;

const int EventCount = 5;
string orchestrationName = "SuspendResumeOrchestration";

TestInstance<string> instance = await this.testService.RunOrchestration<int, string>(
null,
orchestrationName,
implementation: async (ctx, _) =>
{
tcs = new TaskCompletionSource<int>();

int i;
for (i = 0; i < EventCount; i++)
{
await tcs.Task;
tcs = new TaskCompletionSource<int>();
}

return i;
},
onEvent: (ctx, name, value) =>
{
Assert.Equal("Event" + value, name);
tcs.TrySetResult(int.Parse(value));
});

// Wait for the orchestration to finish starting
await instance.WaitForStart();

// Suspend the orchestration so that it won't process any new events
await instance.SuspendAsync();

// Raise the events, which should get buffered but not consumed
for (int i = 0; i < EventCount; i++)
{
await instance.RaiseEventAsync($"Event{i}", i);
}

// Make sure that the orchestration *doesn't* complete
await Assert.ThrowsAnyAsync<OperationCanceledException>(
() => instance.WaitForCompletion(TimeSpan.FromSeconds(3), doNotAdjustTimeout: true));

// Confirm that the orchestration is in a suspended state
OrchestrationState state = await instance.GetStateAsync();
Assert.Equal(OrchestrationStatus.Suspended, state.OrchestrationStatus);

// Resume the orchestration
await instance.ResumeAsync();

// Now the orchestration should complete immediately
await instance.WaitForCompletion(timeout: TimeSpan.FromSeconds(3), expectedOutput: EventCount);
}
}
}
19 changes: 17 additions & 2 deletions test/DurableTask.SqlServer.Tests/Utils/TestInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,13 @@ public async Task<OrchestrationState> WaitForCompletion(
OrchestrationStatus expectedStatus = OrchestrationStatus.Completed,
object expectedOutput = null,
string expectedOutputRegex = null,
bool continuedAsNew = false)
bool continuedAsNew = false,
bool doNotAdjustTimeout = false)
{
AdjustTimeout(ref timeout);
if (!doNotAdjustTimeout)
{
AdjustTimeout(ref timeout);
}

OrchestrationState state = await this.client.WaitForOrchestrationAsync(this.GetInstanceForAnyExecution(), timeout);
Assert.NotNull(state);
Expand Down Expand Up @@ -158,6 +162,17 @@ internal async Task RestartAsync(TInput newInput, OrchestrationStatus[] dedupeSt
this.instance.ExecutionId = newInstance.ExecutionId;
}


internal Task SuspendAsync(string reason = null)
{
return this.client.SuspendInstanceAsync(this.instance, reason);
}

internal Task ResumeAsync(string reason = null)
{
return this.client.ResumeInstanceAsync(this.instance, reason);
}

static void AdjustTimeout(ref TimeSpan timeout)
{
timeout = timeout.AdjustForDebugging();
Expand Down

0 comments on commit ceddafa

Please sign in to comment.