From 4214dab8b51b7d8d29f2d7f904368fd3102976a3 Mon Sep 17 00:00:00 2001 From: KarlaCarvajal Date: Mon, 17 Feb 2025 18:18:37 -0500 Subject: [PATCH 1/2] feat(sdk-dotnet): Implement timeouts in tasks and external events nodes --- .../Workflow/Spec/NodeOutput.cs | 7 ++++++ .../Workflow/Spec/WorkflowThread.cs | 23 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/NodeOutput.cs b/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/NodeOutput.cs index c03db0b89..c9ba23a62 100644 --- a/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/NodeOutput.cs +++ b/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/NodeOutput.cs @@ -23,4 +23,11 @@ public NodeOutput WithJsonPath(string path) return nodeOutput; } + + public NodeOutput WithTimeout(int timeoutSeconds) + { + Parent.AddTimeoutToExtEvt(this, timeoutSeconds); + + return this; + } } \ No newline at end of file diff --git a/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/WorkflowThread.cs b/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/WorkflowThread.cs index aeaa7fbc9..1011f63ef 100644 --- a/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/WorkflowThread.cs +++ b/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/WorkflowThread.cs @@ -522,4 +522,27 @@ internal VariableAssignment AssignVariableHelper(object? value) return variableAssignment; } + + internal void AddTimeoutToExtEvt(NodeOutput node, int timeoutSeconds) { + CheckIfWorkflowThreadIsActive(); + Node newNode = FindNode(node.NodeName); + + var timeoutValue = new VariableAssignment + { + LiteralValue = new VariableValue { Int = timeoutSeconds } + }; + + if (newNode.NodeCase == Node.NodeOneofCase.Task) + { + newNode.Task.TimeoutSeconds = timeoutSeconds; + } + else if (newNode.NodeCase == Node.NodeOneofCase.ExternalEvent) + { + newNode.ExternalEvent.TimeoutSeconds = timeoutValue; + } + else + { + throw new Exception("Timeouts are only supported on ExternalEvent and Task nodes."); + } + } } \ No newline at end of file From 9408887a995ad72c0092c6d78503eed3736b5d90 Mon Sep 17 00:00:00 2001 From: KarlaCarvajal Date: Tue, 18 Feb 2025 13:56:33 -0500 Subject: [PATCH 2/2] feat(sdk-dotnet): Implement retries in task nodes and timeouts in external events --- .../Workflow/Spec/LHVariableAssigmentTest.cs | 2 +- .../Spec/WorkflowThreadTaskRetriesTest.cs | 66 +++++++++++++++++++ .../Workflow/Spec/WorkflowThreadTest.cs | 8 ++- .../Workflow/Spec/NodeOutput.cs | 6 +- .../Workflow/Spec/TaskNodeOutput.cs | 22 +++++++ .../Workflow/Spec/WorkflowThread.cs | 34 +++++++++- 6 files changed, 128 insertions(+), 10 deletions(-) create mode 100644 sdk-dotnet/LittleHorse.Sdk.Tests/Workflow/Spec/WorkflowThreadTaskRetriesTest.cs create mode 100644 sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/TaskNodeOutput.cs diff --git a/sdk-dotnet/LittleHorse.Sdk.Tests/Workflow/Spec/LHVariableAssigmentTest.cs b/sdk-dotnet/LittleHorse.Sdk.Tests/Workflow/Spec/LHVariableAssigmentTest.cs index 78e32830d..be3ccd1f4 100644 --- a/sdk-dotnet/LittleHorse.Sdk.Tests/Workflow/Spec/LHVariableAssigmentTest.cs +++ b/sdk-dotnet/LittleHorse.Sdk.Tests/Workflow/Spec/LHVariableAssigmentTest.cs @@ -55,7 +55,7 @@ public void VariableAssigment_WithWfRunVariableContainingJson_ShouldAssignDetail public void VariableAssigment_WithNodeOutput_ShouldAssignNodeOutputToVariable() { var nodeOutput = new NodeOutput("wait-to-collect-order-data", _parentWfThread); - nodeOutput.JsonPath = "$.order"; + nodeOutput.WithJsonPath("$.order"); var variableAssigment = _parentWfThread.AssignVariableHelper(nodeOutput); diff --git a/sdk-dotnet/LittleHorse.Sdk.Tests/Workflow/Spec/WorkflowThreadTaskRetriesTest.cs b/sdk-dotnet/LittleHorse.Sdk.Tests/Workflow/Spec/WorkflowThreadTaskRetriesTest.cs new file mode 100644 index 000000000..3f4c6e791 --- /dev/null +++ b/sdk-dotnet/LittleHorse.Sdk.Tests/Workflow/Spec/WorkflowThreadTaskRetriesTest.cs @@ -0,0 +1,66 @@ +using System; +using LittleHorse.Sdk.Common.Proto; +using LittleHorse.Sdk.Workflow.Spec; +using Moq; +using Xunit; + +namespace LittleHorse.Sdk.Tests.Workflow.Spec; + +public class WorkflowThreadTaskRetriesTest +{ + private Action _action; + void ParentEntrypoint(WorkflowThread thread) + { + } + + public WorkflowThreadTaskRetriesTest() + { + LHLoggerFactoryProvider.Initialize(null); + _action = ParentEntrypoint; + } + + [Fact] + public void WfThread_WithRetriesInTaskNode_ShouldCompile() + { + var numberOfExitNodes = 1; + var numberOfEntrypointNodes = 1; + var numberOfTasks = 1; + var workflowName = "TestWorkflow"; + var mockParentWorkflow = new Mock(workflowName, _action); + void EntryPointAction(WorkflowThread wf) + { + wf.Execute("greet").WithRetries(2); + } + var workflowThread = new WorkflowThread(mockParentWorkflow.Object, EntryPointAction); + + var compiledWfThread = workflowThread.Compile(); + + var expectedSpec = new ThreadSpec(); + + var entrypoint = new Node + { + Entrypoint = new EntrypointNode(), + OutgoingEdges = { new Edge { SinkNodeName = "1-greet-TASK" } } + }; + + var greetTask = new Node + { + Task = new TaskNode + { + TaskDefId = new TaskDefId { Name = "greet" }, + Retries = 2 + }, + OutgoingEdges = { new Edge { SinkNodeName = "2-exit-EXIT" } } + }; + + var exitNode = new Node { Exit = new ExitNode() }; + + expectedSpec.Nodes.Add("0-entrypoint-ENTRYPOINT", entrypoint); + expectedSpec.Nodes.Add("1-greet-TASK", greetTask); + expectedSpec.Nodes.Add("2-exit-EXIT", exitNode); + + var expectedNumberOfNodes = numberOfEntrypointNodes + numberOfExitNodes + numberOfTasks; + Assert.Equal(expectedNumberOfNodes, compiledWfThread.Nodes.Count); + Assert.Equal(expectedSpec, compiledWfThread); + } +} \ No newline at end of file diff --git a/sdk-dotnet/LittleHorse.Sdk.Tests/Workflow/Spec/WorkflowThreadTest.cs b/sdk-dotnet/LittleHorse.Sdk.Tests/Workflow/Spec/WorkflowThreadTest.cs index b4b133be5..048e07e53 100644 --- a/sdk-dotnet/LittleHorse.Sdk.Tests/Workflow/Spec/WorkflowThreadTest.cs +++ b/sdk-dotnet/LittleHorse.Sdk.Tests/Workflow/Spec/WorkflowThreadTest.cs @@ -156,18 +156,19 @@ void EntryPointAction(WorkflowThread wf) } [Fact] - public void WfThread_WithExternalEvent_ShouldCompileItInTheWorkflowThread() + public void WfThread_WithExternalEvent_ShouldCompile() { var numberOfExitNodes = 1; var numberOfEntrypointNodes = 1; var numberOfExternalEvents = 1; var numberOfTasks = 1; var workflowName = "TestWorkflow"; + var timeoutInSeconds = 30; var mockParentWorkflow = new Mock(workflowName, _action); void EntryPointAction(WorkflowThread wf) { WfRunVariable name = wf.DeclareStr("name"); - name.Assign(wf.WaitForEvent("name-event")); + name.Assign(wf.WaitForEvent("name-event").WithTimeout(timeoutInSeconds)); wf.Execute("greet", name); } var workflowThread = new WorkflowThread(mockParentWorkflow.Object, EntryPointAction); @@ -188,7 +189,8 @@ void EntryPointAction(WorkflowThread wf) { ExternalEvent = new ExternalEventNode { - ExternalEventDefId = new ExternalEventDefId { Name = "name-event" } + ExternalEventDefId = new ExternalEventDefId { Name = "name-event" }, + TimeoutSeconds = new VariableAssignment { LiteralValue = new VariableValue { Int = timeoutInSeconds }} }, OutgoingEdges = { diff --git a/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/NodeOutput.cs b/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/NodeOutput.cs index c9ba23a62..76fad0eb2 100644 --- a/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/NodeOutput.cs +++ b/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/NodeOutput.cs @@ -2,9 +2,9 @@ namespace LittleHorse.Sdk.Workflow.Spec; public class NodeOutput { - public string NodeName { get; set; } - public WorkflowThread Parent { get; set; } - public string? JsonPath { get; set; } + public string NodeName { get; private set; } + public WorkflowThread Parent { get; private set; } + public string? JsonPath { get; private set; } public NodeOutput(string nodeName, WorkflowThread parent) { diff --git a/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/TaskNodeOutput.cs b/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/TaskNodeOutput.cs new file mode 100644 index 000000000..3dea0bf0c --- /dev/null +++ b/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/TaskNodeOutput.cs @@ -0,0 +1,22 @@ +using LittleHorse.Sdk.Common.Proto; + +namespace LittleHorse.Sdk.Workflow.Spec; + +public class TaskNodeOutput : NodeOutput +{ + public TaskNodeOutput(string nodeName, WorkflowThread parent) : base(nodeName, parent) + { + } + + public TaskNodeOutput WithExponentialBackoff(ExponentialBackoffRetryPolicy policy) + { + Parent.OverrideTaskExponentialBackoffPolicy(this, policy); + return this; + } + + public TaskNodeOutput WithRetries(int retries) + { + Parent.OverrideTaskRetries(this, retries); + return this; + } +} \ No newline at end of file diff --git a/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/WorkflowThread.cs b/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/WorkflowThread.cs index 1011f63ef..c4238fd5a 100644 --- a/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/WorkflowThread.cs +++ b/sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/WorkflowThread.cs @@ -182,7 +182,7 @@ public WfRunVariable AddVariable(string name, Object typeOrDefaultVal) /// pass that literal value in. /// /// A NodeOutput for that TASK node. - public NodeOutput Execute(string taskName, params object[] args) + public TaskNodeOutput Execute(string taskName, params object[] args) { CheckIfWorkflowThreadIsActive(); _parent.AddTaskDefName(taskName); @@ -190,7 +190,7 @@ public NodeOutput Execute(string taskName, params object[] args) new TaskNode { TaskDefId = new TaskDefId { Name = taskName } }, args); string nodeName = AddNode(taskName, Node.NodeOneofCase.Task, taskNode); - return new NodeOutput(nodeName, this); + return new TaskNodeOutput(nodeName, this); } private VariableAssignment AssignVariable(Object variable) @@ -523,7 +523,8 @@ internal VariableAssignment AssignVariableHelper(object? value) return variableAssignment; } - internal void AddTimeoutToExtEvt(NodeOutput node, int timeoutSeconds) { + internal void AddTimeoutToExtEvt(NodeOutput node, int timeoutSeconds) + { CheckIfWorkflowThreadIsActive(); Node newNode = FindNode(node.NodeName); @@ -545,4 +546,31 @@ internal void AddTimeoutToExtEvt(NodeOutput node, int timeoutSeconds) { throw new Exception("Timeouts are only supported on ExternalEvent and Task nodes."); } } + + internal void OverrideTaskExponentialBackoffPolicy(TaskNodeOutput node, ExponentialBackoffRetryPolicy policy) + { + var newNode = CheckTaskNode(node); + + newNode.Task.ExponentialBackoff = policy; + } + + internal void OverrideTaskRetries(TaskNodeOutput node, int retries) + { + var newNode = CheckTaskNode(node); + + newNode.Task.Retries = retries; + } + + private Node CheckTaskNode(TaskNodeOutput node) + { + CheckIfWorkflowThreadIsActive(); + Node newNode = FindNode(node.NodeName); + + if (newNode.NodeCase != Node.NodeOneofCase.Task) + { + throw new InvalidOperationException("Impossible to not have task node here"); + } + + return newNode; + } } \ No newline at end of file