Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk-dotnet): implement retries in task nodes and timeouts in external events #1300

Merged
merged 4 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void VariableAssigment_WithWfRunVariableContainingJson_ShouldAssignDetail
public void VariableAssigment_WithNodeOutput_ShouldAssignNodeOutputToVariable()
{
var nodeOutput = new NodeOutput("wait-to-collect-order-data", _parentWfThread);
nodeOutput.JsonPath = "$.order";
nodeOutput.WithJsonPath("$.order");

var variableAssigment = _parentWfThread.AssignVariableHelper(nodeOutput);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using System;
using LittleHorse.Sdk.Common.Proto;
using LittleHorse.Sdk.Workflow.Spec;
using Moq;
using Xunit;

namespace LittleHorse.Sdk.Tests.Workflow.Spec;

public class WorkflowThreadTaskRetriesTest
{
private Action<WorkflowThread> _action;
void ParentEntrypoint(WorkflowThread thread)
{
}

public WorkflowThreadTaskRetriesTest()
{
LHLoggerFactoryProvider.Initialize(null);
_action = ParentEntrypoint;
}

[Fact]
public void WfThread_WithRetriesInTaskNode_ShouldCompile()
{
var numberOfExitNodes = 1;
var numberOfEntrypointNodes = 1;
var numberOfTasks = 1;
var workflowName = "TestWorkflow";
var mockParentWorkflow = new Mock<Sdk.Workflow.Spec.Workflow>(workflowName, _action);
void EntryPointAction(WorkflowThread wf)
{
wf.Execute("greet").WithRetries(2);
}
var workflowThread = new WorkflowThread(mockParentWorkflow.Object, EntryPointAction);

var compiledWfThread = workflowThread.Compile();

var expectedSpec = new ThreadSpec();

var entrypoint = new Node
{
Entrypoint = new EntrypointNode(),
OutgoingEdges = { new Edge { SinkNodeName = "1-greet-TASK" } }
};

var greetTask = new Node
{
Task = new TaskNode
{
TaskDefId = new TaskDefId { Name = "greet" },
Retries = 2
},
OutgoingEdges = { new Edge { SinkNodeName = "2-exit-EXIT" } }
};

var exitNode = new Node { Exit = new ExitNode() };

expectedSpec.Nodes.Add("0-entrypoint-ENTRYPOINT", entrypoint);
expectedSpec.Nodes.Add("1-greet-TASK", greetTask);
expectedSpec.Nodes.Add("2-exit-EXIT", exitNode);

var expectedNumberOfNodes = numberOfEntrypointNodes + numberOfExitNodes + numberOfTasks;
Assert.Equal(expectedNumberOfNodes, compiledWfThread.Nodes.Count);
Assert.Equal(expectedSpec, compiledWfThread);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,19 @@ void EntryPointAction(WorkflowThread wf)
}

[Fact]
public void WfThread_WithExternalEvent_ShouldCompileItInTheWorkflowThread()
public void WfThread_WithExternalEvent_ShouldCompile()
{
var numberOfExitNodes = 1;
var numberOfEntrypointNodes = 1;
var numberOfExternalEvents = 1;
var numberOfTasks = 1;
var workflowName = "TestWorkflow";
var timeoutInSeconds = 30;
var mockParentWorkflow = new Mock<Sdk.Workflow.Spec.Workflow>(workflowName, _action);
void EntryPointAction(WorkflowThread wf)
{
WfRunVariable name = wf.DeclareStr("name");
name.Assign(wf.WaitForEvent("name-event"));
name.Assign(wf.WaitForEvent("name-event").WithTimeout(timeoutInSeconds));
wf.Execute("greet", name);
}
var workflowThread = new WorkflowThread(mockParentWorkflow.Object, EntryPointAction);
Expand All @@ -188,7 +189,8 @@ void EntryPointAction(WorkflowThread wf)
{
ExternalEvent = new ExternalEventNode
{
ExternalEventDefId = new ExternalEventDefId { Name = "name-event" }
ExternalEventDefId = new ExternalEventDefId { Name = "name-event" },
TimeoutSeconds = new VariableAssignment { LiteralValue = new VariableValue { Int = timeoutInSeconds }}
},
OutgoingEdges =
{
Expand Down
13 changes: 10 additions & 3 deletions sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/NodeOutput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ namespace LittleHorse.Sdk.Workflow.Spec;

public class NodeOutput
{
public string NodeName { get; set; }
public WorkflowThread Parent { get; set; }
public string? JsonPath { get; set; }
public string NodeName { get; private set; }
public WorkflowThread Parent { get; private set; }
public string? JsonPath { get; private set; }

public NodeOutput(string nodeName, WorkflowThread parent)
{
Expand All @@ -23,4 +23,11 @@ public NodeOutput WithJsonPath(string path)

return nodeOutput;
}

public NodeOutput WithTimeout(int timeoutSeconds)
{
Parent.AddTimeoutToExtEvt(this, timeoutSeconds);

return this;
}
}
22 changes: 22 additions & 0 deletions sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/TaskNodeOutput.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using LittleHorse.Sdk.Common.Proto;

namespace LittleHorse.Sdk.Workflow.Spec;

public class TaskNodeOutput : NodeOutput
{
public TaskNodeOutput(string nodeName, WorkflowThread parent) : base(nodeName, parent)
{
}

public TaskNodeOutput WithExponentialBackoff(ExponentialBackoffRetryPolicy policy)
{
Parent.OverrideTaskExponentialBackoffPolicy(this, policy);
return this;
}

public TaskNodeOutput WithRetries(int retries)
{
Parent.OverrideTaskRetries(this, retries);
return this;
}
}
55 changes: 53 additions & 2 deletions sdk-dotnet/LittleHorse.Sdk/Workflow/Spec/WorkflowThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,15 @@ public WfRunVariable AddVariable(string name, Object typeOrDefaultVal)
/// pass that literal value in.
/// </param>
/// <returns>A NodeOutput for that TASK node.</returns>
public NodeOutput Execute(string taskName, params object[] args)
public TaskNodeOutput Execute(string taskName, params object[] args)
{
CheckIfWorkflowThreadIsActive();
_parent.AddTaskDefName(taskName);
var taskNode = CreateTaskNode(
new TaskNode { TaskDefId = new TaskDefId { Name = taskName } }, args);
string nodeName = AddNode(taskName, Node.NodeOneofCase.Task, taskNode);

return new NodeOutput(nodeName, this);
return new TaskNodeOutput(nodeName, this);
}

private VariableAssignment AssignVariable(Object variable)
Expand Down Expand Up @@ -522,4 +522,55 @@ internal VariableAssignment AssignVariableHelper(object? value)

return variableAssignment;
}

internal void AddTimeoutToExtEvt(NodeOutput node, int timeoutSeconds)
{
CheckIfWorkflowThreadIsActive();
Node newNode = FindNode(node.NodeName);

var timeoutValue = new VariableAssignment
{
LiteralValue = new VariableValue { Int = timeoutSeconds }
};

if (newNode.NodeCase == Node.NodeOneofCase.Task)
{
newNode.Task.TimeoutSeconds = timeoutSeconds;
}
else if (newNode.NodeCase == Node.NodeOneofCase.ExternalEvent)
{
newNode.ExternalEvent.TimeoutSeconds = timeoutValue;
}
else
{
throw new Exception("Timeouts are only supported on ExternalEvent and Task nodes.");
}
}

internal void OverrideTaskExponentialBackoffPolicy(TaskNodeOutput node, ExponentialBackoffRetryPolicy policy)
{
var newNode = CheckTaskNode(node);

newNode.Task.ExponentialBackoff = policy;
}

internal void OverrideTaskRetries(TaskNodeOutput node, int retries)
{
var newNode = CheckTaskNode(node);

newNode.Task.Retries = retries;
}

private Node CheckTaskNode(TaskNodeOutput node)
{
CheckIfWorkflowThreadIsActive();
Node newNode = FindNode(node.NodeName);

if (newNode.NodeCase != Node.NodeOneofCase.Task)
{
throw new InvalidOperationException("Impossible to not have task node here");
}

return newNode;
}
}